| #!/usr/bin/env ambari-python-wrap |
| """ |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| """ |
| |
| import math |
| import traceback |
| |
| from ambari_commons.str_utils import string_set_equals |
| from resource_management.core.logger import Logger |
| from resource_management.core.exceptions import Fail |
| from resource_management.libraries.functions.get_bare_principal import get_bare_principal |
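| # Note: HDP24StackAdvisor and helpers such as getSiteProperties / getServicesSiteProperties are not imported |
| # here by design; Ambari's stack-advisor loader execs the parent stack advisor files into this namespace first. |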
| |
| class HDP25StackAdvisor(HDP24StackAdvisor): |
| |
| def __init__(self): |
| super(HDP25StackAdvisor, self).__init__() |
| Logger.initialize_logger() |
| self.HIVE_INTERACTIVE_SITE = 'hive-interactive-site' |
| self.YARN_ROOT_DEFAULT_QUEUE_NAME = 'default' |
| self.AMBARI_MANAGED_LLAP_QUEUE_NAME = 'llap' |
| |
| def recommendOozieConfigurations(self, configurations, clusterData, services, hosts): |
| super(HDP25StackAdvisor,self).recommendOozieConfigurations(configurations, clusterData, services, hosts) |
| putOozieEnvProperty = self.putProperty(configurations, "oozie-env", services) |
| |
| if not "oozie-env" in services["configurations"] : |
| Logger.info("No oozie configurations available") |
| return |
| |
| falconUser = 'falcon' |
| |
| if "falcon-env" in services["configurations"] : |
| if "falcon_user" in services["configurations"]["falcon-env"]["properties"] : |
| falconUser = services["configurations"]["falcon-env"]["properties"]["falcon_user"] |
| Logger.info("Falcon user from configuration: %s " % falconUser) |
| |
| Logger.info("Falcon user : %s" % falconUser) |
| |
| oozieUser = 'oozie' |
| |
| if "oozie_user" \ |
| in services["configurations"]["oozie-env"]["properties"] : |
| oozieUser = services["configurations"]["oozie-env"]["properties"]["oozie_user"] |
| Logger.info("Oozie user from configuration %s" % oozieUser) |
| |
| Logger.info("Oozie user %s" % oozieUser) |
| |
| if "oozie_admin_users" \ |
| in services["configurations"]["oozie-env"]["properties"] : |
| currentAdminUsers = services["configurations"]["oozie-env"]["properties"]["oozie_admin_users"] |
| Logger.info("Oozie admin users from configuration %s" % currentAdminUsers) |
| else : |
| currentAdminUsers = "{0}, oozie-admin".format(oozieUser) |
| Logger.info("Setting default oozie admin users to %s" % currentAdminUsers) |
| |
| |
| # Compare against the parsed user list; a plain substring test would also match e.g. "falcon" inside "falcon2". |
| if falconUser in [u.strip() for u in currentAdminUsers.split(",")]: |
| Logger.info("Falcon user %s is already an Oozie admin user" % falconUser) |
| return |
| |
| newAdminUsers = "{0},{1}".format(currentAdminUsers, falconUser) |
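| # e.g. "oozie, oozie-admin" + "falcon" -> "oozie, oozie-admin,falcon" (assuming the default users above) |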
| |
| Logger.info("new oozie admin users : %s" % newAdminUsers) |
| |
| services["forced-configurations"].append({"type" : "oozie-env", "name" : "oozie_admin_users"}) |
| putOozieEnvProperty("oozie_admin_users", newAdminUsers) |
| |
| def createComponentLayoutRecommendations(self, services, hosts): |
| parentComponentLayoutRecommendations = super(HDP25StackAdvisor, self).createComponentLayoutRecommendations( |
| services, hosts) |
| return parentComponentLayoutRecommendations |
| |
| def getComponentLayoutValidations(self, services, hosts): |
| parentItems = super(HDP25StackAdvisor, self).getComponentLayoutValidations(services, hosts) |
| childItems = [] |
| |
| hsi_hosts = self.__getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE") |
| if len(hsi_hosts) > 1: |
| message = "Only one host can install HIVE_SERVER_INTERACTIVE. " |
| childItems.append( |
| {"type": 'host-component', "level": 'ERROR', "message": message, "component-name": 'HIVE_SERVER_INTERACTIVE'}) |
| |
| parentItems.extend(childItems) |
| return parentItems |
| |
| def getServiceConfigurationValidators(self): |
| parentValidators = super(HDP25StackAdvisor, self).getServiceConfigurationValidators() |
| childValidators = { |
| "ATLAS": {"application-properties": self.validateAtlasConfigurations}, |
| "HIVE": {"hive-interactive-env": self.validateHiveInteractiveEnvConfigurations, |
| "hive-interactive-site": self.validateHiveInteractiveSiteConfigurations}, |
| "YARN": {"yarn-site": self.validateYarnConfigurations}, |
| "RANGER": {"ranger-tagsync-site": self.validateRangerTagsyncConfigurations}, |
| "SPARK2": {"spark2-defaults": self.validateSpark2Defaults, |
| "spark2-thrift-sparkconf": self.validateSpark2ThriftSparkConf} |
| } |
| self.mergeValidators(parentValidators, childValidators) |
| return parentValidators |
| |
| def validateAtlasConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): |
| application_properties = getSiteProperties(configurations, "application-properties") |
| validationItems = [] |
| |
| #<editor-fold desc="LDAP and AD"> |
| auth_type = application_properties['atlas.authentication.method.ldap.type'] |
| Logger.info("Validating Atlas configs, authentication type: %s" % str(auth_type)) |
| |
| # Required props |
| ldap_props = {"atlas.authentication.method.ldap.url": "", |
| "atlas.authentication.method.ldap.userDNpattern": "uid=", |
| "atlas.authentication.method.ldap.groupSearchBase": "", |
| "atlas.authentication.method.ldap.groupSearchFilter": "", |
| "atlas.authentication.method.ldap.groupRoleAttribute": "cn", |
| "atlas.authentication.method.ldap.base.dn": "", |
| "atlas.authentication.method.ldap.bind.dn": "", |
| "atlas.authentication.method.ldap.bind.password": "", |
| "atlas.authentication.method.ldap.user.searchfilter": "" |
| } |
| ad_props = {"atlas.authentication.method.ldap.ad.domain": "", |
| "atlas.authentication.method.ldap.ad.url": "", |
| "atlas.authentication.method.ldap.ad.base.dn": "", |
| "atlas.authentication.method.ldap.ad.bind.dn": "", |
| "atlas.authentication.method.ldap.ad.bind.password": "", |
| "atlas.authentication.method.ldap.ad.user.searchfilter": "(sAMAccountName={0})" |
| } |
| |
| props_to_require = set() |
| if auth_type.lower() == "ldap": |
| props_to_require = set(ldap_props.keys()) |
| elif auth_type.lower() == "ad": |
| props_to_require = set(ad_props.keys()) |
| elif auth_type.lower() == "none": |
| pass |
| |
| for prop in props_to_require: |
| if prop not in application_properties or application_properties[prop] is None or application_properties[prop].strip() == "": |
| validationItems.append({"config-name": prop, |
| "item": self.getErrorItem("If authentication type is %s, this property is required." % auth_type)}) |
| #</editor-fold> |
| |
| if application_properties['atlas.graph.index.search.backend'] == 'solr5' and \ |
| not application_properties['atlas.graph.index.search.solr.zookeeper-url']: |
| validationItems.append({"config-name": "atlas.graph.index.search.solr.zookeeper-url", |
| "item": self.getErrorItem( |
| "If AMBARI_INFRA is not installed then the SOLR zookeeper url configuration must be specified.")}) |
| |
| if not application_properties['atlas.kafka.bootstrap.servers']: |
| validationItems.append({"config-name": "atlas.kafka.bootstrap.servers", |
| "item": self.getErrorItem( |
| "If KAFKA is not installed then the Kafka bootstrap servers configuration must be specified.")}) |
| |
| if not application_properties['atlas.kafka.zookeeper.connect']: |
| validationItems.append({"config-name": "atlas.kafka.zookeeper.connect", |
| "item": self.getErrorItem( |
| "If KAFKA is not installed then the Kafka zookeeper quorum configuration must be specified.")}) |
| |
| if application_properties['atlas.graph.storage.backend'] == 'hbase' and 'hbase-site' in services['configurations']: |
| hbase_zookeeper_quorum = services['configurations']['hbase-site']['properties']['hbase.zookeeper.quorum'] |
| |
| if not application_properties['atlas.graph.storage.hostname']: |
| validationItems.append({"config-name": "atlas.graph.storage.hostname", |
| "item": self.getErrorItem( |
| "If HBASE is not installed then the hbase zookeeper quorum configuration must be specified.")}) |
| elif string_set_equals(application_properties['atlas.graph.storage.hostname'], hbase_zookeeper_quorum): |
| validationItems.append({"config-name": "atlas.graph.storage.hostname", |
| "item": self.getWarnItem( |
| "Atlas is configured to use the HBase installed in this cluster. If you would like Atlas to use another HBase instance, please configure this property and HBASE_CONF_DIR variable in atlas-env appropriately.")}) |
| |
| if not application_properties['atlas.audit.hbase.zookeeper.quorum']: |
| validationItems.append({"config-name": "atlas.audit.hbase.zookeeper.quorum", |
| "item": self.getErrorItem( |
| "If HBASE is not installed then the audit hbase zookeeper quorum configuration must be specified.")}) |
| |
| elif application_properties['atlas.graph.storage.backend'] == 'hbase' and 'hbase-site' not in services['configurations']: |
| if not application_properties['atlas.graph.storage.hostname']: |
| validationItems.append({"config-name": "atlas.graph.storage.hostname", |
| "item": self.getErrorItem( |
| "Atlas is not configured to use the HBase installed in this cluster. If you would like Atlas to use another HBase instance, please configure this property and HBASE_CONF_DIR variable in atlas-env appropriately.")}) |
| if not application_properties['atlas.audit.hbase.zookeeper.quorum']: |
| validationItems.append({"config-name": "atlas.audit.hbase.zookeeper.quorum", |
| "item": self.getErrorItem( |
| "If HBASE is not installed then the audit hbase zookeeper quorum configuration must be specified.")}) |
| |
| validationProblems = self.toConfigurationValidationProblems(validationItems, "application-properties") |
| return validationProblems |
| |
| def validateSpark2Defaults(self, properties, recommendedDefaults, configurations, services, hosts): |
| validationItems = [ |
| { |
| "config-name": 'spark.yarn.queue', |
| "item": self.validatorYarnQueue(properties, recommendedDefaults, 'spark.yarn.queue', services) |
| } |
| ] |
| return self.toConfigurationValidationProblems(validationItems, "spark2-defaults") |
| |
| def validateSpark2ThriftSparkConf(self, properties, recommendedDefaults, configurations, services, hosts): |
| validationItems = [ |
| { |
| "config-name": 'spark.yarn.queue', |
| "item": self.validatorYarnQueue(properties, recommendedDefaults, 'spark.yarn.queue', services) |
| } |
| ] |
| return self.toConfigurationValidationProblems(validationItems, "spark2-thrift-sparkconf") |
| |
| def validateYarnConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): |
| parentValidationProblems = super(HDP25StackAdvisor, self).validateYARNConfigurations(properties, recommendedDefaults, configurations, services, hosts) |
| yarn_site_properties = getSiteProperties(configurations, "yarn-site") |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| componentsListList = [service["components"] for service in services["services"]] |
| componentsList = [item["StackServiceComponents"] for sublist in componentsListList for item in sublist] |
| validationItems = [] |
| |
| hsi_hosts = self.__getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE") |
| if len(hsi_hosts) > 0: |
| # HIVE_SERVER_INTERACTIVE is mapped to a host |
| if 'yarn.resourcemanager.work-preserving-recovery.enabled' not in yarn_site_properties or \ |
| 'true' != yarn_site_properties['yarn.resourcemanager.work-preserving-recovery.enabled']: |
| validationItems.append({"config-name": "yarn.resourcemanager.work-preserving-recovery.enabled", |
| "item": self.getWarnItem( |
| "While enabling HIVE_SERVER_INTERACTIVE it is recommended that you enable work preserving restart in YARN.")}) |
| |
| validationProblems = self.toConfigurationValidationProblems(validationItems, "yarn-site") |
| validationProblems.extend(parentValidationProblems) |
| return validationProblems |
| |
| """ |
| Does the following validation checks for HIVE_SERVER_INTERACTIVE's hive-interactive-site configs. |
| 1. Queue selected in 'hive.llap.daemon.queue.name' config should be sized >= to minimum required to run LLAP |
| and Hive2 app. |
| 2. Queue selected in 'hive.llap.daemon.queue.name' config state should not be 'STOPPED'. |
| 3. 'hive.server2.enable.doAs' config should be set to 'false' for Hive2. |
| 4. 'Maximum Total Concurrent Queries'(hive.server2.tez.sessions.per.default.queue) should not consume more that 50% of selected queue for LLAP. |
| 5. if 'llap' queue is selected, in order to run Service Checks, 'remaining available capacity' in cluster is atleast 512 MB. |
| """ |
| def validateHiveInteractiveSiteConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): |
| validationItems = [] |
| hsi_hosts = self.__getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE") |
| curr_selected_queue_for_llap = None |
| curr_selected_queue_for_llap_cap_perc = None |
| MIN_ASSUMED_CAP_REQUIRED_FOR_SERVICE_CHECKS = 512 |
| current_selected_queue_for_llap_cap = None |
| |
| if len(hsi_hosts) > 0: |
| # Get total cluster capacity |
| node_manager_host_list = self.get_node_manager_hosts(services, hosts) |
| node_manager_cnt = len(node_manager_host_list) |
| yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations) |
| total_cluster_capacity = node_manager_cnt * yarn_nm_mem_in_mb |
| |
| capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services) |
| if capacity_scheduler_properties: |
| if self.HIVE_INTERACTIVE_SITE in services['configurations'] and \ |
| 'hive.llap.daemon.queue.name' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']: |
| curr_selected_queue_for_llap = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name'] |
| if curr_selected_queue_for_llap: |
| current_selected_queue_for_llap_cap = self.__getSelectedQueueTotalCap(capacity_scheduler_properties, |
| curr_selected_queue_for_llap, total_cluster_capacity) |
| if current_selected_queue_for_llap_cap: |
| curr_selected_queue_for_llap_cap_perc = int(current_selected_queue_for_llap_cap * 100 / total_cluster_capacity) |
| min_reqd_queue_cap_perc = self.min_queue_perc_reqd_for_llap_and_hive_app(services, hosts, configurations) |
| |
| # Validate that the selected queue in 'hive.llap.daemon.queue.name' should be sized >= to minimum required |
| # to run LLAP and Hive2 app. |
| if curr_selected_queue_for_llap_cap_perc < min_reqd_queue_cap_perc: |
| errMsg1 = "Selected queue '{0}' capacity ({1}%) is less than minimum required capacity ({2}%) for LLAP " \ |
| "app to run".format(curr_selected_queue_for_llap, curr_selected_queue_for_llap_cap_perc, min_reqd_queue_cap_perc) |
| validationItems.append({"config-name": "hive.llap.daemon.queue.name","item": self.getErrorItem(errMsg1)}) |
| else: |
| Logger.error("Couldn't retrieve '{0}' queue's capacity from 'capacity-scheduler' while doing validation checks for " |
| "Hive Server Interactive.".format(curr_selected_queue_for_llap)) |
| |
| # Validate that current selected queue in 'hive.llap.daemon.queue.name' state is not STOPPED. |
| llap_selected_queue_state = self.__getQueueStateFromCapacityScheduler(capacity_scheduler_properties, curr_selected_queue_for_llap) |
| if llap_selected_queue_state: |
| if llap_selected_queue_state == "STOPPED": |
| errMsg2 = "Selected queue '{0}' current state is : '{1}'. It is required to be in 'RUNNING' state for LLAP to run"\ |
| .format(curr_selected_queue_for_llap, llap_selected_queue_state) |
| validationItems.append({"config-name": "hive.llap.daemon.queue.name","item": self.getErrorItem(errMsg2)}) |
| else: |
| Logger.error("Couldn't retrieve '{0}' queue's state from 'capacity-scheduler' while doing validation checks for " |
| "Hive Server Interactive.".format(curr_selected_queue_for_llap)) |
| else: |
| Logger.error("Couldn't retrieve current selection for 'hive.llap.daemon.queue.name' while doing validation " |
| "checks for Hive Server Interactive.") |
| else: |
| Logger.error("Couldn't retrieve 'hive.llap.daemon.queue.name' config from 'hive-interactive-site' while doing " |
| "validation checks for Hive Server Interactive.") |
| else: |
| Logger.error("Couldn't retrieve 'capacity-scheduler' properties while doing validation checks for Hive Server Interactive.") |
| |
| if self.HIVE_INTERACTIVE_SITE in services['configurations']: |
| # Validate that 'hive.server2.enable.doAs' config is not set to 'true' for Hive2. |
| if 'hive.server2.enable.doAs' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']: |
| hive2_enable_do_as = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.server2.enable.doAs'] |
| if hive2_enable_do_as == 'true': |
| validationItems.append({"config-name": "hive.server2.enable.doAs","item": self.getErrorItem("Value should be set to 'false' for Hive2.")}) |
| |
| # Validate that 'Maximum Total Concurrent Queries' (hive.server2.tez.sessions.per.default.queue) is not consuming |
| # more than 50% of the queue selected for LLAP. |
| if current_selected_queue_for_llap_cap and 'hive.server2.tez.sessions.per.default.queue' in \ |
| services['configurations']['hive-interactive-site']['properties']: |
| num_tez_sessions = services['configurations']['hive-interactive-site']['properties']['hive.server2.tez.sessions.per.default.queue'] |
| if num_tez_sessions: |
| num_tez_sessions = long(num_tez_sessions) |
| yarn_min_container_size = self.get_yarn_min_container_size(services, configurations) |
| tez_am_container_size = self.calculate_tez_am_container_size(long(total_cluster_capacity)) |
| normalized_tez_am_container_size = self._normalizeUp(tez_am_container_size, yarn_min_container_size) |
| llap_selected_queue_cap_remaining = current_selected_queue_for_llap_cap - (normalized_tez_am_container_size * num_tez_sessions) |
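| # e.g. (assumed numbers) a 40960 MB queue with 10 sessions x 2048 MB AMs leaves 20480 MB, exactly 50%, |
| # which triggers the warning below. |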
| if llap_selected_queue_cap_remaining <= current_selected_queue_for_llap_cap/2: |
| errMsg3 = " Reducing the 'Maximum Total Concurrent Queries' (value: {0}) is advisable as it is consuming more than 50% of " \ |
| "'{1}' queue for LLAP.".format(num_tez_sessions, curr_selected_queue_for_llap) |
| validationItems.append({"config-name": "hive.server2.tez.sessions.per.default.queue","item": self.getWarnItem(errMsg3)}) |
| |
| # Validate that the 'remaining available capacity' in the cluster is at least 512 MB after the 'llap' queue is |
| # selected, so that Service Checks can run. |
| if curr_selected_queue_for_llap and curr_selected_queue_for_llap_cap_perc and \ |
| curr_selected_queue_for_llap == self.AMBARI_MANAGED_LLAP_QUEUE_NAME: |
| curr_selected_queue_for_llap_cap = float(curr_selected_queue_for_llap_cap_perc) / 100 * total_cluster_capacity |
| available_cap_in_cluster = total_cluster_capacity - curr_selected_queue_for_llap_cap |
| if available_cap_in_cluster < MIN_ASSUMED_CAP_REQUIRED_FOR_SERVICE_CHECKS: |
| errMsg4 = "Capacity used by '{0}' queue is '{1}'. Service checks may not run as remaining available capacity " \ |
| "({2}) in cluster is less than 512 MB.".format(self.AMBARI_MANAGED_LLAP_QUEUE_NAME, curr_selected_queue_for_llap_cap, available_cap_in_cluster) |
| validationItems.append({"config-name": "hive.llap.daemon.queue.name","item": self.getWarnItem(errMsg4)}) |
| |
| validationProblems = self.toConfigurationValidationProblems(validationItems, "hive-interactive-site") |
| return validationProblems |
| |
| def validateHiveInteractiveEnvConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): |
| hive_site_env_properties = getSiteProperties(configurations, "hive-interactive-env") |
| validationItems = [] |
| hsi_hosts = self.__getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE") |
| if len(hsi_hosts) > 0: |
| # HIVE_SERVER_INTERACTIVE is mapped to a host |
| if hive_site_env_properties.get('enable_hive_interactive', '').lower() != 'true': |
| validationItems.append({"config-name": "enable_hive_interactive", |
| "item": self.getErrorItem( |
| "HIVE_SERVER_INTERACTIVE requires enable_hive_interactive in hive-interactive-env set to true.")}) |
| if 'hive_server_interactive_host' in hive_site_env_properties: |
| hsi_host = hsi_hosts[0] |
| if hive_site_env_properties['hive_server_interactive_host'].lower() != hsi_host.lower(): |
| validationItems.append({"config-name": "hive_server_interactive_host", |
| "item": self.getErrorItem( |
| "HIVE_SERVER_INTERACTIVE requires hive_server_interactive_host in hive-interactive-env set to its host name.")}) |
| else: |
| validationItems.append({"config-name": "hive_server_interactive_host", |
| "item": self.getErrorItem( |
| "HIVE_SERVER_INTERACTIVE requires hive_server_interactive_host in hive-interactive-env set to its host name.")}) |
| |
| else: |
| # no HIVE_SERVER_INTERACTIVE |
| if 'enable_hive_interactive' in hive_site_env_properties and \ |
| hive_site_env_properties['enable_hive_interactive'].lower() != 'false': |
| validationItems.append({"config-name": "enable_hive_interactive", |
| "item": self.getErrorItem( |
| "enable_hive_interactive in hive-interactive-env should be set to false.")}) |
| |
| validationProblems = self.toConfigurationValidationProblems(validationItems, "hive-interactive-env") |
| return validationProblems |
| |
| def getServiceConfigurationRecommenderDict(self): |
| parentRecommendConfDict = super(HDP25StackAdvisor, self).getServiceConfigurationRecommenderDict() |
| childRecommendConfDict = { |
| "RANGER": self.recommendRangerConfigurations, |
| "HBASE": self.recommendHBASEConfigurations, |
| "HIVE": self.recommendHIVEConfigurations, |
| "ATLAS": self.recommendAtlasConfigurations, |
| "RANGER_KMS": self.recommendRangerKMSConfigurations, |
| "STORM": self.recommendStormConfigurations, |
| "OOZIE": self.recommendOozieConfigurations, |
| "SPARK2": self.recommendSpark2Configurations |
| } |
| parentRecommendConfDict.update(childRecommendConfDict) |
| return parentRecommendConfDict |
| |
| def recommendSpark2Configurations(self, configurations, clusterData, services, hosts): |
| """ |
| :type configurations dict |
| :type clusterData dict |
| :type services dict |
| :type hosts dict |
| """ |
| putSparkProperty = self.putProperty(configurations, "spark2-defaults", services) |
| putSparkThriftSparkConf = self.putProperty(configurations, "spark2-thrift-sparkconf", services) |
| |
| spark_queue = self.recommendYarnQueue(services, "spark2-defaults", "spark.yarn.queue") |
| if spark_queue is not None: |
| putSparkProperty("spark.yarn.queue", spark_queue) |
| |
| spart_thrift_queue = self.recommendYarnQueue(services, "spark2-thrift-sparkconf", "spark.yarn.queue") |
| if spart_thrift_queue is not None: |
| putSparkThriftSparkConf("spark.yarn.queue", spart_thrift_queue) |
| |
| def recommendStormConfigurations(self, configurations, clusterData, services, hosts): |
| super(HDP25StackAdvisor, self).recommendStormConfigurations(configurations, clusterData, services, hosts) |
| storm_site = getServicesSiteProperties(services, "storm-site") |
| putStormSiteProperty = self.putProperty(configurations, "storm-site", services) |
| putStormSiteAttributes = self.putPropertyAttribute(configurations, "storm-site") |
| security_enabled = (storm_site is not None and "storm.zookeeper.superACL" in storm_site) |
| |
| if security_enabled: |
| _storm_principal_name = services['configurations']['storm-env']['properties']['storm_principal_name'] |
| storm_bare_jaas_principal = get_bare_principal(_storm_principal_name) |
| if 'nimbus.impersonation.acl' in storm_site: |
| storm_nimbus_impersonation_acl = storm_site["nimbus.impersonation.acl"] |
| # str.replace returns a new string, so assign the result back before writing the property. |
| storm_nimbus_impersonation_acl = storm_nimbus_impersonation_acl.replace('{{storm_bare_jaas_principal}}', storm_bare_jaas_principal) |
| putStormSiteProperty('nimbus.impersonation.acl', storm_nimbus_impersonation_acl) |
| rangerPluginEnabled = '' |
| if 'ranger-storm-plugin-properties' in configurations and 'ranger-storm-plugin-enabled' in configurations['ranger-storm-plugin-properties']['properties']: |
| rangerPluginEnabled = configurations['ranger-storm-plugin-properties']['properties']['ranger-storm-plugin-enabled'] |
| elif 'ranger-storm-plugin-properties' in services['configurations'] and 'ranger-storm-plugin-enabled' in services['configurations']['ranger-storm-plugin-properties']['properties']: |
| rangerPluginEnabled = services['configurations']['ranger-storm-plugin-properties']['properties']['ranger-storm-plugin-enabled'] |
| |
| storm_authorizer_class = 'org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer' |
| ranger_authorizer_class = 'org.apache.ranger.authorization.storm.authorizer.RangerStormAuthorizer' |
| # Cluster is kerberized |
| if security_enabled: |
| if rangerPluginEnabled and rangerPluginEnabled.lower() == 'yes': |
| putStormSiteProperty('nimbus.authorizer', ranger_authorizer_class) |
| elif rangerPluginEnabled and rangerPluginEnabled.lower() == 'no' and \ |
| (services["configurations"]["storm-site"]["properties"]["nimbus.authorizer"] == ranger_authorizer_class): |
| putStormSiteProperty('nimbus.authorizer', storm_authorizer_class) |
| else: |
| putStormSiteAttributes('nimbus.authorizer', 'delete', 'true') |
| |
| def constructAtlasRestAddress(self, services, hosts): |
| """ |
| :param services: Collection of services in the cluster with configs |
| :param hosts: Collection of hosts in the cluster |
| :return: The suggested property for atlas.rest.address if it is valid, otherwise, None |
| """ |
| atlas_rest_address = None |
| services_list = [service["StackServices"]["service_name"] for service in services["services"]] |
| is_atlas_in_cluster = "ATLAS" in services_list |
| |
| atlas_server_hosts_info = self.getHostsWithComponent("ATLAS", "ATLAS_SERVER", services, hosts) |
| if is_atlas_in_cluster and atlas_server_hosts_info and len(atlas_server_hosts_info) > 0: |
| # Multiple Atlas Servers can exist, so sort by hostname to create deterministic csv |
| atlas_host_names = [e['Hosts']['host_name'] for e in atlas_server_hosts_info] |
| if len(atlas_host_names) > 1: |
| atlas_host_names = sorted(atlas_host_names) |
| |
| scheme = "http" |
| metadata_port = "21000" |
| atlas_server_default_https_port = "21443" |
| tls_enabled = "false" |
| if 'application-properties' in services['configurations']: |
| if 'atlas.enableTLS' in services['configurations']['application-properties']['properties']: |
| tls_enabled = services['configurations']['application-properties']['properties']['atlas.enableTLS'] |
| if 'atlas.server.http.port' in services['configurations']['application-properties']['properties']: |
| metadata_port = str(services['configurations']['application-properties']['properties']['atlas.server.http.port']) |
| |
| if str(tls_enabled).lower() == "true": |
| scheme = "https" |
| if 'atlas.server.https.port' in services['configurations']['application-properties']['properties']: |
| metadata_port = str(services['configurations']['application-properties']['properties']['atlas.server.https.port']) |
| else: |
| metadata_port = atlas_server_default_https_port |
| |
| atlas_rest_address_list = ["{0}://{1}:{2}".format(scheme, hostname, metadata_port) for hostname in atlas_host_names] |
| atlas_rest_address = ",".join(atlas_rest_address_list) |
| Logger.info("Constructing atlas.rest.address=%s" % atlas_rest_address) |
| return atlas_rest_address |
| |
| def recommendAtlasConfigurations(self, configurations, clusterData, services, hosts): |
| putAtlasApplicationProperty = self.putProperty(configurations, "application-properties", services) |
| putAtlasRangerPluginProperty = self.putProperty(configurations, "ranger-atlas-plugin-properties", services) |
| putAtlasEnvProperty = self.putProperty(configurations, "atlas-env", services) |
| |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| |
| # Generate atlas.rest.address since the value is always computed |
| atlas_rest_address = self.constructAtlasRestAddress(services, hosts) |
| if atlas_rest_address is not None: |
| putAtlasApplicationProperty("atlas.rest.address", atlas_rest_address) |
| |
| if "AMBARI_INFRA" in servicesList and 'infra-solr-env' in services['configurations']: |
| if 'infra_solr_znode' in services['configurations']['infra-solr-env']['properties']: |
| infra_solr_znode = services['configurations']['infra-solr-env']['properties']['infra_solr_znode'] |
| else: |
| infra_solr_znode = None |
| |
| zookeeper_hosts = self.getHostNamesWithComponent("ZOOKEEPER", "ZOOKEEPER_SERVER", services) |
| zookeeper_host_arr = [] |
| |
| zookeeper_port = self.getZKPort(services) |
| for i in range(len(zookeeper_hosts)): |
| zookeeper_host = zookeeper_hosts[i] + ':' + zookeeper_port |
| if infra_solr_znode is not None: |
| zookeeper_host += infra_solr_znode |
| zookeeper_host_arr.append(zookeeper_host) |
| |
| solr_zookeeper_url = ",".join(zookeeper_host_arr) |
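| # e.g. "zk1.example.com:2181/infra-solr,zk2.example.com:2181/infra-solr" for a znode of "/infra-solr" |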
| |
| putAtlasApplicationProperty('atlas.graph.index.search.solr.zookeeper-url', solr_zookeeper_url) |
| else: |
| putAtlasApplicationProperty('atlas.graph.index.search.solr.zookeeper-url', "") |
| |
| # Kafka section |
| if "KAFKA" in servicesList and 'kafka-broker' in services['configurations']: |
| kafka_hosts = self.getHostNamesWithComponent("KAFKA", "KAFKA_BROKER", services) |
| |
| if 'port' in services['configurations']['kafka-broker']['properties']: |
| kafka_broker_port = services['configurations']['kafka-broker']['properties']['port'] |
| else: |
| kafka_broker_port = '6667' |
| |
| kafka_host_arr = [] |
| for i in range(len(kafka_hosts)): |
| kafka_host_arr.append(kafka_hosts[i] + ':' + kafka_broker_port) |
| |
| kafka_bootstrap_servers = ",".join(kafka_host_arr) |
| |
| if 'zookeeper.connect' in services['configurations']['kafka-broker']['properties']: |
| kafka_zookeeper_connect = services['configurations']['kafka-broker']['properties']['zookeeper.connect'] |
| else: |
| kafka_zookeeper_connect = None |
| |
| putAtlasApplicationProperty('atlas.kafka.bootstrap.servers', kafka_bootstrap_servers) |
| putAtlasApplicationProperty('atlas.kafka.zookeeper.connect', kafka_zookeeper_connect) |
| else: |
| putAtlasApplicationProperty('atlas.kafka.bootstrap.servers', "") |
| putAtlasApplicationProperty('atlas.kafka.zookeeper.connect', "") |
| |
| if "HBASE" in servicesList and 'hbase-site' in services['configurations']: |
| if 'hbase.zookeeper.quorum' in services['configurations']['hbase-site']['properties']: |
| hbase_zookeeper_quorum = services['configurations']['hbase-site']['properties']['hbase.zookeeper.quorum'] |
| else: |
| hbase_zookeeper_quorum = "" |
| |
| putAtlasApplicationProperty('atlas.graph.storage.hostname', hbase_zookeeper_quorum) |
| putAtlasApplicationProperty('atlas.audit.hbase.zookeeper.quorum', hbase_zookeeper_quorum) |
| else: |
| putAtlasApplicationProperty('atlas.graph.storage.hostname', "") |
| putAtlasApplicationProperty('atlas.audit.hbase.zookeeper.quorum', "") |
| |
| if "ranger-env" in services["configurations"] and "ranger-atlas-plugin-properties" in services["configurations"] and \ |
| "ranger-atlas-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]: |
| ranger_atlas_plugin_enabled = services["configurations"]["ranger-env"]["properties"]["ranger-atlas-plugin-enabled"] |
| putAtlasRangerPluginProperty('ranger-atlas-plugin-enabled', ranger_atlas_plugin_enabled) |
| |
| ranger_atlas_plugin_enabled = '' |
| if 'ranger-atlas-plugin-properties' in configurations and 'ranger-atlas-plugin-enabled' in configurations['ranger-atlas-plugin-properties']['properties']: |
| ranger_atlas_plugin_enabled = configurations['ranger-atlas-plugin-properties']['properties']['ranger-atlas-plugin-enabled'] |
| elif 'ranger-atlas-plugin-properties' in services['configurations'] and 'ranger-atlas-plugin-enabled' in services['configurations']['ranger-atlas-plugin-properties']['properties']: |
| ranger_atlas_plugin_enabled = services['configurations']['ranger-atlas-plugin-properties']['properties']['ranger-atlas-plugin-enabled'] |
| |
| if ranger_atlas_plugin_enabled and ranger_atlas_plugin_enabled.lower() == 'yes': |
| putAtlasApplicationProperty('atlas.authorizer.impl','ranger') |
| else: |
| putAtlasApplicationProperty('atlas.authorizer.impl','simple') |
| |
| #atlas server memory settings |
| if 'atlas-env' in services['configurations']: |
| atlas_server_metadata_size = 50000 |
| if 'atlas_server_metadata_size' in services['configurations']['atlas-env']['properties']: |
| atlas_server_metadata_size = int(services['configurations']['atlas-env']['properties']['atlas_server_metadata_size']) |
| |
| atlas_server_xmx = 2048 |
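| # Heap tiers by metadata entity count (as coded below): < 300k -> 2 GB (default), |
| # 300k-500k -> 5 GB, 500k-1m -> 10 GB, >= 1m -> 16 GB. |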
| |
| if 300000 <= atlas_server_metadata_size < 500000: |
| atlas_server_xmx = 1024*5 |
| if 500000 <= atlas_server_metadata_size < 1000000: |
| atlas_server_xmx = 1024*10 |
| if atlas_server_metadata_size >= 1000000: |
| atlas_server_xmx = 1024*16 |
| |
| atlas_server_max_new_size = (atlas_server_xmx / 100) * 30 |
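| # i.e. the new generation gets ~30% of the heap (integer math, e.g. 2048 -> 600). |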
| |
| putAtlasEnvProperty("atlas_server_xmx", atlas_server_xmx) |
| putAtlasEnvProperty("atlas_server_max_new_size", atlas_server_max_new_size) |
| |
| def recommendHBASEConfigurations(self, configurations, clusterData, services, hosts): |
| super(HDP25StackAdvisor, self).recommendHBASEConfigurations(configurations, clusterData, services, hosts) |
| putHbaseSiteProperty = self.putProperty(configurations, "hbase-site", services) |
| appendCoreSiteProperty = self.updateProperty(configurations, "core-site", services) |
| |
| if "cluster-env" in services["configurations"] \ |
| and "security_enabled" in services["configurations"]["cluster-env"]["properties"] \ |
| and services["configurations"]["cluster-env"]["properties"]["security_enabled"].lower() == "true": |
| # Set the master's UI to readonly |
| putHbaseSiteProperty('hbase.master.ui.readonly', 'true') |
| |
| phoenix_query_server_hosts = self.get_phoenix_query_server_hosts(services, hosts) |
| Logger.debug("Calculated Phoenix Query Server hosts: %s" % str(phoenix_query_server_hosts)) |
| if phoenix_query_server_hosts: |
| Logger.debug("Attempting to update hadoop.proxyuser.HTTP.hosts with %s" % str(phoenix_query_server_hosts)) |
| # The PQS hosts we want to ensure are set |
| new_value = ','.join(phoenix_query_server_hosts) |
| # Compute the unique set of hosts for the property |
| def updateCallback(originalValue, newValue): |
| Logger.debug("Original hadoop.proxyuser.HTTP.hosts value %s, appending %s" % (originalValue, newValue)) |
| # Only update the original value if it's not whitespace only |
| if originalValue and not originalValue.isspace(): |
| hosts = originalValue.split(',') |
| # Add in the new hosts if we have some |
| if newValue and not newValue.isspace(): |
| hosts.extend(newValue.split(',')) |
| # Return the combined (uniqued) list of hosts |
| result = ','.join(set(hosts)) |
| Logger.debug("Setting final to %s" % result) |
| return result |
| else: |
| Logger.debug("Setting final value to %s" % newValue) |
| return newValue |
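| # e.g. originalValue "h1,h2" merged with newValue "h2,h3" yields "h1,h2,h3" |
| # (set-unique, so ordering is not guaranteed). |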
| # Update the proxyuser setting, deferring to our callback to merge the results together |
| appendCoreSiteProperty('hadoop.proxyuser.HTTP.hosts', new_value, updateCallback) |
| else: |
| Logger.debug("No phoenix query server hosts to update") |
| else: |
| putHbaseSiteProperty('hbase.master.ui.readonly', 'false') |
| |
| """ |
| Returns the list of Phoenix Query Server host names, or None. |
| """ |
| def get_phoenix_query_server_hosts(self, services, hosts): |
| if len(hosts['items']) > 0: |
| phoenix_query_server_hosts = self.getHostsWithComponent("HBASE", "PHOENIX_QUERY_SERVER", services, hosts) |
| if phoenix_query_server_hosts is None: |
| return [] |
| return [host['Hosts']['host_name'] for host in phoenix_query_server_hosts] |
| |
| def recommendHIVEConfigurations(self, configurations, clusterData, services, hosts): |
| super(HDP25StackAdvisor, self).recommendHIVEConfigurations(configurations, clusterData, services, hosts) |
| putHiveInteractiveEnvProperty = self.putProperty(configurations, "hive-interactive-env", services) |
| putHiveInteractiveSiteProperty = self.putProperty(configurations, self.HIVE_INTERACTIVE_SITE, services) |
| putHiveInteractiveEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hive-interactive-env") |
| |
| # For 'Hive Server Interactive', if the component exists. |
| hsi_hosts = self.__getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE") |
| if len(hsi_hosts) > 0: |
| hsi_host = hsi_hosts[0] |
| putHiveInteractiveEnvProperty('enable_hive_interactive', 'true') |
| putHiveInteractiveEnvProperty('hive_server_interactive_host', hsi_host) |
| |
| # Update 'hive.llap.daemon.queue.name' property attributes if capacity scheduler is changed. |
| if self.HIVE_INTERACTIVE_SITE in services['configurations']: |
| if 'hive.llap.daemon.queue.name' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']: |
| self.setLlapDaemonQueuePropAttributesAndCapSliderVisibility(services, configurations) |
| |
| # Update 'hive.server2.tez.default.queues' value |
| hive_tez_default_queue = None |
| if 'hive-interactive-site' in configurations and \ |
| 'hive.llap.daemon.queue.name' in configurations[self.HIVE_INTERACTIVE_SITE]['properties']: |
| hive_tez_default_queue = configurations[self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name'] |
| Logger.info("'hive.llap.daemon.queue.name' value from configurations : '{0}'".format(hive_tez_default_queue)) |
| if not hive_tez_default_queue: |
| hive_tez_default_queue = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name'] |
| Logger.info("'hive.llap.daemon.queue.name' value from services : '{0}'".format(hive_tez_default_queue)) |
| if hive_tez_default_queue: |
| putHiveInteractiveSiteProperty("hive.server2.tez.default.queues", hive_tez_default_queue) |
| Logger.info("Updated 'hive.server2.tez.default.queues' config : '{0}'".format(hive_tez_default_queue)) |
| else: |
| putHiveInteractiveEnvProperty('enable_hive_interactive', 'false') |
| putHiveInteractiveEnvPropertyAttribute("llap_queue_capacity", "visible", "false") |
| |
| if self.HIVE_INTERACTIVE_SITE in services['configurations'] and \ |
| 'hive.llap.zk.sm.connectionString' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']: |
| # Fill the property 'hive.llap.zk.sm.connectionString' required by Hive Server Interactive (HiveServer2) |
| zookeeper_host_port = self.getZKHostPortString(services) |
| if zookeeper_host_port: |
| putHiveInteractiveSiteProperty("hive.llap.zk.sm.connectionString", zookeeper_host_port) |
| pass |
| |
| def recommendYARNConfigurations(self, configurations, clusterData, services, hosts): |
| super(HDP25StackAdvisor, self).recommendYARNConfigurations(configurations, clusterData, services, hosts) |
| |
| # Queue 'llap' creation/removal logic (Used by Hive Interactive server and associated LLAP) |
| if 'hive-interactive-env' in services['configurations'] and \ |
| 'enable_hive_interactive' in services['configurations']['hive-interactive-env']['properties']: |
| enable_hive_interactive = services['configurations']['hive-interactive-env']['properties']['enable_hive_interactive'] |
| LLAP_QUEUE_NAME = 'llap' |
| |
| # Hive Server interactive is already added or getting added |
| if enable_hive_interactive == 'true': |
| self.checkAndManageLlapQueue(services, configurations, hosts, LLAP_QUEUE_NAME) |
| self.updateLlapConfigs(configurations, services, hosts, LLAP_QUEUE_NAME) |
| else: # When Hive Interactive Server is in 'off/removed' state. |
| self.checkAndStopLlapQueue(services, configurations, LLAP_QUEUE_NAME) |
| |
| putYarnSiteProperty = self.putProperty(configurations, "yarn-site", services) |
| stack_root = "/usr/hdp" |
| if "cluster-env" in services["configurations"] and "stack_root" in services["configurations"]["cluster-env"]["properties"]: |
| stack_root = services["configurations"]["cluster-env"]["properties"]["stack_root"] |
| |
| timeline_plugin_classes_values = [] |
| timeline_plugin_classpath_values = [] |
| |
| if self.__isServiceDeployed(services, "TEZ"): |
| timeline_plugin_classes_values.append('org.apache.tez.dag.history.logging.ats.TimelineCachePluginImpl') |
| |
| if self.__isServiceDeployed(services, "SPARK"): |
| timeline_plugin_classes_values.append('org.apache.spark.deploy.history.yarn.plugin.SparkATSPlugin') |
| timeline_plugin_classpath_values.append(stack_root + "/${hdp.version}/spark/hdpLib/*") |
| |
| putYarnSiteProperty('yarn.timeline-service.entity-group-fs-store.group-id-plugin-classes', ",".join(timeline_plugin_classes_values)) |
| putYarnSiteProperty('yarn.timeline-service.entity-group-fs-store.group-id-plugin-classpath', ":".join(timeline_plugin_classpath_values)) |
| |
| """ |
| Entry point for updating Hive's 'LLAP app' configs namely : (1). num_llap_nodes (2). hive.llap.daemon.yarn.container.mb |
| (3). hive.llap.daemon.num.executors (4). hive.llap.io.memory.size (5). llap_heap_size (6). slider_am_container_mb, |
| and (7). hive.server2.tez.sessions.per.default.queue |
| |
| The trigger point for updating LLAP configs (mentioned above) is change in values of any of the following: |
| (1). 'enable_hive_interactive' set to 'true' (2). 'llap_queue_capacity' (3). 'hive.server2.tez.sessions.per.default.queue' |
| (4). Change in queue selection for config 'hive.llap.daemon.queue.name'. |
| |
| If change in value for 'llap_queue_capacity' or 'hive.server2.tez.sessions.per.default.queue' is detected, that config |
| value is not calulated, but read and use in calculation for dependent configs. |
| """ |
| def updateLlapConfigs(self, configurations, services, hosts, llap_queue_name): |
| putHiveInteractiveSiteProperty = self.putProperty(configurations, self.HIVE_INTERACTIVE_SITE, services) |
| putHiveInteractiveSitePropertyAttribute = self.putPropertyAttribute(configurations, self.HIVE_INTERACTIVE_SITE) |
| |
| putHiveInteractiveEnvProperty = self.putProperty(configurations, "hive-interactive-env", services) |
| putHiveInteractiveEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hive-interactive-env") |
| |
| putTezInteractiveSiteProperty = self.putProperty(configurations, "tez-interactive-site", services) |
| |
| llap_daemon_selected_queue_name = None |
| llap_queue_selected_in_current_call = None |
| LLAP_MAX_CONCURRENCY = 32 # Allow a max of 32 concurrency. |
| |
| # Update 'hive.llap.daemon.queue.name' prop combo entries and llap capacity slider visibility. |
| self.setLlapDaemonQueuePropAttributesAndCapSliderVisibility(services, configurations) |
| |
| if not services["changed-configurations"]: |
| read_llap_daemon_yarn_cont_mb = long(self.get_yarn_min_container_size(services, configurations)) |
| putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', read_llap_daemon_yarn_cont_mb) |
| # initial memory setting to make sure hive.llap.daemon.yarn.container.mb >= yarn.scheduler.minimum-allocation-mb |
| Logger.info("Adjusted 'hive.llap.daemon.yarn.container.mb' to yarn min container size as initial size " |
| "(" + str(self.get_yarn_min_container_size(services, configurations)) + " MB).") |
| |
| try: |
| if self.HIVE_INTERACTIVE_SITE in services['configurations'] and \ |
| 'hive.llap.daemon.queue.name' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']: |
| llap_daemon_selected_queue_name = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name'] |
| |
| if 'hive.llap.daemon.queue.name' in configurations[self.HIVE_INTERACTIVE_SITE]['properties']: |
| llap_queue_selected_in_current_call = configurations[self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name'] |
| |
| # Update Visibility of 'llap_queue_capacity' slider. |
| capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services) |
| if capacity_scheduler_properties: |
| # Get all leaf queues. |
| leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties) |
| # Parenthesized so that the two-leaf-queue requirement applies to both selection checks |
| # ('and' would otherwise bind tighter than 'or'). |
| if len(leafQueueNames) == 2 and \ |
| ((llap_daemon_selected_queue_name is not None and llap_daemon_selected_queue_name == llap_queue_name) or |
| (llap_queue_selected_in_current_call is not None and llap_queue_selected_in_current_call == llap_queue_name)): |
| putHiveInteractiveEnvPropertyAttribute("llap_queue_capacity", "visible", "true") |
| Logger.info("Selected YARN queue is '{0}'. Setting LLAP queue capacity slider visibility to 'True'".format(llap_queue_name)) |
| else: |
| putHiveInteractiveEnvPropertyAttribute("llap_queue_capacity", "visible", "false") |
| Logger.info("Queue selected for LLAP app is : '{0}'. Current YARN queues : {1}. Setting '{2}' queue capacity slider " |
| "visibility to 'False'.".format(llap_daemon_selected_queue_name, list(leafQueueNames), llap_queue_name)) |
| if llap_daemon_selected_queue_name: |
| llap_selected_queue_state = self.__getQueueStateFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name) |
| if llap_selected_queue_state == None or llap_selected_queue_state == "STOPPED": |
| putHiveInteractiveEnvPropertyAttribute("llap_queue_capacity", "visible", "false") |
| raise Fail("Selected LLAP app queue '{0}' current state is : '{1}'. Setting LLAP configs to default values " |
| "and 'llap' queue capacity slider visibility to 'False'." |
| .format(llap_daemon_selected_queue_name, llap_selected_queue_state)) |
| else: |
| raise Fail("Retrieved LLAP app queue name is : '{0}'. Setting LLAP configs to default values." |
| .format(llap_daemon_selected_queue_name)) |
| else: |
| Logger.error("Couldn't retrieve 'capacity-scheduler' properties while doing YARN queue adjustment for Hive Server Interactive." |
| " Not calculating LLAP configs.") |
| return |
| |
| changed_configs_in_hive_int_env = None |
| llap_concurrency_in_changed_configs = None |
| llap_daemon_queue_in_changed_configs = None |
| # Calculations are triggered only if there is change in any one of the following props : |
| # 'llap_queue_capacity', 'enable_hive_interactive', 'hive.server2.tez.sessions.per.default.queue' |
| # or 'hive.llap.daemon.queue.name' has change in value selection. |
| # OR |
| # services['changed-configurations'] is empty implying that this is the Blueprint call. (1st invocation) |
| if 'changed-configurations' in services.keys(): |
| config_names_to_be_checked = set(['llap_queue_capacity', 'enable_hive_interactive']) |
| changed_configs_in_hive_int_env = self.are_config_props_in_changed_configs(services, "hive-interactive-env", |
| config_names_to_be_checked, False) |
| |
| # Determine if there is change detected in "hive-interactive-site's" configs based on which we calculate llap configs. |
| llap_concurrency_in_changed_configs = self.are_config_props_in_changed_configs(services, "hive-interactive-site", |
| set(['hive.server2.tez.sessions.per.default.queue']), False) |
| llap_daemon_queue_in_changed_configs = self.are_config_props_in_changed_configs(services, "hive-interactive-site", |
| set(['hive.llap.daemon.queue.name']), False) |
| |
| if not changed_configs_in_hive_int_env and \ |
| not llap_concurrency_in_changed_configs and \ |
| not llap_daemon_queue_in_changed_configs and \ |
| services["changed-configurations"]: |
| Logger.info("LLAP parameters not modified. Not adjusting LLAP configs.") |
| Logger.info("Current 'changed-configuration' received is : {0}".format(services["changed-configurations"])) |
| return |
| |
| node_manager_host_list = self.get_node_manager_hosts(services, hosts) |
| node_manager_cnt = len(node_manager_host_list) |
| yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations) |
| total_cluster_capacity = node_manager_cnt * yarn_nm_mem_in_mb |
| Logger.info("\n\nCalculated total_cluster_capacity : {0}, using following : node_manager_cnt : {1}, " |
| "yarn_nm_mem_in_mb : {2}".format(total_cluster_capacity, node_manager_cnt, yarn_nm_mem_in_mb)) |
| |
| # Check which queue is selected in 'hive.llap.daemon.queue.name', to determine current queue capacity |
| current_selected_queue_for_llap_cap = None |
| yarn_root_queues = capacity_scheduler_properties.get("yarn.scheduler.capacity.root.queues") |
| # Parenthesized: the 'llap' queue must both be selected and actually exist as one of exactly two leaf queues. |
| if (llap_queue_selected_in_current_call == llap_queue_name or |
| llap_daemon_selected_queue_name == llap_queue_name) and \ |
| (llap_queue_name in yarn_root_queues and len(leafQueueNames) == 2): |
| current_selected_queue_for_llap_cap_perc = self.get_llap_cap_percent_slider(services, configurations) |
| # Use float division: Python 2 integer division would truncate percentages below 100 to zero. |
| current_selected_queue_for_llap_cap = float(current_selected_queue_for_llap_cap_perc) / 100 * total_cluster_capacity |
| else: # any queue other than 'llap' |
| current_selected_queue_for_llap_cap = self.__getSelectedQueueTotalCap(capacity_scheduler_properties, |
| llap_daemon_selected_queue_name, total_cluster_capacity) |
| assert (current_selected_queue_for_llap_cap >= 1), "Current selected queue '{0}' capacity value : {1}. Expected value : >= 1" \ |
| .format(llap_daemon_selected_queue_name, current_selected_queue_for_llap_cap) |
| yarn_min_container_size = self.get_yarn_min_container_size(services, configurations) |
| tez_am_container_size = self.calculate_tez_am_container_size(long(total_cluster_capacity)) |
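| # _normalizeUp/_normalizeDown round to the next/previous multiple of yarn_min_container_size |
| # (e.g., assuming a 1024 MB minimum: 1500 normalizes up to 2048 and down to 1024). |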
| normalized_tez_am_container_size = self._normalizeUp(tez_am_container_size, yarn_min_container_size) |
| Logger.info("Calculated normalized_tez_am_container_size : {0}, using following : tez_am_container_size : {1}, " |
| "total_cluster_capacity : {2}".format(normalized_tez_am_container_size, tez_am_container_size, |
| total_cluster_capacity)) |
| normalized_selected_queue_for_llap_cap = long(self._normalizeDown(current_selected_queue_for_llap_cap, yarn_min_container_size)) |
| |
| # Get calculated value for Slider AM container Size |
| slider_am_container_size = self._normalizeUp(self.calculate_slider_am_size(yarn_min_container_size), |
| yarn_min_container_size) |
| |
| # Read 'hive.server2.tez.sessions.per.default.queue' prop if it's in changed-configs, else calculate it. |
| if not llap_concurrency_in_changed_configs: |
| # Calculate llap concurrency (i.e. Number of Tez AM's) |
| llap_concurrency = float(normalized_selected_queue_for_llap_cap * 0.25 / normalized_tez_am_container_size) |
| llap_concurrency = max(long(llap_concurrency), 1) |
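| # e.g. (assumed numbers) a 40960 MB queue and a 2048 MB Tez AM give 40960 * 0.25 / 2048 = 5 concurrent queries. |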
| Logger.info("Calculated llap_concurrency : {0}, using following : normalized_selected_queue_for_llap_cap : {1}, " |
| "normalized_tez_am_container_size : {2}".format(llap_concurrency, normalized_selected_queue_for_llap_cap, |
| normalized_tez_am_container_size)) |
| # Limit 'llap_concurrency' to reach a max. of 32. |
| if llap_concurrency > LLAP_MAX_CONCURRENCY: |
| llap_concurrency = LLAP_MAX_CONCURRENCY |
| else: |
| # Read current value |
| if 'hive.server2.tez.sessions.per.default.queue' in services['configurations'][self.HIVE_INTERACTIVE_SITE][ |
| 'properties']: |
| llap_concurrency = long(services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties'][ |
| 'hive.server2.tez.sessions.per.default.queue']) |
| assert ( |
| llap_concurrency >= 1), "'hive.server2.tez.sessions.per.default.queue' current value : {0}. Expected value : >= 1" \ |
| .format(llap_concurrency) |
| else: |
| raise Fail( |
| "Couldn't retrieve Hive Server interactive's 'hive.server2.tez.sessions.per.default.queue' config.") |
| |
| |
| # Calculate 'total memory available for llap daemons' across cluster |
| total_am_capacity_required = normalized_tez_am_container_size * llap_concurrency + slider_am_container_size |
| cap_available_for_daemons = normalized_selected_queue_for_llap_cap - total_am_capacity_required |
| Logger.info( |
| "Calculated cap_available_for_daemons : {0}, using following : current_selected_queue_for_llap_cap : {1}, " |
| "yarn_nm_mem_in_mb : {2}, total_cluster_capacity : {3}, normalized_selected_queue_for_llap_cap : {4}, normalized_tez_am_container_size" |
| " : {5}, yarn_min_container_size : {6}, llap_concurrency : {7}, total_am_capacity_required : {8}" |
| .format(cap_available_for_daemons, current_selected_queue_for_llap_cap, yarn_nm_mem_in_mb, |
| total_cluster_capacity, |
| normalized_selected_queue_for_llap_cap, normalized_tez_am_container_size, yarn_min_container_size, llap_concurrency, |
| total_am_capacity_required)) |
| if cap_available_for_daemons < yarn_min_container_size: |
| raise Fail( |
| "'Capacity available for LLAP daemons'({0}) < 'YARN minimum container size'({1}). Invalid configuration detected. " |
| "Increase LLAP queue size.".format(cap_available_for_daemons, yarn_min_container_size)) |
| |
| |
| |
| # Calculate value for 'num_llap_nodes', an across cluster config. |
| # Also, get calculated value for 'hive.llap.daemon.yarn.container.mb' based on 'num_llap_nodes' value, a per node config. |
| num_llap_nodes_raw = cap_available_for_daemons / yarn_nm_mem_in_mb |
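| # e.g. (assumed numbers) 30720 MB available / 10240 MB per NodeManager -> 3 LLAP nodes. |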
| if num_llap_nodes_raw < 1.00: |
| # Set the llap nodes to min. value of 1 and 'llap_container_size' to min. YARN allocation. |
| num_llap_nodes = 1 |
| llap_container_size = self._normalizeUp(cap_available_for_daemons, yarn_min_container_size) |
| Logger.info("Calculated llap_container_size : {0}, using following : cap_available_for_daemons : {1}, " |
| "yarn_min_container_size : {2}".format(llap_container_size, cap_available_for_daemons, |
| yarn_min_container_size)) |
| else: |
| num_llap_nodes = math.floor(num_llap_nodes_raw) |
| llap_container_size = self._normalizeDown(yarn_nm_mem_in_mb, yarn_min_container_size) |
| Logger.info("Calculated llap_container_size : {0}, using following : yarn_nm_mem_in_mb : {1}, " |
| "yarn_min_container_size : {2}".format(llap_container_size, yarn_nm_mem_in_mb, |
| yarn_min_container_size)) |
| Logger.info( |
| "Calculated num_llap_nodes : {0} using following : yarn_nm_mem_in_mb : {1}, cap_available_for_daemons : {2} " \ |
| .format(num_llap_nodes, yarn_nm_mem_in_mb, cap_available_for_daemons)) |
| |
| |
| # Calculate value for 'hive.llap.daemon.num.executors', a per node config. |
| hive_tez_container_size = self.get_hive_tez_container_size(services, configurations) |
| if 'yarn.nodemanager.resource.cpu-vcores' in services['configurations']['yarn-site']['properties']: |
| cpu_per_nm_host = float(services['configurations']['yarn-site']['properties'][ |
| 'yarn.nodemanager.resource.cpu-vcores']) |
| assert (cpu_per_nm_host > 0), "'yarn.nodemanager.resource.cpu-vcores' current value : {0}. Expected value : > 0" \ |
| .format(cpu_per_nm_host) |
| else: |
| raise Fail("Couldn't retrieve YARN's 'yarn.nodemanager.resource.cpu-vcores' config.") |
| |
| num_executors_per_node_raw = math.floor(llap_container_size / hive_tez_container_size) |
| num_executors_per_node = min(num_executors_per_node_raw, cpu_per_nm_host) |
| Logger.info("calculated num_executors_per_node: {0}, using following : hive_tez_container_size : {1}, " |
| "cpu_per_nm_host : {2}, num_executors_per_node_raw : {3}, llap_container_size : {4}" |
| .format(num_executors_per_node, hive_tez_container_size, cpu_per_nm_host, num_executors_per_node_raw, |
| llap_container_size)) |
| assert (num_executors_per_node > 0), "'Number of executors per node' : {0}. Expected value : > 0".format( |
| num_executors_per_node) |
| |
| total_mem_for_executors = num_executors_per_node * hive_tez_container_size |
| |
| # Calculate value for 'cache' (hive.llap.io.memory.size), a per node config. |
| cache_size_per_node = llap_container_size - total_mem_for_executors |
| Logger.info( |
| "Calculated cache_size_per_node : {0}, using following : hive_tez_container_size : {1}, llap_container_size" |
| " : {2}, num_executors_per_node : {3}" |
| .format(cache_size_per_node, hive_tez_container_size, llap_container_size, num_executors_per_node)) |
| if cache_size_per_node < 0: # Run with '0' cache. |
| Logger.info( |
| "Calculated 'cache_size_per_node' : {0}. Setting 'cache_size_per_node' to 0.".format(cache_size_per_node)) |
| cache_size_per_node = 0 |
| |
| |
| # Calculate value for prop 'llap_heap_size' |
| llap_xmx = max(total_mem_for_executors * 0.8, total_mem_for_executors - 1024) |
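| # Heap is executor memory minus headroom of min(20%, 1024 MB); e.g. 4096 -> ~3276 (80%) and 8192 -> 7168 (-1024). |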
| Logger.info("Calculated llap_app_heap_size : {0}, using following : hive_container_size : {1}, " |
| "total_mem_for_executors : {2}".format(llap_xmx, hive_tez_container_size, total_mem_for_executors)) |
| |
| |
| # Updating calculated configs. |
| normalized_tez_am_container_size = long(normalized_tez_am_container_size) |
| putTezInteractiveSiteProperty('tez.am.resource.memory.mb', normalized_tez_am_container_size) |
| Logger.info("'Tez for Hive2' config 'tez.am.resource.memory.mb' updated. Current: {0}".format( |
| normalized_tez_am_container_size)) |
| |
| if not llap_concurrency_in_changed_configs: |
| min_llap_concurrency = 1 |
| putHiveInteractiveSiteProperty('hive.server2.tez.sessions.per.default.queue', llap_concurrency) |
| putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "minimum", |
| min_llap_concurrency) |
| putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum", |
| LLAP_MAX_CONCURRENCY) |
| Logger.info( |
| "Hive2 config 'hive.server2.tez.sessions.per.default.queue' updated. Min : {0}, Current: {1}, Max: {2}" \ |
| .format(min_llap_concurrency, llap_concurrency, LLAP_MAX_CONCURRENCY)) |
| |
| num_llap_nodes = long(num_llap_nodes) |
| |
| putHiveInteractiveEnvProperty('num_llap_nodes', num_llap_nodes) |
| Logger.info("LLAP config 'num_llap_nodes' updated. Current: {0}".format(num_llap_nodes)) |
| |
| llap_container_size = long(llap_container_size) |
| putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', llap_container_size) |
| Logger.info("LLAP config 'hive.llap.daemon.yarn.container.mb' updated. Current: {0}".format(llap_container_size)) |
| |
| num_executors_per_node = long(num_executors_per_node) |
| putHiveInteractiveSiteProperty('hive.llap.daemon.num.executors', num_executors_per_node) |
| Logger.info("LLAP config 'hive.llap.daemon.num.executors' updated. Current: {0}".format(num_executors_per_node)) |
# 'hive.llap.io.threadpool.size' config value is to be kept the same as the value calculated for
# 'hive.llap.daemon.num.executors' at all times.
| putHiveInteractiveSiteProperty('hive.llap.io.threadpool.size', num_executors_per_node) |
| Logger.info("LLAP config 'hive.llap.io.threadpool.size' updated. Current: {0}".format(num_executors_per_node)) |
| |
| cache_size_per_node = long(cache_size_per_node) |
| putHiveInteractiveSiteProperty('hive.llap.io.memory.size', cache_size_per_node) |
| Logger.info("LLAP config 'hive.llap.io.memory.size' updated. Current: {0}".format(cache_size_per_node)) |
| llap_io_enabled = 'false' |
| if cache_size_per_node >= 64: |
| llap_io_enabled = 'true' |
| |
| putHiveInteractiveSiteProperty('hive.llap.io.enabled', llap_io_enabled) |
| Logger.info("Hive2 config 'hive.llap.io.enabled' updated to '{0}' as part of " |
| "'hive.llap.io.memory.size' calculation.".format(llap_io_enabled)) |
| |
| llap_xmx = long(llap_xmx) |
| putHiveInteractiveEnvProperty('llap_heap_size', llap_xmx) |
| Logger.info("LLAP config 'llap_heap_size' updated. Current: {0}".format(llap_xmx)) |
| |
| slider_am_container_size = long(slider_am_container_size) |
| putHiveInteractiveEnvProperty('slider_am_container_mb', slider_am_container_size) |
| Logger.info("LLAP config 'slider_am_container_mb' updated. Current: {0}".format(slider_am_container_size)) |
| |
| except Exception as e: |
# Set default values if an Exception was caught. The 'llap queue capacity' is left untouched, as it can be increased,
# triggering recalculation.
Logger.info(e.message + " Skipping calculating LLAP configs. Setting them to minimum values.")
| traceback.print_exc() |
| |
| try: |
| yarn_min_container_size = long(self.get_yarn_min_container_size(services, configurations)) |
| slider_am_container_size = long(self.calculate_slider_am_size(yarn_min_container_size)) |
| |
| node_manager_host_list = self.get_node_manager_hosts(services, hosts) |
| node_manager_cnt = len(node_manager_host_list) |
| |
| putHiveInteractiveSiteProperty('hive.server2.tez.sessions.per.default.queue', 1) |
| putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "minimum", 1) |
| putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum", 32) |
| |
| putHiveInteractiveEnvProperty('num_llap_nodes', 0) |
| putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "minimum", 1) |
| putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "maximum", node_manager_cnt) |
| |
| putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', yarn_min_container_size) |
| putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.yarn.container.mb', "minimum", yarn_min_container_size) |
| |
| putHiveInteractiveSiteProperty('hive.llap.daemon.num.executors', 0) |
| putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', "minimum", 1) |
| |
putHiveInteractiveSiteProperty('hive.llap.io.threadpool.size', 0)
| |
| putHiveInteractiveSiteProperty('hive.llap.io.memory.size', 0) |
| |
| putHiveInteractiveEnvProperty('llap_heap_size', 0) |
| |
| putHiveInteractiveEnvProperty('slider_am_container_mb', slider_am_container_size) |
| |
| except Exception as e: |
| Logger.info("Problem setting minimum values for LLAP configs in Exception code.") |
| traceback.print_exc() |
| |
| """ |
| Checks for the presence of passed-in configuration properties in a given config, if they are changed. |
| Reads from services["changed-configurations"]. |
| Parameters: |
| services: Configuration information for the cluster |
| config_type : Type of the configuration |
| config_names_set : Set of configuration properties to be checked if they are changed. |
| all_exists: If True : returns True only if all properties mentioned in 'config_names_set' we found |
| in services["changed-configurations"]. |
| Otherwise, returns False. |
| If False : return True, if any of the properties mentioned in config_names_set we found in |
| services["changed-configurations"]. |
| Otherwise, returns False. |
| """ |
| def are_config_props_in_changed_configs(self, services, config_type, config_names_set, all_exists): |
| changedConfigs = services["changed-configurations"] |
| changed_config_names_set = set() |
| for changedConfig in changedConfigs: |
| if changedConfig['type'] == config_type: |
| changed_config_names_set.update([changedConfig['name']]) |
| |
if not changed_config_names_set:
return False
else:
| configs_intersection = changed_config_names_set.intersection(config_names_set) |
| if all_exists: |
| if configs_intersection == config_names_set: |
| return True |
| else: |
| if len(configs_intersection) > 0 : |
| return True |
| return False |
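# Usage sketch (hypothetical call): if services["changed-configurations"] contains an entry of type
# 'hive-interactive-site' for 'hive.server2.tez.sessions.per.default.queue', then
#   self.are_config_props_in_changed_configs(services, 'hive-interactive-site',
#                                            set(['hive.server2.tez.sessions.per.default.queue']), False)
# returns True; with all_exists=True it returns True only if every name in the set was changed.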
| |
| """ |
| Returns all the Node Manager configs in cluster. |
| """ |
| def get_node_manager_hosts(self, services, hosts): |
| if len(hosts["items"]) > 0: |
| node_manager_hosts = self.getHostsWithComponent("YARN", "NODEMANAGER", services, hosts) |
| assert (node_manager_hosts is not None), "Information about NODEMANAGER not found in cluster." |
| return node_manager_hosts |
| |
| """ |
| Returns the current LLAP queue capacity percentage value. (llap_queue_capacity) |
| """ |
| def get_llap_cap_percent_slider(self, services, configurations): |
| llap_slider_cap_percentage = 0 |
if 'llap_queue_capacity' in services['configurations']['hive-interactive-env']['properties']:
llap_slider_cap_percentage = float(
services['configurations']['hive-interactive-env']['properties']['llap_queue_capacity'])
else:
Logger.error("'llap_queue_capacity' not present in services['configurations']['hive-interactive-env']['properties'].")
if llap_slider_cap_percentage <= 0:
| if 'hive-interactive-env' in configurations and \ |
| 'llap_queue_capacity' in configurations["hive-interactive-env"]["properties"]: |
| llap_slider_cap_percentage = float(configurations["hive-interactive-env"]["properties"]["llap_queue_capacity"]) |
| assert (llap_slider_cap_percentage > 0), "'llap_queue_capacity' is set to : {0}. Should be > 0.".format(llap_slider_cap_percentage) |
| return llap_slider_cap_percentage |
| |
| |
| """ |
| Returns current value of number of LLAP nodes in cluster (num_llap_nodes) |
| """ |
| def get_num_llap_nodes(self, services): |
| if 'num_llap_nodes' in services['configurations']['hive-interactive-env']['properties']: |
| num_llap_nodes = float( |
| services['configurations']['hive-interactive-env']['properties']['num_llap_nodes']) |
| assert (num_llap_nodes > 0), "Number of LLAP nodes read : {0}. Expected value : > 0".format( |
| num_llap_nodes) |
| return num_llap_nodes |
| else: |
| raise Fail("Couldn't retrieve Hive Server interactive's 'num_llap_nodes' config.") |
| |
| """ |
| Gets HIVE Tez container size (hive.tez.container.size). Takes into account if it has been calculated as part of current |
| Stack Advisor invocation. |
| """ |
| def get_hive_tez_container_size(self, services, configurations): |
| hive_container_size = None |
| # Check if 'hive.tez.container.size' is modified in current ST invocation. |
| if 'hive-site' in configurations and 'hive.tez.container.size' in configurations['hive-site']['properties']: |
| hive_container_size = float(configurations['hive-site']['properties']['hive.tez.container.size']) |
| Logger.info("'hive.tez.container.size' read from configurations as : {0}".format(hive_container_size)) |
| |
| if not hive_container_size: |
| # Check if 'hive.tez.container.size' is input in services array. |
| if 'hive.tez.container.size' in services['configurations']['hive-site']['properties']: |
| hive_container_size = float(services['configurations']['hive-site']['properties']['hive.tez.container.size']) |
| Logger.info("'hive.tez.container.size' read from services as : {0}".format(hive_container_size)) |
| if not hive_container_size: |
| raise Fail("Couldn't retrieve Hive Server 'hive.tez.container.size' config.") |
| |
| assert (hive_container_size > 0), "'hive.tez.container.size' current value : {0}. Expected value : > 0".format( |
| hive_container_size) |
| |
| return hive_container_size |
| |
| """ |
| Gets YARN's minimum container size (yarn.scheduler.minimum-allocation-mb). |
| Reads from: |
| - configurations (if changed as part of current Stack Advisor invocation (output)), and services["changed-configurations"] |
| is empty, else |
| - services['configurations'] (input). |
| |
| services["changed-configurations"] would be empty if Stack Advisor call is made from Blueprints (1st invocation). Subsequent |
| Stack Advisor calls will have it non-empty. We do this because in subsequent invocations, even if Stack Advsior calculates this |
| value (configurations), it is finally not recommended, making 'input' value to survive. |
| """ |
| def get_yarn_min_container_size(self, services, configurations): |
| yarn_min_container_size = None |
| # Check if services["changed-configurations"] is empty and 'yarn.scheduler.minimum-allocation-mb' is modified in current ST invocation. |
| if not services["changed-configurations"] and \ |
| 'yarn-site' in configurations and 'yarn.scheduler.minimum-allocation-mb' in configurations['yarn-site']['properties']: |
| yarn_min_container_size = float(configurations['yarn-site']['properties']['yarn.scheduler.minimum-allocation-mb']) |
| Logger.info("'yarn.scheduler.minimum-allocation-mb' read from configurations as : {0}".format(yarn_min_container_size)) |
| |
| if not yarn_min_container_size: |
| # Check if 'yarn.scheduler.minimum-allocation-mb' is input in services array. |
| if 'yarn-site' in services['configurations'] and \ |
| 'yarn.scheduler.minimum-allocation-mb' in services['configurations']['yarn-site']['properties']: |
| yarn_min_container_size = float(services['configurations']['yarn-site']['properties']['yarn.scheduler.minimum-allocation-mb']) |
| Logger.info("'yarn.scheduler.minimum-allocation-mb' read from services as : {0}".format(yarn_min_container_size)) |
| |
| if not yarn_min_container_size: |
| raise Fail("Couldn't retrieve YARN's 'yarn.scheduler.minimum-allocation-mb' config.") |
| |
| assert (yarn_min_container_size > 0), "'yarn.scheduler.minimum-allocation-mb' current value : {0}. " \ |
| "Expected value : > 0".format(yarn_min_container_size) |
| return yarn_min_container_size |
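# Precedence sketch (assumed values): on the 1st Blueprint invocation (changed-configurations empty), a value such as
# 682 recommended into configurations['yarn-site'] in this same invocation wins; on later invocations the key is read
# from services['configurations'] instead, so an operator-set 1024 survives.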
| |
| """ |
| Calculates the Slider App Master size based on YARN's Minimum Container Size. |
| """ |
| def calculate_slider_am_size(self, yarn_min_container_size): |
| if yarn_min_container_size > 1024: |
| return 1024 |
| if yarn_min_container_size >= 256 and yarn_min_container_size <= 1024: |
| return yarn_min_container_size |
| if yarn_min_container_size < 256: |
| return 256 |
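# Clamping examples: calculate_slider_am_size(128) -> 256, calculate_slider_am_size(512) -> 512,
# calculate_slider_am_size(2048) -> 1024.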
| |
| """ |
| Gets YARN NodeManager memory in MB (yarn.nodemanager.resource.memory-mb). |
| Reads from: |
| - configurations (if changed as part of current Stack Advisor invocation (output)), and services["changed-configurations"] |
| is empty, else |
| - services['configurations'] (input). |
| |
| services["changed-configurations"] would be empty is Stack Advisor call if made from Blueprints (1st invocation). Subsequent |
| Stack Advisor calls will have it non-empty. We do this because in subsequent invocations, even if Stack Advsior calculates this |
| value (configurations), it is finally not recommended, making 'input' value to survive. |
| """ |
| def get_yarn_nm_mem_in_mb(self, services, configurations): |
| yarn_nm_mem_in_mb = None |
| |
| # Check if services["changed-configurations"] is empty and 'yarn.nodemanager.resource.memory-mb' is modified in current ST invocation. |
| if not services["changed-configurations"] and\ |
| 'yarn-site' in configurations and 'yarn.nodemanager.resource.memory-mb' in configurations['yarn-site']['properties']: |
| yarn_nm_mem_in_mb = float(configurations['yarn-site']['properties']['yarn.nodemanager.resource.memory-mb']) |
| Logger.info("'yarn.nodemanager.resource.memory-mb' read from configurations as : {0}".format(yarn_nm_mem_in_mb)) |
| |
| if not yarn_nm_mem_in_mb: |
| # Check if 'yarn.nodemanager.resource.memory-mb' is input in services array. |
| if 'yarn-site' in services['configurations'] and \ |
| 'yarn.nodemanager.resource.memory-mb' in services['configurations']['yarn-site']['properties']: |
| yarn_nm_mem_in_mb = float(services['configurations']['yarn-site']['properties']['yarn.nodemanager.resource.memory-mb']) |
| Logger.info("'yarn.nodemanager.resource.memory-mb' read from services as : {0}".format(yarn_nm_mem_in_mb)) |
| |
| if not yarn_nm_mem_in_mb: |
| raise Fail("Couldn't retrieve YARN's 'yarn.nodemanager.resource.memory-mb' config.") |
| |
| assert (yarn_nm_mem_in_mb > 0.0), "'yarn.nodemanager.resource.memory-mb' current value : {0}. " \ |
| "Expected value : > 0".format(yarn_nm_mem_in_mb) |
| |
| return yarn_nm_mem_in_mb |
| |
| """ |
| Determines Tez App Master container size (tez.am.resource.memory.mb) for tez_hive2/tez-site based on total cluster capacity. |
| """ |
| def calculate_tez_am_container_size(self, total_cluster_capacity): |
| if total_cluster_capacity is None or not isinstance(total_cluster_capacity, long): |
| raise Fail ("Passed-in 'Total Cluster Capacity' is : '{0}'".format(total_cluster_capacity)) |
| |
| if total_cluster_capacity <= 0: |
| raise Fail ("Passed-in 'Total Cluster Capacity' ({0}) is Invalid.".format(total_cluster_capacity)) |
| if total_cluster_capacity <= 4096: |
| return 256 |
| elif total_cluster_capacity > 4096 and total_cluster_capacity <= 73728: |
| return 512 |
| elif total_cluster_capacity > 73728: |
| return 1536 |
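# Tier examples: total_cluster_capacity of 4096 MB -> 256 MB AM, 32768 MB -> 512 MB AM, 102400 MB -> 1536 MB AM
# (inputs must be 'long', per the isinstance check above).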
| |
| |
| """ |
| Calculate minimum queue capacity required in order to get LLAP and HIVE2 app into running state. |
| """ |
| def min_queue_perc_reqd_for_llap_and_hive_app(self, services, hosts, configurations): |
| # Get queue size if sized at 20% |
| node_manager_hosts = self.get_node_manager_hosts(services, hosts) |
yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations)
total_cluster_cap = len(node_manager_hosts) * yarn_nm_mem_in_mb
| total_queue_size_at_20_perc = 20.0 / 100 * total_cluster_cap |
| |
| # Calculate based on minimum size required by containers. |
| yarn_min_container_size = self.get_yarn_min_container_size(services, configurations) |
| slider_am_size = self.calculate_slider_am_size(yarn_min_container_size) |
| hive_tez_container_size = self.get_hive_tez_container_size(services, configurations) |
| tez_am_container_size = self.calculate_tez_am_container_size(long(total_cluster_cap)) |
| normalized_val = self._normalizeUp(slider_am_size, yarn_min_container_size) + self._normalizeUp\ |
| (hive_tez_container_size, yarn_min_container_size) + self._normalizeUp(tez_am_container_size, yarn_min_container_size) |
| |
| min_required = max(total_queue_size_at_20_perc, normalized_val) |
| |
| min_required_perc = min_required * 100 / total_cluster_cap |
| Logger.info("Calculated 'min_queue_perc_required_in_cluster' : {0}% and 'min_queue_cap_required_in_cluster': {1} " |
| "for LLAP and HIVE2 app using following : yarn_min_container_size : {2}, slider_am_size : {3}, hive_tez_container_size : {4}, " |
| "hive_am_container_size : {5}, total_cluster_cap : {6}, yarn_rm_mem_in_mb : {7}" |
| "".format(min_required_perc, min_required, yarn_min_container_size, slider_am_size, hive_tez_container_size, |
| tez_am_container_size, total_cluster_cap, yarn_rm_mem_in_mb)) |
| return int(math.ceil(min_required_perc)) |
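# Worked example (assumed cluster): 5 NodeManagers x 10240 MB -> total_cluster_cap = 51200 MB; 20% = 10240 MB.
# With yarn_min_container_size = 1024, slider_am_size = 1024, hive_tez_container_size = 4096 and
# tez_am_container_size = 512, normalized_val = 1024 + 4096 + 1024 = 6144 MB, so
# min_required = max(10240, 6144) = 10240 MB -> ceil(20.0) = 20%.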
| |
| """ |
| Normalize down 'val2' with respect to 'val1'. |
| """ |
| def _normalizeDown(self, val1, val2): |
| tmp = math.floor(val1 / val2) |
| if tmp < 1.00: |
| return val1 |
| return tmp * val2 |
| |
| """ |
| Normalize up 'val2' with respect to 'val1'. |
| """ |
| def _normalizeUp(self, val1, val2): |
| tmp = math.ceil(val1 / val2) |
| return tmp * val2 |
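# Examples, assuming float arguments as the callers above pass: _normalizeUp(1000.0, 512.0) -> 1024.0,
# _normalizeDown(1000.0, 512.0) -> 512.0, and _normalizeDown(300.0, 512.0) -> 300.0 (returned as-is when < val2).
# Note: with Python 2 integer arguments, '/' truncates, so callers should pass floats.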
| |
| """ |
| Checks and (1). Creates 'llap' queue if only 'default' queue exist at leaf level and is consuming 100% capacity OR |
| (2). Updates 'llap' queue capacity and state, if current selected queue is 'llap', and only 2 queues exist |
| at root level : 'default' and 'llap'. |
| """ |
| def checkAndManageLlapQueue(self, services, configurations, hosts, llap_queue_name): |
| Logger.info("Determining creation/adjustment of 'capacity-scheduler' for 'llap' queue.") |
| putHiveInteractiveEnvProperty = self.putProperty(configurations, "hive-interactive-env", services) |
| putHiveInteractiveSiteProperty = self.putProperty(configurations, self.HIVE_INTERACTIVE_SITE, services) |
| putHiveInteractiveEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hive-interactive-env") |
| putCapSchedProperty = self.putProperty(configurations, "capacity-scheduler", services) |
| leafQueueNames = None |
| |
| capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services) |
| if capacity_scheduler_properties: |
| leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties) |
| # Get the llap Cluster percentage used for 'llap' Queue creation |
| if 'llap_queue_capacity' in services['configurations']['hive-interactive-env']['properties']: |
| llap_slider_cap_percentage = int( |
| services['configurations']['hive-interactive-env']['properties']['llap_queue_capacity']) |
| min_reqd_queue_cap_perc = self.min_queue_perc_reqd_for_llap_and_hive_app(services, hosts, configurations) |
| if min_reqd_queue_cap_perc > 100: |
| min_reqd_queue_cap_perc = 100 |
| Logger.info("Received 'Minimum Required LLAP queue capacity' : {0}% (out of bounds), adjusted it to : 100%".format(min_reqd_queue_cap_perc)) |
| |
| # Adjust 'llap' queue capacity slider value to be minimum required if out of expected bounds. |
| if llap_slider_cap_percentage <= 0 or llap_slider_cap_percentage > 100: |
| Logger.info("Adjusting HIVE 'llap_queue_capacity' from {0}% (invalid size) to {1}%".format(llap_slider_cap_percentage, min_reqd_queue_cap_perc)) |
| putHiveInteractiveEnvProperty('llap_queue_capacity', min_reqd_queue_cap_perc) |
| llap_slider_cap_percentage = min_reqd_queue_cap_perc |
| else: |
| Logger.error("Problem retrieving LLAP Queue Capacity. Skipping creating {0} queue".format(llap_queue_name)) |
| return |
| |
| cap_sched_config_keys = capacity_scheduler_properties.keys() |
| |
| yarn_default_queue_capacity = -1 |
| if 'yarn.scheduler.capacity.root.default.capacity' in cap_sched_config_keys: |
| yarn_default_queue_capacity = capacity_scheduler_properties.get('yarn.scheduler.capacity.root.default.capacity') |
| |
| # Get 'llap' queue state |
| currLlapQueueState = '' |
| if 'yarn.scheduler.capacity.root.'+llap_queue_name+'.state' in cap_sched_config_keys: |
| currLlapQueueState = capacity_scheduler_properties.get('yarn.scheduler.capacity.root.'+llap_queue_name+'.state') |
| |
| # Get 'llap' queue capacity |
| currLlapQueueCap = -1 |
| if 'yarn.scheduler.capacity.root.'+llap_queue_name+'.capacity' in cap_sched_config_keys: |
| currLlapQueueCap = int(float(capacity_scheduler_properties.get('yarn.scheduler.capacity.root.'+llap_queue_name+'.capacity'))) |
| |
| if self.HIVE_INTERACTIVE_SITE in services['configurations'] and \ |
| 'hive.llap.daemon.queue.name' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']: |
| llap_daemon_selected_queue_name = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name'] |
| else: |
| Logger.error("Couldn't retrive 'hive.llap.daemon.queue.name' property. Skipping adjusting queues.") |
| return |
| updated_cap_sched_configs_str = '' |
| |
| enabled_hive_int_in_changed_configs = self.are_config_props_in_changed_configs(services, "hive-interactive-env", |
| set(['enable_hive_interactive']), False) |
| """ |
| We create OR "modify 'llap' queue 'state and/or capacity' " based on below conditions: |
| - if only 1 queue exists at root level and is 'default' queue and has 100% cap -> Create 'llap' queue, OR |
| - if 2 queues exists at root level ('llap' and 'default') : |
| - Queue selected is 'llap' and state is STOPPED -> Modify 'llap' queue state to RUNNING, adjust capacity, OR |
| - Queue selected is 'llap', state is RUNNING and 'llap_queue_capacity' prop != 'llap' queue current running capacity -> |
| Modify 'llap' queue capacity to 'llap_queue_capacity' |
| """ |
| if 'default' in leafQueueNames and \ |
| ((len(leafQueueNames) == 1 and int(yarn_default_queue_capacity) == 100) or \ |
| ((len(leafQueueNames) == 2 and llap_queue_name in leafQueueNames) and \ |
| ((currLlapQueueState == 'STOPPED' and enabled_hive_int_in_changed_configs) or (currLlapQueueState == 'RUNNING' and currLlapQueueCap != llap_slider_cap_percentage)))): |
| adjusted_default_queue_cap = str(100 - llap_slider_cap_percentage) |
| |
| hive_user = '*' # Open to all |
| if 'hive_user' in services['configurations']['hive-env']['properties']: |
| hive_user = services['configurations']['hive-env']['properties']['hive_user'] |
| |
| llap_slider_cap_percentage = str(llap_slider_cap_percentage) |
| |
| # If capacity-scheduler configs are received as one concatenated string, we deposit the changed configs back as |
| # one concatenated string. |
| updated_cap_sched_configs_as_dict = False |
| if not received_as_key_value_pair: |
| for prop, val in capacity_scheduler_properties.items(): |
| if llap_queue_name not in prop: |
| if prop == 'yarn.scheduler.capacity.root.queues': |
| updated_cap_sched_configs_str = updated_cap_sched_configs_str \ |
| + prop + "=default,llap\n" |
| elif prop == 'yarn.scheduler.capacity.root.default.capacity': |
| updated_cap_sched_configs_str = updated_cap_sched_configs_str \ |
| + prop + "=" + adjusted_default_queue_cap + "\n" |
| elif prop == 'yarn.scheduler.capacity.root.default.maximum-capacity': |
| updated_cap_sched_configs_str = updated_cap_sched_configs_str \ |
| + prop + "=" + adjusted_default_queue_cap + "\n" |
| elif prop.startswith('yarn.') and '.llap.' not in prop: |
| updated_cap_sched_configs_str = updated_cap_sched_configs_str + prop + "=" + val + "\n" |
| |
| # Now, append the 'llap' queue related properties |
| updated_cap_sched_configs_str = updated_cap_sched_configs_str \ |
| + "yarn.scheduler.capacity.root." + llap_queue_name + ".user-limit-factor=1\n" \ |
| + "yarn.scheduler.capacity.root." + llap_queue_name + ".state=RUNNING\n" \ |
| + "yarn.scheduler.capacity.root." + llap_queue_name + ".ordering-policy=fifo\n" \ |
| + "yarn.scheduler.capacity.root." + llap_queue_name + ".minimum-user-limit-percent=100\n" \ |
| + "yarn.scheduler.capacity.root." + llap_queue_name + ".maximum-capacity=" \ |
| + llap_slider_cap_percentage + "\n" \ |
| + "yarn.scheduler.capacity.root." + llap_queue_name + ".capacity=" \ |
| + llap_slider_cap_percentage + "\n" \ |
| + "yarn.scheduler.capacity.root." + llap_queue_name + ".acl_submit_applications=" \ |
| + hive_user + "\n" \ |
| + "yarn.scheduler.capacity.root." + llap_queue_name + ".acl_administer_queue=" \ |
| + hive_user + "\n" \ |
| + "yarn.scheduler.capacity.root." + llap_queue_name + ".maximum-am-resource-percent=1" |
| |
| putCapSchedProperty("capacity-scheduler", updated_cap_sched_configs_str) |
| Logger.info("Updated 'capacity-scheduler' configs as one concatenated string.") |
| else: |
| # If capacity-scheduler configs are received as a dictionary (generally 1st time), we deposit the changed |
| # values back as dictionary itself. |
| # Update existing configs in 'capacity-scheduler'. |
| for prop, val in capacity_scheduler_properties.items(): |
| if llap_queue_name not in prop: |
| if prop == 'yarn.scheduler.capacity.root.queues': |
| putCapSchedProperty(prop, 'default,llap') |
| elif prop == 'yarn.scheduler.capacity.root.default.capacity': |
| putCapSchedProperty(prop, adjusted_default_queue_cap) |
| elif prop == 'yarn.scheduler.capacity.root.default.maximum-capacity': |
| putCapSchedProperty(prop, adjusted_default_queue_cap) |
| elif prop.startswith('yarn.') and '.llap.' not in prop: |
| putCapSchedProperty(prop, val) |
| |
| # Add new 'llap' queue related configs. |
| putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".user-limit-factor", "1") |
| putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".state", "RUNNING") |
| putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".ordering-policy", "fifo") |
| putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".minimum-user-limit-percent", "100") |
| putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".maximum-capacity", llap_slider_cap_percentage) |
| putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".capacity", llap_slider_cap_percentage) |
| putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".acl_submit_applications", hive_user) |
| putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".acl_administer_queue", hive_user) |
| putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".maximum-am-resource-percent", "1") |
| |
| |
| Logger.info("Updated 'capacity-scheduler' configs as a dictionary.") |
| updated_cap_sched_configs_as_dict = True |
| |
| if updated_cap_sched_configs_str or updated_cap_sched_configs_as_dict: |
| if len(leafQueueNames) == 1: # 'llap' queue didn't exist before |
| Logger.info("Created YARN Queue : '{0}' with capacity : {1}%. Adjusted 'default' queue capacity to : {2}%" \ |
| .format(llap_queue_name, llap_slider_cap_percentage, adjusted_default_queue_cap)) |
| else: # Queue existed, only adjustments done. |
| Logger.info("Adjusted YARN Queue : '{0}'. Current capacity : {1}%. State: RUNNING.".format(llap_queue_name, llap_slider_cap_percentage)) |
| Logger.info("Adjusted 'default' queue capacity to : {0}%".format(adjusted_default_queue_cap)) |
| |
| # Update Hive 'hive.llap.daemon.queue.name' prop to use 'llap' queue. |
| putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', llap_queue_name) |
| putHiveInteractiveSiteProperty('hive.server2.tez.default.queues', llap_queue_name) |
| putHiveInteractiveEnvPropertyAttribute('llap_queue_capacity', "minimum", min_reqd_queue_cap_perc) |
| putHiveInteractiveEnvPropertyAttribute('llap_queue_capacity', "maximum", 100) |
| |
| # Update 'hive.llap.daemon.queue.name' prop combo entries and llap capacity slider visibility. |
| self.setLlapDaemonQueuePropAttributesAndCapSliderVisibility(services, configurations) |
| else: |
| Logger.debug("Not creating/adjusting {0} queue. Current YARN queues : {1}".format(llap_queue_name, list(leafQueueNames))) |
| else: |
| Logger.error("Couldn't retrieve 'capacity-scheduler' properties while doing YARN queue adjustment for Hive Server Interactive.") |
| |
| """ |
| Checks and sees (1). If only two leaf queues exist at root level, namely: 'default' and 'llap', |
| and (2). 'llap' is in RUNNING state. |
| |
If yes, performs the following actions: (1). 'llap' queue state set to STOPPED,
(2). 'llap' queue capacity set to 0%,
(3). 'default' queue capacity set to 100%
| """ |
| def checkAndStopLlapQueue(self, services, configurations, llap_queue_name): |
| putCapSchedProperty = self.putProperty(configurations, "capacity-scheduler", services) |
| putHiveInteractiveSiteProperty = self.putProperty(configurations, self.HIVE_INTERACTIVE_SITE, services) |
| capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services) |
| updated_default_queue_configs = '' |
| updated_llap_queue_configs = '' |
| if capacity_scheduler_properties: |
| # Get all leaf queues. |
| leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties) |
| |
| if len(leafQueueNames) == 2 and llap_queue_name in leafQueueNames and 'default' in leafQueueNames: |
| # Get 'llap' queue state |
| currLlapQueueState = 'STOPPED' |
| if 'yarn.scheduler.capacity.root.'+llap_queue_name+'.state' in capacity_scheduler_properties.keys(): |
| currLlapQueueState = capacity_scheduler_properties.get('yarn.scheduler.capacity.root.'+llap_queue_name+'.state') |
| else: |
| Logger.error("{0} queue 'state' property not present in capacity scheduler. Skipping adjusting queues.".format(llap_queue_name)) |
| return |
| if currLlapQueueState == 'RUNNING': |
| DEFAULT_MAX_CAPACITY = '100' |
| for prop, val in capacity_scheduler_properties.items(): |
| # Update 'default' related configs in 'updated_default_queue_configs' |
| if llap_queue_name not in prop: |
| if prop == 'yarn.scheduler.capacity.root.default.capacity': |
| # Set 'default' capacity back to maximum val |
| updated_default_queue_configs = updated_default_queue_configs \ |
| + prop + "="+DEFAULT_MAX_CAPACITY + "\n" |
| elif prop == 'yarn.scheduler.capacity.root.default.maximum-capacity': |
| # Set 'default' max. capacity back to maximum val |
| updated_default_queue_configs = updated_default_queue_configs \ |
| + prop + "="+DEFAULT_MAX_CAPACITY + "\n" |
| elif prop.startswith('yarn.'): |
| updated_default_queue_configs = updated_default_queue_configs + prop + "=" + val + "\n" |
| else: # Update 'llap' related configs in 'updated_llap_queue_configs' |
| if prop == 'yarn.scheduler.capacity.root.'+llap_queue_name+'.state': |
| updated_llap_queue_configs = updated_llap_queue_configs \ |
| + prop + "=STOPPED\n" |
| elif prop == 'yarn.scheduler.capacity.root.'+llap_queue_name+'.capacity': |
| updated_llap_queue_configs = updated_llap_queue_configs \ |
| + prop + "=0\n" |
| elif prop == 'yarn.scheduler.capacity.root.'+llap_queue_name+'.maximum-capacity': |
| updated_llap_queue_configs = updated_llap_queue_configs \ |
| + prop + "=0\n" |
| elif prop.startswith('yarn.'): |
| updated_llap_queue_configs = updated_llap_queue_configs + prop + "=" + val + "\n" |
| else: |
| Logger.debug("{0} queue state is : {1}. Skipping adjusting queues.".format(llap_queue_name, currLlapQueueState)) |
| return |
| |
| if updated_default_queue_configs and updated_llap_queue_configs: |
| putCapSchedProperty("capacity-scheduler", updated_default_queue_configs+updated_llap_queue_configs) |
| Logger.info("Changed YARN '{0}' queue state to 'STOPPED', and capacity to 0%. Adjusted 'default' queue capacity to : {1}%" \ |
| .format(llap_queue_name, DEFAULT_MAX_CAPACITY)) |
| |
| # Update Hive 'hive.llap.daemon.queue.name' prop to use 'default' queue. |
| putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', self.YARN_ROOT_DEFAULT_QUEUE_NAME) |
| putHiveInteractiveSiteProperty('hive.server2.tez.default.queues', self.YARN_ROOT_DEFAULT_QUEUE_NAME) |
| else: |
| Logger.debug("Not removing '{0}' queue as number of Queues not equal to 2. Current YARN queues : {1}".format(llap_queue_name, list(leafQueueNames))) |
| else: |
| Logger.error("Couldn't retrieve 'capacity-scheduler' properties while doing YARN queue adjustment for Hive Server Interactive.") |
| |
| """ |
| Checks and sets the 'Hive Server Interactive' 'hive.llap.daemon.queue.name' config Property Attributes. Takes into |
| account that 'capacity-scheduler' may have changed (got updated) in current Stack Advisor invocation. |
| |
| Also, updates the 'llap_queue_capacity' slider visibility. |
| """ |
| def setLlapDaemonQueuePropAttributesAndCapSliderVisibility(self, services, configurations): |
| Logger.info("Determining 'hive.llap.daemon.queue.name' config Property Attributes.") |
| putHiveInteractiveSitePropertyAttribute = self.putPropertyAttribute(configurations, self.HIVE_INTERACTIVE_SITE) |
| putHiveInteractiveEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hive-interactive-env") |
| |
| capacity_scheduler_properties = dict() |
| |
| # Read 'capacity-scheduler' from configurations if we modified and added recommendation to it, as part of current |
| # StackAdvisor invocation. |
| if "capacity-scheduler" in configurations: |
| cap_sched_props_as_dict = configurations["capacity-scheduler"]["properties"] |
| if 'capacity-scheduler' in cap_sched_props_as_dict: |
| cap_sched_props_as_str = configurations['capacity-scheduler']['properties']['capacity-scheduler'] |
| if cap_sched_props_as_str: |
| cap_sched_props_as_str = str(cap_sched_props_as_str).split('\n') |
| if len(cap_sched_props_as_str) > 0 and cap_sched_props_as_str[0] != 'null': |
| # Got 'capacity-scheduler' configs as one "\n" separated string |
for prop in cap_sched_props_as_str:
key, sep, value = prop.partition("=")
capacity_scheduler_properties[key] = value
| Logger.info("'capacity-scheduler' configs is set as a single '\\n' separated string in current invocation. " |
| "count(configurations['capacity-scheduler']['properties']['capacity-scheduler']) = " |
| "{0}".format(len(capacity_scheduler_properties))) |
| else: |
| Logger.info("Read configurations['capacity-scheduler']['properties']['capacity-scheduler'] is : {0}".format(cap_sched_props_as_str)) |
| else: |
| Logger.info("configurations['capacity-scheduler']['properties']['capacity-scheduler'] : {0}.".format(cap_sched_props_as_str)) |
| |
| # if 'capacity_scheduler_properties' is empty, implies we may have 'capacity-scheduler' configs as dictionary |
| # in configurations, if 'capacity-scheduler' changed in current invocation. |
| if not capacity_scheduler_properties: |
| if isinstance(cap_sched_props_as_dict, dict) and len(cap_sched_props_as_dict) > 1: |
| capacity_scheduler_properties = cap_sched_props_as_dict |
| Logger.info("'capacity-scheduler' changed in current Stack Advisor invocation. Retrieved the configs as dictionary from configurations.") |
| else: |
| Logger.info("Read configurations['capacity-scheduler']['properties'] is : {0}".format(cap_sched_props_as_dict)) |
| else: |
| Logger.info("'capacity-scheduler' not modified in the current Stack Advisor invocation.") |
| |
| |
# if 'capacity_scheduler_properties' is still empty, it implies 'capacity-scheduler' wasn't changed in the current
# SA invocation. Thus, read it from input : 'services'.
| if not capacity_scheduler_properties: |
| capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services) |
| Logger.info("'capacity-scheduler' not changed in current Stack Advisor invocation. Retrieved the configs from services.") |
| |
| # Get set of current YARN leaf queues. |
| leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties) |
| if leafQueueNames: |
| leafQueues = [{"label": str(queueName), "value": queueName} for queueName in leafQueueNames] |
| leafQueues = sorted(leafQueues, key=lambda q: q['value']) |
| putHiveInteractiveSitePropertyAttribute("hive.llap.daemon.queue.name", "entries", leafQueues) |
| Logger.info("'hive.llap.daemon.queue.name' config Property Attributes set to : {0}".format(leafQueues)) |
| |
| # Update 'llap_queue_capacity' slider visibility to 'true' if current selected queue in 'hive.llap.daemon.queue.name' |
| # is 'llap', else 'false'. |
| llap_daemon_selected_queue_name = None |
| llap_queue_selected_in_current_call = None |
| if self.HIVE_INTERACTIVE_SITE in services['configurations'] and \ |
| 'hive.llap.daemon.queue.name' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']: |
| llap_daemon_selected_queue_name = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name'] |
| |
| if self.HIVE_INTERACTIVE_SITE in configurations and \ |
| 'hive.llap.daemon.queue.name' in configurations[self.HIVE_INTERACTIVE_SITE]['properties']: |
| llap_queue_selected_in_current_call = configurations[self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name'] |
| |
| # Check to see if only 2 queues exist at root level : 'default' and 'llap' and current selected queue in 'hive.llap.daemon.queue.name' |
| # is 'llap'. |
| if len(leafQueueNames) == 2 and \ |
| ((llap_daemon_selected_queue_name != None and llap_daemon_selected_queue_name == 'llap') or \ |
| (llap_queue_selected_in_current_call != None and llap_queue_selected_in_current_call == 'llap')): |
| putHiveInteractiveEnvPropertyAttribute("llap_queue_capacity", "visible", "true") |
| Logger.info("Setting LLAP queue capacity slider visibility to 'True'.") |
| else: |
| putHiveInteractiveEnvPropertyAttribute("llap_queue_capacity", "visible", "false") |
| Logger.info("Setting LLAP queue capacity slider visibility to 'False'.") |
| else: |
| Logger.error("Problem retrieving YARN queues. Skipping updating HIVE Server Interactve " |
| "'hive.server2.tez.default.queues' property attributes.") |
| |
| """ |
Retrieves the passed-in queue's 'capacity' related key from Capacity Scheduler.
| """ |
| def __getQueueCapacityKeyFromCapacityScheduler(self, capacity_scheduler_properties, llap_daemon_selected_queue_name): |
| # Identify the key which contains the capacity for 'llap_daemon_selected_queue_name'. |
| cap_sched_keys = capacity_scheduler_properties.keys() |
llap_selected_queue_cap_key = None
for key in cap_sched_keys:
# Expected capacity prop key is of form : 'yarn.scheduler.capacity.<one or more queues in path separated by '.'>.[llap_daemon_selected_queue_name].capacity'
if key.endswith(llap_daemon_selected_queue_name+".capacity"):
llap_selected_queue_cap_key = key
break
| return llap_selected_queue_cap_key |
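# Matching sketch (hypothetical scheduler keys): for llap_daemon_selected_queue_name = 'llap', a key such as
# 'yarn.scheduler.capacity.root.llap.capacity' ends with 'llap.capacity' and is returned. Note that a queue whose
# name is a suffix of another queue's name would also satisfy endswith(); queue names are assumed distinct enough.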
| |
| """ |
Retrieves the passed-in queue's 'state' from Capacity Scheduler.
| """ |
| def __getQueueStateFromCapacityScheduler(self, capacity_scheduler_properties, llap_daemon_selected_queue_name): |
| # Identify the key which contains the state for 'llap_daemon_selected_queue_name'. |
| cap_sched_keys = capacity_scheduler_properties.keys() |
| llap_selected_queue_state_key = None |
| llap_selected_queue_state = None |
| for key in cap_sched_keys: |
| if key.endswith(llap_daemon_selected_queue_name+".state"): |
| llap_selected_queue_state_key = key |
break
| llap_selected_queue_state = capacity_scheduler_properties.get(llap_selected_queue_state_key) |
| return llap_selected_queue_state |
| |
| """ |
| Calculates the total available capacity for the passed-in YARN queue of any level based on the percentages. |
| """ |
| def __getSelectedQueueTotalCap(self, capacity_scheduler_properties, llap_daemon_selected_queue_name, total_cluster_capacity): |
| Logger.info("Entered __getSelectedQueueTotalCap fn().") |
| available_capacity = total_cluster_capacity |
| queue_cap_key = self.__getQueueCapacityKeyFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name) |
| if queue_cap_key: |
| queue_cap_key = queue_cap_key.strip() |
| if len(queue_cap_key) >= 34: # len('yarn.scheduler.capacity.<single letter queue name>.capacity') = 34 |
| # Expected capacity prop key is of form : 'yarn.scheduler.capacity.<one or more queues (path)>.capacity' |
| queue_path = queue_cap_key[24:] # Strip from beginning 'yarn.scheduler.capacity.' |
| queue_path = queue_path[0:-9] # Strip from end '.capacity' |
| queues_list = queue_path.split('.') |
| Logger.info("Queue list : {0}".format(queues_list)) |
| if queues_list: |
| for queue in queues_list: |
| queue_cap_key = self.__getQueueCapacityKeyFromCapacityScheduler(capacity_scheduler_properties, queue) |
| queue_cap_perc = float(capacity_scheduler_properties.get(queue_cap_key)) |
| available_capacity = queue_cap_perc / 100 * available_capacity |
| Logger.info("Total capacity available for queue {0} is : {1}".format(queue, available_capacity)) |
else:
raise Fail("Retrieved 'queue list' from capacity-scheduler is empty while doing '{0}' queue capacity calculation."
.format(llap_daemon_selected_queue_name))
else:
raise Fail("Expected length of queue_cap_key (val: {0}) should be at least 34.".format(queue_cap_key))
else:
raise Fail("Couldn't retrieve '{0}' queue capacity KEY from capacity-scheduler while doing capacity calculation.".format(llap_daemon_selected_queue_name))
| # returns the capacity calculated for passed-in queue in 'llap_daemon_selected_queue_name'. |
| return available_capacity |
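# Worked example (hypothetical hierarchy): keys 'yarn.scheduler.capacity.root.capacity' = 100,
# 'yarn.scheduler.capacity.root.a.capacity' = 50 and 'yarn.scheduler.capacity.root.a.b.capacity' = 40 with a
# total_cluster_capacity of 10000 MB yield, for selected queue 'b': 10000 * 1.0 * 0.5 * 0.4 = 2000 MB.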
| |
| def recommendRangerKMSConfigurations(self, configurations, clusterData, services, hosts): |
| super(HDP25StackAdvisor, self).recommendRangerKMSConfigurations(configurations, clusterData, services, hosts) |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| putRangerKmsSiteProperty = self.putProperty(configurations, "kms-site", services) |
| |
rangerUser = 'ranger'
if 'ranger-env' in services['configurations'] and 'ranger_user' in services['configurations']['ranger-env']['properties']:
rangerUser = services['configurations']['ranger-env']['properties']['ranger_user']
| |
| if 'kms-site' in services['configurations'] and 'KERBEROS' in servicesList: |
| putRangerKmsSiteProperty('hadoop.kms.proxyuser.{0}.groups'.format(rangerUser), '*') |
| putRangerKmsSiteProperty('hadoop.kms.proxyuser.{0}.hosts'.format(rangerUser), '*') |
| putRangerKmsSiteProperty('hadoop.kms.proxyuser.{0}.users'.format(rangerUser), '*') |
| |
| def recommendRangerConfigurations(self, configurations, clusterData, services, hosts): |
| super(HDP25StackAdvisor, self).recommendRangerConfigurations(configurations, clusterData, services, hosts) |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| has_ranger_tagsync = False |
| |
| putTagsyncAppProperty = self.putProperty(configurations, "tagsync-application-properties", services) |
| putTagsyncSiteProperty = self.putProperty(configurations, "ranger-tagsync-site", services) |
| putRangerAdminProperty = self.putProperty(configurations, "ranger-admin-site", services) |
| putRangerEnvProperty = self.putProperty(configurations, "ranger-env", services) |
| |
| ranger_tagsync_host = self.__getHostsForComponent(services, "RANGER", "RANGER_TAGSYNC") |
| has_ranger_tagsync = len(ranger_tagsync_host) > 0 |
| |
| if 'ATLAS' in servicesList and has_ranger_tagsync: |
| atlas_hosts = self.getHostNamesWithComponent("ATLAS", "ATLAS_SERVER", services) |
| atlas_host = 'localhost' if len(atlas_hosts) == 0 else atlas_hosts[0] |
| protocol = 'http' |
| atlas_port = '21000' |
| |
| if 'application-properties' in services['configurations'] and 'atlas.enableTLS' in services['configurations']['application-properties']['properties'] \ |
| and services['configurations']['application-properties']['properties']['atlas.enableTLS'].lower() == 'true': |
| protocol = 'https' |
| if 'application-properties' in services['configurations'] and 'atlas.server.https.port' in services['configurations']['application-properties']['properties']: |
| atlas_port = services['configurations']['application-properties']['properties']['atlas.server.https.port'] |
| else: |
| protocol = 'http' |
| if 'application-properties' in services['configurations'] and 'atlas.server.http.port' in services['configurations']['application-properties']['properties']: |
| atlas_port = services['configurations']['application-properties']['properties']['atlas.server.http.port'] |
| |
| atlas_rest_endpoint = '{0}://{1}:{2}'.format(protocol, atlas_host, atlas_port) |
| |
| putTagsyncSiteProperty('ranger.tagsync.source.atlas', 'true') |
| putTagsyncSiteProperty('ranger.tagsync.source.atlasrest.endpoint', atlas_rest_endpoint) |
| |
| zookeeper_host_port = self.getZKHostPortString(services) |
| if zookeeper_host_port and has_ranger_tagsync: |
| putTagsyncAppProperty('atlas.kafka.zookeeper.connect', zookeeper_host_port) |
| |
| if 'KAFKA' in servicesList and has_ranger_tagsync: |
| kafka_hosts = self.getHostNamesWithComponent("KAFKA", "KAFKA_BROKER", services) |
| kafka_port = '6667' |
| if 'kafka-broker' in services['configurations'] and ( |
| 'port' in services['configurations']['kafka-broker']['properties']): |
| kafka_port = services['configurations']['kafka-broker']['properties']['port'] |
kafka_host_port = [kafka_host + ':' + kafka_port for kafka_host in kafka_hosts]
| |
| final_kafka_host = ",".join(kafka_host_port) |
| putTagsyncAppProperty('atlas.kafka.bootstrap.servers', final_kafka_host) |
| |
| is_solr_cloud_enabled = False |
| if 'ranger-env' in services['configurations'] and 'is_solrCloud_enabled' in services['configurations']['ranger-env']['properties']: |
| is_solr_cloud_enabled = services['configurations']['ranger-env']['properties']['is_solrCloud_enabled'] == 'true' |
| |
| is_external_solr_cloud_enabled = False |
| if 'ranger-env' in services['configurations'] and 'is_external_solrCloud_enabled' in services['configurations']['ranger-env']['properties']: |
| is_external_solr_cloud_enabled = services['configurations']['ranger-env']['properties']['is_external_solrCloud_enabled'] == 'true' |
| |
| ranger_audit_zk_port = '' |
| |
| if 'AMBARI_INFRA' in servicesList and zookeeper_host_port and is_solr_cloud_enabled and not is_external_solr_cloud_enabled: |
| zookeeper_host_port = zookeeper_host_port.split(',') |
| zookeeper_host_port.sort() |
| zookeeper_host_port = ",".join(zookeeper_host_port) |
| infra_solr_znode = '/infra-solr' |
| |
| if 'infra-solr-env' in services['configurations'] and \ |
| ('infra_solr_znode' in services['configurations']['infra-solr-env']['properties']): |
| infra_solr_znode = services['configurations']['infra-solr-env']['properties']['infra_solr_znode'] |
| ranger_audit_zk_port = '{0}{1}'.format(zookeeper_host_port, infra_solr_znode) |
| putRangerAdminProperty('ranger.audit.solr.zookeepers', ranger_audit_zk_port) |
| elif zookeeper_host_port and is_solr_cloud_enabled and is_external_solr_cloud_enabled: |
| ranger_audit_zk_port = '{0}/{1}'.format(zookeeper_host_port, 'ranger_audits') |
| putRangerAdminProperty('ranger.audit.solr.zookeepers', ranger_audit_zk_port) |
| else: |
| putRangerAdminProperty('ranger.audit.solr.zookeepers', 'NONE') |
| |
| ranger_services = [ |
| {'service_name': 'HDFS', 'audit_file': 'ranger-hdfs-audit'}, |
| {'service_name': 'YARN', 'audit_file': 'ranger-yarn-audit'}, |
| {'service_name': 'HBASE', 'audit_file': 'ranger-hbase-audit'}, |
| {'service_name': 'HIVE', 'audit_file': 'ranger-hive-audit'}, |
| {'service_name': 'KNOX', 'audit_file': 'ranger-knox-audit'}, |
| {'service_name': 'KAFKA', 'audit_file': 'ranger-kafka-audit'}, |
| {'service_name': 'STORM', 'audit_file': 'ranger-storm-audit'}, |
| {'service_name': 'RANGER_KMS', 'audit_file': 'ranger-kms-audit'}, |
| {'service_name': 'ATLAS', 'audit_file': 'ranger-atlas-audit'} |
| ] |
| |
for ranger_service in ranger_services:
if ranger_service['service_name'] in servicesList:
component_audit_file = ranger_service['audit_file']
| if component_audit_file in services["configurations"]: |
| ranger_audit_dict = [ |
| {'filename': 'ranger-admin-site', 'configname': 'ranger.audit.solr.urls', 'target_configname': 'xasecure.audit.destination.solr.urls'}, |
| {'filename': 'ranger-admin-site', 'configname': 'ranger.audit.solr.zookeepers', 'target_configname': 'xasecure.audit.destination.solr.zookeepers'} |
| ] |
| putRangerAuditProperty = self.putProperty(configurations, component_audit_file, services) |
| |
| for item in ranger_audit_dict: |
| if item['filename'] in services["configurations"] and item['configname'] in services["configurations"][item['filename']]["properties"]: |
| if item['filename'] in configurations and item['configname'] in configurations[item['filename']]["properties"]: |
| rangerAuditProperty = configurations[item['filename']]["properties"][item['configname']] |
| else: |
| rangerAuditProperty = services["configurations"][item['filename']]["properties"][item['configname']] |
| putRangerAuditProperty(item['target_configname'], rangerAuditProperty) |
| |
| if "HDFS" in servicesList: |
| hdfs_user = None |
| if "hadoop-env" in services["configurations"] and "hdfs_user" in services["configurations"]["hadoop-env"]["properties"]: |
| hdfs_user = services["configurations"]["hadoop-env"]["properties"]["hdfs_user"] |
| putRangerAdminProperty('ranger.kms.service.user.hdfs', hdfs_user) |
| |
| if "HIVE" in servicesList: |
| hive_user = None |
| if "hive-env" in services["configurations"] and "hive_user" in services["configurations"]["hive-env"]["properties"]: |
| hive_user = services["configurations"]["hive-env"]["properties"]["hive_user"] |
| putRangerAdminProperty('ranger.kms.service.user.hive', hive_user) |
| |
| ranger_plugins_serviceuser = [ |
| {'service_name': 'HDFS', 'file_name': 'hadoop-env', 'config_name': 'hdfs_user', 'target_configname': 'ranger.plugins.hdfs.serviceuser'}, |
| {'service_name': 'HIVE', 'file_name': 'hive-env', 'config_name': 'hive_user', 'target_configname': 'ranger.plugins.hive.serviceuser'}, |
| {'service_name': 'YARN', 'file_name': 'yarn-env', 'config_name': 'yarn_user', 'target_configname': 'ranger.plugins.yarn.serviceuser'}, |
| {'service_name': 'HBASE', 'file_name': 'hbase-env', 'config_name': 'hbase_user', 'target_configname': 'ranger.plugins.hbase.serviceuser'}, |
| {'service_name': 'KNOX', 'file_name': 'knox-env', 'config_name': 'knox_user', 'target_configname': 'ranger.plugins.knox.serviceuser'}, |
| {'service_name': 'STORM', 'file_name': 'storm-env', 'config_name': 'storm_user', 'target_configname': 'ranger.plugins.storm.serviceuser'}, |
| {'service_name': 'KAFKA', 'file_name': 'kafka-env', 'config_name': 'kafka_user', 'target_configname': 'ranger.plugins.kafka.serviceuser'}, |
| {'service_name': 'RANGER_KMS', 'file_name': 'kms-env', 'config_name': 'kms_user', 'target_configname': 'ranger.plugins.kms.serviceuser'}, |
| {'service_name': 'ATLAS', 'file_name': 'atlas-env', 'config_name': 'metadata_user', 'target_configname': 'ranger.plugins.atlas.serviceuser'} |
| ] |
| |
for plugin in ranger_plugins_serviceuser:
if plugin['service_name'] in servicesList:
file_name = plugin['file_name']
config_name = plugin['config_name']
target_configname = plugin['target_configname']
| if file_name in services["configurations"] and config_name in services["configurations"][file_name]["properties"]: |
| service_user = services["configurations"][file_name]["properties"][config_name] |
| putRangerAdminProperty(target_configname, service_user) |
| |
| if "ATLAS" in servicesList: |
| if "ranger-env" in services["configurations"]: |
| putAtlasRangerAuditProperty = self.putProperty(configurations, 'ranger-atlas-audit', services) |
| xasecure_audit_destination_hdfs = '' |
| xasecure_audit_destination_hdfs_dir = '' |
| xasecure_audit_destination_solr = '' |
if 'ranger-env' in configurations and 'xasecure.audit.destination.hdfs' in configurations['ranger-env']['properties']:
xasecure_audit_destination_hdfs = configurations['ranger-env']['properties']['xasecure.audit.destination.hdfs']
else:
xasecure_audit_destination_hdfs = services['configurations']['ranger-env']['properties']['xasecure.audit.destination.hdfs']
| |
| if 'core-site' in services['configurations'] and ('fs.defaultFS' in services['configurations']['core-site']['properties']): |
| xasecure_audit_destination_hdfs_dir = '{0}/{1}/{2}'.format(services['configurations']['core-site']['properties']['fs.defaultFS'] ,'ranger','audit') |
| |
if 'ranger-env' in configurations and 'xasecure.audit.destination.solr' in configurations['ranger-env']['properties']:
xasecure_audit_destination_solr = configurations['ranger-env']['properties']['xasecure.audit.destination.solr']
else:
xasecure_audit_destination_solr = services['configurations']['ranger-env']['properties']['xasecure.audit.destination.solr']
| |
| putAtlasRangerAuditProperty('xasecure.audit.destination.hdfs',xasecure_audit_destination_hdfs) |
| putAtlasRangerAuditProperty('xasecure.audit.destination.hdfs.dir',xasecure_audit_destination_hdfs_dir) |
| putAtlasRangerAuditProperty('xasecure.audit.destination.solr',xasecure_audit_destination_solr) |
| |
| def validateRangerTagsyncConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): |
| ranger_tagsync_properties = properties |
| validationItems = [] |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| |
atlas_absent = False
if "RANGER" in servicesList:
atlas_absent = "ATLAS" not in servicesList

if atlas_absent and 'ranger.tagsync.source.atlas' in ranger_tagsync_properties and \
ranger_tagsync_properties['ranger.tagsync.source.atlas'].lower() == 'true':
validationItems.append({"config-name": "ranger.tagsync.source.atlas",
"item": self.getWarnItem(
"Need to install the ATLAS service to set ranger.tagsync.source.atlas to true.")})
| |
| return self.toConfigurationValidationProblems(validationItems, "ranger-tagsync-site") |
| |
| def __isServiceDeployed(self, services, serviceName): |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| return serviceName in servicesList |
| |
| """ |
| Returns the host(s) on which a requested service's component is hosted. |
| Parameters : |
| services : Configuration information for the cluster |
| serviceName : Passed-in service in consideration |
| componentName : Passed-in component in consideration |
| """ |
| def __getHostsForComponent(self, services, serviceName, componentName): |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| componentsListList = [service["components"] for service in services["services"]] |
| componentsList = [item["StackServiceComponents"] for sublist in componentsListList for item in sublist] |
| hosts_for_component = [] |
| if serviceName in servicesList: |
| hosts_for_component = [component["hostnames"] for component in componentsList if component["component_name"] == componentName][0] |
| return hosts_for_component |
| |
| def isComponentUsingCardinalityForLayout(self, componentName): |
return super(HDP25StackAdvisor, self).isComponentUsingCardinalityForLayout(componentName) or componentName in ['SPARK2_THRIFTSERVER', 'LIVY_SERVER']