[Swift-commit] Cog update

swift at ci.uchicago.edu swift at ci.uchicago.edu
Thu Jan 12 00:15:04 CST 2012


------------------------------------------------------------------------
r3345 | hategan | 2012-01-12 00:14:17 -0600 (Thu, 12 Jan 2012) | 1 line

made local provider more override-friendly and fixed a wait-for-output-to-be-read bug
------------------------------------------------------------------------
Index: modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java
===================================================================
--- modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java	(revision 3344)
+++ modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java	(working copy)
@@ -163,23 +163,10 @@
             // TODO move away from the multi-threaded approach
             JobSpecification spec = (JobSpecification) getTask().getSpecification();
 
-            /*
-             * Run the process in the specified directory or
-             * in the current directory if the specification
-             * directory is null
-             */
-            File dir = null;
-            if (spec.getDirectory() != null) {
-                dir = new File(spec.getDirectory());
-            }
-            else {
-                dir = new File(".");
-            }
-
+            File dir = getJobDir(spec);
             stageIn(spec, dir);
-
-            process = Runtime.getRuntime().exec(buildCmdArray(spec), buildEnvp(spec), dir);
-
+            
+            process = startProcess(spec, dir);
             getTask().setStatus(Status.ACTIVE);
 
             /*
@@ -234,9 +221,9 @@
              * so STDIN processing cannot easily be interwoven
              * with STDOUT/STDERR processing in a single thread
              */
-            processIN(spec.getStdInput(), dir);
+            processIN(spec.getStdInput(), dir, process.getOutputStream());
             
-            int exitCode = process.waitFor();
+            int exitCode = p.waitFor();
 
             if (logger.isDebugEnabled()) {
                 logger.debug("Exit code was " + exitCode);
@@ -272,7 +259,25 @@
         }
     }
 
-    private void cleanUp(JobSpecification spec, File dir) throws TaskSubmissionException {
+    protected Process startProcess(JobSpecification spec, File dir) throws IOException {
+        return Runtime.getRuntime().exec(buildCmdArray(spec), buildEnvp(spec), dir);
+    }
+
+    protected File getJobDir(JobSpecification spec) {
+        /*
+         * Run the process in the specified directory or
+         * in the current directory if the specification
+         * directory is null
+         */
+        if (spec.getDirectory() != null) {
+            return new File(spec.getDirectory());
+        }
+        else {
+            return new File(".");
+        }
+    }
+
+    protected void cleanUp(JobSpecification spec, File dir) throws TaskSubmissionException {
         CleanUpSet cs = spec.getCleanUpSet();
         if (cs == null || cs.isEmpty()) {
             return;
@@ -304,7 +309,7 @@
         }
     }
 
-    private void stageOut(JobSpecification spec, File dir, boolean jobSucceeded) throws Exception {
+    protected void stageOut(JobSpecification spec, File dir, boolean jobSucceeded) throws Exception {
         StagingSet s = spec.getStageOut();
         if (s == null || s.isEmpty()) {
             return;
@@ -320,7 +325,7 @@
         }
     }
 
-    private void stageIn(JobSpecification spec, File dir) throws Exception {
+    protected void stageIn(JobSpecification spec, File dir) throws Exception {
         StagingSet s = spec.getStageIn();
         if (s == null || s.isEmpty()) {
             return;
@@ -380,7 +385,7 @@
         }
     }
 
-    private ServiceContact getServiceContact(URI uri) {
+    protected ServiceContact getServiceContact(URI uri) {
         ServiceContact sc = new ServiceContactImpl();
         if (uri.getHost() != null) {
             sc.setHost(uri.getHost());
@@ -405,12 +410,10 @@
         return uri;
     }
     
-    protected void processIN(String in, File dir) throws IOException {
+    protected void processIN(String in, File dir, OutputStream out) throws IOException {
 
         byte[] buf = new byte[BUFFER_SIZE];
         if (in != null) {
-            OutputStream out = process.getOutputStream();
-
             File stdin;
             if (dir != null) {
                 stdin = new File(dir, in);
@@ -426,8 +429,8 @@
                 read = file.read(buf);
             }
             file.close();
-            out.close();
         }
+        out.close();
     }
 
     protected OutputStream prepareOutStream(String out, FileLocation loc, File dir, Task task,
@@ -459,7 +462,7 @@
         return os;
     }
 
-    private String[] buildCmdArray(JobSpecification spec) {
+    protected String[] buildCmdArray(JobSpecification spec) {
         List<String> arguments = spec.getArgumentsAsList();
         String[] cmdarray = new String[arguments.size() + 1];
 
@@ -471,7 +474,7 @@
         return cmdarray;
     }
 
-    private String[] buildEnvp(JobSpecification spec) {
+    protected String[] buildEnvp(JobSpecification spec) {
         Collection<String> names = spec.getEnvironmentVariableNames();
         if (names.size() == 0) {
             /*
@@ -560,6 +563,7 @@
         private Process p;
         private List<StreamPair> streamPairs;
         byte[] buf;
+        private boolean done;
 
         public Processor(Process p, List<StreamPair> streamPairs) {
             this.p = p;
@@ -587,8 +591,12 @@
                     any = false;
                 }
                 if (processDone()) {
-                    processPairs();
+                    while(processPairs()) {}
                     closePairs();
+                    synchronized(this) {
+                        done = true;
+                        notifyAll();
+                    }
                     return;
                 }
                 else {
@@ -634,6 +642,15 @@
             }
             return any;
         }
+        
+        public int waitFor() throws InterruptedException {
+            synchronized(this) {
+                while (!done) {
+                    wait();
+                }
+            }
+            return p.waitFor();
+        }
 
         private boolean processDone() {
             try {



More information about the Swift-commit mailing list