[Swift-commit] r6335 - in branches/faster: libexec src/org/griphyn/vdl/karajan/lib
hategan at ci.uchicago.edu
hategan at ci.uchicago.edu
Wed Mar 6 01:24:14 CST 2013
Author: hategan
Date: 2013-03-06 01:24:13 -0600 (Wed, 06 Mar 2013)
New Revision: 6335
Modified:
branches/faster/libexec/swift.k
branches/faster/src/org/griphyn/vdl/karajan/lib/Execute.java
Log:
When replication is disabled, run jobs directly instead of starting them through the mechanism that replication uses.
Modified: branches/faster/libexec/swift.k
===================================================================
--- branches/faster/libexec/swift.k 2013-03-06 07:23:14 UTC (rev 6334)
+++ branches/faster/libexec/swift.k 2013-03-06 07:24:13 UTC (rev 6335)
@@ -199,17 +199,37 @@
throttled {
setProgress(progress, "Selecting site")
restartOnError(number(swift:configProperty("execution.retries"))) {
- replicationChannel := channel:new()
- //trigger the first job
- discard(append(replicationChannel, true))
- replicationGroup := UID()
- parallelFor(i, replicationChannel) {
+ if (swift:configProperty("replication.enabled") == "true") {
+ replicationChannel := channel:new()
+ //trigger the first job
+ discard(append(replicationChannel, true))
+ replicationGroup := UID()
+ parallelFor(i, replicationChannel) {
+ try {
+ execute2(
+ progress,
+ tr, if(arguments != null, arguments = unwrapClosedList(arguments)),
+ stdin=stdin, stdout=stdout, stderr=stderr, attributes=attributes,
+ stagein, stageout, restartout, replicationGroup, replicationChannel
+ )
+ }
+ else catch(exception) {
+ if (matches(exception, "^Abort$")) {
+ // ignored
+ }
+ else {
+ throw(exception)
+ }
+ }
+ }
+ }
+ else {
try {
execute2(
progress,
tr, if(arguments != null, arguments = unwrapClosedList(arguments)),
stdin=stdin, stdout=stdout, stderr=stderr, attributes=attributes,
- stagein, stageout, restartout, replicationGroup, replicationChannel
+ stagein, stageout, restartout, null, null
)
}
else catch(exception) {
Modified: branches/faster/src/org/griphyn/vdl/karajan/lib/Execute.java
===================================================================
--- branches/faster/src/org/griphyn/vdl/karajan/lib/Execute.java 2013-03-06 07:23:14 UTC (rev 6334)
+++ branches/faster/src/org/griphyn/vdl/karajan/lib/Execute.java 2013-03-06 07:24:13 UTC (rev 6335)
@@ -42,6 +42,7 @@
import org.griphyn.vdl.karajan.lib.RuntimeStats.ProgressState;
import org.griphyn.vdl.karajan.lib.replication.CanceledReplicaException;
import org.griphyn.vdl.karajan.lib.replication.ReplicationManager;
+import org.griphyn.vdl.util.VDL2Config;
public class Execute extends GridExec {
public static final Logger logger = Logger.getLogger(Execute.class);
@@ -53,6 +54,8 @@
private VarRef<Context> context;
+ private boolean replicationEnabled;
+
@Override
protected Signature getSignature() {
Signature sig = super.getSignature();
@@ -67,12 +70,17 @@
protected void addLocals(Scope scope) {
super.addLocals(scope);
context = scope.getVarRef("#context");
+ VDL2Config config = (VDL2Config) context.getValue().getAttribute("SWIFT:CONFIG");
+ replicationEnabled = "true".equals(config.getProperty("replication.enabled"));
}
@Override
public void submitScheduled(Scheduler scheduler, Task task, Stack stack, Object constraints) {
try {
- registerReplica(stack, task);
+ setTaskIdentity(stack, task);
+ if (replicationEnabled) {
+ registerReplica(stack, task);
+ }
log(task, stack);
TaskStateFuture tsf = new SwiftTaskStateFuture(stack, task, false);
@@ -99,8 +107,6 @@
}
protected void registerReplica(Stack stack, Task task) throws CanceledReplicaException {
- setTaskIdentity(stack, task);
-
String rg = this.replicationGroup.getValue(stack);
if (rg != null) {
getReplicationManager(stack).register(rg, task);
@@ -122,7 +128,9 @@
ProgressState ps = progress.getValue(stack);
if (c == Status.SUBMITTED) {
ps.setState("Submitted");
- getReplicationManager(stack).submitted(task, e.getStatus().getTime());
+ if (replicationEnabled) {
+ getReplicationManager(stack).submitted(task, e.getStatus().getTime());
+ }
}
else if (c == Status.STAGE_IN) {
ps.setState("Stage in");
@@ -132,11 +140,15 @@
}
else if (c == Status.ACTIVE) {
ps.setState("Active");
- getReplicationManager(stack).active(task, e.getStatus().getTime());
- Execute.this.replicationChannel.getValue(stack).close();
+ if (replicationEnabled) {
+ getReplicationManager(stack).active(task, e.getStatus().getTime());
+ Execute.this.replicationChannel.getValue(stack).close();
+ }
}
else if (e.getStatus().isTerminal()) {
- getReplicationManager(stack).terminated(task);
+ if (replicationEnabled) {
+ getReplicationManager(stack).terminated(task);
+ }
}
else if (c == ReplicationManager.STATUS_NEEDS_REPLICATION) {
ps.setState("Replicating");
More information about the Swift-commit mailing list