[Swift-commit] cog r3553

swift at ci.uchicago.edu swift at ci.uchicago.edu
Wed Jan 16 01:30:07 CST 2013


------------------------------------------------------------------------
r3553 | hategan | 2013-01-16 01:28:22 -0600 (Wed, 16 Jan 2013) | 1 line

removed command timeouts and revert to channel timeouts
------------------------------------------------------------------------
Index: modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestReply.java
===================================================================
--- modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestReply.java	(revision 3552)
+++ modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestReply.java	(working copy)
@@ -33,10 +33,7 @@
 
 public abstract class RequestReply {
 	public static final Logger logger = Logger.getLogger(RequestReply.class);
-	
-	public static final int DEFAULT_TIMEOUT = 120 * 1000;
-	private int timeout = DEFAULT_TIMEOUT;
-	
+		
 	public static final int NOID = -1;
 	private int id;
 	private String outCmd;
@@ -45,10 +42,7 @@
 	private List<byte[]> inData;
 	private boolean inDataReceived;
 	private KarajanChannel channel;
-	private long lastTime = Long.MAX_VALUE;
-
-	// private static final byte[] NO_EXCEPTION = new byte[0];
-
+	
 	protected String getInCmd() {
 		return inCmd;
 	}
@@ -152,7 +146,6 @@
 	public abstract void send(boolean err) throws ProtocolException;
 	
 	protected void dataReceived(boolean fin, boolean error, byte[] data) throws ProtocolException {
-		setLastTime(System.currentTimeMillis());
 	}
 		
 	protected synchronized void addInData(boolean fin, boolean err, byte[] data) {
@@ -405,28 +398,7 @@
 	protected Object getInObject(int index) {
 		return deserialize(getInData(index));
 	}
-
-	public long getLastTime() {
-		return lastTime;
-	}
-
-	public void setLastTime(long lastTime) {
-		this.lastTime = lastTime;
-	}
 	
-	public int getTimeout() {
-		return timeout;
-	}
-
-	public void setTimeout(int timeout) {
-		this.timeout = timeout;
-	}
-
-	public void handleTimeout() {
-		logger.warn("Unhandled timeout", new Throwable());
-		setLastTime(Long.MAX_VALUE);
-	}
-	
 	public void handleSignal(byte[] data) {
 		
 	}
Index: modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelContext.java
===================================================================
--- modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelContext.java	(revision 3552)
+++ modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelContext.java	(working copy)
@@ -15,7 +15,6 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
 
 import org.apache.log4j.Logger;
 import org.globus.cog.karajan.workflow.events.EventBus;
@@ -42,8 +41,6 @@
 	private int cmdseq;
 	private TagTable<Command> activeSenders;
 	private TagTable<RequestHandler> activeReceivers;
-	private TagTable<Long> ignoredRequests;
-	private static Timer timer;
 	private ServiceContext serviceContext;
 	private int reconnectionAttempts;
 	private long lastHeartBeat;
@@ -224,7 +221,7 @@
 
 	public void notifyRegisteredCommandsAndHandlers(Exception e) {
 		if (logger.isInfoEnabled()) {
-			logger.info("Notifying commands and handlers about exception");
+			logger.info("Notifying commands and handlers about exception", e);
 		}
 		notifyListeners(activeReceivers, e);
 		notifyListeners(activeSenders, e);
@@ -243,15 +240,6 @@
 		}
 	}
 
-	public Timer getTimer() {
-		synchronized (ChannelContext.class) {
-			if (timer == null) {
-				timer = new Timer();
-			}
-			return timer;
-		}
-	}
-
 	public void reexecute(final Command command) {
 		//TODO
 	}
@@ -334,37 +322,5 @@
 	        }
 	    }
 	}
-
-	public synchronized void ignoreRequest(int tag, int timeout) {
-		if (ignoredRequests == null) {
-			ignoredRequests = new TagTable<Long>();
-		}
-		ignoredRequests.put(tag, System.currentTimeMillis() + timeout);
-	}
-
-	public synchronized boolean isIgnoredRequest(int tag) {
-		if (ignoredRequests == null) {
-			return false;
-		}
-		else {
-			return ignoredRequests.containsKey(tag); 
-		}
-	}
-
-	public synchronized void removeOldIgnoredRequests() {
-		if (ignoredRequests == null) {
-			return;
-		}
-		long now = System.currentTimeMillis();
-		for (Integer i : ignoredRequests.keys()) {
-			if (ignoredRequests.get(i) < now) {
-				ignoredRequests.remove(i);
-			}
-		}
-		if (ignoredRequests.isEmpty()) {
-			ignoredRequests = null;
-		}
-	}
 	
