[Swift-commit] r2075 - provider-wonky/src/org/globus/cog/abstraction/impl/execution/wonky

noreply at svn.ci.uchicago.edu noreply at svn.ci.uchicago.edu
Mon Jun 16 09:35:09 CDT 2008


Author: benc
Date: 2008-06-16 09:35:08 -0500 (Mon, 16 Jun 2008)
New Revision: 2075

Modified:
   provider-wonky/src/org/globus/cog/abstraction/impl/execution/wonky/JobSubmissionTaskHandler.java
Log:
more delay parameters

Modified: provider-wonky/src/org/globus/cog/abstraction/impl/execution/wonky/JobSubmissionTaskHandler.java
===================================================================
--- provider-wonky/src/org/globus/cog/abstraction/impl/execution/wonky/JobSubmissionTaskHandler.java	2008-06-15 23:45:15 UTC (rev 2074)
+++ provider-wonky/src/org/globus/cog/abstraction/impl/execution/wonky/JobSubmissionTaskHandler.java	2008-06-16 14:35:08 UTC (rev 2075)
@@ -13,9 +13,13 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Random;
 
 import org.apache.log4j.Logger;
 import org.globus.cog.abstraction.impl.common.StatusImpl;
@@ -29,22 +33,26 @@
 import org.globus.cog.abstraction.interfaces.DelegatedTaskHandler;
 import org.globus.cog.abstraction.interfaces.FileLocation;
 import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.abstraction.interfaces.ServiceContact;
 import org.globus.cog.abstraction.interfaces.Status;
 import org.globus.cog.abstraction.interfaces.Task;
 
