[Swift-commit] cog r3762

swift at ci.uchicago.edu swift at ci.uchicago.edu
Tue Aug 13 15:55:03 CDT 2013


------------------------------------------------------------------------
r3762 | hategan | 2013-08-13 15:54:48 -0500 (Tue, 13 Aug 2013) | 1 line

use signals to forward output of long-running commands in the shell
------------------------------------------------------------------------
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/WorkerShellHandler.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/WorkerShellHandler.java	(revision 3761)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/WorkerShellHandler.java	(working copy)
@@ -28,7 +28,12 @@
     public void requestComplete() throws ProtocolException {
         String workerId = getInDataAsString(0);
         String command = getInDataAsString(1);
-        WorkerShellCommand wsc = new WorkerShellCommand(workerId, command);
+        WorkerShellCommand wsc = new WorkerShellCommand(workerId, command) {
+            @Override
+            public void handleSignal(byte[] data) {
+                forwardSignal(data);
+            }
+        };
         BlockQueueProcessor bqp = (BlockQueueProcessor) ((CoasterService) getChannel().getChannelContext().
                 getService()).getJobQueue().getCoasterQueueProcessor();
         try {
@@ -47,6 +52,10 @@
         }
     }
 
+    protected void forwardSignal(byte[] data) {
+        this.getChannel().sendTaggedReply(getId(), data, CoasterChannel.SIGNAL_FLAG);
+    }
+
     public void errorReceived(Command cmd, String msg, Exception t) {
         try {
             sendError("Worker error: " + msg, t);
Index: modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/WorkerShellCommand.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/WorkerShellCommand.java	(revision 3761)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/WorkerShellCommand.java	(working copy)
@@ -22,4 +22,8 @@
         addOutData(workerId);
         addOutData(command);
     }
+
+    public void input(String str) {
+        // TBD
+    }
 }
Index: modules/provider-coaster/resources/worker.pl
===================================================================
--- modules/provider-coaster/resources/worker.pl	(revision 3761)
+++ modules/provider-coaster/resources/worker.pl	(working copy)
@@ -18,7 +18,7 @@
 use File::Copy;
 use Getopt::Std;
 use FileHandle;
-use Cwd;
+use Cwd "realpath";
 use POSIX;
 use POSIX ":sys_wait_h";
 use strict;
@@ -275,6 +275,8 @@
 
 my %JOBWAITDATA = ();
 my %JOBDATA = ();
+my %ACTIVECMDS = ();
+my $SHELLCWD = getcwd();
 
 # CDM variables:
 my $PINNED_READY = 0;
@@ -745,6 +747,11 @@
 	push @SENDQ, [$tag, undef, SIGNAL_FLAG, arrayData(@msgs)];
 }
 
+sub queueReplySignal {
+	my ($tag, @msgs) = @_;
+	push @SENDQ, [$tag, undef, REPLY_FLAG | SIGNAL_FLAG, arrayData(@msgs)];
+}
+
 sub queueError {
 	my ($tag, @msgs) = @_;
 	push @SENDQ, [$tag, undef, REPLY_FLAG | ERROR_FLAG, arrayData(@msgs)];
@@ -934,6 +941,7 @@
 	
 	checkHeartbeat();
 	checkJobs();
+	checkCommands();
 	
 	if (@SENDQ) {
 		# if there are commands to send, don't just wait for data
@@ -982,6 +990,27 @@
 	}
 }
 
+sub checkCommands {
+	for my $tag (keys %ACTIVECMDS) {
+		my $out = $ACTIVECMDS{$tag};
+		my $rin = 0;
+		vec($rin, fileno($out), 1) = 1;
+		if (select($rin, undef, undef, 0)) {
+			my $data;
+			my $count = sysread($out, $data, 1024);
+			if ($count == 0) {
+				# eof
+				wlog DEBUG, "Command output done for $tag\n";
+				queueReply($tag, ("OK", ""));
+				delete $ACTIVECMDS{$tag};
+			}
+			else {
+				wlog DEBUG, "Command output $tag: $data\n";
+				queueReplySignal($tag, ($data));
+			}
+		}
+	}
+}
 
 sub printreply {
 	my ($tag, $timeout, $err, $fin, $reply) = @_;
@@ -1118,13 +1147,13 @@
 	my $out;
 	if ($cmd =~ m/cd\s*(.*)/) {
 		wlog DEBUG, "chdir $1\n";
-		chdir $1;
-		if ($! ne '') {
-			queueError($tag, ("$!"));
+		if (substr($1, 0, 1) eq "/") {
+			$SHELLCWD = $1;
 		}
 		else {
-			queueReply($tag, ("OK", ""));
+			$SHELLCWD = realpath("$SHELLCWD/$1");
 		}
+		queueReply($tag, ("OK", "CWD: $SHELLCWD"));
 	}
 	elsif ($cmd =~ m/mls\s*(.*)/) {
 		wlog DEBUG, "mls $1\n";
@@ -1132,10 +1161,16 @@
 		queueReply($tag, ("OK", "$out"));
 	}
 	else {
-		wlog DEBUG, "workershellcmd: $cmd\n";
-		$out = `$cmd 2>&1`;
-		wlog TRACE, "result: $out\n";
-		queueReply($tag, ("OK", "$out"));
+		wlog DEBUG, "workershellcmd $tag: $cmd\n";
+		my $err = 0;
+		open my $out, "-|", "cd $SHELLCWD && $cmd 2>&1" or $err = 1;
+		if ($err) {
+			wlog DEBUG, "Cannot launch $cmd ($tag)\n";
+			queueError($tag, ("Error starting $cmd"));
+		}
+		else {
+			$ACTIVECMDS{$tag} = $out;
+		}
 	}
 }
 



More information about the Swift-commit mailing list