[Swift-commit] Cog update
swift at ci.uchicago.edu
Sat Apr 28 20:35:03 CDT 2012
------------------------------------------------------------------------
r3385 | hategan | 2012-04-28 20:34:09 -0500 (Sat, 28 Apr 2012) | 1 line
allow getting jobs from the holding queue if the fast queue is empty (instead of waiting for the next plan to move jobs from holding to the fast queue); also, the holding queue must be sorted
------------------------------------------------------------------------
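[Editor's note] The behavioral change is concentrated in BlockQueueProcessor.request(): when the fast (queued) set has nothing suitable, a worker now pulls a job directly from the holding queue instead of idling until the next planning pass, which is why holding was switched from an ArrayList to a SortedJobSet. The following is a minimal, self-contained sketch of that idea only, not the coaster code itself; SimpleSortedJobQueue, TwoLevelQueue, FallbackDemo and the use of plain Strings as jobs are hypothetical simplifications.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for SortedJobSet: jobs keyed (and sorted) by max walltime. */
class SimpleSortedJobQueue {
    // walltime in seconds -> jobs requesting that walltime
    private final TreeMap<Integer, Deque<String>> byWalltime =
        new TreeMap<Integer, Deque<String>>();
    private int size;

    public synchronized void add(String job, int walltime) {
        Deque<String> l = byWalltime.get(walltime);
        if (l == null) {
            l = new ArrayDeque<String>();
            byWalltime.put(walltime, l);
        }
        l.add(job);
        size++;
    }

    /** Remove and return a job with the largest walltime not exceeding maxSeconds, or null. */
    public synchronized String removeOne(int maxSeconds) {
        Map.Entry<Integer, Deque<String>> e = byWalltime.floorEntry(maxSeconds);
        if (e == null) {
            return null;
        }
        String job = e.getValue().poll();
        if (e.getValue().isEmpty()) {
            byWalltime.remove(e.getKey());
        }
        size--;
        return job;
    }

    public synchronized boolean isEmpty() {
        return size == 0;
    }
}

/** Two-level queue: a "fast" queue fed by the planner, plus a sorted holding queue. */
class TwoLevelQueue {
    private final SimpleSortedJobQueue fast = new SimpleSortedJobQueue();
    private final SimpleSortedJobQueue holding = new SimpleSortedJobQueue();

    /** New jobs land in holding until the planner places them. */
    public void submit(String job, int walltime) {
        holding.add(job, walltime);
    }

    /** The planner moves jobs it has placed into blocks onto the fast queue. */
    public void promote(String job, int walltime) {
        fast.add(job, walltime);
    }

    /** A worker asks for a job it can run within maxSeconds. */
    public String request(int maxSeconds) {
        String job = fast.removeOne(maxSeconds);
        if (job == null) {
            // The idea behind r3385: instead of waiting for the next planning
            // pass, fall back to the sorted holding queue directly.
            job = holding.removeOne(maxSeconds);
        }
        return job;
    }
}

public class FallbackDemo {
    public static void main(String[] args) {
        TwoLevelQueue q = new TwoLevelQueue();
        q.submit("long-job", 3600);
        q.submit("short-job", 60);
        // The fast queue is empty, so request() serves "short-job" from holding.
        System.out.println(q.request(120));
    }
}

Keeping the holding queue sorted by walltime is what makes this fallback cheap: the longest job that still fits the worker's remaining time can be found with a single floor lookup rather than a scan.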
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/BlockQueueProcessor.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/BlockQueueProcessor.java (revision 3384)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/BlockQueueProcessor.java (working copy)
@@ -10,7 +10,7 @@
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -46,7 +46,7 @@
Jobs not moved to queued - they may not fit into existing
blocks
*/
- private List<Job> holding;
+ private SortedJobSet holding;
/**
Jobs that either fit into existing Blocks or were enqueued
@@ -79,7 +79,7 @@
private ChannelContext clientChannelContext;
- private boolean done, planning;
+ private boolean done;
private final Metric metric;
@@ -100,7 +100,7 @@
public BlockQueueProcessor(Settings settings) {
super("Block Queue Processor");
this.settings = settings;
- holding = new ArrayList<Job>();
+ holding = new SortedJobSet();
blocks = new TreeMap<String, Block>();
tl = new HashMap<Integer, List<Job>>();
id = DDF.format(new Date());
@@ -109,7 +109,9 @@
queued = new SortedJobSet(metric);
running = new JobSet(metric);
rlogger = new RemoteLogger();
- logger.info("Starting... id=" + id);
+ if (logger.isInfoEnabled()) {
+ logger.info("Starting... id=" + id);
+ }
}
public Metric getMetric() {
@@ -122,12 +124,16 @@
script = ScriptManager.writeScript();
int planTimeMillis = 1;
while (!done) {
- logger.debug("Holding queue job count: " +
+ if (logger.isDebugEnabled()) {
+ logger.debug("Holding queue job count: " +
holding.size());
+ }
planTimeMillis = updatePlan();
double planTime = ((double) planTimeMillis) / 1000;
- logger.debug("Planning time (seconds): " +
+ if (logger.isDebugEnabled()) {
+ logger.debug("Planning time (seconds): " +
SECONDS_3.format(planTime));
+ }
if (holding.size() + incoming.size() == 0) {
planTimeMillis = 100;
}
@@ -159,18 +165,13 @@
}
public void enqueue1(Task t) {
- synchronized (incoming) {
- Job j = new Job(t);
- if (checkJob(j)) {
+ Job j = new Job(t);
+ if (checkJob(j)) {
+ synchronized (incoming) {
if (logger.isDebugEnabled()) {
logger.debug("Got job with walltime = " + j.getMaxWallTime());
}
- if (planning) {
- incoming.add(j);
- }
- else {
- queue(j);
- }
+ incoming.add(j);
}
}
}
@@ -235,7 +236,9 @@
allocsize += b.sizeLeft();
}
if (allocsize != lastAllocSize) {
- logger.debug("Updated allocsize: " + allocsize);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Updated allocsize: " + allocsize);
+ }
}
lastAllocSize = allocsize;
}
@@ -259,28 +262,34 @@
b.start();
}
- private Set<Job> queueToExistingBlocks() {
+ private void queueToExistingBlocks() {
+ List<Job> remove = new ArrayList<Job>();
double runningSize = getRunningSizeLeft();
- Set<Job> remove = new HashSet<Job>();
+ int count = 0;
for (Job j : holding) {
if (allocsize - queued.getJSize() - runningSize > metric.getSize(j) && fits(j)) {
queue(j);
remove.add(j);
+ count++;
}
}
- if (remove.size() > 0) {
- logger.debug("Queued " + remove.size() + " jobs to existing blocks");
+ if (count > 0) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Queued " + count + " jobs to existing blocks");
+ }
}
- return remove;
+ holding.removeAll(remove);
}
private void requeueNonFitting() {
int count = 0;
double runningSize = getRunningSizeLeft();
- logger.debug("allocsize = " + allocsize +
+ if (logger.isDebugEnabled()) {
+ logger.debug("allocsize = " + allocsize +
", queuedsize = " + queued.getJSize() +
", running = " + runningSize +
", qsz = " + queued.size());
+ }
while (allocsize - queued.getJSize() - runningSize < 0) {
Job j = queued.removeOne(TimeInterval.FOREVER,
Integer.MAX_VALUE);
@@ -297,7 +306,9 @@
count++;
}
if (count > 0) {
- logger.info("Requeued " + count + " non-fitting jobs");
+ if (logger.isInfoEnabled()) {
+ logger.info("Requeued " + count + " non-fitting jobs");
+ }
}
}
@@ -343,9 +354,13 @@
if (sz > 0) {
if (sz < 1)
sz = 1;
- logger.info("Jobs in holding queue: " + holding.size());
+ if (logger.isInfoEnabled()) {
+ logger.info("Jobs in holding queue: " + holding.size());
+ }
String s = SECONDS.format(sz);
- logger.info("Time estimate for holding queue (seconds): " + s);
+ if (logger.isInfoEnabled()) {
+ logger.info("Time estimate for holding queue (seconds): " + s);
+ }
}
return (int) sz;
}
@@ -433,9 +448,8 @@
Move Jobs from {@link holding} to {@link queued} by
allocating new Blocks for them
*/
- private Set<Job> allocateBlocks(double tsum) {
- Set<Job> remove = new HashSet<Job>();
-
+ private void allocateBlocks(double tsum) {
+ List<Job> remove = new ArrayList<Job>();
// Calculate chunkOfBlocks: how many blocks we may allocate
// in this particular call to allocateBlocks()
int maxBlocks = settings.getMaxBlocks();
@@ -445,10 +459,14 @@
// Last job queued to last new Block
int last = 0;
+ Iterator<Job> lastI = holding.iterator();
// Index through holding queue
int index = 0;
+ Iterator<Job> indexI = holding.iterator();
// Number of new Blocks allocated in this call
int newBlocks = 0;
+
+ int added = 0;
// get the size (w * h) for the current block by
// dividing the total required size (tsum) by the number
@@ -465,10 +483,13 @@
double size = metric.blockSize(newBlocks, chunkOfBlocks, tsum);
String s = SECONDS.format(tsum);
- logger.info("Allocating blocks for a total walltime of: " + s + "s");
+ if (logger.isInfoEnabled()) {
+ logger.info("Allocating blocks for a total walltime of: " + s + "s");
+ }
- while (index <= holding.size() && newBlocks < chunkOfBlocks) {
- // Job job = holding.get(i);
+ while (indexI.hasNext() && newBlocks < chunkOfBlocks) {
+ Job job = indexI.next();
+
int granularity = settings.getNodeGranularity() * settings.getJobsPerNode();
// true if the number of jobs for this block is a multiple
// of granularity
@@ -487,8 +508,8 @@
int msz = (int) size;
// jobs are sorted on walltime, and the last job is the longest,
// so use that for calculating the overallocation
- int lastwalltime = (int) holding.get(index).getMaxWallTime().getSeconds();
- int h = overallocatedSize(holding.get(index));
+ int lastwalltime = (int) job.getMaxWallTime().getSeconds();
+ int h = overallocatedSize(job);
// the maximum time is a hard limit, so for the maximum useable time
// the reserve needs to be subtracted
@@ -519,7 +540,7 @@
int r = (index - last) % width;
if (logger.isInfoEnabled()) {
- logger.info("\t Considering: " + holding.get(index));
+ logger.info("\t Considering: " + job);
logger.info("\t Max Walltime (seconds): " + lastwalltime);
logger.info("\t Time estimate (seconds): " + h);
logger.info("\t Total for this new Block (est. seconds): " +
@@ -533,8 +554,6 @@
// |xxxxx| |xxxxx|
// |xxxxx| |xxxxx|
// +-----+ +-----+
- // This result is unused and overwritten below. -Justin
- index += (width - r);
if (r != 0) {
@@ -552,9 +571,11 @@
// now add jobs from holding until the size of the jobs exceeds the
// size of the block (as given by the metric)
int ii = last;
- while (ii < holding.size() && sumSizes(last, ii + 1) <= metric.size(width, h)) {
- queue(holding.get(ii));
- remove.add(holding.get(ii));
+ while (lastI.hasNext() && sumSizes(last, ii + 1) <= metric.size(width, h)) {
+ Job j = lastI.next();
+ queue(j);
+ remove.add(j);
+ added++;
ii++;
}
@@ -563,6 +584,11 @@
" jobs to new Block");
}
// update index of last added job
+ // skip ii - index - 1 jobs since the iterator and "index" are off by one
+ // since index only gets updated at the end of the loop
+ for (int i = 0; i < ii - index - 1; i++) {
+ indexI.next();
+ }
index = ii - 1;
// commit the block
addBlock(b);
@@ -573,18 +599,10 @@
}
index++;
}
- if (remove.size() > 0) {
- logger.info("Added " + remove.size() + " jobs to new blocks");
- }
- return remove;
- }
-
- private void removeJobs(Set<Job> r) {
- List<Job> old = holding;
- holding = new ArrayList<Job>(holding.size());
- for (Job j : old) {
- if (!r.contains(j)) {
- holding.add(j);
+ holding.removeAll(remove);
+ if (added > 0) {
+ if (logger.isInfoEnabled()) {
+ logger.info("Added " + added + " jobs to new blocks");
}
}
}
@@ -592,8 +610,11 @@
private boolean first = true;
private void updateSettings() throws PlanningException {
+ if (!first) {
+ return;
+ }
if (!holding.isEmpty()) {
- Job j = holding.get(0);
+ Job j = holding.iterator().next();
Task t = j.getTask();
ExecutionService p = (ExecutionService) t.getService(0);
settings.setServiceContact(p.getServiceContact());
@@ -621,7 +642,9 @@
}
}
if (first) {
- logger.info("\n" + settings.toString());
+ if (logger.isInfoEnabled()) {
+ logger.info("\n" + settings.toString());
+ }
first = false;
}
}
@@ -631,7 +654,9 @@
synchronized (incoming) {
holding.addAll(incoming);
if (incoming.size() > 0) {
- logger.info("Committed " + incoming.size() + " new jobs");
+ if (logger.isInfoEnabled()) {
+ logger.info("Committed " + incoming.size() + " new jobs");
+ }
}
incoming.clear();
}
@@ -651,65 +676,69 @@
}
}
}
-
+
/**
@return Time consumed in milliseconds
*/
public int updatePlan() throws PlanningException {
Set<Job> tmp;
- synchronized (incoming) {
- planning = true;
- }
long start = System.currentTimeMillis();
- // Move all incoming Jobs to holding
- commitNewJobs();
-
- // Shutdown Blocks that are done
- cleanDoneBlocks();
-
- // Subtract elapsed time from existing allocation
- updateAllocatedSize();
-
- // Move jobs that fit from holding to queued
- tmp = queueToExistingBlocks();
-
- // Subtract these Jobs from holding
- removeJobs(tmp);
-
- // int jss = jobs.size();
- // If queued has too many Jobs, move some back to holding
- requeueNonFitting();
-
- updateSettings();
-
- computeSums();
-
- tsum = computeTotalRequestSize();
-
- if (tsum == 0) {
- removeIdleBlocks();
+ synchronized(holding) {
+ // Move all incoming Jobs to holding
+ commitNewJobs();
+
+ // Shutdown Blocks that are done
+ cleanDoneBlocks();
+
+ // Subtract elapsed time from existing allocation
+ updateAllocatedSize();
+
+ // Move jobs that fit from holding to queued
+ queueToExistingBlocks();
+
+ // int jss = jobs.size();
+ // If queued has too many Jobs, move some back to holding
+ requeueNonFitting();
+
+ updateSettings();
+
+ computeSums();
+
+ tsum = computeTotalRequestSize();
+
+ if (tsum == 0) {
+ removeIdleBlocks();
+ }
+ else {
+ allocateBlocks(tsum);
+ }
}
- else {
- tmp = allocateBlocks(tsum);
- removeJobs(tmp);
- }
updateMonitor();
- synchronized (incoming) {
- planning = false;
- }
+
return (int) (System.currentTimeMillis() - start);
}
public Job request(TimeInterval ti, int cpus) {
Job job = queued.removeOne(ti, cpus);
+ if (job == null) {
+ synchronized(holding) {
+ job = holding.removeOne(ti, cpus);
+ }
+ }
+
if (job != null) {
synchronized(running) {
running.add(job);
}
}
+ else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("request - no job " + ti + ", " + cpus);
+ }
+ }
return job;
}
@@ -799,7 +828,7 @@
}
public List<Job> getJobs() {
- return holding;
+ return holding.getAll();
}
public SortedJobSet getQueued() {
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/SortedJobSet.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/SortedJobSet.java (revision 3384)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/SortedJobSet.java (working copy)
@@ -9,9 +9,12 @@
*/
package org.globus.cog.abstraction.coaster.service.job.manager;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -44,7 +47,10 @@
public SortedJobSet(SortedJobSet other) {
metric = other.metric;
synchronized(other) {
- sm = new TreeMap<TimeInterval, LinkedList<Job>>(other.sm);
+ sm = new TreeMap<TimeInterval, LinkedList<Job>>();
+ for (Map.Entry<TimeInterval, LinkedList<Job>> e : other.sm.entrySet()) {
+ sm.put(e.getKey(), new LinkedList<Job>(e.getValue()));
+ }
jsize = other.jsize;
size = other.size;
}
@@ -53,8 +59,36 @@
public int size() {
return size;
}
+
+ public boolean isEmpty() {
+ return size == 0;
+ }
+
+ public List<Job> getAll() {
+ List<Job> l = new ArrayList<Job>();
+ for (List<Job> s : sm.values()) {
+ l.addAll(s);
+ }
+ return l;
+ }
+
+ public void addAll(Collection<Job> c) {
+ for (Job j : c) {
+ _add(j);
+ }
+ }
+
+ public void removeAll(Collection<Job> c) {
+ for (Job j : c) {
+ remove(j);
+ }
+ }
- public synchronized void add(Job j) {
+ public void add(Job j) {
+ _add(j);
+ }
+
+ private void _add(Job j) {
LinkedList<Job> l = sm.get(j.getMaxWallTime());
if (l == null) {
l = new LinkedList<Job>();
@@ -65,6 +99,12 @@
size++;
seq++;
}
+
+ public void remove(Job job) {
+ LinkedList<Job> l = sm.get(job.getMaxWallTime());
+ l.remove(job);
+ size--;
+ }
/**
Remove and return largest job with a walltime smaller than the
@@ -123,6 +163,9 @@
}
public Job next() {
+ if (it2 == null) {
+ throw new NoSuchElementException();
+ }
if (it2.hasNext()) {
return it2.next();
}
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java (revision 3384)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java (working copy)
@@ -102,7 +102,9 @@
}
private void sleep() {
- logger.debug(block.getId() + ":" + getId() + " sleeping");
+ if (logger.isDebugEnabled()) {
+ logger.debug(block.getId() + ":" + getId() + " sleeping");
+ }
sleep(this);
}
@@ -169,12 +171,7 @@
success = launch(running);
}
else {
- if (block.getAllocationProcessor().getQueued().size() == 0) {
- sleep();
- }
- else {
- sleep();
- }
+ sleep();
}
}
else {
@@ -292,7 +289,7 @@
}
}
- public synchronized Job getRunning() {
+ public Job getRunning() {
return running;
}
@@ -351,7 +348,9 @@
}
public synchronized void statusChanged(StatusEvent event) {
- logger.debug(event);
+ if (logger.isDebugEnabled()) {
+ logger.debug(event);
+ }
if (event.getStatus().isTerminal()) {
running.getTask().removeStatusListener(this);
running.setEndTime(Time.now());