[Swift-commit] cog r3538

swift at ci.uchicago.edu swift at ci.uchicago.edu
Wed Jan 2 11:50:05 CST 2013


------------------------------------------------------------------------
r3538 | davidkelly999 | 2013-01-02 11:48:23 -0600 (Wed, 02 Jan 2013) | 2 lines

LSF provider 

------------------------------------------------------------------------
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/QueuePoller.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/QueuePoller.java	(revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/QueuePoller.java	(revision 3538)
@@ -0,0 +1,148 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+package org.globus.cog.abstraction.impl.scheduler.lsf;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.StringReader;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractProperties;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractQueuePoller;
+import org.globus.cog.abstraction.impl.scheduler.common.Job;
+
+public class QueuePoller extends AbstractQueuePoller {
+	public static final Logger logger = Logger.getLogger(QueuePoller.class);
+	public static final int FULL_LIST_THRESHOLD = 16;
+
+	private Set<String> processed;
+
+	public QueuePoller(AbstractProperties properties) {
+		super("LSF provider queue poller", properties);
+		processed = new HashSet<String>();
+	}
+
+	private static String[] CMDARRAY;
+
+	protected synchronized String[] getCMDArray() {
+		if (CMDARRAY == null) {
+			CMDARRAY = new String[] { getProperties().getPollCommand(), "-a" };
+		}
+		return CMDARRAY;
+	}
+	
+	@Override
+    protected int getError(int ec, String stderr) {
+	    if (ec != 0) {
+    	    BufferedReader sr = new BufferedReader(new StringReader(stderr));
+    	    try {
+        	    String line = sr.readLine();
+        	    while (line != null) {
+        	        if (!line.contains("is not found")) {
+        	            return ec;
+        	        }
+        	        line = sr.readLine();
+        	    }
+    	    }
+    	    catch (IOException e) {
+    	        // should not occur while reading from a string reader
+    	        e.printStackTrace();
+    	    }
+    	    return 0;
+	    }
+	    else {
+	        return ec;
+	    }
+    }
+	
+	protected void processStdout(InputStream is) throws IOException {
+		BufferedReader br = new BufferedReader(new InputStreamReader(is));
+		String line;
+		String header = br.readLine();
+		if (header == null) {
+			logger.warn("Failed to read bjobs header");
+			return;
+		}
+		int jobIDIndex = header.indexOf("JOBID");
+		int stateIndex = header.indexOf("STAT");
+		int locationIndex = header.indexOf("QUEUE");
+		
+		if (jobIDIndex == -1 || stateIndex == -1 || locationIndex == -1) {
+			throw new IOException("Invalid bjobs header: " + header);
+		}
+		
+		processed.clear();
+		
+		do {
+			line = br.readLine();
+			if (line != null) {
+				String jobid = parseToWhitespace(line, jobIDIndex);
+				String state = parseToWhitespace(line, stateIndex);
+				if (jobid == null || jobid.equals("") || state == null
+						|| state.equals("")) {
+					throw new IOException("Failed to parse bjobs line: "
+							+ line);
+				}
+				
+				Job job = getJob(jobid);
+				if (job == null){ continue; }
+				processed.add(jobid);
+				if (state.equals("PEND")) {
+					if (logger.isDebugEnabled()) {
+						logger.debug("Status for " + jobid + " is PEND");
+					}
+					job.setState(Job.STATE_QUEUED);
+				}
+				else if (state.equals("RUN")) {
+					if (logger.isDebugEnabled()) {
+						logger.debug("Status for " + jobid + " is RUN");
+					}
+					job.setState(Job.STATE_RUNNING);
+				}
+				else if (state.equals("DONE")) {
+					if (logger.isDebugEnabled()) {
+						logger.debug("Status for " + jobid + " is DONE");
+					}
+					addDoneJob(job.getJobID());
+				}
+				else if (state.equals("EXIT")) {
+					if(logger.isDebugEnabled()) {
+						logger.debug("Status for " + jobid + " is EXIT");
+					}
+					addDoneJob(job.getJobID());
+				}
+			}
+		} while (line != null);
+		
+		
+		Iterator<Entry<String, Job>> i = getJobs().entrySet().iterator();
+		while (i.hasNext()) {
+			Map.Entry<String, Job> e = i.next();
+			String id = (String) e.getKey();
+			if (!processed.contains(id)) {
+				Job job = (Job) e.getValue();
+				if (logger.isDebugEnabled()) {
+					logger.debug("Status for " + id + " is Done");
+				}
+				job.setState(Job.STATE_DONE);
+				if (job.getState() == Job.STATE_DONE) {
+					addDoneJob(id);
+				}
+			}
+		}
+	}
+	
+	protected void processStderr(InputStream is) throws IOException {
+	}
+}
\ No newline at end of file
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/execution/TaskHandlerImpl.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/execution/TaskHandlerImpl.java	(revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/execution/TaskHandlerImpl.java	(revision 3538)
@@ -0,0 +1,32 @@
+// ----------------------------------------------------------------------
+// This code is developed as part of the Java CoG Kit project
+// The terms of the license can be found at http://www.cogkit.org/license
+// This message may not be removed or altered.
+// ----------------------------------------------------------------------
+
+package org.globus.cog.abstraction.impl.scheduler.lsf.execution;
+
+import org.globus.cog.abstraction.interfaces.DelegatedTaskHandler;
+
+/**
+ *Provides a local LSF <code>TaskHandler</code>
+ *for job submission to the local resource without
+ *any security context.
+ *
+ */
+public class TaskHandlerImpl extends
+		org.globus.cog.abstraction.impl.common.execution.TaskHandlerImpl {
+
+	protected DelegatedTaskHandler newDelegatedTaskHandler() {
+		return new JobSubmissionTaskHandler();
+	}
+
+	protected String getName() {
+		return "LSF";
+	}
+
+    public String toString()
+    {
+        return "TaskHandlerImpl(execution LSF)";
+    }
+}
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/execution/JobSubmissionTaskHandler.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/execution/JobSubmissionTaskHandler.java	(revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/execution/JobSubmissionTaskHandler.java	(revision 3538)
@@ -0,0 +1,12 @@
+package org.globus.cog.abstraction.impl.scheduler.lsf.execution;
+import org.globus.cog.abstraction.impl.scheduler.lsf.LSFExecutor;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractExecutor;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractJobSubmissionTaskHandler;
+import org.globus.cog.abstraction.interfaces.Task;
+
+public class JobSubmissionTaskHandler extends AbstractJobSubmissionTaskHandler {
+	protected AbstractExecutor newExecutor(Task task,
+			AbstractJobSubmissionTaskHandler th) {
+		return new LSFExecutor(task, th);
+	}
+}
\ No newline at end of file
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/LSFExecutor.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/LSFExecutor.java	(revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/LSFExecutor.java	(revision 3538)
@@ -0,0 +1,326 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+package org.globus.cog.abstraction.impl.scheduler.lsf;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.scheduler.lsf.Properties;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractExecutor;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractProperties;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractQueuePoller;
+import org.globus.cog.abstraction.impl.scheduler.common.Job;
+import org.globus.cog.abstraction.impl.scheduler.common.ProcessListener;
+import org.globus.cog.abstraction.interfaces.FileLocation;
+import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.abstraction.interfaces.Task;
+
+public class LSFExecutor extends AbstractExecutor {
+	public static final Logger logger = Logger.getLogger(LSFExecutor.class);
+	private int count = 1;
+
+	public LSFExecutor(Task task, ProcessListener listener) {
+		super(task, listener);
+	}
+	
+	@Override
+	  protected Job createJob(String jobid, String stdout,
+				FileLocation stdOutputLocation, String stderr,
+				FileLocation stdErrorLocation, String exitcode,
+				AbstractExecutor executor) {
+			return new Job(jobid, stdout, stdOutputLocation, stderr,
+					stdErrorLocation, exitcode, executor);
+		}
+	
+	@Override
+	protected String getName() {
+		return "LSF";
+	}
+
+	@Override
+	protected AbstractProperties getProperties() {
+		return Properties.getProperties();
+	}
+
+	void writeHeader(Writer writer)	
+	throws IOException {
+		writer.write("#CoG This script generated by CoG\n");
+		writer.write("#CoG   by class: " + LSFExecutor.class + '\n');
+		writer.write("#CoG   on date: " + new Date() + "\n\n");
+	}
+
+	/** 
+    Write attribute if non-null and non-empty
+    @throws IOException
+	 */
+	protected void writeNonEmptyAttr(String attrName, String arg, 
+	                                 Writer wr)
+	throws IOException {
+		Object value = getSpec().getAttribute(attrName);
+		if (value != null) {
+			String v = String.valueOf(value);
+			if (v.length() > 0 )
+				wr.write("#BSUB " + arg + " " + v + '\n');
+		}
+	}
+	
+	private int parseAndValidateInt(Object obj, String name) {
+	    try {
+	        assert(obj != null);
+	        return Integer.parseInt(obj.toString());
+	    }
+	    catch (NumberFormatException e) {
+	        throw new IllegalArgumentException("Illegal value for " + name + ". Must be an integer.");
+	    }
+	}
+	
+	/**
+	   @return true if this is a multi-core job
+	**/
+	protected boolean writeCountAndPPN(JobSpecification spec,
+	                                   Writer wr)
+	throws IOException {
+	    boolean result = false;
+
+	    Object o;
+
+	    // Number of program invocations
+	    o = getSpec().getAttribute("count");
+	    if (o != null)
+	        count = parseAndValidateInt(o, "count");
+	    if (count != 1)
+	        result = true;
+
+	    wr.write("#BSUB -n " + count + "\n");
+	    return result;
+	}
+
+	protected void writeMultiJobPreamble(Writer wr, String exitcodefile)
+            throws IOException {
+        wr.write("ECF=" + exitcodefile + "\n");
+        wr.write("INDEX=0\n");
+        wr.write("for NODE in $LSB_HOSTS; do\n");
+        wr.write("  echo \"N\" >$ECF.$INDEX\n");
+        wr.write("  ssh $NODE /bin/bash -c \\\" \"");
+    }
+	
+	@Override
+	protected void writeScript(Writer wr, String exitcodefile, String stdout,
+	                           String stderr) 
+	throws IOException {
+		Task task = getTask();
+		JobSpecification spec = getSpec();
+		Properties properties = Properties.getProperties();
+        validate(task);
+        writeHeader(wr);
+        
+		wr.write("#BSUB -L /bin/bash\n");
+		wr.write("#BSUB -J " + task.getName() + '\n');
+		wr.write("#BSUB -o " + quote(stdout) + '\n');
+		wr.write("#BSUB -e " + quote(stderr) + '\n');
+		
+		writeNonEmptyAttr("project", "-P", wr);
+		writeNonEmptyAttr("queue", "-q", wr);
+		boolean multiple = writeCountAndPPN(spec, wr);
+
+        // Convert maxwalltime to HH:MM format
+        int hours=0, minutes=0;
+        String walltime = getSpec().getAttribute("maxwalltime").toString();
+        if(walltime != null) {
+        	String walltimeSplit[] = walltime.split(":");
+        	if(walltimeSplit.length == 1) {
+        		minutes = Integer.valueOf(walltimeSplit[0]);
+        	} else if(walltimeSplit.length > 1) {
+        		hours = Integer.valueOf(walltimeSplit[0]);
+        		minutes = Integer.valueOf(walltimeSplit[1]);
+        		// LSF ignores seconds
+        	}
+        	if(minutes >= 60) {
+                hours = minutes / 60;
+                minutes = minutes % 60;
+        	}
+        	wr.write("#BSUB -W " + hours + ":" + minutes + '\n');
+        }
+
+        if (getSpec().getDirectory() != null)
+        	wr.write("#BSUB -cwd " + getSpec().getDirectory() + '\n');
+		
+		for (String name : spec.getEnvironmentVariableNames()) {
+			wr.write("export ");
+			wr.write(name);
+			wr.write('=');
+			wr.write(quote(spec.getEnvironmentVariable(name)));
+			wr.write('\n');
+		}
+
+		String type = (String) spec.getAttribute("jobType");
+		if (logger.isDebugEnabled())
+			logger.debug("Job type: " + type);
+		if ("multiple".equals(type)) 
+		    multiple = true;
+		else if("single".equals(type))
+		    multiple = false;
+		if (multiple)
+            writeMultiJobPreamble(wr, exitcodefile);
+
+		if (type != null) {
+			String wrapper =
+			    properties.getProperty("wrapper." + type);
+			if (logger.isDebugEnabled()) {
+				logger.debug("Wrapper: " + wrapper);
+			}
+			if (wrapper != null) {
+				wrapper = replaceVars(wrapper);
+				wr.write(wrapper);
+				wr.write(' ');
+			}
+			if (logger.isDebugEnabled()) {
+				logger.debug("Wrapper after variable substitution: " + wrapper);
+			}
+		}
+		if (spec.getDirectory() != null) {
+			wr.write("cd " + quote(spec.getDirectory()) + " && ");
+		}
+
+		wr.write(quote(spec.getExecutable()));
+		writeQuotedList(wr, spec.getArgumentsAsList());
+
+ 		// Handle all LSF attributes specified by the user
+	    for(String a : spec.getAttributeNames()) {
+	    	if(a != null && a.startsWith("lsf.")) {
+	    		String attributeName[] = a.split("lsf.");
+	    		wr.write(attributeName[1] + " = " + spec.getAttribute(a) + '\n');
+	    	}
+	    }
+
+		if (spec.getStdInput() != null) {
+            wr.write(" < " + quote(spec.getStdInput()));
+        }
+		
+		if (multiple) {
+		    writeMultiJobPostamble(wr);
+		} else {
+		    wr.write('\n');
+		    wr.write("/bin/echo $? >" + exitcodefile + '\n');
+		}
+		wr.close();
+	}
+	
+	protected void addAttr(String attrName, String option, List<String> l) {
+		addAttr(attrName, option, l, null);
+	}
+
+	protected void addAttr(String attrName, String option, List<String> l, boolean round) {
+		addAttr(attrName, option, l, null, round);
+	}
+
+	protected void addAttr(String attrName, String option, List<String> l, String defval) {
+		addAttr(attrName, option, l, defval, false);
+	}
+
+	protected void addAttr(String attrName, String option, List<String> l,
+			String defval, boolean round) {
+		Object value = getSpec().getAttribute(attrName);
+		if (value != null) {
+			if (round) {
+				value = round(value);
+			}
+			l.add(option);
+			l.add(String.valueOf(value));
+		}
+		else if (defval != null) {
+			l.add(option);
+			l.add(defval);
+		}
+	}
+
+	protected Object round(Object value) {
+		if (value instanceof Number) {
+			return new Integer(((Number) value).intValue());
+		}
+		else {
+			return value;
+		}
+	}
+
+    protected String parseSubmitCommandOutput(String out) throws IOException {
+        if ("".equals(out)) {
+            throw new IOException(getProperties().getSubmitCommandName()
+                    + " returned an empty job ID");
+        }
+        String outArray[] = out.split(" ");
+        String jobString = outArray[1];
+        jobString = jobString.replaceAll("<", "");
+        jobString = jobString.replaceAll(">", "");
+        return jobString;
+    }
+    
+    protected String[] buildCommandLine(File jobdir, File script,
+            String exitcode, String stdout, String stderr)
+    throws IOException {
+
+        writeScript(new BufferedWriter(new FileWriter(script)), exitcode,
+            stdout, stderr);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Wrote " + getName() + " script to " + script);
+        }
+
+        String[] cmdline = { "/bin/bash", "-c", (getProperties().getSubmitCommand() + " < " + script.getAbsolutePath()) };
+        return cmdline;
+    }
+    
+
+	@Override
+	protected String quote(String s) {
+		boolean quotes = false;
+		if (s.indexOf(' ') != -1) {
+			quotes = true;
+		}
+		StringBuffer sb = new StringBuffer();
+		if (quotes) {
+			sb.append('"');
+		}
+		for (int i = 0; i < s.length(); i++) {
+			char c = s.charAt(i);
+			if (c == '"' || c == '\\') {
+				sb.append('\\');
+				break;
+			}
+			sb.append(c);
+		}
+		if (quotes) {
+			sb.append('"');
+		}
+		return sb.toString();
+	}
+
+	@Override
+	protected void cleanup() {
+		super.cleanup();
+		new File(getStdout()).delete();
+		new File(getStderr()).delete();
+	}
+
+	private static AbstractQueuePoller poller;
+
+	@Override
+	protected AbstractQueuePoller getQueuePoller() {
+		synchronized(LSFExecutor.class) {
+			if (poller == null) {
+				poller = new QueuePoller(getProperties());
+				poller.start();
+			}
+			return poller;
+		}
+	}
+}
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/Properties.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/Properties.java	(revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/Properties.java	(revision 3538)
@@ -0,0 +1,49 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+package org.globus.cog.abstraction.impl.scheduler.lsf;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractProperties;
+
+public class Properties extends AbstractProperties {
+	private static final long serialVersionUID = 1L;
+	public static final String PROPERTIES = "provider-lsf.properties";
+	
+	
+	public static final String BSUB = "bsub";
+	public static final String BJOBS = "bjobs";
+	public static final String BKILL = "bkill";
+
+	private static Properties properties;
+
+	public static synchronized Properties getProperties() {
+		if (properties == null) {
+			properties = new Properties();
+			properties.load(PROPERTIES);
+		}
+		return properties;
+	}
+
+	protected void setDefaults() {
+		setPollInterval(5);
+		setSubmitCommand("bsub");
+		setPollCommand("bjobs");
+		setRemoveCommand("bkill");
+	}
+
+	public String getPollCommandName() {
+		return BJOBS;
+	}
+
+	public String getRemoveCommandName() {
+		return BKILL;
+	}
+
+	public String getSubmitCommandName() {
+		return BSUB;
+	}
+	
+	
+}
Index: modules/provider-localscheduler/etc/provider-lsf.properties
===================================================================
--- modules/provider-localscheduler/etc/provider-lsf.properties	(revision 0)
+++ modules/provider-localscheduler/etc/provider-lsf.properties	(revision 3538)
@@ -0,0 +1,21 @@
+#
+# The interval, in seconds, at which the provider will poll the Cobalt
+# queue for status updates. There is at most one poll thread per JVM,
+# which is shared by all the jobs submitted through the Cobalt provider.
+#
+poll.interval=5
+
+#
+# The path to bsub. The default assumes that bsub is in PATH
+#
+bsub=bsub
+
+#
+# The path to bjobs. The default assumes that bjobs is in PATH
+#
+bjobs=bjobs
+
+# 
+# The path to bkill. The default assumes that bkill is in PATH
+#
+bkill=bkill



More information about the Swift-commit mailing list