[Swift-commit] r5844 - trunk/src/org/griphyn/vdl/karajan

hategan at ci.uchicago.edu hategan at ci.uchicago.edu
Mon Jul 16 23:54:36 CDT 2012


Author: hategan
Date: 2012-07-16 23:54:36 -0500 (Mon, 16 Jul 2012)
New Revision: 5844

Modified:
   trunk/src/org/griphyn/vdl/karajan/HangChecker.java
Log:
Use a graph for the implementation (cleaner code); if no dependency loops are found, also print a list of the waiting threads that don't depend on other threads.

Modified: trunk/src/org/griphyn/vdl/karajan/HangChecker.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/HangChecker.java	2012-07-17 04:49:09 UTC (rev 5843)
+++ trunk/src/org/griphyn/vdl/karajan/HangChecker.java	2012-07-17 04:54:36 UTC (rev 5844)
@@ -20,6 +20,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -38,7 +39,6 @@
 import org.globus.cog.karajan.workflow.ExecutionException;
 import org.globus.cog.karajan.workflow.events.EventBus;
 import org.globus.cog.karajan.workflow.nodes.grid.SchedulerNode;
-import org.griphyn.vdl.mapping.AbstractDataNode;
 import org.griphyn.vdl.mapping.DSHandle;
 
 public class HangChecker extends TimerTask {
@@ -71,7 +71,15 @@
                     PrintStream ps = new PrintStream(os);
                     Monitor.dumpVariables(ps);
                     Monitor.dumpThreads(ps);
-                    findCycles(ps);
+                    try {
+                        Graph g = buildGraph();
+                        if (!findCycles(ps, g)) {
+                            findThreadsToBlame(ps, g);
+                        }
+                    }
+                    catch (Exception e) {
+                        logger.warn("Failed to build dependency graph", e);
+                    }
                     logger.warn(os.toString());
                     ps.close();
                 }
@@ -83,9 +91,29 @@
         }
     }
     
