[Swift-commit] r4057 - SwiftApps/SwiftR/Swift/R
noreply at svn.ci.uchicago.edu
noreply at svn.ci.uchicago.edu
Thu Feb 3 15:40:13 CST 2011
Author: tga
Date: 2011-02-03 15:40:12 -0600 (Thu, 03 Feb 2011)
New Revision: 4057
Modified:
SwiftApps/SwiftR/Swift/R/Apply.R
SwiftApps/SwiftR/Swift/R/Export.R
SwiftApps/SwiftR/Swift/R/Library.R
Log:
Further refactoring of Apply module to make swiftapply less monolithic and more readable.
Modified: SwiftApps/SwiftR/Swift/R/Apply.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Apply.R 2011-02-03 20:19:49 UTC (rev 4056)
+++ SwiftApps/SwiftR/Swift/R/Apply.R 2011-02-03 21:40:12 UTC (rev 4057)
@@ -1,7 +1,11 @@
-# Modes for directories and files created by SwiftR
+
+
+# The Modes for directories and files created by SwiftR
# We only give permissions to the current user.
kDIR_MODE <- "700"
kFILE_MODE <- "600"
+swiftapplyScript <- system.file(package="Swift","exec/swiftapply.swift")
+RunSwiftScript <- system.file(package="Swift","exec/RunSwiftScript.sh")
swiftapply <- function( func, arglists,
server=NULL,
@@ -15,110 +19,35 @@
quiet=FALSE)
{
# Set Swift default options if not passed as keywords or pre-set by user
- if(is.null(server))
- server <- getOption("swift.server")
- if(is.null(server))
- server<-"local"
+ if(is.null(server)) server <- getOption("swift.server")
+ if(is.null(server)) server<-"local"
- if(is.null(callsperbatch))
- callsperbatch <- getOption("swift.callsperbatch")
+ if(is.null(callsperbatch)) callsperbatch <- getOption("swift.callsperbatch")
+ if(is.null(callsperbatch))
+ callsperbatch <- chooseBatchSize(length(arglists), server)
- if(is.null(callsperbatch)) {
- # auto batching:
- # start with 1 batch per core
- wc <- workerCount(server)
- if (is.null(wc)) {
- if (! quiet )
- cat(paste("Information about current server of type",
- server, "not found"))
- callsperbatch <- 1
- }
- else {
- batches_per_worker <- getOption("swift.batchesperworker")
- if (is.null(batches_per_worker))
- batches_per_worker <- 1
- target_batches <- wc * batches_per_worker
- # We want to have at least batches_per_worker batches of calls
- # for each worker. If it doesn't divide evenly, some will get
- # batches_per_worker + 1 batches.
- # callsperbatch < 1 doesn't make sense, hence the use of max
- callsperbatch = max(1, floor(length(arglists)/target_batches))
-# if (! quiet )
-# cat(paste("Selected callsperbatch of", callsperbatch,
-# "automatically, based on worker count of",wc,
-# "and arg count of ", length(arglists)))
- }
- }
- #cat("Got ", length(arglists), " arguments with batching factor ", callsperbatch)
+ # service: send requests to Swift service loop via fifo
+ # script: run swift for each request, via RunSwiftScript.sh (broken)
+ # manual: for testing, let user run remote R server manually
+ if(is.null(runmode)) runmode <- getOption("swift.runmode")
+ if(is.null(runmode)) runmode <- "service"
- if(is.null(runmode))
- runmode <- getOption("swift.runmode")
- if(is.null(runmode))
- runmode <- "service"
- # service: send requests to Swift service loop via fifo
- # script: run swift for each request, via RunSwiftScript.sh (currently broken)
- # manual: for testing, let user run remote R server manually
-
- # Build a list of "Library" statements for all of the libraries
- # already specified through swiftLibrary commands.
- lib <- getOption(".swift.packages")
- if (!is.null(lib)) {
- # library statements
- stmts <- lapply(lib,
- function (libcmd) {
- verb <- libcmd[1];
- if (verb == "library") {
- return (sprintf("library(%s);", libcmd[2]));
- }
- else if (verb == "detach") {
- return (sprintf("try(detach(package:%s),silent=T);",
- libcmd[2]));
- }
- else {
- error(paste("invalid verb ", verb, " in ",
- ".swift.packages option"))
- }
- })
- libstr = paste(stmts, collapse=" ")
- cat("libstr: ", libstr)
- }
- else {
- libstr = ""
- }
-
-
- if(is.null(initialexpr))
- initialexpr <- getOption("swift.initialexpr")
+ if(is.null(initialexpr)) initialexpr <- getOption("swift.initialexpr")
# Have library imports before other expressions
- initialexpr <- paste(libstr, initialexpr, collapse=" ");
+ initialexpr <- paste(buildLibStr(), initialexpr, collapse=" ");
- if(is.null(workerhosts))
- workerhosts <- getOption("swift.workerhosts")
- if(is.null(workerhosts))
- workerhosts <- "localhost";
+ if(is.null(workerhosts)) workerhosts <- getOption("swift.workerhosts")
+ if(is.null(workerhosts)) workerhosts <- "localhost";
- if(is.null(keepwork))
- keepwork <- getOption("swift.keepwork")
- if(is.null(keepwork))
- keepwork <- FALSE;
+ if(is.null(keepwork)) keepwork <- getOption("swift.keepwork")
+ if(is.null(keepwork)) keepwork <- FALSE;
- if(is.null(tmpdir))
- tmpdir <- getOption("swift.tmpdir")
- if(is.null(tmpdir))
- tmpdir <- Sys.getenv("SWIFTR_TMP");
+ if(is.null(tmpdir)) tmpdir <- chooseTmpDir()
- if(tmpdir=="") {
- if(is.null(tmpdir))
- tmpdir <- getOption("swift.tmpdir")
- tmpdir <- "/tmp";
- }
+ if(is.null(quiet)) quiet <- getOption("swift.quiet")
+ if(is.null(quiet)) quiet <- FALSE
- if(is.null(quiet)) {
- quiet <- getOption("swift.quiet")
- if(is.null(quiet))
- quiet <- FALSE
- }
-
+ # Inform user of some call info
if (! quiet) {
cat("\nswiftapply to ", length(arglists), " arg lists.\n")
cat("\nSwift properties:\n")
@@ -132,15 +61,14 @@
# Execute the calls in batches
reqdir <- setupRequestDir(tmpdir=tmpdir)
- if (! quiet) {
- cat("Swift request is in",reqdir,"\n")
- }
-
nbatches <- writeRequestBatches(func, arglists, initialexpr,
reqdir, callsperbatch)
- swiftapplyScript <- system.file(package="Swift","exec/swiftapply.swift")
- if( runmode == "manual" ) { # Prompt for return (empty line) to continue; assumes user ran a manual R to process the call.
+ if (! quiet) cat("Swift request written to: ",reqdir,"\n")
+
+ if( runmode == "manual" ) {
+ # Prompt for return (empty line) to continue; assumes user ran a
+ # manual R to process the call.
cat("Manual Swift Run:\n run dir: ", getwd(), "/", reqdir,"\n\n")
cat(" swift script: ", RunSwiftScript, "\n")
cat(" server: ", server,"\n")
@@ -150,11 +78,10 @@
readLines(n=1)
}
else if (runmode == "script") {
- RunSwiftScript <- system.file(package="Swift","exec/RunSwiftScript.sh")
- system(paste(RunSwiftScript,reqdir,server,swiftapplyScript,"\"",workerhosts,"\""))
+ system(paste(RunSwiftScript,reqdir,server,swiftapplyScript,
+ shQuote(workerhosts)))
}
- else { # runmode == "service" # FIXME: check and post error if not "service"
-
+ else if (runmode == "service") {
# Send request to service
user <- Sys.info()[["user"]]
swiftServerDir = file.path(tmpdir,user,"SwiftR",
@@ -162,74 +89,24 @@
requestPipeName=file.path(swiftServerDir,"requestpipe")
resultPipeName=file.path(swiftServerDir,"resultpipe")
-
- # fifo will block irrecoverably if there is no reader on the
- # other end of the requestPipe. This is bad. The swift worker
- # script is responsible for deleting the request pipe when it
- # shuts down, so we know if the requestPipe still exists there
- # should still be a worker (or the worker crashed in a funny way).
- if (file.exists(requestPipeName)) {
- # there is a race condition here if the fifo disappears in
- # between checking for existence and opening the fifo, but
- # the timeout will catch that unlikely case
- writeTimeout <- 4000
- success <- writeFifo(requestPipeName,paste(reqdir,"\n",sep=""),
- timeout=writeTimeout)
- if (! success) {
- stop(paste("timeout of", writeTimeout,
- "ms exceeded when attempting to",
- "rendezvous with swift server: maybe it is not running or",
- "it has crashed"))
- }
-
- # Wait for reply from service
- res <- readFifo(resultPipeName, timeout=timeout)
- if ((! length(res) == 0) && is.na(res)) {
- stop(paste("Timeout of ", timeout, "ms exceeded when waiting",
- "for response from swift server"))
- }
-
- # Check that the message was correct
- successMsg <- "done"
- if (res[[1]] != successMsg)
- stop(paste("Got unexpected message '",
- paste(res, collapse="\n"),"' on fifo ",
- "aborting job", sep=""))
+ # Try sending: this function will cause error if it fails
+ sendServiceRequest(requestPipeName, reqdir, server)
+
+ res <- getServiceResponse(resultPipeName, timeout)
+ # Check that the message was correct
+ if (res[[1]] != "done") {
+ stop(paste("Got unexpected message '",
+ paste(res, collapse="\n"),"' on fifo ",
+ "aborting job", sep=""))
}
- else {
- stop(paste("Have you run swiftInit?\n",
- "It appears that no SwiftR servers of type", server,
- "are running, as no request pipe exists in",
- swiftServerDir))
- }
}
+ else {
+ stop(paste("Invalid runmode", runmode))
+ }
# Fetch the batch results
-
- rno <- 1
- rlist <- list()
- for(batch in 1:nbatches) {
- result <- NULL
- load(file.path(reqdir,
- paste("/rbatch.",as.character(batch),".Rdata",sep="")))
- nresults <- length(result)
- for(r in 1:nresults) {
- rlist[[rno]] <- result[[r]]
-if(class(result[[r]]) == "try-error") {
-cat("ERROR in eval: ", result[[r]], "\n");
+ return(fetchBatchResults(reqdir, nbatches, arglists, keepwork))
}
- #DB cat("swiftapply: result rno=",rno,":\n") # FIXME: for logging
- #DB cat(rlist[[rno]]@output$gradient,"\n")
- rno <- rno + 1
- }
- }
- names(rlist) = names(arglists)
- if( ! keepwork ) {
- cat("Removing ", reqdir, "\n")
- unlink(reqdir,recursive=TRUE)
- }
- return(rlist)
-}
swiftLapply <- function( tlist, func, ... )
{
@@ -299,3 +176,111 @@
}
return (batch - 1)
}
+
+fetchBatchResults <- function (reqdir, nbatches, arglists, keepwork) {
+    # Load rbatch.<i>.Rdata for i = 1..nbatches from reqdir and collect the
+    # per-call results, in order, into one list named after arglists.
+    # Removes reqdir afterwards unless keepwork is TRUE.
+    rlist <- list()
+    # seq_len() rather than 1:nbatches, so zero batches means zero loads
+    for (batch in seq_len(nbatches)) {
+        # The result in the file will be named "result"
+        result <- NULL
+        load(file.path(reqdir, paste("rbatch.", batch, ".Rdata", sep="")))
+        for (r in seq_along(result)) {
+            if (inherits(result[[r]], "try-error")) {
+                cat("ERROR in eval: ", result[[r]], "\n")
+            }
+            # Append as a one-element list so that a NULL result keeps its
+            # slot; rlist[[i]] <- NULL assigns nothing and lost trailing NULLs
+            rlist[length(rlist) + 1] <- list(result[[r]])
+        }
+    }
+    names(rlist) <- names(arglists)
+    if (!keepwork) {
+        cat("Removing ", reqdir, "\n")
+        unlink(reqdir, recursive=TRUE)
+    }
+    return(rlist)
+}
+
+chooseBatchSize <- function (numargs, server,
+                             quiet=isTRUE(getOption("swift.quiet"))) {
+    # Automatic selection of the number of calls per batch.
+    # Returns 1 if workerCount() knows nothing about server, otherwise
+    # numargs split over workers * getOption("swift.batchesperworker")
+    # batches (option default 1). quiet suppresses the notice below; it
+    # was an undefined free variable before becoming an argument.
+    wc <- workerCount(server)
+    if (is.null(wc)) {
+        if (!quiet)
+            cat(paste("Information about current server of type",
+                      server, "not found"))
+        return(1)
+    }
+    batches_per_worker <- getOption("swift.batchesperworker")
+    if (is.null(batches_per_worker))
+        batches_per_worker <- 1
+    target_batches <- wc * batches_per_worker
+    # If the calls don't divide evenly, some workers get one batch more.
+    # callsperbatch < 1 doesn't make sense, hence the use of max
+    return(max(1, floor(numargs / target_batches)))
+}
+
+chooseTmpDir <- function () {
+    # Choose the temporary directory: the "swift.tmpdir" option, else the
+    # SWIFTR_TMP environment variable, else "/tmp".
+    tmpdir <- getOption("swift.tmpdir")
+    if (is.null(tmpdir))
+        tmpdir <- Sys.getenv("SWIFTR_TMP")
+    if (tmpdir == "")
+        tmpdir <- "/tmp"
+    # Explicit return: ending on an if() whose test is FALSE yields NULL
+    return(tmpdir)
+}
+
+sendServiceRequest <- function (requestPipeName, reqdir, server=NULL) {
+    # Hand the request directory name to the Swift service by writing it,
+    # newline-terminated, to the request fifo. Stops with an error if the
+    # fifo does not exist or if writeFifo() reports failure.
+    #
+    # Opening a fifo with no reader on the other end blocks for good, so
+    # check first that the pipe exists: the swift worker script deletes
+    # it on shutdown, hence an existing pipe should mean a live worker
+    # (or a worker that crashed in a funny way).
+    if (!file.exists(requestPipeName)) {
+        stop(paste("Have you run swiftInit?\n",
+                   "It appears that no SwiftR servers of type", server,
+                   "are running, as no request pipe exists at",
+                   requestPipeName))
+    }
+    # The fifo can still vanish between the check above and the open
+    # below; the write timeout covers that unlikely race.
+    timeoutMs <- 4000
+    request <- paste(reqdir, "\n", sep="")
+    delivered <- writeFifo(requestPipeName, request, timeout=timeoutMs)
+    if (!delivered) {
+        stop(paste("timeout of", timeoutMs,
+                   "ms exceeded when attempting to",
+                   "rendezvous with SwiftR server of type", server, ".\n",
+                   "Maybe it is not running or it has crashed"))
+    }
+
+}
+
+getServiceResponse <- function (resultPipeName, timeout) {
+    # Blocking wait for response from service on a pipe.
+    # Returns the response read by readFifo(). Raises an error if it is NA
+    # (treated as a timeout) or empty. The length is tested first because
+    # if() on a zero-length is.na() result is itself an error.
+    res <- readFifo(resultPipeName, timeout=timeout)
+    if (length(res) == 0) {
+        stop(paste("Zero length response on named pipe ", resultPipeName))
+    }
+    if (is.na(res[[1]])) {
+        stop(paste("Timeout of ", timeout, "ms exceeded when waiting",
+                   "for response from swift server"))
+    }
+    return (res)
+}
+
Modified: SwiftApps/SwiftR/Swift/R/Export.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Export.R 2011-02-03 20:19:49 UTC (rev 4056)
+++ SwiftApps/SwiftR/Swift/R/Export.R 2011-02-03 21:40:12 UTC (rev 4057)
@@ -1,4 +1,44 @@
-
+# EXPORT MECHANISM:
+#==================
+# We want the protocol to support workers coming and going from the
+# cluster. We also want to be robust to worker failures. This
+# means that we can't rely on state being retained at workers.
+# I.e., if a new worker is added, we need it to receive all of the
+# relevant exported data. However, for the purposes of efficiency,
+# we don't want workers to have to repeatedly reload data they have
+# already loaded.
+#
+# In order to achieve this, we will have the workers perform imports
+# at apply time. The worker will receive information as a part of the
+#
+#
+#
+#
+# When swiftExport() is called:
+# * Save all of the objects to a file
+# - Q: Can we assume it's on a shared filesystem?
+# * MAYBE: put the file somewhere accessible by all workers
+# * append the file's path to a stored list of files
+# * MAYBE LATER: asynchronously start swift tasks to distribute
+# the data, to get a headstart on the work
+# When swiftApply() is called:
+# - Inform all worker R processes through some mechanism (TBD)
+# of the list of all exported data files. (extra file?)
+# - The worker goes through the list in order to ensure that name
+# conflicts are resolved in the way you would expect.
+# If it has loaded a file previously, it does nothing.
+# If it hasn't loaded the file, it accesses the file (mechanism TBD)
+# and loads it into the global namespace.
+# data
+#
+# swiftRemoveAll():
+# delete the list of exported files from above and have a single
+# entry which instructs workers to delete all their data. This is
+# mainly important because it eliminates possibility of using huge
+# amounts of memory.
+# swiftRemove()
+# Implementation of this function is somewhat problematic, as it only
+# partially undoes previous work
swiftRemoveAll <- function () {
# Cleans up all data in global namespace on workers
options(.swift.exports=list(c("removeAll")))
@@ -7,6 +47,7 @@
swiftExportAll <- function () {
+ # Exports all functions and data in global environment
return (swiftExport(list=ls(globalenv())))
}
@@ -15,40 +56,6 @@
# List of object names (as R symbols or as strings)
# These will be passed directly to save() to be serialized
- #TODO:
- # We want the protocol to support workers coming and going from the
- # cluster. We want to rely on less implicit state. Ie. if we run
- # swiftExport, then a worker is added to cluster, then an apply call
- # is run, we want the worker to include the exported objects.
- # One approach to doing this is to enforce the export at apply
- # time.
- #
- #
- # When swiftExport() is called:
- # * Save all of the objects to a file
- # - Q:Can we assume its on a shared filesystem?
- # * MAYBE: put the file somewhere accessible by all workers
- # * append the file's path to a stored list of files
- # * MAYBE LATER: asynchronously start swift tasks to distribute
- # the data, to get a headstart on the work
- # When swiftApply() is called:
- # - Inform all worker R processes through some mechanism (TBD)
- # of the list of all exported data files. (extra file?)
- # - The worker goes through the list in order to ensure that name
- # conflicts are resolved in the way you would expect.
- # If it has loaded a file previously, it does nothing.
- # If it hasn't loaded the file, it accesses the file (mechanism TBD)
- # and loads it into the global namespace.
- # data
- #
- # swiftRemoveAll():
- # delete the list of exported files from above and have a single
- # entry which instructs workers to delete all their data. This is
- # mainly important because it eliminates possibility of using huge
- # amounts of memory.
- # swiftRemove()
- # Implementation of this function is somewhat problematic, as it only
- # partially undoes previous work
#Pseudocode:
Modified: SwiftApps/SwiftR/Swift/R/Library.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Library.R 2011-02-03 20:19:49 UTC (rev 4056)
+++ SwiftApps/SwiftR/Swift/R/Library.R 2011-02-03 21:40:12 UTC (rev 4057)
@@ -94,3 +94,32 @@
options(.swift.packages=plist)
}
+
+buildLibStr <- function () {
+    # Build one string of R statements replaying the library/detach
+    # commands recorded in the ".swift.packages" option (set through the
+    # swiftLibrary commands). Returns "" if nothing has been recorded.
+    lib <- getOption(".swift.packages")
+    if (is.null(lib)) {
+        return ("")
+    }
+    # Each entry is c(verb, package name)
+    stmts <- vapply(lib,
+        function (libcmd) {
+            verb <- libcmd[1]
+            if (verb == "library") {
+                return (sprintf("library(%s);", libcmd[2]))
+            }
+            else if (verb == "detach") {
+                return (sprintf("try(detach(package:%s),silent=T);",
+                                libcmd[2]))
+            }
+            else {
+                # stop(), not error(): R has no error() function, so this
+                # branch used to fail with "could not find function"
+                stop(paste("invalid verb ", verb, " in ",
+                           ".swift.packages option"))
+            }
+        }, character(1))
+    return (paste(stmts, collapse=" "))
+}
More information about the Swift-commit
mailing list