[Swift-commit] cog r3477

swift at ci.uchicago.edu swift at ci.uchicago.edu
Sat Sep 22 22:25:03 CDT 2012


------------------------------------------------------------------------
r3477 | hategan | 2012-09-22 22:20:49 -0500 (Sat, 22 Sep 2012) | 1 line

Fix a potential deadlock during shutdown when a CPU is notified by both a failed channel and the termination of the block task.
------------------------------------------------------------------------
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java	(revision 3476)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java	(working copy)
@@ -20,12 +20,9 @@
 import org.globus.cog.abstraction.interfaces.Status;
 import org.globus.cog.abstraction.interfaces.StatusListener;
 import org.globus.cog.abstraction.interfaces.Task;
-import org.globus.cog.karajan.workflow.service.channels.ChannelListener;
-import org.globus.cog.karajan.workflow.service.channels.ChannelManager;
 import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
 import org.globus.cog.karajan.workflow.service.commands.Command;
 import org.globus.cog.karajan.workflow.service.commands.Command.Callback;
-import org.globus.cog.karajan.workflow.service.commands.HeartBeatCommand;
 
 public class Cpu implements Comparable<Cpu>, Callback, StatusListener {
     public static final Logger logger = Logger.getLogger(Cpu.class);
@@ -90,7 +87,9 @@
         return dif;
     }
 
-    public synchronized void jobTerminated() {
+    private void jobTerminated() {
+        assert Thread.holdsLock(this);
+        
         if (logger.isInfoEnabled()) {
             logger.info(block.getId() + ":" + getId() + " jobTerminated");
         }
@@ -281,13 +280,13 @@
                 return;
             }
             shutdown = true;
+    		Block block = node.getBlock();
+            done.clear();
+            if (running != null) {
+                logger.info(block.getId() + "-" + id + ": Job still running while shutting down");
+                running.fail("Shutting down worker", null);
+            }
         }
-		Block block = node.getBlock();
-        done.clear();
-        if (running != null) {
-            logger.info(block.getId() + "-" + id + ": Job still running while shutting down");
-            running.fail("Shutting down worker", null);
-        }
         node.shutdown();
     }
 
@@ -305,7 +304,7 @@
         return running;
     }
 
