[Swift-commit] r5321 - trunk/bin/grid

ketan at ci.uchicago.edu ketan at ci.uchicago.edu
Tue Nov 29 10:31:59 CST 2011


Author: ketan
Date: 2011-11-29 10:31:58 -0600 (Tue, 29 Nov 2011)
New Revision: 5321

Modified:
   trunk/bin/grid/gwms-swift-workers
Log:
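Adjust parameters in the gwms worker: switch the Condor universe from grid to vanilla, comment out globus_rsl and grid_resource, remove stale commented-out code, and re-indent to four spaces.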

Modified: trunk/bin/grid/gwms-swift-workers
===================================================================
--- trunk/bin/grid/gwms-swift-workers	2011-11-29 16:23:35 UTC (rev 5320)
+++ trunk/bin/grid/gwms-swift-workers	2011-11-29 16:31:58 UTC (rev 5321)
@@ -2,92 +2,75 @@
 
 $stdout.sync = true
 
-# FIXME: Is this the best way to get lib functions from bin/grid into RUBYLIB ?
-
 $:[$:.length] = File.dirname($0)
 
 require 'mk_catalog'
 require 'etc'
 
 class Site
-  attr_accessor :grid_resource, :gridftp, :data_dir, :app_dir, :name, :port
-  attr_reader :submit_file
+    attr_accessor :grid_resource, :gridftp, :data_dir, :app_dir, :name, :port
+    attr_reader :submit_file
 
-#      executable = <%= @app_dir %>/worker.pl  # FIXME (below)
+    def gen_submit(count = 1)
 
-#       transfer_executable = True
+        ov=$VERBOSE
+        $VERBOSE=nil
+        workerExecutable = `which worker.pl`
+        workerWrapper = `which run-worker.sh`
+        $VERBOSE=ov
+        workerContact = ARGV[2]
 
-#      executable = /home/wilde/swift/src/0.92/cog/modules/swift/dist/swift-svn/bin/worker.pl
-#      arguments = http://128.135.125.17:<%= port %> <%= name %> /tmp 14400
+        job = %Q[
+          universe = vanilla
+          stream_output = False
+          stream_error = False
+          transfer_executable = true
+          periodic_remove = JobStatus == 5
+          notification = NEVER
 
-# WORKER_LOGGING_LEVEL=$LOGLEVEL $HOME/swift_gridtools/worker.pl $SERVICEURL swork${worker} $LOGDIR >& /dev/null &
+          #globus_rsl = (maxwalltime=240)
+          #grid_resource = <%= @grid_resource %>
+          executable = #{workerWrapper}
+          arguments = #{workerContact} <%= @name.gsub(/__.*/,"") %> /tmp
+          environment = WORKER_LOGGING_LEVEL=DEBUG
+          Input   = #{workerExecutable}
+          Error   = condor/$(Process).err
+          Output  = condor/$(Process).out
+          log     = condor.log
 
-  def gen_submit(count = 1)
-
-    ov=$VERBOSE
-    $VERBOSE=nil
-    workerExecutable = `which worker.pl`
-    workerWrapper = `which run-worker.sh`
-    $VERBOSE=ov
-#   workerContact = "http://communicado.ci.uchicago.edu:36906"
-    workerContact = ARGV[2]
-
-# for submit file:      log = condor.log
-#       <% count.times { %>queue
-#       <% } %>
-#      log     = condor/$(Process).log
-
-    job = %Q[
-      universe = grid
-      stream_output = False
-      stream_error = False
-      transfer_executable = true
-      periodic_remove = JobStatus == 5
-      notification = NEVER
-
-      globus_rsl = (maxwalltime=240)
-      grid_resource = <%= @grid_resource %>
-      executable = #{workerWrapper}
-      arguments = #{workerContact} <%= @name.gsub(/__.*/,"") %> /tmp
-      environment = WORKER_LOGGING_LEVEL=DEBUG
-      Input   = #{workerExecutable}
-      Error   = condor/$(Process).err
-      Output  = condor/$(Process).out
-      log     = condor.log
-
-      queue #{count}
+          queue #{count}
     ]
-    ERB.new(job.gsub(/^\s+/, ""), 0, "%<>", "@submit_file").result(binding)
-  end
 
-  def submit_job(count)
-    puts "submit_job: Submitting #{@name} #{count} jobs"
-    count = count.to_i
-    output = ""
-    submitfile = gen_submit(count)
-    IO.popen("condor_submit", "w+") do |submit|
-      submit.puts submitfile
-      submit.close_write
-      output = submit.read
+        ERB.new(job.gsub(/^\s+/, ""), 0, "%<>", "@submit_file").result(binding)
     end
-    output
-  end
+    def submit_job(count)
+        puts "submit_job: Submitting #{@name} #{count} jobs"
+        count = count.to_i
+        output = ""
+        submitfile = gen_submit(count)
+        IO.popen("condor_submit", "w+") do |submit|
+            submit.puts submitfile
+            submit.close_write
+            output = submit.read
+        end
+        output
+    end
 
-  def queued
-    ov=$VERBOSE
-    $VERBOSE=nil
-    jobs = `condor_q  #{$username} -const 'GridResource == \"#{@grid_resource}\" && JobStatus == 1' -format \"%s \" GlobalJobId`
-    $VERBOSE=ov
-    jobs.split(" ").size
-  end
+    def queued
+        ov=$VERBOSE
+        $VERBOSE=nil
+        jobs = `condor_q  #{$username} -const 'GridResource == \"#{@grid_resource}\" && JobStatus == 1' -format \"%s \" GlobalJobId`
+        $VERBOSE=ov
+        jobs.split(" ").size
+    end
 
-  def running
-    ov=$VERBOSE
-    $VERBOSE=nil
-    jobs = `condor_q #{$username} -const 'GridResource == \"#{@grid_resource}\" && JobStatus == 2' -format \"%s \" GlobalJobId`
-    $VERBOSE=ov
-    jobs.split(" ").size
-  end
+    def running
+        ov=$VERBOSE
+        $VERBOSE=nil
+        jobs = `condor_q #{$username} -const 'GridResource == \"#{@grid_resource}\" && JobStatus == 2' -format \"%s \" GlobalJobId`
+        $VERBOSE=ov
+        jobs.split(" ").size
+    end
 
 end
 
@@ -105,99 +88,99 @@
 =end
 
 if __FILE__ == $0
-  raise "No greenlist file" if !ARGV[0]
+    raise "No greenlist file" if !ARGV[0]
 
-  start_port = 61100 # FIXME
-  ctr        = 0
-  threads    = []
-  ARGV[1]    = "scec" if !ARGV[1]
-  greenlist  = IO.readlines(ARGV[0]).map { |line| line.chomp! }
-  $username = Etc.getlogin
+    start_port = 61100 # FIXME
+    ctr        = 0
+    threads    = []
+    ARGV[1]    = "scec" if !ARGV[1]
+    greenlist  = IO.readlines(ARGV[0]).map { |line| line.chomp! }
+    $username = Etc.getlogin
 
-  puts "Username = #{$username}"
+    puts "Username = #{$username}"
 
-  minSiteJobs = 2
-  paddedDemand = 0
-  swiftDemand = 0
-  totalCores = 0
-  totalRunning = 0
+    minSiteJobs = 2
+    paddedDemand = 0
+    swiftDemand = 0
+    totalCores = 0
+    totalRunning = 0
 
-  ress_parse(ARGV[1]) do |name, value|
-    next if not greenlist.index(name) and not greenlist.empty?
-    totalCores += (value.throttle * 100 + 2).to_i
-  end
-  puts "totalCores for green sites = #{totalCores}"
+    ress_parse(ARGV[1]) do |name, value|
+        next if not greenlist.index(name) and not greenlist.empty?
+        totalCores += (value.throttle * 100 + 2).to_i
+    end
+    puts "totalCores for green sites = #{totalCores}"
 
-  demandThread = Thread.new("monitor-demand") do |t|
-    puts "starting demand thread"
-    while true do
-      puts "in demand thread"
-      swiftDemand = IO.read("swiftDemand").to_i  # Replace this with sensor of Swift demand
-      # swiftDemand = 15
-      paddedDemand = (swiftDemand * 1.2).to_i
-      ov=$VERBOSE;$VERBOSE=nil
-      totalRunning = `condor_q #{$username} -const 'JobStatus == 2' -format \"%s \" GlobalJobId`.split(" ").size
-      $VERBOSE=ov
-      puts "*** demandThread: swiftDemand=#{swiftDemand} paddedDemand=#{paddedDemand} totalRunning=#{totalRunning}"
-      sleep 60
+    demandThread = Thread.new("monitor-demand") do |t|
+        puts "starting demand thread"
+        while true do
+            puts "in demand thread"
+            swiftDemand = IO.read("swiftDemand").to_i  # Replace this with sensor of Swift demand
+            # swiftDemand = 15
+            paddedDemand = (swiftDemand * 1.2).to_i
+            ov=$VERBOSE;$VERBOSE=nil
+            totalRunning = `condor_q #{$username} -const 'JobStatus == 2' -format \"%s \" GlobalJobId`.split(" ").size
+            $VERBOSE=ov
+            puts "*** demandThread: swiftDemand=#{swiftDemand} paddedDemand=#{paddedDemand} totalRunning=#{totalRunning}"
+            sleep 60
+        end
     end
-  end
 
-  ress_parse(ARGV[1]) do |name, value|
-    next if not greenlist.index(name) and not greenlist.empty?
-    site               = Site.new
-    site.name          = name
-    site.grid_resource = "gt2 #{value.url}/jobmanager-#{value.jm}"
-    #site.grid_resource = "fork"
-    site.gridftp       = "gsiftp://#{value.url}"
-    site.app_dir       = value.app_dir
-    site.data_dir      = value.data_dir
-    site.port          = start_port + ctr
+    ress_parse(ARGV[1]) do |name, value|
+        next if not greenlist.index(name) and not greenlist.empty?
+        site               = Site.new
+        site.name          = name
+        site.grid_resource = "gt2 #{value.url}/jobmanager-#{value.jm}"
+        #site.grid_resource = "fork"
+        site.gridftp       = "gsiftp://#{value.url}"
+        site.app_dir       = value.app_dir
+        site.data_dir      = value.data_dir
+        site.port          = start_port + ctr
 
-    # local per-site attributes:
+        # local per-site attributes:
 
-    cores = (value.throttle * 100 + 2).to_i
-    siteFraction = cores.to_f / totalCores.to_f
-    siteTargetRunning = [ (swiftDemand.to_f * siteFraction), minSiteJobs ].max
-    siteTargetQueued = [ (swiftDemand.to_f * siteFraction), minSiteJobs ].max
+        cores = (value.throttle * 100 + 2).to_i
+        siteFraction = cores.to_f / totalCores.to_f
+        siteTargetRunning = [ (swiftDemand.to_f * siteFraction), minSiteJobs ].max
+        siteTargetQueued = [ (swiftDemand.to_f * siteFraction), minSiteJobs ].max
 
-    printf "site: %5d cores %2d%% %s\n", cores, siteFraction * 100, name
-    targetQueued = 3
+        printf "site: %5d cores %2d%% %s\n", cores, siteFraction * 100, name
+        targetQueued = 3
 
-    site.gen_submit
+        site.gen_submit
 
-    threads << Thread.new(name) do |job|
-      trip=0
-      while true do
-        if ( (swiftDemand) > totalRunning ) then
-          # demands > running: enforce N-queued algorithm
-          queued = site.queued
-          running = site.running
-          printf "trip %d site %s running %d queued %d\n", trip, name,running,queued
-          if (running+queued) == 0 then
-            newJobs = [ (paddedDemand * siteFraction).to_i, minSiteJobs ].max
-            printf "trip %d site %s empty - submitting %d (%d%% of demand %d)\n",
-              trip, name, newJobs, siteFraction * 100, paddedDemand
-            site.submit_job(newJobs)
-          elsif queued == 0 then
-            toRun = [ running * 1.2, [(paddedDemand * siteFraction).to_i, minSiteJobs ].max ].max
-            printf "trip %d site %s queued %d target %d has drained queue - submitting %d\n",
-              trip, name, queued, targetQueued, toRun
-            site.submit_job(toRun)
-          elsif queued < targetQueued
-            printf "trip %d site %s queued %d below target %d - submitting %d\n",
-              trip, name, queued, targetQueued, targetQueued-queued
-            site.submit_job(targetQueued - queued)
-          end
-          trip += 1
-          # puts "#{name}: #{total}"
+        threads << Thread.new(name) do |job|
+            trip=0
+            while true do
+                if ( (swiftDemand) > totalRunning ) then
+                    # demands > running: enforce N-queued algorithm
+                    queued = site.queued
+                    running = site.running
+                    printf "trip %d site %s running %d queued %d\n", trip, name,running,queued
+                    if (running+queued) == 0 then
+                        newJobs = [ (paddedDemand * siteFraction).to_i, minSiteJobs ].max
+                        printf "trip %d site %s empty - submitting %d (%d%% of demand %d)\n",
+                            trip, name, newJobs, siteFraction * 100, paddedDemand
+                        site.submit_job(newJobs)
+                    elsif queued == 0 then
+                        toRun = [ running * 1.2, [(paddedDemand * siteFraction).to_i, minSiteJobs ].max ].max
+                        printf "trip %d site %s queued %d target %d has drained queue - submitting %d\n",
+                            trip, name, queued, targetQueued, toRun
+                        site.submit_job(toRun)
+                    elsif queued < targetQueued
+                        printf "trip %d site %s queued %d below target %d - submitting %d\n",
+                            trip, name, queued, targetQueued, targetQueued-queued
+                        site.submit_job(targetQueued - queued)
+                    end
+                    trip += 1
+                    # puts "#{name}: #{total}"
+                end
+                sleep 60
+            end
         end
-      sleep 60
-      end
-    end
 
-    ctr += 1
-  end
+        ctr += 1
+    end
 end
 threads.each { |job| job.join }
 puts "All threads completed."
@@ -238,7 +221,7 @@
     get #running
 
 
-  
+
 THREAD for each site
   while swiftScriptIsRunning
     get site.running




More information about the Swift-commit mailing list