[Swift-commit] r7952 - trunk/src/org/griphyn/vdl/karajan

hategan at ci.uchicago.edu hategan at ci.uchicago.edu
Fri Jul 4 01:34:39 CDT 2014


Author: hategan
Date: 2014-07-04 01:34:38 -0500 (Fri, 04 Jul 2014)
New Revision: 7952

Modified:
   trunk/src/org/griphyn/vdl/karajan/VDSAdaptiveScheduler.java
   trunk/src/org/griphyn/vdl/karajan/VDSTaskTransformer.java
Log:
new sites spec

Modified: trunk/src/org/griphyn/vdl/karajan/VDSAdaptiveScheduler.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/VDSAdaptiveScheduler.java	2014-07-04 06:33:48 UTC (rev 7951)
+++ trunk/src/org/griphyn/vdl/karajan/VDSAdaptiveScheduler.java	2014-07-04 06:34:38 UTC (rev 7952)
@@ -44,16 +44,14 @@
 import org.globus.cog.karajan.scheduler.ContactAllocationTask;
 import org.globus.cog.karajan.scheduler.ResourceConstraintChecker;
 import org.globus.cog.karajan.scheduler.TaskConstraints;
+import org.globus.cog.karajan.scheduler.WeightedHost;
 import org.globus.cog.karajan.scheduler.WeightedHostScoreScheduler;
 import org.globus.cog.karajan.scheduler.WeightedHostSet;
 import org.globus.cog.karajan.util.BoundContact;
 import org.globus.cog.karajan.util.Contact;
 import org.globus.cog.karajan.util.ContactSet;
 import org.globus.cog.karajan.util.TypeUtil;
