[Swift-commit] r7965 - in trunk/src/org/griphyn/vdl/karajan/monitor: . common items monitors/http processors processors/coasters processors/karajan processors/swift

hategan at ci.uchicago.edu hategan at ci.uchicago.edu
Fri Jul 4 01:54:52 CDT 2014


Author: hategan
Date: 2014-07-04 01:54:51 -0500 (Fri, 04 Jul 2014)
New Revision: 7965

Added:
   trunk/src/org/griphyn/vdl/karajan/monitor/items/AbstractListenableStatefulItem.java
   trunk/src/org/griphyn/vdl/karajan/monitor/items/ChainedListener.java
   trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppDetailBuilder.java
   trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppInstanceBuilder.java
   trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppListBuilder.java
   trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppsSummaryBuilder.java
   trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/BrowserDataBuilder.java
   trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/SiteInfoBuilder.java
   trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/SwiftLogInfo.java
   trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/WorkerInfoBuilder.java
   trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/WorkerListBuilder.java
   trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerProbeItem.java
   trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerProbeProcessor.java
   trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/AppFailureProcessor.java
   trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/ConfigurationProcessor.java
Modified:
   trunk/src/org/griphyn/vdl/karajan/monitor/MonitorAppender.java
   trunk/src/org/griphyn/vdl/karajan/monitor/StateUpdater.java
   trunk/src/org/griphyn/vdl/karajan/monitor/SystemState.java
   trunk/src/org/griphyn/vdl/karajan/monitor/common/DataSampler.java
   trunk/src/org/griphyn/vdl/karajan/monitor/items/AbstractStatefulItem.java
   trunk/src/org/griphyn/vdl/karajan/monitor/items/ApplicationItem.java
   trunk/src/org/griphyn/vdl/karajan/monitor/items/ApplicationState.java
   trunk/src/org/griphyn/vdl/karajan/monitor/items/StatefulItem.java
   trunk/src/org/griphyn/vdl/karajan/monitor/items/SummaryItem.java
   trunk/src/org/griphyn/vdl/karajan/monitor/items/TaskItem.java
   trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/HTTPServer.java
   trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/JSONEncoder.java
   trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/SummaryDataBuilder.java
   trunk/src/org/griphyn/vdl/karajan/monitor/processors/AbstractMessageProcessor.java
   trunk/src/org/griphyn/vdl/karajan/monitor/processors/SimpleParser.java
   trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/BlockRequestedProcessor.java
   trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/CoasterStatusItem.java
   trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/RemoteLogProcessorDispatcher.java
   trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerActiveProcessor.java
   trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerLostProcessor.java
   trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerShutDownProcessor.java
   trunk/src/org/griphyn/vdl/karajan/monitor/processors/karajan/TaskProcessor.java
   trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/AppEndProcessor.java
   trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/AppStartProcessor.java
   trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/JobProcessor.java
   trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/SummaryProcessor.java
Log:
monitor updates

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/MonitorAppender.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/MonitorAppender.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/MonitorAppender.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -40,15 +40,18 @@
 import org.griphyn.vdl.karajan.monitor.processors.coasters.BlockFailedProcessor;
 import org.griphyn.vdl.karajan.monitor.processors.coasters.BlockRequestedProcessor;
 import org.griphyn.vdl.karajan.monitor.processors.coasters.RemoteLogProcessorDispatcher;
+import org.griphyn.vdl.karajan.monitor.processors.coasters.WorkerActiveProcessor;
 import org.griphyn.vdl.karajan.monitor.processors.coasters.WorkerLostProcessor;
+import org.griphyn.vdl.karajan.monitor.processors.coasters.WorkerProbeProcessor;
 import org.griphyn.vdl.karajan.monitor.processors.coasters.WorkerShutDownProcessor;
-import org.griphyn.vdl.karajan.monitor.processors.coasters.WorkerActiveProcessor;
 import org.griphyn.vdl.karajan.monitor.processors.karajan.ExecutionContextProcessor;
 import org.griphyn.vdl.karajan.monitor.processors.karajan.SchedulerInfoProcessor;
 import org.griphyn.vdl.karajan.monitor.processors.karajan.TaskProcessor;
 import org.griphyn.vdl.karajan.monitor.processors.swift.AppEndProcessor;
+import org.griphyn.vdl.karajan.monitor.processors.swift.AppFailureProcessor;
 import org.griphyn.vdl.karajan.monitor.processors.swift.AppStartProcessor;
 import org.griphyn.vdl.karajan.monitor.processors.swift.AppThreadProcessor;
+import org.griphyn.vdl.karajan.monitor.processors.swift.ConfigurationProcessor;
 import org.griphyn.vdl.karajan.monitor.processors.swift.ForeachItEndProcessor;
 import org.griphyn.vdl.karajan.monitor.processors.swift.ForeachItStartProcessor;
 import org.griphyn.vdl.karajan.monitor.processors.swift.JobProcessor;
@@ -92,11 +95,13 @@
     	updater.addProcessor(new JobProcessor());
     	updater.addProcessor(new SchedulerInfoProcessor());
     	updater.addProcessor(new ExecutionContextProcessor());