-    public Time getTimeLast() {
+    public synchronized Time getTimeLast() {
         if (running != null) {
             if (timelast.isGreaterThan(Time.now())) {
                 return timelast;
@@ -329,6 +328,9 @@
     }
 
     public synchronized void taskFailed(String msg, Exception e) {
+        if (shutdown) {
+            return;
+        }
 		shutdown = true;
         if (running == null) {
             if (starttime == null) {
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java	(revision 3476)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java	(working copy)
@@ -45,7 +45,7 @@
         return submitter;
     }
 
-    private int workers;
+    private int workers, workersStarting;
     private TimeInterval walltime;
     private Time endtime, starttime, deadline, creationtime;
     private final SortedSet<Cpu> scpus;
@@ -62,7 +62,7 @@
 
     private static final NumberFormat IDF = new DecimalFormat("000000");
     
-    public static volatile int requestedWorkers, activeWorkers, failedWorkers, completedWorkers, completedJobs;
+    public static int requestedWorkers, activeWorkers, failedWorkers, completedWorkers, completedJobs;
 
     public Block(String id) {
         this.id = id;
@@ -93,6 +93,9 @@
             "BLOCK_REQUESTED id=" + getId() + ", workers=" + getWorkerCount() + ", walltime="
                     + getWalltime().getSeconds());
         task = new BlockTask(this);
+        synchronized(cpus) {
+            workersStarting = workers;
+        }
         task.addStatusListener(this);
         try {
             task.initialize();
@@ -170,7 +173,7 @@
     }
 
     public void remove(Cpu cpu) {
-        synchronized (scpus) {
+        synchronized (cpus) {
             if (!scpus.remove(cpu)) {
                 CoasterService.error(16, "CPU was not in the block", new Throwable());
             }
@@ -181,7 +184,7 @@
     }
 
     public void add(Cpu cpu) {
-        synchronized (scpus) {
+        synchronized (cpus) {
             if (!scpus.add(cpu)) {
                 CoasterService.error(15, "CPU is already in the block", new Throwable());
             }
@@ -195,7 +198,7 @@
     }
 
     public void shutdownIfEmpty(Cpu cpu) {
-        synchronized (scpus) {
+        synchronized (cpus) {
             if (scpus.isEmpty()) {
                 if (logger.isInfoEnabled() && !shutdown) {
                     logger.info(this + ": all cpus are clear");
@@ -335,23 +338,24 @@
         if (logger.isInfoEnabled()) {
             logger.info("Worker task failed: " + msg, e);
         }
-        requestedWorkers -= workers;
-        activeWorkers -= workers;
+        // use auxiliary list to avoid deadlocks when
+        // cpus get notified separately by the dead channel
+        List<Cpu> cpusToNotify = new ArrayList<Cpu>();
         synchronized (cpus) {
-            synchronized (scpus) {
-                failed = true;
-                running = false;
-                for (int j = cpus.size(); j < workers; j++) {
-                    Cpu cpu = new Cpu(j, new Node(j, this, null));
-                    scpus.add(cpu);
-                    cpus.add(cpu);
-                }
-
-                for (Cpu cpu : cpus) {
-                    cpu.taskFailed(msg, e);
-                }
+            requestedWorkers -= workers;
+            activeWorkers -= workers;
+            failed = true;
+            running = false;
+            for (int j = cpus.size(); j < workersStarting; j++) {
+                Cpu cpu = new Cpu(j, new Node(j, this, null));
+                scpus.add(cpu);
+                cpus.add(cpu);
             }
+            cpusToNotify.addAll(cpus);
         }
+        for (Cpu cpu : cpusToNotify) {
+            cpu.taskFailed(msg, e);
+        }
     }
 
     public String getId() {
@@ -361,31 +365,30 @@
     public String workerStarted(String workerID,
                                 String workerHostname,
                                 ChannelContext channelContext) {
-    	activeWorkers += workers;
         synchronized (cpus) {
-            synchronized (scpus) {
-                int wid = Integer.parseInt(workerID);
-                Node n = new Node(wid, this, workerHostname,
-                                  channelContext);
-                nodes.add(n);
-                int jobsPerNode = bqp.getSettings().getJobsPerNode();
-                for (int i = 0; i < jobsPerNode; i++) {
-                    //this id scheme works out because the sid is based on the
-                    //number of cpus already added (i.e. cpus.size()).
-                    Cpu cpu = new Cpu(wid + i, n);
-                    scpus.add(cpu);
-                    cpus.add(cpu);
-                    n.add(cpu);
-                    cpu.workerStarted();
-                    if (logger.isInfoEnabled()) {
-                        logger.info("Started CPU " + cpu);
-                    }
-                }
+            int wid = Integer.parseInt(workerID);
+            Node n = new Node(wid, this, workerHostname,
+                              channelContext);
+            nodes.add(n);
+            int jobsPerNode = bqp.getSettings().getJobsPerNode();
+            workersStarting -= jobsPerNode;
+            activeWorkers += jobsPerNode;
+            for (int i = 0; i < jobsPerNode; i++) {
+                //this id scheme works out because the sid is based on the
+                //number of cpus already added (i.e. cpus.size()).
+                Cpu cpu = new Cpu(wid + i, n);
+                scpus.add(cpu);
+                cpus.add(cpu);
+                n.add(cpu);
+                cpu.workerStarted();
                 if (logger.isInfoEnabled()) {
-                    logger.info("Started worker " + this.id + ":" + IDF.format(wid));
+                    logger.info("Started CPU " + cpu);
                 }
-                return workerID;
             }
+            if (logger.isInfoEnabled()) {
+                logger.info("Started worker " + this.id + ":" + IDF.format(wid));
+            }
+            return workerID;
         }
     }
     
@@ -538,12 +541,10 @@
 
     public void removeNode(Node node) {
         synchronized(cpus) {
-            synchronized(scpus) {
-                nodes.remove(node);
-                for (Cpu cpu : node.getCpus()) {
-                    scpus.remove(cpu);
-                    cpus.remove(cpu);
-                }
+            nodes.remove(node);
+            for (Cpu cpu : node.getCpus()) {
+                scpus.remove(cpu);
+                cpus.remove(cpu);
             }
         }
         bqp.nodeRemoved(node);



More information about the Swift-commit mailing list