[Swift-commit] r3737 - in trunk: libexec src/org/globus/swift/data src/org/globus/swift/data/policy src/org/griphyn/vdl/karajan src/org/griphyn/vdl/util

noreply at svn.ci.uchicago.edu noreply at svn.ci.uchicago.edu
Mon Dec 6 15:22:28 CST 2010


Author: wozniak
Date: 2010-12-06 15:22:28 -0600 (Mon, 06 Dec 2010)
New Revision: 3737

Modified:
   trunk/libexec/cdm_broadcast.sh
   trunk/src/org/globus/swift/data/Action.java
   trunk/src/org/globus/swift/data/Director.java
   trunk/src/org/globus/swift/data/policy/AllocationHook.java
   trunk/src/org/globus/swift/data/policy/Broadcast.java
   trunk/src/org/griphyn/vdl/karajan/Loader.java
   trunk/src/org/griphyn/vdl/util/VDL2Config.java
Log:
Cleanups and fixes to CDM BROADCAST


Modified: trunk/libexec/cdm_broadcast.sh
===================================================================
--- trunk/libexec/cdm_broadcast.sh	2010-12-06 19:36:57 UTC (rev 3736)
+++ trunk/libexec/cdm_broadcast.sh	2010-12-06 21:22:28 UTC (rev 3737)
@@ -1,34 +1,14 @@
 #!/bin/sh
 
+# Called by org.globus.swift.data.policy.Broadcast
+# usage: cdm_broadcast.sh <MODE> <LOG> [-l <LOCATION> [<FILE> <DEST>]...]...
+# broadcasts each -l group (LOCATION followed by FILE DEST pairs) via MODE, logging to LOG
+
 SWIFT_HOME=$( dirname $( dirname $0 ) )
-LOG=${SWIFT_HOME}/etc/cdm_broadcast.log
 
-# For each given location, broadcast the given files to it
-# Input: bgp_broadcast [-l <location> <file>*]*
-bgp_broadcast()
-{
-  while [[ ${*} != "" ]]
-   do
-   L=$1 # -l
-   shift
-   ARGS=$1 # Location
-   shift
-   while true
-    do
-    if [[ $1 == "-l" || $1 == "" ]]
-      then
-      break
-    fi
-    ARGS="${ARGS} $1"
-    shift
-   done
-   bgp_broadcast_perform ${ARGS}
-  done
-}
-
 # Broadcast the given files to the given location
 # Input: bgp_broadcast_perform <location> <file>*
-bgp_broadcast_perform()
+bgp_broadcast()
 {
   LOCATION=$1
   shift
@@ -81,21 +61,48 @@
 
 local_broadcast()
 {
-  DIR=$1
+  ALLOCATION=$1 # Ignored (LOCAL_FILE)
   FILE=$2
   DEST=$3
   cp -v ${FILE} ${DEST}/${FILE}
 }
 
-set -x
+MODE=$1
+LOG=$2
+shift 2
+
+[[ ${LOG} != /dev/null ]] && set -x
 {
   declare -p PWD LOG
 
-  if [[ $( uname -p ) == "ppc64" ]]
+  if [[ ${MODE} == "f2cn" ]]
     then
-    bgp_broadcast ${*}
+    BROADCAST="bgp_broadcast"
+  elif [[ ${MODE} == "file" ]]
+    then
+    BROADCAST="local_broadcast"
   else
-    bgp_local ${*}
+    echo "Unknown broadcast mode!"
+    exit 1
   fi
 
-} >> /tmp/cdm_broadcast.log 2>&1 # ${LOG} 2>&1
+  while [[ ${*} != "" ]]
+  do
+    L=$1 # -l
+    shift
+    ARGS=$1 # Location
+    shift
+    while true
+    do
+      if [[ $1 == "-l" || $1 == "" ]]
+      then
+        break
+      fi
+      ARGS="${ARGS} $1"
+      shift
+    done
+    ${BROADCAST} ${ARGS}
+  done
+} >> ${LOG} 2>&1
+
+exit 0

