[Swift-commit] cog r3853

swift at ci.uchicago.edu swift at ci.uchicago.edu
Sat Dec 28 02:00:05 CST 2013


------------------------------------------------------------------------
r3853 | hategan | 2013-12-28 01:56:10 -0600 (Sat, 28 Dec 2013) | 1 line

make the soft image use normal staging mechanisms (unless sfs:// is used)
------------------------------------------------------------------------
Index: modules/provider-coaster/README.softimage
===================================================================
--- modules/provider-coaster/README.softimage	(revision 0)
+++ modules/provider-coaster/README.softimage	(revision 3853)
@@ -0,0 +1,50 @@
+This is the high-level overview/design of the locking used for
+managing soft images. Here are some of the issues:
+
+ - Waiting or time-consuming operations can only happen in runjob(),
+   since that runs in a sub-process.
+ - Cleaning is a problem under this constraint: if another process starts
+   unpacking a new image between the moment the decision to clean is made
+   and the moment files are actually removed, it can lead to undesired results.
+ - Everything that does not happen inside runjob() is locally thread-safe.
+ - If anybody can find an easier way to do this, please do.
+   
+
+first = true
+lead = false
+
+submitjob() {
+	if (first) {
+		with lockw(main) {
+			if (trylockw(use)) {
+				unlockw(use)
+				lead = true
+				lockw(create)
+			}
+			lockr(use)
+		}
+		first = false
+	}
+}
+
+runjob() {
+	if (lead) {
+		init()
+		unlockw(create)
+	}
+
+	lockr(create)
+	runjob()
+	unlockr(create)
+}
+
+exit() {
+	with lockw(main) {
+		unlockr(use)
+		if (trylockw(use)) {
+			doClean = 1
+			clean()
+			unlockw(use)
+		}
+	}
+}
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/SubmitJobHandler.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/SubmitJobHandler.java	(revision 3852)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/SubmitJobHandler.java	(working copy)
@@ -103,7 +103,16 @@
         }
 
         while ((s = helper.read("attr")) != null) {
-            spec.setAttribute(getKey(s), getValue(s));
+            String key = getKey(s);
+            String value = getValue(s);
+            
+            if ("softimage".equals(key)) {
+                String[] sd = value.split("\\s+");
+                spec.setAttribute(key, makeAbsolute(sd[0]) + " " + sd[1]);
+            }
+            else {
+                spec.setAttribute(key, value);
+            }
         }
 
         StagingSet ss = null;
Index: modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/SubmitJobCommand.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/SubmitJobCommand.java	(revision 3852)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/SubmitJobCommand.java	(working copy)
@@ -10,7 +10,6 @@
 package org.globus.cog.abstraction.impl.execution.coaster;
 
 import java.io.ByteArrayOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.net.URL;
 import java.nio.charset.Charset;
@@ -26,7 +25,6 @@
 import org.globus.cog.abstraction.interfaces.ExecutionService;
 import org.globus.cog.abstraction.interfaces.FileLocation;
 import org.globus.cog.abstraction.interfaces.JobSpecification;
-import org.globus.cog.abstraction.interfaces.RemoteFile;
 import org.globus.cog.abstraction.interfaces.Service;
 import org.globus.cog.abstraction.interfaces.StagingSetEntry;
 import org.globus.cog.abstraction.interfaces.StagingSetEntry.Mode;
@@ -51,7 +49,7 @@
     }
     
     public static final Set<String> ABSOLUTIZE = new HashSet<String>() {
-        {add("sfs");}
+        {}
     };
 
     private Task t;
