[Swift-commit] cog r3793

swift at ci.uchicago.edu swift at ci.uchicago.edu
Fri Sep 20 23:00:06 CDT 2013


------------------------------------------------------------------------
r3793 | hategan | 2013-09-20 22:56:02 -0500 (Fri, 20 Sep 2013) | 1 line

Fixed two deadlocks in the coaster service (one of which caused bug 1006).
------------------------------------------------------------------------
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java	(revision 3792)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java	(working copy)
@@ -176,11 +176,11 @@
             if (logger.isTraceEnabled()) {
                 logger.trace(block.getId() + ":" + getId() + " pull");
             }
+            if (isShutDown()) {
+                return;
+            }
             synchronized (this) {
                 totalJobCount++;
-                if (shutdown) {
-                    return;
-                }
                 if (!started()) {
                     sleep();
                 }
@@ -240,7 +240,7 @@
         if (timelast == null) {
             CoasterService.error(20, "Timelast is null", new Throwable());
         }
-        block.add(this);
+        block.add(this, timelast);
         submit(running);
         return true;
     }
@@ -293,11 +293,10 @@
     }
 
     public void shutdown() {
+        if (raiseShutdown()) {
+            return;
+        }
         synchronized(this) {
-            if (shutdown) {
-                return;
-            }
-            shutdown = true;
     		Block block = node.getBlock();
             done.clear();
             if (running != null) {
@@ -344,29 +343,48 @@
     public List<Job> getDoneJobs() {
         return done;
     }
+    
+    private Object shutdownLock = new Object();
+    
+    private boolean raiseShutdown() {
+        synchronized(shutdownLock) {
+            if (shutdown) {
+                return true;
+            }
+            shutdown = true;
+            return false;
+        }
+    }
+    
+    private boolean isShutDown() {
+        synchronized(shutdownLock) {
+            return shutdown;
+        }
+    }
 
-    public synchronized void taskFailed(String msg, Exception e) {
-        if (shutdown) {
+    public void taskFailed(String msg, Exception e) {
+        if (raiseShutdown()) {
             return;
         }
-		shutdown = true;
-        if (running == null) {
-            if (starttime == null) {
-                starttime = Time.now();
+        synchronized(this) {
+            if (running == null) {
+                if (starttime == null) {
+                    starttime = Time.now();
+                }
+                if (endtime == null) {
+                    endtime = starttime.add(block.getWalltime());
+                }
+                TimeInterval time = endtime.subtract(Time.now());
+                int cpus = 1 + getPullThread(node.getBlock()).sleepers();
+                running = bqp.request(time, cpus);
+                // no listener is added to this task, so make sure
+                // it won't linger in the BQP running set
+                bqp.jobTerminated(running);
             }
-            if (endtime == null) {
-                endtime = starttime.add(block.getWalltime());
+            if (running != null) {
+                running.fail("Block task failed: " + msg, e);
             }
-            TimeInterval time = endtime.subtract(Time.now());
-            int cpus = 1 + getPullThread(node.getBlock()).sleepers();
-            running = bqp.request(time, cpus);
-            // no listener is added to this task, so make sure
-            // it won't linger in the BQP running set
-            bqp.jobTerminated(running);
         }
-        if (running != null) {
-            running.fail("Block task failed: " + msg, e);
-        }
     }
 
     public void errorReceived(Command cmd, String msg, Exception t) {
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java	(revision 3792)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java	(working copy)
@@ -14,9 +14,9 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.SortedSet;
+import java.util.SortedMap;
 import java.util.TimerTask;
-import java.util.TreeSet;
+import java.util.TreeMap;
 
 import org.apache.log4j.Logger;
 import org.globus.cog.abstraction.coaster.service.CoasterService;
@@ -48,7 +48,7 @@
     private int workers, activeWorkers;
     private TimeInterval walltime;
     private Time endtime, starttime, deadline, creationtime;
-    private final SortedSet<Cpu> scpus;
+    private final SortedMap<Cpu, Time> scpus;
     private final List<Cpu> cpus;
     private final List<Node> nodes;
     private boolean running = false, failed, shutdown, suspended;
@@ -70,7 +70,7 @@
 
     public Block(String id) {
         this.id = id;
-        scpus = new TreeSet<Cpu>();
+        scpus = new TreeMap<Cpu, Time>();
         cpus = new ArrayList<Cpu>();
         nodes = new ArrayList<Node>();
     }
@@ -164,15 +164,19 @@
             // if the simple tests before fail try to see if there is a
             // cpu that can specifically fit this job.
             synchronized (cpus) {
-                for (Cpu cpu : cpus) {
-                    Job running = cpu.getRunning();
-                    if (running == null) {
-                        return true;
-                    }
-                    else if (j.getMaxWallTime().isLessThan(endtime.subtract(running.getEndTime()))) {
-                        return true;
-                    }
+                if (scpus.size() < cpus.size()) {
+                    // there are some cores not running any jobs
+                    return true;
                 }
+                if (scpus.size() == 0) {
+                    // started, but workers not connected yet
+                    return true;
+                }
+                Cpu cpu = scpus.firstKey();
+                Time jobEndTime = scpus.get(cpu);
+                if (j.getMaxWallTime().isLessThan(endtime.subtract(jobEndTime))) {
+                    return true;
+                }
             }
             return false;
         }
@@ -180,25 +184,25 @@
 
     public void remove(Cpu cpu) {
         synchronized (cpus) {
-            if (!scpus.remove(cpu)) {
+            if (scpus.remove(cpu) == null) {
                 CoasterService.error(16, "CPU was not in the block", new Throwable());
             }
-            if (scpus.contains(cpu)) {
+            if (scpus.containsKey(cpu)) {
                 CoasterService.error(17, "CPU not removed", new Throwable());
             }
         }
     }
 
-    public void add(Cpu cpu) {
+    public void add(Cpu cpu, Time estJobCompletionTime) {
     	Cpu last = null;
         synchronized (cpus) {
-            if (!scpus.add(cpu)) {
+            if (scpus.put(cpu, estJobCompletionTime) != null) {
                 CoasterService.error(15, "CPU is already in the block", new Throwable());
             }
-            last = scpus.last();
+            last = scpus.lastKey();
         }
         if (last == cpu) {
-            setDeadline(Time.min(cpu.getTimeLast().add(bqp.getSettings().getReserve()),
+            setDeadline(Time.min(estJobCompletionTime.add(bqp.getSettings().getReserve()),
                 getEndTime()));
         }
     }
@@ -363,7 +367,7 @@
             running = false;
             for (int j = cpus.size(); j < (workers - this.activeWorkers); j++) {
                 Cpu cpu = new Cpu(j, new Node(j, this, null));
-                scpus.add(cpu);
+                scpus.put(cpu, null);
                 cpus.add(cpu);
             }
             this.activeWorkers = 0;
@@ -394,7 +398,7 @@
                 //this id scheme works out because the sid is based on the
                 //number of cpus already added (i.e. cpus.size()).
                 Cpu cpu = new Cpu(wid + i, n);
-                scpus.add(cpu);
+                scpus.put(cpu, cpu.getTimeLast());
                 cpus.add(cpu);
                 n.add(cpu);
                 cpu.workerStarted();



More information about the Swift-commit mailing list