[Swift-commit] Cog update

swift at ci.uchicago.edu swift at ci.uchicago.edu
Tue Jan 10 10:40:12 CST 2012


------------------------------------------------------------------------
r3343 | davidkelly999 | 2012-01-10 10:37:52 -0600 (Tue, 10 Jan 2012) | 2 lines

Adding SGE modifications from 0.93 to trunk

------------------------------------------------------------------------
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/QueueInformation.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/QueueInformation.java	(revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/QueueInformation.java	(revision 3343)
@@ -0,0 +1,62 @@
+package org.globus.cog.abstraction.impl.scheduler.sge;
+
+import java.util.ArrayList;
+
+/**
+ * Data structure for defining queue properties
+ */
+public class QueueInformation {
+
+	private String name;
+	private ArrayList<String> pe_list = new ArrayList<String>();
+	private int slots;
+	private String walltime;
+
+	/**
+	 * Record one setting/value pair parsed from "qconf -sq" output.
+	 * Only h_rt, qname, pe_list and slots are kept; other keys are ignored.
+	 * @param data two-element array {"setting", "value is here"}; any other shape is ignored
+	 */
+	public void addData(String[] data) {
+		if (data == null || data.length != 2 || data[1] == null) return;
+		if ("h_rt".equals(data[0])) setWalltime(data[1]);
+		else if ("qname".equals(data[0])) setName(data[1]);
+		else if ("pe_list".equals(data[0])) setPe_list(data[1]);
+		else if ("slots".equals(data[0])) setSlots(data[1]);
+	}
+
+	/** @return the queue's h_rt limit as text (e.g. "INFINITY"), or null if never set */
+	public String getWalltime() { return walltime; }
+
+	public void setWalltime(String walltime) { this.walltime = walltime; }
+
+	/** @return the queue name (qname), or null if never set */
+	public String getName() { return name; }
+
+	public void setName(String name) { this.name = name; }
+
+	/** @return a copy of the parallel environments enabled for this queue */
+	public ArrayList<String> getPe_list() {
+		return new ArrayList<String>(pe_list);
+	}
+
+	/** Replace the PE list with the whitespace-separated names in list, skipping empty tokens. */
+	public void setPe_list(String list) {
+		this.pe_list.clear();
+		for (String s : list.trim().split("\\s+")) {
+			if (s.length() > 0) this.pe_list.add(s);
+		}
+	}
+
+	/** @return the queue-wide slot count (0 if never set) */
+	public int getSlots() { return slots; }
+
+	/**
+	 * Parse an SGE slots value such as "8" or "1,[host=4]"; only the
+	 * leading queue-wide value is kept.
+	 * @throws NumberFormatException if that leading value is not an integer
+	 */
+	public void setSlots(String slots) {
+		this.slots = Integer.parseInt(slots.split(",")[0].trim());
+	}
+}
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/QueuePoller.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/QueuePoller.java	(revision 3342)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/QueuePoller.java	(working copy)
@@ -10,118 +10,280 @@
 package org.globus.cog.abstraction.impl.scheduler.sge;
 
 import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.io.StringReader;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Scanner;
 import java.util.Set;
 
+import org.w3c.dom.CharacterData;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
 import org.apache.log4j.Logger;
 import org.globus.cog.abstraction.impl.scheduler.common.AbstractProperties;
 import org.globus.cog.abstraction.impl.scheduler.common.AbstractQueuePoller;
 import org.globus.cog.abstraction.impl.scheduler.common.Job;
 
+/**
+ * SGE queue poller class
+ */
 public class QueuePoller extends AbstractQueuePoller {
-    public static final Logger logger = Logger.getLogger(QueuePoller.class);
 
-    private Set processed;
+	private static String[] CMDARRAY;
+	public static final Logger logger = Logger.getLogger(QueuePoller.class);
+	private Set<Object> processed;
+	private Hashtable<String, QueueInformation> queueInformation;
+	DocumentBuilder builder;
+	Document doc;
+	
+	public QueuePoller(AbstractProperties properties) {
+		this("SGE provider queue poller", properties);
+	}
 
-    public QueuePoller(AbstractProperties properties) {
-        super("SGE provider queue poller", properties);
-        processed = new HashSet();
-    }
+	public QueuePoller(String name, AbstractProperties properties) {
+		super(name, properties);
+		try {
+			builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
+		}
+		catch(Exception e) {
+			// builder stays null and every processStdout() parse will fail,
+			// so report this loudly, with the cause, instead of at info level
+			logger.error("Cannot create XML parser for qstat output", e);
+		}
+		processed = new HashSet<Object>();
+		gatherQueueInformation();
+	}
+	
+	/**
+	 * getDataFromElement - Return the text held by an element's first child
+	 * @param e XML Element; may be null, e.g. when NodeList.item() found no match
+	 * @return the first child's character data, or "" if e is null or its
+	 *         first child is not CharacterData (text, CDATA or comment)
+	 */
+	public static String getDataFromElement(Element e) {
+		if (e == null) {
+			return "";
+		}
+		try {
+			Node child = e.getFirstChild();
+			if (child instanceof CharacterData) {
+				return ((CharacterData) child).getData();
+			}
+		}
+		catch (Exception ex) {
+			logger.debug("Error in getDataFromElement", ex);
+		}
+		return "";
+	}
 
-    private static String[] CMDARRAY;
 
-    protected synchronized String[] getCMDArray() {
-        if (CMDARRAY == null) {
-            CMDARRAY = new String[] { getProperties().getPollCommand() };
-        }
-        return CMDARRAY;
-    }
+	/** 
+	 * gatherQueueInformation - Collect information about queues and PEs
+	 */
+	private void gatherQueueInformation() {
+		queueInformation = new Hashtable<String, QueueInformation>();
+		String command[] = {
+				((Properties) this.getProperties()).getConfigCommand(), "-sql" };
 
-    // there's an XML options that the SGE qstat has. It's probably a safer
-    // way to do this
-    protected void processStdout(InputStream is) throws IOException {
-        BufferedReader br = new BufferedReader(new InputStreamReader(is));
-        String line;
-        String header = br.readLine();
-        // sge qstat outputs nothing when there are no jobs
-        if (header != null) {
-            int jobIDIndex = header.indexOf("job-ID");
-            int stateIndex = header.indexOf("state");
-            // skip the -----
-            br.readLine();
-            processed.clear();
-            do {
-                line = br.readLine();
-                if (line != null) {
-                    String jobid = parseToWhitespace(line, jobIDIndex);
-                    String state = parseToWhitespace(line, stateIndex);
-                    if (jobid == null || jobid.equals("") || state == null
-                            || state.equals("")) {
-                        throw new IOException("Failed to parse qstat line: "
-                                + line);
-                    }
-                    Job job = getJob(jobid);
-                    if (job == null) {
-                        continue;
-                    }
-                    processed.add(jobid);
-                    if (state.contains("q") || state.contains("w")) {
-                        if (logger.isDebugEnabled()) {
-                            logger.debug(jobid + " is queued");
-                        }
-                        job.setState(Job.STATE_QUEUED);
-                    }
-                    else if (state.contains("r")) {
-                        if (logger.isDebugEnabled()) {
-                            logger.debug(jobid + " is running");
-                        }
-                        job.setState(Job.STATE_RUNNING);
-                    }
-                    else if (state.contains("E")) {
-                        job.fail("Job is in an error state. Try running qstat -j "
-                                    + jobid + " to see why.");
-                    }
-                }
-            } while (line != null);
-        }
-        else {
-            processed.clear();
-        }
-        Iterator i = getJobs().entrySet().iterator();
-        while (i.hasNext()) {
-            Map.Entry e = (Map.Entry) i.next();
-            String id = (String) e.getKey();
-            Job job = (Job) e.getValue();
-            if (!processed.contains(id)) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug(id + " is done");
-                }
-                job.setState(Job.STATE_DONE);
-                if (job.getState() == Job.STATE_DONE) {
-                    addDoneJob(id);
-                }
-            }
-            else {
-                // at least on Ranger the job is done long
-                // before qstat reports it as done, so check
-                // if the exit code file is there
-                File f = new File(job.getExitcodeFileName());
-                if (f.exists()) {
-                    job.setState(Job.STATE_DONE);
-                    if (job.getState() == Job.STATE_DONE) {
-                        addDoneJob(id);
-                    }
-                }
-            }
-        }
-    }
+		try {
+			// Get queue names
+			Process p = Runtime.getRuntime().exec(command);
+			InputStream is = p.getInputStream();
+			try {
+				p.waitFor();
+			} catch (InterruptedException e1) {
+				logger.error("QueuePoller command failed");
+				logger.error(e1.getMessage());
+			}
 
-    protected void processStderr(InputStream is) throws IOException {
-    }
+			InputStreamReader isr = new InputStreamReader(is);
+			BufferedReader br = new BufferedReader(isr);
+			String line = "";
+
+			while ((line = br.readLine()) != null) {
+				queueInformation.put(line, new QueueInformation());
+			}
+
+			// Get info about each queue
+			for (String queue : queueInformation.keySet()) {
+
+				command = new String[] {
+						((Properties) this.getProperties()).getConfigCommand(),
+						"-sq", queue };
+				p = Runtime.getRuntime().exec(command);
+				try {
+					p.waitFor();
+				} catch (InterruptedException e) {
+					logger.error("QueuePoller command interrupted");
+					logger.error(e.getMessage());
+				}
+				is = p.getInputStream();
+				isr = new InputStreamReader(is);
+				br = new BufferedReader(isr);
+
+				while ((line = br.readLine()) != null) {
+					String results[] = line.split("\\s+", 2);
+					queueInformation.get(queue).addData(results);
+				}
+			}
+		} catch (IOException e) {
+			logger.error("QueuePoller command interrupted");
+			logger.error(e.getMessage());
+			logger.error(e.getStackTrace());
+		}
+	}
+
+	/**
+	 * getAllQueues - Get a list of queues in a list
+	 * @return ArrayList<String> of queues
+	 */
+	public ArrayList<String> getAllQueues() {
+		ArrayList<String> result = new ArrayList<String>();
+		for (String s : queueInformation.keySet()) {
+			result.add(s);
+		}
+		return result;
+	}
+
+	/**
+	 * getCMDArray - Return poll command
+	 * @return String array contains poll command and flags
+	 */
+	protected synchronized String[] getCMDArray() {
+		if (CMDARRAY == null) {
+			CMDARRAY = getProperties().getPollCommand().split(" ");
+		}
+		return CMDARRAY;
+	}
+
+	/**
+	 * Return queue information for a requested queue
+	 * @param queue
+	 *            String of queue name
+	 * @return QueueInformation for requested queue
+	 */
+	public QueueInformation getQueueInformation(String queue) {
+		return queueInformation.get(queue);
+	}
+
+	/**
+	 * isValidQueue - Determine if queue is valid on this system
+	 * @param queue Queue name
+	 * @return True if queue exists, false otherwise
+	 */
+	public boolean isValidQueue(String queue) {
+		if (queueInformation.keySet().contains(queue))
+			return true;
+		else
+			return false;
+	}
+
+	/**
+	 * processStderr - Log the poll command's stderr at debug level
+	 * @param is stderr of the poll command; may be empty (Scanner.next() would then throw NoSuchElementException, so hasNext() is checked first)
+	 */
+	protected void processStderr(InputStream is) throws IOException {
+		Scanner sc = new Scanner(is).useDelimiter("\\A");
+		if (sc.hasNext() && logger.isDebugEnabled()) {
+			logger.debug("QueuePoller error: " + sc.next());
+		}
+	}
+
+	/**
+	 * processStdout - Update job states from "qstat -xml" output
+	 * (requires SGE 6.0 or later). Jobs listed by qstat are marked
+	 * failed/queued/running from their state letters; tracked jobs that
+	 * qstat no longer lists, or whose exit code file already exists, are
+	 * marked done. Any exception is logged rather than propagated.
+	 * @param is stream containing the poll command's standard output
+	 * @throws IOException not thrown in practice; failures are caught and logged
+	 */
+	protected void processStdout(InputStream is) throws IOException {
+		try {
+			String xml = new Scanner(is).useDelimiter("\\A").next();
+			if(logger.isDebugEnabled()) {
+				logger.debug("QueuePoller XML: " + xml);
+			}
+			InputStream is_copy = new ByteArrayInputStream(xml.getBytes());
+			doc = builder.parse(is_copy);
+			processed.clear();
+			NodeList nodes = doc.getElementsByTagName("job_list");
+			Job tmpJob;
+
+			for (int i = 0; i < nodes.getLength(); i++) {
+				Element element = (Element) nodes.item(i);
+				NodeList nodeList = element.getElementsByTagName("JB_job_number");
+				String jobid = getDataFromElement((Element) nodeList.item(0));
+				tmpJob = getJob(jobid);
+				if (tmpJob == null) {
+					continue;
+				}
+
+				processed.add(jobid);
+				nodeList = element.getElementsByTagName("state");
+				String state = getDataFromElement((Element) nodeList.item(0));
+
+				// Check "E" first: error states such as "Eqw" also contain
+				// "q"/"w" and would otherwise be reported as queued forever.
+				if (state.contains("E")) {
+					tmpJob.fail("Job is in an error state. Try running qstat -j "
+						+ jobid + " to see why.");
+				} else if (state.contains("q") || state.contains("w")) {
+					if (logger.isDebugEnabled()) {
+						logger.debug(jobid + " is queued");
+					}
+					tmpJob.setState(Job.STATE_QUEUED);
+				} else if (state.contains("r")) {
+					if (logger.isDebugEnabled()) {
+						logger.debug(jobid + " is running");
+					}
+					tmpJob.setState(Job.STATE_RUNNING);
+				}
+			}
+
+			Iterator i = getJobs().entrySet().iterator();
+			while (i.hasNext()) {
+				Map.Entry e = (Map.Entry) i.next();
+				String id = (String) e.getKey();
+				Job job = (Job) e.getValue();
+
+				boolean finished = !processed.contains(id);
+				if (finished) {
+					if (logger.isDebugEnabled()) {
+						logger.debug(id + " is done");
+					}
+				}
+				else {
+					// at least on Ranger the job is done long
+					// before qstat reports it as done, so check
+					// if the exit code file is there
+					finished = new File(job.getExitcodeFileName()).exists();
+				}
+				if (finished) {
+					job.setState(Job.STATE_DONE);
+					if (job.getState() == Job.STATE_DONE) {
+						addDoneJob(id);
+					}
+				}
+			}
+		}
+		catch (Exception e) {
+			// Keep polling after bad output, but do not hide the cause.
+			logger.warn("Exception in processStdout", e);
+		}
+	}
 }
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/Properties.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/Properties.java	(revision 3342)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/Properties.java	(working copy)
@@ -9,22 +9,25 @@
  */
 package org.globus.cog.abstraction.impl.scheduler.sge;
 
