[Swift-commit] Cog update

swift at ci.uchicago.edu swift at ci.uchicago.edu
Thu Apr 12 19:20:05 CDT 2012


------------------------------------------------------------------------
r3373 | hategan | 2012-04-12 19:17:39 -0500 (Thu, 12 Apr 2012) | 1 line

use non-blocking socket I/O in worker
------------------------------------------------------------------------
Index: modules/provider-coaster/resources/worker.pl
===================================================================
--- modules/provider-coaster/resources/worker.pl	(revision 3372)
+++ modules/provider-coaster/resources/worker.pl	(working copy)
@@ -11,6 +11,7 @@
 #
 
 use IO::Socket;
+use IO::Select;
 use File::Basename;
 use File::Path;
 use File::Copy;
@@ -109,10 +110,10 @@
 use constant MAX_RECONNECT_ATTEMPTS => 3;
 use constant NEVER => 9999999999;
 
-use constant JOB_CHECK_SKIP => 32;
+use constant JOB_CHECK_INTERVAL => 0.1; # seconds
 
 my $JOBS_RUNNING = 0;
-
+my $LAST_JOB_CHECK_TIME = 0;
 my $JOB_COUNT = 0;
 
 use constant BUFSZ => 2048;
@@ -178,6 +179,10 @@
 
 my %SUSPENDED_TRANSFERS = ();
 
+# partial message being sent when
+# writing to the socket would have blocked
+my %partialSend;
+
 # the structure of the above maps is (fields marked with "*" are optional):
 #	tag: [state, time]
 #
@@ -235,7 +240,7 @@
 	"WORKERSHELLCMD" => \&workershellcmd,
 );
 
-my @CMDQ = ();
+my @SENDQ = ();
 
 my @URIS = split(/,/, $URISTR);
 my @SCHEME;
@@ -452,19 +457,44 @@
 	return $arg;
 }
 
+sub sockSend {
+	my ($buf) = @_;
+	
+	my $start = time();
+	my $r = $SOCK->send($buf, 0);
+	if (!defined $r) {
+		if ($! == POSIX::EWOULDBLOCK) {
+			wlog(TRACE, "Send would block\n");
+			$r = 0;
+		}
+		else {
+			wlog(WARN, "Send failed: $!\n") and die "Send failed: $!";
+		}
+	}
+	my $diff = sprintf("%.8f", time() - $start);
+	
+	my $left = length($buf) - $r;
+	
+	wlog(DEBUG, "sent: $r, left: $left, time: $diff\n");
+
+	return $left;
+}
+
 sub sendm {
-	my ($tag, $flags, $msg) = @_;
+	my ($tag, $flags, $msg, $data) = @_;
 	my $len = length($msg);
 	my $buf = pack("VVVVV", $tag, $flags, $len, ($tag ^ $flags ^ $len), 0);
+	wlog(TRACE, "hdr: $buf\n");
 	$buf = $buf.$msg;
 
 	wlog(DEBUG, "OUT: len=$len, tag=$tag, flags=$flags\n");
 	wlog(TRACE, "$msg\n");
-
-	$SOCK->blocking(1);
-	eval { defined($SOCK->send($buf)); } or wlog(WARN, "Send failed: $!\n") and die "Send failed: $!";
-
-	#eval {defined($SOCK->send($buf))} or wlog(WARN, "Send failed: $!\n");
+	
+	my $msgBytesLeft = sockSend($buf);
+	if ($msgBytesLeft != 0) {
+		%partialSend = ("buf" => substr($buf, length($buf) - $msgBytesLeft), "data" => $data);
+	}
+	return $msgBytesLeft; 
 }
 
 sub sendFrags {
@@ -473,14 +503,34 @@
 	my $flg2;
 	my $msg;
 	my $yield;
+	my $msgBytesLeft;
 
 	do {
 		($flg2, $msg, $yield) = $$data{"nextData"}($data);
 		if (defined($msg)) {
-			sendm($tag, $flg | $flg2, $msg);
+			$msgBytesLeft = sendm($tag, $flg | $flg2, $msg, $data);
+			if ($msgBytesLeft != 0) {
+				$partialSend{"tag"} = $tag;
+				$partialSend{"flg2"} = $flg2;				
+				$yield = 1;
+			}
 		}
 	} while (($flg2 & FINAL_FLAG) == 0 && !$yield);
+	
+	if ($msgBytesLeft == 0) {
+		sendmDone($tag, $flg2, $data);
+		# would not block
+		return 0;
+	}
+	else {
+		# would block
+		return 1;
+	}
+}
 
