[Swift-commit] r5965 - in trunk: libexec src/org/griphyn/vdl/karajan/lib

hategan at ci.uchicago.edu hategan at ci.uchicago.edu
Sun Oct 14 22:37:49 CDT 2012


Author: hategan
Date: 2012-10-14 22:37:49 -0500 (Sun, 14 Oct 2012)
New Revision: 5965

Added:
   trunk/src/org/griphyn/vdl/karajan/lib/SelfCloseParallelFor.java
   trunk/src/org/griphyn/vdl/karajan/lib/Throttled.java
Removed:
   trunk/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java
Modified:
   trunk/libexec/execute-default.k
   trunk/libexec/vdl-lib.xml
Log:
removed throttling from foreach and added throttling to app invocation

Modified: trunk/libexec/execute-default.k
===================================================================
--- trunk/libexec/execute-default.k	2012-10-15 03:35:17 UTC (rev 5964)
+++ trunk/libexec/execute-default.k	2012-10-15 03:37:49 UTC (rev 5965)
@@ -10,7 +10,7 @@
 		derr == false then(
 			if(
 				sys:not(done) try(
-					sequential(
+					throttled(
 						log(LOG:INFO, "START thread={#thread} tr={tr}")
 						vdl:setprogress("Selecting site")
 						restartOnError(".*", vdl:configProperty("execution.retries"),

Modified: trunk/libexec/vdl-lib.xml
===================================================================
--- trunk/libexec/vdl-lib.xml	2012-10-15 03:35:17 UTC (rev 5964)
+++ trunk/libexec/vdl-lib.xml	2012-10-15 03:37:49 UTC (rev 5965)
@@ -124,7 +124,8 @@
 	<export name="execute"><elementDef classname="org.griphyn.vdl.karajan.lib.Execute"/></export>
 	<export name="expandArguments"><elementDef classname="org.griphyn.vdl.karajan.lib.ExpandArguments"/></export>
 
-	<export name="tparallelFor"><elementDef classname="org.griphyn.vdl.karajan.lib.ThrottledParallelFor"/></export>
+	<export name="tparallelFor"><elementDef classname="org.griphyn.vdl.karajan.lib.SelfCloseParallelFor"/></export>
+	<export name="throttled"><elementDef classname="org.griphyn.vdl.karajan.lib.Throttled"/></export>
 
 	<export name="appStageins"><elementDef classname="org.griphyn.vdl.karajan.lib.AppStageins"/></export>
 	<export name="appStageouts"><elementDef classname="org.griphyn.vdl.karajan.lib.AppStageouts"/></export>

Added: trunk/src/org/griphyn/vdl/karajan/lib/SelfCloseParallelFor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/SelfCloseParallelFor.java	                        (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/lib/SelfCloseParallelFor.java	2012-10-15 03:37:49 UTC (rev 5965)
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2012 University of Chicago
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.griphyn.vdl.karajan.lib;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.karajan.arguments.Arg;
+import org.globus.cog.karajan.stack.VariableStack;
+import org.globus.cog.karajan.util.Identifier;
+import org.globus.cog.karajan.util.KarajanIterator;
+import org.globus.cog.karajan.util.ThreadingContext;
+import org.globus.cog.karajan.workflow.ExecutionException;
+import org.globus.cog.karajan.workflow.futures.FutureFault;
+import org.globus.cog.karajan.workflow.futures.FutureIteratorIncomplete;
+import org.globus.cog.karajan.workflow.nodes.AbstractUParallelIterator;
+import org.globus.cog.karajan.workflow.nodes.FlowNode;
+import org.globus.cog.karajan.workflow.nodes.FlowNode.FNTP;
+
+public class SelfCloseParallelFor extends AbstractUParallelIterator {
+	public static final Logger logger = Logger
+			.getLogger(SelfCloseParallelFor.class);
+	
+	public static final Arg A_NAME = new Arg.Positional("name");
+	public static final Arg A_IN = new Arg.Positional("in");
+	public static final Arg A_SELF_CLOSE = new Arg.Optional("selfclose", Boolean.FALSE);
+
+	static {
+		setArguments(SelfCloseParallelFor.class, new Arg[] { A_NAME, A_IN, A_SELF_CLOSE });
+	}
+
+    protected void partialArgumentsEvaluated(VariableStack stack)
+            throws ExecutionException {
+        stack.setVar("selfclose", A_SELF_CLOSE.getValue(stack));
+        super.partialArgumentsEvaluated(stack);
+    }
+    
+    protected void citerate(VariableStack stack, Identifier var, KarajanIterator i)
+            throws ExecutionException {
+        try {
+            synchronized(stack.currentFrame()) {
+                while (i.hasNext()) {
+                    Object value = i.next();
+                    VariableStack copy = stack.copy();
+                    copy.enter();
+                    ThreadingContext.set(copy, ThreadingContext.get(copy).split(i.current()));
+                    setIndex(copy, getArgCount());
+                    setArgsDone(copy);
+                    copy.setVar(var.getName(), value);
+                    int r = preIncRunning(stack);
+                    startElement(getArgCount(), copy);
+                }
+            }
+            if (FlowNode.debug) {
+                threadTracker.remove(new FNTP(this, ThreadingContext.get(stack)));
+            }
+            // Now make sure all iterations have not completed
+            int left = preDecRunning(stack);
+            if (left == 0) {
+                complete(stack);
+            }
+        }
+        catch (FutureIteratorIncomplete fii) {
+            synchronized (stack.currentFrame()) {
+                // if this is defined, then resume from iterate
+                stack.setVar(ITERATOR, i);
+            }
+            fii.getFutureIterator().addModificationAction(this, stack);
+        }
+    }
+    
+    protected void iterationCompleted(VariableStack stack) throws ExecutionException {
+        stack.leave();
+        int running;
+        synchronized(stack.currentFrame()) {
+            running = preDecRunning(stack);
+            if (running == 1) {
+                KarajanIterator iter = (KarajanIterator) stack.currentFrame().getVar(ITERATOR);
+                if (stack.currentFrame().getVar("selfclose").equals(Boolean.TRUE)) {
+                    try {
+                        iter.hasNext();
+                    }
+                    catch (FutureFault f) {
+                        running = 0;
+                    }
+                }
+            }
+        }
+        if (running == 0) {
+            complete(stack);
+        }
+    }
+    		
+	@Override
+    public String getTextualName() {
+        return "foreach";
+	}
+}

Added: trunk/src/org/griphyn/vdl/karajan/lib/Throttled.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/Throttled.java	                        (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/lib/Throttled.java	2012-10-15 03:37:49 UTC (rev 5965)
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2012 University of Chicago
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/*
+ * Created on Oct 14, 2012
+ */
+package org.griphyn.vdl.karajan.lib;
+
+import java.io.IOException;
+import java.util.LinkedList;
+
+import org.globus.cog.karajan.stack.VariableStack;
+import org.globus.cog.karajan.util.TypeUtil;
+import org.globus.cog.karajan.workflow.ExecutionException;
+import org.globus.cog.karajan.workflow.nodes.Sequential;
+import org.griphyn.vdl.util.VDL2Config;
+
+public class Throttled extends Sequential {
+    public static final int DEFAULT_MAX_THREADS = 1024;
+    
+    private LinkedList<VariableStack> waiting;
+    private int maxThreadCount, current;
+    
+    public Throttled() {
+        try {
+            maxThreadCount = TypeUtil.toInt(VDL2Config.getConfig()
+                            .getProperty("foreach.max.threads", String.valueOf(DEFAULT_MAX_THREADS)));
+        }
+        catch (IOException e) {
+            maxThreadCount = DEFAULT_MAX_THREADS;
+        }
+        current = 0;
+        waiting = new LinkedList<VariableStack>();
+    }
+
+    @Override
+    protected void executeChildren(VariableStack stack)
+            throws ExecutionException {
+        synchronized(this) {
+            if (current == maxThreadCount) {
+                waiting.addLast(stack);
+                return;
+            }
+            else {
+                current++;
+            }
+        }
+        super.executeChildren(stack);
+    }
+
+    @Override
+    protected void post(VariableStack stack) throws ExecutionException {
+        synchronized(this) {
+            if (!waiting.isEmpty()) {
+                super.executeChildren(waiting.removeFirst());
+            }
+            else {
+                current--;
+            }
+        }
+        super.post(stack);
+    }    
+}

Deleted: trunk/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java	2012-10-15 03:35:17 UTC (rev 5964)
+++ trunk/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java	2012-10-15 03:37:49 UTC (rev 5965)
@@ -1,309 +0,0 @@
-/*
- * Copyright 2012 University of Chicago
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.griphyn.vdl.karajan.lib;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.log4j.Logger;
-import org.globus.cog.karajan.arguments.Arg;
-import org.globus.cog.karajan.stack.VariableNotFoundException;
-import org.globus.cog.karajan.stack.VariableStack;
-import org.globus.cog.karajan.util.Identifier;
-import org.globus.cog.karajan.util.KarajanIterator;
-import org.globus.cog.karajan.util.ThreadingContext;
-import org.globus.cog.karajan.util.TypeUtil;
-import org.globus.cog.karajan.workflow.ExecutionException;
-import org.globus.cog.karajan.workflow.futures.FutureEvaluationException;
-import org.globus.cog.karajan.workflow.futures.FutureFault;
-import org.globus.cog.karajan.workflow.futures.FutureIterator;
-import org.globus.cog.karajan.workflow.futures.FutureIteratorIncomplete;
-import org.globus.cog.karajan.workflow.futures.FutureListener;
-import org.globus.cog.karajan.workflow.futures.ListenerStackPair;
-import org.globus.cog.karajan.workflow.nodes.AbstractParallelIterator;
-import org.griphyn.vdl.util.VDL2Config;
-
-public class ThrottledParallelFor extends AbstractParallelIterator {
-	public static final Logger logger = Logger
-			.getLogger(ThrottledParallelFor.class);
-	
-	public static final int DEFAULT_MAX_THREADS = 1024;
-
-	public static final Arg A_NAME = new Arg.Positional("name");
-	public static final Arg A_IN = new Arg.Positional("in");
-	public static final Arg A_SELF_CLOSE = new Arg.Optional("selfclose", Boolean.FALSE);
-
-	static {
-		setArguments(ThrottledParallelFor.class, new Arg[] { A_NAME, A_IN, A_SELF_CLOSE });
-	}
-
-	public static final String THREAD_COUNT = "#threadcount";
-
-	private int maxThreadCount = -1;
-
-    protected void partialArgumentsEvaluated(VariableStack stack)
-            throws ExecutionException {
-        stack.setVar("selfclose", A_SELF_CLOSE.getValue(stack));
-        super.partialArgumentsEvaluated(stack);
-    }
-
-    public void iterate(VariableStack stack, Identifier var, KarajanIterator i)
-			throws ExecutionException {
-		if (elementCount() > 0) {
-			if (logger.isDebugEnabled()) {
-				logger.debug("iterateParallel: " + stack.parentFrame());
-			}
-			stack.setVar(VAR, var);
-			setChildFailed(stack, false);
-			stack.setCaller(this);
-			initThreadCount(stack, TypeUtil.toBoolean(stack.currentFrame().getVar("selfclose")), i);
-			stack.currentFrame().deleteVar("selfclose");
-			citerate(stack, var, i);
-		}
-		else {
-			complete(stack);
-		}
-	}
-
-	protected void citerate(VariableStack stack, Identifier var,
-			KarajanIterator i) throws ExecutionException {
-		ThreadCount tc = getThreadCount(stack);
-		
-		// we can bulk operations at the start to avoid contention
-		// on the counter since at least as many
-		// threads as reported by available() are available
-		int available = tc.available();
-		try {
-		    int j = 0;
-		    try {
-    		    for (; j < available && i.hasNext(); j++) {
-    		        VariableStack copy = stack.copy();
-                    copy.enter();
-                    ThreadingContext.set(copy, ThreadingContext.get(copy).split(
-                            i.current()));
-                    setIndex(copy, getArgCount());
-                    setArgsDone(copy);
-                    copy.setVar(var.getName(), i.next());
-                    startElement(getArgCount(), copy);
-    		    }
-		    }
-		    finally {
-		        tc.add(j);
-		    }
-			while (i.hasNext()) {
-				Object value = tc.tryIncrement();
-				VariableStack copy = stack.copy();
-				copy.enter();
-				ThreadingContext.set(copy, ThreadingContext.get(copy).split(
-						i.current()));
-				setIndex(copy, getArgCount());
-				setArgsDone(copy);
-				copy.setVar(var.getName(), value);
-				startElement(getArgCount(), copy);
-			}
-			
-			int left;
-			synchronized(tc) {
-			    // can only have closed and running = 0 in one place
-			    tc.close();
-			    left = tc.current();
-			}
-			if (left == 0) {
-				complete(stack);
-			}
-		}
-		catch (FutureIteratorIncomplete fii) {
-			synchronized (stack.currentFrame()) {
-                stack.setVar(ITERATOR, i);
-            }
-            fii.getFutureIterator().addModificationAction(this, stack);
-		}
-	}
-	
-	public void failed(VariableStack stack, ExecutionException e) throws ExecutionException {
-        if (!testAndSetChildFailed(stack)) {
-            if (stack.parentFrame().isDefined(VAR)) {
-                stack.leave();
-            }
-            failImmediately(stack, e);
-        }
-    }
-
-	protected void iterationCompleted(VariableStack stack)
-			throws ExecutionException {
-		stack.leave();
-		ThreadCount tc = getThreadCount(stack);
-		int running;
-		boolean closed;
-		boolean iteratorHasValues;
-		synchronized(tc) {
-		    closed = tc.isClosed();
-		    running = tc.decrement();
-		    iteratorHasValues = !tc.selfClose || tc.iteratorHasValues();
-		}
-		if (running == 0) {
-		    if (closed || !iteratorHasValues) {
-		        complete(stack);
-		    }
-		}
-	}
-
-	private void initThreadCount(VariableStack stack, boolean selfClose, KarajanIterator i) {
-		if (maxThreadCount < 0) {
-			try {
-				maxThreadCount = TypeUtil.toInt(VDL2Config.getConfig()
-						.getProperty("foreach.max.threads", String.valueOf(DEFAULT_MAX_THREADS)));
-			}
-			catch (IOException e) {
-				maxThreadCount = DEFAULT_MAX_THREADS;
-			}
-		}
-		stack.setVar(THREAD_COUNT, new ThreadCount(maxThreadCount, selfClose, i));
-	}
-
-	private ThreadCount getThreadCount(VariableStack stack)
-			throws VariableNotFoundException {
-		return (ThreadCount) stack.getVar(THREAD_COUNT);
-	}
-	
-	@Override
-    public String getTextualName() {
-        return "foreach";
-    }
-
-    private static class ThreadCount implements FutureIterator {
-		private int maxThreadCount;
-		private int crt;
-		private boolean selfClose, closed;
-		private List<ListenerStackPair> listeners;
-		private KarajanIterator i;
-
-		public ThreadCount(int maxThreadCount, boolean selfClose, KarajanIterator i) {
-			this.maxThreadCount = maxThreadCount;
-			this.selfClose = selfClose;
-			this.i = i;
-			crt = 0;
-		}
-		
-		public boolean iteratorHasValues() {
-            try {
-                return i.hasNext();
-            }
-            catch (FutureFault e) {
-                return false;
-            }
-        }
-
-        public boolean isSelfClose() {
-		    return selfClose;
-		}
-        
-        public synchronized int available() {
-            return maxThreadCount - crt;
-        }
-        
-        public synchronized void add(int count) {
-            crt += count;
-        }
-
-		public synchronized Object tryIncrement() {
-		    // there is no way that both crt == 0 and i has no values outside this critical section
-			if (crt < maxThreadCount) {
-				Object o = i.next();
-				crt++;
-				return o;
-			}
-			else {
-				throw new FutureIteratorIncomplete(this, this);
-			}
-		}
-
-		public synchronized int decrement() {
-			crt--;
-			notifyListeners();
-			return crt;
-		}
-
-		private void notifyListeners() {
-			if (listeners != null) {
-				Iterator<ListenerStackPair> i = listeners.iterator();
-				listeners = null;
-				while (i.hasNext()) {
-					ListenerStackPair etp = i.next();
-					i.remove();
-					etp.listener.futureModified(this, etp.stack);
-				}
-			}
-		}
-
-		public boolean hasAvailable() {
-			return false;
-		}
-
-		public int count() {
-			return 0;
-		}
-
-		public synchronized int current() {
-			return crt;
-		}
-
-		public Object peek() {
-			return null;
-		}
-
-		public boolean hasNext() {
-			return false;
-		}
-
-		public Object next() {
-			return null;
-		}
-
-		public void remove() {
-		}
-
-		public synchronized void addModificationAction(FutureListener target,
-				VariableStack stack) {
-			if (listeners == null) {
-				listeners = new ArrayList<ListenerStackPair>();
-			}
-			listeners.add(new ListenerStackPair(target, stack));
-			if (crt < maxThreadCount) {
-				notifyListeners();
-			}
-		}
-
-		public synchronized void close() {
-		    this.closed = true;
-		}
-
-		public void fail(FutureEvaluationException e) {
-		}
-
-		public Object getValue() {
-			return null;
-		}
-
-		public synchronized boolean isClosed() {
-		    return closed;
-		}
-	}
-}




More information about the Swift-commit mailing list