[Swift-commit] Cog update
swift at ci.uchicago.edu
swift at ci.uchicago.edu
Tue Jan 10 10:40:12 CST 2012
------------------------------------------------------------------------
r3343 | davidkelly999 | 2012-01-10 10:37:52 -0600 (Tue, 10 Jan 2012) | 2 lines
Adding SGE modifications from 0.93 to trunk
------------------------------------------------------------------------
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/QueueInformation.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/QueueInformation.java (revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/QueueInformation.java (revision 3343)
@@ -0,0 +1,107 @@
+package org.globus.cog.abstraction.impl.scheduler.sge;
+
+import java.util.ArrayList;
+
+/**
+ * Data structure for defining queue properties, filled in from the
+ * "setting value" lines that QueuePoller reads from qconf -sq.
+ */
+public class QueueInformation {
+
+    private String name;
+    private ArrayList<String> pe_list = new ArrayList<String>();
+    private int slots;
+    // "INFINITY" is what callers compare against to mean "no limit"; using it
+    // until an h_rt line is seen means getWalltime() never returns null
+    private String walltime = "INFINITY";
+
+    /**
+     * Add data to queue information
+     * Malformed entries and unknown settings are ignored.
+     * @param data Two-element array in the format of {"setting", "value is here"}
+     */
+    public void addData(String[] data) {
+        if (data == null || data.length != 2 || data[0] == null || data[1] == null) {
+            return;
+        }
+        String key = data[0].trim();
+        String value = data[1].trim();
+        if (key.equals("h_rt")) {
+            setWalltime(value);
+        }
+        else if (key.equals("qname")) {
+            setName(value);
+        }
+        else if (key.equals("pe_list")) {
+            setPe_list(value);
+        }
+        else if (key.equals("slots")) {
+            setSlots(value);
+        }
+    }
+
+    public String getWalltime() {
+        return walltime;
+    }
+
+    public void setWalltime(String walltime) {
+        this.walltime = walltime;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public ArrayList<String> getPe_list() {
+        return pe_list;
+    }
+
+    /**
+     * Replace the parallel environment list.
+     * @param list Whitespace-separated PE names; null clears the list
+     */
+    public void setPe_list(String list) {
+        this.pe_list.clear();
+        if (list == null) {
+            return;
+        }
+        // split on runs of whitespace: a single-space split yields empty
+        // entries when names are separated by several blanks or a tab
+        for (String s : list.trim().split("\\s+")) {
+            if (s.length() > 0) {
+                this.pe_list.add(s);
+            }
+        }
+    }
+
+    public int getSlots() {
+        return slots;
+    }
+
+    /**
+     * Set the slot count from a qconf slots value.
+     * Only the first comma-separated entry is used; an unparsable value
+     * leaves the current count unchanged instead of throwing.
+     * @param slots Slots value as reported by qconf
+     */
+    public void setSlots(String slots) {
+        if (slots == null) {
+            return;
+        }
+        String slot_list[] = slots.split(",");
+        if (slot_list.length == 0) {
+            return;
+        }
+        try {
+            this.slots = Integer.parseInt(slot_list[0].trim());
+        }
+        catch (NumberFormatException ignored) {
+            // gatherQueueInformation only catches IOException, so a bad value
+            // here would otherwise abort construction of the queue poller
+        }
+    }
+}
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/QueuePoller.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/QueuePoller.java (revision 3342)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/QueuePoller.java (working copy)
@@ -10,118 +10,280 @@
package org.globus.cog.abstraction.impl.scheduler.sge;
import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.io.StringReader;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
+import java.util.Scanner;
import java.util.Set;
+import org.w3c.dom.CharacterData;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
import org.apache.log4j.Logger;
import org.globus.cog.abstraction.impl.scheduler.common.AbstractProperties;
import org.globus.cog.abstraction.impl.scheduler.common.AbstractQueuePoller;
import org.globus.cog.abstraction.impl.scheduler.common.Job;
+/**
+ * SGE queue poller class
+ */
public class QueuePoller extends AbstractQueuePoller {
- public static final Logger logger = Logger.getLogger(QueuePoller.class);
- private Set processed;
+ private static String[] CMDARRAY;
+ public static final Logger logger = Logger.getLogger(QueuePoller.class);
+ private Set<Object> processed;
+ private Hashtable<String, QueueInformation> queueInformation;
+ DocumentBuilder builder;
+ Document doc;
+
+ public QueuePoller(AbstractProperties properties) {
+ this("SGE provider queue poller", properties);
+ }
- public QueuePoller(AbstractProperties properties) {
- super("SGE provider queue poller", properties);
- processed = new HashSet();
- }
+    /**
+     * Create a named SGE queue poller and collect queue information via qconf.
+     * @param name Poller name passed to the superclass
+     * @param properties SGE provider properties
+     */
+    public QueuePoller(String name, AbstractProperties properties) {
+        super(name, properties);
+        try {
+            builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
+        }
+        catch (Exception e) {
+            // without a parser every qstat -xml poll fails, so report this
+            // as an error and keep the cause
+            logger.error("Failed to create XML parser for qstat output", e);
+        }
+        processed = new HashSet<Object>();
+        gatherQueueInformation();
+    }
+
+    /**
+     * getDataFromElement - Make XML parsing a bit easier
+     * @param e XML Element; may be null when the element was not present
+     * @return Character data of the element's first child, or "" when the
+     *         element is null or its first child is not character data
+     */
+    public static String getDataFromElement(Element e) {
+        if (e == null) {
+            return "";
+        }
+        try {
+            Node child = e.getFirstChild();
+            if (child instanceof CharacterData) {
+                return ((CharacterData) child).getData();
+            }
+        }
+        catch (Exception ex) {
+            // pass the throwable itself; logging getStackTrace() only prints an array reference
+            logger.debug("Error in getDataFromElement", ex);
+        }
+        return "";
+    }
- private static String[] CMDARRAY;
- protected synchronized String[] getCMDArray() {
- if (CMDARRAY == null) {
- CMDARRAY = new String[] { getProperties().getPollCommand() };
- }
- return CMDARRAY;
- }
+ /**
+ * gatherQueueInformation - Collect information about queues and PEs
+ */
+ private void gatherQueueInformation() {
+ queueInformation = new Hashtable<String, QueueInformation>();
+ String command[] = {
+ ((Properties) this.getProperties()).getConfigCommand(), "-sql" };
- // there's an XML options that the SGE qstat has. It's probably a safer
- // way to do this
- protected void processStdout(InputStream is) throws IOException {
- BufferedReader br = new BufferedReader(new InputStreamReader(is));
- String line;
- String header = br.readLine();
- // sge qstat outputs nothing when there are no jobs
- if (header != null) {
- int jobIDIndex = header.indexOf("job-ID");
- int stateIndex = header.indexOf("state");
- // skip the -----
- br.readLine();
- processed.clear();
- do {
- line = br.readLine();
- if (line != null) {
- String jobid = parseToWhitespace(line, jobIDIndex);
- String state = parseToWhitespace(line, stateIndex);
- if (jobid == null || jobid.equals("") || state == null
- || state.equals("")) {
- throw new IOException("Failed to parse qstat line: "
- + line);
- }
- Job job = getJob(jobid);
- if (job == null) {
- continue;
- }
- processed.add(jobid);
- if (state.contains("q") || state.contains("w")) {
- if (logger.isDebugEnabled()) {
- logger.debug(jobid + " is queued");
- }
- job.setState(Job.STATE_QUEUED);
- }
- else if (state.contains("r")) {
- if (logger.isDebugEnabled()) {
- logger.debug(jobid + " is running");
- }
- job.setState(Job.STATE_RUNNING);
- }
- else if (state.contains("E")) {
- job.fail("Job is in an error state. Try running qstat -j "
- + jobid + " to see why.");
- }
- }
- } while (line != null);
- }
- else {
- processed.clear();
- }
- Iterator i = getJobs().entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry e = (Map.Entry) i.next();
- String id = (String) e.getKey();
- Job job = (Job) e.getValue();
- if (!processed.contains(id)) {
- if (logger.isDebugEnabled()) {
- logger.debug(id + " is done");
- }
- job.setState(Job.STATE_DONE);
- if (job.getState() == Job.STATE_DONE) {
- addDoneJob(id);
- }
- }
- else {
- // at least on Ranger the job is done long
- // before qstat reports it as done, so check
- // if the exit code file is there
- File f = new File(job.getExitcodeFileName());
- if (f.exists()) {
- job.setState(Job.STATE_DONE);
- if (job.getState() == Job.STATE_DONE) {
- addDoneJob(id);
- }
- }
- }
- }
- }
+ try {
+ // Get queue names
+ Process p = Runtime.getRuntime().exec(command);
+ InputStream is = p.getInputStream();
+ try {
+ p.waitFor();
+ } catch (InterruptedException e1) {
+ logger.error("QueuePoller command failed");
+ logger.error(e1.getMessage());
+ }
- protected void processStderr(InputStream is) throws IOException {
- }
+ InputStreamReader isr = new InputStreamReader(is);
+ BufferedReader br = new BufferedReader(isr);
+ String line = "";
+
+ while ((line = br.readLine()) != null) {
+ queueInformation.put(line, new QueueInformation());
+ }
+
+ // Get info about each queue
+ for (String queue : queueInformation.keySet()) {
+
+ command = new String[] {
+ ((Properties) this.getProperties()).getConfigCommand(),
+ "-sq", queue };
+ p = Runtime.getRuntime().exec(command);
+ try {
+ p.waitFor();
+ } catch (InterruptedException e) {
+ logger.error("QueuePoller command interrupted");
+ logger.error(e.getMessage());
+ }
+ is = p.getInputStream();
+ isr = new InputStreamReader(is);
+ br = new BufferedReader(isr);
+
+ while ((line = br.readLine()) != null) {
+ String results[] = line.split("\\s+", 2);
+ queueInformation.get(queue).addData(results);
+ }
+ }
+ } catch (IOException e) {
+ logger.error("QueuePoller command interrupted");
+ logger.error(e.getMessage());
+ logger.error(e.getStackTrace());
+ }
+ }
+
+ /**
+ * getAllQueues - Get a list of queues in a list
+ * @return ArrayList<String> of queues
+ */
+ public ArrayList<String> getAllQueues() {
+ ArrayList<String> result = new ArrayList<String>();
+ for (String s : queueInformation.keySet()) {
+ result.add(s);
+ }
+ return result;
+ }
+
+    /**
+     * getCMDArray - Return poll command
+     * @return String array containing the poll command and its flags
+     */
+    protected synchronized String[] getCMDArray() {
+        if (CMDARRAY == null) {
+            // split on runs of whitespace so repeated, leading or trailing
+            // blanks in the configured command do not produce empty arguments
+            CMDARRAY = getProperties().getPollCommand().trim().split("\\s+");
+        }
+        return CMDARRAY;
+    }
+
+ /**
+ * Return queue information for a requested queue
+ * @param queue
+ * String of queue name
+ * @return QueueInformation for requested queue
+ */
+ public QueueInformation getQueueInformation(String queue) {
+ return queueInformation.get(queue);
+ }
+
+ /**
+ * isValidQueue - Determine if queue is valid on this system
+ * @param queue Queue name
+ * @return True if queue exists, false otherwise
+ */
+ public boolean isValidQueue(String queue) {
+ if (queueInformation.keySet().contains(queue))
+ return true;
+ else
+ return false;
+ }
+
+    /**
+     * processStderr - defines how to handle errors from the queue poller
+     * Any output is logged at debug level.
+     * @param is Standard error stream of the poll command
+     */
+    protected void processStderr(InputStream is) throws IOException {
+        Scanner scanner = new Scanner(is).useDelimiter("\\A");
+        // Scanner.next() throws NoSuchElementException on an empty stream,
+        // and stderr is typically empty when the poll command succeeds
+        if (!scanner.hasNext()) {
+            return;
+        }
+        String error = scanner.next();
+        if (logger.isDebugEnabled()) {
+            logger.debug("QueuePoller error: " + error);
+        }
+    }
+
+    /**
+     * processStdout - Process poller output and determine the status of jobs
+     * Uses XML to parse, which requires SGE 6.0 or later
+     * @param is Stream containing the output of the poll command
+     */
+    protected void processStdout(InputStream is) throws IOException {
+        if (builder == null) {
+            // the constructor could not create a parser, so nothing can be parsed
+            logger.error("No XML parser available; cannot process qstat output");
+            return;
+        }
+        try {
+            Scanner scanner = new Scanner(is).useDelimiter("\\A");
+            if (!scanner.hasNext()) {
+                // empty output cannot be parsed; skip this poll rather than
+                // marking every tracked job as done
+                logger.warn("QueuePoller received no output from the poll command");
+                return;
+            }
+            String xml = scanner.next();
+            if (logger.isDebugEnabled()) {
+                logger.debug("QueuePoller XML: " + xml);
+            }
+            InputStream is_copy = new ByteArrayInputStream(xml.getBytes());
+            doc = builder.parse(is_copy);
+            processed.clear();
+            NodeList nodes = doc.getElementsByTagName("job_list");
+
+            for (int i = 0; i < nodes.getLength(); i++) {
+                Element element = (Element) nodes.item(i);
+                String jobid = getDataFromElement(
+                    (Element) element.getElementsByTagName("JB_job_number").item(0));
+                Job tmpJob = getJob(jobid);
+                if (tmpJob == null) {
+                    continue;
+                }
+                processed.add(jobid);
+                String state = getDataFromElement(
+                    (Element) element.getElementsByTagName("state").item(0));
+                applyStateFromQstat(jobid, tmpJob, state);
+            }
+
+            markJobsNoLongerQueued();
+        }
+        catch (Exception e) {
+            logger.warn("Exception in processStdout", e);
+        }
+    }
+
+    /**
+     * Update a job from its qstat state letters. The error state is tested
+     * first: a state such as "Eqw" also contains "q" and "w", so testing
+     * those first would report a job stuck in error as merely queued.
+     */
+    private void applyStateFromQstat(String jobid, Job job, String state) {
+        if (state.contains("E")) {
+            job.fail("Job is in an error state. Try running qstat -j "
+                + jobid + " to see why.");
+        }
+        else if (state.contains("q") || state.contains("w")) {
+            if (logger.isDebugEnabled()) {
+                logger.debug(jobid + " is queued");
+            }
+            job.setState(Job.STATE_QUEUED);
+        }
+        else if (state.contains("r")) {
+            if (logger.isDebugEnabled()) {
+                logger.debug(jobid + " is running");
+            }
+            job.setState(Job.STATE_RUNNING);
+        }
+    }
+
+    /**
+     * Mark tracked jobs that qstat no longer lists as done. Jobs that are
+     * still listed are also marked done once their exit code file exists.
+     */
+    private void markJobsNoLongerQueued() {
+        Iterator i = getJobs().entrySet().iterator();
+        while (i.hasNext()) {
+            Map.Entry e = (Map.Entry) i.next();
+            String id = (String) e.getKey();
+            Job job = (Job) e.getValue();
+            if (!processed.contains(id)) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug(id + " is done");
+                }
+                markJobDone(id, job);
+            }
+            else {
+                // at least on Ranger the job is done long
+                // before qstat reports it as done, so check
+                // if the exit code file is there
+                File f = new File(job.getExitcodeFileName());
+                if (f.exists()) {
+                    markJobDone(id, job);
+                }
+            }
+        }
+    }
+
+    /** Set the job to done and, if it accepted that state, record it as a done job. */
+    private void markJobDone(String id, Job job) {
+        job.setState(Job.STATE_DONE);
+        if (job.getState() == Job.STATE_DONE) {
+            addDoneJob(id);
+        }
+    }
}
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/Properties.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/Properties.java (revision 3342)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/Properties.java (working copy)
@@ -9,22 +9,25 @@
*/
package org.globus.cog.abstraction.impl.scheduler.sge;
-import org.apache.log4j.Logger;
import org.globus.cog.abstraction.impl.scheduler.common.AbstractProperties;
public class Properties extends AbstractProperties {
- private static Logger logger = Logger.getLogger(Properties.class);
+ private static final long serialVersionUID = 1L;
public static final String PROPERTIES = "provider-sge.properties";
-
public static final String POLL_INTERVAL = "poll.interval";
public static final String QSUB = "qsub";
public static final String QSTAT = "qstat";
public static final String QDEL = "qdel";
+ public static final String QCONF = "qconf";
public static final String DEFAULT_PE = "parallel.environment";
-
+ public static final String SUBMISSION_DELAY = "submission.delay";
private static Properties properties;
+ /**
+ * getProperties - return properties
+ * @return Properties object representing SGE properties
+ */
public static synchronized Properties getProperties() {
if (properties == null) {
properties = new Properties();
@@ -33,33 +36,88 @@
return properties;
}
- protected void setDefaults() {
- setPollInterval(10);
- setSubmitCommand("qsub");
- setPollCommand("qstat");
- setRemoveCommand("qdel");
- setDefaultPE("threaded");
+    /**
+     * Get the config command used to gather queue information
+     * @return The configured command (set via setConfigCommand; "qconf" by default)
+     */
+    public String getConfigCommand() {
+        return getProperty(QCONF);
}
+ /**
+ * getDefaultPE - Get the default parallel environment
+ * @return String containing pe
+ */
+ public String getDefaultPE() {
+ return getProperty(DEFAULT_PE);
+ }
+ /**
+ * getPollCommandName - Get poll command name
+ * @return String containing poll command
+ */
public String getPollCommandName() {
return QSTAT;
}
-
+ /**
+ * getRemoveCommandName - Get remove command
+ * @return String of command on how to remove a job
+ */
public String getRemoveCommandName() {
return QDEL;
}
-
+ /**
+ * getSubmissionDelay - Get length to sleep before submitting a job
+ * Value as a string representing milliseconds
+ * @return Submission delay as String in milliseconds
+ */
+ public String getSubmissionDelay() {
+ return getProperty(SUBMISSION_DELAY);
+ }
+
+ /**
+ * getSubmitCommandName - Get submit command
+ * @return String of submit command
+ */
public String getSubmitCommandName() {
return QSUB;
}
-
+
+ /**
+ * Set the default config command
+ * Used for gathering information about the queues
+ * @param config String with command name
+ */
+ public void setConfigCommand(String config) {
+ setProperty(QCONF, config);
+ }
+ /**
+ * setDefaultPE - set the default parallel environment
+ * @param pe String representing pe
+ */
public void setDefaultPE(String pe) {
setProperty(DEFAULT_PE, pe);
}
-
- public String getDefaultPE() {
- return getProperty(DEFAULT_PE);
+
+ /**
+ * setDefault - Reset all SGE options to default
+ */
+ protected void setDefaults() {
+ setPollInterval(10);
+ setSubmitCommand("qsub");
+ setPollCommand("qstat -xml");
+ setRemoveCommand("qdel");
+ setDefaultPE("threaded");
+ setConfigCommand("qconf");
+ setSubmissionDelay("0");
}
+
+ /**
+ * setSubmissionDelay - set the submission delay
+ * @param delay String representing milliseconds to sleep
+ */
+ private void setSubmissionDelay(String delay) {
+ setProperty(SUBMISSION_DELAY, delay);
+ }
}
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/SGEExecutor.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/SGEExecutor.java (revision 3342)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/SGEExecutor.java (working copy)
@@ -4,15 +4,16 @@
//This message may not be removed or altered.
//----------------------------------------------------------------------
-/*
- * Created on Oct 11, 2005
- */
package org.globus.cog.abstraction.impl.scheduler.sge;
import java.io.BufferedReader;
import java.io.CharArrayReader;
+import java.io.File;
import java.io.IOException;
import java.io.Writer;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Matcher;
@@ -24,21 +25,279 @@
import org.globus.cog.abstraction.impl.scheduler.common.AbstractProperties;
import org.globus.cog.abstraction.impl.scheduler.common.AbstractQueuePoller;
import org.globus.cog.abstraction.impl.scheduler.common.Job;
+import org.globus.cog.abstraction.impl.scheduler.common.ProcessException;
import org.globus.cog.abstraction.impl.scheduler.common.ProcessListener;
+import org.globus.cog.abstraction.impl.scheduler.pbs.PBSExecutor;
import org.globus.cog.abstraction.interfaces.FileLocation;
import org.globus.cog.abstraction.interfaces.JobSpecification;
import org.globus.cog.abstraction.interfaces.Task;
+import org.globus.gsi.gssapi.auth.AuthorizationException;
+import org.ietf.jgss.GSSException;
+/**
+ * Java CoG interface for Sun/Oracle Grid Engine
+ */
public class SGEExecutor extends AbstractExecutor {
- public static final Logger logger = Logger.getLogger(SGEExecutor.class);
+ public static final Pattern JOB_ID_LINE = Pattern.compile(".*[Yy]our job (\\d+) \\(.*\\) has been submitted");
+ public static final Logger logger = Logger.getLogger(SGEExecutor.class);
+ private static QueuePoller poller;
+ private static final String[] QSUB_PARAMS = new String[] {};
+ private static int unique = 0;
+ private static NumberFormat IDF = new DecimalFormat("000000");
+
public SGEExecutor(Task task, ProcessListener listener) {
super(task, listener);
+ verifyQueueInformation();
}
+
+ /**
+ * Create a new Job
+ * @param jobid - String representing SGE job ID
+ * @param stdout
+ * @param stdOutputLocation
+ * @param stderr
+ * @param stdErrorLocation
+ * @param exitcode
+ * @param executor
+ */
+ protected Job createJob(String jobid, String stdout,
+ FileLocation stdOutputLocation, String stderr,
+ FileLocation stdErrorLocation, String exitcode,
+ AbstractExecutor executor) {
+ return new Job(jobid, stdout, stdOutputLocation, stderr,
+ stdErrorLocation, exitcode, executor);
+ }
+ /**
+ * Return additional submit parameters
+ * @return String representing qsub parameters
+ */
+ protected String[] getAdditionalSubmitParameters() {
+ return QSUB_PARAMS;
+ }
+
+ /**
+ * Get an attribute from a job specification
+ * If the attribute does not exist, return the default value
+ * @param spec JobSpecification to search
+ * @param name Attribute to search for
+ * @param defaultValue This value is return if the attribute is not found
+ * @return String representing an attribute (if found) or the default value
+ */
+ private String getAttribute(JobSpecification spec, String name,
+ String defaultValue) {
+ Object value = spec.getAttribute(name);
+ if (value == null) {
+ return defaultValue;
+ }
+ else {
+ return value.toString();
+ }
+ }
+
+ /**
+ * getName - Return the name of this provider
+ * @return String representing provider name
+ */
+ protected String getName() {
+ return "SGE";
+ }
+
+ /**
+ * getProperties - Return SGE properties
+ * @return Properties as an AbstractProperties object
+ */
+ protected AbstractProperties getProperties() {
+ return Properties.getProperties();
+ }
+
+ /**
+ * getQueuePoller - return the Queue Poller
+ * @return AbstractQueuePoller
+ */
+ protected AbstractQueuePoller getQueuePoller() {
+ synchronized (SGEExecutor.class) {
+ if (poller == null) {
+ poller = new QueuePoller(getProperties());
+ poller.start();
+ }
+ return poller;
+ }
+ }
+
+ /**
+ * getSGEProperties - Return SGE properties
+ * @return Properties as a Properties object
+ */
+ protected Properties getSGEProperties() {
+ return (Properties) getProperties();
+ }
+
+ /**
+ * Create SGE job name
+ * @param task
+ * @return String containing task name
+ */
+ private String makeName(Task task) {
+ String name = task.getName();
+ if (name == null) {
+ int i = 0;
+ synchronized(SGEExecutor.class) {
+ i = unique++;
+ }
+ name = "cog-" + IDF.format(i);
+ }
+ else if (name.length() > 15) {
+ name = name.substring(0, 15);
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("SGE name: for: " + task.getName() +
+ " is: " + name);
+ }
+ return name;
+ }
+
+ /**
+ * parseSubmitCommandOutput - Given qsub output, return job ID
+ * @param out - String that contains qsub output
+ * @return String containing job id
+ * @throws IOException
+ */
+ protected String parseSubmitCommandOutput(String out) throws IOException {
+ // > your job 2494189 ("t1.sub") has been submitted
+ BufferedReader br = new BufferedReader(new CharArrayReader(out.toCharArray()));
+ String line = br.readLine();
+ while (line != null) {
+ Matcher m = JOB_ID_LINE.matcher(line);
+ if (m.matches()) {
+ String id = m.group(1);
+ if (logger.isInfoEnabled()) {
+ logger.info("Job id from qsub: " + id);
+ }
+ return id;
+ }
+ line = br.readLine();
+ }
+ throw new IOException("None of the qsub lines matches the required patten: " + JOB_ID_LINE);
+ }
+
+    /**
+     * Wait for the configured submission delay, then submit the job.
+     * @see AbstractExecutor#start()
+     */
+    public void start() throws AuthorizationException, IOException, ProcessException {
+        long delay = parseSubmissionDelay(getSGEProperties().getSubmissionDelay());
+        boolean interrupted = false;
+        if (delay > 0) {
+            try {
+                Thread.sleep(delay);
+            }
+            catch (InterruptedException e) {
+                // still submit, as before, but restore the interrupt afterwards
+                interrupted = true;
+                logger.warn("Interrupted during submission delay", e);
+            }
+        }
+        try {
+            super.start();
+        }
+        finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Parse the submission delay in milliseconds.
+     * @param value Property value; may be null
+     * @return The delay, or 0 if the value is missing or not an integer
+     */
+    private static long parseSubmissionDelay(String value) {
+        if (value == null) {
+            return 0;
+        }
+        try {
+            return Long.parseLong(value.trim());
+        }
+        catch (NumberFormatException e) {
+            logger.warn("Ignoring invalid submission delay: " + value);
+            return 0;
+        }
+    }
+
+    /**
+     * Check that job specification values are valid for this system.
+     * Problems are only logged (errors for invalid settings, info for
+     * under-used slots); nothing is thrown and submission is not blocked.
+     */
+    private void verifyQueueInformation() {
+        QueuePoller queuePoller = (QueuePoller) getQueuePoller();
+        JobSpecification spec = getSpec();
+
+        // A queue must be defined in order to gather information about it
+        Object queueAttribute = spec.getAttribute("queue");
+        if (queueAttribute == null) {
+            logger.error("Error: No queue defined");
+            return;
+        }
+        String queue = queueAttribute.toString();
+
+        // Verify the queue is available
+        if (!queuePoller.isValidQueue(queue)) {
+            StringBuilder error = new StringBuilder("Invalid queue \"" + queue + "\"\nAvailable queues are: ");
+            for (String s : queuePoller.getAllQueues()) {
+                error.append(s).append(' ');
+            }
+            logger.error(error.toString());
+            return;
+        }
+        QueueInformation qi = queuePoller.getQueueInformation(queue);
+
+        // Validate the same PE that writeScript requests: the explicit "pe"
+        // attribute, or the provider's default PE when none is given
+        String pe = getAttribute(spec, "pe", getSGEProperties().getDefaultPE());
+        if (pe == null) {
+            logger.error("Error: No parallel environment specified");
+            return;
+        }
+
+        // Check that pe is available for the queue
+        if (!qi.getPe_list().contains(pe)) {
+            StringBuilder error = new StringBuilder("Parallel environment " + pe
+                + " is not valid for " + queue + " queue\n");
+            error.append("Valid PEs are: ");
+            for (String s : qi.getPe_list()) {
+                error.append(s).append(' ');
+            }
+            logger.error(error.toString());
+            return;
+        }
+
+        // Check that requested walltime fits into time limits
+        Object maxWalltimeAttribute = spec.getAttribute("maxwalltime");
+        String queueWalltimeString = qi.getWalltime();
+        if (maxWalltimeAttribute != null && queueWalltimeString != null
+                && !queueWalltimeString.equalsIgnoreCase("INFINITY")) {
+            int requestedWalltimeSeconds = WallTime.timeToSeconds(maxWalltimeAttribute.toString());
+            int queueWalltime = WallTime.timeToSeconds(queueWalltimeString);
+            if (requestedWalltimeSeconds > queueWalltime) {
+                // report the violation; submission still proceeds
+                logger.error("Requested wall time of " + requestedWalltimeSeconds
+                    + " seconds is greater than queue limit of " + queueWalltime);
+            }
+        }
+
+        // Give a warning if CPUs are being underutilized (this may be intentional due to memory restrictions)
+        Object jobsPerNodeAttribute = spec.getAttribute("jobsPerNode");
+        if (jobsPerNodeAttribute != null) {
+            String jobsPerNode = String.valueOf(jobsPerNodeAttribute);
+            try {
+                if (Integer.parseInt(jobsPerNode.trim()) < qi.getSlots() && logger.isInfoEnabled()) {
+                    logger.info("Requesting only " + jobsPerNode + "/" + qi.getSlots() + " CPUs per node");
+                }
+            }
+            catch (NumberFormatException e) {
+                logger.error("Invalid jobsPerNode value: " + jobsPerNode);
+            }
+        }
+    }
+
+
+ /**
+ * writeAttr - Write a specification attribute to a submit file
+ * If the attribute is not found, use null
+ * @param attrName Specification attribute to write
+ * @param arg The SGE argument (eg. -N for the job name)
+ * @param wr A Writer object representing the submit file
+ * @throws IOException
+ */
+ protected void writeAttr(String attrName, String arg, Writer wr)
+ throws IOException {
+
+ writeAttr(attrName, arg, wr, null);
+ }
+
+
+
+ /**
+ * writeAttr - Write a specification attribute to a submit file
+ *
+ * @param attrName Specification attribute to write
+ * @param arg The SGE argument (eg. -N for the job name)
+ * @param wr A Writer object representing the submit file
+ * @param defaultValue If the requested attribute is not found, use this default value
+ * @throws IOException
+ */
protected void writeAttr(String attrName, String arg, Writer wr,
String defaultValue) throws IOException {
- Object value = getSpec().getAttribute(attrName);
+
+ Object value = getSpec().getAttribute(attrName);
if (value != null) {
wr.write("#$ " + arg + String.valueOf(value) + '\n');
}
@@ -47,42 +306,64 @@
}
}
- protected void writeAttr(String attrName, String arg, Writer wr)
+ /**
+ * writeMultiJobPreamble - Add multiple jobs to a single submit file
+ * @param wr Writer A Writer object representing the submit file
+ * @param exitcodefile Filename where application exit code should be written
+ * @throws IOException
+ */
+ protected void writeMultiJobPreamble(Writer wr, String exitcodefile)
throws IOException {
- writeAttr(attrName, arg, wr, null);
+ wr.write("NODES=`cat $PE_HOSTFILE | awk '{ for(i=0;i<$2;i++){print $1} }'`\n");
+ wr.write("ECF=" + exitcodefile + "\n");
+ wr.write("INDEX=0\n");
+ wr.write("for NODE in $NODES; do\n");
+ wr.write(" echo \"N\" >$ECF.$INDEX\n");
+ wr.write(" ssh $NODE /bin/bash -c \\\" \"");
}
- protected void writeWallTime(Writer wr) throws IOException {
- Object walltime = getSpec().getAttribute("maxwalltime");
- if (walltime != null) {
- wr.write("#$ -l h_rt="
- + WallTime.normalize(walltime.toString(), "sge-native")
- + '\n');
- }
- }
+ /**
+ * writeScript - Write the SGE submit script
+ * @param wr A Writer object representing the submit file
+ * @param exitcodefile Filename where exit code will be written
+ * @param stdout Filename where standard output will be written
+ * @param stderr Filename where standard error will be written
+ * @throws IOException
+ */
protected void writeScript(Writer wr, String exitcodefile, String stdout,
String stderr) throws IOException {
- Task task = getTask();
+
+ Task task = getTask();
JobSpecification spec = getSpec();
+
String type = (String) spec.getAttribute("jobType");
boolean multiple = false;
if ("multiple".equals(type)) {
multiple = true;
}
-
+
+ int count = Integer.valueOf(getAttribute(spec, "count", "1"));
+ String queue = (String)spec.getAttribute("queue");
+
+ int coresPerNode = Integer.valueOf(getAttribute(spec, "coresPerNode",
+ String.valueOf(poller.getQueueInformation(queue).getSlots())));
+ int jobsPerNode = Integer.valueOf(getAttribute(spec, "jobsPerNode",
+ String.valueOf(coresPerNode)));
+ int coresToRequest = ( count * jobsPerNode + coresPerNode - 1) / coresPerNode * coresPerNode;
+
wr.write("#!/bin/bash\n");
- wr.write("#$ -N " + task.getName() + '\n');
- // ranger requires this. might as well be default
+ wr.write("#$ -N " + makeName(task) + '\n');
wr.write("#$ -V\n");
writeAttr("project", "-A ", wr);
-
- writeAttr("count", "-pe "
- + getAttribute(spec, "pe", getSGEProperties().getDefaultPE())
- + " ", wr, "1");
-
+ writeAttr("queue", "-q ", wr);
+
+ String peValue = "-pe " + getAttribute(spec, "pe", getSGEProperties().getDefaultPE()) + " ";
+ writeAttr("null", peValue, wr, String.valueOf(coresToRequest));
+
writeWallTime(wr);
- writeAttr("queue", "-q ", wr);
+ writeSoftWallTime(wr);
+
if (spec.getStdInput() != null) {
wr.write("#$ -i " + quote(spec.getStdInput()) + '\n');
}
@@ -91,9 +372,9 @@
if (!spec.getEnvironmentVariableNames().isEmpty()) {
wr.write("#$ -v ");
- Iterator i = spec.getEnvironmentVariableNames().iterator();
+ Iterator<String> i = spec.getEnvironmentVariableNames().iterator();
while (i.hasNext()) {
- String name = (String) i.next();
+ String name = i.next();
wr.write(name);
wr.write('=');
wr.write(quote(spec.getEnvironmentVariable(name)));
@@ -107,6 +388,7 @@
if (logger.isDebugEnabled()) {
logger.debug("Job type: " + type);
}
+
if (type != null) {
String wrapper = Properties.getProperties().getProperty(
"wrapper." + type);
@@ -122,6 +404,21 @@
logger.debug("Wrapper after variable substitution: " + wrapper);
}
}
+
+ wr.write("\nwrite_exitcode()\n");
+ wr.write("{\n");
+ wr.write("echo $1 > " + exitcodefile + "\n");
+ wr.write("exit 0\n");
+ wr.write("}\n\n");
+
+ wr.write("# Trap all signals\n");
+ wr.write("SIGNAL=1\n");
+ wr.write("while [ $SIGNAL -le 30 ];\n");
+ wr.write("do\n");
+ wr.write(" trap \"echo Received signal $SIGNAL; write_exitcode $SIGNAL\" $SIGNAL\n");
+ wr.write(" (( SIGNAL+=1 ))\n");
+ wr.write("done\n\n");
+
if (spec.getDirectory() != null) {
wr.write("cd " + quote(spec.getDirectory()) + " && ");
}
@@ -129,11 +426,12 @@
if (multiple) {
writeMultiJobPreamble(wr, exitcodefile);
}
+
wr.write(quote(spec.getExecutable()));
- List args = spec.getArgumentsAsList();
+ List<String> args = spec.getArgumentsAsList();
if (args != null && args.size() > 0) {
wr.write(' ');
- Iterator i = args.iterator();
+ Iterator<String> i = args.iterator();
while (i.hasNext()) {
wr.write(quote((String) i.next()));
if (i.hasNext()) {
@@ -145,92 +443,41 @@
if (spec.getStdInput() != null) {
wr.write(" < " + quote(spec.getStdInput()));
}
+
if (multiple) {
writeMultiJobPostamble(wr);
+ } else {
+ wr.write(" &\n");
+ wr.write("wait $!\n");
+ wr.write("write_exitcode $?\n");
}
- else {
- wr.write('\n');
- wr.write("/bin/echo $? >" + exitcodefile + '\n');
- }
wr.close();
}
-
- protected void writeMultiJobPreamble(Writer wr, String exitcodefile)
- throws IOException {
- wr.write("NODES=`cat $PE_HOSTFILE | awk '{ for(i=0;i<$2;i++){print $1} }'`\n");
- wr.write("ECF=" + exitcodefile + "\n");
- wr.write("INDEX=0\n");
- wr.write("for NODE in $NODES; do\n");
- wr.write(" echo \"N\" >$ECF.$INDEX\n");
- wr.write(" ssh $NODE /bin/bash -c \\\" \"");
- }
-
- private String getAttribute(JobSpecification spec, String name,
- String defaultValue) {
- Object value = spec.getAttribute(name);
- if (value == null) {
- return defaultValue;
+
+ /**
+ * writeWallTime - Convert time into correct format and write to submit file
+ * Use maxtime first is available, otherwise use maxwalltime
+ * @param wr Writer A Writer object representing the submit file
+ * @throws IOException
+ */
+ protected void writeWallTime(Writer wr) throws IOException {
+ Object walltime = getSpec().getAttribute("maxwalltime");
+ if (walltime != null) {
+ wr.write("#$ -l h_rt="
+ + WallTime.normalize(walltime.toString(), "sge-native")
+ + '\n');
}
- else {
- return value.toString();
- }
}
-
- protected String getName() {
- return "SGE";
- }
-
- protected AbstractProperties getProperties() {
- return Properties.getProperties();
- }
- protected Properties getSGEProperties() {
- return (Properties) getProperties();
+    /**
+     * writeSoftWallTime - Write an s_rt limit 10 seconds below maxwalltime
+     * Presumably this gives the signal trap written by writeScript a chance
+     * to record an exit code before the hard h_rt limit is hit — confirm.
+     * @param wr Writer A Writer object representing the submit file
+     * @throws IOException
+     */
+    protected void writeSoftWallTime(Writer wr) throws IOException {
+        Object walltime = getSpec().getAttribute("maxwalltime");
+        if (walltime != null) {
+            int walltimeSeconds = WallTime.timeToSeconds(walltime.toString()) - 10;
+            // for wall times of 10 seconds or less the margin would give a zero
+            // or negative soft limit; omit s_rt and rely on h_rt alone
+            if (walltimeSeconds > 0) {
+                wr.write("#$ -l s_rt="
+                    + WallTime.format("sge-native", walltimeSeconds)
+                    + '\n');
+            }
+        }
}
- public static final Pattern JOB_ID_LINE = Pattern.compile(".*[Yy]our job (\\d+) \\(.*\\) has been submitted");
- protected String parseSubmitCommandOutput(String out) throws IOException {
- // > your job 2494189 ("t1.sub") has been submitted
- BufferedReader br = new BufferedReader(new CharArrayReader(out.toCharArray()));
- String line = br.readLine();
- while (line != null) {
- Matcher m = JOB_ID_LINE.matcher(line);
- if (m.matches()) {
- String id = m.group(1);
- if (logger.isInfoEnabled()) {
- logger.info("Job id from qsub: " + id);
- }
- return id;
- }
- line = br.readLine();
- }
- throw new IOException("None of the qsub lines matches the required patten: " + JOB_ID_LINE);
- }
-
- private static final String[] QSUB_PARAMS = new String[] {};
-
- protected String[] getAdditionalSubmitParameters() {
- return QSUB_PARAMS;
- }
-
- protected Job createJob(String jobid, String stdout,
- FileLocation stdOutputLocation, String stderr,
- FileLocation stdErrorLocation, String exitcode,
- AbstractExecutor executor) {
- return new Job(jobid, stdout, stdOutputLocation, stderr,
- stdErrorLocation, exitcode, executor);
- }
-
- private static QueuePoller poller;
-
- protected AbstractQueuePoller getQueuePoller() {
- synchronized (SGEExecutor.class) {
- if (poller == null) {
- poller = new QueuePoller(getProperties());
- poller.start();
- }
- return poller;
- }
- }
}
More information about the Swift-commit mailing list