-import org.globus.swift.catalog.TCEntry;
-import org.globus.swift.catalog.transformation.File;
-import org.globus.swift.catalog.types.TCType;
-import org.griphyn.vdl.util.FQN;
+import org.globus.swift.catalog.site.SwiftContact;
 
 
 public class VDSAdaptiveScheduler extends WeightedHostScoreScheduler implements CoasterResourceTracker {
@@ -61,7 +59,6 @@
 
 	private static Timer timer;
 
-	private TCCache tc;
 	private LinkedList<Entry> dq;
 	private int clusteringQueueDelay = 1;
 	private int minClusterTime = 60;
@@ -98,9 +95,8 @@
 
 	public void setProperty(String name, Object value) {
 		if (PROP_TC_FILE.equals(name)) {
-			tc = new TCCache(File.getNonSingletonInstance((String) value));
-			this.setConstraintChecker(new TCChecker(tc));
-			this.addTaskTransformer(new VDSTaskTransformer(tc));
+			this.setConstraintChecker(new SwiftSiteChecker());
+			this.addTaskTransformer(new VDSTaskTransformer());
 		}
 		else if (PROP_CLUSTERING_QUEUE_DELAY.equals(name)) {
 			clusteringQueueDelay = TypeUtil.toInt(value);
@@ -115,7 +111,7 @@
 			super.setProperty(name, value);
 		}
 	}
-
+	
 	@Override
 	public void enqueue(Task task, Object constraints, StatusListener l) {
 		if (shouldBeClustered(task, constraints)) {
@@ -465,23 +461,46 @@
 	    }
         super.setResources(cs);
         for (BoundContact bc : cs.getContacts()) {
-            if ("passive".equals(bc.getProperty("globus:workerManager")) 
-                    && "true".equals(bc.getProperty("globus:throttleTracksWorkers"))) {
-                Service s = bc.getService(Service.EXECUTION, "coaster");
-                if (s != null) {
-                    s.setAttribute("resource-tracker", this);
+            Service es = bc.getService(Service.EXECUTION, "coaster");
+            if (es != null && "passive".equals(es.getAttribute("workerManager"))
+                    && "true".equals(bc.getProperty("throttleTracksWorkers"))) {
+                if (es != null) {
+                    es.setAttribute("resource-tracker", this);
                     WeightedHostSet whs = getWeightedHostSet();
                     // set throttle to one so that a task gets sent
                     // to the provider and the connection/service is 
                     // initialized/started
                     whs.changeThrottleOverride(whs.findHost(bc), 1);
-                    serviceContactMapping.put(s, bc);
+                    serviceContactMapping.put(es, bc);
                 }
             }
+            
+            Object maxParallelJobs = bc.getProperty("maxParallelTasks");
+            Object initialParallelJobs = bc.getProperty("initialParallelTasks");
+            if (maxParallelJobs != null) {
+                double max = parseAndCheck(maxParallelJobs, "maxParallelTasks", bc);
+                bc.setProperty("jobThrottle", WeightedHost.jobThrottleFromMaxParallelism(max));
+                if (initialParallelJobs != null) {
+                    double initial = parseAndCheck(initialParallelJobs, "initialParallelTasks", bc);
+                    bc.setProperty("initialScore", WeightedHost.initialScoreFromInitialParallelism(initial, max));
+                }
+            }
+            else if (initialParallelJobs != null) {
+                throw new IllegalArgumentException("initialParallelJobs cannot be used without maxParallelTasks");
+            }
         }
     }
 	
-	public void resourceUpdated(Service service, String name, String value) {
+	private double parseAndCheck(Object value, String name, BoundContact bc) {
+        double d = TypeUtil.toDouble(value);
+        if (d < 1) {
+            throw new IllegalArgumentException("Invalid " + name + " value (" + d + ") for site '" + 
+                bc.getName() + "'. Must be greater or equal to 1.");
+        }
+        return d;
+    }
+
+    public void resourceUpdated(Service service, String name, String value) {
 	    if (logger.isInfoEnabled()) {
 	        logger.info(service + " resource updated: " + name + " -> " + value);
 	    }
@@ -495,29 +514,13 @@
 	    }
     }
 
-	public static class TCChecker implements ResourceConstraintChecker {
-		private TCCache tc;
+	public static class SwiftSiteChecker implements ResourceConstraintChecker {
 
-		public TCChecker(TCCache tc) {
-			this.tc = tc;
-		}
-
 		public boolean checkConstraints(BoundContact resource, TaskConstraints tc) {
-			if (isPresent("trfqn", tc)) {
-				FQN tr = (FQN) tc.getConstraint("trfqn");
-				try {
-					List<TCEntry> l = this.tc.getTCEntries(tr, resource.getHost(), TCType.INSTALLED);
-					if (l == null || l.isEmpty()) {
-						return false;
-					}
-					else {
-						return true;
-					}
-				}
-				catch (Exception e) {
-					logger.warn("Exception caught while querying TC", e);
-					return false;
-				}
+			if (isPresent("tr", tc)) {
+			    SwiftContact sc = (SwiftContact) resource;
+				String tr = (String) tc.getConstraint("tr");
+			    return sc.findApplication(tr) != null;
 			}
 			else {
 				return true;

Modified: trunk/src/org/griphyn/vdl/karajan/VDSTaskTransformer.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/VDSTaskTransformer.java	2014-07-04 06:33:48 UTC (rev 7951)
+++ trunk/src/org/griphyn/vdl/karajan/VDSTaskTransformer.java	2014-07-04 06:34:38 UTC (rev 7952)
@@ -41,8 +41,8 @@
 
 	private TaskTransformer impl;
 
-	public VDSTaskTransformer(TCCache tc) {
-		this.impl = new TCTransformer(tc);
+	public VDSTaskTransformer() {
+		this.impl = new SwiftTransformer();
 	}
 
 	public void transformTask(Task task, Contact[] contacts, Service[] services) {
@@ -54,7 +54,6 @@
 		public void transformTask(Task task, Contact[] contacts, Service[] services) {
 			if (task.getType() == Task.JOB_SUBMISSION) {
 				applyJobWorkDirectory(task, contacts);
-				applyTCEntry(task, contacts);
 			}
 			else if (task.getType() == Task.FILE_TRANSFER) {
 				applyTransferWorkDirectory(task, contacts);
@@ -158,21 +157,8 @@
 			}
 		}
 
-		protected abstract void applyTCEntry(Task task, Contact[] contacts);
 	}
 
-	public static class TCTransformer extends AbstractTransformer {
-		private TCCache tc;
-
-		public TCTransformer(TCCache tc) {
-			this.tc = tc;
-		}
-
-		protected void applyTCEntry(Task task, Contact[] contacts) {
-			// this method used to filter the task executable through
-		    // tc.data, but the task executable at this point was
-		    // always set to /bin/bash (or whatever the wrapper interpreter was).
-		    // That was useless.
-		}
+	public static class SwiftTransformer extends AbstractTransformer {
 	}
 }




More information about the Swift-commit mailing list