[Swift-commit] r2783 - in SwiftApps/LigandAtlas: . swift
noreply at svn.ci.uchicago.edu
noreply at svn.ci.uchicago.edu
Mon Mar 30 11:48:40 CDT 2009
Author: zzhang
Date: 2009-03-30 11:48:40 -0500 (Mon, 30 Mar 2009)
New Revision: 2783
Added:
SwiftApps/LigandAtlas/swift/
SwiftApps/LigandAtlas/swift/run_swift.sh
SwiftApps/LigandAtlas/swift/vdl-int.k
SwiftApps/LigandAtlas/swift/wrapper.sh
Log:
Changes Zhao has made to Swift to get MTIO working
Added: SwiftApps/LigandAtlas/swift/run_swift.sh
===================================================================
--- SwiftApps/LigandAtlas/swift/run_swift.sh (rev 0)
+++ SwiftApps/LigandAtlas/swift/run_swift.sh 2009-03-30 16:48:40 UTC (rev 2783)
@@ -0,0 +1,34 @@
+#!/bin/bash
+# Waits for a Falkon allocation, builds sites.xml from its service list and runs a Swift script.
+ if [ -z "$4" ]; then
+ echo "usage: $0 <FALKON_ID> <NUM_NODES> <swift_script> <working_dir>" >&2
+ exit 1
+ fi
+
+# Every line after the first in this file yields one <pool> entry in sites.xml.
+SERVICE_LIST_FILE="/home/falkon/users/${USER}/${1}/config/Client-service-URIs.config"
+
+NUM_NODES=$2
+SWIFT_SCRIPT=$3
+WORKING=$4
+# Count handed to the wait script: NUM_NODES / 64, integer division.
+NUM_ION=$((NUM_NODES / 64))
+
+echo "waiting for at least ${NUM_NODES} nodes to register before submitting workload..."
+falkon-wait-for-allocation.sh "${SERVICE_LIST_FILE}" "${NUM_ION}"
+
+echo "found at least ${NUM_NODES} registered, submitting workload..."
+
+# sites.xml is regenerated from scratch on every run.
+rm -rf sites.xml
+
+echo "<?xml version=\"1.0\" encoding=\"UTF-8\"?>
+<config xmlns=\"http://www.griphyn.org/chimera/GVDS-PoolConfig\"
+xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" version=\"1.5\">" > sites.xml
+
+# Quoted so a working directory or service file path with spaces stays one word.
+awk -v var="$WORKING" 'NR>1 {printf "<pool handle=\"bgp%.3d\">\n\t<gridftp url=\"local://localhost\"/>\n\t<execution provider=\"deef\" url=\"http://%s:50001/wsrf/services/GenericPortal/core/WS/GPFactoryService\"/>\n\t<workdirectory>%s</workdirectory>\n\t<profile namespace=\"karajan\" key=\"jobThrottle\">8</profile>\n\t<profile namespace=\"karajan\" key=\"initialScore\">1000</profile>\n</pool>\n\n", NR-2, $(1), var}' "$SERVICE_LIST_FILE" >> sites.xml
+
+echo "</config>" >> sites.xml
+
+time swift -sites.file ./sites.xml -tc.file ./tc.data "$SWIFT_SCRIPT"
Property changes on: SwiftApps/LigandAtlas/swift/run_swift.sh
___________________________________________________________________
Name: svn:executable
+ *
Added: SwiftApps/LigandAtlas/swift/vdl-int.k
===================================================================
--- SwiftApps/LigandAtlas/swift/vdl-int.k (rev 0)
+++ SwiftApps/LigandAtlas/swift/vdl-int.k 2009-03-30 16:48:40 UTC (rev 2783)
@@ -0,0 +1,514 @@
+import("sys.k")
+import("task.k")
+import("vdl-lib.xml")
+/*
+ * Things that are not exposed to the translated file
+ */
+
+// Log level names; the log(...) calls in the elements below pass these as their first argument.
+global(LOG:DEBUG, "debug")
+global(LOG:INFO, "info")
+global(LOG:WARN, "warn")
+global(LOG:ERROR, "error")
+global(LOG:FATAL, "fatal")
+
+namespace("vdl"
+ export(
+ // Recursively removes dir on host. Entries returned by file:list are processed
+ // with parallelFor: directories recurse into rmdir, anything else goes to
+ // file:remove. dir itself is removed last.
+ element(rmdir, [dir, host]
+ parallelFor(entry, file:list(dir, host=host)
+ epath := concat(dir, "/", entry)
+ if(
+ file:isDirectory(epath, host=host) rmdir(epath, host)
+ file:remove(epath, host=host)
+ )
+ )
+ dir:remove(dir, host=host)
+ )
+
+ // Creates the directory dircat(dir, path) on host, logging a START line first.
+ element(createdirs, [path, dir, host]
+ dc := dircat(dir, path)
+ log(LOG:INFO, "START path={path} dir={dir} - Creating directory structure")
+
+ dir:make(dc, host=host)
+ )
+
+ // Combines, with sys:and, the vdl:isLogged(var, path) result of every
+ // [path, var] pair in stageout and the condition that stageout is not empty.
+ element(isDone, [stageout]
+ sys:and(
+ for(pv, stageout
+ [path, var] := each(pv)
+ vdl:isLogged(var, path)
+ )
+ sys:not(isEmpty(stageout))
+ )
+ )
+
+ // When err is true, calls vdl:setFutureFault for every [path, var] pair in
+ // restarts, forwarding the optional mapping argument. Does nothing otherwise.
+ element(mark, [restarts, err, optional(mapping)]
+ if(
+ err for(pv, restarts
+ [path, var] := each(pv)
+ vdl:setFutureFault(var, path=path, mapping=mapping)
+ )
+ )
+ )
+
+ // Builds one string from the arguments: yields "" when there are none,
+ // otherwise concatenates every argument except the last, each followed by "^"
+ // (list arguments are flattened recursively), and then appends the last
+ // argument as-is, without the isList check.
+ element(flatten, [...]
+ if (
+ isEmpty(...) ""
+
+ concat(
+ for(i, butLast(...), if(isList(i) flatten(i) i), "^") last(...)
+ )
+ )
+ )
+
+ // Decides the outcome of job jobid from the status files under
+ // {wfdir}/status/{jobdir} on rhost:
+ //   - <jobid>-success can be removed   -> logs SUCCESS
+ //   - otherwise <jobid>-error exists   -> transfers it, removes the remote and
+ //                                         local copies, throws its stripped text
+ //   - neither                          -> throws "No status file was found..."
+ // The tr parameter is not used in the body.
+ element(checkJobStatus, [rhost, wfdir, jobid, tr, jobdir]
+ log(LOG:DEBUG, "START jobid={jobid}")
+ try(
+ sequential(
+ /*
+ * This is a bit of optimization, but I'm not completely
+ * sure of its correctness. The goal is to both detect
+ * the presence of the success file and remove it, all
+ * in one operation. It relies on file:remove() throwing
+ * an exception if the file is not there.
+ */
+ file:remove("{wfdir}/status/{jobdir}/{jobid}-success", host=rhost)
+ log(LOG:INFO, "SUCCESS jobid={jobid} - Success file found")
+ )
+ if(
+ file:exists("{wfdir}/status/{jobdir}/{jobid}-error", host=rhost) then(
+ log(LOG:INFO, "FAILURE jobid={jobid} - Failure file found")
+ task:transfer(srchost=rhost, srcdir="{wfdir}/status/{jobdir}", srcfile="{jobid}-error")
+ // Remote removal runs in parallel with reading and removing the local copy.
+ error := parallel(
+ file:remove("{wfdir}/status/{jobdir}/{jobid}-error", host=rhost)
+ sequential(
+ str:strip(file:read("{jobid}-error"))
+ file:remove("{jobid}-error")
+ )
+ )
+ throw(error)
+ )
+ else (
+ log(LOG:INFO, "NO_STATUS_FILE jobid={jobid} - Both status files are missing")
+ throw("No status file was found. Check the shared filesystem on {rhost}")
+ )
+ )
+ )
+ )
+
+ // Prepares the per-run directory tree on rhost, wrapped in once(list(rhost, "shared")):
+ // creates <wfdir>/shared, transfers wrapper.sh and seq.sh into it from
+ // {vds.home}/libexec/, creates the kickstart, status and info directories,
+ // yields wfdir and sharedDir, and sends list(wfdir, rhost) to the cleanup channel.
+ element(initSharedDir, [rhost]
+ once(list(rhost, "shared")
+ vdl:setprogress("Initializing site shared directory")
+
+ log(LOG:INFO, "START host={rhost} - Initializing shared directory")
+
+ wfdir := "{VDL:SCRIPTNAME}-{VDL:RUNID}"
+ sharedDir := dircat(wfdir, "shared")
+ dir:make(sharedDir, host=rhost)
+ transfer(srcdir="{vds.home}/libexec/", srcfile="wrapper.sh", destdir=sharedDir, desthost=rhost)
+ transfer(srcdir="{vds.home}/libexec/", srcfile="seq.sh", destdir=sharedDir, desthost=rhost)
+ dir:make(dircat(wfdir, "kickstart"), host=rhost)
+ dir:make(dircat(wfdir, "status"), host=rhost)
+ dir:make(dircat(wfdir, "info"), host=rhost)
+ // Disabled: pre-creation of two-level [a-z0-9]/[a-z0-9] subdirectories under shared/ and info/.
+ /*l := list(for(i, sys:range(97, 122), chr(i)), for(i, sys:range(48, 57), chr(i)))
+ for(one, l,
+ for(two, l,
+ dir:make(dircat("{wfdir}/shared/" , str:concat(one , "/" , two)), host=rhost)
+ )
+ )
+ for(one, l,
+ for(two, l,
+ dir:make(dircat("{wfdir}/info/" , str:concat(one , "/" , two)), host=rhost)
+ )
+ )*/
+ wfdir, sharedDir
+ //we send the cleanup data to vdl:main()
+ to(cleanup, list(wfdir, rhost))
+ log(LOG:INFO, "END host={rhost} - Done initializing shared directory")
+ )
+ )
+
+ // Yields reldirname(file) for every entry of stageins.
+ element(inFileDirs, [stageins]
+ for(file, stageins
+ reldirname(file)
+ )
+ )
+
+ // For every [path, var] pair in stageouts, resolves the file name with
+ // vdl:filename(vdl:getfield(...)) and yields its dirname.
+ element(outFileDirs, [stageouts]
+ for(pv, stageouts
+ [path, var] := each(pv)
+
+ file := vdl:filename(vdl:getfield(var, path = path))
+
+ dirname(file)
+ )
+ )
+
+ // Yields pathnames(stageins).
+ element(inFiles, [stageins]
+ pathnames(stageins)
+ )
+
+ // For every [path, var] pair in stageouts, yields the file name obtained from
+ // vdl:filename(vdl:getfield(...)).
+ element(outFiles, [stageouts]
+ for(pv, stageouts
+ [path, var] := each(pv)
+
+ file := vdl:filename(vdl:getfield(var, path = path))
+
+ file
+ )
+ )
+
+ // Yields a list of the directories of all stage-in and stage-out files,
+ // passed through unique(...).
+ element(fileDirs, [stageins, stageouts]
+ list(
+ unique(
+ inFileDirs(stageins)
+ outFileDirs(stageouts)
+ )
+ )
+ )
+
+ // Calls createdirs(u, destdir, host) for every entry u of dirs, each call
+ // wrapped in cacheOn(list(u, destdir, host)). Logs START/END around the loop.
+ element(createDirSet, [jobid, destdir, host, dirs]
+ /*
+ * Ideally this would be done by creating a tree of the directories
+ * to be created and (eventually) exploiting the concurrency in that.
+ */
+ log(LOG:INFO, "START jobid={jobid} host={host} - Initializing directory structure")
+ for(u, dirs
+ cacheOn(list(u, destdir, host)
+ createdirs(u, destdir, host)
+ )
+ )
+ log(LOG:INFO, "END jobid={jobid} - Done initializing directory structure")
+ )
+
+ // Runs "/bin/rm -rf {dir}" on host, but only when the "sitedir.keep"
+ // configuration property equals "false". Logs START/END either way.
+ element(cleanup, [dir, host]
+ log(LOG:INFO, "START dir={dir} host={host}")
+ if(vdl:configProperty("sitedir.keep") == "false"
+ task:execute("/bin/rm", arguments="-rf {dir}", host=host, batch=true)
+ )
+ log(LOG:INFO, "END dir={dir} host={host}")
+ )
+
+ // Runs cleanup(dir, host) in parallel for every [dir, host] entry. A failing
+ // cleanup is logged at DEBUG and reported on the warnings channel instead of
+ // being propagated.
+ element(cleanups, [cleanup]
+ log(LOG:INFO, "START cleanups={cleanup}")
+ parallelFor(i, cleanup
+ [dir, host] := each(i)
+ try(
+ cleanup(dir, host)
+ catch(".*",
+ log(LOG:DEBUG, "EXCEPTION - Exception caught while cleaning up", exception)
+ to(warnings, exception("Cleanup on {host} failed", exception))
+ )
+ )
+ )
+ )
+
+ // For every entry r of files: logs the purge, removes r on host and reports
+ // the removal through vdl:cacheFileRemoved.
+ element(cleanupFiles, [files, host]
+ uParallelFor(r, files
+ log(LOG:INFO, "Purging ", r, " on ", host)
+ file:remove(r, host=host)
+ vdl:cacheFileRemoved(r, host)
+ )
+ )
+
+ // Stages every entry of files into dir on host. For each file it looks up the
+ // source provider/host/directory and size, registers the file through
+ // vdl:cacheAddAndLockFile (purging whatever arrives as cacheFilesToRemove) and
+ // transfers it with task:transfer wrapped in restartOnError(".*", 2).
+ element(doStagein, [jobid, files, dir, host]
+ log(LOG:INFO, "START jobid={jobid} - Staging in files")
+ uParallelFor(file, files
+ provider := provider(file)
+ srchost := hostname(file)
+ srcdir := dirname(file)
+ destdir := dircat(dir, reldirname(file))
+ filename := basename(file)
+ size := file:size("{srcdir}/{filename}", host=srchost, provider=provider)
+
+ vdl:cacheAddAndLockFile(filename, destdir, host, size
+ cleanupFiles(cacheFilesToRemove, host)
+ log(LOG:DEBUG, "FILE_STAGE_IN_START file={file} srchost={srchost} srcdir={srcdir} srcname={filename} ",
+ "desthost={host} destdir={destdir} provider={provider}")
+ restartOnError(".*", 2
+ task:transfer(srcprovider=provider, srchost=srchost, srcfile=filename,
+ srcdir=srcdir, desthost=host, destdir=destdir)
+ )
+ log(LOG:DEBUG, "FILE_STAGE_IN_END file={file} srchost={srchost} srcdir={srcdir} srcname={filename} ",
+ "desthost={host} destdir={destdir} provider={provider}")
+ )
+ )
+ log(LOG:INFO, "END jobid={jobid} - Staging in finished")
+ )
+
+ // Stage-out bookkeeping for the [path, var] pairs in stageouts. For each output
+ // it derives the local (ldir) and remote (rdir) locations, logs
+ // FILE_STAGE_OUT_START/END and collects list(bname, rdir, host, size); every
+ // collected entry is then registered with vdl:cacheAddFile.
+ // NOTE(review): the dir:make/task:transfer step is commented out below, so this
+ // element itself moves no data, yet file:size(fullLocal) is still evaluated -
+ // presumably the outputs reach ldir by another route; confirm.
+ element(doStageout, [jobid, stageouts, dir, host]
+ log(LOG:INFO, "START jobid={jobid} - Staging out files")
+ done := list(
+ uparallelFor(pv, stageouts
+ [path, var] := each(pv)
+ file := vdl:absfilename(vdl:getfield(var, path = path))
+ provider := vdl:provider(file)
+ dhost := vdl:hostname(file)
+ rdir := dircat(dir, reldirname(file))
+ bname := basename(file)
+ ldir := dirname(file)
+ fullLocal := dircat(ldir, bname)
+ fullRemote := dircat(rdir, bname)
+
+ log(LOG:DEBUG, "FILE_STAGE_OUT_START srcname={bname} srcdir={rdir} srchost={host} ",
+ "destdir={ldir} desthost={dhost} provider={provider}")
+ //make sure we do have the directory on the client side
+ /*dir:make(ldir)
+ restartOnError(".*", 2
+ task:transfer(srchost=host, srcfile=bname,
+ srcdir=rdir, destdir=ldir, desthost=dhost, destprovider=provider)
+ )*/
+ log(LOG:DEBUG, "FILE_STAGE_OUT_END srcname={bname} srcdir={rdir} srchost={host} ",
+ "destdir={ldir} desthost={dhost} provider={provider}")
+
+ list(bname, rdir, host, file:size(fullLocal))
+ )
+ )
+ uParallelFor(f, done
+ [bname, rdir, host, size] := each(f)
+ vdl:cacheAddFile(bname, rdir, host, size
+ cleanupFiles(cacheFilesToRemove, host)
+ )
+ )
+ log(LOG:INFO, "END jobid={jobid} - Staging out finished")
+ )
+
+ // Calls vdl:logvar(var, path) for every [path, var] pair in restartouts.
+ element(doRestartlog, [restartouts]
+ uParallelFor(f, restartouts,
+ [path, var] := each(f)
+ vdl:logvar(var, path)
+ )
+ )
+
+ // Unless the "pgraph" configuration property is "false", sends graph text to
+ // the graph channel: one node for the current thread prefix labelled with tr,
+ // one parallelogram node plus an edge into the thread node for every stage-in
+ // file, and one parallelogram node plus an edge out of the thread node for
+ // every stage-out file. When err is true ",color=lightsalmon" is appended.
+ // The optional args parameter is not used in the body.
+ element(graphStuff, [tr, stagein, stageout, err, optional(args)]
+ if(
+ vdl:configProperty("pgraph") != "false" then(
+ errprops := if(err ",color=lightsalmon" "")
+ tp := vdl:threadPrefix()
+ to(graph,
+ concat(str:quote(tp), " [label=", str:quote("{tr}{errprops}"), "]")
+ )
+ for(si, stagein
+ si := basename(si)
+ to(graph
+ concat(str:quote(si), " [shape=parallelogram{errprops}]")
+ concat(str:quote(si), " -> ", str:quote(tp))
+ )
+ )
+ for(pv, stageout
+ [path, var] := each(pv)
+ file := vdl:fileName(vdl:getfield(var, path=path))
+ file := basename(file)
+ label := vdl:niceName(var, path = path)
+ to(graph
+ concat(str:quote(file), " [shape=parallelogram,label=",
+ str:quote("{label}{errprops}"), "]")
+ concat(str:quote(tp), " -> ", str:quote(file))
+ )
+ )
+ )
+ )
+ )
+
+ // Yields the sum of file:size over every entry of files.
+ element(fileSizes, [files]
+ math:sum(
+ // The loop variable is f; the size must be taken of f, not of the name "file".
+ for(f, files, file:size(f))
+ )
+ )
+
+ // Builds one string describing the job's stderr and stdout, in that order. For
+ // each stream it emits a newline and "<name>: ", then tries to transfer the
+ // file from tmpdir on rhost to the local file "{jobid}-{file}" and read it;
+ // if that fails only a newline is emitted. The local copy is removed on a
+ // best-effort basis (maybe).
+ element(transferStandardFiles, [rhost, tmpdir, jobid, stdout, stderr]
+ concat(
+ for(f, list(list("stderr.txt", stderr), list("stdout.txt", stdout))
+ [name, file] := each(f)
+ destfile := "{jobid}-{file}"
+ nl()
+ "{name}: "
+ try(
+ sequential(
+ task:transfer(srchost=rhost, srcdir=tmpdir, srcfile=file,
+ destfile=destfile)
+
+ file:read(destfile)
+ )
+ nl()
+ )
+ maybe(file:remove(destfile))
+ )
+ )
+ )
+
+ // Fetches "{jobid}-kickstart.xml" from <wfdir>/kickstart/<jobdir> on rhost into
+ // the local "{VDL:SCRIPTNAME}-{VDL:RUNID}.d/" directory, creating that directory
+ // when it does not exist. A failed transfer is only logged as a warning.
+ // Yields the record file name in either case.
+ element(transferKickstartRec, [rhost, wfdir, jobid, jobdir]
+ if(sys:not(file:exists("{VDL:SCRIPTNAME}-{VDL:RUNID}.d"))
+ task:dir:make("{VDL:SCRIPTNAME}-{VDL:RUNID}.d")
+ )
+ recfile := "{jobid}-kickstart.xml"
+ srcdir := dircat(concat(wfdir, "/kickstart/"), jobdir)
+ try(
+ task:transfer(srchost=rhost, srcdir=srcdir, srcfile=recfile, destdir="{VDL:SCRIPTNAME}-{VDL:RUNID}.d/")
+ (
+ maybe(file:remove(recfile))
+ log(LOG:WARN, "Failed to transfer kickstart records from {srcdir}/{rhost}", exception)
+ )
+ )
+ recfile
+ )
+
+ // Fetches "{jobid}-info" from <wfdir>/info/<jobdir> on rhost into the local
+ // "{VDL:SCRIPTNAME}-{VDL:RUNID}.d/" directory, creating that directory when it
+ // does not exist. A failed transfer is logged (WARN, with the exception at
+ // DEBUG) and not propagated. Yields the log file name in either case.
+ element(transferWrapperLog, [rhost, wfdir, jobid, jobdir]
+ if(sys:not(file:exists("{VDL:SCRIPTNAME}-{VDL:RUNID}.d"))
+ task:dir:make("{VDL:SCRIPTNAME}-{VDL:RUNID}.d")
+ )
+ recfile := "{jobid}-info"
+ srcdir := dircat(concat(wfdir, "/info/"), jobdir)
+ try(
+ task:transfer(srchost=rhost, srcdir=srcdir, srcfile=recfile, destdir="{VDL:SCRIPTNAME}-{VDL:RUNID}.d/")
+ (
+ maybe(file:remove(recfile))
+ log(LOG:WARN, "Failed to transfer wrapper log from {srcdir}/{rhost}")
+ log(LOG:DEBUG, "Exception for wrapper log failure from {srcdir}/{rhost}: ", exception)
+ )
+ )
+ recfile
+ )
+
+ /*
+ * Runs one invocation of transformation tr on a host obtained from
+ * allocateHost: initialises the shared directory, creates the needed
+ * directories and stages inputs in, launches shared/wrapper.sh through
+ * vdl:execute, then performs the stage-out bookkeeping and restart logging.
+ * On "Abort" the cached inputs are unlocked and the exception is rethrown
+ * unchanged; on any other failure the standard streams, wrapper log and
+ * (if kickstart is in use) kickstart record are collected and a new
+ * exception carrying those details is thrown.
+ */
+ element(execute2, [tr, optional(arguments, stdin, stdout, stderr), stagein, stageout, restartout,
+ replicationGroup, replicationChannel]
+ stagein := list(unique(each(stagein)))
+ stageout := list(unique(each(stageout)))
+ allocateHost(rhost, constraints=vdl:jobConstraints(tr)
+
+ [wfdir, sharedDir] := try(
+ initSharedDir(rhost)
+ throw(exception("Could not initialize shared directory on {rhost}", exception))
+ )
+
+ uid := uid()
+ // jobdir is "<c0>/<c1>", built from the first two characters of uid.
+ jobdir := concat(substring(uid, from=0,to=1), "/", substring(uid, from=1, to=2))
+ jobid := concat(tr, "-", uid)
+
+ log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread={#thread} host={rhost} replicationGroup={replicationGroup}")
+ vdl:setprogress("Stage in")
+ tmpdir := dircat(concat(wfdir, "/jobs/", jobdir), jobid)
+
+ // Fall back to fixed names when the optional stdout/stderr were not given.
+ stdout := try(stdout, "stdout.txt")
+ stderr := try(stderr, "stderr.txt")
+
+ kickstart := vdl:kickstart(rhost)
+
+ try(
+ sequential(
+ fileDirs := fileDirs(stagein, stageout)
+
+ createDirSet(jobid, sharedDir, rhost, fileDirs)
+ doStagein(jobid, stagein, sharedDir, rhost)
+
+ log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", maybe(" arguments=", arguments), " tmpdir={tmpdir} host={rhost}")
+
+ vdl:setprogress("Submitting")
+ // Option order must match the getarg sequence in wrapper.sh; list-valued
+ // options are joined with "^" by flatten.
+ vdl:execute("/bin/bash",
+ list("shared/wrapper.sh", jobid,
+ "-jobdir", jobdir,
+ "-e", vdl:executable(tr, rhost),
+ "-out", stdout,
+ "-err", stderr,
+ "-i", maybe(stdin),
+ "-d", flatten(each(fileDirs)),
+ "-if", flatten(infiles(stagein)),
+ "-of", flatten(outfiles(stageout)),
+ "-k", kickstart,
+ "-a", maybe(each(arguments)))
+ directory=wfdir
+ redirect=false
+ host=rhost
+ vdl:tcprofile(tr, rhost) //this gets various app params from the tc, such as environment, walltime, etc
+ replicationGroup=replicationGroup
+ replicationChannel=replicationChannel
+ jobid=jobid
+ )
+
+
+ log(LOG:DEBUG, "STAGING_OUT jobid={jobid}")
+
+
+ /* need to stage the files to upper scratch area in case they are not transfered to another site
+ before all the files get cleaned out */
+
+
+ vdl:setprogress("Stage out")
+ doStageout(jobid, stageout, sharedDir, rhost)
+ doRestartlog(restartout)
+ if(
+ kickstart != "" & vdl:configProperty("kickstart.always.transfer") == "true"
+ discard(transferKickstartRec(rhost, wfdir, jobid, jobdir))
+ )
+ if(
+ vdl:configProperty("wrapperlog.always.transfer") == "true"
+ discard(transferWrapperLog(rhost, wfdir, jobid, jobdir))
+ )
+ vdl:cacheUnlockFiles(stagein, sharedDir, rhost, cleanupFiles(cacheFilesToRemove, rhost))
+ log(LOG:DEBUG, "JOB_END jobid={jobid}")
+ )
+ // Cancellation: unlock the inputs and rethrow the exception unchanged.
+ catch("^Abort$"
+ log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
+ vdl:cacheUnlockFiles(stagein, sharedDir, rhost, force=false
+ cleanupFiles(cacheFilesToRemove, rhost)
+ )
+ throw(exception)
+ )
+ // Any other failure: unlock the inputs, gather diagnostics, throw a detailed exception.
+ catch("^(?!Abort$).*"
+ vdl:setprogress("Failed but can retry")
+ log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
+ vdl:cacheUnlockFiles(stagein, sharedDir, rhost, force=false
+ cleanupFiles(cacheFilesToRemove, rhost)
+ )
+
+ outs := transferStandardFiles(rhost, tmpdir, jobid, stdout, stderr)
+
+ discard(transferWrapperLog(rhost, wfdir, jobid, jobdir))
+ kickstartRec := if(
+ kickstart == "" ""
+ else(
+ try(
+ (
+ recfile := transferKickstartRec(rhost, wfdir, jobid, jobdir)
+ "KickstartRecord: {recfile}"
+ )
+ ""
+ )
+ )
+ )
+
+ throw(
+ exception(
+ concat(
+ "Exception in {tr}:", nl(),
+ maybe("Arguments: {arguments}", nl()),
+ "Host: {rhost}", nl(),
+ "Directory: {tmpdir}",
+ "{outs}", nl(),
+ "----", nl(),
+ kickstartRec
+ )
+ exception
+ )
+ )
+ )
+ )
+ )
+ )
+
+ // Writes gdata as a "digraph SwiftProvenance" file. The file name is
+ // "{VDL:SCRIPTNAME}-{VDL:RUNID}.dot" when the "pgraph" property is "true",
+ // otherwise the property value itself. Graph and node options come from the
+ // "pgraph.graph.options" and "pgraph.node.options" properties; every entry of
+ // gdata becomes one indented line.
+ element(generateProvenanceGraph, [gdata]
+ pgraph := vdl:configProperty("pgraph")
+ gname := if(pgraph == "true" "{VDL:SCRIPTNAME}-{VDL:RUNID}.dot" pgraph)
+ file:write(gname
+ "digraph SwiftProvenance {{", nl()
+ " graph [", vdl:configProperty("pgraph.graph.options"), "];", nl()
+ " node [", vdl:configProperty("pgraph.node.options"), "];", nl()
+
+ for(i, gdata
+ " ", i, nl()
+ )
+ "}", nl()
+ )
+ log(LOG:INFO, "Provenance graph saved in ", gname)
+ )
+ )
+)
Added: SwiftApps/LigandAtlas/swift/wrapper.sh
===================================================================
--- SwiftApps/LigandAtlas/swift/wrapper.sh (rev 0)
+++ SwiftApps/LigandAtlas/swift/wrapper.sh 2009-03-30 16:48:40 UTC (rev 2783)
@@ -0,0 +1,288 @@
+# this script must be invoked inside of bash, not plain sh
+PATH=/fuse/bin:/fuse/usr/bin:$PATH
+infosection() {
+ echo >& "$INFO"
+ echo "_____________________________________________________________________________" >& "$INFO"
+ echo >& "$INFO"
+ echo " $1" >& "$INFO"
+ echo "_____________________________________________________________________________" >& "$INFO"
+ echo >& "$INFO"
+}
+
+info() {
+ infosection "uname -a"
+ uname -a 2>&1 >& "$INFO"
+ infosection "id"
+ id 2>&1 >& "$INFO"
+ infosection "env"
+ env 2>&1 >& "$INFO"
+ infosection "df"
+ df 2>&1 >& "$INFO"
+ infosection "/proc/cpuinfo"
+ cat /proc/cpuinfo 2>&1 >& "$INFO"
+ infosection "/proc/meminfo"
+ cat /proc/meminfo 2>&1 >& "$INFO"
+ infosection "command line"
+ echo $COMMANDLINE 2>&1 >& "$INFO"
+}
+
+# Writes a "Progress <date> <time> <state...>" line to the info log.
+logstate() {
+ local stamp
+ stamp=$(date +"%Y-%m-%d %H:%M:%S")
+ echo "Progress " "$stamp" " $@" >& "$INFO"
+}
+
+# Writes all arguments as one line to the descriptor number held in $INFO
+# (openinfo sets it to 3).
+log() {
+ echo "$@" >& "$INFO"
+}
+
+# Aborts the wrapper: logs the message given after the exit code, dumps the
+# node diagnostics via info, and exits with that code.
+#   $1    exit code
+#   $2... message words
+fail() {
+ EC=$1
+ shift
+ # Quoted so the message is logged verbatim (no globbing or re-splitting).
+ log "$@"
+ info
+ exit "$EC"
+}
+
+# Calls fail with the given exit code and message when the command that ran
+# immediately before this function returned non-zero. The $? test must stay
+# the first command in the body, otherwise that status is lost.
+checkError() {
+ if [ "$?" != "0" ]; then
+ fail "$@"
+ fi
+}
+
+# Fails with exit code 254 and the message in $2... when $1 is empty.
+checkEmpty() {
+ if [ "$1" == "" ]; then
+ shift
+ # Quoted so the message reaches fail unchanged.
+ fail 254 "$@"
+ fi
+}
+
+getarg() {
+ NAME=$1
+ shift
+ VALUE=""
+ SHIFTCOUNT=0
+ if [ "$1" == "$NAME" ]; then
+ shift
+ let "SHIFTCOUNT=$SHIFTCOUNT+1"
+ while [ "${1:0:1}" != "-" ] && [ "$#" != "0" ]; do
+ VALUE="$VALUE $1"
+ shift
+ let "SHIFTCOUNT=$SHIFTCOUNT+1"
+ done
+ else
+ fail 254 "Missing $NAME argument"
+ fi
+ VALUE="${VALUE:1}"
+}
+
+# Opens the file named by $1 read/write on descriptor 3 and records that
+# descriptor number in INFO for the logging helpers.
+openinfo() {
+ # Quoted so a path containing spaces is still one redirection target.
+ exec 3<> "$1"
+ INFO=3
+}
+
+# Closes file descriptor 3, the descriptor openinfo opens.
+closeinfo() {
+ exec 3>&-
+}
+
+cioinput() {
+ INPUT=$1
+ FILEPATH=`dirname $INPUT`
+ FILENAME=`basename $INPUT`
+ TYPE=${INPUT%%/*}
+ echo "INPUT_TYPE: $TYPE" >> /dev/shm/cio
+ if [ "$TYPE" == "common" ] && [ -e /dev/shm/share/$FILENAME ]; then
+ echo "cioinput(): link for common input $INPUT" >> /dev/shm/cio
+ ln -s "/dev/shm/share/$FILENAME" "$DIR/$L"
+ elif [ "$TYPE" == "_concurrent" ]; then
+ echo "cioinput(): toruscp for intermediate data $INPUT" >> /dev/shm/cio
+ mkdir -p $FILEPATH
+ echo "cioinput(): $DIR/$FILEPATH" >> /dev/shm/cio
+ /home/zzhang/cio/bin/toruscp.sh $FILENAME $DIR/$FILEPATH
+ else
+ echo "cioinput(): copy from GPFS $INPUT" >> /dev/shm/cio
+ cp "$PWD/shared/$L" "$DIR/$L"
+ fi
+}
+# Publishes one output file, chosen by the first path component of $1:
+#   _concurrent/... -> passed to chtput.rb together with $RANK, then copied to
+#                      /dev/shm/share/
+#   anything else   -> recorded in /dev/shm/chirp_add and copied to
+#                      /chirp/multi/${CHIRP_ADD}@stripe/
+# Progress is traced to /dev/shm/cio. The exit status is that of the final cp,
+# which the caller inspects with checkError; the status of chtput.rb is not checked.
+ciooutput() {
+ OUTPUT=$1
+ FILEPATH=$(dirname "$OUTPUT")
+ FILENAME=$(basename "$OUTPUT")
+ TYPE=${OUTPUT%%/*}
+ echo "OUTPUT_TYPE: $TYPE" >> /dev/shm/cio
+
+ if [ "$TYPE" == "_concurrent" ]; then
+ echo "ciooutput(): write intermediate data $OUTPUT" >> /dev/shm/cio
+ # $RANK stays unquoted: when it is empty no extra argument is passed.
+ /home/zzhang/cio/bin/chtput.rb "$FILENAME" $RANK
+ cp "$OUTPUT" /dev/shm/share/
+ else
+ echo "ciooutput(): write regular data $OUTPUT" >> /dev/shm/cio
+ #dd if="$OUTPUT" of="$WFDIR/shared/$OUTPUT" bs=128k
+ echo "$OUTPUT /chirp/multi/${CHIRP_ADD}@stripe/" >> /dev/shm/chirp_add
+ cp "$OUTPUT" "/chirp/multi/${CHIRP_ADD}@stripe/"
+ fi
+}
+# ---------------------------------------------------------------------------
+# Main flow. Expected command line, in this exact order:
+#   <jobid> -jobdir <dir> -e <executable> -out <file> -err <file> -i [<file>]
+#           -d <dirs> -if <inputs> -of <outputs> -k [<kickstart>] -a [<args...>]
+# The -d/-if/-of values are "^"-separated lists.
+# ---------------------------------------------------------------------------
+echo "$@" >> /dev/shm/log
+# RANK is the fourth comma-separated field of $CONTROL_INIT.
+RANK=`echo $CONTROL_INIT | awk -F, '{print $4}'`
+echo "RANK: $RANK" >> /dev/shm/log
+COMMANDLINE=$@
+WFDIR=$PWD
+ID=$1
+checkEmpty "$ID" "Missing job ID"
+
+shift
+
+getarg "-jobdir" "$@"
+JOBDIR=$VALUE
+shift $SHIFTCOUNT
+
+checkEmpty "$JOBDIR" "Missing job directory prefix"
+mkdir -p "/dev/shm/swift-info/$JOBDIR"
+
+closeinfo
+openinfo "/dev/shm/swift-info/$JOBDIR/${ID}-info"
+#openinfo "/dev/null"
+
+logstate "LOG_START"
+
+getarg "-e" "$@"
+EXEC=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-out" "$@"
+STDOUT=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-err" "$@"
+STDERR=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-i" "$@"
+STDIN=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-d" "$@"
+DIRS=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-if" "$@"
+INF=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-of" "$@"
+OUTF=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-k" "$@"
+KICKSTART=$VALUE
+shift $SHIFTCOUNT
+
+# Everything after -a is passed to the application untouched.
+if [ "$1" == "-a" ]; then
+ shift
+else
+ fail 254 "Missing arguments (-a option)"
+fi
+
+# Job directory: under $SWIFT_JOBDIR_PATH when set (inputs are copied),
+# otherwise under /dev/shm/swift-work (inputs go through cioinput).
+if [ "X$SWIFT_JOBDIR_PATH" != "X" ]; then
+ DIR=${SWIFT_JOBDIR_PATH}/$JOBDIR/$ID
+ COPYNOTLINK=1
+else
+ DIR=/dev/shm/swift-work/$JOBDIR/$ID
+ COPYNOTLINK=0
+fi
+
+PATH=$PATH:/bin:/usr/bin
+
+if [ "$PATHPREFIX" != "" ]; then
+export PATH=$PATHPREFIX:$PATH
+fi
+
+# From here on unquoted expansions split on "^" only; the loops over
+# $DIRS, $INF and $OUTF depend on this.
+IFS="^"
+
+logstate "CREATE_JOBDIR"
+mkdir -p "$DIR"
+
+logstate "CREATE_INPUTDIR"
+
+for D in $DIRS ; do
+ mkdir -p "$DIR/$D"
+ checkError 254 "Failed to create input directory $D"
+done
+
+#cd $DIR
+logstate "LINK_INPUTS"
+for L in $INF ; do
+ if [ $COPYNOTLINK = 1 ]; then
+ cp "$PWD/shared/$L" "$DIR/$L"
+ checkError 254 "Failed to copy input file $L"
+ else
+ cioinput "$L"
+ #cp "$PWD/shared/$L" "$DIR/$L"
+ checkError 254 "Failed to link input file $L"
+ fi
+done
+
+logstate "EXECUTE"
+
+cd "$DIR"
+
+if [ "$KICKSTART" == "" ]; then
+ if [ "$STDIN" == "" ]; then
+ "$EXEC" "$@" 1>"$STDOUT" 2>"$STDERR"
+ else
+ "$EXEC" "$@" 1>"$STDOUT" 2>"$STDERR" <"$STDIN"
+ fi
+ # Both $? here still refer to the application's exit status.
+ checkError $? "Exit code $?"
+else
+ if [ ! -f "$KICKSTART" ]; then
+ fail 254 "The Kickstart executable ($KICKSTART) was not found"
+ elif [ ! -x "$KICKSTART" ]; then
+ fail 254 "The Kickstart executable ($KICKSTART) does not have the executable bit set"
+ else
+ mkdir -p "$WFDIR/kickstart/$JOBDIR"
+ # Kickstart launches the executable parsed from -e ($EXEC); no other
+ # executable variable is ever assigned in this script.
+ if [ "$STDIN" == "" ]; then
+ "$KICKSTART" -H -o "$STDOUT" -e "$STDERR" "$EXEC" "$@" 1>kickstart.xml 2>"$STDERR"
+ else
+ "$KICKSTART" -H -o "$STDOUT" -i "$STDIN" -e "$STDERR" "$EXEC" "$@" 1>kickstart.xml 2>"$STDERR"
+ fi
+ export APPEXIT=$?
+ # stdout is pointed at the info log first, then stderr is duplicated onto it.
+ mv -f kickstart.xml "$WFDIR/kickstart/$JOBDIR/$ID-kickstart.xml" >& "$INFO" 2>&1
+ checkError 254 "Failed to copy Kickstart record to shared directory"
+ if [ "$APPEXIT" != "0" ]; then
+ fail $APPEXIT "Exit code $APPEXIT"
+ fi
+ fi
+fi
+
+logstate "EXECUTE_DONE"
+
+# Collect every declared output the application did not produce.
+MISSING=
+for O in $OUTF ; do
+ if [ ! -f "$DIR/$O" ]; then
+ if [ "$MISSING" == "" ]; then
+ MISSING=$O
+ else
+ MISSING="$MISSING, $O"
+ fi
+ fi
+done
+if [ "$MISSING" != "" ]; then
+ fail 254 "The following output files were not created by the application: $MISSING"
+fi
+
+logstate "COPYING_OUTPUTS"
+for O in $OUTF ; do
+ #cp "$DIR/$O" "$WFDIR/shared/$O" 2>&1 >& "$INFO"
+ #cp "$DIR/$O" "$WFDIR/shared/$O"
+ #dd if="$DIR/$O" of="$WFDIR/shared/$JOBDIR/$O" bs=128k
+ #dd if="$DIR/$O" of="$WFDIR/shared/$O" bs=128k
+ ciooutput "$O"
+ checkError 254 "Failed to copy output file $O to shared directory"
+done
+
+# NOTE(review): the state is logged as RM_JOBDIR but nothing below removes $DIR - confirm intent.
+logstate "RM_JOBDIR"
+
+closeinfo
+#rm -f "$WFDIR/info/$JOBDIR/${ID}-info"
+#echo "$WFDIR/info/$JOBDIR/${ID}-info" >> /dev/shm/log
+#mkdir -p "$WFDIR/info/$JOBDIR/"
+#dd if=/dev/shm/swift-info/$JOBDIR/${ID}-info of="$WFDIR/info/$JOBDIR/${ID}-info" bs=128k
+#dd if=/dev/shm/swift-info/$JOBDIR/${ID}-info of="/fuse/tmp/${ID}-info" bs=128k
More information about the Swift-commit mailing list