[Swift-commit] r3243 - trunk/src/org/griphyn/vdl/karajan/lib
noreply at svn.ci.uchicago.edu
noreply at svn.ci.uchicago.edu
Wed Feb 17 15:59:57 CST 2010
Author: hategan
Date: 2010-02-17 15:59:57 -0600 (Wed, 17 Feb 2010)
New Revision: 3243
Modified:
trunk/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java
Log:
terminate iterations if selfclose is set and if there are no current ongoing iterators and the iterator is empty
Modified: trunk/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java 2010-02-17 21:58:52 UTC (rev 3242)
+++ trunk/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java 2010-02-17 21:59:57 UTC (rev 3243)
@@ -29,6 +29,7 @@
import org.globus.cog.karajan.workflow.events.EventTargetPair;
import org.globus.cog.karajan.workflow.events.FutureNotificationEvent;
import org.globus.cog.karajan.workflow.futures.FutureEvaluationException;
+import org.globus.cog.karajan.workflow.futures.FutureFault;
import org.globus.cog.karajan.workflow.futures.FutureIterator;
import org.globus.cog.karajan.workflow.futures.FutureIteratorIncomplete;
import org.globus.cog.karajan.workflow.nodes.AbstractParallelIterator;
@@ -42,16 +43,23 @@
public static final Arg A_NAME = new Arg.Positional("name");
public static final Arg A_IN = new Arg.Positional("in");
+ public static final Arg A_SELF_CLOSE = new Arg.Optional("selfclose", Boolean.FALSE);
static {
- setArguments(ThrottledParallelFor.class, new Arg[] { A_NAME, A_IN });
+ setArguments(ThrottledParallelFor.class, new Arg[] { A_NAME, A_IN, A_SELF_CLOSE });
}
public static final String THREAD_COUNT = "#threadcount";
private int maxThreadCount = -1;
- public void iterate(VariableStack stack, Identifier var, KarajanIterator i)
+ protected void partialArgumentsEvaluated(VariableStack stack)
+ throws ExecutionException {
+ stack.setVar("selfclose", A_SELF_CLOSE.getValue(stack));
+ super.partialArgumentsEvaluated(stack);
+ }
+
+ public void iterate(VariableStack stack, Identifier var, KarajanIterator i)
throws ExecutionException {
if (elementCount() > 0) {
if (logger.isDebugEnabled()) {
@@ -61,7 +69,8 @@
setChildFailed(stack, false);
stack.setCaller(this);
initializeChannelBuffers(stack);
- initThreadCount(stack);
+ initThreadCount(stack, TypeUtil.toBoolean(stack.currentFrame().getVar("selfclose")), i);
+ stack.currentFrame().deleteVar("selfclose");
citerate(stack, var, i);
}
else {
@@ -74,7 +83,7 @@
ThreadCount tc = getThreadCount(stack);
try {
while (i.hasNext()) {
- Object value = tc.tryIncrement(i);
+ Object value = tc.tryIncrement();
VariableStack copy = stack.copy();
copy.enter();
ThreadingContext.set(copy, ThreadingContext.get(copy).split(
@@ -86,7 +95,12 @@
startElement(getArgCount(), copy);
}
- int left = tc.decrement();
+ int left;
+ synchronized(tc) {
+ // can only have closed and running = 0 in one place
+ tc.close();
+ left = tc.current();
+ }
if (left == 0) {
complete(stack);
}
@@ -105,13 +119,22 @@
closeBuffers(stack);
stack.leave();
ThreadCount tc = getThreadCount(stack);
- int running = tc.decrement();
+ int running;
+ boolean closed;
+ boolean iteratorHasValues;
+ synchronized(tc) {
+ closed = tc.isClosed();
+ running = tc.decrement();
+ iteratorHasValues = !tc.selfClose || tc.iteratorHasValues();
+ }
if (running == 0) {
- complete(stack);
+ if (closed || !iteratorHasValues) {
+ complete(stack);
+ }
}
}
- private void initThreadCount(VariableStack stack) {
+ private void initThreadCount(VariableStack stack, boolean selfClose, KarajanIterator i) {
if (maxThreadCount < 0) {
try {
maxThreadCount = TypeUtil.toInt(VDL2Config.getConfig()
@@ -121,7 +144,7 @@
maxThreadCount = DEFAULT_MAX_THREADS;
}
}
- stack.setVar(THREAD_COUNT, new ThreadCount(maxThreadCount));
+ stack.setVar(THREAD_COUNT, new ThreadCount(maxThreadCount, selfClose, i));
}
private ThreadCount getThreadCount(VariableStack stack)
@@ -132,14 +155,32 @@
private static class ThreadCount implements FutureIterator {
private int maxThreadCount;
private int crt;
+ private boolean selfClose, closed;
private List listeners;
+ private KarajanIterator i;
- public ThreadCount(int maxThreadCount) {
+ public ThreadCount(int maxThreadCount, boolean selfClose, KarajanIterator i) {
this.maxThreadCount = maxThreadCount;
- crt = 1;
+ this.selfClose = selfClose;
+ this.i = i;
+ crt = 0;
}
+
+ public boolean iteratorHasValues() {
+ try {
+ return i.hasNext();
+ }
+ catch (FutureFault e) {
+ return false;
+ }
+ }
- public synchronized Object tryIncrement(KarajanIterator i) {
+ public boolean isSelfClose() {
+ return selfClose;
+ }
+
+ public synchronized Object tryIncrement() {
+ // there is no way that both crt == 0 and i has no values outside this critical section
if (crt < maxThreadCount) {
Object o = i.next();
crt++;
@@ -175,8 +216,8 @@
return 0;
}
- public int current() {
- return 0;
+ public synchronized int current() {
+ return crt;
}
public Object peek() {
@@ -205,7 +246,8 @@
}
}
- public void close() {
+ public synchronized void close() {
+ this.closed = true;
}
public void fail(FutureEvaluationException e) {
@@ -215,8 +257,8 @@
return null;
}
- public boolean isClosed() {
- return false;
+ public synchronized boolean isClosed() {
+ return closed;
}
}
}
\ No newline at end of file
More information about the Swift-commit
mailing list