[Swift-commit] r3881 - in branches/release-0.92/src/org/griphyn/vdl/karajan/lib: . replication

noreply at svn.ci.uchicago.edu noreply at svn.ci.uchicago.edu
Thu Jan 6 17:19:27 CST 2011


Author: hategan
Date: 2011-01-06 17:19:27 -0600 (Thu, 06 Jan 2011)
New Revision: 3881

Modified:
   branches/release-0.92/src/org/griphyn/vdl/karajan/lib/Execute.java
   branches/release-0.92/src/org/griphyn/vdl/karajan/lib/replication/ReplicationManager.java
Log:
removed job info message: it is available by logging abstraction classes; use setStack(); remove done jobs from running map in ReplicationManager; also remove canceled jobs from running map in ReplicationManager to prevent repeated cancelling in case provide does not properly implement cancel()

Modified: branches/release-0.92/src/org/griphyn/vdl/karajan/lib/Execute.java
===================================================================
--- branches/release-0.92/src/org/griphyn/vdl/karajan/lib/Execute.java	2011-01-06 22:40:54 UTC (rev 3880)
+++ branches/release-0.92/src/org/griphyn/vdl/karajan/lib/Execute.java	2011-01-06 23:19:27 UTC (rev 3881)
@@ -15,6 +15,7 @@
 import org.globus.cog.karajan.util.TypeUtil;
 import org.globus.cog.karajan.workflow.ExecutionException;
 import org.globus.cog.karajan.workflow.KarajanRuntimeException;
+import org.globus.cog.karajan.workflow.events.EventBus;
 import org.globus.cog.karajan.workflow.futures.FutureVariableArguments;
 import org.globus.cog.karajan.workflow.nodes.grid.GridExec;
 import org.griphyn.vdl.karajan.lib.replication.CanceledReplicaException;
@@ -48,9 +49,7 @@
 			registerReplica(stack, task);
 			log(task, stack);
 			scheduler.addJobStatusListener(this, task);
-			synchronized (tasks) {
-				tasks.put(task, stack);
-			}
+			setStack(task, stack);
 			scheduler.enqueue(task, constraints);
 		}
 		catch (CanceledReplicaException e) {
@@ -72,19 +71,6 @@
 	    if (logger.isDebugEnabled()) {
 	        logger.debug("jobid="+jobid+" task=" + task);
 	    }
-	    else if (logger.isInfoEnabled()) {
-	        Specification spec = task.getSpecification();
-	        if (spec instanceof JobSpecification) {
-	            JobSpecification jobspec = (JobSpecification) spec;
-	            logger.info("Submit: " +
-	                "in: " + jobspec.getDirectory() +
-	                " command: " + jobspec.getExecutable() + 
-	                " " + jobspec.getArguments());
-	        }
-	        else {
-	            logger.info("Submit: " + spec);
-	        }
-	    }
 	}
 
 	protected void registerReplica(VariableStack stack, Task task) throws CanceledReplicaException {
@@ -115,6 +101,9 @@
 					getReplicationManager(stack).active(task, e.getStatus().getTime());
 					((FutureVariableArguments) A_REPLICATION_CHANNEL.getValue(stack)).close();
 				}
+				else if (e.getStatus().isTerminal()) {
+				    getReplicationManager(stack).terminated(task);
+				}
 				else if (c == ReplicationManager.STATUS_NEEDS_REPLICATION) {
 					RuntimeStats.setProgress(stack, "Replicating");
 					((FutureVariableArguments) A_REPLICATION_CHANNEL.getValue(stack)).append(Boolean.TRUE);

Modified: branches/release-0.92/src/org/griphyn/vdl/karajan/lib/replication/ReplicationManager.java
===================================================================
--- branches/release-0.92/src/org/griphyn/vdl/karajan/lib/replication/ReplicationManager.java	2011-01-06 22:40:54 UTC (rev 3880)
+++ branches/release-0.92/src/org/griphyn/vdl/karajan/lib/replication/ReplicationManager.java	2011-01-06 23:19:27 UTC (rev 3881)
@@ -5,7 +5,6 @@
 
 import java.util.Date;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.log4j.Logger;
@@ -30,7 +29,7 @@
     private int n;
     private long s;
     private long s2;
-    private Map queued, running;
+    private Map<Task, Date> queued, running;
     private int minQueueTime, limit;
     private boolean enabled;
     private ReplicationGroups replicationGroups;
@@ -39,8 +38,8 @@
     public ReplicationManager(Scheduler scheduler) {
         this.replicationGroups = new ReplicationGroups(scheduler);
         this.scheduler = scheduler;
-        queued = new HashMap();
-        running = new HashMap();
+        queued = new HashMap<Task, Date>();
+        running = new HashMap<Task, Date>();
         try {
             minQueueTime = Integer.parseInt(VDL2Config.getConfig().getProperty(
                     "replication.min.queue.time"));
@@ -80,7 +79,7 @@
         if (enabled) {
             Date submitted;
             synchronized (this) {
-                submitted = (Date) queued.remove(task);
+                submitted = queued.remove(task);
                 registerRunning(task, time);
                 if (submitted != null) {
                     long delta = (time.getTime() - submitted.getTime()) / 1000;
@@ -104,7 +103,9 @@
        		seconds = WallTime.timeToSeconds(walltime.toString());
         }
         Date deadline = new Date(time.getTime() + WALLTIME_DEADLINE_MULTIPLIER * seconds * 1000);
-        running.put(task, deadline);
+        synchronized (this) {
+            running.put(task, deadline);
+        }
     }
 
     public synchronized int getN() {
@@ -130,26 +131,21 @@
     }
 
     public void checkTasks() {
-        Map m, r;
+        Map<Task, Date> m, r;
         synchronized (this) {
-            m = new HashMap(queued);
-            r = new HashMap(running);
+            m = new HashMap<Task, Date>(queued);
+            r = new HashMap<Task, Date>(running);
         }
-        Iterator i;
-        i = m.entrySet().iterator();
-        while (i.hasNext()) {
-            Map.Entry e = (Map.Entry) i.next();
-            Task t = (Task) e.getKey();
-            Date d = (Date) e.getValue();
+        for (Map.Entry<Task, Date> e : m.entrySet()) {
+            Task t = e.getKey();
+            Date d = e.getValue();
             if (shouldBeReplicated(t, d)) {
                 replicationGroups.requestReplica(t);
             }
         }
-        i = r.entrySet().iterator();
-        while (i.hasNext()) {
-            Map.Entry e = (Map.Entry) i.next();
-            Task t = (Task) e.getKey();
-            Date d = (Date) e.getValue();
+        for (Map.Entry<Task, Date> e : r.entrySet()) {
+            Task t = e.getKey();
+            Date d = e.getValue();
             if (isOverDeadline(t, d)) {
                 logger.info(t + ": deadline passed. Cancelling job.");
                 cancelTask(t);
@@ -183,5 +179,15 @@
     
     private void cancelTask(Task t) {
         scheduler.cancelTask(t, "Walltime exceeded");
+        // prevent repeated cancelling in case the provider doesn't support cancel()
+        synchronized (this) {
+            running.remove(t);
+        }
     }
+
+    public void terminated(Task task) {
+        synchronized (this) {
+            running.remove(task);
+        }
+    }
 }
\ No newline at end of file




More information about the Swift-commit mailing list