[Swift-commit] cog r3677

swift at ci.uchicago.edu swift at ci.uchicago.edu
Wed Jul 3 20:45:03 CDT 2013


------------------------------------------------------------------------
r3677 | hategan | 2013-07-03 20:41:23 -0500 (Wed, 03 Jul 2013) | 1 line

added redirect ability to coasters
------------------------------------------------------------------------
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/JobStatusHandler.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/JobStatusHandler.java	(revision 3676)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/JobStatusHandler.java	(working copy)
@@ -31,6 +31,8 @@
             int code = getInDataAsInt(2);
             String message = getInDataAsString(3);
             
+            String out = null, err = null;
+            
             Status s = new StatusImpl();
             s.setStatusCode(status);
             if (status == Status.FAILED && code != 0) {
@@ -41,13 +43,17 @@
             	    s.setException(new JobException(code));
             	}
             }
+            if (status == Status.FAILED || status == Status.COMPLETED) {
+                if (getInDataSize() == 7) {
+                    out = getInDataAsString(5);
+                    err = getInDataAsString(6);
+                }
+            }
             if (message != null && !message.equals("")) {
                 s.setMessage(message);
             }
-            if (getInDataChunks().size() > 4) {
-                s.setTime(new Date(this.getInDataAsLong(4)));
-            }
-            NotificationManager.getDefault().notificationReceived(jobId, s);
+            s.setTime(new Date(this.getInDataAsLong(4)));
+            NotificationManager.getDefault().notificationReceived(jobId, s, out, err);
             sendReply("OK");
         }
         catch (Exception e) {
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/JobStatusCommand.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/JobStatusCommand.java	(revision 3676)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/JobStatusCommand.java	(working copy)
@@ -13,22 +13,32 @@
 import java.io.PrintWriter;
 import java.io.StringWriter;
 
+import org.apache.log4j.Logger;
 import org.globus.cog.abstraction.impl.common.execution.JobException;
 import org.globus.cog.abstraction.interfaces.Status;
 import org.globus.cog.karajan.workflow.service.ProtocolException;
 import org.globus.cog.karajan.workflow.service.commands.Command;
 
 public class JobStatusCommand extends Command {
+    public static final Logger logger = Logger.getLogger(JobStatusCommand.class);
+    
     public static final String NAME = "JOBSTATUS";
 
     private String taskId;
     private Status status;
+    private String out, err;
 
     public JobStatusCommand(String taskId, Status status) {
         super(NAME);
         this.taskId = taskId;
         this.status = status;
     }
+    
+    public JobStatusCommand(String taskId, Status status, String out, String err) {
+        this(taskId, status);
+        this.out = out;
+        this.err = err;
+    }
 
 
     public void send() throws ProtocolException {
@@ -63,5 +73,12 @@
         }
         addOutData(sb.toString());
         addOutData(status.getTime().getTime());
+        if (out != null && err != null) {
+            addOutData(out);
+            addOutData(err);
+        }
+        else if (out != null || err != null) {
+            logger.warn("Only one of job STDOUT or STDERR is non-null");
+        }
     }
 }
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/ExtendedStatusListener.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/ExtendedStatusListener.java	(revision 0)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/ExtendedStatusListener.java	(revision 3677)
@@ -0,0 +1,7 @@
+package org.globus.cog.abstraction.coaster.service.job.manager;
+
+import org.globus.cog.abstraction.interfaces.Status;
+
+public interface ExtendedStatusListener {
+    public void statusChanged(Status s, String out, String err);
+}
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/TaskNotifier.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/TaskNotifier.java	(revision 3676)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/TaskNotifier.java	(working copy)
@@ -9,11 +9,7 @@
  */
 package org.globus.cog.abstraction.coaster.service.job.manager;
 
-import java.util.LinkedList;
-import java.util.TimerTask;
-
 import org.apache.log4j.Logger;
