[Swift-commit] r3281 - in trunk: libexec src/org/globus/swift/data src/org/globus/swift/data/policy

noreply at svn.ci.uchicago.edu noreply at svn.ci.uchicago.edu
Sun Apr 11 16:50:15 CDT 2010


Author: wozniak
Date: 2010-04-11 16:50:14 -0500 (Sun, 11 Apr 2010)
New Revision: 3281

Modified:
   trunk/libexec/cdm_broadcast.sh
   trunk/libexec/vdl-int.k
   trunk/libexec/vdl-lib.xml
   trunk/src/org/globus/swift/data/Action.java
   trunk/src/org/globus/swift/data/Director.java
   trunk/src/org/globus/swift/data/Query.java
   trunk/src/org/globus/swift/data/policy/AllocationHook.java
   trunk/src/org/globus/swift/data/policy/Broadcast.java
Log:
Connect CDM BROADCAST functionality to Coasters notification.


Modified: trunk/libexec/cdm_broadcast.sh
===================================================================
--- trunk/libexec/cdm_broadcast.sh	2010-04-11 14:38:29 UTC (rev 3280)
+++ trunk/libexec/cdm_broadcast.sh	2010-04-11 21:50:14 UTC (rev 3281)
@@ -3,29 +3,80 @@
 SWIFT_HOME=$( dirname $( dirname $0 ) )
 LOG=${SWIFT_HOME}/etc/cdm_broadcast.log
 
