[Swift-commit] cog r3716

swift at ci.uchicago.edu swift at ci.uchicago.edu
Wed Jul 10 15:50:02 CDT 2013


------------------------------------------------------------------------
r3716 | hategan | 2013-07-10 15:49:08 -0500 (Wed, 10 Jul 2013) | 1 line

fixed only-one-batch-of-jobs-works bug in coasters
------------------------------------------------------------------------
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/TaskNotifier.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/TaskNotifier.java	(revision 3715)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/TaskNotifier.java	(working copy)
@@ -34,7 +34,7 @@
         this.task = task;
         this.channelContext = channelContext;
         this.task.addStatusListener(this);
-        NotificationManager.getDefault().registerTask(task.getIdentity().getValue(), task, this);
+        NotificationManager.getDefault().registerListener(task.getIdentity().getValue(), task, this);
     }
     
    
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java	(revision 3715)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java	(working copy)
@@ -14,17 +14,16 @@
 
 import org.apache.log4j.Logger;
 import org.globus.cog.abstraction.coaster.service.CoasterService;
-import org.globus.cog.abstraction.impl.common.StatusEvent;
+import org.globus.cog.abstraction.impl.execution.coaster.NotificationManager;
 import org.globus.cog.abstraction.impl.execution.coaster.SubmitJobCommand;
 import org.globus.cog.abstraction.interfaces.JobSpecification;
 import org.globus.cog.abstraction.interfaces.Status;
-import org.globus.cog.abstraction.interfaces.StatusListener;
 import org.globus.cog.abstraction.interfaces.Task;
 import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
 import org.globus.cog.karajan.workflow.service.commands.Command;
 import org.globus.cog.karajan.workflow.service.commands.Command.Callback;
 
-public class Cpu implements Comparable<Cpu>, Callback, StatusListener {
+public class Cpu implements Comparable<Cpu>, Callback, ExtendedStatusListener {
     public static final Logger logger = Logger.getLogger(Cpu.class);
 
     private static PullThread pullThread;
@@ -215,7 +214,8 @@
     }
 
     boolean launchSequential() {
-        running.getTask().addStatusListener(this);
+        Task t = running.getTask();
+        NotificationManager.getDefault().registerListener(t.getIdentity().getValue(), t, this);
         idleTime += timeDiff();
         timelast = running.getEndTime();
         if (timelast == null) {
@@ -360,14 +360,13 @@
         task.setStatus(Status.SUBMITTED);
     }
 
-     public synchronized void statusChanged(StatusEvent event) {
+     public synchronized void statusChanged(Status s, String out, String err) {
          if (logger.isDebugEnabled()) {
-             logger.debug(event);
+             logger.debug(s);
          }
-         if (event.getStatus().isTerminal()) {
-             running.getTask().removeStatusListener(this);
+         if (s.isTerminal()) {
              running.setEndTime(Time.now());
-             if (event.getStatus().getStatusCode() == Status.FAILED) {
+             if (s.getStatusCode() == Status.FAILED) {
                  failedJobs++;
                  totalFailedJobs++;
              }
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Mpiexec.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Mpiexec.java	(revision 3715)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Mpiexec.java	(working copy)
@@ -339,7 +339,7 @@
         String value = cloneID.getValue() + ":" + i;
         cloneID.setValue(value);
         clone.setIdentity(cloneID);
-        NotificationManager.getDefault().registerTask(value, clone, this);
+        NotificationManager.getDefault().registerListener(value, clone, this);
 
         // Update Task Specification
         JobSpecification spec = (JobSpecification) clone.getSpecification();
Index: modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java	(revision 3715)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java	(working copy)
@@ -58,7 +58,7 @@
         lastNotificationTime = System.currentTimeMillis();
     }
 
-    public void registerTask(String id, Task task, ExtendedStatusListener l) {
+    public void registerListener(String id, Task task, ExtendedStatusListener l) {
         List<ExtendedStatus> p;
         synchronized (listeners) {
             listeners.put(id, new TaskListenerPair(task, l));
@@ -74,21 +74,22 @@
     public void notificationReceived(String id, Status s, String out, String err) {
         if (logger.isDebugEnabled())
             logger.debug("recvd: for: " + id + " " + s);
-        TaskListenerPair l;
+        TaskListenerPair ls;
         synchronized (listeners) {
             if (s.isTerminal()) {
-                l = listeners.remove(id);
+                ls = listeners.remove(id);
             }
             else {
-                l = listeners.get(id);
+                ls = listeners.get(id);
             }
             lastNotificationTime = System.currentTimeMillis();
-            if (l == null) {
+            if (ls == null) {
             	addPending(id, new ExtendedStatus(s, out, err));
             }
         }
-        if (l != null) {
-            l.listener.statusChanged(s, out, err);
+        if (ls != null) {
+            for (ExtendedStatusListener l : ls.listeners)
+            l.statusChanged(s, out, err);
         }
     }
 
@@ -165,11 +166,16 @@
     
     private static final class TaskListenerPair {
         public final Task task;
-        public final ExtendedStatusListener listener;
+        public final List<ExtendedStatusListener> listeners;
         
         public TaskListenerPair(Task task, ExtendedStatusListener listener) {
             this.task = task;
-            this.listener = listener;
+            this.listeners = new LinkedList<ExtendedStatusListener>();
+            this.listeners.add(listener);
         }
+        
+        public void addListener(ExtendedStatusListener l) {
+            this.listeners.add(l);
+        }
     }
 }
Index: modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java	(revision 3715)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java	(working copy)
@@ -236,7 +236,7 @@
             if (logger.isDebugEnabled()) {
                 logger.debug("Submitted task " + getTask() + ". Job id: " + jobid);
             }
-            NotificationManager.getDefault().registerTask(jobid, getTask(), this);
+            NotificationManager.getDefault().registerListener(jobid, getTask(), this);
         }
     }
 



More information about the Swift-commit mailing list