[Swift-commit] r3221 - in trunk: etc libexec src/org/globus/swift src/org/globus/swift/data src/org/globus/swift/data/policy src/org/globus/swift/data/util
noreply at svn.ci.uchicago.edu
noreply at svn.ci.uchicago.edu
Wed Feb 3 15:12:49 CST 2010
Author: wozniak
Date: 2010-02-03 15:12:49 -0600 (Wed, 03 Feb 2010)
New Revision: 3221
Added:
trunk/libexec/cdm.pl
trunk/src/org/globus/swift/data/
trunk/src/org/globus/swift/data/Director.java
trunk/src/org/globus/swift/data/Query.java
trunk/src/org/globus/swift/data/policy/
trunk/src/org/globus/swift/data/policy/Default.java
trunk/src/org/globus/swift/data/policy/Direct.java
trunk/src/org/globus/swift/data/policy/Policy.java
trunk/src/org/globus/swift/data/util/
trunk/src/org/globus/swift/data/util/LineReader.java
Modified:
trunk/etc/log4j.properties
trunk/libexec/_swiftwrap
trunk/libexec/vdl-int.k
trunk/libexec/vdl-lib.xml
trunk/libexec/vdl.k
Log:
General framework for CDM functionality, including DIRECT policy.
Modified: trunk/etc/log4j.properties
===================================================================
--- trunk/etc/log4j.properties 2010-01-25 22:00:15 UTC (rev 3220)
+++ trunk/etc/log4j.properties 2010-02-03 21:12:49 UTC (rev 3221)
@@ -26,4 +26,6 @@
log4j.logger.org.globus.cog.abstraction.impl.common.task.TaskImpl=DEBUG
log4j.logger.org.griphyn.vdl.karajan.lib.GetFieldValue=DEBUG
log4j.logger.org.griphyn.vdl.engine.Karajan=INFO
-log4j.logger.org.globus.cog.abstraction.coaster.rlog=INFO
\ No newline at end of file
+log4j.logger.org.globus.cog.abstraction.coaster.rlog=INFO
+
+log4j.logger.org.globus.swift.data.Director=DEBUG
Modified: trunk/libexec/_swiftwrap
===================================================================
--- trunk/libexec/_swiftwrap 2010-01-25 22:00:15 UTC (rev 3220)
+++ trunk/libexec/_swiftwrap 2010-02-03 21:12:49 UTC (rev 3221)
@@ -113,6 +113,37 @@
exec 3>&-
}
+cdm() {
+ log "CDM: $@"
+
+ local JOBDIR=$1 # Given jobdir
+ local MODE=$2 # INPUT or OUTPUT
+ local FILE=$3 # User file
+ local POLICY=$4 # DIRECT, BROADCAST, ...
+ shift 4
+ local ARGS=$@
+
+ case $POLICY in
+ DIRECT)
+ DIRECT_DIR=${ARGS[0]}
+ log "CDM[DIRECT]: Linking $JOBDIR/$FILE to $DIRECT_DIR/$FILE"
+ if [ $MODE == "INPUT" ]; then
+ [ -f "$DIRECT_DIR/$FILE" ]
+ checkError 254 "CDM[DIRECT]: $DIRECT_DIR/$FILE does not exist!"
+ ln -s $DIRECT_DIR/$FILE $JOBDIR/$FILE
+ checkError 254 "CDM[DIRECT]: Linking to $DIRECT_DIR/$FILE failed!"
+ elif [ $MODE == "OUTPUT" ]; then
+ touch $DIRECT_DIR/$FILE
+ checkError 254 "CDM[DIRECT]: Touching $DIRECT_DIR/$FILE failed!"
+ ln -s $DIRECT_DIR/$FILE $JOBDIR/$FILE
+ checkError 254 "CDM[DIRECT]: Linking to $DIRECT_DIR/$FILE failed!"
+ else
+ fail 254 "Unknown MODE: $MODE"
+ fi
+ ;;
+ esac
+}
+
COMMANDLINE=$@
# get the parent directory of the directory containing _swiftwrap, to use
@@ -162,7 +193,6 @@
logstate "LOG_START"
infosection "Wrapper"
-
getarg "-e" "$@"
EXEC=$VALUE
shift $SHIFTCOUNT
@@ -195,6 +225,10 @@
KICKSTART=$VALUE
shift $SHIFTCOUNT
+getarg "-cdmfile" "$@"
+CDM_FILE=$VALUE
+shift $SHIFTCOUNT
+
getarg "-status" "$@"
STATUSMODE=$VALUE
shift $SHIFTCOUNT
@@ -259,6 +293,7 @@
log "INF=$INF"
log "OUTF=$OUTF"
log "KICKSTART=$KICKSTART"
+log "CDM_FILE=$CDM_FILE"
log "ARGS=$@"
log "ARGC=$#"
@@ -276,24 +311,45 @@
log "Created output directory: $DIR/$D"
done
+log "PWD: $( pwd )"
+log "FIND: $( find . )"
+
logstate "LINK_INPUTS"
for L in $INF ; do
+ CDM_POLICY=$( perl shared/cdm.pl $L < shared/$CDM_FILE )
+ log "CDM_POLICY: $CDM_POLICY"
+ if [ $CDM_POLICY != "DEFAULT" ]; then
+ eval cdm $DIR "INPUT" $L $CDM_POLICY
+ break
+ fi
if [ $COPYNOTLINK = 1 ]; then
cp "$WFDIR/shared/$L" "$DIR/$L" 2>&1 >& $INFO
checkError 254 "Failed to copy input file $L"
log "Copied input: $WFDIR/shared/$L to $DIR/$L"
else
+ [ -f $WFDIR/shared/$L ]
+ checkError 254 "Could not locate input file: $L"
ln -s "$WFDIR/shared/$L" "$DIR/$L" 2>&1 >& $INFO
checkError 254 "Failed to link input file $L"
log "Linked input: $WFDIR/shared/$L to $DIR/$L"
fi
done
+logstate "LINK_CDM_OUTPUTS"
+for L in $OUTF ; do
+ CDM_POLICY=$( perl shared/cdm.pl $L < shared/$CDM_FILE )
+ log "CDM_POLICY: $CDM_POLICY"
+ if [ $CDM_POLICY != "DEFAULT" ]; then
+ eval cdm $DIR "OUTPUT" $L $CDM_POLICY
+ eval CDM_OUTPUT_$L="SKIP"
+ break
+ fi
+done
+
logstate "EXECUTE"
cd $DIR
-#ls >>$WRAPPERLOG
if [ ! -f "$EXEC" ]; then
fail 254 "The executable $EXEC does not exist"
fi
@@ -349,6 +405,7 @@
MISSING=
for O in $OUTF ; do
+ log "checking: $O"
if [ ! -f "$DIR/$O" ]; then
if [ "$MISSING" == "" ]; then
MISSING=$O
@@ -361,19 +418,25 @@
fail 254 "The following output files were not created by the application: $MISSING"
fi
-logstate "MOVING_OUTPUTS"
+logstate "MOVING_OUTPUTS $OUTF"
for O in $OUTF ; do
- mv "$DIR/$O" "$WFDIR/shared/$O" 2>&1 >& "$INFO"
- checkError 254 "Failed to move output file $O to shared directory"
+    log "output: $O"
+    # LINK_CDM_OUTPUTS sets CDM_OUTPUT_<name>=SKIP for outputs that a CDM
+    # policy already places elsewhere.  The lookup is quoted: unquoted, an
+    # unset variable turns the test into "[ != SKIP ]", which errors out and
+    # would wrongly skip the move of an ordinary output.
+    if [ "$( eval echo "\$CDM_OUTPUT_$O" )" != "SKIP" ]; then
+        mv "$DIR/$O" "$WFDIR/shared/$O" 2>&1 >& "$INFO"
+        checkError 254 "Failed to move output file $O to shared directory"
+    else
+        log "CDM: SKIP OUTPUT: $O"
+    fi
done
logstate "RM_JOBDIR"
-rm -rf "$DIR" 2>&1 >& "$INFO"
+# rm -rf "$DIR" 2>&1 >& "$INFO"
checkError 254 "Failed to remove job directory $DIR"
if [ "$STATUSMODE" = "files" ]; then
logstate "TOUCH_SUCCESS"
touch $WFDIR/status/${JOBDIR}/${ID}-success
+ log "TOUCH CODE: $WFDIR/status/${JOBDIR}/${ID}-success $?"
fi
logstate "END"
Added: trunk/libexec/cdm.pl
===================================================================
--- trunk/libexec/cdm.pl (rev 0)
+++ trunk/libexec/cdm.pl 2010-02-03 21:12:49 UTC (rev 3221)
@@ -0,0 +1,46 @@
+
+# CDM Lookup
+#
+# Usage: perl cdm.pl <file> < <CDM policy file>
+#
+# Prints the policy of the first rule in the policy file whose pattern
+# matches <file> in its entirety: the policy name followed by any policy
+# arguments (e.g. "DIRECT /some/dir"), or "DEFAULT" if no rule matches.
+# Rule parsing follows the conventions of
+# org.globus.swift.data.util.LineReader ('#' comments, blank lines,
+# trailing-backslash continuation lines) and matching follows
+# org.globus.swift.data.Director (file order, first full match wins).
+# NOTE(review): patterns are evaluated as Perl regexes here and as Java
+# regexes by Director; keep rules within the common subset.
+
+use strict;
+use warnings;
+
+# Debugging helper: print each "key : value" pair of a hash reference.
+sub print_hash
+{
+    my $hash = $_[0];
+    foreach (keys %$hash)
+    {
+        print "$_ : $$hash{$_} \n";
+    }
+}
+
+# Print the policy of the first rule matching $file, or DEFAULT.
+#   $rules: reference to a list of [ pattern, policy ] pairs in file order
+#   $file:  file name to look up
+sub cdm_lookup
+{
+    my ($rules, $file) = @_;
+
+    my $result = "DEFAULT";
+    foreach my $rule (@$rules)
+    {
+        my ($pattern, $policy) = @$rule;
+        # Anchored, like Java's Matcher.matches(): the whole name must match.
+        if ($file =~ /\A(?:$pattern)\z/)
+        {
+            $result = $policy;
+            last;
+        }
+    }
+    # Always print an answer: the wrapper compares the output to DEFAULT.
+    print("$result\n");
+}
+
+# Append the rule in $line ("<pattern> <policy> [args...]") to @$rules.
+sub add_rule
+{
+    my ($rules, $line) = @_;
+    $line =~ s/^\s+|\s+$//g;
+    return if $line eq "";
+    my ($pattern, @tokens) = split(/\s+/, $line);
+    if (!@tokens)
+    {
+        warn("cdm.pl: ignoring rule without a policy: $line\n");
+        return;
+    }
+    push(@$rules, [ $pattern, join(' ', @tokens) ]);
+}
+
+# Command-line arguments:
+my $file = $ARGV[0];
+defined($file) or die("usage: cdm.pl <file> < <policy file>\n");
+
+# Read the policy file off of stdin, keeping the rules in file order:
+my @rules = ();
+my $pending = "";
+while (my $line = <STDIN>)
+{
+    chomp($line);
+    $line =~ s/#.*//;
+    $line = "$pending $line";
+    $line =~ s/^\s+|\s+$//g;
+    if ($line =~ s/\\\z//)
+    {
+        # Continuation line: join it with the next one.
+        $pending = $line;
+        next;
+    }
+    $pending = "";
+    add_rule(\@rules, $line);
+}
+# A continuation on the last line has nothing left to join with.
+add_rule(\@rules, $pending);
+
+cdm_lookup(\@rules, $file);
Modified: trunk/libexec/vdl-int.k
===================================================================
--- trunk/libexec/vdl-int.k 2010-01-25 22:00:15 UTC (rev 3220)
+++ trunk/libexec/vdl-int.k 2010-02-03 21:12:49 UTC (rev 3221)
@@ -215,6 +215,7 @@
)
)
)
+ log(LOG:INFO, "END cleanups={cleanup}")
)
element(cleanupFiles, [files, host]
@@ -245,6 +246,21 @@
element(doStagein, [jobid, files, dir, host]
log(LOG:INFO, "START jobid={jobid} - Staging in files")
+
+ cdmfile := cdm:file()
+ log(LOG:DEBUG, "cdmfile: {cdmfile}")
+ log(LOG:DEBUG, "swift.home: {swift.home}")
+ libexec := "{swift.home}/libexec"
+
+        // When a CDM policy file is in use, stage it and cdm.pl alongside the
+        // job inputs so that the wrapper can look up per-file policies.
+        if (cdmfile != "" then(
+            doStageinFile(provider="file", srchost="localhost", srcfile=basename(cdmfile),
+                srcdir=dirname(cdmfile), desthost=host, destdir=dir,
+                size=file:size(cdmfile), policy="DEFAULT")
+            doStageinFile(provider="file", srchost="localhost", srcfile="cdm.pl",
+                srcdir=libexec, desthost=host, destdir=dir,
+                size=file:size("{libexec}/cdm.pl"), policy="DEFAULT")
+        ))
+
uParallelFor(file, files
provider := provider(file)
srchost := hostname(file)
@@ -253,25 +269,35 @@
filename := basename(file)
size := file:size("{srcdir}/{filename}", host=srchost, provider=provider)
- vdl:cacheAddAndLockFile(filename, destdir, host, size
- cleanupFiles(cacheFilesToRemove, host)
- log(LOG:DEBUG, "FILE_STAGE_IN_START file={file} srchost={srchost} srcdir={srcdir} srcname={filename} ",
- "desthost={host} destdir={destdir} provider={provider}")
- restartOnError(".*", 2
- task:transfer(srcprovider=provider, srchost=srchost, srcfile=filename,
- srcdir=srcdir, desthost=host, destdir=destdir)
- )
- log(LOG:DEBUG, "FILE_STAGE_IN_END file={file} srchost={srchost} srcdir={srcdir} srcname={filename} ",
- "desthost={host} destdir={destdir} provider={provider}")
- )
- )
+ policy := cdm:query(query=file)
+ log(LOG:DEBUG, "policy: {file} : {policy}")
+
+ doStageinFile(provider=provider, srchost=srchost, srcfile=filename,
+ srcdir=srcdir, desthost=host, destdir=destdir, size=size, policy=policy)
+ )
log(LOG:INFO, "END jobid={jobid} - Staging in finished")
)
-
+
+ element(doStageinFile, [provider, srchost, srcfile, srcdir, desthost, destdir, size, policy]
+ vdl:cacheAddAndLockFile(srcfile, destdir, desthost, size
+ cleanupFiles(cacheFilesToRemove, desthost)
+
+ log(LOG:DEBUG, "FILE_STAGE_IN_START file={srcfile} srchost={srchost} srcdir={srcdir} srcname={srcfile} ",
+ "desthost={desthost} destdir={destdir} provider={provider}")
+ if (policy == "DEFAULT" then(
+ restartOnError(".*", 2
+ task:transfer(srcprovider=provider, srchost=srchost, srcfile=srcfile,
+ srcdir=srcdir, desthost=desthost, destdir=destdir)))
+ else(log(LOG:DEBUG, "FILE_STAGE_IN_SKIP file={srcfile} policy={policy}")))
+ log(LOG:DEBUG, "FILE_STAGE_IN_END file={srcfile} srchost={srchost} srcdir={srcdir} srcname={srcfile} ",
+ "desthost={desthost} destdir={destdir} provider={provider}")
+ )
+ )
+
element(doStageout, [jobid, stageouts, dir, host]
log(LOG:INFO, "START jobid={jobid} - Staging out files")
done := list(
- uparallelFor(pv, stageouts
+ uParallelFor(pv, stageouts
[path, var] := each(pv)
file := vdl:absfilename(vdl:getfield(var, path = path))
provider := vdl:provider(file)
@@ -286,9 +312,12 @@
"destdir={ldir} desthost={dhost} provider={provider}")
//make sure we do have the directory on the client side
dir:make(ldir, host=dhost, provider=provider)
- restartOnError(".*", 2
- task:transfer(srchost=host, srcfile=bname,
- srcdir=rdir, destdir=ldir, desthost=dhost, destprovider=provider)
+ policy := cdm:query(query=file)
+ if (policy == "DEFAULT" then(
+ restartOnError(".*", 2
+ task:transfer(srchost=host, srcfile=bname,
+ srcdir=rdir, destdir=ldir, desthost=dhost, destprovider=provider)))
+ else(log(LOG:DEBUG, "FILE_STAGE_OUT_SKIP srcname={bname}"))
)
log(LOG:DEBUG, "FILE_STAGE_OUT_END srcname={bname} srcdir={rdir} srchost={host} ",
"destdir={ldir} desthost={dhost} provider={provider}")
@@ -437,6 +466,7 @@
"-if ",flatten(infiles(stagein)),nl(),
"-of ",flatten(outfiles(stageout)),nl(),
"-k ",kickstart,nl(),
+ "-cdmfile ",cdm:file(),nl(),
"-status ",statusMode,nl())
for(argiterator, arguments
sys:file:write(wrapfile,append=true,"-a ",argiterator,nl())
@@ -497,7 +527,8 @@
"-if", flatten(infiles(stagein)),
"-of", flatten(outfiles(stageout)),
"-k", kickstart,
- "-status", statusMode
+ "-cdmfile", cdm:file(),
+ "-status", statusMode,
"-a", maybe(each(arguments))
)
directory=wfdir
Modified: trunk/libexec/vdl-lib.xml
===================================================================
--- trunk/libexec/vdl-lib.xml 2010-01-25 22:00:15 UTC (rev 3220)
+++ trunk/libexec/vdl-lib.xml 2010-02-03 21:12:49 UTC (rev 3221)
@@ -97,4 +97,10 @@
<export name="tparallelFor"><elementDef classname="org.griphyn.vdl.karajan.lib.ThrottledParallelFor"/></export>
</namespace>
+
+ <namespace prefix="cdm">
+ <export name="query"><elementDef classname="org.globus.swift.data.Query"/></export>
+ <export name="file"><elementDef classname="org.globus.swift.data.Query"/></export>
+ </namespace>
+
</karajan>
Modified: trunk/libexec/vdl.k
===================================================================
--- trunk/libexec/vdl.k 2010-01-25 22:00:15 UTC (rev 3220)
+++ trunk/libexec/vdl.k 2010-02-03 21:12:49 UTC (rev 3221)
@@ -155,7 +155,9 @@
if(
sys:not(anyerrors) then(
//hmm, you can append to channels!
+ log(LOG:DEBUG, "Starting cleanups")
append(warnings, from(warnings, cleanups(cleanup)))
+ log(LOG:DEBUG, "Ending cleanups")
)
else(
log(LOG:INFO, "Errors detected. Cleanup not done.")
Added: trunk/src/org/globus/swift/data/Director.java
===================================================================
--- trunk/src/org/globus/swift/data/Director.java (rev 0)
+++ trunk/src/org/globus/swift/data/Director.java 2010-02-03 21:12:49 UTC (rev 3221)
@@ -0,0 +1,67 @@
+
+package org.globus.swift.data;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Logger;
+import org.globus.swift.data.policy.Policy;
+import org.globus.swift.data.util.LineReader;
+import org.griphyn.vdl.karajan.Loader;
+
+/**
+ * Manages CDM policies for files based on pattern matching.
+ * Each rule in the policy file has the form {@code <regex> <policy> [args...]};
+ * only the regex and the policy name are used here.  A file name gets the
+ * policy of the first rule (in file order) whose regex matches the whole
+ * name, or {@link Policy#DEFAULT} if no rule matches.
+ * @author wozniak
+ * */
+
+public class Director {
+    private static final Logger logger = Logger.getLogger(Director.class);
+
+    /**
+     Location of the most recently loaded CDM policy file, or null if none
+     has been loaded.
+     */
+    static File policyFile;
+
+    /**
+     Maps from Patterns to Policies, in policy-file order
+     */
+    static Map map;
+
+    static {
+        map = new LinkedHashMap();
+    }
+
+    /**
+     * Load the rules of a CDM policy file, replacing any rules loaded before.
+     * {@link #policyFile} is only updated once the whole file has been loaded.
+     * @param policyFile the policy file to read
+     * @throws IOException if the file cannot be read or contains an invalid rule
+     */
+    public static void load(File policyFile) throws IOException {
+        logger.info("loading: " + policyFile);
+        List list = LineReader.read(policyFile);
+        if (list == null)
+            throw new IOException("Could not read CDM policy file: " + policyFile);
+        // Replace rather than extend, so the rules always match policyFile.
+        map.clear();
+        for (Iterator it = list.iterator(); it.hasNext(); ) {
+            String s = (String) it.next();
+            try {
+                addLine(s);
+            }
+            catch (IllegalArgumentException e) {
+                IOException ioe = new IOException("Bad rule in CDM policy file "
+                        + policyFile + ": " + e.getMessage());
+                ioe.initCause(e);
+                throw ioe;
+            }
+        }
+        Director.policyFile = policyFile;
+    }
+
+    /**
+     * Parse one logical policy-file line and add it as a rule.
+     * @param s a line of the form {@code <regex> <policy> [args...]}
+     * @throws IllegalArgumentException if the line lacks a policy, names an
+     *         unknown policy, or has an invalid regex
+     */
+    static void addLine(String s) {
+        String[] tokens = LineReader.tokenize(s);
+        if (tokens == null || tokens.length < 2)
+            throw new IllegalArgumentException(
+                    "expected '<pattern> <policy>' but got: " + s);
+        Policy policy = Policy.valueOf(tokens[1]);
+        if (policy == null)
+            throw new IllegalArgumentException(
+                    "unknown policy '" + tokens[1] + "' in: " + s);
+        Pattern pattern = Pattern.compile(tokens[0]);
+        map.put(pattern, policy);
+    }
+
+    /**
+     * Find the policy for a file name.
+     * @param file the file name; must not be null
+     * @return the policy of the first rule whose pattern matches all of
+     *         {@code file}, or {@link Policy#DEFAULT}
+     */
+    public static Policy lookup(String file) {
+        for (Iterator it = map.keySet().iterator(); it.hasNext(); ) {
+            Pattern pattern = (Pattern) it.next();
+            Matcher matcher = pattern.matcher(file);
+            if (matcher.matches())
+                return (Policy) map.get(pattern);
+        }
+        return Policy.DEFAULT;
+    }
+}
Added: trunk/src/org/globus/swift/data/Query.java
===================================================================
--- trunk/src/org/globus/swift/data/Query.java (rev 0)
+++ trunk/src/org/globus/swift/data/Query.java 2010-02-03 21:12:49 UTC (rev 3221)
@@ -0,0 +1,36 @@
+package org.globus.swift.data;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+
+import org.globus.cog.karajan.arguments.Arg;
+import org.globus.cog.karajan.stack.VariableStack;
+import org.globus.cog.karajan.util.TypeUtil;
+import org.globus.cog.karajan.workflow.ExecutionException;
+import org.globus.cog.karajan.workflow.nodes.functions.FunctionsCollection;
+
+import org.globus.swift.data.policy.Policy;
+
+/**
+ * Karajan functions that expose the CDM {@link Director}: {@code cdm_query}
+ * returns the policy name for a file and {@code cdm_file} returns the path of
+ * the loaded policy file.  (Registered in vdl-lib.xml under the cdm prefix.)
+ */
+public class Query extends FunctionsCollection {
+
+    /** Positional argument of cdm_query: the file name to look up. */
+    public static final Arg PA_QUERY = new Arg.Positional("query");
+
+    static {
+        setArguments("cdm_query", new Arg[] { PA_QUERY });
+        setArguments("cdm_file", new Arg[0]);
+    }
+
+    /** @return the name of the CDM policy that applies to the queried file */
+    public String cdm_query(VariableStack stack) throws ExecutionException {
+        String queried = (String) PA_QUERY.getValue(stack);
+        return Director.lookup(queried).toString();
+    }
+
+    /** @return the path of the loaded CDM policy file, or "" if none is loaded */
+    public String cdm_file(VariableStack stack) throws ExecutionException {
+        File loaded = Director.policyFile;
+        return (loaded == null) ? "" : loaded.toString();
+    }
+}
Added: trunk/src/org/globus/swift/data/policy/Default.java
===================================================================
--- trunk/src/org/globus/swift/data/policy/Default.java (rev 0)
+++ trunk/src/org/globus/swift/data/policy/Default.java 2010-02-03 21:12:49 UTC (rev 3221)
@@ -0,0 +1,9 @@
+
+package org.globus.swift.data.policy;
+
+/**
+ * The DEFAULT CDM policy: applies to files that match no CDM rule. Such
+ * files go through Swift's normal stage-in/stage-out transfers.
+ */
+public class Default extends Policy {
+
+    /** Name reported to Karajan and to the wrapper script. */
+    private static final String NAME = "DEFAULT";
+
+    public String toString() {
+        return NAME;
+    }
+}
Added: trunk/src/org/globus/swift/data/policy/Direct.java
===================================================================
--- trunk/src/org/globus/swift/data/policy/Direct.java (rev 0)
+++ trunk/src/org/globus/swift/data/policy/Direct.java 2010-02-03 21:12:49 UTC (rev 3221)
@@ -0,0 +1,8 @@
+package org.globus.swift.data.policy;
+
+/**
+ * The DIRECT CDM policy: the file is not transferred by Swift. Instead, the
+ * wrapper symlinks it between the job directory and a directory named in
+ * the CDM rule.
+ */
+public class Direct extends Policy {
+
+    /** Name reported to Karajan and to the wrapper script. */
+    private static final String NAME = "DIRECT";
+
+    public String toString() {
+        return NAME;
+    }
+}
Added: trunk/src/org/globus/swift/data/policy/Policy.java
===================================================================
--- trunk/src/org/globus/swift/data/policy/Policy.java (rev 0)
+++ trunk/src/org/globus/swift/data/policy/Policy.java 2010-02-03 21:12:49 UTC (rev 3221)
@@ -0,0 +1,21 @@
+
+package org.globus.swift.data.policy;
+
+import java.util.List;
+
+/**
+ * Base class of CDM data-management policies; {@link #toString()} of a
+ * subclass is the policy name used by Karajan and the wrapper.
+ */
+public class Policy {
+
+    /** Shared DEFAULT policy instance (used when no CDM rule matches). */
+    public static final Policy DEFAULT = valueOf("default");
+
+    public Policy()
+    {}
+
+    /**
+     * Parse a policy name, ignoring case.
+     * @param token a policy name such as "default" or "DIRECT"; may be null
+     * @return a new policy instance, or null if {@code token} is null or
+     *         does not name a known policy
+     */
+    public static Policy valueOf(String token) {
+        if (token == null)
+            return null;
+        if (token.equalsIgnoreCase("default"))
+            return new Default();
+        if (token.equalsIgnoreCase("direct"))
+            return new Direct();
+        return null;
+    }
+}
Added: trunk/src/org/globus/swift/data/util/LineReader.java
===================================================================
--- trunk/src/org/globus/swift/data/util/LineReader.java (rev 0)
+++ trunk/src/org/globus/swift/data/util/LineReader.java 2010-02-03 21:12:49 UTC (rev 3221)
@@ -0,0 +1,78 @@
+
+package org.globus.swift.data.util;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.globus.cog.karajan.arguments.Arg;
+
+/**
+ * Reads line-oriented configuration text such as CDM policy files.
+ * A '#' starts a comment that runs to the end of the line, a trailing
+ * backslash joins a line with the next one (separated by a space), and
+ * blank lines are dropped.  Returned lines are trimmed.
+ */
+public class LineReader {
+
+    public LineReader()
+    {}
+
+    /**
+     * Read the logical lines of a file.
+     * @return the logical lines, or null if an I/O error occurs while reading
+     * @throws FileNotFoundException if the file cannot be opened
+     */
+    public static List read(File file) throws FileNotFoundException {
+        BufferedReader reader =
+            new BufferedReader(new FileReader(file));
+        return read(reader);
+    }
+
+    /** Read the logical lines of an in-memory string. */
+    public static List read(String s) {
+        BufferedReader reader =
+            new BufferedReader(new StringReader(s));
+        return read(reader);
+    }
+
+    /**
+     * Read the logical lines from a reader; the reader is always closed.
+     * @return a list of non-empty logical lines (Strings), or null if an
+     *         I/O error occurs
+     */
+    public static List read(BufferedReader reader) {
+        List result = new ArrayList();
+        try {
+            String prevline = "";
+            String line;
+            while ((line = reader.readLine()) != null) {
+                int hash = line.indexOf('#');
+                if (hash >= 0)
+                    line = line.substring(0, hash);
+                line = (prevline + " " + line).trim();
+                if (line.endsWith("\\")) {
+                    // Remove just the backslash; the text before it is kept.
+                    prevline = line.substring(0, line.length() - 1).trim();
+                    continue;
+                }
+                prevline = "";
+                if (line.length() > 0)
+                    result.add(line);
+            }
+            // A continuation on the last line has no next line to join.
+            if (prevline.length() > 0)
+                result.add(prevline);
+        }
+        catch (IOException e) {
+            System.out.println("LineReader: I/O problem.");
+            return null;
+        }
+        finally {
+            try {
+                reader.close();
+            }
+            catch (IOException ignored) {
+                // Nothing useful to do: reading has already finished or failed.
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Split a line into its whitespace-separated words.
+     * @return the non-empty words, or null if {@code line} is null
+     */
+    public static String[] tokenize(String line) {
+        if (line == null)
+            return null;
+        List words = new ArrayList();
+        String[] ws = line.split("\\s");
+        for (int i = 0; i < ws.length; i++)
+            if (ws[i].length() > 0)
+                words.add(ws[i]);
+        String[] result = new String[words.size()];
+        for (int i = 0; i < words.size(); i++)
+            result[i] = (String) words.get(i);
+        return result;
+    }
+}
More information about the Swift-commit
mailing list