[Swift-commit] r6253 - branches/faster/src/org/griphyn/vdl/karajan/lib
hategan at ci.uchicago.edu
hategan at ci.uchicago.edu
Fri Feb 8 23:30:12 CST 2013
Author: hategan
Date: 2013-02-08 23:30:12 -0600 (Fri, 08 Feb 2013)
New Revision: 6253
Modified:
branches/faster/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java
Log:
fixe self-close semantics in foreach
Modified: branches/faster/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java
===================================================================
--- branches/faster/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java 2013-02-09 05:29:51 UTC (rev 6252)
+++ branches/faster/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java 2013-02-09 05:30:12 UTC (rev 6253)
@@ -23,7 +23,10 @@
import java.util.List;
import java.util.StringTokenizer;
+import k.rt.ConditionalYield;
import k.rt.ExecutionException;
+import k.rt.Future;
+import k.rt.FutureListener;
import k.rt.KRunnable;
import k.rt.Stack;
import k.thr.LWThread;
@@ -57,6 +60,8 @@
private ArgRef<String> _kvar;
private ArgRef<String> _vvar;
private ArgRef<String> _traceline;
+
+ private boolean sc;
@Override
protected Signature getSignature() {
@@ -111,6 +116,7 @@
}
forTracer = Tracer.getTracer(this, "FOREACH");
iterationTracer = Tracer.getTracer(this, "ITERATION");
+ sc = selfClose.getValue();
return super.compileBody(w, argScope, scope);
}
@@ -152,7 +158,7 @@
protected void runBody(final LWThread thr) {
int i = thr.checkSliceAndPopState();
Iterator<Object> it = (Iterator<Object>) thr.popState();
- ThrottledThreadSet ts = (ThrottledThreadSet) thr.popState();
+ TPFThreadSet ts = (TPFThreadSet) thr.popState();
int fc = thr.popIntState();
List<RefCount> drefs = (List<RefCount>) thr.popState();
Stack stack = thr.getStack();
@@ -160,7 +166,13 @@
switch(i) {
case 0:
it = in.getValue(stack).iterator();
- ts = new ThrottledThreadSet(getMaxThreads());
+ if (sc) {
+ ts = new TPFSCThreadSet(it, getMaxThreads());
+ }
+ else {
+ ts = new TPFThreadSet(it, getMaxThreads());
+ }
+
drefs = buildRefs(stack);
ts.lock();
fc = stack.frameCount() + 1;
@@ -175,8 +187,8 @@
ts.checkFailed();
- startBulk(thr, ts, it, fc, drefs);
- startRest(thr, ts, it, fc, drefs);
+ startBulk(thr, ts, fc, drefs);
+ startRest(thr, ts, fc, drefs);
ts.unlock();
decRefs(drefs);
@@ -195,12 +207,12 @@
}
}
- private boolean startBulk(LWThread thr, ThrottledThreadSet ts, Iterator<Object> it, int fcf, List<RefCount> refs) {
+ private boolean startBulk(LWThread thr, TPFThreadSet ts, int fcf, List<RefCount> refs) {
int available = ts.freeSlots();
int j = 0;
Stack stack = thr.getStack();
- for (; j < available && it.hasNext(); j++) {
- if (startOne(thr, ts, it.next(), fcf, refs)) {
+ for (; j < available && ts.hasNext(); j++) {
+ if (startOne(thr, ts, ts.next(), fcf, refs)) {
// aborted
return true;
}
@@ -208,17 +220,27 @@
return false;
}
- private boolean startRest(LWThread thr, ThrottledThreadSet ts, Iterator<Object> it, int fcf, List<RefCount> refs) {
+ private boolean startRest(LWThread thr, TPFThreadSet ts, int fcf, List<RefCount> refs) {
Stack stack = thr.getStack();
- while (it.hasNext()) {
+ while (ts.hasNext()) {
ts.waitForSlot();
- if (startOne(thr, ts, it.next(), fcf, refs)) {
+ if (startOne(thr, ts, ts.next(), fcf, refs)) {
return true;
}
}
return false;
}
+ private boolean iteratorHasValues(Iterator<Object> it) {
+ try {
+ it.hasNext();
+ return true;
+ }
+ catch (ConditionalYield y) {
+ return false;
+ }
+ }
+
private boolean startOne(final LWThread thr, final ThreadSet ts, final Object value, final int fcf, List<RefCount> refs) {
incRefs(refs);
LWThread ct = thr.fork(new KRunnable() {
@@ -307,4 +329,99 @@
public String getTextualName() {
return "foreach";
}
+
+ private static class TPFThreadSet extends ThrottledThreadSet {
+ protected final Iterator<Object> it;
+
+ public TPFThreadSet(Iterator<Object> it, int maxThreads) {
+ super(maxThreads);
+ this.it = it;
+ }
+
+ public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ public Object next() {
+ return it.next();
+ }
+ }
+
+ private static class TPFSCThreadSet extends TPFThreadSet implements FutureListener {
+ private Helper helper;
+ private boolean closed;
+
+ public TPFSCThreadSet(Iterator<Object> it, int maxThreads) {
+ super(it, maxThreads);
+ helper = new Helper();
+ }
+
+ public synchronized boolean hasNext() {
+ if (closed) {
+ return false;
+ }
+ try {
+ return it.hasNext();
+ }
+ catch (ConditionalYield y) {
+ helper.resetItUpdated();
+ y.getFuture().addListener(this);
+ throw new ConditionalYield(helper);
+ }
+ }
+
+ @Override
+ public synchronized void threadDone(LWThread thr, ExecutionException e) {
+ super.threadDone(thr, e);
+ if (getRunning() == 1 && !iteratorHasValues()) {
+ closed = true;
+ helper.awake();
+ }
+ }
+
+ private boolean iteratorHasValues() {
+ try {
+ it.hasNext();
+ return true;
+ }
+ catch (Yield y) {
+ return false;
+ }
+ }
+
+ @Override
+ public void futureUpdated(Future fv) {
+ helper.awake();
+ }
+ }
+
+ private static class Helper implements Future {
+ private FutureListener l;
+ private boolean itUpdated;
+
+ @Override
+ public synchronized void addListener(FutureListener l) {
+ if (itUpdated) {
+ l.futureUpdated(this);
+ }
+ else {
+ this.l = l;
+ }
+ }
+
+ public synchronized void resetItUpdated() {
+ this.itUpdated = false;
+ }
+
+ public synchronized void awake() {
+ if (l == null) {
+ this.itUpdated = true;
+ }
+ else {
+ FutureListener l = this.l;
+ this.l = null;
+ l.futureUpdated(this);
+ }
+ }
+ }
}
More information about the Swift-commit
mailing list