[Swift-commit] r3170 - trunk/src/org/griphyn/vdl/karajan/lib
noreply at svn.ci.uchicago.edu
noreply at svn.ci.uchicago.edu
Fri Oct 16 16:54:52 CDT 2009
Author: hategan
Date: 2009-10-16 16:54:51 -0500 (Fri, 16 Oct 2009)
New Revision: 3170
Added:
trunk/src/org/griphyn/vdl/karajan/lib/FileCopier.java
Modified:
trunk/src/org/griphyn/vdl/karajan/lib/SetFieldValue.java
Log:
handle non-composite assignments
Added: trunk/src/org/griphyn/vdl/karajan/lib/FileCopier.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/FileCopier.java (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/lib/FileCopier.java 2009-10-16 21:54:51 UTC (rev 3170)
@@ -0,0 +1,145 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Oct 16, 2009
+ */
+package org.griphyn.vdl.karajan.lib;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.globus.cog.abstraction.impl.common.StatusEvent;
+import org.globus.cog.abstraction.impl.common.task.FileTransferSpecificationImpl;
+import org.globus.cog.abstraction.impl.common.task.FileTransferTask;
+import org.globus.cog.abstraction.impl.common.task.FileTransferTaskHandler;
+import org.globus.cog.abstraction.impl.common.task.IllegalSpecException;
+import org.globus.cog.abstraction.impl.common.task.InvalidSecurityContextException;
+import org.globus.cog.abstraction.impl.common.task.InvalidServiceContactException;
+import org.globus.cog.abstraction.impl.common.task.ServiceContactImpl;
+import org.globus.cog.abstraction.impl.common.task.ServiceImpl;
+import org.globus.cog.abstraction.impl.common.task.TaskSubmissionException;
+import org.globus.cog.abstraction.interfaces.FileTransferSpecification;
+import org.globus.cog.abstraction.interfaces.Service;
+import org.globus.cog.abstraction.interfaces.Status;
+import org.globus.cog.abstraction.interfaces.StatusListener;
+import org.globus.cog.abstraction.interfaces.TaskHandler;
+import org.globus.cog.karajan.workflow.ExecutionException;
+import org.globus.cog.karajan.workflow.events.Event;
+import org.globus.cog.karajan.workflow.events.EventBus;
+import org.globus.cog.karajan.workflow.events.EventListener;
+import org.globus.cog.karajan.workflow.events.EventTargetPair;
+import org.globus.cog.karajan.workflow.futures.Future;
+import org.globus.cog.karajan.workflow.futures.FutureEvaluationException;
+import org.globus.cog.karajan.workflow.futures.FuturesMonitor;
+import org.griphyn.vdl.mapping.AbsFile;
+import org.griphyn.vdl.mapping.PhysicalFormat;
+
+public class FileCopier implements Future, StatusListener {
+ private static final TaskHandler fth = new FileTransferTaskHandler();
+
+ private FileTransferTask task;
+ private List actions;
+ private Exception exception;
+ private boolean closed;
+
+ public FileCopier(PhysicalFormat src, PhysicalFormat dst) {
+ AbsFile fsrc = (AbsFile) src;
+ AbsFile fdst = (AbsFile) dst;
+ FileTransferSpecification fts = new FileTransferSpecificationImpl();
+ fts.setDestinationDirectory(fdst.getDir());
+ fts.setDestinationFile(fdst.getPath());
+ fts.setSourceDirectory(fsrc.getDir());
+ fts.setSourceFile(fsrc.getPath());
+ fts.setThirdPartyIfPossible(true);
+ task = new FileTransferTask();
+ task.setSpecification(fts);
+ task.setService(Service.FILE_TRANSFER_SOURCE_SERVICE, new ServiceImpl(
+ fsrc.getProtocol(), new ServiceContactImpl(fsrc.getHost()), null));
+ task.setService(Service.FILE_TRANSFER_DESTINATION_SERVICE,
+ new ServiceImpl(fdst.getProtocol(), new ServiceContactImpl(fdst
+ .getHost()), null));
+ task.addStatusListener(this);
+ }
+
+ public synchronized void addModificationAction(EventListener target,
+ Event event) {
+ if (actions == null) {
+ actions = new LinkedList();
+ }
+ EventTargetPair etp = new EventTargetPair(event, target);
+ if (FuturesMonitor.debug) {
+ FuturesMonitor.monitor.add(etp, this);
+ }
+ synchronized (actions) {
+ actions.add(etp);
+ }
+ if (closed) {
+ actions();
+ }
+ }
+
+ public List getModificationActions() {
+ return actions;
+ }
+
+ private void actions() {
+ if (actions != null) {
+ synchronized (actions) {
+ java.util.Iterator i = actions.iterator();
+ while (i.hasNext()) {
+ EventTargetPair etp = (EventTargetPair) i.next();
+ if (FuturesMonitor.debug) {
+ FuturesMonitor.monitor.remove(etp);
+ }
+ i.remove();
+ EventBus.post(etp.getTarget(), etp.getEvent());
+ }
+ }
+ }
+ }
+
+ public void fail(FutureEvaluationException e) {
+ this.exception = e;
+ actions();
+ }
+
+ public Object getValue() throws ExecutionException {
+ return null;
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ public void start() throws IllegalSpecException,
+ InvalidSecurityContextException, InvalidServiceContactException,
+ TaskSubmissionException {
+ fth.submit(task);
+ }
+
+ public void close() {
+ closed = true;
+ actions();
+ }
+
+ public void statusChanged(StatusEvent event) {
+ Status s = event.getStatus();
+ if (s.isTerminal()) {
+ if (s.getStatusCode() == Status.COMPLETED) {
+ close();
+ }
+ else {
+ this.exception = new Exception(s.getMessage(), s.getException());
+ close();
+ }
+ }
+ }
+
+ public Exception getException() {
+ return exception;
+ }
+}
Modified: trunk/src/org/griphyn/vdl/karajan/lib/SetFieldValue.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/lib/SetFieldValue.java 2009-10-16 21:54:09 UTC (rev 3169)
+++ trunk/src/org/griphyn/vdl/karajan/lib/SetFieldValue.java 2009-10-16 21:54:51 UTC (rev 3170)
@@ -4,16 +4,19 @@
package org.griphyn.vdl.karajan.lib;
import org.apache.log4j.Logger;
-import org.griphyn.vdl.karajan.Pair;
-import org.griphyn.vdl.karajan.PairIterator;
import org.globus.cog.karajan.arguments.Arg;
import org.globus.cog.karajan.stack.VariableStack;
import org.globus.cog.karajan.workflow.ExecutionException;
import org.globus.cog.karajan.workflow.futures.FutureNotYetAvailable;
+import org.griphyn.vdl.karajan.Pair;
+import org.griphyn.vdl.karajan.PairIterator;
import org.griphyn.vdl.mapping.DSHandle;
import org.griphyn.vdl.mapping.InvalidPathException;
import org.griphyn.vdl.mapping.Path;
+import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
+import edu.emory.mathcs.backport.java.util.concurrent.Executors;
+
public class SetFieldValue extends VDLFunction {
public static final Logger logger = Logger.getLogger(SetFieldValue.class);
@@ -28,21 +31,21 @@
try {
Path path = parsePath(OA_PATH.getValue(stack), stack);
DSHandle leaf = var.getField(path);
- DSHandle value = (DSHandle)PA_VALUE.getValue(stack);
+ DSHandle value = (DSHandle) PA_VALUE.getValue(stack);
if (logger.isInfoEnabled()) {
logger.info("Setting " + leaf + " to " + value);
}
synchronized (var.getRoot()) {
-// TODO want to do a type check here, for runtime type checking
-// and pull out the appropriate internal value from value if it
-// is a DSHandle. There is no need (I think? maybe numerical casting?)
-// for type conversion here; but would be useful to have
-// type checking.
- synchronized(value.getRoot()) {
- if(!value.isClosed()) {
+ // TODO want to do a type check here, for runtime type checking
+ // and pull out the appropriate internal value from value if it
+ // is a DSHandle. There is no need (I think? maybe numerical casting?)
+ // for type conversion here; but would be useful to have
+ // type checking.
+ synchronized (value.getRoot()) {
+ if (!value.isClosed()) {
throw new FutureNotYetAvailable(addFutureListener(stack, value));
}
- deepCopy(leaf,value,stack);
+ deepCopy(leaf, value, stack);
}
}
return null;
@@ -58,12 +61,12 @@
/** make dest look like source - if its a simple value, copy that
and if its an array then recursively copy */
void deepCopy(DSHandle dest, DSHandle source, VariableStack stack) throws ExecutionException {
- if(source.getType().isPrimitive()) {
+ if (source.getType().isPrimitive()) {
dest.setValue(source.getValue());
}
- else if(source.getType().isArray()) {
+ else if (source.getType().isArray()) {
PairIterator it = new PairIterator(source.getArrayValue());
- while(it.hasNext()) {
+ while (it.hasNext()) {
Pair pair = (Pair) it.next();
Object lhs = pair.get(0);
DSHandle rhs = (DSHandle) pair.get(1);
@@ -77,16 +80,56 @@
DSHandle field;
try {
field = dest.getField(memberPath);
- } catch(InvalidPathException ipe) {
+ }
+ catch (InvalidPathException ipe) {
throw new ExecutionException("Could not get destination field",ipe);
}
- deepCopy(field,rhs,stack);
+ deepCopy(field, rhs, stack);
}
closeShallow(stack, dest);
- } else {
- // TODO implement this
- throw new RuntimeException("Deep non-array structure copying not implemented, when trying to copy "+source);
+ }
+ else if (!source.getType().isComposite()) {
+ Path dpath = dest.getPathFromRoot();
+ if (dest.getMapper().canBeRemapped(dpath)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Remapping " + dest + " to " + source);
+ }
+ dest.getMapper().remap(dpath, source.getMapper().map(source.getPathFromRoot()));
+ dest.closeShallow();
+ }
+ else {
+ if (stack.currentFrame().isDefined("fc")) {
+ FileCopier fc = (FileCopier) stack.currentFrame().getVar("fc");
+ if (!fc.isClosed()) {
+ throw new FutureNotYetAvailable(fc);
+ }
+ else {
+ if (fc.getException() != null) {
+ throw new ExecutionException("Failed to copy " + source + " to " + dest, fc.getException());
+ }
+ }
+ }
+ else {
+ FileCopier fc = new FileCopier(source.getMapper().map(source.getPathFromRoot()),
+ dest.getMapper().map(dpath));
+ stack.setVar("fc", fc);
+ try {
+ fc.start();
+ throw new FutureNotYetAvailable(fc);
+ }
+ catch (FutureNotYetAvailable e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new ExecutionException("Failed to start file copy", e);
+ }
+ }
+ }
}
+ else {
+ // TODO implement this
+ //throw new RuntimeException("Deep non-array structure copying not implemented, when trying to copy "+source);
+ }
}
}
More information about the Swift-commit
mailing list