[Swift-commit] r2783 - in SwiftApps/LigandAtlas: . swift

noreply at svn.ci.uchicago.edu noreply at svn.ci.uchicago.edu
Mon Mar 30 11:48:40 CDT 2009


Author: zzhang
Date: 2009-03-30 11:48:40 -0500 (Mon, 30 Mar 2009)
New Revision: 2783

Added:
   SwiftApps/LigandAtlas/swift/
   SwiftApps/LigandAtlas/swift/run_swift.sh
   SwiftApps/LigandAtlas/swift/vdl-int.k
   SwiftApps/LigandAtlas/swift/wrapper.sh
Log:
changes Zhao has made to Swift to make MTIO work


Added: SwiftApps/LigandAtlas/swift/run_swift.sh
===================================================================
--- SwiftApps/LigandAtlas/swift/run_swift.sh	                        (rev 0)
+++ SwiftApps/LigandAtlas/swift/run_swift.sh	2009-03-30 16:48:40 UTC (rev 2783)
@@ -0,0 +1,34 @@
+#!/bin/bash
+
+ if [ -z "$4" ]; then 
+              echo "usage: $0 <FALKON_ID> <NUM_NODES> <swift_script> <working_dir>"
+              exit 1
+          fi
+
+CUR_DIR=`pwd`
+                       
+
+SERVICE_LIST_FILE=/home/falkon/users/${USER}/${1}/config/Client-service-URIs.config
+
+                                               
+NUM_NODES=$2
+SWIFT_SCRIPT=$3
+WORKING=$4
+let NUM_ION=NUM_NODES/64
+
+echo "waiting for at least ${NUM_NODES} nodes to register before submitting workload..."
+falkon-wait-for-allocation.sh ${SERVICE_LIST_FILE} ${NUM_ION}
+                                
+echo "found at least ${NUM_NODES} registered, submitting workload..."
+
+rm -rf sites.xml
+
+echo "<?xml version=\"1.0\" encoding=\"UTF-8\"?>
+<config xmlns=\"http://www.griphyn.org/chimera/GVDS-PoolConfig\"
+xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" version=\"1.5\">" > sites.xml
+
+cat $SERVICE_LIST_FILE | awk -v var=$WORKING 'NR>1 {printf "<pool handle=\"bgp%.3d\">\n\t<gridftp  url=\"local://localhost\"/>\n\t<execution provider=\"deef\" url=\"http://%s:50001/wsrf/services/GenericPortal/core/WS/GPFactoryService\"/>\n\t<workdirectory>%s</workdirectory>\n\t<profile namespace=\"karajan\" key=\"jobThrottle\">8</profile>\n\t<profile namespace=\"karajan\" key=\"initialScore\">1000</profile>\n</pool>\n\n", NR-2, $(1), var}' >> sites.xml
+
+echo "</config>" >> sites.xml
+
+time swift -sites.file ./sites.xml -tc.file ./tc.data $3


Property changes on: SwiftApps/LigandAtlas/swift/run_swift.sh
___________________________________________________________________
Name: svn:executable
   + *

