[Swift-commit] r4816 - trunk/src/org/griphyn/vdl/karajan

hategan at ci.uchicago.edu hategan at ci.uchicago.edu
Mon Jul 11 17:17:20 CDT 2011


Author: hategan
Date: 2011-07-11 17:17:20 -0500 (Mon, 11 Jul 2011)
New Revision: 4816

Modified:
   trunk/src/org/griphyn/vdl/karajan/VDSAdaptiveScheduler.java
Log:
mode for throttle to track running coaster workers

Modified: trunk/src/org/griphyn/vdl/karajan/VDSAdaptiveScheduler.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/VDSAdaptiveScheduler.java	2011-07-11 19:33:39 UTC (rev 4815)
+++ trunk/src/org/griphyn/vdl/karajan/VDSAdaptiveScheduler.java	2011-07-11 22:17:20 UTC (rev 4816)
@@ -12,20 +12,24 @@
 import java.util.TimerTask;
 
 import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.coaster.service.local.CoasterResourceTracker;
 import org.globus.cog.abstraction.impl.common.IdentityImpl;
 import org.globus.cog.abstraction.impl.common.StatusEvent;
 import org.globus.cog.abstraction.impl.common.StatusImpl;
 import org.globus.cog.abstraction.impl.common.task.JobSpecificationImpl;
 import org.globus.cog.abstraction.impl.common.task.TaskImpl;
 import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.abstraction.interfaces.Service;
 import org.globus.cog.abstraction.interfaces.Status;
 import org.globus.cog.abstraction.interfaces.Task;
 import org.globus.cog.karajan.scheduler.AbstractScheduler;
 import org.globus.cog.karajan.scheduler.ResourceConstraintChecker;
 import org.globus.cog.karajan.scheduler.TaskConstraints;
 import org.globus.cog.karajan.scheduler.WeightedHostScoreScheduler;
+import org.globus.cog.karajan.scheduler.WeightedHostSet;
 import org.globus.cog.karajan.util.BoundContact;
 import org.globus.cog.karajan.util.Contact;
+import org.globus.cog.karajan.util.ContactSet;
 import org.globus.cog.karajan.util.TypeUtil;
 import org.globus.swift.catalog.TCEntry;
 import org.globus.swift.catalog.transformation.File;
@@ -33,7 +37,7 @@
 import org.griphyn.vdl.util.FQN;
 
 
-public class VDSAdaptiveScheduler extends WeightedHostScoreScheduler {
+public class VDSAdaptiveScheduler extends WeightedHostScoreScheduler implements CoasterResourceTracker {
 	public static final Logger logger = Logger.getLogger(VDSAdaptiveScheduler.class);
 
 	private static Timer timer;
@@ -45,10 +49,17 @@
 	private Map<Task, List<Object[]>> tasks;
 	private boolean clusteringEnabled;
 	private int clusterId;
+	
+	/**
+	 * A map to allow quick determination of what contact a service
+	 * belongs to
+	 */
+	private Map<Service, BoundContact> serviceContactMapping;
 
 	public VDSAdaptiveScheduler() {
 		dq = new LinkedList<Object[]>();
 		tasks = new HashMap<Task, List<Object[]>>();
+		serviceContactMapping = new HashMap<Service, BoundContact>();
 	}
 
 	public static final String PROP_TC_FILE = "transformationCatalogFile";
@@ -421,6 +432,41 @@
 			failTask(t, ex.getMessage(), ex);
 		}
 	}
+	
+	@Override
+    public void setResources(ContactSet cs) {
+        super.setResources(cs);
+        for (BoundContact bc : cs.getContacts()) {
+            if ("passive".equals(bc.getProperty("globus:workerManager")) 
+                    && "true".equals(bc.getProperty("globus:throttleTracksWorkers"))) {
+                Service s = bc.getService(Service.EXECUTION, "coaster");
+                if (s != null) {
+                    s.setAttribute("resource-tracker", this);
+                    WeightedHostSet whs = getWeightedHostSet();
+                    // set throttle to one so that a task gets sent
+                    // to the provider and the connection/service is 
+                    // initialized/started
+                    whs.changeThrottleOverride(whs.findHost(bc), 1);
+                    serviceContactMapping.put(s, bc);
+                }
+            }
+        }
+    }
+	
+	@Override
+    public void resourceUpdated(Service service, String name, String value) {
+	    if (logger.isInfoEnabled()) {
+	        logger.info(service + " resource updated: " + name + " -> " + value);
+	    }
+	    if (name.equals("job-capacity")) {
+	        int throttle = Integer.parseInt(value);
+    	    BoundContact bc = serviceContactMapping.get(service);
+    	    WeightedHostSet whs = getWeightedHostSet();
+    	    whs.changeThrottleOverride(whs.findHost(bc), throttle > 0 ? throttle : 1);
+    	    
+    	    raiseTasksFinished();
+	    }
+    }
 
 	public static class TCChecker implements ResourceConstraintChecker {
 		private TCCache tc;
@@ -461,7 +507,9 @@
 			return true;
 		}
 
-		public List checkConstraints(List resources, TaskConstraints tc) {
+		
+		@SuppressWarnings({ "rawtypes", "unchecked" })
+        public List checkConstraints(List resources, TaskConstraints tc) {
 			LinkedList l = new LinkedList();
 			Iterator i = resources.iterator();
 			while (i.hasNext()) {




More information about the Swift-commit mailing list