blob: a2e31cc9fe3fb9bd3c9ceff0c5f2be6ed19e7e93 [file] [log] [blame]
#!/usr/bin/env ambari-python-wrap
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
# Python imports
import imp
import os
import traceback
import re
import socket
import fnmatch
from ambari_commons.str_utils import string_set_equals
from resource_management.core.logger import Logger
# Locations are resolved relative to this script so the parent advisor module
# can be found no matter what the current working directory is.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
STACKS_DIR = os.path.join(SCRIPT_DIR, '../../../stacks/')
PARENT_FILE = os.path.join(STACKS_DIR, 'service_advisor.py')

# Load the parent module directly from its file path. A failure is reported
# (traceback plus a short message) but deliberately not re-raised here.
try:
    with open(PARENT_FILE, 'rb') as fp:
        service_advisor = imp.load_module(
            'service_advisor',
            fp,
            PARENT_FILE,
            ('.py', 'rb', imp.PY_SOURCE),
        )
except Exception:
    traceback.print_exc()
    print("Failed to load parent")
class AtlasServiceAdvisor(service_advisor.ServiceAdvisor):
    """
    Service advisor for ATLAS.

    The layout-related hooks below are intentionally empty; the two
    configuration entry points delegate to AtlasRecommender and AtlasValidator.
    """

    def __init__(self, *args, **kwargs):
        # Keep the super() proxy on the instance and initialise the parent through it.
        self.as_super = super(AtlasServiceAdvisor, self)
        self.as_super.__init__(*args, **kwargs)

        # Always run every customisation hook, in this fixed order.
        hooks = (
            self.modifyMastersWithMultipleInstances,
            self.modifyCardinalitiesDict,
            self.modifyHeapSizeProperties,
            self.modifyNotValuableComponents,
            self.modifyComponentsNotPreferableOnServer,
            self.modifyComponentLayoutSchemes,
        )
        for hook in hooks:
            hook()

    def modifyMastersWithMultipleInstances(self):
        """
        Hook for modifying the set of masters with multiple instances.
        Nothing to change here, so this is a no-op.
        """

    def modifyCardinalitiesDict(self):
        """
        Hook for modifying the dictionary of cardinalities.
        Nothing to change here, so this is a no-op.
        """

    def modifyHeapSizeProperties(self):
        """
        Hook for modifying the dictionary of heap size properties.
        Nothing to change here, so this is a no-op.
        """

    def modifyNotValuableComponents(self):
        """
        Hook for modifying the set of components whose host assignment is
        based on other services.
        Nothing to change here, so this is a no-op.
        """

    def modifyComponentsNotPreferableOnServer(self):
        """
        Hook for modifying the set of components that are not preferable on
        the server.
        Nothing to change here, so this is a no-op.
        """

    def modifyComponentLayoutSchemes(self):
        """
        Hook for modifying the layout scheme dictionaries for components.
        A scheme dictionary maps the number of hosts to the host index where
        the component should exist.
        Nothing to change here, so this is a no-op.
        """

    def getServiceComponentLayoutValidations(self, services, hosts):
        """
        Return the list of component layout errors.
        No layout validation is performed, so the list is always empty.
        """
        return []

    def getServiceConfigurationRecommendations(self, configurations, clusterData, services, hosts):
        """
        Entry point for configuration recommendations.

        Runs AtlasRecommender.recommendAtlasConfigurationsFromHDP25 followed by
        AtlasRecommender.recommendAtlasConfigurationsFromHDP26 with the given
        arguments. Returns nothing.
        """
        recommender = AtlasRecommender()
        for recommend in (recommender.recommendAtlasConfigurationsFromHDP25,
                          recommender.recommendAtlasConfigurationsFromHDP26):
            recommend(configurations, clusterData, services, hosts)

    def getServiceConfigurationsValidationItems(self, configurations, recommendedDefaults, services, hosts):
        """
        Entry point for configuration validation.

        Hands the validator's own (site name, method) list to
        validateListOfConfigUsingMethod and returns whatever it produces.
        The validator methods are invoked as
        method(siteProperties, siteRecommendations, configurations, services, hosts).
        """
        validator = AtlasValidator()
        methods = validator.validators
        return validator.validateListOfConfigUsingMethod(
            configurations, recommendedDefaults, services, hosts, methods)
