[Swift-commit] cog r3553
swift at ci.uchicago.edu
swift at ci.uchicago.edu
Wed Jan 16 01:30:07 CST 2013
------------------------------------------------------------------------
r3553 | hategan | 2013-01-16 01:28:22 -0600 (Wed, 16 Jan 2013) | 1 line
removed command timeouts and revert to channel timeouts
------------------------------------------------------------------------
Index: modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestReply.java
===================================================================
--- modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestReply.java (revision 3552)
+++ modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestReply.java (working copy)
@@ -33,10 +33,7 @@
public abstract class RequestReply {
public static final Logger logger = Logger.getLogger(RequestReply.class);
-
- public static final int DEFAULT_TIMEOUT = 120 * 1000;
- private int timeout = DEFAULT_TIMEOUT;
-
+
public static final int NOID = -1;
private int id;
private String outCmd;
@@ -45,10 +42,7 @@
private List<byte[]> inData;
private boolean inDataReceived;
private KarajanChannel channel;
- private long lastTime = Long.MAX_VALUE;
-
- // private static final byte[] NO_EXCEPTION = new byte[0];
-
+
protected String getInCmd() {
return inCmd;
}
@@ -152,7 +146,6 @@
public abstract void send(boolean err) throws ProtocolException;
protected void dataReceived(boolean fin, boolean error, byte[] data) throws ProtocolException {
- setLastTime(System.currentTimeMillis());
}
protected synchronized void addInData(boolean fin, boolean err, byte[] data) {
@@ -405,28 +398,7 @@
protected Object getInObject(int index) {
return deserialize(getInData(index));
}
-
- public long getLastTime() {
- return lastTime;
- }
-
- public void setLastTime(long lastTime) {
- this.lastTime = lastTime;
- }
- public int getTimeout() {
- return timeout;
- }
-
- public void setTimeout(int timeout) {
- this.timeout = timeout;
- }
-
- public void handleTimeout() {
- logger.warn("Unhandled timeout", new Throwable());
- setLastTime(Long.MAX_VALUE);
- }
-
public void handleSignal(byte[] data) {
}
Index: modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelContext.java
===================================================================
--- modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelContext.java (revision 3552)
+++ modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelContext.java (working copy)
@@ -15,7 +15,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Timer;
import org.apache.log4j.Logger;
import org.globus.cog.karajan.workflow.events.EventBus;
@@ -42,8 +41,6 @@
private int cmdseq;
private TagTable<Command> activeSenders;
private TagTable<RequestHandler> activeReceivers;
- private TagTable<Long> ignoredRequests;
- private static Timer timer;
private ServiceContext serviceContext;
private int reconnectionAttempts;
private long lastHeartBeat;
@@ -224,7 +221,7 @@
public void notifyRegisteredCommandsAndHandlers(Exception e) {
if (logger.isInfoEnabled()) {
- logger.info("Notifying commands and handlers about exception");
+ logger.info("Notifying commands and handlers about exception", e);
}
notifyListeners(activeReceivers, e);
notifyListeners(activeSenders, e);
@@ -243,15 +240,6 @@
}
}
- public Timer getTimer() {
- synchronized (ChannelContext.class) {
- if (timer == null) {
- timer = new Timer();
- }
- return timer;
- }
- }
-
public void reexecute(final Command command) {
//TODO
}
@@ -334,37 +322,5 @@
}
}
}
-
- public synchronized void ignoreRequest(int tag, int timeout) {
- if (ignoredRequests == null) {
- ignoredRequests = new TagTable<Long>();
- }
- ignoredRequests.put(tag, System.currentTimeMillis() + timeout);
- }
-
- public synchronized boolean isIgnoredRequest(int tag) {
- if (ignoredRequests == null) {
- return false;
- }
- else {
- return ignoredRequests.containsKey(tag);
- }
- }
-
- public synchronized void removeOldIgnoredRequests() {
- if (ignoredRequests == null) {
- return;
- }
- long now = System.currentTimeMillis();
- for (Integer i : ignoredRequests.keys()) {
- if (ignoredRequests.get(i) < now) {
- ignoredRequests.remove(i);
- }
- }
- if (ignoredRequests.isEmpty()) {
- ignoredRequests = null;
- }
- }
-
}
Index: modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractPipedChannel.java
===================================================================
--- modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractPipedChannel.java (revision 3552)
+++ modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractPipedChannel.java (working copy)
@@ -46,6 +46,11 @@
// no heart beat for these
}
+ @Override
+ public void configureTimeoutChecks() {
+ // no timeouts either
+ }
+
public boolean isOffline() {
return false;
}
Index: modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/KarajanChannel.java
===================================================================
--- modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/KarajanChannel.java (revision 3552)
+++ modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/KarajanChannel.java (working copy)
@@ -99,8 +99,6 @@
void flush() throws IOException;
SelectableChannel getNIOChannel();
-
- void ignoreRequest(RequestHandler h, int timeout);
boolean handleChannelException(Exception e);
}
Index: modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/MetaChannel.java
===================================================================
--- modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/MetaChannel.java (revision 3552)
+++ modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/MetaChannel.java (working copy)
@@ -10,7 +10,6 @@
package org.globus.cog.karajan.workflow.service.channels;
import java.io.IOException;
-import java.util.Timer;
import java.util.TimerTask;
import org.apache.log4j.Logger;
@@ -39,6 +38,11 @@
public synchronized void sendTaggedData(int tag, int flags, byte[] data, SendCallback cb) {
current.sendTaggedData(tag, flags, data, cb);
}
+
+ @Override
+ public void configureTimeoutChecks() {
+ // only for the actual channels
+ }
public void registerCommand(Command cmd) throws ProtocolException {
current.registerCommand(cmd);
@@ -112,13 +116,9 @@
ChannelManager.getManager().unregisterChannel(MetaChannel.this);
}
};
- getTimer().schedule(deactivator, (long) seconds * 1000);
+ Timer.schedule(deactivator, (long) seconds * 1000);
}
- public Timer getTimer() {
- return getChannelContext().getTimer();
- }
-
public synchronized void poll(final int seconds) {
if (poller != null) {
return;
@@ -152,7 +152,7 @@
}
};
long interval = (long) seconds * 1000;
- getTimer().schedule(poller, interval, interval);
+ Timer.schedule(poller, interval, interval);
}
public String toString() {
Index: modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/Timer.java
===================================================================
--- modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/Timer.java (revision 3552)
+++ modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/Timer.java (working copy)
@@ -21,4 +21,16 @@
}
}, interval, interval);
}
+
+ public static void every(final long interval, final TimerTask task) {
+ TIMER.scheduleAtFixedRate(task, interval, interval);
+ }
+
+ public static void schedule(TimerTask task, long delay, long period) {
+ TIMER.schedule(task, delay, period);
+ }
+
+ public static void schedule(TimerTask task, long delay) {
+ TIMER.schedule(task, delay);
+ }
}
Index: modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractKarajanChannel.java
===================================================================
--- modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractKarajanChannel.java (revision 3552)
+++ modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractKarajanChannel.java (working copy)
@@ -15,7 +15,6 @@
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
-import java.util.Collection;
import java.util.TimerTask;
import org.apache.log4j.Logger;
@@ -24,8 +23,8 @@
import org.globus.cog.karajan.workflow.service.RemoteConfiguration;
import org.globus.cog.karajan.workflow.service.RemoteConfiguration.Entry;
import org.globus.cog.karajan.workflow.service.RequestManager;
-import org.globus.cog.karajan.workflow.service.RequestReply;
import org.globus.cog.karajan.workflow.service.Service;
+import org.globus.cog.karajan.workflow.service.TimeoutException;
import org.globus.cog.karajan.workflow.service.UserContext;
import org.globus.cog.karajan.workflow.service.commands.Command;
import org.globus.cog.karajan.workflow.service.handlers.RequestHandler;
@@ -38,7 +37,8 @@
public static final int DEFAULT_HBI_SPREAD = 10;
public static final int TIMEOUT_CHECK_INTERVAL = 1;
-
+ public static final int TIMEOUT = 120;
+
private ChannelContext context;
private volatile int usageCount, longTermUsageCount;
private RequestManager requestManager;
@@ -47,6 +47,7 @@
private String name;
private Service callbackService;
private final boolean client;
+ private long lastTime;
protected AbstractKarajanChannel(RequestManager requestManager, ChannelContext channelContext,
boolean client) {
@@ -58,6 +59,7 @@
this.client = client;
configureHeartBeat();
configureTimeoutChecks();
+ updateLastTime();
}
protected void configureHeartBeat() {
@@ -99,47 +101,45 @@
public void scheduleHeartbeats(int heartBeatInterval) {
TimerTask heartBeatTask;
heartBeatTask = new HeartBeatTask(this);
- context.getTimer().schedule(heartBeatTask,
- heartBeatInterval + (int) (Math.random() * DEFAULT_HBI_INITIAL_SPREAD * 1000),
- heartBeatInterval + (int) (Math.random() * DEFAULT_HBI_SPREAD * 1000));
+ Timer.schedule(heartBeatTask,
+ heartBeatInterval + (int) (Math.random() * DEFAULT_HBI_INITIAL_SPREAD * 1000),
+ heartBeatInterval + (int) (Math.random() * DEFAULT_HBI_SPREAD * 1000));
}
public void scheduleHeartbeatCheck(int heartBeatInterval) {
TimerTask heartBeatTask;
int mult = 2;
heartBeatTask = new HeartBeatCheckTask(this, heartBeatInterval, mult);
- context.getTimer().schedule(heartBeatTask, mult * heartBeatInterval,
- mult * heartBeatInterval);
+ Timer.every(mult * heartBeatInterval, heartBeatTask);
}
public void configureTimeoutChecks() {
- context.getTimer().schedule(new TimerTask() {
+ Timer.every(TIMEOUT_CHECK_INTERVAL * 1000, new TimerTask() {
public void run() {
checkTimeouts();
- }},
- TIMEOUT_CHECK_INTERVAL * 1000, TIMEOUT_CHECK_INTERVAL * 1000);
+ }}
+ );
}
protected void checkTimeouts() {
- checkTimeouts(context.getActiveCommands());
- checkTimeouts(context.getActiveHandlers());
- getChannelContext().removeOldIgnoredRequests();
+ long now = System.currentTimeMillis();
+ long lastTime = getLastTime();
+ if (now - lastTime > TIMEOUT * 1000) {
+ lastTime = Long.MAX_VALUE;
+ TimeoutException e = new TimeoutException(this, "Channel timed out", lastTime);
+ context.notifyRegisteredCommandsAndHandlers(e);
+ handleChannelException(e);
+ }
}
- private void checkTimeouts(Collection<? extends RequestReply> l) {
- long now = System.currentTimeMillis();
- for (RequestReply r : l) {
- if (now - r.getLastTime() > r.getTimeout()) {
- try {
- r.handleTimeout();
- }
- catch (Exception e) {
- logger.warn("Error handling timeout", e);
- }
- }
- }
+ protected synchronized void updateLastTime() {
+ lastTime = System.currentTimeMillis();
}
-
+
+ protected synchronized long getLastTime() {
+ return lastTime;
+ }
+
protected boolean clientControlsHeartbeats() {
return true;
}
@@ -386,6 +386,7 @@
flagIsSet(flags, FINAL_FLAG) + ", err = " + flagIsSet(flags, ERROR_FLAG)
+ ", datalen = " + len + ", data = " + ppByteBuf(data));
}
+ updateLastTime();
Command cmd = getChannelContext().getRegisteredCommand(tag);
if (cmd != null) {
try {
@@ -446,6 +447,7 @@
flagIsSet(flags, FINAL_FLAG) + ", err = " + flagIsSet(flags, ERROR_FLAG)
+ ", datalen = " + len + ", data = " + ppByteBuf(data));
}
+ updateLastTime();
RequestHandler handler = getChannelContext().getRegisteredHandler(tag);
boolean fin = finalFlagIsSet(flags);
boolean err = errorFlagIsSet(flags);
@@ -492,9 +494,7 @@
}
}
catch (NoSuchHandlerException e) {
- if (!getChannelContext().isIgnoredRequest(tag)) {
- logger.warn(getName() + "Could not handle request", e);
- }
+ logger.warn(getName() + "Could not handle request", e);
}
}
@@ -547,11 +547,6 @@
return null;
}
- @Override
- public void ignoreRequest(RequestHandler h, int timeout) {
- getChannelContext().ignoreRequest(h.getId(), timeout);
- }
-
public synchronized boolean handleChannelException(Exception e) {
logger.info("Channel config: " + getChannelContext().getConfiguration());
if (!ChannelManager.getManager().handleChannelException(this, e)) {
Index: modules/karajan/src/org/globus/cog/karajan/workflow/service/Client.java
===================================================================
--- modules/karajan/src/org/globus/cog/karajan/workflow/service/Client.java (revision 3552)
+++ modules/karajan/src/org/globus/cog/karajan/workflow/service/Client.java (working copy)
@@ -128,7 +128,7 @@
}
}
- public void execute(Command command) throws IOException, ProtocolException {
+ public void execute(Command command) throws IOException, ProtocolException, InterruptedException {
command.execute(channel);
}
Index: modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/RequestHandler.java
===================================================================
--- modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/RequestHandler.java (revision 3552)
+++ modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/RequestHandler.java (working copy)
@@ -15,7 +15,6 @@
import org.apache.log4j.Logger;
import org.globus.cog.karajan.workflow.service.ProtocolException;
import org.globus.cog.karajan.workflow.service.RequestReply;
-import org.globus.cog.karajan.workflow.service.TimeoutException;
import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
public abstract class RequestHandler extends RequestReply {
@@ -35,7 +34,6 @@
}
protected void sendReply() throws ProtocolException {
- setLastTime(System.currentTimeMillis());
send();
replySent = true;
}
@@ -118,14 +116,4 @@
public String toString() {
return "Handler(tag: " + getId() + ", " + getInCmd() + ")";
}
-
- public void handleTimeout() {
- if (isInDataReceived()) {
- return;
- }
- setLastTime(Long.MAX_VALUE);
- TimeoutException t = new TimeoutException(this, "Timed out receiving request");
- logger.info(t.getMessage());
- errorReceived(t.getMessage(), t);
- }
}
Index: modules/karajan/src/org/globus/cog/karajan/workflow/service/TimeoutException.java
===================================================================
--- modules/karajan/src/org/globus/cog/karajan/workflow/service/TimeoutException.java (revision 3552)
+++ modules/karajan/src/org/globus/cog/karajan/workflow/service/TimeoutException.java (working copy)
@@ -13,39 +13,18 @@
import java.text.SimpleDateFormat;
import java.util.Date;
-import org.globus.cog.karajan.workflow.service.commands.Command;
-import org.globus.cog.karajan.workflow.service.handlers.RequestHandler;
+import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
-public class TimeoutException extends ProtocolException {
- private static final long serialVersionUID = -6781619140427115780L;
-
+public class TimeoutException extends Exception {
public static final DateFormat DF = new SimpleDateFormat("yyMMdd-HHmmss.SSS");
- public TimeoutException(Command c, String msg) {
- super(c + " " + msg + ". sendReqTime="
- + DF.format(new Date(c.getSendReqTime())) + ", lastSendTime=" + DF.format(new Date(c.getSendTime()))
- + ", now=" + DF.format(new Date()) + ", channel=" + c.getChannel());
+ public TimeoutException(String msg) {
+ super(msg);
}
- public TimeoutException(RequestHandler h, String msg) {
- super(h + " " + msg + ". lastTime="
- + DF.format(new Date(h.getLastTime()))
- + ", now=" + DF.format(new Date()) + ", channel=" + h.getChannel());
+ public TimeoutException(KarajanChannel channel, String msg, long lastTime) {
+ super(msg + ". lastTime="
+ + DF.format(new Date(lastTime))
+ + ", now=" + DF.format(new Date()) + ", channel=" + channel);
}
-
- public TimeoutException() {
- super();
- }
-
- public TimeoutException(String message) {
- super(message);
- }
-
- public TimeoutException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public TimeoutException(Throwable cause) {
- super(cause);
- }
}
Index: modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/Command.java
===================================================================
--- modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/Command.java (revision 3552)
+++ modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/Command.java (working copy)
@@ -39,8 +39,6 @@
private String errorMsg;
private Exception exception;
private int retries;
- private long sendTime;
- private long sendReqTime;
public Command() {
setId(NOID);
@@ -55,23 +53,10 @@
this.cb = cb;
}
- public void waitForReply() throws TimeoutException {
+ public void waitForReply() throws InterruptedException {
synchronized (this) {
- if (!this.isInDataReceived()) {
- long left = getTimeout();
- while (!this.isInDataReceived()) {
- if (left <= 0) {
- throw new TimeoutException();
- }
- try {
- wait(left);
- }
- catch (InterruptedException e) {
- e.printStackTrace();
- }
- left = sendTime == 0 ? 1000 : getTimeout()
- - (System.currentTimeMillis() - sendTime);
- }
+ while (!this.isInDataReceived()) {
+ wait(0);
}
}
}
@@ -139,8 +124,6 @@
channel.sendTaggedData(id, !i.hasNext(), buf, !i.hasNext() ? this : null);
}
}
- sendReqTime = System.currentTimeMillis();
- setLastTime(sendReqTime);
}
catch (ChannelIOException e) {
reexecute(e.getMessage(), e);
@@ -148,14 +131,11 @@
}
public void dataSent() {
- sendTime = System.currentTimeMillis();
- //when using the piped channels the reply will arrive before this method is called
- setLastTime(sendTime);
}
private static boolean shutdownMsg;
- public byte[] execute(KarajanChannel channel) throws ProtocolException, IOException {
+ public byte[] execute(KarajanChannel channel) throws ProtocolException, IOException, InterruptedException {
send(channel);
waitForReply();
if (errorMsg != null) {
@@ -256,29 +236,7 @@
}
}
}
-
- public void handleTimeout() {
- if (isInDataReceived()) {
- return;
- }
- TimeoutException t = new TimeoutException(this, "Reply timeout");
- logger.warn(t.getMessage());
- errorReceived(t.getMessage(), t);
- getChannel().unregisterCommand(this);
- }
- public long getSendReqTime() {
- return sendReqTime;
- }
-
- public long getSendTime() {
- return sendTime;
- }
-
- protected void setSendReqTime(long sendReqTime) {
- this.sendReqTime = sendReqTime;
- }
-
public String toString() {
return "Command(tag: " + this.getId() + ", " + this.getOutCmd() + ")";
}
Index: modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/HeartBeatCommand.java
===================================================================
--- modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/HeartBeatCommand.java (revision 3552)
+++ modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/HeartBeatCommand.java (working copy)
@@ -11,8 +11,6 @@
import org.apache.log4j.Logger;
import org.globus.cog.karajan.workflow.service.ProtocolException;
-import org.globus.cog.karajan.workflow.service.TimeoutException;
-import org.globus.cog.karajan.workflow.service.channels.ChannelManager;
import org.globus.cog.karajan.workflow.service.handlers.HeartBeatHandler;
@@ -42,13 +40,4 @@
logger.info(getChannel() + " up latency: " + (now - rst) + "ms, rtt: " + (now - start) + "ms");
}
}
-
- @Override
- public void handleTimeout() {
- if (logger.isInfoEnabled()) {
- logger.info("Heartbeat timed out. Closing channel.");
- }
- ChannelManager.getManager().handleChannelException(getChannel(), new TimeoutException(this, "Heartbeat timed out"));
- unregister();
- }
}
More information about the Swift-commit
mailing list