[Swift-commit] r5189 - trunk/bin/grid

ketan at ci.uchicago.edu ketan at ci.uchicago.edu
Wed Sep 28 15:25:15 CDT 2011


Author: ketan
Date: 2011-09-28 15:25:15 -0500 (Wed, 28 Sep 2011)
New Revision: 5189

Added:
   trunk/bin/grid/my-swift-workers
Log:
temp copy of an improved swift-workers, work in progress

Added: trunk/bin/grid/my-swift-workers
===================================================================
--- trunk/bin/grid/my-swift-workers	                        (rev 0)
+++ trunk/bin/grid/my-swift-workers	2011-09-28 20:25:15 UTC (rev 5189)
@@ -0,0 +1,332 @@
+#! /usr/bin/env ruby
+
+require 'erb'
+require 'mk_catalog'
+require 'etc'
+
+=begin
+- Start the n coaster services: n is the number of sites in the whitelist
+- Build coasters.sites.xml: In order to do this get all the greensites in an array of objects and iterate through them to generate the coasters.xml
+- Start the workers where each worker connects to a service
+=end
+
+# **Ketan ARGV[0]=greensites, ARGV[1]=extenci (why?: it's just an appname), ARGV[2]=workercontact in the form of http://Hosturl:port
+$stdout.sync = true  # write stdout through immediately instead of buffering it
+
+# FIXME: Is this the best way to get lib functions from bin/grid into RUBYLIB ?
+$:[$:.length] = File.dirname($0)  # append this script's own directory to the load path
+# NOTE(review): this append runs after the `require 'mk_catalog'` above, so it cannot help that require -- confirm the intended order
+
+class Site  # One grid site: its contact attributes plus condor_submit / condor_q helpers for its worker jobs
+  attr_accessor :grid_resource, :gridftp, :data_dir, :app_dir, :name, :port
+  attr_reader :submit_file
+
+  # **Ketan: Generate the worker job: returns the Condor submit description, as a string, queuing `count` workers
+  def gen_submit(count = 1)
+    # Locate the worker script and its wrapper with `which`; $VERBOSE is cleared around the backticks and restored after.
+    ov=$VERBOSE
+    $VERBOSE=nil
+    workerExecutable = `which worker.pl`
+    workerWrapper = `which run-worker.sh`
+    $VERBOSE=ov
+
+    #   workerContact = "http://communicado.ci.uchicago.edu:36906"
+    #   **Ketan: This will change: read only the URL here and take the port from the service.wports file
+    workerContact = ARGV[2]
+
+    job = %Q[
+      universe = grid
+      stream_output = False
+      stream_error = False
+      transfer_executable = true
+      periodic_remove = JobStatus == 5
+      notification = Never
+
+      globus_rsl    = (maxwalltime=240)
+      grid_resource = <%= @grid_resource %>
+      executable    = #{workerWrapper}
+      arguments     = #{workerContact} <%= @name.gsub(/__.*/,"") %> /tmp
+      environment   = WORKER_LOGGING_LEVEL=INFO
+      Input         = #{workerExecutable}
+      Error         = condor/$(Process).err
+      Output        = condor/$(Process).out
+      log           = condor.log
+
+      queue #{count}
+    ]
+
+    # **Ketan: This only renders the text (leading whitespace stripped, ERB tags expanded against this object); the submission itself happens in submit_job.
+    ERB.new(job.gsub(/^\s+/, ""), 0, "%<>", "@submit_file").result(binding)
+  
+  end
+  
+  # **Ketan: Pipe a submit description queuing `count` jobs to condor_submit and return what condor_submit prints
+  def submit_job(count)
+    puts "submit_job: Submitting #{@name} #{count} jobs"
+    count = count.to_i
+    output = ""
+    submitfile = gen_submit(count)
+    IO.popen("condor_submit", "w+") do |submit|
+      submit.puts submitfile
+      submit.close_write  # close the write side before reading condor_submit's reply
+      output = submit.read
+    end
+    output
+  end
+  # Number of job ids condor_q prints for $username on this site's GridResource with JobStatus == 1 (Idle in the reference table below)
+  def queued
+    ov=$VERBOSE
+    $VERBOSE=nil
+    jobs = `condor_q  #{$username} -const 'GridResource == \"#{@grid_resource}\" && JobStatus == 1' -format \"%s \" GlobalJobId`
+    $VERBOSE=ov
+    jobs.split(" ").size
+  end
+  # Same count as queued, but for JobStatus == 2 (Running in the reference table below)
+  def running
+    ov=$VERBOSE
+    $VERBOSE=nil
+    jobs = `condor_q #{$username} -const 'GridResource == \"#{@grid_resource}\" && JobStatus == 2' -format \"%s \" GlobalJobId`
+    $VERBOSE=ov
+    jobs.split(" ").size
+  end
+
+end
+
+=begin
+# For reference:
+JobStatus in job ClassAds
+
+0	Unexpanded	U
+1	Idle	I
+2	Running	R
+3	Removed	X
+4	Completed	C
+5	Held	H
+6	Submission_err	E
+=end
+
+# **Ketan: Program execution starts from here
+if __FILE__ == $0
+  raise "No greenlist file" if !ARGV[0]
+
+  # **Ketan: Read the lines in greensites file and start that many services
+  numlines = %x{wc -l #{ARGV[0]}}.split.first.to_i
+
+  # **Ketan: Remove any worker or service ports file present from the previous runs
+  system("rm -f service.sports service.wports")
+
+  system("start-swift-service #{numlines} &")
+
+  service_ports=[]
+  worker_ports=[]
+
+  sleep 40
+
+  sports_file=File.open("service.sports")
+  sports_file.each_line { |line|
+       service_ports.push line
+  }
+
+  sports_file.close
+  wports_file=File.open("service.wports")
+  wports_file.each_line { |line|
+      worker_ports.push line
+  }
+
+  wports_file.close
+ 
+  puts "service ports"
+  puts service_ports
+  puts "============="
+  puts "worker ports"
+  puts worker_ports
+
+  # **Ketan: This will change, should be the first line of service.
+  
+  start_port = 61100 # FIXME
+  #start_port = 
+  ctr        = 0
+  threads    = []
+  ARGV[1]    = "scec" if !ARGV[1]
+  greenlist  = IO.readlines(ARGV[0]).map { |line| line.chomp! }
+  $username = Etc.getlogin
+
+  puts "Username = #{$username}"
+
+  minSiteJobs = 2
+  paddedDemand = 0
+  swiftDemand = 0
+  totalCores = 0
+  totalRunning = 0
+
+  ress_parse(ARGV[1]) do |name, value|
+    next if not greenlist.index(name) and not greenlist.empty?
+    totalCores += (value.throttle * 100 + 2).to_i
+  end
+  
+  puts "totalCores for green sites = #{totalCores}"
+  
+  demandThread = Thread.new("monitor-demand") do |t|
+    puts "starting demand thread"
+    while true do
+      puts "in demand thread"
+      swiftDemand = IO.read("swiftDemand").to_i  # Replace this with sensor of Swift demand
+      # swiftDemand = 15
+      paddedDemand = (swiftDemand * 1.2).to_i
+      ov=$VERBOSE;$VERBOSE=nil
+      totalRunning = `condor_q #{$username} -const 'JobStatus == 2' -format \"%s \" GlobalJobId`.split(" ").size
+      $VERBOSE=ov
+      puts "*** demandThread: swiftDemand=#{swiftDemand} paddedDemand=#{paddedDemand} totalRunning=#{totalRunning}"
+      sleep 60
+    end
+  end
+
+  sites=Array.new
+      
+  ress_parse(ARGV[1]) do |name, value|
+    next if not greenlist.index(name) and not greenlist.empty?
+    site               = Site.new
+    site.name          = name
+    site.grid_resource = "gt2 #{value.url}/jobmanager-#{value.jm}"
+    site.gridftp       = "gsiftp://#{value.url}"
+    site.app_dir       = value.app_dir
+    site.data_dir      = value.data_dir
+    #site.port          = start_port + ctr
+    site.port          = service_ports[ctr] #**Ketan: assuming this is the worker that needs to connect back to its corresponding service
+    
+    sites.push site
+
+    # **Ketan: can put the coasters_osg.xml template right here.
+   coaster_sites = %q[
+      <config>
+      <% ctr = 0
+        sites.each_key do |name|
+        jm       = sites[name].jm
+        url      = sites[name].url
+        app_dir  = sites[name].app_dir
+        data_dir = sites[name].data_dir
+        throttle = sites[name].throttle
+      %>
+      <pool handle="<%=name%>">
+       <execution provider="coaster-persistent" url="http://localhost:<%= coaster_service + ctr %>" jobmanager="local:local" />
+       <profile namespace="globus" key="workerManager">passive</profile>
+       <profile namespace="karajan" key="initialScore">10000</profile>
+       <profile namespace="karajan" key="jobThrottle"><%=throttle%></profile>
+       <profile namespace="globus" key="jobsPerNode">16</profile>
+       <gridftp  url="gsiftp://<%=url%>"/>
+       <workdirectory><%=data_dir%>/swift_scratch</workdirectory>
+      </pool>
+      <% ctr += 1
+         end 
+      %>
+     </config>
+    ] 
+
+    # local per-site attributes:
+    cores = (value.throttle * 100 + 2).to_i
+    siteFraction = cores.to_f / totalCores.to_f
+    siteTargetRunning = [ (swiftDemand.to_f * siteFraction), minSiteJobs ].max
+    siteTargetQueued = [ (swiftDemand.to_f * siteFraction), minSiteJobs ].max
+
+    printf "site: %5d cores %2d%% %s\n", cores, siteFraction * 100, name
+    targetQueued = 3
+
+    site.gen_submit
+
+    threads << Thread.new(name) do |job|
+      trip=0
+      while true do
+        if ( (swiftDemand) > totalRunning ) then
+          # demands > running: enforce N-queued algorithm
+          queued = site.queued
+          running = site.running
+          printf "trip %d site %s running %d queued %d\n", trip, name,running,queued
+          if (running+queued) == 0 then
+            newJobs = [ (paddedDemand * siteFraction).to_i, minSiteJobs ].max
+            printf "trip %d site %s empty - submitting %d (%d%% of demand %d)\n",
+              trip, name, newJobs, siteFraction * 100, paddedDemand
+            site.submit_job(newJobs)
+          elsif queued == 0 then
+            toRun = [ running * 1.2, [(paddedDemand * siteFraction).to_i, minSiteJobs ].max ].max
+            printf "trip %d site %s queued %d target %d has drained queue - submitting %d\n",
+              trip, name, queued, targetQueued, toRun
+            site.submit_job(toRun)
+          elsif queued < targetQueued
+            printf "trip %d site %s queued %d below target %d - submitting %d\n",
+              trip, name, queued, targetQueued, targetQueued-queued
+            site.submit_job(targetQueued - queued)
+          end
+          trip += 1
+          # puts "#{name}: #{total}"
+        end
+      sleep 60
+      end
+    end
+    ctr += 1
+  end
+end
+threads.each { |job| job.join }
+puts "All threads completed."
+
+# TODO:
+#
+# tag jobs for each run uniquely, and track them as a unique factory instance
+#
+
+=begin
+
+"Keep N Queued" Algorithm
+
+Goal:
+- monitor a running swift script to track its changing demand for cores
+- increase the # of running workers to meet the demand
+- let workers that are idle time out when supply is greater than demand
+
+Initially:
+- set a constant demand
+- determine #cores at each site
+
+initialPressure = 1.2  # increase demand 
+initialDemand = 50     # initial demand prior to first poll of Swift, to prime the worker pool ahead of Swift demand
+
+- set a constant target queued for each site based on ncores
+- set a target #running 
+
+THREAD 0:
+  demand = initialDemand
+  for each site
+    site.need = (site.cores/totalcores) * demand
+  sleep delay
+
+  while swiftScriptIsRunning
+    get demand
+    get #running
+
+  
+THREAD for each site
+  while swiftScriptIsRunning
+    get site.running
+    get site.queued  
+    need = demand - running
+    if need > 0
+      if running+queued = 0
+
+keep queued on each site:
+ max( expectation, 50% of observation )
+
+     toalc=1000 
+     sitec = 200 20% d=100 ex=20 q=20
+     r=50 q=25
+=end
+
+=begin
+def dump(file, template, binding)
+  file_out = File.open(file, "w")
+  file_out.puts ERB.new(template, 0, "%<>").result(binding)
+  file_out.close
+end
+
+dump("coaster_osg.xml", coaster_sites, binding)
+
+=end
+


Property changes on: trunk/bin/grid/my-swift-workers
___________________________________________________________________
Added: svn:executable
   + *




More information about the Swift-commit mailing list