Modified: trunk/src/org/globus/swift/data/Action.java
===================================================================
--- trunk/src/org/globus/swift/data/Action.java	2010-12-06 19:36:57 UTC (rev 3736)
+++ trunk/src/org/globus/swift/data/Action.java	2010-12-06 21:22:28 UTC (rev 3737)
@@ -1,7 +1,5 @@
 package org.globus.swift.data;
 
-import java.io.IOException;
-
 import org.apache.log4j.Logger;
 
 import org.globus.cog.karajan.arguments.Arg;

Modified: trunk/src/org/globus/swift/data/Director.java
===================================================================
--- trunk/src/org/globus/swift/data/Director.java	2010-12-06 19:36:57 UTC (rev 3736)
+++ trunk/src/org/globus/swift/data/Director.java	2010-12-06 21:22:28 UTC (rev 3737)
@@ -19,9 +19,11 @@
 import org.globus.swift.data.policy.Policy;
 import org.globus.swift.data.policy.Broadcast;
 import org.globus.swift.data.util.LineReader;
+import org.griphyn.vdl.util.VDL2Config;
 
 /**
  * Manages CDM policies for files based on pattern matching.
+ * Initialized when given a CDM file to read. 
  * @author wozniak
  * */
 
@@ -54,6 +56,7 @@
     /**
        Remember the files we have broadcasted.
        Map from allocations to filenames.
+       The keys represent the list of known allocations.
        NOTE: must be accessed only using synchronized Director methods
      */
     private static Map<String,Set<String>> broadcasted =
@@ -65,28 +68,46 @@
     */
     private static Set<String> broadcastWork = new LinkedHashSet<String>();
 
-    /**
-       Remember all known allocations
-    */
-    private static List<String> allocations = new ArrayList<String>();
-
     public static boolean isEnabled()
     {
         return enabled;
     }
 
+    /** 
+       How to broadcast: either "file" or "f2cn"
+     */
+    public static String broadcastMode = null;
+    
+    /** 
+       Convenience reference to the Swift logfile name
+     */
+    public static String logfile = null;
+    
     /**
        Read in the user-supplied CDM policy file.
     */
     public static void load(File file) throws IOException {
-        logger.info("CDM file: " + file);
-        enabled = true;
+        logger.debug("CDM file: " + file);
         Director.policyFile = file;
         List<String> list = LineReader.read(file);
         for (String s : list)
             addLine(s);
+        init();
     }
 
+    static void init() throws IOException
+    {
+        VDL2Config config = VDL2Config.getConfig();
+        
+        broadcastMode = config.getProperty("cdm.broadcast.mode");
+        logfile = config.getProperty("logfile");
+        
+        if (broadcastMode.equals("file")) 
+            broadcasted.put("LOCAL_FILE", new HashSet<String>());
+        
+        enabled = true;
+    }
+    
     /**
        A line is either a rule or a property.
     */
@@ -173,13 +194,12 @@
     }
 
     /**
-       Add a location to the list of allocations.
+       Add a Coasters allocation to the list of allocations.
        If the location is added twice, the second addition
        is considered to be an empty allocation with no CDM state.
     */
-    public static synchronized void addBlock(String blockId) {
-        logger.debug("addBlock(): " + blockId);
-        allocations.add(blockId);
+    public static synchronized void addAllocation(String blockId) {
+        logger.debug("addAllocation(): " + blockId);
         broadcasted.put(blockId, new HashSet<String>());
         doBroadcast();
     }
@@ -195,7 +215,7 @@
             return;
         logger.debug("doBroadcast(): batch: " + batch);
         Broadcast.perform(batch);
-        markBroadcasts(batch);
+        noteBroadcasts(batch);
         logger.debug("marked: " + broadcasted);
     }
 
