[Swift-commit] r4055 - in SwiftApps/SwiftR: . Swift Swift/R
noreply at svn.ci.uchicago.edu
noreply at svn.ci.uchicago.edu
Thu Feb 3 14:00:40 CST 2011
Author: tga
Date: 2011-02-03 14:00:40 -0600 (Thu, 03 Feb 2011)
New Revision: 4055
Added:
SwiftApps/SwiftR/Swift/NAMESPACE
SwiftApps/SwiftR/Swift/R/Apply.R
SwiftApps/SwiftR/Swift/R/Export.R
SwiftApps/SwiftR/Swift/R/Library.R
Removed:
SwiftApps/SwiftR/Swift/R/Swift.R
Modified:
SwiftApps/SwiftR/Makefile
Log:
Refactored R code a bit:
* Added NAMESPACE declaration so that only functions intended to be exported
will be exported
* Split Swift.R into Export.R, Library.R and Apply.R as it was getting rather long
* Refactored swiftapply to use some subprocedures.
Modified: SwiftApps/SwiftR/Makefile
===================================================================
--- SwiftApps/SwiftR/Makefile 2011-02-03 18:37:26 UTC (rev 4054)
+++ SwiftApps/SwiftR/Makefile 2011-02-03 20:00:40 UTC (rev 4055)
@@ -3,6 +3,7 @@
# Files belonging to the Swift R package: R sources, Rd manual pages, and
# everything under exec/ and inst/ except Subversion metadata directories.
PKG_FILES = $(shell find Swift/R -name '*.R'; find Swift/man -name '*.Rd'; find Swift/exec/ -type f -not -path '*/.svn*'; find Swift/inst/ -type f -not -path '*/.svn*')
PKG_FILES += Swift/DESCRIPTION
+# NAMESPACE declares which R functions the package exports.
+PKG_FILES += Swift/NAMESPACE
# Extract the version number from the R package description file
Added: SwiftApps/SwiftR/Swift/NAMESPACE
===================================================================
--- SwiftApps/SwiftR/Swift/NAMESPACE (rev 0)
+++ SwiftApps/SwiftR/Swift/NAMESPACE 2011-02-03 20:00:40 UTC (rev 4055)
@@ -0,0 +1,10 @@
+# Public SwiftR API; every other function stays internal to the package.
+export(swiftapply, swiftLapply,
+       swiftInit, swiftShutdown,
+       swiftLibrary, swiftDetach)
+
+# Test functions
+export(runAllSwiftTests, basicSwiftTest)
+exportPattern("^swiftTest")
Copied: SwiftApps/SwiftR/Swift/R/Apply.R (from rev 4054, SwiftApps/SwiftR/Swift/R/Swift.R)
===================================================================
--- SwiftApps/SwiftR/Swift/R/Apply.R (rev 0)
+++ SwiftApps/SwiftR/Swift/R/Apply.R 2011-02-03 20:00:40 UTC (rev 4055)
@@ -0,0 +1,301 @@
+# Permission modes, as octal strings, for the directories and files that
+# SwiftR creates.  Only the current user gets any access.
+kDIR_MODE <- "700"   # rwx------
+kFILE_MODE <- "600"  # rw-------
+
+swiftapply <- function( func, arglists,
+                        server=NULL,
+                        callsperbatch=NULL,
+                        runmode=NULL,
+                        initialexpr=NULL,
+                        workerhosts=NULL,
+                        keepwork=NULL,
+                        tmpdir=NULL,
+                        timeout=NULL,
+                        quiet=FALSE)
+{
+  # Evaluate func on every argument list in `arglists` using SwiftR workers
+  # and return the results as a list in the same order, and with the same
+  # names, as `arglists`.
+  #
+  # func:          the function to apply.
+  # arglists:      list of argument lists, one per call.
+  # server:        SwiftR server type (option swift.server, default "local").
+  # callsperbatch: calls per batch file (option swift.callsperbatch).  When
+  #                unset it is derived from workerCount(server) and option
+  #                swift.batchesperworker (default 1).
+  # runmode:       "service" (default), "script" or "manual"; option
+  #                swift.runmode.  Any other value is an error.
+  # initialexpr:   R code run on the workers before the calls, after the
+  #                statements registered with swiftLibrary()/swiftDetach().
+  #                Option swift.initialexpr.
+  # workerhosts:   passed to RunSwiftScript.sh in "script" mode (option
+  #                swift.workerhosts, default "localhost").
+  # keepwork:      keep the request directory afterwards (option
+  #                swift.keepwork, default FALSE).
+  # tmpdir:        base directory for requests and server pipes (option
+  #                swift.tmpdir, else $SWIFTR_TMP, else "/tmp").
+  # timeout:       passed to readFifo() while waiting for the service reply.
+  # quiet:         suppress progress output; NULL means option swift.quiet.
+  #
+  # Calls that failed on a worker come back as "try-error" objects; each is
+  # reported on the console and returned in its slot.
+
+  # Nothing to run: skip the round trip to the workers.
+  if (length(arglists) == 0) {
+    rlist <- list()
+    names(rlist) <- names(arglists)
+    return(rlist)
+  }
+
+  # Resolve quiet first: it is already consulted while auto-batching below.
+  quiet <- .swiftResolveOption(quiet, "swift.quiet", FALSE)
+  server <- .swiftResolveOption(server, "swift.server", "local")
+
+  callsperbatch <- .swiftResolveOption(callsperbatch, "swift.callsperbatch")
+  if (is.null(callsperbatch)) {
+    # Auto batching: aim for batches_per_worker batches per worker.  If the
+    # calls don't divide evenly, some workers get one batch more.
+    wc <- workerCount(server)
+    if (is.null(wc)) {
+      if (!quiet)
+        cat(paste("Information about current server of type",
+                  server, "not found\n"))
+      callsperbatch <- 1
+    } else {
+      batches_per_worker <- .swiftResolveOption(NULL, "swift.batchesperworker", 1)
+      target_batches <- wc * batches_per_worker
+      # callsperbatch < 1 doesn't make sense, hence the use of max
+      callsperbatch <- max(1, floor(length(arglists) / target_batches))
+    }
+  }
+
+  # service: send requests to Swift service loop via fifo
+  # script:  run swift for each request, via RunSwiftScript.sh (currently broken)
+  # manual:  for testing, let user run remote R server manually
+  runmode <- .swiftResolveOption(runmode, "swift.runmode", "service")
+  if (!(runmode %in% c("service", "script", "manual")))
+    stop("invalid swift runmode '", runmode,
+         "': expected \"service\", \"script\" or \"manual\"")
+
+  # Library imports/detaches registered through swiftLibrary() and
+  # swiftDetach() run before any other initial expression.
+  libstr <- .swiftLibraryStatements(getOption(".swift.packages"))
+  initialexpr <- .swiftResolveOption(initialexpr, "swift.initialexpr")
+  initialexpr <- paste(libstr, initialexpr, collapse=" ")
+
+  workerhosts <- .swiftResolveOption(workerhosts, "swift.workerhosts", "localhost")
+  keepwork <- .swiftResolveOption(keepwork, "swift.keepwork", FALSE)
+  # Sys.getenv() gives "" when SWIFTR_TMP is unset; "" falls back to /tmp.
+  tmpdir <- .swiftResolveOption(tmpdir, "swift.tmpdir", Sys.getenv("SWIFTR_TMP"))
+  if (tmpdir == "")
+    tmpdir <- "/tmp"
+
+  if (!quiet) {
+    cat("\nswiftapply to ", length(arglists), " arg lists.\n")
+    cat("\nSwift properties:\n")
+    cat(" server =", server,"\n")
+    cat(" callsperbatch =", callsperbatch,"\n")
+    cat(" runmode =", runmode,"\n")
+    cat(" tmpdir =", tmpdir,"\n")
+    cat(" workerhosts =", workerhosts,"\n")
+    cat(" initialexpr =", initialexpr,"\n\n")
+  }
+
+  # Write the calls out in batches
+  reqdir <- setupRequestDir(tmpdir=tmpdir)
+  if (!quiet)
+    cat("Swift request is in",reqdir,"\n")
+  nbatches <- writeRequestBatches(func, arglists, initialexpr,
+                                  reqdir, callsperbatch)
+
+  # Have the batches executed
+  swiftapplyScript <- system.file(package="Swift","exec/swiftapply.swift")
+  # Defined up front: the "manual" branch prints it too.
+  RunSwiftScript <- system.file(package="Swift","exec/RunSwiftScript.sh")
+  if (runmode == "manual") {
+    # Prompt for return (empty line) to continue; assumes the user ran a
+    # manual R to process the call.
+    cat("Manual Swift Run:\n run dir: ", normalizePath(reqdir), "\n\n")
+    cat(" swift script: ", RunSwiftScript, "\n")
+    cat(" server: ", server,"\n")
+    cat(" swiftapplyScript: ", swiftapplyScript,"\n")
+    cat(" Use RunAllR.sh to process and press return when complete:")
+    file.copy(system.file(package="Swift","exec/RunAllR.sh"), reqdir)
+    readLines(n=1)
+  } else if (runmode == "script") {
+    system(paste(RunSwiftScript,reqdir,server,swiftapplyScript,"\"",workerhosts,"\""))
+  } else {
+    .swiftServiceRequest(reqdir, server, tmpdir, timeout)
+  }
+
+  # Fetch the batch results
+  rlist <- .swiftCollectResults(reqdir, nbatches)
+  names(rlist) <- names(arglists)
+  if (!keepwork) {
+    if (!quiet)
+      cat("Removing ", reqdir, "\n")
+    unlink(reqdir,recursive=TRUE)
+  }
+  rlist
+}
+
+.swiftResolveOption <- function(value, optname, default = NULL) {
+  # Resolve one swiftapply setting: the explicit argument if non-NULL, else
+  # R option `optname` if set, else `default`.
+  if (is.null(value))
+    value <- getOption(optname)
+  if (is.null(value))
+    value <- default
+  value
+}
+
+.swiftLibraryStatements <- function(packages) {
+  # Turn the ".swift.packages" command list (built by swiftLibrary() and
+  # swiftDetach()) into a single string of R statements for the workers.
+  # Returns "" when no commands are registered; stops on an unknown verb.
+  if (is.null(packages))
+    return("")
+  stmts <- vapply(packages, function(libcmd) {
+    verb <- libcmd[1]
+    if (verb == "library") {
+      sprintf("library(%s);", libcmd[2])
+    } else if (verb == "detach") {
+      sprintf("try(detach(package:%s),silent=T);", libcmd[2])
+    } else {
+      stop(paste("invalid verb ", verb, " in ",
+                 ".swift.packages option"))
+    }
+  }, character(1))
+  paste(stmts, collapse=" ")
+}
+
+.swiftServiceRequest <- function(reqdir, server, tmpdir, timeout) {
+  # Hand request directory `reqdir` to the running SwiftR service for
+  # `server` through its request fifo, then block until the service answers
+  # on the result fifo.  Stops on timeout or on any reply other than "done".
+  user <- Sys.info()[["user"]]
+  swiftServerDir <- file.path(tmpdir, user, "SwiftR",
+                              paste("swift.", server, sep=""))
+  requestPipeName <- file.path(swiftServerDir, "requestpipe")
+  resultPipeName <- file.path(swiftServerDir, "resultpipe")
+
+  # fifo will block irrecoverably if there is no reader on the other end of
+  # the requestPipe.  The swift worker script deletes the request pipe when
+  # it shuts down, so if the pipe still exists there should still be a
+  # worker (or the worker crashed in a funny way).
+  if (!file.exists(requestPipeName)) {
+    stop(paste("Have you run swiftInit?\n",
+               "It appears that no SwiftR servers of type", server,
+               "are running, as no request pipe exists in",
+               swiftServerDir))
+  }
+
+  # There is a race if the fifo disappears between the check above and the
+  # open below, but the timeout catches that unlikely case.
+  writeTimeout <- 4000
+  success <- writeFifo(requestPipeName, paste(reqdir, "\n", sep=""),
+                       timeout=writeTimeout)
+  if (!success) {
+    stop(paste("timeout of", writeTimeout,
+               "ms exceeded when attempting to",
+               "rendezvous with swift server: maybe it is not running or",
+               "it has crashed"))
+  }
+
+  # Wait for the reply; only the first element is inspected so that a
+  # multi-line reply cannot feed a length > 1 condition to `&&`.
+  res <- readFifo(resultPipeName, timeout=timeout)
+  if (length(res) > 0 && is.na(res[[1]])) {
+    stop(paste("Timeout of ", timeout, "ms exceeded when waiting",
+               "for response from swift server"))
+  }
+  successMsg <- "done"
+  if (length(res) == 0 || res[[1]] != successMsg)
+    stop(paste("Got unexpected message '",
+               paste(res, collapse="\n"),"' on fifo ",
+               "aborting job", sep=""))
+  invisible(NULL)
+}
+
+.swiftCollectResults <- function(reqdir, nbatches) {
+  # Read rbatch.1.Rdata .. rbatch.<nbatches>.Rdata from reqdir and return
+  # the elements of each file's `result` object, in order, as one list.
+  # "try-error" elements are also reported on the console.
+  rlist <- list()
+  for (batch in seq_len(nbatches)) {
+    # Load into a private environment so the file cannot clobber locals.
+    batchenv <- new.env()
+    load(file.path(reqdir, paste("rbatch.", as.character(batch), ".Rdata", sep="")),
+         envir=batchenv)
+    result <- batchenv$result
+    for (r in seq_along(result)) {
+      if (inherits(result[[r]], "try-error"))
+        cat("ERROR in eval: ", result[[r]], "\n")
+      # `[<-` with list() keeps a NULL result in its slot; `[[<-` with NULL
+      # would silently drop it and misalign the names set by the caller.
+      rlist[length(rlist) + 1] <- list(result[[r]])
+    }
+  }
+  rlist
+}
+
+swiftLapply <- function( tlist, func, ... )
+{
+  # Parallel analogue of lapply(tlist, func, ...): func(tlist[[i]], ...) is
+  # evaluated through swiftapply() for every element of tlist, and the
+  # results carry the names of tlist.  An empty tlist is returned as is.
+  if (length(tlist) == 0)
+    return(tlist)
+  # One argument list per element: the element followed by the shared
+  # extra arguments.  lapply() carries the names of tlist over.
+  arglists <- lapply(tlist, function(elem) list(elem, ...))
+  swiftapply(func, arglists)
+}
+
+
+setupRequestDir <- function (tmpdir) {
+  # Create and return a fresh directory for one swiftapply request:
+  #   <tmpdir>/<user>/SwiftR/requests.P<pid>/R<requestid>
+  # created with mode kDIR_MODE.  Stops if the directory cannot be created,
+  # rather than letting later writes into it fail confusingly.
+  #
+  # The base directory and counter live in options() (swift.requestdirbase,
+  # swift.requestid) so that they are tied to the lifetime of this R
+  # process.  A global variable could be saved with the workspace: e.g.
+  # requests.55555/R0000005 is created, the user exits without saving, and
+  # the restored counter would be out of step with the file system.
+  # Because the base is fixed on the first call, tmpdir only matters then.
+  requestdirbase <- getOption("swift.requestdirbase")
+  if (is.null(requestdirbase)) {
+    requestdirbase <- sprintf("%s/%s/SwiftR/requests.P%.5d", tmpdir,
+                              Sys.info()[["user"]], Sys.getpid())
+    dir.create(requestdirbase, recursive=TRUE, showWarnings=FALSE,
+               mode=kDIR_MODE)
+    options(swift.requestdirbase=requestdirbase)
+    requestid <- 0
+  } else {
+    # A missing counter restarts numbering at 0.
+    requestid <- getOption("swift.requestid", -1) + 1
+  }
+  options(swift.requestid=requestid)
+  reqdir <- sprintf("%s/R%.7d", requestdirbase, requestid)
+  dir.create(reqdir, recursive=TRUE, showWarnings=FALSE, mode=kDIR_MODE)
+  if (!isTRUE(file.info(reqdir)$isdir))
+    stop("Could not create SwiftR request directory ", reqdir)
+  reqdir
+}
+
+
+writeRequestBatches <- function (func, arglists, initialexpr,
+                                 reqdir, callsperbatch) {
+  # Write the calls of one swiftapply request into reqdir as
+  # cbatch.1.Rdata, cbatch.2.Rdata, ...  Each file holds one object `rcall`,
+  # a list with elements
+  #   initializer  - initialexpr
+  #   func         - the function to apply
+  #   arglistbatch - unnamed list of up to callsperbatch argument lists,
+  #                  in their original order.
+  #
+  # callsperbatch: maximum calls per batch; non-integral values are rounded
+  #   down, numeric strings are accepted, and values below 1 are an error.
+  # Returns the number of batch files written (0 for empty arglists).
+  cpb <- suppressWarnings(as.numeric(callsperbatch))
+  if (length(cpb) != 1 || is.na(cpb) || cpb < 1)
+    stop("callsperbatch must be a single number >= 1, not '",
+         paste(callsperbatch, collapse = " "), "'")
+  batchsize <- floor(cpb)
+
+  narglists <- length(arglists)
+  if (narglists == 0)
+    return(0)
+  # batchids[i] is the batch that call i goes into: 1,...,1,2,...,2,...
+  batchids <- ceiling(seq_len(narglists) / batchsize)
+  nbatches <- batchids[narglists]
+  for (batch in seq_len(nbatches)) {
+    # Subset with `[` rather than appending with `[[<-`, so that a NULL
+    # argument list keeps its slot instead of silently disappearing.
+    arglistbatch <- unname(as.list(arglists)[batchids == batch])
+    rcall <- list(initializer=initialexpr, func=func, arglistbatch=arglistbatch)
+    # batch is an integer here, so large numbers never print as "1e+05".
+    save(rcall,
+         file=file.path(reqdir,
+                        paste("cbatch.", as.character(batch), ".Rdata", sep="")))
+  }
+  nbatches
+}
Property changes on: SwiftApps/SwiftR/Swift/R/Apply.R
___________________________________________________________________
Name: svn:mergeinfo
+
Added: SwiftApps/SwiftR/Swift/R/Export.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Export.R (rev 0)
+++ SwiftApps/SwiftR/Swift/R/Export.R 2011-02-03 20:00:40 UTC (rev 4055)
@@ -0,0 +1,64 @@
+
+swiftRemoveAll <- function () {
+  # Reset the ".swift.exports" command list to a single "removeAll"
+  # command, meant to clean up all data in the global namespace on workers.
+  # Returns the previous option value invisibly, as options() does.
+  removeAllCmds <- list(c("removeAll"))
+  options(.swift.exports = removeAllCmds)
+}
+
+
+
+swiftExportAll <- function () {
+  # Export every object in the global environment to the SwiftR workers.
+  # NOTE(review): calls swiftExport(), which is not defined in this file
+  # (only the unimplemented .swiftExport() is) -- confirm it exists elsewhere.
+  globalNames <- ls(globalenv())
+  swiftExport(list = globalNames)
+}
+
+#TODO: not implemented -- no default location for export files exists yet.
+.swiftExport <- function (..., list=NULL, file=NULL, envir=parent.frame()) {
+  # Save objects so that SwiftR workers can load them, and record the file
+  # as c("export", file) at the end of the ".swift.exports" option.
+  #
+  # ...:   objects to export, as R symbols or strings; passed to save().
+  # list:  character vector of object names; passed to save().
+  # file:  where to save the objects.  Required for now, because choosing a
+  #        location reachable by all workers is still open (notes below).
+  # envir: environment the objects are taken from (default: the caller's).
+  # Returns the previous ".swift.exports" value invisibly, as options() does.
+
+  # Design notes:
+  # We want the protocol to support workers coming and going from the
+  # cluster. We want to rely on less implicit state. Ie. if we run
+  # swiftExport, then a worker is added to cluster, then an apply call
+  # is run, we want the worker to include the exported objects.
+  # One approach to doing this is to enforce the export at apply
+  # time.
+  #
+  # When swiftExport() is called:
+  # * Save all of the objects to a file
+  #   - Q: Can we assume it's on a shared filesystem?
+  # * MAYBE: put the file somewhere accessible by all workers
+  # * append the file's path to a stored list of files
+  # * MAYBE LATER: asynchronously start swift tasks to distribute
+  #   the data, to get a headstart on the work
+  # When swiftApply() is called:
+  # - Inform all worker R processes through some mechanism (TBD)
+  #   of the list of all exported data files. (extra file?)
+  # - The worker goes through the list in order to ensure that name
+  #   conflicts are resolved in the way you would expect.
+  #   If it has loaded a file previously, it does nothing.
+  #   If it hasn't loaded the file, it accesses the file (mechanism TBD)
+  #   and loads it into the global namespace.
+  #
+  # swiftRemoveAll():
+  #   delete the list of exported files from above and have a single
+  #   entry which instructs workers to delete all their data. This is
+  #   mainly important because it eliminates possibility of using huge
+  #   amounts of memory.
+  # swiftRemove()
+  #   Implementation of this function is somewhat problematic, as it only
+  #   partially undoes previous work
+
+  if (is.null(file))
+    stop(".swiftExport() is not implemented yet: no location for export ",
+         "files has been chosen, so 'file' must be given")
+  save(..., list=list, file=file, envir=envir)
+  exportList <- getOption(".swift.exports")
+  if (is.null(exportList))
+    exportList <- list() #TODO: start with removeAll command?
+  exportList[[length(exportList) + 1]] <- c("export", file)
+  options(.swift.exports=exportList)
+}
Added: SwiftApps/SwiftR/Swift/R/Library.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Library.R (rev 0)
+++ SwiftApps/SwiftR/Swift/R/Library.R 2011-02-03 20:00:40 UTC (rev 4055)
@@ -0,0 +1,75 @@
+
+swiftLibrary <- function (packname) {
+  # Register a package to be loaded with library() on all SwiftR workers
+  # before the calls of each swiftapply().
+  #
+  # packname: the package name, given either as a string ("OpenMx") or
+  #   unquoted (OpenMx), as with library().
+  #
+  # Any earlier swiftLibrary()/swiftDetach() entry for the same package is
+  # dropped and c("library", packname) is appended to the ".swift.packages"
+  # option.  Warns if the package is not installed locally, since that may
+  # be a typo (it may still be installed on the workers).
+  # Returns the previous option value invisibly, as options() does.
+
+  # Not a string: use the expression text, as library() does for an
+  # unquoted name.
+  if (!is.character(substitute(packname))) {
+    packname <- deparse(substitute(packname))
+  }
+  # Match against package names only: installed.packages() is a matrix
+  # whose other columns (paths, versions, licences, priority, ...) would
+  # otherwise produce false matches.
+  if (!(packname %in% installed.packages()[, "Package"])) {
+    warning(paste("Package", packname,
+                  "was not a installed package in this instance of R,",
+                  "but may be installed on SwiftR workers"))
+  }
+  plist <- getOption(".swift.packages")
+  if (is.null(plist)) {
+    plist <- list()
+  } else {
+    # Earlier library/detach commands for this package are now redundant.
+    plist <- Filter(function(packcmd) packcmd[2] != packname, plist)
+  }
+  # Append the library command to those executed in worker R instances.
+  plist[[length(plist) + 1]] <- c("library", packname)
+  options(.swift.packages=plist)
+}
+
+
+swiftDetach <- function (name) {
+  # Stop loading a package on SwiftR workers and detach it from workers.
+  #
+  # name: "package:PkgName", as a string or unquoted, following the same
+  #   pattern as the built-in R detach() function.
+  #
+  # Any earlier swiftLibrary()/swiftDetach() entry for PkgName is dropped
+  # from the ".swift.packages" option and c("detach", PkgName) is appended;
+  # swiftapply() turns it into try(detach(package:PkgName)) on the workers.
+  # Returns the previous option value invisibly, as options() does.
+  #
+  # Caveat: the effect on dependent imports is messy and differs between
+  # existing and new workers.  If package A requires package B and A was
+  # imported on a worker, B was imported too; detaching A does not detach B
+  # there, whereas a fresh worker never imports B at all.
+  if (!is.character(substitute(name))) {
+    # Package was probably provided unquoted, e.g. package:OpenMx
+    name <- deparse(substitute(name))
+  }
+  usage <- "Can only detach packages. Package must be specified as package:PackageName"
+  prefix <- "package:"
+  if (substr(name, 1, nchar(prefix)) != prefix)
+    stop(usage)
+  packname <- substr(name, nchar(prefix) + 1, nchar(name))
+  if (packname == "")
+    stop(usage)
+  plist <- getOption(".swift.packages")
+  if (is.null(plist)) {
+    plist <- list()
+  } else {
+    # Previous library/detach commands for this package are now redundant
+    plist <- Filter(function(packcmd) packcmd[2] != packname, plist)
+  }
+  plist[[length(plist) + 1]] <- c("detach", packname)
+  options(.swift.packages=plist)
+}
Deleted: SwiftApps/SwiftR/Swift/R/Swift.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Swift.R 2011-02-03 18:37:26 UTC (rev 4054)
+++ SwiftApps/SwiftR/Swift/R/Swift.R 2011-02-03 20:00:40 UTC (rev 4055)
@@ -1,421 +0,0 @@
-# Modes for directories and files created by SwiftR
-# We only give permissions to the current user.
-# kDIR_MODE is passed as the `mode` of dir.create() for request directories.
-kDIR_MODE <- "700"
-kFILE_MODE <- "600"
-
-# swiftapply(func, arglists, ...): run func on each argument list in
-# arglists through SwiftR workers and return the results as a list named
-# like arglists.  Settings left NULL fall back to swift.* options, then to
-# built-in defaults.  (Removed here; the function continues in Apply.R.)
-swiftapply <- function( func, arglists,
- server=NULL,
- callsperbatch=NULL,
- runmode=NULL,
- initialexpr=NULL,
- workerhosts=NULL,
- keepwork=NULL,
- tmpdir=NULL,
- timeout=NULL,
- quiet=FALSE)
-{
- # Set Swift default options if not passed as keywords or pre-set by user
- if(is.null(server))
- server <- getOption("swift.server")
- if(is.null(server))
- server<-"local"
-
- if(is.null(callsperbatch))
- callsperbatch <- getOption("swift.callsperbatch")
-
- if(is.null(callsperbatch)) {
- # auto batching:
- # start with 1 batch per core
- wc <- workerCount(server)
- if (is.null(wc)) {
- # NOTE(review): quiet is read here before its is.null() fallback further
- # down, so quiet=NULL fails at this point.
- if (! quiet )
- cat(paste("Information about current server of type",
- server, "not found"))
- callsperbatch <- 1
- }
- else {
- batches_per_worker <- getOption("swift.batchesperworker")
- if (is.null(batches_per_worker))
- batches_per_worker <- 1
- target_batches <- wc * batches_per_worker
- # We want to have at least batches_per_worker batches of calls
- # for each worker. If it doesn't divide evenly, some will get
- # batches_per_worker + 1 batches.
- # callsperbatch < 1 doesn't make sense, hence the use of max
- callsperbatch = max(1, floor(length(arglists)/target_batches))
-# if (! quiet )
-# cat(paste("Selected callsperbatch of", callsperbatch,
-# "automatically, based on worker count of",wc,
-# "and arg count of ", length(arglists)))
- }
- }
- #cat("Got ", length(arglists), " arguments with batching factor ", callsperbatch)
-
- if(is.null(runmode))
- runmode <- getOption("swift.runmode")
- if(is.null(runmode))
- runmode <- "service"
- # service: send requests to Swift service loop via fifo
- # script: run swift for each request, via RunSwiftScript.sh (currently broken)
- # manual: for testing, let user run remote R server manually
-
- # Build a list of "Library" statements for all of the libraries
- # already specified through swiftLibrary commands.
- lib <- getOption(".swift.packages")
- if (!is.null(lib)) {
- # library statements
- stmts <- lapply(lib,
- function (libcmd) {
- verb <- libcmd[1];
- if (verb == "library") {
- return (sprintf("library(%s);", libcmd[2]));
- }
- else if (verb == "detach") {
- return (sprintf("try(detach(package:%s),silent=T);",
- libcmd[2]));
- }
- else {
- # NOTE(review): error() is not a base R function; stop() was presumably
- # intended, so an invalid verb fails with "could not find function".
- error(paste("invalid verb ", verb, " in ",
- ".swift.packages option"))
- }
- })
- libstr = paste(stmts, collapse=" ")
- cat("libstr: ", libstr)
- }
- else {
- libstr = ""
- }
-
-
- if(is.null(initialexpr))
- initialexpr <- getOption("swift.initialexpr")
- # Have library imports before other expressions
- initialexpr <- paste(libstr, initialexpr, collapse=" ");
-
- if(is.null(workerhosts))
- workerhosts <- getOption("swift.workerhosts")
- if(is.null(workerhosts))
- workerhosts <- "localhost";
-
- if(is.null(keepwork))
- keepwork <- getOption("swift.keepwork")
- if(is.null(keepwork))
- keepwork <- FALSE;
-
- if(is.null(tmpdir))
- tmpdir <- getOption("swift.tmpdir")
- if(is.null(tmpdir))
- tmpdir <- Sys.getenv("SWIFTR_TMP");
-
- # An empty tmpdir (e.g. SWIFTR_TMP unset) falls back to /tmp; the inner
- # is.null() check below can never be TRUE at this point.
- if(tmpdir=="") {
- if(is.null(tmpdir))
- tmpdir <- getOption("swift.tmpdir")
- tmpdir <- "/tmp";
- }
-
- if(is.null(quiet)) {
- quiet <- getOption("swift.quiet")
- if(is.null(quiet))
- quiet <- FALSE
- }
-
- if (! quiet) {
- cat("\nswiftapply to ", length(arglists), " arg lists.\n")
- cat("\nSwift properties:\n")
- cat(" server =", server,"\n")
- cat(" callsperbatch =", callsperbatch,"\n")
- cat(" runmode =", runmode,"\n")
- cat(" tmpdir =", tmpdir,"\n")
- cat(" workerhosts =", workerhosts,"\n")
- cat(" initialexpr =", initialexpr,"\n\n")
- }
- user <- Sys.info()[["user"]]
-
- # Initialize globals if first call in this invocation of R
- # Use the options mechanism so that setting is tied
- # to lifetime of this R process. If we stored this in a global
- # variable, it is possible that, say, directory requests.55555/R0000005
- # is created, the user exits the session without saving, and therefore
- # the swift.requestid counter is out of step with the file system
- requestdirbase = getOption("swift.requestdirbase")
- if(!is.null(requestdirbase)) {
- requestid = getOption("swift.requestid") + 1;
- }
- else {
- requestdirbase = sprintf("%s/%s/SwiftR/requests.P%.5d",tmpdir,user,Sys.getpid())
- dir.create(requestdirbase,recursive=TRUE,showWarnings=FALSE,
- mode=kDIR_MODE)
- options(swift.requestdirbase=requestdirbase)
- requestid = 0;
- }
- options(swift.requestid=requestid)
-
- # Execute the calls in batches
-
- reqdir = sprintf("%s/R%.7d",requestdirbase,requestid)
- dir.create(reqdir,recursive=TRUE,showWarnings=FALSE,mode=kDIR_MODE)
- if (! quiet) {
- cat("Swift request is in",reqdir,"\n")
- }
-
- # Write the calls to cbatch.<n>.Rdata files of up to callsperbatch calls.
- narglists <- length(arglists) # number of arglists to process
- batch <- 1 # Next arglist batch number to fill
- arglist <- 1 # Next arglist number to insert
- while(arglist <= narglists) {
- arglistsleft <- narglists - arglist + 1
- if(arglistsleft >= callsperbatch) {
- batchsize <- callsperbatch
- }
- else {
- batchsize <- arglistsleft
- }
- arglistbatch <- list()
- for(i in 1 : batchsize) {
- arglistbatch[[i]] <- arglists[[arglist]]
- arglist <- arglist +1
- }
- rcall <- list(initializer=initialexpr,func=func,arglistbatch=arglistbatch)
- save(rcall,file=paste(reqdir,"/cbatch.",as.character(batch),".Rdata",sep=""))
- batch <- batch + 1;
- }
- nbatches <- batch - 1
- swiftapplyScript <- system.file(package="Swift","exec/swiftapply.swift")
-
- if( runmode == "manual" ) { # Prompt for return (empty line) to continue; assumes user ran a manual R to process the call.
- cat("Manual Swift Run:\n run dir: ", getwd(), "/", reqdir,"\n\n")
- # NOTE(review): RunSwiftScript is only assigned in the "script" branch
- # below, so the next line fails in "manual" mode.
- cat(" swift script: ", RunSwiftScript, "\n")
- cat(" server: ", server,"\n")
- cat(" swiftapplyScript: ", swiftapplyScript,"\n")
- cat(" Use RunAllR.sh to process and press return when complete:")
- system(paste("cp ", system.file(package="Swift","exec/RunAllR.sh"), reqdir))
- readLines(n=1)
- }
- else if (runmode == "script") {
- RunSwiftScript <- system.file(package="Swift","exec/RunSwiftScript.sh")
- system(paste(RunSwiftScript,reqdir,server,swiftapplyScript,"\"",workerhosts,"\""))
- }
- else { # runmode == "service" # FIXME: check and post error if not "service"
-
- # Send request to service
-
- swiftServerDir = paste(tmpdir,"/",user,"/SwiftR/swift.",server,sep="")
-
- requestPipeName=paste(swiftServerDir,"/requestpipe",sep="")
- resultPipeName=paste(swiftServerDir,"/resultpipe",sep="")
-
- # fifo will block irrecoverably if there is no reader on the
- # other end of the requestPipe. This is bad. The swift worker
- # script is responsible for deleting the request pipe when it
- # shuts down, so we know if the requestPipe still exists there
- # should still be a worker (or the worker crashed in a funny way).
- if (file.exists(requestPipeName)) {
- # there is a race condition here if the fifo disappears in
- # between checking for existence and opening the fifo, but
- # the timeout will catch that unlikely case
- writeTimeout <- 4000
- success <- writeFifo(requestPipeName,paste(reqdir,"\n",sep=""),
- timeout=writeTimeout)
- if (! success) {
- stop(paste("timeout of", writeTimeout,
- "ms exceeded when attempting to",
- "rendezvous with swift server: maybe it is not running or",
- "it has crashed"))
- }
-
- # Wait for reply from service
- res <- readFifo(resultPipeName, timeout=timeout)
- if ((! length(res) == 0) && is.na(res)) {
- stop(paste("Timeout of ", timeout, "ms exceeded when waiting",
- "for response from swift server"))
- }
-
- # Check that the message was correct
- successMsg <- "done"
- if (res[[1]] != successMsg)
- stop(paste("Got unexpected message '",
- paste(res, collapse="\n"),"' on fifo ",
- "aborting job", sep=""))
- }
- else {
- stop(paste("Have you run swiftInit?\n",
- "It appears that no SwiftR servers of type", server,
- "are running, as no request pipe exists in",
- swiftServerDir))
- }
- }
-
- # Fetch the batch results
-
- rno <- 1
- rlist <- list()
- # NOTE(review): 1:nbatches is c(1, 0) when nbatches is 0 (empty arglists),
- # and rlist[[rno]] <- NULL does not store NULL results.
- for(batch in 1:nbatches) {
- result <- NULL
- load(paste(reqdir,"/rbatch.",as.character(batch),".Rdata",sep=""))
- nresults <- length(result)
- for(r in 1:nresults) {
- rlist[[rno]] <- result[[r]]
-if(class(result[[r]]) == "try-error") {
-cat("ERROR in eval: ", result[[r]], "\n");
-}
- #DB cat("swiftapply: result rno=",rno,":\n") # FIXME: for logging
- #DB cat(rlist[[rno]]@output$gradient,"\n")
- rno <- rno + 1
- }
- }
- names(rlist) = names(arglists)
- if( ! keepwork ) {
- cat("Removing ", reqdir, "\n")
- unlink(reqdir,recursive=TRUE)
- }
- return(rlist)
-}
-
-# swiftLapply(tlist, func, ...): lapply()-style wrapper that builds one
-# argument list, list(tlist[[i]], ...), per element and runs them all
-# through swiftapply(); an empty tlist is returned unchanged.
-swiftLapply <- function( tlist, func, ... )
-{
- if (length(tlist) == 0) return(tlist)
- arglists <- list()
- narglists <- length(tlist)
- for(i in 1 : narglists) {
- arglists[[i]] <- list(tlist[[i]], ...);
- }
- names(arglists) = names(tlist)
- swiftapply(func, arglists)
-}
-
-# Replaces the .swift.exports option with a single "removeAll" command.
-swiftRemoveAll <- function () {
- # Cleans up all data in global namespace on workers
- options(.swift.exports=list(c("removeAll")))
-}
-
-
-# Appends c("library", packname) to the .swift.packages option, after
-# dropping earlier entries for the same package.
-swiftLibrary <- function (packname) {
- # Add a library to be included on all workers
- # The package can be provided directly or alternatively
- # the package name can be provided.
- # If reset is true, then previously added libraries won't be
- # reimported on new workers.
- # NOTE(review): there is no `reset` argument; the two lines above look stale.
-
- # Check to see if it is a string, if it is not a string
- # then we will deparse it to get the expression text
- if (!is.character(substitute(packname))) {
- # Maybe library was provided unquoted
- packname <- deparse(substitute(packname))
- }
- # NOTE(review): installed.packages() is a matrix, so %in% matches any cell
- # (paths, versions, ...), not only package names.
- if (! packname %in% installed.packages()) {
- # Warn users in case they made a typo
- warning(paste("Package", packname,
- "was not a installed package in this instance of R,",
- "but may be installed on SwiftR workers"))
- }
- plist <- getOption(".swift.packages")
- if (is.null(plist)) {
- plist <- list()
- }
- else {
- # scan libs for previous detaches and libraries of
- # this package. Delete all of them
- plist = Filter(function(packcmd)
- { return (packcmd[2] != packname);},
- plist)
- }
- # Add the library command to the end of the list of commands to be
- # executed in slave R instances
- plist[[length(plist) + 1]] <- c("library", packname)
-
- options(.swift.packages=plist)
-}
-
-
-# Appends c("detach", PkgName) to the .swift.packages option, after
-# dropping earlier entries for the same package.
-swiftDetach <- function (name) {
- # name is an string or identifier such as "package:OpenMx",
- # following the same pattern as the built in R detach() function
- # Detaches a library from workers and ensures that it will no longer be
- # imported. Note that the effect on dependent imports is a little
- # messy and behaves differently on existing and new workers.
- # If package A requires package B, and we import A on a worker,
- # package B will also be imported on the worker. By detaching package A
- # we don't also detach B. This contrasts to a fresh worker, where
- # package B will not be imported.
- if (!is.character(substitute(name))) {
- # Maybe package was provided unquoted
- name <- deparse(substitute(name))
- }
-
- # NOTE(review): the TODO below appears done -- the code that follows
- # already filters the list and appends a detach command.
- #TODO: remove from options(".swift.packages")
- # Scan through list and remove any attach or detach of this name
- # add a detach cmd to the end
- if (substr(name, 1, 8) != "package:")
- stop("Can only detach packages. Package must be specified as package:PackageName")
- # Get bit after "package:"
- packname = substr(name, 9, nchar(name))
- plist = getOption(".swift.packages")
- if (is.null(plist)) {
- plist = list()
- }
- else {
- # Previous export/detach commands are now redundant
- plist = Filter(function(packcmd)
- { return (packcmd[2] != packname);},
- plist)
- }
- plist[[length(plist) + 1]] = c("detach", packname)
-
- options(.swift.packages=plist)
-}
-
-# NOTE(review): calls swiftExport(), which is not defined in this file
-# (only .swiftExport() is).
-swiftExportAll <- function () {
- return (swiftExport(list=ls(globalenv())))
-}
-
-#TODO: not implemented
-.swiftExport <- function (..., list=NULL) {
- # List of object names (as R symbols or as strings)
- # These will be passed directly to save() to be serialized
-
- #TODO:
- # We want the protocol to support workers coming and going from the
- # cluster. We want to rely on less implicit state. Ie. if we run
- # swiftExport, then a worker is added to cluster, then an apply call
- # is run, we want the worker to include the exported objects.
- # One approach to doing this is to enforce the export at apply
- # time.
- #
- #
- # When swiftExport() is called:
- # * Save all of the objects to a file
- # - Q:Can we assume its on a shared filesystem?
- # * MAYBE: put the file somewhere accessible by all workers
- # * append the file's path to a stored list of files
- # * MAYBE LATER: asynchronously start swift tasks to distribute
- # the data, to get a headstart on the work
- # When swiftApply() is called:
- # - Inform all worker R processes through some mechanism (TBD)
- # of the list of all exported data files. (extra file?)
- # - The worker goes through the list in order to ensure that name
- # conflicts are resolved in the way you would expect.
- # If it has loaded a file previously, it does nothing.
- # If it hasn't loaded the file, it accesses the file (mechanism TBD)
- # and loads it into the global namespace.
- # data
- #
- # swiftRemoveAll():
- # delete the list of exported files from above and have a single
- # entry which instructs workers to delete all their data. This is
- # mainly important because it eliminates possibility of using huge
- # amounts of memory.
- # swiftRemove()
- # Implementation of this function is somewhat problematic, as it only
- # partially undoes previous work
-
-
- #Pseudocode:
- # TODO: choose file location
- # expFile <- ???
- # NOTE(review): expFile is never assigned, so the save() below fails with
- # "object 'expFile' not found".
- save(..., list=list, file=expFile)
- exportList <- getOption(".swift.exports")
- if (is.null(exportList))
- exportList = list() #TODO: start with removeAll command?
- exportList[[length(exportList) + 1]] = c("export", expFile)
- options(.swift.exports=exportList)
-
-}
More information about the Swift-commit
mailing list