[Swift-commit] r6187 - branches/faster/libexec

hategan at ci.uchicago.edu hategan at ci.uchicago.edu
Tue Jan 29 23:28:48 CST 2013


Author: hategan
Date: 2013-01-29 23:28:47 -0600 (Tue, 29 Jan 2013)
New Revision: 6187

Added:
   branches/faster/libexec/swift-int-staging.k
   branches/faster/libexec/swift-int-wrapper-staging.k
Removed:
   branches/faster/libexec/vdl-int-staging.k
   branches/faster/libexec/vdl-int-wrapper-staging.k
Modified:
   branches/faster/libexec/swift-int.k
   branches/faster/libexec/swift.k
Log:
selectable swift-int-xxx

Copied: branches/faster/libexec/swift-int-staging.k (from rev 6170, branches/faster/libexec/vdl-int-staging.k)
===================================================================
--- branches/faster/libexec/swift-int-staging.k	                        (rev 0)
+++ branches/faster/libexec/swift-int-staging.k	2013-01-30 05:28:47 UTC (rev 6187)
@@ -0,0 +1,171 @@
+import(sys)
+import(task)
+import('swift-lib')
+/*
+ * Things that are not exposed to the translated file
+ */
+
+
+WRAPPER_TRANSFER_MODE :=
+	if (vdl:configProperty("wrapperlog.always.transfer") == "true", 
+		STAGING_MODE:IF_PRESENT, STAGING_MODE:ON_ERROR + STAGING_MODE:IF_PRESENT) // mode passed to stageOut("wrapper.log", ...) in execute2
+
+pinOption := configProperty("provider.staging.pin.swiftfiles")
+
+PIN := if(pinOption == "true", "pinned:", "") // prepended to the stage-in URLs of the wrapper and CDM files
+PROVENANCE_GRAPH_ENABLED := (vdl:configProperty("pgraph") != "false") // not referenced in the visible part of this file
+
+namespace(swift) {
+
+	initDDir := function() { // yields the run-scoped directory name, creating the directory when it does not exist yet
+		ddir := "{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}"
+		
+		once(ddir) { // keyed on the directory name — presumably the body runs once per key; confirm once() semantics
+			if(!file:exists(ddir)) {
+				task:dir:make(ddir)
+			}
+		}
+		ddir
+	}
+	
+	inFiles := function(stageins) { // thin wrapper: hands the stage-in list straight to pathnames()
+		pathnames(stageins)
+	}
+	
+	fileSizes := function(files) { // math:sum over file:size() of every entry in files
+		math:sum(
+			for(f, files, file:size(f))
+		)
+	}
+
+	export(cleanups, // exported under the name cleanups
+		function(cleanup) {
+			log(LOG:INFO, "START cleanups={cleanup}") // this variant only logs the list; the body performs no cleanup action
+		}
+	)
+
+	readErrorFile := function(dir, jobid) { // reads {dir}/{jobid}.error, then deletes it; execute2 wraps the result in exception()
+		str:strip(file:read("{dir}/{jobid}.error")) // stripped file text
+		file:remove("{dir}/{jobid}.error") // removed only after the read above
+	}
+	
+	export(execute2, // provider-staging launcher: allocates a host, submits the wrapper, declares stage-ins/outs inside the task
+		function(progress, tr, stagein, stageout,  restartout
+			replicationGroup, replicationChannel
+			arguments = [], stdin = null, stdout = null, stderr = null, attributes = null) {
+	
+			stagein := list(unique(each(stagein))) // drop duplicate stage-in entries
+			stageout := list(unique(each(stageout))) // drop duplicate stage-out entries
+
+			allocateHost(rhost, constraints = jobConstraints(tr, stagein = stagein)) {
+
+				ddir := initDDir()
+
+				uid := UID()
+				jobdir := substring(uid, from=0, to=1) // leading slice of the uid, used as a bucket under jobs/
+				jobid := "{tr}-{uid}"
+
+				log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread=", currentThread(), " host={rhost} replicationGroup={replicationGroup}")
+
+				wrapper := "_swiftwrap.staging"
+				wrapfile := "{ddir}/param-{jobid}"
+				
+				stdout := if (stdout == null, "stdout.txt", getFieldValue(stdout)) // default name when the caller gave none
+				stderr := if (stderr == null, "stderr.txt", getFieldValue(stderr))
+
+				wfdir := "{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}" // same globals initDDir() uses, instead of the old VDL:* names
+				tmpdir := dircat("{wfdir}/jobs/{jobdir}", jobid)
+				cdmfile := cdm:file()
+
+				try {
+					log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", if (arguments != null, (" arguments=", arguments)), " tmpdir={tmpdir} host={rhost}")
+
+					setProgress(progress, "Submitting")
+
+					swift:execute(
+						siteProfile(rhost, "swift:wrapperInterpreter"),
+						list(
+							siteProfile(rhost, "swift:wrapperInterpreterOptions"),
+							wrapper,
+							"-e", vdl:executable(tr, rhost),
+							"-out", stdout,
+							"-err", stderr,
+							"-i", if (stdin != null, getFieldValue(stdin)),
+							"-d", flatten(unique(outFileDirs(stageout))),
+							"-if", flatten(inFiles(stagein)), // spelled like the helper defined in this file
+							"-of", flatten(outFiles(stageout)), // spelled like the call in swift-int-wrapper-staging.k
+							"-k",
+							"-cdmfile", cdmfile,
+							"-status", "provider"
+							"-a", if (arguments != null, each(arguments))
+						)
+						directory = tmpdir
+						redirect = false
+						host = rhost
+
+						TCProfile(rhost, if (attributes != null, attributes = attributes), tr = tr)
+						replicationGroup = replicationGroup
+						replicationChannel = replicationChannel
+						jobid = jobid
+					
+						stagingMethod := siteProfile(rhost, "swift:stagingMethod", default="proxy")
+
+						stageIn("{PIN}{stagingMethod}://localhost/{swift.home}/libexec/{wrapper}", wrapper)
+
+						if (cdmfile != "") { // CDM support files are staged only when a CDM file is configured
+							d := dirname(cdmfile)
+							file := basename(cdmfile)
+							dir := if (d == "", "./", str:concat(d,"/"))
+							loc := "{PIN}{stagingMethod}://localhost/"
+							stageIn("{loc}{dir}{file}", cdmfile)
+							stageIn("{loc}{swift.home}/libexec/cdm.pl", "cdm.pl")
+							stageIn("{loc}{swift.home}/libexec/cdm_lib.sh", "cdm_lib.sh")
+						}
+
+						appStageins(jobid, stagein, ".", stagingMethod)
+
+						stageOut("wrapper.log", "{stagingMethod}://localhost/{ddir}/{jobid}.info", 
+							mode = WRAPPER_TRANSFER_MODE)
+						//stageOut("{stdout}", "{stagingMethod}://localhost/{ddir}/{stdout}")
+						//stageOut("{stderr}", "{stagingMethod}://localhost/{ddir}/{stderr}")
+						stageOut("wrapper.error", "{stagingMethod}://localhost/{ddir}/{jobid}.error", 
+							mode = STAGING_MODE:IF_PRESENT)
+						appStageouts(jobid, stageout, ".", stagingMethod)
+
+						task:cleanUp(".") //the whole job directory
+					) // execute
+					doRestartlog(restartout)
+					log(LOG:DEBUG, "JOB_END jobid={jobid}")
+				}
+				else catch(prev) {
+					if (matches(prev, "^Abort$")) { // cancellation: log and rethrow unchanged
+						log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
+						throw(prev)
+					}
+					else {
+						setProgress(progress, "Failed but can retry")
+						exception := try(exception(readErrorFile(ddir, jobid)), prev) // presumably falls back to prev when the .error file cannot be read
+						log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
+					
+						throw(
+							exception(
+								concat(
+									"Exception in {tr}:",
+									if (arguments != null, "\n    Arguments: {arguments}")
+									"\n    Host: {rhost}",
+									"\n    Directory: {tmpdir}"
+								)
+								exception
+							)
+						)
+					}
+				}
+			}
+		}
+	)
+}
+
+// Local variables:
+// tab-width: 4
+// indent-tabs-mode: 1
+// End:

