[Swift-commit] r4055 - in SwiftApps/SwiftR: . Swift Swift/R

noreply at svn.ci.uchicago.edu noreply at svn.ci.uchicago.edu
Thu Feb 3 14:00:40 CST 2011

Author: tga
Date: 2011-02-03 14:00:40 -0600 (Thu, 03 Feb 2011)
New Revision: 4055

Refactored R code a bit:
* Added NAMESPACE declaration so that only functions intended to be exported
    will be exported
* Split Swift.R into Export.R, Library.R and Apply.R as it was getting rather long
* Refactored swiftapply to use some subprocedures.

Modified: SwiftApps/SwiftR/Makefile
--- SwiftApps/SwiftR/Makefile	2011-02-03 18:37:26 UTC (rev 4054)
+++ SwiftApps/SwiftR/Makefile	2011-02-03 20:00:40 UTC (rev 4055)
@@ -3,6 +3,7 @@
 PKG_FILES = $(shell find Swift/R -name '*.R'; find Swift/man -name '*.Rd'; find Swift/exec/ -type f -not -path '*/.svn*'; find Swift/inst/ -type f -not -path '*/.svn*')
 # Extract the version number from the R package description file

Added: SwiftApps/SwiftR/Swift/NAMESPACE
--- SwiftApps/SwiftR/Swift/NAMESPACE	                        (rev 0)
+++ SwiftApps/SwiftR/Swift/NAMESPACE	2011-02-03 20:00:40 UTC (rev 4055)
@@ -0,0 +1,10 @@