@@ -113,31 +111,33 @@
     
         if (simple) {
         	add(sb, "attr", "maxwalltime=" + formatWalltime(spec.getAttribute("maxwalltime")));
-        	if (spec.getAttribute("tracePerformance") != null) {
-        	    add(sb, "attr", "tracePerformance=" + spec.getAttribute("tracePerformance"));
+        	if (spec.getAttribute("traceperformance") != null) {
+        	    add(sb, "attr", "traceperformance=" + spec.getAttribute("traceperformance"));
         	}
-        	if (spec.getAttribute("softImage") != null) {
-        	    add(sb, "attr", "softImage=" + spec.getAttribute("softImage"));
+        	if (spec.getAttribute("softimage") != null) {
+        	    String value = (String) spec.getAttribute("softimage");
+        	    String[] sd = value.split("\\s+");
+                add(sb, "attr", "softimage=" + sd[0] + " " + sd[1]);
         	}
         }
         else {
             for (String name : spec.getAttributeNames())
-                if (!IGNORED_ATTRIBUTES.contains(name) || 
-                        spec.isBatchJob())
+                if (!IGNORED_ATTRIBUTES.contains(name) || spec.isBatchJob()) {
                     add(sb, "attr", 
                         name + "=" + spec.getAttribute(name));
+                }
         }
             
         if (spec.getStageIn() != null) {
             for (StagingSetEntry e : spec.getStageIn())
-                add(sb, "stagein", absolutize(e.getSource()) + '\n' + 
+                add(sb, "stagein", e.getSource() + '\n' + 
                     e.getDestination() + '\n' + Mode.getId(e.getMode()));
         }
         
         if (spec.getStageOut() != null) {
             for (StagingSetEntry e : spec.getStageOut())
                 add(sb, "stageout", e.getSource() + '\n' + 
-                    absolutize(e.getDestination()) + '\n' + Mode.getId(e.getMode()));
+                    e.getDestination() + '\n' + Mode.getId(e.getMode()));
         }
 
         if (spec.getCleanUpSet() != null)
@@ -182,22 +182,6 @@
         }
     }
 
-    private String absolutize(String file) throws IOException {
-        try {
-            RemoteFile u = new RemoteFile(file);
-            if (ABSOLUTIZE.contains(u.getProtocol())) {
-                return u.getProtocol() + "://" + u.getHost() + 
-                    (u.getPort() != -1 ? ":" + u.getPort() : "") + "/" + new File(u.getPath()).getAbsolutePath(); 
-            }
-            else {
-                return file;
-            }
-        }
-        catch (Exception e) {
-            throw new IOException("Invalid file specification: " + file);
-        }
-    }
-
     private void add(StringBuilder sb, String key, boolean value) throws IOException {
         add(sb, key, String.valueOf(value));
     }
Index: modules/provider-coaster/resources/worker.pl
===================================================================
--- modules/provider-coaster/resources/worker.pl	(revision 3852)
+++ modules/provider-coaster/resources/worker.pl	(working copy)
@@ -134,8 +134,20 @@
 my $LAST_JOB_CHECK_TIME = 0;
 my $JOB_COUNT = 0;
 
-my $SOFT_IMAGE_DST;
+use constant {
+	LOCK_SH => 1,
+	LOCK_EX => 2,
+	LOCK_NB => 4,
+	LOCK_UN => 8,
+};
 
+my $SOFT_IMAGE_MAIN_LOCK;
+my $SOFT_IMAGE_CREATE_LOCK;
+my $SOFT_IMAGE_USE_LOCK;
+my $SOFT_IMAGE_DIR;
+my $SOFT_IMAGE_LEAD_PROCESS = 0;
+my $SOFT_IMAGE_FIRST_IN_PROCESS = 1;
+
 use constant BUFSZ => 2048;
 use constant IOBUFSZ => 32768;
 use constant IOBLOCKSZ => 8;
