[Swift-commit] cog r3853
swift at ci.uchicago.edu
swift at ci.uchicago.edu
Sat Dec 28 02:00:05 CST 2013
------------------------------------------------------------------------
r3853 | hategan | 2013-12-28 01:56:10 -0600 (Sat, 28 Dec 2013) | 1 line
make the soft image use normal staging mechanisms (unless sfs:// is used)
------------------------------------------------------------------------
Index: modules/provider-coaster/README.softimage
===================================================================
--- modules/provider-coaster/README.softimage (revision 0)
+++ modules/provider-coaster/README.softimage (revision 3853)
@@ -0,0 +1,50 @@
+This is the high-level overview/design of the locking used for
+managing soft images. Here are some of the issues:
+
+ - Waiting or time-consuming operations can only happen in runjob(),
+ since that runs in a sub-process
+ - Clean presents a problem this way. However if another process starts
+ unpacking a new image between the moment the decision to clean was made
+ and when things are actually cleaned, it can lead to undesired results
+ - All things not happening inside runjob() are locally thread-safe.
+ - If anybody can do it in an easier way, please do
+
+
+first = true
+lead = false
+
+submitjob() {
+ if (first) {
+ with lockw(main) {
+ if (trylockw(use)) {
+ unlockw(use)
+ lead = true
+ lockw(create)
+ }
+ lockr(use)
+ }
+ first = false
+ }
+}
+
+runjob() {
+ if (lead) {
+ init()
+ unlockw(create)
+ }
+
+ lockr(create)
+ runjob() (the actual job execution; not a recursive call)
+ unlockr(create)
+}
+
+exit() {
+ with lockw(main) {
+ unlockr(use)
+ if (trylockw(use)) {
+ doClean = 1
+ clean()
+ unlockw(use)
+ }
+ }
+}
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/SubmitJobHandler.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/SubmitJobHandler.java (revision 3852)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/SubmitJobHandler.java (working copy)
@@ -103,7 +103,16 @@
}
while ((s = helper.read("attr")) != null) {
- spec.setAttribute(getKey(s), getValue(s));
+ String key = getKey(s);
+ String value = getValue(s);
+
+ if ("softimage".equals(key)) {
+ String[] sd = value.split("\\s+");
+ spec.setAttribute(key, makeAbsolute(sd[0]) + " " + sd[1]);
+ }
+ else {
+ spec.setAttribute(key, value);
+ }
}
StagingSet ss = null;
Index: modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/SubmitJobCommand.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/SubmitJobCommand.java (revision 3852)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/SubmitJobCommand.java (working copy)
@@ -10,7 +10,6 @@
package org.globus.cog.abstraction.impl.execution.coaster;
import java.io.ByteArrayOutputStream;
-import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.Charset;
@@ -26,7 +25,6 @@
import org.globus.cog.abstraction.interfaces.ExecutionService;
import org.globus.cog.abstraction.interfaces.FileLocation;
import org.globus.cog.abstraction.interfaces.JobSpecification;
-import org.globus.cog.abstraction.interfaces.RemoteFile;
import org.globus.cog.abstraction.interfaces.Service;
import org.globus.cog.abstraction.interfaces.StagingSetEntry;
import org.globus.cog.abstraction.interfaces.StagingSetEntry.Mode;
@@ -51,7 +49,7 @@
}
public static final Set<String> ABSOLUTIZE = new HashSet<String>() {
- {add("sfs");}
+ {}
};
private Task t;
@@ -113,31 +111,33 @@
if (simple) {
add(sb, "attr", "maxwalltime=" + formatWalltime(spec.getAttribute("maxwalltime")));
- if (spec.getAttribute("tracePerformance") != null) {
- add(sb, "attr", "tracePerformance=" + spec.getAttribute("tracePerformance"));
+ if (spec.getAttribute("traceperformance") != null) {
+ add(sb, "attr", "traceperformance=" + spec.getAttribute("traceperformance"));
}
- if (spec.getAttribute("softImage") != null) {
- add(sb, "attr", "softImage=" + spec.getAttribute("softImage"));
+ if (spec.getAttribute("softimage") != null) {
+ String value = (String) spec.getAttribute("softimage");
+ String[] sd = value.split("\\s+");
+ add(sb, "attr", "softimage=" + sd[0] + " " + sd[1]);
}
}
else {
for (String name : spec.getAttributeNames())
- if (!IGNORED_ATTRIBUTES.contains(name) ||
- spec.isBatchJob())
+ if (!IGNORED_ATTRIBUTES.contains(name) || spec.isBatchJob()) {
add(sb, "attr",
name + "=" + spec.getAttribute(name));
+ }
}
if (spec.getStageIn() != null) {
for (StagingSetEntry e : spec.getStageIn())
- add(sb, "stagein", absolutize(e.getSource()) + '\n' +
+ add(sb, "stagein", e.getSource() + '\n' +
e.getDestination() + '\n' + Mode.getId(e.getMode()));
}
if (spec.getStageOut() != null) {
for (StagingSetEntry e : spec.getStageOut())
add(sb, "stageout", e.getSource() + '\n' +
- absolutize(e.getDestination()) + '\n' + Mode.getId(e.getMode()));
+ e.getDestination() + '\n' + Mode.getId(e.getMode()));
}
if (spec.getCleanUpSet() != null)
@@ -182,22 +182,6 @@
}
}
- private String absolutize(String file) throws IOException {
- try {
- RemoteFile u = new RemoteFile(file);
- if (ABSOLUTIZE.contains(u.getProtocol())) {
- return u.getProtocol() + "://" + u.getHost() +
- (u.getPort() != -1 ? ":" + u.getPort() : "") + "/" + new File(u.getPath()).getAbsolutePath();
- }
- else {
- return file;
- }
- }
- catch (Exception e) {
- throw new IOException("Invalid file specification: " + file);
- }
- }
-
private void add(StringBuilder sb, String key, boolean value) throws IOException {
add(sb, key, String.valueOf(value));
}
Index: modules/provider-coaster/resources/worker.pl
===================================================================
--- modules/provider-coaster/resources/worker.pl (revision 3852)
+++ modules/provider-coaster/resources/worker.pl (working copy)
@@ -134,8 +134,20 @@
my $LAST_JOB_CHECK_TIME = 0;
my $JOB_COUNT = 0;
-my $SOFT_IMAGE_DST;
+use constant {
+ LOCK_SH => 1,
+ LOCK_EX => 2,
+ LOCK_NB => 4,
+ LOCK_UN => 8,
+};
+my $SOFT_IMAGE_MAIN_LOCK;
+my $SOFT_IMAGE_CREATE_LOCK;
+my $SOFT_IMAGE_USE_LOCK;
+my $SOFT_IMAGE_DIR;
+my $SOFT_IMAGE_LEAD_PROCESS = 0;
+my $SOFT_IMAGE_FIRST_IN_PROCESS = 1;
+
use constant BUFSZ => 2048;
use constant IOBUFSZ => 32768;
use constant IOBLOCKSZ => 8;
@@ -1090,8 +1102,9 @@
sub dieNicely {
my ($msg) = @_;
- cleanSoftImage();
wlog ERROR, "$msg\n";
+ wlog DEBUG, "dieNicely called\n";
+ cleanSoftImage();
if ($CONNECTED) {
$CONNECTED = 0; # avoid recursive calls to this method
queueCmd((nullCB(), "RLOG", "WARN", $msg));
@@ -1113,8 +1126,7 @@
my $pid = shift(@PROFILE_EVENTS);
my $timestamp = shift(@PROFILE_EVENTS);
my $pidnum = ( $pid =~ /\d+/ ) ? $pid : 0;
- wlog(INFO, sprintf("PROFILE: %-5s %6d %.3f\n",
- $event, $pidnum, $timestamp));
+ wlog(INFO, sprintf("PROFILE: %-5s %6d %.3f\n", $event, $pidnum, $timestamp));
}
}
}
@@ -1794,121 +1806,124 @@
return substr($fn, 0, 1) eq "/";
}
-sub readUInt32 {
- my ($fd) = @_;
- seek($fd, 0, 0) or wlog ERROR, "Seek failed: $!\n";
- my $n;
- read($fd, $n, 4) or wlog ERROR, "Read failed: $!\n";
- return unpack("V", $n);
-}
+sub unpackSoftImage {
+ my ($src) = @_;
-sub writeUInt32 {
- my ($fd, $n) = @_;
-
- seek($fd, 0, 0) or wlog ERROR, "Seek failed: $!\n";
- if (!(print {$fd} pack("V", $n))) {
- wlog ERROR, "Write failed: $!\n";
+ wlog DEBUG, "Running tar -xzf $src -C $SOFT_IMAGE_DIR\n";
+ my $out;
+ $out = qx/tar -xzf $src -C $SOFT_IMAGE_DIR 2>&1/;
+ if ($? != 0) {
+ die "Cannot create soft image: $!\n$out";
}
- return $n;
+ if (-x "$SOFT_IMAGE_DIR/start") {
+ wlog DEBUG, "Running $SOFT_IMAGE_DIR/start\n";
+ $out = qx/$SOFT_IMAGE_DIR\/start 2>&1/;
+ if ($? != 0) {
+ die "Error running soft image startup: $!\n$out";
+ }
+ if (dirname($src) eq $SOFT_IMAGE_DIR) {
+ wlog DEBUG, "Image was staged in. Removing package.\n";
+ unlink($src);
+ }
+ else {
+ wlog DEBUG, "Image was NOT staged in\n";
+ }
+ }
}
-# Ensures that each job has the $src tar.gz file unpacked in
-# the directory pointed to by $dst. In addition, ensure that
-# the unpacking is only done if necessary so that it can be
-# shared between: 1. workers running concurrently on this node
-# (if $dst points to a local disk) and 2. subsequent jobs
-# running in this worker.
-#
-# If the image contains commonly used binaries and libraries,
-# it can be combined with a $PATH and $LD_LIBRARY_PATH settings
-# to minimize access to network file systems that would otherwise
-# be the sources for those binaries/libraries.
-sub prepareSoftImage {
- my ($src, $dst) = @_;
- my $lock;
- my $counter;
- mkpath($dst);
- if (!open($lock, ">>$dst/.lock")) {
- die "Cannot open lock file: $!";
+sub acquireSoftImageLock {
+ if (!$SOFT_IMAGE_FIRST_IN_PROCESS) {
+ wlog DEBUG, "Not first in process\n";
+ return 0;
}
- # start critical section
- if (!flock($lock, 2)) { # 2 - exclusive lock
- die "Cannot get exclusive lock on soft image directory: $!";
+ $SOFT_IMAGE_FIRST_IN_PROCESS = 0;
+ createLocks();
+ wlog DEBUG, "SOFT_IMAGE_MAIN_LOCK: $SOFT_IMAGE_MAIN_LOCK\n";
+ writeLock($SOFT_IMAGE_MAIN_LOCK);
+ wlog DEBUG, "First in process\n";
+ if (tryWriteLock($SOFT_IMAGE_USE_LOCK)) {
+ wlog DEBUG, "First process\n";
+ unlock($SOFT_IMAGE_USE_LOCK);
+ # nobody using this yet
+ $SOFT_IMAGE_LEAD_PROCESS = 1;
+ writeLock($SOFT_IMAGE_CREATE_LOCK);
+ readLock($SOFT_IMAGE_USE_LOCK);
+ return 1;
}
-
- if (! -f "$dst/.count") {
- open($counter, "+>$dst/.count");
- }
else {
- open($counter, "+<$dst/.count");
+ wlog DEBUG, "Not first process\n";
+ readLock($SOFT_IMAGE_USE_LOCK);
+ return 0;
}
+}
+
+sub writeLock {
+ my ($lock) = @_;
- seek($counter, 0, 2);
- my $pos = tell($counter);
- wlog DEBUG, "Counter file pos: $pos\n";
- if ($pos == 0) {
- wlog INFO, "Lead process. Uncompressing image $src to $dst\n";
- wlog DEBUG, "Running tar -xzf $src -C $dst\n";
- my $out;
- $out = qx/tar -xzf $src -C $dst 2>&1/;
- if ($? != 0) {
- die "Cannot create soft image: $!\n$out";
- }
- $ENV{SOFTIMAGE} = $dst;
- if (-x "$dst/start") {
- wlog DEBUG, "Running $dst/start\n";
- $out = qx/$dst\/start 2>&1/;
- if ($? != 0) {
- die "Error running soft image startup: $!\n$out";
- }
- }
-
- writeUInt32($counter, 1);
- wlog DEBUG, "Soft image use count updated: 1\n";
- wlog DEBUG, "Soft image initialized\n";
+ wlog DEBUG, "writeLock($lock)\n";
+ flock($lock, LOCK_EX);
+}
+
+sub readLock { # Take a shared (read) lock on the given lock-file handle.
+ my ($lock) = @_;
+
+ flock($lock, LOCK_SH); # no LOCK_NB flag, so this waits until the shared lock is granted
+}
+
+sub unlock { # Release whatever lock is held through the given lock-file handle.
+ my ($lock) = @_;
+
+ flock($lock, LOCK_UN); # the same call releases both shared and exclusive locks
+}
+
+sub tryWriteLock { # Try to take an exclusive (write) lock on the handle without blocking.
+ my ($lock) = @_;
+
+ return flock($lock, LOCK_EX | LOCK_NB); # true if acquired, false if held elsewhere; must be LOCK_EX (LOCK_UN would only release and always succeed)
+}
+
+
+
+sub createLocks {
+ mkpath($SOFT_IMAGE_DIR);
+ if (!open($SOFT_IMAGE_MAIN_LOCK, ">>$SOFT_IMAGE_DIR/.main")) {
+ dieNicely("Cannot open lock file: $!");
}
- else {
- wlog DEBUG, "Not lead process\n";
- if (! -f "$dst/.l$BLOCKID") {
- my $n = writeUInt32($counter, readUInt32($counter) + 1);
- wlog DEBUG, "Soft image use count updated: $n\n";
- my $rl;
- open($rl, ">$dst/.l$BLOCKID");
- close($rl);
- }
+ if (!open($SOFT_IMAGE_CREATE_LOCK, ">>$SOFT_IMAGE_DIR/.create")) {
+ dieNicely("Cannot open lock file: $!");
}
- close($counter);
- close($lock);
- # end critical section
+ if (!open($SOFT_IMAGE_USE_LOCK, ">>$SOFT_IMAGE_DIR/.use")) {
+ dieNicely("Cannot open lock file: $!");
+ }
}
-sub cleanSoftImage() {
+sub cleanSoftImage {
my $lock;
my $counter;
- if (!defined $SOFT_IMAGE_DST) {
+ if (!defined $SOFT_IMAGE_DIR) {
return;
}
- open($lock, ">>$SOFT_IMAGE_DST/.lock");
- if (!flock($lock, 2)) {
- dieNicely("Cannot get exclusive lock on soft image directory: $!");
- }
- open($counter, "+<$SOFT_IMAGE_DST/.count");
- if (writeUInt32($counter, readUInt32($counter) - 1) == 0) {
- wlog INFO, "Tail process. Removing image from $SOFT_IMAGE_DST\n";
+ writeLock($SOFT_IMAGE_MAIN_LOCK);
- if (-x "$SOFT_IMAGE_DST/stop") {
- my $out = qx/$SOFT_IMAGE_DST\/stop 2>&1/;
- if ($? != 0) {
- die "Error running soft image shutdown: $!\n$out";
+ unlock($SOFT_IMAGE_USE_LOCK);
+ if (tryWriteLock($SOFT_IMAGE_USE_LOCK)) {
+ wlog INFO, "Tail process. Removing image from $SOFT_IMAGE_DIR\n";
+
+ if (-x "$SOFT_IMAGE_DIR/stop") {
+ my $out = qx/$SOFT_IMAGE_DIR\/stop 2>&1/;
+ if ($? != 0) {
+ die "Error running soft image shutdown: $!\n$out";
+ }
}
+
+ rmtree($SOFT_IMAGE_DIR, 0, 0);
+
+ unlock($SOFT_IMAGE_USE_LOCK);
}
- rmtree($SOFT_IMAGE_DST, 0, 0);
- }
- close($counter);
- close($lock);
+ unlock($SOFT_IMAGE_MAIN_LOCK);
}
sub submitjob {
@@ -1954,10 +1969,30 @@
# as long as the job is alive, enable debugging
$LOGLEVEL = DEBUG;
}
- elsif ($ap[0] eq "softImage") {
- $SOFTIMAGE = $ap[1];
- my @SS = split(/ /, $SOFTIMAGE);
- $SOFT_IMAGE_DST = $SS[1];
+ elsif ($ap[0] eq "softimage") {
+ my @SS = split(/ /, $ap[1]);
+ if (!defined $SOFT_IMAGE_DIR) {
+ $SOFT_IMAGE_DIR = $SS[1];
+ }
+ if (acquireSoftImageLock()) {
+ my ($proto, $host, $path) = urisplit($SS[0]);
+ if ($proto eq "sfs") {
+ # don't stage in; unpack directly
+ $SOFTIMAGE = $path;
+ }
+ else {
+ # treat the soft image as a normal stage-in file
+ push @STAGEIN, $SS[0];
+ my $fn = basename($path);
+ my $dest = "$SOFT_IMAGE_DIR/$fn";
+ push @STAGEIND, $dest;
+ $SOFTIMAGE = $dest;
+ }
+ }
+ else {
+ # prevent job from trying to unpack the image
+ $SOFTIMAGE = "";
+ }
}
else {
wlog WARN, "Ignoring attribute $ap[0] = $ap[1]\n";
@@ -2238,11 +2273,17 @@
my $sout = $$JOB{"stdout"};
my $serr = $$JOB{"stderr"};
- my $softImage = $$JOBDATA{"softimage"};
- if (defined $softImage) {
- prepareSoftImage(split(/ /, $softImage));
+ if ($SOFT_IMAGE_LEAD_PROCESS) {
+ unpackSoftImage($$JOBDATA{"softimage"});
+ unlock($SOFT_IMAGE_CREATE_LOCK);
}
-
+ # wait until the soft image is created
+ readLock($SOFT_IMAGE_CREATE_LOCK);
+ # no need to hold lock after that
+ unlock($SOFT_IMAGE_CREATE_LOCK);
+
+ $ENV{SOFTIMAGE} = $SOFT_IMAGE_DIR;
+
my $cwd = getcwd();
# wlog DEBUG, "CWD: $cwd\n";
# wlog DEBUG, "Running $executable\n";
More information about the Swift-commit
mailing list