[Swift-commit] r3221 - in trunk: etc libexec src/org/globus/swift src/org/globus/swift/data src/org/globus/swift/data/policy src/org/globus/swift/data/util

noreply at svn.ci.uchicago.edu noreply at svn.ci.uchicago.edu
Wed Feb 3 15:12:49 CST 2010


Author: wozniak
Date: 2010-02-03 15:12:49 -0600 (Wed, 03 Feb 2010)
New Revision: 3221

Added:
   trunk/libexec/cdm.pl
   trunk/src/org/globus/swift/data/
   trunk/src/org/globus/swift/data/Director.java
   trunk/src/org/globus/swift/data/Query.java
   trunk/src/org/globus/swift/data/policy/
   trunk/src/org/globus/swift/data/policy/Default.java
   trunk/src/org/globus/swift/data/policy/Direct.java
   trunk/src/org/globus/swift/data/policy/Policy.java
   trunk/src/org/globus/swift/data/util/
   trunk/src/org/globus/swift/data/util/LineReader.java
Modified:
   trunk/etc/log4j.properties
   trunk/libexec/_swiftwrap
   trunk/libexec/vdl-int.k
   trunk/libexec/vdl-lib.xml
   trunk/libexec/vdl.k
Log:
General framework for CDM functionality, including DIRECT policy. 


Modified: trunk/etc/log4j.properties
===================================================================
--- trunk/etc/log4j.properties	2010-01-25 22:00:15 UTC (rev 3220)
+++ trunk/etc/log4j.properties	2010-02-03 21:12:49 UTC (rev 3221)
@@ -26,4 +26,6 @@
 log4j.logger.org.globus.cog.abstraction.impl.common.task.TaskImpl=DEBUG
 log4j.logger.org.griphyn.vdl.karajan.lib.GetFieldValue=DEBUG
 log4j.logger.org.griphyn.vdl.engine.Karajan=INFO
-log4j.logger.org.globus.cog.abstraction.coaster.rlog=INFO
\ No newline at end of file
+log4j.logger.org.globus.cog.abstraction.coaster.rlog=INFO
+
+log4j.logger.org.globus.swift.data.Director=DEBUG

Modified: trunk/libexec/_swiftwrap
===================================================================
--- trunk/libexec/_swiftwrap	2010-01-25 22:00:15 UTC (rev 3220)
+++ trunk/libexec/_swiftwrap	2010-02-03 21:12:49 UTC (rev 3221)
@@ -113,6 +113,37 @@
 	exec 3>&-
 }
 
+# cdm JOBDIR MODE FILE POLICY [ARGS...]: apply CDM POLICY to FILE (MODE is INPUT or OUTPUT)
+cdm() {
+    log "CDM: $@"
+    local JOBDIR=$1  # Given jobdir
+    local MODE=$2    # INPUT or OUTPUT
+    local FILE=$3    # User file
+    local POLICY=$4  # DIRECT, BROADCAST, ...
+    shift 4
+    local -a ARGS=( "$@" )  # Policy arguments; an array so ${ARGS[0]} is only the first
+    case $POLICY in
+	DIRECT)
+	    local DIRECT_DIR=${ARGS[0]}  # Directory holding the real file
+	    log "CDM[DIRECT]: Linking $JOBDIR/$FILE to $DIRECT_DIR/$FILE"
+	    if [ "$MODE" == "INPUT" ]; then
+		[ -f "$DIRECT_DIR/$FILE" ]
+		checkError 254 "CDM[DIRECT]: $DIRECT_DIR/$FILE does not exist!"
+		ln -s "$DIRECT_DIR/$FILE" "$JOBDIR/$FILE"
+		checkError 254 "CDM[DIRECT]: Linking to $DIRECT_DIR/$FILE failed!"
+	    elif [ "$MODE" == "OUTPUT" ]; then
+		touch "$DIRECT_DIR/$FILE"
+		checkError 254 "CDM[DIRECT]: Touching $DIRECT_DIR/$FILE failed!"
+		ln -s "$DIRECT_DIR/$FILE" "$JOBDIR/$FILE"
+		checkError 254 "CDM[DIRECT]: Linking to $DIRECT_DIR/$FILE failed!"
+	    else
+		fail 254 "Unknown MODE: $MODE"
+	    fi
+	    ;;
+	*)  fail 254 "CDM: Unknown POLICY: $POLICY" ;;
+    esac
+}
+
 COMMANDLINE=$@
 
 # get the parent directory of the directory containing _swiftwrap, to use