-    private static void findCycles(PrintStream ps) {
-    	Map<VariableStack, DSHandle> wt = WaitingThreadsMonitor.getAllThreads();
+    private void findThreadsToBlame(PrintStream ps, Graph g) {
+        Map<VariableStack, DSHandle> wt = WaitingThreadsMonitor.getAllThreads();
+        Set<VariableStack> sl = g.nodeSet();
+        Set<VariableStack> loners = new HashSet<VariableStack>(wt.keySet());
+        for (VariableStack s : sl) {
+            for (Graph.Edge e : g.getEdgesFrom(s)) {
+                loners.remove(e.to);
+            }
+        }
+        if (!loners.isEmpty()) {
+            ps.println();
+            ps.println("The following threads are independently hung:");
+            for (VariableStack s : loners) {
+                Monitor.dumpThread(ps, s, wt.get(s));
+                ps.println();
+            }
+            ps.println("----");
+        }
+    }
+
+    private Graph buildGraph() throws VariableNotFoundException {
         Map<VariableStack, List<DSHandle>> ot = WaitingThreadsMonitor.getOutputs();
+        Map<VariableStack, DSHandle> wt = WaitingThreadsMonitor.getAllThreads();
         Map<DSHandle, List<VariableStack>> rwt = new HashMap<DSHandle, List<VariableStack>>();
         
         for (Map.Entry<VariableStack, DSHandle> e : wt.entrySet()) {
@@ -96,16 +124,44 @@
             }
             l.add(e.getKey());
         }
-                
+        
+        Graph g = new Graph();
+
+        // if n1 -> n2, then n1 produces an output that is used by n2
+        for (Map.Entry<VariableStack, List<DSHandle>> e : ot.entrySet()) {
+            for (DSHandle h : e.getValue()) {
+                List<VariableStack> sl = rwt.get(h);
+                if (sl != null) {
+                    for (VariableStack s : sl) {
+                        g.addEdge(e.getKey(), s, h);
+                    }
+                }
+            }
+            
+            ThreadingContext tc;
+            
+            tc = ThreadingContext.get(e.getKey()); 
+            for (VariableStack stk : ot.keySet()) {
+                if (tc.isStrictlySubContext(ThreadingContext.get(stk))) {
+                    g.addEdge(e.getKey(), stk, null);
+                }
+            }
+        }
+        
+        return g;
+    }
+
+    private static boolean findCycles(PrintStream ps, Graph g) {                
         System.out.print("Finding dependency loops...");
+        System.out.flush();
         
         Set<VariableStack> seen = new HashSet<VariableStack>();
         LinkedList<Object> cycle = new LinkedList<Object>();
         List<LinkedList<Object>> cycles = new ArrayList<LinkedList<Object>>();
-        for (VariableStack t : wt.keySet()) {
+        for (VariableStack t : g.nodeSet()) {
             seen.clear();
             cycle.clear();
-            findLoop(t, rwt, ot, seen, cycle, cycles);
+            findLoop(t, g, seen, cycle, cycles);
         }
         System.out.println();
         
@@ -115,12 +171,21 @@
         else if (cycles.size() > 1) {
         	ps.println(cycles.size() + " dependency loops found:");
         }
+        else {
+            ps.println("No dependency loops found.");
+        }
         int index = 0;
         for (LinkedList<Object> c : cycles) {
             index++;
             if (cycles.size() > 1) {
                 ps.println("* " + index);
             }
+            
+            // rotate the cycle so that "the above must complete" is not the first thing.
+            while (c.getLast() == null) {
+                Object o1 = c.removeFirst(); Object o2 = c.removeFirst();
+                c.add(o1); c.add(o2);
+            }
             Object prev = c.getLast();
             for (Object o : c) {
                 if (o instanceof VariableStack) {
@@ -148,6 +213,7 @@
         if (cycles.size() > 0) {
         	ps.println("----");
         }
+        return !cycles.isEmpty();
     }
         
     private static boolean isDuplicate(List<LinkedList<Object>> cycles, LinkedList<Object> cycle) {
@@ -210,8 +276,7 @@
         return true;
     }
 
-    private static void findLoop(VariableStack t, Map<DSHandle, List<VariableStack>> rwt,
-            Map<VariableStack, List<DSHandle>> ot, Set<VariableStack> seen, LinkedList<Object> cycle, List<LinkedList<Object>> cycles) {
+    private static void findLoop(VariableStack t, Graph g, Set<VariableStack> seen, LinkedList<Object> cycle, List<LinkedList<Object>> cycles) {
         if (cycles.size() > MAX_CYCLES) {
             return;
         }
@@ -232,47 +297,70 @@
         }
         cycle.add(t);
         seen.add(t);
-        // follow all the outputs of t
-        followOutputs(t, rwt, ot, seen, cycle, cycles);
         
-        // now follow all the outputs of parent threads to t
-        try {
-            ThreadingContext tc = ThreadingContext.get(t);
-            for (VariableStack stk : ot.keySet()) {
-                if (tc.isStrictlySubContext(ThreadingContext.get(stk))) {
-                    seen.add(stk);
-                    cycle.add(null);
-                    cycle.add(stk);
-                    followOutputs(stk, rwt, ot, seen, cycle, cycles);
-                    cycle.removeLast();
-                    cycle.removeLast();
-                    seen.remove(stk);
-                }
-            }
+        for (Graph.Edge e : g.getEdgesFrom(t)) {
+            cycle.add(e.contents);
+            findLoop(e.to, g, seen, cycle, cycles);
+            cycle.removeLast();
         }
-        catch (VariableNotFoundException e) {
-            e.printStackTrace();
-        }
+
         cycle.removeLast();
         seen.remove(t);
     }
+    
+    public static class Graph {
+        public static class Edge {
+            public final VariableStack to;
+            public final DSHandle contents;
+            
+            public Edge(VariableStack to, DSHandle contents) {
+                this.to = to;
+                this.contents = contents;
+            }
+        }
+        
+        private Map<VariableStack, List<Edge>> outEdges = new HashMap<VariableStack, List<Edge>>();
 
-    private static void followOutputs(VariableStack t,
-            Map<DSHandle, List<VariableStack>> rwt,
-            Map<VariableStack, List<DSHandle>> ot, Set<VariableStack> seen,
-            LinkedList<Object> cycle, List<LinkedList<Object>> cycles) {
-        List<DSHandle> l = ot.get(t);
-        if (l != null) {
-            for (DSHandle h : l) {
-                cycle.add(h);
-                List<VariableStack> l2 = rwt.get(h);
-                if (l2 != null) {
-                    for (VariableStack t2 : l2) {
-                        findLoop(t2, rwt, ot, seen, cycle, cycles);
-                    }
+        public void addEdge(VariableStack from, VariableStack to, DSHandle contents) {
+            List<Edge> l = outEdges.get(from);
+            if (l == null) {
+                l = new ArrayList<Edge>();
+                outEdges.put(from, l);
+            }
+            l.add(new Edge(to, contents));
+        }
+
+        public void dump(PrintStream ps) {
+            for (Map.Entry<VariableStack, List<Edge>> e : outEdges.entrySet()) {
+                for (Edge edge : e.getValue()) {
+                    String tcf = getThreadingContext(e.getKey());
+                    String tct = getThreadingContext(edge.to);
+                    ps.println(tcf + " -> " + tct);
                 }
-                cycle.removeLast();
             }
         }
+
+        private String getThreadingContext(VariableStack s) {
+            try {
+                return String.valueOf(ThreadingContext.get(s));
+            }
+            catch (VariableNotFoundException e) {
+                return "?";
+            }
+        }
+
+        public List<Edge> getEdgesFrom(VariableStack t) {
+            List<Edge> l = outEdges.get(t);
+            if (l == null) {
+                return Collections.emptyList();
+            }
+            else {
+                return l;
+            }
+        }
+
+        public Set<VariableStack> nodeSet() {
+            return outEdges.keySet();
+        }
     }
 }




More information about the Swift-commit mailing list