[Swift-commit] r5835 - in trunk: resources src/org/griphyn/vdl/engine src/org/griphyn/vdl/karajan src/org/griphyn/vdl/karajan/lib
hategan at ci.uchicago.edu
hategan at ci.uchicago.edu
Sat Jul 14 19:06:37 CDT 2012
Author: hategan
Date: 2012-07-14 19:06:37 -0500 (Sat, 14 Jul 2012)
New Revision: 5835
Modified:
trunk/resources/Karajan.stg
trunk/resources/swiftscript.stg
trunk/resources/swiftscript.xsd
trunk/src/org/griphyn/vdl/engine/Karajan.java
trunk/src/org/griphyn/vdl/engine/VariableScope.java
trunk/src/org/griphyn/vdl/karajan/HangChecker.java
trunk/src/org/griphyn/vdl/karajan/Monitor.java
trunk/src/org/griphyn/vdl/karajan/lib/UnitStart.java
Log:
detect more complex dependency loops (such as those caused by partial writes); don't close outputs of procs if they are not partial (since the proc does it itself)
Modified: trunk/resources/Karajan.stg
===================================================================
--- trunk/resources/Karajan.stg 2012-07-15 00:01:19 UTC (rev 5834)
+++ trunk/resources/Karajan.stg 2012-07-15 00:06:37 UTC (rev 5835)
@@ -210,9 +210,9 @@
</swiftscript:$name$>
>>
-iterate(declarations,statements,cond,var,cleanups) ::= <<
+iterate(declarations,statements,cond,var,cleanups,trace,line) ::= <<
-<vdl:infinitecountingwhile var="$var$">
+<vdl:infinitecountingwhile var="$var$" $if(trace)$ _traceline="$line$"$endif$>
$sub_comp(declarations=declarations, statements=statements, cleanups=cleanups)$
<sys:if>
<vdl:getfieldvalue>$cond$</vdl:getfieldvalue>
@@ -222,8 +222,8 @@
</vdl:infinitecountingwhile>
>>
-foreach(var,in,indexVar,indexVarType,declarations,statements,line,selfClose,cleanups) ::= <<
-<vdl:tparallelFor name="\$"$if(selfClose)$ selfClose="$selfClose$"$endif$>
+foreach(var,in,indexVar,indexVarType,declarations,statements,line,selfClose,cleanups,trace) ::= <<
+<vdl:tparallelFor name="\$"$if(selfClose)$ selfClose="$selfClose$"$endif$ $if(trace)$ _traceline="$line$"$endif$>
<getarrayiterator>$in$</getarrayiterator>
<set names="\$\$, $var$">
<each items="{\$}"/>
@@ -364,8 +364,8 @@
<vdl:parameter name="$name$">$expr$</vdl:parameter>
>>
-assign(var,value) ::= <<
- <vdl:setfieldvalue>
+assign(var,value,line) ::= <<
+ <vdl:setfieldvalue $if(line)$_traceline="$line$"$else$_traceline="-1"$endif$>
$var$
$value$
</vdl:setfieldvalue>
@@ -442,8 +442,8 @@
</if>
>>
-if(condition,vthen,velse) ::= <<
-<if>
+if(condition,vthen,velse,line,trace) ::= <<
+<if $if(trace)$ _traceline="$line$"$endif$>
<vdl:getfieldvalue>$condition$</vdl:getfieldvalue>
<then>
<unitStart type="CONDITION_BLOCK"/>
@@ -507,7 +507,13 @@
<partialCloseDataset var="{$var$}" closeID="$closeID$" />
>>
+unitStart(type, outputs) ::= <<
+ <unitStart type="$type$" outputs="$outputs$"/>
+>>
+unitEnd(type) ::= <<
+ <unitEnd type="$type$"/>
+>>
operator ::= [
"+":"vdlop:sum",
Modified: trunk/resources/swiftscript.stg
===================================================================
--- trunk/resources/swiftscript.stg 2012-07-15 00:01:19 UTC (rev 5834)
+++ trunk/resources/swiftscript.stg 2012-07-15 00:06:37 UTC (rev 5835)
@@ -204,7 +204,7 @@
>>
if(cond,body,els,sourcelocation) ::= <<
-<if>
+<if src="$sourcelocation$">
$cond$
<then>
$body$
@@ -247,7 +247,7 @@
>>
iterate(cond,body,var,sourcelocation) ::= <<
-<iterate var="$var$">
+<iterate var="$var$" src="$sourcelocation$">
<body>
$body$
</body>
Modified: trunk/resources/swiftscript.xsd
===================================================================
--- trunk/resources/swiftscript.xsd 2012-07-15 00:01:19 UTC (rev 5834)
+++ trunk/resources/swiftscript.xsd 2012-07-15 00:06:37 UTC (rev 5835)
@@ -484,6 +484,8 @@
item</xs:documentation>
</xs:annotation>
</xs:attribute>
+
+ <xs:attribute name="src" type="xs:string" />
</xs:complexType>
@@ -506,6 +508,8 @@
</xs:complexType>
</xs:element>
</xs:sequence>
+
+ <xs:attribute name="src" type="xs:string" />
</xs:complexType>
Modified: trunk/src/org/griphyn/vdl/engine/Karajan.java
===================================================================
--- trunk/src/org/griphyn/vdl/engine/Karajan.java 2012-07-15 00:01:19 UTC (rev 5834)
+++ trunk/src/org/griphyn/vdl/engine/Karajan.java 2012-07-15 00:06:37 UTC (rev 5835)
@@ -574,6 +574,7 @@
" to a variable of type " + datatype(varST));
assignST.setAttribute("var", varST);
assignST.setAttribute("value", valueST);
+ assignST.setAttribute("line", getLine(assign.getSrc()));
String rootvar = abstractExpressionToRootVariable(assign.getAbstractExpressionArray(0));
scope.addWriter(rootvar, new Integer(callID++), rootVariableIsPartial(assign.getAbstractExpressionArray(0)));
scope.appendStatement(assignST);
@@ -817,8 +818,7 @@
ActualParameter output = call.getOutputArray(i);
StringTemplate argST = actualParameter(output, scope);
callST.setAttribute("outputs", argST);
- String rootvar = abstractExpressionToRootVariable(call.getOutputArray(i).getAbstractExpression());
- scope.addWriter(rootvar, new Integer(callID++), rootVariableIsPartial(call.getOutputArray(i).getAbstractExpression()));
+ addWriterToScope(scope, call.getOutputArray(i).getAbstractExpression());
}
}
if (keywordArgsOutput) {
@@ -837,8 +837,7 @@
throw new CompilationException("Wrong type for output parameter number " + i +
", expected " + formalType + ", got " + actualType);
- String rootvar = abstractExpressionToRootVariable(call.getOutputArray(i).getAbstractExpression());
- scope.addWriter(rootvar, new Integer(callID++), rootVariableIsPartial(call.getOutputArray(i).getAbstractExpression()));
+ addWriterToScope(scope, call.getOutputArray(i).getAbstractExpression());
}
} else { /* Positional arguments */
for (int i = 0; i < call.sizeOfOutputArray(); i++) {
@@ -855,8 +854,7 @@
throw new CompilationException("Wrong type for parameter number " + i +
", expected " + formalType + ", got " + actualType);
- String rootvar = abstractExpressionToRootVariable(call.getOutputArray(i).getAbstractExpression());
- scope.addWriter(rootvar, new Integer(callID++), rootVariableIsPartial(call.getOutputArray(i).getAbstractExpression()));
+ addWriterToScope(scope, call.getOutputArray(i).getAbstractExpression());
}
}
@@ -869,6 +867,16 @@
}
}
+ private void addWriterToScope(VariableScope scope, XmlObject var) throws CompilationException {
+ String rootvar = abstractExpressionToRootVariable(var);
+ boolean partial = rootVariableIsPartial(var);
+ if (!partial) {
+ // don't close variables that are already closed by the function itself
+ scope.inhibitClosing(rootvar);
+ }
+ scope.addWriter(rootvar, new Integer(callID++), partial);
+ }
+
public void iterateStat(Iterate iterate, VariableScope scope) throws CompilationException {
VariableScope loopScope = new VariableScope(this, scope, VariableScope.ENCLOSURE_LOOP);
VariableScope innerScope = new VariableScope(this, loopScope, VariableScope.ENCLOSURE_LOOP);
@@ -876,6 +884,7 @@
loopScope.addVariable(iterate.getVar(), "int");
StringTemplate iterateST = template("iterate");
+ iterateST.setAttribute("line", getLine(iterate.getSrc()));
iterateST.setAttribute("var", iterate.getVar());
innerScope.bodyTemplate = iterateST;
@@ -945,13 +954,14 @@
StringTemplate ifST = template("if");
StringTemplate conditionST = expressionToKarajan(ifstat.getAbstractExpression(), scope);
ifST.setAttribute("condition", conditionST.toString());
+ ifST.setAttribute("line", getLine(ifstat.getSrc()));
if (!datatype(conditionST).equals("boolean"))
throw new CompilationException ("Condition in if statement has to be of boolean type.");
Then thenstat = ifstat.getThen();
Else elsestat = ifstat.getElse();
- VariableScope innerThenScope = new VariableScope(this, scope);
+ VariableScope innerThenScope = new VariableScope(this, scope, VariableScope.ENCLOSURE_CONDITION);
innerThenScope.bodyTemplate = template("sub_comp");
ifST.setAttribute("vthen", innerThenScope.bodyTemplate);
Modified: trunk/src/org/griphyn/vdl/engine/VariableScope.java
===================================================================
--- trunk/src/org/griphyn/vdl/engine/VariableScope.java 2012-07-15 00:01:19 UTC (rev 5834)
+++ trunk/src/org/griphyn/vdl/engine/VariableScope.java 2012-07-15 00:06:37 UTC (rev 5835)
@@ -47,7 +47,12 @@
/** permit no access to the containing scope except for finding
global variables */
public static final int ENCLOSURE_PROCEDURE = 301926;
-
+
+ /** Override ENCLOSURE_LOOP to allow assignment inside a loop
+ * based on some condition
+ */
+ public static final int ENCLOSURE_CONDITION = 301927;
+
public static final Logger logger = Logger.getLogger(VariableScope.class);
/** need this for the program as a whole. probably should factor
@@ -66,6 +71,9 @@
/** The string template in which we will store statements
outputted into this scope. */
public StringTemplate bodyTemplate;
+
+ private List<String> outputs = new ArrayList<String>();
+ private Set<String> inhibitClosing = new HashSet<String>();
public VariableScope(Karajan c, VariableScope parent) {
this(c,parent,ENCLOSURE_ALL);
@@ -114,6 +122,10 @@
public void addVariable(String name, String type) throws CompilationException {
addVariable(name,type,false);
}
+
+ public void inhibitClosing(String name) {
+ inhibitClosing.add(name);
+ }
public void addVariable(String name, String type, boolean global) throws CompilationException {
logger.debug("Adding variable "+name+" of type "+type+" to scope "+hashCode());
@@ -194,6 +206,10 @@
// in the same scope here
public boolean isVariableWriteable(String name, boolean partialWriter) {
if(isVariableLocallyDefined(name)) return true;
+ if(parentScope != null && parentScope.isVariableWriteable(name, true) && enclosureType == ENCLOSURE_CONDITION) {
+ logger.warn("Variable " + name + " might have multiple writers");
+ return true;
+ }
if(parentScope != null && parentScope.isVariableWriteable(name, partialWriter) && enclosureType == ENCLOSURE_ALL) return true;
if(parentScope != null && parentScope.isVariableWriteable(name, partialWriter) && enclosureType == ENCLOSURE_LOOP && partialWriter) return true;
return false;
@@ -206,9 +222,9 @@
/** List of templates to be executed in sequence after the present
in-preparation statement is outputted. */
- List<StringTemplate> presentStatementPostStatements = Collections.synchronizedList(new ArrayList<StringTemplate>());
+ List<StringTemplate> presentStatementPostStatements = new ArrayList<StringTemplate>();
- Map<String, Variable> variableUsage = Collections.synchronizedMap(new HashMap<String, Variable>());
+ Map<String, Variable> variableUsage = new HashMap<String, Variable>();
/** indicates that the present in-preparation statement writes to the
named variable. If the variable is declared in the local scope,
@@ -231,10 +247,14 @@
} else {
logger.debug("Variable "+variableName+" is local but has no template.");
}
- StringTemplate postST = compiler.template("partialclose");
- postST.setAttribute("var", variableName);
- postST.setAttribute("closeID", closeID);
- presentStatementPostStatements.add(postST);
+ outputs.add(variableName);
+ if (!inhibitClosing.contains(variableName)) {
+ StringTemplate postST = compiler.template("partialclose");
+ postST.setAttribute("var", variableName);
+ postST.setAttribute("closeID", closeID);
+
+ presentStatementPostStatements.add(postST);
+ }
} else {
// TODO now we have to walk up the scopes until either we find the
@@ -258,6 +278,7 @@
}
logger.debug("added "+closeID+" to variable "+variableName+" in scope "+hashCode());
} else {
+ isVariableWriteable(variableName, partialWriter);
throw new CompilationException("variable "+variableName+" is not writeable in this scope");
}
}
@@ -310,18 +331,46 @@
void appendStatement(StringTemplate statementST) {
StringTemplate wrapperST = compiler.template("sequential");
bodyTemplate.setAttribute("statements", wrapperST);
+ if (!outputs.isEmpty()) {
+ StringTemplate unitStart = compiler.template("unitStart");
+ unitStart.setAttribute("type", "SCOPE");
+ unitStart.setAttribute("outputs", join(outputs));
+ wrapperST.setAttribute("statements", unitStart);
+ StringTemplate unitEnd = compiler.template("unitEnd");
+ unitEnd.setAttribute("type", "SCOPE");
+ presentStatementPostStatements.add(unitEnd);
+ if ("foreach".equals(statementST.getName()) ||
+ "if".equals(statementST.getName()) ||
+ "iterate".equals(statementST.getName())) {
+ statementST.setAttribute("trace", Boolean.TRUE);
+ }
+ }
wrapperST.setAttribute("statements",statementST);
Iterator<StringTemplate> it = presentStatementPostStatements.iterator();
while(it.hasNext()) {
wrapperST.setAttribute("statements", it.next());
}
- presentStatementPostStatements = Collections.synchronizedList(new ArrayList<StringTemplate>());
+ presentStatementPostStatements.clear();
+ outputs.clear();
+ inhibitClosing.clear();
}
- /** Stores information about a variable that is referred to in this
+ private String join(List<String> l) {
+ StringBuilder sb = new StringBuilder();
+ Iterator<String> i = l.iterator();
+ while (i.hasNext()) {
+ sb.append(i.next());
+ if (i.hasNext()) {
+ sb.append(",");
+ }
+ }
+ return sb.toString();
+ }
+
+ /** Stores information about a variable that is referred to in this
scope. Should probably get used for dataset marking eventually. */
class Variable {
- List<Object> writingStatements = Collections.synchronizedList(new ArrayList<Object>());
+ List<Object> writingStatements = new ArrayList<Object>();
}
Iterator<String> getVariableIterator() {
Modified: trunk/src/org/griphyn/vdl/karajan/HangChecker.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/HangChecker.java 2012-07-15 00:01:19 UTC (rev 5834)
+++ trunk/src/org/griphyn/vdl/karajan/HangChecker.java 2012-07-15 00:06:37 UTC (rev 5835)
@@ -32,10 +32,11 @@
import org.apache.log4j.Logger;
import org.globus.cog.karajan.scheduler.WeightedHostScoreScheduler;
+import org.globus.cog.karajan.stack.VariableNotFoundException;
import org.globus.cog.karajan.stack.VariableStack;
+import org.globus.cog.karajan.util.ThreadingContext;
import org.globus.cog.karajan.workflow.ExecutionException;
import org.globus.cog.karajan.workflow.events.EventBus;
-import org.globus.cog.karajan.workflow.futures.FutureEvaluationException;
import org.globus.cog.karajan.workflow.nodes.grid.SchedulerNode;
import org.griphyn.vdl.mapping.AbstractDataNode;
import org.griphyn.vdl.mapping.DSHandle;
@@ -44,6 +45,7 @@
public static final Logger logger = Logger.getLogger(HangChecker.class);
public static final int CHECK_INTERVAL = 10000;
+ public static final int MAX_CYCLES = 10;
private Timer timer;
private long lastEventCount;
private VariableStack stack;
@@ -94,6 +96,8 @@
}
l.add(e.getKey());
}
+
+ System.out.print("Finding dependency loops...");
Set<VariableStack> seen = new HashSet<VariableStack>();
LinkedList<Object> cycle = new LinkedList<Object>();
@@ -103,9 +107,8 @@
cycle.clear();
findLoop(t, rwt, ot, seen, cycle, cycles);
}
+ System.out.println();
- cycles = removeDuplicates(cycles);
-
if (cycles.size() == 1) {
ps.println("Dependency loop found:");
}
@@ -121,38 +124,39 @@
Object prev = c.getLast();
for (Object o : c) {
if (o instanceof VariableStack) {
- ps.println("\t" + Monitor.varWithLine((DSHandle) prev) + " is needed by: ");
+ if (prev != null) {
+ ps.println("\t" + Monitor.varWithLine((DSHandle) prev) + " is needed by: ");
+ }
+ else {
+ ps.println("\tthe above must complete before the block below can complete:");
+ }
for (String t : Monitor.getSwiftTrace((VariableStack) o)) {
- ps.println("\t\t" + t);
+ ps.println("\t\t" + t);
}
}
- else {
+ else {
prev = o;
- ps.println("\twhich produces " + Monitor.varWithLine((DSHandle) o));
+ if (o != null) {
+ ps.println("\twhich produces " + Monitor.varWithLine((DSHandle) o));
+ }
ps.println();
}
}
}
+
+ // TODO: fail the loops
if (cycles.size() > 0) {
ps.println("----");
}
}
-
- private static List<LinkedList<Object>> removeDuplicates(List<LinkedList<Object>> cycles) {
- List<LinkedList<Object>> nc = new LinkedList<LinkedList<Object>>();
- while (!cycles.isEmpty()) {
- Iterator<LinkedList<Object>> i = cycles.iterator();
- LinkedList<Object> first = i.next();
- i.remove();
-
- while (i.hasNext()) {
- if (isSameCycle(first, i.next())) {
- i.remove();
- }
- }
- nc.add(first);
- }
- return nc;
+
+ private static boolean isDuplicate(List<LinkedList<Object>> cycles, LinkedList<Object> cycle) {
+ for (LinkedList<Object> c : cycles) {
+ if (isSameCycle(c, cycle)) {
+ return true;
+ }
+ }
+ return false;
}
private static boolean isSameCycle(LinkedList<Object> a, LinkedList<Object> b) {
@@ -163,12 +167,12 @@
Object o = i.next();
Iterator<Object> j = b.iterator();
while (j.hasNext()) {
- if (o == j.next()) {
+ if (sameTraces(o, j.next())) {
while (i.hasNext()) {
if (!j.hasNext()) {
j = b.iterator();
}
- if (i.next() != j.next()) {
+ if (!sameTraces(i.next(), j.next())) {
return false;
}
}
@@ -178,18 +182,85 @@
return false;
}
+ private static boolean sameTraces(Object a, Object b) {
+ if (a instanceof DSHandle) {
+ return a == b;
+ }
+ if (b instanceof DSHandle) {
+ return false;
+ }
+ if (a == null || b == null) {
+ return a == b;
+ }
+ VariableStack sa = (VariableStack) a;
+ VariableStack sb = (VariableStack) b;
+
+ List<Object> ta = Monitor.getSwiftTraceElements(sa);
+ List<Object> tb = Monitor.getSwiftTraceElements(sb);
+
+ if (ta.size() != tb.size()) {
+ return false;
+ }
+ for (int i = 0; i < ta.size(); i++) {
+ if (ta.get(i) != tb.get(i)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
private static void findLoop(VariableStack t, Map<DSHandle, List<VariableStack>> rwt,
Map<VariableStack, List<DSHandle>> ot, Set<VariableStack> seen, LinkedList<Object> cycle, List<LinkedList<Object>> cycles) {
+ if (cycles.size() > MAX_CYCLES) {
+ return;
+ }
if (t == null) {
return;
}
if (seen.contains(t)) {
- cycles.add(new LinkedList<Object>(cycle));
+ // remove things up to t in the cycle since they are just lead-ins
+ LinkedList<Object> lc = new LinkedList<Object>(cycle);
+ while (lc.getFirst() != t) {
+ lc.removeFirst();
+ }
+ if (!isDuplicate(cycles, lc)) {
+ cycles.add(new LinkedList<Object>(lc));
+ System.out.print(".");
+ }
return;
}
cycle.add(t);
seen.add(t);
// follow all the outputs of t
+ followOutputs(t, rwt, ot, seen, cycle, cycles);
+
+ // now follow all the outputs of parent threads to t
+ try {
+ ThreadingContext tc = ThreadingContext.get(t);
+ for (VariableStack stk : ot.keySet()) {
+ if (tc.isStrictlySubContext(ThreadingContext.get(stk))) {
+ seen.add(stk);
+ cycle.add(null);
+ cycle.add(stk);
+ followOutputs(stk, rwt, ot, seen, cycle, cycles);
+ cycle.removeLast();
+ cycle.removeLast();
+ seen.remove(stk);
+ }
+ }
+ }
+ catch (VariableNotFoundException e) {
+ e.printStackTrace();
+ }
+ cycle.removeLast();
+ seen.remove(t);
+ }
+
+ private static void followOutputs(VariableStack t,
+ Map<DSHandle, List<VariableStack>> rwt,
+ Map<VariableStack, List<DSHandle>> ot, Set<VariableStack> seen,
+ LinkedList<Object> cycle, List<LinkedList<Object>> cycles) {
List<DSHandle> l = ot.get(t);
if (l != null) {
for (DSHandle h : l) {
@@ -203,8 +274,5 @@
cycle.removeLast();
}
}
- cycle.removeLast();
- seen.remove(t);
}
-
}
Modified: trunk/src/org/griphyn/vdl/karajan/Monitor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/Monitor.java 2012-07-15 00:01:19 UTC (rev 5834)
+++ trunk/src/org/griphyn/vdl/karajan/Monitor.java 2012-07-15 00:06:37 UTC (rev 5835)
@@ -314,6 +314,21 @@
}
return ret;
}
+
+ public static List<Object> getSwiftTraceElements(VariableStack stack) {
+ List<Object> ret = new ArrayList<Object>();
+ List<Object> trace = Trace.getAsList(stack);
+ for (Object o : trace) {
+ if (o instanceof FlowNode) {
+ FlowNode n = (FlowNode) o;
+ String traceLine = (String) n.getProperty("_traceline");
+ if (traceLine != null) {
+ ret.add(o);
+ }
+ }
+ }
+ return ret;
+ }
private static String fileName(FlowNode n) {
return new File((String) FlowNode.getTreeProperty(FlowElement.FILENAME, n)).getName().replace(".kml", ".swift");
Modified: trunk/src/org/griphyn/vdl/karajan/lib/UnitStart.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/UnitStart.java 2012-07-15 00:01:19 UTC (rev 5834)
+++ trunk/src/org/griphyn/vdl/karajan/lib/UnitStart.java 2012-07-15 00:06:37 UTC (rev 5835)
@@ -14,6 +14,7 @@
import org.apache.log4j.Logger;
import org.globus.cog.karajan.arguments.Arg;
+import org.globus.cog.karajan.stack.VariableNotFoundException;
import org.globus.cog.karajan.stack.VariableStack;
import org.globus.cog.karajan.util.ThreadingContext;
import org.globus.cog.karajan.workflow.ExecutionException;
@@ -52,15 +53,25 @@
String outputs = (String) OUTPUTS.getStatic(this);
if (outputs != null) {
- trackOutputs(stack, outputs);
+ trackOutputs(stack, outputs, "SCOPE".equals(type));
}
}
- private void trackOutputs(VariableStack stack, String outputs) {
+ private void trackOutputs(VariableStack stack, String outputs, boolean deep) {
String[] names = outputs.split(",");
List<DSHandle> l = new LinkedList<DSHandle>();
for (String name : names) {
- l.add((DSHandle) stack.parentFrame().getVar(name));
+ if (deep) {
+ try {
+ l.add((DSHandle) stack.getVar(name));
+ }
+ catch (VariableNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+ else {
+ l.add((DSHandle) stack.parentFrame().getVar(name));
+ }
}
WaitingThreadsMonitor.addOutput(stack, l);
}
More information about the Swift-commit
mailing list