[Swift-commit] r3203 - trunk/libexec

noreply at svn.ci.uchicago.edu noreply at svn.ci.uchicago.edu
Thu Jan 21 15:32:32 CST 2010


Author: hategan
Date: 2010-01-21 15:32:32 -0600 (Thu, 21 Jan 2010)
New Revision: 3203

Added:
   trunk/libexec/_swiftwrap.staging
   trunk/libexec/vdl-int-staging.k
Modified:
   trunk/libexec/vdl.k
Log:
added provider staging specific files

Added: trunk/libexec/_swiftwrap.staging
===================================================================
--- trunk/libexec/_swiftwrap.staging	                        (rev 0)
+++ trunk/libexec/_swiftwrap.staging	2010-01-21 21:32:32 UTC (rev 3203)
@@ -0,0 +1,284 @@
+# this script must be invoked inside of bash, not plain sh
+
+infosection() {
+	echo >& "$INFO"
+	echo "_____________________________________________________________________________" >& "$INFO"
+	echo >& "$INFO"
+	echo "        $1" >& "$INFO" 
+	echo "_____________________________________________________________________________" >& "$INFO"
+	echo >& "$INFO"
+}
+
+info() {	# append host diagnostics, the command line and the job's stdout/stderr to the log on descriptor $INFO
+	infosection "uname -a"
+	uname -a >& "$INFO" 2>&1	# stdout is pointed at the log first so that 2>&1 sends stderr there too
+	infosection "id"
+	id >& "$INFO" 2>&1
+	infosection "env"
+	env >& "$INFO" 2>&1
+	infosection "df"
+	df >& "$INFO" 2>&1
+	if [ -e "/proc/cpuinfo" ]; then
+		infosection "/proc/cpuinfo"
+		cat /proc/cpuinfo >& "$INFO" 2>&1
+	fi
+	if [ -e "/proc/meminfo" ]; then
+		infosection "/proc/meminfo"
+		cat /proc/meminfo >& "$INFO" 2>&1
+	fi
+	infosection "command line"
+	echo "$COMMANDLINE" >& "$INFO" 2>&1	# quoted so the recorded command line is not re-split or glob-expanded
+	if [ -f "$STDOUT" ] ; then
+		infosection "stdout"
+		cat "$STDOUT" >& "$INFO"	# quoted: the path must stay a single word
+	fi
+	if [ -f "$STDERR" ] ; then
+		infosection "stderr"
+		cat "$STDERR" >& "$INFO"
+	fi
+}
+
+logstate() {
+	echo "Progress " `date +"%Y-%m-%d %H:%M:%S.%N%z"` " $@" >& "$INFO"
+}
+
+log() {
+	echo "$@" >& "$INFO"
+}
+
+fail() {
+	EC=$1
+	shift
+	if [ "$STATUSMODE" = "files" ]; then
+		echo $@ >"$WFDIR/status/$JOBDIR/${ID}-error"
+	fi
+	log $@
+	info
+	if [ "$STATUSMODE" = "files" ]; then
+		exit 0
+	else
+		exit $EC
+	fi
+}
+
+checkError() {
+	if [ "$?" != "0" ]; then
+		fail $@
+	fi
+}
+
+checkEmpty() {
+	if [ "$1" == "" ]; then
+		shift
+		fail 254 $@
+	fi
+}
+
+checkparamfile() {
+	log "checking for paramfile"
+	if [ "$1" == "-p" ]; then
+		JOBDIR=$2
+		PARAMFILE=${WFDIR}/parameters/${JOBDIR}/param-${ID}
+	fi
+	log "paramfile is: $PARAMFILE"
+}
+
+getarg() {	# getarg NAME ARGS...: sets VALUE to NAME's value and SHIFTCOUNT to the number of ARGS consumed
+	NAME=$1
+	shift
+	VALUE=""
+	SHIFTCOUNT=0
+	if [ "$PARAMFILE" == "" ] && [ "$1" == "$NAME" ]; then
+		shift
+		let "SHIFTCOUNT=$SHIFTCOUNT+1"
+		while [ "${1:0:1}" != "-" ] && [ "$#" != "0" ]; do	# collect words until the next word starting with "-"
+			VALUE="$VALUE $1"
+			shift
+			let "SHIFTCOUNT=$SHIFTCOUNT+1"
+		done
+		VALUE="${VALUE:1}"	# drop the leading separator space
+	elif [ "$PARAMFILE" != "" ] && grep -q -E "^$NAME " "$PARAMFILE" ; then	# -q: existence test only, do not print the matching line
+			VALUE=$(grep -E "^$NAME " "$PARAMFILE" | cut -d ' ' -f 2-)
+	else
+		fail 254 "Missing $NAME argument"
+	fi
+}
+
+openinfo() {	# open the log file $1 for reading and writing on descriptor 3
+	exec 3<> $1
+	INFO=3	# a descriptor number, not a path: callers redirect with >& "$INFO"
+}
+
+closeinfo() {	# close descriptor 3, the one openinfo opens
+	exec 3>&-
+}
+
+COMMANDLINE=$@
+
+# PARAMFILE stays empty unless checkparamfile is given -p
+PARAMFILE=
+
+openinfo "wrapper.log"
+
+checkparamfile "$@"
+
+if [ "X$INFODIR" == "X" ]; then
+	INFODIR="."
+fi
+
+logstate "LOG_START"
+infosection "Wrapper"
+
+getarg "-e" "$@"
+EXEC=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-out" "$@"
+STDOUT=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-err" "$@"
+STDERR=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-i" "$@"
+STDIN=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-d" "$@"
+DIRS=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-if" "$@"
+INF=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-of" "$@"
+OUTF=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-k" "$@"
+KICKSTART=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-status" "$@"
+STATUSMODE=$VALUE
+shift $SHIFTCOUNT
+
+declare -a CMDARGS	# application arguments, one array element per argument
+if [ "$PARAMFILE" == "" ] && [ "$1" == "-a" ] ; then
+	shift
+	CMDARGS=("$@")
+elif [ "$PARAMFILE" != "" ] ; then
+	CMDARGS=()
+	FIRST=1
+	while read line ; do	# each "-a <value>" line of the parameter file yields one argument
+		if [ "$FIRST" == "1" ] ; then
+			CMDARGS=("$line")
+			FIRST=0
+		else
+			CMDARGS=("${CMDARGS[@]}" "$line")	# [@] keeps earlier arguments as separate elements; [*] would join them into one
+		fi
+	done < <(grep -E "^-a " "$PARAMFILE" | cut -d " " -f 2-)
+else
+	fail 254 "Missing arguments (-a option)"
+fi
+
+PATH=$PATH:/bin:/usr/bin
+
+if [ "$PATHPREFIX" != "" ]; then
+	export PATH=$PATHPREFIX:$PATH
+fi
+
+if [ "$SWIFT_EXTRA_INFO" != "" ]; then
+	log "EXTRAINFO=$($SWIFT_EXTRA_INFO)"
+fi
+
+if [ "X${EXEC:0:1}" != "X/" ] ; then
+	export ORIGEXEC=$EXEC
+	export EXEC=$(which $EXEC)
+	if [ "X$EXEC" = "X" ] ; then
+		fail 254 "Cannot find executable $ORIGEXEC on site system path"
+	fi
+fi
+
+log "EXEC=$EXEC"
+log "STDIN=$STDIN"
+log "STDOUT=$STDOUT"
+log "STDERR=$STDERR"
+log "DIRS=$DIRS"
+log "INF=$INF"
+log "OUTF=$OUTF"
+log "KICKSTART=$KICKSTART"
+log "ARGS=$@"
+log "ARGC=$#"
+
+IFS="|"
+
+logstate "CREATE_INPUTDIR"
+for D in $DIRS ; do	# unquoted on purpose: DIRS is split on the current IFS
+	mkdir -p "$D" >&"$INFO" 2>&1	# $INFO is a descriptor number; >>"$INFO" would append to a file named after that number
+	checkError 254 "Failed to create input directory $D"
+	log "Created output directory: $D"
+done
+
+logstate "EXECUTE"
+
+#ls >>$WRAPPERLOG
+if [ ! -f "$EXEC" ]; then
+	fail 254 "The executable $EXEC does not exist"
+fi
+if [ ! -x "$EXEC" ]; then
+	fail 254 "The executable $EXEC does not have the executable bit set"
+fi
+
+if [ "$STDIN" == "" ]; then	# no stdin file: run the application without input redirection
+	if [ "$SWIFT_GEN_SCRIPTS" != "" ]; then	# optionally write run.sh, a script holding the same command line
+		echo "#!/bin/bash" > run.sh
+		echo "\"$EXEC\" $([ "${#CMDARGS[@]}" -gt 0 ] && printf '%q ' "${CMDARGS[@]}")1>\"$STDOUT\" 2>\"$STDERR\"" >> run.sh	# %q quotes each argument separately
+		chmod +x run.sh
+	fi
+	"$EXEC" "${CMDARGS[@]}" 1>"$STDOUT" 2>"$STDERR"
+else
+	if [ "$SWIFT_GEN_SCRIPTS" != "" ]; then
+		echo "#!/bin/bash" > run.sh
+		echo "\"$EXEC\" $([ "${#CMDARGS[@]}" -gt 0 ] && printf '%q ' "${CMDARGS[@]}")1>\"$STDOUT\" 2>\"$STDERR\" <\"$STDIN\"" >> run.sh
+		chmod +x run.sh
+	fi
+	"$EXEC" "${CMDARGS[@]}" 1>"$STDOUT" 2>"$STDERR" <"$STDIN"
+fi
+checkError $? "Exit code $?"	# must directly follow the if/fi: both $? expansions and checkError's own test read the application's exit status
+
+if [ ! -s "$STDOUT" ]; then	# -s is true only for an existing file of non-zero size
+	log "Removing empty stdout"
+	rm -f "$STDOUT"	# quoted: IFS is "|" at this point, so an unquoted path would be split on it
+fi
+if [ ! -s "$STDERR" ]; then
+	log "Removing empty stderr"
+	rm -f "$STDERR"
+fi
+
+logstate "EXECUTE_DONE"
+log "Job ran successfully"
+
+MISSING=
+for O in $OUTF ; do
+	if [ ! -f "$O" ]; then
+		if [ "$MISSING" == "" ]; then
+			MISSING=$O
+		else
+			MISSING="$MISSING, $O"
+		fi
+	fi
+done
+if [ "$MISSING" != "" ]; then
+	fail 254 "The following output files were not created by the application: $MISSING"
+fi
+
+logstate "END"
+
+closeinfo
+
+# ensure we exit with a 0 after a successful execution
+exit 0
+


Property changes on: trunk/libexec/_swiftwrap.staging
___________________________________________________________________
Name: svn:executable
   + *

Added: trunk/libexec/vdl-int-staging.k
===================================================================
--- trunk/libexec/vdl-int-staging.k	                        (rev 0)
+++ trunk/libexec/vdl-int-staging.k	2010-01-21 21:32:32 UTC (rev 3203)
@@ -0,0 +1,292 @@
+import("sys.k")
+import("task.k")
+import("vdl-lib.xml")
+/*
+ * Things that are not exposed to the translated file
+ */
+
+global(LOG:DEBUG, "debug")
+global(LOG:INFO, "info")
+global(LOG:WARN, "warn")
+global(LOG:ERROR, "error")
+global(LOG:FATAL, "fatal")
+
+namespace("vdl"
+	export(
+		element(isDone, [stageout]
+			sys:and(
+				for(pv, stageout
+					[path, var] := each(pv)
+					vdl:isLogged(var, path)
+				)
+				sys:not(isEmpty(stageout))
+			)
+		)
+			
+		element(mark, [restarts, err, optional(mapping)]
+			if(
+				err for(pv, restarts
+					[path, var] := each(pv)
+					vdl:setFutureFault(var, path=path, mapping=mapping)
+				)
+			)
+		)
+		
+		element(flatten, [...]
+			if (
+				isEmpty(...) ""
+				
+				concat(
+					for(i, butLast(...), if(isList(i) flatten(i) i), "|") last(...)
+				)
+			)
+		)		
+				
+		element(initDDir, []
+			ddir := "{VDL:SCRIPTNAME}-{VDL:RUNID}.d"
+			once(ddir
+				if(sys:not(file:exists(ddir))
+					task:dir:make(ddir)
+				)
+			)
+			ddir
+		)
+		
+		element(inFileDirs, [stageins]
+			for(file, stageins
+		 		reldirname(file)
+			)
+		)
+		
+		element(outFileDirs, [stageouts] 
+			for(pv, stageouts
+				[path, var] := each(pv)
+			
+				file := vdl:filename(vdl:getfield(var, path = path))
+				
+				dirname(file)
+			)
+		)
+		
+		element(inFiles, [stageins]
+			pathnames(stageins)
+		)
+		
+		
+		element(outFiles, [stageouts]
+			for(pv, stageouts
+				[path, var] := each(pv)
+			
+				file := vdl:filename(vdl:getfield(var, path = path))
+				
+				file
+			)
+		)
+		
+		element(fileDirs, [stageins, stageouts]
+			list(
+				unique(
+					inFileDirs(stageins)
+					outFileDirs(stageouts)
+				)
+			)
+		)
+														
+		element(appStageins, [jobid, files, dir]
+			for(file, files
+				provider := provider(file)
+				srchost := hostname(file)
+				srcdir := dirname(file)
+				destdir := dircat(dir, reldirname(file))
+				filename := basename(file)
+				
+				stageIn(
+					"{provider}://{srchost}/{srcdir}/{filename}", 
+					"{destdir}/{filename}"
+				)
+			)
+		)
+		
+		element(appStageouts, [jobid, stageouts, dir]
+			for(pv, stageouts
+				[path, var] := each(pv)
+				file := vdl:absfilename(vdl:getfield(var, path = path))
+				provider := vdl:provider(file)
+				dhost := vdl:hostname(file)
+				rdir := dircat(dir, reldirname(file))
+				bname := basename(file)
+				ldir := dirname(file)
+				fullLocal := dircat(ldir, bname)
+				fullRemote := dircat(rdir, bname)
+				
+				stageOut(
+					"{rdir}/{bname}",
+				    "{provider}://{dhost}/{ldir}/{bname}"
+				)									
+			)
+		)
+
+		element(doRestartlog, [restartouts]
+			uParallelFor(f, restartouts,
+				[path, var] := each(f)
+				vdl:logvar(var, path)
+			)
+		)
+		
+		element(graphStuff, [tr, stagein, stageout, err, optional(args)]
+			if(
+				vdl:configProperty("pgraph") != "false" then(
+					errprops := if(err ",color=lightsalmon" ",color=lightsteelblue1")
+					tp := vdl:threadPrefix()
+					to(graph, 
+						concat(str:quote(tp), " [label=", str:quote(tr), "{errprops}]")
+					)
+					for(si, stagein
+						si := basename(si)
+						to(graph
+							concat(str:quote(si), " [shape=parallelogram]")
+							concat(str:quote(si), " -> ", str:quote(tp))
+						)
+					)
+					for(pv, stageout
+						[path, var] := each(pv)
+						file := vdl:fileName(vdl:getfield(var, path=path))
+						file := basename(file)
+						label := vdl:niceName(var, path = path)
+						to(graph
+							concat(str:quote(file), " [shape=parallelogram,label=", 
+								str:quote(label), "]")
+							concat(str:quote(tp), " -> ", str:quote(file))
+						)
+					)
+				)
+			)
+		)
+		
+		element(fileSizes, [files] // passes file:size of every entry of files to math:sum
+			math:sum(
+				for(f, files, file:size(f)) // use the loop variable f; no "file" is bound in this element
+			)
+		)
+		
+		element(cleanups, [cleanup]
+			log(LOG:INFO, "START cleanups={cleanup}")
+		)
+
+		element(execute2, [tr, optional(arguments, stdin, stdout, stderr), stagein, stageout,  restartout,
+			replicationGroup, replicationChannel]
+			stagein := list(unique(each(stagein)))
+			stageout := list(unique(each(stageout)))
+			allocateHost(rhost, constraints=vdl:jobConstraints(tr, stagein=stagein)
+				
+				ddir := initDDir()
+				
+				uid := uid()
+				jobdir := substring(uid, from=0, to=1)
+				jobid := concat(tr, "-", uid)
+				
+				fileDirs := fileDirs(stagein, stageout)
+				
+				log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread={#thread} host={rhost} replicationGroup={replicationGroup}")
+
+				wrapper := "_swiftwrap.staging"
+				wrapfile := "{ddir}/param-{jobid}"
+
+				stdout := try(stdout, "stdout.txt")
+				stderr := try(stderr, "stderr.txt")
+
+				vdl:setprogress("Stage in")
+				wfdir := "{VDL:SCRIPTNAME}-{VDL:RUNID}"
+				tmpdir := dircat(concat(wfdir, "/jobs/", jobdir), jobid)
+				
+				try(
+					sequential(
+						log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", maybe(" arguments=", arguments), " tmpdir={tmpdir} host={rhost}")
+				
+						vdl:setprogress("Submitting")
+
+						vdl:execute(
+							vdl:siteprofile(rhost, "swift:wrapperInterpreter"),
+							list(
+								vdl:siteprofile(rhost, "swift:wrapperInterpreterOptions"),
+								wrapper,
+								"-e", vdl:executable(tr, rhost), 
+								"-out", stdout,
+								"-err", stderr, 
+								"-i", maybe(stdin),
+								"-d", flatten(each(fileDirs)),
+								"-if", flatten(infiles(stagein)), 
+								"-of", flatten(outfiles(stageout)),
+								"-k",
+								"-status", "provider"
+								"-a", maybe(each(arguments))
+							)
+							directory = "{wfdir}-{jobdir}-{jobid}"
+							redirect = false
+							host = rhost
+							vdl:tcprofile(rhost, tr = tr) //this gets various app params from the tc, such as environment, walltime, etc
+							replicationGroup = replicationGroup
+							replicationChannel = replicationChannel
+							jobid = jobid
+							
+							/*stageIn("copy://localhost/{swift.home}/libexec/{wrapper}", wrapper)
+							appStageins(jobid, stagein, ".")
+							stageOut("wrapper.log", "copy://localhost/{ddir}/{jobid}.info")
+							stageOut("{stdout}", "copy://localhost/{ddir}/{stdout}")
+							stageOut("{stderr}", "copy://localhost/{ddir}/{stderr}")*/
+							
+							stageIn("file://localhost/{swift.home}/libexec/{wrapper}", wrapper)
+							appStageins(jobid, stagein, ".")
+							stageOut("wrapper.log", "file://localhost/{ddir}/{jobid}.info")
+							stageOut("{stdout}", "file://localhost/{ddir}/{stdout}")
+							stageOut("{stderr}", "file://localhost/{ddir}/{stderr}")
+							
+							task:cleanUp("") //the whole job directory
+							appStageouts(jobid, stageout, ".")
+						)
+						doRestartlog(restartout)
+						log(LOG:DEBUG, "JOB_END jobid={jobid}")
+					)
+					catch("^Abort$"
+						log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
+						throw(exception)
+					)
+					catch("^(?!Abort$).*"
+						vdl:setprogress("Failed but can retry")
+						log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
+						
+						throw(
+							exception(
+								concat(
+									"Exception in {tr}:", nl(),
+									maybe("Arguments: {arguments}", nl()),
+									"Host: {rhost}", nl(),
+									"Directory: {tmpdir}",
+									"TODO: outs", nl(),
+									"----", nl()
+								)
+								exception
+							)
+						)
+					)
+				)
+			)
+		)
+	
+		element(generateProvenanceGraph, [gdata]
+			pgraph := vdl:configProperty("pgraph")
+			gname := if(pgraph == "true" "{VDL:SCRIPTNAME}-{VDL:RUNID}.dot" pgraph)
+			file:write(gname
+				"digraph SwiftProvenance {{", nl()
+				"	graph [", vdl:configProperty("pgraph.graph.options"), "];", nl()
+				"	node [", vdl:configProperty("pgraph.node.options"), "];", nl()
+						
+				for(i, gdata
+					"	", i, nl()
+				)
+				"}", nl()
+			)
+			log(LOG:INFO, "Provenance graph saved in ", gname)
+		)
+	)
+)

Modified: trunk/libexec/vdl.k
===================================================================
--- trunk/libexec/vdl.k	2010-01-06 03:36:22 UTC (rev 3202)
+++ trunk/libexec/vdl.k	2010-01-21 21:32:32 UTC (rev 3203)
@@ -9,8 +9,11 @@
 
 	import("vdl-sc.k", export = true)
 	import("vdl-lib.xml", export = true)
+	
+	pstaging := configProperty("use.provider.staging")
+	int := if (pstaging == "true", "vdl-int-staging.k", "vdl-int.k")
 
-	import("vdl-int.k")
+	import(int)
 	import("java.k")
 
 	once("vdl.k-print-version"




More information about the Swift-commit mailing list