[Swift-commit] r7965 - in trunk/src/org/griphyn/vdl/karajan/monitor: . common items monitors/http processors processors/coasters processors/karajan processors/swift
hategan at ci.uchicago.edu
hategan at ci.uchicago.edu
Fri Jul 4 01:54:52 CDT 2014
Author: hategan
Date: 2014-07-04 01:54:51 -0500 (Fri, 04 Jul 2014)
New Revision: 7965
Added:
trunk/src/org/griphyn/vdl/karajan/monitor/items/AbstractListenableStatefulItem.java
trunk/src/org/griphyn/vdl/karajan/monitor/items/ChainedListener.java
trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppDetailBuilder.java
trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppInstanceBuilder.java
trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppListBuilder.java
trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppsSummaryBuilder.java
trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/BrowserDataBuilder.java
trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/SiteInfoBuilder.java
trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/SwiftLogInfo.java
trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/WorkerInfoBuilder.java
trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/WorkerListBuilder.java
trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerProbeItem.java
trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerProbeProcessor.java
trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/AppFailureProcessor.java
trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/ConfigurationProcessor.java
Modified:
trunk/src/org/griphyn/vdl/karajan/monitor/MonitorAppender.java
trunk/src/org/griphyn/vdl/karajan/monitor/StateUpdater.java
trunk/src/org/griphyn/vdl/karajan/monitor/SystemState.java
trunk/src/org/griphyn/vdl/karajan/monitor/common/DataSampler.java
trunk/src/org/griphyn/vdl/karajan/monitor/items/AbstractStatefulItem.java
trunk/src/org/griphyn/vdl/karajan/monitor/items/ApplicationItem.java
trunk/src/org/griphyn/vdl/karajan/monitor/items/ApplicationState.java
trunk/src/org/griphyn/vdl/karajan/monitor/items/StatefulItem.java
trunk/src/org/griphyn/vdl/karajan/monitor/items/SummaryItem.java
trunk/src/org/griphyn/vdl/karajan/monitor/items/TaskItem.java
trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/HTTPServer.java
trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/JSONEncoder.java
trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/SummaryDataBuilder.java
trunk/src/org/griphyn/vdl/karajan/monitor/processors/AbstractMessageProcessor.java
trunk/src/org/griphyn/vdl/karajan/monitor/processors/SimpleParser.java
trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/BlockRequestedProcessor.java
trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/CoasterStatusItem.java
trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/RemoteLogProcessorDispatcher.java
trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerActiveProcessor.java
trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerLostProcessor.java
trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerShutDownProcessor.java
trunk/src/org/griphyn/vdl/karajan/monitor/processors/karajan/TaskProcessor.java
trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/AppEndProcessor.java
trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/AppStartProcessor.java
trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/JobProcessor.java
trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/SummaryProcessor.java
Log:
monitor updates
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/MonitorAppender.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/MonitorAppender.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/MonitorAppender.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -40,15 +40,18 @@
import org.griphyn.vdl.karajan.monitor.processors.coasters.BlockFailedProcessor;
import org.griphyn.vdl.karajan.monitor.processors.coasters.BlockRequestedProcessor;
import org.griphyn.vdl.karajan.monitor.processors.coasters.RemoteLogProcessorDispatcher;
+import org.griphyn.vdl.karajan.monitor.processors.coasters.WorkerActiveProcessor;
import org.griphyn.vdl.karajan.monitor.processors.coasters.WorkerLostProcessor;
+import org.griphyn.vdl.karajan.monitor.processors.coasters.WorkerProbeProcessor;
import org.griphyn.vdl.karajan.monitor.processors.coasters.WorkerShutDownProcessor;
-import org.griphyn.vdl.karajan.monitor.processors.coasters.WorkerActiveProcessor;
import org.griphyn.vdl.karajan.monitor.processors.karajan.ExecutionContextProcessor;
import org.griphyn.vdl.karajan.monitor.processors.karajan.SchedulerInfoProcessor;
import org.griphyn.vdl.karajan.monitor.processors.karajan.TaskProcessor;
import org.griphyn.vdl.karajan.monitor.processors.swift.AppEndProcessor;
+import org.griphyn.vdl.karajan.monitor.processors.swift.AppFailureProcessor;
import org.griphyn.vdl.karajan.monitor.processors.swift.AppStartProcessor;
import org.griphyn.vdl.karajan.monitor.processors.swift.AppThreadProcessor;
+import org.griphyn.vdl.karajan.monitor.processors.swift.ConfigurationProcessor;
import org.griphyn.vdl.karajan.monitor.processors.swift.ForeachItEndProcessor;
import org.griphyn.vdl.karajan.monitor.processors.swift.ForeachItStartProcessor;
import org.griphyn.vdl.karajan.monitor.processors.swift.JobProcessor;
@@ -92,11 +95,13 @@
updater.addProcessor(new JobProcessor());
updater.addProcessor(new SchedulerInfoProcessor());
updater.addProcessor(new ExecutionContextProcessor());
+ updater.addProcessor(new ConfigurationProcessor());
addFilteredProcessors(updater, SwiftProcessorDispatcher.class,
new AppStartProcessor(),
new AppEndProcessor(),
new AppThreadProcessor(),
+ new AppFailureProcessor(),
new ProcedureStartProcessor(),
new ProcedureEndProcessor(),
new ForeachItStartProcessor(),
@@ -110,7 +115,8 @@
new BlockFailedProcessor(),
new WorkerActiveProcessor(),
new WorkerLostProcessor(),
- new WorkerShutDownProcessor());
+ new WorkerShutDownProcessor(),
+ new WorkerProbeProcessor());
}
private void addFilteredProcessors(StateUpdater updater,
@@ -148,6 +154,7 @@
public void doAppend(LoggingEvent event) {
try {
+ state.setCurrentTime(event.getTimeStamp());
updater.logEvent(event.getLevel(), event.getLogger().getName(),
event.getMessage(), event.getThrowableInformation());
}
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/StateUpdater.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/StateUpdater.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/StateUpdater.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -64,7 +64,6 @@
}
public void logEvent(Object category, String source, Object message, Object details) {
- state.setCurrentTime(System.currentTimeMillis());
Map<String, List<LogMessageProcessor>> sources = levels.get(category);
if (sources == null) {
return;
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/SystemState.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/SystemState.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/SystemState.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -24,11 +24,12 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.SortedSet;
import java.util.TimerTask;
+import java.util.TreeSet;
import k.rt.Stack;
-import org.griphyn.vdl.karajan.monitor.common.GlobalTimer;
import org.griphyn.vdl.karajan.monitor.items.StatefulItem;
import org.griphyn.vdl.karajan.monitor.items.StatefulItemClass;
import org.griphyn.vdl.karajan.monitor.items.SummaryItem;
@@ -39,11 +40,13 @@
private Map<StatefulItemClass, StatefulItemClassSet<? extends StatefulItem>> classes;
private Set<SystemStateListener> listeners;
private Map<String, Stats> stats;
- private int total, completed, completedPreviously;
- private long start, currentTime;
+ private int total, completed, completedPreviously, currentThreads, retries;
+ private long start, currentTime, usedHeap, maxHeap;
private Stack stack;
private String projectName;
private final Runtime runtime;
+ private SortedSet<TimerTaskEntry> tasks;
+ private boolean replicationEnabled, resumed;
private static final Unit BYTES = new Unit.P2("B");
@@ -54,7 +57,8 @@
stats = new HashMap<String, Stats>();
runtime = Runtime.getRuntime();
addItem(new SummaryItem());
- GlobalTimer.getTimer().schedule(new TimerTask() {
+ tasks = new TreeSet<TimerTaskEntry>();
+ schedule(new TimerTask() {
public void run() {
update();
}
@@ -177,7 +181,7 @@
}
public String getCurrentHeapFormatted() {
- return BYTES.format(getCurrentHeap());
+ return BYTES.format(getUsedHeap());
}
public String getElapsedTimeFormatted() {
@@ -248,18 +252,117 @@
public long getMaxHeap() {
- return runtime.maxMemory();
+ return maxHeap;
}
- public long getCurrentHeap() {
- return runtime.totalMemory() - runtime.freeMemory();
+ public long getUsedHeap() {
+ return usedHeap;
}
+ /** Returns the thread count most recently stored through {@link #setCurrentThreads(int)}. */
+ public int getCurrentThreads() {
+ return currentThreads;
+ }
+
+ /**
+ * Stores the thread count that {@link #getCurrentThreads()} reports.
+ *
+ * @param currentThreads the new thread count
+ */
+ public void setCurrentThreads(int currentThreads) {
+ this.currentThreads = currentThreads;
+ }
+
+ /**
+ * Stores the used heap size that getUsedHeap() returns and that
+ * getCurrentHeapFormatted() formats as a byte quantity.
+ *
+ * @param usedHeap the used heap size
+ */
+ public void setUsedHeap(long usedHeap) {
+ this.usedHeap = usedHeap;
+ }
+
+ /**
+ * Stores the maximum heap size that getMaxHeap() returns.
+ *
+ * @param maxHeap the maximum heap size
+ */
+ public void setMaxHeap(long maxHeap) {
+ this.maxHeap = maxHeap;
+ }
+
+ /**
+ * Returns the value most recently stored through {@link #setRetries(int)}.
+ * NOTE(review): presumably the retry setting of the monitored run - confirm against the caller.
+ */
+ public int getRetries() {
+ return retries;
+ }
+
+ /**
+ * Stores the value that {@link #getRetries()} reports.
+ *
+ * @param retries the new retries value
+ */
+ public void setRetries(int retries) {
+ this.retries = retries;
+ }
+
+ /** Returns the flag most recently stored through {@link #setReplicationEnabled(boolean)}. */
+ public boolean getReplicationEnabled() {
+ return replicationEnabled;
+ }
+
+ /**
+ * Stores the flag that {@link #getReplicationEnabled()} reports.
+ *
+ * @param replicationEnabled the new flag value
+ */
+ public void setReplicationEnabled(boolean replicationEnabled) {
+ this.replicationEnabled = replicationEnabled;
+ }
+
+ /**
+ * Returns the flag most recently stored through {@link #setResumed(boolean)}.
+ * NOTE(review): presumably tells whether the monitored run was resumed - confirm against the caller.
+ */
+ public boolean getResumed() {
+ return resumed;
+ }
+
+ /**
+ * Stores the flag that {@link #getResumed()} reports.
+ *
+ * @param resumed the new flag value
+ */
+ public void setResumed(boolean resumed) {
+ this.resumed = resumed;
+ }
+
public long getCurrentTime() {
return currentTime;
}
+    /**
+     * Sets the current time of the monitored run and runs the scheduled tasks that are due.
+     * <p>
+     * The clock is driven by the caller (MonitorAppender passes the timestamp of each log
+     * event), not by the wall clock. Due tasks are run synchronously from this method, once
+     * for every repeat period that has elapsed since their last run. Exceptions thrown by a
+     * task are printed and do not stop the remaining tasks.
+     *
+     * @param currentTime the new current time, in milliseconds
+     */
public void setCurrentTime(long currentTime) {
this.currentTime = currentTime;
+    if (!tasks.isEmpty()) {
+        TimerTaskEntry e = tasks.first();
+        while (e.nextTime < currentTime) {
+            // nextTime is the sort key, so the entry has to leave the set while it changes
+            tasks.remove(e);
+
+            // A value this small can only be a bare initial delay, stored by schedule()
+            // while the clock was still 0. schedule() accepts delays up to and including
+            // MAX_INITIAL, hence "<=": with "<" a delay of exactly MAX_INITIAL would be
+            // taken for an absolute time and replayed once per period up to the present.
+            if (e.nextTime <= MAX_INITIAL) {
+                // first clock reading: turn the delay into an absolute time, do not run yet
+                e.nextTime = currentTime + e.nextTime;
+            }
+            else {
+                try {
+                    e.task.run();
+                }
+                catch (Exception ex) {
+                    // best effort: a failing task must not break log processing
+                    ex.printStackTrace();
+                }
+                e.nextTime = e.nextTime + e.delay;
+            }
+            tasks.add(e);
+            e = tasks.first();
+        }
+    }
}
+
+    /**
+     * A task registered with {@link #schedule(TimerTask, long, long)} together with the time
+     * of its next run and its repeat period.
+     * <p>
+     * Entries are ordered by {@code nextTime}. Entries with the same {@code nextTime} are
+     * ordered by creation sequence, so two distinct entries never compare as equal and a
+     * sorted set never drops one of them as a duplicate.
+     */
+    private static class TimerTaskEntry implements Comparable<TimerTaskEntry> {
+        /** Source of the unique tie-break numbers. */
+        private static final java.util.concurrent.atomic.AtomicLong SEQUENCE =
+            new java.util.concurrent.atomic.AtomicLong();
+
+        public long nextTime;
+        public long delay;
+        public TimerTask task;
+        /** Unique per entry; identity hash codes are not guaranteed to be. */
+        private final long sequence = SEQUENCE.getAndIncrement();
+
+        /**
+         * @param task the task to run
+         * @param nextTime time of the next run
+         * @param delay period between consecutive runs
+         */
+        public TimerTaskEntry(TimerTask task, long nextTime, long delay) {
+            this.task = task;
+            this.nextTime = nextTime;
+            this.delay = delay;
+        }
+
+        /**
+         * Orders by {@code nextTime}, then by creation sequence. Values are compared
+         * directly rather than subtracted so that the result cannot overflow.
+         */
+        @Override
+        public int compareTo(TimerTaskEntry o) {
+            if (nextTime != o.nextTime) {
+                return nextTime < o.nextTime ? -1 : 1;
+            }
+            if (sequence != o.sequence) {
+                return sequence < o.sequence ? -1 : 1;
+            }
+            return 0;
+        }
+    }
+
+ private static final long MAX_INITIAL = 1000000;
+
+    /**
+     * Registers a task to be run periodically on the monitor clock, i.e. the time supplied
+     * through {@link #setCurrentTime(long)}.
+     *
+     * @param task the task to run
+     * @param initial delay before the first run, relative to the current time; at most
+     *        {@code MAX_INITIAL}
+     * @param repeat period between consecutive runs; must be positive
+     * @throws IllegalArgumentException if {@code initial} exceeds {@code MAX_INITIAL} or
+     *         {@code repeat} is not positive
+     */
+    public void schedule(TimerTask task, long initial, long repeat) {
+        if (initial > MAX_INITIAL) {
+            throw new IllegalArgumentException("Initial delay too large");
+        }
+        if (repeat <= 0) {
+            // with a non-positive period the next run time never moves past the current
+            // time, and the catch-up loop in setCurrentTime() would never terminate
+            throw new IllegalArgumentException("Non-positive repeat period: " + repeat);
+        }
+        TimerTaskEntry e = new TimerTaskEntry(task, currentTime + initial, repeat);
+        tasks.add(e);
+    }
}
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/common/DataSampler.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/common/DataSampler.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/common/DataSampler.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -55,7 +55,7 @@
this.listeners = new ArrayList<Listener>();
initializeData();
- GlobalTimer.getTimer().schedule(new TimerTask() {
+ state.schedule(new TimerTask() {
@Override
public void run() {
sample();
@@ -90,9 +90,9 @@
addSeries("Java Virtual Machine",
new Series<Long>("jvm/heapUsed", "JVM Heap Used", BYTES,
- new ReflectionSampler<Long>(state, "getCurrentHeap")),
+ new ReflectionSampler<Long>(state, "getUsedHeap")),
new Series<Integer>("jvm/activeThreads", "JVM Active Threads", COUNT,
- new ReflectionSampler<Integer>(Thread.class, "activeCount")));
+ new ReflectionSampler<Integer>(state, "getCurrentThreads")));
CoasterStatusItem coaster = (CoasterStatusItem) state.getItemByID(CoasterStatusItem.ID, StatefulItemClass.MISC);
@@ -151,7 +151,7 @@
}
protected void sample() {
- long now = (System.currentTimeMillis() / 1000);
+ long now = state.getCurrentTime() / 1000;
if (offset + count != now) {
if (offset < 0) {
offset = now;
Added: trunk/src/org/griphyn/vdl/karajan/monitor/items/AbstractListenableStatefulItem.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/items/AbstractListenableStatefulItem.java (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/items/AbstractListenableStatefulItem.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,32 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jun 9, 2014
+ */
+package org.griphyn.vdl.karajan.monitor.items;
+
+/**
+ * Base class for stateful items that accept listeners. Listeners are kept as a single
+ * reference; every listener added after the first is chained in front of the existing ones
+ * with a {@link ChainedListener}.
+ */
+public abstract class AbstractListenableStatefulItem extends AbstractStatefulItem {
+
+    /** Head of the listener chain, or null while no listener has been added. */
+    private Listener listener;
+
+    public AbstractListenableStatefulItem(String id) {
+        super(id);
+    }
+
+    /**
+     * Adds a listener. When listeners are already present, the new one is notified before
+     * them.
+     */
+    public void addListener(Listener listener) {
+        Listener previous = this.listener;
+        if (previous == null) {
+            this.listener = listener;
+        }
+        else {
+            this.listener = new ChainedListener(listener, previous);
+        }
+    }
+
+    /** Tells the registered listeners, if any, that this item was updated. */
+    protected void notifyListener() {
+        Listener target = listener;
+        if (target == null) {
+            return;
+        }
+        target.itemUpdated(this);
+    }
+}
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/items/AbstractStatefulItem.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/items/AbstractStatefulItem.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/items/AbstractStatefulItem.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -76,4 +76,9 @@
public String getID() {
return id;
}
+
+ /**
+ * Default implementation: does nothing, so the listener is never notified.
+ * AbstractListenableStatefulItem overrides this to actually keep listeners.
+ */
+ @Override
+ public void addListener(Listener l) {
+ // not implemented by default
+ }
}
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/items/ApplicationItem.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/items/ApplicationItem.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/items/ApplicationItem.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -22,7 +22,7 @@
public class ApplicationItem extends AbstractStatefulItem {
- private String name, arguments, host;
+ private String name, arguments, host, workerId;
private long startTime, currentStateTime;
/**
* The state of the app. Currently swift does not log app state transitions
@@ -97,6 +97,14 @@
return state;
}
+ /**
+ * Returns the worker ID stored through {@link #setWorkerId(String)}, or null if none was
+ * stored.
+ */
+ public String getWorkerId() {
+ return workerId;
+ }
+
+ /**
+ * Stores the worker ID that {@link #getWorkerId()} reports.
+ * NOTE(review): presumably the ID of the worker that runs this application - confirm against the caller.
+ *
+ * @param workerId the worker ID
+ */
+ public void setWorkerId(String workerId) {
+ this.workerId = workerId;
+ }
+
public String toString() {
return "APP[" + name + ", " + arguments + ", " + host + "]";
}
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/items/ApplicationState.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/items/ApplicationState.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/items/ApplicationState.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -13,9 +13,14 @@
import java.util.List;
public enum ApplicationState {
+ //0
INITIALIZING("Initializing"), SELECTING_SITE("Selecting site", "Sel. site"), STAGE_IN("Stage in"),
+ //3
SUBMITTING("Submitting"), SUBMITTED("Submitted"), ACTIVE("Active"), STAGE_OUT("Stage out"),
- FAILED("Failed"), REPLICATING("Replicating"), FINISHED_IN_PREVIOUS_RUN("Finished in previous run", "Finished in prev. run", false),
+ //7
+ FAILED("Failed"), REPLICATING("Replicating", "Replicating", false),
+ FINISHED_IN_PREVIOUS_RUN("Finished in previous run", "Finished in prev. run", false),
+ //10
FINISHED_SUCCESSFULLY("Finished successfully");
private String name, shortName;
@@ -71,4 +76,8 @@
return enabledValues;
}
+
+    /**
+     * Tells whether this is one of the states {@code FAILED}, {@code FINISHED_SUCCESSFULLY}
+     * or {@code FINISHED_IN_PREVIOUS_RUN}.
+     */
+    public boolean isTerminal() {
+        switch (this) {
+            case FAILED:
+            case FINISHED_SUCCESSFULLY:
+            case FINISHED_IN_PREVIOUS_RUN:
+                return true;
+            default:
+                return false;
+        }
+    }
}
\ No newline at end of file
Added: trunk/src/org/griphyn/vdl/karajan/monitor/items/ChainedListener.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/items/ChainedListener.java (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/items/ChainedListener.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,25 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jun 9, 2014
+ */
+package org.griphyn.vdl.karajan.monitor.items;
+
+/**
+ * A listener that forwards every notification to two other listeners: first to the one
+ * passed as {@code crt}, then to the one passed as {@code old}.
+ */
+public class ChainedListener implements StatefulItem.Listener {
+    /** Notified first. */
+    private StatefulItem.Listener current;
+    /** Notified second. */
+    private StatefulItem.Listener previous;
+
+    /**
+     * @param crt the listener to notify first
+     * @param old the listener to notify second
+     */
+    public ChainedListener(StatefulItem.Listener crt, StatefulItem.Listener old) {
+        this.previous = old;
+        this.current = crt;
+    }
+
+    @Override
+    public void itemUpdated(StatefulItem item) {
+        current.itemUpdated(item);
+        previous.itemUpdated(item);
+    }
+}
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/items/StatefulItem.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/items/StatefulItem.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/items/StatefulItem.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -23,6 +23,10 @@
import java.util.Collection;
public interface StatefulItem {
+ /** Callback through which an item reports that it was updated. */
+ public interface Listener {
+ /** Called with the item that was updated. */
+ void itemUpdated(StatefulItem item);
+ }
+
StatefulItem getParent();
void setParent(StatefulItem parent);
@@ -33,4 +37,6 @@
StatefulItemClass getItemClass();
String getID();
+
+ void addListener(Listener l);
}
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/items/SummaryItem.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/items/SummaryItem.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/items/SummaryItem.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -20,7 +20,6 @@
*/
package org.griphyn.vdl.karajan.monitor.items;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -77,23 +76,11 @@
}
public int getCount(String key, SystemState state) {
- if (state.getStack() != null) {
- // TODO Must get these from log
- return -1;
- }
- else {
- return getCount(key);
- }
+ return getCount(key);
}
public synchronized Map<String, Integer> getCounts(SystemState state) {
- if (state.getStack() != null) {
- // TODO Must get these from log
- return Collections.emptyMap();
- }
- else {
- return new HashMap<String, Integer>(counts);
- }
+ return new HashMap<String, Integer>(counts);
}
public synchronized void setCount(String key, int value) {
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/items/TaskItem.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/items/TaskItem.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/items/TaskItem.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -25,6 +25,7 @@
public class TaskItem extends AbstractStatefulItem {
private Task task;
private int status, type;
+ private String workerId;
public TaskItem(String id, Task task) {
super(id);
@@ -76,4 +77,12 @@
public int getStatus() {
return status;
}
+
+ /**
+ * Returns the worker ID stored through {@link #setWorkerId(String)}, or null if none was
+ * stored.
+ */
+ public String getWorkerId() {
+ return workerId;
+ }
+
+    /**
+     * Stores the worker ID that {@link #getWorkerId()} reports.
+     *
+     * @param workerId the worker ID
+     */
+    public void setWorkerId(String workerId) {
+        this.workerId = workerId;
+    }
}
Added: trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppDetailBuilder.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppDetailBuilder.java (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppDetailBuilder.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,243 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jun 29, 2014
+ */
+package org.griphyn.vdl.karajan.monitor.monitors.http;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.griphyn.vdl.karajan.monitor.items.ApplicationItem;
+import org.griphyn.vdl.karajan.monitor.items.ApplicationState;
+import org.griphyn.vdl.karajan.monitor.monitors.http.BrowserDataBuilder.TimedValue;
+
+/**
+ * Builds the detail view of one application name: average time spent in each state and
+ * histograms of total run times, for all invocations together and separately for every
+ * site (host) the application ran on.
+ */
+public class AppDetailBuilder {
+
+    private String name;
+    private SortedMap<String, List<SortedSet<ApplicationItem>>> byName;
+    private BrowserDataBuilder db;
+
+    /**
+     * @param db source of the by-name application index and of the state timelines
+     * @param name name of the application to report on
+     */
+    public AppDetailBuilder(BrowserDataBuilder db, String name) {
+        this.db = db;
+        this.byName = db.getByName();
+        this.name = name;
+    }
+
+    /**
+     * Accumulates, per reported state, the total time spent in that state and the number
+     * of samples, and derives averages from them.
+     */
+    private static class StateTimesAverage {
+        public static final int N = 7;
+        // long, so that the sum over many invocations cannot overflow
+        public long[] stateTimeSum = new long[N];
+        public int[] stateTimeCount = new int[N];
+
+        /** Adds one sample; states that have no slot in INDEX_MAPPING are ignored. */
+        public void add(ApplicationState state, long time) {
+            int index = INDEX_MAPPING[state.ordinal()];
+            if (index < 0) {
+                return;
+            }
+            stateTimeCount[index]++;
+            stateTimeSum[index] += time;
+        }
+
+        /** Returns the average time per slot; 0 for slots without samples. */
+        public int[] getAverages() {
+            int[] avg = new int[N];
+            for (int i = 0; i < N; i++) {
+                if (stateTimeCount[i] == 0) {
+                    avg[i] = 0;
+                }
+                else {
+                    avg[i] = (int) (stateTimeSum[i] / stateTimeCount[i]);
+                }
+            }
+            return avg;
+        }
+    }
+
+    /**
+     * Maps ApplicationState ordinals to slots of the averages array. Slot order:
+     * Initializing, Selecting site, Submitting, Submitted, Stage in, Active, Stage out.
+     * States without a slot map to -1.
+     */
+    private static final int[] INDEX_MAPPING = new int[ApplicationState.values().length];
+
+    static {
+        // without this, states that have no slot would silently share slot 0
+        for (int i = 0; i < INDEX_MAPPING.length; i++) {
+            INDEX_MAPPING[i] = -1;
+        }
+        INDEX_MAPPING[ApplicationState.INITIALIZING.ordinal()] = 0;
+        INDEX_MAPPING[ApplicationState.SELECTING_SITE.ordinal()] = 1;
+        INDEX_MAPPING[ApplicationState.SUBMITTING.ordinal()] = 2;
+        INDEX_MAPPING[ApplicationState.SUBMITTED.ordinal()] = 3;
+        INDEX_MAPPING[ApplicationState.STAGE_IN.ordinal()] = 4;
+        INDEX_MAPPING[ApplicationState.ACTIVE.ordinal()] = 5;
+        INDEX_MAPPING[ApplicationState.STAGE_OUT.ordinal()] = 6;
+    }
+
+
+    /**
+     * Writes the detail data as one map to the encoder: overall counts, average state
+     * times and run time histograms, followed by a "sites" array with the same figures
+     * for each site.
+     *
+     * @param e the encoder to write to
+     * @throws IllegalArgumentException if no application with this name is known
+     */
+    public void getData(JSONEncoder e) {
+        List<SortedSet<ApplicationItem>> l = byName.get(name);
+        if (l == null) {
+            throw new IllegalArgumentException("Unknown application: " + name);
+        }
+
+        // sites it ran on
+        Set<String> sites = new TreeSet<String>();
+        // average times for each reported state, over all sites
+        StateTimesAverage st = new StateTimesAverage();
+        // same for each site
+        Map<String, StateTimesAverage> sts = new HashMap<String, StateTimesAverage>();
+        // number of invocations on each site
+        Map<String, Integer> siteCounts = new HashMap<String, Integer>();
+        List<Integer> totalTimesC = new ArrayList<Integer>();
+        List<Integer> totalTimesF = new ArrayList<Integer>();
+        Map<String, List<Integer>> totalTimesCompletedSite = new HashMap<String, List<Integer>>();
+        Map<String, List<Integer>> totalTimesFailedSite = new HashMap<String, List<Integer>>();
+
+        int count = 0;
+        for (SortedSet<ApplicationItem> ss : l) {
+            for (ApplicationItem item : ss) {
+                count++;
+                // NOTE(review): a null host would make the TreeSet lookup below throw;
+                // confirm that every indexed item already has a host
+                String host = item.getHost();
+                if (!sites.contains(host)) {
+                    sites.add(host);
+                    sts.put(host, new StateTimesAverage());
+                    siteCounts.put(host, 0);
+                    totalTimesCompletedSite.put(host, new ArrayList<Integer>());
+                    totalTimesFailedSite.put(host, new ArrayList<Integer>());
+                }
+                siteCounts.put(host, siteCounts.get(host) + 1);
+
+                StateTimesAverage stss = sts.get(host);
+                List<Integer> ttCs = totalTimesCompletedSite.get(host);
+                List<Integer> ttFs = totalTimesFailedSite.get(host);
+
+                List<TimedValue<ApplicationState>> tl = db.getTimeline(item);
+                long lastTime = -1;
+                long firstTime = -1;
+                ApplicationState lastState = null;
+                // each entry closes the interval that was opened by the entry before it
+                for (TimedValue<ApplicationState> p : tl) {
+                    if (lastState != null) {
+                        long elapsed = p.time - lastTime;
+                        switch (lastState) {
+                            case STAGE_IN:
+                                firstTime = p.time;
+                                // fall through: stage-in time is averaged as well
+                            case INITIALIZING:
+                            case SELECTING_SITE:
+                            case SUBMITTING:
+                            case SUBMITTED:
+                            case ACTIVE:
+                            case STAGE_OUT:
+                                st.add(lastState, elapsed);
+                                stss.add(lastState, elapsed);
+                                break;
+                            default:
+                                // other states have no slot in the averages
+                                break;
+                        }
+                        // NOTE(review): a total is recorded only when another timeline
+                        // entry follows the final state, and it is measured from
+                        // firstTime, which is still -1 if the timeline had no STAGE_IN
+                        // entry followed by another entry - confirm both are intended
+                        int time;
+                        switch (lastState) {
+                            case FINISHED_SUCCESSFULLY:
+                                time = (int) (p.time - firstTime);
+                                totalTimesC.add(time);
+                                ttCs.add(time);
+                                break;
+                            case FAILED:
+                                time = (int) (p.time - firstTime);
+                                totalTimesF.add(time);
+                                ttFs.add(time);
+                                break;
+                            default:
+                                break;
+                        }
+                    }
+                    lastTime = p.time;
+                    lastState = p.value;
+                }
+            }
+        }
+
+        // get range for total times; an empty list must not drag the range to 0
+        int minTime = rangeMin(totalTimesC, totalTimesF);
+        int maxTime = rangeMax(totalTimesC, totalTimesF);
+
+
+        int bins = (int) Math.max(Math.min(count / 5, 100.0), 1);
+        double binSize = ((double) (maxTime - minTime)) / bins;
+
+        // now serialize this
+        e.beginMap();
+        e.writeMapItem("name", name);
+        e.writeMapItem("count", count);
+        e.writeMapItem("completedCount", totalTimesC.size());
+        e.writeMapItem("failedCount", totalTimesF.size());
+        e.writeMapItem("avgStateTimes", st.getAverages());
+        e.writeMapItem("distMinTime", minTime);
+        e.writeMapItem("distMaxTime", maxTime);
+        // the number of entries the histograms below actually have
+        e.writeMapItem("bins", bins);
+        e.writeMapItem("completedTimeDist", bin(totalTimesC, binSize, minTime, maxTime, bins));
+        e.writeMapItem("failedTimeDist", bin(totalTimesF, binSize, minTime, maxTime, bins));
+        e.writeMapItem("completedTimeAvg", avg(totalTimesC));
+        e.writeMapItem("failedTimeAvg", avg(totalTimesF));
+        e.writeMapKey("sites");
+        e.beginArray();
+        for (String host : sites) {
+            List<Integer> completed = totalTimesCompletedSite.get(host);
+            List<Integer> failed = totalTimesFailedSite.get(host);
+            e.beginArrayItem();
+            e.beginMap();
+            e.writeMapItem("name", host);
+            e.writeMapItem("count", siteCounts.get(host).intValue());
+            e.writeMapItem("completedCount", completed.size());
+            e.writeMapItem("failedCount", failed.size());
+            e.writeMapItem("avgStateTimes", sts.get(host).getAverages());
+            e.writeMapItem("distMinTime", minTime);
+            e.writeMapItem("distMaxTime", maxTime);
+            e.writeMapItem("completedTimeDist", bin(completed, binSize, minTime, maxTime, bins));
+            e.writeMapItem("failedTimeDist", bin(failed, binSize, minTime, maxTime, bins));
+            e.writeMapItem("completedTimeAvg", avg(completed));
+            e.writeMapItem("failedTimeAvg", avg(failed));
+            e.endMap();
+            e.endArrayItem();
+        }
+        e.endArray();
+
+        e.endMap();
+    }
+
+    /** Returns the integer average of the list, or 0 for an empty list. */
+    private int avg(List<Integer> l) {
+        if (l.isEmpty()) {
+            return 0;
+        }
+        long sum = 0;
+        for (Integer i : l) {
+            sum += i;
+        }
+        return (int) (sum / l.size());
+    }
+
+    /**
+     * Builds a histogram with binCount bins of width binSize, starting at minTime. Bin k
+     * counts the values in (minTime + k * binSize, minTime + (k + 1) * binSize]; values at
+     * or below minTime go to the first bin and values beyond the last bin go to the last
+     * bin. With a zero bin size (all values equal) everything goes to the first bin.
+     */
+    private List<Integer> bin(List<Integer> l, double binSize, int minTime, int maxTime, int binCount) {
+        List<Integer> hist = new ArrayList<Integer>();
+        for (int i = 0; i < binCount; i++) {
+            hist.add(0);
+        }
+        if (binCount <= 0) {
+            return hist;
+        }
+        for (Integer v : l) {
+            int bin = 0;
+            if (binSize > 0) {
+                bin = (int) Math.ceil(((double) v - minTime) / binSize) - 1;
+            }
+            // clamp: the smallest value would otherwise land in bin -1
+            if (bin < 0) {
+                bin = 0;
+            }
+            else if (bin >= binCount) {
+                bin = binCount - 1;
+            }
+            hist.set(bin, hist.get(bin) + 1);
+        }
+        return hist;
+    }
+
+    /** Smallest value of both lists together, ignoring an empty list; 0 if both are empty. */
+    private int rangeMin(List<Integer> a, List<Integer> b) {
+        if (a.isEmpty()) {
+            return min(b);
+        }
+        if (b.isEmpty()) {
+            return min(a);
+        }
+        return Math.min(min(a), min(b));
+    }
+
+    /** Largest value of both lists together, ignoring an empty list; 0 if both are empty. */
+    private int rangeMax(List<Integer> a, List<Integer> b) {
+        if (a.isEmpty()) {
+            return max(b);
+        }
+        if (b.isEmpty()) {
+            return max(a);
+        }
+        return Math.max(max(a), max(b));
+    }
+
+    /** Returns the smallest value of the list, or 0 for an empty list. */
+    private int min(List<Integer> l) {
+        if (l.isEmpty()) {
+            return 0;
+        }
+
+        int min = l.get(0);
+        for (Integer i : l) {
+            if (i < min) {
+                min = i;
+            }
+        }
+        return min;
+    }
+
+    /** Returns the largest value of the list, or 0 for an empty list. */
+    private int max(List<Integer> l) {
+        if (l.isEmpty()) {
+            return 0;
+        }
+
+        int max = l.get(0);
+        for (Integer i : l) {
+            if (i > max) {
+                max = i;
+            }
+        }
+        return max;
+    }
+}
Added: trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppInstanceBuilder.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppInstanceBuilder.java (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppInstanceBuilder.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,148 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jun 29, 2014
+ */
+package org.griphyn.vdl.karajan.monitor.monitors.http;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.abstraction.interfaces.Task;
+import org.griphyn.vdl.karajan.monitor.SystemState;
+import org.griphyn.vdl.karajan.monitor.items.ApplicationItem;
+import org.griphyn.vdl.karajan.monitor.items.ApplicationState;
+import org.griphyn.vdl.karajan.monitor.items.TaskItem;
+import org.griphyn.vdl.karajan.monitor.monitors.http.BrowserDataBuilder.AppEntry;
+import org.griphyn.vdl.karajan.monitor.monitors.http.BrowserDataBuilder.TimedValue;
+
+/**
+ * Builds the detail view of a single application invocation, identified by its ID.
+ */
+public class AppInstanceBuilder {
+
+    private BrowserDataBuilder db;
+    private String id;
+    private Map<String, AppEntry> entries;
+    private SystemState state;
+
+    /**
+     * @param db source of the application entries and of the system state
+     * @param id ID of the application invocation to report on
+     */
+    public AppInstanceBuilder(BrowserDataBuilder db, String id) {
+        this.db = db;
+        this.id = id;
+        this.entries = db.getEntries();
+        this.state = db.getState();
+    }
+
+    /**
+     * Writes the invocation as one map to the encoder: identity, current state, timing and,
+     * when a job submission task is attached to it, the job details taken from the task's
+     * argument list plus the job directory.
+     *
+     * @param e the encoder to write to
+     * @throws IllegalArgumentException if no application with this ID is known
+     */
+    public void getData(JSONEncoder e) {
+        AppEntry ae = entries.get(id);
+        if (ae == null) {
+            throw new IllegalArgumentException("Unknown application ID: " + id);
+        }
+
+        ApplicationItem app = ae.item;
+
+        e.beginMap();
+        e.writeMapItem("id", id);
+        e.writeMapItem("name", app.getName());
+        e.writeMapItem("args", app.getArguments());
+        e.writeMapItem("host", app.getHost());
+        e.writeMapItem("crtState", app.getState().ordinal());
+        e.writeMapItem("totalTime", getTotalTime(ae.stateTimeline));
+        e.writeMapItem("runTime", getRunTime(ae.stateTimeline));
+        e.writeMapItem("timeline", db.getStateTimes(app));
+
+        if (app.getWorkerId() != null) {
+            e.writeMapItem("workerid", app.getWorkerId());
+        }
+
+        // the last job submission task attached to this invocation, if any
+        TaskItem et = null;
+
+        if (ae.tasks != null) {
+            for (TaskItem it : ae.tasks) {
+                if (it.getType() == Task.JOB_SUBMISSION) {
+                    et = it;
+                }
+            }
+        }
+
+        if (et != null) {
+            Task t = et.getTask();
+            JobSpecification spec = (JobSpecification) t.getSpecification();
+            List<String> args = spec.getArgumentsAsList();
+            extractJobInfo(e, args);
+            e.writeMapItem("directory", spec.getDirectory());
+        }
+        e.endMap();
+    }
+
+
+    /** Time from the first timeline entry to the current time; 0 without a timeline. */
+    private int getTotalTime(List<TimedValue<ApplicationState>> tl) {
+        if (tl == null || tl.isEmpty()) {
+            return 0;
+        }
+        return (int) (state.getCurrentTime() - tl.get(0).time);
+    }
+
+    /**
+     * Time from the first stage-in, active or stage-out entry of the timeline to the
+     * current time; 0 if the timeline has no such entry.
+     */
+    private int getRunTime(List<TimedValue<ApplicationState>> tl) {
+        if (tl == null || tl.isEmpty()) {
+            return 0;
+        }
+        for (TimedValue<ApplicationState> p : tl) {
+            switch (p.value) {
+                case STAGE_IN:
+                case STAGE_OUT:
+                case ACTIVE:
+                    return (int) (state.getCurrentTime() - p.time);
+                default:
+                    break;
+            }
+        }
+        return 0;
+    }
+
+    /**
+     * Splits the job argument list into options (arguments starting with "-") and the
+     * values that follow each of them, and writes every recognized option.
+     * <p>
+     * NOTE(review): an application argument that itself starts with "-" is taken for a new
+     * option and ends the value list of the preceding one; also, "-a" is written under the
+     * key "args", which getData() has already written - confirm both are acceptable.
+     */
+    private void extractJobInfo(JSONEncoder e, List<String> args) {
+        String key = null;
+        List<String> values = new ArrayList<String>();
+        for (String arg : args) {
+            if (arg.startsWith("-")) {
+                if (key != null) {
+                    writeJobInfoItem(e, key, values);
+                }
+                key = arg;
+                // fresh list, so that a list already handed to the encoder is never modified
+                values = new ArrayList<String>();
+            }
+            else {
+                values.add(arg);
+            }
+        }
+        // the values of the last option are complete only at the end of the list
+        if (key != null) {
+            writeJobInfoItem(e, key, values);
+        }
+    }
+
+    /**
+     * Writes one option under its JSON name: the first value for "-e", "-out", "-err" and
+     * "-i", the whole value list for "-if", "-of" and "-a". Options without values and
+     * unrecognized options are skipped.
+     */
+    private void writeJobInfoItem(JSONEncoder e, String key, List<String> l) {
+        if (l.isEmpty()) {
+            return;
+        }
+        if (key.equals("-e")) {
+            e.writeMapItem("executable", l.get(0));
+        }
+        else if (key.equals("-out")) {
+            e.writeMapItem("stdout", l.get(0));
+        }
+        else if (key.equals("-err")) {
+            e.writeMapItem("stderr", l.get(0));
+        }
+        else if (key.equals("-i")) {
+            e.writeMapItem("stdin", l.get(0));
+        }
+        else if (key.equals("-if")) {
+            e.writeMapItem("stagein", l);
+        }
+        else if (key.equals("-of")) {
+            e.writeMapItem("stageout", l);
+        }
+        else if (key.equals("-a")) {
+            e.writeMapItem("args", l);
+        }
+    }
+
+}
Added: trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppListBuilder.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppListBuilder.java (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppListBuilder.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,139 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jun 29, 2014
+ */
+package org.griphyn.vdl.karajan.monitor.monitors.http;
+
+import java.util.List;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.griphyn.vdl.karajan.monitor.items.ApplicationItem;
+import org.griphyn.vdl.karajan.monitor.items.ApplicationState;
+
+public class AppListBuilder {
+
+ private String name;
+ private int page;
+ private int pageSize;
+ private int stateFilter;
+ private SortedMap<String, List<SortedSet<ApplicationItem>>> byName;
+ private String hostFilter;
+ private BrowserDataBuilder db;
+
+ /**
+  * Creates a builder for one page of a filtered application list.
+  *
+  * @param db       source of the per-name application sets and paging helper
+  * @param name     application name to list; null is stored as ""
+  * @param page     page number
+  * @param pageSize number of items per page
+  * @param state    state ordinal to filter by, or -1 for no state filter
+  * @param host     host to filter by, or null for no host filter
+  */
+ public AppListBuilder(BrowserDataBuilder db, String name, int page, int pageSize, int state, String host) {
+     this.db = db;
+     this.byName = db.getByName();
+     this.name = (name == null) ? "" : name;
+     this.page = page;
+     this.pageSize = pageSize;
+     this.stateFilter = state;
+     this.hostFilter = host;
+ }
+
+ /**
+  * Writes the requested page of the filtered application list as a map
+  * with the keys "title", the paging keys written by
+  * db.writePagingData, "name", "state", "host" and, when the page
+  * contains at least one item, "data" (an array with one map per
+  * application).
+  *
+  * Exactly the items with index in [start, start + pageSize) are written,
+  * and the "data" array is only closed if it was opened, so a page that
+  * lies beyond the available items yields a well-formed map without "data".
+  *
+  * NOTE(review): a stateFilter other than -1 is used as an index into
+  * ApplicationState.values() without range checking.
+  *
+  * @param e the encoder to write to
+  */
+ public void getData(JSONEncoder e) {
+     SortedSet<ApplicationItem> sorted = getInstances(name, stateFilter, hostFilter);
+
+     // half-open index range [start, end) covered by the requested page
+     int start = (page - 1) * pageSize;
+     int end = start + pageSize;
+
+     e.beginMap();
+
+     String title;
+     if (stateFilter == -1) {
+         if (name.isEmpty()) {
+             title = "All application invocations";
+         }
+         else {
+             title = "Invocations of application \"" + name + "\"";
+         }
+     }
+     else {
+         if (name.isEmpty()) {
+             title = ApplicationState.values()[stateFilter] + " application invocations";
+         }
+         else {
+             title = ApplicationState.values()[stateFilter] + " invocations of application \"" + name + "\"";
+         }
+     }
+     if (hostFilter != null) {
+         title = title + " on site \"" + hostFilter + "\"";
+     }
+     e.writeMapItem("title", title);
+     db.writePagingData(e, sorted.size(), page, pageSize);
+     e.writeMapItem("name", name);
+     e.writeMapItem("state", stateFilter);
+     e.writeMapItem("host", hostFilter);
+
+     boolean arrayOpen = false;
+     int index = 0;
+     for (ApplicationItem item : sorted) {
+         if (index >= end) {
+             // past the last item of the page
+             break;
+         }
+         if (index >= start) {
+             if (!arrayOpen) {
+                 // open the array lazily so that begin/end always pair up
+                 e.writeMapKey("data");
+                 e.beginArray();
+                 arrayOpen = true;
+             }
+             ApplicationState state = item.getState();
+             e.beginArrayItem();
+             e.beginMap();
+             e.writeMapItem("id", item.getID());
+             e.writeMapItem("state", state.ordinal());
+             e.writeMapItem("startTime", item.getStartTime());
+             e.writeMapItem("host", item.getHost());
+             if (item.getWorkerId() != null) {
+                 e.writeMapItem("worker", item.getWorkerId());
+             }
+             e.writeMapItem("args", item.getArguments());
+             if (state.isTerminal()) {
+                 e.writeMapItem("runTime", item.getCurrentStateTime() - item.getStartTime());
+             }
+             else {
+                 e.writeMapItem("runTime", 0L);
+             }
+             e.endMap();
+             e.endArrayItem();
+         }
+         index++;
+     }
+     if (arrayOpen) {
+         e.endArray();
+     }
+     e.endMap();
+ }
+
+ /**
+  * Collects the applications matching the filters into a set ordered by
+  * BrowserDataBuilder.APP_TIME_COMPARATOR.
+  *
+  * @param name        application name, or "" to include all names
+  * @param stateFilter state ordinal to match, or -1 for any state
+  * @param hostFilter  host to match, or null for any host
+  * @return the matching applications; empty when the name is unknown
+  */
+ private SortedSet<ApplicationItem> getInstances(String name, int stateFilter, String hostFilter) {
+     SortedSet<ApplicationItem> sorted = new TreeSet<ApplicationItem>(BrowserDataBuilder.APP_TIME_COMPARATOR);
+     if (!name.isEmpty()) {
+         List<SortedSet<ApplicationItem>> l = byName.get(name);
+         // a name that has not been seen has no entry in byName
+         if (l != null) {
+             getInstances(sorted, l, stateFilter, hostFilter);
+         }
+     }
+     else {
+         for (List<SortedSet<ApplicationItem>> l : byName.values()) {
+             getInstances(sorted, l, stateFilter, hostFilter);
+         }
+     }
+     return sorted;
+ }
+
+ /**
+  * Adds to sorted every application from the given per-state sets that
+  * matches both filters.
+  *
+  * @param sorted      destination set
+  * @param l           sets of applications to scan
+  * @param stateFilter state ordinal to match, or -1 for any state
+  * @param hostFilter  host to match, or null for any host
+  */
+ private void getInstances(SortedSet<ApplicationItem> sorted, List<SortedSet<ApplicationItem>> l, int stateFilter, String hostFilter) {
+     for (SortedSet<ApplicationItem> ss : l) {
+         for (ApplicationItem app : ss) {
+             boolean stateMatch = stateFilter == -1 || app.getState().ordinal() == stateFilter;
+             // compare from the filter side so that an application without
+             // a host simply does not match instead of throwing
+             boolean hostMatch = hostFilter == null || hostFilter.equals(app.getHost());
+             if (stateMatch && hostMatch) {
+                 sorted.add(app);
+             }
+         }
+     }
+ }
+}
Added: trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppsSummaryBuilder.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppsSummaryBuilder.java (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/AppsSummaryBuilder.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,55 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jun 29, 2014
+ */
+package org.griphyn.vdl.karajan.monitor.monitors.http;
+
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+
+import org.griphyn.vdl.karajan.monitor.items.ApplicationItem;
+import org.griphyn.vdl.karajan.monitor.items.ApplicationState;
+
+public class AppsSummaryBuilder {
+
+ private final SortedMap<String, List<SortedSet<ApplicationItem>>> byName;
+ private BrowserDataBuilder db;
+
+ public AppsSummaryBuilder(BrowserDataBuilder db) {
+ this.byName = db.getByName();
+ this.db = db;
+ }
+
+ public void getData(JSONEncoder e) {
+ // counts of each state by name
+ e.beginMap();
+ db.writeEnabledStates(e, "enabledStates");
+ e.writeMapKey("apps");
+ e.beginMap();
+ for (Map.Entry<String, List<SortedSet<ApplicationItem>>> en : byName.entrySet()) {
+ e.writeMapKey(en.getKey());
+ e.beginArray();
+ for (ApplicationState s : ApplicationState.values()) {
+ if (s.isEnabled()) {
+ e.beginArrayItem();
+ e.beginArray();
+ e.writeArrayItem(s.ordinal());
+ e.writeArrayItem(en.getValue().get(s.ordinal()).size());
+ e.endArray();
+ e.endArrayItem();
+ }
+ }
+ e.endArray();
+ }
+ e.endMap();
+ e.endMap();
+ }
+
+}
Added: trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/BrowserDataBuilder.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/BrowserDataBuilder.java (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/BrowserDataBuilder.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,391 @@
+//----------------------------------------------------------------------
+ //This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 21, 2013
+ */
+package org.griphyn.vdl.karajan.monitor.monitors.http;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.griphyn.vdl.karajan.monitor.SystemState;
+import org.griphyn.vdl.karajan.monitor.SystemStateListener;
+import org.griphyn.vdl.karajan.monitor.items.ApplicationItem;
+import org.griphyn.vdl.karajan.monitor.items.ApplicationState;
+import org.griphyn.vdl.karajan.monitor.items.StatefulItem;
+import org.griphyn.vdl.karajan.monitor.items.StatefulItemClass;
+import org.griphyn.vdl.karajan.monitor.items.TaskItem;
+import org.griphyn.vdl.karajan.monitor.processors.coasters.CoasterStatusItem.Block;
+import org.griphyn.vdl.karajan.monitor.processors.coasters.CoasterStatusItem;
+import org.griphyn.vdl.karajan.monitor.processors.coasters.WorkerProbeItem;
+
+public class BrowserDataBuilder extends StateDataBuilder implements SystemStateListener {
+
+ public static final Comparator<ApplicationItem> APP_TIME_COMPARATOR = new Comparator<ApplicationItem>() {
+ @Override
+ public int compare(ApplicationItem o1, ApplicationItem o2) {
+ return (int) (o1.getStartTime() - o2.getStartTime());
+ }
+ };
+
+ /** A value paired with a time. */
+ public static class TimedValue<T> {
+ public long time;
+ public T value;
+
+ public TimedValue(long time, T value) {
+ this.time = time;
+ this.value = value;
+ }
+ }
+
+ /**
+  * Per-application bookkeeping: the item itself, the state it was last
+  * seen in, the tasks reported with it as parent, and the list of
+  * (time, state) pairs recorded on each update. The tasks and
+  * stateTimeline fields stay null until first used.
+  */
+ public static class AppEntry {
+ public ApplicationItem item;
+ public ApplicationState oldState;
+ public List<TaskItem> tasks;
+ public List<TimedValue<ApplicationState>> stateTimeline;
+ }
+
+ /**
+  * Data accumulated for one worker: probe samples (CPU load, disk usage
+  * keyed by mount point, I/O load keyed by device) and application counters.
+  */
+ public static class WorkerData {
+ public String site;
+ public List<WorkerProbeItem.Cpu> cpuLoad;
+ public Map<String, List<WorkerProbeItem.DiskUsage>> diskUsage;
+ public Map<String, List<WorkerProbeItem.IOLoad>> ioLoad;
+ public int activeApps;
+ public int failedApps;
+ public int completedApps;
+
+ public WorkerData() {
+ cpuLoad = new ArrayList<WorkerProbeItem.Cpu>();
+ diskUsage = new TreeMap<String, List<WorkerProbeItem.DiskUsage>>();
+ ioLoad = new TreeMap<String, List<WorkerProbeItem.IOLoad>>();
+ }
+ }
+
+ private final SystemState state;
+ private int maxCount;
+ private JSONEncoder e;
+
+ private SortedMap<String, List<SortedSet<ApplicationItem>>> byName;
+ private Map<String, AppEntry> entries;
+ private List<ApplicationItem> byTime;
+
+ private SortedMap<String, WorkerData> workerData;
+ private List<String> workersByTime;
+
+ /**
+  * Creates the builder with empty indexes and registers it as a listener
+  * on the given system state.
+  */
+ public BrowserDataBuilder(SystemState state) {
+ this.state = state;
+ byName = new TreeMap<String, List<SortedSet<ApplicationItem>>>();
+ byTime = new ArrayList<ApplicationItem>();
+ entries = new HashMap<String, AppEntry>();
+ workerData = new TreeMap<String, WorkerData>();
+ workersByTime = new ArrayList<String>();
+ // registered last, after all fields have been initialized
+ state.addListener(this);
+ }
+
+ @Override
+ public void itemUpdated(UpdateType updateType, StatefulItem item) {
+ if (item.getItemClass() == StatefulItemClass.APPLICATION) {
+ appUpdated(updateType, (ApplicationItem) item);
+ }
+ else if (item.getItemClass() == StatefulItemClass.TASK) {
+ taskUpdated(updateType, (TaskItem) item);
+ }
+ else if (item instanceof WorkerProbeItem) {
+ addWorkerProbeData((WorkerProbeItem) item);
+ }
+ }
+
+ /**
+  * Returns the data record for a worker, creating and registering it (in
+  * both workerData and workersByTime) on first use.
+  */
+ private WorkerData getWorkerData(String id) {
+     WorkerData existing = workerData.get(id);
+     if (existing != null) {
+         return existing;
+     }
+     WorkerData created = new WorkerData();
+     workerData.put(id, created);
+     workersByTime.add(id);
+     return created;
+ }
+
+ /**
+  * Appends the probe sample carried by the item to the data of the worker
+  * identified by the item's ID. CPU samples go to cpuLoad, disk usage
+  * samples to the list for their mount point, and I/O load samples to the
+  * list for their device. Other sample types are ignored.
+  */
+ private void addWorkerProbeData(WorkerProbeItem item) {
+ WorkerData wd = getWorkerData(item.getID());
+ WorkerProbeItem.Data data = item.getData();
+ if (data instanceof WorkerProbeItem.Cpu) {
+ wd.cpuLoad.add((WorkerProbeItem.Cpu) data);
+ }
+ else if (data instanceof WorkerProbeItem.DiskUsage) {
+ WorkerProbeItem.DiskUsage du = (WorkerProbeItem.DiskUsage) data;
+ List<WorkerProbeItem.DiskUsage> l = wd.diskUsage.get(du.getMountPoint());
+ if (l == null) {
+ l = new ArrayList<WorkerProbeItem.DiskUsage>();
+ wd.diskUsage.put(du.getMountPoint(), l);
+ }
+ l.add(du);
+ }
+ else if (data instanceof WorkerProbeItem.IOLoad) {
+ WorkerProbeItem.IOLoad du = (WorkerProbeItem.IOLoad) data;
+ List<WorkerProbeItem.IOLoad> l = wd.ioLoad.get(du.getDevice());
+ if (l == null) {
+ l = new ArrayList<WorkerProbeItem.IOLoad>();
+ wd.ioLoad.put(du.getDevice(), l);
+ }
+ l.add(du);
+ }
+ }
+
+ /**
+  * Records a task under its parent application. Tasks without a parent,
+  * or whose parent has no entry, are ignored. A task is stored at most
+  * once per application. The updateType argument is not used.
+  */
+ private void taskUpdated(UpdateType updateType, TaskItem task) {
+ ApplicationItem app = (ApplicationItem) task.getParent();
+ if (app == null) {
+ return;
+ }
+ AppEntry e = entries.get(app.getID());
+ if (e == null) {
+ return;
+ }
+ if (e.tasks == null) {
+ // first task: store it in a singleton list
+ e.tasks = Collections.singletonList(task);
+ }
+ else if (e.tasks.size() == 1) {
+ if (!e.tasks.contains(task)) {
+ // second distinct task: switch to a growable list
+ List<TaskItem> l = new LinkedList<TaskItem>();
+ l.add(e.tasks.get(0));
+ l.add(task);
+ e.tasks = l;
+ }
+ }
+ else {
+ if (!e.tasks.contains(task)) {
+ e.tasks.add(task);
+ }
+ }
+ }
+
+ /**
+  * Handles an application update: applications without a name are
+  * ignored, known ones are updated and new ones are added.
+  */
+ private void appUpdated(UpdateType updateType, ApplicationItem app) {
+     if (app.getName() == null) {
+         return;
+     }
+     boolean known = entries.containsKey(app.getID());
+     if (!known) {
+         addApp(app);
+         return;
+     }
+     updateApp(app);
+ }
+
+ /**
+  * Updates the indexes for an application that already has an entry:
+  * moves it from the set of its previous state to the set of its current
+  * state, appends (current time, state) to its timeline and, when the
+  * application has a worker ID, adjusts that worker's counters.
+  *
+  * NOTE(review): the timeline entry and the counter changes are applied
+  * on every call, including calls where the state did not change; confirm
+  * that updates are only delivered on state transitions.
+  */
+ private void updateApp(ApplicationItem item) {
+ AppEntry e = entries.get(item.getID());
+ ApplicationState old = e.oldState;
+ ApplicationState state = item.getState();
+ e.oldState = state;
+ String name = item.getName();
+ List<SortedSet<ApplicationItem>> l = getNamed(name);
+ if (old != null) {
+ l.get(old.ordinal()).remove(item);
+ }
+ l.get(state.ordinal()).add(item);
+ List<TimedValue<ApplicationState>> timeline = getTimeline(e);
+
+ // this.state is the SystemState; the local 'state' is the application state
+ timeline.add(new TimedValue<ApplicationState>(this.state.getCurrentTime(), state));
+
+ if (item.getWorkerId() != null) {
+ // initialize worker data
+ WorkerData wd = getWorkerData(item.getWorkerId());
+ switch (state) {
+ case ACTIVE:
+ wd.activeApps++;
+ break;
+ case FAILED:
+ wd.failedApps++;
+ wd.activeApps--;
+ break;
+ case FINISHED_SUCCESSFULLY:
+ wd.activeApps--;
+ wd.completedApps++;
+ break;
+ }
+ wd.site = item.getHost();
+ }
+ }
+
+ /** Returns the entry's state timeline, creating an empty one on first use. */
+ private List<TimedValue<ApplicationState>> getTimeline(AppEntry e) {
+     if (e.stateTimeline == null) {
+         e.stateTimeline = new ArrayList<TimedValue<ApplicationState>>();
+     }
+     return e.stateTimeline;
+ }
+
+ /**
+  * Returns the state timeline of the given application.
+  *
+  * @throws IllegalArgumentException if the application has no entry
+  */
+ List<TimedValue<ApplicationState>> getTimeline(ApplicationItem item) {
+     AppEntry entry = entries.get(item.getID());
+     if (entry != null) {
+         return getTimeline(entry);
+     }
+     throw new IllegalArgumentException("Unknown app id: " + item.getID());
+ }
+
+ /**
+  * Returns the per-state sets for an application name, creating one empty
+  * set per ApplicationState value (indexed by ordinal) on first use.
+  */
+ private List<SortedSet<ApplicationItem>> getNamed(String name) {
+     List<SortedSet<ApplicationItem>> perState = byName.get(name);
+     if (perState != null) {
+         return perState;
+     }
+     int stateCount = ApplicationState.values().length;
+     perState = new ArrayList<SortedSet<ApplicationItem>>();
+     for (int i = 0; i < stateCount; i++) {
+         perState.add(new TreeSet<ApplicationItem>(APP_TIME_COMPARATOR));
+     }
+     byName.put(name, perState);
+     return perState;
+ }
+
+ /**
+  * Registers a new application: appends it to byTime, creates its entry
+  * and adds it to the set for its current state under its name.
+  */
+ private void addApp(ApplicationItem item) {
+ byTime.add(item);
+ String name = item.getName();
+ // appUpdated already filters out unnamed applications before calling this
+ if (name == null) {
+ return;
+ }
+ ApplicationState state = item.getState();
+ AppEntry e = new AppEntry();
+ entries.put(item.getID(), e);
+ e.item = item;
+ e.oldState = state;
+ List<SortedSet<ApplicationItem>> l = getNamed(name);
+ l.get(state.ordinal()).add(item);
+ }
+
+ /**
+  * Builds the JSON document selected by the "type" request parameter
+  * (default "apps") and returns it as UTF-8 encoded bytes.
+  *
+  * The encoder is kept in a local variable so that each request works on
+  * its own instance, and the charset is given explicitly so the output
+  * does not depend on the platform default encoding.
+  *
+  * @param params the request parameters
+  * @return the encoded JSON document
+  * @throws IllegalArgumentException if the type is unknown, a required
+  *         parameter is missing, or a numeric parameter cannot be parsed
+  */
+ @Override
+ public ByteBuffer getData(Map<String, String> params) {
+     JSONEncoder enc = new JSONEncoder();
+     String type = getParam(params, "type", "apps");
+
+     if (type.equals("apps")) {
+         new AppsSummaryBuilder(this).getData(enc);
+     }
+     else if (type.equals("applist")) {
+         new AppListBuilder(this, getParam(params, "name", null),
+             Integer.parseInt(getParam(params, "page", "1")),
+             Integer.parseInt(getParam(params, "pageSize", "20")),
+             Integer.parseInt(getParam(params, "state", "-1")),
+             getParam(params, "host", null)).getData(enc);
+     }
+     else if (type.equals("appdetail")) {
+         new AppDetailBuilder(this, getParam(params, "name")).getData(enc);
+     }
+     else if (type.equals("appinstance")) {
+         new AppInstanceBuilder(this, getParam(params, "id")).getData(enc);
+     }
+     else if (type.equals("sites")) {
+         new SiteInfoBuilder(this).getData(enc);
+     }
+     else if (type.equals("workerlist")) {
+         new WorkerListBuilder(this, getParam(params, "site", null),
+             Integer.parseInt(getParam(params, "page", "1")),
+             Integer.parseInt(getParam(params, "pageSize", "20"))).getData(enc);
+     }
+     else if (type.equals("worker")) {
+         new WorkerInfoBuilder(this, getParam(params, "id")).getData(enc);
+     }
+     else {
+         throw new IllegalArgumentException("Unknown type: " + type);
+     }
+     return ByteBuffer.wrap(enc.toString().getBytes(java.nio.charset.Charset.forName("UTF-8")));
+ }
+
+ /**
+  * Returns, for each pair of consecutive timeline entries of the given
+  * application, a two-element list holding the ordinal of the earlier
+  * state and the time between the two entries. The last timeline entry
+  * therefore contributes no duration of its own.
+  *
+  * @param app the application whose timeline is examined
+  * @return the (state ordinal, duration) pairs, possibly empty
+  * @throws IllegalArgumentException if the application has no entry
+  */
+ public List<List<Integer>> getStateTimes(ApplicationItem app) {
+     // getTimeline never returns null; it throws for unknown applications
+     List<TimedValue<ApplicationState>> tl = getTimeline(app);
+     List<List<Integer>> l = new ArrayList<List<Integer>>();
+     long lastTime = -1;
+     ApplicationState lastState = null;
+     for (TimedValue<ApplicationState> p : tl) {
+         if (lastState != null) {
+             l.add(new org.griphyn.vdl.karajan.Pair<Integer>(lastState.ordinal(), (int) (p.time - lastTime)));
+         }
+         lastTime = p.time;
+         lastState = p.value;
+     }
+     return l;
+ }
+
+
+ /** Returns the named parameter, or _default when it is absent. */
+ private String getParam(Map<String, String> params, String name, String _default) {
+     String found = params.get(name);
+     return (found != null) ? found : _default;
+ }
+
+ /**
+  * Returns the named parameter.
+  *
+  * @throws IllegalArgumentException if the parameter is absent
+  */
+ private String getParam(Map<String, String> params, String name) {
+     String found = params.get(name);
+     if (found != null) {
+         return found;
+     }
+     throw new IllegalArgumentException("Missing parameter '" + name + "'");
+ }
+
+ /** Returns the live map from application name to per-state application sets. */
+ public SortedMap<String, List<SortedSet<ApplicationItem>>> getByName() {
+ return byName;
+ }
+
+ /** Returns the live map from application ID to its entry. */
+ public Map<String, AppEntry> getEntries() {
+ return entries;
+ }
+
+ /** Returns the system state this builder listens to. */
+ public SystemState getState() {
+ return state;
+ }
+
+ /** Returns the live list of applications in the order they were added. */
+ public List<ApplicationItem> getByTime() {
+ return byTime;
+ }
+
+ /**
+  * Writes, under the given map key, an array with the ordinals of all
+  * enabled application states.
+  *
+  * NOTE(review): beginArrayItem() is not paired with endArrayItem() here,
+  * unlike in the other builders; confirm the encoder does not require it.
+  */
+ public void writeEnabledStates(JSONEncoder e, String key) {
+ e.writeMapKey(key);
+ e.beginArray();
+ for (ApplicationState s : ApplicationState.values()) {
+ if (s.isEnabled()) {
+ e.beginArrayItem();
+ e.write(s.ordinal());
+ }
+ }
+ e.endArray();
+ }
+
+ /** Returns the live map from worker ID to the data collected for it. */
+ public Map<String, WorkerData> getWorkerData() {
+ return workerData;
+ }
+
+ /**
+  * Looks up a coaster block by ID in the coaster status item.
+  *
+  * @param id the block ID
+  * @return the block, or null if no coaster status item exists or it has
+  *         no block with that ID
+  */
+ public Block getBlock(String id) {
+     CoasterStatusItem item = (CoasterStatusItem) state.getItemByID(CoasterStatusItem.ID, StatefulItemClass.MISC);
+     if (item == null) {
+         // no coaster status has been recorded
+         return null;
+     }
+     return item.getBlocks().get(id);
+ }
+
+ /** Returns the live list of worker IDs in the order they were first seen. */
+ public List<String> getWorkersByTime() {
+ return workersByTime;
+ }
+
+ /**
+  * Writes the paging map items "pages", "hasPrev", "hasNext" and "crtPage".
+  *
+  * The page count is size / pageSize rounded up. A non-positive pageSize
+  * or size yields zero pages rather than an overflowed or negative count.
+  *
+  * @param e        encoder positioned inside an open map
+  * @param size     total number of items
+  * @param page     current page number
+  * @param pageSize number of items per page
+  */
+ public void writePagingData(JSONEncoder e, int size, int page, int pageSize) {
+     int pages = 0;
+     if (pageSize > 0 && size > 0) {
+         // integer ceiling division, written so it cannot overflow
+         pages = (size - 1) / pageSize + 1;
+     }
+     e.writeMapItem("pages", pages);
+     e.writeMapItem("hasPrev", page > 1);
+     e.writeMapItem("hasNext", page < pages);
+     e.writeMapItem("crtPage", page);
+ }
+}
\ No newline at end of file
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/HTTPServer.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/HTTPServer.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/HTTPServer.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -69,6 +69,7 @@
stateKeys.put("/summary.state", new SummaryDataBuilder(state));
stateKeys.put("/plotSeriesInfo.state", new PlotInfoBuilder(state));
stateKeys.put("/plotData.state", new PlotDataBuilder(state));
+ stateKeys.put("/browser.state", new BrowserDataBuilder(state));
}
public void start() throws IOException {
@@ -181,7 +182,19 @@
for (SelectionKey key : keys) {
if (key.isValid()) {
ConnectionState s = channels.get(key.channel());
- s.process(key);
+ boolean ok = false;
+ if (s != null) {
+ try {
+ s.process(key);
+ ok = true;
+ }
+ catch (Exception e) {
+ }
+ }
+ if (!ok) {
+ channels.remove(key.channel());
+ key.cancel();
+ }
}
}
skeys.clear();
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/JSONEncoder.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/JSONEncoder.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/JSONEncoder.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -9,6 +9,7 @@
*/
package org.griphyn.vdl.karajan.monitor.monitors.http;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Stack;
@@ -42,11 +43,28 @@
}
public void write(String value) {
- sb.append('"');
- sb.append(value);
- sb.append('"');
+ if (value == null) {
+ sb.append("null");
+ }
+ else {
+ sb.append('"');
+ escape(sb, value);
+ sb.append('"');
+ }
}
+ /**
+  * Appends value to sb with the characters that may not appear literally
+  * inside a JSON string escaped: the double quote, the backslash and all
+  * control characters below U+0020.
+  *
+  * @param sb    destination buffer
+  * @param value the raw string; must not be null
+  */
+ private void escape(StringBuilder sb, String value) {
+     for (int i = 0; i < value.length(); i++) {
+         char c = value.charAt(i);
+         switch (c) {
+             case '"':
+                 sb.append("\\\"");
+                 break;
+             case '\\':
+                 sb.append("\\\\");
+                 break;
+             case '\n':
+                 sb.append("\\n");
+                 break;
+             case '\r':
+                 sb.append("\\r");
+                 break;
+             case '\t':
+                 sb.append("\\t");
+                 break;
+             case '\b':
+                 sb.append("\\b");
+                 break;
+             case '\f':
+                 sb.append("\\f");
+                 break;
+             default:
+                 if (c < 0x20) {
+                     // remaining control characters use the \\uXXXX form
+                     String hex = Integer.toHexString(c);
+                     sb.append("\\u");
+                     for (int pad = hex.length(); pad < 4; pad++) {
+                         sb.append('0');
+                     }
+                     sb.append(hex);
+                 }
+                 else {
+                     sb.append(c);
+                 }
+         }
+     }
+ }
+
+
public void write(boolean value) {
sb.append(value);
}
@@ -73,14 +91,17 @@
else if (value instanceof Long) {
write(((Long) value).longValue());
}
- else if (value instanceof List) {
- writeArray((List<?>) value);
+ else if (value instanceof Collection) {
+ writeArray((Collection<?>) value);
}
else if (value instanceof Map) {
@SuppressWarnings("unchecked")
Map<? extends Object, ? extends Object> m = (Map<? extends Object, ? extends Object>) value;
writeMap(m);
}
+ else if (value instanceof int[]) {
+ writeArray((int[]) value);
+ }
else {
write(value.toString());
}
@@ -239,7 +260,7 @@
}
}
- public void writeArray(List<?> a) {
+ public void writeArray(Collection<?> a) {
beginArray();
for (Object v : a) {
writeArrayItem(v);
Added: trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/SiteInfoBuilder.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/SiteInfoBuilder.java (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/SiteInfoBuilder.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,162 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jun 29, 2014
+ */
+package org.griphyn.vdl.karajan.monitor.monitors.http;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.griphyn.vdl.karajan.monitor.items.ApplicationItem;
+import org.griphyn.vdl.karajan.monitor.items.ApplicationState;
+
+public class SiteInfoBuilder {
+
+ private BrowserDataBuilder db;
+ private List<ApplicationItem> byTime;
+ private List<Integer> enabledStatesMapping;
+
+ /**
+  * Creates a builder over the applications known to db and records the
+  * ordinals of all enabled application states.
+  */
+ public SiteInfoBuilder(BrowserDataBuilder db) {
+ this.db = db;
+ this.byTime = db.getByTime();
+
+ enabledStatesMapping = new ArrayList<Integer>();
+ for (ApplicationState state : ApplicationState.values()) {
+ if (state.isEnabled()) {
+ enabledStatesMapping.add(state.ordinal());
+ }
+ }
+ }
+
+ /**
+  * Aggregated data for one site. The three maps are keyed by application
+  * name; the lists in appStates and appCummulativeStateTimes are indexed
+  * by ApplicationState ordinal.
+  */
+ private static class SiteInfo {
+ public String name;
+ public Map<String, List<Integer>> appStates;
+ public Map<String, List<Integer>> appCummulativeStateTimes;
+ public Map<String, Integer> appCountByType;
+ public SortedSet<String> workers;
+ public int appCount;
+
+ public SiteInfo(String name) {
+ this.name = name;
+ this.appStates = new TreeMap<String, List<Integer>>();
+ this.appCountByType = new TreeMap<String, Integer>();
+ this.appCummulativeStateTimes = new TreeMap<String, List<Integer>>();
+ this.workers = new TreeSet<String>();
+ }
+ }
+
+
+ /**
+  * Aggregates all known applications by host and writes a map with the
+  * keys "enabledStates" and "sites". Each site is a map with "name",
+  * "appCount", "workers", "stateCounts" and "avgStateTimes"; the last two
+  * map an application name to an array with one value per enabled state.
+  */
+ public void getData(JSONEncoder e) {
+ /*
+ * List of sites
+ * List of app names it ran with state counts and average times
+ * List of workers
+ */
+
+ Map<String, SiteInfo> si = new HashMap<String, SiteInfo>();
+
+ for (ApplicationItem app : byTime) {
+ addApp(getOrCreateSite(si, app.getHost()), app);
+ }
+
+ e.beginMap();
+ db.writeEnabledStates(e, "enabledStates");
+ e.writeMapKey("sites");
+ e.beginArray();
+ for (SiteInfo i : si.values()) {
+ e.beginArrayItem();
+ e.beginMap();
+ e.writeMapItem("name", i.name);
+ e.writeMapItem("appCount", i.appCount);
+ e.writeMapItem("workers", i.workers);
+ e.writeMapKey("stateCounts");
+ e.beginMap();
+ for (Map.Entry<String, List<Integer>> e1 : i.appStates.entrySet()) {
+ e.writeMapKey(e1.getKey());
+ e.beginArray();
+ // the list is indexed by state ordinal; only enabled states are written
+ int index = 0;
+ for (Integer i2 : e1.getValue()) {
+ if (ApplicationState.values()[index++].isEnabled()) {
+ e.writeArrayItem(i2.intValue());
+ }
+ }
+ e.endArray();
+ }
+ e.endMap();
+ e.writeMapKey("avgStateTimes");
+ e.beginMap();
+ for (Map.Entry<String, List<Integer>> e1 : i.appCummulativeStateTimes.entrySet()) {
+ e.writeMapKey(e1.getKey());
+ // number of applications with this name on this site
+ int count = i.appCountByType.get(e1.getKey());
+ e.beginArray();
+ int index = 0;
+ for (Integer i2 : e1.getValue()) {
+ if (ApplicationState.values()[index++].isEnabled()) {
+ // cumulative time divided by count, using integer division
+ e.writeArrayItem(i2.intValue() / count);
+ }
+ }
+ e.endArray();
+ }
+ e.endMap();
+ e.endMap();
+ }
+ e.endArray();
+ e.endMap();
+ }
+
+ /**
+  * Folds one application into the site's aggregates: the application and
+  * worker sets, the per-name state counts, the per-name cumulative state
+  * times and the per-name application count.
+  */
+ private void addApp(SiteInfo si, ApplicationItem app) {
+     si.appCount++;
+     String workerId = app.getWorkerId();
+     if (workerId != null) {
+         si.workers.add(workerId);
+     }
+
+     String name = app.getName();
+     List<Integer> states = si.appStates.get(name);
+     List<Integer> stateTimes = si.appCummulativeStateTimes.get(name);
+     Integer count = si.appCountByType.get(name);
+     if (states == null) {
+         // first application with this name on this site: start from zeros
+         int stateCount = ApplicationState.values().length;
+
+         states = new ArrayList<Integer>();
+         states.addAll(Collections.nCopies(stateCount, 0));
+         si.appStates.put(name, states);
+
+         stateTimes = new ArrayList<Integer>();
+         stateTimes.addAll(Collections.nCopies(stateCount, 0));
+         si.appCummulativeStateTimes.put(name, stateTimes);
+
+         count = 0;
+     }
+
+     add(states, app.getState().ordinal(), 1);
+     for (List<Integer> stateTime : db.getStateTimes(app)) {
+         add(stateTimes, stateTime.get(0), stateTime.get(1));
+     }
+     si.appCountByType.put(name, count + 1);
+ }
+
+ /** Adds amount to the element of l at the given index. */
+ private void add(List<Integer> l, int index, int amount) {
+     int updated = l.get(index) + amount;
+     l.set(index, updated);
+ }
+
+ /** Returns the SiteInfo registered for host, creating it on first use. */
+ private SiteInfo getOrCreateSite(Map<String, SiteInfo> si, String host) {
+     SiteInfo existing = si.get(host);
+     if (existing != null) {
+         return existing;
+     }
+     SiteInfo created = new SiteInfo(host);
+     si.put(host, created);
+     return created;
+ }
+
+}
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/SummaryDataBuilder.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/SummaryDataBuilder.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/SummaryDataBuilder.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -36,7 +36,7 @@
e.writeMapItem("completed", state.getCompleted());
e.writeMapItem("maxHeap", state.getMaxHeap());
e.writeMapItem("maxHeapFormatted", state.getMaxHeapFormatted());
- e.writeMapItem("crtHeap", state.getCurrentHeap());
+ e.writeMapItem("crtHeap", state.getUsedHeap());
e.writeMapItem("crtHeapFormatted", state.getCurrentHeapFormatted());
e.writeMapItem("timeLeftFormatted", state.getEstimatedTimeLeftFormatted());
e.writeMapItem("elapsedTimeFormatetd", state.getElapsedTimeFormatted());
Added: trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/SwiftLogInfo.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/SwiftLogInfo.java (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/SwiftLogInfo.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,139 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 1, 2014
+ */
+package org.griphyn.vdl.karajan.monitor.monitors.http;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.Priority;
+import org.apache.log4j.spi.LoggingEvent;
+import org.globus.cog.util.ArgumentParser;
+import org.globus.cog.util.ArgumentParserException;
+import org.griphyn.vdl.karajan.monitor.MonitorAppender;
+
+public class SwiftLogInfo {
+ private static SimpleDateFormat SDF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSSZ");
+
+ public static final int FOLLOW_SLEEP_TIME = 50;
+
+ private String logFileName;
+ private boolean follow;
+ private double rate;
+
+ /**
+  * @param logFileName path of the log file to parse
+  * @param follow      whether to keep waiting for new lines at end of file
+  * @param rate        replay speed factor; 0 disables timed replay
+  */
+ public SwiftLogInfo(String logFileName, boolean follow, double rate) {
+ this.logFileName = logFileName;
+ this.follow = follow;
+ this.rate = rate;
+ }
+
+ /**
+  * Reads the log file line by line and feeds each parseable line to a
+  * MonitorAppender as a logging event.
+  *
+  * A line is used when it splits into at least five whitespace-separated
+  * fields and its first two fields parse as a timestamp; other lines are
+  * skipped. When rate is non-zero, events are delayed so that the time
+  * between them is the logged interval divided by rate.
+  *
+  * In follow mode the method does not return at end of file; it sleeps
+  * for FOLLOW_SLEEP_TIME milliseconds and tries to read again.
+  *
+  * NOTE(review): in follow mode a line that is still being written may be
+  * returned by readLine() in incomplete form.
+  *
+  * @throws Exception on I/O errors or if the thread is interrupted
+  */
+ public void run() throws Exception {
+     MonitorAppender ap = new MonitorAppender("bla", "http");
+     BufferedReader br = new BufferedReader(new FileReader(logFileName));
+     try {
+         long firstLogTime = -1;
+         long firstActualTime = System.currentTimeMillis();
+
+         if (follow) {
+             System.out.print("Following " + logFileName + ". Hit CTRL+C to end.");
+         }
+         else {
+             System.out.print("Parsing " + logFileName + "...");
+         }
+         while (true) {
+             // always attempt a read; the follow flag only decides what
+             // happens at end of file
+             String line = br.readLine();
+             if (line == null) {
+                 if (!follow) {
+                     break;
+                 }
+                 Thread.sleep(FOLLOW_SLEEP_TIME);
+                 continue;
+             }
+             String[] els = line.split("\\s+", 5);
+             if (els.length < 5) {
+                 continue;
+             }
+
+             long time;
+             try {
+                 time = SDF.parse(els[0] + " " + els[1]).getTime();
+             }
+             catch (ParseException e) {
+                 // not a timestamped log line
+                 continue;
+             }
+
+             if (rate != 0) {
+                 if (firstLogTime == -1) {
+                     firstLogTime = time;
+                 }
+                 long now = System.currentTimeMillis();
+                 // this event is supposed to happen at this relative time
+                 long deadline = (long) ((time - firstLogTime) / rate);
+                 long delay = deadline - (now - firstActualTime);
+                 System.out.println("deadline: " + deadline + ", now: " + (now - firstActualTime));
+                 if (delay >= 0) {
+                     Thread.sleep(delay);
+                 }
+             }
+
+             LoggingEvent le = new LoggingEvent(els[3], Logger.getLogger(els[3]), time, getLevel(els[2]), els[4], null);
+             ap.doAppend(le);
+         }
+         System.out.println("done");
+     }
+     finally {
+         br.close();
+     }
+ }
+
+ /**
+  * Command line entry point. Accepts the flags "f" (follow) and "h"
+  * (usage), the optional "rt" option and the log file as the default
+  * argument, then runs the parser. Argument errors print the message and
+  * the usage text; any other exception prints its stack trace.
+  */
+ public static void main(String[] args) {
+ ArgumentParser ap = new ArgumentParser();
+ ap.addFlag("f", "Follow: keep parsing the log file as it grows.");
+ // NOTE(review): "rt" is labelled "integer" here but read with getFloatValue below
+ ap.addOption("rt", "integer", "Real time: if specified, " +
+ "generate log events progressively at a rate " +
+ "proportional to that at which they were generated.",
+ ArgumentParser.OPTIONAL);
+ ap.addOption(ArgumentParser.DEFAULT, "logFile", "The log file to parse",
+ ArgumentParser.NORMAL);
+ ap.addFlag("h", "Display usage information");
+
+ try {
+ ap.parse(args);
+ if (ap.isPresent("h")) {
+ ap.usage();
+ System.exit(0);
+ }
+ SwiftLogInfo sli = new SwiftLogInfo(ap.getStringValue(ArgumentParser.DEFAULT),
+ ap.isPresent("f"), ap.getFloatValue("rt", 0));
+ sli.run();
+ }
+ catch (ArgumentParserException e) {
+ System.err.println(e.getMessage());
+ ap.usage();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private static Priority getLevel(String s) {
+ if ("WARN".equals(s)) {
+ return Level.WARN;
+ }
+ else if ("ERROR".equals(s)) {
+ return Level.ERROR;
+ }
+ else if ("INFO".equals(s)) {
+ return Level.INFO;
+ }
+ else if ("DEBUG".equals(s)) {
+ return Level.DEBUG;
+ }
+ else {
+ return Level.ALL;
+ }
+ }
+}
Added: trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/WorkerInfoBuilder.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/WorkerInfoBuilder.java (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/WorkerInfoBuilder.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,116 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jun 30, 2014
+ */
+package org.griphyn.vdl.karajan.monitor.monitors.http;
+
+import java.util.List;
+import java.util.Map;
+
+import org.griphyn.vdl.karajan.monitor.monitors.http.BrowserDataBuilder.WorkerData;
+import org.griphyn.vdl.karajan.monitor.processors.coasters.CoasterStatusItem.Block;
+import org.griphyn.vdl.karajan.monitor.processors.coasters.CoasterStatusItem.Worker;
+import org.griphyn.vdl.karajan.monitor.processors.coasters.WorkerProbeItem;
+
+ /**
+  * Writes the detail record of a single worker to a JSONEncoder: identity and
+  * block information, application counters, and the recorded CPU, disk usage
+  * and I/O load samples.
+  */
+ public class WorkerInfoBuilder {
+
+     private final BrowserDataBuilder db;
+     private final String wid;
+
+     /**
+      * @param db  source of worker data and blocks
+      * @param wid worker id; getData() splits it at the first ':' into a block id
+      *            and a worker id within that block
+      */
+     public WorkerInfoBuilder(BrowserDataBuilder db, String wid) {
+         this.db = db;
+         this.wid = wid;
+     }
+
+     /**
+      * Emits one JSON map with the worker summary followed by the "cpuLoad",
+      * "diskUsage" and "ioLoad" arrays.
+      */
+     public void getData(JSONEncoder e) {
+         e.beginMap();
+         Map<String, WorkerData> allWorkers = db.getWorkerData();
+         WorkerData stats = allWorkers.get(wid);
+
+         int colon = wid.indexOf(':');
+         String blockId = wid.substring(0, colon);
+         String localId = wid.substring(colon + 1);
+
+         Block block = db.getBlock(blockId);
+         Worker worker = block.getWorker(localId);
+
+         writeSummary(e, block, worker, stats);
+         writeCpuLoad(e, stats);
+         writeDiskUsage(e, stats);
+         writeIoLoad(e, stats);
+
+         e.endMap();
+     }
+
+     /** Writes the scalar entries: id, node, cores, block timing and app counters. */
+     private void writeSummary(JSONEncoder e, Block block, Worker worker, WorkerData stats) {
+         e.writeMapItem("id", wid);
+         e.writeMapItem("node", worker.node);
+         e.writeMapItem("cores", worker.cores);
+         e.writeMapItem("startTime", block.startTime);
+         e.writeMapItem("walltime", block.walltime);
+         e.writeMapItem("activeApps", stats.activeApps);
+         e.writeMapItem("failedApps", stats.failedApps);
+         e.writeMapItem("completedApps", stats.completedApps);
+     }
+
+     /** Writes the "cpuLoad" array, one {t, load} map per sample. */
+     private void writeCpuLoad(JSONEncoder e, WorkerData stats) {
+         e.writeMapKey("cpuLoad");
+         e.beginArray();
+         for (WorkerProbeItem.Cpu sample : stats.cpuLoad) {
+             e.beginArrayItem();
+             e.beginMap();
+             e.writeMapItem("t", sample.getTime());
+             e.writeMapItem("load", sample.getLoad());
+             e.endMap();
+         }
+         e.endArray();
+     }
+
+     /** Writes the "diskUsage" array, one entry per mount with its {t, avail, used} samples. */
+     private void writeDiskUsage(JSONEncoder e, WorkerData stats) {
+         e.writeMapKey("diskUsage");
+         e.beginArray();
+         int position = 0;
+         for (Map.Entry<String, List<WorkerProbeItem.DiskUsage>> mount : stats.diskUsage.entrySet()) {
+             e.beginArrayItem();
+             e.beginMap();
+             e.writeMapItem("index", position++);
+             e.writeMapItem("mount", mount.getKey());
+             e.writeMapKey("data");
+             e.beginArray();
+             for (WorkerProbeItem.DiskUsage sample : mount.getValue()) {
+                 e.beginArrayItem();
+                 e.beginMap();
+                 e.writeMapItem("t", sample.getTime());
+                 e.writeMapItem("avail", sample.getAvailable());
+                 e.writeMapItem("used", sample.getUsed());
+                 e.endMap();
+             }
+             e.endArray();
+             e.endMap();
+         }
+         e.endArray();
+     }
+
+     /** Writes the "ioLoad" array, one entry per device with its {t, rt, wt, load} samples. */
+     private void writeIoLoad(JSONEncoder e, WorkerData stats) {
+         e.writeMapKey("ioLoad");
+         e.beginArray();
+         int position = 0;
+         for (Map.Entry<String, List<WorkerProbeItem.IOLoad>> device : stats.ioLoad.entrySet()) {
+             e.beginArrayItem();
+             e.beginMap();
+             e.writeMapItem("index", position++);
+             e.writeMapItem("device", device.getKey());
+             e.writeMapKey("data");
+             e.beginArray();
+             for (WorkerProbeItem.IOLoad sample : device.getValue()) {
+                 e.beginArrayItem();
+                 e.beginMap();
+                 e.writeMapItem("t", sample.getTime());
+                 e.writeMapItem("rt", sample.getReadThroughput());
+                 e.writeMapItem("wt", sample.getWriteThroughput());
+                 e.writeMapItem("load", sample.getLoad());
+                 e.endMap();
+             }
+             e.endArray();
+             e.endMap();
+         }
+         e.endArray();
+     }
+ }
Added: trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/WorkerListBuilder.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/WorkerListBuilder.java (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/monitors/http/WorkerListBuilder.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,106 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jun 30, 2014
+ */
+package org.griphyn.vdl.karajan.monitor.monitors.http;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.griphyn.vdl.karajan.monitor.monitors.http.BrowserDataBuilder.WorkerData;
+import org.griphyn.vdl.karajan.monitor.processors.coasters.CoasterStatusItem.Block;
+import org.griphyn.vdl.karajan.monitor.processors.coasters.CoasterStatusItem.Worker;
+import org.griphyn.vdl.karajan.monitor.processors.coasters.WorkerProbeItem;
+
+ /**
+  * Writes one page of the worker list to a JSONEncoder, optionally restricted
+  * to the workers of a single site.
+  */
+ public class WorkerListBuilder {
+
+     private BrowserDataBuilder db;
+     private int page;
+     private int pageSize;
+     private String site;
+
+     /**
+      * @param db       source of worker data and blocks
+      * @param site     site to filter on, or null to list workers of all sites
+      * @param page     1-based page number (the first row is (page - 1) * pageSize)
+      * @param pageSize number of workers per page
+      */
+     public WorkerListBuilder(BrowserDataBuilder db, String site, int page, int pageSize) {
+         this.db = db;
+         this.site = site;
+         this.page = page;
+         this.pageSize = pageSize;
+     }
+
+     /**
+      * Emits a JSON map holding the paging data and a "data" array with one
+      * entry per worker on the requested page: id, node, cores, block timing,
+      * application counters and the CPU load samples.
+      */
+     public void getData(JSONEncoder e) {
+         Map<String, WorkerData> wd = db.getWorkerData();
+         List<String> filtered = new ArrayList<String>();
+         for (String wid : db.getWorkersByTime()) {
+             WorkerData wdd = wd.get(wid);
+             if (site == null || site.equals(wdd.site)) {
+                 filtered.add(wid);
+             }
+         }
+
+         int start = (page - 1) * pageSize;
+         // Exclusive upper bound. The previous "i > start + pageSize" check let
+         // pageSize + 1 rows through, so the last row of each page was repeated
+         // as the first row of the next one.
+         int end = Math.min(filtered.size(), start + pageSize);
+
+         e.beginMap();
+         db.writePagingData(e, filtered.size(), page, pageSize);
+         e.writeMapKey("data");
+         e.beginArray();
+         for (int i = Math.max(start, 0); i < end; i++) {
+             String wid = filtered.get(i);
+             WorkerData wdd = wd.get(wid);
+
+             e.beginArrayItem();
+             e.beginMap();
+
+             e.writeMapItem("id", wid);
+             // worker ids are split at the first ':' into block id and worker id
+             int index = wid.indexOf(':');
+             String blkId = wid.substring(0, index);
+             String wId = wid.substring(index + 1);
+
+             Block blk = db.getBlock(blkId);
+             Worker w = blk.getWorker(wId);
+
+             e.writeMapItem("node", w.node);
+             e.writeMapItem("cores", w.cores);
+             e.writeMapItem("startTime", blk.startTime);
+             e.writeMapItem("walltime", blk.walltime);
+             e.writeMapItem("activeApps", wdd.activeApps);
+             e.writeMapItem("failedApps", wdd.failedApps);
+             e.writeMapItem("completedApps", wdd.completedApps);
+
+             e.writeMapKey("cpuLoad");
+             e.beginArray();
+             for (WorkerProbeItem.Cpu cpu : wdd.cpuLoad) {
+                 e.beginArrayItem();
+                 e.beginMap();
+                 e.writeMapItem("t", cpu.getTime());
+                 e.writeMapItem("l", cpu.getLoad());
+                 e.endMap();
+             }
+             e.endArray();
+             e.endMap();
+         }
+         e.endArray();
+         e.endMap();
+     }
+
+     /**
+      * Returns the last element of a list, or null if the list is null or empty.
+      * Not currently called from within this class.
+      */
+     private <T> T getLast(List<T> l) {
+         if (l == null || l.isEmpty()) {
+             return null;
+         }
+         return l.get(l.size() - 1);
+     }
+
+ }
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/AbstractMessageProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/AbstractMessageProcessor.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/AbstractMessageProcessor.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -17,7 +17,8 @@
@Override
public String getSupportedSourceName() {
- return getSupportedSource().getName();
+ // Keep only what follows the last '.' of the class name, i.e. drop the
+ // package prefix. A name without any '.' is returned unchanged.
+ String name = getSupportedSource().getName();
+ return name.substring(name.lastIndexOf('.') + 1);
}
@Override
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/SimpleParser.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/SimpleParser.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/SimpleParser.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -49,6 +49,25 @@
crt = index + tok.length();
}
+ /**
+  * Marks the end of the current token at the character m that balances the
+  * current nesting level. Scanning starts at the current position with a
+  * nesting depth of 1; every occurrence of pair increases the depth and every
+  * occurrence of m decreases it. The token end is the index at which the depth
+  * reaches 0.
+  *
+  * @param m    closing character to look for
+  * @param pair opening character that nests with m
+  * @throws ParsingException if the end of the string is reached before the
+  *                          depth returns to 0
+  */
+ public void markMatchedTo(char m, char pair) throws ParsingException {
+     int depth = 1;
+     int pos = crt;
+     while (pos < str.length()) {
+         char ch = str.charAt(pos);
+         if (ch == m && --depth == 0) {
+             tokEnd = pos;
+             return;
+         }
+         if (ch == pair) {
+             depth++;
+         }
+         pos++;
+     }
+     throw new ParsingException("Could not find \"" + m + "\" in \"" + remaining()
+         + "\". String is \"" + str + "\".");
+ }
+
public void skipTo(String tok) throws ParsingException {
int index = str.indexOf(tok, crt);
if (index == -1) {
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/BlockRequestedProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/BlockRequestedProcessor.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/BlockRequestedProcessor.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -41,9 +41,11 @@
p.beginToken();
p.markTo(",");
int coresPerWorker = Integer.parseInt(p.getToken());
+ p.skip("walltime=");
+ int walltime = Integer.parseInt(p.remaining());
CoasterStatusItem item = (CoasterStatusItem) state.getItemByID(CoasterStatusItem.ID, StatefulItemClass.MISC);
- item.newBlock(blockId, cores, coresPerWorker);
+ item.newBlock(blockId, cores, coresPerWorker, walltime);
}
catch (Exception e) {
e.printStackTrace();
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/CoasterStatusItem.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/CoasterStatusItem.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/CoasterStatusItem.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -33,8 +33,8 @@
return StatefulItemClass.MISC;
}
- public synchronized void newBlock(String id, int cores, int coresPerWorker) {
- Block b = new Block(id, cores, coresPerWorker);
+ public synchronized void newBlock(String id, int cores, int coresPerWorker, int walltime) {
+ Block b = new Block(id, cores, coresPerWorker, walltime);
blocks.put(id, b);
queuedBlocks++;
requestedCores += cores;
@@ -69,25 +69,31 @@
doneBlocks++;
}
- public synchronized void workerActive(String blockId) {
+ /**
+  * Records that a worker became active: moves its cores from the requested
+  * to the active total of this item and registers the worker with its block.
+  * Block.addWorker() updates the block's own active core count and start time.
+  */
+ public synchronized void workerActive(String blockId, String workerId, String node, int cores, long now) {
Block b = getBlock(blockId);
- b.activeCores += b.coresPerWorker;
- activeCores += b.coresPerWorker;
- requestedCores -= b.coresPerWorker;
+ activeCores += cores;
+ requestedCores -= cores;
+ b.addWorker(workerId, node, cores, now);
}
- public synchronized void workerLost(String blockId) {
+ /**
+  * Records the loss of a worker: its cores move from the active to the failed
+  * total, the block's active core count is reduced and the worker is marked
+  * FAILED.
+  */
+ public synchronized void workerLost(String blockId, String workerId) {
Block b = getBlock(blockId);
- b.activeCores -= b.coresPerWorker;
- activeCores -= b.coresPerWorker;
- failedCores += b.coresPerWorker;
+ // NOTE(review): no null check here; an id that was never passed to
+ // workerActive() for this block makes the lookup or w.cores throw.
+ Worker w = b.getWorker(workerId);
+ int cores = w.cores;
+ activeCores -= cores;
+ failedCores += cores;
+ b.activeCores -= cores;
+ w.state = WorkerState.FAILED;
}
- public synchronized void workerShutDown(String blockId) {
+ /**
+  * Records the regular shutdown of a worker: its cores move from the active
+  * to the done total, the block's active core count is reduced and the worker
+  * is marked DONE. (Lost workers are accounted for by workerLost().)
+  */
+ public synchronized void workerShutDown(String blockId, String workerId) {
Block b = getBlock(blockId);
- b.activeCores -= b.coresPerWorker;
- activeCores -= b.coresPerWorker;
- doneCores += b.coresPerWorker;
+ Worker w = b.getWorker(workerId);
+ int cores = w.cores;
+ activeCores -= cores;
+ // a shut-down worker counts as done, not failed, as before this change
+ doneCores += cores;
+ b.activeCores -= cores;
+ w.state = WorkerState.DONE;
}
private Block getBlock(String blockId) {
@@ -148,17 +154,53 @@
QUEUED, ACTIVE, FAILED, DONE
}
+ /** State of a single worker, stored in Worker.state. */
+ public enum WorkerState {
+ ACTIVE, FAILED, DONE
+ }
+
+ /** A worker of a block: the node it runs on, its core count and its state. */
+ public static class Worker {
+ public String node;
+ public int cores;
+ // not set by the constructor; Block.addWorker() sets it to ACTIVE
+ public WorkerState state;
+
+ public Worker(String node, int cores) {
+ this.node = node;
+ this.cores = cores;
+ }
+ }
+
public static class Block {
public BlockState state;
public final String id;
public final int cores, coresPerWorker;
public int activeCores;
+ // walltime of the block, as passed to the constructor
+ public int walltime;
+ // set from addWorker()'s "now" argument while still 0
+ public long startTime;
+ // worker id -> worker; created lazily by addWorker(), so null until then
+ public Map<String, Worker> workers;
- public Block(String id, int cores, int coresPerWorker) {
+ public Block(String id, int cores, int coresPerWorker, int walltime) {
this.id = id;
this.cores = cores;
this.coresPerWorker = coresPerWorker;
this.state = BlockState.QUEUED;
+ this.walltime = walltime;
}
+
+ /**
+  * Looks up a worker of this block.
+  *
+  * @return the worker registered under workerId, or null if there is none,
+  *         including when no worker has been added to this block yet
+  */
+ public Worker getWorker(String workerId) {
+ if (workers == null) {
+ // the map only exists after the first addWorker() call
+ return null;
+ }
+ return workers.get(workerId);
+ }
+
+ /**
+  * Registers a new ACTIVE worker under workerId, adds its cores to this
+  * block's active core count and, if the block has no start time yet,
+  * records now as the start time.
+  */
+ public void addWorker(String workerId, String node, int cores, long now) {
+ if (workers == null) {
+ workers = new HashMap<String, Worker>();
+ }
+ Worker w = new Worker(node, cores);
+ w.state = WorkerState.ACTIVE;
+ workers.put(workerId, w);
+ activeCores += cores;
+ if (startTime == 0) {
+ startTime = now;
+ }
+ }
}
}
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/RemoteLogProcessorDispatcher.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/RemoteLogProcessorDispatcher.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/RemoteLogProcessorDispatcher.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -20,7 +20,7 @@
@Override
public String getSupportedSourceName() {
- return RemoteLogHandler.class.getName();
+ // simple (unqualified) class name, without the package prefix
+ return RemoteLogHandler.class.getSimpleName();
}
@Override
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerActiveProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerActiveProcessor.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerActiveProcessor.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -31,9 +31,15 @@
try {
p.skip("blockid=");
String blockId = p.word();
+ p.skip("id=");
+ String workerId = p.word();
+ p.skip("node=");
+ String node = p.word();
+ p.skip("cores=");
+ int cores = Integer.parseInt(p.word());
CoasterStatusItem item = (CoasterStatusItem) state.getItemByID(CoasterStatusItem.ID, StatefulItemClass.MISC);
- item.workerActive(blockId);
+ item.workerActive(blockId, workerId, node, cores, state.getCurrentTime());
}
catch (Exception e) {
e.printStackTrace();
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerLostProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerLostProcessor.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerLostProcessor.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -31,9 +31,11 @@
try {
p.skip("blockid=");
String blockId = p.word();
+ p.skip("id=");
+ String workerId = p.word();
CoasterStatusItem item = (CoasterStatusItem) state.getItemByID(CoasterStatusItem.ID, StatefulItemClass.MISC);
- item.workerLost(blockId);
+ item.workerLost(blockId, workerId);
}
catch (Exception e) {
e.printStackTrace();
Added: trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerProbeItem.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerProbeItem.java (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerProbeItem.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,151 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Aug 7, 2013
+ */
+package org.griphyn.vdl.karajan.monitor.processors.coasters;
+
+import java.util.Collection;
+
+import org.griphyn.vdl.karajan.monitor.items.StatefulItem;
+import org.griphyn.vdl.karajan.monitor.items.StatefulItemClass;
+
+ /**
+  * A single probe sample reported for a worker. The item's ID is the worker id
+  * and its payload is one of the Data subclasses below (Cpu, DiskUsage, IOLoad).
+  * The item has no parent and no children, and listeners are ignored.
+  */
+ public class WorkerProbeItem implements StatefulItem {
+     private String workerid;
+     private final Data data;
+
+     public WorkerProbeItem(String workerid, Data data) {
+         this.workerid = workerid;
+         this.data = data;
+     }
+
+     /** Returns the probe sample carried by this item. */
+     public Data getData() {
+         return this.data;
+     }
+
+     /** Returns the worker id. */
+     @Override
+     public String getID() {
+         return this.workerid;
+     }
+
+     @Override
+     public StatefulItemClass getItemClass() {
+         return StatefulItemClass.MISC;
+     }
+
+     /** Always null: probe items have no parent. */
+     @Override
+     public StatefulItem getParent() {
+         return null;
+     }
+
+     /** No-op. */
+     @Override
+     public void setParent(StatefulItem parent) {
+     }
+
+     /** No-op. */
+     @Override
+     public void addChild(StatefulItem child) {
+     }
+
+     /** No-op. */
+     @Override
+     public void removeChild(StatefulItem child) {
+     }
+
+     /** Always null: probe items keep no children. */
+     @Override
+     public Collection<StatefulItem> getChildren() {
+         return null;
+     }
+
+     /** No-op: listeners are not recorded. */
+     @Override
+     public void addListener(Listener l) {
+     }
+
+     /** Base class of all probe samples; carries only the sample time. */
+     public static class Data {
+         private final long time;
+
+         public Data(long time) {
+             this.time = time;
+         }
+
+         public long getTime() {
+             return this.time;
+         }
+     }
+
+     /** CPU load sample. */
+     public static class Cpu extends Data {
+         private final double load;
+
+         public Cpu(long time, double load) {
+             super(time);
+             this.load = load;
+         }
+
+         public double getLoad() {
+             return this.load;
+         }
+     }
+
+     /** Disk usage sample for one mount point. */
+     public static class DiskUsage extends Data {
+         private final String mountPoint;
+         private final String fs;
+         private final long used;
+         private final long available;
+
+         public DiskUsage(long time, String mountPoint, String fs, long used, long available) {
+             super(time);
+             this.mountPoint = mountPoint;
+             this.fs = fs;
+             this.used = used;
+             this.available = available;
+         }
+
+         public String getMountPoint() {
+             return this.mountPoint;
+         }
+
+         public String getFs() {
+             return this.fs;
+         }
+
+         public long getUsed() {
+             return this.used;
+         }
+
+         public long getAvailable() {
+             return this.available;
+         }
+     }
+
+     /** I/O load sample for one device. */
+     public static class IOLoad extends Data {
+         private final String device;
+         private final int writeThroughput;
+         private final int readThroughput;
+         private final double load;
+
+         public IOLoad(long time, String device, int writeThroughput, int readThroughput, double load) {
+             super(time);
+             this.device = device;
+             this.writeThroughput = writeThroughput;
+             this.readThroughput = readThroughput;
+             this.load = load;
+         }
+
+         public String getDevice() {
+             return this.device;
+         }
+
+         public long getWriteThroughput() {
+             return this.writeThroughput;
+         }
+
+         public long getReadThroughput() {
+             return this.readThroughput;
+         }
+
+         public double getLoad() {
+             return this.load;
+         }
+     }
+ }
Added: trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerProbeProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerProbeProcessor.java (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerProbeProcessor.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,76 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Aug 7, 2013
+ */
+package org.griphyn.vdl.karajan.monitor.processors.coasters;
+
+import org.griphyn.vdl.karajan.monitor.SystemState;
+import org.griphyn.vdl.karajan.monitor.items.StatefulItem;
+import org.griphyn.vdl.karajan.monitor.processors.SimpleParser;
+
+ /**
+  * Parses "PROBE" messages and publishes each sample as a WorkerProbeItem via
+  * SystemState.itemUpdated(). Three probe types are understood: "CPU", "DF"
+  * (disk usage) and "DL" (I/O load); other types are ignored.
+  */
+ public class WorkerProbeProcessor extends AbstractRemoteLogProcessor {
+
+     @Override
+     public void initialize(SystemState state) {
+         super.initialize(state);
+     }
+
+     @Override
+     public String getMessageHeader() {
+         return "PROBE";
+     }
+
+     /**
+      * Reads the common "type=", "workerid=" and "time=" fields, then the
+      * type-specific fields, and reports the resulting item. Any exception
+      * raised while parsing is printed and the message is dropped.
+      */
+     @Override
+     public void processMessage(SystemState state, SimpleParser p, Object details) {
+         try {
+             p.skip("type=");
+             String type = p.word();
+             p.skip("workerid=");
+             String workerid = p.word();
+             p.skip("time=");
+             // fractional value scaled by 1000; presumably seconds -> ms, TODO confirm
+             long time = (long) (1000 * Double.parseDouble(p.word()));
+
+             StatefulItem item = null;
+
+             if (type.equals("CPU")) {
+                 p.skip("load=");
+                 double load = Double.parseDouble(p.word());
+                 item = new WorkerProbeItem(workerid, new WorkerProbeItem.Cpu(time, load));
+             }
+             else if (type.equals("DF")) {
+                 p.skip("mount=");
+                 String mountPoint = p.word();
+                 p.skip("fs=");
+                 String fs = p.word();
+                 // values are scaled by 1024; presumably reported in KB, TODO confirm
+                 p.skip("used=");
+                 long usedBytes = 1024 * Long.parseLong(p.word());
+                 p.skip("avail=");
+                 long availBytes = 1024 * Long.parseLong(p.word());
+                 item = new WorkerProbeItem(workerid,
+                     new WorkerProbeItem.DiskUsage(time, mountPoint, fs, usedBytes, availBytes));
+             }
+             else if (type.equals("DL")) {
+                 p.skip("dev=");
+                 String device = p.word();
+                 // throughputs are parsed as doubles and truncated to int
+                 p.skip("wtr=");
+                 int wtr = (int) Double.parseDouble(p.word());
+                 p.skip("rtr=");
+                 int rtr = (int) Double.parseDouble(p.word());
+                 p.skip("load=");
+                 double load = Double.parseDouble(p.word());
+                 item = new WorkerProbeItem(workerid,
+                     new WorkerProbeItem.IOLoad(time, device, wtr, rtr, load));
+             }
+             if (item != null) {
+                 state.itemUpdated(item);
+             }
+         }
+         catch (Exception e) {
+             e.printStackTrace();
+         }
+     }
+ }
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerShutDownProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerShutDownProcessor.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/coasters/WorkerShutDownProcessor.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -31,9 +31,11 @@
try {
p.skip("blockid=");
String blockId = p.word();
+ p.skip("id=");
+ String workerId = p.word();
CoasterStatusItem item = (CoasterStatusItem) state.getItemByID(CoasterStatusItem.ID, StatefulItemClass.MISC);
- item.workerShutDown(blockId);
+ item.workerShutDown(blockId, workerId);
}
catch (Exception e) {
e.printStackTrace();
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/karajan/TaskProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/karajan/TaskProcessor.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/karajan/TaskProcessor.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -21,6 +21,9 @@
package org.griphyn.vdl.karajan.monitor.processors.karajan;
import org.apache.log4j.Level;
+import org.globus.cog.abstraction.impl.common.task.JobSpecificationImpl;
+import org.globus.cog.abstraction.impl.common.task.TaskImpl;
+import org.globus.cog.abstraction.interfaces.JobSpecification;
import org.globus.cog.abstraction.interfaces.Status;
import org.globus.cog.abstraction.interfaces.Task;
import org.griphyn.vdl.karajan.monitor.SystemState;
@@ -100,6 +103,42 @@
}
}
}
+ else if (p.matchAndSkip("JOB_TASK ")) {
+ p.skip("jobid=");
+ String jobid = p.word();
+ p.skip("taskid=");
+ id = p.word();
+ p.skip("exec=");
+ String exec = p.word();
+ p.skip("dir=");
+ String dir = p.word();
+ p.skip("args=");
+ String args = p.remaining();
+
+ ti = new TaskItem(id, Task.JOB_SUBMISSION);
+ Task t = new TaskImpl();
+ t.setType(Task.JOB_SUBMISSION);
+ JobSpecification spec = new JobSpecificationImpl();
+ spec.setExecutable(exec);
+ spec.setArguments(args);
+ spec.setDirectory(dir);
+ t.setSpecification(spec);
+ ti.setTask(t);
+ updateParent(state, id, ti);
+ state.addItem(ti);
+ }
+ else if (p.matchAndSkip("TASK_STATUS_CHANGE ")) {
+ p.skip("taskid=");
+ id = p.word();
+ p.skip("status=");
+ taskState = Integer.parseInt(p.word());
+ ti = (TaskItem) state.getItemByID(id, StatefulItemClass.TASK);
+ if (p.matchAndSkip("workerid=")) {
+ ti.setWorkerId(p.word());
+ }
+ ti.setStatus(taskState);
+ state.itemUpdated(ti);
+ }
else if (p.matchAndSkip("Task(")) {
p.skip("type=");
p.beginToken();
@@ -117,6 +156,7 @@
}
else {
ti = new TaskItem(id, taskType);
+ updateParent(state, id, ti);
state.addItem(ti);
}
}
@@ -125,6 +165,23 @@
e.printStackTrace();
}
}
+ if (ti != null && ti.getParent() != null && taskState != 0) {
+ ApplicationItem app = (ApplicationItem) ti.getParent();
+ switch (taskState) {
+ case Status.SUBMITTING:
+ case Status.SUBMITTED:
+ case Status.ACTIVE:
+ case Status.STAGE_IN:
+ case Status.STAGE_OUT:
+ app.setState(getAppStateFromTaskState(taskState), state.getCurrentTime());
+ app.setWorkerId(ti.getWorkerId());
+ state.itemUpdated(app);
+ break;
+ }
+ }
+ }
+
+ private void updateParent(SystemState state, String id, TaskItem ti) {
if (ti != null && id != null && ti.getParent() == null) {
int bi = id.indexOf(':');
int li = id.lastIndexOf('-');
@@ -138,20 +195,22 @@
bridge.addChild(ti);
}
}
- if (ti != null && ti.getParent() != null && taskState != 0) {
- ApplicationItem app = (ApplicationItem) ti.getParent();
- if (taskState == Status.SUBMITTING) {
- app.setState(ApplicationState.SUBMITTING, state.getCurrentTime());
- state.itemUpdated(app);
- }
- else if (taskState == Status.SUBMITTED) {
- app.setState(ApplicationState.SUBMITTED, state.getCurrentTime());
- state.itemUpdated(app);
- }
- else if (taskState == Status.ACTIVE) {
- app.setState(ApplicationState.ACTIVE, state.getCurrentTime());
- state.itemUpdated(app);
- }
+ }
+
+ /**
+  * Maps a task status code to the application state of the same name for
+  * SUBMITTING, SUBMITTED, ACTIVE, STAGE_IN and STAGE_OUT. Any other status
+  * maps to ApplicationState.INITIALIZING.
+  */
+ private ApplicationState getAppStateFromTaskState(int taskState) {
+ switch (taskState) {
+ case Status.SUBMITTING:
+ return ApplicationState.SUBMITTING;
+ case Status.SUBMITTED:
+ return ApplicationState.SUBMITTED;
+ case Status.ACTIVE:
+ return ApplicationState.ACTIVE;
+ case Status.STAGE_IN:
+ return ApplicationState.STAGE_IN;
+ case Status.STAGE_OUT:
+ return ApplicationState.STAGE_OUT;
+ default:
+ return ApplicationState.INITIALIZING;
}
}
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/AppEndProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/AppEndProcessor.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/AppEndProcessor.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -22,6 +22,8 @@
import org.apache.log4j.Level;
import org.griphyn.vdl.karajan.monitor.SystemState;
+import org.griphyn.vdl.karajan.monitor.items.ApplicationItem;
+import org.griphyn.vdl.karajan.monitor.items.ApplicationState;
import org.griphyn.vdl.karajan.monitor.items.StatefulItem;
import org.griphyn.vdl.karajan.monitor.items.StatefulItemClass;
import org.griphyn.vdl.karajan.monitor.processors.SimpleParser;
@@ -29,21 +31,23 @@
public class AppEndProcessor extends AbstractSwiftProcessor {
public Level getSupportedLevel() {
- return Level.DEBUG;
+ // NOTE(review): AppFailureProcessor, added in this same commit, declares
+ // DEBUG for END_FAILURE; confirm the two are meant to differ.
+ return Level.INFO;
}
@Override
public String getMessageHeader() {
- return "JOB_END";
+ // presumably the prefix of the log messages routed to this processor; TODO confirm
+ return "END_SUCCESS";
}
public void processMessage(SystemState state, SimpleParser p, Object details) {
try {
- p.skip("jobid=");
- String id = p.word();
+ p.skip("thread=");
+ String threadid = p.word();
- StatefulItem app = state.getItemByID(id,
- StatefulItemClass.APPLICATION);
+ StatefulItem thread = state.getItemByID(threadid, StatefulItemClass.BRIDGE);
+ ApplicationItem app = (ApplicationItem) thread.getParent();
+ app.setState(ApplicationState.FINISHED_SUCCESSFULLY, state.getCurrentTime());
+ state.itemUpdated(app);
state.removeItem(app);
state.getStats("apps").remove();
}
Added: trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/AppFailureProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/AppFailureProcessor.java (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/AppFailureProcessor.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2012 University of Chicago
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/*
+ * Created on Jan 29, 2007
+ */
+package org.griphyn.vdl.karajan.monitor.processors.swift;
+
+import org.apache.log4j.Level;
+import org.griphyn.vdl.karajan.monitor.SystemState;
+import org.griphyn.vdl.karajan.monitor.items.ApplicationItem;
+import org.griphyn.vdl.karajan.monitor.items.ApplicationState;
+import org.griphyn.vdl.karajan.monitor.items.StatefulItem;
+import org.griphyn.vdl.karajan.monitor.items.StatefulItemClass;
+import org.griphyn.vdl.karajan.monitor.processors.SimpleParser;
+
+ /**
+  * Handles "END_FAILURE" messages: marks the application that owns the reported
+  * thread as FAILED, publishes the update, then removes the application from
+  * the system state and from the "apps" statistics.
+  */
+ public class AppFailureProcessor extends AbstractSwiftProcessor {
+
+     public Level getSupportedLevel() {
+         return Level.DEBUG;
+     }
+
+     @Override
+     public String getMessageHeader() {
+         return "END_FAILURE";
+     }
+
+     /**
+      * Reads the "thread=" field, looks the thread up as a BRIDGE item and
+      * treats its parent as the failed application. Any exception raised on the
+      * way (for example when the thread is unknown) is printed and the message
+      * is dropped.
+      */
+     public void processMessage(SystemState state, SimpleParser p, Object details) {
+         try {
+             p.skip("thread=");
+             StatefulItem bridge = state.getItemByID(p.word(), StatefulItemClass.BRIDGE);
+             ApplicationItem failedApp = (ApplicationItem) bridge.getParent();
+
+             failedApp.setState(ApplicationState.FAILED, state.getCurrentTime());
+             state.itemUpdated(failedApp);
+             state.removeItem(failedApp);
+             state.getStats("apps").remove();
+         }
+         catch (Exception problem) {
+             problem.printStackTrace();
+         }
+     }
+ }
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/AppStartProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/AppStartProcessor.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/AppStartProcessor.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -48,7 +48,7 @@
String args = "";
if (p.matchAndSkip("arguments=[")) {
p.beginToken();
- p.markTo("]");
+ p.markMatchedTo(']', '[');
args = p.getToken();
}
p.skip("host=");
Added: trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/ConfigurationProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/ConfigurationProcessor.java (rev 0)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/ConfigurationProcessor.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2012 University of Chicago
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/*
+ * Created on Aug 28, 2008
+ */
+package org.griphyn.vdl.karajan.monitor.processors.swift;
+
+import org.apache.log4j.Level;
+import org.griphyn.vdl.karajan.Loader;
+import org.griphyn.vdl.karajan.monitor.SystemState;
+import org.griphyn.vdl.karajan.monitor.processors.AbstractMessageProcessor;
+import org.griphyn.vdl.karajan.monitor.processors.ParsingException;
+import org.griphyn.vdl.karajan.monitor.processors.SimpleParser;
+
+ /**
+  * Extracts selected settings from the "SWIFT_CONFIGURATION" log message and
+  * stores them in the system state. The options are read from the text between
+  * ": {" and the next "}", as a comma-separated list of key=value pairs. Only
+  * "execution.retries" and "replication.enabled" are used.
+  */
+ public class ConfigurationProcessor extends AbstractMessageProcessor {
+
+     public Level getSupportedLevel() {
+         return Level.DEBUG;
+     }
+
+     public Class<?> getSupportedSource() {
+         return Loader.class;
+     }
+
+     /**
+      * Parses the message if it starts with "SWIFT_CONFIGURATION"; other
+      * messages are ignored. Entries without a '=' are skipped, and a
+      * non-numeric "execution.retries" value is reported and skipped, so one
+      * malformed entry neither aborts processing nor propagates to the caller.
+      */
+     public void processMessage(SystemState state, Object message, Object details) {
+         String msg = String.valueOf(message);
+         SimpleParser p = new SimpleParser(msg);
+         try {
+             if (p.matchAndSkip("SWIFT_CONFIGURATION")) {
+                 p.skipTo(": {");
+                 p.beginToken();
+                 p.markTo("}");
+                 String options = p.getToken();
+                 String[] els = options.split(",\\s*");
+                 for (String el : els) {
+                     String[] kv = el.split("=", 2);
+                     if (kv.length < 2) {
+                         // no '=' in this entry (e.g. an empty option list)
+                         continue;
+                     }
+                     String key = kv[0].trim();
+                     String value = kv[1].trim();
+                     if (key.equals("execution.retries")) {
+                         try {
+                             state.setRetries(Integer.parseInt(value));
+                         }
+                         catch (NumberFormatException e) {
+                             e.printStackTrace();
+                         }
+                     }
+                     else if (key.equals("replication.enabled")) {
+                         state.setReplicationEnabled(Boolean.parseBoolean(value));
+                     }
+                 }
+             }
+         }
+         catch (ParsingException e) {
+             e.printStackTrace();
+         }
+     }
+ }
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/JobProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/JobProcessor.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/JobProcessor.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -20,10 +20,15 @@
*/
package org.griphyn.vdl.karajan.monitor.processors.swift;
+import org.apache.log4j.Level;
import org.griphyn.vdl.karajan.lib.Execute;
import org.griphyn.vdl.karajan.monitor.processors.karajan.TaskProcessor;
public class JobProcessor extends TaskProcessor {
+
+ // This processor declares INFO as its supported level.
+ public Level getSupportedLevel() {
+ return Level.INFO;
+ }
public Class<?> getSupportedSource() {
return Execute.class;
Modified: trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/SummaryProcessor.java
===================================================================
--- trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/SummaryProcessor.java 2014-07-04 06:52:32 UTC (rev 7964)
+++ trunk/src/org/griphyn/vdl/karajan/monitor/processors/swift/SummaryProcessor.java 2014-07-04 06:54:51 UTC (rev 7965)
@@ -41,8 +41,30 @@
public void processMessage(SystemState state, Object message, Object details) {
String msg = String.valueOf(message);
if(msg.contains("CrtHeap")) {
- return;
+ // messages mentioning "CrtHeap" carry JVM heap/thread figures
+ processJVMInfo(state, msg);
}
+ else {
+ // everything else is handled as a task summary line
+ processTaskInfo(state, msg);
+ }
+ }
+
+ /**
+  * Reads JVM figures from a message made of "key: value" pairs separated by a
+  * comma and one whitespace character. "HeapMax", "UsedHeap" and "JVMThreads"
+  * are stored in the system state; other keys are ignored. Entries without a
+  * value are skipped, and a non-numeric value is reported and skipped, so a
+  * malformed entry does not prevent the remaining ones from being applied.
+  */
+ private void processJVMInfo(SystemState state, String msg) {
+     String[] els = msg.split(",\\s");
+     for (String el : els) {
+         String[] kv = el.split(": ");
+         if (kv.length < 2) {
+             // no ": " separator in this entry
+             continue;
+         }
+         String key = kv[0].trim();
+         String value = kv[1].trim();
+         try {
+             if ("HeapMax".equals(key)) {
+                 state.setMaxHeap(Long.parseLong(value));
+             }
+             else if ("UsedHeap".equals(key)) {
+                 state.setUsedHeap(Long.parseLong(value));
+             }
+             else if ("JVMThreads".equals(key)) {
+                 state.setCurrentThreads(Integer.parseInt(value));
+             }
+         }
+         catch (NumberFormatException e) {
+             e.printStackTrace();
+         }
+     }
+ }
+
+ private void processTaskInfo(SystemState state, String msg) {
SummaryItem s = (SummaryItem) state.getItemByID(SummaryItem.ID, StatefulItemClass.WORKFLOW);
String[] pairs = msg.split(" ");
for (ApplicationState key : SummaryItem.STATES) {
More information about the Swift-commit
mailing list