[Swift-commit] cog r3538
swift at ci.uchicago.edu
swift at ci.uchicago.edu
Wed Jan 2 11:50:05 CST 2013
------------------------------------------------------------------------
r3538 | davidkelly999 | 2013-01-02 11:48:23 -0600 (Wed, 02 Jan 2013) | 2 lines
LSF provider
------------------------------------------------------------------------
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/QueuePoller.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/QueuePoller.java (revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/QueuePoller.java (revision 3538)
@@ -0,0 +1,148 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+package org.globus.cog.abstraction.impl.scheduler.lsf;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.StringReader;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractProperties;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractQueuePoller;
+import org.globus.cog.abstraction.impl.scheduler.common.Job;
+
+/**
+ * Polls LSF by periodically running the configured poll command (bjobs by
+ * default) with {@code -a} and maps the STAT column of each listed job onto
+ * CoG {@link Job} states. Tracked jobs that do not appear in a listing at all
+ * are treated as done.
+ */
+public class QueuePoller extends AbstractQueuePoller {
+    public static final Logger logger = Logger.getLogger(QueuePoller.class);
+    public static final int FULL_LIST_THRESHOLD = 16;
+
+    /** IDs of tracked jobs that appeared in the most recent listing. */
+    private Set<String> processed;
+
+    public QueuePoller(AbstractProperties properties) {
+        super("LSF provider queue poller", properties);
+        processed = new HashSet<String>();
+    }
+
+    // Lazily built, shared command line. It is a static field, so it is
+    // guarded by the class lock rather than by an instance lock.
+    private static String[] CMDARRAY;
+
+    /**
+     * Returns the poll command line: the configured poll command followed by
+     * {@code -a} (presumably so that finished DONE/EXIT jobs are still listed,
+     * which processStdout handles; verify against the bjobs documentation).
+     */
+    protected String[] getCMDArray() {
+        synchronized (QueuePoller.class) {
+            if (CMDARRAY == null) {
+                CMDARRAY = new String[] { getProperties().getPollCommand(), "-a" };
+            }
+            return CMDARRAY;
+        }
+    }
+
+    /**
+     * Filters the poll command's exit code.
+     *
+     * A non-zero exit code is reported as 0 (success) when every line of
+     * stderr contains "is not found". Presumably bjobs only complained about
+     * job IDs it no longer knows, which is not a polling failure. Note that
+     * an empty stderr also yields 0.
+     *
+     * @param ec the poll command's exit code
+     * @param stderr the poll command's standard error
+     * @return {@code ec}, or 0 when the failure is only "is not found" noise
+     */
+    @Override
+    protected int getError(int ec, String stderr) {
+        if (ec == 0 || stderr == null) {
+            // Nothing to filter: success, or no diagnostics to inspect.
+            return ec;
+        }
+        BufferedReader sr = new BufferedReader(new StringReader(stderr));
+        try {
+            for (String line = sr.readLine(); line != null; line = sr.readLine()) {
+                if (!line.contains("is not found")) {
+                    return ec;
+                }
+            }
+        }
+        catch (IOException e) {
+            // Should not occur while reading from a StringReader
+            logger.warn("Unexpected I/O error while reading poll command stderr", e);
+        }
+        return 0;
+    }
+
+    /**
+     * Parses a bjobs listing and updates the state of every tracked job.
+     *
+     * The first line is the header. The JOBID and STAT columns are located
+     * by the position of those headings, and the QUEUE heading must be
+     * present for the header to be accepted. Each following line is mapped
+     * as follows: PEND becomes queued, RUN becomes running, and DONE or
+     * EXIT hands the job to addDoneJob. Other states are left untouched.
+     * Tracked jobs that are missing from the listing are set to done.
+     *
+     * @throws IOException if the header or a job line cannot be parsed
+     */
+    protected void processStdout(InputStream is) throws IOException {
+        BufferedReader br = new BufferedReader(new InputStreamReader(is));
+        String header = br.readLine();
+        if (header == null) {
+            logger.warn("Failed to read bjobs header");
+            return;
+        }
+        int jobIDIndex = header.indexOf("JOBID");
+        int stateIndex = header.indexOf("STAT");
+        // Not used for parsing; only validates that this is a bjobs header.
+        int locationIndex = header.indexOf("QUEUE");
+
+        if (jobIDIndex == -1 || stateIndex == -1 || locationIndex == -1) {
+            throw new IOException("Invalid bjobs header: " + header);
+        }
+
+        processed.clear();
+
+        String line;
+        while ((line = br.readLine()) != null) {
+            String jobid = parseToWhitespace(line, jobIDIndex);
+            String state = parseToWhitespace(line, stateIndex);
+            if (isEmpty(jobid) || isEmpty(state)) {
+                throw new IOException("Failed to parse bjobs line: " + line);
+            }
+
+            Job job = getJob(jobid);
+            if (job == null) {
+                // Not one of ours
+                continue;
+            }
+            processed.add(jobid);
+            updateState(job, jobid, state);
+        }
+
+        for (Map.Entry<String, Job> e : getJobs().entrySet()) {
+            String id = e.getKey();
+            if (processed.contains(id)) {
+                continue;
+            }
+            Job job = e.getValue();
+            if (logger.isDebugEnabled()) {
+                logger.debug("Status for " + id + " is Done");
+            }
+            job.setState(Job.STATE_DONE);
+            // NOTE(review): re-checking the state right after setting it
+            // presumably guards against Job.setState moving the job elsewhere;
+            // kept as-is.
+            if (job.getState() == Job.STATE_DONE) {
+                addDoneJob(id);
+            }
+        }
+    }
+
+    /** Applies one bjobs STAT value to a tracked job; unknown states are ignored. */
+    private void updateState(Job job, String jobid, String state) {
+        if (state.equals("PEND")) {
+            logState(jobid, state);
+            job.setState(Job.STATE_QUEUED);
+        }
+        else if (state.equals("RUN")) {
+            logState(jobid, state);
+            job.setState(Job.STATE_RUNNING);
+        }
+        else if (state.equals("DONE") || state.equals("EXIT")) {
+            logState(jobid, state);
+            addDoneJob(job.getJobID());
+        }
+    }
+
+    /** Debug-logs the state reported for a job. */
+    private static void logState(String jobid, String state) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Status for " + jobid + " is " + state);
+        }
+    }
+
+    /** Returns true for a null or zero-length string. */
+    private static boolean isEmpty(String s) {
+        return s == null || s.length() == 0;
+    }
+
+    protected void processStderr(InputStream is) throws IOException {
+    }
+}
\ No newline at end of file
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/execution/TaskHandlerImpl.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/execution/TaskHandlerImpl.java (revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/execution/TaskHandlerImpl.java (revision 3538)
@@ -0,0 +1,32 @@
+// ----------------------------------------------------------------------
+// This code is developed as part of the Java CoG Kit project
+// The terms of the license can be found at http://www.cogkit.org/license
+// This message may not be removed or altered.
+// ----------------------------------------------------------------------
+
+package org.globus.cog.abstraction.impl.scheduler.lsf.execution;
+
+import org.globus.cog.abstraction.interfaces.DelegatedTaskHandler;
+
+/**
+ *Provides a local LSF <code>TaskHandler</code>
+ *for job submission to the local resource without
+ *any security context.
+ *
+ */
+public class TaskHandlerImpl extends
+ org.globus.cog.abstraction.impl.common.execution.TaskHandlerImpl {
+
+ protected DelegatedTaskHandler newDelegatedTaskHandler() {
+ return new JobSubmissionTaskHandler();
+ }
+
+ protected String getName() {
+ return "LSF";
+ }
+
+ public String toString()
+ {
+ return "TaskHandlerImpl(execution LSF)";
+ }
+}
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/execution/JobSubmissionTaskHandler.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/execution/JobSubmissionTaskHandler.java (revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/execution/JobSubmissionTaskHandler.java (revision 3538)
@@ -0,0 +1,12 @@
+package org.globus.cog.abstraction.impl.scheduler.lsf.execution;
+import org.globus.cog.abstraction.impl.scheduler.lsf.LSFExecutor;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractExecutor;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractJobSubmissionTaskHandler;
+import org.globus.cog.abstraction.interfaces.Task;
+
+public class JobSubmissionTaskHandler extends AbstractJobSubmissionTaskHandler {
+ protected AbstractExecutor newExecutor(Task task,
+ AbstractJobSubmissionTaskHandler th) {
+ return new LSFExecutor(task, th);
+ }
+}
\ No newline at end of file
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/LSFExecutor.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/LSFExecutor.java (revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/LSFExecutor.java (revision 3538)
@@ -0,0 +1,326 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+package org.globus.cog.abstraction.impl.scheduler.lsf;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.scheduler.lsf.Properties;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractExecutor;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractProperties;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractQueuePoller;
+import org.globus.cog.abstraction.impl.scheduler.common.Job;
+import org.globus.cog.abstraction.impl.scheduler.common.ProcessListener;
+import org.globus.cog.abstraction.interfaces.FileLocation;
+import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.abstraction.interfaces.Task;
+
+/**
+ * Submits CoG tasks to LSF. A bash job script is generated with #BSUB
+ * directives, and the configured submit command (bsub by default) runs
+ * with the script on its standard input.
+ */
+public class LSFExecutor extends AbstractExecutor {
+    public static final Logger logger = Logger.getLogger(LSFExecutor.class);
+
+    /** Prefix of job specification attributes passed through as #BSUB options. */
+    private static final String LSF_ATTR_PREFIX = "lsf.";
+
+    /** Value written to "#BSUB -n"; overridden by the "count" attribute. */
+    private int count = 1;
+
+    public LSFExecutor(Task task, ProcessListener listener) {
+        super(task, listener);
+    }
+
+    @Override
+    protected Job createJob(String jobid, String stdout,
+            FileLocation stdOutputLocation, String stderr,
+            FileLocation stdErrorLocation, String exitcode,
+            AbstractExecutor executor) {
+        return new Job(jobid, stdout, stdOutputLocation, stderr,
+            stdErrorLocation, exitcode, executor);
+    }
+
+    @Override
+    protected String getName() {
+        return "LSF";
+    }
+
+    @Override
+    protected AbstractProperties getProperties() {
+        return Properties.getProperties();
+    }
+
+    /** Writes the "#CoG" provenance comment lines at the top of the script. */
+    void writeHeader(Writer writer) throws IOException {
+        writer.write("#CoG This script generated by CoG\n");
+        writer.write("#CoG by class: " + LSFExecutor.class + '\n');
+        writer.write("#CoG on date: " + new Date() + "\n\n");
+    }
+
+    /**
+     * Writes "#BSUB arg value" if the attribute is non-null and its string
+     * form is non-empty.
+     *
+     * @throws IOException if writing fails
+     */
+    protected void writeNonEmptyAttr(String attrName, String arg, Writer wr)
+            throws IOException {
+        Object value = getSpec().getAttribute(attrName);
+        if (value != null) {
+            String v = String.valueOf(value);
+            if (v.length() > 0) {
+                wr.write("#BSUB " + arg + " " + v + '\n');
+            }
+        }
+    }
+
+    /**
+     * Parses obj.toString() as an int.
+     *
+     * @throws IllegalArgumentException (with the parse error as cause) if it
+     *         is not an integer
+     */
+    private int parseAndValidateInt(Object obj, String name) {
+        try {
+            assert (obj != null);
+            return Integer.parseInt(obj.toString());
+        }
+        catch (NumberFormatException e) {
+            throw new IllegalArgumentException("Illegal value for " + name
+                + ". Must be an integer.", e);
+        }
+    }
+
+    /**
+     * Writes "#BSUB -n" using the "count" attribute (default 1).
+     *
+     * @return true if this is a multi-core job (count != 1)
+     */
+    protected boolean writeCountAndPPN(JobSpecification spec, Writer wr)
+            throws IOException {
+        // Number of program invocations
+        Object o = getSpec().getAttribute("count");
+        if (o != null) {
+            count = parseAndValidateInt(o, "count");
+        }
+        wr.write("#BSUB -n " + count + "\n");
+        return count != 1;
+    }
+
+    /**
+     * Opens the per-node loop used for multi-process jobs: each node in
+     * $LSB_HOSTS gets an exit-code file "$ECF.$INDEX" and an ssh invocation
+     * whose command is completed by the caller and closed by
+     * writeMultiJobPostamble.
+     */
+    protected void writeMultiJobPreamble(Writer wr, String exitcodefile)
+            throws IOException {
+        wr.write("ECF=" + exitcodefile + "\n");
+        wr.write("INDEX=0\n");
+        wr.write("for NODE in $LSB_HOSTS; do\n");
+        wr.write("  echo \"N\" >$ECF.$INDEX\n");
+        wr.write("  ssh $NODE /bin/bash -c \\\" \"");
+    }
+
+    /**
+     * Writes the complete LSF job script and closes the writer.
+     *
+     * All #BSUB directives come first. These are the fixed ones, the walltime,
+     * the working directory, and the user's "lsf.*" attributes. After them come
+     * the environment exports, the optional wrapper, and the command itself.
+     * The script ends with the exit-code bookkeeping.
+     */
+    @Override
+    protected void writeScript(Writer wr, String exitcodefile, String stdout,
+            String stderr) throws IOException {
+        Task task = getTask();
+        JobSpecification spec = getSpec();
+        Properties properties = Properties.getProperties();
+        validate(task);
+        writeHeader(wr);
+
+        wr.write("#BSUB -L /bin/bash\n");
+        wr.write("#BSUB -J " + task.getName() + '\n');
+        wr.write("#BSUB -o " + quote(stdout) + '\n');
+        wr.write("#BSUB -e " + quote(stderr) + '\n');
+
+        writeNonEmptyAttr("project", "-P", wr);
+        writeNonEmptyAttr("queue", "-q", wr);
+        boolean multiple = writeCountAndPPN(spec, wr);
+
+        // maxwalltime is optional; only emit -W when it is actually set
+        Object walltime = spec.getAttribute("maxwalltime");
+        if (walltime != null && walltime.toString().trim().length() > 0) {
+            wr.write("#BSUB -W " + formatWalltime(walltime.toString()) + '\n');
+        }
+
+        if (spec.getDirectory() != null) {
+            wr.write("#BSUB -cwd " + quote(spec.getDirectory()) + '\n');
+        }
+
+        // Directives must be emitted before the first shell command below
+        writeUserDirectives(spec, wr);
+
+        for (String name : spec.getEnvironmentVariableNames()) {
+            wr.write("export " + name + '=' + quote(spec.getEnvironmentVariable(name)) + '\n');
+        }
+
+        String type = (String) spec.getAttribute("jobType");
+        if (logger.isDebugEnabled()) {
+            logger.debug("Job type: " + type);
+        }
+        if ("multiple".equals(type)) {
+            multiple = true;
+        }
+        else if ("single".equals(type)) {
+            multiple = false;
+        }
+        if (multiple) {
+            writeMultiJobPreamble(wr, exitcodefile);
+        }
+
+        if (type != null) {
+            String wrapper = properties.getProperty("wrapper." + type);
+            if (logger.isDebugEnabled()) {
+                logger.debug("Wrapper: " + wrapper);
+            }
+            if (wrapper != null) {
+                wrapper = replaceVars(wrapper);
+                wr.write(wrapper);
+                wr.write(' ');
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug("Wrapper after variable substitution: " + wrapper);
+            }
+        }
+        if (spec.getDirectory() != null) {
+            wr.write("cd " + quote(spec.getDirectory()) + " && ");
+        }
+
+        wr.write(quote(spec.getExecutable()));
+        writeQuotedList(wr, spec.getArgumentsAsList());
+
+        if (spec.getStdInput() != null) {
+            wr.write(" < " + quote(spec.getStdInput()));
+        }
+
+        if (multiple) {
+            writeMultiJobPostamble(wr);
+        }
+        else {
+            wr.write('\n');
+            wr.write("/bin/echo $? >" + exitcodefile + '\n');
+        }
+        wr.close();
+    }
+
+    /**
+     * Converts a maxwalltime value of the form "MM", "HH:MM" or "HH:MM:SS"
+     * into the "H:MM" string written after "#BSUB -W". Minutes of 60 or more
+     * are carried into the hours. Leftover seconds round up to the next
+     * minute, so the limit is never shorter than requested. Fields after the
+     * third are ignored.
+     *
+     * @throws IllegalArgumentException if a field is not an integer
+     */
+    private String formatWalltime(String walltime) {
+        String[] parts = walltime.trim().split(":");
+        int hours = 0;
+        int minutes;
+        int seconds = 0;
+        if (parts.length == 1) {
+            minutes = parseAndValidateInt(parts[0].trim(), "maxwalltime");
+        }
+        else {
+            hours = parseAndValidateInt(parts[0].trim(), "maxwalltime");
+            minutes = parseAndValidateInt(parts[1].trim(), "maxwalltime");
+            if (parts.length > 2) {
+                seconds = parseAndValidateInt(parts[2].trim(), "maxwalltime");
+            }
+        }
+        if (seconds > 0) {
+            minutes++;
+        }
+        hours += minutes / 60;
+        minutes = minutes % 60;
+        return hours + ":" + (minutes < 10 ? "0" : "") + minutes;
+    }
+
+    /**
+     * Writes "#BSUB -name value" for every attribute named "lsf.name". When
+     * the value is null or empty, only the flag ("#BSUB -name") is written.
+     */
+    private void writeUserDirectives(JobSpecification spec, Writer wr)
+            throws IOException {
+        for (String a : spec.getAttributeNames()) {
+            if (a == null || !a.startsWith(LSF_ATTR_PREFIX)
+                    || a.length() == LSF_ATTR_PREFIX.length()) {
+                continue;
+            }
+            String option = a.substring(LSF_ATTR_PREFIX.length());
+            Object value = spec.getAttribute(a);
+            wr.write("#BSUB -" + option);
+            if (value != null && String.valueOf(value).length() > 0) {
+                wr.write(" " + String.valueOf(value));
+            }
+            wr.write('\n');
+        }
+    }
+
+    protected void addAttr(String attrName, String option, List<String> l) {
+        addAttr(attrName, option, l, null);
+    }
+
+    protected void addAttr(String attrName, String option, List<String> l, boolean round) {
+        addAttr(attrName, option, l, null, round);
+    }
+
+    protected void addAttr(String attrName, String option, List<String> l, String defval) {
+        addAttr(attrName, option, l, defval, false);
+    }
+
+    /**
+     * Appends "option value" to l when the attribute is set. Otherwise it
+     * appends "option defval" when defval is non-null. With round set,
+     * numeric values are truncated to an int.
+     */
+    protected void addAttr(String attrName, String option, List<String> l,
+            String defval, boolean round) {
+        Object value = getSpec().getAttribute(attrName);
+        if (value != null) {
+            if (round) {
+                value = round(value);
+            }
+            l.add(option);
+            l.add(String.valueOf(value));
+        }
+        else if (defval != null) {
+            l.add(option);
+            l.add(defval);
+        }
+    }
+
+    /** Truncates a Number to its int value; other objects are returned unchanged. */
+    protected Object round(Object value) {
+        if (value instanceof Number) {
+            return Integer.valueOf(((Number) value).intValue());
+        }
+        return value;
+    }
+
+    /**
+     * Extracts the job ID from the submit command's output, which is
+     * expected to look like "Job &lt;12345&gt; is submitted to queue &lt;q&gt;.".
+     * The ID is taken from the first "Job &lt;...&gt;" token. If that token is
+     * absent, the first "&lt;...&gt;" token is used instead.
+     *
+     * @throws IOException if the output is empty or contains no job ID
+     */
+    protected String parseSubmitCommandOutput(String out) throws IOException {
+        if (out == null || "".equals(out)) {
+            throw new IOException(getProperties().getSubmitCommandName()
+                + " returned an empty job ID");
+        }
+        int open = out.indexOf("Job <");
+        // Position of the '<' itself
+        open = (open == -1) ? out.indexOf('<') : open + "Job ".length();
+        int close = (open == -1) ? -1 : out.indexOf('>', open + 1);
+        String id = (close == -1) ? "" : out.substring(open + 1, close).trim();
+        if (id.length() == 0) {
+            throw new IOException("Could not find a job ID in the output of "
+                + getProperties().getSubmitCommandName() + ": " + out);
+        }
+        return id;
+    }
+
+    /**
+     * Writes the job script to the given file. Returns a bash command line
+     * that runs the submit command with the script on standard input.
+     */
+    protected String[] buildCommandLine(File jobdir, File script,
+            String exitcode, String stdout, String stderr) throws IOException {
+        Writer wr = new BufferedWriter(new FileWriter(script));
+        try {
+            writeScript(wr, exitcode, stdout, stderr);
+        }
+        finally {
+            // writeScript closes the writer on success (close is idempotent);
+            // this makes sure it is also closed when writeScript fails.
+            wr.close();
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug("Wrote " + getName() + " script to " + script);
+        }
+
+        String[] cmdline = { "/bin/bash", "-c",
+            getProperties().getSubmitCommand() + " < "
+                + singleQuote(script.getAbsolutePath()) };
+        return cmdline;
+    }
+
+    /** Wraps s in single quotes for bash, escaping embedded single quotes. */
+    private static String singleQuote(String s) {
+        return "'" + s.replace("'", "'\\''") + "'";
+    }
+
+    /**
+     * Quotes a string for the job script. Strings containing a space are
+     * wrapped in double quotes. Double quotes and backslashes are
+     * backslash-escaped, and the empty string becomes "" so that it is kept
+     * as an argument.
+     */
+    @Override
+    protected String quote(String s) {
+        if (s.length() == 0) {
+            return "\"\"";
+        }
+        boolean quotes = s.indexOf(' ') != -1;
+        StringBuilder sb = new StringBuilder(s.length() + 2);
+        if (quotes) {
+            sb.append('"');
+        }
+        for (int i = 0; i < s.length(); i++) {
+            char c = s.charAt(i);
+            if (c == '"' || c == '\\') {
+                // Escape the character, then keep copying the rest of s
+                sb.append('\\');
+            }
+            sb.append(c);
+        }
+        if (quotes) {
+            sb.append('"');
+        }
+        return sb.toString();
+    }
+
+    /** Runs the inherited cleanup, then deletes the job's stdout and stderr files. */
+    @Override
+    protected void cleanup() {
+        super.cleanup();
+        new File(getStdout()).delete();
+        new File(getStderr()).delete();
+    }
+
+    /** Single poller shared by all LSF executors; guarded by LSFExecutor.class. */
+    private static AbstractQueuePoller poller;
+
+    /** Returns the shared queue poller, creating and starting it on first use. */
+    @Override
+    protected AbstractQueuePoller getQueuePoller() {
+        synchronized (LSFExecutor.class) {
+            if (poller == null) {
+                poller = new QueuePoller(getProperties());
+                poller.start();
+            }
+            return poller;
+        }
+    }
+}
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/Properties.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/Properties.java (revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/Properties.java (revision 3538)
@@ -0,0 +1,49 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+package org.globus.cog.abstraction.impl.scheduler.lsf;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractProperties;
+
+/**
+ * Settings for the LSF provider. A single shared instance is created on
+ * first use and loaded from {@link #PROPERTIES}. Defaults: poll every 5
+ * seconds, and use bsub / bjobs / bkill as the submit, poll and remove
+ * commands.
+ */
+public class Properties extends AbstractProperties {
+    private static final long serialVersionUID = 1L;
+
+    /** Name passed to load() for the shared instance. */
+    public static final String PROPERTIES = "provider-lsf.properties";
+
+    // Default command names
+    public static final String BSUB = "bsub";
+    public static final String BJOBS = "bjobs";
+    public static final String BKILL = "bkill";
+
+    private static Properties properties;
+
+    /** Returns the shared instance, creating and loading it on first call. */
+    public static synchronized Properties getProperties() {
+        if (properties != null) {
+            return properties;
+        }
+        properties = new Properties();
+        properties.load(PROPERTIES);
+        return properties;
+    }
+
+    protected void setDefaults() {
+        setPollInterval(5);
+        setSubmitCommand(BSUB);
+        setPollCommand(BJOBS);
+        setRemoveCommand(BKILL);
+    }
+
+    public String getPollCommandName() {
+        return BJOBS;
+    }
+
+    public String getRemoveCommandName() {
+        return BKILL;
+    }
+
+    public String getSubmitCommandName() {
+        return BSUB;
+    }
+}
Index: modules/provider-localscheduler/etc/provider-lsf.properties
===================================================================
--- modules/provider-localscheduler/etc/provider-lsf.properties (revision 0)
+++ modules/provider-localscheduler/etc/provider-lsf.properties (revision 3538)
@@ -0,0 +1,21 @@
+#
+# The interval, in seconds, at which the provider will poll the LSF
+# queue for status updates. There is at most one poll thread per JVM,
+# which is shared by all the jobs submitted through the LSF provider.
+#
+poll.interval=5
+
+#
+# The path to bsub. The default assumes that bsub is in PATH
+#
+bsub=bsub
+
+#
+# The path to bjobs. The default assumes that bjobs is in PATH
+#
+bjobs=bjobs
+
+#
+# The path to bkill. The default assumes that bkill is in PATH
+#
+bkill=bkill
More information about the Swift-commit
mailing list