[Swift-commit] r3621 - SwiftApps/SwiftR/Swift/exec

noreply at svn.ci.uchicago.edu noreply at svn.ci.uchicago.edu
Wed Sep 8 16:39:47 CDT 2010


Author: wilde
Date: 2010-09-08 16:39:47 -0500 (Wed, 08 Sep 2010)
New Revision: 3621

Added:
   SwiftApps/SwiftR/Swift/exec/start-swift-workers
   SwiftApps/SwiftR/Swift/exec/worker.pl
Log:
Interim version of modified worker for R and script to start a persistent passive worker ensemble.

Added: SwiftApps/SwiftR/Swift/exec/start-swift-workers
===================================================================
--- SwiftApps/SwiftR/Swift/exec/start-swift-workers	                        (rev 0)
+++ SwiftApps/SwiftR/Swift/exec/start-swift-workers	2010-09-08 21:39:47 UTC (rev 3621)
@@ -0,0 +1,140 @@
+#! /bin/bash
+
+COMPUTEHOSTS=$1
+#  COMPUTEHOSTS='crush thwomp stomp crank steamroller grind churn trounce thrash vanquish'
+#  COMPUTEHOSTS='communicado'
+
+shift
+
+# Find our bin dir (to use for running utility scripts)
+
+SWIFTRBIN=$(cd $(dirname $0); pwd)
+echo SWIFTRBIN=$SWIFTRBIN
+workerscript=$SWIFTRBIN/worker.pl
+
+function wait-and-start-workers
+{
+  # Look for:
+  # Passive queue processor initialized. Callback URI is http://140.221.8.62:55379
+
+  for try in $(seq 1 20); do
+    uriline=$(grep "Passive queue processor initialized. Callback URI is" $out 2> /dev/null)
+    if [ "_$uriline" = _ ]; then
+      sleep 1
+    else
+      break;
+    fi
+  done
+  CONTACT=$(echo $uriline | sed -e 's/^.*http:/http:/')
+  echo Coaster contact: $CONTACT
+
+  LOGDIR=$(pwd)/swiftworkerlogs # full path. FIXME: generate this with remote-side paths if the worker hosts do not share this filesystem?
+  mkdir -p $LOGDIR
+
+  IDLETIMEOUT=$((60*60)) # 1 hour
+
+  # CA="X509_CERT_DIR=$CERTDIR X509_CADIR=$CERTDIR"
+  for host in $(echo $COMPUTEHOSTS); do
+    timestamp=$(date "+%Y.%m%d.%H%M%S")
+    random=$(awk "BEGIN {printf \"%0.5d\", $RANDOM}")
+    ID=$timestamp.$random
+#     ssh $host WORKER_LOGGING_ENABLED=false $workerscript $CONTACT $ID $LOGDIR $IDLETIMEOUT &
+#      ssh $host '/bin/sh -c "'"WORKER_LOGGING_ENABLED=true $workerscript $CONTACT $ID $LOGDIR $IDLETIMEOUT"' & echo \$!"' >remotepid.$host </dev/null &
+
+       ssh $host '/bin/sh -c '\'"WORKER_LOGGING_ENABLED=true $workerscript $CONTACT $ID $LOGDIR $IDLETIMEOUT 2>&1 & echo PID=\$!"\'  >remotepid.$host </dev/null &
+
+    sshpids="$sshpids $!"
+  done
+
+  echo Started workers from these ssh processes: $sshpids
+  echo $sshpids > $sshpidfile
+  # ps $sshpids
+}
+
+if ! which swift >& /dev/null; then
+  echo $0: \'swift\' command not found in PATH
+  exit 1
+fi
+
+
+mkdir -p service
+servicedir=service
+cd service
+out=`mktemp swift.stdouterr.XXXX`
+
+coaster-service -nosec >& coaster-service.log &
+coasterservicepid=$!
+
+cat >sites.xml <<END
+<config>
+  <pool handle="localhost">
+    <execution provider="coaster-persistent" url="http://bridled.ci.uchicago.edu" jobmanager="local:local"/>
+    <profile namespace="globus" key="workerManager">passive</profile>
+    <profile namespace="globus" key="workersPerNode">4</profile>
+    <profile key="jobThrottle" namespace="karajan">.03</profile>
+    <profile namespace="karajan" key="initialScore">10000</profile>
+    <filesystem provider="local" url="none" />
+    <workdirectory>/home/wilde/swiftwork</workdirectory>
+  </pool>
+</config>
+END
+
+cat >tc <<END
+localhost sh /bin/sh null null null
+END
+
+cat >cf <<END
+
+wrapperlog.always.transfer=true
+sitedir.keep=true
+execution.retries=0
+lazy.errors=false
+status.mode=provider
+use.provider.staging=false
+
+END
+
+cat >passivate.swift <<END
+
+type file;
+
+app passivate ()
+{
+  sh "-c" "echo dummy swift job;";
+}
+
+passivate();
+
+END
+
+touch $out
+sshpidfile=${out/stdouterr/workerpids}
+
+echo swift output is in: $out, pids in $sshpidfile
+
+TRAPS="EXIT 1 2 3 15"  # Signals and conditions to trap
+
+function onexit {
+  trap - $TRAPS
+  sshpids=$(cat $sshpidfile)
+  echo "Terminating worker processes $sshpids, starter $starterpid, and coaster-service pid $coasterservicepid"
+  if [ "_$sshpids$starterpid$coasterservicepid" != _ ]; then
+    kill $sshpids $starterpid $coasterservicepid >& /dev/null
+  fi
+  for rpfile in $(ls -1 remotepid.*); do
+    rpid=$(cat $rpfile)
+    rhost=$(echo $rpfile | sed -e 's/remotepid.//')
+    echo from $rpfile: ssh $rhost kill $rpid
+  done
+}
+
+trap onexit $TRAPS
+
+wait-and-start-workers &
+starterpid=$!
+
+swift -config cf -tc.file tc -sites.file sites.xml passivate.swift 2>&1 | tee $out
+
+echo "==> Service started."
+
+wait


Property changes on: SwiftApps/SwiftR/Swift/exec/start-swift-workers
___________________________________________________________________
Name: svn:executable
   + *
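
For reference (not part of the committed file), a minimal invocation sketch for start-swift-workers based on the argument handling above; the host names are placeholders, and 'swift' and 'coaster-service' are assumed to be on PATH:

  # Start a persistent passive coaster service plus one worker per listed host.
  # Creates ./service/ in the current directory; remote worker pids end up in
  # service/remotepid.<host> and the local ssh pids in service/swift.workerpids.*
  ./start-swift-workers 'hostA hostB hostC'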

Added: SwiftApps/SwiftR/Swift/exec/worker.pl
===================================================================
--- SwiftApps/SwiftR/Swift/exec/worker.pl	                        (rev 0)
+++ SwiftApps/SwiftR/Swift/exec/worker.pl	2010-09-08 21:39:47 UTC (rev 3621)
@@ -0,0 +1,1372 @@
+#!/usr/bin/perl
+# Args:
+# 	<URIs> <blockid> <logdir> [<idletimeout>]
+#	where:
+#		<URIs> - comma separated list of URIs for the coaster service; they
+#				will be tried in order
+#		<blockid> - some block id (the log file will be named based on this)
+#		<logdir> - some directory in which the logs should go
+#		<idletimeout> - optional idle timeout in seconds (default: 4 minutes)
+
+use IO::Socket;
+use File::Basename;
+use File::Path;
+use File::Copy;
+use Time::HiRes qw(time);
+use Cwd;
+use POSIX ":sys_wait_h";
+use strict;
+use warnings;
+
+# Create a stack of job slot ids (for auxiliary services):
+#   each slot has a small integer id
+#   take a slot from the stack when starting a job
+#   return the slot to the stack (making it available) when a job ends
+#   pass the slot id to the job via env var SWIFT_JOB_SLOT
+#   jobs can use this to reach a persistent process associated with the "slot"
+#   initial use is to send work to persistent R service workers.
+
+use constant MAXJOBSLOTS => 128; # FIXME: determine maxworkers dynamically?
+my @jobslots=();
+for( my $jobslot=MAXJOBSLOTS-1; $jobslot>=0; $jobslot--) {
+  push @jobslots, $jobslot;
+}
+
+# If ASYNC is on, the following will be done:
+#   1. Stageouts will be done in parallel
+#   2. The job status will be set to "COMPLETED" as soon as the last
+#      file is staged out (and before any cleanup is done).
+use constant ASYNC => 1;
+
+use constant {
+	TRACE => 0,
+	DEBUG => 1,
+	INFO => 2,
+	WARN => 3,
+	ERROR => 4,
+};
+
+use constant {
+	CONTINUE => 0,
+	YIELD => 1,
+};
+
+my $LOGLEVEL = DEBUG;
+
+my @LEVELS = ("TRACE", "DEBUG", "INFO ", "WARN ", "ERROR"); 
+
+use constant {
+	REPLY_FLAG => 0x00000001,
+	FINAL_FLAG => 0x00000002,
+	ERROR_FLAG => 0x00000004,
+	PROGRESSIVE_FLAG => 0x00000008
+};
+
+use constant {
+	COMPLETED => 0x07,
+	FAILED => 0x05,
+	ACTIVE => 0x02,
+	STAGEIN => 0x10,
+	STAGEOUT => 0x11,
+};
+
+my $TAG = 0;
+use constant RETRIES => 3;
+use constant REPLYTIMEOUT => 180;
+use constant MAXFRAGS => 16;
+use constant MAX_RECONNECT_ATTEMPTS => 3;
+
+my $LASTRECV = 0;
+my $JOBS_RUNNING = 0;
+
+my $JOB_COUNT = 0;
+
+use constant BUFSZ => 2048;
+
+# 2 minutes here. Note that since there is no configuration handshake
+# this has to match the default interval in the service in order to avoid
+# "lost heartbeats".
+use constant HEARTBEAT_INTERVAL => 2 * 60;
+
+my $ID = "-";
+
+sub wlog {
+	my $msg;
+	my $level = shift;
+	if ($level >= $LOGLEVEL) {
+		foreach $msg (@_) {
+		        my $timestamp = timestring();
+			my $msgline = sprintf("%s %s %s %s", 
+					      $timestamp,
+					      $LEVELS[$level], 
+					      $ID, $msg);
+			print LOG $msgline;
+		}
+	}
+	return 1;
+}
+
+# Command-line arguments:
+
+my $URISTR=$ARGV[0];
+my $BLOCKID=$ARGV[1];
+my $LOGDIR=$ARGV[2];
+my $IDLETIMEOUT = ( $#ARGV <= 2 ) ? (4 * 60) : $ARGV[3];
+
+
+# REQUESTS holds a map of incoming requests
+my %REQUESTS = ();
+
+# REPLIES stores the state of (outgoing) commands for which replies are expected
+my %REPLIES  = ();
+
+my $LOG = logfilename($LOGDIR, $BLOCKID);
+
+my %HANDLERS = (
+	"SHUTDOWN"  => \&shutdownw,
+	"SUBMITJOB" => \&submitjob,
+	"REGISTER"  => \&register,
+	"HEARTBEAT" => \&heartbeat,
+	"WORKERSHELLCMD" => \&workershellcmd,
+);
+
+my @CMDQ = ();
+
+my @URIS = split(/,/, $URISTR); 
+my @SCHEME;
+my @HOSTNAME;
+my @PORT;
+my $URI;
+foreach $URI (@URIS) {
+	if ($URI =~ /(.*):\/\//) { push(@SCHEME, $1); } else { die "Could not parse url scheme: $URI"; }
+	if ($URI =~ /.*:\/\/(.*):/) { push(@HOSTNAME, $1); } else { die "Could not parse url hostname: $URI"; }
+	if ($URI =~ /.*:\/\/.*:(.*)/) { push(@PORT, $1); } else { die "Could not parse url port: $URI"; }
+}
+my $SOCK;
+my $LAST_HEARTBEAT = 0;
+
+my %JOBWAITDATA = ();
+my %JOBDATA = ();
+
+sub logfilename {
+    $LOGDIR = shift;
+    $BLOCKID = shift;
+    my $result = undef;
+    my $uci;
+    if (-r "/proc/personality.sh") {
+	$uci = get_bg_uci();
+	$result = "$LOGDIR/worker-$BLOCKID-$uci.log";
+    }
+    else {
+	$result = "$LOGDIR/worker-$BLOCKID.log";
+    }
+    print STDERR "logfilename: $result\n";
+    return $result;
+}
+
+# Get the BlueGene Universal Component Identifier from Zepto 
+sub get_bg_uci() {
+    my %vars = file2hash("/proc/personality.sh");
+    my $uci = $vars{"BG_UCI"};
+    return $uci;
+}
+
+# Read a file into a hash, with file formatted as: 
+# KEY=VALUE
+sub file2hash() {
+    my $file = shift;
+    my %hash;
+    open FILE, "<$file";
+    while (<FILE>)
+    {
+	chomp;
+	my ($key, $val) = split /=/;
+	$hash{$key} = $val;
+    }
+    close FILE;
+    return %hash;
+}
+
+sub timestring() {
+  my $t = sprintf("%.3f", time());
+  #my @d = localtime(time());
+  #my $t = sprintf("%i/%02i/%02i %02i:%02i",
+  # $d[5]+1900, $d[4], $d[3], $d[2], $d[1]);
+  return $t;
+}
+
+sub hts {
+	my ($H) = @_;
+	
+	my $k;	
+	my $s = "{";
+	my $first = 1;
+	
+	for $k (keys %$H) {
+		if (!$first) {
+			$s = $s.", ";
+		}
+		else {
+			$first = 0;
+		}
+		$s = $s."$k = $$H{$k}";
+	}
+      
+	return $s."}";
+}
+
+sub reconnect() {
+	my $fail = 0;
+	my $success;
+	my $i;
+	my $j; 
+	for ($i = 0; $i < MAX_RECONNECT_ATTEMPTS; $i++) {
+		wlog INFO, "Connecting ($i)...\n";
+		my $sz = @HOSTNAME;
+		$success = 0;
+		for ($j = 0; $j < $sz; $j++) {
+			wlog DEBUG, "Trying $HOSTNAME[$j]:$PORT[$j]...\n";
+			$SOCK = IO::Socket::INET->new(Proto=>'tcp', PeerAddr=>$HOSTNAME[$j], PeerPort=>$PORT[$j], Blocking=>1) || ($fail = 1);
+			if (!$fail) {
+				$success = 1;
+				last;
+			}
+			else {
+				wlog DEBUG, "Connection failed: $!. Trying other addresses\n"; 
+			}
+		}
+		if ($success) {
+			$SOCK->setsockopt(SOL_SOCKET, SO_RCVBUF, 16384);
+			$SOCK->setsockopt(SOL_SOCKET, SO_SNDBUF, 32768);
+			wlog INFO, "Connected\n";
+			$SOCK->blocking(0);
+			queueCmd(registerCB(), "REGISTER", $BLOCKID, "");
+			last;
+		}
+		else {
+			my $delay = 2 ** $i;
+			wlog ERROR, "Connection failed for all addresses. Retrying in $delay seconds\n";
+			select(undef, undef, undef, $delay);
+		}
+	}
+	if (!$success) {
+		die "Failed to connect: $!";
+	}
+	$LAST_HEARTBEAT = time();
+}
+
+sub initlog() {
+        if (defined $ENV{"WORKER_LOGGING_ENABLED"}) {
+		open(LOG, ">>$LOG") or die "Failed to open log file: $!";
+		my $b = select(LOG);
+		$| = 1;
+		select($b);
+		my $date = localtime;
+		wlog INFO, "$BLOCKID Logging started: $date\n";
+	}
+	else {
+		$LOGLEVEL = 999;
+	}
+}
+
+
+sub init() {
+        logsetup();
+        reconnect();
+}
+
+sub logsetup() {
+        my $schemes = join(", ", @SCHEME);
+	my $hosts = join(", ", @HOSTNAME);
+	my $ports = join(", ", @PORT);
+	wlog DEBUG, "uri=$URISTR\n";
+	wlog DEBUG, "scheme=$schemes\n";
+	wlog DEBUG, "host=$hosts\n";
+	wlog DEBUG, "port=$ports\n";
+	wlog DEBUG, "blockid=$BLOCKID\n";
+	wlog DEBUG, "idletimeout=$IDLETIMEOUT\n";
+}
+
+sub sendm {
+	my ($tag, $flags, $msg) = @_;
+	my $len = length($msg);
+	my $buf = pack("VVV", $tag, $flags, $len);
+	$buf = $buf.$msg;
+
+	wlog(DEBUG, "OUT: len=$len, tag=$tag, flags=$flags\n");
+	wlog(TRACE, "$msg\n");
+
+	#($SOCK->send($buf) == length($buf)) || reconnect();
+	$SOCK->blocking(1);
+	eval {defined($SOCK->send($buf))} or wlog(WARN, "Send failed: $!\n") and die "Send failed: $!";
+	#eval {defined($SOCK->send($buf))} or wlog(WARN, "Send failed: $!\n");
+}
+
+sub sendFrags {
+	my ($tag, $flg, $data) = @_;
+	
+	my $flg2;
+	my $msg;
+	my $yield;
+	if (defined($$data{"tag"})) {
+		$tag = $$data{"tag"};
+	}
+	do {
+		($flg2, $msg, $yield) = $$data{"nextData"}($data);
+		sendm($tag, $flg | $flg2, $msg);
+	} while (($flg2 & FINAL_FLAG) == 0 && !$yield);
+	
+	if (($flg2 & FINAL_FLAG) == 0) {
+		# final flag not set; put it back in the queue
+		wlog DEBUG, "$tag yielding\n";
+		$$data{"tag"} = $tag;
+		queueCmdCustomDataHandling($REPLIES{$tag}, $data);
+	}
+	else {
+		if (exists($REPLIES{$tag})) {
+			my $record = $REPLIES{$tag};
+			my ($cont, $start) = ($$record[0], $$record[1]);
+			if (defined($$cont{"dataSent"})) {
+				$$cont{"dataSent"}($cont, $tag);
+			}
+		}
+		wlog(DEBUG, "done sending frags for $tag\n");
+	}
+}
+
+sub nextArrayData {
+	my ($state) = @_;
+	
+	my $index = $$state{"index"};
+	$$state{"index"} = $index + 1;
+	my $data = $$state{"data"};
+	if ($index > $#$data) {
+		die "Index out of bounds";
+	}
+	return ($index >= $#$data ? FINAL_FLAG : 0, $$data[$index], CONTINUE);
+}
+
+sub arrayData {
+	return {
+		"index" => 0,
+		"nextData" => \&nextArrayData,
+		"data" => \@_
+	};
+}
+
+sub nextFileData {
+	my ($state) = @_;
+	
+	my $s = $$state{"state"};
+	if ($s == 0) {
+		$$state{"state"} = $s + 1;
+		return (0, $$state{"cmd"}, CONTINUE);
+	}
+	elsif ($s == 1) {
+		$$state{"state"} = $s + 1;
+		return (0, pack("VV", $$state{"size"}, 0), CONTINUE);
+	}
+	elsif ($s == 2) {
+		$$state{"state"} = $s + 1;
+		return (0, $$state{"lname"}, CONTINUE);
+	}
+	elsif ($s == 3) {
+		$$state{"state"} = $s + 1;
+		$$state{"sent"} = 0;
+		return ($$state{"size"} == 0 ? FINAL_FLAG : 0, $$state{"rname"}, CONTINUE);
+	}
+	else {
+		my $handle = $$state{"handle"};
+		my $buffer;
+		my $sz = read($handle, $buffer, 8192);
+		if (!defined $sz) {
+			wlog INFO, "Failed to read data from file: $!\n";
+			return (FINAL_FLAG + ERROR_FLAG, "$!", CONTINUE);
+		}
+		elsif ($sz == 0 && $$state{"sent"} < $$state{"size"}) {
+			wlog INFO, "File size mismatch. $$state{'size'} vs. $$state{'sent'}\n";
+			return (FINAL_FLAG + ERROR_FLAG, "File size mismatch. Expected $$state{'size'}, got $$state{'sent'}", CONTINUE);
+		}
+		$$state{"sent"} += $sz;
+		wlog DEBUG, "size: $$state{'size'}, sent: $$state{'sent'}\n";
+		if ($$state{"sent"} == $$state{"size"}) {
+			close $handle;
+		}
+		return (($$state{"sent"} < $$state{"size"}) ? 0 : FINAL_FLAG, $buffer, YIELD);
+	}
+}
+
+sub fileData {
+	my ($cmd, $lname, $rname) = @_;
+	
+	my $desc;
+	if (!open($desc, "<", "$lname")) {
+		wlog WARN, "Failed to open $lname\n";
+		# let it go on for now. The next read from the descriptor will fail	
+	}
+	return {
+		"cmd" => $cmd,
+		"state" => 0,
+		"handle" => $desc,
+		"nextData" => \&nextFileData,
+		"size" => -s $lname,
+		"lname" => $lname,
+		"rname" => $rname
+	};
+}
+
+
+sub sendCmdInt {
+	my ($cont, $state) = @_;
+	my $ctag = $$state{"tag"};
+	if (!defined $ctag) {
+		$ctag =  $TAG++;
+		registerCmd($ctag, $cont);
+	}
+	sendFrags($ctag, 0, $state);
+	return $ctag;
+}
+
+sub sendCmd {
+	my @cmd = @_;
+	my $cont = shift(@cmd);
+	return sendCmdInt($cont, arrayData(@cmd));
+}
+
+sub queueCmd {
+	my @cmd = @_;
+	my $cont = shift(@cmd);
+	push @CMDQ, [$cont, arrayData(@cmd)];
+}
+
+sub queueCmdCustomDataHandling {
+	my ($cont, $state) = @_;
+	push @CMDQ, [$cont, $state];
+}
+
+sub sendReply {
+	my ($tag, @msgs) = @_;	
+	sendFrags($tag, REPLY_FLAG, arrayData(@msgs));
+}
+
+sub sendError {
+	my ($tag, @msgs) = @_;
+	sendFrags($tag, REPLY_FLAG | ERROR_FLAG, arrayData(@msgs));
+}
+
+sub unpackData {
+	my ($data) = @_;
+
+	my $lendata = length($data);
+	if ($lendata < 12) {
+		wlog WARN, "Received faulty message (length < 12: $lendata)\n";
+		die "Received faulty message (length < 12: $lendata)";
+	}
+	my $tag = unpack("V", substr($data, 0, 4));
+	my $flg = unpack("V", substr($data, 4, 4));
+	my $len = unpack("V", substr($data, 8, 4));
+	my $msg;
+	my $frag;
+	my $alen = 0;
+	while ($alen < $len) {
+		$SOCK->recv($frag, $len - $alen);
+		$alen = $alen + length($frag);
+		$msg = $msg.$frag;
+	}
+	
+	my $actuallen = length($msg);
+	wlog(TRACE, " IN: len=$len, actuallen=$actuallen, tag=$tag, flags=$flg, $msg\n");
+	if ($len != $actuallen) {
+		wlog(WARN, "len != actuallen\n");
+	}
+	return ($tag, $flg, $msg);
+}
+
+sub processRequest {
+	my ($state, $tag, $timeout, $err, $fin, $msg) = @_;
+	
+	my $request = $$state{"request"};
+	if (!defined($request)) {
+		$request = [];
+		$$state{"request"} = $request;
+	}
+	push(@$request, $msg);
+	
+	if ($timeout) {
+		sendError($tag, ("Timed out waiting for all fragments"));
+	}
+	elsif (!$fin) {
+		return;
+	}
+	else {
+		wlog DEBUG, "Processing request\n";
+		my $cmd = shift(@$request);
+		wlog DEBUG, "Cmd is $cmd\n";
+		if (exists($HANDLERS{$cmd})) {
+			$HANDLERS{$cmd}->($tag, 0, $request);
+		}
+		else {
+			sendError($tag, ("Unknown command: $cmd"));
+		}
+	}
+}
+
+sub process {
+	my ($tag, $flg, $msg) = @_;
+	
+	
+	my $reply = $flg & REPLY_FLAG;
+	my ($record, $cont, $start);
+	
+	if ($reply) {
+		if (exists($REPLIES{$tag})) {
+			$record = $REPLIES{$tag};
+			($cont, $start) = ($$record[0], $$record[1]);
+		}
+		else {
+			wlog(WARN, "received reply to unregistered command (tag=$tag). Discarding.\n");
+			return;
+		}
+	}
+	else {
+		$LASTRECV = time();
+		if (!exists($REQUESTS{$tag})) {
+			$REQUESTS{$tag} = [{"dataIn" => \&processRequest}, time()];
+			wlog DEBUG, "New request ($tag)\n";
+		}
+		$record = $REQUESTS{$tag};
+		($cont, $start) = ($$record[0], $$record[1]);
+	}
+		
+	my $fin = $flg & FINAL_FLAG;
+	my $err = $flg & ERROR_FLAG;
+		
+
+	if ($fin) {
+		if ($reply) {
+			# A reply for a command sent by us has been received, which means that
+			# the lifecycle of the command is complete, therefore the state of
+			# that command can be deleted.
+			delete($REPLIES{$tag});
+		}
+		else {
+			# All fragments of a request have been received. Everything the handler
+			# needs is passed along in $cont, $tag, $err, $fin and $msg, so the record
+			# can be removed from the table of (partial) incoming requests
+			delete($REQUESTS{$tag});
+		}
+		wlog DEBUG, "Fin flag set\n";
+	}
+	
+	$$cont{"dataIn"}($cont, $tag, 0, $err, $fin, $msg);
+	
+	return 1;
+}
+
+sub checkTimeouts2 {
+	my ($hash) = @_;
+	
+	my $now = time();
+	my @del = ();
+	
+	my $k;
+	my $v;
+	
+	while (($k, $v) = each(%$hash)) {
+		if ($now - $$v[1] > REPLYTIMEOUT) {
+			push(@del, $k);
+			my $cont = $$v[0];
+			$$cont{"dataIn"}($cont, $k, 1, 0, 0, "Reply timeout");
+		}
+	}
+	
+	foreach $k (@del) {
+		delete $$hash{$k};
+	}
+}
+
+my $LASTTIMEOUTCHECK = 0;
+
+sub checkTimeouts {
+	my $time = time();
+	if ($time - $LASTTIMEOUTCHECK < 1) {
+		return;
+	}
+	$LASTTIMEOUTCHECK = $time;
+	checkTimeouts2(\%REQUESTS);
+	checkTimeouts2(\%REPLIES);
+	if ($LASTRECV != 0) {
+		my $dif = $time - $LASTRECV;
+		wlog TRACE, "time: $time, lastrecv: $LASTRECV, dif: $dif\n"; 
+		if ($dif >= $IDLETIMEOUT && $JOBS_RUNNING == 0 ) {
+			wlog INFO, "Idle time exceeded (time=$time, LASTRECV=$LASTRECV, dif=$dif)\n";
+			die "Idle time exceeded";
+		}
+	}
+}
+
+sub recvOne {
+	my $data;
+	$SOCK->blocking(0);
+	$SOCK->recv($data, 12);
+	if (length($data) > 0) {
+		# wlog DEBUG, "Received " . unpackData($data) . "\n";
+		eval { process(unpackData($data)); } || (wlog ERROR, "Failed to process data: $@\n" && die "Failed to process data: $@");
+	}
+	else {
+		#sleep 1ms
+		select(undef, undef, undef, 0.001);
+		checkTimeouts();
+	}
+}
+
+sub registerCmd {
+	my ($tag, $cont) = @_;
+	
+	wlog DEBUG, "Replies: ".hts(\%REPLIES)."\n";
+	
+	$REPLIES{$tag} = [$cont, time(), ()];
+}
+
+
+sub mainloop {
+	while(1) {
+		loopOne();
+	}
+}
+
+sub loopOne {
+	my $cmd;
+	if (time() - $LAST_HEARTBEAT > HEARTBEAT_INTERVAL) {
+		queueCmd(heartbeatCB(), "HEARTBEAT");
+		$LAST_HEARTBEAT = time();
+	}
+	# send whatever is now queued; don't clear the queue, since
+	# things may be added to it while stuff is being sent
+	my $sz = scalar(@CMDQ);
+	for (my $i = 0; $i < $sz; $i++)  {
+		$cmd = pop(@CMDQ);
+		sendCmdInt(@$cmd);
+	}
+	checkJobs();
+	recvOne();
+}
+
+sub printreply {
+	my ($tag, $timeout, $err, $fin, $reply) = @_;
+	if ($timeout) {
+		wlog WARN, "Timed out waiting for reply to $tag\n";
+	}
+	else {
+		wlog DEBUG, "$$reply[0]\n";
+	}
+}
+
+sub nullCB {
+	return {
+		"dataIn" => sub {}
+	};
+}
+
+sub registerCB {
+	return {
+		"dataIn" => \&registerCBDataIn
+	};
+}
+
+sub registerCBDataIn {
+	my ($state, $tag, $timeout, $err, $fin, $reply) = @_;
+	
+	if ($timeout) {
+		die "Failed to register (timeout)\n";
+	}
+	elsif ($err) {
+		die "Failed to register (service returned error: ".join("\n", $reply).")";
+	}
+	else {
+		$ID = $reply;
+		wlog INFO, "Registration successful. ID=$ID\n";
+	}
+}
+
+sub heartbeatCB {
+	return {
+		"dataIn" => \&heartbeatCBDataIn
+	};
+}
+
+sub heartbeatCBDataIn {
+	my ($state, $tag, $timeout, $err, $fin, $reply) = @_;
+	
+	if ($timeout) {
+		if (time() - $LAST_HEARTBEAT > 2 * HEARTBEAT_INTERVAL) {
+			wlog WARN, "No heartbeat replies in a while. Dying.\n";
+			die "No response to heartbeat\n";
+		}
+	}
+	elsif ($err) {
+		wlog WARN, "Heartbeat failed: $reply\n";
+		die "Heartbeat failed: $reply\n";
+	} 
+	else {
+		wlog DEBUG, "Heartbeat acknowledged\n";
+	}
+}
+
+
+sub register {
+	my ($tag, $timeout, $reply) = @_;
+	sendReply($tag, ("OK"));
+}
+
+
+sub shutdownw {
+	my ($tag, $timeout, $msgs) = @_;
+	wlog DEBUG, "Shutdown command received\n";
+	sendReply($tag, ("OK"));
+	select(undef, undef, undef, 1);
+	wlog INFO, "Acknowledged shutdown. Exiting\n";
+	wlog INFO, "Ran a total of $JOB_COUNT jobs\n";
+	exit 0;
+}
+
+sub heartbeat {
+	my ($tag, $timeout, $msgs) = @_;
+	sendReply($tag, ("OK"));
+}
+
+sub workershellcmd {
+	my ($tag, $timeout, $msgs) = @_;
+	my $cmd = $$msgs[1];
+	my $out;
+	if ($cmd =~ m/cd\s*(.*)/) {
+		wlog DEBUG, "chdir $1\n";
+		my $ok = chdir $1;
+		if (!$ok) {
+			sendError($tag, ("$!"));
+		}
+		else {
+			sendReply($tag, ("OK", ""));
+		}
+	}
+	elsif ($cmd =~ m/mls\s*(.*)/) {
+		wlog DEBUG, "mls $1\n";
+		$out = `ls -d $1 2>/dev/null`;
+		sendReply($tag, ("OK", "$out"));
+	}
+	else {
+		$out = `$cmd 2>&1`;
+		sendReply($tag, ("OK", "$out"));
+	}
+}
+
+sub urisplit {
+	my ($name) = @_;
+
+	if (index($name, ":") == -1) {
+		return ("file", $name);
+	}
+
+	my ($protocol, $path) = split(/:\/\//, $name, 2);
+
+	return ($protocol, $path);
+}
+
+sub getFileCB {
+	my ($jobid, $src, $dst) = @_;
+	
+	my ($protocol, $path) = urisplit($src);
+	wlog DEBUG, "$jobid src: $src, protocol: $protocol, path: $path\n";
+	
+	if (($protocol eq "file") || ($protocol eq "proxy")) {
+		wlog DEBUG, "Opening $dst...\n";
+		my $dir = dirname($dst);
+		if (-f $dir) {
+			die "$jobid Cannot create directory $dir. A file with this name already exists";
+		}
+		if (!-d $dir) {
+			if (!mkpath($dir)) {
+				die "Cannot create directory $dir. $!";
+			}
+		}
+		# don't try open(DESC, ...) (as I did). It will use the same reference 
+		# and concurrent operations will fail. 
+		my $desc;
+		if (!open($desc, ">", "$dst")) {
+			die "Failed to open $dst: $!";
+		}
+		else {
+			wlog DEBUG, "$jobid Opened $dst\n";
+			return {
+				"jobid" => $jobid,
+				"dataIn" => \&getFileCBDataIn,
+				"state" => 0,
+				"lfile" => $dst,
+				"desc" => $desc
+			};
+		}
+	}
+	else {
+		return {
+			"jobid" => $jobid,
+			"dataIn" => \&getFileCBDataInIndirect,
+			"lfile" => $dst,
+		};
+	}
+}
+
+sub getFileCBDataInIndirect {
+	my ($state, $tag, $timeout, $err, $fin, $reply) = @_;
+	
+	my $jobid = $$state{"jobid"};
+	wlog DEBUG, "$jobid getFileCBDataInIndirect jobid: $jobid, tag: $tag, err: $err, fin: $fin\n";
+	if ($err) {
+		queueCmd((nullCB(), "JOBSTATUS", $jobid, FAILED, "520", "Error staging in file: $reply"));
+		delete($JOBDATA{$jobid});
+		return;
+	}
+	elsif ($timeout) {
+		queueCmd((nullCB(), "JOBSTATUS", $jobid, FAILED, "521", "Timeout staging in file"));
+		delete($JOBDATA{$jobid});
+		return;
+	}
+	if ($fin) {
+		stagein($jobid);
+	}
+}
+
+
+sub getFileCBDataIn {
+	my ($state, $tag, $timeout, $err, $fin, $reply) = @_;
+	
+	my $s = $$state{"state"};
+	my $jobid = $$state{"jobid"};
+	wlog DEBUG, "$jobid getFileCBDataIn jobid: $jobid, state: $s, tag: $tag, err: $err, fin: $fin\n";
+	if ($err) {
+		queueCmd((nullCB(), "JOBSTATUS", $jobid, FAILED, "520", "Error staging in file: $reply"));
+		delete($JOBDATA{$jobid});
+		return;
+	}
+	elsif ($timeout) {
+		queueCmd((nullCB(), "JOBSTATUS", $jobid, FAILED, "521", "Timeout staging in file"));
+		delete($JOBDATA{$jobid});
+		return;
+	}
+	elsif ($s == 0) {
+		$$state{"state"} = 1;
+		$$state{"size"} = unpack("V", $reply);
+		my $lfile = $$state{"lfile"};
+	}
+	else {
+		my $desc = $$state{"desc"};
+		if (!(print {$desc} $reply)) {
+			close $desc;
+			wlog DEBUG, "$jobid Could not write to file: $!. Descriptor was $desc; lfile: $$state{'lfile'}\n"; 
+			queueCmd((nullCB(), "JOBSTATUS", $jobid, FAILED, "522", "Could not write to file: $!"));
+			delete($JOBDATA{$jobid});
+			return;
+		}
+	}
+	if ($fin) {
+		my $desc = $$state{"desc"};
+		close $desc;
+		wlog DEBUG, "$jobid Closed $$state{'lfile'}\n";
+		stagein($jobid);
+	}
+}
+
+sub stagein {
+	my ($jobid) = @_;
+	
+	my $STAGE = $JOBDATA{$jobid}{"stagein"};
+	my $STAGED = $JOBDATA{$jobid}{"stageind"}; 
+	my $STAGEINDEX = $JOBDATA{$jobid}{"stageindex"};
+	
+	if (scalar @$STAGE <= $STAGEINDEX) {
+		wlog DEBUG, "$jobid Done staging in files ($STAGEINDEX, $STAGE)\n";
+		$JOBDATA{$jobid}{"stageindex"} = 0;
+		sendCmd((nullCB(), "JOBSTATUS", $jobid, ACTIVE, "0", "workerid=$ID"));
+		forkjob($jobid);
+	}
+	else {
+		if ($STAGEINDEX == 0) {
+			sendCmd((nullCB(), "JOBSTATUS", $jobid, STAGEIN, "0", "workerid=$ID"));
+		}
+		wlog DEBUG, "$jobid Staging in $$STAGE[$STAGEINDEX]\n";
+		$JOBDATA{$jobid}{"stageindex"} =  $STAGEINDEX + 1;
+		my ($protocol, $path) = urisplit($$STAGE[$STAGEINDEX]);
+		if ($protocol eq "sfs") {
+			if (!copy($path, $$STAGED[$STAGEINDEX])) {
+				wlog DEBUG, "$jobid Error staging in $path: $!\n";
+				queueCmd((nullCB(), "JOBSTATUS", $jobid, FAILED, "524", "$@"));
+			}
+			else {
+				stagein($jobid);		
+			}
+		}
+		else {
+			my $state;
+			eval {
+				$state = getFileCB($jobid, $$STAGE[$STAGEINDEX], $$STAGED[$STAGEINDEX]);
+			};
+			if ($@) {
+				wlog DEBUG, "$jobid Error staging in file: $@\n";
+				queueCmd((nullCB(), "JOBSTATUS", $jobid, FAILED, "524", "$@"));	
+			}
+			else {
+				sendCmd(($state, "GET", $$STAGE[$STAGEINDEX], $$STAGED[$STAGEINDEX]));
+			}
+		}
+	}
+}
+
+
+sub stageout {
+	my ($jobid) = @_;
+	
+	wlog DEBUG, "$jobid Staging out\n";
+	my $STAGE = $JOBDATA{$jobid}{"stageout"};
+	my $STAGED = $JOBDATA{$jobid}{"stageoutd"}; 
+	my $STAGEINDEX = $JOBDATA{$jobid}{"stageindex"};
+	
+	my $sz = scalar @$STAGE;
+	wlog DEBUG, "sz: $sz, STAGEINDEX: $STAGEINDEX\n";
+	if (scalar @$STAGE <= $STAGEINDEX) {
+		$JOBDATA{$jobid}{"stageindex"} = 0;
+		wlog DEBUG, "$jobid No more stageouts. Doing cleanup.\n";
+		cleanup($jobid);
+	}
+	else {
+		my $lfile = $$STAGE[$STAGEINDEX];
+		if (-e $lfile) {
+			if ($STAGEINDEX == 0) {
+				wlog DEBUG, "$jobid Sending STAGEOUT status\n";
+				sendCmd((nullCB(), "JOBSTATUS", $jobid, STAGEOUT, "0", "workerid=$ID"));
+			}
+			my $rfile = $$STAGED[$STAGEINDEX];
+			$JOBDATA{$jobid}{"stageindex"} = $STAGEINDEX + 1;
+			wlog DEBUG, "$jobid Staging out $lfile.\n";
+			my ($protocol, $path) = urisplit($rfile);
+			if ($protocol eq "file" || $protocol eq "proxy") {
+				queueCmdCustomDataHandling(putFileCB($jobid), fileData("PUT", $lfile, $rfile));
+			}
+			elsif ($protocol eq "sfs") {
+				if (!copy($lfile, $path)) {
+					queueCmd((nullCB(), "JOBSTATUS", $jobid, FAILED, "528", "$!"));
+					return;
+				}
+				else {
+					stageout($jobid);
+				}
+			}
+			else {
+				queueCmd((putFileCB($jobid), "PUT", pack("VV", 0, 0), $lfile, $rfile));
+			}
+			wlog DEBUG, "$jobid PUT sent.\n";
+		}
+		else {
+			wlog INFO, "$jobid Skipping stageout of missing file ($lfile)\n";
+			$JOBDATA{$jobid}{"stageindex"} = $STAGEINDEX + 1;
+			stageout($jobid);
+		}
+	}
+}
+
+sub cleanup {
+	my ($jobid) = @_;
+	
+	my $ec = $JOBDATA{$jobid}{"exitcode"};
+	if (ASYNC) {
+		if ($ec == 0) {
+			queueCmd((nullCB(), "JOBSTATUS", $jobid, COMPLETED, "0", ""));
+		}
+		else {
+			queueCmd((nullCB(), "JOBSTATUS", $jobid, FAILED, "$ec", "Job failed with an exit code of $ec"));
+		}
+	}
+	
+	if ($ec != 0) {
+		wlog DEBUG, "$jobid Job data: ".hts($JOBDATA{$jobid})."\n";
+		wlog DEBUG, "$jobid Job: ".hts($JOBDATA{$jobid}{'job'})."\n";
+		wlog DEBUG, "$jobid Job dir ".`ls -al $JOBDATA{$jobid}{'job'}{'directory'}`."\n";
+	}
+	
+	my $CLEANUP = $JOBDATA{$jobid}{"cleanup"};
+	my $c;
+	if ($ec == 0) {
+		for $c (@$CLEANUP) {
+			if ($c =~ /\/\.$/) {
+				chop $c;
+				chop $c;
+			}
+			wlog DEBUG, "$jobid Removing $c\n";
+			rmtree($c, {safe => 1, verbose => 0});
+			wlog DEBUG, "$jobid Removed $c\n";
+		}
+	}
+	
+	if (!ASYNC) {
+		if ($ec == 0) {
+			queueCmd((nullCB(), "JOBSTATUS", $jobid, COMPLETED, "0", ""));
+		}
+		else {
+			wlog DEBUG, "$jobid Sending failure.\n";
+			queueCmd((nullCB(), "JOBSTATUS", $jobid, FAILED, "$ec", "Job failed with an exit code of $ec"));
+		}
+	}
+}
+
+sub putFileCB {
+	my ($jobid) = @_;
+	return {
+		"jobid" => $jobid,
+		"dataIn" => \&putFileCBDataIn,
+		"dataSent" => \&putFileCBDataSent
+	};
+}
+
+sub putFileCBDataSent {
+	my ($state, $tag) = @_;
+	
+	if (ASYNC) {
+		wlog DEBUG, "putFileCBDataSent\n";
+		my $jobid = $$state{"jobid"};
+		if ($jobid != -1) {
+			wlog DEBUG, "Data sent, async is on. Staging out next file\n";
+			stageout($jobid);
+		}
+	}
+}
+
+sub putFileCBDataIn {
+	my ($state, $tag, $timeout, $err, $fin, $reply) = @_;
+	
+	wlog DEBUG, "putFileCBDataIn: $reply\n";
+	
+	my $jobid = $$state{"jobid"};
+	
+	if ($err || $timeout) {
+		if ($JOBDATA{$jobid}) {
+			wlog DEBUG, "Stage out failed ($reply)\n";
+			queueCmd((nullCB(), "JOBSTATUS", $jobid, FAILED, "515", "Stage out failed ($reply)"));
+			delete($JOBDATA{$jobid});
+		}
+		return;
+	}
+	elsif ($jobid != -1) {
+		if (!ASYNC) {
+			wlog DEBUG, "Stageout done; staging out next file\n";
+			stageout($jobid);
+		}
+	}
+}
+
+sub isabsolute {
+	my ($fn) = @_;
+	
+	return substr($fn, 0, 1) eq "/";
+}
+
+
+sub submitjob {
+	my ($tag, $timeout, $msgs) = @_;
+	my $desc = $$msgs[0];
+	my @lines = split(/\n/, $desc);
+	my $line;
+	my $JOBID = undef;
+	my %JOB = ();
+	my @JOBARGS = ();
+	my %JOBENV = ();
+	my @STAGEIN = ();
+	my @STAGEIND = ();
+	my @STAGEOUT = ();
+	my @STAGEOUTD = ();
+	my @CLEANUP = ();
+	foreach $line (@lines) {
+		$line =~ s/\\n/\n/;
+		$line =~ s/\\\\/\\/;
+		my @pair = split(/=/, $line, 2);
+		if ($pair[0] eq "arg") {
+			push @JOBARGS, $pair[1];
+		}
+		elsif ($pair[0] eq "env") {
+			my @ep = split(/=/, $pair[1], 2);
+			$JOBENV{"$ep[0]"} = $ep[1];
+		}
+		elsif ($pair[0] eq "identity") {
+			$JOBID = $pair[1];
+		}
+		elsif ($pair[0] eq "stagein") {
+			my @pp = split(/\n/, $pair[1], 2);
+			push @STAGEIN, $pp[0];
+			if (isabsolute($pp[1])) {
+				push @STAGEIND, $pp[1];
+			}
+			else {
+				# there's the assumption here that the directory is sent before
+				# the stagein/out data.
+				push @STAGEIND, $JOB{directory}."/".$pp[1];
+			}
+		}
+		elsif ($pair[0] eq "stageout") {
+			my @pp = split(/\n/, $pair[1], 2);
+			if (isabsolute($pp[0])) {
+				push @STAGEOUT, $pp[0];
+			}
+			else {
+				push @STAGEOUT, $JOB{directory}."/".$pp[0];
+			}
+			push @STAGEOUTD, $pp[1];
+		}
+		elsif ($pair[0] eq "cleanup") {
+			if (isabsolute($pair[1])) {
+				push @CLEANUP, $pair[1];
+			}
+			else {
+				push @CLEANUP, $JOB{directory}."/".$pair[1];
+			}
+		}
+		else {
+			$JOB{$pair[0]} = $pair[1];
+		}
+	}
+	if (checkJob($tag, $JOBID, \%JOB)) {
+		$JOBDATA{$JOBID} = {
+			stagein => \@STAGEIN,
+			stageind => \@STAGEIND,
+			stageindex => 0,
+			job => \%JOB,
+			jobargs => \@JOBARGS,
+			jobenv => \%JOBENV,
+			stageout => \@STAGEOUT,
+			stageoutd => \@STAGEOUTD,
+			cleanup => \@CLEANUP,
+		};
+		
+		stagein($JOBID);
+	}
+}
+
+sub checkJob() {
+	my ($tag, $JOBID, $JOB) = @_;
+	
+	wlog INFO, "$JOBID Job info received (tag=$tag)\n";
+	my $executable = $$JOB{"executable"};
+	if (!(defined $JOBID)) {
+		my $ds = hts($JOB);
+		
+		wlog DEBUG, "$JOBID Job details $ds\n";
+		
+		sendError($tag, ("Missing job identity"));
+		return 0;
+	}
+	elsif (!(defined $executable)) {
+		sendError($tag, ("Missing executable"));
+		return 0;
+	}
+	else {
+		my $dir = $$JOB{directory};
+		if (!defined $dir) {
+			$dir = ".";
+		}
+		my $dirlen = length($dir);
+		my $cleanup = $$JOB{"cleanup"};
+		my $c;
+		foreach $c (@$cleanup) {
+			if (substr($c, 0, $dirlen) ne $dir) {
+				sendError($tag, ("Cannot clean up outside of the job directory (cleanup: $c, jobdir: $dir)"));
+				return 0;
+			}
+		}
+		chdir $dir;
+		wlog DEBUG, "$JOBID Job check ok (dir: $dir)\n";
+		wlog INFO, "$JOBID Sending submit reply (tag=$tag)\n";
+		sendReply($tag, ("OK"));
+		wlog INFO, "$JOBID Submit reply sent (tag=$tag)\n";
+		return 1;
+	}
+}
+
+sub forkjob {
+	my ($JOBID) = @_;
+	my ($pid, $status);
+	
+	my $JOB = $JOBDATA{$JOBID}{"job"};
+	my $JOBARGS = $JOBDATA{$JOBID}{"jobargs"};
+	my $JOBENV = $JOBDATA{$JOBID}{"jobenv"};
+	my $WORKERPID = $$;
+
+	# allocate a jobslot here because we know we are starting exactly one job here
+	# if we verify that we don't have more stageins than slots taking place at once,
+	# we can move this to where the rest of the job options are set. Or we can place
+	# the slot in the JOBWAITDATA. (FIXME: remove when validated)
+
+	my $JOBSLOT = pop(@jobslots);
+	if( ! defined($JOBSLOT) ) {
+	    wlog DEBUG, "Job $JOBID has undefined jobslot\n";
+	}
+	$JOBDATA{$JOBID}{"jobslot"} = $JOBSLOT;
+	
+	pipe(PARENT_R, CHILD_W);
+	$pid = fork();
+	if (defined($pid)) {
+		if ($pid == 0) {
+			close PARENT_R;
+			runjob(\*CHILD_W, $JOB, $JOBARGS, $JOBENV, $JOBSLOT, $WORKERPID);
+			close CHILD_W;
+		}
+		else {
+			wlog DEBUG, "$JOBID Forked process $pid. Waiting for its completion\n";
+			close CHILD_W;
+			$JOBS_RUNNING++;
+			$JOBWAITDATA{$JOBID} = {
+				pid => $pid,
+				pipe => \*PARENT_R,
+			};
+		}
+	}
+	else {
+		queueCmd(nullCB(), "JOBSTATUS", $JOBID, FAILED, "512", "Could not fork child process");
+	}
+	$LASTRECV = time();
+}
+
+my $LASTJOBCHECK = 0;
+
+sub checkJobs {
+	my $time = time();
+	if ($time - $LASTJOBCHECK < 0.100) {
+		return;
+	}
+	$LASTJOBCHECK = $time;
+	if (!%JOBWAITDATA) {
+		return;
+	}
+	
+	wlog DEBUG, "Checking jobs status ($JOBS_RUNNING active)\n";
+	
+	my @DELETEIDS = ();
+	 
+	for my $JOBID (keys %JOBWAITDATA) {
+		if (checkJobStatus($JOBID)) {
+			push @DELETEIDS, $JOBID;
+		}
+	}
+	for my $i (@DELETEIDS) {
+		delete $JOBWAITDATA{$i};
+	}
+}
+
+sub checkJobStatus {
+	my ($JOBID) = @_;
+	
+	
+	my $pid = $JOBWAITDATA{$JOBID}{"pid"};
+	my $RD = $JOBWAITDATA{$JOBID}{"pipe"};
+	
+	my $tid;
+	my $status;
+	
+	wlog DEBUG, "$JOBID Checking pid $pid\n";
+	
+	$tid = waitpid($pid, &WNOHANG);
+	if ($tid != $pid) {
+		# not done
+		wlog DEBUG, "$JOBID Job $pid still running\n";
+		return 0;
+	}
+	else {
+		# exit code is in MSB and signal in LSB, so
+		# switch them such that status & 0xff is the
+		# exit code
+		$status = ($? >> 8) + (($? & 0xff) << 8);
+	}
+
+	wlog DEBUG, "$JOBID Child process $pid terminated. Status is $status.\n";
+	my $s;
+	if (!eof($RD)) {
+		$s = <$RD>;
+	}
+	wlog DEBUG, "$JOBID Got output from child. Closing pipe.\n";
+	close $RD;
+	$JOBDATA{$JOBID}{"exitcode"} = $status;
+
+	my $JOBSLOT = $JOBDATA{$JOBID}{"jobslot"};
+	if ( defined $JOBSLOT ) {
+	    push @jobslots,$JOBSLOT; 
+	}
+
+	if (defined $s) {
+		queueCmd(nullCB(), "JOBSTATUS", $JOBID, FAILED, "$status", $s);
+	}
+	else {
+		#queueCmd(nullCB(), "JOBSTATUS", $JOBID, COMPLETED, "$status", "");
+		stageout($JOBID);
+	}
+	$JOB_COUNT++;
+	$JOBS_RUNNING--;
+	
+	return 1;
+}
+
+sub runjob {
+	my ($WR, $JOB, $JOBARGS, $JOBENV, $JOBSLOT, $WORKERPID) = @_;
+	my $executable = $$JOB{"executable"};
+	my $stdout = $$JOB{"stdout"};
+	my $stderr = $$JOB{"stderr"};
+
+	my $cwd = getcwd();
+	wlog DEBUG, "CWD: $cwd\n";
+	wlog DEBUG, "Running $executable\n";
+	if (defined $$JOB{directory}) {
+		wlog DEBUG, "Directory: $$JOB{directory}\n";
+	}
+	my $ename;
+	foreach $ename (keys %$JOBENV) {
+		$ENV{$ename} = $$JOBENV{$ename};
+	}
+	$ENV{"SWIFT_JOB_SLOT"} = $JOBSLOT;
+	$ENV{"SWIFT_WORKER_PID"} = $WORKERPID; 
+	wlog DEBUG, "Command: @$JOBARGS\n";
+	unshift @$JOBARGS, $executable;
+	if (defined $$JOB{directory}) {
+	    chdir $$JOB{directory};
+	}
+	if (defined $stdout) {
+		wlog DEBUG, "STDOUT: $stdout\n";
+		close STDOUT;
+		open STDOUT, ">$stdout" or die "Cannot redirect STDOUT";
+	}
+	if (defined $stderr) {
+		wlog DEBUG, "STDERR: $stderr\n";
+		close STDERR;
+		open STDERR, ">$stderr" or die "Cannot redirect STDERR";
+	}
+	close STDIN;
+	wlog DEBUG, "Command: @$JOBARGS\n";
+	exec { $executable } @$JOBARGS or print $WR "Could not execute $executable: $!\n";
+	die "Could not execute $executable: $!";
+}
+
+initlog();
+
+my $MSG="0";
+
+my $myhost=`hostname`;
+$myhost =~ s/\s+$//;
+
+wlog(INFO, "Running on node $myhost\n");
+# wlog(INFO, "New log name: $LOGNEW \n");
+
+init();
+
+mainloop();
+wlog INFO, "Worker finished. Exiting.\n";
+exit(0);
+
+# This file works well with cperl-mode in the latest emacs
+# Local Variables:
+# indent-tabs-mode: t
+# tab-width: 8
+# cperl-indent-level: 8
+# End:


Property changes on: SwiftApps/SwiftR/Swift/exec/worker.pl
___________________________________________________________________
Name: svn:executable
   + *
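
For reference (also not part of the commit), the ssh line in start-swift-workers reduces to an invocation of the following form; the callback URI below is the example from the script's own comment, and the block id, log directory and timeout values are placeholders:

  # worker.pl <URIs> <blockid> <logdir> [<idletimeout>]
  WORKER_LOGGING_ENABLED=true \
    ./worker.pl http://140.221.8.62:55379 2010.0908.163947.12345 /tmp/swiftworkerlogs 3600

  # The worker registers with the coaster service at the given URI, logs to
  # <logdir>/worker-<blockid>.log when logging is enabled, and exits once it has
  # been idle (no running jobs, no traffic) longer than the idle timeout.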