-	
 }
Index: modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractPipedChannel.java
===================================================================
--- modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractPipedChannel.java	(revision 3552)
+++ modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractPipedChannel.java	(working copy)
@@ -46,6 +46,11 @@
 		// no heart beat for these
 	}
 
+	@Override
+	public void configureTimeoutChecks() {
+		// no timeouts either
+	}
+
 	public boolean isOffline() {
 		return false;
 	}
Index: modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/KarajanChannel.java
===================================================================
--- modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/KarajanChannel.java	(revision 3552)
+++ modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/KarajanChannel.java	(working copy)
@@ -99,8 +99,6 @@
 	void flush() throws IOException;
 	
 	SelectableChannel getNIOChannel();
-
-	void ignoreRequest(RequestHandler h, int timeout);
 	
 	boolean handleChannelException(Exception e);		
 }
Index: modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/MetaChannel.java
===================================================================
--- modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/MetaChannel.java	(revision 3552)
+++ modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/MetaChannel.java	(working copy)
@@ -10,7 +10,6 @@
 package org.globus.cog.karajan.workflow.service.channels;
 
 import java.io.IOException;
-import java.util.Timer;
 import java.util.TimerTask;
 
 import org.apache.log4j.Logger;
@@ -39,6 +38,11 @@
 	public synchronized void sendTaggedData(int tag, int flags, byte[] data, SendCallback cb) {
 		current.sendTaggedData(tag, flags, data, cb);
 	}
+	
+	@Override
+    public void configureTimeoutChecks() {
+        // only for the actual channels
+    }
 
 	public void registerCommand(Command cmd) throws ProtocolException {
 		current.registerCommand(cmd);
@@ -112,13 +116,9 @@
 				ChannelManager.getManager().unregisterChannel(MetaChannel.this);
 			}
 		};
-		getTimer().schedule(deactivator, (long) seconds * 1000);
+		Timer.schedule(deactivator, (long) seconds * 1000);
 	}
 	
-	public Timer getTimer() {
-		return getChannelContext().getTimer();
-	}
-
 	public synchronized void poll(final int seconds) {
 		if (poller != null) {
 			return;
@@ -152,7 +152,7 @@
 			}
 		};
 		long interval = (long) seconds * 1000;
-		getTimer().schedule(poller, interval, interval);
+		Timer.schedule(poller, interval, interval);
 	}
 
 	public String toString() {
Index: modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/Timer.java
===================================================================
--- modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/Timer.java	(revision 3552)
+++ modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/Timer.java	(working copy)
@@ -21,4 +21,16 @@
 			}			
 		}, interval, interval);
 	}
+	
+	public static void every(final long interval, final TimerTask task) {
+		TIMER.scheduleAtFixedRate(task, interval, interval);
+	}
+
+	public static void schedule(TimerTask task, long delay, long period) {
+		TIMER.schedule(task, delay, period);
+	}
+
+	public static void schedule(TimerTask task, long delay) {
+		TIMER.schedule(task, delay);
+	}
 }
Index: modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractKarajanChannel.java
===================================================================
--- modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractKarajanChannel.java	(revision 3552)
+++ modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractKarajanChannel.java	(working copy)
@@ -15,7 +15,6 @@
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectableChannel;
-import java.util.Collection;
 import java.util.TimerTask;
 
 import org.apache.log4j.Logger;
@@ -24,8 +23,8 @@
 import org.globus.cog.karajan.workflow.service.RemoteConfiguration;
 import org.globus.cog.karajan.workflow.service.RemoteConfiguration.Entry;
 import org.globus.cog.karajan.workflow.service.RequestManager;
-import org.globus.cog.karajan.workflow.service.RequestReply;
 import org.globus.cog.karajan.workflow.service.Service;
+import org.globus.cog.karajan.workflow.service.TimeoutException;
 import org.globus.cog.karajan.workflow.service.UserContext;
 import org.globus.cog.karajan.workflow.service.commands.Command;
 import org.globus.cog.karajan.workflow.service.handlers.RequestHandler;
@@ -38,7 +37,8 @@
 	public static final int DEFAULT_HBI_SPREAD = 10;
 	
 	public static final int TIMEOUT_CHECK_INTERVAL = 1;
