[Swift-commit] cog r3843
swift at ci.uchicago.edu
swift at ci.uchicago.edu
Mon Nov 25 17:05:03 CST 2013
------------------------------------------------------------------------
r3843 | hategan | 2013-11-25 17:02:59 -0600 (Mon, 25 Nov 2013) | 1 line
added a rudimentary client and the info command
------------------------------------------------------------------------
Index: modules/provider-coaster/launchers.xml
===================================================================
--- modules/provider-coaster/launchers.xml (revision 3842)
+++ modules/provider-coaster/launchers.xml (working copy)
@@ -2,9 +2,12 @@
<target name="create">
<ant antfile="${main.buildfile}" target="launcher">
<property name="launcher-name" value="coaster-service"/>
- <property name="class-name"
- value="org.globus.cog.abstraction.coaster.service.CoasterPersistentService"/>
+ <property name="class-name" value="org.globus.cog.abstraction.coaster.service.CoasterPersistentService"/>
</ant>
+ <ant antfile="${main.buildfile}" target="launcher">
+ <property name="launcher-name" value="coaster-client"/>
+ <property name="class-name" value="org.globus.cog.abstraction.coaster.client.CoasterClient"/>
+ </ant>
</target>
<target name="webstart">
</target>
Index: modules/provider-coaster/src/org/globus/cog/coaster/handlers/InfoHandler.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/coaster/handlers/InfoHandler.java (revision 0)
+++ modules/provider-coaster/src/org/globus/cog/coaster/handlers/InfoHandler.java (revision 3843)
@@ -0,0 +1,185 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 21, 2005
+ */
+package org.globus.cog.coaster.handlers;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.Map;
+
+import org.globus.cog.abstraction.coaster.service.CoasterService;
+import org.globus.cog.abstraction.coaster.service.job.manager.Block;
+import org.globus.cog.abstraction.coaster.service.job.manager.BlockQueueProcessor;
+import org.globus.cog.abstraction.coaster.service.job.manager.Cpu;
+import org.globus.cog.abstraction.coaster.service.job.manager.Job;
+import org.globus.cog.abstraction.coaster.service.job.manager.Node;
+import org.globus.cog.abstraction.coaster.service.job.manager.Time;
+import org.globus.cog.abstraction.coaster.service.job.manager.TimeInterval;
+import org.globus.cog.abstraction.impl.common.execution.WallTime;
+import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.coaster.ProtocolException;
+
+
+/**
+ * Handles the INFO request: replies with a plain-text table describing the
+ * workers, the running jobs or the blocks of the service's block queue
+ * processor.
+ * <p>
+ * Input chunk 0 selects the table ("workers", "jobs" or "blocks"); chunk 1
+ * carries options, which are read but not interpreted yet. The reply is a
+ * header line followed by one line per entry. Any other type is answered
+ * with an error.
+ */
+public class InfoHandler extends RequestHandler {
+
+    // SimpleDateFormat is not thread-safe and this instance is shared by all
+    // handler instances, so it must only be used while holding its monitor
+    private static final DateFormat DF = new SimpleDateFormat("MM/dd/yy HH:mm:ss");
+
+    /**
+     * Builds the table selected by input chunk 0 and sends it as the reply.
+     */
+    public void requestComplete() throws ProtocolException {
+        String type = getInDataAsString(0);
+        // options are part of the request but no table interprets them yet
+        String opts = getInDataAsString(1);
+
+        BlockQueueProcessor bqp = (BlockQueueProcessor) ((CoasterService) getChannel().getChannelContext().
+            getService()).getJobQueue().getCoasterQueueProcessor();
+
+        if (type.equals("workers")) {
+            Map<String, Block> blocks = bqp.getBlocks();
+            addOutData(" ID Cores Running");
+            for (Block b : blocks.values()) {
+                for (Node n : b.getNodes()) {
+                    addOutData(formatWorkerStatusLine(n));
+                }
+            }
+            sendReply();
+        }
+        else if (type.equals("jobs")) {
+            Map<String, Block> blocks = bqp.getBlocks();
+            addOutData(" ID Worker Executable Start Time Walltime");
+            for (Block b : blocks.values()) {
+                for (Node n : b.getNodes()) {
+                    for (Cpu c : n.getCpus()) {
+                        Job running = c.getRunning();
+                        // skip CPUs without a running job
+                        if (running != null) {
+                            addOutData(formatJobStatusLine(running, n));
+                        }
+                    }
+                }
+            }
+            sendReply();
+        }
+        else if (type.equals("blocks")) {
+            Map<String, Block> blocks = bqp.getBlocks();
+            addOutData(" ID State Workers Start Time Walltime");
+            for (Block b : blocks.values()) {
+                addOutData(formatBlockStatusLine(b));
+            }
+            sendReply();
+        }
+        else {
+            sendError("Unknown type: '" + type + "'");
+        }
+    }
+
+    /**
+     * Formats one "jobs" row: task identity, worker (block id:node id),
+     * executable, start time and maximum walltime.
+     */
+    private String formatJobStatusLine(Job j, Node n) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(j.getTask().getIdentity());
+        sb.append(" ");
+        sb.append(String.format("%6s:%06d", n.getBlock().getId(), n.getId()));
+        sb.append(" ");
+        sb.append(String.format("%20s", ((JobSpecification) j.getTask().getSpecification()).getExecutable()));
+        sb.append(" ");
+        sb.append(formatTime(j.getStartTime()));
+        sb.append(" ");
+        sb.append(WallTime.format(j.getMaxWallTime().getSeconds()));
+        return sb.toString();
+    }
+
+    /**
+     * Formats one "workers" row: worker (block id:node id), number of CPUs
+     * and number of CPUs that currently have a running job.
+     */
+    private String formatWorkerStatusLine(Node n) {
+        return String.format("%6s:%06d %5d %7d",
+            n.getBlock().getId(), n.getId(), n.getCpus().size(), countRunning(n.getCpus()));
+    }
+
+    /**
+     * Counts the CPUs that currently have a running job.
+     */
+    private int countRunning(Collection<Cpu> cpus) {
+        int s = 0;
+        for (Cpu c : cpus) {
+            if (c.getRunning() != null) {
+                s++;
+            }
+        }
+        return s;
+    }
+
+    /**
+     * Formats one "blocks" row: id, state letter (R running, C done,
+     * S suspended, Q otherwise), worker count, start time and walltime
+     * ("-" when the walltime is TimeInterval.FOREVER).
+     */
+    private String formatBlockStatusLine(Block b) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(String.format("%6s", b.getId()));
+        sb.append(" ");
+        if (b.isRunning()) {
+            sb.append("R");
+        }
+        else if (b.isDone()) {
+            sb.append("C");
+        }
+        else if (b.isSuspended()) {
+            sb.append("S");
+        }
+        else {
+            sb.append("Q");
+        }
+
+        sb.append(" ");
+        sb.append(String.format("%7d", b.getWorkerCount()));
+        sb.append(" ");
+        sb.append(formatTime(b.getStartTime()));
+        sb.append(" ");
+        if (b.getWalltime() == TimeInterval.FOREVER) {
+            sb.append("-");
+        }
+        else {
+            sb.append(WallTime.format(b.getWalltime().getSeconds()));
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Formats a time as MM/dd/yy HH:mm:ss, or "-" when no time is set.
+     */
+    private String formatTime(Time t) {
+        // NOTE(review): assumes a block or job that has not started yet can
+        // report a null start time; print a placeholder rather than fail
+        if (t == null) {
+            return "-";
+        }
+        synchronized (DF) {
+            return DF.format(t.getMilliseconds());
+        }
+    }
+}
Index: modules/provider-coaster/src/org/globus/cog/coaster/commands/InfoCommand.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/coaster/commands/InfoCommand.java (revision 0)
+++ modules/provider-coaster/src/org/globus/cog/coaster/commands/InfoCommand.java (revision 3843)
@@ -0,0 +1,45 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 20, 2005
+ */
+package org.globus.cog.coaster.commands;
+
+import org.globus.cog.coaster.ProtocolException;
+
+
+/**
+ * Command that asks the remote side for information. It sends two data
+ * chunks, the type of information wanted followed by an options string,
+ * under the command name {@link #NAME}.
+ */
+public class InfoCommand extends Command {
+    public static final String NAME = "INFO";
+
+    private final String type;
+    private final String options;
+
+    /**
+     * @param type first chunk sent with the command
+     * @param options second chunk sent with the command
+     */
+    public InfoCommand(String type, String options) {
+        super(NAME);
+        this.options = options;
+        this.type = type;
+    }
+
+    /**
+     * Queues the type and the options as outgoing data, in that order, then
+     * delegates to the superclass to send the command.
+     */
+    public void send() throws ProtocolException {
+        addOutData(this.type);
+        addOutData(this.options);
+        super.send();
+    }
+}
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/client/CoasterClient.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/client/CoasterClient.java (revision 0)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/client/CoasterClient.java (revision 3843)
@@ -0,0 +1,169 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Nov 24, 2013
+ */
+package org.globus.cog.abstraction.coaster.client;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.globus.cog.abstraction.coaster.service.local.LocalRequestManager;
+import org.globus.cog.abstraction.impl.execution.coaster.WorkerShellCommand;
+import org.globus.cog.coaster.ProtocolException;
+import org.globus.cog.coaster.channels.ChannelException;
+import org.globus.cog.coaster.channels.ChannelManager;
+import org.globus.cog.coaster.channels.CoasterChannel;
+import org.globus.cog.coaster.commands.Command;
+import org.globus.cog.coaster.commands.Command.Callback;
+import org.globus.cog.coaster.commands.InfoCommand;
+
+/**
+ * A rudimentary command line client for a coaster service.
+ * <p>
+ * Usage: <code>coaster-client &lt;url&gt; &lt;command&gt; [args...]</code>,
+ * where the command is one of "list" (sends an InfoCommand) or "runcmd"
+ * (sends a WorkerShellCommand). "submitjob" is recognized but not
+ * implemented yet. Replies go to standard output, errors to standard error.
+ */
+public class CoasterClient implements Callback {
+    private String url;
+
+    /**
+     * @param url contact passed to the channel manager when reserving a channel
+     */
+    public CoasterClient(String url) {
+        this.url = url;
+    }
+
+    /**
+     * Builds the command named by <code>cmd</code>, executes it on a channel
+     * to the service and waits for the reply. A command that is not handled
+     * is reported on standard error.
+     *
+     * @param cmd "list", "runcmd" or "submitjob"
+     * @param args command arguments; "list" and "runcmd" need at least one
+     * @throws IllegalArgumentException if a required argument is missing
+     */
+    public void runCommand(String cmd, String[] args)
+            throws ChannelException, ProtocolException, IOException, InterruptedException {
+        Command c = null;
+        if (cmd.equals("submitjob")) {
+            // TODO this would need to parse the args in a meaningful way
+            // and feed them to a SubmitJobCommand
+        }
+        else if (cmd.equals("list")) {
+            requireArgs(cmd, args, 1);
+            c = new InfoCommand(args[0], args.length > 1 ? args[1] : "");
+        }
+        else if (cmd.equals("runcmd")) {
+            requireArgs(cmd, args, 1);
+            c = new WorkerShellCommand(args[0], join(args, 1, " ")) {
+                @Override
+                public void handleSignal(byte[] data) {
+                    // live output is sent as signals
+                    System.out.print(new String(data));
+                }
+            };
+        }
+        if (c == null) {
+            System.err.println("Command not handled: " + cmd);
+        }
+        else {
+            CoasterChannel channel = ChannelManager.getManager().reserveChannel(url,
+                null, LocalRequestManager.INSTANCE);
+            try {
+                // do async execute since we can process the
+                // replies/errors instead of having exceptions thrown by execute()
+                c.executeAsync(channel, this);
+                c.waitForReply();
+            }
+            finally {
+                // release the channel even if sending or waiting fails
+                ChannelManager.getManager().releaseChannel(channel);
+            }
+        }
+    }
+
+    /**
+     * Fails with a readable message when fewer than <code>min</code>
+     * arguments were given, instead of an ArrayIndexOutOfBoundsException.
+     */
+    private static void requireArgs(String cmd, String[] args, int min) {
+        if (args.length < min) {
+            throw new IllegalArgumentException("Command '" + cmd + "' needs at least "
+                + min + " argument(s), got " + args.length);
+        }
+    }
+
+    /**
+     * Prints every chunk of the reply on its own line on standard output.
+     */
+    @Override
+    public void replyReceived(Command cmd) {
+        List<byte[]> reply = cmd.getInDataChunks();
+        for (byte[] b : reply) {
+            System.out.println(new String(b));
+        }
+    }
+
+    /**
+     * Prints the error message and, when there is one, the stack trace of
+     * the exception on standard error.
+     */
+    @Override
+    public void errorReceived(Command cmd, String msg, Exception t) {
+        System.err.println(msg);
+        // NOTE(review): assumes the exception may be null when only a message is available
+        if (t != null) {
+            t.printStackTrace();
+        }
+    }
+
+    /**
+     * Joins the elements of <code>a</code> from index <code>start</code> on,
+     * separated by <code>sep</code>. Yields "" when there are none.
+     */
+    private String join(String[] a, int start, String sep) {
+        StringBuilder sb = new StringBuilder();
+        boolean first = true;
+        for (int i = start; i < a.length; i++) {
+            if (first) {
+                first = false;
+            }
+            else {
+                sb.append(sep);
+            }
+            sb.append(a[i]);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Entry point: <code>&lt;url&gt; &lt;command&gt; [args...]</code>. Exits
+     * with status 1 on a usage error or an exception, 0 otherwise.
+     */
+    public static void main(String[] args) {
+        if (args.length < 2) {
+            System.err.println("Usage: coaster-client <url> <command> [args...]");
+            System.exit(1);
+        }
+        try {
+            String url = args[0];
+            String cmd = args[1];
+            String[] a = new String[args.length - 2];
+            System.arraycopy(args, 2, a, 0, a.length);
+            CoasterClient client = new CoasterClient(url);
+            client.runCommand(cmd, a);
+            System.exit(0);
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            System.exit(1);
+        }
+    }
+}
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/client/GenericCommand.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/client/GenericCommand.java (revision 0)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/client/GenericCommand.java (revision 3843)
@@ -0,0 +1,30 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Nov 25, 2013
+ */
+package org.globus.cog.abstraction.coaster.client;
+
+import org.globus.cog.coaster.commands.Command;
+
+/**
+ * A command with a caller-supplied name whose outgoing data consists of the
+ * given arguments, one chunk per argument, in order.
+ */
+public class GenericCommand extends Command {
+    /**
+     * @param cmd command name handed to the superclass
+     * @param args arguments queued as outgoing data chunks
+     */
+    public GenericCommand(String cmd, String[] args) {
+        super(cmd);
+
+        for (int i = 0; i < args.length; i++) {
+            addOutData(args[i]);
+        }
+    }
+}
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterRequestManager.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterRequestManager.java (revision 3842)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterRequestManager.java (working copy)
@@ -33,8 +33,10 @@
import org.globus.cog.abstraction.impl.file.coaster.handlers.RenameHandler;
import org.globus.cog.abstraction.impl.file.coaster.handlers.RmdirHandler;
import org.globus.cog.coaster.AbstractRequestManager;
+import org.globus.cog.coaster.commands.InfoCommand;
import org.globus.cog.coaster.handlers.ChannelConfigurationHandler;
import org.globus.cog.coaster.handlers.HeartBeatHandler;
+import org.globus.cog.coaster.handlers.InfoHandler;
import org.globus.cog.coaster.handlers.ShutdownHandler;
import org.globus.cog.coaster.handlers.VersionHandler;
@@ -44,6 +46,7 @@
addHandler("CHANNELCONFIG", ChannelConfigurationHandler.class);
addHandler("SHUTDOWN", ShutdownHandler.class);
addHandler("HEARTBEAT", HeartBeatHandler.class);
+ addHandler(InfoCommand.NAME, InfoHandler.class);
addHandler(SubmitJobCommand.NAME, SubmitJobHandler.class);
addHandler(ServiceShutdownHandler.NAME, ServiceShutdownHandler.class);
addHandler(WorkerShellHandler.NAME, WorkerShellHandler.class);
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/PassiveQueueProcessor.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/PassiveQueueProcessor.java (revision 3842)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/PassiveQueueProcessor.java (working copy)
@@ -90,6 +90,7 @@
b = blocks.get(id);
if (b == null) {
b = new Block(id, 1, TimeInterval.FOREVER, this);
+ b.setStartTime(Time.now());
b.setRunning(true);
blocks.put(id, b);
}
@@ -104,6 +105,9 @@
currentWorkers--;
wsc = new ResourceUpdateCommand("job-capacity",
String.valueOf(currentWorkers * getSettings().getJobsPerNode()));
+ if (node.getBlock().getNodes().isEmpty()) {
+ getBlocks().remove(node.getBlock().getId());
+ }
}
try {
CoasterChannel channel = ChannelManager.getManager().reserveChannel(getClientChannelContext());
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java (revision 3842)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java (working copy)
@@ -577,12 +577,14 @@
}
public void removeNode(Node node) {
+ int left;
synchronized(cpus) {
nodes.remove(node);
for (Cpu cpu : node.getCpus()) {
scpus.remove(cpu);
cpus.remove(cpu);
}
+ left = nodes.size();
}
bqp.nodeRemoved(node);
}
More information about the Swift-commit
mailing list