[Swift-commit] r7952 - trunk/src/org/griphyn/vdl/karajan
hategan at ci.uchicago.edu
hategan at ci.uchicago.edu
Fri Jul 4 01:34:39 CDT 2014
Author: hategan
Date: 2014-07-04 01:34:38 -0500 (Fri, 04 Jul 2014)
New Revision: 7952
Modified:
trunk/src/org/griphyn/vdl/karajan/VDSAdaptiveScheduler.java
trunk/src/org/griphyn/vdl/karajan/VDSTaskTransformer.java
Log:
new sites spec
Modified: trunk/src/org/griphyn/vdl/karajan/VDSAdaptiveScheduler.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/VDSAdaptiveScheduler.java 2014-07-04 06:33:48 UTC (rev 7951)
+++ trunk/src/org/griphyn/vdl/karajan/VDSAdaptiveScheduler.java 2014-07-04 06:34:38 UTC (rev 7952)
@@ -44,16 +44,14 @@
import org.globus.cog.karajan.scheduler.ContactAllocationTask;
import org.globus.cog.karajan.scheduler.ResourceConstraintChecker;
import org.globus.cog.karajan.scheduler.TaskConstraints;
+import org.globus.cog.karajan.scheduler.WeightedHost;
import org.globus.cog.karajan.scheduler.WeightedHostScoreScheduler;
import org.globus.cog.karajan.scheduler.WeightedHostSet;
import org.globus.cog.karajan.util.BoundContact;
import org.globus.cog.karajan.util.Contact;
import org.globus.cog.karajan.util.ContactSet;
import org.globus.cog.karajan.util.TypeUtil;
-import org.globus.swift.catalog.TCEntry;
-import org.globus.swift.catalog.transformation.File;
-import org.globus.swift.catalog.types.TCType;
-import org.griphyn.vdl.util.FQN;
+import org.globus.swift.catalog.site.SwiftContact;
public class VDSAdaptiveScheduler extends WeightedHostScoreScheduler implements CoasterResourceTracker {
@@ -61,7 +59,6 @@
private static Timer timer;
- private TCCache tc;
private LinkedList<Entry> dq;
private int clusteringQueueDelay = 1;
private int minClusterTime = 60;
@@ -98,9 +95,8 @@
public void setProperty(String name, Object value) {
if (PROP_TC_FILE.equals(name)) {
- tc = new TCCache(File.getNonSingletonInstance((String) value));
- this.setConstraintChecker(new TCChecker(tc));
- this.addTaskTransformer(new VDSTaskTransformer(tc));
+ this.setConstraintChecker(new SwiftSiteChecker());
+ this.addTaskTransformer(new VDSTaskTransformer());
}
else if (PROP_CLUSTERING_QUEUE_DELAY.equals(name)) {
clusteringQueueDelay = TypeUtil.toInt(value);
@@ -115,7 +111,7 @@
super.setProperty(name, value);
}
}
-
+
@Override
public void enqueue(Task task, Object constraints, StatusListener l) {
if (shouldBeClustered(task, constraints)) {
@@ -465,23 +461,46 @@
}
super.setResources(cs);
for (BoundContact bc : cs.getContacts()) {
- if ("passive".equals(bc.getProperty("globus:workerManager"))
- && "true".equals(bc.getProperty("globus:throttleTracksWorkers"))) {
- Service s = bc.getService(Service.EXECUTION, "coaster");
- if (s != null) {
- s.setAttribute("resource-tracker", this);
+ Service es = bc.getService(Service.EXECUTION, "coaster");
+ if (es != null && "passive".equals(es.getAttribute("workerManager"))
+ && "true".equals(bc.getProperty("throttleTracksWorkers"))) {
+ if (es != null) {
+ es.setAttribute("resource-tracker", this);
WeightedHostSet whs = getWeightedHostSet();
// set throttle to one so that a task gets sent
// to the provider and the connection/service is
// initialized/started
whs.changeThrottleOverride(whs.findHost(bc), 1);
- serviceContactMapping.put(s, bc);
+ serviceContactMapping.put(es, bc);
}
}
+
+ Object maxParallelJobs = bc.getProperty("maxParallelTasks");
+ Object initialParallelJobs = bc.getProperty("initialParallelTasks");
+ if (maxParallelJobs != null) {
+ double max = parseAndCheck(maxParallelJobs, "maxParallelTasks", bc);
+ bc.setProperty("jobThrottle", WeightedHost.jobThrottleFromMaxParallelism(max));
+ if (initialParallelJobs != null) {
+ double initial = parseAndCheck(initialParallelJobs, "initialParallelTasks", bc);
+ bc.setProperty("initialScore", WeightedHost.initialScoreFromInitialParallelism(initial, max));
+ }
+ }
+ else if (initialParallelJobs != null) {
+ throw new IllegalArgumentException("initialParallelJobs cannot be used without maxParallelTasks");
+ }
}
}
- public void resourceUpdated(Service service, String name, String value) {
+ private double parseAndCheck(Object value, String name, BoundContact bc) {
+ double d = TypeUtil.toDouble(value);
+ if (d < 1) {
+ throw new IllegalArgumentException("Invalid " + name + " value (" + d + ") for site '" +
+ bc.getName() + "'. Must be greater or equal to 1.");
+ }
+ return d;
+ }
+
+ public void resourceUpdated(Service service, String name, String value) {
if (logger.isInfoEnabled()) {
logger.info(service + " resource updated: " + name + " -> " + value);
}
@@ -495,29 +514,13 @@
}
}
- public static class TCChecker implements ResourceConstraintChecker {
- private TCCache tc;
+ public static class SwiftSiteChecker implements ResourceConstraintChecker {
- public TCChecker(TCCache tc) {
- this.tc = tc;
- }
-
public boolean checkConstraints(BoundContact resource, TaskConstraints tc) {
- if (isPresent("trfqn", tc)) {
- FQN tr = (FQN) tc.getConstraint("trfqn");
- try {
- List<TCEntry> l = this.tc.getTCEntries(tr, resource.getHost(), TCType.INSTALLED);
- if (l == null || l.isEmpty()) {
- return false;
- }
- else {
- return true;
- }
- }
- catch (Exception e) {
- logger.warn("Exception caught while querying TC", e);
- return false;
- }
+ if (isPresent("tr", tc)) {
+ SwiftContact sc = (SwiftContact) resource;
+ String tr = (String) tc.getConstraint("tr");
+ return sc.findApplication(tr) != null;
}
else {
return true;
Modified: trunk/src/org/griphyn/vdl/karajan/VDSTaskTransformer.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/VDSTaskTransformer.java 2014-07-04 06:33:48 UTC (rev 7951)
+++ trunk/src/org/griphyn/vdl/karajan/VDSTaskTransformer.java 2014-07-04 06:34:38 UTC (rev 7952)
@@ -41,8 +41,8 @@
private TaskTransformer impl;
- public VDSTaskTransformer(TCCache tc) {
- this.impl = new TCTransformer(tc);
+ public VDSTaskTransformer() {
+ this.impl = new SwiftTransformer();
}
public void transformTask(Task task, Contact[] contacts, Service[] services) {
@@ -54,7 +54,6 @@
public void transformTask(Task task, Contact[] contacts, Service[] services) {
if (task.getType() == Task.JOB_SUBMISSION) {
applyJobWorkDirectory(task, contacts);
- applyTCEntry(task, contacts);
}
else if (task.getType() == Task.FILE_TRANSFER) {
applyTransferWorkDirectory(task, contacts);
@@ -158,21 +157,8 @@
}
}
- protected abstract void applyTCEntry(Task task, Contact[] contacts);
}
- public static class TCTransformer extends AbstractTransformer {
- private TCCache tc;
-
- public TCTransformer(TCCache tc) {
- this.tc = tc;
- }
-
- protected void applyTCEntry(Task task, Contact[] contacts) {
- // this method used to filter the task executable through
- // tc.data, but the task executable at this point was
- // always set to /bin/bash (or whatever the wrapper interpreter was).
- // That was useless.
- }
+ public static class SwiftTransformer extends AbstractTransformer {
}
}
More information about the Swift-commit mailing list