[Swift-commit] r4042 - in SwiftApps/SwiftR: . Swift/R Swift/man

noreply at svn.ci.uchicago.edu noreply at svn.ci.uchicago.edu
Thu Jan 27 17:29:11 CST 2011


Author: tga
Date: 2011-01-27 17:29:10 -0600 (Thu, 27 Jan 2011)
New Revision: 4042

Modified:
   SwiftApps/SwiftR/Makefile
   SwiftApps/SwiftR/Swift/R/Swift.R
   SwiftApps/SwiftR/Swift/R/Workers.R
   SwiftApps/SwiftR/Swift/man/Swift-package.Rd
   SwiftApps/SwiftR/Swift/man/swiftInit.Rd
Log:
Modified swift*apply logic to be smarter about how to batch up calls.
Now the default behaviour is to automatically choose a callsperbatch setting such that there will be
somewhere between 1 and 2 batches per worker (exactly 1 batch per worker if the number of calls is divisible by the worker count).

This improves performance significantly in many cases.

Implementing this change required retaining more information about launched worker processes within the R session: now we retain the number of cores/nodes and the server type along with the process id.




Modified: SwiftApps/SwiftR/Makefile
===================================================================
--- SwiftApps/SwiftR/Makefile	2011-01-27 17:32:06 UTC (rev 4041)
+++ SwiftApps/SwiftR/Makefile	2011-01-27 23:29:10 UTC (rev 4042)
@@ -30,6 +30,9 @@
 install: $(TBALL)
 	R CMD INSTALL $(TBALL)
 
