[Swift-commit] Cog update
swift at ci.uchicago.edu
swift at ci.uchicago.edu
Thu Feb 9 14:10:08 CST 2012
------------------------------------------------------------------------
r3360 | jmwozniak | 2012-02-09 14:09:35 -0600 (Thu, 09 Feb 2012) | 1 line
New worker_init_cmd()
------------------------------------------------------------------------
Index: modules/provider-coaster/.classpath
===================================================================
--- modules/provider-coaster/.classpath (revision 3359)
+++ modules/provider-coaster/.classpath (working copy)
@@ -14,5 +14,6 @@
<classpathentry kind="src" path="/abstraction-provider-local"/>
<classpathentry kind="src" path="/util"/>
<classpathentry kind="con" path="org.eclipse.jdt.USER_LIBRARY/log4j"/>
+ <classpathentry kind="lib" path="/home/wozniak/proj/cog/lib/log4j-1.2.16.jar"/>
<classpathentry kind="output" path=".build"/>
</classpath>
Index: modules/provider-coaster/resources/worker.pl
===================================================================
--- modules/provider-coaster/resources/worker.pl (revision 3359)
+++ modules/provider-coaster/resources/worker.pl (working copy)
@@ -189,7 +189,7 @@
# flags: the protocol flags to send (e.g. err, fin)
# data: the actual data
# yieldFlag: if CONTINUE then it instructs the sending procedure
-# to loop sending data until YIELD is returned
+# to loop sending data until YIELD is returned
#
# dataSent: proc(state, tag) - invoked when all data was sent
# PUT file specific state:
@@ -219,9 +219,9 @@
# state when sending array data:
# index: the current index in the data array
# data: an array containing the data chunks
-#
-#
-#
+#
+#
+#
# time: last communication time (used to determine timeouts)
#
@@ -403,6 +403,13 @@
push(@PROFILE_EVENTS, "START", "N/A", time());
}
logsetup();
+ if (defined $ENV{"WORKER_COPIES"}) {
+ workerCopies($ENV{"WORKER_COPIES"});
+ }
+ if(defined $ENV{"WORKER_INIT_CMD"}) {
+ worker_init_cmd($ENV{"WORKER_INIT_CMD"});
+ }
+
reconnect();
}
@@ -431,6 +438,13 @@
}
}
+sub worker_init_cmd {
+ my ($cmd) = @_;
+ wlog DEBUG, "worker_init_cmd: $cmd\n";
+ my $rc = system($cmd);
+ print "rc: $rc\n";
+}
+
sub trim {
my ($arg) = @_;
$arg =~ s/^\s+|\s+$//g ;
@@ -458,7 +472,7 @@
my $flg2;
my $msg;
my $yield;
-
+
do {
($flg2, $msg, $yield) = $$data{"nextData"}($data);
if (defined($msg)) {
@@ -469,11 +483,11 @@
if (($flg2 & FINAL_FLAG) == 0) {
# final flag not set; put it back in the queue
wlog TRACE, "$tag yielding\n";
-
+
# update last time
my $record = $REPLIES{$tag};
$$record[1] = time();
-
+
queueCmdCustomDataHandling($REPLIES{$tag}, $data);
}
else {
@@ -512,11 +526,11 @@
my ($state) = @_;
my $s = $$state{"state"};
-
+
my $tag = $$state{"tag"};
-
+
wlog TRACE, "$tag nextFileData state=$s\n";
-
+
if ($s == PUT_START) {
$$state{"state"} = $s + 1;
return (0, $$state{"cmd"}, CONTINUE);
@@ -540,7 +554,7 @@
wlog TRACE, "$tag Transfer suspendend; yielding\n";
return (0, undef, YIELD);
}
-
+
my $handle = $$state{"handle"};
my $buffer;
my $sz = read($handle, $buffer, IOBUFSZ);
@@ -642,14 +656,14 @@
my $len = unpack("V", substr($data, 8, 4));
my $hcsum = unpack("V", substr($data, 12, 4));
my $csum = unpack("V", substr($data, 16, 4));
-
+
my $chcsum = ($tag ^ $flg ^ $len);
-
+
if ($chcsum != $hcsum) {
wlog WARN, "Header checksum failed. Computed checksum: $chcsum, checksum: $hcsum\n";
return;
}
-
+
my $msg;
my $frag;
my $alen = 0;
@@ -658,7 +672,7 @@
$alen = $alen + length($frag);
$msg = $msg.$frag;
}
-
+
my $actuallen = length($msg);
wlog(TRACE, " IN: len=$len, actuallen=$actuallen, tag=$tag, flags=$flg, $msg\n");
if ($len != $actuallen) {
@@ -707,7 +721,7 @@
if (exists($REPLIES{$tag})) {
$record = $REPLIES{$tag};
($cont, $lastTime) = ($$record[0], $$record[1]);
- # update last time
+ # update last time
$$record[1] = time();
}
else {
@@ -1068,7 +1082,7 @@
my $jobid = $$state{"jobid"};
my $len = length($reply);
wlog DEBUG, "$jobid getFileCBDataIn jobid: $jobid, state: $s, tag: $tag, flags: $flags, len: $len\n";
-
+
if ($flags & SIGNAL_FLAG) {
if ($reply eq "QUEUED") {
$REPLIES{$tag}[1] = NEVER;
@@ -1323,8 +1337,8 @@
$JOBDATA{$jobid}{"stageoutCount"} = 0;
}
$JOBDATA{$jobid}{"stageoutCount"} += 1;
- wlog DEBUG, "$jobid Stagecount is $JOBDATA{$jobid}{stageoutCount}\n";
-
+ wlog DEBUG, "$jobid Stagecount is $JOBDATA{$jobid}{stageoutCount}\n";
+
queueCmdCustomDataHandling(putFileCB($jobid), fileData("PUT", $lfile, $rfile));
}
elsif ($protocol eq "sfs") {
@@ -1361,9 +1375,9 @@
sub sendStatus {
my ($jobid) = @_;
-
+
my $ec = $JOBDATA{$jobid}{"exitcode"};
-
+
if ($ec == 0) {
queueCmd((nullCB(), "JOBSTATUS", $jobid, COMPLETED, "0", ""));
}
@@ -1385,7 +1399,7 @@
else {
# there were stageouts. Wait until all are acknowledged
# as done by the client. And we keep track of the
- # count of stageouts that weren't acknowledged in
+ # count of stageouts that weren't acknowledged in
# $JOBDATA{$jobid}{"stageoutCount"}
}
}
@@ -1436,7 +1450,7 @@
if (ASYNC) {
wlog DEBUG, "$tag putFileCBDataSent\n";
my $jobid = $$state{"jobid"};
- if ($jobid != -1) {
+ if ($jobid != -1) {
wlog DEBUG, "$tag Data sent, async is on. Staging out next file\n";
stageout($jobid);
}
@@ -1459,7 +1473,7 @@
return;
}
elsif ($reply eq "STOP") {
- $SUSPENDED_TRANSFERS{"$tag"} = 1;
+ $SUSPENDED_TRANSFERS{"$tag"} = 1;
wlog DEBUG, "$tag Got stop request. Suspending transfer.\n";
}
elsif ($reply eq "CONTINUE") {
@@ -1634,7 +1648,7 @@
my ($PARENT_R, $CHILD_W);
pipe($PARENT_R, $CHILD_W);
-
+
$pid = fork();
if (defined($pid)) {
@@ -1788,10 +1802,6 @@
init();
-if (defined $ENV{"WORKER_COPIES"}) {
- workerCopies($ENV{"WORKER_COPIES"});
-}
-
mainloop();
# Code may not reach this point - see shutdownw()
More information about the Swift-commit
mailing list