blob: a72e1bd956fc94d2af87f8bb2993eeeacccfea80 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Demux parser for system metrics data collected through
* org.apache.hadoop.chukwa.datacollection.adaptor.sigar.SystemMetrics.
*/
package org.apache.hadoop.chukwa.extraction.hbase;
import java.nio.charset.Charset;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Iterator;
import java.util.TimeZone;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
/**
 * Demux parser for system metrics JSON documents.
 *
 * <p>{@link #parse(byte[])} reads the {@code timestamp}, {@code cpu},
 * {@code uptime}, {@code loadavg}, {@code memory}, {@code swap},
 * {@code network} and {@code disk} members of one JSON object and emits the
 * derived metrics through {@code addRecord}.
 */
public class SystemMetrics extends AbstractProcessor {

  /**
   * Charset used to decode the raw record bytes, fixed so that decoding does
   * not depend on the platform default.
   * NOTE(review): assumes the producer writes UTF-8 (or plain ASCII) JSON - confirm.
   */
  private static final Charset RECORD_CHARSET = Charset.forName("UTF-8");

  /** Counters summed over network entries and emitted as {@code network.<name>}. */
  private static final String[] NETWORK_COUNTERS = {
    "RxBytes", "RxDropped", "RxErrors", "RxPackets",
    "TxBytes", "TxCollisions", "TxErrors", "TxPackets"
  };

  /** Counters summed over disk entries and emitted as {@code disk.<name>}. */
  private static final String[] DISK_COUNTERS = {
    "ReadBytes", "Reads", "WriteBytes", "Writes", "Total", "Used"
  };

  /** Position of {@code "Total"} in {@link #DISK_COUNTERS}. */
  private static final int DISK_TOTAL_INDEX = 4;

  /** Position of {@code "Used"} in {@link #DISK_COUNTERS}. */
  private static final int DISK_USED_INDEX = 5;

  public SystemMetrics() throws NoSuchAlgorithmException {
    super();
  }

  /**
   * Parses one system metrics JSON document and emits its metrics.
   *
   * <p>Records are emitted in this order: per-entry and averaged cpu values,
   * {@code Uptime}, the three {@code LoadAverage.*} values, every
   * {@code memory.*} and {@code swap.*} member, the summed {@code network.*}
   * counters, and the summed {@code disk.*} counters followed by
   * {@code disk.PercentUsed}. Network and disk values are emitted only as
   * totals, not per interface or per disk.
   *
   * @param recordEntry raw bytes of a single JSON object
   * @throws Throwable if the document is malformed or a required member is
   *         missing (surfacing as {@code NullPointerException} or
   *         {@code ClassCastException}), or if emitting a record fails
   */
  @Override
  protected void parse(byte[] recordEntry) throws Throwable {
    String buffer = new String(recordEntry, RECORD_CHARSET);
    JSONObject json = (JSONObject) JSONValue.parse(buffer);
    // Accept any JSON number for the timestamp instead of requiring a Long.
    time = ((Number) json.get("timestamp")).longValue();

    emitCpuMetrics((JSONArray) json.get("cpu"));

    addRecord("Uptime", json.get("uptime").toString());
    JSONArray loadavg = (JSONArray) json.get("loadavg");
    addRecord("LoadAverage.1", loadavg.get(0).toString());
    addRecord("LoadAverage.5", loadavg.get(1).toString());
    addRecord("LoadAverage.15", loadavg.get(2).toString());

    emitMembers("memory.", (JSONObject) json.get("memory"));
    emitMembers("swap.", (JSONObject) json.get("swap"));

    // Entry 0 of the network list is excluded from the totals.
    // NOTE(review): presumably the loopback interface - confirm against the producer.
    JSONArray netList = (JSONArray) json.get("network");
    emitTotals("network.", NETWORK_COUNTERS, sumCounters(netList, 1, NETWORK_COUNTERS));

    JSONArray diskList = (JSONArray) json.get("disk");
    double[] diskTotals = sumCounters(diskList, 0, DISK_COUNTERS);
    emitTotals("disk.", DISK_COUNTERS, diskTotals);
    double total = diskTotals[DISK_TOTAL_INDEX];
    if (total > 0) {
      // Without any reported capacity the ratio would be NaN or Infinity, so
      // the record is only emitted when it is a meaningful number.
      double percentUsed = diskTotals[DISK_USED_INDEX] / total;
      addRecord("disk.PercentUsed", Double.toString(percentUsed));
    }
  }

