[Swift-commit] cog r3498

swift at ci.uchicago.edu swift at ci.uchicago.edu
Wed Nov 7 15:10:09 CST 2012


------------------------------------------------------------------------
r3498 | davidkelly999 | 2012-11-07 15:05:14 -0600 (Wed, 07 Nov 2012) | 2 lines

Initial commit of LSF provider

------------------------------------------------------------------------
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/QueuePoller.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/QueuePoller.java	(revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/QueuePoller.java	(revision 3498)
@@ -0,0 +1,175 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+package org.globus.cog.abstraction.impl.scheduler.lsf;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.StringReader;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractProperties;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractQueuePoller;
+import org.globus.cog.abstraction.impl.scheduler.common.Job;
+
+public class QueuePoller extends AbstractQueuePoller {
+	public static final Logger logger = Logger.getLogger(QueuePoller.class);
+	public static final int FULL_LIST_THRESHOLD = 16;
+
+	private Set processed;
+
+	/**
+	 * Creates the queue poller used by the LSF provider.
+	 *
+	 * @param properties provider settings passed through to the base poller
+	 */
+	public QueuePoller(AbstractProperties properties) {
+		// This class lives in the lsf package, so the poller's name should
+		// say LSF rather than PBS.
+		super("LSF provider queue poller", properties);
+		processed = new HashSet();
+	}
+
+	private static String[] CMDARRAY;
+
+	/**
+	 * Builds the command line used to poll the queue.
+	 * <p>
+	 * While at most {@link #FULL_LIST_THRESHOLD} jobs are tracked, the command
+	 * names every job ID explicitly. Above that, a shared two-element command
+	 * (poll command plus "-f") is created on first use and returned as is.
+	 *
+	 * @return the poll command, "-f" and, for small job sets, the job IDs
+	 */
+	protected synchronized String[] getCMDArray() {
+		if (getJobs().size() > FULL_LIST_THRESHOLD) {
+			// Too many jobs to list individually: query everything.
+			if (CMDARRAY == null) {
+				CMDARRAY = new String[] { getProperties().getPollCommand(), "-f" };
+			}
+			return CMDARRAY;
+		}
+
+		String[] command = new String[getJobs().size() + 2];
+		command[0] = getProperties().getPollCommand();
+		command[1] = "-f";
+		int next = 2;
+		for (Job job : getJobs().values()) {
+			command[next] = job.getJobID();
+			next++;
+		}
+		return command;
+	}
+
+	/**
+	 * Decides whether a failed poll command should be reported as an error.
+	 * <p>
+	 * A non-zero exit code is downgraded to 0 when every line of
+	 * {@code stderr} contains "Unknown Job Id" (this includes an empty
+	 * {@code stderr}). Any other line causes the original exit code to be
+	 * returned.
+	 *
+	 * @param ec     exit code of the poll command
+	 * @param stderr captured standard error of the poll command; may be null
+	 * @return 0 if the failure can be ignored, otherwise {@code ec}
+	 */
+	@Override
+	protected int getError(int ec, String stderr) {
+		if (ec == 0) {
+			return ec;
+		}
+		if (stderr == null) {
+			// Nothing to inspect, so the failure cannot be attributed to
+			// unknown job IDs; report it instead of failing with an NPE.
+			return ec;
+		}
+		BufferedReader sr = new BufferedReader(new StringReader(stderr));
+		try {
+			String line;
+			while ((line = sr.readLine()) != null) {
+				if (!line.contains("Unknown Job Id")) {
+					return ec;
+				}
+			}
+		}
+		catch (IOException e) {
+			// should not occur while reading from a string reader
+			logger.warn("Unexpected I/O error while reading poll command stderr", e);
+		}
+		return 0;
+	}
+
+    /**
+     * Parses the poll command's output and updates the tracked jobs.
+     * <p>
+     * The output is read line by line. A "Job Id: " line selects the job that
+     * the following "job_state = " and "exit_status = " lines apply to. Once
+     * the stream is exhausted, every tracked job whose ID did not appear in
+     * the output is set to the done state.
+     *
+     * @param is standard output of the poll command
+     * @throws IOException if reading the stream fails
+     */
+    protected void processStdout(InputStream is) throws IOException {
+        BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+        processed.clear();
+        String currentJobID = null;
+        Job currentJob = null;
+        String line;
+        while ((line = reader.readLine()) != null) {
+            try {
+                line = line.trim();
+                if (line.startsWith("Job Id: ")) {
+                    // Start of a new job record.
+                    currentJobID = line.substring("Job Id: ".length());
+                    processed.add(currentJobID);
+                    currentJob = getJob(currentJobID);
+                }
+                else if (currentJob != null) {
+                    if (line.startsWith("job_state = ")) {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Status line: " + line);
+                        }
+                        char state = line.substring("job_state = ".length()).charAt(0);
+                        boolean recognized = state == 'Q' || state == 'R' || state == 'C';
+                        if (recognized && logger.isDebugEnabled()) {
+                            logger.debug("Status for " + currentJobID + " is " + state);
+                        }
+                        if (state == 'Q') {
+                            currentJob.setState(Job.STATE_QUEUED);
+                        }
+                        else if (state == 'R') {
+                            currentJob.setState(Job.STATE_RUNNING);
+                        }
+                        else if (state == 'C') {
+                            // for sites where keep_completed is there, don't
+                            // wait for the job to be removed from the queue
+                            addDoneJob(currentJob.getJobID());
+                        }
+                    }
+                    else if (line.startsWith("exit_status = ")) {
+                        try {
+                            String value = line.substring("exit_status = ".length()).trim();
+                            currentJob.setExitcode(Integer.parseInt(value));
+                        }
+                        catch (Exception e) {
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("Could not parse exit_status", e);
+                            }
+                        }
+                    }
+                }
+            }
+            catch (Exception e) {
+                // A bad line must not abort the whole poll; log it and move on.
+                logger.warn("Exception caught while handling "
+                        + getProperties().getPollCommandName()
+                        + " output: " + line, e);
+            }
+        }
+
+        // Jobs that were not mentioned in the output are treated as done.
+        for (Iterator it = getJobs().entrySet().iterator(); it.hasNext();) {
+            Map.Entry entry = (Map.Entry) it.next();
+            String id = (String) entry.getKey();
+            if (processed.contains(id)) {
+                continue;
+            }
+            Job job = (Job) entry.getValue();
+            if (logger.isDebugEnabled()) {
+                logger.debug("Status for " + id + " is Done");
+            }
+            job.setState(Job.STATE_DONE);
+            // Only report the job once it actually reports the done state.
+            if (job.getState() == Job.STATE_DONE) {
+                addDoneJob(id);
+            }
+        }
+    }
+
+	/**
+	 * Does nothing: the body is empty, so {@code is} is not read here.
+	 *
+	 * @param is standard error of the poll command (unused)
+	 * @throws IOException declared by the signature; never thrown by this body
+	 */
+	protected void processStderr(InputStream is) throws IOException {
+	}
+}
+
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/LSFExecutor.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/LSFExecutor.java	(revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/LSFExecutor.java	(revision 3498)
@@ -0,0 +1,381 @@
+package org.globus.cog.abstraction.impl.scheduler.lsf;
+
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+import java.io.IOException;
+import java.io.Writer;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.common.execution.WallTime;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractExecutor;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractProperties;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractQueuePoller;
+import org.globus.cog.abstraction.impl.scheduler.common.Job;
+import org.globus.cog.abstraction.impl.scheduler.common.ProcessListener;
+import org.globus.cog.abstraction.interfaces.FileLocation;
+import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.abstraction.interfaces.Task;
+
+public class LSFExecutor extends AbstractExecutor {
+	public static final Logger logger = Logger.getLogger(LSFExecutor.class);
+
+
+	/**
+	   Number of program invocations
+	 */
+	int count = 1;
+
+	/**
+	   PBS processes-per-node
+	 */
+	int ppn = 1;
+
+	/**
+     PBS mppdepth: number of available threads per node
+	 */
+	int depth = 1;
+
+	/**
+	   Unique number for automatic task names
+	*/
+	private static int unique = 0; 
+	
+	/**
+	 * Creates an executor for the given task; both arguments are passed
+	 * unchanged to the superclass constructor.
+	 *
+	 * @param task     the task to run
+	 * @param listener listener handed to the superclass
+	 */
+	public LSFExecutor(Task task, ProcessListener listener) {
+		super(task, listener);
+	}
+
+	private static NumberFormat IDF = new DecimalFormat("000000");
+	
+	/**
+	 * Makes sure the task carries a usable job name before the script is
+	 * written.
+	 * <p>
+	 * A task without a name receives a generated one of the form
+	 * "cog-NNNNNN"; a name longer than 15 characters is truncated.
+	 * The job name is limited to 15 characters:
+	 * http://doesciencegrid.org/public/pbs/qsub.html
+	 *
+	 * @param task the task whose name is checked and, if needed, updated
+	 */
+	protected void validate(Task task) {
+		String name = task.getName();
+		if (name == null) {
+			synchronized (LSFExecutor.class) {
+				// The lock guards the counter and the shared formatter:
+				// DecimalFormat instances are not safe for concurrent use.
+				name = "cog-" + IDF.format(unique++);
+			}
+			if (logger.isDebugEnabled()) {
+				logger.debug("PBS name: for: " + task.getIdentity() +
+						" is: " + name);
+			}
+			// Store the generated name; otherwise the script's job name
+			// line would be written with a null task name.
+			task.setName(name);
+		}
+		else if (name.length() > 15) {
+			task.setName(name.substring(0, 15));
+		}
+	}
+	
+	/**
+	 * Emits a "#PBS" directive for a job specification attribute, provided
+	 * the attribute is set.
+	 *
+	 * @param attrName name of the attribute to look up in the job specification
+	 * @param arg      option text placed between "#PBS " and the value
+	 * @param wr       script writer
+	 * @throws IOException if writing fails
+	 */
+	protected void writeAttr(String attrName, String arg, Writer wr)
+			throws IOException {
+		Object attribute = getSpec().getAttribute(attrName);
+		if (attribute == null) {
+			return;
+		}
+		String directive = "#PBS " + arg + attribute + '\n';
+		wr.write(directive);
+	}
+
+	/**
+	 * Emits a "#PBS" directive for a job specification attribute, provided
+	 * the attribute is set and its string form is not empty.
+	 *
+	 * @param attrName name of the attribute to look up in the job specification
+	 * @param arg      option text placed between "#PBS " and the value
+	 * @param wr       script writer
+	 * @throws IOException if writing fails
+	 */
+	protected void writeNonEmptyAttr(String attrName, String arg,
+			Writer wr) throws IOException {
+		Object attribute = getSpec().getAttribute(attrName);
+		if (attribute == null) {
+			return;
+		}
+		String text = String.valueOf(attribute);
+		if (text.length() == 0) {
+			return;
+		}
+		wr.write("#PBS " + arg + text + '\n');
+	}
+	
+	/**
+	 * Emits the "#PBS -l walltime=" directive when the "maxwalltime"
+	 * attribute is set, normalizing the value with the "pbs-native" format.
+	 *
+	 * @param wr script writer
+	 * @throws IOException if writing fails
+	 */
+	protected void writeWallTime(Writer wr) throws IOException {
+		Object limit = getSpec().getAttribute("maxwalltime");
+		if (limit == null) {
+			return;
+		}
+		String directive = "#PBS -l walltime="
+				+ WallTime.normalize(limit.toString(), "pbs-native")
+				+ '\n';
+		wr.write(directive);
+	}
+
+	/**
+	 * Converts an attribute value to an int.
+	 *
+	 * @param obj  the attribute value; callers pass a non-null value
+	 * @param name attribute name used in the error message
+	 * @return the parsed integer
+	 * @throws IllegalArgumentException if the value's string form is not an
+	 *         integer; the underlying NumberFormatException is kept as the cause
+	 */
+	private int parseAndValidateInt(Object obj, String name) {
+		try {
+			assert(obj != null);
+			return Integer.parseInt(obj.toString());
+		}
+		catch (NumberFormatException e) {
+			// Keep the cause so the offending input shows up in stack traces.
+			throw new IllegalArgumentException("Illegal value for " + name + ". Must be an integer.", e);
+		}
+	}
+
+	/**
+	 * Reads the job size settings from the job specification, stores them
+	 * in the {@code count}, {@code ppn} and {@code depth} fields, and writes
+	 * the matching "#PBS -l" line.
+	 * <p>
+	 * Attributes consulted:
+	 * <ul>
+	 * <li>count: number of program invocations (field default 1)</li>
+	 * <li>ppn: processes per node (field default 1)</li>
+	 * <li>depth: threads per node (field default 1)</li>
+	 * <li>lsf.mpp: when present, write mppwidth/mppnppn/mppdepth instead of
+	 * nodes/ppn</li>
+	 * <li>lsf.properties: extra text appended after a ":"</li>
+	 * </ul>
+	 * Note that the semantics differ in mpp mode: mppwidth is the total
+	 * number of cores while nodes is the number of nodes.
+	 * <p>
+	 * http://www.clusterresources.com/torquedocs/2.1jobsubmission.shtml
+	 *
+	 * @param spec job specification to read ppn, depth and lsf.mpp from
+	 * @param wr   script writer
+	 * @return true if count is not 1
+	 * @throws IOException if writing fails
+	 */
+	protected boolean writeCountAndPPN(JobSpecification spec,
+			Writer wr) throws IOException {
+		// Number of program invocations
+		Object countAttr = getSpec().getAttribute("count");
+		if (countAttr != null) {
+			count = parseAndValidateInt(countAttr, "count");
+		}
+
+		Object ppnAttr = spec.getAttribute("ppn");
+		if (ppnAttr != null) {
+			ppn = parseAndValidateInt(ppnAttr, "ppn");
+		}
+
+		Object depthAttr = spec.getAttribute("depth");
+		if (depthAttr != null) {
+			depth = parseAndValidateInt(depthAttr, "depth");
+		}
+
+		String extraProperties =
+			(String) getSpec().getAttribute("lsf.properties");
+		boolean useMpp = spec.getAttribute("lsf.mpp") != null;
+
+		StringBuilder directive = new StringBuilder(512);
+		directive.append("#PBS -l ");
+		if (useMpp) {
+			directive.append("mppwidth=").append(count)
+				.append(",")
+				.append("mppnppn=").append(ppn)
+				.append(",")
+				.append("mppdepth=").append(depth);
+		}
+		else {
+			directive.append("nodes=").append(count)
+				.append(":")
+				.append("ppn=").append(ppn);
+		}
+		if (extraProperties != null && extraProperties.length() > 0) {
+			directive.append(":").append(extraProperties);
+		}
+		directive.append('\n');
+
+		wr.write(directive.toString());
+
+		return count != 1;
+	}
+
+	/*
+	private boolean parseAndValidateBool(Object obj, String name)
+	{
+	    try {
+	        return Boolean.parseBoolean(obj.toString());
+	    }
+	    catch (NumberFormatException e) {
+	        throw new IllegalArgumentException
+	        ("Illegal value for " + name + ". Must be true/false.");
+	    }
+	}
+	*/
+
+	/**
+	 * Writes the complete submit script for this executor's task to
+	 * {@code wr} and then closes the writer.
+	 * <p>
+	 * In order, the script contains: the CoG header, the fixed "#PBS"
+	 * directives (shell, job name, mail option), the optional project, the
+	 * node/processor request, the wall time, the optional queue, the
+	 * stdout/stderr paths, one "export" line per environment variable, an
+	 * optional "#PBS -v" line, an optional "#PBS -l" resource line, and
+	 * finally the command line (optional wrapper, optional "cd", optional
+	 * aprun prefix, executable, arguments, optional stdin redirect). The
+	 * command line is followed either by the multi-job postamble or by a
+	 * line that echoes "$?" into {@code exitcodefile}.
+	 *
+	 * @param wr           destination for the script; closed on normal return
+	 * @param exitcodefile path the script writes the exit code to
+	 * @param stdout       path passed to "#PBS -o"
+	 * @param stderr       path passed to "#PBS -e"
+	 * @throws IOException if writing to {@code wr} fails
+	 */
+	@Override
+	protected void writeScript(Writer wr, String exitcodefile, String stdout,
+	                           String stderr) 
+	throws IOException {
+		Task task = getTask();
+		JobSpecification spec = getSpec();
+		Properties properties = Properties.getProperties();
+
+      getSpec().unpackProviderAttributes();
+
+      validate(task);
+      writeHeader(wr);
+      
+		wr.write("#PBS -S /bin/bash\n");
+		wr.write("#PBS -N " + task.getName() + '\n');
+		wr.write("#PBS -m n\n");
+		writeNonEmptyAttr("project", "-A ", wr);
+		// The return value says whether count != 1; it may be overridden below.
+		boolean multiple = writeCountAndPPN(spec, wr);
+		writeWallTime(wr);
+		writeNonEmptyAttr("queue", "-q ", wr);
+		wr.write("#PBS -o " + quote(stdout) + '\n');
+		wr.write("#PBS -e " + quote(stderr) + '\n');
+
+		for (String name : spec.getEnvironmentVariableNames()) {
+			// "export" is necessary on the Cray XT5 Crow
+			wr.write("export ");
+			wr.write(name);
+			wr.write('=');
+			wr.write(quote(spec.getEnvironmentVariable(name)));
+			wr.write('\n');
+		}
+
+		// NOTE(review): this directive and the resource list directive below
+		// are written after the export lines. PBS-style qsub stops scanning
+		// for directives at the first executable line - confirm that the
+		// submit command used here still honours them.
+		if (spec.getEnvironmentVariableNames().size() > 0) {
+		    wr.write("#PBS -v " + makeList(spec.getEnvironmentVariableNames()) + '\n');
+		}
+
+		String resources =
+		    (String) spec.getAttribute("lsf.resource_list");
+		if (resources != null && resources.length() > 0) {
+		    if (logger.isDebugEnabled())
+		        logger.debug("lsf.resource_list: " + resources);
+		    wr.write("#PBS -l " + resources + '\n');
+		}
+
+		// aprun option specifically for Cray Beagle, Franklin
+		boolean aprun = false;
+		if (spec.getAttribute("lsf.aprun") != null)
+		    aprun = true;
+
+		// An explicit jobType of "multiple" or "single" overrides the
+		// count-based decision; aprun always disables multi-job mode.
+		String type = (String) spec.getAttribute("jobType");
+		if (logger.isDebugEnabled())
+			logger.debug("Job type: " + type);
+		if ("multiple".equals(type)) 
+		    multiple = true;
+		else if("single".equals(type))
+		    multiple = false;
+		if (aprun) 
+			multiple = false;
+		if (multiple)
+          writeMultiJobPreamble(wr, exitcodefile);
+
+		// A "wrapper.<jobType>" property, if defined, is written in front of
+		// the command after variable substitution.
+		if (type != null) {
+			String wrapper =
+			    properties.getProperty("wrapper." + type);
+			if (logger.isDebugEnabled()) {
+				logger.debug("Wrapper: " + wrapper);
+			}
+			if (wrapper != null) {
+				wrapper = replaceVars(wrapper);
+				wr.write(wrapper);
+				wr.write(' ');
+			}
+			if (logger.isDebugEnabled()) {
+				logger.debug("Wrapper after variable substitution: " + wrapper);
+			}
+		}
+		if (spec.getDirectory() != null) {
+			wr.write("cd " + quote(spec.getDirectory()) + " && ");
+		}
+
+		// With aprun the executable and its arguments are wrapped in a
+		// single-quoted /bin/sh -c command; the quote is closed below.
+		if (aprun)
+		    wr.write("aprun -n " + count + " -N 1 -cc none -d " +
+		             depth + " -F exclusive /bin/sh -c '");
+
+		wr.write(quote(spec.getExecutable()));
+		writeQuotedList(wr, spec.getArgumentsAsList());
+
+		if (aprun)
+          wr.write("'");
+
+		if (spec.getStdInput() != null) {
+          wr.write(" < " + quote(spec.getStdInput()));
+      }
+		if (multiple) {
+		    writeMultiJobPostamble(wr);
+		}
+		else {
+		    wr.write('\n');
+		    wr.write("/bin/echo $? >" + exitcodefile + '\n');
+		}
+		// NOTE(review): not reached if a write above throws; confirm that the
+		// caller closes wr on failure.
+		wr.close();
+	}
+
+	/**
+	 * Writes the three "#CoG" comment lines that open every generated
+	 * script: a banner, the generating class and the current date,
+	 * followed by a blank line.
+	 *
+	 * @param writer script writer
+	 * @throws IOException if writing fails
+	 */
+	void writeHeader(Writer writer) throws IOException {
+		String banner = "#CoG This script generated by CoG\n";
+		writer.write(banner);
+
+		String generator = "#CoG   by class: " + LSFExecutor.class + '\n';
+		writer.write(generator);
+
+		String timestamp = "#CoG   on date: " + new Date() + "\n\n";
+		writer.write(timestamp);
+	}
+	
+	
+	/**
+	 * Joins the given names into one string, separated by ", ".
+	 *
+	 * @param names the names to join, in iteration order
+	 * @return the joined names; empty if the collection is empty
+	 */
+	private String makeList(Collection<String> names) {
+		StringBuilder joined = new StringBuilder();
+		String separator = "";
+		for (String name : names) {
+			joined.append(separator);
+			joined.append(name);
+			// Every element after the first is preceded by the separator.
+			separator = ", ";
+		}
+		return joined.toString();
+	}
+
+	/**
+	 * Writes the shell fragment that opens the per-node loop of a multi-job
+	 * script: it reads the node list from $PBS_NODEFILE, stores the exit
+	 * code file name in ECF, and starts an "ssh $NODE /bin/bash -c" command
+	 * whose quoted argument is left open for the caller to continue.
+	 *
+	 * @param wr           script writer
+	 * @param exitcodefile value assigned to the ECF shell variable
+	 * @throws IOException if writing fails
+	 */
+	protected void writeMultiJobPreamble(Writer wr, String exitcodefile)
+			throws IOException {
+		String[] fragments = {
+			"NODES=`cat $PBS_NODEFILE`\n",
+			"ECF=" + exitcodefile + "\n",
+			"INDEX=0\n",
+			"for NODE in $NODES; do\n",
+			"  echo \"N\" >$ECF.$INDEX\n",
+			"  ssh $NODE /bin/bash -c \\\" \""
+		};
+		for (int k = 0; k < fragments.length; k++) {
+			wr.write(fragments[k]);
+		}
+	}
+
+
+	/**
+	 * Returns the fixed name "PBS".
+	 * NOTE(review): this class is the LSF executor, so "PBS" looks like a
+	 * leftover label - confirm how the superclass uses this value before
+	 * changing it.
+	 */
+	@Override
+protected String getName() {
+		return "PBS";
+	}
+
+	/**
+	 * Returns the shared provider settings obtained from
+	 * {@code Properties.getProperties()}.
+	 */
+	@Override
+protected AbstractProperties getProperties() {
+		return Properties.getProperties();
+	}
+
+	/**
+	 * Creates a plain {@code Job} from the given arguments, which are passed
+	 * to the {@code Job} constructor unchanged and in the same order.
+	 */
+	@Override
+protected Job createJob(String jobid, String stdout,
+			FileLocation stdOutputLocation, String stderr,
+			FileLocation stdErrorLocation, String exitcode,
+			AbstractExecutor executor) {
+		return new Job(jobid, stdout, stdOutputLocation, stderr,
+				stdErrorLocation, exitcode, executor);
+	}
+
+	private static QueuePoller poller;
+
+	/**
+	 * Returns the queue poller shared by all executors of this class,
+	 * creating and starting it on first use. Access to the shared instance
+	 * is guarded by the {@code LSFExecutor.class} lock.
+	 *
+	 * @return the shared queue poller
+	 */
+	@Override
+	protected AbstractQueuePoller getQueuePoller() {
+		synchronized (LSFExecutor.class) {
+			if (poller != null) {
+				return poller;
+			}
+			poller = new QueuePoller(getProperties());
+			poller.start();
+			return poller;
+		}
+	}
+}
Index: modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/Properties.java
===================================================================
--- modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/Properties.java	(revision 0)
+++ modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/lsf/Properties.java	(revision 3498)
@@ -0,0 +1,59 @@
+package org.globus.cog.abstraction.impl.scheduler.lsf;
+
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+//import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.scheduler.common.AbstractProperties;
+
+/**
+ * Settings for the LSF provider, loaded from the resource named by
+ * {@link #PROPERTIES} and shared through {@link #getProperties()}.
+ */
+public class Properties extends AbstractProperties {
+
+  private static final long serialVersionUID = 1L;
+
+  // private static Logger logger = 
+  // Logger.getLogger(Properties.class);
+
+	// Name of the properties resource passed to load().
+	public static final String PROPERTIES = "provider-lsf.properties";
+	
+	// NOTE(review): these look like property key names (the bundled
+	// provider-lsf.properties defines poll.interval, qsub, qstat, qdel and
+	// use.mppwidth) - confirm against AbstractProperties.
+	public static final String POLL_INTERVAL = "poll.interval";
+	public static final String QSUB = "qsub";
+	public static final String QSTAT = "qstat";
+	public static final String QDEL = "qdel";
+	public static final String USE_MPPWIDTH = "use.mppwidth";
+
+	// Lazily created shared instance; see getProperties().
+	private static Properties properties;
+
+	/**
+	 * Returns the shared instance, creating it and loading
+	 * {@link #PROPERTIES} on the first call. The method is synchronized.
+	 */
+	public static synchronized Properties getProperties() {
+		if (properties == null) {
+			properties = new Properties();
+			properties.load(PROPERTIES);
+		}
+		return properties;
+	}
+	
+	/**
+	 * Sets the built-in defaults: a poll interval of 5 and the commands
+	 * "qsub", "qstat" and "qdel" for submit, poll and remove.
+	 */
+	protected void setDefaults() {
+		setPollInterval(5);
+		setSubmitCommand("qsub");
+		setPollCommand("qstat");
+		setRemoveCommand("qdel");
+	}
+
+
+	/** Returns {@link #QSTAT} as the name of the poll command. */
+	public String getPollCommandName() {
+		return QSTAT;
+	}
+
+
+	/** Returns {@link #QDEL} as the name of the remove command. */
+	public String getRemoveCommandName() {
+		return QDEL;
+	}
+
+
+	/** Returns {@link #QSUB} as the name of the submit command. */
+	public String getSubmitCommandName() {
+		return QSUB;
+	}
+}
+
Index: modules/provider-localscheduler/resources/cog-provider.properties
===================================================================
--- modules/provider-localscheduler/resources/cog-provider.properties	(revision 3497)
+++ modules/provider-localscheduler/resources/cog-provider.properties	(working copy)
@@ -29,3 +29,7 @@
 executionTaskHandler=org.globus.cog.abstraction.impl.scheduler.slurm.execution.TaskHandlerImpl
 securityContext=org.globus.cog.abstraction.impl.common.task.SecurityContextImpl
 
+provider=lsf
+sandbox=false
+executionTaskHandler=org.globus.cog.abstraction.impl.scheduler.lsf.execution.TaskHandlerImpl
+securityContext=org.globus.cog.abstraction.impl.common.task.SecurityContextImpl
Index: modules/provider-localscheduler/etc/provider-lsf.properties
===================================================================
--- modules/provider-localscheduler/etc/provider-lsf.properties	(revision 0)
+++ modules/provider-localscheduler/etc/provider-lsf.properties	(revision 3498)
@@ -0,0 +1,33 @@
+#
+# The interval, in seconds, at which the provider will poll the LSF
+# queue for status updates. There is at most one poll thread per JVM,
+# which is shared by all the jobs submitted through the LSF provider.
+#
+poll.interval=5
+
+#
+# The path to qsub.  The default assumes that qsub is in PATH
+#
+qsub=lsf-qsub
+
+#
+# The path to qstat. The default assumes that qstat is in PATH
+#
+qstat=lsf-qstat
+
+#
+# The path to qdel. The default assumes that qdel is in PATH
+#
+qdel=lsf-qdel
+
+# If true, use "#PBS -l mppwidth=" instead of "#PBS -l nodes="
+#  in PBS script
+use.mppwidth=false
+
+# If the jobType attribute is specified, then the LSF provider
+# will look for a property named "wrapper.<jobType>" and prepend
+# that to the executable line in the generated submit script. It will
+# also substitute the values of attributes in the job specification,
+# using the "$attrName" notation.
+#
+wrapper.mpi=mpirun -np $count



More information about the Swift-commit mailing list