@@ -162,7 +193,6 @@
 logstate "LOG_START"
 infosection "Wrapper"
 
-
 getarg "-e" "$@"
 EXEC=$VALUE
 shift $SHIFTCOUNT
@@ -195,6 +225,10 @@
 KICKSTART=$VALUE
 shift $SHIFTCOUNT
 
+getarg "-cdmfile" "$@"
+CDM_FILE=$VALUE
+shift $SHIFTCOUNT
+
 getarg "-status" "$@"
 STATUSMODE=$VALUE
 shift $SHIFTCOUNT
@@ -259,6 +293,7 @@
 log "INF=$INF"
 log "OUTF=$OUTF"
 log "KICKSTART=$KICKSTART"
+log "CDM_FILE=$CDM_FILE"
 log "ARGS=$@"
 log "ARGC=$#"
 
@@ -276,24 +311,45 @@
 	log "Created output directory: $DIR/$D"
 done
 
+# CDM: cdm.pl looks up each file in the CDM file staged into shared/ (if any)
+
 logstate "LINK_INPUTS"
 for L in $INF ; do
+	CDM_POLICY=DEFAULT
+	[ -z "$CDM_FILE" ] || CDM_POLICY=$( perl "$WFDIR/shared/cdm.pl" "$L" < "$WFDIR/shared/$(basename "$CDM_FILE")" )
+	log "CDM_POLICY: $CDM_POLICY"
+	if [ -n "$CDM_POLICY" ] && [ "$CDM_POLICY" != "DEFAULT" ]; then
+		cdm "$DIR" "INPUT" "$L" $CDM_POLICY  # unquoted: POLICY plus its args
+		continue  # next input; this one is handled by CDM
+	fi
 	if [ $COPYNOTLINK = 1 ]; then
 		cp "$WFDIR/shared/$L" "$DIR/$L" 2>&1 >& $INFO
 		checkError 254 "Failed to copy input file $L"
 		log "Copied input: $WFDIR/shared/$L to $DIR/$L"
 	else
+		[ -f "$WFDIR/shared/$L" ]
+		checkError 254 "Could not locate input file: $L"
 		ln -s "$WFDIR/shared/$L" "$DIR/$L" 2>&1 >& $INFO
 		checkError 254 "Failed to link input file $L"
 		log "Linked input: $WFDIR/shared/$L to $DIR/$L"
 	fi
 done
 
+logstate "LINK_CDM_OUTPUTS"
+CDM_SKIP_OUTPUTS=" "  # space-delimited outputs placed by CDM; not moved to shared/
+for L in $OUTF ; do
+	CDM_POLICY=DEFAULT
+	[ -z "$CDM_FILE" ] || CDM_POLICY=$( perl "$WFDIR/shared/cdm.pl" "$L" < "$WFDIR/shared/$(basename "$CDM_FILE")" )
+	log "CDM_POLICY: $CDM_POLICY"
+	if [ -n "$CDM_POLICY" ] && [ "$CDM_POLICY" != "DEFAULT" ]; then
+		cdm "$DIR" "OUTPUT" "$L" $CDM_POLICY  # unquoted: POLICY plus its args
+		CDM_SKIP_OUTPUTS="$CDM_SKIP_OUTPUTS$L "  # keep looping: other outputs still need checks
+	fi
+done
 logstate "EXECUTE"
 cd $DIR
 
 
-#ls >>$WRAPPERLOG
 if [ ! -f "$EXEC" ]; then
 	fail 254 "The executable $EXEC does not exist"
 fi
@@ -349,6 +405,7 @@
 
 MISSING=
 for O in $OUTF ; do
+    log "checking: $O"
 	if [ ! -f "$DIR/$O" ]; then
 		if [ "$MISSING" == "" ]; then
 			MISSING=$O
@@ -361,19 +418,25 @@
 	fail 254 "The following output files were not created by the application: $MISSING"
 fi
 
