[Swift-commit] r4816 - trunk/src/org/griphyn/vdl/karajan
hategan at ci.uchicago.edu
hategan at ci.uchicago.edu
Mon Jul 11 17:17:20 CDT 2011
Author: hategan
Date: 2011-07-11 17:17:20 -0500 (Mon, 11 Jul 2011)
New Revision: 4816
Modified:
trunk/src/org/griphyn/vdl/karajan/VDSAdaptiveScheduler.java
Log:
mode for throttle to track running coaster workers
Modified: trunk/src/org/griphyn/vdl/karajan/VDSAdaptiveScheduler.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/VDSAdaptiveScheduler.java 2011-07-11 19:33:39 UTC (rev 4815)
+++ trunk/src/org/griphyn/vdl/karajan/VDSAdaptiveScheduler.java 2011-07-11 22:17:20 UTC (rev 4816)
@@ -12,20 +12,24 @@
import java.util.TimerTask;
import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.coaster.service.local.CoasterResourceTracker;
import org.globus.cog.abstraction.impl.common.IdentityImpl;
import org.globus.cog.abstraction.impl.common.StatusEvent;
import org.globus.cog.abstraction.impl.common.StatusImpl;
import org.globus.cog.abstraction.impl.common.task.JobSpecificationImpl;
import org.globus.cog.abstraction.impl.common.task.TaskImpl;
import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.abstraction.interfaces.Service;
import org.globus.cog.abstraction.interfaces.Status;
import org.globus.cog.abstraction.interfaces.Task;
import org.globus.cog.karajan.scheduler.AbstractScheduler;
import org.globus.cog.karajan.scheduler.ResourceConstraintChecker;
import org.globus.cog.karajan.scheduler.TaskConstraints;
import org.globus.cog.karajan.scheduler.WeightedHostScoreScheduler;
+import org.globus.cog.karajan.scheduler.WeightedHostSet;
import org.globus.cog.karajan.util.BoundContact;
import org.globus.cog.karajan.util.Contact;
+import org.globus.cog.karajan.util.ContactSet;
import org.globus.cog.karajan.util.TypeUtil;
import org.globus.swift.catalog.TCEntry;
import org.globus.swift.catalog.transformation.File;
@@ -33,7 +37,7 @@
import org.griphyn.vdl.util.FQN;
-public class VDSAdaptiveScheduler extends WeightedHostScoreScheduler {
+public class VDSAdaptiveScheduler extends WeightedHostScoreScheduler implements CoasterResourceTracker {
public static final Logger logger = Logger.getLogger(VDSAdaptiveScheduler.class);
private static Timer timer;
@@ -45,10 +49,17 @@
private Map<Task, List<Object[]>> tasks;
private boolean clusteringEnabled;
private int clusterId;
+
+ /**
+ * A map to allow quick determination of what contact a service
+ * belongs to
+ */
+ private Map<Service, BoundContact> serviceContactMapping;
/**
 * Creates a scheduler whose dq list, task map and service-to-contact
 * map all start out empty.
 */
public VDSAdaptiveScheduler() {
dq = new LinkedList<Object[]>();
tasks = new HashMap<Task, List<Object[]>>();
+ serviceContactMapping = new HashMap<Service, BoundContact>(); // filled by setResources(), read by resourceUpdated()
}
public static final String PROP_TC_FILE = "transformationCatalogFile";
@@ -421,6 +432,41 @@
failTask(t, ex.getMessage(), ex);
}
}
+
+    /**
+     * Installs the given contacts and, for every contact configured with a
+     * passive worker manager and throttle tracking enabled, registers this
+     * scheduler as the resource tracker of the contact's coaster execution
+     * service.
+     *
+     * @param cs the set of contacts to schedule on
+     */
+    @Override
+    public void setResources(ContactSet cs) {
+        super.setResources(cs);
+        for (BoundContact contact : cs.getContacts()) {
+            // both properties must match; the second one is only looked up
+            // when the first one matched
+            if (!"passive".equals(contact.getProperty("globus:workerManager"))) {
+                continue;
+            }
+            if (!"true".equals(contact.getProperty("globus:throttleTracksWorkers"))) {
+                continue;
+            }
+            Service coasterService = contact.getService(Service.EXECUTION, "coaster");
+            if (coasterService == null) {
+                // no coaster execution service on this contact
+                continue;
+            }
+            coasterService.setAttribute("resource-tracker", this);
+            WeightedHostSet hosts = getWeightedHostSet();
+            // start with a throttle of one so that a single task is sent to
+            // the provider, which initializes/starts the connection/service
+            hosts.changeThrottleOverride(hosts.findHost(contact), 1);
+            serviceContactMapping.put(coasterService, contact);
+        }
+    }
+
+ @Override
+ public void resourceUpdated(Service service, String name, String value) {
+ if (logger.isInfoEnabled()) {
+ logger.info(service + " resource updated: " + name + " -> " + value);
+ }
+ if (name.equals("job-capacity")) {
+ int throttle = Integer.parseInt(value);
+ BoundContact bc = serviceContactMapping.get(service);
+ WeightedHostSet whs = getWeightedHostSet();
+ whs.changeThrottleOverride(whs.findHost(bc), throttle > 0 ? throttle : 1);
+
+ raiseTasksFinished();
+ }
+ }
public static class TCChecker implements ResourceConstraintChecker {
private TCCache tc;
@@ -461,7 +507,9 @@
return true;
}
- public List checkConstraints(List resources, TaskConstraints tc) {
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public List checkConstraints(List resources, TaskConstraints tc) {
LinkedList l = new LinkedList();
Iterator i = resources.iterator();
while (i.hasNext()) {
More information about the Swift-commit
mailing list