[Swift-commit] cog r3515
swift at ci.uchicago.edu
swift at ci.uchicago.edu
Sun Nov 18 17:05:28 CST 2012
------------------------------------------------------------------------
r3515 | hategan | 2012-11-18 17:01:24 -0600 (Sun, 18 Nov 2012) | 1 line
Better channel names in logs; ensure unique IDs for worker channels if two or more services are on the same JVM
------------------------------------------------------------------------
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/RegistrationHandler.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/RegistrationHandler.java (revision 3514)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/RegistrationHandler.java (working copy)
@@ -48,7 +48,9 @@
options = Collections.emptyMap();
}
- logger.debug("registering: " + id + " " + url);
+ if (logger.isDebugEnabled()) {
+ logger.debug("registering: " + id + " " + url);
+ }
KarajanChannel channel = getChannel();
ChannelContext context = channel.getChannelContext();
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java (revision 3514)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java (working copy)
@@ -18,9 +18,7 @@
import org.apache.log4j.Logger;
import org.globus.cog.abstraction.coaster.service.Registering;
-import org.globus.cog.abstraction.impl.common.AbstractionFactory;
import org.globus.cog.abstraction.impl.common.execution.JobException;
-import org.globus.cog.abstraction.impl.common.task.ServiceContactImpl;
import org.globus.cog.abstraction.impl.common.task.TaskSubmissionException;
import org.globus.cog.abstraction.interfaces.Service;
import org.globus.cog.abstraction.interfaces.Status;
@@ -71,7 +69,7 @@
logger.info("Got connection");
try {
ConnectionHandler handler =
- new ConnectionHandler(this, sock, LocalRequestManager.INSTANCE);
+ new ConnectionHandler("service-" + sock.getPort(), this, sock, LocalRequestManager.INSTANCE);
logger.info("Initialized connection handler");
handler.start();
logger.info("Connection handler started");
@@ -84,7 +82,7 @@
public void handleConnection(InputStream is, OutputStream os) {
try {
ConnectionHandler handler =
- new ConnectionHandler(this, is, os, LocalRequestManager.INSTANCE);
+ new ConnectionHandler("service-pipe", this, is, os, LocalRequestManager.INSTANCE);
handler.start();
}
catch (Exception e) {
@@ -93,7 +91,7 @@
}
public PipedServerChannel newPipedServerChannel() {
- return new PipedServerChannel(LocalRequestManager.INSTANCE, new ChannelContext(this));
+ return new PipedServerChannel(LocalRequestManager.INSTANCE, new ChannelContext("spipe", this));
}
public String waitForRegistration(Task t, String id) throws InterruptedException,
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java (revision 3514)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java (working copy)
@@ -128,7 +128,7 @@
}
else {
try {
- ConnectionHandler handler = new ConnectionHandler(this, sock,
+ ConnectionHandler handler = new ConnectionHandler("cps-" + sock.getPort(), this, sock,
COASTER_REQUEST_MANAGER);
handler.start();
}
@@ -194,7 +194,7 @@
PipedServerChannel psc =
ServiceManager.getDefault().getLocalService().newPipedServerChannel();
PipedClientChannel pcc =
- new PipedClientChannel(COASTER_REQUEST_MANAGER, new ChannelContext(), psc);
+ new PipedClientChannel(COASTER_REQUEST_MANAGER, new ChannelContext("cpipe"), psc);
psc.setClientChannel(pcc);
ChannelManager.getManager().registerChannel(pcc.getChannelContext().getChannelID(), pcc);
ChannelManager.getManager().registerChannel(psc.getChannelContext().getChannelID(), psc);
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java (revision 3514)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java (working copy)
@@ -102,10 +102,13 @@
channel = ServerSocketChannel.open();
channel.configureBlocking(true);
if(port == 0) {
- PortRange portRange = PortRange.getTcpInstance();
- if(portRange != null && portRange.isEnabled()) {
- port = portRange.getFreePort(port);
- }
+ PortRange portRange = PortRange.getTcpInstance();
+ if(portRange != null && portRange.isEnabled()) {
+ synchronized(portRange) {
+ port = portRange.getFreePort(port);
+ portRange.setUsed(port);
+ }
+ }
}
channel.socket().bind(new InetSocketAddress(port));
@@ -189,12 +192,18 @@
System.err.println("Irrecoverable channel exception: " + e.getMessage());
System.exit(2);
}
+
+ private static int idSeq = 1;
+
+ private synchronized static int nextId() {
+ return idSeq++;
+ }
private static class WorkerConnectionHandler extends ConnectionHandler {
public WorkerConnectionHandler(Service service, Socket socket, RequestManager requestManager)
throws IOException {
super(socket, new WorkerChannel(socket, requestManager,
- new ChannelContext(service)), requestManager);
+ new ChannelContext("worker-" + nextId(), service)), requestManager);
}
}
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Node.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Node.java (revision 3514)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Node.java (working copy)
@@ -85,6 +85,7 @@
}
try {
KarajanChannel channel = getChannel();
+ channel.setLocalShutdown();
ChannelManager.getManager().reserveLongTerm(channel);
ShutdownCommand cmd = new ShutdownCommand();
cmd.executeAsync(channel, this);
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/BlockQueueProcessor.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/BlockQueueProcessor.java (revision 3514)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/BlockQueueProcessor.java (working copy)
@@ -98,6 +98,12 @@
*/
private static final NumberFormat SECONDS_3 =
new DecimalFormat("0.000");
+
+ private static int sid;
+
+ private synchronized static int nextSid() {
+ return sid++;
+ }
public BlockQueueProcessor(Settings settings) {
super("Block Queue Processor");
@@ -105,7 +111,7 @@
holding = new SortedJobSet();
blocks = new TreeMap<String, Block>();
tl = new HashMap<Integer, List<Job>>();
- id = DDF.format(new Date());
+ id = DDF.format(new Date()) + nextSid();
incoming = new ArrayList<Job>();
metric = new OverallocatedJobDurationMetric(settings);
queued = new SortedJobSet(metric);
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java (revision 3514)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java (working copy)
@@ -59,6 +59,10 @@
private long lastUsed;
private static int sid;
+
+ private synchronized static int nextSID() {
+ return sid++;
+ }
private static final NumberFormat IDF = new DecimalFormat("000000");
@@ -72,7 +76,7 @@
}
public Block(int workers, TimeInterval walltime, BlockQueueProcessor ap) {
- this(ap.getBQPId() + "-" + IDF.format(sid++), workers, walltime, ap);
+ this(ap.getBQPId() + "-" + IDF.format(nextSID()), workers, walltime, ap);
}
public Block(String id, int workers, TimeInterval walltime, BlockQueueProcessor ap) {
More information about the Swift-commit mailing list