[Swift-commit] cog r3843

swift at ci.uchicago.edu swift at ci.uchicago.edu
Mon Nov 25 17:05:03 CST 2013


------------------------------------------------------------------------
r3843 | hategan | 2013-11-25 17:02:59 -0600 (Mon, 25 Nov 2013) | 1 line

added a rudimentary client and the info command
------------------------------------------------------------------------
Index: modules/provider-coaster/launchers.xml
===================================================================
--- modules/provider-coaster/launchers.xml	(revision 3842)
+++ modules/provider-coaster/launchers.xml	(working copy)
@@ -2,9 +2,12 @@
   <target name="create">
 	<ant antfile="${main.buildfile}" target="launcher">
 		<property name="launcher-name" value="coaster-service"/>
-		<property name="class-name" 
-                         value="org.globus.cog.abstraction.coaster.service.CoasterPersistentService"/>
+		<property name="class-name" value="org.globus.cog.abstraction.coaster.service.CoasterPersistentService"/>
 	</ant>
+	<ant antfile="${main.buildfile}" target="launcher">
+		<property name="launcher-name" value="coaster-client"/>
+		<property name="class-name" value="org.globus.cog.abstraction.coaster.client.CoasterClient"/>
+	</ant>
   </target>
   <target name="webstart">
    </target>
