# blob: ebfe9730e84a1d414a3a0e590524715ca77d1716 [file] [log] [blame]
#!/usr/bin/env ambari-python-wrap
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
# Python imports
import imp
import re
import os
import sys
import socket
import traceback
from math import ceil, floor, log
from resource_management.core.logger import Logger
# Locate the stack-level service_advisor.py that provides the ServiceAdvisor
# base class extended by the classes below.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
STACKS_DIR = os.path.join(SCRIPT_DIR, '../../../stacks/')
PARENT_FILE = os.path.join(STACKS_DIR, 'service_advisor.py')

# Optional override of the base advisor location via environment variable.
if "BASE_SERVICE_ADVISOR" in os.environ:
  PARENT_FILE = os.environ["BASE_SERVICE_ADVISOR"]

try:
  with open(PARENT_FILE, 'rb') as fp:
    service_advisor = imp.load_module('service_advisor', fp, PARENT_FILE, ('.py', 'rb', imp.PY_SOURCE))
except Exception as e:
  traceback.print_exc()
  # Use the function-call form: the bare print statement was a SyntaxError
  # under Python 3; with a single argument this is identical under Python 2.
  print("Failed to load parent")
class AMBARI_METRICSServiceAdvisor(service_advisor.ServiceAdvisor):
  """Service advisor for AMBARI_METRICS: layout rules, heap sizing, and
  the recommendation/validation entry points."""

  def __init__(self, *args, **kwargs):
    self.as_super = super(AMBARI_METRICSServiceAdvisor, self)
    self.as_super.__init__(*args, **kwargs)

    # Always call these methods
    self.modifyMastersWithMultipleInstances()
    self.modifyCardinalitiesDict()
    self.modifyHeapSizeProperties()
    self.modifyNotValuableComponents()
    self.modifyComponentsNotPreferableOnServer()
    self.modifyComponentLayoutSchemes()

  def modifyMastersWithMultipleInstances(self):
    """Adjust the set of masters allowed to have multiple instances; AMS adds none."""
    pass

  def modifyCardinalitiesDict(self):
    """Adjust component cardinalities: at least one METRICS_COLLECTOR is required."""
    self.cardinalitiesDict.update({'METRICS_COLLECTOR': {"min": 1}})

  def modifyHeapSizeProperties(self):
    """Declare which config properties hold the heap sizes of AMS components."""
    collector_heap_configs = [
      {"config-name": "ams-hbase-env",
       "property": "hbase_master_heapsize",
       "default": "1024m"},
      {"config-name": "ams-hbase-env",
       "property": "hbase_regionserver_heapsize",
       "default": "1024m"},
      {"config-name": "ams-env",
       "property": "metrics_collector_heapsize",
       "default": "512m"},
    ]
    self.heap_size_properties = {"METRICS_COLLECTOR": collector_heap_configs}

  def modifyNotValuableComponents(self):
    """Mark components whose host assignment just follows other services."""
    self.notValuableComponents = self.notValuableComponents | set(['METRICS_MONITOR'])

  def modifyComponentsNotPreferableOnServer(self):
    """Mark components that should avoid the Ambari server host."""
    self.notPreferableOnServerComponents = self.notPreferableOnServerComponents | set(['METRICS_COLLECTOR'])

  def modifyComponentLayoutSchemes(self):
    """Map cluster sizes to the preferred host index for METRICS_COLLECTOR.

    Keys are host-count thresholds; "else" covers everything larger.
    """
    self.componentLayoutSchemes.update({'METRICS_COLLECTOR': {3: 2, 6: 2, 31: 3, "else": 5}})

  def getServiceComponentLayoutValidations(self, services, hosts):
    """Return component-layout (cardinality) validation errors for AMBARI_METRICS."""
    return self.getServiceComponentCardinalityValidations(services, hosts, "AMBARI_METRICS")

  def getAmsMemoryRecommendation(self, services, hosts):
    """Estimate heap sizes from the number of metric sinks in the cluster.

    Returns a (collector_heapsize, hbase_heapsize, total_sinks_count) tuple;
    heap sizes are in MB, rounded to a multiple of 128.
    """
    # MB contributed to the HBase heap per sink of each component type.
    HEAP_PER_MASTER_COMPONENT = 50
    HEAP_PER_SLAVE_COMPONENT = 10

    sink_heap_weights = {
      "HDFS": {
        "NAMENODE": HEAP_PER_MASTER_COMPONENT,
        "SECONDARY_NAMENODE": HEAP_PER_MASTER_COMPONENT,
        "DATANODE": HEAP_PER_SLAVE_COMPONENT,
      },
      "YARN": {
        "RESOURCEMANAGER": HEAP_PER_MASTER_COMPONENT,
        "NODEMANAGER": HEAP_PER_SLAVE_COMPONENT,
        "HISTORYSERVER": HEAP_PER_MASTER_COMPONENT,
        "APP_TIMELINE_SERVER": HEAP_PER_MASTER_COMPONENT,
      },
      "HBASE": {
        "HBASE_MASTER": HEAP_PER_MASTER_COMPONENT,
        "HBASE_REGIONSERVER": HEAP_PER_SLAVE_COMPONENT,
      },
      "HIVE": {
        "HIVE_METASTORE": HEAP_PER_MASTER_COMPONENT,
        "HIVE_SERVER": HEAP_PER_MASTER_COMPONENT,
      },
      "KAFKA": {
        "KAFKA_BROKER": HEAP_PER_MASTER_COMPONENT,
      },
      "FLUME": {
        "FLUME_HANDLER": HEAP_PER_SLAVE_COMPONENT,
      },
      "STORM": {
        "NIMBUS": HEAP_PER_MASTER_COMPONENT,
      },
      "AMBARI_METRICS": {
        "METRICS_COLLECTOR": HEAP_PER_MASTER_COMPONENT,
        "METRICS_MONITOR": HEAP_PER_SLAVE_COMPONENT,
      },
      "ACCUMULO": {
        "ACCUMULO_MASTER": HEAP_PER_MASTER_COMPONENT,
        "ACCUMULO_TSERVER": HEAP_PER_SLAVE_COMPONENT,
      },
      "LOGSEARCH": {
        "LOGSEARCH_LOGFEEDER": HEAP_PER_SLAVE_COMPONENT,
      },
    }

    total_sinks_count = 0
    hbase_heapsize = 500  # minimum heap size (MB)
    for service_name, component_weights in sink_heap_weights.items():
      for component_name, weight in component_weights.items():
        host_count = len(
          self.getHostsWithComponent(service_name, component_name, services,
                                     hosts))
        hbase_heapsize += int(host_count * weight)
        total_sinks_count += host_count

    # Collector heap tracks the HBase heap on large clusters, else 512 MB.
    collector_heapsize = int(hbase_heapsize / 3) if hbase_heapsize > 2048 else 512
    hbase_heapsize = min(hbase_heapsize, 32768)  # cap at 32 GB
    return self.round_to_n(collector_heapsize), self.round_to_n(hbase_heapsize), total_sinks_count

  def round_to_n(self, mem_size, n=128):
    """Round mem_size to the nearest multiple of n (default 128)."""
    step = float(n)
    return int(round(float(mem_size) / step)) * int(n)

  def getServiceConfigurationRecommendations(self, configurations, clusterData, services, hosts):
    """Entry point: delegate AMS configuration recommendations to the recommender."""
    recommender = AMBARI_METRICSRecommender()
    recommender.recommendAmsConfigurationsFromHDP206(configurations, clusterData, services, hosts)

  def getServiceConfigurationsValidationItems(self, configurations, recommendedDefaults, services, hosts):
    """Entry point: run every registered AMS validator and return the errors.

    Each registered validator is invoked as
    method(siteProperties, siteRecommendations, configurations, services, hosts).
    """
    validator = self.getAMBARI_METRICSValidator()
    return validator.validateListOfConfigUsingMethod(configurations, recommendedDefaults, services, hosts, validator.validators)

  def getAMBARI_METRICSValidator(self):
    """Factory for the AMS validator instance."""
    return AMBARI_METRICSValidator()