+# For each given location, broadcast the given files to it
+# Input: bgp_broadcast [-l <location> (<file> <dest>)*]*
 bgp_broadcast()
 {
-  DIR=$1
-  FILE=$2
-  DEST=$3
-  if [[ ! -f ips.list ]] 
+  while [[ ${*} != "" ]]
+   do
+   L=$1 # -l
+   shift 
+   ARGS=$1 # Location
+   shift
+   while true
+    do
+    if [[ $1 == "-l" || $1 == "" ]] 
+      then
+      break
+    fi
+    ARGS="${ARGS} $1"
+    shift
+   done
+   bgp_broadcast_perform ${ARGS}
+  done
+}
+
+# Broadcast the given files to the given location
+# Input: bgp_broadcast_perform <location> (<file> <dest>)*
+bgp_broadcast_perform()
+{
+  LOCATION=$1
+  shift
+  WORK=( ${*} )
+
+  IP=$( listip ${LOCATION} )
+
+  if (( ${#WORK[@]} > 3 ))  # numeric test; inside [[ ]] ">" compares strings
     then
-    BLOCKS=$( qstat -u ${USER} | grep ${USER} | awk '{ print $6 }' )
-    IPS=$( listip ${BLOCKS} )
-    for IP in ${IPS}
+    SCRIPT=$( mktemp )
+    {
+      echo "#!/bin/sh"
+      while [[ ${*} != "" ]] 
+       do
+       FILE=$1
+       DEST=$2
+       shift 2
+       echo "/bin.rd/f2cn ${FILE} ${DEST}"
+      done
+    } > ${SCRIPT}
+    scp ${SCRIPT} ${IP}:${SCRIPT}
+    ssh ${IP} sh ${SCRIPT}  # mktemp files are mode 0600 (not executable): run via sh
+  else
+    while [[ ${*} != "" ]] 
      do
-     echo ${IP}
-    done >> ip.list
-  else
-    while read T
-     do 
-     BLOCKS="$BLOCKS $T" 
-    done < ip.list
+     FILE=$1
+     DEST=$2
+     shift 2
+     ssh_until_success 120 ${IP} /bin.rd/f2cn ${FILE} ${DEST}
+    done
   fi
-  for IP in ${BLOCKS}
+}
+
+# Run "ssh <args>" up to N times, 1s apart; return 0 on first success, 1 if all fail
+ssh_until_success()
+{
+  N=$1 
+  shift 
+  for (( i=0 ; i < N ; i++ ))
    do
-   ssh ${IP} /bin.rd/f2cn ${DIR}/${FILE} ${DEST}/${FILE}
+   ssh -o PasswordAuthentication=no ${*}
+   if [[ $? == 0 ]]; 
+     then
+     return 0
+   fi
+   sleep 1
   done
+  return 1  # every attempt failed: report it instead of claiming success
 }
 
 local_broadcast()
@@ -36,19 +87,15 @@
   cp -v ${FILE} ${DEST}/${FILE}
 }
   
+set -x
 {
-  declare -p PWD
-    set -x
-
-    FILE=$1
-    DIR=$2
-    DEST=$3
-
-    if [[ $( uname -p ) == "ppc64" ]]
-      then
-      bgp_broadcast ${DIR} ${FILE} ${DEST}
-    else 
-      bgp_local ${DIR} ${FILE} ${DEST}
-    fi
-
-} >> ${LOG} 2>&1
+  declare -p PWD LOG
+  
+  if [[ $( uname -p ) == "ppc64" ]]
+    then
+    bgp_broadcast ${*}
+  else 
+    bgp_local ${*}
+  fi
+  
+} >> /tmp/cdm_broadcast.log 2>&1 # ${LOG} 2>&1

Modified: trunk/libexec/vdl-int.k
===================================================================
--- trunk/libexec/vdl-int.k	2010-04-11 14:38:29 UTC (rev 3280)
+++ trunk/libexec/vdl-int.k	2010-04-11 21:50:14 UTC (rev 3281)
@@ -267,8 +267,6 @@
 			log(LOG:INFO, "START jobid={jobid} - Staging in files")
 
 			cdmfile := cdm:file()
-			log(LOG:DEBUG, "cdmfile: {cdmfile}")
-			log(LOG:DEBUG, "swift.home: {swift.home}")
 			libexec := "{swift.home}/libexec"
 
 			if (cdmfile != "" then(
@@ -292,7 +290,7 @@
 				size := file:size("{srcdir}/{filename}", host=srchost, provider=provider)
 
 				policy := cdm:query(query=file)
-				log(LOG:DEBUG, "policy: {file} : {policy}")
+				log(LOG:DEBUG, "CDM: {file} : {policy}")
 
 				doStageinFile(provider=provider, srchost=srchost, srcfile=filename, 
 						srcdir=srcdir, desthost=host, destdir=destdir, size=size, policy=policy)
@@ -304,8 +302,10 @@
 			vdl:cacheAddAndLockFile(srcfile, destdir, desthost, size
 				       cleanupFiles(cacheFilesToRemove, desthost)
 
-			log(LOG:DEBUG, "FILE_STAGE_IN_START file={srcfile} srchost={srchost} srcdir={srcdir} srcname={srcfile} ",
-					"desthost={desthost} destdir={destdir} provider={provider}")
+			log(LOG:DEBUG, "FILE_STAGE_IN_START file={srcfile} ",
+                                        "srchost={srchost} srcdir={srcdir} srcname={srcfile} ",
+					"desthost={desthost} destdir={destdir} provider={provider} ", 
+                                        "policy={policy}")
 			if (policy == "DEFAULT" then( 
 				restartOnError(".*", 2
 					task:transfer(srcprovider=provider, srchost=srchost, srcfile=srcfile, 
@@ -314,9 +314,12 @@
 				log(LOG:DEBUG, "FILE_STAGE_IN_BROADCAST file={srcfile} policy={policy}")
 				cdm:broadcast(srcfile=srcfile, srcdir=srcdir))
 			else(log(LOG:DEBUG, "FILE_STAGE_IN_SKIP file={srcfile} policy={policy}")))
-				log(LOG:DEBUG, "FILE_STAGE_IN_END file={srcfile} srchost={srchost} srcdir={srcdir} srcname={srcfile} ",
-                                              "desthost={desthost} destdir={destdir} provider={provider}")
-			)			
+				log(LOG:DEBUG, "FILE_STAGE_IN_END file={srcfile} ",
+					"srchost={srchost} srcdir={srcdir} srcname={srcfile} ",
+                                        "desthost={desthost} destdir={destdir} provider={provider}")
+			)
+			cdm:wait()
+			echo("doStageinFile: complete: {srcfile}")
  	        )
 
 		element(doStageout, [jobid, stageouts, dir, host]

Modified: trunk/libexec/vdl-lib.xml
===================================================================
--- trunk/libexec/vdl-lib.xml	2010-04-11 14:38:29 UTC (rev 3280)
+++ trunk/libexec/vdl-lib.xml	2010-04-11 21:50:14 UTC (rev 3281)
@@ -106,6 +106,7 @@
 	<export name="get"><elementDef classname="org.globus.swift.data.Query"/></export>
   	<export name="file"><elementDef classname="org.globus.swift.data.Query"/></export>
   	<export name="broadcast"><elementDef classname="org.globus.swift.data.Action"/></export>
+  	<export name="wait"><elementDef classname="org.globus.swift.data.Action"/></export>
   </namespace>
 
 </karajan>

Modified: trunk/src/org/globus/swift/data/Action.java
===================================================================
--- trunk/src/org/globus/swift/data/Action.java	2010-04-11 14:38:29 UTC (rev 3280)
+++ trunk/src/org/globus/swift/data/Action.java	2010-04-11 21:50:14 UTC (rev 3281)
@@ -9,6 +9,9 @@
 import org.globus.swift.data.policy.Broadcast;
 import org.globus.swift.data.policy.Policy;
 
+/**
+ * Karajan-accessible CDM functions that change something.
+ * */
 public class Action extends FunctionsCollection {
 
     public static final Arg PA_FILE = new Arg.Positional("srcfile");
@@ -16,23 +19,39 @@
 
     static {
         setArguments("cdm_broadcast", new Arg[]{ PA_FILE, PA_DIR });
+        setArguments("cdm_wait", new Arg[]{});
     }
 
+    /**
+       Register a file for broadcast by CDM.
+       The actual broadcast is triggered by {@link #cdm_wait}.
+    */
     public void cdm_broadcast(VariableStack stack) throws ExecutionException {
         String srcfile = (String) PA_FILE.getValue(stack);
         String srcdir  = (String) PA_DIR.getValue(stack);
+
+        System.out.println("cdm_broadcast()");
         
         Policy policy = Director.lookup(srcfile);
         
         if (!(policy instanceof Broadcast)) {
-            throw new RuntimeException("Attempting to BROADCAST the wrong file");
+            throw new RuntimeException
+                ("Attempting to BROADCAST the wrong file: directory: `" +
+                 srcdir + "' `" + srcfile + "' -> " + policy);
         }
         
-        if (srcdir == "") { 
+        if (srcdir == null || srcdir.length() == 0) { // compare contents, not references
             srcdir = ".";
         }
-        
-        Broadcast broadcast   = (Broadcast) policy;
-        broadcast.action(srcfile, srcdir);
+
+        Director.addBroadcast(srcdir, srcfile);
     }
+
+    /**
+       Wait until CDM has ensured that all data has been propagated.
+    */
+    public void cdm_wait(VariableStack stack) throws ExecutionException {
+        System.out.println("cdm_wait()");
+        Director.doBroadcast();
+    }
 }

Modified: trunk/src/org/globus/swift/data/Director.java
===================================================================
--- trunk/src/org/globus/swift/data/Director.java	2010-04-11 14:38:29 UTC (rev 3280)
+++ trunk/src/org/globus/swift/data/Director.java	2010-04-11 21:50:14 UTC (rev 3281)
@@ -3,19 +3,23 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.log4j.Logger;
 import org.globus.swift.data.policy.Policy;
+import org.globus.swift.data.policy.Broadcast;
 import org.globus.swift.data.util.LineReader;
 import org.griphyn.vdl.karajan.Loader;
 
@@ -28,6 +32,11 @@
     private static final Logger logger = Logger.getLogger(Director.class);
 
     /**
+       Has a CDM policy file been provided?
+    */
+    static boolean enabled = false;
+    
+    /**
        Save the location of the given CDM policy file
      */
     static File policyFile;
@@ -43,12 +52,35 @@
     static Map<String,String> properties = new HashMap();
 
     /** 
-       Remember the files we have broadcasted
+       Remember the files we have broadcasted.
+       Map from allocations to filenames.
+       NOTE: must be accessed only using synchronized Director methods
      */
-    static Set<String> broadcasted = new HashSet<String>();
+    private static Map<String,Set<String>> broadcasted =
+        new LinkedHashMap<String,Set<String>>();
+
+    /**
+       Set of files to be broadcasted.
+       NOTE: must be accessed only using synchronized Director methods
+    */
+    private static Set<String> broadcastWork = new LinkedHashSet<String>();
     
+    /**
+       Remember all known allocations
+    */
+    private static List<String> allocations = new ArrayList<String>();
+
+    public static boolean isEnabled()
+    {
+        return enabled;
+    }
+    
+    /**
+       Read in the user-supplied CDM policy file.
+    */
     public static void load(File policyFile) throws IOException {
-        logger.info("loading: " + policyFile);
+        logger.info("CDM file: " + policyFile);
+        enabled = true;
         Director.policyFile = policyFile;
         LineReader lines = new LineReader();
         List list = lines.read(policyFile);
@@ -67,7 +99,6 @@
         else if (type.equals("property")) { 
             addProperty(tokens);
         }
-       
     }
 
     static void addRule(String[] tokens) { 
@@ -91,38 +122,127 @@
         }
         return result.toString();
     }
-    
+
+    /**
+       Obtain the CDM policy for a given file.
+    */
     public static Policy lookup(String file) {
+        logger.debug("Director.lookup(): map: " + map);
     	for (Pattern pattern : map.keySet()) {
-    		Matcher matcher = pattern.matcher(file);
-    		if (matcher.matches())
-    			return map.get(pattern);
+            Matcher matcher = pattern.matcher(file);
+            if (matcher.matches())
+                return map.get(pattern);
     	}
+        
     	return Policy.DEFAULT;
     }
-    
+
+    /**
+       Obtain the value of a CDM property.
+    */
     public static String property(String name) {
         String result = properties.get(name);
         if (result == null) 
             result = "UNSET";
         return result;
     }
+
+    /**
+       Add a file to the list of files to be broadcasted.
+    */
+    public static synchronized void addBroadcast(String srcdir, String srcfile) {
+        logger.debug("addBroadcast(): " + srcdir + " " + srcfile);
+        String path = srcdir+"/"+srcfile;
+        broadcastWork.add(path);
+    }
+
+    /**
+       Add a location to the list of allocations (listed once, even if re-added).
+       If the location is added twice, the second addition is considered to be an
+       empty allocation with no CDM state
+    */
+    public static synchronized void addAllocation(String allocation) {
+        logger.debug("addAllocation(): " + allocation);
+        if (!allocations.contains(allocation)) allocations.add(allocation);
+        broadcasted.put(allocation, new HashSet<String>());
+        doBroadcast();
+    }
     
+    /**
+       Create a batch of broadcast work to do and send it to be performed.
+    */
+    public static synchronized void doBroadcast() {
+        logger.debug("doBroadcast: broadcasted: " + broadcasted);
+        // Map from locations to files
+        Map<String,List<String>> batch = getBroadcastBatch();
+        if (batch.size() == 0)
+            return;
+        logger.debug("doBroadcast(): batch: " + batch);
+        Broadcast.perform(batch);
+        markBroadcasts(batch);
+        logger.debug("marked: " + broadcasted);
+    }
+
+    /**
+       Obtain a map from allocations to files.
+       For each allocation, its corresponding files should be broadcasted to it.
+       Should only be called by {@link #doBroadcast}.
+    */
+    private static Map<String,List<String>> getBroadcastBatch() {
+        logger.debug("getBroadcastBatch(): ");
+        Map<String,List<String>> batch = new LinkedHashMap<String,List<String>>();
+        for (String file : broadcastWork) {
+            logger.debug("file: " + file);
+            logger.debug("allocations: " + allocations);
+            for (String allocation : allocations) {
+                Set<String> files = broadcasted.get(allocation);
+                logger.debug("files: " + files);
+                if (! files.contains(file)) {
+                    logger.debug("adding: " + file + " to: " + allocation);
+                    List<String> work = batch.get(allocation);
+                    if (work == null) {
+                        work = new ArrayList<String>();
+                        batch.put(allocation, work);
+                    }
+                    work.add(file);
+                }
+            }
+        }
+        return batch;
+    }
+
+    /**
+       Mark that the files in the given batch have been successfully broadcasted.
+       Should only be called by {@link #doBroadcast}.
+    */
+    private static void markBroadcasts(Map<String,List<String>> batch) {
+        logger.debug("markBroadcasts: batch: " + batch);
+        for (Map.Entry<String,List<String>> entry : batch.entrySet()) {
+            String location = entry.getKey();
+            logger.debug("markBroadcasts: location: " + location);
+            List<String> files = entry.getValue();
+            for (String file : files) {
+                Set<String> contents = broadcasted.get(location);
+                assert (! contents.contains(file));
+                logger.debug("markBroadcasts: add: " + file);
+                contents.add(file);
+            }                
+        }
+    }
+
+    /*
     public static boolean broadcasted(String file, String dir) {
         return broadcasted.contains(dir+"/"+file);
     }
+    */
     
-    public static void broadcast(String file, String dir) {
-        broadcasted.add(dir+"/"+file);
-    }
-    
     /** 
-     * Check the policy effect of name with respect to policy_file
-     * @param args {name, policy_file} 
-     */
+        Check the policy effect of name with respect to policy_file
+        @param args {name, policy_file} 
+    */
     public static void main(String[] args) {
         if (args.length != 2) {
-            System.out.println("Incorrect args");
+            logger.debug("Incorrect args");
             System.exit(1);
         }
             
@@ -131,12 +251,12 @@
             String name = args[0]; 
             File policyFile = new File(args[1]);
             if (! policyFile.exists()) {
-                System.out.println("Policy file does not exist: " + 
+                logger.debug("Policy file does not exist: " + 
                     args[1]);
             }
             load(policyFile);
             Policy policy = lookup(name);
-            System.out.println(name + ": " + policy);
+            logger.debug(name + ": " + policy);
         } catch (Exception e) { 
             e.printStackTrace();
             System.exit(2);

Modified: trunk/src/org/globus/swift/data/Query.java
===================================================================
--- trunk/src/org/globus/swift/data/Query.java	2010-04-11 14:38:29 UTC (rev 3280)
+++ trunk/src/org/globus/swift/data/Query.java	2010-04-11 21:50:14 UTC (rev 3281)
@@ -12,6 +12,9 @@
 
 import org.globus.swift.data.policy.Policy;
 
+/**
+   Karajan-accessible read-queries to CDM functionality.
+*/
 public class Query extends FunctionsCollection {
 
     public static final Arg PA_QUERY = new Arg.Positional("query");
@@ -23,21 +26,29 @@
         setArguments("cdm_file", new Arg[]{});
     }
 
+    /**
+       Do CDM policy lookup based on the CDM file.
+    */
     public String cdm_query(VariableStack stack) throws ExecutionException {
         String file = (String) PA_QUERY.getValue(stack);
         Policy policy = Director.lookup(file);
+        System.out.println("Director.lookup(): " + file + " -> " + policy);
         return policy.toString();
     }
 
     /** 
-     Get a CDM property
+        Get a CDM property
     */
     public String cdm_get(VariableStack stack) throws ExecutionException {
         String name  = (String) PA_NAME.getValue(stack);
         String value = Director.property(name);
         return value;
     }
-    
+
+    /**
+       Obtain the CDM policy file given on the command-line,
+       conventionally "fs.data".  If not set, returns an empty String.
+    */
     public String cdm_file(VariableStack stack) throws ExecutionException {
         String file = "";
         if (Director.policyFile != null)

Modified: trunk/src/org/globus/swift/data/policy/AllocationHook.java
===================================================================
--- trunk/src/org/globus/swift/data/policy/AllocationHook.java	2010-04-11 14:38:29 UTC (rev 3280)
+++ trunk/src/org/globus/swift/data/policy/AllocationHook.java	2010-04-11 21:50:14 UTC (rev 3281)
@@ -5,10 +5,22 @@
 import org.globus.cog.abstraction.impl.common.StatusEvent;
 import org.globus.cog.abstraction.coaster.service.job.manager.Hook;
 
+import org.globus.swift.data.Director;
+
+/**
+ * Re-apply CDM policies when we obtain a new allocation from Coasters.
+ * */
 public class AllocationHook extends Hook
 {
     public void blockActive(StatusEvent e)
     {
+        if (!Director.isEnabled())
+            return;
+        
         System.out.println("blockActive: " + e.getStatus().getMessage());
+        String msg = e.getStatus().getMessage();
+        String[] tokens = (msg == null) ? new String[0] : msg.split("=");
+        if (tokens.length >= 2) // skip messages that carry no "<key>=<allocation>"
+            Director.addAllocation(tokens[1]);
     }
 }

Modified: trunk/src/org/globus/swift/data/policy/Broadcast.java
===================================================================
--- trunk/src/org/globus/swift/data/policy/Broadcast.java	2010-04-11 14:38:29 UTC (rev 3280)
+++ trunk/src/org/globus/swift/data/policy/Broadcast.java	2010-04-11 21:50:14 UTC (rev 3281)
@@ -1,6 +1,11 @@
 package org.globus.swift.data.policy;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.Map;
+import java.util.Map.Entry;
 
 import org.globus.swift.data.Director;
 
@@ -17,32 +22,66 @@
             throw new RuntimeException("Incorrect settings for BROADCAST");
         }
     }
-    
-    public void action(String srcfile, String srcdir) {
-        if (! Director.broadcasted(srcfile, srcdir))
-            callScript(srcfile, srcdir, destination);
-    }
-    
-    void callScript(String srcfile, String srcdir, String destination) { 
-        String home = System.getProperties().getProperty("swift.home");
+
+    /**
+       Call the external script to perform the broadcast for this batch.
+    */
+    public static void perform(Map<String,List<String>> batch) {
+        String[] line = commandLine(batch);
+        System.out.println("Broadcast.perform(): " + Arrays.toString(line));
+        Process process = null;
         try {
-            String[] line = new String[4];
-            line[0] = home+"/libexec/cdm_broadcast.sh";
-            line[1] = srcfile; 
-            line[2] = srcdir; 
-            line[3] = destination;
-            Process process = Runtime.getRuntime().exec(line);
+            process = Runtime.getRuntime().exec(line);
             process.waitFor();
         }
         catch (Exception e) {
             e.printStackTrace();
-            throw new RuntimeException("Could not launch external broadcast");
+            throw new RuntimeException("Could not launch external broadcast", e);
         }
-    }   
-    
+        int code = process.exitValue();
+        if (code != 0)
+            throw new RuntimeException("External broadcast failed!");
+    }
+
+    /**
+       Build argv for cdm_broadcast.sh: per allocation, -l location (file dest)*.
+    */
+    static String[] commandLine(Map<String,List<String>> batch) {
+        String home = System.getProperties().getProperty("swift.home");
+        List<String> line = new ArrayList<String>();
+        line.add(home+"/libexec/cdm_broadcast.sh");
+        for (Map.Entry<String,List<String>> entry : batch.entrySet()) {
+            line.add("-l");
+            String location = entry.getKey();
+            List<String> files = entry.getValue();
+            line.add(location);
+            for (String file : files) {
+                line.add(file);
+                line.add(getDestination(file)+"/"+file);
+            }
+        }
+        String[] result = new String[line.size()];
+        line.toArray(result);
+        return result;
+    }
+
+    /**
+       Return the remote destination directory for this policy.
+    */
     public String getDestination() {
         return destination;
     }
+
+    /**
+       Return the remote destination directory for this broadcasted file.
+    */
+    public static String getDestination(String file) {
+        Policy policy = Director.lookup(file);
+        if (!(policy instanceof Broadcast)) // fail with context, not a bare cast error
+            throw new RuntimeException
+                ("No BROADCAST policy for file: `" + file + "' -> " + policy);
+        return ((Broadcast) policy).getDestination();
+    }
     
     public String toString() {
         return "BROADCAST";




More information about the Swift-commit mailing list