CHUKWA-744. Implemented new parsers for extract and transform data to HBase format. (Eric Yang)
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java
new file mode 100644
index 0000000..b39c789
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.hbase;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.TimeZone;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Reporter;
+import org.apache.hadoop.chukwa.util.HBaseUtil;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.log4j.Logger;
+
+public abstract class AbstractProcessor {
+ static Logger LOG = Logger.getLogger(AbstractProcessor.class);
+
+ protected int entryCount = 0;
+ protected String primaryKeyHelper;
+ protected String sourceHelper;
+
+ protected byte[] key = null;
+ byte[] CF = "t".getBytes();
+
+ boolean chunkInErrorSaved = false;
+ ArrayList<Put> output = null;
+ ArrayList<Put> meta = null;
+ Reporter reporter = null;
+ long time = System.currentTimeMillis();
+ Chunk chunk = null;
+ MessageDigest md5 = null;
+
+ public AbstractProcessor() throws NoSuchAlgorithmException {
+ md5 = MessageDigest.getInstance("md5");
+ }
+
+ protected abstract void parse(byte[] recordEntry) throws Throwable;
+
+ /**
+ * Generic metric function to add a metric to HBase with full primary key and
+ * source computed.
+ *
+ * @param time
+ * @param metric
+ * @param source
+ * @param value
+ * @param output
+ */
+ public void addRecord(long time, String metric, String source, byte[] value,
+ ArrayList<Put> output) {
+ String primaryKey = new StringBuilder(primaryKeyHelper).append(".")
+ .append(metric).toString();
+ byte[] key = HBaseUtil.buildKey(time, primaryKey, source);
+ Put put = new Put(key);
+ byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
+ put.add(CF, timeInBytes, time, value);
+ output.add(put);
+ reporter.putMetric(chunk.getDataType(), primaryKey);
+ reporter.putSource(chunk.getDataType(), source);
+ }
+
+ public void addRecord(String primaryKey, String value) {
+ addRecord(primaryKey, value.getBytes());
+ }
+
+ /**
+ * Generic function to add a metric to HBase metric table, this function
+ * assumes "time" and "source" have been defined and will construct primaryKey
+ * only, without recompute time and source md5.
+ *
+ * @param time
+ * @param primaryKey
+ * @param value
+ * @param output
+ */
+ public void addRecord(String metric, byte[] value) {
+ String primaryKey = new StringBuilder(primaryKeyHelper).append(".")
+ .append(metric).toString();
+ byte[] key = HBaseUtil.buildKey(time, primaryKey, sourceHelper);
+ Put put = new Put(key);
+ byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
+ put.add(CF, timeInBytes, time, value);
+ output.add(put);
+ reporter.putMetric(chunk.getDataType(), primaryKey);
+ }
+
+ /**
+ * Process a chunk to store in HBase.
+ *
+ * @param chunk
+ * @param output
+ * @param reporter
+ * @throws Throwable
+ */
+ public void process(Chunk chunk, ArrayList<Put> output, Reporter reporter)
+ throws Throwable {
+ this.output = output;
+ this.reporter = reporter;
+ this.chunk = chunk;
+ this.primaryKeyHelper = chunk.getDataType();
+ this.sourceHelper = chunk.getSource();
+ reporter.putSource(primaryKeyHelper, sourceHelper);
+ parse(chunk.getData());
+ addMeta();
+ }
+
+ protected void addMeta() {
+ byte[] key = HBaseUtil.buildKey(time, chunk.getDataType(), sourceHelper);
+ Put put = new Put(key);
+ String family = "a";
+ byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
+ put.add(family.getBytes(), timeInBytes, time, chunk.getTags().getBytes());
+ output.add(put);
+ }
+
+}
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/ChukwaMetricsProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/ChukwaMetricsProcessor.java
new file mode 100644
index 0000000..156d9d5
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/ChukwaMetricsProcessor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.hbase;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Reporter;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.log4j.Logger;
+
+public class ChukwaMetricsProcessor extends HadoopMetricsProcessor {
+ static Logger LOG = Logger.getLogger(ChukwaMetricsProcessor.class);
+
+ public ChukwaMetricsProcessor() throws NoSuchAlgorithmException {
+ super();
+ }
+
+ /**
+ * Process cluster name and store in HBase.
+ *
+ * @param chunk
+ * @param output
+ * @param reporter
+ * @throws Throwable
+ */
+ @Override
+ public void process(Chunk chunk, ArrayList<Put> output, Reporter reporter)
+ throws Throwable {
+ this.output = output;
+ this.reporter = reporter;
+ this.chunk = chunk;
+ this.primaryKeyHelper = chunk.getDataType();
+ this.sourceHelper = chunk.getSource();
+ String clusterName = chunk.getTag("cluster");
+ reporter.putSource(primaryKeyHelper, sourceHelper);
+ reporter.putClusterName(primaryKeyHelper, clusterName);
+ parse(chunk.getData());
+ addMeta();
+ }
+
+}
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java
new file mode 100644
index 0000000..2da64a3
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.extraction.hbase;
+
+import java.nio.ByteBuffer;
+import java.security.NoSuchAlgorithmException;
+
+import org.apache.hadoop.chukwa.util.HBaseUtil;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.log4j.Logger;
+import org.json.simple.JSONObject;
+
+public class DefaultProcessor extends AbstractProcessor {
+
+ public DefaultProcessor() throws NoSuchAlgorithmException {
+ super();
+ // TODO Auto-generated constructor stub
+ }
+
+static Logger LOG = Logger.getLogger(DefaultProcessor.class);
+
+ @Override
+ protected void parse(byte[] recordEntry) throws Throwable {
+ byte[] key = HBaseUtil.buildKey(time, chunk.getDataType(), chunk.getSource());
+ Put put = new Put(key);
+ byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
+ put.add("t".getBytes(), timeInBytes, chunk.getData());
+ output.add(put);
+ JSONObject json = new JSONObject();
+ json.put("sig", key);
+ json.put("type", "unknown");
+ reporter.put(chunk.getDataType(), chunk.getSource(), json.toString());
+ }
+
+}
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
new file mode 100644
index 0000000..3afd71a
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.hbase;
+
+
+import java.nio.ByteBuffer;
+import java.security.NoSuchAlgorithmException;
+import java.util.Iterator;
+
+import org.apache.hadoop.chukwa.util.HBaseUtil;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.log4j.Logger;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+public class HadoopMetricsProcessor extends AbstractProcessor {
+
+ static Logger LOG = Logger.getLogger(HadoopMetricsProcessor.class);
+ static final String timestampField = "timestamp";
+ static final String contextNameField = "contextName";
+ static final String recordNameField = "recordName";
+ static final byte[] cf = "t".getBytes();
+
+ public HadoopMetricsProcessor() throws NoSuchAlgorithmException {
+ }
+
+ @Override
+ protected void parse(byte[] recordEntry) throws Throwable {
+ try {
+ String body = new String(recordEntry);
+ int start = body.indexOf('{');
+ JSONObject json = (JSONObject) JSONValue.parse(body.substring(start));
+
+ time = ((Long) json.get(timestampField)).longValue();
+ String contextName = (String) json.get(contextNameField);
+ String recordName = (String) json.get(recordNameField);
+ byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
+
+ @SuppressWarnings("unchecked")
+ Iterator<String> ki = json.keySet().iterator();
+ while (ki.hasNext()) {
+ String keyName = ki.next();
+ if (timestampField.intern() == keyName.intern()) {
+ continue;
+ } else if (contextNameField.intern() == keyName.intern()) {
+ continue;
+ } else if (recordNameField.intern() == keyName.intern()) {
+ continue;
+ } else {
+ if(json.get(keyName)!=null) {
+ byte[] v = json.get(keyName).toString().getBytes();
+ String primaryKey = new StringBuilder(contextName).append(".").
+ append(recordName).append(".").
+ append(keyName).toString();
+ byte[] rowKey = HBaseUtil.buildKey(time, primaryKey, chunk.getSource());
+ Put r = new Put(rowKey);
+ r.add(cf, timeInBytes, time, v);
+ output.add(r);
+ }
+ }
+ }
+
+ } catch (Exception e) {
+ LOG.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry + "]",
+ e);
+ throw e;
+ }
+ }
+
+}
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java
new file mode 100644
index 0000000..dcbe2d4
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.extraction.hbase;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+public class LogEntry {
+ private final static SimpleDateFormat sdf = new SimpleDateFormat(
+ "yyyy-MM-dd HH:mm");
+
+ private Date date;
+ private String logLevel;
+ private String className;
+ private String body;
+
+ public LogEntry(String recordEntry) throws ParseException {
+ String dStr = recordEntry.substring(0, 23);
+ date = sdf.parse(dStr);
+ int start = 24;
+ int idx = recordEntry.indexOf(' ', start);
+ logLevel = recordEntry.substring(start, idx);
+ start = idx + 1;
+ idx = recordEntry.indexOf(' ', start);
+ className = recordEntry.substring(start, idx - 1);
+ body = recordEntry.substring(idx + 1);
+ }
+
+ public Date getDate() {
+ return date;
+ }
+
+ public void setDate(Date date) {
+ this.date = date;
+ }
+
+ public String getLogLevel() {
+ return logLevel;
+ }
+
+ public String getClassName() {
+ return className;
+ }
+
+ public String getBody() {
+ return body;
+ }
+}
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/ProcessorFactory.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/ProcessorFactory.java
new file mode 100644
index 0000000..96931d7
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/ProcessorFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.hbase;
+
+
+import java.util.HashMap;
+import org.apache.log4j.Logger;
+
+public class ProcessorFactory {
+ static Logger log = Logger.getLogger(ProcessorFactory.class);
+
+ private static HashMap<String, AbstractProcessor> processors = new HashMap<String, AbstractProcessor>(); // registry
+
+ public ProcessorFactory() {
+ }
+
+ public static AbstractProcessor getProcessor(String parserClass)
+ throws UnknownRecordTypeException {
+ if (processors.containsKey(parserClass)) {
+ return processors.get(parserClass);
+ } else {
+ AbstractProcessor processor = null;
+ try {
+ processor = (AbstractProcessor) Class.forName(parserClass).getConstructor()
+ .newInstance();
+ } catch (ClassNotFoundException e) {
+ throw new UnknownRecordTypeException("Unknown parserClass:"
+ + parserClass, e);
+ } catch (Exception e) {
+ throw new UnknownRecordTypeException("error constructing processor", e);
+ }
+
+ // TODO using a ThreadSafe/reuse flag to actually decide if we want
+ // to reuse the same processor again and again
+ processors.put(parserClass, processor);
+ return processor;
+ }
+ }
+}
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
new file mode 100644
index 0000000..a72e1bd
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Demux parser for system metrics data collected through
+ * org.apache.hadoop.chukwa.datacollection.adaptor.sigar.SystemMetrics.
+ */
+package org.apache.hadoop.chukwa.extraction.hbase;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Iterator;
+import java.util.TimeZone;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+public class SystemMetrics extends AbstractProcessor {
+
+ public SystemMetrics() throws NoSuchAlgorithmException {
+ super();
+ }
+
+ @Override
+ protected void parse(byte[] recordEntry) throws Throwable {
+ String buffer = new String(recordEntry);
+ JSONObject json = (JSONObject) JSONValue.parse(buffer);
+ time = ((Long) json.get("timestamp")).longValue();
+ ChukwaRecord record = new ChukwaRecord();
+ JSONArray cpuList = (JSONArray) json.get("cpu");
+ double combined = 0.0;
+ double user = 0.0;
+ double sys = 0.0;
+ double idle = 0.0;
+ int actualSize = 0;
+ for (int i = 0; i < cpuList.size(); i++) {
+ JSONObject cpu = (JSONObject) cpuList.get(i);
+ // Work around for sigar returning null sometimes for cpu metrics on
+ // pLinux
+ if (cpu.get("combined") == null) {
+ continue;
+ }
+ actualSize++;
+ combined = combined + Double.parseDouble(cpu.get("combined").toString());
+ user = user + Double.parseDouble(cpu.get("user").toString());
+ sys = sys + Double.parseDouble(cpu.get("sys").toString());
+ idle = idle + Double.parseDouble(cpu.get("idle").toString());
+ for (@SuppressWarnings("unchecked")
+ Iterator<String> iterator = (Iterator<String>) cpu.keySet().iterator(); iterator
+ .hasNext();) {
+ String key = iterator.next();
+ addRecord("cpu." + key + "." + i, cpu.get(key).toString());
+ }
+ }
+ combined = combined / actualSize;
+ user = user / actualSize;
+ sys = sys / actualSize;
+ idle = idle / actualSize;
+ addRecord("cpu.combined", Double.toString(combined));
+ addRecord("cpu.user", Double.toString(user));
+ addRecord("cpu.idle", Double.toString(idle));
+ addRecord("cpu.sys", Double.toString(sys));
+
+ addRecord("Uptime", json.get("uptime").toString());
+ JSONArray loadavg = (JSONArray) json.get("loadavg");
+ addRecord("LoadAverage.1", loadavg.get(0).toString());
+ addRecord("LoadAverage.5", loadavg.get(1).toString());
+ addRecord("LoadAverage.15", loadavg.get(2).toString());
+
+ record = new ChukwaRecord();
+ JSONObject memory = (JSONObject) json.get("memory");
+ @SuppressWarnings("unchecked")
+ Iterator<String> memKeys = memory.keySet().iterator();
+ while (memKeys.hasNext()) {
+ String key = memKeys.next();
+ addRecord("memory." + key, memory.get(key).toString());
+ }
+
+ record = new ChukwaRecord();
+ JSONObject swap = (JSONObject) json.get("swap");
+ @SuppressWarnings("unchecked")
+ Iterator<String> swapKeys = swap.keySet().iterator();
+ while (swapKeys.hasNext()) {
+ String key = swapKeys.next();
+ addRecord("swap." + key, swap.get(key).toString());
+ }
+
+ double rxBytes = 0;
+ double rxDropped = 0;
+ double rxErrors = 0;
+ double rxPackets = 0;
+ double txBytes = 0;
+ double txCollisions = 0;
+ double txErrors = 0;
+ double txPackets = 0;
+ record = new ChukwaRecord();
+ JSONArray netList = (JSONArray) json.get("network");
+ for (int i = 0; i < netList.size(); i++) {
+ JSONObject netIf = (JSONObject) netList.get(i);
+ @SuppressWarnings("unchecked")
+ Iterator<String> keys = netIf.keySet().iterator();
+ while (keys.hasNext()) {
+ String key = keys.next();
+ record.add(key + "." + i, netIf.get(key).toString());
+ if (i != 0) {
+ if (key.equals("RxBytes")) {
+ rxBytes = rxBytes + (Long) netIf.get(key);
+ } else if (key.equals("RxDropped")) {
+ rxDropped = rxDropped + (Long) netIf.get(key);
+ } else if (key.equals("RxErrors")) {
+ rxErrors = rxErrors + (Long) netIf.get(key);
+ } else if (key.equals("RxPackets")) {
+ rxPackets = rxPackets + (Long) netIf.get(key);
+ } else if (key.equals("TxBytes")) {
+ txBytes = txBytes + (Long) netIf.get(key);
+ } else if (key.equals("TxCollisions")) {
+ txCollisions = txCollisions + (Long) netIf.get(key);
+ } else if (key.equals("TxErrors")) {
+ txErrors = txErrors + (Long) netIf.get(key);
+ } else if (key.equals("TxPackets")) {
+ txPackets = txPackets + (Long) netIf.get(key);
+ }
+ }
+ }
+ }
+
+ addRecord("network.RxBytes", Double.toString(rxBytes));
+ addRecord("network.RxDropped", Double.toString(rxDropped));
+ addRecord("network.RxErrors", Double.toString(rxErrors));
+ addRecord("network.RxPackets", Double.toString(rxPackets));
+ addRecord("network.TxBytes", Double.toString(txBytes));
+ addRecord("network.TxCollisions", Double.toString(txCollisions));
+ addRecord("network.TxErrors", Double.toString(txErrors));
+ addRecord("network.TxPackets", Double.toString(txPackets));
+
+ double readBytes = 0;
+ double reads = 0;
+ double writeBytes = 0;
+ double writes = 0;
+ double total = 0;
+ double used = 0;
+ record = new ChukwaRecord();
+ JSONArray diskList = (JSONArray) json.get("disk");
+ for (int i = 0; i < diskList.size(); i++) {
+ JSONObject disk = (JSONObject) diskList.get(i);
+ Iterator<String> keys = disk.keySet().iterator();
+ while (keys.hasNext()) {
+ String key = keys.next();
+ record.add(key + "." + i, disk.get(key).toString());
+ if (key.equals("ReadBytes")) {
+ readBytes = readBytes + (Long) disk.get("ReadBytes");
+ } else if (key.equals("Reads")) {
+ reads = reads + (Long) disk.get("Reads");
+ } else if (key.equals("WriteBytes")) {
+ writeBytes = writeBytes + (Long) disk.get("WriteBytes");
+ } else if (key.equals("Writes")) {
+ writes = writes + (Long) disk.get("Writes");
+ } else if (key.equals("Total")) {
+ total = total + (Long) disk.get("Total");
+ } else if (key.equals("Used")) {
+ used = used + (Long) disk.get("Used");
+ }
+ }
+ }
+ double percentUsed = used / total;
+ addRecord("disk.ReadBytes", Double.toString(readBytes));
+ addRecord("disk.Reads", Double.toString(reads));
+ addRecord("disk.WriteBytes", Double.toString(writeBytes));
+ addRecord("disk.Writes", Double.toString(writes));
+ addRecord("disk.Total", Double.toString(total));
+ addRecord("disk.Used", Double.toString(used));
+ addRecord("disk.PercentUsed", Double.toString(percentUsed));
+ }
+
+}
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/UnknownRecordTypeException.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/UnknownRecordTypeException.java
new file mode 100644
index 0000000..866eb2c
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/UnknownRecordTypeException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.hbase;
+
+
+public class UnknownRecordTypeException extends Exception {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 8925135975093252279L;
+
+ public UnknownRecordTypeException() {
+ }
+
+ public UnknownRecordTypeException(String message) {
+ super(message);
+ }
+
+ public UnknownRecordTypeException(Throwable cause) {
+ super(cause);
+ }
+
+ public UnknownRecordTypeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
diff --git a/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java b/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java
new file mode 100644
index 0000000..d463dd1
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java
@@ -0,0 +1,62 @@
+package org.apache.hadoop.chukwa.util;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Calendar;
+import java.util.TimeZone;
+
+import org.apache.hadoop.chukwa.extraction.hbase.AbstractProcessor;
+import org.apache.log4j.Logger;
+import org.mortbay.log.Log;
+
+public class HBaseUtil {
+ private static Logger LOG = Logger.getLogger(HBaseUtil.class);
+
+ static Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ static MessageDigest md5 = null;
+ static {
+ try {
+ md5 = MessageDigest.getInstance("md5");
+ } catch (NoSuchAlgorithmException e) {
+ LOG.warn(ExceptionUtil.getStackTrace(e));
+ }
+ }
+
+ public HBaseUtil() throws NoSuchAlgorithmException {
+ }
+
+ public byte[] buildKey(long time, String metricGroup, String metric,
+ String source) {
+ String fullKey = new StringBuilder(metricGroup).append(".")
+ .append(metric).toString();
+ return buildKey(time, fullKey, source);
+ }
+
+ public static byte[] buildKey(long time, String primaryKey) {
+ c.setTimeInMillis(time);
+ byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes();
+ byte[] pk = getHash(primaryKey);
+ byte[] key = new byte[8];
+ System.arraycopy(day, 0, key, 0, day.length);
+ System.arraycopy(pk, 0, key, 2, 3);
+ return key;
+ }
+
+ public static byte[] buildKey(long time, String primaryKey, String source) {
+ c.setTimeInMillis(time);
+ byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes();
+ byte[] pk = getHash(primaryKey);
+ byte[] src = getHash(source);
+ byte[] key = new byte[8];
+ System.arraycopy(day, 0, key, 0, day.length);
+ System.arraycopy(pk, 0, key, 2, 3);
+ System.arraycopy(src, 0, key, 5, 3);
+ return key;
+ }
+
+ private static byte[] getHash(String key) {
+ byte[] hash = new byte[3];
+ System.arraycopy(md5.digest(key.getBytes()), 0, hash, 0, 3);
+ return hash;
+ }
+}