[Swift-commit] cog r3716
swift at ci.uchicago.edu
swift at ci.uchicago.edu
Wed Jul 10 15:50:02 CDT 2013
------------------------------------------------------------------------
r3716 | hategan | 2013-07-10 15:49:08 -0500 (Wed, 10 Jul 2013) | 1 line
Fixed a bug in coasters in which only one batch of jobs would work.
------------------------------------------------------------------------
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/TaskNotifier.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/TaskNotifier.java (revision 3715)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/TaskNotifier.java (working copy)
@@ -34,7 +34,7 @@
this.task = task;
this.channelContext = channelContext;
this.task.addStatusListener(this);
- NotificationManager.getDefault().registerTask(task.getIdentity().getValue(), task, this);
+ NotificationManager.getDefault().registerListener(task.getIdentity().getValue(), task, this);
}
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java (revision 3715)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java (working copy)
@@ -14,17 +14,16 @@
import org.apache.log4j.Logger;
import org.globus.cog.abstraction.coaster.service.CoasterService;
-import org.globus.cog.abstraction.impl.common.StatusEvent;
+import org.globus.cog.abstraction.impl.execution.coaster.NotificationManager;
import org.globus.cog.abstraction.impl.execution.coaster.SubmitJobCommand;
import org.globus.cog.abstraction.interfaces.JobSpecification;
import org.globus.cog.abstraction.interfaces.Status;
-import org.globus.cog.abstraction.interfaces.StatusListener;
import org.globus.cog.abstraction.interfaces.Task;
import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
import org.globus.cog.karajan.workflow.service.commands.Command;
import org.globus.cog.karajan.workflow.service.commands.Command.Callback;
-public class Cpu implements Comparable<Cpu>, Callback, StatusListener {
+public class Cpu implements Comparable<Cpu>, Callback, ExtendedStatusListener {
public static final Logger logger = Logger.getLogger(Cpu.class);
private static PullThread pullThread;
@@ -215,7 +214,8 @@
}
boolean launchSequential() {
- running.getTask().addStatusListener(this);
+ Task t = running.getTask();
+ NotificationManager.getDefault().registerListener(t.getIdentity().getValue(), t, this);
idleTime += timeDiff();
timelast = running.getEndTime();
if (timelast == null) {
@@ -360,14 +360,13 @@
task.setStatus(Status.SUBMITTED);
}
- public synchronized void statusChanged(StatusEvent event) {
+ public synchronized void statusChanged(Status s, String out, String err) {
if (logger.isDebugEnabled()) {
- logger.debug(event);
+ logger.debug(s);
}
- if (event.getStatus().isTerminal()) {
- running.getTask().removeStatusListener(this);
+ if (s.isTerminal()) {
running.setEndTime(Time.now());
- if (event.getStatus().getStatusCode() == Status.FAILED) {
+ if (s.getStatusCode() == Status.FAILED) {
failedJobs++;
totalFailedJobs++;
}
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Mpiexec.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Mpiexec.java (revision 3715)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Mpiexec.java (working copy)
@@ -339,7 +339,7 @@
String value = cloneID.getValue() + ":" + i;
cloneID.setValue(value);
clone.setIdentity(cloneID);
- NotificationManager.getDefault().registerTask(value, clone, this);
+ NotificationManager.getDefault().registerListener(value, clone, this);
// Update Task Specification
JobSpecification spec = (JobSpecification) clone.getSpecification();
Index: modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java (revision 3715)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java (working copy)
@@ -58,7 +58,7 @@
lastNotificationTime = System.currentTimeMillis();
}
- public void registerTask(String id, Task task, ExtendedStatusListener l) {
+ public void registerListener(String id, Task task, ExtendedStatusListener l) {
List<ExtendedStatus> p;
synchronized (listeners) {
listeners.put(id, new TaskListenerPair(task, l));
@@ -74,21 +74,22 @@
public void notificationReceived(String id, Status s, String out, String err) {
if (logger.isDebugEnabled())
logger.debug("recvd: for: " + id + " " + s);
- TaskListenerPair l;
+ TaskListenerPair ls;
synchronized (listeners) {
if (s.isTerminal()) {
- l = listeners.remove(id);
+ ls = listeners.remove(id);
}
else {
- l = listeners.get(id);
+ ls = listeners.get(id);
}
lastNotificationTime = System.currentTimeMillis();
- if (l == null) {
+ if (ls == null) {
addPending(id, new ExtendedStatus(s, out, err));
}
}
- if (l != null) {
- l.listener.statusChanged(s, out, err);
+ if (ls != null) {
+ for (ExtendedStatusListener l : ls.listeners)
+ l.statusChanged(s, out, err);
}
}
@@ -165,11 +166,16 @@
private static final class TaskListenerPair {
public final Task task;
- public final ExtendedStatusListener listener;
+ public final List<ExtendedStatusListener> listeners;
public TaskListenerPair(Task task, ExtendedStatusListener listener) {
this.task = task;
- this.listener = listener;
+ this.listeners = new LinkedList<ExtendedStatusListener>();
+ this.listeners.add(listener);
}
+
+ public void addListener(ExtendedStatusListener l) {
+ this.listeners.add(l);
+ }
}
}
Index: modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java (revision 3715)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java (working copy)
@@ -236,7 +236,7 @@
if (logger.isDebugEnabled()) {
logger.debug("Submitted task " + getTask() + ". Job id: " + jobid);
}
- NotificationManager.getDefault().registerTask(jobid, getTask(), this);
+ NotificationManager.getDefault().registerListener(jobid, getTask(), this);
}
}
More information about the Swift-commit mailing list