[Swift-commit] r5831 - in trunk: libexec resources src/org/griphyn/vdl/engine src/org/griphyn/vdl/karajan src/org/griphyn/vdl/karajan/lib
hategan at ci.uchicago.edu
hategan at ci.uchicago.edu
Sat Jul 14 01:13:43 CDT 2012
Author: hategan
Date: 2012-07-14 01:13:42 -0500 (Sat, 14 Jul 2012)
New Revision: 5831
Added:
trunk/src/org/griphyn/vdl/karajan/lib/UnitEnd.java
trunk/src/org/griphyn/vdl/karajan/lib/UnitStart.java
Modified:
trunk/libexec/vdl-lib.xml
trunk/resources/Karajan.stg
trunk/resources/swiftscript.stg
trunk/resources/swiftscript.xsd
trunk/src/org/griphyn/vdl/engine/Karajan.java
trunk/src/org/griphyn/vdl/karajan/ArrayIndexFutureList.java
trunk/src/org/griphyn/vdl/karajan/DSHandleFutureWrapper.java
trunk/src/org/griphyn/vdl/karajan/FuturePairIterator.java
trunk/src/org/griphyn/vdl/karajan/HangChecker.java
trunk/src/org/griphyn/vdl/karajan/Monitor.java
trunk/src/org/griphyn/vdl/karajan/WaitingThreadsMonitor.java
trunk/src/org/griphyn/vdl/karajan/lib/New.java
Log:
slightly nicer hang checker output (i.e. stack traces), print line where variables are declared when dumping them, and dependency cycle detection in the hang checker
Modified: trunk/libexec/vdl-lib.xml
===================================================================
--- trunk/libexec/vdl-lib.xml 2012-07-12 21:36:00 UTC (rev 5830)
+++ trunk/libexec/vdl-lib.xml 2012-07-14 06:13:42 UTC (rev 5831)
@@ -107,6 +107,9 @@
<export name="log"><elementDef classname="org.griphyn.vdl.karajan.lib.Log"/></export>
+ <export name="unitStart"><elementDef classname="org.griphyn.vdl.karajan.lib.UnitStart"/></export>
+ <export name="unitEnd"><elementDef classname="org.griphyn.vdl.karajan.lib.UnitEnd"/></export>
+
<export name="kickstart"><elementDef classname="org.griphyn.vdl.karajan.lib.Kickstart"/></export>
<export name="dirname"><elementDef classname="org.griphyn.vdl.karajan.lib.PathUtils"/></export>
Modified: trunk/resources/Karajan.stg
===================================================================
--- trunk/resources/Karajan.stg 2012-07-12 21:36:00 UTC (rev 5830)
+++ trunk/resources/Karajan.stg 2012-07-14 06:13:42 UTC (rev 5831)
@@ -58,7 +58,7 @@
>>
procedure(name,outputs,inputs,arguments,optargs,binding,declarations,statements,config,line,cleanups) ::= <<
-<element name="$name$"$if(arguments)$ arguments="$proc_args(args=arguments)$"$endif$$if(optargs)$ optargs="$proc_args(args=optargs)$"$endif$>
+<element name="$name$"$if(arguments)$ arguments="$proc_args(args=arguments)$"$endif$$if(optargs)$ optargs="$proc_args(args=optargs)$"$endif$ _defline="$line$">
$optargs:default_arg();separator="\n"$
$inputs:vdl_log_input();separator="\n"$
$outputs:vdl_log_output();separator="\n"$
@@ -72,14 +72,14 @@
>>
compound(outputs,inputs,declarations,statements,config,name,cleanups) ::= <<
-<log level="info"><string>STARTCOMPOUND thread={#thread} name=$name$</string></log>
+<unitStart name="$name$" type="COMPOUND" outputs="$outputs:list();separator=","$"/>
$declarations;separator="\n"$
$if(statements)$
$parallel(statements=statements)$
$endif$
$outputs:vdl_closedataset();separator="\n"$
$cleanups:vdl_cleandataset();separator="\n"$
-<log level="info"><string>ENDCOMPOUND thread={#thread}</string></log>
+<unitEnd name="$name$" type="COMPOUND"/>
>>
proc_args(args) ::= <<
@@ -99,7 +99,7 @@
>>
vdl_execute(outputs,inputs,attributes,application,name,line) ::= <<
-<log level="debug" message="PROCEDURE line=$line$ thread={#thread} name=$name$"/>
+<unitStart name="$name$" line="$line$" type="PROCEDURE" outputs="$outputs:list();separator=","$"/>
<vdl:execute>
$attributes$
<vdl:tr>$application.exec$</vdl:tr>
@@ -108,7 +108,7 @@
$vdl_arguments(attributes=application.attributes,arguments=application.arguments, stdin=application.stdin,stdout=application.stdout,stderr=application.stderr)$
</vdl:execute>
$outputs:vdl_closedataset();separator="\n"$
-<log level="debug" message="PROCEDURE_END line=$line$"/>
+<unitEnd name="$name$" line="$line$" type="PROCEDURE"/>
>>
vdl_log_input() ::= <<
@@ -145,6 +145,8 @@
<vdl:cleandataset var="{$it$}"/>
>>
+list() ::= <<$it.name$>>
+
vdl_arguments(attributes,arguments,stdin,stdout,stderr) ::= <<
$attributes$
<vdl:arguments>
@@ -202,8 +204,8 @@
// the 'function' template outputs a karajan code fragment
// that calls a function in the 'swiftscript' namespace.
-function(name, args, datatype) ::= <<
-<swiftscript:$name$>
+function(name, args, datatype, line) ::= <<
+<swiftscript:$name$ _traceline="$line$">
$if(args)$ $args$ $endif$
</swiftscript:$name$>
>>
@@ -231,15 +233,14 @@
<vdl:new type="$indexVarType$" value="{\$\$}"/>
</set>
$endif$
- <log level="debug" message="FOREACH_IT_START line=$line$ thread={#thread}"/>
-<log level="debug"><string>SCOPE thread={#thread}</string></log>
+ <unitStart line="$line$" type="FOREACH_IT"/>
$declarations;separator="\n"$
$if(statements)$
$parallel(statements=statements)$
$cleanups:vdl_cleandataset();separator="\n"$
$endif$
- <log level="debug" message="FOREACH_IT_END line=$line$ thread={#thread}"/>
+ <unitEnd line="$line$" type="FOREACH_IT"/>
</vdl:tparallelFor>
>>
@@ -247,17 +248,17 @@
// they are not
// $outputs:vdl_log_output();separator="\n"$
-callInternal(func, outputs, inputs) ::= <<
+callInternal(func, outputs, inputs, line) ::= <<
<sequential>
-<log level="debug" message="INTERNALPROC_START thread={#thread} name=$func$"/>
+<unitStart name="$func$" type="INTERNALPROC" outputs="$outputs:list();separator=","$"/>
<set name="swift#cs"><variable>#thread</variable></set>
-<$func$>
+<$func$ _traceline="$line$">
<parallel>
$outputs:callInternal_log_output();separator="\n"$
$inputs:callInternal_log_input();separator="\n"$
</parallel>
</$func$>
-<log level="debug" message="INTERNALPROC_END thread={#thread}"/>
+<unitEnd name="$func$" type="INTERNALPROC"/>
</sequential>
>>
@@ -287,8 +288,8 @@
</sequential>
>>
-callUserDefined(func, outputs, inputs) ::= <<
-<$func$>
+callUserDefined(func, outputs, inputs, line) ::= <<
+<$func$ _traceline="$line$">
<parallel>
$outputs;separator="\n"$
$inputs;separator="\n"$
@@ -312,19 +313,19 @@
</global>
>>
-variable(name,type,expr,mapping,nil,file,waitfor,datatype,isGlobal) ::= <<
+variable(name,type,expr,mapping,nil,file,waitfor,datatype,isGlobal,line) ::= <<
$if(isGlobal)$<global name="$name$">$else$<set name="$name$">$endif$
$if(mapping)$
- <vdl:new type="$type$" dbgname="$name$"$if(waitfor)$ waitfor="$waitfor;separator=" "$"$endif$>
+ <vdl:new type="$type$" dbgname="$name$"$if(waitfor)$ waitfor="$waitfor;separator=" "$"$endif$ _defline="$line$">
$vdl_mapping(mapping=mapping,file=file,waitfor=waitfor)$
</vdl:new>
$else$
$if(file)$
- <vdl:new type="$type$" dbgname="$name$"$if(waitfor)$ waitfor="$waitfor;separator=" "$"$endif$>
+ <vdl:new type="$type$" dbgname="$name$"$if(waitfor)$ waitfor="$waitfor;separator=" "$"$endif$ _defline="$line$">
$vdl_mapping(mapping=mapping,file=file,waitfor=waitfor)$
</vdl:new>
$else$
- <vdl:new type="$type$" dbgname="$name$"$if(waitfor)$ waitfor="$waitfor;separator=" "$"$endif$ />
+ <vdl:new type="$type$" dbgname="$name$"$if(waitfor)$ waitfor="$waitfor;separator=" "$"$endif$ _defline="$line$"/>
$endif$
$endif$
$if(isGlobal)$</global>$else$</set>$endif$
@@ -445,12 +446,12 @@
<if>
<vdl:getfieldvalue>$condition$</vdl:getfieldvalue>
<then>
-<log level="debug"><string>SCOPE thread={#thread}</string></log>
+ <unitStart type="CONDITION_BLOCK"/>
$vthen$
</then>
$if(velse)$
<else>
-<log level="debug"><string>SCOPE thread={#thread}</string></log>
+ <unitStart type="CONDITION_BLOCK"/>
$velse$
</else>
$endif$
Modified: trunk/resources/swiftscript.stg
===================================================================
--- trunk/resources/swiftscript.stg 2012-07-12 21:36:00 UTC (rev 5830)
+++ trunk/resources/swiftscript.stg 2012-07-14 06:13:42 UTC (rev 5831)
@@ -71,13 +71,13 @@
variable(type,name, sourcelocation, mapping, lfn, global) ::= <<
$if(lfn)$
-<variable name="$name$" type="$type$" isGlobal="$global$"><file name="$lfn$"/></variable>
+<variable name="$name$" type="$type$" isGlobal="$global$" src="$sourcelocation$"><file name="$lfn$"/></variable>
$else$$if(mapping)$
-<variable name="$name$" type="$type$" isGlobal="$global$">
+<variable name="$name$" type="$type$" isGlobal="$global$" src="$sourcelocation$">
$mapping$
</variable>
$else$
-<variable name="$name$" type="$type$" isGlobal="$global$" xsi:nil="true"/>$endif$$endif$
+<variable name="$name$" type="$type$" isGlobal="$global$" xsi:nil="true" src="$sourcelocation$"/>$endif$$endif$
>>
mapping(descriptor,params,sourcelocation) ::= <<
@@ -140,25 +140,10 @@
parameter(type,name,outlink,defaultv,sourcelocation) ::= <<
$if(outlink)$
- <output
+<output name="$name$" type="$type$" $if(defaultv)$>$defaultv$</output>$else$ xsi:nil="true"/>$endif$
$else$
- <input
+<input name="$name$" type="$type$" $if(defaultv)$>$defaultv$</input>$else$ xsi:nil="true"/>$endif$
$endif$
-
-name="$name$" type="$type$"
-
-$if(defaultv)$>
-$defaultv$
-
-$if(outlink)$
- </output>
-$else$
- </input>
-$endif$
-
-$else$
- xsi:nil="true" />
-$endif$
>>
app(exec,profiles,arguments,stdin,stdout,stderr,sourcelocation) ::= <<
@@ -193,7 +178,7 @@
>>
functionInvocation(name,args,sourcelocation) ::= <<
-<function name="$name$">
+<function name="$name$" src="$sourcelocation$">
$if(args)$
$args$
$endif$
Modified: trunk/resources/swiftscript.xsd
===================================================================
--- trunk/resources/swiftscript.xsd 2012-07-12 21:36:00 UTC (rev 5830)
+++ trunk/resources/swiftscript.xsd 2012-07-14 06:13:42 UTC (rev 5831)
@@ -234,6 +234,8 @@
<xs:documentation>name of the mapping function</xs:documentation>
</xs:annotation>
</xs:attribute>
+
+ <xs:attribute name="src" type="xs:string" />
</xs:complexType>
<!--
@@ -328,6 +330,8 @@
<xs:attribute name="type" type="xs:QName" use="required"/>
<xs:attribute name="isGlobal" type="xs:boolean"/>
+
+ <xs:attribute name="src" type="xs:string"/>
</xs:complexType>
<xs:complexType name="ActualParameter">
Modified: trunk/src/org/griphyn/vdl/engine/Karajan.java
===================================================================
--- trunk/src/org/griphyn/vdl/engine/Karajan.java 2012-07-12 21:36:00 UTC (rev 5830)
+++ trunk/src/org/griphyn/vdl/engine/Karajan.java 2012-07-14 06:13:42 UTC (rev 5831)
@@ -350,6 +350,10 @@
}
}
+ private String getLine(String src) {
+ return src.substring(src.indexOf(' ') + 1);
+ }
+
private void setVariableUsed(String s) {
usedVariables.add(s);
}
@@ -359,7 +363,7 @@
VariableScope innerScope = new VariableScope(this, outerScope, VariableScope.ENCLOSURE_NONE);
StringTemplate procST = template("procedure");
containingScope.bodyTemplate.setAttribute("procedures", procST);
- procST.setAttribute("line", proc.getSrc().substring(proc.getSrc().indexOf(' ') + 1));
+ procST.setAttribute("line", getLine(proc.getSrc()));
procST.setAttribute("name", mangle(proc.getName()));
for (int i = 0; i < proc.sizeOfOutputArray(); i++) {
FormalParameter param = proc.getOutputArray(i);
@@ -405,7 +409,7 @@
* original name contains a '_' it will be converted
* to "__" (two underscores).
*/
- private String mangle(String name) {
+ public static String mangle(String name) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < name.length(); i++) {
char c = name.charAt(i);
@@ -416,6 +420,33 @@
}
return sb.toString();
}
+
+ public static String demangle(String name) {
+ StringBuilder sb = new StringBuilder();
+ boolean upper = false;
+ for (int i = 0; i < name.length(); i++) {
+ char c = name.charAt(i);
+ if (c == '_') {
+ if (upper) {
+ sb.append("_");
+ upper = false;
+ }
+ else {
+ upper = true;
+ }
+ }
+ else {
+ if (upper) {
+ upper = false;
+ sb.append(Character.toUpperCase(c));
+ }
+ else {
+ sb.append(Character.toLowerCase(c));
+ }
+ }
+ }
+ return sb.toString();
+ }
public StringTemplate parameter(FormalParameter param, VariableScope scope) throws CompilationException {
StringTemplate paramST = new StringTemplate("parameter");
@@ -440,6 +471,7 @@
variableST.setAttribute("name", var.getName());
variableST.setAttribute("type", var.getType().getLocalPart());
variableST.setAttribute("isGlobal", Boolean.valueOf(var.getIsGlobal()));
+ variableST.setAttribute("line", getLine(var.getSrc()));
variables.add(variableST);
if(!var.isNil()) {
@@ -682,6 +714,7 @@
("Unknown procedure invocation mode "+proc.getInvocationMode());
}
callST.setAttribute("func", mangle(procName));
+ callST.setAttribute("line", getLine(call.getSrc()));
/* Does number of input arguments match */
for (int i = 0; i < proc.sizeOfInputArray(); i++) {
if (proc.getInputArray(i).isOptional())
@@ -1076,6 +1109,7 @@
public StringTemplate function(Function func, VariableScope scope) throws CompilationException {
StringTemplate funcST = template("function");
funcST.setAttribute("name", mangle(func.getName()));
+ funcST.setAttribute("line", getLine(func.getSrc()));
ProcedureSignature funcSignature = functionsMap.get(func.getName());
if(funcSignature == null) {
throw new CompilationException("Unknown function: @"+func.getName());
@@ -1350,7 +1384,7 @@
if(arrayMode) {
actualType = actualType + "[" + indexType + "]";
newst = template("slicearray");
- }
+ }
else {
newst = template("extractstructelement");
}
Modified: trunk/src/org/griphyn/vdl/karajan/ArrayIndexFutureList.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/ArrayIndexFutureList.java 2012-07-12 21:36:00 UTC (rev 5830)
+++ trunk/src/org/griphyn/vdl/karajan/ArrayIndexFutureList.java 2012-07-14 06:13:42 UTC (rev 5831)
@@ -37,6 +37,7 @@
import org.globus.cog.karajan.workflow.futures.FutureNotYetAvailable;
import org.globus.cog.karajan.workflow.futures.ListenerStackPair;
import org.griphyn.vdl.mapping.ArrayDataNode;
+import org.griphyn.vdl.mapping.DSHandle;
public class ArrayIndexFutureList implements FutureList, FutureWrapper {
private ArrayList<Object> keys;
@@ -107,6 +108,10 @@
public Object getValue() {
return this;
}
+
+ public DSHandle getHandle() {
+ return node;
+ }
public void addModificationAction(FutureListener target, VariableStack stack) {
synchronized(node) {
@@ -114,7 +119,7 @@
listeners = new LinkedList<ListenerStackPair>();
}
listeners.add(new ListenerStackPair(target, stack));
- WaitingThreadsMonitor.addThread(stack);
+ WaitingThreadsMonitor.addThread(stack, node);
if (!node.isClosed()) {
return;
}
@@ -139,7 +144,7 @@
EventBus.post(new Runnable() {
public void run() {
lsp.listener.futureModified(ArrayIndexFutureList.this, lsp.stack);
- }
+ }
});
}
}
Modified: trunk/src/org/griphyn/vdl/karajan/DSHandleFutureWrapper.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/DSHandleFutureWrapper.java 2012-07-12 21:36:00 UTC (rev 5830)
+++ trunk/src/org/griphyn/vdl/karajan/DSHandleFutureWrapper.java 2012-07-14 06:13:42 UTC (rev 5831)
@@ -53,7 +53,7 @@
listeners = new LinkedList<ListenerStackPair>();
}
listeners.add(new ListenerStackPair(target, stack));
- WaitingThreadsMonitor.addThread(stack);
+ WaitingThreadsMonitor.addThread(stack, node);
if (!node.isClosed()) {
return;
}
Modified: trunk/src/org/griphyn/vdl/karajan/FuturePairIterator.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/FuturePairIterator.java 2012-07-12 21:36:00 UTC (rev 5830)
+++ trunk/src/org/griphyn/vdl/karajan/FuturePairIterator.java 2012-07-14 06:13:42 UTC (rev 5831)
@@ -110,7 +110,7 @@
}
public synchronized void addModificationAction(FutureListener target, VariableStack stack) {
- WaitingThreadsMonitor.addThread(stack);
+ WaitingThreadsMonitor.addThread(stack, array.getHandle());
array.addModificationAction(target, stack);
}
Modified: trunk/src/org/griphyn/vdl/karajan/HangChecker.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/HangChecker.java 2012-07-12 21:36:00 UTC (rev 5830)
+++ trunk/src/org/griphyn/vdl/karajan/HangChecker.java 2012-07-14 06:13:42 UTC (rev 5831)
@@ -19,6 +19,14 @@
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
@@ -28,6 +36,7 @@
import org.globus.cog.karajan.workflow.ExecutionException;
import org.globus.cog.karajan.workflow.events.EventBus;
import org.globus.cog.karajan.workflow.nodes.grid.SchedulerNode;
+import org.griphyn.vdl.mapping.DSHandle;
public class HangChecker extends TimerTask {
public static final Logger logger = Logger.getLogger(HangChecker.class);
@@ -58,6 +67,7 @@
PrintStream ps = new PrintStream(os);
Monitor.dumpVariables(ps);
Monitor.dumpThreads(ps);
+ findCycles(ps);
logger.warn(os.toString());
ps.close();
}
@@ -68,4 +78,131 @@
logger.warn("Exception caught during hang check", e);
}
}
+
+ private static void findCycles(PrintStream ps) {
+ Map<VariableStack, DSHandle> wt = WaitingThreadsMonitor.getAllThreads();
+ Map<VariableStack, List<DSHandle>> ot = WaitingThreadsMonitor.getOutputs();
+ Map<DSHandle, List<VariableStack>> rwt = new HashMap<DSHandle, List<VariableStack>>();
+
+ for (Map.Entry<VariableStack, DSHandle> e : wt.entrySet()) {
+ List<VariableStack> l = rwt.get(e.getValue());
+ if (l == null) {
+ l = new LinkedList<VariableStack>();
+ rwt.put(e.getValue(), l);
+ }
+ l.add(e.getKey());
+ }
+
+ Set<VariableStack> seen = new HashSet<VariableStack>();
+ LinkedList<Object> cycle = new LinkedList<Object>();
+ List<LinkedList<Object>> cycles = new ArrayList<LinkedList<Object>>();
+ for (VariableStack t : wt.keySet()) {
+ seen.clear();
+ cycle.clear();
+ findLoop(t, rwt, ot, seen, cycle, cycles);
+ }
+
+ cycles = removeDuplicates(cycles);
+
+ if (cycles.size() == 1) {
+ ps.println("Dependency loop found:");
+ }
+ else if (cycles.size() > 1) {
+ ps.println(cycles.size() + " dependency loops found:");
+ }
+ int index = 0;
+ for (LinkedList<Object> c : cycles) {
+ index++;
+ if (cycles.size() > 1) {
+ ps.println("* " + index);
+ }
+ Object prev = c.getLast();
+ for (Object o : c) {
+ if (o instanceof VariableStack) {
+ ps.println("\t" + Monitor.varWithLine((DSHandle) prev) + " is needed by: ");
+ for (String t : Monitor.getSwiftTrace((VariableStack) o)) {
+ ps.println("\t\t" + t);
+ }
+ }
+ else {
+ prev = o;
+ ps.println("\tand produces " + Monitor.varWithLine((DSHandle) o));
+ ps.println();
+ }
+ }
+ }
+ if (cycles.size() > 0) {
+ ps.println("----");
+ }
+ }
+
+ private static List<LinkedList<Object>> removeDuplicates(List<LinkedList<Object>> cycles) {
+ List<LinkedList<Object>> nc = new LinkedList<LinkedList<Object>>();
+ while (!cycles.isEmpty()) {
+ Iterator<LinkedList<Object>> i = cycles.iterator();
+ LinkedList<Object> first = i.next();
+ i.remove();
+
+ while (i.hasNext()) {
+ if (isSameCycle(first, i.next())) {
+ i.remove();
+ }
+ }
+ nc.add(first);
+ }
+ return nc;
+ }
+
+ private static boolean isSameCycle(LinkedList<Object> a, LinkedList<Object> b) {
+ if (a.size() != b.size()) {
+ return false;
+ }
+ Iterator<Object> i = a.iterator();
+ Object o = i.next();
+ Iterator<Object> j = b.iterator();
+ while (j.hasNext()) {
+ if (o == j.next()) {
+ while (i.hasNext()) {
+ if (!j.hasNext()) {
+ j = b.iterator();
+ }
+ if (i.next() != j.next()) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private static void findLoop(VariableStack t, Map<DSHandle, List<VariableStack>> rwt,
+ Map<VariableStack, List<DSHandle>> ot, Set<VariableStack> seen, LinkedList<Object> cycle, List<LinkedList<Object>> cycles) {
+ if (t == null) {
+ return;
+ }
+ if (seen.contains(t)) {
+ cycles.add(new LinkedList<Object>(cycle));
+ return;
+ }
+ cycle.add(t);
+ seen.add(t);
+ // follow all the outputs of t
+ List<DSHandle> l = ot.get(t);
+ if (l != null) {
+ for (DSHandle h : l) {
+ cycle.add(h);
+ List<VariableStack> l2 = rwt.get(h);
+ if (l2 != null) {
+ for (VariableStack t2 : l2) {
+ findLoop(t2, rwt, ot, seen, cycle, cycles);
+ }
+ }
+ cycle.removeLast();
+ }
+ }
+ cycle.removeLast();
+ seen.remove(t);
+ }
+
}
Modified: trunk/src/org/griphyn/vdl/karajan/Monitor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/Monitor.java 2012-07-12 21:36:00 UTC (rev 5830)
+++ trunk/src/org/griphyn/vdl/karajan/Monitor.java 2012-07-14 06:13:42 UTC (rev 5831)
@@ -28,6 +28,7 @@
import java.awt.event.ActionListener;
import java.awt.event.MouseEvent;
import java.awt.event.MouseListener;
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -36,10 +37,13 @@
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
-import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.swing.BorderFactory;
import javax.swing.JButton;
@@ -56,6 +60,9 @@
import org.globus.cog.karajan.util.ThreadingContext;
import org.globus.cog.karajan.workflow.events.EventTargetPair;
import org.globus.cog.karajan.workflow.futures.Future;
+import org.globus.cog.karajan.workflow.nodes.FlowElement;
+import org.globus.cog.karajan.workflow.nodes.FlowNode;
+import org.griphyn.vdl.engine.Karajan;
import org.griphyn.vdl.mapping.AbstractDataNode;
import org.griphyn.vdl.mapping.ArrayDataNode;
import org.griphyn.vdl.mapping.DSHandle;
@@ -184,15 +191,15 @@
crtdisp = THREADS;
ArrayList<String> al = new ArrayList<String>();
wt = new ArrayList<VariableStack>();
- Collection<VariableStack> c = WaitingThreadsMonitor.getAllThreads();
- for (VariableStack stack : c) {
+ Map<VariableStack, DSHandle> c = WaitingThreadsMonitor.getAllThreads();
+ for (Map.Entry<VariableStack, DSHandle> entry : c.entrySet()) {
try {
- al.add(String.valueOf(ThreadingContext.get(stack)));
+ al.add(String.valueOf(ThreadingContext.get(entry.getKey())));
}
catch (VariableNotFoundException e1) {
al.add("unknown thread");
}
- wt.add(stack);
+ wt.add(entry.getKey());
}
ThreadModel m = new ThreadModel(al);
@@ -217,44 +224,51 @@
public static void dumpVariables(PrintStream ps) {
ps.println("\nRegistered futures:");
Map<DSHandle, Future> map = FutureTracker.get().getMap();
+ Map<DSHandle, Future> copy;
synchronized (map) {
- for (Map.Entry<DSHandle, Future> en : map.entrySet()) {
- Future f = en.getValue();
- AbstractDataNode handle = (AbstractDataNode) en.getKey();
- String value = "-";
- try {
- if (handle.getValue() != null) {
- value = "";
- }
+ copy = new HashMap<DSHandle, Future>(map);
+ }
+ for (Map.Entry<DSHandle, Future> en : copy.entrySet()) {
+ Future f = en.getValue();
+ AbstractDataNode handle = (AbstractDataNode) en.getKey();
+ String value = "-";
+ try {
+ if (handle.getValue() != null) {
+ value = "";
}
- catch (DependentException e) {
- value = "<dependent exception>";
- }
- catch (Exception e) {
- value = "<exception>";
- }
- try {
- ps.println(handle.getType() + " " + handle.getDisplayableName() + " " + value + " " + f);
- }
- catch (Exception e) {
- ps.println(handle.getDisplayableName() + " - error");
- e.printStackTrace(ps);
- }
}
+ catch (DependentException e) {
+ value = "<dependent exception>";
+ }
+ catch (Exception e) {
+ value = "<exception>";
+ }
+ try {
+ ps.println(handle.getType() + " " + handle.getDisplayableName() + " " + value + " " + f);
+ }
+ catch (Exception e) {
+ ps.println(handle.getDisplayableName() + " - error");
+ e.printStackTrace(ps);
+ }
ps.println("----");
}
}
-
- public void dumpThreads() {
+
+ public void dumpThreads() {
dumpThreads(System.out);
}
public static void dumpThreads(PrintStream pw) {
pw.println("\nWaiting threads:");
- Collection<VariableStack> c = WaitingThreadsMonitor.getAllThreads();
- for (VariableStack stack : c) {
+ Map<VariableStack, DSHandle> c = WaitingThreadsMonitor.getAllThreads();
+ for (Map.Entry<VariableStack, DSHandle> e : c.entrySet()) {
try {
- pw.println(String.valueOf(ThreadingContext.get(stack)));
+ pw.println("Thread: " + String.valueOf(ThreadingContext.get(e.getKey())) + ", waiting on "
+ + varWithLine(e.getValue()));
+
+ for (String t : getSwiftTrace(e.getKey())) {
+ pw.println("\t" + t);
+ }
}
catch (VariableNotFoundException e1) {
pw.println("unknown thread");
@@ -263,7 +277,49 @@
pw.println("----");
}
- public class VariableModel extends AbstractTableModel {
+ public static String varWithLine(DSHandle value) {
+ String line = value.getRoot().getParam("line");
+ return value.getRoot().getParam("dbgname") +
+ (value == value.getRoot() ? "" : value.getPathFromRoot()) +
+ (line == null ? "" : " (declared on line " + line + ")");
+ }
+
+ public static String getLastCall(VariableStack stack) {
+ List<Object> trace = Trace.getAsList(stack);
+ for (Object o : trace) {
+ if (o instanceof FlowNode) {
+ FlowNode n = (FlowNode) o;
+ String traceLine = (String) n.getProperty("_traceline");
+ if (traceLine != null) {
+ return(Karajan.demangle(n.getTextualName()) + ", " +
+ fileName(n) + ", line " + traceLine);
+ }
+ }
+ }
+ return "?";
+ }
+
+ public static List<String> getSwiftTrace(VariableStack stack) {
+ List<String> ret = new ArrayList<String>();
+ List<Object> trace = Trace.getAsList(stack);
+ for (Object o : trace) {
+ if (o instanceof FlowNode) {
+ FlowNode n = (FlowNode) o;
+ String traceLine = (String) n.getProperty("_traceline");
+ if (traceLine != null) {
+ ret.add(Karajan.demangle(n.getTextualName()) + ", " +
+ fileName(n) + ", line " + traceLine);
+ }
+ }
+ }
+ return ret;
+ }
+
+ private static String fileName(FlowNode n) {
+ return new File((String) FlowNode.getTreeProperty(FlowElement.FILENAME, n)).getName().replace(".kml", ".swift");
+ }
+
+ public class VariableModel extends AbstractTableModel {
private List<Object[]> l;
public VariableModel(List<List<Object>> lp) {
Modified: trunk/src/org/griphyn/vdl/karajan/WaitingThreadsMonitor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/WaitingThreadsMonitor.java 2012-07-12 21:36:00 UTC (rev 5830)
+++ trunk/src/org/griphyn/vdl/karajan/WaitingThreadsMonitor.java 2012-07-14 06:13:42 UTC (rev 5831)
@@ -20,39 +20,52 @@
*/
package org.griphyn.vdl.karajan;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.globus.cog.karajan.stack.VariableStack;
+import org.griphyn.vdl.mapping.DSHandle;
public class WaitingThreadsMonitor {
- private static Set<VariableStack> threads;
+ private static Map<VariableStack, DSHandle> threads = new HashMap<VariableStack, DSHandle>();
+ private static Map<VariableStack, List<DSHandle>> outputs = new HashMap<VariableStack, List<DSHandle>>();;
- public synchronized static void addThread(VariableStack stack) {
+ public static void addThread(VariableStack stack, DSHandle waitingOn) {
if (stack != null) {
- getThreads().add(stack);
+ synchronized(threads) {
+ threads.put(stack, waitingOn);
+ }
}
}
-
- private static synchronized Set<VariableStack> getThreads() {
- if (threads == null) {
- threads = new HashSet<VariableStack>();
- }
- return threads;
+
+ public static void removeThread(VariableStack stack) {
+ synchronized(threads) {
+ threads.remove(stack);
+ }
}
- public synchronized static void removeThread(VariableStack stack) {
- getThreads().remove(stack);
+ public static Map<VariableStack, DSHandle> getAllThreads() {
+ synchronized(threads) {
+ return new HashMap<VariableStack, DSHandle>(threads);
+ }
}
-
- public synchronized static Collection<VariableStack> getAllThreads() {
- if (threads == null) {
- return Collections.emptySet();
- }
- else {
- return new HashSet<VariableStack>(threads);
- }
- }
+
+ public static void addOutput(VariableStack stack, List<DSHandle> outputs) {
+ synchronized(outputs) {
+ WaitingThreadsMonitor.outputs.put(stack, outputs);
+ }
+ }
+
+ public static void removeOutput(VariableStack stack) {
+ synchronized(outputs) {
+ outputs.remove(stack);
+ }
+ }
+
+ public static Map<VariableStack, List<DSHandle>> getOutputs() {
+ synchronized(outputs) {
+ return new HashMap<VariableStack, List<DSHandle>>(outputs);
+ }
+ }
}
Modified: trunk/src/org/griphyn/vdl/karajan/lib/New.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/New.java 2012-07-12 21:36:00 UTC (rev 5830)
+++ trunk/src/org/griphyn/vdl/karajan/lib/New.java 2012-07-14 06:13:42 UTC (rev 5831)
@@ -52,7 +52,7 @@
static {
setArguments(New.class,
- new Arg[] { OA_TYPE, OA_MAPPING, OA_VALUE, OA_DBGNAME, OA_WAITFOR, });
+ new Arg[] { OA_TYPE, OA_MAPPING, OA_VALUE, OA_DBGNAME, OA_WAITFOR});
}
public Object function(VariableStack stack) throws ExecutionException {
@@ -63,14 +63,19 @@
(Map<String,Object>) OA_MAPPING.getValue(stack);
String dbgname = TypeUtil.toString(OA_DBGNAME.getValue(stack));
String waitfor = (String) OA_WAITFOR.getValue(stack);
+ String line = (String) getProperty("_defline");
if (mapping == null) {
- mapping = new HashMap<String,Object>();
+ mapping = new HashMap<String, Object>();
}
if (dbgname != null) {
mapping.put("dbgname", dbgname);
}
+
+ if (line != null) {
+ mapping.put("line", line);
+ }
mapping.put("swift#restartid", getThreadPrefix(stack) + ":" + dbgname);
Added: trunk/src/org/griphyn/vdl/karajan/lib/UnitEnd.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/UnitEnd.java (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/lib/UnitEnd.java 2012-07-14 06:13:42 UTC (rev 5831)
@@ -0,0 +1,45 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 13, 2012
+ */
+package org.griphyn.vdl.karajan.lib;
+
+import org.globus.cog.karajan.arguments.Arg;
+import org.globus.cog.karajan.stack.VariableStack;
+import org.globus.cog.karajan.util.ThreadingContext;
+import org.globus.cog.karajan.workflow.ExecutionException;
+import org.globus.cog.karajan.workflow.nodes.FlowNode;
+import org.griphyn.vdl.karajan.WaitingThreadsMonitor;
+
+public class UnitEnd extends FlowNode {
+ public static final Arg.Positional TYPE = new Arg.Positional("type");
+ public static final Arg.Optional NAME = new Arg.Optional("name", null);
+ public static final Arg.Optional LINE = new Arg.Optional("line", null);
+
+ @Override
+ public void execute(VariableStack stack) throws ExecutionException {
+ executeSimple(stack);
+ complete(stack);
+ }
+
+ @Override
+ public boolean isSimple() {
+ return super.isSimple();
+ }
+
+ @Override
+ public void executeSimple(VariableStack stack) throws ExecutionException {
+ String type = (String) TYPE.getStatic(this);
+ ThreadingContext thread = ThreadingContext.get(stack);
+ String name = (String) NAME.getStatic(this);
+ String line = (String) LINE.getStatic(this);
+
+ UnitStart.log(false, type, thread, name, line);
+ WaitingThreadsMonitor.removeOutput(stack);
+ }
+}
Added: trunk/src/org/griphyn/vdl/karajan/lib/UnitStart.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/UnitStart.java (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/lib/UnitStart.java 2012-07-14 06:13:42 UTC (rev 5831)
@@ -0,0 +1,95 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 13, 2012
+ */
+package org.griphyn.vdl.karajan.lib;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.karajan.arguments.Arg;
+import org.globus.cog.karajan.stack.VariableStack;
+import org.globus.cog.karajan.util.ThreadingContext;
+import org.globus.cog.karajan.workflow.ExecutionException;
+import org.globus.cog.karajan.workflow.nodes.FlowNode;
+import org.griphyn.vdl.karajan.WaitingThreadsMonitor;
+import org.griphyn.vdl.mapping.DSHandle;
+
+public class UnitStart extends FlowNode {
+ // keep compatibility with log()
+ public static final Logger logger = Logger.getLogger("swift");
+
+ public static final Arg.Positional TYPE = new Arg.Positional("type");
+ public static final Arg.Optional NAME = new Arg.Optional("name", null);
+ public static final Arg.Optional LINE = new Arg.Optional("line", null);
+ public static final Arg.Optional OUTPUTS = new Arg.Optional("outputs", null);
+
+ @Override
+ public void execute(VariableStack stack) throws ExecutionException {
+ executeSimple(stack);
+ complete(stack);
+ }
+
+ @Override
+ public boolean isSimple() {
+ return super.isSimple();
+ }
+
+ @Override
+ public void executeSimple(VariableStack stack) throws ExecutionException {
+ String type = (String) TYPE.getStatic(this);
+ ThreadingContext thread = ThreadingContext.get(stack);
+ String name = (String) NAME.getStatic(this);
+ String line = (String) LINE.getStatic(this);
+
+ log(true, type, thread, name, line);
+
+ String outputs = (String) OUTPUTS.getStatic(this);
+ if (outputs != null) {
+ trackOutputs(stack, outputs);
+ }
+ }
+
+ private void trackOutputs(VariableStack stack, String outputs) {
+ String[] names = outputs.split(",");
+ List<DSHandle> l = new LinkedList<DSHandle>();
+ for (String name : names) {
+ l.add((DSHandle) stack.parentFrame().getVar(name));
+ }
+ WaitingThreadsMonitor.addOutput(stack, l);
+ }
+
+ protected static void log(boolean start, String type, ThreadingContext thread, String name, String line) {
+ if (type.equals("COMPOUND")) {
+ logger.info((start ? "START" : "END") + type + " thread=" + thread + " name=" + name);
+ }
+ else if (type.equals("PROCEDURE")) {
+ if (start) {
+ logger.debug("PROCEDURE line=" + line + " thread=" + thread + " name=" + name);
+ }
+ else {
+ logger.debug("PROCEDURE_END line=" + line + " thread=" + thread + " name=" + name);
+ }
+ }
+ else if (type.equals("FOREACH_IT")) {
+ logger.debug("FOREACH_IT_" + (start ? "START" : "END") + " line=" + line + " thread=" + thread);
+ if (start) {
+ logger.debug("SCOPE thread=" + thread);
+ }
+ }
+ else if (type.equals("INTERNALPROC")) {
+ logger.debug("INTERNALPROC_" + (start ? "START" : "END") + "thread=" + thread + " name=" + name);
+ }
+ else if (type.equals("CONDITION_BLOCK")) {
+ if (start) {
+ logger.debug("SCOPE thread=" + thread);
+ }
+ }
+ }
+}
More information about the Swift-commit
mailing list