[Swift-commit] r3281 - in trunk: libexec src/org/globus/swift/data src/org/globus/swift/data/policy
noreply at svn.ci.uchicago.edu
noreply at svn.ci.uchicago.edu
Sun Apr 11 16:50:15 CDT 2010
Author: wozniak
Date: 2010-04-11 16:50:14 -0500 (Sun, 11 Apr 2010)
New Revision: 3281
Modified:
trunk/libexec/cdm_broadcast.sh
trunk/libexec/vdl-int.k
trunk/libexec/vdl-lib.xml
trunk/src/org/globus/swift/data/Action.java
trunk/src/org/globus/swift/data/Director.java
trunk/src/org/globus/swift/data/Query.java
trunk/src/org/globus/swift/data/policy/AllocationHook.java
trunk/src/org/globus/swift/data/policy/Broadcast.java
Log:
Connect CDM BROADCAST functionality to Coasters notification.
Modified: trunk/libexec/cdm_broadcast.sh
===================================================================
--- trunk/libexec/cdm_broadcast.sh 2010-04-11 14:38:29 UTC (rev 3280)
+++ trunk/libexec/cdm_broadcast.sh 2010-04-11 21:50:14 UTC (rev 3281)
@@ -3,29 +3,80 @@
SWIFT_HOME=$( dirname $( dirname $0 ) )
LOG=${SWIFT_HOME}/etc/cdm_broadcast.log
+# For each given location, broadcast the given files to it
+# Input: bgp_broadcast [-l <location> <file>*]*
bgp_broadcast()
{
- DIR=$1
- FILE=$2
- DEST=$3
- if [[ ! -f ips.list ]]
+ while [[ ${*} != "" ]]
+ do
+ L=$1 # -l
+ shift
+ ARGS=$1 # Location
+ shift
+ while true
+ do
+ if [[ $1 == "-l" || $1 == "" ]]
+ then
+ break
+ fi
+ ARGS="${ARGS} $1"
+ shift
+ done
+ bgp_broadcast_perform ${ARGS}
+ done
+}
+
+# Broadcast the given files to the given location
+# Input: bgp_broadcast_perform <location> <file>*
+bgp_broadcast_perform()
+{
+ LOCATION=$1
+ shift
+ WORK=( ${*} )
+
+ IP=$( listip ${LOCATION} )
+
+ if [[ ${#WORK[@]} > 3 ]]
then
- BLOCKS=$( qstat -u ${USER} | grep ${USER} | awk '{ print $6 }' )
- IPS=$( listip ${BLOCKS} )
- for IP in ${IPS}
+ SCRIPT=$( mktemp )
+ {
+ echo "#!/bin/sh"
+ while [[ ${*} != "" ]]
+ do
+ FILE=$1
+ DEST=$2
+ shift 2
+ echo "/bin.rd/f2cn ${FILE} ${DEST}"
+ done
+ } > ${SCRIPT}
+ scp ${SCRIPT} ${IP}:${SCRIPT}
+ ssh ${IP} ${SCRIPT}
+ else
+ while [[ ${*} != "" ]]
do
- echo ${IP}
- done >> ip.list
- else
- while read T
- do
- BLOCKS="$BLOCKS $T"
- done < ip.list
+ FILE=$1
+ DEST=$2
+ shift 2
+ ssh_until_success 120 ${IP} /bin.rd/f2cn ${FILE} ${DEST}
+ done
fi
- for IP in ${BLOCKS}
+}
+
+# Repeat command N times until success
+ssh_until_success()
+{
+ N=$1
+ shift
+ for (( i=0 ; i < N ; i++ ))
do
- ssh ${IP} /bin.rd/f2cn ${DIR}/${FILE} ${DEST}/${FILE}
+ ssh -o PasswordAuthentication=no ${*}
+ if [[ $? == 0 ]];
+ then
+ break
+ fi
+ sleep 1
done
+ return 0
}
local_broadcast()
@@ -36,19 +87,15 @@
cp -v ${FILE} ${DEST}/${FILE}
}
+set -x
{
- declare -p PWD
- set -x
-
- FILE=$1
- DIR=$2
- DEST=$3
-
- if [[ $( uname -p ) == "ppc64" ]]
- then
- bgp_broadcast ${DIR} ${FILE} ${DEST}
- else
- bgp_local ${DIR} ${FILE} ${DEST}
- fi
-
-} >> ${LOG} 2>&1
+ declare -p PWD LOG
+
+ if [[ $( uname -p ) == "ppc64" ]]
+ then
+ bgp_broadcast ${*}
+ else
+ bgp_local ${*}
+ fi
+
+} >> /tmp/cdm_broadcast.log 2>&1 # ${LOG} 2>&1
Modified: trunk/libexec/vdl-int.k
===================================================================
--- trunk/libexec/vdl-int.k 2010-04-11 14:38:29 UTC (rev 3280)
+++ trunk/libexec/vdl-int.k 2010-04-11 21:50:14 UTC (rev 3281)
@@ -267,8 +267,6 @@
log(LOG:INFO, "START jobid={jobid} - Staging in files")
cdmfile := cdm:file()
- log(LOG:DEBUG, "cdmfile: {cdmfile}")
- log(LOG:DEBUG, "swift.home: {swift.home}")
libexec := "{swift.home}/libexec"
if (cdmfile != "" then(
@@ -292,7 +290,7 @@
size := file:size("{srcdir}/{filename}", host=srchost, provider=provider)
policy := cdm:query(query=file)
- log(LOG:DEBUG, "policy: {file} : {policy}")
+ log(LOG:DEBUG, "CDM: {file} : {policy}")
doStageinFile(provider=provider, srchost=srchost, srcfile=filename,
srcdir=srcdir, desthost=host, destdir=destdir, size=size, policy=policy)
@@ -304,8 +302,10 @@
vdl:cacheAddAndLockFile(srcfile, destdir, desthost, size
cleanupFiles(cacheFilesToRemove, desthost)
- log(LOG:DEBUG, "FILE_STAGE_IN_START file={srcfile} srchost={srchost} srcdir={srcdir} srcname={srcfile} ",
- "desthost={desthost} destdir={destdir} provider={provider}")
+ log(LOG:DEBUG, "FILE_STAGE_IN_START file={srcfile} ",
+ "srchost={srchost} srcdir={srcdir} srcname={srcfile} ",
+ "desthost={desthost} destdir={destdir} provider={provider} ",
+ "policy={policy}")
if (policy == "DEFAULT" then(
restartOnError(".*", 2
task:transfer(srcprovider=provider, srchost=srchost, srcfile=srcfile,
@@ -314,9 +314,12 @@
log(LOG:DEBUG, "FILE_STAGE_IN_BROADCAST file={srcfile} policy={policy}")
cdm:broadcast(srcfile=srcfile, srcdir=srcdir))
else(log(LOG:DEBUG, "FILE_STAGE_IN_SKIP file={srcfile} policy={policy}")))
- log(LOG:DEBUG, "FILE_STAGE_IN_END file={srcfile} srchost={srchost} srcdir={srcdir} srcname={srcfile} ",
- "desthost={desthost} destdir={destdir} provider={provider}")
- )
+ log(LOG:DEBUG, "FILE_STAGE_IN_END file={srcfile} ",
+ "srchost={srchost} srcdir={srcdir} srcname={srcfile} ",
+ "desthost={desthost} destdir={destdir} provider={provider}")
+ )
+ cdm:wait()
+ echo("doStageinFile: complete: {srcfile}")
)
element(doStageout, [jobid, stageouts, dir, host]
Modified: trunk/libexec/vdl-lib.xml
===================================================================
--- trunk/libexec/vdl-lib.xml 2010-04-11 14:38:29 UTC (rev 3280)
+++ trunk/libexec/vdl-lib.xml 2010-04-11 21:50:14 UTC (rev 3281)
@@ -106,6 +106,7 @@
<export name="get"><elementDef classname="org.globus.swift.data.Query"/></export>
<export name="file"><elementDef classname="org.globus.swift.data.Query"/></export>
<export name="broadcast"><elementDef classname="org.globus.swift.data.Action"/></export>
+ <export name="wait"><elementDef classname="org.globus.swift.data.Action"/></export>
</namespace>
</karajan>
Modified: trunk/src/org/globus/swift/data/Action.java
===================================================================
--- trunk/src/org/globus/swift/data/Action.java 2010-04-11 14:38:29 UTC (rev 3280)
+++ trunk/src/org/globus/swift/data/Action.java 2010-04-11 21:50:14 UTC (rev 3281)
@@ -9,6 +9,9 @@
import org.globus.swift.data.policy.Broadcast;
import org.globus.swift.data.policy.Policy;
+/**
+ * Karajan-accessible CDM functions that change something.
+ * */
public class Action extends FunctionsCollection {
public static final Arg PA_FILE = new Arg.Positional("srcfile");
@@ -16,23 +19,39 @@
static {
setArguments("cdm_broadcast", new Arg[]{ PA_FILE, PA_DIR });
+ setArguments("cdm_wait", new Arg[]{});
}
+ /**
+ Register a file for broadcast by CDM.
+ The actual broadcast is triggered by {@link cdm_wait}.
+ */
public void cdm_broadcast(VariableStack stack) throws ExecutionException {
String srcfile = (String) PA_FILE.getValue(stack);
String srcdir = (String) PA_DIR.getValue(stack);
+
+ System.out.println("cdm_broadcast()");
Policy policy = Director.lookup(srcfile);
if (!(policy instanceof Broadcast)) {
- throw new RuntimeException("Attempting to BROADCAST the wrong file");
+ throw new RuntimeException
+ ("Attempting to BROADCAST the wrong file: directory: `" +
+ srcdir + "' `" + srcfile + "' -> " + policy);
}
if (srcdir == "") {
srcdir = ".";
}
-
- Broadcast broadcast = (Broadcast) policy;
- broadcast.action(srcfile, srcdir);
+
+ Director.addBroadcast(srcdir, srcfile);
}
+
+ /**
+ Wait until CDM has ensured that all data has been propagated.
+ */
+ public void cdm_wait(VariableStack stack) throws ExecutionException {
+ System.out.println("cdm_wait()");
+ Director.doBroadcast();
+ }
}
Modified: trunk/src/org/globus/swift/data/Director.java
===================================================================
--- trunk/src/org/globus/swift/data/Director.java 2010-04-11 14:38:29 UTC (rev 3280)
+++ trunk/src/org/globus/swift/data/Director.java 2010-04-11 21:50:14 UTC (rev 3281)
@@ -3,19 +3,23 @@
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.log4j.Logger;
import org.globus.swift.data.policy.Policy;
+import org.globus.swift.data.policy.Broadcast;
import org.globus.swift.data.util.LineReader;
import org.griphyn.vdl.karajan.Loader;
@@ -28,6 +32,11 @@
private static final Logger logger = Logger.getLogger(Director.class);
/**
+ Has a CDM policy file been provided?
+ */
+ static boolean enabled = false;
+
+ /**
Save the location of the given CDM policy file
*/
static File policyFile;
@@ -43,12 +52,35 @@
static Map<String,String> properties = new HashMap();
/**
- Remember the files we have broadcasted
+ Remember the files we have broadcasted.
+ Map from allocations to filenames.
+ NOTE: must be accessed only using synchronized Director methods
*/
- static Set<String> broadcasted = new HashSet<String>();
+ private static Map<String,Set<String>> broadcasted =
+ new LinkedHashMap<String,Set<String>>();
+
+ /**
+ Set of files to be broadcasted.
+ NOTE: must be accessed only using synchronized Director methods
+ */
+ private static Set<String> broadcastWork = new LinkedHashSet<String>();
+ /**
+ Remember all known allocations
+ */
+ private static List<String> allocations = new ArrayList<String>();
+
+ public static boolean isEnabled()
+ {
+ return enabled;
+ }
+
+ /**
+ Read in the user-supplied CDM policy file.
+ */
public static void load(File policyFile) throws IOException {
- logger.info("loading: " + policyFile);
+ logger.info("CDM file: " + policyFile);
+ enabled = true;
Director.policyFile = policyFile;
LineReader lines = new LineReader();
List list = lines.read(policyFile);
@@ -67,7 +99,6 @@
else if (type.equals("property")) {
addProperty(tokens);
}
-
}
static void addRule(String[] tokens) {
@@ -91,38 +122,127 @@
}
return result.toString();
}
-
+
+ /**
+ Obtain the CDM policy for a given file.
+ */
public static Policy lookup(String file) {
+ logger.debug("Director.lookup(): map: " + map);
for (Pattern pattern : map.keySet()) {
- Matcher matcher = pattern.matcher(file);
- if (matcher.matches())
- return map.get(pattern);
+ Matcher matcher = pattern.matcher(file);
+ if (matcher.matches())
+ return map.get(pattern);
}
+
return Policy.DEFAULT;
}
-
+
+ /**
+ Obtain the value of a CDM property.
+ */
public static String property(String name) {
String result = properties.get(name);
if (result == null)
result = "UNSET";
return result;
}
+
+ /**
+ Add a file to the list of files to be broadcasted.
+ */
+ public static synchronized void addBroadcast(String srcdir, String srcfile) {
+ logger.debug("addBroadcast(): " + srcdir + " " + srcfile);
+ String path = srcdir+"/"+srcfile;
+ broadcastWork.add(path);
+ }
+
+ /**
+ Add a location to the list of allocations.
+ If the location is added twice, the second addition is considered to be an
+ empty allocation with no CDM state
+ */
+ public static synchronized void addAllocation(String allocation) {
+ logger.debug("addAllocation(): " + allocation);
+ allocations.add(allocation);
+ broadcasted.put(allocation, new HashSet<String>());
+ doBroadcast();
+ }
+ /**
+ Create a batch of broadcast work to do and send it to be performed.
+ */
+ public static synchronized void doBroadcast() {
+ logger.debug("doBroadcast: broadcasted: " + broadcasted);
+ // Map from locations to files
+ Map<String,List<String>> batch = getBroadcastBatch();
+ if (batch.size() == 0)
+ return;
+ logger.debug("doBroadcast(): batch: " + batch);
+ Broadcast.perform(batch);
+ markBroadcasts(batch);
+ logger.debug("marked: " + broadcasted);
+ }
+
+ /**
+ Obtain a map from allocations to files.
+ For each allocation, its corresponding files should be broadcasted to it.
+ Should only be called by {@link doBroadcast}.
+ */
+ private static Map<String,List<String>> getBroadcastBatch() {
+ logger.debug("getBroadcastBatch(): ");
+ Map<String,List<String>> batch = new LinkedHashMap<String,List<String>>();
+ for (String file : broadcastWork) {
+ logger.debug("file: " + file);
+ logger.debug("allocations: " + allocations);
+ for (String allocation : allocations) {
+ Set<String> files = broadcasted.get(allocation);
+ logger.debug("files: " + files);
+ if (! files.contains(file)) {
+ logger.debug("adding: " + file + " to: " + allocation);
+ List<String> work = batch.get(allocation);
+ if (work == null) {
+ work = new ArrayList<String>();
+ batch.put(allocation, work);
+ }
+ work.add(file);
+ }
+ }
+ }
+ return batch;
+ }
+
+ /**
+ Mark that the files in the given batch have been sucessfully broadcasted.
+ Should only be called by {@link doBroadcast}.
+ */
+ private static void markBroadcasts(Map<String,List<String>> batch) {
+ logger.debug("markBroadcasts: batch: " + batch);
+ for (Map.Entry<String,List<String>> entry : batch.entrySet()) {
+ String location = entry.getKey();
+ logger.debug("markBroadcasts: location: " + location);
+ List<String> files = entry.getValue();
+ for (String file : files) {
+ Set<String> contents = broadcasted.get(location);
+ assert (! contents.contains(file));
+ logger.debug("markBroadcasts: add: " + file);
+ contents.add(file);
+ }
+ }
+ }
+
+ /*
public static boolean broadcasted(String file, String dir) {
return broadcasted.contains(dir+"/"+file);
}
+ */
- public static void broadcast(String file, String dir) {
- broadcasted.add(dir+"/"+file);
- }
-
/**
- * Check the policy effect of name with respect to policy_file
- * @param args {name, policy_file}
- */
+ Check the policy effect of name with respect to policy_file
+ @param args {name, policy_file}
+ */
public static void main(String[] args) {
if (args.length != 2) {
- System.out.println("Incorrect args");
+ logger.debug("Incorrect args");
System.exit(1);
}
@@ -131,12 +251,12 @@
String name = args[0];
File policyFile = new File(args[1]);
if (! policyFile.exists()) {
- System.out.println("Policy file does not exist: " +
+ logger.debug("Policy file does not exist: " +
args[1]);
}
load(policyFile);
Policy policy = lookup(name);
- System.out.println(name + ": " + policy);
+ logger.debug(name + ": " + policy);
} catch (Exception e) {
e.printStackTrace();
System.exit(2);
Modified: trunk/src/org/globus/swift/data/Query.java
===================================================================
--- trunk/src/org/globus/swift/data/Query.java 2010-04-11 14:38:29 UTC (rev 3280)
+++ trunk/src/org/globus/swift/data/Query.java 2010-04-11 21:50:14 UTC (rev 3281)
@@ -12,6 +12,9 @@
import org.globus.swift.data.policy.Policy;
+/**
+ Karajan-accessible read-queries to CDM functionality.
+*/
public class Query extends FunctionsCollection {
public static final Arg PA_QUERY = new Arg.Positional("query");
@@ -23,21 +26,29 @@
setArguments("cdm_file", new Arg[]{});
}
+ /**
+ Do CDM policy lookup based on the CDM file.
+ */
public String cdm_query(VariableStack stack) throws ExecutionException {
String file = (String) PA_QUERY.getValue(stack);
Policy policy = Director.lookup(file);
+ System.out.println("Director.lookup(): " + file + " -> " + policy);
return policy.toString();
}
/**
- Get a CDM property
+ Get a CDM property
*/
public String cdm_get(VariableStack stack) throws ExecutionException {
String name = (String) PA_NAME.getValue(stack);
String value = Director.property(name);
return value;
}
-
+
+ /**
+ Obtain the CDM policy file given on the command-line,
+ conventionally "fs.data". If not set, returns an empty String.
+ */
public String cdm_file(VariableStack stack) throws ExecutionException {
String file = "";
if (Director.policyFile != null)
Modified: trunk/src/org/globus/swift/data/policy/AllocationHook.java
===================================================================
--- trunk/src/org/globus/swift/data/policy/AllocationHook.java 2010-04-11 14:38:29 UTC (rev 3280)
+++ trunk/src/org/globus/swift/data/policy/AllocationHook.java 2010-04-11 21:50:14 UTC (rev 3281)
@@ -5,10 +5,22 @@
import org.globus.cog.abstraction.impl.common.StatusEvent;
import org.globus.cog.abstraction.coaster.service.job.manager.Hook;
+import org.globus.swift.data.Director;
+
+/**
+ * Re-apply CDM policies when we obtain a new allocation from Coasters.
+ * */
public class AllocationHook extends Hook
{
public void blockActive(StatusEvent e)
{
+ if (!Director.isEnabled())
+ return;
+
System.out.println("blockActive: " + e.getStatus().getMessage());
+ String msg = e.getStatus().getMessage();
+ String[] tokens = msg.split("=");
+ String allocation = tokens[1];
+ Director.addAllocation(allocation);
}
}
Modified: trunk/src/org/globus/swift/data/policy/Broadcast.java
===================================================================
--- trunk/src/org/globus/swift/data/policy/Broadcast.java 2010-04-11 14:38:29 UTC (rev 3280)
+++ trunk/src/org/globus/swift/data/policy/Broadcast.java 2010-04-11 21:50:14 UTC (rev 3281)
@@ -1,6 +1,11 @@
package org.globus.swift.data.policy;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import java.util.Map;
+import java.util.Map.Entry;
import org.globus.swift.data.Director;
@@ -17,32 +22,66 @@
throw new RuntimeException("Incorrect settings for BROADCAST");
}
}
-
- public void action(String srcfile, String srcdir) {
- if (! Director.broadcasted(srcfile, srcdir))
- callScript(srcfile, srcdir, destination);
- }
-
- void callScript(String srcfile, String srcdir, String destination) {
- String home = System.getProperties().getProperty("swift.home");
+
+ /**
+ Call the external script to perform the broadcast for this batch.
+ */
+ public static void perform(Map<String,List<String>> batch) {
+ String[] line = commandLine(batch);
+ System.out.println("Broadcast.perform(): " + Arrays.toString(line));
+ Process process = null;
try {
- String[] line = new String[4];
- line[0] = home+"/libexec/cdm_broadcast.sh";
- line[1] = srcfile;
- line[2] = srcdir;
- line[3] = destination;
- Process process = Runtime.getRuntime().exec(line);
+ process = Runtime.getRuntime().exec(line);
process.waitFor();
}
catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("Could not launch external broadcast");
}
- }
-
+ int code = process.exitValue();
+ if (code != 0)
+ throw new RuntimeException("External broadcast failed!");
+ }
+
+ /**
+ Generate the command line for the external broadcast script.
+ */
+ static String[] commandLine(Map<String,List<String>> batch) {
+ String home = System.getProperties().getProperty("swift.home");
+ List<String> line = new ArrayList<String>();
+ line.add(home+"/libexec/cdm_broadcast.sh");
+ for (Map.Entry<String,List<String>> entry : batch.entrySet()) {
+ line.add("-l");
+ String location = entry.getKey();
+ List<String> files = entry.getValue();
+ line.add(location);
+ for (String file : files) {
+ line.add(file);
+ line.add(getDestination(file)+"/"+file);
+ }
+ }
+ String[] result = new String[line.size()];
+ line.toArray(result);
+ return result;
+ }
+
+ /**
+ Return the remote destination directory for this policy.
+ */
public String getDestination() {
return destination;
}
+
+ /**
+ Return the remote destination directory for this broadcasted file.
+ */
+ public static String getDestination(String file) {
+ String result = null;
+ Policy policy = Director.lookup(file);
+ Broadcast broadcast = (Broadcast) policy;
+ result = broadcast.getDestination();
+ return result;
+ }
public String toString() {
return "BROADCAST";
More information about the Swift-commit mailing list