-
+	public static final int TIMEOUT = 120;
+	
 	private ChannelContext context;
 	private volatile int usageCount, longTermUsageCount;
 	private RequestManager requestManager;
@@ -47,6 +47,7 @@
 	private String name;
 	private Service callbackService;
 	private final boolean client;
+	private long lastTime;
 
 	protected AbstractKarajanChannel(RequestManager requestManager, ChannelContext channelContext,
 			boolean client) {
@@ -58,6 +59,7 @@
 		this.client = client;
 		configureHeartBeat();
 		configureTimeoutChecks();
+		updateLastTime();
 	}
 
 	protected void configureHeartBeat() {
@@ -99,47 +101,45 @@
 	public void scheduleHeartbeats(int heartBeatInterval) {
 	    TimerTask heartBeatTask;
 	    heartBeatTask = new HeartBeatTask(this);
-	    context.getTimer().schedule(heartBeatTask, 
-        heartBeatInterval + (int) (Math.random() * DEFAULT_HBI_INITIAL_SPREAD * 1000), 
-        heartBeatInterval + (int) (Math.random() * DEFAULT_HBI_SPREAD * 1000));
+	    Timer.schedule(heartBeatTask, 
+	    		heartBeatInterval + (int) (Math.random() * DEFAULT_HBI_INITIAL_SPREAD * 1000), 
+	    		heartBeatInterval + (int) (Math.random() * DEFAULT_HBI_SPREAD * 1000));
 	}
 	
 	public void scheduleHeartbeatCheck(int heartBeatInterval) {
 	    TimerTask heartBeatTask;
 	    int mult = 2;
 	    heartBeatTask = new HeartBeatCheckTask(this, heartBeatInterval, mult);
-        context.getTimer().schedule(heartBeatTask, mult * heartBeatInterval,
-        		mult * heartBeatInterval);
+        Timer.every(mult * heartBeatInterval, heartBeatTask);
 	}
 	
 	public void configureTimeoutChecks() {
-	    context.getTimer().schedule(new TimerTask() {
+		Timer.every(TIMEOUT_CHECK_INTERVAL * 1000, new TimerTask() {
 			public void run() {
 			    checkTimeouts();
-			}},
-	        TIMEOUT_CHECK_INTERVAL * 1000, TIMEOUT_CHECK_INTERVAL * 1000);
+			}}
+		);
 	}
 	
 	protected void checkTimeouts() {
-	    checkTimeouts(context.getActiveCommands());
-	    checkTimeouts(context.getActiveHandlers());
-	    getChannelContext().removeOldIgnoredRequests();
+		long now = System.currentTimeMillis();
+		long lastTime = getLastTime();
+		if (now - lastTime > TIMEOUT * 1000) {
+		    lastTime = Long.MAX_VALUE;
+		    TimeoutException e = new TimeoutException(this, "Channel timed out", lastTime);
+			context.notifyRegisteredCommandsAndHandlers(e);
+			handleChannelException(e);
+		}
 	}
 	
-	private void checkTimeouts(Collection<? extends RequestReply> l) {
-	    long now = System.currentTimeMillis();
-	    for (RequestReply r : l) {
-	    	if (now - r.getLastTime() > r.getTimeout()) {
-	    		try {
-	    			r.handleTimeout();
-	    		}
-	    		catch (Exception e) {
-	    			logger.warn("Error handling timeout", e);
-	    		}
-	    	}
-	    }
+	protected synchronized void updateLastTime() {
+		lastTime = System.currentTimeMillis();
 	}
-
+	
+	protected synchronized long getLastTime() {
+	    return lastTime;
+	}
+	
 	protected boolean clientControlsHeartbeats() {
 	    return true;
 	}
@@ -386,6 +386,7 @@
 					flagIsSet(flags, FINAL_FLAG) + ", err = " + flagIsSet(flags, ERROR_FLAG)
 					+ ", datalen = " + len + ", data = " + ppByteBuf(data));
 		}
+		updateLastTime();
 		Command cmd = getChannelContext().getRegisteredCommand(tag);
 		if (cmd != null) {
 			try {
@@ -446,6 +447,7 @@
 					flagIsSet(flags, FINAL_FLAG) + ", err = " + flagIsSet(flags, ERROR_FLAG)
 					+ ", datalen = " + len + ", data = " + ppByteBuf(data));
 		}