  /**
   * Emits {@code cpu.<key>.<index>} for every member of every usable cpu
   * entry, then the averages {@code cpu.combined}, {@code cpu.user},
   * {@code cpu.idle} and {@code cpu.sys} over those entries.
   *
   * <p>Entries whose {@code combined} member is null are skipped entirely. If
   * no entry is usable the averages are not emitted, because dividing by zero
   * would produce {@code NaN}.
   *
   * @param cpuList the {@code cpu} array of the document
   * @throws Throwable anything raised while emitting a record; declared to
   *         mirror {@link #parse(byte[])}
   */
  private void emitCpuMetrics(JSONArray cpuList) throws Throwable {
    double combined = 0.0;
    double user = 0.0;
    double sys = 0.0;
    double idle = 0.0;
    int usableEntries = 0;
    for (int i = 0; i < cpuList.size(); i++) {
      JSONObject cpu = (JSONObject) cpuList.get(i);
      // Work around for sigar returning null sometimes for cpu metrics on
      // pLinux
      if (cpu.get("combined") == null) {
        continue;
      }
      usableEntries++;
      combined += Double.parseDouble(cpu.get("combined").toString());
      user += Double.parseDouble(cpu.get("user").toString());
      sys += Double.parseDouble(cpu.get("sys").toString());
      idle += Double.parseDouble(cpu.get("idle").toString());
      for (Object member : cpu.keySet()) {
        String key = (String) member;
        addRecord("cpu." + key + "." + i, cpu.get(key).toString());
      }
    }
    if (usableEntries == 0) {
      return;
    }
    addRecord("cpu.combined", Double.toString(combined / usableEntries));
    addRecord("cpu.user", Double.toString(user / usableEntries));
    addRecord("cpu.idle", Double.toString(idle / usableEntries));
    addRecord("cpu.sys", Double.toString(sys / usableEntries));
  }

  /**
   * Emits one record per member of {@code values}, named {@code prefix + key}
   * and carrying the member's string form.
   *
   * @param prefix text prepended to each member name, e.g. {@code "memory."}
   * @param values JSON object whose members are emitted
   * @throws Throwable anything raised while emitting a record; declared to
   *         mirror {@link #parse(byte[])}
   */
  private void emitMembers(String prefix, JSONObject values) throws Throwable {
    for (Object member : values.keySet()) {
      String key = (String) member;
      addRecord(prefix + key, values.get(key).toString());
    }
  }

  /**
   * Emits {@code prefix + names[k]} with the string form of {@code totals[k]}
   * for every position {@code k}, in array order.
   *
   * @param prefix text prepended to each counter name
   * @param names counter names
   * @param totals values aligned with {@code names}
   * @throws Throwable anything raised while emitting a record; declared to
   *         mirror {@link #parse(byte[])}
   */
  private void emitTotals(String prefix, String[] names, double[] totals) throws Throwable {
    for (int k = 0; k < names.length; k++) {
      addRecord(prefix + names[k], Double.toString(totals[k]));
    }
  }

  /**
   * Sums the named numeric members across the JSON objects in {@code entries},
   * starting at {@code firstIndex}.
   *
   * <p>A counter that is absent or null in an entry contributes nothing. Any
   * JSON number is accepted; a non-numeric value raises
   * {@code ClassCastException}.
   *
   * @param entries array of JSON objects
   * @param firstIndex index of the first entry to include
   * @param counterNames members to sum
   * @return totals aligned with {@code counterNames}
   */
  private static double[] sumCounters(JSONArray entries, int firstIndex, String[] counterNames) {
    double[] totals = new double[counterNames.length];
    for (int i = firstIndex; i < entries.size(); i++) {
      JSONObject entry = (JSONObject) entries.get(i);
      for (int k = 0; k < counterNames.length; k++) {
        Object value = entry.get(counterNames[k]);
        if (value != null) {
          totals[k] += ((Number) value).doubleValue();
        }
      }
    }
    return totals;
  }
}