[Swift-commit] Cog update
swift at ci.uchicago.edu
swift at ci.uchicago.edu
Tue Apr 10 02:15:03 CDT 2012
------------------------------------------------------------------------
r3370 | hategan | 2012-04-10 02:14:54 -0500 (Tue, 10 Apr 2012) | 1 line
use NIO for the worker TCP connection
------------------------------------------------------------------------
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java (revision 3369)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java (working copy)
@@ -10,23 +10,27 @@
package org.globus.cog.abstraction.coaster.service;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
import java.util.Map;
import org.apache.log4j.Logger;
import org.globus.cog.karajan.workflow.service.ConnectionHandler;
-import org.globus.cog.karajan.workflow.service.GSSService;
import org.globus.cog.karajan.workflow.service.RequestManager;
import org.globus.cog.karajan.workflow.service.Service;
-import org.globus.cog.karajan.workflow.service.channels.AbstractKarajanChannel;
+import org.globus.cog.karajan.workflow.service.ServiceContext;
import org.globus.cog.karajan.workflow.service.channels.ChannelContext;
import org.globus.cog.karajan.workflow.service.channels.ChannelException;
import org.globus.cog.karajan.workflow.service.channels.ChannelManager;
import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
import org.globus.cog.karajan.workflow.service.channels.TCPChannel;
-public class LocalTCPService extends GSSService implements Registering {
+public class LocalTCPService implements Registering, Service, Runnable {
public static final Logger logger = Logger.getLogger(LocalTCPService.class);
public static final int TCP_BUFSZ = 32768;
@@ -34,20 +38,32 @@
private RegistrationManager registrationManager;
private final TCPBufferManager buffMan;
- // private int idseq;
+ private ServerSocketChannel channel;
+ private int port;
+
+ private RequestManager requestManager;
+ private ServiceContext context = new ServiceContext(this);
+
+ private final URI contact;
+
+ private Thread serverThread;
- // private static final NumberFormat IDF = new DecimalFormat("000000");
-
public LocalTCPService(RequestManager rm) throws IOException {
- super(false, 0);
- setRequestManager(rm);
- buffMan = new TCPBufferManager();
+ this(rm, 0);
}
public LocalTCPService(RequestManager rm, int port) throws IOException {
- super(false, port);
setRequestManager(rm);
buffMan = new TCPBufferManager();
+ this.port = port;
+ URI lh = null;
+ try {
+ lh = new URI("localhost:" + port);
+ }
+ catch (URISyntaxException e) {
+ logger.error("Internal error", e);
+ }
+ this.contact = lh;
}
public String registrationReceived(String blockid, String url,
@@ -76,8 +92,46 @@
public void setRegistrationManager(RegistrationManager workerManager) {
this.registrationManager = workerManager;
}
+
+ protected void setRequestManager(RequestManager requestManager) {
+ this.requestManager = requestManager;
+ }
+
+ public ServiceContext getContext() {
+ return context;
+ }
+
+ public void start() {
+ try {
+ channel = ServerSocketChannel.open();
+ channel.configureBlocking(true);
+ channel.socket().bind(new InetSocketAddress(port));
+
+ if (serverThread == null) {
+ serverThread = new Thread(this);
+ serverThread.setDaemon(true);
+ serverThread.setName("Local TCP service");
+ serverThread.start();
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void run() {
+ while(true) {
+ try {
+ SocketChannel c = channel.accept();
+ handleConnection(c.socket());
+ }
+ catch (IOException e) {
+ logger.warn("Accept() failed", e);
+ return;
+ }
+ }
+ }
- @Override
protected void handleConnection(Socket socket) {
try {
buffMan.addSocket(socket);
@@ -88,13 +142,35 @@
e.printStackTrace();
}
try {
- new WorkerConnectionHandler(this, buffMan.wrap(socket), getRequestManager()).start();
+ new WorkerConnectionHandler(this, buffMan.wrap(socket), requestManager).start();
}
catch (Exception e) {
logger.warn("Could not start worker connection", e);
}
}
+ public URI getContact() {
+ return null;
+ }
+
+ public int getPort() {
+ if (port == 0) {
+ return channel.socket().getLocalPort();
+ }
+ else {
+ return port;
+ }
+ }
+
+ public boolean isRestricted() {
+ return false;
+ }
+
+ public void irrecoverableChannelError(KarajanChannel channel, Exception e) {
+ System.err.println("Irrecoverable channel exception: " + e.getMessage());
+ System.exit(2);
+ }
+
private static class WorkerConnectionHandler extends ConnectionHandler {
public WorkerConnectionHandler(Service service, Socket socket, RequestManager requestManager)
throws IOException {
@@ -116,5 +192,5 @@
// x > interval).
return false;
}
- }
+ }
}
More information about the Swift-commit
mailing list