[Swift-commit] r4437 - in SwiftApps/SwiftR: . Swift/R Swift/exec

tga at ci.uchicago.edu tga at ci.uchicago.edu
Fri Apr 29 17:10:54 CDT 2011


Author: tga
Date: 2011-04-29 17:10:54 -0500 (Fri, 29 Apr 2011)
New Revision: 4437

Modified:
   SwiftApps/SwiftR/IMMEDIATE-TODO
   SwiftApps/SwiftR/Swift/R/Apply.R
   SwiftApps/SwiftR/Swift/R/Export.R
   SwiftApps/SwiftR/Swift/exec/EvalRBatchPersistent.sh
   SwiftApps/SwiftR/Swift/exec/SwiftRServer.R
   SwiftApps/SwiftR/Swift/exec/rserver.swift
Log:
Revamping export mechanism so that Swift will stage files: now SwiftR writes out a manifest of exported files and these are explicit arguments to the remote R invocation.
TODO: investigate whether swift will stage files to the same worker multiple times unnecessarily.


Modified: SwiftApps/SwiftR/IMMEDIATE-TODO
===================================================================
--- SwiftApps/SwiftR/IMMEDIATE-TODO	2011-04-29 18:44:55 UTC (rev 4436)
+++ SwiftApps/SwiftR/IMMEDIATE-TODO	2011-04-29 22:10:54 UTC (rev 4437)
@@ -92,7 +92,9 @@
     * Don't want to have to hardcode change:
 
 MID:
-- More robust mechanism for swiftExport - use swift to stage?
+- More robust mechanism for swiftExport
+    - Write all needed files into request directory (symlink?),
+        then map with swift and use swift to stage?
 - mechanism to copy other files across directly
 
 LOW:

Modified: SwiftApps/SwiftR/Swift/R/Apply.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Apply.R	2011-04-29 18:44:55 UTC (rev 4436)
+++ SwiftApps/SwiftR/Swift/R/Apply.R	2011-04-29 22:10:54 UTC (rev 4437)
@@ -209,9 +209,17 @@
   options(.swift.requestid=requestid)
   reqdir = file.path(requestdirbase, sprintf("R%.7d",requestid))
   dir.create(reqdir,recursive=TRUE,showWarnings=FALSE,mode=kDIR_MODE)
+  
   return (reqdir)
 }
 
+writeExportList <- function (reqdir, exportFileList) {
+  # Write out a list of exported Rdata files, in text format 
+  # with one file per line.  This is in a file called exports.txt
+  expFile<-file(file.path(reqdir, "exports.txt"))
+  writeLines(as.vector(exportFileList, mode="character"), expFile)
+  close(expFile)
+}
 
 writeRequestBatches <- function (func, arglists, initialexpr, 
                         reqdir, callsperbatch, exportlist=NULL) {
@@ -221,6 +229,7 @@
     exportlist <- newSession()
     options(.swift.session = exportlist)
   }
+  writeExportList(reqdir, exportlist$exports)
   # Write the function call info out to cbatch.?.RData files in reqdir 
   # in batches of size specified by callsperbatch 
   # returns the number of batches written