Copied: SwiftApps/SwiftR/Swift/R/Apply.R (from rev 4054, SwiftApps/SwiftR/Swift/R/Swift.R)
--- SwiftApps/SwiftR/Swift/R/Apply.R	                        (rev 0)
+++ SwiftApps/SwiftR/Swift/R/Apply.R	2011-02-03 20:00:40 UTC (rev 4055)
@@ -0,0 +1,301 @@
+# Modes for directories and files created by SwiftR
+# We only give permissions to the current user.
+kDIR_MODE <- "700"
+kFILE_MODE <- "600"
+swiftapply <- function( func, arglists,
+                        server=NULL,
+                        callsperbatch=NULL,
+                        runmode=NULL,
+                        initialexpr=NULL,
+                        workerhosts=NULL,
+                        keepwork=NULL,
+                        tmpdir=NULL,
+                        timeout=NULL, 
+                        quiet=FALSE)
+  # Set Swift default options if not passed as keywords or pre-set by user
+  if(is.null(server))
+    server <- getOption("swift.server")
+  if(is.null(server))
+    server<-"local"
+  if(is.null(callsperbatch))
+    callsperbatch <- getOption("swift.callsperbatch")
+  if(is.null(callsperbatch)) {
+    # auto batching:
+    # start with 1 batch per core
+    wc <- workerCount(server)
+    if (is.null(wc)) {
+        if (! quiet ) 
+            cat(paste("Information about current server of type", 
+                    server, "not found"))
+        callsperbatch <- 1
+    }
+    else {
+        batches_per_worker <- getOption("swift.batchesperworker")
+        if (is.null(batches_per_worker)) 
+            batches_per_worker <- 1
+        target_batches <- wc * batches_per_worker
+        # We want to have at least batches_per_worker batches of calls
+        # for each worker.  If it doesn't divide evenly, some will get
+        # batches_per_worker + 1 batches.  
+        # callsperbatch < 1 doesn't make sense, hence the use of max
+        callsperbatch = max(1, floor(length(arglists)/target_batches))
+#        if (! quiet ) 
+#            cat(paste("Selected callsperbatch of", callsperbatch, 
+#                    "automatically, based on worker count of",wc,
+#                    "and arg count of ", length(arglists)))
+    }
+  }
+  #cat("Got ", length(arglists), " arguments with batching factor ", callsperbatch)
+  if(is.null(runmode))
+    runmode <- getOption("swift.runmode")
+  if(is.null(runmode))
+    runmode <- "service"
+    # service: send requests to Swift service loop via fifo
+    # script:  run swift for each request, via RunSwiftScript.sh (currently broken)
+    # manual:  for testing, let user run remote R server manually
+  # Build a list of "Library" statements for all of the libraries
+  # already specified through swiftLibrary commands.
+  lib <- getOption(".swift.packages")
+  if (!is.null(lib)) {
+    # library statements
+    stmts <- lapply(lib, 
+                function (libcmd) { 
+                    verb <- libcmd[1];
+                    if (verb == "library") {
+                        return (sprintf("library(%s);", libcmd[2]));
+                    }
+                    else if (verb == "detach") {
+                        return (sprintf("try(detach(package:%s),silent=T);", 
+                                    libcmd[2]));
+                    }
+                    else {
+                        error(paste("invalid verb ", verb, " in ",
+                                ".swift.packages option"))
+                    }
+                })
+    libstr = paste(stmts, collapse=" ")
+    cat("libstr: ", libstr)
+  }
+  else {
+    libstr = ""
+  }
+  if(is.null(initialexpr))
+    initialexpr <- getOption("swift.initialexpr")
+  # Have library imports before other expressions
+  initialexpr <- paste(libstr, initialexpr, collapse=" ");
+  if(is.null(workerhosts))
+    workerhosts <- getOption("swift.workerhosts")
+  if(is.null(workerhosts))
+    workerhosts <- "localhost";
+  if(is.null(keepwork))
+    keepwork <- getOption("swift.keepwork")
+  if(is.null(keepwork))
+    keepwork <- FALSE;
+  if(is.null(tmpdir))
+    tmpdir <- getOption("swift.tmpdir")
+  if(is.null(tmpdir))
+    tmpdir <- Sys.getenv("SWIFTR_TMP");
+  if(tmpdir=="") {
+    if(is.null(tmpdir))
+      tmpdir <- getOption("swift.tmpdir")
+    tmpdir <- "/tmp";
+  }
+  if(is.null(quiet)) {
+    quiet <- getOption("swift.quiet")
+    if(is.null(quiet)) 
+        quiet <- FALSE
+  }
+  if (! quiet) {
+      cat("\nswiftapply to ", length(arglists), " arg lists.\n")
+      cat("\nSwift properties:\n")
+      cat("  server =", server,"\n")
+      cat("  callsperbatch =", callsperbatch,"\n")
+      cat("  runmode =", runmode,"\n")
+      cat("  tmpdir =", tmpdir,"\n")
+      cat("  workerhosts =", workerhosts,"\n")
+      cat("  initialexpr =", initialexpr,"\n\n")
+  }
+  # Execute the calls in batches
+  reqdir <- setupRequestDir(tmpdir=tmpdir)
+  if (! quiet) {
+      cat("Swift request is in",reqdir,"\n")
+  }  
+  nbatches <- writeRequestBatches(func, arglists, initialexpr, 
+                reqdir, callsperbatch)
+  swiftapplyScript <- system.file(package="Swift","exec/swiftapply.swift")
+  if( runmode == "manual" ) { # Prompt for return (empty line) to continue; assumes user ran a manual R to process the call.
+    cat("Manual Swift Run:\n  run dir: ", getwd(), "/", reqdir,"\n\n")
+    cat("  swift script: ", RunSwiftScript, "\n")
+    cat("  server: ", server,"\n")
+    cat("  swiftapplyScript: ", swiftapplyScript,"\n")
+    cat("  Use RunAllR.sh to process and press return when complete:")
+    system(paste("cp ", system.file(package="Swift","exec/RunAllR.sh"), reqdir))
+    readLines(n=1)
+  }
+  else if (runmode == "script") {
+    RunSwiftScript <- system.file(package="Swift","exec/RunSwiftScript.sh")
+    system(paste(RunSwiftScript,reqdir,server,swiftapplyScript,"\"",workerhosts,"\""))
+  }
+  else { # runmode == "service" # FIXME: check and post error if not "service"
+    # Send request to service
+    user <- Sys.info()[["user"]]
+    swiftServerDir = file.path(tmpdir,user,"SwiftR", 
+                paste("swift.",server,sep=""))
+    requestPipeName=file.path(swiftServerDir,"requestpipe")
+    resultPipeName=file.path(swiftServerDir,"resultpipe")
+    # fifo will block irrecoverably if there is no reader on the
+    # other end of the requestPipe.  This is bad.  The swift worker
+    # script is responsible for deleting the request pipe when it
+    # shuts down, so we know if the requestPipe still exists there
+    # should still be a worker (or the worker crashed in a funny way).
+    if (file.exists(requestPipeName)) {
+        # there is a race condition here if the fifo disappears in
+        # between checking for existence and opening the fifo, but
+        # the timeout will catch that unlikely case
+        writeTimeout <- 4000
+        success <- writeFifo(requestPipeName,paste(reqdir,"\n",sep=""), 
+                timeout=writeTimeout)
+        if (! success) {
+            stop(paste("timeout of", writeTimeout, 
+                "ms exceeded when attempting to",
+                "rendezvous with swift server: maybe it is not running or",
+                "it has crashed"))
+        }
+        # Wait for reply from service
+        res <- readFifo(resultPipeName, timeout=timeout)
+        if ((! length(res) == 0) && is.na(res)) {
+            stop(paste("Timeout of ", timeout, "ms exceeded when waiting",
+                "for response from swift server"))
+        }
+        # Check that the message was correct
+        successMsg <- "done"
+        if (res[[1]] != successMsg) 
+            stop(paste("Got unexpected message '", 
+                paste(res, collapse="\n"),"' on fifo ",
+                "aborting job", sep=""))
+    }
+    else {
+        stop(paste("Have you run swiftInit?\n",
+                "It appears that no SwiftR servers of type", server, 
+                "are running, as no request pipe exists in", 
+                swiftServerDir))
+    }
+  }
+  # Fetch the batch results
+  rno <- 1
+  rlist <- list()
+  for(batch in 1:nbatches) {
+    result <- NULL
+    load(file.path(reqdir,
+            paste("/rbatch.",as.character(batch),".Rdata",sep="")))
+    nresults <- length(result)
+    for(r in 1:nresults) {
+      rlist[[rno]] <- result[[r]]
+if(class(result[[r]]) == "try-error") {
+cat("ERROR in eval: ", result[[r]], "\n");
+      #DB cat("swiftapply: result rno=",rno,":\n") # FIXME: for logging
+      #DB cat(rlist[[rno]]@output$gradient,"\n")
+      rno <- rno + 1
+    }
+  }
+  names(rlist) = names(arglists)
+  if( ! keepwork ) {
+    cat("Removing ", reqdir, "\n")
+    unlink(reqdir,recursive=TRUE)
+  }
+  return(rlist)
+swiftLapply <- function( tlist, func, ... )
+  if (length(tlist) == 0) return(tlist)
+  arglists <- list()
+  narglists <- length(tlist)
+  for(i in 1 : narglists) {
+    arglists[[i]] <- list(tlist[[i]], ...);
+  }
+  names(arglists) = names(tlist)
+  swiftapply(func, arglists)
+setupRequestDir <- function (tmpdir) {
+  # Initialize globals if first call in this invocation of R
+  # Use the options mechanism so that setting is tied
+  # to lifetime of this R process.  If we stored this in a global
+  # variable, it is possible that, say, directory requests.55555/R0000005
+  # is created, the user exits the session without saving, and therefore
+  # the swift.requestid counter is out of step with the file system
+  requestdirbase = getOption("swift.requestdirbase") 
+  if(!is.null(requestdirbase)) {
+    requestid = getOption("swift.requestid") + 1;
+  }
+  else {
+    requestdirbase = sprintf("%s/%s/SwiftR/requests.P%.5d",tmpdir,
+                            Sys.info()[["user"]],Sys.getpid())
+    dir.create(requestdirbase,recursive=TRUE,showWarnings=FALSE, 
+            mode=kDIR_MODE)
+    options(swift.requestdirbase=requestdirbase)
+    requestid = 0;
+  }
+  options(swift.requestid=requestid)
+  reqdir = sprintf("%s/R%.7d",requestdirbase,requestid)
+  dir.create(reqdir,recursive=TRUE,showWarnings=FALSE,mode=kDIR_MODE)
+  return (reqdir)
+writeRequestBatches <- function (func, arglists, initialexpr, 
+                        reqdir, callsperbatch) {
+  # Write the function call info out to cbatch.?.RData files in reqdir 
+  # in batches of size specified by callsperbatch 
+  # returns the number of batches written
+  narglists <- length(arglists) # number of arglists to process
+  batch <- 1   # Next arglist batch number to fill
+  arglist <- 1 # Next arglist number to insert
+  while(arglist <= narglists) {
+    arglistsleft <- narglists - arglist + 1
+    if(arglistsleft >= callsperbatch) {
+      batchsize <- callsperbatch
+    }
+    else {
+      batchsize <- arglistsleft
+    }
+    arglistbatch <- list()
+    for(i in 1 : batchsize) {
+      arglistbatch[[i]] <- arglists[[arglist]]
+      arglist <- arglist +1 
+    }
+    rcall <- list(initializer=initialexpr,func=func,arglistbatch=arglistbatch)
+    save(rcall,
+        file=file.path(reqdir,
+            paste("cbatch.",as.character(batch),".Rdata",sep="")))
+    batch <- batch + 1;
+  }
+  return (batch - 1)

Property changes on: SwiftApps/SwiftR/Swift/R/Apply.R
Name: svn:mergeinfo

Added: SwiftApps/SwiftR/Swift/R/Export.R
--- SwiftApps/SwiftR/Swift/R/Export.R	                        (rev 0)
+++ SwiftApps/SwiftR/Swift/R/Export.R	2011-02-03 20:00:40 UTC (rev 4055)
@@ -0,0 +1,64 @@
+swiftRemoveAll <- function () {
+    # Cleans up all data in global namespace on workers
+    options(.swift.exports=list(c("removeAll")))
+swiftExportAll <- function () {
+    return (swiftExport(list=ls(globalenv())))
+#TODO: not implemented
+.swiftExport <- function (..., list=NULL) {
+    # List of object names (as R symbols or as strings)
+    # These will be passed directly to save() to be serialized
+    #TODO:
+    # We want the protocol to support workers coming and going from the 
+    #  cluster.  We want to rely on less implicit state.  Ie. if we run
+    # swiftExport, then a worker is added to cluster, then an apply call
+    # is run, we want the worker to include the exported objects.
+    # One approach to doing this is to enforce the export at apply
+    # time.
+    #
+    #
+    # When swiftExport() is called:
+    # * Save all of the objects to a file 
+    #    - Q:Can we assume its on a shared filesystem?
+    # * MAYBE: put the file somewhere accessible by all workers
+    # * append the file's path to a stored list of files
+    # * MAYBE LATER: asynchronously start swift tasks to distribute
+    #   the data, to get a headstart on the work
+    # When swiftApply() is called:
+    #   - Inform all worker R processes through some mechanism (TBD)
+    #       of the list of all exported data files. (extra file?)
+    #   - The worker goes through the list in order to ensure that name 
+    #       conflicts are resolved in the way you would expect.  
+    #       If it has loaded a file previously, it does nothing.
+    #       If it hasn't loaded the file, it accesses the file (mechanism TBD)
+    #       and loads it into the global namespace.
+    #       data 
+    # 
+    # swiftRemoveAll():
+    #   delete the list of exported files from above and have a single
+    #   entry which instructs workers to delete all their data.  This is
+    #   mainly important because it eliminates possibility of using huge
+    #   amounts of memory.
+    #  swiftRemove()
+    #   Implementation of this function is somewhat problematic, as it only
+    #   partially undoes previous work
+    #Pseudocode:
+    # TODO: choose file location
+    # expFile <- ???
+    save(..., list=list, file=expFile)
+    exportList <- getOption(".swift.exports")
+    if (is.null(exportList))
+        exportList = list() #TODO: start with removeAll command?
+    exportList[[length(exportList) + 1]] = c("export", expFile)
+    options(.swift.exports=exportList)

Added: SwiftApps/SwiftR/Swift/R/Library.R
--- SwiftApps/SwiftR/Swift/R/Library.R	                        (rev 0)
+++ SwiftApps/SwiftR/Swift/R/Library.R	2011-02-03 20:00:40 UTC (rev 4055)
@@ -0,0 +1,75 @@
+swiftLibrary <- function (packname) {
+    # Add a library to be included on all workers
+    # The package can be provided directly or alternatively
+    # the package name can be provided.
+    # If reset is true, then previously added libraries won't be
+    # reimported on new workers.
+    # Check to see if it is a string, if it is not a string
+    # then we will deparse it to get the expression text
+    if (!is.character(substitute(packname))) {
+        # Maybe library was provided unquoted
+        packname <- deparse(substitute(packname))
+    }
+    if (! packname %in% installed.packages()) {
+        # Warn users in case they made a typo
+        warning(paste("Package", packname, 
+                "was not a installed package in this instance of R,", 
+                "but may be installed on SwiftR workers"))
+    }
+    plist <- getOption(".swift.packages")
+    if (is.null(plist)) {
+        plist <- list()
+    }
+    else {
+        # scan libs for previous detaches and libraries of 
+        # this package.  Delete all of them
+        plist = Filter(function(packcmd) 
+                        { return (packcmd[2] != packname);},
+                        plist)
+    }
+    # Add the library command to the end of the list of commands to be 
+    # executed in slave R instances
+    plist[[length(plist) + 1]] <- c("library", packname)
+    options(.swift.packages=plist)
+swiftDetach <- function (name) {
+    # name is an string or identifier such as "package:OpenMx",
+    #   following the same pattern as the built in R detach() function
+    # Detaches a library from workers and ensures that it will no longer be 
+    # imported.  Note that the effect on dependent imports is a little
+    # messy and behaves differently on existing and new workers.  
+    # If package A requires package B, and we import A on a worker,
+    # package B will also be imported on the worker.  By detaching package A
+    # we don't also detach B.  This contrasts to a fresh worker, where
+    # package B will not be imported.
+    if (!is.character(substitute(name))) {
+        # Maybe package was provided unquoted
+        name <- deparse(substitute(name))
+    }
+    #TODO: remove from options(".swift.packages")
+    # Scan through list and remove any attach or detach of this name
+    # add a detach cmd to the end
+    if (substr(name, 1, 8) != "package:")
+        stop("Can only detach packages.  Package must be specified as package:PackageName")
+    # Get bit after "package:"
+    packname = substr(name, 9, nchar(name))
+    plist = getOption(".swift.packages")
+    if (is.null(plist)) {
+        plist = list()
+    }
+    else {
+        # Previous export/detach commands are now redundant
+        plist = Filter(function(packcmd) 
+                        { return (packcmd[2] != packname);},
+                        plist)
+    }
+    plist[[length(plist) + 1]] = c("detach", packname)
+    options(.swift.packages=plist)

Deleted: SwiftApps/SwiftR/Swift/R/Swift.R
--- SwiftApps/SwiftR/Swift/R/Swift.R	2011-02-03 18:37:26 UTC (rev 4054)
+++ SwiftApps/SwiftR/Swift/R/Swift.R	2011-02-03 20:00:40 UTC (rev 4055)
@@ -1,421 +0,0 @@
-# Modes for directories and files created by SwiftR
-# We only give permissions to the current user.
-kDIR_MODE <- "700"
-kFILE_MODE <- "600"
-swiftapply <- function( func, arglists,
-                        server=NULL,
-                        callsperbatch=NULL,
-                        runmode=NULL,
-                        initialexpr=NULL,
-                        workerhosts=NULL,
-                        keepwork=NULL,
-                        tmpdir=NULL,
-                        timeout=NULL, 
-                        quiet=FALSE)
-  # Set Swift default options if not passed as keywords or pre-set by user
-  if(is.null(server))
-    server <- getOption("swift.server")
-  if(is.null(server))
-    server<-"local"
-  if(is.null(callsperbatch))
-    callsperbatch <- getOption("swift.callsperbatch")
-  if(is.null(callsperbatch)) {
-    # auto batching:
-    # start with 1 batch per core
-    wc <- workerCount(server)
-    if (is.null(wc)) {
-        if (! quiet ) 
-            cat(paste("Information about current server of type", 
-                    server, "not found"))
-        callsperbatch <- 1
-    }
-    else {
-        batches_per_worker <- getOption("swift.batchesperworker")
-        if (is.null(batches_per_worker)) 
-            batches_per_worker <- 1
-        target_batches <- wc * batches_per_worker
-        # We want to have at least batches_per_worker batches of calls
-        # for each worker.  If it doesn't divide evenly, some will get
-        # batches_per_worker + 1 batches.  
-        # callsperbatch < 1 doesn't make sense, hence the use of max
-        callsperbatch = max(1, floor(length(arglists)/target_batches))
-#        if (! quiet ) 
-#            cat(paste("Selected callsperbatch of", callsperbatch, 
-#                    "automatically, based on worker count of",wc,
-#                    "and arg count of ", length(arglists)))
-    }
-  }
-  #cat("Got ", length(arglists), " arguments with batching factor ", callsperbatch)
-  if(is.null(runmode))
-    runmode <- getOption("swift.runmode")
-  if(is.null(runmode))
-    runmode <- "service"
-    # service: send requests to Swift service loop via fifo
-    # script:  run swift for each request, via RunSwiftScript.sh (currently broken)
-    # manual:  for testing, let user run remote R server manually
-  # Build a list of "Library" statements for all of the libraries
-  # already specified through swiftLibrary commands.
-  lib <- getOption(".swift.packages")
-  if (!is.null(lib)) {
-    # library statements
-    stmts <- lapply(lib, 
-                function (libcmd) { 
-                    verb <- libcmd[1];
-                    if (verb == "library") {
-                        return (sprintf("library(%s);", libcmd[2]));
-                    }
-                    else if (verb == "detach") {
-                        return (sprintf("try(detach(package:%s),silent=T);", 
-                                    libcmd[2]));
-                    }
-                    else {
-                        error(paste("invalid verb ", verb, " in ",
-                                ".swift.packages option"))
-                    }
-                })
-    libstr = paste(stmts, collapse=" ")
-    cat("libstr: ", libstr)
-  }
-  else {
-    libstr = ""
-  }
-  if(is.null(initialexpr))
-    initialexpr <- getOption("swift.initialexpr")
-  # Have library imports before other expressions
-  initialexpr <- paste(libstr, initialexpr, collapse=" ");
-  if(is.null(workerhosts))
-    workerhosts <- getOption("swift.workerhosts")
-  if(is.null(workerhosts))
-    workerhosts <- "localhost";
-  if(is.null(keepwork))
-    keepwork <- getOption("swift.keepwork")
-  if(is.null(keepwork))
-    keepwork <- FALSE;
-  if(is.null(tmpdir))
-    tmpdir <- getOption("swift.tmpdir")
-  if(is.null(tmpdir))
-    tmpdir <- Sys.getenv("SWIFTR_TMP");
-  if(tmpdir=="") {
-    if(is.null(tmpdir))
-      tmpdir <- getOption("swift.tmpdir")
-    tmpdir <- "/tmp";
-  }
-  if(is.null(quiet)) {
-    quiet <- getOption("swift.quiet")
-    if(is.null(quiet)) 
-        quiet <- FALSE
-  }
-  if (! quiet) {
-      cat("\nswiftapply to ", length(arglists), " arg lists.\n")
-      cat("\nSwift properties:\n")
-      cat("  server =", server,"\n")
-      cat("  callsperbatch =", callsperbatch,"\n")
-      cat("  runmode =", runmode,"\n")
-      cat("  tmpdir =", tmpdir,"\n")
-      cat("  workerhosts =", workerhosts,"\n")
-      cat("  initialexpr =", initialexpr,"\n\n")
-  }
-  user <- Sys.info()[["user"]]
-  # Initialize globals if first call in this invocation of R
-  # Use the options mechanism so that setting is tied
-  # to lifetime of this R process.  If we stored this in a global
-  # variable, it is possible that, say, directory requests.55555/R0000005
-  # is created, the user exits the session without saving, and therefore
-  # the swift.requestid counter is out of step with the file system
-  requestdirbase = getOption("swift.requestdirbase") 
-  if(!is.null(requestdirbase)) {
-    requestid = getOption("swift.requestid") + 1;
-  }
-  else {
-    requestdirbase = sprintf("%s/%s/SwiftR/requests.P%.5d",tmpdir,user,Sys.getpid())
-    dir.create(requestdirbase,recursive=TRUE,showWarnings=FALSE, 
-            mode=kDIR_MODE)
-    options(swift.requestdirbase=requestdirbase)
-    requestid = 0;
-  }
-  options(swift.requestid=requestid)
-  # Execute the calls in batches
-  reqdir = sprintf("%s/R%.7d",requestdirbase,requestid)
-  dir.create(reqdir,recursive=TRUE,showWarnings=FALSE,mode=kDIR_MODE)
-  if (! quiet) {
-      cat("Swift request is in",reqdir,"\n")
-  }  
-  narglists <- length(arglists) # number of arglists to process
-  batch <- 1   # Next arglist batch number to fill
-  arglist <- 1 # Next arglist number to insert
-  while(arglist <= narglists) {
-    arglistsleft <- narglists - arglist + 1
-    if(arglistsleft >= callsperbatch) {
-      batchsize <- callsperbatch
-    }
-    else {
-      batchsize <- arglistsleft
-    }
-    arglistbatch <- list()
-    for(i in 1 : batchsize) {
-      arglistbatch[[i]] <- arglists[[arglist]]
-      arglist <- arglist +1 
-    }
-    rcall <- list(initializer=initialexpr,func=func,arglistbatch=arglistbatch)
-    save(rcall,file=paste(reqdir,"/cbatch.",as.character(batch),".Rdata",sep=""))
-    batch <- batch + 1;
-  }
-  nbatches <- batch - 1
-  swiftapplyScript <- system.file(package="Swift","exec/swiftapply.swift")
-  if( runmode == "manual" ) { # Prompt for return (empty line) to continue; assumes user ran a manual R to process the call.
-    cat("Manual Swift Run:\n  run dir: ", getwd(), "/", reqdir,"\n\n")
-    cat("  swift script: ", RunSwiftScript, "\n")
-    cat("  server: ", server,"\n")
-    cat("  swiftapplyScript: ", swiftapplyScript,"\n")
-    cat("  Use RunAllR.sh to process and press return when complete:")
-    system(paste("cp ", system.file(package="Swift","exec/RunAllR.sh"), reqdir))
-    readLines(n=1)
-  }
-  else if (runmode == "script") {
-    RunSwiftScript <- system.file(package="Swift","exec/RunSwiftScript.sh")
-    system(paste(RunSwiftScript,reqdir,server,swiftapplyScript,"\"",workerhosts,"\""))
-  }
-  else { # runmode == "service" # FIXME: check and post error if not "service"
-    # Send request to service
-    swiftServerDir = paste(tmpdir,"/",user,"/SwiftR/swift.",server,sep="")
-    requestPipeName=paste(swiftServerDir,"/requestpipe",sep="")
-    resultPipeName=paste(swiftServerDir,"/resultpipe",sep="")
-    # fifo will block irrecoverably if there is no reader on the
-    # other end of the requestPipe.  This is bad.  The swift worker
-    # script is responsible for deleting the request pipe when it
-    # shuts down, so we know if the requestPipe still exists there
-    # should still be a worker (or the worker crashed in a funny way).
-    if (file.exists(requestPipeName)) {
-        # there is a race condition here if the fifo disappears in
-        # between checking for existence and opening the fifo, but
-        # the timeout will catch that unlikely case
-        writeTimeout <- 4000
-        success <- writeFifo(requestPipeName,paste(reqdir,"\n",sep=""), 
-                timeout=writeTimeout)
-        if (! success) {
-            stop(paste("timeout of", writeTimeout, 
-                "ms exceeded when attempting to",
-                "rendezvous with swift server: maybe it is not running or",
-                "it has crashed"))
-        }
-        # Wait for reply from service
-        res <- readFifo(resultPipeName, timeout=timeout)
-        if ((! length(res) == 0) && is.na(res)) {
-            stop(paste("Timeout of ", timeout, "ms exceeded when waiting",
-                "for response from swift server"))
-        }
-        # Check that the message was correct
-        successMsg <- "done"
-        if (res[[1]] != successMsg) 
-            stop(paste("Got unexpected message '", 
-                paste(res, collapse="\n"),"' on fifo ",
-                "aborting job", sep=""))
-    }
-    else {
-        stop(paste("Have you run swiftInit?\n",
-                "It appears that no SwiftR servers of type", server, 
-                "are running, as no request pipe exists in", 
-                swiftServerDir))
-    }
-  }
-  # Fetch the batch results
-  rno <- 1
-  rlist <- list()
-  for(batch in 1:nbatches) {
-    result <- NULL
-    load(paste(reqdir,"/rbatch.",as.character(batch),".Rdata",sep=""))
-    nresults <- length(result)
-    for(r in 1:nresults) {
-      rlist[[rno]] <- result[[r]]
-if(class(result[[r]]) == "try-error") {
-cat("ERROR in eval: ", result[[r]], "\n");
-      #DB cat("swiftapply: result rno=",rno,":\n") # FIXME: for logging
-      #DB cat(rlist[[rno]]@output$gradient,"\n")
-      rno <- rno + 1
-    }
-  }
-  names(rlist) = names(arglists)
-  if( ! keepwork ) {
-    cat("Removing ", reqdir, "\n")
-    unlink(reqdir,recursive=TRUE)
-  }
-  return(rlist)
-swiftLapply <- function( tlist, func, ... )
-  if (length(tlist) == 0) return(tlist)
-  arglists <- list()
-  narglists <- length(tlist)
-  for(i in 1 : narglists) {
-    arglists[[i]] <- list(tlist[[i]], ...);
-  }
-  names(arglists) = names(tlist)
-  swiftapply(func, arglists)
-swiftRemoveAll <- function () {
-    # Cleans up all data in global namespace on workers
-    options(.swift.exports=list(c("removeAll")))
-swiftLibrary <- function (packname) {
-    # Add a library to be included on all workers
-    # The package can be provided directly or alternatively
-    # the package name can be provided.
-    # If reset is true, then previously added libraries won't be
-    # reimported on new workers.
-    # Check to see if it is a string, if it is not a string
-    # then we will deparse it to get the expression text
-    if (!is.character(substitute(packname))) {
-        # Maybe library was provided unquoted
-        packname <- deparse(substitute(packname))
-    }
-    if (! packname %in% installed.packages()) {
-        # Warn users in case they made a typo
-        warning(paste("Package", packname, 
-                "was not a installed package in this instance of R,", 
-                "but may be installed on SwiftR workers"))
-    }
-    plist <- getOption(".swift.packages")
-    if (is.null(plist)) {
-        plist <- list()
-    }
-    else {
-        # scan libs for previous detaches and libraries of 
-        # this package.  Delete all of them
-        plist = Filter(function(packcmd) 
-                        { return (packcmd[2] != packname);},
-                        plist)
-    }
-    # Add the library command to the end of the list of commands to be 
-    # executed in slave R instances
-    plist[[length(plist) + 1]] <- c("library", packname)
-    options(.swift.packages=plist)
-swiftDetach <- function (name) {
-    # name is an string or identifier such as "package:OpenMx",
-    #   following the same pattern as the built in R detach() function
-    # Detaches a library from workers and ensures that it will no longer be 
-    # imported.  Note that the effect on dependent imports is a little
-    # messy and behaves differently on existing and new workers.  
-    # If package A requires package B, and we import A on a worker,
-    # package B will also be imported on the worker.  By detaching package A
-    # we don't also detach B.  This contrasts to a fresh worker, where
-    # package B will not be imported.
-    if (!is.character(substitute(name))) {
-        # Maybe package was provided unquoted
-        name <- deparse(substitute(name))
-    }
-    #TODO: remove from options(".swift.packages")
-    # Scan through list and remove any attach or detach of this name
-    # add a detach cmd to the end
-    if (substr(name, 1, 8) != "package:")
-        stop("Can only detach packages.  Package must be specified as package:PackageName")
-    # Get bit after "package:"
-    packname = substr(name, 9, nchar(name))
-    plist = getOption(".swift.packages")
-    if (is.null(plist)) {
-        plist = list()
-    }
-    else {
-        # Previous export/detach commands are now redundant
-        plist = Filter(function(packcmd) 
-                        { return (packcmd[2] != packname);},
-                        plist)
-    }
-    plist[[length(plist) + 1]] = c("detach", packname)
-    options(.swift.packages=plist)
-swiftExportAll <- function () {
-    return (swiftExport(list=ls(globalenv())))
-#TODO: not implemented
-.swiftExport <- function (..., list=NULL) {
-    # List of object names (as R symbols or as strings)
-    # These will be passed directly to save() to be serialized
-    #TODO:
-    # We want the protocol to support workers coming and going from the 
-    #  cluster.  We want to rely on less implicit state.  Ie. if we run
-    # swiftExport, then a worker is added to cluster, then an apply call
-    # is run, we want the worker to include the exported objects.
-    # One approach to doing this is to enforce the export at apply
-    # time.
-    #
-    #
-    # When swiftExport() is called:
-    # * Save all of the objects to a file 
-    #    - Q:Can we assume its on a shared filesystem?
-    # * MAYBE: put the file somewhere accessible by all workers
-    # * append the file's path to a stored list of files
-    # * MAYBE LATER: asynchronously start swift tasks to distribute
-    #   the data, to get a headstart on the work
-    # When swiftApply() is called:
-    #   - Inform all worker R processes through some mechanism (TBD)
-    #       of the list of all exported data files. (extra file?)
-    #   - The worker goes through the list in order to ensure that name 
-    #       conflicts are resolved in the way you would expect.  
-    #       If it has loaded a file previously, it does nothing.
-    #       If it hasn't loaded the file, it accesses the file (mechanism TBD)
-    #       and loads it into the global namespace.
-    #       data 
-    # 
-    # swiftRemoveAll():
-    #   delete the list of exported files from above and have a single
-    #   entry which instructs workers to delete all their data.  This is
-    #   mainly important because it eliminates possibility of using huge
-    #   amounts of memory.
-    #  swiftRemove()
-    #   Implementation of this function is somewhat problematic, as it only
-    #   partially undoes previous work
-    #Pseudocode:
-    # TODO: choose file location
-    # expFile <- ???
-    save(..., list=list, file=expFile)
-    exportList <- getOption(".swift.exports")
-    if (is.null(exportList))
-        exportList = list() #TODO: start with removeAll command?
-    exportList[[length(exportList) + 1]] = c("export", expFile)
-    options(.swift.exports=exportList)

More information about the Swift-commit mailing list