[Swift-commit] r5320 - trunk/bin/grid

ketan at ci.uchicago.edu ketan at ci.uchicago.edu
Tue Nov 29 10:23:35 CST 2011


Author: ketan
Date: 2011-11-29 10:23:35 -0600 (Tue, 29 Nov 2011)
New Revision: 5320

Added:
   trunk/bin/grid/gwms-swift-workers
Log:
added gwms swift workers script

Added: trunk/bin/grid/gwms-swift-workers
===================================================================
--- trunk/bin/grid/gwms-swift-workers	                        (rev 0)
+++ trunk/bin/grid/gwms-swift-workers	2011-11-29 16:23:35 UTC (rev 5320)
@@ -0,0 +1,256 @@
+#! /usr/bin/env ruby
+
+$stdout.sync = true
+
+# FIXME: Is this the best way to get lib functions from bin/grid into RUBYLIB ?
+
+$:[$:.length] = File.dirname($0)
+
+require 'mk_catalog'
+require 'etc'
+
+class Site
+  attr_accessor :grid_resource, :gridftp, :data_dir, :app_dir, :name, :port
+  attr_reader :submit_file
+  # Site holds per-site settings assigned by the caller and wraps the condor_submit / condor_q calls for that site.
+#      executable = <%= @app_dir %>/worker.pl  # FIXME (below)
+
+#       transfer_executable = True
+
+#      executable = /home/wilde/swift/src/0.92/cog/modules/swift/dist/swift-svn/bin/worker.pl
+#      arguments = http://128.135.125.17:<%= port %> <%= name %> /tmp 14400
+
+# WORKER_LOGGING_LEVEL=$LOGLEVEL $HOME/swift_gridtools/worker.pl $SERVICEURL swork${worker} $LOGDIR >& /dev/null &
+  # Renders the Condor submit description for `count` jobs via ERB; the text is returned and also left in @submit_file.
+  def gen_submit(count = 1)
+    # Warnings are silenced ($VERBOSE=nil) while `which` looks up worker.pl and run-worker.sh on PATH.
+    ov=$VERBOSE
+    $VERBOSE=nil
+    workerExecutable = `which worker.pl`
+    workerWrapper = `which run-worker.sh`
+    $VERBOSE=ov
+#   workerContact = "http://communicado.ci.uchicago.edu:36906"
+    workerContact = ARGV[2]  # third command-line argument; nil when it was not supplied
+
+# for submit file:      log = condor.log
+#       <% count.times { %>queue
+#       <% } %>
+#      log     = condor/$(Process).log
+    # Template: Ruby #{...} values are interpolated right here; the <%= %> tags are expanded afterwards by ERB.
+    job = %Q[
+      universe = grid
+      stream_output = False
+      stream_error = False
+      transfer_executable = true
+      periodic_remove = JobStatus == 5
+      notification = NEVER
+
+      globus_rsl = (maxwalltime=240)
+      grid_resource = <%= @grid_resource %>
+      executable = #{workerWrapper}
+      arguments = #{workerContact} <%= @name.gsub(/__.*/,"") %> /tmp
+      environment = WORKER_LOGGING_LEVEL=DEBUG
+      Input   = #{workerExecutable}
+      Error   = condor/$(Process).err
+      Output  = condor/$(Process).out
+      log     = condor.log
+
+      queue #{count}
+    ]
+    ERB.new(job.gsub(/^\s+/, ""), 0, "%<>", "@submit_file").result(binding)  # gsub drops leading whitespace and blank lines
+  end
+  # Pipes a freshly generated submit description for `count` jobs into condor_submit and returns what it printed.
+  def submit_job(count)
+    puts "submit_job: Submitting #{@name} #{count} jobs"
+    count = count.to_i
+    output = ""
+    submitfile = gen_submit(count)
+    IO.popen("condor_submit", "w+") do |submit|
+      submit.puts submitfile
+      submit.close_write
+      output = submit.read
+    end
+    output
+  end
+  # Counts $username's jobs on this grid_resource that condor_q reports with JobStatus == 1 (Idle in the table below).
+  def queued
+    ov=$VERBOSE
+    $VERBOSE=nil
+    jobs = `condor_q  #{$username} -const 'GridResource == \"#{@grid_resource}\" && JobStatus == 1' -format \"%s \" GlobalJobId`
+    $VERBOSE=ov
+    jobs.split(" ").size
+  end
+  # Counts $username's jobs on this grid_resource that condor_q reports with JobStatus == 2 (Running in the table below).
+  def running
+    ov=$VERBOSE
+    $VERBOSE=nil
+    jobs = `condor_q #{$username} -const 'GridResource == \"#{@grid_resource}\" && JobStatus == 2' -format \"%s \" GlobalJobId`
+    $VERBOSE=ov
+    jobs.split(" ").size
+  end
+
+end
+
+=begin
+# For reference:
+JobStatus in job ClassAds
+
+0	Unexpanded	U
+1	Idle	I
+2	Running	R
+3	Removed	X
+4	Completed	C
+5	Held	H
+6	Submission_err	E
+=end
+
+if __FILE__ == $0
+  raise "No greenlist file" if !ARGV[0]
+
+  start_port = 61100 # FIXME
+  ctr        = 0
+  threads    = []
+  ARGV[1]    = "scec" if !ARGV[1]
+  greenlist  = IO.readlines(ARGV[0]).map { |line| line.chomp! }
+  $username = Etc.getlogin
+
+  puts "Username = #{$username}"
+
+  minSiteJobs = 2
+  paddedDemand = 0
+  swiftDemand = 0
+  totalCores = 0
+  totalRunning = 0
+
+  ress_parse(ARGV[1]) do |name, value|
+    next if not greenlist.index(name) and not greenlist.empty?
+    totalCores += (value.throttle * 100 + 2).to_i
+  end
+  puts "totalCores for green sites = #{totalCores}"
+
+  demandThread = Thread.new("monitor-demand") do |t|
+    puts "starting demand thread"
+    while true do
+      puts "in demand thread"
+      swiftDemand = IO.read("swiftDemand").to_i  # Replace this with sensor of Swift demand
+      # swiftDemand = 15
+      paddedDemand = (swiftDemand * 1.2).to_i
+      ov=$VERBOSE;$VERBOSE=nil
+      totalRunning = `condor_q #{$username} -const 'JobStatus == 2' -format \"%s \" GlobalJobId`.split(" ").size
+      $VERBOSE=ov
+      puts "*** demandThread: swiftDemand=#{swiftDemand} paddedDemand=#{paddedDemand} totalRunning=#{totalRunning}"
+      sleep 60
+    end
+  end
+
+  ress_parse(ARGV[1]) do |name, value|
+    next if not greenlist.index(name) and not greenlist.empty?
+    site               = Site.new
+    site.name          = name
+    site.grid_resource = "gt2 #{value.url}/jobmanager-#{value.jm}"
+    #site.grid_resource = "fork"
+    site.gridftp       = "gsiftp://#{value.url}"
+    site.app_dir       = value.app_dir
+    site.data_dir      = value.data_dir
+    site.port          = start_port + ctr
+
+    # local per-site attributes:
+
+    cores = (value.throttle * 100 + 2).to_i
+    siteFraction = cores.to_f / totalCores.to_f
+    siteTargetRunning = [ (swiftDemand.to_f * siteFraction), minSiteJobs ].max
+    siteTargetQueued = [ (swiftDemand.to_f * siteFraction), minSiteJobs ].max
+
+    printf "site: %5d cores %2d%% %s\n", cores, siteFraction * 100, name
+    targetQueued = 3
+
+    site.gen_submit
+
+    threads << Thread.new(name) do |job|
+      trip=0
+      while true do
+        if ( (swiftDemand) > totalRunning ) then
+          # demands > running: enforce N-queued algorithm
+          queued = site.queued
+          running = site.running
+          printf "trip %d site %s running %d queued %d\n", trip, name,running,queued
+          if (running+queued) == 0 then
+            newJobs = [ (paddedDemand * siteFraction).to_i, minSiteJobs ].max
+            printf "trip %d site %s empty - submitting %d (%d%% of demand %d)\n",
+              trip, name, newJobs, siteFraction * 100, paddedDemand
+            site.submit_job(newJobs)
+          elsif queued == 0 then
+            toRun = [ running * 1.2, [(paddedDemand * siteFraction).to_i, minSiteJobs ].max ].max
+            printf "trip %d site %s queued %d target %d has drained queue - submitting %d\n",
+              trip, name, queued, targetQueued, toRun
+            site.submit_job(toRun)
+          elsif queued < targetQueued
+            printf "trip %d site %s queued %d below target %d - submitting %d\n",
+              trip, name, queued, targetQueued, targetQueued-queued
+            site.submit_job(targetQueued - queued)
+          end
+          trip += 1
+          # puts "#{name}: #{total}"
+        end
+      sleep 60
+      end
+    end
+
+    ctr += 1
+  end
+end
+threads.each { |job| job.join }
+puts "All threads completed."
+
+# TODO:
+#
+# tag jobs for each run uniquely, and track them as a unique factory instance
+#
+
+=begin
+
+"Keep N Queued" Algorithm
+
+Goal:
+- monitor a running swift script to track its changing demand for cores
+- increase the # of running workers to meet the demand
+- let workers that are idle time out when supply is greater than demand
+
+Initially:
+- set a constant demand
+- determine #cores at each site
+
+initialPressure = 1.2  # increase demand 
+initialDemand = 50     # initial demand prior to first poll of Swift, to prime the worker pool ahead of Swift demand
+
+- set a constant target queued for each site based on ncores
+- set a target #running 
+
+THREAD 0:
+  demand = initialDemand
+  for each site
+    site.need = (site.cores/totalcores) * demand
+  sleep delay
+
+
+  while swiftScriptIsRunning
+    get demand
+    get #running
+
+
+  
+THREAD for each site
+  while swiftScriptIsRunning
+    get site.running
+    get site.queued
+    need = demand - running
+    if need > 0
+      if running+queued = 0
+
+keep queued on each site:
+ max( expectation, 50% of observation )
+
+     totalc=1000
+     sitec = 200 20% d=100 ex=20 q=20
+     r=50 q=25
+=end


Property changes on: trunk/bin/grid/gwms-swift-workers
___________________________________________________________________
Added: svn:executable
   + *




More information about the Swift-commit mailing list