blob: 80c32772d0a0b9e13da84fa3505de4ca80e11f9f [file] [log] [blame]
#!/usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import urllib2
import json
import os
# Role type -> {metric key: JMX bean name prefix}; loaded from the JSON
# role-config file by GetJsonConfig().
CurRoleTypeInfo= {}
# One dict per configured daemon ({host, port, RoleInstance, RoleType,
# OutputFlag}); filled by CfgCallback(), or by the __main__ stub below.
HadoopConfigs= []
# Default verbose-logging flag for a config entry when no 'Verbose' key is set.
LogSwitch= False
# Path to the role-config JSON file; set via the 'JsonPath' config key.
JsonPath = None
def CfgCallback(conf):
    """collectd config callback.

    Parses one <Module> block: exactly one *...Host key (which also fixes
    the role type), plus 'Port', 'Instance', optional 'Verbose' and
    'JsonPath'.  A complete entry is appended to HadoopConfigs; an
    incomplete one is reported via collectd.error and dropped.
    """
    global JsonPath
    # Config keys that carry a host name, mapped to the role type they imply.
    # Replaces seven identical elif branches with one table lookup.
    HostKeyToRoleType = {
        'HDFSNamenodeConfigHost': "ROLE_TYPE_NAMENODE",
        'HDFSDatanodeConfigHost': "ROLE_TYPE_DATANODE",
        'YarnNodeManagerHost': "ROLE_TYPE_NODEMANAGER",
        'YarnResourceManagerHost': "ROLE_TYPE_RESOURCEMANAGER",
        'HbaseMasterHost': "ROLE_TYPE_HBASE_MASTER",
        'HbaseRegionserverHost': "ROLE_TYPE_HBASE_REGIONSERVER",
        'HDFSJournalEachRoleConfigHost': "ROLE_TYPE_HDFS_JOURNALNODE",
    }
    CurRoleType = None
    Role = None
    Port = None
    Host = None
    OutputFlag = LogSwitch
    for EachRoleConfig in conf.children:
        collectd.info('hadoop pluginx : %s' % EachRoleConfig.key)
        if EachRoleConfig.key in HostKeyToRoleType:
            Host = EachRoleConfig.values[0]
            CurRoleType = HostKeyToRoleType[EachRoleConfig.key]
        elif EachRoleConfig.key == 'Port':
            Port = EachRoleConfig.values[0]
        elif EachRoleConfig.key == 'Instance':
            Role = EachRoleConfig.values[0]
        elif EachRoleConfig.key == 'Verbose':
            OutputFlag = bool(EachRoleConfig.values[0])
        elif EachRoleConfig.key == 'JsonPath':
            collectd.info('hadoop plugin cfg: %s' % EachRoleConfig.values[0])
            JsonPath = EachRoleConfig.values[0]
        else:
            collectd.warning('hadoop plugin: Unsupported key: %s.' % EachRoleConfig.key)
    if not Host or not Role or not CurRoleType or not Port:
        # Fixed message: the condition also rejects a missing 'Instance'
        # (Role), and the old text had a stray '*' before Host.
        collectd.error('hadoop plugin error: Host, Port, Instance and RoleType should not be empty.')
    else:
        CurrentConfigMap = {
            'RoleInstance': Role,
            'port': Port,
            'host': Host,
            'RoleType': CurRoleType,
            'OutputFlag': OutputFlag
        }
        HadoopConfigs.append(CurrentConfigMap)
