[Swift-commit] cog r3515

swift at ci.uchicago.edu swift at ci.uchicago.edu
Sun Nov 18 17:05:28 CST 2012


------------------------------------------------------------------------
r3515 | hategan | 2012-11-18 17:01:24 -0600 (Sun, 18 Nov 2012) | 1 line

better channel names in logs; ensure unique ids for worker channels if two or more services are on the same jvm
------------------------------------------------------------------------
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/RegistrationHandler.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/RegistrationHandler.java	(revision 3514)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/RegistrationHandler.java	(working copy)
@@ -48,7 +48,9 @@
         	options = Collections.emptyMap();
         }
 
-        logger.debug("registering: " + id + " " + url);
+        if (logger.isDebugEnabled()) {
+            logger.debug("registering: " + id + " " + url);
+        }
 
         KarajanChannel channel = getChannel();
         ChannelContext context = channel.getChannelContext();
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java	(revision 3514)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java	(working copy)
@@ -18,9 +18,7 @@
 
 import org.apache.log4j.Logger;
 import org.globus.cog.abstraction.coaster.service.Registering;
-import org.globus.cog.abstraction.impl.common.AbstractionFactory;
 import org.globus.cog.abstraction.impl.common.execution.JobException;
-import org.globus.cog.abstraction.impl.common.task.ServiceContactImpl;
 import org.globus.cog.abstraction.impl.common.task.TaskSubmissionException;
 import org.globus.cog.abstraction.interfaces.Service;
 import org.globus.cog.abstraction.interfaces.Status;
@@ -71,7 +69,7 @@
         logger.info("Got connection");
         try {
             ConnectionHandler handler =
-                    new ConnectionHandler(this, sock, LocalRequestManager.INSTANCE);
+                    new ConnectionHandler("service-" + sock.getPort(), this, sock, LocalRequestManager.INSTANCE);
             logger.info("Initialized connection handler");
             handler.start();
             logger.info("Connection handler started");
@@ -84,7 +82,7 @@
     public void handleConnection(InputStream is, OutputStream os) {
         try {
             ConnectionHandler handler =
-                    new ConnectionHandler(this, is, os, LocalRequestManager.INSTANCE);
+                    new ConnectionHandler("service-pipe", this, is, os, LocalRequestManager.INSTANCE);
             handler.start();
         }
         catch (Exception e) {
@@ -93,7 +91,7 @@
     }
     
     public PipedServerChannel newPipedServerChannel() {
-        return new PipedServerChannel(LocalRequestManager.INSTANCE, new ChannelContext(this));
+        return new PipedServerChannel(LocalRequestManager.INSTANCE, new ChannelContext("spipe", this));
     }
 
     public String waitForRegistration(Task t, String id) throws InterruptedException,
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java	(revision 3514)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java	(working copy)
@@ -128,7 +128,7 @@
         }
         else {
             try {
-                ConnectionHandler handler = new ConnectionHandler(this, sock,
+                ConnectionHandler handler = new ConnectionHandler("cps-" + sock.getPort(), this, sock,
                         COASTER_REQUEST_MANAGER);
                 handler.start();
             }
@@ -194,7 +194,7 @@
         PipedServerChannel psc =
                 ServiceManager.getDefault().getLocalService().newPipedServerChannel();
         PipedClientChannel pcc =
-                new PipedClientChannel(COASTER_REQUEST_MANAGER, new ChannelContext(), psc);
+                new PipedClientChannel(COASTER_REQUEST_MANAGER, new ChannelContext("cpipe"), psc);
         psc.setClientChannel(pcc);
         ChannelManager.getManager().registerChannel(pcc.getChannelContext().getChannelID(), pcc);
         ChannelManager.getManager().registerChannel(psc.getChannelContext().getChannelID(), psc);
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java	(revision 3514)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java	(working copy)
@@ -102,10 +102,13 @@
             channel = ServerSocketChannel.open();
             channel.configureBlocking(true);
             if(port == 0) {
-              PortRange portRange = PortRange.getTcpInstance();
-              if(portRange != null && portRange.isEnabled()) {
-                  port = portRange.getFreePort(port);
-              }
+                PortRange portRange = PortRange.getTcpInstance();
+                if(portRange != null && portRange.isEnabled()) {
+                    synchronized(portRange) {
+                        port = portRange.getFreePort(port);
+                        portRange.setUsed(port);
+                    }
+                }
             }
             channel.socket().bind(new InetSocketAddress(port));
             
@@ -189,12 +192,18 @@
         System.err.println("Irrecoverable channel exception: " + e.getMessage());
         System.exit(2);
     }
+    
+    private static int idSeq = 1;
+    
+    private synchronized static int nextId() {
+        return idSeq++;
+    }
 
     private static class WorkerConnectionHandler extends ConnectionHandler {
         public WorkerConnectionHandler(Service service, Socket socket, RequestManager requestManager)
                 throws IOException {
             super(socket, new WorkerChannel(socket, requestManager, 
-                new ChannelContext(service)), requestManager);
+                new ChannelContext("worker-" + nextId(), service)), requestManager);
         }
     }
     
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Node.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Node.java	(revision 3514)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Node.java	(working copy)
@@ -85,6 +85,7 @@
         }
         try {
             KarajanChannel channel = getChannel();
+            channel.setLocalShutdown();
             ChannelManager.getManager().reserveLongTerm(channel);
             ShutdownCommand cmd = new ShutdownCommand();
             cmd.executeAsync(channel, this);
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/BlockQueueProcessor.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/BlockQueueProcessor.java	(revision 3514)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/BlockQueueProcessor.java	(working copy)
@@ -98,6 +98,12 @@
     */
     private static final NumberFormat SECONDS_3 =
     	new DecimalFormat("0.000");
+    
+    private static int sid;
+    
+    private synchronized static int nextSid() {
+    	return sid++;
+    }
 
     public BlockQueueProcessor(Settings settings) {
         super("Block Queue Processor");
@@ -105,7 +111,7 @@
         holding = new SortedJobSet();
         blocks = new TreeMap<String, Block>();
         tl = new HashMap<Integer, List<Job>>();
-        id = DDF.format(new Date());
+        id = DDF.format(new Date()) + nextSid();
         incoming = new ArrayList<Job>();
         metric = new OverallocatedJobDurationMetric(settings);
         queued = new SortedJobSet(metric);
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java	(revision 3514)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java	(working copy)
@@ -59,6 +59,10 @@
     private long lastUsed;
 
     private static int sid;
+    
+    private synchronized static int nextSID() {
+    	return sid++;
+    }
 
     private static final NumberFormat IDF = new DecimalFormat("000000");
     
@@ -72,7 +76,7 @@
     }
 
     public Block(int workers, TimeInterval walltime, BlockQueueProcessor ap) {
-        this(ap.getBQPId() + "-" + IDF.format(sid++), workers, walltime, ap);
+        this(ap.getBQPId() + "-" + IDF.format(nextSID()), workers, walltime, ap);
     }
 
     public Block(String id, int workers, TimeInterval walltime, BlockQueueProcessor ap) {



More information about the Swift-commit mailing list