[Swift-commit] r5797 - trunk/libexec
hategan at ci.uchicago.edu
hategan at ci.uchicago.edu
Sat May 26 18:14:21 CDT 2012
Author: hategan
Date: 2012-05-26 18:14:20 -0500 (Sat, 26 May 2012)
New Revision: 5797
Added:
trunk/libexec/_swiftwrap.wrapperstaging
trunk/libexec/vdl-int-wrapper-staging.k
Modified:
trunk/libexec/vdl.k
Log:
added wrapper staging libraries and wrapper
Added: trunk/libexec/_swiftwrap.wrapperstaging
===================================================================
--- trunk/libexec/_swiftwrap.wrapperstaging (rev 0)
+++ trunk/libexec/_swiftwrap.wrapperstaging 2012-05-26 23:14:20 UTC (rev 5797)
@@ -0,0 +1,544 @@
+#!/bin/bash
+# this script must be invoked inside of bash, not plain sh
+# note that this script modifies $IFS
+
+# Toggle debugging output from debug()
+DEBUG=0
+
+infosection() {
+ echo >& "$INFO"
+ echo "_____________________________________________________________________________" >& "$INFO"
+ echo >& "$INFO"
+ echo " $1" >& "$INFO"
+ echo "_____________________________________________________________________________" >& "$INFO"
+ echo >& "$INFO"
+}
+
+# Dump diagnostics to the info log: the wrapper command line, uname, id,
+# environment, df, /proc/cpuinfo and /proc/meminfo when present, and the
+# application's stdout/stderr files when they exist.
+#
+# Redirections are applied left to right, so the log descriptor has to be
+# attached to stdout *before* stderr is duplicated onto it; the reverse order
+# leaves stderr on the wrapper's own stdout and the messages never reach the log.
+info() {
+    infosection "command line"
+    echo "$COMMANDLINE" >& "$INFO" 2>&1
+    infosection "uname -a"
+    uname -a >& "$INFO" 2>&1
+    infosection "id"
+    id >& "$INFO" 2>&1
+    infosection "env"
+    env >& "$INFO" 2>&1
+    infosection "df"
+    df >& "$INFO" 2>&1
+    if [ -e "/proc/cpuinfo" ]; then
+        infosection "/proc/cpuinfo"
+        cat /proc/cpuinfo >& "$INFO" 2>&1
+    fi
+    if [ -e "/proc/meminfo" ]; then
+        infosection "/proc/meminfo"
+        cat /proc/meminfo >& "$INFO" 2>&1
+    fi
+    if [ -f "$STDOUT" ] ; then
+        infosection "stdout"
+        # quoted: the file name may contain IFS characters
+        cat "$STDOUT" >& "$INFO"
+    fi
+    if [ -f "$STDERR" ] ; then
+        infosection "stderr"
+        cat "$STDERR" >& "$INFO"
+    fi
+}
+
+# Append a "Progress" record carrying the current timestamp and the state
+# text given in "$@" to the info log.
+logstate() {
+    local STAMP
+    STAMP=$(date +"%Y-%m-%d %H:%M:%S.%N%z")
+    # $STAMP is deliberately unquoted so it is word-split exactly like the
+    # command substitution it replaces.
+    echo "Progress " $STAMP " $@" >& "$INFO"
+}
+
+# Write the arguments as one echo line to the info log descriptor named by
+# $INFO (set by openinfo).
+log() {
+ echo "$@" >& "$INFO"
+}
+
+debug() {
+ [[ $DEBUG == 1 ]] && echo "$@" >& "$INFO"
+}
+
+# Copy one input file onto the execution host.
+#   $1  source path, relative to $URLPREFIX
+#   $2  destination path on the local file system
+# Only file:// prefixes are handled; any other prefix, or a failed copy,
+# aborts the wrapper through fail with exit code 254.
+stagein() {
+    # locals keep these scratch names out of the script's global state
+    local SRC=$1
+    local DST=$2
+    local SRC2
+
+    log "Staging in $URLPREFIX/$SRC to $DST"
+
+    case $URLPREFIX in
+        file://*)
+            SRC2="${URLPREFIX#file://}/$SRC"
+            log "Copying $SRC2 to $DST"
+            # report a failed copy here instead of letting the job run
+            # without its input
+            cp "$SRC2" "$DST" || fail 254 "Failed to copy $SRC2 to $DST"
+            ;;
+        *://*)
+            fail 254 "Cannot handle protocol ${URLPREFIX%%://*}"
+            ;;
+        *)
+            fail 254 "Invalid URL prefix: $URLPREFIX"
+            ;;
+    esac
+}
+
+stageout() {
+ SRC=$1
+ DST=$2
+
+ log "Staging out $SRC to $URLPREFIX/$DST"
+
+ case $URLPREFIX in
+ file://*)
+ DST2="${URLPREFIX#file://}/$DST"
+ DIR=`dirname $DST2`
+ mkdir -p $DIR
+ log "Copying $SRC to $DST2"
+ cp $SRC $DST2
+ ;;
+ *://*)
+ fail 254 "Cannot handle protocol ${URLPREFIX%%://*}"
+ ;;
+ *)
+ fail 254 "Invalid URL prefix: $URLPREFIX"
+ ;;
+ esac
+}
+
+
+fail() {
+ EC=$1
+ shift
+
+ if [ "X$DIR" != "X" ]; then
+ echo $@ >"$DIR/_error"
+ stageout "$DIR/_error" "$JOBDIR/error"
+ else
+ echo $@
+ fi
+
+ log $@
+ info
+ if [ "$STATUSMODE" = "files" ]; then
+ exit 0
+ else
+ exit $EC
+ fi
+}
+
+checkError() {
+ if [ "$?" != "0" ]; then
+ fail $@
+ fi
+}
+
+# Abort through fail with exit code 254 when $1 is the empty string.
+#   $1      value to test
+#   $2...   error message
+checkEmpty() {
+    if [ "$1" == "" ]; then
+        shift
+        # quoted so the message is passed on verbatim
+        fail 254 "$@"
+    fi
+}
+
+# Detect the "-p <file>" option. When present, the parameter file is staged
+# in to $SCRATCH/_paramfile and PARAMFILE is set to that path, which switches
+# getarg from command-line parsing to parameter-file lookup.
+#   $1  expected to be "-p" when a parameter file is used
+#   $2  path of the parameter file, relative to $URLPREFIX
+checkparamfile() {
+    log "checking for paramfile"
+    if [ "$1" == "-p" ]; then
+        PARAMFILE="$SCRATCH/_paramfile"
+        # the copy target has to exist before stagein can write into it
+        mkdir -p "$SCRATCH"
+        checkError 254 "Failed to create scratch directory $SCRATCH"
+        # stagein resolves its source against $URLPREFIX itself, so only the
+        # relative path is passed
+        stagein "$2" "$PARAMFILE"
+    fi
+    log "paramfile is: $PARAMFILE"
+}
+
+# Fetch the value of option $1.
+#   $1      option name, e.g. "-e"
+#   $2...   remaining command-line words
+# Results are returned in globals:
+#   VALUE       the option's value (several words are joined with spaces)
+#   SHIFTCOUNT  number of command-line words consumed; the caller shifts by it
+# Without a parameter file the option must be the first remaining word and
+# its value runs up to the next word starting with "-". With a parameter
+# file ($PARAMFILE non-empty) the value is taken from the lines starting with
+# "<name> ". A missing option aborts through fail with exit code 254.
+getarg() {
+    NAME=$1
+    shift
+    VALUE=""
+    SHIFTCOUNT=0
+    if [ "$PARAMFILE" == "" ] && [ "$1" == "$NAME" ]; then
+        shift
+        SHIFTCOUNT=$((SHIFTCOUNT + 1))
+        while [ "$#" != "0" ] && [ "${1:0:1}" != "-" ]; do
+            VALUE="$VALUE $1"
+            shift
+            SHIFTCOUNT=$((SHIFTCOUNT + 1))
+        done
+        # drop the leading separator added by the loop
+        VALUE="${VALUE:1}"
+    # -q: this grep is only an existence test and must not print the matching
+    # lines on the wrapper's stdout
+    elif [ "$PARAMFILE" != "" ] && grep -q -E "^$NAME " "$PARAMFILE" ; then
+        VALUE=$(grep -E "^$NAME " "$PARAMFILE" | cut -d ' ' -f 2-)
+    else
+        fail 254 "Missing $NAME argument"
+    fi
+}
+
+# Open file $1 for reading and writing on descriptor 3 and record that
+# descriptor number in INFO, which the logging helpers redirect to.
+openinfo() {
+    # quoted: an unquoted name containing IFS characters is rejected by bash
+    # as an ambiguous redirect
+    exec 3<> "$1"
+    INFO=3
+}
+
+# Close file descriptor 3, the descriptor openinfo attaches the info log to.
+closeinfo() {
+ exec 3>&-
+}
+
+contains() {
+ ARRAY=$1
+ X=$2
+
+ for a in ${!ARRAY}
+ do
+ if [[ ${a} == ${X} ]]; then
+ return 0
+ fi
+ done
+ return 1
+}
+
+genScripts() {
+ echo "#!/bin/bash" > run.sh
+ echo -n "\"$EXEC\" " >> run.sh
+ for CMDARG in "${CMDARGS[@]}"; do
+ echo -n "\"$CMDARG\" " >> run.sh
+ done
+ echo "1>\"$STDOUT\" 2>\"$STDERR\"" >> run.sh
+ chmod +x run.sh
+}
+
+# Handle an output file whose CDM policy is LOCAL.
+#   $1  output file name
+# Does nothing when no CDM file is configured ($CDM_FILE empty). Otherwise
+# the policy is looked up with cdm_lookup and, when it is "LOCAL",
+# cdm_local_output_perform is called.
+# NOTE(review): cdm_lookup is not defined in this script; presumably it comes
+# from the sourced cdm_lib.sh - confirm.
+# NOTE(review): cdm_local_output_perform reads four positional parameters but
+# only two are passed here - confirm the intended call.
+cdm_local_output()
+{
+ L=$1
+
+ if [[ $CDM_FILE == "" ]]; then
+ return
+ fi
+
+ CDM_POLICY=$( cdm_lookup shared/cdm.pl $CDM_FILE $L )
+ if [[ $CDM_POLICY == "LOCAL" ]]; then
+ cdm_local_output_perform $L $CDM_POLICY
+ fi
+}
+
+# Copy a job output to a CDM "LOCAL" directory using an external tool.
+#   $1  file name (stored in L)
+#   $2  copy tool to run
+#   $3  remote directory
+# The remote directory is created, then "$TOOL $FLAGS" is run with the source
+# and destination paths; either step failing aborts through checkError (254).
+# NOTE(review): FLAGS is read from $3, the same parameter as REMOTE_DIR;
+# presumably $4 was intended - confirm.
+# NOTE(review): the paths are built from $FILE, which this function never
+# sets, while $1 is stored in L and not used - confirm.
+cdm_local_output_perform()
+{
+ L=$1
+ TOOL=$2
+ REMOTE_DIR=$3
+ FLAGS=$3
+ log "Copying $REMOTE_DIR/$FILE to $JOBDIR/$FILE"
+ mkdir -p $REMOTE_DIR
+ checkError 254 "CDM[LOCAL]: mkdir -p $REMOTE_DIR failed!"
+ $TOOL $FLAGS $JOBDIR/$FILE $REMOTE_DIR/$FILE
+ checkError 254 "CDM[LOCAL]: Tool failed!"
+}
+
+# Hand the outputs named in "$*" to cdm_gather_action together with
+# $GATHER_MAX. Nothing is done when no CDM file is configured or when no
+# outputs were given.
+cdm_gather()
+{
+    GATHER_OUTPUT=${*}
+    if [[ $CDM_FILE == "" || $GATHER_OUTPUT == "" ]]; then
+        return
+    fi
+
+    cdm_gather_action $GATHER_MAX $GATHER_OUTPUT
+}
+
+COMMANDLINE=$@
+
+PARAMFILE=
+
+openinfo "wrapper.log"
+ID=$1
+checkEmpty "$ID" "Missing job ID"
+
+shift
+
+getarg "-urlprefix" "$@"
+URLPREFIX=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-jobdir" "$@"
+JOBDIR=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-scratch" "$@"
+SCRATCH=$VALUE/$ID
+shift $SHIFTCOUNT
+
+checkparamfile "$@"
+
+INFODIR=$SCRATCH
+
+checkEmpty "$JOBDIR" "Missing job directory prefix"
+mkdir -p $INFODIR
+closeinfo
+
+if [ -z $MPI_RANK ]; then
+ INFOFILE="$INFODIR/_info"
+else
+ # Rename info file for each rank
+ INFOFILE="$INFODIR/_info-${PMI_RANK}"
+ # Build list of per-rank info files
+ echo $INFOFILE >> $INFODIR/_info
+fi
+rm -f $INFOFILE
+openinfo "$INFOFILE"
+
+# ---------------------------------------------------------------------------
+# Read the remaining wrapper options. Each getarg call leaves the value in
+# VALUE and the number of consumed command-line words in SHIFTCOUNT.
+# ---------------------------------------------------------------------------
+logstate "LOG_START"
+infosection "Wrapper (_swiftwrap)"
+
+getarg "-e" "$@"
+EXEC=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-out" "$@"
+STDOUT=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-err" "$@"
+STDERR=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-i" "$@"
+STDIN=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-d" "$@"
+DIRS=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-if" "$@"
+INF=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-of" "$@"
+OUTF=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-wt" "$@"
+WRAPPERLOG_ALWAYS_TRANSFER=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-sk" "$@"
+SITEDIR_KEEP=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-cdmfile" "$@"
+CDM_FILE=
+if [ "X$VALUE" != "X" ]; then
+    CDM_FILE=shared/$VALUE
+fi
+shift $SHIFTCOUNT
+
+getarg "-status" "$@"
+STATUSMODE=$VALUE
+shift $SHIFTCOUNT
+
+# Application arguments: everything after "-a" on the command line, or one
+# argument per "-a " line of the parameter file.
+declare -a CMDARGS
+if [ "$PARAMFILE" == "" ] && [ "$1" == "-a" ] ; then
+    shift
+    CMDARGS=("$@")
+elif [ "$PARAMFILE" != "" ] ; then
+    CMDARGS=()
+    # IFS= and -r keep leading/trailing blanks and backslashes of each
+    # argument intact.
+    while IFS= read -r line ; do
+        CMDARGS=("${CMDARGS[@]}" "$line")
+    done < <(grep -E "^-a " "$PARAMFILE" | cut -d " " -f 2-)
+else
+    fail 254 "Missing arguments (-a option)"
+fi
+
+# ---------------------------------------------------------------------------
+# Environment setup: optional CDM library, job directory, PATH and the
+# absolute path of the executable.
+# ---------------------------------------------------------------------------
+if [ "X$CDM_FILE" != "X" ]; then
+    # TODO
+    logstate "SOURCE_CDM_LIB $WFDIR/shared/cdm_lib.sh"
+    source "$WFDIR/shared/cdm_lib.sh"
+    checkError 254 "Could not source: $WFDIR/shared/cdm_lib.sh"
+fi
+
+# SCRATCH is "<scratch option>/<job id>", so it is never empty by itself;
+# test the part in front of the job id. The script defines no "error"
+# command, so the failure is reported through fail.
+if [ "X${SCRATCH%/"$ID"}" == "X" ]; then
+    fail 254 "Wrapper staging requires a scratch directory to be specified"
+fi
+
+# SCRATCH already ends in the job id. The script never sets JOBID, so the
+# former "$SCRATCH/$JOBID" named this same directory with a trailing slash.
+DIR="$SCRATCH"
+
+PATH=$PATH:/bin:/usr/bin
+
+if [ "$PATHPREFIX" != "" ]; then
+    export PATH=$PATHPREFIX:$PATH
+fi
+
+if [ "$SWIFT_EXTRA_INFO" != "" ]; then
+    # SWIFT_EXTRA_INFO is run as a command; its output goes to the log
+    log "EXTRAINFO=$($SWIFT_EXTRA_INFO)"
+fi
+
+# Resolve a relative executable name through PATH.
+if [ "X${EXEC:0:1}" != "X/" ] ; then
+    export ORIGEXEC=$EXEC
+    export EXEC=$(which "$EXEC")
+    if [ "X$EXEC" = "X" ] ; then
+        fail 254 "Cannot find executable $ORIGEXEC on site system path"
+    fi
+fi
+
+log "PID=$$"
+log "HOST=$HOST"
+log "PWD=$PWD"
+log "DIR=$DIR"
+log "EXEC=$EXEC"
+log "STDIN=$STDIN"
+log "STDOUT=$STDOUT"
+log "STDERR=$STDERR"
+log "DIRS=$DIRS"
+log "INF=$INF"
+log "OUTF=$OUTF"
+log "WRAPPERLOG_ALWAYS_TRANSFER=$WRAPPER_LOG_ALWAYS_TRANSFER"
+log "SITEDIR_KEEP=$SITEDIR_KEEP"
+log "CDM_FILE=$CDM_FILE"
+log "ARGS=$@"
+log "ARGC=$#"
+[ -n $MPI_RANK ] && log "MPI_RANK=$MPI_RANK" && log "PMI_RANK=$PMI_RANK"
+IFS="|"
+
+logstate "CREATE_JOBDIR"
+mkdir -p $DIR
+checkError 254 "Failed to create job directory $DIR"
+log "Created job directory: $DIR"
+
+if [[ $PMI_RANK == "" || $PMI_RANK == 0 ]]; then
+
+ logstate "CREATE_INPUTDIR"
+ for D in $DIRS ; do
+ mkdir -p "$DIR/$D" 2>&1 >>"$INFO"
+ checkError 254 "Failed to create input directory $D"
+ log "Created output directory: $DIR/$D"
+ done
+
+ logstate "LINK_INPUTS"
+ for L in $INF ; do
+ CDM_POLICY="DEFAULT"
+ if [[ $CDM_FILE != "" ]]; then
+ CDM_POLICY=$( cdm_lookup shared/cdm.pl $CDM_FILE $L )
+ fi
+ if [[ $CDM_POLICY != "DEFAULT" && $CDM_POLICY != "EXTERNAL"* ]]; then
+ log "CDM_POLICY: $L -> $CDM_POLICY"
+ eval cdm_action $DIR "INPUT" $L $CDM_POLICY
+ continue
+ fi
+
+ stagein $L "$DIR/$L"
+ done
+
+ if [[ $CDM_FILE != "" ]]; then
+ logstate "LINK_CDM_OUTPUTS"
+ SKIPPED_OUTPUT=()
+ GATHER_OUTPUT=()
+ for L in $OUTF ; do
+ CDM_POLICY=$( cdm_lookup shared/cdm.pl $CDM_FILE $L )
+ log "CDM_POLICY: $L -> $CDM_POLICY"
+ if [[ $CDM_POLICY != "DEFAULT" &&
+ $CDM_POLICY != "BROADCAST"* ]]; then
+ eval cdm_action $DIR "OUTPUT" $L $CDM_POLICY
+ SKIPPED_OUTPUT=( $SKIPPED_OUTPUT $L )
+ fi
+ if [ $CDM_POLICY == "GATHER" ]; then
+ GATHER_OUTPUT=( $GATHER_OUTPUT $L )
+ elif [ $CDM_POLICY == "LOCAL" ]; then
+ CDM_LOCAL_OUTPUT=( $CDM_LOCAL_OUTPUT $L )
+ fi
+ done
+ fi
+
+fi # PMI_RANK==0
+
+# ---------------------------------------------------------------------------
+# Enter the job directory and run the application, feeding it $STDIN when
+# one was given and sending its output to $STDOUT / $STDERR.
+# ---------------------------------------------------------------------------
+debug "Moving to jobdir: $DIR"
+if ! cd $DIR; then
+    log "PWD: $PWD"
+    log $( find . )
+    fail 254 "Could not cd to: $DIR"
+fi
+logstate "EXECUTE"
+
+debug "Command line: $EXEC ${CMDARGS[@]}"
+
+[ -f "$EXEC" ] || fail 254 "The executable $EXEC does not exist"
+[ -x "$EXEC" ] || fail 254 "The executable $EXEC does not have the executable bit set"
+
+# run.sh is generated on request, whether or not stdin is redirected
+if [ "$SWIFT_GEN_SCRIPTS" != "" ]; then
+    genScripts
+fi
+
+if [ "$STDIN" == "" ]; then
+    "$EXEC" "${CMDARGS[@]}" 1>"$STDOUT" 2>"$STDERR"
+else
+    "$EXEC" "${CMDARGS[@]}" 1>"$STDOUT" 2>"$STDERR" <"$STDIN"
+fi
+# $? is still the application's exit status here: it is used both as the
+# wrapper's exit code and inside the message
+checkError $? "Application $EXEC failed with an exit code of $?"
+
+logstate "EXECUTE_DONE"
+log "Job ran successfully"
+
+if [[ $MPI_RANK == "" || $MPI_RANK == 0 ]]; then
+
+ MISSING=
+ for O in $OUTF ; do
+ if [ ! -f "$DIR/$O" ]; then
+ if [ "$MISSING" == "" ]; then
+ MISSING=$O
+ else
+ MISSING="$MISSING, $O"
+ fi
+ fi
+ done
+ if [ "$MISSING" != "" ]; then
+ log $( find . )
+ fail 254 "The following output files were not created by the application: $MISSING"
+ fi
+
+ logstate "MOVING_OUTPUTS $OUTF"
+ for O in $OUTF ; do
+ if ! contains SKIPPED_OUTPUT $O ; then
+ stageout "$DIR/$O" "$O"
+ fi
+ done
+
+ cdm_local_output $CDM_LOCAL_OUTPUT
+ cdm_gather $GATHER_OUTPUT
+
+ if [ "$STATUSMODE" = "files" ]; then
+ logstate "TOUCH_SUCCESS"
+ touch ${ID}-success
+ stageout "$DIR/${ID}-success" "$JOBDIR/success"
+ fi
+
+ log "Moving back to workflow directory $WFDIR"
+ cd $WFDIR
+ if [ $? != 0 ]; then
+ fail 254 "Could not cd to workflow directory: $WFDIR"
+ fi
+
+ if [ "$WRAPPERLOG_ALWAYS_TRANSFER" == "true" ]; then
+ stageout "$INFOFILE" "$JOBDIR/info"
+ fi
+ if [ "$SITEDIR_KEEP" != "true" ]; then
+ logstate "RM_JOBDIR"
+ rm -rf "$DIR" 2>&1 >& "$INFO"
+ checkError 254 "Failed to remove job directory $DIR"
+ fi
+else
+ # Allow rank 0 to write output
+ sleep 1
+fi # MPI_RANK==0
+
+logstate "END"
+
+closeinfo
+
+if [ "$WRAPPER_LOG_ALWAYS_TRANSFER" == "true" ]; then
+ stageout "$DIR/${ID}-info" "$JOBID/${ID}-info"
+fi
+
+# ensure we exit with a 0 after a successful execution
+exit 0
Property changes on: trunk/libexec/_swiftwrap.wrapperstaging
___________________________________________________________________
Added: svn:executable
+ *
Added: trunk/libexec/vdl-int-wrapper-staging.k
===================================================================
--- trunk/libexec/vdl-int-wrapper-staging.k (rev 0)
+++ trunk/libexec/vdl-int-wrapper-staging.k 2012-05-26 23:14:20 UTC (rev 5797)
@@ -0,0 +1,465 @@
+import("sys.k")
+import("task.k")
+import("vdl-lib.xml")
+/*
+ * Things that are not exposed to the translated file
+ */
+
+// Log level names passed as the first argument of log(...) below.
+global(LOG:DEBUG, "debug")
+global(LOG:INFO, "info")
+global(LOG:WARN, "warn")
+global(LOG:ERROR, "error")
+global(LOG:FATAL, "fatal")
+
+// URL prefix handed to the wrapper through its -urlprefix option; obtained
+// from the Java element org.griphyn.vdl.karajan.lib.GetURLPrefix.
+global(URL_PREFIX,
+ elementDef(getURLPrefix, className="org.griphyn.vdl.karajan.lib.GetURLPrefix")
+ getURLPrefix()
+)
+
+// Configuration values forwarded to the wrapper as its -wt and -sk options.
+global(WRAPPERLOG_ALWAYS_TRANSFER, vdl:configProperty("wrapperlog.always.transfer"))
+global(SITEDIR_KEEP, vdl:configProperty("sitedir.keep"))
+
+
+namespace("vdl"
+ export(
+ /*
+ * Recursively remove directory `dir` on `host`: every entry listed in it
+ * is handled in parallel (directories through a recursive rmdir call,
+ * anything else through file:remove), then the directory itself is removed.
+ */
+ element(rmdir, [dir, host]
+ parallelFor(entry, file:list(dir, host=host)
+ epath := "{dir}/{entry}"
+ if(
+ file:isDirectory(epath, host=host) rmdir(epath, host)
+ file:remove(epath, host=host)
+ )
+ )
+ dir:remove(dir, host=host)
+ )
+
+ /*
+ * Create the directory obtained by joining `dir` and `path` (dircat) on
+ * `host`, logging the request first.
+ */
+ element(createdirs, [path, dir, host]
+ dc := dircat(dir, path)
+ log(LOG:INFO, "START path={path} dir={dir} - Creating directory structure")
+
+ dir:make(dc, host=host)
+ )
+
+ /*
+ * Determine the outcome of job `jobid` from its status files: first try to
+ * remove the "-success" file; if that fails, fall back to checkErrorFile
+ * and throw the resulting error.
+ *
+ * NOTE(review): the body uses `rhost` and `jobdir`, neither of which is in
+ * the parameter list, and the caller in execute2 passes four arguments
+ * (rhost, wfdir, jobid, tr) to these three parameters - confirm the
+ * intended signature.
+ * NOTE(review): checkErrorFile is invoked here with four arguments but is
+ * declared with two - confirm.
+ */
+ element(checkJobStatus, [wfdir, jobid, tr]
+ log(LOG:DEBUG, "START jobid={jobid}")
+ try(
+ sequential(
+ /*
+ * This is a bit of optimization, but I'm not completely
+ * sure of its correctness. The goal is to both detect
+ * the presence of the success file and remove it, all
+ * in one operation. It relies on file:remove() throwing
+ * an exception if the file is not there.
+ */
+ file:remove("{wfdir}/status/{jobdir}/{jobid}-success", host=rhost)
+ log(LOG:INFO, "SUCCESS jobid={jobid} - Success file found")
+ )
+ sequential(
+ try (
+ msg = checkErrorFile(rhost, wfdir, jobid, jobdir)
+ sequential (
+ log(LOG:INFO, "NO_STATUS_FILE jobid={jobid} - Both status files are missing")
+ throw("No status file was found. Check the shared filesystem on {rhost}")
+ )
+ )
+ )
+ throw(checkErrorFile(rhost, wfdir, jobid, jobdir))
+ )
+ )
+
+ /*
+ * Fetch the contents of the job's "-error" status file. When the file
+ * exists on the remote host it is transferred locally, the remote copy is
+ * removed, and the stripped text of the local copy (which is then removed
+ * as well) is returned. When it does not exist an exception is thrown.
+ *
+ * NOTE(review): the body uses `rhost` and `jobdir`, which are not in the
+ * parameter list, and callers in this file pass three or four arguments
+ * starting with rhost - confirm the intended signature.
+ */
+ element(checkErrorFile, [wfdir, jobid]
+ if (
+ file:exists("{wfdir}/status/{jobdir}/{jobid}-error", host=rhost) then(
+ log(LOG:INFO, "FAILURE jobid={jobid} - Failure file found")
+ task:transfer(srchost=rhost, srcdir="{wfdir}/status/{jobdir}", srcfile="{jobid}-error")
+ error := parallel(
+ file:remove("{wfdir}/status/{jobdir}/{jobid}-error", host=rhost)
+ sequential(
+ str:strip(file:read("{jobid}-error"))
+ file:remove("{jobid}-error")
+ )
+ )
+ error
+ )
+ else (
+ log(LOG:INFO, "NO_STATUS_FILE jobid={jobid} - Error file missing")
+ throw("No status file was found. Check the shared filesystem on {rhost}")
+ )
+ )
+ )
+
+ /*
+ * Prepare the per-run directory on `rhost`, guarded by once(...) keyed on
+ * (rhost, "shared"): create "{VDL:SCRIPTNAME}-{VDL:RUNID}", copy
+ * _swiftwrap.wrapperstaging into it, return the directory name and send
+ * (wfdir, rhost) to the cleanup channel.
+ */
+ element(initSharedDir, [rhost]
+ once(list(rhost, "shared")
+ vdl:setprogress("Initializing site shared directory")
+
+ log(LOG:INFO, "START host={rhost} - Initializing shared directory")
+
+ wfdir := "{VDL:SCRIPTNAME}-{VDL:RUNID}"
+ dir:make(wfdir, host=rhost)
+ transfer(srcdir="{swift.home}/libexec/", srcfile="_swiftwrap.wrapperstaging", destdir=wfdir, desthost=rhost)
+
+ wfdir
+ //we send the cleanup data to vdl:main()
+ to(cleanup, list(wfdir, rhost))
+ log(LOG:INFO, "END host={rhost} - Done initializing shared directory")
+ )
+ )
+
+ /*
+ * Return the name of the local "{VDL:SCRIPTNAME}-{VDL:RUNID}.d" directory,
+ * creating it (guarded by once) when it does not exist yet.
+ */
+ element(initDDir, []
+ ddir := "{VDL:SCRIPTNAME}-{VDL:RUNID}.d"
+ once(ddir
+ if(sys:not(file:exists(ddir))
+ task:dir:make(ddir)
+ )
+ )
+ ddir
+ )
+
+ /* Return pathnames(stageins) for the given stage-in list. */
+ element(inFiles, [stageins]
+ pathnames(stageins)
+ )
+
+ /*
+ * Return the list of unique directories produced by inFileDirs(stageins)
+ * and outFileDirs(stageouts).
+ */
+ element(fileDirs, [stageins, stageouts]
+ list(
+ unique(
+ inFileDirs(stageins)
+ outFileDirs(stageouts)
+ )
+ )
+ )
+
+ /*
+ * Create every directory of `dirs` below `destdir` on `host` by calling
+ * createdirs for each one; each call is wrapped in cacheOn keyed on
+ * (directory, destdir, host).
+ */
+ element(createDirSet, [jobid, destdir, host, dirs]
+ /*
+ * Ideally this would be done by creating a tree of the directories
+ * to be created and (eventually) exploiting the concurrency in that.
+ */
+ log(LOG:INFO, "START jobid={jobid} host={host} - Initializing directory structure")
+ for(u, dirs
+ cacheOn(list(u, destdir, host)
+ createdirs(u, destdir, host)
+ )
+ )
+ log(LOG:INFO, "END jobid={jobid} - Done initializing directory structure")
+ )
+
+ /*
+ * Clean up run directory `dir` on `host`.
+ * When a CDM file is configured and GATHER_DIR is set, cdm_cleanup.sh and
+ * cdm_lib.sh are transferred to `dir` and cdm_cleanup.sh is run there.
+ * When the "sitedir.keep" property is "false", the site's
+ * swift:cleanupCommand is run on `dir`.
+ */
+ element(cleanup, [dir, host]
+ log(LOG:INFO, "START dir={dir} host={host}")
+ cdmfile := cdm:file()
+ log(LOG:DEBUG, "cdmfile {cdmfile}")
+ if(cdmfile != "" &
+ cdm:get("GATHER_DIR") != "UNSET" then(
+ log(LOG:INFO, "submitting cdm_cleanup.sh to {dir}")
+ task:transfer(srcfile="cdm_cleanup.sh",
+ srcdir="{swift.home}/libexec",
+ desthost=host, destdir=dir)
+ task:transfer(srcfile="cdm_lib.sh",
+ srcdir="{swift.home}/libexec",
+ desthost=host, destdir=dir)
+ log(LOG:INFO, "execute: cdm_cleanup.sh")
+ task:execute(
+ executable="/bin/bash",
+ arguments=list("{dir}/cdm_cleanup.sh",
+ cdm:get("GATHER_DIR"), cdm:get("GATHER_TARGET")
+ sys:uid() )
+ host=host, batch=true, tcprofile(host))
+ )
+ )
+ if(vdl:configProperty("sitedir.keep") == "false"
+ task:execute(
+ vdl:siteprofile(host, "swift:cleanupCommand"),
+ arguments=list(
+ vdl:siteprofile(host, "swift:cleanupCommandOptions"),
+ dir
+ )
+ host=host, batch=true, tcprofile(host))
+ )
+ log(LOG:INFO, "END dir={dir} host={host}")
+ )
+
+ /*
+ * Run vdl:cleanup in parallel for every (dir, host) pair in `cleanup`.
+ * An exception from a single cleanup is logged and sent to the warnings
+ * channel instead of being propagated.
+ */
+ element(cleanups, [cleanup]
+ log(LOG:INFO, "START cleanups={cleanup}")
+ parallelFor(i, cleanup
+ [dir, host] := each(i)
+ try(
+ vdl:cleanup(dir, host)
+ catch(".*",
+ log(LOG:DEBUG, "EXCEPTION - Exception caught while cleaning up", exception)
+ to(warnings, exception("Cleanup on {host} failed", exception))
+ )
+ )
+ )
+ log(LOG:INFO, "END cleanups={cleanup}")
+ )
+
+ /*
+ * Remove each file of `files` on `host` and report the removal through
+ * vdl:cacheFileRemoved.
+ */
+ element(cleanupFiles, [files, host]
+ uParallelFor(r, files
+ log(LOG:INFO, "Purging ", r, " on ", host)
+ file:remove(r, host=host)
+ vdl:cacheFileRemoved(r, host)
+ )
+ )
+
+ /*
+ * Prepare directory `dir` on `host` for the wrapper parameter file: the
+ * directory is created (cached per (dir, host)) with the provider derived
+ * from `wrapfile`. No file is transferred here.
+ * NOTE(review): srchost, srcdir and filename are computed but not used in
+ * this element - confirm whether a transfer step is missing or intentional.
+ */
+ element(stageWrapperParams, [jobid, wrapfile, dir, host]
+ log(LOG:INFO, "START jobid={jobid} - staging in wrapper params")
+ provider := provider(wrapfile)
+ srchost := hostname(wrapfile)
+ srcdir := vdl:dirname(wrapfile)
+ destdir := dir
+ filename := basename(wrapfile)
+
+ cacheOn(list(destdir, host)
+ dir:make(destdir, host=host, provider=provider)
+ )
+
+ log(LOG:INFO, "END jobid={jobid}")
+ )
+
+ /*
+ * Emit provenance-graph statements for one invocation of `tr` to the graph
+ * channel, unless the "pgraph" property is "false": a node for the current
+ * thread (coloured by `err`), a node and an edge for every stage-in file,
+ * and a node and an edge for every stage-out file.
+ */
+ element(graphStuff, [tr, stagein, stageout, err, optional(args)]
+ if(
+ vdl:configProperty("pgraph") != "false" then(
+ errprops := if(err ",color=lightsalmon" ",color=lightsteelblue1")
+ tp := vdl:threadPrefix()
+ to(graph,
+ concat(str:quote(tp), " [label=", str:quote(tr), "{errprops}]")
+ )
+ for(si, stagein
+ si := basename(si)
+ to(graph
+ concat(str:quote(si), " [shape=parallelogram]")
+ concat(str:quote(si), " -> ", str:quote(tp))
+ )
+ )
+ for(pv, stageout
+ [path, var] := each(pv)
+ file := vdl:fileName(vdl:getfield(var, path=path))
+ file := basename(file)
+ label := vdl:niceName(var, path = path)
+ to(graph
+ concat(str:quote(file), " [shape=parallelogram,label=",
+ str:quote(label), "]")
+ concat(str:quote(tp), " -> ", str:quote(file))
+ )
+ )
+ )
+ )
+ )
+
+ /*
+ * Return the sum of the sizes of all files in `files`.
+ * The loop variable is `f`, so that is what file:size has to be given.
+ */
+ element(fileSizes, [files]
+ math:sum(
+ for(f, files, file:size(f))
+ )
+ )
+
+ /*
+ * Build a text block with the job's stderr and stdout: for each of the two
+ * files a line "<name>: " is followed by the contents of
+ * "{jobdir}/{file}". The read is wrapped in try(...), so an unreadable
+ * file contributes only its label.
+ */
+ element(readStandardFiles, [jobdir, stdout, stderr]
+ concat(
+ for(f, list(list("stderr.txt", stderr), list("stdout.txt", stdout))
+ [name, file] := each(f)
+ destfile := "{jobdir}/{file}"
+ nl()
+ "{name}: "
+ try(
+ file:read(destfile)
+ nl()
+ )
+ )
+ )
+ )
+
+
+ /*
+ * Run one application invocation with wrapper staging.
+ *
+ * A host is allocated, the run directory on it is initialised, and
+ * _swiftwrap.wrapperstaging is executed there. The wrapper's options are
+ * passed either on the command line (wrapper.parameter.mode "args") or
+ * through a parameter file written locally as "{jobdir}/_paramfile" and
+ * named with "-p" (mode "files"). Afterwards the job status is checked
+ * when status.mode is "files" and the restart log is updated.
+ * An "Abort" exception is rethrown unchanged; any other exception is
+ * rethrown wrapped with the transformation name, arguments, host, job
+ * directory and the captured stdout/stderr.
+ */
+ element(execute2, [tr, optional(arguments, stdin, stdout, stderr, attributes), stagein, stageout, restartout,
+ replicationGroup, replicationChannel]
+ stagein := list(unique(each(stagein)))
+ stageout := list(unique(each(stageout)))
+
+ allocateHost(rhost, constraints=vdl:jobConstraints(tr, stagein=stagein)
+
+ ddir := initDDir()
+ wfdir := try(
+ initSharedDir(rhost)
+ throw(exception("Could not initialize shared directory on {rhost}", exception))
+ )
+
+ uid := uid()
+ jobid := "{tr}-{uid}"
+
+ jobdir := concat(ddir, "/jobs/", substring(uid, from=0, to=1), "/{jobid}/")
+
+ log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread={#thread} host={rhost} replicationGroup={replicationGroup}")
+
+ statusMode := configProperty("status.mode",host=rhost)
+ wrapperMode := configProperty("wrapper.parameter.mode",host=rhost)
+
+ wrapfile := "{jobdir}/_paramfile"
+
+ stdout := try(getFieldValue(stdout), "stdout.txt")
+ stderr := try(getFieldValue(stderr), "stderr.txt")
+ fileDirs := fileDirs(stagein, stageout)
+ os := vdl:siteprofile(rhost, "SYSINFO:OS")
+
+ scratch := vdl:siteprofile(rhost, "scratch")
+
+ /*
+ * Parameter file: one "<option> <value>" record per line. The wrapper
+ * looks records up by "<option> " at the start of a line, so every
+ * record needs the space after the option name and its own newline.
+ */
+ if(wrapperMode == "files"
+ sequential(
+ sys:file:write(wrapfile,
+ "-e ",vdl:executable(tr, rhost), nl(),
+ "-out ", stdout, nl(),
+ "-err ", stderr, nl(),
+ "-i ", maybe(getFieldValue(stdin)), nl(),
+ "-d ", flatten(each(fileDirs)), nl(),
+ "-if ", flatten(infiles(stagein)), nl(),
+ "-of ", flatten(outfiles(stageout)), nl(),
+ "-wt ", WRAPPERLOG_ALWAYS_TRANSFER, nl(),
+ "-sk ", SITEDIR_KEEP, nl(),
+ "-cdmfile ", cdm:file(), nl(),
+ "-status ", statusMode, nl(),
+ for(a, arguments, "-a ", a, nl())
+ )
+ )
+ )
+
+ vdl:setprogress("Stage in")
+
+ try(
+ sequential(
+
+ if(wrapperMode == "files"
+ stageWrapperParams(jobid, wrapfile, wfdir, rhost)
+ )
+
+ log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", maybe(" arguments=", arguments), " host={rhost}")
+
+ vdl:setprogress("Submitting")
+
+ if(wrapperMode == "files"
+ vdl:execute(
+ vdl:siteprofile(rhost, "swift:wrapperInterpreter"),
+ list(
+ vdl:siteprofile(rhost, "swift:wrapperInterpreterOptions"),
+ "_swiftwrap.wrapperstaging",
+ jobid,
+ "-urlprefix", URL_PREFIX,
+ "-jobdir", jobdir,
+ "-scratch", scratch,
+ "-p", wrapfile
+ )
+ directory=wfdir
+ redirect=false
+ host=rhost
+ vdl:tcprofile(rhost, maybe(attributes=attributes), tr=tr) //this gets various app params from the tc, such as environment, walltime, etc
+ replicationGroup=replicationGroup
+ replicationChannel=replicationChannel
+ jobid=jobid
+ )
+ )
+ if(wrapperMode == "args"
+ vdl:execute(
+ vdl:siteprofile(rhost, "swift:wrapperInterpreter"),
+ list(
+ vdl:siteprofile(rhost, "swift:wrapperInterpreterOptions"),
+ "_swiftwrap.wrapperstaging",
+ jobid,
+ "-urlprefix", URL_PREFIX,
+ "-jobdir", jobdir,
+ "-scratch", scratch,
+ "-e", vdl:executable(tr, rhost),
+ "-out", stdout,
+ "-err", stderr,
+ "-i", maybe(getFieldValue(stdin)),
+ "-d", flatten(each(fileDirs)),
+ "-if", flatten(infiles(stagein)),
+ "-of", flatten(outfiles(stageout)),
+ "-wt", WRAPPERLOG_ALWAYS_TRANSFER,
+ "-sk", SITEDIR_KEEP,
+ "-cdmfile", cdm:file(),
+ "-status", statusMode,
+ "-a", maybe(each(arguments))
+ )
+ directory=wfdir
+ redirect=false
+ host=rhost
+ vdl:tcprofile(rhost, maybe(attributes=attributes), tr=tr)
+ replicationGroup=replicationGroup
+ replicationChannel=replicationChannel
+ jobid=jobid
+ )
+ )
+
+ vdl:setprogress("Checking status")
+ // NOTE(review): checkJobStatus is declared with three parameters
+ // (wfdir, jobid, tr) but receives four arguments here - confirm.
+ if(statusMode == "files"
+ checkJobStatus(rhost, wfdir, jobid, tr)
+ )
+
+ if(wrapperMode == "files"
+ file:remove(wrapfile)
+ )
+
+ log(LOG:DEBUG, "STAGING_OUT jobid={jobid}")
+
+
+ /* need to stage the files to upper scratch area in case they are not transfered to another site
+ before all the files get cleaned out */
+
+
+ vdl:setprogress("Stage out")
+ doRestartlog(restartout)
+
+ log(LOG:DEBUG, "JOB_END jobid={jobid}")
+ )
+ catch("^Abort$"
+ log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
+ throw(exception)
+ )
+ catch("^(?!Abort$).*"
+ vdl:setprogress("Failed but can retry")
+ prev := exception
+ // prefer the wrapper's error file as the message; fall back to the
+ // original exception when it cannot be read
+ // NOTE(review): checkErrorFile is declared with two parameters
+ // (wfdir, jobid) but receives three arguments here - confirm.
+ exception := try(exception(checkErrorFile(rhost, wfdir, jobid)), prev)
+
+ log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
+
+ if(matches(exception,".*executable bit.*")
+ generateError(exception)
+ )
+
+ outs := readStandardFiles(jobdir, stdout, stderr)
+
+ throw(
+ exception(
+ concat(
+ "Exception in {tr}:", nl(),
+ maybe("Arguments: ", arguments, nl()),
+ "Host: {rhost}", nl(),
+ "Directory: {scratch}/{jobid}",
+ "{outs}", nl(),
+ "----", nl()
+ )
+ exception
+ )
+ )
+ )
+ )
+ )
+ )
+
+ /*
+ * Write the provenance graph as a Graphviz "digraph SwiftProvenance" file.
+ * The file name is "{VDL:SCRIPTNAME}-{VDL:RUNID}.dot" when the "pgraph"
+ * property is "true", otherwise the property value itself. Graph and node
+ * options come from the pgraph.graph.options / pgraph.node.options
+ * properties; every element of `gdata` becomes one line of the body.
+ */
+ element(generateProvenanceGraph, [gdata]
+ pgraph := vdl:configProperty("pgraph")
+ gname := if(pgraph == "true" "{VDL:SCRIPTNAME}-{VDL:RUNID}.dot" pgraph)
+ file:write(gname
+ "digraph SwiftProvenance {{", nl()
+ " graph [", vdl:configProperty("pgraph.graph.options"), "];", nl()
+ " node [", vdl:configProperty("pgraph.node.options"), "];", nl()
+
+ for(i, gdata
+ " ", i, nl()
+ )
+ "}", nl()
+ )
+ log(LOG:INFO, "Provenance graph saved in ", gname)
+ )
+ )
+)
+
+// Local variables:
+// mode: scheme
+// tab-width: 4
+// indent-tabs-mode: t
+// End:
Modified: trunk/libexec/vdl.k
===================================================================
--- trunk/libexec/vdl.k 2012-05-25 09:56:26 UTC (rev 5796)
+++ trunk/libexec/vdl.k 2012-05-26 23:14:20 UTC (rev 5797)
@@ -11,7 +11,12 @@
import("vdl-lib.xml", export = true)
pstaging := configProperty("use.provider.staging")
- int := if (pstaging == "true", "vdl-int-staging.k", "vdl-int.k")
+ wstaging := configProperty("use.wrapper.staging")
+ int := if (
+ pstaging == "true", "vdl-int-staging.k",
+ wstaging == "true", "vdl-int-wrapper-staging.k",
+ "vdl-int.k"
+ )
import(int)
import("java.k")
More information about the Swift-commit
mailing list