[Swift-commit] r2093 - in trunk: libexec src/org/griphyn/vdl/karajan/lib src/org/griphyn/vdl/mapping

noreply at svn.ci.uchicago.edu noreply at svn.ci.uchicago.edu
Mon Jul 7 12:21:04 CDT 2008


Author: benc
Date: 2008-07-07 12:21:04 -0500 (Mon, 07 Jul 2008)
New Revision: 2093

Added:
   trunk/src/org/griphyn/vdl/karajan/lib/IsRestartable.java
Modified:
   trunk/libexec/execute-default.k
   trunk/libexec/execute-dryrun.k
   trunk/libexec/execute-typecheck.k
   trunk/libexec/vdl-int.k
   trunk/libexec/vdl-lib.xml
   trunk/libexec/vdl.k
   trunk/src/org/griphyn/vdl/mapping/AbstractDataNode.java
   trunk/src/org/griphyn/vdl/mapping/DSHandle.java
Log:
distinguish between file-mapped and restartable datasets, ready for non-file-based datasets

Modified: trunk/libexec/execute-default.k
===================================================================
--- trunk/libexec/execute-default.k	2008-07-07 16:57:50 UTC (rev 2092)
+++ trunk/libexec/execute-default.k	2008-07-07 17:21:04 UTC (rev 2093)
@@ -1,9 +1,9 @@
-element([tr, optional(arguments, stdin, stdout, stderr, deperror, mdeperror), channel(stagein), channel(stageout)]
+element([tr, optional(arguments, stdin, stdout, stderr, deperror, mdeperror), channel(stagein), channel(stageout), channel(restartout)]
 
 	vdl:initprogressstate()
 	vdl:setprogress("Initializing")
 
-	done := isDone(stageout)
+	done := isDone(restartout)
 	derr := try(deperror, false)
 	merr := try(mdeperror, false)
 
@@ -23,13 +23,13 @@
 									execute2(
 										tr, maybe(arguments=arguments), 
 										maybe(stdin=stdin), maybe(stdout=stdout), maybe(stderr=stderr), 
-										stagein, stageout, replicationGroup, replicationChannel
+										stagein, stageout, restartout, replicationGroup, replicationChannel
 									)
 									catch("^Abort$")
 								)
 							)
 						)
-						mark(stageout, err=false, mapping=false)
+						mark(restartout, err=false, mapping=false)
 						graphStuff(tr, stagein, stageout, err=false, maybe(args=arguments))
 						echo("{tr} completed")
 						log(LOG:INFO, "END_SUCCESS thread={#thread} tr={tr}")
@@ -46,7 +46,7 @@
 							else (
 								to(errors, exception)
 								log(LOG:INFO, exception)
-								mark(stageout, err=true, mapping=false)
+								mark(restartout, err=true, mapping=false)
 								graphStuff(tr, stagein, stageout, err=true, maybe(args=arguments))
 							)
 						)
@@ -68,7 +68,7 @@
 					log(LOG:INFO, exception)
 				)
 			)
-			mark(stageout, err=true, mapping=merr)
+			mark(restartout, err=true, mapping=merr)
 			graphStuff(tr, stagein, stageout, err=true, maybe(args=arguments))
 		)
 	)

