[Swift-commit] r4062 - in SwiftApps/SwiftR/Swift: R exec
noreply at svn.ci.uchicago.edu
noreply at svn.ci.uchicago.edu
Mon Feb 7 11:21:08 CST 2011
Author: tga
Date: 2011-02-07 11:21:08 -0600 (Mon, 07 Feb 2011)
New Revision: 4062
Modified:
SwiftApps/SwiftR/Swift/R/Apply.R
SwiftApps/SwiftR/Swift/R/Export.R
SwiftApps/SwiftR/Swift/R/Workers.R
SwiftApps/SwiftR/Swift/exec/SwiftRServer.R
Log:
* swiftExport, swiftExportAll and swiftRemoveAll functions implemented.
- However, they rely on the user manually specifying an export file on
a filesystem readable by the workers. This will be fixed next.
- These functions are implemented using a session abstraction which allows
workers newly joining a swift cluster to "catch up" on previous
exports when an apply call is made, while not requiring workers to
repeatedly import the same files
* Functions on host now run in a separate environment from the global
environment, which can be tidied up when removeAll is invoked.
It is no longer necessary to use the <<- operator in swift initial
expressions: any assignment made with the <- operator in an initializer
will be visible to any function being run.
* Any references to variables or functions made in the body of an implied
function that match the name of an exported variable/function resolve
correctly
* Bugfix: count the number of cores correctly for ssh workers
* Bugfix: pass quiet argument to where it is required
Modified: SwiftApps/SwiftR/Swift/R/Apply.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Apply.R 2011-02-04 18:42:48 UTC (rev 4061)
+++ SwiftApps/SwiftR/Swift/R/Apply.R 2011-02-07 17:21:08 UTC (rev 4062)
@@ -24,7 +24,7 @@
if(is.null(callsperbatch)) callsperbatch <- getOption("swift.callsperbatch")
if(is.null(callsperbatch))
- callsperbatch <- chooseBatchSize(length(arglists), server)
+ callsperbatch <- chooseBatchSize(length(arglists), server, quiet=quiet)
# service: send requests to Swift service loop via fifo
# script: run swift for each request, via RunSwiftScript.sh (broken)
@@ -150,7 +150,11 @@
writeRequestBatches <- function (func, arglists, initialexpr,
reqdir, callsperbatch, exportlist=NULL) {
- if (is.null(exportlist)) exportlist <- getOption(".swift.exports")
+ if (is.null(exportlist)) exportlist <- getOption(".swift.session")
+ if (is.null(exportlist)) {
+ exportlist <- newSession()
+ options(.swift.session = exportlist)
+ }
# Write the function call info out to cbatch.?.RData files in reqdir
# in batches of size specified by callsperbatch
# returns the number of batches written
@@ -208,7 +212,7 @@
return(rlist)
}
-chooseBatchSize <- function (numargs, server) {
+chooseBatchSize <- function (numargs, server, quiet=FALSE) {
# Automatic selection of worker count
# start with 1 batch per core
wc <- workerCount(server)
Modified: SwiftApps/SwiftR/Swift/R/Export.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Export.R 2011-02-04 18:42:48 UTC (rev 4061)
+++ SwiftApps/SwiftR/Swift/R/Export.R 2011-02-07 17:21:08 UTC (rev 4062)
@@ -47,22 +47,37 @@
}
#TODO: not implemented
-swiftExport <- function (..., list=NULL, expFile=NULL) {
+swiftExport <- function (..., list=NULL, file=NULL) {
# List of object names (as R symbols or as strings)
# These will be passed directly to save() to be serialized
# TODO: For now,while testing, must manually specify location
# TODO: choose file location
# expFile <- ???
- save(..., list=list, file=expFile)
- exportList <- getOption(".swift.exports")
- if (is.null(exportList))
- exportList = list() #TODO: start with removeAll command?
- exportList[[length(exportList) + 1]] = c("import", expFile)
- options(.swift.exports=exportList)
+ if (is.null(list))
+ save(..., file=file)
+ else
+ save(..., list=list, file=file)
+ session <- getOption(".swift.session")
+ if (is.null(session))
+ session <- newSession()
+ session$exports[[length(session$exports) + 1]] <- file
+ options(.swift.session=session)
}
+newSession <- function () {
+ exportList= list()
+ exportList$session <- as.integer(runif(1, min=0, max=.Machine$integer.max))
+ exportList$exports <- list()
+ return (exportList)
+}
+
swiftRemoveAll <- function () {
# Cleans up all data in global namespace on workers
- options(.swift.exports=list(c("removeAll")))
+ # This is achieved by starting a new "session"
+ # Use session numbers to track whether a worker has the
+ # correct data
+ # TODO: think about how to clean up data files.
+ # it is hard to know when they can be safely removed
+ options(.swift.session=newSession())
}
Modified: SwiftApps/SwiftR/Swift/R/Workers.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Workers.R 2011-02-04 18:42:48 UTC (rev 4061)
+++ SwiftApps/SwiftR/Swift/R/Workers.R 2011-02-07 17:21:08 UTC (rev 4062)
@@ -66,7 +66,12 @@
if(is.null(hosts))
hosts <- getOption("swift.hosts")
- if(! is.null(hosts) ) {
+ if( is.null(hosts) ) {
+ if (server == "ssh") {
+ error(paste("Need to provide hosts list for ssh server."))
+ }
+ }
+ else {
if (length(hosts) > 1) {
# Concatenate list of hosts
hosts <- paste(hosts, collapse=" ")
@@ -95,7 +100,10 @@
if(is.null(nodes))
nodes <- getOption("swift.nodes")
if(is.null(nodes))
- nodes <- 1 # Default value
+ if (server == "ssh")
+ nodes <- length(strsplit(hosts, " "))
+ else
+ nodes <- 1 # Default value
if(! is.null(nodes) ) {
cmdString <- paste(cmdString, "-n", shQuote(nodes))
}
Modified: SwiftApps/SwiftR/Swift/exec/SwiftRServer.R
===================================================================
--- SwiftApps/SwiftR/Swift/exec/SwiftRServer.R 2011-02-04 18:42:48 UTC (rev 4061)
+++ SwiftApps/SwiftR/Swift/exec/SwiftRServer.R 2011-02-07 17:21:08 UTC (rev 4062)
@@ -7,7 +7,7 @@
cat(Sys.getpid(),file=paste(fifoDir,"/R.pid",sep=""))
-latestInitializer <- "";
+.current.initializer <<- ""
SwiftRFifoServer <- function( fifoBasename )
{
@@ -45,14 +45,12 @@
}
}
-doInit <- function(initializer) {
+doInit <- function(initializer, env) {
#print(sprintf("received initializer=%s latestInitializer=%s\n",
# initializer, latestInitializer));
- if( initializer != latestInitializer) {
- initialExpr <- parse(text=initializer)
- eval(initialExpr)
- latestInitializer <<- initializer
- # print(sprintf("after eval: latestInitializer=%s\n", latestInitializer));
+ if( initializer != .current.initializer) {
+ eval(parse(text=initializer), envir=env)
+ .current.initializer <<- initializer
}
}
@@ -73,16 +71,35 @@
success <- try(load(callBatchFileName));
if (inherits(success, "try-error")) {
failBatch(rcall, success, resultBatchFileName)
- return
+ return()
}
- success <- try(doInit(rcall$initializer))
+ success <- try(loadImports(rcall$imports))
if (inherits(success, "try-error")) {
failBatch(rcall, success, resultBatchFileName)
- return
+ return()
}
- success <- try(loadImports(rcall$imports))
+
+ if (exists(".current.envir", envir=globalenv())) {
+ env = get(".current.envir", envir=globalenv())
+ }
+ else {
+ # Create a new environment to work in
+ if (is.null(importlist) || is.null(importlist$session)) {
+ # If the client provides no info about the environment,
+ # we should create a new environment to work in
+ env = new.env(parent=parent.env(globalenv()))
+ }
+ else {
+ newSession(importlist$session)
+ env = get(".current.envir", envir=globalenv())
+ }
+ }
+ environment(rcall$func) <- env
+
+ success <- try(doInit(rcall$initializer, env))
if (inherits(success, "try-error")) {
failBatch(rcall, success, resultBatchFileName)
+ return()
}
result <- list()
@@ -95,52 +112,61 @@
save(result,file=resultBatchFileName)
}
+newSession <- function (session) {
+ cat("Setting up new session\n", file=stderr())
+ #Setup new session
+ .current.session <<- session
+ # Use an environment as a hash table to track imports
+ .current.imported <<- new.env(hash=T, parent=emptyenv())
+ # Create an environment to store imported items in
+ # and to evaluate apply calls in. Note that the environment
+ # does not point to the global environment as its parent, which means
+ # means that assignments and variable lookusp will not touch the global
+ # environment by default, and will be contained in this session.
+ # However, the <<- assignment operator and the get, assign, etc function
+ # can still access the global enviroment if used in the applied function.
+
+ .current.envir <<- new.env(parent=parent.env(globalenv()))
+
+ .current.initializer <<- ""
+}
+
loadImports <- function (importlist) {
- # importlist is a list of directives to be carried out in sequence.
- # There are two distinct directives which appear in the first position:
- # - removeAll removes all global R objects
- # - import comes with an argument in the second position which is
- # the location of an R file to import
- for (imp in importlist) {
- directive <- imp[1]
- if (directive == "import") {
- # Load the contents of the specified file
- # into the global environment
- file <- imp[2]
- if (!exists(".swift.imported")) {
- .swift.imported <<- new.env(hash=T, parent=emptyenv())
- doImport <- TRUE
- }
- else {
- # check to see if already imported
- doImport <- !exists(file, envir=.swift.imported)
- }
- if (doImport) {
- #TODO: load can fail with warning
- load(file, envir=globalenv())
- # if an error occurs here, assume calling function
- # will catch it
- assign(file, TRUE, envir=.swift.imported)
- cat("Loaded file ", file, "\n")
- }
- else {
- cat("Ignored file ", file, "\n")
- }
+ # First check whether the session id has changed
+ cat(paste("Import list:", importlist$exports, "\n"), file=stderr())
+ cat(paste("New Session:", importlist$session, "\n"), file=stderr())
+ cat(paste("Old Session:", try(get(".current.session", envir=globalenv())),
+ "\n"), file=stderr())
+ doSetup <- FALSE
+ if (!is.null(importlist)) {
+ if (exists(".current.session", envir=globalenv())) {
+ if (importlist$session != .current.session)
+ doSetup <- TRUE
}
- else if (directive == "removeAll") {
- # delete all visible objects in global environment
- # TODO: need to record that we did this removeAll
- cat("Deleting ", ls(envir=globalenv()))
- #TODO: this fails because it deletes the functions in
- # this file from the environment
- rm(list=ls(envir=globalenv()))
- # reset record of imported items
- if (exists(".swift.imported", envir=globalenv()))
- rm(.swift.imported, envir=globalenv())
- cat("Deleted all")
+ else {
+ doSetup <- TRUE
}
+ }
+ if (doSetup) {
+ newSession(importlist$session)
+ }
+
+ for (file in importlist$exports) {
+ # Load the contents of the specified file
+ # into the global environment
+
+ cat("File: ", file, "\n")
+ # check to see if already imported
+ if (!exists(file, envir=.current.imported)) {
+ #TODO: load can fail with warning
+ load(file, envir=.current.envir)
+ # if an error occurs here, assume calling function
+ # will catch it
+ assign(file, TRUE, envir=.current.imported)
+ cat("Loaded file ", file, "\n")
+ }
else {
- stop(paste("Invalid import directive", directive))
+ cat("Ignored file ", file, "\n")
}
}
}
More information about the Swift-commit
mailing list