[Swift-commit] r5320 - trunk/bin/grid
ketan at ci.uchicago.edu
ketan at ci.uchicago.edu
Tue Nov 29 10:23:35 CST 2011
Author: ketan
Date: 2011-11-29 10:23:35 -0600 (Tue, 29 Nov 2011)
New Revision: 5320
Added:
trunk/bin/grid/gwms-swift-workers
Log:
added gwms swift workers script
Added: trunk/bin/grid/gwms-swift-workers
===================================================================
--- trunk/bin/grid/gwms-swift-workers (rev 0)
+++ trunk/bin/grid/gwms-swift-workers 2011-11-29 16:23:35 UTC (rev 5320)
@@ -0,0 +1,256 @@
+#! /usr/bin/env ruby
+
+$stdout.sync = true
+
+# FIXME: Is this the best way to get lib functions from bin/grid into RUBYLIB ?
+
+$:[$:.length] = File.dirname($0)
+
+require 'mk_catalog'
+require 'etc'
+
+class Site
+ attr_accessor :grid_resource, :gridftp, :data_dir, :app_dir, :name, :port
+ attr_reader :submit_file
+
+# executable = <%= @app_dir %>/worker.pl # FIXME (below)
+
+# transfer_executable = True
+
+# executable = /home/wilde/swift/src/0.92/cog/modules/swift/dist/swift-svn/bin/worker.pl
+# arguments = http://128.135.125.17:<%= port %> <%= name %> /tmp 14400
+
+# WORKER_LOGGING_LEVEL=$LOGLEVEL $HOME/swift_gridtools/worker.pl $SERVICEURL swork${worker} $LOGDIR >& /dev/null &
+
+ def gen_submit(count = 1)
+
+ ov=$VERBOSE
+ $VERBOSE=nil
+ workerExecutable = `which worker.pl`
+ workerWrapper = `which run-worker.sh`
+ $VERBOSE=ov
+# workerContact = "http://communicado.ci.uchicago.edu:36906"
+ workerContact = ARGV[2]
+
+# for submit file: log = condor.log
+# <% count.times { %>queue
+# <% } %>
+# log = condor/$(Process).log
+
+ job = %Q[
+ universe = grid
+ stream_output = False
+ stream_error = False
+ transfer_executable = true
+ periodic_remove = JobStatus == 5
+ notification = NEVER
+
+ globus_rsl = (maxwalltime=240)
+ grid_resource = <%= @grid_resource %>
+ executable = #{workerWrapper}
+ arguments = #{workerContact} <%= @name.gsub(/__.*/,"") %> /tmp
+ environment = WORKER_LOGGING_LEVEL=DEBUG
+ Input = #{workerExecutable}
+ Error = condor/$(Process).err
+ Output = condor/$(Process).out
+ log = condor.log
+
+ queue #{count}
+ ]
+ ERB.new(job.gsub(/^\s+/, ""), 0, "%<>", "@submit_file").result(binding)
+ end
+
+ def submit_job(count)
+ puts "submit_job: Submitting #{@name} #{count} jobs"
+ count = count.to_i
+ output = ""
+ submitfile = gen_submit(count)
+ IO.popen("condor_submit", "w+") do |submit|
+ submit.puts submitfile
+ submit.close_write
+ output = submit.read
+ end
+ output
+ end
+
+ def queued
+ ov=$VERBOSE
+ $VERBOSE=nil
+ jobs = `condor_q #{$username} -const 'GridResource == \"#{@grid_resource}\" && JobStatus == 1' -format \"%s \" GlobalJobId`
+ $VERBOSE=ov
+ jobs.split(" ").size
+ end
+
+ def running
+ ov=$VERBOSE
+ $VERBOSE=nil
+ jobs = `condor_q #{$username} -const 'GridResource == \"#{@grid_resource}\" && JobStatus == 2' -format \"%s \" GlobalJobId`
+ $VERBOSE=ov
+ jobs.split(" ").size
+ end
+
+end
+
+=begin
+# For reference:
+JobStatus in job ClassAds
+
+0 Unexpanded U
+1 Idle I
+2 Running R
+3 Removed X
+4 Completed C
+5 Held H
+6 Submission_err E
+=end
+
+if __FILE__ == $0
+ raise "No greenlist file" if !ARGV[0]
+
+ start_port = 61100 # FIXME
+ ctr = 0
+ threads = []
+ ARGV[1] = "scec" if !ARGV[1]
+ greenlist = IO.readlines(ARGV[0]).map { |line| line.chomp! }
+ $username = Etc.getlogin
+
+ puts "Username = #{$username}"
+
+ minSiteJobs = 2
+ paddedDemand = 0
+ swiftDemand = 0
+ totalCores = 0
+ totalRunning = 0
+
+ ress_parse(ARGV[1]) do |name, value|
+ next if not greenlist.index(name) and not greenlist.empty?
+ totalCores += (value.throttle * 100 + 2).to_i
+ end
+ puts "totalCores for green sites = #{totalCores}"
+
+ demandThread = Thread.new("monitor-demand") do |t|
+ puts "starting demand thread"
+ while true do
+ puts "in demand thread"
+ swiftDemand = IO.read("swiftDemand").to_i # Replace this with sensor of Swift demand
+ # swiftDemand = 15
+ paddedDemand = (swiftDemand * 1.2).to_i
+ ov=$VERBOSE;$VERBOSE=nil
+ totalRunning = `condor_q #{$username} -const 'JobStatus == 2' -format \"%s \" GlobalJobId`.split(" ").size
+ $VERBOSE=ov
+ puts "*** demandThread: swiftDemand=#{swiftDemand} paddedDemand=#{paddedDemand} totalRunning=#{totalRunning}"
+ sleep 60
+ end
+ end
+
+ ress_parse(ARGV[1]) do |name, value|
+ next if not greenlist.index(name) and not greenlist.empty?
+ site = Site.new
+ site.name = name
+ site.grid_resource = "gt2 #{value.url}/jobmanager-#{value.jm}"
+ #site.grid_resource = "fork"
+ site.gridftp = "gsiftp://#{value.url}"
+ site.app_dir = value.app_dir
+ site.data_dir = value.data_dir
+ site.port = start_port + ctr
+
+ # local per-site attributes:
+
+ cores = (value.throttle * 100 + 2).to_i
+ siteFraction = cores.to_f / totalCores.to_f
+ siteTargetRunning = [ (swiftDemand.to_f * siteFraction), minSiteJobs ].max
+ siteTargetQueued = [ (swiftDemand.to_f * siteFraction), minSiteJobs ].max
+
+ printf "site: %5d cores %2d%% %s\n", cores, siteFraction * 100, name
+ targetQueued = 3
+
+ site.gen_submit
+
+ threads << Thread.new(name) do |job|
+ trip=0
+ while true do
+ if ( (swiftDemand) > totalRunning ) then
+ # demands > running: enforce N-queued algorithm
+ queued = site.queued
+ running = site.running
+ printf "trip %d site %s running %d queued %d\n", trip, name,running,queued
+ if (running+queued) == 0 then
+ newJobs = [ (paddedDemand * siteFraction).to_i, minSiteJobs ].max
+ printf "trip %d site %s empty - submitting %d (%d%% of demand %d)\n",
+ trip, name, newJobs, siteFraction * 100, paddedDemand
+ site.submit_job(newJobs)
+ elsif queued == 0 then
+ toRun = [ running * 1.2, [(paddedDemand * siteFraction).to_i, minSiteJobs ].max ].max
+ printf "trip %d site %s queued %d target %d has drained queue - submitting %d\n",
+ trip, name, queued, targetQueued, toRun
+ site.submit_job(toRun)
+ elsif queued < targetQueued
+ printf "trip %d site %s queued %d below target %d - submitting %d\n",
+ trip, name, queued, targetQueued, targetQueued-queued
+ site.submit_job(targetQueued - queued)
+ end
+ trip += 1
+ # puts "#{name}: #{total}"
+ end
+ sleep 60
+ end
+ end
+
+ ctr += 1
+ end
+end
+threads.each { |job| job.join }
+puts "All threads completed."
+
+# TODO:
+#
+# tag jobs for each run uniquely, and track them as a unique factory instance
+#
+
+=begin
+
+"Keep N Queued" Algorithm
+
+Goal:
+- monitor a running swift script to track its changing demand for cores
+- increase the # of running workers to meet the demand
+- let workers that are idle time out when supply is greater than demand
+
+Initially:
+- set a constant demand
+- determine #cores at each site
+
+initialPressure = 1.2 # increase demand
+initialDemand = 50 # initial demand prior to first poll of Swift, to prime the worker pool ahead of Swift demand
+
+- set a constant target queued for each site based on ncores
+- set a target #running
+
+THREAD 0:
+ demand = initialDemand
+ for each site
+ site.need = (site.cores/totalcores) * demand
+ sleep delay
+
+
+ while swiftScriptIsRunning
+ get demand
+ get #running
+
+
+
+THREAD for each site
+ while swiftScriptIsRunning
+ get site.running
+ get site.queued
+ need = demand - running
+ if need > 0
+ if running+queued = 0
+
+keep queued on each site:
+ max( expectation, 50% of observation )
+
+ totalc=1000
+ sitec = 200 20% d=100 ex=20 q=20
+ r=50 q=25
+=end
Property changes on: trunk/bin/grid/gwms-swift-workers
___________________________________________________________________
Added: svn:executable
+ *
More information about the Swift-commit
mailing list