[Swift-commit] r6335 - in branches/faster: libexec src/org/griphyn/vdl/karajan/lib

hategan at ci.uchicago.edu hategan at ci.uchicago.edu
Wed Mar 6 01:24:14 CST 2013


Author: hategan
Date: 2013-03-06 01:24:13 -0600 (Wed, 06 Mar 2013)
New Revision: 6335

Modified:
   branches/faster/libexec/swift.k
   branches/faster/src/org/griphyn/vdl/karajan/lib/Execute.java
Log:
when replication is disabled, don't use the mechanism used by replication to start jobs, but run directly

Modified: branches/faster/libexec/swift.k
===================================================================
--- branches/faster/libexec/swift.k	2013-03-06 07:23:14 UTC (rev 6334)
+++ branches/faster/libexec/swift.k	2013-03-06 07:24:13 UTC (rev 6335)
@@ -199,17 +199,37 @@
 						throttled {
 							setProgress(progress, "Selecting site")
 							restartOnError(number(swift:configProperty("execution.retries"))) {
-								replicationChannel := channel:new()
-								//trigger the first job
-								discard(append(replicationChannel, true)) 
-								replicationGroup := UID()
-								parallelFor(i, replicationChannel) {
+								if (swift:configProperty("replication.enabled") == "true") {
+									replicationChannel := channel:new()
+									//trigger the first job
+									discard(append(replicationChannel, true)) 
+									replicationGroup := UID()
+									parallelFor(i, replicationChannel) {
+										try {
+											execute2(
+												progress, 
+												tr, if(arguments != null, arguments = unwrapClosedList(arguments)),
+												stdin=stdin, stdout=stdout, stderr=stderr, attributes=attributes,
+												stagein, stageout, restartout, replicationGroup, replicationChannel
+											)
+										}
+										else catch(exception) {
+											if (matches(exception, "^Abort$")) {
+												// ignored
+											}
+											else {
+												throw(exception)
+											}
+										}
+									}
+								}
+								else {
 									try {
 										execute2(
 											progress, 
 											tr, if(arguments != null, arguments = unwrapClosedList(arguments)),
 											stdin=stdin, stdout=stdout, stderr=stderr, attributes=attributes,
-											stagein, stageout, restartout, replicationGroup, replicationChannel
+											stagein, stageout, restartout, null, null
 										)
 									}
 									else catch(exception) {

Modified: branches/faster/src/org/griphyn/vdl/karajan/lib/Execute.java
===================================================================
--- branches/faster/src/org/griphyn/vdl/karajan/lib/Execute.java	2013-03-06 07:23:14 UTC (rev 6334)
+++ branches/faster/src/org/griphyn/vdl/karajan/lib/Execute.java	2013-03-06 07:24:13 UTC (rev 6335)
@@ -42,6 +42,7 @@
 import org.griphyn.vdl.karajan.lib.RuntimeStats.ProgressState;
 import org.griphyn.vdl.karajan.lib.replication.CanceledReplicaException;
 import org.griphyn.vdl.karajan.lib.replication.ReplicationManager;
+import org.griphyn.vdl.util.VDL2Config;
 
 public class Execute extends GridExec {
 	public static final Logger logger = Logger.getLogger(Execute.class);
@@ -53,6 +54,8 @@
 	
 	private VarRef<Context> context;
 	
+	private boolean replicationEnabled;
+	
 	@Override
     protected Signature getSignature() {
 	    Signature sig = super.getSignature();
@@ -67,12 +70,17 @@
     protected void addLocals(Scope scope) {
         super.addLocals(scope);
         context = scope.getVarRef("#context");
+        VDL2Config config = (VDL2Config) context.getValue().getAttribute("SWIFT:CONFIG");
+        replicationEnabled = "true".equals(config.getProperty("replication.enabled"));
     }
 
     @Override
     public void submitScheduled(Scheduler scheduler, Task task, Stack stack, Object constraints) {
 		try {
-			registerReplica(stack, task);
+		    setTaskIdentity(stack, task);
+		    if (replicationEnabled) {
+		        registerReplica(stack, task);
+		    }
 			log(task, stack);
 			
 			TaskStateFuture tsf = new SwiftTaskStateFuture(stack, task, false);
@@ -99,8 +107,6 @@
 	}
 
 	protected void registerReplica(Stack stack, Task task) throws CanceledReplicaException {
-		setTaskIdentity(stack, task);
-		
 		String rg = this.replicationGroup.getValue(stack);
 		if (rg != null) {
 			getReplicationManager(stack).register(rg, task);
@@ -122,7 +128,9 @@
     				ProgressState ps = progress.getValue(stack);
     				if (c == Status.SUBMITTED) {
     				    ps.setState("Submitted");
-    					getReplicationManager(stack).submitted(task, e.getStatus().getTime());
+    				    if (replicationEnabled) {
+    				        getReplicationManager(stack).submitted(task, e.getStatus().getTime());
+    				    }
     				}
     				else if (c == Status.STAGE_IN) {
     				    ps.setState("Stage in");
@@ -132,11 +140,15 @@
     				}
     				else if (c == Status.ACTIVE) {
     					ps.setState("Active");
-    					getReplicationManager(stack).active(task, e.getStatus().getTime());
-    					Execute.this.replicationChannel.getValue(stack).close();
+    					if (replicationEnabled) {
+    					    getReplicationManager(stack).active(task, e.getStatus().getTime());
+    					    Execute.this.replicationChannel.getValue(stack).close();
+    					}
     				}
     				else if (e.getStatus().isTerminal()) {
-    				    getReplicationManager(stack).terminated(task);
+    				    if (replicationEnabled) {
+    				        getReplicationManager(stack).terminated(task);
+    				    }
     				}
     				else if (c == ReplicationManager.STATUS_NEEDS_REPLICATION) {
     					ps.setState("Replicating");




More information about the Swift-commit mailing list