[Swift-commit] r2075 - provider-wonky/src/org/globus/cog/abstraction/impl/execution/wonky
noreply at svn.ci.uchicago.edu
noreply at svn.ci.uchicago.edu
Mon Jun 16 09:35:09 CDT 2008
Author: benc
Date: 2008-06-16 09:35:08 -0500 (Mon, 16 Jun 2008)
New Revision: 2075
Modified:
provider-wonky/src/org/globus/cog/abstraction/impl/execution/wonky/JobSubmissionTaskHandler.java
Log:
more delay parameters
Modified: provider-wonky/src/org/globus/cog/abstraction/impl/execution/wonky/JobSubmissionTaskHandler.java
===================================================================
--- provider-wonky/src/org/globus/cog/abstraction/impl/execution/wonky/JobSubmissionTaskHandler.java 2008-06-15 23:45:15 UTC (rev 2074)
+++ provider-wonky/src/org/globus/cog/abstraction/impl/execution/wonky/JobSubmissionTaskHandler.java 2008-06-16 14:35:08 UTC (rev 2075)
@@ -13,9 +13,13 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Random;
import org.apache.log4j.Logger;
import org.globus.cog.abstraction.impl.common.StatusImpl;
@@ -29,22 +33,26 @@
import org.globus.cog.abstraction.interfaces.DelegatedTaskHandler;
import org.globus.cog.abstraction.interfaces.FileLocation;
import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.abstraction.interfaces.ServiceContact;
import org.globus.cog.abstraction.interfaces.Status;
import org.globus.cog.abstraction.interfaces.Task;
-/**
- * @author Kaizar Amin (amin at mcs.anl.gov)
- *
- */
public class JobSubmissionTaskHandler implements DelegatedTaskHandler,
Runnable {
private static Logger logger = Logger
.getLogger(JobSubmissionTaskHandler.class);
-static boolean firstJob = true;
+static Random r = new Random();
+
+static List firstJobList = Collections.synchronizedList(new ArrayList());
+
static int globalJobNumber = 0;
-int jobNumber = globalJobNumber++; // racy
+int jobNumber = globalJobNumber++; // racy?
+List siteOptions;
+
+String siteName;
+
public static final int BUFFER_SIZE = 1024;
private Task task = null;
@@ -63,8 +71,25 @@
throw new TaskSubmissionException("Task is not in unsubmitted state");
}
else {
-System.out.println("Submitting wonky job "+jobNumber);
+logger.info("Submitting wonky job "+jobNumber);
this.task = task;
+
+
+ ServiceContact serviceContact = this.task.getService(0)
+ .getServiceContact();
+ String server = serviceContact.getContact();
+
+logger.debug("Contact is: "+server);
+ String[] splitServer = server.split("/");
+ siteName = splitServer[0];
+ siteOptions = Arrays.asList(splitServer);
+
+
+
+ if(!failDelay("submitting")) {
+ this.task.setStatus(Status.FAILED);
+ return;
+ }
task.setStatus(Status.SUBMITTING);
JobSpecification spec;
try {
@@ -85,6 +110,10 @@
synchronized(this) {
this.thread = new Thread(this);
if (this.task.getStatus().getStatusCode() != Status.CANCELED) {
+ if(!failDelay("submitted")) {
+ this.task.setStatus(Status.FAILED);
+ return;
+ }
this.task.setStatus(Status.SUBMITTED);
this.thread.start();
if (spec.isBatchJob()) {
@@ -100,17 +129,17 @@
+ /**
+  * Suspend request hook. The new version only logs the request together
+  * with the wonky job number; no suspension is performed here.
+  */
public void suspend() throws InvalidSecurityContextException,
TaskSubmissionException {
-System.out.println("Suspend called on wonky job "+jobNumber);
+logger.info("Suspend called on wonky job "+jobNumber);
}
+ /**
+  * Resume request hook. The new version only logs the request together
+  * with the wonky job number; no resumption is performed here.
+  */
public void resume() throws InvalidSecurityContextException,
TaskSubmissionException {
-System.out.println("Resume called on wonky job "+jobNumber);
+logger.info("Resume called on wonky job "+jobNumber);
}
public void cancel() throws InvalidSecurityContextException,
TaskSubmissionException {
-System.out.println("Cancel called on wonky job "+jobNumber);
+logger.info("Cancel called on wonky job "+jobNumber);
synchronized(this) {
killed = true;
process.destroy();
@@ -123,30 +152,52 @@
public void run() {
try {
-System.out.println("Wonky job in queue, job number "+jobNumber);
+ JobSpecification spec = (JobSpecification) this.task
+ .getSpecification();
+
+ File dir = null;
+ if (spec.getDirectory() != null) {
+ dir = new File(spec.getDirectory());
+ }
+logger.info("Wonky job in queue, job number "+jobNumber);
+String[] cmdarray = buildCmdArray(spec, jobNumber);
try {
-if(firstJob) {
-System.out.println("not sleeping - this is the first job, "+jobNumber);
-firstJob=false; // mmmm racy
+logger.info("wonky site is "+siteName);
+ if(siteOptions.contains("firstfast") && !firstJobList.contains(siteName)) {
+logger.info("not sleeping - this is the first job on site "+siteName+", "+jobNumber);
+firstJobList.add(siteName);
} else
{
-Thread.sleep(120*1000);
+double qmean=0;
+double qstddev=0;
+
+Iterator it = siteOptions.iterator();
+while(it.hasNext()) {
+String op = (String) it.next();
+logger.debug("Checking: "+op);
+if(op.startsWith("qmean=")) {
+qmean = Double.parseDouble(op.substring(6));
+logger.debug("qmean = "+qmean);
+} else if(op.startsWith("qstddev=")) {
+qstddev = Double.parseDouble(op.substring(8));
+logger.debug("qstddev = "+qstddev);
}
+}
+
+double qdelay = qmean + qstddev * r.nextGaussian();
+
+Thread.sleep((long)(qdelay*1000));
+}
} catch(InterruptedException ie) {
System.out.println("fake queue delay interrupted for job "+jobNumber);
}
System.out.println("Wonky job running now");
- // TODO move away from the multi-threaded approach
- JobSpecification spec = (JobSpecification) this.task
- .getSpecification();
-
- File dir = null;
- if (spec.getDirectory() != null) {
- dir = new File(spec.getDirectory());
+ process = Runtime.getRuntime().exec(cmdarray,
+ buildEnvp(spec), dir);
+ if(!failDelay("active")) {
+ this.task.setStatus(Status.FAILED);
+ return;
}
-
- process = Runtime.getRuntime().exec(buildCmdArray(spec),
- buildEnvp(spec), dir);
this.task.setStatus(Status.ACTIVE);
// reusable byte buffer
@@ -195,7 +246,11 @@
}
System.err.println("Wonky job completed with exitCode "+exitCode);
if (exitCode == 0) {
- this.task.setStatus(Status.COMPLETED);
+ if(failDelay("completed")) {
+ this.task.setStatus(Status.COMPLETED);
+ } else {
+ this.task.setStatus(Status.FAILED);
+ }
System.err.println("Wonky job status COMPLETED "+jobNumber);
} else {
throw new JobException(exitCode);
@@ -274,16 +329,21 @@
}
}
- private String[] buildCmdArray(JobSpecification spec) {
+ /**
+  * Builds the command array passed to Runtime.exec: the executable
+  * followed by the job's arguments, in order.
+  *
+  * @param spec  the job specification supplying executable and arguments
+  * @param jobid the wonky job number; used only to tag the debug log line
+  * @return the executable followed by every argument
+  */
+ private String[] buildCmdArray(JobSpecification spec, int jobid) {
List arguments = spec.getArgumentsAsList();
String[] cmdarray = new String[arguments.size() + 1];
cmdarray[0] = spec.getExecutable();
Iterator i = arguments.iterator();
int index = 1;
while (i.hasNext()) {
cmdarray[index++] = (String) i.next();
}
+ // Log the complete command in a single call: per-argument debug lines
+ // from concurrently started jobs would otherwise interleave in the log.
+ logger.debug("jobid " + jobid + " : " + Arrays.asList(cmdarray));
return cmdarray;
}
@@ -307,4 +367,36 @@
}
return envp;
}
+
+    /**
+     * Simulates a state transition that may fail or be slow.
+     *
+     * Reads the site options "<name>fail" (probability of failure) and
+     * "<name>delay" (seconds to sleep when the transition succeeds) via
+     * getWonkyParam; absent options count as 0.
+     *
+     * @param name the transition being simulated, e.g. "submitting"
+     * @return false if the simulated transition failed, true otherwise
+     */
+    boolean failDelay(String name) {
+        double failRate = getWonkyParam(name + "fail");
+        double delayTime = getWonkyParam(name + "delay");
+        logger.debug("Fail probability is: " + failRate);
+        if (Math.random() < failRate) {
+            logger.info("Fail.");
+            return false;
+        }
+        logger.debug("Success.");
+        // Thread.sleep rejects negative arguments, so a misconfigured
+        // (negative) delay is treated as no delay instead of throwing.
+        long delayMillis = (long) (delayTime * 1000);
+        if (delayMillis > 0) {
+            try {
+                Thread.sleep(delayMillis);
+            } catch (InterruptedException ie) {
+                logger.warn("Warning: wonky delay was interrupted");
+                // Restore the interrupt status so the caller can still see it.
+                Thread.currentThread().interrupt();
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Looks up a numeric wonky option of the form "name=value" among the
+     * '/'-separated components of the service contact (siteOptions).
+     *
+     * @param name the option name, without the trailing '='
+     * @return the parsed value, or 0 if siteOptions is unset or the option
+     *         is absent or not a valid number
+     */
+    double getWonkyParam(String name) {
+        if (siteOptions == null) {
+            return 0;
+        }
+        String prefix = name + "=";
+        Iterator it = siteOptions.iterator();
+        while (it.hasNext()) {
+            String op = (String) it.next();
+            logger.debug("Checking: " + op + " against '" + prefix + "'");
+            if (op.startsWith(prefix)) {
+                String value = op.substring(prefix.length());
+                logger.debug("Parameter " + name + " has value " + value);
+                try {
+                    return Double.parseDouble(value);
+                } catch (NumberFormatException nfe) {
+                    // A typo in the contact string should not escape submit()
+                    // as an unchecked exception; fall back to the default.
+                    logger.warn("Ignoring malformed value '" + value
+                            + "' for wonky parameter " + name, nfe);
+                    return 0;
+                }
+            }
+        }
+        return 0;
+    }
}
More information about the Swift-commit mailing list