[Swift-commit] cog r3477
swift at ci.uchicago.edu
swift at ci.uchicago.edu
Sat Sep 22 22:25:03 CDT 2012
------------------------------------------------------------------------
r3477 | hategan | 2012-09-22 22:20:49 -0500 (Sat, 22 Sep 2012) | 1 line
Fix a potential deadlock during shutdown when a CPU is notified both by a failed channel and by the termination of the block task.
------------------------------------------------------------------------
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java (revision 3476)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java (working copy)
@@ -20,12 +20,9 @@
import org.globus.cog.abstraction.interfaces.Status;
import org.globus.cog.abstraction.interfaces.StatusListener;
import org.globus.cog.abstraction.interfaces.Task;
-import org.globus.cog.karajan.workflow.service.channels.ChannelListener;
-import org.globus.cog.karajan.workflow.service.channels.ChannelManager;
import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
import org.globus.cog.karajan.workflow.service.commands.Command;
import org.globus.cog.karajan.workflow.service.commands.Command.Callback;
-import org.globus.cog.karajan.workflow.service.commands.HeartBeatCommand;
public class Cpu implements Comparable<Cpu>, Callback, StatusListener {
public static final Logger logger = Logger.getLogger(Cpu.class);
@@ -90,7 +87,9 @@
return dif;
}
- public synchronized void jobTerminated() {
+ private void jobTerminated() {
+ assert Thread.holdsLock(this);
+
if (logger.isInfoEnabled()) {
logger.info(block.getId() + ":" + getId() + " jobTerminated");
}
@@ -281,13 +280,13 @@
return;
}
shutdown = true;
+ Block block = node.getBlock();
+ done.clear();
+ if (running != null) {
+ logger.info(block.getId() + "-" + id + ": Job still running while shutting down");
+ running.fail("Shutting down worker", null);
+ }
}
- Block block = node.getBlock();
- done.clear();
- if (running != null) {
- logger.info(block.getId() + "-" + id + ": Job still running while shutting down");
- running.fail("Shutting down worker", null);
- }
node.shutdown();
}
@@ -305,7 +304,7 @@
return running;
}
- public Time getTimeLast() {
+ public synchronized Time getTimeLast() {
if (running != null) {
if (timelast.isGreaterThan(Time.now())) {
return timelast;
@@ -329,6 +328,9 @@
}
public synchronized void taskFailed(String msg, Exception e) {
+ if (shutdown) {
+ return;
+ }
shutdown = true;
if (running == null) {
if (starttime == null) {
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java (revision 3476)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java (working copy)
@@ -45,7 +45,7 @@
return submitter;
}
- private int workers;
+ private int workers, workersStarting;
private TimeInterval walltime;
private Time endtime, starttime, deadline, creationtime;
private final SortedSet<Cpu> scpus;
@@ -62,7 +62,7 @@
private static final NumberFormat IDF = new DecimalFormat("000000");
- public static volatile int requestedWorkers, activeWorkers, failedWorkers, completedWorkers, completedJobs;
+ public static int requestedWorkers, activeWorkers, failedWorkers, completedWorkers, completedJobs;
public Block(String id) {
this.id = id;
@@ -93,6 +93,9 @@
"BLOCK_REQUESTED id=" + getId() + ", workers=" + getWorkerCount() + ", walltime="
+ getWalltime().getSeconds());
task = new BlockTask(this);
+ synchronized(cpus) {
+ workersStarting = workers;
+ }
task.addStatusListener(this);
try {
task.initialize();
@@ -170,7 +173,7 @@
}
public void remove(Cpu cpu) {
- synchronized (scpus) {
+ synchronized (cpus) {
if (!scpus.remove(cpu)) {
CoasterService.error(16, "CPU was not in the block", new Throwable());
}
@@ -181,7 +184,7 @@
}
public void add(Cpu cpu) {
- synchronized (scpus) {
+ synchronized (cpus) {
if (!scpus.add(cpu)) {
CoasterService.error(15, "CPU is already in the block", new Throwable());
}
@@ -195,7 +198,7 @@
}
public void shutdownIfEmpty(Cpu cpu) {
- synchronized (scpus) {
+ synchronized (cpus) {
if (scpus.isEmpty()) {
if (logger.isInfoEnabled() && !shutdown) {
logger.info(this + ": all cpus are clear");
@@ -335,23 +338,24 @@
if (logger.isInfoEnabled()) {
logger.info("Worker task failed: " + msg, e);
}
- requestedWorkers -= workers;
- activeWorkers -= workers;
+ // use auxiliary list to avoid deadlocks when
+ // cpus get notified separately by the dead channel
+ List<Cpu> cpusToNotify = new ArrayList<Cpu>();
synchronized (cpus) {
- synchronized (scpus) {
- failed = true;
- running = false;
- for (int j = cpus.size(); j < workers; j++) {
- Cpu cpu = new Cpu(j, new Node(j, this, null));
- scpus.add(cpu);
- cpus.add(cpu);
- }
-
- for (Cpu cpu : cpus) {
- cpu.taskFailed(msg, e);
- }
+ requestedWorkers -= workers;
+ activeWorkers -= workers;
+ failed = true;
+ running = false;
+ for (int j = cpus.size(); j < workersStarting; j++) {
+ Cpu cpu = new Cpu(j, new Node(j, this, null));
+ scpus.add(cpu);
+ cpus.add(cpu);
}
+ cpusToNotify.addAll(cpus);
}
+ for (Cpu cpu : cpusToNotify) {
+ cpu.taskFailed(msg, e);
+ }
}
public String getId() {
@@ -361,31 +365,30 @@
public String workerStarted(String workerID,
String workerHostname,
ChannelContext channelContext) {
- activeWorkers += workers;
synchronized (cpus) {
- synchronized (scpus) {
- int wid = Integer.parseInt(workerID);
- Node n = new Node(wid, this, workerHostname,
- channelContext);
- nodes.add(n);
- int jobsPerNode = bqp.getSettings().getJobsPerNode();
- for (int i = 0; i < jobsPerNode; i++) {
- //this id scheme works out because the sid is based on the
- //number of cpus already added (i.e. cpus.size()).
- Cpu cpu = new Cpu(wid + i, n);
- scpus.add(cpu);
- cpus.add(cpu);
- n.add(cpu);
- cpu.workerStarted();
- if (logger.isInfoEnabled()) {
- logger.info("Started CPU " + cpu);
- }
- }
+ int wid = Integer.parseInt(workerID);
+ Node n = new Node(wid, this, workerHostname,
+ channelContext);
+ nodes.add(n);
+ int jobsPerNode = bqp.getSettings().getJobsPerNode();
+ workersStarting -= jobsPerNode;
+ activeWorkers += jobsPerNode;
+ for (int i = 0; i < jobsPerNode; i++) {
+ //this id scheme works out because the sid is based on the
+ //number of cpus already added (i.e. cpus.size()).
+ Cpu cpu = new Cpu(wid + i, n);
+ scpus.add(cpu);
+ cpus.add(cpu);
+ n.add(cpu);
+ cpu.workerStarted();
if (logger.isInfoEnabled()) {
- logger.info("Started worker " + this.id + ":" + IDF.format(wid));
+ logger.info("Started CPU " + cpu);
}
- return workerID;
}
+ if (logger.isInfoEnabled()) {
+ logger.info("Started worker " + this.id + ":" + IDF.format(wid));
+ }
+ return workerID;
}
}
@@ -538,12 +541,10 @@
public void removeNode(Node node) {
synchronized(cpus) {
- synchronized(scpus) {
- nodes.remove(node);
- for (Cpu cpu : node.getCpus()) {
- scpus.remove(cpu);
- cpus.remove(cpu);
- }
+ nodes.remove(node);
+ for (Cpu cpu : node.getCpus()) {
+ scpus.remove(cpu);
+ cpus.remove(cpu);
}
}
bqp.nodeRemoved(node);
More information about the Swift-commit mailing list