| #!/usr/bin/env ambari-python-wrap |
| """ |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| """ |
| |
| import re |
| import os |
| import sys |
| from math import ceil, floor |
| |
| from stack_advisor import DefaultStackAdvisor |
| |
| |
| class HDP206StackAdvisor(DefaultStackAdvisor): |
| |
| def getComponentLayoutValidations(self, services, hosts): |
| """Returns array of Validation objects about issues with hostnames components assigned to""" |
| items = [] |
| |
| # Validating NAMENODE and SECONDARY_NAMENODE are on different hosts if possible |
| hostsList = [host["Hosts"]["host_name"] for host in hosts["items"]] |
| hostsCount = len(hostsList) |
| |
| componentsListList = [service["components"] for service in services["services"]] |
| componentsList = [item for sublist in componentsListList for item in sublist] |
| nameNodeHosts = [component["StackServiceComponents"]["hostnames"] for component in componentsList if component["StackServiceComponents"]["component_name"] == "NAMENODE"] |
| secondaryNameNodeHosts = [component["StackServiceComponents"]["hostnames"] for component in componentsList if component["StackServiceComponents"]["component_name"] == "SECONDARY_NAMENODE"] |
| |
| # Validating cardinality |
| for component in componentsList: |
| if component["StackServiceComponents"]["cardinality"] is not None: |
| componentName = component["StackServiceComponents"]["component_name"] |
| componentDisplayName = component["StackServiceComponents"]["display_name"] |
| componentHostsCount = 0 |
| if component["StackServiceComponents"]["hostnames"] is not None: |
| componentHostsCount = len(component["StackServiceComponents"]["hostnames"]) |
| cardinality = str(component["StackServiceComponents"]["cardinality"]) |
| # cardinality types: null, 1+, 1-2, 1, ALL |
| message = None |
| if "+" in cardinality: |
| hostsMin = int(cardinality[:-1]) |
| if componentHostsCount < hostsMin: |
| message = "At least {0} {1} components should be installed in cluster.".format(hostsMin, componentDisplayName) |
| elif "-" in cardinality: |
| nums = cardinality.split("-") |
| hostsMin = int(nums[0]) |
| hostsMax = int(nums[1]) |
| if componentHostsCount > hostsMax or componentHostsCount < hostsMin: |
| message = "Between {0} and {1} {2} components should be installed in cluster.".format(hostsMin, hostsMax, componentDisplayName) |
| elif "ALL" == cardinality: |
| if componentHostsCount != hostsCount: |
| message = "{0} component should be installed on all hosts in cluster.".format(componentDisplayName) |
| else: |
| if componentHostsCount != int(cardinality): |
| message = "Exactly {0} {1} components should be installed in cluster.".format(int(cardinality), componentDisplayName) |
| |
| if message is not None: |
| items.append({"type": 'host-component', "level": 'ERROR', "message": message, "component-name": componentName}) |
| |
| # Validating host-usage |
| usedHostsListList = [component["StackServiceComponents"]["hostnames"] for component in componentsList if not self.isComponentNotValuable(component)] |
| usedHostsList = [item for sublist in usedHostsListList for item in sublist] |
| nonUsedHostsList = [item for item in hostsList if item not in usedHostsList] |
| for host in nonUsedHostsList: |
| items.append( { "type": 'host-component', "level": 'ERROR', "message": 'Host is not used', "host": str(host) } ) |
| |
| return items |
| |
| def getServiceConfigurationRecommenderDict(self): |
| return { |
| "YARN": self.recommendYARNConfigurations, |
| "MAPREDUCE2": self.recommendMapReduce2Configurations, |
| "HDFS": self.recommendHDFSConfigurations, |
| "HBASE": self.recommendHbaseConfigurations, |
| "STORM": self.recommendStormConfigurations, |
| "AMBARI_METRICS": self.recommendAmsConfigurations, |
| "RANGER": self.recommendRangerConfigurations |
| } |
| |
| def putProperty(self, config, configType, services=None): |
| userConfigs = {} |
| changedConfigs = [] |
| # if services parameter, prefer values, set by user |
| if services: |
| if 'configurations' in services.keys(): |
| userConfigs = services['configurations'] |
| if 'changed-configurations' in services.keys(): |
| changedConfigs = services["changed-configurations"] |
| |
| if configType not in config: |
| config[configType] = {} |
| if"properties" not in config[configType]: |
| config[configType]["properties"] = {} |
| def appendProperty(key, value): |
| # If property exists in changedConfigs, do not override, use user defined property |
| if self.__isPropertyInChangedConfigs(configType, key, changedConfigs): |
| config[configType]["properties"][key] = userConfigs[configType]['properties'][key] |
| else: |
| config[configType]["properties"][key] = str(value) |
| return appendProperty |
| |
| def __isPropertyInChangedConfigs(self, configType, propertyName, changedConfigs): |
| for changedConfig in changedConfigs: |
| if changedConfig['type']==configType and changedConfig['name']==propertyName: |
| return True |
| return False |
| |
| def putPropertyAttribute(self, config, configType): |
| if configType not in config: |
| config[configType] = {} |
| def appendPropertyAttribute(key, attribute, attributeValue): |
| if "property_attributes" not in config[configType]: |
| config[configType]["property_attributes"] = {} |
| if key not in config[configType]["property_attributes"]: |
| config[configType]["property_attributes"][key] = {} |
| config[configType]["property_attributes"][key][attribute] = attributeValue if isinstance(attributeValue, list) else str(attributeValue) |
| return appendPropertyAttribute |
| |
| def recommendYARNConfigurations(self, configurations, clusterData, services, hosts): |
| putYarnProperty = self.putProperty(configurations, "yarn-site", services) |
| putYarnEnvProperty = self.putProperty(configurations, "yarn-env", services) |
| nodemanagerMinRam = 1048576 # 1TB in mb |
| if "referenceNodeManagerHost" in clusterData: |
| nodemanagerMinRam = min(clusterData["referenceNodeManagerHost"]["total_mem"]/1024, nodemanagerMinRam) |
| putYarnProperty('yarn.nodemanager.resource.memory-mb', int(round(min(clusterData['containers'] * clusterData['ramPerContainer'], nodemanagerMinRam)))) |
| putYarnProperty('yarn.scheduler.minimum-allocation-mb', int(clusterData['ramPerContainer'])) |
| putYarnProperty('yarn.scheduler.maximum-allocation-mb', int(configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.memory-mb"])) |
| putYarnEnvProperty('min_user_id', self.get_system_min_uid()) |
| containerExecutorGroup = 'hadoop' |
| if 'cluster-env' in services['configurations'] and 'user_group' in services['configurations']['cluster-env']['properties']: |
| containerExecutorGroup = services['configurations']['cluster-env']['properties']['user_group'] |
| putYarnProperty("yarn.nodemanager.linux-container-executor.group", containerExecutorGroup) |
| |
| def recommendMapReduce2Configurations(self, configurations, clusterData, services, hosts): |
| putMapredProperty = self.putProperty(configurations, "mapred-site", services) |
| putMapredProperty('yarn.app.mapreduce.am.resource.mb', int(clusterData['amMemory'])) |
| putMapredProperty('yarn.app.mapreduce.am.command-opts', "-Xmx" + str(int(round(0.8 * clusterData['amMemory']))) + "m") |
| putMapredProperty('mapreduce.map.memory.mb', clusterData['mapMemory']) |
| putMapredProperty('mapreduce.reduce.memory.mb', int(clusterData['reduceMemory'])) |
| putMapredProperty('mapreduce.map.java.opts', "-Xmx" + str(int(round(0.8 * clusterData['mapMemory']))) + "m") |
| putMapredProperty('mapreduce.reduce.java.opts', "-Xmx" + str(int(round(0.8 * clusterData['reduceMemory']))) + "m") |
| putMapredProperty('mapreduce.task.io.sort.mb', min(int(round(0.4 * clusterData['mapMemory'])), 1024)) |
| |
  def recommendHadoopProxyUsers (self, configurations, services, hosts):
    """Recommend hadoop.proxyuser.*.hosts / hadoop.proxyuser.*.groups entries
    in core-site for the service users of installed services (HDFS, Oozie,
    Hive/WebHCat, Falcon).

    If a service user was renamed, the proxyuser entries for the old name are
    marked for deletion and both old and new entries are appended to
    services["forced-configurations"] so they are applied.

    :param configurations: dict the recommendations are written into
    :param services: request payload with "services", "configurations" and
                     (optionally) "changed-configurations" / "forced-configurations"
    :param hosts: host information, used to locate Oozie/Hive/WebHCat server hosts
    """
    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
    # user name -> {"propertyHosts", "propertyGroups", "config", "propertyName"}
    users = {}

    if 'forced-configurations' not in services:
      services["forced-configurations"] = []

    if "HDFS" in servicesList:
      hdfs_user = None
      if "hadoop-env" in services["configurations"] and "hdfs_user" in services["configurations"]["hadoop-env"]["properties"]:
        hdfs_user = services["configurations"]["hadoop-env"]["properties"]["hdfs_user"]
        # The HDFS user may proxy from any host for any group.
        if not hdfs_user in users and hdfs_user is not None:
          users[hdfs_user] = {"propertyHosts" : "*","propertyGroups" : "*", "config" : "hadoop-env", "propertyName" : "hdfs_user"}

    if "OOZIE" in servicesList:
      oozie_user = None
      if "oozie-env" in services["configurations"] and "oozie_user" in services["configurations"]["oozie-env"]["properties"]:
        oozie_user = services["configurations"]["oozie-env"]["properties"]["oozie_user"]
        # The Oozie user may only proxy from the Oozie Server host(s).
        oozieServerrHosts = self.getHostsWithComponent("OOZIE", "OOZIE_SERVER", services, hosts)
        if oozieServerrHosts is not None:
          oozieServerHostsNameList = []
          for oozieServerHost in oozieServerrHosts:
            oozieServerHostsNameList.append(oozieServerHost["Hosts"]["public_host_name"])
          oozieServerHostsNames = ",".join(oozieServerHostsNameList)
          if not oozie_user in users and oozie_user is not None:
            users[oozie_user] = {"propertyHosts" : oozieServerHostsNames,"propertyGroups" : "*", "config" : "oozie-env", "propertyName" : "oozie_user"}

    if "HIVE" in servicesList:
      hive_user = None
      webhcat_user = None
      if "hive-env" in services["configurations"] and "hive_user" in services["configurations"]["hive-env"]["properties"] \
              and "webhcat_user" in services["configurations"]["hive-env"]["properties"]:
        hive_user = services["configurations"]["hive-env"]["properties"]["hive_user"]
        webhcat_user = services["configurations"]["hive-env"]["properties"]["webhcat_user"]
        # Hive and WebHCat users may only proxy from their respective server hosts.
        hiveServerHosts = self.getHostsWithComponent("HIVE", "HIVE_SERVER", services, hosts)
        webHcatServerHosts = self.getHostsWithComponent("HIVE", "WEBHCAT_SERVER", services, hosts)

        if hiveServerHosts is not None:
          hiveServerHostsNameList = []
          for hiveServerHost in hiveServerHosts:
            hiveServerHostsNameList.append(hiveServerHost["Hosts"]["public_host_name"])
          hiveServerHostsNames = ",".join(hiveServerHostsNameList)
          if not hive_user in users and hive_user is not None:
            users[hive_user] = {"propertyHosts" : hiveServerHostsNames,"propertyGroups" : "*", "config" : "hive-env", "propertyName" : "hive_user"}

        if webHcatServerHosts is not None:
          webHcatServerHostsNameList = []
          for webHcatServerHost in webHcatServerHosts:
            webHcatServerHostsNameList.append(webHcatServerHost["Hosts"]["public_host_name"])
          webHcatServerHostsNames = ",".join(webHcatServerHostsNameList)
          if not webhcat_user in users and webhcat_user is not None:
            users[webhcat_user] = {"propertyHosts" : webHcatServerHostsNames,"propertyGroups" : "*", "config" : "hive-env", "propertyName" : "webhcat_user"}

    if "FALCON" in servicesList:
      falconUser = None
      if "falcon-env" in services["configurations"] and "falcon_user" in services["configurations"]["falcon-env"]["properties"]:
        falconUser = services["configurations"]["falcon-env"]["properties"]["falcon_user"]
        # The Falcon user may proxy from any host for any group.
        if not falconUser in users and falconUser is not None:
          users[falconUser] = {"propertyHosts" : "*","propertyGroups" : "*", "config" : "falcon-env", "propertyName" : "falcon_user"}

    putCoreSiteProperty = self.putProperty(configurations, "core-site", services)
    putCoreSitePropertyAttribute = self.putPropertyAttribute(configurations, "core-site")

    for user_name, user_properties in users.iteritems():
      # Add properties "hadoop.proxyuser.*.hosts", "hadoop.proxyuser.*.groups" to core-site for all users
      putCoreSiteProperty("hadoop.proxyuser.{0}.hosts".format(user_name) , user_properties["propertyHosts"])
      putCoreSiteProperty("hadoop.proxyuser.{0}.groups".format(user_name) , user_properties["propertyGroups"])

      # Remove old properties if user was renamed
      # getOldValue is a module-level helper defined elsewhere in this file;
      # presumably it returns the property's previous value from
      # changed-configurations, or None -- verify against its definition.
      userOldValue = getOldValue(self, services, user_properties["config"], user_properties["propertyName"])
      if userOldValue is not None and userOldValue != user_name:
        putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.hosts".format(userOldValue), 'delete', 'true')
        putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.groups".format(userOldValue), 'delete', 'true')
        # Force both the deleted (old-name) and the newly added (new-name)
        # entries so the change is applied.
        services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.hosts".format(userOldValue)})
        services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.groups".format(userOldValue)})
        services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.hosts".format(user_name)})
        services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.groups".format(user_name)})
| |
| def recommendHDFSConfigurations(self, configurations, clusterData, services, hosts): |
| putHDFSProperty = self.putProperty(configurations, "hadoop-env", services) |
| putHDFSSiteProperty = self.putProperty(configurations, "hdfs-site", services) |
| putHDFSSitePropertyAttributes = self.putPropertyAttribute(configurations, "hdfs-site") |
| putHDFSProperty('namenode_heapsize', max(int(clusterData['totalAvailableRam'] / 2), 1024)) |
| putHDFSProperty = self.putProperty(configurations, "hadoop-env", services) |
| putHDFSProperty('namenode_opt_newsize', max(int(clusterData['totalAvailableRam'] / 8), 128)) |
| putHDFSProperty = self.putProperty(configurations, "hadoop-env", services) |
| putHDFSProperty('namenode_opt_maxnewsize', max(int(clusterData['totalAvailableRam'] / 8), 256)) |
| |
| # Check if NN HA is enabled and recommend removing dfs.namenode.rpc-address |
| hdfsSiteProperties = getServicesSiteProperties(services, "hdfs-site") |
| nameServices = None |
| if hdfsSiteProperties and 'dfs.nameservices' in hdfsSiteProperties: |
| nameServices = hdfsSiteProperties['dfs.nameservices'] |
| if nameServices and "dfs.ha.namenodes.%s" % nameServices in hdfsSiteProperties: |
| namenodes = hdfsSiteProperties["dfs.ha.namenodes.%s" % nameServices] |
| if len(namenodes.split(',')) > 1: |
| putHDFSSitePropertyAttributes("dfs.namenode.rpc-address", "delete", "true") |
| |
| # recommendations for "hadoop.proxyuser.*.hosts", "hadoop.proxyuser.*.groups" properties in core-site |
| self.recommendHadoopProxyUsers(configurations, services, hosts) |
| |
| def recommendHbaseConfigurations(self, configurations, clusterData, services, hosts): |
| # recommendations for HBase env config |
| |
| # If cluster size is < 100, hbase master heap = 2G |
| # else If cluster size is < 500, hbase master heap = 4G |
| # else hbase master heap = 8G |
| # for small test clusters use 1 gb |
| hostsCount = 0 |
| if hosts and "items" in hosts: |
| hostsCount = len(hosts["items"]) |
| |
| hbaseMasterRam = { |
| hostsCount < 20: 1, |
| 20 <= hostsCount < 100: 2, |
| 100 <= hostsCount < 500: 4, |
| 500 <= hostsCount: 8 |
| }[True] |
| |
| putHbaseProperty = self.putProperty(configurations, "hbase-env", services) |
| putHbaseProperty('hbase_regionserver_heapsize', int(clusterData['hbaseRam']) * 1024) |
| putHbaseProperty('hbase_master_heapsize', hbaseMasterRam * 1024) |
| |
| # recommendations for HBase site config |
| putHbaseSiteProperty = self.putProperty(configurations, "hbase-site", services) |
| |
| if 'hbase-site' in services['configurations'] and 'hbase.superuser' in services['configurations']['hbase-site']['properties'] \ |
| and 'hbase-env' in services['configurations'] and 'hbase_user' in services['configurations']['hbase-env']['properties'] \ |
| and services['configurations']['hbase-env']['properties']['hbase_user'] != services['configurations']['hbase-site']['properties']['hbase.superuser']: |
| putHbaseSiteProperty("hbase.superuser", services['configurations']['hbase-env']['properties']['hbase_user']) |
| |
| |
| def recommendRangerConfigurations(self, configurations, clusterData, services, hosts): |
| ranger_sql_connector_dict = { |
| 'MYSQL': '/usr/share/java/mysql-connector-java.jar', |
| 'ORACLE': '/usr/share/java/ojdbc6.jar', |
| 'POSTGRES': '/usr/share/java/postgresql.jar', |
| 'MSSQL': '/usr/share/java/sqljdbc4.jar', |
| 'SQLA': '/path_to_driver/sqla-client-jdbc.tar.gz' |
| } |
| |
| putRangerAdminProperty = self.putProperty(configurations, "admin-properties", services) |
| |
| if 'admin-properties' in services['configurations'] and 'DB_FLAVOR' in services['configurations']['admin-properties']['properties']: |
| rangerDbFlavor = services['configurations']["admin-properties"]["properties"]["DB_FLAVOR"] |
| rangerSqlConnectorProperty = ranger_sql_connector_dict.get(rangerDbFlavor, ranger_sql_connector_dict['MYSQL']) |
| putRangerAdminProperty('SQL_CONNECTOR_JAR', rangerSqlConnectorProperty) |
| |
| # Build policymgr_external_url |
| protocol = 'http' |
| ranger_admin_host = 'localhost' |
| port = '6080' |
| |
| # Check if http is disabled. For HDP-2.3 this can be checked in ranger-admin-site/ranger.service.http.enabled |
| # For Ranger-0.4.0 this can be checked in ranger-site/http.enabled |
| if ('ranger-site' in services['configurations'] and 'http.enabled' in services['configurations']['ranger-site']['properties'] \ |
| and services['configurations']['ranger-site']['properties']['http.enabled'].lower() == 'false') or \ |
| ('ranger-admin-site' in services['configurations'] and 'ranger.service.http.enabled' in services['configurations']['ranger-admin-site']['properties'] \ |
| and services['configurations']['ranger-admin-site']['properties']['ranger.service.http.enabled'].lower() == 'false'): |
| # HTTPS protocol is used |
| protocol = 'https' |
| # Starting Ranger-0.5.0.2.3 port stored in ranger-admin-site ranger.service.https.port |
| if 'ranger-admin-site' in services['configurations'] and \ |
| 'ranger.service.https.port' in services['configurations']['ranger-admin-site']['properties']: |
| port = services['configurations']['ranger-admin-site']['properties']['ranger.service.https.port'] |
| # In Ranger-0.4.0 port stored in ranger-site https.service.port |
| elif 'ranger-site' in services['configurations'] and \ |
| 'https.service.port' in services['configurations']['ranger-site']['properties']: |
| port = services['configurations']['ranger-site']['properties']['https.service.port'] |
| else: |
| # HTTP protocol is used |
| # Starting Ranger-0.5.0.2.3 port stored in ranger-admin-site ranger.service.http.port |
| if 'ranger-admin-site' in services['configurations'] and \ |
| 'ranger.service.http.port' in services['configurations']['ranger-admin-site']['properties']: |
| port = services['configurations']['ranger-admin-site']['properties']['ranger.service.http.port'] |
| # In Ranger-0.4.0 port stored in ranger-site http.service.port |
| elif 'ranger-site' in services['configurations'] and \ |
| 'http.service.port' in services['configurations']['ranger-site']['properties']: |
| port = services['configurations']['ranger-site']['properties']['http.service.port'] |
| |
| ranger_admin_hosts = self.getComponentHostNames(services, "RANGER", "RANGER_ADMIN") |
| if ranger_admin_hosts: |
| if len(ranger_admin_hosts) > 1 \ |
| and services['configurations'] \ |
| and 'admin-properties' in services['configurations'] and 'policymgr_external_url' in services['configurations']['admin-properties']['properties'] \ |
| and services['configurations']['admin-properties']['properties']['policymgr_external_url'] \ |
| and services['configurations']['admin-properties']['properties']['policymgr_external_url'].strip(): |
| |
| # in case of HA deployment keep the policymgr_external_url specified in the config |
| policymgr_external_url = services['configurations']['admin-properties']['properties']['policymgr_external_url'] |
| else: |
| |
| ranger_admin_host = ranger_admin_hosts[0] |
| policymgr_external_url = "%s://%s:%s" % (protocol, ranger_admin_host, port) |
| |
| putRangerAdminProperty('policymgr_external_url', policymgr_external_url) |
| |
| rangerServiceVersion = [service['StackServices']['service_version'] for service in services["services"] if service['StackServices']['service_name'] == 'RANGER'][0] |
| if rangerServiceVersion == '0.4.0': |
| # Recommend ldap settings based on ambari.properties configuration |
| # If 'ambari.ldap.isConfigured' == true |
| # For Ranger version 0.4.0 |
| if 'ambari-server-properties' in services and \ |
| 'ambari.ldap.isConfigured' in services['ambari-server-properties'] and \ |
| services['ambari-server-properties']['ambari.ldap.isConfigured'].lower() == "true": |
| putUserSyncProperty = self.putProperty(configurations, "usersync-properties", services) |
| serverProperties = services['ambari-server-properties'] |
| if 'authentication.ldap.managerDn' in serverProperties: |
| putUserSyncProperty('SYNC_LDAP_BIND_DN', serverProperties['authentication.ldap.managerDn']) |
| if 'authentication.ldap.primaryUrl' in serverProperties: |
| ldap_protocol = 'ldap://' |
| if 'authentication.ldap.useSSL' in serverProperties and serverProperties['authentication.ldap.useSSL'] == 'true': |
| ldap_protocol = 'ldaps://' |
| ldapUrl = ldap_protocol + serverProperties['authentication.ldap.primaryUrl'] if serverProperties['authentication.ldap.primaryUrl'] else serverProperties['authentication.ldap.primaryUrl'] |
| putUserSyncProperty('SYNC_LDAP_URL', ldapUrl) |
| if 'authentication.ldap.userObjectClass' in serverProperties: |
| putUserSyncProperty('SYNC_LDAP_USER_OBJECT_CLASS', serverProperties['authentication.ldap.userObjectClass']) |
| if 'authentication.ldap.usernameAttribute' in serverProperties: |
| putUserSyncProperty('SYNC_LDAP_USER_NAME_ATTRIBUTE', serverProperties['authentication.ldap.usernameAttribute']) |
| |
| |
| # Set Ranger Admin Authentication method |
| if 'admin-properties' in services['configurations'] and 'usersync-properties' in services['configurations'] and \ |
| 'SYNC_SOURCE' in services['configurations']['usersync-properties']['properties']: |
| rangerUserSyncSource = services['configurations']['usersync-properties']['properties']['SYNC_SOURCE'] |
| authenticationMethod = rangerUserSyncSource.upper() |
| if authenticationMethod != 'FILE': |
| putRangerAdminProperty('authentication_method', authenticationMethod) |
| |
| # Recommend xasecure.audit.destination.hdfs.dir |
| # For Ranger version 0.4.0 |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| putRangerEnvProperty = self.putProperty(configurations, "ranger-env", services) |
| include_hdfs = "HDFS" in servicesList |
| if include_hdfs: |
| if 'core-site' in services['configurations'] and ('fs.defaultFS' in services['configurations']['core-site']['properties']): |
| default_fs = services['configurations']['core-site']['properties']['fs.defaultFS'] |
| default_fs += '/ranger/audit/%app-type%/%time:yyyyMMdd%' |
| putRangerEnvProperty('xasecure.audit.destination.hdfs.dir', default_fs) |
| |
| # Recommend Ranger Audit properties for ranger supported services |
| # For Ranger version 0.4.0 |
| ranger_services = [ |
| {'service_name': 'HDFS', 'audit_file': 'ranger-hdfs-plugin-properties'}, |
| {'service_name': 'HBASE', 'audit_file': 'ranger-hbase-plugin-properties'}, |
| {'service_name': 'HIVE', 'audit_file': 'ranger-hive-plugin-properties'}, |
| {'service_name': 'KNOX', 'audit_file': 'ranger-knox-plugin-properties'}, |
| {'service_name': 'STORM', 'audit_file': 'ranger-storm-plugin-properties'} |
| ] |
| |
| for item in range(len(ranger_services)): |
| if ranger_services[item]['service_name'] in servicesList: |
| component_audit_file = ranger_services[item]['audit_file'] |
| if component_audit_file in services["configurations"]: |
| ranger_audit_dict = [ |
| {'filename': 'ranger-env', 'configname': 'xasecure.audit.destination.db', 'target_configname': 'XAAUDIT.DB.IS_ENABLED'}, |
| {'filename': 'ranger-env', 'configname': 'xasecure.audit.destination.hdfs', 'target_configname': 'XAAUDIT.HDFS.IS_ENABLED'}, |
| {'filename': 'ranger-env', 'configname': 'xasecure.audit.destination.hdfs.dir', 'target_configname': 'XAAUDIT.HDFS.DESTINATION_DIRECTORY'} |
| ] |
| putRangerAuditProperty = self.putProperty(configurations, component_audit_file, services) |
| |
| for item in ranger_audit_dict: |
| if item['filename'] in services["configurations"] and item['configname'] in services["configurations"][item['filename']]["properties"]: |
| if item['filename'] in configurations and item['configname'] in configurations[item['filename']]["properties"]: |
| rangerAuditProperty = configurations[item['filename']]["properties"][item['configname']] |
| else: |
| rangerAuditProperty = services["configurations"][item['filename']]["properties"][item['configname']] |
| putRangerAuditProperty(item['target_configname'], rangerAuditProperty) |
| |
| |
| def getAmsMemoryRecommendation(self, services, hosts): |
| # MB per sink in hbase heapsize |
| HEAP_PER_MASTER_COMPONENT = 50 |
| HEAP_PER_SLAVE_COMPONENT = 10 |
| |
| schMemoryMap = { |
| "HDFS": { |
| "NAMENODE": HEAP_PER_MASTER_COMPONENT, |
| "DATANODE": HEAP_PER_SLAVE_COMPONENT |
| }, |
| "YARN": { |
| "RESOURCEMANAGER": HEAP_PER_MASTER_COMPONENT, |
| }, |
| "HBASE": { |
| "HBASE_MASTER": HEAP_PER_MASTER_COMPONENT, |
| "HBASE_REGIONSERVER": HEAP_PER_SLAVE_COMPONENT |
| }, |
| "ACCUMULO": { |
| "ACCUMULO_MASTER": HEAP_PER_MASTER_COMPONENT, |
| "ACCUMULO_TSERVER": HEAP_PER_SLAVE_COMPONENT |
| }, |
| "KAFKA": { |
| "KAFKA_BROKER": HEAP_PER_MASTER_COMPONENT |
| }, |
| "FLUME": { |
| "FLUME_HANDLER": HEAP_PER_SLAVE_COMPONENT |
| }, |
| "STORM": { |
| "NIMBUS": HEAP_PER_MASTER_COMPONENT, |
| }, |
| "AMBARI_METRICS": { |
| "METRICS_COLLECTOR": HEAP_PER_MASTER_COMPONENT, |
| "METRICS_MONITOR": HEAP_PER_SLAVE_COMPONENT |
| } |
| } |
| total_sinks_count = 0 |
| # minimum heap size |
| hbase_heapsize = 500 |
| for serviceName, componentsDict in schMemoryMap.items(): |
| for componentName, multiplier in componentsDict.items(): |
| schCount = len( |
| self.getHostsWithComponent(serviceName, componentName, services, |
| hosts)) |
| hbase_heapsize += int((schCount * multiplier) ** 0.9) |
| total_sinks_count += schCount |
| collector_heapsize = int(hbase_heapsize/4 if hbase_heapsize > 2048 else 512) |
| |
| return round_to_n(collector_heapsize), round_to_n(hbase_heapsize), total_sinks_count |
| |
| def recommendStormConfigurations(self, configurations, clusterData, services, hosts): |
| putStormSiteProperty = self.putProperty(configurations, "storm-site", services) |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| # Storm AMS integration |
| if 'AMBARI_METRICS' in servicesList: |
| putStormSiteProperty('metrics.reporter.register', 'org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter') |
| |
| def recommendAmsConfigurations(self, configurations, clusterData, services, hosts): |
| putAmsEnvProperty = self.putProperty(configurations, "ams-env", services) |
| putAmsHbaseSiteProperty = self.putProperty(configurations, "ams-hbase-site", services) |
| putAmsSiteProperty = self.putProperty(configurations, "ams-site", services) |
| putHbaseEnvProperty = self.putProperty(configurations, "ams-hbase-env", services) |
| putGrafanaProperty = self.putProperty(configurations, "ams-grafana-env", services) |
| putGrafanaPropertyAttribute = self.putPropertyAttribute(configurations, "ams-grafana-env") |
| |
| amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR") |
| |
| if 'cluster-env' in services['configurations'] and \ |
| 'metrics_collector_vip_host' in services['configurations']['cluster-env']['properties']: |
| metric_collector_host = services['configurations']['cluster-env']['properties']['metrics_collector_vip_host'] |
| else: |
| metric_collector_host = 'localhost' if len(amsCollectorHosts) == 0 else amsCollectorHosts[0] |
| |
| putAmsSiteProperty("timeline.metrics.service.webapp.address", str(metric_collector_host) + ":6188") |
| |
| log_dir = "/var/log/ambari-metrics-collector" |
| if "ams-env" in services["configurations"]: |
| if "metrics_collector_log_dir" in services["configurations"]["ams-env"]["properties"]: |
| log_dir = services["configurations"]["ams-env"]["properties"]["metrics_collector_log_dir"] |
| putHbaseEnvProperty("hbase_log_dir", log_dir) |
| |
| defaultFs = 'file:///' |
| if "core-site" in services["configurations"] and \ |
| "fs.defaultFS" in services["configurations"]["core-site"]["properties"]: |
| defaultFs = services["configurations"]["core-site"]["properties"]["fs.defaultFS"] |
| |
| operatingMode = "embedded" |
| if "ams-site" in services["configurations"]: |
| if "timeline.metrics.service.operation.mode" in services["configurations"]["ams-site"]["properties"]: |
| operatingMode = services["configurations"]["ams-site"]["properties"]["timeline.metrics.service.operation.mode"] |
| |
| if operatingMode == "distributed": |
| putAmsSiteProperty("timeline.metrics.service.watcher.disabled", 'true') |
| putAmsSiteProperty("timeline.metrics.host.aggregator.ttl", 259200) |
| putAmsHbaseSiteProperty("hbase.cluster.distributed", 'true') |
| else: |
| putAmsSiteProperty("timeline.metrics.service.watcher.disabled", 'false') |
| putAmsSiteProperty("timeline.metrics.host.aggregator.ttl", 86400) |
| putAmsHbaseSiteProperty("hbase.cluster.distributed", 'false') |
| |
| rootDir = "file:///var/lib/ambari-metrics-collector/hbase" |
| tmpDir = "/var/lib/ambari-metrics-collector/hbase-tmp" |
| zk_port_default = [] |
| if "ams-hbase-site" in services["configurations"]: |
| if "hbase.rootdir" in services["configurations"]["ams-hbase-site"]["properties"]: |
| rootDir = services["configurations"]["ams-hbase-site"]["properties"]["hbase.rootdir"] |
| if "hbase.tmp.dir" in services["configurations"]["ams-hbase-site"]["properties"]: |
| tmpDir = services["configurations"]["ams-hbase-site"]["properties"]["hbase.tmp.dir"] |
| if "hbase.zookeeper.property.clientPort" in services["configurations"]["ams-hbase-site"]["properties"]: |
| zk_port_default = services["configurations"]["ams-hbase-site"]["properties"]["hbase.zookeeper.property.clientPort"] |
| |
| # Skip recommendation item if default value is present |
| if operatingMode == "distributed" and not "{{zookeeper_clientPort}}" in zk_port_default: |
| zkPort = self.getZKPort(services) |
| putAmsHbaseSiteProperty("hbase.zookeeper.property.clientPort", zkPort) |
| elif operatingMode == "embedded" and not "{{zookeeper_clientPort}}" in zk_port_default: |
| putAmsHbaseSiteProperty("hbase.zookeeper.property.clientPort", "61181") |
| |
| mountpoints = ["/"] |
| for collectorHostName in amsCollectorHosts: |
| for host in hosts["items"]: |
| if host["Hosts"]["host_name"] == collectorHostName: |
| mountpoints = self.getPreferredMountPoints(host["Hosts"]) |
| break |
| isLocalRootDir = rootDir.startswith("file://") or (defaultFs.startswith("file://") and rootDir.startswith("/")) |
| if isLocalRootDir: |
| rootDir = re.sub("^file:///|/", "", rootDir, count=1) |
| rootDir = "file://" + os.path.join(mountpoints[0], rootDir) |
| tmpDir = re.sub("^file:///|/", "", tmpDir, count=1) |
| if len(mountpoints) > 1 and isLocalRootDir: |
| tmpDir = os.path.join(mountpoints[1], tmpDir) |
| else: |
| tmpDir = os.path.join(mountpoints[0], tmpDir) |
| putAmsHbaseSiteProperty("hbase.tmp.dir", tmpDir) |
| |
| if operatingMode == "distributed": |
| putAmsHbaseSiteProperty("hbase.rootdir", defaultFs + "/user/ams/hbase") |
| |
| if operatingMode == "embedded": |
| if isLocalRootDir: |
| putAmsHbaseSiteProperty("hbase.rootdir", rootDir) |
| else: |
| putAmsHbaseSiteProperty("hbase.rootdir", "file:///var/lib/ambari-metrics-collector/hbase") |
| |
| collector_heapsize, hbase_heapsize, total_sinks_count = self.getAmsMemoryRecommendation(services, hosts) |
| |
| putAmsEnvProperty("metrics_collector_heapsize", collector_heapsize) |
| |
| # blockCache = 0.3, memstore = 0.35, phoenix-server = 0.15, phoenix-client = 0.25 |
| putAmsHbaseSiteProperty("hfile.block.cache.size", 0.3) |
| putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 134217728) |
| putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.upperLimit", 0.35) |
| putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.lowerLimit", 0.3) |
| |
| if len(amsCollectorHosts) > 1: |
| pass |
| else: |
| # blockCache = 0.3, memstore = 0.3, phoenix-server = 0.2, phoenix-client = 0.3 |
| if total_sinks_count >= 2000: |
| putAmsHbaseSiteProperty("hbase.regionserver.handler.count", 60) |
| putAmsHbaseSiteProperty("hbase.regionserver.hlog.blocksize", 134217728) |
| putAmsHbaseSiteProperty("hbase.regionserver.maxlogs", 64) |
| putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 268435456) |
| putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.upperLimit", 0.3) |
| putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.lowerLimit", 0.25) |
| putAmsHbaseSiteProperty("phoenix.query.maxGlobalMemoryPercentage", 20) |
| putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 81920000) |
| putAmsSiteProperty("phoenix.query.maxGlobalMemoryPercentage", 30) |
| putAmsSiteProperty("timeline.metrics.service.resultset.fetchSize", 10000) |
| elif total_sinks_count >= 500: |
| putAmsHbaseSiteProperty("hbase.regionserver.handler.count", 60) |
| putAmsHbaseSiteProperty("hbase.regionserver.hlog.blocksize", 134217728) |
| putAmsHbaseSiteProperty("hbase.regionserver.maxlogs", 64) |
| putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 268435456) |
| putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 40960000) |
| putAmsSiteProperty("timeline.metrics.service.resultset.fetchSize", 5000) |
| else: |
| putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 20480000) |
| pass |
| |
| metrics_api_handlers = min(50, max(20, int(total_sinks_count / 100))) |
| putAmsSiteProperty("timeline.metrics.service.handler.thread.count", metrics_api_handlers) |
| |
| # Distributed mode heap size |
| if operatingMode == "distributed": |
| hbase_heapsize = max(hbase_heapsize, 768) |
| putHbaseEnvProperty("hbase_master_heapsize", "512") |
| putHbaseEnvProperty("hbase_master_xmn_size", "102") #20% of 512 heap size |
| putHbaseEnvProperty("hbase_regionserver_heapsize", hbase_heapsize) |
| putHbaseEnvProperty("regionserver_xmn_size", round_to_n(0.15*hbase_heapsize,64)) |
| else: |
| # Embedded mode heap size : master + regionserver |
| hbase_rs_heapsize = 768 |
| putHbaseEnvProperty("hbase_regionserver_heapsize", hbase_rs_heapsize) |
| putHbaseEnvProperty("hbase_master_heapsize", hbase_heapsize) |
| putHbaseEnvProperty("hbase_master_xmn_size", round_to_n(0.15*(hbase_heapsize+hbase_rs_heapsize),64)) |
| |
| # If no local DN in distributed mode |
| if operatingMode == "distributed": |
| dn_hosts = self.getComponentHostNames(services, "HDFS", "DATANODE") |
| # call by Kerberos wizard sends only the service being affected |
| # so it is possible for dn_hosts to be None but not amsCollectorHosts |
| if dn_hosts and len(dn_hosts) > 0: |
| if set(amsCollectorHosts).intersection(dn_hosts): |
| collector_cohosted_with_dn = "true" |
| else: |
| collector_cohosted_with_dn = "false" |
| putAmsHbaseSiteProperty("dfs.client.read.shortcircuit", collector_cohosted_with_dn) |
| |
| #split points |
| scriptDir = os.path.dirname(os.path.abspath(__file__)) |
| metricsDir = os.path.join(scriptDir, '../../../../common-services/AMBARI_METRICS/0.1.0/package') |
| serviceMetricsDir = os.path.join(metricsDir, 'files', 'service-metrics') |
| sys.path.append(os.path.join(metricsDir, 'scripts')) |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| |
| from split_points import FindSplitPointsForAMSRegions |
| |
| ams_hbase_site = None |
| ams_hbase_env = None |
| |
| # Overriden properties form the UI |
| if "ams-hbase-site" in services["configurations"]: |
| ams_hbase_site = services["configurations"]["ams-hbase-site"]["properties"] |
| if "ams-hbase-env" in services["configurations"]: |
| ams_hbase_env = services["configurations"]["ams-hbase-env"]["properties"] |
| |
| # Recommendations |
| if not ams_hbase_site: |
| ams_hbase_site = configurations["ams-hbase-site"]["properties"] |
| if not ams_hbase_env: |
| ams_hbase_env = configurations["ams-hbase-env"]["properties"] |
| |
| split_point_finder = FindSplitPointsForAMSRegions( |
| ams_hbase_site, ams_hbase_env, serviceMetricsDir, operatingMode, servicesList) |
| |
| result = split_point_finder.get_split_points() |
| precision_splits = ' ' |
| aggregate_splits = ' ' |
| if result.precision: |
| precision_splits = result.precision |
| if result.aggregate: |
| aggregate_splits = result.aggregate |
| putAmsSiteProperty("timeline.metrics.host.aggregate.splitpoints", ','.join(precision_splits)) |
| putAmsSiteProperty("timeline.metrics.cluster.aggregate.splitpoints", ','.join(aggregate_splits)) |
| |
| component_grafana_exists = False |
| for service in services['services']: |
| if 'components' in service: |
| for component in service['components']: |
| if 'StackServiceComponents' in component: |
| # If Grafana is installed the hostnames would indicate its location |
| if 'METRICS_GRAFANA' in component['StackServiceComponents']['component_name'] and\ |
| len(component['StackServiceComponents']['hostnames']) != 0: |
| component_grafana_exists = True |
| break |
| pass |
| |
| if not component_grafana_exists: |
| putGrafanaPropertyAttribute("metrics_grafana_password", "visible", "false") |
| |
| pass |
| |
| def getHostNamesWithComponent(self, serviceName, componentName, services): |
| """ |
| Returns the list of hostnames on which service component is installed |
| """ |
| if services is not None and serviceName in [service["StackServices"]["service_name"] for service in services["services"]]: |
| service = [serviceEntry for serviceEntry in services["services"] if serviceEntry["StackServices"]["service_name"] == serviceName][0] |
| components = [componentEntry for componentEntry in service["components"] if componentEntry["StackServiceComponents"]["component_name"] == componentName] |
| if (len(components) > 0 and len(components[0]["StackServiceComponents"]["hostnames"]) > 0): |
| componentHostnames = components[0]["StackServiceComponents"]["hostnames"] |
| return componentHostnames |
| return [] |
| |
| def getHostsWithComponent(self, serviceName, componentName, services, hosts): |
| if services is not None and hosts is not None and serviceName in [service["StackServices"]["service_name"] for service in services["services"]]: |
| service = [serviceEntry for serviceEntry in services["services"] if serviceEntry["StackServices"]["service_name"] == serviceName][0] |
| components = [componentEntry for componentEntry in service["components"] if componentEntry["StackServiceComponents"]["component_name"] == componentName] |
| if (len(components) > 0 and len(components[0]["StackServiceComponents"]["hostnames"]) > 0): |
| componentHostnames = components[0]["StackServiceComponents"]["hostnames"] |
| componentHosts = [host for host in hosts["items"] if host["Hosts"]["host_name"] in componentHostnames] |
| return componentHosts |
| return [] |
| |
| def getHostWithComponent(self, serviceName, componentName, services, hosts): |
| componentHosts = self.getHostsWithComponent(serviceName, componentName, services, hosts) |
| if (len(componentHosts) > 0): |
| return componentHosts[0] |
| return None |
| |
| def getHostComponentsByCategories(self, hostname, categories, services, hosts): |
| components = [] |
| if services is not None and hosts is not None: |
| for service in services["services"]: |
| components.extend([componentEntry for componentEntry in service["components"] |
| if componentEntry["StackServiceComponents"]["component_category"] in categories |
| and hostname in componentEntry["StackServiceComponents"]["hostnames"]]) |
| return components |
| |
| def getZKHostPortString(self, services, include_port=True): |
| """ |
| Returns the comma delimited string of zookeeper server host with the configure port installed in a cluster |
| Example: zk.host1.org:2181,zk.host2.org:2181,zk.host3.org:2181 |
| include_port boolean param -> If port is also needed. |
| """ |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| include_zookeeper = "ZOOKEEPER" in servicesList |
| zookeeper_host_port = '' |
| |
| if include_zookeeper: |
| zookeeper_hosts = self.getHostNamesWithComponent("ZOOKEEPER", "ZOOKEEPER_SERVER", services) |
| zookeeper_host_port_arr = [] |
| |
| if include_port: |
| zookeeper_port = self.getZKPort(services) |
| for i in range(len(zookeeper_hosts)): |
| zookeeper_host_port_arr.append(zookeeper_hosts[i] + ':' + zookeeper_port) |
| else: |
| for i in range(len(zookeeper_hosts)): |
| zookeeper_host_port_arr.append(zookeeper_hosts[i]) |
| |
| zookeeper_host_port = ",".join(zookeeper_host_port_arr) |
| return zookeeper_host_port |
| |
| def getZKPort(self, services): |
| zookeeper_port = '2181' #default port |
| if 'zoo.cfg' in services['configurations'] and ('clientPort' in services['configurations']['zoo.cfg']['properties']): |
| zookeeper_port = services['configurations']['zoo.cfg']['properties']['clientPort'] |
| return zookeeper_port |
| |
  def getConfigurationClusterSummary(self, servicesList, hosts, components, services):
    """
    Build a summary dict of the cluster hardware profile used by the
    recommendation code: raw facts from a single reference host (cpu, disk
    count, ram in GB) plus derived sizing values (reservedRam, hbaseRam,
    minContainerSize, totalAvailableRam, containers, ramPerContainer, and
    map/reduce/AM memory).
    """

    hBaseInstalled = False
    if 'HBASE' in servicesList:
      hBaseInstalled = True

    cluster = {
      "cpu": 0,
      "disk": 0,
      "ram": 0,
      "hBaseInstalled": hBaseInstalled,
      "components": components
    }

    if len(hosts["items"]) > 0:
      nodeManagerHosts = self.getHostsWithComponent("YARN", "NODEMANAGER", services, hosts)
      # NodeManager host with least memory is generally used in calculations as it will work in larger hosts.
      if nodeManagerHosts is not None and len(nodeManagerHosts) > 0:
        nodeManagerHost = nodeManagerHosts[0];
        for nmHost in nodeManagerHosts:
          if nmHost["Hosts"]["total_mem"] < nodeManagerHost["Hosts"]["total_mem"]:
            nodeManagerHost = nmHost
        host = nodeManagerHost["Hosts"]
        cluster["referenceNodeManagerHost"] = host
      else:
        # no NodeManagers: fall back to the first host in the cluster
        host = hosts["items"][0]["Hosts"]
      cluster["referenceHost"] = host
      cluster["cpu"] = host["cpu_count"]
      cluster["disk"] = len(host["disk_info"])
      # total_mem is reported in KB; convert to GB
      cluster["ram"] = int(host["total_mem"] / (1024 * 1024))

    # OS-reserved and HBase heap sizes (GB) per total-RAM bucket; the bucket
    # is selected by the boolean-keyed dict below.
    ramRecommendations = [
      {"os":1, "hbase":1},
      {"os":2, "hbase":1},
      {"os":2, "hbase":2},
      {"os":4, "hbase":4},
      {"os":6, "hbase":8},
      {"os":8, "hbase":8},
      {"os":8, "hbase":8},
      {"os":12, "hbase":16},
      {"os":24, "hbase":24},
      {"os":32, "hbase":32},
      {"os":64, "hbase":32}
    ]
    # Dict keyed by boolean range predicates: exactly one predicate is True,
    # and indexing the dict with 1 (== True) selects that bucket's index.
    index = {
      cluster["ram"] <= 4: 0,
      4 < cluster["ram"] <= 8: 1,
      8 < cluster["ram"] <= 16: 2,
      16 < cluster["ram"] <= 24: 3,
      24 < cluster["ram"] <= 48: 4,
      48 < cluster["ram"] <= 64: 5,
      64 < cluster["ram"] <= 72: 6,
      72 < cluster["ram"] <= 96: 7,
      96 < cluster["ram"] <= 128: 8,
      128 < cluster["ram"] <= 256: 9,
      256 < cluster["ram"]: 10
    }[1]


    cluster["reservedRam"] = ramRecommendations[index]["os"]
    cluster["hbaseRam"] = ramRecommendations[index]["hbase"]


    # Minimum YARN container size (MB), again selected via the boolean-key trick.
    cluster["minContainerSize"] = {
      cluster["ram"] <= 4: 256,
      4 < cluster["ram"] <= 8: 512,
      8 < cluster["ram"] <= 24: 1024,
      24 < cluster["ram"]: 2048
    }[1]

    totalAvailableRam = cluster["ram"] - cluster["reservedRam"]
    if cluster["hBaseInstalled"]:
      totalAvailableRam -= cluster["hbaseRam"]
    # floor of 512; note totalAvailableRam is converted from GB to MB here
    cluster["totalAvailableRam"] = max(512, totalAvailableRam * 1024)
    '''containers = max(3, min (2*cores,min (1.8*DISKS,(Total available RAM) / MIN_CONTAINER_SIZE))))'''
    cluster["containers"] = round(max(3,
                                min(2 * cluster["cpu"],
                                    min(ceil(1.8 * cluster["disk"]),
                                        cluster["totalAvailableRam"] / cluster["minContainerSize"]))))

    '''ramPerContainers = max(2GB, RAM - reservedRam - hBaseRam) / containers'''
    cluster["ramPerContainer"] = abs(cluster["totalAvailableRam"] / cluster["containers"])
    '''If greater than 1GB, value will be in multiples of 512.'''
    if cluster["ramPerContainer"] > 1024:
      cluster["ramPerContainer"] = int(cluster["ramPerContainer"] / 512) * 512

    cluster["mapMemory"] = int(cluster["ramPerContainer"])
    cluster["reduceMemory"] = cluster["ramPerContainer"]
    cluster["amMemory"] = max(cluster["mapMemory"], cluster["reduceMemory"])

    return cluster
