[Swift-commit] r5831 - in trunk: libexec resources src/org/griphyn/vdl/engine src/org/griphyn/vdl/karajan src/org/griphyn/vdl/karajan/lib

hategan at ci.uchicago.edu hategan at ci.uchicago.edu
Sat Jul 14 01:13:43 CDT 2012


Author: hategan
Date: 2012-07-14 01:13:42 -0500 (Sat, 14 Jul 2012)
New Revision: 5831

Added:
   trunk/src/org/griphyn/vdl/karajan/lib/UnitEnd.java
   trunk/src/org/griphyn/vdl/karajan/lib/UnitStart.java
Modified:
   trunk/libexec/vdl-lib.xml
   trunk/resources/Karajan.stg
   trunk/resources/swiftscript.stg
   trunk/resources/swiftscript.xsd
   trunk/src/org/griphyn/vdl/engine/Karajan.java
   trunk/src/org/griphyn/vdl/karajan/ArrayIndexFutureList.java
   trunk/src/org/griphyn/vdl/karajan/DSHandleFutureWrapper.java
   trunk/src/org/griphyn/vdl/karajan/FuturePairIterator.java
   trunk/src/org/griphyn/vdl/karajan/HangChecker.java
   trunk/src/org/griphyn/vdl/karajan/Monitor.java
   trunk/src/org/griphyn/vdl/karajan/WaitingThreadsMonitor.java
   trunk/src/org/griphyn/vdl/karajan/lib/New.java
Log:
slightly nicer hang checker output (i.e. stack traces), print line where variables are declared when dumping them, and dependency cycle detection in the hang checker

Modified: trunk/libexec/vdl-lib.xml
===================================================================
--- trunk/libexec/vdl-lib.xml	2012-07-12 21:36:00 UTC (rev 5830)
+++ trunk/libexec/vdl-lib.xml	2012-07-14 06:13:42 UTC (rev 5831)
@@ -107,6 +107,9 @@
 
 	<export name="log"><elementDef classname="org.griphyn.vdl.karajan.lib.Log"/></export>
 
+	<export name="unitStart"><elementDef classname="org.griphyn.vdl.karajan.lib.UnitStart"/></export>
+	<export name="unitEnd"><elementDef classname="org.griphyn.vdl.karajan.lib.UnitEnd"/></export>
+
 	<export name="kickstart"><elementDef classname="org.griphyn.vdl.karajan.lib.Kickstart"/></export>
 
 	<export name="dirname"><elementDef classname="org.griphyn.vdl.karajan.lib.PathUtils"/></export>

Modified: trunk/resources/Karajan.stg
===================================================================
--- trunk/resources/Karajan.stg	2012-07-12 21:36:00 UTC (rev 5830)
+++ trunk/resources/Karajan.stg	2012-07-14 06:13:42 UTC (rev 5831)
@@ -58,7 +58,7 @@
 >>
 
 procedure(name,outputs,inputs,arguments,optargs,binding,declarations,statements,config,line,cleanups) ::= <<
-<element name="$name$"$if(arguments)$ arguments="$proc_args(args=arguments)$"$endif$$if(optargs)$ optargs="$proc_args(args=optargs)$"$endif$>
+<element name="$name$"$if(arguments)$ arguments="$proc_args(args=arguments)$"$endif$$if(optargs)$ optargs="$proc_args(args=optargs)$"$endif$ _defline="$line$">
   $optargs:default_arg();separator="\n"$
   $inputs:vdl_log_input();separator="\n"$
   $outputs:vdl_log_output();separator="\n"$
@@ -72,14 +72,14 @@
 >>
 
 compound(outputs,inputs,declarations,statements,config,name,cleanups) ::= <<
