[Swift-commit] cog r3918

swift at ci.uchicago.edu swift at ci.uchicago.edu
Tue Jun 10 01:05:09 CDT 2014


------------------------------------------------------------------------
r3918 | hategan | 2014-06-10 01:02:50 -0500 (Tue, 10 Jun 2014) | 1 line

shell provider
------------------------------------------------------------------------
Index: modules/provider-localscheduler/shell-provider.README
===================================================================
--- modules/provider-localscheduler/shell-provider.README	(revision 0)
+++ modules/provider-localscheduler/shell-provider.README	(revision 3918)
@@ -0,0 +1,276 @@
+===================
+= I. Introduction =
+===================
+
+This file contains information about the shell provider. The shell
+provider is a CoG provider that allows interfacing with custom queuing
+systems for which a provider is not otherwise available.
+
+Names contained within angle brackets are meant to be replaced with
+specific strings or values. For example, if you want to name your
+provider "PBS" then replace all occurrences of <providerName> with
+"PBS".
+
+There are five basic ingredients to a shell provider: 
+
+	1. A file that informs the job abstraction code about the existence
+	of the new provider (cog-provider.properties)
+	
+	2. A file that contains settings for the new provider
+	(provider-<providerName>.properties)
+	
+	3. A submit script that receives job information and is responsible
+	for starting the job.
+	
+	4. A cancel script that can be used to cancel a submitted job
+	
+	5. A status script that provides information about the status of
+	jobs
+
+For a sample implementation, see the 'examples' directory.
+
+=============
+= II. Setup =
+=============
+
+1. cog-provider.properties
+--------------------------
+
+The first step is creating the cog-provider.properties file. For each
+provider that you want to add, make sure that the file contains the
+following lines:
+
+	provider=<providerName>
+	sandbox=false
+	executionTaskHandler=org.globus.cog.abstraction.impl.scheduler.shell.execution.TaskHandlerImpl
+	securityContext=org.globus.cog.abstraction.impl.common.task.SecurityContextImpl
+
+You can add multiple sets of these, one for each provider you want to
+implement. The cog-provider.properties file must be put in a directory
+(or .jar file) that is included in Java's class path at run-time. If
+using any of the CoG modules in a normal deployment or Swift, the
+dist/<moduleName>/etc directory is automatically included in the class
+path.
+
+If you wanted this provider to be available on a remote site accessed by
+coasters, you could put this file in a directory that would be added to
+the CLASSPATH environment variable in the shell profile (although at the
+time of this writing the author is not entirely sure that CLASSPATH is
+used by the coaster bootstrap process).
+
+
+2. provider-<providerName>.properties
+-------------------------------------
+
+This file must contain the following entries:
+
+	submit.command=<path to submit command>
+	cancel.command=<path to cancel command>
+	poll.command=<path to status command>
+
+Optionally, it could contain the following property:
+
+	poll.interval=<seconds>
+
+The poll interval indicates the interval at which the status of jobs is
+checked. The check is done regularly at this interval for all jobs
+submitted through this provider. The default is 5 seconds.
+
+
+3. The submit script
+--------------------
+
+This script is responsible for submitting the actual job. It receives
+the job description on STDIN. It must write its normal output to STDOUT
+and any error messages to STDERR.
+
+Following are the syntactic rules for the input:
+	- empty lines must be ignored
+	- lines starting with the '#' character must also be ignored
+	- each other line is a "key=value" pair
+	- values can contain unescaped '=' symbols, so when parsing lines,
+	the first equal sign signifies the end of the key
+	- new lines are escaped using '\n' and backslashes are escaped
+	as '\\'; there are no other escape codes
+	- for certain keys, multiple lines with the same key can appear 
+	in the input; this is the way to encode a list
+	- keys appear in the input in a well defined order; however, some
+	keys can be missing, indicating that the corresponding value is
+	not set
+
+The following keys (in the specified order) can appear in the input. 
+Optional keys are marked with '?' (although no '?' will appear in the
+actual input to the submit script). Similarly, lists (i.e. cases when 
+the key can appear zero or more times) are marked with '*'. Literals
+are single-quoted. The pipe ('|') represents mutually exclusive choices.
+
+?directory=<dir> 
+	if specified, run the job in <dir>
+	
+executable=<path>
+	this is the path to the executable
+
+*arg=<string>
+	one of the arguments to the executable
+
+?stdin.location='local'|'remote'
+	indicates that the executable should be fed a file to its STDIN and
+	whether this file is on the submit side ('local') or on the node 
+	where the job will run ('remote'). If this key is present, then
+	the following key is mandatory. 
+	
+	Practically, if the file is located on a shared file system 
+	accessible (and mounted in the same place) by both the submit
+	host and the compute node, then treating 'remote' as 'local'
+	and the other way around should produce no visible difference.
+	
+	For the purpose of implementing a provider that can be used to
+	control coaster blocks, assuming a shared file system and 
+	implementing this in the easiest way is a reasonable way to go.
+
+?stdin.path=<path>
+	The path to the file to be fed to the STDIN of the job.
+
+?stdout.location='local'|'remote'|'tmp'
+	specifies where the process STDOUT should be redirected. With
+	the exception of 'tmp', the same considerations as in the case
+	of stdin.location above apply.
+	
+	'tmp' is a special case. It says that the submit script must
+	re-direct the job stdout to some temporary file AND return
+	(on STDOUT) the path to this file in the following form:
+	
+	stdout.path=<path>
+	
+	if 'tmp' is specified, the following key (stdout.path) will
+	not be present
+
+?stdout.path=<path>
+	where to re-direct the job stdout if stdout.location is 'local'
+	or 'remote'
+
+?stderr.location='local'|'remote'|'tmp'
+	see stdout.location
+
+?stderr.path=<path>
+	see stdout.path
+
+*env.<name>=<value>
+	Specifies an environment variable that must be available in the
+	job environment
+
+*attr.<name>=<value>
+	These are miscellaneous job attributes. Their interpretation
+	is up to the provider. Some examples of attributes include
+	'maxwalltime', 'queue', 'project', etc. Unknown attributes 
+	should be ignored.
+	
+	Coasters will set the following attributes:
+	maxwalltime=<minutes> - the maximum wall time of the job
+	count=<number> - the number of process copies to start
+
+-----
+Small aside: support for file staging/cleanup is optional. If it is
+not implemented, such requests should produce an error.
+-----
+
+*stagein.source=<path>
+	Specifies a stage-in request. If present, it will be followed
+	by stagein.destination.
+	<path> represents the local file that must be copied to the 
+	compute node.
+	
+*stagein.destination=<path>
+	Specifies where (on the compute node) the most recent 
+	stagein.source should be copied
+
+*stageout.source=<path>
+	Specifies a stage-out request. If present, it will be followed
+	by stageout.destination and stageout.mode.
+	<path> represents the remote file that must be copied from the
+	compute node to the submit node.
+	
+*stageout.destination=<path>
+	Specifies where (on the submit node) the most recent 
+	stageout.source should be copied
+
+*stageout.mode=<mode>
+	Specifies under what circumstances the most recent stage-out 
+	request should be fulfilled. The <mode> is a bit-wise OR
+	of the following possible values:
+	
+	1 - always: stage out the file whether the job succeeds or 
+	not. If the file cannot be staged out, produce an error
+	2 - if present: stage out the file only if it can be found
+	4 - on error: only attempt to stage out the file if the job
+	fails; produce an error if the job failed and the file cannot
+	be staged-out
+	8 - on success: like "on error", but applies if the job succeeds.
+
+*cleanup=<path>
+	A list of files and/or directories to be deleted after the
+	job is done and the stage out process is completed. Reasonable
+	implementations could ensure that the cleanup paths are
+	limited to the job directory
+
+The submit script, after starting or queuing the job, must reply,
+on STDOUT, with "jobid=<jobid>", where <jobid> is a handle that
+can be used to subsequently identify the job for purposes of 
+cancelling or querying its status. If the submission is successful,
+the implementation should return an exit code of 0.
+
+If the implementation finds that the specification is erroneous, or
+if any errors occur submitting the job, an error message should be
+printed on STDERR and an exit code != 0 should be produced.
+
+Here is an example:
+	STDIN:
+		directory=/tmp
+		executable=/bin/ls
+		arg=-al
+		arg=*.txt
+		stdout.location=tmp
+	
+	A local execution submit script could perform the following:
+		cd /tmp
+		/bin/ls -al *.txt 1>/tmp/tempx0001 &
+		PID=$!
+		echo "stdout.path=/tmp/tempx0001"
+		echo "jobid=$PID"
+
+
+4. The cancel script
+--------------------
+
+The cancel script must cancel the job with the <jobid> passed to 
+it as the first command-line argument.
+
+
+5. The status script
+--------------------
+
+The status script implements a mechanism for querying the status
+of jobs. It receives a list of <jobid>s on the command line, one
+in each argument, and returns, if successful, on STDOUT, a list
+of lines, one for each job, of the following format:
+
+<jobid> 'Q'|'R'|'C'|'F' [<exitCode> [<errorMessage>]]
+
+The meaning of the second column is as follows:
+	'Q': the job is queued
+	'R': the job is running
+	'C': the job is completed
+		optionally, an <exitCode> could be present
+	'F': the job has failed
+		optionally, an <exitCode> could be present, possibly along
+		with an <errorMessage>
+
+There is some level of ambiguity about the interpretation of non-zero
+exit codes. An implementation can choose to consider all jobs that 
+do not run any more as completed, and use the exit code as an 
+indication of whether the completion was successful or not. Such
+an implementation might reserve the failed state for cases in
+which the job has failed to run due to queuing system conditions,
+and provide, in <errorMessage>, an appropriate explanation of the
+failure condition. Other implementations may choose to mark all jobs
+completed with non-zero exit codes as failed.
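
Note on the README above: the input rules of section II.3 (the first '='
ends the key, '\n' and '\\' are the only escape codes, empty lines and
lines starting with '#' are skipped) can be sketched in bash as follows.
This is a minimal sketch and not part of the commit; the function names
unescape and split_line and the variables UNESCAPED, KEY and VALUE are
made up for illustration.

```shell
#!/bin/bash

# Undo the two escape codes of the input format and store the result in
# the (hypothetical) global variable UNESCAPED:
# '\n' becomes a newline and '\\' becomes a single backslash.
unescape() {
	local in=$1 out= c
	while [ -n "$in" ]; do
		c=${in:0:1}
		in=${in:1}
		if [ "$c" = '\' ] && [ -n "$in" ]; then
			# take the character after the backslash
			c=${in:0:1}
			in=${in:1}
			if [ "$c" = "n" ]; then
				c=$'\n'
			fi
		fi
		out+=$c
	done
	UNESCAPED=$out
}

# Split one input line at the first '=' into KEY and VALUE.
# Returns 1 for lines that must be ignored and 2 for malformed lines.
split_line() {
	local line=$1
	case $line in
		''|'#'*) return 1 ;;
		*=*) ;;
		*) return 2 ;;
	esac
	KEY=${line%%=*}
	unescape "${line#*=}"
	VALUE=$UNESCAPED
}
```

A submit script could read its input with "while IFS= read -r LINE" (the
-r keeps the backslashes intact) and call split_line "$LINE" on each line
before dispatching on $KEY.
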
Index: modules/provider-localscheduler/examples/shell-provider/ststat
===================================================================
--- modules/provider-localscheduler/examples/shell-provider/ststat	(revision 0)
+++ modules/provider-localscheduler/examples/shell-provider/ststat	(revision 3918)
@@ -0,0 +1,12 @@
+#!/bin/bash
+
+while [ "$1" != "" ]; do
+	if kill -0 "$1" 2>/dev/null; then
+		echo $1 R
+	else
+		# Assume success if the process is not running
+		# This is just a simple example
+		echo $1 C 0
+	fi
+	shift 1
+done
\ No newline at end of file

Property changes on: modules/provider-localscheduler/examples/shell-provider/ststat
___________________________________________________________________
Added: svn:executable
   + *
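
Note on the status script: the example above reports every job that is no
longer running as completed with exit code 0. One way to report real exit
codes is sketched below. It is only a sketch under assumptions that are
not part of the commit: job ids are PIDs owned by the calling user, and
the submit side records each job's exit code in a file named
<jobid>.exit inside a directory STATE_DIR. Both that convention and the
function name status_line are hypothetical.

```shell
#!/bin/bash

# Hypothetical convention: the exit code of a finished job is stored in
# "$STATE_DIR/<jobid>.exit" by whatever started the job.
STATE_DIR=${STATE_DIR:-/tmp/shell-provider-state}

# Print one status line for the job id given as $1, using the format
# <jobid> 'Q'|'R'|'C'|'F' [<exitCode> [<errorMessage>]]
status_line() {
	local jobid=$1
	local ecfile="$STATE_DIR/$jobid.exit"
	if [ -r "$ecfile" ]; then
		# the job ended and left an exit code behind
		echo "$jobid C $(cat "$ecfile")"
	elif kill -0 "$jobid" 2>/dev/null; then
		# signal 0 only checks that the process exists
		echo "$jobid R"
	else
		echo "$jobid F 1 no exit code recorded for job"
	fi
}

# One line per job id passed on the command line.
for jobid in "$@"; do
	status_line "$jobid"
done
```

The 'F' line carries an exit code before the message because the third
column is parsed as a number by the poller.
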

Index: modules/provider-localscheduler/examples/shell-provider/provider-shelltest.properties
===================================================================
--- modules/provider-localscheduler/examples/shell-provider/provider-shelltest.properties	(revision 0)
+++ modules/provider-localscheduler/examples/shell-provider/provider-shelltest.properties	(revision 3918)
@@ -0,0 +1,3 @@
+submit.command=/path/to/stsubmit
+cancel.command=/path/to/stcancel
+poll.command=/path/to/ststat
\ No newline at end of file
Index: modules/provider-localscheduler/examples/shell-provider/stsubmit
===================================================================
--- modules/provider-localscheduler/examples/shell-provider/stsubmit	(revision 0)
+++ modules/provider-localscheduler/examples/shell-provider/stsubmit	(revision 3918)
@@ -0,0 +1,92 @@
+#!/bin/bash
+
+
+rm -f /tmp/stsubmit
+EXECUTABLE=
+DIR=
+ARGS=
+STDOUTLOC=
+STDOUTPATH=
+STDERRLOC=
+STDERRPATH=
+STDINLOC=
+STDINPATH=
+STDIN=
+STDOUT=
+STDERR=
+
+while IFS= read -r LINE; do
+	echo "$LINE" >>/tmp/stsubmit
+	case $LINE in
+		executable=*)
+			EXECUTABLE=${LINE#executable=}
+			;;
+		directory=*)
+			DIR=${LINE#directory=}
+			;;
+		arg=*)
+			ARGS="$ARGS ${LINE#arg=}"
+			;;
+		''|'#'*|attr.*)
+			# ignore empty lines, comments and attributes
+			;;
+		stdin.location=*)
+			STDINLOC=${LINE#stdin.location=}
+			;;
+		stdin.path=*)
+			STDINPATH=${LINE#stdin.path=}
+			;;
+		stdout.location=*)
+			STDOUTLOC=${LINE#stdout.location=}
+			;;
+		stdout.path=*)
+			STDOUTPATH=${LINE#stdout.path=}
+			;;
+		stderr.location=*)
+			STDERRLOC=${LINE#stderr.location=}
+			;;
+		stderr.path=*)
+			STDERRPATH=${LINE#stderr.path=}
+			;;
+		env.*)
+			LINE2=${LINE#env.}
+			# split on the first '=' only, since the value may
+			# itself contain '=' characters
+			NAME=${LINE2%%=*}
+			VALUE=${LINE2#*=}
+			export "$NAME=$VALUE"
+			;;
+		*)
+			echo "Don't know how to interpret line: $LINE" >&2
+			exit 2
+	esac
+done < /dev/stdin
+
+if [ "$STDOUTLOC" == "tmp" ]; then
+	STDOUTPATH=$(mktemp)
+	echo "stdout.path=$STDOUTPATH"
+fi
+if [ "$STDOUTPATH" != "" ]; then
+	STDOUT="1> $STDOUTPATH"
+fi
+
+if [ "$STDERRLOC" == "tmp" ]; then
+	STDERRPATH=$(mktemp)
+	echo "stderr.path=$STDERRPATH"
+fi
+if [ "$STDERRPATH" != "" ]; then
+	STDERR="2> $STDERRPATH"
+fi
+
+if [ "$STDINPATH" != "" ]; then
+	STDIN="< $STDINPATH"
+fi
+
+CMD="$EXECUTABLE $ARGS $STDIN $STDOUT $STDERR"
+[ -n "$DIR" ] && cd "$DIR"
+# eval is needed so that the redirections stored in $CMD take effect
+eval "exec $CMD" &
+
+echo "jobid=$!"
+
+exit 0
\ No newline at end of file

Property changes on: modules/provider-localscheduler/examples/shell-provider/stsubmit
___________________________________________________________________
Added: svn:executable
   + *
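
Note on the submit script: building the command line as one string loses
the boundaries between arguments that contain spaces. A possible
alternative, sketched here under assumptions and not part of the commit,
keeps the arguments in a bash array and applies the redirections
directly. The function name launch and the variables JOB_DIR, JOB_STDIN,
JOB_STDOUT and JOB_STDERR are hypothetical.

```shell
#!/bin/bash

# Start "$@" in the background and print the "jobid=<pid>" reply.
# Optional settings (all hypothetical names):
#   JOB_DIR     directory to run in
#   JOB_STDIN   file to feed to the job's STDIN
#   JOB_STDOUT  file receiving the job's STDOUT
#   JOB_STDERR  file receiving the job's STDERR
# Streams that are not set are connected to /dev/null so that the job
# cannot write into the reply of the submit script.
launch() {
	(
		[ -z "$JOB_DIR" ] || cd "$JOB_DIR" || exit 1
		# exec replaces the subshell, so $! below is the job's own PID
		exec "$@" < "${JOB_STDIN:-/dev/null}" \
			> "${JOB_STDOUT:-/dev/null}" 2> "${JOB_STDERR:-/dev/null}"
	) &
	echo "jobid=$!"
}

# Sketch of the intended use while parsing the input:
#   ARGS=()
#   ARGS+=("$VALUE")                  # once per arg= line
#   launch "$EXECUTABLE" "${ARGS[@]}"
```

Because the array is expanded as "${ARGS[@]}", every arg= line reaches
the executable as exactly one argument.
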

Index: modules/provider-localscheduler/examples/shell-provider/cog-provider.properties
===================================================================
--- modules/provider-localscheduler/examples/shell-provider/cog-provider.properties	(revision 0)
+++ modules/provider-localscheduler/examples/shell-provider/cog-provider.properties	(revision 3918)
@@ -0,0 +1,4 @@
+provider=shelltest
+sandbox=false
+executionTaskHandler=org.globus.cog.abstraction.impl.scheduler.shell.execution.TaskHandlerImpl
+securityContext=org.globus.cog.abstraction.impl.common.task.SecurityContextImpl
Index: modules/provider-localscheduler/examples/shell-provider/stcancel
===================================================================
--- modules/provider-localscheduler/examples/shell-provider/stcancel	(revision 0)
+++ modules/provider-localscheduler/examples/shell-provider/stcancel	(revision 3918)
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+kill $1

Property changes on: modules/provider-localscheduler/examples/shell-provider/stcancel
___________________________________________________________________
Added: svn:executable
   + *

Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/QueuePoller.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/QueuePoller.java	(revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/QueuePoller.java	(revision 3918)
@@ -0,0 +1,107 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Oct 11, 2005
+ */
+package org.globus.cog.abstraction.impl.scheduler.shell;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractQueuePoller;
+import org.globus.cog.abstraction.impl.scheduler.common.Job;
+
+public class QueuePoller extends AbstractQueuePoller {
+	public static final Logger logger = Logger.getLogger(QueuePoller.class);
+
+
+	public QueuePoller(Properties properties) {
+		super(properties.getName() + " provider queue poller", properties);
+	}
+
+	private static String[] CMDARRAY;
+
+	protected synchronized String[] getCMDArray() {
+        String[] cmda = new String[1 + getJobs().size()];
+        cmda[0] = getProperties().getPollCommand();
+        int i = 1;
+        for (Job j : getJobs().values()) {
+            cmda[i++] = j.getJobID();
+        }
+        return cmda;
+	}
+
+    protected void processStdout(InputStream is) throws IOException {
+		BufferedReader br = new BufferedReader(new InputStreamReader(is));
+		String line;
+		do {
+			line = br.readLine();
+			if (line != null) {
+				try {
+					String[] el = line.trim().split("\\s+", 4);
+					String jobid = el[0];
+					String message = null;
+					
+					if (el.length > 3) {
+					    message = el[3];
+					}
+					
+					Job job = getJob(jobid);
+					if (job == null) {
+					    logger.warn("Received status for an unknown job: '" + jobid + "'");
+					    continue;
+					}
+					if (el[1].length() != 1) {
+                        job.setMessage("Received unknown status code: '" + el[1] + "'");
+                        job.setState(Job.STATE_FAILED);
+                        continue;
+                    }
+					char status = el[1].charAt(0);
+					if (logger.isDebugEnabled()) {
+                        logger.debug("Status for " + jobid + " is " + status);
+                    }
+					
+					if (el.length > 2) {
+                        int exitCode = Integer.parseInt(el[2]);
+                        job.setExitcode(exitCode);
+                    }
+					if (status == 'Q') {
+					    job.setState(Job.STATE_QUEUED);
+					}
+					else if (status == 'R') {
+					    job.setState(Job.STATE_RUNNING);
+					}
+					else if (status == 'C') {
+					    job.setState(Job.STATE_DONE);
+					}
+					else if (status == 'F') {
+					    if (message != null) {
+					        job.setMessage(message);
+					    }
+					    else {
+					        job.setMessage("Job failed");
+					    }
+					    job.setState(Job.STATE_FAILED);
+					}
+					
+				}
+				catch (Exception e) {
+					logger.warn("Exception caught while handling "
+							+ getProperties().getPollCommandName()
+							+ " output: " + line, e);
+				}
+			}
+		} while (line != null);
+	}
+
+	protected void processStderr(InputStream is) throws IOException {
+	    // not used
+	}
+}
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/execution/TaskHandlerImpl.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/execution/TaskHandlerImpl.java	(revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/execution/TaskHandlerImpl.java	(revision 3918)
@@ -0,0 +1,50 @@
+
+// ----------------------------------------------------------------------
+// This code is developed as part of the Java CoG Kit project
+// The terms of the license can be found at http://www.cogkit.org/license
+// This message may not be removed or altered.
+// ----------------------------------------------------------------------
+
+package org.globus.cog.abstraction.impl.scheduler.shell.execution;
+
+import org.globus.cog.abstraction.impl.scheduler.shell.Properties;
+import org.globus.cog.abstraction.interfaces.DelegatedTaskHandler;
+
+public class TaskHandlerImpl extends
+		org.globus.cog.abstraction.impl.common.execution.TaskHandlerImpl {
+    private Properties props;
+    private String name;
+
+	protected DelegatedTaskHandler newDelegatedTaskHandler() {
+		return new JobSubmissionTaskHandler(props);
+	}
+
+	public String getName() {
+		return name;
+	}
+	
+	public void setName(String name) {
+	    if (this.name != null) {
+	        throw new IllegalStateException("Name is already set to '" + this.name + "'");
+	    }
+	    this.name = name;
+	    props = Properties.getProperties(name);
+	    validateProperties();
+	}
+	
+	private void validateProperties() {
+        if (props.getSubmitCommand() == null) {
+            throw new IllegalArgumentException("Invalid configuration for provider '" + name + "'. No submit command specified.");
+        }
+        
+        if (props.getRemoveCommand() == null) {
+            throw new IllegalArgumentException("Invalid configuration for provider '" + name + "'. No cancel command specified.");
+        }
+        
+        if (props.getPollCommand() == null) {
+            throw new IllegalArgumentException("Invalid configuration for provider '" + name + "'. No poll command specified.");
+        }
+    }
+	
+	
+}
\ No newline at end of file
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/execution/JobSubmissionTaskHandler.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/execution/JobSubmissionTaskHandler.java	(revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/execution/JobSubmissionTaskHandler.java	(revision 3918)
@@ -0,0 +1,26 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+package org.globus.cog.abstraction.impl.scheduler.shell.execution;
+
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractExecutor;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractJobSubmissionTaskHandler;
+import org.globus.cog.abstraction.impl.scheduler.shell.Properties;
+import org.globus.cog.abstraction.impl.scheduler.shell.ShellExecutor;
+import org.globus.cog.abstraction.interfaces.Task;
+
+public class JobSubmissionTaskHandler extends AbstractJobSubmissionTaskHandler {
+    private Properties props;
+    
+    public JobSubmissionTaskHandler(Properties props) {
+        this.props = props;
+    }
+    
+	protected AbstractExecutor newExecutor(Task task,
+			AbstractJobSubmissionTaskHandler th) {
+		return new ShellExecutor(props, task, th);
+	}
+}
\ No newline at end of file
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/ShellExecutor.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/ShellExecutor.java	(revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/ShellExecutor.java	(revision 3918)
@@ -0,0 +1,345 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Oct 11, 2005
+ */
+package org.globus.cog.abstraction.impl.scheduler.shell;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.io.Writer;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.util.EnumSet;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.common.task.TaskSubmissionException;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractExecutor;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractProperties;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractQueuePoller;
+import org.globus.cog.abstraction.impl.scheduler.common.Job;
+import org.globus.cog.abstraction.impl.scheduler.common.ProcessException;
+import org.globus.cog.abstraction.impl.scheduler.common.ProcessListener;
+import org.globus.cog.abstraction.interfaces.FileLocation;
+import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.abstraction.interfaces.StagingSet;
+import org.globus.cog.abstraction.interfaces.StagingSetEntry;
+import org.globus.cog.abstraction.interfaces.StagingSetEntry.Mode;
+import org.globus.cog.abstraction.interfaces.Task;
+import org.globus.gsi.gssapi.auth.AuthorizationException;
+
+public class ShellExecutor extends AbstractExecutor {
+	public static final Logger logger = Logger.getLogger(ShellExecutor.class);
+	
+	private Properties props;
+	private static volatile int idCounter;
+	private String name, jobid;
+	private boolean active;
+	
+	public ShellExecutor(Properties props, Task task, ProcessListener listener) {
+		super(task, listener);
+		this.props = props;
+		name = "cog-" + IDF.format(idCounter++);
+	}
+
+    private static NumberFormat IDF = new DecimalFormat("000000");
+			
+	protected void writeSpec(String name, String value, Writer wr) 
+	        throws IOException {
+        if (value != null) {
+            wr.write(name);
+            wr.write('=');
+            wr.write(String.valueOf(value));
+            wr.write('\n');
+        }
+    }
+	
+	private static class ProcessStreams {
+	    private Process process;
+	    public Writer stdin;
+	    private Reader stdout, stderr;
+	    private StringBuilder out, err;
+	    
+	    public ProcessStreams(Process process) {
+	        this.process = process;
+	        this.stdin = new OutputStreamWriter(process.getOutputStream());
+	        this.stdout = new InputStreamReader(process.getInputStream());
+	        this.stderr = new InputStreamReader(process.getErrorStream());
+	        out = new StringBuilder();
+	        err = new StringBuilder();
+	    }	    
+	}
+
+	@Override
+    public void start() throws AuthorizationException, IOException, ProcessException {
+        Process p = Runtime.getRuntime().exec(props.getSubmitCommand());
+        ProcessStreams ps = new ProcessStreams(p);
+        
+        JobSpecification spec = this.getSpec();
+                
+        // Basic stuff
+        if (!spec.getExecutableLocation().equals(FileLocation.REMOTE)) {
+            throw new IllegalArgumentException("Only remote executables supported");
+        }
+        writeSpec("directory", escape(spec.getDirectory()), ps.stdin);
+        writeSpec("executable", escape(spec.getExecutable()), ps.stdin);
+        if (spec.getArgumentsAsList() != null) {
+            for (String arg : spec.getArgumentsAsList()) {
+                writeSpec("arg", escape(arg), ps.stdin);
+            }
+        }
+        
+        // Standard streams
+        if (spec.getStdInputLocation().overlaps(FileLocation.MEMORY)) {
+            throw new IllegalArgumentException("In-memory STDIN not supported");
+        }
+        
+        if (spec.getStdInput() != null) {
+            writeSpec("stdin.location", spec.getStdInputLocation().toString(), ps.stdin);
+            writeSpec("stdin.path", spec.getStdInput(), ps.stdin);
+        }
+        
+        writeStandardFileSpec(spec.getStdOutput(), spec.getStdOutputLocation(), "stdout", ps);
+        writeStandardFileSpec(spec.getStdError(), spec.getStdErrorLocation(), "stderr", ps);
+        
+        // Environment variables
+        for (String en : spec.getEnvironmentVariableNames()) {
+            writeSpec("env." + en, escape(spec.getEnvironmentVariable(en)), ps.stdin);
+        }
+        
+        // Attributes
+        for (String attr : spec.getAttributeNames()) {
+            Object o = spec.getAttribute(attr);
+            if (o != null) {
+                writeSpec("attr." + attr, escape(o.toString()), ps.stdin);
+            }
+        }
+        
+        // Staging stuff
+        writeStagingSpec(spec.getStageIn(), "stagein", false, ps);
+        writeStagingSpec(spec.getStageOut(), "stageout", true, ps);
+        
+        // Cleanup
+        if (spec.getCleanUpSet() != null) {
+            for (String ce : spec.getCleanUpSet()) {
+                writeSpec("cleanup", escape(ce), ps.stdin);
+            }
+        }
+        
+        ps.stdin.close();
+        
+        try {
+            int ec = p.waitFor();
+            if (logger.isDebugEnabled()) {
+                logger.debug(props.getSubmitCommandName() + " done (exit code " + ec + ")");
+            }
+            if (ec != 0) {
+                throw new ProcessException(props.getSubmitCommand() + " failed: " + read(ps.stderr));
+            }
+        }
+        catch (InterruptedException e) {
+            throw new ProcessException(e);
+        }
+        
+        parseOutput(ps.stdout);
+        active = true;
+        if (logger.isDebugEnabled()) {
+            logger.debug("Submitted job with id '" + jobid + "'");
+        }
+        getQueuePoller().addJob(
+            job = createJob(jobid, stdout, spec.getStdOutputLocation(), stderr,
+                spec.getStdErrorLocation(), exitcode, this));
+    }
+	
+	
+	private void parseOutput(Reader r) throws IOException, ProcessException {
+        BufferedReader br = new BufferedReader(r);
+        String line;
+        while ((line = br.readLine()) != null) {
+            String[] els = line.split("=", 2);
+            if (els.length < 2) {
+                throw new ProcessException("Invalid output from '" + 
+                        props.getSubmitCommand() + "': " + line);
+            }
+            if ("jobid".equals(els[0])) {
+                jobid = els[1];
+                if ("".equals(jobid)) {
+                    throw new ProcessException("Received empty job id from '" + 
+                            props.getSubmitCommand() + "'");
+                }
+            }
+            else if ("stdout.path".equals(els[0])) {
+                stdout = els[1];
+            }
+            else if ("stderr.path".equals(els[0])) {
+                stderr = els[1];
+            }
+            else {
+                throw new ProcessException("Invalid output from '" +
+                        props.getSubmitCommand() + "': " + line);
+            }
+        }
+        if (jobid == null) {
+            throw new ProcessException("No job id received from '" + props.getSubmitCommand() + "'");
+        }
+    }
+	
+	@Override
+    public void processCompleted(int exitCode) {
+	    active = false;
+        super.processCompleted(exitCode);
+    }
+
+
+    @Override
+    public void processFailed(String message) {
+        active = false;
+        super.processFailed(message);
+    }
+
+
+    @Override
+    public void processFailed(Exception e) {
+        active = false;
+        super.processFailed(e);
+    }
+
+
+    public void cancel() throws TaskSubmissionException {
+        if (!active) {
+            throw new TaskSubmissionException("Can only cancel an active task");
+        }
+        String[] cmdline = new String[] { getProperties().getRemoveCommand(), jobid };
+        try {
+            logger.debug("Canceling job: jobid=" + jobid);
+            Process process = Runtime.getRuntime().exec(cmdline, null, null);
+            int ec = process.waitFor();
+            if (ec != 0) {
+                throw new TaskSubmissionException("Failed to cancel task: " + 
+                        read(new InputStreamReader(process.getInputStream())));
+            }
+        }
+        catch (InterruptedException e) {
+            throw new TaskSubmissionException(
+                "Thread interrupted while waiting for "
+                        + getProperties().getRemoveCommandName() + " to finish");
+        }
+        catch (IOException e) {
+            throw new TaskSubmissionException("Failed to cancel task", e);
+        }
+    }
+
+    private String escape(String value) {
+        if (value == null) {
+            return null;
+        }
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < value.length(); i++) {
+            char c = value.charAt(i);
+            if (c == '\n' || c == '\\') {
+                sb.append("\\");
+                if (c == '\n') {
+                    sb.append('n');
+                }
+                else {
+                    sb.append('\\');
+                }
+            }
+            else {
+                sb.append(c);
+            }
+        }
+        return sb.toString();
+    }
+
+    private String read(Reader r) throws IOException {
+        StringBuilder sb = new StringBuilder();
+        BufferedReader br = new BufferedReader(r);
+        String line = br.readLine();
+        while (line != null) {
+            sb.append(line);
+            sb.append('\n');
+            line = br.readLine();
+        }
+        return sb.toString();
+    }
+
+    private void writeStagingSpec(StagingSet ss, String name, boolean modes, ProcessStreams pc) throws IOException {
+        if (ss == null || ss.isEmpty()) {
+            return;
+        }
+        for (StagingSetEntry se : ss) {
+            writeSpec(name + ".source", escape(se.getSource()), pc.stdin);
+            writeSpec(name + ".destination", escape(se.getDestination()), pc.stdin);
+            if (modes) {
+                writeSpec(name + ".mode", stagingModeToStr(se.getMode()), pc.stdin);
+            }
+        }
+    }
+
+    private String stagingModeToStr(EnumSet<Mode> mode) {
+        int s = 0;
+        for (Mode m : mode) {
+            s += m.getId();
+        }
+        return String.valueOf(s);
+    }
+
+    private void writeStandardFileSpec(String path, FileLocation loc, String name, ProcessStreams pc) 
+            throws IOException {
+        if (path == null) {
+            if (loc.overlaps(FileLocation.MEMORY)) {
+                writeSpec(name + ".location", "tmp", pc.stdin);
+            }
+        }
+        else {
+            writeSpec(name + ".location", loc.remove(FileLocation.MEMORY).toString(), pc.stdin);
+            writeSpec(name + ".path", path, pc.stdin);
+        }
+    }
+
+    @Override
+    protected void writeScript(Writer wr, String exitcode, String stdout, String stderr)
+            throws IOException {
+        // not used
+    }
+
+    @Override
+	protected String getName() {
+		return "Shell";
+	}
+
+	@Override
+	protected AbstractProperties getProperties() {
+		return props;
+	}
+
+	@Override
+	protected Job createJob(String jobid, String stdout,
+			FileLocation stdOutputLocation, String stderr,
+			FileLocation stdErrorLocation, String exitcode,
+			AbstractExecutor executor) {
+		return new Job(jobid, stdout, stdOutputLocation, stderr,
+				stdErrorLocation, exitcode, executor);
+	}
+
+	private static QueuePoller poller;
+
+	@Override
+	protected AbstractQueuePoller getQueuePoller() {
+		synchronized(ShellExecutor.class) {
+			if (poller == null) {
+				poller = new QueuePoller(props);
+				poller.start();
+			}
+			return poller;
+		}
+	}
+}
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/Properties.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/Properties.java	(revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/Properties.java	(revision 3918)
@@ -0,0 +1,54 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Oct 20, 2005
+ */
+package org.globus.cog.abstraction.impl.scheduler.shell;
+
+// import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractProperties;
+
+public class Properties extends AbstractProperties {
+
+    private static final long serialVersionUID = 1L;
+	
+	public static final String POLL_INTERVAL = "poll.interval";
+	public static final String SUBMIT_COMMAND = "submit.command";
+	public static final String POLL_COMMAND = "poll.command";
+	public static final String CANCEL_COMMAND = "cancel.command";
+	public static final String NAME = "name";
+
+	public static synchronized Properties getProperties(String name) {
+	    Properties properties = new Properties();
+		properties.load("provider-" + name + ".properties");
+		properties.put(NAME, name);
+		return properties;
+	}
+	
+	protected void setDefaults() {
+		setPollInterval(5);
+	}
+
+
+	public String getPollCommandName() {
+		return POLL_COMMAND;
+	}
+
+
+	public String getRemoveCommandName() {
+		return CANCEL_COMMAND;
+	}
+
+
+	public String getSubmitCommandName() {
+		return SUBMIT_COMMAND;
+	}
+
+	public String getName() {
+	    return getProperty(NAME);
+	}
+}
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/slurm/execution/TaskHandlerImpl.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/slurm/execution/TaskHandlerImpl.java	(revision 3917)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/slurm/execution/TaskHandlerImpl.java	(working copy)
@@ -15,7 +15,7 @@
 		return new JobSubmissionTaskHandler();
 	}
 
-	protected String getName() {
+	public String getName() {
 		return "Slurm";
 	}
 }
\ No newline at end of file
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/execution/TaskHandlerImpl.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/execution/TaskHandlerImpl.java	(revision 3917)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/execution/TaskHandlerImpl.java	(working copy)
@@ -22,12 +22,7 @@
 		return new JobSubmissionTaskHandler();
 	}
 
-	protected String getName() {
+	public String getName() {
 		return "Cobalt";
 	}
-
-    public String toString()
-    {
-        return "TaskHandlerImpl(execution cobalt)";
-    }
 }
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/condor/execution/TaskHandlerImpl.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/condor/execution/TaskHandlerImpl.java	(revision 3917)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/condor/execution/TaskHandlerImpl.java	(working copy)
@@ -22,7 +22,7 @@
 		return new JobSubmissionTaskHandler();
 	}
 
-	protected String getName() {
+	public String getName() {
 		return "Condor";
 	}
 }
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/execution/TaskHandlerImpl.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/execution/TaskHandlerImpl.java	(revision 3917)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/execution/TaskHandlerImpl.java	(working copy)
@@ -21,12 +21,7 @@
 		return new JobSubmissionTaskHandler();
 	}
 
-	protected String getName() {
+	public String getName() {
 		return "LSF";
 	}
-
-    public String toString()
-    {
-        return "TaskHandlerImpl(execution LSF)";
-    }
 }
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/execution/TaskHandlerImpl.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/execution/TaskHandlerImpl.java	(revision 3917)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/execution/TaskHandlerImpl.java	(working copy)
@@ -22,7 +22,7 @@
 		return new JobSubmissionTaskHandler();
 	}
 
-	protected String getName() {
+	public String getName() {
 		return "PBS";
 	}
 }
\ No newline at end of file
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/AbstractExecutor.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/AbstractExecutor.java	(revision 3917)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/AbstractExecutor.java	(working copy)
@@ -61,11 +61,10 @@
         validate(task);
     }
 
-    public void start()
-    throws AuthorizationException, IOException, ProcessException {
+    public void start() throws AuthorizationException, IOException, ProcessException {
 
     	String scriptdirPath = System.getProperty("script.dir");
-    	if(scriptdirPath == null) {
+    	if (scriptdirPath == null) {
     		scriptdirPath = System.getProperty("user.home")
                     + File.separatorChar + ".globus" + File.separatorChar
                     + "scripts";
@@ -107,6 +106,10 @@
 
         try {
             int code = process.waitFor();
+            if (logger.isDebugEnabled()) {
+                logger.debug(getProperties().getSubmitCommandName()
+                        + " done (exit code " + code + ")");
+            }
             if (code != 0) {
                 String errorText = getOutput(process.getInputStream())
                         + getOutput(process.getErrorStream());
@@ -115,10 +118,6 @@
                         + " reported an exit code of " + code + "). "
                         + errorText);
             }
-            if (logger.isDebugEnabled()) {
-                logger.debug(getProperties().getSubmitCommandName()
-                        + " done (exit code " + code + ")");
-            }
         }
         catch (InterruptedException e) {
             if (logger.isDebugEnabled()) {
@@ -126,8 +125,7 @@
                         + getProperties().getSubmitCommandName(), e);
             }
             if (listener != null) {
-                listener
-                    .processFailed("The submission process was interrupted");
+                listener.processFailed("The submission process was interrupted");
             }
         }
 
@@ -268,8 +266,7 @@
     }
 
     protected abstract void writeScript(Writer wr, String exitcode,
-                                        String stdout, String stderr)
-    throws IOException;
+            String stdout, String stderr) throws IOException;
 
     protected JobSpecification getSpec() {
         return spec;
@@ -403,8 +400,7 @@
        @param list May be null or empty
        @throws IOException
     */
-    protected void writeQuotedList(Writer writer, List<String> list)
-    throws IOException
+    protected void writeQuotedList(Writer writer, List<String> list) throws IOException
     {
         if (list != null && list.size() > 0) {
             writer.write(' ');
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/AbstractQueuePoller.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/AbstractQueuePoller.java	(revision 3917)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/AbstractQueuePoller.java	(working copy)
@@ -15,7 +15,6 @@
 import java.io.InputStreamReader;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
 
@@ -28,7 +27,8 @@
     public static final int MAX_CONSECUTIVE_FAILURES = 3;
 
     private String name;
-    private LinkedList newjobs, donejobs;
+    private LinkedList<Job> newjobs;
+    private LinkedList<String> donejobs;
     private Map<String, Job> jobs;
     boolean any = false;
     private int sleepTime;
@@ -45,8 +45,8 @@
         this.properties = properties;
         this.sleepTime = properties.getPollInterval() * 1000;
         jobs = new HashMap<String, Job>();
-        newjobs = new LinkedList();
-        donejobs = new LinkedList();
+        newjobs = new LinkedList<Job>();
+        donejobs = new LinkedList<String>();
     }
 
     public void start() {
@@ -102,7 +102,7 @@
         else {
             synchronized (newjobs) {
                 while (!newjobs.isEmpty()) {
-                    Job job = (Job) newjobs.removeFirst();
+                    Job job = newjobs.removeFirst();
                     jobs.put(job.getJobID(), job);
                 }
             }
@@ -115,7 +115,7 @@
         }
         else {
             while (!donejobs.isEmpty()) {
-                String jobid = (String) donejobs.removeFirst();
+                String jobid = donejobs.removeFirst();
                 removeDoneJob(jobid);
             }
         }
@@ -137,9 +137,7 @@
         if (logger.isDebugEnabled()) {
             logger.debug("Fail all: " + message);
         }
-        Iterator i = jobs.values().iterator();
-        while (i.hasNext()) {
-            Job job = (Job) i.next();
+        for (Job job : jobs.values()) {
             try {
                 job.fail(message);
             }
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/Job.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/Job.java	(revision 3917)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/Job.java	(working copy)
@@ -24,11 +24,12 @@
 	public static final int STATE_QUEUED = 1;
 	public static final int STATE_RUNNING = 2;
 	public static final int STATE_DONE = 3;
+	public static final int STATE_FAILED = 5;
 	public static final int STATE_UNKNOWN = 4;
 
 	private static final int NO_EXITCODE = -1;
 
-	private String jobID, location;
+	private String jobID, location, message;
 	private String exitcodeFileName;
 	private String stdout, stderr;
 	private FileLocation outLoc, errLoc;
@@ -50,8 +51,12 @@
 		this.ticks = 0;
 		this.exitcode = NO_EXITCODE;
 	}
+	
+	public void setMessage(String message) {
+	    this.message = message;
+	}
 
-	public boolean close() {
+	public boolean close(int tentativeState) {
 		if (logger.isDebugEnabled()) {
 			logger.debug("Closing " + jobID);
 		}
@@ -63,9 +68,8 @@
 			f = new File(exitcodeFileName);	
 			if (f != null && !f.exists()) {
 				if (ticks == 5) {
-					listener
-							.processFailed(new ProcessException(
-									"Exitcode file (" + exitcodeFileName + ") not found 5 queue polls after the job was reported done"));
+					listener.processFailed(new ProcessException(
+					        "Exitcode file (" + exitcodeFileName + ") not found 5 queue polls after the job was reported done"));
 					return true;
 				}
 				else {
@@ -78,11 +82,11 @@
 			}
 		}
 
-		processExitCode();
+		processExitCode(tentativeState);
 		return true;
 	}
 
-	protected boolean processExitCode() {
+	protected boolean processExitCode(int tentativeState) {
 		if (logger.isDebugEnabled()) {
 			logger.debug("Processing exit code for job " + jobID);
 		}
@@ -101,7 +105,17 @@
 			if (logger.isDebugEnabled()) {
 				logger.debug("Exit code: " + exitcode);
 			}
-			listener.processCompleted(exitcode);
+			if (tentativeState == STATE_FAILED) {
+			    if (message == null) {
+			        listener.processFailed("Job failed (exit code: " + exitcode + ")"); 
+			    }
+			    else {
+			        listener.processFailed(message);
+			    }
+			}
+			else {
+			    listener.processCompleted(exitcode);
+			}
 			return true;
 		}
 		catch (Exception e) {
@@ -155,7 +169,7 @@
 	}
 
 	public synchronized void await() {
-		while (state != STATE_DONE) {
+		while (state != STATE_DONE && state != STATE_FAILED) {
 			try {
 				wait();
 			}
@@ -173,9 +187,8 @@
 			return;
 		}
 		else {
-			if (state == STATE_DONE) {
-				if (close()) {
-					this.state = STATE_DONE;
+			if (state == STATE_DONE || state == STATE_FAILED) {
+				if (close(state)) {
 					synchronized (this) {
 						notify();
 					}
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/AbstractJobSubmissionTaskHandler.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/AbstractJobSubmissionTaskHandler.java	(revision 3917)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/AbstractJobSubmissionTaskHandler.java	(working copy)
@@ -14,7 +14,6 @@
 import org.globus.cog.abstraction.impl.common.task.InvalidSecurityContextException;
 import org.globus.cog.abstraction.impl.common.task.InvalidServiceContactException;
 import org.globus.cog.abstraction.impl.common.task.TaskSubmissionException;
-import org.globus.cog.abstraction.interfaces.DelegatedTaskHandler;
 import org.globus.cog.abstraction.interfaces.FileLocation;
 import org.globus.cog.abstraction.interfaces.JobSpecification;
 import org.globus.cog.abstraction.interfaces.Status;
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/execution/TaskHandlerImpl.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/execution/TaskHandlerImpl.java	(revision 3917)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/execution/TaskHandlerImpl.java	(working copy)
@@ -22,7 +22,7 @@
 		return new JobSubmissionTaskHandler();
 	}
 
-	protected String getName() {
+	public String getName() {
 		return "SGE";
 	}
 }
\ No newline at end of file
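
Note on the escape() helper in ShellExecutor above: it rewrites each newline as the two characters backslash-n and doubles each backslash, so that a staging source or destination always fits on one line of the submit script's input. A script reading those values would presumably need to reverse the transformation. The following is a minimal sketch of that round trip, not part of the commit: the class name EscapeSketch and the unescape() method are hypothetical, and escape() is restated here only so that the example is self-contained.

```java
public class EscapeSketch {
    // Same transformation as escape() in the diff:
    // newline -> backslash + 'n', backslash -> two backslashes.
    static String escape(String value) {
        if (value == null) {
            return null;
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            if (c == '\n') {
                sb.append("\\n");
            }
            else if (c == '\\') {
                sb.append("\\\\");
            }
            else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    // Hypothetical inverse (an assumption, not in the commit): a backslash
    // followed by 'n' becomes a newline; a backslash followed by any other
    // character yields that character.
    static String unescape(String value) {
        if (value == null) {
            return null;
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            if (c == '\\' && i + 1 < value.length()) {
                char next = value.charAt(++i);
                sb.append(next == 'n' ? '\n' : next);
            }
            else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String original = "dir\\file\nsecond line";
        String wire = escape(original);
        // The escaped form contains no raw newline, so it is safe to send
        // as a single "name=value" line.
        System.out.println(wire);
        System.out.println(unescape(wire).equals(original));
    }
}
```

Only newlines and backslashes are escaped, so values containing other special characters (for example '=') pass through unchanged.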


