CHUKWA-767. Implemented low pass filter for Charting REST API. (Eric Yang)
diff --git a/CHANGES.txt b/CHANGES.txt
index 8ea4d46..633a7c9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -18,6 +18,8 @@
IMPROVEMENTS
+ CHUKWA-767. Implemented low pass filter for Charting REST API. (Eric Yang)
+
CHUKWA-765. Minor stylesheets clean up. (Eric Yang)
CHUKWA-759. Configuration for Chukwa to monitor HBase. (Eric Yang)
diff --git a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
index cade66a..2bef452 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
@@ -63,11 +63,17 @@
import org.apache.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
+import org.mortbay.log.Log;
import com.google.gson.Gson;
public class ChukwaHBaseStore {
static Logger LOG = Logger.getLogger(ChukwaHBaseStore.class);
+ static int MINUTES_IN_HOUR = 60;
+ static double RESOLUTION = 360;
+ static int MINUTE = 60000; //60 milliseconds
+ static int SECOND = 1000;
+
static byte[] COLUMN_FAMILY = "t".getBytes();
static byte[] ANNOTATION_FAMILY = "a".getBytes();
static byte[] KEY_NAMES = "k".getBytes();
@@ -139,6 +145,7 @@
startTime = endTime;
endTime = temp;
}
+
getHBaseConnection();
Table table = connection.getTable(TableName.valueOf(CHUKWA));
Scan scan = new Scan();
@@ -150,10 +157,7 @@
long currentDay = startTime;
for (int i = startDay; i <= endDay; i++) {
byte[] rowKey = HBaseUtil.buildKey(currentDay, metric, source);
- // ColumnRangeFilter crf = new
- // ColumnRangeFilter(Long.valueOf(startTime).toString().getBytes(),
- // true, Long.valueOf(endTime).toString().getBytes(), true);
- // scan.setFilter(crf);
+
scan.addFamily(COLUMN_FAMILY);
scan.setStartRow(rowKey);
scan.setStopRow(rowKey);
@@ -162,8 +166,7 @@
ResultScanner results = table.getScanner(scan);
Iterator<Result> it = results.iterator();
- // TODO: Apply discrete wavelet transformation to limit the output
- // size to 1000 data points for graphing optimization. (i.e jwave)
+
while (it.hasNext()) {
Result result = it.next();
for (KeyValue kv : result.raw()) {
@@ -497,6 +500,23 @@
startTime = endTime;
endTime = temp;
}
+ // Figure out the time range and determine the best resolution
+ // to fetch the data
+ long range = Math.round((endTime - startTime)
+ / (MINUTES_IN_HOUR * MINUTE));
+ long sampleRate = 1;
+ if (range <= 1) {
+ sampleRate = 5;
+ } else if(range <= 24) {
+ sampleRate = 240;
+ } else if (range <= 720) {
+ sampleRate = 7200;
+ } else if(range >= 720) {
+ sampleRate = 87600;
+ }
+ double smoothing = (endTime - startTime)
+ / (sampleRate * SECOND ) / RESOLUTION;
+
getHBaseConnection();
Table table = connection.getTable(TableName.valueOf(CHUKWA));
Scan scan = new Scan();
@@ -522,8 +542,11 @@
ResultScanner results = table.getScanner(scan);
Iterator<Result> it = results.iterator();
- // TODO: Apply discrete wavelet transformation to limit the output
- // size to 1000 data points for graphing optimization. (i.e jwave)
+ double filteredValue = 0.0d;
+ long lastTime = startTime;
+ long totalElapsedTime = 0;
+ int initial = 0;
+
while (it.hasNext()) {
Result result = it.next();
for (KeyValue kv : result.raw()) {
@@ -531,11 +554,34 @@
long timestamp = ByteBuffer.wrap(key).getLong();
double value = Double.parseDouble(new String(kv.getValue(),
"UTF-8"));
- ArrayList<Number> points = new ArrayList<Number>();
- points.add(timestamp);
- points.add(value);
- data.add(points);
+ if(initial==0) {
+ filteredValue = value;
+ }
+ long elapsedTime = (timestamp - lastTime) / SECOND;
+ lastTime = timestamp;
+ // Determine if there is any gap, if there is gap in data, reset
+ // calculation.
+ if (elapsedTime > sampleRate) {
+ filteredValue = 0.0d;
+ } else {
+ if (smoothing != 0.0d) {
+ // Apply low pass filter to calculate
+ filteredValue = filteredValue + (double) ((double) elapsedTime * (double) ((double) (value - filteredValue) / smoothing));
+ } else {
+ // Use original value
+ filteredValue = value;
+ }
+ }
+ totalElapsedTime = totalElapsedTime + elapsedTime;
+ if (totalElapsedTime >= sampleRate) {
+ ArrayList<Number> points = new ArrayList<Number>();
+ points.add(timestamp);
+ points.add(filteredValue);
+ data.add(points);
+ totalElapsedTime = 0;
+ }
}
+ initial++;
}
results.close();
currentDay = currentDay + (i * MILLISECONDS_IN_DAY);
@@ -748,15 +794,15 @@
String[] metrics = { "SystemMetrics.LoadAverage.1" };
createChart("1", "", "System Load Average", metrics, hostname);
String[] cpuMetrics = { "SystemMetrics.cpu.combined", "SystemMetrics.cpu.sys", "SystemMetrics.cpu.user" };
- createChart("2", "%", "CPU Utilization", cpuMetrics, hostname);
+ createChart("2", "percent", "CPU Utilization", cpuMetrics, hostname);
String[] memMetrics = { "SystemMetrics.memory.FreePercent", "SystemMetrics.memory.UsedPercent"};
- createChart("3", "%", "Memory Utilization", memMetrics, hostname);
+ createChart("3", "percent", "Memory Utilization", memMetrics, hostname);
String[] diskMetrics = { "SystemMetrics.disk.ReadBytes", "SystemMetrics.disk.WriteBytes" };
createChart("4", "bytes-decimal", "Disk Utilization", diskMetrics, hostname);
String[] netMetrics = { "SystemMetrics.network.TxBytes", "SystemMetrics.network.RxBytes" };
createChart("5", "bytes", "Network Utilization", netMetrics, hostname);
String[] swapMetrics = { "SystemMetrics.swap.Total", "SystemMetrics.swap.Used", "SystemMetrics.swap.Free" };
- createChart("6", "", "Swap Utilization", swapMetrics, hostname);
+ createChart("6", "bytes-decimal", "Swap Utilization", swapMetrics, hostname);
// Populate default widgets
Widget widget = new Widget();