+    	updater.addProcessor(new ConfigurationProcessor());
     	
     	addFilteredProcessors(updater, SwiftProcessorDispatcher.class, 
     	    new AppStartProcessor(),
     	    new AppEndProcessor(),
     	    new AppThreadProcessor(),
+    	    new AppFailureProcessor(),
     	    new ProcedureStartProcessor(),
     	    new ProcedureEndProcessor(),
     	    new ForeachItStartProcessor(),
@@ -110,7 +115,8 @@
     	    new BlockFailedProcessor(),
     	    new WorkerActiveProcessor(),
     	    new WorkerLostProcessor(),
-    	    new WorkerShutDownProcessor());
+    	    new WorkerShutDownProcessor(),
+    	    new WorkerProbeProcessor());
     }
 
 	private void addFilteredProcessors(StateUpdater updater, 
@@ -148,6 +154,7 @@
 
 	public void doAppend(LoggingEvent event) {
         try {
+            state.setCurrentTime(event.getTimeStamp());
         	updater.logEvent(event.getLevel(), event.getLogger().getName(),
         			event.getMessage(), event.getThrowableInformation());
         }

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/StateUpdater.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/StateUpdater.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/StateUpdater.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -64,7 +64,6 @@
     }
     
     public void logEvent(Object category, String source, Object message, Object details) {
-        state.setCurrentTime(System.currentTimeMillis());
     	Map<String, List<LogMessageProcessor>> sources = levels.get(category);
         if (sources == null) {
             return;

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/SystemState.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/SystemState.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/SystemState.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -24,11 +24,12 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedSet;
 import java.util.TimerTask;
+import java.util.TreeSet;
 
 import k.rt.Stack;
 
-import org.griphyn.vdl.karajan.monitor.common.GlobalTimer;
 import org.griphyn.vdl.karajan.monitor.items.StatefulItem;
 import org.griphyn.vdl.karajan.monitor.items.StatefulItemClass;
 import org.griphyn.vdl.karajan.monitor.items.SummaryItem;
@@ -39,11 +40,13 @@
 	private Map<StatefulItemClass, StatefulItemClassSet<? extends StatefulItem>> classes;
     private Set<SystemStateListener> listeners;
     private Map<String, Stats> stats;
-    private int total, completed, completedPreviously;
-    private long start, currentTime;
+    private int total, completed, completedPreviously, currentThreads, retries;
+    private long start, currentTime, usedHeap, maxHeap;
     private Stack stack;
     private String projectName;
     private final Runtime runtime;
+    private SortedSet<TimerTaskEntry> tasks;
+    private boolean replicationEnabled, resumed;
     
     private static final Unit BYTES = new Unit.P2("B");
 
@@ -54,7 +57,8 @@
         stats = new HashMap<String, Stats>();
         runtime = Runtime.getRuntime();
         addItem(new SummaryItem());
-        GlobalTimer.getTimer().schedule(new TimerTask() {
+        tasks = new TreeSet<TimerTaskEntry>();
+        schedule(new TimerTask() {
             public void run() {
                 update();
             }
@@ -177,7 +181,7 @@
     }
     
     public String getCurrentHeapFormatted() {
-        return BYTES.format(getCurrentHeap());
+        return BYTES.format(getUsedHeap());
     }
     
     public String getElapsedTimeFormatted() {
@@ -248,18 +252,117 @@
 
     
     public long getMaxHeap() {
-        return runtime.maxMemory();
+        return maxHeap;
     }
     
-    public long getCurrentHeap() {
-        return runtime.totalMemory() - runtime.freeMemory();
+    public long getUsedHeap() {
+        return usedHeap;
     }
 
+    public int getCurrentThreads() {
+        return currentThreads;
+    }
+
+    public void setCurrentThreads(int currentThreads) {
+        this.currentThreads = currentThreads;
+    }
+
+    public void setUsedHeap(long usedHeap) {
+        this.usedHeap = usedHeap;
+    }
+
+    public void setMaxHeap(long maxHeap) {
+        this.maxHeap = maxHeap;
+    }
+
+    public int getRetries() {
+        return retries;
+    }
+
+    public void setRetries(int retries) {
+        this.retries = retries;
+    }
+
+    public boolean getReplicationEnabled() {
+        return replicationEnabled;
+    }
+
+    public void setReplicationEnabled(boolean replicationEnabled) {
+        this.replicationEnabled = replicationEnabled;
+    }
+
+    public boolean getResumed() {
+        return resumed;
+    }
+
+    public void setResumed(boolean resumed) {
+        this.resumed = resumed;
+    }
+
     public long getCurrentTime() {
         return currentTime;
     }
 
     public void setCurrentTime(long currentTime) {
         this.currentTime = currentTime;
+        if (!tasks.isEmpty()) {
+            TimerTaskEntry e = tasks.first();
+            while (e.nextTime < currentTime) {
+                tasks.remove(e);
+                
+                if (e.nextTime < MAX_INITIAL) {
+                    e.nextTime = currentTime + e.nextTime;
+                }
+                else {
+                    try {
+                        e.task.run();
+                    }
+                    catch (Exception ex) {
+                        ex.printStackTrace();
+                    }
+                    e.nextTime = e.nextTime + e.delay;
+                }
+                tasks.add(e);
+                e = tasks.first();
+            }
+        }
     }
+    
+    private static class TimerTaskEntry implements Comparable<TimerTaskEntry> {
+        public long nextTime;
+        public long delay;
+        public TimerTask task;
+        
+        public TimerTaskEntry(TimerTask task, long nextTime, long delay) {
+            this.task = task;
+            this.nextTime = nextTime;
+            this.delay = delay;
+        }
+
+        @Override
+        public int compareTo(TimerTaskEntry o) {
+            long diff = nextTime - o.nextTime;
+            if (diff == 0) {
+                return System.identityHashCode(this) - System.identityHashCode(o);
+            }
+            else {
+                if (diff < 0) {
+                    return -1;
+                }
+                else {
+                    return 1;
+                }
+            }
+        }
+    }
+    
+    private static final long MAX_INITIAL = 1000000;
+    
+    public void schedule(TimerTask task, long initial, long repeat) {
+        if (initial > MAX_INITIAL) {
+            throw new IllegalArgumentException("Initial delay too large");
+        }
+        TimerTaskEntry e = new TimerTaskEntry(task, currentTime + initial, repeat);
+        tasks.add(e);
+    }
 }

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/common/DataSampler.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/common/DataSampler.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/common/DataSampler.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -55,7 +55,7 @@
         this.listeners = new ArrayList<Listener>();
         initializeData();
         
-        GlobalTimer.getTimer().schedule(new TimerTask() {
+        state.schedule(new TimerTask() {
             @Override
             public void run() {
                 sample();
@@ -90,9 +90,9 @@
         
         addSeries("Java Virtual Machine", 
             new Series<Long>("jvm/heapUsed", "JVM Heap Used", BYTES, 
-                new ReflectionSampler<Long>(state, "getCurrentHeap")),
+                new ReflectionSampler<Long>(state, "getUsedHeap")),
             new Series<Integer>("jvm/activeThreads", "JVM Active Threads", COUNT,
-                new ReflectionSampler<Integer>(Thread.class, "activeCount")));
+                new ReflectionSampler<Integer>(state, "getCurrentThreads")));
         
         
         CoasterStatusItem coaster = (CoasterStatusItem) state.getItemByID(CoasterStatusItem.ID, StatefulItemClass.MISC);
@@ -151,7 +151,7 @@
     }
 
     protected void sample() {
-        long now = (System.currentTimeMillis() / 1000);
+        long now = state.getCurrentTime() / 1000;
         if (offset + count != now) {
             if (offset < 0) {
                 offset = now;

Added: trunk/src/org/griphyn/vdl/karajan/monitor/items/AbstractListenableStatefulItem.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/items/AbstractListenableStatefulItem.java	                        (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/items/AbstractListenableStatefulItem.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,32 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jun 9, 2014
+ */
+package org.griphyn.vdl.karajan.monitor.items;
+
+public abstract class AbstractListenableStatefulItem extends AbstractStatefulItem {
+    
+    private Listener listener;
+
+    public AbstractListenableStatefulItem(String id) {
+        super(id);
+    }
+    
+    public void addListener(Listener listener) {
+        if (this.listener != null) {
+            listener = new ChainedListener(listener, this.listener);
+        }
+        this.listener = listener;
+    }
+    
+    protected void notifyListener() {
+        if (listener != null) {
+            listener.itemUpdated(this);
+        }
+    }
+}

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/items/AbstractStatefulItem.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/items/AbstractStatefulItem.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/items/AbstractStatefulItem.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -76,4 +76,9 @@
 	public String getID() {
 		return id;
 	}
+
+    @Override
+    public void addListener(Listener l) {
+        // not implemented by default
+    }
 }

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/items/ApplicationItem.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/items/ApplicationItem.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/items/ApplicationItem.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -22,7 +22,7 @@
 
 
 public class ApplicationItem extends AbstractStatefulItem {
-	private String name, arguments, host;
+	private String name, arguments, host, workerId;
 	private long startTime, currentStateTime;
 	/**
 	 * The state of the app. Currently swift does not log app state transitions
@@ -97,6 +97,14 @@
         return state;
     }
 
+    public String getWorkerId() {
+        return workerId;
+    }
+
+    public void setWorkerId(String workerId) {
+        this.workerId = workerId;
+    }
+
     public String toString() {
 		return "APP[" + name + ", " + arguments + ", " + host + "]";
 	}

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/items/ApplicationState.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/items/ApplicationState.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/items/ApplicationState.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -13,9 +13,14 @@
 import java.util.List;
 
 public enum ApplicationState {
+    //0
     INITIALIZING("Initializing"), SELECTING_SITE("Selecting site", "Sel. site"), STAGE_IN("Stage in"),
+    //3
     SUBMITTING("Submitting"), SUBMITTED("Submitted"), ACTIVE("Active"), STAGE_OUT("Stage out"),
-    FAILED("Failed"), REPLICATING("Replicating"), FINISHED_IN_PREVIOUS_RUN("Finished in previous run", "Finished in prev. run", false), 
+    //7
+    FAILED("Failed"), REPLICATING("Replicating", "Replicating", false), 
+    FINISHED_IN_PREVIOUS_RUN("Finished in previous run", "Finished in prev. run", false),
+    //10
     FINISHED_SUCCESSFULLY("Finished successfully");
     
     private String name, shortName;
@@ -71,4 +76,8 @@
         
         return enabledValues;
     }
+
+    public boolean isTerminal() {
+        return this == FAILED || this == FINISHED_SUCCESSFULLY || this == FINISHED_IN_PREVIOUS_RUN;
+    }
 }
\ No newline at end of file

Added: trunk/src/org/griphyn/vdl/karajan/monitor/items/ChainedListener.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/items/ChainedListener.java	                        (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/items/ChainedListener.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,25 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jun 9, 2014
+ */
+package org.griphyn.vdl.karajan.monitor.items;
+
+public class ChainedListener implements StatefulItem.Listener {
+    private StatefulItem.Listener crt, old;
+    
+    public ChainedListener(StatefulItem.Listener crt, StatefulItem.Listener old) {
+        this.crt = crt;
+        this.old = old;
+    }
+
+    @Override
+    public void itemUpdated(StatefulItem item) {
+        crt.itemUpdated(item);
+        old.itemUpdated(item);
+    }
+}

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/items/StatefulItem.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/items/StatefulItem.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/items/StatefulItem.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -23,6 +23,10 @@
 import java.util.Collection;
 
 public interface StatefulItem {
+    public interface Listener {
+        void itemUpdated(StatefulItem item);
+    }
+    
     StatefulItem getParent();
 	void setParent(StatefulItem parent);
 	
@@ -33,4 +37,6 @@
 	StatefulItemClass getItemClass();
 	
 	String getID();
+	
+    void addListener(Listener l);
 }

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/items/SummaryItem.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/items/SummaryItem.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/items/SummaryItem.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -20,7 +20,6 @@
  */
 package org.griphyn.vdl.karajan.monitor.items;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -77,23 +76,11 @@
     }
 	
 	public int getCount(String key, SystemState state) {
-	    if (state.getStack() != null) {
-	        // TODO Must get these from log
-	        return -1;
-	    }
-	    else {
-	        return getCount(key);
-	    }
+        return getCount(key);
 	}
 	
 	public synchronized Map<String, Integer> getCounts(SystemState state) {
-	    if (state.getStack() != null) {
-	        // TODO Must get these from log
-            return Collections.emptyMap();
-	    }
-	    else {
-	        return new HashMap<String, Integer>(counts);
-	    }
+        return new HashMap<String, Integer>(counts);
 	}
 	
 	public synchronized void setCount(String key, int value) {

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/items/TaskItem.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/items/TaskItem.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/items/TaskItem.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -25,6 +25,7 @@
 public class TaskItem extends AbstractStatefulItem {
 	private Task task;
 	private int status, type;
+	private String workerId;
 	
 	public TaskItem(String id, Task task) {
 		super(id);
@@ -76,4 +77,12 @@
 	public int getStatus() {
 		return status;
 	}
+
+    public String getWorkerId() {
+        return workerId;
+    }
+
+    public void setWorkerId(String workerdId) {
+        this.workerId = workerdId;
+    }
 }

Added: trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppDetailBuilder.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppDetailBuilder.java	                        (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppDetailBuilder.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,243 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jun 29, 2014
+ */
+package org.griphyn.vdl.karajan.monitor.monitors.http;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.griphyn.vdl.karajan.monitor.items.ApplicationItem;
+import org.griphyn.vdl.karajan.monitor.items.ApplicationState;
+import org.griphyn.vdl.karajan.monitor.monitors.http.BrowserDataBuilder.TimedValue;
+
+public class AppDetailBuilder {
+
+    private String name;
+    private SortedMap<String, List<SortedSet<ApplicationItem>>> byName;
+    private BrowserDataBuilder db;
+
+    public AppDetailBuilder(BrowserDataBuilder db, String name) {
+        this.db = db;
+        this.byName = db.getByName();
+        this.name = name;
+    }
+    
+    private static class StateTimesAverage {
+        public static final int N = 7;
+        public int[] stateTimeSum = new int[N];
+        public int[] stateTimeCount = new int[N];
+        
+        public void add(ApplicationState state, long time) {
+            int index = INDEX_MAPPING[state.ordinal()];
+            stateTimeCount[index]++;
+            stateTimeSum[index] += (int) time;
+        }
+        
+        public int[] getAverages() {
+            int[] avg = new int[N];
+            for (int i = 0; i < N; i++) {
+                if (stateTimeCount[i] == 0) {
+                    avg[i] = 0;
+                }
+                else {
+                    avg[i] = stateTimeSum[i] / stateTimeCount[i];
+                }
+            }
+            return avg;
+        }
+    }
+    
+    private static int[] INDEX_MAPPING = new int[ApplicationState.values().length];
+    
+    static {
+        INDEX_MAPPING[ApplicationState.INITIALIZING.ordinal()] = 0; 
+        INDEX_MAPPING[ApplicationState.SELECTING_SITE.ordinal()] = 1;
+        INDEX_MAPPING[ApplicationState.SUBMITTING.ordinal()] = 2;
+        INDEX_MAPPING[ApplicationState.SUBMITTED.ordinal()] = 3;
+        INDEX_MAPPING[ApplicationState.STAGE_IN.ordinal()] = 4;
+        INDEX_MAPPING[ApplicationState.ACTIVE.ordinal()] = 5;
+        INDEX_MAPPING[ApplicationState.STAGE_OUT.ordinal()] = 6;
+    }
+
+
+    public void getData(JSONEncoder e) {
+        // sites it ran on
+        Set<String> sites = new TreeSet<String>();
+        // average times for each relevant state then for total
+        // relevant states: Initializing, Sel. site, Stage in, Submitting, Submitted (queued remotely).
+        // Active, Stage out, Total
+        StateTimesAverage st = new StateTimesAverage();
+        // same for each site
+        Map<String, StateTimesAverage> sts = new HashMap<String, StateTimesAverage>();
+        List<Integer> totalTimesC = new ArrayList<Integer>();
+        List<Integer> totalTimesF = new ArrayList<Integer>();
+        Map<String, List<Integer>> totalTimesCompletedSite = new HashMap<String, List<Integer>>();
+        Map<String, List<Integer>> totalTimesFailedSite = new HashMap<String, List<Integer>>();
+        
+        int count = 0;
+        List<SortedSet<ApplicationItem>> l = byName.get(name);
+        for (int i = 0; i < l.size(); i++) {
+            SortedSet<ApplicationItem> ss = l.get(i);
+            for (ApplicationItem item : ss) {
+                count++;
+                String host = item.getHost();
+                if (!sites.contains(host)) {
+                    sites.add(host);
+                    sts.put(host, new StateTimesAverage());
+                    totalTimesCompletedSite.put(host, new ArrayList<Integer>());
+                    totalTimesFailedSite.put(host, new ArrayList<Integer>());
+                }
+                
+                StateTimesAverage stss = sts.get(host);
+                List<Integer> ttCs = totalTimesCompletedSite.get(host);
+                List<Integer> ttFs = totalTimesFailedSite.get(host);
+                
+                List<TimedValue<ApplicationState>> tl = db.getTimeline(item);
+                long lastTime = -1;
+                long firstTime = -1;
+                ApplicationState lastState = null;
+                for (TimedValue<ApplicationState> p : tl) {
+                    if (lastState != null) {
+                        switch (lastState) {
+                            case STAGE_IN:
+                                firstTime = p.time;
+                            case FINISHED_SUCCESSFULLY:
+                            case FAILED:
+                            case INITIALIZING:
+                            case SELECTING_SITE:
+                            case SUBMITTING:
+                            case SUBMITTED:
+                            case ACTIVE:
+                            case STAGE_OUT:
+                                st.add(lastState, p.time - lastTime);
+                                stss.add(lastState, p.time - lastTime);
+                        }
+                        int time;
+                        switch (lastState) {
+                            case FINISHED_SUCCESSFULLY:
+                                time = (int) (p.time - firstTime);
+                                totalTimesC.add(time);
+                                ttCs.add(time);
+                                break;
+                            case FAILED:
+                                time = (int) (p.time - firstTime);
+                                totalTimesF.add(time);
+                                ttFs.add(time);
+                                break;
+                        }
+                    }
+                    lastTime = p.time;
+                    lastState = p.value;
+                }
+            }
+        }
+        
+        // get range for total times
+        int minTime = Math.min(min(totalTimesC), min(totalTimesF));
+        int maxTime = Math.max(max(totalTimesC), max(totalTimesF));
+        
+        
+        int bins = (int) Math.max(Math.min(count / 5, 100.0), 1);
+        double binSize = ((double) (maxTime - minTime)) / bins;
+        
+        // now serialize this
+        e.beginMap();
+            e.writeMapItem("name", name);
+            e.writeMapItem("count", count);
+            e.writeMapItem("completedCount", totalTimesC.size());
+            e.writeMapItem("failedCount", totalTimesF.size());
+            e.writeMapItem("avgStateTimes", st.getAverages());
+            e.writeMapItem("distMinTime", minTime);
+            e.writeMapItem("distMaxTime", maxTime);
+            e.writeMapItem("bins", 100);
+            e.writeMapItem("completedTimeDist", bin(totalTimesC, binSize, minTime, maxTime, bins));
+            e.writeMapItem("failedTimeDist", bin(totalTimesF, binSize, minTime, maxTime, bins));
+            e.writeMapItem("completedTimeAvg", avg(totalTimesC));
+            e.writeMapItem("failedTimeAvg", avg(totalTimesF));
+            e.writeMapKey("sites");
+            e.beginArray();
+                for (String host : sites) {
+                    e.beginArrayItem();
+                        e.beginMap();
+                            e.writeMapItem("name", host);
+                            e.writeMapItem("count", totalTimesCompletedSite.get(host).size());
+                            e.writeMapItem("completedCount", totalTimesCompletedSite.get(host).size());
+                            e.writeMapItem("failedCount", totalTimesFailedSite.get(host).size());
+                            e.writeMapItem("avgStateTimes", sts.get(host).getAverages());
+                            e.writeMapItem("distMinTime", minTime);
+                            e.writeMapItem("distMaxTime", maxTime);
+                            e.writeMapItem("completedTimeDist", bin(totalTimesCompletedSite.get(host), binSize, minTime, maxTime, bins));
+                            e.writeMapItem("failedTimeDist", bin(totalTimesFailedSite.get(host), binSize, minTime, maxTime, bins));
+                            e.writeMapItem("completedTimeAvg", avg(totalTimesCompletedSite.get(host)));
+                            e.writeMapItem("failedTimeAvg", avg(totalTimesFailedSite.get(host)));
+                        e.endMap();
+                    e.endArrayItem();
+                }
+            e.endArray();
+            
+        e.endMap();
+    }
+
+    private int avg(List<Integer> l) {
+        if (l.isEmpty()) {
+            return 0;
+        }
+        int sum = 0;
+        for (Integer i : l) {
+            sum += i;
+        }
+        return sum / l.size();
+    }
+
+    private List<Integer> bin(List<Integer> l, double binSize, int minTime, int maxTime, int binCount) {
+        List<Integer> hist = new ArrayList<Integer>();
+        for (int i = 0; i < binCount; i++) {
+            hist.add(0);
+        }
+        for (Integer v : l) {
+            int bin = (int) Math.ceil((v - minTime) / binSize) - 1;
+            hist.set(bin, hist.get(bin) + 1);
+        }
+        return hist;
+    }
+    
+    private int min(List<Integer> l) {
+        if (l.isEmpty()) {
+            return 0;
+        }
+        
+        int min = l.get(0);
+        for (Integer i : l) {
+            if (i < min) {
+                min = i;
+            }
+        }
+        return min;
+    }
+    
+    private int max(List<Integer> l) {
+        if (l.isEmpty()) {
+            return 0;
+        }
+        
+        int max = l.get(0);
+        for (Integer i : l) {
+            if (i > max) {
+                max = i;
+            }
+        }
+        return max;
+    }
+}

Added: trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppInstanceBuilder.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppInstanceBuilder.java	                        (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppInstanceBuilder.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,148 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jun 29, 2014
+ */
+package org.griphyn.vdl.karajan.monitor.monitors.http;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.abstraction.interfaces.Task;
+import org.griphyn.vdl.karajan.monitor.SystemState;
+import org.griphyn.vdl.karajan.monitor.items.ApplicationItem;
+import org.griphyn.vdl.karajan.monitor.items.ApplicationState;
+import org.griphyn.vdl.karajan.monitor.items.TaskItem;
+import org.griphyn.vdl.karajan.monitor.monitors.http.BrowserDataBuilder.AppEntry;
+import org.griphyn.vdl.karajan.monitor.monitors.http.BrowserDataBuilder.TimedValue;
+
+public class AppInstanceBuilder {
+
+    private BrowserDataBuilder db;
+    private String id;
+    private Map<String, AppEntry> entries;
+    private SystemState state;
+
+    public AppInstanceBuilder(BrowserDataBuilder db, String id) {
+        this.db = db;
+        this.id = id;
+        this.entries = db.getEntries();
+        this.state = db.getState();
+    }
+
+    public void getData(JSONEncoder e) {
+        AppEntry ae = entries.get(id);
+        if (ae == null) {
+            throw new IllegalArgumentException("Unknown application ID: " + id);
+        }
+        
+        ApplicationItem app = ae.item;
+        
+        e.beginMap();
+        e.writeMapItem("id", id);
+        e.writeMapItem("name", app.getName());
+        e.writeMapItem("args", app.getArguments());
+        e.writeMapItem("host", app.getHost());
+        e.writeMapItem("crtState", app.getState().ordinal());
+        e.writeMapItem("totalTime", getTotalTime(ae.stateTimeline));
+        e.writeMapItem("runTime", getRunTime(ae.stateTimeline));
+        e.writeMapItem("timeline", db.getStateTimes(app));
+                
+        if (app.getWorkerId() != null) {
+            e.writeMapItem("workerid", app.getWorkerId());
+        }
+        
+        TaskItem et = null;
+        
+        if (ae.tasks != null) {
+            for (TaskItem it : ae.tasks) {
+                if (it.getType() == Task.JOB_SUBMISSION) {
+                    et = it;
+                }
+            }
+        }
+        
+        if (et != null) {
+            Task t = et.getTask();
+            JobSpecification spec = (JobSpecification) t.getSpecification();
+            List<String> args = spec.getArgumentsAsList();
+            extractJobInfo(e, args);
+            e.writeMapItem("directory", spec.getDirectory());
+        }
+        e.endMap();
+    }
+
+
+    private int getTotalTime(List<TimedValue<ApplicationState>> tl) {
+        if (tl == null || tl.isEmpty()) {
+            return 0;
+        }
+        return (int) (state.getCurrentTime() - tl.get(0).time); 
+    }
+    
+    private int getRunTime(List<TimedValue<ApplicationState>> tl) {
+        if (tl == null || tl.isEmpty()) {
+            return 0;
+        }
+        for (TimedValue<ApplicationState> p : tl) {
+            switch (p.value) {
+                case STAGE_IN:
+                case STAGE_OUT:
+                case ACTIVE:
+                    return (int) (state.getCurrentTime() - p.time);
+            }
+        }
+        return 0;
+    }
+
+    private void extractJobInfo(JSONEncoder e, List<String> args) {
+        String key = null;
+        List<String> l = new ArrayList<String>();
+        for (String arg : args) {
+            if (arg.startsWith("-")) {
+                if (key != null) {
+                    writeJobInfoItem(e, key, l);
+                }
+                key = arg;
+                l.clear();
+            }
+            else {
+                l.add(arg);
+            }
+        }
+    }
+
+    private void writeJobInfoItem(JSONEncoder e, String key, List<String> l) {
+        if (l.size() == 0) {
+            return;
+        }
+        if (key.equals("-e")) {
+            e.writeMapItem("executable", l.get(0));
+        }
+        else if (key.equals("-out")) {
+            e.writeMapItem("stdout", l.get(0));
+        }
+        else if (key.equals("-err")) {
+            e.writeMapItem("stderr", l.get(0));
+        }
+        else if (key.equals("-i")) {
+            e.writeMapItem("stdin", l.get(0));
+        }
+        else if (key.equals("-if")) {
+            e.writeMapItem("stagein", l);
+        }
+        else if (key.equals("-of")) {
+            e.writeMapItem("stageout", l);
+        }
+        else if (key.equals("-a")) {
+            e.writeMapItem("args", l);
+        }
+    }
+
+}

Added: trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppListBuilder.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppListBuilder.java	                        (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppListBuilder.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,139 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jun 29, 2014
+ */
+package org.griphyn.vdl.karajan.monitor.monitors.http;
+
+import java.util.List;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.griphyn.vdl.karajan.monitor.items.ApplicationItem;
+import org.griphyn.vdl.karajan.monitor.items.ApplicationState;
+
+public class AppListBuilder {
+
+    private String name;
+    private int page;
+    private int pageSize;
+    private int stateFilter;
+    private SortedMap<String, List<SortedSet<ApplicationItem>>> byName;
+    private String hostFilter;
+    private BrowserDataBuilder db;
+
+    public AppListBuilder(BrowserDataBuilder db, String name, int page, int pageSize, int state, String host) {
+        this.db = db;
+        this.byName = db.getByName();
+        this.name = name;
+        if (name == null) {
+            this.name = "";
+        }
+        this.page = page;
+        this.pageSize = pageSize;
+        this.stateFilter = state;
+        this.hostFilter = host;
+    }
+
+    public void getData(JSONEncoder e) {
+        SortedSet<ApplicationItem> sorted = getInstances(name, stateFilter, hostFilter);
+        
+        int start = (page - 1) * pageSize;
+        int index = 0;
+        e.beginMap();
+        
+        String title;
+        if (stateFilter == -1) {
+            if (name.isEmpty()) {
+                title =  "All application invocations";
+            }
+            else {
+                title = "Invocations of application \"" + name + "\"";
+            }
+        }
+        else {
+            if (name.isEmpty()) {
+                title = ApplicationState.values()[stateFilter] + " application invocations";
+            }
+            else {
+                title = ApplicationState.values()[stateFilter] + " invocations of application \"" + name + "\"";
+            }
+        }
+        if (hostFilter != null) {
+            title = title + " on site \"" + hostFilter + "\"";
+        }
+        e.writeMapItem("title", title);
+        db.writePagingData(e, sorted.size(), page, pageSize);
+        e.writeMapItem("name", name);
+        e.writeMapItem("state", stateFilter);
+        e.writeMapItem("host", hostFilter);
+        for (ApplicationItem item : sorted) {
+            if (index == start) {
+                e.writeMapKey("data");
+                e.beginArray();
+            }
+            if (index >= start) {
+                ApplicationState state = item.getState();
+                e.beginArrayItem();
+                e.beginMap();
+                e.writeMapItem("id", item.getID());
+                e.writeMapItem("state", state.ordinal());
+                e.writeMapItem("startTime", item.getStartTime());
+                e.writeMapItem("host", item.getHost());
+                if (item.getWorkerId() != null) {
+                    e.writeMapItem("worker", item.getWorkerId());
+                }
+                e.writeMapItem("args", item.getArguments());
+                if (state.isTerminal()) {
+                    e.writeMapItem("runTime", item.getCurrentStateTime() - item.getStartTime());
+                }
+                else {
+                    e.writeMapItem("runTime", 0L);
+                }
+                e.endMap();
+                e.endArrayItem();
+            }
+            if (index > start + pageSize) {
+                e.endArray();
+                e.endMap();
+                return;
+            }
+            index++;
+        }
+        if (sorted.size() > 0) {
+            e.endArray();
+        }
+        e.endMap();
+    }
+
+    private SortedSet<ApplicationItem> getInstances(String name, int stateFilter, String hostFilter) {
+        SortedSet<ApplicationItem> sorted = new TreeSet<ApplicationItem>(BrowserDataBuilder.APP_TIME_COMPARATOR);
+        if (!name.isEmpty()) {
+            List<SortedSet<ApplicationItem>> l = byName.get(name);
+            getInstances(sorted, l, stateFilter, hostFilter);
+        }
+        else {
+            for (List<SortedSet<ApplicationItem>> l : byName.values()) {
+                getInstances(sorted, l, stateFilter, hostFilter);
+            }
+        }
+        return sorted;
+    }
+    
+    private void getInstances(SortedSet<ApplicationItem> sorted, List<SortedSet<ApplicationItem>> l, int stateFilter, String hostFilter) {
+        for (SortedSet<ApplicationItem> ss : l) {
+            for (ApplicationItem app : ss) {
+                boolean stateMatch = stateFilter == -1 || app.getState().ordinal() == stateFilter;
+                boolean hostMatch = hostFilter == null || app.getHost().equals(hostFilter);
+                if (stateMatch && hostMatch) {
+                    sorted.add(app);
+                }
+            }
+        }
+    }
+}

Added: trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppsSummaryBuilder.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppsSummaryBuilder.java	                        (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppsSummaryBuilder.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,55 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jun 29, 2014
+ */
+package org.griphyn.vdl.karajan.monitor.monitors.http;
+
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+
+import org.griphyn.vdl.karajan.monitor.items.ApplicationItem;
+import org.griphyn.vdl.karajan.monitor.items.ApplicationState;
+
+public class AppsSummaryBuilder {
+
+    private final SortedMap<String, List<SortedSet<ApplicationItem>>> byName;
+    private BrowserDataBuilder db;
+
+    public AppsSummaryBuilder(BrowserDataBuilder db) {
+        this.byName = db.getByName();
+        this.db = db;
+    }
+
+    public void getData(JSONEncoder e) {
+        // counts of each state by name
+        e.beginMap();
+          db.writeEnabledStates(e, "enabledStates");
+          e.writeMapKey("apps");
+          e.beginMap();
+          for (Map.Entry<String, List<SortedSet<ApplicationItem>>> en : byName.entrySet()) {
+              e.writeMapKey(en.getKey());
+              e.beginArray();
+                for (ApplicationState s : ApplicationState.values()) {
+                    if (s.isEnabled()) {
+                        e.beginArrayItem();
+                          e.beginArray();
+                            e.writeArrayItem(s.ordinal());
+                            e.writeArrayItem(en.getValue().get(s.ordinal()).size());
+                          e.endArray();
+                        e.endArrayItem();
+                    }
+                }
+              e.endArray();
+          }
+          e.endMap();
+        e.endMap();
+    }
+
+}

Added: trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/BrowserDataBuilder.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/BrowserDataBuilder.java	                        (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/BrowserDataBuilder.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,391 @@
+//----------------------------------------------------------------------
+    //This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 21, 2013
+ */
+package org.griphyn.vdl.karajan.monitor.monitors.http;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.griphyn.vdl.karajan.monitor.SystemState;
+import org.griphyn.vdl.karajan.monitor.SystemStateListener;
+import org.griphyn.vdl.karajan.monitor.items.ApplicationItem;
+import org.griphyn.vdl.karajan.monitor.items.ApplicationState;
+import org.griphyn.vdl.karajan.monitor.items.StatefulItem;
+import org.griphyn.vdl.karajan.monitor.items.StatefulItemClass;
+import org.griphyn.vdl.karajan.monitor.items.TaskItem;
+import org.griphyn.vdl.karajan.monitor.processors.coasters.CoasterStatusItem.Block;
+import org.griphyn.vdl.karajan.monitor.processors.coasters.CoasterStatusItem;
+import org.griphyn.vdl.karajan.monitor.processors.coasters.WorkerProbeItem;
+
+public class BrowserDataBuilder extends StateDataBuilder implements SystemStateListener {
+    
+    public static final Comparator<ApplicationItem> APP_TIME_COMPARATOR = new Comparator<ApplicationItem>() {
+        @Override
+        public int compare(ApplicationItem o1, ApplicationItem o2) {
+            return (int) (o1.getStartTime() - o2.getStartTime());
+        }
+    };
+    
+    public static class TimedValue<T> {
+        public long time;
+        public T value;
+        
+        public TimedValue(long time, T value) {
+            this.time = time;
+            this.value = value;
+        }
+    }
+    
+    public static class AppEntry {
+        public ApplicationItem item;
+        public ApplicationState oldState;
+        public List<TaskItem> tasks;
+        public List<TimedValue<ApplicationState>> stateTimeline;
+    }
+    
+    public static class WorkerData {
+        public String site;
+        public List<WorkerProbeItem.Cpu> cpuLoad;
+        public Map<String, List<WorkerProbeItem.DiskUsage>> diskUsage;
+        public Map<String, List<WorkerProbeItem.IOLoad>> ioLoad;
+        public int activeApps;
+        public int failedApps;
+        public int completedApps;
+        
+        public WorkerData() {
+            cpuLoad = new ArrayList<WorkerProbeItem.Cpu>();
+            diskUsage = new TreeMap<String, List<WorkerProbeItem.DiskUsage>>();
+            ioLoad = new TreeMap<String, List<WorkerProbeItem.IOLoad>>();
+        }
+    }
+    
+    private final SystemState state;
+    private int maxCount;
+    private JSONEncoder e;
+    
+    private SortedMap<String, List<SortedSet<ApplicationItem>>> byName;
+    private Map<String, AppEntry> entries;
+    private List<ApplicationItem> byTime;
+    
+    private SortedMap<String, WorkerData> workerData;
+    private List<String> workersByTime;
+
+    public BrowserDataBuilder(SystemState state) {
+        this.state = state;
+        byName = new TreeMap<String, List<SortedSet<ApplicationItem>>>();
+        byTime = new ArrayList<ApplicationItem>();
+        entries = new HashMap<String, AppEntry>();
+        workerData = new TreeMap<String, WorkerData>();
+        workersByTime = new ArrayList<String>();
+        state.addListener(this);
+    }
+
+    @Override
+    public void itemUpdated(UpdateType updateType, StatefulItem item) {
+        if (item.getItemClass() == StatefulItemClass.APPLICATION) {
+            appUpdated(updateType, (ApplicationItem) item);
+        }
+        else if (item.getItemClass() == StatefulItemClass.TASK) {
+            taskUpdated(updateType, (TaskItem) item);
+        }
+        else if (item instanceof WorkerProbeItem) {
+            addWorkerProbeData((WorkerProbeItem) item);
+        }
+    }
+    
+    private WorkerData getWorkerData(String id) {
+        WorkerData wd = workerData.get(id);
+        if (wd == null) {
+            wd = new WorkerData();
+            workerData.put(id, wd);
+            workersByTime.add(id);
+        }
+        return wd;
+    }
+
+    private void addWorkerProbeData(WorkerProbeItem item) {
+        WorkerData wd = getWorkerData(item.getID());
+        WorkerProbeItem.Data data = item.getData();
+        if (data instanceof WorkerProbeItem.Cpu) {
+            wd.cpuLoad.add((WorkerProbeItem.Cpu) data);
+        }
+        else if (data instanceof WorkerProbeItem.DiskUsage) {
+            WorkerProbeItem.DiskUsage du = (WorkerProbeItem.DiskUsage) data;
+            List<WorkerProbeItem.DiskUsage> l = wd.diskUsage.get(du.getMountPoint());
+            if (l == null) {
+                l = new ArrayList<WorkerProbeItem.DiskUsage>();
+                wd.diskUsage.put(du.getMountPoint(), l);
+            }
+            l.add(du);
+        }
+        else if (data instanceof WorkerProbeItem.IOLoad) {
+            WorkerProbeItem.IOLoad du = (WorkerProbeItem.IOLoad) data;
+            List<WorkerProbeItem.IOLoad> l = wd.ioLoad.get(du.getDevice());
+            if (l == null) {
+                l = new ArrayList<WorkerProbeItem.IOLoad>();
+                wd.ioLoad.put(du.getDevice(), l);
+            }
+            l.add(du);
+        }
+    }
+
+    private void taskUpdated(UpdateType updateType, TaskItem task) {
+        ApplicationItem app = (ApplicationItem) task.getParent();
+        if (app == null) {
+            return;
+        }
+        AppEntry e = entries.get(app.getID());
+        if (e == null) {
+            return;
+        }
+        if (e.tasks == null) {
+            e.tasks = Collections.singletonList(task);
+        }
+        else if (e.tasks.size() == 1) {
+            if (!e.tasks.contains(task)) {
+                List<TaskItem> l = new LinkedList<TaskItem>();
+                l.add(e.tasks.get(0));
+                l.add(task);
+                e.tasks = l;
+            }
+        }
+        else {
+            if (!e.tasks.contains(task)) {
+                e.tasks.add(task);
+            }
+        }
+    }
+
+    private void appUpdated(UpdateType updateType, ApplicationItem app) {
+        if (app.getName() == null) {
+            return;
+        }
+        if (entries.containsKey(app.getID())) {
+            updateApp(app);
+        }
+        else {
+            addApp(app);
+        }
+    }
+
+    private void updateApp(ApplicationItem item) {
+        AppEntry e = entries.get(item.getID());
+        ApplicationState old = e.oldState;
+        ApplicationState state = item.getState();
+        e.oldState = state;
+        String name = item.getName();
+        List<SortedSet<ApplicationItem>> l = getNamed(name);
+        if (old != null) {
+            l.get(old.ordinal()).remove(item);
+        }
+        l.get(state.ordinal()).add(item);
+        List<TimedValue<ApplicationState>> timeline = getTimeline(e);
+        
+        timeline.add(new TimedValue<ApplicationState>(this.state.getCurrentTime(), state));
+        
+        if (item.getWorkerId() != null) {
+            // initialize worker data
+            WorkerData wd = getWorkerData(item.getWorkerId());
+            switch (state) {
+                case ACTIVE:
+                    wd.activeApps++;
+                    break;
+                case FAILED:
+                    wd.failedApps++;
+                    wd.activeApps--;
+                    break;
+                case FINISHED_SUCCESSFULLY:
+                    wd.activeApps--;
+                    wd.completedApps++;
+                    break;
+            }
+            wd.site = item.getHost();
+        }
+    }
+
+    private List<TimedValue<ApplicationState>> getTimeline(AppEntry e) {
+        List<TimedValue<ApplicationState>> tl = e.stateTimeline;
+        if (tl == null) {
+            tl = new ArrayList<TimedValue<ApplicationState>>();
+            e.stateTimeline = tl;
+        }
+        return tl;
+    }
+    
+    List<TimedValue<ApplicationState>> getTimeline(ApplicationItem item) {
+        AppEntry e = entries.get(item.getID());
+        if (e == null) {
+            throw new IllegalArgumentException("Unknown app id: " + item.getID());
+        }
+        return getTimeline(e);
+    }
+
+    private List<SortedSet<ApplicationItem>> getNamed(String name) {
+        List<SortedSet<ApplicationItem>> l = byName.get(name);
+        if (l == null) {
+            l = new ArrayList<SortedSet<ApplicationItem>>();
+            for (ApplicationState s : ApplicationState.values()) {
+                l.add(new TreeSet<ApplicationItem>(APP_TIME_COMPARATOR));
+            }
+            byName.put(name, l);
+        }
+        return l;
+    }
+
+    private void addApp(ApplicationItem item) {
+        byTime.add(item);
+        String name = item.getName();
+        if (name == null) {
+            return;
+        }
+        ApplicationState state = item.getState();
+        AppEntry e = new AppEntry();
+        entries.put(item.getID(), e);
+        e.item = item;
+        e.oldState = state;
+        List<SortedSet<ApplicationItem>> l = getNamed(name);
+        l.get(state.ordinal()).add(item);
+    }
+
+    @Override
+    public ByteBuffer getData(Map<String, String> params) {
+        e = new JSONEncoder();
+        String type = getParam(params, "type", "apps");
+        
+        if (type.equals("apps")) {
+            new AppsSummaryBuilder(this).getData(e);
+        }
+        else if (type.equals("applist")) {
+            new AppListBuilder(this, getParam(params, "name", null), 
+                Integer.parseInt(getParam(params, "page", "1")),
+                Integer.parseInt(getParam(params, "pageSize", "20")), 
+                Integer.parseInt(getParam(params, "state", "-1")), 
+                getParam(params, "host", null)).getData(e);
+        }
+        else if (type.equals("appdetail")) {
+            new AppDetailBuilder(this, getParam(params, "name")).getData(e);
+        }
+        else if (type.equals("appinstance")) {
+            new AppInstanceBuilder(this, getParam(params, "id")).getData(e);
+        }
+        else if (type.equals("sites")) {
+            new SiteInfoBuilder(this).getData(e);
+        }
+        else if (type.equals("workerlist")) {
+            new WorkerListBuilder(this, getParam(params, "site", null), 
+                Integer.parseInt(getParam(params, "page", "1")),
+                Integer.parseInt(getParam(params, "pageSize", "20"))).getData(e);
+        }
+        else if (type.equals("worker")) {
+            new WorkerInfoBuilder(this, getParam(params, "id")).getData(e);
+        }
+        else {
+            throw new IllegalArgumentException("Unknown type: " + type);
+        }
+        return ByteBuffer.wrap(e.toString().getBytes());
+    }
+    
+    public List<List<Integer>> getStateTimes(ApplicationItem app) {
+        List<TimedValue<ApplicationState>> tl = getTimeline(app);
+        List<List<Integer>> l = new ArrayList<List<Integer>>();
+        if (tl != null) {
+            long lastTime = -1;
+            long firstTime = -1;
+            ApplicationState lastState = null;
+            for (TimedValue<ApplicationState> p : tl) {
+                if (lastState != null) {
+                    l.add(new org.griphyn.vdl.karajan.Pair<Integer>(lastState.ordinal(), (int) (p.time - lastTime))); 
+                }
+                lastTime = p.time;
+                lastState = p.value;
+            }
+        }
+        return l;
+    }
+       
+        
+    private String getParam(Map<String, String> params, String name, String _default) {
+        String value = params.get(name);
+        if (value == null) {
+            return _default;
+        }
+        else {
+            return value;
+        }
+    }
+    
+    private String getParam(Map<String, String> params, String name) {
+        String value = params.get(name);
+        if (value == null) {
+            throw new IllegalArgumentException("Missing parameter '" + name + "'");
+        }
+        else {
+            return value;
+        }
+    }
+
+    public SortedMap<String, List<SortedSet<ApplicationItem>>> getByName() {
+        return byName;
+    }
+
+    public Map<String, AppEntry> getEntries() {
+        return entries;
+    }
+
+    public SystemState getState() {
+        return state;
+    }
+
+    public List<ApplicationItem> getByTime() {
+        return byTime;
+    }
+
+    public void writeEnabledStates(JSONEncoder e, String key) {
+        e.writeMapKey(key);
+        e.beginArray();
+        for (ApplicationState s : ApplicationState.values()) {
+            if (s.isEnabled()) {
+                e.beginArrayItem();
+                e.write(s.ordinal());
+            }
+        }
+        e.endArray();
+    }
+
+    public Map<String, WorkerData> getWorkerData() {
+        return workerData;
+    }
+
+    public Block getBlock(String id) {
+        CoasterStatusItem item = (CoasterStatusItem) state.getItemByID(CoasterStatusItem.ID, StatefulItemClass.MISC); 
+        return item.getBlocks().get(id);
+    }
+
+    public List<String> getWorkersByTime() {
+        return workersByTime;
+    }
+    
+    public void writePagingData(JSONEncoder e, int size, int page, int pageSize) {
+        int pages = (int) Math.ceil(((double) size) / pageSize);
+        e.writeMapItem("pages", pages);
+        e.writeMapItem("hasPrev", page > 1);
+        e.writeMapItem("hasNext", page < pages);
+        e.writeMapItem("crtPage", page);
+    }
+}
\ No newline at end of file

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/HTTPServer.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/HTTPServer.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/HTTPServer.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -69,6 +69,7 @@
         stateKeys.put("/summary.state", new SummaryDataBuilder(state));
         stateKeys.put("/plotSeriesInfo.state", new PlotInfoBuilder(state));
         stateKeys.put("/plotData.state", new PlotDataBuilder(state));
+        stateKeys.put("/browser.state", new BrowserDataBuilder(state));
     }
 
     public void start() throws IOException {
@@ -181,7 +182,19 @@
                     for (SelectionKey key : keys) { 
                         if (key.isValid()) {
                             ConnectionState s = channels.get(key.channel());
-                            s.process(key);
+                            boolean ok = false;
+                            if (s != null) {
+                                try {
+                                    s.process(key);
+                                    ok = true;
+                                }
+                                catch (Exception e) {
+                                }
+                            }
+                            if (!ok) {
+                                channels.remove(key.channel());
+                                key.cancel();
+                            }
                         }
                     }
                     skeys.clear();

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/JSONEncoder.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/JSONEncoder.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/JSONEncoder.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -9,6 +9,7 @@
  */
 package org.griphyn.vdl.karajan.monitor.monitors.http;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Stack;
@@ -42,11 +43,28 @@
     }
     
     public void write(String value) {
-        sb.append('"');
-        sb.append(value);
-        sb.append('"');
+        if (value == null) {
+            sb.append("null");
+        }
+        else {
+            sb.append('"');
+            escape(sb, value);
+            sb.append('"');
+        }
     }
     
+    private void escape(StringBuilder sb, String value) {
+        for (int i = 0; i < value.length(); i++) {
+            char c = value.charAt(i);
+            switch (c) {
+                case '"':
+                    sb.append('\\');
+                default:
+                    sb.append(c);
+            }
+        }
+    }
+
     public void write(boolean value) {
         sb.append(value);
     }
@@ -73,14 +91,17 @@
         else if (value instanceof Long) {
             write(((Long) value).longValue());
         }
-        else if (value instanceof List) {
-            writeArray((List<?>) value);
+        else if (value instanceof Collection) {
+            writeArray((Collection<?>) value);
         }
         else if (value instanceof Map) {
             @SuppressWarnings("unchecked")
             Map<? extends Object, ? extends Object> m = (Map<? extends Object, ? extends Object>) value;
             writeMap(m);
         }
+        else if (value instanceof int[]) {
+            writeArray((int[]) value);
+        }
         else {
             write(value.toString());
         }
@@ -239,7 +260,7 @@
         }
     }
     
-    public void writeArray(List<?> a) {
+    public void writeArray(Collection<?> a) {
         beginArray();
         for (Object v : a) {
             writeArrayItem(v);

Added: trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/SiteInfoBuilder.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/SiteInfoBuilder.java	                        (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/SiteInfoBuilder.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,162 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jun 29, 2014
+ */
+package org.griphyn.vdl.karajan.monitor.monitors.http;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.griphyn.vdl.karajan.monitor.items.ApplicationItem;
+import org.griphyn.vdl.karajan.monitor.items.ApplicationState;
+
+public class SiteInfoBuilder {
+
+    private BrowserDataBuilder db;
+    private List<ApplicationItem> byTime;
+    private List<Integer> enabledStatesMapping;
+
+    public SiteInfoBuilder(BrowserDataBuilder db) {
+        this.db = db;
+        this.byTime = db.getByTime();
+        
+        enabledStatesMapping = new ArrayList<Integer>();
+        for (ApplicationState state : ApplicationState.values()) {
+            if (state.isEnabled()) {
+                enabledStatesMapping.add(state.ordinal());
+            }
+        }
+    }
+    
+    private static class SiteInfo {
+        public String name;
+        public Map<String, List<Integer>> appStates;
+        public Map<String, List<Integer>> appCummulativeStateTimes;
+        public Map<String, Integer> appCountByType;
+        public SortedSet<String> workers;
+        public int appCount;
+        
+        public SiteInfo(String name) {
+            this.name = name;
+            this.appStates = new TreeMap<String, List<Integer>>();
+            this.appCountByType = new TreeMap<String, Integer>();
+            this.appCummulativeStateTimes = new TreeMap<String, List<Integer>>();
+            this.workers = new TreeSet<String>();
+        }
+    }
+   
+
+    public void getData(JSONEncoder e) {
+        /*
+         * List of sites
+         * List of app names it ran with state counts and average times
+         * List of workers
+         */
+        
+        Map<String, SiteInfo> si = new HashMap<String, SiteInfo>();
+        
+        for (ApplicationItem app : byTime) {
+            addApp(getOrCreateSite(si, app.getHost()), app);
+        }
+        
+        e.beginMap();
+        db.writeEnabledStates(e, "enabledStates");
+        e.writeMapKey("sites");
+        e.beginArray();
+            for (SiteInfo i : si.values()) {
+                e.beginArrayItem();
+                e.beginMap();
+                    e.writeMapItem("name", i.name);
+                    e.writeMapItem("appCount", i.appCount);
+                    e.writeMapItem("workers", i.workers);
+                    e.writeMapKey("stateCounts");
+                        e.beginMap();
+                            for (Map.Entry<String, List<Integer>> e1 : i.appStates.entrySet()) {
+                                e.writeMapKey(e1.getKey());
+                                e.beginArray();
+                                    int index = 0;
+                                    for (Integer i2 : e1.getValue()) {
+                                        if (ApplicationState.values()[index++].isEnabled()) {
+                                            e.writeArrayItem(i2.intValue());
+                                        }
+                                    }
+                                e.endArray();
+                            }
+                        e.endMap();
+                    e.writeMapKey("avgStateTimes");
+                        e.beginMap();
+                            for (Map.Entry<String, List<Integer>> e1 : i.appCummulativeStateTimes.entrySet()) {
+                                e.writeMapKey(e1.getKey());
+                                int count = i.appCountByType.get(e1.getKey());
+                                e.beginArray();
+                                    int index = 0;
+                                    for (Integer i2 : e1.getValue()) {
+                                        if (ApplicationState.values()[index++].isEnabled()) {
+                                            e.writeArrayItem(i2.intValue() / count);
+                                        }
+                                    }
+                                e.endArray();
+                            }
+                        e.endMap();
+                e.endMap();
+            }
+        e.endArray();
+        e.endMap();
+    }
+    
+    private void addApp(SiteInfo si, ApplicationItem app) {
+        si.appCount++;
+        if (app.getWorkerId() != null) {
+            si.workers.add(app.getWorkerId());
+        }
+        String name = app.getName();
+        
+        List<Integer> states = si.appStates.get(name);
+        List<Integer> stateTimes = si.appCummulativeStateTimes.get(name);
+        Integer count = si.appCountByType.get(name);
+        if (states == null) {
+            states = new ArrayList<Integer>();
+            states.addAll(Collections.nCopies(ApplicationState.values().length, 0));
+            si.appStates.put(name, states);
+            
+            stateTimes = new ArrayList<Integer>();
+            stateTimes.addAll(Collections.nCopies(ApplicationState.values().length, 0));
+            si.appCummulativeStateTimes.put(name, stateTimes);
+            
+            count = 0;
+        }
+        
+        add(states, app.getState().ordinal(), 1);
+        List<List<Integer>> st = db.getStateTimes(app);
+        for (List<Integer> sti : st) {
+            add(stateTimes, sti.get(0), sti.get(1));
+        }
+        si.appCountByType.put(name, count + 1);
+    }
+
+    private void add(List<Integer> l, int index, int amount) {
+        int crt = l.get(index);
+        l.set(index, crt + amount);
+    }
+
+    private SiteInfo getOrCreateSite(Map<String, SiteInfo> si, String host) {
+        SiteInfo s = si.get(host);
+        if (s == null) {
+            s = new SiteInfo(host);
+            si.put(host, s);
+        }
+        return s;
+    }
+
+}

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/SummaryDataBuilder.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/SummaryDataBuilder.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/SummaryDataBuilder.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -36,7 +36,7 @@
         e.writeMapItem("completed", state.getCompleted());
         e.writeMapItem("maxHeap", state.getMaxHeap());
         e.writeMapItem("maxHeapFormatted", state.getMaxHeapFormatted());
-        e.writeMapItem("crtHeap", state.getCurrentHeap());
+        e.writeMapItem("crtHeap", state.getUsedHeap());
         e.writeMapItem("crtHeapFormatted", state.getCurrentHeapFormatted());
         e.writeMapItem("timeLeftFormatted", state.getEstimatedTimeLeftFormatted());
         e.writeMapItem("elapsedTimeFormatetd", state.getElapsedTimeFormatted());

Added: trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/SwiftLogInfo.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/SwiftLogInfo.java	                        (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/SwiftLogInfo.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,139 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 1, 2014
+ */
+package org.griphyn.vdl.karajan.monitor.monitors.http;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.Priority;
+import org.apache.log4j.spi.LoggingEvent;
+import org.globus.cog.util.ArgumentParser;
+import org.globus.cog.util.ArgumentParserException;
+import org.griphyn.vdl.karajan.monitor.MonitorAppender;
+
+public class SwiftLogInfo {
+    private static SimpleDateFormat SDF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSSZ");
+    
+    public static final int FOLLOW_SLEEP_TIME = 50;
+    
+    private String logFileName;
+    private boolean follow;
+    private double rate;
+    
+    public SwiftLogInfo(String logFileName, boolean follow, double rate) {
+        this.logFileName = logFileName;
+        this.follow = follow;
+        this.rate = rate;
+    }
+    
+    public void run() throws Exception {
+        MonitorAppender ap = new MonitorAppender("bla", "http");
+        BufferedReader br = new BufferedReader(new FileReader(logFileName));
+        
+        long firstLogTime = -1;
+        long firstActualTime = System.currentTimeMillis();
+        
+        if (follow) {
+            System.out.print("Following " + logFileName + ". Hit CTRL+C to end.");
+        }
+        else {
+            System.out.print("Parsing " + logFileName + "...");
+        }
+        String line = null;
+        while (follow || (line = br.readLine()) != null) {
+            if (line == null) {
+                Thread.sleep(FOLLOW_SLEEP_TIME);
+                continue;
+            }
+            String[] els = line.split("\\s+", 5);
+            if (els.length < 5) {
+                continue;
+            }
+            
+            long time;
+            try {
+                time = SDF.parse(els[0] + " " + els[1]).getTime();
+            }
+            catch (ParseException e) {
+                continue;
+            }
+            
+            if (rate != 0) {
+                if (firstLogTime == -1) {
+                    firstLogTime = time;
+                }
+                long now = System.currentTimeMillis();
+                // this event is supposed to happen at this relative time
+                long deadline = (long) ((time - firstLogTime) / rate);
+                long delay = deadline - (now - firstActualTime);
+                System.out.println("deadline: " + deadline + ", now: " + (now - firstActualTime));
+                if (delay >= 0) {
+                    Thread.sleep(delay);
+                }
+            }
+                        
+            LoggingEvent le = new LoggingEvent(els[3], Logger.getLogger(els[3]), time, getLevel(els[2]), els[4], null);
+            ap.doAppend(le);
+        }
+        System.out.println("done");
+    }
+
+    public static void main(String[] args) {
+        ArgumentParser ap = new ArgumentParser();
+        ap.addFlag("f", "Follow: keep parsing the log file as it grows.");
+        ap.addOption("rt", "integer", "Real time: if specified, " +
+        		"generate log events progressively at a rate " +
+        		"proportional to that at which they were generated.", 
+        		ArgumentParser.OPTIONAL);
+        ap.addOption(ArgumentParser.DEFAULT, "logFile", "The log file to parse", 
+            ArgumentParser.NORMAL);
+        ap.addFlag("h", "Display usage information");
+        
+        try {
+            ap.parse(args);
+            if (ap.isPresent("h")) {
+                ap.usage();
+                System.exit(0);
+            }
+            SwiftLogInfo sli = new SwiftLogInfo(ap.getStringValue(ArgumentParser.DEFAULT), 
+                ap.isPresent("f"), ap.getFloatValue("rt", 0));
+            sli.run();
+        }
+        catch (ArgumentParserException e) {
+            System.err.println(e.getMessage());
+            ap.usage();
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+        }        
+    }
+
+    private static Priority getLevel(String s) {
+        if ("WARN".equals(s)) {
+            return Level.WARN;
+        }
+        else if ("ERROR".equals(s)) {
+            return Level.ERROR;
+        }
+        else if ("INFO".equals(s)) {
+            return Level.INFO;
+        }
+        else if ("DEBUG".equals(s)) {
+            return Level.DEBUG;
+        }
+        else {
+            return Level.ALL;
+        }
+    }
+}

Added: trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/WorkerInfoBuilder.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/WorkerInfoBuilder.java	                        (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/WorkerInfoBuilder.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,116 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jun 30, 2014
+ */
+package org.griphyn.vdl.karajan.monitor.monitors.http;
+
+import java.util.List;
+import java.util.Map;
+
+import org.griphyn.vdl.karajan.monitor.monitors.http.BrowserDataBuilder.WorkerData;
+import org.griphyn.vdl.karajan.monitor.processors.coasters.CoasterStatusItem.Block;
+import org.griphyn.vdl.karajan.monitor.processors.coasters.CoasterStatusItem.Worker;
+import org.griphyn.vdl.karajan.monitor.processors.coasters.WorkerProbeItem;
+
+public class WorkerInfoBuilder {
+
+    private BrowserDataBuilder db;
+    private String wid;
+
+    public WorkerInfoBuilder(BrowserDataBuilder db, String wid) {
+        this.db = db;
+        this.wid = wid;
+    }
+
+    public void getData(JSONEncoder e) {
+        /*
+         * worker, node running on, wall time, run time, apps running
+         * probes, all details
+         */
+        e.beginMap();
+        Map<String, WorkerData> wd = db.getWorkerData();
+            
+        WorkerData wdd = wd.get(wid);
+        
+        int index = wid.indexOf(':');
+        String blkId = wid.substring(0, index);
+        String wId = wid.substring(index + 1);
+        
+        Block blk = db.getBlock(blkId);
+        Worker w = blk.getWorker(wId);
+        
+        e.writeMapItem("id", wid);
+        e.writeMapItem("node", w.node);
+        e.writeMapItem("cores", w.cores);
+        e.writeMapItem("startTime", blk.startTime);
+        e.writeMapItem("walltime", blk.walltime);
+        e.writeMapItem("activeApps", wdd.activeApps);
+        e.writeMapItem("failedApps", wdd.failedApps);
+        e.writeMapItem("completedApps", wdd.completedApps);
+     
+        e.writeMapKey("cpuLoad");
+        e.beginArray();
+        for (WorkerProbeItem.Cpu cpu : wdd.cpuLoad) {
+            e.beginArrayItem();
+            e.beginMap();
+            e.writeMapItem("t", cpu.getTime());
+            e.writeMapItem("load", cpu.getLoad());
+            e.endMap();
+        }
+        e.endArray();
+        
+        e.writeMapKey("diskUsage");
+        e.beginArray();
+        int ix = 0;
+        for (Map.Entry<String, List<WorkerProbeItem.DiskUsage>> e1 : wdd.diskUsage.entrySet()) {
+            e.beginArrayItem();
+            e.beginMap();
+            e.writeMapItem("index", ix++);
+            e.writeMapItem("mount", e1.getKey());
+            e.writeMapKey("data");
+            e.beginArray();
+            for (WorkerProbeItem.DiskUsage du : e1.getValue()) {
+                e.beginArrayItem();
+                e.beginMap();
+                e.writeMapItem("t", du.getTime());
+                e.writeMapItem("avail", du.getAvailable());
+                e.writeMapItem("used", du.getUsed());
+                e.endMap();
+            }
+            e.endArray();
+            e.endMap();
+        }
+        e.endArray();
+        
+        e.writeMapKey("ioLoad");
+        e.beginArray();
+        ix = 0;
+        for (Map.Entry<String, List<WorkerProbeItem.IOLoad>> e1 : wdd.ioLoad.entrySet()) {
+            e.beginArrayItem();
+            e.beginMap();
+            e.writeMapItem("index", ix++);
+            e.writeMapItem("device", e1.getKey());
+            e.writeMapKey("data");
+            e.beginArray();
+            for (WorkerProbeItem.IOLoad du : e1.getValue()) {
+                e.beginArrayItem();
+                e.beginMap();
+                e.writeMapItem("t", du.getTime());
+                e.writeMapItem("rt", du.getReadThroughput());
+                e.writeMapItem("wt", du.getWriteThroughput());
+                e.writeMapItem("load", du.getLoad());
+                e.endMap();
+            }
+            e.endArray();
+            e.endMap();
+        }
+        e.endArray();
+                        
+        e.endMap();
+    }
+}

Added: trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/WorkerListBuilder.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/WorkerListBuilder.java	                        (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/WorkerListBuilder.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,106 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jun 30, 2014
+ */
+package org.griphyn.vdl.karajan.monitor.monitors.http;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.griphyn.vdl.karajan.monitor.monitors.http.BrowserDataBuilder.WorkerData;
+import org.griphyn.vdl.karajan.monitor.processors.coasters.CoasterStatusItem.Block;
+import org.griphyn.vdl.karajan.monitor.processors.coasters.CoasterStatusItem.Worker;
+import org.griphyn.vdl.karajan.monitor.processors.coasters.WorkerProbeItem;
+
+public class WorkerListBuilder {
+
+    private BrowserDataBuilder db;
+    private int page;
+    private int pageSize;
+    private String site;
+
+    public WorkerListBuilder(BrowserDataBuilder db, String site, int page, int pageSize) {
+        this.db = db;
+        this.site = site;
+        this.page = page;
+        this.pageSize = pageSize;
+    }
+
+    public void getData(JSONEncoder e) {
+        /*
+         * worker, node running on, wall time, run time, #apps running
+         * probes
+         */
+        Map<String, WorkerData> wd = db.getWorkerData();
+        List<String> filtered = new ArrayList<String>();
+        for (String wid : db.getWorkersByTime()) {
+            WorkerData wdd = wd.get(wid);
+            if (site == null || site.equals(wdd.site)) {
+                filtered.add(wid);
+            }
+        }
+        int start = (page - 1) * pageSize;
+        int i = -1;
+        e.beginMap();
+        db.writePagingData(e, filtered.size(), page, pageSize);
+        e.writeMapKey("data");
+        e.beginArray();
+        for (String wid : filtered) {
+            i++;
+            if (i < start) {
+                continue;
+            }
+            if (i > start + pageSize) {
+                break;
+            }
+            WorkerData wdd = wd.get(wid);
+            
+            e.beginArrayItem();
+            e.beginMap();
+            
+            e.writeMapItem("id", wid);
+            int index = wid.indexOf(':');
+            String blkId = wid.substring(0, index);
+            String wId = wid.substring(index + 1);
+            
+            Block blk = db.getBlock(blkId);
+            Worker w = blk.getWorker(wId);
+            
+            e.writeMapItem("node", w.node);
+            e.writeMapItem("cores", w.cores);
+            e.writeMapItem("startTime", blk.startTime);
+            e.writeMapItem("walltime", blk.walltime);
+            e.writeMapItem("activeApps", wdd.activeApps);
+            e.writeMapItem("failedApps", wdd.failedApps);
+            e.writeMapItem("completedApps", wdd.completedApps);
+            
+            e.writeMapKey("cpuLoad");
+            e.beginArray();
+            for (WorkerProbeItem.Cpu cpu : wdd.cpuLoad) {
+                e.beginArrayItem();
+                e.beginMap();
+                e.writeMapItem("t", cpu.getTime());
+                e.writeMapItem("l", cpu.getLoad());
+                e.endMap();
+            }
+            e.endArray();               
+            e.endMap();
+        }
+        e.endArray();
+        e.endMap();
+    }
+
+    private <T> T getLast(List<T> l) {
+        if (l == null || l.isEmpty()) {
+            return null;
+        }
+        return l.get(l.size() - 1);
+    }
+
+}

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/AbstractMessageProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/AbstractMessageProcessor.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/AbstractMessageProcessor.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -17,7 +17,8 @@
     
     @Override
     public String getSupportedSourceName() {
-        return getSupportedSource().getName();
+        String name = getSupportedSource().getName();
+        return name.substring(name.lastIndexOf('.') + 1);
     }
 
     @Override

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/SimpleParser.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/SimpleParser.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/SimpleParser.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -49,6 +49,25 @@
 		crt = index + tok.length();
 	}
 	
+	public void markMatchedTo(char m, char pair) throws ParsingException {
+	    int level = 1;
+	    for (int i = crt; i < str.length(); i++) {
+	        char c = str.charAt(i);
+	        if (c == m) {
+	            level--;
+	            if (level == 0) {
+	                tokEnd = i;
+	                return;
+	            }
+	        }
+	        if (c == pair) {
+	            level++;
+	        }
+	    }
+        throw new ParsingException("Could not find \"" + m + "\" in \"" + remaining()
+                    + "\". String is \"" + str + "\".");
+    }
+	
 	public void skipTo(String tok) throws ParsingException {
         int index = str.indexOf(tok, crt);
         if (index == -1) {

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/BlockRequestedProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/BlockRequestedProcessor.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/BlockRequestedProcessor.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -41,9 +41,11 @@
             p.beginToken();
             p.markTo(",");
             int coresPerWorker = Integer.parseInt(p.getToken());
+            p.skip("walltime=");
+            int walltime = Integer.parseInt(p.remaining());
             
             CoasterStatusItem item = (CoasterStatusItem) state.getItemByID(CoasterStatusItem.ID, StatefulItemClass.MISC);
-            item.newBlock(blockId, cores, coresPerWorker);
+            item.newBlock(blockId, cores, coresPerWorker, walltime);
         }
         catch (Exception e) {
             e.printStackTrace();

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/CoasterStatusItem.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/CoasterStatusItem.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/CoasterStatusItem.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -33,8 +33,8 @@
         return StatefulItemClass.MISC;
     }
     
-    public synchronized void newBlock(String id, int cores, int coresPerWorker) {
-        Block b = new Block(id, cores, coresPerWorker);
+    public synchronized void newBlock(String id, int cores, int coresPerWorker, int walltime) {
+        Block b = new Block(id, cores, coresPerWorker, walltime);
         blocks.put(id, b);
         queuedBlocks++;
         requestedCores += cores;
@@ -69,25 +69,31 @@
         doneBlocks++;
     }
     
-    public synchronized void workerActive(String blockId) {
+    public synchronized void workerActive(String blockId, String workerId, String node, int cores, long now) {
         Block b = getBlock(blockId);
-        b.activeCores += b.coresPerWorker;
-        activeCores += b.coresPerWorker;
-        requestedCores -= b.coresPerWorker;
+        activeCores += cores;
+        requestedCores -= cores;
+        b.addWorker(workerId, node, cores, now);
     }
     
-    public synchronized void workerLost(String blockId) {
+    public synchronized void workerLost(String blockId, String workerId) {
         Block b = getBlock(blockId);
-        b.activeCores -= b.coresPerWorker;
-        activeCores -= b.coresPerWorker;
-        failedCores += b.coresPerWorker;
+        Worker w = b.getWorker(workerId);
+        int cores = w.cores;
+        activeCores -= cores;
+        failedCores += cores;
+        b.activeCores -= cores;
+        w.state = WorkerState.FAILED;
     }
 
-    public synchronized void workerShutDown(String blockId) {
+    public synchronized void workerShutDown(String blockId, String workerId) {
         Block b = getBlock(blockId);
-        b.activeCores -= b.coresPerWorker;
-        activeCores -= b.coresPerWorker;
-        doneCores += b.coresPerWorker;
+        Worker w = b.getWorker(workerId);
+        int cores = w.cores;
+        activeCores -= cores;
+        failedCores += cores;
+        b.activeCores -= cores;
+        w.state = WorkerState.FAILED;
     }
     
     private Block getBlock(String blockId) {
@@ -148,17 +154,53 @@
         QUEUED, ACTIVE, FAILED, DONE
     }
     
+    public enum WorkerState {
+        ACTIVE, FAILED, DONE
+    }
+    
+    public static class Worker {
+        public String node;
+        public int cores;
+        public WorkerState state;
+        
+        public Worker(String node, int cores) {
+            this.node = node;
+            this.cores = cores;
+        }
+    }
+    
     public static class Block {
         public BlockState state;
         public final String id;
         public final int cores, coresPerWorker;
         public int activeCores;
+        public int walltime;
+        public long startTime;
+        public Map<String, Worker> workers;
         
-        public Block(String id, int cores, int coresPerWorker) {
+        public Block(String id, int cores, int coresPerWorker, int walltime) {
             this.id = id;
             this.cores = cores;
             this.coresPerWorker = coresPerWorker;
             this.state = BlockState.QUEUED;
+            this.walltime = walltime;
         }
+
+        public Worker getWorker(String workerId) {
+            return workers.get(workerId);
+        }
+
+        public void addWorker(String workerId, String node, int cores, long now) {
+            if (workers == null) {
+                workers = new HashMap<String, Worker>();
+            }
+            Worker w = new Worker(node, cores);
+            w.state = WorkerState.ACTIVE;
+            workers.put(workerId, w);
+            activeCores += cores;
+            if (startTime == 0) {
+                startTime = now;
+            }
+        }
     }
 }

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/RemoteLogProcessorDispatcher.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/RemoteLogProcessorDispatcher.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/RemoteLogProcessorDispatcher.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -20,7 +20,7 @@
 
     @Override
     public String getSupportedSourceName() {
-        return RemoteLogHandler.class.getName();
+        return RemoteLogHandler.class.getSimpleName();
     }
 
     @Override

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerActiveProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerActiveProcessor.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerActiveProcessor.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -31,9 +31,15 @@
         try {
             p.skip("blockid=");
             String blockId = p.word();
+            p.skip("id=");
+            String workerId = p.word();
+            p.skip("node=");
+            String node = p.word();
+            p.skip("cores=");
+            int cores = Integer.parseInt(p.word());
             
             CoasterStatusItem item = (CoasterStatusItem) state.getItemByID(CoasterStatusItem.ID, StatefulItemClass.MISC);
-            item.workerActive(blockId);
+            item.workerActive(blockId, workerId, node, cores, state.getCurrentTime());
         }
         catch (Exception e) {
             e.printStackTrace();

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerLostProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerLostProcessor.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerLostProcessor.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -31,9 +31,11 @@
         try {
             p.skip("blockid=");
             String blockId = p.word();
+            p.skip("id=");
+            String workerId = p.word();
             
             CoasterStatusItem item = (CoasterStatusItem) state.getItemByID(CoasterStatusItem.ID, StatefulItemClass.MISC);
-            item.workerLost(blockId);
+            item.workerLost(blockId, workerId);
         }
         catch (Exception e) {
             e.printStackTrace();

Added: trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerProbeItem.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerProbeItem.java	                        (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerProbeItem.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,151 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Aug 7, 2013
+ */
+package org.griphyn.vdl.karajan.monitor.processors.coasters;
+
+import java.util.Collection;
+
+import org.griphyn.vdl.karajan.monitor.items.StatefulItem;
+import org.griphyn.vdl.karajan.monitor.items.StatefulItemClass;
+
+public class WorkerProbeItem implements StatefulItem {
+    private String workerid;
+    private final Data data;
+    
+    public WorkerProbeItem(String workerid, Data data) {
+        this.workerid = workerid;
+        this.data = data;
+    }
+    
+    public Data getData() {
+        return data;
+    }
+
+
+
+    public static class Data {
+        private final long time;
+        
+        public Data(long time) {
+            this.time = time;
+        }
+
+        public long getTime() {
+            return time;
+        }
+    }
+
+    public static class Cpu extends Data {
+        private final double load;
+        
+        public Cpu(long time, double load) {
+            super(time);
+            this.load = load;
+        }
+
+        public double getLoad() {
+            return load;
+        }
+    }
+    
+    public static class DiskUsage extends Data {
+        private final String mountPoint, fs;
+        private final long used, available;
+        
+        public DiskUsage(long time, String mountPoint, String fs, long used, long available) {
+            super(time);
+            this.mountPoint = mountPoint;
+            this.fs = fs;
+            this.used = used;
+            this.available = available;
+        }
+        
+        public String getMountPoint() {
+            return mountPoint;
+        }
+
+        public String getFs() {
+            return fs;
+        }
+
+        public long getUsed() {
+            return used;
+        }
+
+        public long getAvailable() {
+            return available;
+        }
+    }
+    
+    public static class IOLoad extends Data {
+        private final String device;
+        private final int writeThroughput, readThroughput;
+        private final double load;
+        
+        public IOLoad(long time, String device, int writeThroughput, int readThroughput, double load) {
+            super(time);
+            this.device = device;
+            this.writeThroughput = writeThroughput;
+            this.readThroughput = readThroughput;
+            this.load = load;
+        }
+        
+        public String getDevice() {
+            return device;
+        }
+
+        public long getWriteThroughput() {
+            return writeThroughput;
+        }
+
+        public long getReadThroughput() {
+            return readThroughput;
+        }
+
+        public double getLoad() {
+            return load;
+        }
+    }
+
+    @Override
+    public StatefulItem getParent() {
+        return null;
+    }
+
+    @Override
+    public void setParent(StatefulItem parent) {
+    }
+
+    @Override
+    public void addChild(StatefulItem child) {
+    }
+
+    @Override
+    public void removeChild(StatefulItem child) {
+    }
+
+    @Override
+    public Collection<StatefulItem> getChildren() {
+        return null;
+    }
+
+    @Override
+    public StatefulItemClass getItemClass() {
+        return StatefulItemClass.MISC;
+    }
+
+    @Override
+    public String getID() {
+        return workerid;
+    }
+
+    @Override
+    public void addListener(Listener l) {
+    }
+}

Added: trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerProbeProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerProbeProcessor.java	                        (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerProbeProcessor.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,76 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Aug 7, 2013
+ */
+package org.griphyn.vdl.karajan.monitor.processors.coasters;
+
+import org.griphyn.vdl.karajan.monitor.SystemState;
+import org.griphyn.vdl.karajan.monitor.items.StatefulItem;
+import org.griphyn.vdl.karajan.monitor.processors.SimpleParser;
+
+public class WorkerProbeProcessor extends AbstractRemoteLogProcessor {
+    private CoasterStatusItem item;
+    
+    @Override
+    public void initialize(SystemState state) {
+        super.initialize(state);
+    }
+
+    @Override
+    public String getMessageHeader() {
+        return "PROBE";
+    }
+
+    @Override
+    public void processMessage(SystemState state, SimpleParser p, Object details) {
+        try {
+            p.skip("type=");
+            String type = p.word();
+            p.skip("workerid=");
+            String workerid = p.word();
+            p.skip("time=");
+            long time = (long) (1000 * Double.parseDouble(p.word()));
+            
+            StatefulItem item = null;
+            
+            if (type.equals("CPU")) {
+                p.skip("load=");
+                double load = Double.parseDouble(p.word());
+                item = new WorkerProbeItem(workerid, new WorkerProbeItem.Cpu(time, load));
+            }
+            else if (type.equals("DF")) {
+                p.skip("mount=");
+                String mountPoint = p.word();
+                p.skip("fs=");
+                String fs = p.word();
+                p.skip("used=");
+                long usedBytes = 1024 * Long.parseLong(p.word());
+                p.skip("avail=");
+                long availBytes = 1024 * Long.parseLong(p.word());
+                item = new WorkerProbeItem(workerid, new WorkerProbeItem.DiskUsage(time, mountPoint, fs, usedBytes, availBytes));
+            }
+            else if (type.equals("DL")) {
+                p.skip("dev=");
+                String device = p.word();
+                p.skip("wtr=");
+                int wtr = (int) Double.parseDouble(p.word());
+                p.skip("rtr=");
+                int rtr = (int) Double.parseDouble(p.word());
+                p.skip("load=");
+                double load = Double.parseDouble(p.word());
+                item = new WorkerProbeItem(workerid, new WorkerProbeItem.IOLoad(time, device, wtr, rtr, load));
+            }
+            if (item != null) {
+                state.itemUpdated(item);
+            }
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerShutDownProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerShutDownProcessor.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerShutDownProcessor.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -31,9 +31,11 @@
         try {
             p.skip("blockid=");
             String blockId = p.word();
+            p.skip("id=");
+            String workerId = p.word();
             
             CoasterStatusItem item = (CoasterStatusItem) state.getItemByID(CoasterStatusItem.ID, StatefulItemClass.MISC);
-            item.workerShutDown(blockId);
+            item.workerShutDown(blockId, workerId);
         }
         catch (Exception e) {
             e.printStackTrace();

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/karajan/TaskProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/karajan/TaskProcessor.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/karajan/TaskProcessor.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -21,6 +21,9 @@
 package org.griphyn.vdl.karajan.monitor.processors.karajan;
 
 import org.apache.log4j.Level;
+import org.globus.cog.abstraction.impl.common.task.JobSpecificationImpl;
+import org.globus.cog.abstraction.impl.common.task.TaskImpl;
+import org.globus.cog.abstraction.interfaces.JobSpecification;
 import org.globus.cog.abstraction.interfaces.Status;
 import org.globus.cog.abstraction.interfaces.Task;
 import org.griphyn.vdl.karajan.monitor.SystemState;
@@ -100,6 +103,42 @@
                         }
                     }
                 }
+                else if (p.matchAndSkip("JOB_TASK ")) {
+                    p.skip("jobid=");
+                    String jobid = p.word();
+                    p.skip("taskid=");
+                    id = p.word();
+                    p.skip("exec=");
+                    String exec = p.word();
+                    p.skip("dir=");
+                    String dir = p.word();
+                    p.skip("args=");
+                    String args = p.remaining();
+                    
+                    ti = new TaskItem(id, Task.JOB_SUBMISSION);
+                    Task t = new TaskImpl();
+                    t.setType(Task.JOB_SUBMISSION);
+                    JobSpecification spec = new JobSpecificationImpl();
+                    spec.setExecutable(exec);
+                    spec.setArguments(args);
+                    spec.setDirectory(dir);
+                    t.setSpecification(spec);
+                    ti.setTask(t);
+                    updateParent(state, id, ti);
+                    state.addItem(ti);
+                }
+                else if (p.matchAndSkip("TASK_STATUS_CHANGE ")) {
+                    p.skip("taskid=");
+                    id = p.word();
+                    p.skip("status=");
+                    taskState = Integer.parseInt(p.word());
+                    ti = (TaskItem) state.getItemByID(id, StatefulItemClass.TASK);
+                    if (p.matchAndSkip("workerid=")) {
+                        ti.setWorkerId(p.word());
+                    }
+                    ti.setStatus(taskState);
+                    state.itemUpdated(ti);
+                }
                 else if (p.matchAndSkip("Task(")) {
                     p.skip("type=");
                     p.beginToken();
@@ -117,6 +156,7 @@
                     }
                     else {
                         ti = new TaskItem(id, taskType);
+                        updateParent(state, id, ti);
                         state.addItem(ti);
                     }
                 }
@@ -125,6 +165,23 @@
                 e.printStackTrace();
             }
         }
+        if (ti != null && ti.getParent() != null && taskState != 0) {
+            ApplicationItem app = (ApplicationItem) ti.getParent();
+            switch (taskState) {
+                case Status.SUBMITTING:
+                case Status.SUBMITTED:
+                case Status.ACTIVE:
+                case Status.STAGE_IN:
+                case Status.STAGE_OUT:
+                    app.setState(getAppStateFromTaskState(taskState), state.getCurrentTime());
+                    app.setWorkerId(ti.getWorkerId());
+                    state.itemUpdated(app);
+                    break;
+            }
+        }
+    }
+
+    private void updateParent(SystemState state, String id, TaskItem ti) {
         if (ti != null && id != null && ti.getParent() == null) {
             int bi = id.indexOf(':');
             int li = id.lastIndexOf('-');
@@ -138,20 +195,22 @@
                 bridge.addChild(ti);
             }
         }
-        if (ti != null && ti.getParent() != null && taskState != 0) {
-            ApplicationItem app = (ApplicationItem) ti.getParent();
-            if (taskState == Status.SUBMITTING) {
-                app.setState(ApplicationState.SUBMITTING, state.getCurrentTime());
-                state.itemUpdated(app);
-            }
-            else if (taskState == Status.SUBMITTED) {
-                app.setState(ApplicationState.SUBMITTED, state.getCurrentTime());
-                state.itemUpdated(app);
-            }
-            else if (taskState == Status.ACTIVE) {
-                app.setState(ApplicationState.ACTIVE, state.getCurrentTime());
-                state.itemUpdated(app);
-            }
+    }
+
+    private ApplicationState getAppStateFromTaskState(int taskState) {
+        switch (taskState) {
+            case Status.SUBMITTING:
+                return ApplicationState.SUBMITTING;
+            case Status.SUBMITTED:
+                return ApplicationState.SUBMITTED;
+            case Status.ACTIVE:
+                return ApplicationState.ACTIVE;
+            case Status.STAGE_IN:
+                return ApplicationState.STAGE_IN;
+            case Status.STAGE_OUT:
+                return ApplicationState.STAGE_OUT;
+            default:
+                return ApplicationState.INITIALIZING;
         }
     }
 

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/AppEndProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/AppEndProcessor.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/AppEndProcessor.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -22,6 +22,8 @@
 
 import org.apache.log4j.Level;
 import org.griphyn.vdl.karajan.monitor.SystemState;
+import org.griphyn.vdl.karajan.monitor.items.ApplicationItem;
+import org.griphyn.vdl.karajan.monitor.items.ApplicationState;
 import org.griphyn.vdl.karajan.monitor.items.StatefulItem;
 import org.griphyn.vdl.karajan.monitor.items.StatefulItemClass;
 import org.griphyn.vdl.karajan.monitor.processors.SimpleParser;
@@ -29,21 +31,23 @@
 public class AppEndProcessor extends AbstractSwiftProcessor {
 
     public Level getSupportedLevel() {
-        return Level.DEBUG;
+        return Level.INFO;
     }
 
     @Override
     public String getMessageHeader() {
-        return "JOB_END";
+        return "END_SUCCESS";
     }
 
     public void processMessage(SystemState state, SimpleParser p, Object details) {
         try {
-            p.skip("jobid=");
-            String id = p.word();
+            p.skip("thread=");
+            String threadid = p.word();
 
-            StatefulItem app = state.getItemByID(id,
-                StatefulItemClass.APPLICATION);
+            StatefulItem thread = state.getItemByID(threadid, StatefulItemClass.BRIDGE);
+            ApplicationItem app = (ApplicationItem) thread.getParent();
+            app.setState(ApplicationState.FINISHED_SUCCESSFULLY, state.getCurrentTime());
+            state.itemUpdated(app);
             state.removeItem(app);
             state.getStats("apps").remove();
         }

Added: trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/AppFailureProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/AppFailureProcessor.java	                        (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/AppFailureProcessor.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2012 University of Chicago
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/*
+ * Created on Jan 29, 2007
+ */
+package org.griphyn.vdl.karajan.monitor.processors.swift;
+
+import org.apache.log4j.Level;
+import org.griphyn.vdl.karajan.monitor.SystemState;
+import org.griphyn.vdl.karajan.monitor.items.ApplicationItem;
+import org.griphyn.vdl.karajan.monitor.items.ApplicationState;
+import org.griphyn.vdl.karajan.monitor.items.StatefulItem;
+import org.griphyn.vdl.karajan.monitor.items.StatefulItemClass;
+import org.griphyn.vdl.karajan.monitor.processors.SimpleParser;
+
+public class AppFailureProcessor extends AbstractSwiftProcessor {
+
+    public Level getSupportedLevel() {
+        return Level.DEBUG;
+    }
+
+    @Override
+    public String getMessageHeader() {
+        return "END_FAILURE";
+    }
+
+    public void processMessage(SystemState state, SimpleParser p, Object details) {
+        try {
+            p.skip("thread=");
+            String threadid = p.word();
+
+            StatefulItem thread = state.getItemByID(threadid, StatefulItemClass.BRIDGE);
+            ApplicationItem app = (ApplicationItem) thread.getParent();
+            app.setState(ApplicationState.FAILED, state.getCurrentTime());
+            state.itemUpdated(app);
+            state.removeItem(app);
+            state.getStats("apps").remove();
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/AppStartProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/AppStartProcessor.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/AppStartProcessor.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -48,7 +48,7 @@
             String args = "";
             if (p.matchAndSkip("arguments=[")) {
                 p.beginToken();
-                p.markTo("]");
+                p.markMatchedTo(']', '[');
                 args = p.getToken();
             }
             p.skip("host=");

Added: trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/ConfigurationProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/ConfigurationProcessor.java	                        (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/ConfigurationProcessor.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2012 University of Chicago
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/*
+ * Created on Aug 28, 2008
+ */
+package org.griphyn.vdl.karajan.monitor.processors.swift;
+
+import org.apache.log4j.Level;
+import org.griphyn.vdl.karajan.Loader;
+import org.griphyn.vdl.karajan.monitor.SystemState;
+import org.griphyn.vdl.karajan.monitor.processors.AbstractMessageProcessor;
+import org.griphyn.vdl.karajan.monitor.processors.ParsingException;
+import org.griphyn.vdl.karajan.monitor.processors.SimpleParser;
+
+public class ConfigurationProcessor extends AbstractMessageProcessor {
+    
+	public Level getSupportedLevel() {
+		return Level.DEBUG;
+	}
+
+	public Class<?> getSupportedSource() {
+		return Loader.class;
+	}
+
+	public void processMessage(SystemState state, Object message, Object details) {
+		String msg = String.valueOf(message);
+		SimpleParser p = new SimpleParser(msg);
+		try {
+    		if (p.matchAndSkip("SWIFT_CONFIGURATION")) {
+    		    p.skipTo(": {");
+    		    p.beginToken();
+    		    p.markTo("}");
+    		    String options = p.getToken();
+    		    String[] els = options.split(",\\s*");
+    		    for (String el : els) {
+    		        String[] kv = el.split("=", 2);
+    		        if (kv[0].equals("execution.retries")) {
+    		            state.setRetries(Integer.parseInt(kv[1]));
+    		        }
+    		        else if (kv[0].equals("replication.enabled")) {
+    		            state.setReplicationEnabled(Boolean.parseBoolean(kv[1]));
+    		        }
+    		    }
+    		}
+		}
+		catch (ParsingException e) {
+		    e.printStackTrace();
+		}
+	}	
+}

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/JobProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/JobProcessor.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/JobProcessor.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -20,10 +20,15 @@
  */
 package org.griphyn.vdl.karajan.monitor.processors.swift;
 
+import org.apache.log4j.Level;
 import org.griphyn.vdl.karajan.lib.Execute;
 import org.griphyn.vdl.karajan.monitor.processors.karajan.TaskProcessor;
 
 public class JobProcessor extends TaskProcessor {
+    
+    public Level getSupportedLevel() {
+        return Level.INFO;
+    }
 
 	public Class<?> getSupportedSource() {
 		return Execute.class;

Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/SummaryProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/SummaryProcessor.java	2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/SummaryProcessor.java	2014-07-04 06:54:51 UTC (rev 7965)
@@ -41,8 +41,30 @@
 	public void processMessage(SystemState state, Object message, Object details) {
 		String msg = String.valueOf(message);
 		if(msg.contains("CrtHeap")) {
-		   return;
+		    processJVMInfo(state, msg);
 		}
+		else {
+		    processTaskInfo(state, msg);
+		}
+	}
+	
+	private void processJVMInfo(SystemState state, String msg) {
+	    String[] els = msg.split(",\\s");
+	    for (String el : els) {
+	        String[] kv = el.split(": ");
+	        if ("HeapMax".equals(kv[0])) {
+	            state.setMaxHeap(Long.parseLong(kv[1]));
+	        }
+	        else if ("UsedHeap".equals(kv[0])) {
+	            state.setUsedHeap(Long.parseLong(kv[1]));
+	        }
+	        else if ("JVMThreads".equals(kv[0])) {
+	            state.setCurrentThreads(Integer.parseInt(kv[1]));
+	        }
+	    }
+	}
+	
+	private void processTaskInfo(SystemState state, String msg) {
 		SummaryItem s = (SummaryItem) state.getItemByID(SummaryItem.ID, StatefulItemClass.WORKFLOW);
 		String[] pairs = msg.split("  ");
 		for (ApplicationState key : SummaryItem.STATES) {




More information about the Swift-commit mailing list