class AMBARI_METRICSRecommender(service_advisor.ServiceAdvisor):
  """
  AMS Recommender suggests properties when adding the service for the first time or modifying configs via the UI.
  """

  def __init__(self, *args, **kwargs):
    self.as_super = super(AMBARI_METRICSRecommender, self)
    self.as_super.__init__(*args, **kwargs)

  def getPreferredMountPoints(self, hostInfo):
    """
    Return mount points from hostInfo["disk_info"] sorted by available space
    (largest first), filtering out undesirable locations; "/" is always
    appended last as a fallback.
    """
    # '/etc/resolv.conf', '/etc/hostname', '/etc/hosts' are docker specific mount points
    undesirableMountPoints = ["/", "/home", "/etc/resolv.conf", "/etc/hosts",
                              "/etc/hostname", "/tmp"]
    undesirableFsTypes = ["devtmpfs", "tmpfs", "vboxsf", "CDFS"]
    mountPoints = []
    if hostInfo and "disk_info" in hostInfo:
      mountPointsDict = {}
      for mountpoint in hostInfo["disk_info"]:
        # Keep a mount point only if it is not blacklisted, not under
        # /boot or /mnt, not a pseudo/removable filesystem, and has free space.
        if not (mountpoint["mountpoint"] in undesirableMountPoints or
                mountpoint["mountpoint"].startswith(("/boot", "/mnt")) or
                mountpoint["type"] in undesirableFsTypes or
                mountpoint["available"] == str(0)):
          mountPointsDict[mountpoint["mountpoint"]] = self.to_number(mountpoint["available"])
      if mountPointsDict:
        # Biggest available space first.
        mountPoints = sorted(mountPointsDict, key=mountPointsDict.get, reverse=True)
    mountPoints.append("/")
    return mountPoints

  def recommendAmsConfigurationsFromHDP206(self, configurations, clusterData, services, hosts):
    """
    Populate recommended values for ams-site, ams-env, ams-hbase-site,
    ams-hbase-env and ams-grafana-env based on cluster topology: operating
    mode (embedded vs distributed), data directories, and heap sizes derived
    from the number of metric sinks.
    """
    putAmsEnvProperty = self.putProperty(configurations, "ams-env", services)
    putAmsHbaseSiteProperty = self.putProperty(configurations, "ams-hbase-site", services)
    putAmsSiteProperty = self.putProperty(configurations, "ams-site", services)
    putHbaseEnvProperty = self.putProperty(configurations, "ams-hbase-env", services)
    # NOTE(review): putGrafanaProperty is never called in this method.
    putGrafanaProperty = self.putProperty(configurations, "ams-grafana-env", services)
    putGrafanaPropertyAttribute = self.putPropertyAttribute(configurations, "ams-grafana-env")

    amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR")

    serviceAdvisor = AMBARI_METRICSServiceAdvisor()

    # TODO set "timeline.metrics.service.webapp.address" to 0.0.0.0:port in upgrade catalog
    timeline_metrics_service_webapp_address = '0.0.0.0'

    putAmsSiteProperty("timeline.metrics.service.webapp.address", str(timeline_metrics_service_webapp_address) + ":6188")

    # Keep the HBase log dir aligned with the collector log dir, if configured.
    log_dir = "/var/log/ambari-metrics-collector"
    if "ams-env" in services["configurations"]:
      if "metrics_collector_log_dir" in services["configurations"]["ams-env"]["properties"]:
        log_dir = services["configurations"]["ams-env"]["properties"]["metrics_collector_log_dir"]
      putHbaseEnvProperty("hbase_log_dir", log_dir)

    defaultFs = 'file:///'
    if "core-site" in services["configurations"] and \
      "fs.defaultFS" in services["configurations"]["core-site"]["properties"]:
      defaultFs = services["configurations"]["core-site"]["properties"]["fs.defaultFS"]

    # Operating mode: embedded (single collector, local storage) or
    # distributed (HDFS-backed); more than one collector forces distributed.
    operatingMode = "embedded"
    if "ams-site" in services["configurations"]:
      if "timeline.metrics.service.operation.mode" in services["configurations"]["ams-site"]["properties"]:
        operatingMode = services["configurations"]["ams-site"]["properties"]["timeline.metrics.service.operation.mode"]

    if len(amsCollectorHosts) > 1:
      operatingMode = "distributed"
      putAmsSiteProperty("timeline.metrics.service.operation.mode", operatingMode)

    if operatingMode == "distributed":
      putAmsSiteProperty("timeline.metrics.service.watcher.disabled", 'true')
      putAmsHbaseSiteProperty("hbase.cluster.distributed", 'true')
      putAmsHbaseSiteProperty("hbase.unsafe.stream.capability.enforce", 'true')
    else:
      putAmsSiteProperty("timeline.metrics.service.watcher.disabled", 'false')
      putAmsHbaseSiteProperty("hbase.cluster.distributed", 'false')

    rootDir = "file:///var/lib/ambari-metrics-collector/hbase"
    tmpDir = "/var/lib/ambari-metrics-collector/hbase-tmp"
    zk_port_default = []
    if "ams-hbase-site" in services["configurations"]:
      if "hbase.rootdir" in services["configurations"]["ams-hbase-site"]["properties"]:
        rootDir = services["configurations"]["ams-hbase-site"]["properties"]["hbase.rootdir"]
      if "hbase.tmp.dir" in services["configurations"]["ams-hbase-site"]["properties"]:
        tmpDir = services["configurations"]["ams-hbase-site"]["properties"]["hbase.tmp.dir"]
      if "hbase.zookeeper.property.clientPort" in services["configurations"]["ams-hbase-site"]["properties"]:
        zk_port_default = services["configurations"]["ams-hbase-site"]["properties"]["hbase.zookeeper.property.clientPort"]

    # Skip recommendation item if default value is present
    if operatingMode == "distributed" and not "{{zookeeper_clientPort}}" in zk_port_default:
      zkPort = self.getZKPort(services)
      putAmsHbaseSiteProperty("hbase.zookeeper.property.clientPort", zkPort)
    elif operatingMode == "embedded" and not "{{zookeeper_clientPort}}" in zk_port_default:
      putAmsHbaseSiteProperty("hbase.zookeeper.property.clientPort", "61181")

    # Pick the collector host's preferred mount points (only the last
    # matching collector host's list survives this loop).
    mountpoints = ["/"]
    for collectorHostName in amsCollectorHosts:
      for host in hosts["items"]:
        if host["Hosts"]["host_name"] == collectorHostName:
          mountpoints = self.getPreferredMountPoints(host["Hosts"])
          break
    isLocalRootDir = rootDir.startswith("file://") or (defaultFs.startswith("file://") and rootDir.startswith("/"))
    if isLocalRootDir:
      # Strip the leading "file:///" (or first "/") then re-root the data
      # dir on the mount point with the most available space.
      rootDir = re.sub("^file:///|/", "", rootDir, count=1)
      rootDir = "file://" + os.path.join(mountpoints[0], rootDir)
    tmpDir = re.sub("^file:///|/", "", tmpDir, count=1)
    if len(mountpoints) > 1 and isLocalRootDir:
      # Keep tmp dir on a different partition than the root dir when possible.
      tmpDir = os.path.join(mountpoints[1], tmpDir)
    else:
      tmpDir = os.path.join(mountpoints[0], tmpDir)
    putAmsHbaseSiteProperty("hbase.tmp.dir", tmpDir)

    if operatingMode == "distributed":
      putAmsHbaseSiteProperty("hbase.rootdir", "/user/ams/hbase")

    if operatingMode == "embedded":
      if isLocalRootDir:
        putAmsHbaseSiteProperty("hbase.rootdir", rootDir)
      else:
        putAmsHbaseSiteProperty("hbase.rootdir", "file:///var/lib/ambari-metrics-collector/hbase")

    collector_heapsize, hbase_heapsize, total_sinks_count = serviceAdvisor.getAmsMemoryRecommendation(services, hosts)

    putAmsEnvProperty("metrics_collector_heapsize", collector_heapsize)

    # NOTE(review): log(total_sinks_count) raises ValueError when the count
    # is 0 — presumably at least the collector itself is always counted;
    # confirm before relying on this in a collector-less layout.
    putAmsSiteProperty("timeline.metrics.cache.size", max(100, int(log(total_sinks_count)) * 100))
    putAmsSiteProperty("timeline.metrics.cache.commit.interval", min(10, max(12 - int(log(total_sinks_count)), 2)))

    # blockCache = 0.3, memstore = 0.35, phoenix-server = 0.15, phoenix-client = 0.25
    putAmsHbaseSiteProperty("hfile.block.cache.size", 0.3)
    putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 134217728)
    putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.upperLimit", 0.35)
    putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.lowerLimit", 0.3)

    if len(amsCollectorHosts) > 1:
      # Multi-collector tuning is intentionally left to defaults.
      pass
    else:
      # blockCache = 0.3, memstore = 0.3, phoenix-server = 0.2, phoenix-client = 0.3
      if total_sinks_count >= 2000:
        putAmsHbaseSiteProperty("hbase.regionserver.handler.count", 60)
        putAmsHbaseSiteProperty("hbase.regionserver.hlog.blocksize", 134217728)
        putAmsHbaseSiteProperty("hbase.regionserver.maxlogs", 64)
        putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 268435456)
        putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.upperLimit", 0.3)
        putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.lowerLimit", 0.25)
        putAmsHbaseSiteProperty("phoenix.query.maxGlobalMemoryPercentage", 20)
        putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 81920000)
        putAmsSiteProperty("phoenix.query.maxGlobalMemoryPercentage", 30)
        putAmsSiteProperty("timeline.metrics.service.resultset.fetchSize", 10000)
      elif total_sinks_count >= 1000:
        putAmsHbaseSiteProperty("hbase.regionserver.handler.count", 60)
        putAmsHbaseSiteProperty("hbase.regionserver.hlog.blocksize", 134217728)
        putAmsHbaseSiteProperty("hbase.regionserver.maxlogs", 64)
        putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 268435456)
        putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 40960000)
        putAmsSiteProperty("timeline.metrics.service.resultset.fetchSize", 5000)
      else:
        putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 20480000)
      pass

    # API handler threads scale with sink count, clamped to [20, 50].
    metrics_api_handlers = min(50, max(20, int(total_sinks_count / 100)))
    putAmsSiteProperty("timeline.metrics.service.handler.thread.count", metrics_api_handlers)

    # NOTE(review): serviceAdvisor was already constructed above; this
    # re-instantiation is redundant but harmless.
    serviceAdvisor = AMBARI_METRICSServiceAdvisor()

    # Distributed mode heap size
    if operatingMode == "distributed":
      hbase_heapsize = max(hbase_heapsize, 1024)
      putHbaseEnvProperty("hbase_master_heapsize", "512")
      putHbaseEnvProperty("hbase_master_xmn_size", "102") #20% of 512 heap size
      putHbaseEnvProperty("hbase_regionserver_heapsize", hbase_heapsize)
      putHbaseEnvProperty("regionserver_xmn_size", serviceAdvisor.round_to_n(0.15 * hbase_heapsize,64))
    else:
      # Embedded mode heap size : master + regionserver
      hbase_rs_heapsize = 512
      putHbaseEnvProperty("hbase_regionserver_heapsize", hbase_rs_heapsize)
      putHbaseEnvProperty("hbase_master_heapsize", hbase_heapsize)
      putHbaseEnvProperty("hbase_master_xmn_size", serviceAdvisor.round_to_n(0.15*(hbase_heapsize + hbase_rs_heapsize),64))

    # If no local DN in distributed mode
    if operatingMode == "distributed":
      dn_hosts = self.getComponentHostNames(services, "HDFS", "DATANODE")
      # call by Kerberos wizard sends only the service being affected
      # so it is possible for dn_hosts to be None but not amsCollectorHosts
      if dn_hosts and len(dn_hosts) > 0:
        if set(amsCollectorHosts).intersection(dn_hosts):
          collector_cohosted_with_dn = "true"
        else:
          collector_cohosted_with_dn = "false"
        putAmsHbaseSiteProperty("dfs.client.read.shortcircuit", collector_cohosted_with_dn)

    # NOTE(review): servicesList is never read after this assignment.
    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
    ams_hbase_site = None
    ams_hbase_env = None

    # Overriden properties form the UI
    if "ams-hbase-site" in services["configurations"]:
      ams_hbase_site = services["configurations"]["ams-hbase-site"]["properties"]
    if "ams-hbase-env" in services["configurations"]:
      ams_hbase_env = services["configurations"]["ams-hbase-env"]["properties"]

    # Recommendations
    if not ams_hbase_site:
      ams_hbase_site = configurations["ams-hbase-site"]["properties"]
    if not ams_hbase_env:
      ams_hbase_env = configurations["ams-hbase-env"]["properties"]

    # Hide the Grafana password field when Grafana is not deployed anywhere.
    component_grafana_exists = False
    for service in services['services']:
      if 'components' in service:
        for component in service['components']:
          if 'StackServiceComponents' in component:
            # If Grafana is installed the hostnames would indicate its location
            if 'METRICS_GRAFANA' in component['StackServiceComponents']['component_name'] and\
              len(component['StackServiceComponents']['hostnames']) != 0:
              component_grafana_exists = True
              # NOTE(review): break only exits the inner component loop;
              # the outer service loop keeps running.
              break
    pass

    if not component_grafana_exists:
      putGrafanaPropertyAttribute("metrics_grafana_password", "visible", "false")

    pass
