[Swift-commit] r2508 - in provider-wonky: . src/org/globus/cog/abstraction/impl/execution/wonky
noreply at svn.ci.uchicago.edu
noreply at svn.ci.uchicago.edu
Fri Feb 6 20:54:28 CST 2009
Author: hategan
Date: 2009-02-06 20:54:27 -0600 (Fri, 06 Feb 2009)
New Revision: 2508
Modified:
provider-wonky/project.properties
provider-wonky/src/org/globus/cog/abstraction/impl/execution/wonky/JobSubmissionTaskHandler.java
Log:
cancel message; cleanup; updated version; option to lose completion notification
Modified: provider-wonky/project.properties
===================================================================
--- provider-wonky/project.properties 2009-02-07 02:13:49 UTC (rev 2507)
+++ provider-wonky/project.properties 2009-02-07 02:54:27 UTC (rev 2508)
@@ -1,6 +1,6 @@
module.name = provider-wonky
long.name = Local provider, but wonky
-version = 0.1
+version = 0.2
project = Java CoG Kit
lib.deps = -
debug = true
Modified: provider-wonky/src/org/globus/cog/abstraction/impl/execution/wonky/JobSubmissionTaskHandler.java
===================================================================
--- provider-wonky/src/org/globus/cog/abstraction/impl/execution/wonky/JobSubmissionTaskHandler.java 2009-02-07 02:13:49 UTC (rev 2507)
+++ provider-wonky/src/org/globus/cog/abstraction/impl/execution/wonky/JobSubmissionTaskHandler.java 2009-02-07 02:54:27 UTC (rev 2508)
@@ -24,6 +24,7 @@
import java.util.Random;
import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.common.AbstractDelegatedTaskHandler;
import org.globus.cog.abstraction.impl.common.StatusImpl;
import org.globus.cog.abstraction.impl.common.execution.JobException;
import org.globus.cog.abstraction.impl.common.task.IllegalSpecException;
@@ -32,14 +33,13 @@
import org.globus.cog.abstraction.impl.common.task.TaskSubmissionException;
import org.globus.cog.abstraction.impl.common.util.NullOutputStream;
import org.globus.cog.abstraction.impl.common.util.OutputStreamMultiplexer;
-import org.globus.cog.abstraction.interfaces.DelegatedTaskHandler;
import org.globus.cog.abstraction.interfaces.FileLocation;
import org.globus.cog.abstraction.interfaces.JobSpecification;
import org.globus.cog.abstraction.interfaces.ServiceContact;
import org.globus.cog.abstraction.interfaces.Status;
import org.globus.cog.abstraction.interfaces.Task;
-public class JobSubmissionTaskHandler implements DelegatedTaskHandler,
+public class JobSubmissionTaskHandler extends AbstractDelegatedTaskHandler implements
Runnable {
private static Logger logger = Logger
.getLogger(JobSubmissionTaskHandler.class);
@@ -58,7 +58,6 @@
public static final int BUFFER_SIZE = 1024;
- private Task task = null;
private Thread thread = null;
private Process process;
private volatile boolean killed;
@@ -66,81 +65,72 @@
public void submit(Task task) throws IllegalSpecException,
InvalidSecurityContextException, InvalidServiceContactException,
TaskSubmissionException {
- if (this.task != null) {
- throw new TaskSubmissionException(
- "JobSubmissionTaskHandler cannot handle two active jobs simultaneously");
- }
- else if (task.getStatus().getStatusCode() != Status.UNSUBMITTED) {
- throw new TaskSubmissionException("Task is not in unsubmitted state");
- }
- else {
- logger.info("Submitting wonky job "+jobNumber);
- this.task = task;
+ checkAndSetTask(task);
+ logger.info("Submitting wonky job "+jobNumber);
- ServiceContact serviceContact = this.task.getService(0)
- .getServiceContact();
- String server = serviceContact.getContact();
+ ServiceContact serviceContact = task.getService(0)
+ .getServiceContact();
+ String server = serviceContact.getContact();
- logger.debug("Contact is: "+server);
- String[] splitServer = server.split("/");
- siteName = splitServer[0];
- siteOptions = Arrays.asList(splitServer);
+ logger.debug("Contact is: "+server);
+ String[] splitServer = server.split("/");
+ siteName = splitServer[0];
+ siteOptions = Arrays.asList(splitServer);
- if(!failDelay("submitting")) {
- this.task.setStatus(Status.FAILED);
- return;
- }
- task.setStatus(Status.SUBMITTING);
- JobSpecification spec;
- try {
- spec = (JobSpecification) this.task.getSpecification();
- } catch (Exception e) {
- throw new IllegalSpecException(
- "Exception while retrieving Job Specification", e);
- }
- if (logger.isDebugEnabled()) {
- logger.debug(spec.toString());
- }
+ if(!failDelay("submitting")) {
+ task.setStatus(Status.FAILED);
+ return;
+ }
+ task.setStatus(Status.SUBMITTING);
+ JobSpecification spec;
+ try {
+ spec = (JobSpecification) task.getSpecification();
+ } catch (Exception e) {
+ throw new IllegalSpecException(
+ "Exception while retrieving Job Specification", e);
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug(spec.toString());
+ }
- Collection attributeNames = spec.getAttributeNames();
- Iterator attributeIterator = attributeNames.iterator();
+ Collection attributeNames = spec.getAttributeNames();
+ Iterator attributeIterator = attributeNames.iterator();
- if(siteOptions.contains("strictattr")) {
- logger.debug("Strict attribute checking is enabled");
- while(attributeIterator.hasNext()) {
- String attrName = (String)attributeIterator.next();
- if(!siteOptions.contains("permitattr=" + attrName)) {
- logger.error("Job specification attribute was passed that should not have been: "+attrName);
- this.task.setStatus(Status.FAILED);
- return;
- }
+ if(siteOptions.contains("strictattr")) {
+ logger.debug("Strict attribute checking is enabled");
+ while(attributeIterator.hasNext()) {
+ String attrName = (String)attributeIterator.next();
+ if(!siteOptions.contains("permitattr=" + attrName)) {
+ logger.error("Job specification attribute was passed that should not have been: "+attrName);
+ task.setStatus(Status.FAILED);
+ return;
}
}
+ }
- try {
- if (logger.isInfoEnabled()) {
- logger.info("Submitting task " + task);
- }
- synchronized(this) {
- this.thread = new Thread(this);
- if (this.task.getStatus().getStatusCode() != Status.CANCELED) {
- if(!failDelay("submitted")) {
- this.task.setStatus(Status.FAILED);
- return;
- }
- this.task.setStatus(Status.SUBMITTED);
- this.thread.start();
- if (spec.isBatchJob()) {
- this.task.setStatus(Status.COMPLETED);
- }
+ try {
+ if (logger.isInfoEnabled()) {
+ logger.info("Submitting task " + task);
+ }
+ synchronized(this) {
+ this.thread = new Thread(this);
+ if (task.getStatus().getStatusCode() != Status.CANCELED) {
+ if(!failDelay("submitted")) {
+ task.setStatus(Status.FAILED);
+ return;
}
+ task.setStatus(Status.SUBMITTED);
+ this.thread.start();
+ if (spec.isBatchJob()) {
+ task.setStatus(Status.COMPLETED);
+ }
}
- } catch (Exception e) {
- throw new TaskSubmissionException("Cannot submit job", e);
}
+ } catch (Exception e) {
+ throw new TaskSubmissionException("Cannot submit job", e);
}
}
@@ -154,13 +144,13 @@
logger.info("Resume called on wonky job "+jobNumber);
}
- public void cancel() throws InvalidSecurityContextException,
+ public void cancel(String message) throws InvalidSecurityContextException,
TaskSubmissionException {
logger.info("Cancel called on wonky job "+jobNumber);
synchronized(this) {
killed = true;
process.destroy();
- this.task.setStatus(Status.CANCELED);
+ getTask().setStatus(new StatusImpl(Status.CANCELED, message, null));
}
}
@@ -169,7 +159,7 @@
public void run() {
try {
- JobSpecification spec = (JobSpecification) this.task
+ JobSpecification spec = (JobSpecification) getTask()
.getSpecification();
File dir = null;
@@ -210,10 +200,10 @@
logger.info("Wonky job running now");
process = Runtime.getRuntime().exec(cmdarray, buildEnvp(spec), dir);
if(!failDelay("active")) {
- this.task.setStatus(Status.FAILED);
+ getTask().setStatus(Status.FAILED);
return;
}
- this.task.setStatus(Status.ACTIVE);
+ getTask().setStatus(Status.ACTIVE);
// reusable byte buffer
byte[] buf = new byte[BUFFER_SIZE];
@@ -232,7 +222,7 @@
if (logger.isDebugEnabled()) {
logger.debug("STDOUT from job: " + out);
}
- this.task.setStdOutput(out);
+ getTask().setStdOutput(out);
}
}
@@ -244,7 +234,7 @@
if (logger.isDebugEnabled()) {
logger.debug("STDERR from job: " + err);
}
- this.task.setStdError(err);
+ getTask().setStdError(err);
}
}
@@ -262,13 +252,18 @@
if(siteOptions.contains("nofailonexit")) {
// suppress failures caused by exit code
- this.task.setStatus(Status.COMPLETED);
+ getTask().setStatus(Status.COMPLETED);
} else { // normal fail behaviour
if (exitCode == 0) {
if(failDelay("completed")) {
- this.task.setStatus(Status.COMPLETED);
+ if (siteOptions.contains("runawayjob") && jobNumber == 0) {
+ System.out.println("Wonky job completion notification gets lost");
+ }
+ else {
+ getTask().setStatus(Status.COMPLETED);
+ }
} else {
- this.task.setStatus(Status.FAILED);
+ getTask().setStatus(Status.FAILED);
}
} else {
throw new JobException(exitCode);
@@ -281,12 +276,7 @@
if (logger.isDebugEnabled()) {
logger.debug("Exception while running local executable", e);
}
- Status newStatus = new StatusImpl();
- Status oldStatus = this.task.getStatus();
- newStatus.setPrevStatusCode(oldStatus.getStatusCode());
- newStatus.setStatusCode(Status.FAILED);
- newStatus.setException(e);
- this.task.setStatus(newStatus);
+ failTask(null, e);
}
}
More information about the Swift-commit mailing list