[Swift-commit] cog r3677
swift at ci.uchicago.edu
swift at ci.uchicago.edu
Wed Jul 3 20:45:03 CDT 2013
------------------------------------------------------------------------
r3677 | hategan | 2013-07-03 20:41:23 -0500 (Wed, 03 Jul 2013) | 1 line
Added the ability to redirect coaster job stdout/stderr back to the client along with the final job status.
------------------------------------------------------------------------
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/JobStatusHandler.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/JobStatusHandler.java (revision 3676)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/JobStatusHandler.java (working copy)
@@ -31,6 +31,8 @@
int code = getInDataAsInt(2);
String message = getInDataAsString(3);
+ String out = null, err = null;
+
Status s = new StatusImpl();
s.setStatusCode(status);
if (status == Status.FAILED && code != 0) {
@@ -41,13 +43,25 @@
s.setException(new JobException(code));
}
}
+ if (status == Status.FAILED || status == Status.COMPLETED) {
+ if (getInDataSize() == 7) {
+ out = getInDataAsString(5);
+ err = getInDataAsString(6);
+ }
+ }
if (message != null && !message.equals("")) {
s.setMessage(message);
}
- if (getInDataChunks().size() > 4) {
- s.setTime(new Date(this.getInDataAsLong(4)));
- }
- NotificationManager.getDefault().notificationReceived(jobId, s);
+ if (getInDataSize() > 4) {
+ long time = getInDataAsLong(4);
+ // worker.pl sends an all-zero placeholder timestamp (NULL_TIMESTAMP) so that
+ // redirected stdout/stderr land at chunks 5 and 6; treat 0 as "no time"
+ // instead of stamping the status with the Unix epoch.
+ if (time != 0) {
+ s.setTime(new Date(time));
+ }
+ }
+ NotificationManager.getDefault().notificationReceived(jobId, s, out, err);
sendReply("OK");
}
catch (Exception e) {
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/JobStatusCommand.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/JobStatusCommand.java (revision 3676)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/JobStatusCommand.java (working copy)
@@ -13,22 +13,32 @@
import java.io.PrintWriter;
import java.io.StringWriter;
+import org.apache.log4j.Logger;
import org.globus.cog.abstraction.impl.common.execution.JobException;
import org.globus.cog.abstraction.interfaces.Status;
import org.globus.cog.karajan.workflow.service.ProtocolException;
import org.globus.cog.karajan.workflow.service.commands.Command;
public class JobStatusCommand extends Command {
+ public static final Logger logger = Logger.getLogger(JobStatusCommand.class);
+
public static final String NAME = "JOBSTATUS";
private String taskId;
private Status status;
+ private String out, err;
public JobStatusCommand(String taskId, Status status) {
super(NAME);
this.taskId = taskId;
this.status = status;
}
+
+ public JobStatusCommand(String taskId, Status status, String out, String err) {
+ this(taskId, status);
+ this.out = out;
+ this.err = err;
+ }
public void send() throws ProtocolException {
@@ -63,5 +73,15 @@
}
addOutData(sb.toString());
addOutData(status.getTime().getTime());
+ if (out != null || err != null) {
+ // Receivers only pick up redirected output when both chunks are
+ // present, so send them as a pair and substitute an empty string
+ // for a missing stream rather than dropping the one we have.
+ if (out == null || err == null) {
+ logger.warn("Only one of job STDOUT or STDERR is non-null");
+ }
+ addOutData(out == null ? "" : out);
+ addOutData(err == null ? "" : err);
+ }
}
}
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/ExtendedStatusListener.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/ExtendedStatusListener.java (revision 0)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/ExtendedStatusListener.java (revision 3677)
@@ -0,0 +1,19 @@
+package org.globus.cog.abstraction.coaster.service.job.manager;
+
+import org.globus.cog.abstraction.interfaces.Status;
+
+/**
+ * Receives job status updates together with any redirected standard
+ * output and standard error that accompanied them.
+ */
+public interface ExtendedStatusListener {
+ /**
+ * Called when a status update arrives for a job registered with
+ * NotificationManager.
+ *
+ * @param s the new job status
+ * @param out redirected standard output, or null if none was sent
+ * @param err redirected standard error, or null if none was sent
+ */
+ void statusChanged(Status s, String out, String err);
+}
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/TaskNotifier.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/TaskNotifier.java (revision 3676)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/TaskNotifier.java (working copy)
@@ -9,11 +9,7 @@
*/
package org.globus.cog.abstraction.coaster.service.job.manager;
-import java.util.LinkedList;
-import java.util.TimerTask;
-
import org.apache.log4j.Logger;
-import org.globus.cog.abstraction.coaster.service.CoasterService;
import org.globus.cog.abstraction.coaster.service.JobStatusCommand;
import org.globus.cog.abstraction.impl.common.StatusEvent;
import org.globus.cog.abstraction.impl.execution.coaster.NotificationManager;
@@ -26,64 +22,49 @@
import org.globus.cog.karajan.workflow.service.commands.Command;
import org.globus.cog.karajan.workflow.service.commands.Command.Callback;
-public class TaskNotifier implements StatusListener, Callback {
+public class TaskNotifier implements StatusListener, ExtendedStatusListener, Callback {
public static final Logger logger = Logger.getLogger(TaskNotifier.class);
- public static final int CONGESTION_THRESHOLD = 64;
-
private ChannelContext channelContext;
private Task task;
private KarajanChannel channel;
private static int notacknowledged;
- private static LinkedList<Entry> queue;
- static {
- queue = new LinkedList<Entry>();
- CoasterService.addPeriodicWatchdog(new TimerTask() {
- public void run() {
- synchronized (TaskNotifier.class) {
- if (logger.isInfoEnabled()) {
- logger.info("Congestion queue size: " + queue.size());
- }
- checkQueue();
- }
- }
- }, 10000);
- }
-
public TaskNotifier(Task task, ChannelContext channelContext) {
this.task = task;
this.channelContext = channelContext;
- task.addStatusListener(this);
- NotificationManager.getDefault().registerTask(task.getIdentity().getValue(), task);
+ this.task.addStatusListener(this);
+ NotificationManager.getDefault().registerTask(task.getIdentity().getValue(), task, this);
}
-
+
+
public void statusChanged(StatusEvent event) {
- int code = event.getStatus().getStatusCode();
+ // Delegate so that plain task status events are filtered the same way
+ // (SUBMITTED/SUBMITTING are not forwarded) as extended notifications.
+ statusChanged(event.getStatus(), null, null);
+ }
+
+ public void statusChanged(Status s, String out, String err) {
+ int code = s.getStatusCode();
if (code != Status.SUBMITTED && code != Status.SUBMITTING) {
- sendStatus(this, event.getStatus());
+ sendStatus(this, s, out, err);
}
}
- public static synchronized void sendStatus(TaskNotifier tn, Status s) {
- if (notacknowledged >= CONGESTION_THRESHOLD) {
- queue.addLast(new Entry(tn, s));
- }
- else {
- String taskId = tn.task.getIdentity().toString();
- JobStatusCommand c = new JobStatusCommand(taskId, s);
- try {
- tn.channel = ChannelManager.getManager().reserveChannel(tn.channelContext);
- if (s.isTerminal()) {
- ChannelManager.getManager().releaseLongTerm(tn.channel);
- }
- c.executeAsync(tn.channel, tn);
- notacknowledged++;
+ public static synchronized void sendStatus(TaskNotifier tn, Status s, String out, String err) {
+ String taskId = tn.task.getIdentity().toString();
+ JobStatusCommand c = new JobStatusCommand(taskId, s, out, err);
+ try {
+ tn.channel = ChannelManager.getManager().reserveChannel(tn.channelContext);
+ if (s.isTerminal()) {
+ ChannelManager.getManager().releaseLongTerm(tn.channel);
}
- catch (Exception e) {
- logger.warn("Failed to send task notification", e);
- }
+ c.executeAsync(tn.channel, tn);
+ notacknowledged++;
}
+ catch (Exception e) {
+ logger.warn("Failed to send task notification", e);
+ }
}
public void errorReceived(Command cmd, String msg, Exception t) {
@@ -91,7 +72,6 @@
ChannelManager.getManager().releaseChannel(channel);
synchronized(TaskNotifier.class) {
notacknowledged--;
- checkQueue();
}
}
@@ -99,24 +79,6 @@
ChannelManager.getManager().releaseChannel(channel);
synchronized(TaskNotifier.class) {
notacknowledged--;
- checkQueue();
}
}
-
- private static void checkQueue() {
- if (notacknowledged < CONGESTION_THRESHOLD && !queue.isEmpty()) {
- Entry e = queue.removeFirst();
- sendStatus(e.tn, e.s);
- }
- }
-
- private static class Entry {
- TaskNotifier tn;
- Status s;
-
- public Entry(TaskNotifier tn, Status s) {
- this.tn = tn;
- this.s = s;
- }
- }
}
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/BlockTask.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/BlockTask.java (revision 3676)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/BlockTask.java (working copy)
@@ -107,12 +107,10 @@
js.addArgument(join(settings.getCallbackURIs(), ","));
js.addArgument(block.getId());
- if (settings.getWorkerLoggingLevel().equals("NONE"))
- {
+ if (settings.getWorkerLoggingLevel().equals("NONE")) {
js.addArgument("NOLOGGING");
}
- else
- {
+ else {
String logDir = settings.getWorkerLoggingDirectory();
if (logDir.equals("DEFAULT"))
js.addArgument(Bootstrap.LOG_DIR.getAbsolutePath());
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Job.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Job.java (revision 3676)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Job.java (working copy)
@@ -15,6 +15,7 @@
import org.apache.log4j.Logger;
import org.globus.cog.abstraction.impl.common.StatusImpl;
import org.globus.cog.abstraction.impl.common.execution.WallTime;
+import org.globus.cog.abstraction.impl.execution.coaster.NotificationManager;
import org.globus.cog.abstraction.interfaces.JobSpecification;
import org.globus.cog.abstraction.interfaces.Status;
import org.globus.cog.abstraction.interfaces.Task;
@@ -179,7 +180,8 @@
}
public void fail(String message, Exception e) {
- task.setStatus(new StatusImpl(Status.FAILED, message, e));
+ NotificationManager.getDefault().notificationReceived(task.getIdentity().getValue(), // NOTE(review): no longer calls task.setStatus(); if no listener is registered for this id, NotificationManager only queues the failure as pending
+ new StatusImpl(Status.FAILED, message, e), null, null);
}
public Task getTask() {
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java (revision 3676)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java (working copy)
@@ -346,7 +346,7 @@
bqp.jobTerminated(running);
}
if (running != null) {
- running.fail("Task failed: " + msg, e);
+ running.fail("Block task failed: " + msg, e); // NOTE(review): prefix changed from "Task failed: "; check that nothing matches on the old text
}
}
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Mpiexec.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Mpiexec.java (revision 3676)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Mpiexec.java (working copy)
@@ -13,13 +13,11 @@
import org.apache.log4j.Logger;
import org.globus.cog.abstraction.impl.common.IdentityImpl;
-import org.globus.cog.abstraction.impl.common.StatusEvent;
import org.globus.cog.abstraction.impl.execution.coaster.NotificationManager;
import org.globus.cog.abstraction.interfaces.ExecutionService;
import org.globus.cog.abstraction.interfaces.Identity;
import org.globus.cog.abstraction.interfaces.JobSpecification;
import org.globus.cog.abstraction.interfaces.Status;
-import org.globus.cog.abstraction.interfaces.StatusListener;
import org.globus.cog.abstraction.interfaces.Task;
import org.globus.cog.util.ProcessListener;
import org.globus.cog.util.ProcessMonitor;
@@ -33,7 +31,7 @@
*
* @author wozniak
*/
-public class Mpiexec implements ProcessListener, StatusListener {
+public class Mpiexec implements ProcessListener, ExtendedStatusListener { // proxy-clone statuses now arrive through NotificationManager rather than Task status listeners
public static final Logger logger = Logger.getLogger(Mpiexec.class);
@@ -336,15 +334,12 @@
// Clone original job as proxy job
Task clone = (Task) job.getTask().clone();
- // Set clone to notify this Mpiexec instance
- clone.addStatusListener(this);
-
// Update Task Identity and set Notification
Identity cloneID = new IdentityImpl(clone.getIdentity());
String value = cloneID.getValue() + ":" + i;
cloneID.setValue(value);
clone.setIdentity(cloneID);
- NotificationManager.getDefault().registerTask(value, clone);
+ NotificationManager.getDefault().registerTask(value, clone, this); // replaces clone.addStatusListener(this): this instance is notified for the clone's id
// Update Task Specification
JobSpecification spec = (JobSpecification) clone.getSpecification();
@@ -399,10 +394,10 @@
* Multiplex Hydra proxy StatusEvents into the StatusEvents for
* the original job
*/
- public void statusChanged(StatusEvent event) {
- logger.debug(event);
+ public void statusChanged(Status s, String out, String err) { // out and err are not used here; only s is counted and propagated
+ logger.debug(s);
synchronized (statusCount) {
- int code = event.getStatus().getStatusCode();
+ int code = s.getStatusCode();
Integer count = statusCount.get(code);
if (count == null)
count = 1;
@@ -410,12 +405,11 @@
count++;
statusCount.put(code, count);
if (count == proxies.size())
- propagate(event);
+ propagate(s);
}
}
- private void propagate(StatusEvent event) {
- Status s = event.getStatus();
+ private void propagate(Status s) {
logger.debug("propagating: to: " + job + " " + s);
job.getTask().setStatus(s);
}
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/SubmitJobHandler.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/SubmitJobHandler.java (revision 3676)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/SubmitJobHandler.java (working copy)
@@ -92,6 +92,7 @@
spec.setStdInput(helper.read("stdin"));
spec.setStdOutput(helper.read("stdout"));
spec.setStdError(helper.read("stderr"));
+ spec.setRedirected(helper.read("redirect") != null); // only the presence of a "redirect" entry is checked, not its value
String s;
while ((s = helper.read("arg")) != null) {
spec.addArgument(s);
Index: modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java (revision 3676)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java (working copy)
@@ -16,6 +16,7 @@
import java.util.Map;
import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.coaster.service.job.manager.ExtendedStatusListener;
import org.globus.cog.abstraction.impl.common.StatusImpl;
import org.globus.cog.abstraction.interfaces.ServiceContact;
import org.globus.cog.abstraction.interfaces.Status;
@@ -42,56 +43,60 @@
/**
- Map from Task IDs to Tasks
+ Map from Task IDs to the Task and listener registered for that ID
*/
- private Map<String, Task> tasks;
+ private Map<String, TaskListenerPair> listeners;
/**
- Map from Task IDs to Status updates that arrived before the
- Task existed in the Map {@link tasks}
+ Map from Task IDs to status updates (with any redirected output)
+ that arrived before the Task was registered in {@link #listeners}
*/
- private Map<String, List<Status>> pending;
+ private Map<String, List<ExtendedStatus>> pending;
private long lastNotificationTime;
public NotificationManager() {
- tasks = new HashMap<String, Task>();
- pending = new HashMap<String, List<Status>>();
+ listeners = new HashMap<String, TaskListenerPair>();
+ pending = new HashMap<String, List<ExtendedStatus>>();
lastNotificationTime = System.currentTimeMillis();
}
- public void registerTask(String id, Task task) {
- List<Status> p;
- synchronized (tasks) {
- tasks.put(id, task);
+ public void registerTask(String id, Task task, ExtendedStatusListener l) {
+ List<ExtendedStatus> p;
+ synchronized (listeners) {
+ listeners.put(id, new TaskListenerPair(task, l));
p = pending.remove(id);
}
- if (p != null)
- for (Status status : p)
- setStatus(task, status);
+ if (p != null) {
+ for (ExtendedStatus e : p) {
+ notify(l, e);
+ }
+ }
}
- public void notificationReceived(String id, Status s) {
+ public void notificationReceived(String id, Status s, String out, String err) {
if (logger.isDebugEnabled())
logger.debug("recvd: for: " + id + " " + s);
- Task task;
- synchronized (tasks) {
+ TaskListenerPair l;
+ synchronized (listeners) {
if (s.isTerminal()) {
- task = tasks.remove(id);
+ l = listeners.remove(id);
}
else {
- task = tasks.get(id);
+ l = listeners.get(id);
}
lastNotificationTime = System.currentTimeMillis();
- if (task == null) {
- addPending(id, s);
+ if (l == null) {
+ addPending(id, new ExtendedStatus(s, out, err));
}
}
- if (task != null) {
- setStatus(task, s);
+ if (l != null) {
+ // Go through notify() so that a failing listener is logged, exactly
+ // as for statuses delivered from the pending queue in registerTask().
+ notify(l.listener, new ExtendedStatus(s, out, err));
}
}
public long getIdleTime() {
- synchronized (tasks) {
- if (tasks.size() == 0 && lastNotificationTime != 0) {
+ synchronized (listeners) {
+ if (listeners.size() == 0 && lastNotificationTime != 0) {
return System.currentTimeMillis() - lastNotificationTime;
}
else {
@@ -101,50 +106,72 @@
}
public void notIdle() {
- synchronized (tasks) {
+ synchronized (listeners) {
lastNotificationTime = System.currentTimeMillis();
}
}
public int getActiveTaskCount() {
- synchronized (tasks) {
- return tasks.size();
+ synchronized (listeners) {
+ return listeners.size();
}
}
- private void setStatus(Task t, Status s) {
+ private void notify(ExtendedStatusListener l, ExtendedStatus e) {
try {
- t.setStatus(s);
+ l.statusChanged(e.status, e.out, e.err);
}
- catch (Exception e) {
- logger.warn("Could not set task status", e);
+ catch (Exception ex) {
+ logger.warn("Could not set task status", ex);
}
}
- private void addPending(String id, Status status) {
- List<Status> p = pending.get(id);
+ private void addPending(String id, ExtendedStatus es) {
+ List<ExtendedStatus> p = pending.get(id);
if (p == null) {
- p = new LinkedList<Status>();
+ p = new LinkedList<ExtendedStatus>();
pending.put(id, p);
}
- p.add(status);
+ p.add(es);
}
public void serviceTaskEnded(ServiceContact contact1,
String msg) {
- List<Task> ts;
- synchronized (tasks) {
- ts = new ArrayList<Task>(tasks.values());
+ List<Map.Entry<String, TaskListenerPair>> ts;
+ synchronized (listeners) {
+ ts = new ArrayList<Map.Entry<String, TaskListenerPair>>(listeners.entrySet());
}
logger.info(contact1.toString());
- for (Task t : ts) {
+ for (Map.Entry<String, TaskListenerPair> e: ts) {
ServiceContact contact2 =
- t.getService(0).getServiceContact();
+ e.getValue().task.getService(0).getServiceContact();
logger.info(contact2.toString());
if (contact2.equals(contact1))
- notificationReceived(t.getIdentity().toString(),
+ notificationReceived(e.getKey(),
new StatusImpl(Status.FAILED,
- msg, null));
+ msg, null), null, null);
}
}
+
+ private static final class ExtendedStatus {
+ public final Status status;
+ public final String out;
+ public final String err;
+
+ public ExtendedStatus(Status status, String out, String err) {
+ this.status = status;
+ this.out = out;
+ this.err = err;
+ }
+ }
+
+ private static final class TaskListenerPair {
+ public final Task task;
+ public final ExtendedStatusListener listener;
+
+ public TaskListenerPair(Task task, ExtendedStatusListener listener) {
+ this.task = task;
+ this.listener = listener;
+ }
+ }
}
Index: modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java (revision 3676)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java (working copy)
@@ -15,6 +15,7 @@
import java.util.regex.Pattern;
import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.coaster.service.job.manager.ExtendedStatusListener;
import org.globus.cog.abstraction.coaster.service.local.CoasterResourceTracker;
import org.globus.cog.abstraction.coaster.service.local.LocalRequestManager;
import org.globus.cog.abstraction.coaster.service.local.LocalService;
@@ -42,7 +43,7 @@
import org.globus.cog.karajan.workflow.service.commands.Command.Callback;
import org.ietf.jgss.GSSCredential;
-public class JobSubmissionTaskHandler extends AbstractDelegatedTaskHandler implements Callback {
+public class JobSubmissionTaskHandler extends AbstractDelegatedTaskHandler implements Callback, ExtendedStatusListener {
private static Logger logger = Logger.getLogger(JobSubmissionTaskHandler.class);
private static Set<Object> configured, configuring;
@@ -235,10 +236,21 @@
if (logger.isDebugEnabled()) {
logger.debug("Submitted task " + getTask() + ". Job id: " + jobid);
}
- NotificationManager.getDefault().registerTask(jobid, getTask());
+ NotificationManager.getDefault().registerTask(jobid, getTask(), this);
}
}
+ public void statusChanged(Status s, String out, String err) { // redirected output (when present) is stored on the task before its status is set
+ Task t = getTask();
+ if (out != null) {
+ t.setStdOutput(out);
+ }
+ if (err != null) {
+ t.setStdError(err);
+ }
+ t.setStatus(s);
+ }
+
public boolean getAutostart() {
return autostart;
}
@@ -363,15 +375,16 @@
Task t = new TaskImpl();
t.setType(Task.JOB_SUBMISSION);
JobSpecification js = new JobSpecificationImpl();
- js.setExecutable("/bin/sleep");
+ js.setExecutable("/bin/date");
int base = (int) (rnd.nextDouble() * 20) + 5;
- js.addArgument(String.valueOf(base + (int) (rnd.nextDouble() * base)));
+ //js.addArgument(String.valueOf(base + (int) (rnd.nextDouble() * base)));
js.setAttribute("maxwalltime", "00:00:" + String.valueOf(base * 2));
- js.setAttribute("slots", "8");
+ js.setAttribute("slots", "2");
js.setAttribute("lowOverallocation", "6");
- js.setAttribute("nodeGranularity", "8");
- js.setAttribute("maxNodes", "8");
- js.setAttribute("remoteMonitorEnabled", "true");
+ js.setAttribute("nodeGranularity", "1");
+ js.setAttribute("maxNodes", "2");
+ js.setRedirected(true); // exercises the new stdout/stderr redirection
+ //js.setAttribute("remoteMonitorEnabled", "true");
t.setSpecification(js);
ExecutionService s = new ExecutionServiceImpl();
// s.setServiceContact(new ServiceContactImpl("localhost"));
@@ -388,7 +401,7 @@
// JobSubmissionTaskHandler th = new JobSubmissionTaskHandler(
// AbstractionFactory.newExecutionTaskHandler("local"));
JobSubmissionTaskHandler th = new JobSubmissionTaskHandler();
- // th.setAutostart(true);
+ th.setAutostart(true);
th.submit(t);
return t;
}
@@ -400,11 +413,11 @@
rnd = new Random();
rnd.setSeed(10L);
long s = System.currentTimeMillis();
- Task[] ts = new Task[2048];
+ Task[] ts = new Task[1]; // NOTE(review): was 2048; looks like a leftover debugging value
for (int i = 0; i < ts.length; i++) {
ts[i] = submitTask();
- if (i % 100 == 0) {
- System.err.println(i + " submitted");
+ if (i % 1 == 0) { // NOTE(review): always true, so progress is printed after every submission
+ System.err.println((i + 1) + " submitted");
}
}
for (int i = 0; i < ts.length; i++) {
@@ -416,6 +429,9 @@
ts[i].getStatus().getException().printStackTrace();
}
}
+ else {
+ System.out.println(ts[i].getStdOutput()); // redirected stdout, filled in by statusChanged(Status, String, String)
+ }
if (i % 100 == 0 && i > 0) {
System.err.println(i + " done");
}
Index: modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/SubmitJobCommand.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/SubmitJobCommand.java (revision 3676)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/SubmitJobCommand.java (working copy)
@@ -96,6 +96,10 @@
add(sb, "stdin", spec.getStdInput());
add(sb, "stdout", spec.getStdOutput());
add(sb, "stderr", spec.getStdError());
+
+ if (spec.isRedirected()) {
+ add(sb, "redirect", true); // the receiving SubmitJobHandler only checks that this entry is present
+ }
for (String arg : spec.getArgumentsAsList())
add(sb, "arg", arg);
Index: modules/provider-coaster/resources/worker.pl
===================================================================
--- modules/provider-coaster/resources/worker.pl (revision 3676)
+++ modules/provider-coaster/resources/worker.pl (working copy)
@@ -126,6 +126,7 @@
# TODO: Make this configurable (#537)
use constant MAX_RECONNECT_ATTEMPTS => 3;
use constant NEVER => 9999999999;
+use constant NULL_TIMESTAMP => "\x00\x00\x00\x00\x00\x00\x00\x00";
use constant JOB_CHECK_INTERVAL => 0.1; # seconds
@@ -137,6 +138,9 @@
use constant IOBUFSZ => 32768;
use constant IOBLOCKSZ => 8;
+# Maximum size of re-directed output
+use constant MAX_OUT_REDIR_SIZE => 2048;
+
# If true, enable a profile result that is written to the log
my $PROFILE = 0;
# Contains tuples (EVENT, PID, TIMESTAMP) (flattened)
@@ -1042,9 +1046,16 @@
my ($jobid, $statusCode, $errorCode, $msg) = @_;
queueCmd((nullCB(), "JOBSTATUS", $jobid,
- encodeInt($statusCode), encodeInt($errorCode), $msg));
+ encodeInt($statusCode), encodeInt($errorCode), $msg, NULL_TIMESTAMP));
}
+sub queueJobStatusCmdExt {
+ my ($jobid, $statusCode, $errorCode, $msg, $out, $err) = @_;
+
+ queueCmd((nullCB(), "JOBSTATUS", $jobid,
+ encodeInt($statusCode), encodeInt($errorCode), $msg, NULL_TIMESTAMP, $out, $err));
+}
+
sub dieNicely {
my ($msg) = @_;
@@ -1529,16 +1540,86 @@
}
}
+sub readFile {
+ my ($jobid, $fname) = @_;
+
+ wlog DEBUG, "$jobid Reading $fname\n";
+ if (-e $fname) {
+ my $fd;
+ my $content = "";
+
+ open($fd, "<", $fname) or return "Error: could not open $fname";
+ # Read one byte past the limit so truncation can be detected without
+ # slurping arbitrarily large output; line breaks are preserved.
+ my $n = read($fd, $content, MAX_OUT_REDIR_SIZE + 1);
+ close($fd);
+ if (!defined $n) {
+ return "Error: could not read $fname";
+ }
+ if (length($content) > MAX_OUT_REDIR_SIZE) {
+ $content = substr($content, 0, MAX_OUT_REDIR_SIZE) . "\n<output truncated>";
+ }
+ wlog DEBUG, "$jobid $fname: $content\n";
+ return $content;
+ }
+ else {
+ wlog DEBUG, "$jobid $fname does not exist\n";
+ return "";
+ }
+}
+
+sub readFiles {
+ my ($jobid) = @_;
+
+ my $pid = $JOBDATA{$jobid}{"pid"};
+ if (!defined $pid) {
+ return ("", "");
+ }
+ my $outf = tmpSFile($pid, "out");
+ my $errf = tmpSFile($pid, "err");
+ my @content = (readFile($jobid, $outf), readFile($jobid, $errf));
+ # The temporary files only exist to ship the output back; remove them
+ # so they do not accumulate in /tmp.
+ unlink($outf, $errf);
+ return @content;
+}
+
sub sendStatus {
my ($jobid) = @_;
my $ec = $JOBDATA{$jobid}{"exitcode"};
-
+
+ my $stdoutRedir;
+ my $stderrRedir;
+ my $redirect;
+
+ $redirect = 0;
+
+ if (defined $JOBDATA{$jobid}{"job"}{"redirect"}) {
+ wlog DEBUG, "$jobid Output is redirected\n";
+ ($stdoutRedir, $stderrRedir) = readFiles($jobid);
+ $redirect = 1;
+ }
+ else {
+ $stdoutRedir = "";
+ $stderrRedir = "";
+ }
+
if ($ec == 0) {
- queueJobStatusCmd($jobid, COMPLETED, 0, "");
+ if ($redirect) {
+ queueJobStatusCmdExt($jobid, COMPLETED, 0, "", $stdoutRedir, $stderrRedir);
+ }
+ else {
+ queueJobStatusCmd($jobid, COMPLETED, 0, "");
+ }
}
else {
- queueJobStatusCmd($jobid, FAILED, $ec, getExitMessage($jobid, $ec));
+ if ($redirect) {
+ queueJobStatusCmdExt($jobid, FAILED, $ec, getExitMessage($jobid, $ec), $stdoutRedir, $stderrRedir);
+ }
+ else {
+ queueJobStatusCmd($jobid, FAILED, $ec, getExitMessage($jobid, $ec));
+ }
}
}
@@ -1833,7 +1914,7 @@
pipe($PARENT_R, $CHILD_W);
$pid = fork();
-
+
if (defined($pid)) {
if ($pid == 0) {
close $PARENT_R;
@@ -1843,6 +1918,7 @@
else {
wlog INFO, "$JOBID Forked process $pid. Waiting for its completion\n";
close $CHILD_W;
+ $JOBDATA{$JOBID}{"pid"} = $pid;
$JOBS_RUNNING++;
$JOBWAITDATA{$JOBID} = {
pid => $pid,
@@ -1970,11 +2052,19 @@
return 1;
}
+sub tmpSFile {
+ my ($pid, $suffix) = @_;
+
+ # NOTE(review): predictable name in a shared directory; another local user
+ # could pre-create it (e.g. as a symlink) before the job opens it for writing.
+ return "/tmp/$pid.$suffix";
+}
+
sub runjob {
my ($WR, $JOB, $JOBARGS, $JOBENV, $JOBSLOT, $WORKERPID) = @_;
my $executable = $$JOB{"executable"};
- my $stdout = $$JOB{"stdout"};
- my $stderr = $$JOB{"stderr"};
+ my $sout = $$JOB{"stdout"};
+ my $serr = $$JOB{"stderr"};
my $cwd = getcwd();
# wlog DEBUG, "CWD: $cwd\n";
@@ -1987,19 +2077,24 @@
$ENV{"SWIFT_WORKER_PID"} = $WORKERPID;
unshift @$JOBARGS, $executable;
wlog DEBUG, "Command: @$JOBARGS\n";
- if (defined $$JOB{directory}) {
+ if (defined $$JOB{"directory"}) {
wlog DEBUG, "chdir: $$JOB{directory}\n";
chdir $$JOB{directory};
}
- if (defined $stdout) {
- wlog DEBUG, "STDOUT: $stdout\n";
+ if (defined $$JOB{"redirect"}) {
+ wlog DEBUG, "Redirection is on\n";
+ $sout = tmpSFile($$, "out");
+ $serr = tmpSFile($$, "err");
+ }
+ if (defined $sout) {
+ wlog DEBUG, "STDOUT: $sout\n";
close STDOUT;
- open STDOUT, ">$stdout" or dieNicely("Cannot redirect STDOUT");
+ open STDOUT, ">$sout" or dieNicely("Cannot redirect STDOUT");
}
- if (defined $stderr) {
- wlog DEBUG, "STDERR: $stderr\n";
+ if (defined $serr) {
+ wlog DEBUG, "STDERR: $serr\n";
close STDERR;
- open STDERR, ">$stderr" or dieNicely("Cannot redirect STDERR");
+ open STDERR, ">$serr" or dieNicely("Cannot redirect STDERR");
}
close STDIN;
More information about the Swift-commit mailing list