class AtlasRecommender(service_advisor.ServiceAdvisor):
    """
    Atlas Recommender suggests properties when adding the service for the first time or modifying configs via the UI.
    """

    def __init__(self, *args, **kwargs):
        # Keep the super() proxy on the instance and initialise the parent through it.
        self.as_super = super(AtlasRecommender, self)
        self.as_super.__init__(*args, **kwargs)

    def constructAtlasRestAddress(self, services, hosts):
        """
        Build the value for atlas.rest.address.

        :param services: Collection of services in the cluster with configs
        :param hosts: Collection of hosts in the cluster
        :return: Comma-separated "<scheme>://<host>:<port>" entries, one per Atlas Server host
                 (sorted by host name), or None when ATLAS is not in the service list or no
                 Atlas Server host is found
        """
        services_list = [service["StackServices"]["service_name"] for service in services["services"]]
        atlas_server_hosts_info = self.getHostsWithComponent("ATLAS", "ATLAS_SERVER", services, hosts)
        if "ATLAS" not in services_list or not atlas_server_hosts_info:
            return None

        # Multiple Atlas Servers can exist, so sort by hostname to create deterministic csv
        atlas_host_names = sorted(e['Hosts']['host_name'] for e in atlas_server_hosts_info)

        scheme = "http"
        metadata_port = "21000"
        atlas_server_default_https_port = "21443"
        tls_enabled = "false"

        if 'application-properties' in services['configurations']:
            app_props = services['configurations']['application-properties']['properties']
            if 'atlas.enableTLS' in app_props:
                tls_enabled = app_props['atlas.enableTLS']
            if 'atlas.server.http.port' in app_props:
                metadata_port = str(app_props['atlas.server.http.port'])
            if str(tls_enabled).lower() == "true":
                # With TLS on, the HTTPS port (configured or default) replaces the HTTP port.
                scheme = "https"
                if 'atlas.server.https.port' in app_props:
                    metadata_port = str(app_props['atlas.server.https.port'])
                else:
                    metadata_port = atlas_server_default_https_port

        atlas_rest_address = ",".join(
            "{0}://{1}:{2}".format(scheme, hostname, metadata_port) for hostname in atlas_host_names)
        self.logger.info("Constructing atlas.rest.address=%s" % atlas_rest_address)
        return atlas_rest_address

    def recommendAtlasConfigurationsFromHDP25(self, configurations, clusterData, services, hosts):
        """
        Recommend Atlas properties derived from the other services in the cluster.

        Writes, in this order: atlas.rest.address, the Solr ZooKeeper URL, the Kafka
        bootstrap servers / ZooKeeper connect string, the HBase quorum properties, the
        Ranger plugin flag and authorizer, and the Atlas server memory settings.

        :param configurations: Recommendation dictionary that is filled in
        :param clusterData: Unused by this method
        :param services: Collection of services in the cluster with configs
        :param hosts: Collection of hosts in the cluster
        """
        # All three put-helpers are created before anything else, in this order.
        # NOTE(review): the authorizer step reads
        # configurations['ranger-atlas-plugin-properties']['properties']; presumably
        # putProperty creates that entry -- confirm against the parent class before reordering.
        putAtlasApplicationProperty = self.putProperty(configurations, "application-properties", services)
        putAtlasRangerPluginProperty = self.putProperty(configurations, "ranger-atlas-plugin-properties", services)
        putAtlasEnvProperty = self.putProperty(configurations, "atlas-env", services)
        servicesList = [service["StackServices"]["service_name"] for service in services["services"]]

        # Generate atlas.rest.address since the value is always computed
        atlas_rest_address = self.constructAtlasRestAddress(services, hosts)
        if atlas_rest_address is not None:
            putAtlasApplicationProperty("atlas.rest.address", atlas_rest_address)

        self._recommendSolrZookeeperUrl(putAtlasApplicationProperty, servicesList, services)
        self._recommendKafkaProperties(putAtlasApplicationProperty, servicesList, services)
        self._recommendHBaseProperties(putAtlasApplicationProperty, servicesList, services)
        self._recommendAuthorizer(putAtlasApplicationProperty, putAtlasRangerPluginProperty,
                                  configurations, services)
        self._recommendServerMemory(putAtlasEnvProperty, services)

    def _recommendSolrZookeeperUrl(self, putAtlasApplicationProperty, servicesList, services):
        """Set the Solr ZooKeeper URL from the ZooKeeper hosts, or "" without AMBARI_INFRA."""
        if "AMBARI_INFRA" not in servicesList or 'infra-solr-env' not in services['configurations']:
            putAtlasApplicationProperty('atlas.graph.index.search.solr.zookeeper-url', "")
            return

        infra_solr_znode = services['configurations']['infra-solr-env']['properties'].get('infra_solr_znode')
        zookeeper_hosts = self.getHostNamesWithComponent("ZOOKEEPER", "ZOOKEEPER_SERVER", services)
        zookeeper_port = self.getZKPort(services)

        zookeeper_host_arr = []
        for zookeeper_host_name in zookeeper_hosts:
            zookeeper_host = zookeeper_host_name + ':' + zookeeper_port
            # The znode, when configured, is appended to every host entry.
            if infra_solr_znode is not None:
                zookeeper_host += infra_solr_znode
            zookeeper_host_arr.append(zookeeper_host)

        putAtlasApplicationProperty('atlas.graph.index.search.solr.zookeeper-url', ",".join(zookeeper_host_arr))

    @staticmethod
    def _selectKafkaBrokerPort(kafka_server_listeners, default_port, security_enabled):
        """
        Pick the broker port out of a Kafka "listeners" value.

        A listener is only considered when it splits on ':' into exactly three parts
        (protocol, host, port). For a single listener its port is used as is. For a
        comma-separated list, the first SASL listener is used when security is enabled;
        otherwise the last non-SASL listener is used. default_port is returned when
        nothing matches.
        """
        if ',' not in kafka_server_listeners:
            listener = kafka_server_listeners.strip().split(':')
            return listener[2] if len(listener) == 3 else default_port

        kafka_broker_port = default_port
        for raw_listener in kafka_server_listeners.split(','):
            listener = raw_listener.strip().split(':')
            if len(listener) != 3:
                continue
            is_sasl = 'SASL' in listener[0]
            if is_sasl and security_enabled:
                return listener[2]
            if not is_sasl and not security_enabled:
                # NOTE(review): no early exit here, so the LAST non-SASL listener wins,
                # unlike the SASL case above -- confirm this asymmetry is intended.
                kafka_broker_port = listener[2]
        return kafka_broker_port

    def _recommendKafkaProperties(self, putAtlasApplicationProperty, servicesList, services):
        """Set the Kafka bootstrap servers and ZooKeeper connect string, or "" without KAFKA."""
        if "KAFKA" not in servicesList or 'kafka-broker' not in services['configurations']:
            putAtlasApplicationProperty('atlas.kafka.bootstrap.servers', "")
            putAtlasApplicationProperty('atlas.kafka.zookeeper.connect', "")
            return

        kafka_hosts = self.getHostNamesWithComponent("KAFKA", "KAFKA_BROKER", services)
        kafka_broker_props = services['configurations']['kafka-broker']['properties']

        default_port = kafka_broker_props.get('port', '6667')
        kafka_server_listeners = kafka_broker_props.get('listeners', 'PLAINTEXT://localhost:6667')
        security_enabled = self.isSecurityEnabled(services)
        kafka_broker_port = self._selectKafkaBrokerPort(kafka_server_listeners, default_port, security_enabled)

        kafka_bootstrap_servers = ",".join(kafka_host + ':' + kafka_broker_port for kafka_host in kafka_hosts)

        # Fall back to an empty string (not None) so a missing value is written the same
        # way as in the "Kafka not installed" branch above and in the HBase fallback.
        kafka_zookeeper_connect = kafka_broker_props.get('zookeeper.connect', "")

        putAtlasApplicationProperty('atlas.kafka.bootstrap.servers', kafka_bootstrap_servers)
        putAtlasApplicationProperty('atlas.kafka.zookeeper.connect', kafka_zookeeper_connect)

    def _recommendHBaseProperties(self, putAtlasApplicationProperty, servicesList, services):
        """Point graph storage and audit at the HBase ZooKeeper quorum, or "" without HBASE."""
        hbase_zookeeper_quorum = ""
        if "HBASE" in servicesList and 'hbase-site' in services['configurations']:
            hbase_zookeeper_quorum = services['configurations']['hbase-site']['properties'].get(
                'hbase.zookeeper.quorum', "")

        putAtlasApplicationProperty('atlas.graph.storage.hostname', hbase_zookeeper_quorum)
        putAtlasApplicationProperty('atlas.audit.hbase.zookeeper.quorum', hbase_zookeeper_quorum)

    def _recommendAuthorizer(self, putAtlasApplicationProperty, putAtlasRangerPluginProperty,
                             configurations, services):
        """Mirror the Ranger plugin flag from ranger-env and choose the Atlas authorizer."""
        plugin_site = 'ranger-atlas-plugin-properties'
        plugin_flag = 'ranger-atlas-plugin-enabled'
        service_configs = services["configurations"]

        if "ranger-env" in service_configs and plugin_site in service_configs and \
                plugin_flag in service_configs["ranger-env"]["properties"]:
            putAtlasRangerPluginProperty(plugin_flag, service_configs["ranger-env"]["properties"][plugin_flag])

        # A value already present in the recommendations takes precedence over the
        # value currently stored in the cluster configuration.
        ranger_atlas_plugin_enabled = ''
        if plugin_site in configurations and plugin_flag in configurations[plugin_site]['properties']:
            ranger_atlas_plugin_enabled = configurations[plugin_site]['properties'][plugin_flag]
        elif plugin_site in service_configs and plugin_flag in service_configs[plugin_site]['properties']:
            ranger_atlas_plugin_enabled = service_configs[plugin_site]['properties'][plugin_flag]

        if ranger_atlas_plugin_enabled and ranger_atlas_plugin_enabled.lower() == 'yes':
            putAtlasApplicationProperty('atlas.authorizer.impl', 'ranger')
        else:
            putAtlasApplicationProperty('atlas.authorizer.impl', 'simple')

    def _recommendServerMemory(self, putAtlasEnvProperty, services):
        """Derive atlas_server_xmx and atlas_server_max_new_size from the metadata size."""
        if 'atlas-env' not in services['configurations']:
            return

        atlas_env_props = services['configurations']['atlas-env']['properties']
        atlas_server_metadata_size = 50000
        if 'atlas_server_metadata_size' in atlas_env_props:
            atlas_server_metadata_size = float(atlas_env_props['atlas_server_metadata_size'])

        if atlas_server_metadata_size >= 1000000:
            atlas_server_xmx = 1024 * 16
        elif atlas_server_metadata_size >= 500000:
            atlas_server_xmx = 1024 * 10
        elif atlas_server_metadata_size >= 300000:
            atlas_server_xmx = 1024 * 5
        else:
            atlas_server_xmx = 2048

        # Explicit floor division keeps the result a whole number on any interpreter
        # (2048 -> 600, 5120 -> 1530, 10240 -> 3060, 16384 -> 4890).
        atlas_server_max_new_size = (atlas_server_xmx // 100) * 30

        putAtlasEnvProperty("atlas_server_xmx", atlas_server_xmx)
        putAtlasEnvProperty("atlas_server_max_new_size", atlas_server_max_new_size)

    def recommendAtlasConfigurationsFromHDP26(self, configurations, clusterData, services, hosts):
        """
        Recommend atlas.sso.knox.providerurl when KNOX is part of the cluster.

        The URL uses the alphabetically first Knox gateway host (or 'localhost' when none
        is reported) and gateway.port from gateway-site (or '8443' when not configured).

        :param configurations: Recommendation dictionary that is filled in
        :param clusterData: Unused by this method
        :param services: Collection of services in the cluster with configs
        :param hosts: Unused by this method
        """
        servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
        putAtlasApplicationProperty = self.putProperty(configurations, "application-properties", services)

        if 'KNOX' not in servicesList:
            return

        knox_host = 'localhost'
        knox_port = '8443'

        knox_hosts = self.getComponentHostNames(services, "KNOX", "KNOX_GATEWAY")
        if len(knox_hosts) > 0:
            knox_hosts.sort()
            knox_host = knox_hosts[0]

        if 'gateway-site' in services['configurations'] and \
                'gateway.port' in services['configurations']["gateway-site"]["properties"]:
            knox_port = services['configurations']["gateway-site"]["properties"]['gateway.port']

        putAtlasApplicationProperty(
            'atlas.sso.knox.providerurl',
            'https://{0}:{1}/gateway/knoxsso/api/v1/websso'.format(knox_host, knox_port))
class AtlasValidator(service_advisor.ServiceAdvisor):
    """
    Atlas Validator checks the correctness of properties whenever the service is first added or the user attempts to
    change configs via the UI.
    """

    def __init__(self, *args, **kwargs):
        # Keep the super() proxy on the instance and initialise the parent through it.
        self.as_super = super(AtlasValidator, self)
        self.as_super.__init__(*args, **kwargs)

        # (site name, validation method) pairs handed to validateListOfConfigUsingMethod.
        self.validators = [("application-properties", self.validateAtlasConfigurationsFromHDP25)]

    def validateAtlasConfigurationsFromHDP25(self, properties, recommendedDefaults, configurations, services, hosts):
        """
        Validate the application-properties site.

        Checks the LDAP/AD properties required by the selected authentication type (only
        when LDAP authentication is enabled), the Solr ZooKeeper URL, the Kafka connection
        properties and, for the hbase storage backend, the HBase quorum properties.

        :return: The collected items converted by toConfigurationValidationProblems
        """
        app_props = self.getSiteProperties(configurations, "application-properties")
        found = []

        def report(config_name, item):
            # Every finding has the same two-key shape.
            found.append({"config-name": config_name, "item": item})

        auth_type = app_props['atlas.authentication.method.ldap.type']
        auth_ldap_enable = app_props['atlas.authentication.method.ldap'].lower() == 'true'
        self.logger.info("Validating Atlas configs, authentication type: %s" % str(auth_type))

        # Required props (only the keys are used below)
        ldap_props = {"atlas.authentication.method.ldap.url": "",
                      "atlas.authentication.method.ldap.userDNpattern": "uid=",
                      "atlas.authentication.method.ldap.groupSearchBase": "",
                      "atlas.authentication.method.ldap.groupSearchFilter": "",
                      "atlas.authentication.method.ldap.groupRoleAttribute": "cn",
                      "atlas.authentication.method.ldap.base.dn": "",
                      "atlas.authentication.method.ldap.bind.dn": "",
                      "atlas.authentication.method.ldap.bind.password": "",
                      "atlas.authentication.method.ldap.user.searchfilter": ""
                      }
        ad_props = {"atlas.authentication.method.ldap.ad.domain": "",
                    "atlas.authentication.method.ldap.ad.url": "",
                    "atlas.authentication.method.ldap.ad.base.dn": "",
                    "atlas.authentication.method.ldap.ad.bind.dn": "",
                    "atlas.authentication.method.ldap.ad.bind.password": "",
                    "atlas.authentication.method.ldap.ad.user.searchfilter": "(sAMAccountName={0})"
                    }

        # Any type other than "ldap" / "ad" (including "none") requires nothing.
        auth_kind = auth_type.lower()
        if auth_kind == "ldap":
            props_to_require = set(ldap_props.keys())
        elif auth_kind == "ad":
            props_to_require = set(ad_props.keys())
        else:
            props_to_require = set()

        if auth_ldap_enable:
            for prop in props_to_require:
                current = app_props[prop] if prop in app_props else None
                if current is None or current.strip() == "":
                    report(prop, self.getErrorItem(
                        "If authentication type is %s, this property is required." % auth_type))

        if app_props['atlas.graph.index.search.backend'] == 'solr5':
            if not app_props['atlas.graph.index.search.solr.zookeeper-url']:
                report("atlas.graph.index.search.solr.zookeeper-url", self.getErrorItem(
                    "If AMBARI_INFRA is not installed then the SOLR zookeeper url configuration must be specified."))

        if not app_props['atlas.kafka.bootstrap.servers']:
            report("atlas.kafka.bootstrap.servers", self.getErrorItem(
                "If KAFKA is not installed then the Kafka bootstrap servers configuration must be specified."))

        if not app_props['atlas.kafka.zookeeper.connect']:
            report("atlas.kafka.zookeeper.connect", self.getErrorItem(
                "If KAFKA is not installed then the Kafka zookeeper quorum configuration must be specified."))

        if app_props['atlas.graph.storage.backend'] == 'hbase':
            if 'hbase-site' in services['configurations']:
                # HBase is configured in this cluster: compare against its quorum.
                hbase_zookeeper_quorum = services['configurations']['hbase-site']['properties']['hbase.zookeeper.quorum']
                if not app_props['atlas.graph.storage.hostname']:
                    report("atlas.graph.storage.hostname", self.getErrorItem(
                        "If HBASE is not installed then the hbase zookeeper quorum configuration must be specified."))
                elif string_set_equals(app_props['atlas.graph.storage.hostname'], hbase_zookeeper_quorum):
                    report("atlas.graph.storage.hostname", self.getWarnItem(
                        "Atlas is configured to use the HBase installed in this cluster. If you would like Atlas to use another HBase instance, please configure this property and HBASE_CONF_DIR variable in atlas-env appropriately."))
            else:
                # No hbase-site available: the storage host must be supplied by hand.
                if not app_props['atlas.graph.storage.hostname']:
                    report("atlas.graph.storage.hostname", self.getErrorItem(
                        "Atlas is not configured to use the HBase installed in this cluster. If you would like Atlas to use another HBase instance, please configure this property and HBASE_CONF_DIR variable in atlas-env appropriately."))

            # The audit quorum check is the same in both cases and always comes last.
            if not app_props['atlas.audit.hbase.zookeeper.quorum']:
                report("atlas.audit.hbase.zookeeper.quorum", self.getErrorItem(
                    "If HBASE is not installed then the audit hbase zookeeper quorum configuration must be specified."))

        return self.toConfigurationValidationProblems(found, "application-properties")