[Swift-commit] r4065 - in SwiftApps/SwiftR/Swift: R exec man

noreply at svn.ci.uchicago.edu noreply at svn.ci.uchicago.edu
Mon Feb 7 16:07:51 CST 2011


Author: tga
Date: 2011-02-07 16:07:51 -0600 (Mon, 07 Feb 2011)
New Revision: 4065

Modified:
   SwiftApps/SwiftR/Swift/R/Apply.R
   SwiftApps/SwiftR/Swift/R/Init.R
   SwiftApps/SwiftR/Swift/R/Workers.R
   SwiftApps/SwiftR/Swift/exec/start-swift
   SwiftApps/SwiftR/Swift/exec/start-swift-daemon
   SwiftApps/SwiftR/Swift/man/swiftapply.Rd
Log:
* Made the way the swift server was selected more consistent in swiftapply
* If a server type is specified through an argument or the toptions mechanism, we use the most recently launched server of that type.  If no server type is specified, then we use the most recently launched server.
* swiftShutdown now shuts down the most recently started server by default.


Modified: SwiftApps/SwiftR/Swift/R/Apply.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Apply.R	2011-02-07 21:04:05 UTC (rev 4064)
+++ SwiftApps/SwiftR/Swift/R/Apply.R	2011-02-07 22:07:51 UTC (rev 4065)
@@ -20,7 +20,6 @@
 {
   # Set Swift default options if not passed as keywords or pre-set by user
   if(is.null(server)) server <- getOption("swift.server")
-  if(is.null(server)) server<-"local"
 
   if(is.null(callsperbatch)) callsperbatch <- getOption("swift.callsperbatch")
   if(is.null(callsperbatch)) 
@@ -83,9 +82,7 @@
   }
   else if (runmode == "service") { 
     # Send request to service
-    user <- Sys.info()[["user"]]
-    swiftServerDir = file.path(tmpdir,user,"SwiftR", 
-                paste("swift.",server,sep=""))
+    swiftServerDir = getWorkerDir(server) 
 
     requestPipeName=file.path(swiftServerDir,"requestpipe")
     resultPipeName=file.path(swiftServerDir,"resultpipe")

Modified: SwiftApps/SwiftR/Swift/R/Init.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Init.R	2011-02-07 21:04:05 UTC (rev 4064)
+++ SwiftApps/SwiftR/Swift/R/Init.R	2011-02-07 22:07:51 UTC (rev 4065)
@@ -13,7 +13,7 @@
 
 swiftCleanup <- function () {
     # Shut down workers
-    swiftShutdown()
+    swiftShutdown(all=TRUE)
     # Clean up exported files
     removeExports()
 

Modified: SwiftApps/SwiftR/Swift/R/Workers.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Workers.R	2011-02-07 21:04:05 UTC (rev 4064)
+++ SwiftApps/SwiftR/Swift/R/Workers.R	2011-02-07 22:07:51 UTC (rev 4065)
@@ -149,12 +149,15 @@
     # launch asynchronously
     # for now, we will rely on the shell script's output to inform
     #  the user if there was a problem with the workers
-    pid <- system(cmdString, intern=TRUE)
+    out <- system(cmdString, intern=TRUE)
+    pid <- out[1]
+    workdir <- out[2]
     cat("Started worker manager with pid ", pid, "\n")
     
     output <- list()
     output$pid <- pid
     output$server <- server
+    output$workdir <- workdir
     output$cores <- cores
     output$nodes <- nodes
 
@@ -165,19 +168,13 @@
     # this process exits
     addHook()
 
-    # by default, set the swift.server option so that
-    # subsequent swift*apply calls use the newly created
-    # server
-    if (is.null(getOption("swift.server"))) 
-        options(swift.server=server)
     # Sleep to give start-swift time to set up fifos,etc
     Sys.sleep(2)
 
     return (output)
 }
 
-swiftShutdown <- function(handle=NULL) 
-{
+swiftShutdown <- function(handle=NULL, all=FALSE) {
     if (!exists(".swift.workers") 
             || is.null(.swift.workers)
             || length(.swift.workers) == 0) {
@@ -185,10 +182,19 @@
         return
     }
     if (is.null(handle)) {
-        workers = .swift.workers
-        .swift.workers <<- list()
+        if (all) {
+            workers <- .swift.workers
+            .swift.workers <<- list()
+            cat("Shutting down all Swift worker processes\n")
+        }
+        else {
+            # Remove the last started worker
+            workers <- .swift.workers[length(.swift.workers)]
+            .swift.workers <<- .swift.workers[1:length(.swift.workers)-1]
+        }
     }
     else {
+        # Split between matching workers and non-matching workers
         workers <- Filter(
             function (worker) { return (worker$pid == as.character(handle$pid)) ; },
             .swift.workers)
@@ -196,9 +202,9 @@
             function (worker) { return (worker$pid != as.character(handle$pid)) ; },
             .swift.workers)
     }
-    cat("Shutting down Swift worker processes\n")
     # shut down all worker processes using kill
     for (worker in workers) {
+        cat("Terminating worker", worker$pid, " of type ",worker$server, "\n")
         cmdString <- file.path(.find.package("Swift"), "exec/killtree &> /dev/null ")
         killCmd <- paste(cmdString, worker$pid)
         system(killCmd, wait=FALSE)
@@ -212,16 +218,49 @@
     # TODO: this is a bit flakey as there is no guarantee
     # the currently active worker of this type was launched within
     # R
+    worker <- getWorker(server)    
+    if (is.null(worker)) {
+        return (NULL)
+    }
+    else {
+        return (worker$cores * worker$nodes)
+    }
+}
+
+# Find the most recently launched instance of a server type.  If
+# no argument provided, the most recent server of any type
+getWorker <- function (server=NULL) {
     if (!exists(".swift.workers") 
         || is.null(.swift.workers)) {
         return (NULL)
     }
-    worker <- Find(
-            function (worker) { return (worker$server == server) ; },
-            .swift.workers, right=TRUE)
-    if (is.null(worker))
-        return (NULL)
-    else
-        return (worker$cores * worker$nodes)
+    if (is.null(server)) {
+        return (.swift.workers[[length(.swift.workers)]])
+    } 
+    else {
+        worker <- Find(
+                function (worker) { return (worker$server == server) ; },
+                .swift.workers, right=TRUE)
+        return (worker)
+    }
 }
 
+getWorkerDir <- function (server=NULL, tmpdir=NULL) {
+    worker <- getWorker(server)
+    if (!is.null(worker)) {
+        return (worker$workdir)
+    }
+    else {
+        if (!is.null(server)) {
+            if (is.null(tmpdir)) tmpdir <- chooseTmpDir()
+            user <- Sys.info()[["user"]]
+            return (file.path(tmpdir,user,"SwiftR", 
+                paste("swift.",server,sep="")))
+        }
+        else {
+            stop(paste("No SwiftR servers launched within R and no server type",
+                    "specified, can't identify a likely location for a swiftR",
+                    "service"))
+        }
+    }
+}

Modified: SwiftApps/SwiftR/Swift/exec/start-swift
===================================================================
--- SwiftApps/SwiftR/Swift/exec/start-swift	2011-02-07 21:04:05 UTC (rev 4064)
+++ SwiftApps/SwiftR/Swift/exec/start-swift	2011-02-07 22:07:51 UTC (rev 4065)
@@ -39,7 +39,7 @@
 {
   get-contact
   LOGDIR=$(pwd)/swiftworkerlogs # full path. FIXME: Generate this with remote-side paths if not shared dir env?
-  LOGDIR=/tmp/$USER/SwiftR/swiftworkerlogs  # FIXME: left this in /tmp so it works on any host. Better way?
+  LOGDIR=$tmp/$USER/SwiftR/swiftworkerlogs  # FIXME: left this in /tmp so it works on any host. Better way?
 
   #  mkdir -p $LOGDIR # is done with the ssh command, below
 
@@ -378,6 +378,7 @@
 queue=NONE
 project=NONE
 parEnv=NONE
+workdir=NONE
 workerLogging=ERROR
 keepdir=FALSE
 
@@ -400,6 +401,7 @@
     -t) time=$2; verify-not-null time $time; shift ;;
     -w) workerLogging=$2; verify-is-one-of workerLoggingLevel $workerLogging NONE ERROR WARN INFO DEBUG TRACE; shift ;;
     -k) keepdir=TRUE ;;
