[Swift-commit] r4437 - in SwiftApps/SwiftR: . Swift/R Swift/exec
tga at ci.uchicago.edu
tga at ci.uchicago.edu
Fri Apr 29 17:10:54 CDT 2011
Author: tga
Date: 2011-04-29 17:10:54 -0500 (Fri, 29 Apr 2011)
New Revision: 4437
Modified:
SwiftApps/SwiftR/IMMEDIATE-TODO
SwiftApps/SwiftR/Swift/R/Apply.R
SwiftApps/SwiftR/Swift/R/Export.R
SwiftApps/SwiftR/Swift/exec/EvalRBatchPersistent.sh
SwiftApps/SwiftR/Swift/exec/SwiftRServer.R
SwiftApps/SwiftR/Swift/exec/rserver.swift
Log:
Revamping export mechanism so that Swift will stage files: now SwiftR writes out a manifest of exported files and these are explicit arguments to the remote R invocation.
TODO: investigate whether swift will stage files to same worker multiple times unnecessarily.
Modified: SwiftApps/SwiftR/IMMEDIATE-TODO
===================================================================
--- SwiftApps/SwiftR/IMMEDIATE-TODO 2011-04-29 18:44:55 UTC (rev 4436)
+++ SwiftApps/SwiftR/IMMEDIATE-TODO 2011-04-29 22:10:54 UTC (rev 4437)
@@ -92,7 +92,9 @@
* Don't want to have to hardcode change:
MID:
-- More robust mechanism for swiftExport - use swift to stage?
+- More robust mechanism for swiftExport
+ - Write all needed files into request directory (symlink?),
+ then map with swift and use swift to stage?
- mechanism to copy other files across directly
LOW:
Modified: SwiftApps/SwiftR/Swift/R/Apply.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Apply.R 2011-04-29 18:44:55 UTC (rev 4436)
+++ SwiftApps/SwiftR/Swift/R/Apply.R 2011-04-29 22:10:54 UTC (rev 4437)
@@ -209,9 +209,17 @@
options(.swift.requestid=requestid)
reqdir = file.path(requestdirbase, sprintf("R%.7d",requestid))
dir.create(reqdir,recursive=TRUE,showWarnings=FALSE,mode=kDIR_MODE)
+
return (reqdir)
}
+writeExportList <- function (reqdir, exportFileList) {
+ # Write out a list of exported Rdata files, in text format
+ # with one file per line. This is in a file called exports.txt
+ expFile<-file(file.path(reqdir, "exports.txt"))
+ writeLines(as.vector(exportFileList, mode="character"), expFile)
+ close(expFile)
+}
writeRequestBatches <- function (func, arglists, initialexpr,
reqdir, callsperbatch, exportlist=NULL) {
@@ -221,6 +229,7 @@
exportlist <- newSession()
options(.swift.session = exportlist)
}
+ writeExportList(reqdir, exportlist$exports)
# Write the function call info out to cbatch.?.RData files in reqdir
# in batches of size specified by callsperbatch
# returns the number of batches written
Modified: SwiftApps/SwiftR/Swift/R/Export.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Export.R 2011-04-29 18:44:55 UTC (rev 4436)
+++ SwiftApps/SwiftR/Swift/R/Export.R 2011-04-29 22:10:54 UTC (rev 4437)
@@ -113,10 +113,10 @@
getExportDir <- function () {
d <- getOption("swift.exportdir")
- if (! is.null(d)) d <- file.path(d, ".swiftr", "exports")
-
- # Home directory is a candidate
- d <- Sys.getenv("HOME")[[1]]
+ if (is.null(d)) {
+ # temp directory
+ return (file.path(tempdir(), "swexports"))
+ }
if (d == "") {
return (NULL)
}
Modified: SwiftApps/SwiftR/Swift/exec/EvalRBatchPersistent.sh
===================================================================
--- SwiftApps/SwiftR/Swift/exec/EvalRBatchPersistent.sh 2011-04-29 18:44:55 UTC (rev 4436)
+++ SwiftApps/SwiftR/Swift/exec/EvalRBatchPersistent.sh 2011-04-29 22:10:54 UTC (rev 4437)
@@ -10,8 +10,8 @@
umask "077"
#if [ $# != 4 ]; then
-if [ $# != 3 ]; then
- echo $0: expecting 4 arguments, got $#: $* 1>&2
+if [ $# -lt 4 ]; then
+ echo $0: expecting 4+ arguments, got $#: $* 1>&2
exit 1
fi
@@ -22,7 +22,18 @@
RServerScript=$1
callFile=$2
resultFile=$3
+shift; shift; shift;
+if [ $1 != "--imports" ];
+then
+ echo $0: expected --imports to be provided
+fi
+shift;
+# now $@ contains imports
+imports=$@
+
+
+
# Find our bin dir (to use for running utility scripts)
SWIFTRBIN=$(cd $(dirname $0); pwd)
@@ -141,7 +152,7 @@
else
# wait to make sure fifo exists
# fromR fifo is created last, so wait for that one
- while 1; do
+ while true; do
if [ -p $SLOTDIR/fromR.fifo ]; then
break
fi
@@ -162,7 +173,13 @@
stop_timeout
echo DB: Obtained $SLOTDIR/mutex
-echo run $(pwd)/$callFile $(pwd)/$resultFile > $SLOTDIR/toR.fifo &
+absimports=
+for im in $imports; do
+ absimports="$absimports $(pwd)/$im"
+done
+echo imports: $imports
+echo absimports: $absimports
+echo run $(pwd)/$callFile $(pwd)/$resultFile $absimports > $SLOTDIR/toR.fifo &
echopid=$!
start_timeout $echopid
Modified: SwiftApps/SwiftR/Swift/exec/SwiftRServer.R
===================================================================
--- SwiftApps/SwiftR/Swift/exec/SwiftRServer.R 2011-04-29 18:44:55 UTC (rev 4436)
+++ SwiftApps/SwiftR/Swift/exec/SwiftRServer.R 2011-04-29 22:10:54 UTC (rev 4437)
@@ -29,11 +29,35 @@
repeat {
cat("SwiftRServer at top of loop is in dir:",getwd(),"\n");
setwd(dir) # FIXME: not yet sure what is changing the CWD
+
+ # Read a line from fifo
infifo <- fifo(inFifoName,open="rb",blocking=TRUE)
- cmd <- scan(infifo,what=list("","",""),nlines=1,flush=FALSE,quiet=FALSE,fill=TRUE)
+ inline <- readLines(infifo, 1)
+ close(infifo)
+
+ cmd <- unlist(strsplit(inline, " ", fixed=T))
+ cat("cmd:\n")
+ print(cmd)
+ if (length(cmd) < 3) {
+ stop(paste("Error: cmd provided was too short with length ",
+ length(cmd), ":", cmd))
+ }
op = cmd[[1]]
callBatchFileName = cmd[[2]]
resultBatchFileName = cmd[[3]]
+ # NOTE: there has to be a correspondence between
+ # the importFileNames received on the pipe and the
+ # list of import file names in the call object.
+ # They correspond 1:1. The name in the call object
+ # is the unique filepath on the client's system, which
+ # we use to determine if an import file has been
+ # already imported. The name received on the pipe
+ # is the name in the local file system.
+ if (length(cmd) > 3) {
+ importFileNames = cmd[4:length(cmd)]
+ } else {
+ importFileNames = NULL
+ }
cat("DB: cmd: op=",op," call batch=",callBatchFileName," result batch=",resultBatchFileName,"\n");
if(is.null(op)) {
cat("op is NULL\n")
@@ -48,13 +72,15 @@
}
if( identical(op,"run")) {
cat("DB: About to run batch file: ", callBatchFileName,"\n");
- runBatch(callBatchFileName,resultBatchFileName)
+ runBatch(callBatchFileName,resultBatchFileName, importFileNames)
outfifo <- fifo(outFifoName,open="wb",blocking=TRUE)
cat(file=outfifo, "Batch completed: result batch file: ", resultBatchFileName,"\n");
}
+ else {
+ cat("DB: got op [", op, "]\n")
+ }
}
- close(infifo)
- close(outfifo)
+ try(close(outfifo))
}
}
@@ -64,7 +90,7 @@
outfifo <- fifo(outFifoName,open="wb",blocking=TRUE)
cat(paste("ERROR: R server failed with error", gsub("\n", " ", error)), "\n")
cat(file=outfifo, paste("ERROR: R server failed with error", gsub("\n", " ", error)), "\n")
- close(outfifo)
+ try(close(outfifo))
}
doInit <- function(initializer) {
@@ -90,7 +116,7 @@
save(result,file=resultBatchFileName)
}
- runBatch <- function( callBatchFileName, resultBatchFileName )
+ runBatch <- function( callBatchFileName, resultBatchFileName, importFileNames )
{
# Load contents into local environment
success <- try(load(callBatchFileName, envir=environment()));
@@ -98,8 +124,9 @@
stop(paste(callBatchFileName, "could not be opened"))
return()
}
- success <- try(loadImports(rcall$imports))
+ success <- try(loadImports(rcall$imports, importFileNames))
if (inherits(success, "try-error")) {
+ cat("import failed!!: ", success)
failBatch(rcall, success, resultBatchFileName)
return()
}
@@ -139,7 +166,7 @@
cat("DB: reset env\n")
}
- loadImports <- function (importlist) {
+ loadImports <- function (importlist, importFileNames) {
# First check whether the session id has changed
#cat(paste("Import list:", importlist$exports, "\n"), file=stderr())
#cat(paste("New Session:", importlist$session, "\n"), file=stderr())
@@ -158,24 +185,35 @@
if (doSetup) {
newSession(importlist$session)
}
+ # Co-iterate over the unique names and
+ # if length doesn't match, something has gone wrong
+ numImports <- length(importlist$exports)
+ if (numImports != length(importFileNames)) {
+ stop(paste("Number of import file names provided on command line (",
+ length(importFileNames),") does not match number in call object (",
+ numImports, "). ", importFileNames, " vs ", importlist$exports))
+ }
+ if (numImports > 0) {
+ for (i in 1:numImports) {
+ # Load the contents of the specified file
+ # into the global environment
+ uniqueId <- importlist$exports[[i]]
+ localFile <- importFileNames[[i]]
- for (file in importlist$exports) {
- # Load the contents of the specified file
- # into the global environment
-
- #cat("File: ", file, "\n")
- # check to see if already imported
- if (!exists(file, envir=.current.imported)) {
- #TODO: load can fail with warning
- load(file, envir=globalenv())
- # if an error occurs here, assume calling function
- # will catch it
- assign(file, TRUE, envir=.current.imported)
- cat("Loaded file ", file, "\n")
+ #cat("File: ", file, "\n")
+ # check to see if already imported
+ if (!exists(uniqueId, envir=.current.imported)) {
+ #TODO: load can fail with warning
+ load(localFile, envir=globalenv())
+ # if an error occurs here, assume calling function
+ # will catch it
+ assign(uniqueId, TRUE, envir=.current.imported)
+ cat("Loaded file ", localFile, " with unique path ", uniqueId, "\n")
+ }
+ else {
+ #cat("Ignored file ", file, "\n")
+ }
}
- else {
- #cat("Ignored file ", file, "\n")
- }
}
}
Modified: SwiftApps/SwiftR/Swift/exec/rserver.swift
===================================================================
--- SwiftApps/SwiftR/Swift/exec/rserver.swift 2011-04-29 18:44:55 UTC (rev 4436)
+++ SwiftApps/SwiftR/Swift/exec/rserver.swift 2011-04-29 22:10:54 UTC (rev 4437)
@@ -8,9 +8,10 @@
# replace ack with ftracef to result pipe
# condense shellscript and rserverscript to one?
-app (external e, RData result, file stout, file sterr) runR (file shellscript, file RServerScript, RData rcall)
+app (external e, RData result, file stout, file sterr) runR (file shellscript, file RServerScript, RData rcall, RData exports[])
{
- bash @shellscript @RServerScript @rcall @result stdout=@stout stderr=@sterr;
+ bash @shellscript @RServerScript @rcall @result "--imports" @exports
+ stdout=@stout stderr=@sterr;
}
app ack (external e[])
@@ -30,12 +31,16 @@
RData results[] <simple_mapper; location=runDir, prefix="rbatch.", suffix=".Rdata", padding=0>;
file stout[] <simple_mapper; location=runDir, prefix="stdout.", suffix=".txt", padding=0>;
file sterr[] <simple_mapper; location=runDir, prefix="stderr.", suffix=".txt", padding=0>;
-
+
+ # Load list of exported files
+ string export_list[] = readData(@strcat(runDir, "/exports.txt"));
+ RData exports[] <array_mapper; files=export_list>;
+
file runRscript <"EvalRBatchPersistent.sh">;
file rsScript <"SwiftRServer.R">;
foreach c, i in rcalls {
- (e[i], results[i],stout[i], sterr[i]) = runR(runRscript,rsScript,c);
+ (e[i], results[i],stout[i], sterr[i]) = runR(runRscript,rsScript,c, exports);
}
}
More information about the Swift-commit
mailing list