[Swift-commit] cog r3762
swift at ci.uchicago.edu
swift at ci.uchicago.edu
Tue Aug 13 15:55:03 CDT 2013
------------------------------------------------------------------------
r3762 | hategan | 2013-08-13 15:54:48 -0500 (Tue, 13 Aug 2013) | 1 line
use signals to forward output of long running commands in the shell
------------------------------------------------------------------------
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/WorkerShellHandler.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/WorkerShellHandler.java (revision 3761)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/WorkerShellHandler.java (working copy)
@@ -28,7 +28,12 @@
public void requestComplete() throws ProtocolException {
String workerId = getInDataAsString(0);
String command = getInDataAsString(1);
- WorkerShellCommand wsc = new WorkerShellCommand(workerId, command);
+ WorkerShellCommand wsc = new WorkerShellCommand(workerId, command) {
+ @Override
+ public void handleSignal(byte[] data) {
+ forwardSignal(data);
+ }
+ };
BlockQueueProcessor bqp = (BlockQueueProcessor) ((CoasterService) getChannel().getChannelContext().
getService()).getJobQueue().getCoasterQueueProcessor();
try {
@@ -47,6 +52,10 @@
}
}
+ protected void forwardSignal(byte[] data) {
+ this.getChannel().sendTaggedReply(getId(), data, CoasterChannel.SIGNAL_FLAG);
+ }
+
public void errorReceived(Command cmd, String msg, Exception t) {
try {
sendError("Worker error: " + msg, t);
Index: modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/WorkerShellCommand.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/WorkerShellCommand.java (revision 3761)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/WorkerShellCommand.java (working copy)
@@ -22,4 +22,8 @@
addOutData(workerId);
addOutData(command);
}
+
+ public void input(String str) {
+ // TBD
+ }
}
Index: modules/provider-coaster/resources/worker.pl
===================================================================
--- modules/provider-coaster/resources/worker.pl (revision 3761)
+++ modules/provider-coaster/resources/worker.pl (working copy)
@@ -18,7 +18,7 @@
use File::Copy;
use Getopt::Std;
use FileHandle;
-use Cwd;
+use Cwd "realpath";
use POSIX;
use POSIX ":sys_wait_h";
use strict;
@@ -275,6 +275,8 @@
my %JOBWAITDATA = ();
my %JOBDATA = ();
+my %ACTIVECMDS = ();
+my $SHELLCWD = getcwd();
# CDM variables:
my $PINNED_READY = 0;
@@ -745,6 +747,11 @@
push @SENDQ, [$tag, undef, SIGNAL_FLAG, arrayData(@msgs)];
}
+sub queueReplySignal {
+ my ($tag, @msgs) = @_;
+ push @SENDQ, [$tag, undef, REPLY_FLAG | SIGNAL_FLAG, arrayData(@msgs)];
+}
+
sub queueError {
my ($tag, @msgs) = @_;
push @SENDQ, [$tag, undef, REPLY_FLAG | ERROR_FLAG, arrayData(@msgs)];
@@ -934,6 +941,7 @@
checkHeartbeat();
checkJobs();
+ checkCommands();
if (@SENDQ) {
# if there are commands to send, don't just wait for data
@@ -982,6 +990,27 @@
}
}
+sub checkCommands {
+ for my $tag (keys %ACTIVECMDS) {
+ my $out = $ACTIVECMDS{$tag};
+ my $rin = 0;
+ vec($rin, fileno($out), 1) = 1;
+ if (select($rin, undef, undef, 0)) {
+ my $data;
+ my $count = sysread($out, $data, 1024);
+ if ($count == 0) {
+ # eof
+ wlog DEBUG, "Command output done for $tag\n";
+ queueReply($tag, ("OK", ""));
+ delete $ACTIVECMDS{$tag};
+ }
+ else {
+ wlog DEBUG, "Command output $tag: $data\n";
+ queueReplySignal($tag, ($data));
+ }
+ }
+ }
+}
sub printreply {
my ($tag, $timeout, $err, $fin, $reply) = @_;
@@ -1118,13 +1147,13 @@
my $out;
if ($cmd =~ m/cd\s*(.*)/) {
wlog DEBUG, "chdir $1\n";
- chdir $1;
- if ($! ne '') {
- queueError($tag, ("$!"));
+ if (substr($1, 0, 1) eq "/") {
+ $SHELLCWD = $1;
}
else {
- queueReply($tag, ("OK", ""));
+ $SHELLCWD = realpath("$SHELLCWD/$1");
}
+ queueReply($tag, ("OK", "CWD: $SHELLCWD"));
}
elsif ($cmd =~ m/mls\s*(.*)/) {
wlog DEBUG, "mls $1\n";
@@ -1132,10 +1161,16 @@
queueReply($tag, ("OK", "$out"));
}
else {
- wlog DEBUG, "workershellcmd: $cmd\n";
- $out = `$cmd 2>&1`;
- wlog TRACE, "result: $out\n";
- queueReply($tag, ("OK", "$out"));
+ wlog DEBUG, "workershellcmd $tag: $cmd\n";
+ my $err = 0;
+ open my $out, "-|", "cd $SHELLCWD && $cmd 2>&1" or $err = 1;
+ if ($err) {
+ wlog DEBUG, "Cannot launch $cmd ($tag)\n";
+ queueError($tag, ("Error starting $cmd"));
+ }
+ else {
+ $ACTIVECMDS{$tag} = $out;
+ }
}
}
More information about the Swift-commit
mailing list