[Swift-commit] r5844 - trunk/src/org/griphyn/vdl/karajan
hategan at ci.uchicago.edu
hategan at ci.uchicago.edu
Mon Jul 16 23:54:36 CDT 2012
Author: hategan
Date: 2012-07-16 23:54:36 -0500 (Mon, 16 Jul 2012)
New Revision: 5844
Modified:
trunk/src/org/griphyn/vdl/karajan/HangChecker.java
Log:
Use a graph for the implementation (cleaner code); if no dependency loops are found, also print a list of threads that don't depend on other threads.
Modified: trunk/src/org/griphyn/vdl/karajan/HangChecker.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/HangChecker.java 2012-07-17 04:49:09 UTC (rev 5843)
+++ trunk/src/org/griphyn/vdl/karajan/HangChecker.java 2012-07-17 04:54:36 UTC (rev 5844)
@@ -20,6 +20,7 @@
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -38,7 +39,6 @@
import org.globus.cog.karajan.workflow.ExecutionException;
import org.globus.cog.karajan.workflow.events.EventBus;
import org.globus.cog.karajan.workflow.nodes.grid.SchedulerNode;
-import org.griphyn.vdl.mapping.AbstractDataNode;
import org.griphyn.vdl.mapping.DSHandle;
public class HangChecker extends TimerTask {
@@ -71,7 +71,15 @@
PrintStream ps = new PrintStream(os);
Monitor.dumpVariables(ps);
Monitor.dumpThreads(ps);
- findCycles(ps);
+ try {
+ Graph g = buildGraph();
+ if (!findCycles(ps, g)) {
+ findThreadsToBlame(ps, g);
+ }
+ }
+ catch (Exception e) {
+ logger.warn("Failed to build dependency graph", e);
+ }
logger.warn(os.toString());
ps.close();
}
@@ -83,9 +91,29 @@
}
}
- private static void findCycles(PrintStream ps) {
- Map<VariableStack, DSHandle> wt = WaitingThreadsMonitor.getAllThreads();
+ private void findThreadsToBlame(PrintStream ps, Graph g) { // prints waiting threads that no edge of g points to
+ Map<VariableStack, DSHandle> wt = WaitingThreadsMonitor.getAllThreads(); // NOTE(review): fetched again here, separately from buildGraph's snapshot
+ Set<VariableStack> sl = g.nodeSet(); // only nodes that have at least one outgoing edge
+ Set<VariableStack> loners = new HashSet<VariableStack>(wt.keySet()); // start with every waiting thread, then drop dependents
+ for (VariableStack s : sl) {
+ for (Graph.Edge e : g.getEdgesFrom(s)) {
+ loners.remove(e.to); // edge n1 -> n2 means n1 produces something n2 uses, so n2 depends on another thread
+ }
+ }
+ if (!loners.isEmpty()) { // nothing is printed when every waiting thread has an incoming edge
+ ps.println();
+ ps.println("The following threads are independently hung:");
+ for (VariableStack s : loners) {
+ Monitor.dumpThread(ps, s, wt.get(s)); // dump the thread with its DSHandle from the waiting-threads map
+ ps.println();
+ }
+ ps.println("----"); // same separator findCycles prints after its report
+ }
+ }
+
+ private Graph buildGraph() throws VariableNotFoundException {
Map<VariableStack, List<DSHandle>> ot = WaitingThreadsMonitor.getOutputs();
+ Map<VariableStack, DSHandle> wt = WaitingThreadsMonitor.getAllThreads();
Map<DSHandle, List<VariableStack>> rwt = new HashMap<DSHandle, List<VariableStack>>();
for (Map.Entry<VariableStack, DSHandle> e : wt.entrySet()) {
@@ -96,16 +124,44 @@
}
l.add(e.getKey());
}
-
+
+ Graph g = new Graph();
+
+ // if n1 -> n2, then n1 produces an output that is used by n2
+ for (Map.Entry<VariableStack, List<DSHandle>> e : ot.entrySet()) {
+ for (DSHandle h : e.getValue()) {
+ List<VariableStack> sl = rwt.get(h);
+ if (sl != null) {
+ for (VariableStack s : sl) {
+ g.addEdge(e.getKey(), s, h);
+ }
+ }
+ }
+
+ ThreadingContext tc;
+
+ tc = ThreadingContext.get(e.getKey());
+ for (VariableStack stk : ot.keySet()) {
+ if (tc.isStrictlySubContext(ThreadingContext.get(stk))) {
+ g.addEdge(e.getKey(), stk, null);
+ }
+ }
+ }
+
+ return g;
+ }
+
+ private static boolean findCycles(PrintStream ps, Graph g) {
System.out.print("Finding dependency loops...");
+ System.out.flush();
Set<VariableStack> seen = new HashSet<VariableStack>();
LinkedList<Object> cycle = new LinkedList<Object>();
List<LinkedList<Object>> cycles = new ArrayList<LinkedList<Object>>();
- for (VariableStack t : wt.keySet()) {
+ for (VariableStack t : g.nodeSet()) {
seen.clear();
cycle.clear();
- findLoop(t, rwt, ot, seen, cycle, cycles);
+ findLoop(t, g, seen, cycle, cycles);
}
System.out.println();
@@ -115,12 +171,21 @@
else if (cycles.size() > 1) {
ps.println(cycles.size() + " dependency loops found:");
}
+ else {
+ ps.println("No dependency loops found.");
+ }
int index = 0;
for (LinkedList<Object> c : cycles) {
index++;
if (cycles.size() > 1) {
ps.println("* " + index);
}
+
+ // rotate the cycle so that "the above must complete" is not the first thing.
+ while (c.getLast() == null) {
+ Object o1 = c.removeFirst(); Object o2 = c.removeFirst();
+ c.add(o1); c.add(o2);
+ }
Object prev = c.getLast();
for (Object o : c) {
if (o instanceof VariableStack) {
@@ -148,6 +213,7 @@
if (cycles.size() > 0) {
ps.println("----");
}
+ return !cycles.isEmpty();
}
private static boolean isDuplicate(List<LinkedList<Object>> cycles, LinkedList<Object> cycle) {
@@ -210,8 +276,7 @@
return true;
}
- private static void findLoop(VariableStack t, Map<DSHandle, List<VariableStack>> rwt,
- Map<VariableStack, List<DSHandle>> ot, Set<VariableStack> seen, LinkedList<Object> cycle, List<LinkedList<Object>> cycles) {
+ private static void findLoop(VariableStack t, Graph g, Set<VariableStack> seen, LinkedList<Object> cycle, List<LinkedList<Object>> cycles) {
if (cycles.size() > MAX_CYCLES) {
return;
}
@@ -232,47 +297,70 @@
}
cycle.add(t);
seen.add(t);
- // follow all the outputs of t
- followOutputs(t, rwt, ot, seen, cycle, cycles);
- // now follow all the outputs of parent threads to t
- try {
- ThreadingContext tc = ThreadingContext.get(t);
- for (VariableStack stk : ot.keySet()) {
- if (tc.isStrictlySubContext(ThreadingContext.get(stk))) {
- seen.add(stk);
- cycle.add(null);
- cycle.add(stk);
- followOutputs(stk, rwt, ot, seen, cycle, cycles);
- cycle.removeLast();
- cycle.removeLast();
- seen.remove(stk);
- }
- }
+ for (Graph.Edge e : g.getEdgesFrom(t)) {
+ cycle.add(e.contents);
+ findLoop(e.to, g, seen, cycle, cycles);
+ cycle.removeLast();
}
- catch (VariableNotFoundException e) {
- e.printStackTrace();
- }
+
cycle.removeLast();
seen.remove(t);
}
+
+ public static class Graph { // directed graph over thread stacks, stored as per-source lists of outgoing edges
+ public static class Edge { // one outgoing edge: the target node plus an optional DSHandle label
+ public final VariableStack to;
+ public final DSHandle contents; // may be null: buildGraph adds threading-context edges with a null label
+
+ public Edge(VariableStack to, DSHandle contents) {
+ this.to = to;
+ this.contents = contents;
+ }
+ }
+
+ private Map<VariableStack, List<Edge>> outEdges = new HashMap<VariableStack, List<Edge>>(); // keys are only nodes with at least one outgoing edge
- private static void followOutputs(VariableStack t,
- Map<DSHandle, List<VariableStack>> rwt,
- Map<VariableStack, List<DSHandle>> ot, Set<VariableStack> seen,
- LinkedList<Object> cycle, List<LinkedList<Object>> cycles) {
- List<DSHandle> l = ot.get(t);
- if (l != null) {
- for (DSHandle h : l) {
- cycle.add(h);
- List<VariableStack> l2 = rwt.get(h);
- if (l2 != null) {
- for (VariableStack t2 : l2) {
- findLoop(t2, rwt, ot, seen, cycle, cycles);
- }
+ public void addEdge(VariableStack from, VariableStack to, DSHandle contents) { // lazily creates the source's list; duplicate edges are kept
+ List<Edge> l = outEdges.get(from);
+ if (l == null) {
+ l = new ArrayList<Edge>();
+ outEdges.put(from, l);
+ }
+ l.add(new Edge(to, contents));
+ }
+
+ public void dump(PrintStream ps) { // prints one "from -> to" line per edge, labelled by threading context
+ for (Map.Entry<VariableStack, List<Edge>> e : outEdges.entrySet()) {
+ for (Edge edge : e.getValue()) {
+ String tcf = getThreadingContext(e.getKey());
+ String tct = getThreadingContext(edge.to);
+ ps.println(tcf + " -> " + tct);
}
- cycle.removeLast();
}
}
+
+ private String getThreadingContext(VariableStack s) { // "?" when ThreadingContext.get throws VariableNotFoundException
+ try {
+ return String.valueOf(ThreadingContext.get(s));
+ }
+ catch (VariableNotFoundException e) {
+ return "?";
+ }
+ }
+
+ public List<Edge> getEdgesFrom(VariableStack t) { // never null; returns the internal list (not a copy) when edges exist
+ List<Edge> l = outEdges.get(t);
+ if (l == null) {
+ return Collections.emptyList();
+ }
+ else {
+ return l;
+ }
+ }
+
+ public Set<VariableStack> nodeSet() { // live keySet view; nodes with no outgoing edges are not included
+ return outEdges.keySet();
+ }
}
}
More information about the Swift-commit mailing list