| |
| def getConfigurationsValidationItems(self, services, hosts): |
| """Returns array of Validation objects about issues with configuration values provided in services""" |
| items = [] |
| |
| recommendations = self.recommendConfigurations(services, hosts) |
| recommendedDefaults = recommendations["recommendations"]["blueprint"]["configurations"] |
| |
| configurations = services["configurations"] |
| for service in services["services"]: |
| serviceName = service["StackServices"]["service_name"] |
| validator = self.validateServiceConfigurations(serviceName) |
| if validator is not None: |
| for siteName, method in validator.items(): |
| if siteName in recommendedDefaults: |
| siteProperties = getSiteProperties(configurations, siteName) |
| if siteProperties is not None: |
| siteRecommendations = recommendedDefaults[siteName]["properties"] |
| print("SiteName: %s, method: %s\n" % (siteName, method.__name__)) |
| print("Site properties: %s\n" % str(siteProperties)) |
| print("Recommendations: %s\n********\n" % str(siteRecommendations)) |
| resultItems = method(siteProperties, siteRecommendations, configurations, services, hosts) |
| items.extend(resultItems) |
| |
| clusterWideItems = self.validateClusterConfigurations(configurations, services, hosts) |
| items.extend(clusterWideItems) |
| self.validateMinMax(items, recommendedDefaults, configurations) |
| return items |
| |
| def validateClusterConfigurations(self, configurations, services, hosts): |
| validationItems = [] |
| |
| return self.toConfigurationValidationProblems(validationItems, "") |
| |
| def getServiceConfigurationValidators(self): |
| return { |
| "HDFS": {"hadoop-env": self.validateHDFSConfigurationsEnv}, |
| "MAPREDUCE2": {"mapred-site": self.validateMapReduce2Configurations}, |
| "YARN": {"yarn-site": self.validateYARNConfigurations}, |
| "HBASE": {"hbase-env": self.validateHbaseEnvConfigurations}, |
| "STORM": {"storm-site": self.validateStormConfigurations}, |
| "AMBARI_METRICS": {"ams-hbase-site": self.validateAmsHbaseSiteConfigurations, |
| "ams-hbase-env": self.validateAmsHbaseEnvConfigurations, |
| "ams-site": self.validateAmsSiteConfigurations} |
| } |
| |
| def validateMinMax(self, items, recommendedDefaults, configurations): |
| |
| # required for casting to the proper numeric type before comparison |
| def convertToNumber(number): |
| try: |
| return int(number) |
| except ValueError: |
| return float(number) |
| |
| for configName in configurations: |
| validationItems = [] |
| if configName in recommendedDefaults and "property_attributes" in recommendedDefaults[configName]: |
| for propertyName in recommendedDefaults[configName]["property_attributes"]: |
| if propertyName in configurations[configName]["properties"]: |
| if "maximum" in recommendedDefaults[configName]["property_attributes"][propertyName] and \ |
| propertyName in recommendedDefaults[configName]["properties"]: |
| userValue = convertToNumber(configurations[configName]["properties"][propertyName]) |
| maxValue = convertToNumber(recommendedDefaults[configName]["property_attributes"][propertyName]["maximum"]) |
| if userValue > maxValue: |
| validationItems.extend([{"config-name": propertyName, "item": self.getWarnItem("Value is greater than the recommended maximum of {0} ".format(maxValue))}]) |
| if "minimum" in recommendedDefaults[configName]["property_attributes"][propertyName] and \ |
| propertyName in recommendedDefaults[configName]["properties"]: |
| userValue = convertToNumber(configurations[configName]["properties"][propertyName]) |
| minValue = convertToNumber(recommendedDefaults[configName]["property_attributes"][propertyName]["minimum"]) |
| if userValue < minValue: |
| validationItems.extend([{"config-name": propertyName, "item": self.getWarnItem("Value is less than the recommended minimum of {0} ".format(minValue))}]) |
| items.extend(self.toConfigurationValidationProblems(validationItems, configName)) |
| pass |
| |
| def validateAmsSiteConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): |
| validationItems = [] |
| |
| op_mode = properties.get("timeline.metrics.service.operation.mode") |
| correct_op_mode_item = None |
| if op_mode not in ("embedded", "distributed"): |
| correct_op_mode_item = self.getErrorItem("Correct value should be set.") |
| pass |
| |
| validationItems.extend([{"config-name":'timeline.metrics.service.operation.mode', "item": correct_op_mode_item }]) |
| return self.toConfigurationValidationProblems(validationItems, "ams-site") |
| |
  def validateAmsHbaseSiteConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
    """
    Validate ams-hbase-site against the AMS operation mode: hbase.rootdir
    scheme, hbase.cluster.distributed flag, ZooKeeper client port, and
    per-collector-host disk/mountpoint checks.
    """

    amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR")
    ams_site = getSiteProperties(configurations, "ams-site")
    core_site = getSiteProperties(configurations, "core-site")

    collector_heapsize, hbase_heapsize, total_sinks_count = self.getAmsMemoryRecommendation(services, hosts)
    # Disk-space thresholds below are in KB (see the "* 1k" comments).
    recommendedDiskSpace = 10485760
    # TODO validate configuration for multiple AMBARI_METRICS collectors
    if len(amsCollectorHosts) > 1:
      pass
    else:
      if total_sinks_count > 2000:
        recommendedDiskSpace = 104857600 # * 1k == 100 Gb
      elif total_sinks_count > 500:
        recommendedDiskSpace = 52428800 # * 1k == 50 Gb
      elif total_sinks_count > 250:
        recommendedDiskSpace = 20971520 # * 1k == 20 Gb

    validationItems = []

    rootdir_item = None
    op_mode = ams_site.get("timeline.metrics.service.operation.mode")
    default_fs = core_site.get("fs.defaultFS") if core_site else "file:///"
    hbase_rootdir = properties.get("hbase.rootdir")
    hbase_tmpdir = properties.get("hbase.tmp.dir")
    distributed = properties.get("hbase.cluster.distributed")
    # rootdir is "local" when it uses file:// explicitly, or is schemaless while defaultFS is local
    is_local_root_dir = hbase_rootdir.startswith("file://") or (default_fs.startswith("file://") and hbase_rootdir.startswith("/"))

    if op_mode == "distributed" and is_local_root_dir:
      rootdir_item = self.getWarnItem("In distributed mode hbase.rootdir should point to HDFS.")
    elif op_mode == "embedded":
      # NOTE(review): Python precedence makes this read as
      # (distributed == "false" AND rootdir startswith '/') OR rootdir startswith "hdfs://";
      # confirm whether the hdfs:// case was meant to also require distributed == "false".
      if distributed.lower() == "false" and hbase_rootdir.startswith('/') or hbase_rootdir.startswith("hdfs://"):
        rootdir_item = self.getWarnItem("In embedded mode hbase.rootdir cannot point to schemaless values or HDFS, "
                                        "Example - file:// for localFS")
      pass

    distributed_item = None
    if op_mode == "distributed" and not distributed.lower() == "true":
      distributed_item = self.getErrorItem("hbase.cluster.distributed property should be set to true for "
                                           "distributed mode")
    if op_mode == "embedded" and distributed.lower() == "true":
      distributed_item = self.getErrorItem("hbase.cluster.distributed property should be set to false for embedded mode")

    hbase_zk_client_port = properties.get("hbase.zookeeper.property.clientPort")
    zkPort = self.getZKPort(services)
    hbase_zk_client_port_item = None
    if distributed.lower() == "true" and op_mode == "distributed" and \
        hbase_zk_client_port != zkPort and hbase_zk_client_port != "{{zookeeper_clientPort}}":
      hbase_zk_client_port_item = self.getErrorItem("In AMS distributed mode, hbase.zookeeper.property.clientPort "
                                                    "should be the cluster zookeeper server port : {0}".format(zkPort))

    if distributed.lower() == "false" and op_mode == "embedded" and \
        hbase_zk_client_port == zkPort and hbase_zk_client_port != "{{zookeeper_clientPort}}":
      hbase_zk_client_port_item = self.getErrorItem("In AMS embedded mode, hbase.zookeeper.property.clientPort "
                                                    "should be a different port than cluster zookeeper port."
                                                    "(default:61181)")

    validationItems.extend([{"config-name":'hbase.rootdir', "item": rootdir_item },
                            {"config-name":'hbase.cluster.distributed', "item": distributed_item },
                            {"config-name":'hbase.zookeeper.property.clientPort', "item": hbase_zk_client_port_item }])

    # Per-host checks on each Metrics Collector host.
    for collectorHostName in amsCollectorHosts:
      for host in hosts["items"]:
        if host["Hosts"]["host_name"] == collectorHostName:
          if op_mode == 'embedded' or is_local_root_dir:
            validationItems.extend([{"config-name": 'hbase.rootdir', "item": self.validatorEnoughDiskSpace(properties, 'hbase.rootdir', host["Hosts"], recommendedDiskSpace)}])
            validationItems.extend([{"config-name": 'hbase.rootdir', "item": self.validatorNotRootFs(properties, recommendedDefaults, 'hbase.rootdir', host["Hosts"])}])
            validationItems.extend([{"config-name": 'hbase.tmp.dir', "item": self.validatorNotRootFs(properties, recommendedDefaults, 'hbase.tmp.dir', host["Hosts"])}])

          dn_hosts = self.getComponentHostNames(services, "HDFS", "DATANODE")
          if is_local_root_dir:
            mountPoints = []
            for mountPoint in host["Hosts"]["disk_info"]:
              mountPoints.append(mountPoint["mountpoint"])
            hbase_rootdir_mountpoint = getMountPointForDir(hbase_rootdir, mountPoints)
            hbase_tmpdir_mountpoint = getMountPointForDir(hbase_tmpdir, mountPoints)
            preferred_mountpoints = self.getPreferredMountPoints(host['Hosts'])
            # hbase.rootdir and hbase.tmp.dir shouldn't point to the same partition
            # if multiple preferred_mountpoints exist
            if hbase_rootdir_mountpoint == hbase_tmpdir_mountpoint and \
              len(preferred_mountpoints) > 1:
              item = self.getWarnItem("Consider not using {0} partition for storing metrics temporary data. "
                                      "{0} partition is already used as hbase.rootdir to store metrics data".format(hbase_tmpdir_mountpoint))
              validationItems.extend([{"config-name":'hbase.tmp.dir', "item": item}])

            # if METRICS_COLLECTOR is co-hosted with DATANODE
            # cross-check dfs.datanode.data.dir and hbase.rootdir
            # they shouldn't share same disk partition IO
            hdfs_site = getSiteProperties(configurations, "hdfs-site")
            dfs_datadirs = hdfs_site.get("dfs.datanode.data.dir").split(",") if hdfs_site and "dfs.datanode.data.dir" in hdfs_site else []
            if dn_hosts and collectorHostName in dn_hosts and ams_site and \
              dfs_datadirs and len(preferred_mountpoints) > len(dfs_datadirs):
              for dfs_datadir in dfs_datadirs:
                dfs_datadir_mountpoint = getMountPointForDir(dfs_datadir, mountPoints)
                if dfs_datadir_mountpoint == hbase_rootdir_mountpoint:
                  item = self.getWarnItem("Consider not using {0} partition for storing metrics data. "
                                          "{0} is already used by datanode to store HDFS data".format(hbase_rootdir_mountpoint))
                  validationItems.extend([{"config-name": 'hbase.rootdir', "item": item}])
                  break
          # If no local DN in distributed mode
          # NOTE(review): if dn_hosts is None (Kerberos-wizard calls can omit HDFS),
          # "collectorHostName not in dn_hosts" raises TypeError — confirm callers
          # always provide the DATANODE component here.
          elif collectorHostName not in dn_hosts and distributed.lower() == "true":
            item = self.getWarnItem("It's recommended to install Datanode component on {0} "
                                    "to speed up IO operations between HDFS and Metrics "
                                    "Collector in distributed mode ".format(collectorHostName))
            validationItems.extend([{"config-name": "hbase.cluster.distributed", "item": item}])
          # Short circuit read should be enabled in distibuted mode
          # if local DN installed
          else:
            validationItems.extend([{"config-name": "dfs.client.read.shortcircuit", "item": self.validatorEqualsToRecommendedItem(properties, recommendedDefaults, "dfs.client.read.shortcircuit")}])

    return self.toConfigurationValidationProblems(validationItems, "ams-hbase-site")