-import org.globus.cog.abstraction.coaster.service.CoasterService;
 import org.globus.cog.abstraction.coaster.service.JobStatusCommand;
 import org.globus.cog.abstraction.impl.common.StatusEvent;
 import org.globus.cog.abstraction.impl.execution.coaster.NotificationManager;
@@ -26,64 +22,47 @@
 import org.globus.cog.karajan.workflow.service.commands.Command;
 import org.globus.cog.karajan.workflow.service.commands.Command.Callback;
 
-public class TaskNotifier implements StatusListener, Callback {
+public class TaskNotifier implements StatusListener, ExtendedStatusListener, Callback {
     public static final Logger logger = Logger.getLogger(TaskNotifier.class);
 
-    public static final int CONGESTION_THRESHOLD = 64;
-
     private ChannelContext channelContext;
     private Task task;
     private KarajanChannel channel;
     private static int notacknowledged;
-    private static LinkedList<Entry> queue;
 
-    static {
-        queue = new LinkedList<Entry>();
-        CoasterService.addPeriodicWatchdog(new TimerTask() {
-            public void run() {
-                synchronized (TaskNotifier.class) {
-                    if (logger.isInfoEnabled()) {
-                        logger.info("Congestion queue size: " + queue.size());
-                    }
-                    checkQueue();
-                }
-            }
-        }, 10000);
-    }
-
     public TaskNotifier(Task task, ChannelContext channelContext) {
         this.task = task;
         this.channelContext = channelContext;
-        task.addStatusListener(this);
-        NotificationManager.getDefault().registerTask(task.getIdentity().getValue(), task);
+        this.task.addStatusListener(this);
+        NotificationManager.getDefault().registerTask(task.getIdentity().getValue(), task, this);
     }
-
+    
+   
     public void statusChanged(StatusEvent event) {
-        int code = event.getStatus().getStatusCode();
+        sendStatus(this, event.getStatus(), null, null);
+    }
+
+    public void statusChanged(Status s, String out, String err) {
+        int code = s.getStatusCode();
         if (code != Status.SUBMITTED && code != Status.SUBMITTING) {
-            sendStatus(this, event.getStatus());
+            sendStatus(this, s, out, err);
         }
     }
 
-    public static synchronized void sendStatus(TaskNotifier tn, Status s) {
-        if (notacknowledged >= CONGESTION_THRESHOLD) {
-            queue.addLast(new Entry(tn, s));
-        }
-        else {
-            String taskId = tn.task.getIdentity().toString();
-            JobStatusCommand c = new JobStatusCommand(taskId, s);
-            try {
-                tn.channel = ChannelManager.getManager().reserveChannel(tn.channelContext);
-                if (s.isTerminal()) {
-                    ChannelManager.getManager().releaseLongTerm(tn.channel);
-                }
-                c.executeAsync(tn.channel, tn);
-                notacknowledged++;
+    public static synchronized void sendStatus(TaskNotifier tn, Status s, String out, String err) {
+        String taskId = tn.task.getIdentity().toString();
+        JobStatusCommand c = new JobStatusCommand(taskId, s, out, err);
+        try {
+            tn.channel = ChannelManager.getManager().reserveChannel(tn.channelContext);
+            if (s.isTerminal()) {
+                ChannelManager.getManager().releaseLongTerm(tn.channel);
             }
-            catch (Exception e) {
-                logger.warn("Failed to send task notification", e);
-            }
+            c.executeAsync(tn.channel, tn);
+            notacknowledged++;
         }
+        catch (Exception e) {
+            logger.warn("Failed to send task notification", e);
+        }
     }
     
     public void errorReceived(Command cmd, String msg, Exception t) {
@@ -91,7 +70,6 @@
         ChannelManager.getManager().releaseChannel(channel);
         synchronized(TaskNotifier.class) {
             notacknowledged--;
-            checkQueue();
         }
     }
 
@@ -99,24 +77,6 @@
         ChannelManager.getManager().releaseChannel(channel);
         synchronized(TaskNotifier.class) {
             notacknowledged--;
-            checkQueue();
         }
     }
-
-    private static void checkQueue() {
-        if (notacknowledged < CONGESTION_THRESHOLD && !queue.isEmpty()) {
-            Entry e = queue.removeFirst();
-            sendStatus(e.tn, e.s);
-        }
-    }
-    
-    private static class Entry {
-        TaskNotifier tn;
-        Status s;
-        
-        public Entry(TaskNotifier tn, Status s) {
-            this.tn = tn;
-            this.s = s;
-        }
-    }
 }
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/BlockTask.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/BlockTask.java	(revision 3676)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/BlockTask.java	(working copy)
@@ -107,12 +107,11 @@
         js.addArgument(join(settings.getCallbackURIs(), ","));
         js.addArgument(block.getId());
 
-        if (settings.getWorkerLoggingLevel().equals("NONE"))
-        {
+        settings.setWorkerLoggingLevel("DEBUG");
+        if (settings.getWorkerLoggingLevel().equals("NONE")) {
           js.addArgument("NOLOGGING");
         }
-        else
-        {
+        else {
         	String logDir = settings.getWorkerLoggingDirectory();
         	if (logDir.equals("DEFAULT"))
         		js.addArgument(Bootstrap.LOG_DIR.getAbsolutePath());
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Job.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Job.java	(revision 3676)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Job.java	(working copy)
@@ -15,6 +15,7 @@
 import org.apache.log4j.Logger;
 import org.globus.cog.abstraction.impl.common.StatusImpl;
 import org.globus.cog.abstraction.impl.common.execution.WallTime;
+import org.globus.cog.abstraction.impl.execution.coaster.NotificationManager;
 import org.globus.cog.abstraction.interfaces.JobSpecification;
 import org.globus.cog.abstraction.interfaces.Status;
 import org.globus.cog.abstraction.interfaces.Task;
@@ -179,7 +180,8 @@
     }
 
     public void fail(String message, Exception e) {
-        task.setStatus(new StatusImpl(Status.FAILED, message, e));
+        NotificationManager.getDefault().notificationReceived(task.getIdentity().getValue(), 
+            new StatusImpl(Status.FAILED, message, e), null, null);
     }
 
     public Task getTask() {
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java	(revision 3676)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java	(working copy)
@@ -346,7 +346,7 @@
             bqp.jobTerminated(running);
         }
         if (running != null) {
-            running.fail("Task failed: " + msg, e);
+            running.fail("Block task failed: " + msg, e);
         }
     }
 
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Mpiexec.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Mpiexec.java	(revision 3676)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Mpiexec.java	(working copy)
@@ -13,13 +13,11 @@
 
 import org.apache.log4j.Logger;
 import org.globus.cog.abstraction.impl.common.IdentityImpl;
-import org.globus.cog.abstraction.impl.common.StatusEvent;
 import org.globus.cog.abstraction.impl.execution.coaster.NotificationManager;
 import org.globus.cog.abstraction.interfaces.ExecutionService;
 import org.globus.cog.abstraction.interfaces.Identity;
 import org.globus.cog.abstraction.interfaces.JobSpecification;
 import org.globus.cog.abstraction.interfaces.Status;
-import org.globus.cog.abstraction.interfaces.StatusListener;
 import org.globus.cog.abstraction.interfaces.Task;
 import org.globus.cog.util.ProcessListener;
 import org.globus.cog.util.ProcessMonitor;
@@ -33,7 +31,7 @@
  *
  * @author wozniak
  */
-public class Mpiexec implements ProcessListener, StatusListener {
+public class Mpiexec implements ProcessListener, ExtendedStatusListener {
 
     public static final Logger logger = Logger.getLogger(Mpiexec.class);
 
@@ -336,15 +334,12 @@
         // Clone original job as proxy job
         Task clone = (Task) job.getTask().clone();
 
-        // Set clone to notify this Mpiexec instance
-        clone.addStatusListener(this);
-
         // Update Task Identity and set Notification
         Identity cloneID = new IdentityImpl(clone.getIdentity());
         String value = cloneID.getValue() + ":" + i;
         cloneID.setValue(value);
         clone.setIdentity(cloneID);
-        NotificationManager.getDefault().registerTask(value, clone);
+        NotificationManager.getDefault().registerTask(value, clone, this);
 
         // Update Task Specification
         JobSpecification spec = (JobSpecification) clone.getSpecification();
@@ -399,10 +394,10 @@
      * Multiplex Hydra proxy StatusEvents into the StatusEvents for
      * the original job
      */
-    public void statusChanged(StatusEvent event) {
-        logger.debug(event);
+    public void statusChanged(Status s, String out, String err) {
+        logger.debug(s);
         synchronized (statusCount) {
-            int code = event.getStatus().getStatusCode();
+            int code = s.getStatusCode();
             Integer count = statusCount.get(code);
             if (count == null)
                 count = 1;
@@ -410,12 +405,11 @@
                 count++;
             statusCount.put(code, count);
             if (count == proxies.size())
-                propagate(event);
+                propagate(s);
         }
     }
 
-    private void propagate(StatusEvent event) {
-        Status s = event.getStatus();
+    private void propagate(Status s) {
         logger.debug("propagating: to: " + job + " " + s);
         job.getTask().setStatus(s);
     }
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/SubmitJobHandler.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/SubmitJobHandler.java	(revision 3676)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/SubmitJobHandler.java	(working copy)
@@ -92,6 +92,7 @@
         spec.setStdInput(helper.read("stdin"));
         spec.setStdOutput(helper.read("stdout"));
         spec.setStdError(helper.read("stderr"));
+        spec.setRedirected(helper.read("redirect") != null);
         String s;
         while ((s = helper.read("arg")) != null) {
             spec.addArgument(s);
Index: modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java	(revision 3676)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java	(working copy)
@@ -16,6 +16,7 @@
 import java.util.Map;
 
 import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.coaster.service.job.manager.ExtendedStatusListener;
 import org.globus.cog.abstraction.impl.common.StatusImpl;
 import org.globus.cog.abstraction.interfaces.ServiceContact;
 import org.globus.cog.abstraction.interfaces.Status;
@@ -42,56 +43,58 @@
     /** 
        Map from Task IDs to Tasks
      */
-    private Map<String, Task> tasks;
+    private Map<String, TaskListenerPair> listeners;
     
     /**
        Map from Task IDs to Status updates that arrived before the 
        Task existed in the Map {@link tasks}
      */
-    private Map<String, List<Status>> pending;
+    private Map<String, List<ExtendedStatus>> pending;
     private long lastNotificationTime;
 
     public NotificationManager() {
-        tasks = new HashMap<String, Task>();
-        pending = new HashMap<String, List<Status>>();
+        listeners = new HashMap<String, TaskListenerPair>();
+        pending = new HashMap<String, List<ExtendedStatus>>();
         lastNotificationTime = System.currentTimeMillis();
     }
 
-    public void registerTask(String id, Task task) {
-        List<Status> p;
-        synchronized (tasks) {
-            tasks.put(id, task);
+    public void registerTask(String id, Task task, ExtendedStatusListener l) {
+        List<ExtendedStatus> p;
+        synchronized (listeners) {
+            listeners.put(id, new TaskListenerPair(task, l));
             p = pending.remove(id);
         }
-        if (p != null)
-            for (Status status : p)
-                setStatus(task, status);
+        if (p != null) {
+            for (ExtendedStatus e : p) {
+                notify(l, e);
+            }
+        }
     }
 
-    public void notificationReceived(String id, Status s) {
+    public void notificationReceived(String id, Status s, String out, String err) {
         if (logger.isDebugEnabled())
             logger.debug("recvd: for: " + id + " " + s);
-        Task task;
-        synchronized (tasks) {
+        TaskListenerPair l;
+        synchronized (listeners) {
             if (s.isTerminal()) {
-                task = tasks.remove(id);
+                l = listeners.remove(id);
             }
             else {
-                task = tasks.get(id);
+                l = listeners.get(id);
             }
             lastNotificationTime = System.currentTimeMillis();
-            if (task == null) {
-            	addPending(id, s);
+            if (l == null) {
+            	addPending(id, new ExtendedStatus(s, out, err));
             }
         }
-        if (task != null) {
-            setStatus(task, s);
+        if (l != null) {
+            l.listener.statusChanged(s, out, err);
         }
     }
 
     public long getIdleTime() {
-        synchronized (tasks) {
-            if (tasks.size() == 0 && lastNotificationTime != 0) {
+        synchronized (listeners) {
+            if (listeners.size() == 0 && lastNotificationTime != 0) {
                 return System.currentTimeMillis() - lastNotificationTime;
             }
             else {
@@ -101,50 +104,72 @@
     }
 
     public void notIdle() {
-        synchronized (tasks) {
+        synchronized (listeners) {
             lastNotificationTime = System.currentTimeMillis();
         }
     }
 
     public int getActiveTaskCount() {
-        synchronized (tasks) {
-            return tasks.size();
+        synchronized (listeners) {
+            return listeners.size();
         }
     }
 
-    private void setStatus(Task t, Status s) {
+    private void notify(ExtendedStatusListener l, ExtendedStatus e) {
         try {
-            t.setStatus(s);
+            l.statusChanged(e.status, e.out, e.err);
         }
-        catch (Exception e) {
-            logger.warn("Could not set task status", e);
+        catch (Exception ex) {
+            logger.warn("Could not set task status", ex);
         }
     }
 
-    private void addPending(String id, Status status) {
-        List<Status> p = pending.get(id);
+    private void addPending(String id, ExtendedStatus es) {
+        List<ExtendedStatus> p = pending.get(id);
         if (p == null) {
-            p = new LinkedList<Status>();
+            p = new LinkedList<ExtendedStatus>();
             pending.put(id, p);
         }
-        p.add(status);
+        p.add(es);
     }
 
     public void serviceTaskEnded(ServiceContact contact1, 
                                  String msg) {
-        List<Task> ts;
-        synchronized (tasks) {
-            ts = new ArrayList<Task>(tasks.values());
+        List<Map.Entry<String, TaskListenerPair>> ts;
+        synchronized (listeners) {
+            ts = new ArrayList<Map.Entry<String, TaskListenerPair>>(listeners.entrySet());
         }
         logger.info(contact1.toString());
-        for (Task t : ts) {
+        for (Map.Entry<String, TaskListenerPair> e: ts) {
             ServiceContact contact2 = 
-                t.getService(0).getServiceContact();
+                e.getValue().task.getService(0).getServiceContact();
             logger.info(contact2.toString());
             if (contact2.equals(contact1))
-                notificationReceived(t.getIdentity().toString(), 
+                notificationReceived(e.getKey(), 
                                      new StatusImpl(Status.FAILED, 
-                                                    msg, null));
+                                                    msg, null), null, null);
         }
     }
+    
+    private static final class ExtendedStatus {
+        public final Status status;
+        public final String out;
+        public final String err;
+        
+        public ExtendedStatus(Status status, String out, String err) {
+            this.status = status;
+            this.out = out;
+            this.err = err;
+        }
+    }
+    
+    private static final class TaskListenerPair {
+        public final Task task;
+        public final ExtendedStatusListener listener;
+        
+        public TaskListenerPair(Task task, ExtendedStatusListener listener) {
+            this.task = task;
+            this.listener = listener;
+        }
+    }
 }
Index: modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java	(revision 3676)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java	(working copy)
@@ -15,6 +15,7 @@
 import java.util.regex.Pattern;
 
 import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.coaster.service.job.manager.ExtendedStatusListener;
 import org.globus.cog.abstraction.coaster.service.local.CoasterResourceTracker;
 import org.globus.cog.abstraction.coaster.service.local.LocalRequestManager;
 import org.globus.cog.abstraction.coaster.service.local.LocalService;
@@ -42,7 +43,7 @@
 import org.globus.cog.karajan.workflow.service.commands.Command.Callback;
 import org.ietf.jgss.GSSCredential;
 
-public class JobSubmissionTaskHandler extends AbstractDelegatedTaskHandler implements Callback {
+public class JobSubmissionTaskHandler extends AbstractDelegatedTaskHandler implements Callback, ExtendedStatusListener {
     private static Logger logger = Logger.getLogger(JobSubmissionTaskHandler.class);
 
     private static Set<Object> configured, configuring;
@@ -235,10 +236,21 @@
             if (logger.isDebugEnabled()) {
                 logger.debug("Submitted task " + getTask() + ". Job id: " + jobid);
             }
-            NotificationManager.getDefault().registerTask(jobid, getTask());
+            NotificationManager.getDefault().registerTask(jobid, getTask(), this);
         }
     }
 
+    public void statusChanged(Status s, String out, String err) {
+        Task t = getTask();
+        if (out != null) {
+            t.setStdOutput(out);
+        }
+        if (err != null) {
+            t.setStdError(err);
+        }
+        t.setStatus(s);
+    }
+
     public boolean getAutostart() {
         return autostart;
     }
@@ -363,15 +375,16 @@
         Task t = new TaskImpl();
         t.setType(Task.JOB_SUBMISSION);
         JobSpecification js = new JobSpecificationImpl();
-        js.setExecutable("/bin/sleep");
+        js.setExecutable("/bin/date");
         int base = (int) (rnd.nextDouble() * 20) + 5;
-        js.addArgument(String.valueOf(base + (int) (rnd.nextDouble() * base)));
+        //js.addArgument(String.valueOf(base + (int) (rnd.nextDouble() * base)));
         js.setAttribute("maxwalltime", "00:00:" + String.valueOf(base * 2));
-        js.setAttribute("slots", "8");
+        js.setAttribute("slots", "2");
         js.setAttribute("lowOverallocation", "6");
-        js.setAttribute("nodeGranularity", "8");
-        js.setAttribute("maxNodes", "8");
-        js.setAttribute("remoteMonitorEnabled", "true");
+        js.setAttribute("nodeGranularity", "1");
+        js.setAttribute("maxNodes", "2");
+        js.setRedirected(true);
+        //js.setAttribute("remoteMonitorEnabled", "true");
         t.setSpecification(js);
         ExecutionService s = new ExecutionServiceImpl();
         // s.setServiceContact(new ServiceContactImpl("localhost"));
@@ -388,7 +401,7 @@
         // JobSubmissionTaskHandler th = new JobSubmissionTaskHandler(
         // AbstractionFactory.newExecutionTaskHandler("local"));
         JobSubmissionTaskHandler th = new JobSubmissionTaskHandler();
-        // th.setAutostart(true);
+        th.setAutostart(true);
         th.submit(t);
         return t;
     }
@@ -400,11 +413,11 @@
         	rnd = new Random();
         	rnd.setSeed(10L);
             long s = System.currentTimeMillis();
-            Task[] ts = new Task[2048];
+            Task[] ts = new Task[1];
             for (int i = 0; i < ts.length; i++) {
                 ts[i] = submitTask();
-                if (i % 100 == 0) {
-                    System.err.println(i + " submitted");
+                if (i % 1 == 0) {
+                    System.err.println((i + 1) + " submitted");
                 }
             }
             for (int i = 0; i < ts.length; i++) {
@@ -416,6 +429,9 @@
                         ts[i].getStatus().getException().printStackTrace();
                     }
                 }
+                else {
+                    System.out.println(ts[i].getStdOutput());
+                }
                 if (i % 100 == 0 && i > 0) {
                     System.err.println(i + " done");
                 }
Index: modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/SubmitJobCommand.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/SubmitJobCommand.java	(revision 3676)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/SubmitJobCommand.java	(working copy)
@@ -96,6 +96,10 @@
         add(sb, "stdin", spec.getStdInput());
         add(sb, "stdout", spec.getStdOutput());
         add(sb, "stderr", spec.getStdError());
+        
+        if (spec.isRedirected()) {
+            add(sb, "redirect", true);
+        }
 
         for (String arg : spec.getArgumentsAsList())
             add(sb, "arg", arg);
Index: modules/provider-coaster/resources/worker.pl
===================================================================
--- modules/provider-coaster/resources/worker.pl	(revision 3676)
+++ modules/provider-coaster/resources/worker.pl	(working copy)
@@ -126,6 +126,7 @@
 # TODO: Make this configurable (#537)
 use constant MAX_RECONNECT_ATTEMPTS => 3;
 use constant NEVER => 9999999999;
+use constant NULL_TIMESTAMP => "\x00\x00\x00\x00\x00\x00\x00\x00";
 
 use constant JOB_CHECK_INTERVAL => 0.1; # seconds
 
@@ -137,6 +138,9 @@
 use constant IOBUFSZ => 32768;
 use constant IOBLOCKSZ => 8;
 
+# Maximum size of re-directed output
+use constant MAX_OUT_REDIR_SIZE => 2048;
+
 # If true, enable a profile result that is written to the log
 my $PROFILE = 0;
 # Contains tuples (EVENT, PID, TIMESTAMP) (flattened)
@@ -1042,9 +1046,16 @@
 	my ($jobid, $statusCode, $errorCode, $msg) = @_;
 	
 	queueCmd((nullCB(), "JOBSTATUS", $jobid, 
-			encodeInt($statusCode), encodeInt($errorCode), $msg));
+			encodeInt($statusCode), encodeInt($errorCode), $msg, NULL_TIMESTAMP));
 }
 
+sub queueJobStatusCmdExt {
+	my ($jobid, $statusCode, $errorCode, $msg, $out, $err) = @_;
+	
+	queueCmd((nullCB(), "JOBSTATUS", $jobid, 
+			encodeInt($statusCode), encodeInt($errorCode), $msg, NULL_TIMESTAMP, $out, $err));
+}
+
 sub dieNicely {
 	my ($msg) = @_;
 	
@@ -1529,16 +1540,80 @@
 	}
 }
 
+sub readFile {
+	my ($jobid, $fname) = @_;
+	
+	wlog DEBUG, "$jobid Reading $fname\n";
+	if (-e $fname) {
+		my $fd;
+		my $content;
+		
+		$content = "";
+		
+		open($fd, "<", $fname) or return "Error: could not open $fname";
+		while (<$fd>) {
+			chomp;
+			$content = $content . $_;
+			if (length($content) > MAX_OUT_REDIR_SIZE) {
+				close($fd);
+				$content = $content . "\n<output truncated>";
+				last;
+			}
+		}
+		close($fd);
+		wlog DEBUG, "$jobid $fname: $content\n";
+		return $content;
+	}
+	else {
+		wlog DEBUG, "$jobid $fname does not exist\n";
+		return "";
+	}
+}
+
+sub readFiles {
+	my ($jobid) = @_;
+	
+	my $pid = $JOBDATA{$jobid}{"pid"};
+	
+	return (readFile($jobid, tmpSFile($pid, "out")), readFile($jobid, tmpSFile($pid, "err")));
+}
+
 sub sendStatus {
 	my ($jobid) = @_;
 
 	my $ec = $JOBDATA{$jobid}{"exitcode"};
-
+	
+	my $stdoutRedir;
+	my $stderrRedir;
+	my $redirect;
+	
+	$redirect = 0;
+	
+	if (defined $JOBDATA{$jobid}{"job"}{"redirect"}) {
+		wlog DEBUG, "$jobid Output is redirected\n";
+		($stdoutRedir, $stderrRedir) = readFiles($jobid);
+		$redirect = 1;
+	}
+	else {
+		$stdoutRedir = "";
+		$stderrRedir = "";
+	}
+	
 	if ($ec == 0) {
-		queueJobStatusCmd($jobid, COMPLETED, 0, "");
+		if ($redirect) {
+			queueJobStatusCmdExt($jobid, COMPLETED, 0, "", $stdoutRedir, $stderrRedir);
+		}
+		else {
+			queueJobStatusCmd($jobid, COMPLETED, 0, "");
+		}
 	}
 	else {
-		queueJobStatusCmd($jobid, FAILED, $ec, getExitMessage($jobid, $ec));
+		if ($redirect) {
+			queueJobStatusCmdExt($jobid, FAILED, $ec, getExitMessage($jobid, $ec), $stdoutRedir, $stderrRedir);
+		}
+		else {
+			queueJobStatusCmd($jobid, FAILED, $ec, getExitMessage($jobid, $ec));
+		}
 	}
 }
 
@@ -1833,7 +1908,7 @@
 	pipe($PARENT_R, $CHILD_W);
 
 	$pid = fork();
-
+	
 	if (defined($pid)) {
 		if ($pid == 0) {
 			close $PARENT_R;
@@ -1843,6 +1918,7 @@
 		else {
 			wlog INFO, "$JOBID Forked process $pid. Waiting for its completion\n";
 			close $CHILD_W;
+			$JOBDATA{$JOBID}{"pid"} = $pid;
 			$JOBS_RUNNING++;
 			$JOBWAITDATA{$JOBID} = {
 				pid => $pid,
@@ -1970,11 +2046,17 @@
 	return 1;
 }
 
+sub tmpSFile {
+	my ($pid, $suffix) = @_;
+	
+	return "/tmp/$pid.$suffix";
+}
+
 sub runjob {
 	my ($WR, $JOB, $JOBARGS, $JOBENV, $JOBSLOT, $WORKERPID) = @_;
 	my $executable = $$JOB{"executable"};
-	my $stdout = $$JOB{"stdout"};
-	my $stderr = $$JOB{"stderr"};
+	my $sout = $$JOB{"stdout"};
+	my $serr = $$JOB{"stderr"};
 
 	my $cwd = getcwd();
 	# wlog DEBUG, "CWD: $cwd\n";
@@ -1987,19 +2069,24 @@
     $ENV{"SWIFT_WORKER_PID"} = $WORKERPID;
 	unshift @$JOBARGS, $executable;
 	wlog DEBUG, "Command: @$JOBARGS\n";
-	if (defined $$JOB{directory}) {
+	if (defined $$JOB{"directory"}) {
 		wlog DEBUG, "chdir: $$JOB{directory}\n";
 	    chdir $$JOB{directory};
 	}
-	if (defined $stdout) {
-		wlog DEBUG, "STDOUT: $stdout\n";
+	if (defined $$JOB{"redirect"}) {
+		wlog DEBUG, "Redirection is on\n";
+		$sout = tmpSFile($$, "out");
+		$serr = tmpSFile($$, "err");
+	}
+	if (defined $sout) {
+		wlog DEBUG, "STDOUT: $sout\n";
 		close STDOUT;
-		open STDOUT, ">$stdout" or dieNicely("Cannot redirect STDOUT");
+		open STDOUT, ">$sout" or dieNicely("Cannot redirect STDOUT");
 	}
-	if (defined $stderr) {
-		wlog DEBUG, "STDERR: $stderr\n";
+	if (defined $serr) {
+		wlog DEBUG, "STDERR: $serr\n";
 		close STDERR;
-		open STDERR, ">$stderr" or dieNicely("Cannot redirect STDERR");
+		open STDERR, ">$serr" or dieNicely("Cannot redirect STDERR");
 	}
 	close STDIN;
 



More information about the Swift-commit mailing list