+check: $(TBALL)
+	R CMD check $(TBALL)
+
 clean: 
 	rm -rf Swift/inst/swift/*
 	rm $(TBALL)

Modified: SwiftApps/SwiftR/Swift/R/Swift.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Swift.R	2011-01-27 17:32:06 UTC (rev 4041)
+++ SwiftApps/SwiftR/Swift/R/Swift.R	2011-01-27 23:29:10 UTC (rev 4042)
@@ -17,8 +17,33 @@
 
   if(is.null(callsperbatch))
     callsperbatch <- getOption("swift.callsperbatch")
-  if(is.null(callsperbatch))
-    callsperbatch <- 1
+
+  if(is.null(callsperbatch)) {
+    # auto batching:
+    # start with 1 batch per core
+    wc <- workerCount(server)
+    if (is.null(wc)) {
+        if (! quiet ) 
+            cat(paste("Information about current server of type", 
+                    server, "not found"))
+        callsperbatch <- 1
+    }
+    else {
+        batches_per_worker <- getOption("swift.batchesperworker")
+        if (is.null(batches_per_worker)) 
+            batches_per_worker <- 1
+        target_batches <- wc * batches_per_worker
+        # We want to have at least batches_per_worker batches of calls
+        # for each worker.  If it doesn't divide evenly, some will get
+        # batches_per_worker + 1 batches.  
+        # callsperbatch < 1 doesn't make sense, hence the use of max
+        callsperbatch = max(1, floor(length(arglists)/target_batches))
+#        if (! quiet ) 
+#            cat(paste("Selected callsperbatch of", callsperbatch, 
+#                    "automatically, based on worker count of",wc,
+#                    "and arg count of ", length(arglists)))
+    }
+  }
   #cat("Got ", length(arglists), " arguments with batching factor ", callsperbatch)
 
   if(is.null(runmode))
@@ -62,6 +87,7 @@
   }
 
   if (! quiet) {
+      cat("\nswiftapply to ", length(arglists), " arg lists.\n")
       cat("\nSwift properties:\n")
       cat("  server =", server,"\n")
       cat("  callsperbatch =", callsperbatch,"\n")
@@ -664,7 +690,7 @@
 
     cat("\n\n ===> Total elapsed test time = ",runTime," seconds.\n\n") 
     if (!is.null(testPid))
-        swiftShutdown(pid=testPid)
+        swiftShutdown(testPid)
 } # end function runAllTests
 
 #options(swift.site="local")

Modified: SwiftApps/SwiftR/Swift/R/Workers.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Workers.R	2011-01-27 17:32:06 UTC (rev 4041)
+++ SwiftApps/SwiftR/Swift/R/Workers.R	2011-01-27 23:29:10 UTC (rev 4042)
@@ -27,7 +27,8 @@
 
 
     # In case it was somehow deleted
-    if (is.null(.swift.workers)) {
+    if (!exists(".swift.workers") 
+        || is.null(.swift.workers)) {
         .swift.workers <<- list()
     }
 
@@ -36,11 +37,6 @@
     # Presume UNIX path names - start-swift script
     cmdString <- file.path(.find.package("Swift"), "exec/start-swift-daemon")
 
-    if(is.null(cores))
-        cores <- getOption("swift.cores")
-    if(! is.null(cores) )  {
-        cmdString <- paste(cmdString, "-c", shQuote(cores))
-    }
     
     if(is.null(server))
         server <- getOption("swift.server")
@@ -49,9 +45,25 @@
     if(! is.null(server) )  {
         cmdString <- paste(cmdString, "-s", shQuote(server)) 
     }
+    
+    if(is.null(cores))
+        cores <- getOption("swift.cores")
+        if (is.null(cores)) {
+            if (server == "local")
+                cores <- 2
+            else if (server == "ssh")
+                cores <- 4
+            else 
+                cores <- 8
+        }
+    if(! is.null(cores) )  {
+        cmdString <- paste(cmdString, "-c", shQuote(cores))
+    }
 
     if(is.null(hosts))
         hosts <- getOption("swift.hosts")
+        if (is.null(hosts))
+            hosts <- 1
     if(! is.null(hosts) )  {
         cmdString <- paste(cmdString, "-h", shQuote(hosts) )
     }
@@ -68,10 +80,14 @@
         cmdString <- paste(cmdString, "-m", shQuote(workmode)) 
     }
 
-    if(is.null(nodes))
-        nodes <- getOption("swift.nodes")
-    if(! is.null(nodes) )  {
-        cmdString <- paste(cmdString, "-n", shQuote(nodes)) 
+    if (server == "local")
+        nodes <- 1
+    else {
+        if(is.null(nodes))
+            nodes <- getOption("swift.nodes")
+        if(! is.null(nodes) )  {
+            cmdString <- paste(cmdString, "-n", shQuote(nodes)) 
+        }
     }
     
     if(is.null(throttle))
@@ -110,10 +126,16 @@
     # launch asynchronously
     # for now, we will rely on the shell script's output to inform
     #  the user if there was a problem with the workers
-    output <- system(cmdString, intern=TRUE)
-    cat("Started worker manager with pid ", output, "\n")
+    pid <- system(cmdString, intern=TRUE)
+    cat("Started worker manager with pid ", pid, "\n")
+    
+    output <- list()
+    output$pid <- pid
+    output$server <- server
+    output$cores <- cores
+    output$nodes <- nodes
 
-    # store pid
+    # store worker info
     .swift.workers[[length(.swift.workers) + 1]] <<- output
 
     # add hook to ensure child process will be killed when 
@@ -127,33 +149,57 @@
         options(swift.server=server)
     # Sleep to give start-swift time to set up fifos,etc
     Sys.sleep(2)
+
     return (output)
 }
 
 swiftShutdown <- function(handle=NULL) 
 {
+    if (!exists(".swift.workers") 
+            || is.null(.swift.workers)
+            || length(.swift.workers) == 0) {
+        cat("No swift workers were started in R\n")
+        return
+    }
     if (is.null(handle)) {
-        if (is.null(.swift.workers)) {
-            return
-        }
         workers = .swift.workers
         .swift.workers <<- list()
     }
     else {
-        workers=handle
+        workers <- Filter(
+            function (worker) { return (worker$pid == as.character(handle$pid)) ; },
+            .swift.workers)
         .swift.workers <<- Filter( 
-            function (p) { return (p != as.character(handle)) ; },
+            function (worker) { return (worker$pid != as.character(handle$pid)) ; },
             .swift.workers)
     }
     cat("Shutting down Swift worker processes\n")
     # shut down all worker processes using kill
-    for (pid in workers) {
+    for (worker in workers) {
         cmdString <- file.path(.find.package("Swift"), "exec/killtree &> /dev/null ")
-        killCmd <- paste(cmdString,pid)
+        killCmd <- paste(cmdString, worker$pid)
         system(killCmd, wait=FALSE)
     }
+}
 
+workerCount <- function (server) {
+    # workerCount gets the number of workers launched by the previous
 
+    # Find the last launched worker of the right type.
+    # TODO: this is a bit flakey as there is no guarantee
+    # the currently active worker of this type was launched within
+    # R
+    if (!exists(".swift.workers") 
+        || is.null(.swift.workers)) {
+        return (NULL)
+    }
+    worker <- Find(
+            function (worker) { return (worker$server == server) ; },
+            .swift.workers, right=TRUE)
+    if (is.null(worker))
+        return (NULL)
+    else
+        return (worker$cores * worker$nodes)
 }
 
 .First.lib <- function(libname, packagename) {
@@ -199,3 +245,4 @@
         rm(".UserLast", pos=".GlobalEnv")
     }
 }
+

Modified: SwiftApps/SwiftR/Swift/man/Swift-package.Rd
===================================================================
--- SwiftApps/SwiftR/Swift/man/Swift-package.Rd	2011-01-27 17:32:06 UTC (rev 4041)
+++ SwiftApps/SwiftR/Swift/man/Swift-package.Rd	2011-01-27 23:29:10 UTC (rev 4042)
@@ -486,6 +486,8 @@
 \code{\link{swiftapply}}
 }
 \examples{
+library(Swift)
+job <- swiftInit()
 
 myfunc <- function(treedata,cardata) { sum( treedata$Height, cardata$dist ) }
 data(cars)

Modified: SwiftApps/SwiftR/Swift/man/swiftInit.Rd
===================================================================
--- SwiftApps/SwiftR/Swift/man/swiftInit.Rd	2011-01-27 17:32:06 UTC (rev 4041)
+++ SwiftApps/SwiftR/Swift/man/swiftInit.Rd	2011-01-27 23:29:10 UTC (rev 4042)
@@ -121,22 +121,22 @@
 }
 \examples{
 # Running a job on the local machine with 8 cores
-option(swift.server="local")
+options(swift.server="local")
 serverHandle <- swiftInit(cores=8)
-swiftApply(log,list(list(3), list(4)))
+swiftapply(log,list(list(3), list(4)))
 swiftLapply(list(3, 4),
             function (x) { return (2*x); })
 # Now shut down the server
 swiftShutdown(serverHandle)
 
 # Now run remotely via ssh on 2 remote machines called foo and bar
-option(swift.server="ssh")
+options(swift.server="ssh")
 swiftInit(hosts="foo bar")
 swiftLapply(list(3, 4), log)
 # ssh servers will keep running
 
 # Run using the PBS scheduler
-option(swift.server="pbs")
+options(swift.server="pbs")
 # start a worker through pbs.  Note that, depending on your PBS 
 # installation's utilization level and policies, it may take
 # a while before swiftapply calls can be processed.




More information about the Swift-commit mailing list