| |
| def validateStormConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): |
| validationItems = [] |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| # Storm AMS integration |
| if 'AMBARI_METRICS' in servicesList and "metrics.reporter.register" in properties and \ |
| "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter" not in properties.get("metrics.reporter.register"): |
| |
| validationItems.append({"config-name": 'metrics.reporter.register', |
| "item": self.getWarnItem( |
| "Should be set to org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter to report the metrics to Ambari Metrics service.")}) |
| |
| return self.toConfigurationValidationProblems(validationItems, "storm-site") |
| |
  def validateAmsHbaseEnvConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
    """
    Validate ams-hbase-env heap/Xmn sizing: heapsizes against recommended
    defaults, Xmn between 12% and 20% of the relevant heapsize, collector
    co-hosting with other masters, and unused RAM on the collector host.
    """

    ams_env = getSiteProperties(configurations, "ams-env")
    amsHbaseSite = getSiteProperties(configurations, "ams-hbase-site")
    validationItems = []
    mb = 1024 * 1024
    gb = 1024 * mb

    regionServerItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "hbase_regionserver_heapsize") ## FIXME if new service added
    if regionServerItem:
      validationItems.extend([{"config-name": "hbase_regionserver_heapsize", "item": regionServerItem}])

    hbaseMasterHeapsizeItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "hbase_master_heapsize")
    if hbaseMasterHeapsizeItem:
      validationItems.extend([{"config-name": "hbase_master_heapsize", "item": hbaseMasterHeapsizeItem}])

    logDirItem = self.validatorEqualsPropertyItem(properties, "hbase_log_dir", ams_env, "metrics_collector_log_dir")
    if logDirItem:
      validationItems.extend([{"config-name": "hbase_log_dir", "item": logDirItem}])

    collector_heapsize = to_number(ams_env.get("metrics_collector_heapsize"))
    hbase_master_heapsize = to_number(properties["hbase_master_heapsize"])
    hbase_master_xmn_size = to_number(properties["hbase_master_xmn_size"])
    hbase_regionserver_heapsize = to_number(properties["hbase_regionserver_heapsize"])
    hbase_regionserver_xmn_size = to_number(properties["regionserver_xmn_size"])

    # Validate Xmn settings.
    masterXmnItem = None
    regionServerXmnItem = None
    is_hbase_distributed = amsHbaseSite.get("hbase.cluster.distributed").lower() == 'true'

    if is_hbase_distributed:
      # Distributed mode: master and regionserver Xmn are checked independently.
      minMasterXmn = 0.12 * hbase_master_heapsize
      maxMasterXmn = 0.2 * hbase_master_heapsize
      if hbase_master_xmn_size < minMasterXmn:
        masterXmnItem = self.getWarnItem("Value is lesser than the recommended minimum Xmn size of {0} "
                                         "(12% of hbase_master_heapsize)".format(int(ceil(minMasterXmn))))

      if hbase_master_xmn_size > maxMasterXmn:
        masterXmnItem = self.getWarnItem("Value is greater than the recommended maximum Xmn size of {0} "
                                         "(20% of hbase_master_heapsize)".format(int(floor(maxMasterXmn))))

      minRegionServerXmn = 0.12 * hbase_regionserver_heapsize
      maxRegionServerXmn = 0.2 * hbase_regionserver_heapsize
      if hbase_regionserver_xmn_size < minRegionServerXmn:
        regionServerXmnItem = self.getWarnItem("Value is lesser than the recommended minimum Xmn size of {0} "
                                               "(12% of hbase_regionserver_heapsize)"
                                               .format(int(ceil(minRegionServerXmn))))

      if hbase_regionserver_xmn_size > maxRegionServerXmn:
        regionServerXmnItem = self.getWarnItem("Value is greater than the recommended maximum Xmn size of {0} "
                                               "(20% of hbase_regionserver_heapsize)"
                                               .format(int(floor(maxRegionServerXmn))))
    else:
      # Embedded mode: master and regionserver run together, so master Xmn is
      # checked against the combined heap.
      minMasterXmn = 0.12 * (hbase_master_heapsize + hbase_regionserver_heapsize)
      maxMasterXmn = 0.2 * (hbase_master_heapsize + hbase_regionserver_heapsize)
      if hbase_master_xmn_size < minMasterXmn:
        masterXmnItem = self.getWarnItem("Value is lesser than the recommended minimum Xmn size of {0} "
                                         "(12% of hbase_master_heapsize + hbase_regionserver_heapsize)"
                                         .format(int(ceil(minMasterXmn))))

      if hbase_master_xmn_size > maxMasterXmn:
        masterXmnItem = self.getWarnItem("Value is greater than the recommended maximum Xmn size of {0} "
                                         "(20% of hbase_master_heapsize + hbase_regionserver_heapsize)"
                                         .format(int(floor(maxMasterXmn))))
    if masterXmnItem:
      validationItems.extend([{"config-name": "hbase_master_xmn_size", "item": masterXmnItem}])

    if regionServerXmnItem:
      validationItems.extend([{"config-name": "regionserver_xmn_size", "item": regionServerXmnItem}])

    if hbaseMasterHeapsizeItem is None:
      # Build host -> master-component-names map for co-hosting checks.
      hostMasterComponents = {}

      for service in services["services"]:
        for component in service["components"]:
          if component["StackServiceComponents"]["hostnames"] is not None:
            for hostName in component["StackServiceComponents"]["hostnames"]:
              if self.isMasterComponent(component):
                if hostName not in hostMasterComponents.keys():
                  hostMasterComponents[hostName] = []
                hostMasterComponents[hostName].append(component["StackServiceComponents"]["component_name"])

      amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR")
      for collectorHostName in amsCollectorHosts:
        for host in hosts["items"]:
          if host["Hosts"]["host_name"] == collectorHostName:
            # AMS Collector co-hosted with other master components in bigger clusters
            if len(hosts['items']) > 31 and \
              len(hostMasterComponents[collectorHostName]) > 2 and \
              host["Hosts"]["total_mem"] < 32*mb: # < 32Gb(total_mem in k)
              masterHostMessage = "Host {0} is used by multiple master components ({1}). " \
                                  "It is recommended to use a separate host for the " \
                                  "Ambari Metrics Collector component and ensure " \
                                  "the host has sufficient memory available."

              hbaseMasterHeapsizeItem = self.getWarnItem(masterHostMessage.format(
                collectorHostName, str(", ".join(hostMasterComponents[collectorHostName]))))
              if hbaseMasterHeapsizeItem:
                validationItems.extend([{"config-name": "hbase_master_heapsize", "item": hbaseMasterHeapsizeItem}])

            # Check for unused RAM on AMS Collector node
            hostComponents = []
            for service in services["services"]:
              for component in service["components"]:
                if component["StackServiceComponents"]["hostnames"] is not None:
                  if collectorHostName in component["StackServiceComponents"]["hostnames"]:
                    hostComponents.append(component["StackServiceComponents"]["component_name"])

            requiredMemory = getMemorySizeRequired(hostComponents, configurations)
            unusedMemory = host["Hosts"]["total_mem"] * 1024 - requiredMemory # in bytes
            if unusedMemory > 4*gb:  # warn user, if more than 4GB RAM is unused
              heapPropertyToIncrease = "hbase_regionserver_heapsize" if is_hbase_distributed else "hbase_master_heapsize"
              xmnPropertyToIncrease = "regionserver_xmn_size" if is_hbase_distributed else "hbase_master_xmn_size"
              recommended_collector_heapsize = int((unusedMemory - 4*gb)/5) + collector_heapsize*mb
              recommended_hbase_heapsize = int((unusedMemory - 4*gb)*4/5) + to_number(properties.get(heapPropertyToIncrease))*mb
              recommended_hbase_heapsize = min(32*gb, recommended_hbase_heapsize) #Make sure heapsize <= 32GB
              recommended_xmn_size = round_to_n(0.12*recommended_hbase_heapsize/mb,128)

              if collector_heapsize < recommended_collector_heapsize or \
                to_number(properties[heapPropertyToIncrease]) < recommended_hbase_heapsize:
                collectorHeapsizeItem = self.getWarnItem("{0} MB RAM is unused on the host {1} based on components " \
                                                         "assigned. Consider allocating {2} MB to " \
                                                         "metrics_collector_heapsize in ams-env, " \
                                                         "{3} MB to {4} in ams-hbase-env"
                                                         .format(unusedMemory/mb, collectorHostName,
                                                                 recommended_collector_heapsize/mb,
                                                                 recommended_hbase_heapsize/mb,
                                                                 heapPropertyToIncrease))
                validationItems.extend([{"config-name": heapPropertyToIncrease, "item": collectorHeapsizeItem}])

              # NOTE(review): this compares the current Xmn value against
              # recommended_hbase_heapsize, not recommended_xmn_size — looks
              # like it should be recommended_xmn_size; confirm intent.
              if to_number(properties[xmnPropertyToIncrease]) < recommended_hbase_heapsize:
                xmnPropertyToIncreaseItem = self.getWarnItem("Consider allocating {0} MB to use up some unused memory "
                                                             "on host".format(recommended_xmn_size))
                validationItems.extend([{"config-name": xmnPropertyToIncrease, "item": xmnPropertyToIncreaseItem}])
      pass

    return self.toConfigurationValidationProblems(validationItems, "ams-hbase-env")