Added: SwiftApps/LigandAtlas/swift/vdl-int.k
===================================================================
--- SwiftApps/LigandAtlas/swift/vdl-int.k	                        (rev 0)
+++ SwiftApps/LigandAtlas/swift/vdl-int.k	2009-03-30 16:48:40 UTC (rev 2783)
@@ -0,0 +1,514 @@
+import("sys.k")
+import("task.k")
+import("vdl-lib.xml")
+/*
+ * Things that are not exposed to the translated file
+ */
+
+global(LOG:DEBUG, "debug")
+global(LOG:INFO, "info")
+global(LOG:WARN, "warn")
+global(LOG:ERROR, "error")
+global(LOG:FATAL, "fatal")
+
+namespace("vdl"
+	export(
+		element(rmdir, [dir, host]
+			parallelFor(entry, file:list(dir, host=host)
+				epath := concat(dir, "/", entry)
+				if(
+					file:isDirectory(epath, host=host) rmdir(epath, host)
+					file:remove(epath, host=host)
+				)
+			)
+			dir:remove(dir, host=host)
+		)
+		
+		element(createdirs, [path, dir, host]
+			dc := dircat(dir, path)
+			log(LOG:INFO, "START path={path} dir={dir} - Creating directory structure")
+
+			dir:make(dc, host=host)
+		)
+							
+		element(isDone, [stageout]
+			sys:and(
+				for(pv, stageout
+					[path, var] := each(pv)
+					vdl:isLogged(var, path)
+				)
+				sys:not(isEmpty(stageout))
+			)
+		)
+			
+		element(mark, [restarts, err, optional(mapping)]
+			if(
+				err for(pv, restarts
+					[path, var] := each(pv)
+					vdl:setFutureFault(var, path=path, mapping=mapping)
+				)
+			)
+		)
+		
+		element(flatten, [...]
+			if (
+				isEmpty(...) ""
+				
+				concat(
+					for(i, butLast(...), if(isList(i) flatten(i) i), "^") last(...)
+				)
+			)
+		)
+		
+		element(checkJobStatus, [rhost, wfdir, jobid, tr, jobdir]
+			log(LOG:DEBUG, "START jobid={jobid}")
+			try(
+				sequential(
+					/*
+					 * This is a bit of optimization, but I'm not completely
+					 * sure of its correctness. The goal is to both detect
+					 * the presence of the success file and remove it, all
+					 * in one operation. It relies on file:remove() throwing 
+					 * an exception if the file is not there. 
+					 */
+					file:remove("{wfdir}/status/{jobdir}/{jobid}-success", host=rhost)
+					log(LOG:INFO, "SUCCESS jobid={jobid} - Success file found")
+				)
+				if(
+					file:exists("{wfdir}/status/{jobdir}/{jobid}-error", host=rhost) then(
+						log(LOG:INFO, "FAILURE jobid={jobid} - Failure file found")
+						task:transfer(srchost=rhost, srcdir="{wfdir}/status/{jobdir}", srcfile="{jobid}-error")
+						error := parallel(
+							file:remove("{wfdir}/status/{jobdir}/{jobid}-error", host=rhost)
+							sequential(
+								str:strip(file:read("{jobid}-error"))
+								file:remove("{jobid}-error")
+							)
+						)
+						throw(error)
+					)
+					else (	
+						log(LOG:INFO, "NO_STATUS_FILE jobid={jobid} - Both status files are missing")
+						throw("No status file was found. Check the shared filesystem on {rhost}")  
+					)
+				)
+			)
+		)
+		
+		element(initSharedDir, [rhost]
+			once(list(rhost, "shared")
+				vdl:setprogress("Initializing site shared directory")
+
+				log(LOG:INFO, "START host={rhost} - Initializing shared directory")
+
+				wfdir := "{VDL:SCRIPTNAME}-{VDL:RUNID}"
+				sharedDir := dircat(wfdir, "shared")
+				dir:make(sharedDir, host=rhost)
+				transfer(srcdir="{vds.home}/libexec/", srcfile="wrapper.sh", destdir=sharedDir, desthost=rhost)
+				transfer(srcdir="{vds.home}/libexec/", srcfile="seq.sh", destdir=sharedDir, desthost=rhost)
+				dir:make(dircat(wfdir, "kickstart"), host=rhost)
+				dir:make(dircat(wfdir, "status"), host=rhost)
+				dir:make(dircat(wfdir, "info"), host=rhost)
+				/*l := list(for(i, sys:range(97, 122), chr(i)), for(i, sys:range(48, 57), chr(i)))
+				for(one, l,
+					 for(two, l,
+						  dir:make(dircat("{wfdir}/shared/" , str:concat(one , "/" , two)), host=rhost)
+						  )
+				)
+				for(one, l,
+					 for(two, l,
+						  dir:make(dircat("{wfdir}/info/" , str:concat(one , "/" , two)), host=rhost)
+						  )
+				)*/
+				wfdir, sharedDir
+				//we send the cleanup data to vdl:main()
+				to(cleanup, list(wfdir, rhost))
+				log(LOG:INFO, "END host={rhost} - Done initializing shared directory")
+			)
+		)
+		
+		element(inFileDirs, [stageins]
+			for(file, stageins
+		 		reldirname(file)
+			)
+		)
+		
+		element(outFileDirs, [stageouts] 
+			for(pv, stageouts
+				[path, var] := each(pv)
+			
+				file := vdl:filename(vdl:getfield(var, path = path))
+				
+				dirname(file)
+			)
+		)
+		
+		element(inFiles, [stageins]
+			pathnames(stageins)
+		)
+		
+		element(outFiles, [stageouts]
+			for(pv, stageouts
+				[path, var] := each(pv)
+			
+				file := vdl:filename(vdl:getfield(var, path = path))
+				
+				file
+			)
+		)
+		
+		element(fileDirs, [stageins, stageouts]
+			list(
+				unique(
+					inFileDirs(stageins)
+					outFileDirs(stageouts)
+				)
+			)
+		)
+		
+		element(createDirSet, [jobid, destdir, host, dirs]
+			/*
+			 * Ideally this would be done by creating a tree of the directories
+			 * to be created and (eventually) exploiting the concurrency in that.
+			 */
+			log(LOG:INFO, "START jobid={jobid} host={host} - Initializing directory structure")
+			for(u, dirs
+				cacheOn(list(u, destdir, host)
+					createdirs(u, destdir, host)
+				)
+			)
+			log(LOG:INFO, "END jobid={jobid} - Done initializing directory structure")
+		)
+		
+		element(cleanup, [dir, host]
+			log(LOG:INFO, "START dir={dir} host={host}")
+			if(vdl:configProperty("sitedir.keep") == "false"
+				task:execute("/bin/rm", arguments="-rf {dir}", host=host, batch=true)
+			)
+			log(LOG:INFO, "END dir={dir} host={host}")
+		)
+		
+		element(cleanups, [cleanup]
+			log(LOG:INFO, "START cleanups={cleanup}")
+			parallelFor(i, cleanup
+				[dir, host] := each(i)
+				try(
+					cleanup(dir, host)
+					catch(".*",
+						log(LOG:DEBUG, "EXCEPTION - Exception caught while cleaning up", exception)
+						to(warnings, exception("Cleanup on {host} failed", exception))
+					)
+				)
+			)
+		)
+		
+		element(cleanupFiles, [files, host]
+			uParallelFor(r, files
+				log(LOG:INFO, "Purging ", r, " on ", host)
+				file:remove(r, host=host)
+				vdl:cacheFileRemoved(r, host)
+			)
+		)
+				
+		element(doStagein, [jobid, files, dir, host]
+			log(LOG:INFO, "START jobid={jobid} - Staging in files")
+			uParallelFor(file, files
+				provider := provider(file)
+				srchost := hostname(file)
+				srcdir := dirname(file)
+				destdir := dircat(dir, reldirname(file))
+				filename := basename(file)
+				size := file:size("{srcdir}/{filename}", host=srchost, provider=provider)
+
+				vdl:cacheAddAndLockFile(filename, destdir, host, size
+					cleanupFiles(cacheFilesToRemove, host)
+					log(LOG:DEBUG, "FILE_STAGE_IN_START file={file} srchost={srchost} srcdir={srcdir} srcname={filename} ",
+						"desthost={host} destdir={destdir} provider={provider}")
+					restartOnError(".*", 2
+					    task:transfer(srcprovider=provider, srchost=srchost, srcfile=filename, 
+					    	srcdir=srcdir, desthost=host, destdir=destdir)
+					)
+					log(LOG:DEBUG, "FILE_STAGE_IN_END file={file} srchost={srchost} srcdir={srcdir} srcname={filename} ",
+						"desthost={host} destdir={destdir} provider={provider}")
+				)
+			)
+			log(LOG:INFO, "END jobid={jobid} - Staging in finished")
+		)
+		
+		element(doStageout, [jobid, stageouts, dir, host]
+			log(LOG:INFO, "START jobid={jobid} - Staging out files")
+			done := list(
+				uparallelFor(pv, stageouts
+					[path, var] := each(pv)
+					file := vdl:absfilename(vdl:getfield(var, path = path))
+					provider := vdl:provider(file)
+					dhost := vdl:hostname(file)
+					rdir := dircat(dir, reldirname(file))
+					bname := basename(file)
+					ldir := dirname(file)
+					fullLocal := dircat(ldir, bname)
+					fullRemote := dircat(rdir, bname)
+										
+					log(LOG:DEBUG, "FILE_STAGE_OUT_START srcname={bname} srcdir={rdir} srchost={host} ",
+							"destdir={ldir} desthost={dhost} provider={provider}")
+					//make sure we do have the directory on the client side
+					/*dir:make(ldir)
+					restartOnError(".*", 2
+					    task:transfer(srchost=host, srcfile=bname,
+					        srcdir=rdir, destdir=ldir, desthost=dhost, destprovider=provider)
+					)*/
+					log(LOG:DEBUG, "FILE_STAGE_OUT_END srcname={bname} srcdir={rdir} srchost={host} ",
+						"destdir={ldir} desthost={dhost} provider={provider}")
+	
+					list(bname, rdir, host, file:size(fullLocal))
+				)
+			)
+			uParallelFor(f, done
+				[bname, rdir, host, size] := each(f)
+				vdl:cacheAddFile(bname, rdir, host, size
+					cleanupFiles(cacheFilesToRemove, host)
+				)
+			)
+			log(LOG:INFO, "END jobid={jobid} - Staging out finished")
+		)
+
+		element(doRestartlog, [restartouts]
+			uParallelFor(f, restartouts,
+				[path, var] := each(f)
+				vdl:logvar(var, path)
+			)
+		)
+		
+		element(graphStuff, [tr, stagein, stageout, err, optional(args)]
+			if(
+				vdl:configProperty("pgraph") != "false" then(
+					errprops := if(err ",color=lightsalmon" "")
+					tp := vdl:threadPrefix()
+					to(graph, 
+						concat(str:quote(tp), " [label=", str:quote("{tr}{errprops}"), "]")
+					)
+					for(si, stagein
+						si := basename(si)
+						to(graph
+							concat(str:quote(si), " [shape=parallelogram{errprops}]")
+							concat(str:quote(si), " -> ", str:quote(tp))
+						)
+					)
+					for(pv, stageout
+						[path, var] := each(pv)
+						file := vdl:fileName(vdl:getfield(var, path=path))
+						file := basename(file)
+						label := vdl:niceName(var, path = path)
+						to(graph
+							concat(str:quote(file), " [shape=parallelogram,label=", 
+								str:quote("{label}{errprops}"), "]")
+							concat(str:quote(tp), " -> ", str:quote(file))
+						)
+					)
+				)
+			)
+		)
+		
+		element(fileSizes, [files]
+			math:sum(
+				for(f, files, file:size(file))
+			)
+		)
+		
+		element(transferStandardFiles, [rhost, tmpdir, jobid, stdout, stderr]
+			concat(
+				for(f, list(list("stderr.txt", stderr), list("stdout.txt", stdout))	
+					[name, file] := each(f)
+					destfile := "{jobid}-{file}"
+					nl()
+					"{name}: "
+					try(
+						sequential(
+							task:transfer(srchost=rhost, srcdir=tmpdir, srcfile=file,
+								destfile=destfile)
+
+							file:read(destfile)
+						)
+						nl()
+					)
+					maybe(file:remove(destfile))
+				)
+			)
+		)
+		
+		element(transferKickstartRec, [rhost, wfdir, jobid, jobdir]
+			if(sys:not(file:exists("{VDL:SCRIPTNAME}-{VDL:RUNID}.d"))
+				task:dir:make("{VDL:SCRIPTNAME}-{VDL:RUNID}.d")
+			)
+			recfile := "{jobid}-kickstart.xml"
+			srcdir := dircat(concat(wfdir, "/kickstart/"), jobdir)
+			try(
+				task:transfer(srchost=rhost, srcdir=srcdir, srcfile=recfile, destdir="{VDL:SCRIPTNAME}-{VDL:RUNID}.d/")
+				(
+					maybe(file:remove(recfile))
+					log(LOG:WARN, "Failed to transfer kickstart records from {srcdir}/{rhost}", exception)
+				)
+			)
+			recfile
+		)
+
+		element(transferWrapperLog, [rhost, wfdir, jobid, jobdir]
+			if(sys:not(file:exists("{VDL:SCRIPTNAME}-{VDL:RUNID}.d"))
+				task:dir:make("{VDL:SCRIPTNAME}-{VDL:RUNID}.d")
+			)
+			recfile := "{jobid}-info"
+			srcdir := dircat(concat(wfdir, "/info/"), jobdir)
+			try(
+				task:transfer(srchost=rhost, srcdir=srcdir, srcfile=recfile, destdir="{VDL:SCRIPTNAME}-{VDL:RUNID}.d/")
+				(
+					maybe(file:remove(recfile))
+					log(LOG:WARN, "Failed to transfer wrapper log from {srcdir}/{rhost}")
+					log(LOG:DEBUG, "Exception for wrapper log failure from {srcdir}/{rhost}: ", exception)
+				)
+			)
+			recfile
+		)
+
+		element(execute2, [tr, optional(arguments, stdin, stdout, stderr), stagein, stageout,  restartout,
+			replicationGroup, replicationChannel]
+			stagein := list(unique(each(stagein)))
+			stageout := list(unique(each(stageout)))
+			allocateHost(rhost, constraints=vdl:jobConstraints(tr)
+				
+				[wfdir, sharedDir] := try(
+					initSharedDir(rhost)
+					throw(exception("Could not initialize shared directory on {rhost}", exception))
+				)
+				
+				uid := uid()
+				jobdir := concat(substring(uid, from=0,to=1), "/", substring(uid, from=1, to=2))
+				jobid := concat(tr, "-", uid)
+				
+				log(LOG:DEBUG, "THREAD_ASSOCIATION jobid={jobid} thread={#thread} host={rhost} replicationGroup={replicationGroup}")
+				vdl:setprogress("Stage in")
+				tmpdir := dircat(concat(wfdir, "/jobs/", jobdir), jobid)
+				
+				stdout := try(stdout, "stdout.txt")
+				stderr := try(stderr, "stderr.txt")
+				
+				kickstart := vdl:kickstart(rhost)
+				
+				try(
+					sequential(
+						fileDirs := fileDirs(stagein, stageout)
+						
+						createDirSet(jobid, sharedDir, rhost, fileDirs)
+						doStagein(jobid, stagein, sharedDir, rhost)
+
+						log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", maybe(" arguments=", arguments), " tmpdir={tmpdir} host={rhost}")
+				
+						vdl:setprogress("Submitting")
+						vdl:execute("/bin/bash",
+							list("shared/wrapper.sh", jobid,
+								"-jobdir", jobdir,
+								"-e", vdl:executable(tr, rhost), 
+								"-out", stdout, 
+								"-err", stderr, 
+								"-i", maybe(stdin),
+								"-d", flatten(each(fileDirs)),
+								"-if", flatten(infiles(stagein)), 
+								"-of", flatten(outfiles(stageout)),
+								"-k", kickstart,
+								"-a", maybe(each(arguments)))
+							directory=wfdir
+							redirect=false
+							host=rhost
+							vdl:tcprofile(tr, rhost) //this gets various app params from the tc, such as environment, walltime, etc
+							replicationGroup=replicationGroup
+							replicationChannel=replicationChannel
+							jobid=jobid
+						)
+							
+		
+						log(LOG:DEBUG, "STAGING_OUT jobid={jobid}")
+
+									
+						/* need to stage the files to upper scratch area in case they are not transfered to another site
+						   before all the files get cleaned out */
+					
+						
+						vdl:setprogress("Stage out")
+						doStageout(jobid, stageout, sharedDir, rhost)
+						doRestartlog(restartout)
+						if(
+							kickstart != "" & vdl:configProperty("kickstart.always.transfer") == "true"
+							discard(transferKickstartRec(rhost, wfdir, jobid, jobdir))
+						)
+						if(
+							vdl:configProperty("wrapperlog.always.transfer") == "true"
+							discard(transferWrapperLog(rhost, wfdir, jobid, jobdir))
+						)
+						vdl:cacheUnlockFiles(stagein, sharedDir, rhost, cleanupFiles(cacheFilesToRemove, rhost))
+						log(LOG:DEBUG, "JOB_END jobid={jobid}")
+					)
+					catch("^Abort$"
+						log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
+						vdl:cacheUnlockFiles(stagein, sharedDir, rhost, force=false
+							cleanupFiles(cacheFilesToRemove, rhost)
+						)
+						throw(exception)
+					)
+					catch("^(?!Abort$).*"
+						vdl:setprogress("Failed but can retry")
+						log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
+						vdl:cacheUnlockFiles(stagein, sharedDir, rhost, force=false
+							cleanupFiles(cacheFilesToRemove, rhost)
+						)
+						
+						outs := transferStandardFiles(rhost, tmpdir, jobid, stdout, stderr)
+						
+						discard(transferWrapperLog(rhost, wfdir, jobid, jobdir))
+						kickstartRec := if(
+							kickstart == "" ""
+							else(
+								try(
+									(
+										recfile := transferKickstartRec(rhost, wfdir, jobid, jobdir)
+										"KickstartRecord: {recfile}"
+									)
+									""
+								)
+							)
+						)
+						
+						throw(
+							exception(
+								concat(
+									"Exception in {tr}:", nl(),
+									maybe("Arguments: {arguments}", nl()),
+									"Host: {rhost}", nl(),
+									"Directory: {tmpdir}",
+									"{outs}", nl(),
+									"----", nl(),
+									kickstartRec
+								)
+								exception
+							)
+						)
+					)
+				)
+			)
+		)
+		
+		element(generateProvenanceGraph, [gdata]
+			pgraph := vdl:configProperty("pgraph")
+			gname := if(pgraph == "true" "{VDL:SCRIPTNAME}-{VDL:RUNID}.dot" pgraph)
+			file:write(gname
+				"digraph SwiftProvenance {{", nl()
+				"	graph [", vdl:configProperty("pgraph.graph.options"), "];", nl()
+				"	node [", vdl:configProperty("pgraph.node.options"), "];", nl()
+						
+				for(i, gdata
+					"	", i, nl()
+				)
+				"}", nl()
+			)
+			log(LOG:INFO, "Provenance graph saved in ", gname)
+		)
+	)
+)