Modified: SwiftApps/SwiftR/Swift/R/Export.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Export.R	2011-04-29 18:44:55 UTC (rev 4436)
+++ SwiftApps/SwiftR/Swift/R/Export.R	2011-04-29 22:10:54 UTC (rev 4437)
@@ -113,10 +113,10 @@
 
 getExportDir <- function () {
     d <- getOption("swift.exportdir")
-    if (! is.null(d))   d <- file.path(d, ".swiftr", "exports")
-    
-    # Home directory is a candidate
-    d <- Sys.getenv("HOME")[[1]]
+    if (is.null(d)) {
+        # temp directory
+        return (file.path(tempdir(), "swexports"))
+    }
     if (d == "") {
         return (NULL)
     }

Modified: SwiftApps/SwiftR/Swift/exec/EvalRBatchPersistent.sh
===================================================================
--- SwiftApps/SwiftR/Swift/exec/EvalRBatchPersistent.sh	2011-04-29 18:44:55 UTC (rev 4436)
+++ SwiftApps/SwiftR/Swift/exec/EvalRBatchPersistent.sh	2011-04-29 22:10:54 UTC (rev 4437)
@@ -10,8 +10,8 @@
 umask "077"
 
 #if [ $# != 4 ]; then
-if [ $# != 3 ]; then
-  echo $0: expecting 4 arguments, got $#: $* 1>&2
+if [ $# -lt 4 ]; then
+  echo $0: expecting 4+ arguments, got $#: $* 1>&2
   exit 1
 fi
 
@@ -22,7 +22,18 @@
 RServerScript=$1
 callFile=$2
 resultFile=$3
+shift; shift; shift;
 
+if [ $1 != "--imports" ];
+then
+    echo $0: expected --imports to be provided
+fi
+shift;
+# now $@ contains imports
+imports=$@
+
+
+
 # Find our bin dir (to use for running utility scripts)
 
 SWIFTRBIN=$(cd $(dirname $0); pwd)
@@ -141,7 +152,7 @@
 else
   # wait to make sure fifo exists
   # fromR fifo is created last, so wait for that one
-  while 1; do
+  while true; do 
     if [ -p $SLOTDIR/fromR.fifo ]; then
       break
     fi
@@ -162,7 +173,13 @@
 stop_timeout
 echo DB: Obtained $SLOTDIR/mutex
 
-echo run $(pwd)/$callFile $(pwd)/$resultFile > $SLOTDIR/toR.fifo &
+absimports=
+for im in $imports; do
+    absimports="$absimports $(pwd)/$im"
+done
+echo imports: $imports
+echo absimports: $absimports
+echo run $(pwd)/$callFile $(pwd)/$resultFile $absimports > $SLOTDIR/toR.fifo &
 echopid=$!
 start_timeout $echopid
 

Modified: SwiftApps/SwiftR/Swift/exec/SwiftRServer.R
===================================================================
--- SwiftApps/SwiftR/Swift/exec/SwiftRServer.R	2011-04-29 18:44:55 UTC (rev 4436)
+++ SwiftApps/SwiftR/Swift/exec/SwiftRServer.R	2011-04-29 22:10:54 UTC (rev 4437)
@@ -29,11 +29,35 @@
       repeat {
         cat("SwiftRServer at top of loop is in dir:",getwd(),"\n");
         setwd(dir) # FIXME: not yet sure what is changing the CWD
+
+        # Read a line from fifo
         infifo <- fifo(inFifoName,open="rb",blocking=TRUE)
-        cmd <- scan(infifo,what=list("","",""),nlines=1,flush=FALSE,quiet=FALSE,fill=TRUE)
+        inline <- readLines(infifo, 1)
+        close(infifo)
+
+        cmd <- unlist(strsplit(inline, " ", fixed=T))
+        cat("cmd:\n")
+        print(cmd)
+        if (length(cmd) < 3) {
+            stop(paste("Error: cmd provided was too short with length ",
+                    length(cmd), ":", cmd))
+        }
         op = cmd[[1]]
         callBatchFileName = cmd[[2]]
         resultBatchFileName = cmd[[3]]
+        # NOTE: there has to be a correspondence between
+        # the importFileNames received on the pipe and the
+        # list of import file names in the call object.
+        # They correspond 1:1.  The name in the call object
+        # is the unique filepath on the client's system, which
+        # we use to determine if an import file has been
+        # already imported.  The name received on the pipe
+        # is the name in the local file system.
+        if (length(cmd) > 3) {
+            importFileNames = cmd[4:length(cmd)]
+        } else {
+            importFileNames = NULL
+        }
         cat("DB: cmd: op=",op," call batch=",callBatchFileName," result batch=",resultBatchFileName,"\n");
         if(is.null(op)) {
           cat("op is NULL\n")
@@ -48,13 +72,15 @@
           }
           if( identical(op,"run")) {
             cat("DB: About to run batch file: ", callBatchFileName,"\n");
-            runBatch(callBatchFileName,resultBatchFileName)
+            runBatch(callBatchFileName,resultBatchFileName, importFileNames)
             outfifo <- fifo(outFifoName,open="wb",blocking=TRUE)
             cat(file=outfifo, "Batch completed: result batch file: ", resultBatchFileName,"\n");
           }
+          else {
+            cat("DB: got op [", op, "]\n")
+          }
         }
-        close(infifo)
-        close(outfifo)
+        try(close(outfifo))
       }
     }
 
@@ -64,7 +90,7 @@
       outfifo <- fifo(outFifoName,open="wb",blocking=TRUE)
       cat(paste("ERROR: R server failed with error", gsub("\n", " ", error)), "\n")
       cat(file=outfifo, paste("ERROR: R server failed with error", gsub("\n", " ", error)), "\n")
-      close(outfifo)
+      try(close(outfifo))
     }
 
     doInit <- function(initializer) {
@@ -90,7 +116,7 @@
         save(result,file=resultBatchFileName)
     }
 
-    runBatch <- function( callBatchFileName, resultBatchFileName )
+    runBatch <- function( callBatchFileName, resultBatchFileName, importFileNames )
     {
       # Load contents into local environment
       success <- try(load(callBatchFileName, envir=environment()));
@@ -98,8 +124,9 @@
         stop(paste(callBatchFileName, "could not be opened"))
         return()
       }
