#!/usr/bin/env ambari-python-wrap
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
# Python imports
import imp
import os
import traceback
import re
import socket
import fnmatch
import math
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
STACKS_DIR = os.path.join(SCRIPT_DIR, "../../../../")
PARENT_FILE = os.path.join(STACKS_DIR, "service_advisor.py")
try:
if "BASE_SERVICE_ADVISOR" in os.environ:
PARENT_FILE = os.environ["BASE_SERVICE_ADVISOR"]
with open(PARENT_FILE, "rb") as fp:
service_advisor = imp.load_module("service_advisor", fp, PARENT_FILE, (".py", "rb", imp.PY_SOURCE))
except Exception as e:
traceback.print_exc()
print "Failed to load parent"
class HiveServiceAdvisor(service_advisor.ServiceAdvisor):
def __init__(self, *args, **kwargs):
self.as_super = super(HiveServiceAdvisor, self)
self.as_super.__init__(*args, **kwargs)
self.initialize_logger("HiveServiceAdvisor")
# Always call these methods
self.modifyMastersWithMultipleInstances()
self.modifyCardinalitiesDict()
self.modifyHeapSizeProperties()
self.modifyNotValuableComponents()
self.modifyComponentsNotPreferableOnServer()
self.modifyComponentLayoutSchemes()
def modifyMastersWithMultipleInstances(self):
"""
Modify the set of masters with multiple instances.
Must be overridden in child class.
"""
# Nothing to do
pass
def modifyCardinalitiesDict(self):
"""
Modify the dictionary of cardinalities.
Must be overridden in child class.
"""
# Nothing to do
pass
def modifyHeapSizeProperties(self):
"""
Modify the dictionary of heap size properties.
Must be overridden in child class.
"""
pass
def modifyNotValuableComponents(self):
"""
Modify the set of components whose host assignment is based on other services.
Must be overridden in child class.
"""
# Nothing to do
pass
def modifyComponentsNotPreferableOnServer(self):
"""
Modify the set of components that are not preferable on the server.
Must be overridden in child class.
"""
# Nothing to do
pass
def modifyComponentLayoutSchemes(self):
"""
Modify layout scheme dictionaries for components.
The scheme dictionary basically maps the number of hosts to
host index where component should exist.
Must be overridden in child class.
"""
self.componentLayoutSchemes.update({
"HIVE_SERVER": {6: 1, 31: 2, "else": 4},
"HIVE_METASTORE": {6: 1, 31: 2, "else": 4}
})
def getServiceComponentLayoutValidations(self, services, hosts):
"""
Get a list of errors.
Must be overridden in child class.
"""
return self.getServiceComponentCardinalityValidations(services, hosts, "HIVE")
def getServiceConfigurationRecommendations(self, configurations, clusterData, services, hosts):
"""
Entry point.
Must be overridden in child class.
"""
# self.logger.info("Class: %s, Method: %s. Recommending Service Configurations." %
# (self.__class__.__name__, inspect.stack()[0][3]))
recommender = HiveRecommender()
recommender.recommendHiveConfigurationsFromHDP30(configurations, clusterData, services, hosts)
def getServiceConfigurationsValidationItems(self, configurations, recommendedDefaults, services, hosts):
"""
Entry point.
Validate configurations for the service. Return a list of errors.
The code for this function should be the same for each Service Advisor.
"""
# self.logger.info("Class: %s, Method: %s. Validating Configurations." %
# (self.__class__.__name__, inspect.stack()[0][3]))
validator = HiveValidator()
# Calls the methods of the validator using arguments,
# method(siteProperties, siteRecommendations, configurations, services, hosts)
return validator.validateListOfConfigUsingMethod(configurations, recommendedDefaults, services, hosts, validator.validators)
@staticmethod
def isKerberosEnabled(services, configurations):
"""
Determines if security is enabled by testing the value of hive-site/hive.server2.authentication.
If the property exists and is equal to "kerberos", then it is enabled; otherwise it is assumed to be
disabled.
:type services: dict
:param services: the dictionary containing the existing configuration values
:type configurations: dict
:param configurations: the dictionary containing the updated configuration values
:rtype: bool
:return: True or False
"""
if configurations and "hive-site" in configurations and \
"hive.server2.authentication" in configurations["hive-site"]["properties"]:
return configurations["hive-site"]["properties"]["hive.server2.authentication"].lower() == "kerberos"
elif services and "hive-site" in services["configurations"] and \
"hive.server2.authentication" in services["configurations"]["hive-site"]["properties"]:
return services["configurations"]["hive-site"]["properties"]["hive.server2.authentication"].lower() == "kerberos"
else:
return False
class HiveRecommender(service_advisor.ServiceAdvisor):
"""
Hive Recommender suggests properties when adding the service for the first time or modifying configs via the UI.
"""
def __init__(self, *args, **kwargs):
self.as_super = super(HiveRecommender, self)
self.as_super.__init__(*args, **kwargs)
self.HIVE_INTERACTIVE_SITE = "hive-interactive-site"
def recommendHiveConfigurationsFromHDP30(self, configurations, clusterData, services, hosts):
hiveSiteProperties = self.getSiteProperties(services["configurations"], "hive-site")
hiveEnvProperties = self.getSiteProperties(services["configurations"], "hive-env")
putHiveEnvProperty = self.putProperty(configurations, "hive-env", services)
putHiveSiteProperty = self.putProperty(configurations, "hive-site", services)
putHiveProperty = self.putProperty(configurations, "hive-site", services)
putHiveServerProperty = self.putProperty(configurations, "hiveserver2-site", services)
putHiveInteractiveEnvProperty = self.putProperty(configurations, "hive-interactive-env", services)
putHiveInteractiveSiteProperty = self.putProperty(configurations, self.HIVE_INTERACTIVE_SITE, services)
putRangerHivePluginProperty = self.putProperty(configurations, "ranger-hive-plugin-properties", services)
putHiveAtlasHookProperty = self.putProperty(configurations, "hive-atlas-application.properties", services)
putHiveSitePropertyAttribute = self.putPropertyAttribute(configurations, "hive-site")
putHiveEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hive-env")
putHiveServerPropertyAttribute = self.putPropertyAttribute(configurations, "hiveserver2-site")
putHiveInteractiveEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hive-interactive-env")
putHiveInteractiveSitePropertyAttribute = self.putPropertyAttribute(configurations, "hive-interactive-site")
putHiveAtlasHookPropertyAttribute = self.putPropertyAttribute(configurations,"hive-atlas-application.properties")
hive_server_hosts = self.getHostsWithComponent("HIVE", "HIVE_SERVER", services, hosts)
hive_server_interactive_hosts = self.getHostsWithComponent("HIVE", "HIVE_SERVER_INTERACTIVE", services, hosts)
hive_client_hosts = self.getHostsWithComponent("HIVE", "HIVE_CLIENT", services, hosts)
servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
# Recommend Druid-related properties only when DRUID is in the list of services to be installed
if 'DRUID' in servicesList:
putHiveInteractiveSiteProperty = self.putProperty(configurations, "hive-interactive-site", services)
druid_coordinator_host_port = self.druid_host('DRUID_COORDINATOR', 'druid-coordinator', services, hosts, default_host='localhost:8081')
druid_overlord_host_port = self.druid_host('DRUID_OVERLORD', 'druid-overlord', services, hosts, default_host='localhost:8090')
druid_broker_host_port = self.druid_host('DRUID_ROUTER', 'druid-router', services, hosts)
if druid_broker_host_port is None:
druid_broker_host_port = self.druid_host('DRUID_BROKER', 'druid-broker', services, hosts, default_host='localhost:8083')
druid_metadata_uri = ""
druid_metadata_user = ""
druid_metadata_type = ""
if 'druid-common' in services['configurations']:
druid_metadata_uri = services['configurations']['druid-common']['properties']['druid.metadata.storage.connector.connectURI']
druid_metadata_type = services['configurations']['druid-common']['properties']['druid.metadata.storage.type']
if 'druid.metadata.storage.connector.user' in services['configurations']['druid-common']['properties']:
druid_metadata_user = services['configurations']['druid-common']['properties']['druid.metadata.storage.connector.user']
else:
druid_metadata_user = ""
putHiveInteractiveSiteProperty('hive.druid.broker.address.default', druid_broker_host_port)
putHiveInteractiveSiteProperty('hive.druid.coordinator.address.default', druid_coordinator_host_port)
putHiveInteractiveSiteProperty('hive.druid.overlord.address.default', druid_overlord_host_port)
putHiveInteractiveSiteProperty('hive.druid.metadata.uri', druid_metadata_uri)
putHiveInteractiveSiteProperty('hive.druid.metadata.username', druid_metadata_user)
putHiveInteractiveSiteProperty('hive.druid.metadata.db.type', druid_metadata_type)
# javax.jdo.option.ConnectionURL recommendations
if hiveEnvProperties and self.checkSiteProperties(hiveEnvProperties, "hive_database", "hive_database_type"):
putHiveEnvProperty("hive_database_type", self.getDBTypeAlias(hiveEnvProperties["hive_database"]))
if hiveEnvProperties and hiveSiteProperties and self.checkSiteProperties(hiveSiteProperties, "javax.jdo.option.ConnectionDriverName") and self.checkSiteProperties(hiveEnvProperties, "hive_database"):
putHiveProperty("javax.jdo.option.ConnectionDriverName", self.getDBDriver(hiveEnvProperties["hive_database"]))
if hiveSiteProperties and hiveEnvProperties and self.checkSiteProperties(hiveSiteProperties, "ambari.hive.db.schema.name", "javax.jdo.option.ConnectionURL") and self.checkSiteProperties(hiveEnvProperties, "hive_database"):
hiveServerHost = self.getHostWithComponent("HIVE", "HIVE_SERVER", services, hosts)
hiveDBConnectionURL = hiveSiteProperties["javax.jdo.option.ConnectionURL"]
protocol = self.getProtocol(hiveEnvProperties["hive_database"])
oldSchemaName = self.getOldValue(services, "hive-site", "ambari.hive.db.schema.name")
oldDBType = self.getOldValue(services, "hive-env", "hive_database")
# The checks below recommend a new connection URL when a Hive Server host is available and one of the following holds:
# the current connection URL is the default one pointing at "localhost", the schema name was changed, the db type was
# changed (covering only the change from the default MySQL to an existing MySQL), or the protocol implied by the
# current db type differs from the protocol in the connection URL (covering changes to other db types).
if hiveServerHost is not None:
if (hiveDBConnectionURL and "//localhost" in hiveDBConnectionURL) or oldSchemaName or oldDBType or (protocol and hiveDBConnectionURL and not hiveDBConnectionURL.startswith(protocol)):
dbConnection = self.getDBConnectionString(hiveEnvProperties["hive_database"]).format(hiveServerHost["Hosts"]["host_name"], hiveSiteProperties["ambari.hive.db.schema.name"])
putHiveProperty("javax.jdo.option.ConnectionURL", dbConnection)
# Transactions
cpu_count = 0
for hostData in hive_server_hosts:
cpu_count = max(cpu_count, hostData["Hosts"]["cpu_count"])
putHiveSiteProperty("hive.compactor.worker.threads", str(max(cpu_count / 8, 1)))
hiveMetastoreHost = self.getHostWithComponent("HIVE", "HIVE_METASTORE", services, hosts)
if hiveMetastoreHost is not None and len(hiveMetastoreHost) > 0:
putHiveSiteProperty("hive.metastore.uris", "thrift://" + hiveMetastoreHost["Hosts"]["host_name"] + ":9083")
# DAS Hook
putHiveEnvProperty("hive_timeline_logging_enabled", "false")
das_hook_class = "org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook"
hooks_properties = ["hive.exec.pre.hooks", "hive.exec.post.hooks", "hive.exec.failure.hooks"]
for hooks_property in hooks_properties:
if hooks_property in configurations["hive-site"]["properties"]:
hooks_value = configurations["hive-site"]["properties"][hooks_property]
else:
hooks_value = " "
if das_hook_class not in hooks_value:
if hooks_value == " ":
hooks_value = das_hook_class
else:
hooks_value = hooks_value + "," + das_hook_class
# Put updated hooks property
# org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook may have a bug, so the recommendation below is left commented out
# putHiveSiteProperty(hooks_property, hooks_value)
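# Illustrative example of the (currently disabled) recommendation above: an existing hook value of
# "org.apache.hadoop.hive.ql.hooks.ATSHook" would become
# "org.apache.hadoop.hive.ql.hooks.ATSHook,org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook",
# while the " " sentinel would be replaced by the DAS hook class alone.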
if not "yarn-site" in configurations:
self.calculateYarnAllocationSizes(configurations, services, hosts)
containerSize = clusterData["mapMemory"] if clusterData["mapMemory"] > 2048 else int(clusterData["reduceMemory"])
containerSize = min(clusterData["containers"] * clusterData["ramPerContainer"], containerSize)
container_size_bytes = int(containerSize)*1024*1024
yarnMaxAllocationSize = min(30 * int(configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]), int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"]))
# Duplicates the Tez task resource calculation logic; a direct dependency on Tez is undesirable here (Hive may be installed without Tez)
tez_container_size = min(containerSize, yarnMaxAllocationSize)
putHiveSitePropertyAttribute("hive.tez.container.size", "minimum", int(configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]))
putHiveSitePropertyAttribute("hive.tez.container.size", "maximum", int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"]))
if "yarn-site" in configurations:
if "yarn.scheduler.minimum-allocation-mb" in configurations["yarn-site"]["properties"]:
putHiveSitePropertyAttribute("hive.tez.container.size", "minimum", int(configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]))
tez_container_size = max(tez_container_size, int(configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]))
if "yarn.scheduler.maximum-allocation-mb" in configurations["yarn-site"]["properties"]:
putHiveSitePropertyAttribute("hive.tez.container.size", "maximum", int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"]))
tez_container_size = min(tez_container_size, int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"]))
putHiveSiteProperty("hive.tez.container.size", tez_container_size)
tez_container_size_bytes = int(int(tez_container_size)*0.8*1024*1024) # Xmx == 80% of container
# Memory
putHiveSiteProperty("hive.auto.convert.join.noconditionaltask.size", int(round(tez_container_size_bytes/3)))
putHiveSitePropertyAttribute("hive.auto.convert.join.noconditionaltask.size", "maximum", tez_container_size_bytes)
# CBO
if "hive-site" in services["configurations"] and "hive.cbo.enable" in services["configurations"]["hive-site"]["properties"]:
hive_cbo_enable = services["configurations"]["hive-site"]["properties"]["hive.cbo.enable"]
putHiveSiteProperty("hive.stats.fetch.partition.stats", hive_cbo_enable)
putHiveSiteProperty("hive.stats.fetch.column.stats", hive_cbo_enable)
# Interactive Query
yarn_queues = "default"
capacitySchedulerProperties = {}
if "capacity-scheduler" in services["configurations"]:
if "capacity-scheduler" in services["configurations"]["capacity-scheduler"]["properties"]:
properties = str(services["configurations"]["capacity-scheduler"]["properties"]["capacity-scheduler"]).split("\n")
for property in properties:
key,sep,value = property.partition("=")
capacitySchedulerProperties[key] = value
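# The "capacity-scheduler" value parsed above is a single "\n"-separated blob of key=value lines, e.g.
# (illustrative values, only the key format is relied on below):
#   yarn.scheduler.capacity.root.queues=default,llap
#   yarn.scheduler.capacity.root.default.capacity=80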
if "yarn.scheduler.capacity.root.queues" in capacitySchedulerProperties:
yarn_queues = str(capacitySchedulerProperties["yarn.scheduler.capacity.root.queues"])
elif "yarn.scheduler.capacity.root.queues" in services["configurations"]["capacity-scheduler"]["properties"]:
yarn_queues = services["configurations"]["capacity-scheduler"]["properties"]["yarn.scheduler.capacity.root.queues"]
toProcessQueues = yarn_queues.split(",")
leafQueueNames = set() # Remove duplicates
while len(toProcessQueues) > 0:
queue = toProcessQueues.pop()
queueKey = "yarn.scheduler.capacity.root." + queue + ".queues"
if queueKey in capacitySchedulerProperties:
# This is a parent queue - need to add children
subQueues = capacitySchedulerProperties[queueKey].split(",")
for subQueue in subQueues:
toProcessQueues.append(queue + "." + subQueue)
else:
# This is a leaf queue
queueName = queue.split(".")[-1] # Fully qualified queue name does not work, we should use only leaf name
leafQueueNames.add(queueName)
leafQueues = [{"label": str(queueName) + " queue", "value": queueName} for queueName in leafQueueNames]
leafQueues = sorted(leafQueues, key=lambda q:q["value"])
putHiveSitePropertyAttribute("hive.server2.tez.default.queues", "entries", leafQueues)
putHiveSiteProperty("hive.server2.tez.default.queues", ",".join([leafQueue["value"] for leafQueue in leafQueues]))
#HSI HA
is_hsi_ha = len(hive_server_interactive_hosts) > 1
putHiveInteractiveSitePropertyAttribute("hive.server2.active.passive.ha.registry.namespace", "visible", str(is_hsi_ha).lower())
# Security
if ("configurations" not in services) or ("hive-env" not in services["configurations"]) or \
("properties" not in services["configurations"]["hive-env"]) or \
("hive_security_authorization" not in services["configurations"]["hive-env"]["properties"]) or \
str(services["configurations"]["hive-env"]["properties"]["hive_security_authorization"]).lower() == "none":
putHiveEnvProperty("hive_security_authorization", "None")
else:
putHiveEnvProperty("hive_security_authorization", services["configurations"]["hive-env"]["properties"]["hive_security_authorization"])
# Recommend Ranger Hive authorization as per Ranger Hive plugin property
if "ranger-env" in services["configurations"] and "hive-env" in services["configurations"] and \
"ranger-hive-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]:
rangerEnvHivePluginProperty = services["configurations"]["ranger-env"]["properties"]["ranger-hive-plugin-enabled"]
rangerEnvHiveAuthProperty = services["configurations"]["hive-env"]["properties"]["hive_security_authorization"]
if (rangerEnvHivePluginProperty.lower() == "yes"):
putHiveEnvProperty("hive_security_authorization", "Ranger")
elif (rangerEnvHiveAuthProperty.lower() == "ranger"):
putHiveEnvProperty("hive_security_authorization", "None")
try:
auth_manager_value = str(configurations["hive-env"]["properties"]["hive.security.metastore.authorization.manager"])
except KeyError:
auth_manager_value = "org.apache.hadoop.hive.ql.security.authorization.StorageBasedAuthorizationProvider"
pass
auth_manager_values = auth_manager_value.split(",")
sqlstdauth_class = "org.apache.hadoop.hive.ql.security.authorization.MetaStoreAuthzAPIAuthorizerEmbedOnly"
# hive_security_authorization == "sqlstdauth"
if str(configurations["hive-env"]["properties"]["hive_security_authorization"]).lower() == "sqlstdauth":
putHiveSiteProperty("hive.server2.enable.doAs", "false")
putHiveServerProperty("hive.security.authorization.enabled", "true")
putHiveServerProperty("hive.security.authorization.manager", "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory")
putHiveServerProperty("hive.security.authenticator.manager", "org.apache.hadoop.hive.ql.security.SessionStateUserAuthenticator")
putHiveServerProperty("hive.conf.restricted.list", "hive.security.authenticator.manager,hive.security.authorization.manager,hive.security.metastore.authorization.manager,"
"hive.security.metastore.authenticator.manager,hive.users.in.admin.role,hive.server2.xsrf.filter.enabled,hive.security.authorization.enabled")
if sqlstdauth_class not in auth_manager_values:
auth_manager_values.append(sqlstdauth_class)
elif sqlstdauth_class in auth_manager_values:
#remove item from csv
auth_manager_values = [x for x in auth_manager_values if x != sqlstdauth_class]
pass
putHiveSiteProperty("hive.security.metastore.authorization.manager", ",".join(auth_manager_values))
# hive_security_authorization == "ranger"
if str(configurations["hive-env"]["properties"]["hive_security_authorization"]).lower() == "ranger":
putHiveSiteProperty("hive.server2.enable.doAs", "false")
putHiveServerProperty("hive.security.authorization.enabled", "true")
putHiveServerProperty("hive.security.authorization.manager", "org.apache.ranger.authorization.hive.authorizer.RangerHiveAuthorizerFactory")
putHiveServerProperty("hive.security.authenticator.manager", "org.apache.hadoop.hive.ql.security.SessionStateUserAuthenticator")
putHiveServerProperty("hive.conf.restricted.list", "hive.security.authenticator.manager,hive.security.authorization.manager,hive.security.metastore.authorization.manager,"
"hive.security.metastore.authenticator.manager,hive.users.in.admin.role,hive.server2.xsrf.filter.enabled,hive.security.authorization.enabled")
# hive_security_authorization == "None"
if str(configurations["hive-env"]["properties"]["hive_security_authorization"]).lower() == "none":
putHiveSiteProperty("hive.server2.enable.doAs", "true")
putHiveServerProperty("hive.security.authorization.enabled", "false")
putHiveServerPropertyAttribute("hive.security.authorization.manager", "delete", "true")
putHiveServerPropertyAttribute("hive.security.authenticator.manager", "delete", "true")
putHiveServerPropertyAttribute("hive.conf.restricted.list", "delete", "true")
#Hive authentication
hive_server2_auth = None
if "hive-site" in services["configurations"] and "hive.server2.authentication" in services["configurations"]["hive-site"]["properties"]:
hive_server2_auth = str(services["configurations"]["hive-site"]["properties"]["hive.server2.authentication"]).lower()
elif "hive.server2.authentication" in configurations["hive-site"]["properties"]:
hive_server2_auth = str(configurations["hive-site"]["properties"]["hive.server2.authentication"]).lower()
if hive_server2_auth == "ldap":
putHiveSiteProperty("hive.server2.authentication.ldap.url", "")
putHiveSitePropertyAttribute("hive.server2.authentication.ldap.url", "delete", "false")
else:
if ("hive.server2.authentication.ldap.url" in configurations["hive-site"]["properties"]) or \
("hive-site" not in services["configurations"]) or \
("hive-site" in services["configurations"] and "hive.server2.authentication.ldap.url" in services["configurations"]["hive-site"]["properties"]):
putHiveSitePropertyAttribute("hive.server2.authentication.ldap.url", "delete", "true")
if hive_server2_auth == "kerberos":
if "hive-site" in services["configurations"] and "hive.server2.authentication.kerberos.keytab" not in services["configurations"]["hive-site"]["properties"]:
putHiveSiteProperty("hive.server2.authentication.kerberos.keytab", "")
if "hive-site" in services["configurations"] and "hive.server2.authentication.kerberos.principal" not in services["configurations"]["hive-site"]["properties"]:
putHiveSiteProperty("hive.server2.authentication.kerberos.principal", "")
elif "KERBEROS" not in servicesList: # Since "hive_server2_auth" cannot be relied on within the default, empty recommendations request
if ("hive.server2.authentication.kerberos.keytab" in configurations["hive-site"]["properties"]) or \
("hive-site" not in services["configurations"]) or \
("hive-site" in services["configurations"] and "hive.server2.authentication.kerberos.keytab" in services["configurations"]["hive-site"]["properties"]):
putHiveSitePropertyAttribute("hive.server2.authentication.kerberos.keytab", "delete", "true")
if ("hive.server2.authentication.kerberos.principal" in configurations["hive-site"]["properties"]) or \
("hive-site" not in services["configurations"]) or \
("hive-site" in services["configurations"] and "hive.server2.authentication.kerberos.principal" in services["configurations"]["hive-site"]["properties"]):
putHiveSitePropertyAttribute("hive.server2.authentication.kerberos.principal", "delete", "true")
if hive_server2_auth == "pam":
putHiveSiteProperty("hive.server2.authentication.pam.services", "")
else:
if ("hive.server2.authentication.pam.services" in configurations["hive-site"]["properties"]) or \
("hive-site" not in services["configurations"]) or \
("hive-site" in services["configurations"] and "hive.server2.authentication.pam.services" in services["configurations"]["hive-site"]["properties"]):
putHiveSitePropertyAttribute("hive.server2.authentication.pam.services", "delete", "true")
if hive_server2_auth == "custom":
putHiveSiteProperty("hive.server2.custom.authentication.class", "")
else:
if ("hive.server2.authentication" in configurations["hive-site"]["properties"]) or \
("hive-site" not in services["configurations"]) or \
("hive-site" in services["configurations"] and "hive.server2.custom.authentication.class" in services["configurations"]["hive-site"]["properties"]):
putHiveSitePropertyAttribute("hive.server2.custom.authentication.class", "delete", "true")
# HiveServer, Client, Metastore heapsize
hs_heapsize_multiplier = 3.0/8
hm_heapsize_multiplier = 1.0/8
# HiveServer2 and HiveMetastore located on the same host
if hive_server_hosts is not None and len(hive_server_hosts):
hs_host_ram = hive_server_hosts[0]["Hosts"]["total_mem"]/1024
putHiveEnvProperty("hive.metastore.heapsize", max(512, int(hs_host_ram*hm_heapsize_multiplier)))
putHiveEnvProperty("hive.heapsize", max(512, int(hs_host_ram*hs_heapsize_multiplier)))
putHiveEnvPropertyAttribute("hive.metastore.heapsize", "maximum", max(1024, hs_host_ram))
putHiveEnvPropertyAttribute("hive.heapsize", "maximum", max(1024, hs_host_ram))
# If Hive is using a SQL Anywhere database, add the DataNucleus adapter property
sqla_db_used = "hive-env" in services["configurations"] and "hive_database" in services["configurations"]["hive-env"]["properties"] and \
services["configurations"]["hive-env"]["properties"]["hive_database"] == "Existing SQL Anywhere Database"
if sqla_db_used:
putHiveSiteProperty("datanucleus.rdbms.datastoreAdapterClassName","org.datanucleus.store.rdbms.adapter.SQLAnywhereAdapter")
else:
putHiveSitePropertyAttribute("datanucleus.rdbms.datastoreAdapterClassName", "delete", "true")
# Atlas
hooks_value = ""
if "hive.exec.post.hooks" in configurations["hive-site"]["properties"]:
hooks_value = configurations["hive-site"]["properties"]["hive.exec.post.hooks"]
hive_hooks = [x.strip() for x in hooks_value.split(",")]
hive_hooks = [x for x in hive_hooks if x != ""]
is_atlas_present_in_cluster = "ATLAS" in servicesList
enable_external_atlas_for_hive = False
if "hive-atlas-application.properties" in services["configurations"] and "enable.external.atlas.for.hive" in services["configurations"]["hive-atlas-application.properties"]["properties"]:
enable_external_atlas_for_hive = services["configurations"]["hive-atlas-application.properties"]["properties"]["enable.external.atlas.for.hive"].lower() == "true"
if is_atlas_present_in_cluster or enable_external_atlas_for_hive:
putHiveEnvProperty("hive.atlas.hook", "true")
else:
putHiveEnvProperty("hive.atlas.hook", "false")
enable_atlas_hook = False
if "hive-env" in configurations and "hive.atlas.hook" in configurations["hive-env"]["properties"]:
enable_atlas_hook = configurations["hive-env"]["properties"]["hive.atlas.hook"] == "true"
elif "hive-env" in services["configurations"] and "hive.atlas.hook" in services["configurations"]["hive-env"]["properties"]:
enable_atlas_hook = services["configurations"]["hive-env"]["properties"]["hive.atlas.hook"] == "true"
atlas_hook_class = "org.apache.atlas.hive.hook.HiveHook"
if enable_atlas_hook:
# Append atlas hook if not already present.
is_atlas_hook_in_config = atlas_hook_class in hive_hooks
if not is_atlas_hook_in_config:
hive_hooks.append(atlas_hook_class)
else:
# Remove the atlas hook since Atlas service is not present.
hive_hooks = [x for x in hive_hooks if x != atlas_hook_class]
# Convert hive_hooks back to a csv, unless there are 0 elements, which should be " "
hooks_value = " " if len(hive_hooks) == 0 else ",".join(hive_hooks)
putHiveSiteProperty("hive.exec.post.hooks", hooks_value)
# This is no longer used in HDP 2.5, but still needed in HDP 2.3 and 2.4
atlas_server_host_info = self.getHostWithComponent("ATLAS", "ATLAS_SERVER", services, hosts)
if is_atlas_present_in_cluster and atlas_server_host_info:
atlas_rest_host = atlas_server_host_info["Hosts"]["host_name"]
scheme = "http"
metadata_port = "21000"
atlas_server_default_https_port = "21443"
tls_enabled = "false"
if "application-properties" in services["configurations"]:
if "atlas.enableTLS" in services["configurations"]["application-properties"]["properties"]:
tls_enabled = services["configurations"]["application-properties"]["properties"]["atlas.enableTLS"]
if "atlas.server.http.port" in services["configurations"]["application-properties"]["properties"]:
metadata_port = services["configurations"]["application-properties"]["properties"]["atlas.server.http.port"]
if tls_enabled.lower() == "true":
scheme = "https"
if "atlas.server.https.port" in services["configurations"]["application-properties"]["properties"]:
metadata_port = services["configurations"]["application-properties"]["properties"]["atlas.server.https.port"]
else:
metadata_port = atlas_server_default_https_port
putHiveSiteProperty("atlas.rest.address", "{0}://{1}:{2}".format(scheme, atlas_rest_host, metadata_port))
else:
putHiveSitePropertyAttribute("atlas.cluster.name", "delete", "true")
putHiveSitePropertyAttribute("atlas.rest.address", "delete", "true")
# For "Hive Server Interactive", if the component exists.
hsi_hosts = self.getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE")
hsi_properties = self.getServicesSiteProperties(services, self.HIVE_INTERACTIVE_SITE)
if len(hsi_hosts) > 0:
putHiveInteractiveEnvProperty("enable_hive_interactive", "true")
# Update "hive.llap.daemon.queue.name" property attributes if capacity scheduler is changed.
if hsi_properties and "hive.llap.daemon.queue.name" in hsi_properties:
self.setLlapDaemonQueuePropAttributes(services, configurations)
hsi_conf_properties = self.getSiteProperties(configurations, self.HIVE_INTERACTIVE_SITE)
hive_tez_default_queue = hsi_properties["hive.llap.daemon.queue.name"]
if hsi_conf_properties and "hive.llap.daemon.queue.name" in hsi_conf_properties:
hive_tez_default_queue = hsi_conf_properties["hive.llap.daemon.queue.name"]
if hive_tez_default_queue:
putHiveInteractiveSiteProperty("hive.server2.tez.default.queues", hive_tez_default_queue)
self.logger.debug("Updated 'hive.server2.tez.default.queues' config : '{0}'".format(hive_tez_default_queue))
else:
self.logger.info("DBG: Setting 'num_llap_nodes' config's READ ONLY attribute as 'True'.")
putHiveInteractiveEnvProperty("enable_hive_interactive", "false")
putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "read_only", "true")
if hsi_properties and "hive.llap.zk.sm.connectionString" in hsi_properties:
zookeeper_host_port = self.getZKHostPortString(services)
if zookeeper_host_port:
putHiveInteractiveSiteProperty("hive.llap.zk.sm.connectionString", zookeeper_host_port)
hive_user = "hive"
if "hive-env" in services["configurations"] and "hive_user" in services["configurations"]["hive-env"]["properties"]:
hive_user = services["configurations"]["hive-env"]["properties"]["hive_user"]
# Ranger user
ranger_hive_plugin_enabled = False
if "hive-env" in configurations and "hive_security_authorization" in configurations["hive-env"]["properties"]:
ranger_hive_plugin_enabled = (configurations["hive-env"]["properties"]["hive_security_authorization"].lower() == "ranger")
elif "hive-env" in services["configurations"] and "hive_security_authorization" in services["configurations"]["hive-env"]["properties"]:
ranger_hive_plugin_enabled = (services["configurations"]["hive-env"]["properties"]["hive_security_authorization"].lower() == "ranger")
if ranger_hive_plugin_enabled and "ranger-hive-plugin-properties" in services["configurations"] and "REPOSITORY_CONFIG_USERNAME" in services["configurations"]["ranger-hive-plugin-properties"]["properties"]:
self.logger.info("Setting Hive Repo user for Ranger.")
putRangerHivePluginProperty("REPOSITORY_CONFIG_USERNAME", hive_user)
else:
self.logger.info("Not setting Hive Repo user for Ranger.")
# Atlas Kerberos settings
if "hive-atlas-application.properties" in services["configurations"]:
security_enabled = HiveServiceAdvisor.isKerberosEnabled(services, configurations)
enable_atlas_hook = False
if "hive-env" in configurations and "hive.atlas.hook" in configurations["hive-env"]["properties"]:
enable_atlas_hook = configurations["hive-env"]["properties"]["hive.atlas.hook"].lower() == "true"
elif "hive-env" in services["configurations"] and "hive.atlas.hook" in services["configurations"]["hive-env"]["properties"]:
enable_atlas_hook = services["configurations"]["hive-env"]["properties"]["hive.atlas.hook"].lower() == "true"
if security_enabled and enable_atlas_hook:
putHiveAtlasHookProperty("atlas.jaas.ticketBased-KafkaClient.loginModuleControlFlag", "required")
putHiveAtlasHookProperty("atlas.jaas.ticketBased-KafkaClient.loginModuleName", "com.sun.security.auth.module.Krb5LoginModule")
putHiveAtlasHookProperty("atlas.jaas.ticketBased-KafkaClient.option.useTicketCache", "true")
else:
putHiveAtlasHookPropertyAttribute("atlas.jaas.ticketBased-KafkaClient.loginModuleControlFlag", "delete", "true")
putHiveAtlasHookPropertyAttribute("atlas.jaas.ticketBased-KafkaClient.loginModuleName", "delete", "true")
putHiveAtlasHookPropertyAttribute("atlas.jaas.ticketBased-KafkaClient.option.useTicketCache", "delete", "true")
#beeline-site
beeline_jdbc_url_default = "llap" if (hive_server_interactive_hosts and not hive_server_hosts) else "container"
putHiveEnvProperty("beeline_jdbc_url_default", beeline_jdbc_url_default)
def druid_host(self, component_name, config_type, services, hosts, default_host=None):
hosts = self.getHostsWithComponent('DRUID', component_name, services, hosts)
if hosts and config_type in services['configurations']:
host = hosts[0]['Hosts']['host_name']
port = services['configurations'][config_type]['properties']['druid.port']
return "%s:%s" % (host, port)
else:
return default_host
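# Usage example (values illustrative): druid_host('DRUID_COORDINATOR', 'druid-coordinator', services, hosts,
# default_host='localhost:8081') returns "<coordinator-host>:<druid.port>" when the component is mapped to a
# host and 'druid-coordinator' configs are present, otherwise the supplied default_host.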
def setLlapDaemonQueuePropAttributes(self, services, configurations):
"""
Checks and sets the 'Hive Server Interactive' 'hive.llap.daemon.queue.name' config Property Attributes. Takes into
account that "capacity-scheduler' may have changed (got updated) in current Stack Advisor invocation.
"""
self.logger.info("Determining 'hive.llap.daemon.queue.name' config Property Attributes.")
#TODO Determine if this is doing the right thing if some queue is setup with capacity=0, or is STOPPED. Maybe don't list it.
putHiveInteractiveSitePropertyAttribute = self.putPropertyAttribute(configurations, self.HIVE_INTERACTIVE_SITE)
capacity_scheduler_properties = dict()
# Read "capacity-scheduler" from configurations if we modified and added recommendation to it, as part of current
# StackAdvisor invocation.
if "capacity-scheduler" in configurations:
cap_sched_props_as_dict = configurations["capacity-scheduler"]["properties"]
if "capacity-scheduler" in cap_sched_props_as_dict:
cap_sched_props_as_str = configurations["capacity-scheduler"]["properties"]["capacity-scheduler"]
if cap_sched_props_as_str:
cap_sched_props_as_str = str(cap_sched_props_as_str).split("\n")
if len(cap_sched_props_as_str) > 0 and cap_sched_props_as_str[0] != "null":
# Got "capacity-scheduler" configs as one "\n" separated string
for property in cap_sched_props_as_str:
key, sep, value = property.partition("=")
capacity_scheduler_properties[key] = value
self.logger.info("'capacity-scheduler' configs is set as a single '\\n' separated string in current invocation. "
"count(configurations['capacity-scheduler']['properties']['capacity-scheduler']) = "
"{0}".format(len(capacity_scheduler_properties)))
else:
self.logger.info("Read configurations['capacity-scheduler']['properties']['capacity-scheduler'] is : {0}".format(cap_sched_props_as_str))
else:
self.logger.info("configurations['capacity-scheduler']['properties']['capacity-scheduler'] : {0}.".format(cap_sched_props_as_str))
# if "capacity_scheduler_properties" is empty, implies we may have "capacity-scheduler" configs as dictionary
# in configurations, if "capacity-scheduler" changed in current invocation.
if not capacity_scheduler_properties:
if isinstance(cap_sched_props_as_dict, dict) and len(cap_sched_props_as_dict) > 1:
capacity_scheduler_properties = cap_sched_props_as_dict
self.logger.info("'capacity-scheduler' changed in current Stack Advisor invocation. Retrieved the configs as dictionary from configurations.")
else:
self.logger.info("Read configurations['capacity-scheduler']['properties'] is : {0}".format(cap_sched_props_as_dict))
else:
self.logger.info("'capacity-scheduler' not modified in the current Stack Advisor invocation.")
# if "capacity_scheduler_properties" is still empty, implies "capacity_scheduler" wasn't change in current
# SA invocation. Thus, read it from input : "services".
if not capacity_scheduler_properties:
capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services)
self.logger.info("'capacity-scheduler' not changed in current Stack Advisor invocation. Retrieved the configs from services.")
# Get set of current YARN leaf queues.
leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties)
if leafQueueNames:
leafQueues = [{"label": str(queueName), "value": queueName} for queueName in leafQueueNames]
leafQueues = sorted(leafQueues, key=lambda q: q["value"])
putHiveInteractiveSitePropertyAttribute("hive.llap.daemon.queue.name", "entries", leafQueues)
self.logger.info("'hive.llap.daemon.queue.name' config Property Attributes set to : {0}".format(leafQueues))
else:
self.logger.error("Problem retrieving YARN queues. Skipping updating HIVE Server Interactve "
"'hive.server2.tez.default.queues' property attributes.")
class HiveValidator(service_advisor.ServiceAdvisor):
"""
Hive Validator checks the correctness of properties whenever the service is first added or the user attempts to
change configs via the UI.
"""
def __init__(self, *args, **kwargs):
self.as_super = super(HiveValidator, self)
self.as_super.__init__(*args, **kwargs)
self.HIVE_INTERACTIVE_SITE = "hive-interactive-site"
self.AMBARI_MANAGED_LLAP_QUEUE_NAME = "llap"
self.validators = [("hive-site", self.validateHiveConfigurationsFromHDP30),
("hive-env", self.validateHiveConfigurationsEnvFromHDP30),
("hiveserver2-site", self.validateHiveServer2ConfigurationsFromHDP30),
("hive-interactive-env", self.validateHiveInteractiveEnvConfigurationsFromHDP30),
("hive-interactive-site", self.validateHiveInteractiveSiteConfigurationsFromHDP30)]
def validateHiveConfigurationsFromHDP30(self, properties, recommendedDefaults, configurations, services, hosts):
validationItems = [ {"config-name": "hive.tez.container.size", "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, "hive.tez.container.size")},
{"config-name": "hive.auto.convert.join.noconditionaltask.size", "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, "hive.auto.convert.join.noconditionaltask.size")} ]
hive_site = properties
hive_env = self.getSiteProperties(configurations, "hive-env")
yarn_site = self.getSiteProperties(configurations, "yarn-site")
if yarn_site:
yarnSchedulerMaximumAllocationMb = self.to_number(yarn_site["yarn.scheduler.maximum-allocation-mb"])
hiveTezContainerSize = self.to_number(properties["hive.tez.container.size"])
if hiveTezContainerSize is not None and yarnSchedulerMaximumAllocationMb is not None and hiveTezContainerSize > yarnSchedulerMaximumAllocationMb:
validationItems.append({"config-name": "hive.tez.container.size", "item": self.getWarnItem("hive.tez.container.size is greater than the maximum container size specified in yarn.scheduler.maximum-allocation-mb")})
authentication_property = "hive.server2.authentication"
ldap_baseDN_property = "hive.server2.authentication.ldap.baseDN"
ldap_domain_property = "hive.server2.authentication.ldap.Domain"
if authentication_property in properties and properties[authentication_property].lower() == "ldap" \
and not (ldap_baseDN_property in properties or ldap_domain_property in properties):
validationItems.append({"config-name" : authentication_property, "item" :
self.getWarnItem("According to LDAP value for " + authentication_property + ", you should add " +
ldap_domain_property + " property, if you are using AD, if not, then " + ldap_baseDN_property + "!")})
hive_enforce_bucketing = "hive.enforce.bucketing"
if hive_enforce_bucketing in properties and properties[hive_enforce_bucketing].lower() == "false":
validationItems.append({"config-name" : hive_enforce_bucketing, "item" :
self.getWarnItem("Set " + hive_enforce_bucketing + " to true otherwise there is a potential of data corruption!")})
sqla_db_used = "hive_database" in hive_env and \
hive_env["hive_database"] == "Existing SQL Anywhere Database"
prop_name = "datanucleus.rdbms.datastoreAdapterClassName"
prop_value = "org.datanucleus.store.rdbms.adapter.SQLAnywhereAdapter"
if sqla_db_used:
if not prop_name in hive_site:
validationItems.append({"config-name": prop_name,
"item": self.getWarnItem(
"If Hive using SQL Anywhere db." \
" {0} needs to be added with value {1}".format(prop_name,prop_value))})
elif prop_name in hive_site and hive_site[prop_name] != "org.datanucleus.store.rdbms.adapter.SQLAnywhereAdapter":
validationItems.append({"config-name": prop_name,
"item": self.getWarnItem(
"If Hive using SQL Anywhere db." \
" {0} needs to be set to {1}".format(prop_name,prop_value))})
return self.toConfigurationValidationProblems(validationItems, "hive-site")
def validateHiveConfigurationsEnvFromHDP30(self, properties, recommendedDefaults, configurations, services, hosts):
validationItems = []
hive_env = properties
hive_site = self.getSiteProperties(configurations, "hive-site")
hiveserver2_site = self.getSiteProperties(configurations, "hiveserver2-site")
hive_server_hosts = self.getHostsWithComponent("HIVE", "HIVE_SERVER", services, hosts)
hive_server_interactive_hosts = self.getHostsWithComponent("HIVE", "HIVE_SERVER_INTERACTIVE", services, hosts)
hive_client_hosts = self.getHostsWithComponent("HIVE", "HIVE_CLIENT", services, hosts)
servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
if "hive_security_authorization" in hive_env and \
str(hive_env["hive_security_authorization"]).lower() == "none" \
and str(hiveserver2_site["hive.security.authorization.enabled"]).lower() == "true":
authorization_item = self.getErrorItem("hive_security_authorization should not be None "
"if hive.security.authorization.enabled is set")
validationItems.append({"config-name": "hive_security_authorization", "item": authorization_item})
if "hive_security_authorization" in hive_env and \
str(hive_env["hive_security_authorization"]).lower() == "ranger":
# ranger-hive-plugin must be enabled in ranger-env
if "RANGER" in servicesList:
ranger_env = self.getServicesSiteProperties(services, "ranger-env")
if not ranger_env or not "ranger-hive-plugin-enabled" in ranger_env or \
ranger_env["ranger-hive-plugin-enabled"].lower() != "yes":
validationItems.append({"config-name": "hive_security_authorization",
"item": self.getWarnItem(
"ranger-env/ranger-hive-plugin-enabled must be enabled when hive_security_authorization is set to Ranger")})
if "hive.server2.authentication" in hive_site and "LDAP" == hive_site["hive.server2.authentication"]:
if "alert_ldap_username" not in hive_env or hive_env["alert_ldap_username"] == "":
validationItems.append({"config-name": "alert_ldap_username",
"item": self.getWarnItem(
"Provide an user to be used for alerts. Hive authentication type LDAP requires valid LDAP credentials for the alerts.")})
if "alert_ldap_password" not in hive_env or hive_env["alert_ldap_password"] == "":
validationItems.append({"config-name": "alert_ldap_password",
"item": self.getWarnItem(
"Provide the password for the alert user. Hive authentication type LDAP requires valid LDAP credentials for the alerts.")})
beeline_jdbc_url_default = hive_env["beeline_jdbc_url_default"]
if beeline_jdbc_url_default not in ["container", "llap"]:
validationItems.append({"config-name": "beeline_jdbc_url_default",
"item": self.getWarnItem(
"beeline_jdbc_url_default should be \"container\" or \"llap\".")})
if beeline_jdbc_url_default == "container" and not hive_server_hosts and hive_server_interactive_hosts:
validationItems.append({"config-name": "beeline_jdbc_url_default",
"item": self.getWarnItem(
"beeline_jdbc_url_default may not be \"container\" if only HSI is installed.")})
if beeline_jdbc_url_default == "llap" and not hive_server_interactive_hosts:
validationItems.append({"config-name": "beeline_jdbc_url_default",
"item": self.getWarnItem(
"beeline_jdbc_url_default may not be \"llap\" if no HSI is installed.")})
return self.toConfigurationValidationProblems(validationItems, "hive-env")
def validateHiveServer2ConfigurationsFromHDP30(self, properties, recommendedDefaults, configurations, services, hosts):
hive_server2 = properties
validationItems = []
#Adding Ranger Plugin logic here
ranger_plugin_properties = self.getSiteProperties(configurations, "ranger-hive-plugin-properties")
hive_env_properties = self.getSiteProperties(configurations, "hive-env")
ranger_plugin_enabled = "hive_security_authorization" in hive_env_properties and hive_env_properties["hive_security_authorization"].lower() == "ranger"
servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
##Add stack validations only if Ranger is enabled.
if ("RANGER" in servicesList):
##Add stack validations for Ranger plugin enabled.
if ranger_plugin_enabled:
prop_name = "hive.security.authorization.manager"
prop_val = "org.apache.ranger.authorization.hive.authorizer.RangerHiveAuthorizerFactory"
if prop_name in hive_server2 and hive_server2[prop_name] != prop_val:
validationItems.append({"config-name": prop_name,
"item": self.getWarnItem(
"If Ranger Hive Plugin is enabled."\
" {0} under hiveserver2-site needs to be set to {1}".format(prop_name,prop_val))})
prop_name = "hive.security.authenticator.manager"
prop_val = "org.apache.hadoop.hive.ql.security.SessionStateUserAuthenticator"
if prop_name in hive_server2 and hive_server2[prop_name] != prop_val:
validationItems.append({"config-name": prop_name,
"item": self.getWarnItem(
"If Ranger Hive Plugin is enabled."\
" {0} under hiveserver2-site needs to be set to {1}".format(prop_name,prop_val))})
prop_name = "hive.security.authorization.enabled"
prop_val = "true"
if prop_name in hive_server2 and hive_server2[prop_name] != prop_val:
validationItems.append({"config-name": prop_name,
"item": self.getWarnItem(
"If Ranger Hive Plugin is enabled."\
" {0} under hiveserver2-site needs to be set to {1}".format(prop_name, prop_val))})
prop_name = "hive.conf.restricted.list"
prop_vals = "hive.security.authorization.enabled,hive.security.authorization.manager,hive.security.authenticator.manager".split(",")
current_vals = []
missing_vals = []
if hive_server2 and prop_name in hive_server2:
current_vals = hive_server2[prop_name].split(",")
current_vals = [x.strip() for x in current_vals]
for val in prop_vals:
if not val in current_vals:
missing_vals.append(val)
if missing_vals:
validationItems.append({"config-name": prop_name,
"item": self.getWarnItem("If Ranger Hive Plugin is enabled."\
" {0} under hiveserver2-site needs to contain missing value {1}".format(prop_name, ",".join(missing_vals)))})
##Add stack validations for Ranger plugin disabled.
elif not ranger_plugin_enabled:
prop_name = "hive.security.authorization.manager"
prop_val = "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"
if prop_name in hive_server2 and hive_server2[prop_name] != prop_val:
validationItems.append({"config-name": prop_name,
"item": self.getWarnItem(
"If Ranger Hive Plugin is disabled."\
" {0} needs to be set to {1}".format(prop_name,prop_val))})
prop_name = "hive.security.authenticator.manager"
prop_val = "org.apache.hadoop.hive.ql.security.SessionStateUserAuthenticator"
if prop_name in hive_server2 and hive_server2[prop_name] != prop_val:
validationItems.append({"config-name": prop_name,
"item": self.getWarnItem(
"If Ranger Hive Plugin is disabled."\
" {0} needs to be set to {1}".format(prop_name,prop_val))})
validationProblems = self.toConfigurationValidationProblems(validationItems, "hiveserver2-site")
return validationProblems
def validateHiveInteractiveEnvConfigurationsFromHDP30(self, properties, recommendedDefaults, configurations, services, hosts):
hive_site_env_properties = self.getSiteProperties(configurations, "hive-interactive-env")
yarn_site_properties = self.getSiteProperties(configurations, "yarn-site")
validationItems = []
hsi_hosts = self.getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE")
# Expect "enable_hive_interactive" to be ON when HIVE_SERVER_INTERACTIVE is present on at least one host.
if len(hsi_hosts) > 0:
# HIVE_SERVER_INTERACTIVE is mapped to a host
if "enable_hive_interactive" not in hive_site_env_properties or (
"enable_hive_interactive" in hive_site_env_properties and
hive_site_env_properties["enable_hive_interactive"].lower() != "true"):
validationItems.append({"config-name": "enable_hive_interactive",
"item": self.getErrorItem(
"HIVE_SERVER_INTERACTIVE requires enable_hive_interactive in hive-interactive-env set to true.")})
else:
# no HIVE_SERVER_INTERACTIVE
if "enable_hive_interactive" in hive_site_env_properties and hive_site_env_properties[
"enable_hive_interactive"].lower() != "false":
validationItems.append({"config-name": "enable_hive_interactive",
"item": self.getErrorItem(
"enable_hive_interactive in hive-interactive-env should be set to false.")})
# Check for "yarn.resourcemanager.scheduler.monitor.enable" config to be true if HSI is ON.
if yarn_site_properties and "yarn.resourcemanager.scheduler.monitor.enable" in yarn_site_properties:
scheduler_monitor_enabled = yarn_site_properties["yarn.resourcemanager.scheduler.monitor.enable"]
if scheduler_monitor_enabled.lower() == "false" and hive_site_env_properties and "enable_hive_interactive" in hive_site_env_properties and \
hive_site_env_properties["enable_hive_interactive"].lower() == "true":
validationItems.append({"config-name": "enable_hive_interactive",
"item": self.getWarnItem(
"When enabling LLAP, set 'yarn.resourcemanager.scheduler.monitor.enable' to true to ensure that LLAP gets the full allocated capacity.")})
validationProblems = self.toConfigurationValidationProblems(validationItems, "hive-interactive-env")
return validationProblems
def validateHiveInteractiveSiteConfigurationsFromHDP30(self, properties, recommendedDefaults, configurations, services, hosts):
"""
Does the following validation checks for HIVE_SERVER_INTERACTIVE's hive-interactive-site configs.
1. Queue selected in 'hive.llap.daemon.queue.name' config should be sized >= to minimum required to run LLAP
and Hive2 app.
2. Queue selected in 'hive.llap.daemon.queue.name' config state should not be 'STOPPED'.
3. 'hive.server2.enable.doAs' config should be set to 'false' for Hive2.
4. 'Maximum Total Concurrent Queries' (hive.server2.tez.sessions.per.default.queue) should not consume more than 50% of the selected queue for LLAP.
5. If the 'llap' queue is selected, the 'remaining available capacity' in the cluster must be at least 512 MB in order to run Service Checks.
"""
validationItems = []
hsi_hosts = self.getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE")
llap_queue_name = None
llap_queue_cap_perc = None
MIN_ASSUMED_CAP_REQUIRED_FOR_SERVICE_CHECKS = 512
llap_queue_cap = None
hsi_site = self.getServicesSiteProperties(services, self.HIVE_INTERACTIVE_SITE)
if len(hsi_hosts) == 0:
return []
# Get total cluster capacity
node_manager_host_list = self.getHostsForComponent(services, "YARN", "NODEMANAGER")
node_manager_cnt = len(node_manager_host_list)
yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations)
total_cluster_cap = node_manager_cnt * yarn_nm_mem_in_mb
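# Example: 10 NodeManagers with yarn.nodemanager.resource.memory-mb = 16384 give
# total_cluster_cap = 10 * 16384 = 163840 MB.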
capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services)
if not capacity_scheduler_properties:
self.logger.warning("Couldn't retrieve 'capacity-scheduler' properties while doing validation checks for Hive Server Interactive.")
return []
if hsi_site:
if "hive.llap.daemon.queue.name" in hsi_site and hsi_site["hive.llap.daemon.queue.name"]:
llap_queue_name = hsi_site["hive.llap.daemon.queue.name"]
llap_queue_cap = self.__getSelectedQueueTotalCap(capacity_scheduler_properties, llap_queue_name, total_cluster_cap)
if llap_queue_cap:
llap_queue_cap_perc = float(llap_queue_cap * 100 / total_cluster_cap)
min_reqd_queue_cap_perc = self.min_queue_perc_reqd_for_llap_and_hive_app(services, hosts, configurations)
# Validate that the selected queue in "hive.llap.daemon.queue.name" should be sized >= to minimum required
# to run LLAP and Hive2 app.
if llap_queue_cap_perc < min_reqd_queue_cap_perc:
errMsg1 = "Selected queue '{0}' capacity ({1}%) is less than minimum required capacity ({2}%) for LLAP " \
"app to run".format(llap_queue_name, llap_queue_cap_perc, min_reqd_queue_cap_perc)
validationItems.append({"config-name": "hive.llap.daemon.queue.name", "item": self.getErrorItem(errMsg1)})
else:
self.logger.error("Couldn't retrieve '{0}' queue's capacity from 'capacity-scheduler' while doing validation checks for "
"Hive Server Interactive.".format(llap_queue_name))
# Validate that current selected queue in 'hive.llap.daemon.queue.name' state is not STOPPED.
llap_selected_queue_state = self.__getQueueStateFromCapacityScheduler(capacity_scheduler_properties, llap_queue_name)
if llap_selected_queue_state:
if llap_selected_queue_state == "STOPPED":
errMsg2 = "Selected queue '{0}' current state is : '{1}'. It is required to be in 'RUNNING' state for LLAP to run"\
.format(llap_queue_name, llap_selected_queue_state)
validationItems.append({"config-name": "hive.llap.daemon.queue.name","item": self.getErrorItem(errMsg2)})
else:
self.logger.error("Couldn't retrieve '{0}' queue's state from 'capacity-scheduler' while doing validation checks for "
"Hive Server Interactive.".format(llap_queue_name))
else:
self.logger.error("Couldn't retrieve 'hive.llap.daemon.queue.name' config from 'hive-interactive-site' while doing "
"validation checks for Hive Server Interactive.")
# Validate that "hive.server2.enable.doAs" config is not set to "true" for Hive2.
if "hive.server2.enable.doAs" in hsi_site and hsi_site["hive.server2.enable.doAs"] == "true":
validationItems.append({"config-name": "hive.server2.enable.doAs", "item": self.getErrorItem("Value should be set to 'false' for Hive2.")})
# Validate that "Maximum Total Concurrent Queries" (hive.server2.tez.sessions.per.default.queue) is not consuming more that
# 50% of selected queue for LLAP.
if llap_queue_cap and "hive.server2.tez.sessions.per.default.queue" in hsi_site:
num_tez_sessions = hsi_site["hive.server2.tez.sessions.per.default.queue"]
if num_tez_sessions:
num_tez_sessions = long(num_tez_sessions)
yarn_min_container_size = long(self.get_yarn_min_container_size(services, configurations))
tez_am_container_size = self.calculate_tez_am_container_size(services, long(total_cluster_cap))
normalized_tez_am_container_size = self._normalizeUp(tez_am_container_size, yarn_min_container_size)
llap_selected_queue_cap_remaining = llap_queue_cap - (normalized_tez_am_container_size * num_tez_sessions)
if llap_selected_queue_cap_remaining <= llap_queue_cap/2:
errMsg3 = " Reducing the 'Maximum Total Concurrent Queries' (value: {0}) is advisable as it is consuming more than 50% of " \
"'{1}' queue for LLAP.".format(num_tez_sessions, llap_queue_name)
validationItems.append({"config-name": "hive.server2.tez.sessions.per.default.queue","item": self.getWarnItem(errMsg3)})
if int(hsi_site["hive.llap.io.memory.size"]) > int(hsi_site["hive.llap.daemon.yarn.container.mb"]):
errorMessage = "In-Memory Cache per Daemon (value: {0}) may not be more then Memory per Daemon (value: {1})".format(hsi_site["hive.llap.io.memory.size"], hsi_site["hive.llap.daemon.yarn.container.mb"])
validationItems.append({"config-name": "hive.llap.io.memory.size","item": self.getErrorItem(errorMessage)})
# Validate that "remaining available capacity" in cluster is at least 512 MB, after "llap" queue is selected,
# in order to run Service Checks.
if llap_queue_name and llap_queue_cap_perc and llap_queue_name == self.AMBARI_MANAGED_LLAP_QUEUE_NAME:
curr_selected_queue_for_llap_cap = float(llap_queue_cap_perc) / 100 * total_cluster_cap
available_cap_in_cluster = total_cluster_cap - curr_selected_queue_for_llap_cap
if available_cap_in_cluster < MIN_ASSUMED_CAP_REQUIRED_FOR_SERVICE_CHECKS:
errMsg4 = "Capacity used by '{0}' queue is '{1}'. Service checks may not run as remaining available capacity " \
"({2}) in cluster is less than 512 MB.".format(self.AMBARI_MANAGED_LLAP_QUEUE_NAME, curr_selected_queue_for_llap_cap, available_cap_in_cluster)
validationItems.append({"config-name": "hive.llap.daemon.queue.name","item": self.getWarnItem(errMsg4)})
validationProblems = self.toConfigurationValidationProblems(validationItems, "hive-interactive-site")
return validationProblems
def get_yarn_nm_mem_in_mb(self, services, configurations):
"""
Gets YARN NodeManager memory in MB (yarn.nodemanager.resource.memory-mb).
Reads from:
- configurations (the output of the current Stack Advisor invocation), if services["changed-configurations"] is empty, else
- services['configurations'] (the input).
services["changed-configurations"] is empty if the Stack Advisor call is made from Blueprints (1st invocation). Subsequent
Stack Advisor calls will have it non-empty. We do this because in subsequent invocations, even if the Stack Advisor calculates
this value (configurations), it is ultimately not recommended, so the 'input' value survives.
"""
yarn_nm_mem_in_mb = None
yarn_site = self.getServicesSiteProperties(services, "yarn-site")
yarn_site_properties = self.getSiteProperties(configurations, "yarn-site")
# Check if services["changed-configurations"] is empty and "yarn.nodemanager.resource.memory-mb" is modified in current ST invocation.
if not ("changed-configurations" in services and services["changed-configurations"]) and yarn_site_properties and "yarn.nodemanager.resource.memory-mb" in yarn_site_properties:
yarn_nm_mem_in_mb = float(yarn_site_properties["yarn.nodemanager.resource.memory-mb"])
elif yarn_site and "yarn.nodemanager.resource.memory-mb" in yarn_site:
# Check if "yarn.nodemanager.resource.memory-mb" is input in services array.
yarn_nm_mem_in_mb = float(yarn_site["yarn.nodemanager.resource.memory-mb"])
if yarn_nm_mem_in_mb <= 0.0:
self.logger.warning("'yarn.nodemanager.resource.memory-mb' current value : {0}. Expected value : > 0".format(yarn_nm_mem_in_mb))
return yarn_nm_mem_in_mb
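# Illustrative example (not executed): on the first (Blueprint) call, changed-configurations is empty, so a value of,
# say, 12288 recommended into configurations['yarn-site'] in this same invocation is returned as 12288.0; on later
# calls, changed-configurations is non-empty and the value from services['configurations'] (the input) is used instead.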
def __getQueueCapacityKeyFromCapacityScheduler(self, capacity_scheduler_properties, llap_daemon_selected_queue_name):
"""
Retrieves the passed in queue's 'capacity' related key from Capacity Scheduler.
"""
# Identify the key which contains the capacity for "llap_daemon_selected_queue_name".
cap_sched_keys = capacity_scheduler_properties.keys()
llap_selected_queue_cap_key = None
for key in cap_sched_keys:
# Expected capacity prop key is of the form : "yarn.scheduler.capacity.<one or more queues in path, separated by '.'>.<llap_daemon_selected_queue_name>.capacity"
if key.endswith(llap_daemon_selected_queue_name+".capacity") and key.startswith("yarn.scheduler.capacity.root"):
self.logger.info("DBG: Selected queue capacity key as: " + key)
llap_selected_queue_cap_key = key
break
return llap_selected_queue_cap_key
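# Illustrative example (not executed): for llap_daemon_selected_queue_name = "llap", a key such as
# "yarn.scheduler.capacity.root.llap.capacity" matches and is returned, while
# "yarn.scheduler.capacity.root.llap.maximum-capacity" does not (it does not end with "llap.capacity").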
def __getQueueStateFromCapacityScheduler(self, capacity_scheduler_properties, llap_daemon_selected_queue_name):
"""
Retrieves the passed in queue's 'state' from Capacity Scheduler.
"""
# Identify the key which contains the state for "llap_daemon_selected_queue_name".
cap_sched_keys = capacity_scheduler_properties.keys()
llap_selected_queue_state_key = None
llap_selected_queue_state = None
for key in cap_sched_keys:
if key.endswith(llap_daemon_selected_queue_name+".state"):
llap_selected_queue_state_key = key
break
llap_selected_queue_state = capacity_scheduler_properties.get(llap_selected_queue_state_key)
return llap_selected_queue_state
def __getQueueAmFractionFromCapacityScheduler(self, capacity_scheduler_properties, llap_daemon_selected_queue_name):
"""
Retrieves the passed in queue's 'AM fraction' from Capacity Scheduler. Returns default value of 0.1 if AM Percent
pertaining to passed-in queue is not present.
"""
# Identify the key which contains the AM fraction for "llap_daemon_selected_queue_name".
cap_sched_keys = capacity_scheduler_properties.keys()
llap_selected_queue_am_percent_key = None
for key in cap_sched_keys:
if key.endswith("."+llap_daemon_selected_queue_name+".maximum-am-resource-percent"):
llap_selected_queue_am_percent_key = key
self.logger.info("AM percent key got for '{0}' queue is : '{1}'".format(llap_daemon_selected_queue_name, llap_selected_queue_am_percent_key))
break
if llap_selected_queue_am_percent_key is None:
self.logger.info("Returning default AM percent value : '0.1' for queue : {0}".format(llap_daemon_selected_queue_name))
return 0.1 # Default value to use if we couldn't retrieve queue's corresponding AM Percent key.
else:
llap_selected_queue_am_percent = capacity_scheduler_properties.get(llap_selected_queue_am_percent_key)
self.logger.info("Returning read value for key '{0}' as : '{1}' for queue : '{2}'".format(llap_selected_queue_am_percent_key,
llap_selected_queue_am_percent,
llap_daemon_selected_queue_name))
return llap_selected_queue_am_percent
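# Illustrative example (not executed): if capacity-scheduler contains
# "yarn.scheduler.capacity.root.llap.maximum-am-resource-percent" = "0.2", the raw string "0.2" is returned (callers
# are expected to cast it); if no such key exists for the queue, the float default 0.1 is returned instead.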
def __getSelectedQueueTotalCap(self, capacity_scheduler_properties, llap_daemon_selected_queue_name, total_cluster_capacity):
"""
Calculates the total available capacity for the passed-in YARN queue of any level based on the percentages.
"""
self.logger.info("Entered __getSelectedQueueTotalCap fn() with llap_daemon_selected_queue_name= '{0}'.".format(llap_daemon_selected_queue_name))
available_capacity = total_cluster_capacity
queue_cap_key = self.__getQueueCapacityKeyFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name)
if queue_cap_key:
queue_cap_key = queue_cap_key.strip()
if len(queue_cap_key) >= 34: # len("yarn.scheduler.capacity.<single letter queue name>.capacity") = 34
# Expected capacity prop key is of form : "yarn.scheduler.capacity.<one or more queues (path)>.capacity"
queue_path = queue_cap_key[24:] # Strip from beginning "yarn.scheduler.capacity."
queue_path = queue_path[0:-9] # Strip from end ".capacity"
queues_list = queue_path.split(".")
self.logger.info("Queue list : {0}".format(queues_list))
if queues_list:
for queue in queues_list:
queue_cap_key = self.__getQueueCapacityKeyFromCapacityScheduler(capacity_scheduler_properties, queue)
queue_cap_perc = float(capacity_scheduler_properties.get(queue_cap_key))
available_capacity = queue_cap_perc / 100 * available_capacity
self.logger.info("Total capacity available for queue {0} is : {1}".format(queue, available_capacity))
# Returns the capacity calculated for the passed-in queue ('llap_daemon_selected_queue_name').
return available_capacity
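# Illustrative example (not executed), assuming total_cluster_capacity = 10000 MB and capacity-scheduler entries
# yarn.scheduler.capacity.root.capacity = 100 and yarn.scheduler.capacity.root.llap.capacity = 40: the matched key is
# "yarn.scheduler.capacity.root.llap.capacity", the queue path walked is ["root", "llap"], and the returned
# capacity is 10000 * 1.0 * 0.4 = 4000.0 MB.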
def min_queue_perc_reqd_for_llap_and_hive_app(self, services, hosts, configurations):
"""
Calculate minimum queue capacity required in order to get LLAP and HIVE2 app into running state.
"""
# Get queue size if sized at 20%
node_manager_hosts = self.getHostsForComponent(services, "YARN", "NODEMANAGER")
yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations)
total_cluster_cap = len(node_manager_hosts) * yarn_nm_mem_in_mb
total_queue_size_at_20_perc = 20.0 / 100 * total_cluster_cap
# Calculate based on minimum size required by containers.
yarn_min_container_size = long(self.get_yarn_min_container_size(services, configurations))
hive_tez_container_size = long(self.get_hive_tez_container_size(services))
tez_am_container_size = self.calculate_tez_am_container_size(services, long(total_cluster_cap))
normalized_val = self._normalizeUp(hive_tez_container_size, yarn_min_container_size) \
+ self._normalizeUp(tez_am_container_size, yarn_min_container_size)
min_required = max(total_queue_size_at_20_perc, normalized_val)
min_required_perc = min_required * 100 / total_cluster_cap
return int(math.ceil(min_required_perc))
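# Illustrative example (not executed), assuming 3 NodeManagers with 10240 MB each (total_cluster_cap = 30720 MB),
# yarn.scheduler.minimum-allocation-mb = 1024, hive.tez.container.size = 3072 and a Tez AM container of 1024 MB:
# total_queue_size_at_20_perc = 6144, normalized_val = 3072 + 1024 = 4096, min_required = max(6144, 4096) = 6144,
# so the function returns ceil(6144 * 100 / 30720) = 20 (percent).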
#TODO Convert this to a helper. It can apply to any property. Check config, or check if in the list of changed configurations and read the latest value
def get_yarn_min_container_size(self, services, configurations):
"""
Gets YARN's minimum container size (yarn.scheduler.minimum-allocation-mb).
Reads from:
- configurations (if changed as part of current Stack Advisor invocation (output)), and services["changed-configurations"]
is empty, else
- services['configurations'] (input).
services["changed-configurations"] would be empty if Stack Advisor call is made from Blueprints (1st invocation). Subsequent
Stack Advisor calls will have it non-empty. We do this because in subsequent invocations, even if Stack Advisor calculates this
value (configurations), it is finally not recommended, making 'input' value to survive.
:type services dict
:type configurations dict
:rtype str
"""
yarn_min_container_size = None
yarn_min_allocation_property = "yarn.scheduler.minimum-allocation-mb"
yarn_site = self.getSiteProperties(configurations, "yarn-site")
yarn_site_properties = self.getServicesSiteProperties(services, "yarn-site")
# Check if services["changed-configurations"] is empty and "yarn.scheduler.minimum-allocation-mb" is modified in current ST invocation.
if not services["changed-configurations"] and yarn_site and yarn_min_allocation_property in yarn_site:
yarn_min_container_size = yarn_site[yarn_min_allocation_property]
self.logger.info("DBG: 'yarn.scheduler.minimum-allocation-mb' read from output as : {0}".format(yarn_min_container_size))
# Check if "yarn.scheduler.minimum-allocation-mb" is input in services array.
elif yarn_site_properties and yarn_min_allocation_property in yarn_site_properties:
yarn_min_container_size = yarn_site_properties[yarn_min_allocation_property]
self.logger.info("DBG: 'yarn.scheduler.minimum-allocation-mb' read from services as : {0}".format(yarn_min_container_size))
if not yarn_min_container_size:
self.logger.error("{0} was not found in the configuration".format(yarn_min_allocation_property))
return yarn_min_container_size
def get_hive_tez_container_size(self, services):
"""
Gets HIVE Tez container size (hive.tez.container.size).
"""
hive_container_size = None
hsi_site = self.getServicesSiteProperties(services, self.HIVE_INTERACTIVE_SITE)
if hsi_site and "hive.tez.container.size" in hsi_site:
hive_container_size = hsi_site["hive.tez.container.size"]
if not hive_container_size:
# This can happen (1) if the config is missing in hive-interactive-site, or (2) in an
# upgrade scenario from Ambari 2.4 to Ambari 2.5 with HDP 2.5 installed. In that case, read it
# from hive-site.
#
# If Ambari 2.5 (after upgrade from 2.4) is managing HDP 2.6 here, this config would have
# already been added to hive-interactive-site as part of the HDP upgrade from 2.5 to 2.6,
# and we won't end up in this block to look it up in hive-site.
hive_site = self.getServicesSiteProperties(services, "hive-site")
if hive_site and "hive.tez.container.size" in hive_site:
hive_container_size = hive_site["hive.tez.container.size"]
return hive_container_size
def calculate_tez_am_container_size(self, services, total_cluster_capacity, is_cluster_create_opr=False, enable_hive_interactive_1st_invocation=False):
"""
Calculates the Tez App Master container size (tez.am.resource.memory.mb) for tez_hive2/tez-site when the cluster is
being created or Hive Server Interactive is being enabled for the first time; otherwise returns the value read from configuration.
"""
tez_am_resource_memory_mb = self.get_tez_am_resource_memory_mb(services)
calculated_tez_am_resource_memory_mb = None
if is_cluster_create_opr or enable_hive_interactive_1st_invocation:
if total_cluster_capacity <= 4096:
calculated_tez_am_resource_memory_mb = 512
elif total_cluster_capacity > 4096 and total_cluster_capacity <= 98304:
calculated_tez_am_resource_memory_mb = 1024
elif total_cluster_capacity > 98304:
calculated_tez_am_resource_memory_mb = 4096
self.logger.info("DBG: Calculated and returning 'tez_am_resource_memory_mb' as : {0}".format(calculated_tez_am_resource_memory_mb))
return float(calculated_tez_am_resource_memory_mb)
else:
self.logger.info("DBG: Returning 'tez_am_resource_memory_mb' as : {0}".format(tez_am_resource_memory_mb))
return float(tez_am_resource_memory_mb)
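# Illustrative example (not executed): with is_cluster_create_opr=True and total_cluster_capacity=8192, the method
# returns 1024.0; with both flags left at their defaults, it returns float(tez.am.resource.memory.mb) as read from
# tez-interactive-site.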
def get_tez_am_resource_memory_mb(self, services):
"""
Gets Tez's AM resource memory (tez.am.resource.memory.mb) from services.
"""
tez_am_resource_memory_mb = None
if "tez.am.resource.memory.mb" in services["configurations"]["tez-interactive-site"]["properties"]:
tez_am_resource_memory_mb = services["configurations"]["tez-interactive-site"]["properties"]["tez.am.resource.memory.mb"]
return tez_am_resource_memory_mb
def _normalizeUp(self, val1, val2):
"""
Normalizes 'val1' up to the nearest multiple of 'val2'.
"""
# Force float division so that integer inputs (e.g., long MB values) are rounded up rather than truncated.
tmp = math.ceil(float(val1) / val2)
return tmp * val2
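# Illustrative example (not executed): _normalizeUp(1500, 1024) rounds 1500 up to the next multiple of 1024 and
# returns 2048.0.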