[Swift-commit] r2508 - in provider-wonky: . src/org/globus/cog/abstraction/impl/execution/wonky

noreply at svn.ci.uchicago.edu noreply at svn.ci.uchicago.edu
Fri Feb 6 20:54:28 CST 2009


Author: hategan
Date: 2009-02-06 20:54:27 -0600 (Fri, 06 Feb 2009)
New Revision: 2508

Modified:
   provider-wonky/project.properties
   provider-wonky/src/org/globus/cog/abstraction/impl/execution/wonky/JobSubmissionTaskHandler.java
Log:
cancel message; cleanup; updated version; option to lose completion notification

Modified: provider-wonky/project.properties
===================================================================
--- provider-wonky/project.properties	2009-02-07 02:13:49 UTC (rev 2507)
+++ provider-wonky/project.properties	2009-02-07 02:54:27 UTC (rev 2508)
@@ -1,6 +1,6 @@
 module.name 		= provider-wonky
 long.name 		= Local provider, but wonky
-version			= 0.1
+version			= 0.2
 project			= Java CoG Kit
 lib.deps		= -
 debug			= true

Modified: provider-wonky/src/org/globus/cog/abstraction/impl/execution/wonky/JobSubmissionTaskHandler.java
===================================================================
--- provider-wonky/src/org/globus/cog/abstraction/impl/execution/wonky/JobSubmissionTaskHandler.java	2009-02-07 02:13:49 UTC (rev 2507)
+++ provider-wonky/src/org/globus/cog/abstraction/impl/execution/wonky/JobSubmissionTaskHandler.java	2009-02-07 02:54:27 UTC (rev 2508)
@@ -24,6 +24,7 @@
 import java.util.Random;
 
 import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.common.AbstractDelegatedTaskHandler;
 import org.globus.cog.abstraction.impl.common.StatusImpl;
 import org.globus.cog.abstraction.impl.common.execution.JobException;
 import org.globus.cog.abstraction.impl.common.task.IllegalSpecException;
@@ -32,14 +33,13 @@
 import org.globus.cog.abstraction.impl.common.task.TaskSubmissionException;
 import org.globus.cog.abstraction.impl.common.util.NullOutputStream;
 import org.globus.cog.abstraction.impl.common.util.OutputStreamMultiplexer;
-import org.globus.cog.abstraction.interfaces.DelegatedTaskHandler;
 import org.globus.cog.abstraction.interfaces.FileLocation;
 import org.globus.cog.abstraction.interfaces.JobSpecification;
 import org.globus.cog.abstraction.interfaces.ServiceContact;
 import org.globus.cog.abstraction.interfaces.Status;
 import org.globus.cog.abstraction.interfaces.Task;
 
