[Swift-commit] r7950 - trunk/src/org/griphyn/vdl/karajan/lib
hategan at ci.uchicago.edu
hategan at ci.uchicago.edu
Fri Jul 4 01:32:22 CDT 2014
Author: hategan
Date: 2014-07-04 01:32:22 -0500 (Fri, 04 Jul 2014)
New Revision: 7950
Modified:
trunk/src/org/griphyn/vdl/karajan/lib/Executable.java
trunk/src/org/griphyn/vdl/karajan/lib/Execute.java
trunk/src/org/griphyn/vdl/karajan/lib/JobConstraints.java
trunk/src/org/griphyn/vdl/karajan/lib/SiteCatalog.java
trunk/src/org/griphyn/vdl/karajan/lib/SwiftFunction.java
trunk/src/org/griphyn/vdl/karajan/lib/TCProfile.java
Log:
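new sites spec: applications are now looked up on the site contact (SwiftContact/Application) instead of the TC catalog (TCCache/TCEntry), and sites files in the old format are rejected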
Modified: trunk/src/org/griphyn/vdl/karajan/lib/Executable.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/Executable.java 2014-07-04 06:24:42 UTC (rev 7949)
+++ trunk/src/org/griphyn/vdl/karajan/lib/Executable.java 2014-07-04 06:32:22 UTC (rev 7950)
@@ -24,15 +24,13 @@
import org.globus.cog.karajan.analyzer.ArgRef;
import org.globus.cog.karajan.analyzer.Signature;
-import org.globus.cog.karajan.util.BoundContact;
-import org.globus.swift.catalog.TCEntry;
-import org.griphyn.vdl.karajan.TCCache;
-import org.griphyn.vdl.util.FQN;
+import org.globus.swift.catalog.site.Application;
+import org.globus.swift.catalog.site.SwiftContact;
public class Executable extends SwiftFunction {
private ArgRef<String> tr;
- private ArgRef<BoundContact> host;
+ private ArgRef<SwiftContact> host;
@Override
@@ -42,21 +40,16 @@
public Object function(Stack stack) {
- TCCache tc = getTC(stack);
String tr = this.tr.getValue(stack);
- BoundContact bc = this.host.getValue(stack);
- TCEntry tce = getTCE(tc, new FQN(tr), bc);
- if (tce == null) {
+ SwiftContact bc = this.host.getValue(stack);
+ // at this point, a host has been allocated, so we already
+ // know that the app is available on it
+ Application app = bc.findApplication(tr);
+ if (app.executableIsWildcard()) {
return tr;
}
else {
- String pt = tce.getPhysicalTransformation();
- if ("*".equals(pt)) {
- return tr;
- }
- else {
- return pt;
- }
+ return app.getExecutable();
}
}
}
Modified: trunk/src/org/griphyn/vdl/karajan/lib/Execute.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/Execute.java 2014-07-04 06:24:42 UTC (rev 7949)
+++ trunk/src/org/griphyn/vdl/karajan/lib/Execute.java 2014-07-04 06:32:22 UTC (rev 7950)
@@ -24,6 +24,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import k.rt.Abort;
@@ -35,6 +36,8 @@
import org.apache.log4j.Logger;
import org.globus.cog.abstraction.impl.common.StatusEvent;
+import org.globus.cog.abstraction.impl.common.task.JobSpecificationImpl;
+import org.globus.cog.abstraction.interfaces.JobSpecification;
import org.globus.cog.abstraction.interfaces.Status;
import org.globus.cog.abstraction.interfaces.Task;
import org.globus.cog.karajan.analyzer.ArgRef;
@@ -58,6 +61,7 @@
private ArgRef<Channel<Object>> replicationChannel;
private ArgRef<String> jobid;
private ArgRef<ProgressState> progress;
+ private ArgRef<Map<String, String>> environment;
private VarRef<Context> context;
@@ -73,7 +77,8 @@
params.add(optional("jobid", null));
removeParams(params, "stdout", "stderr", "stdoutLocation", "stderrLocation",
"stdin", "provider", "securityContext", "nativespec",
- "delegation", "batch");
+ "delegation", "batch", "environment");
+ params.add(optional("environment", null));
return sig;
}
@@ -103,6 +108,11 @@
}
@Override
+ protected void addEnvironment(Stack stack, JobSpecificationImpl js) throws ExecutionException {
+ js.setEnvironmentVariables(environment.getValue(stack));
+ }
+
+ @Override
public void submitScheduled(Scheduler scheduler, Task task, Stack stack, Object constraints) {
try {
setTaskIdentity(stack, task);
@@ -129,12 +139,28 @@
logger.debug("Submitting task " + task);
}
String jobid = this.jobid.getValue(stack);
- if (logger.isDebugEnabled()) {
- logger.debug("jobid=" + jobid + " task=" + task);
+ if (logger.isInfoEnabled()) {
+ JobSpecification spec = (JobSpecification) task.getSpecification();
+ logger.info(buildTaskInfoString(task, spec));
}
}
- protected void registerReplica(Stack stack, Task task) throws CanceledReplicaException {
+ private String buildTaskInfoString(Task task, JobSpecification spec) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("JOB_TASK jobid=");
+ sb.append(jobid);
+ sb.append(" taskid=");
+ sb.append(task.getIdentity());
+ sb.append(" exec=");
+ sb.append(spec.getExecutable());
+ sb.append(" dir=");
+ sb.append(spec.getDirectory());
+ sb.append(" args=");
+ sb.append(spec.getArguments());
+ return sb.toString();
+ }
+
+ protected void registerReplica(Stack stack, Task task) throws CanceledReplicaException {
String rg = this.replicationGroup.getValue(stack);
if (rg != null) {
getReplicationManager(stack).register(rg, task);
@@ -152,12 +178,22 @@
Stack stack = getStack();
try {
if (stack != null) {
- int c = e.getStatus().getStatusCode();
+ Status s = e.getStatus();
+ int c = s.getStatusCode();
+ if (logger.isInfoEnabled()) {
+ if (s.getMessage() == null) {
+ logger.info("TASK_STATUS_CHANGE taskid=" + e.getSource().getIdentity() + " status=" + c);
+ }
+ else {
+ logger.info("TASK_STATUS_CHANGE taskid=" + e.getSource().getIdentity() + " status=" + c +
+ " " + s.getMessage());
+ }
+ }
ProgressState ps = progress.getValue(stack);
if (c == Status.SUBMITTED) {
ps.setState("Submitted");
if (replicationEnabled) {
- getReplicationManager(stack).submitted(task, e.getStatus().getTime());
+ getReplicationManager(stack).submitted(task, s.getTime());
}
}
else if (c == Status.STAGE_IN) {
@@ -169,7 +205,7 @@
else if (c == Status.ACTIVE) {
ps.setState("Active");
if (replicationEnabled) {
- getReplicationManager(stack).active(task, e.getStatus().getTime());
+ getReplicationManager(stack).active(task, s.getTime());
Execute.this.replicationChannel.getValue(stack).close();
}
}
Modified: trunk/src/org/griphyn/vdl/karajan/lib/JobConstraints.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/JobConstraints.java 2014-07-04 06:24:42 UTC (rev 7949)
+++ trunk/src/org/griphyn/vdl/karajan/lib/JobConstraints.java 2014-07-04 06:32:22 UTC (rev 7950)
@@ -33,7 +33,6 @@
import org.globus.cog.karajan.scheduler.TaskConstraints;
import org.griphyn.vdl.karajan.lib.cache.CacheMapAdapter;
import org.griphyn.vdl.mapping.DSHandle;
-import org.griphyn.vdl.util.FQN;
public class JobConstraints extends CacheFunction {
private ArgRef<String> tr;
@@ -53,7 +52,7 @@
String tr = this.tr.getValue(stack);
String[] filenames = null;
Collection<DSHandle> stageins = this.stagein.getValue(stack);
- SwiftTaskConstraints tc = new SwiftTaskConstraints(tr, new FQN(tr));
+ SwiftTaskConstraints tc = new SwiftTaskConstraints(tr);
if (stageins != null) {
tc.setStageins(stageins);
tc.setFilecache(new CacheMapAdapter(getCache(stack)));
@@ -67,13 +66,11 @@
private static class SwiftTaskConstraints implements TaskConstraints {
private final String tr;
- private final FQN trfqn;
private Collection<DSHandle> stageins;
private CacheMapAdapter filecache;
- public SwiftTaskConstraints(String tr, FQN trfqn) {
+ public SwiftTaskConstraints(String tr) {
this.tr = tr;
- this.trfqn = trfqn;
}
public Collection<DSHandle> getStageins() {
@@ -98,9 +95,6 @@
if ("tr".equals(name)) {
return tr;
}
- else if ("trfqn".equals(name)) {
- return trfqn;
- }
else if ("stageins".equals(name)) {
return stageins;
}
Modified: trunk/src/org/griphyn/vdl/karajan/lib/SiteCatalog.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/SiteCatalog.java 2014-07-04 06:24:42 UTC (rev 7949)
+++ trunk/src/org/griphyn/vdl/karajan/lib/SiteCatalog.java 2014-07-04 06:32:22 UTC (rev 7950)
@@ -9,6 +9,9 @@
*/
package org.griphyn.vdl.karajan.lib;
+import java.util.ArrayList;
+import java.util.List;
+
import k.rt.ExecutionException;
import k.rt.Stack;
@@ -24,11 +27,10 @@
import org.globus.cog.karajan.analyzer.ArgRef;
import org.globus.cog.karajan.analyzer.Param;
import org.globus.cog.karajan.compiled.nodes.functions.AbstractSingleValuedFunction;
-import org.globus.cog.karajan.util.BoundContact;
-import org.globus.cog.karajan.util.ContactSet;
+import org.globus.swift.catalog.site.Application;
import org.globus.swift.catalog.site.SiteCatalogParser;
import org.globus.swift.catalog.site.SwiftContact;
-import org.griphyn.vdl.util.FQN;
+import org.globus.swift.catalog.site.SwiftContactSet;
import org.w3c.dom.Document;
import org.w3c.dom.NamedNodeMap;
import org.w3c.dom.Node;
@@ -36,10 +38,6 @@
public class SiteCatalog extends AbstractSingleValuedFunction {
private ArgRef<String> fileName;
-
- private enum Version {
- V1, V2;
- }
@Override
protected Param[] getParams() {
@@ -58,48 +56,59 @@
throw new ExecutionException(this, "Failed to parse site catalog", e);
}
}
+
+ private static class KVPair {
+ public final String key;
+ public final String value;
+
+ public KVPair(String key, String value) {
+ this.key = key;
+ this.value = value;
+ }
+ }
private Object buildResources(Document doc) {
Node root = getRoot(doc);
if (root.getLocalName().equals("config")) {
- return parse(root, Version.V1);
+ throw new IllegalArgumentException("Old sites file format. Please upgrade your sites file to the new format.");
}
else if (root.getLocalName().equals("sites")) {
- return parse(root, Version.V1);
+ return parse(root);
}
else {
throw new IllegalArgumentException("Illegal sites file root node: " + root.getLocalName());
}
}
-
-
-
- private Object parse(Node config, Version v) {
- ContactSet cs = new ContactSet();
+
+ private Object parse(Node config) {
+ SwiftContactSet cs = new SwiftContactSet();
NodeList pools = config.getChildNodes();
for (int i = 0; i < pools.getLength(); i++) {
Node n = pools.item(i);
if (n.getNodeType() == Node.ELEMENT_NODE) {
- try {
- BoundContact bc = pool(n, v);
- if (bc != null) {
- cs.addContact(bc);
+ String ctype = n.getNodeName();
+ if (ctype.equals("site")) {
+ try {
+ SwiftContact bc = pool(n);
+ if (bc != null) {
+ cs.addContact(bc);
+ }
}
+ catch (Exception e) {
+ throw new ExecutionException(this, "Invalid site entry '" + poolName(n) + "': ", e);
+ }
}
- catch (Exception e) {
- throw new ExecutionException(this, "Invalid site entry '" + poolName(n, v) + "': ", e);
+ else if (ctype.equals("application")) {
+ cs.addApplication(application(n));
}
}
}
return cs;
}
- private String poolName(Node site, Version v) {
- if (site.getLocalName().equals("pool")) {
- return attr(site, "handle");
- }
- else if (site.getLocalName().equals("site")) {
+ private String poolName(Node site) {
+ if (site.getLocalName().equals("site")) {
return attr(site, "name");
}
else {
@@ -117,18 +126,18 @@
throw new IllegalArgumentException("Missing root element");
}
- private BoundContact pool(Node n, Version v) throws InvalidProviderException, ProviderMethodException {
+ private SwiftContact pool(Node n) throws InvalidProviderException, ProviderMethodException {
if (n.getNodeType() != Node.ELEMENT_NODE) {
return null;
}
- String name = poolName(n, v);
+ String name = poolName(n);
SwiftContact bc = new SwiftContact(name);
String sysinfo = attr(n, "sysinfo", null);
if (sysinfo != null) {
- bc.addProperty("sysinfo", sysinfo);
+ bc.setProperty("sysinfo", sysinfo);
}
-
+
NodeList cs = n.getChildNodes();
for (int i = 0; i < cs.getLength(); i++) {
@@ -138,33 +147,24 @@
}
String ctype = c.getNodeName();
- if (v == Version.V1 && ctype.equals("gridftp")) {
- bc.addService(gridftp(c));
- }
- else if (v == Version.V1 && ctype.equals("jobmanager")) {
- bc.addService(jobmanager(c));
- }
- else if (ctype.equals("execution")) {
+ if (ctype.equals("execution")) {
bc.addService(execution(c));
}
else if (ctype.equals("filesystem")) {
bc.addService(filesystem(c));
}
else if (ctype.equals("workdirectory")) {
- bc.addProperty("workdir", text(c));
+ bc.setProperty("workdir", text(c));
}
else if (ctype.equals("scratch")) {
- bc.addProperty("scratch", text(c));
+ bc.setProperty("scratch", text(c));
}
- else if (ctype.equals("env")) {
- env(bc, c);
+ else if (ctype.equals("property")) {
+ bc.setProperty(attr(c, "name"), text(c));
}
- else if (ctype.equals("profile")) {
- profile(bc, c);
+ else if (ctype.equals("apps")) {
+ apps(bc, c);
}
- else if (v == Version.V2 && ctype.equals("application")) {
- application(bc, c);
- }
else {
throw new IllegalArgumentException("Unknown node type: " + ctype);
}
@@ -172,44 +172,85 @@
return bc;
}
- private void application(BoundContact bc, Node c) {
+ private void apps(SwiftContact bc, Node n) {
+ NodeList cs = n.getChildNodes();
+
+ List<KVPair> envs = new ArrayList<KVPair>();
+ List<KVPair> props = new ArrayList<KVPair>();
+ for (int i = 0; i < cs.getLength(); i++) {
+ Node c = cs.item(i);
+ if (c.getNodeType() != Node.ELEMENT_NODE) {
+ continue;
+ }
+ String ctype = c.getNodeName();
+
+ if (ctype.equals("app")) {
+ bc.addApplication(application(c));
+ }
+ else if (ctype.equals("env")) {
+ envs.add(env(c));
+ }
+ else if (ctype.equals("property")) {
+ props.add(this.property(c));
+ }
+ else {
+ throw new IllegalArgumentException("Unknown node type: " + ctype);
+ }
+ }
+
+ mergeEnvsToApps(bc, envs);
+ mergePropsToApps(bc, props);
}
- private Service jobmanager(Node n) throws InvalidProviderException, ProviderMethodException {
- String provider;
- String url = attr(n, "url");
- String major = attr(n, "major");
- if (url.equals("local://localhost")) {
- provider = "local";
+ private void mergeEnvsToApps(SwiftContact bc, List<KVPair> envs) {
+ for (Application app : bc.getApplications()) {
+ for (KVPair kvp : envs) {
+ if (!app.getEnv().containsKey(kvp.key)) {
+ // only merge if app does not override
+ app.setEnv(kvp.key, kvp.value);
+ }
+ }
}
- else if (url.equals("pbs://localhost")) {
- provider = "pbs";
+ }
+
+ private void mergePropsToApps(SwiftContact bc, List<KVPair> props) {
+ for (Application app : bc.getApplications()) {
+ for (KVPair kvp : props) {
+ if (!app.getProperties().containsKey(kvp.key)) {
+ app.addProperty(kvp.key, kvp.value);
+ }
+ }
}
- else if ("2".equals(major)) {
- provider = "gt2";
- }
- else if ("4".equals(major)) {
- provider = "gt4";
- }
- else {
- throw new IllegalArgumentException("Unknown job manager version: " + major + ", url = '" + url + "'");
- }
-
- ServiceContact contact = new ServiceContactImpl(url);
- return new ServiceImpl(provider, Service.EXECUTION,
- contact, AbstractionFactory.newSecurityContext(provider, contact));
}
- private Service gridftp(Node n) throws InvalidProviderException, ProviderMethodException {
- String url = attr(n, "url");
- if (url.equals("local://localhost")) {
- return new ServiceImpl("local", Service.FILE_OPERATION, new ServiceContactImpl("localhost"), null);
+ private Application application(Node n) {
+ Application app = new Application();
+ app.setName(attr(n, "name"));
+ app.setExecutable(attr(n, "executable"));
+
+ NodeList cs = n.getChildNodes();
+
+ for (int i = 0; i < cs.getLength(); i++) {
+ Node c = cs.item(i);
+ if (c.getNodeType() != Node.ELEMENT_NODE) {
+ continue;
+ }
+ String ctype = c.getNodeName();
+
+ if (ctype.equals("env")) {
+ KVPair env = env(c);
+ app.setEnv(env.key, env.value);
+ }
+ else if (ctype.equals("property")) {
+ KVPair prop = property(c);
+ app.addProperty(prop.key, prop.value);
+ }
+ else {
+ throw new IllegalArgumentException("Unknown node type: " + ctype);
+ }
}
- else {
- ServiceContact contact = new ServiceContactImpl(url);
- return new ServiceImpl("gsiftp", Service.FILE_OPERATION,
- contact, AbstractionFactory.newSecurityContext("gsiftp", contact));
- }
+
+ return app;
}
private Service execution(Node n) throws InvalidProviderException, ProviderMethodException {
@@ -233,9 +274,39 @@
s.setJobManager(jobManager);
}
+ properties(s, n);
+
return s;
}
+ private void properties(Service s, Node n) {
+ NodeList cs = n.getChildNodes();
+
+ for (int i = 0; i < cs.getLength(); i++) {
+ Node c = cs.item(i);
+ if (c.getNodeType() != Node.ELEMENT_NODE) {
+ continue;
+ }
+ String ctype = c.getNodeName();
+
+ if (ctype.equals("property")) {
+ property(s, c);
+ }
+ else {
+ throw new IllegalArgumentException("Unknown node type: " + ctype);
+ }
+ }
+
+ }
+
+ private void property(Service s, Node c) {
+ s.setAttribute(attr(c, "name"), text(c));
+ }
+
+ private KVPair property(Node c) {
+ return new KVPair(attr(c, "name"), text(c));
+ }
+
private Service filesystem(Node n) throws InvalidProviderException, ProviderMethodException {
String provider = attr(n, "provider");
String url = attr(n, "url", null);
@@ -251,32 +322,18 @@
s.setSecurityContext(AbstractionFactory.newSecurityContext(provider, contact));
}
+ properties(s, n);
+
return s;
}
-
- private void env(SwiftContact bc, Node n) {
- String key = attr(n, "key");
+
+ private KVPair env(Node n) {
+ String key = attr(n, "name");
String value = text(n);
- bc.addProfile(new FQN("env", key), value);
+ return new KVPair(key, value);
}
- private void profile(SwiftContact bc, Node n) {
- String ns = attr(n, "namespace");
- String key = attr(n, "key");
- String value = text(n);
-
- if (value == null) {
- throw new IllegalArgumentException("No value for profile " + ns + ":" + key);
- }
- if (ns.equals("karajan")) {
- bc.addProperty(key, value);
- }
- else {
- bc.addProfile(new FQN(ns, key), value);
- }
- }
-
private String text(Node n) {
if (n.getFirstChild() != null) {
return n.getFirstChild().getNodeValue();
Modified: trunk/src/org/griphyn/vdl/karajan/lib/SwiftFunction.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/SwiftFunction.java 2014-07-04 06:24:42 UTC (rev 7949)
+++ trunk/src/org/griphyn/vdl/karajan/lib/SwiftFunction.java 2014-07-04 06:32:22 UTC (rev 7950)
@@ -21,11 +21,8 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import k.rt.Channel;
import k.rt.Context;
@@ -41,14 +38,8 @@
import org.globus.cog.karajan.compiled.nodes.Node;
import org.globus.cog.karajan.compiled.nodes.functions.AbstractFunction;
import org.globus.cog.karajan.parser.WrapperNode;
-import org.globus.cog.karajan.util.BoundContact;
import org.globus.cog.karajan.util.TypeUtil;
-import org.globus.swift.catalog.TCEntry;
-import org.globus.swift.catalog.transformation.File;
-import org.globus.swift.catalog.types.TCType;
import org.griphyn.vdl.karajan.AssertFailedException;
-import org.griphyn.vdl.karajan.TCCache;
-import org.griphyn.vdl.karajan.functions.ConfigProperty;
import org.griphyn.vdl.mapping.AbsFile;
import org.griphyn.vdl.mapping.DSHandle;
import org.griphyn.vdl.mapping.DependentException;
@@ -65,9 +56,7 @@
import org.griphyn.vdl.type.Field;
import org.griphyn.vdl.type.Type;
import org.griphyn.vdl.type.Types;
-import org.griphyn.vdl.util.FQN;
import org.griphyn.vdl.util.VDL2Config;
-import org.griphyn.vdl.util.VDL2ConfigProperties;
public abstract class SwiftFunction extends AbstractFunction {
public static final Logger logger = Logger.getLogger(SwiftFunction.class);
@@ -386,50 +375,7 @@
return Path.parse((String) o);
}
}
-
- private static Set<List<Object>> warnset = new HashSet<List<Object>>();
-
- protected TCEntry getTCE(TCCache tc, FQN fqn, BoundContact bc) {
- List<TCEntry> l;
- try {
- l = tc.getTCEntries(fqn, bc.getHost(), TCType.INSTALLED);
- }
- catch (Exception e) {
- throw new ExecutionException(this, e);
- }
- if (l == null || l.isEmpty()) {
- return null;
- }
- if (l.size() > 1) {
- synchronized (warnset) {
- LinkedList<Object> wl = new LinkedList<Object>();
- wl.add(fqn);
- wl.add(bc);
- if (!warnset.contains(wl)) {
- logger.warn("Multiple entries found for " + fqn + " on " + bc
- + ". Using the first one");
- warnset.add(wl);
- }
- }
- }
- return l.get(0);
- }
-
- public static final String TC = "vdl:TC";
-
- public TCCache getTC(Stack stack) throws ExecutionException {
- Context c = this.context.getValue(stack);
- synchronized (c) {
- TCCache tc = (TCCache) c.getAttribute(TC);
- if (tc == null) {
- String prop = ConfigProperty.getProperty(VDL2ConfigProperties.TC_FILE, (VDL2Config) c.getAttribute("SWIFT:CONFIG"));
- tc = new TCCache(File.getNonSingletonInstance(prop));
- c.setAttribute(TC, tc);
- }
- return tc;
- }
- }
-
+
private static int provenanceIDCount = 451000;
public static synchronized int nextProvenanceID() {
Modified: trunk/src/org/griphyn/vdl/karajan/lib/TCProfile.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/TCProfile.java 2014-07-04 06:24:42 UTC (rev 7949)
+++ trunk/src/org/griphyn/vdl/karajan/lib/TCProfile.java 2014-07-04 06:32:22 UTC (rev 7950)
@@ -21,11 +21,7 @@
package org.griphyn.vdl.karajan.lib;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
-import java.util.Set;
import k.rt.ExecutionException;
import k.rt.Stack;
@@ -33,15 +29,11 @@
import org.apache.log4j.Logger;
import org.globus.cog.abstraction.impl.common.execution.WallTime;
import org.globus.cog.karajan.analyzer.ArgRef;
-import org.globus.cog.karajan.analyzer.ChannelRef;
import org.globus.cog.karajan.analyzer.Signature;
import org.globus.cog.karajan.analyzer.VarRef;
-import org.globus.cog.karajan.compiled.nodes.functions.Map.Entry;
-import org.globus.swift.catalog.TCEntry;
+import org.globus.swift.catalog.site.Application;
import org.globus.swift.catalog.site.SwiftContact;
-import org.globus.swift.catalog.util.Profile;
-import org.griphyn.vdl.karajan.TCCache;
-import org.griphyn.vdl.util.FQN;
+import org.griphyn.vdl.engine.Warnings;
public class TCProfile extends SwiftFunction {
public static final Logger logger = Logger.getLogger(TCProfile.class);
@@ -58,7 +50,7 @@
private VarRef<Object> r_count;
private VarRef<Object> r_jobType;
private VarRef<Object> r_attributes;
- private ChannelRef<Map.Entry<Object, Object>> cr_environment;
+ private VarRef<Map<String, String>> r_environment;
private enum Attr {
COUNT, JOB_TYPE;
@@ -77,66 +69,67 @@
return new Signature(
params("host", optional("attributes", null), optional("tr", null)),
returns("count", "jobType",
- "attributes", channel("environment", DYNAMIC))
+ "attributes", "environment")
);
}
public Object function(Stack stack) {
- TCCache tc = getTC(stack);
String tr = this.tr.getValue(stack);
- Map<String, Object> dynamicAttributes = readDynamicAttributes(stack);
-
SwiftContact bc = this.host.getValue(stack);
-
- Map<String, Object> attrs = null;
- attrs = attributesFromHost(bc, attrs, stack);
-
- TCEntry tce = null;
- if (tr != null) {
- tce = getTCE(tc, new FQN(tr), bc);
+ Application app = bc.findApplication(tr);
+ if (app == null) {
+ throw new RuntimeException("Application '" + tr + "' not found on site '" + bc.getName() + "'");
}
- if (tce != null) {
- addEnvironment(stack, tce);
- addEnvironment(stack, bc);
- attrs = attributesFromTC(tce, attrs, stack);
- }
- attrs = addDynamicAttributes(attrs, dynamicAttributes);
- checkWalltime(attrs, tr, stack);
- addAttributes(attrs, stack);
+ addEnvironment(stack, app.getEnv());
+
+ Map<String, Object> dynamicAttributes = this.attributes.getValue(stack);
+ Map<String, Object> attrs = null;
+
+ attrs = combineAttributes(stack, attrs, app.getProperties());
+ attrs = combineAttributes(stack, attrs, dynamicAttributes);
+
+ checkWalltime(stack, attrs, tr);
+ setAttributes(stack, attrs);
return null;
}
- /**
+ private void addEnvironment(Stack stack, Map<String, String> env) {
+ r_environment.setValue(stack, env);
+ }
+
+ /**
Bring in the dynamic attributes from the Karajan stack
@return Map, may be null
*/
private Map<String, Object> readDynamicAttributes(Stack stack) {
return this.attributes.getValue(stack);
}
-
+
/**
- Store dynamic attributes into returned attributes,
- overwriting if necessary
- @param result Attributes so far known, may be null
- @param dynamicAttributes Attributes to insert, may be null
- @result Combination, may be null
+ * Combine attributes creating result as necessary
*/
- private Map<String, Object>
- addDynamicAttributes(Map<String, Object> result,
- Map<String, Object> dynamicAttributes) {
- if (result == null && dynamicAttributes == null)
- return null;
- if (result == null)
- return dynamicAttributes;
- if (dynamicAttributes == null)
- return result;
- result.putAll(dynamicAttributes);
+ private Map<String, Object> combineAttributes(Stack stack, Map<String, Object> result, Map<String, Object> src) {
+ if (src == null || src.isEmpty()) {
+ return result;
+ }
+ for (Map.Entry<String, Object> e : src.entrySet()) {
+ Attr a = ATTR_TYPES.get(e.getKey());
+ if (a != null) {
+ setAttr(a, stack, e.getValue());
+ }
+ else {
+ if (result == null) {
+ result = new HashMap<String, Object>();
+ }
+ result.put(e.getKey(), e.getValue());
+ }
+ }
return result;
}
- private void checkWalltime(Map<String, Object> attrs, String tr, Stack stack) {
+ private void checkWalltime(Stack stack, Map<String, Object> attrs, String tr) {
if (attrs == null) {
return;
}
@@ -149,113 +142,15 @@
WallTime.timeToSeconds(walltime.toString());
}
catch (ExecutionException e) {
- warn(tr, "Warning: invalid walltime specification for \"" + tr
+ Warnings.warn(Warnings.Type.SITE, "Invalid walltime specification for \"" + tr
+ "\" (" + walltime + ").");
}
}
-
- private static final Set<String> warnedAboutWalltime =
- new HashSet<String>();
-
- private void warn(String tr, String message) {
- synchronized (warnedAboutWalltime) {
- if (warnedAboutWalltime.add(tr)) {
- System.out.println(message);
- }
- }
- }
-
- private void addEnvironment(Stack stack, TCEntry tce) {
- List<Profile> list = tce.getProfiles(Profile.ENV);
- if (list != null) {
- for (Profile p : list) {
- cr_environment.append(stack, new Entry(p.getProfileKey(), p.getProfileValue()));
- }
- }
- }
-
- public static final String PROFILE_GLOBUS_PREFIX = (Profile.GLOBUS + "::").toLowerCase();
-
- private void addEnvironment(Stack stack, SwiftContact bc) {
- Map<FQN, Object> profiles = bc.getProfiles();
- if (profiles != null) {
- for (Map.Entry<FQN, Object> e : profiles.entrySet()) {
- FQN fqn = e.getKey();
- String value = (String) e.getValue();
- if (Profile.ENV.equalsIgnoreCase(fqn.getNamespace())) {
- cr_environment.append(stack, new Entry(fqn.getName(), value));
- }
- }
- }
- }
-
- private void addAttributes(Map<String,Object> attrs, Stack stack) {
- if (logger.isDebugEnabled()) {
- logger.debug("Attributes: " + attrs);
- }
- if (attrs == null) {
- return;
- }
- Iterator<Map.Entry<String, Object>> i = attrs.entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry<String, Object> e = i.next();
- Attr a = ATTR_TYPES.get(e.getKey());
- if (a != null) {
- setAttr(a, stack, e.getValue());
- i.remove();
- }
- }
- if (attrs.size() == 0) {
- return;
- }
+
+ private void setAttributes(Stack stack, Map<String, Object> attrs) {
this.r_attributes.setValue(stack, attrs);
}
- private Map<String,Object> attributesFromTC(TCEntry tce, Map<String,Object> attrs, Stack stack) {
- List<Profile> list = tce.getProfiles(Profile.GLOBUS);
- if (list != null) {
- for (Profile p : list) {
- Attr a = ATTR_TYPES.get(p.getProfileKey());
- if (a == null) {
- if (attrs == null) {
- attrs = new HashMap<String,Object>();
- }
- attrs.put(p.getProfileKey(), p.getProfileValue());
- }
- else {
- setAttr(a, stack, p.getProfileValue());
- }
- }
- }
- return attrs;
- }
-
- /**
- Inserts namespace=globus attributes from BoundContact bc
- into given attrs
- */
- private Map<String,Object> attributesFromHost(SwiftContact bc, Map<String, Object> attrs, Stack stack) {
- Map<FQN, Object> profiles = bc.getProfiles();
- if (profiles != null) {
- for (Map.Entry<FQN, Object> e : profiles.entrySet()) {
- FQN fqn = e.getKey();
- if (Profile.GLOBUS.equalsIgnoreCase(fqn.getNamespace())) {
- Attr a = ATTR_TYPES.get(fqn.getName());
- if (a == null) {
- if (attrs == null) {
- attrs = new HashMap<String,Object>();
- }
- attrs.put(fqn.getName(), e.getValue());
- }
- else {
- setAttr(a, stack, e.getValue());
- }
- }
- }
- }
- return attrs;
- }
-
private void setAttr(Attr a, Stack stack, Object value) {
switch (a) {
case COUNT:
More information about the Swift-commit
mailing list