[Swift-commit] r3170 - trunk/src/org/griphyn/vdl/karajan/lib

noreply at svn.ci.uchicago.edu noreply at svn.ci.uchicago.edu
Fri Oct 16 16:54:52 CDT 2009


Author: hategan
Date: 2009-10-16 16:54:51 -0500 (Fri, 16 Oct 2009)
New Revision: 3170

Added:
   trunk/src/org/griphyn/vdl/karajan/lib/FileCopier.java
Modified:
   trunk/src/org/griphyn/vdl/karajan/lib/SetFieldValue.java
Log:
handle non-composite assignments

Added: trunk/src/org/griphyn/vdl/karajan/lib/FileCopier.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/FileCopier.java	                        (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/lib/FileCopier.java	2009-10-16 21:54:51 UTC (rev 3170)
@@ -0,0 +1,145 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Oct 16, 2009
+ */
+package org.griphyn.vdl.karajan.lib;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.globus.cog.abstraction.impl.common.StatusEvent;
+import org.globus.cog.abstraction.impl.common.task.FileTransferSpecificationImpl;
+import org.globus.cog.abstraction.impl.common.task.FileTransferTask;
+import org.globus.cog.abstraction.impl.common.task.FileTransferTaskHandler;
+import org.globus.cog.abstraction.impl.common.task.IllegalSpecException;
+import org.globus.cog.abstraction.impl.common.task.InvalidSecurityContextException;
+import org.globus.cog.abstraction.impl.common.task.InvalidServiceContactException;
+import org.globus.cog.abstraction.impl.common.task.ServiceContactImpl;
+import org.globus.cog.abstraction.impl.common.task.ServiceImpl;
+import org.globus.cog.abstraction.impl.common.task.TaskSubmissionException;
+import org.globus.cog.abstraction.interfaces.FileTransferSpecification;
+import org.globus.cog.abstraction.interfaces.Service;
+import org.globus.cog.abstraction.interfaces.Status;
+import org.globus.cog.abstraction.interfaces.StatusListener;
+import org.globus.cog.abstraction.interfaces.TaskHandler;
+import org.globus.cog.karajan.workflow.ExecutionException;
+import org.globus.cog.karajan.workflow.events.Event;
+import org.globus.cog.karajan.workflow.events.EventBus;
+import org.globus.cog.karajan.workflow.events.EventListener;
+import org.globus.cog.karajan.workflow.events.EventTargetPair;
+import org.globus.cog.karajan.workflow.futures.Future;
+import org.globus.cog.karajan.workflow.futures.FutureEvaluationException;
+import org.globus.cog.karajan.workflow.futures.FuturesMonitor;
+import org.griphyn.vdl.mapping.AbsFile;
+import org.griphyn.vdl.mapping.PhysicalFormat;
+
+public class FileCopier implements Future, StatusListener {
+    private static final TaskHandler fth = new FileTransferTaskHandler();
+
+    private FileTransferTask task;
+    private List actions;
+    private Exception exception;
+    private boolean closed;
+
+    public FileCopier(PhysicalFormat src, PhysicalFormat dst) {
+        AbsFile fsrc = (AbsFile) src;
+        AbsFile fdst = (AbsFile) dst;
+        FileTransferSpecification fts = new FileTransferSpecificationImpl();
+        fts.setDestinationDirectory(fdst.getDir());
+        fts.setDestinationFile(fdst.getPath());
+        fts.setSourceDirectory(fsrc.getDir());
+        fts.setSourceFile(fsrc.getPath());
+        fts.setThirdPartyIfPossible(true);
+        task = new FileTransferTask();
+        task.setSpecification(fts);
+        task.setService(Service.FILE_TRANSFER_SOURCE_SERVICE, new ServiceImpl(
+            fsrc.getProtocol(), new ServiceContactImpl(fsrc.getHost()), null));
+        task.setService(Service.FILE_TRANSFER_DESTINATION_SERVICE,
+            new ServiceImpl(fdst.getProtocol(), new ServiceContactImpl(fdst
+                .getHost()), null));
+        task.addStatusListener(this);
+    }
+
+    public synchronized void addModificationAction(EventListener target,
+            Event event) {
+        if (actions == null) {
+            actions = new LinkedList();
+        }
+        EventTargetPair etp = new EventTargetPair(event, target);
+        if (FuturesMonitor.debug) {
+            FuturesMonitor.monitor.add(etp, this);
+        }
+        synchronized (actions) {
+            actions.add(etp);
+        }
+        if (closed) {
+            actions();
+        }
+    }
+
+    public List getModificationActions() {
+        return actions;
+    }
+
+    private void actions() {
+        if (actions != null) {
+            synchronized (actions) {
+                java.util.Iterator i = actions.iterator();
+                while (i.hasNext()) {
+                    EventTargetPair etp = (EventTargetPair) i.next();
+                    if (FuturesMonitor.debug) {
+                        FuturesMonitor.monitor.remove(etp);
+                    }
+                    i.remove();
+                    EventBus.post(etp.getTarget(), etp.getEvent());
+                }
+            }
+        }
+    }
+
+    public void fail(FutureEvaluationException e) {
+        this.exception = e;
+        actions();
+    }
+
+    public Object getValue() throws ExecutionException {
+        return null;
+    }
+
+    public boolean isClosed() {
+        return closed;
+    }
+
+    public void start() throws IllegalSpecException,
+            InvalidSecurityContextException, InvalidServiceContactException,
+            TaskSubmissionException {
+        fth.submit(task);
+    }
+
+    public void close() {
+        closed = true;
+        actions();
+    }
+
+    public void statusChanged(StatusEvent event) {
+        Status s = event.getStatus();
+        if (s.isTerminal()) {
+            if (s.getStatusCode() == Status.COMPLETED) {
+                close();
+            }
+            else {
+                this.exception = new Exception(s.getMessage(), s.getException());
+                close();
+            }
+        }
+    }
+    
+    public Exception getException() {
+        return exception;
+    }
+}

Modified: trunk/src/org/griphyn/vdl/karajan/lib/SetFieldValue.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/SetFieldValue.java	2009-10-16 21:54:09 UTC (rev 3169)
+++ trunk/src/org/griphyn/vdl/karajan/lib/SetFieldValue.java	2009-10-16 21:54:51 UTC (rev 3170)
@@ -4,16 +4,19 @@
 package org.griphyn.vdl.karajan.lib;
 
 import org.apache.log4j.Logger;
-import org.griphyn.vdl.karajan.Pair;
-import org.griphyn.vdl.karajan.PairIterator;
 import org.globus.cog.karajan.arguments.Arg;
 import org.globus.cog.karajan.stack.VariableStack;
 import org.globus.cog.karajan.workflow.ExecutionException;
 import org.globus.cog.karajan.workflow.futures.FutureNotYetAvailable;
+import org.griphyn.vdl.karajan.Pair;
+import org.griphyn.vdl.karajan.PairIterator;
 import org.griphyn.vdl.mapping.DSHandle;
 import org.griphyn.vdl.mapping.InvalidPathException;
 import org.griphyn.vdl.mapping.Path;
 
+import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
+import edu.emory.mathcs.backport.java.util.concurrent.Executors;
+
 public class SetFieldValue extends VDLFunction {
 	public static final Logger logger = Logger.getLogger(SetFieldValue.class);
 
@@ -28,21 +31,21 @@
 		try {
 			Path path = parsePath(OA_PATH.getValue(stack), stack);
 			DSHandle leaf = var.getField(path);
-			DSHandle value = (DSHandle)PA_VALUE.getValue(stack);
+			DSHandle value = (DSHandle) PA_VALUE.getValue(stack);
 			if (logger.isInfoEnabled()) {
 				logger.info("Setting " + leaf + " to " + value);
 			}
 			synchronized (var.getRoot()) {
-// TODO want to do a type check here, for runtime type checking
-// and pull out the appropriate internal value from value if it
-// is a DSHandle. There is no need (I think? maybe numerical casting?)
-// for type conversion here; but would be useful to have
-// type checking.
-				synchronized(value.getRoot()) {
-					if(!value.isClosed()) {
+            // TODO want to do a type check here, for runtime type checking
+            // and pull out the appropriate internal value from value if it
+            // is a DSHandle. There is no need (I think? maybe numerical casting?)
+            // for type conversion here; but would be useful to have
+            // type checking.
+				synchronized (value.getRoot()) {
+					if (!value.isClosed()) {
 						throw new FutureNotYetAvailable(addFutureListener(stack, value));
 					}
-					deepCopy(leaf,value,stack);
+					deepCopy(leaf, value, stack);
 				}
 			}
 			return null;
@@ -58,12 +61,12 @@
 	/** make dest look like source - if its a simple value, copy that
 	    and if its an array then recursively copy */
 	void deepCopy(DSHandle dest, DSHandle source, VariableStack stack) throws ExecutionException {
-		if(source.getType().isPrimitive()) {
+		if (source.getType().isPrimitive()) {
 			dest.setValue(source.getValue());
 		}
-		else if(source.getType().isArray()) {
+		else if (source.getType().isArray()) {
 			PairIterator it = new PairIterator(source.getArrayValue());
-			while(it.hasNext()) {
+			while (it.hasNext()) {
 				Pair pair = (Pair) it.next();
 				Object lhs = pair.get(0);
 				DSHandle rhs = (DSHandle) pair.get(1);
@@ -77,16 +80,56 @@
 				DSHandle field;
 				try {
 					field = dest.getField(memberPath);
-				} catch(InvalidPathException ipe) {
+				} 
+				catch (InvalidPathException ipe) {
 					throw new ExecutionException("Could not get destination field",ipe);
 				}
-				deepCopy(field,rhs,stack);
+				deepCopy(field, rhs, stack);
 			}
 			closeShallow(stack, dest);
-		} else {
-			// TODO implement this
-			throw new RuntimeException("Deep non-array structure copying not implemented, when trying to copy "+source);
+		} 
+		else if (!source.getType().isComposite()) {
+		    Path dpath = dest.getPathFromRoot();
+		    if (dest.getMapper().canBeRemapped(dpath)) {
+		        if (logger.isDebugEnabled()) {
+		            logger.debug("Remapping " + dest + " to " + source);
+		        }
+		        dest.getMapper().remap(dpath, source.getMapper().map(source.getPathFromRoot()));
+		        dest.closeShallow();
+		    }
+		    else {
+		        if (stack.currentFrame().isDefined("fc")) {
+		            FileCopier fc = (FileCopier) stack.currentFrame().getVar("fc");
+		            if (!fc.isClosed()) {
+		                throw new FutureNotYetAvailable(fc);
+		            }
+		            else {
+		                if (fc.getException() != null) {
+		                    throw new ExecutionException("Failed to copy " + source + " to " + dest, fc.getException());
+		                }
+		            }
+		        }
+		        else {
+		            FileCopier fc = new FileCopier(source.getMapper().map(source.getPathFromRoot()), 
+		                dest.getMapper().map(dpath));
+		            stack.setVar("fc", fc);
+		            try {
+		                fc.start();
+		                throw new FutureNotYetAvailable(fc);
+		            }
+		            catch (FutureNotYetAvailable e) {
+		                throw e;
+		            }
+		            catch (Exception e) {
+		                throw new ExecutionException("Failed to start file copy", e);
+		            }
+		        }
+		    }
 		}
+		else {
+		    // TODO implement this
+            //throw new RuntimeException("Deep non-array structure copying not implemented, when trying to copy "+source);
+		}
 	}
 
 }




More information about the Swift-commit mailing list