[Swift-commit] r2671 - trunk/src/org/griphyn/vdl/karajan/lib
noreply at svn.ci.uchicago.edu
noreply at svn.ci.uchicago.edu
Mon Mar 9 12:18:21 CDT 2009
Author: hategan
Date: 2009-03-09 12:18:20 -0500 (Mon, 09 Mar 2009)
New Revision: 2671
Added:
trunk/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java
Log:
forgot the essential part
Added: trunk/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java 2009-03-09 17:18:20 UTC (rev 2671)
@@ -0,0 +1,222 @@
+// ----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Mar 21, 2005
+ */
+package org.griphyn.vdl.karajan.lib;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.karajan.arguments.Arg;
+import org.globus.cog.karajan.stack.VariableNotFoundException;
+import org.globus.cog.karajan.stack.VariableStack;
+import org.globus.cog.karajan.util.Identifier;
+import org.globus.cog.karajan.util.KarajanIterator;
+import org.globus.cog.karajan.util.ThreadingContext;
+import org.globus.cog.karajan.util.TypeUtil;
+import org.globus.cog.karajan.workflow.ExecutionException;
+import org.globus.cog.karajan.workflow.events.Event;
+import org.globus.cog.karajan.workflow.events.EventBus;
+import org.globus.cog.karajan.workflow.events.EventListener;
+import org.globus.cog.karajan.workflow.events.EventTargetPair;
+import org.globus.cog.karajan.workflow.events.FutureNotificationEvent;
+import org.globus.cog.karajan.workflow.futures.FutureEvaluationException;
+import org.globus.cog.karajan.workflow.futures.FutureIterator;
+import org.globus.cog.karajan.workflow.futures.FutureIteratorIncomplete;
+import org.globus.cog.karajan.workflow.nodes.AbstractParallelIterator;
+import org.griphyn.vdl.util.VDL2Config;
+
+public class ThrottledParallelFor extends AbstractParallelIterator {
+ public static final Logger logger = Logger
+ .getLogger(ThrottledParallelFor.class);
+
+ public static final int DEFAULT_MAX_THREADS = 1024;
+
+ public static final Arg A_NAME = new Arg.Positional("name");
+ public static final Arg A_IN = new Arg.Positional("in");
+
+ static {
+ setArguments(ThrottledParallelFor.class, new Arg[] { A_NAME, A_IN });
+ }
+
+ public static final String THREAD_COUNT = "#threadcount";
+
+ private int maxThreadCount = -1;
+
+ public void iterate(VariableStack stack, Identifier var, KarajanIterator i)
+ throws ExecutionException {
+ if (elementCount() > 0) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("iterateParallel: " + stack.parentFrame());
+ }
+ stack.setVar(VAR, var);
+ setChildFailed(stack, false);
+ stack.setVar(CALLER, this);
+ initializeChannelBuffers(stack);
+ initThreadCount(stack);
+ citerate(stack, var, i);
+ }
+ else {
+ complete(stack);
+ }
+ }
+
+ protected void citerate(VariableStack stack, Identifier var,
+ KarajanIterator i) throws ExecutionException {
+ ThreadCount tc = getThreadCount(stack);
+ try {
+ while (i.hasNext()) {
+ Object value = tc.tryIncrement(i);
+ VariableStack copy = stack.copy();
+ copy.enter();
+ ThreadingContext.set(copy, ThreadingContext.get(copy).split(
+ i.current()));
+ setIndex(copy, getArgCount());
+ setArgsDone(copy);
+ copy.setVar(var.getName(), value);
+ addChannelBuffers(copy);
+ startElement(getArgCount(), copy);
+ }
+
+ int left = tc.decrement();
+ if (left == 0) {
+ complete(stack);
+ }
+ }
+ catch (FutureIteratorIncomplete fii) {
+ stack.setVar(ITERATOR, i);
+ fii.getFutureIterator().addModificationAction(
+ this,
+ new FutureNotificationEvent(ITERATE, this, fii
+ .getFutureIterator(), stack));
+ }
+ }
+
+ protected void iterationCompleted(VariableStack stack)
+ throws ExecutionException {
+ closeBuffers(stack);
+ stack.leave();
+ ThreadCount tc = getThreadCount(stack);
+ int running = tc.decrement();
+ if (running == 0) {
+ complete(stack);
+ }
+ }
+
+ private void initThreadCount(VariableStack stack) {
+ if (maxThreadCount < 0) {
+ try {
+ maxThreadCount = TypeUtil.toInt(VDL2Config.getConfig()
+ .getProperty("foreach.max.threads", String.valueOf(DEFAULT_MAX_THREADS)));
+ }
+ catch (IOException e) {
+ maxThreadCount = DEFAULT_MAX_THREADS;
+ }
+ }
+ stack.setVar(THREAD_COUNT, new ThreadCount(maxThreadCount));
+ }
+
+ private ThreadCount getThreadCount(VariableStack stack)
+ throws VariableNotFoundException {
+ return (ThreadCount) stack.getVar(THREAD_COUNT);
+ }
+
+ private static class ThreadCount implements FutureIterator {
+ private int maxThreadCount;
+ private int crt;
+ private List listeners;
+
+ public ThreadCount(int maxThreadCount) {
+ this.maxThreadCount = maxThreadCount;
+ crt = 1;
+ }
+
+ public synchronized Object tryIncrement(KarajanIterator i) {
+ if (crt < maxThreadCount) {
+ Object o = i.next();
+ crt++;
+ return o;
+ }
+ else {
+ throw new FutureIteratorIncomplete(this, this);
+ }
+ }
+
+ public synchronized int decrement() {
+ crt--;
+ notifyListeners();
+ return crt;
+ }
+
+ private void notifyListeners() {
+ if (listeners != null) {
+ Iterator i = listeners.iterator();
+ while (i.hasNext()) {
+ EventTargetPair etp = (EventTargetPair) i.next();
+ i.remove();
+ EventBus.post(etp.getTarget(), etp.getEvent());
+ }
+ }
+ }
+
+ public boolean hasAvailable() {
+ return false;
+ }
+
+ public int count() {
+ return 0;
+ }
+
+ public int current() {
+ return 0;
+ }
+
+ public Object peek() {
+ return null;
+ }
+
+ public boolean hasNext() {
+ return false;
+ }
+
+ public Object next() {
+ return null;
+ }
+
+ public void remove() {
+ }
+
+ public synchronized void addModificationAction(EventListener target,
+ Event event) {
+ if (listeners == null) {
+ listeners = new ArrayList();
+ }
+ listeners.add(new EventTargetPair(event, target));
+ if (crt < maxThreadCount) {
+ notifyListeners();
+ }
+ }
+
+ public void close() {
+ }
+
+ public void fail(FutureEvaluationException e) {
+ }
+
+ public Object getValue() throws ExecutionException {
+ return null;
+ }
+
+ public boolean isClosed() {
+ return false;
+ }
+ }
+}
\ No newline at end of file
More information about the Swift-commit
mailing list