[Swift-commit] cog r3535

swift at ci.uchicago.edu swift at ci.uchicago.edu
Tue Jan 1 22:45:06 CST 2013

r3535 | davidkelly999 | 2013-01-01 22:43:29 -0600 (Tue, 01 Jan 2013) | 2 lines

A real slurm provider that uses sbatch/srun, scancel, and squeue directly

Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/slurm/SlurmExecutor.java
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/slurm/SlurmExecutor.java	(revision 3534)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/slurm/SlurmExecutor.java	(working copy)
@@ -4,9 +4,7 @@
 import java.io.Writer;
 import java.text.DecimalFormat;
 import java.text.NumberFormat;
-import java.util.Collection;
 import java.util.Date;
-import java.util.Iterator;
 import org.apache.log4j.Logger;
 import org.globus.cog.abstraction.impl.common.execution.WallTime;
@@ -22,336 +20,189 @@
 public class SlurmExecutor extends AbstractExecutor {
 	public static final Logger logger = Logger.getLogger(SlurmExecutor.class);
-	/**
-	   Number of program invocations
-	 */
-	int count = 1;
-	/**
-	   PBS processes-per-node
-	 */
-	int ppn = 1;
-	/**
-     PBS mppdepth: number of available threads per node
-	 */
-	int depth = 1;
-	/**
-	   Unique number for automatic task names
-	*/
-	private static int unique = 0; 
+	// Number of program invocations
+	private int count = 1;
+	private static NumberFormat IDF = new DecimalFormat("000000");
+	// Used for task name generation
+	private static int unique = 0;
 	public SlurmExecutor(Task task, ProcessListener listener) {
 		super(task, listener);
-	private static NumberFormat IDF = new DecimalFormat("000000");
-	/** 
-	    The job name is limited to 15 characters: 
-		http://doesciencegrid.org/public/pbs/qsub.html
+	/**
+	 * Write attribute if non-null
+	 * @throws IOException
-	protected void validate(Task task) {
-		String name = task.getName();
-		if (name == null) {
-		    int i = 0;
-          synchronized(SlurmExecutor.class) {
-              i = unique++;
-          }
-          name = "cog-" + IDF.format(i);
-          if (logger.isDebugEnabled()) {
-              logger.debug("Slurm name: for: " + task.getIdentity() + 
-                       " is: " + name);
-          }
-		}
-		else if (name.length() > 15) {
-		    task.setName(name.substring(0, 15));
-		}
-	}
-	/** 
-     Write attribute if non-null
-     @throws IOException
-	 */
 	protected void writeAttr(String attrName, String arg, Writer wr)
-	throws IOException {
+			throws IOException {
 		Object value = getSpec().getAttribute(attrName);
 		if (value != null) {
-			wr.write("#PBS " + arg + String.valueOf(value) + '\n');
+			wr.write("#SBATCH " + arg + "=" + String.valueOf(value) + '\n');
-	/** 
-     Write attribute if non-null and non-empty
-     @throws IOException
+	/**
+	 * Write attribute if non-null and non-empty 
+	 * @throws IOException
-	protected void writeNonEmptyAttr(String attrName, String arg, 
-	                                 Writer wr)
-	throws IOException {
+	protected void writeNonEmptyAttr(String attrName, String arg, Writer wr)
+			throws IOException {
 		Object value = getSpec().getAttribute(attrName);
 		if (value != null) {
 			String v = String.valueOf(value);
-			if (v.length() > 0 )
-				wr.write("#PBS " + arg + v + '\n');
+			if (v.length() > 0)
+				wr.write("#SBATCH " + arg + "=" + v + '\n');
+	/**
+	 * Write walltime in hh:mm:ss
+	 * @param wr
+	 * @throws IOException
+	 */
 	protected void writeWallTime(Writer wr) throws IOException {
 		Object walltime = getSpec().getAttribute("maxwalltime");
 		if (walltime != null) {
-			wr.write("#PBS -l walltime="
+			wr.write("#SBATCH --time="
 					+ WallTime.normalize(walltime.toString(), "pbs-native")
 					+ '\n');
-	private int parseAndValidateInt(Object obj, String name) {
-	    try {
-	        assert(obj != null);
-	        return Integer.parseInt(obj.toString());
-	    }
-	    catch (NumberFormatException e) {
-	        throw new IllegalArgumentException("Illegal value for " + name + ". Must be an integer.");
-	    }
+	/**
+	 * Ensure tasks have a valid name
+	 */
+	protected void validate(Task task) {
+		String name = task.getName();
+		if (name == null) {
+			int i = 0;
+			synchronized (SlurmExecutor.class) {
+				i = unique++;
+			}
+			name = "cog-" + IDF.format(i);
+			task.setName(name);
+		} else if (name.length() > 15) {
+			task.setName(name.substring(0, 15));
+		}
+		if (logger.isDebugEnabled()) {
+			logger.debug("Slurm name: for: " + task.getIdentity() + " is: " + name);
+		}
-	   Obtains profile settings regarding job size from
-	   JobSpecification and writes them into the PBS file.
-	   Looks for profiles count, ppn, ppts, and pbs.mpp
-	   count: mandatory, default 1 (number of processes)
-	   depth: default 1 (number of threads per node)
-	   ppn: optional, default 1 (processes per node)
-	   pbs.mpp: output mppwidth/mppnppn instead of nodes/ppn
-	   pbs.properties: extra PBS properties
-	   pbs.resource_list: extra PBS -l line
-	   Note that the semantics are different for the pbs.mpp setting:
-	   mppwidth is the total number of cores while nodes is the number
-	   of nodes.
-	   http://www.clusterresources.com/torquedocs/2.1jobsubmission.shtml
-	   @return true if this is a multi-core job
+	 * Verify that an object contains a valid int
+	 * @param obj
+	 * @param name
+	 * @return
-	protected boolean writeCountAndPPN(JobSpecification spec,
-	                                   Writer wr)
-	throws IOException {
-	    boolean result = false;
-	    Object o;
-	    // Number of program invocations
-	    o = getSpec().getAttribute("count");
-	    if (o != null)
-	        count = parseAndValidateInt(o, "count");
-	    if (count != 1)
-	        result = true;
-	    o = spec.getAttribute("ppn");
-	    if (o != null)
-	        ppn = parseAndValidateInt(o, "ppn");
-          o = spec.getAttribute("depth");
-          if (o != null)
-              depth = parseAndValidateInt(o, "depth");
-	    String slurmProperties =
-	        (String) getSpec().getAttribute("slurm.properties");
-	    boolean mpp = false;
-      if (spec.getAttribute("pbs.mpp") != null)
-          mpp = true;
-	    StringBuilder sb = new StringBuilder(512);
-	    sb.append("#PBS -l ");
-	    if (mpp) {
-	        sb.append("mppwidth=").append(count);
-	        sb.append(",");
-	        sb.append("mppnppn=").append(ppn);
-	        sb.append(",");
-	        sb.append("mppdepth=").append(depth);
-	    }
-	    else {
-	        sb.append("nodes=");
-	        sb.append(count);
-	        sb.append(":");
-	        sb.append("ppn=");
-	        sb.append(ppn);
-	    }
-	    if (slurmProperties != null &&
-	        slurmProperties.length() > 0 ) {
-	        sb.append(":");
-	        sb.append(slurmProperties);
-	    }
-	    sb.append('\n');
-	    wr.write(sb.toString());
-	    return result;
+	private int parseAndValidateInt(Object obj, String name) {
+		try {
+			assert (obj != null);
+			return Integer.parseInt(obj.toString());
+		} 
+		catch (NumberFormatException e) {
+			throw new IllegalArgumentException("Illegal value for " + name + ". Must be an integer.");
+		}
-	/*
-	private boolean parseAndValidateBool(Object obj, String name)
-	{
-	    try {
-	        return Boolean.parseBoolean(obj.toString());
-	    }
-	    catch (NumberFormatException e) {
-	        throw new IllegalArgumentException
-	        ("Illegal value for " + name + ". Must be true/false.");
-	    }
-	}
-	*/
-	protected void writeScript(Writer wr, String exitcodefile, String stdout,
-	                           String stderr) 
-	throws IOException {
+	protected void writeScript(Writer wr, String exitcodefile, String stdout, String stderr) 
+			throws IOException {
 		Task task = getTask();
 		JobSpecification spec = getSpec();
 		Properties properties = Properties.getProperties();
+		validate(task);
+		writeHeader(wr);
-      getSpec().unpackProviderAttributes();
-      validate(task);
-      writeHeader(wr);
-		wr.write("#PBS -S /bin/bash\n");
-		wr.write("#PBS -N " + task.getName() + '\n');
-		wr.write("#PBS -m n\n");
-		writeNonEmptyAttr("project", "-A ", wr);
-		boolean multiple = writeCountAndPPN(spec, wr);
+		Object countValue = getSpec().getAttribute("count");
+		if (countValue != null)
+			count = parseAndValidateInt(countValue, "count");
+		wr.write("#SBATCH --job-name=" + task.getName() + '\n');
+		wr.write("#SBATCH --output=" + quote(stdout) + '\n');
+		wr.write("#SBATCH --error=" + quote(stderr) + '\n');
+		wr.write("#SBATCH --nodes=" + count + '\n');
+		wr.write("#SBATCH --exclusive\n");
+		wr.write("#SBATCH --ntasks-per-node=1\n");
+		writeNonEmptyAttr("ppn", "--cpus-per-task", wr);
+		writeNonEmptyAttr("project", "--account", wr);
+		writeNonEmptyAttr("queue", "--partition", wr);
-		writeNonEmptyAttr("queue", "-q ", wr);
-		wr.write("#PBS -o " + quote(stdout) + '\n');
-		wr.write("#PBS -e " + quote(stderr) + '\n');
+		wr.write("\n");
 		for (String name : spec.getEnvironmentVariableNames()) {
-			// "export" is necessary on the Cray XT5 Crow
-			wr.write("export ");
-			wr.write(name);
-			wr.write('=');
-			wr.write(quote(spec.getEnvironmentVariable(name)));
-			wr.write('\n');
+			wr.write("export " + name + '=' + quote(spec.getEnvironmentVariable(name)) + '\n');
-		if (spec.getEnvironmentVariableNames().size() > 0) {
-		    wr.write("#PBS -v " + makeList(spec.getEnvironmentVariableNames()) + '\n');
+		// Handle all slurm attributes specified by the user
+		for (String a : spec.getAttributeNames()) {
+			if (a != null && a.startsWith("slurm.")) {
+				String attributeName[] = a.split("slurm.");
+				wr.write("#SBATCH --" + attributeName[1] + " = " + spec.getAttribute(a) + '\n');
+			}
-		String resources =
-		    (String) spec.getAttribute("slurm.resource_list");
-		if (resources != null && resources.length() > 0) {
-		    if (logger.isDebugEnabled())
-		        logger.debug("slurm.resource_list: " + resources);
-		    wr.write("#PBS -l " + resources + '\n');
-		}
-		// aprun option specifically for Cray Beagle, Franklin
-		boolean aprun = false;
-		if (spec.getAttribute("pbs.aprun") != null)
-		    aprun = true;
 		String type = (String) spec.getAttribute("jobType");
 		if (logger.isDebugEnabled())
 			logger.debug("Job type: " + type);
-		if ("multiple".equals(type)) 
-		    multiple = true;
-		else if("single".equals(type))
-		    multiple = false;
-		if (aprun) 
-			multiple = false;
-		if (multiple)
-          writeMultiJobPreamble(wr, exitcodefile);
 		if (type != null) {
-			String wrapper =
-			    properties.getProperty("wrapper." + type);
+			String wrapper = properties.getProperty("wrapper." + type);
 			if (logger.isDebugEnabled()) {
 				logger.debug("Wrapper: " + wrapper);
 			if (wrapper != null) {
 				wrapper = replaceVars(wrapper);
 				wr.write(' ');
-			if (logger.isDebugEnabled()) {
+			if (logger.isDebugEnabled()) { 
 				logger.debug("Wrapper after variable substitution: " + wrapper);
+		wr.write("srun /bin/bash -c \'");
 		if (spec.getDirectory() != null) {
 			wr.write("cd " + quote(spec.getDirectory()) + " && ");
-		if (aprun)
-		    wr.write("aprun -n " + count + " -N 1 -cc none -d " +
-		             depth + " -F exclusive /bin/sh -c '");
 		writeQuotedList(wr, spec.getArgumentsAsList());
-		if (aprun)
-          wr.write("'");
 		if (spec.getStdInput() != null) {
-          wr.write(" < " + quote(spec.getStdInput()));
-      }
-		if (multiple) {
-		    writeMultiJobPostamble(wr);
+			wr.write(" < " + quote(spec.getStdInput()));
-		else {
-		    wr.write('\n');
-		    wr.write("/bin/echo $? >" + exitcodefile + '\n');
+		if("multiple".equals(type)) {
+			wr.write("; /bin/echo $? >" + exitcodefile + ".$SLURM_PROCID\'\n");
+		} else {
+			wr.write("; /bin/echo $? >" + exitcodefile + "\'\n");
-	void writeHeader(Writer writer)	
-	throws IOException {
+	void writeHeader(Writer writer) throws IOException {
+		writer.write("#!/bin/bash\n\n");
 		writer.write("#CoG This script generated by CoG\n");
 		writer.write("#CoG   by class: " + SlurmExecutor.class + '\n');
 		writer.write("#CoG   on date: " + new Date() + "\n\n");
-	private String makeList(Collection<String> names) {
-      StringBuilder sb = new StringBuilder();
-      Iterator<String> i = names.iterator();
-      while (i.hasNext()) {
-      	sb.append(i.next());
-      	if (i.hasNext()) {
-      		sb.append(", ");
-      	}
-      }
-      return sb.toString();
-  }
-	protected void writeMultiJobPreamble(Writer wr, String exitcodefile)
-          throws IOException {
-      wr.write("NODES=`cat $PBS_NODEFILE`\n");
-      wr.write("ECF=" + exitcodefile + "\n");
-      wr.write("INDEX=0\n");
-      wr.write("for NODE in $NODES; do\n");
-      wr.write("  echo \"N\" >$ECF.$INDEX\n");
-      wr.write("  ssh $NODE /bin/bash -c \\\" \"");
-  }
-protected String getName() {
-		return "PBS";
+	protected String getName() {
+		return "Slurm";
-protected AbstractProperties getProperties() {
+	protected AbstractProperties getProperties() {
 		return Properties.getProperties();
-protected Job createJob(String jobid, String stdout,
+	protected Job createJob(String jobid, String stdout,
 			FileLocation stdOutputLocation, String stderr,
 			FileLocation stdErrorLocation, String exitcode,
 			AbstractExecutor executor) {
@@ -362,8 +213,8 @@
 	private static QueuePoller poller;
-protected AbstractQueuePoller getQueuePoller() {
-		synchronized(SlurmExecutor.class) {
+	protected AbstractQueuePoller getQueuePoller() {
+		synchronized (SlurmExecutor.class) {
 			if (poller == null) {
 				poller = new QueuePoller(getProperties());
@@ -371,4 +222,14 @@
 			return poller;
+    protected String parseSubmitCommandOutput(String out) throws IOException {
+        if ("".equals(out)) {
+            throw new IOException(getProperties().getSubmitCommandName()
+                    + " returned an empty job ID");
+        }
+        String outArray[] = out.split(" ");
+        return outArray[outArray.length-1].trim();
+    }
\ No newline at end of file
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/slurm/QueuePoller.java
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/slurm/QueuePoller.java	(revision 3534)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/slurm/QueuePoller.java	(working copy)
@@ -8,6 +8,7 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import org.apache.log4j.Logger;
@@ -19,142 +20,96 @@
 	public static final Logger logger = Logger.getLogger(QueuePoller.class);
 	public static final int FULL_LIST_THRESHOLD = 16;
-	private Set processed;
+	private Set<String> processed;
 	public QueuePoller(AbstractProperties properties) {
-		super("PBS provider queue poller", properties);
-		processed = new HashSet();
+		super("Slurm provider queue poller", properties);
+		processed = new HashSet<String>();
 	private static String[] CMDARRAY;
 	protected synchronized String[] getCMDArray() {
-	    if (getJobs().size() <= FULL_LIST_THRESHOLD) {
-	        String[] cmda = new String[2 + getJobs().size()];
-	        cmda[0] = getProperties().getPollCommand();
-	        cmda[1] = "-f";
-	        int i = 2;
-	        for (Job j : getJobs().values()) {
-	            cmda[i++] = j.getJobID();
-	        }
-	        return cmda;
-	    }
-	    else {
-	        if (CMDARRAY == null) {
-	            CMDARRAY = new String[] { getProperties().getPollCommand(), "-f" };
-	        }
-	    }
+		if(CMDARRAY == null) {
+			if (getJobs().size() <= FULL_LIST_THRESHOLD) {
+				CMDARRAY = new String[4];
+				CMDARRAY[0] = getProperties().getPollCommand();
+				CMDARRAY[1] = "--noheader";
+				CMDARRAY[2] = "--jobs";
+				boolean first=true;
+				for (Job j : getJobs().values()) {
+					if(first) {
+						CMDARRAY[3] = j.getJobID();
+						first=false;
+					} else { 
+						CMDARRAY[3] += "," + j.getJobID();
+					}
+				}
+			} else {
+				CMDARRAY = new String[] { getProperties().getPollCommand(), "--noheader" };
+			}			
+		}
 		return CMDARRAY;
-    protected int getError(int ec, String stderr) {
-	    if (ec != 0) {
-    	    BufferedReader sr = new BufferedReader(new StringReader(stderr));
-    	    try {
-        	    String line = sr.readLine();
-        	    while (line != null) {
-        	        if (!line.contains("Unknown Job Id")) {
-        	            return ec;
-        	        }
-        	        line = sr.readLine();
-        	    }
-    	    }
-    	    catch (IOException e) {
-    	        // should not occur while reading from a string reader
-    	        e.printStackTrace();
-    	    }
-    	    return 0;
-	    }
-	    else {
-	        return ec;
-	    }
-    }
+	protected int getError(int ec, String stderr) {
+		if (ec != 0) {
+			BufferedReader sr = new BufferedReader(new StringReader(stderr));
+			try {
+				String line = sr.readLine();
+				while (line != null) {
+					if (!line.contains("Unknown Job Id")) {
+						return ec;
+					}
+					line = sr.readLine();
+				}
+			} catch (IOException e) {
+				// should not occur while reading from a string reader
+				e.printStackTrace();
+			}
+			return 0;
+		} else {
+			return ec;
+		}
+	}
-    protected void processStdout(InputStream is) throws IOException {
+	protected void processStdout(InputStream is) throws IOException {
 		BufferedReader br = new BufferedReader(new InputStreamReader(is));
-		processed.clear();
 		String line;
-		String currentJobID = null;
-		Job currentJob = null;
+		processed.clear();
 		do {
 			line = br.readLine();
-			if (line != null) {
-				try {
-					line = line.trim();
-					if (line.startsWith("Job Id: ")) {
-						currentJobID = line.substring("Job Id: ".length());
-						processed.add(currentJobID);
-						currentJob = getJob(currentJobID);
-						continue;
-					}
-					if (currentJob != null) {
-						if (line.startsWith("job_state = ")) {
-						    if (logger.isDebugEnabled()) {
-						        logger.debug("Status line: " + line);
-						    }
-							switch (line.substring("job_state = ".length())
-									.charAt(0)) {
-								case 'Q': {
-									if (logger.isDebugEnabled()) {
-										logger.debug("Status for "
-												+ currentJobID + " is Q");
-									}
-									currentJob.setState(Job.STATE_QUEUED);
-									break;
-								}
-								case 'R': {
-									if (logger.isDebugEnabled()) {
-										logger.debug("Status for "
-												+ currentJobID + " is R");
-									}
-									currentJob.setState(Job.STATE_RUNNING);
-									break;
-								}
-								case 'C': {
-									// for sites where keep_completed is there,
-									// don't wait
-									// for the job to be removed from the queue
-									if (logger.isDebugEnabled()) {
-										logger.debug("Status for "
-												+ currentJobID + " is C");
-									}
-									addDoneJob(currentJob.getJobID());
-									break;
-								}
-							}
-						}
-						else if (line.startsWith("exit_status = ")) {
-							try {
-								int ec = Integer.parseInt(line.substring(
-										"exit_status = ".length()).trim());
-								currentJob.setExitcode(ec);
-							}
-							catch (Exception e) {
-								if (logger.isDebugEnabled()) {
-									logger.debug("Could not parse exit_status",
-											e);
-								}
-							}
-						}
-					}
+			if(line != null) {
+				String words[] = line.split("\\s+");
+				String jobid = words[0].trim();
+				String state = words[4].trim();
+				if (jobid == null || jobid.equals("") || state == null || state.equals("")) {
+					throw new IOException("Failed to parse squeue line: " + line);
-				catch (Exception e) {
-					logger.warn("Exception caught while handling "
-							+ getProperties().getPollCommandName()
-							+ " output: " + line, e);
+				Job job = getJob(jobid);
+				if (job == null){ continue; }
+				processed.add(jobid);
+				if (state.equals("PD")) {
+					job.setState(Job.STATE_QUEUED);
+				else if(state.equals("R")) {
+					job.setState(Job.STATE_RUNNING);
+				}
 		} while (line != null);
-		Iterator i = getJobs().entrySet().iterator();
+		Iterator<Entry<String, Job>> i = getJobs().entrySet().iterator();
 		while (i.hasNext()) {
-			Map.Entry e = (Map.Entry) i.next();
-			String id = (String) e.getKey();
+			Map.Entry<String, Job> e = i.next();
+			String id = e.getKey();
 			if (!processed.contains(id)) {
-				Job job = (Job) e.getValue();
-				if (logger.isDebugEnabled()) {
-					logger.debug("Status for " + id + " is Done");
-				}
+				Job job = e.getValue();
 				if (job.getState() == Job.STATE_DONE) {
@@ -162,7 +117,7 @@
 	protected void processStderr(InputStream is) throws IOException {
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/slurm/Properties.java
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/slurm/Properties.java	(revision 3534)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/slurm/Properties.java	(working copy)
@@ -8,10 +8,9 @@
 	public static final String PROPERTIES = "provider-slurm.properties";
 	public static final String POLL_INTERVAL = "poll.interval";
-	public static final String QSUB = "qsub";
-	public static final String QSTAT = "qstat";
-	public static final String QDEL = "qdel";
-	public static final String USE_MPPWIDTH = "use.mppwidth";
+	public static final String SBATCH = "sbatch";
+	public static final String SQUEUE = "squeue";
+	public static final String SCANCEL = "scancel";
 	private static Properties properties;
@@ -25,23 +24,23 @@
 	protected void setDefaults() {
-		setSubmitCommand("qsub");
-		setPollCommand("qstat");
-		setRemoveCommand("qdel");
+		setSubmitCommand("sbatch");
+		setPollCommand("squeue");
+		setRemoveCommand("scancel");
 	public String getPollCommandName() {
-		return QSTAT;
+		return SQUEUE;
 	public String getRemoveCommandName() {
-		return QDEL;
+		return SCANCEL;
 	public String getSubmitCommandName() {
-		return QSUB;
+		return SBATCH;

More information about the Swift-commit mailing list