[Swift-commit] r4057 - SwiftApps/SwiftR/Swift/R

noreply at svn.ci.uchicago.edu noreply at svn.ci.uchicago.edu
Thu Feb 3 15:40:13 CST 2011


Author: tga
Date: 2011-02-03 15:40:12 -0600 (Thu, 03 Feb 2011)
New Revision: 4057

Modified:
   SwiftApps/SwiftR/Swift/R/Apply.R
   SwiftApps/SwiftR/Swift/R/Export.R
   SwiftApps/SwiftR/Swift/R/Library.R
Log:
Further refactoring of Apply module to make swiftapply less monolithic and more readable.


Modified: SwiftApps/SwiftR/Swift/R/Apply.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Apply.R	2011-02-03 20:19:49 UTC (rev 4056)
+++ SwiftApps/SwiftR/Swift/R/Apply.R	2011-02-03 21:40:12 UTC (rev 4057)
@@ -1,7 +1,11 @@
-# Modes for directories and files created by SwiftR
+
+
+# Modes for directories and files created by SwiftR
 # We only give permissions to the current user.
 kDIR_MODE <- "700"
 kFILE_MODE <- "600"
+# Paths to the Swift script and shell wrapper shipped in the installed
+# Swift package (exec/ subdirectory), used by swiftapply below.
+# NOTE(review): these are top-level assignments, so they are evaluated
+# once when the package code is loaded/installed rather than per call;
+# if the installed location can change afterwards they may be stale -- confirm.
+swiftapplyScript <- system.file(package="Swift","exec/swiftapply.swift")
+RunSwiftScript <- system.file(package="Swift","exec/RunSwiftScript.sh")
 
 swiftapply <- function( func, arglists,
                         server=NULL,
@@ -15,110 +19,35 @@
                         quiet=FALSE)
 {
   # Set Swift default options if not passed as keywords or pre-set by user
-  if(is.null(server))
-    server <- getOption("swift.server")
-  if(is.null(server))
-    server<-"local"
+  if(is.null(server)) server <- getOption("swift.server")
+  if(is.null(server)) server<-"local"
 
-  if(is.null(callsperbatch))
-    callsperbatch <- getOption("swift.callsperbatch")
+  if(is.null(callsperbatch)) callsperbatch <- getOption("swift.callsperbatch")
+  if(is.null(callsperbatch)) 
+    callsperbatch <- chooseBatchSize(length(arglists), server)
 
-  if(is.null(callsperbatch)) {
-    # auto batching:
-    # start with 1 batch per core
-    wc <- workerCount(server)
-    if (is.null(wc)) {
-        if (! quiet ) 
-            cat(paste("Information about current server of type", 
-                    server, "not found"))
-        callsperbatch <- 1
-    }
-    else {
-        batches_per_worker <- getOption("swift.batchesperworker")
-        if (is.null(batches_per_worker)) 
-            batches_per_worker <- 1
-        target_batches <- wc * batches_per_worker
-        # We want to have at least batches_per_worker batches of calls
-        # for each worker.  If it doesn't divide evenly, some will get
-        # batches_per_worker + 1 batches.  
-        # callsperbatch < 1 doesn't make sense, hence the use of max
-        callsperbatch = max(1, floor(length(arglists)/target_batches))
-#        if (! quiet ) 
-#            cat(paste("Selected callsperbatch of", callsperbatch, 
-#                    "automatically, based on worker count of",wc,
-#                    "and arg count of ", length(arglists)))
-    }
-  }
-  #cat("Got ", length(arglists), " arguments with batching factor ", callsperbatch)
+  # service: send requests to Swift service loop via fifo
+  # script:  run swift for each request, via RunSwiftScript.sh (broken)
+  # manual:  for testing, let user run remote R server manually
+  if(is.null(runmode)) runmode <- getOption("swift.runmode")
+  if(is.null(runmode)) runmode <- "service"
 
-  if(is.null(runmode))
-    runmode <- getOption("swift.runmode")
-  if(is.null(runmode))
-    runmode <- "service"
-    # service: send requests to Swift service loop via fifo
-    # script:  run swift for each request, via RunSwiftScript.sh (currently broken)
-    # manual:  for testing, let user run remote R server manually
-    
-  # Build a list of "Library" statements for all of the libraries
-  # already specified through swiftLibrary commands.
-  lib <- getOption(".swift.packages")
-  if (!is.null(lib)) {
-    # library statements
-    stmts <- lapply(lib, 
-                function (libcmd) { 
-                    verb <- libcmd[1];
-                    if (verb == "library") {
-                        return (sprintf("library(%s);", libcmd[2]));
-                    }
-                    else if (verb == "detach") {
-                        return (sprintf("try(detach(package:%s),silent=T);", 
-                                    libcmd[2]));
-                    }
-                    else {
-                        error(paste("invalid verb ", verb, " in ",
-                                ".swift.packages option"))
-                    }
-                })
-    libstr = paste(stmts, collapse=" ")
-    cat("libstr: ", libstr)
-  }
-  else {
-    libstr = ""
-  }
-    
-
-  if(is.null(initialexpr))
-    initialexpr <- getOption("swift.initialexpr")
+  if(is.null(initialexpr)) initialexpr <- getOption("swift.initialexpr")
   # Have library imports before other expressions
-  initialexpr <- paste(libstr, initialexpr, collapse=" ");
+  initialexpr <- paste(buildLibStr(), initialexpr, collapse=" ");
 
-  if(is.null(workerhosts))
-    workerhosts <- getOption("swift.workerhosts")
-  if(is.null(workerhosts))
-    workerhosts <- "localhost";
+  if(is.null(workerhosts)) workerhosts <- getOption("swift.workerhosts")
+  if(is.null(workerhosts)) workerhosts <- "localhost";
 
-  if(is.null(keepwork))
-    keepwork <- getOption("swift.keepwork")
-  if(is.null(keepwork))
-    keepwork <- FALSE;
+  if(is.null(keepwork)) keepwork <- getOption("swift.keepwork")
+  if(is.null(keepwork)) keepwork <- FALSE;
 
-  if(is.null(tmpdir))
-    tmpdir <- getOption("swift.tmpdir")
-  if(is.null(tmpdir))
-    tmpdir <- Sys.getenv("SWIFTR_TMP");
+  if(is.null(tmpdir)) tmpdir <- chooseTmpDir()
 
-  if(tmpdir=="") {
-    if(is.null(tmpdir))
-      tmpdir <- getOption("swift.tmpdir")
-    tmpdir <- "/tmp";
-  }
+  if(is.null(quiet)) quiet <- getOption("swift.quiet")
+  if(is.null(quiet)) quiet <- FALSE
 
-  if(is.null(quiet)) {
-    quiet <- getOption("swift.quiet")
-    if(is.null(quiet)) 
-        quiet <- FALSE
-  }
-
+  # Inform user of some call info
   if (! quiet) {
       cat("\nswiftapply to ", length(arglists), " arg lists.\n")
       cat("\nSwift properties:\n")
@@ -132,15 +61,14 @@
 
   # Execute the calls in batches
   reqdir <- setupRequestDir(tmpdir=tmpdir)
-  if (! quiet) {
-      cat("Swift request is in",reqdir,"\n")
-  }  
-
   nbatches <- writeRequestBatches(func, arglists, initialexpr, 
                 reqdir, callsperbatch)
 
-  swiftapplyScript <- system.file(package="Swift","exec/swiftapply.swift")
-  if( runmode == "manual" ) { # Prompt for return (empty line) to continue; assumes user ran a manual R to process the call.
+  if (! quiet) cat("Swift request written to: ",reqdir,"\n")
+
+  if( runmode == "manual" ) { 
+    # Prompt for return (empty line) to continue; assumes user ran a 
+    # manual R to process the call.
     cat("Manual Swift Run:\n  run dir: ", getwd(), "/", reqdir,"\n\n")
     cat("  swift script: ", RunSwiftScript, "\n")
     cat("  server: ", server,"\n")
@@ -150,11 +78,10 @@
     readLines(n=1)
   }
   else if (runmode == "script") {
-    RunSwiftScript <- system.file(package="Swift","exec/RunSwiftScript.sh")
-    system(paste(RunSwiftScript,reqdir,server,swiftapplyScript,"\"",workerhosts,"\""))
+    system(paste(RunSwiftScript,reqdir,server,swiftapplyScript,
+                shQuote(workerhosts)))
   }
-  else { # runmode == "service" # FIXME: check and post error if not "service"
-
+  else if (runmode == "service") { 
     # Send request to service
     user <- Sys.info()[["user"]]
     swiftServerDir = file.path(tmpdir,user,"SwiftR", 
@@ -162,74 +89,24 @@
 
     requestPipeName=file.path(swiftServerDir,"requestpipe")
     resultPipeName=file.path(swiftServerDir,"resultpipe")
-
-    # fifo will block irrecoverably if there is no reader on the
-    # other end of the requestPipe.  This is bad.  The swift worker
-    # script is responsible for deleting the request pipe when it
-    # shuts down, so we know if the requestPipe still exists there
-    # should still be a worker (or the worker crashed in a funny way).
-    if (file.exists(requestPipeName)) {
-        # there is a race condition here if the fifo disappears in
-        # between checking for existence and opening the fifo, but
-        # the timeout will catch that unlikely case
-        writeTimeout <- 4000
-        success <- writeFifo(requestPipeName,paste(reqdir,"\n",sep=""), 
-                timeout=writeTimeout)
-        if (! success) {
-            stop(paste("timeout of", writeTimeout, 
-                "ms exceeded when attempting to",
-                "rendezvous with swift server: maybe it is not running or",
-                "it has crashed"))
-        }
-
-        # Wait for reply from service
-        res <- readFifo(resultPipeName, timeout=timeout)
-        if ((! length(res) == 0) && is.na(res)) {
-            stop(paste("Timeout of ", timeout, "ms exceeded when waiting",
-                "for response from swift server"))
-        }
-
-        # Check that the message was correct
-        successMsg <- "done"
-        if (res[[1]] != successMsg) 
-            stop(paste("Got unexpected message '", 
-                paste(res, collapse="\n"),"' on fifo ",
-                "aborting job", sep=""))
+    # Try sending: this function will cause error if it fails
+    sendServiceRequest(requestPipeName, reqdir, server)
+    
+    res <- getServiceResponse(resultPipeName, timeout)
+    # Check that the message was correct
+    if (res[[1]] != "done") {
+      stop(paste("Got unexpected message '", 
+            paste(res, collapse="\n"),"' on fifo ",
+            "aborting job", sep=""))
     }
-    else {
-        stop(paste("Have you run swiftInit?\n",
-                "It appears that no SwiftR servers of type", server, 
-                "are running, as no request pipe exists in", 
-                swiftServerDir))
-    }
   }
+  else {
+    stop(paste("Invalid runmode", runmode))
+  }
 
   # Fetch the batch results
-
-  rno <- 1
-  rlist <- list()
-  for(batch in 1:nbatches) {
-    result <- NULL
-    load(file.path(reqdir,
-            paste("/rbatch.",as.character(batch),".Rdata",sep="")))
-    nresults <- length(result)
-    for(r in 1:nresults) {
-      rlist[[rno]] <- result[[r]]
-if(class(result[[r]]) == "try-error") {
-cat("ERROR in eval: ", result[[r]], "\n");
+  return(fetchBatchResults(reqdir, nbatches, arglists, keepwork))
 }
-      #DB cat("swiftapply: result rno=",rno,":\n") # FIXME: for logging
-      #DB cat(rlist[[rno]]@output$gradient,"\n")
-      rno <- rno + 1
-    }
-  }
-  names(rlist) = names(arglists)
-  if( ! keepwork ) {
-    cat("Removing ", reqdir, "\n")
-    unlink(reqdir,recursive=TRUE)
-  }
-  return(rlist)
-}
 
 swiftLapply <- function( tlist, func, ... )
 {
@@ -299,3 +176,111 @@
   }
   return (batch - 1)
 }
+
+fetchBatchResults <- function (reqdir, nbatches, arglists, keepwork) {
+  # Collect the per-call results written by the Swift run into one flat
+  # list, in batch order, named like arglists.
+  #
+  # reqdir:   request directory containing rbatch.<i>.Rdata for
+  #           i = 1..nbatches
+  # nbatches: number of batch result files to read (0 is allowed)
+  # arglists: the original argument lists; only names(arglists) is used
+  # keepwork: if FALSE, reqdir is deleted once the results are read
+  #
+  # Each result file must define an object named "result" holding one
+  # element per call in that batch.  Elements inheriting "try-error" are
+  # reported on stdout but are still returned in place.
+  rlist <- list()
+  rno <- 1
+  for (batch in seq_len(nbatches)) {
+    # The result in the file will be named "result"
+    result <- NULL
+    load(file.path(reqdir, paste("rbatch.", batch, ".Rdata", sep="")))
+    for (r in seq_along(result)) {
+      # Assign via [ ] and list() so a NULL result keeps its slot; with
+      # [[<- a NULL is not stored and the names would no longer line up.
+      rlist[rno] <- list(result[[r]])
+      if (inherits(result[[r]], "try-error")) {
+        cat("ERROR in eval: ", result[[r]], "\n")
+      }
+      rno <- rno + 1
+    }
+  }
+  names(rlist) <- names(arglists)
+  if (!keepwork) {
+    cat("Removing ", reqdir, "\n")
+    unlink(reqdir, recursive=TRUE)
+  }
+  rlist
+}
+
+chooseBatchSize <- function (numargs, server,
+                             quiet=getOption("swift.quiet", FALSE)) {
+    # Automatically choose the number of calls per batch.
+    #
+    # numargs: total number of calls (argument lists) to run
+    # server:  server type whose workers are counted via workerCount()
+    # quiet:   if TRUE, don't print a message when worker information is
+    #          unavailable (defaults to the "swift.quiet" option, else FALSE)
+    #
+    # Returns the number of calls per batch, always at least 1.  Aims for
+    # getOption("swift.batchesperworker") batches (default 1) per worker.
+    wc <- workerCount(server)
+    if (is.null(wc)) {
+        # No information about this server: fall back to one call per batch
+        if (!isTRUE(quiet))
+            cat(paste("Information about current server of type",
+                    server, "not found"))
+        return (1)
+    }
+    batches_per_worker <- getOption("swift.batchesperworker")
+    if (is.null(batches_per_worker))
+        batches_per_worker <- 1
+    target_batches <- wc * batches_per_worker
+    # We want to have at least batches_per_worker batches of calls
+    # for each worker.  If it doesn't divide evenly, some will get
+    # batches_per_worker + 1 batches.
+    # callsperbatch < 1 doesn't make sense, hence the use of max
+    return (max(1, floor(numargs / target_batches)))
+}
+
+chooseTmpDir <- function () {
+  # Choose the temporary directory for SwiftR request files, trying in
+  # order: the "swift.tmpdir" option, then the SWIFTR_TMP environment
+  # variable (when the option is unset), then "/tmp" (when the value
+  # found so far is the empty string).
+  # Returns the chosen directory path.
+  tmpdir <- getOption("swift.tmpdir")
+  if (is.null(tmpdir))
+    tmpdir <- Sys.getenv("SWIFTR_TMP")
+  if (identical(tmpdir, ""))
+    tmpdir <- "/tmp"
+  # Return explicitly: ending on the if() above would yield NULL
+  # whenever tmpdir was already non-empty.
+  tmpdir
+}
+
+sendServiceRequest <- function (requestPipeName, reqdir, server=NULL) {
+    # Hand a request to a running SwiftR service by writing the request
+    # directory path, newline-terminated, to the service's request fifo.
+    # server is used only in error messages.  Signals an error (stop) if
+    # the request fifo does not exist or the write times out.
+
+    # Opening a fifo with no reader blocks irrecoverably.  The swift worker
+    # script deletes the request pipe when it shuts down, so a missing pipe
+    # means no server is running: refuse before touching it.
+    if (!file.exists(requestPipeName)) {
+        stop(paste("Have you run swiftInit?\n",
+                "It appears that no SwiftR servers of type", server,
+                "are running, as no request pipe exists at",
+                requestPipeName))
+    }
+
+    # The fifo could still disappear between the check above and the
+    # write below; the write timeout catches that unlikely race.
+    writeTimeout <- 4000
+    request <- paste(reqdir, "\n", sep="")
+    written <- writeFifo(requestPipeName, request, timeout=writeTimeout)
+    if (!written) {
+        stop(paste("timeout of", writeTimeout,
+                "ms exceeded when attempting to",
+                "rendezvous with  SwiftR server of type", server, ".\n",
+                "Maybe it is not running or it has crashed"))
+    }
+}
+
+getServiceResponse <- function (resultPipeName, timeout) {
+    # Blocking wait for the service's reply on the result fifo.
+    #
+    # resultPipeName: path of the fifo the reply is read from
+    # timeout:        passed through to readFifo(); reported in "ms" below
+    #
+    # Returns the response exactly as returned by readFifo().  Signals an
+    # error if the response is empty or if its first element is NA, which
+    # this code treats as readFifo() having timed out.
+    res <- readFifo(resultPipeName, timeout=timeout)
+    # Check the length first: is.na() on a zero-length response yields
+    # logical(0), which if() rejects with an unrelated error.
+    if (length(res) == 0) {
+      stop(paste("Zero length response on named pipe ", resultPipeName))
+    }
+    # Test only the first element so a multi-line response is not an error
+    # ("the condition has length > 1").
+    if (is.na(res[[1]])) {
+      stop(paste("Timeout of ", timeout, "ms exceeded when waiting",
+            "for response from swift server"))
+    }
+    return (res)
+}
+

Modified: SwiftApps/SwiftR/Swift/R/Export.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Export.R	2011-02-03 20:19:49 UTC (rev 4056)
+++ SwiftApps/SwiftR/Swift/R/Export.R	2011-02-03 21:40:12 UTC (rev 4057)
@@ -1,4 +1,44 @@
-
+# EXPORT MECHANISM:
+#==================
+# We want the protocol to support workers coming and going from the 
+#  cluster.  We also want to be robust to worker failures.  This
+# means that we can't rely on state being retained at workers.  
+# I.e. If a new worker is added, we need it to receive all of the
+# relevant exported data.  However, for the purposes of efficiency,
+# we don't want workers to have to repeatedly reload data they have
+# already loaded.
+#
+# In order to achieve this, we will have the workers perform imports
+# at apply time.  As part of each apply request, the worker will receive
+# information about which exported data files it should load.
+# When swiftExport() is called:
+# * Save all of the objects to a file 
+#    - Q: Can we assume it's on a shared filesystem?
+# * MAYBE: put the file somewhere accessible by all workers
+# * append the file's path to a stored list of files
+# * MAYBE LATER: asynchronously start swift tasks to distribute
+#   the data, to get a headstart on the work
+# When swiftApply() is called:
+#   - Inform all worker R processes through some mechanism (TBD)
+#       of the list of all exported data files. (extra file?)
+#   - The worker goes through the list in order to ensure that name 
+#       conflicts are resolved in the way you would expect.  
+#       If it has loaded a file previously, it does nothing.
+#       If it hasn't loaded the file, it accesses the file (mechanism TBD)
+#       and loads its data into the global namespace.
+# 
+# swiftRemoveAll():
+#   delete the list of exported files from above and have a single
+#   entry which instructs workers to delete all their data.  This is
+#   mainly important because it eliminates possibility of using huge
+#   amounts of memory.
+#  swiftRemove()
+#   Implementation of this function is somewhat problematic, as it only
+#   partially undoes previous work
 swiftRemoveAll <- function () {
     # Cleans up all data in global namespace on workers
     options(.swift.exports=list(c("removeAll")))
@@ -7,6 +47,7 @@
 
 
 swiftExportAll <- function () {
+    # Exports all functions and data in global environment
     return (swiftExport(list=ls(globalenv())))
 }
 
@@ -15,40 +56,6 @@
     # List of object names (as R symbols or as strings)
     # These will be passed directly to save() to be serialized
     
-    #TODO:
-    # We want the protocol to support workers coming and going from the 
-    #  cluster.  We want to rely on less implicit state.  Ie. if we run
-    # swiftExport, then a worker is added to cluster, then an apply call
-    # is run, we want the worker to include the exported objects.
-    # One approach to doing this is to enforce the export at apply
-    # time.
-    #
-    #
-    # When swiftExport() is called:
-    # * Save all of the objects to a file 
-    #    - Q:Can we assume its on a shared filesystem?
-    # * MAYBE: put the file somewhere accessible by all workers
-    # * append the file's path to a stored list of files
-    # * MAYBE LATER: asynchronously start swift tasks to distribute
-    #   the data, to get a headstart on the work
-    # When swiftApply() is called:
-    #   - Inform all worker R processes through some mechanism (TBD)
-    #       of the list of all exported data files. (extra file?)
-    #   - The worker goes through the list in order to ensure that name 
-    #       conflicts are resolved in the way you would expect.  
-    #       If it has loaded a file previously, it does nothing.
-    #       If it hasn't loaded the file, it accesses the file (mechanism TBD)
-    #       and loads it into the global namespace.
-    #       data 
-    # 
-    # swiftRemoveAll():
-    #   delete the list of exported files from above and have a single
-    #   entry which instructs workers to delete all their data.  This is
-    #   mainly important because it eliminates possibility of using huge
-    #   amounts of memory.
-    #  swiftRemove()
-    #   Implementation of this function is somewhat problematic, as it only
-    #   partially undoes previous work
 
 
     #Pseudocode:

Modified: SwiftApps/SwiftR/Swift/R/Library.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Library.R	2011-02-03 20:19:49 UTC (rev 4056)
+++ SwiftApps/SwiftR/Swift/R/Library.R	2011-02-03 21:40:12 UTC (rev 4057)
@@ -94,3 +94,32 @@
 
     options(.swift.packages=plist)
 }
+
+buildLibStr <- function () {
+  # Build a string of R statements that replays, on a worker, the
+  # library()/detach() commands already specified through swiftLibrary
+  # commands and stored in the ".swift.packages" option.  Each stored
+  # entry is a character vector c(verb, package), verb being "library"
+  # or "detach".
+  # Returns "" when nothing is stored; signals an error on an unknown verb.
+  lib <- getOption(".swift.packages")
+  if (is.null(lib)) {
+    return ("")
+  }
+  stmts <- vapply(lib,
+                function (libcmd) {
+                  verb <- libcmd[1]
+                  if (verb == "library") {
+                    sprintf("library(%s);", libcmd[2])
+                  } else if (verb == "detach") {
+                    # detach() fails if the package isn't attached; ignore
+                    sprintf("try(detach(package:%s),silent=T);", libcmd[2])
+                  } else {
+                    # stop(), not error(): R has no error() function
+                    stop(paste("invalid verb ", verb, " in ",
+                               ".swift.packages option"))
+                  }
+                }, character(1))
+  return (paste(stmts, collapse=" "))
+}




More information about the Swift-commit mailing list