[Swift-commit] r3881 - in branches/release-0.92/src/org/griphyn/vdl/karajan/lib: . replication
noreply at svn.ci.uchicago.edu
noreply at svn.ci.uchicago.edu
Thu Jan 6 17:19:27 CST 2011
Author: hategan
Date: 2011-01-06 17:19:27 -0600 (Thu, 06 Jan 2011)
New Revision: 3881
Modified:
branches/release-0.92/src/org/griphyn/vdl/karajan/lib/Execute.java
branches/release-0.92/src/org/griphyn/vdl/karajan/lib/replication/ReplicationManager.java
Log:
removed job info message: it is available by logging abstraction classes; use setStack(); remove done jobs from running map in ReplicationManager; also remove canceled jobs from running map in ReplicationManager to prevent repeated cancelling in case the provider does not properly implement cancel()
Modified: branches/release-0.92/src/org/griphyn/vdl/karajan/lib/Execute.java
===================================================================
--- branches/release-0.92/src/org/griphyn/vdl/karajan/lib/Execute.java 2011-01-06 22:40:54 UTC (rev 3880)
+++ branches/release-0.92/src/org/griphyn/vdl/karajan/lib/Execute.java 2011-01-06 23:19:27 UTC (rev 3881)
@@ -15,6 +15,7 @@
import org.globus.cog.karajan.util.TypeUtil;
import org.globus.cog.karajan.workflow.ExecutionException;
import org.globus.cog.karajan.workflow.KarajanRuntimeException;
+import org.globus.cog.karajan.workflow.events.EventBus;
import org.globus.cog.karajan.workflow.futures.FutureVariableArguments;
import org.globus.cog.karajan.workflow.nodes.grid.GridExec;
import org.griphyn.vdl.karajan.lib.replication.CanceledReplicaException;
@@ -48,9 +49,7 @@
registerReplica(stack, task);
log(task, stack);
scheduler.addJobStatusListener(this, task);
- synchronized (tasks) {
- tasks.put(task, stack);
- }
+ setStack(task, stack);
scheduler.enqueue(task, constraints);
}
catch (CanceledReplicaException e) {
@@ -72,19 +71,6 @@
if (logger.isDebugEnabled()) {
logger.debug("jobid="+jobid+" task=" + task);
}
- else if (logger.isInfoEnabled()) {
- Specification spec = task.getSpecification();
- if (spec instanceof JobSpecification) {
- JobSpecification jobspec = (JobSpecification) spec;
- logger.info("Submit: " +
- "in: " + jobspec.getDirectory() +
- " command: " + jobspec.getExecutable() +
- " " + jobspec.getArguments());
- }
- else {
- logger.info("Submit: " + spec);
- }
- }
}
protected void registerReplica(VariableStack stack, Task task) throws CanceledReplicaException {
@@ -115,6 +101,9 @@
getReplicationManager(stack).active(task, e.getStatus().getTime());
((FutureVariableArguments) A_REPLICATION_CHANNEL.getValue(stack)).close();
}
+ else if (e.getStatus().isTerminal()) {
+ getReplicationManager(stack).terminated(task);
+ }
else if (c == ReplicationManager.STATUS_NEEDS_REPLICATION) {
RuntimeStats.setProgress(stack, "Replicating");
((FutureVariableArguments) A_REPLICATION_CHANNEL.getValue(stack)).append(Boolean.TRUE);
Modified: branches/release-0.92/src/org/griphyn/vdl/karajan/lib/replication/ReplicationManager.java
===================================================================
--- branches/release-0.92/src/org/griphyn/vdl/karajan/lib/replication/ReplicationManager.java 2011-01-06 22:40:54 UTC (rev 3880)
+++ branches/release-0.92/src/org/griphyn/vdl/karajan/lib/replication/ReplicationManager.java 2011-01-06 23:19:27 UTC (rev 3881)
@@ -5,7 +5,6 @@
import java.util.Date;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import org.apache.log4j.Logger;
@@ -30,7 +29,7 @@
private int n;
private long s;
private long s2;
- private Map queued, running;
+ private Map<Task, Date> queued, running;
private int minQueueTime, limit;
private boolean enabled;
private ReplicationGroups replicationGroups;
@@ -39,8 +38,8 @@
public ReplicationManager(Scheduler scheduler) {
this.replicationGroups = new ReplicationGroups(scheduler);
this.scheduler = scheduler;
- queued = new HashMap();
- running = new HashMap();
+ queued = new HashMap<Task, Date>();
+ running = new HashMap<Task, Date>();
try {
minQueueTime = Integer.parseInt(VDL2Config.getConfig().getProperty(
"replication.min.queue.time"));
@@ -80,7 +79,7 @@
if (enabled) {
Date submitted;
synchronized (this) {
- submitted = (Date) queued.remove(task);
+ submitted = queued.remove(task);
registerRunning(task, time);
if (submitted != null) {
long delta = (time.getTime() - submitted.getTime()) / 1000;
@@ -104,7 +103,9 @@
seconds = WallTime.timeToSeconds(walltime.toString());
}
Date deadline = new Date(time.getTime() + WALLTIME_DEADLINE_MULTIPLIER * seconds * 1000);
- running.put(task, deadline);
+ synchronized (this) {
+ running.put(task, deadline);
+ }
}
public synchronized int getN() {
@@ -130,26 +131,21 @@
}
public void checkTasks() {
- Map m, r;
+ Map<Task, Date> m, r;
synchronized (this) {
- m = new HashMap(queued);
- r = new HashMap(running);
+ m = new HashMap<Task, Date>(queued);
+ r = new HashMap<Task, Date>(running);
}
- Iterator i;
- i = m.entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry e = (Map.Entry) i.next();
- Task t = (Task) e.getKey();
- Date d = (Date) e.getValue();
+ for (Map.Entry<Task, Date> e : m.entrySet()) {
+ Task t = e.getKey();
+ Date d = e.getValue();
if (shouldBeReplicated(t, d)) {
replicationGroups.requestReplica(t);
}
}
- i = r.entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry e = (Map.Entry) i.next();
- Task t = (Task) e.getKey();
- Date d = (Date) e.getValue();
+ for (Map.Entry<Task, Date> e : r.entrySet()) {
+ Task t = e.getKey();
+ Date d = e.getValue();
if (isOverDeadline(t, d)) {
logger.info(t + ": deadline passed. Cancelling job.");
cancelTask(t);
@@ -183,5 +179,15 @@
private void cancelTask(Task t) {
scheduler.cancelTask(t, "Walltime exceeded");
+ // prevent repeated cancelling in case the provider doesn't support cancel()
+ synchronized (this) {
+ running.remove(t);
+ }
}
+
+ public void terminated(Task task) {
+ synchronized (this) {
+ running.remove(task);
+ }
+ }
}
\ No newline at end of file
More information about the Swift-commit
mailing list