[Swift-commit] r5975 - trunk/src/org/griphyn/vdl/karajan/lib

hategan at ci.uchicago.edu hategan at ci.uchicago.edu
Tue Oct 16 18:46:06 CDT 2012


Author: hategan
Date: 2012-10-16 18:46:06 -0500 (Tue, 16 Oct 2012)
New Revision: 5975

Added:
   trunk/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java
Removed:
   trunk/src/org/griphyn/vdl/karajan/lib/SelfCloseParallelFor.java
Modified:
   trunk/src/org/griphyn/vdl/karajan/lib/Throttled.java
Log:
restored ThrottledParallelFor

Deleted: trunk/src/org/griphyn/vdl/karajan/lib/SelfCloseParallelFor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/SelfCloseParallelFor.java	2012-10-16 17:05:20 UTC (rev 5974)
+++ trunk/src/org/griphyn/vdl/karajan/lib/SelfCloseParallelFor.java	2012-10-16 23:46:06 UTC (rev 5975)
@@ -1,111 +0,0 @@
-/*
- * Copyright 2012 University of Chicago
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.griphyn.vdl.karajan.lib;
-
-import org.apache.log4j.Logger;
-import org.globus.cog.karajan.arguments.Arg;
-import org.globus.cog.karajan.stack.VariableStack;
-import org.globus.cog.karajan.util.Identifier;
-import org.globus.cog.karajan.util.KarajanIterator;
-import org.globus.cog.karajan.util.ThreadingContext;
-import org.globus.cog.karajan.workflow.ExecutionException;
-import org.globus.cog.karajan.workflow.futures.FutureFault;
-import org.globus.cog.karajan.workflow.futures.FutureIteratorIncomplete;
-import org.globus.cog.karajan.workflow.nodes.AbstractUParallelIterator;
-import org.globus.cog.karajan.workflow.nodes.FlowNode;
-import org.globus.cog.karajan.workflow.nodes.FlowNode.FNTP;
-
-public class SelfCloseParallelFor extends AbstractUParallelIterator {
-	public static final Logger logger = Logger
-			.getLogger(SelfCloseParallelFor.class);
-	
-	public static final Arg A_NAME = new Arg.Positional("name");
-	public static final Arg A_IN = new Arg.Positional("in");
-	public static final Arg A_SELF_CLOSE = new Arg.Optional("selfclose", Boolean.FALSE);
-
-	static {
-		setArguments(SelfCloseParallelFor.class, new Arg[] { A_NAME, A_IN, A_SELF_CLOSE });
-	}
-
-    protected void partialArgumentsEvaluated(VariableStack stack)
-            throws ExecutionException {
-        stack.setVar("selfclose", A_SELF_CLOSE.getValue(stack));
-        super.partialArgumentsEvaluated(stack);
-    }
-    
-    protected void citerate(VariableStack stack, Identifier var, KarajanIterator i)
-            throws ExecutionException {
-        try {
-            synchronized(stack.currentFrame()) {
-                while (i.hasNext()) {
-                    Object value = i.next();
-                    VariableStack copy = stack.copy();
-                    copy.enter();
-                    ThreadingContext.set(copy, ThreadingContext.get(copy).split(i.current()));
-                    setIndex(copy, getArgCount());
-                    setArgsDone(copy);
-                    copy.setVar(var.getName(), value);
-                    int r = preIncRunning(stack);
-                    startElement(getArgCount(), copy);
-                }
-            }
-            if (FlowNode.debug) {
-                threadTracker.remove(new FNTP(this, ThreadingContext.get(stack)));
-            }
-            // Now make sure all iterations have not completed
-            int left = preDecRunning(stack);
-            if (left == 0) {
-                complete(stack);
-            }
-        }
-        catch (FutureIteratorIncomplete fii) {
-            synchronized (stack.currentFrame()) {
-                // if this is defined, then resume from iterate
-                stack.setVar(ITERATOR, i);
-            }
-            fii.getFutureIterator().addModificationAction(this, stack);
-        }
-    }
-    
-    protected void iterationCompleted(VariableStack stack) throws ExecutionException {
-        stack.leave();
-        int running;
-        synchronized(stack.currentFrame()) {
-            running = preDecRunning(stack);
-            if (running == 1) {
-                KarajanIterator iter = (KarajanIterator) stack.currentFrame().getVar(ITERATOR);
-                if (stack.currentFrame().getVar("selfclose").equals(Boolean.TRUE)) {
-                    try {
-                        iter.hasNext();
-                    }
-                    catch (FutureFault f) {
-                        running = 0;
-                    }
-                }
-            }
-        }
-        if (running == 0) {
-            complete(stack);
-        }
-    }
-    		
-	@Override
-    public String getTextualName() {
-        return "foreach";
-	}
-}

Modified: trunk/src/org/griphyn/vdl/karajan/lib/Throttled.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/Throttled.java	2012-10-16 17:05:20 UTC (rev 5974)
+++ trunk/src/org/griphyn/vdl/karajan/lib/Throttled.java	2012-10-16 23:46:06 UTC (rev 5975)
@@ -30,7 +30,7 @@
 import org.griphyn.vdl.util.VDL2Config;
 
 public class Throttled extends Sequential {
-    public static final int DEFAULT_MAX_THREADS = 1024;
+    public static final int DEFAULT_MAX_THREADS = 1000000;
     
     private LinkedList<VariableStack> waiting;
     private int maxThreadCount, current;
@@ -38,7 +38,7 @@
     public Throttled() {
         try {
             maxThreadCount = TypeUtil.toInt(VDL2Config.getConfig()
-                            .getProperty("foreach.max.threads", String.valueOf(DEFAULT_MAX_THREADS)));
+                            .getProperty("exec.throttle", String.valueOf(DEFAULT_MAX_THREADS)));
         }
         catch (IOException e) {
             maxThreadCount = DEFAULT_MAX_THREADS;

Added: trunk/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java	                        (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java	2012-10-16 23:46:06 UTC (rev 5975)
@@ -0,0 +1,309 @@
+/*
+ * Copyright 2012 University of Chicago
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.griphyn.vdl.karajan.lib;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.karajan.arguments.Arg;
+import org.globus.cog.karajan.stack.VariableNotFoundException;
+import org.globus.cog.karajan.stack.VariableStack;
+import org.globus.cog.karajan.util.Identifier;
+import org.globus.cog.karajan.util.KarajanIterator;
+import org.globus.cog.karajan.util.ThreadingContext;
+import org.globus.cog.karajan.util.TypeUtil;
+import org.globus.cog.karajan.workflow.ExecutionException;
+import org.globus.cog.karajan.workflow.futures.FutureEvaluationException;
+import org.globus.cog.karajan.workflow.futures.FutureFault;
+import org.globus.cog.karajan.workflow.futures.FutureIterator;
+import org.globus.cog.karajan.workflow.futures.FutureIteratorIncomplete;
+import org.globus.cog.karajan.workflow.futures.FutureListener;
+import org.globus.cog.karajan.workflow.futures.ListenerStackPair;
+import org.globus.cog.karajan.workflow.nodes.AbstractParallelIterator;
+import org.griphyn.vdl.util.VDL2Config;
+
+public class ThrottledParallelFor extends AbstractParallelIterator {
+	public static final Logger logger = Logger
+			.getLogger(ThrottledParallelFor.class);
+	
+	public static final int DEFAULT_MAX_THREADS = 1024;
+
+	public static final Arg A_NAME = new Arg.Positional("name");
+	public static final Arg A_IN = new Arg.Positional("in");
+	public static final Arg A_SELF_CLOSE = new Arg.Optional("selfclose", Boolean.FALSE);
+
+	static {
+		setArguments(ThrottledParallelFor.class, new Arg[] { A_NAME, A_IN, A_SELF_CLOSE });
+	}
+
+	public static final String THREAD_COUNT = "#threadcount";
+
+	private int maxThreadCount = -1;
+
+    protected void partialArgumentsEvaluated(VariableStack stack)
+            throws ExecutionException {
+        stack.setVar("selfclose", A_SELF_CLOSE.getValue(stack));
+        super.partialArgumentsEvaluated(stack);
+    }
+
+    public void iterate(VariableStack stack, Identifier var, KarajanIterator i)
+			throws ExecutionException {
+		if (elementCount() > 0) {
+			if (logger.isDebugEnabled()) {
+				logger.debug("iterateParallel: " + stack.parentFrame());
+			}
+			stack.setVar(VAR, var);
+			setChildFailed(stack, false);
+			stack.setCaller(this);
+			initThreadCount(stack, TypeUtil.toBoolean(stack.currentFrame().getVar("selfclose")), i);
+			stack.currentFrame().deleteVar("selfclose");
+			citerate(stack, var, i);
+		}
+		else {
+			complete(stack);
+		}
+	}
+
+	protected void citerate(VariableStack stack, Identifier var,
+			KarajanIterator i) throws ExecutionException {
+		ThreadCount tc = getThreadCount(stack);
+		
+		// we can bulk operations at the start to avoid contention
+		// on the counter since at least as many
+		// threads as reported by available() are available
+		int available = tc.available();
+		try {
+		    int j = 0;
+		    try {
+    		    for (; j < available && i.hasNext(); j++) {
+    		        VariableStack copy = stack.copy();
+                    copy.enter();
+                    ThreadingContext.set(copy, ThreadingContext.get(copy).split(
+                            i.current()));
+                    setIndex(copy, getArgCount());
+                    setArgsDone(copy);
+                    copy.setVar(var.getName(), i.next());
+                    startElement(getArgCount(), copy);
+    		    }
+		    }
+		    finally {
+		        tc.add(j);
+		    }
+			while (i.hasNext()) {
+				Object value = tc.tryIncrement();
+				VariableStack copy = stack.copy();
+				copy.enter();
+				ThreadingContext.set(copy, ThreadingContext.get(copy).split(
+						i.current()));
+				setIndex(copy, getArgCount());
+				setArgsDone(copy);
+				copy.setVar(var.getName(), value);
+				startElement(getArgCount(), copy);
+			}
+			
+			int left;
+			synchronized(tc) {
+			    // can only have closed and running = 0 in one place
+			    tc.close();
+			    left = tc.current();
+			}
+			if (left == 0) {
+				complete(stack);
+			}
+		}
+		catch (FutureIteratorIncomplete fii) {
+			synchronized (stack.currentFrame()) {
+                stack.setVar(ITERATOR, i);
+            }
+            fii.getFutureIterator().addModificationAction(this, stack);
+		}
+	}
+	
+	public void failed(VariableStack stack, ExecutionException e) throws ExecutionException {
+        if (!testAndSetChildFailed(stack)) {
+            if (stack.parentFrame().isDefined(VAR)) {
+                stack.leave();
+            }
+            failImmediately(stack, e);
+        }
+    }
+
+	protected void iterationCompleted(VariableStack stack)
+			throws ExecutionException {
+		stack.leave();
+		ThreadCount tc = getThreadCount(stack);
+		int running;
+		boolean closed;
+		boolean iteratorHasValues;
+		synchronized(tc) {
+		    closed = tc.isClosed();
+		    running = tc.decrement();
+		    iteratorHasValues = !tc.selfClose || tc.iteratorHasValues();
+		}
+		if (running == 0) {
+		    if (closed || !iteratorHasValues) {
+		        complete(stack);
+		    }
+		}
+	}
+
+	private void initThreadCount(VariableStack stack, boolean selfClose, KarajanIterator i) {
+		if (maxThreadCount < 0) {
+			try {
+				maxThreadCount = TypeUtil.toInt(VDL2Config.getConfig()
+						.getProperty("foreach.max.threads", String.valueOf(DEFAULT_MAX_THREADS)));
+			}
+			catch (IOException e) {
+				maxThreadCount = DEFAULT_MAX_THREADS;
+			}
+		}
+		stack.setVar(THREAD_COUNT, new ThreadCount(maxThreadCount, selfClose, i));
+	}
+
+	private ThreadCount getThreadCount(VariableStack stack)
+			throws VariableNotFoundException {
+		return (ThreadCount) stack.getVar(THREAD_COUNT);
+	}
+	
+	@Override
+    public String getTextualName() {
+        return "foreach";
+    }
+
+    private static class ThreadCount implements FutureIterator {
+		private int maxThreadCount;
+		private int crt;
+		private boolean selfClose, closed;
+		private List<ListenerStackPair> listeners;
+		private KarajanIterator i;
+
+		public ThreadCount(int maxThreadCount, boolean selfClose, KarajanIterator i) {
+			this.maxThreadCount = maxThreadCount;
+			this.selfClose = selfClose;
+			this.i = i;
+			crt = 0;
+		}
+		
+		public boolean iteratorHasValues() {
+            try {
+                return i.hasNext();
+            }
+            catch (FutureFault e) {
+                return false;
+            }
+        }
+
+        public boolean isSelfClose() {
+		    return selfClose;
+		}
+        
+        public synchronized int available() {
+            return maxThreadCount - crt;
+        }
+        
+        public synchronized void add(int count) {
+            crt += count;
+        }
+
+		public synchronized Object tryIncrement() {
+		    // there is no way that both crt == 0 and i has no values outside this critical section
+			if (crt < maxThreadCount) {
+				Object o = i.next();
+				crt++;
+				return o;
+			}
+			else {
+				throw new FutureIteratorIncomplete(this, this);
+			}
+		}
+
+		public synchronized int decrement() {
+			crt--;
+			notifyListeners();
+			return crt;
+		}
+
+		private void notifyListeners() {
+			if (listeners != null) {
+				Iterator<ListenerStackPair> i = listeners.iterator();
+				listeners = null;
+				while (i.hasNext()) {
+					ListenerStackPair etp = i.next();
+					i.remove();
+					etp.listener.futureModified(this, etp.stack);
+				}
+			}
+		}
+
+		public boolean hasAvailable() {
+			return false;
+		}
+
+		public int count() {
+			return 0;
+		}
+
+		public synchronized int current() {
+			return crt;
+		}
+
+		public Object peek() {
+			return null;
+		}
+
+		public boolean hasNext() {
+			return false;
+		}
+
+		public Object next() {
+			return null;
+		}
+
+		public void remove() {
+		}
+
+		public synchronized void addModificationAction(FutureListener target,
+				VariableStack stack) {
+			if (listeners == null) {
+				listeners = new ArrayList<ListenerStackPair>();
+			}
+			listeners.add(new ListenerStackPair(target, stack));
+			if (crt < maxThreadCount) {
+				notifyListeners();
+			}
+		}
+
+		public synchronized void close() {
+		    this.closed = true;
+		}
+
+		public void fail(FutureEvaluationException e) {
+		}
+
+		public Object getValue() {
+			return null;
+		}
+
+		public synchronized boolean isClosed() {
+		    return closed;
+		}
+	}
+}




More information about the Swift-commit mailing list