+		updateLastTime();
 		RequestHandler handler = getChannelContext().getRegisteredHandler(tag);
 		boolean fin = finalFlagIsSet(flags);
 		boolean err = errorFlagIsSet(flags);
@@ -492,9 +494,7 @@
 					}
 				}
 				catch (NoSuchHandlerException e) {
-					if (!getChannelContext().isIgnoredRequest(tag)) {
-						logger.warn(getName() + "Could not handle request", e);
-					}
+					logger.warn(getName() + "Could not handle request", e);
 				}
 
 			}
@@ -547,11 +547,6 @@
 		return null;
 	}
 
-	@Override
-	public void ignoreRequest(RequestHandler h, int timeout) {
-		getChannelContext().ignoreRequest(h.getId(), timeout);
-	}
-
 	public synchronized boolean handleChannelException(Exception e) {
         logger.info("Channel config: " + getChannelContext().getConfiguration());
         if (!ChannelManager.getManager().handleChannelException(this, e)) {
Index: modules/karajan/src/org/globus/cog/karajan/workflow/service/Client.java
===================================================================
--- modules/karajan/src/org/globus/cog/karajan/workflow/service/Client.java	(revision 3552)
+++ modules/karajan/src/org/globus/cog/karajan/workflow/service/Client.java	(working copy)
@@ -128,7 +128,7 @@
 		}
 	}
 
-	public void execute(Command command) throws IOException, ProtocolException {
+	public void execute(Command command) throws IOException, ProtocolException, InterruptedException {
 		command.execute(channel);
 	}
 
Index: modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/RequestHandler.java
===================================================================
--- modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/RequestHandler.java	(revision 3552)
+++ modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/RequestHandler.java	(working copy)
@@ -15,7 +15,6 @@
 import org.apache.log4j.Logger;
 import org.globus.cog.karajan.workflow.service.ProtocolException;
 import org.globus.cog.karajan.workflow.service.RequestReply;
-import org.globus.cog.karajan.workflow.service.TimeoutException;
 import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
 
 public abstract class RequestHandler extends RequestReply {
@@ -35,7 +34,6 @@
 	}
 	
 	protected void sendReply() throws ProtocolException {
-	    setLastTime(System.currentTimeMillis());
 		send();
 		replySent = true;
 	}
@@ -118,14 +116,4 @@
 	public String toString() {
 		return "Handler(tag: " + getId() + ", " + getInCmd() + ")";
 	}
-
-	public void handleTimeout() {
-		if (isInDataReceived()) {
-			return;
-		}
-		setLastTime(Long.MAX_VALUE);
-		TimeoutException t = new TimeoutException(this, "Timed out receiving request");
-		logger.info(t.getMessage());
-		errorReceived(t.getMessage(), t);
-	}
 }
Index: modules/karajan/src/org/globus/cog/karajan/workflow/service/TimeoutException.java
===================================================================
--- modules/karajan/src/org/globus/cog/karajan/workflow/service/TimeoutException.java	(revision 3552)
+++ modules/karajan/src/org/globus/cog/karajan/workflow/service/TimeoutException.java	(working copy)
@@ -13,39 +13,18 @@
 import java.text.SimpleDateFormat;
 import java.util.Date;
 
-import org.globus.cog.karajan.workflow.service.commands.Command;
-import org.globus.cog.karajan.workflow.service.handlers.RequestHandler;
+import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
 
