[Swift-commit] Cog update
swift at ci.uchicago.edu
swift at ci.uchicago.edu
Thu Apr 12 19:20:05 CDT 2012
------------------------------------------------------------------------
r3373 | hategan | 2012-04-12 19:17:39 -0500 (Thu, 12 Apr 2012) | 1 line
use non-blocking socket I/O in worker
------------------------------------------------------------------------
Index: modules/provider-coaster/resources/worker.pl
===================================================================
--- modules/provider-coaster/resources/worker.pl (revision 3372)
+++ modules/provider-coaster/resources/worker.pl (working copy)
@@ -11,6 +11,7 @@
#
use IO::Socket;
+use IO::Select;
use File::Basename;
use File::Path;
use File::Copy;
@@ -109,10 +110,10 @@
use constant MAX_RECONNECT_ATTEMPTS => 3;
use constant NEVER => 9999999999;
-use constant JOB_CHECK_SKIP => 32;
+# Minimum time between two runs of checkJobs(); replaces the old
+# "every JOB_CHECK_SKIP-th loop iteration" counter.
+# NOTE(review): a fractional interval only has effect if time() is
+# Time::HiRes::time in this file - confirm.
+use constant JOB_CHECK_INTERVAL => 0.1; # seconds
my $JOBS_RUNNING = 0;
-
+# time() at which checkJobs() last got past its interval check
+my $LAST_JOB_CHECK_TIME = 0;
my $JOB_COUNT = 0;
use constant BUFSZ => 2048;
@@ -178,6 +179,10 @@
my %SUSPENDED_TRANSFERS = ();
+# partial message being sent when
+# writing to the socket would have blocked
+# keys: "buf"          - the bytes of the message not yet written
+#       "data"         - the data-generator state the message came from
+#       "tag", "flg2"  - tag and fragment flags, filled in by sendFrags()
+# The hash is empty when no partial send is pending.
+my %partialSend;
+
# the structure of the above maps is (fields marked with "*" are optional):
# tag: [state, time]
#
@@ -235,7 +240,7 @@
"WORKERSHELLCMD" => \&workershellcmd,
);
-my @CMDQ = ();
+# Outgoing message queue, drained by loopOne() when the socket is writable.
+# Each entry is [$tag, $cont, $flags, $state], i.e. the arguments of
+# sendInternal(): commands are queued with an undef tag (allocated when
+# sent), replies with the tag of the request they answer.
+my @SENDQ = ();
my @URIS = split(/,/, $URISTR);
my @SCHEME;
@@ -452,19 +457,44 @@
return $arg;
}
+# Writes as much of $buf as the socket accepts right now and returns the
+# number of bytes that were NOT written (0 when the whole buffer went out).
+# NOTE(review): assumes $SOCK was switched to non-blocking mode elsewhere;
+# that call is not visible in this patch.
+# Dies if the send fails for any reason other than "would block".
+sub sockSend {
+    my ($buf) = @_;
+
+    # NOTE(review): the timing below only has sub-second resolution if
+    # time() is Time::HiRes::time in this file - confirm.
+    my $start = time();
+    my $r = $SOCK->send($buf, 0);
+    if (!defined $r) {
+        # POSIX allows either EAGAIN or EWOULDBLOCK for a non-blocking send
+        # that would block, and the two are not the same value everywhere.
+        if ($! == POSIX::EWOULDBLOCK || $! == POSIX::EAGAIN) {
+            wlog(TRACE, "Send would block\n");
+            $r = 0;
+        }
+        else {
+            # Capture $! before logging (wlog may do I/O that resets it) and
+            # die unconditionally: "wlog(...) and die" only died when wlog
+            # returned true, otherwise carrying on with an undefined $r.
+            my $err = "$!";
+            wlog(WARN, "Send failed: $err\n");
+            die "Send failed: $err";
+        }
+    }
+    my $diff = sprintf("%.8f", time() - $start);
+
+    my $left = length($buf) - $r;
+
+    wlog(DEBUG, "sent: $r, left: $left, time: $diff\n");
+
+    return $left;
+}
+
+# Frames $msg with a 20-byte header (tag, flags, length, tag^flags^length, 0,
+# packed as five little-endian 32-bit integers) and writes it via sockSend().
+# If the socket takes only part of it, the unsent tail and $data are stored
+# in %partialSend so that resumeSend() can finish the write later.
+# Returns the number of bytes still unsent (0 when fully written).
sub sendm {
- my ($tag, $flags, $msg) = @_;
+ my ($tag, $flags, $msg, $data) = @_;
my $len = length($msg);
my $buf = pack("VVVVV", $tag, $flags, $len, ($tag ^ $flags ^ $len), 0);
+ # NOTE(review): this logs the packed binary header bytes as-is
+ wlog(TRACE, "hdr: $buf\n");
$buf = $buf.$msg;
wlog(DEBUG, "OUT: len=$len, tag=$tag, flags=$flags\n");
wlog(TRACE, "$msg\n");
-
- $SOCK->blocking(1);
- eval { defined($SOCK->send($buf)); } or wlog(WARN, "Send failed: $!\n") and die "Send failed: $!";
-
- #eval {defined($SOCK->send($buf))} or wlog(WARN, "Send failed: $!\n");
+
+ my $msgBytesLeft = sockSend($buf);
+ if ($msgBytesLeft != 0) {
+ # keep only the tail that did not make it onto the socket
+ %partialSend = ("buf" => substr($buf, length($buf) - $msgBytesLeft), "data" => $data);
+ }
+ return $msgBytesLeft;
}
sub sendFrags {
@@ -473,14 +503,34 @@
my $flg2;
my $msg;
my $yield;
+ my $msgBytesLeft;
do {
($flg2, $msg, $yield) = $$data{"nextData"}($data);
if (defined($msg)) {
- sendm($tag, $flg | $flg2, $msg);
+ $msgBytesLeft = sendm($tag, $flg | $flg2, $msg, $data);
+ if ($msgBytesLeft != 0) {
+ $partialSend{"tag"} = $tag;
+ $partialSend{"flg2"} = $flg2;
+ $yield = 1;
+ }
}
} while (($flg2 & FINAL_FLAG) == 0 && !$yield);
+
+ if ($msgBytesLeft == 0) {
+ sendmDone($tag, $flg2, $data);
+ # would not block
+ return 0;
+ }
+ else {
+ # would block
+ return 1;
+ }
+}
+sub sendmDone {
+ my ($tag, $flg2, $data) = @_;
+
if (($flg2 & FINAL_FLAG) == 0) {
# final flag not set; put it back in the queue
wlog TRACE, "$tag yielding\n";
@@ -602,46 +652,62 @@
};
}
-
-sub sendCmdInt {
- my ($cont, $state) = @_;
- my $ctag = $$state{"tag"};
- if (!defined $ctag) {
- $ctag = $TAG++;
- registerCmd($ctag, $cont);
- # make the tag accessible to the data generator
- $$state{"tag"} = $ctag;
+# Sends the next fragment(s) of $state under $tag with $flags.
+# When $tag is undef (a queued command), the tag already stored in $state is
+# reused, or a new one is allocated and $cont registered for the reply.
+# Returns sendFrags()'s result: 1 if the write would have blocked, else 0.
+sub sendInternal {
+ my ($tag, $cont, $flags, $state) = @_;
+
+ if (!defined $tag) {
+ $tag = $$state{"tag"};
+ if (!defined $tag) {
+ $tag = $TAG++;
+ registerCmd($tag, $cont);
+ # make the tag accessible to the data generator
+ $$state{"tag"} = $tag;
+ }
}
- sendFrags($ctag, 0, $state);
- return $ctag;
+ return sendFrags($tag, $flags, $state);
}
-sub sendCmd {
- my @cmd = @_;
- my $cont = shift(@cmd);
- return sendCmdInt($cont, arrayData(@cmd));
+# Tries to write the remainder of a message left over in %partialSend.
+# Returns 1 if some of it still could not be written, 0 otherwise (including
+# when nothing was pending). Once the remainder is out, sendmDone() is run for
+# that message and %partialSend is cleared.
+sub resumeSend {
+ if (%partialSend) {
+ wlog(DEBUG, "Resuming partial send\n");
+ my $buf = $partialSend{"buf"};
+ my $msgBytesLeft = sockSend($buf);
+ if ($msgBytesLeft != 0) {
+ $partialSend{"buf"} = substr($buf, length($buf) - $msgBytesLeft);
+ return 1;
+ }
+ else {
+ sendmDone($partialSend{"tag"}, $partialSend{"flg2"}, $partialSend{"data"});
+ undef %partialSend;
+ return 0;
+ }
+ }
+ else {
+ # i.e. would not block
+ return 0;
+ }
}
+# Queues a command on @SENDQ. The tag is left undef so that sendInternal()
+# allocates one and registers $cont when the command is actually sent.
sub queueCmd {
my @cmd = @_;
my $cont = shift(@cmd);
# $cont is the continuation (what gets called when a reply is received)
- push @CMDQ, [$cont, arrayData(@cmd)];
+ push @SENDQ, [undef, $cont, 0, arrayData(@cmd)];
}
+# Like queueCmd(), but $state is a caller-built data generator instead of
+# arrayData() over a list of message parts.
sub queueCmdCustomDataHandling {
my ($cont, $state) = @_;
- push @CMDQ, [$cont, $state];
+ push @SENDQ, [undef, $cont, 0, $state];
}
-sub sendReply {
+# Queues a reply to request $tag; nothing is written until the main loop
+# finds the socket writable.
+sub queueReply {
my ($tag, @msgs) = @_;
- sendFrags($tag, REPLY_FLAG, arrayData(@msgs));
+ push @SENDQ, [$tag, undef, REPLY_FLAG, arrayData(@msgs)];
}
-sub sendError {
+# Queues an error reply (REPLY_FLAG | ERROR_FLAG) to request $tag.
+sub queueError {
my ($tag, @msgs) = @_;
- sendFrags($tag, REPLY_FLAG | ERROR_FLAG, arrayData(@msgs));
+ push @SENDQ, [$tag, undef, REPLY_FLAG | ERROR_FLAG, arrayData(@msgs)];
}
sub unpackData {
@@ -693,7 +759,7 @@
push(@$request, $msg);
if ($timeout) {
- sendError($tag, ("Timed out waiting for all fragments"));
+ queueError($tag, ("Timed out waiting for all fragments"));
}
elsif (!($flags & FINAL_FLAG)) {
return;
@@ -706,7 +772,7 @@
$HANDLERS{$cmd}->($tag, 0, $request);
}
else {
- sendError($tag, ("Unknown command: $cmd"));
+ queueError($tag, ("Unknown command: $cmd"));
}
}
}
@@ -802,7 +868,6 @@
sub recvOne {
my $buf;
- $SOCK->blocking(0);
$SOCK->recv($buf, 20 - length($DATA));
if (length($buf) > 0) {
$DATA = $DATA . $buf;
@@ -814,8 +879,6 @@
}
}
else {
- #sleep 1ms
- select(undef, undef, undef, 0.001);
checkTimeouts();
}
}
@@ -830,26 +893,63 @@
sub mainloop {
+    # Runs loopOne() forever over a single select set that holds the control
+    # socket. Uses IO::Select->new(HANDLES) instead of indirect-object "new".
+    my $r = IO::Select->new($SOCK);
while(1) {
- loopOne();
+    loopOne($r);
}
}
-sub loopOne {
- my $cmd;
+# Queues a HEARTBEAT command once more than HEARTBEAT_INTERVAL has elapsed
+# since $LAST_HEARTBEAT, then restarts the interval.
+sub checkHeartbeat {
if (time() - $LAST_HEARTBEAT > HEARTBEAT_INTERVAL) {
queueCmd(heartbeatCB(), "HEARTBEAT");
$LAST_HEARTBEAT = time();
}
- # send whatever is now queued; don't clear the queue, since
- # things may be added to it while stuff is being sent
- my $sz = scalar(@CMDQ);
- for (my $i = 0; $i < $sz; $i++) {
- $cmd = shift(@CMDQ);
- sendCmdInt(@$cmd);
+}
+
+# One iteration of the worker's event loop: run the periodic heartbeat and job
+# checks, wait up to 0.1 s on the control socket, then read and/or write
+# depending on which directions select() reports as ready.
+sub loopOne {
+    my ($r) = @_;
+    my ($rset, $wset);
+
+    checkHeartbeat();
+    checkJobs();
+
+    # Poll for writability whenever there is something to write: queued
+    # messages, or the remainder of a message whose send would have blocked.
+    # Checking @SENDQ alone left a pending partial send stuck until some
+    # unrelated message happened to be queued.
+    if (@SENDQ || %partialSend) {
+        # if there are commands to send, don't just wait for data
+        # to read from the socket
+        ($rset, $wset) = IO::Select->select($r, $r, undef, 0.1);
}
- checkJobs();
- recvOne();
+    else {
+        ($rset, $wset) = IO::Select->select($r, undef, undef, 0.1);
+    }
+
+    # IO::Select->select returns an empty list on timeout and otherwise one
+    # array ref per set; an empty (but true) ref means "not ready".
+    if ($rset && @$rset) {
+        # can read
+        wlog(DEBUG, "Can read\n");
+        recvOne();
+    }
+
+    if ($wset && @$wset) {
+        # can write
+        wlog(DEBUG, "Can write\n");
+        # if last write didn't finish, try to finish it now
+        my $wouldBlock = resumeSend();
+
+        if (!$wouldBlock) {
+            # send whatever is now queued; don't clear the queue, since
+            # things may be added to it while stuff is being sent
+            my $sz = scalar(@SENDQ);
+            wlog(DEBUG, "SENDQ size: $sz\n");
+            for (1 .. $sz) {
+                my $cmd = shift(@SENDQ);
+                # stop at the first message that could not be fully written;
+                # its unsent tail is now held in %partialSend
+                last if sendInternal(@$cmd);
+            }
+        }
+    }
}
sub printreply {
@@ -916,7 +1016,7 @@
+# Answers request $tag with "OK". The reply is only queued here; it is
+# written later by the main loop.
sub register {
my ($tag, $timeout, $reply) = @_;
- sendReply($tag, ("OK"));
+ queueReply($tag, ("OK"));
}
sub writeprofile {
@@ -936,7 +1036,7 @@
sub shutdownw {
my ($tag, $timeout, $msgs) = @_;
wlog DEBUG, "Shutdown command received\n";
- sendReply($tag, ("OK"));
+ queueReply($tag, ("OK"));
wlog INFO, "Acknowledged shutdown.\n";
wlog INFO, "Ran a total of $JOB_COUNT jobs\n";
if ($PROFILE) {
@@ -953,7 +1053,7 @@
my ($tag, $timeout, $msgs) = @_;
$LAST_HEARTBEAT = time();
my $msg = int(time() * 1000);
- sendReply($tag, ("$msg"));
+ queueReply($tag, ("$msg"));
}
sub workershellcmd {
@@ -964,22 +1064,22 @@
wlog DEBUG, "chdir $1\n";
chdir $1;
if ($! ne '') {
- sendError($tag, ("$!"));
+ queueError($tag, ("$!"));
}
else {
- sendReply($tag, ("OK", ""));
+ queueReply($tag, ("OK", ""));
}
}
elsif ($cmd =~ m/mls\s*(.*)/) {
wlog DEBUG, "mls $1\n";
$out = `ls -d $1 2>/dev/null`;
- sendReply($tag, ("OK", "$out"));
+ queueReply($tag, ("OK", "$out"));
}
else {
wlog DEBUG, "workershellcmd: $cmd\n";
$out = `$cmd 2>&1`;
wlog TRACE, "result: $out\n";
- sendReply($tag, ("OK", "$out"));
+ queueReply($tag, ("OK", "$out"));
}
}
@@ -1164,12 +1264,12 @@
if (scalar @$STAGE <= $STAGEINDEX) {
wlog DEBUG, "$jobid Done staging in files ($STAGEINDEX, $STAGE)\n";
$JOBDATA{$jobid}{"stageindex"} = 0;
- sendCmd((nullCB(), "JOBSTATUS", $jobid, ACTIVE, "0", "workerid=$ID"));
+ queueCmd((nullCB(), "JOBSTATUS", $jobid, ACTIVE, "0", "workerid=$ID"));
forkjob($jobid);
}
else {
if ($STAGEINDEX == 0) {
- sendCmd((nullCB(), "JOBSTATUS", $jobid, STAGEIN, "0", "workerid=$ID"));
+ queueCmd((nullCB(), "JOBSTATUS", $jobid, STAGEIN, "0", "workerid=$ID"));
}
wlog DEBUG, "$jobid Staging in $$STAGE[$STAGEINDEX]\n";
$JOBDATA{$jobid}{"stageindex"} = $STAGEINDEX + 1;
@@ -1209,7 +1309,7 @@
queueCmd((nullCB(), "JOBSTATUS", $jobid, FAILED, "524", "$@"));
}
else {
- sendCmd(($state, "GET", $src, $dst));
+ queueCmd(($state, "GET", $src, $dst));
}
}
@@ -1325,7 +1425,7 @@
if (!$skip) {
if (!defined($JOBDATA{$jobid}{"stagoutStatusSent"})) {
wlog DEBUG, "$jobid Sending STAGEOUT status\n";
- sendCmd((nullCB(), "JOBSTATUS", $jobid, STAGEOUT, "0", "workerid=$ID"));
+ queueCmd((nullCB(), "JOBSTATUS", $jobid, STAGEOUT, "0", "workerid=$ID"));
$JOBDATA{$jobid}{"jobStatusSent"} = 1;
}
my $rfile = $$STAGED[$STAGEINDEX];
@@ -1597,11 +1697,11 @@
wlog DEBUG, "$JOBID Job details $ds\n";
- sendError($tag, ("Missing job identity"));
+ queueError($tag, ("Missing job identity"));
return 0;
}
elsif (!(defined $executable)) {
- sendError($tag, ("Missing executable"));
+ queueError($tag, ("Missing executable"));
return 0;
}
else {
@@ -1614,14 +1714,14 @@
my $c;
foreach $c (@$cleanup) {
if (substr($c, 0, $dirlen) ne $dir) {
- sendError($tag, ("Cannot clean up outside of the job directory (cleanup: $c, jobdir: $dir)"));
+ queueError($tag, ("Cannot clean up outside of the job directory (cleanup: $c, jobdir: $dir)"));
return 0;
}
}
chdir $dir;
wlog DEBUG, "$JOBID Job check ok (dir: $dir)\n";
wlog DEBUG, "$JOBID Sending submit reply (tag=$tag)\n";
- sendReply($tag, ("OK"));
+ queueReply($tag, ("OK"));
wlog DEBUG, "$JOBID Submit reply sent (tag=$tag)\n";
return 1;
}
@@ -1679,7 +1779,13 @@
my $JOBCHECKCOUNT = 0;
sub checkJobs {
- $JOBCHECKCOUNT = ($JOBCHECKCOUNT + 1) % JOB_CHECK_SKIP;
+ my $now = time();
+
+ if ($now - $LAST_JOB_CHECK_TIME < JOB_CHECK_INTERVAL) {
+ return;
+ }
+ $LAST_JOB_CHECK_TIME = $now;
+
if ($JOBCHECKCOUNT != 0) {
return;
}
More information about the Swift-commit
mailing list