-public class JobSubmissionTaskHandler implements DelegatedTaskHandler,
+public class JobSubmissionTaskHandler extends AbstractDelegatedTaskHandler implements 
         Runnable {
     private static Logger logger = Logger
             .getLogger(JobSubmissionTaskHandler.class);
@@ -58,7 +58,6 @@
 
     public static final int BUFFER_SIZE = 1024;
 
-    private Task task = null;
     private Thread thread = null;
     private Process process;
     private volatile boolean killed;
@@ -66,81 +65,72 @@
     public void submit(Task task) throws IllegalSpecException,
             InvalidSecurityContextException, InvalidServiceContactException,
             TaskSubmissionException {
-        if (this.task != null) {
-            throw new TaskSubmissionException(
-                    "JobSubmissionTaskHandler cannot handle two active jobs simultaneously");
-        }
-        else if (task.getStatus().getStatusCode() != Status.UNSUBMITTED) {
-            throw new TaskSubmissionException("Task is not in unsubmitted state");
-        }
-        else {
-            logger.info("Submitting wonky job "+jobNumber);
-            this.task = task;
+        checkAndSetTask(task);
+        logger.info("Submitting wonky job "+jobNumber);
 
 
-            ServiceContact serviceContact = this.task.getService(0)
-                .getServiceContact();
-            String server = serviceContact.getContact();
+        ServiceContact serviceContact = task.getService(0)
+            .getServiceContact();
+        String server = serviceContact.getContact();
 
-            logger.debug("Contact is: "+server);
-            String[] splitServer = server.split("/");
-	    siteName = splitServer[0];
-            siteOptions = Arrays.asList(splitServer);
+        logger.debug("Contact is: "+server);
+        String[] splitServer = server.split("/");
+    siteName = splitServer[0];
+        siteOptions = Arrays.asList(splitServer);
 
 
 
-            if(!failDelay("submitting")) {
-                this.task.setStatus(Status.FAILED);
-                return;
-            }
-            task.setStatus(Status.SUBMITTING);
-            JobSpecification spec;
-            try {
-                spec = (JobSpecification) this.task.getSpecification();
-            } catch (Exception e) {
-                throw new IllegalSpecException(
-                        "Exception while retrieving Job Specification", e);
-            }
-            if (logger.isDebugEnabled()) {
-                logger.debug(spec.toString());
-            }
+        if(!failDelay("submitting")) {
+            task.setStatus(Status.FAILED);
+            return;
+        }
+        task.setStatus(Status.SUBMITTING);
+        JobSpecification spec;
+        try {
+            spec = (JobSpecification) task.getSpecification();
+        } catch (Exception e) {
+            throw new IllegalSpecException(
+                    "Exception while retrieving Job Specification", e);
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug(spec.toString());
+        }
 
-            Collection attributeNames = spec.getAttributeNames();
-            Iterator attributeIterator = attributeNames.iterator();
+        Collection attributeNames = spec.getAttributeNames();
+        Iterator attributeIterator = attributeNames.iterator();
 
-            if(siteOptions.contains("strictattr")) {
-                logger.debug("Strict attribute checking is enabled");
-                while(attributeIterator.hasNext()) {
-                    String attrName = (String)attributeIterator.next();  
-                    if(!siteOptions.contains("permitattr=" + attrName)) {
-                        logger.error("Job specification attribute was passed that should not have been: "+attrName);
-                        this.task.setStatus(Status.FAILED);
-                        return;
-                    }
+        if(siteOptions.contains("strictattr")) {
+            logger.debug("Strict attribute checking is enabled");
+            while(attributeIterator.hasNext()) {
+                String attrName = (String)attributeIterator.next();  
+                if(!siteOptions.contains("permitattr=" + attrName)) {
+                    logger.error("Job specification attribute was passed that should not have been: "+attrName);
+                    task.setStatus(Status.FAILED);
+                    return;
                 }
             }
+        }
 
-            try {
-                if (logger.isInfoEnabled()) {
-                    logger.info("Submitting task " + task);
-                }
-                synchronized(this) {
-                    this.thread = new Thread(this);
-                    if (this.task.getStatus().getStatusCode() != Status.CANCELED) {
-                        if(!failDelay("submitted")) {
-                            this.task.setStatus(Status.FAILED);
-                            return;
-                        }
-                        this.task.setStatus(Status.SUBMITTED);
-                        this.thread.start();
-                        if (spec.isBatchJob()) {
-                            this.task.setStatus(Status.COMPLETED);
-                        }
+        try {
+            if (logger.isInfoEnabled()) {
+                logger.info("Submitting task " + task);
+            }
+            synchronized(this) {
+                this.thread = new Thread(this);
+                if (task.getStatus().getStatusCode() != Status.CANCELED) {
+                    if(!failDelay("submitted")) {
+                        task.setStatus(Status.FAILED);
+                        return;
                     }
+                    task.setStatus(Status.SUBMITTED);
+                    this.thread.start();
+                    if (spec.isBatchJob()) {
+                        task.setStatus(Status.COMPLETED);
+                    }
                 }
-            } catch (Exception e) {
-                throw new TaskSubmissionException("Cannot submit job", e);
             }
+        } catch (Exception e) {
+            throw new TaskSubmissionException("Cannot submit job", e);
         }
     }
 
@@ -154,13 +144,13 @@
         logger.info("Resume called on wonky job "+jobNumber);
     }
 
-    public void cancel() throws InvalidSecurityContextException,
+    public void cancel(String message) throws InvalidSecurityContextException,
             TaskSubmissionException {
         logger.info("Cancel called on wonky job "+jobNumber);
         synchronized(this) {
             killed = true;
             process.destroy();
-            this.task.setStatus(Status.CANCELED);
+            getTask().setStatus(new StatusImpl(Status.CANCELED, message, null));
         }
     }
 
@@ -169,7 +159,7 @@
 
     public void run() {
         try {
-            JobSpecification spec = (JobSpecification) this.task
+            JobSpecification spec = (JobSpecification) getTask()
                     .getSpecification();
 
             File dir = null;
@@ -210,10 +200,10 @@
             logger.info("Wonky job running now");
             process = Runtime.getRuntime().exec(cmdarray, buildEnvp(spec), dir);
             if(!failDelay("active")) {
-                this.task.setStatus(Status.FAILED);
+                getTask().setStatus(Status.FAILED);
                 return;
             }
-            this.task.setStatus(Status.ACTIVE);
+            getTask().setStatus(Status.ACTIVE);
 
             // reusable byte buffer
             byte[] buf = new byte[BUFFER_SIZE];
@@ -232,7 +222,7 @@
                     if (logger.isDebugEnabled()) {
                         logger.debug("STDOUT from job: " + out);
                     }
-                    this.task.setStdOutput(out);
+                    getTask().setStdOutput(out);
                 }
             }
 
@@ -244,7 +234,7 @@
                     if (logger.isDebugEnabled()) {
                         logger.debug("STDERR from job: " + err);
                     }
-                    this.task.setStdError(err);
+                    getTask().setStdError(err);
                 }
             }
 
@@ -262,13 +252,18 @@
 
             if(siteOptions.contains("nofailonexit"))  {
                 // suppress failures caused by exit code
-                this.task.setStatus(Status.COMPLETED);
+                getTask().setStatus(Status.COMPLETED);
             } else { // normal fail behaviour
                 if (exitCode == 0) {
                     if(failDelay("completed")) {
-                        this.task.setStatus(Status.COMPLETED);
+                        if (siteOptions.contains("runawayjob") && jobNumber == 0) {
+                            System.out.println("Wonky job completion notification gets lost");
+                        }
+                        else {
+                            getTask().setStatus(Status.COMPLETED);
+                        }
                     } else {
-                        this.task.setStatus(Status.FAILED);
+                        getTask().setStatus(Status.FAILED);
                     }
                 } else {
                     throw new JobException(exitCode);
@@ -281,12 +276,7 @@
             if (logger.isDebugEnabled()) {
                 logger.debug("Exception while running local executable", e);
             }
-            Status newStatus = new StatusImpl();
-            Status oldStatus = this.task.getStatus();
-            newStatus.setPrevStatusCode(oldStatus.getStatusCode());
-            newStatus.setStatusCode(Status.FAILED);
-            newStatus.setException(e);
-            this.task.setStatus(newStatus);
+            failTask(null, e);
         }
     }
 




More information about the Swift-commit mailing list