Copied: branches/faster/libexec/swift-int-wrapper-staging.k (from rev 6169, branches/faster/libexec/vdl-int-wrapper-staging.k)
===================================================================
--- branches/faster/libexec/swift-int-wrapper-staging.k	                        (rev 0)
+++ branches/faster/libexec/swift-int-wrapper-staging.k	2013-01-30 05:28:47 UTC (rev 6187)
@@ -0,0 +1,329 @@
+import(sys)
+import(task)
+import('swift-lib')
+
+
+getURLPrefix := def("org.griphyn.vdl.karajan.lib.GetURLPrefix") // binds the Java class named in the string via def()
+
+URL_PREFIX := getURLPrefix() 
+
+WRAPPERLOG_ALWAYS_TRANSFER := vdl:configProperty("wrapperlog.always.transfer") // forwarded to the wrapper as -wt in execute2
+SITEDIR_KEEP := vdl:configProperty("sitedir.keep") // forwarded to the wrapper as -sk in execute2
+
+
+namespace(swift) {
+	
+	checkJobStatus := function(jobdir, jobid, tr) { // succeeds if the success marker can be removed, otherwise throws the job's error; tr is unused in the body
+		log(LOG:DEBUG, "START jobid={jobid}")
+		try {
+			file:remove("{jobdir}/_swift.success") // detect-and-delete in one step — presumably throws when the marker is absent
+			log(LOG:INFO, "SUCCESS jobid={jobid} - Success file found")
+		}
+		else {
+			throw(checkErrorFile(jobdir, jobid)) // raise the contents of the error marker (or its "no status file" failure)
+		}
+	}
+		
+	checkErrorFile := function(jobdir, jobid) { // yields the stripped text of {jobdir}/_swift.error and deletes the file; throws if it does not exist
+		if (file:exists("{jobdir}/_swift.error")) {
+			log(LOG:INFO, "FAILURE jobid={jobid} - Failure file found")
+			error := str:strip(file:read("{jobdir}/_swift.error"))
+			file:remove("{jobdir}/_swift.error")
+			error
+		}
+		else {
+			log(LOG:INFO, "NO_STATUS_FILE jobid={jobid} - Both status files are missing")
+			throw("No status file was found")
+		}
+	}
+
+	initSharedDir := function(progress, rhost) { // sets up the run's shared directory on rhost and yields its name
+		once(list(rhost, "shared")) { // keyed per host — presumably the body runs once per key; confirm once() semantics
+			setProgress(progress, "Initializing site shared directory")
+
+			log(LOG:INFO, "START host={rhost} - Initializing shared directory")
+
+			wfdir := "{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}" // same globals initDDir uses in this file, instead of the old VDL:* names
+			dir:make(wfdir, host=rhost)
+			transfer(srcdir="{swift.home}/libexec/", srcfile="_swiftwrap.wrapperstaging", destdir=wfdir, desthost=rhost)
+
+			wfdir
+			to(cleanup, list(wfdir, rhost)) // send the (directory, host) pair to the cleanup channel
+			log(LOG:INFO, "END host={rhost} - Done initializing shared directory")
+		}
+	}
+	
+	initDDir := function() { // yields the run-scoped directory name, creating the directory when it does not exist yet
+		ddir := "{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}"
+		
+		once(ddir) { // keyed on the directory name — presumably the body runs once per key; confirm once() semantics
+			if(!file:exists(ddir)) {
+				task:dir:make(ddir)
+			}
+		}
+		ddir
+	}
+
+
+	inFiles := function(stageins) { // thin wrapper: hands the stage-in list straight to pathnames()
+		pathnames(stageins)
+	}
+
+	fileDirs := function(stageins, stageouts) { // list of the distinct directories reported by inFileDirs() and outFileDirs()
+		list(
+			unique(
+				inFileDirs(stageins)
+				outFileDirs(stageouts)
+			)
+		)
+	}
+
+	cleanup := function(dir, host) { // runs the site's configured cleanup command on dir at host when sitedir.keep is "false"
+		log(LOG:INFO, "START dir={dir} host={host}")
+		if(vdl:configProperty("sitedir.keep") == "false") { // anything other than the exact string "false" skips the cleanup
+			task:execute(
+				vdl:siteprofile(host, "swift:cleanupCommand"),
+				arguments = list(
+					siteProfile(host, "swift:cleanupCommandOptions"),
+					dir
+				)
+				host=host, batch=true, TCProfile(host)
+			)
+			log(LOG:INFO, "END dir={dir} host={host}") // END is logged only on the branch that actually ran the command
+		}
+	}
+
+	cleanups := function(cleanup) { // cleans every (dir, host) pair in the list; a failure becomes a warning rather than an error
+		log(LOG:INFO, "START cleanups={cleanup}")
+		parallelFor(i, cleanup) { // the parameter named cleanup is the list of pairs, not the cleanup function
+			(dir, host) := each(i)
+			try {
+				vdl:cleanup(dir, host) // NOTE(review): this file declares namespace(swift) — confirm vdl:cleanup still resolves
+			}
+			else catch(exception) {
+				log(LOG:DEBUG, "EXCEPTION - Exception caught while cleaning up", exception)
+				to(warnings, exception("Cleanup on {host} failed", exception)) // reported on the warnings channel, not rethrown
+			}
+		}
+		log(LOG:INFO, "END cleanups={cleanup}")
+	}
+
+	stageWrapperParams := function(jobid, wrapfile, dir, host) { // as written, only ensures dir exists on host; no file is transferred here
+		log(LOG:INFO, "START jobid={jobid} - staging in wrapper params")
+		provider := provider(wrapfile)
+		srchost := hostname(wrapfile) // computed but not used below
+		srcdir := vdl:dirname(wrapfile) // computed but not used below
+		destdir := dir
+		filename := basename(wrapfile) // computed but not used below
+
+		cacheOn(list(destdir, host) // keyed on (directory, host) — presumably avoids repeating the mkdir; confirm cacheOn() semantics
+			dir:make(destdir, host=host, provider=provider)
+		)
+
+		log(LOG:INFO, "END jobid={jobid}")
+	}
+
+		
+	readStandardFiles := function(jobdir, stdout, stderr) { // builds one string with a labelled section for the job's stderr and stdout files
+		concat(
+			for(f, [["_swift.stderr", stderr], ["_swift.stdout", stdout]]) { // each pair is (label, file name); stderr comes first
+				(name, file) := each(f)
+				destfile := "{jobdir}/{file}"
+				"\n    {name}: "
+				try {
+					file:read(destfile)
+					"\n"
+				}
+				else { // read failed: contribute an empty string after the label
+					""
+				}
+			}
+		)
+	}
+		
+
+	export(execute2, // wrapper-staging launcher: prepares the shared dir, runs _swiftwrap.wrapperstaging on the host, checks status
+		function(progress, tr, stagein, stageout,  restartout
+			replicationGroup, replicationChannel
+			arguments = [], stdin = null, stdout = null, stderr = null, attributes = null) {
+
+			stagein := list(unique(each(stagein))) // drop duplicate stage-in entries
+			stageout := list(unique(each(stageout))) // drop duplicate stage-out entries
+
+			allocateHost(rhost, constraints=jobConstraints(tr, stagein = stagein)) {
+
+				ddir := initDDir()
+				wfdir := try {
+					initSharedDir(progress, rhost) // initSharedDir is declared as function(progress, rhost)
+				}
+				else catch(exception) {
+					throw(exception("Could not initialize shared directory on {rhost}", exception))
+				}
+
+				uid := UID()
+				jobid := "{tr}-{uid}"
+				
+				jobdir := concat(ddir, "/jobs/", substring(uid, from=0, to=1), "/{jobid}/") // bucketed by the leading slice of the uid
+
+				log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread={#thread} host={rhost} replicationGroup={replicationGroup}") // NOTE(review): swift-int-staging.k logs currentThread() here — confirm {#thread} still expands
+
+				statusMode := configProperty("status.mode", host = rhost)
+				wrapperMode := configProperty("wrapper.parameter.mode", host = rhost) // "files" = parameter file, "args" = command-line arguments
+
+				wrapfile := "{jobdir}/_paramfile"
+
+				stdout := if (stdout == null, "stdout.txt", getFieldValue(stdout)) // default name when the caller gave none
+				stderr := if (stderr == null, "stderr.txt", getFieldValue(stderr))
+
+				fileDirs := fileDirs(stagein, stageout)
+				os := siteProfile(rhost, "SYSINFO:OS")
+				
+				scratch := siteProfile(rhost, "scratch")
+
+				if(wrapperMode == "files") { // write one "-flag value" entry per line into the parameter file
+					file:write(wrapfile,
+						"-e ",vdl:executable(tr, rhost),
+						"\n-out ", stdout,
+						"\n-err ", stderr,
+						"\n-i ", if (stdin != null, getFieldValue(stdin)),
+						"\n-d ", flatten(each(fileDirs)),
+						"\n-if ", flatten(inFiles(stagein)),
+						"\n-of ", flatten(outFiles(stageout)),
+						"\n-wt ", WRAPPERLOG_ALWAYS_TRANSFER, // space added so the value is not glued to the flag, matching the other entries
+						"\n-sk ", SITEDIR_KEEP, // space added, as for -wt
+						"\n-cdmfile ", cdm:file(),
+						"\n-status ", statusMode,
+						for(a, arguments) {
+							"\n-a ", a
+						}
+					)
+				}
+				
+
+				setProgress(progress, "Stage in")
+
+				try {
+					if (wrapperMode == "files") {
+						stageWrapperParams(jobid, wrapfile, wfdir, rhost)
+					}
+
+					log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", if (arguments != null, (" arguments=", arguments)), " host={rhost}")
+
+					setProgress(progress, "Submitting")
+
+					if (wrapperMode == "files") { // wrapper reads its parameters from the file passed with -p
+						swift:execute(
+							siteProfile(rhost, "swift:wrapperInterpreter"),
+							list(
+								siteProfile(rhost, "swift:wrapperInterpreterOptions"),
+								"_swiftwrap.wrapperstaging",
+								jobid,
+								"-urlprefix", URL_PREFIX,
+								"-jobdir", jobdir,
+								"-scratch", scratch, 
+								"-p", wrapfile
+							)
+							directory=wfdir
+							redirect=false
+							host=rhost
+							TCProfile(rhost, if (attributes != null, attributes = attributes), tr=tr)
+							replicationGroup=replicationGroup
+							replicationChannel=replicationChannel
+							jobid=jobid
+						)
+					}
+					if (wrapperMode == "args") { // wrapper receives every parameter on its command line
+						swift:execute(
+							siteProfile(rhost, "swift:wrapperInterpreter"),
+							list(
+								siteProfile(rhost, "swift:wrapperInterpreterOptions"),
+								"_swiftwrap.wrapperstaging",
+								jobid,
+								"-urlprefix", URL_PREFIX,
+								"-jobdir", jobdir,
+								"-scratch", scratch,
+								"-e", vdl:executable(tr, rhost),
+								"-out", stdout,
+								"-err", stderr,
+								"-i", if (stdin != null, getFieldValue(stdin)),
+								"-d", flatten(each(fileDirs)),
+								"-if", flatten(inFiles(stagein)),
+								"-of", flatten(outFiles(stageout)),
+								"-wt", WRAPPERLOG_ALWAYS_TRANSFER,
+								"-sk", SITEDIR_KEEP,
+								"-cdmfile", cdm:file(),
+								"-status", statusMode,
+								"-a", if (arguments != null, each(arguments))
+							)
+							directory=wfdir
+							redirect=false
+							host=rhost
+							TCProfile(rhost, if(attributes != null, attributes = attributes), tr=tr)
+							replicationGroup=replicationGroup
+							replicationChannel=replicationChannel
+							jobid=jobid
+						)
+					}
+					
+					setProgress(progress, "Checking status")
+					if (statusMode == "files") { // status markers are inspected only in "files" status mode
+						checkJobStatus(jobdir, jobid, tr)
+					}
+
+					if (wrapperMode == "files") {
+						file:remove(wrapfile)
+					}
+
+					log(LOG:DEBUG, "STAGING_OUT jobid={jobid}")
+
+
+					/* need to stage the files to upper scratch area in case they are not transfered to another site
+					   before all the files get cleaned out */
+
+
+					setProgress(progress, "Stage out")
+					doRestartlog(restartout)
+					
+					log(LOG:DEBUG, "JOB_END jobid={jobid}")
+				}
+				else catch(prev) {
+					if (matches(prev, "^Abort$")) { // cancellation: log and rethrow unchanged
+						log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
+						throw(prev)
+					}
+					else {
+						setProgress(progress, "Failed but can retry")
+						exception := try(exception(checkErrorFile(jobdir, jobid)), prev) // presumably falls back to prev when no error marker can be read
+					
+						log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
+
+						if (matches(exception,".*executable bit.*")) { // rethrown as-is, without the host/directory/output decoration below
+							throw(exception)
+						}
+						
+						outs := readStandardFiles(jobdir, stdout, stderr)
+						
+						throw(
+							exception(
+								concat(
+									"Exception in {tr}:",
+									if (arguments != null, "\n    Arguments: {arguments}")
+									"\n    Host: {rhost}",
+									"\n    Directory: {jobdir}", // tmpdir is never defined in this function; jobdir is the job directory here
+									"{outs}",
+								)
+								exception
+							)
+						)
+					}
+				}
+			}
+		}
+	)
+}
+
+// Local variables:
+// mode: scheme
+// tab-width: 4
+// indent-tabs-mode: t
+// End:

Modified: branches/faster/libexec/swift-int.k
===================================================================
--- branches/faster/libexec/swift-int.k	2013-01-30 05:28:14 UTC (rev 6186)
+++ branches/faster/libexec/swift-int.k	2013-01-30 05:28:47 UTC (rev 6187)
@@ -455,7 +455,7 @@
 						stageWrapperParams(jobid, jobdir, wrapfile, wfdir, rhost)
 					}
 
