[Swift-commit] cog r3527

swift at ci.uchicago.edu swift at ci.uchicago.edu
Mon Dec 3 11:40:09 CST 2012


------------------------------------------------------------------------
r3527 | davidkelly999 | 2012-12-03 11:38:59 -0600 (Mon, 03 Dec 2012) | 2 lines

Some fixes to make condor more flexible

------------------------------------------------------------------------
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/condor/CondorExecutor.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/condor/CondorExecutor.java	(revision 3526)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/condor/CondorExecutor.java	(working copy)
@@ -9,8 +9,12 @@
  */
 package org.globus.cog.abstraction.impl.scheduler.condor;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.Writer;
+import java.nio.channels.FileChannel;
 import java.util.Iterator;
 import java.util.List;
 
@@ -20,10 +24,12 @@
 import org.globus.cog.abstraction.impl.scheduler.common.AbstractProperties;
 import org.globus.cog.abstraction.impl.scheduler.common.AbstractQueuePoller;
 import org.globus.cog.abstraction.impl.scheduler.common.Job;
+import org.globus.cog.abstraction.impl.scheduler.common.ProcessException;
 import org.globus.cog.abstraction.impl.scheduler.common.ProcessListener;
 import org.globus.cog.abstraction.interfaces.FileLocation;
 import org.globus.cog.abstraction.interfaces.JobSpecification;
 import org.globus.cog.abstraction.interfaces.Task;
+import org.globus.gsi.gssapi.auth.AuthorizationException;
 
 public class CondorExecutor extends AbstractExecutor {
 	public static final Logger logger = Logger.getLogger(CondorExecutor.class);
@@ -32,20 +38,95 @@
 		super(task, listener);
 	}
 
-	protected void writeAttr(String attrName, String arg, Writer wr)
-			throws IOException {
+	protected void writeAttr(String attrName, String arg, Writer wr) throws IOException {
 		Object value = getSpec().getAttribute(attrName);
 		if (value != null) {
 			wr.write(arg + String.valueOf(value) + '\n');
 		}
 	}
 
-	protected void writeScript(Writer wr, String exitcodefile, String stdout,
-			String stderr) throws IOException {
+    public void start()
+    throws AuthorizationException, IOException, ProcessException {
+    	File scriptdir = new File(".");
+        script = File.createTempFile(getName(), ".submit", scriptdir);
+        if (!logger.isDebugEnabled()) {
+            script.deleteOnExit();
+        }
+        stdout = spec.getStdOutput() == null ? script.getAbsolutePath()
+                + ".stdout" : spec.getStdOutput();
+        stderr = spec.getStdError() == null ? script.getAbsolutePath()
+                + ".stderr" : spec.getStdError();
+        exitcode = script.getAbsolutePath() + ".exitcode";
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Writing " + getName() + " script to " + script);
+        }
+
+        String[] cmdline = buildCommandLine(scriptdir, script, exitcode,
+            stdout, stderr);
+
+        if (logger.isDebugEnabled()) {
+            logCommandLine(cmdline);
+        }
+        Process process = Runtime.getRuntime().exec(cmdline, null, null);
+
+        try {
+            process.getOutputStream().close();
+        }
+        catch (IOException e) {
+        }
+
+        try {
+            int code = process.waitFor();
+            if (code != 0) {
+                String errorText = getOutput(process.getInputStream())
+                        + getOutput(process.getErrorStream());
+                throw new ProcessException("Could not submit job ("
+                        + getProperties().getSubmitCommandName()
+                        + " reported an exit code of " + code + "). "
+                        + errorText);
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug(getProperties().getSubmitCommandName()
+                        + " done (exit code " + code + ")");
+            }
+        }
+        catch (InterruptedException e) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Interrupted exception while waiting for "
+                        + getProperties().getSubmitCommandName(), e);
+            }
+            if (listener != null) {
+                listener
+                    .processFailed("The submission process was interrupted");
+            }
+        }
+
+        String output = getOutput(process.getInputStream());
+        jobid = parseSubmitCommandOutput(output);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Submitted job with id '" + jobid + "'");
+        }
+
+        if (jobid.length() == 0) {
+            String errorText = getOutput(process.getErrorStream());
+            if (listener != null)
+                listener.processFailed("Received empty jobid!\n" +
+                                       output + "\n" + errorText);
+        }
+
+        process.getInputStream().close();
+
+        getQueuePoller().addJob(
+            job = createJob(jobid, stdout, spec.getStdOutputLocation(), stderr,
+                spec.getStdErrorLocation(), exitcode, this));
+    }
+	
+	protected void writeScript(Writer wr, String exitcodefile, String stdout, String stderr) throws IOException {
 		boolean grid = false;
-		Task task = getTask();
 		JobSpecification spec = getSpec();
-		getSpec().unpackProviderAttributes();
+
+		// Handle some predefined jobTypes
 		String type = (String) spec.getAttribute("jobType");
 		if (logger.isDebugEnabled()) {
 			logger.debug("Job type: " + type);
@@ -58,27 +139,31 @@
 			String gridResource = (String) spec.getAttribute("gridResource");
 			wr.write("universe = grid\n");
 			wr.write("grid_resource = "+gridResource+"\n");
-
-// the below two lines are needed to cause the gridmonitor to be used
-// which is the point of all this...
+			// the below two lines are needed to cause the gridmonitor to be used
+			// which is the point of all this...
 			wr.write("stream_output = False\n");
 			wr.write("stream_error  = False\n");
-
 			wr.write("Transfer_Executable = false\n");
 		}
 		else {
-			wr.write("universe = vanilla\n");
+			if(spec.getAttribute("condor.universe") == null) {
+				wr.write("universe = vanilla\n");
+			}
 		}
+
 		if ("true".equals(spec.getAttribute("holdIsFailure"))) {
 			wr.write("periodic_remove = JobStatus == 5\n");
 		}
+		
 		writeAttr("count", "machine_count = ", wr);
+		wr.write("output = " + quote(stdout) + '\n');
+		wr.write("error = " + quote(stderr) + '\n');
+
 		if (spec.getStdInput() != null) {
 			wr.write("input = " + quote(spec.getStdInput()) + "\n");
 		}
-		wr.write("output = " + quote(stdout) + '\n');
-		wr.write("error = " + quote(stderr) + '\n');
-		Iterator i = spec.getEnvironmentVariableNames().iterator();
+
+		Iterator<String> i = spec.getEnvironmentVariableNames().iterator();
 		if (i.hasNext()) {
 			wr.write("environment = ");
 		}
@@ -98,8 +183,17 @@
 				wr.write("remote_initialdir = " + quote(spec.getDirectory()) + "\n");
 			}
 		}