+sub sendmDone {
+	my ($tag, $flg2, $data) = @_;
+	
 	if (($flg2 & FINAL_FLAG) == 0) {
 		# final flag not set; put it back in the queue
 		wlog TRACE, "$tag yielding\n";
@@ -602,46 +652,62 @@
 	};
 }
 
-
-sub sendCmdInt {
-	my ($cont, $state) = @_;
-	my $ctag = $$state{"tag"};
-	if (!defined $ctag) {
-		$ctag =  $TAG++;
-		registerCmd($ctag, $cont);
-		# make the tag accessible to the data generator
-		$$state{"tag"} = $ctag;
+sub sendInternal {
+	my ($tag, $cont, $flags, $state) = @_;
+	
+	if (!defined $tag) {
+		$tag = $$state{"tag"};
+		if (!defined $tag) {
+			$tag =  $TAG++;
+			registerCmd($tag, $cont);
+			# make the tag accessible to the data generator
+			$$state{"tag"} = $tag;
+		}
 	}
-	sendFrags($ctag, 0, $state);
-	return $ctag;
+	return sendFrags($tag, $flags, $state);
 }
 
-sub sendCmd {
-	my @cmd = @_;
-	my $cont = shift(@cmd);
-	return sendCmdInt($cont, arrayData(@cmd));
+sub resumeSend {
+	if (%partialSend) {
+		wlog(DEBUG, "Resuming partial send\n");
+		my $buf = $partialSend{"buf"};
+		my $msgBytesLeft = sockSend($buf);
+		if ($msgBytesLeft != 0) {
+			$partialSend{"buf"} = substr($buf, length($buf) - $msgBytesLeft);
+			return 1; 
+		}
+		else {
+			sendmDone($partialSend{"tag"}, $partialSend{"flg2"}, $partialSend{"data"});
+			undef %partialSend;
+			return 0;
+		}
+	}
+	else {
+		# i.e. would not block
+		return 0;
+	}
 }
 
 sub queueCmd {
 	my @cmd = @_;
 	my $cont = shift(@cmd);
 	# $cont is the continuation (what gets called when a reply is received)
-	push @CMDQ, [$cont, arrayData(@cmd)];
+	push @SENDQ, [undef, $cont, 0, arrayData(@cmd)];
 }
 
 sub queueCmdCustomDataHandling {
 	my ($cont, $state) = @_;
-	push @CMDQ, [$cont, $state];
+	push @SENDQ, [undef, $cont, 0, $state];
 }
 
-sub sendReply {
+sub queueReply {
 	my ($tag, @msgs) = @_;
-	sendFrags($tag, REPLY_FLAG, arrayData(@msgs));
+	push @SENDQ, [$tag, undef, REPLY_FLAG, arrayData(@msgs)];
 }
 
-sub sendError {
+sub queueError {
 	my ($tag, @msgs) = @_;
-	sendFrags($tag, REPLY_FLAG | ERROR_FLAG, arrayData(@msgs));
+	push @SENDQ, [$tag, undef, REPLY_FLAG | ERROR_FLAG, arrayData(@msgs)];
 }
 
 sub unpackData {
@@ -693,7 +759,7 @@
 	push(@$request, $msg);
 
 	if ($timeout) {
-		sendError($tag, ("Timed out waiting for all fragments"));
+		queueError($tag, ("Timed out waiting for all fragments"));
 	}
 	elsif (!($flags & FINAL_FLAG)) {
 		return;
@@ -706,7 +772,7 @@
 			$HANDLERS{$cmd}->($tag, 0, $request);
 		}
 		else {
-			sendError($tag, ("Unknown command: $cmd"));
+			queueError($tag, ("Unknown command: $cmd"));
 		}
 	}
 }
@@ -802,7 +868,6 @@
 
 sub recvOne {
 	my $buf;
-	$SOCK->blocking(0);
 	$SOCK->recv($buf, 20 - length($DATA));
 	if (length($buf) > 0) {
 		$DATA = $DATA . $buf;
@@ -814,8 +879,6 @@
 		}
 	}
 	else {
-		#sleep 1ms
-		select(undef, undef, undef, 0.001);
 		checkTimeouts();
 	}
 }
@@ -830,26 +893,63 @@
 
 
 sub mainloop {
+	my $r = new IO::Select();
+	$r->add($SOCK);
 	while(1) {
-		loopOne();
+		loopOne($r);
 	}
 }
 
-sub loopOne {
-	my $cmd;
+sub checkHeartbeat {
 	if (time() - $LAST_HEARTBEAT > HEARTBEAT_INTERVAL) {
 		queueCmd(heartbeatCB(), "HEARTBEAT");
 		$LAST_HEARTBEAT = time();
 	}
-	# send whatever is now queued; don't clear the queue, since
-	# things may be added to it while stuff is being sent
-	my $sz = scalar(@CMDQ);
-	for (my $i = 0; $i < $sz; $i++)  {
-		$cmd = shift(@CMDQ);
-		sendCmdInt(@$cmd);
+}
+
+sub loopOne {
+	my ($r) = @_;
+	my ($rset, $wset);
+	
+	checkHeartbeat();
+	checkJobs();
+	
+	if (@SENDQ) {
+		# if there are commands to send, don't just wait for data
+		# to read from the socket
+		($rset, $wset) = IO::Select->select($r, $r, undef, 0.1);
 	}
-	checkJobs();
-	recvOne();
+	else {
+		($rset, $wset) = IO::Select->select($r, undef, undef, 0.1);
+	}
+	if ($rset) {
+		# can read
+		wlog(DEBUG, "Can read\n");
+		recvOne();
+	}
+	
+	if ($wset) {
+		# can write
+		wlog(DEBUG, "Can write\n");
+		my $wouldBlock;
+		# if last write didn't finish, try to finish it now
+		$wouldBlock = resumeSend();
+		
+		if (!$wouldBlock) {
+			my $cmd;
+			# send whatever is now queued; don't clear the queue, since
+			# things may be added to it while stuff is being sent
+			my $sz = scalar(@SENDQ);
+			wlog(DEBUG, "SENDQ size: $sz\n");
+			for (my $i = 0; $i < $sz; $i++)  {
+				$cmd = shift(@SENDQ);
+				$wouldBlock = sendInternal(@$cmd);
+				if ($wouldBlock) {
+					last;
+				}
+			}
+		}
+	}
 }
 
 sub printreply {
@@ -916,7 +1016,7 @@
 
 sub register {
 	my ($tag, $timeout, $reply) = @_;
-	sendReply($tag, ("OK"));
+	queueReply($tag, ("OK"));
 }
 
 sub writeprofile {
@@ -936,7 +1036,7 @@
 sub shutdownw {
 	my ($tag, $timeout, $msgs) = @_;
 	wlog DEBUG, "Shutdown command received\n";
-	sendReply($tag, ("OK"));
+	queueReply($tag, ("OK"));
 	wlog INFO, "Acknowledged shutdown.\n";
 	wlog INFO, "Ran a total of $JOB_COUNT jobs\n";
 	if ($PROFILE) {
@@ -953,7 +1053,7 @@
 	my ($tag, $timeout, $msgs) = @_;
 	$LAST_HEARTBEAT = time();
 	my $msg = int(time() * 1000);
-	sendReply($tag, ("$msg"));
+	queueReply($tag, ("$msg"));
 }
 
 sub workershellcmd {
@@ -964,22 +1064,22 @@
 		wlog DEBUG, "chdir $1\n";
 		chdir $1;
 		if ($! ne '') {
-			sendError($tag, ("$!"));
+			queueError($tag, ("$!"));
 		}
 		else {
-			sendReply($tag, ("OK", ""));
+			queueReply($tag, ("OK", ""));
 		}
 	}
 	elsif ($cmd =~ m/mls\s*(.*)/) {
 		wlog DEBUG, "mls $1\n";
 		$out = `ls -d $1 2>/dev/null`;
-		sendReply($tag, ("OK", "$out"));
+		queueReply($tag, ("OK", "$out"));
 	}
 	else {
 		wlog DEBUG, "workershellcmd: $cmd\n";
 		$out = `$cmd 2>&1`;
 		wlog TRACE, "result: $out\n";
-		sendReply($tag, ("OK", "$out"));
+		queueReply($tag, ("OK", "$out"));
 	}
 }
 
@@ -1164,12 +1264,12 @@
 	if (scalar @$STAGE <= $STAGEINDEX) {
 		wlog DEBUG, "$jobid Done staging in files ($STAGEINDEX, $STAGE)\n";
 		$JOBDATA{$jobid}{"stageindex"} = 0;
-		sendCmd((nullCB(), "JOBSTATUS", $jobid, ACTIVE, "0", "workerid=$ID"));
+		queueCmd((nullCB(), "JOBSTATUS", $jobid, ACTIVE, "0", "workerid=$ID"));
 		forkjob($jobid);
 	}
 	else {
 		if ($STAGEINDEX == 0) {
-			sendCmd((nullCB(), "JOBSTATUS", $jobid, STAGEIN, "0", "workerid=$ID"));
+			queueCmd((nullCB(), "JOBSTATUS", $jobid, STAGEIN, "0", "workerid=$ID"));
 		}
 		wlog DEBUG, "$jobid Staging in $$STAGE[$STAGEINDEX]\n";
 		$JOBDATA{$jobid}{"stageindex"} =  $STAGEINDEX + 1;
@@ -1209,7 +1309,7 @@
 		queueCmd((nullCB(), "JOBSTATUS", $jobid, FAILED, "524", "$@"));
 	}
 	else {
-		sendCmd(($state, "GET", $src, $dst));
+		queueCmd(($state, "GET", $src, $dst));
 	}
 }
 
@@ -1325,7 +1425,7 @@
 		if (!$skip) {
 			if (!defined($JOBDATA{$jobid}{"stagoutStatusSent"})) {
 				wlog DEBUG, "$jobid Sending STAGEOUT status\n";
-				sendCmd((nullCB(), "JOBSTATUS", $jobid, STAGEOUT, "0", "workerid=$ID"));
+				queueCmd((nullCB(), "JOBSTATUS", $jobid, STAGEOUT, "0", "workerid=$ID"));
 				$JOBDATA{$jobid}{"jobStatusSent"} = 1;
 			}
 			my $rfile = $$STAGED[$STAGEINDEX];
@@ -1597,11 +1697,11 @@
 
 		wlog DEBUG, "$JOBID Job details $ds\n";
 
-		sendError($tag, ("Missing job identity"));
+		queueError($tag, ("Missing job identity"));
 		return 0;
 	}
 	elsif (!(defined $executable)) {
-		sendError($tag, ("Missing executable"));
+		queueError($tag, ("Missing executable"));
 		return 0;
 	}
 	else {
@@ -1614,14 +1714,14 @@
 		my $c;
 		foreach $c (@$cleanup) {
 			if (substr($c, 0, $dirlen) ne $dir) {
-				sendError($tag, ("Cannot clean up outside of the job directory (cleanup: $c, jobdir: $dir)"));
+				queueError($tag, ("Cannot clean up outside of the job directory (cleanup: $c, jobdir: $dir)"));
 				return 0;
 			}
 		}
 		chdir $dir;
 		wlog DEBUG, "$JOBID Job check ok (dir: $dir)\n";
 		wlog DEBUG, "$JOBID Sending submit reply (tag=$tag)\n";
-		sendReply($tag, ("OK"));
+		queueReply($tag, ("OK"));
 		wlog DEBUG, "$JOBID Submit reply sent (tag=$tag)\n";
 		return 1;
 	}
@@ -1679,7 +1779,13 @@
 my $JOBCHECKCOUNT = 0;
 
 sub checkJobs {
-	$JOBCHECKCOUNT = ($JOBCHECKCOUNT + 1) % JOB_CHECK_SKIP;
+	my $now = time();
+	
+	if ($now - $LAST_JOB_CHECK_TIME < JOB_CHECK_INTERVAL) {
+		return;
+	} 
+	$LAST_JOB_CHECK_TIME = $now;
+	
 	if ($JOBCHECKCOUNT != 0) {
 		return;
 	}



More information about the Swift-commit mailing list