class AMBARI_METRICSValidator(service_advisor.ServiceAdvisor):
  """
  AMS Validator checks the correctness of properties whenever the service is first added or the user attempts to
  change configs via the UI.
  """

  def __init__(self, *args, **kwargs):
    self.as_super = super(AMBARI_METRICSValidator, self)
    self.as_super.__init__(*args, **kwargs)

    # (config-type, validator-method) pairs consumed by
    # validateListOfConfigUsingMethod in the base ServiceAdvisor.
    self.validators = [("ams-hbase-site", self.validateAmsHbaseSiteConfigurationsFromHDP206),
                       ("ams-hbase-env", self.validateAmsHbaseEnvConfigurationsFromHDP206),
                       ("ams-site", self.validateAmsSiteConfigurationsFromHDP206),
                       ("ams-env", self.validateAmsEnvConfigurationsFromHDP206),
                       ("ams-grafana-env", self.validateGrafanaEnvConfigurationsFromHDP206)]

  def getPreferredMountPoints(self, hostInfo):
    """
    Return mount points from hostInfo["disk_info"] sorted by available space
    (largest first), filtering out undesirable locations; "/" is always
    appended last as a fallback.
    NOTE(review): duplicated in AMBARI_METRICSRecommender; consider sharing one helper.
    """
    # '/etc/resolv.conf', '/etc/hostname', '/etc/hosts' are docker specific mount points
    undesirableMountPoints = ["/", "/home", "/etc/resolv.conf", "/etc/hosts",
                              "/etc/hostname", "/tmp"]
    undesirableFsTypes = ["devtmpfs", "tmpfs", "vboxsf", "CDFS"]
    mountPoints = []
    if hostInfo and "disk_info" in hostInfo:
      mountPointsDict = {}
      for mountpoint in hostInfo["disk_info"]:
        # Keep a mount point only if it is not blacklisted, not under
        # /boot or /mnt, not a pseudo/removable filesystem, and has free space.
        if not (mountpoint["mountpoint"] in undesirableMountPoints or
                mountpoint["mountpoint"].startswith(("/boot", "/mnt")) or
                mountpoint["type"] in undesirableFsTypes or
                mountpoint["available"] == str(0)):
          mountPointsDict[mountpoint["mountpoint"]] = self.to_number(mountpoint["available"])
      if mountPointsDict:
        mountPoints = sorted(mountPointsDict, key=mountPointsDict.get, reverse=True)
    mountPoints.append("/")
    return mountPoints

  def validateAmsHbaseSiteConfigurationsFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts):
    """
    Validate ams-hbase-site: rootdir vs operating mode, distributed flag
    consistency, zookeeper client port, and disk placement on collector hosts.
    Returns the validation problems for the "ams-hbase-site" config type.
    """
    amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR")
    ams_site = self.getSiteProperties(configurations, "ams-site")
    core_site = self.getSiteProperties(configurations, "core-site")

    serviceAdvisor = AMBARI_METRICSServiceAdvisor()
    collector_heapsize, hbase_heapsize, total_sinks_count = serviceAdvisor.getAmsMemoryRecommendation(services, hosts)
    recommendedDiskSpace = 10485760
    # TODO validate configuration for multiple AMBARI_METRICS collectors
    if len(amsCollectorHosts) > 1:
      pass
    else:
      # Scale the required disk space with the number of metric sinks.
      if total_sinks_count > 2000:
        recommendedDiskSpace = 104857600  # * 1k == 100 Gb
      elif total_sinks_count > 500:
        recommendedDiskSpace = 52428800  # * 1k == 50 Gb
      elif total_sinks_count > 250:
        recommendedDiskSpace = 20971520  # * 1k == 20 Gb

    validationItems = []

    rootdir_item = None
    op_mode = ams_site.get("timeline.metrics.service.operation.mode")
    default_fs = core_site.get("fs.defaultFS") if core_site else "file:///"
    hbase_rootdir = properties.get("hbase.rootdir")
    hbase_tmpdir = properties.get("hbase.tmp.dir")
    distributed = properties.get("hbase.cluster.distributed")
    is_local_root_dir = hbase_rootdir.startswith("file://") or (default_fs.startswith("file://") and hbase_rootdir.startswith("/"))
    if op_mode == "distributed" and is_local_root_dir:
      rootdir_item = self.getWarnItem("In distributed mode hbase.rootdir should point to HDFS.")
    elif op_mode == "embedded":
      # NOTE(review): 'and' binds tighter than 'or', so this reads
      # (distributed == "false" and startswith('/')) or startswith("hdfs://").
      # Possibly intended distributed == "false" and (either prefix);
      # behavior kept as-is pending confirmation.
      if distributed.lower() == "false" and hbase_rootdir.startswith('/') or hbase_rootdir.startswith("hdfs://"):
        rootdir_item = self.getWarnItem("In embedded mode hbase.rootdir cannot point to schemaless values or HDFS, "
                                        "Example - file:// for localFS")
      pass

    distributed_item = None
    if op_mode == "distributed" and not distributed.lower() == "true":
      distributed_item = self.getErrorItem("hbase.cluster.distributed property should be set to true for "
                                           "distributed mode")
    if op_mode == "embedded" and distributed.lower() == "true":
      distributed_item = self.getErrorItem("hbase.cluster.distributed property should be set to false for embedded mode")

    hbase_zk_client_port = properties.get("hbase.zookeeper.property.clientPort")
    zkPort = self.getZKPort(services)
    hbase_zk_client_port_item = None
    # Distributed mode must reuse the cluster ZooKeeper port; embedded mode
    # must use a different one. The "{{zookeeper_clientPort}}" template value
    # is accepted in both modes.
    if distributed.lower() == "true" and op_mode == "distributed" and \
        hbase_zk_client_port != zkPort and hbase_zk_client_port != "{{zookeeper_clientPort}}":
      hbase_zk_client_port_item = self.getErrorItem("In AMS distributed mode, hbase.zookeeper.property.clientPort "
                                                    "should be the cluster zookeeper server port : {0}".format(zkPort))

    if distributed.lower() == "false" and op_mode == "embedded" and \
        hbase_zk_client_port == zkPort and hbase_zk_client_port != "{{zookeeper_clientPort}}":
      hbase_zk_client_port_item = self.getErrorItem("In AMS embedded mode, hbase.zookeeper.property.clientPort "
                                                    "should be a different port than cluster zookeeper port."
                                                    "(default:61181)")

    validationItems.extend([{"config-name":'hbase.rootdir', "item": rootdir_item },
                            {"config-name":'hbase.cluster.distributed', "item": distributed_item },
                            {"config-name":'hbase.zookeeper.property.clientPort', "item": hbase_zk_client_port_item }])

    for collectorHostName in amsCollectorHosts:
      for host in hosts["items"]:
        if host["Hosts"]["host_name"] == collectorHostName:
          if op_mode == 'embedded' or is_local_root_dir:
            validationItems.extend([{"config-name": 'hbase.rootdir', "item": self.validatorEnoughDiskSpace(properties, 'hbase.rootdir', host["Hosts"], recommendedDiskSpace)}])
            validationItems.extend([{"config-name": 'hbase.rootdir', "item": self.validatorNotRootFs(properties, recommendedDefaults, 'hbase.rootdir', host["Hosts"])}])
            validationItems.extend([{"config-name": 'hbase.tmp.dir', "item": self.validatorNotRootFs(properties, recommendedDefaults, 'hbase.tmp.dir', host["Hosts"])}])

          dn_hosts = self.getComponentHostNames(services, "HDFS", "DATANODE")
          if is_local_root_dir:
            mountPoints = []
            for mountPoint in host["Hosts"]["disk_info"]:
              mountPoints.append(mountPoint["mountpoint"])
            hbase_rootdir_mountpoint = self.getMountPointForDir(hbase_rootdir, mountPoints)
            hbase_tmpdir_mountpoint = self.getMountPointForDir(hbase_tmpdir, mountPoints)
            preferred_mountpoints = self.getPreferredMountPoints(host['Hosts'])
            # hbase.rootdir and hbase.tmp.dir shouldn't point to the same partition
            # if multiple preferred_mountpoints exist
            if hbase_rootdir_mountpoint == hbase_tmpdir_mountpoint and \
              len(preferred_mountpoints) > 1:
              item = self.getWarnItem("Consider not using {0} partition for storing metrics temporary data. "
                                      "{0} partition is already used as hbase.rootdir to store metrics data".format(hbase_tmpdir_mountpoint))
              validationItems.extend([{"config-name":'hbase.tmp.dir', "item": item}])

            # if METRICS_COLLECTOR is co-hosted with DATANODE
            # cross-check dfs.datanode.data.dir and hbase.rootdir
            # they shouldn't share same disk partition IO
            hdfs_site = self.getSiteProperties(configurations, "hdfs-site")
            dfs_datadirs = hdfs_site.get("dfs.datanode.data.dir").split(",") if hdfs_site and "dfs.datanode.data.dir" in hdfs_site else []
            if dn_hosts and collectorHostName in dn_hosts and ams_site and \
              dfs_datadirs and len(preferred_mountpoints) > len(dfs_datadirs):
              for dfs_datadir in dfs_datadirs:
                dfs_datadir_mountpoint = self.getMountPointForDir(dfs_datadir, mountPoints)
                if dfs_datadir_mountpoint == hbase_rootdir_mountpoint:
                  item = self.getWarnItem("Consider not using {0} partition for storing metrics data. "
                                          "{0} is already used by datanode to store HDFS data".format(hbase_rootdir_mountpoint))
                  validationItems.extend([{"config-name": 'hbase.rootdir', "item": item}])
                  break
          # If no local DN in distributed mode
          elif collectorHostName not in dn_hosts and distributed.lower() == "true":
            item = self.getWarnItem("It's recommended to install Datanode component on {0} "
                                    "to speed up IO operations between HDFS and Metrics "
                                    "Collector in distributed mode ".format(collectorHostName))
            validationItems.extend([{"config-name": "hbase.cluster.distributed", "item": item}])
          # Short circuit read should be enabled in distibuted mode
          # if local DN installed
          else:
            validationItems.extend([{"config-name": "dfs.client.read.shortcircuit", "item": self.validatorEqualsToRecommendedItem(properties, recommendedDefaults, "dfs.client.read.shortcircuit")}])

    return self.toConfigurationValidationProblems(validationItems, "ams-hbase-site")

  def validateAmsHbaseEnvConfigurationsFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts):
    """
    Validate ams-hbase-env: heap sizes against recommended defaults, Xmn
    ranges (12%-20% of the relevant heap), log dir consistency, and collector
    co-hosting with other master components on large clusters.
    Returns the validation problems for the "ams-hbase-env" config type.
    """
    ams_env = self.getSiteProperties(configurations, "ams-env")
    amsHbaseSite = self.getSiteProperties(configurations, "ams-hbase-site")
    validationItems = []
    mb = 1024 * 1024
    gb = 1024 * mb

    regionServerItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "hbase_regionserver_heapsize") ## FIXME if new service added
    if regionServerItem:
      validationItems.extend([{"config-name": "hbase_regionserver_heapsize", "item": regionServerItem}])

    hbaseMasterHeapsizeItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "hbase_master_heapsize")
    if hbaseMasterHeapsizeItem:
      validationItems.extend([{"config-name": "hbase_master_heapsize", "item": hbaseMasterHeapsizeItem}])

    logDirItem = self.validatorEqualsPropertyItem(properties, "hbase_log_dir", ams_env, "metrics_collector_log_dir")
    if logDirItem:
      validationItems.extend([{"config-name": "hbase_log_dir", "item": logDirItem}])

    hbase_master_heapsize = self.to_number(properties["hbase_master_heapsize"])
    hbase_master_xmn_size = self.to_number(properties["hbase_master_xmn_size"])
    hbase_regionserver_heapsize = self.to_number(properties["hbase_regionserver_heapsize"])
    hbase_regionserver_xmn_size = self.to_number(properties["regionserver_xmn_size"])

    # Validate Xmn settings.
    masterXmnItem = None
    regionServerXmnItem = None
    is_hbase_distributed = amsHbaseSite.get("hbase.cluster.distributed").lower() == 'true'

    if is_hbase_distributed:
      if not regionServerItem and hbase_regionserver_heapsize > 32768:
        regionServerItem = self.getWarnItem("Value is more than the recommended maximum heap size of 32G.")
        validationItems.extend([{"config-name": "hbase_regionserver_heapsize", "item": regionServerItem}])

      minMasterXmn = 0.12 * hbase_master_heapsize
      maxMasterXmn = 0.2 * hbase_master_heapsize
      if hbase_master_xmn_size < minMasterXmn:
        masterXmnItem = self.getWarnItem("Value is lesser than the recommended minimum Xmn size of {0} "
                                         "(12% of hbase_master_heapsize)".format(int(ceil(minMasterXmn))))

      if hbase_master_xmn_size > maxMasterXmn:
        masterXmnItem = self.getWarnItem("Value is greater than the recommended maximum Xmn size of {0} "
                                         "(20% of hbase_master_heapsize)".format(int(floor(maxMasterXmn))))

      minRegionServerXmn = 0.12 * hbase_regionserver_heapsize
      maxRegionServerXmn = 0.2 * hbase_regionserver_heapsize
      if hbase_regionserver_xmn_size < minRegionServerXmn:
        regionServerXmnItem = self.getWarnItem("Value is lesser than the recommended minimum Xmn size of {0} "
                                               "(12% of hbase_regionserver_heapsize)"
                                               .format(int(ceil(minRegionServerXmn))))

      if hbase_regionserver_xmn_size > maxRegionServerXmn:
        regionServerXmnItem = self.getWarnItem("Value is greater than the recommended maximum Xmn size of {0} "
                                               "(20% of hbase_regionserver_heapsize)"
                                               .format(int(floor(maxRegionServerXmn))))
    else:
      # Embedded mode: master and regionserver share one JVM budget.
      if not hbaseMasterHeapsizeItem and (hbase_master_heapsize + hbase_regionserver_heapsize) > 32768:
        hbaseMasterHeapsizeItem = self.getWarnItem("Value of Master + Regionserver heapsize is more than the recommended maximum heap size of 32G.")
        validationItems.extend([{"config-name": "hbase_master_heapsize", "item": hbaseMasterHeapsizeItem}])

      minMasterXmn = 0.12 * (hbase_master_heapsize + hbase_regionserver_heapsize)
      maxMasterXmn = 0.2 * (hbase_master_heapsize + hbase_regionserver_heapsize)
      if hbase_master_xmn_size < minMasterXmn:
        masterXmnItem = self.getWarnItem("Value is lesser than the recommended minimum Xmn size of {0} "
                                         "(12% of hbase_master_heapsize + hbase_regionserver_heapsize)"
                                         .format(int(ceil(minMasterXmn))))

      if hbase_master_xmn_size > maxMasterXmn:
        masterXmnItem = self.getWarnItem("Value is greater than the recommended maximum Xmn size of {0} "
                                         "(20% of hbase_master_heapsize + hbase_regionserver_heapsize)"
                                         .format(int(floor(maxMasterXmn))))
    if masterXmnItem:
      validationItems.extend([{"config-name": "hbase_master_xmn_size", "item": masterXmnItem}])

    if regionServerXmnItem:
      validationItems.extend([{"config-name": "regionserver_xmn_size", "item": regionServerXmnItem}])

    if hbaseMasterHeapsizeItem is None:
      # Warn when the collector shares a low-memory host with several other
      # master components on large clusters.
      hostMasterComponents = {}

      for service in services["services"]:
        for component in service["components"]:
          if component["StackServiceComponents"]["hostnames"] is not None:
            for hostName in component["StackServiceComponents"]["hostnames"]:
              if self.isMasterComponent(component):
                if hostName not in hostMasterComponents.keys():
                  hostMasterComponents[hostName] = []
                hostMasterComponents[hostName].append(component["StackServiceComponents"]["component_name"])

      amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR")
      for collectorHostName in amsCollectorHosts:
        for host in hosts["items"]:
          if host["Hosts"]["host_name"] == collectorHostName:
            # AMS Collector co-hosted with other master components in bigger clusters
            if len(hosts['items']) > 31 and \
                len(hostMasterComponents[collectorHostName]) > 2 and \
                host["Hosts"]["total_mem"] < 32*mb: # < 32Gb(total_mem in k)
              masterHostMessage = "Host {0} is used by multiple master components ({1}). " \
                                  "It is recommended to use a separate host for the " \
                                  "Ambari Metrics Collector component and ensure " \
                                  "the host has sufficient memory available."

              hbaseMasterHeapsizeItem = self.getWarnItem(masterHostMessage.format(
                collectorHostName, str(", ".join(hostMasterComponents[collectorHostName]))))
              if hbaseMasterHeapsizeItem:
                validationItems.extend([{"config-name": "hbase_master_heapsize", "item": hbaseMasterHeapsizeItem}])
      pass

    return self.toConfigurationValidationProblems(validationItems, "ams-hbase-env")

  def validateAmsSiteConfigurationsFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts):
    """
    Validate ams-site: the operation mode must be "embedded" or "distributed",
    must be "distributed" with multiple collectors, and embedded mode is
    warned against when the sink count is very high.
    Returns the validation problems for the "ams-site" config type.
    """
    validationItems = []

    serviceAdvisor = AMBARI_METRICSServiceAdvisor()

    op_mode = properties.get("timeline.metrics.service.operation.mode")
    correct_op_mode_item = None
    if op_mode not in ("embedded", "distributed"):
      correct_op_mode_item = self.getErrorItem("Correct value should be set.")
      pass
    elif len(self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR")) > 1 and op_mode != 'distributed':
      correct_op_mode_item = self.getErrorItem("Correct value should be 'distributed' for clusters with more then 1 Metrics collector")
    elif op_mode == 'embedded':
      collector_heapsize, hbase_heapsize, total_sinks_count = serviceAdvisor.getAmsMemoryRecommendation(services, hosts)
      if total_sinks_count > 1000:
        correct_op_mode_item = self.getWarnItem("Number of sinks writing metrics to collector is expected to be more than 1000. "
                                                "'Embedded' mode AMS might not be able to handle the load. Consider moving to distributed mode.")

    validationItems.extend([{"config-name":'timeline.metrics.service.operation.mode', "item": correct_op_mode_item }])
    return self.toConfigurationValidationProblems(validationItems, "ams-site")

  def validateAmsEnvConfigurationsFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts):
    """
    Validate ams-env: collector heap must not fall below the recommended
    default and should not exceed 32G.
    Returns the validation problems for the "ams-env" config type.
    """
    validationItems = []

    collectorHeapsizeDefaultItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "metrics_collector_heapsize")
    validationItems.extend([{"config-name": "metrics_collector_heapsize", "item": collectorHeapsizeDefaultItem}])

    ams_env = self.getSiteProperties(configurations, "ams-env")
    collector_heapsize = self.to_number(ams_env.get("metrics_collector_heapsize"))
    if collector_heapsize > 32768:
      collectorHeapsizeMaxItem = self.getWarnItem("Value is more than the recommended maximum heap size of 32G.")
      validationItems.extend([{"config-name": "metrics_collector_heapsize", "item": collectorHeapsizeMaxItem}])

    return self.toConfigurationValidationProblems(validationItems, "ams-env")

  def validateGrafanaEnvConfigurationsFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts):
    """
    Validate ams-grafana-env: the Grafana password must be at least 4 characters.
    Returns the validation problems for the "ams-grafana-env" config type.
    """
    validationItems = []

    # NOTE(review): len() raises TypeError if the property is absent —
    # presumably guaranteed present by the stack definition; confirm.
    grafana_pwd = properties.get("metrics_grafana_password")
    grafana_pwd_length_item = None
    if len(grafana_pwd) < 4:
      grafana_pwd_length_item = self.getErrorItem("Grafana password length should be at least 4.")
      pass
    validationItems.extend([{"config-name":'metrics_grafana_password', "item": grafana_pwd_length_item }])
    # BUG FIX: results were previously reported under config type "ams-site",
    # mis-attributing the password error; this validator is registered for
    # "ams-grafana-env" (see self.validators in __init__), so report there.
    return self.toConfigurationValidationProblems(validationItems, "ams-grafana-env")