+    -d) workdir=$2; verify-not-null workdir $workdir; shift ;;
     *)  usage; exit 1 ;;
   esac
   shift
@@ -410,7 +412,17 @@
 
 rundir=$tmp/$USER/SwiftR/swift.$server  # rundir prefix # FIXME: handle multiple concurent independent swift servers per user
 mkdir -p $(dirname $rundir)
-trundir=$(mktemp -d $rundir.XXXX) # FIXME: check success
+
+# Setup a working directory
+if [ "$workdir" = NONE ]
+then
+    trundir=$(mktemp -d $rundir.XXXX) # FIXME: check success
+else 
+    echo Working in $workdir
+    trundir=$workdir
+    mkdir -p $workdir
+fi
+
 rm -f $rundir
 ln -s $trundir $rundir
 cd $trundir

Modified: SwiftApps/SwiftR/Swift/exec/start-swift-daemon
===================================================================
--- SwiftApps/SwiftR/Swift/exec/start-swift-daemon	2011-02-07 21:04:05 UTC (rev 4064)
+++ SwiftApps/SwiftR/Swift/exec/start-swift-daemon	2011-02-07 22:07:51 UTC (rev 4065)
@@ -8,13 +8,16 @@
 # the pid of start-swift
 ssscript=`dirname $0`/start-swift
 
-# Start as detached daemon, with output going to stdout
+# Choose a working directory
+tmp=${SWIFTR_TMP:-/tmp}
+workdir=$(mktemp -d $tmp/$USER/SwiftR/swift.XXXX) # FIXME: check success
 
-
+# Start as detached daemon, with output going to stderr
 # Start up a subprocess with a new process group
 # childpid will be of form '[jobno] pid'
-$ssscript "$@" 1>&2 &
+$ssscript "$@" -d $workdir 1>&2 &
 childpid=$!
 
 
 echo ${childpid}
+echo ${workdir}

Modified: SwiftApps/SwiftR/Swift/man/swiftapply.Rd
===================================================================
--- SwiftApps/SwiftR/Swift/man/swiftapply.Rd	2011-02-07 21:04:05 UTC (rev 4064)
+++ SwiftApps/SwiftR/Swift/man/swiftapply.Rd	2011-02-07 22:07:51 UTC (rev 4065)
@@ -44,7 +44,9 @@
     The swift server type to use to run.  The possible values are
     "local", "ssh", "pbs", "sge" and "pbsf", the same as swiftInit.
     The most recently started server of the specified type will be used
-    to execute the apply call.  The default value is "local".
+    to execute the apply call. If not specified, then 
+    the swift.server option will be used.  If that is not specified, the
+    most recently launched server of any type will be used.
 }
   \item{callsperbatch}{
     The number of function calls to group together into a single batch.




More information about the Swift-commit mailing list