-<log level="info"><string>STARTCOMPOUND thread={#thread} name=$name$</string></log>
+<unitStart name="$name$" type="COMPOUND" outputs="$outputs:list();separator=","$"/>
 $declarations;separator="\n"$
 $if(statements)$
 $parallel(statements=statements)$
 $endif$
 $outputs:vdl_closedataset();separator="\n"$
 $cleanups:vdl_cleandataset();separator="\n"$
-<log level="info"><string>ENDCOMPOUND thread={#thread}</string></log>
+<unitEnd name="$name$" type="COMPOUND"/>
 >>
 
 proc_args(args) ::= <<
@@ -99,7 +99,7 @@
 >>
 
 vdl_execute(outputs,inputs,attributes,application,name,line) ::= <<
-<log level="debug" message="PROCEDURE line=$line$ thread={#thread} name=$name$"/>
+<unitStart name="$name$" line="$line$" type="PROCEDURE" outputs="$outputs:list();separator=","$"/>
 <vdl:execute>
   $attributes$
   <vdl:tr>$application.exec$</vdl:tr>
@@ -108,7 +108,7 @@
   $vdl_arguments(attributes=application.attributes,arguments=application.arguments, stdin=application.stdin,stdout=application.stdout,stderr=application.stderr)$
 </vdl:execute>
 $outputs:vdl_closedataset();separator="\n"$
-<log level="debug" message="PROCEDURE_END line=$line$"/>
+<unitEnd name="$name$" line="$line$" type="PROCEDURE"/>
 >>
 
 vdl_log_input() ::= <<
@@ -145,6 +145,8 @@
 <vdl:cleandataset var="{$it$}"/>
 >>
 
+list() ::= <<$it.name$>>
+
 vdl_arguments(attributes,arguments,stdin,stdout,stderr) ::= <<
   $attributes$
 <vdl:arguments>
@@ -202,8 +204,8 @@
 // the 'function' template outputs a karajan code fragment
 // that calls a function in the 'swiftscript' namespace.
 
-function(name, args, datatype) ::= <<
-<swiftscript:$name$>
+function(name, args, datatype, line) ::= <<
+<swiftscript:$name$ _traceline="$line$">
   $if(args)$ $args$ $endif$
 </swiftscript:$name$>
 >>
@@ -231,15 +233,14 @@
       <vdl:new type="$indexVarType$" value="{\$\$}"/>
     </set>
 $endif$
-	<log level="debug" message="FOREACH_IT_START line=$line$ thread={#thread}"/>
-<log level="debug"><string>SCOPE thread={#thread}</string></log>
+	<unitStart line="$line$" type="FOREACH_IT"/>
 
     $declarations;separator="\n"$
     $if(statements)$
       $parallel(statements=statements)$
       $cleanups:vdl_cleandataset();separator="\n"$
     $endif$
-    <log level="debug" message="FOREACH_IT_END line=$line$ thread={#thread}"/>
+    <unitEnd line="$line$" type="FOREACH_IT"/>
 </vdl:tparallelFor>
 >>
 
@@ -247,17 +248,17 @@
 // they are not
 // $outputs:vdl_log_output();separator="\n"$
 
-callInternal(func, outputs, inputs) ::= <<
+callInternal(func, outputs, inputs, line) ::= <<
 <sequential>
-<log level="debug" message="INTERNALPROC_START thread={#thread} name=$func$"/>
+<unitStart name="$func$" type="INTERNALPROC" outputs="$outputs:list();separator=","$"/>
 <set name="swift#cs"><variable>#thread</variable></set>
-<$func$>
+<$func$ _traceline="$line$">
   <parallel>
     $outputs:callInternal_log_output();separator="\n"$
     $inputs:callInternal_log_input();separator="\n"$
   </parallel>
 </$func$>
-<log level="debug" message="INTERNALPROC_END thread={#thread}"/>
+<unitEnd name="$func$" type="INTERNALPROC"/>
 </sequential>
 >>
 
@@ -287,8 +288,8 @@
 </sequential>
 >>
 
-callUserDefined(func, outputs, inputs) ::= <<
-<$func$>
+callUserDefined(func, outputs, inputs, line) ::= <<
+<$func$ _traceline="$line$">
   <parallel>
     $outputs;separator="\n"$
     $inputs;separator="\n"$
@@ -312,19 +313,19 @@
 </global>
 >>
 
-variable(name,type,expr,mapping,nil,file,waitfor,datatype,isGlobal) ::= <<
+variable(name,type,expr,mapping,nil,file,waitfor,datatype,isGlobal,line) ::= <<
 $if(isGlobal)$<global name="$name$">$else$<set name="$name$">$endif$
   $if(mapping)$
-  <vdl:new type="$type$" dbgname="$name$"$if(waitfor)$ waitfor="$waitfor;separator=" "$"$endif$>
+  <vdl:new type="$type$" dbgname="$name$"$if(waitfor)$ waitfor="$waitfor;separator=" "$"$endif$ _defline="$line$">
     $vdl_mapping(mapping=mapping,file=file,waitfor=waitfor)$
   </vdl:new>
   $else$
     $if(file)$
-      <vdl:new type="$type$" dbgname="$name$"$if(waitfor)$ waitfor="$waitfor;separator=" "$"$endif$>
+      <vdl:new type="$type$" dbgname="$name$"$if(waitfor)$ waitfor="$waitfor;separator=" "$"$endif$ _defline="$line$">
         $vdl_mapping(mapping=mapping,file=file,waitfor=waitfor)$
       </vdl:new>
     $else$
-      <vdl:new type="$type$" dbgname="$name$"$if(waitfor)$ waitfor="$waitfor;separator=" "$"$endif$ />
+      <vdl:new type="$type$" dbgname="$name$"$if(waitfor)$ waitfor="$waitfor;separator=" "$"$endif$ _defline="$line$"/>
     $endif$
   $endif$
 $if(isGlobal)$</global>$else$</set>$endif$
@@ -445,12 +446,12 @@
 <if>
   <vdl:getfieldvalue>$condition$</vdl:getfieldvalue>
   <then>
-<log level="debug"><string>SCOPE thread={#thread}</string></log>
+  	<unitStart type="CONDITION_BLOCK"/>
     $vthen$
   </then>
   $if(velse)$
   <else>
-<log level="debug"><string>SCOPE thread={#thread}</string></log>
+  	<unitStart type="CONDITION_BLOCK"/>
     $velse$
   </else>
   $endif$

Modified: trunk/resources/swiftscript.stg
===================================================================
--- trunk/resources/swiftscript.stg	2012-07-12 21:36:00 UTC (rev 5830)
+++ trunk/resources/swiftscript.stg	2012-07-14 06:13:42 UTC (rev 5831)
@@ -71,13 +71,13 @@
 
 variable(type,name, sourcelocation, mapping, lfn, global) ::= <<
 $if(lfn)$
-<variable name="$name$" type="$type$" isGlobal="$global$"><file name="$lfn$"/></variable>
+<variable name="$name$" type="$type$" isGlobal="$global$" src="$sourcelocation$"><file name="$lfn$"/></variable>
 $else$$if(mapping)$
-<variable name="$name$" type="$type$" isGlobal="$global$">
+<variable name="$name$" type="$type$" isGlobal="$global$" src="$sourcelocation$">
   $mapping$
 </variable>
 $else$
-<variable name="$name$" type="$type$" isGlobal="$global$" xsi:nil="true"/>$endif$$endif$
+<variable name="$name$" type="$type$" isGlobal="$global$" xsi:nil="true" src="$sourcelocation$"/>$endif$$endif$
 >>
 
 mapping(descriptor,params,sourcelocation) ::= <<
@@ -140,25 +140,10 @@
 parameter(type,name,outlink,defaultv,sourcelocation) ::= <<
 
 $if(outlink)$
-  <output
+<output name="$name$" type="$type$" $if(defaultv)$>$defaultv$</output>$else$ xsi:nil="true"/>$endif$
 $else$
-  <input
+<input name="$name$" type="$type$" $if(defaultv)$>$defaultv$</input>$else$ xsi:nil="true"/>$endif$
 $endif$
-
-name="$name$" type="$type$"
-
-$if(defaultv)$>
-$defaultv$
-
-$if(outlink)$
-  </output>
-$else$
-  </input>
-$endif$
-
-$else$
- xsi:nil="true" />
-$endif$
 >>
 
 app(exec,profiles,arguments,stdin,stdout,stderr,sourcelocation) ::= <<
@@ -193,7 +178,7 @@
 >>
 
 functionInvocation(name,args,sourcelocation) ::= <<
-<function name="$name$">
+<function name="$name$" src="$sourcelocation$">
 $if(args)$
   $args$
 $endif$

Modified: trunk/resources/swiftscript.xsd
===================================================================
--- trunk/resources/swiftscript.xsd	2012-07-12 21:36:00 UTC (rev 5830)
+++ trunk/resources/swiftscript.xsd	2012-07-14 06:13:42 UTC (rev 5831)
@@ -234,6 +234,8 @@
         <xs:documentation>name of the mapping function</xs:documentation>
       </xs:annotation>
     </xs:attribute>
+    
+    <xs:attribute name="src" type="xs:string" />
   </xs:complexType>
 
   <!--
@@ -328,6 +330,8 @@
     <xs:attribute name="type" type="xs:QName" use="required"/>
 
     <xs:attribute name="isGlobal" type="xs:boolean"/>
+    
+    <xs:attribute name="src" type="xs:string"/>
   </xs:complexType>
 
   <xs:complexType name="ActualParameter">

Modified: trunk/src/org/griphyn/vdl/engine/Karajan.java
===================================================================
--- trunk/src/org/griphyn/vdl/engine/Karajan.java	2012-07-12 21:36:00 UTC (rev 5830)
+++ trunk/src/org/griphyn/vdl/engine/Karajan.java	2012-07-14 06:13:42 UTC (rev 5831)
@@ -350,6 +350,10 @@
 	    }
     }
 	
+	private String getLine(String src) {
+		return src.substring(src.indexOf(' ') + 1);
+	}
+	
 	private void setVariableUsed(String s) {
 	    usedVariables.add(s);
     }
@@ -359,7 +363,7 @@
 		VariableScope innerScope = new VariableScope(this, outerScope, VariableScope.ENCLOSURE_NONE);
 		StringTemplate procST = template("procedure");
 		containingScope.bodyTemplate.setAttribute("procedures", procST);
-		procST.setAttribute("line", proc.getSrc().substring(proc.getSrc().indexOf(' ') + 1));
+		procST.setAttribute("line", getLine(proc.getSrc()));
 		procST.setAttribute("name", mangle(proc.getName()));
 		for (int i = 0; i < proc.sizeOfOutputArray(); i++) {
 			FormalParameter param = proc.getOutputArray(i);
@@ -405,7 +409,7 @@
      * original name contains a '_' it will be converted 
      * to "__" (two underscores).
      */
-	private String mangle(String name) {
+	public static String mangle(String name) {
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < name.length(); i++) {
             char c = name.charAt(i);
@@ -416,6 +420,33 @@
         }
         return sb.toString();
     }
+	
+	public static String demangle(String name) {
+        StringBuilder sb = new StringBuilder();
+        boolean upper = false;
+        for (int i = 0; i < name.length(); i++) {
+            char c = name.charAt(i);
+            if (c == '_') {
+            	if (upper) {
+            	    sb.append("_");
+            	    upper = false;
+            	}
+            	else {
+            	    upper = true;
+            	}
+            }
+            else {
+            	if (upper) {
+            		upper = false;
+            		sb.append(Character.toUpperCase(c));
+            	}
+            	else {
+            	    sb.append(Character.toLowerCase(c));
+            	}
+            }
+        }
+        return sb.toString();
+    }
 
     public StringTemplate parameter(FormalParameter param, VariableScope scope) throws CompilationException {
 		StringTemplate paramST = new StringTemplate("parameter");
@@ -440,6 +471,7 @@
 		variableST.setAttribute("name", var.getName());
 		variableST.setAttribute("type", var.getType().getLocalPart());
 		variableST.setAttribute("isGlobal", Boolean.valueOf(var.getIsGlobal()));
+		variableST.setAttribute("line", getLine(var.getSrc()));
 		variables.add(variableST);
 
 		if(!var.isNil()) {
@@ -682,6 +714,7 @@
 				("Unknown procedure invocation mode "+proc.getInvocationMode());
 			}
 			callST.setAttribute("func", mangle(procName));
+			callST.setAttribute("line", getLine(call.getSrc()));
 			/* Does number of input arguments match */
 			for (int i = 0; i < proc.sizeOfInputArray(); i++) {
 				if (proc.getInputArray(i).isOptional())
@@ -1076,6 +1109,7 @@
 	public StringTemplate function(Function func, VariableScope scope) throws CompilationException {
 		StringTemplate funcST = template("function");
 		funcST.setAttribute("name", mangle(func.getName()));
+		funcST.setAttribute("line", getLine(func.getSrc()));
 		ProcedureSignature funcSignature =  functionsMap.get(func.getName());
 		if(funcSignature == null) {
 			throw new CompilationException("Unknown function: @"+func.getName());
@@ -1350,7 +1384,7 @@
 			if(arrayMode) {
 			    actualType = actualType + "[" + indexType + "]";
 				newst = template("slicearray");
-			} 
+			}
 			else {
 				newst = template("extractstructelement");
 			}

Modified: trunk/src/org/griphyn/vdl/karajan/ArrayIndexFutureList.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/ArrayIndexFutureList.java	2012-07-12 21:36:00 UTC (rev 5830)
+++ trunk/src/org/griphyn/vdl/karajan/ArrayIndexFutureList.java	2012-07-14 06:13:42 UTC (rev 5831)
@@ -37,6 +37,7 @@
 import org.globus.cog.karajan.workflow.futures.FutureNotYetAvailable;
 import org.globus.cog.karajan.workflow.futures.ListenerStackPair;
 import org.griphyn.vdl.mapping.ArrayDataNode;
+import org.griphyn.vdl.mapping.DSHandle;
 
 public class ArrayIndexFutureList implements FutureList, FutureWrapper {
     private ArrayList<Object> keys;
@@ -107,6 +108,10 @@
     public Object getValue() {
         return this;
     }
+    
+    public DSHandle getHandle() {
+        return node;
+    }
 
     public void addModificationAction(FutureListener target, VariableStack stack) {
         synchronized(node) {
@@ -114,7 +119,7 @@
                 listeners = new LinkedList<ListenerStackPair>();
             }
             listeners.add(new ListenerStackPair(target, stack));
-            WaitingThreadsMonitor.addThread(stack);
+            WaitingThreadsMonitor.addThread(stack, node);
             if (!node.isClosed()) {
                 return;
             }
@@ -139,7 +144,7 @@
             EventBus.post(new Runnable() {
                 public void run() {
                     lsp.listener.futureModified(ArrayIndexFutureList.this, lsp.stack);
-                } 
+                }
             });
         }
     }

Modified: trunk/src/org/griphyn/vdl/karajan/DSHandleFutureWrapper.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/DSHandleFutureWrapper.java	2012-07-12 21:36:00 UTC (rev 5830)
+++ trunk/src/org/griphyn/vdl/karajan/DSHandleFutureWrapper.java	2012-07-14 06:13:42 UTC (rev 5831)
@@ -53,7 +53,7 @@
                 listeners = new LinkedList<ListenerStackPair>();
             }
             listeners.add(new ListenerStackPair(target, stack));
-            WaitingThreadsMonitor.addThread(stack);
+            WaitingThreadsMonitor.addThread(stack, node);
             if (!node.isClosed()) {
                 return;
             }

Modified: trunk/src/org/griphyn/vdl/karajan/FuturePairIterator.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/FuturePairIterator.java	2012-07-12 21:36:00 UTC (rev 5830)
+++ trunk/src/org/griphyn/vdl/karajan/FuturePairIterator.java	2012-07-14 06:13:42 UTC (rev 5831)
@@ -110,7 +110,7 @@
 	}
 
 	public synchronized void addModificationAction(FutureListener target, VariableStack stack) {
-		WaitingThreadsMonitor.addThread(stack);
+		WaitingThreadsMonitor.addThread(stack, array.getHandle());
 		array.addModificationAction(target, stack);
 	}
 	

Modified: trunk/src/org/griphyn/vdl/karajan/HangChecker.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/HangChecker.java	2012-07-12 21:36:00 UTC (rev 5830)
+++ trunk/src/org/griphyn/vdl/karajan/HangChecker.java	2012-07-14 06:13:42 UTC (rev 5831)
@@ -19,6 +19,14 @@
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 
@@ -28,6 +36,7 @@
 import org.globus.cog.karajan.workflow.ExecutionException;
 import org.globus.cog.karajan.workflow.events.EventBus;
 import org.globus.cog.karajan.workflow.nodes.grid.SchedulerNode;
+import org.griphyn.vdl.mapping.DSHandle;
 
 public class HangChecker extends TimerTask {
     public static final Logger logger = Logger.getLogger(HangChecker.class);
@@ -58,6 +67,7 @@
                     PrintStream ps = new PrintStream(os);
                     Monitor.dumpVariables(ps);
                     Monitor.dumpThreads(ps);
+                    findCycles(ps);
                     logger.warn(os.toString());
                     ps.close();
                 }
@@ -68,4 +78,131 @@
             logger.warn("Exception caught during hang check", e);
         }
     }
+    
+    private static void findCycles(PrintStream ps) {
+    	Map<VariableStack, DSHandle> wt = WaitingThreadsMonitor.getAllThreads();
+        Map<VariableStack, List<DSHandle>> ot = WaitingThreadsMonitor.getOutputs();
+        Map<DSHandle, List<VariableStack>> rwt = new HashMap<DSHandle, List<VariableStack>>();
+        
+        for (Map.Entry<VariableStack, DSHandle> e : wt.entrySet()) {
+            List<VariableStack> l = rwt.get(e.getValue());
+            if (l == null) {
+                l = new LinkedList<VariableStack>();
+                rwt.put(e.getValue(), l);
+            }
+            l.add(e.getKey());
+        }
+        
+        Set<VariableStack> seen = new HashSet<VariableStack>();
+        LinkedList<Object> cycle = new LinkedList<Object>();
+        List<LinkedList<Object>> cycles = new ArrayList<LinkedList<Object>>();
+        for (VariableStack t : wt.keySet()) {
+            seen.clear();
+            cycle.clear();
+            findLoop(t, rwt, ot, seen, cycle, cycles);
+        }
+        
+        cycles = removeDuplicates(cycles);
+        
+        if (cycles.size() == 1) {
+            ps.println("Dependency loop found:");
+        }
+        else if (cycles.size() > 1) {
+        	ps.println(cycles.size() + " dependency loops found:");
+        }
+        int index = 0;
+        for (LinkedList<Object> c : cycles) {
+            index++;
+            if (cycles.size() > 1) {
+                ps.println("* " + index);
+            }
+            Object prev = c.getLast();
+            for (Object o : c) {
+                if (o instanceof VariableStack) {
+                    ps.println("\t" + Monitor.varWithLine((DSHandle) prev) + " is needed by: ");
+                    for (String t : Monitor.getSwiftTrace((VariableStack) o)) {
+                    	ps.println("\t\t" + t);
+                    }
+                }
+                else { 
+                    prev = o;
+                    ps.println("\tand produces " + Monitor.varWithLine((DSHandle) o));
+                    ps.println();
+                }
+            }
+        }
+        if (cycles.size() > 0) {
+        	ps.println("----");
+        }
+    }
+    
+    private static List<LinkedList<Object>> removeDuplicates(List<LinkedList<Object>> cycles) {
+    	List<LinkedList<Object>> nc = new LinkedList<LinkedList<Object>>();
+    	while (!cycles.isEmpty()) {
+    		Iterator<LinkedList<Object>> i = cycles.iterator();
+    		LinkedList<Object> first = i.next();
+    		i.remove();
+    		
+    		while (i.hasNext()) {
+    			if (isSameCycle(first, i.next())) {
+    				i.remove();
+    			}
+    		}
+    		nc.add(first);
+    	}
+    	return nc;
+    }
+
+    private static boolean isSameCycle(LinkedList<Object> a, LinkedList<Object> b) {
+        if (a.size() != b.size()) {
+        	return false;
+        }
+        Iterator<Object> i = a.iterator();
+        Object o = i.next();
+        Iterator<Object> j = b.iterator();
+        while (j.hasNext()) {
+        	if (o == j.next()) {
+        		while (i.hasNext()) {
+        		    if (!j.hasNext()) {
+        		        j = b.iterator();
+        		    }
+        		    if (i.next() != j.next()) {
+        		    	return false;
+        		    }
+        		}
+        		return true;
+        	}
+        }
+        return false;
+    }
+
+    private static void findLoop(VariableStack t, Map<DSHandle, List<VariableStack>> rwt,
+            Map<VariableStack, List<DSHandle>> ot, Set<VariableStack> seen, LinkedList<Object> cycle, List<LinkedList<Object>> cycles) {
+        if (t == null) {
+            return;
+        }
+        if (seen.contains(t)) { // t is already on the current path, so the path has closed a loop
+            cycles.add(new LinkedList<Object>(cycle.subList(cycle.indexOf(t), cycle.size()))); // keep only the loop, not the path leading into it
+            return;
+        }
+        cycle.add(t);
+        seen.add(t);
+        // follow all the outputs of t, and from each output all the threads waiting on it
+        List<DSHandle> l = ot.get(t);
+        if (l != null) {
+            for (DSHandle h : l) {
+                cycle.add(h);
+                List<VariableStack> l2 = rwt.get(h);
+                if (l2 != null) {
+                    for (VariableStack t2 : l2) {
+                        findLoop(t2, rwt, ot, seen, cycle, cycles);
+                    }
+                }
+                cycle.removeLast(); // backtrack: drop h
+            }
+        }
+        cycle.removeLast(); // backtrack: drop t
+        seen.remove(t);
+    }
+
 }

Modified: trunk/src/org/griphyn/vdl/karajan/Monitor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/Monitor.java	2012-07-12 21:36:00 UTC (rev 5830)
+++ trunk/src/org/griphyn/vdl/karajan/Monitor.java	2012-07-14 06:13:42 UTC (rev 5831)
@@ -28,6 +28,7 @@
 import java.awt.event.ActionListener;
 import java.awt.event.MouseEvent;
 import java.awt.event.MouseListener;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -36,10 +37,13 @@
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.swing.BorderFactory;
 import javax.swing.JButton;
@@ -56,6 +60,9 @@
 import org.globus.cog.karajan.util.ThreadingContext;
 import org.globus.cog.karajan.workflow.events.EventTargetPair;
 import org.globus.cog.karajan.workflow.futures.Future;
+import org.globus.cog.karajan.workflow.nodes.FlowElement;
+import org.globus.cog.karajan.workflow.nodes.FlowNode;
+import org.griphyn.vdl.engine.Karajan;
 import org.griphyn.vdl.mapping.AbstractDataNode;
 import org.griphyn.vdl.mapping.ArrayDataNode;
 import org.griphyn.vdl.mapping.DSHandle;
@@ -184,15 +191,15 @@
 			crtdisp = THREADS;
 			ArrayList<String> al = new ArrayList<String>();
 			wt = new ArrayList<VariableStack>();
-			Collection<VariableStack> c = WaitingThreadsMonitor.getAllThreads();
-			for (VariableStack stack : c) {
+			Map<VariableStack, DSHandle> c = WaitingThreadsMonitor.getAllThreads();
+			for (Map.Entry<VariableStack, DSHandle> entry : c.entrySet()) {
 				try {
-					al.add(String.valueOf(ThreadingContext.get(stack)));
+					al.add(String.valueOf(ThreadingContext.get(entry.getKey())));
 				}
 				catch (VariableNotFoundException e1) {
 					al.add("unknown thread");
 				}
-				wt.add(stack);
+				wt.add(entry.getKey());
 			}
 
 			ThreadModel m = new ThreadModel(al);
@@ -217,44 +224,51 @@
 	public static void dumpVariables(PrintStream ps) {
 		ps.println("\nRegistered futures:");
 		Map<DSHandle, Future> map = FutureTracker.get().getMap();
+		Map<DSHandle, Future> copy;
 		synchronized (map) {
-			for (Map.Entry<DSHandle, Future> en : map.entrySet()) {
-				Future f = en.getValue();
-				AbstractDataNode handle = (AbstractDataNode) en.getKey();
-				String value = "-";
-				try {
-					if (handle.getValue() != null) {
-						value = "";
-					}
+		    copy = new HashMap<DSHandle, Future>(map);
+		}
+		for (Map.Entry<DSHandle, Future> en : copy.entrySet()) {
+			Future f = en.getValue();
+			AbstractDataNode handle = (AbstractDataNode) en.getKey();
+			String value = "-";
+			try {
+				if (handle.getValue() != null) {
+					value = "";
 				}
-				catch (DependentException e) {
-					value = "<dependent exception>";
-				}
-				catch (Exception e) {
-				    value = "<exception>";
-				}
-				try {
-				    ps.println(handle.getType() + " " + handle.getDisplayableName() + " " + value + " " + f);
-				}
-				catch (Exception e) {
-				    ps.println(handle.getDisplayableName() + " - error");
-				    e.printStackTrace(ps);
-				}
 			}
+			catch (DependentException e) {
+				value = "<dependent exception>";
+			}
+			catch (Exception e) {
+			    value = "<exception>";
+			}
+			try {
+			    ps.println(handle.getType() + " " + handle.getDisplayableName() + " " + value + " " + f);
+			}
+			catch (Exception e) {
+			    ps.println(handle.getDisplayableName() + " - error");
+			    e.printStackTrace(ps);
+			}
 			ps.println("----");
 		}
 	}
-
-	public void dumpThreads() {
+	
+    public void dumpThreads() {
 		dumpThreads(System.out);
 	}
 
 	public static void dumpThreads(PrintStream pw) {
 		pw.println("\nWaiting threads:");
-		Collection<VariableStack> c = WaitingThreadsMonitor.getAllThreads();
-		for (VariableStack stack : c) {
+		Map<VariableStack, DSHandle> c = WaitingThreadsMonitor.getAllThreads();
+		for (Map.Entry<VariableStack, DSHandle> e : c.entrySet()) {
 			try {
-				pw.println(String.valueOf(ThreadingContext.get(stack)));
+				pw.println("Thread: " + String.valueOf(ThreadingContext.get(e.getKey())) + ", waiting on " 
+						+ varWithLine(e.getValue()));
+
+				for (String t : getSwiftTrace(e.getKey())) {
+					pw.println("\t" + t);
+				}
 			}
 			catch (VariableNotFoundException e1) {
 				pw.println("unknown thread");
@@ -263,7 +277,49 @@
 		pw.println("----");
 	}
 
-	public class VariableModel extends AbstractTableModel {
+	public static String varWithLine(DSHandle value) {
+		String line = value.getRoot().getParam("line");
+		return value.getRoot().getParam("dbgname") + 
+            (value == value.getRoot() ? "" : value.getPathFromRoot()) + 
+            (line == null ? "" : " (declared on line " + line + ")");
+    }
+    
+    public static String getLastCall(VariableStack stack) {
+        List<Object> trace = Trace.getAsList(stack);
+        for (Object o : trace) {
+            if (o instanceof FlowNode) {
+                FlowNode n = (FlowNode) o;
+                String traceLine = (String) n.getProperty("_traceline");
+                if (traceLine != null) {
+                    return(Karajan.demangle(n.getTextualName()) + ", " + 
+                            fileName(n) + ", line " + traceLine);
+                }
+            }
+        }
+        return "?";
+    }
+    
+    public static List<String> getSwiftTrace(VariableStack stack) {
+    	List<String> ret = new ArrayList<String>();
+    	List<Object> trace = Trace.getAsList(stack);
+        for (Object o : trace) {
+            if (o instanceof FlowNode) {
+                FlowNode n = (FlowNode) o;
+                String traceLine = (String) n.getProperty("_traceline");
+                if (traceLine != null) {
+                	ret.add(Karajan.demangle(n.getTextualName()) + ", " + 
+                            fileName(n) + ", line " + traceLine);
+                }
+            }
+        }
+        return ret;
+    }
+
+    private static String fileName(FlowNode n) {
+        return new File((String) FlowNode.getTreeProperty(FlowElement.FILENAME, n)).getName().replace(".kml", ".swift");
+    }
+
+    public class VariableModel extends AbstractTableModel {
 		private List<Object[]> l;
 
 		public VariableModel(List<List<Object>> lp) {

Modified: trunk/src/org/griphyn/vdl/karajan/WaitingThreadsMonitor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/WaitingThreadsMonitor.java	2012-07-12 21:36:00 UTC (rev 5830)
+++ trunk/src/org/griphyn/vdl/karajan/WaitingThreadsMonitor.java	2012-07-14 06:13:42 UTC (rev 5831)
@@ -20,39 +20,52 @@
  */
 package org.griphyn.vdl.karajan;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.globus.cog.karajan.stack.VariableStack;
+import org.griphyn.vdl.mapping.DSHandle;
 
 public class WaitingThreadsMonitor {
-	private static Set<VariableStack> threads;
+	private static Map<VariableStack, DSHandle> threads = new HashMap<VariableStack, DSHandle>();
+	private static Map<VariableStack, List<DSHandle>> outputs = new HashMap<VariableStack, List<DSHandle>>();;
 	
-	public synchronized static void addThread(VariableStack stack) {
+	public static void addThread(VariableStack stack, DSHandle waitingOn) {
 	    if (stack != null) {
-	        getThreads().add(stack);
+	        synchronized(threads) {
+	            threads.put(stack, waitingOn);
+	        }
 	    }
 	}
-	
-	private static synchronized Set<VariableStack> getThreads() {
-		if (threads == null) {
-			threads = new HashSet<VariableStack>();
-		}
-		return threads;
+		
+	public static void removeThread(VariableStack stack) {
+	    synchronized(threads) {
+	        threads.remove(stack);
+	    }
 	}
 	
-	public synchronized static void removeThread(VariableStack stack) {
-		getThreads().remove(stack);
+	public static Map<VariableStack, DSHandle> getAllThreads() {
+	    synchronized(threads) {
+	        return new HashMap<VariableStack, DSHandle>(threads);
+	    }
 	}
-	
-	public synchronized static Collection<VariableStack> getAllThreads() {
-		if (threads == null) {
-			return Collections.emptySet();
-		}
-		else {
-			return new HashSet<VariableStack>(threads);
-		}
-	}
+
+    public static void addOutput(VariableStack stack, List<DSHandle> outputs) {
+        synchronized(WaitingThreadsMonitor.outputs) { // lock the shared map; the bare name "outputs" is the shadowing parameter
+            WaitingThreadsMonitor.outputs.put(stack, outputs);
+        }
+    }
+
+    public static void removeOutput(VariableStack stack) {
+        synchronized(outputs) {
+            outputs.remove(stack);
+        }
+    }
+    
+    public static Map<VariableStack, List<DSHandle>> getOutputs() {
+        synchronized(outputs) {
+            return new HashMap<VariableStack, List<DSHandle>>(outputs);
+        }
+    }
 }

Modified: trunk/src/org/griphyn/vdl/karajan/lib/New.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/New.java	2012-07-12 21:36:00 UTC (rev 5830)
+++ trunk/src/org/griphyn/vdl/karajan/lib/New.java	2012-07-14 06:13:42 UTC (rev 5831)
@@ -52,7 +52,7 @@
 
 	static {
 		setArguments(New.class,
-				new Arg[] { OA_TYPE, OA_MAPPING, OA_VALUE, OA_DBGNAME, OA_WAITFOR, });
+				new Arg[] { OA_TYPE, OA_MAPPING, OA_VALUE, OA_DBGNAME, OA_WAITFOR});
 	}
 
 	public Object function(VariableStack stack) throws ExecutionException {
@@ -63,14 +63,19 @@
 		    (Map<String,Object>) OA_MAPPING.getValue(stack);
 		String dbgname = TypeUtil.toString(OA_DBGNAME.getValue(stack));
 		String waitfor = (String) OA_WAITFOR.getValue(stack);
+		String line = (String) getProperty("_defline");
 
 		if (mapping == null) {
-			mapping = new HashMap<String,Object>();
+			mapping = new HashMap<String, Object>();
 		}
 
 		if (dbgname != null) {
 			mapping.put("dbgname", dbgname);
 		}
+		
+		if (line != null) {
+		    mapping.put("line", line);
+		}
 
 		mapping.put("swift#restartid", getThreadPrefix(stack) + ":" + dbgname);
 

Added: trunk/src/org/griphyn/vdl/karajan/lib/UnitEnd.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/UnitEnd.java	                        (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/lib/UnitEnd.java	2012-07-14 06:13:42 UTC (rev 5831)
@@ -0,0 +1,45 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 13, 2012
+ */
+package org.griphyn.vdl.karajan.lib;
+
+import org.globus.cog.karajan.arguments.Arg;
+import org.globus.cog.karajan.stack.VariableStack;
+import org.globus.cog.karajan.util.ThreadingContext;
+import org.globus.cog.karajan.workflow.ExecutionException;
+import org.globus.cog.karajan.workflow.nodes.FlowNode;
+import org.griphyn.vdl.karajan.WaitingThreadsMonitor;
+
+public class UnitEnd extends FlowNode {    // logs the end of a unit and removes its entry from the outputs map
+    public static final Arg.Positional TYPE = new Arg.Positional("type"); // unit kind string, passed on to UnitStart.log
+    public static final Arg.Optional NAME = new Arg.Optional("name", null);
+    public static final Arg.Optional LINE = new Arg.Optional("line", null);
+    
+    @Override
+    public void execute(VariableStack stack) throws ExecutionException {
+        executeSimple(stack); // do the work first, then signal completion
+        complete(stack);
+    }
+    
+    @Override
+    public boolean isSimple() {
+        return super.isSimple(); // plain delegation to the superclass
+    }
+    
+    @Override
+    public void executeSimple(VariableStack stack) throws ExecutionException {
+        String type = (String) TYPE.getStatic(this);
+        ThreadingContext thread = ThreadingContext.get(stack);
+        String name = (String) NAME.getStatic(this);
+        String line = (String) LINE.getStatic(this);
+        
+        UnitStart.log(false, type, thread, name, line); // false selects the end-of-unit message variants
+        WaitingThreadsMonitor.removeOutput(stack); // drop any outputs entry keyed by this stack
+    }
+}

Added: trunk/src/org/griphyn/vdl/karajan/lib/UnitStart.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/UnitStart.java	                        (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/lib/UnitStart.java	2012-07-14 06:13:42 UTC (rev 5831)
@@ -0,0 +1,95 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 13, 2012
+ */
+package org.griphyn.vdl.karajan.lib;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.karajan.arguments.Arg;
+import org.globus.cog.karajan.stack.VariableStack;
+import org.globus.cog.karajan.util.ThreadingContext;
+import org.globus.cog.karajan.workflow.ExecutionException;
+import org.globus.cog.karajan.workflow.nodes.FlowNode;
+import org.griphyn.vdl.karajan.WaitingThreadsMonitor;
+import org.griphyn.vdl.mapping.DSHandle;
+
+public class UnitStart extends FlowNode { // logs the start of a unit and optionally registers its outputs
+    // keep compatibility with log()
+    public static final Logger logger = Logger.getLogger("swift");
+    
+    public static final Arg.Positional TYPE = new Arg.Positional("type"); // unit kind string matched in log()
+    public static final Arg.Optional NAME = new Arg.Optional("name", null);
+    public static final Arg.Optional LINE = new Arg.Optional("line", null);
+    public static final Arg.Optional OUTPUTS = new Arg.Optional("outputs", null); // comma-separated variable names
+    
+    @Override
+    public void execute(VariableStack stack) throws ExecutionException {
+        executeSimple(stack); // do the work first, then signal completion
+        complete(stack);
+    }
+    
+    @Override
+    public boolean isSimple() {
+        return super.isSimple(); // plain delegation to the superclass
+    }
+    
+    @Override
+    public void executeSimple(VariableStack stack) throws ExecutionException {
+        String type = (String) TYPE.getStatic(this);
+        ThreadingContext thread = ThreadingContext.get(stack);
+        String name = (String) NAME.getStatic(this);
+        String line = (String) LINE.getStatic(this);
+        
+        log(true, type, thread, name, line); // true selects the start-of-unit message variants
+        
+        String outputs = (String) OUTPUTS.getStatic(this);
+        if (outputs != null) { // outputs is optional; nothing is registered when absent
+            trackOutputs(stack, outputs);
+        }
+    }
+
+    private void trackOutputs(VariableStack stack, String outputs) { // registers the named handles under this stack
+        String[] names = outputs.split(","); // names are used verbatim; no trimming is applied
+        List<DSHandle> l = new LinkedList<DSHandle>();
+        for (String name : names) {
+            l.add((DSHandle) stack.parentFrame().getVar(name)); // looked up in the parent frame, not the current one
+        }
+        WaitingThreadsMonitor.addOutput(stack, l);
+    }
+
+    protected static void log(boolean start, String type, ThreadingContext thread, String name, String line) { // shared with UnitEnd
+        if (type.equals("COMPOUND")) {
+            logger.info((start ? "START" : "END") + type + " thread=" + thread + " name=" + name); // yields STARTCOMPOUND / ENDCOMPOUND
+        }
+        else if (type.equals("PROCEDURE")) {
+            if (start) {
+                logger.debug("PROCEDURE line=" + line + " thread=" + thread + " name=" + name);
+            }
+            else {
+                logger.debug("PROCEDURE_END line=" + line + " thread=" + thread + " name=" + name);
+            }
+        }
+        else if (type.equals("FOREACH_IT")) {
+            logger.debug("FOREACH_IT_" + (start ? "START" : "END") + " line=" + line + " thread=" + thread);
+            if (start) {
+                logger.debug("SCOPE thread=" + thread);
+            }
+        }
+        else if (type.equals("INTERNALPROC")) {
+            logger.debug("INTERNALPROC_" + (start ? "START" : "END") + " thread=" + thread + " name=" + name); // space before thread=, as in the other branches
+        }
+        else if (type.equals("CONDITION_BLOCK")) {
+            if (start) { // only the start of a condition block is logged
+                logger.debug("SCOPE thread=" + thread);
+            }
+        }
+    }
+}




More information about the Swift-commit mailing list