[Swift-commit] r7975 - trunk/libexec

hategan at ci.uchicago.edu hategan at ci.uchicago.edu
Wed Jul 9 14:38:06 CDT 2014


Author: hategan
Date: 2014-07-09 14:38:06 -0500 (Wed, 09 Jul 2014)
New Revision: 7975

Modified:
   trunk/libexec/scheduler.k
   trunk/libexec/swift-int-staging.k
   trunk/libexec/swift-int-wrapper-staging.k
   trunk/libexec/swift-int.k
   trunk/libexec/swift-lib.k
   trunk/libexec/swift.k
Log:
new config mechanism II

Modified: trunk/libexec/scheduler.k
===================================================================
--- trunk/libexec/scheduler.k	2014-07-09 19:37:52 UTC (rev 7974)
+++ trunk/libexec/scheduler.k	2014-07-09 19:38:06 UTC (rev 7975)
@@ -2,31 +2,20 @@
 import(task)
 import('swift-lib')
 
-sites := swift:configProperty("sites.file")
-TCFile := swift:configProperty("tc.file")
+config := contextAttribute("SWIFT:CONFIG")
 
-log(LOG:INFO, "Using sites file: {sites}")
-if (!file:exists(sites)) {
-	throw("Could not find sites file: {sites}")
-}
-
-log(LOG:INFO, "Using tc.data: {TCFile}")
-
-scheduler("vds-adaptive", shareID = "swift:scheduler:{sites}"
-	property("transformationCatalogFile", TCFile)
-	property("clusteringEnabled", swift:configProperty("clustering.enabled"))
-	property("clusteringQueueDelay", swift:configProperty("clustering.queue.delay"))
-	property("clusteringMinTime", swift:configProperty("clustering.min.time"))
+scheduler("vds-adaptive", shareID = "swift:scheduler"
+	property("config", config)
 	
-	property("hostSubmitThrottle", swift:configProperty("throttle.host.submit"))
-	property("submitThrottle", swift:configProperty("throttle.submit"))
+	property("hostSubmitThrottle", swift:configProperty("hostJobSubmitThrottle"))
+	property("submitThrottle", swift:configProperty("jobSubmitThrottle"))
 	property("jobsPerCpu", "off")
-	property("maxTransfers", swift:configProperty("throttle.transfers"))
-	property("maxFileOperations", swift:configProperty("throttle.file.operations"))
-	property("jobThrottle", swift:configProperty("throttle.score.job.factor"))
+	property("maxTransfers", swift:configProperty("fileTransfersThrottle"))
+	property("maxFileOperations", swift:configProperty("fileOperationsThrottle"))
+	property("jobThrottle", swift:configProperty("siteScoreThrottlingFactor"))
 	
 	task:availableHandlers(type = "execution", includeAliases = true)
 	task:availableHandlers(type = "file", includeAliases = true)
 	
-	resources = swift:siteCatalog(sites)
+	resources = swift:siteCatalog(config)
 )

Modified: trunk/libexec/swift-int-staging.k
===================================================================
--- trunk/libexec/swift-int-staging.k	2014-07-09 19:37:52 UTC (rev 7974)
+++ trunk/libexec/swift-int-staging.k	2014-07-09 19:38:06 UTC (rev 7975)
@@ -11,19 +11,18 @@
 SWIFT:DEBUG_DIR_PREFIX := contextAttribute("SWIFT:DEBUG_DIR_PREFIX")
 
 WRAPPER_TRANSFER_MODE :=
-	if (configProperty("wrapperlog.always.transfer") == "true", 
+	if (configProperty("alwaysTransferWrapperLog"), 
 		STAGING_MODE:IF_PRESENT, STAGING_MODE:ON_ERROR + STAGING_MODE:IF_PRESENT)
 
-pinOption := configProperty("provider.staging.pin.swiftfiles")
+pinOption := configProperty("providerStagingPinSwiftFiles")
 
-PIN := if(pinOption == "true", "pinned:", "")
-PROVENANCE_GRAPH_ENABLED := (configProperty("pgraph") != "false")
-CLEANUP_ENABLED := (configProperty("sitedir.keep") != "true")
+PIN := if(pinOption, "pinned:", "")
+CLEANUP_ENABLED := !configProperty("keepSiteDir")
 
 DEBUG_DIR := "{SWIFT:DEBUG_DIR_PREFIX}{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}.d"
 CDM_FILE := cdm:file()
 
-namespace(swift) {
+namespace(providerStaging) {
 		
 	fileSizes := function(files) {
 		math:sum(
@@ -31,12 +30,6 @@
 		)
 	}
 
-	export(cleanups,
-		function(cleanup) {
-			log(LOG:INFO, "START cleanups={cleanup}")
-		}
-	)
-
 	readErrorFiles := function(dir, jobid, stdout, stderr) {
 		concat(
 			if(file:exists("{dir}/{jobid}.error")) {
@@ -59,9 +52,8 @@
 	}
 	
 	export(execute2,
-		function(rhost, progress, tr, stagein, stageout,
-			replicationGroup, replicationChannel,
-			arguments = [], stdin = null, stdout = null, stderr = null, attributes = null) {
+		function(rhost, progress, tr, arguments, attributes, stdin, stdout, stderr, 
+			stagein, stageout, replicationGroup, replicationChannel) {
 
 			uid := UID()
 			jobdir := substring(uid, 0, to=1)

Modified: trunk/libexec/swift-int-wrapper-staging.k
===================================================================
--- trunk/libexec/swift-int-wrapper-staging.k	2014-07-09 19:37:52 UTC (rev 7974)
+++ trunk/libexec/swift-int-wrapper-staging.k	2014-07-09 19:38:06 UTC (rev 7975)
@@ -2,28 +2,17 @@
 import(task)
 import('swift-lib')
 
+URL_PREFIX := getURLPrefix()
 
-getURLPrefix := def("org.griphyn.vdl.karajan.lib.GetURLPrefix")
+WRAPPER_LOG_ALWAYS_TRANSFER := configProperty("alwaysTransferWrapperLog")
+SWIFT:SCRIPT_NAME := contextAttribute("SWIFT:SCRIPT_NAME")
+SWIFT:RUN_ID := contextAttribute("SWIFT:RUN_ID")
+SWIFT:HOME := contextAttribute("SWIFT:HOME")
+SITEDIR_KEEP := configProperty("keepSiteDir")
 
-URL_PREFIX := getURLPrefix() 
 
-WRAPPERLOG_ALWAYS_TRANSFER := vdl:configProperty("wrapperlog.always.transfer")
-SITEDIR_KEEP := vdl:configProperty("sitedir.keep")
-
-
-namespace(swift) {
+namespace(wrapperStaging) {
 	
-	checkJobStatus := function(jobdir, jobid, tr) {
-		log(LOG:DEBUG, "START jobid={jobid}")
-		try {
-			file:remove("{jobdir}/_swift.success")
-			log(LOG:INFO, "SUCCESS jobid={jobid} - Success file found")
-		}
-		else {
-			throw(checkErrorFile(jobdir, jobid))
-		}
-	}
-		
 	checkErrorFile := function(jobdir, jobid) {
 		if (file:exists("{jobdir}/_swift.error")) {
 			log(LOG:INFO, "FAILURE jobid={jobid} - Failure file found")
@@ -36,6 +25,17 @@
 			throw("No status file was found")
 		}
 	}
+	
+	checkJobStatus := function(jobdir, jobid, tr) {
+		log(LOG:DEBUG, "START jobid={jobid}")
+		try {
+			file:remove("{jobdir}/_swift.success")
+			log(LOG:INFO, "SUCCESS jobid={jobid} - Success file found")
+		}
+		else {
+			throw(checkErrorFile(jobdir, jobid))
+		}
+	}
 
 	initSharedDir := function(progress, rhost) {
 		once(list(rhost, "shared")) {
@@ -43,9 +43,9 @@
 
 			log(LOG:INFO, "START host={rhost} - Initializing shared directory")
 
-			wfdir := "{VDL:SCRIPTNAME}-{VDL:RUNID}"
+			wfdir := "{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}"
 			dir:make(wfdir, host=rhost)
-			transfer(srcdir="{swift.home}/libexec/", srcfile="_swiftwrap.wrapperstaging", destdir=wfdir, desthost=rhost)
+			transfer(srcdir="{SWIFT:HOME}/libexec/", srcfile="_swiftwrap.wrapperstaging", destdir=wfdir, desthost=rhost)
 
 			wfdir
 			to(cleanup, list(wfdir, rhost))
@@ -64,48 +64,14 @@
 		ddir
 	}
 
-	cleanup := function(dir, host) {
-		log(LOG:INFO, "START dir={dir} host={host}")
-		if(vdl:configProperty("sitedir.keep") == "false") {
-			task:execute(
-				vdl:siteprofile(host, "cleanupCommand"),
-				arguments = list(
-					siteProfile(host, "cleanupCommandOptions"),
-					dir
-				)
-				host=host, batch=true, TCProfile(host)
-			)
-			log(LOG:INFO, "END dir={dir} host={host}")
-		}
-	}
+	stageWrapperParams := function(jobid, jobdir, wrapfile, dir, host) {
+		log(LOG:INFO, "START jobid={jobid} - staging in wrapper params"),
+		(provider, srchost, destdir, filename, srcdir) := splitFileURL(wrapfile, dir, destdir="parameters/{jobdir}")
 
-	cleanups := function(cleanup) {
-		log(LOG:INFO, "START cleanups={cleanup}")
-		parallelFor(i, cleanup) {
-			(dir, host) := each(i)
-			try {
-				vdl:cleanup(dir, host)
-			}
-			else catch(exception) {
-				log(LOG:DEBUG, "EXCEPTION - Exception caught while cleaning up", exception)
-				to(warnings, exception("Cleanup on {host} failed", exception))
-			}
+		cache(list(destdir, host)) {
+			dir:make(destdir, host=host, provider=provider)
 		}
-		log(LOG:INFO, "END cleanups={cleanup}")
-	}
 
-	stageWrapperParams := function(jobid, wrapfile, dir, host) {
-		log(LOG:INFO, "START jobid={jobid} - staging in wrapper params")
-		provider := provider(wrapfile)
-		srchost := hostname(wrapfile)
-		srcdir := vdl:dirname(wrapfile)
-		destdir := dir
-		filename := basename(wrapfile)
-
-		cacheOn(list(destdir, host)
-			dir:make(destdir, host=host, provider=provider)
-		)
-
 		log(LOG:INFO, "END jobid={jobid}")
 	}
 
@@ -129,9 +95,8 @@
 		
 
 	export(execute2,
-		function(rhost, progress, tr, stagein, stageout,
-			replicationGroup, replicationChannel,
-			arguments = [], stdin = null, stdout = null, stderr = null, attributes = null) {
+		function(rhost, progress, tr, arguments, attributes, stdin, stdout, stderr, 
+			stagein, stageout, replicationGroup, replicationChannel) {
 
 			ddir := initDDir()
 			wfdir := try {
@@ -146,10 +111,10 @@
 			
 			jobdir := concat(ddir, "/jobs/", substring(uid, from=0, to=1), "/{jobid}/")
 
-			log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread={#thread} host={rhost} replicationGroup={replicationGroup}")
+			log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread=", currentThread(), " host={rhost} replicationGroup={replicationGroup}")
 
-			statusMode := configProperty("status.mode", host = rhost)
-			wrapperMode := configProperty("wrapper.parameter.mode", host = rhost)
+			statusMode := configProperty("statusMode", host = rhost)
+			wrapperMode := configProperty("wrapperParameterMode", host = rhost)
 
 			wrapfile := "{jobdir}/_paramfile"
 
@@ -163,22 +128,22 @@
 			scratch := siteProfile(rhost, "scratch")
 
 			if(wrapperMode == "files") {
-				file:write(wrapfile,
-					"-e ",vdl:executable(tr, rhost),
+				file:write(wrapfile) {
+					"-e ", executable(tr, rhost),
 					"\n-out ", stdout,
 					"\n-err ", stderr,
 					"\n-i ", if (stdin != null, getFieldValue(stdin)),
 					"\n-d ", str:join(remoteFileDirs, "|"),
 					"\n-if ", str:join(remoteFileNames(inFiles), "|"),
 					"\n-of ", str:join(remoteFileNames(outFiles), "|"),
-					"\n-wt", WRAPPERLOG_ALWAYS_TRANSFER,
+					"\n-wt", WRAPPER_LOG_ALWAYS_TRANSFER,
 					"\n-sk", SITEDIR_KEEP,
 					"\n-cdmfile ", cdm:file(),
 					"\n-status ", statusMode,
 					for(a, arguments) {
 						"\n-a ", a
 					}
-				)
+				}
 			}
 			
 
@@ -186,7 +151,7 @@
 
 			try {
 				if (wrapperMode == "files") {
-					stageWrapperParams(jobid, wrapfile, wfdir, rhost)
+					stageWrapperParams(jobid, jobdir, wrapfile, wfdir, rhost)
 				}
 
 				log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", if (arguments != null, (" arguments=", arguments)), " host={rhost}")
@@ -225,14 +190,14 @@
 							"-urlprefix", URL_PREFIX,
 							"-jobdir", jobdir,
 							"-scratch", scratch,
-							"-e", vdl:executable(tr, rhost),
+							"-e", executable(tr, rhost),
 							"-out", stdout,
 							"-err", stderr,
 							"-i", if (stdin != null, getFieldValue(stdin)),
 							"-d", str:join(remoteFileDirs, "|"),
 							"-if", str:join(remoteFileNames(inFiles), "|"),
 							"-of", str:join(remoteFileNames(outFiles), "|"),
-							"-wt", WRAPPERLOG_ALWAYS_TRANSFER,
+							"-wt", WRAPPER_LOG_ALWAYS_TRANSFER,
 							"-sk", SITEDIR_KEEP,
 							"-cdmfile", cdm:file(),
 							"-status", statusMode,
@@ -265,7 +230,7 @@
 
 
 				setProgress(progress, "Stage out")
-				doRestartlog(stageout)
+				doRestartLog(stageout)
 				
 				log(LOG:DEBUG, "JOB_END jobid={jobid}")
 			}
@@ -292,7 +257,7 @@
 								"Exception in {tr}:",
 								if (arguments != null, "\n    Arguments: {arguments}")
 								"\n    Host: {rhost}",
-								"\n    Directory: {tmpdir}",
+								"\n    Directory: {jobdir}",
 								"{outs}",
 							)
 							exception

Modified: trunk/libexec/swift-int.k
===================================================================
--- trunk/libexec/swift-int.k	2014-07-09 19:37:52 UTC (rev 7974)
+++ trunk/libexec/swift-int.k	2014-07-09 19:38:06 UTC (rev 7975)
@@ -14,11 +14,13 @@
 SHARED_DIR := dircat(RUN_DIR, "shared")
 DEBUG_DIR := "{SWIFT:DEBUG_DIR_PREFIX}{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}.d"
 
+WRAPPER_LOG_ALWAYS_TRANSFER := configProperty("alwaysTransferWrapperLog")
+
 if (!file:exists(DEBUG_DIR)) {
 	task:dir:make(DEBUG_DIR)
 }
 
-namespace(swift) {
+namespace(swiftStaging) {
 
 	rmdir := function(dir, host) {
 		parallelFor(entry, file:list(dir, host=host)) {
@@ -86,12 +88,12 @@
 			transfer(siteProfile(rhost, "wrapperScript"), srcdir="{SWIFT:HOME}/libexec/", destdir=SHARED_DIR, desthost=rhost)
 			transfer("_swiftseq", srcdir="{SWIFT:HOME}/libexec/", destdir=SHARED_DIR, desthost=rhost)
 
-			statusMode := configProperty("status.mode", host=rhost)
+			statusMode := configProperty("statusMode", host=rhost)
 			if (statusMode == "files") {
 				dir:make(dircat(RUN_DIR, "status"), host=rhost)
 			}
 
-			wrapperMode := configProperty("wrapper.parameter.mode", host=rhost)
+			wrapperMode := configProperty("wrapperParameterMode", host=rhost)
 			if (wrapperMode == "files") {
 				dir:make(dircat(RUN_DIR, "parameters"), host=rhost)
 			}
@@ -121,30 +123,6 @@
 		log(LOG:INFO, "END jobid={jobid} - Done initializing directory structure")
 	}
 
-	cleanup := function(dir, host) {
-		log(LOG:INFO, "START dir={dir} host={host}")
-		cdmfile := cdm:file()
-		log(LOG:DEBUG, "cdmfile {cdmfile}")
-		if (cdmfile != "" & cdm:get("GATHER_DIR") != "UNSET") {
-			log(LOG:INFO, "submitting cdm_cleanup.sh to {dir}")
-			task:transfer("cdm_cleanup.sh",
-				srcdir="{SWIFT:HOME}/libexec",
-				desthost=host, destdir=dir)
-			task:transfer("cdm_lib.sh",
-				srcdir="{SWIFT:HOME}/libexec",
-				desthost=host, destdir=dir)
-		        log(LOG:INFO, "execute: cdm_cleanup.sh")
-			task:execute("/bin/bash", list("{dir}/cdm_cleanup.sh", cdm:get("GATHER_DIR"), cdm:get("GATHER_TARGET"), UID())
-				host=host, batch=true, TCProfile(host))
-		}
-		if (swift:configProperty("sitedir.keep") == "false") {
-			task:execute(siteProfile(host, "cleanupCommand"),
-				list(siteProfile(host, "cleanupCommandOptions"), dir)
-				host=host, batch=true, TCProfile(host))
-		}
-		log(LOG:INFO, "END dir={dir} host={host}")
-	}
-
 	cleanupFiles := function(files, host) {
 		parallelFor(r, files) {
 			log(LOG:INFO, "Purging ", r, " on ", host)
@@ -313,28 +291,10 @@
 		}
 		recfile
 	}
-	
-	export(cleanups,
-		function(cleanup) {
-			log(LOG:INFO, "START cleanups={cleanup}")
-			parallelFor(i, cleanup) {
-				(dir, host) := each(i)
-				try {
-					cleanup(dir, host)
-				}
-				else catch(exception) {
-					log(LOG:DEBUG, "EXCEPTION - Exception caught while cleaning up", exception)
-					to(warnings, exception("Cleanup on {host} failed", exception))
-				}
-			}
-			log(LOG:INFO, "END cleanups={cleanup}")
-		}
-	)
 
 	export(execute2,
-		function(rhost, progress, tr, stagein, stageout,
-			replicationGroup, replicationChannel,
-			arguments = [], stdin = null, stdout = null, stderr = null, attributes = null) {
+		function(rhost, progress, tr, arguments, attributes, stdin, stdout, stderr, 
+			stagein, stageout, replicationGroup, replicationChannel) {
 
 			try {
 				initSharedDir(progress, rhost)
@@ -349,8 +309,8 @@
 
 			log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread=", currentThread(), " host={rhost} replicationGroup={replicationGroup}")
 
-			statusMode := configProperty("status.mode",host=rhost)
-			wrapperMode := configProperty("wrapper.parameter.mode",host=rhost)
+			statusMode := configProperty("statusMode", host=rhost)
+			wrapperMode := configProperty("wrapperParameterMode", host=rhost)
 
 			wrapfile := "{DEBUG_DIR}/param-{jobid}"
 
@@ -469,7 +429,7 @@
 					doStageoutCollect(jobid, SHARED_DIR, rhost, outFiles)
 				}
 				
-				if (configProperty("wrapperlog.always.transfer") == "true") {
+				if (WRAPPER_LOG_ALWAYS_TRANSFER) {
 					discard(transferWrapperLog(rhost, jobid, jobdir))
 				}
 				

Modified: trunk/libexec/swift-lib.k
===================================================================
--- trunk/libexec/swift-lib.k	2014-07-09 19:37:52 UTC (rev 7974)
+++ trunk/libexec/swift-lib.k	2014-07-09 19:38:06 UTC (rev 7975)
@@ -139,6 +139,8 @@
 	export(unwrapClosedList, def("org.griphyn.vdl.karajan.lib.UnwrapClosedList"))
 	
 	export(siteCatalog, def("org.griphyn.vdl.karajan.lib.SiteCatalog"))
+	
+	export(getURLPrefix, def("org.griphyn.vdl.karajan.lib.GetURLPrefix"))
 }
 
 namespace(cdm) {

Modified: trunk/libexec/swift.k
===================================================================
--- trunk/libexec/swift.k	2014-07-09 19:37:52 UTC (rev 7974)
+++ trunk/libexec/swift.k	2014-07-09 19:38:06 UTC (rev 7975)
@@ -5,33 +5,69 @@
 import('swift-lib', export = true)
 import('swift-xs', export = true)
 
+import('swift-int')
+import('swift-int-staging')
+import('swift-int-wrapper-staging')
+
 SWIFT:SCRIPT_NAME := contextAttribute("SWIFT:SCRIPT_NAME")
 SWIFT:RUN_ID := contextAttribute("SWIFT:RUN_ID")
 SWIFT:HOME := contextAttribute("SWIFT:HOME")
+SITEDIR_KEEP := configProperty("keepSiteDir")
 
-namespace(swift) {
-
-	pstaging := (configProperty("use.provider.staging") == "true")
-	wstaging := (configProperty("use.wrapper.staging") == "true")
-	
-	impl := if (pstaging, "swift-int-staging.k", if (wstaging, "swift-int-wrapper-staging.k", "swift-int.k"))
-	
-	import(file = impl)
-	
-	import(java)
-
+namespace(swift) {	
 	export(stageIn, def("org.griphyn.vdl.karajan.lib.Stagein"))
 	export(stageOut, def("org.griphyn.vdl.karajan.lib.Stageout"))
 
 	export(parameterLog,
 		function(direction, variable, id) {
-			if (configProperty("provenance.log") == "true") {
+			if (configProperty("logProvenance")) {
 				thread := currentThread()
 				log("info", "PARAM thread={thread} direction={direction} variable={variable} provenanceid={id}")
 			}
 		}
 	)
 	
+	cleanup := function(dir, host) {
+		log(LOG:INFO, "START dir={dir} host={host}")
+		cdmfile := cdm:file()
+		log(LOG:DEBUG, "cdmfile {cdmfile}")
+		if (cdmfile != "" & cdm:get("GATHER_DIR") != "UNSET") {
+			log(LOG:INFO, "submitting cdm_cleanup.sh to {dir}")
+			task:transfer("cdm_cleanup.sh",
+				srcdir="{SWIFT:HOME}/libexec",
+				desthost=host, destdir=dir)
+			task:transfer("cdm_lib.sh",
+				srcdir="{SWIFT:HOME}/libexec",
+				desthost=host, destdir=dir)
+		        log(LOG:INFO, "execute: cdm_cleanup.sh")
+			task:execute("/bin/bash", list("{dir}/cdm_cleanup.sh", cdm:get("GATHER_DIR"), cdm:get("GATHER_TARGET"), UID())
+				host=host, batch=true, TCProfile(host))
+		}
+		if (!SITEDIR_KEEP) {
+			task:execute(siteProfile(host, "cleanupCommand"),
+				list(siteProfile(host, "cleanupCommandOptions"), dir)
+				host=host, batch=true, TCProfile(host))
+		}
+		log(LOG:INFO, "END dir={dir} host={host}")
+	}
+	
+	export(cleanups,
+		function(cleanup) {
+			log(LOG:INFO, "START cleanups={cleanup}")
+			parallelFor(i, cleanup) {
+				(dir, host) := each(i)
+				try {
+					cleanup(dir, host)
+				}
+				else catch(exception) {
+					log(LOG:DEBUG, "EXCEPTION - Exception caught while cleaning up", exception)
+					to(warnings, exception("Cleanup on {host} failed", exception))
+				}
+			}
+			log(LOG:INFO, "END cleanups={cleanup}")
+		}
+	)
+	
 	export(split,
 		function(var) {
 			each(str:split(getFieldValue(var), " "))
@@ -92,8 +128,27 @@
 				to(cleanup, unique(for(c, cleanup, c)))
 			)
 		}
-	)	
+	)
 	
+	executeSelect := function(rhost, progress, tr, arguments, attributes,
+		stdin, stdout, stderr, stagein, stageout, replicationGroup, replicationChannel) {
+		
+		staging := configProperty("staging", host = rhost)
+		
+		if (staging == "swift") {
+			swiftStaging:execute2(rhost, progress, tr, arguments, attributes, stdin, stdout, stderr,
+				stagein, stageout, replicationGroup, replicationChannel)
+		}
+		else if (staging == "provider") {
+			providerStaging:execute2(rhost, progress, tr, arguments, attributes, stdin, stdout, stderr,
+				stagein, stageout, replicationGroup, replicationChannel)
+		}
+		else {
+			wrapperStaging:execute2(rhost, progress, tr, arguments, attributes, stdin, stdout, stderr,
+				stagein, stageout, replicationGroup, replicationChannel)
+		}
+	}
+	
 	export(execute,
 		function(
 			tr, arguments = null, 
@@ -113,8 +168,8 @@
 					try {
 						throttled {
 							setProgress(progress, "Selecting site")
-							collectList := restartOnError(number(swift:configProperty("execution.retries"))) {
-								if (swift:configProperty("replication.enabled") == "true") {
+							collectList := restartOnError(swift:configProperty("executionRetries")) {
+								if (swift:configProperty("replicationEnabled")) {
 									replicationChannel := channel:new()
 									//trigger the first job
 									discard(append(replicationChannel, true)) 
@@ -122,10 +177,11 @@
 									parallelFor(i, replicationChannel) {
 										try {
 											allocateHost(rhost, constraints = jobConstraints(tr, stagein = stagein)) {
-												execute2(
+												executeSelect(
 													rhost, progress, 
-													tr, if(arguments != null, arguments = unwrapClosedList(arguments)),
-													stdin=stdin, stdout=stdout, stderr=stderr, attributes=attributes,
+													tr, if(arguments != null, unwrapClosedList(arguments), []),
+													attributes,
+													stdin, stdout, stderr, 
 													stagein, stageout, replicationGroup, replicationChannel
 												)
 											}
@@ -143,10 +199,11 @@
 								else {
 									try {
 										allocateHost(rhost, constraints = jobConstraints(tr, stagein = stagein)) {
-											execute2(
+											executeSelect(
 												rhost, progress, 
-												tr, if(arguments != null, arguments = unwrapClosedList(arguments)),
-												stdin=stdin, stdout=stdout, stderr=stderr, attributes=attributes,
+												tr, if(arguments != null, unwrapClosedList(arguments), []),
+												attributes,
+												stdin, stdout, stderr, 
 												stagein, stageout, null, null
 											)
 										}
@@ -169,7 +226,7 @@
 					else catch(exception) {
 						log(LOG:INFO, "END_FAILURE thread=", currentThread(), " tr={tr}")
 						setProgress(progress, "Failed")
-						if(swift:configProperty("lazy.errors") == "false") {
+						if(!swift:configProperty("lazyErrors")) {
 							throw(exception)
 						}
 						else {




More information about the Swift-commit mailing list