[Swift-commit] r7851 - trunk/libexec
hategan at ci.uchicago.edu
hategan at ci.uchicago.edu
Sat May 10 13:05:54 CDT 2014
Author: hategan
Date: 2014-05-10 13:05:51 -0500 (Sat, 10 May 2014)
New Revision: 7851
Modified:
trunk/libexec/swift-int-staging.k
trunk/libexec/swift-int-wrapper-staging.k
trunk/libexec/swift-int.k
trunk/libexec/swift-lib.k
trunk/libexec/swift.k
Log:
interned fields; data type reorganization; propagation of lazy failures in internal functions
Modified: trunk/libexec/swift-int-staging.k
===================================================================
--- trunk/libexec/swift-int-staging.k 2014-05-09 14:35:51 UTC (rev 7850)
+++ trunk/libexec/swift-int-staging.k 2014-05-10 18:05:51 UTC (rev 7851)
@@ -20,11 +20,10 @@
PROVENANCE_GRAPH_ENABLED := (configProperty("pgraph") != "false")
CLEANUP_ENABLED := (configProperty("sitedir.keep") != "true")
-namespace(swift) {
+DEBUG_DIR := "{SWIFT:DEBUG_DIR_PREFIX}{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}.d"
+CDM_FILE := cdm:file()
- initDDir := function() {
- "{SWIFT:DEBUG_DIR_PREFIX}{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}.d"
- }
+namespace(swift) {
fileSizes := function(files) {
math:sum(
@@ -60,140 +59,136 @@
}
export(execute2,
- function(progress, tr, stagein, stageout,
+ function(rhost, progress, tr, stagein, stageout,
replicationGroup, replicationChannel,
arguments = [], stdin = null, stdout = null, stderr = null, attributes = null) {
-
- allocateHost(rhost, constraints = jobConstraints(tr, stagein = stagein)) {
- ddir := initDDir()
+ uid := UID()
+ jobdir := substring(uid, 0, to=1)
+ jobid := "{tr}-{uid}"
- uid := UID()
- jobdir := substring(uid, 0, to=1)
- jobid := "{tr}-{uid}"
+ log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread=", currentThread(), " host={rhost} replicationGroup={replicationGroup}")
+
+ wfdir := "{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}"
+ tmpdir := dircat("{wfdir}/jobs/{jobdir}", jobid),
+
+ (fileDirs, inFiles, outFiles, outCollect) := getStagingInfo(stagein, stageout)
- log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread=", currentThread(), " host={rhost} replicationGroup={replicationGroup}")
-
- wfdir := "{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}"
- tmpdir := dircat("{wfdir}/jobs/{jobdir}", jobid)
- cdmfile := cdm:file(),
-
- (fileDirs, inFiles, outFiles, outCollect) := getStagingInfo(stagein, stageout)
+ try {
+ log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", if (arguments != null, (" arguments=", arguments)),
+ " tmpdir={tmpdir} host={rhost}")
- try {
- log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", if (arguments != null, (" arguments=", arguments)),
- " tmpdir={tmpdir} host={rhost}")
+ setProgress(progress, "Submitting")
- setProgress(progress, "Submitting")
+ swift:execute(
+ progress,
+ siteProfile(rhost, "swift:wrapperInterpreter"),
+ list(
+ siteProfile(rhost, "swift:wrapperInterpreterOptions"),
+ "_swiftwrap.staging",
+ "-e", executable(tr, rhost),
+ "-out", if(stdout == null, "stdout.txt", getFieldValue(stdout)),
+ "-err", if(stderr == null, "stderr.txt", getFieldValue(stderr)),
+ "-i", if (stdin != null, getFieldValue(stdin)),
+ "-d", str:join(fileDirs, "|"),
+ "-if", str:join(remoteFileNames(inFiles), "|"),
+ "-of", str:join(remoteFileNames(outFiles), "|"),
+ "-cf", str:join(remoteFileNames(outCollect), "|"),
+ "-cdmfile", CDM_FILE,
+ "-status", "provider"
+ "-a", if (arguments != null, each(arguments))
+ )
+ directory = tmpdir
+ redirect = false
+ host = rhost
- swift:execute(
- progress,
- siteProfile(rhost, "swift:wrapperInterpreter"),
- list(
- siteProfile(rhost, "swift:wrapperInterpreterOptions"),
- "_swiftwrap.staging",
- "-e", executable(tr, rhost),
- "-out", if(stdout == null, "stdout.txt", getFieldValue(stdout)),
- "-err", if(stderr == null, "stderr.txt", getFieldValue(stderr)),
- "-i", if (stdin != null, getFieldValue(stdin)),
- "-d", str:join(fileDirs, "|"),
- "-if", str:join(remoteFileNames(inFiles), "|"),
- "-of", str:join(remoteFileNames(outFiles), "|"),
- "-cf", str:join(remoteFileNames(outCollect), "|"),
- "-cdmfile", cdmfile,
- "-status", "provider"
- "-a", if (arguments != null, each(arguments))
- )
- directory = tmpdir
- redirect = false
- host = rhost
+ TCProfile(rhost, if (attributes != null, attributes = attributes), tr = tr)
+ replicationGroup = replicationGroup
+ replicationChannel = replicationChannel
+ jobid = jobid
+
+ stagingMethod := siteProfile(rhost, "swift:stagingMethod", default="proxy")
- TCProfile(rhost, if (attributes != null, attributes = attributes), tr = tr)
- replicationGroup = replicationGroup
- replicationChannel = replicationChannel
- jobid = jobid
-
- stagingMethod := siteProfile(rhost, "swift:stagingMethod", default="proxy")
+ stageIn("{PIN}{stagingMethod}://localhost/{SWIFT:HOME}/libexec/_swiftwrap.staging", "_swiftwrap.staging")
- stageIn("{PIN}{stagingMethod}://localhost/{SWIFT:HOME}/libexec/_swiftwrap.staging", "_swiftwrap.staging")
+ if (CDM_FILE != "") {
+ d := swift:dirname(CDM_FILE)
+ file := basename(CDM_FILE)
+ dir := if (d == "", "./", "{d}/")
+ loc := "{PIN}{stagingMethod}://localhost/"
+ stageIn("{loc}{dir}{file}", CDM_FILE)
+ stageIn("{loc}{SWIFT:HOME}/libexec/cdm.pl", "cdm.pl")
+ stageIn("{loc}{SWIFT:HOME}/libexec/cdm_lib.sh", "cdm_lib.sh")
+ }
- if (cdmfile != "") {
- d := swift:dirname(cdmfile)
- file := basename(cdmfile)
- dir := if (d == "", "./", "{d}/")
- loc := "{PIN}{stagingMethod}://localhost/"
- stageIn("{loc}{dir}{file}", cdmfile)
- stageIn("{loc}{SWIFT:HOME}/libexec/cdm.pl", "cdm.pl")
- stageIn("{loc}{SWIFT:HOME}/libexec/cdm_lib.sh", "cdm_lib.sh")
- }
+ appStageins(jobid, inFiles, stagingMethod)
- appStageins(jobid, inFiles, stagingMethod)
-
- stageOut("wrapper.log", "{stagingMethod}://localhost/{ddir}/{jobid}.info",
- mode = WRAPPER_TRANSFER_MODE)
-
- if (stdout == null) {
- // if not stdout requested, only stage on error
- stageOut("stdout.txt", "{stagingMethod}://localhost/{ddir}/{jobid}.stdout",
- mode = STAGING_MODE:ON_ERROR + STAGING_MODE:IF_PRESENT)
- }
- else {
- stageOut("{stdout}", "{stagingMethod}://localhost/{ddir}/{stdout}",
- mode = STAGING_MODE:IF_PRESENT)
- }
- if (stderr == null) {
- stageOut("stderr.txt", "{stagingMethod}://localhost/{ddir}/{jobid}.stderr",
- mode = STAGING_MODE:ON_ERROR + STAGING_MODE:IF_PRESENT)
- }
- else {
- stageOut("{stderr}", "{stagingMethod}://localhost/{ddir}/{stderr}",
- mode = STAGING_MODE:IF_PRESENT)
- }
- stageOut("wrapper.error", "{stagingMethod}://localhost/{ddir}/{jobid}.error",
- mode = STAGING_MODE:IF_PRESENT)
- appStageouts(jobid, outFiles, outCollect, stagingMethod)
- if (!isEmpty(outCollect)) {
- stageOut("_collect", "{stagingMethod}://localhost/{ddir}/{jobid}.collect",
- mode = STAGING_MODE:ALWAYS)
- }
-
- if (CLEANUP_ENABLED) {
- task:cleanUp(".")
- }
- )
+ stageOut("wrapper.log", "{stagingMethod}://localhost/{DEBUG_DIR}/{jobid}.info",
+ mode = WRAPPER_TRANSFER_MODE)
-
- if (!isEmpty(outCollect)) {
- readCollectList("{ddir}/{jobid}.collect")
+ if (stdout == null) {
+ // if not stdout requested, only stage on error
+ stageOut("stdout.txt", "{stagingMethod}://localhost/{DEBUG_DIR}/{jobid}.stdout",
+ mode = STAGING_MODE:ON_ERROR + STAGING_MODE:IF_PRESENT)
}
else {
- []
+ stdoutf := getFieldValue(stdout)
+ stageOut(stdoutf, "{stagingMethod}://localhost/{DEBUG_DIR}/{stdoutf}",
+ mode = STAGING_MODE:IF_PRESENT)
}
-
- log(LOG:DEBUG, "JOB_END jobid={jobid}")
- }
- else catch(prev) {
- if (matches(prev, "^Abort$")) {
- log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
- throw(prev)
+ if (stderr == null) {
+ stageOut("stderr.txt", "{stagingMethod}://localhost/{DEBUG_DIR}/{jobid}.stderr",
+ mode = STAGING_MODE:ON_ERROR + STAGING_MODE:IF_PRESENT)
}
else {
- setProgress(progress, "Failed but can retry")
- exception := try(exception(readErrorFiles(ddir, jobid, stdout, stderr), prev), prev)
- log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
+ stderrf := getFieldValue(stderr)
+ stageOut(stderrf, "{stagingMethod}://localhost/{DEBUG_DIR}/{stderrf}",
+ mode = STAGING_MODE:IF_PRESENT)
+ }
+ stageOut("wrapper.error", "{stagingMethod}://localhost/{DEBUG_DIR}/{jobid}.error",
+ mode = STAGING_MODE:IF_PRESENT)
+ appStageouts(jobid, outFiles, outCollect, stagingMethod)
+ if (!isEmpty(outCollect)) {
+ stageOut("_collect", "{stagingMethod}://localhost/{DEBUG_DIR}/{jobid}.collect",
+ mode = STAGING_MODE:ALWAYS)
+ }
- throw(
- exception(
- concat(
- "Exception in {tr}:",
- if (arguments != null, "\n Arguments: {arguments}")
- "\n Host: {rhost}",
- "\n Directory: {tmpdir}"
- )
- exception
+ if (CLEANUP_ENABLED) {
+ task:cleanUp(".")
+ }
+ )
+
+
+ if (!isEmpty(outCollect)) {
+ readCollectList("{DEBUG_DIR}/{jobid}.collect")
+ }
+ else {
+ []
+ }
+
+ log(LOG:DEBUG, "JOB_END jobid={jobid}")
+ }
+ else catch(prev) {
+ if (matches(prev, "^Abort$")) {
+ log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
+ throw(prev)
+ }
+ else {
+ setProgress(progress, "Failed but can retry")
+ exception := try(exception(readErrorFiles(DEBUG_DIR, jobid, stdout, stderr), prev), prev)
+ log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
+
+ throw(
+ exception(
+ concat(
+ "Exception in {tr}:",
+ if (arguments != null, "\n Arguments: {arguments}")
+ "\n Host: {rhost}",
+ "\n Directory: {tmpdir}"
)
+ exception
)
- }
+ )
}
}
}
Modified: trunk/libexec/swift-int-wrapper-staging.k
===================================================================
--- trunk/libexec/swift-int-wrapper-staging.k 2014-05-09 14:35:51 UTC (rev 7850)
+++ trunk/libexec/swift-int-wrapper-staging.k 2014-05-10 18:05:51 UTC (rev 7851)
@@ -129,178 +129,175 @@
export(execute2,
- function(progress, tr, stagein, stageout,
+ function(rhost, progress, tr, stagein, stageout,
replicationGroup, replicationChannel,
arguments = [], stdin = null, stdout = null, stderr = null, attributes = null) {
- allocateHost(rhost, constraints=jobConstraints(tr, stagein = stagein)) {
+ ddir := initDDir()
+ wfdir := try {
+ initSharedDir(rhost)
+ }
+ else catch(exception) {
+ throw(exception("Could not initialize shared directory on {rhost}", exception))
+ }
- ddir := initDDir()
- wfdir := try {
- initSharedDir(rhost)
- }
- else catch(exception) {
- throw(exception("Could not initialize shared directory on {rhost}", exception))
- }
+ uid := UID()
+ jobid := "{tr}-{uid}"
+
+ jobdir := concat(ddir, "/jobs/", substring(uid, from=0, to=1), "/{jobid}/")
- uid := UID()
- jobid := "{tr}-{uid}"
-
- jobdir := concat(ddir, "/jobs/", substring(uid, from=0, to=1), "/{jobid}/")
+ log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread={#thread} host={rhost} replicationGroup={replicationGroup}")
- log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread={#thread} host={rhost} replicationGroup={replicationGroup}")
+ statusMode := configProperty("status.mode", host = rhost)
+ wrapperMode := configProperty("wrapper.parameter.mode", host = rhost)
- statusMode := configProperty("status.mode", host = rhost)
- wrapperMode := configProperty("wrapper.parameter.mode", host = rhost)
+ wrapfile := "{jobdir}/_paramfile"
- wrapfile := "{jobdir}/_paramfile"
+ stdout := if (stdout == null, "stdout.txt", getFieldValue(stdout))
+ stderr := if (stderr == null, "stderr.txt", getFieldValue(stderr)),
+
+ (localFileDirs, remoteFileDirs, inFiles, outFiles) := getStagingInfo(stagein, stageout)
- stdout := if (stdout == null, "stdout.txt", getFieldValue(stdout))
- stderr := if (stderr == null, "stderr.txt", getFieldValue(stderr)),
-
- (localFileDirs, remoteFileDirs, inFiles, outFiles) := getStagingInfo(stagein, stageout)
+ os := siteProfile(rhost, "SYSINFO:OS")
+
+ scratch := siteProfile(rhost, "scratch")
- os := siteProfile(rhost, "SYSINFO:OS")
-
- scratch := siteProfile(rhost, "scratch")
+ if(wrapperMode == "files") {
+ file:write(wrapfile,
+ "-e ",vdl:executable(tr, rhost),
+ "\n-out ", stdout,
+ "\n-err ", stderr,
+ "\n-i ", if (stdin != null, getFieldValue(stdin)),
+ "\n-d ", str:join(remoteFileDirs, "|"),
+ "\n-if ", str:join(remoteFileNames(inFiles), "|"),
+ "\n-of ", str:join(remoteFileNames(outFiles), "|"),
+ "\n-wt", WRAPPERLOG_ALWAYS_TRANSFER,
+ "\n-sk", SITEDIR_KEEP,
+ "\n-cdmfile ", cdm:file(),
+ "\n-status ", statusMode,
+ for(a, arguments) {
+ "\n-a ", a
+ }
+ )
+ }
+
- if(wrapperMode == "files") {
- file:write(wrapfile,
- "-e ",vdl:executable(tr, rhost),
- "\n-out ", stdout,
- "\n-err ", stderr,
- "\n-i ", if (stdin != null, getFieldValue(stdin)),
- "\n-d ", str:join(remoteFileDirs, "|"),
- "\n-if ", str:join(remoteFileNames(inFiles), "|"),
- "\n-of ", str:join(remoteFileNames(outFiles), "|"),
- "\n-wt", WRAPPERLOG_ALWAYS_TRANSFER,
- "\n-sk", SITEDIR_KEEP,
- "\n-cdmfile ", cdm:file(),
- "\n-status ", statusMode,
- for(a, arguments) {
- "\n-a ", a
- }
- )
+ setProgress(progress, "Stage in")
+
+ try {
+ if (wrapperMode == "files") {
+ stageWrapperParams(jobid, wrapfile, wfdir, rhost)
}
-
- setProgress(progress, "Stage in")
+ log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", if (arguments != null, (" arguments=", arguments)), " host={rhost}")
- try {
- if (wrapperMode == "files") {
- stageWrapperParams(jobid, wrapfile, wfdir, rhost)
- }
+ setProgress(progress, "Submitting")
- log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", if (arguments != null, (" arguments=", arguments)), " host={rhost}")
-
- setProgress(progress, "Submitting")
-
- if (wrapperMode == "files") {
- swift:execute(
- progress,
- siteProfile(rhost, "swift:wrapperInterpreter"),
- list(
- siteProfile(rhost, "swift:wrapperInterpreterOptions"),
- "_swiftwrap.wrapperstaging",
- jobid,
- "-urlprefix", URL_PREFIX,
- "-jobdir", jobdir,
- "-scratch", scratch,
- "-p", wrapfile
- )
- directory=wfdir
- redirect=false
- host=rhost
- TCProfile(rhost, if (attributes != null, attributes = attributes), tr=tr)
- replicationGroup=replicationGroup
- replicationChannel=replicationChannel
- jobid=jobid
+ if (wrapperMode == "files") {
+ swift:execute(
+ progress,
+ siteProfile(rhost, "swift:wrapperInterpreter"),
+ list(
+ siteProfile(rhost, "swift:wrapperInterpreterOptions"),
+ "_swiftwrap.wrapperstaging",
+ jobid,
+ "-urlprefix", URL_PREFIX,
+ "-jobdir", jobdir,
+ "-scratch", scratch,
+ "-p", wrapfile
)
- }
- if (wrapperMode == "args") {
- swift:execute(
- siteProfile(rhost, "swift:wrapperInterpreter"),
- list(
- siteProfile(rhost, "swift:wrapperInterpreterOptions"),
- "_swiftwrap.wrapperstaging",
- jobid,
- "-urlprefix", URL_PREFIX,
- "-jobdir", jobdir,
- "-scratch", scratch,
- "-e", vdl:executable(tr, rhost),
- "-out", stdout,
- "-err", stderr,
- "-i", if (stdin != null, getFieldValue(stdin)),
- "-d", str:join(remoteFileDirs, "|"),
- "-if", str:join(remoteFileNames(inFiles), "|"),
- "-of", str:join(remoteFileNames(outFiles), "|"),
- "-wt", WRAPPERLOG_ALWAYS_TRANSFER,
- "-sk", SITEDIR_KEEP,
- "-cdmfile", cdm:file(),
- "-status", statusMode,
- "-a", if (arguments != null, each(arguments))
- )
- directory=wfdir
- redirect=false
- host=rhost
- TCProfile(rhost, if(attributes != null, attributes = attributes), tr=tr)
- replicationGroup=replicationGroup
- replicationChannel=replicationChannel
- jobid=jobid
+ directory=wfdir
+ redirect=false
+ host=rhost
+ TCProfile(rhost, if (attributes != null, attributes = attributes), tr=tr)
+ replicationGroup=replicationGroup
+ replicationChannel=replicationChannel
+ jobid=jobid
+ )
+ }
+ if (wrapperMode == "args") {
+ swift:execute(
+ siteProfile(rhost, "swift:wrapperInterpreter"),
+ list(
+ siteProfile(rhost, "swift:wrapperInterpreterOptions"),
+ "_swiftwrap.wrapperstaging",
+ jobid,
+ "-urlprefix", URL_PREFIX,
+ "-jobdir", jobdir,
+ "-scratch", scratch,
+ "-e", vdl:executable(tr, rhost),
+ "-out", stdout,
+ "-err", stderr,
+ "-i", if (stdin != null, getFieldValue(stdin)),
+ "-d", str:join(remoteFileDirs, "|"),
+ "-if", str:join(remoteFileNames(inFiles), "|"),
+ "-of", str:join(remoteFileNames(outFiles), "|"),
+ "-wt", WRAPPERLOG_ALWAYS_TRANSFER,
+ "-sk", SITEDIR_KEEP,
+ "-cdmfile", cdm:file(),
+ "-status", statusMode,
+ "-a", if (arguments != null, each(arguments))
)
- }
-
- setProgress(progress, "Checking status")
- if (statusMode == "files") {
- checkJobStatus(jobdir, jobid, tr)
- }
+ directory=wfdir
+ redirect=false
+ host=rhost
+ TCProfile(rhost, if(attributes != null, attributes = attributes), tr=tr)
+ replicationGroup=replicationGroup
+ replicationChannel=replicationChannel
+ jobid=jobid
+ )
+ }
+
+ setProgress(progress, "Checking status")
+ if (statusMode == "files") {
+ checkJobStatus(jobdir, jobid, tr)
+ }
- if (wrapperMode == "files") {
- file:remove(wrapfile)
- }
+ if (wrapperMode == "files") {
+ file:remove(wrapfile)
+ }
- log(LOG:DEBUG, "STAGING_OUT jobid={jobid}")
+ log(LOG:DEBUG, "STAGING_OUT jobid={jobid}")
- /* need to stage the files to upper scratch area in case they are not transfered to another site
- before all the files get cleaned out */
+ /* need to stage the files to upper scratch area in case they are not transferred to another site
+ before all the files get cleaned out */
- setProgress(progress, "Stage out")
- doRestartlog(stageout)
-
- log(LOG:DEBUG, "JOB_END jobid={jobid}")
+ setProgress(progress, "Stage out")
+ doRestartlog(stageout)
+
+ log(LOG:DEBUG, "JOB_END jobid={jobid}")
+ }
+ else catch(prev) {
+ if (matches(prev, "^Abort$")) {
+ log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
+ throw(prev)
}
- else catch(prev) {
- if (matches(prev, "^Abort$")) {
- log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
- throw(prev)
+ else {
+ setProgress(progress, "Failed but can retry")
+ exception := try(exception(checkErrorFile(jobdir, jobid)), prev)
+
+ log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
+
+ if (matches(exception,".*executable bit.*")) {
+ throw(exception)
}
- else {
- setProgress(progress, "Failed but can retry")
- exception := try(exception(checkErrorFile(jobdir, jobid)), prev)
- log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
-
- if (matches(exception,".*executable bit.*")) {
- throw(exception)
- }
-
- outs := readStandardFiles(jobdir, stdout, stderr)
-
- throw(
- exception(
- concat(
- "Exception in {tr}:",
- if (arguments != null, "\n Arguments: {arguments}")
- "\n Host: {rhost}",
- "\n Directory: {tmpdir}",
- "{outs}",
- )
- exception
+ outs := readStandardFiles(jobdir, stdout, stderr)
+
+ throw(
+ exception(
+ concat(
+ "Exception in {tr}:",
+ if (arguments != null, "\n Arguments: {arguments}")
+ "\n Host: {rhost}",
+ "\n Directory: {tmpdir}",
+ "{outs}",
)
+ exception
)
- }
+ )
}
}
}
Modified: trunk/libexec/swift-int.k
===================================================================
--- trunk/libexec/swift-int.k 2014-05-09 14:35:51 UTC (rev 7850)
+++ trunk/libexec/swift-int.k 2014-05-10 18:05:51 UTC (rev 7851)
@@ -10,6 +10,14 @@
SWIFT:HOME := contextAttribute("SWIFT:HOME")
SWIFT:DEBUG_DIR_PREFIX := contextAttribute("SWIFT:DEBUG_DIR_PREFIX")
+RUN_DIR := "{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}"
+SHARED_DIR := dircat(RUN_DIR, "shared")
+DEBUG_DIR := "{SWIFT:DEBUG_DIR_PREFIX}{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}.d"
+
+if (!file:exists(DEBUG_DIR)) {
+ task:dir:make(DEBUG_DIR)
+}
+
namespace(swift) {
rmdir := function(dir, host) {
@@ -32,12 +40,12 @@
dir:make(dc, host=host)
}
- checkErrorFile := function(rhost, wfdir, jobid, jobdir) {
- if (file:exists("{wfdir}/status/{jobdir}/{jobid}-error", host=rhost)) {
+ checkErrorFile := function(rhost, jobid, jobdir) {
+ if (file:exists("{RUN_DIR}/status/{jobdir}/{jobid}-error", host=rhost)) {
log(LOG:INFO, "FAILURE jobid={jobid} - Failure file found")
- task:transfer("{jobid}-error", srchost=rhost, srcdir="{wfdir}/status/{jobdir}")
+ task:transfer("{jobid}-error", srchost=rhost, srcdir="{RUN_DIR}/status/{jobdir}")
error := parallel(
- file:remove("{wfdir}/status/{jobdir}/{jobid}-error", host=rhost)
+ file:remove("{RUN_DIR}/status/{jobdir}/{jobid}-error", host=rhost)
sequential(
str:strip(file:read("{jobid}-error"))
file:remove("{jobid}-error")
@@ -51,15 +59,15 @@
}
}
- checkJobStatus := function(rhost, wfdir, jobid, tr, jobdir) {
+ checkJobStatus := function(rhost, jobid, tr, jobdir) {
log(LOG:DEBUG, "START jobid={jobid}")
try {
- file:remove("{wfdir}/status/{jobdir}/{jobid}-success", host=rhost)
+ file:remove("{RUN_DIR}/status/{jobdir}/{jobid}-success", host=rhost)
log(LOG:INFO, "SUCCESS jobid={jobid} - Success file found")
}
else {
msg := try {
- checkErrorFile(rhost, wfdir, jobid, jobdir)
+ checkErrorFile(rhost, jobid, jobdir)
}
else {
log(LOG:INFO, "NO_STATUS_FILE jobid={jobid} - Both status files are missing")
@@ -74,43 +82,29 @@
setProgress(progress, "Initializing site shared directory")
log(LOG:INFO, "START host={rhost} - Initializing shared directory")
- wfdir := "{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}"
- sharedDir := dircat(wfdir, "shared")
+ dir:make(SHARED_DIR, host = rhost)
+ transfer(siteProfile(rhost, "swift:wrapperScript"), srcdir="{SWIFT:HOME}/libexec/", destdir=SHARED_DIR, desthost=rhost)
+ transfer("_swiftseq", srcdir="{SWIFT:HOME}/libexec/", destdir=SHARED_DIR, desthost=rhost)
+ dir:make(dircat(RUN_DIR, "kickstart"), host=rhost)
- dir:make(sharedDir, host = rhost)
- transfer(siteProfile(rhost, "swift:wrapperScript"), srcdir="{SWIFT:HOME}/libexec/", destdir=sharedDir, desthost=rhost)
- transfer("_swiftseq", srcdir="{SWIFT:HOME}/libexec/", destdir=sharedDir, desthost=rhost)
- dir:make(dircat(wfdir, "kickstart"), host=rhost)
-
- statusMode := configProperty("status.mode",host=rhost)
+ statusMode := configProperty("status.mode", host=rhost)
if (statusMode == "files") {
- dir:make(dircat(wfdir, "status"), host=rhost)
+ dir:make(dircat(RUN_DIR, "status"), host=rhost)
}
- wrapperMode := configProperty("wrapper.parameter.mode",host=rhost)
+ wrapperMode := configProperty("wrapper.parameter.mode", host=rhost)
if (wrapperMode == "files") {
- dir:make(dircat(wfdir, "parameters"), host=rhost)
+ dir:make(dircat(RUN_DIR, "parameters"), host=rhost)
}
- dir:make(dircat(wfdir, "info"), host=rhost)
- wfdir, sharedDir
+ dir:make(dircat(RUN_DIR, "info"), host=rhost)
+
//we send the cleanup data to vdl:main()
- to(cleanup, list(wfdir, rhost))
+ to(cleanup, list(RUN_DIR, rhost))
log(LOG:INFO, "END host={rhost} - Done initializing shared directory")
}
}
- initDDir := function() {
- ddir := "{SWIFT:DEBUG_DIR_PREFIX}{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}.d"
-
- once(ddir) {
- if(!file:exists(ddir)) {
- task:dir:make(ddir)
- }
- }
- ddir
- }
-
createDirSet := function(jobid, destdir, host, dirs) {
/*
* Ideally this would be done by creating a tree of the directories
@@ -308,11 +302,11 @@
}
- transferWrapperLog := function(rhost, wfdir, jobid, jobdir) {
+ transferWrapperLog := function(rhost, jobid, jobdir) {
recfile := "{jobid}-info"
- srcdir := dircat("{wfdir}/info/", jobdir)
+ srcdir := dircat("{RUN_DIR}/info/", jobdir)
try {
- task:transfer(recfile, srchost=rhost, srcdir=srcdir, destdir="{SWIFT:DEBUG_DIR_PREFIX}{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}.d/")
+ task:transfer(recfile, srchost=rhost, srcdir=srcdir, destdir=DEBUG_DIR)
}
else catch (exception) {
maybe(file:remove(recfile))
@@ -339,196 +333,192 @@
)
export(execute2,
- function(progress, tr, stagein, stageout,
+ function(rhost, progress, tr, stagein, stageout,
replicationGroup, replicationChannel,
arguments = [], stdin = null, stdout = null, stderr = null, attributes = null) {
-
- allocateHost(rhost, constraints = jobConstraints(tr, stagein = stagein)) {
- ddir := initDDir(),
- (wfdir, sharedDir) :=
- try {
- initSharedDir(progress, rhost)
- }
- else catch(exception) {
- throw(exception("Could not initialize shared directory on {rhost}", exception))
- }
-
- uid := UID()
- jobdir := substring(uid, 0, to=1)
- jobid := "{tr}-{uid}"
- log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread=", currentThread(), " host={rhost} replicationGroup={replicationGroup}")
+ try {
+ initSharedDir(progress, rhost)
+ }
+ else catch(exception) {
+ throw(exception("Could not initialize shared directory on {rhost}", exception))
+ }
+
+ uid := UID()
+ jobdir := substring(uid, 0, to=1)
+ jobid := "{tr}-{uid}"
- statusMode := configProperty("status.mode",host=rhost)
- wrapperMode := configProperty("wrapper.parameter.mode",host=rhost)
+ log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread=", currentThread(), " host={rhost} replicationGroup={replicationGroup}")
- wrapfile := "{ddir}/param-{jobid}"
+ statusMode := configProperty("status.mode",host=rhost)
+ wrapperMode := configProperty("wrapper.parameter.mode",host=rhost)
- stdout := if (stdout == null, "stdout.txt", getFieldValue(stdout))
- stderr := if (stderr == null, "stderr.txt", getFieldValue(stderr)),
-
- (fileDirs, inFiles, outFiles, outCollect) := getStagingInfo(stagein, stageout)
+ wrapfile := "{DEBUG_DIR}/param-{jobid}"
- os := siteProfile(rhost, "SYSINFO:OS")
-
- if(wrapperMode == "files") {
- sys:file:write(wrapfile) {
- "-scratch ", try(siteProfile(rhost, "scratch"), ""),
- "\n-e ", executable(tr, rhost),
- "\n-out ", stdout,
- "\n-err ", stderr,
- "\n-i ", if (stdin != null, getFieldValue(stdin)),
- "\n-d ", str:join(fileDirs, "|"),
- "\n-if ", str:join(remoteFileNames(inFiles), "|"),
- "\n-of ", str:join(remoteFileNames(outFiles), "|"),
- "\n-cf ", str:join(remoteFileNames(outCollect), "|"),
- "\n-cdmfile ", cdm:file(),
- "\n-status ", statusMode,
- for(a, arguments) {
- "\n-a ", a
- }
+ stdout := if (stdout == null, "stdout.txt", getFieldValue(stdout))
+ stderr := if (stderr == null, "stderr.txt", getFieldValue(stderr)),
+
+ (fileDirs, inFiles, outFiles, outCollect) := getStagingInfo(stagein, stageout)
+
+ os := siteProfile(rhost, "SYSINFO:OS")
+
+ if (wrapperMode == "files") {
+ sys:file:write(wrapfile) {
+ "-scratch ", try(siteProfile(rhost, "scratch"), ""),
+ "\n-e ", executable(tr, rhost),
+ "\n-out ", stdout,
+ "\n-err ", stderr,
+ "\n-i ", if (stdin != null, getFieldValue(stdin)),
+ "\n-d ", str:join(fileDirs, "|"),
+ "\n-if ", str:join(remoteFileNames(inFiles), "|"),
+ "\n-of ", str:join(remoteFileNames(outFiles), "|"),
+ "\n-cf ", str:join(remoteFileNames(outCollect), "|"),
+ "\n-cdmfile ", cdm:file(),
+ "\n-status ", statusMode,
+ for(a, arguments) {
+ "\n-a ", a
}
}
+ }
- setProgress(progress, "Stage in")
- tmpdir := dircat("{wfdir}/jobs/{jobdir}", jobid)
+ setProgress(progress, "Stage in")
+ tmpdir := dircat("{RUN_DIR}/jobs/{jobdir}", jobid)
- try {
- createDirSet(jobid, sharedDir, rhost, fileDirs)
- doStagein(jobid, sharedDir, rhost, inFiles)
- if(wrapperMode == "files") {
- stageWrapperParams(jobid, jobdir, wrapfile, wfdir, rhost)
- }
+ try {
+ createDirSet(jobid, SHARED_DIR, rhost, fileDirs)
+ doStagein(jobid, SHARED_DIR, rhost, inFiles)
+ if (wrapperMode == "files") {
+ stageWrapperParams(jobid, jobdir, wrapfile, RUN_DIR, rhost)
+ }
- log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", if (arguments != null, (" arguments=", arguments)), " tmpdir={tmpdir} host={rhost}")
+ log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", if (arguments != null, (" arguments=", arguments)), " tmpdir={tmpdir} host={rhost}")
- setProgress(progress, "Submitting")
+ setProgress(progress, "Submitting")
- if (wrapperMode == "files") {
- swift:execute(
- progress,
- siteProfile(rhost, "swift:wrapperInterpreter"),
- list(
- siteProfile(rhost, "swift:wrapperInterpreterOptions"),
- dircat("shared", siteProfile(rhost, "swift:wrapperScript"), os=os),
- jobid, "-p", jobdir
- )
- directory = wfdir
- redirect = false
- host = rhost
- TCProfile(rhost, if (attributes != null, attributes = attributes), tr = tr)
- replicationGroup = replicationGroup
- replicationChannel = replicationChannel
- jobid = jobid
+ if (wrapperMode == "files") {
+ swift:execute(
+ progress,
+ siteProfile(rhost, "swift:wrapperInterpreter"),
+ list(
+ siteProfile(rhost, "swift:wrapperInterpreterOptions"),
+ dircat("shared", siteProfile(rhost, "swift:wrapperScript"), os=os),
+ jobid, "-p", jobdir
)
- }
- else if (wrapperMode == "args") {
- swift:execute(
- progress,
- siteProfile(rhost, "swift:wrapperInterpreter"),
- list(
- siteProfile(rhost, "swift:wrapperInterpreterOptions"),
- dircat("shared", siteProfile(rhost, "swift:wrapperScript"), os=os),
- jobid,
- "-jobdir", jobdir,
- "-scratch", try(siteProfile(rhost, "scratch"), "")
- "-e", executable(tr, rhost),
- "-out", stdout,
- "-err", stderr,
- "-i", if (stdin != null, getFieldValue(stdin)),
- "-d", str:join(fileDirs, "|"),
- "-if", str:join(remoteFileNames(inFiles), "|"),
- "-of", str:join(remoteFileNames(outFiles), "|"),
- "-cf", str:join(remoteFileNames(outCollect), "|"),
- "-cdmfile", cdm:file(),
- "-status", statusMode,
- "-a", if (arguments != null, each(arguments))
- )
- directory = wfdir
- redirect = false
- host = rhost
- TCProfile(rhost, if(attributes != null, attributes = attributes), tr = tr)
- replicationGroup = replicationGroup
- replicationChannel = replicationChannel
- jobid = jobid
+ directory = RUN_DIR
+ redirect = false
+ host = rhost
+ TCProfile(rhost, if (attributes != null, attributes = attributes), tr = tr)
+ replicationGroup = replicationGroup
+ replicationChannel = replicationChannel
+ jobid = jobid
+ )
+ }
+ else if (wrapperMode == "args") {
+ swift:execute(
+ progress,
+ siteProfile(rhost, "swift:wrapperInterpreter"),
+ list(
+ siteProfile(rhost, "swift:wrapperInterpreterOptions"),
+ dircat("shared", siteProfile(rhost, "swift:wrapperScript"), os=os),
+ jobid,
+ "-jobdir", jobdir,
+ "-scratch", try(siteProfile(rhost, "scratch"), "")
+ "-e", executable(tr, rhost),
+ "-out", stdout,
+ "-err", stderr,
+ "-i", if (stdin != null, getFieldValue(stdin)),
+ "-d", str:join(fileDirs, "|"),
+ "-if", str:join(remoteFileNames(inFiles), "|"),
+ "-of", str:join(remoteFileNames(outFiles), "|"),
+ "-cf", str:join(remoteFileNames(outCollect), "|"),
+ "-cdmfile", cdm:file(),
+ "-status", statusMode,
+ "-a", if (arguments != null, each(arguments))
)
- }
+ directory = RUN_DIR
+ redirect = false
+ host = rhost
+ TCProfile(rhost, if(attributes != null, attributes = attributes), tr = tr)
+ replicationGroup = replicationGroup
+ replicationChannel = replicationChannel
+ jobid = jobid
+ )
+ }
- setProgress(progress, "Checking status")
- if (statusMode == "files") {
- checkJobStatus(rhost, wfdir, jobid, tr, jobdir)
- }
+ setProgress(progress, "Checking status")
+ if (statusMode == "files") {
+ checkJobStatus(rhost, jobid, tr, jobdir)
+ }
- if (wrapperMode == "files") {
- file:remove(wrapfile)
- }
+ if (wrapperMode == "files") {
+ file:remove(wrapfile)
+ }
- log(LOG:DEBUG, "STAGING_OUT jobid={jobid}")
+ log(LOG:DEBUG, "STAGING_OUT jobid={jobid}")
- /* need to stage the files to upper scratch area in case they are not transfered to another site
- before all the files get cleaned out */
+ /* need to stage the files to upper scratch area in case they are not transferred to another site
+ before all the files get cleaned out */
- setProgress(progress, "Stage out")
- if (isEmpty(outCollect)) {
- doStageout(jobid, sharedDir, rhost, outFiles)
- [] // empty collect list
- }
- else {
- doStageoutCollect(jobid, sharedDir, rhost, outFiles)
- }
+ setProgress(progress, "Stage out")
+ if (isEmpty(outCollect)) {
+ doStageout(jobid, SHARED_DIR, rhost, outFiles)
+ [] // empty collect list
+ }
+ else {
+ doStageoutCollect(jobid, SHARED_DIR, rhost, outFiles)
+ }
+
+ if (configProperty("wrapperlog.always.transfer") == "true") {
+ discard(transferWrapperLog(rhost, jobid, jobdir)) // takes (rhost, jobid, jobdir); the run directory is read from RUN_DIR inside
+ }
+
+ cacheUnlockFiles(inFiles, SHARED_DIR, rhost) {
+ cleanupFiles(cacheFilesToRemove, rhost)
+ }
- if (configProperty("wrapperlog.always.transfer") == "true") {
- discard(transferWrapperLog(rhost, wfdir, jobid, jobdir))
- }
-
- cacheUnlockFiles(inFiles, sharedDir, rhost) {
+ log(LOG:DEBUG, "JOB_END jobid={jobid}")
+ }
+ else catch(prev) {
+ if (matches(prev, "^Abort$")) {
+ log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
+ cacheUnlockFiles(inFiles, SHARED_DIR, rhost, force=false) {
cleanupFiles(cacheFilesToRemove, rhost)
}
-
- log(LOG:DEBUG, "JOB_END jobid={jobid}")
+ throw(prev)
}
- else catch(prev) {
- if (matches(prev, "^Abort$")) {
- log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
- cacheUnlockFiles(inFiles, sharedDir, rhost, force=false) {
- cleanupFiles(cacheFilesToRemove, rhost)
- }
- throw(prev)
+ else {
+ setProgress(progress, "Failed but can retry")
+ exception := try(exception(checkErrorFile(rhost, jobid, jobdir), prev), prev)
+
+ log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
+
+ if (matches(exception,".*executable bit.*")) {
+ throw(exception)
}
- else {
- setProgress(progress, "Failed but can retry")
- exception := try(exception(checkErrorFile(rhost, wfdir, jobid, jobdir), prev), prev)
-
- log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
- if (matches(exception,".*executable bit.*")) {
- throw(exception)
- }
+ cacheUnlockFiles(inFiles, SHARED_DIR, rhost, force=false) {
+ cleanupFiles(cacheFilesToRemove, rhost)
+ }
- cacheUnlockFiles(inFiles, sharedDir, rhost, force=false) {
- cleanupFiles(cacheFilesToRemove, rhost)
- }
+ outs := transferStandardFiles(rhost, tmpdir, jobid, stdout, stderr)
- outs := transferStandardFiles(rhost, tmpdir, jobid, stdout, stderr)
+ discard(maybe(transferWrapperLog(rhost, jobid, jobdir)))
- discard(maybe(transferWrapperLog(rhost, wfdir, jobid, jobdir)))
-
-
- throw(
- exception(
- concat(
- "Exception in {tr}:",
- if (arguments != null, "\n Arguments: {arguments}")
- "\n Host: {rhost}",
- "\n Directory: {tmpdir}",
- "{outs}",
- )
- exception
+
+ throw(
+ exception(
+ concat(
+ "Exception in {tr}:",
+ if (arguments != null, "\n Arguments: {arguments}")
+ "\n Host: {rhost}",
+ "\n Directory: {tmpdir}",
+ "{outs}",
)
+ exception
)
- }
+ )
}
}
}
Modified: trunk/libexec/swift-lib.k
===================================================================
--- trunk/libexec/swift-lib.k 2014-05-09 14:35:51 UTC (rev 7850)
+++ trunk/libexec/swift-lib.k 2014-05-10 18:05:51 UTC (rev 7851)
@@ -52,6 +52,7 @@
export(initProgressState, def("org.griphyn.vdl.karajan.lib.RuntimeStats$InitProgressState"))
export(setProgress, def("org.griphyn.vdl.karajan.lib.RuntimeStats$SetProgress"))
export(new, def("org.griphyn.vdl.karajan.lib.New"))
+ export(field, def("org.griphyn.vdl.karajan.lib.GetFieldConst"))
export(createArray, def("org.griphyn.vdl.karajan.lib.CreateArray"))
/* used from VDL2 for arguments to apps and returns relative paths */
export(fileName, def("org.griphyn.vdl.karajan.lib.FileName"))
Modified: trunk/libexec/swift.k
===================================================================
--- trunk/libexec/swift.k 2014-05-09 14:35:51 UTC (rev 7850)
+++ trunk/libexec/swift.k 2014-05-10 18:05:51 UTC (rev 7851)
@@ -179,7 +179,7 @@
attributes = null,
deperror = false, mdeperror = false,
channel(stagein), channel(stageout)) {
-
+
progress := initProgressState()
done := isDone(stageout)
@@ -199,12 +199,14 @@
replicationGroup := UID()
parallelFor(i, replicationChannel) {
try {
- execute2(
- progress,
- tr, if(arguments != null, arguments = unwrapClosedList(arguments)),
- stdin=stdin, stdout=stdout, stderr=stderr, attributes=attributes,
- stagein, stageout, replicationGroup, replicationChannel
- )
+ allocateHost(rhost, constraints = jobConstraints(tr, stagein = stagein)) {
+ execute2(
+ rhost, progress,
+ tr, if(arguments != null, arguments = unwrapClosedList(arguments)),
+ stdin=stdin, stdout=stdout, stderr=stderr, attributes=attributes,
+ stagein, stageout, replicationGroup, replicationChannel
+ )
+ }
}
else catch(exception) {
if (matches(exception, "^Abort$")) {
@@ -218,12 +220,14 @@
}
else {
try {
- execute2(
- progress,
- tr, if(arguments != null, arguments = unwrapClosedList(arguments)),
- stdin=stdin, stdout=stdout, stderr=stderr, attributes=attributes,
- stagein, stageout, null, null
- )
+ allocateHost(rhost, constraints = jobConstraints(tr, stagein = stagein)) {
+ execute2(
+ rhost, progress,
+ tr, if(arguments != null, arguments = unwrapClosedList(arguments)),
+ stdin=stdin, stdout=stdout, stderr=stderr, attributes=attributes,
+ stagein, stageout, null, null
+ )
+ }
}
else catch(exception) {
if (matches(exception, "^Abort$")) {
More information about the Swift-commit
mailing list