[Swift-commit] cog r3527
swift at ci.uchicago.edu
swift at ci.uchicago.edu
Mon Dec 3 11:40:09 CST 2012
------------------------------------------------------------------------
r3527 | davidkelly999 | 2012-12-03 11:38:59 -0600 (Mon, 03 Dec 2012) | 2 lines
Some fixes to make condor more flexible
------------------------------------------------------------------------
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/condor/CondorExecutor.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/condor/CondorExecutor.java (revision 3526)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/condor/CondorExecutor.java (working copy)
@@ -9,8 +9,12 @@
*/
package org.globus.cog.abstraction.impl.scheduler.condor;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Writer;
+import java.nio.channels.FileChannel;
import java.util.Iterator;
import java.util.List;
@@ -20,10 +24,12 @@
import org.globus.cog.abstraction.impl.scheduler.common.AbstractProperties;
import org.globus.cog.abstraction.impl.scheduler.common.AbstractQueuePoller;
import org.globus.cog.abstraction.impl.scheduler.common.Job;
+import org.globus.cog.abstraction.impl.scheduler.common.ProcessException;
import org.globus.cog.abstraction.impl.scheduler.common.ProcessListener;
import org.globus.cog.abstraction.interfaces.FileLocation;
import org.globus.cog.abstraction.interfaces.JobSpecification;
import org.globus.cog.abstraction.interfaces.Task;
+import org.globus.gsi.gssapi.auth.AuthorizationException;
public class CondorExecutor extends AbstractExecutor {
public static final Logger logger = Logger.getLogger(CondorExecutor.class);
@@ -32,20 +38,95 @@
super(task, listener);
}
- protected void writeAttr(String attrName, String arg, Writer wr)
- throws IOException {
+ protected void writeAttr(String attrName, String arg, Writer wr) throws IOException { // Writes arg, the attribute value and a newline to wr when the spec has a non-null attribute named attrName; writes nothing otherwise
Object value = getSpec().getAttribute(attrName);
if (value != null) {
wr.write(arg + String.valueOf(value) + '\n');
}
}
- protected void writeScript(Writer wr, String exitcodefile, String stdout,
- String stderr) throws IOException {
+ public void start() // Submits the job: builds the submit command line, runs it, parses the job id from its stdout and registers the job with the queue poller
+ throws AuthorizationException, IOException, ProcessException { // ProcessException is thrown below when the submit command exits with a non-zero code
+ File scriptdir = new File("."); // the submit script is created in the current working directory
+ script = File.createTempFile(getName(), ".submit", scriptdir); // unique file name: getName() as prefix, ".submit" as suffix
+ if (!logger.isDebugEnabled()) { // with debug logging enabled the script is not scheduled for deletion
+ script.deleteOnExit();
+ }
+ stdout = spec.getStdOutput() == null ? script.getAbsolutePath() // default stdout path is <script>.stdout when the spec names none
+ + ".stdout" : spec.getStdOutput();
+ stderr = spec.getStdError() == null ? script.getAbsolutePath() // default stderr path is <script>.stderr when the spec names none
+ + ".stderr" : spec.getStdError();
+ exitcode = script.getAbsolutePath() + ".exitcode"; // the exit code file path is always derived from the script path
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Writing " + getName() + " script to " + script);
+ }
+
+ String[] cmdline = buildCommandLine(scriptdir, script, exitcode, // defined outside this method; presumably also writes the submit script - TODO confirm
+ stdout, stderr);
+
+ if (logger.isDebugEnabled()) {
+ logCommandLine(cmdline);
+ }
+ Process process = Runtime.getRuntime().exec(cmdline, null, null); // null envp and null dir: the child inherits this process's environment and working directory
+
+ try {
+ process.getOutputStream().close(); // nothing is written to the child's stdin
+ }
+ catch (IOException e) { // a failure to close the child's stdin is ignored
+ }
+
+ try {
+ int code = process.waitFor(); // NOTE(review): output is only read after exit; a child that fills its pipe buffers could block here - confirm submit output stays small
+ if (code != 0) { // non-zero exit: the exception message carries stdout followed by stderr
+ String errorText = getOutput(process.getInputStream())
+ + getOutput(process.getErrorStream());
+ throw new ProcessException("Could not submit job ("
+ + getProperties().getSubmitCommandName()
+ + " reported an exit code of " + code + "). "
+ + errorText);
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug(getProperties().getSubmitCommandName()
+ + " done (exit code " + code + ")");
+ }
+ }
+ catch (InterruptedException e) { // NOTE(review): the listener is notified, but the method neither returns nor rethrows, so the code after this try still runs; the interrupt status is not restored
+ if (logger.isDebugEnabled()) {
+ logger.debug("Interrupted exception while waiting for "
+ + getProperties().getSubmitCommandName(), e);
+ }
+ if (listener != null) {
+ listener
+ .processFailed("The submission process was interrupted");
+ }
+ }
+
+ String output = getOutput(process.getInputStream()); // stdout of the submit command
+ jobid = parseSubmitCommandOutput(output);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Submitted job with id '" + jobid + "'");
+ }
+
+ if (jobid.length() == 0) { // NOTE(review): an empty id only notifies the listener; the job is still added to the poller below. Assumes parseSubmitCommandOutput never returns null - TODO confirm
+ String errorText = getOutput(process.getErrorStream());
+ if (listener != null)
+ listener.processFailed("Received empty jobid!\n" +
+ output + "\n" + errorText);
+ }
+
+ process.getInputStream().close(); // only the stdout stream is closed explicitly here
+
+ getQueuePoller().addJob( // hands the job to the queue poller; the created Job is also assigned to job
+ job = createJob(jobid, stdout, spec.getStdOutputLocation(), stderr,
+ spec.getStdErrorLocation(), exitcode, this));
+ }
+
+ protected void writeScript(Writer wr, String exitcodefile, String stdout, String stderr) throws IOException {
boolean grid = false;
- Task task = getTask();
JobSpecification spec = getSpec();
- getSpec().unpackProviderAttributes();
+
+ // Handle some predefined jobTypes
String type = (String) spec.getAttribute("jobType");
if (logger.isDebugEnabled()) {
logger.debug("Job type: " + type);
@@ -58,27 +139,31 @@
String gridResource = (String) spec.getAttribute("gridResource");
wr.write("universe = grid\n");
wr.write("grid_resource = "+gridResource+"\n");
-
-// the below two lines are needed to cause the gridmonitor to be used
-// which is the point of all this...
+ // the below two lines are needed to cause the gridmonitor to be used
+ // which is the point of all this...
wr.write("stream_output = False\n");
wr.write("stream_error = False\n");
-
wr.write("Transfer_Executable = false\n");
}
else {
- wr.write("universe = vanilla\n");
+ if(spec.getAttribute("condor.universe") == null) {
+ wr.write("universe = vanilla\n");
+ }
}
+
if ("true".equals(spec.getAttribute("holdIsFailure"))) {
wr.write("periodic_remove = JobStatus == 5\n");
}
+
writeAttr("count", "machine_count = ", wr);
+ wr.write("output = " + quote(stdout) + '\n');
+ wr.write("error = " + quote(stderr) + '\n');
+
if (spec.getStdInput() != null) {
wr.write("input = " + quote(spec.getStdInput()) + "\n");
}
- wr.write("output = " + quote(stdout) + '\n');
- wr.write("error = " + quote(stderr) + '\n');
- Iterator i = spec.getEnvironmentVariableNames().iterator();
+
+ Iterator<String> i = spec.getEnvironmentVariableNames().iterator();
if (i.hasNext()) {
wr.write("environment = ");
}
@@ -98,8 +183,17 @@
wr.write("remote_initialdir = " + quote(spec.getDirectory()) + "\n");
}
}
+
+ String basename[] = spec.getExecutable().split("/");
+ FileChannel from = new FileInputStream(spec.getExecutable()).getChannel();
+ FileChannel to = new FileOutputStream(basename[basename.length-1]).getChannel();
+ to.transferFrom(from, 0, from.size());
+ from.close();
+ to.close();
+
+ spec.getExecutable();
wr.write("executable = " + quote(spec.getExecutable()) + "\n");
- List args = spec.getArgumentsAsList();
+ List<String> args = spec.getArgumentsAsList();
if (args != null && args.size() > 0) {
wr.write("arguments = ");
i = args.iterator();
@@ -110,20 +204,16 @@
}
}
}
- wr.write('\n');
+ wr.write('\n');
- String request_memory = (String) spec.getAttribute("request_memory");
- if(request_memory != null) {
- wr.write("request_memory = " + request_memory + '\n');
- }
-
- String resources = (String) spec.getAttribute("condor.resource_list");
- if (resources != null && resources.length() > 0) {
- if (logger.isDebugEnabled())
- logger.debug("condor.resource_list: " + resources);
- wr.write(resources + '\n');
- }
-
+ // Handle all condor attributes specified by the user
+ for(String a : spec.getAttributeNames()) {
+ if(a != null && a.startsWith("condor.")) {
+ String attributeName[] = a.split("condor.");
+ wr.write(attributeName[1] + " = " + spec.getAttribute(a) + '\n');
+ }
+ }
+
wr.write("notification = Never\n");
wr.write("leave_in_queue = TRUE\n");
wr.write("queue\n");
More information about the Swift-commit
mailing list