[Swift-commit] r5814 - trunk/libexec

hategan at ci.uchicago.edu hategan at ci.uchicago.edu
Sat Jun 23 15:44:26 CDT 2012


Author: hategan
Date: 2012-06-23 15:44:26 -0500 (Sat, 23 Jun 2012)
New Revision: 5814

Modified:
   trunk/libexec/_swiftwrap.wrapperstaging
   trunk/libexec/vdl-int-wrapper-staging.k
Log:
uniform file naming convention for wrapper staging (fixes status.mode=files for wrapper staging); properly deal with URLs that are not automatically prefixed using URLPREFIX

Modified: trunk/libexec/_swiftwrap.wrapperstaging
===================================================================
--- trunk/libexec/_swiftwrap.wrapperstaging	2012-06-23 20:41:54 UTC (rev 5813)
+++ trunk/libexec/_swiftwrap.wrapperstaging	2012-06-23 20:44:26 UTC (rev 5814)
@@ -55,23 +55,47 @@
 	[[ $DEBUG == 1 ]] && echo "$@" >& "$INFO"
 }
 
+localPath() {
+	# Print $1 as a relative path: strip "protocol://" and one leading "/".
+	# A local variable is used because assigning to PATH breaks command lookup.
+	local LPATH="${1#*://}"
+	LPATH="${LPATH#/}"
+	log "Transformed $1 to $LPATH"
+	echo "$LPATH"
+}
+
 stagein() {
 	SRC=$1
 	DST=$2
 	
-	log "Staging in $URLPREFIX/$SRC to $DST"
+	case $SRC in
+		*://*)
+			;;
+		*)
+			SRC=$URLPREFIX/$SRC
+			;;
+	esac
 	
-	case $URLPREFIX in
+	log "Staging in $SRC to $DST"
+	
+	case $SRC in
 		file://*)
-			SRC2="${URLPREFIX#file://}/$SRC"
-			log "Copying $SRC2 to $DST"
-			cp $SRC2 $DST
+			SRC=${SRC#file://}
+			log "Copying $SRC to $DST"
+			if [ ! -f $SRC ]; then
+				fail 254 "Cannot stage in $SRC. File not found."
+			fi
+			cp $SRC $DST 2>&1 >& "$INFO"
 			;;
+		http://*)
+			log "Fetching $SRC using wget"
+			wget $SRC -O $DST 2>&1 >& "$INFO"
+			;;
 		*://*)
-			fail 254 "Cannot handle protocol ${URLPREFIX%%://*}"
+			fail 254 "Cannot handle protocol ${SRC%%://*}"
 			;;
 		*)
-			fail 254 "Invalid URL prefix: $URLPREFIX"
+			fail 254 "Invalid URL: $SRC"
 			;;
 	esac
 }
@@ -80,21 +104,30 @@
 	SRC=$1
 	DST=$2
 	
-	log "Staging out $SRC to $URLPREFIX/$DST"
+	case $DST in
+		*://*)
+			;;
+		*)
+			DST=$URLPREFIX/$DST
+	esac
 	
-	case $URLPREFIX in
+	log "Staging out $SRC to $DST"
+	
+	local DIR
+	
+	case $DST in
 		file://*)
-			DST2="${URLPREFIX#file://}/$DST"
-			DIR=`dirname $DST2`
+			DST=${DST#file://}
+			DIR=`dirname $DST`
 			mkdir -p $DIR
-			log "Copying $SRC to $DST2"
-			cp $SRC $DST2
+			log "Copying $SRC to $DST"
+			cp $SRC $DST 2>&1 >& "$INFO"
 			;;
 		*://*)
-			fail 254 "Cannot handle protocol ${URLPREFIX%%://*}"
+			fail 254 "Cannot handle protocol ${DST%%://*}"
 			;;
 		*)
-			fail 254 "Invalid URL prefix: $URLPREFIX"
+			fail 254 "Invalid URL: $DST"
 			;;
 	esac
 }
@@ -105,8 +138,8 @@
 	shift
 	
 	if [ "X$DIR" != "X" ]; then