-public class TimeoutException extends ProtocolException {
-	private static final long serialVersionUID = -6781619140427115780L;
-
+public class TimeoutException extends Exception {
 	public static final DateFormat DF = new SimpleDateFormat("yyMMdd-HHmmss.SSS");
 	
-	public TimeoutException(Command c, String msg) {
-	    super(c + " " + msg + ". sendReqTime="
-                    + DF.format(new Date(c.getSendReqTime())) + ", lastSendTime=" + DF.format(new Date(c.getSendTime()))
-                    + ", now=" + DF.format(new Date()) + ", channel=" + c.getChannel());
+	public TimeoutException(String msg) {
+	    super(msg);
     }
 	
-	public TimeoutException(RequestHandler h, String msg) {
-        super(h + " " + msg + ". lastTime="
-                    + DF.format(new Date(h.getLastTime()))
-                    + ", now=" + DF.format(new Date()) + ", channel=" + h.getChannel());
+	public TimeoutException(KarajanChannel channel, String msg, long lastTime) {
+        super(msg + ". lastTime="
+                    + DF.format(new Date(lastTime))
+                    + ", now=" + DF.format(new Date()) + ", channel=" + channel);
     }
-	
-	public TimeoutException() {
-		super();
-	}
-	
-	public TimeoutException(String message) {
-		super(message);
-	}
-
-	public TimeoutException(String message, Throwable cause) {
-		super(message, cause);
-	}
-
-	public TimeoutException(Throwable cause) {
-		super(cause);
-	}
 }
Index: modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/Command.java
===================================================================
--- modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/Command.java	(revision 3552)
+++ modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/Command.java	(working copy)
@@ -39,8 +39,6 @@
 	private String errorMsg;
 	private Exception exception;
 	private int retries;
-	private long sendTime;
-	private long sendReqTime;
 
 	public Command() {
 		setId(NOID);
@@ -55,23 +53,10 @@
 		this.cb = cb;
 	}
 
-	public void waitForReply() throws TimeoutException {
+	public void waitForReply() throws InterruptedException {
 		synchronized (this) {
-			if (!this.isInDataReceived()) {
-				long left = getTimeout();
-				while (!this.isInDataReceived()) {
-					if (left <= 0) {
-						throw new TimeoutException();
-					}
-					try {
-						wait(left);
-					}
-					catch (InterruptedException e) {
-						e.printStackTrace();
-					}
-					left = sendTime == 0 ? 1000 : getTimeout()
-							- (System.currentTimeMillis() - sendTime);
-				}
+			while (!this.isInDataReceived()) {
+				wait(0);
 			}
 		}
 	}
@@ -139,8 +124,6 @@
 					channel.sendTaggedData(id, !i.hasNext(), buf, !i.hasNext() ? this : null);
 				}
 			}
-			sendReqTime = System.currentTimeMillis();
-			setLastTime(sendReqTime);
 		}
 		catch (ChannelIOException e) {
 			reexecute(e.getMessage(), e);
@@ -148,14 +131,11 @@
 	}
 
 	public void dataSent() {
-		sendTime = System.currentTimeMillis();
-		//when using the piped channels the reply will arrive before this method is called
-		setLastTime(sendTime);
 	}
 	
 	private static boolean shutdownMsg;
 
-	public byte[] execute(KarajanChannel channel) throws ProtocolException, IOException {
+	public byte[] execute(KarajanChannel channel) throws ProtocolException, IOException, InterruptedException {
 		send(channel);
 		waitForReply();
 		if (errorMsg != null) {
@@ -256,29 +236,7 @@
 			}
 		}
 	}
-	
-	public void handleTimeout() {
-		if (isInDataReceived()) {
-			return;
-		}
-		TimeoutException t = new TimeoutException(this, "Reply timeout");
-		logger.warn(t.getMessage());
-		errorReceived(t.getMessage(), t);
-		getChannel().unregisterCommand(this);
-	}
 
-	public long getSendReqTime() {
-		return sendReqTime;
-	}
-	
-	public long getSendTime() {
-	    return sendTime;
-	}
-	
-	protected void setSendReqTime(long sendReqTime) {
-		this.sendReqTime = sendReqTime;
-	}
-
 	public String toString() {
 		return "Command(tag: " + this.getId() + ", " + this.getOutCmd() + ")";
 	}
Index: modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/HeartBeatCommand.java
===================================================================
--- modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/HeartBeatCommand.java	(revision 3552)
+++ modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/HeartBeatCommand.java	(working copy)
@@ -11,8 +11,6 @@
 
 import org.apache.log4j.Logger;
 import org.globus.cog.karajan.workflow.service.ProtocolException;
-import org.globus.cog.karajan.workflow.service.TimeoutException;
-import org.globus.cog.karajan.workflow.service.channels.ChannelManager;
 import org.globus.cog.karajan.workflow.service.handlers.HeartBeatHandler;
 
 
@@ -42,13 +40,4 @@
             logger.info(getChannel() + " up latency: " + (now - rst) + "ms, rtt: " + (now - start) + "ms");
         }
 	}
-
-	@Override
-	public void handleTimeout() {
-		if (logger.isInfoEnabled()) {
-			logger.info("Heartbeat timed out. Closing channel.");
-		}
-		ChannelManager.getManager().handleChannelException(getChannel(), new TimeoutException(this, "Heartbeat timed out"));
-		unregister();
-	}
 }



More information about the Swift-commit mailing list