[Swift-commit] r3243 - trunk/src/org/griphyn/vdl/karajan/lib

noreply at svn.ci.uchicago.edu noreply at svn.ci.uchicago.edu
Wed Feb 17 15:59:57 CST 2010


Author: hategan
Date: 2010-02-17 15:59:57 -0600 (Wed, 17 Feb 2010)
New Revision: 3243

Modified:
   trunk/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java
Log:
Terminate the iteration when selfclose is set, there are no ongoing iterations, and the iterator is empty.

Modified: trunk/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java	2010-02-17 21:58:52 UTC (rev 3242)
+++ trunk/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java	2010-02-17 21:59:57 UTC (rev 3243)
@@ -29,6 +29,7 @@
 import org.globus.cog.karajan.workflow.events.EventTargetPair;
 import org.globus.cog.karajan.workflow.events.FutureNotificationEvent;
 import org.globus.cog.karajan.workflow.futures.FutureEvaluationException;
+import org.globus.cog.karajan.workflow.futures.FutureFault;
 import org.globus.cog.karajan.workflow.futures.FutureIterator;
 import org.globus.cog.karajan.workflow.futures.FutureIteratorIncomplete;
 import org.globus.cog.karajan.workflow.nodes.AbstractParallelIterator;
@@ -42,16 +43,23 @@
 
 	public static final Arg A_NAME = new Arg.Positional("name");
 	public static final Arg A_IN = new Arg.Positional("in");
+	public static final Arg A_SELF_CLOSE = new Arg.Optional("selfclose", Boolean.FALSE);
 
 	static {
-		setArguments(ThrottledParallelFor.class, new Arg[] { A_NAME, A_IN });
+		setArguments(ThrottledParallelFor.class, new Arg[] { A_NAME, A_IN, A_SELF_CLOSE });
 	}
 
 	public static final String THREAD_COUNT = "#threadcount";
 
 	private int maxThreadCount = -1;
 
-	public void iterate(VariableStack stack, Identifier var, KarajanIterator i)
+    protected void partialArgumentsEvaluated(VariableStack stack)
+            throws ExecutionException {
+        stack.setVar("selfclose", A_SELF_CLOSE.getValue(stack));
+        super.partialArgumentsEvaluated(stack);
+    }
+
+    public void iterate(VariableStack stack, Identifier var, KarajanIterator i)
 			throws ExecutionException {
 		if (elementCount() > 0) {
 			if (logger.isDebugEnabled()) {
@@ -61,7 +69,8 @@
 			setChildFailed(stack, false);
 			stack.setCaller(this);
 			initializeChannelBuffers(stack);
-			initThreadCount(stack);
+			initThreadCount(stack, TypeUtil.toBoolean(stack.currentFrame().getVar("selfclose")), i);
+			stack.currentFrame().deleteVar("selfclose");
 			citerate(stack, var, i);
 		}
 		else {
@@ -74,7 +83,7 @@
 		ThreadCount tc = getThreadCount(stack);
 		try {
 			while (i.hasNext()) {
-				Object value = tc.tryIncrement(i);
+				Object value = tc.tryIncrement();
 				VariableStack copy = stack.copy();
 				copy.enter();
 				ThreadingContext.set(copy, ThreadingContext.get(copy).split(
@@ -86,7 +95,12 @@
 				startElement(getArgCount(), copy);
 			}
 			
-			int left = tc.decrement();
+			int left;
+			synchronized(tc) {
+			    // can only have closed and running = 0 in one place
+			    tc.close();
+			    left = tc.current();
+			}
 			if (left == 0) {
 				complete(stack);
 			}
@@ -105,13 +119,22 @@
 		closeBuffers(stack);
 		stack.leave();
 		ThreadCount tc = getThreadCount(stack);
-		int running = tc.decrement();
+		int running;
+		boolean closed;
+		boolean iteratorHasValues;
+		synchronized(tc) {
+		    closed = tc.isClosed();
+		    running = tc.decrement();
+		    iteratorHasValues = !tc.selfClose || tc.iteratorHasValues();
+		}
 		if (running == 0) {
-			complete(stack);
+		    if (closed || !iteratorHasValues) {
+		        complete(stack);
+		    }
 		}
 	}
 
-	private void initThreadCount(VariableStack stack) {
+	private void initThreadCount(VariableStack stack, boolean selfClose, KarajanIterator i) {
 		if (maxThreadCount < 0) {
 			try {
 				maxThreadCount = TypeUtil.toInt(VDL2Config.getConfig()
@@ -121,7 +144,7 @@
 				maxThreadCount = DEFAULT_MAX_THREADS;
 			}
 		}
-		stack.setVar(THREAD_COUNT, new ThreadCount(maxThreadCount));
+		stack.setVar(THREAD_COUNT, new ThreadCount(maxThreadCount, selfClose, i));
 	}
 
 	private ThreadCount getThreadCount(VariableStack stack)
@@ -132,14 +155,32 @@
 	private static class ThreadCount implements FutureIterator {
 		private int maxThreadCount;
 		private int crt;
+		private boolean selfClose, closed;
 		private List listeners;
+		private KarajanIterator i;
 
-		public ThreadCount(int maxThreadCount) {
+		public ThreadCount(int maxThreadCount, boolean selfClose, KarajanIterator i) {
 			this.maxThreadCount = maxThreadCount;
-			crt = 1;
+			this.selfClose = selfClose;
+			this.i = i;
+			crt = 0;
 		}
+		
+		public boolean iteratorHasValues() {
+            try {
+                return i.hasNext();
+            }
+            catch (FutureFault e) {
+                return false;
+            }
+        }
 
-		public synchronized Object tryIncrement(KarajanIterator i) {
+        public boolean isSelfClose() {
+		    return selfClose;
+		}
+
+		public synchronized Object tryIncrement() {
+		    // there is no way that both crt == 0 and i has no values outside this critical section
 			if (crt < maxThreadCount) {
 				Object o = i.next();
 				crt++;
@@ -175,8 +216,8 @@
 			return 0;
 		}
 
-		public int current() {
-			return 0;
+		public synchronized int current() {
+			return crt;
 		}
 
 		public Object peek() {
@@ -205,7 +246,8 @@
 			}
 		}
 
-		public void close() {
+		public synchronized void close() {
+		    this.closed = true;
 		}
 
 		public void fail(FutureEvaluationException e) {
@@ -215,8 +257,8 @@
 			return null;
 		}
 
-		public boolean isClosed() {
-			return false;
+		public synchronized boolean isClosed() {
+		    return closed;
 		}
 	}
 }
\ No newline at end of file




More information about the Swift-commit mailing list