@@ -207,15 +227,18 @@
     */
     private static Map<String,List<String>> getBroadcastBatch() {
         logger.debug("getBroadcastBatch(): ");
-        Map<String,List<String>> batch = new LinkedHashMap<String,List<String>>();
+        Map<String,List<String>> batch = 
+            new LinkedHashMap<String,List<String>>();
         for (String file : broadcastWork) {
             logger.debug("file: " + file);
-            logger.debug("allocations: " + allocations);
-            for (String allocation : allocations) {
-                Set<String> files = broadcasted.get(allocation);
+            for (Map.Entry<String,Set<String>> entry : 
+                broadcasted.entrySet()) {
+                String allocation = entry.getKey();
+                Set<String> files = entry.getValue();
                 logger.debug("files: " + files);
                 if (! files.contains(file)) {
-                    logger.debug("adding: " + file + " to: " + allocation);
+                    logger.debug("adding: " + file + 
+                                 " to: " + allocation);
                     List<String> work = batch.get(allocation);
                     if (work == null) {
                         work = new ArrayList<String>();
@@ -229,18 +252,21 @@
     }
 
     /**
-       Mark that the files in the given batch have been successfully broadcasted.
+       Record that the files in the given batch have been successfully 
+       broadcasted by adding them to the per-allocation Sets in Map broadcasted.
        Should only be called by {@link doBroadcast}.
     */
-    private static void markBroadcasts(Map<String,List<String>> batch) {
+    private static void noteBroadcasts(Map<String,
+                                       List<String>> batch) {
         logger.debug("markBroadcasts: batch: " + batch);
-        for (Map.Entry<String,List<String>> entry : batch.entrySet()) {
+        for (Map.Entry<String,List<String>> entry : 
+            batch.entrySet()) {
             String location = entry.getKey();
             logger.debug("markBroadcasts: location: " + location);
             List<String> files = entry.getValue();
             for (String file : files) {
                 Set<String> contents = broadcasted.get(location);
-                assert (! contents.contains(file));
+                assert(! contents.contains(file));
                 logger.debug("markBroadcasts: add: " + file);
                 contents.add(file);
             }

Modified: trunk/src/org/globus/swift/data/policy/AllocationHook.java
===================================================================
--- trunk/src/org/globus/swift/data/policy/AllocationHook.java	2010-12-06 19:36:57 UTC (rev 3736)
+++ trunk/src/org/globus/swift/data/policy/AllocationHook.java	2010-12-06 21:22:28 UTC (rev 3737)
@@ -12,15 +12,12 @@
  * */
 public class AllocationHook extends Hook
 {
-    public void blockActive(StatusEvent e)
+    public void blockActive(StatusEvent e, String blockId)
     {
         if (!Director.isEnabled())
             return;
         
-        System.out.println("blockActive: " + e.getStatus().getMessage());
-        String msg = e.getStatus().getMessage();
-        String[] tokens = msg.split("=");
-        String allocation = tokens[1];
-        Director.addAllocation(allocation);
+        System.out.println("blockActive: " + blockId);
+        Director.addAllocation(blockId);
     }
 }

Modified: trunk/src/org/globus/swift/data/policy/Broadcast.java
===================================================================
--- trunk/src/org/globus/swift/data/policy/Broadcast.java	2010-12-06 19:36:57 UTC (rev 3736)
+++ trunk/src/org/globus/swift/data/policy/Broadcast.java	2010-12-06 21:22:28 UTC (rev 3737)
@@ -4,10 +4,14 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.log4j.Logger;
 import org.globus.swift.data.Director;
 
 public class Broadcast extends Policy {
     
+    static Logger logger = Logger.getLogger(Broadcast.class);
+    
     String destination = null; 
     
     @Override
@@ -25,7 +29,7 @@
     */
     public static void perform(Map<String,List<String>> batch) {
         String[] line = commandLine(batch);
-        System.out.println("Broadcast.perform(): " + Arrays.toString(line));
+        logger.debug("arguments: " + Arrays.toString(line));
         Process process = null;
         try {
             process = Runtime.getRuntime().exec(line);
@@ -33,7 +37,8 @@
         }
         catch (Exception e) {
             e.printStackTrace();
-            throw new RuntimeException("Could not launch external broadcast");
+            throw new RuntimeException
+            ("Could not launch external broadcast", e);
         }
         int code = process.exitValue();
         if (code != 0)
@@ -47,6 +52,11 @@
         String home = System.getProperties().getProperty("swift.home");
         List<String> line = new ArrayList<String>();
         line.add(home+"/libexec/cdm_broadcast.sh");
+        line.add(Director.broadcastMode);
+        if (logger.isDebugEnabled())
+            line.add(Director.logfile);
+        else 
+            line.add("/dev/null");
         for (Map.Entry<String,List<String>> entry : batch.entrySet()) {
             line.add("-l");
             String location = entry.getKey();
@@ -54,7 +64,7 @@
             line.add(location);
             for (String file : files) {
                 line.add(file);
-                line.add(getDestination(file)+"/"+file);
+                line.add(getDestination(file));
             }
         }
         String[] result = new String[line.size()];

Modified: trunk/src/org/griphyn/vdl/karajan/Loader.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/Loader.java	2010-12-06 19:36:57 UTC (rev 3736)
+++ trunk/src/org/griphyn/vdl/karajan/Loader.java	2010-12-06 21:22:28 UTC (rev 3737)
@@ -35,7 +35,6 @@
 import org.globus.cog.karajan.workflow.PrintStreamChannel;
 import org.globus.cog.karajan.workflow.nodes.FlowElement;
 import org.globus.cog.karajan.workflow.nodes.grid.AbstractGridNode;
-import org.globus.cog.karajan.workflow.service.channels.AbstractKarajanChannel;
 import org.globus.cog.util.ArgumentParser;
 import org.globus.cog.util.ArgumentParserException;
 import org.globus.swift.data.Director;
@@ -93,9 +92,6 @@
             if (ap.isPresent(ARG_MONITOR)) {
                 new Monitor().start();
             }
-            if (ap.isPresent(ARG_CDMFILE)) {
-                loadCDM(ap); 
-            }
             if (!ap.hasValue(ArgumentParser.DEFAULT)) {
                 error("No SwiftScript program specified");
             }
@@ -130,6 +126,10 @@
             setupLogging(ap, projectName, runID);
             logger.debug("Max heap: " + Runtime.getRuntime().maxMemory());
 
+            if (ap.isPresent(ARG_CDMFILE)) {
+                loadCDM(ap); 
+            }
+            
             if (!(new File(project).exists())) {
                 logger.error("Input file " + project + " does not exist.");
                 System.exit(4);
@@ -454,6 +454,9 @@
             logfile = projectName + "-" + runID + ".log";
         }
 
+        VDL2Config config = VDL2Config.getConfig();
+        config.put("logfile", logfile);
+        
         File f = new File(logfile);
 
         FileAppender fa = (FileAppender) getAppender(FileAppender.class);

Modified: trunk/src/org/griphyn/vdl/util/VDL2Config.java
===================================================================
--- trunk/src/org/griphyn/vdl/util/VDL2Config.java	2010-12-06 19:36:57 UTC (rev 3736)
+++ trunk/src/org/griphyn/vdl/util/VDL2Config.java	2010-12-06 21:22:28 UTC (rev 3737)
@@ -95,6 +95,8 @@
 		put("status.mode", "files");
 		put("wrapper.parameter.mode", "args");
 		put("wrapper.invocation.mode", "absolute");
+		
+		put("cdm.broadcast.mode", "file");
 	}
 
 	private VDL2Config(VDL2Config other) {




More information about the Swift-commit mailing list