[Swift-commit] r2511 - trunk/src/org/griphyn/vdl/karajan/lib/replication
noreply at svn.ci.uchicago.edu
noreply at svn.ci.uchicago.edu
Fri Feb 6 21:05:50 CST 2009
Author: hategan
Date: 2009-02-06 21:05:49 -0600 (Fri, 06 Feb 2009)
New Revision: 2511
Modified:
trunk/src/org/griphyn/vdl/karajan/lib/replication/ReplicationManager.java
Log:
kill jobs that have been running for twice their walltime
Modified: trunk/src/org/griphyn/vdl/karajan/lib/replication/ReplicationManager.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/replication/ReplicationManager.java 2009-02-07 02:59:26 UTC (rev 2510)
+++ trunk/src/org/griphyn/vdl/karajan/lib/replication/ReplicationManager.java 2009-02-07 03:05:49 UTC (rev 2511)
@@ -5,131 +5,201 @@
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.common.execution.WallTime;
+import org.globus.cog.abstraction.interfaces.JobSpecification;
import org.globus.cog.abstraction.interfaces.Task;
import org.globus.cog.karajan.scheduler.Scheduler;
import org.griphyn.vdl.util.VDL2Config;
public class ReplicationManager {
- public static final Logger logger = Logger.getLogger(ReplicationManager.class);
+ public static final Logger logger = Logger
+ .getLogger(ReplicationManager.class);
- public static final int STATUS_NEEDS_REPLICATION = 100;
-
- public static final int INITIAL_QUEUE_TIME_ESTIMATE = 30; //seconds
+ public static final int STATUS_NEEDS_REPLICATION = 100;
- private int n;
- private long s;
- private long s2;
- private Map queued;
- private int minQueueTime, limit;
- private boolean enabled;
- private ReplicationGroups replicationGroups;
- private Scheduler scheduler;
+ public static final int INITIAL_QUEUE_TIME_ESTIMATE = 30; // seconds
+
+ public static final int WALLTIME_DEADLINE_MULTIPLIER = 2;
- public ReplicationManager(Scheduler scheduler) {
- this.replicationGroups = new ReplicationGroups(scheduler);
- queued = new HashMap();
- try {
- minQueueTime = Integer.parseInt(VDL2Config.getConfig().getProperty(
- "replication.min.queue.time"));
- enabled = Boolean.valueOf(VDL2Config.getConfig().getProperty("replication.enabled")).booleanValue();
- limit = Integer.parseInt(VDL2Config.getConfig().getProperty("replication.limit"));
- }
- catch (Exception e) {
- logger.warn("Failed to get value of replication.min.queue.time property "
- + "from Swift configuration. Using default (60s).", e);
- minQueueTime = 60;
- }
- if (enabled) {
- Sweeper.getSweeper().register(this);
- }
- }
+ private int n;
+ private long s;
+ private long s2;
+ private Map queued, running;
+ private int minQueueTime, limit;
+ private boolean enabled;
+ private ReplicationGroups replicationGroups;
+ private Scheduler scheduler;
- public void register(String rg, Task task) throws CanceledReplicaException {
- if (enabled) {
- replicationGroups.add(rg, task);
- }
- }
+ public ReplicationManager(Scheduler scheduler) {
+ this.replicationGroups = new ReplicationGroups(scheduler);
+ this.scheduler = scheduler;
+ queued = new HashMap();
+ running = new HashMap();
+ try {
+ minQueueTime = Integer.parseInt(VDL2Config.getConfig().getProperty(
+ "replication.min.queue.time"));
+ enabled = Boolean.valueOf(
+ VDL2Config.getConfig().getProperty("replication.enabled"))
+ .booleanValue();
+ limit = Integer.parseInt(VDL2Config.getConfig().getProperty(
+ "replication.limit"));
+ }
+ catch (Exception e) {
+ logger.warn(
+ "Failed to get value of replication.min.queue.time property "
+ + "from Swift configuration. Using default (60s).",
+ e);
+ minQueueTime = 60;
+ }
+ if (enabled) {
+ Sweeper.getSweeper().register(this);
+ }
+ }
- public void submitted(Task task, Date time) {
- if (enabled) {
- synchronized (queued) {
- queued.put(task, time);
- }
- }
- }
+ public void register(String rg, Task task) throws CanceledReplicaException {
+ if (enabled) {
+ replicationGroups.add(rg, task);
+ }
+ }
- public void active(Task task, Date time) {
- if (enabled) {
- Date submitted;
- synchronized (queued) {
- submitted = (Date) queued.remove(task);
- }
- if (submitted != null) {
- long delta = (time.getTime() - submitted.getTime()) / 1000;
- synchronized (this) {
- n++;
- s += delta;
- s2 += delta * delta;
- }
- }
- replicationGroups.active(task);
- }
- }
+ public void submitted(Task task, Date time) {
+ if (enabled) {
+ synchronized (this) {
+ queued.put(task, time);
+ }
+ }
+ }
- public synchronized int getN() {
- return n;
- }
+ public void active(Task task, Date time) {
+ if (enabled) {
+ Date submitted;
+ synchronized (this) {
+ submitted = (Date) queued.remove(task);
+ registerRunning(task, time);
+ if (submitted != null) {
+ long delta = (time.getTime() - submitted.getTime()) / 1000;
+ n++;
+ s += delta;
+ s2 += delta * delta;
+ }
+ }
+ replicationGroups.active(task);
+ }
+ }
- public synchronized double getMean() {
- if (n == 0) {
- return INITIAL_QUEUE_TIME_ESTIMATE;
- }
- else {
- return s / n;
- }
- }
+ private static final Set warnedAboutWalltime = new HashSet();
- public synchronized double getStandardDeviation() {
- if (n == 0) {
- return 0;
- }
- else {
- return Math.sqrt((s2 - s * s / n) / n);
- }
- }
+ protected void registerRunning(Task task, Date time) {
+ JobSpecification spec = (JobSpecification) task.getSpecification();
+ Object walltime = spec.getAttribute("maxwalltime");
+ String tr = (String) spec.getAttribute("tr");
+ if (walltime == null) {
+ warn(tr, "Warning: missing walltime specification for \"" + tr
+ + "\". Assuming 10 minutes.");
+ walltime = "10";
+ }
+ int seconds;
+ try {
+ seconds = WallTime.timeToSeconds(walltime.toString());
+ }
+ catch (IllegalArgumentException e) {
+ warn(tr, "Warning: invalid walltime specification for \"" + tr
+ + "\" (" + walltime + "). Assuming 10 minutes.");
+ seconds = 10 * 60;
+ }
+ Date deadline = new Date(time.getTime() + WALLTIME_DEADLINE_MULTIPLIER * seconds * 1000);
+ running.put(task, deadline);
+ }
- public void checkTasks() {
- Map m;
- synchronized (queued) {
- m = new HashMap(queued);
- }
- Iterator i = m.entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry e = (Map.Entry) i.next();
- Task t = (Task) e.getKey();
- Date d = (Date) e.getValue();
- if (shouldBeReplicated(t, d)) {
- replicationGroups.requestReplica(t);
- }
- }
- }
+ private void warn(String tr, String message) {
+ synchronized (warnedAboutWalltime) {
+ if (warnedAboutWalltime.add(tr)) {
+ System.out.println(message);
+ }
+ }
+ }
- private boolean shouldBeReplicated(Task t, Date d) {
- if (t.getStatus().getStatusCode() == STATUS_NEEDS_REPLICATION) {
- // don't keep replicating the same job
- return false;
- }
- long inTheQueue = (System.currentTimeMillis() - d.getTime()) / 1000;
- if (inTheQueue > minQueueTime && inTheQueue > 3 * getMean()
- && replicationGroups.getRequestedCount(t) < limit) {
- return true;
- }
- else {
- return false;
- }
- }
+ public synchronized int getN() {
+ return n;
+ }
+
+ public synchronized double getMean() {
+ if (n == 0) {
+ return INITIAL_QUEUE_TIME_ESTIMATE;
+ }
+ else {
+ return s / n;
+ }
+ }
+
+ public synchronized double getStandardDeviation() {
+ if (n == 0) {
+ return 0;
+ }
+ else {
+ return Math.sqrt((s2 - s * s / n) / n);
+ }
+ }
+
+ public void checkTasks() {
+ Map m, r;
+ synchronized (this) {
+ m = new HashMap(queued);
+ r = new HashMap(running);
+ }
+ Iterator i;
+ i = m.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry) i.next();
+ Task t = (Task) e.getKey();
+ Date d = (Date) e.getValue();
+ if (shouldBeReplicated(t, d)) {
+ replicationGroups.requestReplica(t);
+ }
+ }
+ i = r.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry) i.next();
+ Task t = (Task) e.getKey();
+ Date d = (Date) e.getValue();
+ if (isOverDeadline(t, d)) {
+ logger.info(t + ": deadline passed. Cancelling job.");
+ cancelTask(t);
+ }
+ }
+ }
+
+ private boolean shouldBeReplicated(Task t, Date d) {
+ if (t.getStatus().getStatusCode() == STATUS_NEEDS_REPLICATION) {
+ // don't keep replicating the same job
+ return false;
+ }
+ long inTheQueue = (System.currentTimeMillis() - d.getTime()) / 1000;
+ if (inTheQueue > minQueueTime && inTheQueue > 3 * getMean()
+ && replicationGroups.getRequestedCount(t) < limit) {
+ return true;
+ }
+ else {
+ return false;
+ }
+ }
+
+ private boolean isOverDeadline(Task t, Date d) {
+ if (System.currentTimeMillis() > d.getTime()) {
+ return true;
+ }
+ else {
+ return false;
+ }
+ }
+
+ private void cancelTask(Task t) {
+ scheduler.cancelTask(t, "Walltime exceeded");
+ }
}
\ No newline at end of file
More information about the Swift-commit mailing list