[Swift-commit] r4062 - in SwiftApps/SwiftR/Swift: R exec

noreply at svn.ci.uchicago.edu noreply at svn.ci.uchicago.edu
Mon Feb 7 11:21:08 CST 2011


Author: tga
Date: 2011-02-07 11:21:08 -0600 (Mon, 07 Feb 2011)
New Revision: 4062

Modified:
   SwiftApps/SwiftR/Swift/R/Apply.R
   SwiftApps/SwiftR/Swift/R/Export.R
   SwiftApps/SwiftR/Swift/R/Workers.R
   SwiftApps/SwiftR/Swift/exec/SwiftRServer.R
Log:
* swiftExport, swiftExportAll and swiftRemoveAll functions implemented.
    - However, they rely on the user manually specifying an export file on
        a filesystem readable by the workers.  This will be fixed next.
    - These functions are implemented using a session abstraction which allows
        workers newly joining a swift cluster to "catch up" on previous 
        exports when an apply call is made, while not requiring workers to 
        repeatedly import the same files
* Functions on host now run in a separate environment from the global 
    environment, which can be tidied up when removeAll is invoked.  
    It is no longer necessary to use the <<- operator in swift initial 
    expressions: any assignment made with the <- operator in an initializer
    will be visible to any function being run.
* Any references to variables or functions made in the body of an implied 
    function that match the name of an exported variable/function resolve 
    correctly
* Bugfix: count the number of cores correctly for ssh workers
* Bugfix: pass the quiet argument through to where it is required



Modified: SwiftApps/SwiftR/Swift/R/Apply.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Apply.R	2011-02-04 18:42:48 UTC (rev 4061)
+++ SwiftApps/SwiftR/Swift/R/Apply.R	2011-02-07 17:21:08 UTC (rev 4062)
@@ -24,7 +24,7 @@
 
   if(is.null(callsperbatch)) callsperbatch <- getOption("swift.callsperbatch")
   if(is.null(callsperbatch)) 
-    callsperbatch <- chooseBatchSize(length(arglists), server)
+    callsperbatch <- chooseBatchSize(length(arglists), server, quiet=quiet)
 
   # service: send requests to Swift service loop via fifo
   # script:  run swift for each request, via RunSwiftScript.sh (broken)
@@ -150,7 +150,11 @@
 writeRequestBatches <- function (func, arglists, initialexpr, 
                         reqdir, callsperbatch, exportlist=NULL) {
 
-  if (is.null(exportlist)) exportlist <- getOption(".swift.exports")
+  if (is.null(exportlist)) exportlist <- getOption(".swift.session")
+  if (is.null(exportlist)) {
+    exportlist <- newSession()
+    options(.swift.session = exportlist)
+  }
   # Write the function call info out to cbatch.?.RData files in reqdir 
   # in batches of size specified by callsperbatch 
   # returns the number of batches written
@@ -208,7 +212,7 @@
   return(rlist)
 }
 