-logstate "MOVING_OUTPUTS"
+logstate "MOVING_OUTPUTS $OUTF"
 for O in $OUTF ; do
-	mv "$DIR/$O" "$WFDIR/shared/$O" 2>&1 >& "$INFO"
-	checkError 254 "Failed to move output file $O to shared directory"
+	case "$CDM_SKIP_OUTPUTS" in
+	    *" $O "*)  log "CDM: SKIP OUTPUT: $O" ;;  # already placed by CDM
+	    *)
+		mv "$DIR/$O" "$WFDIR/shared/$O" 2>&1 >& "$INFO"
+		checkError 254 "Failed to move output file $O to shared directory"
+		;;
+	esac
 done
 
 logstate "RM_JOBDIR"
 rm -rf "$DIR" 2>&1 >& "$INFO"
 checkError 254 "Failed to remove job directory $DIR" 
 
 if [ "$STATUSMODE" = "files" ]; then
 	logstate "TOUCH_SUCCESS"
 	touch $WFDIR/status/${JOBDIR}/${ID}-success
+	log "TOUCH CODE: $WFDIR/status/${JOBDIR}/${ID}-success $?"
 fi
 
 logstate "END"

Added: trunk/libexec/cdm.pl
===================================================================
--- trunk/libexec/cdm.pl	                        (rev 0)
+++ trunk/libexec/cdm.pl	2010-02-03 21:12:49 UTC (rev 3221)
@@ -0,0 +1,67 @@
+#!/usr/bin/perl
+
+# CDM Lookup
+#
+# Usage: perl cdm.pl FILE < CDM_POLICY_FILE
+#
+# Reads rules of the form "PATTERN POLICY [ARGS...]" from standard input
+# and prints "POLICY [ARGS...]" for the first rule, in file order, whose
+# PATTERN (a regular expression) matches the whole of FILE; prints
+# "DEFAULT" if no rule matches.  Like the Java side (util.LineReader and
+# Director), '#' starts a comment and a trailing backslash continues a
+# rule on the next line, so the wrapper and Swift agree on each file.
+
+use strict;
+use warnings;
+
+# Print the key/value pairs of a hash reference (debugging aid).
+sub print_hash {
+    my ($hash) = @_;
+    for my $key (keys %{$hash}) {
+        print "$key : $hash->{$key} \n";
+    }
+    return;
+}
+
+# Parse rules from filehandle $fh into an ordered list of
+# [PATTERN, POLICY_TEXT] pairs; POLICY_TEXT is the policy name and
+# its arguments joined by single spaces.
+sub read_rules {
+    my ($fh) = @_;
+    my @rules;
+    my $pending = '';    # start of a rule continued with a backslash
+    while (my $line = <$fh>) {
+        chomp $line;
+        $line =~ s/#.*//;
+        $line = "$pending $line";
+        $line =~ s/\A\s+//;
+        $line =~ s/\s+\z//;
+        if ($line =~ s/\\\z//) {
+            $pending = $line;
+            next;
+        }
+        $pending = '';
+        next if $line eq '';
+        my ($pattern, @policy) = split /\s+/, $line;
+        push @rules, [ $pattern, join(' ', @policy) ];
+    }
+    return \@rules;
+}
+
+# Return the POLICY_TEXT of the first rule in @$rules whose pattern
+# matches all of $file, or "DEFAULT" if none does.
+sub cdm_lookup {
+    my ($rules, $file) = @_;
+    for my $rule (@{$rules}) {
+        my ($pattern, $policy) = @{$rule};
+        # Anchored like Java's Matcher.matches() (whole-name match)
+        return $policy if $file =~ m/\A(?:$pattern)\z/;
+    }
+    return 'DEFAULT';
+}
+
+# Command-line arguments:
+my $file = $ARGV[0];
+die "usage: $0 FILE < CDM_POLICY_FILE\n" unless defined $file;
+
+print cdm_lookup(read_rules(\*STDIN), $file), "\n";

Modified: trunk/libexec/vdl-int.k
===================================================================
--- trunk/libexec/vdl-int.k	2010-01-25 22:00:15 UTC (rev 3220)
+++ trunk/libexec/vdl-int.k	2010-02-03 21:12:49 UTC (rev 3221)
@@ -215,6 +215,7 @@
 					)
 				)
 			)
