[EAGLE-1023] Update JMX metric collector scripts
https://issues.apache.org/jira/browse/EAGLE-1023
* fix a multi-threading bug in the `fnmatch`-based metric name filter (see the first sketch below)
* add an HBase HA checker (`HadoopHBaseHAChecker`)
* increase the URL connection timeout from 30s to 60s
* add exception handling where it was missing (JMX bean listeners, per-site daily reports)
* add two new metrics `hadoop.namenode.dfs.checkpointtimelag` & `hadoop.namenode.fsnamesystemstate.numrevisedlivedatanodes` (derivation shown in the second sketch below)
* update metric filter configuration
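
For reference, a minimal sketch of a thread-safe replacement for the `fnmatch` filter. The `MetricNameFilter` class below is hypothetical and not part of this patch; it assumes the patterns come from the `metric_name_filter` list in the collector config. Python 2's `fnmatch.fnmatch()` populates a shared, module-level cache of compiled patterns, which can race when several collector threads filter metrics at once; precompiling the patterns via `fnmatch.translate()` sidesteps that cache:

```python
# Hedged sketch, not part of this patch: a thread-safe stand-in for the
# fnmatch-based metric name filter. Python 2's fnmatch.fnmatch() shares a
# module-level cache of compiled patterns, which can race across threads.
import fnmatch
import re


class MetricNameFilter(object):  # hypothetical helper, for illustration only
    def __init__(self, patterns):
        # fnmatch.translate() turns a glob such as "hadoop.namenode.*" into a
        # regex; compiling once up front avoids touching the shared cache.
        self.compiled = [re.compile(fnmatch.translate(p)) for p in patterns]

    def matches(self, metric_name):
        return any(r.match(metric_name) for r in self.compiled)


# usage (assuming the config layout from hadoop_jmx_config-sample.json):
# name_filter = MetricNameFilter(config["filter"]["metric_name_filter"])
# keep = name_filter.matches(metric["metric"])
```
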
Author: Zhao, Qingwen <qingwzhao@apache.org>
Closes #935 from qingwen220/EAGLE-1023.
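
Both new metrics are derived from attributes the NameNode already exposes over /jmx. A condensed, standalone sketch of the arithmetic follows (the bean dicts are hypothetical stand-ins for the parsed JMX JSON, not code from this patch):

```python
# Hedged sketch of how the two new NameNode metrics are derived.
# The dicts below are hypothetical stand-ins for the parsed /jmx JSON;
# the attribute names match the Hadoop NameNode JMX beans.
import time

fsnamesystem_bean = {"LastCheckpointTime": 1500000000000}    # name=FSNamesystem
fsnamesystemstate_bean = {"NumLiveDataNodes": 40,            # name=FSNamesystemState
                          "NumDecomDeadDataNodes": 2}

# hadoop.namenode.dfs.checkpointtimelag: milliseconds since the last checkpoint
now_ms = int(round(time.time() * 1000))
checkpointtimelag = now_ms - fsnamesystem_bean["LastCheckpointTime"]

# hadoop.namenode.fsnamesystemstate.numrevisedlivedatanodes: live datanodes plus
# nodes that died after being decommissioned, presumably so an intentional
# decommission is not reported as a loss of live datanodes
numrevisedlivedatanodes = (fsnamesystemstate_bean["NumLiveDataNodes"]
                           + fsnamesystemstate_bean["NumDecomDeadDataNodes"])

print(checkpointtimelag, numrevisedlivedatanodes)
```
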
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_ha_checker.py b/eagle-external/hadoop_jmx_collector/hadoop_ha_checker.py
index dcca28e..f02d8df 100644
--- a/eagle-external/hadoop_jmx_collector/hadoop_ha_checker.py
+++ b/eagle-external/hadoop_jmx_collector/hadoop_ha_checker.py
@@ -22,38 +22,35 @@
class HadoopNNHAChecker(MetricCollector):
def run(self):
hosts = []
- host_name_list = []
+
for input in self.config["input"]:
if not input.has_key("host"):
input["host"] = socket.getfqdn()
if input.has_key("component") and input["component"] == "namenode":
hosts.append(input)
- host_name_list.append(input["host"])
if not bool(hosts):
logging.warn("non hosts are configured as 'namenode' in 'input' config, exit")
return
logging.info("Checking namenode HA: " + str(hosts))
- total_count = len(hosts)
-
- all_hosts_name = string.join(host_name_list,",")
-
- self.collect({
- "host": all_hosts_name,
- "component": "namenode",
- "metric": "hadoop.namenode.hastate.total.count",
- "value": total_count
- })
active_count = 0
standby_count = 0
failed_count = 0
+ failed_host_list = []
+ host_name_list = []
+
for host in hosts:
try:
- bean = JmxReader(host["host"], host["port"], host["https"]).open().get_jmx_bean_by_name(
- "Hadoop:service=NameNode,name=FSNamesystem")
+ if host.has_key("source_host"):
+ host["host"] = host["source_host"]
+
+ host_name_list.append(host["host"])
+ bean = JmxReader(host["host"], host["port"], host["https"]) \
+ .read_query("/jmx?qry=Hadoop:service=NameNode,name=FSNamesystem&anonymous=true") \
+ .get_jmx_bean_by_name("Hadoop:service=NameNode,name=FSNamesystem")
if not bean:
logging.error("JMX Bean[Hadoop:service=NameNode,name=FSNamesystem] is null from " + host["host"])
if bean.has_key("tag.HAState"):
@@ -67,6 +64,19 @@
except Exception as e:
logging.exception("failed to read jmx from " + host["host"] + ":" + host["port"])
failed_count += 1
+ failed_host_list.append(host["host"])
+
+
+ total_count = len(hosts)
+ all_hosts_name = string.join(host_name_list,",")
+
+ self.collect({
+ "host": all_hosts_name,
+ "component": "namenode",
+ "metric": "hadoop.namenode.hastate.total.count",
+ "value": total_count
+ })
+
self.collect({
"host": all_hosts_name,
"component": "namenode",
@@ -81,6 +91,9 @@
"value": standby_count
})
+ if len(failed_host_list) > 0:
+ all_hosts_name = string.join(failed_host_list,",")
+
self.collect({
"host": all_hosts_name,
"component": "namenode",
@@ -88,6 +101,87 @@
"value": failed_count
})
+class HadoopHBaseHAChecker(MetricCollector):
+ def run(self):
+ hosts = []
+
+ for input in self.config["input"]:
+ if not input.has_key("host"):
+ input["host"] = socket.getfqdn()
+ if input.has_key("component") and input["component"] == "hbasemaster":
+ hosts.append(input)
+
+ if not bool(hosts):
+ logging.warn("non hosts are configured as 'hbasemaster' in 'input' config, exit")
+ return
+
+ logging.info("Checking HBase HA: " + str(hosts))
+
+ active_count = 0
+ standby_count = 0
+ failed_count = 0
+
+ failed_host_list = []
+ host_name_list = []
+
+ for host in hosts:
+ try:
+ if host.has_key("source_host"):
+ host["host"] = host["source_host"]
+ host_name_list.append(host["host"])
+ bean = JmxReader(host["host"], host["port"], host["https"]) \
+ .read_query("/jmx?qry=Hadoop:service=HBase,name=Master,sub=Server&anonymous=true") \
+ .get_jmx_bean_by_name("Hadoop:service=HBase,name=Master,sub=Server")
+ if not bean:
+ logging.error("JMX Bean[Hadoop:service=HBase,name=Master,sub=Server] is null from " + host["host"])
+ if bean.has_key("tag.isActiveMaster"):
+ logging.debug(str(host) + " is " + bean["tag.isActiveMaster"])
+ if bean["tag.isActiveMaster"] == "true":
+ active_count += 1
+ else:
+ standby_count += 1
+ else:
+ logging.info("'tag.isActiveMaster' not found from jmx of " + host["host"] + ":" + host["port"])
+ except Exception as e:
+ logging.exception("failed to read jmx from " + host["host"] + ":" + host["port"])
+ failed_count += 1
+ failed_host_list.append(host["host"])
+
+ total_count = len(hosts)
+ all_hosts_name = string.join(host_name_list,",")
+
+ self.collect({
+ "host": all_hosts_name,
+ "component": "hbasemaster",
+ "metric": "hadoop.hbasemaster.hastate.total.count",
+ "value": total_count
+ })
+
+ self.collect({
+ "host": all_hosts_name,
+ "component": "hbasemaster",
+ "metric": "hadoop.hbasemaster.hastate.active.count",
+ "value": active_count
+ })
+
+ self.collect({
+ "host": all_hosts_name,
+ "component": "hbasemaster",
+ "metric": "hadoop.hbasemaster.hastate.standby.count",
+ "value": standby_count
+ })
+
+ if len(failed_host_list) > 0:
+ all_hosts_name = string.join(failed_host_list,",")
+
+ self.collect({
+ "host": all_hosts_name,
+ "component": "hbasemaster",
+ "metric": "hadoop.hbasemaster.hastate.failed.count",
+ "value": failed_count
+ })
+
+
class HadoopRMHAChecker(MetricCollector):
def run(self):
hosts = []
@@ -103,20 +197,13 @@
return
logging.info("Checking resource manager HA: " + str(hosts))
- total_count = len(hosts)
- all_hosts_name = string.join(all_hosts,",")
-
- self.collect({
- "host": all_hosts_name,
- "component": "resourcemanager",
- "metric": "hadoop.resourcemanager.hastate.total.count",
- "value": total_count
- })
active_count = 0
standby_count = 0
failed_count = 0
+ failed_host_list = []
+
for host in hosts:
try:
cluster_info = YarnWSReader(host["host"], host["port"], host["https"]).read_cluster_info()
@@ -130,6 +217,17 @@
except Exception as e:
logging.error("Failed to read yarn ws from " + str(host))
failed_count += 1
+ failed_host_list.append(host["host"])
+
+ total_count = len(hosts)
+ all_hosts_name = string.join(all_hosts,",")
+
+ self.collect({
+ "host": all_hosts_name,
+ "component": "resourcemanager",
+ "metric": "hadoop.resourcemanager.hastate.total.count",
+ "value": total_count
+ })
self.collect({
"host": all_hosts_name,
@@ -145,6 +243,9 @@
"value": standby_count
})
+ if len(failed_host_list) > 0:
+ all_hosts_name = string.join(failed_host_list,",")
+
self.collect({
"host": all_hosts_name,
"component": "resourcemanager",
@@ -153,4 +254,4 @@
})
if __name__ == '__main__':
- Runner.run(HadoopNNHAChecker(), HadoopRMHAChecker())
\ No newline at end of file
+ Runner.run(HadoopNNHAChecker(), HadoopHBaseHAChecker(), HadoopRMHAChecker())
\ No newline at end of file
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json b/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json
index a6ddf7d..786072b 100755
--- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json
+++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json
@@ -29,7 +29,76 @@
"metric_name_filter": [
"hadoop.memory.heapmemoryusage.used",
"hadoop.memory.nonheapmemoryusage.used",
+ "hadoop.bufferpool.direct.memoryused",
+
+ "hadoop.hbase.master.server.averageload",
+ "hadoop.hbase.master.server.numdeadregionservers",
+ "hadoop.hbase.master.assignmentmanger.ritcount",
+ "hadoop.hbase.master.assignmentmanger.ritcountoverthreshold",
+ "hadoop.hbase.master.assignmentmanger.assign_num_ops",
+ "hadoop.hbase.master.assignmentmanger.assign_min",
+ "hadoop.hbase.master.assignmentmanger.assign_max",
+ "hadoop.hbase.master.assignmentmanger.assign_75th_percentile",
+ "hadoop.hbase.master.assignmentmanger.assign_95th_percentile",
+ "hadoop.hbase.master.assignmentmanger.assign_99th_percentile",
+ "hadoop.hbase.master.assignmentmanger.bulkassign_num_ops",
+ "hadoop.hbase.master.assignmentmanger.bulkassign_min",
+ "hadoop.hbase.master.assignmentmanger.bulkassign_max",
+ "hadoop.hbase.master.assignmentmanger.bulkassign_75th_percentile",
+ "hadoop.hbase.master.assignmentmanger.bulkassign_95th_percentile",
+ "hadoop.hbase.master.assignmentmanger.bulkassign_99th_percentile",
+ "hadoop.hbase.master.balancer.balancercluster_num_ops",
+ "hadoop.hbase.master.balancer.balancercluster_min",
+ "hadoop.hbase.master.balancer.balancercluster_max",
+ "hadoop.hbase.master.balancer.balancercluster_75th_percentile",
+ "hadoop.hbase.master.balancer.balancercluster_95th_percentile",
+ "hadoop.hbase.master.balancer.balancercluster_99th_percentile",
+ "hadoop.hbase.master.filesystem.hlogsplittime_min",
+ "hadoop.hbase.master.filesystem.hlogsplittime_max",
+ "hadoop.hbase.master.filesystem.hlogsplittime_75th_percentile",
+ "hadoop.hbase.master.filesystem.hlogsplittime_95th_percentile",
+ "hadoop.hbase.master.filesystem.hlogsplittime_99th_percentile",
+ "hadoop.hbase.master.filesystem.hlogsplitsize_min",
+ "hadoop.hbase.master.filesystem.hlogsplitsize_max",
+ "hadoop.hbase.master.filesystem.metahlogsplittime_min",
+ "hadoop.hbase.master.filesystem.metahlogsplittime_max",
+ "hadoop.hbase.master.filesystem.metahlogsplittime_75th_percentile",
+ "hadoop.hbase.master.filesystem.metahlogsplittime_95th_percentile",
+ "hadoop.hbase.master.filesystem.metahlogsplittime_99th_percentile",
+ "hadoop.hbase.master.filesystem.metahlogsplitsize_min",
+ "hadoop.hbase.master.filesystem.metahlogsplitsize_max",
+
+ "hadoop.hbase.jvm.gccount",
+ "hadoop.hbase.jvm.gctimemillis",
+ "hadoop.hbase.ipc.ipc.queuesize",
+ "hadoop.hbase.ipc.ipc.numcallsingeneralqueue",
+ "hadoop.hbase.ipc.ipc.numactivehandler",
+ "hadoop.hbase.ipc.ipc.queuecalltime_99th_percentile",
+ "hadoop.hbase.ipc.ipc.processcalltime_99th_percentile",
+ "hadoop.hbase.ipc.ipc.queuecalltime_num_ops",
+ "hadoop.hbase.ipc.ipc.processcalltime_num_ops",
+ "hadoop.hbase.regionserver.server.regioncount",
+ "hadoop.hbase.regionserver.server.storecount",
+ "hadoop.hbase.regionserver.server.memstoresize",
+ "hadoop.hbase.regionserver.server.storefilesize",
+ "hadoop.hbase.regionserver.server.totalrequestcount",
+ "hadoop.hbase.regionserver.server.readrequestcount",
+ "hadoop.hbase.regionserver.server.writerequestcount",
+ "hadoop.hbase.regionserver.server.splitqueuelength",
+ "hadoop.hbase.regionserver.server.compactionqueuelength",
+ "hadoop.hbase.regionserver.server.flushqueuelength",
+ "hadoop.hbase.regionserver.server.blockcachesize",
+ "hadoop.hbase.regionserver.server.blockcachehitcount",
+ "hadoop.hbase.regionserver.server.blockcachecounthitpercent",
+
+ "hadoop.memory.heapmemoryusage.used",
+ "hadoop.memory.nonheapmemoryusage.used",
"hadoop.namenode.fsnamesystemstate.capacitytotal",
+ "hadoop.namenode.fsnamesystemstate.capacityusage",
+ "hadoop.namenode.fsnamesystemstate.topuseropcounts",
+ "hadoop.namenode.fsnamesystemstate.fsstate",
+ "hadoop.namenode.fsnamesystemstate.numlivedatanodes",
+ "hadoop.namenode.fsnamesystemstate.numrevisedlivedatanodes",
"hadoop.namenode.dfs.capacityused",
"hadoop.namenode.dfs.capacityremaining",
"hadoop.namenode.dfs.blockstotal",
@@ -38,6 +107,7 @@
"hadoop.namenode.dfs.missingblocks",
"hadoop.namenode.dfs.corruptblocks",
"hadoop.namenode.dfs.lastcheckpointtime",
+ "hadoop.namenode.dfs.checkpointtimelag",
"hadoop.namenode.dfs.transactionssincelastcheckpoint",
"hadoop.namenode.dfs.lastwrittentransactionid",
"hadoop.namenode.dfs.snapshottabledirectories",
@@ -46,6 +116,9 @@
"hadoop.namenode.rpc.rpcprocessingtimeavgtime",
"hadoop.namenode.rpc.numopenconnections",
"hadoop.namenode.rpc.callqueuelength",
+ "hadoop.namenode.rpc.hadoop.namenode.rpc.rpcqueuetimeavgtime",
+ "hadoop.namenode.rpc.hadoop.namenode.rpc.rpcprocessingtimeavgtime",
+ "hadoop.namenode.namenodeinfo.corruptfiles",
"hadoop.datanode.fsdatasetstate.capacity",
"hadoop.datanode.fsdatasetstate.dfsused",
@@ -53,16 +126,32 @@
"hadoop.datanode.rpc.rpcqueuetimeavgtime",
"hadoop.datanode.rpc.rpcprocessingtimeavgtime",
"hadoop.datanode.rpc.numopenconnections",
- "hadoop.datanode.rpc.callqueuelength"
+ "hadoop.datanode.rpc.callqueuelength",
+
+ "hadoop.namenode.hastate.total.count",
+ "hadoop.namenode.hastate.active.count",
+ "hadoop.namenode.hastate.standby.count",
+ "hadoop.namenode.hastate.failed.count",
+
+ "hadoop.resourcemanager.yarn.numunhealthynms",
+ "hadoop.resourcemanager.yarn.numlostnms",
+ "hadoop.resourcemanager.yarn.numrebootednms",
+ "hadoop.resourcemanager.yarn.numdecommissionednms",
+ "hadoop.resourcemanager.yarn.numactivenms",
+
+ "hadoop.resourcemanager.hastate.total.count",
+ "hadoop.resourcemanager.hastate.active.count",
+ "hadoop.resourcemanager.hastate.standby.count",
+ "hadoop.resourcemanager.hastate.failed.count"
]
},
"output": {
"kafka": {
"debug": false,
"default_topic": "hadoop_jmx_metric_sandbox",
- "component_topic_mapping": {
- "namenode": "nn_jmx_metric_sandbox",
- "resourcemanager": "rm_jmx_metric_sandbox"
+ "metric_topic_mapping": {
+ "hadoop.namenode.namenodeinfo.corruptfiles": "hadoop_jmx_resource_sandbox",
+ "hadoop.namenode.fsnamesystemstate.topuseropcounts" : "hadoop_jmx_resource_sandbox"
},
"broker_list": [
"sandbox.hortonworks.com:6667"
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_config.json b/eagle-external/hadoop_jmx_collector/hadoop_jmx_config.json
deleted file mode 100755
index 23c89b3..0000000
--- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_config.json
+++ /dev/null
@@ -1,62 +0,0 @@
-{
- "env": {
- "site": "sandbox",
- "metric_prefix": "hadoop.",
- "log_file": "/tmp/hadoop-jmx-collector.log"
- },
- "input": [
- {
- "component": "namenode",
- "host": "sandbox.hortonworks.com",
- "port": "50070",
- "https": true
- }
- ],
- "filter": {
- "bean_group_filter": ["hadoop","java.lang"],
- "metric_name_filter": [
- "hadoop.memory.heapmemoryusage.used",
- "hadoop.memory.nonheapmemoryusage.used",
- "hadoop.namenode.fsnamesystemstate.capacitytotal",
- "hadoop.namenode.fsnamesystemstate.topuseropcounts",
- "hadoop.namenode.namenodeinfo.corruptfiles",
- "hadoop.namenode.dfs.capacityused",
- "hadoop.namenode.dfs.capacityremaining",
- "hadoop.namenode.dfs.blockstotal",
- "hadoop.namenode.dfs.filestotal",
- "hadoop.namenode.dfs.underreplicatedblocks",
- "hadoop.namenode.dfs.missingblocks",
- "hadoop.namenode.dfs.corruptblocks",
- "hadoop.namenode.dfs.lastcheckpointtime",
- "hadoop.namenode.dfs.transactionssincelastcheckpoint",
- "hadoop.namenode.dfs.lastwrittentransactionid",
- "hadoop.namenode.dfs.snapshottabledirectories",
- "hadoop.namenode.dfs.snapshots",
- "hadoop.namenode.rpc.rpcqueuetimeavgtime",
- "hadoop.namenode.rpc.rpcprocessingtimeavgtime",
- "hadoop.namenode.rpc.numopenconnections",
- "hadoop.namenode.rpc.callqueuelength",
-
- "hadoop.datanode.fsdatasetstate.capacity",
- "hadoop.datanode.fsdatasetstate.dfsused",
- "hadoop.datanode.datanodeinfo.xceivercount",
- "hadoop.datanode.rpc.rpcqueuetimeavgtime",
- "hadoop.datanode.rpc.rpcprocessingtimeavgtime",
- "hadoop.datanode.rpc.numopenconnections",
- "hadoop.datanode.rpc.callqueuelength"
- ]
- },
- "output": {
- "kafka": {
- "debug": false,
- "default_topic": "hadoop_jmx_metric_sandbox",
- "metric_topic_mapping": {
- "hadoop.namenode.namenodeinfo.corruptfiles": "hadoop_jmx_resource_sandbox",
- "hadoop.namenode.fsnamesystemstate.topuseropcounts" : "hadoop_jmx_resource_sandbox"
- },
- "broker_list": [
- "sandbox.hortonworks.com:6667"
- ]
- }
- }
-}
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
index 60c6367..8b64177 100644
--- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
+++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
@@ -17,7 +17,7 @@
#
from metric_collector import JmxMetricCollector,JmxMetricListener,Runner,MetricNameConverter
-import json, logging, fnmatch, sys
+import json, logging, fnmatch, sys, time
class NNSafeModeMetric(JmxMetricListener):
def on_metric(self, metric):
@@ -28,15 +28,13 @@
metric["value"] = 0
self.collector.collect(metric)
-class NNHAMetric(JmxMetricListener):
- PREFIX = "hadoop.namenode.fsnamesystem"
+class NNFileSystemMetric(JmxMetricListener):
+ PREFIX = "hadoop.namenode.dfs"
def on_bean(self, component, bean):
if bean["name"] == "Hadoop:service=NameNode,name=FSNamesystem":
- if bean[u"tag.HAState"] == "active":
- self.collector.on_bean_kv(self.PREFIX, component, "hastate", 0)
- else:
- self.collector.on_bean_kv(self.PREFIX, component, "hastate", 1)
+ checkpointtimelag = int(round(time.time() * 1000)) - bean['LastCheckpointTime']
+ self.collector.on_bean_kv(self.PREFIX, component, "checkpointtimelag", checkpointtimelag)
class corruptfilesMetric(JmxMetricListener):
def on_metric(self, metric):
@@ -48,7 +46,6 @@
if metric["metric"] == "hadoop.namenode.fsnamesystemstate.topuseropcounts":
self.collector.collect(metric, "string", MetricNameConverter())
-
class MemoryUsageMetric(JmxMetricListener):
PREFIX = "hadoop.namenode.jvm"
@@ -72,6 +69,9 @@
capacityusage = round(float(bean['CapacityUsed']) / float(bean['CapacityTotal']) * 100, 2)
self.collector.on_bean_kv(self.PREFIX, component, "capacityusage", capacityusage)
+ numrevisedlivedatanodes = bean['NumLiveDataNodes'] + bean['NumDecomDeadDataNodes']
+ self.collector.on_bean_kv(self.PREFIX, component, "numrevisedlivedatanodes", numrevisedlivedatanodes)
+
class JournalTransactionInfoMetric(JmxMetricListener):
PREFIX = "hadoop.namenode.journaltransaction"
@@ -107,12 +107,11 @@
collector = JmxMetricCollector()
collector.register(
NNSafeModeMetric(),
- NNHAMetric(),
+ NNFileSystemMetric(),
MemoryUsageMetric(),
NNCapacityUsageMetric(),
JournalTransactionInfoMetric(),
DatanodeFSDatasetState(),
- HBaseRegionServerMetric(),
corruptfilesMetric(),
TopUserOpCountsMetric()
)
diff --git a/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json b/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json
deleted file mode 100644
index c37a9ae..0000000
--- a/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json
+++ /dev/null
@@ -1,96 +0,0 @@
-{
- "env": {
- "site": "sandbox",
- "metric_prefix": "hadoop.",
- "log_file": "/tmp/hadoop-jmx-collector.log"
- },
- "input": [
- {
- "component": "hbasemaster",
- "host": "sandbox.hortonworks.com",
- "port": "60010",
- "https": false
- },
- {
- "component": "regionserver",
- "host": "sandbox.hortonworks.com",
- "port": "60030",
- "https": false
- }
- ],
- "filter": {
- "bean_group_filter": ["hadoop","java.lang","java.nio"],
- "metric_name_filter": [
- "hadoop.memory.heapmemoryusage.used",
- "hadoop.memory.nonheapmemoryusage.used",
- "hadoop.bufferpool.direct.memoryused",
- "hadoop.hbase.master.server.averageload",
- "hadoop.hbase.master.assignmentmanger.ritcount",
- "hadoop.hbase.master.assignmentmanger.ritcountoverthreshold",
- "hadoop.hbase.master.assignmentmanger.assign_num_ops",
- "hadoop.hbase.master.assignmentmanger.assign_min",
- "hadoop.hbase.master.assignmentmanger.assign_max",
- "hadoop.hbase.master.assignmentmanger.assign_75th_percentile",
- "hadoop.hbase.master.assignmentmanger.assign_95th_percentile",
- "hadoop.hbase.master.assignmentmanger.assign_99th_percentile",
- "hadoop.hbase.master.assignmentmanger.bulkassign_num_ops",
- "hadoop.hbase.master.assignmentmanger.bulkassign_min",
- "hadoop.hbase.master.assignmentmanger.bulkassign_max",
- "hadoop.hbase.master.assignmentmanger.bulkassign_75th_percentile",
- "hadoop.hbase.master.assignmentmanger.bulkassign_95th_percentile",
- "hadoop.hbase.master.assignmentmanger.bulkassign_99th_percentile",
- "hadoop.hbase.master.balancer.balancercluster_num_ops",
- "hadoop.hbase.master.balancer.balancercluster_min",
- "hadoop.hbase.master.balancer.balancercluster_max",
- "hadoop.hbase.master.balancer.balancercluster_75th_percentile",
- "hadoop.hbase.master.balancer.balancercluster_95th_percentile",
- "hadoop.hbase.master.balancer.balancercluster_99th_percentile",
- "hadoop.hbase.master.filesystem.hlogsplittime_min",
- "hadoop.hbase.master.filesystem.hlogsplittime_max",
- "hadoop.hbase.master.filesystem.hlogsplittime_75th_percentile",
- "hadoop.hbase.master.filesystem.hlogsplittime_95th_percentile",
- "hadoop.hbase.master.filesystem.hlogsplittime_99th_percentile",
- "hadoop.hbase.master.filesystem.hlogsplitsize_min",
- "hadoop.hbase.master.filesystem.hlogsplitsize_max",
- "hadoop.hbase.master.filesystem.metahlogsplittime_min",
- "hadoop.hbase.master.filesystem.metahlogsplittime_max",
- "hadoop.hbase.master.filesystem.metahlogsplittime_75th_percentile",
- "hadoop.hbase.master.filesystem.metahlogsplittime_95th_percentile",
- "hadoop.hbase.master.filesystem.metahlogsplittime_99th_percentile",
- "hadoop.hbase.master.filesystem.metahlogsplitsize_min",
- "hadoop.hbase.master.filesystem.metahlogsplitsize_max",
-
- "hadoop.hbase.jvm.gccount",
- "hadoop.hbase.jvm.gctimemillis",
- "hadoop.hbase.ipc.ipc.queuesize",
- "hadoop.hbase.ipc.ipc.numcallsingeneralqueue",
- "hadoop.hbase.ipc.ipc.numactivehandler",
- "hadoop.hbase.ipc.ipc.queuecalltime_99th_percentile",
- "hadoop.hbase.ipc.ipc.processcalltime_99th_percentile",
- "hadoop.hbase.ipc.ipc.queuecalltime_num_ops",
- "hadoop.hbase.ipc.ipc.processcalltime_num_ops",
- "hadoop.hbase.regionserver.server.regioncount",
- "hadoop.hbase.regionserver.server.storecount",
- "hadoop.hbase.regionserver.server.memstoresize",
- "hadoop.hbase.regionserver.server.storefilesize",
- "hadoop.hbase.regionserver.server.totalrequestcount",
- "hadoop.hbase.regionserver.server.readrequestcount",
- "hadoop.hbase.regionserver.server.writerequestcount",
- "hadoop.hbase.regionserver.server.splitqueuelength",
- "hadoop.hbase.regionserver.server.compactionqueuelength",
- "hadoop.hbase.regionserver.server.flushqueuelength",
- "hadoop.hbase.regionserver.server.blockcachesize",
- "hadoop.hbase.regionserver.server.blockcachehitcount",
- "hadoop.hbase.regionserver.server.blockcounthitpercent"
- ]
- },
- "output": {
- "kafka": {
- "debug": false,
- "default_topic": "hadoop_jmx_metric_sandbox",
- "broker_list": [
- "sandbox.hortonworks.com:6667"
- ]
- }
- }
-}
\ No newline at end of file
diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py
index c83fe6b..efd998a 100644
--- a/eagle-external/hadoop_jmx_collector/metric_collector.py
+++ b/eagle-external/hadoop_jmx_collector/metric_collector.py
@@ -106,12 +106,12 @@
try:
if https:
logging.info("Reading https://" + str(url) + path)
- c = httplib.HTTPSConnection(url, timeout=30)
+ c = httplib.HTTPSConnection(url, timeout=60)
c.request("GET", path)
response = c.getresponse()
else:
logging.info("Reading http://" + str(url) + path)
- response = urllib2.urlopen("http://" + str(url) + path, timeout=30)
+ response = urllib2.urlopen("http://" + str(url) + path, timeout=60)
logging.debug("Got response")
result = response.read()
break
@@ -158,6 +158,13 @@
raise Exception("Response from " + url + " is None")
return self
+ def read_query(self, qry):
+ self.jmx_raw = Helper.http_get(self.host, self.port, self.https, qry)
+ if self.jmx_raw is None:
+ raise Exception("Response from " + url + " is None")
+ self.set_raw(self.jmx_raw)
+ return self
+
def set_raw(self, text):
self.jmx_json = json.loads(text)
self.jmx_beans = self.jmx_json[u'beans']
@@ -496,7 +503,11 @@
self.on_bean_kv(metric_prefix_name, source, key, value)
for listener in self.listeners:
- listener.on_bean(source, bean.copy())
+ try:
+ listener.on_bean(source, bean.copy())
+ except Exception as e:
+ logging.error("Failed to parse bean: " + bean["name"])
+ logging.exception(e)
def on_bean_kv(self, prefix, source, key, value):
# Skip Tags
@@ -574,7 +585,9 @@
return True
else:
for name_filter in self.metric_name_filter:
- if fnmatch.fnmatch(metric["metric"], name_filter):
+ # multiple threads bug exists in fnmatch
+ #if fnmatch.fnmatch(metric["metric"], name_filter):
+ if re.match(name_filter, metric['metric']):
return True
return False
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporter.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporter.java
index 541d352..f01c93e 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporter.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporter.java
@@ -144,14 +144,20 @@
LOG.warn("application MR_HISTORY_JOB_APP does not run on any sites!");
return;
}
+
+ int reportHour = currentHour / dailySentPeriod * dailySentPeriod;
+ calendar.set(Calendar.HOUR_OF_DAY, reportHour);
+ long endTime = calendar.getTimeInMillis() / DateTimeUtil.ONEHOUR * DateTimeUtil.ONEHOUR;
+ long startTime = endTime - DateTimeUtil.ONEHOUR * dailySentPeriod;
+
for (String site : sites) {
- int reportHour = currentHour / dailySentPeriod * dailySentPeriod;
- calendar.set(Calendar.HOUR_OF_DAY, reportHour);
- long endTime = calendar.getTimeInMillis() / DateTimeUtil.ONEHOUR * DateTimeUtil.ONEHOUR;
- long startTime = endTime - DateTimeUtil.ONEHOUR * dailySentPeriod;
- String subject = buildAlertSubject(site, startTime, endTime);
- Map<String, Object> alertData = buildAlertData(site, startTime, endTime);
- sendByEmailWithSubject(alertData, subject);
+ try {
+ String subject = buildAlertSubject(site, startTime, endTime);
+ Map<String, Object> alertData = buildAlertData(site, startTime, endTime);
+ sendByEmailWithSubject(alertData, subject);
+ } catch (Exception e) {
+ LOG.error("Job report failed for {} due to {}", site, e.getMessage(), e);
+ }
}
} catch (Exception ex) {
LOG.error("Fail to get job summery info due to {}", ex.getMessage(), ex);
@@ -215,12 +221,18 @@
return data;
}
Long totalJobs = jobSummery.values().stream().reduce((a, b) -> a + b).get();
- String finishedJobQuery = String.format(FINISHED_JOB_QUERY, Constants.MR_JOB_EXECUTION_SERVICE_NAME, site, endTime);
- String failedJobQuery = String.format(FAILED_JOBS_QUERY, Constants.MR_JOB_EXECUTION_SERVICE_NAME, site, endTime);
- String succeededJobQuery = String.format(SUCCEEDED_JOB_QUERY, Constants.MR_JOB_EXECUTION_SERVICE_NAME, site, jobOvertimeLimit * DateTimeUtil.ONEHOUR, endTime);
data.put(SUMMARY_INFO_KEY, processResult(jobSummery, totalJobs));
- data.put(FAILED_JOB_USERS_KEY, buildJobSummery(failedJobQuery, startTime, endTime, jobSummery.get(Constants.JobState.FAILED.toString())));
- data.put(SUCCEEDED_JOB_USERS_KEY, buildJobSummery(succeededJobQuery, startTime, endTime, jobSummery.get(Constants.JobState.SUCCEEDED.toString())));
+
+ if (jobSummery.containsKey(Constants.JobState.FAILED.toString())) {
+ String failedJobQuery = String.format(FAILED_JOBS_QUERY, Constants.MR_JOB_EXECUTION_SERVICE_NAME, site, endTime);
+ data.put(FAILED_JOB_USERS_KEY, buildJobSummery(failedJobQuery, startTime, endTime, jobSummery.get(Constants.JobState.FAILED.toString())));
+ }
+ if (jobSummery.containsKey(Constants.JobState.SUCCEEDED.toString())) {
+ String succeededJobQuery = String.format(SUCCEEDED_JOB_QUERY, Constants.MR_JOB_EXECUTION_SERVICE_NAME, site, jobOvertimeLimit * DateTimeUtil.ONEHOUR, endTime);
+ data.put(SUCCEEDED_JOB_USERS_KEY, buildJobSummery(succeededJobQuery, startTime, endTime, jobSummery.get(Constants.JobState.SUCCEEDED.toString())));
+ }
+
+ String finishedJobQuery = String.format(FINISHED_JOB_QUERY, Constants.MR_JOB_EXECUTION_SERVICE_NAME, site, endTime);
data.put(FINISHED_JOB_USERS_KEY, buildJobSummery(finishedJobQuery, startTime, endTime, totalJobs));
return data;