[Swift-commit] Cog update

swift at ci.uchicago.edu
Sat Apr 28 20:35:03 CDT 2012


------------------------------------------------------------------------
r3385 | hategan | 2012-04-28 20:34:09 -0500 (Sat, 28 Apr 2012) | 1 line

Allow getting jobs from the holding queue when the fast queue is empty (instead of waiting for the next plan to move jobs from holding to the fast queue); this requires the holding queue to be sorted.
------------------------------------------------------------------------
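
In short, the commit lets a job request fall through to the (now sorted) holding queue when the fast queue is empty, rather than returning nothing until the next planning pass promotes jobs. Below is a minimal, self-contained sketch of that fallback; the field and method names mirror the diff (queued, holding, running, request()), but the types are simplified stand-ins rather than the real SortedJobSet/JobSet classes, and the FIFO deques here ignore walltime/CPU matching.

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Sketch only: a fast-queue miss falls back to the holding queue.
    public class RequestFallbackSketch {

        static final class Job {
            final int id;
            Job(int id) { this.id = id; }
            public String toString() { return "job-" + id; }
        }

        private final Deque<Job> queued  = new ArrayDeque<Job>(); // "fast" queue
        private final Deque<Job> holding = new ArrayDeque<Job>(); // holding queue
        private final Deque<Job> running = new ArrayDeque<Job>();

        public Job request() {
            Job job = queued.poll();
            if (job == null) {
                // New behavior: serve directly from holding instead of waiting
                // for the next plan to promote jobs to the fast queue.
                synchronized (holding) {
                    job = holding.poll();
                }
            }
            if (job != null) {
                synchronized (running) {
                    running.add(job);
                }
            }
            return job;
        }

        public static void main(String[] args) {
            RequestFallbackSketch p = new RequestFallbackSketch();
            p.holding.add(new Job(1));       // nothing in the fast queue yet
            System.out.println(p.request()); // job-1, served from holding
            System.out.println(p.request()); // null, both queues are empty
        }
    }
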
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/BlockQueueProcessor.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/BlockQueueProcessor.java	(revision 3384)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/BlockQueueProcessor.java	(working copy)
@@ -10,7 +10,7 @@
 import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -46,7 +46,7 @@
        Jobs not moved to queued - they may not fit into existing
        blocks
      */
-    private List<Job> holding;
+    private SortedJobSet holding;
 
     /**
        Jobs that either fit into existing Blocks or were enqueued
@@ -79,7 +79,7 @@
 
     private ChannelContext clientChannelContext;
 
-    private boolean done, planning;
+    private boolean done;
 
     private final Metric metric;
 
@@ -100,7 +100,7 @@
     public BlockQueueProcessor(Settings settings) {
         super("Block Queue Processor");
         this.settings = settings;
-        holding = new ArrayList<Job>();
+        holding = new SortedJobSet();
         blocks = new TreeMap<String, Block>();
         tl = new HashMap<Integer, List<Job>>();
         id = DDF.format(new Date());
@@ -109,7 +109,9 @@
         queued = new SortedJobSet(metric);
         running = new JobSet(metric);
         rlogger = new RemoteLogger();
-        logger.info("Starting... id=" + id);
+        if (logger.isInfoEnabled()) {
+            logger.info("Starting... id=" + id);
+        }
     }
 
     public Metric getMetric() {
@@ -122,12 +124,16 @@
             script = ScriptManager.writeScript();
             int planTimeMillis = 1;
             while (!done) {
-                logger.debug("Holding queue job count: " +
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Holding queue job count: " +
                              holding.size());
+                }
                 planTimeMillis = updatePlan();
                 double planTime = ((double) planTimeMillis) / 1000;
-                logger.debug("Planning time (seconds): " +
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Planning time (seconds): " +
                              SECONDS_3.format(planTime));
+                }
                 if (holding.size() + incoming.size() == 0) {
                     planTimeMillis = 100;
                 }
@@ -159,18 +165,13 @@
     }
 
     public void enqueue1(Task t) {
-        synchronized (incoming) {
-            Job j = new Job(t);
-            if (checkJob(j)) {
+        Job j = new Job(t);
+        if (checkJob(j)) {
+            synchronized (incoming) {
                 if (logger.isDebugEnabled()) {
                     logger.debug("Got job with walltime = " + j.getMaxWallTime());
                 }
-                if (planning) {
-                    incoming.add(j);
-                }
-                else {
-                    queue(j);
-                }
+                incoming.add(j);
             }
         }
     }
@@ -235,7 +236,9 @@
                 allocsize += b.sizeLeft();
             }
             if (allocsize != lastAllocSize) {
-                logger.debug("Updated allocsize: " + allocsize);
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Updated allocsize: " + allocsize);
+                }
             }
             lastAllocSize = allocsize;
         }
@@ -259,28 +262,34 @@
         b.start();
     }
 
-    private Set<Job> queueToExistingBlocks() {
+    private void queueToExistingBlocks() {
+        List<Job> remove = new ArrayList<Job>();
         double runningSize = getRunningSizeLeft();
-        Set<Job> remove = new HashSet<Job>();
+        int count = 0;
         for (Job j : holding) {
             if (allocsize - queued.getJSize() - runningSize > metric.getSize(j) && fits(j)) {
                 queue(j);
                 remove.add(j);
+                count++;
             }
         }
-        if (remove.size() > 0) {
-            logger.debug("Queued " + remove.size() + " jobs to existing blocks");
+        if (count > 0) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Queued " + count + " jobs to existing blocks");
+            }
         }
-        return remove;
+        holding.removeAll(remove);
     }
 
     private void requeueNonFitting() {
         int count = 0;
         double runningSize = getRunningSizeLeft();
-        logger.debug("allocsize = " + allocsize +
+        if (logger.isDebugEnabled()) {
+            logger.debug("allocsize = " + allocsize +
                      ", queuedsize = " + queued.getJSize() +
                      ", running = " + runningSize +
                      ", qsz = " + queued.size());
+        }
         while (allocsize - queued.getJSize() - runningSize < 0) {
             Job j = queued.removeOne(TimeInterval.FOREVER,
                                      Integer.MAX_VALUE);
@@ -297,7 +306,9 @@
             count++;
         }
         if (count > 0) {
-            logger.info("Requeued " + count + " non-fitting jobs");
+            if (logger.isInfoEnabled()) {
+                logger.info("Requeued " + count + " non-fitting jobs");
+            }
         }
     }
     
@@ -343,9 +354,13 @@
         if (sz > 0) {
             if (sz < 1)
                 sz = 1;
-            logger.info("Jobs in holding queue: " + holding.size());
+            if (logger.isInfoEnabled()) {
+                logger.info("Jobs in holding queue: " + holding.size());
+            }
             String s = SECONDS.format(sz);
-            logger.info("Time estimate for holding queue (seconds): " + s);
+            if (logger.isInfoEnabled()) {
+                logger.info("Time estimate for holding queue (seconds): " + s);
+            }
         }
         return (int) sz;
     }
@@ -433,9 +448,8 @@
        Move Jobs from {@link holding} to {@link queued} by
        allocating new Blocks for them
      */
-    private Set<Job> allocateBlocks(double tsum) {
-        Set<Job> remove = new HashSet<Job>();
-
+    private void allocateBlocks(double tsum) {
+        List<Job> remove = new ArrayList<Job>();
         // Calculate chunkOfBlocks: how many blocks we may allocate
         //     in this particular call to allocateBlocks()
         int maxBlocks = settings.getMaxBlocks();
@@ -445,10 +459,14 @@
 
         // Last job queued to last new Block
         int last = 0;
+        Iterator<Job> lastI = holding.iterator();
         // Index through holding queue
         int index = 0;
+        Iterator<Job> indexI = holding.iterator();
         // Number of new Blocks allocated in this call
         int newBlocks = 0;
+        
+        int added = 0;
 
         // get the size (w * h) for the current block by 
         // dividing the total required size (tsum) by the number
@@ -465,10 +483,13 @@
         double size = metric.blockSize(newBlocks, chunkOfBlocks, tsum);
 
         String s = SECONDS.format(tsum);
-        logger.info("Allocating blocks for a total walltime of: " + s + "s");
+        if (logger.isInfoEnabled()) {
+            logger.info("Allocating blocks for a total walltime of: " + s + "s");
+        }
 
-        while (index <= holding.size() && newBlocks < chunkOfBlocks) {
-            // Job job = holding.get(i);
+        while (indexI.hasNext() && newBlocks < chunkOfBlocks) {
+            Job job = indexI.next();
+            
             int granularity = settings.getNodeGranularity() * settings.getJobsPerNode();
             // true if the number of jobs for this block is a multiple
             // of granularity
@@ -487,8 +508,8 @@
                 int msz = (int) size;
                 // jobs are sorted on walltime, and the last job is the longest,
                 // so use that for calculating the overallocation
-                int lastwalltime = (int) holding.get(index).getMaxWallTime().getSeconds();
-                int h = overallocatedSize(holding.get(index));
+                int lastwalltime = (int) job.getMaxWallTime().getSeconds();
+                int h = overallocatedSize(job);
                 
                 // the maximum time is a hard limit, so for the maximum useable time
                 // the reserve needs to be subtracted
@@ -519,7 +540,7 @@
                 int r = (index - last) % width;
 
                 if (logger.isInfoEnabled()) {
-                	logger.info("\t Considering: " + holding.get(index));
+                	logger.info("\t Considering: " + job);
                 	logger.info("\t  Max Walltime (seconds):   " + lastwalltime);
                     logger.info("\t  Time estimate (seconds):  " + h);
                     logger.info("\t  Total for this new Block (est. seconds): " +
@@ -533,8 +554,6 @@
                 // |xxxxx|     |xxxxx|
                 // |xxxxx|     |xxxxx|
                 // +-----+     +-----+
-                // This result is unused and overwritten below. -Justin
-                index += (width - r);
                 
                 
                 if (r != 0) {
@@ -552,9 +571,11 @@
                 // now add jobs from holding until the size of the jobs exceeds the
                 // size of the block (as given by the metric)
                 int ii = last;
-                while (ii < holding.size() && sumSizes(last, ii + 1) <= metric.size(width, h)) {
-                    queue(holding.get(ii));
-                    remove.add(holding.get(ii));
+                while (lastI.hasNext() && sumSizes(last, ii + 1) <= metric.size(width, h)) {
+                    Job j = lastI.next();
+                    queue(j);
+                    remove.add(j);
+                    added++;
                     ii++;
                 }
                 
@@ -563,6 +584,11 @@
                                 " jobs to new Block");
                 }
                 // update index of last added job
+                // skip ii - index - 1 jobs since the iterator and "index" are off by one
+                // since index only gets updated at the end of the loop
+                for (int i = 0; i < ii - index - 1; i++) {
+                    indexI.next();
+                }
                 index = ii - 1;
                 // commit the block
                 addBlock(b);
@@ -573,18 +599,10 @@
             }
             index++;
         }
-        if (remove.size() > 0) {
-            logger.info("Added " + remove.size() + " jobs to new blocks");
-        }
-        return remove;
-    }
-
-    private void removeJobs(Set<Job> r) {
-        List<Job> old = holding;
-        holding = new ArrayList<Job>(holding.size());
-        for (Job j : old) {
-            if (!r.contains(j)) {
-                holding.add(j);
+        holding.removeAll(remove);
+        if (added > 0) {
+            if (logger.isInfoEnabled()) {
+                logger.info("Added " + added + " jobs to new blocks");
             }
         }
     }
@@ -592,8 +610,11 @@
     private boolean first = true;
 
     private void updateSettings() throws PlanningException {
+        if (!first) {
+            return;
+        }
         if (!holding.isEmpty()) {
-            Job j = holding.get(0);
+            Job j = holding.iterator().next();
             Task t = j.getTask();
             ExecutionService p = (ExecutionService) t.getService(0);
             settings.setServiceContact(p.getServiceContact());
@@ -621,7 +642,9 @@
                 }
             }
             if (first) {
-                logger.info("\n" + settings.toString());
+                if (logger.isInfoEnabled()) {
+                    logger.info("\n" + settings.toString());
+                }
                 first = false;
             }
         }
@@ -631,7 +654,9 @@
         synchronized (incoming) {
             holding.addAll(incoming);
             if (incoming.size() > 0) {
-                logger.info("Committed " + incoming.size() + " new jobs");
+                if (logger.isInfoEnabled()) {
+                    logger.info("Committed " + incoming.size() + " new jobs");
+                }
             }
             incoming.clear();
         }
@@ -651,65 +676,69 @@
             }
         }
     }
-
+    
     /**
       @return Time consumed in milliseconds
      */
     public int updatePlan() throws PlanningException {
         Set<Job> tmp;
 
-        synchronized (incoming) {
-            planning = true;
-        }
         long start = System.currentTimeMillis();
 
-        // Move all incoming Jobs to holding
-        commitNewJobs();
-
-        // Shutdown Blocks that are done
-        cleanDoneBlocks();
-
-        // Subtract elapsed time from existing allocation
-        updateAllocatedSize();
-
-        // Move jobs that fit from holding to queued
-        tmp = queueToExistingBlocks();
-
-        // Subtract these Jobs from holding
-        removeJobs(tmp);
-
-        // int jss = jobs.size();
-        // If queued has too many Jobs, move some back to holding
-        requeueNonFitting();
-
-        updateSettings();
-
-        computeSums();
-
-        tsum = computeTotalRequestSize();
-
-        if (tsum == 0) {
-            removeIdleBlocks();
+        synchronized(holding) {
+            // Move all incoming Jobs to holding
+            commitNewJobs();
+    
+            // Shutdown Blocks that are done
+            cleanDoneBlocks();
+    
+            // Subtract elapsed time from existing allocation
+            updateAllocatedSize();
+        
+            // Move jobs that fit from holding to queued
+            queueToExistingBlocks();
+    
+            // int jss = jobs.size();
+            // If queued has too many Jobs, move some back to holding
+            requeueNonFitting();
+    
+            updateSettings();
+    
+            computeSums();
+    
+            tsum = computeTotalRequestSize();
+    
+            if (tsum == 0) {
+                removeIdleBlocks();
+            }
+            else {
+                allocateBlocks(tsum);
+            }
         }
-        else {
-            tmp = allocateBlocks(tsum);
-            removeJobs(tmp);
-        }
 
         updateMonitor();
-        synchronized (incoming) {
-            planning = false;
-        }
+        
         return (int) (System.currentTimeMillis() - start);
     }
 
     public Job request(TimeInterval ti, int cpus) {
         Job job = queued.removeOne(ti, cpus);
+        if (job == null) {
+            synchronized(holding) {
+                job = holding.removeOne(ti, cpus);
+            }
+        }
+        
         if (job != null) {
             synchronized(running) {
                 running.add(job);
             }
         }
+        else {
+            if (logger.isDebugEnabled()) {
+                logger.debug("request - no job " + ti + ", " + cpus);
+            }
+        }
         return job;
     }
 
@@ -799,7 +828,7 @@
     }
 
     public List<Job> getJobs() {
-        return holding;
+        return holding.getAll();
     }
 
     public SortedJobSet getQueued() {
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/SortedJobSet.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/SortedJobSet.java	(revision 3384)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/SortedJobSet.java	(working copy)
@@ -9,9 +9,12 @@
  */
 package org.globus.cog.abstraction.coaster.service.job.manager;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -44,7 +47,10 @@
     public SortedJobSet(SortedJobSet other) {
         metric = other.metric;
         synchronized(other) {
-            sm = new TreeMap<TimeInterval, LinkedList<Job>>(other.sm);
+            sm = new TreeMap<TimeInterval, LinkedList<Job>>();
+            for (Map.Entry<TimeInterval, LinkedList<Job>> e : other.sm.entrySet()) {
+                sm.put(e.getKey(), new LinkedList<Job>(e.getValue()));
+            }
             jsize = other.jsize;
             size = other.size;
         }
@@ -53,8 +59,36 @@
     public int size() {
         return size;
     }
+    
+    public boolean isEmpty() {
+        return size == 0;
+    }
+    
+    public List<Job> getAll() {
+        List<Job> l = new ArrayList<Job>();
+        for (List<Job> s : sm.values()) {
+            l.addAll(s);
+        }
+        return l;
+    }
+    
+    public void addAll(Collection<Job> c) {
+        for (Job j : c) {
+            _add(j);
+        }
+    }
+    
+    public void removeAll(Collection<Job> c) {
+        for (Job j : c) {
+            remove(j);
+        }
+    }
 
-    public synchronized void add(Job j) {
+    public void add(Job j) {
+        _add(j);
+    }
+    
+    private void _add(Job j) {
         LinkedList<Job> l = sm.get(j.getMaxWallTime());
         if (l == null) {
             l = new LinkedList<Job>();
@@ -65,6 +99,12 @@
         size++;
         seq++;
     }
+    
+    public void remove(Job job) {
+        LinkedList<Job> l = sm.get(job.getMaxWallTime());
+        l.remove(job);
+        size--;
+    }
 
     /**
        Remove and return largest job with a walltime smaller than the
@@ -123,6 +163,9 @@
             }
 
             public Job next() {
+                if (it2 == null) {
+                    throw new NoSuchElementException();
+                }
                 if (it2.hasNext()) {
                     return it2.next();
                 }
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java	(revision 3384)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java	(working copy)
@@ -102,7 +102,9 @@
     }
 
     private void sleep() {
-    	logger.debug(block.getId() + ":" + getId() + " sleeping");
+        if (logger.isDebugEnabled()) {
+            logger.debug(block.getId() + ":" + getId() + " sleeping");
+        }
         sleep(this);
     }
 
@@ -169,12 +171,7 @@
                     success = launch(running);
                 }
                 else {
-                    if (block.getAllocationProcessor().getQueued().size() == 0) {
-                        sleep();
-                    }
-                    else {
-                        sleep();
-                    }
+                    sleep();
                 }
             }
             else {
@@ -292,7 +289,7 @@
         }
     }
 
-    public synchronized Job getRunning() {
+    public Job getRunning() {
         return running;
     }
 
@@ -351,7 +348,9 @@
     }
 
      public synchronized void statusChanged(StatusEvent event) {
-         logger.debug(event);
+         if (logger.isDebugEnabled()) {
+             logger.debug(event);
+         }
          if (event.getStatus().isTerminal()) {
              running.getTask().removeStatusListener(this);
              running.setEndTime(Time.now());
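
For context, the holding queue's new type works along the lines of the sketch below: jobs are bucketed by max walltime in a TreeMap, so iteration is walltime-ordered and a removeOne-style lookup can pick a job that still fits a given time limit. This is an illustrative simplification, not the real SortedJobSet (which also tracks a size metric, a sequence counter, and per-job CPU counts, and takes a TimeInterval/cpus pair in removeOne).

    import java.util.Collection;
    import java.util.LinkedList;
    import java.util.TreeMap;

    class SortedJobSetSketch {

        static final class Job {
            final int wallTimeSeconds;
            Job(int wallTimeSeconds) { this.wallTimeSeconds = wallTimeSeconds; }
            public String toString() { return wallTimeSeconds + "s"; }
        }

        // walltime -> jobs with that walltime, kept in sorted key order
        private final TreeMap<Integer, LinkedList<Job>> sm =
            new TreeMap<Integer, LinkedList<Job>>();
        private int size;

        public boolean isEmpty() { return size == 0; }

        public void add(Job j) {
            LinkedList<Job> l = sm.get(j.wallTimeSeconds);
            if (l == null) {
                l = new LinkedList<Job>();
                sm.put(j.wallTimeSeconds, l);
            }
            l.add(j);
            size++;
        }

        public void remove(Job j) {
            LinkedList<Job> l = sm.get(j.wallTimeSeconds);
            if (l != null && l.remove(j)) {
                size--;
                if (l.isEmpty()) {
                    sm.remove(j.wallTimeSeconds);
                }
            }
        }

        // Bulk removal, as the planner does after queuing jobs elsewhere.
        public void removeAll(Collection<Job> c) {
            for (Job j : c) {
                remove(j);
            }
        }

        // Remove and return the largest job whose walltime fits under maxSeconds.
        public Job removeOne(int maxSeconds) {
            Integer key = sm.floorKey(maxSeconds);
            if (key == null) {
                return null;
            }
            LinkedList<Job> l = sm.get(key);
            Job j = l.removeFirst();
            size--;
            if (l.isEmpty()) {
                sm.remove(key);
            }
            return j;
        }

        public static void main(String[] args) {
            SortedJobSetSketch holding = new SortedJobSetSketch();
            holding.add(new Job(300));
            holding.add(new Job(60));
            System.out.println(holding.removeOne(120)); // 60s job fits the limit
            System.out.println(holding.removeOne(120)); // null, only the 300s job is left
        }
    }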


