[Swift-commit] r5159 - in SwiftApps/SwiftR: . Swift/R Swift/exec

tga at ci.uchicago.edu tga at ci.uchicago.edu
Fri Sep 23 18:22:17 CDT 2011


Author: tga
Date: 2011-09-23 18:22:17 -0500 (Fri, 23 Sep 2011)
New Revision: 5159

Modified:
   SwiftApps/SwiftR/Makefile
   SwiftApps/SwiftR/Swift/R/Workers.R
   SwiftApps/SwiftR/Swift/exec/EvalRBatchPersistent.sh
Log:
Getting the R server cleanup logic working.



Modified: SwiftApps/SwiftR/Makefile
===================================================================
--- SwiftApps/SwiftR/Makefile	2011-09-23 22:04:00 UTC (rev 5158)
+++ SwiftApps/SwiftR/Makefile	2011-09-23 23:22:17 UTC (rev 5159)
@@ -7,7 +7,7 @@
 
 PKG_FILES += Swift/DESCRIPTION
 PKG_FILES += Swift/NAMESPACE 
-PKG_FILE +=  Swift/src/Makefile Swift/src/make.include
+PKG_FILES +=  Swift/src/Makefile Swift/src/make.include
 PKG_FILES += $(shell find Swift/src/swift-patches -not -path '*/.svn*') 
 
 PACKAGE_DEPS = $(PKG_FILES) Makefile

Modified: SwiftApps/SwiftR/Swift/R/Workers.R
===================================================================
--- SwiftApps/SwiftR/Swift/R/Workers.R	2011-09-23 22:04:00 UTC (rev 5158)
+++ SwiftApps/SwiftR/Swift/R/Workers.R	2011-09-23 23:22:17 UTC (rev 5159)
@@ -139,9 +139,10 @@
     if(! is.null(workmode) )  {
         cmdString <- paste(cmdString, "-m", shQuote(workmode)) 
     }
