[Swift-commit] r5338 - trunk/bin/grid
ketan at ci.uchicago.edu
ketan at ci.uchicago.edu
Sat Dec 3 19:42:07 CST 2011
Author: ketan
Date: 2011-12-03 19:42:06 -0600 (Sat, 03 Dec 2011)
New Revision: 5338
Added:
trunk/bin/grid/run-gwms-workers
Removed:
trunk/bin/grid/gwms-swift-workers
Log:
Added run-gwms-workers, a bash script that submits workers via condor_submit; removed the Ruby gwms-swift-workers script.
Deleted: trunk/bin/grid/gwms-swift-workers
===================================================================
--- trunk/bin/grid/gwms-swift-workers 2011-12-02 20:56:36 UTC (rev 5337)
+++ trunk/bin/grid/gwms-swift-workers 2011-12-04 01:42:06 UTC (rev 5338)
@@ -1,241 +0,0 @@
-#! /usr/bin/env ruby
-
-$stdout.sync = true
-
-$:[$:.length] = File.dirname($0)
-
-require 'mk_catalog'
-require 'etc'
-
-class Site
- attr_accessor :grid_resource, :gridftp, :data_dir, :app_dir, :name, :port
- attr_reader :submit_file
- # One grid site: gen_submit renders worker submit text, submit_job feeds it to condor_submit and returns its output, queued/running count jobs via condor_q.
- def gen_submit(count = 1)
- # Returns the Condor submit description for `count` worker jobs on this site.
- ov=$VERBOSE
- $VERBOSE=nil
- workerExecutable = `which worker.pl`
- workerWrapper = `which run-worker.sh`
- $VERBOSE=ov
- workerContact = ARGV[2]
- # NOTE(review): ARGV[2] may be nil; the trailing newline `which` leaves is removed by the gsub(/^\s+/) below.
- job = %Q[
- universe = vanilla
- stream_output = False
- stream_error = False
-
- transfer_executable = true
- should_transfer_files = YES
- WhenToTransferOutput = ON_EXIT
-
- #periodic_remove = JobStatus == 5
- notification = NEVER
-
- executable = #{workerWrapper}
- arguments = #{workerContact} <%= @name.gsub(/__.*/,"") %> /tmp
- environment = WORKER_LOGGING_LEVEL=DEBUG
- Input = #{workerExecutable}
- Error = condor/$(Process).err
- Output = condor/$(Process).out
- log = condor.log
-
- queue #{count}
- ]
- # Strip leading indentation/blank lines, then render @name (text from the first "__" dropped); eoutvar "@submit_file" keeps the result in @submit_file.
- ERB.new(job.gsub(/^\s+/, ""), 0, "%<>", "@submit_file").result(binding)
- end
- def submit_job(count)
- puts "submit_job: Submitting #{@name} #{count} jobs"
- count = count.to_i
- output = ""
- submitfile = gen_submit(count)
- IO.popen("condor_submit", "w+") do |submit|
- submit.puts submitfile
- submit.close_write
- output = submit.read
- end
- output
- end
- # Count $username's idle (JobStatus == 1) jobs on this site's GridResource ($username is set by the main script).
- def queued
- ov=$VERBOSE
- $VERBOSE=nil
- jobs = `condor_q #{$username} -const 'GridResource == \"#{@grid_resource}\" && JobStatus == 1' -format \"%s \" GlobalJobId`
- $VERBOSE=ov
- jobs.split(" ").size
- end
- # Count $username's running (JobStatus == 2) jobs on this site's GridResource.
- def running
- ov=$VERBOSE
- $VERBOSE=nil
- jobs = `condor_q #{$username} -const 'GridResource == \"#{@grid_resource}\" && JobStatus == 2' -format \"%s \" GlobalJobId`
- $VERBOSE=ov
- jobs.split(" ").size
- end
-
-end
-
-=begin
-# For reference:
-JobStatus in job ClassAds
-
-0 Unexpanded U
-1 Idle I
-2 Running R
-3 Removed X
-4 Completed C
-5 Held H
-6 Submission_err E
-=end
-
-if __FILE__ == $0
- raise "No greenlist file" if !ARGV[0]
- # ARGV[0]: greenlist file (empty => all sites); ARGV[1]: passed to ress_parse (default "scec"); ARGV[2]: worker contact read by Site#gen_submit.
- start_port = 61100 # FIXME
- ctr = 0
- threads = []
- ARGV[1] = "scec" if !ARGV[1]
- greenlist = IO.readlines(ARGV[0]).map { |line| line.chomp! }
- $username = Etc.getlogin
- # NOTE(review): line.chomp! above yields nil for a final line with no newline, so that site never matches; map(&:chomp) avoids this.
- puts "Username = #{$username}"
- # Shared counters; the demand thread below keeps swiftDemand, paddedDemand and totalRunning updated.
- minSiteJobs = 2
- paddedDemand = 0
- swiftDemand = 0
- totalCores = 0
- totalRunning = 0
- # Pass 1: total cores across green sites (throttle * 100 + 2 per site), used to weight each site's share.
- ress_parse(ARGV[1]) do |name, value|
- next if not greenlist.index(name) and not greenlist.empty?
- totalCores += (value.throttle * 100 + 2).to_i
- end
- puts "totalCores for green sites = #{totalCores}"
- # Every 60s: read demand from ./swiftDemand (IO.read raises if it is missing, ending this thread), pad it by 20%, count all running jobs.
- demandThread = Thread.new("monitor-demand") do |t|
- puts "starting demand thread"
- while true do
- puts "in demand thread"
- swiftDemand = IO.read("swiftDemand").to_i # Replace this with sensor of Swift demand
- # swiftDemand = 15
- paddedDemand = (swiftDemand * 1.2).to_i
- ov=$VERBOSE;$VERBOSE=nil
- totalRunning = `condor_q #{$username} -const 'JobStatus == 2' -format \"%s \" GlobalJobId`.split(" ").size
- $VERBOSE=ov
- puts "*** demandThread: swiftDemand=#{swiftDemand} paddedDemand=#{paddedDemand} totalRunning=#{totalRunning}"
- sleep 60
- end
- end
- # Pass 2: build a Site for each green site and start a submit-loop thread for it.
- ress_parse(ARGV[1]) do |name, value|
- next if not greenlist.index(name) and not greenlist.empty?
- site = Site.new
- site.name = name
- site.grid_resource = "gt2 #{value.url}/jobmanager-#{value.jm}"
- #site.grid_resource = "fork"
- site.gridftp = "gsiftp://#{value.url}"
- site.app_dir = value.app_dir
- site.data_dir = value.data_dir
- site.port = start_port + ctr
- # NOTE(review): siteTargetRunning/siteTargetQueued below are computed but never read.
- # local per-site attributes:
-
- cores = (value.throttle * 100 + 2).to_i
- siteFraction = cores.to_f / totalCores.to_f
- siteTargetRunning = [ (swiftDemand.to_f * siteFraction), minSiteJobs ].max
- siteTargetQueued = [ (swiftDemand.to_f * siteFraction), minSiteJobs ].max
- # Each site tries to keep targetQueued idle jobs while Swift demand exceeds running jobs.
- printf "site: %5d cores %2d%% %s\n", cores, siteFraction * 100, name
- targetQueued = 3
- # The result is discarded; this call only primes site.submit_file.
- site.gen_submit
- # Every 60s while swiftDemand > totalRunning: empty site -> submit padded share (>= minSiteJobs); no idle jobs -> max(1.2 * running, share); idle < targetQueued -> top up.
- threads << Thread.new(name) do |job|
- trip=0
- while true do
- if ( (swiftDemand) > totalRunning ) then
- # demands > running: enforce N-queued algorithm
- queued = site.queued
- running = site.running
- printf "trip %d site %s running %d queued %d\n", trip, name,running,queued
- if (running+queued) == 0 then
- newJobs = [ (paddedDemand * siteFraction).to_i, minSiteJobs ].max
- printf "trip %d site %s empty - submitting %d (%d%% of demand %d)\n",
- trip, name, newJobs, siteFraction * 100, paddedDemand
- site.submit_job(newJobs)
- elsif queued == 0 then
- toRun = [ running * 1.2, [(paddedDemand * siteFraction).to_i, minSiteJobs ].max ].max
- printf "trip %d site %s queued %d target %d has drained queue - submitting %d\n",
- trip, name, queued, targetQueued, toRun
- site.submit_job(toRun)
- elsif queued < targetQueued
- printf "trip %d site %s queued %d below target %d - submitting %d\n",
- trip, name, queued, targetQueued, targetQueued-queued
- site.submit_job(targetQueued - queued)
- end
- trip += 1
- # puts "#{name}: #{total}"
- end
- sleep 60
- end
- end
- # NOTE(review): these threads never exit, so the final join never returns; that join also runs outside the __FILE__ guard.
- ctr += 1
- end
-end
-threads.each { |job| job.join }
-puts "All threads completed."
-
-# TODO:
-#
-# tag jobs for each run uniquely, and track them as a unique factory instance
-#
-
-=begin
-
-"Keep N Queued" Algorithm
-
-Goal:
-- monitor a running swift script to track its changing demand for cores
-- increase the # of running workers to meet the demand
-- let workers that are idle time out when supply is greater than demand
-
-Initially:
-- set a constant demand
-- determine #cores at each site
-
-initialPressure = 1.2 # increase demand
-initialDemand = 50 # initial demand prior to first poll of Swift, to prime the worker pool ahead of Swift demand
-
-- set a constant target queued for each site based on ncores
-- set a target #running
-
-THREAD 0:
- demand = initialDemand
- for each site
- site.need = (site.cores/totalcores) * demand
- sleep delay
-
-
- while swiftScriptIsRunning
- get demand
- get #running
-
-
-
-THREAD for each site
- while swiftScriptIsRunning
- get site.running
- get site.queued
- need = demand - running
- if need > 0
- if running+queued = 0
-
-keep queued on each site:
- max( expectation, 50% of observation )
-
- totalc=1000
- sitec = 200 20% d=100 ex=20 q=20
- r=50 q=25
-=end
Added: trunk/bin/grid/run-gwms-workers
===================================================================
--- trunk/bin/grid/run-gwms-workers (rev 0)
+++ trunk/bin/grid/run-gwms-workers 2011-12-04 01:42:06 UTC (rev 5338)
@@ -0,0 +1,36 @@
+#!/bin/bash
+
+workerExecutable=`which worker.pl`
+workerWrapper=`which run-worker.sh`
+workerContact=$1
+
+cat > myjob.condor <<EOF
+universe = vanilla
+
+transfer_output = true
+transfer_error = true
+
+stream_output = False
+stream_error = False
+
+transfer_executable = true
+should_transfer_files = YES
+WhenToTransferOutput = ON_EXIT
+
+periodic_remove = JobStatus == 5
+PeriodicRelease = (NumGlobusSubmits < 5) && ((CurrentTime - EnteredCurrentStatus) > (60*60))
+notification = NEVER
+
+executable = $workerWrapper
+arguments = $workerContact scec /tmp
+environment = WORKER_LOGGING_LEVEL=DEBUG
+Input = $workerExecutable
+Error = condor/job.err
+Output = condor/job.out
+log = condor.log
+
+queue 50
+EOF
+
+condor_submit myjob.condor
+
More information about the Swift-commit
mailing list