Modified: trunk/libexec/execute-dryrun.k
===================================================================
--- trunk/libexec/execute-dryrun.k	2008-07-07 16:57:50 UTC (rev 2092)
+++ trunk/libexec/execute-dryrun.k	2008-07-07 17:21:04 UTC (rev 2093)
@@ -1,4 +1,4 @@
-element([tr, optional(arguments, stdin, stdout, stderr, deperror), channel(stagein), channel(stageout)]
+element([tr, optional(arguments, stdin, stdout, stderr, deperror), channel(stagein), channel(stageout), channel(restartout)]
 
 	done := isDone(stageout)
 				

Modified: trunk/libexec/execute-typecheck.k
===================================================================
--- trunk/libexec/execute-typecheck.k	2008-07-07 16:57:50 UTC (rev 2092)
+++ trunk/libexec/execute-typecheck.k	2008-07-07 17:21:04 UTC (rev 2093)
@@ -1,3 +1,3 @@
-element([tr, optional(arguments, stdin, stdout, stderr), channel(stagein), channel(stageout)]
+element([tr, optional(arguments, stdin, stdout, stderr), channel(stagein), channel(stageout), channel(restartout)]
 	mark(stageout, err=false)
 )

Modified: trunk/libexec/vdl-int.k
===================================================================
--- trunk/libexec/vdl-int.k	2008-07-07 16:57:50 UTC (rev 2092)
+++ trunk/libexec/vdl-int.k	2008-07-07 17:21:04 UTC (rev 2093)
@@ -41,9 +41,9 @@
 			)
 		)
 			
-		element(mark, [stageout, err, optional(mapping)]
+		element(mark, [restarts, err, optional(mapping)]
 			if(
-				err for(pv, stageout
+				err for(pv, restarts
 					[path, var] := each(pv)
 					vdl:setFutureFault(var, path=path, mapping=mapping)
 				)
@@ -246,7 +246,6 @@
 					    task:transfer(srchost=host, srcfile=bname,
 					        srcdir=rdir, destdir=ldir, desthost=dhost, destprovider=provider)
 					)
-					vdl:logvar(var, path)
 					log(LOG:DEBUG, "FILE_STAGE_OUT_END srcname={bname} srcdir={rdir} srchost={host} ",
 						"destdir={ldir} desthost={dhost} provider={provider}")
 	
@@ -261,6 +260,13 @@
 			)
 			log(LOG:INFO, "END jobid={jobid} - Staging out finished")
 		)
+
+		element(doRestartlog, [restartouts]
+			uParallelFor(f, restartouts,
+				[path, var] := each(f)
+				vdl:logvar(var, path)
+			)
+		)
 		
 		element(graphStuff, [tr, stagein, stageout, err, optional(args)]
 			if(
@@ -352,7 +358,7 @@
 			recfile
 		)
 
-		element(execute2, [tr, optional(arguments, stdin, stdout, stderr), stagein, stageout, 
+		element(execute2, [tr, optional(arguments, stdin, stdout, stderr), stagein, stageout,  restartout,
 			replicationGroup, replicationChannel]
 			stagein := list(unique(each(stagein)))
 			stageout := list(unique(each(stageout)))
@@ -418,6 +424,7 @@
 						
 						vdl:setprogress("Stage out")
 						doStageout(jobid, stageout, sharedDir, rhost)
+						doRestartlog(restartout)
 						if(
 							kickstart != "" & vdl:configProperty("kickstart.always.transfer") == "true"
 							discard(transferKickstartRec(rhost, wfdir, jobid, jobdir))

Modified: trunk/libexec/vdl-lib.xml
===================================================================
--- trunk/libexec/vdl-lib.xml	2008-07-07 16:57:50 UTC (rev 2092)
+++ trunk/libexec/vdl-lib.xml	2008-07-07 17:21:04 UTC (rev 2093)
@@ -47,6 +47,7 @@
 	<export name="getFieldValue"><elementDef classname="org.griphyn.vdl.karajan.lib.GetFieldValue"/></export>
 	<export name="waitFieldValue"><elementDef classname="org.griphyn.vdl.karajan.lib.WaitFieldValue"/></export>
 	<export name="isFileBound"><elementDef classname="org.griphyn.vdl.karajan.lib.IsFileBound"/></export>
+	<export name="isRestartable"><elementDef classname="org.griphyn.vdl.karajan.lib.IsRestartable"/></export>
 	<export name="fileSet"><elementDef classname="org.griphyn.vdl.karajan.lib.FileSet"/></export>
 	<export name="fringePaths"><elementDef classname="org.griphyn.vdl.karajan.lib.FringePaths"/></export>
 	<export name="assign"><elementDef classname="org.griphyn.vdl.karajan.lib.Assign"/></export>

Modified: trunk/libexec/vdl.k
===================================================================
--- trunk/libexec/vdl.k	2008-07-07 16:57:50 UTC (rev 2092)
+++ trunk/libexec/vdl.k	2008-07-07 17:21:04 UTC (rev 2093)
@@ -100,6 +100,20 @@
 					deperror = true
 				)
 			)
+			try(
+				if(vdl:isRestartable(var) 
+					channel:to(restartout,
+						for(path, vdl:fringePaths(var)
+							list(path, var)
+						)
+					)
+				)
+				catch(".*not mapped.*"
+					log(LOG:DEBUG, exception)
+					mdeperror = true
+					deperror = true
+				)
+			)
 		)
 
 		element(mapping, [descriptor, ...]

Added: trunk/src/org/griphyn/vdl/karajan/lib/IsRestartable.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/IsRestartable.java	                        (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/lib/IsRestartable.java	2008-07-07 17:21:04 UTC (rev 2093)
@@ -0,0 +1,25 @@
+package org.griphyn.vdl.karajan.lib;
+
+import org.globus.cog.karajan.arguments.Arg;
+import org.globus.cog.karajan.stack.VariableStack;
+import org.globus.cog.karajan.workflow.ExecutionException;
+import org.griphyn.vdl.mapping.AbstractDataNode;
+import org.griphyn.vdl.mapping.DSHandle;
+
+/** Determines if a variable is 'restartable'; that is, whether the variable
+    will still have its content after the workflow is restarted.
+*/
+    
+    
+
+public class IsRestartable extends VDLFunction {
+	static {
+		setArguments(IsRestartable.class, new Arg[] { PA_VAR });
+	}
+
+	public Object function(VariableStack stack) throws ExecutionException {
+		DSHandle var = (DSHandle) PA_VAR.getValue(stack);
+		return Boolean.valueOf(var.isRestartable());
+	}
+}
+

Modified: trunk/src/org/griphyn/vdl/mapping/AbstractDataNode.java
===================================================================
--- trunk/src/org/griphyn/vdl/mapping/AbstractDataNode.java	2008-07-07 16:57:50 UTC (rev 2092)
+++ trunk/src/org/griphyn/vdl/mapping/AbstractDataNode.java	2008-07-07 17:21:04 UTC (rev 2093)
@@ -65,6 +65,10 @@
 		return field.getType().isPrimitive();
 	}
 
+	public boolean isRestartable() {
+		return !isPrimitive();
+	}
+
 	protected Field getField() {
 		return field;
 	}

Modified: trunk/src/org/griphyn/vdl/mapping/DSHandle.java
===================================================================
--- trunk/src/org/griphyn/vdl/mapping/DSHandle.java	2008-07-07 16:57:50 UTC (rev 2092)
+++ trunk/src/org/griphyn/vdl/mapping/DSHandle.java	2008-07-07 17:21:04 UTC (rev 2093)
@@ -82,4 +82,6 @@
     Mapper getMapper();
 
     public String getIdentifier();
+
+    public boolean isRestartable();
 }




More information about the Swift-commit mailing list