[Swift-commit] r6187 - branches/faster/libexec
hategan at ci.uchicago.edu
hategan at ci.uchicago.edu
Tue Jan 29 23:28:48 CST 2013
Author: hategan
Date: 2013-01-29 23:28:47 -0600 (Tue, 29 Jan 2013)
New Revision: 6187
Added:
branches/faster/libexec/swift-int-staging.k
branches/faster/libexec/swift-int-wrapper-staging.k
Removed:
branches/faster/libexec/vdl-int-staging.k
branches/faster/libexec/vdl-int-wrapper-staging.k
Modified:
branches/faster/libexec/swift-int.k
branches/faster/libexec/swift.k
Log:
selectable swift-int-xxx
Copied: branches/faster/libexec/swift-int-staging.k (from rev 6170, branches/faster/libexec/vdl-int-staging.k)
===================================================================
--- branches/faster/libexec/swift-int-staging.k (rev 0)
+++ branches/faster/libexec/swift-int-staging.k 2013-01-30 05:28:47 UTC (rev 6187)
@@ -0,0 +1,171 @@
+import(sys)
+import(task)
+import('swift-lib')
+/*
+ * Things that are not exposed to the translated file
+ */
+
+
+WRAPPER_TRANSFER_MODE :=
+ if (vdl:configProperty("wrapperlog.always.transfer") == "true",
+ STAGING_MODE:IF_PRESENT, STAGING_MODE:ON_ERROR + STAGING_MODE:IF_PRESENT) // stage-out mode applied to wrapper.log in execute2
+
+pinOption := configProperty("provider.staging.pin.swiftfiles")
+
+PIN := if(pinOption == "true", "pinned:", "") // prefix put in front of the staging URLs of the wrapper and CDM files
+PROVENANCE_GRAPH_ENABLED := (vdl:configProperty("pgraph") != "false") // NOTE(review): not referenced anywhere else in this file - confirm it is still needed
+
+namespace(swift) {
+
+ initDDir := function() { // yields the name of the local per-run directory, creating it when it does not exist
+ ddir := "{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}"
+
+ once(ddir) { // keyed on the directory name; presumably limits the check/create to one execution per run - confirm once() semantics
+ if(!file:exists(ddir)) {
+ task:dir:make(ddir)
+ }
+ }
+ ddir
+ }
+
+ inFiles := function(stageins) { // thin wrapper: forwards the stage-in list to pathnames()
+ pathnames(stageins)
+ }
+
+ fileSizes := function(files) { // sum of file:size() over every entry of files
+ math:sum(
+ for(f, files, file:size(f))
+ )
+ }
+
+ export(cleanups, // exported cleanup hook for this staging mode
+ function(cleanup) {
+ log(LOG:INFO, "START cleanups={cleanup}") // only logs the received cleanup data; no cleanup work is done in this implementation
+ }
+ )
+
+ readErrorFile := function(dir, jobid) { // yields the stripped contents of {dir}/{jobid}.error, then deletes that file
+ str:strip(file:read("{dir}/{jobid}.error"))
+ file:remove("{dir}/{jobid}.error") // NOTE(review): assumes file:remove() yields no value of its own - confirm
+ }
+
+ export(execute2, // provider-staging job runner: allocates a host, submits the wrapper with stage-in/stage-out directives, rethrows failures with job context
+ function(progress, tr, stagein, stageout, restartout
+ replicationGroup, replicationChannel
+ arguments = [], stdin = null, stdout = null, stderr = null, attributes = null) {
+
+ stagein := list(unique(each(stagein))) // de-duplicate stage-in entries
+ stageout := list(unique(each(stageout))) // de-duplicate stage-out entries
+
+ allocateHost(rhost, constraints = jobConstraints(tr, stagein = stagein)) {
+
+ ddir := initDDir()
+
+ uid := UID()
+ jobdir := substring(uid, from=0, to=1) // first character of the uid, used as a sub-directory under jobs/
+ jobid := "{tr}-{uid}"
+
+ log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread=", currentThread(), " host={rhost} replicationGroup={replicationGroup}")
+
+ wrapper := "_swiftwrap.staging"
+ wrapfile := "{ddir}/param-{jobid}"
+
+ stdout := if (stdout == null, "stdout.txt", getFieldValue(stdout)) // default file names when the app declares no redirection
+ stderr := if (stderr == null, "stderr.txt", getFieldValue(stderr))
+
+ wfdir := "{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}" // same run-name variables that initDDir uses
+ tmpdir := dircat("{wfdir}/jobs/{jobdir}", jobid) // directory the job is submitted in (passed as directory= below)
+ cdmfile := cdm:file()
+
+ try {
+ log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", if (arguments != null, (" arguments=", arguments)), " tmpdir={tmpdir} host={rhost}")
+
+ setProgress(progress, "Submitting")
+
+ swift:execute(
+ siteProfile(rhost, "swift:wrapperInterpreter"),
+ list(
+ siteProfile(rhost, "swift:wrapperInterpreterOptions"),
+ wrapper,
+ "-e", vdl:executable(tr, rhost),
+ "-out", stdout,
+ "-err", stderr,
+ "-i", if (stdin != null, getFieldValue(stdin)),
+ "-d", flatten(unique(outFileDirs(stageout))),
+ "-if", flatten(inFiles(stagein)), // helper is declared as inFiles in this file
+ "-of", flatten(outFiles(stageout)),
+ "-k",
+ "-cdmfile", cdmfile,
+ "-status", "provider"
+ "-a", if (arguments != null, each(arguments))
+ )
+ directory = tmpdir
+ redirect = false
+ host = rhost
+
+ TCProfile(rhost, if (attributes != null, attributes = attributes), tr = tr)
+ replicationGroup = replicationGroup
+ replicationChannel = replicationChannel
+ jobid = jobid
+
+ stagingMethod := siteProfile(rhost, "swift:stagingMethod", default="proxy")
+
+ stageIn("{PIN}{stagingMethod}://localhost/{swift.home}/libexec/{wrapper}", wrapper) // the wrapper script itself is staged in with the job
+
+ if (cdmfile != "") { // CDM policy file plus its helper scripts are staged only when a CDM file is configured
+ d := dirname(cdmfile)
+ file := basename(cdmfile)
+ dir := if (d == "", "./", str:concat(d,"/"))
+ loc := "{PIN}{stagingMethod}://localhost/"
+ stageIn("{loc}{dir}{file}", cdmfile)
+ stageIn("{loc}{swift.home}/libexec/cdm.pl", "cdm.pl")
+ stageIn("{loc}{swift.home}/libexec/cdm_lib.sh", "cdm_lib.sh")
+ }
+
+ appStageins(jobid, stagein, ".", stagingMethod)
+
+ stageOut("wrapper.log", "{stagingMethod}://localhost/{ddir}/{jobid}.info",
+ mode = WRAPPER_TRANSFER_MODE)
+ //stageOut("{stdout}", "{stagingMethod}://localhost/{ddir}/{stdout}")
+ //stageOut("{stderr}", "{stagingMethod}://localhost/{ddir}/{stderr}")
+ stageOut("wrapper.error", "{stagingMethod}://localhost/{ddir}/{jobid}.error",
+ mode = STAGING_MODE:IF_PRESENT) // this is the file readErrorFile() looks for after a failure
+ appStageouts(jobid, stageout, ".", stagingMethod)
+
+ task:cleanUp(".") //the whole job directory
+ ) // execute
+ doRestartlog(restartout)
+ log(LOG:DEBUG, "JOB_END jobid={jobid}")
+ }
+ else catch(prev) {
+ if (matches(prev, "^Abort$")) { // cancellation: rethrow unchanged, no retry bookkeeping
+ log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
+ throw(prev)
+ }
+ else {
+ setProgress(progress, "Failed but can retry")
+ exception := try(exception(readErrorFile(ddir, jobid)), prev) // prefer the staged-out wrapper error text; fall back to the caught exception
+ log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
+
+ throw(
+ exception(
+ concat(
+ "Exception in {tr}:",
+ if (arguments != null, "\n Arguments: {arguments}")
+ "\n Host: {rhost}",
+ "\n Directory: {tmpdir}"
+ )
+ exception
+ )
+ )
+ }
+ }
+ }
+ }
+ )
+}
+
+// Local variables:
+// tab-width: 4
+// indent-tabs-mode: 1
+// End:
Copied: branches/faster/libexec/swift-int-wrapper-staging.k (from rev 6169, branches/faster/libexec/vdl-int-wrapper-staging.k)
===================================================================
--- branches/faster/libexec/swift-int-wrapper-staging.k (rev 0)
+++ branches/faster/libexec/swift-int-wrapper-staging.k 2013-01-30 05:28:47 UTC (rev 6187)
@@ -0,0 +1,329 @@
+import(sys)
+import(task)
+import('swift-lib')
+
+
+getURLPrefix := def("org.griphyn.vdl.karajan.lib.GetURLPrefix") // binds the Java class as a callable element
+
+URL_PREFIX := getURLPrefix() // evaluated once at load time; passed to the wrapper as -urlprefix
+
+WRAPPERLOG_ALWAYS_TRANSFER := vdl:configProperty("wrapperlog.always.transfer") // forwarded to the wrapper as -wt
+SITEDIR_KEEP := vdl:configProperty("sitedir.keep") // forwarded to the wrapper as -sk
+
+
+namespace(swift) {
+
+ checkJobStatus := function(jobdir, jobid, tr) { // succeeds when the success marker exists; otherwise throws what checkErrorFile() reports
+ log(LOG:DEBUG, "START jobid={jobid}")
+ try {
+ file:remove("{jobdir}/_swift.success") // detection and removal in one step: relies on file:remove() failing when the file is absent
+ log(LOG:INFO, "SUCCESS jobid={jobid} - Success file found")
+ }
+ else {
+ throw(checkErrorFile(jobdir, jobid))
+ }
+ }
+
+ checkErrorFile := function(jobdir, jobid) { // yields the stripped text of _swift.error (deleting the file), or throws when it is missing
+ if (file:exists("{jobdir}/_swift.error")) {
+ log(LOG:INFO, "FAILURE jobid={jobid} - Failure file found")
+ error := str:strip(file:read("{jobdir}/_swift.error"))
+ file:remove("{jobdir}/_swift.error")
+ error
+ }
+ else {
+ log(LOG:INFO, "NO_STATUS_FILE jobid={jobid} - Both status files are missing")
+ throw("No status file was found")
+ }
+ }
+
+ initSharedDir := function(progress, rhost) { // creates the run directory on rhost and copies the wrapper script into it; yields the directory name
+ once(list(rhost, "shared")) { // keyed per host
+ setProgress(progress, "Initializing site shared directory")
+
+ log(LOG:INFO, "START host={rhost} - Initializing shared directory")
+
+ wfdir := "{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}" // same run-name variables that initDDir uses
+ dir:make(wfdir, host=rhost)
+ transfer(srcdir="{swift.home}/libexec/", srcfile="_swiftwrap.wrapperstaging", destdir=wfdir, desthost=rhost)
+
+ wfdir // value handed back to the caller
+ to(cleanup, list(wfdir, rhost)) // queue (directory, host) on the cleanup channel
+ log(LOG:INFO, "END host={rhost} - Done initializing shared directory")
+ }
+ }
+
+ initDDir := function() { // yields the name of the local per-run directory, creating it when it does not exist
+ ddir := "{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}"
+
+ once(ddir) { // keyed on the directory name; presumably limits the check/create to one execution per run - confirm once() semantics
+ if(!file:exists(ddir)) {
+ task:dir:make(ddir)
+ }
+ }
+ ddir
+ }
+
+
+ inFiles := function(stageins) { // thin wrapper: forwards the stage-in list to pathnames()
+ pathnames(stageins)
+ }
+
+ fileDirs := function(stageins, stageouts) { // list of the distinct directories used by stage-in and stage-out files
+ list(
+ unique(
+ inFileDirs(stageins)
+ outFileDirs(stageouts)
+ )
+ )
+ }
+
+ cleanup := function(dir, host) { // runs the site's cleanup command on dir at host, but only when sitedir.keep is "false"
+ log(LOG:INFO, "START dir={dir} host={host}")
+ if(vdl:configProperty("sitedir.keep") == "false") {
+ task:execute(
+ siteProfile(host, "swift:cleanupCommand"),
+ arguments = list(
+ siteProfile(host, "swift:cleanupCommandOptions"),
+ dir
+ )
+ host=host, batch=true, TCProfile(host)
+ )
+ }
+ log(LOG:INFO, "END dir={dir} host={host}") // logged even when nothing is removed, so every START has a matching END
+ }
+
+ cleanups := function(cleanup) { // runs the per-directory cleanup for every (dir, host) pair in parallel; failures become warnings
+ log(LOG:INFO, "START cleanups={cleanup}")
+ parallelFor(i, cleanup) {
+ (dir, host) := each(i)
+ try {
+ vdl:cleanup(dir, host) // NOTE(review): qualified with vdl: although this file declares namespace(swift), and the parameter named cleanup shadows the function - confirm this resolves
+ }
+ else catch(exception) {
+ log(LOG:DEBUG, "EXCEPTION - Exception caught while cleaning up", exception)
+ to(warnings, exception("Cleanup on {host} failed", exception)) // reported on the warnings channel instead of being rethrown
+ }
+ }
+ log(LOG:INFO, "END cleanups={cleanup}")
+ }
+
+ stageWrapperParams := function(jobid, wrapfile, dir, host) { // despite the name, the visible body only ensures dir exists on host
+ log(LOG:INFO, "START jobid={jobid} - staging in wrapper params")
+ provider := provider(wrapfile)
+ srchost := hostname(wrapfile) // not used below
+ srcdir := vdl:dirname(wrapfile) // not used below
+ destdir := dir
+ filename := basename(wrapfile) // not used below
+
+ cacheOn(list(destdir, host) // keyed on (directory, host); presumably avoids repeating the dir:make - confirm cacheOn() semantics
+ dir:make(destdir, host=host, provider=provider)
+ )
+
+ log(LOG:INFO, "END jobid={jobid}")
+ }
+
+
+ readStandardFiles := function(jobdir, stdout, stderr) { // builds one string with the job's stderr and stdout contents for error messages
+ concat(
+ for(f, [["_swift.stderr", stderr], ["_swift.stdout", stdout]]) { // pairs of (label, file name); stderr is emitted first
+ (name, file) := each(f)
+ destfile := "{jobdir}/{file}"
+ "\n {name}: "
+ try {
+ file:read(destfile)
+ "\n"
+ }
+ else {
+ "" // unreadable or missing file: keep the label, contribute no content
+ }
+ }
+ )
+ }
+
+
+ export(execute2, // wrapper-staging job runner: prepares the site directory, submits the wrapper (parameters via file or arguments), checks status, rethrows failures with job context
+ function(progress, tr, stagein, stageout, restartout
+ replicationGroup, replicationChannel
+ arguments = [], stdin = null, stdout = null, stderr = null, attributes = null) {
+
+ stagein := list(unique(each(stagein))) // de-duplicate stage-in entries
+ stageout := list(unique(each(stageout))) // de-duplicate stage-out entries
+
+ allocateHost(rhost, constraints=jobConstraints(tr, stagein = stagein)) {
+
+ ddir := initDDir()
+ wfdir := try {
+ initSharedDir(progress, rhost) // initSharedDir is declared as (progress, rhost)
+ }
+ else catch(exception) {
+ throw(exception("Could not initialize shared directory on {rhost}", exception))
+ }
+
+ uid := UID()
+ jobid := "{tr}-{uid}"
+
+ jobdir := concat(ddir, "/jobs/", substring(uid, from=0, to=1), "/{jobid}/") // jobs/<first uid character>/<jobid>/ under the run directory
+
+ log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread=", currentThread(), " host={rhost} replicationGroup={replicationGroup}")
+
+ statusMode := configProperty("status.mode", host = rhost)
+ wrapperMode := configProperty("wrapper.parameter.mode", host = rhost) // "files": parameters go through wrapfile; "args": they go on the command line
+
+ wrapfile := "{jobdir}/_paramfile"
+
+ stdout := if (stdout == null, "stdout.txt", getFieldValue(stdout)) // default file names when the app declares no redirection
+ stderr := if (stderr == null, "stderr.txt", getFieldValue(stderr))
+
+ fileDirs := fileDirs(stagein, stageout)
+ os := siteProfile(rhost, "SYSINFO:OS")
+
+ scratch := siteProfile(rhost, "scratch")
+
+ if(wrapperMode == "files") {
+ file:write(wrapfile, // one "-name value" entry per line
+ "-e ",vdl:executable(tr, rhost),
+ "\n-out ", stdout,
+ "\n-err ", stderr,
+ "\n-i ", if (stdin != null, getFieldValue(stdin)),
+ "\n-d ", flatten(each(fileDirs)),
+ "\n-if ", flatten(inFiles(stagein)),
+ "\n-of ", flatten(outFiles(stageout)),
+ "\n-wt ", WRAPPERLOG_ALWAYS_TRANSFER, // trailing space separates the option name from its value, like the other entries
+ "\n-sk ", SITEDIR_KEEP,
+ "\n-cdmfile ", cdm:file(),
+ "\n-status ", statusMode,
+ for(a, arguments) {
+ "\n-a ", a
+ }
+ )
+ }
+
+
+ setProgress(progress, "Stage in")
+
+ try {
+ if (wrapperMode == "files") {
+ stageWrapperParams(jobid, wrapfile, wfdir, rhost)
+ }
+
+ log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", if (arguments != null, (" arguments=", arguments)), " host={rhost}")
+
+ setProgress(progress, "Submitting")
+
+ if (wrapperMode == "files") {
+ swift:execute(
+ siteProfile(rhost, "swift:wrapperInterpreter"),
+ list(
+ siteProfile(rhost, "swift:wrapperInterpreterOptions"),
+ "_swiftwrap.wrapperstaging",
+ jobid,
+ "-urlprefix", URL_PREFIX,
+ "-jobdir", jobdir,
+ "-scratch", scratch,
+ "-p", wrapfile
+ )
+ directory=wfdir
+ redirect=false
+ host=rhost
+ TCProfile(rhost, if (attributes != null, attributes = attributes), tr=tr)
+ replicationGroup=replicationGroup
+ replicationChannel=replicationChannel
+ jobid=jobid
+ )
+ }
+ if (wrapperMode == "args") {
+ swift:execute(
+ siteProfile(rhost, "swift:wrapperInterpreter"),
+ list(
+ siteProfile(rhost, "swift:wrapperInterpreterOptions"),
+ "_swiftwrap.wrapperstaging",
+ jobid,
+ "-urlprefix", URL_PREFIX,
+ "-jobdir", jobdir,
+ "-scratch", scratch,
+ "-e", vdl:executable(tr, rhost),
+ "-out", stdout,
+ "-err", stderr,
+ "-i", if (stdin != null, getFieldValue(stdin)),
+ "-d", flatten(each(fileDirs)),
+ "-if", flatten(inFiles(stagein)),
+ "-of", flatten(outFiles(stageout)),
+ "-wt", WRAPPERLOG_ALWAYS_TRANSFER,
+ "-sk", SITEDIR_KEEP,
+ "-cdmfile", cdm:file(),
+ "-status", statusMode,
+ "-a", if (arguments != null, each(arguments))
+ )
+ directory=wfdir
+ redirect=false
+ host=rhost
+ TCProfile(rhost, if(attributes != null, attributes = attributes), tr=tr)
+ replicationGroup=replicationGroup
+ replicationChannel=replicationChannel
+ jobid=jobid
+ )
+ }
+
+ setProgress(progress, "Checking status")
+ if (statusMode == "files") {
+ checkJobStatus(jobdir, jobid, tr)
+ }
+
+ if (wrapperMode == "files") {
+ file:remove(wrapfile)
+ }
+
+ log(LOG:DEBUG, "STAGING_OUT jobid={jobid}")
+
+
+ /* need to stage the files to upper scratch area in case they are not transfered to another site
+ before all the files get cleaned out */
+
+
+ setProgress(progress, "Stage out")
+ doRestartlog(restartout)
+
+ log(LOG:DEBUG, "JOB_END jobid={jobid}")
+ }
+ else catch(prev) {
+ if (matches(prev, "^Abort$")) { // cancellation: rethrow unchanged, no retry bookkeeping
+ log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
+ throw(prev)
+ }
+ else {
+ setProgress(progress, "Failed but can retry")
+ exception := try(exception(checkErrorFile(jobdir, jobid)), prev) // prefer the wrapper's error file text; fall back to the caught exception
+
+ log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
+
+ if (matches(exception,".*executable bit.*")) { // rethrown as-is, without the Host/Directory/output decoration below
+ throw(exception)
+ }
+
+ outs := readStandardFiles(jobdir, stdout, stderr)
+
+ throw(
+ exception(
+ concat(
+ "Exception in {tr}:",
+ if (arguments != null, "\n Arguments: {arguments}")
+ "\n Host: {rhost}",
+ "\n Directory: {scratch}/{jobid}", // no tmpdir variable exists in this implementation
+ "{outs}",
+ )
+ exception
+ )
+ )
+ }
+ }
+ }
+ }
+ )
+}
+
+// Local variables:
+// mode: scheme
+// tab-width: 4
+// indent-tabs-mode: t
+// End:
Modified: branches/faster/libexec/swift-int.k
===================================================================
--- branches/faster/libexec/swift-int.k 2013-01-30 05:28:14 UTC (rev 6186)
+++ branches/faster/libexec/swift-int.k 2013-01-30 05:28:47 UTC (rev 6187)
@@ -455,7 +455,7 @@
stageWrapperParams(jobid, jobdir, wrapfile, wfdir, rhost)
}
- log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", maybe(" arguments=", arguments), " tmpdir={tmpdir} host={rhost}")
+ log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", if (arguments != null, (" arguments=", arguments)), " tmpdir={tmpdir} host={rhost}")
setProgress(progress, "Submitting")
@@ -501,7 +501,7 @@
directory = wfdir
redirect = false
host = rhost
- TCProfile(rhost, if (attributes != null, attributes = attributes), tr = tr)
+ TCProfile(rhost, if(attributes != null, attributes = attributes), tr = tr)
replicationGroup = replicationGroup
replicationChannel = replicationChannel
jobid = jobid
@@ -569,7 +569,7 @@
exception(
concat(
"Exception in {tr}:",
- maybe("\n Arguments: ", arguments),
+ if (arguments != null, "\n Arguments: {arguments}")
"\n Host: {rhost}",
"\n Directory: {tmpdir}",
"{outs}",
@@ -582,24 +582,6 @@
}
}
)
-
- export(generateProvenanceGraph,
- function(gdata) {
- pgraph := configProperty("pgraph")
- gname := if(pgraph == "true", "{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}.dot", pgraph)
- file:write(gname) {
- "digraph SwiftProvenance \{\n",
- " graph [", configProperty("pgraph.graph.options"), "];\n",
- " node [", configProperty("pgraph.node.options"), "];\n",
-
- for(i, gdata) {
- " ", i, "\n"
- }
- "}\n"
- }
- log(LOG:INFO, "Provenance graph saved in ", gname)
- }
- )
}
// Local variables:
Modified: branches/faster/libexec/swift.k
===================================================================
--- branches/faster/libexec/swift.k 2013-01-30 05:28:14 UTC (rev 6186)
+++ branches/faster/libexec/swift.k 2013-01-30 05:28:47 UTC (rev 6187)
@@ -12,8 +12,8 @@
namespace(swift) {
- pstaging := configProperty("use.provider.staging")
- wstaging := configProperty("use.wrapper.staging")
+ pstaging := (configProperty("use.provider.staging") == "true")
+ wstaging := (configProperty("use.wrapper.staging") == "true")
import(java)
@@ -132,6 +132,53 @@
}
)
+ graphStuff := function(tr, stagein, stageout, err, args = null) { // emits DOT node/edge lines for one app invocation on the graph channel, unless pgraph is "false"
+ if (configProperty("pgraph") != "false") {
+ errprops := if(err, ",color=lightsalmon", ",color=lightsteelblue1") // node colour depends on whether the invocation failed
+ tp := currentThread() // the thread value doubles as the node id of this invocation
+ to (graph) {
+ concat(str:quote(tp), " [label=", str:quote(tr), "{errprops}]")
+ }
+ for (si, stagein) { // one file node and one file -> app edge per stage-in
+ si := basename(si)
+ to(graph) {
+ concat(str:quote(si), " [shape=parallelogram]")
+ concat(str:quote(si), " -> ", str:quote(tp))
+ }
+ }
+ for (pv, stageout) { // one labelled file node and one app -> file edge per stage-out
+ (path, var) := each(pv)
+ file := fileName(getField(var, path=path))
+ file := basename(file)
+ label := niceName(var, path = path)
+ to(graph) {
+ concat(str:quote(file), " [shape=parallelogram,label=", str:quote(label), "]")
+ concat(str:quote(tp), " -> ", str:quote(file))
+ }
+ }
+ }
+ }
+
+ generateProvenanceGraph := function(gdata) { // writes the entries of gdata as the body of a DOT digraph file
+ pgraph := configProperty("pgraph")
+ gname := if(pgraph == "true", "{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}.dot", pgraph) // "true" selects the default file name; any other value is used as the file name itself
+ file:write(gname) {
+ "digraph SwiftProvenance \{\n",
+ " graph [", configProperty("pgraph.graph.options"), "];\n",
+ " node [", configProperty("pgraph.node.options"), "];\n",
+
+ for(i, gdata) {
+ " ", i, "\n"
+ }
+ "}\n"
+ }
+ log(LOG:INFO, "Provenance graph saved in ", gname)
+ }
+
+ impl := if (pstaging, "swift-int-staging.k", if (wstaging, "swift-int-wrapper-staging.k", "swift-int.k")) // provider staging wins when both flags are true; swift-int.k is the default
+
+ import(file = impl) // loads the selected implementation file
+
export(execute,
function(
tr, arguments = null,
Deleted: branches/faster/libexec/vdl-int-staging.k
===================================================================
--- branches/faster/libexec/vdl-int-staging.k 2013-01-30 05:28:14 UTC (rev 6186)
+++ branches/faster/libexec/vdl-int-staging.k 2013-01-30 05:28:47 UTC (rev 6187)
@@ -1,212 +0,0 @@
-import("sys.k")
-import("task.k")
-import("vdl-lib.xml")
-/*
- * Things that are not exposed to the translated file
- */
-
-global(LOG:DEBUG, "debug")
-global(LOG:INFO, "info")
-global(LOG:WARN, "warn")
-global(LOG:ERROR, "error")
-global(LOG:FATAL, "fatal")
-
-global(WRAPPER_TRANSFER_MODE,
- if(vdl:configProperty("wrapperlog.always.transfer") == "true",
- STAGING_MODE:IF_PRESENT, STAGING_MODE:ON_ERROR + STAGING_MODE:IF_PRESENT))
-
-pinOption := configProperty("provider.staging.pin.swiftfiles")
-global(PIN, if(pinOption == "true", "pinned:", ""))
-global(PROVENANCE_GRAPH_ENABLED, vdl:configProperty("pgraph") != "false")
-
-namespace("vdl") {
- export(
- function(initDDir) {
- ddir := "{VDL:SCRIPTNAME}-{VDL:RUNID}.d"
- once(ddir) {
- if(sys:not(file:exists(ddir))) {
- task:dir:make(ddir)
- }
- }
- ddir
- }
-
- function(inFiles, stageins) {
- pathnames(stageins)
- }
-
- function(graphStuff, tr, stagein, stageout, err, optional(args)) {
- if(PROVENANCE_GRAPH_ENABLED) {
- errprops := if(err ",color=lightsalmon" ",color=lightsteelblue1")
- tp := vdl:threadPrefix()
- to(graph) {
- concat(str:quote(tp), " [label=", str:quote(tr), "{errprops}]")
- }
- for(si, stagein) {
- si := basename(si)
- to(graph) {
- concat(str:quote(si), " [shape=parallelogram]")
- concat(str:quote(si), " -> ", str:quote(tp))
- }
- }
- for(pv, stageout) {
- (path, var) := each(pv)
- file := vdl:fileName(vdl:getfield(var, path=path))
- file := basename(file)
- label := vdl:niceName(var, path = path)
- to(graph) {
- concat(str:quote(file), " [shape=parallelogram,label=",
- str:quote(label), "]")
- concat(str:quote(tp), " -> ", str:quote(file))
- }
- }
- }
- }
-
- function(fileSizes, files) {
- math:sum(
- for(f, files, file:size(file))
- )
- }
-
- function(cleanups, cleanup) {
- log(LOG:INFO, "START cleanups={cleanup}")
- }
-
- function(readErrorFile, dir, jobid) {
- str:strip(file:read("{dir}/{jobid}.error"))
- file:remove("{dir}/{jobid}.error")
- }
-
- function(execute2, tr, optional(arguments, stdin, stdout, stderr), stagein, stageout, restartout,
- replicationGroup, replicationChannel) {
-
- stagein := list(unique(each(stagein)))
- stageout := list(unique(each(stageout)))
-
- allocateHost(rhost, constraints=vdl:jobConstraints(tr, stagein=stagein)) {
-
- ddir := initDDir()
-
- uid := uid()
- jobdir := substring(uid, from=0, to=1)
- jobid := "{tr}-{uid}"
-
- log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread=", currentThread(), " host={rhost} replicationGroup={replicationGroup}")
-
- wrapper := "_swiftwrap.staging"
- wrapfile := "{ddir}/param-{jobid}"
-
- stdout := try(getFieldValue(stdout), "stdout.txt")
- stderr := try(getFieldValue(stderr), "stderr.txt")
-
- wfdir := "{VDL:SCRIPTNAME}-{VDL:RUNID}"
- tmpdir := dircat("{wfdir}/jobs/{jobdir}", jobid)
- cdmfile := cdm:file()
-
- try {
- log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", maybe(" arguments=", arguments), " tmpdir={tmpdir} host={rhost}")
-
- vdl:setprogress("Submitting")
-
- vdl:execute(
- vdl:siteprofile(rhost, "swift:wrapperInterpreter"),
- list(
- vdl:siteprofile(rhost, "swift:wrapperInterpreterOptions"),
- wrapper,
- "-e", vdl:executable(tr, rhost),
- "-out", stdout,
- "-err", stderr,
- "-i", maybe(getFieldValue(stdin)),
- "-d", flatten(unique(outFileDirs(stageout))),
- "-if", flatten(infiles(stagein)),
- "-of", flatten(outfiles(stageout)),
- "-k",
- "-cdmfile", cdmfile,
- "-status", "provider"
- "-a", maybe(each(arguments))
- )
- directory = tmpdir
- redirect = false
- host = rhost
-
- vdl:tcprofile(rhost, tr = tr) //this gets various app params from the tc, such as environment, walltime, etc
- replicationGroup = replicationGroup
- replicationChannel = replicationChannel
- jobid = jobid
-
- stagingMethod := vdl:siteProfile(rhost, "swift:stagingMethod", default="proxy")
-
- stageIn("{PIN}{stagingMethod}://localhost/{swift.home}/libexec/{wrapper}", wrapper)
-
- if (cdmfile != "") {
- d := vdl:dirname(cdmfile)
- file := basename(cdmfile)
- dir := if (d == "", "./", str:concat(d,"/"))
- loc := "{PIN}{stagingMethod}://localhost/"
- stageIn("{loc}{dir}{file}", cdmfile)
- stageIn("{loc}{swift.home}/libexec/cdm.pl", "cdm.pl")
- stageIn("{loc}{swift.home}/libexec/cdm_lib.sh", "cdm_lib.sh")
- }
-
- appStageins(jobid, stagein, ".", stagingMethod)
-
- stageOut("wrapper.log", "{stagingMethod}://localhost/{ddir}/{jobid}.info",
- mode = WRAPPER_TRANSFER_MODE)
- //stageOut("{stdout}", "{stagingMethod}://localhost/{ddir}/{stdout}")
- //stageOut("{stderr}", "{stagingMethod}://localhost/{ddir}/{stderr}")
- stageOut("wrapper.error", "{stagingMethod}://localhost/{ddir}/{jobid}.error",
- mode = STAGING_MODE:IF_PRESENT)
- appStageouts(jobid, stageout, ".", stagingMethod)
-
- task:cleanUp(".") //the whole job directory
- ) // execute
- doRestartlog(restartout)
- log(LOG:DEBUG, "JOB_END jobid={jobid}")
- }
- else catch(exception, "^Abort$") {
- log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
- throw(exception)
- }
- else catch(prev, "^(?!Abort$).*") {
- vdl:setprogress("Failed but can retry")
- exception := try(exception(readErrorFile(ddir, jobid)), prev)
- log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
-
- throw(
- exception(
- concat(
- "Exception in {tr}:", nl(),
- maybe(" Arguments: {arguments}", nl()),
- " Host: {rhost}", nl(),
- " Directory: {tmpdir}", nl()
- )
- exception
- )
- )
- }
- }
- }
-
- function(generateProvenanceGraph, gdata) {
- pgraph := vdl:configProperty("pgraph")
- gname := if(pgraph == "true" "{VDL:SCRIPTNAME}-{VDL:RUNID}.dot" pgraph)
- file:write(gname) {
- "digraph SwiftProvenance {{", nl()
- " graph [", vdl:configProperty("pgraph.graph.options"), "];", nl()
- " node [", vdl:configProperty("pgraph.node.options"), "];", nl()
-
- for(i, gdata) {
- " ", i, nl()
- }
- "}", nl()
- }
- log(LOG:INFO, "Provenance graph saved in ", gname)
- }
- )
-}
-
-// Local variables:
-// tab-width: 4
-// indent-tabs-mode: 1
-// End:
Deleted: branches/faster/libexec/vdl-int-wrapper-staging.k
===================================================================
--- branches/faster/libexec/vdl-int-wrapper-staging.k 2013-01-30 05:28:14 UTC (rev 6186)
+++ branches/faster/libexec/vdl-int-wrapper-staging.k 2013-01-30 05:28:47 UTC (rev 6187)
@@ -1,384 +0,0 @@
-import("sys.k")
-import("task.k")
-import("vdl-lib.xml")
-/*
- * Things that are not exposed to the translated file
- */
-
-global(LOG:DEBUG, "debug")
-global(LOG:INFO, "info")
-global(LOG:WARN, "warn")
-global(LOG:ERROR, "error")
-global(LOG:FATAL, "fatal")
-
-global(URL_PREFIX,
- elementDef(getURLPrefix, className="org.griphyn.vdl.karajan.lib.GetURLPrefix")
- getURLPrefix()
-)
-
-global(WRAPPERLOG_ALWAYS_TRANSFER, vdl:configProperty("wrapperlog.always.transfer"))
-global(SITEDIR_KEEP, vdl:configProperty("sitedir.keep"))
-
-
-namespace("vdl"
- export(
- element(checkJobStatus, [jobdir, jobid, tr]
- log(LOG:DEBUG, "START jobid={jobid}")
- try(
- sequential(
- /*
- * This is a bit of optimization, but I'm not completely
- * sure of its correctness. The goal is to both detect
- * the presence of the success file and remove it, all
- * in one operation. It relies on file:remove() throwing
- * an exception if the file is not there.
- */
- file:remove("{jobdir}/_swift.success")
- log(LOG:INFO, "SUCCESS jobid={jobid} - Success file found")
- )
- throw(checkErrorFile(jobdir, jobid))
- )
- )
-
- element(checkErrorFile, [jobdir, jobid]
- if (
- file:exists("{jobdir}/_swift.error") then(
- log(LOG:INFO, "FAILURE jobid={jobid} - Failure file found")
- error := str:strip(file:read("{jobdir}/_swift.error"))
- file:remove("{jobdir}/_swift.error")
- error
- )
- else (
- log(LOG:INFO, "NO_STATUS_FILE jobid={jobid} - Both status files are missing")
- throw("No status file was found")
- )
- )
- )
-
- element(initSharedDir, [rhost]
- once(list(rhost, "shared")
- vdl:setprogress("Initializing site shared directory")
-
- log(LOG:INFO, "START host={rhost} - Initializing shared directory")
-
- wfdir := "{VDL:SCRIPTNAME}-{VDL:RUNID}"
- dir:make(wfdir, host=rhost)
- transfer(srcdir="{swift.home}/libexec/", srcfile="_swiftwrap.wrapperstaging", destdir=wfdir, desthost=rhost)
-
- wfdir
- //we send the cleanup data to vdl:main()
- to(cleanup, list(wfdir, rhost))
- log(LOG:INFO, "END host={rhost} - Done initializing shared directory")
- )
- )
-
- element(initDDir, []
- ddir := "{VDL:SCRIPTNAME}-{VDL:RUNID}.d"
- once(ddir
- if(sys:not(file:exists(ddir))
- task:dir:make(ddir)
- )
- )
- ddir
- )
-
- element(inFiles, [stageins]
- stageins
- )
-
- element(fileDirs, [stageins, stageouts]
- list(
- unique(
- inFileDirs(stageins)
- outFileDirs(stageouts)
- )
- )
- )
-
- element(cleanup, [dir, host]
- log(LOG:INFO, "START dir={dir} host={host}")
- if(vdl:configProperty("sitedir.keep") == "false"
- task:execute(
- vdl:siteprofile(host, "swift:cleanupCommand"),
- arguments=list(
- vdl:siteprofile(host, "swift:cleanupCommandOptions"),
- dir
- )
- host=host, batch=true, tcprofile(host))
- )
- log(LOG:INFO, "END dir={dir} host={host}")
- )
-
- element(cleanups, [cleanup]
- log(LOG:INFO, "START cleanups={cleanup}")
- parallelFor(i, cleanup
- [dir, host] := each(i)
- try(
- vdl:cleanup(dir, host)
- catch(".*",
- log(LOG:DEBUG, "EXCEPTION - Exception caught while cleaning up", exception)
- to(warnings, exception("Cleanup on {host} failed", exception))
- )
- )
- )
- log(LOG:INFO, "END cleanups={cleanup}")
- )
-
- element(stageWrapperParams, [jobid, wrapfile, dir, host]
- log(LOG:INFO, "START jobid={jobid} - staging in wrapper params")
- provider := provider(wrapfile)
- srchost := hostname(wrapfile)
- srcdir := vdl:dirname(wrapfile)
- destdir := dir
- filename := basename(wrapfile)
-
- cacheOn(list(destdir, host)
- dir:make(destdir, host=host, provider=provider)
- )
-
- log(LOG:INFO, "END jobid={jobid}")
- )
-
- element(graphStuff, [tr, stagein, stageout, err, optional(args)]
- if(
- vdl:configProperty("pgraph") != "false" then(
- errprops := if(err ",color=lightsalmon" ",color=lightsteelblue1")
- tp := vdl:threadPrefix()
- to(graph,
- concat(str:quote(tp), " [label=", str:quote(tr), "{errprops}]")
- )
- for(si, stagein
- si := basename(si)
- to(graph
- concat(str:quote(si), " [shape=parallelogram]")
- concat(str:quote(si), " -> ", str:quote(tp))
- )
- )
- for(pv, stageout
- [path, var] := each(pv)
- file := vdl:fileName(vdl:getfield(var, path=path))
- file := basename(file)
- label := vdl:niceName(var, path = path)
- to(graph
- concat(str:quote(file), " [shape=parallelogram,label=",
- str:quote(label), "]")
- concat(str:quote(tp), " -> ", str:quote(file))
- )
- )
- )
- )
- )
-
- element(readStandardFiles, [jobdir, stdout, stderr]
- concat(
- for(f, list(list("_swift.stderr", stderr), list("_swift.stdout", stdout))
- [name, file] := each(f)
- destfile := "{jobdir}/{file}"
- nl()
- "{name}: "
- try(
- file:read(destfile)
- nl()
- )
- )
- )
- )
-
-
- element(execute2, [tr, optional(arguments, stdin, stdout, stderr, attributes), stagein, stageout, restartout,
- replicationGroup, replicationChannel]
- stagein := list(unique(each(stagein)))
- stageout := list(unique(each(stageout)))
-
- allocateHost(rhost, constraints=vdl:jobConstraints(tr, stagein=stagein)
-
- ddir := initDDir()
- wfdir := try(
- initSharedDir(rhost)
- throw(exception("Could not initialize shared directory on {rhost}", exception))
- )
-
- uid := uid()
- jobid := "{tr}-{uid}"
-
- jobdir := concat(ddir, "/jobs/", substring(uid, from=0, to=1), "/{jobid}/")
-
- log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread={#thread} host={rhost} replicationGroup={replicationGroup}")
-
- statusMode := configProperty("status.mode",host=rhost)
- wrapperMode := configProperty("wrapper.parameter.mode",host=rhost)
-
- wrapfile := "{jobdir}/_paramfile"
-
- stdout := try(getFieldValue(stdout), "_swift.stdout")
- stderr := try(getFieldValue(stderr), "_swift.stderr")
- fileDirs := fileDirs(stagein, stageout)
- os := vdl:siteprofile(rhost, "SYSINFO:OS")
-
- scratch := vdl:siteprofile(rhost, "scratch")
-
- if(wrapperMode == "files"
- sequential(
- sys:file:write(wrapfile,
- "-e ",vdl:executable(tr, rhost), nl(),
- "-out ", stdout, nl(),
- "-err ", stderr, nl(),
- "-i ", maybe(getFieldValue(stdin)), nl(),
- "-d ", flatten(each(fileDirs)), nl(),
- "-if ", flatten(inFiles(stagein)), nl(),
- "-of ", flatten(outFiles(stageout)), nl(),
- "-wt", WRAPPERLOG_ALWAYS_TRANSFER,
- "-sk", SITEDIR_KEEP,
- "-cdmfile ", cdm:file(), nl(),
- "-status ", statusMode, nl(),
- for(a, arguments, "-a ", a, nl())
- )
- )
- )
-
- vdl:setprogress("Stage in")
-
- try(
- sequential(
-
- if(wrapperMode == "files"
- stageWrapperParams(jobid, wrapfile, wfdir, rhost)
- )
-
- log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", maybe(" arguments=", arguments), " host={rhost}")
-
- vdl:setprogress("Submitting")
-
- if(wrapperMode == "files"
- vdl:execute(
- vdl:siteprofile(rhost, "swift:wrapperInterpreter"),
- list(
- vdl:siteprofile(rhost, "swift:wrapperInterpreterOptions"),
- "_swiftwrap.wrapperstaging",
- jobid,
- "-urlprefix", URL_PREFIX,
- "-jobdir", jobdir,
- "-scratch", scratch,
- "-p", wrapfile
- )
- directory=wfdir
- redirect=false
- host=rhost
- vdl:tcprofile(rhost, maybe(attributes=attributes), tr=tr) //this gets various app params from the tc, such as environment, walltime, etc
- replicationGroup=replicationGroup
- replicationChannel=replicationChannel
- jobid=jobid
- )
- )
- if(wrapperMode == "args"
- vdl:execute(
- vdl:siteprofile(rhost, "swift:wrapperInterpreter"),
- list(
- vdl:siteprofile(rhost, "swift:wrapperInterpreterOptions"),
- "_swiftwrap.wrapperstaging",
- jobid,
- "-urlprefix", URL_PREFIX,
- "-jobdir", jobdir,
- "-scratch", scratch,
- "-e", vdl:executable(tr, rhost),
- "-out", stdout,
- "-err", stderr,
- "-i", maybe(getFieldValue(stdin)),
- "-d", flatten(each(fileDirs)),
- "-if", flatten(inFiles(stagein)),
- "-of", flatten(outFiles(stageout)),
- "-wt", WRAPPERLOG_ALWAYS_TRANSFER,
- "-sk", SITEDIR_KEEP,
- "-cdmfile", cdm:file(),
- "-status", statusMode,
- "-a", maybe(each(arguments))
- )
- directory=wfdir
- redirect=false
- host=rhost
- vdl:tcprofile(rhost, maybe(attributes=attributes), tr=tr)
- replicationGroup=replicationGroup
- replicationChannel=replicationChannel
- jobid=jobid
- )
- )
-
- vdl:setprogress("Checking status")
- if(statusMode == "files"
- checkJobStatus(jobdir, jobid, tr)
- )
-
- if(wrapperMode == "files"
- file:remove(wrapfile)
- )
-
- log(LOG:DEBUG, "STAGING_OUT jobid={jobid}")
-
-
- /* need to stage the files to upper scratch area in case they are not transfered to another site
- before all the files get cleaned out */
-
-
- vdl:setprogress("Stage out")
- doRestartlog(restartout)
-
- log(LOG:DEBUG, "JOB_END jobid={jobid}")
- )
- catch("^Abort$"
- log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
- throw(exception)
- )
- catch("^(?!Abort$).*"
- vdl:setprogress("Failed but can retry")
- prev := exception
- exception := try(exception(checkErrorFile(jobdir, jobid)), prev)
-
- log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
-
- if(matches(exception,".*executable bit.*")
- generateError(exception)
- )
-
- outs := readStandardFiles(jobdir, stdout, stderr)
-
- throw(
- exception(
- concat(
- "Exception in {tr}:", nl(),
- maybe("Arguments: ", arguments, nl()),
- "Host: {rhost}", nl(),
- "Directory: {scratch}/{jobid}",
- "{outs}", nl(),
- "----", nl()
- )
- exception
- )
- )
- )
- )
- )
- )
-
- element(generateProvenanceGraph, [gdata]
- pgraph := vdl:configProperty("pgraph")
- gname := if(pgraph == "true" "{VDL:SCRIPTNAME}-{VDL:RUNID}.dot" pgraph)
- file:write(gname
- "digraph SwiftProvenance {{", nl()
- " graph [", vdl:configProperty("pgraph.graph.options"), "];", nl()
- " node [", vdl:configProperty("pgraph.node.options"), "];", nl()
-
- for(i, gdata
- " ", i, nl()
- )
- "}", nl()
- )
- log(LOG:INFO, "Provenance graph saved in ", gname)
- )
- )
-)
-
-// Local variables:
-// mode: scheme
-// tab-width: 4
-// indent-tabs-mode: t
-// End:
More information about the Swift-commit
mailing list