-		echo $@ >"$DIR/_error"
-		stageout "$DIR/_error" "$JOBDIR/error"
+		echo $@ >"$DIR/_swift.error"
+		stageout "$DIR/_swift.error" "$JOBDIR/_swift.error"
 	else
 		echo $@
 	fi
@@ -266,12 +299,12 @@
 closeinfo
 
 if [ -z $MPI_RANK ]; then
-	INFOFILE="$INFODIR/_info"
+	INFOFILE="$INFODIR/_swift.info"
 else
 	# Rename info file for each rank
-	INFOFILE="$INFODIR/_info-${PMI_RANK}"
+	INFOFILE="$INFODIR/_swift.info-${PMI_RANK}"
 	# Build list of per-rank info files
-	echo $INFOFILE >> $INFODIR/_info
+	echo $INFOFILE >> $INFODIR/_swift.info
 fi
 rm -f $INFOFILE
 openinfo "$INFOFILE"
@@ -421,7 +454,7 @@
 			continue
 		fi
 		
-		stagein $L "$DIR/$L"
+		stagein $L "$DIR/`localPath $L`"
 	done
 	
 	if [[ $CDM_FILE != "" ]]; then
@@ -484,11 +517,12 @@
 
 	MISSING=
 	for O in $OUTF ; do
-		if [ ! -f "$DIR/$O" ]; then
+		LO=`localPath $O`
+		if [ ! -f "$DIR/$LO" ]; then
 			if [ "$MISSING" == "" ]; then
-				MISSING=$O
+				MISSING=$LO
 			else
-				MISSING="$MISSING, $O"
+				MISSING="$MISSING, $LO"
 			fi
 		fi
 	done
@@ -500,7 +534,7 @@
 	logstate "MOVING_OUTPUTS $OUTF"
 	for O in $OUTF ; do
 		if ! contains SKIPPED_OUTPUT $O ; then
-			stageout "$DIR/$O" "$O" 
+			stageout "$DIR/`localPath $O`" "$O" 
 		fi
 	done
 	
@@ -508,9 +542,9 @@
 	cdm_gather $GATHER_OUTPUT
 	
 	if [ "$STATUSMODE" = "files" ]; then
-		logstate "TOUCH_SUCCESS"
-		touch ${ID}-success
-		stageout "$DIR/${ID}-success" "$JOBDIR/success"  
+		logstate "TOUCH_SUCCESS $DIR `pwd`"
+		touch _swift.success
+		stageout "$DIR/_swift.success" "$JOBDIR/_swift.success"  
 	fi
 	
 	log "Moving back to workflow directory $WFDIR"
@@ -520,7 +554,7 @@
 	fi
 	
 	if [ "$WRAPPERLOG_ALWAYS_TRANSFER" == "true" ]; then
-		stageout "$INFOFILE" "$JOBDIR/info"
+		stageout "$INFOFILE" "$JOBDIR/_swift.info"
 	fi
 	if [ "$SITEDIR_KEEP" != "true" ]; then
 		logstate "RM_JOBDIR"
@@ -537,7 +571,7 @@
 closeinfo
 
 if [ "$WRAPPER_LOG_ALWAYS_TRANSFER" == "true" ]; then
-	stageout "$DIR/${ID}-info" "$JOBID/${ID}-info"
+	stageout "$DIR/_swift.info" "$JOBID/_swift.info"
 fi
 
 # ensure we exit with a 0 after a successful execution

