[Swift-commit] r5797 - trunk/libexec

hategan at ci.uchicago.edu hategan at ci.uchicago.edu
Sat May 26 18:14:21 CDT 2012


Author: hategan
Date: 2012-05-26 18:14:20 -0500 (Sat, 26 May 2012)
New Revision: 5797

Added:
   trunk/libexec/_swiftwrap.wrapperstaging
   trunk/libexec/vdl-int-wrapper-staging.k
Modified:
   trunk/libexec/vdl.k
Log:
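Added the wrapper-staging Karajan library (vdl-int-wrapper-staging.k) and its wrapper script (_swiftwrap.wrapperstaging); vdl.k selects the library when use.wrapper.staging is "true" (and use.provider.staging is not).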

Added: trunk/libexec/_swiftwrap.wrapperstaging
===================================================================
--- trunk/libexec/_swiftwrap.wrapperstaging	                        (rev 0)
+++ trunk/libexec/_swiftwrap.wrapperstaging	2012-05-26 23:14:20 UTC (rev 5797)
@@ -0,0 +1,544 @@
+#!/bin/bash
+# this script must be invoked inside of bash, not plain sh
+# note that this script modifies $IFS
+
+# Toggle debugging output from debug(): set to 1 to make debug() write its
+# messages to the info log; any other value suppresses them.
+DEBUG=0
+
+infosection() {
+	echo >& "$INFO"
+	echo "_____________________________________________________________________________" >& "$INFO"
+	echo >& "$INFO"
+	echo "        $1" >& "$INFO"
+	echo "_____________________________________________________________________________" >& "$INFO"
+	echo >& "$INFO"
+}
+
+info() {
+	infosection "command line"
+	echo $COMMANDLINE 2>&1 >& "$INFO"
+	infosection "uname -a"
+	uname -a 2>&1 >& "$INFO"
+	infosection "id"
+	id 2>&1 >& "$INFO"
+	infosection "env"
+	env 2>&1 >& "$INFO"
+	infosection "df"
+	df 2>&1 >& "$INFO"
+        if [ -e "/proc/cpuinfo" ]; then
+		infosection "/proc/cpuinfo"
+		cat /proc/cpuinfo 2>&1 >& "$INFO"
+	fi
+	if [ -e "/proc/meminfo" ]; then
+		infosection "/proc/meminfo"
+		cat /proc/meminfo 2>&1 >& "$INFO"
+	fi
+	if [ -f "$STDOUT" ] ; then
+		infosection "stdout"
+		cat $STDOUT >& "$INFO"
+	fi
+	if [ -f "$STDERR" ] ; then
+		infosection "stderr"
+		cat $STDERR >& "$INFO"
+	fi
+}
+
+# Append a "Progress" line to the info log: the label, a timestamp with
+# nanoseconds and UTC offset, then the state name and any extra words.
+logstate() {
+	local stamp
+	stamp=$(date +"%Y-%m-%d %H:%M:%S.%N%z")
+	echo "Progress  $stamp  $@" >& "$INFO"
+}
+
+# Append the arguments, joined by single spaces, as one line of the info log.
+log() { echo "$@" >& "$INFO"; }
+
+debug() {
+	[[ $DEBUG == 1 ]] && echo "$@" >& "$INFO"
+}
+
+stagein() {
+	SRC=$1
+	DST=$2
+	
+	log "Staging in $URLPREFIX/$SRC to $DST"
+	
+	case $URLPREFIX in
+		file://*)
+			SRC2="${URLPREFIX#file://}/$SRC"
+			log "Copying $SRC2 to $DST"
+			cp $SRC2 $DST
+			;;
+		*://*)
+			fail 254 "Cannot handle protocol ${URLPREFIX%%://*}"
+			;;
+		*)
+			fail 254 "Invalid URL prefix: $URLPREFIX"
+			;;
+	esac
+}
+
+stageout() {
+	SRC=$1
+	DST=$2
+	
+	log "Staging out $SRC to $URLPREFIX/$DST"
+	
+	case $URLPREFIX in
+		file://*)
+			DST2="${URLPREFIX#file://}/$DST"
+			DIR=`dirname $DST2`
+			mkdir -p $DIR
+			log "Copying $SRC to $DST2"
+			cp $SRC $DST2
+			;;
+		*://*)
+			fail 254 "Cannot handle protocol ${URLPREFIX%%://*}"
+			;;
+		*)
+			fail 254 "Invalid URL prefix: $URLPREFIX"
+			;;
+	esac
+}
+
+
+fail() {
+	EC=$1
+	shift
+	
+	if [ "X$DIR" != "X" ]; then
+		echo $@ >"$DIR/_error"
+		stageout "$DIR/_error" "$JOBDIR/error"
+	else
+		echo $@
+	fi
+		
+	log $@
+	info
+	if [ "$STATUSMODE" = "files" ]; then
+		exit 0
+	else
+		exit $EC
+	fi
+}
+
+checkError() {
+	if [ "$?" != "0" ]; then
+		fail $@
+	fi
+}
+
+checkEmpty() {
+	if [ "$1" == "" ]; then
+		shift
+		fail 254 $@
+	fi
+}
+
+checkparamfile() {
+	log "checking for paramfile"
+	if [ "$1" == "-p" ]; then
+		PARAMFILE="$SCRATCH/_paramfile"
+		stagein "$URLPREFIX/$2" "$PARAMFILE"
+	fi
+	log "paramfile is: $PARAMFILE"
+}
+
+getarg() {
+	NAME=$1
+	shift
+	VALUE=""
+	SHIFTCOUNT=0
+	if [ "$PARAMFILE" == "" ] && [ "$1" == "$NAME" ]; then
+		shift
+		let "SHIFTCOUNT=$SHIFTCOUNT+1"
+		while [ "${1:0:1}" != "-" ] && [ "$#" != "0" ]; do
+			VALUE="$VALUE $1"
+			shift
+			let "SHIFTCOUNT=$SHIFTCOUNT+1"
+		done
+		VALUE="${VALUE:1}"
+	elif [ "$PARAMFILE" != "" ] && grep -E "^$NAME " $PARAMFILE ; then
+		VALUE=$(grep -E "^$NAME " $PARAMFILE | cut -d ' ' -f 2-)
+	else
+		fail 254 "Missing $NAME argument"
+	fi
+}
+
+openinfo() {
+	exec 3<> $1
+	INFO=3
+}
+
+closeinfo() {
+	exec 3>&-
+}
+
+# Return 0 if the array whose NAME is given in $1 contains an element equal
+# to the string $2, and 1 otherwise. Example: contains SKIPPED_OUTPUT "$O"
+contains() {
+	# With ref="NAME[@]", "${!ref}" expands to every element of the array.
+	# A plain ${!NAME} would only yield the first element.
+	local ref="$1[@]"
+	local X=$2
+	local a
+
+	for a in "${!ref}"
+	do
+		# Quoted right-hand side: literal comparison, not a glob pattern.
+		if [[ $a == "$X" ]]; then
+			return 0
+		fi
+	done
+	return 1
+}
+
+genScripts() {
+	echo "#!/bin/bash" > run.sh
+	echo -n "\"$EXEC\" " >> run.sh
+	for CMDARG in "${CMDARGS[@]}"; do
+    	echo -n "\"$CMDARG\" " >> run.sh
+	done
+	echo "1>\"$STDOUT\" 2>\"$STDERR\"" >> run.sh
+	chmod +x run.sh
+}
+
+cdm_local_output()
+{
+ 	L=$1
+
+	if [[ $CDM_FILE == "" ]]; then
+		return
+	fi
+
+ 	CDM_POLICY=$( cdm_lookup shared/cdm.pl $CDM_FILE $L )
+	if [[ $CDM_POLICY == "LOCAL" ]]; then
+		cdm_local_output_perform $L $CDM_POLICY
+	fi
+}
+
+cdm_local_output_perform()
+{
+	L=$1
+	TOOL=$2
+	REMOTE_DIR=$3
+	FLAGS=$3
+	log "Copying $REMOTE_DIR/$FILE to $JOBDIR/$FILE"
+	mkdir -p $REMOTE_DIR
+	checkError 254 "CDM[LOCAL]: mkdir -p $REMOTE_DIR failed!"
+	$TOOL $FLAGS $JOBDIR/$FILE $REMOTE_DIR/$FILE
+	checkError 254 "CDM[LOCAL]: Tool failed!"
+}
+
+cdm_gather()
+{
+	GATHER_OUTPUT=${*}
+	if [[ $CDM_FILE == "" ]]; then
+		return
+	fi
+	if [[ $GATHER_OUTPUT == "" ]]; then
+		return
+	fi
+
+	cdm_gather_action $GATHER_MAX $GATHER_OUTPUT
+}
+
+COMMANDLINE=$@
+
+PARAMFILE=
+
+openinfo "wrapper.log"
+ID=$1
+checkEmpty "$ID" "Missing job ID"
+
+shift
+
+getarg "-urlprefix" "$@"
+URLPREFIX=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-jobdir" "$@"
+JOBDIR=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-scratch" "$@"
+SCRATCH=$VALUE/$ID
+shift $SHIFTCOUNT
+
+checkparamfile "$@"
+
+INFODIR=$SCRATCH
+
+checkEmpty "$JOBDIR" "Missing job directory prefix"
+mkdir -p $INFODIR
+closeinfo
+
+if [ -z $MPI_RANK ]; then
+	INFOFILE="$INFODIR/_info"
+else
+	# Rename info file for each rank
+	INFOFILE="$INFODIR/_info-${PMI_RANK}"
+	# Build list of per-rank info files
+	echo $INFOFILE >> $INFODIR/_info
+fi
+rm -f $INFOFILE
+openinfo "$INFOFILE"
+
+logstate "LOG_START"
+infosection "Wrapper (_swiftwrap)"
+
+# Remaining options, read from the command line or from $PARAMFILE.
+getarg "-e" "$@"
+EXEC=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-out" "$@"
+STDOUT=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-err" "$@"
+STDERR=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-i" "$@"
+STDIN=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-d" "$@"
+DIRS=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-if" "$@"
+INF=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-of" "$@"
+OUTF=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-wt" "$@"
+WRAPPERLOG_ALWAYS_TRANSFER=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-sk" "$@"
+SITEDIR_KEEP=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-cdmfile" "$@"
+CDM_FILE=
+if [ "X$VALUE" != "X" ]; then
+	CDM_FILE=shared/$VALUE
+fi
+shift $SHIFTCOUNT
+
+getarg "-status" "$@"
+STATUSMODE=$VALUE
+shift $SHIFTCOUNT
+
+# Application arguments: everything after "-a" on the command line, or one
+# argument per "-a <value>" line of the parameter file.
+declare -a CMDARGS
+if [ "$PARAMFILE" == "" ] && [ "$1" == "-a" ] ; then
+	shift
+	CMDARGS=("$@")
+elif [ "$PARAMFILE" != "" ] ; then
+	CMDARGS=()
+	# IFS= and -r keep leading/trailing blanks and backslashes in each
+	# argument exactly as written in the parameter file.
+	while IFS= read -r line ; do
+		CMDARGS+=("$line")
+	done < <(grep -E "^-a " "$PARAMFILE" | cut -d " " -f 2-)
+else
+	fail 254 "Missing arguments (-a option)"
+fi
+
+if [ "X$CDM_FILE" != "X" ]; then
+	# TODO
+	logstate "SOURCE_CDM_LIB $WFDIR/shared/cdm_lib.sh"
+	source "$WFDIR/shared/cdm_lib.sh"
+	checkError 254 "Could not source: $WFDIR/shared/cdm_lib.sh"
+fi
+
+if [ "X$SCRATCH" == "X" ]; then
+	fail 254 "Wrapper staging requires a scratch directory to be specified"
+fi
+
+# SCRATCH is already <scratch>/<job ID>, so it serves as the job directory.
+DIR="$SCRATCH"
+
+PATH=$PATH:/bin:/usr/bin
+
+if [ "$PATHPREFIX" != "" ]; then
+	export PATH=$PATHPREFIX:$PATH
+fi
+
+if [ "$SWIFT_EXTRA_INFO" != "" ]; then
+	log "EXTRAINFO=$($SWIFT_EXTRA_INFO)"
+fi
+
+# Resolve a non-absolute executable name through PATH.
+if [ "X${EXEC:0:1}" != "X/" ] ; then
+	export ORIGEXEC=$EXEC
+	export EXEC=$(which "$EXEC")
+	if [ "X$EXEC" = "X" ] ; then
+		fail 254 "Cannot find executable $ORIGEXEC on site system path"
+	fi
+fi
+
+log "PID=$$"
+log "HOST=$HOST"
+log "PWD=$PWD"
+log "DIR=$DIR"
+log "EXEC=$EXEC"
+log "STDIN=$STDIN"
+log "STDOUT=$STDOUT"
+log "STDERR=$STDERR"
+log "DIRS=$DIRS"
+log "INF=$INF"
+log "OUTF=$OUTF"
+log "WRAPPERLOG_ALWAYS_TRANSFER=$WRAPPERLOG_ALWAYS_TRANSFER"
+log "SITEDIR_KEEP=$SITEDIR_KEEP"
+log "CDM_FILE=$CDM_FILE"
+log "ARGS=$@"
+log "ARGC=$#"
+# Quoted: an unquoted empty $MPI_RANK made "[ -n ]" always true.
+if [ -n "$MPI_RANK" ]; then
+	log "MPI_RANK=$MPI_RANK"
+	log "PMI_RANK=$PMI_RANK"
+fi
+# From here on, unquoted expansions such as $DIRS, $INF and $OUTF are split
+# on "|".
+IFS="|"
+
+logstate "CREATE_JOBDIR"
+mkdir -p "$DIR"
+checkError 254 "Failed to create job directory $DIR"
+log "Created job directory: $DIR"
+
+if [[ $PMI_RANK == "" || $PMI_RANK == 0 ]]; then
+
+	logstate "CREATE_INPUTDIR"
+	for D in $DIRS ; do
+		# ">& fd" duplicates the info-log descriptor; ">>"$INFO"" would
+		# append to a file literally named "3".
+		mkdir -p "$DIR/$D" >& "$INFO" 2>&1
+		checkError 254 "Failed to create input directory $D"
+		log "Created output directory: $DIR/$D"
+	done
+
+	logstate "LINK_INPUTS"
+	for L in $INF ; do
+		CDM_POLICY="DEFAULT"
+		if [[ $CDM_FILE != "" ]]; then
+			CDM_POLICY=$( cdm_lookup shared/cdm.pl $CDM_FILE $L )
+		fi
+		if [[ $CDM_POLICY != "DEFAULT" && $CDM_POLICY != "EXTERNAL"* ]]; then
+			log "CDM_POLICY: $L -> $CDM_POLICY"
+			# NOTE(review): eval re-parses the policy text from the CDM
+			# file as shell code.
+			eval cdm_action $DIR "INPUT" $L $CDM_POLICY
+			continue
+		fi
+
+		stagein "$L" "$DIR/$L"
+	done
+
+	if [[ $CDM_FILE != "" ]]; then
+		logstate "LINK_CDM_OUTPUTS"
+		SKIPPED_OUTPUT=()
+		GATHER_OUTPUT=()
+		CDM_LOCAL_OUTPUT=()
+		for L in $OUTF ; do
+			CDM_POLICY=$( cdm_lookup shared/cdm.pl $CDM_FILE $L )
+			log "CDM_POLICY: $L -> $CDM_POLICY"
+			if [[ $CDM_POLICY != "DEFAULT" &&
+				  $CDM_POLICY != "BROADCAST"* ]]; then
+				eval cdm_action $DIR "OUTPUT" $L $CDM_POLICY
+				# Append with +=; "( $ARRAY $L )" kept only the first
+				# element of ARRAY.
+				SKIPPED_OUTPUT+=( "$L" )
+			fi
+			if [[ $CDM_POLICY == "GATHER" ]]; then
+				GATHER_OUTPUT+=( "$L" )
+			elif [[ $CDM_POLICY == "LOCAL" ]]; then
+				CDM_LOCAL_OUTPUT+=( "$L" )
+			fi
+		done
+	fi
+
+fi # PMI_RANK==0
+
+debug "Moving to jobdir: $DIR"
+if ! cd "$DIR"; then
+	log "PWD: $PWD"
+	log "$( find . )"
+	fail 254 "Could not cd to: $DIR"
+fi
+logstate "EXECUTE"
+
+debug "Command line: $EXEC ${CMDARGS[@]}"
+
+if [ ! -f "$EXEC" ]; then
+	fail 254 "The executable $EXEC does not exist"
+fi
+if [ ! -x "$EXEC" ]; then
+	fail 254 "The executable $EXEC does not have the executable bit set"
+fi
+
+# Optionally leave a run.sh that reproduces this invocation.
+if [ "$SWIFT_GEN_SCRIPTS" != "" ]; then
+	genScripts
+fi
+
+if [ "$STDIN" == "" ]; then
+	"$EXEC" "${CMDARGS[@]}" 1>"$STDOUT" 2>"$STDERR"
+else
+	"$EXEC" "${CMDARGS[@]}" 1>"$STDOUT" 2>"$STDERR" <"$STDIN"
+fi
+# Capture the application's status once, then report it as the job's exit code.
+EXITCODE=$?
+if [ "$EXITCODE" != "0" ]; then
+	fail $EXITCODE "Application $EXEC failed with an exit code of $EXITCODE"
+fi
+
+logstate "EXECUTE_DONE"
+log "Job ran successfully"
+
+if [[ $MPI_RANK == "" || $MPI_RANK == 0 ]]; then
+
+	# Every declared output must exist before anything is staged out.
+	MISSING=
+	for O in $OUTF ; do
+		if [ ! -f "$DIR/$O" ]; then
+			if [ "$MISSING" == "" ]; then
+				MISSING=$O
+			else
+				MISSING="$MISSING, $O"
+			fi
+		fi
+	done
+	if [ "$MISSING" != "" ]; then
+		log "$( find . )"
+		fail 254 "The following output files were not created by the application: $MISSING"
+	fi
+
+	logstate "MOVING_OUTPUTS $OUTF"
+	for O in $OUTF ; do
+		if ! contains SKIPPED_OUTPUT "$O" ; then
+			stageout "$DIR/$O" "$O"
+		fi
+	done
+
+	# Hand over every collected entry, not just the first element of each array.
+	for LOCAL_OUT in "${CDM_LOCAL_OUTPUT[@]}"; do
+		cdm_local_output "$LOCAL_OUT"
+	done
+	cdm_gather "${GATHER_OUTPUT[@]}"
+
+	if [ "$STATUSMODE" = "files" ]; then
+		logstate "TOUCH_SUCCESS"
+		touch "${ID}-success"
+		stageout "$DIR/${ID}-success" "$JOBDIR/success"
+	fi
+
+	log "Moving back to workflow directory $WFDIR"
+	if ! cd "$WFDIR"; then
+		fail 254 "Could not cd to workflow directory: $WFDIR"
+	fi
+
+	if [ "$WRAPPERLOG_ALWAYS_TRANSFER" == "true" ]; then
+		stageout "$INFOFILE" "$JOBDIR/info"
+	fi
+	if [ "$SITEDIR_KEEP" != "true" ]; then
+		logstate "RM_JOBDIR"
+		rm -rf "$DIR" >& "$INFO" 2>&1
+		checkError 254 "Failed to remove job directory $DIR"
+	fi
+else
+	# Allow rank 0 to write output
+	sleep 1
+fi # MPI_RANK==0
+
+logstate "END"
+
+closeinfo
+
+# The info log is transferred above (to $JOBDIR/info) when
+# WRAPPERLOG_ALWAYS_TRANSFER is "true"; nothing else is staged after closing it.
+
+# ensure we exit with a 0 after a successful execution
+exit 0


Property changes on: trunk/libexec/_swiftwrap.wrapperstaging
___________________________________________________________________
Added: svn:executable
   + *

Added: trunk/libexec/vdl-int-wrapper-staging.k
===================================================================
--- trunk/libexec/vdl-int-wrapper-staging.k	                        (rev 0)
+++ trunk/libexec/vdl-int-wrapper-staging.k	2012-05-26 23:14:20 UTC (rev 5797)
@@ -0,0 +1,465 @@
+import("sys.k")
+import("task.k")
+import("vdl-lib.xml")
+/*
+ * Things that are not exposed to the translated file
+ */
+
+// Log level names passed as the first argument of log(...) in this file.
+global(LOG:DEBUG, "debug")
+global(LOG:INFO, "info")
+global(LOG:WARN, "warn")
+global(LOG:ERROR, "error")
+global(LOG:FATAL, "fatal")
+
+// URL prefix handed to the wrapper as -urlprefix; the wrapper stages files
+// in and out relative to it (and only accepts file:// prefixes). Obtained
+// from the GetURLPrefix Java element.
+global(URL_PREFIX, 
+	elementDef(getURLPrefix, className="org.griphyn.vdl.karajan.lib.GetURLPrefix")
+	getURLPrefix() 
+)
+
+// Configuration flags forwarded to the wrapper as -wt and -sk.
+global(WRAPPERLOG_ALWAYS_TRANSFER, vdl:configProperty("wrapperlog.always.transfer"))
+global(SITEDIR_KEEP, vdl:configProperty("sitedir.keep"))
+
+
+namespace("vdl"
+	export(
+		// Recursively delete `dir` on `host`. Entries are handled in parallel:
+		// subdirectories through a recursive rmdir, other entries through
+		// file:remove. Then `dir` itself is removed.
+		element(rmdir, [dir, host]
+			parallelFor(entry, file:list(dir, host=host)
+				epath := "{dir}/{entry}"
+				if(
+					file:isDirectory(epath, host=host) rmdir(epath, host)
+					file:remove(epath, host=host)
+				)
+			)
+			dir:remove(dir, host=host)
+		)
+
+		// Create directory `path` under `dir` on `host`.
+		element(createdirs, [path, dir, host]
+			dc := dircat(dir, path)
+			log(LOG:INFO, "START path={path} dir={dir} - Creating directory structure")
+
+			dir:make(dc, host=host)
+		)
+
+		// Determine a job's outcome from status files (used when status.mode is
+		// "files"). Success is detected by removing
+		// {wfdir}/status/{jobdir}/{jobid}-success on rhost; otherwise the error
+		// file is consulted through checkErrorFile.
+		// NOTE(review): `rhost` and `jobdir` are used here but are not parameters
+		// of this element. The caller in execute2 also passes four arguments
+		// (rhost, wfdir, jobid, tr) for these three parameters — confirm the
+		// signature. The wrapper stages its success file to "$JOBDIR/success"
+		// under the URL prefix, not to this status path — confirm the location.
+		element(checkJobStatus, [wfdir, jobid, tr]
+			log(LOG:DEBUG, "START jobid={jobid}")
+			try(
+				sequential(
+					/*
+					 * This is a bit of optimization, but I'm not completely
+					 * sure of its correctness. The goal is to both detect
+					 * the presence of the success file and remove it, all
+					 * in one operation. It relies on file:remove() throwing
+					 * an exception if the file is not there.
+					 */
+					file:remove("{wfdir}/status/{jobdir}/{jobid}-success", host=rhost)
+					log(LOG:INFO, "SUCCESS jobid={jobid} - Success file found")
+				)
+				sequential(
+					try (
+						msg = checkErrorFile(rhost, wfdir, jobid, jobdir)
+						sequential (
+							log(LOG:INFO, "NO_STATUS_FILE jobid={jobid} - Both status files are missing")
+							throw("No status file was found. Check the shared filesystem on {rhost}")
+						)
+					)
+				)
+				throw(checkErrorFile(rhost, wfdir, jobid, jobdir))
+			)
+		)
+		
+		// If {wfdir}/status/{jobdir}/{jobid}-error exists on rhost: fetch it,
+		// remove the remote copy, read, strip and remove the local copy, and
+		// return the result of that parallel block (the stripped error text).
+		// Otherwise log NO_STATUS_FILE and throw.
+		// NOTE(review): `rhost` and `jobdir` are not parameters of this element,
+		// yet callers pass (rhost, wfdir, jobid[, jobdir]) — confirm the signature.
+		element(checkErrorFile, [wfdir, jobid]
+			if (
+				file:exists("{wfdir}/status/{jobdir}/{jobid}-error", host=rhost) then(
+					log(LOG:INFO, "FAILURE jobid={jobid} - Failure file found")
+					task:transfer(srchost=rhost, srcdir="{wfdir}/status/{jobdir}", srcfile="{jobid}-error")
+					error := parallel(
+						file:remove("{wfdir}/status/{jobdir}/{jobid}-error", host=rhost)
+						sequential(
+							str:strip(file:read("{jobid}-error"))
+							file:remove("{jobid}-error")
+						)
+					)
+					error
+				)
+				else (
+					log(LOG:INFO, "NO_STATUS_FILE jobid={jobid} - Error file missing")
+					throw("No status file was found. Check the shared filesystem on {rhost}")
+				)
+			)
+		)
+
+		// Prepare the workflow directory on `rhost` and return its path.
+		// The body is wrapped in once(list(rhost, "shared")), so it presumably
+		// runs once per host. It creates "{VDL:SCRIPTNAME}-{VDL:RUNID}" on rhost,
+		// copies {swift.home}/libexec/_swiftwrap.wrapperstaging into it, and
+		// sends [wfdir, rhost] to the `cleanup` channel.
+		element(initSharedDir, [rhost]
+			once(list(rhost, "shared")
+				vdl:setprogress("Initializing site shared directory")
+
+				log(LOG:INFO, "START host={rhost} - Initializing shared directory")
+
+				wfdir := "{VDL:SCRIPTNAME}-{VDL:RUNID}"
+				dir:make(wfdir, host=rhost)
+				transfer(srcdir="{swift.home}/libexec/", srcfile="_swiftwrap.wrapperstaging", destdir=wfdir, desthost=rhost)
+
+				wfdir
+				//we send the cleanup data to vdl:main()
+				to(cleanup, list(wfdir, rhost))
+				log(LOG:INFO, "END host={rhost} - Done initializing shared directory")
+			)
+		)
+
+		// Return the local run directory "{VDL:SCRIPTNAME}-{VDL:RUNID}.d".
+		// Inside once(), the directory is created if it does not already exist.
+		element(initDDir, []
+			ddir := "{VDL:SCRIPTNAME}-{VDL:RUNID}.d"
+			once(ddir
+				if(sys:not(file:exists(ddir))
+					task:dir:make(ddir)
+				)
+			)
+			ddir
+		)
+
+		// Path names of the stage-in entries.
+		element(inFiles, [stageins]
+			pathnames(stageins)
+		)
+
+		// Unique directories needed by the stage-in and stage-out entries,
+		// returned as a list.
+		element(fileDirs, [stageins, stageouts]
+			list(
+				unique(
+					inFileDirs(stageins)
+					outFileDirs(stageouts)
+				)
+			)
+		)
+
+		// Create each directory in `dirs` under `destdir` on `host`. Creation is
+		// wrapped in cacheOn(list(u, destdir, host)), presumably so a directory
+		// already created for an earlier job is not created again.
+		element(createDirSet, [jobid, destdir, host, dirs]
+			/*
+			 * Ideally this would be done by creating a tree of the directories
+			 * to be created and (eventually) exploiting the concurrency in that.
+			 */
+			log(LOG:INFO, "START jobid={jobid} host={host} - Initializing directory structure")
+			for(u, dirs
+				cacheOn(list(u, destdir, host)
+					createdirs(u, destdir, host)
+				)
+			)
+			log(LOG:INFO, "END jobid={jobid} - Done initializing directory structure")
+		)
+
+		// Clean up workflow directory `dir` on `host`.
+		// If a CDM file is in use and GATHER_DIR is not "UNSET", copy
+		// cdm_cleanup.sh and cdm_lib.sh into `dir` and run cdm_cleanup.sh there
+		// with GATHER_DIR, GATHER_TARGET and a fresh uid as arguments.
+		// Then, if sitedir.keep is "false", run the site's swift:cleanupCommand
+		// with swift:cleanupCommandOptions on `dir`.
+		element(cleanup, [dir, host]
+			log(LOG:INFO, "START dir={dir} host={host}")
+			cdmfile := cdm:file()
+			log(LOG:DEBUG, "cdmfile {cdmfile}")
+			if(cdmfile != "" &
+                           cdm:get("GATHER_DIR") != "UNSET" then(
+				log(LOG:INFO, "submitting cdm_cleanup.sh to {dir}")
+				task:transfer(srcfile="cdm_cleanup.sh",
+					      srcdir="{swift.home}/libexec",
+					      desthost=host, destdir=dir)
+				task:transfer(srcfile="cdm_lib.sh",
+					      srcdir="{swift.home}/libexec",
+					      desthost=host, destdir=dir)
+			        log(LOG:INFO, "execute: cdm_cleanup.sh")
+				task:execute(
+					     executable="/bin/bash",
+					     arguments=list("{dir}/cdm_cleanup.sh",
+					                     cdm:get("GATHER_DIR"), cdm:get("GATHER_TARGET")
+							     sys:uid() )
+					     host=host, batch=true, tcprofile(host))
+				)
+			)
+			if(vdl:configProperty("sitedir.keep") == "false"
+				task:execute(
+					vdl:siteprofile(host, "swift:cleanupCommand"),
+					arguments=list(
+						vdl:siteprofile(host, "swift:cleanupCommandOptions"),
+						dir
+					)
+					host=host, batch=true, tcprofile(host))
+			)
+			log(LOG:INFO, "END dir={dir} host={host}")
+		)
+
+		// Run vdl:cleanup for every [dir, host] pair in `cleanup`, in parallel.
+		// A failed cleanup is logged at debug level and sent as a warning to the
+		// `warnings` channel instead of being rethrown.
+		element(cleanups, [cleanup]
+			log(LOG:INFO, "START cleanups={cleanup}")
+			parallelFor(i, cleanup
+				[dir, host] := each(i)
+				try(
+					vdl:cleanup(dir, host)
+					catch(".*",
+						log(LOG:DEBUG, "EXCEPTION - Exception caught while cleaning up", exception)
+						to(warnings, exception("Cleanup on {host} failed", exception))
+					)
+				)
+			)
+			log(LOG:INFO, "END cleanups={cleanup}")
+		)
+
+		// Remove each file in `files` from `host` and notify the file cache
+		// through vdl:cacheFileRemoved.
+		element(cleanupFiles, [files, host]
+			uParallelFor(r, files
+				log(LOG:INFO, "Purging ", r, " on ", host)
+				file:remove(r, host=host)
+				vdl:cacheFileRemoved(r, host)
+			)
+		)
+
+		// Make sure `dir` exists on `host` (guarded by cacheOn(list(dir, host))),
+		// using the provider derived from `wrapfile`.
+		// NOTE(review): despite the name and the log message, nothing is
+		// transferred here. srchost, srcdir and filename are computed but
+		// unused; the wrapper fetches the parameter file itself (execute2
+		// passes it as "-p"). Confirm whether a transfer was intended.
+		element(stageWrapperParams, [jobid, wrapfile, dir, host]
+			log(LOG:INFO, "START jobid={jobid} - staging in wrapper params")
+			provider := provider(wrapfile)
+			srchost := hostname(wrapfile)
+			srcdir := vdl:dirname(wrapfile)
+			destdir := dir
+			filename := basename(wrapfile)
+
+			cacheOn(list(destdir, host)
+				dir:make(destdir, host=host, provider=provider)
+			)
+
+			log(LOG:INFO, "END jobid={jobid}")
+		)
+
+		// Unless the pgraph property is "false", send provenance-graph lines in
+		// dot syntax to the `graph` channel. That means a node for this
+		// invocation (lightsalmon if `err`, lightsteelblue1 otherwise); a
+		// parallelogram node per stage-in file with an edge into the invocation;
+		// and a labelled parallelogram node per stage-out file with an edge from
+		// the invocation.
+		element(graphStuff, [tr, stagein, stageout, err, optional(args)]
+			if(
+				vdl:configProperty("pgraph") != "false" then(
+					errprops := if(err ",color=lightsalmon" ",color=lightsteelblue1")
+					tp := vdl:threadPrefix()
+					to(graph,
+						concat(str:quote(tp), " [label=", str:quote(tr), "{errprops}]")
+					)
+					for(si, stagein
+						si := basename(si)
+						to(graph
+							concat(str:quote(si), " [shape=parallelogram]")
+							concat(str:quote(si), " -> ", str:quote(tp))
+						)
+					)
+					for(pv, stageout
+						[path, var] := each(pv)
+						file := vdl:fileName(vdl:getfield(var, path=path))
+						file := basename(file)
+						label := vdl:niceName(var, path = path)
+						to(graph
+							concat(str:quote(file), " [shape=parallelogram,label=",
+								str:quote(label), "]")
+							concat(str:quote(tp), " -> ", str:quote(file))
+						)
+					)
+				)
+			)
+		)
+
+		// Total size of `files`: the sum of file:size over each element.
+		// The loop variable is `f`; the previous body passed the undefined name
+		// `file` to file:size.
+		element(fileSizes, [files]
+			math:sum(
+				for(f, files, file:size(f))
+			)
+		)
+		
+		// Build a report of the job's standard files. For stderr then stdout,
+		// it emits a newline, a "stderr.txt: " / "stdout.txt: " label, and the
+		// contents of {jobdir}/{file} read with file:read (no host given). The
+		// try() presumably falls back to a bare newline when the file cannot be
+		// read.
+		element(readStandardFiles, [jobdir, stdout, stderr]
+			concat(
+				for(f, list(list("stderr.txt", stderr), list("stdout.txt", stdout))
+					[name, file] := each(f)
+					destfile := "{jobdir}/{file}"
+					nl()
+					"{name}: "
+					try(
+						file:read(destfile)
+						nl()
+					)
+				)
+			)
+		)
+		
+
+		// Run application `tr` on an allocated host using wrapper staging.
+		// Ensures the local run directory and the remote workflow directory
+		// (holding _swiftwrap.wrapperstaging) exist, then runs the wrapper from
+		// the workflow directory. Wrapper options are passed on the command line
+		// (wrapper.parameter.mode "args") or in a parameter file at
+		// {jobdir}/_paramfile ("files"). The wrapper stages inputs and outputs
+		// itself relative to URL_PREFIX. A non-"Abort" failure is rethrown with
+		// the arguments, host, remote job directory and the stdout/stderr
+		// contents found under {jobdir}.
+		element(execute2, [tr, optional(arguments, stdin, stdout, stderr, attributes), stagein, stageout,  restartout,
+			replicationGroup, replicationChannel]
+			stagein := list(unique(each(stagein)))
+			stageout := list(unique(each(stageout)))
+
+			allocateHost(rhost, constraints=vdl:jobConstraints(tr, stagein=stagein)
+
+				ddir := initDDir()
+				wfdir := try(
+					initSharedDir(rhost)
+					throw(exception("Could not initialize shared directory on {rhost}", exception))
+				)
+
+				uid := uid()
+				jobid := "{tr}-{uid}"
+
+				jobdir := concat(ddir, "/jobs/", substring(uid, from=0, to=1), "/{jobid}/")
+
+				log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread={#thread} host={rhost} replicationGroup={replicationGroup}")
+
+				statusMode := configProperty("status.mode",host=rhost)
+				wrapperMode := configProperty("wrapper.parameter.mode",host=rhost)
+
+				// NOTE(review): nothing in this element creates {jobdir} locally
+				// before the parameter file is written into it — confirm.
+				wrapfile := "{jobdir}/_paramfile"
+
+				stdout := try(getFieldValue(stdout), "stdout.txt")
+				stderr := try(getFieldValue(stderr), "stderr.txt")
+				fileDirs := fileDirs(stagein, stageout)
+				os := vdl:siteprofile(rhost, "SYSINFO:OS")
+
+				scratch := vdl:siteprofile(rhost, "scratch")
+
+				if(wrapperMode == "files"
+					sequential(
+						// One "-flag value" entry per line. The wrapper's getarg
+						// finds each flag with grep "^-flag ", so every flag
+						// needs a trailing space and must end its line with nl().
+						sys:file:write(wrapfile,
+							"-e ",vdl:executable(tr, rhost), nl(),
+							"-out ", stdout, nl(),
+							"-err ", stderr, nl(),
+							"-i ", maybe(getFieldValue(stdin)), nl(),
+							"-d ", flatten(each(fileDirs)), nl(),
+							"-if ", flatten(infiles(stagein)), nl(),
+							"-of ", flatten(outfiles(stageout)), nl(),
+							"-wt ", WRAPPERLOG_ALWAYS_TRANSFER, nl(),
+							"-sk ", SITEDIR_KEEP, nl(),
+							"-cdmfile ", cdm:file(), nl(),
+							"-status ", statusMode, nl(),
+							for(a, arguments, "-a ", a, nl())
+						)
+					)
+				)
+
+				vdl:setprogress("Stage in")
+
+				try(
+					sequential(
+
+						if(wrapperMode == "files"
+							stageWrapperParams(jobid, wrapfile, wfdir, rhost)
+						)
+
+						log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", maybe(" arguments=", arguments), " host={rhost}")
+
+						vdl:setprogress("Submitting")
+
+						if(wrapperMode == "files"
+							vdl:execute(
+								vdl:siteprofile(rhost, "swift:wrapperInterpreter"),
+								list(
+									vdl:siteprofile(rhost, "swift:wrapperInterpreterOptions"),
+									"_swiftwrap.wrapperstaging",
+									jobid,
+									"-urlprefix", URL_PREFIX,
+									"-jobdir", jobdir,
+									"-scratch", scratch, 
+									"-p", wrapfile
+								)
+								directory=wfdir
+								redirect=false
+								host=rhost
+								vdl:tcprofile(rhost, maybe(attributes=attributes), tr=tr) //this gets various app params from the tc, such as environment, walltime, etc
+								replicationGroup=replicationGroup
+								replicationChannel=replicationChannel
+								jobid=jobid
+							)
+						)
+						if(wrapperMode == "args"
+							vdl:execute(
+								vdl:siteprofile(rhost, "swift:wrapperInterpreter"),
+								list(
+									vdl:siteprofile(rhost, "swift:wrapperInterpreterOptions"),
+									"_swiftwrap.wrapperstaging",
+									jobid,
+									"-urlprefix", URL_PREFIX,
+									"-jobdir", jobdir,
+									"-scratch", scratch,
+									"-e", vdl:executable(tr, rhost),
+									"-out", stdout,
+									"-err", stderr,
+									"-i", maybe(getFieldValue(stdin)),
+									"-d", flatten(each(fileDirs)),
+									"-if", flatten(infiles(stagein)),
+									"-of", flatten(outfiles(stageout)),
+									"-wt", WRAPPERLOG_ALWAYS_TRANSFER,
+									"-sk", SITEDIR_KEEP,
+									"-cdmfile", cdm:file(),
+									"-status", statusMode,
+									"-a", maybe(each(arguments))
+								)
+								directory=wfdir
+								redirect=false
+								host=rhost
+								vdl:tcprofile(rhost, maybe(attributes=attributes), tr=tr)
+								replicationGroup=replicationGroup
+								replicationChannel=replicationChannel
+								jobid=jobid
+							)
+						)
+
+						vdl:setprogress("Checking status")
+						// NOTE(review): checkJobStatus declares [wfdir, jobid, tr];
+						// confirm the argument list used here.
+						if(statusMode == "files"
+							checkJobStatus(rhost, wfdir, jobid, tr)
+						)
+
+						if(wrapperMode == "files"
+							file:remove(wrapfile)
+						)
+
+						log(LOG:DEBUG, "STAGING_OUT jobid={jobid}")
+
+
+						/* need to stage the files to upper scratch area in case they are not transfered to another site
+						   before all the files get cleaned out */
+
+
+						vdl:setprogress("Stage out")
+						doRestartlog(restartout)
+
+						log(LOG:DEBUG, "JOB_END jobid={jobid}")
+					)
+					catch("^Abort$"
+						log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
+						throw(exception)
+					)
+					catch("^(?!Abort$).*"
+						vdl:setprogress("Failed but can retry")
+						prev := exception
+						exception := try(exception(checkErrorFile(rhost, wfdir, jobid)), prev)
+
+						log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
+
+						if(matches(exception,".*executable bit.*")
+							generateError(exception)
+						)
+
+						outs := readStandardFiles(jobdir, stdout, stderr)
+
+						throw(
+							exception(
+								concat(
+									"Exception in {tr}:", nl(),
+									maybe("Arguments: ", arguments, nl()),
+									"Host: {rhost}", nl(),
+									"Directory: {scratch}/{jobid}",
+									"{outs}", nl(),
+									"----", nl()
+								)
+								exception
+							)
+						)
+					)
+				)
+			)
+		)
+
+		// Write the collected provenance lines `gdata` as a dot digraph
+		// "SwiftProvenance". If pgraph is "true" the file is named
+		// "{VDL:SCRIPTNAME}-{VDL:RUNID}.dot"; otherwise the pgraph value itself is
+		// used as the file name. Graph and node attributes come from
+		// pgraph.graph.options and pgraph.node.options. ("{{" presumably escapes
+		// a literal "{" in the string template.)
+		element(generateProvenanceGraph, [gdata]
+			pgraph := vdl:configProperty("pgraph")
+			gname := if(pgraph == "true" "{VDL:SCRIPTNAME}-{VDL:RUNID}.dot" pgraph)
+			file:write(gname
+				"digraph SwiftProvenance {{", nl()
+				"	graph [", vdl:configProperty("pgraph.graph.options"), "];", nl()
+				"	node [", vdl:configProperty("pgraph.node.options"), "];", nl()
+
+				for(i, gdata
+					"	", i, nl()
+				)
+				"}", nl()
+			)
+			log(LOG:INFO, "Provenance graph saved in ", gname)
+		)
+	)
+)
+
+// Local variables:
+// mode: scheme
+// tab-width: 4
+// indent-tabs-mode: t
+// End:

Modified: trunk/libexec/vdl.k
===================================================================
--- trunk/libexec/vdl.k	2012-05-25 09:56:26 UTC (rev 5796)
+++ trunk/libexec/vdl.k	2012-05-26 23:14:20 UTC (rev 5797)
@@ -11,7 +11,12 @@
 	import("vdl-lib.xml", export = true)
 
 	pstaging := configProperty("use.provider.staging")
-	int := if (pstaging == "true", "vdl-int-staging.k", "vdl-int.k")
+	wstaging := configProperty("use.wrapper.staging")
+	int := if (
+		pstaging == "true", "vdl-int-staging.k",
+		wstaging == "true", "vdl-int-wrapper-staging.k", 
+		"vdl-int.k"
+	)
 
 	import(int)
 	import("java.k")




More information about the Swift-commit mailing list