[Swift-commit] cog r3535
swift at ci.uchicago.edu
swift at ci.uchicago.edu
Tue Jan 1 22:45:06 CST 2013
------------------------------------------------------------------------
r3535 | davidkelly999 | 2013-01-01 22:43:29 -0600 (Tue, 01 Jan 2013) | 2 lines
A real slurm provider that uses sbatch/srun, scancel, and squeue directly
------------------------------------------------------------------------
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/slurm/SlurmExecutor.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/slurm/SlurmExecutor.java (revision 3534)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/slurm/SlurmExecutor.java (working copy)
@@ -4,9 +4,7 @@
import java.io.Writer;
import java.text.DecimalFormat;
import java.text.NumberFormat;
-import java.util.Collection;
import java.util.Date;
-import java.util.Iterator;
import org.apache.log4j.Logger;
import org.globus.cog.abstraction.impl.common.execution.WallTime;
@@ -22,336 +20,189 @@
public class SlurmExecutor extends AbstractExecutor {
public static final Logger logger = Logger.getLogger(SlurmExecutor.class);
- /**
- Number of program invocations
- */
- int count = 1;
-
- /**
- PBS processes-per-node
- */
- int ppn = 1;
-
- /**
- PBS mppdepth: number of available threads per node
- */
- int depth = 1;
-
- /**
- Unique number for automatic task names
- */
- private static int unique = 0;
+ // Number of program invocations
+ private int count = 1;
+ private static NumberFormat IDF = new DecimalFormat("000000");
+ // Used for task name generation
+ private static int unique = 0;
+
public SlurmExecutor(Task task, ProcessListener listener) {
super(task, listener);
}
- private static NumberFormat IDF = new DecimalFormat("000000");
-
- /**
- The job name is limited to 15 characters:
- http://doesciencegrid.org/public/pbs/qsub.html
+ /**
+ * Write attribute if non-null
+ * @throws IOException
*/
- protected void validate(Task task) {
- String name = task.getName();
- if (name == null) {
- int i = 0;
- synchronized(SlurmExecutor.class) {
- i = unique++;
- }
- name = "cog-" + IDF.format(i);
- if (logger.isDebugEnabled()) {
- logger.debug("Slurm name: for: " + task.getIdentity() +
- " is: " + name);
- }
- }
- else if (name.length() > 15) {
- task.setName(name.substring(0, 15));
- }
- }
-
- /**
- Write attribute if non-null
- @throws IOException
- */
protected void writeAttr(String attrName, String arg, Writer wr)
- throws IOException {
+ throws IOException {
Object value = getSpec().getAttribute(attrName);
if (value != null) {
- wr.write("#PBS " + arg + String.valueOf(value) + '\n');
+ wr.write("#SBATCH " + arg + "=" + String.valueOf(value) + '\n');
}
}
- /**
- Write attribute if non-null and non-empty
- @throws IOException
+ /**
+ * Write attribute if non-null and non-empty
+ * @throws IOException
*/
- protected void writeNonEmptyAttr(String attrName, String arg,
- Writer wr)
- throws IOException {
+ protected void writeNonEmptyAttr(String attrName, String arg, Writer wr)
+ throws IOException {
Object value = getSpec().getAttribute(attrName);
if (value != null) {
String v = String.valueOf(value);
- if (v.length() > 0 )
- wr.write("#PBS " + arg + v + '\n');
+ if (v.length() > 0)
+ wr.write("#SBATCH " + arg + "=" + v + '\n');
}
}
-
+
+ /**
+ * Write walltime in hh:mm:ss
+ * @param wr
+ * @throws IOException
+ */
protected void writeWallTime(Writer wr) throws IOException {
Object walltime = getSpec().getAttribute("maxwalltime");
if (walltime != null) {
- wr.write("#PBS -l walltime="
+ wr.write("#SBATCH --time="
+ WallTime.normalize(walltime.toString(), "pbs-native")
+ '\n');
}
}
- private int parseAndValidateInt(Object obj, String name) {
- try {
- assert(obj != null);
- return Integer.parseInt(obj.toString());
- }
- catch (NumberFormatException e) {
- throw new IllegalArgumentException("Illegal value for " + name + ". Must be an integer.");
- }
+ /**
+ * Ensure tasks have a valid name
+ */
+ protected void validate(Task task) {
+ String name = task.getName();
+ if (name == null) {
+ int i = 0;
+ synchronized (SlurmExecutor.class) {
+ i = unique++;
+ }
+ name = "cog-" + IDF.format(i);
+ task.setName(name);
+ } else if (name.length() > 15) {
+ task.setName(name.substring(0, 15));
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Slurm name: for: " + task.getIdentity() + " is: " + name);
+ }
}
-
+
/**
- Obtains profile settings regarding job size from
- JobSpecification and writes them into the PBS file.
- Looks for profiles count, ppn, ppts, and pbs.mpp
- count: mandatory, default 1 (number of processes)
- depth: default 1 (number of threads per node)
- ppn: optional, default 1 (processes per node)
- pbs.mpp: output mppwidth/mppnppn instead of nodes/ppn
- pbs.properties: extra PBS properties
- pbs.resource_list: extra PBS -l line
-
- Note that the semantics are different for the pbs.mpp setting:
- mppwidth is the total number of cores while nodes is the number
- of nodes.
-
- http://www.clusterresources.com/torquedocs/2.1jobsubmission.shtml
- @return true if this is a multi-core job
+ * Verify that an object contains a valid int
+ * @param obj
+ * @param name
+ * @return
*/
- protected boolean writeCountAndPPN(JobSpecification spec,
- Writer wr)
- throws IOException {
- boolean result = false;
-
- Object o;
-
- // Number of program invocations
- o = getSpec().getAttribute("count");
- if (o != null)
- count = parseAndValidateInt(o, "count");
- if (count != 1)
- result = true;
-
- o = spec.getAttribute("ppn");
- if (o != null)
- ppn = parseAndValidateInt(o, "ppn");
-
- o = spec.getAttribute("depth");
- if (o != null)
- depth = parseAndValidateInt(o, "depth");
-
- String slurmProperties =
- (String) getSpec().getAttribute("slurm.properties");
-
- boolean mpp = false;
- if (spec.getAttribute("pbs.mpp") != null)
- mpp = true;
-
- StringBuilder sb = new StringBuilder(512);
- sb.append("#PBS -l ");
- if (mpp) {
- sb.append("mppwidth=").append(count);
- sb.append(",");
- sb.append("mppnppn=").append(ppn);
- sb.append(",");
- sb.append("mppdepth=").append(depth);
- }
- else {
- sb.append("nodes=");
- sb.append(count);
- sb.append(":");
- sb.append("ppn=");
- sb.append(ppn);
- }
-
- if (slurmProperties != null &&
- slurmProperties.length() > 0 ) {
- sb.append(":");
- sb.append(slurmProperties);
- }
-
- sb.append('\n');
-
- wr.write(sb.toString());
-
- return result;
+ private int parseAndValidateInt(Object obj, String name) {
+ try {
+ assert (obj != null);
+ return Integer.parseInt(obj.toString());
+ }
+ catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Illegal value for " + name + ". Must be an integer.");
+ }
}
- /*
- private boolean parseAndValidateBool(Object obj, String name)
- {
- try {
- return Boolean.parseBoolean(obj.toString());
- }
- catch (NumberFormatException e) {
- throw new IllegalArgumentException
- ("Illegal value for " + name + ". Must be true/false.");
- }
- }
- */
-
@Override
- protected void writeScript(Writer wr, String exitcodefile, String stdout,
- String stderr)
- throws IOException {
+ protected void writeScript(Writer wr, String exitcodefile, String stdout, String stderr)
+ throws IOException {
Task task = getTask();
JobSpecification spec = getSpec();
Properties properties = Properties.getProperties();
+ validate(task);
+ writeHeader(wr);
- getSpec().unpackProviderAttributes();
-
- validate(task);
- writeHeader(wr);
-
- wr.write("#PBS -S /bin/bash\n");
- wr.write("#PBS -N " + task.getName() + '\n');
- wr.write("#PBS -m n\n");
- writeNonEmptyAttr("project", "-A ", wr);
- boolean multiple = writeCountAndPPN(spec, wr);
+ Object countValue = getSpec().getAttribute("count");
+ if (countValue != null)
+ count = parseAndValidateInt(countValue, "count");
+ wr.write("#SBATCH --job-name=" + task.getName() + '\n');
+ wr.write("#SBATCH --output=" + quote(stdout) + '\n');
+ wr.write("#SBATCH --error=" + quote(stderr) + '\n');
+ wr.write("#SBATCH --nodes=" + count + '\n');
+ wr.write("#SBATCH --exclusive\n");
+ wr.write("#SBATCH --ntasks-per-node=1\n");
+ writeNonEmptyAttr("ppn", "--cpus-per-task", wr);
+ writeNonEmptyAttr("project", "--account", wr);
+ writeNonEmptyAttr("queue", "--partition", wr);
writeWallTime(wr);
- writeNonEmptyAttr("queue", "-q ", wr);
- wr.write("#PBS -o " + quote(stdout) + '\n');
- wr.write("#PBS -e " + quote(stderr) + '\n');
+ wr.write("\n");
for (String name : spec.getEnvironmentVariableNames()) {
- // "export" is necessary on the Cray XT5 Crow
- wr.write("export ");
- wr.write(name);
- wr.write('=');
- wr.write(quote(spec.getEnvironmentVariable(name)));
- wr.write('\n');
+ wr.write("export " + name + '=' + quote(spec.getEnvironmentVariable(name)) + '\n');
}
- if (spec.getEnvironmentVariableNames().size() > 0) {
- wr.write("#PBS -v " + makeList(spec.getEnvironmentVariableNames()) + '\n');
+ // Handle all slurm attributes specified by the user
+ for (String a : spec.getAttributeNames()) {
+ if (a != null && a.startsWith("slurm.")) {
+ String attributeName[] = a.split("slurm.");
+ wr.write("#SBATCH --" + attributeName[1] + " = " + spec.getAttribute(a) + '\n');
+ }
}
- String resources =
- (String) spec.getAttribute("slurm.resource_list");
- if (resources != null && resources.length() > 0) {
- if (logger.isDebugEnabled())
- logger.debug("slurm.resource_list: " + resources);
- wr.write("#PBS -l " + resources + '\n');
- }
-
- // aprun option specifically for Cray Beagle, Franklin
- boolean aprun = false;
- if (spec.getAttribute("pbs.aprun") != null)
- aprun = true;
-
String type = (String) spec.getAttribute("jobType");
if (logger.isDebugEnabled())
logger.debug("Job type: " + type);
- if ("multiple".equals(type))
- multiple = true;
- else if("single".equals(type))
- multiple = false;
- if (aprun)
- multiple = false;
- if (multiple)
- writeMultiJobPreamble(wr, exitcodefile);
if (type != null) {
- String wrapper =
- properties.getProperty("wrapper." + type);
+ String wrapper = properties.getProperty("wrapper." + type);
if (logger.isDebugEnabled()) {
logger.debug("Wrapper: " + wrapper);
}
+
if (wrapper != null) {
wrapper = replaceVars(wrapper);
wr.write(wrapper);
wr.write(' ');
}
- if (logger.isDebugEnabled()) {
+
+ if (logger.isDebugEnabled()) {
logger.debug("Wrapper after variable substitution: " + wrapper);
}
}
+
+ wr.write("srun /bin/bash -c \'");
if (spec.getDirectory() != null) {
wr.write("cd " + quote(spec.getDirectory()) + " && ");
}
- if (aprun)
- wr.write("aprun -n " + count + " -N 1 -cc none -d " +
- depth + " -F exclusive /bin/sh -c '");
-
wr.write(quote(spec.getExecutable()));
writeQuotedList(wr, spec.getArgumentsAsList());
- if (aprun)
- wr.write("'");
-
if (spec.getStdInput() != null) {
- wr.write(" < " + quote(spec.getStdInput()));
- }
- if (multiple) {
- writeMultiJobPostamble(wr);
+ wr.write(" < " + quote(spec.getStdInput()));
}
- else {
- wr.write('\n');
- wr.write("/bin/echo $? >" + exitcodefile + '\n');
+
+ if("multiple".equals(type)) {
+ wr.write("; /bin/echo $? >" + exitcodefile + ".$SLURM_PROCID\'\n");
+ } else {
+ wr.write("; /bin/echo $? >" + exitcodefile + "\'\n");
}
wr.close();
}
- void writeHeader(Writer writer)
- throws IOException {
+ void writeHeader(Writer writer) throws IOException {
+ writer.write("#!/bin/bash\n\n");
writer.write("#CoG This script generated by CoG\n");
writer.write("#CoG by class: " + SlurmExecutor.class + '\n');
writer.write("#CoG on date: " + new Date() + "\n\n");
}
-
-
- private String makeList(Collection<String> names) {
- StringBuilder sb = new StringBuilder();
- Iterator<String> i = names.iterator();
- while (i.hasNext()) {
- sb.append(i.next());
- if (i.hasNext()) {
- sb.append(", ");
- }
- }
- return sb.toString();
- }
- protected void writeMultiJobPreamble(Writer wr, String exitcodefile)
- throws IOException {
- wr.write("NODES=`cat $PBS_NODEFILE`\n");
- wr.write("ECF=" + exitcodefile + "\n");
- wr.write("INDEX=0\n");
- wr.write("for NODE in $NODES; do\n");
- wr.write(" echo \"N\" >$ECF.$INDEX\n");
- wr.write(" ssh $NODE /bin/bash -c \\\" \"");
- }
-
-
@Override
-protected String getName() {
- return "PBS";
+ protected String getName() {
+ return "Slurm";
}
@Override
-protected AbstractProperties getProperties() {
+ protected AbstractProperties getProperties() {
return Properties.getProperties();
}
@Override
-protected Job createJob(String jobid, String stdout,
+ protected Job createJob(String jobid, String stdout,
FileLocation stdOutputLocation, String stderr,
FileLocation stdErrorLocation, String exitcode,
AbstractExecutor executor) {
@@ -362,8 +213,8 @@
private static QueuePoller poller;
@Override
-protected AbstractQueuePoller getQueuePoller() {
- synchronized(SlurmExecutor.class) {
+ protected AbstractQueuePoller getQueuePoller() {
+ synchronized (SlurmExecutor.class) {
if (poller == null) {
poller = new QueuePoller(getProperties());
poller.start();
@@ -371,4 +222,14 @@
return poller;
}
}
-}
+
+ protected String parseSubmitCommandOutput(String out) throws IOException {
+ if ("".equals(out)) {
+ throw new IOException(getProperties().getSubmitCommandName()
+ + " returned an empty job ID");
+ }
+ String outArray[] = out.split(" ");
+ return outArray[outArray.length-1].trim();
+ }
+
+}
\ No newline at end of file
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/slurm/QueuePoller.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/slurm/QueuePoller.java (revision 3534)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/slurm/QueuePoller.java (working copy)
@@ -8,6 +8,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import org.apache.log4j.Logger;
@@ -19,142 +20,96 @@
public static final Logger logger = Logger.getLogger(QueuePoller.class);
public static final int FULL_LIST_THRESHOLD = 16;
- private Set processed;
+ private Set<String> processed;
public QueuePoller(AbstractProperties properties) {
- super("PBS provider queue poller", properties);
- processed = new HashSet();
+ super("Slurm provider queue poller", properties);
+ processed = new HashSet<String>();
}
private static String[] CMDARRAY;
protected synchronized String[] getCMDArray() {
- if (getJobs().size() <= FULL_LIST_THRESHOLD) {
- String[] cmda = new String[2 + getJobs().size()];
- cmda[0] = getProperties().getPollCommand();
- cmda[1] = "-f";
- int i = 2;
- for (Job j : getJobs().values()) {
- cmda[i++] = j.getJobID();
- }
- return cmda;
- }
- else {
- if (CMDARRAY == null) {
- CMDARRAY = new String[] { getProperties().getPollCommand(), "-f" };
- }
- }
+ if(CMDARRAY == null) {
+ if (getJobs().size() <= FULL_LIST_THRESHOLD) {
+ CMDARRAY = new String[4];
+ CMDARRAY[0] = getProperties().getPollCommand();
+ CMDARRAY[1] = "--noheader";
+ CMDARRAY[2] = "--jobs";
+ boolean first=true;
+ for (Job j : getJobs().values()) {
+ if(first) {
+ CMDARRAY[3] = j.getJobID();
+ first=false;
+ } else {
+ CMDARRAY[3] += "," + j.getJobID();
+ }
+ }
+ } else {
+ CMDARRAY = new String[] { getProperties().getPollCommand(), "--noheader" };
+ }
+ }
+
return CMDARRAY;
}
@Override
- protected int getError(int ec, String stderr) {
- if (ec != 0) {
- BufferedReader sr = new BufferedReader(new StringReader(stderr));
- try {
- String line = sr.readLine();
- while (line != null) {
- if (!line.contains("Unknown Job Id")) {
- return ec;
- }
- line = sr.readLine();
- }
- }
- catch (IOException e) {
- // should not occur while reading from a string reader
- e.printStackTrace();
- }
- return 0;
- }
- else {
- return ec;
- }
- }
+ protected int getError(int ec, String stderr) {
+ if (ec != 0) {
+ BufferedReader sr = new BufferedReader(new StringReader(stderr));
+ try {
+ String line = sr.readLine();
+ while (line != null) {
+ if (!line.contains("Unknown Job Id")) {
+ return ec;
+ }
+ line = sr.readLine();
+ }
+ } catch (IOException e) {
+ // should not occur while reading from a string reader
+ e.printStackTrace();
+ }
+ return 0;
+ } else {
+ return ec;
+ }
+ }
- protected void processStdout(InputStream is) throws IOException {
+ protected void processStdout(InputStream is) throws IOException {
+
BufferedReader br = new BufferedReader(new InputStreamReader(is));
- processed.clear();
String line;
- String currentJobID = null;
- Job currentJob = null;
+ processed.clear();
+
do {
line = br.readLine();
- if (line != null) {
- try {
- line = line.trim();
- if (line.startsWith("Job Id: ")) {
- currentJobID = line.substring("Job Id: ".length());
- processed.add(currentJobID);
- currentJob = getJob(currentJobID);
- continue;
- }
- if (currentJob != null) {
- if (line.startsWith("job_state = ")) {
- if (logger.isDebugEnabled()) {
- logger.debug("Status line: " + line);
- }
- switch (line.substring("job_state = ".length())
- .charAt(0)) {
- case 'Q': {
- if (logger.isDebugEnabled()) {
- logger.debug("Status for "
- + currentJobID + " is Q");
- }
- currentJob.setState(Job.STATE_QUEUED);
- break;
- }
- case 'R': {
- if (logger.isDebugEnabled()) {
- logger.debug("Status for "
- + currentJobID + " is R");
- }
- currentJob.setState(Job.STATE_RUNNING);
- break;
- }
- case 'C': {
- // for sites where keep_completed is there,
- // don't wait
- // for the job to be removed from the queue
- if (logger.isDebugEnabled()) {
- logger.debug("Status for "
- + currentJobID + " is C");
- }
- addDoneJob(currentJob.getJobID());
- break;
- }
- }
- }
- else if (line.startsWith("exit_status = ")) {
- try {
- int ec = Integer.parseInt(line.substring(
- "exit_status = ".length()).trim());
- currentJob.setExitcode(ec);
- }
- catch (Exception e) {
- if (logger.isDebugEnabled()) {
- logger.debug("Could not parse exit_status",
- e);
- }
- }
- }
- }
+ if(line != null) {
+ String words[] = line.split("\\s+");
+ String jobid = words[0].trim();
+ String state = words[4].trim();
+ if (jobid == null || jobid.equals("") || state == null || state.equals("")) {
+ throw new IOException("Failed to parse squeue line: " + line);
}
- catch (Exception e) {
- logger.warn("Exception caught while handling "
- + getProperties().getPollCommandName()
- + " output: " + line, e);
+
+ Job job = getJob(jobid);
+ if (job == null){ continue; }
+ processed.add(jobid);
+
+ if (state.equals("PD")) {
+ job.setState(Job.STATE_QUEUED);
}
+ else if(state.equals("R")) {
+ job.setState(Job.STATE_RUNNING);
+ }
}
} while (line != null);
- Iterator i = getJobs().entrySet().iterator();
+
+ Iterator<Entry<String, Job>> i = getJobs().entrySet().iterator();
while (i.hasNext()) {
- Map.Entry e = (Map.Entry) i.next();
- String id = (String) e.getKey();
+ Map.Entry<String, Job> e = i.next();
+ String id = e.getKey();
if (!processed.contains(id)) {
- Job job = (Job) e.getValue();
- if (logger.isDebugEnabled()) {
- logger.debug("Status for " + id + " is Done");
- }
+ Job job = e.getValue();
job.setState(Job.STATE_DONE);
if (job.getState() == Job.STATE_DONE) {
addDoneJob(id);
@@ -162,7 +117,7 @@
}
}
}
-
+
protected void processStderr(InputStream is) throws IOException {
}
}
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/slurm/Properties.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/slurm/Properties.java (revision 3534)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/slurm/Properties.java (working copy)
@@ -8,10 +8,9 @@
public static final String PROPERTIES = "provider-slurm.properties";
public static final String POLL_INTERVAL = "poll.interval";
- public static final String QSUB = "qsub";
- public static final String QSTAT = "qstat";
- public static final String QDEL = "qdel";
- public static final String USE_MPPWIDTH = "use.mppwidth";
+ public static final String SBATCH = "sbatch";
+ public static final String SQUEUE = "squeue";
+ public static final String SCANCEL = "scancel";
private static Properties properties;
@@ -25,23 +24,23 @@
protected void setDefaults() {
setPollInterval(5);
- setSubmitCommand("qsub");
- setPollCommand("qstat");
- setRemoveCommand("qdel");
+ setSubmitCommand("sbatch");
+ setPollCommand("squeue");
+ setRemoveCommand("scancel");
}
public String getPollCommandName() {
- return QSTAT;
+ return SQUEUE;
}
public String getRemoveCommandName() {
- return QDEL;
+ return SCANCEL;
}
public String getSubmitCommandName() {
- return QSUB;
+ return SBATCH;
}
}
More information about the Swift-commit
mailing list