[Swift-commit] r6210 - branches/faster/libexec
hategan at ci.uchicago.edu
hategan at ci.uchicago.edu
Fri Feb 1 20:07:51 CST 2013
Author: hategan
Date: 2013-02-01 20:07:50 -0600 (Fri, 01 Feb 2013)
New Revision: 6210
Modified:
branches/faster/libexec/swift-int-staging.k
branches/faster/libexec/swift-int-wrapper-staging.k
branches/faster/libexec/swift-int.k
branches/faster/libexec/swift.k
Log:
made provider staging "work"
Modified: branches/faster/libexec/swift-int-staging.k
===================================================================
--- branches/faster/libexec/swift-int-staging.k 2013-02-01 15:57:31 UTC (rev 6209)
+++ branches/faster/libexec/swift-int-staging.k 2013-02-02 02:07:50 UTC (rev 6210)
@@ -5,15 +5,18 @@
* Things that are not exposed to the translated file
*/
+SWIFT:SCRIPT_NAME := contextAttribute("SWIFT:SCRIPT_NAME")
+SWIFT:RUN_ID := contextAttribute("SWIFT:RUN_ID")
+SWIFT:HOME := contextAttribute("SWIFT:HOME")
WRAPPER_TRANSFER_MODE :=
- if (vdl:configProperty("wrapperlog.always.transfer") == "true",
+ if (configProperty("wrapperlog.always.transfer") == "true",
STAGING_MODE:IF_PRESENT, STAGING_MODE:ON_ERROR + STAGING_MODE:IF_PRESENT)
pinOption := configProperty("provider.staging.pin.swiftfiles")
PIN := if(pinOption == "true", "pinned:", "")
-PROVENANCE_GRAPH_ENABLED := (vdl:configProperty("pgraph") != "false")
+PROVENANCE_GRAPH_ENABLED := (configProperty("pgraph") != "false")
namespace(swift) {
@@ -73,7 +76,7 @@
stdout := if (stdout == null, "stdout.txt", getFieldValue(stdout))
stderr := if (stderr == null, "stderr.txt", getFieldValue(stderr))
- wfdir := "{VDL:SCRIPTNAME}-{VDL:RUNID}"
+ wfdir := "{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}"
tmpdir := dircat("{wfdir}/jobs/{jobdir}", jobid)
cdmfile := cdm:file()
@@ -83,17 +86,18 @@
setProgress(progress, "Submitting")
swift:execute(
+ progress,
siteProfile(rhost, "swift:wrapperInterpreter"),
list(
siteProfile(rhost, "swift:wrapperInterpreterOptions"),
wrapper,
- "-e", vdl:executable(tr, rhost),
+ "-e", executable(tr, rhost),
"-out", stdout,
"-err", stderr,
"-i", if (stdin != null, getFieldValue(stdin)),
"-d", flatten(unique(outFileDirs(stageout))),
- "-if", flatten(infiles(stagein)),
- "-of", flatten(outfiles(stageout)),
+ "-if", flatten(inFiles(stagein)),
+ "-of", flatten(outFiles(stageout)),
"-k",
"-cdmfile", cdmfile,
"-status", "provider"
@@ -110,31 +114,35 @@
stagingMethod := siteProfile(rhost, "swift:stagingMethod", default="proxy")
- stageIn("{PIN}{stagingMethod}://localhost/{swift.home}/libexec/{wrapper}", wrapper)
+ stageIn("{PIN}{stagingMethod}://localhost/{SWIFT:HOME}/libexec/{wrapper}", wrapper)
if (cdmfile != "") {
- d := dirname(cdmfile)
+ d := swift:dirname(cdmfile)
file := basename(cdmfile)
- dir := if (d == "", "./", str:concat(d,"/"))
+ dir := if (d == "", "./", "{d}/")
loc := "{PIN}{stagingMethod}://localhost/"
stageIn("{loc}{dir}{file}", cdmfile)
- stageIn("{loc}{swift.home}/libexec/cdm.pl", "cdm.pl")
- stageIn("{loc}{swift.home}/libexec/cdm_lib.sh", "cdm_lib.sh")
+ stageIn("{loc}{SWIFT:HOME}/libexec/cdm.pl", "cdm.pl")
+ stageIn("{loc}{SWIFT:HOME}/libexec/cdm_lib.sh", "cdm_lib.sh")
}
appStageins(jobid, stagein, ".", stagingMethod)
stageOut("wrapper.log", "{stagingMethod}://localhost/{ddir}/{jobid}.info",
mode = WRAPPER_TRANSFER_MODE)
- //stageOut("{stdout}", "{stagingMethod}://localhost/{ddir}/{stdout}")
- //stageOut("{stderr}", "{stagingMethod}://localhost/{ddir}/{stderr}")
+ if (false) {
+ stageOut("{stdout}", "{stagingMethod}://localhost/{ddir}/{stdout}")
+ stageOut("{stderr}", "{stagingMethod}://localhost/{ddir}/{stderr}")
+ }
stageOut("wrapper.error", "{stagingMethod}://localhost/{ddir}/{jobid}.error",
mode = STAGING_MODE:IF_PRESENT)
appStageouts(jobid, stageout, ".", stagingMethod)
- task:cleanUp(".") //the whole job directory
- ) // execute
- doRestartlog(restartout)
+ task:cleanUp(".")
+ )
+
+
+ doRestartLog(restartout)
log(LOG:DEBUG, "JOB_END jobid={jobid}")
}
else catch(prev) {
Modified: branches/faster/libexec/swift-int-wrapper-staging.k
===================================================================
--- branches/faster/libexec/swift-int-wrapper-staging.k 2013-02-01 15:57:31 UTC (rev 6209)
+++ branches/faster/libexec/swift-int-wrapper-staging.k 2013-02-02 02:07:50 UTC (rev 6210)
@@ -213,6 +213,7 @@
if (wrapperMode == "files") {
swift:execute(
+ progress,
siteProfile(rhost, "swift:wrapperInterpreter"),
list(
siteProfile(rhost, "swift:wrapperInterpreterOptions"),
Modified: branches/faster/libexec/swift-int.k
===================================================================
--- branches/faster/libexec/swift-int.k 2013-02-01 15:57:31 UTC (rev 6209)
+++ branches/faster/libexec/swift-int.k 2013-02-02 02:07:50 UTC (rev 6210)
@@ -307,35 +307,6 @@
log(LOG:INFO, "END jobid={jobid} - Staging out finished")
}
- export(graphStuff,
- function(tr, stagein, stageout, err, args = null) {
- if (configProperty("pgraph") != "false") {
- errprops := if(err, ",color=lightsalmon", ",color=lightsteelblue1")
- tp := currentThread()
- to (graph) {
- concat(str:quote(tp), " [label=", str:quote(tr), "{errprops}]")
- }
- for (si, stagein) {
- si := basename(si)
- to(graph) {
- concat(str:quote(si), " [shape=parallelogram]")
- concat(str:quote(si), " -> ", str:quote(tp))
- }
- }
- for (pv, stageout) {
- (path, var) := each(pv)
- file := fileName(getField(var, path=path))
- file := basename(file)
- label := niceName(var, path = path)
- to(graph) {
- concat(str:quote(file), " [shape=parallelogram,label=", str:quote(label), "]")
- concat(str:quote(tp), " -> ", str:quote(file))
- }
- }
- }
- }
- )
-
fileSizes := function(files) {
math:sum(
for(f, files, file:size(f))
@@ -396,11 +367,13 @@
replicationGroup, replicationChannel
arguments = [], stdin = null, stdout = null, stderr = null, attributes = null) {
+ log(LOG:INFO, "A ", currentThread())
stagein := list(unique(each(stagein)))
stageout := list(unique(each(stageout)))
+ log(LOG:INFO, "X ", currentThread())
allocateHost(rhost, constraints = jobConstraints(tr, stagein = stagein)) {
-
+ log(LOG:INFO, "B ", currentThread())
ddir := initDDir(),
(wfdir, sharedDir) :=
try {
@@ -410,6 +383,7 @@
throw(exception("Could not initialize shared directory on {rhost}", exception))
}
+ log(LOG:INFO, "C ", currentThread())
uid := UID()
jobdir := substring(uid, from=0, to=1)
jobid := "{tr}-{uid}"
@@ -445,6 +419,7 @@
}
}
+ log(LOG:INFO, "D ", currentThread())
setProgress(progress, "Stage in")
tmpdir := dircat("{wfdir}/jobs/{jobdir}", jobid)
Modified: branches/faster/libexec/swift.k
===================================================================
--- branches/faster/libexec/swift.k 2013-02-01 15:57:31 UTC (rev 6209)
+++ branches/faster/libexec/swift.k 2013-02-02 02:07:50 UTC (rev 6210)
@@ -4,7 +4,6 @@
import('swift-operators', export = true)
import('swift-lib', export = true)
import('swift-xs', export = true)
-import('swift-int')
SWIFT:SCRIPT_NAME := contextAttribute("SWIFT:SCRIPT_NAME")
SWIFT:RUN_ID := contextAttribute("SWIFT:RUN_ID")
@@ -15,6 +14,10 @@
pstaging := (configProperty("use.provider.staging") == "true")
wstaging := (configProperty("use.wrapper.staging") == "true")
+ impl := if (pstaging, "swift-int-staging.k", if (wstaging, "swift-int-wrapper-staging.k", "swift-int.k"))
+
+ import(file = impl)
+
import(java)
once("vdl.k-print-version") {
@@ -175,10 +178,6 @@
log(LOG:INFO, "Provenance graph saved in ", gname)
}
- impl := if (pstaging, "swift-int-staging.k", if (wstaging, "swift-int-wrapper-staging.k", "swift-int.k"))
-
- import(file = impl)
-
export(execute,
function(
tr, arguments = null,
More information about the Swift-commit
mailing list