| |
| |
| def validateServiceConfigurations(self, serviceName): |
| return self.getServiceConfigurationValidators().get(serviceName, None) |
| |
| def toConfigurationValidationProblems(self, validationProblems, siteName): |
| result = [] |
| for validationProblem in validationProblems: |
| validationItem = validationProblem.get("item", None) |
| if validationItem is not None: |
| problem = {"type": 'configuration', "level": validationItem["level"], "message": validationItem["message"], |
| "config-type": siteName, "config-name": validationProblem["config-name"] } |
| result.append(problem) |
| return result |
| |
| def getWarnItem(self, message): |
| return {"level": "WARN", "message": message} |
| |
| def getErrorItem(self, message): |
| return {"level": "ERROR", "message": message} |
| |
| def getPreferredMountPoints(self, hostInfo): |
| |
| # '/etc/resolv.conf', '/etc/hostname', '/etc/hosts' are docker specific mount points |
| undesirableMountPoints = ["/", "/home", "/etc/resolv.conf", "/etc/hosts", |
| "/etc/hostname", "/tmp"] |
| undesirableFsTypes = ["devtmpfs", "tmpfs", "vboxsf", "CDFS"] |
| mountPoints = [] |
| if hostInfo and "disk_info" in hostInfo: |
| mountPointsDict = {} |
| for mountpoint in hostInfo["disk_info"]: |
| if not (mountpoint["mountpoint"] in undesirableMountPoints or |
| mountpoint["mountpoint"].startswith(("/boot", "/mnt")) or |
| mountpoint["type"] in undesirableFsTypes or |
| mountpoint["available"] == str(0)): |
| mountPointsDict[mountpoint["mountpoint"]] = to_number(mountpoint["available"]) |
| if mountPointsDict: |
| mountPoints = sorted(mountPointsDict, key=mountPointsDict.get, reverse=True) |
| mountPoints.append("/") |
| return mountPoints |
| |
| def validatorNotRootFs(self, properties, recommendedDefaults, propertyName, hostInfo): |
| if not propertyName in properties: |
| return self.getErrorItem("Value should be set") |
| dir = properties[propertyName] |
| if not dir.startswith("file://") or dir == recommendedDefaults.get(propertyName): |
| return None |
| |
| dir = re.sub("^file://", "", dir, count=1) |
| mountPoints = [] |
| for mountPoint in hostInfo["disk_info"]: |
| mountPoints.append(mountPoint["mountpoint"]) |
| mountPoint = getMountPointForDir(dir, mountPoints) |
| |
| if "/" == mountPoint and self.getPreferredMountPoints(hostInfo)[0] != mountPoint: |
| return self.getWarnItem("It is not recommended to use root partition for {0}".format(propertyName)) |
| |
| return None |
| |
| def validatorEnoughDiskSpace(self, properties, propertyName, hostInfo, reqiuredDiskSpace): |
| if not propertyName in properties: |
| return self.getErrorItem("Value should be set") |
| dir = properties[propertyName] |
| if not dir.startswith("file://"): |
| return None |
| |
| dir = re.sub("^file://", "", dir, count=1) |
| mountPoints = {} |
| for mountPoint in hostInfo["disk_info"]: |
| mountPoints[mountPoint["mountpoint"]] = to_number(mountPoint["available"]) |
| mountPoint = getMountPointForDir(dir, mountPoints.keys()) |
| |
| if not mountPoints: |
| return self.getErrorItem("No disk info found on host %s" % hostInfo["host_name"]) |
| |
| if mountPoints[mountPoint] < reqiuredDiskSpace: |
| msg = "Ambari Metrics disk space requirements not met. \n" \ |
| "Recommended disk space for partition {0} is {1}G" |
| return self.getWarnItem(msg.format(mountPoint, reqiuredDiskSpace/1048576)) # in Gb |
| return None |
| |
| def validatorLessThenDefaultValue(self, properties, recommendedDefaults, propertyName): |
| if propertyName not in recommendedDefaults: |
| # If a property name exists in say hbase-env and hbase-site (which is allowed), then it will exist in the |
| # "properties" dictionary, but not necessarily in the "recommendedDefaults" dictionary". In this case, ignore it. |
| return None |
| |
| if not propertyName in properties: |
| return self.getErrorItem("Value should be set") |
| value = to_number(properties[propertyName]) |
| if value is None: |
| return self.getErrorItem("Value should be integer") |
| defaultValue = to_number(recommendedDefaults[propertyName]) |
| if defaultValue is None: |
| return None |
| if value < defaultValue: |
| return self.getWarnItem("Value is less than the recommended default of {0}".format(defaultValue)) |
| return None |
| |
| def validatorEqualsPropertyItem(self, properties1, propertyName1, |
| properties2, propertyName2, |
| emptyAllowed=False): |
| if not propertyName1 in properties1: |
| return self.getErrorItem("Value should be set for %s" % propertyName1) |
| if not propertyName2 in properties2: |
| return self.getErrorItem("Value should be set for %s" % propertyName2) |
| value1 = properties1.get(propertyName1) |
| if value1 is None and not emptyAllowed: |
| return self.getErrorItem("Empty value for %s" % propertyName1) |
| value2 = properties2.get(propertyName2) |
| if value2 is None and not emptyAllowed: |
| return self.getErrorItem("Empty value for %s" % propertyName2) |
| if value1 != value2: |
| return self.getWarnItem("It is recommended to set equal values " |
| "for properties {0} and {1}".format(propertyName1, propertyName2)) |
| |
| return None |
| |
| def validatorEqualsToRecommendedItem(self, properties, recommendedDefaults, |
| propertyName): |
| if not propertyName in properties: |
| return self.getErrorItem("Value should be set for %s" % propertyName) |
| value = properties.get(propertyName) |
| if not propertyName in recommendedDefaults: |
| return self.getErrorItem("Value should be recommended for %s" % propertyName) |
| recommendedValue = recommendedDefaults.get(propertyName) |
| if value != recommendedValue: |
| return self.getWarnItem("It is recommended to set value {0} " |
| "for property {1}".format(recommendedValue, propertyName)) |
| return None |
| |
| def validateMinMemorySetting(self, properties, defaultValue, propertyName): |
| if not propertyName in properties: |
| return self.getErrorItem("Value should be set") |
| if defaultValue is None: |
| return self.getErrorItem("Config's default value can't be null or undefined") |
| |
| value = properties[propertyName] |
| if value is None: |
| return self.getErrorItem("Value can't be null or undefined") |
| try: |
| valueInt = to_number(value) |
| # TODO: generify for other use cases |
| defaultValueInt = int(str(defaultValue).strip()) |
| if valueInt < defaultValueInt: |
| return self.getWarnItem("Value is less than the minimum recommended default of -Xmx" + str(defaultValue)) |
| except: |
| return None |
| |
| return None |
| |
| |
| def validateXmxValue(self, properties, recommendedDefaults, propertyName): |
| if not propertyName in properties: |
| return self.getErrorItem("Value should be set") |
| value = properties[propertyName] |
| defaultValue = recommendedDefaults[propertyName] |
| if defaultValue is None: |
| return self.getErrorItem("Config's default value can't be null or undefined") |
| if not checkXmxValueFormat(value) and checkXmxValueFormat(defaultValue): |
| # Xmx is in the default-value but not the value, should be an error |
| return self.getErrorItem('Invalid value format') |
| if not checkXmxValueFormat(defaultValue): |
| # if default value does not contain Xmx, then there is no point in validating existing value |
| return None |
| valueInt = formatXmxSizeToBytes(getXmxSize(value)) |
| defaultValueXmx = getXmxSize(defaultValue) |
| defaultValueInt = formatXmxSizeToBytes(defaultValueXmx) |
| if valueInt < defaultValueInt: |
| return self.getWarnItem("Value is less than the recommended default of -Xmx" + defaultValueXmx) |
| return None |
| |
| def validateMapReduce2Configurations(self, properties, recommendedDefaults, configurations, services, hosts): |
| validationItems = [ {"config-name": 'mapreduce.map.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'mapreduce.map.java.opts')}, |
| {"config-name": 'mapreduce.reduce.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'mapreduce.reduce.java.opts')}, |
| {"config-name": 'mapreduce.task.io.sort.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'mapreduce.task.io.sort.mb')}, |
| {"config-name": 'mapreduce.map.memory.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'mapreduce.map.memory.mb')}, |
| {"config-name": 'mapreduce.reduce.memory.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'mapreduce.reduce.memory.mb')}, |
| {"config-name": 'yarn.app.mapreduce.am.resource.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'yarn.app.mapreduce.am.resource.mb')}, |
| {"config-name": 'yarn.app.mapreduce.am.command-opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'yarn.app.mapreduce.am.command-opts')} ] |
| return self.toConfigurationValidationProblems(validationItems, "mapred-site") |
| |
| def validateYARNConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): |
| clusterEnv = getSiteProperties(configurations, "cluster-env") |
| validationItems = [ {"config-name": 'yarn.nodemanager.resource.memory-mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'yarn.nodemanager.resource.memory-mb')}, |
| {"config-name": 'yarn.scheduler.minimum-allocation-mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'yarn.scheduler.minimum-allocation-mb')}, |
| {"config-name": 'yarn.nodemanager.linux-container-executor.group', "item": self.validatorEqualsPropertyItem(properties, "yarn.nodemanager.linux-container-executor.group", clusterEnv, "user_group")}, |
| {"config-name": 'yarn.scheduler.maximum-allocation-mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'yarn.scheduler.maximum-allocation-mb')} ] |
| return self.toConfigurationValidationProblems(validationItems, "yarn-site") |
| |
| def validateHbaseEnvConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): |
| hbase_site = getSiteProperties(configurations, "hbase-site") |
| validationItems = [ {"config-name": 'hbase_regionserver_heapsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'hbase_regionserver_heapsize')}, |
| {"config-name": 'hbase_master_heapsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'hbase_master_heapsize')}, |
| {"config-name": "hbase_user", "item": self.validatorEqualsPropertyItem(properties, "hbase_user", hbase_site, "hbase.superuser")} ] |
| return self.toConfigurationValidationProblems(validationItems, "hbase-env") |
| |
| def validateHDFSConfigurationsEnv(self, properties, recommendedDefaults, configurations, services, hosts): |
| validationItems = [ {"config-name": 'namenode_heapsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_heapsize')}, |
| {"config-name": 'namenode_opt_newsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_opt_newsize')}, |
| {"config-name": 'namenode_opt_maxnewsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_opt_maxnewsize')}] |
| return self.toConfigurationValidationProblems(validationItems, "hadoop-env") |
| |
| def getMastersWithMultipleInstances(self): |
| return ['ZOOKEEPER_SERVER', 'HBASE_MASTER'] |
| |
| def getNotValuableComponents(self): |
| return ['JOURNALNODE', 'ZKFC', 'GANGLIA_MONITOR'] |
| |
| def getNotPreferableOnServerComponents(self): |
| return ['GANGLIA_SERVER', 'METRICS_COLLECTOR'] |
| |
| def getCardinalitiesDict(self): |
| return { |
| 'ZOOKEEPER_SERVER': {"min": 3}, |
| 'HBASE_MASTER': {"min": 1}, |
| } |
| |
| def getComponentLayoutSchemes(self): |
| return { |
| 'NAMENODE': {"else": 0}, |
| 'SECONDARY_NAMENODE': {"else": 1}, |
| 'HBASE_MASTER': {6: 0, 31: 2, "else": 3}, |
| |
| 'HISTORYSERVER': {31: 1, "else": 2}, |
| 'RESOURCEMANAGER': {31: 1, "else": 2}, |
| |
| 'OOZIE_SERVER': {6: 1, 31: 2, "else": 3}, |
| |
| 'HIVE_SERVER': {6: 1, 31: 2, "else": 4}, |
| 'HIVE_METASTORE': {6: 1, 31: 2, "else": 4}, |
| 'WEBHCAT_SERVER': {6: 1, 31: 2, "else": 4}, |
| 'METRICS_COLLECTOR': {3: 2, 6: 2, 31: 3, "else": 5}, |
| } |
| |
| def get_system_min_uid(self): |
| login_defs = '/etc/login.defs' |
| uid_min_tag = 'UID_MIN' |
| comment_tag = '#' |
| uid_min = uid_default = '1000' |
| uid = None |
| |
| if os.path.exists(login_defs): |
| with open(login_defs, 'r') as f: |
| data = f.read().split('\n') |
| # look for uid_min_tag in file |
| uid = filter(lambda x: uid_min_tag in x, data) |
| # filter all lines, where uid_min_tag was found in comments |
| uid = filter(lambda x: x.find(comment_tag) > x.find(uid_min_tag) or x.find(comment_tag) == -1, uid) |
| |
| if uid is not None and len(uid) > 0: |
| uid = uid[0] |
| comment = uid.find(comment_tag) |
| tag = uid.find(uid_min_tag) |
| if comment == -1: |
| uid_tag = tag + len(uid_min_tag) |
| uid_min = uid[uid_tag:].strip() |
| elif comment > tag: |
| uid_tag = tag + len(uid_min_tag) |
| uid_min = uid[uid_tag:comment].strip() |
| |
| # check result for value |
| try: |
| int(uid_min) |
| except ValueError: |
| return uid_default |
| |
| return uid_min |
| |
| def mergeValidators(self, parentValidators, childValidators): |
| for service, configsDict in childValidators.iteritems(): |
| if service not in parentValidators: |
| parentValidators[service] = {} |
| parentValidators[service].update(configsDict) |
| |
| def getOldValue(self, services, configType, propertyName): |
| if services: |
| if 'changed-configurations' in services.keys(): |
| changedConfigs = services["changed-configurations"] |
| for changedConfig in changedConfigs: |
| if changedConfig["type"] == configType and changedConfig["name"]== propertyName and "old_value" in changedConfig: |
| return changedConfig["old_value"] |
| return None |
| |
| # Validation helper methods |
def getSiteProperties(configurations, siteName):
  """Return the "properties" dict of the named config site, or None if absent."""
  siteConfig = configurations.get(siteName)
  return siteConfig.get("properties") if siteConfig is not None else None
| |
def getServicesSiteProperties(services, siteName):
  """Return the "properties" dict for siteName from a services payload, or None."""
  configurations = services.get("configurations")
  if configurations:
    siteConfig = configurations.get(siteName)
    if siteConfig is not None:
      return siteConfig.get("properties")
  return None
| |
def to_number(s):
  """Extract the integer value from a string by stripping non-digit characters.

  E.g. "1024m" -> 1024.  Returns None when no digits are present or when the
  input is not a string (e.g. None), instead of raising.
  """
  try:
    # Raw string avoids the invalid "\D" escape warning on modern Python.
    return int(re.sub(r"\D", "", s))
  except (ValueError, TypeError):
    # ValueError: no digits left; TypeError: non-string input such as None.
    return None
| |
def checkXmxValueFormat(value):
  """Return True when value contains exactly one -Xmx token (e.g. "-Xmx1024m")."""
  # Raw string avoids the invalid "\d" escape warning on modern Python.
  p = re.compile(r'-Xmx(\d+)(b|k|m|g|p|t|B|K|M|G|P|T)?')
  matches = p.findall(value)
  return len(matches) == 1
| |
def getXmxSize(value):
  """Return the -Xmx size portion of a JVM options string, e.g. "1024m".

  The size modifier (k/m/g/...) is lowercased.  Raises IndexError when no
  -Xmx token is present, so callers should check with checkXmxValueFormat.
  """
  # Raw string avoids the invalid "\d" escape warning on modern Python.
  p = re.compile(r"-Xmx(\d+)(.?)")
  result = p.findall(value)[0]
  if len(result) > 1:
    # result[1] - is a space or size formatter (b|k|m|g etc)
    return result[0] + result[1].lower()
  return result[0]
| |
def formatXmxSizeToBytes(value):
  """Convert a heap size string such as "1024m" to bytes.

  A trailing space or digit is treated as a bytes value.  Raises KeyError
  for an unrecognized size modifier, matching the original behavior.
  """
  value = value.lower()
  if len(value) == 0:
    return 0
  modifier = value[-1]
  if modifier == ' ' or modifier in "0123456789":
    modifier = 'b'
  multipliers = {
    'b': 1,
    'k': 1024,
    'm': 1024 ** 2,
    'g': 1024 ** 3,
    't': 1024 ** 4,
    'p': 1024 ** 5,
  }
  return to_number(value) * multipliers[modifier]
| |
def getPort(address):
  """
  Extracts port from the address like 0.0.0.0:1019
  """
  if address is None:
    return None
  match = re.search(r'(?:http(?:s)?://)?([\w\d.]*):(\d{1,5})', address)
  return int(match.group(2)) if match else None
| |
def isSecurePort(port):
  """
  Returns True if port is root-owned at *nix systems
  """
  return port is not None and port < 1024
| |
def getMountPointForDir(dir, mountPoints):
  """
  :param dir: Directory to check, even if it doesn't exist.
  :return: Returns the closest mount point as a string for the directory.
  if the "dir" variable is None, will return None.
  If the directory does not exist, will return "/".
  """
  if not dir:
    return None
  normalized = re.sub("^file://", "", dir, count=1).strip().lower()

  # Candidate mounts are those that are path prefixes of the directory; pick
  # the one with the most path segments (e.g. "/hadoop/hdfs" beats "/").
  # Trailing separators are appended so "/hadoop" cannot match "/hadoop1";
  # ties keep the first candidate seen, as before.
  target = os.path.join(normalized, "")
  best = None
  bestDepth = -1
  for mountPoint in mountPoints:
    candidate = os.path.join(mountPoint, "")
    if target.startswith(candidate):
      depth = candidate.count(os.path.sep)
      if depth > bestDepth:
        best = mountPoint
        bestDepth = depth

  return best
| |
def getHeapsizeProperties():
  """Map component names to the config properties holding their heap sizes."""
  def heap(configName, propertyName, default="1024m"):
    # Single heap-size descriptor: where the property lives and its default.
    return {"config-name": configName, "property": propertyName, "default": default}

  return {
    "NAMENODE": [heap("hadoop-env", "namenode_heapsize")],
    "DATANODE": [heap("hadoop-env", "dtnode_heapsize")],
    "REGIONSERVER": [heap("hbase-env", "hbase_regionserver_heapsize")],
    "HBASE_MASTER": [heap("hbase-env", "hbase_master_heapsize")],
    "HIVE_CLIENT": [heap("hive-site", "hive.heapsize")],
    "HISTORYSERVER": [heap("mapred-env", "jobhistory_heapsize")],
    "OOZIE_SERVER": [heap("oozie-env", "oozie_heapsize")],
    "RESOURCEMANAGER": [heap("yarn-env", "resourcemanager_heapsize")],
    "NODEMANAGER": [heap("yarn-env", "nodemanager_heapsize")],
    "APP_TIMELINE_SERVER": [heap("yarn-env", "apptimelineserver_heapsize")],
    "ZOOKEEPER_SERVER": [heap("zookeeper-env", "zookeeper_heapsize")],
    # The AMS collector embeds an HBase instance plus its own daemon; these
    # defaults are plain megabyte numbers without the "m" suffix.
    "METRICS_COLLECTOR": [heap("ams-hbase-env", "hbase_master_heapsize", "1024"),
                          heap("ams-hbase-env", "hbase_regionserver_heapsize", "1024"),
                          heap("ams-env", "metrics_collector_heapsize", "512")],
  }
| |
def getMemorySizeRequired(components, configurations):
  """Estimate, in bytes, the memory needed by the given components on one host.

  Sums each component's configured (or default) heap size, plus 512MB
  reserved for the operating system.
  """
  totalMemoryRequired = 512*1024*1024  # 512Mb for OS needs
  heapsizeProperties = getHeapsizeProperties()  # hoisted: build the table once, not per component
  for component in components:
    if component not in heapsizeProperties:
      continue
    for heapSizeProperty in heapsizeProperties[component]:
      try:
        properties = configurations[heapSizeProperty["config-name"]]["properties"]
        heapsize = properties[heapSizeProperty["property"]]
      except KeyError:
        # Fall back to the stack default when the property is not configured.
        heapsize = heapSizeProperty["default"]

      # Assume MB when there is no size modifier.  This now also covers
      # single-digit values, which the previous "len > 1" check wrongly
      # interpreted as raw bytes.
      if heapsize and heapsize[-1] in '0123456789':
        heapsize = str(heapsize) + "m"

      totalMemoryRequired += formatXmxSizeToBytes(heapsize)

  return totalMemoryRequired
| |
def round_to_n(mem_size, n=128):
  """Round mem_size to the nearest multiple of n (default 128)."""
  multiple = round(mem_size / float(n))
  return int(multiple) * int(n)