+		
+        String basename[] = spec.getExecutable().split("/");		
+        FileChannel from = new FileInputStream(spec.getExecutable()).getChannel();
+        FileChannel to = new FileOutputStream(basename[basename.length-1]).getChannel();
+        to.transferFrom(from, 0, from.size());
+        from.close();
+        to.close();
+        
+		spec.getExecutable();
 		wr.write("executable = " + quote(spec.getExecutable()) + "\n");
-		List args = spec.getArgumentsAsList();
+		List<String> args = spec.getArgumentsAsList();
 		if (args != null && args.size() > 0) {
 			wr.write("arguments = ");
 			i = args.iterator();
@@ -110,20 +204,16 @@
 				}
 			}
 		}
-		wr.write('\n');
+ 		wr.write('\n');
 
-		String request_memory = (String) spec.getAttribute("request_memory");
-		if(request_memory != null) {
-			wr.write("request_memory = " + request_memory + '\n');
-		}
-		
-		String resources = (String) spec.getAttribute("condor.resource_list");
-		if (resources != null && resources.length() > 0) {
-			if (logger.isDebugEnabled())
-				logger.debug("condor.resource_list: " + resources);
-			wr.write(resources + '\n');
-		}
-		
+		// Handle all condor attributes specified by the user
+	    for(String a : spec.getAttributeNames()) {
+	    	if(a != null && a.startsWith("condor.")) {
+	    		String attributeName[] = a.split("condor.");
+	    		wr.write(attributeName[1] + " = " + spec.getAttribute(a) + '\n');
+	    	}
+	    }
+	    
 		wr.write("notification = Never\n");
 		wr.write("leave_in_queue = TRUE\n");
 		wr.write("queue\n");



More information about the Swift-commit mailing list