-      success <- try(loadImports(rcall$imports))
+      success <- try(loadImports(rcall$imports, importFileNames))
       if (inherits(success, "try-error")) {
+        cat("import failed!!: ", success)
         failBatch(rcall, success, resultBatchFileName)
         return()
       }
@@ -139,7 +166,7 @@
         cat("DB: reset env\n")
     }
 
-    loadImports <- function (importlist) {
+    loadImports <- function (importlist, importFileNames) {
         # First check whether the session id has changed
         #cat(paste("Import list:", importlist$exports, "\n"), file=stderr())
         #cat(paste("New Session:", importlist$session, "\n"), file=stderr())
@@ -158,24 +185,35 @@
         if (doSetup) {
             newSession(importlist$session)
         }
+        # Co-iterate over the unique names and 
+        # if length doesn't match, something has gone wrong
+        numImports <- length(importlist$exports)
+        if (numImports != length(importFileNames)) {
+            stop(paste("Number of import file names provided on command line (",
+                length(importFileNames),") does not match number in call object (",
+                numImports, ").  ", importFileNames, " vs ", importlist$exports))
+        }
+        if (numImports > 0) {
+            for (i in 1:numImports) {
+                # Load the contents of the specified file
+                # into the global environment
+                uniqueId <- importlist$exports[[i]]
+                localFile <- importFileNames[[i]]
 
-        for (file in importlist$exports) {
-            # Load the contents of the specified file
-            # into the global environment
-
-            #cat("File: ", file, "\n")
-            # check to see if already imported
-            if (!exists(file, envir=.current.imported)) {
-                #TODO: load can fail with warning
-                load(file, envir=globalenv()) 
-                # if an error occurs here, assume calling function
-                # will catch it
-                assign(file, TRUE, envir=.current.imported)
-                cat("Loaded file ", file, "\n")
+                #cat("File: ", file, "\n")
+                # check to see if already imported
+                if (!exists(uniqueId, envir=.current.imported)) {
+                    #TODO: load can fail with warning
+                    load(localFile, envir=globalenv()) 
+                    # if an error occurs here, assume calling function
+                    # will catch it
+                    assign(uniqueId, TRUE, envir=.current.imported)
+                    cat("Loaded file ", localFile, " with unique path ", uniqueId, "\n")
+                }
+                else {
+                    #cat("Ignored file ", file, "\n")
+                }
             }
-            else {
-                #cat("Ignored file ", file, "\n")
-            }
         }
     }
 

Modified: SwiftApps/SwiftR/Swift/exec/rserver.swift
===================================================================
--- SwiftApps/SwiftR/Swift/exec/rserver.swift	2011-04-29 18:44:55 UTC (rev 4436)
+++ SwiftApps/SwiftR/Swift/exec/rserver.swift	2011-04-29 22:10:54 UTC (rev 4437)
@@ -8,9 +8,10 @@
 #        replace ack with ftracef to result pipe
 #        condense shellscript and rserverscript to one?
 
-app (external e, RData result, file stout, file sterr) runR (file shellscript, file RServerScript, RData rcall)
+app (external e, RData result, file stout, file sterr) runR (file shellscript, file RServerScript, RData rcall, RData exports[])
 {
-  bash @shellscript @RServerScript @rcall @result stdout=@stout stderr=@sterr;
+  bash @shellscript @RServerScript @rcall @result "--imports" @exports 
+            stdout=@stout stderr=@sterr;
 }
 
 app ack (external e[])
@@ -30,12 +31,16 @@
   RData results[] <simple_mapper; location=runDir, prefix="rbatch.", suffix=".Rdata", padding=0>;
   file  stout[]   <simple_mapper; location=runDir, prefix="stdout.", suffix=".txt", padding=0>;
   file  sterr[]   <simple_mapper; location=runDir, prefix="stderr.", suffix=".txt", padding=0>;
- 
+
+  # Load list of exported files
+  string export_list[] = readData(@strcat(runDir, "/exports.txt"));
+  RData exports[] <array_mapper; files=export_list>;
+
   file  runRscript <"EvalRBatchPersistent.sh">;
   file  rsScript   <"SwiftRServer.R">;
 
   foreach c, i in rcalls {
-    (e[i], results[i],stout[i], sterr[i]) = runR(runRscript,rsScript,c);
+    (e[i], results[i],stout[i], sterr[i]) = runR(runRscript,rsScript,c, exports);
   }
 }
 




More information about the Swift-commit mailing list