-chooseBatchSize <- function (numargs, server) {
+chooseBatchSize <- function (numargs, server, quiet=FALSE) {
     # Automatic selection of worker count
     # start with 1 batch per core
     wc <- workerCount(server)

Modified: SwiftApps/SwiftR/Swift/R/Export.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Export.R	2011-02-04 18:42:48 UTC (rev 4061)
+++ SwiftApps/SwiftR/Swift/R/Export.R	2011-02-07 17:21:08 UTC (rev 4062)
@@ -47,22 +47,37 @@
 }
 
 #TODO: not implemented
-swiftExport <- function (..., list=NULL, expFile=NULL) {
+swiftExport <- function (..., list=NULL, file=NULL) {
     # List of object names (as R symbols or as strings)
     # These will be passed directly to save() to be serialized
     # TODO: For now,while testing, must manually specify location
     # TODO: choose file location
     # expFile <- ???
-    save(..., list=list, file=expFile)
-    exportList <- getOption(".swift.exports")
-    if (is.null(exportList))
-        exportList = list() #TODO: start with removeAll command?
-    exportList[[length(exportList) + 1]] = c("import", expFile)
-    options(.swift.exports=exportList)
+    if (is.null(list))
+        save(..., file=file)
+    else
+        save(..., list=list, file=file)
+    session <- getOption(".swift.session")
+    if (is.null(session))
+        session <- newSession()
+    session$exports[[length(session$exports) + 1]] <- file
+    options(.swift.session=session)
 
 }
 
+newSession <- function () {
+    exportList= list()
+    exportList$session <- as.integer(runif(1, min=0, max=.Machine$integer.max))
+    exportList$exports <- list()
+    return (exportList)
+}
+
 swiftRemoveAll <- function () {
     # Cleans up all data in global namespace on workers
-    options(.swift.exports=list(c("removeAll")))
+    # This is achieved by starting a new "session"
+    # Use session numbers to track whether a worker has the
+    # correct data
+    # TODO: think about how to clean up data files.
+    # it is hard to know when they can be safely removed
+    options(.swift.session=newSession())
 }

Modified: SwiftApps/SwiftR/Swift/R/Workers.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Workers.R	2011-02-04 18:42:48 UTC (rev 4061)
+++ SwiftApps/SwiftR/Swift/R/Workers.R	2011-02-07 17:21:08 UTC (rev 4062)
@@ -66,7 +66,12 @@
 
     if(is.null(hosts))
         hosts <- getOption("swift.hosts")
-    if(! is.null(hosts) )  {
+    if( is.null(hosts) )  {
+        if (server == "ssh") {
+            error(paste("Need to provide hosts list for ssh server."))
+        }
+    }
+    else {
         if (length(hosts) > 1) {
             # Concatenate list of hosts
             hosts <- paste(hosts, collapse=" ")
@@ -95,7 +100,10 @@
         if(is.null(nodes))
             nodes <- getOption("swift.nodes")
         if(is.null(nodes))
-            nodes <- 1 # Default value
+            if (server == "ssh")
+                nodes <- length(strsplit(hosts, " "))
+            else 
+                nodes <- 1 # Default value
         if(! is.null(nodes) )  {
             cmdString <- paste(cmdString, "-n", shQuote(nodes)) 
         }

Modified: SwiftApps/SwiftR/Swift/exec/SwiftRServer.R
===================================================================
--- SwiftApps/SwiftR/Swift/exec/SwiftRServer.R	2011-02-04 18:42:48 UTC (rev 4061)
+++ SwiftApps/SwiftR/Swift/exec/SwiftRServer.R	2011-02-07 17:21:08 UTC (rev 4062)
@@ -7,7 +7,7 @@
 
 cat(Sys.getpid(),file=paste(fifoDir,"/R.pid",sep=""))
 
-latestInitializer <- "";
+.current.initializer <<- ""
 
 SwiftRFifoServer <- function( fifoBasename )
 {
@@ -45,14 +45,12 @@
   }
 }
 
-doInit <- function(initializer) {
+doInit <- function(initializer, env) {
   #print(sprintf("received initializer=%s latestInitializer=%s\n",
   #                         initializer, latestInitializer));
-  if( initializer != latestInitializer) {
-    initialExpr <- parse(text=initializer)
-    eval(initialExpr)
-    latestInitializer <<- initializer
-    # print(sprintf("after eval: latestInitializer=%s\n", latestInitializer));
+  if( initializer != .current.initializer) {
+    eval(parse(text=initializer), envir=env)
+    .current.initializer <<- initializer
   }
 }
 
@@ -73,16 +71,35 @@
   success <- try(load(callBatchFileName));
   if (inherits(success, "try-error")) {
     failBatch(rcall, success, resultBatchFileName)
-    return
+    return()
   }
-  success <- try(doInit(rcall$initializer))
+  success <- try(loadImports(rcall$imports))
   if (inherits(success, "try-error")) {
     failBatch(rcall, success, resultBatchFileName)
-    return
+    return()
   }
-  success <- try(loadImports(rcall$imports))
+
+  if (exists(".current.envir", envir=globalenv())) {
+    env = get(".current.envir", envir=globalenv())
+  }
+  else {
+    # Create a new environment to work in
+    if (is.null(importlist) || is.null(importlist$session))  {
+        # If the client provides no info about the environment,
+        # we should create a new environment to work in
+        env = new.env(parent=parent.env(globalenv()))
+    }
+    else {
+        newSession(importlist$session)    
+        env = get(".current.envir", envir=globalenv())
+    }
+  }
+  environment(rcall$func) <- env
+
+  success <- try(doInit(rcall$initializer, env))
   if (inherits(success, "try-error")) {
     failBatch(rcall, success, resultBatchFileName)
+    return()
   }
 
   result <- list()
@@ -95,52 +112,61 @@
   save(result,file=resultBatchFileName)
 }
 
+newSession <- function (session) {
+    cat("Setting up new session\n", file=stderr())
+    #Setup new session
+    .current.session <<- session
+    # Use an environment as a hash table to track imports
+    .current.imported <<- new.env(hash=T, parent=emptyenv())
+    # Create an environment to store imported items in
+    # and to evaluate apply calls in.  Note that the environment
+    # does not point to the global environment as its parent, which
+    # means that assignments and variable lookups will not touch the global
+    # environment by default, and will be contained in this session.
+    # However, the <<- assignment operator and the get, assign, etc. functions
+    # can still access the global environment if used in the applied function.
+
+    .current.envir <<- new.env(parent=parent.env(globalenv()))
+
+    .current.initializer <<- ""
+}
+
 loadImports <- function (importlist) {
-    # importlist is a list of directives to be carried out in sequence.
-    # There are two distinct directives which appear in the first position:
-    #   - removeAll removes all global R objects 
-    #   - import comes with an argument in the second position which is
-    #       the location of an R file to import
-    for (imp in importlist) {
-        directive <- imp[1]
-        if (directive == "import") {
-            # Load the contents of the specified file
-            # into the global environment
-            file <- imp[2]
-            if (!exists(".swift.imported")) {
-                .swift.imported <<- new.env(hash=T, parent=emptyenv())
-                doImport <- TRUE
-            }
-            else {
-                # check to see if already imported
-                doImport <- !exists(file, envir=.swift.imported)
-            }
-            if (doImport) {
-                #TODO: load can fail with warning
-                load(file, envir=globalenv()) 
-                # if an error occurs here, assume calling function
-                # will catch it
-                assign(file, TRUE, envir=.swift.imported)
-                cat("Loaded file ", file, "\n")
-            }
-            else {
-                cat("Ignored file ", file, "\n")
-            }
+    # First check whether the session id has changed
+    cat(paste("Import list:", importlist$exports, "\n"), file=stderr())
+    cat(paste("New Session:", importlist$session, "\n"), file=stderr())
+    cat(paste("Old Session:", try(get(".current.session", envir=globalenv())), 
+                "\n"), file=stderr())
+    doSetup <- FALSE
+    if (!is.null(importlist)) {
+        if (exists(".current.session", envir=globalenv())) {
+            if (importlist$session != .current.session) 
+                doSetup <- TRUE
         }
-        else if (directive == "removeAll") {
-            # delete all visible objects in global environment
-            # TODO: need to record that we did this removeAll
-            cat("Deleting ", ls(envir=globalenv()))
-            #TODO: this fails because it deletes the functions in 
-            # this file from the environment
-            rm(list=ls(envir=globalenv()))
-            # reset record of imported items
-            if (exists(".swift.imported", envir=globalenv()))
-                rm(.swift.imported, envir=globalenv()) 
-            cat("Deleted all")
+        else {
+            doSetup <- TRUE
         }
+    }
+    if (doSetup) {
+        newSession(importlist$session)
+    }
+
+    for (file in importlist$exports) {
+        # Load the contents of the specified file
+        # into the current session environment (.current.envir)
+
+        cat("File: ", file, "\n")
+        # check to see if already imported
+        if (!exists(file, envir=.current.imported)) {
+            #TODO: load can fail with warning
+            load(file, envir=.current.envir) 
+            # if an error occurs here, assume calling function
+            # will catch it
+            assign(file, TRUE, envir=.current.imported)
+            cat("Loaded file ", file, "\n")
+        }
         else {
-            stop(paste("Invalid import directive", directive))
+            cat("Ignored file ", file, "\n")
         }
     }
 }




More information about the Swift-commit mailing list