[Swift-devel] Fix to run multiple local coaster services within same Swift JVM?

Mihael Hategan hategan at mcs.anl.gov
Tue Nov 20 13:41:58 CST 2012


Yes. I updated the bug with the relevant info. See
https://bugzilla.mcs.anl.gov/swift/show_bug.cgi?id=869

Mihael

On Tue, 2012-11-20 at 12:36 -0600, Michael Wilde wrote:
> Mihael, does this commit address the problem I had running two local coaster services for two separate resource pools, within the same Swift JVM?
> 
> If so, I will re-test.
> 
> Thanks,
> 
> - <ike
> 
> ----- Original Message -----
> > From: swift at ci.uchicago.edu
> > To: swift-commit at ci.uchicago.edu
> > Sent: Sunday, November 18, 2012 5:05:28 PM
> > Subject: [Swift-commit] cog r3515
> > ------------------------------------------------------------------------
> > r3515 | hategan | 2012-11-18 17:01:24 -0600 (Sun, 18 Nov 2012) | 1
> > line
> > 
> > better channel names in logs; ensure unique ids for worker channels if
> > two or more services are on the same jvm
> > ------------------------------------------------------------------------
> > Index:
> > modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/RegistrationHandler.java
> > ===================================================================
> > ---
> > modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/RegistrationHandler.java
> > (revision 3514)
> > +++
> > modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/RegistrationHandler.java
> > (working copy)
> > @@ -48,7 +48,9 @@
> > options = Collections.emptyMap();
> > }
> > 
> > - logger.debug("registering: " + id + " " + url);
> > + if (logger.isDebugEnabled()) {
> > + logger.debug("registering: " + id + " " + url);
> > + }
> > 
> > KarajanChannel channel = getChannel();
> > ChannelContext context = channel.getChannelContext();
> > Index:
> > modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java
> > ===================================================================
> > ---
> > modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java
> > (revision 3514)
> > +++
> > modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java
> > (working copy)
> > @@ -18,9 +18,7 @@
> > 
> > import org.apache.log4j.Logger;
> > import org.globus.cog.abstraction.coaster.service.Registering;
> > -import org.globus.cog.abstraction.impl.common.AbstractionFactory;
> > import org.globus.cog.abstraction.impl.common.execution.JobException;
> > -import
> > org.globus.cog.abstraction.impl.common.task.ServiceContactImpl;
> > import
> > org.globus.cog.abstraction.impl.common.task.TaskSubmissionException;
> > import org.globus.cog.abstraction.interfaces.Service;
> > import org.globus.cog.abstraction.interfaces.Status;
> > @@ -71,7 +69,7 @@
> > logger.info("Got connection");
> > try {
> > ConnectionHandler handler =
> > - new ConnectionHandler(this, sock, LocalRequestManager.INSTANCE);
> > + new ConnectionHandler("service-" + sock.getPort(), this, sock,
> > LocalRequestManager.INSTANCE);
> > logger.info("Initialized connection handler");
> > handler.start();
> > logger.info("Connection handler started");
> > @@ -84,7 +82,7 @@
> > public void handleConnection(InputStream is, OutputStream os) {
> > try {
> > ConnectionHandler handler =
> > - new ConnectionHandler(this, is, os, LocalRequestManager.INSTANCE);
> > + new ConnectionHandler("service-pipe", this, is, os,
> > LocalRequestManager.INSTANCE);
> > handler.start();
> > }
> > catch (Exception e) {
> > @@ -93,7 +91,7 @@
> > }
> > 
> > public PipedServerChannel newPipedServerChannel() {
> > - return new PipedServerChannel(LocalRequestManager.INSTANCE, new
> > ChannelContext(this));
> > + return new PipedServerChannel(LocalRequestManager.INSTANCE, new
> > ChannelContext("spipe", this));
> > }
> > 
> > public String waitForRegistration(Task t, String id) throws
> > InterruptedException,
> > Index:
> > modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java
> > ===================================================================
> > ---
> > modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java
> > (revision 3514)
> > +++
> > modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java
> > (working copy)
> > @@ -128,7 +128,7 @@
> > }
> > else {
> > try {
> > - ConnectionHandler handler = new ConnectionHandler(this, sock,
> > + ConnectionHandler handler = new ConnectionHandler("cps-" +
> > sock.getPort(), this, sock,
> > COASTER_REQUEST_MANAGER);
> > handler.start();
> > }
> > @@ -194,7 +194,7 @@
> > PipedServerChannel psc =
> > ServiceManager.getDefault().getLocalService().newPipedServerChannel();
> > PipedClientChannel pcc =
> > - new PipedClientChannel(COASTER_REQUEST_MANAGER, new
> > ChannelContext(), psc);
> > + new PipedClientChannel(COASTER_REQUEST_MANAGER, new
> > ChannelContext("cpipe"), psc);
> > psc.setClientChannel(pcc);
> > ChannelManager.getManager().registerChannel(pcc.getChannelContext().getChannelID(),
> > pcc);
> > ChannelManager.getManager().registerChannel(psc.getChannelContext().getChannelID(),
> > psc);
> > Index:
> > modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java
> > ===================================================================
> > ---
> > modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java
> > (revision 3514)
> > +++
> > modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java
> > (working copy)
> > @@ -102,10 +102,13 @@
> > channel = ServerSocketChannel.open();
> > channel.configureBlocking(true);
> > if(port == 0) {
> > - PortRange portRange = PortRange.getTcpInstance();
> > - if(portRange != null && portRange.isEnabled()) {
> > - port = portRange.getFreePort(port);
> > - }
> > + PortRange portRange = PortRange.getTcpInstance();
> > + if(portRange != null && portRange.isEnabled()) {
> > + synchronized(portRange) {
> > + port = portRange.getFreePort(port);
> > + portRange.setUsed(port);
> > + }
> > + }
> > }
> > channel.socket().bind(new InetSocketAddress(port));
> > 
> > @@ -189,12 +192,18 @@
> > System.err.println("Irrecoverable channel exception: " +
> > e.getMessage());
> > System.exit(2);
> > }
> > +
> > + private static int idSeq = 1;
> > +
> > + private synchronized static int nextId() {
> > + return idSeq++;
> > + }
> > 
> > private static class WorkerConnectionHandler extends ConnectionHandler
> > {
> > public WorkerConnectionHandler(Service service, Socket socket,
> > RequestManager requestManager)
> > throws IOException {
> > super(socket, new WorkerChannel(socket, requestManager,
> > - new ChannelContext(service)), requestManager);
> > + new ChannelContext("worker-" + nextId(), service)), requestManager);
> > }
> > }
> > 
> > Index:
> > modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Node.java
> > ===================================================================
> > ---
> > modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Node.java
> > (revision 3514)
> > +++
> > modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Node.java
> > (working copy)
> > @@ -85,6 +85,7 @@
> > }
> > try {
> > KarajanChannel channel = getChannel();
> > + channel.setLocalShutdown();
> > ChannelManager.getManager().reserveLongTerm(channel);
> > ShutdownCommand cmd = new ShutdownCommand();
> > cmd.executeAsync(channel, this);
> > Index:
> > modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/BlockQueueProcessor.java
> > ===================================================================
> > ---
> > modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/BlockQueueProcessor.java
> > (revision 3514)
> > +++
> > modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/BlockQueueProcessor.java
> > (working copy)
> > @@ -98,6 +98,12 @@
> > */
> > private static final NumberFormat SECONDS_3 =
> > new DecimalFormat("0.000");
> > +
> > + private static int sid;
> > +
> > + private synchronized static int nextSid() {
> > + return sid++;
> > + }
> > 
> > public BlockQueueProcessor(Settings settings) {
> > super("Block Queue Processor");
> > @@ -105,7 +111,7 @@
> > holding = new SortedJobSet();
> > blocks = new TreeMap<String, Block>();
> > tl = new HashMap<Integer, List<Job>>();
> > - id = DDF.format(new Date());
> > + id = DDF.format(new Date()) + nextSid();
> > incoming = new ArrayList<Job>();
> > metric = new OverallocatedJobDurationMetric(settings);
> > queued = new SortedJobSet(metric);
> > Index:
> > modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java
> > ===================================================================
> > ---
> > modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java
> > (revision 3514)
> > +++
> > modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java
> > (working copy)
> > @@ -59,6 +59,10 @@
> > private long lastUsed;
> > 
> > private static int sid;
> > +
> > + private synchronized static int nextSID() {
> > + return sid++;
> > + }
> > 
> > private static final NumberFormat IDF = new DecimalFormat("000000");
> > 
> > @@ -72,7 +76,7 @@
> > }
> > 
> > public Block(int workers, TimeInterval walltime, BlockQueueProcessor
> > ap) {
> > - this(ap.getBQPId() + "-" + IDF.format(sid++), workers, walltime,
> > ap);
> > + this(ap.getBQPId() + "-" + IDF.format(nextSID()), workers, walltime,
> > ap);
> > }
> > 
> > public Block(String id, int workers, TimeInterval walltime,
> > BlockQueueProcessor ap) {
> > _______________________________________________
> > Swift-commit mailing list
> > Swift-commit at ci.uchicago.edu
> > https://lists.ci.uchicago.edu/cgi-bin/mailman/listinfo/swift-commit
> 





More information about the Swift-devel mailing list