[Swift-commit] cog r3918
swift at ci.uchicago.edu
Tue Jun 10 01:05:09 CDT 2014
------------------------------------------------------------------------
r3918 | hategan | 2014-06-10 01:02:50 -0500 (Tue, 10 Jun 2014) | 1 line
shell provider
------------------------------------------------------------------------
Index: modules/provider-localscheduler/shell-provider.README
===================================================================
--- modules/provider-localscheduler/shell-provider.README (revision 0)
+++ modules/provider-localscheduler/shell-provider.README (revision 3918)
@@ -0,0 +1,276 @@
+===================
+= I. Introduction =
+===================
+
+This file contains information about the shell provider. The shell
+provider is a CoG provider that allows interfacing with custom queuing
+systems for which a provider is not otherwise available.
+
+Names contained within angle brackets are meant to be replaced with
+specific strings or values. For example, if you want to name your
+provider "PBS" then replace all occurrences of <providerName> with
+"PBS".
+
+There are five basic ingredients to a shell provider:
+
+ 1. A file that informs the job abstraction code about the existence
+ of the new provider (cog-provider.properties)
+
+ 2. A file that contains settings for the new provider
+ (provider-<providerName>.properties)
+
+ 3. A submit script that receives job information and is responsible
+ for starting the job.
+
+ 4. A cancel script that can be used to cancel a submitted job
+
+ 5. A status script that provides information about the status of
+ jobs
+
+For a sample implementation, see the 'examples' directory.
+
+=============
+= II. Setup =
+=============
+
+1. cog-provider.properties
+--------------------------
+
+The first step is creating the cog-provider.properties file. For each
+provider that you want to add, make sure that the file contains the
+following lines:
+
+ provider=<providerName>
+ sandbox=false
+ executionTaskHandler=org.globus.cog.abstraction.impl.scheduler.shell.execution.TaskHandlerImpl
+ securityContext=org.globus.cog.abstraction.impl.common.task.SecurityContextImpl
+
+You can add multiple sets of these, one for each provider you want to
+implement. The cog-provider.properties file must be put in a directory
+(or .jar file) that is included in Java's class path at run-time. If
+using any of the CoG modules in a normal deployment or Swift, the
+dist/<moduleName>/etc directory is automatically included in the class
+path.
+
+If you wanted this provider to be available on a remote site accessed by
+coasters, you could put this file in a directory that would be added to
+the CLASSPATH environment variable in the shell profile (although at the
+time of this writing the author is not entirely sure that CLASSPATH is
+used by the coaster bootstrap process).
+
+
+2. provider-<providerName>.properties
+-------------------------------------
+
+This file must contain the following entries:
+
+ submit.command=<path to submit command>
+ cancel.command=<path to cancel command>
+ poll.command=<path to status command>
+
+Optionally, it could contain the following property:
+
+ poll.interval=<seconds>
+
+The poll interval indicates the interval at which the status of jobs is
+checked. The check is done regularly at this interval for all jobs
+submitted through this provider. The default is 5 seconds.
+
+
+3. The submit script
+--------------------
+
+This script is responsible for submitting the actual job. It receives
+input on STDIN. It must write its normal output to STDOUT and, if an
+error occurs, the error message to STDERR.
+
+Following are the syntactic rules for the input:
+ - empty lines must be ignored
+ - lines starting with the '#' character must also be ignored
+ - each other line is a "key=value" pair
+ - values can contain unescaped '=' symbols, so when parsing lines,
+ the first equal sign signifies the end of the key
+ - new lines are escaped using '\n' and backslashes are escaped
+ as '\\'; there are no other escape codes
+ - for certain keys, multiple lines with the same key can appear
+ in the input; this is the way to encode a list
+ - keys appear in the input in a well defined order; however, some
+ keys can be missing, indicating that the corresponding value is
+ not set
+
+The following keys (in the specified order) can appear in the input.
+Optional keys are marked with '?' (although no '?' will appear in the
+actual input to the submit script). Similarly, lists (i.e. cases when
+the key can appear zero or more times) are marked with '*'. Literals
+are single-quoted. The pipe ('|') represents mutually exclusive choices.
+
+?directory=<dir>
+ if specified, run the job in <dir>
+
+executable=<path>
+ this is the path to the executable
+
+*arg=<string>
+ one of the arguments to the executable
+
+?stdin.location='local'|'remote'
+ indicates that the executable should be fed a file to its STDIN and
+ whether this file is on the submit side ('local') or on the node
+ where the job will run ('remote'). If this key is present, then
+ the following key is mandatory.
+
+ Practically, if the file is located on a shared file system
+ accessible (and mounted in the same place) by both the submit
+ host and the compute node, then treating 'remote' as 'local'
+ and the other way around should produce no visible difference.
+
+ For the purpose of implementing a provider that can be used to
+ control coaster blocks, assuming a shared file system and
+ implementing this in the easiest way is a reasonable way to go.
+
+?stdin.path=<path>
+ The path to the file to be fed to the STDIN of the job.
+
+?stdout.location='local'|'remote'|'tmp'
+ specifies where the process STDOUT should be redirected. With
+ the exception of 'tmp', the same considerations as in the case
+ of stdin.location above apply.
+
+ 'tmp' is a special case. It says that the submit script must
+ re-direct the job stdout to some temporary file AND return
+ (on STDOUT) the path to this file in the following form:
+
+ stdout.path=<path>
+
+ if 'tmp' is specified, the following key (stdout.path) will
+ not be present
+
+?stdout.path=<path>
+ where to re-direct the job stdout if stdout.location is 'local'
+ or 'remote'
+
+?stderr.location='local'|'remote'|'tmp'
+ see stdout.location
+
+?stderr.path=<path>
+ see stdout.path
+
+*env.<name>=<value>
+ Specifies an environment variable that must be available in the
+ job environment
+
+*attr.<name>=<value>
+ These are miscellaneous job attributes. Their interpretation
+ is up to the provider. Some examples of attributes include
+ 'maxwalltime', 'queue', 'project', etc. Unknown attributes
+ should be ignored.
+
+ Coasters will set the following attributes:
+ maxwalltime=<minutes> - the maximum wall time of the job
+ count=<number> - the number of process copies to start
+
+-----
+Small aside: implementing file staging/cleanup is optional. If it is not
+implemented, staging or cleanup requests should produce an error.
+-----
+
+*stagein.source=<path>
+ Specifies a stage-in request. If present, it will be followed
+ by stagein.destination.
+ <path> represents the local file that must be copied to the
+ compute node.
+
+*stagein.destination=<path>
+ Specifies where (on the compute node) the most recent
+ stagein.source should be copied
+
+*stageout.source=<path>
+ Specifies a stage-out request. If present, it will be followed
+ by stageout.destination and stageout.mode.
+ <path> represents the remote file that must be copied from the
+ compute node to the submit node.
+
+*stageout.destination=<path>
+ Specifies where (on the submit node) the most recent
+ stageout.source should be copied
+
+*stageout.mode=<mode>
+ Specifies under what circumstances the most recent stage-out
+ request should be fulfilled. The <mode> is a bit-wise OR
+ of the following possible values:
+
+ 1 - always: stage out the file whether the job succeeds or
+ not. If the file cannot be staged out, produce an error
+ 2 - if present: stage out the file only if it can be found
+ 4 - on error: only attempt to stage out the file if the job
+ fails; produce an error if the job failed and the file cannot
+ be staged-out
+ 8 - on success: like "on error", but applies when the job succeeds.
+
+*cleanup=<path>
+ A list of files and/or directories to be deleted after the
+ job is done and the stage out process is completed. Reasonable
+ implementations could ensure that the cleanup paths are
+ limited to the job directory
+
+The submit script, after starting or queuing the job, must reply,
+on STDOUT, with "jobid=<jobid>", where <jobid> is a handle that
+can be used to subsequently identify the job for purposes of
+cancelling or querying its status. If the submission is successful,
+the implementation should return an exit code of 0.
+
+If the implementation finds that the specification is erroneous, or
+if any errors occur submitting the job, an error message should be
+printed on STDERR and an exit code != 0 should be produced.
+
+Here is an example:
+ STDIN:
+ directory=/tmp
+ executable=/bin/ls
+ arg=-al
+ arg=*.txt
+ stdout.location=tmp
+
+ A local execution submit script could perform the following:
+ cd /tmp
+ /bin/ls -al *.txt 1>/tmp/tempx0001 &
+ PID=$!
+ echo "stdout.path=/tmp/tempx0001"
+ echo "jobid=$PID"
+
+
+4. The cancel script
+--------------------
+
+The cancel script must cancel the job with the <jobid> passed to
+it as the first command-line argument.
+
+
+5. The status script
+--------------------
+
+The status script implements a mechanism for querying the status
+of jobs. It receives a list of <jobid>s on the command line, one
+in each argument, and returns, if successful, on STDOUT, a list
+of lines, one for each job, of the following format:
+
+<jobid> 'Q'|'R'|'C'|'F' [<exitCode> [<errorMessage>]]
+
+The meaning of the second column is as follows:
+ 'Q': the job is queued
+ 'R': the job is running
+ 'C': the job is completed
+ optionally, an <exitCode> could be present
+ 'F': the job has failed
+ optionally, an <exitCode> could be present, possibly along
+ with an <errorMessage>
+
+There is some level of ambiguity about the interpretation of non-zero
+exit codes. An implementation can choose to consider all jobs that
+do not run any more as completed, and use the exit code as an
+indication of whether the completion was successful or not. Such
+an implementation might reserve the failed state for cases in
+which the job has failed to run due to queuing system conditions,
+and provide, in <errorMessage>, an appropriate explanation of the
+failure condition. Other implementations may choose to mark all jobs
+completed with non-zero exit codes as failed.
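The input syntax described in the README above (split each line at the first '=', with '\n' and '\\' as the only escapes) could be handled in a submit script roughly as follows. This is a minimal bash sketch and not part of the commit; the function names unescape and parse_line and the KEY/VALUE variables are hypothetical.

```shell
#!/bin/bash
# Hypothetical helpers for a submit script; all names are illustrative.

# Undo the two escapes of the input format: '\n' -> newline, '\\' -> backslash.
# The string is scanned left to right so that '\\n' decodes to a backslash
# followed by the letter 'n', not to a newline.
unescape() {
    local s=$1 out= c next
    local i=0 len=${#1}
    while [ "$i" -lt "$len" ]; do
        c=${s:i:1}
        if [ "$c" = "\\" ] && [ $((i + 1)) -lt "$len" ]; then
            next=${s:i+1:1}
            case $next in
                n) out+=$'\n' ;;
                *) out+=$next ;;
            esac
            i=$((i + 2))
        else
            out+=$c
            i=$((i + 1))
        fi
    done
    printf '%s' "$out"
}

# Split one input line at the first '='. On success, sets KEY and VALUE.
# Returns 1 for lines that must be ignored (empty or starting with '#')
# and 2 for lines that are not of the form key=value.
# Note: command substitution drops trailing newlines from VALUE.
parse_line() {
    local line=$1
    case $line in
        ""|"#"*) return 1 ;;
        *=*) ;;
        *) return 2 ;;
    esac
    KEY=${line%%=*}
    VALUE=$(unescape "${line#*=}")
}

# Possible use inside a submit script:
#   while IFS= read -r LINE; do
#       parse_line "$LINE" || continue
#       echo "key=$KEY"
#   done
```

Reading with `IFS= read -r` matters here: without `-r`, the shell would consume the backslashes before the escapes could be decoded.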
Index: modules/provider-localscheduler/examples/shell-provider/ststat
===================================================================
--- modules/provider-localscheduler/examples/shell-provider/ststat (revision 0)
+++ modules/provider-localscheduler/examples/shell-provider/ststat (revision 3918)
@@ -0,0 +1,12 @@
+#!/bin/bash
+
+while [ "$1" != "" ]; do
+ if kill -0 "$1" 2>/dev/null; then # the job id is a PID
+ echo "$1 R"
+ else
+ # Assume success if the process is not running
+ # This is just a simple example
+ echo "$1 C 0"
+ fi
+ shift 1
+done
\ No newline at end of file
Property changes on: modules/provider-localscheduler/examples/shell-provider/ststat
___________________________________________________________________
Added: svn:executable
+ *
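The ststat example reports every finished job as completed with exit code 0. One way to report real exit codes is sketched below. It assumes that the submit script wraps the job so that its exit code is written to a file named `<jobid>.ec` in some directory; that convention, the directory argument, and the function name status_line are all hypothetical and not part of the commit.

```shell
#!/bin/bash
# status_line <jobid> <ecdir>
# Prints one status line in the "<jobid> <state> [<exitCode> [<message>]]"
# format. <ecdir> is an assumed directory in which a wrapper around the job
# stored the job's exit code as "<jobid>.ec".
status_line() {
    local jobid=$1 ecdir=$2 ec
    if kill -0 "$jobid" 2>/dev/null; then
        # The process still exists, so the job is running
        echo "$jobid R"
    elif [ -r "$ecdir/$jobid.ec" ]; then
        # The job is gone and left an exit code behind
        ec=$(cat "$ecdir/$jobid.ec")
        echo "$jobid C $ec"
    else
        # The job is gone without a recorded exit code
        echo "$jobid F 1 exit code not recorded"
    fi
}

# Possible use in a status script (the directory is illustrative):
#   for id in "$@"; do status_line "$id" /tmp/shelltest-ec; done
```

This sketch follows the first interpretation discussed in the README: all jobs that no longer run are reported as 'C' with their exit code, and 'F' is reserved for cases where no result is available.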
Index: modules/provider-localscheduler/examples/shell-provider/provider-shelltest.properties
===================================================================
--- modules/provider-localscheduler/examples/shell-provider/provider-shelltest.properties (revision 0)
+++ modules/provider-localscheduler/examples/shell-provider/provider-shelltest.properties (revision 3918)
@@ -0,0 +1,3 @@
+submit.command=/path/to/stsubmit
+cancel.command=/path/to/stcancel
+poll.command=/path/to/ststat
\ No newline at end of file
Index: modules/provider-localscheduler/examples/shell-provider/stsubmit
===================================================================
--- modules/provider-localscheduler/examples/shell-provider/stsubmit (revision 0)
+++ modules/provider-localscheduler/examples/shell-provider/stsubmit (revision 3918)
@@ -0,0 +1,92 @@
+#!/bin/bash
+
+
+rm -f /tmp/stsubmit
+EXECUTABLE=
+DIR=
+ARGS=
+STDOUTLOC=
+STDOUTPATH=
+STDERRLOC=
+STDERRPATH=
+STDINLOC=
+STDINPATH=
+STDIN=
+STDOUT=
+STDERR=
+
+while IFS= read -r LINE; do
+ echo "$LINE" >>/tmp/stsubmit
+ case $LINE in
+ executable=*)
+ EXECUTABLE=${LINE#executable=}
+ ;;
+ directory=*)
+ DIR=${LINE#directory=}
+ ;;
+ arg=*)
+ ARGS="$ARGS ${LINE#arg=}"
+ ;;
+ attr.*|""|"#"*)
+ # ignore attributes, empty lines, and comments
+ ;;
+ stdin.location=*)
+ STDINLOC=${LINE#stdin.location=}
+ ;;
+ stdin.path=*)
+ STDINPATH=${LINE#stdin.path=}
+ ;;
+ stdout.location=*)
+ STDOUTLOC=${LINE#stdout.location=}
+ ;;
+ stdout.path=*)
+ STDOUTPATH=${LINE#stdout.path=}
+ ;;
+ stderr.location=*)
+ STDERRLOC=${LINE#stderr.location=}
+ ;;
+ stderr.path=*)
+ STDERRPATH=${LINE#stderr.path=}
+ ;;
+ env.*)
+ LINE2=${LINE#env.}
+ # split on the first '='
+ # (the value itself may contain '=')
+ NAME=${LINE2%%=*}
+ VALUE=${LINE2#*=}
+ export "$NAME=$VALUE"
+ ;;
+ *)
+ echo "Don't know how to interpret line: $LINE" >&2
+ exit 2
+ esac
+done < /dev/stdin
+
+if [ "$STDOUTLOC" == "tmp" ]; then
+ STDOUTPATH=$(mktemp)
+ echo "stdout.path=$STDOUTPATH"
+fi
+if [ "$STDOUTPATH" != "" ]; then
+ STDOUT="1> $STDOUTPATH"
+fi
+
+if [ "$STDERRLOC" == "tmp" ]; then
+ STDERRPATH=$(mktemp)
+ echo "stderr.path=$STDERRPATH"
+fi
+if [ "$STDERRPATH" != "" ]; then
+ STDERR="2> $STDERRPATH"
+fi
+
+if [ "$STDINPATH" != "" ]; then
+ STDIN="< $STDINPATH"
+fi
+
+CMD="$EXECUTABLE $ARGS $STDIN $STDOUT $STDERR"
+
+if [ "$DIR" != "" ]; then cd "$DIR"; fi
+eval "$CMD &"
+
+echo "jobid=$!"
+
+exit 0
\ No newline at end of file
Property changes on: modules/provider-localscheduler/examples/shell-provider/stsubmit
___________________________________________________________________
Added: svn:executable
+ *
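The stsubmit example rejects staging keys. A submit script that does implement stage-out would need to interpret stageout.mode. The bash sketch below shows one possible reading of the mode flags, assuming they are encoded as the distinct bits 1, 2, 4, and 8 (an assumption; the ids defined by StagingSetEntry.Mode are authoritative). The constant names, the function should_stage_out, and the way combined flags are resolved are hypothetical.

```shell
#!/bin/bash
# Assumed flag values; verify against StagingSetEntry.Mode before relying on them.
MODE_ALWAYS=1
MODE_IF_PRESENT=2
MODE_ON_ERROR=4
MODE_ON_SUCCESS=8

# should_stage_out <mode> <job_exit_code> <file>
# Returns 0 if the file should be staged out, 1 otherwise.
should_stage_out() {
    local mode=$1 ec=$2 file=$3
    # 'if present': a missing file is skipped without an error
    if (( mode & MODE_IF_PRESENT )) && [ ! -e "$file" ]; then
        return 1
    fi
    # 'always': stage out regardless of the job outcome
    if (( mode & MODE_ALWAYS )); then
        return 0
    fi
    # 'on error' / 'on success': depends on the job exit code
    if (( mode & (MODE_ON_ERROR | MODE_ON_SUCCESS) )); then
        if [ "$ec" -ne 0 ]; then
            (( mode & MODE_ON_ERROR )) && return 0
        else
            (( mode & MODE_ON_SUCCESS )) && return 0
        fi
        return 1
    fi
    # No outcome condition given (for example, 'if present' alone)
    return 0
}
```

A caller would still have to produce an error when should_stage_out succeeds but the copy itself fails, as the README requires for the 'always' and 'on error' modes.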
Index: modules/provider-localscheduler/examples/shell-provider/cog-provider.properties
===================================================================
--- modules/provider-localscheduler/examples/shell-provider/cog-provider.properties (revision 0)
+++ modules/provider-localscheduler/examples/shell-provider/cog-provider.properties (revision 3918)
@@ -0,0 +1,4 @@
+provider=shelltest
+sandbox=false
+executionTaskHandler=org.globus.cog.abstraction.impl.scheduler.shell.execution.TaskHandlerImpl
+securityContext=org.globus.cog.abstraction.impl.common.task.SecurityContextImpl
Index: modules/provider-localscheduler/examples/shell-provider/stcancel
===================================================================
--- modules/provider-localscheduler/examples/shell-provider/stcancel (revision 0)
+++ modules/provider-localscheduler/examples/shell-provider/stcancel (revision 3918)
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+kill $1
Property changes on: modules/provider-localscheduler/examples/shell-provider/stcancel
___________________________________________________________________
Added: svn:executable
+ *
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/QueuePoller.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/QueuePoller.java (revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/QueuePoller.java (revision 3918)
@@ -0,0 +1,107 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Oct 11, 2005
+ */
+package org.globus.cog.abstraction.impl.scheduler.shell;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractQueuePoller;
+import org.globus.cog.abstraction.impl.scheduler.common.Job;
+
+public class QueuePoller extends AbstractQueuePoller {
+ public static final Logger logger = Logger.getLogger(QueuePoller.class);
+
+
+ public QueuePoller(Properties properties) {
+ super(properties.getName() + " provider queue poller", properties);
+ }
+
+ private static String[] CMDARRAY;
+
+ protected synchronized String[] getCMDArray() {
+ String[] cmda = new String[1 + getJobs().size()];
+ cmda[0] = getProperties().getPollCommand();
+ int i = 1;
+ for (Job j : getJobs().values()) {
+ cmda[i++] = j.getJobID();
+ }
+ return cmda;
+ }
+
+ protected void processStdout(InputStream is) throws IOException {
+ BufferedReader br = new BufferedReader(new InputStreamReader(is));
+ String line;
+ do {
+ line = br.readLine();
+ if (line != null) {
+ try {
+ String[] el = line.trim().split("\\s+", 4);
+ String jobid = el[0];
+ String message = null;
+
+ if (el.length > 3) {
+ message = el[3];
+ }
+
+ Job job = getJob(jobid);
+ if (job == null) {
+ logger.warn("Received status for an unknown job: '" + jobid + "'");
+ continue;
+ }
+ if (el[1].length() != 1) {
+ job.setMessage("Received unknown status code: '" + el[1] + "'");
+ job.setState(Job.STATE_FAILED);
+ continue;
+ }
+ char status = el[1].charAt(0);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Status for " + jobid + " is " + status);
+ }
+
+ if (el.length > 2) {
+ int exitCode = Integer.parseInt(el[2]);
+ job.setExitcode(exitCode);
+ }
+ if (status == 'Q') {
+ job.setState(Job.STATE_QUEUED);
+ }
+ else if (status == 'R') {
+ job.setState(Job.STATE_RUNNING);
+ }
+ else if (status == 'C') {
+ job.setState(Job.STATE_DONE);
+ }
+ else if (status == 'F') {
+ if (message != null) {
+ job.setMessage(message);
+ }
+ else {
+ job.setMessage("Job failed");
+ }
+ job.setState(Job.STATE_FAILED);
+ }
+
+ }
+ catch (Exception e) {
+ logger.warn("Exception caught while handling "
+ + getProperties().getPollCommandName()
+ + " output: " + line, e);
+ }
+ }
+ } while (line != null);
+ }
+
+ protected void processStderr(InputStream is) throws IOException {
+ // not used
+ }
+}
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/execution/TaskHandlerImpl.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/execution/TaskHandlerImpl.java (revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/execution/TaskHandlerImpl.java (revision 3918)
@@ -0,0 +1,50 @@
+
+// ----------------------------------------------------------------------
+// This code is developed as part of the Java CoG Kit project
+// The terms of the license can be found at http://www.cogkit.org/license
+// This message may not be removed or altered.
+// ----------------------------------------------------------------------
+
+package org.globus.cog.abstraction.impl.scheduler.shell.execution;
+
+import org.globus.cog.abstraction.impl.scheduler.shell.Properties;
+import org.globus.cog.abstraction.interfaces.DelegatedTaskHandler;
+
+public class TaskHandlerImpl extends
+ org.globus.cog.abstraction.impl.common.execution.TaskHandlerImpl {
+ private Properties props;
+ private String name;
+
+ protected DelegatedTaskHandler newDelegatedTaskHandler() {
+ return new JobSubmissionTaskHandler(props);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ if (this.name != null) {
+ throw new IllegalStateException("Name is already set to '" + this.name + "'");
+ }
+ this.name = name;
+ props = Properties.getProperties(name);
+ validateProperties();
+ }
+
+ private void validateProperties() {
+ if (props.getSubmitCommand() == null) {
+ throw new IllegalArgumentException("Invalid configuration for provider '" + name + "'. No submit command specified.");
+ }
+
+ if (props.getRemoveCommand() == null) {
+ throw new IllegalArgumentException("Invalid configuration for provider '" + name + "'. No cancel command specified.");
+ }
+
+ if (props.getPollCommand() == null) {
+ throw new IllegalArgumentException("Invalid configuration for provider '" + name + "'. No poll command specified.");
+ }
+ }
+
+
+}
\ No newline at end of file
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/execution/JobSubmissionTaskHandler.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/execution/JobSubmissionTaskHandler.java (revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/execution/JobSubmissionTaskHandler.java (revision 3918)
@@ -0,0 +1,26 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+package org.globus.cog.abstraction.impl.scheduler.shell.execution;
+
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractExecutor;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractJobSubmissionTaskHandler;
+import org.globus.cog.abstraction.impl.scheduler.shell.Properties;
+import org.globus.cog.abstraction.impl.scheduler.shell.ShellExecutor;
+import org.globus.cog.abstraction.interfaces.Task;
+
+public class JobSubmissionTaskHandler extends AbstractJobSubmissionTaskHandler {
+ private Properties props;
+
+ public JobSubmissionTaskHandler(Properties props) {
+ this.props = props;
+ }
+
+ protected AbstractExecutor newExecutor(Task task,
+ AbstractJobSubmissionTaskHandler th) {
+ return new ShellExecutor(props, task, th);
+ }
+}
\ No newline at end of file
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/ShellExecutor.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/ShellExecutor.java (revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/ShellExecutor.java (revision 3918)
@@ -0,0 +1,345 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Oct 11, 2005
+ */
+package org.globus.cog.abstraction.impl.scheduler.shell;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.io.Writer;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.util.EnumSet;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.common.task.TaskSubmissionException;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractExecutor;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractProperties;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractQueuePoller;
+import org.globus.cog.abstraction.impl.scheduler.common.Job;
+import org.globus.cog.abstraction.impl.scheduler.common.ProcessException;
+import org.globus.cog.abstraction.impl.scheduler.common.ProcessListener;
+import org.globus.cog.abstraction.interfaces.FileLocation;
+import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.abstraction.interfaces.StagingSet;
+import org.globus.cog.abstraction.interfaces.StagingSetEntry;
+import org.globus.cog.abstraction.interfaces.StagingSetEntry.Mode;
+import org.globus.cog.abstraction.interfaces.Task;
+import org.globus.gsi.gssapi.auth.AuthorizationException;
+
+public class ShellExecutor extends AbstractExecutor {
+ public static final Logger logger = Logger.getLogger(ShellExecutor.class);
+
+ private Properties props;
+ private static volatile int idCounter;
+ private String name, jobid;
+ private boolean active;
+
+ public ShellExecutor(Properties props, Task task, ProcessListener listener) {
+ super(task, listener);
+ this.props = props;
+ name = "cog-" + IDF.format(idCounter++);
+ }
+
+ private static NumberFormat IDF = new DecimalFormat("000000");
+
+ protected void writeSpec(String name, String value, Writer wr)
+ throws IOException {
+ if (value != null) {
+ wr.write(name);
+ wr.write('=');
+ wr.write(String.valueOf(value));
+ wr.write('\n');
+ }
+ }
+
+ private static class ProcessStreams {
+ private Process process;
+ public Writer stdin;
+ private Reader stdout, stderr;
+ private StringBuilder out, err;
+
+ public ProcessStreams(Process process) {
+ this.process = process;
+ this.stdin = new OutputStreamWriter(process.getOutputStream());
+ this.stdout = new InputStreamReader(process.getInputStream());
+ this.stderr = new InputStreamReader(process.getErrorStream());
+ out = new StringBuilder();
+ err = new StringBuilder();
+ }
+ }
+
+ @Override
+ public void start() throws AuthorizationException, IOException, ProcessException {
+ Process p = Runtime.getRuntime().exec(props.getSubmitCommand());
+ ProcessStreams ps = new ProcessStreams(p);
+
+ JobSpecification spec = this.getSpec();
+
+ // Basic stuff
+ if (!spec.getExecutableLocation().equals(FileLocation.REMOTE)) {
+ throw new IllegalArgumentException("Only remote executables supported");
+ }
+ writeSpec("directory", escape(spec.getDirectory()), ps.stdin);
+ writeSpec("executable", escape(spec.getExecutable()), ps.stdin);
+ if (spec.getArgumentsAsList() != null) {
+ for (String arg : spec.getArgumentsAsList()) {
+ writeSpec("arg", escape(arg), ps.stdin);
+ }
+ }
+
+ // Standard streams
+ if (spec.getStdInputLocation().overlaps(FileLocation.MEMORY)) {
+ throw new IllegalArgumentException("In-memory STDIN not supported");
+ }
+
+ if (spec.getStdInput() != null) {
+ writeSpec("stdin.location", spec.getStdInputLocation().toString(), ps.stdin);
+ writeSpec("stdin.path", escape(spec.getStdInput()), ps.stdin);
+ }
+
+ writeStandardFileSpec(spec.getStdOutput(), spec.getStdOutputLocation(), "stdout", ps);
+ writeStandardFileSpec(spec.getStdError(), spec.getStdErrorLocation(), "stderr", ps);
+
+ // Environment variables
+ for (String en : spec.getEnvironmentVariableNames()) {
+ writeSpec("env." + en, escape(spec.getEnvironmentVariable(en)), ps.stdin);
+ }
+
+ // Attributes
+ for (String attr : spec.getAttributeNames()) {
+ Object o = spec.getAttribute(attr);
+ if (o != null) {
+ writeSpec("attr." + attr, escape(o.toString()), ps.stdin);
+ }
+ }
+
+ // Staging stuff
+ writeStagingSpec(spec.getStageIn(), "stagein", false, ps);
+ writeStagingSpec(spec.getStageOut(), "stageout", true, ps);
+
+ // Cleanup
+ if (spec.getCleanUpSet() != null) {
+ for (String ce : spec.getCleanUpSet()) {
+ writeSpec("cleanup", escape(ce), ps.stdin);
+ }
+ }
+
+ ps.stdin.close();
+
+ try {
+ int ec = p.waitFor();
+ if (logger.isDebugEnabled()) {
+ logger.debug(props.getSubmitCommandName() + " done (exit code " + ec + ")");
+ }
+ if (ec != 0) {
+ throw new ProcessException(props.getSubmitCommand() + " failed: " + read(ps.stderr));
+ }
+ }
+ catch (InterruptedException e) {
+ throw new ProcessException(e);
+ }
+
+ parseOutput(ps.stdout);
+ active = true;
+ if (logger.isDebugEnabled()) {
+ logger.debug("Submitted job with id '" + jobid + "'");
+ }
+ getQueuePoller().addJob(
+ job = createJob(jobid, stdout, spec.getStdOutputLocation(), stderr,
+ spec.getStdErrorLocation(), exitcode, this));
+ }
+
+
+ private void parseOutput(Reader r) throws IOException, ProcessException {
+ BufferedReader br = new BufferedReader(r);
+ String line;
+ while ((line = br.readLine()) != null) {
+ String[] els = line.split("=", 2);
+ if (els.length < 2) {
+ throw new ProcessException("Invalid output from '" +
+ props.getSubmitCommand() + "': " + line);
+ }
+ if ("jobid".equals(els[0])) {
+ jobid = els[1];
+ if ("".equals(jobid)) {
+ throw new ProcessException("Received empty job id from '" +
+ props.getSubmitCommand() + "'");
+ }
+ }
+ else if ("stdout.path".equals(els[0])) {
+ stdout = els[1];
+ }
+ else if ("stderr.path".equals(els[0])) {
+ stderr = els[1];
+ }
+ else {
+ throw new ProcessException("Invalid output from '" +
+ props.getSubmitCommand() + "': " + line);
+ }
+ }
+ if (jobid == null) {
+ throw new ProcessException("No job id received from '" + props.getSubmitCommand() + "'");
+ }
+ }
+
+ @Override
+ public void processCompleted(int exitCode) {
+ active = false;
+ super.processCompleted(exitCode);
+ }
+
+
+ @Override
+ public void processFailed(String message) {
+ active = false;
+ super.processFailed(message);
+ }
+
+
+ @Override
+ public void processFailed(Exception e) {
+ active = false;
+ super.processFailed(e);
+ }
+
+
+ public void cancel() throws TaskSubmissionException {
+ if (!active) {
+ throw new TaskSubmissionException("Can only cancel an active task");
+ }
+ String[] cmdline = new String[] { getProperties().getRemoveCommand(), jobid };
+ try {
+ logger.debug("Canceling job: jobid=" + jobid);
+ Process process = Runtime.getRuntime().exec(cmdline, null, null);
+ int ec = process.waitFor();
+ if (ec != 0) {
+ throw new TaskSubmissionException("Failed to cancel task: " +
+ read(new InputStreamReader(process.getInputStream())));
+ }
+ }
+ catch (InterruptedException e) {
+ throw new TaskSubmissionException(
+ "Thread interrupted while waiting for "
+ + getProperties().getRemoveCommandName() + " to finish");
+ }
+ catch (IOException e) {
+ throw new TaskSubmissionException("Failed to cancel task", e);
+ }
+ }
+
+ private String escape(String value) {
+ if (value == null) {
+ return null;
+ }
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < value.length(); i++) {
+ char c = value.charAt(i);
+ if (c == '\n' || c == '\\') {
+ sb.append("\\");
+ if (c == '\n') {
+ sb.append('n');
+ }
+ else {
+ sb.append('\\');
+ }
+ }
+ else {
+ sb.append(c);
+ }
+ }
+ return sb.toString();
+ }
+
+ private String read(Reader r) throws IOException {
+ StringBuilder sb = new StringBuilder();
+ BufferedReader br = new BufferedReader(r);
+ String line = br.readLine();
+ while (line != null) {
+ sb.append(line);
+ sb.append('\n');
+ line = br.readLine();
+ }
+ return sb.toString();
+ }
+
+ private void writeStagingSpec(StagingSet ss, String name, boolean modes, ProcessStreams pc) throws IOException {
+ if (ss == null || ss.isEmpty()) {
+ return;
+ }
+ for (StagingSetEntry se : ss) {
+ writeSpec(name + ".source", escape(se.getSource()), pc.stdin);
+ writeSpec(name + ".destination", escape(se.getDestination()), pc.stdin);
+ if (modes) {
+ writeSpec(name + ".mode", stagingModeToStr(se.getMode()), pc.stdin);
+ }
+ }
+ }
+
+ private String stagingModeToStr(EnumSet<Mode> mode) {
+ int s = 0;
+ for (Mode m : mode) {
+ s += m.getId();
+ }
+ return String.valueOf(s);
+ }
+
+ private void writeStandardFileSpec(String path, FileLocation loc, String name, ProcessStreams pc)
+ throws IOException {
+ if (path == null) {
+ if (loc.overlaps(FileLocation.MEMORY)) {
+ writeSpec(name + ".location", "tmp", pc.stdin);
+ }
+ }
+ else {
+ writeSpec(name + ".location", loc.remove(FileLocation.MEMORY).toString(), pc.stdin);
+ writeSpec(name + ".path", escape(path), pc.stdin);
+ }
+ }
+
+ @Override
+ protected void writeScript(Writer wr, String exitcode, String stdout, String stderr)
+ throws IOException {
+ // not used
+ }
+
+ @Override
+ protected String getName() {
+ return "Shell";
+ }
+
+ @Override
+ protected AbstractProperties getProperties() {
+ return props;
+ }
+
+ @Override
+ protected Job createJob(String jobid, String stdout,
+ FileLocation stdOutputLocation, String stderr,
+ FileLocation stdErrorLocation, String exitcode,
+ AbstractExecutor executor) {
+ return new Job(jobid, stdout, stdOutputLocation, stderr,
+ stdErrorLocation, exitcode, executor);
+ }
+
+ private static QueuePoller poller;
+
+ @Override
+ protected AbstractQueuePoller getQueuePoller() {
+ synchronized (ShellExecutor.class) {
+ if (poller == null) {
+ poller = new QueuePoller(props);
+ poller.start();
+ }
+ return poller;
+ }
+ }
+}
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/Properties.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/Properties.java (revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/shell/Properties.java (revision 3918)
@@ -0,0 +1,54 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Oct 20, 2005
+ */
+package org.globus.cog.abstraction.impl.scheduler.shell;
+
+// import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractProperties;
+
+public class Properties extends AbstractProperties {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String POLL_INTERVAL = "poll.interval";
+ public static final String SUBMIT_COMMAND = "submit.command";
+ public static final String POLL_COMMAND = "poll.command";
+ public static final String CANCEL_COMMAND = "cancel.command";
+ public static final String NAME = "name";
+
+ public static synchronized Properties getProperties(String name) {
+ Properties properties = new Properties();
+ properties.load("provider-" + name + ".properties");
+ properties.put(NAME, name);
+ return properties;
+ }
+
+ protected void setDefaults() {
+ setPollInterval(5);
+ }
+
+
+ public String getPollCommandName() {
+ return POLL_COMMAND;
+ }
+
+
+ public String getRemoveCommandName() {
+ return CANCEL_COMMAND;
+ }
+
+
+ public String getSubmitCommandName() {
+ return SUBMIT_COMMAND;
+ }
+
+ public String getName() {
+ return getProperty(NAME);
+ }
+}
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/slurm/execution/TaskHandlerImpl.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/slurm/execution/TaskHandlerImpl.java (revision 3917)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/slurm/execution/TaskHandlerImpl.java (working copy)
@@ -15,7 +15,7 @@
return new JobSubmissionTaskHandler();
}
- protected String getName() {
+ public String getName() {
return "Slurm";
}
}
\ No newline at end of file
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/execution/TaskHandlerImpl.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/execution/TaskHandlerImpl.java (revision 3917)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/execution/TaskHandlerImpl.java (working copy)
@@ -22,12 +22,7 @@
return new JobSubmissionTaskHandler();
}
- protected String getName() {
+ public String getName() {
return "Cobalt";
}
-
- public String toString()
- {
- return "TaskHandlerImpl(execution cobalt)";
- }
}
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/condor/execution/TaskHandlerImpl.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/condor/execution/TaskHandlerImpl.java (revision 3917)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/condor/execution/TaskHandlerImpl.java (working copy)
@@ -22,7 +22,7 @@
return new JobSubmissionTaskHandler();
}
- protected String getName() {
+ public String getName() {
return "Condor";
}
}
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/execution/TaskHandlerImpl.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/execution/TaskHandlerImpl.java (revision 3917)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/execution/TaskHandlerImpl.java (working copy)
@@ -21,12 +21,7 @@
return new JobSubmissionTaskHandler();
}
- protected String getName() {
+ public String getName() {
return "LSF";
}
-
- public String toString()
- {
- return "TaskHandlerImpl(execution LSF)";
- }
}
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/execution/TaskHandlerImpl.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/execution/TaskHandlerImpl.java (revision 3917)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/execution/TaskHandlerImpl.java (working copy)
@@ -22,7 +22,7 @@
return new JobSubmissionTaskHandler();
}
- protected String getName() {
+ public String getName() {
return "PBS";
}
}
\ No newline at end of file
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/AbstractExecutor.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/AbstractExecutor.java (revision 3917)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/AbstractExecutor.java (working copy)
@@ -61,11 +61,10 @@
validate(task);
}
- public void start()
- throws AuthorizationException, IOException, ProcessException {
+ public void start() throws AuthorizationException, IOException, ProcessException {
String scriptdirPath = System.getProperty("script.dir");
- if(scriptdirPath == null) {
+ if (scriptdirPath == null) {
scriptdirPath = System.getProperty("user.home")
+ File.separatorChar + ".globus" + File.separatorChar
+ "scripts";
@@ -107,6 +106,10 @@
try {
int code = process.waitFor();
+ if (logger.isDebugEnabled()) {
+ logger.debug(getProperties().getSubmitCommandName()
+ + " done (exit code " + code + ")");
+ }
if (code != 0) {
String errorText = getOutput(process.getInputStream())
+ getOutput(process.getErrorStream());
@@ -115,10 +118,6 @@
+ " reported an exit code of " + code + "). "
+ errorText);
}
- if (logger.isDebugEnabled()) {
- logger.debug(getProperties().getSubmitCommandName()
- + " done (exit code " + code + ")");
- }
}
catch (InterruptedException e) {
if (logger.isDebugEnabled()) {
@@ -126,8 +125,7 @@
+ getProperties().getSubmitCommandName(), e);
}
if (listener != null) {
- listener
- .processFailed("The submission process was interrupted");
+ listener.processFailed("The submission process was interrupted");
}
}
@@ -268,8 +266,7 @@
}
protected abstract void writeScript(Writer wr, String exitcode,
- String stdout, String stderr)
- throws IOException;
+ String stdout, String stderr) throws IOException;
protected JobSpecification getSpec() {
return spec;
@@ -403,8 +400,7 @@
@param list May be null or empty
@throws IOException
*/
- protected void writeQuotedList(Writer writer, List<String> list)
- throws IOException
+ protected void writeQuotedList(Writer writer, List<String> list) throws IOException
{
if (list != null && list.size() > 0) {
writer.write(' ');
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/AbstractQueuePoller.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/AbstractQueuePoller.java (revision 3917)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/AbstractQueuePoller.java (working copy)
@@ -15,7 +15,6 @@
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
@@ -28,7 +27,8 @@
public static final int MAX_CONSECUTIVE_FAILURES = 3;
private String name;
- private LinkedList newjobs, donejobs;
+ private LinkedList<Job> newjobs;
+ private LinkedList<String> donejobs;
private Map<String, Job> jobs;
boolean any = false;
private int sleepTime;
@@ -45,8 +45,8 @@
this.properties = properties;
this.sleepTime = properties.getPollInterval() * 1000;
jobs = new HashMap<String, Job>();
- newjobs = new LinkedList();
- donejobs = new LinkedList();
+ newjobs = new LinkedList<Job>();
+ donejobs = new LinkedList<String>();
}
public void start() {
@@ -102,7 +102,7 @@
else {
synchronized (newjobs) {
while (!newjobs.isEmpty()) {
- Job job = (Job) newjobs.removeFirst();
+ Job job = newjobs.removeFirst();
jobs.put(job.getJobID(), job);
}
}
@@ -115,7 +115,7 @@
}
else {
while (!donejobs.isEmpty()) {
- String jobid = (String) donejobs.removeFirst();
+ String jobid = donejobs.removeFirst();
removeDoneJob(jobid);
}
}
@@ -137,9 +137,7 @@
if (logger.isDebugEnabled()) {
logger.debug("Fail all: " + message);
}
- Iterator i = jobs.values().iterator();
- while (i.hasNext()) {
- Job job = (Job) i.next();
+ for (Job job : jobs.values()) {
try {
job.fail(message);
}
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/Job.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/Job.java (revision 3917)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/Job.java (working copy)
@@ -24,11 +24,12 @@
public static final int STATE_QUEUED = 1;
public static final int STATE_RUNNING = 2;
public static final int STATE_DONE = 3;
+ public static final int STATE_FAILED = 5;
public static final int STATE_UNKNOWN = 4;
private static final int NO_EXITCODE = -1;
- private String jobID, location;
+ private String jobID, location, message;
private String exitcodeFileName;
private String stdout, stderr;
private FileLocation outLoc, errLoc;
@@ -50,8 +51,12 @@
this.ticks = 0;
this.exitcode = NO_EXITCODE;
}
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
- public boolean close() {
+ public boolean close(int tentativeState) {
if (logger.isDebugEnabled()) {
logger.debug("Closing " + jobID);
}
@@ -63,9 +68,8 @@
f = new File(exitcodeFileName);
if (f != null && !f.exists()) {
if (ticks == 5) {
- listener
- .processFailed(new ProcessException(
- "Exitcode file (" + exitcodeFileName + ") not found 5 queue polls after the job was reported done"));
+ listener.processFailed(new ProcessException(
+ "Exitcode file (" + exitcodeFileName + ") not found 5 queue polls after the job was reported done"));
return true;
}
else {
@@ -78,11 +82,11 @@
}
}
- processExitCode();
+ processExitCode(tentativeState);
return true;
}
- protected boolean processExitCode() {
+ protected boolean processExitCode(int tentativeState) {
if (logger.isDebugEnabled()) {
logger.debug("Processing exit code for job " + jobID);
}
@@ -101,7 +105,17 @@
if (logger.isDebugEnabled()) {
logger.debug("Exit code: " + exitcode);
}
- listener.processCompleted(exitcode);
+ if (tentativeState == STATE_FAILED) {
+ if (message == null) {
+ listener.processFailed("Job failed (exit code: " + exitcode + ")");
+ }
+ else {
+ listener.processFailed(message);
+ }
+ }
+ else {
+ listener.processCompleted(exitcode);
+ }
return true;
}
catch (Exception e) {
@@ -155,7 +169,7 @@
}
public synchronized void await() {
- while (state != STATE_DONE) {
+ while (state != STATE_DONE && state != STATE_FAILED) {
try {
wait();
}
@@ -173,9 +187,8 @@
return;
}
else {
- if (state == STATE_DONE) {
- if (close()) {
- this.state = STATE_DONE;
+ if (state == STATE_DONE || state == STATE_FAILED) {
+ if (close(state)) {
synchronized (this) {
notify();
}
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/AbstractJobSubmissionTaskHandler.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/AbstractJobSubmissionTaskHandler.java (revision 3917)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/AbstractJobSubmissionTaskHandler.java (working copy)
@@ -14,7 +14,6 @@
import org.globus.cog.abstraction.impl.common.task.InvalidSecurityContextException;
import org.globus.cog.abstraction.impl.common.task.InvalidServiceContactException;
import org.globus.cog.abstraction.impl.common.task.TaskSubmissionException;
-import org.globus.cog.abstraction.interfaces.DelegatedTaskHandler;
import org.globus.cog.abstraction.interfaces.FileLocation;
import org.globus.cog.abstraction.interfaces.JobSpecification;
import org.globus.cog.abstraction.interfaces.Status;
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/execution/TaskHandlerImpl.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/execution/TaskHandlerImpl.java (revision 3917)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/execution/TaskHandlerImpl.java (working copy)
@@ -22,7 +22,7 @@
return new JobSubmissionTaskHandler();
}
- protected String getName() {
+ public String getName() {
return "SGE";
}
}
\ No newline at end of file