[Swift-commit] r5189 - trunk/bin/grid
ketan at ci.uchicago.edu
ketan at ci.uchicago.edu
Wed Sep 28 15:25:15 CDT 2011
Author: ketan
Date: 2011-09-28 15:25:15 -0500 (Wed, 28 Sep 2011)
New Revision: 5189
Added:
trunk/bin/grid/my-swift-workers
Log:
temp copy of an improved swift-workers, work in progress
Added: trunk/bin/grid/my-swift-workers
===================================================================
--- trunk/bin/grid/my-swift-workers (rev 0)
+++ trunk/bin/grid/my-swift-workers 2011-09-28 20:25:15 UTC (rev 5189)
@@ -0,0 +1,332 @@
+#! /usr/bin/env ruby
+
+require 'erb'
+require 'mk_catalog'
+require 'etc'
+
+=begin
+- Start the n coaster services: n is the number of sites in the whitelist
+- Build coasters.sites.xml: In order to do this get all the greensites in an array of objects and iterate through them to generate the coasters.xml
+- Start the workers where each worker connects to a servcie
+=end
+
+# **Ketan ARGV[0]=greensites, ARGV[1]=extenci (why?: it's just an appname), ARGV[2]=workercontact in the form of http://Hosturl:port
+$stdout.sync = true
+
+# FIXME: Is this the best way to get lib functions from bin/grid into RUBYLIB ?
+$:[$:.length] = File.dirname($0)
+
+
+class Site
+ attr_accessor :grid_resource, :gridftp, :data_dir, :app_dir, :name, :port
+ attr_reader :submit_file
+
+ # **Ketan: Generate the worker job
+ def gen_submit(count = 1)
+
+ ov=$VERBOSE
+ $VERBOSE=nil
+ workerExecutable = `which worker.pl`
+ workerWrapper = `which run-worker.sh`
+ $VERBOSE=ov
+
+ # workerContact = "http://communicado.ci.uchicago.edu:36906"
+ # **Ketan: This will change, only read the url and supply port from the service.wports file
+ workerContact = ARGV[2]
+
+ job = %Q[
+ universe = grid
+ stream_output = False
+ stream_error = False
+ transfer_executable = true
+ periodic_remove = JobStatus == 5
+ notification = Never
+
+ globus_rsl = (maxwalltime=240)
+ grid_resource = <%= @grid_resource %>
+ executable = #{workerWrapper}
+ arguments = #{workerContact} <%= @name.gsub(/__.*/,"") %> /tmp
+ environment = WORKER_LOGGING_LEVEL=INFO
+ Input = #{workerExecutable}
+ Error = condor/$(Process).err
+ Output = condor/$(Process).out
+ log = condor.log
+
+ queue #{count}
+ ]
+
+ # **Ketan: Does this submits the worker? If yes, this would be required to be submitted for each service.
+ ERB.new(job.gsub(/^\s+/, ""), 0, "%<>", "@submit_file").result(binding)
+
+ end
+
+ # **Ketan: Submit the above generated job `count` times
+ def submit_job(count)
+ puts "submit_job: Submitting #{@name} #{count} jobs"
+ count = count.to_i
+ output = ""
+ submitfile = gen_submit(count)
+ IO.popen("condor_submit", "w+") do |submit|
+ submit.puts submitfile
+ submit.close_write
+ output = submit.read
+ end
+ output
+ end
+
+ def queued
+ ov=$VERBOSE
+ $VERBOSE=nil
+ jobs = `condor_q #{$username} -const 'GridResource == \"#{@grid_resource}\" && JobStatus == 1' -format \"%s \" GlobalJobId`
+ $VERBOSE=ov
+ jobs.split(" ").size
+ end
+
+ def running
+ ov=$VERBOSE
+ $VERBOSE=nil
+ jobs = `condor_q #{$username} -const 'GridResource == \"#{@grid_resource}\" && JobStatus == 2' -format \"%s \" GlobalJobId`
+ $VERBOSE=ov
+ jobs.split(" ").size
+ end
+
+end
+
+=begin
+# For reference:
+JobStatus in job ClassAds
+
+0 Unexpanded U
+1 Idle I
+2 Running R
+3 Removed X
+4 Completed C
+5 Held H
+6 Submission_err E
+=end
+
+# **Ketan: Program execution starts from here
+if __FILE__ == $0
+ raise "No greenlist file" if !ARGV[0]
+
+ # **Ketan: Read the lines in greensites file and start that many services
+ numlines = %x{wc -l #{ARGV[0]}}.split.first.to_i
+
+ # **Ketan: Remove any worker or service ports file present from the previous runs
+ system("rm -f service.sports service.wports")
+
+ system("start-swift-service #{numlines} &")
+
+ service_ports=[]
+ worker_ports=[]
+
+ sleep 40
+
+ sports_file=File.open("service.sports")
+ sports_file.each_line { |line|
+ service_ports.push line
+ }
+
+ sports_file.close
+ wports_file=File.open("service.wports")
+ wports_file.each_line { |line|
+ worker_ports.push line
+ }
+
+ wports_file.close
+
+ puts "service ports"
+ puts service_ports
+ puts "============="
+ puts "worker ports"
+ puts worker_ports
+
+ # **Ketan: This will change, should be the first line of service.
+
+ start_port = 61100 # FIXME
+ #start_port =
+ ctr = 0
+ threads = []
+ ARGV[1] = "scec" if !ARGV[1]
+ greenlist = IO.readlines(ARGV[0]).map { |line| line.chomp! }
+ $username = Etc.getlogin
+
+ puts "Username = #{$username}"
+
+ minSiteJobs = 2
+ paddedDemand = 0
+ swiftDemand = 0
+ totalCores = 0
+ totalRunning = 0
+
+ ress_parse(ARGV[1]) do |name, value|
+ next if not greenlist.index(name) and not greenlist.empty?
+ totalCores += (value.throttle * 100 + 2).to_i
+ end
+
+ puts "totalCores for green sites = #{totalCores}"
+
+ demandThread = Thread.new("monitor-demand") do |t|
+ puts "starting demand thread"
+ while true do
+ puts "in demand thread"
+ swiftDemand = IO.read("swiftDemand").to_i # Replace this with sensor of Swift demand
+ # swiftDemand = 15
+ paddedDemand = (swiftDemand * 1.2).to_i
+ ov=$VERBOSE;$VERBOSE=nil
+ totalRunning = `condor_q #{$username} -const 'JobStatus == 2' -format \"%s \" GlobalJobId`.split(" ").size
+ $VERBOSE=ov
+ puts "*** demandThread: swiftDemand=#{swiftDemand} paddedDemand=#{paddedDemand} totalRunning=#{totalRunning}"
+ sleep 60
+ end
+ end
+
+ sites=Array.new
+
+ ress_parse(ARGV[1]) do |name, value|
+ next if not greenlist.index(name) and not greenlist.empty?
+ site = Site.new
+ site.name = name
+ site.grid_resource = "gt2 #{value.url}/jobmanager-#{value.jm}"
+ site.gridftp = "gsiftp://#{value.url}"
+ site.app_dir = value.app_dir
+ site.data_dir = value.data_dir
+ #site.port = start_port + ctr
+ site.port = service_ports[ctr] #**Ketan: assuming this is the worker that needs to connect back to its corresponding service
+
+ sites.push site
+
+ # **Ketan: can put the coasters_osg.xml template right here.
+ coaster_sites = %q[
+ <config>
+ <% ctr = 0
+ sites.each_key do |name|
+ jm = sites[name].jm
+ url = sites[name].url
+ app_dir = sites[name].app_dir
+ data_dir = sites[name].data_dir
+ throttle = sites[name].throttle
+ %>
+ <pool handle="<%=name%>">
+ <execution provider="coaster-persistent" url="http://localhost:<%= coaster_service + ctr %>" jobmanager="local:local" />
+ <profile namespace="globus" key="workerManager">passive</profile>
+ <profile namespace="karajan" key="initialScore">10000</profile>
+ <profile namespace="karajan" key="jobThrottle"><%=throttle%></profile>
+ <profile namespace="globus" key="jobsPerNode">16</profile>
+ <gridftp url="gsiftp://<%=url%>"/>
+ <workdirectory><%=data_dir%>/swift_scratch</workdirectory>
+ </pool>
+ <% ctr += 1
+ end
+ %>
+ </config>
+ ]
+
+ # local per-site attributes:
+ cores = (value.throttle * 100 + 2).to_i
+ siteFraction = cores.to_f / totalCores.to_f
+ siteTargetRunning = [ (swiftDemand.to_f * siteFraction), minSiteJobs ].max
+ siteTargetQueued = [ (swiftDemand.to_f * siteFraction), minSiteJobs ].max
+
+ printf "site: %5d cores %2d%% %s\n", cores, siteFraction * 100, name
+ targetQueued = 3
+
+ site.gen_submit
+
+ threads << Thread.new(name) do |job|
+ trip=0
+ while true do
+ if ( (swiftDemand) > totalRunning ) then
+ # demands > running: enforce N-queued algorithm
+ queued = site.queued
+ running = site.running
+ printf "trip %d site %s running %d queued %d\n", trip, name,running,queued
+ if (running+queued) == 0 then
+ newJobs = [ (paddedDemand * siteFraction).to_i, minSiteJobs ].max
+ printf "trip %d site %s empty - submitting %d (%d%% of demand %d)\n",
+ trip, name, newJobs, siteFraction * 100, paddedDemand
+ site.submit_job(newJobs)
+ elsif queued == 0 then
+ toRun = [ running * 1.2, [(paddedDemand * siteFraction).to_i, minSiteJobs ].max ].max
+ printf "trip %d site %s queued %d target %d has drained queue - submitting %d\n",
+ trip, name, queued, targetQueued, toRun
+ site.submit_job(toRun)
+ elsif queued < targetQueued
+ printf "trip %d site %s queued %d below target %d - submitting %d\n",
+ trip, name, queued, targetQueued, targetQueued-queued
+ site.submit_job(targetQueued - queued)
+ end
+ trip += 1
+ # puts "#{name}: #{total}"
+ end
+ sleep 60
+ end
+ end
+ ctr += 1
+ end
+end
+threads.each { |job| job.join }
+puts "All threads completed."
+
+# TODO:
+#
+# tag jobs for each run uniquely, and track them as a unique factory instance
+#
+
+=begin
+
+"Keep N Queued" Algorithm
+
+Goal:
+- monitor a running swift script to track its changing demand for cores
+- increase the # of running workers to meet the demand
+- let workers that are idle time out when supply is greater than demand
+
+Initially:
+- set a constant demand
+- determine #cores at each site
+
+initialPressure = 1.2 # increase demand
+initialDemand = 50 # initial demand prior to first poll of Swift, to prime the worker pool ahead of Swift demand
+
+- set a constant target queued for each site based on ncores
+- set a target #running
+
+THREAD 0:
+ demand = initialDemand
+ for each site
+ site.need = (site.cores/totalcores) * demand
+ sleep delay
+
+ while swiftScriptIsRunning
+ get demand
+ get #running
+
+
+THREAD for each site
+ while swiftScriptIsRunning
+ get site.running
+ get site.queued
+ need = demand - running
+ if need > 0
+ if running+queued == 0
+
+keep queued on each site:
+ max( expectation, 50% of observation )
+
+ totalc=1000
+ sitec = 200 20% d=100 ex=20 q=20
+ r=50 q=25
+=end
+
+=begin
+def dump(file, template, binding)
+ file_out = File.open(file, "w")
+ file_out.puts ERB.new(template, 0, "%<>").result(binding)
+ file_out.close
+end
+
+dump("coaster_osg.xml", coaster_sites, binding)
+
+=end
+
Property changes on: trunk/bin/grid/my-swift-workers
___________________________________________________________________
Added: svn:executable
+ *
More information about the Swift-commit
mailing list