-    if (! is.null(retries) ) {
-        cmdString <- paste(cmdString, "-retries", shQuote(retries))
+    if (is.null(retries) ) {
+        retries <- 3
     }
+    cmdString <- paste(cmdString, "-retries", shQuote(retries))
 
     if (server == "local")
         nodes <- 1

Modified: SwiftApps/SwiftR/Swift/exec/EvalRBatchPersistent.sh
===================================================================
--- SwiftApps/SwiftR/Swift/exec/EvalRBatchPersistent.sh	2011-09-23 22:04:00 UTC (rev 5158)
+++ SwiftApps/SwiftR/Swift/exec/EvalRBatchPersistent.sh	2011-09-23 23:22:17 UTC (rev 5159)
@@ -53,6 +53,8 @@
 fi
 
 TRAPS="EXIT 1 2 3 15"
+HAS_MUTEX=false
+RPID=
 
 function onexit() { # FIXME: move this logic into worker.pl, or try to use R timeout option if it works on fifos - need to test.
   trap - $TRAPS
@@ -60,9 +62,38 @@
   if [ "_$RPIDS" != _ ]; then
     kill $RPIDS >& /dev/null
   fi
+
+  if [ "$HAS_MUTEX" = "true" ]; then
+    rmdir $SLOTDIR/mutex
+    HAS_MUTEX=false
+  fi
 }
 
+function launchRServer() {
+  #Start R server and set RPID to the value
+  
+  mkfifo $SLOTDIR/toR.fifo
+  mkfifo $SLOTDIR/fromR.fifo
 
+  echo "$0: INFO: Launching $RServerScript $SLOTDIR"
+  #echo PATH: $PATH > $SLOTDIR/R.log
+  #which Rscript >> $SLOTDIR/R.log
+  
+  echo LD_LIBRARYPATH= $LD_LIBRARY_PATH
+  PATH=.:$PATH
+  echo PATH= $PATH
+  echo R_LIBS_USER= $R_LIBS_USER
+
+  slotdir=$1
+  # Use R instead of Rscript due to Rscript issues on some
+  # platforms (e.g. cray xt)
+  R --slave --no-restore --file=$RServerScript \
+                --args $slotdir >> $slotdir/R.log 2>&1 &   # launch R server
+  RPID=$!
+  echo $RPID > $SLOTDIR/R.pid
+  echo "$0: INFO: Launched $RServerScript $slotdir Rscript"
+}
+
 trap "onexit 1" 1
 trap "onexit 2" 2
 trap "onexit 3" 3
@@ -70,19 +101,34 @@
 trap "onexit EXIT" EXIT
 
 
-function idletimer {
-  cd $1
-  while true; do
-    touch idle
-    sleep 60
-    if [ ! -f lastwrite -o \( idle -nt lastwrite \) ]; then
-      # FIXME: should work, but does not: echo quit x x >toR.fifo # fifo problem?
-      RPID=$(cat R.pid)
-      echo killing idle R process $RPID
-      kill $RPID
-      exit 0
+function start_idletimer {
+  # After the specified number of seconds
+  # kill the specified R process
+  local timeout=$1
+  local rpid=$2
+  local idletimer_id=$$$(date '+%s%N') # timestamp in ns plus pid should be unique
+  echo "$idletimer_id" > $SLOTDIR/idletimer
+  (
+    sleep ${timeout}s
+    
+    touch $SLOTDIR/timeout_reached
+    # mkdir fails if the mutex is already held, meaning another process is active
+    if mkdir $SLOTDIR/mutex ; then
+        touch $SLOTDIR/timeout_reached_mutex
+        new_idletimer_id=$(cat $SLOTDIR/idletimer)
+        echo new: "$new_idletimer_id" old "$idletimer_id" > $SLOTDIR/idletimer_ids
+        if [ "$new_idletimer_id" = "$idletimer_id" ]; then
+          touch $SLOTDIR/timeout_reached_mutex_killed
+          echo killing idle R process $rpid
+          kill $rpid
+          rm -f $SLOTDIR/idletimer $SLOTDIR/fromR.fifo $SLOTDIR/toR.fifo
+          touch $SLOTDIR/timedout
+        else
+            touch $SLOTDIR/timeout_reached_mutex_notkilled
+        fi
+        rmdir $SLOTDIR/mutex
     fi
-  done
+  ) &> /dev/null &
 }
 
 # Ensure that the dir for this slot exists. 
@@ -104,28 +150,40 @@
 WORKERDIR=$BASEDIR/worker.$SWIFT_WORKER_PID
 SLOTDIR=$WORKERDIR/${SWIFT_JOB_SLOT}
 
+# terminate timeout as early as possible
+rm ${SLOTDIR}/idletimer
+
 mkdir -p $WORKERDIR
 
 RPIDS=
 
 TIMEOUT=30
+IDLE_TIMEOUT=10
+timeout_pid=
 
+
 function start_timeout {
-    ppid=$$
-    child_pid=$1
+    local ppid=$$
+    local child_pid=$1
     trap "timeout_handler" SIGHUP
     (
-    sleep ${TIMEOUT}s
-    kill -1 $ppid  &> /dev/null #SIGHUP 
-    if [ "$child_pid" != "" ]; then
-        kill -1 $child_pid  &> /dev/null #SIGHUP 
-    fi
+        trap "exit 0" 1 # timeout cancelled with SIGHUP
+        sleep ${TIMEOUT}s
+        kill -1 $ppid  &> /dev/null #SIGHUP 
+        if [ "$child_pid" != "" ]; then
+            kill -1 $child_pid  &> /dev/null #SIGHUP 
+        fi
     ) &
+    timeout_pid=$!
 }
 
 function stop_timeout {
     #DEBUG
     trap "" SIGHUP
+    if [ ! -z "$timeout_pid" ]; then
+        kill -1 $timeout_pid
+        timeout_pid=
+    fi
 }
 
 function timeout_handler {
@@ -145,48 +203,41 @@
 
 mkdir $SLOTDIR >& /dev/null
 if [ $? = 0 ]; then
-  mkfifo $SLOTDIR/toR.fifo
-  mkfifo $SLOTDIR/fromR.fifo
-  chmod +x $RServerScript
-  echo "$0: INFO: Launching $RServerScript $SLOTDIR"
-  #echo PATH: $PATH > $SLOTDIR/R.log
-  #which Rscript >> $SLOTDIR/R.log
-  
-  echo LD_LIBRARYPATH= $LD_LIBRARY_PATH
-  PATH=.:$PATH
-  echo PATH= $PATH
-  echo R_LIBS_USER= $R_LIBS_USER
-  # Use R instead of Rscript due to Rscript issues on some
-  # platforms (e.g. cray xt)
-  R --slave --no-restore --file=$RServerScript \
-                --args $SLOTDIR >> $SLOTDIR/R.log 2>&1 &   # launch R server
-  #$RServerScript $SLOTDIR >> $SLOTDIR/R.log 2>&1 &   # launch R server
-  # idletimer $SLOTDIR </dev/null >/dev/null 2>&1 & # R saves pid in R.pid for idletimer to kill it
-  echo "$0: INFO: Launched $RServerScript $SLOTDIR Rscript"
+  launchRServer $SLOTDIR
 else
   # wait to make sure fifo exists
-  # fromR fifo is created last, so wait for that one
+  # R.pid is created last and left in place, so wait for that
   while true; do 
-    if [ -p $SLOTDIR/fromR.fifo ]; then
+    if [ -f $SLOTDIR/R.pid ]; then
+      RPID=$(cat $SLOTDIR/R.pid)
       break
     fi
   done
 fi
+# At this point we've launched an R server, or maybe discovered
+# an existing server.  It is still possible that the old server
+# was timed out, but to be sure we will acquire a mutex first
 
 # Ready to talk to the server: send request and read response
 #FIXME: what if mutex isn't cleaned up?
 start_timeout
 while true; do
+  HAS_MUTEX=true
   mkdir $SLOTDIR/mutex
   if [ $? != 0 ]; then
     sleep 1;
   else
+    if [ -f $SLOTDIR/timedout ]; then
+      launchRServer $SLOTDIR
+      rm -f $SLOTDIR/timedout
+    fi
     break;
   fi
 done
 stop_timeout
 echo DB: Obtained $SLOTDIR/mutex
 
+
 absimports=
 for im in $imports; do
     absimports="$absimports $(pwd)/$im"
@@ -207,11 +258,17 @@
 
     res=$(cat < $SLOTDIR/fromR.fifo)
     echo DB: Got response: $res
+    
+    
+    # While still holding mutex, start timer to shut down idle R server
+    start_idletimer $IDLE_TIMEOUT $RPID   
 
     rmdir $SLOTDIR/mutex
+    HAS_MUTEX=false
 
     echo DB: Freed $SLOTDIR/mutex
-    
+
+
     # Test if R server reported an error
     if echo "$res" | grep -q '^ERROR:'
     then
@@ -221,6 +278,7 @@
     stop_timeout
     echo "ERROR: Could not write to fifo ok"
     rmdir $SLOTDIR/mutex
+    HAS_MUTEX=false
 
     exit 1
 fi




More information about the Swift-commit mailing list