+			log(LOG:INFO, "END cleanups={cleanup}")
 		)
 		
 		element(cleanupFiles, [files, host]
@@ -245,6 +246,21 @@
 
 		element(doStagein, [jobid, files, dir, host]
 			log(LOG:INFO, "START jobid={jobid} - Staging in files")
+			// Stage the CDM policy file and libexec/cdm.pl so _swiftwrap can look up policies
+			cdmfile := cdm:file()
+			log(LOG:DEBUG, "cdmfile: {cdmfile}")
+			log(LOG:DEBUG, "swift.home: {swift.home}")
+			libexec := "{swift.home}/libexec"
+
+			if (cdmfile != "" then(
+				doStageinFile(provider="file", srchost="localhost", srcfile=basename(cdmfile),
+					srcdir=dirname(cdmfile), desthost=host, destdir=dir,
+					size=file:size(cdmfile), policy="DEFAULT")
+				doStageinFile(provider="file", srchost="localhost", srcfile="cdm.pl",
+					srcdir=libexec, desthost=host, destdir=dir,
+					size=file:size("{libexec}/cdm.pl"), policy="DEFAULT")
+			))
+
 			uParallelFor(file, files
 				provider := provider(file)
 				srchost := hostname(file)
@@ -253,25 +269,35 @@
 				filename := basename(file)
 				size := file:size("{srcdir}/{filename}", host=srchost, provider=provider)
 
-				vdl:cacheAddAndLockFile(filename, destdir, host, size
-					cleanupFiles(cacheFilesToRemove, host)
-					log(LOG:DEBUG, "FILE_STAGE_IN_START file={file} srchost={srchost} srcdir={srcdir} srcname={filename} ",
-						"desthost={host} destdir={destdir} provider={provider}")
-					restartOnError(".*", 2
-					    task:transfer(srcprovider=provider, srchost=srchost, srcfile=filename, 
-					    	srcdir=srcdir, desthost=host, destdir=destdir)
-					)
-					log(LOG:DEBUG, "FILE_STAGE_IN_END file={file} srchost={srchost} srcdir={srcdir} srcname={filename} ",
-						"desthost={host} destdir={destdir} provider={provider}")
-				)
-			)
+				policy := cdm:query(query=file)
+				log(LOG:DEBUG, "policy: {file} : {policy}")
+
+				doStageinFile(provider=provider, srchost=srchost, srcfile=filename, 
+						srcdir=srcdir, desthost=host, destdir=destdir, size=size, policy=policy)
+			)			
 			log(LOG:INFO, "END jobid={jobid} - Staging in finished")
 		)
