[Swift-commit] r6253 - branches/faster/src/org/griphyn/vdl/karajan/lib

hategan at ci.uchicago.edu hategan at ci.uchicago.edu
Fri Feb 8 23:30:12 CST 2013


Author: hategan
Date: 2013-02-08 23:30:12 -0600 (Fri, 08 Feb 2013)
New Revision: 6253

Modified:
   branches/faster/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java
Log:
fixed self-close semantics in foreach

Modified: branches/faster/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java
===================================================================
--- branches/faster/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java	2013-02-09 05:29:51 UTC (rev 6252)
+++ branches/faster/src/org/griphyn/vdl/karajan/lib/ThrottledParallelFor.java	2013-02-09 05:30:12 UTC (rev 6253)
@@ -23,7 +23,10 @@
 import java.util.List;
 import java.util.StringTokenizer;
 
+import k.rt.ConditionalYield;
 import k.rt.ExecutionException;
+import k.rt.Future;
+import k.rt.FutureListener;
 import k.rt.KRunnable;
 import k.rt.Stack;
 import k.thr.LWThread;
@@ -57,6 +60,8 @@
 	private ArgRef<String> _kvar;
 	private ArgRef<String> _vvar;
 	private ArgRef<String> _traceline;
+	
+	private boolean sc;
 	    
     @Override
     protected Signature getSignature() {
@@ -111,6 +116,7 @@
         }
         forTracer = Tracer.getTracer(this, "FOREACH");
         iterationTracer = Tracer.getTracer(this, "ITERATION");
+        sc = selfClose.getValue();
         return super.compileBody(w, argScope, scope);
     }
     
@@ -152,7 +158,7 @@
     protected void runBody(final LWThread thr) {        
         int i = thr.checkSliceAndPopState();
         Iterator<Object> it = (Iterator<Object>) thr.popState();
-        ThrottledThreadSet ts = (ThrottledThreadSet) thr.popState();
+        TPFThreadSet ts = (TPFThreadSet) thr.popState();
         int fc = thr.popIntState();
         List<RefCount> drefs = (List<RefCount>) thr.popState();
         Stack stack = thr.getStack();
@@ -160,7 +166,13 @@
             switch(i) {
                 case 0:
                     it = in.getValue(stack).iterator();
-                    ts = new ThrottledThreadSet(getMaxThreads());
+                    if (sc) {
+                        ts = new TPFSCThreadSet(it, getMaxThreads());
+                    }
+                    else {
+                        ts = new TPFThreadSet(it, getMaxThreads());
+                    }
+                    
                     drefs = buildRefs(stack);
                     ts.lock();
                     fc = stack.frameCount() + 1;
@@ -175,8 +187,8 @@
                     
                     ts.checkFailed();
                     
-                    startBulk(thr, ts, it, fc, drefs);
-                    startRest(thr, ts, it, fc, drefs);
+                    startBulk(thr, ts, fc, drefs);
+                    startRest(thr, ts, fc, drefs);
                     
                     ts.unlock();
                     decRefs(drefs);
@@ -195,12 +207,12 @@
         }
     }
 
-	private boolean startBulk(LWThread thr, ThrottledThreadSet ts, Iterator<Object> it, int fcf, List<RefCount> refs) {
+	private boolean startBulk(LWThread thr, TPFThreadSet ts, int fcf, List<RefCount> refs) {
 	    int available = ts.freeSlots();
 	    int j = 0;
 	    Stack stack = thr.getStack();
-	    for (; j < available && it.hasNext(); j++) {
-	        if (startOne(thr, ts, it.next(), fcf, refs)) {
+	    for (; j < available && ts.hasNext(); j++) {
+	        if (startOne(thr, ts, ts.next(), fcf, refs)) {
                 // aborted
                 return true;
             }
@@ -208,17 +220,27 @@
         return false;
     }
 	
-	private boolean startRest(LWThread thr, ThrottledThreadSet ts, Iterator<Object> it, int fcf, List<RefCount> refs) {
+	private boolean startRest(LWThread thr, TPFThreadSet ts, int fcf, List<RefCount> refs) {
         Stack stack = thr.getStack();
-        while (it.hasNext()) {
+        while (ts.hasNext()) {
             ts.waitForSlot();
-            if (startOne(thr, ts, it.next(), fcf, refs)) {
+            if (startOne(thr, ts, ts.next(), fcf, refs)) {
                 return true;
             }
         }
         return false;
     }
 
+    private boolean iteratorHasValues(Iterator<Object> it) {
+        try {
+            it.hasNext();
+            return true;
+        }
+        catch (ConditionalYield y) {
+            return false;
+        }
+    }
+
     private boolean startOne(final LWThread thr, final ThreadSet ts, final Object value, final int fcf, List<RefCount> refs) {
         incRefs(refs);
         LWThread ct = thr.fork(new KRunnable() {
@@ -307,4 +329,99 @@
     public String getTextualName() {
         return "foreach";
     }
+	
+	private static class TPFThreadSet extends ThrottledThreadSet {
+	    protected final Iterator<Object> it;
+	    
+	    public TPFThreadSet(Iterator<Object> it, int maxThreads) {
+	        super(maxThreads);
+	        this.it = it;
+	    }
+
+        public boolean hasNext() {
+            return it.hasNext();
+        }
+        
+        public Object next() {
+            return it.next();
+        }
+	}
+	
+	private static class TPFSCThreadSet extends TPFThreadSet implements FutureListener {
+	    private Helper helper;
+	    private boolean closed;
+	    
+	    public TPFSCThreadSet(Iterator<Object> it, int maxThreads) {
+	        super(it, maxThreads);
+	        helper = new Helper();
+	    }
+	    
+	    public synchronized boolean hasNext() {
+	        if (closed) {
+	            return false;
+	        }
+	        try {
+	            return it.hasNext();
+	        }
+	        catch (ConditionalYield y) {
+	            helper.resetItUpdated();
+	            y.getFuture().addListener(this);
+	            throw new ConditionalYield(helper);
+	        }
+	    }
+
+        @Override
+        public synchronized void threadDone(LWThread thr, ExecutionException e) {
+            super.threadDone(thr, e);
+            if (getRunning() == 1 && !iteratorHasValues()) {
+                closed = true;
+                helper.awake();
+            }
+        }
+
+        private boolean iteratorHasValues() {
+            try {
+                it.hasNext();
+                return true;
+            }
+            catch (Yield y) {
+                return false;
+            }
+        }
+
+        @Override
+        public void futureUpdated(Future fv) {
+            helper.awake();
+        }
+	}
+	
+	private static class Helper implements Future {
+	    private FutureListener l;
+	    private boolean itUpdated;
+
+        @Override
+        public synchronized void addListener(FutureListener l) {
+            if (itUpdated) {
+                l.futureUpdated(this);
+            }
+            else {
+                this.l = l;
+            }
+        }
+
+        public synchronized void resetItUpdated() {
+            this.itUpdated = false;
+        }
+
+        public synchronized void awake() {
+            if (l == null) {
+                this.itUpdated = true;
+            }
+            else {
+                FutureListener l = this.l;
+                this.l = null;
+                l.futureUpdated(this);
+            }
+        }
+	}
 }




More information about the Swift-commit mailing list