-					log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", maybe(" arguments=", arguments), " tmpdir={tmpdir} host={rhost}")
+					log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", if (arguments != null, (" arguments=", arguments)), " tmpdir={tmpdir} host={rhost}")
 
 					setProgress(progress, "Submitting")
 
@@ -501,7 +501,7 @@
 							directory = wfdir
 							redirect = false
 							host = rhost
-							TCProfile(rhost, if (attributes != null, attributes = attributes), tr = tr)
+							TCProfile(rhost, if(attributes != null, attributes = attributes), tr = tr)
 							replicationGroup = replicationGroup
 							replicationChannel = replicationChannel
 							jobid = jobid
@@ -569,7 +569,7 @@
 							exception(
 								concat(
 									"Exception in {tr}:",
-									maybe("\n    Arguments: ", arguments),
+									if (arguments != null, "\n    Arguments: {arguments}")
 									"\n    Host: {rhost}",
 									"\n    Directory: {tmpdir}",
 									"{outs}",
@@ -582,24 +582,6 @@
 			}
 		}
 	)
-
-	export(generateProvenanceGraph,
-		function(gdata) {
-			pgraph := configProperty("pgraph")
-			gname := if(pgraph == "true", "{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}.dot", pgraph)
-			file:write(gname) {
-				"digraph SwiftProvenance \{\n",
-				"	graph [", configProperty("pgraph.graph.options"), "];\n",
-				"	node [", configProperty("pgraph.node.options"), "];\n",
-	
-				for(i, gdata) {
-					"	", i, "\n"
-				}
-				"}\n"
-			}
-			log(LOG:INFO, "Provenance graph saved in ", gname)
-		}
-	)
 }
 
 // Local variables:

