[Swift-commit] r7745 - in trunk: libexec src/org/globus/swift/data src/org/griphyn/vdl/karajan/lib src/org/griphyn/vdl/karajan/lib/swiftscript src/org/griphyn/vdl/mapping
hategan at ci.uchicago.edu
hategan at ci.uchicago.edu
Sat Mar 22 14:52:30 CDT 2014
Author: hategan
Date: 2014-03-22 14:52:28 -0500 (Sat, 22 Mar 2014)
New Revision: 7745
Added:
trunk/src/org/griphyn/vdl/karajan/lib/GetStagingInfo.java
trunk/src/org/griphyn/vdl/karajan/lib/RemoteFileNames.java
Removed:
trunk/src/org/griphyn/vdl/karajan/lib/Flatten.java
trunk/src/org/griphyn/vdl/karajan/lib/InFileDirs.java
trunk/src/org/griphyn/vdl/karajan/lib/OutFileDirs.java
trunk/src/org/griphyn/vdl/karajan/lib/OutFiles.java
Modified:
trunk/libexec/swift-int-staging.k
trunk/libexec/swift-int-wrapper-staging.k
trunk/libexec/swift-int.k
trunk/libexec/swift-lib.k
trunk/libexec/swift.k
trunk/src/org/globus/swift/data/Query.java
trunk/src/org/griphyn/vdl/karajan/lib/AppStageins.java
trunk/src/org/griphyn/vdl/karajan/lib/AppStageouts.java
trunk/src/org/griphyn/vdl/karajan/lib/CacheUnlockFiles.java
trunk/src/org/griphyn/vdl/karajan/lib/DoRestartLog.java
trunk/src/org/griphyn/vdl/karajan/lib/IsDone.java
trunk/src/org/griphyn/vdl/karajan/lib/IsLogged.java
trunk/src/org/griphyn/vdl/karajan/lib/JobConstraints.java
trunk/src/org/griphyn/vdl/karajan/lib/LogVar.java
trunk/src/org/griphyn/vdl/karajan/lib/Mark.java
trunk/src/org/griphyn/vdl/karajan/lib/PathUtils.java
trunk/src/org/griphyn/vdl/karajan/lib/SetDatasetValues.java
trunk/src/org/griphyn/vdl/karajan/lib/Stagein.java
trunk/src/org/griphyn/vdl/karajan/lib/Stageout.java
trunk/src/org/griphyn/vdl/karajan/lib/SwiftFunction.java
trunk/src/org/griphyn/vdl/karajan/lib/swiftscript/Misc.java
trunk/src/org/griphyn/vdl/mapping/AbstractDataNode.java
trunk/src/org/griphyn/vdl/mapping/DSHandle.java
Log:
cleaned up stagein and stageout code
Modified: trunk/libexec/swift-int-staging.k
===================================================================
--- trunk/libexec/swift-int-staging.k 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/libexec/swift-int-staging.k 2014-03-22 19:52:28 UTC (rev 7745)
@@ -25,11 +25,7 @@
initDDir := function() {
"{SWIFT:DEBUG_DIR_PREFIX}{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}.d"
}
-
- inFiles := function(stageins) {
- pathnames(stageins)
- }
-
+
fileSizes := function(files) {
math:sum(
for(f, files, file:size(f))
@@ -64,13 +60,10 @@
}
export(execute2,
- function(progress, tr, stagein, stageout, restartout
- replicationGroup, replicationChannel
+ function(progress, tr, stagein, stageout,
+ replicationGroup, replicationChannel,
arguments = [], stdin = null, stdout = null, stderr = null, attributes = null) {
- stagein := list(unique(each(stagein)))
- stageout := list(unique(each(stageout)))
-
allocateHost(rhost, constraints = jobConstraints(tr, stagein = stagein)) {
ddir := initDDir()
@@ -83,7 +76,9 @@
wfdir := "{SWIFT:SCRIPT_NAME}-{SWIFT:RUN_ID}"
tmpdir := dircat("{wfdir}/jobs/{jobdir}", jobid)
- cdmfile := cdm:file()
+ cdmfile := cdm:file(),
+
+ (localFileDirs, remoteFileDirs, inFiles, outFiles) := getStagingInfo(stagein, stageout)
try {
log(LOG:DEBUG, "JOB_START jobid={jobid} tr={tr}", if (arguments != null, (" arguments=", arguments)),
@@ -101,9 +96,9 @@
"-out", if(stdout == null, "stdout.txt", getFieldValue(stdout)),
"-err", if(stderr == null, "stderr.txt", getFieldValue(stderr)),
"-i", if (stdin != null, getFieldValue(stdin)),
- "-d", flatten(unique(outFileDirs(stageout))),
- "-if", flatten(inFiles(stagein)),
- "-of", flatten(outFiles(stageout)),
+ "-d", str:join(remoteFileDirs, "|"),
+ "-if", str:join(remoteFileNames(inFiles), "|"),
+ "-of", str:join(remoteFileNames(outFiles), "|"),
"-k",
"-cdmfile", cdmfile,
"-status", "provider"
@@ -132,7 +127,7 @@
stageIn("{loc}{SWIFT:HOME}/libexec/cdm_lib.sh", "cdm_lib.sh")
}
- appStageins(jobid, stagein, stagingMethod)
+ appStageins(jobid, inFiles, stagingMethod)
stageOut("wrapper.log", "{stagingMethod}://localhost/{ddir}/{jobid}.info",
mode = WRAPPER_TRANSFER_MODE)
@@ -156,7 +151,7 @@
}
stageOut("wrapper.error", "{stagingMethod}://localhost/{ddir}/{jobid}.error",
mode = STAGING_MODE:IF_PRESENT)
- appStageouts(jobid, stageout, stagingMethod)
+ appStageouts(jobid, outFiles, stagingMethod)
if (CLEANUP_ENABLED) {
task:cleanUp(".")
@@ -164,7 +159,7 @@
)
- doRestartLog(restartout)
+ doRestartLog(stageout)
log(LOG:DEBUG, "JOB_END jobid={jobid}")
}
else catch(prev) {
Modified: trunk/libexec/swift-int-wrapper-staging.k
===================================================================
--- trunk/libexec/swift-int-wrapper-staging.k 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/libexec/swift-int-wrapper-staging.k 2014-03-22 19:52:28 UTC (rev 7745)
@@ -64,20 +64,6 @@
ddir
}
-
- inFiles := function(stageins) {
- pathnames(stageins)
- }
-
- fileDirs := function(stageins, stageouts) {
- list(
- unique(
- inFileDirs(stageins)
- outFileDirs(stageouts)
- )
- )
- }
-
cleanup := function(dir, host) {
log(LOG:INFO, "START dir={dir} host={host}")
if(vdl:configProperty("sitedir.keep") == "false") {
@@ -143,13 +129,10 @@
export(execute2,
- function(progress, tr, stagein, stageout, restartout
- replicationGroup, replicationChannel
+ function(progress, tr, stagein, stageout,
+ replicationGroup, replicationChannel,
arguments = [], stdin = null, stdout = null, stderr = null, attributes = null) {
- stagein := list(unique(each(stagein)))
- stageout := list(unique(each(stageout)))
-
allocateHost(rhost, constraints=jobConstraints(tr, stagein = stagein)) {
ddir := initDDir()
@@ -173,9 +156,10 @@
wrapfile := "{jobdir}/_paramfile"
stdout := if (stdout == null, "stdout.txt", getFieldValue(stdout))
- stderr := if (stderr == null, "stderr.txt", getFieldValue(stderr))
+ stderr := if (stderr == null, "stderr.txt", getFieldValue(stderr)),
+
+ (localFileDirs, remoteFileDirs, inFiles, outFiles) := getStagingInfo(stagein, stageout)
- fileDirs := fileDirs(stagein, stageout)
os := siteProfile(rhost, "SYSINFO:OS")
scratch := siteProfile(rhost, "scratch")
@@ -186,9 +170,9 @@
"\n-out ", stdout,
"\n-err ", stderr,
"\n-i ", if (stdin != null, getFieldValue(stdin)),
- "\n-d ", flatten(each(fileDirs)),
- "\n-if ", flatten(inFiles(stagein)),
- "\n-of ", flatten(outFiles(stageout)),
+ "\n-d ", str:join(remoteFileDirs, "|"),
+ "\n-if ", str:join(remoteFileNames(inFiles), "|"),
+ "\n-of ", str:join(remoteFileNames(outFiles), "|"),
"\n-wt", WRAPPERLOG_ALWAYS_TRANSFER,
"\n-sk", SITEDIR_KEEP,
"\n-cdmfile ", cdm:file(),
@@ -247,9 +231,9 @@
"-out", stdout,
"-err", stderr,
"-i", if (stdin != null, getFieldValue(stdin)),
- "-d", flatten(each(fileDirs)),
- "-if", flatten(inFiles(stagein)),
- "-of", flatten(outFiles(stageout)),
+ "-d", str:join(remoteFileDirs, "|"),
+ "-if", str:join(remoteFileNames(inFiles), "|"),
+ "-of", str:join(remoteFileNames(outFiles), "|"),
"-wt", WRAPPERLOG_ALWAYS_TRANSFER,
"-sk", SITEDIR_KEEP,
"-cdmfile", cdm:file(),
@@ -283,7 +267,7 @@
setProgress(progress, "Stage out")
- doRestartlog(restartout)
+ doRestartlog(stageout)
log(LOG:DEBUG, "JOB_END jobid={jobid}")
}
Modified: trunk/libexec/swift-int.k
===================================================================
--- trunk/libexec/swift-int.k 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/libexec/swift-int.k 2014-03-22 19:52:28 UTC (rev 7745)
@@ -111,19 +111,6 @@
ddir
}
- inFiles := function(stageins) {
- pathnames(stageins)
- }
-
- fileDirs := function(stageins, stageouts) {
- list(
- unique(
- inFileDirs(stageins)
- outFileDirs(stageouts)
- )
- )
- }
-
createDirSet := function(jobid, destdir, host, dirs) {
/*
* Ideally this would be done by creating a tree of the directories
@@ -219,7 +206,7 @@
cdm:wait()
}
- doStagein := function(jobid, files, dir, host) {
+ doStagein := function(jobid, dir, host, files) {
log(LOG:INFO, "START jobid={jobid} - Staging in files {files}")
cdmfile := cdm:file()
@@ -238,11 +225,7 @@
}
parallelFor(file, files) {
- provider := provider(file)
- srchost := hostname(file)
- srcdir := swift:dirname(file)
- destdir := dircat(dir, reldirname(file))
- filename := basename(file)
+ (provider, srchost, destdir, filename, srcdir) := splitFileURL(file, dir)
size := file:size("{srcdir}/{filename}", host=srchost, provider=provider)
policy := cdm:query(file)
@@ -254,18 +237,12 @@
log(LOG:INFO, "END jobid={jobid} - Staging in finished")
}
- doStageout := function(jobid, stageouts, dir, host) {
+ doStageout := function(jobid, dir, host, files) {
log(LOG:INFO, "START jobid={jobid} - Staging out files")
- log(LOG:DEBUG, "stageouts: {stageouts}")
+ log(LOG:DEBUG, "stageouts: {files}")
done := list(
- parallelFor(pv, stageouts) {
- (path, var) := each(pv)
- file := absFileName(getField(var, path))
- provider := provider(file)
- dhost := hostname(file)
- rdir := dircat(dir, reldirname(file))
- bname := basename(file)
- ldir := swift:dirname(file)
+ parallelFor(file, files) {
+ (provider, dhost, rdir, bname, ldir) := splitFileURL(file, dir)
fullLocal := dircat(ldir, bname)
fullRemote := dircat(rdir, bname)
@@ -354,13 +331,10 @@
)
export(execute2,
- function(progress, tr, stagein, stageout, restartout
- replicationGroup, replicationChannel
+ function(progress, tr, stagein, stageout,
+ replicationGroup, replicationChannel,
arguments = [], stdin = null, stdout = null, stderr = null, attributes = null) {
- stagein := list(unique(each(stagein)))
- stageout := list(unique(each(stageout)))
-
allocateHost(rhost, constraints = jobConstraints(tr, stagein = stagein)) {
ddir := initDDir(),
(wfdir, sharedDir) :=
@@ -383,9 +357,10 @@
wrapfile := "{ddir}/param-{jobid}"
stdout := if (stdout == null, "stdout.txt", getFieldValue(stdout))
- stderr := if (stderr == null, "stderr.txt", getFieldValue(stderr))
+ stderr := if (stderr == null, "stderr.txt", getFieldValue(stderr)),
+
+ (localFileDirs, remoteFileDirs, inFiles, outFiles) := getStagingInfo(stagein, stageout)
- fileDirs := fileDirs(stagein, stageout)
os := siteProfile(rhost, "SYSINFO:OS")
if(wrapperMode == "files") {
@@ -395,9 +370,9 @@
"\n-out ", stdout,
"\n-err ", stderr,
"\n-i ", if (stdin != null, getFieldValue(stdin)),
- "\n-d ", flatten(each(fileDirs)),
- "\n-if ", flatten(inFiles(stagein)),
- "\n-of ", flatten(outFiles(stageout)),
+ "\n-d ", str:join(remoteFileDirs, "|"),
+ "\n-if ", str:join(remoteFileNames(inFiles), "|"),
+ "\n-of ", str:join(remoteFileNames(outFiles), "|"),
"\n-cdmfile ", cdm:file(),
"\n-status ", statusMode,
for(a, arguments) {
@@ -410,8 +385,8 @@
tmpdir := dircat("{wfdir}/jobs/{jobdir}", jobid)
try {
- createDirSet(jobid, sharedDir, rhost, fileDirs)
- doStagein(jobid, stagein, sharedDir, rhost)
+ createDirSet(jobid, sharedDir, rhost, localFileDirs)
+ doStagein(jobid, sharedDir, rhost, inFiles)
if(wrapperMode == "files") {
stageWrapperParams(jobid, jobdir, wrapfile, wfdir, rhost)
}
@@ -452,9 +427,9 @@
"-out", stdout,
"-err", stderr,
"-i", if (stdin != null, getFieldValue(stdin)),
- "-d", flatten(each(fileDirs)),
- "-if", flatten(inFiles(stagein)),
- "-of", flatten(outFiles(stageout)),
+ "-d", str:join(remoteFileDirs, "|"),
+ "-if", str:join(remoteFileNames(inFiles), "|"),
+ "-of", str:join(remoteFileNames(outFiles), "|"),
"-cdmfile", cdm:file(),
"-status", statusMode,
"-a", if (arguments != null, each(arguments))
@@ -486,14 +461,14 @@
setProgress(progress, "Stage out")
- doStageout(jobid, stageout, sharedDir, rhost)
- doRestartLog(restartout)
+ doStageout(jobid, sharedDir, rhost, outFiles)
+ doRestartLog(stageout)
if (configProperty("wrapperlog.always.transfer") == "true") {
discard(transferWrapperLog(rhost, wfdir, jobid, jobdir))
}
- cacheUnlockFiles(stagein, sharedDir, rhost) {
+ cacheUnlockFiles(inFiles, sharedDir, rhost) {
cleanupFiles(cacheFilesToRemove, rhost)
}
@@ -502,7 +477,7 @@
else catch(prev) {
if (matches(prev, "^Abort$")) {
log(LOG:DEBUG, "JOB_CANCELED jobid={jobid}")
- cacheUnlockFiles(stagein, sharedDir, rhost, force=false) {
+ cacheUnlockFiles(inFiles, sharedDir, rhost, force=false) {
cleanupFiles(cacheFilesToRemove, rhost)
}
throw(prev)
@@ -517,7 +492,7 @@
throw(exception)
}
- cacheUnlockFiles(stagein, sharedDir, rhost, force=false) {
+ cacheUnlockFiles(inFiles, sharedDir, rhost, force=false) {
cleanupFiles(cacheFilesToRemove, rhost)
}
Modified: trunk/libexec/swift-lib.k
===================================================================
--- trunk/libexec/swift-lib.k 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/libexec/swift-lib.k 2014-03-22 19:52:28 UTC (rev 7745)
@@ -115,6 +115,7 @@
export(hostname, def("org.griphyn.vdl.karajan.lib.PathUtils$HostName"))
export(dircat, def("org.griphyn.vdl.karajan.lib.PathUtils$DirCat"))
export(pathnames, def("org.griphyn.vdl.karajan.lib.PathUtils$PathNames"))
+ export(splitFileURL, def("org.griphyn.vdl.karajan.lib.PathUtils$SplitFileURL"))
export(execute, def("org.griphyn.vdl.karajan.lib.Execute"))
export(expandArguments, def("org.griphyn.vdl.karajan.lib.ExpandArguments"))
@@ -125,13 +126,11 @@
export(appStageins, def("org.griphyn.vdl.karajan.lib.AppStageins"))
export(appStageouts, def("org.griphyn.vdl.karajan.lib.AppStageouts"))
+ export(remoteFileNames, def("org.griphyn.vdl.karajan.lib.RemoteFileNames"))
export(isDone, def("org.griphyn.vdl.karajan.lib.IsDone"))
export(mark, def("org.griphyn.vdl.karajan.lib.Mark"))
- export(flatten, def("org.griphyn.vdl.karajan.lib.Flatten"))
export(parameterlog, def("org.griphyn.vdl.karajan.lib.Parameterlog"))
- export(inFileDirs, def("org.griphyn.vdl.karajan.lib.InFileDirs"))
- export(outFileDirs, def("org.griphyn.vdl.karajan.lib.OutFileDirs"))
- export(outFiles, def("org.griphyn.vdl.karajan.lib.OutFiles"))
+ export(getStagingInfo, def("org.griphyn.vdl.karajan.lib.GetStagingInfo"))
export(doRestartLog, def("org.griphyn.vdl.karajan.lib.DoRestartLog"))
export(unwrapClosedList, def("org.griphyn.vdl.karajan.lib.UnwrapClosedList"))
Modified: trunk/libexec/swift.k
===================================================================
--- trunk/libexec/swift.k 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/libexec/swift.k 2014-03-22 19:52:28 UTC (rev 7745)
@@ -178,11 +178,11 @@
stdin = null, stdout = null, stderr = null,
attributes = null,
deperror = false, mdeperror = false,
- channel(stagein), channel(stageout), channel(restartout)) {
+ channel(stagein), channel(stageout)) {
progress := initProgressState()
- done := isDone(restartout)
+ done := isDone(stageout)
derr := try(deperror, false)
merr := try(mdeperror, false)
@@ -203,7 +203,7 @@
progress,
tr, if(arguments != null, arguments = unwrapClosedList(arguments)),
stdin=stdin, stdout=stdout, stderr=stderr, attributes=attributes,
- stagein, stageout, restartout, replicationGroup, replicationChannel
+ stagein, stageout, replicationGroup, replicationChannel
)
}
else catch(exception) {
@@ -222,7 +222,7 @@
progress,
tr, if(arguments != null, arguments = unwrapClosedList(arguments)),
stdin=stdin, stdout=stdout, stderr=stderr, attributes=attributes,
- stagein, stageout, restartout, null, null
+ stagein, stageout, null, null
)
}
else catch(exception) {
@@ -236,7 +236,7 @@
}
}
setDatasetValues(stageout)
- mark(restartout, false, mapping=false)
+ mark(stageout, false, mapping=false)
log(LOG:INFO, "END_SUCCESS thread=", currentThread(), " tr={tr}")
setProgress(progress, "Finished successfully")
}
@@ -251,7 +251,7 @@
to(errors, exception)
log(LOG:INFO, exception)
echo(exception)
- mark(restartout, true, mapping=false)
+ mark(stageout, true, mapping=false)
graphStuff(tr, stagein, stageout, true, maybe(args=arguments))
}
}
@@ -273,7 +273,7 @@
to(errors, exception)
log(LOG:INFO, exception)
}
- mark(restartout, true, mapping=merr)
+ mark(stageout, true, mapping=merr)
graphStuff(tr, stagein, stageout, true, maybe(args=arguments))
}
}
Modified: trunk/src/org/globus/swift/data/Query.java
===================================================================
--- trunk/src/org/globus/swift/data/Query.java 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/src/org/globus/swift/data/Query.java 2014-03-22 19:52:28 UTC (rev 7745)
@@ -25,6 +25,7 @@
import org.globus.cog.karajan.compiled.nodes.functions.AbstractSingleValuedFunction;
import org.globus.cog.karajan.compiled.nodes.functions.NullaryOp;
import org.globus.swift.data.policy.Policy;
+import org.griphyn.vdl.mapping.AbsFile;
/**
Karajan-accessible read-queries to CDM functionality.
@@ -36,7 +37,7 @@
Do CDM policy lookup based on the CDM file.
*/
public static class Q extends AbstractSingleValuedFunction {
- private ArgRef<String> query;
+ private ArgRef<AbsFile> query;
@Override
protected Param[] getParams() {
@@ -45,8 +46,8 @@
@Override
public Object function(Stack stack) {
- String file = query.getValue(stack);
- Policy policy = Director.lookup(file);
+ AbsFile file = query.getValue(stack);
+ Policy policy = Director.lookup(file.getPath());
if (logger.isDebugEnabled()) {
logger.debug("Director.lookup(): " + file + " -> " + policy);
}
Modified: trunk/src/org/griphyn/vdl/karajan/lib/AppStageins.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/AppStageins.java 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/src/org/griphyn/vdl/karajan/lib/AppStageins.java 2014-03-22 19:52:28 UTC (rev 7745)
@@ -33,14 +33,13 @@
import org.globus.cog.karajan.analyzer.Signature;
import org.globus.cog.karajan.analyzer.VarRef;
import org.globus.cog.karajan.compiled.nodes.InternalFunction;
-import org.globus.cog.karajan.util.TypeUtil;
import org.globus.swift.data.Director;
import org.globus.swift.data.policy.Policy;
import org.griphyn.vdl.mapping.AbsFile;
public class AppStageins extends InternalFunction {
private ArgRef<String> jobid;
- private ArgRef<List<String>> files;
+ private ArgRef<List<AbsFile>> files;
private ArgRef<String> stagingMethod;
private ChannelRef<List<String>> cr_stagein;
@@ -64,11 +63,10 @@
protected void runBody(LWThread thr) {
Stack stack = thr.getStack();
- List<String> files = this.files.getValue(stack);
+ List<AbsFile> files = this.files.getValue(stack);
String stagingMethod = this.stagingMethod.getValue(stack);
String cwd = this.cwd.getValue(stack);
- for (Object f : files) {
- AbsFile file = new AbsFile(TypeUtil.toString(f));
+ for (AbsFile file : files) {
Policy policy = Director.lookup(file.toString());
if (policy != Policy.DEFAULT) {
logger.debug("will not stage in (CDM): " + file);
Modified: trunk/src/org/griphyn/vdl/karajan/lib/AppStageouts.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/AppStageouts.java 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/src/org/griphyn/vdl/karajan/lib/AppStageouts.java 2014-03-22 19:52:28 UTC (rev 7745)
@@ -34,12 +34,10 @@
import org.globus.cog.karajan.compiled.nodes.InternalFunction;
import org.griphyn.vdl.karajan.Pair;
import org.griphyn.vdl.mapping.AbsFile;
-import org.griphyn.vdl.mapping.DSHandle;
-import org.griphyn.vdl.mapping.Path;
public class AppStageouts extends InternalFunction {
private ArgRef<String> jobid;
- private ArgRef<List<List<Object>>> files;
+ private ArgRef<List<AbsFile>> files;
private ArgRef<String> stagingMethod;
private ChannelRef<List<String>> cr_stageout;
@@ -61,14 +59,11 @@
protected void runBody(LWThread thr) {
try {
Stack stack = thr.getStack();
- List<List<Object>> files = this.files.getValue(stack);
+ List<AbsFile> files = this.files.getValue(stack);
String stagingMethod = this.stagingMethod.getValue(stack);
String cwd = this.cwd.getValue(stack);
- for (List<Object> pv : files) {
- Path p = (Path) pv.get(0);
- DSHandle handle = (DSHandle) pv.get(1);
- AbsFile file = new AbsFile(SwiftFunction.filename(handle.getField(p))[0]);
+ for (AbsFile file : files) {
String protocol = file.getProtocol();
if (protocol.equals("file")) {
protocol = stagingMethod;
Modified: trunk/src/org/griphyn/vdl/karajan/lib/CacheUnlockFiles.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/CacheUnlockFiles.java 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/src/org/griphyn/vdl/karajan/lib/CacheUnlockFiles.java 2014-03-22 19:52:28 UTC (rev 7745)
@@ -21,6 +21,7 @@
package org.griphyn.vdl.karajan.lib;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import k.rt.Stack;
@@ -39,7 +40,7 @@
import org.griphyn.vdl.mapping.AbsFile;
public class CacheUnlockFiles extends CacheFunction {
- private ArgRef<List<?>> file;
+ private ArgRef<Collection<AbsFile>> files;
private ArgRef<String> dir;
private ArgRef<BoundContact> host;
private ArgRef<Boolean> force;
@@ -49,7 +50,7 @@
@Override
protected Signature getSignature() {
- return new Signature(params("file", "dir", "host", optional("force", Boolean.TRUE), block("body")));
+ return new Signature(params("files", "dir", "host", optional("force", Boolean.TRUE), block("body")));
}
@Override
@@ -77,7 +78,7 @@
}
public void remove(Stack stack) {
- List<?> pairs = this.file.getValue(stack);
+ Collection<AbsFile> files = this.files.getValue(stack);
String dir = this.dir.getValue(stack);
Object host = this.host.getValue(stack);
VDLFileCache cache = getCache(stack);
@@ -85,10 +86,9 @@
boolean force = this.force.getValue(stack);
- for (Object o : pairs) {
- String file = (String) o;
- File f = new File(PathUtils.remotePathName(new AbsFile(file).getPath()), dir, host, 0);
- CacheReturn cr = cache.unlockEntry(f, force);
+ for (AbsFile f : files) {
+ File cf = new File(PathUtils.remotePathName(f.getPath()), dir, host, 0);
+ CacheReturn cr = cache.unlockEntry(cf, force);
rem.addAll(cr.remove);
}
Modified: trunk/src/org/griphyn/vdl/karajan/lib/DoRestartLog.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/DoRestartLog.java 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/src/org/griphyn/vdl/karajan/lib/DoRestartLog.java 2014-03-22 19:52:28 UTC (rev 7745)
@@ -33,30 +33,29 @@
import org.globus.cog.karajan.analyzer.Signature;
import org.globus.cog.karajan.compiled.nodes.InternalFunction;
import org.griphyn.vdl.mapping.DSHandle;
-import org.griphyn.vdl.mapping.Path;
public class DoRestartLog extends InternalFunction {
- private ArgRef<List<List<Object>>> restartouts;
+ private ArgRef<List<DSHandle>> stageouts;
private ChannelRef<Object> cr_vargs;
private ChannelRef<Object> cr_restartLog;
@Override
protected Signature getSignature() {
- return new Signature(params("restartouts"), returns(channel("...", DYNAMIC), channel("restartLog", DYNAMIC)));
+ return new Signature(params("stageouts"), returns(channel("...", DYNAMIC), channel("restartLog", DYNAMIC)));
}
@Override
protected void runBody(LWThread thr) {
Stack stack = thr.getStack();
- Collection<List<Object>> files = restartouts.getValue(stack);
+ Collection<DSHandle> files = stageouts.getValue(stack);
Channel<Object> ret = cr_vargs.get(stack);
Channel<Object> log = cr_restartLog.get(stack);
try {
- for (List<Object> pv : files) {
- Path p = (Path) pv.get(0);
- DSHandle handle = (DSHandle) pv.get(1);
- LogVar.logVar(log, handle, p);
+ for (DSHandle file : files) {
+ for (DSHandle leaf : file.getLeaves()) {
+ LogVar.logVar(log, leaf);
+ }
}
}
catch (Exception e) {
Deleted: trunk/src/org/griphyn/vdl/karajan/lib/Flatten.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/Flatten.java 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/src/org/griphyn/vdl/karajan/lib/Flatten.java 2014-03-22 19:52:28 UTC (rev 7745)
@@ -1,65 +0,0 @@
-/*
- * Copyright 2012 University of Chicago
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/*
- * Created on Jul 18, 2010
- */
-package org.griphyn.vdl.karajan.lib;
-
-import java.util.List;
-
-import k.rt.Channel;
-import k.rt.Stack;
-
-import org.globus.cog.karajan.analyzer.ChannelRef;
-import org.globus.cog.karajan.analyzer.Signature;
-import org.globus.cog.karajan.util.TypeUtil;
-
-public class Flatten extends SwiftFunction {
- private ChannelRef<?> c_vargs;
-
- @Override
- protected Signature getSignature() {
- return new Signature(params("..."));
- }
-
- @Override
- public Object function(Stack stack) {
- Channel<?> v = c_vargs.get(stack);
- if (v.isEmpty()) {
- return "";
- }
- else {
- StringBuilder sb = new StringBuilder();
- flatten(sb, v.getAll());
- sb.deleteCharAt(sb.length() - 1);
- return sb.toString();
- }
- }
-
- private void flatten(StringBuilder sb, List<?> l) {
- for (Object o : l) {
- if (o instanceof List) {
- flatten(sb, (List<?>) o);
- }
- else {
- sb.append(TypeUtil.toString(o));
- sb.append('|');
- }
- }
- }
-}
Added: trunk/src/org/griphyn/vdl/karajan/lib/GetStagingInfo.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/GetStagingInfo.java (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/lib/GetStagingInfo.java 2014-03-22 19:52:28 UTC (rev 7745)
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2012 University of Chicago
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/*
+ * Created on Jul 18, 2010
+ */
+package org.griphyn.vdl.karajan.lib;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import k.rt.Channel;
+import k.rt.ExecutionException;
+import k.rt.Stack;
+
+import org.globus.cog.karajan.analyzer.ArgRef;
+import org.globus.cog.karajan.analyzer.ChannelRef;
+import org.globus.cog.karajan.analyzer.Signature;
+import org.griphyn.vdl.mapping.AbsFile;
+import org.griphyn.vdl.mapping.DSHandle;
+import org.griphyn.vdl.mapping.HandleOpenException;
+import org.griphyn.vdl.type.Type;
+
+public class GetStagingInfo extends SwiftFunction {
+
+ private ArgRef<List<DSHandle>> stageins;
+ private ArgRef<List<DSHandle>> stageouts;
+ private ChannelRef<Object> cr_vargs;
+
+ @Override
+ protected Signature getSignature() {
+ return new Signature(params("stageins", "stageouts"), returns(channel("...", 4)));
+ }
+
+
+ @Override
+ public Object function(Stack stack) {
+ Collection<DSHandle> fi = stageins.getValue(stack);
+ Collection<DSHandle> fo = stageouts.getValue(stack);
+ Channel<Object> ret = cr_vargs.get(stack);
+ Set<String> localDirNames = new HashSet<String>();
+ Set<String> remoteDirNames = new HashSet<String>();
+ List<AbsFile> inFiles = new ArrayList<AbsFile>();
+ List<AbsFile> outFiles = new ArrayList<AbsFile>();
+
+ try {
+ addPaths(localDirNames, remoteDirNames, inFiles, fi);
+ addPaths(localDirNames, remoteDirNames, outFiles, fo);
+ }
+ catch (HandleOpenException e) {
+ throw new ExecutionException(e.getMessage(), e);
+ }
+ ret.add(localDirNames);
+ ret.add(remoteDirNames);
+ ret.add(inFiles);
+ ret.add(outFiles);
+ return null;
+ }
+
+ private void addPaths(Set<String> ldirs, Set<String> rdirs, List<AbsFile> files, Collection<DSHandle> vars) throws HandleOpenException {
+ for (DSHandle file : vars) {
+ for (DSHandle leaf : file.getLeaves()) {
+ Type t = leaf.getType();
+ String fname = SwiftFunction.argList(SwiftFunction.filename(leaf), true);
+ AbsFile f = new AbsFile(fname);
+ String dir = f.getDirectory();
+ if (dir != null) {
+ ldirs.add(dir);
+ rdirs.add(remoteDir(f, dir));
+ }
+ files.add(f);
+ }
+ }
+ }
+
+
+ private String remoteDir(AbsFile f, String dir) {
+ if ("file".equals(f.getProtocol())) {
+ return PathUtils.remotePathName(dir);
+ }
+ else {
+ // also prepend host name to the path
+ return f.getHost() + "/" + PathUtils.remotePathName(dir);
+ }
+ }
+}
Deleted: trunk/src/org/griphyn/vdl/karajan/lib/InFileDirs.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/InFileDirs.java 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/src/org/griphyn/vdl/karajan/lib/InFileDirs.java 2014-03-22 19:52:28 UTC (rev 7745)
@@ -1,65 +0,0 @@
-/*
- * Copyright 2012 University of Chicago
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/*
- * Created on Jul 18, 2010
- */
-package org.griphyn.vdl.karajan.lib;
-
-import java.util.List;
-
-import k.rt.Channel;
-import k.rt.Stack;
-import k.thr.LWThread;
-
-import org.globus.cog.karajan.analyzer.ArgRef;
-import org.globus.cog.karajan.analyzer.ChannelRef;
-import org.globus.cog.karajan.analyzer.Signature;
-import org.globus.cog.karajan.compiled.nodes.InternalFunction;
-import org.griphyn.vdl.mapping.AbsFile;
-
-public class InFileDirs extends InternalFunction {
-
- private ArgRef<List<String>> stageins;
- private ChannelRef<Object> cr_vargs;
-
- @Override
- protected Signature getSignature() {
- return new Signature(params("stageins"), returns(channel("...", DYNAMIC)));
- }
-
-
- @Override
- protected void runBody(LWThread thr) {
- Stack stack = thr.getStack();
- List<String> files = stageins.getValue(stack);
- Channel<Object> ret = cr_vargs.get(stack);
- for (String path : files) {
- AbsFile af = new AbsFile(path);
- String dir = af.getDirectory();
- if (dir != null) {
- if ("file".equals(af.getProtocol())) {
- ret.add(PathUtils.remotePathName(dir));
- }
- else {
- // also prepend host name to the path
- ret.add(af.getHost() + "/" + PathUtils.remotePathName(dir));
- }
- }
- }
- }
-}
Modified: trunk/src/org/griphyn/vdl/karajan/lib/IsDone.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/IsDone.java 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/src/org/griphyn/vdl/karajan/lib/IsDone.java 2014-03-22 19:52:28 UTC (rev 7745)
@@ -20,8 +20,6 @@
*/
package org.griphyn.vdl.karajan.lib;
-import java.util.List;
-
import k.rt.Context;
import k.rt.Stack;
@@ -30,10 +28,10 @@
import org.globus.cog.karajan.analyzer.Signature;
import org.globus.cog.karajan.analyzer.VarRef;
import org.griphyn.vdl.mapping.DSHandle;
-import org.griphyn.vdl.mapping.Path;
+import org.griphyn.vdl.mapping.HandleOpenException;
public class IsDone extends SwiftFunction {
- private ArgRef<Iterable<List<Object>>> stageout;
+ private ArgRef<Iterable<DSHandle>> stageout;
private VarRef<Context> context;
@@ -50,12 +48,22 @@
@Override
public Object function(Stack stack) {
- Iterable<List<Object>> files = stageout.getValue(stack);
- for (List<Object> pv : files) {
- Path p = (Path) pv.get(0);
- DSHandle handle = (DSHandle) pv.get(1);
- if (!IsLogged.isLogged(context.getValue(stack), handle, p)) {
- return Boolean.FALSE;
+ Iterable<DSHandle> files = stageout.getValue(stack);
+ for (DSHandle file : files) {
+ if (file.isRestartable()) {
+ try {
+ for (DSHandle leaf : file.getLeaves()) {
+ if (!IsLogged.isLogged(context.getValue(stack), leaf)) {
+ return Boolean.FALSE;
+ }
+ }
+ }
+ catch (HandleOpenException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Handle open caught in isDone for " + file + ". Assuming false.");
+ }
+ return Boolean.FALSE;
+ }
}
}
if (!files.iterator().hasNext()) {
Modified: trunk/src/org/griphyn/vdl/karajan/lib/IsLogged.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/IsLogged.java 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/src/org/griphyn/vdl/karajan/lib/IsLogged.java 2014-03-22 19:52:28 UTC (rev 7745)
@@ -24,7 +24,6 @@
import java.util.Map;
import k.rt.Context;
-import k.rt.ExecutionException;
import k.rt.Stack;
import org.globus.cog.karajan.analyzer.ArgRef;
@@ -33,19 +32,18 @@
import org.globus.cog.karajan.analyzer.VarRef;
import org.globus.cog.karajan.compiled.nodes.restartLog.LogEntry;
import org.globus.cog.karajan.compiled.nodes.restartLog.RestartLog;
-import org.globus.cog.karajan.util.TypeUtil;
import org.griphyn.vdl.mapping.DSHandle;
-import org.griphyn.vdl.mapping.Path;
+import org.griphyn.vdl.mapping.PhysicalFormat;
+import org.griphyn.vdl.type.Types;
public class IsLogged extends SwiftFunction {
private ArgRef<DSHandle> var;
- private ArgRef<Object> path;
private VarRef<Context> context;
@Override
protected Signature getSignature() {
- return new Signature(params("var", "path"));
+ return new Signature(params("var"));
}
@Override
@@ -57,25 +55,32 @@
@Override
public Object function(Stack stack) {
DSHandle var = this.var.getValue(stack);
- Path path;
- Object p = this.path.getValue(stack);
- if (p instanceof Path) {
- path = (Path) p;
- }
- else {
- path = Path.parse(TypeUtil.toString(p));
- }
- return Boolean.valueOf(isLogged(context.getValue(stack), var, path));
+ return Boolean.valueOf(isLogged(context.getValue(stack), var));
}
- public static boolean isLogged(Context ctx, DSHandle var, Path path) throws ExecutionException {
+ public static boolean isLogged(Context ctx, DSHandle var) {
@SuppressWarnings("unchecked")
Map<LogEntry, Object> logData = (Map<LogEntry, Object>) ctx.getAttribute(RestartLog.LOG_DATA);
if (logData.isEmpty()) {
return false;
}
- path = var.getPathFromRoot().append(path);
- LogEntry entry = LogEntry.build(LogVar.getLogId(var, path));
+
+ if (var.getType().equals(Types.EXTERNAL)) {
+ return isLogged(logData, LogVar.getLogId(var));
+ }
+ else {
+ PhysicalFormat pf = var.map();
+ if (pf == null) {
+ throw new IllegalArgumentException(var + " could not be mapped");
+ }
+ else {
+ return isLogged(logData, pf.toString());
+ }
+ }
+ }
+
+ private static boolean isLogged(Map<LogEntry, Object> logData, String str) {
+ LogEntry entry = LogEntry.build(str);
boolean found = false;
synchronized (logData) {
List<?> files = (List<?>) logData.get(entry);
@@ -84,5 +89,5 @@
}
}
return found;
- }
+ }
}
Modified: trunk/src/org/griphyn/vdl/karajan/lib/JobConstraints.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/JobConstraints.java 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/src/org/griphyn/vdl/karajan/lib/JobConstraints.java 2014-03-22 19:52:28 UTC (rev 7745)
@@ -32,11 +32,12 @@
import org.globus.cog.karajan.analyzer.Signature;
import org.globus.cog.karajan.scheduler.TaskConstraints;
import org.griphyn.vdl.karajan.lib.cache.CacheMapAdapter;
+import org.griphyn.vdl.mapping.DSHandle;
import org.griphyn.vdl.util.FQN;
public class JobConstraints extends CacheFunction {
private ArgRef<String> tr;
- private ArgRef<Collection<String>> stagein;
+ private ArgRef<Collection<DSHandle>> stagein;
private ChannelRef<Object> cr_vargs;
@Override
@@ -51,39 +52,36 @@
Stack stack = thr.getStack();
String tr = this.tr.getValue(stack);
String[] filenames = null;
- Collection<String> c = this.stagein.getValue(stack);
- if (c != null) {
- filenames = c.toArray(STRING_ARRAY);
- }
+ Collection<DSHandle> stageins = this.stagein.getValue(stack);
SwiftTaskConstraints tc = new SwiftTaskConstraints(tr, new FQN(tr));
- if (filenames != null) {
- tc.setFilenames(filenames);
+ if (stageins != null) {
+ tc.setStageins(stageins);
tc.setFilecache(new CacheMapAdapter(getCache(stack)));
}
cr_vargs.append(stack, tc);
}
private static final List<String> NAMES1 = Arrays.asList("tr", "trfqn");
- private static final List<String> NAMES2 = Arrays.asList("tr", "trfqn", "filenames", "filecache");
+ private static final List<String> NAMES2 = Arrays.asList("tr", "trfqn", "stageins", "filecache");
private static class SwiftTaskConstraints implements TaskConstraints {
private final String tr;
private final FQN trfqn;
- private String[] filenames;
+ private Collection<DSHandle> stageins;
private CacheMapAdapter filecache;
public SwiftTaskConstraints(String tr, FQN trfqn) {
this.tr = tr;
this.trfqn = trfqn;
}
-
- public String[] getFilenames() {
- return filenames;
+
+ public Collection<DSHandle> getStageins() {
+ return stageins;
}
- public void setFilenames(String[] filenames) {
- this.filenames = filenames;
+ public void setStageins(Collection<DSHandle> stageins) {
+ this.stageins = stageins;
}
public CacheMapAdapter getFilecache() {
@@ -103,8 +101,8 @@
else if ("trfqn".equals(name)) {
return trfqn;
}
- else if ("filenames".equals(name)) {
- return filenames;
+ else if ("stageins".equals(name)) {
+ return stageins;
}
else if ("filecache".equals(name)) {
return filecache;
@@ -116,7 +114,7 @@
@Override
public Collection<String> getConstraintNames() {
- if (filenames == null) {
+ if (stageins == null) {
return NAMES1;
}
else {
Modified: trunk/src/org/griphyn/vdl/karajan/lib/LogVar.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/LogVar.java 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/src/org/griphyn/vdl/karajan/lib/LogVar.java 2014-03-22 19:52:28 UTC (rev 7745)
@@ -28,53 +28,46 @@
import org.globus.cog.karajan.analyzer.ChannelRef;
import org.globus.cog.karajan.analyzer.Signature;
import org.globus.cog.karajan.analyzer.VariableNotFoundException;
-import org.globus.cog.karajan.util.TypeUtil;
import org.griphyn.vdl.mapping.DSHandle;
-import org.griphyn.vdl.mapping.Path;
import org.griphyn.vdl.mapping.PhysicalFormat;
import org.griphyn.vdl.mapping.RootHandle;
+import org.griphyn.vdl.type.Types;
public class LogVar extends SwiftFunction {
private ArgRef<DSHandle> var;
- private ArgRef<Object> path;
private ChannelRef<Object> cr_restartlog;
@Override
protected Signature getSignature() {
- return new Signature(params("var", "path"), returns(channel("restartlog", 1)));
+ return new Signature(params("var"), returns(channel("restartlog", 1)));
}
@Override
public Object function(Stack stack) {
DSHandle var = this.var.getValue(stack);
- Path path;
- Object p = this.path.getValue(stack);
- if (p instanceof Path) {
- path = (Path) p;
- }
- else {
- path = Path.parse(TypeUtil.toString(p));
- }
- logVar(cr_restartlog.get(stack), var, path);
+ logVar(cr_restartlog.get(stack), var);
return null;
}
-
- public static void logVar(Channel<Object> log, DSHandle var, Path path) throws VariableNotFoundException {
- String annotation;
- PhysicalFormat pf = var.map(path);
- if (pf != null) {
- annotation = "" + pf;
- }
- else {
- annotation = "unmapped";
- }
- log.add(getLogId(var, path) + "!" + annotation);
- }
-
- public static String getLogId(DSHandle var, Path path) {
- RootHandle root = var.getRoot();
- LWThread thr = root.getThread();
- return thr.getQualifiedName() + ":" + root.getName() +
- "." + path.stringForm();
- }
+
+ public static void logVar(Channel<Object> log, DSHandle var) throws VariableNotFoundException {
+ if (var.getType().equals(Types.EXTERNAL)) {
+ log.add(getLogId(var));
+ }
+ else {
+ PhysicalFormat pf = var.map();
+ if (pf == null) {
+ throw new IllegalArgumentException(var + " could not be mapped");
+ }
+ else {
+ log.add(pf.toString());
+ }
+ }
+ }
+
+ public static String getLogId(DSHandle var) {
+ RootHandle root = var.getRoot();
+ LWThread thr = root.getThread();
+ return thr.getQualifiedName() + ":" + root.getName() +
+ "." + var.getPathFromRoot().stringForm();
+ }
}
Modified: trunk/src/org/griphyn/vdl/karajan/lib/Mark.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/Mark.java 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/src/org/griphyn/vdl/karajan/lib/Mark.java 2014-03-22 19:52:28 UTC (rev 7745)
@@ -20,7 +20,7 @@
*/
package org.griphyn.vdl.karajan.lib;
-import java.util.List;
+import java.util.Collection;
import k.rt.Channel;
import k.rt.ExecutionException;
@@ -28,13 +28,13 @@
import org.globus.cog.karajan.analyzer.ArgRef;
import org.globus.cog.karajan.analyzer.Signature;
+import org.griphyn.vdl.mapping.AbstractDataNode;
import org.griphyn.vdl.mapping.DSHandle;
import org.griphyn.vdl.mapping.DataDependentException;
import org.griphyn.vdl.mapping.MappingDependentException;
-import org.griphyn.vdl.mapping.Path;
public class Mark extends SwiftFunction {
- private ArgRef<Channel<List<Object>>> restarts;
+ private ArgRef<Channel<AbstractDataNode>> restarts;
private ArgRef<Boolean> err;
private ArgRef<Boolean> mapping;
@@ -48,19 +48,19 @@
try {
if (err.getValue(stack)) {
boolean mapping = this.mapping.getValue(stack);
- Channel<List<Object>> files = this.restarts.getValue(stack);
- for (List<Object> pv : files) {
- Path p = parsePath(pv.get(0));
- DSHandle handle = (DSHandle) pv.get(1);
- DSHandle leaf = handle.getField(p);
- synchronized (leaf) {
- if (mapping) {
- leaf.setValue(new MappingDependentException(leaf, null));
+ Channel<AbstractDataNode> files = this.restarts.getValue(stack);
+ for (AbstractDataNode dn : files) {
+ Collection<DSHandle> leaves = dn.getLeaves();
+ for (DSHandle leaf : leaves) {
+ synchronized (leaf) {
+ if (mapping) {
+ leaf.setValue(new MappingDependentException(leaf, null));
+ }
+ else {
+ leaf.setValue(new DataDependentException(leaf, null));
+ }
+ leaf.closeShallow();
}
- else {
- leaf.setValue(new DataDependentException(leaf, null));
- }
- leaf.closeShallow();
}
}
}
Deleted: trunk/src/org/griphyn/vdl/karajan/lib/OutFileDirs.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/OutFileDirs.java 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/src/org/griphyn/vdl/karajan/lib/OutFileDirs.java 2014-03-22 19:52:28 UTC (rev 7745)
@@ -1,74 +0,0 @@
-/*
- * Copyright 2012 University of Chicago
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/*
- * Created on Jul 18, 2010
- */
-package org.griphyn.vdl.karajan.lib;
-
-import java.util.List;
-
-import k.rt.Channel;
-import k.rt.ExecutionException;
-import k.rt.Stack;
-
-import org.globus.cog.karajan.analyzer.ArgRef;
-import org.globus.cog.karajan.analyzer.ChannelRef;
-import org.globus.cog.karajan.analyzer.Signature;
-import org.griphyn.vdl.mapping.AbsFile;
-import org.griphyn.vdl.mapping.DSHandle;
-import org.griphyn.vdl.mapping.Path;
-
-public class OutFileDirs extends SwiftFunction {
-
- private ArgRef<List<List<Object>>> stageouts;
- private ChannelRef<Object> cr_vargs;
-
- @Override
- protected Signature getSignature() {
- return new Signature(params("stageouts"), returns(channel("...", DYNAMIC)));
- }
-
-
- @Override
- public Object function(Stack stack) {
- List<List<Object>> files = stageouts.getValue(stack);
- Channel<Object> ret = cr_vargs.get(stack);
- try {
- for (List<Object> pv : files) {
- Path p = parsePath(pv.get(0));
- DSHandle handle = (DSHandle) pv.get(1);
- DSHandle leaf = handle.getField(p);
- String fname = SwiftFunction.filename(leaf)[0];
- AbsFile af = new AbsFile(fname);
- String dir = af.getDirectory();
- if (dir != null) {
- if ("file".equals(af.getProtocol())) {
- ret.add(PathUtils.remotePathName(dir));
- }
- else {
- ret.add(af.getHost() + "/" + PathUtils.remotePathName(dir));
- }
- }
- }
- }
- catch (Exception e) {
- throw new ExecutionException(this, e);
- }
- return null;
- }
-}
Deleted: trunk/src/org/griphyn/vdl/karajan/lib/OutFiles.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/OutFiles.java 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/src/org/griphyn/vdl/karajan/lib/OutFiles.java 2014-03-22 19:52:28 UTC (rev 7745)
@@ -1,62 +0,0 @@
-/*
- * Copyright 2012 University of Chicago
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/*
- * Created on Jul 18, 2010
- */
-package org.griphyn.vdl.karajan.lib;
-
-import java.util.List;
-
-import k.rt.Channel;
-import k.rt.ExecutionException;
-import k.rt.Stack;
-
-import org.globus.cog.karajan.analyzer.ArgRef;
-import org.globus.cog.karajan.analyzer.ChannelRef;
-import org.globus.cog.karajan.analyzer.Signature;
-import org.griphyn.vdl.mapping.DSHandle;
-import org.griphyn.vdl.mapping.Path;
-
-public class OutFiles extends SwiftFunction {
- private ArgRef<List<List<Object>>> stageouts;
- private ChannelRef<Object> cr_vargs;
-
- @Override
- protected Signature getSignature() {
- return new Signature(params("stageouts"), returns(channel("...", DYNAMIC)));
- }
-
- @Override
- public Object function(Stack stack) {
- List<List<Object>> files = this.stageouts.getValue(stack);
- Channel<Object> ret = cr_vargs.get(stack);
- try {
- for (List<Object> pv : files) {
- Path p = parsePath(pv.get(0));
- DSHandle handle = (DSHandle) pv.get(1);
- DSHandle leaf = handle.getField(p);
- String fname = argList(SwiftFunction.filename(leaf), true);
- ret.add(fname);
- }
- }
- catch (Exception e) {
- throw new ExecutionException(this, e);
- }
- return null;
- }
-}
Modified: trunk/src/org/griphyn/vdl/karajan/lib/PathUtils.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/PathUtils.java 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/src/org/griphyn/vdl/karajan/lib/PathUtils.java 2014-03-22 19:52:28 UTC (rev 7745)
@@ -20,10 +20,14 @@
import java.util.ArrayList;
import java.util.List;
+import k.rt.Channel;
import k.rt.Stack;
+import k.thr.LWThread;
import org.globus.cog.karajan.analyzer.ArgRef;
+import org.globus.cog.karajan.analyzer.ChannelRef;
import org.globus.cog.karajan.analyzer.Param;
+import org.globus.cog.karajan.analyzer.Signature;
import org.globus.cog.karajan.compiled.nodes.functions.AbstractSingleValuedFunction;
import org.globus.swift.catalog.types.Os;
import org.griphyn.vdl.mapping.AbsFile;
@@ -61,6 +65,10 @@
@Override
public Object function(Stack stack) {
String dir = new AbsFile(path.getValue(stack)).getDirectory();
+ return function(dir);
+ }
+
+ public static String function(String dir) {
if (dir != null) {
return remotePathName(dir);
}
@@ -73,7 +81,8 @@
private static final char EOL = '\0';
/**
* Replace leading slash if present with "__root__" and replace
- * parent dir references with "__parent__"
+ * parent dir references with "__parent__".
+ *
*/
public static String remotePathName(String dir) {
@@ -272,6 +281,10 @@
String dir = this.dir.getValue(stack);
String path = this.path.getValue(stack);
boolean windows = this.os.getValue(stack).equals(Os.WINDOWS);
+ return function(dir, path, windows);
+ }
+
+ public static String function(String dir, String path, boolean windows) {
if (dir.equals("")) {
return windowsify(path, windows);
}
@@ -310,4 +323,51 @@
return l.toArray(new String[0]);
}
}
+
+ /**
+ * (provider, dhost, rdir, bname, ldir) = splitFileURL(file, dir)
+ *
+ * Implements this functionality that used to be in swift-int.k:
+ *
+ * provider := provider(file)
+ * dhost := hostname(file)
+ * rdir := dircat(dir, reldirname(file))
+ * bname := basename(file)
+ * ldir := swift:dirname(file)
+ *
+ */
+ public static class SplitFileURL extends SwiftFunction {
+ private ArgRef<AbsFile> file;
+ private ArgRef<String> dir;
+ private ChannelRef<Object> cr_vargs;
+
+ @Override
+ protected Signature getSignature() {
+ return new Signature(params("file", "dir"), returns(channel("...", DYNAMIC)));
+ }
+
+
+
+ @Override
+ public void runBody(LWThread thr) {
+ super.runBody(thr);
+ }
+
+ @Override
+ public Object function(Stack stack) {
+ AbsFile f = this.file.getValue(stack);
+ String dir = this.dir.getValue(stack);
+ Channel<Object> ret = cr_vargs.get(stack);
+
+ ret.add(f.getProtocol());
+ ret.add(f.getHost());
+ String fdir = f.getDirectory();
+ ret.add(DirCat.function(dir, RelDirName.function(fdir), false));
+ ret.add(f.getName());
+ ret.add(fdir == null ? "" : fdir);
+
+ return null;
+ }
+ }
+
}
Added: trunk/src/org/griphyn/vdl/karajan/lib/RemoteFileNames.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/RemoteFileNames.java (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/lib/RemoteFileNames.java 2014-03-22 19:52:28 UTC (rev 7745)
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2012 University of Chicago
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/*
+ * Created on Jan 5, 2007
+ */
+package org.griphyn.vdl.karajan.lib;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import k.rt.Stack;
+
+import org.globus.cog.karajan.analyzer.ArgRef;
+import org.globus.cog.karajan.analyzer.Signature;
+import org.griphyn.vdl.mapping.AbsFile;
+
+public class RemoteFileNames extends SwiftFunction {
+ private ArgRef<List<AbsFile>> files;
+
+ @Override
+ protected Signature getSignature() {
+ return new Signature(params("files"));
+ }
+
+ @Override
+ public Object function(Stack stack) {
+ List<AbsFile> files = this.files.getValue(stack);
+ List<String> ret = new ArrayList<String>();
+ for (AbsFile f : files) {
+ String path = null;
+ if ("file".equals(f.getProtocol())) {
+ ret.add(PathUtils.remotePathName(f.getPath()));
+ }
+ else {
+ ret.add(PathUtils.remotePathName(f.getHost() + "/" + f.getPath()));
+ }
+ }
+ return ret;
+ }
+}
Modified: trunk/src/org/griphyn/vdl/karajan/lib/SetDatasetValues.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/SetDatasetValues.java 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/src/org/griphyn/vdl/karajan/lib/SetDatasetValues.java 2014-03-22 19:52:28 UTC (rev 7745)
@@ -30,10 +30,9 @@
import org.globus.cog.karajan.analyzer.Signature;
import org.griphyn.vdl.mapping.AbstractDataNode;
import org.griphyn.vdl.mapping.DSHandle;
-import org.griphyn.vdl.mapping.Path;
public class SetDatasetValues extends SwiftFunction {
- private ArgRef<List<List<Object>>> stageouts;
+ private ArgRef<List<DSHandle>> stageouts;
@Override
protected Signature getSignature() {
@@ -42,13 +41,12 @@
@Override
public Object function(Stack stack) {
- Collection<List<Object>> files = this.stageouts.getValue(stack);
+ Collection<DSHandle> files = this.stageouts.getValue(stack);
try {
- for (List<Object> pv : files) {
- Path p = parsePath(pv.get(0));
- DSHandle handle = (DSHandle) pv.get(1);
- DSHandle leaf = handle.getField(p);
- leaf.setValue(AbstractDataNode.FILE_VALUE);
+ for (DSHandle file : files) {
+ for (DSHandle leaf : file.getLeaves()) {
+ leaf.setValue(AbstractDataNode.FILE_VALUE);
+ }
}
}
catch (Exception e) {
Modified: trunk/src/org/griphyn/vdl/karajan/lib/Stagein.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/Stagein.java 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/src/org/griphyn/vdl/karajan/lib/Stagein.java 2014-03-22 19:52:28 UTC (rev 7745)
@@ -85,10 +85,7 @@
deperr = true;
}
- k.rt.Channel<Object> stagein = cr_stagein.get(stack);
- for (Path p : fp) {
- stagein.add(filename(var.getField(p))[0]);
- }
+ cr_stagein.append(stack, var);
}
catch (ConditionalYield f) {
if (tracer.isEnabled()) {
Modified: trunk/src/org/griphyn/vdl/karajan/lib/Stageout.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/Stageout.java 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/src/org/griphyn/vdl/karajan/lib/Stageout.java 2014-03-22 19:52:28 UTC (rev 7745)
@@ -20,11 +20,6 @@
*/
package org.griphyn.vdl.karajan.lib;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import k.rt.ExecutionException;
import k.rt.Stack;
import org.apache.log4j.Logger;
@@ -33,32 +28,23 @@
import org.globus.cog.karajan.analyzer.Signature;
import org.globus.cog.karajan.analyzer.VarRef;
import org.griphyn.vdl.mapping.AbstractDataNode;
-import org.griphyn.vdl.mapping.DSHandle;
+import org.griphyn.vdl.mapping.Mapper;
import org.griphyn.vdl.mapping.MappingDependentException;
-import org.griphyn.vdl.mapping.Path;
public class Stageout extends SwiftFunction {
public static final Logger logger = Logger.getLogger(Stageout.class);
private ArgRef<AbstractDataNode> var;
private ChannelRef<Object> cr_stageout;
- private ChannelRef<Object> cr_restartout;
private VarRef<Boolean> r_deperror;
private VarRef<Boolean> r_mdeperror;
@Override
protected Signature getSignature() {
return new Signature(params("var"), returns("deperror", "mdeperror",
- channel("stageout", DYNAMIC), channel("restartout", DYNAMIC)));
+ channel("stageout", DYNAMIC)));
}
- private List<?> list(Path p, DSHandle var) {
- ArrayList<Object> l = new ArrayList<Object>(2);
- l.add(p);
- l.add(var);
- return l;
- }
-
@Override
public Object function(Stack stack) {
AbstractDataNode var = this.var.getValue(stack);
@@ -69,16 +55,16 @@
// if these arrays had their sizes closed, which could lead to
// race conditions (e.g. if this array's mapper had some parameter
// dependencies that weren't closed at the time the app was started).
- if (var.getType().isArray()) {
- var.waitFor(this);
- }
try {
+ if (var.getType().isArray()) {
+ Mapper m = var.getMapper();
+ if (m.isStatic()) {
+ var.waitFor(this);
+ }
+ }
if (!var.isPrimitive()) {
- retPaths(cr_stageout.get(stack), var);
+ cr_stageout.append(stack, var);
}
- if (var.isRestartable()) {
- retPaths(cr_restartout.get(stack), var);
- }
}
catch (MappingDependentException e) {
logger.debug(e);
@@ -93,16 +79,4 @@
}
return null;
}
-
- private void retPaths(k.rt.Channel<Object> channel, DSHandle var) throws ExecutionException {
- try {
- Collection<Path> fp = var.getFringePaths();
- for (Path p : fp) {
- channel.add(list(p, var));
- }
- }
- catch (Exception e) {
- throw new ExecutionException(this, e);
- }
- }
}
Modified: trunk/src/org/griphyn/vdl/karajan/lib/SwiftFunction.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/SwiftFunction.java 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/src/org/griphyn/vdl/karajan/lib/SwiftFunction.java 2014-03-22 19:52:28 UTC (rev 7745)
@@ -277,7 +277,7 @@
* true, then each input string will be passed through the relativize
* function.
*/
- public String argList(String[] s, boolean relative) {
+ public static String argList(String[] s, boolean relative) {
StringBuffer sb = new StringBuffer();
for (int i = 0; i < s.length; i++) {
if (relative) {
Modified: trunk/src/org/griphyn/vdl/karajan/lib/swiftscript/Misc.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/swiftscript/Misc.java 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/src/org/griphyn/vdl/karajan/lib/swiftscript/Misc.java 2014-03-22 19:52:28 UTC (rev 7745)
@@ -172,7 +172,13 @@
@Override
public Object function(Stack stack) {
AbstractDataNode dn = file.getValue(stack);
- String filename = SwiftFunction.unwrap(this, dn);
+ String filename;
+ if (dn.getType().equals(Types.STRING)) {
+ filename = SwiftFunction.unwrap(this, dn);
+ }
+ else {
+ filename = SwiftFunction.filename(dn)[0];
+ }
AbsFile file = new AbsFile(filename);
if (logger.isDebugEnabled()) {
Modified: trunk/src/org/griphyn/vdl/mapping/AbstractDataNode.java
===================================================================
--- trunk/src/org/griphyn/vdl/mapping/AbstractDataNode.java 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/src/org/griphyn/vdl/mapping/AbstractDataNode.java 2014-03-22 19:52:28 UTC (rev 7745)
@@ -459,6 +459,46 @@
}
}
}
+
+ public Collection<DSHandle> getLeaves() throws HandleOpenException {
+ Type t = getType();
+ if (t.isPrimitive()) {
+ return Collections.emptyList();
+ }
+ if (!t.isComposite()) {
+ return Collections.singletonList((DSHandle) this);
+ }
+ List<DSHandle> list = new ArrayList<DSHandle>();
+ getLeaves(list);
+ return list;
+ }
+
+ public void getLeaves(List<DSHandle> list) throws HandleOpenException {
+ checkMappingException();
+ if (getType().getBaseType() != null) {
+ list.add(this);
+ }
+ else {
+ for (Field field : getType().getFields()) {
+ AbstractDataNode child;
+ String name = (String) field.getId();
+ try {
+ child = (AbstractDataNode) this.getField(name);
+ }
+ catch (NoSuchFieldException e) {
+ throw new RuntimeException("Inconsistency between type declaration and " +
+ "handle for field '" + name + "'");
+ }
+ Type type = child.getType();
+ if (!type.isPrimitive() && !child.isArray() && type.getFields().size() == 0) {
+ list.add(child);
+ }
+ else {
+ child.getLeaves(list);
+ }
+ }
+ }
+ }
public void closeShallow() {
synchronized(this) {
Modified: trunk/src/org/griphyn/vdl/mapping/DSHandle.java
===================================================================
--- trunk/src/org/griphyn/vdl/mapping/DSHandle.java 2014-03-19 21:11:38 UTC (rev 7744)
+++ trunk/src/org/griphyn/vdl/mapping/DSHandle.java 2014-03-22 19:52:28 UTC (rev 7745)
@@ -91,6 +91,8 @@
public void closeDeep();
public Collection<Path> getFringePaths() throws HandleOpenException;
+
+ public Collection<DSHandle> getLeaves() throws HandleOpenException;
public Map<Comparable<?>, DSHandle> getArrayValue();
More information about the Swift-commit
mailing list