[Swift-commit] r5321 - trunk/bin/grid
ketan at ci.uchicago.edu
ketan at ci.uchicago.edu
Tue Nov 29 10:31:59 CST 2011
Author: ketan
Date: 2011-11-29 10:31:58 -0600 (Tue, 29 Nov 2011)
New Revision: 5321
Modified:
trunk/bin/grid/gwms-swift-workers
Log:
adjusting parameters in gwms worker
Modified: trunk/bin/grid/gwms-swift-workers
===================================================================
--- trunk/bin/grid/gwms-swift-workers 2011-11-29 16:23:35 UTC (rev 5320)
+++ trunk/bin/grid/gwms-swift-workers 2011-11-29 16:31:58 UTC (rev 5321)
@@ -2,92 +2,75 @@
$stdout.sync = true
-# FIXME: Is this the best way to get lib functions from bin/grid into RUBYLIB ?
-
$:[$:.length] = File.dirname($0)
require 'mk_catalog'
require 'etc'
class Site
- attr_accessor :grid_resource, :gridftp, :data_dir, :app_dir, :name, :port
- attr_reader :submit_file
+ attr_accessor :grid_resource, :gridftp, :data_dir, :app_dir, :name, :port
+ attr_reader :submit_file
-# executable = <%= @app_dir %>/worker.pl # FIXME (below)
+ def gen_submit(count = 1)
-# transfer_executable = True
+ ov=$VERBOSE
+ $VERBOSE=nil
+ workerExecutable = `which worker.pl`
+ workerWrapper = `which run-worker.sh`
+ $VERBOSE=ov
+ workerContact = ARGV[2]
-# executable = /home/wilde/swift/src/0.92/cog/modules/swift/dist/swift-svn/bin/worker.pl
-# arguments = http://128.135.125.17:<%= port %> <%= name %> /tmp 14400
+ job = %Q[
+ universe = vanilla
+ stream_output = False
+ stream_error = False
+ transfer_executable = true
+ periodic_remove = JobStatus == 5
+ notification = NEVER
-# WORKER_LOGGING_LEVEL=$LOGLEVEL $HOME/swift_gridtools/worker.pl $SERVICEURL swork${worker} $LOGDIR >& /dev/null &
+ #globus_rsl = (maxwalltime=240)
+ #grid_resource = <%= @grid_resource %>
+ executable = #{workerWrapper}
+ arguments = #{workerContact} <%= @name.gsub(/__.*/,"") %> /tmp
+ environment = WORKER_LOGGING_LEVEL=DEBUG
+ Input = #{workerExecutable}
+ Error = condor/$(Process).err
+ Output = condor/$(Process).out
+ log = condor.log
- def gen_submit(count = 1)
-
- ov=$VERBOSE
- $VERBOSE=nil
- workerExecutable = `which worker.pl`
- workerWrapper = `which run-worker.sh`
- $VERBOSE=ov
-# workerContact = "http://communicado.ci.uchicago.edu:36906"
- workerContact = ARGV[2]
-
-# for submit file: log = condor.log
-# <% count.times { %>queue
-# <% } %>
-# log = condor/$(Process).log
-
- job = %Q[
- universe = grid
- stream_output = False
- stream_error = False
- transfer_executable = true
- periodic_remove = JobStatus == 5
- notification = NEVER
-
- globus_rsl = (maxwalltime=240)
- grid_resource = <%= @grid_resource %>
- executable = #{workerWrapper}
- arguments = #{workerContact} <%= @name.gsub(/__.*/,"") %> /tmp
- environment = WORKER_LOGGING_LEVEL=DEBUG
- Input = #{workerExecutable}
- Error = condor/$(Process).err
- Output = condor/$(Process).out
- log = condor.log
-
- queue #{count}
+ queue #{count}
]
- ERB.new(job.gsub(/^\s+/, ""), 0, "%<>", "@submit_file").result(binding)
- end
- def submit_job(count)
- puts "submit_job: Submitting #{@name} #{count} jobs"
- count = count.to_i
- output = ""
- submitfile = gen_submit(count)
- IO.popen("condor_submit", "w+") do |submit|
- submit.puts submitfile
- submit.close_write
- output = submit.read
+ ERB.new(job.gsub(/^\s+/, ""), 0, "%<>", "@submit_file").result(binding)
end
- output
- end
+ def submit_job(count)
+ puts "submit_job: Submitting #{@name} #{count} jobs"
+ count = count.to_i
+ output = ""
+ submitfile = gen_submit(count)
+ IO.popen("condor_submit", "w+") do |submit|
+ submit.puts submitfile
+ submit.close_write
+ output = submit.read
+ end
+ output
+ end
- def queued
- ov=$VERBOSE
- $VERBOSE=nil
- jobs = `condor_q #{$username} -const 'GridResource == \"#{@grid_resource}\" && JobStatus == 1' -format \"%s \" GlobalJobId`
- $VERBOSE=ov
- jobs.split(" ").size
- end
+ def queued
+ ov=$VERBOSE
+ $VERBOSE=nil
+ jobs = `condor_q #{$username} -const 'GridResource == \"#{@grid_resource}\" && JobStatus == 1' -format \"%s \" GlobalJobId`
+ $VERBOSE=ov
+ jobs.split(" ").size
+ end
- def running
- ov=$VERBOSE
- $VERBOSE=nil
- jobs = `condor_q #{$username} -const 'GridResource == \"#{@grid_resource}\" && JobStatus == 2' -format \"%s \" GlobalJobId`
- $VERBOSE=ov
- jobs.split(" ").size
- end
+ def running
+ ov=$VERBOSE
+ $VERBOSE=nil
+ jobs = `condor_q #{$username} -const 'GridResource == \"#{@grid_resource}\" && JobStatus == 2' -format \"%s \" GlobalJobId`
+ $VERBOSE=ov
+ jobs.split(" ").size
+ end
end
@@ -105,99 +88,99 @@
=end
if __FILE__ == $0
- raise "No greenlist file" if !ARGV[0]
+ raise "No greenlist file" if !ARGV[0]
- start_port = 61100 # FIXME
- ctr = 0
- threads = []
- ARGV[1] = "scec" if !ARGV[1]
- greenlist = IO.readlines(ARGV[0]).map { |line| line.chomp! }
- $username = Etc.getlogin
+ start_port = 61100 # FIXME
+ ctr = 0
+ threads = []
+ ARGV[1] = "scec" if !ARGV[1]
+ greenlist = IO.readlines(ARGV[0]).map { |line| line.chomp! }
+ $username = Etc.getlogin
- puts "Username = #{$username}"
+ puts "Username = #{$username}"
- minSiteJobs = 2
- paddedDemand = 0
- swiftDemand = 0
- totalCores = 0
- totalRunning = 0
+ minSiteJobs = 2
+ paddedDemand = 0
+ swiftDemand = 0
+ totalCores = 0
+ totalRunning = 0
- ress_parse(ARGV[1]) do |name, value|
- next if not greenlist.index(name) and not greenlist.empty?
- totalCores += (value.throttle * 100 + 2).to_i
- end
- puts "totalCores for green sites = #{totalCores}"
+ ress_parse(ARGV[1]) do |name, value|
+ next if not greenlist.index(name) and not greenlist.empty?
+ totalCores += (value.throttle * 100 + 2).to_i
+ end
+ puts "totalCores for green sites = #{totalCores}"
- demandThread = Thread.new("monitor-demand") do |t|
- puts "starting demand thread"
- while true do
- puts "in demand thread"
- swiftDemand = IO.read("swiftDemand").to_i # Replace this with sensor of Swift demand
- # swiftDemand = 15
- paddedDemand = (swiftDemand * 1.2).to_i
- ov=$VERBOSE;$VERBOSE=nil
- totalRunning = `condor_q #{$username} -const 'JobStatus == 2' -format \"%s \" GlobalJobId`.split(" ").size
- $VERBOSE=ov
- puts "*** demandThread: swiftDemand=#{swiftDemand} paddedDemand=#{paddedDemand} totalRunning=#{totalRunning}"
- sleep 60
+ demandThread = Thread.new("monitor-demand") do |t|
+ puts "starting demand thread"
+ while true do
+ puts "in demand thread"
+ swiftDemand = IO.read("swiftDemand").to_i # Replace this with sensor of Swift demand
+ # swiftDemand = 15
+ paddedDemand = (swiftDemand * 1.2).to_i
+ ov=$VERBOSE;$VERBOSE=nil
+ totalRunning = `condor_q #{$username} -const 'JobStatus == 2' -format \"%s \" GlobalJobId`.split(" ").size
+ $VERBOSE=ov
+ puts "*** demandThread: swiftDemand=#{swiftDemand} paddedDemand=#{paddedDemand} totalRunning=#{totalRunning}"
+ sleep 60
+ end
end
- end
- ress_parse(ARGV[1]) do |name, value|
- next if not greenlist.index(name) and not greenlist.empty?
- site = Site.new
- site.name = name
- site.grid_resource = "gt2 #{value.url}/jobmanager-#{value.jm}"
- #site.grid_resource = "fork"
- site.gridftp = "gsiftp://#{value.url}"
- site.app_dir = value.app_dir
- site.data_dir = value.data_dir
- site.port = start_port + ctr
+ ress_parse(ARGV[1]) do |name, value|
+ next if not greenlist.index(name) and not greenlist.empty?
+ site = Site.new
+ site.name = name
+ site.grid_resource = "gt2 #{value.url}/jobmanager-#{value.jm}"
+ #site.grid_resource = "fork"
+ site.gridftp = "gsiftp://#{value.url}"
+ site.app_dir = value.app_dir
+ site.data_dir = value.data_dir
+ site.port = start_port + ctr
- # local per-site attributes:
+ # local per-site attributes:
- cores = (value.throttle * 100 + 2).to_i
- siteFraction = cores.to_f / totalCores.to_f
- siteTargetRunning = [ (swiftDemand.to_f * siteFraction), minSiteJobs ].max
- siteTargetQueued = [ (swiftDemand.to_f * siteFraction), minSiteJobs ].max
+ cores = (value.throttle * 100 + 2).to_i
+ siteFraction = cores.to_f / totalCores.to_f
+ siteTargetRunning = [ (swiftDemand.to_f * siteFraction), minSiteJobs ].max
+ siteTargetQueued = [ (swiftDemand.to_f * siteFraction), minSiteJobs ].max
- printf "site: %5d cores %2d%% %s\n", cores, siteFraction * 100, name
- targetQueued = 3
+ printf "site: %5d cores %2d%% %s\n", cores, siteFraction * 100, name
+ targetQueued = 3
- site.gen_submit
+ site.gen_submit
- threads << Thread.new(name) do |job|
- trip=0
- while true do
- if ( (swiftDemand) > totalRunning ) then
- # demands > running: enforce N-queued algorithm
- queued = site.queued
- running = site.running
- printf "trip %d site %s running %d queued %d\n", trip, name,running,queued
- if (running+queued) == 0 then
- newJobs = [ (paddedDemand * siteFraction).to_i, minSiteJobs ].max
- printf "trip %d site %s empty - submitting %d (%d%% of demand %d)\n",
- trip, name, newJobs, siteFraction * 100, paddedDemand
- site.submit_job(newJobs)
- elsif queued == 0 then
- toRun = [ running * 1.2, [(paddedDemand * siteFraction).to_i, minSiteJobs ].max ].max
- printf "trip %d site %s queued %d target %d has drained queue - submitting %d\n",
- trip, name, queued, targetQueued, toRun
- site.submit_job(toRun)
- elsif queued < targetQueued
- printf "trip %d site %s queued %d below target %d - submitting %d\n",
- trip, name, queued, targetQueued, targetQueued-queued
- site.submit_job(targetQueued - queued)
- end
- trip += 1
- # puts "#{name}: #{total}"
+ threads << Thread.new(name) do |job|
+ trip=0
+ while true do
+ if ( (swiftDemand) > totalRunning ) then
+ # demands > running: enforce N-queued algorithm
+ queued = site.queued
+ running = site.running
+ printf "trip %d site %s running %d queued %d\n", trip, name,running,queued
+ if (running+queued) == 0 then
+ newJobs = [ (paddedDemand * siteFraction).to_i, minSiteJobs ].max
+ printf "trip %d site %s empty - submitting %d (%d%% of demand %d)\n",
+ trip, name, newJobs, siteFraction * 100, paddedDemand
+ site.submit_job(newJobs)
+ elsif queued == 0 then
+ toRun = [ running * 1.2, [(paddedDemand * siteFraction).to_i, minSiteJobs ].max ].max
+ printf "trip %d site %s queued %d target %d has drained queue - submitting %d\n",
+ trip, name, queued, targetQueued, toRun
+ site.submit_job(toRun)
+ elsif queued < targetQueued
+ printf "trip %d site %s queued %d below target %d - submitting %d\n",
+ trip, name, queued, targetQueued, targetQueued-queued
+ site.submit_job(targetQueued - queued)
+ end
+ trip += 1
+ # puts "#{name}: #{total}"
+ end
+ sleep 60
+ end
end
- sleep 60
- end
- end
- ctr += 1
- end
+ ctr += 1
+ end
end
threads.each { |job| job.join }
puts "All threads completed."
@@ -238,7 +221,7 @@
get #running
-
+
THREAD for each site
while swiftScriptIsRunning
get site.running
More information about the Swift-commit mailing list