Modified: branches/faster/libexec/swift.k
===================================================================
--- branches/faster/libexec/swift.k	2013-01-30 05:28:14 UTC (rev 6186)
+++ branches/faster/libexec/swift.k	2013-01-30 05:28:47 UTC (rev 6187)
@@ -12,8 +12,8 @@
 
 namespace(swift) {
 
-	pstaging := configProperty("use.provider.staging")
-	wstaging := configProperty("use.wrapper.staging")
+	pstaging := (configProperty("use.provider.staging") == "true")
+	wstaging := (configProperty("use.wrapper.staging") == "true")
 	
 	import(java)
 
@@ -132,6 +132,53 @@
 		}
 	)
 	
+	graphStuff := function(tr, stagein, stageout, err, args = null) { // sends node and edge statements for one app invocation to the graph channel; args is unused in the body
+		if (configProperty("pgraph") != "false") { // nothing is emitted when pgraph is "false"
+			errprops := if(err, ",color=lightsalmon", ",color=lightsteelblue1") // node colour depends on err
+			tp := currentThread() // the current thread value doubles as the node id for this invocation
+			to (graph) {
+				concat(str:quote(tp), " [label=", str:quote(tr), "{errprops}]")
+			}
+			for (si, stagein) { // one file node plus an edge file -> invocation per stage-in
+				si := basename(si)
+				to(graph) {
+					concat(str:quote(si), " [shape=parallelogram]")
+					concat(str:quote(si), " -> ", str:quote(tp))
+				}
+			}
+			for (pv, stageout) { // one labelled file node plus an edge invocation -> file per stage-out
+				(path, var) := each(pv)
+				file := fileName(getField(var, path=path))
+				file := basename(file)
+				label := niceName(var, path = path)
+				to(graph) {
+					concat(str:quote(file), " [shape=parallelogram,label=", str:quote(label), "]")
+					concat(str:quote(tp), " -> ", str:quote(file))
+				}
+			}
+		}
+	}
+	
+	generateProvenanceGraph := function(gdata) { // writes the entries of gdata into a "digraph SwiftProvenance" file and logs its name
+		pgraph := configProperty("pgraph")
+		gname := if(pgraph == "true", "{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}.dot", pgraph) // "true" selects the run-scoped default name; any other value is used as the file name
+		file:write(gname) {
+			"digraph SwiftProvenance \{\n",
+			"	graph [", configProperty("pgraph.graph.options"), "];\n",
+			"	node [", configProperty("pgraph.node.options"), "];\n",
+
+			for(i, gdata) { // one tab-indented line per collected entry
+				"	", i, "\n"
+			}
+			"}\n"
+		}
+		log(LOG:INFO, "Provenance graph saved in ", gname)
+	}
+	
+	impl := if (pstaging, "swift-int-staging.k", if (wstaging, "swift-int-wrapper-staging.k", "swift-int.k")) // provider staging takes precedence over wrapper staging; swift-int.k is the default
+	
+	import(file = impl) // import whichever implementation file was selected above
+	
 	export(execute,
 		function(
 			tr, arguments = null, 

Deleted: branches/faster/libexec/vdl-int-staging.k
===================================================================
--- branches/faster/libexec/vdl-int-staging.k	2013-01-30 05:28:14 UTC (rev 6186)
+++ branches/faster/libexec/vdl-int-staging.k	2013-01-30 05:28:47 UTC (rev 6187)
@@ -1,212 +0,0 @@
-import("sys.k")
-import("task.k")
-import("vdl-lib.xml")
-/*
- * Things that are not exposed to the translated file
- */
-
-global(LOG:DEBUG, "debug")
-global(LOG:INFO, "info")
-global(LOG:WARN, "warn")
-global(LOG:ERROR, "error")
-global(LOG:FATAL, "fatal")
-
-global(WRAPPER_TRANSFER_MODE,
-	if(vdl:configProperty("wrapperlog.always.transfer") == "true", 
-		STAGING_MODE:IF_PRESENT, STAGING_MODE:ON_ERROR + STAGING_MODE:IF_PRESENT))
-		
-pinOption := configProperty("provider.staging.pin.swiftfiles")
-global(PIN, if(pinOption == "true", "pinned:", ""))
-global(PROVENANCE_GRAPH_ENABLED, vdl:configProperty("pgraph") != "false") 
-
-namespace("vdl") {
-	export(
-		function(initDDir) {
-			ddir := "{VDL:SCRIPTNAME}-{VDL:RUNID}.d"
-			once(ddir) {
-				if(sys:not(file:exists(ddir))) {
-					task:dir:make(ddir)
-				}
-			}
-			ddir
-		}
-
-		function(inFiles, stageins) {
-			pathnames(stageins)
-		}
-
-		function(graphStuff, tr, stagein, stageout, err, optional(args)) {
-			if(PROVENANCE_GRAPH_ENABLED) {
-				errprops := if(err ",color=lightsalmon" ",color=lightsteelblue1")
-				tp := vdl:threadPrefix()
-				to(graph) {
-					concat(str:quote(tp), " [label=", str:quote(tr), "{errprops}]")
-				}
-				for(si, stagein) {
-					si := basename(si)
-					to(graph) {
-						concat(str:quote(si), " [shape=parallelogram]")
-						concat(str:quote(si), " -> ", str:quote(tp))
-					}
-				}
-				for(pv, stageout) {
-					(path, var) := each(pv)
-					file := vdl:fileName(vdl:getfield(var, path=path))
-					file := basename(file)
-					label := vdl:niceName(var, path = path)
-					to(graph) {
-						concat(str:quote(file), " [shape=parallelogram,label=",
-							str:quote(label), "]")
-						concat(str:quote(tp), " -> ", str:quote(file))
-					}
-				}
-			}
-		}
-
-		function(fileSizes, files) {
-			math:sum(
-				for(f, files, file:size(file))
-			)
-		}
-
-		function(cleanups, cleanup) {
-			log(LOG:INFO, "START cleanups={cleanup}")
-		}
-		
-		function(readErrorFile, dir, jobid) {
-			str:strip(file:read("{dir}/{jobid}.error"))
-			file:remove("{dir}/{jobid}.error")
-		}
-		
-		function(execute2, tr, optional(arguments, stdin, stdout, stderr), stagein, stageout,  restartout,
-			replicationGroup, replicationChannel) {
-			
-			stagein := list(unique(each(stagein)))
-			stageout := list(unique(each(stageout)))
-
-			allocateHost(rhost, constraints=vdl:jobConstraints(tr, stagein=stagein)) {
-
-				ddir := initDDir()
-
-				uid := uid()
-				jobdir := substring(uid, from=0, to=1)
-				jobid := "{tr}-{uid}"
-
-				log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread=", currentThread(), " host={rhost} replicationGroup={replicationGroup}")
-
-				wrapper := "_swiftwrap.staging"
-				wrapfile := "{ddir}/param-{jobid}"
-
-				stdout := try(getFieldValue(stdout), "stdout.txt")
-				stderr := try(getFieldValue(stderr), "stderr.txt")
-
-				wfdir := "{VDL:SCRIPTNAME}-{VDL:RUNID}"
-				tmpdir := dircat("{wfdir}/jobs/{jobdir}", jobid)
-				cdmfile := cdm:file()
-
-				try {
-					log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", maybe(" arguments=", arguments), " tmpdir={tmpdir} host={rhost}")
-
-					vdl:setprogress("Submitting")
-
-					vdl:execute(
-						vdl:siteprofile(rhost, "swift:wrapperInterpreter"),
-						list(
-							vdl:siteprofile(rhost, "swift:wrapperInterpreterOptions"),
-							wrapper,
-							"-e", vdl:executable(tr, rhost),
-							"-out", stdout,
-							"-err", stderr,
-							"-i", maybe(getFieldValue(stdin)),
-							"-d", flatten(unique(outFileDirs(stageout))),
-							"-if", flatten(infiles(stagein)),
-							"-of", flatten(outfiles(stageout)),
-							"-k",
-							"-cdmfile", cdmfile,
-							"-status", "provider"
-							"-a", maybe(each(arguments))
-						)
-						directory = tmpdir
-						redirect = false
-						host = rhost
-
-						vdl:tcprofile(rhost, tr = tr) //this gets various app params from the tc, such as environment, walltime, etc
-						replicationGroup = replicationGroup
-						replicationChannel = replicationChannel
-						jobid = jobid
-					
-						stagingMethod := vdl:siteProfile(rhost, "swift:stagingMethod", default="proxy")
-
-						stageIn("{PIN}{stagingMethod}://localhost/{swift.home}/libexec/{wrapper}", wrapper)
-
-						if (cdmfile != "") {
-							d := vdl:dirname(cdmfile)
-							file := basename(cdmfile)
-							dir := if (d == "", "./", str:concat(d,"/"))
-							loc := "{PIN}{stagingMethod}://localhost/"
-							stageIn("{loc}{dir}{file}", cdmfile)
-							stageIn("{loc}{swift.home}/libexec/cdm.pl", "cdm.pl")
-							stageIn("{loc}{swift.home}/libexec/cdm_lib.sh", "cdm_lib.sh")
-						}
-
-						appStageins(jobid, stagein, ".", stagingMethod)
-
-						stageOut("wrapper.log", "{stagingMethod}://localhost/{ddir}/{jobid}.info", 
-							mode = WRAPPER_TRANSFER_MODE)
-						//stageOut("{stdout}", "{stagingMethod}://localhost/{ddir}/{stdout}")
-						//stageOut("{stderr}", "{stagingMethod}://localhost/{ddir}/{stderr}")
-						stageOut("wrapper.error", "{stagingMethod}://localhost/{ddir}/{jobid}.error", 
-							mode = STAGING_MODE:IF_PRESENT)
-						appStageouts(jobid, stageout, ".", stagingMethod)
-
-						task:cleanUp(".") //the whole job directory
-					) // execute
-					doRestartlog(restartout)
-					log(LOG:DEBUG, "JOB_END jobid={jobid}")
-				}
-				else catch(exception, "^Abort$") {
-					log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
-					throw(exception)
-				}
-				else catch(prev, "^(?!Abort$).*") {
-					vdl:setprogress("Failed but can retry")
-					exception := try(exception(readErrorFile(ddir, jobid)), prev)
-					log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
-					
-					throw(
-						exception(
-							concat(
-								"Exception in {tr}:", nl(),
-								maybe("    Arguments: {arguments}", nl()),
-								"    Host: {rhost}", nl(),
-								"    Directory: {tmpdir}", nl()
-							)
-							exception
-						)
-					)
-				}
-			}
-		}
-
-		function(generateProvenanceGraph, gdata) {
-			pgraph := vdl:configProperty("pgraph")
-			gname := if(pgraph == "true" "{VDL:SCRIPTNAME}-{VDL:RUNID}.dot" pgraph)
-			file:write(gname) {
-				"digraph SwiftProvenance {{", nl()
-				"	graph [", vdl:configProperty("pgraph.graph.options"), "];", nl()
-				"	node [", vdl:configProperty("pgraph.node.options"), "];", nl()
-
-				for(i, gdata) {
-					"	", i, nl()
-				}
-				"}", nl()
-			}
-			log(LOG:INFO, "Provenance graph saved in ", gname)
-		}
-	)
-}
-
-// Local variables:
-// tab-width: 4
-// indent-tabs-mode: 1
-// End:

Deleted: branches/faster/libexec/vdl-int-wrapper-staging.k
===================================================================
--- branches/faster/libexec/vdl-int-wrapper-staging.k	2013-01-30 05:28:14 UTC (rev 6186)
+++ branches/faster/libexec/vdl-int-wrapper-staging.k	2013-01-30 05:28:47 UTC (rev 6187)
@@ -1,384 +0,0 @@
-import("sys.k")
-import("task.k")
-import("vdl-lib.xml")
-/*
- * Things that are not exposed to the translated file
- */
-
-// Log level names used as the first argument of log() calls.
-// Only LOG:DEBUG and LOG:INFO are referenced in this file.
-global(LOG:DEBUG, "debug")
-global(LOG:INFO, "info")
-global(LOG:WARN, "warn")
-global(LOG:ERROR, "error")
-global(LOG:FATAL, "fatal")
-
-// Value produced by the getURLPrefix element (implemented by the Java class
-// named below); execute2 passes it to the wrapper script as "-urlprefix".
-global(URL_PREFIX, 
-	elementDef(getURLPrefix, className="org.griphyn.vdl.karajan.lib.GetURLPrefix")
-	getURLPrefix() 
-)
-
-// Configuration values that execute2 forwards to the wrapper as "-wt" and "-sk".
-global(WRAPPERLOG_ALWAYS_TRANSFER, vdl:configProperty("wrapperlog.always.transfer"))
-global(SITEDIR_KEEP, vdl:configProperty("sitedir.keep"))
-
-
-namespace("vdl"
-	export(
-		/*
-		 * Decides whether a job succeeded by looking at its status files.
-		 *
-		 * Tries to remove "{jobdir}/_swift.success". If that works the job is
-		 * logged as successful. If it fails, the alternative branch throws
-		 * the error text returned by checkErrorFile (which itself throws when
-		 * no error file exists either).
-		 *
-		 * jobdir - directory holding the job's status files
-		 * jobid  - job identifier, used in log messages
-		 * tr     - not used by this element
-		 */
-		element(checkJobStatus, [jobdir, jobid, tr]
-			log(LOG:DEBUG, "START jobid={jobid}")
-			try(
-				sequential(
-					/*
-					 * This is a bit of optimization, but I'm not completely
-					 * sure of its correctness. The goal is to both detect
-					 * the presence of the success file and remove it, all
-					 * in one operation. It relies on file:remove() throwing
-					 * an exception if the file is not there.
-					 */
-					file:remove("{jobdir}/_swift.success")
-					log(LOG:INFO, "SUCCESS jobid={jobid} - Success file found")
-				)
-				// fallback when the success file could not be removed
-				throw(checkErrorFile(jobdir, jobid))
-			)
-		)
-		
-		/*
-		 * Returns the contents of a job's error file and deletes the file.
-		 *
-		 * If "{jobdir}/_swift.error" exists, its text is read, stripped with
-		 * str:strip, the file is removed, and the text is returned.
-		 * If it does not exist, "No status file was found" is thrown.
-		 *
-		 * jobdir - directory holding the job's status files
-		 * jobid  - job identifier, used in log messages
-		 */
-		element(checkErrorFile, [jobdir, jobid]
-			if (
-				file:exists("{jobdir}/_swift.error") then(
-					log(LOG:INFO, "FAILURE jobid={jobid} - Failure file found")
-					error := str:strip(file:read("{jobdir}/_swift.error"))
-					file:remove("{jobdir}/_swift.error")
-					// the stripped file content is the element's result
-					error
-				)
-				else (
-					log(LOG:INFO, "NO_STATUS_FILE jobid={jobid} - Both status files are missing")
-					throw("No status file was found")
-				)
-			)
-		)
-
-		/*
-		 * Prepares the per-run directory on a remote host and returns its name.
-		 *
-		 * The work is wrapped in once() keyed on (rhost, "shared").
-		 * NOTE(review): presumably once() runs the body a single time per key
-		 * and reuses the result afterwards - confirm.
-		 *
-		 * The body creates "{VDL:SCRIPTNAME}-{VDL:RUNID}" on rhost, copies
-		 * the "_swiftwrap.wrapperstaging" script from "{swift.home}/libexec/"
-		 * into it, and registers (wfdir, rhost) on the "cleanup" channel.
-		 *
-		 * rhost - host on which the directory is created
-		 */
-		element(initSharedDir, [rhost]
-			once(list(rhost, "shared")
-				vdl:setprogress("Initializing site shared directory")
-
-				log(LOG:INFO, "START host={rhost} - Initializing shared directory")
-
-				wfdir := "{VDL:SCRIPTNAME}-{VDL:RUNID}"
-				dir:make(wfdir, host=rhost)
-				transfer(srcdir="{swift.home}/libexec/", srcfile="_swiftwrap.wrapperstaging", destdir=wfdir, desthost=rhost)
-
-				// directory name is the value handed back to the caller
-				wfdir
-				//we send the cleanup data to vdl:main()
-				to(cleanup, list(wfdir, rhost))
-				log(LOG:INFO, "END host={rhost} - Done initializing shared directory")
-			)
-		)
-
-		/*
-		 * Ensures the "{VDL:SCRIPTNAME}-{VDL:RUNID}.d" directory exists and
-		 * returns its name. The existence check and creation are wrapped in
-		 * once() keyed on the directory name.
-		 */
-		element(initDDir, []
-			ddir := "{VDL:SCRIPTNAME}-{VDL:RUNID}.d"
-			once(ddir
-				if(sys:not(file:exists(ddir))
-					task:dir:make(ddir)
-				)
-			)
-			// returned on every call, whether or not the directory was just created
-			ddir
-		)
-
-		/*
-		 * Returns the stage-in list unchanged. execute2 uses the result as
-		 * the value of the wrapper's "-if" option.
-		 */
-		element(inFiles, [stageins]
-			stageins
-		)
-
-		/*
-		 * Returns a list of the distinct values produced by
-		 * inFileDirs(stageins) and outFileDirs(stageouts). Both helpers are
-		 * defined outside this file. execute2 uses the result as the value
-		 * of the wrapper's "-d" option.
-		 */
-		element(fileDirs, [stageins, stageouts]
-			list(
-				unique(
-					inFileDirs(stageins)
-					outFileDirs(stageouts)
-				)
-			)
-		)
-
-		/*
-		 * Removes a run directory on one host, unless the "sitedir.keep"
-		 * configuration property is set to something other than "false".
-		 *
-		 * The removal runs the site's "swift:cleanupCommand" profile entry on
-		 * the host, with "swift:cleanupCommandOptions" and dir as arguments.
-		 *
-		 * dir  - directory to remove
-		 * host - host on which the directory lives
-		 */
-		element(cleanup, [dir, host]
-			log(LOG:INFO, "START dir={dir} host={host}")
-			// the property is read here directly, not through the SITEDIR_KEEP global
-			if(vdl:configProperty("sitedir.keep") == "false"
-				task:execute(
-					vdl:siteprofile(host, "swift:cleanupCommand"),
-					arguments=list(
-						vdl:siteprofile(host, "swift:cleanupCommandOptions"),
-						dir
-					)
-					host=host, batch=true, tcprofile(host))
-			)
-			log(LOG:INFO, "END dir={dir} host={host}")
-		)
-
-		/*
-		 * Runs vdl:cleanup for every (dir, host) pair in the given list,
-		 * iterating with parallelFor.
-		 *
-		 * Any exception from a single cleanup is caught (pattern ".*"),
-		 * logged at DEBUG level, and reported on the "warnings" channel as
-		 * "Cleanup on {host} failed".
-		 *
-		 * cleanup - list of (dir, host) pairs
-		 */
-		element(cleanups, [cleanup]
-			log(LOG:INFO, "START cleanups={cleanup}")
-			parallelFor(i, cleanup
-				[dir, host] := each(i)
-				try(
-					vdl:cleanup(dir, host)
-					catch(".*",
-						log(LOG:DEBUG, "EXCEPTION - Exception caught while cleaning up", exception)
-						to(warnings, exception("Cleanup on {host} failed", exception))
-					)
-				)
-			)
-			log(LOG:INFO, "END cleanups={cleanup}")
-		)
-
-		/*
-		 * Makes sure the destination directory for the wrapper parameter file
-		 * exists on the target host. The dir:make call is wrapped in
-		 * cacheOn() keyed on (destdir, host).
-		 *
-		 * jobid    - job identifier, used in log messages
-		 * wrapfile - location of the parameter file; its provider is used
-		 *            for the dir:make call
-		 * dir      - directory to create on the host
-		 * host     - host on which the directory is created
-		 *
-		 * NOTE(review): srchost, srcdir and filename are computed but never
-		 * used, and no file is transferred here even though the log message
-		 * says "staging in wrapper params".
-		 */
-		element(stageWrapperParams, [jobid, wrapfile, dir, host]
-			log(LOG:INFO, "START jobid={jobid} - staging in wrapper params")
-			provider := provider(wrapfile)
-			srchost := hostname(wrapfile)
-			srcdir := vdl:dirname(wrapfile)
-			destdir := dir
-			filename := basename(wrapfile)
-
-			cacheOn(list(destdir, host)
-				dir:make(destdir, host=host, provider=provider)
-			)
-
-			log(LOG:INFO, "END jobid={jobid}")
-		)
-
-		/*
-		 * Emits provenance graph entries for one application invocation on
-		 * the "graph" channel. Does nothing when the "pgraph" configuration
-		 * property is "false".
-		 *
-		 * Emitted entries:
-		 *   - a node for the invocation, keyed by vdl:threadPrefix() and
-		 *     labelled with tr, coloured according to err
-		 *   - for each stage-in: a parallelogram node named after the file's
-		 *     basename, plus an edge file -> invocation
-		 *   - for each stage-out (path, var) pair: a parallelogram node named
-		 *     after the file's basename and labelled with vdl:niceName, plus
-		 *     an edge invocation -> file
-		 *
-		 * tr       - label for the invocation node
-		 * stagein  - input files
-		 * stageout - (path, var) pairs describing outputs
-		 * err      - selects the error colour when true
-		 * args     - optional; not used by this element
-		 */
-		element(graphStuff, [tr, stagein, stageout, err, optional(args)]
-			if(
-				vdl:configProperty("pgraph") != "false" then(
-					errprops := if(err ",color=lightsalmon" ",color=lightsteelblue1")
-					tp := vdl:threadPrefix()
-					to(graph,
-						concat(str:quote(tp), " [label=", str:quote(tr), "{errprops}]")
-					)
-					for(si, stagein
-						si := basename(si)
-						to(graph
-							concat(str:quote(si), " [shape=parallelogram]")
-							concat(str:quote(si), " -> ", str:quote(tp))
-						)
-					)
-					for(pv, stageout
-						[path, var] := each(pv)
-						file := vdl:fileName(vdl:getfield(var, path=path))
-						file := basename(file)
-						label := vdl:niceName(var, path = path)
-						to(graph
-							concat(str:quote(file), " [shape=parallelogram,label=",
-								str:quote(label), "]")
-							concat(str:quote(tp), " -> ", str:quote(file))
-						)
-					)
-				)
-			)
-		)
-		
-		/*
-		 * Builds one string with the contents of a job's stderr and stdout
-		 * files, for inclusion in an error message.
-		 *
-		 * For each of the two files (stderr first, then stdout) the result
-		 * contains a newline, the label "_swift.stderr: " or
-		 * "_swift.stdout: ", and then the content of "{jobdir}/{file}".
-		 * If the read fails, a newline is used in place of the content.
-		 *
-		 * jobdir - directory holding the files
-		 * stdout - stdout file name, relative to jobdir
-		 * stderr - stderr file name, relative to jobdir
-		 */
-		element(readStandardFiles, [jobdir, stdout, stderr]
-			concat(
-				for(f, list(list("_swift.stderr", stderr), list("_swift.stdout", stdout))
-					[name, file] := each(f)
-					destfile := "{jobdir}/{file}"
-					nl()
-					"{name}: "
-					// second try() argument is the fallback for an unreadable file
-					try(
-						file:read(destfile)
-						nl()
-					)
-				)
-			)
-		)
-		
-
-		/*
-		 * Runs one application invocation on an allocated host through the
-		 * "_swiftwrap.wrapperstaging" wrapper script.
-		 *
-		 * Steps:
-		 *   1. De-duplicate stagein/stageout and allocate a host.
-		 *   2. Initialise the local ".d" directory and the remote run
-		 *      directory (initDDir / initSharedDir).
-		 *   3. Build the job id "{tr}-{uid}" and its job directory.
-		 *   4. Hand the job parameters to the wrapper. Which way is chosen by
-		 *      the per-host "wrapper.parameter.mode" property:
-		 *        "files" - parameters are written to "{jobdir}/_paramfile",
-		 *                  one "-option value" per line, and the wrapper gets
-		 *                  "-p <file>"
-		 *        "args"  - parameters are passed on the command line
-		 *   5. If the per-host "status.mode" property is "files", check the
-		 *      status files with checkJobStatus.
-		 *   6. Remove the parameter file (files mode) and write the restart log.
-		 *
-		 * Errors:
-		 *   - an exception matching "^Abort$" is logged and rethrown as is
-		 *   - any other exception is replaced by the text of the job's error
-		 *     file when checkErrorFile can supply it, and rethrown wrapped in
-		 *     a message naming the transformation, arguments, host, directory
-		 *     and the contents of the job's stdout/stderr files
-		 *
-		 * tr                 - transformation (application) name
-		 * arguments, stdin, stdout, stderr, attributes - optional
-		 * stagein, stageout  - files to stage in / out
-		 * restartout         - passed to doRestartlog after a successful run
-		 * replicationGroup, replicationChannel - forwarded to vdl:execute
-		 */
-		element(execute2, [tr, optional(arguments, stdin, stdout, stderr, attributes), stagein, stageout,  restartout,
-			replicationGroup, replicationChannel]
-			stagein := list(unique(each(stagein)))
-			stageout := list(unique(each(stageout)))
-
-			allocateHost(rhost, constraints=vdl:jobConstraints(tr, stagein=stagein)
-
-				ddir := initDDir()
-				wfdir := try(
-					initSharedDir(rhost)
-					throw(exception("Could not initialize shared directory on {rhost}", exception))
-				)
-
-				uid := uid()
-				jobid := "{tr}-{uid}"
-				
-				// jobs are spread over subdirectories named after the first character of the uid
-				jobdir := concat(ddir, "/jobs/", substring(uid, from=0, to=1), "/{jobid}/")
-
-				log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread={#thread} host={rhost} replicationGroup={replicationGroup}")
-
-				statusMode := configProperty("status.mode",host=rhost)
-				wrapperMode := configProperty("wrapper.parameter.mode",host=rhost)
-
-				wrapfile := "{jobdir}/_paramfile"
-
-				// fall back to the default file names when stdout/stderr are not given
-				stdout := try(getFieldValue(stdout), "_swift.stdout")
-				stderr := try(getFieldValue(stderr), "_swift.stderr")
-				fileDirs := fileDirs(stagein, stageout)
-				os := vdl:siteprofile(rhost, "SYSINFO:OS")
-				
-				scratch := vdl:siteprofile(rhost, "scratch")
-
-				if(wrapperMode == "files"
-					sequential(
-						// Every option is written as "-name value" on its own line.
-						// "-wt" and "-sk" previously lacked the separating space and
-						// the newline, which glued them and "-cdmfile" onto one line.
-						sys:file:write(wrapfile,
-							"-e ",vdl:executable(tr, rhost), nl(),
-							"-out ", stdout, nl(),
-							"-err ", stderr, nl(),
-							"-i ", maybe(getFieldValue(stdin)), nl(),
-							"-d ", flatten(each(fileDirs)), nl(),
-							"-if ", flatten(inFiles(stagein)), nl(),
-							"-of ", flatten(outFiles(stageout)), nl(),
-							"-wt ", WRAPPERLOG_ALWAYS_TRANSFER, nl(),
-							"-sk ", SITEDIR_KEEP, nl(),
-							"-cdmfile ", cdm:file(), nl(),
-							"-status ", statusMode, nl(),
-							for(a, arguments, "-a ", a, nl())
-						)
-					)
-				)
-
-				vdl:setprogress("Stage in")
-
-				try(
-					sequential(
-
-						if(wrapperMode == "files"
-							stageWrapperParams(jobid, wrapfile, wfdir, rhost)
-						)
-
-						log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", maybe(" arguments=", arguments), " host={rhost}")
-
-						vdl:setprogress("Submitting")
-
-						// files mode: the wrapper reads its parameters from the file given with -p
-						if(wrapperMode == "files"
-							vdl:execute(
-								vdl:siteprofile(rhost, "swift:wrapperInterpreter"),
-								list(
-									vdl:siteprofile(rhost, "swift:wrapperInterpreterOptions"),
-									"_swiftwrap.wrapperstaging",
-									jobid,
-									"-urlprefix", URL_PREFIX,
-									"-jobdir", jobdir,
-									"-scratch", scratch, 
-									"-p", wrapfile
-								)
-								directory=wfdir
-								redirect=false
-								host=rhost
-								vdl:tcprofile(rhost, maybe(attributes=attributes), tr=tr) //this gets various app params from the tc, such as environment, walltime, etc
-								replicationGroup=replicationGroup
-								replicationChannel=replicationChannel
-								jobid=jobid
-							)
-						)
-						// args mode: the same parameters are passed on the command line
-						if(wrapperMode == "args"
-							vdl:execute(
-								vdl:siteprofile(rhost, "swift:wrapperInterpreter"),
-								list(
-									vdl:siteprofile(rhost, "swift:wrapperInterpreterOptions"),
-									"_swiftwrap.wrapperstaging",
-									jobid,
-									"-urlprefix", URL_PREFIX,
-									"-jobdir", jobdir,
-									"-scratch", scratch,
-									"-e", vdl:executable(tr, rhost),
-									"-out", stdout,
-									"-err", stderr,
-									"-i", maybe(getFieldValue(stdin)),
-									"-d", flatten(each(fileDirs)),
-									"-if", flatten(inFiles(stagein)),
-									"-of", flatten(outFiles(stageout)),
-									"-wt", WRAPPERLOG_ALWAYS_TRANSFER,
-									"-sk", SITEDIR_KEEP,
-									"-cdmfile", cdm:file(),
-									"-status", statusMode,
-									"-a", maybe(each(arguments))
-								)
-								directory=wfdir
-								redirect=false
-								host=rhost
-								vdl:tcprofile(rhost, maybe(attributes=attributes), tr=tr)
-								replicationGroup=replicationGroup
-								replicationChannel=replicationChannel
-								jobid=jobid
-							)
-						)
-
-						vdl:setprogress("Checking status")
-						if(statusMode == "files"
-							checkJobStatus(jobdir, jobid, tr)
-						)
-
-						if(wrapperMode == "files"
-							file:remove(wrapfile)
-						)
-
-						log(LOG:DEBUG, "STAGING_OUT jobid={jobid}")
-
-
-						/* need to stage the files to upper scratch area in case they are not transferred to another site
-						   before all the files get cleaned out */
-
-
-						vdl:setprogress("Stage out")
-						doRestartlog(restartout)
-						
-						log(LOG:DEBUG, "JOB_END jobid={jobid}")
-					)
-					// cancellation: log and propagate unchanged
-					catch("^Abort$"
-						log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
-						throw(exception)
-					)
-					// everything else is treated as an application failure
-					catch("^(?!Abort$).*"
-						vdl:setprogress("Failed but can retry")
-						prev := exception
-						// prefer the text of the job's error file; keep the original
-						// exception when checkErrorFile cannot supply one
-						exception := try(exception(checkErrorFile(jobdir, jobid)), prev)
-						
-						log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
-
-						// NOTE(review): presumably generateError escalates this to a
-						// non-retryable error - confirm; it is defined outside this file
-						if(matches(exception,".*executable bit.*")
-							generateError(exception)
-						)
-						
-						outs := readStandardFiles(jobdir, stdout, stderr)
-
-						throw(
-							exception(
-								concat(
-									"Exception in {tr}:", nl(),
-									maybe("Arguments: ", arguments, nl()),
-									"Host: {rhost}", nl(),
-									// no nl() here: outs itself starts with a newline
-									"Directory: {scratch}/{jobid}",
-									"{outs}", nl(),
-									"----", nl()
-								)
-								exception
-							)
-						)
-					)
-				)
-			)
-		)
-
-		/*
-		 * Writes the collected provenance graph lines (gdata) to a Graphviz
-		 * "digraph" file and logs where it was saved.
-		 *
-		 * The output file name is "{VDL:SCRIPTNAME}-{VDL:RUNID}.dot" when the
-		 * "pgraph" configuration property is exactly "true"; otherwise the
-		 * value of the property itself is used as the file name.
-		 *
-		 * gdata - graph entries, written one per line
-		 */
-		element(generateProvenanceGraph, [gdata]
-			pgraph := vdl:configProperty("pgraph")
-			gname := if(pgraph == "true" "{VDL:SCRIPTNAME}-{VDL:RUNID}.dot" pgraph)
-			file:write(gname
-				// NOTE(review): "{{" is presumably the escape for a literal "{" - confirm
-				"digraph SwiftProvenance {{", nl()
-				"	graph [", vdl:configProperty("pgraph.graph.options"), "];", nl()
-				"	node [", vdl:configProperty("pgraph.node.options"), "];", nl()
-
-				// one tab-indented line per collected graph entry
-				for(i, gdata
-					"	", i, nl()
-				)
-				"}", nl()
-			)
-			log(LOG:INFO, "Provenance graph saved in ", gname)
-		)
-	)
-)
-
-// Local variables:
-// mode: scheme
-// tab-width: 4
-// indent-tabs-mode: t
-// End:




More information about the Swift-commit mailing list