[Swift-commit] cog r3498
swift at ci.uchicago.edu
swift at ci.uchicago.edu
Wed Nov 7 15:10:09 CST 2012
------------------------------------------------------------------------
r3498 | davidkelly999 | 2012-11-07 15:05:14 -0600 (Wed, 07 Nov 2012) | 2 lines
Initial commit of LSF provider
------------------------------------------------------------------------
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/QueuePoller.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/QueuePoller.java (revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/QueuePoller.java (revision 3498)
@@ -0,0 +1,175 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+package org.globus.cog.abstraction.impl.scheduler.lsf;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.StringReader;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractProperties;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractQueuePoller;
+import org.globus.cog.abstraction.impl.scheduler.common.Job;
+
+public class QueuePoller extends AbstractQueuePoller {
+ public static final Logger logger = Logger.getLogger(QueuePoller.class);
+ public static final int FULL_LIST_THRESHOLD = 16;
+
+ private Set processed;
+
+ public QueuePoller(AbstractProperties properties) {
+ super("PBS provider queue poller", properties);
+ processed = new HashSet();
+ }
+
+ private static String[] CMDARRAY;
+
+ protected synchronized String[] getCMDArray() {
+ if (getJobs().size() <= FULL_LIST_THRESHOLD) {
+ String[] cmda = new String[2 + getJobs().size()];
+ cmda[0] = getProperties().getPollCommand();
+ cmda[1] = "-f";
+ int i = 2;
+ for (Job j : getJobs().values()) {
+ cmda[i++] = j.getJobID();
+ }
+ return cmda;
+ }
+ else {
+ if (CMDARRAY == null) {
+ CMDARRAY = new String[] { getProperties().getPollCommand(), "-f" };
+ }
+ }
+ return CMDARRAY;
+ }
+
+ @Override
+ protected int getError(int ec, String stderr) {
+ if (ec != 0) {
+ BufferedReader sr = new BufferedReader(new StringReader(stderr));
+ try {
+ String line = sr.readLine();
+ while (line != null) {
+ if (!line.contains("Unknown Job Id")) {
+ return ec;
+ }
+ line = sr.readLine();
+ }
+ }
+ catch (IOException e) {
+ // should not occur while reading from a string reader
+ e.printStackTrace();
+ }
+ return 0;
+ }
+ else {
+ return ec;
+ }
+ }
+
+ protected void processStdout(InputStream is) throws IOException {
+ BufferedReader br = new BufferedReader(new InputStreamReader(is));
+ processed.clear();
+ String line;
+ String currentJobID = null;
+ Job currentJob = null;
+ do {
+ line = br.readLine();
+ if (line != null) {
+ try {
+ line = line.trim();
+ if (line.startsWith("Job Id: ")) {
+ currentJobID = line.substring("Job Id: ".length());
+ processed.add(currentJobID);
+ currentJob = getJob(currentJobID);
+ continue;
+ }
+ if (currentJob != null) {
+ if (line.startsWith("job_state = ")) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Status line: " + line);
+ }
+ switch (line.substring("job_state = ".length())
+ .charAt(0)) {
+ case 'Q': {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Status for "
+ + currentJobID + " is Q");
+ }
+ currentJob.setState(Job.STATE_QUEUED);
+ break;
+ }
+ case 'R': {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Status for "
+ + currentJobID + " is R");
+ }
+ currentJob.setState(Job.STATE_RUNNING);
+ break;
+ }
+ case 'C': {
+ // for sites where keep_completed is there,
+ // don't wait
+ // for the job to be removed from the queue
+ if (logger.isDebugEnabled()) {
+ logger.debug("Status for "
+ + currentJobID + " is C");
+ }
+ addDoneJob(currentJob.getJobID());
+ break;
+ }
+ }
+ }
+ else if (line.startsWith("exit_status = ")) {
+ try {
+ int ec = Integer.parseInt(line.substring(
+ "exit_status = ".length()).trim());
+ currentJob.setExitcode(ec);
+ }
+ catch (Exception e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Could not parse exit_status",
+ e);
+ }
+ }
+ }
+ }
+ }
+ catch (Exception e) {
+ logger.warn("Exception caught while handling "
+ + getProperties().getPollCommandName()
+ + " output: " + line, e);
+ }
+ }
+ } while (line != null);
+ Iterator i = getJobs().entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry) i.next();
+ String id = (String) e.getKey();
+ if (!processed.contains(id)) {
+ Job job = (Job) e.getValue();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Status for " + id + " is Done");
+ }
+ job.setState(Job.STATE_DONE);
+ if (job.getState() == Job.STATE_DONE) {
+ addDoneJob(id);
+ }
+ }
+ }
+ }
+
+ protected void processStderr(InputStream is) throws IOException {
+ }
+}
+
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/LSFExecutor.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/LSFExecutor.java (revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/LSFExecutor.java (revision 3498)
@@ -0,0 +1,381 @@
+package org.globus.cog.abstraction.impl.scheduler.lsf;
+
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+import java.io.IOException;
+import java.io.Writer;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.common.execution.WallTime;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractExecutor;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractProperties;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractQueuePoller;
+import org.globus.cog.abstraction.impl.scheduler.common.Job;
+import org.globus.cog.abstraction.impl.scheduler.common.ProcessListener;
+import org.globus.cog.abstraction.interfaces.FileLocation;
+import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.abstraction.interfaces.Task;
+
+public class LSFExecutor extends AbstractExecutor {
+ public static final Logger logger = Logger.getLogger(LSFExecutor.class);
+
+
+ /**
+ Number of program invocations
+ */
+ int count = 1;
+
+ /**
+ PBS processes-per-node
+ */
+ int ppn = 1;
+
+ /**
+ PBS mppdepth: number of available threads per node
+ */
+ int depth = 1;
+
+ /**
+ Unique number for automatic task names
+ */
+ private static int unique = 0;
+
+ public LSFExecutor(Task task, ProcessListener listener) {
+ super(task, listener);
+ }
+
+ private static NumberFormat IDF = new DecimalFormat("000000");
+
+ /**
+ The job name is limited to 15 characters:
+ http://doesciencegrid.org/public/pbs/qsub.html
+ */
+ protected void validate(Task task) {
+ String name = task.getName();
+ if (name == null) {
+ int i = 0;
+ synchronized(LSFExecutor.class) {
+ i = unique++;
+ }
+ name = "cog-" + IDF.format(i);
+ if (logger.isDebugEnabled()) {
+ logger.debug("PBS name: for: " + task.getIdentity() +
+ " is: " + name);
+ }
+ }
+ else if (name.length() > 15) {
+ task.setName(name.substring(0, 15));
+ }
+ }
+
+ /**
+ Write attribute if non-null
+ @throws IOException
+ */
+ protected void writeAttr(String attrName, String arg, Writer wr)
+ throws IOException {
+ Object value = getSpec().getAttribute(attrName);
+ if (value != null) {
+ wr.write("#PBS " + arg + String.valueOf(value) + '\n');
+ }
+ }
+
+ /**
+ Write attribute if non-null and non-empty
+ @throws IOException
+ */
+ protected void writeNonEmptyAttr(String attrName, String arg,
+ Writer wr)
+ throws IOException {
+ Object value = getSpec().getAttribute(attrName);
+ if (value != null) {
+ String v = String.valueOf(value);
+ if (v.length() > 0 )
+ wr.write("#PBS " + arg + v + '\n');
+ }
+ }
+
+ protected void writeWallTime(Writer wr) throws IOException {
+ Object walltime = getSpec().getAttribute("maxwalltime");
+ if (walltime != null) {
+ wr.write("#PBS -l walltime="
+ + WallTime.normalize(walltime.toString(), "pbs-native")
+ + '\n');
+ }
+ }
+
+ private int parseAndValidateInt(Object obj, String name) {
+ try {
+ assert(obj != null);
+ return Integer.parseInt(obj.toString());
+ }
+ catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Illegal value for " + name + ". Must be an integer.");
+ }
+ }
+
+ /**
+ Obtains profile settings regarding job size from
+ JobSpecification and writes them into the PBS file.
+ Looks for profiles count, ppn, ppts, and pbs.mpp
+ count: mandatory, default 1 (number of processes)
+ depth: default 1 (number of threads per node)
+ ppn: optional, default 1 (processes per node)
+ pbs.mpp: output mppwidth/mppnppn instead of nodes/ppn
+ pbs.properties: extra PBS properties
+ pbs.resource_list: extra PBS -l line
+
+ Note that the semantics are different for the pbs.mpp setting:
+ mppwidth is the total number of cores while nodes is the number
+ of nodes.
+
+ http://www.clusterresources.com/torquedocs/2.1jobsubmission.shtml
+ @return true if this is a multi-core job
+ */
+ protected boolean writeCountAndPPN(JobSpecification spec,
+ Writer wr)
+ throws IOException {
+ boolean result = false;
+
+ Object o;
+
+ // Number of program invocations
+ o = getSpec().getAttribute("count");
+ if (o != null)
+ count = parseAndValidateInt(o, "count");
+ if (count != 1)
+ result = true;
+
+ o = spec.getAttribute("ppn");
+ if (o != null)
+ ppn = parseAndValidateInt(o, "ppn");
+
+ o = spec.getAttribute("depth");
+ if (o != null)
+ depth = parseAndValidateInt(o, "depth");
+
+ String pbsProperties =
+ (String) getSpec().getAttribute("lsf.properties");
+
+ boolean mpp = false;
+ if (spec.getAttribute("lsf.mpp") != null)
+ mpp = true;
+
+ StringBuilder sb = new StringBuilder(512);
+ sb.append("#PBS -l ");
+ if (mpp) {
+ sb.append("mppwidth=").append(count);
+ sb.append(",");
+ sb.append("mppnppn=").append(ppn);
+ sb.append(",");
+ sb.append("mppdepth=").append(depth);
+ }
+ else {
+ sb.append("nodes=");
+ sb.append(count);
+ sb.append(":");
+ sb.append("ppn=");
+ sb.append(ppn);
+ }
+
+ if (pbsProperties != null &&
+ pbsProperties.length() > 0 ) {
+ sb.append(":");
+ sb.append(pbsProperties);
+ }
+
+ sb.append('\n');
+
+ wr.write(sb.toString());
+
+ return result;
+ }
+
+ /*
+ private boolean parseAndValidateBool(Object obj, String name)
+ {
+ try {
+ return Boolean.parseBoolean(obj.toString());
+ }
+ catch (NumberFormatException e) {
+ throw new IllegalArgumentException
+ ("Illegal value for " + name + ". Must be true/false.");
+ }
+ }
+ */
+
+ @Override
+ protected void writeScript(Writer wr, String exitcodefile, String stdout,
+ String stderr)
+ throws IOException {
+ Task task = getTask();
+ JobSpecification spec = getSpec();
+ Properties properties = Properties.getProperties();
+
+ getSpec().unpackProviderAttributes();
+
+ validate(task);
+ writeHeader(wr);
+
+ wr.write("#PBS -S /bin/bash\n");
+ wr.write("#PBS -N " + task.getName() + '\n');
+ wr.write("#PBS -m n\n");
+ writeNonEmptyAttr("project", "-A ", wr);
+ boolean multiple = writeCountAndPPN(spec, wr);
+ writeWallTime(wr);
+ writeNonEmptyAttr("queue", "-q ", wr);
+ wr.write("#PBS -o " + quote(stdout) + '\n');
+ wr.write("#PBS -e " + quote(stderr) + '\n');
+
+ for (String name : spec.getEnvironmentVariableNames()) {
+ // "export" is necessary on the Cray XT5 Crow
+ wr.write("export ");
+ wr.write(name);
+ wr.write('=');
+ wr.write(quote(spec.getEnvironmentVariable(name)));
+ wr.write('\n');
+ }
+
+ if (spec.getEnvironmentVariableNames().size() > 0) {
+ wr.write("#PBS -v " + makeList(spec.getEnvironmentVariableNames()) + '\n');
+ }
+
+ String resources =
+ (String) spec.getAttribute("lsf.resource_list");
+ if (resources != null && resources.length() > 0) {
+ if (logger.isDebugEnabled())
+ logger.debug("lsf.resource_list: " + resources);
+ wr.write("#PBS -l " + resources + '\n');
+ }
+
+ // aprun option specifically for Cray Beagle, Franklin
+ boolean aprun = false;
+ if (spec.getAttribute("lsf.aprun") != null)
+ aprun = true;
+
+ String type = (String) spec.getAttribute("jobType");
+ if (logger.isDebugEnabled())
+ logger.debug("Job type: " + type);
+ if ("multiple".equals(type))
+ multiple = true;
+ else if("single".equals(type))
+ multiple = false;
+ if (aprun)
+ multiple = false;
+ if (multiple)
+ writeMultiJobPreamble(wr, exitcodefile);
+
+ if (type != null) {
+ String wrapper =
+ properties.getProperty("wrapper." + type);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Wrapper: " + wrapper);
+ }
+ if (wrapper != null) {
+ wrapper = replaceVars(wrapper);
+ wr.write(wrapper);
+ wr.write(' ');
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Wrapper after variable substitution: " + wrapper);
+ }
+ }
+ if (spec.getDirectory() != null) {
+ wr.write("cd " + quote(spec.getDirectory()) + " && ");
+ }
+
+ if (aprun)
+ wr.write("aprun -n " + count + " -N 1 -cc none -d " +
+ depth + " -F exclusive /bin/sh -c '");
+
+ wr.write(quote(spec.getExecutable()));
+ writeQuotedList(wr, spec.getArgumentsAsList());
+
+ if (aprun)
+ wr.write("'");
+
+ if (spec.getStdInput() != null) {
+ wr.write(" < " + quote(spec.getStdInput()));
+ }
+ if (multiple) {
+ writeMultiJobPostamble(wr);
+ }
+ else {
+ wr.write('\n');
+ wr.write("/bin/echo $? >" + exitcodefile + '\n');
+ }
+ wr.close();
+ }
+
+ void writeHeader(Writer writer)
+ throws IOException {
+ writer.write("#CoG This script generated by CoG\n");
+ writer.write("#CoG by class: " + LSFExecutor.class + '\n');
+ writer.write("#CoG on date: " + new Date() + "\n\n");
+ }
+
+
+ private String makeList(Collection<String> names) {
+ StringBuilder sb = new StringBuilder();
+ Iterator<String> i = names.iterator();
+ while (i.hasNext()) {
+ sb.append(i.next());
+ if (i.hasNext()) {
+ sb.append(", ");
+ }
+ }
+ return sb.toString();
+ }
+
+ protected void writeMultiJobPreamble(Writer wr, String exitcodefile)
+ throws IOException {
+ wr.write("NODES=`cat $PBS_NODEFILE`\n");
+ wr.write("ECF=" + exitcodefile + "\n");
+ wr.write("INDEX=0\n");
+ wr.write("for NODE in $NODES; do\n");
+ wr.write(" echo \"N\" >$ECF.$INDEX\n");
+ wr.write(" ssh $NODE /bin/bash -c \\\" \"");
+ }
+
+
+ @Override
+protected String getName() {
+ return "PBS";
+ }
+
+ @Override
+protected AbstractProperties getProperties() {
+ return Properties.getProperties();
+ }
+
+ @Override
+protected Job createJob(String jobid, String stdout,
+ FileLocation stdOutputLocation, String stderr,
+ FileLocation stdErrorLocation, String exitcode,
+ AbstractExecutor executor) {
+ return new Job(jobid, stdout, stdOutputLocation, stderr,
+ stdErrorLocation, exitcode, executor);
+ }
+
+ private static QueuePoller poller;
+
+ @Override
+protected AbstractQueuePoller getQueuePoller() {
+ synchronized(LSFExecutor.class) {
+ if (poller == null) {
+ poller = new QueuePoller(getProperties());
+ poller.start();
+ }
+ return poller;
+ }
+ }
+}
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/Properties.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/Properties.java (revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/Properties.java (revision 3498)
@@ -0,0 +1,59 @@
+package org.globus.cog.abstraction.impl.scheduler.lsf;
+
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+//import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractProperties;
+
+public class Properties extends AbstractProperties {
+
+ private static final long serialVersionUID = 1L;
+
+ // private static Logger logger =
+ // Logger.getLogger(Properties.class);
+
+ public static final String PROPERTIES = "provider-lsf.properties";
+
+ public static final String POLL_INTERVAL = "poll.interval";
+ public static final String QSUB = "qsub";
+ public static final String QSTAT = "qstat";
+ public static final String QDEL = "qdel";
+ public static final String USE_MPPWIDTH = "use.mppwidth";
+
+ private static Properties properties;
+
+ public static synchronized Properties getProperties() {
+ if (properties == null) {
+ properties = new Properties();
+ properties.load(PROPERTIES);
+ }
+ return properties;
+ }
+
+ protected void setDefaults() {
+ setPollInterval(5);
+ setSubmitCommand("qsub");
+ setPollCommand("qstat");
+ setRemoveCommand("qdel");
+ }
+
+
+ public String getPollCommandName() {
+ return QSTAT;
+ }
+
+
+ public String getRemoveCommandName() {
+ return QDEL;
+ }
+
+
+ public String getSubmitCommandName() {
+ return QSUB;
+ }
+}
+
Index: modules/provider-localscheduler/resources/cog-provider.properties
===================================================================
--- modules/provider-localscheduler/resources/cog-provider.properties (revision 3497)
+++ modules/provider-localscheduler/resources/cog-provider.properties (working copy)
@@ -29,3 +29,7 @@
executionTaskHandler=org.globus.cog.abstraction.impl.scheduler.slurm.execution.TaskHandlerImpl
securityContext=org.globus.cog.abstraction.impl.common.task.SecurityContextImpl
+provider=lsf
+sandbox=false
+executionTaskHandler=org.globus.cog.abstraction.impl.scheduler.lsf.execution.TaskHandlerImpl
+securityContext=org.globus.cog.abstraction.impl.common.task.SecurityContextImpl
Index: modules/provider-localscheduler/etc/provider-lsf.properties
===================================================================
--- modules/provider-localscheduler/etc/provider-lsf.properties (revision 0)
+++ modules/provider-localscheduler/etc/provider-lsf.properties (revision 3498)
@@ -0,0 +1,33 @@
+#
+# The interval, in seconds, at which the provider will poll the PBS
+# queue for status updates. There is at most one poll thread per JVM,
+# which is shared by all the jobs submitted through the PBS provider.
+#
+poll.interval=5
+
+#
+# The path to qsub. The default assumes that qsub is in PATH
+#
+qsub=lsf-qsub
+
+#
+# The path to qstat. The default assumes that qstat is in PATH
+#
+qstat=lsf-qstat
+
+#
+# The path to qdel. The default assumes that qdel is in PATH
+#
+qdel=lsf-qdel
+
+# If true, use "#PBS -l mppwidth=" instead of "#PBS -l nodes="
+# in PBS script
+use.mppwidth=false
+
+# If the jobType attribute is specified, then the PBS provider
+# will look for a property named "wrapper.<jobType>" and prepend
+# that to the executable line in the PBS script. It will also
+# substitute value of attributes in the job specification, using
+# the "$attrName" notation.
+#
+wrapper.mpi=mpirun -np $count
More information about the Swift-commit
mailing list