[Swift-commit] r7851 - trunk/libexec

hategan at ci.uchicago.edu hategan at ci.uchicago.edu
Sat May 10 13:05:54 CDT 2014


Author: hategan
Date: 2014-05-10 13:05:51 -0500 (Sat, 10 May 2014)
New Revision: 7851

Modified:
   trunk/libexec/swift-int-staging.k
   trunk/libexec/swift-int-wrapper-staging.k
   trunk/libexec/swift-int.k
   trunk/libexec/swift-lib.k
   trunk/libexec/swift.k
Log:
interned fields; data type reorganization; propagation of lazy failures in internal functions

Modified: trunk/libexec/swift-int-staging.k
===================================================================
--- trunk/libexec/swift-int-staging.k	2014-05-09 14:35:51 UTC (rev 7850)
+++ trunk/libexec/swift-int-staging.k	2014-05-10 18:05:51 UTC (rev 7851)
@@ -20,11 +20,10 @@
 PROVENANCE_GRAPH_ENABLED := (configProperty("pgraph") != "false")
 CLEANUP_ENABLED := (configProperty("sitedir.keep") != "true")
 
-namespace(swift) {
+DEBUG_DIR := "{SWIFT:DEBUG_DIR_PREFIX}{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}.d"
+CDM_FILE := cdm:file()
 
-	initDDir := function() {
-		"{SWIFT:DEBUG_DIR_PREFIX}{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}.d"
-	}
+namespace(swift) {
 		
 	fileSizes := function(files) {
 		math:sum(
@@ -60,140 +59,136 @@
 	}
 	
 	export(execute2,
-		function(progress, tr, stagein, stageout,
+		function(rhost, progress, tr, stagein, stageout,
 			replicationGroup, replicationChannel,
 			arguments = [], stdin = null, stdout = null, stderr = null, attributes = null) {
-	
-			allocateHost(rhost, constraints = jobConstraints(tr, stagein = stagein)) {
 
-				ddir := initDDir()
+			uid := UID()
+			jobdir := substring(uid, 0, to=1)
+			jobid := "{tr}-{uid}"
 
-				uid := UID()
-				jobdir := substring(uid, 0, to=1)
-				jobid := "{tr}-{uid}"
+			log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread=", currentThread(), " host={rhost} replicationGroup={replicationGroup}")
+			
+			wfdir := "{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}"
+			tmpdir := dircat("{wfdir}/jobs/{jobdir}", jobid),				
+			
+			(fileDirs, inFiles, outFiles, outCollect) := getStagingInfo(stagein, stageout)
 
-				log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread=", currentThread(), " host={rhost} replicationGroup={replicationGroup}")
-				
-				wfdir := "{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}"
-				tmpdir := dircat("{wfdir}/jobs/{jobdir}", jobid)
-				cdmfile := cdm:file(),
-				
-				(fileDirs, inFiles, outFiles, outCollect) := getStagingInfo(stagein, stageout)
+			try {
+				log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", if (arguments != null, (" arguments=", arguments)), 
+					" tmpdir={tmpdir} host={rhost}")
 
-				try {
-					log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", if (arguments != null, (" arguments=", arguments)), 
-						" tmpdir={tmpdir} host={rhost}")
+				setProgress(progress, "Submitting")
 
-					setProgress(progress, "Submitting")
+				swift:execute(
+					progress,
+					siteProfile(rhost, "swift:wrapperInterpreter"),
+					list(
+						siteProfile(rhost, "swift:wrapperInterpreterOptions"),
+						"_swiftwrap.staging",
+						"-e", executable(tr, rhost),
+						"-out", if(stdout == null, "stdout.txt", getFieldValue(stdout)),
+						"-err", if(stderr == null, "stderr.txt", getFieldValue(stderr)),
+						"-i", if (stdin != null, getFieldValue(stdin)),
+						"-d", str:join(fileDirs, "|"),
+						"-if", str:join(remoteFileNames(inFiles), "|"),
+						"-of", str:join(remoteFileNames(outFiles), "|"),
+						"-cf", str:join(remoteFileNames(outCollect), "|"),
+						"-cdmfile", CDM_FILE,
+						"-status", "provider"
+						"-a", if (arguments != null, each(arguments))
+					)
+					directory = tmpdir
+					redirect = false
+					host = rhost
 
-					swift:execute(
-						progress,
-						siteProfile(rhost, "swift:wrapperInterpreter"),
-						list(
-							siteProfile(rhost, "swift:wrapperInterpreterOptions"),
-							"_swiftwrap.staging",
-							"-e", executable(tr, rhost),
-							"-out", if(stdout == null, "stdout.txt", getFieldValue(stdout)),
-							"-err", if(stderr == null, "stderr.txt", getFieldValue(stderr)),
-							"-i", if (stdin != null, getFieldValue(stdin)),
-							"-d", str:join(fileDirs, "|"),
-							"-if", str:join(remoteFileNames(inFiles), "|"),
-							"-of", str:join(remoteFileNames(outFiles), "|"),
-							"-cf", str:join(remoteFileNames(outCollect), "|"),
-							"-cdmfile", cdmfile,
-							"-status", "provider"
-							"-a", if (arguments != null, each(arguments))
-						)
-						directory = tmpdir
-						redirect = false
-						host = rhost
+					TCProfile(rhost, if (attributes != null, attributes = attributes), tr = tr)
+					replicationGroup = replicationGroup
+					replicationChannel = replicationChannel
+					jobid = jobid
+				
+					stagingMethod := siteProfile(rhost, "swift:stagingMethod", default="proxy")
 
-						TCProfile(rhost, if (attributes != null, attributes = attributes), tr = tr)
-						replicationGroup = replicationGroup
-						replicationChannel = replicationChannel
-						jobid = jobid
-					
-						stagingMethod := siteProfile(rhost, "swift:stagingMethod", default="proxy")
+					stageIn("{PIN}{stagingMethod}://localhost/{SWIFT:HOME}/libexec/_swiftwrap.staging", "_swiftwrap.staging")
 
-						stageIn("{PIN}{stagingMethod}://localhost/{SWIFT:HOME}/libexec/_swiftwrap.staging", "_swiftwrap.staging")
+					if (CDM_FILE != "") {
+						d := swift:dirname(CDM_FILE)
+						file := basename(CDM_FILE)
+						dir := if (d == "", "./", "{d}/")
+						loc := "{PIN}{stagingMethod}://localhost/"
+						stageIn("{loc}{dir}{file}", CDM_FILE)
+						stageIn("{loc}{SWIFT:HOME}/libexec/cdm.pl", "cdm.pl")
+						stageIn("{loc}{SWIFT:HOME}/libexec/cdm_lib.sh", "cdm_lib.sh")
+					}
 
-						if (cdmfile != "") {
-							d := swift:dirname(cdmfile)
-							file := basename(cdmfile)
-							dir := if (d == "", "./", "{d}/")
-							loc := "{PIN}{stagingMethod}://localhost/"
-							stageIn("{loc}{dir}{file}", cdmfile)
-							stageIn("{loc}{SWIFT:HOME}/libexec/cdm.pl", "cdm.pl")
-							stageIn("{loc}{SWIFT:HOME}/libexec/cdm_lib.sh", "cdm_lib.sh")
-						}
+					appStageins(jobid, inFiles, stagingMethod)
 
-						appStageins(jobid, inFiles, stagingMethod)
-
-						stageOut("wrapper.log", "{stagingMethod}://localhost/{ddir}/{jobid}.info", 
-							mode = WRAPPER_TRANSFER_MODE)
-						
-						if (stdout == null) {
-							// if not stdout requested, only stage on error
-							stageOut("stdout.txt", "{stagingMethod}://localhost/{ddir}/{jobid}.stdout", 
-								mode = STAGING_MODE:ON_ERROR + STAGING_MODE:IF_PRESENT)
-						}
-						else {
-							stageOut("{stdout}", "{stagingMethod}://localhost/{ddir}/{stdout}",
-								mode = STAGING_MODE:IF_PRESENT)
-						}
-						if (stderr == null) {
-							stageOut("stderr.txt", "{stagingMethod}://localhost/{ddir}/{jobid}.stderr",
-								mode = STAGING_MODE:ON_ERROR + STAGING_MODE:IF_PRESENT)
-						}
-						else {
-							stageOut("{stderr}", "{stagingMethod}://localhost/{ddir}/{stderr}",
-								mode = STAGING_MODE:IF_PRESENT)
-						}
-						stageOut("wrapper.error", "{stagingMethod}://localhost/{ddir}/{jobid}.error", 
-							mode = STAGING_MODE:IF_PRESENT)
-						appStageouts(jobid, outFiles, outCollect, stagingMethod)
-						if (!isEmpty(outCollect)) {
-							stageOut("_collect", "{stagingMethod}://localhost/{ddir}/{jobid}.collect", 
-								mode = STAGING_MODE:ALWAYS)
-						}
-						
-						if (CLEANUP_ENABLED) {
-							task:cleanUp(".")
-						}
-					)
+					stageOut("wrapper.log", "{stagingMethod}://localhost/{DEBUG_DIR}/{jobid}.info", 
+						mode = WRAPPER_TRANSFER_MODE)
 					
-					
-					if (!isEmpty(outCollect)) {
-						readCollectList("{ddir}/{jobid}.collect")
+					if (stdout == null) {
+						// if not stdout requested, only stage on error
+						stageOut("stdout.txt", "{stagingMethod}://localhost/{DEBUG_DIR}/{jobid}.stdout", 
+							mode = STAGING_MODE:ON_ERROR + STAGING_MODE:IF_PRESENT)
 					}
 					else {
-						[]
+						stdoutf := getFieldValue(stdout)
+						stageOut(stdoutf, "{stagingMethod}://localhost/{DEBUG_DIR}/{stdoutf}",
+							mode = STAGING_MODE:IF_PRESENT)
 					}
-					
-					log(LOG:DEBUG, "JOB_END jobid={jobid}")
-				}
-				else catch(prev) {
-					if (matches(prev, "^Abort$")) {
-						log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
-						throw(prev)
+					if (stderr == null) {
+						stageOut("stderr.txt", "{stagingMethod}://localhost/{DEBUG_DIR}/{jobid}.stderr",
+							mode = STAGING_MODE:ON_ERROR + STAGING_MODE:IF_PRESENT)
 					}
 					else {
-						setProgress(progress, "Failed but can retry")
-						exception := try(exception(readErrorFiles(ddir, jobid, stdout, stderr), prev), prev)
-						log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
+						stderrf := getFieldValue(stderr)
+						stageOut(stderrf, "{stagingMethod}://localhost/{DEBUG_DIR}/{stderrf}",
+							mode = STAGING_MODE:IF_PRESENT)
+					}
+					stageOut("wrapper.error", "{stagingMethod}://localhost/{DEBUG_DIR}/{jobid}.error", 
+						mode = STAGING_MODE:IF_PRESENT)
+					appStageouts(jobid, outFiles, outCollect, stagingMethod)
+					if (!isEmpty(outCollect)) {
+						stageOut("_collect", "{stagingMethod}://localhost/{DEBUG_DIR}/{jobid}.collect", 
+							mode = STAGING_MODE:ALWAYS)
+					}
 					
-						throw(
-							exception(
-								concat(
-									"Exception in {tr}:",
-									if (arguments != null, "\n    Arguments: {arguments}")
-									"\n    Host: {rhost}",
-									"\n    Directory: {tmpdir}"
-								)
-								exception
+					if (CLEANUP_ENABLED) {
+						task:cleanUp(".")
+					}
+				)
+				
+				
+				if (!isEmpty(outCollect)) {
+					readCollectList("{DEBUG_DIR}/{jobid}.collect")
+				}
+				else {
+					[]
+				}
+				
+				log(LOG:DEBUG, "JOB_END jobid={jobid}")
+			}
+			else catch(prev) {
+				if (matches(prev, "^Abort$")) {
+					log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
+					throw(prev)
+				}
+				else {
+					setProgress(progress, "Failed but can retry")
+					exception := try(exception(readErrorFiles(DEBUG_DIR, jobid, stdout, stderr), prev), prev)
+					log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
+				
+					throw(
+						exception(
+							concat(
+								"Exception in {tr}:",
+								if (arguments != null, "\n    Arguments: {arguments}")
+								"\n    Host: {rhost}",
+								"\n    Directory: {tmpdir}"
 							)
+							exception
 						)
-					}
+					)
 				}
 			}
 		}

Modified: trunk/libexec/swift-int-wrapper-staging.k
===================================================================
--- trunk/libexec/swift-int-wrapper-staging.k	2014-05-09 14:35:51 UTC (rev 7850)
+++ trunk/libexec/swift-int-wrapper-staging.k	2014-05-10 18:05:51 UTC (rev 7851)
@@ -129,178 +129,175 @@
 		
 
 	export(execute2,
-		function(progress, tr, stagein, stageout,
+		function(rhost, progress, tr, stagein, stageout,
 			replicationGroup, replicationChannel,
 			arguments = [], stdin = null, stdout = null, stderr = null, attributes = null) {
 
-			allocateHost(rhost, constraints=jobConstraints(tr, stagein = stagein)) {
+			ddir := initDDir()
+			wfdir := try {
+				initSharedDir(rhost)
+			}
+			else catch(exception) {
+				throw(exception("Could not initialize shared directory on {rhost}", exception))
+			}
 
-				ddir := initDDir()
-				wfdir := try {
-					initSharedDir(rhost)
-				}
-				else catch(exception) {
-					throw(exception("Could not initialize shared directory on {rhost}", exception))
-				}
+			uid := UID()
+			jobid := "{tr}-{uid}"
+			
+			jobdir := concat(ddir, "/jobs/", substring(uid, from=0, to=1), "/{jobid}/")
 
-				uid := UID()
-				jobid := "{tr}-{uid}"
-				
-				jobdir := concat(ddir, "/jobs/", substring(uid, from=0, to=1), "/{jobid}/")
+			log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread={#thread} host={rhost} replicationGroup={replicationGroup}")
 
-				log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread={#thread} host={rhost} replicationGroup={replicationGroup}")
+			statusMode := configProperty("status.mode", host = rhost)
+			wrapperMode := configProperty("wrapper.parameter.mode", host = rhost)
 
-				statusMode := configProperty("status.mode", host = rhost)
-				wrapperMode := configProperty("wrapper.parameter.mode", host = rhost)
+			wrapfile := "{jobdir}/_paramfile"
 
-				wrapfile := "{jobdir}/_paramfile"
+			stdout := if (stdout == null, "stdout.txt", getFieldValue(stdout))
+			stderr := if (stderr == null, "stderr.txt", getFieldValue(stderr)),
+			
+			(localFileDirs, remoteFileDirs, inFiles, outFiles) := getStagingInfo(stagein, stageout)
 
-				stdout := if (stdout == null, "stdout.txt", getFieldValue(stdout))
-				stderr := if (stderr == null, "stderr.txt", getFieldValue(stderr)),
-				
-				(localFileDirs, remoteFileDirs, inFiles, outFiles) := getStagingInfo(stagein, stageout)
+			os := siteProfile(rhost, "SYSINFO:OS")
+			
+			scratch := siteProfile(rhost, "scratch")
 
-				os := siteProfile(rhost, "SYSINFO:OS")
-				
-				scratch := siteProfile(rhost, "scratch")
+			if(wrapperMode == "files") {
+				file:write(wrapfile,
+					"-e ",vdl:executable(tr, rhost),
+					"\n-out ", stdout,
+					"\n-err ", stderr,
+					"\n-i ", if (stdin != null, getFieldValue(stdin)),
+					"\n-d ", str:join(remoteFileDirs, "|"),
+					"\n-if ", str:join(remoteFileNames(inFiles), "|"),
+					"\n-of ", str:join(remoteFileNames(outFiles), "|"),
+					"\n-wt", WRAPPERLOG_ALWAYS_TRANSFER,
+					"\n-sk", SITEDIR_KEEP,
+					"\n-cdmfile ", cdm:file(),
+					"\n-status ", statusMode,
+					for(a, arguments) {
+						"\n-a ", a
+					}
+				)
+			}
+			
 
-				if(wrapperMode == "files") {
-					file:write(wrapfile,
-						"-e ",vdl:executable(tr, rhost),
-						"\n-out ", stdout,
-						"\n-err ", stderr,
-						"\n-i ", if (stdin != null, getFieldValue(stdin)),
-						"\n-d ", str:join(remoteFileDirs, "|"),
-						"\n-if ", str:join(remoteFileNames(inFiles), "|"),
-						"\n-of ", str:join(remoteFileNames(outFiles), "|"),
-						"\n-wt", WRAPPERLOG_ALWAYS_TRANSFER,
-						"\n-sk", SITEDIR_KEEP,
-						"\n-cdmfile ", cdm:file(),
-						"\n-status ", statusMode,
-						for(a, arguments) {
-							"\n-a ", a
-						}
-					)
+			setProgress(progress, "Stage in")
+
+			try {
+				if (wrapperMode == "files") {
+					stageWrapperParams(jobid, wrapfile, wfdir, rhost)
 				}
-				
 
-				setProgress(progress, "Stage in")
+				log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", if (arguments != null, (" arguments=", arguments)), " host={rhost}")
 
-				try {
-					if (wrapperMode == "files") {
-						stageWrapperParams(jobid, wrapfile, wfdir, rhost)
-					}
+				setProgress(progress, "Submitting")
 
-					log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", if (arguments != null, (" arguments=", arguments)), " host={rhost}")
-
-					setProgress(progress, "Submitting")
-
-					if (wrapperMode == "files") {
-						swift:execute(
-							progress, 
-							siteProfile(rhost, "swift:wrapperInterpreter"),
-							list(
-								siteProfile(rhost, "swift:wrapperInterpreterOptions"),
-								"_swiftwrap.wrapperstaging",
-								jobid,
-								"-urlprefix", URL_PREFIX,
-								"-jobdir", jobdir,
-								"-scratch", scratch, 
-								"-p", wrapfile
-							)
-							directory=wfdir
-							redirect=false
-							host=rhost
-							TCProfile(rhost, if (attributes != null, attributes = attributes), tr=tr)
-							replicationGroup=replicationGroup
-							replicationChannel=replicationChannel
-							jobid=jobid
+				if (wrapperMode == "files") {
+					swift:execute(
+						progress, 
+						siteProfile(rhost, "swift:wrapperInterpreter"),
+						list(
+							siteProfile(rhost, "swift:wrapperInterpreterOptions"),
+							"_swiftwrap.wrapperstaging",
+							jobid,
+							"-urlprefix", URL_PREFIX,
+							"-jobdir", jobdir,
+							"-scratch", scratch, 
+							"-p", wrapfile
 						)
-					}
-					if (wrapperMode == "args") {
-						swift:execute(
-							siteProfile(rhost, "swift:wrapperInterpreter"),
-							list(
-								siteProfile(rhost, "swift:wrapperInterpreterOptions"),
-								"_swiftwrap.wrapperstaging",
-								jobid,
-								"-urlprefix", URL_PREFIX,
-								"-jobdir", jobdir,
-								"-scratch", scratch,
-								"-e", vdl:executable(tr, rhost),
-								"-out", stdout,
-								"-err", stderr,
-								"-i", if (stdin != null, getFieldValue(stdin)),
-								"-d", str:join(remoteFileDirs, "|"),
-								"-if", str:join(remoteFileNames(inFiles), "|"),
-								"-of", str:join(remoteFileNames(outFiles), "|"),
-								"-wt", WRAPPERLOG_ALWAYS_TRANSFER,
-								"-sk", SITEDIR_KEEP,
-								"-cdmfile", cdm:file(),
-								"-status", statusMode,
-								"-a", if (arguments != null, each(arguments))
-							)
-							directory=wfdir
-							redirect=false
-							host=rhost
-							TCProfile(rhost, if(attributes != null, attributes = attributes), tr=tr)
-							replicationGroup=replicationGroup
-							replicationChannel=replicationChannel
-							jobid=jobid
+						directory=wfdir
+						redirect=false
+						host=rhost
+						TCProfile(rhost, if (attributes != null, attributes = attributes), tr=tr)
+						replicationGroup=replicationGroup
+						replicationChannel=replicationChannel
+						jobid=jobid
+					)
+				}
+				if (wrapperMode == "args") {
+					swift:execute(
+						siteProfile(rhost, "swift:wrapperInterpreter"),
+						list(
+							siteProfile(rhost, "swift:wrapperInterpreterOptions"),
+							"_swiftwrap.wrapperstaging",
+							jobid,
+							"-urlprefix", URL_PREFIX,
+							"-jobdir", jobdir,
+							"-scratch", scratch,
+							"-e", vdl:executable(tr, rhost),
+							"-out", stdout,
+							"-err", stderr,
+							"-i", if (stdin != null, getFieldValue(stdin)),
+							"-d", str:join(remoteFileDirs, "|"),
+							"-if", str:join(remoteFileNames(inFiles), "|"),
+							"-of", str:join(remoteFileNames(outFiles), "|"),
+							"-wt", WRAPPERLOG_ALWAYS_TRANSFER,
+							"-sk", SITEDIR_KEEP,
+							"-cdmfile", cdm:file(),
+							"-status", statusMode,
+							"-a", if (arguments != null, each(arguments))
 						)
-					}
-					
-					setProgress(progress, "Checking status")
-					if (statusMode == "files") {
-						checkJobStatus(jobdir, jobid, tr)
-					}
+						directory=wfdir
+						redirect=false
+						host=rhost
+						TCProfile(rhost, if(attributes != null, attributes = attributes), tr=tr)
+						replicationGroup=replicationGroup
+						replicationChannel=replicationChannel
+						jobid=jobid
+					)
+				}
+				
+				setProgress(progress, "Checking status")
+				if (statusMode == "files") {
+					checkJobStatus(jobdir, jobid, tr)
+				}
 
-					if (wrapperMode == "files") {
-						file:remove(wrapfile)
-					}
+				if (wrapperMode == "files") {
+					file:remove(wrapfile)
+				}
 
-					log(LOG:DEBUG, "STAGING_OUT jobid={jobid}")
+				log(LOG:DEBUG, "STAGING_OUT jobid={jobid}")
 
 
-					/* need to stage the files to upper scratch area in case they are not transfered to another site
-					   before all the files get cleaned out */
+				/* need to stage the files to upper scratch area in case they are not transfered to another site
+				   before all the files get cleaned out */
 
 
-					setProgress(progress, "Stage out")
-					doRestartlog(stageout)
-					
-					log(LOG:DEBUG, "JOB_END jobid={jobid}")
+				setProgress(progress, "Stage out")
+				doRestartlog(stageout)
+				
+				log(LOG:DEBUG, "JOB_END jobid={jobid}")
+			}
+			else catch(prev) {
+				if (matches(prev, "^Abort$")) {
+					log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
+					throw(prev)
 				}
-				else catch(prev) {
-					if (matches(prev, "^Abort$")) {
-						log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
-						throw(prev)
+				else {
+					setProgress(progress, "Failed but can retry")
+					exception := try(exception(checkErrorFile(jobdir, jobid)), prev)
+				
+					log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
+
+					if (matches(exception,".*executable bit.*")) {
+						throw(exception)
 					}
-					else {
-						setProgress(progress, "Failed but can retry")
-						exception := try(exception(checkErrorFile(jobdir, jobid)), prev)
 					
-						log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
-
-						if (matches(exception,".*executable bit.*")) {
-							throw(exception)
-						}
-						
-						outs := readStandardFiles(jobdir, stdout, stderr)
-						
-						throw(
-							exception(
-								concat(
-									"Exception in {tr}:",
-									if (arguments != null, "\n    Arguments: {arguments}")
-									"\n    Host: {rhost}",
-									"\n    Directory: {tmpdir}",
-									"{outs}",
-								)
-								exception
+					outs := readStandardFiles(jobdir, stdout, stderr)
+					
+					throw(
+						exception(
+							concat(
+								"Exception in {tr}:",
+								if (arguments != null, "\n    Arguments: {arguments}")
+								"\n    Host: {rhost}",
+								"\n    Directory: {tmpdir}",
+								"{outs}",
 							)
+							exception
 						)
-					}
+					)
 				}
 			}
 		}

Modified: trunk/libexec/swift-int.k
===================================================================
--- trunk/libexec/swift-int.k	2014-05-09 14:35:51 UTC (rev 7850)
+++ trunk/libexec/swift-int.k	2014-05-10 18:05:51 UTC (rev 7851)
@@ -10,6 +10,14 @@
 SWIFT:HOME := contextAttribute("SWIFT:HOME")
 SWIFT:DEBUG_DIR_PREFIX := contextAttribute("SWIFT:DEBUG_DIR_PREFIX")
 
+RUN_DIR := "{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}"
+SHARED_DIR := dircat(RUN_DIR, "shared")
+DEBUG_DIR := "{SWIFT:DEBUG_DIR_PREFIX}{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}.d"
+
+if (!file:exists(DEBUG_DIR)) {
+	task:dir:make(DEBUG_DIR)
+}
+
 namespace(swift) {
 
 	rmdir := function(dir, host) {
@@ -32,12 +40,12 @@
 		dir:make(dc, host=host)
 	}
 	
-	checkErrorFile := function(rhost, wfdir, jobid, jobdir) {
-		if (file:exists("{wfdir}/status/{jobdir}/{jobid}-error", host=rhost)) {
+	checkErrorFile := function(rhost, jobid, jobdir) {
+		if (file:exists("{RUN_DIR}/status/{jobdir}/{jobid}-error", host=rhost)) {
 			log(LOG:INFO, "FAILURE jobid={jobid} - Failure file found")
-			task:transfer("{jobid}-error", srchost=rhost, srcdir="{wfdir}/status/{jobdir}")
+			task:transfer("{jobid}-error", srchost=rhost, srcdir="{RUN_DIR}/status/{jobdir}")
 			error := parallel(
-				file:remove("{wfdir}/status/{jobdir}/{jobid}-error", host=rhost)
+				file:remove("{RUN_DIR}/status/{jobdir}/{jobid}-error", host=rhost)
 				sequential(
 					str:strip(file:read("{jobid}-error"))
 					file:remove("{jobid}-error")
@@ -51,15 +59,15 @@
 		}
 	}
 
-	checkJobStatus := function(rhost, wfdir, jobid, tr, jobdir) {
+	checkJobStatus := function(rhost, jobid, tr, jobdir) {
 		log(LOG:DEBUG, "START jobid={jobid}")
 		try {
-			file:remove("{wfdir}/status/{jobdir}/{jobid}-success", host=rhost)
+			file:remove("{RUN_DIR}/status/{jobdir}/{jobid}-success", host=rhost)
 			log(LOG:INFO, "SUCCESS jobid={jobid} - Success file found")
 		}
 		else {
 			msg := try {
-				checkErrorFile(rhost, wfdir, jobid, jobdir)
+				checkErrorFile(rhost, jobid, jobdir)
 			}
 			else {
 				log(LOG:INFO, "NO_STATUS_FILE jobid={jobid} - Both status files are missing")
@@ -74,43 +82,29 @@
 			setProgress(progress, "Initializing site shared directory")
 			log(LOG:INFO, "START host={rhost} - Initializing shared directory")
 
-			wfdir := "{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}"
-			sharedDir := dircat(wfdir, "shared")
+			dir:make(SHARED_DIR, host = rhost)
+			transfer(siteProfile(rhost, "swift:wrapperScript"), srcdir="{SWIFT:HOME}/libexec/", destdir=SHARED_DIR, desthost=rhost)
+			transfer("_swiftseq", srcdir="{SWIFT:HOME}/libexec/", destdir=SHARED_DIR, desthost=rhost)
+			dir:make(dircat(RUN_DIR, "kickstart"), host=rhost)
 
-			dir:make(sharedDir, host = rhost)
-			transfer(siteProfile(rhost, "swift:wrapperScript"), srcdir="{SWIFT:HOME}/libexec/", destdir=sharedDir, desthost=rhost)
-			transfer("_swiftseq", srcdir="{SWIFT:HOME}/libexec/", destdir=sharedDir, desthost=rhost)
-			dir:make(dircat(wfdir, "kickstart"), host=rhost)
-
-			statusMode := configProperty("status.mode",host=rhost)
+			statusMode := configProperty("status.mode", host=rhost)
 			if (statusMode == "files") {
-				dir:make(dircat(wfdir, "status"), host=rhost)
+				dir:make(dircat(RUN_DIR, "status"), host=rhost)
 			}
 
-			wrapperMode := configProperty("wrapper.parameter.mode",host=rhost)
+			wrapperMode := configProperty("wrapper.parameter.mode", host=rhost)
 			if (wrapperMode == "files") {
-				dir:make(dircat(wfdir, "parameters"), host=rhost)
+				dir:make(dircat(RUN_DIR, "parameters"), host=rhost)
 			}
 
-			dir:make(dircat(wfdir, "info"), host=rhost)
-			wfdir, sharedDir
+			dir:make(dircat(RUN_DIR, "info"), host=rhost)
+
 			//we send the cleanup data to vdl:main()
-			to(cleanup, list(wfdir, rhost))
+			to(cleanup, list(RUN_DIR, rhost))
 			log(LOG:INFO, "END host={rhost} - Done initializing shared directory")
 		}
 	}
 
-	initDDir := function() {
-		ddir := "{SWIFT:DEBUG_DIR_PREFIX}{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}.d"
-		
-		once(ddir) {
-			if(!file:exists(ddir)) {
-				task:dir:make(ddir)
-			}
-		}
-		ddir
-	}
-
 	createDirSet := function(jobid, destdir, host, dirs) {
 		/*
 		 * Ideally this would be done by creating a tree of the directories
@@ -308,11 +302,11 @@
 	}
 
 
-	transferWrapperLog := function(rhost, wfdir, jobid, jobdir) {
+	transferWrapperLog := function(rhost, jobid, jobdir) {
 		recfile := "{jobid}-info"
-		srcdir := dircat("{wfdir}/info/", jobdir)
+		srcdir := dircat("{RUN_DIR}/info/", jobdir)
 		try {
-			task:transfer(recfile, srchost=rhost, srcdir=srcdir, destdir="{SWIFT:DEBUG_DIR_PREFIX}{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}.d/")
+			task:transfer(recfile, srchost=rhost, srcdir=srcdir, destdir=DEBUG_DIR)
 		}
 		else catch (exception) {
 			maybe(file:remove(recfile))
@@ -339,196 +333,192 @@
 	)
 
 	export(execute2,
-		function(progress, tr, stagein, stageout,
+		function(rhost, progress, tr, stagein, stageout,
 			replicationGroup, replicationChannel,
 			arguments = [], stdin = null, stdout = null, stderr = null, attributes = null) {
-			
-			allocateHost(rhost, constraints = jobConstraints(tr, stagein = stagein)) {
-				ddir := initDDir(),
-				(wfdir, sharedDir) := 
-					try {
-						initSharedDir(progress, rhost)
-					}
-					else catch(exception) {
-						throw(exception("Could not initialize shared directory on {rhost}", exception))
-					}
-					
-				uid := UID()
-				jobdir := substring(uid, 0, to=1)
-				jobid := "{tr}-{uid}"
 
-				log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread=", currentThread(), " host={rhost} replicationGroup={replicationGroup}")
+			try {
+				initSharedDir(progress, rhost)
+			}
+			else catch(exception) {
+				throw(exception("Could not initialize shared directory on {rhost}", exception))
+			}
+				
+			uid := UID()
+			jobdir := substring(uid, 0, to=1)
+			jobid := "{tr}-{uid}"
 
-				statusMode := configProperty("status.mode",host=rhost)
-				wrapperMode := configProperty("wrapper.parameter.mode",host=rhost)
+			log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread=", currentThread(), " host={rhost} replicationGroup={replicationGroup}")
 
-				wrapfile := "{ddir}/param-{jobid}"
+			statusMode := configProperty("status.mode",host=rhost)
+			wrapperMode := configProperty("wrapper.parameter.mode",host=rhost)
 
-				stdout := if (stdout == null, "stdout.txt", getFieldValue(stdout))
-				stderr := if (stderr == null, "stderr.txt", getFieldValue(stderr)),
-				
-				(fileDirs, inFiles, outFiles, outCollect) := getStagingInfo(stagein, stageout)
+			wrapfile := "{DEBUG_DIR}/param-{jobid}"
 
-				os := siteProfile(rhost, "SYSINFO:OS")
-				
-				if(wrapperMode == "files") {
-					sys:file:write(wrapfile) {
-						"-scratch ", try(siteProfile(rhost, "scratch"), ""),
-						"\n-e ", executable(tr, rhost),
-						"\n-out ", stdout,
-						"\n-err ", stderr,
-						"\n-i ", if (stdin != null, getFieldValue(stdin)),
-						"\n-d ", str:join(fileDirs, "|"),
-						"\n-if ", str:join(remoteFileNames(inFiles), "|"),
-						"\n-of ", str:join(remoteFileNames(outFiles), "|"),
-						"\n-cf ", str:join(remoteFileNames(outCollect), "|"),
-						"\n-cdmfile ", cdm:file(),
-						"\n-status ", statusMode,
-						for(a, arguments) {
-							"\n-a ", a
-						}
+			stdout := if (stdout == null, "stdout.txt", getFieldValue(stdout))
+			stderr := if (stderr == null, "stderr.txt", getFieldValue(stderr)),
+			
+			(fileDirs, inFiles, outFiles, outCollect) := getStagingInfo(stagein, stageout)
+
+			os := siteProfile(rhost, "SYSINFO:OS")
+			
+			if (wrapperMode == "files") {
+				sys:file:write(wrapfile) {
+					"-scratch ", try(siteProfile(rhost, "scratch"), ""),
+					"\n-e ", executable(tr, rhost),
+					"\n-out ", stdout,
+					"\n-err ", stderr,
+					"\n-i ", if (stdin != null, getFieldValue(stdin)),
+					"\n-d ", str:join(fileDirs, "|"),
+					"\n-if ", str:join(remoteFileNames(inFiles), "|"),
+					"\n-of ", str:join(remoteFileNames(outFiles), "|"),
+					"\n-cf ", str:join(remoteFileNames(outCollect), "|"),
+					"\n-cdmfile ", cdm:file(),
+					"\n-status ", statusMode,
+					for(a, arguments) {
+						"\n-a ", a
 					}
 				}
+			}
 
-				setProgress(progress, "Stage in")
-				tmpdir := dircat("{wfdir}/jobs/{jobdir}", jobid)
+			setProgress(progress, "Stage in")
+			tmpdir := dircat("{RUN_DIR}/jobs/{jobdir}", jobid)
 
-				try {
-					createDirSet(jobid, sharedDir, rhost, fileDirs)
-					doStagein(jobid, sharedDir, rhost, inFiles)
-					if(wrapperMode == "files") {
-						stageWrapperParams(jobid, jobdir, wrapfile, wfdir, rhost)
-					}
+			try {
+				createDirSet(jobid, SHARED_DIR, rhost, fileDirs)
+				doStagein(jobid, SHARED_DIR, rhost, inFiles)
+				if (wrapperMode == "files") {
+					stageWrapperParams(jobid, jobdir, wrapfile, RUN_DIR, rhost)
+				}
 
-					log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", if (arguments != null, (" arguments=", arguments)), " tmpdir={tmpdir} host={rhost}")
+				log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", if (arguments != null, (" arguments=", arguments)), " tmpdir={tmpdir} host={rhost}")
 
-					setProgress(progress, "Submitting")
+				setProgress(progress, "Submitting")
 
-					if (wrapperMode == "files") {
-						swift:execute(
-							progress,
-							siteProfile(rhost, "swift:wrapperInterpreter"),
-							list(
-								siteProfile(rhost, "swift:wrapperInterpreterOptions"),
-								dircat("shared", siteProfile(rhost, "swift:wrapperScript"), os=os),
-								jobid, "-p", jobdir
-							)
-							directory = wfdir
-							redirect = false
-							host = rhost
-							TCProfile(rhost, if (attributes != null, attributes = attributes), tr = tr)
-							replicationGroup = replicationGroup
-							replicationChannel = replicationChannel
-							jobid = jobid
+				if (wrapperMode == "files") {
+					swift:execute(
+						progress,
+						siteProfile(rhost, "swift:wrapperInterpreter"),
+						list(
+							siteProfile(rhost, "swift:wrapperInterpreterOptions"),
+							dircat("shared", siteProfile(rhost, "swift:wrapperScript"), os=os),
+							jobid, "-p", jobdir
 						)
-					}
-					else if (wrapperMode == "args") {
-						swift:execute(
-							progress,
-							siteProfile(rhost, "swift:wrapperInterpreter"),
-							list(
-								siteProfile(rhost, "swift:wrapperInterpreterOptions"),
-								dircat("shared", siteProfile(rhost, "swift:wrapperScript"), os=os),
-								jobid,
-								"-jobdir", jobdir,
-								"-scratch", try(siteProfile(rhost, "scratch"), "")
-								"-e", executable(tr, rhost),
-								"-out", stdout,
-								"-err", stderr,
-								"-i", if (stdin != null, getFieldValue(stdin)),
-								"-d", str:join(fileDirs, "|"),
-								"-if", str:join(remoteFileNames(inFiles), "|"),
-								"-of", str:join(remoteFileNames(outFiles), "|"),
-								"-cf", str:join(remoteFileNames(outCollect), "|"),
-								"-cdmfile", cdm:file(),
-								"-status", statusMode,
-								"-a", if (arguments != null, each(arguments))
-							)
-							directory = wfdir
-							redirect = false
-							host = rhost
-							TCProfile(rhost, if(attributes != null, attributes = attributes), tr = tr)
-							replicationGroup = replicationGroup
-							replicationChannel = replicationChannel
-							jobid = jobid
+						directory = RUN_DIR
+						redirect = false
+						host = rhost
+						TCProfile(rhost, if (attributes != null, attributes = attributes), tr = tr)
+						replicationGroup = replicationGroup
+						replicationChannel = replicationChannel
+						jobid = jobid
+					)
+				}
+				else if (wrapperMode == "args") {
+					swift:execute(
+						progress,
+						siteProfile(rhost, "swift:wrapperInterpreter"),
+						list(
+							siteProfile(rhost, "swift:wrapperInterpreterOptions"),
+							dircat("shared", siteProfile(rhost, "swift:wrapperScript"), os=os),
+							jobid,
+							"-jobdir", jobdir,
+							"-scratch", try(siteProfile(rhost, "scratch"), "")
+							"-e", executable(tr, rhost),
+							"-out", stdout,
+							"-err", stderr,
+							"-i", if (stdin != null, getFieldValue(stdin)),
+							"-d", str:join(fileDirs, "|"),
+							"-if", str:join(remoteFileNames(inFiles), "|"),
+							"-of", str:join(remoteFileNames(outFiles), "|"),
+							"-cf", str:join(remoteFileNames(outCollect), "|"),
+							"-cdmfile", cdm:file(),
+							"-status", statusMode,
+							"-a", if (arguments != null, each(arguments))
 						)
-					}
+						directory = RUN_DIR
+						redirect = false
+						host = rhost
+						TCProfile(rhost, if(attributes != null, attributes = attributes), tr = tr)
+						replicationGroup = replicationGroup
+						replicationChannel = replicationChannel
+						jobid = jobid
+					)
+				}
 
-					setProgress(progress, "Checking status")
-					if (statusMode == "files") {
-						checkJobStatus(rhost, wfdir, jobid, tr, jobdir)
-					}
+				setProgress(progress, "Checking status")
+				if (statusMode == "files") {
+					checkJobStatus(rhost, jobid, tr, jobdir)
+				}
 
-					if (wrapperMode == "files") {
-						file:remove(wrapfile)
-					}
+				if (wrapperMode == "files") {
+					file:remove(wrapfile)
+				}
 
-					log(LOG:DEBUG, "STAGING_OUT jobid={jobid}")
+				log(LOG:DEBUG, "STAGING_OUT jobid={jobid}")
 
 
-					/* need to stage the files to upper scratch area in case they are not transfered to another site
-					   before all the files get cleaned out */
+				/* need to stage the files to upper scratch area in case they are not transfered to another site
+				   before all the files get cleaned out */
 
 
-					setProgress(progress, "Stage out")
-					if (isEmpty(outCollect)) {
-						doStageout(jobid, sharedDir, rhost, outFiles)
-						[] // empty collect list
-					}
-					else {
-						doStageoutCollect(jobid, sharedDir, rhost, outFiles)
-					}
+				setProgress(progress, "Stage out")
+				if (isEmpty(outCollect)) {
+					doStageout(jobid, SHARED_DIR, rhost, outFiles)
+					[] // empty collect list
+				}
+				else {
+					doStageoutCollect(jobid, SHARED_DIR, rhost, outFiles)
+				}
+				
+				if (configProperty("wrapperlog.always.transfer") == "true") {
+					discard(transferWrapperLog(rhost, RUN_DIR, jobid, jobdir))
+				}
+				
+				cacheUnlockFiles(inFiles, SHARED_DIR, rhost) {
+					cleanupFiles(cacheFilesToRemove, rhost)
+				}
 					
-					if (configProperty("wrapperlog.always.transfer") == "true") {
-						discard(transferWrapperLog(rhost, wfdir, jobid, jobdir))
-					}
-					
-					cacheUnlockFiles(inFiles, sharedDir, rhost) {
+				log(LOG:DEBUG, "JOB_END jobid={jobid}")
+			}
+			else catch(prev) {
+				if (matches(prev, "^Abort$")) {
+					log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
+					cacheUnlockFiles(inFiles, SHARED_DIR, rhost, force=false) {
 						cleanupFiles(cacheFilesToRemove, rhost)
 					}
-						
-					log(LOG:DEBUG, "JOB_END jobid={jobid}")
+					throw(prev)
 				}
-				else catch(prev) {
-					if (matches(prev, "^Abort$")) {
-						log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
-						cacheUnlockFiles(inFiles, sharedDir, rhost, force=false) {
-							cleanupFiles(cacheFilesToRemove, rhost)
-						}
-						throw(prev)
+				else {
+					setProgress(progress, "Failed but can retry")
+					exception := try(exception(checkErrorFile(rhost, jobid, jobdir), prev), prev)
+					
+					log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
+
+					if (matches(exception,".*executable bit.*")) {
+						throw(exception)
 					}
-					else {
-						setProgress(progress, "Failed but can retry")
-						exception := try(exception(checkErrorFile(rhost, wfdir, jobid, jobdir), prev), prev)
-						
-						log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
 
-						if (matches(exception,".*executable bit.*")) {
-							throw(exception)
-						}
+					cacheUnlockFiles(inFiles, SHARED_DIR, rhost, force=false) {
+						cleanupFiles(cacheFilesToRemove, rhost)
+					}
 
-						cacheUnlockFiles(inFiles, sharedDir, rhost, force=false) {
-							cleanupFiles(cacheFilesToRemove, rhost)
-						}
+					outs := transferStandardFiles(rhost, tmpdir, jobid, stdout, stderr)
 
-						outs := transferStandardFiles(rhost, tmpdir, jobid, stdout, stderr)
+					discard(maybe(transferWrapperLog(rhost, jobid, jobdir)))
 
-						discard(maybe(transferWrapperLog(rhost, wfdir, jobid, jobdir)))
-
-					
-						throw(
-							exception(
-								concat(
-									"Exception in {tr}:",
-									if (arguments != null, "\n    Arguments: {arguments}")
-									"\n    Host: {rhost}",
-									"\n    Directory: {tmpdir}",
-									"{outs}",
-								)
-								exception
+				
+					throw(
+						exception(
+							concat(
+								"Exception in {tr}:",
+								if (arguments != null, "\n    Arguments: {arguments}")
+								"\n    Host: {rhost}",
+								"\n    Directory: {tmpdir}",
+								"{outs}",
 							)
+							exception
 						)
-					}
+					)
 				}
 			}
 		}

Modified: trunk/libexec/swift-lib.k
===================================================================
--- trunk/libexec/swift-lib.k	2014-05-09 14:35:51 UTC (rev 7850)
+++ trunk/libexec/swift-lib.k	2014-05-10 18:05:51 UTC (rev 7851)
@@ -52,6 +52,7 @@
 	export(initProgressState, def("org.griphyn.vdl.karajan.lib.RuntimeStats$InitProgressState"))
 	export(setProgress, def("org.griphyn.vdl.karajan.lib.RuntimeStats$SetProgress"))
 	export(new, def("org.griphyn.vdl.karajan.lib.New"))
+	export(field, def("org.griphyn.vdl.karajan.lib.GetFieldConst"))
 	export(createArray, def("org.griphyn.vdl.karajan.lib.CreateArray"))
 	/* used from VDL2 for arguments to apps and returns relative paths */
 	export(fileName, def("org.griphyn.vdl.karajan.lib.FileName"))

Modified: trunk/libexec/swift.k
===================================================================
--- trunk/libexec/swift.k	2014-05-09 14:35:51 UTC (rev 7850)
+++ trunk/libexec/swift.k	2014-05-10 18:05:51 UTC (rev 7851)
@@ -179,7 +179,7 @@
 			attributes = null, 
 			deperror = false, mdeperror = false, 
 			channel(stagein), channel(stageout)) {
-	
+		
 			progress := initProgressState()
 		
 			done := isDone(stageout)
@@ -199,12 +199,14 @@
 									replicationGroup := UID()
 									parallelFor(i, replicationChannel) {
 										try {
-											execute2(
-												progress, 
-												tr, if(arguments != null, arguments = unwrapClosedList(arguments)),
-												stdin=stdin, stdout=stdout, stderr=stderr, attributes=attributes,
-												stagein, stageout, replicationGroup, replicationChannel
-											)
+											allocateHost(rhost, constraints = jobConstraints(tr, stagein = stagein)) {
+												execute2(
+													rhost, progress, 
+													tr, if(arguments != null, arguments = unwrapClosedList(arguments)),
+													stdin=stdin, stdout=stdout, stderr=stderr, attributes=attributes,
+													stagein, stageout, replicationGroup, replicationChannel
+												)
+											}
 										}
 										else catch(exception) {
 											if (matches(exception, "^Abort$")) {
@@ -218,12 +220,14 @@
 								}
 								else {
 									try {
-										execute2(
-											progress, 
-											tr, if(arguments != null, arguments = unwrapClosedList(arguments)),
-											stdin=stdin, stdout=stdout, stderr=stderr, attributes=attributes,
-											stagein, stageout, null, null
-										)
+										allocateHost(rhost, constraints = jobConstraints(tr, stagein = stagein)) {
+											execute2(
+												rhost, progress, 
+												tr, if(arguments != null, arguments = unwrapClosedList(arguments)),
+												stdin=stdin, stdout=stdout, stderr=stderr, attributes=attributes,
+												stagein, stageout, null, null
+											)
+										}
 									}
 									else catch(exception) {
 										if (matches(exception, "^Abort$")) {




More information about the Swift-commit mailing list