Added: SwiftApps/LigandAtlas/swift/wrapper.sh
===================================================================
--- SwiftApps/LigandAtlas/swift/wrapper.sh	                        (rev 0)
+++ SwiftApps/LigandAtlas/swift/wrapper.sh	2009-03-30 16:48:40 UTC (rev 2783)
@@ -0,0 +1,288 @@
+# this script must be invoked inside of bash, not plain sh
+PATH=/fuse/bin:/fuse/usr/bin:$PATH
+infosection() {
+	echo >& "$INFO"
+	echo "_____________________________________________________________________________" >& "$INFO"
+	echo >& "$INFO"
+	echo "        $1" >& "$INFO" 
+	echo "_____________________________________________________________________________" >& "$INFO"
+	echo >& "$INFO"
+}
+
+info() {
+	infosection "uname -a"
+	uname -a 2>&1 >& "$INFO"
+	infosection "id"
+	id 2>&1 >& "$INFO"
+	infosection "env"
+	env 2>&1 >& "$INFO"
+	infosection "df"
+	df 2>&1 >& "$INFO"
+	infosection "/proc/cpuinfo"
+	cat /proc/cpuinfo 2>&1 >& "$INFO"
+	infosection "/proc/meminfo"
+	cat /proc/meminfo 2>&1 >& "$INFO"
+	infosection "command line"
+	echo $COMMANDLINE 2>&1 >& "$INFO"
+}
+
+logstate() {
+	echo "Progress " `date +"%Y-%m-%d %H:%M:%S"` " $@" >& "$INFO"
+}
+
+log() {
+	echo "$@" >& "$INFO"
+}
+
+fail() {
+	EC=$1
+	shift
+	log $@
+	info
+	exit $EC
+}
+
+checkError() {
+	if [ "$?" != "0" ]; then
+		fail $@
+	fi
+}
+
+checkEmpty() {
+	if [ "$1" == "" ]; then
+		shift
+		fail 254 $@
+	fi
+}
+
+getarg() {
+	NAME=$1
+	shift
+	VALUE=""
+	SHIFTCOUNT=0
+	if [ "$1" == "$NAME" ]; then
+		shift
+		let "SHIFTCOUNT=$SHIFTCOUNT+1"
+		while [ "${1:0:1}" != "-" ] && [ "$#" != "0" ]; do
+			VALUE="$VALUE $1"
+			shift
+			let "SHIFTCOUNT=$SHIFTCOUNT+1"
+		done
+	else
+		fail 254 "Missing $NAME argument"
+	fi
+	VALUE="${VALUE:1}"
+}
+
+openinfo() {
+	exec 3<> $1
+	INFO=3
+}
+
+closeinfo() {
+	exec 3>&-
+}
+
+cioinput() {
+    INPUT=$1
+    FILEPATH=`dirname $INPUT`
+    FILENAME=`basename $INPUT`
+    TYPE=${INPUT%%/*}
+    echo "INPUT_TYPE: $TYPE" >> /dev/shm/cio
+    if [ "$TYPE" == "common" ] && [ -e /dev/shm/share/$FILENAME ]; then
+	echo "cioinput(): link for common input $INPUT" >> /dev/shm/cio
+	ln -s "/dev/shm/share/$FILENAME" "$DIR/$L"
+    elif [ "$TYPE" == "_concurrent" ]; then
+	echo "cioinput(): toruscp for intermediate data $INPUT" >> /dev/shm/cio
+	mkdir -p $FILEPATH
+	echo "cioinput(): $DIR/$FILEPATH" >> /dev/shm/cio
+	/home/zzhang/cio/bin/toruscp.sh $FILENAME $DIR/$FILEPATH
+    else
+	echo "cioinput(): copy from GPFS $INPUT" >> /dev/shm/cio
+	cp "$PWD/shared/$L" "$DIR/$L"
+    fi
+}
+ciooutput() {
+    OUTPUT=$1
+    FILEPATH=`dirname $OUTPUT`
+    FILENAME=`basename $OUTPUT`
+    TYPE=${OUTPUT%%/*}
+    echo "OUTPUT_TYPE: $TYPE" >> /dev/shm/cio
+
+    if [ "$TYPE" == "_concurrent" ]; then
+	echo "ciooutput(): write intermediate data $OUTPUT" >> /dev/shm/cio
+	/home/zzhang/cio/bin/chtput.rb $FILENAME $RANK
+	cp $OUTPUT /dev/shm/share/
+    else
+	echo "ciooutput(): write regular data $OUTPUT" >> /dev/shm/cio
+	#dd if="$OUTPUT" of="$WFDIR/shared/$OUTPUT" bs=128k
+	echo "$OUTPUT /chirp/multi/${CHIRP_ADD}@stripe/" >> /dev/shm/chirp_add
+	cp "$OUTPUT" /chirp/multi/${CHIRP_ADD}@stripe/
+    fi
+}
+echo $@ >> /dev/shm/log
+RANK=`echo $CONTROL_INIT | awk -F, '{print $4}'`
+echo "RANK: $RANK" >> /dev/shm/log
+COMMANDLINE=$@
+WFDIR=$PWD
+ID=$1
+checkEmpty "$ID" "Missing job ID"
+
+shift
+
+getarg "-jobdir" "$@"
+JOBDIR=$VALUE
+shift $SHIFTCOUNT
+
+checkEmpty "$JOBDIR" "Missing job directory prefix"
+mkdir -p /dev/shm/swift-info/$JOBDIR
+
+closeinfo
+openinfo "/dev/shm/swift-info/$JOBDIR/${ID}-info"
+#openinfo "/dev/null"
+
+logstate "LOG_START"
+
+getarg "-e" "$@"
+EXEC=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-out" "$@"
+STDOUT=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-err" "$@"
+STDERR=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-i" "$@"
+STDIN=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-d" "$@"
+DIRS=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-if" "$@"
+INF=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-of" "$@"
+OUTF=$VALUE
+shift $SHIFTCOUNT
+
+getarg "-k" "$@"
+KICKSTART=$VALUE
+shift $SHIFTCOUNT
+
+if [ "$1" == "-a" ]; then
+	shift
+else
+	fail 254 "Missing arguments (-a option)"
+fi
+
+if [ "X$SWIFT_JOBDIR_PATH" != "X" ]; then
+  DIR=${SWIFT_JOBDIR_PATH}/$JOBDIR/$ID
+  COPYNOTLINK=1
+else
+  DIR=/dev/shm/swift-work/$JOBDIR/$ID
+  COPYNOTLINK=0
+fi
+
+PATH=$PATH:/bin:/usr/bin
+
+if [ "$PATHPREFIX" != "" ]; then
+export PATH=$PATHPREFIX:$PATH
+fi
+
+IFS="^"
+
+logstate "CREATE_JOBDIR"
+mkdir -p $DIR
+
+logstate "CREATE_INPUTDIR"
+
+for D in $DIRS ; do
+    mkdir -p "$DIR/$D"
+    checkError 254 "Failed to create input directory $D"
+done
+
+#cd $DIR
+logstate "LINK_INPUTS"
+for L in $INF ; do
+	if [ $COPYNOTLINK = 1 ]; then
+	        cp "$PWD/shared/$L" "$DIR/$L"
+		checkError 254 "Failed to copy input file $L"
+	else
+	    cioinput $L
+	        #cp "$PWD/shared/$L" "$DIR/$L"
+		checkError 254 "Failed to link input file $L"
+	fi
+done
+
+logstate "EXECUTE"
+
+cd $DIR
+
+if [ "$KICKSTART" == "" ]; then
+	if [ "$STDIN" == "" ]; then
+	    "$EXEC" "$@" 1>"$STDOUT" 2>"$STDERR"
+	else
+	    "$EXEC" "$@" 1>"$STDOUT" 2>"$STDERR" <"$STDIN"
+	fi
+	checkError $? "Exit code $?"
+else
+	if [ ! -f "$KICKSTART" ]; then
+		fail 254 "The Kickstart executable ($KICKSTART) was not found"		
+	elif [ ! -x "$KICKSTART" ]; then
+		fail 254 "The Kickstart executable ($KICKSTART) does not have the executable bit set"
+	else
+		mkdir -p $WFDIR/kickstart/$JOBDIR
+		if [ "$STDIN" == "" ]; then
+			"$KICKSTART" -H -o "$STDOUT" -e "$STDERR" "$TMPEXEC" "$@" 1>kickstart.xml 2>"$STDERR"
+		else
+			"$KICKSTART" -H -o "$STDOUT" -i "$STDIN" -e "$STDERR" "$TMPEXEC" "$@" 1>kickstart.xml 2>"$STDERR"
+		fi
+		export APPEXIT=$?
+		mv -f kickstart.xml "$WFDIR/kickstart/$JOBDIR/$ID-kickstart.xml" 2>&1 >& "$INFO"
+		checkError 254 "Failed to copy Kickstart record to shared directory"
+		if [ "$APPEXIT" != "0" ]; then
+			fail $APPEXIT "Exit code $APPEXIT"
+		fi
+	fi
+fi
+
+logstate "EXECUTE_DONE"
+
+MISSING=
+for O in $OUTF ; do
+	if [ ! -f "$DIR/$O" ]; then
+		if [ "$MISSING" == "" ]; then
+			MISSING=$O
+		else
+			MISSING="$MISSING, $O"
+		fi
+	fi
+done
+if [ "$MISSING" != "" ]; then
+	fail 254 "The following output files were not created by the application: $MISSING"
+fi
+
+logstate "COPYING_OUTPUTS"
+for O in $OUTF ; do
+	#cp "$DIR/$O" "$WFDIR/shared/$O" 2>&1 >& "$INFO"
+        #cp "$DIR/$O" "$WFDIR/shared/$O"
+        #dd if="$DIR/$O" of="$WFDIR/shared/$JOBDIR/$O" bs=128k
+        #dd if="$DIR/$O" of="$WFDIR/shared/$O" bs=128k
+        ciooutput $O
+	checkError 254 "Failed to copy output file $O to shared directory"
+done
+
+logstate "RM_JOBDIR"
+
+closeinfo
+#rm -f "$WFDIR/info/$JOBDIR/${ID}-info"
+#echo "$WFDIR/info/$JOBDIR/${ID}-info" >> /dev/shm/log
+#mkdir -p "$WFDIR/info/$JOBDIR/"
+#dd if=/dev/shm/swift-info/$JOBDIR/${ID}-info of="$WFDIR/info/$JOBDIR/${ID}-info" bs=128k
+#dd if=/dev/shm/swift-info/$JOBDIR/${ID}-info of="/fuse/tmp/${ID}-info" bs=128k




More information about the Swift-commit mailing list