def GetdataCallback():
    """collectd read callback.

    (Re)loads the role-config JSON, then for every configured daemon
    fetches http://host:port/jmx and dispatches, as gauges, every int or
    float attribute of the beans whose name matches the role's configured
    bean-name prefixes.
    """
    GetJsonConfig()
    for EachConfig in HadoopConfigs:
        Host = EachConfig['host']
        Port = EachConfig['port']
        RoleInstance = EachConfig['RoleInstance']
        RoleType = EachConfig['RoleType']
        OutputFlag = EachConfig['OutputFlag']
        if not isinstance(Port, int):
            MyLog("Host Port is not number error", True)
        JmxUrl = "http://" + Host + ":" + str(Port) + "/jmx"
        try:
            Contents = json.load(urllib2.urlopen(JmxUrl, timeout=5))
        except urllib2.URLError as e:
            if MyDebug == 1:
                print(JmxUrl,e)
            else:
                collectd.error('hadoop plugin: can not connect to %s - %r' % (JmxUrl, e))
            # BUG FIX: skip this host on failure.  Previously the loop fell
            # through, so Contents was unbound (NameError on the first
            # iteration) or stale from the previous host (its metrics were
            # re-reported under this host's role/instance).
            continue
        if MyDebug == 1:
            print(RoleType)
        else:
            collectd.info('hadoop pluginx [testheju]: %s' % RoleType)
        for RoleInfo in Contents["beans"]:
            for RoleKey, RoleValue in CurRoleTypeInfo[RoleType].iteritems():
                if RoleInfo['name'].startswith(RoleValue):
                    for k, v in RoleInfo.iteritems():
                        # Due to the limit of the dispatch interface in
                        # collectd, only int and float values can be sent.
                        if isinstance(v, int) or isinstance(v, float):
                            # 'gauge' is a value type defined in collectd
                            Submit2Collectd('gauge', '.'.join((RoleKey, k)), v, RoleInstance, RoleType, OutputFlag)
def Submit2Collectd(type, name, value, instance, instance_type, OutputFlag):
    """Hand one metric to collectd, or just log it when running standalone.

    None values cannot be dispatched, so they are warned about and dropped.
    """
    if value is None:
        collectd.warning('hadoop pluginx : value is None of the key %s' % name)
        return
    plugin_instance = '.'.join((instance, instance_type))
    MyLog('%s [%s]: %s=%s' % (plugin_instance, type, name, value), OutputFlag)
    if MyDebug != 0:
        # Standalone debug run: nothing to dispatch to.
        return
    SendValue = collectd.Values(plugin='hadoop')
    SendValue.type = type
    SendValue.type_instance = name
    SendValue.values = [value]
    SendValue.plugin_instance = plugin_instance
    SendValue.meta = {'0': True}
    SendValue.dispatch()
def MyLog(msg, OutputFlag):
    """Log *msg* only when OutputFlag is truthy: to stdout when running
    standalone (MyDebug == 1), otherwise through collectd's info log.
    """
    if not OutputFlag:
        return
    if MyDebug == 1:
        print(msg)
    else:
        collectd.info('hadoop pluginx output: %s' % msg)
def GetJsonConfig():
    """Populate CurRoleTypeInfo from the role-config JSON file.

    Uses the configured JsonPath when set, otherwise falls back to
    ./hadooproleconfig.json in the current working directory.  Only
    top-level keys starting with "ROLE_TYPE" are kept; each maps a role
    to its {metric key: JMX bean name prefix} table.
    """
    global JsonPath
    MyLog("pwd:%s" % os.getcwd(), True)
    # Pick the file once, then read it once -- the two original branches
    # duplicated the load/filter logic verbatim.
    if JsonPath is None or JsonPath == "":
        ConfigFile = "./hadooproleconfig.json"
    else:
        ConfigFile = JsonPath
    with open(ConfigFile, 'r') as f:
        data = json.load(f)
    for k, v in data.iteritems():
        if k.startswith("ROLE_TYPE"):
            CurRoleTypeInfo[k] = v
if __name__ == "__main__":
    # Standalone debug mode: no collectd available, so metrics are printed
    # instead of dispatched (see MyLog / Submit2Collectd).
    MyDebug = 1
    # You can config you Role configuration like example below
    ConfigMap = {
        'RoleInstance': "instance",
        'port': 8042,
        'host': "lujian",
        'RoleType': "ROLE_TYPE_NODEMANAGER",
        'OutputFlag': bool(True)
    }
    HadoopConfigs.append(ConfigMap)
    #print HadoopConfigs
    GetdataCallback()
else:
    # Imported by collectd's python plugin: register the config and read
    # callbacks.  'collectd' only exists inside the collectd process, hence
    # the import here rather than at the top of the file.
    import collectd
    MyDebug = 0
    collectd.register_config(CfgCallback)
    collectd.register_read(GetdataCallback)