[Swift-devel] Fix to run multiple local coaster services within same Swift JVM?
Michael Wilde
wilde at mcs.anl.gov
Tue Nov 20 12:36:59 CST 2012
Mihael, does this commit address the problem I had running two local coaster services for two separate resource pools, within the same Swift JVM?
If so, I will re-test.
Thanks,
- Mike
----- Original Message -----
> From: swift at ci.uchicago.edu
> To: swift-commit at ci.uchicago.edu
> Sent: Sunday, November 18, 2012 5:05:28 PM
> Subject: [Swift-commit] cog r3515
> ------------------------------------------------------------------------
> r3515 | hategan | 2012-11-18 17:01:24 -0600 (Sun, 18 Nov 2012) | 1
> line
>
> better channel names in logs; ensure unique ids for worker channels if
> two or more services are on the same jvm
> ------------------------------------------------------------------------
> Index:
> modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/RegistrationHandler.java
> ===================================================================
> ---
> modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/RegistrationHandler.java
> (revision 3514)
> +++
> modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/RegistrationHandler.java
> (working copy)
> @@ -48,7 +48,9 @@
> options = Collections.emptyMap();
> }
>
> - logger.debug("registering: " + id + " " + url);
> + if (logger.isDebugEnabled()) {
> + logger.debug("registering: " + id + " " + url);
> + }
>
> KarajanChannel channel = getChannel();
> ChannelContext context = channel.getChannelContext();
> Index:
> modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java
> ===================================================================
> ---
> modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java
> (revision 3514)
> +++
> modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java
> (working copy)
> @@ -18,9 +18,7 @@
>
> import org.apache.log4j.Logger;
> import org.globus.cog.abstraction.coaster.service.Registering;
> -import org.globus.cog.abstraction.impl.common.AbstractionFactory;
> import org.globus.cog.abstraction.impl.common.execution.JobException;
> -import
> org.globus.cog.abstraction.impl.common.task.ServiceContactImpl;
> import
> org.globus.cog.abstraction.impl.common.task.TaskSubmissionException;
> import org.globus.cog.abstraction.interfaces.Service;
> import org.globus.cog.abstraction.interfaces.Status;
> @@ -71,7 +69,7 @@
> logger.info("Got connection");
> try {
> ConnectionHandler handler =
> - new ConnectionHandler(this, sock, LocalRequestManager.INSTANCE);
> + new ConnectionHandler("service-" + sock.getPort(), this, sock,
> LocalRequestManager.INSTANCE);
> logger.info("Initialized connection handler");
> handler.start();
> logger.info("Connection handler started");
> @@ -84,7 +82,7 @@
> public void handleConnection(InputStream is, OutputStream os) {
> try {
> ConnectionHandler handler =
> - new ConnectionHandler(this, is, os, LocalRequestManager.INSTANCE);
> + new ConnectionHandler("service-pipe", this, is, os,
> LocalRequestManager.INSTANCE);
> handler.start();
> }
> catch (Exception e) {
> @@ -93,7 +91,7 @@
> }
>
> public PipedServerChannel newPipedServerChannel() {
> - return new PipedServerChannel(LocalRequestManager.INSTANCE, new
> ChannelContext(this));
> + return new PipedServerChannel(LocalRequestManager.INSTANCE, new
> ChannelContext("spipe", this));
> }
>
> public String waitForRegistration(Task t, String id) throws
> InterruptedException,
> Index:
> modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java
> ===================================================================
> ---
> modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java
> (revision 3514)
> +++
> modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java
> (working copy)
> @@ -128,7 +128,7 @@
> }
> else {
> try {
> - ConnectionHandler handler = new ConnectionHandler(this, sock,
> + ConnectionHandler handler = new ConnectionHandler("cps-" +
> sock.getPort(), this, sock,
> COASTER_REQUEST_MANAGER);
> handler.start();
> }
> @@ -194,7 +194,7 @@
> PipedServerChannel psc =
> ServiceManager.getDefault().getLocalService().newPipedServerChannel();
> PipedClientChannel pcc =
> - new PipedClientChannel(COASTER_REQUEST_MANAGER, new
> ChannelContext(), psc);
> + new PipedClientChannel(COASTER_REQUEST_MANAGER, new
> ChannelContext("cpipe"), psc);
> psc.setClientChannel(pcc);
> ChannelManager.getManager().registerChannel(pcc.getChannelContext().getChannelID(),
> pcc);
> ChannelManager.getManager().registerChannel(psc.getChannelContext().getChannelID(),
> psc);
> Index:
> modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java
> ===================================================================
> ---
> modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java
> (revision 3514)
> +++
> modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java
> (working copy)
> @@ -102,10 +102,13 @@
> channel = ServerSocketChannel.open();
> channel.configureBlocking(true);
> if(port == 0) {
> - PortRange portRange = PortRange.getTcpInstance();
> - if(portRange != null && portRange.isEnabled()) {
> - port = portRange.getFreePort(port);
> - }
> + PortRange portRange = PortRange.getTcpInstance();
> + if(portRange != null && portRange.isEnabled()) {
> + synchronized(portRange) {
> + port = portRange.getFreePort(port);
> + portRange.setUsed(port);
> + }
> + }
> }
> channel.socket().bind(new InetSocketAddress(port));
>
> @@ -189,12 +192,18 @@
> System.err.println("Irrecoverable channel exception: " +
> e.getMessage());
> System.exit(2);
> }
> +
> + private static int idSeq = 1;
> +
> + private synchronized static int nextId() {
> + return idSeq++;
> + }
>
> private static class WorkerConnectionHandler extends ConnectionHandler
> {
> public WorkerConnectionHandler(Service service, Socket socket,
> RequestManager requestManager)
> throws IOException {
> super(socket, new WorkerChannel(socket, requestManager,
> - new ChannelContext(service)), requestManager);
> + new ChannelContext("worker-" + nextId(), service)), requestManager);
> }
> }
>
> Index:
> modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Node.java
> ===================================================================
> ---
> modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Node.java
> (revision 3514)
> +++
> modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Node.java
> (working copy)
> @@ -85,6 +85,7 @@
> }
> try {
> KarajanChannel channel = getChannel();
> + channel.setLocalShutdown();
> ChannelManager.getManager().reserveLongTerm(channel);
> ShutdownCommand cmd = new ShutdownCommand();
> cmd.executeAsync(channel, this);
> Index:
> modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/BlockQueueProcessor.java
> ===================================================================
> ---
> modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/BlockQueueProcessor.java
> (revision 3514)
> +++
> modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/BlockQueueProcessor.java
> (working copy)
> @@ -98,6 +98,12 @@
> */
> private static final NumberFormat SECONDS_3 =
> new DecimalFormat("0.000");
> +
> + private static int sid;
> +
> + private synchronized static int nextSid() {
> + return sid++;
> + }
>
> public BlockQueueProcessor(Settings settings) {
> super("Block Queue Processor");
> @@ -105,7 +111,7 @@
> holding = new SortedJobSet();
> blocks = new TreeMap<String, Block>();
> tl = new HashMap<Integer, List<Job>>();
> - id = DDF.format(new Date());
> + id = DDF.format(new Date()) + nextSid();
> incoming = new ArrayList<Job>();
> metric = new OverallocatedJobDurationMetric(settings);
> queued = new SortedJobSet(metric);
> Index:
> modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java
> ===================================================================
> ---
> modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java
> (revision 3514)
> +++
> modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java
> (working copy)
> @@ -59,6 +59,10 @@
> private long lastUsed;
>
> private static int sid;
> +
> + private synchronized static int nextSID() {
> + return sid++;
> + }
>
> private static final NumberFormat IDF = new DecimalFormat("000000");
>
> @@ -72,7 +76,7 @@
> }
>
> public Block(int workers, TimeInterval walltime, BlockQueueProcessor
> ap) {
> - this(ap.getBQPId() + "-" + IDF.format(sid++), workers, walltime,
> ap);
> + this(ap.getBQPId() + "-" + IDF.format(nextSID()), workers, walltime,
> ap);
> }
>
> public Block(String id, int workers, TimeInterval walltime,
> BlockQueueProcessor ap) {
> _______________________________________________
> Swift-commit mailing list
> Swift-commit at ci.uchicago.edu
> https://lists.ci.uchicago.edu/cgi-bin/mailman/listinfo/swift-commit
--
Michael Wilde
Computation Institute, University of Chicago
Mathematics and Computer Science Division
Argonne National Laboratory
More information about the Swift-devel
mailing list