-		
+		// Stage one file inside vdl:cacheAddAndLockFile; it is transferred only when policy is DEFAULT, otherwise FILE_STAGE_IN_SKIP is logged.
+		element(doStageinFile, [provider, srchost, srcfile, srcdir, desthost, destdir, size, policy]
+			vdl:cacheAddAndLockFile(srcfile, destdir, desthost, size
+				       cleanupFiles(cacheFilesToRemove, desthost)
+			// Non-DEFAULT CDM policies (e.g. DIRECT) are applied on the site by _swiftwrap's cdm()
+			log(LOG:DEBUG, "FILE_STAGE_IN_START file={srcfile} srchost={srchost} srcdir={srcdir} srcname={srcfile} ",
+					"desthost={desthost} destdir={destdir} provider={provider}")
+			if (policy == "DEFAULT" then( 
+				restartOnError(".*", 2
+					task:transfer(srcprovider=provider, srchost=srchost, srcfile=srcfile, 
+							srcdir=srcdir, desthost=desthost, destdir=destdir)))
+			else(log(LOG:DEBUG, "FILE_STAGE_IN_SKIP file={srcfile} policy={policy}")))
+			log(LOG:DEBUG, "FILE_STAGE_IN_END file={srcfile} srchost={srchost} srcdir={srcdir} srcname={srcfile} ",
+                                               "desthost={desthost} destdir={destdir} provider={provider}")
+			)			
+ 	        )
+
 		element(doStageout, [jobid, stageouts, dir, host]
 			log(LOG:INFO, "START jobid={jobid} - Staging out files")
 			done := list(
-				uparallelFor(pv, stageouts
+				uParallelFor(pv, stageouts
 					[path, var] := each(pv)
 					file := vdl:absfilename(vdl:getfield(var, path = path))
 					provider := vdl:provider(file)
@@ -286,9 +312,12 @@
 							"destdir={ldir} desthost={dhost} provider={provider}")
 					//make sure we do have the directory on the client side
 					dir:make(ldir, host=dhost, provider=provider)
-					restartOnError(".*", 2
-					    task:transfer(srchost=host, srcfile=bname,
-					        srcdir=rdir, destdir=ldir, desthost=dhost, destprovider=provider)
+					policy := cdm:query(query=file)
+					if (policy == "DEFAULT" then(
+						restartOnError(".*", 2
+							task:transfer(srchost=host, srcfile=bname,
+								srcdir=rdir, destdir=ldir, desthost=dhost, destprovider=provider)))
+					else(log(LOG:DEBUG, "FILE_STAGE_OUT_SKIP srcname={bname}"))
 					)
 					log(LOG:DEBUG, "FILE_STAGE_OUT_END srcname={bname} srcdir={rdir} srchost={host} ",
 						"destdir={ldir} desthost={dhost} provider={provider}")
@@ -437,6 +466,7 @@
 							"-if ",flatten(infiles(stagein)),nl(),
 							"-of ",flatten(outfiles(stageout)),nl(),
 							"-k ",kickstart,nl(),
+							"-cdmfile ",cdm:file(),nl(),
 							"-status ",statusMode,nl())
 						for(argiterator, arguments
 							sys:file:write(wrapfile,append=true,"-a ",argiterator,nl())
@@ -497,7 +527,8 @@
 								"-if", flatten(infiles(stagein)), 
 								"-of", flatten(outfiles(stageout)),
 								"-k", kickstart,
-								"-status", statusMode
+								"-cdmfile", cdm:file(),
+								"-status", statusMode,
 								"-a", maybe(each(arguments))
 							)
 							directory=wfdir

Modified: trunk/libexec/vdl-lib.xml
===================================================================
--- trunk/libexec/vdl-lib.xml	2010-01-25 22:00:15 UTC (rev 3220)
+++ trunk/libexec/vdl-lib.xml	2010-02-03 21:12:49 UTC (rev 3221)
@@ -97,4 +97,10 @@
 	
 	<export name="tparallelFor"><elementDef classname="org.griphyn.vdl.karajan.lib.ThrottledParallelFor"/></export>
   </namespace>
+
+  <namespace prefix="cdm">
+  	<export name="query"><elementDef classname="org.globus.swift.data.Query"/></export>
+  	<export name="file"><elementDef classname="org.globus.swift.data.Query"/></export>
+  </namespace>
+
 </karajan>

Modified: trunk/libexec/vdl.k
===================================================================
--- trunk/libexec/vdl.k	2010-01-25 22:00:15 UTC (rev 3220)
+++ trunk/libexec/vdl.k	2010-02-03 21:12:49 UTC (rev 3221)
@@ -155,7 +155,9 @@
 			if(
 				sys:not(anyerrors) then(
 					//hmm, you can append to channels!
+					log(LOG:DEBUG, "Starting cleanups")
 					append(warnings, from(warnings, cleanups(cleanup)))
+					log(LOG:DEBUG, "Ending cleanups")
 				)
 				else(
 					log(LOG:INFO, "Errors detected. Cleanup not done.")

Added: trunk/src/org/globus/swift/data/Director.java
===================================================================
--- trunk/src/org/globus/swift/data/Director.java	                        (rev 0)
+++ trunk/src/org/globus/swift/data/Director.java	2010-02-03 21:12:49 UTC (rev 3221)
@@ -0,0 +1,86 @@
+
+package org.globus.swift.data;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Logger;
+import org.globus.swift.data.policy.Policy;
+import org.globus.swift.data.util.LineReader;
+import org.griphyn.vdl.karajan.Loader;
+
+/**
+ * Manages CDM policies for files based on pattern matching.
+ * @author wozniak
+ * */
+
+public class Director {
+    private static final Logger logger = Logger.getLogger(Director.class);
+
+    /**
+       Save the location of the given CDM policy file
+     */
+    static File policyFile;
+
+    /**
+       Maps from Patterns to Policies, in policy-file order (first match wins)
+     */
+    static Map map;
+
+    static {
+        map = new LinkedHashMap();
+    }
+
+    /**
+       Read the CDM policy file: one rule per line, "PATTERN POLICY [ARGS...]".
+       @throws IOException if the file cannot be read
+       @throws IllegalArgumentException if a rule is malformed
+     */
+    public static void load(File policyFile) throws IOException {
+        logger.info("loading: " + policyFile);
+        Director.policyFile = policyFile;
+        List list = LineReader.read(policyFile);
+        // LineReader.read() reports read errors by returning null
+        if (list == null)
+            throw new IOException("Could not read CDM policy file: " + policyFile);
+        for (Iterator it = list.iterator(); it.hasNext(); ) {
+            String s = (String) it.next();
+            addLine(s);
+        }
+    }
+
+    /**
+       Parse one rule and register its pattern and policy.
+     */
+    static void addLine(String s) {
+        String[] tokens = LineReader.tokenize(s);
+        if (tokens == null || tokens.length < 2)
+            throw new IllegalArgumentException("CDM rule needs a pattern and a policy: " + s);
+        Pattern pattern = Pattern.compile(tokens[0]);
+        Policy policy = Policy.valueOf(tokens[1]);
+        // Policy.valueOf() returns null for names it does not know
+        if (policy == null)
+            throw new IllegalArgumentException("Unknown CDM policy: " + tokens[1]);
+        map.put(pattern, policy);
+    }
+
+    /**
+       Return the policy of the first rule whose pattern matches the
+       whole file name, or Policy.DEFAULT if none does.
+     */
+    public static Policy lookup(String file) {
+        for (Iterator it = map.keySet().iterator(); it.hasNext(); ) {
+            Pattern pattern = (Pattern) it.next();
+            Matcher matcher = pattern.matcher(file);
+            if (matcher.matches())
+                return (Policy) map.get(pattern);
+        }
+        return Policy.DEFAULT;
+    }
+}

Added: trunk/src/org/globus/swift/data/Query.java
===================================================================
--- trunk/src/org/globus/swift/data/Query.java	                        (rev 0)
+++ trunk/src/org/globus/swift/data/Query.java	2010-02-03 21:12:49 UTC (rev 3221)
@@ -0,0 +1,39 @@
+package org.globus.swift.data;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+
+import org.globus.cog.karajan.arguments.Arg;
+import org.globus.cog.karajan.stack.VariableStack;
+import org.globus.cog.karajan.util.TypeUtil;
+import org.globus.cog.karajan.workflow.ExecutionException;
+import org.globus.cog.karajan.workflow.nodes.functions.FunctionsCollection;
+
+import org.globus.swift.data.policy.Policy;
+
+/**
+ * Karajan functions for CDM, exported as cdm:query and cdm:file in vdl-lib.xml.
+ */
+public class Query extends FunctionsCollection {
+
+    /** The file name passed to cdm:query. */
+    public static final Arg PA_QUERY = new Arg.Positional("query");
+
+    static {
+        setArguments("cdm_query", new Arg[] { PA_QUERY });
+        setArguments("cdm_file", new Arg[] {});
+    }
+
+    /** cdm:query(query=FILE): name of the CDM policy that applies to FILE. */
+    public String cdm_query(VariableStack stack) throws ExecutionException {
+        String name = (String) PA_QUERY.getValue(stack);
+        return Director.lookup(name).toString();
+    }
+
+    /** cdm:file(): path of the loaded CDM policy file, or "" if none was loaded. */
+    public String cdm_file(VariableStack stack) throws ExecutionException {
+        File loaded = Director.policyFile;
+        return (loaded == null) ? "" : loaded.toString();
+    }
+}

Added: trunk/src/org/globus/swift/data/policy/Default.java
===================================================================
--- trunk/src/org/globus/swift/data/policy/Default.java	                        (rev 0)
+++ trunk/src/org/globus/swift/data/policy/Default.java	2010-02-03 21:12:49 UTC (rev 3221)
@@ -0,0 +1,9 @@
+
+package org.globus.swift.data.policy;
+/** CDM policy "DEFAULT": Swift stages the file with its normal transfers. */
+public class Default extends Policy {
+
+    public String toString() {
+        return "DEFAULT";
+    }
+}

Added: trunk/src/org/globus/swift/data/policy/Direct.java
===================================================================
--- trunk/src/org/globus/swift/data/policy/Direct.java	                        (rev 0)
+++ trunk/src/org/globus/swift/data/policy/Direct.java	2010-02-03 21:12:49 UTC (rev 3221)
@@ -0,0 +1,8 @@
+package org.globus.swift.data.policy;
+/** CDM policy "DIRECT": _swiftwrap symlinks the file to/from a directory given in the CDM policy file instead of Swift staging it. */
+public class Direct extends Policy {
+
+    public String toString() {
+        return "DIRECT";
+    }
+}

Added: trunk/src/org/globus/swift/data/policy/Policy.java
===================================================================
--- trunk/src/org/globus/swift/data/policy/Policy.java	                        (rev 0)
+++ trunk/src/org/globus/swift/data/policy/Policy.java	2010-02-03 21:12:49 UTC (rev 3221)
@@ -0,0 +1,26 @@
+
+package org.globus.swift.data.policy;
+
+/**
+ * A CDM policy, named in the CDM policy file and reported by toString().
+ */
+public class Policy {
+
+    /** Policy used for files that match no CDM rule. */
+    public static final Policy DEFAULT = valueOf("default");
+
+    public Policy()
+    {}
+
+    /**
+     * Map a policy name (case-insensitive) to a Policy.
+     * @return the Policy, or null if token is null or names no known policy
+     */
+    public static Policy valueOf(String token) {
+        if ("default".equalsIgnoreCase(token))
+            return new Default();
+        if ("direct".equalsIgnoreCase(token))
+            return new Direct();
+        return null;
+    }
+}

Added: trunk/src/org/globus/swift/data/util/LineReader.java
===================================================================
--- trunk/src/org/globus/swift/data/util/LineReader.java	                        (rev 0)
+++ trunk/src/org/globus/swift/data/util/LineReader.java	2010-02-03 21:12:49 UTC (rev 3221)
@@ -0,0 +1,85 @@
+
+package org.globus.swift.data.util;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Reads line-oriented configuration text: '#' starts a comment, a
+ * trailing backslash joins a line with the next, and blank lines are
+ * dropped.
+ */
+public class LineReader {
+
+    public LineReader()
+    {}
+
+    public static List read(File file) throws FileNotFoundException {
+        BufferedReader reader =
+            new BufferedReader(new FileReader(file));
+        return read(reader);
+    }
+
+    public static List read(String s) {
+        BufferedReader reader =
+            new BufferedReader(new StringReader(s));
+        return read(reader);
+    }
+
+    /**
+     * Read all logical lines from reader and close it.
+     * @return the trimmed, non-empty logical lines, or null on an I/O error
+     */
+    public static List read(BufferedReader reader) {
+        List result = new ArrayList();
+        try {
+            String prevline = "";
+            String line;
+            while ((line = reader.readLine()) != null) {
+                int hash = line.indexOf("#");
+                if (hash >= 0)
+                    line = line.substring(0, hash);
+                line = (prevline + " " + line).trim();
+                if (line.endsWith("\\")) {
+                    // Drop only the backslash and keep the text before it
+                    prevline = line.substring(0, line.length() - 1);
+                    continue;
+                }
+                prevline = "";
+                if (line.length() > 0)
+                    result.add(line);
+            }
+        }
+        catch (IOException e) {
+            System.out.println("LineReader: I/O problem.");
+            return null;
+        }
+        finally {
+            try {
+                reader.close();
+            }
+            catch (IOException e) {
+                // Nothing useful to do if close fails
+            }
+        }
+        return result;
+    }
+
+    /** Split line on whitespace, dropping empty words; null for null. */
+    public static String[] tokenize(String line) {
+        if (line == null)
+            return null;
+        List words = new ArrayList();
+        String[] ws = line.split("\\s");
+        for (int i = 0; i < ws.length; i++)
+            if (ws[i].length() > 0)
+                words.add(ws[i]);
+        return (String[]) words.toArray(new String[words.size()]);
+    }
+}




More information about the Swift-commit mailing list