[Swift-commit] r7975 - trunk/libexec
hategan at ci.uchicago.edu
hategan at ci.uchicago.edu
Wed Jul 9 14:38:06 CDT 2014
Author: hategan
Date: 2014-07-09 14:38:06 -0500 (Wed, 09 Jul 2014)
New Revision: 7975
Modified:
trunk/libexec/scheduler.k
trunk/libexec/swift-int-staging.k
trunk/libexec/swift-int-wrapper-staging.k
trunk/libexec/swift-int.k
trunk/libexec/swift-lib.k
trunk/libexec/swift.k
Log:
new config mechanism II
Modified: trunk/libexec/scheduler.k
===================================================================
--- trunk/libexec/scheduler.k 2014-07-09 19:37:52 UTC (rev 7974)
+++ trunk/libexec/scheduler.k 2014-07-09 19:38:06 UTC (rev 7975)
@@ -2,31 +2,20 @@
import(task)
import('swift-lib')
-sites := swift:configProperty("sites.file")
-TCFile := swift:configProperty("tc.file")
+config := contextAttribute("SWIFT:CONFIG")
-log(LOG:INFO, "Using sites file: {sites}")
-if (!file:exists(sites)) {
- throw("Could not find sites file: {sites}")
-}
-
-log(LOG:INFO, "Using tc.data: {TCFile}")
-
-scheduler("vds-adaptive", shareID = "swift:scheduler:{sites}"
- property("transformationCatalogFile", TCFile)
- property("clusteringEnabled", swift:configProperty("clustering.enabled"))
- property("clusteringQueueDelay", swift:configProperty("clustering.queue.delay"))
- property("clusteringMinTime", swift:configProperty("clustering.min.time"))
+scheduler("vds-adaptive", shareID = "swift:scheduler"
+ property("config", config)
- property("hostSubmitThrottle", swift:configProperty("throttle.host.submit"))
- property("submitThrottle", swift:configProperty("throttle.submit"))
+ property("hostSubmitThrottle", swift:configProperty("hostJobSubmitThrottle"))
+ property("submitThrottle", swift:configProperty("jobSubmitThrottle"))
property("jobsPerCpu", "off")
- property("maxTransfers", swift:configProperty("throttle.transfers"))
- property("maxFileOperations", swift:configProperty("throttle.file.operations"))
- property("jobThrottle", swift:configProperty("throttle.score.job.factor"))
+ property("maxTransfers", swift:configProperty("fileTransfersThrottle"))
+ property("maxFileOperations", swift:configProperty("fileOperationsThrottle"))
+ property("jobThrottle", swift:configProperty("siteScoreThrottlingFactor"))
task:availableHandlers(type = "execution", includeAliases = true)
task:availableHandlers(type = "file", includeAliases = true)
- resources = swift:siteCatalog(sites)
+ resources = swift:siteCatalog(config)
)
Modified: trunk/libexec/swift-int-staging.k
===================================================================
--- trunk/libexec/swift-int-staging.k 2014-07-09 19:37:52 UTC (rev 7974)
+++ trunk/libexec/swift-int-staging.k 2014-07-09 19:38:06 UTC (rev 7975)
@@ -11,19 +11,18 @@
SWIFT:DEBUG_DIR_PREFIX := contextAttribute("SWIFT:DEBUG_DIR_PREFIX")
WRAPPER_TRANSFER_MODE :=
- if (configProperty("wrapperlog.always.transfer") == "true",
+ if (configProperty("alwaysTransferWrapperLog"),
STAGING_MODE:IF_PRESENT, STAGING_MODE:ON_ERROR + STAGING_MODE:IF_PRESENT)
-pinOption := configProperty("provider.staging.pin.swiftfiles")
+pinOption := configProperty("providerStagingPinSwiftFiles")
-PIN := if(pinOption == "true", "pinned:", "")
-PROVENANCE_GRAPH_ENABLED := (configProperty("pgraph") != "false")
-CLEANUP_ENABLED := (configProperty("sitedir.keep") != "true")
+PIN := if(pinOption, "pinned:", "")
+CLEANUP_ENABLED := !configProperty("keepSiteDir")
DEBUG_DIR := "{SWIFT:DEBUG_DIR_PREFIX}{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}.d"
CDM_FILE := cdm:file()
-namespace(swift) {
+namespace(providerStaging) {
fileSizes := function(files) {
math:sum(
@@ -31,12 +30,6 @@
)
}
- export(cleanups,
- function(cleanup) {
- log(LOG:INFO, "START cleanups={cleanup}")
- }
- )
-
readErrorFiles := function(dir, jobid, stdout, stderr) {
concat(
if(file:exists("{dir}/{jobid}.error")) {
@@ -59,9 +52,8 @@
}
export(execute2,
- function(rhost, progress, tr, stagein, stageout,
- replicationGroup, replicationChannel,
- arguments = [], stdin = null, stdout = null, stderr = null, attributes = null) {
+ function(rhost, progress, tr, arguments, attributes, stdin, stdout, stderr,
+ stagein, stageout, replicationGroup, replicationChannel) {
uid := UID()
jobdir := substring(uid, 0, to=1)
Modified: trunk/libexec/swift-int-wrapper-staging.k
===================================================================
--- trunk/libexec/swift-int-wrapper-staging.k 2014-07-09 19:37:52 UTC (rev 7974)
+++ trunk/libexec/swift-int-wrapper-staging.k 2014-07-09 19:38:06 UTC (rev 7975)
@@ -2,28 +2,17 @@
import(task)
import('swift-lib')
+URL_PREFIX := getURLPrefix()
-getURLPrefix := def("org.griphyn.vdl.karajan.lib.GetURLPrefix")
+WRAPPER_LOG_ALWAYS_TRANSFER := configProperty("alwaysTransferWrapperLog")
+SWIFT:SCRIPT_NAME := contextAttribute("SWIFT:SCRIPT_NAME")
+SWIFT:RUN_ID := contextAttribute("SWIFT:RUN_ID")
+SWIFT:HOME := contextAttribute("SWIFT:HOME")
+SITEDIR_KEEP := configProperty("keepSiteDir")
-URL_PREFIX := getURLPrefix()
-WRAPPERLOG_ALWAYS_TRANSFER := vdl:configProperty("wrapperlog.always.transfer")
-SITEDIR_KEEP := vdl:configProperty("sitedir.keep")
-
-
-namespace(swift) {
+namespace(wrapperStaging) {
- checkJobStatus := function(jobdir, jobid, tr) {
- log(LOG:DEBUG, "START jobid={jobid}")
- try {
- file:remove("{jobdir}/_swift.success")
- log(LOG:INFO, "SUCCESS jobid={jobid} - Success file found")
- }
- else {
- throw(checkErrorFile(jobdir, jobid))
- }
- }
-
checkErrorFile := function(jobdir, jobid) {
if (file:exists("{jobdir}/_swift.error")) {
log(LOG:INFO, "FAILURE jobid={jobid} - Failure file found")
@@ -36,6 +25,17 @@
throw("No status file was found")
}
}
+
+ checkJobStatus := function(jobdir, jobid, tr) {
+ log(LOG:DEBUG, "START jobid={jobid}")
+ try {
+ file:remove("{jobdir}/_swift.success")
+ log(LOG:INFO, "SUCCESS jobid={jobid} - Success file found")
+ }
+ else {
+ throw(checkErrorFile(jobdir, jobid))
+ }
+ }
initSharedDir := function(progress, rhost) {
once(list(rhost, "shared")) {
@@ -43,9 +43,9 @@
log(LOG:INFO, "START host={rhost} - Initializing shared directory")
- wfdir := "{VDL:SCRIPTNAME}-{VDL:RUNID}"
+ wfdir := "{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}"
dir:make(wfdir, host=rhost)
- transfer(srcdir="{swift.home}/libexec/", srcfile="_swiftwrap.wrapperstaging", destdir=wfdir, desthost=rhost)
+ transfer(srcdir="{SWIFT:HOME}/libexec/", srcfile="_swiftwrap.wrapperstaging", destdir=wfdir, desthost=rhost)
wfdir
to(cleanup, list(wfdir, rhost))
@@ -64,48 +64,14 @@
ddir
}
- cleanup := function(dir, host) {
- log(LOG:INFO, "START dir={dir} host={host}")
- if(vdl:configProperty("sitedir.keep") == "false") {
- task:execute(
- vdl:siteprofile(host, "cleanupCommand"),
- arguments = list(
- siteProfile(host, "cleanupCommandOptions"),
- dir
- )
- host=host, batch=true, TCProfile(host)
- )
- log(LOG:INFO, "END dir={dir} host={host}")
- }
- }
+ stageWrapperParams := function(jobid, jobdir, wrapfile, dir, host) {
+ log(LOG:INFO, "START jobid={jobid} - staging in wrapper params"),
+ (provider, srchost, destdir, filename, srcdir) := splitFileURL(wrapfile, dir, destdir="parameters/{jobdir}")
- cleanups := function(cleanup) {
- log(LOG:INFO, "START cleanups={cleanup}")
- parallelFor(i, cleanup) {
- (dir, host) := each(i)
- try {
- vdl:cleanup(dir, host)
- }
- else catch(exception) {
- log(LOG:DEBUG, "EXCEPTION - Exception caught while cleaning up", exception)
- to(warnings, exception("Cleanup on {host} failed", exception))
- }
+ cache(list(destdir, host)) {
+ dir:make(destdir, host=host, provider=provider)
}
- log(LOG:INFO, "END cleanups={cleanup}")
- }
- stageWrapperParams := function(jobid, wrapfile, dir, host) {
- log(LOG:INFO, "START jobid={jobid} - staging in wrapper params")
- provider := provider(wrapfile)
- srchost := hostname(wrapfile)
- srcdir := vdl:dirname(wrapfile)
- destdir := dir
- filename := basename(wrapfile)
-
- cacheOn(list(destdir, host)
- dir:make(destdir, host=host, provider=provider)
- )
-
log(LOG:INFO, "END jobid={jobid}")
}
@@ -129,9 +95,8 @@
export(execute2,
- function(rhost, progress, tr, stagein, stageout,
- replicationGroup, replicationChannel,
- arguments = [], stdin = null, stdout = null, stderr = null, attributes = null) {
+ function(rhost, progress, tr, arguments, attributes, stdin, stdout, stderr,
+ stagein, stageout, replicationGroup, replicationChannel) {
ddir := initDDir()
wfdir := try {
@@ -146,10 +111,10 @@
jobdir := concat(ddir, "/jobs/", substring(uid, from=0, to=1), "/{jobid}/")
- log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread={#thread} host={rhost} replicationGroup={replicationGroup}")
+ log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread=", currentThread(), " host={rhost} replicationGroup={replicationGroup}")
- statusMode := configProperty("status.mode", host = rhost)
- wrapperMode := configProperty("wrapper.parameter.mode", host = rhost)
+ statusMode := configProperty("statusMode", host = rhost)
+ wrapperMode := configProperty("wrapperParameterMode", host = rhost)
wrapfile := "{jobdir}/_paramfile"
@@ -163,22 +128,22 @@
scratch := siteProfile(rhost, "scratch")
if(wrapperMode == "files") {
- file:write(wrapfile,
- "-e ",vdl:executable(tr, rhost),
+ file:write(wrapfile) {
+ "-e ", executable(tr, rhost),
"\n-out ", stdout,
"\n-err ", stderr,
"\n-i ", if (stdin != null, getFieldValue(stdin)),
"\n-d ", str:join(remoteFileDirs, "|"),
"\n-if ", str:join(remoteFileNames(inFiles), "|"),
"\n-of ", str:join(remoteFileNames(outFiles), "|"),
- "\n-wt", WRAPPERLOG_ALWAYS_TRANSFER,
+ "\n-wt", WRAPPER_LOG_ALWAYS_TRANSFER,
"\n-sk", SITEDIR_KEEP,
"\n-cdmfile ", cdm:file(),
"\n-status ", statusMode,
for(a, arguments) {
"\n-a ", a
}
- )
+ }
}
@@ -186,7 +151,7 @@
try {
if (wrapperMode == "files") {
- stageWrapperParams(jobid, wrapfile, wfdir, rhost)
+ stageWrapperParams(jobid, jobdir, wrapfile, wfdir, rhost)
}
log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", if (arguments != null, (" arguments=", arguments)), " host={rhost}")
@@ -225,14 +190,14 @@
"-urlprefix", URL_PREFIX,
"-jobdir", jobdir,
"-scratch", scratch,
- "-e", vdl:executable(tr, rhost),
+ "-e", executable(tr, rhost),
"-out", stdout,
"-err", stderr,
"-i", if (stdin != null, getFieldValue(stdin)),
"-d", str:join(remoteFileDirs, "|"),
"-if", str:join(remoteFileNames(inFiles), "|"),
"-of", str:join(remoteFileNames(outFiles), "|"),
- "-wt", WRAPPERLOG_ALWAYS_TRANSFER,
+ "-wt", WRAPPER_LOG_ALWAYS_TRANSFER,
"-sk", SITEDIR_KEEP,
"-cdmfile", cdm:file(),
"-status", statusMode,
@@ -265,7 +230,7 @@
setProgress(progress, "Stage out")
- doRestartlog(stageout)
+ doRestartLog(stageout)
log(LOG:DEBUG, "JOB_END jobid={jobid}")
}
@@ -292,7 +257,7 @@
"Exception in {tr}:",
if (arguments != null, "\n Arguments: {arguments}")
"\n Host: {rhost}",
- "\n Directory: {tmpdir}",
+ "\n Directory: {jobdir}",
"{outs}",
)
exception
Modified: trunk/libexec/swift-int.k
===================================================================
--- trunk/libexec/swift-int.k 2014-07-09 19:37:52 UTC (rev 7974)
+++ trunk/libexec/swift-int.k 2014-07-09 19:38:06 UTC (rev 7975)
@@ -14,11 +14,13 @@
SHARED_DIR := dircat(RUN_DIR, "shared")
DEBUG_DIR := "{SWIFT:DEBUG_DIR_PREFIX}{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}.d"
+WRAPPER_LOG_ALWAYS_TRANSFER := configProperty("alwaysTransferWrapperLog")
+
if (!file:exists(DEBUG_DIR)) {
task:dir:make(DEBUG_DIR)
}
-namespace(swift) {
+namespace(swiftStaging) {
rmdir := function(dir, host) {
parallelFor(entry, file:list(dir, host=host)) {
@@ -86,12 +88,12 @@
transfer(siteProfile(rhost, "wrapperScript"), srcdir="{SWIFT:HOME}/libexec/", destdir=SHARED_DIR, desthost=rhost)
transfer("_swiftseq", srcdir="{SWIFT:HOME}/libexec/", destdir=SHARED_DIR, desthost=rhost)
- statusMode := configProperty("status.mode", host=rhost)
+ statusMode := configProperty("statusMode", host=rhost)
if (statusMode == "files") {
dir:make(dircat(RUN_DIR, "status"), host=rhost)
}
- wrapperMode := configProperty("wrapper.parameter.mode", host=rhost)
+ wrapperMode := configProperty("wrapperParameterMode", host=rhost)
if (wrapperMode == "files") {
dir:make(dircat(RUN_DIR, "parameters"), host=rhost)
}
@@ -121,30 +123,6 @@
log(LOG:INFO, "END jobid={jobid} - Done initializing directory structure")
}
- cleanup := function(dir, host) {
- log(LOG:INFO, "START dir={dir} host={host}")
- cdmfile := cdm:file()
- log(LOG:DEBUG, "cdmfile {cdmfile}")
- if (cdmfile != "" & cdm:get("GATHER_DIR") != "UNSET") {
- log(LOG:INFO, "submitting cdm_cleanup.sh to {dir}")
- task:transfer("cdm_cleanup.sh",
- srcdir="{SWIFT:HOME}/libexec",
- desthost=host, destdir=dir)
- task:transfer("cdm_lib.sh",
- srcdir="{SWIFT:HOME}/libexec",
- desthost=host, destdir=dir)
- log(LOG:INFO, "execute: cdm_cleanup.sh")
- task:execute("/bin/bash", list("{dir}/cdm_cleanup.sh", cdm:get("GATHER_DIR"), cdm:get("GATHER_TARGET"), UID())
- host=host, batch=true, TCProfile(host))
- }
- if (swift:configProperty("sitedir.keep") == "false") {
- task:execute(siteProfile(host, "cleanupCommand"),
- list(siteProfile(host, "cleanupCommandOptions"), dir)
- host=host, batch=true, TCProfile(host))
- }
- log(LOG:INFO, "END dir={dir} host={host}")
- }
-
cleanupFiles := function(files, host) {
parallelFor(r, files) {
log(LOG:INFO, "Purging ", r, " on ", host)
@@ -313,28 +291,10 @@
}
recfile
}
-
- export(cleanups,
- function(cleanup) {
- log(LOG:INFO, "START cleanups={cleanup}")
- parallelFor(i, cleanup) {
- (dir, host) := each(i)
- try {
- cleanup(dir, host)
- }
- else catch(exception) {
- log(LOG:DEBUG, "EXCEPTION - Exception caught while cleaning up", exception)
- to(warnings, exception("Cleanup on {host} failed", exception))
- }
- }
- log(LOG:INFO, "END cleanups={cleanup}")
- }
- )
export(execute2,
- function(rhost, progress, tr, stagein, stageout,
- replicationGroup, replicationChannel,
- arguments = [], stdin = null, stdout = null, stderr = null, attributes = null) {
+ function(rhost, progress, tr, arguments, attributes, stdin, stdout, stderr,
+ stagein, stageout, replicationGroup, replicationChannel) {
try {
initSharedDir(progress, rhost)
@@ -349,8 +309,8 @@
log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread=", currentThread(), " host={rhost} replicationGroup={replicationGroup}")
- statusMode := configProperty("status.mode",host=rhost)
- wrapperMode := configProperty("wrapper.parameter.mode",host=rhost)
+ statusMode := configProperty("statusMode", host=rhost)
+ wrapperMode := configProperty("wrapperParameterMode", host=rhost)
wrapfile := "{DEBUG_DIR}/param-{jobid}"
@@ -469,7 +429,7 @@
doStageoutCollect(jobid, SHARED_DIR, rhost, outFiles)
}
- if (configProperty("wrapperlog.always.transfer") == "true") {
+ if (WRAPPER_LOG_ALWAYS_TRANSFER) {
discard(transferWrapperLog(rhost, jobid, jobdir))
}
Modified: trunk/libexec/swift-lib.k
===================================================================
--- trunk/libexec/swift-lib.k 2014-07-09 19:37:52 UTC (rev 7974)
+++ trunk/libexec/swift-lib.k 2014-07-09 19:38:06 UTC (rev 7975)
@@ -139,6 +139,8 @@
export(unwrapClosedList, def("org.griphyn.vdl.karajan.lib.UnwrapClosedList"))
export(siteCatalog, def("org.griphyn.vdl.karajan.lib.SiteCatalog"))
+
+ export(getURLPrefix, def("org.griphyn.vdl.karajan.lib.GetURLPrefix"))
}
namespace(cdm) {
Modified: trunk/libexec/swift.k
===================================================================
--- trunk/libexec/swift.k 2014-07-09 19:37:52 UTC (rev 7974)
+++ trunk/libexec/swift.k 2014-07-09 19:38:06 UTC (rev 7975)
@@ -5,33 +5,69 @@
import('swift-lib', export = true)
import('swift-xs', export = true)
+import('swift-int')
+import('swift-int-staging')
+import('swift-int-wrapper-staging')
+
SWIFT:SCRIPT_NAME := contextAttribute("SWIFT:SCRIPT_NAME")
SWIFT:RUN_ID := contextAttribute("SWIFT:RUN_ID")
SWIFT:HOME := contextAttribute("SWIFT:HOME")
+SITEDIR_KEEP := configProperty("keepSiteDir")
-namespace(swift) {
-
- pstaging := (configProperty("use.provider.staging") == "true")
- wstaging := (configProperty("use.wrapper.staging") == "true")
-
- impl := if (pstaging, "swift-int-staging.k", if (wstaging, "swift-int-wrapper-staging.k", "swift-int.k"))
-
- import(file = impl)
-
- import(java)
-
+namespace(swift) {
export(stageIn, def("org.griphyn.vdl.karajan.lib.Stagein"))
export(stageOut, def("org.griphyn.vdl.karajan.lib.Stageout"))
export(parameterLog,
function(direction, variable, id) {
- if (configProperty("provenance.log") == "true") {
+ if (configProperty("logProvenance")) {
thread := currentThread()
log("info", "PARAM thread={thread} direction={direction} variable={variable} provenanceid={id}")
}
}
)
+ cleanup := function(dir, host) {
+ log(LOG:INFO, "START dir={dir} host={host}")
+ cdmfile := cdm:file()
+ log(LOG:DEBUG, "cdmfile {cdmfile}")
+ if (cdmfile != "" & cdm:get("GATHER_DIR") != "UNSET") {
+ log(LOG:INFO, "submitting cdm_cleanup.sh to {dir}")
+ task:transfer("cdm_cleanup.sh",
+ srcdir="{SWIFT:HOME}/libexec",
+ desthost=host, destdir=dir)
+ task:transfer("cdm_lib.sh",
+ srcdir="{SWIFT:HOME}/libexec",
+ desthost=host, destdir=dir)
+ log(LOG:INFO, "execute: cdm_cleanup.sh")
+ task:execute("/bin/bash", list("{dir}/cdm_cleanup.sh", cdm:get("GATHER_DIR"), cdm:get("GATHER_TARGET"), UID())
+ host=host, batch=true, TCProfile(host))
+ }
+ if (!SITEDIR_KEEP) {
+ task:execute(siteProfile(host, "cleanupCommand"),
+ list(siteProfile(host, "cleanupCommandOptions"), dir)
+ host=host, batch=true, TCProfile(host))
+ }
+ log(LOG:INFO, "END dir={dir} host={host}")
+ }
+
+ export(cleanups,
+ function(cleanup) {
+ log(LOG:INFO, "START cleanups={cleanup}")
+ parallelFor(i, cleanup) {
+ (dir, host) := each(i)
+ try {
+ cleanup(dir, host)
+ }
+ else catch(exception) {
+ log(LOG:DEBUG, "EXCEPTION - Exception caught while cleaning up", exception)
+ to(warnings, exception("Cleanup on {host} failed", exception))
+ }
+ }
+ log(LOG:INFO, "END cleanups={cleanup}")
+ }
+ )
+
export(split,
function(var) {
each(str:split(getFieldValue(var), " "))
@@ -92,8 +128,27 @@
to(cleanup, unique(for(c, cleanup, c)))
)
}
- )
+ )
+ executeSelect := function(rhost, progress, tr, arguments, attributes,
+ stdin, stdout, stderr, stagein, stageout, replicationGroup, replicationChannel) {
+
+ staging := configProperty("staging", host = rhost)
+
+ if (staging == "swift") {
+ swiftStaging:execute2(rhost, progress, tr, arguments, attributes, stdin, stdout, stderr,
+ stagein, stageout, replicationGroup, replicationChannel)
+ }
+ else if (staging == "provider") {
+ providerStaging:execute2(rhost, progress, tr, arguments, attributes, stdin, stdout, stderr,
+ stagein, stageout, replicationGroup, replicationChannel)
+ }
+ else {
+ wrapperStaging:execute2(rhost, progress, tr, arguments, attributes, stdin, stdout, stderr,
+ stagein, stageout, replicationGroup, replicationChannel)
+ }
+ }
+
export(execute,
function(
tr, arguments = null,
@@ -113,8 +168,8 @@
try {
throttled {
setProgress(progress, "Selecting site")
- collectList := restartOnError(number(swift:configProperty("execution.retries"))) {
- if (swift:configProperty("replication.enabled") == "true") {
+ collectList := restartOnError(swift:configProperty("executionRetries")) {
+ if (swift:configProperty("replicationEnabled")) {
replicationChannel := channel:new()
//trigger the first job
discard(append(replicationChannel, true))
@@ -122,10 +177,11 @@
parallelFor(i, replicationChannel) {
try {
allocateHost(rhost, constraints = jobConstraints(tr, stagein = stagein)) {
- execute2(
+ executeSelect(
rhost, progress,
- tr, if(arguments != null, arguments = unwrapClosedList(arguments)),
- stdin=stdin, stdout=stdout, stderr=stderr, attributes=attributes,
+ tr, if(arguments != null, unwrapClosedList(arguments), []),
+ attributes,
+ stdin, stdout, stderr,
stagein, stageout, replicationGroup, replicationChannel
)
}
@@ -143,10 +199,11 @@
else {
try {
allocateHost(rhost, constraints = jobConstraints(tr, stagein = stagein)) {
- execute2(
+ executeSelect(
rhost, progress,
- tr, if(arguments != null, arguments = unwrapClosedList(arguments)),
- stdin=stdin, stdout=stdout, stderr=stderr, attributes=attributes,
+ tr, if(arguments != null, unwrapClosedList(arguments), []),
+ attributes,
+ stdin, stdout, stderr,
stagein, stageout, null, null
)
}
@@ -169,7 +226,7 @@
else catch(exception) {
log(LOG:INFO, "END_FAILURE thread=", currentThread(), " tr={tr}")
setProgress(progress, "Failed")
- if(swift:configProperty("lazy.errors") == "false") {
+ if(!swift:configProperty("lazyErrors")) {
throw(exception)
}
else {
More information about the Swift-commit mailing list