Index: modules/provider-coaster/src/org/globus/cog/coaster/handlers/InfoHandler.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/coaster/handlers/InfoHandler.java	(revision 0)
+++ modules/provider-coaster/src/org/globus/cog/coaster/handlers/InfoHandler.java	(revision 3843)
@@ -0,0 +1,180 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 21, 2005
+ */
+package org.globus.cog.coaster.handlers;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.Map;
+
+import org.globus.cog.abstraction.coaster.service.CoasterService;
+import org.globus.cog.abstraction.coaster.service.job.manager.Block;
+import org.globus.cog.abstraction.coaster.service.job.manager.BlockQueueProcessor;
+import org.globus.cog.abstraction.coaster.service.job.manager.Cpu;
+import org.globus.cog.abstraction.coaster.service.job.manager.Job;
+import org.globus.cog.abstraction.coaster.service.job.manager.Node;
+import org.globus.cog.abstraction.coaster.service.job.manager.Time;
+import org.globus.cog.abstraction.coaster.service.job.manager.TimeInterval;
+import org.globus.cog.abstraction.impl.common.execution.WallTime;
+import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.coaster.ProtocolException;
+
+
+/**
+ * Server-side handler for the INFO request. The first input chunk selects the
+ * report ("workers", "jobs" or "blocks"); the reply is a header line followed
+ * by one formatted line per entry. Any other type is answered with an error.
+ */
+public class InfoHandler extends RequestHandler {
+    /** Pattern used for every start-time column. */
+    private static final String TIME_PATTERN = "MM/dd/yy HH:mm:ss";
+
+    /** Shown instead of a time stamp when none is available; as wide as TIME_PATTERN. */
+    private static final String NO_TIME = String.format("%17s", "-");
+
+    /**
+     * Builds the report selected by the first input chunk and sends it as the reply.
+     *
+     * @throws ProtocolException as declared by the original method; presumably raised
+     *         by the request/reply helpers, which are not visible here
+     */
+    public void requestComplete() throws ProtocolException {
+        String type = getInDataAsString(0);
+        // The options chunk is read but not interpreted yet.
+        String opts = getInDataAsString(1);
+
+        BlockQueueProcessor bqp = (BlockQueueProcessor) ((CoasterService) getChannel().getChannelContext().
+                getService()).getJobQueue().getCoasterQueueProcessor();
+
+        // Constant-first equals() so that a null type ends up in the error reply below.
+        if ("workers".equals(type)) {
+            Map<String, Block> blocks = bqp.getBlocks();
+            addOutData("           ID    Cores     Running");
+            for (Block b : blocks.values()) {
+                for (Node n : b.getNodes()) {
+                    addOutData(formatWorkerStatusLine(n));
+                }
+            }
+            sendReply();
+        }
+        else if ("jobs".equals(type)) {
+            Map<String, Block> blocks = bqp.getBlocks();
+            addOutData("          ID          Worker            Executable           Start Time     Walltime");
+            for (Block b : blocks.values()) {
+                for (Node n : b.getNodes()) {
+                    for (Cpu c : n.getCpus()) {
+                        Job running = c.getRunning();
+                        if (running != null) {
+                            addOutData(formatJobStatusLine(running, n));
+                        }
+                    }
+                }
+            }
+            sendReply();
+        }
+        else if ("blocks".equals(type)) {
+            Map<String, Block> blocks = bqp.getBlocks();
+            addOutData("    ID    State    Workers           Start Time    Walltime");
+            for (Block b : blocks.values()) {
+                addOutData(formatBlockStatusLine(b));
+            }
+            sendReply();
+        }
+        else {
+            sendError("Unknown type: '" + type + "'");
+        }
+    }
+
+    /** Formats one "jobs" row: task identity, worker, executable, start time, wall time. */
+    private String formatJobStatusLine(Job j, Node n) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(j.getTask().getIdentity());
+        sb.append("    ");
+        sb.append(String.format("%6s:%06d", n.getBlock().getId(), n.getId()));
+        sb.append("    ");
+        sb.append(String.format("%20s", ((JobSpecification) j.getTask().getSpecification()).getExecutable()));
+        sb.append("    ");
+        sb.append(formatTime(j.getStartTime()));
+        sb.append("    ");
+        sb.append(WallTime.format(j.getMaxWallTime().getSeconds()));
+        return sb.toString();
+    }
+
+    /** Formats one "workers" row: block:node id, number of CPUs, number of busy CPUs. */
+    private String formatWorkerStatusLine(Node n) {
+        return String.format("%6s:%06d    %5d    %7d",
+                n.getBlock().getId(), n.getId(), n.getCpus().size(), countRunning(n.getCpus()));
+    }
+
+    /** Counts the CPUs that currently have a running job. */
+    private int countRunning(Collection<Cpu> cpus) {
+        int s = 0;
+        for (Cpu c : cpus) {
+            if (c.getRunning() != null) {
+                s++;
+            }
+        }
+        return s;
+    }
+
+    /**
+     * Formats one "blocks" row: id, state letter, worker count, start time, wall time.
+     * The state letter is R (running), C (done), S (suspended) or Q (none of these).
+     */
+    private String formatBlockStatusLine(Block b) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(String.format("%6s", b.getId()));
+        sb.append("        ");
+        if (b.isRunning()) {
+            sb.append("R");
+        }
+        else if (b.isDone()) {
+            sb.append("C");
+        }
+        else if (b.isSuspended()) {
+            sb.append("S");
+        }
+        else {
+            sb.append("Q");
+        }
+
+        sb.append("    ");
+        sb.append(String.format("%7d", b.getWorkerCount()));
+        sb.append("    ");
+        sb.append(formatTime(b.getStartTime()));
+        sb.append("    ");
+        // FOREVER has no meaningful length, so show a dash instead.
+        if (b.getWalltime() == TimeInterval.FOREVER) {
+            sb.append("-");
+        }
+        else {
+            sb.append(WallTime.format(b.getWalltime().getSeconds()));
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Formats a time stamp using TIME_PATTERN.
+     *
+     * @param t the time to format; may be null
+     * @return the formatted time, or NO_TIME when t is null
+     */
+    private String formatTime(Time t) {
+        if (t == null) {
+            // NOTE(review): presumably a block that has not started yet has no start
+            // time; confirm against Block. Avoids a NullPointerException either way.
+            return NO_TIME;
+        }
+        // SimpleDateFormat is not thread-safe, so a fresh instance is used per call
+        // instead of a shared static one.
+        DateFormat df = new SimpleDateFormat(TIME_PATTERN);
+        return df.format(t.getMilliseconds());
+    }
+}
Index: modules/provider-coaster/src/org/globus/cog/coaster/commands/InfoCommand.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/coaster/commands/InfoCommand.java	(revision 0)
+++ modules/provider-coaster/src/org/globus/cog/coaster/commands/InfoCommand.java	(revision 3843)
@@ -0,0 +1,45 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 20, 2005
+ */
+package org.globus.cog.coaster.commands;
+
+import org.globus.cog.coaster.ProtocolException;
+
+
+/**
+ * Client-side INFO command. Sends two chunks, the report type followed by the
+ * options string, under the command name NAME.
+ */
+public class InfoCommand extends Command {
+    public static final String NAME = "INFO";
+
+    private final String type;
+    private final String options;
+
+    /**
+     * @param type    first chunk sent with the command
+     * @param options second chunk sent with the command
+     */
+    public InfoCommand(String type, String options) {
+        super(NAME);
+        this.type = type;
+        this.options = options;
+    }
+
+    /**
+     * Queues the type and options chunks, in that order, then delegates to
+     * the superclass to send the command.
+     */
+    @Override
+    public void send() throws ProtocolException {
+        addOutData(type);
+        addOutData(options);
+        super.send();
+    }
+}
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/client/CoasterClient.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/client/CoasterClient.java	(revision 0)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/client/CoasterClient.java	(revision 3843)
@@ -0,0 +1,156 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Nov 24, 2013
+ */
+package org.globus.cog.abstraction.coaster.client;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.globus.cog.abstraction.coaster.service.local.LocalRequestManager;
+import org.globus.cog.abstraction.impl.execution.coaster.WorkerShellCommand;
+import org.globus.cog.coaster.ProtocolException;
+import org.globus.cog.coaster.channels.ChannelException;
+import org.globus.cog.coaster.channels.ChannelManager;
+import org.globus.cog.coaster.channels.CoasterChannel;
+import org.globus.cog.coaster.commands.Command;
+import org.globus.cog.coaster.commands.Command.Callback;
+import org.globus.cog.coaster.commands.InfoCommand;
+
+/**
+ * Rudimentary command-line client for a coaster service. It reserves a channel
+ * to the given URL, runs a single command and prints the reply to standard output.
+ */
+public class CoasterClient implements Callback {
+    private static final String USAGE = "Usage: coaster-client <url> <command> [args...]";
+
+    private final String url;
+
+    /**
+     * @param url contact URL of the service; passed to the channel manager as is
+     */
+    public CoasterClient(String url) {
+        this.url = url;
+    }
+
+    /**
+     * Runs a single command, then waits in waitForReply() before releasing the channel.
+     * Recognized commands are "list" and "runcmd"; "submitjob" is not implemented
+     * yet and, like any unknown command, is only reported on standard error.
+     *
+     * @param cmd  the command name
+     * @param args the command arguments; "list" and "runcmd" need at least one
+     * @throws IllegalArgumentException if a required argument is missing
+     */
+    public void runCommand(String cmd, String[] args)
+            throws ChannelException, ProtocolException, IOException, InterruptedException {
+        Command c = null;
+        if (cmd.equals("submitjob")) {
+            // TODO this would need to parse the args in a meaningful way
+            // and feed them to a SubmitJobCommand
+        }
+        else if (cmd.equals("list")) {
+            requireArgument(cmd, args);
+            c = new InfoCommand(args[0], args.length > 1 ? args[1] : "");
+        }
+        else if (cmd.equals("runcmd")) {
+            requireArgument(cmd, args);
+            c = new WorkerShellCommand(args[0], join(args, 1, " ")) {
+                @Override
+                public void handleSignal(byte[] data) {
+                    // live output is sent as signals
+                    System.out.print(new String(data));
+                }
+            };
+        }
+        if (c == null) {
+            System.err.println("Command not handled: " + cmd);
+            return;
+        }
+
+        CoasterChannel channel = ChannelManager.getManager().reserveChannel(url,
+            null, LocalRequestManager.INSTANCE);
+        try {
+            // do async execute since we can process the
+            // replies/errors instead of having exceptions thrown by execute()
+            c.executeAsync(channel, this);
+            c.waitForReply();
+        }
+        finally {
+            // give the channel back even if sending or waiting fails
+            ChannelManager.getManager().releaseChannel(channel);
+        }
+    }
+
+    /** Fails with a readable message, rather than an index error, when args is empty. */
+    private static void requireArgument(String cmd, String[] args) {
+        if (args == null || args.length == 0) {
+            throw new IllegalArgumentException("Command '" + cmd + "' requires at least one argument");
+        }
+    }
+
+    /** Prints every chunk of the reply on its own line. */
+    @Override
+    public void replyReceived(Command cmd) {
+        List<byte[]> reply = cmd.getInDataChunks();
+        for (byte[] b : reply) {
+            System.out.println(new String(b));
+        }
+    }
+
+    /** Prints the error message and, when one is supplied, the exception's stack trace. */
+    @Override
+    public void errorReceived(Command cmd, String msg, Exception t) {
+        System.err.println(msg);
+        // NOTE(review): presumably errors reported by the remote side carry only a
+        // message; guard so a missing exception cannot crash the callback.
+        if (t != null) {
+            t.printStackTrace();
+        }
+    }
+
+    /** Joins a[start..] with sep; returns "" when start is at or past the end. */
+    private String join(String[] a, int start, String sep) {
+        StringBuilder sb = new StringBuilder();
+        boolean first = true;
+        for (int i = start; i < a.length; i++) {
+            if (first) {
+                first = false;
+            }
+            else {
+                sb.append(sep);
+            }
+            sb.append(a[i]);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Entry point: args[0] is the service URL, args[1] the command and the rest
+     * are the command's arguments. Exits with status 1 on bad usage or exception.
+     */
+    public static void main(String[] args) {
+        if (args.length < 2) {
+            System.err.println(USAGE);
+            System.exit(1);
+        }
+        try {
+            String url = args[0];
+            String cmd = args[1];
+            String[] a = new String[args.length - 2];
+            System.arraycopy(args, 2, a, 0, a.length);
+            CoasterClient client = new CoasterClient(url);
+            client.runCommand(cmd, a);
+            System.exit(0);
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            System.exit(1);
+        }
+    }
+}
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/client/GenericCommand.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/client/GenericCommand.java	(revision 0)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/client/GenericCommand.java	(revision 3843)
@@ -0,0 +1,31 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Nov 25, 2013
+ */
+package org.globus.cog.abstraction.coaster.client;
+
+import org.globus.cog.coaster.commands.Command;
+
+/**
+ * A command with an arbitrary name whose outgoing data chunks are the given
+ * arguments, added in order.
+ */
+public class GenericCommand extends Command {
+    /**
+     * @param cmd  the command name handed to the Command constructor
+     * @param args the strings to add as outgoing data, in order
+     */
+    public GenericCommand(String cmd, String[] args) {
+        super(cmd);
+
+        final int count = args.length;
+        for (int i = 0; i < count; i++) {
+            addOutData(args[i]);
+        }
+    }
+}
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterRequestManager.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterRequestManager.java	(revision 3842)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterRequestManager.java	(working copy)
@@ -33,8 +33,10 @@
 import org.globus.cog.abstraction.impl.file.coaster.handlers.RenameHandler;
 import org.globus.cog.abstraction.impl.file.coaster.handlers.RmdirHandler;
 import org.globus.cog.coaster.AbstractRequestManager;
+import org.globus.cog.coaster.commands.InfoCommand;
 import org.globus.cog.coaster.handlers.ChannelConfigurationHandler;
 import org.globus.cog.coaster.handlers.HeartBeatHandler;
+import org.globus.cog.coaster.handlers.InfoHandler;
 import org.globus.cog.coaster.handlers.ShutdownHandler;
 import org.globus.cog.coaster.handlers.VersionHandler;
 
@@ -44,6 +46,7 @@
         addHandler("CHANNELCONFIG", ChannelConfigurationHandler.class);
         addHandler("SHUTDOWN", ShutdownHandler.class);
         addHandler("HEARTBEAT", HeartBeatHandler.class);
+        addHandler(InfoCommand.NAME, InfoHandler.class);
         addHandler(SubmitJobCommand.NAME, SubmitJobHandler.class);
         addHandler(ServiceShutdownHandler.NAME, ServiceShutdownHandler.class);
         addHandler(WorkerShellHandler.NAME, WorkerShellHandler.class);
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/PassiveQueueProcessor.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/PassiveQueueProcessor.java	(revision 3842)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/PassiveQueueProcessor.java	(working copy)
@@ -90,6 +90,7 @@
             b = blocks.get(id);
             if (b == null) {
                 b = new Block(id, 1, TimeInterval.FOREVER, this);
+                b.setStartTime(Time.now());
                 b.setRunning(true);
                 blocks.put(id, b);
             }
@@ -104,6 +105,9 @@
             currentWorkers--;
             wsc = new ResourceUpdateCommand("job-capacity", 
                 String.valueOf(currentWorkers * getSettings().getJobsPerNode()));
+            if (node.getBlock().getNodes().isEmpty()) {
+                getBlocks().remove(node.getBlock().getId());
+            }
         }
         try {
             CoasterChannel channel = ChannelManager.getManager().reserveChannel(getClientChannelContext());
Index: modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java
===================================================================
--- modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java	(revision 3842)
+++ modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Block.java	(working copy)
@@ -577,12 +577,14 @@
     }
 
     public void removeNode(Node node) {
+        int left;
         synchronized(cpus) {
             nodes.remove(node);
             for (Cpu cpu : node.getCpus()) {
                 scpus.remove(cpu);
                 cpus.remove(cpu);
             }
+            left = nodes.size();
         }
         bqp.nodeRemoved(node);
     }



More information about the Swift-commit mailing list