[Swift-commit] cog r3793
swift at ci.uchicago.edu
swift at ci.uchicago.edu
Fri Sep 20 23:00:06 CDT 2013
------------------------------------------------------------------------
r3793 | hategan | 2013-09-20 22:56:02 -0500 (Fri, 20 Sep 2013) | 1 line
fixed two deadlocks in the coaster service (one causing bug 1006)
------------------------------------------------------------------------
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java (revision 3792)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Cpu.java (working copy)
@@ -176,11 +176,11 @@
if (logger.isTraceEnabled()) {
logger.trace(block.getId() + ":" + getId() + " pull");
}
+ if (isShutDown()) {
+ return;
+ }
synchronized (this) {
totalJobCount++;
- if (shutdown) {
- return;
- }
if (!started()) {
sleep();
}
@@ -240,7 +240,7 @@
if (timelast == null) {
CoasterService.error(20, "Timelast is null", new Throwable());
}
- block.add(this);
+ block.add(this, timelast);
submit(running);
return true;
}
@@ -293,11 +293,10 @@
}
public void shutdown() {
+ if (raiseShutdown()) {
+ return;
+ }
synchronized(this) {
- if (shutdown) {
- return;
- }
- shutdown = true;
Block block = node.getBlock();
done.clear();
if (running != null) {
@@ -344,29 +343,48 @@
public List<Job> getDoneJobs() {
return done;
}
+
+ private Object shutdownLock = new Object();
+
+ private boolean raiseShutdown() {
+ synchronized(shutdownLock) {
+ if (shutdown) {
+ return true;
+ }
+ shutdown = true;
+ return false;
+ }
+ }
+
+ private boolean isShutDown() {
+ synchronized(shutdownLock) {
+ return shutdown;
+ }
+ }
- public synchronized void taskFailed(String msg, Exception e) {
- if (shutdown) {
+ public void taskFailed(String msg, Exception e) {
+ if (raiseShutdown()) {
return;
}
- shutdown = true;
- if (running == null) {
- if (starttime == null) {
- starttime = Time.now();
+ synchronized(this) {
+ if (running == null) {
+ if (starttime == null) {
+ starttime = Time.now();
+ }
+ if (endtime == null) {
+ endtime = starttime.add(block.getWalltime());
+ }
+ TimeInterval time = endtime.subtract(Time.now());
+ int cpus = 1 + getPullThread(node.getBlock()).sleepers();
+ running = bqp.request(time, cpus);
+ // no listener is added to this task, so make sure
+ // it won't linger in the BQP running set
+ bqp.jobTerminated(running);
}
- if (endtime == null) {
- endtime = starttime.add(block.getWalltime());
+ if (running != null) {
+ running.fail("Block task failed: " + msg, e);
}
- TimeInterval time = endtime.subtract(Time.now());
- int cpus = 1 + getPullThread(node.getBlock()).sleepers();
- running = bqp.request(time, cpus);
- // no listener is added to this task, so make sure
- // it won't linger in the BQP running set
- bqp.jobTerminated(running);
}
- if (running != null) {
- running.fail("Block task failed: " + msg, e);
- }
}
public void errorReceived(Command cmd, String msg, Exception t) {
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java (revision 3792)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java (working copy)
@@ -14,9 +14,9 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.SortedSet;
+import java.util.SortedMap;
import java.util.TimerTask;
-import java.util.TreeSet;
+import java.util.TreeMap;
import org.apache.log4j.Logger;
import org.globus.cog.abstraction.coaster.service.CoasterService;
@@ -48,7 +48,7 @@
private int workers, activeWorkers;
private TimeInterval walltime;
private Time endtime, starttime, deadline, creationtime;
- private final SortedSet<Cpu> scpus;
+ private final SortedMap<Cpu, Time> scpus;
private final List<Cpu> cpus;
private final List<Node> nodes;
private boolean running = false, failed, shutdown, suspended;
@@ -70,7 +70,7 @@
public Block(String id) {
this.id = id;
- scpus = new TreeSet<Cpu>();
+ scpus = new TreeMap<Cpu, Time>();
cpus = new ArrayList<Cpu>();
nodes = new ArrayList<Node>();
}
@@ -164,15 +164,19 @@
// if the simple tests before fail try to see if there is a
// cpu that can specifically fit this job.
synchronized (cpus) {
- for (Cpu cpu : cpus) {
- Job running = cpu.getRunning();
- if (running == null) {
- return true;
- }
- else if (j.getMaxWallTime().isLessThan(endtime.subtract(running.getEndTime()))) {
- return true;
- }
+ if (scpus.size() < cpus.size()) {
+ // there are some cores not running any jobs
+ return true;
}
+ if (scpus.size() == 0) {
+ // started, but workers not connected yet
+ return true;
+ }
+ Cpu cpu = scpus.firstKey();
+ Time jobEndTime = scpus.get(cpu);
+ if (j.getMaxWallTime().isLessThan(endtime.subtract(jobEndTime))) {
+ return true;
+ }
}
return false;
}
@@ -180,25 +184,25 @@
public void remove(Cpu cpu) {
synchronized (cpus) {
- if (!scpus.remove(cpu)) {
+ if (scpus.remove(cpu) == null) {
CoasterService.error(16, "CPU was not in the block", new Throwable());
}
- if (scpus.contains(cpu)) {
+ if (scpus.containsKey(cpu)) {
CoasterService.error(17, "CPU not removed", new Throwable());
}
}
}
- public void add(Cpu cpu) {
+ public void add(Cpu cpu, Time estJobCompletionTime) {
Cpu last = null;
synchronized (cpus) {
- if (!scpus.add(cpu)) {
+ if (scpus.put(cpu, estJobCompletionTime) != null) {
CoasterService.error(15, "CPU is already in the block", new Throwable());
}
- last = scpus.last();
+ last = scpus.lastKey();
}
if (last == cpu) {
- setDeadline(Time.min(cpu.getTimeLast().add(bqp.getSettings().getReserve()),
+ setDeadline(Time.min(estJobCompletionTime.add(bqp.getSettings().getReserve()),
getEndTime()));
}
}
@@ -363,7 +367,7 @@
running = false;
for (int j = cpus.size(); j < (workers - this.activeWorkers); j++) {
Cpu cpu = new Cpu(j, new Node(j, this, null));
- scpus.add(cpu);
+ scpus.put(cpu, null);
cpus.add(cpu);
}
this.activeWorkers = 0;
@@ -394,7 +398,7 @@
//this id scheme works out because the sid is based on the
//number of cpus already added (i.e. cpus.size()).
Cpu cpu = new Cpu(wid + i, n);
- scpus.add(cpu);
+ scpus.put(cpu, cpu.getTimeLast());
cpus.add(cpu);
n.add(cpu);
cpu.workerStarted();
More information about the Swift-commit mailing list