[Swift-commit] Cog update
swift at ci.uchicago.edu
swift at ci.uchicago.edu
Thu Jan 12 00:15:04 CST 2012
------------------------------------------------------------------------
r3345 | hategan | 2012-01-12 00:14:17 -0600 (Thu, 12 Jan 2012) | 1 line
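Made the local provider easier to override (helper methods are now protected, and process start-up and job-directory resolution are separate methods) and fixed a bug where the exit code was collected without waiting for the process output to be fully read.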
------------------------------------------------------------------------
Index: modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java
===================================================================
--- modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java (revision 3344)
+++ modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java (working copy)
@@ -163,23 +163,10 @@
// TODO move away from the multi-threaded approach
JobSpecification spec = (JobSpecification) getTask().getSpecification();
- /*
- * Run the process in the specified directory or
- * in the current directory if the specification
- * directory is null
- */
- File dir = null;
- if (spec.getDirectory() != null) {
- dir = new File(spec.getDirectory());
- }
- else {
- dir = new File(".");
- }
-
+ File dir = getJobDir(spec);
stageIn(spec, dir);
-
- process = Runtime.getRuntime().exec(buildCmdArray(spec), buildEnvp(spec), dir);
-
+
+ process = startProcess(spec, dir);
getTask().setStatus(Status.ACTIVE);
/*
@@ -234,9 +221,9 @@
* so STDIN processing cannot easily be interwoven
* with STDOUT/STDERR processing in a single thread
*/
- processIN(spec.getStdInput(), dir);
+ processIN(spec.getStdInput(), dir, process.getOutputStream());
- int exitCode = process.waitFor();
+ int exitCode = p.waitFor();
if (logger.isDebugEnabled()) {
logger.debug("Exit code was " + exitCode);
@@ -272,7 +259,25 @@
}
}
- private void cleanUp(JobSpecification spec, File dir) throws TaskSubmissionException {
+ protected Process startProcess(JobSpecification spec, File dir) throws IOException {
+ return Runtime.getRuntime().exec(buildCmdArray(spec), buildEnvp(spec), dir);
+ }
+
+ protected File getJobDir(JobSpecification spec) {
+ /*
+ * Run the process in the specified directory or
+ * in the current directory if the specification
+ * directory is null
+ */
+ if (spec.getDirectory() != null) {
+ return new File(spec.getDirectory());
+ }
+ else {
+ return new File(".");
+ }
+ }
+
+ protected void cleanUp(JobSpecification spec, File dir) throws TaskSubmissionException {
CleanUpSet cs = spec.getCleanUpSet();
if (cs == null || cs.isEmpty()) {
return;
@@ -304,7 +309,7 @@
}
}
- private void stageOut(JobSpecification spec, File dir, boolean jobSucceeded) throws Exception {
+ protected void stageOut(JobSpecification spec, File dir, boolean jobSucceeded) throws Exception {
StagingSet s = spec.getStageOut();
if (s == null || s.isEmpty()) {
return;
@@ -320,7 +325,7 @@
}
}
- private void stageIn(JobSpecification spec, File dir) throws Exception {
+ protected void stageIn(JobSpecification spec, File dir) throws Exception {
StagingSet s = spec.getStageIn();
if (s == null || s.isEmpty()) {
return;
@@ -380,7 +385,7 @@
}
}
- private ServiceContact getServiceContact(URI uri) {
+ protected ServiceContact getServiceContact(URI uri) {
ServiceContact sc = new ServiceContactImpl();
if (uri.getHost() != null) {
sc.setHost(uri.getHost());
@@ -405,12 +410,10 @@
return uri;
}
- protected void processIN(String in, File dir) throws IOException {
+ protected void processIN(String in, File dir, OutputStream out) throws IOException {
byte[] buf = new byte[BUFFER_SIZE];
if (in != null) {
- OutputStream out = process.getOutputStream();
-
File stdin;
if (dir != null) {
stdin = new File(dir, in);
@@ -426,8 +429,8 @@
read = file.read(buf);
}
file.close();
- out.close();
}
+ out.close();
}
protected OutputStream prepareOutStream(String out, FileLocation loc, File dir, Task task,
@@ -459,7 +462,7 @@
return os;
}
- private String[] buildCmdArray(JobSpecification spec) {
+ protected String[] buildCmdArray(JobSpecification spec) {
List<String> arguments = spec.getArgumentsAsList();
String[] cmdarray = new String[arguments.size() + 1];
@@ -471,7 +474,7 @@
return cmdarray;
}
- private String[] buildEnvp(JobSpecification spec) {
+ protected String[] buildEnvp(JobSpecification spec) {
Collection<String> names = spec.getEnvironmentVariableNames();
if (names.size() == 0) {
/*
@@ -560,6 +563,7 @@
private Process p;
private List<StreamPair> streamPairs;
byte[] buf;
+ private boolean done;
public Processor(Process p, List<StreamPair> streamPairs) {
this.p = p;
@@ -587,8 +591,12 @@
any = false;
}
if (processDone()) {
- processPairs();
+ while(processPairs()) {}
closePairs();
+ synchronized(this) {
+ done = true;
+ notifyAll();
+ }
return;
}
else {
@@ -634,6 +642,15 @@
}
return any;
}
+
+ public int waitFor() throws InterruptedException {
+ synchronized(this) {
+ while (!done) {
+ wait();
+ }
+ }
+ return p.waitFor();
+ }
private boolean processDone() {
try {
More information about the Swift-commit mailing list