Modified: trunk/libexec/vdl-int-wrapper-staging.k
===================================================================
--- trunk/libexec/vdl-int-wrapper-staging.k	2012-06-23 20:41:54 UTC (rev 5813)
+++ trunk/libexec/vdl-int-wrapper-staging.k	2012-06-23 20:44:26 UTC (rev 5814)
@@ -22,25 +22,7 @@
 
 namespace("vdl"
 	export(
-		element(rmdir, [dir, host]
-			parallelFor(entry, file:list(dir, host=host)
-				epath := "{dir}/{entry}"
-				if(
-					file:isDirectory(epath, host=host) rmdir(epath, host)
-					file:remove(epath, host=host)
-				)
-			)
-			dir:remove(dir, host=host)
-		)
-
-		element(createdirs, [path, dir, host]
-			dc := dircat(dir, path)
-			log(LOG:INFO, "START path={path} dir={dir} - Creating directory structure")
-
-			dir:make(dc, host=host)
-		)
-
-		element(checkJobStatus, [wfdir, jobid, tr]
+		element(checkJobStatus, [jobdir, jobid, tr]
 			log(LOG:DEBUG, "START jobid={jobid}")
 			try(
 				sequential(
@@ -51,39 +33,24 @@
 					 * in one operation. It relies on file:remove() throwing
 					 * an exception if the file is not there.
 					 */
-					file:remove("{wfdir}/status/{jobdir}/{jobid}-success", host=rhost)
+					file:remove("{jobdir}/_swift.success")
 					log(LOG:INFO, "SUCCESS jobid={jobid} - Success file found")
 				)
-				sequential(
-					try (
-						msg = checkErrorFile(rhost, wfdir, jobid, jobdir)
-						sequential (
-							log(LOG:INFO, "NO_STATUS_FILE jobid={jobid} - Both status files are missing")
-							throw("No status file was found. Check the shared filesystem on {rhost}")
-						)
-					)
-				)
-				throw(checkErrorFile(rhost, wfdir, jobid, jobdir))
+				throw(checkErrorFile(jobdir, jobid))
 			)
 		)
 		
-		element(checkErrorFile, [wfdir, jobid]
+		element(checkErrorFile, [jobdir, jobid]
 			if (
-				file:exists("{wfdir}/status/{jobdir}/{jobid}-error", host=rhost) then(
+				file:exists("{jobdir}/_swift.error") then(
 					log(LOG:INFO, "FAILURE jobid={jobid} - Failure file found")
-					task:transfer(srchost=rhost, srcdir="{wfdir}/status/{jobdir}", srcfile="{jobid}-error")
-					error := parallel(
-						file:remove("{wfdir}/status/{jobdir}/{jobid}-error", host=rhost)
-						sequential(
-							str:strip(file:read("{jobid}-error"))
-							file:remove("{jobid}-error")
-						)
-					)
+					error := str:strip(file:read("{jobdir}/_swift.error"))
+					file:remove("{jobdir}/_swift.error")
 					error
 				)
 				else (
-					log(LOG:INFO, "NO_STATUS_FILE jobid={jobid} - Error file missing")
-					throw("No status file was found. Check the shared filesystem on {rhost}")
+					log(LOG:INFO, "NO_STATUS_FILE jobid={jobid} - Both status files are missing")
+					throw("No status file was found")
 				)
 			)
 		)
@@ -116,7 +83,7 @@
 		)
 
 		element(inFiles, [stageins]
-			pathnames(stageins)
+			stageins
 		)
 
 		element(fileDirs, [stageins, stageouts]
@@ -128,42 +95,8 @@
 			)
 		)
 
-		element(createDirSet, [jobid, destdir, host, dirs]
-			/*
-			 * Ideally this would be done by creating a tree of the directories
-			 * to be created and (eventually) exploiting the concurrency in that.
-			 */
-			log(LOG:INFO, "START jobid={jobid} host={host} - Initializing directory structure")
-			for(u, dirs
-				cacheOn(list(u, destdir, host)
-					createdirs(u, destdir, host)
-				)
-			)
-			log(LOG:INFO, "END jobid={jobid} - Done initializing directory structure")
-		)
-
 		element(cleanup, [dir, host]
 			log(LOG:INFO, "START dir={dir} host={host}")
-			cdmfile := cdm:file()
-			log(LOG:DEBUG, "cdmfile {cdmfile}")
-			if(cdmfile != "" &
-                           cdm:get("GATHER_DIR") != "UNSET" then(
-				log(LOG:INFO, "submitting cdm_cleanup.sh to {dir}")
-				task:transfer(srcfile="cdm_cleanup.sh",
-					      srcdir="{swift.home}/libexec",
-					      desthost=host, destdir=dir)
-				task:transfer(srcfile="cdm_lib.sh",
-					      srcdir="{swift.home}/libexec",
-					      desthost=host, destdir=dir)
-			        log(LOG:INFO, "execute: cdm_cleanup.sh")
-				task:execute(
-					     executable="/bin/bash",
-					     arguments=list("{dir}/cdm_cleanup.sh",
-					                     cdm:get("GATHER_DIR"), cdm:get("GATHER_TARGET")
-							     sys:uid() )
-					     host=host, batch=true, tcprofile(host))
-				)
-			)
 			if(vdl:configProperty("sitedir.keep") == "false"
 				task:execute(
 					vdl:siteprofile(host, "swift:cleanupCommand"),
@@ -191,14 +124,6 @@
 			log(LOG:INFO, "END cleanups={cleanup}")
 		)
 
-		element(cleanupFiles, [files, host]
-			uParallelFor(r, files
-				log(LOG:INFO, "Purging ", r, " on ", host)
-				file:remove(r, host=host)
-				vdl:cacheFileRemoved(r, host)
-			)
-		)
-
 		element(stageWrapperParams, [jobid, wrapfile, dir, host]
 			log(LOG:INFO, "START jobid={jobid} - staging in wrapper params")
 			provider := provider(wrapfile)
@@ -243,16 +168,10 @@
 				)
 			)
 		)
-
-		element(fileSizes, [files]
-			math:sum(
-				for(f, files, file:size(file))
-			)
-		)
 		
 		element(readStandardFiles, [jobdir, stdout, stderr]
 			concat(
-				for(f, list(list("stderr.txt", stderr), list("stdout.txt", stdout))
+				for(f, list(list("_swift.stderr", stderr), list("_swift.stdout", stdout))
 					[name, file] := each(f)
 					destfile := "{jobdir}/{file}"
 					nl()
@@ -291,8 +210,8 @@
 
 				wrapfile := "{jobdir}/_paramfile"
 
-				stdout := try(getFieldValue(stdout), "stdout.txt")
-				stderr := try(getFieldValue(stderr), "stderr.txt")
+				stdout := try(getFieldValue(stdout), "_swift.stdout")
+				stderr := try(getFieldValue(stderr), "_swift.stderr")
 				fileDirs := fileDirs(stagein, stageout)
 				os := vdl:siteprofile(rhost, "SYSINFO:OS")
 				
@@ -306,8 +225,8 @@
 							"-err ", stderr, nl(),
 							"-i ", maybe(getFieldValue(stdin)), nl(),
 							"-d ", flatten(each(fileDirs)), nl(),
-							"-if ", flatten(infiles(stagein)), nl(),
-							"-of ", flatten(outfiles(stageout)), nl(),
+							"-if ", flatten(inFiles(stagein)), nl(),
+							"-of ", flatten(outFiles(stageout)), nl(),
 							"-wt", WRAPPERLOG_ALWAYS_TRANSFER,
 							"-sk", SITEDIR_KEEP,
 							"-cdmfile ", cdm:file(), nl(),
@@ -366,8 +285,8 @@
 									"-err", stderr,
 									"-i", maybe(getFieldValue(stdin)),
 									"-d", flatten(each(fileDirs)),
-									"-if", flatten(infiles(stagein)),
-									"-of", flatten(outfiles(stageout)),
+									"-if", flatten(inFiles(stagein)),
+									"-of", flatten(outFiles(stageout)),
 									"-wt", WRAPPERLOG_ALWAYS_TRANSFER,
 									"-sk", SITEDIR_KEEP,
 									"-cdmfile", cdm:file(),
@@ -386,7 +305,7 @@
 
 						vdl:setprogress("Checking status")
 						if(statusMode == "files"
-							checkJobStatus(wfdir, jobid, tr)
+							checkJobStatus(jobdir, jobid, tr)
 						)
 
 						if(wrapperMode == "files"
@@ -412,7 +331,7 @@
 					catch("^(?!Abort$).*"
 						vdl:setprogress("Failed but can retry")
 						prev := exception
-						exception := try(exception(checkErrorFile(rhost, wfdir, jobid)), prev)
+						exception := try(exception(checkErrorFile(jobdir, jobid)), prev)
 						
 						log(LOG:DEBUG, "APPLICATION_EXCEPTION jobid={jobid} - Application exception: ", exception)
 




More information about the Swift-commit mailing list