@@ -1090,8 +1102,9 @@
 sub dieNicely {
 	my ($msg) = @_;
 	
-	cleanSoftImage();	
 	wlog ERROR, "$msg\n";
+	wlog DEBUG, "dieNicely called\n";
+	cleanSoftImage();
 	if ($CONNECTED) {
 		$CONNECTED = 0; # avoid recursive calls to this method
 		queueCmd((nullCB(), "RLOG", "WARN", $msg));
@@ -1113,8 +1126,7 @@
 			my $pid       = shift(@PROFILE_EVENTS);
 			my $timestamp = shift(@PROFILE_EVENTS);
 			my $pidnum    = ( $pid =~ /\d+/ ) ? $pid : 0;
-			wlog(INFO, sprintf("PROFILE: %-5s %6d %.3f\n",
-                               $event, $pidnum, $timestamp));
+			wlog(INFO, sprintf("PROFILE: %-5s %6d %.3f\n", $event, $pidnum, $timestamp));
 		}
 	}
 }
@@ -1794,121 +1806,124 @@
 	return substr($fn, 0, 1) eq "/";
 }
 
-sub readUInt32 {
-	my ($fd) = @_;
-	seek($fd, 0, 0) or wlog ERROR, "Seek failed: $!\n";
-	my $n;
-	read($fd, $n, 4) or wlog ERROR, "Read failed: $!\n";
-	return unpack("V", $n);
-}
+sub unpackSoftImage {
+	my ($src) = @_;
 
-sub writeUInt32 {
-	my ($fd, $n) = @_;
-	
-	seek($fd, 0, 0) or wlog ERROR, "Seek failed: $!\n";
-	if (!(print {$fd} pack("V", $n))) {
-		wlog ERROR, "Write failed: $!\n";
+	wlog DEBUG, "Running tar -xzf $src -C $SOFT_IMAGE_DIR\n";
+	my $out;
+	$out = qx/tar -xzf $src -C $SOFT_IMAGE_DIR 2>&1/; 
+	if ($? != 0) {
+		die "Cannot create soft image: $!\n$out";
 	}
 	
-	return $n;
+	if (-x "$SOFT_IMAGE_DIR/start") {
+		wlog DEBUG, "Running $SOFT_IMAGE_DIR/start\n";
+		$out = qx/$SOFT_IMAGE_DIR\/start 2>&1/;
+		if ($? != 0) {
+			die "Error running soft image startup: $!\n$out";
+		}
+		if (dirname($src) eq $SOFT_IMAGE_DIR) {
+			wlog DEBUG, "Image was staged in. Removing package.\n";
+			unlink($src);
+		}
+		else {
+			wlog DEBUG, "Image was NOT staged in\n";
+		}
+	}		
 }
 
-# Ensures that each job has the $src tar.gz file unpacked in
-# the directory pointed to by $dst. In addition, ensure that
-# the unpacking is only done if necessary so that it can be
-# shared between: 1. workers running concurrently on this node
-# (if $dst points to a local disk) and 2. subsequent jobs
-# running in this worker.
-#
-# If the image contains commonly used binaries and libraries,
-# it can be combined with a $PATH and $LD_LIBRARY_PATH settings
-# to minimize access to network file systems that would otherwise
-# be the sources for those binaries/libraries.  
-sub prepareSoftImage {
-	my ($src, $dst) = @_;
-	my $lock;
-	my $counter;
-	mkpath($dst);
-	if (!open($lock, ">>$dst/.lock")) {
-		die "Cannot open lock file: $!";
+sub acquireSoftImageLock {
+	if (!$SOFT_IMAGE_FIRST_IN_PROCESS) {
+		wlog DEBUG, "Not first in process\n";
+		return 0;
 	}
-	# start critical section
-	if (!flock($lock, 2)) { # 2 - exclusive lock
-		die "Cannot get exclusive lock on soft image directory: $!"; 
+	$SOFT_IMAGE_FIRST_IN_PROCESS = 0;
+	createLocks();
+	wlog DEBUG, "SOFT_IMAGE_MAIN_LOCK: $SOFT_IMAGE_MAIN_LOCK\n";
+	writeLock($SOFT_IMAGE_MAIN_LOCK);
+	wlog DEBUG, "First in process\n";
+	if (tryWriteLock($SOFT_IMAGE_USE_LOCK)) {
+		wlog DEBUG, "First process\n";
+		unlock($SOFT_IMAGE_USE_LOCK);
+		# nobody using this yet
+		$SOFT_IMAGE_LEAD_PROCESS = 1;
+		writeLock($SOFT_IMAGE_CREATE_LOCK);
+		readLock($SOFT_IMAGE_USE_LOCK);
+		return 1;
 	}
-	
-	if (! -f "$dst/.count") {
-		open($counter, "+>$dst/.count");
-	}
 	else {
-		open($counter, "+<$dst/.count");
+		wlog DEBUG, "Not first process\n";
+		readLock($SOFT_IMAGE_USE_LOCK);
+		return 0;
 	}
+}
+
+sub writeLock {
+	my ($lock) = @_;
 	
-	seek($counter, 0, 2);
-	my $pos = tell($counter);
-	wlog DEBUG, "Counter file pos: $pos\n";
-	if ($pos == 0) {
-		wlog INFO, "Lead process. Uncompressing image $src to $dst\n";
-		wlog DEBUG, "Running tar -xzf $src -C $dst\n";
-		my $out;
-		$out = qx/tar -xzf $src -C $dst 2>&1/; 
-		if ($? != 0) {
-			die "Cannot create soft image: $!\n$out";
-		}
-		$ENV{SOFTIMAGE} = $dst;
-		if (-x "$dst/start") {
-			wlog DEBUG, "Running $dst/start\n";
-			$out = qx/$dst\/start 2>&1/;
-			if ($? != 0) {
-				die "Error running soft image startup: $!\n$out";
-			}
-		}
-		
-		writeUInt32($counter, 1);
-		wlog DEBUG, "Soft image use count updated: 1\n";
-		wlog DEBUG, "Soft image initialized\n";
+	wlog DEBUG, "writeLock($lock)\n";
+	flock($lock, LOCK_EX);
+}
+
+sub readLock {
+	my ($lock) = @_;
+	# Blocking shared flock on the given handle; the return value is ignored.
+	flock($lock, LOCK_SH);
+}
+
+sub unlock {
+	my ($lock) = @_;
+	# Releases whatever flock this handle holds; the return value is ignored.
+	flock($lock, LOCK_UN);
+}
+
+sub tryWriteLock {
+	my ($lock) = @_;
+	# Non-blocking exclusive flock: true only if acquired (no other holder).
+	return flock($lock, LOCK_EX | LOCK_NB);
+}
+
+
+
+sub createLocks {
+	mkpath($SOFT_IMAGE_DIR);
+	if (!open($SOFT_IMAGE_MAIN_LOCK, ">>$SOFT_IMAGE_DIR/.main")) {
+		dieNicely("Cannot open lock file: $!");
 	}
-	else {
-		wlog DEBUG, "Not lead process\n";
-		if (! -f "$dst/.l$BLOCKID") {
-			my $n = writeUInt32($counter, readUInt32($counter) + 1);
-			wlog DEBUG, "Soft image use count updated: $n\n";
-			my $rl;
-			open($rl, ">$dst/.l$BLOCKID");
-			close($rl);
-		}
+	if (!open($SOFT_IMAGE_CREATE_LOCK, ">>$SOFT_IMAGE_DIR/.create")) {
+		dieNicely("Cannot open lock file: $!");
 	}
-	close($counter);
-	close($lock);
-	# end critical section
+	if (!open($SOFT_IMAGE_USE_LOCK, ">>$SOFT_IMAGE_DIR/.use")) {
+		dieNicely("Cannot open lock file: $!");
+	}
 }
 
-sub cleanSoftImage() {
+sub cleanSoftImage {
 	my $lock;
 	my $counter;
-	if (!defined $SOFT_IMAGE_DST) {
+	if (!defined $SOFT_IMAGE_DIR) {
 		return;
 	}
-	open($lock, ">>$SOFT_IMAGE_DST/.lock");
-	if (!flock($lock, 2)) {
-		dieNicely("Cannot get exclusive lock on soft image directory: $!"); 
-	}
 	
-	open($counter, "+<$SOFT_IMAGE_DST/.count");
-	if (writeUInt32($counter, readUInt32($counter) - 1) == 0) {
-		wlog INFO, "Tail process. Removing image from $SOFT_IMAGE_DST\n";
+	writeLock($SOFT_IMAGE_MAIN_LOCK);
 		
-		if (-x "$SOFT_IMAGE_DST/stop") {
-			my $out = qx/$SOFT_IMAGE_DST\/stop 2>&1/;
-			if ($? != 0) {
-				die "Error running soft image shutdown: $!\n$out";
+		unlock($SOFT_IMAGE_USE_LOCK);
+		if (tryWriteLock($SOFT_IMAGE_USE_LOCK)) {
+			wlog INFO, "Tail process. Removing image from $SOFT_IMAGE_DIR\n";
+		
+			if (-x "$SOFT_IMAGE_DIR/stop") {
+				my $out = qx/$SOFT_IMAGE_DIR\/stop 2>&1/;
+				if ($? != 0) {
+					die "Error running soft image shutdown: $!\n$out";
+				}
 			}
+			
+			rmtree($SOFT_IMAGE_DIR, 0, 0);
+			
+			unlock($SOFT_IMAGE_USE_LOCK);
 		}
 		
-		rmtree($SOFT_IMAGE_DST, 0, 0);
-	}
-	close($counter);
-	close($lock);
+	unlock($SOFT_IMAGE_MAIN_LOCK);
 }
 
 sub submitjob {
@@ -1954,10 +1969,30 @@
 				# as long as the job is alive, enable debugging
 				$LOGLEVEL = DEBUG;
 			}
-			elsif ($ap[0] eq "softImage") {
-				$SOFTIMAGE = $ap[1];
-				my @SS = split(/ /, $SOFTIMAGE);
-				$SOFT_IMAGE_DST = $SS[1];	
+			elsif ($ap[0] eq "softimage") {
+				my @SS = split(/ /, $ap[1]);
+				if (!defined $SOFT_IMAGE_DIR) {
+					$SOFT_IMAGE_DIR = $SS[1];
+				}
+				if (acquireSoftImageLock()) {
+					my ($proto, $host, $path) = urisplit($SS[0]);
+					if ($proto eq "sfs") {
+						# don't stage in; unpack directly
+						$SOFTIMAGE = $path;
+					}
+					else {
+						# treat the soft image as a normal stage-in file
+						push @STAGEIN, $SS[0];
+						my $fn = basename($path);
+						my $dest = "$SOFT_IMAGE_DIR/$fn";
+						push @STAGEIND, $dest;
+						$SOFTIMAGE = $dest;
+					}
+				}
+				else {
+					# prevent job from trying to unpack the image
+					$SOFTIMAGE = "";
+				}
 			}
 			else {
 				wlog WARN, "Ignoring attribute $ap[0] = $ap[1]\n";
@@ -2238,11 +2273,17 @@
 	my $sout = $$JOB{"stdout"};
 	my $serr = $$JOB{"stderr"};
 	
-	my $softImage = $$JOBDATA{"softimage"};
-	if (defined $softImage) {
-		prepareSoftImage(split(/ /, $softImage));
+	if ($SOFT_IMAGE_LEAD_PROCESS) {
+		unpackSoftImage($$JOBDATA{"softimage"});
+		unlock($SOFT_IMAGE_CREATE_LOCK);
 	}
-
+	# wait until the soft image is created
+	readLock($SOFT_IMAGE_CREATE_LOCK);
+	# no need to hold lock after that
+	unlock($SOFT_IMAGE_CREATE_LOCK);
+	
+	$ENV{SOFTIMAGE} = $SOFT_IMAGE_DIR;
+	
 	my $cwd = getcwd();
 	# wlog DEBUG, "CWD: $cwd\n";
 	# wlog DEBUG, "Running $executable\n";



More information about the Swift-commit mailing list