-import org.apache.log4j.Logger;
 import org.globus.cog.abstraction.impl.scheduler.common.AbstractProperties;
 
 public class Properties extends AbstractProperties {
-	private static Logger logger = Logger.getLogger(Properties.class);
 
+	private static final long serialVersionUID = 1L;
 	public static final String PROPERTIES = "provider-sge.properties";
-	
 	public static final String POLL_INTERVAL = "poll.interval";
 	public static final String QSUB = "qsub";
 	public static final String QSTAT = "qstat";
 	public static final String QDEL = "qdel";
+	public static final String QCONF = "qconf";
 	public static final String DEFAULT_PE = "parallel.environment"; 
-
+	public static final String SUBMISSION_DELAY = "submission.delay";
 	private static Properties properties;
 
+	/**
+	 * getProperties - return properties
+	 * @return Properties object representing SGE properties
+	 */
 	public static synchronized Properties getProperties() {
 		if (properties == null) {
 			properties = new Properties();
@@ -33,33 +36,88 @@
 		return properties;
 	}
 
-	protected void setDefaults() {
-		setPollInterval(10);
-		setSubmitCommand("qsub");
-		setPollCommand("qstat");
-		setRemoveCommand("qdel");
-		setDefaultPE("threaded");
+	/**
+	 * Get the config command: the value of the "qconf" property (default "qconf")
+	 * @return String with config command, as set by setConfigCommand
+	 */
+	public String getConfigCommand() {
+		return getProperty(QCONF);
 	}
+	/**
+	 * getDefaultPE - Get the default parallel environment
+	 * @return String containing pe
+	 */
+	public String getDefaultPE() {
+	    return getProperty(DEFAULT_PE);
+	}
 
+	/**
+	 * getPollCommandName - Get poll command name
+	 * @return String containing poll command
+	 */
 	public String getPollCommandName() {
 		return QSTAT;
 	}
 
-
+	/**
+	 * getRemoveCommandName - Get remove command
+	 * @return String of command on how to remove a job
+	 */
 	public String getRemoveCommandName() {
 		return QDEL;
 	}
 
-
+	/**
+	 * getSubmissionDelay - Get length to sleep before submitting a job
+	 * Value as a string representing milliseconds
+	 * @return Submission delay as String in milliseconds
+	 */
+	public String getSubmissionDelay() {
+		return getProperty(SUBMISSION_DELAY);
+	}
+	
+	/**
+	 * getSubmitCommandName - Get submit command
+	 * @return String of submit command
+	 */
 	public String getSubmitCommandName() {
 		return QSUB;
 	}
-	
+
+	/**
+	 * Set the default config command
+	 * Used for gathering information about the queues
+	 * @param config String with command name
+	 */
+	public void setConfigCommand(String config) {
+		setProperty(QCONF, config);
+	}
+	/**
+	 * setDefaultPE - set the default parallel environment
+	 * @param pe String representing pe
+	 */
 	public void setDefaultPE(String pe) {
 	    setProperty(DEFAULT_PE, pe);
 	}
-	
-	public String getDefaultPE() {
-	    return getProperty(DEFAULT_PE);
+
+	/**
+	 * setDefault - Reset all SGE options to default
+	 */
+	protected void setDefaults() {
+		setPollInterval(10);
+		setSubmitCommand("qsub");
+		setPollCommand("qstat -xml");
+		setRemoveCommand("qdel");
+		setDefaultPE("threaded");
+		setConfigCommand("qconf");
+		setSubmissionDelay("0");
 	}
+
+	/**
+	 * setSubmissionDelay - set the submission delay
+	 * @param delay String representing milliseconds to sleep 
+	 */
+	private void setSubmissionDelay(String delay) {
+		setProperty(SUBMISSION_DELAY, delay);
+	}
 }
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/SGEExecutor.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/SGEExecutor.java	(revision 3342)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/sge/SGEExecutor.java	(working copy)
@@ -4,15 +4,16 @@
 //This message may not be removed or altered.
 //----------------------------------------------------------------------
 
-/*
- * Created on Oct 11, 2005
- */
 package org.globus.cog.abstraction.impl.scheduler.sge;
 
 import java.io.BufferedReader;
 import java.io.CharArrayReader;
+import java.io.File;
 import java.io.IOException;
 import java.io.Writer;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.regex.Matcher;
@@ -24,21 +25,279 @@
 import org.globus.cog.abstraction.impl.scheduler.common.AbstractProperties;
 import org.globus.cog.abstraction.impl.scheduler.common.AbstractQueuePoller;
 import org.globus.cog.abstraction.impl.scheduler.common.Job;
+import org.globus.cog.abstraction.impl.scheduler.common.ProcessException;
 import org.globus.cog.abstraction.impl.scheduler.common.ProcessListener;
+import org.globus.cog.abstraction.impl.scheduler.pbs.PBSExecutor;
 import org.globus.cog.abstraction.interfaces.FileLocation;
 import org.globus.cog.abstraction.interfaces.JobSpecification;
 import org.globus.cog.abstraction.interfaces.Task;
+import org.globus.gsi.gssapi.auth.AuthorizationException;
+import org.ietf.jgss.GSSException;
 
+/**
+ * Java CoG interface for Sun/Oracle Grid Engine
+ */
 public class SGEExecutor extends AbstractExecutor {
-    public static final Logger logger = Logger.getLogger(SGEExecutor.class);
 
+    public static final Pattern JOB_ID_LINE = Pattern.compile(".*[Yy]our job (\\d+) \\(.*\\) has been submitted");
+	public static final Logger logger = Logger.getLogger(SGEExecutor.class);
+    private static QueuePoller poller;
+    private static final String[] QSUB_PARAMS = new String[] {};
+    private static int unique = 0; 
+    private static NumberFormat IDF = new DecimalFormat("000000");
+    
     public SGEExecutor(Task task, ProcessListener listener) {
         super(task, listener);
+        verifyQueueInformation();
     }
+    
+    /**
+     * Create a new Job
+     * @param jobid - String representing SGE job ID
+     * @param stdout
+     * @param stdOutputLocation
+     * @param stderr
+     * @param stdErrorLocation
+     * @param exitcode
+     * @param executor
+     */
+    protected Job createJob(String jobid, String stdout,
+            FileLocation stdOutputLocation, String stderr,
+            FileLocation stdErrorLocation, String exitcode,
+            AbstractExecutor executor) {
+        return new Job(jobid, stdout, stdOutputLocation, stderr,
+            stdErrorLocation, exitcode, executor);
+    }
 
+    /**
+     * Return additional submit parameters
+     * @return String representing qsub parameters
+     */
+    protected String[] getAdditionalSubmitParameters() {
+        return QSUB_PARAMS;
+    }
+
+    /**
+     * Get an attribute from a job specification
+     * If the attribute does not exist, return the default value
+     * @param spec JobSpecification to search
+     * @param name Attribute to search for
+     * @param defaultValue This value is return if the attribute is not found
+     * @return String representing an attribute (if found) or the default value
+     */
+    private String getAttribute(JobSpecification spec, String name,
+            String defaultValue) {
+        Object value = spec.getAttribute(name);
+        if (value == null) {
+            return defaultValue;
+        }
+        else {
+            return value.toString();
+        }
+    }
+
+    /** 
+     * getName - Return the name of this provider
+     * @return String representing provider name
+     */
+    protected String getName() {
+        return "SGE";
+    }
+    
+    /**
+     * getProperties - Return SGE properties
+     * @return Properties as an AbstractProperties object
+     */
+    protected AbstractProperties getProperties() {
+        return Properties.getProperties();
+    }
+
+    /**
+     * getQueuePoller - return the Queue Poller
+     * @return AbstractQueuePoller
+     */
+    protected AbstractQueuePoller getQueuePoller() {
+        synchronized (SGEExecutor.class) {
+            if (poller == null) {
+                poller = new QueuePoller(getProperties());
+                poller.start();
+            }
+            return poller;
+        }
+    }
+
+    /**
+     * getSGEProperties - Return SGE properties
+     * @return Properties as a Properties object
+     */
+    protected Properties getSGEProperties() {
+        return (Properties) getProperties();
+    }
+
+    /**
+     * Create SGE job name
+     * @param task
+     * @return String containing task name
+     */
+	private String makeName(Task task) {
+		String name = task.getName();
+		if (name == null) {
+			int i = 0;
+			synchronized(SGEExecutor.class) {
+				i = unique++;
+			}
+			name = "cog-" + IDF.format(i);
+		}
+		else if (name.length() > 15) {
+		    name = name.substring(0, 15);
+		}
+		if (logger.isDebugEnabled()) {
+		    logger.debug("SGE name: for: " + task.getName() + 
+                         " is: " + name);
+		}
+		return name;
+	}
+    
+    /**
+     * parseSubmitCommandOutput - Given qsub output, return job ID
+     * @param out - String that contains qsub output
+     * @return String containing job id
+     * @throws IOException
+     */
+    protected String parseSubmitCommandOutput(String out) throws IOException {
+        // > your job 2494189 ("t1.sub") has been submitted
+        BufferedReader br = new BufferedReader(new CharArrayReader(out.toCharArray()));
+        String line = br.readLine();
+        while (line != null) {
+            Matcher m = JOB_ID_LINE.matcher(line);
+            if (m.matches()) {
+                String id = m.group(1);
+                if (logger.isInfoEnabled()) {
+                    logger.info("Job id from qsub: " + id);
+                }
+                return id;
+            }
+            line = br.readLine();
+        }
+        throw new IOException("None of the qsub lines matches the required patten: " + JOB_ID_LINE);
+    }
+
+    /** Sleep for the submission delay (ms), then submit; an interrupt during the sleep is logged and re-asserted after submitting. */
+    public void start() throws AuthorizationException, IOException, ProcessException {
+        boolean interrupted = false;
+        try {
+            Thread.sleep(Integer.valueOf(getSGEProperties().getSubmissionDelay()));
+        } catch (InterruptedException e) {
+            logger.error("Interrupted during submission delay", e);
+            interrupted = true;
+        }
+        try { super.start(); }
+        finally { if (interrupted) Thread.currentThread().interrupt(); }
+    }
+    
+    /**
+     * Check the job specification against the queue data gathered by the
+     * QueuePoller: queue name, parallel environment, wall time and slots.
+     * Problems are logged rather than thrown, although parsing malformed
+     * attribute values (wall time, jobsPerNode) may still throw an
+     * unchecked exception.
+     */
+    private void verifyQueueInformation() {
+
+        // A queue must be defined in order to gather information about it
+        poller = (QueuePoller) getQueuePoller();
+        JobSpecification spec = getSpec();
+        String queue = (String) spec.getAttribute("queue");
+        if (queue == null) {
+            logger.error("Error: No queue defined");
+            return;
+        }
+
+        // Verify the queue is available
+        if (!poller.isValidQueue(queue)) {
+            StringBuilder error = new StringBuilder("Invalid queue \"" + queue + "\"\nAvailable queues are: ");
+            for (String s : poller.getAllQueues()) {
+                error.append(s).append(' ');
+            }
+            logger.error(error.toString());
+            return;
+        }
+        // Non-null: isValidQueue() just confirmed an entry exists for this queue
+        QueueInformation qi = poller.getQueueInformation(queue);
+
+        // Use the same PE that writeScript() puts in the "-pe" directive,
+        // i.e. fall back to the configured default when none is given
+        String pe = getAttribute(spec, "pe", getSGEProperties().getDefaultPE());
+        if (pe == null) {
+            logger.error("Error: No parallel environment specified");
+            return;
+        }
+
+        // Check that pe is available for the queue
+        if (!qi.getPe_list().contains(pe)) {
+            StringBuilder error = new StringBuilder("Parallel environment " + pe + " is not valid for " + queue + " queue\n");
+            error.append("Valid PEs are: ");
+            for (String s : qi.getPe_list()) {
+                error.append(s).append(' ');
+            }
+            logger.error(error.toString());
+            return;
+        }
+
+        // Check that requested walltime fits into time limits; a queue
+        // without an h_rt entry is treated like an INFINITY limit
+        String maxWalltimeAttribute = (String) spec.getAttribute("maxwalltime");
+        String queueWalltimeString = qi.getWalltime();
+        if (maxWalltimeAttribute != null && queueWalltimeString != null
+                && !queueWalltimeString.equalsIgnoreCase("INFINITY")) {
+            int requestedWalltimeSeconds = WallTime.timeToSeconds(maxWalltimeAttribute);
+            int queueWalltime = WallTime.timeToSeconds(queueWalltimeString);
+            if (requestedWalltimeSeconds > queueWalltime) {
+                logger.error("Requested wall time of " + requestedWalltimeSeconds
+                    + " seconds is greater than queue limit of " + queueWalltime);
+            }
+        }
+
+        // Give a warning if CPUs are being underutilized (this may be intentional due to memory restrictions)
+        Object jobsPerNodeAttribute = spec.getAttribute("jobsPerNode");
+        if (jobsPerNodeAttribute != null) {
+            String jobsPerNode = String.valueOf(jobsPerNodeAttribute);
+            if (Integer.parseInt(jobsPerNode) < qi.getSlots() && logger.isInfoEnabled()) {
+                logger.info("Requesting only " + jobsPerNode + "/" + qi.getSlots() + " CPUs per node");
+            }
+        }
+    }
+    
+
+    /**
+     * writeAttr - Write a specification attribute to a submit file
+     * If the attribute is not found, use null
+     * @param attrName Specification attribute to write
+     * @param arg The SGE argument (eg. -N for the job name)
+     * @param wr A Writer object representing the submit file
+     * @throws IOException
+     */
+    protected void writeAttr(String attrName, String arg, Writer wr)
+            throws IOException {
+    	
+    	writeAttr(attrName, arg, wr, null);
+    }
+
+
+
+    /**
+     * writeAttr - Write a specification attribute to a submit file
+     * 
+     * @param attrName Specification attribute to write
+     * @param arg The SGE argument (eg. -N for the job name)
+     * @param wr A Writer object representing the submit file
+     * @param defaultValue If the requested attribute is not found, use this default value
+     * @throws IOException
+     */
     protected void writeAttr(String attrName, String arg, Writer wr,
             String defaultValue) throws IOException {
-        Object value = getSpec().getAttribute(attrName);
+
+    	Object value = getSpec().getAttribute(attrName);
         if (value != null) {
             wr.write("#$ " + arg + String.valueOf(value) + '\n');
         }
@@ -47,42 +306,64 @@
         }
     }
 
-    protected void writeAttr(String attrName, String arg, Writer wr)
+    /**
+     * writeMultiJobPreamble - Add multiple jobs to a single submit file
+     * @param wr Writer A Writer object representing the submit file
+     * @param exitcodefile Filename where application exit code should be written
+     * @throws IOException
+     */
+    protected void writeMultiJobPreamble(Writer wr, String exitcodefile)
             throws IOException {
-        writeAttr(attrName, arg, wr, null);
+        wr.write("NODES=`cat $PE_HOSTFILE | awk '{ for(i=0;i<$2;i++){print $1} }'`\n");
+        wr.write("ECF=" + exitcodefile + "\n");
+        wr.write("INDEX=0\n");
+        wr.write("for NODE in $NODES; do\n");
+        wr.write("  echo \"N\" >$ECF.$INDEX\n");
+        wr.write("  ssh $NODE /bin/bash -c \\\" \"");
     }
 
-    protected void writeWallTime(Writer wr) throws IOException {
-        Object walltime = getSpec().getAttribute("maxwalltime");
-        if (walltime != null) {
-            wr.write("#$ -l h_rt="
-                    + WallTime.normalize(walltime.toString(), "sge-native")
-                    + '\n');
-        }
-    }
 
+    /**
+     * writeScript - Write the SGE submit script
+     * @param wr A Writer object representing the submit file
+     * @param exitcodefile Filename where exit code will be written
+     * @param stdout Filename where standard output will be written
+     * @param stderr Filename where standard error will be written
+     * @throws IOException
+     */
     protected void writeScript(Writer wr, String exitcodefile, String stdout,
             String stderr) throws IOException {
-        Task task = getTask();
+
+    	Task task = getTask();
         JobSpecification spec = getSpec();
+		
         String type = (String) spec.getAttribute("jobType");
         boolean multiple = false;
         if ("multiple".equals(type)) {
             multiple = true;
         }
-
+        
+        int count = Integer.valueOf(getAttribute(spec, "count", "1"));
+        String queue = (String)spec.getAttribute("queue");
+        
+        int coresPerNode = Integer.valueOf(getAttribute(spec, "coresPerNode", 
+        		String.valueOf(poller.getQueueInformation(queue).getSlots())));
+        int jobsPerNode = Integer.valueOf(getAttribute(spec, "jobsPerNode", 
+        		String.valueOf(coresPerNode)));
+        int coresToRequest = ( count * jobsPerNode + coresPerNode - 1) / coresPerNode * coresPerNode;
+   
         wr.write("#!/bin/bash\n");
-        wr.write("#$ -N " + task.getName() + '\n');
-        // ranger requires this. might as well be default
+        wr.write("#$ -N " + makeName(task) + '\n');
         wr.write("#$ -V\n");
         writeAttr("project", "-A ", wr);
-
-        writeAttr("count", "-pe "
-                + getAttribute(spec, "pe", getSGEProperties().getDefaultPE())
-                + " ", wr, "1");
-
+    	writeAttr("queue", "-q ", wr);
+                
+        String peValue = "-pe " +  getAttribute(spec, "pe", getSGEProperties().getDefaultPE()) + " ";
+        writeAttr("null", peValue, wr, String.valueOf(coresToRequest));
+        	
         writeWallTime(wr);
-        writeAttr("queue", "-q ", wr);
+        writeSoftWallTime(wr);
+        
         if (spec.getStdInput() != null) {
             wr.write("#$ -i " + quote(spec.getStdInput()) + '\n');
         }
@@ -91,9 +372,9 @@
 
         if (!spec.getEnvironmentVariableNames().isEmpty()) {
             wr.write("#$ -v ");
-            Iterator i = spec.getEnvironmentVariableNames().iterator();
+            Iterator<String> i = spec.getEnvironmentVariableNames().iterator();
             while (i.hasNext()) {
-                String name = (String) i.next();
+                String name = i.next();
                 wr.write(name);
                 wr.write('=');
                 wr.write(quote(spec.getEnvironmentVariable(name)));
@@ -107,6 +388,7 @@
         if (logger.isDebugEnabled()) {
             logger.debug("Job type: " + type);
         }
+        
         if (type != null) {
             String wrapper = Properties.getProperties().getProperty(
                 "wrapper." + type);
@@ -122,6 +404,21 @@
                 logger.debug("Wrapper after variable substitution: " + wrapper);
             }
         }
+
+        wr.write("\nwrite_exitcode()\n");
+        wr.write("{\n");
+        wr.write("echo $1 > " + exitcodefile + "\n");
+        wr.write("exit 0\n"); 
+        wr.write("}\n\n");
+
+        wr.write("# Trap all signals\n");
+        wr.write("SIGNAL=1\n");
+        wr.write("while [ $SIGNAL -le 30 ];\n");
+        wr.write("do\n");
+        wr.write("   trap \"echo Received signal $SIGNAL; write_exitcode $SIGNAL\" $SIGNAL\n");
+        wr.write("   (( SIGNAL+=1 ))\n");
+        wr.write("done\n\n");
+
         if (spec.getDirectory() != null) {
             wr.write("cd " + quote(spec.getDirectory()) + " && ");
         }
@@ -129,11 +426,12 @@
         if (multiple) {
             writeMultiJobPreamble(wr, exitcodefile);
         }
+        
         wr.write(quote(spec.getExecutable()));
-        List args = spec.getArgumentsAsList();
+        List<String> args = spec.getArgumentsAsList();
         if (args != null && args.size() > 0) {
             wr.write(' ');
-            Iterator i = args.iterator();
+            Iterator<String> i = args.iterator();
             while (i.hasNext()) {
                 wr.write(quote((String) i.next()));
                 if (i.hasNext()) {
@@ -145,92 +443,41 @@
         if (spec.getStdInput() != null) {
             wr.write(" < " + quote(spec.getStdInput()));
         }
+
         if (multiple) {
             writeMultiJobPostamble(wr);
+        } else {
+            wr.write(" &\n");
+            wr.write("wait $!\n");
+            wr.write("write_exitcode $?\n");
         }
-        else {
-            wr.write('\n');
-            wr.write("/bin/echo $? >" + exitcodefile + '\n');
-        }
         wr.close();
     }
-    
-    protected void writeMultiJobPreamble(Writer wr, String exitcodefile)
-            throws IOException {
-        wr.write("NODES=`cat $PE_HOSTFILE | awk '{ for(i=0;i<$2;i++){print $1} }'`\n");
-        wr.write("ECF=" + exitcodefile + "\n");
-        wr.write("INDEX=0\n");
-        wr.write("for NODE in $NODES; do\n");
-        wr.write("  echo \"N\" >$ECF.$INDEX\n");
-        wr.write("  ssh $NODE /bin/bash -c \\\" \"");
-    }
-
-    private String getAttribute(JobSpecification spec, String name,
-            String defaultValue) {
-        Object value = spec.getAttribute(name);
-        if (value == null) {
-            return defaultValue;
+ 
+    /**
+     * writeWallTime - Convert time into correct format and write to submit file
+     * Use maxtime first is available, otherwise use maxwalltime
+     * @param wr Writer A Writer object representing the submit file
+     * @throws IOException
+     */
+    protected void writeWallTime(Writer wr) throws IOException {
+    	Object walltime = getSpec().getAttribute("maxwalltime");
+        if (walltime != null) {
+            wr.write("#$ -l h_rt="
+                    + WallTime.normalize(walltime.toString(), "sge-native")
+                    + '\n');
         }
-        else {
-            return value.toString();
-        }
     }
-
-    protected String getName() {
-        return "SGE";
-    }
-
-    protected AbstractProperties getProperties() {
-        return Properties.getProperties();
-    }
     
-    protected Properties getSGEProperties() {
-        return (Properties) getProperties();
+    protected void writeSoftWallTime(Writer wr) throws IOException {
+    	String walltime = (String)getSpec().getAttribute("maxwalltime");
+        if (walltime != null) {
+        	int walltimeSeconds = WallTime.timeToSeconds(walltime) - 10;
+        	wr.write("#$ -l s_rt="
+        	        + WallTime.format("sge-native", walltimeSeconds)
+        	        + '\n');
+        }
     }
     
-    public static final Pattern JOB_ID_LINE = Pattern.compile(".*[Yy]our job (\\d+) \\(.*\\) has been submitted");
 
-    protected String parseSubmitCommandOutput(String out) throws IOException {
-        // > your job 2494189 ("t1.sub") has been submitted
-        BufferedReader br = new BufferedReader(new CharArrayReader(out.toCharArray()));
-        String line = br.readLine();
-        while (line != null) {
-            Matcher m = JOB_ID_LINE.matcher(line);
-            if (m.matches()) {
-                String id = m.group(1);
-                if (logger.isInfoEnabled()) {
-                    logger.info("Job id from qsub: " + id);
-                }
-                return id;
-            }
-            line = br.readLine();
-        }
-        throw new IOException("None of the qsub lines matches the required patten: " + JOB_ID_LINE);
-    }
-
-    private static final String[] QSUB_PARAMS = new String[] {};
-
-    protected String[] getAdditionalSubmitParameters() {
-        return QSUB_PARAMS;
-    }
-
-    protected Job createJob(String jobid, String stdout,
-            FileLocation stdOutputLocation, String stderr,
-            FileLocation stdErrorLocation, String exitcode,
-            AbstractExecutor executor) {
-        return new Job(jobid, stdout, stdOutputLocation, stderr,
-            stdErrorLocation, exitcode, executor);
-    }
-
-    private static QueuePoller poller;
-
-    protected AbstractQueuePoller getQueuePoller() {
-        synchronized (SGEExecutor.class) {
-            if (poller == null) {
-                poller = new QueuePoller(getProperties());
-                poller.start();
-            }
-            return poller;
-        }
-    }
 }



More information about the Swift-commit mailing list