[Swift-commit] Cog update

swift at ci.uchicago.edu swift at ci.uchicago.edu
Tue Apr 10 02:15:03 CDT 2012


------------------------------------------------------------------------
r3370 | hategan | 2012-04-10 02:14:54 -0500 (Tue, 10 Apr 2012) | 1 line

use NIO for the worker TCP connection
------------------------------------------------------------------------
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java	(revision 3369)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java	(working copy)
@@ -10,23 +10,27 @@
 package org.globus.cog.abstraction.coaster.service;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
 import java.util.Map;
 
 import org.apache.log4j.Logger;
 import org.globus.cog.karajan.workflow.service.ConnectionHandler;
-import org.globus.cog.karajan.workflow.service.GSSService;
 import org.globus.cog.karajan.workflow.service.RequestManager;
 import org.globus.cog.karajan.workflow.service.Service;
-import org.globus.cog.karajan.workflow.service.channels.AbstractKarajanChannel;
+import org.globus.cog.karajan.workflow.service.ServiceContext;
 import org.globus.cog.karajan.workflow.service.channels.ChannelContext;
 import org.globus.cog.karajan.workflow.service.channels.ChannelException;
 import org.globus.cog.karajan.workflow.service.channels.ChannelManager;
 import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
 import org.globus.cog.karajan.workflow.service.channels.TCPChannel;
 
-public class LocalTCPService extends GSSService implements Registering {
+public class LocalTCPService implements Registering, Service, Runnable {
     public static final Logger logger = Logger.getLogger(LocalTCPService.class);
 
     public static final int TCP_BUFSZ = 32768;
@@ -34,20 +38,32 @@
     private RegistrationManager registrationManager;
     private final TCPBufferManager buffMan;
 
-    // private int idseq;
+    private ServerSocketChannel channel;
+    private int port;
+    
+    private RequestManager requestManager;
+    private ServiceContext context = new ServiceContext(this);
+    
+    private final URI contact;
+    
+    private Thread serverThread;
 
-    // private static final NumberFormat IDF = new DecimalFormat("000000");
-
     public LocalTCPService(RequestManager rm) throws IOException {
-        super(false, 0);
-        setRequestManager(rm);
-        buffMan = new TCPBufferManager();
+        this(rm, 0);
     }
 
     public LocalTCPService(RequestManager rm, int port) throws IOException {
-        super(false, port);
         setRequestManager(rm);
         buffMan = new TCPBufferManager();
+        this.port = port;
+        URI lh = null;
+        try {
+            lh = new URI("localhost:" + port);
+        }
+        catch (URISyntaxException e) {
+            logger.error("Internal error", e);
+        }
+        this.contact = lh;
     }
 
     public String registrationReceived(String blockid, String url, 
@@ -76,8 +92,46 @@
     public void setRegistrationManager(RegistrationManager workerManager) {
         this.registrationManager = workerManager;
     }
+    
+    protected void setRequestManager(RequestManager requestManager) {
+        this.requestManager = requestManager;
+    }
+    
+    public ServiceContext getContext() {
+        return context;
+    }
+    
+    public void start() {
+        try {
+            channel = ServerSocketChannel.open();
+            channel.configureBlocking(true);
+            channel.socket().bind(new InetSocketAddress(port));
+            
+            if (serverThread == null) {
+                serverThread = new Thread(this);
+                serverThread.setDaemon(true);
+                serverThread.setName("Local TCP service");
+                serverThread.start();
+            }
+        }
+        catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+    
+    public void run() {
+        while(true) {
+            try {
+                SocketChannel c = channel.accept();
+                handleConnection(c.socket());
+            }
+            catch (IOException e) {
+                logger.warn("Accept() failed", e);
+                return;
+            }
+        }
+    }
 
-    @Override
     protected void handleConnection(Socket socket) {
         try {
             buffMan.addSocket(socket);
@@ -88,13 +142,35 @@
             e.printStackTrace();
         }
         try {
-            new WorkerConnectionHandler(this, buffMan.wrap(socket), getRequestManager()).start();
+            new WorkerConnectionHandler(this, buffMan.wrap(socket), requestManager).start();
         }
         catch (Exception e) {
             logger.warn("Could not start worker connection", e);
         }
     }
     
+    public URI getContact() {
+        return null;
+    }
+    
+    public int getPort() {
+        if (port == 0) {
+            return channel.socket().getLocalPort();
+        }
+        else {
+            return port;
+        }
+    }
+
+    public boolean isRestricted() {
+        return false;
+    }
+
+    public void irrecoverableChannelError(KarajanChannel channel, Exception e) {
+        System.err.println("Irrecoverable channel exception: " + e.getMessage());
+        System.exit(2);
+    }
+
     private static class WorkerConnectionHandler extends ConnectionHandler {
         public WorkerConnectionHandler(Service service, Socket socket, RequestManager requestManager)
                 throws IOException {
@@ -116,5 +192,5 @@
             // x > interval).
             return false;
         }
-    }
+    }    
 }



More information about the Swift-commit mailing list