-/**
- * @author Kaizar Amin (amin at mcs.anl.gov)
- * 
- */
 public class JobSubmissionTaskHandler implements DelegatedTaskHandler,
         Runnable {
     private static Logger logger = Logger
             .getLogger(JobSubmissionTaskHandler.class);
 
-static boolean firstJob = true;
+static Random r = new Random();
+
+static List firstJobList = Collections.synchronizedList(new ArrayList());
+
 static int globalJobNumber = 0;
-int jobNumber = globalJobNumber++; // racy
+int jobNumber = globalJobNumber++; // racy?
 
+List siteOptions;
+
+String siteName;
+
     public static final int BUFFER_SIZE = 1024;
 
     private Task task = null;
@@ -63,8 +71,25 @@
             throw new TaskSubmissionException("Task is not in unsubmitted state");
         }
         else {
-System.out.println("Submitting wonky job "+jobNumber);
+logger.info("Submitting wonky job "+jobNumber);
             this.task = task;
+
+
+            ServiceContact serviceContact = this.task.getService(0)
+                .getServiceContact();
+            String server = serviceContact.getContact();
+
+logger.debug("Contact is: "+server);
+            String[] splitServer = server.split("/");
+	    siteName = splitServer[0];
+            siteOptions = Arrays.asList(splitServer);
+
+
+
+            if(!failDelay("submitting")) {
+                this.task.setStatus(Status.FAILED);
+                return;
+            }
             task.setStatus(Status.SUBMITTING);
             JobSpecification spec;
             try {
@@ -85,6 +110,10 @@
                 synchronized(this) {
                     this.thread = new Thread(this);
                     if (this.task.getStatus().getStatusCode() != Status.CANCELED) {
+                        if(!failDelay("submitted")) {
+                            this.task.setStatus(Status.FAILED);
+                            return;
+                        }
                         this.task.setStatus(Status.SUBMITTED);
                         this.thread.start();
                         if (spec.isBatchJob()) {
@@ -100,17 +129,17 @@
 
     public void suspend() throws InvalidSecurityContextException,
             TaskSubmissionException {
-System.out.println("Suspend called on wonky job "+jobNumber);
+logger.info("Suspend called on wonky job "+jobNumber); // suspend only logs the request; the body takes no other action
     }
 
     public void resume() throws InvalidSecurityContextException,
             TaskSubmissionException {
-System.out.println("Resume called on wonky job "+jobNumber);
+logger.info("Resume called on wonky job "+jobNumber); // resume only logs the request; the body takes no other action
     }
 
     public void cancel() throws InvalidSecurityContextException,
             TaskSubmissionException {
-System.out.println("Cancel called on wonky job "+jobNumber);
+logger.info("Cancel called on wonky job "+jobNumber);
         synchronized(this) {
             killed = true;
             process.destroy();
@@ -123,30 +152,52 @@
 
     public void run() {
         try {
-System.out.println("Wonky job in queue, job number "+jobNumber);
+            JobSpecification spec = (JobSpecification) this.task
+                    .getSpecification();
+
+            File dir = null;
+            if (spec.getDirectory() != null) {
+                dir = new File(spec.getDirectory());
+            }
+logger.info("Wonky job in queue, job number "+jobNumber);
+String[] cmdarray = buildCmdArray(spec, jobNumber);
 try {
-if(firstJob) {
-System.out.println("not sleeping - this is the first job, "+jobNumber);
-firstJob=false; // mmmm racy
+logger.info("wonky site is "+siteName);
+	if(siteOptions.contains("firstfast") && !firstJobList.contains(siteName)) {
+logger.info("not sleeping - this is the first job on site "+siteName+", "+jobNumber);
+firstJobList.add(siteName);
 } else 
 {
-Thread.sleep(120*1000);
+double qmean=0;
+double qstddev=0;
+
+Iterator it = siteOptions.iterator();
+while(it.hasNext()) {
+String op = (String) it.next();
+logger.debug("Checking: "+op);
+if(op.startsWith("qmean=")) {
+qmean = Double.parseDouble(op.substring(6));
+logger.debug("qmean = "+qmean);
+} else if(op.startsWith("qstddev=")) {
+qstddev = Double.parseDouble(op.substring(8));
+logger.debug("qstddev = "+qstddev);
 }
+}
+
+double qdelay = qmean + qstddev * r.nextGaussian();
+
+Thread.sleep((long)(qdelay*1000));
+}
 } catch(InterruptedException ie) {
 System.out.println("fake queue delay interrupted for job "+jobNumber);
 }
 System.out.println("Wonky job running now");
-            // TODO move away from the multi-threaded approach
-            JobSpecification spec = (JobSpecification) this.task
-                    .getSpecification();
-
-            File dir = null;
-            if (spec.getDirectory() != null) {
-                dir = new File(spec.getDirectory());
+            process = Runtime.getRuntime().exec(cmdarray,
+                    buildEnvp(spec), dir);
+            if(!failDelay("active")) {
+                this.task.setStatus(Status.FAILED);
+                return;
             }
-
-            process = Runtime.getRuntime().exec(buildCmdArray(spec),
-                    buildEnvp(spec), dir);
             this.task.setStatus(Status.ACTIVE);
 
             // reusable byte buffer
@@ -195,7 +246,11 @@
             }
 System.err.println("Wonky job completed with exitCode "+exitCode);
             if (exitCode == 0) {
-                this.task.setStatus(Status.COMPLETED);
+                if(failDelay("completed")) {
+                    this.task.setStatus(Status.COMPLETED);
+                } else {
+                    this.task.setStatus(Status.FAILED);
+                }
 System.err.println("Wonky job status COMPLETED "+jobNumber);
             } else {
                 throw new JobException(exitCode);
@@ -274,16 +329,21 @@
         }
     }
 
-    private String[] buildCmdArray(JobSpecification spec) {
+    private String[] buildCmdArray(JobSpecification spec, int jobid) { // Builds {executable, arg1, ...} from spec; jobid is used only to label the debug output
         List arguments = spec.getArgumentsAsList();
         String[] cmdarray = new String[arguments.size() + 1];
 
+        logger.debug("jobid "+jobid+" :"); // this loop is a console race: each argument is logged by its own call, so presumably lines from concurrent jobs can interleave
         cmdarray[0] = spec.getExecutable();
         Iterator i = arguments.iterator();
         int index = 1;
         while (i.hasNext()) {
-            cmdarray[index++] = (String) i.next();
+            String s = (String)i.next(); // held in a local so the same value can be both logged and stored
+            logger.debug(s); // one debug line per argument
+            cmdarray[index++] = s; // arguments occupy slots 1..n; slot 0 holds the executable
+
         }
+        logger.debug("end of args"); // closes the per-job argument listing opened by the "jobid" line
         return cmdarray;
     }
 
@@ -307,4 +367,36 @@
         }
         return envp;
     }
+
+    boolean failDelay(String name) { // Fault/delay injection for stage <name>: returns false to signal an injected failure, otherwise sleeps and returns true
+        double failRate = getWonkyParam(name+"fail"); // site option "<name>fail": failure probability; 0 when the option is absent
+        double delayTime = getWonkyParam(name+"delay"); // site option "<name>delay": delay in seconds (scaled by 1000 for the sleep below); 0 when absent
+        System.out.println("Fail probability is: "+failRate);
+        if(Math.random()<failRate) { // Math.random() is in [0,1): failRate 0 never fails, failRate >= 1 always fails
+            System.out.println("Fail.");
+            return false; // no delay is applied on the failure path
+        } else {
+            System.out.println("Success.");
+            try { Thread.sleep((long)(delayTime * 1000)); } // blocks the calling thread for the configured delay
+            catch(InterruptedException ie) { // NOTE(review): the interrupt is only logged; the thread's interrupt status is not restored
+                logger.warn("Warning: wonky delay was interrupted");
+            }
+            return true; // an interrupted delay still counts as success
+        }
+    }
+
+    double getWonkyParam(String name) { // Returns the numeric value of the first "name=value" entry in siteOptions, or 0 if no entry matches
+        Iterator it = siteOptions.iterator(); // siteOptions holds the '/'-separated pieces of the service contact string
+        while(it.hasNext()) {
+            String op = (String) it.next();
+            logger.debug("Checking: "+op+" against '"+name+"='");
+
+            if(op.startsWith(name+"=")) { // match on the full "name=" prefix; the first matching option wins
+                String value = op.substring(name.length()+1); // text after the '='
+                System.out.println("Parameter "+name+" has value "+value);
+                return Double.parseDouble(value); // a non-numeric value makes parseDouble throw NumberFormatException to the caller
+            }
+        }
+        return 0; // default when the option is not present
+    }
 }




More information about the Swift-commit mailing list