[Swift-commit] r3737 - in trunk: libexec src/org/globus/swift/data src/org/globus/swift/data/policy src/org/griphyn/vdl/karajan src/org/griphyn/vdl/util
noreply at svn.ci.uchicago.edu
noreply at svn.ci.uchicago.edu
Mon Dec 6 15:22:28 CST 2010
Author: wozniak
Date: 2010-12-06 15:22:28 -0600 (Mon, 06 Dec 2010)
New Revision: 3737
Modified:
trunk/libexec/cdm_broadcast.sh
trunk/src/org/globus/swift/data/Action.java
trunk/src/org/globus/swift/data/Director.java
trunk/src/org/globus/swift/data/policy/AllocationHook.java
trunk/src/org/globus/swift/data/policy/Broadcast.java
trunk/src/org/griphyn/vdl/karajan/Loader.java
trunk/src/org/griphyn/vdl/util/VDL2Config.java
Log:
Cleanups and fixes to CDM BROADCAST
Modified: trunk/libexec/cdm_broadcast.sh
===================================================================
--- trunk/libexec/cdm_broadcast.sh 2010-12-06 19:36:57 UTC (rev 3736)
+++ trunk/libexec/cdm_broadcast.sh 2010-12-06 21:22:28 UTC (rev 3737)
@@ -1,34 +1,14 @@
#!/bin/sh
+# Called by org.globus.swift.data.policy.Broadcast
+# usage: cdm_broadcast.sh <MODE> <LOG> <DIR> <FILE> <DEST>
+# copies DIR/FILE to DEST via MODE with logging to LOG
+
SWIFT_HOME=$( dirname $( dirname $0 ) )
-LOG=${SWIFT_HOME}/etc/cdm_broadcast.log
-# For each given location, broadcast the given files to it
-# Input: bgp_broadcast [-l <location> <file>*]*
-bgp_broadcast()
-{
- while [[ ${*} != "" ]]
- do
- L=$1 # -l
- shift
- ARGS=$1 # Location
- shift
- while true
- do
- if [[ $1 == "-l" || $1 == "" ]]
- then
- break
- fi
- ARGS="${ARGS} $1"
- shift
- done
- bgp_broadcast_perform ${ARGS}
- done
-}
-
# Broadcast the given files to the given location
# Input: bgp_broadcast_perform <location> <file>*
-bgp_broadcast_perform()
+bgp_broadcast()
{
LOCATION=$1
shift
@@ -81,21 +61,48 @@
local_broadcast()
{
- DIR=$1
+ ALLOCATION=$1 # Ignored (LOCAL_FILE)
FILE=$2
DEST=$3
cp -v ${FILE} ${DEST}/${FILE}
}
-set -x
+MODE=$1
+LOG=$2
+shift 2
+
+[[ ${LOG} != /dev/null ]] && set -x
{
declare -p PWD LOG
- if [[ $( uname -p ) == "ppc64" ]]
+ if [[ ${MODE} == "f2cn" ]]
then
- bgp_broadcast ${*}
+ BROADCAST="bgp_broadcast"
+ elif [[ ${MODE} == "file" ]]
+ then
+ BROADCAST="local_broadcast"
else
- bgp_local ${*}
+ echo "Unknown broadcast mode!"
+ exit 1
fi
-} >> /tmp/cdm_broadcast.log 2>&1 # ${LOG} 2>&1
+ while [[ ${*} != "" ]]
+ do
+ L=$1 # -l
+ shift
+ ARGS=$1 # Location
+ shift
+ while true
+ do
+ if [[ $1 == "-l" || $1 == "" ]]
+ then
+ break
+ fi
+ ARGS="${ARGS} $1"
+ shift
+ done
+ ${BROADCAST} ${ARGS}
+ done
+} >> ${LOG} 2>&1
+
+exit 0
Modified: trunk/src/org/globus/swift/data/Action.java
===================================================================
--- trunk/src/org/globus/swift/data/Action.java 2010-12-06 19:36:57 UTC (rev 3736)
+++ trunk/src/org/globus/swift/data/Action.java 2010-12-06 21:22:28 UTC (rev 3737)
@@ -1,7 +1,5 @@
package org.globus.swift.data;
-import java.io.IOException;
-
import org.apache.log4j.Logger;
import org.globus.cog.karajan.arguments.Arg;
Modified: trunk/src/org/globus/swift/data/Director.java
===================================================================
--- trunk/src/org/globus/swift/data/Director.java 2010-12-06 19:36:57 UTC (rev 3736)
+++ trunk/src/org/globus/swift/data/Director.java 2010-12-06 21:22:28 UTC (rev 3737)
@@ -19,9 +19,11 @@
import org.globus.swift.data.policy.Policy;
import org.globus.swift.data.policy.Broadcast;
import org.globus.swift.data.util.LineReader;
+import org.griphyn.vdl.util.VDL2Config;
/**
* Manages CDM policies for files based on pattern matching.
+ * Initialized when given a CDM file to read.
* @author wozniak
* */
@@ -54,6 +56,7 @@
/**
Remember the files we have broadcasted.
Map from allocations to filenames.
+ The keys represent the list of known allocations.
NOTE: must be accessed only using synchronized Director methods
*/
private static Map<String,Set<String>> broadcasted =
@@ -65,28 +68,46 @@
*/
private static Set<String> broadcastWork = new LinkedHashSet<String>();
- /**
- Remember all known allocations
- */
- private static List<String> allocations = new ArrayList<String>();
-
public static boolean isEnabled()
{
return enabled;
}
+ /**
+ How to broadcast: either "file" or "f2cn"
+ */
+ public static String broadcastMode = null;
+
+ /**
+ Convenience reference to the Swift logfile name
+ */
+ public static String logfile = null;
+
/**
Read in the user-supplied CDM policy file.
*/
public static void load(File file) throws IOException {
- logger.info("CDM file: " + file);
- enabled = true;
+ logger.debug("CDM file: " + file);
Director.policyFile = file;
List<String> list = LineReader.read(file);
for (String s : list)
addLine(s);
+ init();
}
+ static void init() throws IOException
+ {
+ VDL2Config config = VDL2Config.getConfig();
+
+ broadcastMode = config.getProperty("cdm.broadcast.mode");
+ logfile = config.getProperty("logfile");
+
+ if (broadcastMode.equals("file"))
+ broadcasted.put("LOCAL_FILE", new HashSet<String>());
+
+ enabled = true;
+ }
+
/**
A line is either a rule or a property.
*/
@@ -173,13 +194,12 @@
}
/**
- Add a location to the list of allocations.
+ Add a Coasters allocation to the list of allocations.
If the location is added twice, the second addition
is considered to be an empty allocation with no CDM state.
*/
- public static synchronized void addBlock(String blockId) {
- logger.debug("addBlock(): " + blockId);
- allocations.add(blockId);
+ public static synchronized void addAllocation(String blockId) {
+ logger.debug("addAllocation(): " + blockId);
broadcasted.put(blockId, new HashSet<String>());
doBroadcast();
}
@@ -195,7 +215,7 @@
return;
logger.debug("doBroadcast(): batch: " + batch);
Broadcast.perform(batch);
- markBroadcasts(batch);
+ noteBroadcasts(batch);
logger.debug("marked: " + broadcasted);
}
@@ -207,15 +227,18 @@
*/
private static Map<String,List<String>> getBroadcastBatch() {
logger.debug("getBroadcastBatch(): ");
- Map<String,List<String>> batch = new LinkedHashMap<String,List<String>>();
+ Map<String,List<String>> batch =
+ new LinkedHashMap<String,List<String>>();
for (String file : broadcastWork) {
logger.debug("file: " + file);
- logger.debug("allocations: " + allocations);
- for (String allocation : allocations) {
- Set<String> files = broadcasted.get(allocation);
+ for (Map.Entry<String,Set<String>> entry :
+ broadcasted.entrySet()) {
+ String allocation = entry.getKey();
+ Set<String> files = entry.getValue();
logger.debug("files: " + files);
if (! files.contains(file)) {
- logger.debug("adding: " + file + " to: " + allocation);
+ logger.debug("adding: " + file +
+ " to: " + allocation);
List<String> work = batch.get(allocation);
if (work == null) {
work = new ArrayList<String>();
@@ -229,18 +252,21 @@
}
/**
- Mark that the files in the given batch have been successfully broadcasted.
+ Note that the files in the given batch have been successfully
+ broadcasted by adding them to Set broadcasted.
Should only be called by {@link doBroadcast}.
*/
- private static void markBroadcasts(Map<String,List<String>> batch) {
+ private static void noteBroadcasts(Map<String,
+ List<String>> batch) {
logger.debug("markBroadcasts: batch: " + batch);
- for (Map.Entry<String,List<String>> entry : batch.entrySet()) {
+ for (Map.Entry<String,List<String>> entry :
+ batch.entrySet()) {
String location = entry.getKey();
logger.debug("markBroadcasts: location: " + location);
List<String> files = entry.getValue();
for (String file : files) {
Set<String> contents = broadcasted.get(location);
- assert (! contents.contains(file));
+ assert(! contents.contains(file));
logger.debug("markBroadcasts: add: " + file);
contents.add(file);
}
Modified: trunk/src/org/globus/swift/data/policy/AllocationHook.java
===================================================================
--- trunk/src/org/globus/swift/data/policy/AllocationHook.java 2010-12-06 19:36:57 UTC (rev 3736)
+++ trunk/src/org/globus/swift/data/policy/AllocationHook.java 2010-12-06 21:22:28 UTC (rev 3737)
@@ -12,15 +12,12 @@
* */
public class AllocationHook extends Hook
{
- public void blockActive(StatusEvent e)
+ public void blockActive(StatusEvent e, String blockId)
{
if (!Director.isEnabled())
return;
- System.out.println("blockActive: " + e.getStatus().getMessage());
- String msg = e.getStatus().getMessage();
- String[] tokens = msg.split("=");
- String allocation = tokens[1];
- Director.addAllocation(allocation);
+ System.out.println("blockActive: " + blockId);
+ Director.addAllocation(blockId);
}
}
Modified: trunk/src/org/globus/swift/data/policy/Broadcast.java
===================================================================
--- trunk/src/org/globus/swift/data/policy/Broadcast.java 2010-12-06 19:36:57 UTC (rev 3736)
+++ trunk/src/org/globus/swift/data/policy/Broadcast.java 2010-12-06 21:22:28 UTC (rev 3737)
@@ -4,10 +4,14 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+
+import org.apache.log4j.Logger;
import org.globus.swift.data.Director;
public class Broadcast extends Policy {
+ static Logger logger = Logger.getLogger(Broadcast.class);
+
String destination = null;
@Override
@@ -25,7 +29,7 @@
*/
public static void perform(Map<String,List<String>> batch) {
String[] line = commandLine(batch);
- System.out.println("Broadcast.perform(): " + Arrays.toString(line));
+ logger.debug("arguments: " + Arrays.toString(line));
Process process = null;
try {
process = Runtime.getRuntime().exec(line);
@@ -33,7 +37,8 @@
}
catch (Exception e) {
e.printStackTrace();
- throw new RuntimeException("Could not launch external broadcast");
+ throw new RuntimeException
+ ("Could not launch external broadcast", e);
}
int code = process.exitValue();
if (code != 0)
@@ -47,6 +52,11 @@
String home = System.getProperties().getProperty("swift.home");
List<String> line = new ArrayList<String>();
line.add(home+"/libexec/cdm_broadcast.sh");
+ line.add(Director.broadcastMode);
+ if (logger.isDebugEnabled())
+ line.add(Director.logfile);
+ else
+ line.add("/dev/null");
for (Map.Entry<String,List<String>> entry : batch.entrySet()) {
line.add("-l");
String location = entry.getKey();
@@ -54,7 +64,7 @@
line.add(location);
for (String file : files) {
line.add(file);
- line.add(getDestination(file)+"/"+file);
+ line.add(getDestination(file));
}
}
String[] result = new String[line.size()];
Modified: trunk/src/org/griphyn/vdl/karajan/Loader.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/Loader.java 2010-12-06 19:36:57 UTC (rev 3736)
+++ trunk/src/org/griphyn/vdl/karajan/Loader.java 2010-12-06 21:22:28 UTC (rev 3737)
@@ -35,7 +35,6 @@
import org.globus.cog.karajan.workflow.PrintStreamChannel;
import org.globus.cog.karajan.workflow.nodes.FlowElement;
import org.globus.cog.karajan.workflow.nodes.grid.AbstractGridNode;
-import org.globus.cog.karajan.workflow.service.channels.AbstractKarajanChannel;
import org.globus.cog.util.ArgumentParser;
import org.globus.cog.util.ArgumentParserException;
import org.globus.swift.data.Director;
@@ -93,9 +92,6 @@
if (ap.isPresent(ARG_MONITOR)) {
new Monitor().start();
}
- if (ap.isPresent(ARG_CDMFILE)) {
- loadCDM(ap);
- }
if (!ap.hasValue(ArgumentParser.DEFAULT)) {
error("No SwiftScript program specified");
}
@@ -130,6 +126,10 @@
setupLogging(ap, projectName, runID);
logger.debug("Max heap: " + Runtime.getRuntime().maxMemory());
+ if (ap.isPresent(ARG_CDMFILE)) {
+ loadCDM(ap);
+ }
+
if (!(new File(project).exists())) {
logger.error("Input file " + project + " does not exist.");
System.exit(4);
@@ -454,6 +454,9 @@
logfile = projectName + "-" + runID + ".log";
}
+ VDL2Config config = VDL2Config.getConfig();
+ config.put("logfile", logfile);
+
File f = new File(logfile);
FileAppender fa = (FileAppender) getAppender(FileAppender.class);
Modified: trunk/src/org/griphyn/vdl/util/VDL2Config.java
===================================================================
--- trunk/src/org/griphyn/vdl/util/VDL2Config.java 2010-12-06 19:36:57 UTC (rev 3736)
+++ trunk/src/org/griphyn/vdl/util/VDL2Config.java 2010-12-06 21:22:28 UTC (rev 3737)
@@ -95,6 +95,8 @@
put("status.mode", "files");
put("wrapper.parameter.mode", "args");
put("wrapper.invocation.mode", "absolute");
+
+ put("cdm.broadcast.mode", "file");
}
private VDL2Config(VDL2Config other) {
More information about the Swift-commit mailing list