[Swift-commit] r5965 - in trunk: libexec src/org/griphyn/vdl/karajan/lib
hategan at ci.uchicago.edu
hategan at ci.uchicago.edu
Sun Oct 14 22:37:49 CDT 2012
Author: hategan
Date: 2012-10-14 22:37:49 -0500 (Sun, 14 Oct 2012)
New Revision: 5965
Added:
trunk/src/org/griphyn/vdl/karajan/lib/SelfCloseParallelFor.java
trunk/src/org/griphyn/vdl/karajan/lib/Throttled.java
Removed:
trunk/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java
Modified:
trunk/libexec/execute-default.k
trunk/libexec/vdl-lib.xml
Log:
removed throttling from foreach and added throttling to app invocation
Modified: trunk/libexec/execute-default.k
===================================================================
--- trunk/libexec/execute-default.k 2012-10-15 03:35:17 UTC (rev 5964)
+++ trunk/libexec/execute-default.k 2012-10-15 03:37:49 UTC (rev 5965)
@@ -10,7 +10,7 @@
derr == false then(
if(
sys:not(done) try(
- sequential(
+ throttled(
log(LOG:INFO, "START thread={#thread} tr={tr}")
vdl:setprogress("Selecting site")
restartOnError(".*", vdl:configProperty("execution.retries"),
Modified: trunk/libexec/vdl-lib.xml
===================================================================
--- trunk/libexec/vdl-lib.xml 2012-10-15 03:35:17 UTC (rev 5964)
+++ trunk/libexec/vdl-lib.xml 2012-10-15 03:37:49 UTC (rev 5965)
@@ -124,7 +124,8 @@
<export name="execute"><elementDef classname="org.griphyn.vdl.karajan.lib.Execute"/></export>
<export name="expandArguments"><elementDef classname="org.griphyn.vdl.karajan.lib.ExpandArguments"/></export>
- <export name="tparallelFor"><elementDef classname="org.griphyn.vdl.karajan.lib.ThrottledParallelFor"/></export>
+ <export name="tparallelFor"><elementDef classname="org.griphyn.vdl.karajan.lib.SelfCloseParallelFor"/></export>
+ <export name="throttled"><elementDef classname="org.griphyn.vdl.karajan.lib.Throttled"/></export>
<export name="appStageins"><elementDef classname="org.griphyn.vdl.karajan.lib.AppStageins"/></export>
<export name="appStageouts"><elementDef classname="org.griphyn.vdl.karajan.lib.AppStageouts"/></export>
Added: trunk/src/org/griphyn/vdl/karajan/lib/SelfCloseParallelFor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/SelfCloseParallelFor.java (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/lib/SelfCloseParallelFor.java 2012-10-15 03:37:49 UTC (rev 5965)
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2012 University of Chicago
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.griphyn.vdl.karajan.lib;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.karajan.arguments.Arg;
+import org.globus.cog.karajan.stack.VariableStack;
+import org.globus.cog.karajan.util.Identifier;
+import org.globus.cog.karajan.util.KarajanIterator;
+import org.globus.cog.karajan.util.ThreadingContext;
+import org.globus.cog.karajan.workflow.ExecutionException;
+import org.globus.cog.karajan.workflow.futures.FutureFault;
+import org.globus.cog.karajan.workflow.futures.FutureIteratorIncomplete;
+import org.globus.cog.karajan.workflow.nodes.AbstractUParallelIterator;
+import org.globus.cog.karajan.workflow.nodes.FlowNode;
+import org.globus.cog.karajan.workflow.nodes.FlowNode.FNTP;
+
+public class SelfCloseParallelFor extends AbstractUParallelIterator {
+ public static final Logger logger = Logger
+ .getLogger(SelfCloseParallelFor.class);
+
+ public static final Arg A_NAME = new Arg.Positional("name");
+ public static final Arg A_IN = new Arg.Positional("in");
+ public static final Arg A_SELF_CLOSE = new Arg.Optional("selfclose", Boolean.FALSE);
+
+ static {
+ setArguments(SelfCloseParallelFor.class, new Arg[] { A_NAME, A_IN, A_SELF_CLOSE });
+ }
+
+ protected void partialArgumentsEvaluated(VariableStack stack)
+ throws ExecutionException {
+ stack.setVar("selfclose", A_SELF_CLOSE.getValue(stack));
+ super.partialArgumentsEvaluated(stack);
+ }
+
+ protected void citerate(VariableStack stack, Identifier var, KarajanIterator i)
+ throws ExecutionException {
+ try {
+ synchronized(stack.currentFrame()) {
+ while (i.hasNext()) {
+ Object value = i.next();
+ VariableStack copy = stack.copy();
+ copy.enter();
+ ThreadingContext.set(copy, ThreadingContext.get(copy).split(i.current()));
+ setIndex(copy, getArgCount());
+ setArgsDone(copy);
+ copy.setVar(var.getName(), value);
+ int r = preIncRunning(stack);
+ startElement(getArgCount(), copy);
+ }
+ }
+ if (FlowNode.debug) {
+ threadTracker.remove(new FNTP(this, ThreadingContext.get(stack)));
+ }
+ // Now make sure all iterations have not completed
+ int left = preDecRunning(stack);
+ if (left == 0) {
+ complete(stack);
+ }
+ }
+ catch (FutureIteratorIncomplete fii) {
+ synchronized (stack.currentFrame()) {
+ // if this is defined, then resume from iterate
+ stack.setVar(ITERATOR, i);
+ }
+ fii.getFutureIterator().addModificationAction(this, stack);
+ }
+ }
+
+ protected void iterationCompleted(VariableStack stack) throws ExecutionException {
+ stack.leave();
+ int running;
+ synchronized(stack.currentFrame()) {
+ running = preDecRunning(stack);
+ if (running == 1) {
+ KarajanIterator iter = (KarajanIterator) stack.currentFrame().getVar(ITERATOR);
+ if (stack.currentFrame().getVar("selfclose").equals(Boolean.TRUE)) {
+ try {
+ iter.hasNext();
+ }
+ catch (FutureFault f) {
+ running = 0;
+ }
+ }
+ }
+ }
+ if (running == 0) {
+ complete(stack);
+ }
+ }
+
+ @Override
+ public String getTextualName() {
+ return "foreach";
+ }
+}
Added: trunk/src/org/griphyn/vdl/karajan/lib/Throttled.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/Throttled.java (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/lib/Throttled.java 2012-10-15 03:37:49 UTC (rev 5965)
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2012 University of Chicago
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/*
+ * Created on Oct 14, 2012
+ */
+package org.griphyn.vdl.karajan.lib;
+
+import java.io.IOException;
+import java.util.LinkedList;
+
+import org.globus.cog.karajan.stack.VariableStack;
+import org.globus.cog.karajan.util.TypeUtil;
+import org.globus.cog.karajan.workflow.ExecutionException;
+import org.globus.cog.karajan.workflow.nodes.Sequential;
+import org.griphyn.vdl.util.VDL2Config;
+
+public class Throttled extends Sequential {
+ public static final int DEFAULT_MAX_THREADS = 1024;
+
+ private LinkedList<VariableStack> waiting;
+ private int maxThreadCount, current;
+
+ public Throttled() {
+ try {
+ maxThreadCount = TypeUtil.toInt(VDL2Config.getConfig()
+ .getProperty("foreach.max.threads", String.valueOf(DEFAULT_MAX_THREADS)));
+ }
+ catch (IOException e) {
+ maxThreadCount = DEFAULT_MAX_THREADS;
+ }
+ current = 0;
+ waiting = new LinkedList<VariableStack>();
+ }
+
+ @Override
+ protected void executeChildren(VariableStack stack)
+ throws ExecutionException {
+ synchronized(this) {
+ if (current == maxThreadCount) {
+ waiting.addLast(stack);
+ return;
+ }
+ else {
+ current++;
+ }
+ }
+ super.executeChildren(stack);
+ }
+
+ @Override
+ protected void post(VariableStack stack) throws ExecutionException {
+ synchronized(this) {
+ if (!waiting.isEmpty()) {
+ super.executeChildren(waiting.removeFirst());
+ }
+ else {
+ current--;
+ }
+ }
+ super.post(stack);
+ }
+}
Deleted: trunk/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java 2012-10-15 03:35:17 UTC (rev 5964)
+++ trunk/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java 2012-10-15 03:37:49 UTC (rev 5965)
@@ -1,309 +0,0 @@
-/*
- * Copyright 2012 University of Chicago
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.griphyn.vdl.karajan.lib;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.log4j.Logger;
-import org.globus.cog.karajan.arguments.Arg;
-import org.globus.cog.karajan.stack.VariableNotFoundException;
-import org.globus.cog.karajan.stack.VariableStack;
-import org.globus.cog.karajan.util.Identifier;
-import org.globus.cog.karajan.util.KarajanIterator;
-import org.globus.cog.karajan.util.ThreadingContext;
-import org.globus.cog.karajan.util.TypeUtil;
-import org.globus.cog.karajan.workflow.ExecutionException;
-import org.globus.cog.karajan.workflow.futures.FutureEvaluationException;
-import org.globus.cog.karajan.workflow.futures.FutureFault;
-import org.globus.cog.karajan.workflow.futures.FutureIterator;
-import org.globus.cog.karajan.workflow.futures.FutureIteratorIncomplete;
-import org.globus.cog.karajan.workflow.futures.FutureListener;
-import org.globus.cog.karajan.workflow.futures.ListenerStackPair;
-import org.globus.cog.karajan.workflow.nodes.AbstractParallelIterator;
-import org.griphyn.vdl.util.VDL2Config;
-
-public class ThrottledParallelFor extends AbstractParallelIterator {
- public static final Logger logger = Logger
- .getLogger(ThrottledParallelFor.class);
-
- public static final int DEFAULT_MAX_THREADS = 1024;
-
- public static final Arg A_NAME = new Arg.Positional("name");
- public static final Arg A_IN = new Arg.Positional("in");
- public static final Arg A_SELF_CLOSE = new Arg.Optional("selfclose", Boolean.FALSE);
-
- static {
- setArguments(ThrottledParallelFor.class, new Arg[] { A_NAME, A_IN, A_SELF_CLOSE });
- }
-
- public static final String THREAD_COUNT = "#threadcount";
-
- private int maxThreadCount = -1;
-
- protected void partialArgumentsEvaluated(VariableStack stack)
- throws ExecutionException {
- stack.setVar("selfclose", A_SELF_CLOSE.getValue(stack));
- super.partialArgumentsEvaluated(stack);
- }
-
- public void iterate(VariableStack stack, Identifier var, KarajanIterator i)
- throws ExecutionException {
- if (elementCount() > 0) {
- if (logger.isDebugEnabled()) {
- logger.debug("iterateParallel: " + stack.parentFrame());
- }
- stack.setVar(VAR, var);
- setChildFailed(stack, false);
- stack.setCaller(this);
- initThreadCount(stack, TypeUtil.toBoolean(stack.currentFrame().getVar("selfclose")), i);
- stack.currentFrame().deleteVar("selfclose");
- citerate(stack, var, i);
- }
- else {
- complete(stack);
- }
- }
-
- protected void citerate(VariableStack stack, Identifier var,
- KarajanIterator i) throws ExecutionException {
- ThreadCount tc = getThreadCount(stack);
-
- // we can bulk operations at the start to avoid contention
- // on the counter since at least as many
- // threads as reported by available() are available
- int available = tc.available();
- try {
- int j = 0;
- try {
- for (; j < available && i.hasNext(); j++) {
- VariableStack copy = stack.copy();
- copy.enter();
- ThreadingContext.set(copy, ThreadingContext.get(copy).split(
- i.current()));
- setIndex(copy, getArgCount());
- setArgsDone(copy);
- copy.setVar(var.getName(), i.next());
- startElement(getArgCount(), copy);
- }
- }
- finally {
- tc.add(j);
- }
- while (i.hasNext()) {
- Object value = tc.tryIncrement();
- VariableStack copy = stack.copy();
- copy.enter();
- ThreadingContext.set(copy, ThreadingContext.get(copy).split(
- i.current()));
- setIndex(copy, getArgCount());
- setArgsDone(copy);
- copy.setVar(var.getName(), value);
- startElement(getArgCount(), copy);
- }
-
- int left;
- synchronized(tc) {
- // can only have closed and running = 0 in one place
- tc.close();
- left = tc.current();
- }
- if (left == 0) {
- complete(stack);
- }
- }
- catch (FutureIteratorIncomplete fii) {
- synchronized (stack.currentFrame()) {
- stack.setVar(ITERATOR, i);
- }
- fii.getFutureIterator().addModificationAction(this, stack);
- }
- }
-
- public void failed(VariableStack stack, ExecutionException e) throws ExecutionException {
- if (!testAndSetChildFailed(stack)) {
- if (stack.parentFrame().isDefined(VAR)) {
- stack.leave();
- }
- failImmediately(stack, e);
- }
- }
-
- protected void iterationCompleted(VariableStack stack)
- throws ExecutionException {
- stack.leave();
- ThreadCount tc = getThreadCount(stack);
- int running;
- boolean closed;
- boolean iteratorHasValues;
- synchronized(tc) {
- closed = tc.isClosed();
- running = tc.decrement();
- iteratorHasValues = !tc.selfClose || tc.iteratorHasValues();
- }
- if (running == 0) {
- if (closed || !iteratorHasValues) {
- complete(stack);
- }
- }
- }
-
- private void initThreadCount(VariableStack stack, boolean selfClose, KarajanIterator i) {
- if (maxThreadCount < 0) {
- try {
- maxThreadCount = TypeUtil.toInt(VDL2Config.getConfig()
- .getProperty("foreach.max.threads", String.valueOf(DEFAULT_MAX_THREADS)));
- }
- catch (IOException e) {
- maxThreadCount = DEFAULT_MAX_THREADS;
- }
- }
- stack.setVar(THREAD_COUNT, new ThreadCount(maxThreadCount, selfClose, i));
- }
-
- private ThreadCount getThreadCount(VariableStack stack)
- throws VariableNotFoundException {
- return (ThreadCount) stack.getVar(THREAD_COUNT);
- }
-
- @Override
- public String getTextualName() {
- return "foreach";
- }
-
- private static class ThreadCount implements FutureIterator {
- private int maxThreadCount;
- private int crt;
- private boolean selfClose, closed;
- private List<ListenerStackPair> listeners;
- private KarajanIterator i;
-
- public ThreadCount(int maxThreadCount, boolean selfClose, KarajanIterator i) {
- this.maxThreadCount = maxThreadCount;
- this.selfClose = selfClose;
- this.i = i;
- crt = 0;
- }
-
- public boolean iteratorHasValues() {
- try {
- return i.hasNext();
- }
- catch (FutureFault e) {
- return false;
- }
- }
-
- public boolean isSelfClose() {
- return selfClose;
- }
-
- public synchronized int available() {
- return maxThreadCount - crt;
- }
-
- public synchronized void add(int count) {
- crt += count;
- }
-
- public synchronized Object tryIncrement() {
- // there is no way that both crt == 0 and i has no values outside this critical section
- if (crt < maxThreadCount) {
- Object o = i.next();
- crt++;
- return o;
- }
- else {
- throw new FutureIteratorIncomplete(this, this);
- }
- }
-
- public synchronized int decrement() {
- crt--;
- notifyListeners();
- return crt;
- }
-
- private void notifyListeners() {
- if (listeners != null) {
- Iterator<ListenerStackPair> i = listeners.iterator();
- listeners = null;
- while (i.hasNext()) {
- ListenerStackPair etp = i.next();
- i.remove();
- etp.listener.futureModified(this, etp.stack);
- }
- }
- }
-
- public boolean hasAvailable() {
- return false;
- }
-
- public int count() {
- return 0;
- }
-
- public synchronized int current() {
- return crt;
- }
-
- public Object peek() {
- return null;
- }
-
- public boolean hasNext() {
- return false;
- }
-
- public Object next() {
- return null;
- }
-
- public void remove() {
- }
-
- public synchronized void addModificationAction(FutureListener target,
- VariableStack stack) {
- if (listeners == null) {
- listeners = new ArrayList<ListenerStackPair>();
- }
- listeners.add(new ListenerStackPair(target, stack));
- if (crt < maxThreadCount) {
- notifyListeners();
- }
- }
-
- public synchronized void close() {
- this.closed = true;
- }
-
- public void fail(FutureEvaluationException e) {
- }
-
- public Object getValue() {
- return null;
- }
-
- public synchronized boolean isClosed() {
- return closed;
- }
- }
-}
More information about the Swift-commit
mailing list