[Swift-commit] r3203 - trunk/libexec
noreply at svn.ci.uchicago.edu
noreply at svn.ci.uchicago.edu
Thu Jan 21 15:32:32 CST 2010
Author: hategan
Date: 2010-01-21 15:32:32 -0600 (Thu, 21 Jan 2010)
New Revision: 3203
Added:
trunk/libexec/_swiftwrap.staging
trunk/libexec/vdl-int-staging.k
Modified:
trunk/libexec/vdl.k
Log:
added provider staging specific files
Added: trunk/libexec/_swiftwrap.staging
===================================================================
--- trunk/libexec/_swiftwrap.staging (rev 0)
+++ trunk/libexec/_swiftwrap.staging 2010-01-21 21:32:32 UTC (rev 3203)
@@ -0,0 +1,284 @@
+# this script must be invoked inside of bash, not plain sh
+
+infosection() {  # infosection TITLE: write a ruled banner containing TITLE to the log descriptor $INFO.
+    local rule="_____________________________________________________________________________"
+    {
+        printf '\n%s\n\n' "$rule"
+        printf ' %s\n' "$1"
+        printf '%s\n\n' "$rule"
+    } >& "$INFO"
+}
+
+info() {
+ infosection "uname -a"
+ uname -a 2>&1 >& "$INFO"
+ infosection "id"
+ id 2>&1 >& "$INFO"
+ infosection "env"
+ env 2>&1 >& "$INFO"
+ infosection "df"
+ df 2>&1 >& "$INFO"
+ if [ -e "/proc/cpuinfo" ]; then
+ infosection "/proc/cpuinfo"
+ cat /proc/cpuinfo 2>&1 >& "$INFO"
+ fi
+ if [ -e "/proc/meminfo" ]; then
+ infosection "/proc/meminfo"
+ cat /proc/meminfo 2>&1 >& "$INFO"
+ fi
+ infosection "command line"
+ echo $COMMANDLINE 2>&1 >& "$INFO"
+ if [ -f "$STDOUT" ] ; then
+ infosection "stdout"
+ cat $STDOUT >& "$INFO"
+ fi
+ if [ -f "$STDERR" ] ; then
+ infosection "stderr"
+ cat $STDERR >& "$INFO"
+ fi
+}
+
+logstate() { # logstate STATE...: write a "Progress <timestamp> <STATE...>" line to the log fd $INFO.
+ echo "Progress " `date +"%Y-%m-%d %H:%M:%S.%N%z"` " $@" >& "$INFO"
+}
+
+log() { # log MSG...: write the arguments as one line to the log fd $INFO.
+ echo "$@" >& "$INFO"
+}
+
+fail() {  # fail EC MSG...: record MSG, dump diagnostics via info, then terminate the wrapper.
+    EC=$1
+    shift
+    if [ "$STATUSMODE" = "files" ]; then
+        echo "$@" >"$WFDIR/status/$JOBDIR/${ID}-error"
+    fi
+    log "$@"  # quoted so the message is neither re-split on IFS nor glob-expanded
+    info
+    if [ "$STATUSMODE" = "files" ]; then
+        exit 0  # the message was already written to the -error status file above
+    else
+        exit $EC
+    fi
+}
+
+checkError() {  # checkError EC MSG...: call fail EC MSG... when the command run just before this call failed.
+    if [ "$?" != "0" ]; then  # on entry $? still holds the status of the command preceding the call
+        fail "$@"  # quoted so EC and MSG reach fail exactly as given
+    fi
+}
+
+checkEmpty() {  # checkEmpty VALUE MSG...: fail with exit code 254 and MSG when VALUE is the empty string.
+    if [ "$1" == "" ]; then
+        shift
+        fail 254 "$@"  # quoted so the message words are passed through unchanged
+    fi
+}
+
+checkparamfile() {
+ log "checking for paramfile"
+ if [ "$1" == "-p" ]; then
+ JOBDIR=$2
+ PARAMFILE=${WFDIR}/parameters/${JOBDIR}/param-${ID}
+ fi
+ log "paramfile is: $PARAMFILE"
+}
+
+getarg() {  # getarg NAME ARGS...: set VALUE (and SHIFTCOUNT) for option NAME from ARGS or from $PARAMFILE; fail 254 if absent.
+    NAME=$1
+    shift
+    VALUE=""
+    SHIFTCOUNT=0
+    if [ "$PARAMFILE" == "" ] && [ "$1" == "$NAME" ]; then
+        shift
+        SHIFTCOUNT=$((SHIFTCOUNT + 1))
+        while [ "$#" != "0" ] && [ "${1:0:1}" != "-" ]; do  # the value runs up to the next word starting with "-"
+            VALUE="$VALUE $1"
+            shift
+            SHIFTCOUNT=$((SHIFTCOUNT + 1))
+        done
+        VALUE="${VALUE:1}"  # drop the separator space added before the first word
+    elif [ "$PARAMFILE" != "" ] && grep -qE "^$NAME " "$PARAMFILE" ; then  # -q: existence test only, nothing printed
+        VALUE=$(grep -E "^$NAME " "$PARAMFILE" | cut -d ' ' -f 2-)
+    else
+        fail 254 "Missing $NAME argument"
+    fi
+}
+
+openinfo() {  # openinfo FILE: open FILE for reading and writing on fd 3 and point INFO at that descriptor.
+    exec 3<> "$1"  # quoted so a path containing spaces is not an ambiguous redirect
+    INFO=3
+}
+
+closeinfo() { # Close fd 3, the descriptor openinfo assigns to INFO.
+ exec 3>&-
+}
+
+COMMANDLINE=$@
+
+# NOTE(review): WFDIR is never set or made absolute in this script; PARAMFILE starts empty and is only filled in by checkparamfile when -p is given
+PARAMFILE=
+
+openinfo "wrapper.log"
+
+checkparamfile "$@"
+
+if [ "X$INFODIR" == "X" ]; then
+ INFODIR="."
+fi
+
+logstate "LOG_START"
+infosection "Wrapper"
+
+getarg "-e" "$@"
+EXEC=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-out" "$@"
+STDOUT=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-err" "$@"
+STDERR=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-i" "$@"
+STDIN=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-d" "$@"
+DIRS=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-if" "$@"
+INF=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-of" "$@"
+OUTF=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-k" "$@"
+KICKSTART=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-status" "$@"
+STATUSMODE=$VALUE
+shift $SHIFTCOUNT
+
+declare -a CMDARGS
+if [ "$PARAMFILE" == "" ] && [ "$1" == "-a" ] ; then
+ shift
+ CMDARGS=("$@")
+elif [ "$PARAMFILE" != "" ] ; then
+ CMDARGS=()
+ FIRST=1
+ while read line ; do
+ if [ "$FIRST" == "1" ] ; then
+ CMDARGS=("$line")
+ FIRST=0
+ else
+ CMDARGS=("${CMDARGS[*]}" "$line")
+ fi
+ done < <(grep -E "^-a " $PARAMFILE | cut -d " " -f 2-)
+else
+ fail 254 "Missing arguments (-a option)"
+fi
+
+PATH=$PATH:/bin:/usr/bin
+
+if [ "$PATHPREFIX" != "" ]; then
+ export PATH=$PATHPREFIX:$PATH
+fi
+
+if [ "$SWIFT_EXTRA_INFO" != "" ]; then
+ log "EXTRAINFO=$($SWIFT_EXTRA_INFO)"
+fi
+
+if [ "X${EXEC:0:1}" != "X/" ] ; then
+ export ORIGEXEC=$EXEC
+ export EXEC=$(which $EXEC)
+ if [ "X$EXEC" = "X" ] ; then
+ fail 254 "Cannot find executable $ORIGEXEC on site system path"
+ fi
+fi
+
+log "EXEC=$EXEC"
+log "STDIN=$STDIN"
+log "STDOUT=$STDOUT"
+log "STDERR=$STDERR"
+log "DIRS=$DIRS"
+log "INF=$INF"
+log "OUTF=$OUTF"
+log "KICKSTART=$KICKSTART"
+log "ARGS=$@"
+log "ARGC=$#"
+
+IFS="|"
+
+logstate "CREATE_INPUTDIR"
+for D in $DIRS ; do  # deliberately unquoted: DIRS is split on the "|" IFS assigned just before this loop
+    mkdir -p "$D" >& "$INFO" 2>&1  # INFO holds an fd number; >>"$INFO" would append to a file named after that number
+    checkError 254 "Failed to create input directory $D"
+    log "Created output directory: $D"
+done
+
+logstate "EXECUTE"
+
+#ls >>$WRAPPERLOG
+if [ ! -f "$EXEC" ]; then
+ fail 254 "The executable $EXEC does not exist"
+fi
+if [ ! -x "$EXEC" ]; then
+ fail 254 "The executable $EXEC does not have the executable bit set"
+fi
+
+if [ "$STDIN" == "" ]; then # run the application; stdin is redirected only in the else branch, when STDIN is non-empty
+ if [ "$SWIFT_GEN_SCRIPTS" != "" ]; then # optionally record the invocation in an executable run.sh
+ echo "#!/bin/bash" > run.sh
+ echo "\"$EXEC\" \"${CMDARGS[@]}\" 1>\"$STDOUT\" 2>\"$STDERR\"" >> run.sh # NOTE(review): all arguments end up inside one pair of quotes in run.sh — confirm intended
+ chmod +x run.sh
+ fi
+ "$EXEC" "${CMDARGS[@]}" 1>"$STDOUT" 2>"$STDERR"
+else
+ if [ "$SWIFT_GEN_SCRIPTS" != "" ]; then
+ echo "#!/bin/bash" > run.sh
+ echo "\"$EXEC\" \"${CMDARGS[@]}\" 1>\"$STDOUT\" 2>\"$STDERR\" <\"$STDIN\"" >> run.sh # NOTE(review): same single-quoted-group behaviour as above
+ chmod +x run.sh
+ fi
+ "$EXEC" "${CMDARGS[@]}" 1>"$STDOUT" 2>"$STDERR" <"$STDIN"
+fi
+checkError $? "Exit code $?" # must directly follow the if: $? is the application's status, passed on to fail when non-zero
+
+if [ ! -s "$STDOUT" ]; then  # -s is true only for an existing, non-empty file
+    log "Removing empty stdout"
+    rm -f "$STDOUT"  # quoted: IFS is "|" at this point, so an unquoted path could be split or globbed
+fi
+if [ ! -s "$STDERR" ]; then
+    log "Removing empty stderr"
+    rm -f "$STDERR"
+fi
+
+logstate "EXECUTE_DONE"
+log "Job ran successfully"
+
+MISSING=
+for O in $OUTF ; do
+ if [ ! -f "$O" ]; then
+ if [ "$MISSING" == "" ]; then
+ MISSING=$O
+ else
+ MISSING="$MISSING, $O"
+ fi
+ fi
+done
+if [ "$MISSING" != "" ]; then
+ fail 254 "The following output files were not created by the application: $MISSING"
+fi
+
+logstate "END"
+
+closeinfo
+
+# ensure we exit with a 0 after a successful execution
+exit 0
+
Property changes on: trunk/libexec/_swiftwrap.staging
___________________________________________________________________
Name: svn:executable
+ *
Added: trunk/libexec/vdl-int-staging.k
===================================================================
--- trunk/libexec/vdl-int-staging.k (rev 0)
+++ trunk/libexec/vdl-int-staging.k 2010-01-21 21:32:32 UTC (rev 3203)
@@ -0,0 +1,292 @@
+import("sys.k")
+import("task.k")
+import("vdl-lib.xml")
+/*
+ * Things that are not exposed to the translated file
+ */
+
+global(LOG:DEBUG, "debug")
+global(LOG:INFO, "info")
+global(LOG:WARN, "warn")
+global(LOG:ERROR, "error")
+global(LOG:FATAL, "fatal")
+
+namespace("vdl"
+ export(
+ element(isDone, [stageout]
+ sys:and(
+ for(pv, stageout
+ [path, var] := each(pv)
+ vdl:isLogged(var, path)
+ )
+ sys:not(isEmpty(stageout))
+ )
+ )
+
+ element(mark, [restarts, err, optional(mapping)]
+ if(
+ err for(pv, restarts
+ [path, var] := each(pv)
+ vdl:setFutureFault(var, path=path, mapping=mapping)
+ )
+ )
+ )
+
+ element(flatten, [...] // builds one string from its arguments, using "|" as the separator
+ if (
+ isEmpty(...) "" // no arguments: the result is the empty string
+
+ concat(
+ for(i, butLast(...), if(isList(i) flatten(i) i), "|") last(...) // each item but the last is followed by "|"; items that are lists are passed back through flatten
+ )
+ )
+ )
+
+ element(initDDir, [] // returns the name "{VDL:SCRIPTNAME}-{VDL:RUNID}.d", making that directory when it does not exist
+ ddir := "{VDL:SCRIPTNAME}-{VDL:RUNID}.d"
+ once(ddir // NOTE(review): presumably evaluates the body a single time per ddir value — confirm once() semantics
+ if(sys:not(file:exists(ddir))
+ task:dir:make(ddir)
+ )
+ )
+ ddir
+ )
+
+ element(inFileDirs, [stageins]
+ for(file, stageins
+ reldirname(file)
+ )
+ )
+
+ element(outFileDirs, [stageouts]
+ for(pv, stageouts
+ [path, var] := each(pv)
+
+ file := vdl:filename(vdl:getfield(var, path = path))
+
+ dirname(file)
+ )
+ )
+
+ element(inFiles, [stageins]
+ pathnames(stageins)
+ )
+
+
+ element(outFiles, [stageouts]
+ for(pv, stageouts
+ [path, var] := each(pv)
+
+ file := vdl:filename(vdl:getfield(var, path = path))
+
+ file
+ )
+ )
+
+ element(fileDirs, [stageins, stageouts]
+ list(
+ unique(
+ inFileDirs(stageins)
+ outFileDirs(stageouts)
+ )
+ )
+ )
+
+ element(appStageins, [jobid, files, dir]
+ for(file, files
+ provider := provider(file)
+ srchost := hostname(file)
+ srcdir := dirname(file)
+ destdir := dircat(dir, reldirname(file))
+ filename := basename(file)
+
+ stageIn(
+ "{provider}://{srchost}/{srcdir}/{filename}",
+ "{destdir}/{filename}"
+ )
+ )
+ )
+
+ element(appStageouts, [jobid, stageouts, dir] // emits one stageOut(...) per [path, var] pair in stageouts
+   for(pv, stageouts
+     [path, var] := each(pv)
+     file := vdl:absfilename(vdl:getfield(var, path = path))
+     provider := vdl:provider(file)
+     dhost := vdl:hostname(file)
+     rdir := dircat(dir, reldirname(file))
+     bname := basename(file)
+     ldir := dirname(file)
+     // source is {rdir}/{bname} under dir; destination is the URL {provider}://{dhost}/{ldir}/{bname}
+     // (the unused fullLocal / fullRemote bindings were dropped; jobid is accepted but not read here)
+
+     stageOut(
+       "{rdir}/{bname}",
+       "{provider}://{dhost}/{ldir}/{bname}"
+     )
+   )
+ )
+
+ element(doRestartlog, [restartouts]
+ uParallelFor(f, restartouts,
+ [path, var] := each(f)
+ vdl:logvar(var, path)
+ )
+ )
+
+ element(graphStuff, [tr, stagein, stageout, err, optional(args)]
+ if(
+ vdl:configProperty("pgraph") != "false" then(
+ errprops := if(err ",color=lightsalmon" ",color=lightsteelblue1")
+ tp := vdl:threadPrefix()
+ to(graph,
+ concat(str:quote(tp), " [label=", str:quote(tr), "{errprops}]")
+ )
+ for(si, stagein
+ si := basename(si)
+ to(graph
+ concat(str:quote(si), " [shape=parallelogram]")
+ concat(str:quote(si), " -> ", str:quote(tp))
+ )
+ )
+ for(pv, stageout
+ [path, var] := each(pv)
+ file := vdl:fileName(vdl:getfield(var, path=path))
+ file := basename(file)
+ label := vdl:niceName(var, path = path)
+ to(graph
+ concat(str:quote(file), " [shape=parallelogram,label=",
+ str:quote(label), "]")
+ concat(str:quote(tp), " -> ", str:quote(file))
+ )
+ )
+ )
+ )
+ )
+
+ element(fileSizes, [files] // math:sum of file:size over every entry of files
+   math:sum(
+     for(f, files, file:size(f)) // size the loop variable f; the name 'file' is not bound in this element
+   )
+ )
+
+ element(cleanups, [cleanup]
+ log(LOG:INFO, "START cleanups={cleanup}")
+ )
+
+ element(execute2, [tr, optional(arguments, stdin, stdout, stderr), stagein, stageout, restartout,
+ replicationGroup, replicationChannel]
+ stagein := list(unique(each(stagein)))
+ stageout := list(unique(each(stageout)))
+ allocateHost(rhost, constraints=vdl:jobConstraints(tr, stagein=stagein)
+
+ ddir := initDDir()
+
+ uid := uid()
+ jobdir := substring(uid, from=0, to=1)
+ jobid := concat(tr, "-", uid)
+
+ fileDirs := fileDirs(stagein, stageout)
+
+ log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread={#thread} host={rhost} replicationGroup={replicationGroup}")
+
+ wrapper := "_swiftwrap.staging"
+ wrapfile := "{ddir}/param-{jobid}"
+
+ stdout := try(stdout, "stdout.txt")
+ stderr := try(stderr, "stderr.txt")
+
+ vdl:setprogress("Stage in")
+ wfdir := "{VDL:SCRIPTNAME}-{VDL:RUNID}"
+ tmpdir := dircat(concat(wfdir, "/jobs/", jobdir), jobid)
+
+ try(
+ sequential(
+ log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", maybe(" arguments=", arguments), " tmpdir={tmpdir} host={rhost}")
+
+ vdl:setprogress("Submitting")
+
+ vdl:execute(
+ vdl:siteprofile(rhost, "swift:wrapperInterpreter"),
+ list(
+ vdl:siteprofile(rhost, "swift:wrapperInterpreterOptions"),
+ wrapper,
+ "-e", vdl:executable(tr, rhost),
+ "-out", stdout,
+ "-err", stderr,
+ "-i", maybe(stdin),
+ "-d", flatten(each(fileDirs)),
+ "-if", flatten(infiles(stagein)),
+ "-of", flatten(outfiles(stageout)),
+ "-k",
+ "-status", "provider"
+ "-a", maybe(each(arguments))
+ )
+ directory = "{wfdir}-{jobdir}-{jobid}"
+ redirect = false
+ host = rhost
+ vdl:tcprofile(rhost, tr = tr) //this gets various app params from the tc, such as environment, walltime, etc
+ replicationGroup = replicationGroup
+ replicationChannel = replicationChannel
+ jobid = jobid
+
+ /*stageIn("copy://localhost/{swift.home}/libexec/{wrapper}", wrapper)
+ appStageins(jobid, stagein, ".")
+ stageOut("wrapper.log", "copy://localhost/{ddir}/{jobid}.info")
+ stageOut("{stdout}", "copy://localhost/{ddir}/{stdout}")
+ stageOut("{stderr}", "copy://localhost/{ddir}/{stderr}")*/
+
+ stageIn("file://localhost/{swift.home}/libexec/{wrapper}", wrapper)
+ appStageins(jobid, stagein, ".")
+ stageOut("wrapper.log", "file://localhost/{ddir}/{jobid}.info")
+ stageOut("{stdout}", "file://localhost/{ddir}/{stdout}")
+ stageOut("{stderr}", "file://localhost/{ddir}/{stderr}")
+
+ task:cleanUp("") //the whole job directory
+ appStageouts(jobid, stageout, ".")
+ )
+ doRestartlog(restartout)
+ log(LOG:DEBUG, "JOB_END jobid={jobid}")
+ )
+ catch("^Abort$"
+ log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
+ throw(exception)
+ )
+ catch("^(?!Abort$).*"
+ vdl:setprogress("Failed but can retry")
+ log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
+
+ throw(
+ exception(
+ concat(
+ "Exception in {tr}:", nl(),
+ maybe("Arguments: {arguments}", nl()),
+ "Host: {rhost}", nl(),
+ "Directory: {tmpdir}",
+ "TODO: outs", nl(),
+ "----", nl()
+ )
+ exception
+ )
+ )
+ )
+ )
+ )
+ )
+
+ element(generateProvenanceGraph, [gdata]
+ pgraph := vdl:configProperty("pgraph")
+ gname := if(pgraph == "true" "{VDL:SCRIPTNAME}-{VDL:RUNID}.dot" pgraph)
+ file:write(gname
+ "digraph SwiftProvenance {{", nl()
+ " graph [", vdl:configProperty("pgraph.graph.options"), "];", nl()
+ " node [", vdl:configProperty("pgraph.node.options"), "];", nl()
+
+ for(i, gdata
+ " ", i, nl()
+ )
+ "}", nl()
+ )
+ log(LOG:INFO, "Provenance graph saved in ", gname)
+ )
+ )
+)
Modified: trunk/libexec/vdl.k
===================================================================
--- trunk/libexec/vdl.k 2010-01-06 03:36:22 UTC (rev 3202)
+++ trunk/libexec/vdl.k 2010-01-21 21:32:32 UTC (rev 3203)
@@ -9,8 +9,11 @@
import("vdl-sc.k", export = true)
import("vdl-lib.xml", export = true)
+
+ pstaging := configProperty("use.provider.staging")
+ int := if (pstaging == "true", "vdl-int-staging.k", "vdl-int.k")
- import("vdl-int.k")
+ import(int)
import("java.k")
once("vdl.k-print-version"
More information about the Swift-commit
mailing list