[EAGLE-877] Rename metric hadoop.hbase.ipc.ipc.* to hadoop.hbase.regionserver.ipc.*
Rename metric "hadoop.hbase.ipc.ipc." to "hadoop.hbase.regionserver.ipc." to support different hbase version metrics.
https://issues.apache.org/jira/browse/EAGLE-877
Author: Hao Chen <hao@apache.org>
Closes #786 from haoch/EAGLE-877.
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
index be7b9c7..78183e0 100644
--- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
+++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
@@ -82,6 +82,17 @@
metric["metric"] = "hadoop.datanode.fsdatasetstate.dfsused"
self.collector.collect(metric)
+class HBaseRegionServerMetric(JmxMetricListener):
+ def on_metric(self, metric):
+ """
+ Rename metric "hadoop.hbase.ipc.ipc.*" to "hadoop.hbase.regionserver.ipc.*" to support different hbase version metric
+ """
+ if fnmatch.fnmatch(metric["metric"],"hadoop.hbase.ipc.ipc.*") and metric["component"] == "regionserver":
+ new_metric_name = metric["metric"].replace("hadoop.hbase.ipc.ipc.","hadoop.hbase.regionserver.ipc.")
+ logging.debug("Rename metric %s to %s" % (metric["metric"], new_metric_name))
+ metric["metric"] = new_metric_name
+ self.collector.collect(metric)
+
if __name__ == '__main__':
collector = JmxMetricCollector()
collector.register(
@@ -90,6 +101,7 @@
MemoryUsageMetric(),
NNCapacityUsageMetric(),
JournalTransactionInfoMetric(),
- DatanodeFSDatasetState()
+ DatanodeFSDatasetState(),
+ HBaseRegionServerMetric()
)
Runner.run(collector)
\ No newline at end of file
diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py
index 0f09a9e..a472bbe 100644
--- a/eagle-external/hadoop_jmx_collector/metric_collector.py
+++ b/eagle-external/hadoop_jmx_collector/metric_collector.py
@@ -295,23 +295,26 @@
super(MetricCollector, self).start()
def collect(self, msg):
- if not msg.has_key("timestamp"):
- msg["timestamp"] = int(round(time.time() * 1000))
- if msg.has_key("value"):
- msg["value"] = float(str(msg["value"]))
- if not msg.has_key("host") or len(msg["host"]) == 0:
- raise Exception("host is null: " + str(msg))
- if not msg.has_key("site"):
- msg["site"] = self.config["env"]["site"]
- if len(self.filters) == 0:
- self.sender.send(msg)
- return
- else:
- for filter in self.filters:
- if filter.filter_metric(msg):
- self.sender.send(msg)
- return
- # logging.info("Drop metric: " + str(msg))
+ try:
+ if not msg.has_key("timestamp"):
+ msg["timestamp"] = int(round(time.time() * 1000))
+ if msg.has_key("value"):
+ msg["value"] = float(str(msg["value"]))
+ if not msg.has_key("host") or len(msg["host"]) == 0:
+ raise Exception("host is null: " + str(msg))
+ if not msg.has_key("site"):
+ msg["site"] = self.config["env"]["site"]
+ if len(self.filters) == 0:
+ self.sender.send(msg)
+ return
+ else:
+ for filter in self.filters:
+ if filter.filter_metric(msg):
+ self.sender.send(msg)
+ return
+ except Exception as e:
+ logging.error("Failed to emit metric: %s" % msg)
+ logging.exception(e)
def close(self):
self.sender.close()