blob: 769249f7ece5a54cd34ba696bd3aa1853f367516 [file] [log] [blame]
#!/usr/bin/env ambari-python-wrap
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
# Python Imports
import imp
import os
import random
import re
import socket
import string
import traceback
import json
import sys
import logging
from math import ceil, floor
from urlparse import urlparse
# Local imports
from ambari_configuration import AmbariConfiguration
from resource_management.libraries.functions.data_structure_utils import get_from_dict
from resource_management.core.exceptions import Fail
class StackAdvisor(object):
  """
  Abstract class implemented by all stack advisors. Stack advisors advise on stack-specific questions.

  Currently stack advisors provide the following abilities:
  - Recommend where services should be installed in the cluster
  - Recommend configurations based on host hardware
  - Validate the user's selection of where services are installed in the cluster
  - Validate user configuration values

  Each of the above methods is passed parameters describing the services and hosts involved, as described below.

  @type services: dictionary
  @param services: Dictionary containing all information about services selected by the user.
    Example: {
      "services": [
        {
          "StackServices": {
            "service_name" : "HDFS",
            "service_version" : "2.6.0.2.2"
          },
          "components" : [
            {
              "StackServiceComponents" : {
                "cardinality" : "1+",
                "component_category" : "SLAVE",
                "component_name" : "DATANODE",
                "display_name" : "DataNode",
                "service_name" : "HDFS",
                "hostnames" : []
              },
              "dependencies" : []
            }, {
              "StackServiceComponents" : {
                "cardinality" : "1-2",
                "component_category" : "MASTER",
                "component_name" : "NAMENODE",
                "display_name" : "NameNode",
                "service_name" : "HDFS",
                "hostnames" : []
              },
              "dependencies" : []
            },
            ...
          ]
        },
        ...
      ]
    }

  @type hosts: dictionary
  @param hosts: Dictionary containing all information about hosts in this cluster
    Example: {
      "items": [
        {
          "Hosts": {
            "host_name": "c6401.ambari.apache.org",
            "public_host_name" : "c6401.ambari.apache.org",
            "ip": "192.168.1.101",
            "cpu_count" : 1,
            "disk_info" : [
              {
                "available" : "4564632",
                "used" : "5230344",
                "percent" : "54%",
                "size" : "10319160",
                "type" : "ext4",
                "mountpoint" : "/"
              },
              {
                "available" : "1832436",
                "used" : "0",
                "percent" : "0%",
                "size" : "1832436",
                "type" : "tmpfs",
                "mountpoint" : "/dev/shm"
              }
            ],
            "host_state" : "HEALTHY",
            "os_arch" : "x86_64",
            "os_type" : "centos6",
            "total_mem" : 3664872
          }
        },
        ...
      ]
    }

  Each of the methods returns either recommendations or validations.
  Recommendations are made in an Ambari Blueprints friendly format.
  Validations are an array of validation objects.
  """

  def recommendComponentLayout(self, services, hosts):
    """
    Returns a recommendation of which hosts the various service components should be installed on.

    This function takes as input all details about the services being installed, and the hosts
    they are being installed into, to generate hostname assignments for the components
    of each service.

    @type services: dictionary
    @param services: Dictionary containing all information about services selected by the user.
    @type hosts: dictionary
    @param hosts: Dictionary containing all information about hosts in this cluster
    @rtype: dictionary
    @return: Layout recommendation of service components on cluster hosts in Ambari Blueprints friendly format.
      Example: {
        "resources" : [
          {
            "hosts" : [
              "c6402.ambari.apache.org",
              "c6401.ambari.apache.org"
            ],
            "services" : [
              "HDFS"
            ],
            "recommendations" : {
              "blueprint" : {
                "host_groups" : [
                  {
                    "name" : "host-group-2",
                    "components" : [
                      { "name" : "JOURNALNODE" },
                      { "name" : "ZKFC" },
                      { "name" : "DATANODE" },
                      { "name" : "SECONDARY_NAMENODE" }
                    ]
                  },
                  {
                    "name" : "host-group-1",
                    "components" : [
                      { "name" : "HDFS_CLIENT" },
                      { "name" : "NAMENODE" },
                      { "name" : "JOURNALNODE" },
                      { "name" : "ZKFC" },
                      { "name" : "DATANODE" }
                    ]
                  }
                ]
              },
              "blueprint_cluster_binding" : {
                "host_groups" : [
                  {
                    "name" : "host-group-1",
                    "hosts" : [ { "fqdn" : "c6401.ambari.apache.org" } ]
                  },
                  {
                    "name" : "host-group-2",
                    "hosts" : [ { "fqdn" : "c6402.ambari.apache.org" } ]
                  }
                ]
              }
            }
          }
        ]
      }
    """
    pass

  def validateComponentLayout(self, services, hosts):
    """
    Returns an array of validation issues with the service component layout on hosts.

    This function takes as input all details about the services being installed along with the
    hosts the components are being installed on (the hostnames property is populated for
    each component).

    @type services: dictionary
    @param services: Dictionary containing information about services and host layout selected by the user.
    @type hosts: dictionary
    @param hosts: Dictionary containing all information about hosts in this cluster
    @rtype: dictionary
    @return: Dictionary containing array of validation items
      Example: {
        "items": [
          {
            "type" : "host-group",
            "level" : "ERROR",
            "message" : "NameNode and Secondary NameNode should not be hosted on the same machine",
            "component-name" : "NAMENODE",
            "host" : "c6401.ambari.apache.org"
          },
          ...
        ]
      }
    """
    pass

  def recommendConfigurations(self, services, hosts):
    """
    Returns a recommendation of service configurations based on the host-specific layout of components.

    This function takes as input all details about the services being installed, and the hosts
    they are being installed into, to recommend host-specific configurations.

    @type services: dictionary
    @param services: Dictionary containing all information about services and component layout selected by the user.
    @type hosts: dictionary
    @param hosts: Dictionary containing all information about hosts in this cluster
    @rtype: dictionary
    @return: Configuration recommendations for the cluster in Ambari Blueprints friendly format.
      Example: {
        "services": [
          "HIVE",
          "TEZ",
          "YARN"
        ],
        "recommendations": {
          "blueprint": {
            "host_groups": [],
            "configurations": {
              "yarn-site": {
                "properties": {
                  "yarn.scheduler.minimum-allocation-mb": "682",
                  "yarn.scheduler.maximum-allocation-mb": "2048",
                  "yarn.nodemanager.resource.memory-mb": "2048"
                }
              },
              "tez-site": {
                "properties": {
                  "tez.am.java.opts": "-server -Xmx546m -Djava.net.preferIPv4Stack=true -XX:+UseNUMA -XX:+UseParallelGC",
                  "tez.am.resource.memory.mb": "682"
                }
              },
              "hive-site": {
                "properties": {
                  "hive.tez.container.size": "682",
                  "hive.tez.java.opts": "-server -Xmx546m -Djava.net.preferIPv4Stack=true -XX:NewRatio=8 -XX:+UseNUMA -XX:+UseParallelGC",
                  "hive.auto.convert.join.noconditionaltask.size": "238026752"
                }
              }
            }
          },
          "blueprint_cluster_binding": {
            "host_groups": []
          }
        },
        "hosts": [
          "c6401.ambari.apache.org",
          "c6402.ambari.apache.org",
          "c6403.ambari.apache.org"
        ]
      }
    """
    pass

  def recommendConfigurationsForSSO(self, services, hosts):
    """
    Returns a recommendation of SSO-related service configurations based on the host-specific layout of components.

    This function takes as input all details about the services being installed, and the hosts
    they are being installed into, to recommend host-specific configurations.

    @type services: dictionary
    @param services: Dictionary containing all information about services and component layout selected by the user.
    @type hosts: dictionary
    @param hosts: Dictionary containing all information about hosts in this cluster
    @rtype: dictionary
    @return: Configuration recommendations in the same Ambari Blueprints friendly format as returned by
             recommendConfigurations ("services", "hosts" and a "recommendations" -> "blueprint" ->
             "configurations" mapping of config type to its "properties"); implementations are expected to
             recommend SSO-related properties.
    """
    pass

  def recommendConfigurationsForKerberos(self, services, hosts):
    """
    Returns a recommendation of Kerberos-related service configurations based on the host-specific layout
    of components.

    This function takes as input all details about the services being installed, and the hosts
    they are being installed into, to recommend host-specific configurations.

    For backwards compatibility, this function redirects to recommendConfigurations. Implementations
    should override this function to recommend Kerberos-specific property changes.

    @type services: dictionary
    @param services: Dictionary containing all information about services and component layout selected by the user.
    @type hosts: dictionary
    @param hosts: Dictionary containing all information about hosts in this cluster
    @rtype: dictionary
    @return: Whatever recommendConfigurations(services, hosts) returns.
    """
    return self.recommendConfigurations(services, hosts)

  def validateConfigurations(self, services, hosts):
    """
    Returns an array of validation issues with the configurations provided by the user.

    This function takes as input all details about the services being installed along with the
    configuration values entered by the user. These configurations can be validated against
    service requirements, or host hardware, to generate validation issues.

    @type services: dictionary
    @param services: Dictionary containing information about services and user configurations.
    @type hosts: dictionary
    @param hosts: Dictionary containing all information about hosts in this cluster
    @rtype: dictionary
    @return: Dictionary containing array of validation items
      Example: {
        "items": [
          {
            "config-type": "yarn-site",
            "message": "Value is less than the recommended default of 682",
            "type": "configuration",
            "config-name": "yarn.scheduler.minimum-allocation-mb",
            "level": "WARN"
          }
        ]
      }
    """
    pass
class DefaultStackAdvisor(StackAdvisor):
CLUSTER_CREATE_OPERATION = "ClusterCreate"
ADD_SERVICE_OPERATION = "AddService"
EDIT_CONFIG_OPERATION = "EditConfig"
RECOMMEND_ATTRIBUTE_OPERATION = "RecommendAttribute"
OPERATION = "operation"
OPERATION_DETAILS = "operation_details"
ADVISOR_CONTEXT = "advisor_context"
CALL_TYPE = "call_type"
"""
Default stack advisor implementation.
This implementation is used when a stack-version, or its hierarchy does not
have an advisor. Stack-versions can extend this class to provide their own
implement
"""
def __init__(self):
  """
  Create an advisor with empty per-request state and a configured logger.

  The lookup structures created here start empty; Service Advisors may extend them with
  service-specific information.
  """
  # Request payload; assigned later (createComponentLayoutRecommendations stores it here).
  self.services = None
  self.initialize_logger('DefaultStackAdvisor')

  # Maps a service name or component name to its Service Advisor instance (or None).
  self.serviceAdvisorsDict = {}
  # Set to True once loadServiceAdvisors() has run.
  self.loaded_service_advisors = False

  # Requested properties during a 'recommend-configuration-dependencies' request; None otherwise.
  self.allRequestedProperties = None

  # Layout hints that Service Advisors may extend with service-specific information.
  self.mastersWithMultipleInstances = set()
  self.notValuableComponents = set()
  self.notPreferableOnServerComponents = set()
  self.cardinalitiesDict = {}
  self.componentLayoutSchemes = {}
def initialize_logger(self, name='DefaultStackAdvisor', logging_level=logging.INFO, format='%(asctime)s %(levelname)s %(name)s %(funcName)s: - %(message)s'):
  """
  Configure self.logger as the named logger with exactly two stream handlers.

  Records at logging_level and above are written to stdout; records at ERROR and above are
  additionally written to stderr. Both handlers share one formatter built from `format`.
  Handlers already attached to the named logger are discarded first, so repeated calls do not
  accumulate handlers.

  :param name: Logger name passed to logging.getLogger.
  :param logging_level: Level for the logger and for the stdout handler.
  :param format: logging.Formatter format string.
  """
  self.logger = logging.getLogger(name)
  self.logger.setLevel(logging_level)
  shared_formatter = logging.Formatter(format)

  # stderr handler first, stdout handler second (the order in which they are attached).
  new_handlers = []
  for stream, threshold in ((sys.stderr, logging.ERROR), (sys.stdout, logging_level)):
    handler = logging.StreamHandler(stream)
    handler.setLevel(threshold)
    handler.setFormatter(shared_formatter)
    new_handlers.append(handler)

  # Replace (rather than extend) any handlers from an earlier call.
  self.logger.handlers = []
  for handler in new_handlers:
    self.logger.addHandler(handler)
def getServiceComponentLayoutValidations(self, services, hosts):
  """
  Get a list of errors. Must be overridden by a subclass or a Service Advisor; this base
  implementation always raises Fail.

  :param services: Dictionary of the form:
    {
      'changed-configurations': [],
      'Versions': {
        'parent_stack_version': '9.0',
        'stack_name': 'HDP',
        'stack_version': '9.0',
        'stack_hierarchy': {
          'stack_name': 'HDP',
          'stack_versions': ['8.0', '7.0', ..., '1.0']
        }
      },
      'ambari-server-properties': {'key': 'value', ...},
      'services': [
        {
          'StackServices': {
            'advisor_path': '/var/lib/ambari-server/resources/common-services/MYSERVICE/1.2.3/service_advisor.py',
            'service_version': '1.2.3',
            'stack_name': 'HDP',
            'service_name': 'MYSERVICE',
            'stack_version': '9.0',
            'advisor_name': 'MYSERVICEServiceAdvisor'
          },
          'components': [
            {
              'StackServiceComponents': {
                'stack_version': '9.0',
                'decommission_allowed': True|False,
                'display_name': 'My Service Display Name',
                'stack_name': 'HDP',
                'custom_commands': [],
                'component_category': 'CLIENT|MASTER|SLAVE',
                'advertise_version': True|False,
                'is_client': True|False,
                'is_master': True|False,
                'bulk_commands_display_name': '',
                'bulk_commands_master_component_name': '',
                'service_name': 'MYSERVICE',
                'has_bulk_commands_definition': True|False,
                'reassign_allowed': True|False,
                'recovery_enabled': True|False,
                'cardinality': '0+|1|1+',
                'hostnames': ['c6401.ambari.apache.org'],
                'component_name': 'MY_COMPONENT_NAME'
              },
              'dependencies': []
            },
            ...
          ],
          'configurations': [
            {
              'StackConfigurations': {
                'stack_name': 'HDP',
                'service_name': 'MYSERVICE',
                'stack_version': '9.0',
                'property_depends_on': [],
                'type': 'myservice-config.xml',
                'property_name': 'foo'
              },
              'dependencies': []
            },
            {
              'StackConfigurations': {
                'stack_name': 'HDP',
                'service_name': 'ZOOKEEPER',
                'stack_version': '2.6',
                'property_depends_on': [],
                'type': 'zoo.cfg.xml',
                'property_name': 'autopurge.snapRetainCount'
              },
              'dependencies': []
            },
            ...
          ]
        },
        ...
      ],
      'configurations': {}
    }

  :param hosts: Dictionary where hosts["items"] contains the list of hosts in the cluster.
    E.g. of the form,
    {
      'items': [
        {
          'Hosts': {
            'host_name': 'c6401.ambari.apache.org',
            'public_host_name': 'c6401.ambari.apache.org',
            'ip': '192.168.64.101',
            'rack_info': '/default-rack',
            'os_type': 'centos6',
            'os_arch': 'x86_64',
            'cpu_count': 1,
            'ph_cpu_count': 1,
            'host_state': 'HEALTHY',
            'total_mem': 2926196,
            'host_status': 'HEALTHY',
            'last_registration_time': 1481833146522,
            'os_family': 'redhat6',
            'last_heartbeat_time': 1481835051067,
            'recovery_summary': 'DISABLED',
            'host_health_report': '',
            'desired_configs': None,
            'disk_info': [
              {
                'available': '483608892',
                'used': '3304964',
                'percent': '1%',
                'device': '/dev/mapper/VolGroup-lv_root',
                'mountpoint': '/',
                'type': 'ext4',
                'size': '512971376'
              },
              ...
            ],
            'recovery_report': {
              'component_reports': [],
              'summary': 'DISABLED'
            },
            'last_agent_env': {
              'transparentHugePage': 'always',
              'hostHealth': {
                'agentTimeStampAtReporting': 1481835031135,
                'activeJavaProcs': [],
                'serverTimeStampAtReporting': 1481835031180,
                'liveServices': [{
                  'status': 'Healthy',
                  'name': 'ntpd',
                  'desc': ''
                }]
              },
              'umask': 18,
              'reverseLookup': True,
              'alternatives': [],
              'existingUsers': [],
              'firewallName': 'iptables',
              'stackFoldersAndFiles': [],
              'existingRepos': [],
              'installedPackages': [],
              'firewallRunning': False
            }
          }
        }
      ]
    }

  :return: List of errors
  :raises Fail: always, in this base implementation.
  """
  # To be overridden by a subclass or Service Advisor.
  raise Fail("Must be overriden by subclass or Service Advisor")
def getActiveHosts(self, hosts):
  """
  Return the names of the given hosts that are not in maintenance mode.

  :param hosts: List of host dictionaries (callers pass the "Hosts" entries of hosts["items"]), or None.
  :return: List of 'host_name' values, in input order, for every host whose 'maintenance_state'
           is absent/None or "OFF". An empty list when hosts is None.
  """
  if hosts is None:
    return []
  active = []
  for host in hosts:
    state = host.get('maintenance_state')
    if state is None or state == "OFF":
      active.append(host['host_name'])
  return active
def getServiceAdvisor(self, key):
  """
  Look up the Service Advisor registered for a service or component name.

  The advisors are loaded lazily: the first lookup triggers loadServiceAdvisors().

  :param key: Service name or component name.
  :return: The Service Advisor instance stored for key, or None if there is none.
  """
  if not self.loaded_service_advisors:
    self.loadServiceAdvisors()
  return self.serviceAdvisorsDict.get(key)
def loadServiceAdvisors(self):
  """
  Populate serviceAdvisorsDict from the services in self.services.

  Each service name, and each of that service's component names, is mapped to the advisor
  returned by instantiateServiceAdvisor (which may be None). The loaded flag is set first, so
  getServiceAdvisor will not call this again even when there is nothing to load.
  """
  self.loaded_service_advisors = True
  request = self.services
  if request is None or "services" not in request:
    return

  for service in request["services"]:
    service_name = service["StackServices"]["service_name"]
    advisor = self.instantiateServiceAdvisor(service)
    # A None advisor is stored too, so later lookups fall back to the default behaviour.
    self.serviceAdvisorsDict[service_name] = advisor
    for component in service["components"]:
      self.serviceAdvisorsDict[self.getComponentName(component)] = advisor
def instantiateServiceAdvisor(self, service):
  """
  Load the Service Advisor module for the given service and return an instance of its advisor class.

  The module is loaded from service["StackServices"]["advisor_path"]. The class is chosen as follows:
  1. the class named by service["StackServices"]["advisor_name"], if the module defines it;
  2. otherwise the first non-dunder attribute, in dir() order, whose name starts with the service name
     and later contains "ServiceAdvisor" (case-insensitive);
  3. otherwise the declared advisor_name, which then fails the hasattr check and is logged as an error.

  :param service: Service object that contains a path to the advisor being requested.
  :return: An instance of the Service Advisor class, or None if it could not be found, loaded or created.
           Failures are logged, not raised.
  """
  stack_services = service["StackServices"]
  service_name = stack_services["service_name"]
  class_name = stack_services.get("advisor_name")
  path = stack_services.get("advisor_path")

  if path is None or class_name is None or not os.path.exists(path):
    return None

  # re.escape keeps regex metacharacters in the service name literal; match() anchors at the start.
  class_name_pattern = re.compile("%s.*?ServiceAdvisor" % re.escape(service_name), re.IGNORECASE)

  try:
    with open(path, 'rb') as fp:
      service_advisor = imp.load_module('service_advisor_impl', fp, path, ('.py', 'rb', imp.PY_SOURCE))

    # Every advisor is loaded under the same module name. imp.load_module then behaves like reload(),
    # which keeps names defined by a previously loaded advisor file. Prefer the exact declared class, so
    # that a loosely matching (possibly stale) class such as "Spark2ServiceAdvisor" cannot shadow it.
    best_class_name = class_name
    if not hasattr(service_advisor, class_name):
      candidates = [attribute for attribute in dir(service_advisor)
                    if not attribute.startswith("__") and class_name_pattern.match(attribute)]
      if candidates:
        best_class_name = candidates[0]

    if hasattr(service_advisor, best_class_name):
      self.logger.info("ServiceAdvisor implementation for service {0} was loaded".format(service_name))
      return getattr(service_advisor, best_class_name)()

    self.logger.error("Failed to load or create ServiceAdvisor implementation for service {0}: "
                      "Expecting class name {1} but it was not found.".format(service_name, best_class_name))
  except Exception:
    # Best-effort: callers treat None as "use the default advisor" instead of aborting.
    self.logger.exception("Failed to load or create ServiceAdvisor implementation for service {0}".format(service_name))

  return None
def recommendComponentLayout(self, services, hosts):
  """
  Build the component-layout recommendation response for the requested services and hosts.

  :param services: Request dictionary; services["Versions"] must carry stack_name and stack_version.
  :param hosts: Dictionary whose "items" entries each contain a "Hosts" dictionary.
  :return: Dictionary with "Versions" (stack name/version), "hosts" (active host names),
           "services" (requested service names) and "recommendations" (the blueprint layout).
  """
  versions = services["Versions"]
  stackInfo = {"stack_name": versions["stack_name"], "stack_version": versions["stack_version"]}

  activeHostNames = self.getActiveHosts([entry["Hosts"] for entry in hosts["items"]])
  serviceNames = self.getServiceNames(services)
  layout = self.createComponentLayoutRecommendations(services, hosts)

  return {
    "Versions": stackInfo,
    "hosts": activeHostNames,
    "services": serviceNames,
    "recommendations": layout
  }
def get_heap_size_properties(self, services):
  """
  Get a dictionary mapping component names to the heap-size configs that contribute to each component's
  memory footprint. Each config carries a default value to use if it cannot be found. This is used in
  calculations of the total memory needed to run the cluster.

  Service Advisors (or self) that define a `heap_size_properties` attribute replace the built-in entry
  for each component name they list.

  :param services: Dictionary that contains all of the services being requested (services["services"]).
                   It is used to find heap-size configs whose definition has been delegated to Service Advisors.
  :return: Dictionary {component_name: [{"config-name": ..., "property": ..., "default": ...}, ...]}.
           If reading the Service Advisors fails, the error is logged and the entries collected so far are returned.
  """
  default = {
    "NAMENODE":
      [{"config-name": "hadoop-env",
        "property": "namenode_heapsize",
        "default": "1024m"}],
    "SECONDARY_NAMENODE":
      [{"config-name": "hadoop-env",
        "property": "namenode_heapsize",
        "default": "1024m"}],
    "DATANODE":
      [{"config-name": "hadoop-env",
        "property": "dtnode_heapsize",
        "default": "1024m"}],
    "REGIONSERVER":
      [{"config-name": "hbase-env",
        "property": "hbase_regionserver_heapsize",
        "default": "1024m"}],
    "HBASE_MASTER":
      [{"config-name": "hbase-env",
        "property": "hbase_master_heapsize",
        "default": "1024m"}],
    "HIVE_CLIENT":
      [{"config-name": "hive-env",
        "property": "hive.client.heapsize",
        "default": "1024m"}],
    "HIVE_METASTORE":
      [{"config-name": "hive-env",
        "property": "hive.metastore.heapsize",
        "default": "1024m"}],
    "HIVE_SERVER":
      [{"config-name": "hive-env",
        "property": "hive.heapsize",
        "default": "1024m"}],
    "HISTORYSERVER":
      [{"config-name": "mapred-env",
        "property": "jobhistory_heapsize",
        "default": "1024m"}],
    "OOZIE_SERVER":
      [{"config-name": "oozie-env",
        "property": "oozie_heapsize",
        "default": "1024m"}],
    "RESOURCEMANAGER":
      [{"config-name": "yarn-env",
        "property": "resourcemanager_heapsize",
        "default": "1024m"}],
    "NODEMANAGER":
      [{"config-name": "yarn-env",
        "property": "nodemanager_heapsize",
        "default": "1024m"}],
    "APP_TIMELINE_SERVER":
      [{"config-name": "yarn-env",
        "property": "apptimelineserver_heapsize",
        "default": "1024m"}],
    "ZOOKEEPER_SERVER":
      [{"config-name": "zookeeper-env",
        "property": "zk_server_heapsize",
        "default": "1024m"}],
    "METRICS_COLLECTOR":
      [{"config-name": "ams-hbase-env",
        "property": "hbase_master_heapsize",
        "default": "1024m"},
       {"config-name": "ams-hbase-env",
        "property": "hbase_regionserver_heapsize",
        "default": "1024m"},
       {"config-name": "ams-env",
        "property": "metrics_collector_heapsize",
        "default": "512m"}],
    "ATLAS_SERVER":
      [{"config-name": "atlas-env",
        "property": "atlas_server_xmx",
        "default": "2048m"}],
    "LOGSEARCH_SERVER":
      [{"config-name": "logsearch-env",
        "property": "logsearch_app_max_memory",
        "default": "1024m"}],
    "LOGSEARCH_LOGFEEDER":
      [{"config-name": "logfeeder-env",
        "property": "logfeeder_max_mem",
        "default": "512m"}],
    "SPARK_JOBHISTORYSERVER":
      [{"config-name": "spark-env",
        "property": "spark_daemon_memory",
        "default": "1024m"}],
    "SPARK2_JOBHISTORYSERVER":
      [{"config-name": "spark2-env",
        "property": "spark_daemon_memory",
        "default": "1024m"}]
  }

  try:
    # Override any entries with those provided by the Service Advisors.
    for service in services["services"]:
      serviceName = service["StackServices"]["service_name"]
      serviceAdvisor = self.getServiceAdvisor(serviceName)
      # "self" may itself be a loaded Service Advisor rather than this base class.
      advisor = serviceAdvisor if serviceAdvisor is not None else self
      # TODO: switch this to a function instead of a property.
      if hasattr(advisor, "heap_size_properties"):
        default.update(advisor.heap_size_properties)
  except Exception:
    # Best-effort: keep the defaults collected so far. Logger.exception() requires a message argument.
    self.logger.exception("Failed to read heap size properties from the Service Advisors")

  return default
def createComponentLayoutRecommendations(self, services, hosts):
  """
  Choose hosts for the components of the requested services and express the result as blueprint host groups.

  The services are first sorted with getServicesSortedByDependencies, then processed in three passes:
  1. MASTER components are placed through the service's advisor (or self) getHostsForMasterComponent.
  2. SLAVE and CLIENT components are placed through getHostsForSlaveComponent. It also receives the
     "free" hosts: active hosts not listed in the 'hostnames' of any component that
     isComponentNotValuable() does not exclude.
  3. Loaded Service Advisors get to adjust the result through colocateService and
     colocateServiceWithServicesInfo.
  Every host in the resulting map becomes its own host group ("host-group-1", "host-group-2", ...).
  Hosts returned by the placement methods that are not in the active host list are ignored.

  Side effect: stores `services` on self.services.

  :param services: Request dictionary; services["services"] lists the services and their components.
  :param hosts: Dictionary whose "items" entries each contain a "Hosts" dictionary.
  :return: {"blueprint": {"host_groups": [...]}, "blueprint_cluster_binding": {"host_groups": [...]}}
  """
  self.services = services

  recommendations = {
    "blueprint": {
      "host_groups": [ ]
    },
    "blueprint_cluster_binding": {
      "host_groups": [ ]
    }
  }

  hostsList = self.getActiveHosts([host["Hosts"] for host in hosts["items"]])
  # Set view for fast membership checks; hostsList keeps the host order.
  hostsSet = set(hostsList)
  hostsComponentsMap = {}
  for hostName in hostsList:
    if hostName not in hostsComponentsMap:
      hostsComponentsMap[hostName] = []

  # Sort the services so that the services others depend on are processed before their dependents.
  sortedServices = self.getServicesSortedByDependencies(services)

  # Pass 1: extend hostsComponentsMap with MASTER components.
  for service in sortedServices:
    masterComponents = [component for component in service["components"] if self.isMasterComponent(component)]
    serviceName = service["StackServices"]["service_name"]
    serviceAdvisor = self.getServiceAdvisor(serviceName)
    advisor = serviceAdvisor if serviceAdvisor is not None else self
    for component in masterComponents:
      componentName = component["StackServiceComponents"]["component_name"]
      # Filter the hosts so that only hosts meeting the dependencies are included (if possible).
      filteredHosts = self.getFilteredHostsBasedOnDependencies(services, component, hostsList, hostsComponentsMap)
      hostsForComponent = advisor.getHostsForMasterComponent(services, hosts, component, filteredHosts)
      for hostName in hostsForComponent:
        if hostName in hostsSet:
          hostsComponentsMap[hostName].append( { "name":componentName } )

  # Pass 2: extend hostsComponentsMap with SLAVE and CLIENT components.
  # A set keeps the per-host "already utilized" test O(1) instead of scanning every hostname list.
  utilizedHosts = set()
  for service in services["services"]:
    for component in service["components"]:
      if not self.isComponentNotValuable(component):
        utilizedHosts.update(component["StackServiceComponents"]["hostnames"])
  freeHosts = [hostName for hostName in hostsList if hostName not in utilizedHosts]

  for service in sortedServices:
    slaveClientComponents = [component for component in service["components"]
                             if self.isSlaveComponent(component) or self.isClientComponent(component)]
    serviceName = service["StackServices"]["service_name"]
    serviceAdvisor = self.getServiceAdvisor(serviceName)
    advisor = serviceAdvisor if serviceAdvisor is not None else self
    for component in slaveClientComponents:
      componentName = component["StackServiceComponents"]["component_name"]
      # Filter both the hosts and the free hosts so that only hosts meeting the dependencies are included (if possible).
      filteredHosts = self.getFilteredHostsBasedOnDependencies(services, component, hostsList, hostsComponentsMap)
      filteredFreeHosts = self.filterList(freeHosts, filteredHosts)
      hostsForComponent = advisor.getHostsForSlaveComponent(services, hosts, component, filteredHosts, filteredFreeHosts)
      for hostName in hostsForComponent:
        if hostName in hostsSet:
          hostsComponentsMap.setdefault(hostName, []).append( { "name": componentName } )

  # Pass 3: let Service Advisors colocate their (custom) services.
  for service in sortedServices:
    serviceName = service["StackServices"]["service_name"]
    serviceAdvisor = self.getServiceAdvisor(serviceName)
    if serviceAdvisor is not None:
      serviceComponents = list(service["components"])
      serviceAdvisor.colocateService(hostsComponentsMap, serviceComponents)
      serviceAdvisor.colocateServiceWithServicesInfo(hostsComponentsMap, serviceComponents, services)

  # Prepare one 'host-group' per host from hostsComponentsMap.
  host_groups = recommendations["blueprint"]["host_groups"]
  bindings = recommendations["blueprint_cluster_binding"]["host_groups"]
  for index, hostName in enumerate(hostsComponentsMap.keys(), 1):
    host_group_name = "host-group-{0}".format(index)
    host_groups.append( { "name": host_group_name, "components": hostsComponentsMap[hostName] } )
    bindings.append( { "name": host_group_name, "hosts": [{ "fqdn": hostName }] } )

  return recommendations
def getHostsForMasterComponent(self, services, hosts, component, hostsList):
  """
  Choose the host(s) for a MASTER component from hostsList.

  - If isComponentHostsPopulated(component) is true, the component's existing hostnames are returned.
  - If there is more than one candidate host, the component supports multiple instances, and
    getMinComponentCount() asks for more than one, the first hosts (in hostsList order) that
    isHostSuitableForComponent() accepts are returned, up to that count. The result may be shorter.
  - Otherwise a one-element list holding getHostForComponent(component, hostsList) is returned.
  """
  if self.isComponentHostsPopulated(component):
    return component["StackServiceComponents"]["hostnames"]

  if len(hostsList) > 1 and self.isMasterComponentWithMultipleInstances(component):
    wanted = self.getMinComponentCount(component, hosts)
    if wanted > 1:
      chosen = []
      for candidate in hostsList:
        if len(chosen) >= wanted:
          break
        if self.isHostSuitableForComponent(candidate, component):
          chosen.append(candidate)
      return chosen

  return [self.getHostForComponent(component, hostsList)]
def getHostsForSlaveComponent(self, services, hosts, component, hostsList, freeHosts):
  """
  Choose the hosts for a SLAVE or CLIENT component.

  - Cardinality "ALL": every host in hostsList.
  - Hostnames already populated: those hostnames.
  - SLAVE: the free hosts (only the first `min` of them when the component uses cardinality for its
    layout). Falls back to the last host of hostsList, is de-duplicated, and is replaced by an
    arbitrary slice of the de-duplicated hostsList when the count falls outside [min, max].
  - CLIENT: the first free host, else the last host of hostsList.
  - Anything else: an empty list.
  """
  info = component["StackServiceComponents"]
  if info["cardinality"] == "ALL":
    return hostsList
  if self.isComponentHostsPopulated(component):
    return info["hostnames"]

  componentName = info["component_name"]

  if self.isSlaveComponent(component):
    cardinality = str(info["cardinality"])
    lower, upper = self.parseCardinality(cardinality, len(hostsList))
    if lower is None:
      lower = 0
    if upper is None:
      upper = len(hostsList)

    if self.isComponentUsingCardinalityForLayout(componentName) and cardinality:
      selected = list(freeHosts[0:lower]) if lower > 0 else []
    else:
      selected = list(freeHosts)

    # Fall back to the last known host, then drop duplicates.
    selected = list(set(selected or hostsList[-1:]))
    if len(selected) < lower:
      return list(set(hostsList))[0:lower]
    if len(selected) > upper:
      return list(set(hostsList))[0:upper]
    return selected

  if self.isClientComponent(component):
    return freeHosts[0:1] or hostsList[-1:]

  return []
def getServicesSortedByDependencies(self, services):
  """
  Return services["services"] ordered so that each service comes after the services it depends on.

  Only non-conditional, host-scope dependencies are taken into account (see
  sortServicesByDependencies). Services without such dependencies keep their relative order.
  Services in a dependency cycle appear in the order in which they were visited.
  """
  visited = []
  ordered = []
  for candidate in services["services"]:
    self.sortServicesByDependencies(services, candidate, visited, ordered)
  return ordered
def sortServicesByDependencies(self, services, service, processedServices, sortedServices):
  """
  Depth-first helper for getServicesSortedByDependencies.

  Appends `service` to sortedServices after recursively appending every service it depends on through a
  non-conditional, host-scope component dependency. Services with no such dependencies go first.
  Circular dependencies terminate because a service is marked as processed before recursing; the
  services in a cycle end up in the order in which they were processed.

  :param services: Request dictionary, used to resolve the service owning a required component.
  :param service: Service to place; None (unresolvable dependency) or an already processed service is ignored.
  :param processedServices: List of services visited so far (mutated).
  :param sortedServices: Output list in dependency order (mutated).
  """
  if service is None or service in processedServices:
    return
  processedServices.append(service)

  for component in service.get("components", []):
    for dependency in component.get("dependencies", []):
      details = dependency["Dependencies"]
      # Only non-conditional dependencies with "host" scope matter; scope defaults to "cluster".
      if details.get("conditions") or details.get("scope", "cluster") != "host":
        continue
      requiredService = self.getServiceForComponentName(services, details["component_name"])
      self.sortServicesByDependencies(services, requiredService, processedServices, sortedServices)

  sortedServices.append(service)
def getFilteredHostsBasedOnDependencies(self, services, component, hostsList, hostsComponentsMap):
  """
  Keep only the hosts that already carry every component this component requires
  through an unconditional dependency with "host" scope.

  Dependencies on CLIENT components, or on components getRequiredComponent cannot
  resolve, are ignored. Only hosts that appear in ``hostsComponentsMap`` can be
  excluded; hosts absent from the map are kept. The order of ``hostsList`` is preserved.

  The result may be empty. No fallback to the full host list is applied here, so a
  caller that wants a fully filled layout must handle that case itself.

  :param services: advisor "services" structure
  :param component: stack component dict being placed
  :param hostsList: candidate host names
  :param hostsComponentsMap: host name -> list of component dicts already assigned to
    that host (matched on their "name" key)
  :return: list of host names taken from hostsList
  """
  excludedHosts = set()
  for dependency in component.get("dependencies", []):
    dependencyInfo = dependency["Dependencies"]
    # accounts only for dependencies that are not conditional
    if dependencyInfo.get("conditions"):
      continue
    requiredComponentName = dependencyInfo["component_name"]
    requiredComponent = self.getRequiredComponent(services, requiredComponentName)
    # We only deal with non-client components and with "host" scope.
    if requiredComponent is None or requiredComponent["component_category"] == "CLIENT":
      continue
    if dependencyInfo.get("scope", "cluster") != "host":
      continue
    for host, hostComponents in hostsComponentsMap.items():
      hasRequired = any(hostComponent.get("name") == requiredComponentName
                        for hostComponent in hostComponents)
      if not hasRequired:
        excludedHosts.add(host)
  return [host for host in hostsList if host not in excludedHosts]
def filterList(self, list, filter):
  """
  Return the items of ``list`` that also occur in ``filter``.

  This is an order-preserving intersection, not a union: the order (and any
  duplicates) of ``list`` are kept. The parameter names shadow builtins; they are
  kept unchanged for callers that pass them by keyword.
  """
  return [item for item in list if item in filter]
def getServiceForComponentName(self, services, componentName):
  """
  Find the service that declares a component named ``componentName``.

  :type services dict
  :type componentName str
  :return: the first matching service dict in services["services"] order, or None
  """
  owners = (svc
            for svc in services["services"]
            for comp in svc["components"]
            if self.getComponentName(comp) == componentName)
  return next(owners, None)
def isComponentUsingCardinalityForLayout(self, componentName):
  """
  Whether ``componentName``'s layout should follow its cardinality.

  This base implementation always returns False; presumably stack-specific
  advisors override it.
  """
  usesCardinality = False
  return usesCardinality
def createValidationResponse(self, services, validationItems):
  """
  Wrap validation items in the validation response envelope.

  :param services: advisor "services" structure; only services["Versions"] is read
  :param validationItems: list of validation item dicts, stored as-is
  :return: {"Versions": {"stack_name", "stack_version"}, "items": validationItems}
  """
  versions = services["Versions"]
  return {
    "Versions": {"stack_name": versions["stack_name"], "stack_version": versions["stack_version"]},
    "items": validationItems
  }
def validateComponentLayout(self, services, hosts):
  """
  Validate the placement of components on hosts.

  :return: the response envelope built by createValidationResponse around the
    items from getComponentLayoutValidations
  """
  return self.createValidationResponse(services, self.getComponentLayoutValidations(services, hosts))
def validateConfigurations(self, services, hosts):
  """
  Validate the configuration values carried in ``services``.

  ``services`` is stored on ``self.services`` before the validation items are
  collected.

  :return: the response envelope built by createValidationResponse
  """
  self.services = services
  collected = self.getConfigurationsValidationItems(services, hosts)
  response = self.createValidationResponse(services, collected)
  return response
def getComponentLayoutValidations(self, services, hosts):
  """
  Collect component-layout validation items.

  Stores ``services`` on ``self.services``. When ``services`` is None, returns an
  empty list. Otherwise it returns the dependency checks from
  validateRequiredComponentsPresent, followed by each service advisor's layout
  validations in service order.
  """
  self.services = services
  if services is None:
    return []
  result = list(self.validateRequiredComponentsPresent(services))
  for service in services["services"]:
    advisor = self.getServiceAdvisor(service["StackServices"]["service_name"])
    if advisor is None:
      continue
    result.extend(advisor.getServiceComponentLayoutValidations(services, hosts))
  return result
def validateRequiredComponentsPresent(self, services):
  """
  Build validation items from the component dependencies declared in each service's
  metainfo.xml.

  Only MASTER/SLAVE components (per isMasterComponent/isSlaveComponent) with at least
  one assigned host are checked, and only their unconditional dependencies.
  Dependencies on CLIENT components are skipped, because dependent clients are
  auto-deployed in both UI and blueprint deployments.

  - "host" scope: an ERROR lists the component's hosts that lack the required component.
  - "cluster" scope (the default when no scope is given): an ERROR is raised when the
    required component has no hosts at all.

  A component without a "dependencies" key is treated as having none.

  :type services dict
  :rtype list
  """
  items = []
  for service in services["services"]:
    for component in service["components"]:
      # Client components are not validated for the dependencies
      if not (self.isSlaveComponent(component) or self.isMasterComponent(component)):
        continue
      componentInfo = component["StackServiceComponents"]
      if not componentInfo["hostnames"]:
        continue
      for dependency in component.get('dependencies', []):
        dependencyInfo = dependency["Dependencies"]
        # account for only dependencies that are not conditional
        if dependencyInfo.get("conditions"):
          continue
        requiredComponentName = dependencyInfo["component_name"]
        requiredComponent = self.getRequiredComponent(services, requiredComponentName)
        # Client dependencies are auto-deployed, so they are not validated
        if requiredComponent is not None and requiredComponent["component_category"] == "CLIENT":
          continue
        componentDisplayName = componentInfo["display_name"]
        if requiredComponent is not None:
          requiredComponentDisplayName = requiredComponent["display_name"]
          requiredComponentHosts = requiredComponent["hostnames"]
        else:
          requiredComponentDisplayName = requiredComponentName
          requiredComponentHosts = []
        scope = dependencyInfo.get("scope", "cluster")
        if scope == "host":
          requiredComponentHostsAbsent = [host for host in componentInfo["hostnames"]
                                          if host not in requiredComponentHosts]
          if requiredComponentHostsAbsent:
            message = "{0} requires {1} to be co-hosted on following host(s): {2}.".format(componentDisplayName,
              requiredComponentDisplayName, ', '.join(requiredComponentHostsAbsent))
            items.append({"type": 'host-component', "level": 'ERROR', "message": message,
                          "component-name": componentInfo["component_name"]})
        elif scope == "cluster" and not requiredComponentHosts:
          message = "{0} requires {1} to be present in the cluster.".format(componentDisplayName, requiredComponentDisplayName)
          items.append({"type": 'host-component', "level": 'ERROR', "message": message,
                        "component-name": componentInfo["component_name"]})
  return items
def calculateYarnAllocationSizes(self, configurations, services, hosts):
  """
  Recommend YARN NodeManager memory and scheduler allocation sizes in yarn-site.

  Properties written through putProperty (which may keep user-supplied values):
    - yarn.nodemanager.resource.memory-mb: for a 'recommendConfigurations' call, or when
      services has no value, min(containers * ramPerContainer, memory of the reference
      NodeManager host, 1048576); otherwise the value already in services' yarn-site.
    - yarn.scheduler.minimum-allocation-mb: clusterSummary['yarnMinContainerSize'].
    - yarn.scheduler.maximum-allocation-mb: the NodeManager memory written above.
  When a NODEMANAGER host exists and services' yarn-site contains
  yarn.nodemanager.resource.percentage-physical-cpu-limit, the "maximum" attribute of
  both scheduler allocation properties is also set to the NodeManager memory.

  :param configurations: recommendation output dict; updated in place
  :param services: advisor "services" structure
  :param hosts: advisor "hosts" structure
  """
  # initialize data
  servicesList, componentsList = self.get_service_and_component_lists(services["services"])
  putYarnProperty = self.putProperty(configurations, "yarn-site", services)
  putYarnPropertyAttribute = self.putPropertyAttribute(configurations, "yarn-site")
  # calculate memory properties and get cluster data dictionary with whole information
  clusterSummary = self.getConfigurationClusterSummary(servicesList, hosts, componentsList, services)

  # executing code from stack advisor HDP 206
  nodemanagerMinRam = 1048576  # 1TB in mb
  if "referenceNodeManagerHost" in clusterSummary:
    # total_mem presumably in KB; /1024 gives MB
    nodemanagerMinRam = min(clusterSummary["referenceNodeManagerHost"]["total_mem"]/1024, nodemanagerMinRam)
  recommendedNodeManagerMemory = int(round(min(clusterSummary['containers'] * clusterSummary['ramPerContainer'], nodemanagerMinRam)))

  callContext = self.getCallContext(services)
  if 'recommendConfigurations' == callContext:
    putYarnProperty('yarn.nodemanager.resource.memory-mb', recommendedNodeManagerMemory)
  elif "yarn-site" in services["configurations"] and \
      "yarn.nodemanager.resource.memory-mb" in services["configurations"]["yarn-site"]["properties"]:
    # read from the supplied config
    putYarnProperty('yarn.nodemanager.resource.memory-mb',
                    int(services["configurations"]["yarn-site"]["properties"]["yarn.nodemanager.resource.memory-mb"]))
  else:
    putYarnProperty('yarn.nodemanager.resource.memory-mb', recommendedNodeManagerMemory)

  putYarnProperty('yarn.scheduler.minimum-allocation-mb', int(clusterSummary['yarnMinContainerSize']))
  putYarnProperty('yarn.scheduler.maximum-allocation-mb', int(configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.memory-mb"]))

  # executing code from stack advisor HDP 22
  nodeManagerHost = self.getHostWithComponent("YARN", "NODEMANAGER", services, hosts)
  if nodeManagerHost is not None:
    # NOTE(review): the memory "maximum" attributes are gated on the CPU-limit property,
    # which looks like a leftover from the HDP 2.2 code this was copied from; kept as-is.
    if "yarn-site" in services["configurations"] and "yarn.nodemanager.resource.percentage-physical-cpu-limit" in services["configurations"]["yarn-site"]["properties"]:
      nodeManagerMemory = configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.memory-mb"]
      putYarnPropertyAttribute('yarn.scheduler.minimum-allocation-mb', 'maximum', nodeManagerMemory)
      putYarnPropertyAttribute('yarn.scheduler.maximum-allocation-mb', 'maximum', nodeManagerMemory)
def getConfigurationClusterSummary(self, servicesList, hosts, components, services):
  """
  Copied from HDP 2.0.6 so that it could be used by Service Advisors.

  Summarize the hardware of one reference host and derive YARN container sizing from it.

  The reference host is the NODEMANAGER host with the smallest "total_mem" when
  getHostsWithComponent finds any (also stored as "referenceNodeManagerHost").
  Otherwise it is the first entry of hosts["items"]. With no hosts at all,
  cpu/disk/ram stay 0 and no reference host is recorded.

  Keys set on the returned dict:
    - cpu, disk, ram: ram is total_mem / 1024^2, presumably GB with total_mem in KB
    - hBaseInstalled, components
    - referenceHost: only when hosts exist
    - reservedRam, hbaseRam, minContainerSize
    - totalAvailableRam: at least 512
    - containers, ramPerContainer, yarnMinContainerSize
    - mapMemory, reduceMemory, amMemory

  A user-supplied yarn-site/yarn.scheduler.minimum-allocation-mb (digits only, capped
  at totalAvailableRam) replaces the computed minimum container size when any of these
  holds:
    - an old value exists for it,
    - the call type is not 'recommendConfigurations',
    - the operation is RECOMMEND_ATTRIBUTE_OPERATION,
    - the operation is ADD_SERVICE_OPERATION and YARN is not being added.

  :param servicesList: service names in the cluster (only checked for 'HBASE')
  :param hosts: advisor "hosts" structure
  :param components: stored unchanged under "components"
  :param services: advisor "services" structure (some unit tests pass None)
  :return: Dictionary of memory and CPU attributes in the cluster
  """
  hBaseInstalled = False
  if 'HBASE' in servicesList:
    hBaseInstalled = True
  cluster = {
    "cpu": 0,
    "disk": 0,
    "ram": 0,
    "hBaseInstalled": hBaseInstalled,
    "components": components
  }
  if len(hosts["items"]) > 0:
    nodeManagerHosts = self.getHostsWithComponent("YARN", "NODEMANAGER", services, hosts)
    # NodeManager host with least memory is generally used in calculations as it will work in larger hosts.
    if nodeManagerHosts is not None and len(nodeManagerHosts) > 0:
      nodeManagerHost = nodeManagerHosts[0];
      for nmHost in nodeManagerHosts:
        if nmHost["Hosts"]["total_mem"] < nodeManagerHost["Hosts"]["total_mem"]:
          nodeManagerHost = nmHost
      host = nodeManagerHost["Hosts"]
      cluster["referenceNodeManagerHost"] = host
    else:
      # No NodeManager host found: fall back to the first host of the request.
      host = hosts["items"][0]["Hosts"]
    cluster["referenceHost"] = host
    cluster["cpu"] = host["cpu_count"]
    cluster["disk"] = len(host["disk_info"])
    # presumably KB -> GB; the RAM buckets below use this same unit
    cluster["ram"] = int(host["total_mem"] / (1024 * 1024))
  # RAM (same unit as cluster["ram"]) held back per bucket: "os" always,
  # "hbase" in addition when HBase is installed.
  ramRecommendations = [
    {"os":1, "hbase":1},
    {"os":2, "hbase":1},
    {"os":2, "hbase":2},
    {"os":4, "hbase":4},
    {"os":6, "hbase":8},
    {"os":8, "hbase":8},
    {"os":8, "hbase":8},
    {"os":12, "hbase":16},
    {"os":24, "hbase":24},
    {"os":32, "hbase":32},
    {"os":64, "hbase":32}
  ]
  # Boolean-keyed lookup: the ranges are disjoint and exhaustive, so exactly one key
  # is True; since True == 1, "[1]" yields the bucket index matching cluster["ram"].
  index = {
    cluster["ram"] <= 4: 0,
    4 < cluster["ram"] <= 8: 1,
    8 < cluster["ram"] <= 16: 2,
    16 < cluster["ram"] <= 24: 3,
    24 < cluster["ram"] <= 48: 4,
    48 < cluster["ram"] <= 64: 5,
    64 < cluster["ram"] <= 72: 6,
    72 < cluster["ram"] <= 96: 7,
    96 < cluster["ram"] <= 128: 8,
    128 < cluster["ram"] <= 256: 9,
    256 < cluster["ram"]: 10
  }[1]
  cluster["reservedRam"] = ramRecommendations[index]["os"]
  cluster["hbaseRam"] = ramRecommendations[index]["hbase"]
  # Default minimum container size per RAM range (same boolean-keyed lookup trick).
  cluster["minContainerSize"] = {
    cluster["ram"] <= 3: 128,
    3 < cluster["ram"] <= 4: 256,
    4 < cluster["ram"] <= 8: 512,
    8 < cluster["ram"] <= 24: 1024,
    24 < cluster["ram"]: 2048
  }[1]
  totalAvailableRam = cluster["ram"] - cluster["reservedRam"]
  if cluster["hBaseInstalled"]:
    totalAvailableRam -= cluster["hbaseRam"]
  # "* 1024" converts to the container-size unit (presumably MB); never below 512.
  cluster["totalAvailableRam"] = max(512, totalAvailableRam * 1024)
  self.logger.info("Memory for YARN apps - cluster[totalAvailableRam]: " + str(cluster["totalAvailableRam"]))
  suggestedMinContainerRam = 1024 # new smaller value for YARN min container
  callContext = self.getCallContext(services)
  operation = self.getUserOperationContext(services, DefaultStackAdvisor.OPERATION)
  adding_yarn = self.isServiceBeingAdded(services, 'YARN')
  if operation:
    self.logger.info("user operation context : " + str(operation))
  if services: # its never None but some unit tests pass it as None
    # If min container value is changed (user is changing it)
    # if its a validation call - just use what ever value is set
    # If its a recommend attribute call (when UI lands on a page)
    # If add service but YARN is not being added
    if self.getOldValue(services, "yarn-site", "yarn.scheduler.minimum-allocation-mb") or \
        'recommendConfigurations' != callContext or \
        operation == DefaultStackAdvisor.RECOMMEND_ATTRIBUTE_OPERATION or \
        (operation == DefaultStackAdvisor.ADD_SERVICE_OPERATION and not adding_yarn):
      self.logger.info("Full context: callContext = " + str(callContext) +
                       " and operation = " + str(operation) + " and adding YARN = " + str(adding_yarn) +
                       " and old value exists = " +
                       str(self.getOldValue(services, "yarn-site", "yarn.scheduler.minimum-allocation-mb")))
      '''yarn.scheduler.minimum-allocation-mb has changed - then pick this value up'''
      # Only a purely numeric (isdigit) user value is honoured.
      if "yarn-site" in services["configurations"] and \
          "yarn.scheduler.minimum-allocation-mb" in services["configurations"]["yarn-site"]["properties"] and \
          str(services["configurations"]["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]).isdigit():
        self.logger.info("Using user provided yarn.scheduler.minimum-allocation-mb = " +
                         str(services["configurations"]["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]))
        cluster["yarnMinContainerSize"] = int(services["configurations"]["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"])
        self.logger.info("Minimum ram per container due to user input - cluster[yarnMinContainerSize]: " + str(cluster["yarnMinContainerSize"]))
        # Cap the user value at the RAM available for containers.
        if cluster["yarnMinContainerSize"] > cluster["totalAvailableRam"]:
          cluster["yarnMinContainerSize"] = cluster["totalAvailableRam"]
          self.logger.info("Minimum ram per container after checking against limit - cluster[yarnMinContainerSize]: " + str(cluster["yarnMinContainerSize"]))
          pass
        cluster["minContainerSize"] = cluster["yarnMinContainerSize"] # set to what user has suggested as YARN min container size
        suggestedMinContainerRam = cluster["yarnMinContainerSize"]
        pass
      pass
    pass
  '''containers = max(3, min (2*cores,min (1.8*DISKS,(Total available RAM) / MIN_CONTAINER_SIZE))))'''
  cluster["containers"] = int(round(max(3,
                                        min(2 * cluster["cpu"],
                                            min(ceil(1.8 * cluster["disk"]),
                                                cluster["totalAvailableRam"] / cluster["minContainerSize"])))))
  self.logger.info("Containers per node - cluster[containers]: " + str(cluster["containers"]))
  # The floor of 3 containers above can overshoot the available RAM; shrink it in that case.
  if cluster["containers"] * cluster["minContainerSize"] > cluster["totalAvailableRam"]:
    # NOTE(review): with int operands, "/" floors under Python 2, so ceil() has no effect
    # and the result fits within totalAvailableRam. Under Python 3, true division would
    # make ceil() round up instead -- confirm before porting.
    cluster["containers"] = int(ceil(cluster["totalAvailableRam"] / cluster["minContainerSize"]))
    self.logger.info("Modified number of containers based on provided value for yarn.scheduler.minimum-allocation-mb")
    pass
  cluster["ramPerContainer"] = int(abs(cluster["totalAvailableRam"] / cluster["containers"]))
  cluster["yarnMinContainerSize"] = min(suggestedMinContainerRam, cluster["ramPerContainer"])
  self.logger.info("Ram per containers before normalization - cluster[ramPerContainer]: " + str(cluster["ramPerContainer"]))
  '''If greater than cluster["yarnMinContainerSize"], value will be in multiples of cluster["yarnMinContainerSize"]'''
  if cluster["ramPerContainer"] > cluster["yarnMinContainerSize"]:
    cluster["ramPerContainer"] = int(cluster["ramPerContainer"] / cluster["yarnMinContainerSize"]) * cluster["yarnMinContainerSize"]
  # map, reduce and AM memory all equal the (normalized) per-container RAM.
  cluster["mapMemory"] = int(cluster["ramPerContainer"])
  cluster["reduceMemory"] = cluster["ramPerContainer"]
  cluster["amMemory"] = max(cluster["mapMemory"], cluster["reduceMemory"])
  self.logger.info("Min container size - cluster[yarnMinContainerSize]: " + str(cluster["yarnMinContainerSize"]))
  self.logger.info("Available memory for map - cluster[mapMemory]: " + str(cluster["mapMemory"]))
  self.logger.info("Available memory for reduce - cluster[reduceMemory]: " + str(cluster["reduceMemory"]))
  self.logger.info("Available memory for am - cluster[amMemory]: " + str(cluster["amMemory"]))
  return cluster
def getCallContext(self, services):
  """
  Return services[ADVISOR_CONTEXT][CALL_TYPE], or "" when ``services`` is falsy
  or carries no advisor context. The advisor context is logged when present.
  """
  if not services or DefaultStackAdvisor.ADVISOR_CONTEXT not in services:
    return ""
  advisorContext = services[DefaultStackAdvisor.ADVISOR_CONTEXT]
  self.logger.info("call type context : " + str(advisorContext))
  return advisorContext[DefaultStackAdvisor.CALL_TYPE]
def isServiceBeingAdded(self, services, serviceName):
  """
  Tell whether ``serviceName`` is part of an in-progress Add Service operation.

  Returns True only when services["user-context"][OPERATION] equals
  ADD_SERVICE_OPERATION and the string at user-context[OPERATION_DETAILS] contains
  ``serviceName``. The class constants are used for both the membership check and
  the read, instead of mixing them with hard-coded literals.

  NOTE(review): this is a plain substring match on the operation details, so a name
  that is a substring of another service's name also matches.

  :param services: advisor "services" structure (may be None/empty)
  :param serviceName: service name to look for, e.g. 'YARN'
  :rtype bool
  """
  if not services or 'user-context' not in services:
    return False
  userContext = services["user-context"]
  if DefaultStackAdvisor.OPERATION not in userContext or \
      userContext[DefaultStackAdvisor.OPERATION] != DefaultStackAdvisor.ADD_SERVICE_OPERATION:
    return False
  if DefaultStackAdvisor.OPERATION_DETAILS not in userContext:
    return False
  return serviceName in userContext[DefaultStackAdvisor.OPERATION_DETAILS]
def getUserOperationContext(self, services, contextName):
  """
  Look up ``contextName`` in services["user-context"].

  :return: the stored value, or None when ``services`` is falsy, has no
    "user-context", or the user context lacks ``contextName``
  """
  if not services or 'user-context' not in services:
    return None
  userContext = services["user-context"]
  return userContext[contextName] if contextName in userContext else None
def get_system_min_uid(self, login_defs='/etc/login.defs'):
  """
  Read the minimum UID for regular users (the UID_MIN key) from login.defs.

  Each line is stripped of its '#' comment and split into key and value. Only a key
  exactly equal to UID_MIN is used, so SYS_UID_MIN (which merely contains the text)
  is never picked up. The first matching line wins.

  :param login_defs: path of the login.defs file; defaults to /etc/login.defs
  :return: the UID_MIN value as a string, or '1000' when the file is missing, the
    key is absent, or the value is not an integer
  """
  uid_min_tag = 'UID_MIN'
  comment_tag = '#'
  uid_default = '1000'
  uid_min = uid_default
  if os.path.exists(login_defs):
    with open(login_defs, 'r') as f:
      for line in f:
        content = line.split(comment_tag, 1)[0].strip()
        parts = content.split(None, 1)
        if parts and parts[0] == uid_min_tag:
          uid_min = parts[1].strip() if len(parts) > 1 else ''
          break
  # check result for value
  try:
    int(uid_min)
  except ValueError:
    return uid_default
  return uid_min
def validateClusterConfigurations(self, configurations, services, hosts):
  """
  Cluster-wide configuration validation hook.

  The base implementation performs no checks. It returns whatever
  toConfigurationValidationProblems yields for an empty problem list with an empty
  config type.
  """
  return self.toConfigurationValidationProblems([], "")
def toConfigurationValidationProblems(self, validationProblems, siteName):
  """
  Turn per-property validation results into configuration problem records.

  Entries whose "item" is missing or None are dropped. Each remaining entry becomes
  {"type": 'configuration', "level", "message", "config-type", "config-name"}, where
  level/message come from the item.

  :param validationProblems: list of {"config-name": ..., "item": {...} or None}
  :param siteName: config type recorded as "config-type"
  :return: list of configuration problem dicts
  """
  problems = []
  for entry in validationProblems:
    item = entry.get("item", None)
    if item is None:
      continue
    problems.append({
      "type": 'configuration',
      "level": item["level"],
      "message": item["message"],
      "config-type": siteName,
      "config-name": entry["config-name"]
    })
  return problems
def validateServiceConfigurations(self, serviceName):
  """
  Return the validators registered for ``serviceName`` (a mapping of config type to
  validation method), or None when there are none.
  """
  validatorsByService = self.getServiceConfigurationValidators()
  return validatorsByService.get(serviceName, None)
def getServiceConfigurationValidators(self):
  """
  Map of service name -> {config type: validation method}.

  Empty in this base implementation; a new dict is returned on every call.
  """
  return dict()
def validateMinMax(self, items, recommendedDefaults, configurations):
  """
  Append WARN problems for configuration values outside the recommended
  "minimum"/"maximum" property attributes.

  A bound is checked only when all of the following hold:
    - the config type has "property_attributes" in ``recommendedDefaults``,
    - the property appears both in the user configuration and in the recommended
      "properties",
    - the corresponding attribute exists.
  Values or bounds that cannot be parsed as numbers are skipped rather than aborting
  the whole validation. Float bounds are compared as floats instead of being
  truncated by int().

  :param items: list of validation problems; extended in place
  :param recommendedDefaults: config type -> {"properties", "property_attributes"}
  :param configurations: user configurations, config type -> {"properties"}
  """
  # required for casting to the proper numeric type before comparison;
  # returns None for values that are not numbers
  def convertToNumber(number):
    if isinstance(number, float):
      return number
    try:
      return int(number)
    except (TypeError, ValueError):
      pass
    try:
      return float(number)
    except (TypeError, ValueError):
      return None

  for configName in configurations:
    validationItems = []
    recommended = recommendedDefaults.get(configName)
    if recommended is not None and "property_attributes" in recommended:
      userProperties = configurations[configName].get("properties", {})
      recommendedProperties = recommended.get("properties", {})
      for propertyName, attributes in recommended["property_attributes"].items():
        if propertyName not in userProperties or propertyName not in recommendedProperties:
          continue
        userValue = convertToNumber(userProperties[propertyName])
        if userValue is None:
          continue
        if "maximum" in attributes:
          maxValue = convertToNumber(attributes["maximum"])
          if maxValue is not None and userValue > maxValue:
            validationItems.append({"config-name": propertyName, "item": self.getWarnItem("Value is greater than the recommended maximum of {0} ".format(maxValue))})
        if "minimum" in attributes:
          minValue = convertToNumber(attributes["minimum"])
          if minValue is not None and userValue < minValue:
            validationItems.append({"config-name": propertyName, "item": self.getWarnItem("Value is less than the recommended minimum of {0} ".format(minValue))})
    items.extend(self.toConfigurationValidationProblems(validationItems, configName))
def getConfigurationsValidationItems(self, services, hosts):
  """
  Validate the configuration values provided in ``services``.

  Recommendations are computed first and used as the reference. The result then
  accumulates, in order:
    1. each service's validation items,
    2. the cluster-wide items,
    3. min/max attribute checks, which validateMinMax appends in place.
  """
  recommendedDefaults = self.recommendConfigurations(services, hosts)["recommendations"]["blueprint"]["configurations"]
  configurations = services["configurations"]
  collected = []
  for svc in services["services"]:
    collected.extend(self.getConfigurationsValidationItemsForService(configurations, recommendedDefaults, svc, services, hosts))
  collected.extend(self.validateClusterConfigurations(configurations, services, hosts))
  self.validateMinMax(collected, recommendedDefaults, configurations)
  return collected
def validateListOfConfigUsingMethod(self, configurations, recommendedDefaults, services, hosts, validators):
  """
  Run a list of (config type, validation method) pairs, for use by Service Advisors.

  A validator is skipped when its config type has no recommendations or
  getSiteProperties returns None for it. Otherwise it is called as
  method(siteProperties, siteRecommendations, configurations, services, hosts).

  :param validators: List of tuples like [("hadoop-env", someFunctionPointer), ("hdfs-site", someFunctionPointer)]
  :return: List of validation errors, concatenated in validator order
  """
  results = []
  for configType, method in validators:
    if configType not in recommendedDefaults:
      continue
    siteProperties = self.getSiteProperties(configurations, configType)
    if siteProperties is None:
      continue
    siteRecommendations = recommendedDefaults[configType]["properties"]
    self.logger.info("SiteName: %s, method: %s" % (configType, method.__name__))
    self.logger.info("Site properties: %s" % str(siteProperties))
    self.logger.info("Recommendations: %s" % str(siteRecommendations))
    results.extend(method(siteProperties, siteRecommendations, configurations, services, hosts))
  return results
def validateConfigurationsForSite(self, configurations, recommendedDefaults, services, hosts, siteName, method):
  """
  Deprecated, please use validateListOfConfigUsingMethod

  Run one validation ``method`` against the ``siteName`` config type.

  :return: List of validation errors by calling the corresponding method, or [] when
    the site has no recommendations or getSiteProperties returns None for it.
  """
  if siteName not in recommendedDefaults:
    return []
  siteProperties = self.getSiteProperties(configurations, siteName)
  if siteProperties is None:
    return []
  siteRecommendations = recommendedDefaults[siteName]["properties"]
  self.logger.info("SiteName: %s, method: %s" % (siteName, method.__name__))
  self.logger.info("Recommendations: %s" % str(siteRecommendations))
  return method(siteProperties, siteRecommendations, configurations, services, hosts)
def getConfigurationsValidationItemsForService(self, configurations, recommendedDefaults, service, services, hosts):
  """
  Collect configuration validation items for a single service.

  Validators registered through validateServiceConfigurations run first, one per
  config type. The service advisor's items (if the service has an advisor) are
  appended after them.
  """
  serviceName = service["StackServices"]["service_name"]
  result = []
  siteValidators = self.validateServiceConfigurations(serviceName)
  if siteValidators is not None:
    for siteName, method in siteValidators.items():
      result.extend(self.validateConfigurationsForSite(configurations, recommendedDefaults, services, hosts, siteName, method))
  advisor = self.getServiceAdvisor(serviceName)
  if advisor is not None:
    result.extend(advisor.getServiceConfigurationsValidationItems(configurations, recommendedDefaults, services, hosts))
  return result
def recommendConfigGroupsConfigurations(self, recommendations, services, components, hosts,
                                        servicesList):
  """
  Compute configuration recommendations for each entry of services["config-groups"].

  For every config group:
    1. Layer the group's property overrides on a private deep copy of
       services["configurations"].
    2. Narrow the hosts to the group's hosts.
    3. Compute a cluster summary for those hosts.
    4. Run the service recommenders: registered recommenders first, then service
       advisors.
  One entry per group is appended to recommendations["recommendations"]["config-groups"].
  Recommended values for config types the group itself overrides go under
  "configurations"; all others go under "dependent_configurations".

  Configurations are deep-copied per group. Without that, the shallow services.copy()
  would let one group's overrides mutate the caller's ``services`` and leak into the
  groups processed after it.

  :param recommendations: response dict; its ["recommendations"]["config-groups"] is replaced
  :param services: advisor "services" structure (not modified)
  :param components: passed through to getConfigurationClusterSummary
  :param hosts: advisor "hosts" structure
  :param servicesList: passed through to getConfigurationClusterSummary
  """
  import copy

  recommendations["recommendations"]["config-groups"] = []
  for configGroup in services["config-groups"]:
    # Override configuration with the config group values, on a private copy
    cgServices = services.copy()
    cgServices["configurations"] = copy.deepcopy(services["configurations"])
    for configName, groupConfig in configGroup["configurations"].items():
      if configName in cgServices["configurations"]:
        cgServices["configurations"][configName]["properties"].update(groupConfig['properties'])
      else:
        cgServices["configurations"][configName] = copy.deepcopy(groupConfig)

    # Override hosts with the config group hosts
    cgHosts = {"items": [host for host in hosts["items"] if
                         host["Hosts"]["host_name"] in configGroup["hosts"]]}

    # Override clusterSummary
    cgClusterSummary = self.getConfigurationClusterSummary(servicesList,
                                                           cgHosts,
                                                           components,
                                                           cgServices)

    configurations = {}
    # there can be dependencies between service recommendations which require special ordering
    # for now, make sure custom services (that have service advisors) run after standard ones
    serviceAdvisors = []
    for service in services["services"]:
      serviceName = service["StackServices"]["service_name"]
      # "calculation" is a function pointer
      calculation = self.getServiceConfigurationRecommender(serviceName)
      if calculation is not None:
        calculation(configurations, cgClusterSummary, cgServices, cgHosts)
      else:
        serviceAdvisor = self.getServiceAdvisor(serviceName)
        if serviceAdvisor is not None:
          serviceAdvisors.append(serviceAdvisor)
    for serviceAdvisor in serviceAdvisors:
      serviceAdvisor.getServiceConfigurationRecommendations(configurations, cgClusterSummary, cgServices, cgHosts)

    cgRecommendation = {
      "configurations": {},
      "dependent_configurations": {},
      "hosts": configGroup["hosts"]
    }
    recommendations["recommendations"]["config-groups"].append(cgRecommendation)

    # Parse results: values for config types the group overrides are its own
    # recommendations, everything else is a dependent recommendation.
    for config, configElements in configurations.items():
      overriddenByGroup = config in configGroup["configurations"]
      cgRecommendation["configurations"][config] = {}
      cgRecommendation["dependent_configurations"][config] = {}
      # property + property_attributes
      for configElement, values in configElements.items():
        cgRecommendation["configurations"][config][configElement] = {}
        cgRecommendation["dependent_configurations"][config][configElement] = {}
        target = cgRecommendation["configurations"] if overriddenByGroup \
          else cgRecommendation["dependent_configurations"]
        target[config][configElement].update(values)
def recommendConfigurations(self, services, hosts):
  """
  Produce configuration recommendations for the cluster.

  Stores ``services`` on ``self.services``. The response carries the stack version,
  host names, service names and an empty blueprint skeleton. When ``services`` has
  "config-groups", the work is delegated to recommendConfigGroupsConfigurations.
  Otherwise the recommenders from getServiceConfigurationRecommender fill
  blueprint["configurations"] first. Services that only have a service advisor are
  deferred, so custom services run after the standard ones.

  :return: the recommendations response dict
  """
  self.services = services
  versions = services["Versions"]
  stackName, stackVersion = versions["stack_name"], versions["stack_version"]
  hostsList = [item["Hosts"]["host_name"] for item in hosts["items"]]
  servicesList, componentsList = self.get_service_and_component_lists(services["services"])
  clusterSummary = self.getConfigurationClusterSummary(servicesList, hosts, componentsList, services)

  blueprint = {"configurations": {}, "host_groups": []}
  recommendations = {
    "Versions": {"stack_name": stackName, "stack_version": stackVersion},
    "hosts": hostsList,
    "services": servicesList,
    "recommendations": {
      "blueprint": blueprint,
      "blueprint_cluster_binding": {"host_groups": []}
    }
  }

  if "config-groups" in services:
    # If recommendation for config groups
    self.recommendConfigGroupsConfigurations(recommendations, services, componentsList, hosts,
                                             servicesList)
    return recommendations

  configurations = blueprint["configurations"]
  deferredAdvisors = []
  for service in services["services"]:
    serviceName = service["StackServices"]["service_name"]
    recommender = self.getServiceConfigurationRecommender(serviceName)
    if recommender is None:
      advisor = self.getServiceAdvisor(serviceName)
      if advisor is not None:
        deferredAdvisors.append(advisor)
    else:
      recommender(configurations, clusterSummary, services, hosts)
  for advisor in deferredAdvisors:
    advisor.getServiceConfigurationRecommendations(configurations, clusterSummary, services, hosts)
  return recommendations
def recommendConfigurationsForSSO(self, services, hosts):
  """
  Produce SSO-related configuration recommendations.

  Same response layout as the general recommendation call. Without config groups,
  the recommenders from getServiceConfigurationRecommenderForSSODict() run first.
  Services that only have a service advisor are handled afterwards through
  getServiceConfigurationRecommendationsForSSO.

  NOTE(review): when ``services`` has "config-groups", this delegates to
  recommendConfigGroupsConfigurations, which runs the general (non-SSO)
  recommenders -- confirm that is intended.

  :return: the recommendations response dict
  """
  self.services = services
  stackName = services["Versions"]["stack_name"]
  stackVersion = services["Versions"]["stack_version"]
  hostsList = [h["Hosts"]["host_name"] for h in hosts["items"]]
  servicesList, componentsList = self.get_service_and_component_lists(services["services"])
  clusterSummary = self.getConfigurationClusterSummary(servicesList, hosts, componentsList, services)
  recommendations = {
    "Versions": {"stack_name": stackName, "stack_version": stackVersion},
    "hosts": hostsList,
    "services": servicesList,
    "recommendations": {
      "blueprint": {"configurations": {}, "host_groups": []},
      "blueprint_cluster_binding": {"host_groups": []}
    }
  }

  if "config-groups" in services:
    self.recommendConfigGroupsConfigurations(recommendations, services, componentsList, hosts,
                                             servicesList)
    return recommendations

  configurations = recommendations["recommendations"]["blueprint"]["configurations"]
  ssoRecommenders = self.getServiceConfigurationRecommenderForSSODict()
  # custom services (those with service advisors) run after the standard ones
  pendingAdvisors = []
  for service in services["services"]:
    serviceName = service["StackServices"]["service_name"]
    recommender = ssoRecommenders.get(serviceName, None)
    if recommender is not None:
      recommender(configurations, clusterSummary, services, hosts)
      continue
    advisor = self.getServiceAdvisor(serviceName)
    if advisor is not None:
      pendingAdvisors.append(advisor)
  for advisor in pendingAdvisors:
    advisor.getServiceConfigurationRecommendationsForSSO(configurations, clusterSummary, services, hosts)
  return recommendations
def recommendConfigurationsForKerberos(self, services, hosts):
  """
  Produce Kerberos-related configuration recommendations.

  Same response layout as the general recommendation call. Without config groups,
  the recommenders from getServiceConfigurationRecommenderForKerberosDict() run
  first. Services that only have a service advisor are handled afterwards through
  getServiceConfigurationRecommendationsForKerberos.

  NOTE(review): when ``services`` has "config-groups", this delegates to
  recommendConfigGroupsConfigurations, which runs the general (non-Kerberos)
  recommenders -- confirm that is intended.

  :return: the recommendations response dict
  """
  self.services = services
  versionInfo = services["Versions"]
  stackName = versionInfo["stack_name"]
  stackVersion = versionInfo["stack_version"]
  hostsList = [entry["Hosts"]["host_name"] for entry in hosts["items"]]
  servicesList, componentsList = self.get_service_and_component_lists(services["services"])
  clusterSummary = self.getConfigurationClusterSummary(servicesList, hosts, componentsList, services)

  blueprintConfigurations = {}
  recommendations = {
    "Versions": {
      "stack_name": stackName,
      "stack_version": stackVersion
    },
    "hosts": hostsList,
    "services": servicesList,
    "recommendations": {
      "blueprint": {
        "configurations": blueprintConfigurations,
        "host_groups": []
      },
      "blueprint_cluster_binding": {
        "host_groups": []
      }
    }
  }

  if "config-groups" not in services:
    kerberosRecommenders = self.getServiceConfigurationRecommenderForKerberosDict()
    # custom services (those with service advisors) run after the standard ones
    laterAdvisors = []
    for service in services["services"]:
      serviceName = service["StackServices"]["service_name"]
      recommender = kerberosRecommenders.get(serviceName, None)
      if recommender is None:
        advisor = self.getServiceAdvisor(serviceName)
        if advisor is not None:
          laterAdvisors.append(advisor)
      else:
        recommender(blueprintConfigurations, clusterSummary, services, hosts)
    for advisor in laterAdvisors:
      advisor.getServiceConfigurationRecommendationsForKerberos(blueprintConfigurations, clusterSummary, services, hosts)
  else:
    self.recommendConfigGroupsConfigurations(recommendations, services, componentsList, hosts,
                                             servicesList)
  return recommendations
def getServiceConfigurationRecommender(self, service):
  """
  Return the configuration recommender callable registered for the ``service``
  name, or None when there is none.
  """
  recommenders = self.getServiceConfigurationRecommenderDict()
  return recommenders.get(service)
def getServiceConfigurationRecommenderDict(self):
  """
  Map of service name -> configuration recommender callable, looked up by
  getServiceConfigurationRecommender. Empty in this base implementation; a new
  dict is returned on every call.
  """
  return dict()
def getServiceConfigurationRecommenderForSSODict(self):
  """
  Map of service name -> SSO configuration recommender callable, used by
  recommendConfigurationsForSSO. Empty in this base implementation; a new dict is
  returned on every call.
  """
  return dict()
def getServiceConfigurationRecommenderForKerberosDict(self):
  """
  Map of service name -> Kerberos configuration recommender callable, used by
  recommendConfigurationsForKerberos. Empty in this base implementation; a new dict
  is returned on every call.
  """
  return dict()
# Recommendation helper methods
def isComponentHostsPopulated(self, component):
  """
  True when the component's "hostnames" attribute (read via getComponentAttribute)
  is present and non-empty; False otherwise.
  """
  hostnames = self.getComponentAttribute(component, "hostnames")
  return hostnames is not None and len(hostnames) > 0
def checkSiteProperties(self, siteProperties, *propertyNames):
  """
  Check that every given property name is defined in the site properties.

  :param siteProperties: config properties dict (None is treated as "nothing defined")
  :param *propertyNames: property names to validate
  :returns: True if all properties are defined (vacuously True for no names),
    otherwise False; always False when siteProperties is None
  """
  if siteProperties is None:
    return False
  return all(name in siteProperties for name in propertyNames)
def get_ambari_configuration(self, services):
  """
  Wraps the request's services structure in an AmbariConfiguration, which can
  be queried for Ambari-level settings such as LDAP and SSO.
  :param services: the services structure containing the "ambari-server-configurations" block
  :return: a new AmbariConfiguration
  """
  ambari_configuration = AmbariConfiguration(services)
  return ambari_configuration
def is_secured_cluster(self, services):
  """
  Detects whether the cluster is secured.
  The cluster counts as secured when cluster-env/security_enabled is present
  and equals "true" (case-insensitive). A missing services structure or missing
  configuration sections are treated as "not secured" instead of raising.
  :type services dict
  :rtype bool
  """
  if not services:
    return False
  configurations = services.get("configurations") or {}
  cluster_env = configurations.get("cluster-env") or {}
  properties = cluster_env.get("properties") or {}
  security_enabled = properties.get("security_enabled")
  return security_enabled is not None and str(security_enabled).lower() == "true"
def getZKHostPortString(self, services, include_port=True):
  """
  Builds a comma-delimited list of the cluster's ZooKeeper server hosts.
  Example: zk.host1.org:2181,zk.host2.org:2181,zk.host3.org:2181
  :param services: the services structure of the request
  :param include_port: when True, each host gets a ":<port>" suffix taken from getZKPort
  :return: the joined string, or '' when ZOOKEEPER is not among the services
  """
  serviceNames = [entry["StackServices"]["service_name"] for entry in services["services"]]
  if "ZOOKEEPER" not in serviceNames:
    return ''
  zkHosts = self.getHostNamesWithComponent("ZOOKEEPER", "ZOOKEEPER_SERVER", services)
  if not include_port:
    return ",".join(zkHosts)
  zkPort = self.getZKPort(services)
  return ",".join([zkHost + ':' + zkPort for zkHost in zkHosts])
def getZKPort(self, services):
  """
  Returns zoo.cfg/clientPort from the request's configurations when it is set,
  otherwise the default '2181'.
  """
  configurations = services['configurations']
  if 'zoo.cfg' in configurations:
    zooProperties = configurations['zoo.cfg']['properties']
    if 'clientPort' in zooProperties:
      return zooProperties['clientPort']
  return '2181'
def isClientComponent(self, component):
  """Tells whether the component's category is 'CLIENT'."""
  category = self.getComponentAttribute(component, "component_category")
  return category == 'CLIENT'
def isSlaveComponent(self, component):
  """Tells whether the component's category is 'SLAVE'."""
  category = self.getComponentAttribute(component, "component_category")
  return category == 'SLAVE'
def isMasterComponent(self, component):
  """
  Returns the component's raw "is_master" attribute as reported by
  getComponentAttribute (None when it is absent).
  """
  is_master = self.getComponentAttribute(component, "is_master")
  return is_master
def getRequiredComponent(self, services, componentName):
  """
  Finds the stack component definition with the given name.
  When HDFS_CLIENT is requested but absent, falls back to the first component
  whose component_type is 'HCFS_CLIENT'.
  :type services dict
  :type componentName str
  :return: the component's "StackServiceComponents" dict, or None if not found
  :rtype dict
  """
  componentsList = [item["StackServiceComponents"]
                    for service in services["services"]
                    for item in service["components"]]
  component = next((candidate for candidate in componentsList
                    if candidate["component_name"] == componentName), None)
  if component is None and componentName == 'HDFS_CLIENT':
    # .get(): tolerate component entries that carry no component_type
    component = next((candidate for candidate in componentsList
                      if candidate.get("component_type") == 'HCFS_CLIENT'), None)
  return component
def getComponentAttribute(self, component, attribute):
  """
  Reads an attribute from a component's "StackServiceComponents" section.
  :return: the attribute value, or None when the section or the attribute is missing
  """
  serviceComponent = component.get("StackServiceComponents")
  return None if serviceComponent is None else serviceComponent.get(attribute)
def isLocalHost(self, hostName):
  """
  Tells whether hostName resolves to the same fully qualified name as this machine.
  """
  requestedFqdn = socket.getfqdn(hostName)
  return requestedFqdn == socket.getfqdn()
def isMasterComponentWithMultipleInstances(self, component):
  """Tells whether the component is listed in getMastersWithMultipleInstances()."""
  return self.getComponentName(component) in self.getMastersWithMultipleInstances()
def isComponentNotValuable(self, component):
  """Tells whether the component is listed in getNotValuableComponents()."""
  return self.getComponentName(component) in self.getNotValuableComponents()
def getMinComponentCount(self, component, hosts):
  """Returns the "min" entry of the component's cardinality."""
  cardinality = self.getComponentCardinality(self.getComponentName(component), hosts)
  return cardinality["min"]
# Helper dictionaries
def getComponentCardinality(self, componentName, hosts):
  """
  Returns the {"min": .., "max": ..} cardinality registered for a component.
  :param componentName: component name to look up
  :param hosts: hosts structure, forwarded to getCardinalitiesDict
  :return: the registered cardinality, or a fresh {"min": 1, "max": 1} when none is registered
  """
  # Named so as not to shadow the builtin ``dict``.
  cardinalities = self.getCardinalitiesDict(hosts)
  if componentName in cardinalities:
    return cardinalities[componentName]
  return {"min": 1, "max": 1}
def isServiceDeployed(self, services, serviceName):
  """Tells whether serviceName appears among the request's services."""
  return any(entry["StackServices"]["service_name"] == serviceName
             for entry in services["services"])
def getHostForComponent(self, component, hostsList):
  """
  Picks the host a component should be placed on.
  With more than one candidate host, the component's layout scheme (see
  getComponentLayoutSchemes) selects a starting index. Its integer keys are
  host-count thresholds, checked in ascending order; the first threshold
  greater than len(hostsList) wins, otherwise the 'else' index is used.
  Starting at that index, the first host accepted by isHostSuitableForComponent
  is returned. If no host is accepted, or there is a single host, hostsList[0]
  is returned.
  :param component: component entry wrapping "StackServiceComponents"
  :param hostsList: candidate host names
  :return: a host name, or None when hostsList is empty
  """
  import numbers
  if len(hostsList) == 0:
    return None
  componentName = self.getComponentName(component)
  if len(hostsList) != 1:
    scheme = self.getComponentLayoutSchemes().get(componentName, None)
    if scheme is not None:
      # Dict iteration order is arbitrary, so sort the thresholds explicitly;
      # otherwise a larger threshold could be matched before a smaller one.
      thresholds = sorted(key for key in scheme if isinstance(key, numbers.Integral))
      hostIndex = next((scheme[threshold] for threshold in thresholds
                        if len(hostsList) < threshold), scheme['else'])
    else:
      hostIndex = 0
    for host in hostsList[hostIndex:]:
      if self.isHostSuitableForComponent(host, component):
        return host
  return hostsList[0]
def getComponentName(self, component):
  """Returns the component's "component_name" attribute, or None if absent."""
  name = self.getComponentAttribute(component, "component_name")
  return name
def isHostSuitableForComponent(self, host, component):
  """
  A host is unsuitable only when the component is in
  getNotPreferableOnServerComponents() and the host is the local machine.
  """
  return (self.getComponentName(component) not in self.getNotPreferableOnServerComponents()
          or not self.isLocalHost(host))
def getMastersWithMultipleInstances(self):
  """Returns self.mastersWithMultipleInstances."""
  masters = self.mastersWithMultipleInstances
  return masters
def getNotValuableComponents(self):
  """Returns self.notValuableComponents."""
  components = self.notValuableComponents
  return components
def getNotPreferableOnServerComponents(self):
  """Returns self.notPreferableOnServerComponents."""
  components = self.notPreferableOnServerComponents
  return components
def getCardinalitiesDict(self, hosts):
  """
  Returns self.cardinalitiesDict; the hosts argument is not used by this implementation.
  """
  cardinalities = self.cardinalitiesDict
  return cardinalities
def getComponentLayoutSchemes(self):
  """
  Provides layout scheme dictionaries for components.
  Each scheme maps a host-count threshold (or 'else') to the index of the host
  on which the component should be placed.
  :return: self.componentLayoutSchemes
  """
  schemes = self.componentLayoutSchemes
  return schemes
def getWarnItem(self, message):
  """
  Builds a WARN-level validation item.
  :param message: human-readable description
  :return: {"level": "WARN", "message": message}
  """
  return dict(level="WARN", message=message)
def getErrorItem(self, message):
  """
  Builds an ERROR-level validation item.
  :param message: human-readable description
  :return: {"level": "ERROR", "message": message}
  """
  return dict(level="ERROR", message=message)
def getNotApplicableItem(self, message):
  """
  Builds a validation item for an error that cannot be ignored; the UI should
  not allow the user to proceed.
  :param message: error description
  :return: {"level": "NOT_APPLICABLE", "message": message}
  """
  return dict(level="NOT_APPLICABLE", message=message)
def getComponentHostNames(self, servicesDict, serviceName, componentName):
  """
  Returns the "hostnames" list of the named component of the named service.
  :return: the hostnames list, or None when the service or component is not found
  """
  matchingServices = (entry for entry in servicesDict["services"]
                      if entry["StackServices"]["service_name"] == serviceName)
  for service in matchingServices:
    for component in service['components']:
      serviceComponent = component["StackServiceComponents"]
      if serviceComponent["component_name"] == componentName:
        return serviceComponent["hostnames"]
  return None
def recommendConfigurationDependencies(self, services, hosts):
  """
  Recommends configurations, then keeps only the requested properties.
  The requested properties are stored on self.allRequestedProperties before
  recommending, because isPropertyRequested consults that attribute.
  """
  requested = self.getAllRequestedProperties(services)
  self.allRequestedProperties = requested
  recommendations = self.recommendConfigurations(services, hosts)
  return self.filterResult(recommendations, services)
def filterResult(self, result, services):
  """
  Returns recommendations only for changed and dependent properties.
  Prunes, in place, the blueprint configurations of result. When the request
  carries "config-groups", it also prunes each config group's "configurations"
  and "dependent_configurations".
  :return: the same result object
  """
  requested = self.getAllRequestedProperties(services)
  recommendations = result['recommendations']
  self.filterConfigs(recommendations['blueprint']['configurations'], requested)
  if "config-groups" in services:
    for group in recommendations["config-groups"]:
      for section in ("configurations", "dependent_configurations"):
        self.filterConfigs(group[section], requested)
  return result
def filterConfigs(self, configs, requestedProperties):
  """
  Prunes configs in place so that only requested properties remain.
  For each config type, a property value (under "properties") and a property
  attribute entry (under "property_attributes") are kept only when the property
  name is listed in requestedProperties for that type. Config types left with
  nothing are dropped.
  :param configs: {config type: {"properties": {...}, "property_attributes": {...}}}; modified in place
  :param requestedProperties: {config type: [property names]}
  """
  filteredConfigs = {}
  for configType, sections in configs.items():
    requestedNames = requestedProperties.get(configType)
    if not requestedNames:
      continue
    for section in ('properties', 'property_attributes'):
      if section not in sections:
        continue
      for name in sections[section]:
        if name in requestedNames:
          filteredConfigs.setdefault(configType, {}).setdefault(section, {})[name] = sections[section][name]
  configs.clear()
  configs.update(filteredConfigs)
def getAllRequestedProperties(self, services):
  """
  Groups the affected configs (see getAffectedConfigs) by config type.
  :return: {config type: [property names in the order they were reported]}
  """
  requested = {}
  for affected in self.getAffectedConfigs(services):
    requested.setdefault(affected['type'], []).append(affected['name'])
  return requested
def getAffectedConfigs(self, services):
  """
  Returns the properties affected by the request.
  These are all properties reachable from "changed-configurations" through the
  stack's declared configuration dependencies (transitively), followed by any
  "forced-configurations".
  :param services: request structure with "changed-configurations", "services"
                   and, optionally, "forced-configurations"
  :return: list of {"type": ..., "name": ...} dicts; config types have any ".xml" suffix removed
  """
  changedConfigs = [{"type": entry["type"], "name": entry["name"]}
                    for entry in services['changed-configurations']]
  # The (property, config) pairs do not change between fixed-point passes, so build them once.
  stackProperties = []
  for item in services['services']:
    for config in item['configurations']:
      stackConfig = config['StackConfigurations']
      configProperty = {
        "type": re.sub(r'\.xml$', '', stackConfig['type']),
        "name": stackConfig['property_name']
      }
      stackProperties.append((configProperty, config))
  dependencies = []
  size = -1
  # Repeat until a pass adds nothing new, so transitive dependencies are collected.
  while size != len(dependencies):
    size = len(dependencies)
    for configProperty, config in stackProperties:
      if configProperty in dependencies or configProperty in changedConfigs:
        for dependedConfig in config['dependencies']:
          dependency = {
            "name": dependedConfig["StackConfigurationDependency"]["dependency_name"],
            "type": dependedConfig["StackConfigurationDependency"]["dependency_type"]
          }
          if dependency not in dependencies:
            dependencies.append(dependency)
  if services.get("forced-configurations") is not None:
    dependencies.extend(services["forced-configurations"])
  return dependencies
def versionCompare(self, version1, version2):
  """
  Compares two dotted numeric version strings, ignoring trailing ".0" parts
  (so "2.6" equals "2.6.0").
  :return: -1, 0 or 1 when version1 is lower than, equal to or greater than version2
  """
  def normalize(v):
    return [int(x) for x in re.sub(r'(\.0+)*$', '', v).split(".")]
  left, right = normalize(version1), normalize(version2)
  # Same result as Python 2's cmp(), which does not exist in Python 3.
  return (left > right) - (left < right)
def getSiteProperties(self, configurations, siteName):
  """
  Returns configurations[siteName]["properties"], or None when siteName is missing.
  """
  siteConfig = configurations.get(siteName)
  return None if siteConfig is None else siteConfig.get("properties")
def getServicesSiteProperties(self, services, siteName):
  """
  Returns services["configurations"][siteName]["properties"].
  Returns None when services or its configurations are empty, or when
  siteName is missing.
  """
  configurations = services.get("configurations") if services else None
  if not configurations:
    return None
  siteConfig = configurations.get(siteName)
  if siteConfig is None:
    return None
  return siteConfig.get("properties")
def putProperty(self, config, configType, services=None):
  """
  Returns a setter appendProperty(key, value) for properties of configType in config.
  The setter keeps the user's value from services["configurations"] when
  isPropertyRequested reports the property as not requested and the user has a
  value for it. Otherwise it stores str(value).
  :param config: recommendations dict; config[configType]["properties"] is created if missing
  :param configType: config type name, e.g. "core-site"
  :param services: optional request structure providing user values and changed-configurations
  """
  userConfigs = {}
  changedConfigs = []
  if services:
    if 'configurations' in services:
      userConfigs = services['configurations']
    if 'changed-configurations' in services:
      changedConfigs = services["changed-configurations"]
  config.setdefault(configType, {}).setdefault("properties", {})

  def appendProperty(key, value):
    keepUserValue = (not self.isPropertyRequested(configType, key, changedConfigs)
                     and configType in userConfigs
                     and key in userConfigs[configType]['properties'])
    if keepUserValue:
      newValue = userConfigs[configType]['properties'][key]
    else:
      newValue = str(value)
    config[configType]["properties"][key] = newValue

  return appendProperty
def __isPropertyInChangedConfigs(self, configType, propertyName, changedConfigs):
  """
  Tells whether (configType, propertyName) appears in changedConfigs.
  :param changedConfigs: list of dicts with at least "type" and "name" keys
  :rtype: bool
  """
  return any(entry['type'] == configType and entry['name'] == propertyName
             for entry in changedConfigs)
def isPropertyRequested(self, configType, propertyName, changedConfigs):
  """
  Decides whether a property should receive a freshly recommended value.
  A property may depend on several others while "changed-configurations" usually
  lists only one of them. So when self.allRequestedProperties is populated, it is
  the authority: only properties listed there are recommended, and all others
  keep their user-provided values. Without it, every property except those in
  changedConfigs counts as requested.
  :rtype: bool
  """
  requested = self.allRequestedProperties
  if not requested:
    return not self.__isPropertyInChangedConfigs(configType, propertyName, changedConfigs)
  return configType in requested and propertyName in requested[configType]
def updateProperty(self, config, configType, services=None):
  """
  Returns a setter updatePropertyWithCallback(key, value, callback) for
  properties of configType in config.
  If the property is listed in changed-configurations and the user supplied a
  value for it in services["configurations"], that user value is kept.
  Otherwise the property is set to callback(current_value, value), where
  current_value is the value already in config ("" when absent).
  :param config: recommendations dict; config[configType]["properties"] is created if missing
  :param configType: config type name
  :param services: optional request structure providing user values and changed-configurations
  """
  userConfigs = {}
  changedConfigs = []
  # if services parameter, prefer values, set by user
  if services:
    if 'configurations' in services:
      userConfigs = services['configurations'] or {}
    if 'changed-configurations' in services:
      changedConfigs = services["changed-configurations"]
  if configType not in config:
    config[configType] = {}
  if "properties" not in config[configType]:
    config[configType]["properties"] = {}

  def updatePropertyWithCallback(key, value, callback):
    userProperties = (userConfigs.get(configType) or {}).get('properties') or {}
    # If the user changed this property, keep the user's value -- but only when
    # it is actually present; otherwise fall back to computing it (previously a KeyError).
    if self.__isPropertyInChangedConfigs(configType, key, changedConfigs) and key in userProperties:
      config[configType]["properties"][key] = userProperties[key]
    else:
      # Give the callback an empty string if the mapping doesn't exist
      current_value = config[configType]["properties"].get(key, "")
      config[configType]["properties"][key] = callback(current_value, value)

  return updatePropertyWithCallback
def putPropertyAttribute(self, config, configType):
  """
  Returns a setter appendPropertyAttribute(key, attribute, attributeValue) that
  writes config[configType]["property_attributes"][key][attribute].
  List values are stored as-is; any other value is stored as str(value).
  config[configType] is created immediately; "property_attributes" is created on first use.
  """
  if configType not in config:
    config[configType] = {}

  def appendPropertyAttribute(key, attribute, attributeValue):
    attributes = config[configType].setdefault("property_attributes", {})
    storedValue = attributeValue if isinstance(attributeValue, list) else str(attributeValue)
    attributes.setdefault(key, {})[attribute] = storedValue

  return appendPropertyAttribute
def getHosts(self, componentsList, componentName):
  """
  Returns the hosts running the given component, taken from the first
  matching entry of componentsList, or [] when there is no match.
  """
  matchingHostLists = [entry["hostnames"] for entry in componentsList
                       if entry["component_name"] == componentName]
  if matchingHostLists:
    return matchingHostLists[0]
  return []
def getServiceComponents(self, services, serviceName):
  """
  Returns a copy of the component list of the first service named serviceName.
  :type services dict
  :type serviceName str
  :rtype list
  :return: [] when services or serviceName is empty, or the service is not found
  """
  if not services or not serviceName:
    return []
  for service in services["services"]:
    if service["StackServices"]["service_name"] == serviceName:
      return list(service["components"])
  return []
def getHostsForComponent(self, services, serviceName, componentName):
  """
  Returns the host(s) on which a service's component is placed.
  :argument services Configuration information for the cluster
  :argument serviceName Service that owns the component
  :argument componentName Component to look up
  :rtype list
  :return: a copy of the first matching component's hostnames, or [] when not found
  """
  for component in self.getServiceComponents(services, serviceName):
    serviceComponent = component["StackServiceComponents"]
    if serviceComponent["component_name"] == componentName:
      return list(serviceComponent["hostnames"])
  return []
def getMountPoints(self, hosts):
  """
  Collects the "disk_info" list of every host that reports one.
  :type hosts dict
  :return: list of per-host disk_info lists
  """
  return [item["Hosts"]["disk_info"] for item in hosts["items"]
          if "disk_info" in item["Hosts"]]
def getStackRoot(self, services):
  """
  Gets the stack root associated with the stack.
  cluster-env/stack_root holds a JSON object mapping stack names to roots; the
  entry for cluster-env/stack_name is used when both are available.
  :param services: the services structure containing the current configurations
  :return: the configured stack root, or "/usr/hdp"
  """
  default_root = "/usr/hdp"
  cluster_env = self.getServicesSiteProperties(services, "cluster-env")
  if not cluster_env or "stack_root" not in cluster_env:
    return default_root
  stack_roots = json.loads(cluster_env["stack_root"])
  if "stack_name" not in cluster_env:
    return default_root
  stack_name = cluster_env["stack_name"]
  return stack_roots[stack_name] if stack_name in stack_roots else default_root
def isSecurityEnabled(self, services):
  """
  Alias for is_secured_cluster: tells whether cluster-env/security_enabled is "true".
  :param services: the services structure containing the current configurations
  :return: the result of is_secured_cluster(services)
  """
  secured = self.is_secured_cluster(services)
  return secured
def parseCardinality(self, cardinality, hostsCount):
  """
  Parses a cardinality string: "N+", "N-M", "N" or "ALL".
  @return: a tuple (minHosts, maxHosts); (None, None) for an empty or unrecognized string
  """
  if not cardinality:
    return (None, None)
  if "+" in cardinality:
    minHosts, maxHosts = cardinality[:-1], hostsCount
  elif "-" in cardinality:
    bounds = cardinality.split("-")
    minHosts, maxHosts = bounds[0], bounds[1]
  elif cardinality == "ALL":
    minHosts = maxHosts = hostsCount
  elif cardinality.isdigit():
    minHosts = maxHosts = cardinality
  else:
    return (None, None)
  return (int(minHosts), int(maxHosts))
def getServiceNames(self, services):
  """Returns the names of all services in the request, in order."""
  names = []
  for entry in services["services"]:
    names.append(entry["StackServices"]["service_name"])
  return names
def filterHostMounts(self, hosts, services):
  """
  Filters each host's mounts in place. Mounts are removed when their
  mount point is in cluster-env/agent_mounts_ignore_list (comma-separated) or in
  a built-in list of banned mount points, or when their filesystem type is banned.
  Host records without disk information are left untouched.
  Example:
    agent_mounts_ignore_list : "/run/secrets"
    Hosts record :
      "disk_info" : [
        {
          ...
          "mountpoint" : "/"
        },
        {
          ...
          "mountpoint" : "/run/secrets"
        }
      ]
    Result would be :
      "disk_info" : [
        {
          ...
          "mountpoint" : "/"
        }
      ]
  :type hosts dict
  :type services dict
  :return: the same hosts object
  """
  if not services or "items" not in hosts:
    return hosts
  banned_filesystems = ["devtmpfs", "tmpfs", "vboxsf", "cdfs"]
  banned_mount_points = ["/etc/resolv.conf", "/etc/hostname", "/boot", "/mnt", "/tmp", "/run/secrets"]
  cluster_env = self.getServicesSiteProperties(services, "cluster-env")
  ignore_list = []
  if cluster_env and "agent_mounts_ignore_list" in cluster_env and cluster_env["agent_mounts_ignore_list"].strip():
    ignore_list = [x.strip() for x in cluster_env["agent_mounts_ignore_list"].strip().split(",")]
  ignore_list.extend(banned_mount_points)
  for host in hosts["items"]:
    # Skip records that carry no disk information at all.
    if "Hosts" not in host or "disk_info" not in host["Hosts"]:
      continue
    host_info = host["Hosts"]
    host_info["disk_info"] = [disk for disk in host_info["disk_info"]
                              if disk["mountpoint"] not in ignore_list
                              and disk["type"].lower() not in banned_filesystems]
  return hosts
def __getSameHostMounts(self, hosts):
  """
  Returns the mount points present on every host that reports disk info.
  :type hosts dict
  :rtype list
  :return: sorted list of common mount points ([] when there are none),
           or None when hosts is empty
  """
  if not hosts:
    return None
  common = None
  for diskInfo in self.getMountPoints(hosts):
    hostMountPoints = set(item["mountpoint"] for item in diskInfo)
    # Use None (not emptiness) to mark "first host": an empty intersection must
    # stay empty instead of being replaced by the next host's mounts.
    common = hostMountPoints if common is None else common & hostMountPoints
  return sorted(common) if common else []
def getMountPathVariations(self, initial_value, component_name, services, hosts, banned_mounts=None):
  """
  Recommends paths by prefixing initial_value with each mount point that all
  hosts share. The root mount "/" yields initial_value unchanged.
  Mounts listed in banned_mounts are skipped; banned entries that are not among
  the shared mounts are ignored.
  :return: list of candidate paths, in the order the shared mounts are returned
           by __getSameHostMounts; empty when no recommendation is possible
  :type initial_value str
  :type component_name str
  :type services dict
  :type hosts dict
  :type banned_mounts list
  :rtype list
  """
  available_mounts = []
  if not initial_value:
    return available_mounts
  mounts = self.__getSameHostMounts(hosts)
  if not mounts:
    return available_mounts
  banned = set(banned_mounts or [])
  sep = "/"
  for mount in mounts:
    if mount in banned:
      continue
    new_mount = initial_value if mount == "/" else os.path.join(mount + sep, initial_value.lstrip(sep))
    if new_mount not in available_mounts:
      available_mounts.append(new_mount)
  # no list transformations after filling the list, because this will cause item order change
  return available_mounts
def getMountPathVariation(self, initial_value, component_name, services, hosts, banned_mounts=None):
  """
  Recommends a single path: the first candidate from getMountPathVariations.
  :return: a one-element list holding that path, or an empty list when no
           recommendation is possible
  :type initial_value str
  :type component_name str
  :type services dict
  :type hosts dict
  :type banned_mounts list
  :rtype list
  """
  if banned_mounts is None:
    banned_mounts = []
  variations = self.getMountPathVariations(initial_value, component_name, services, hosts, banned_mounts)
  # Slice instead of catching IndexError, so errors raised inside the call are not masked.
  return variations[:1]
def updateMountProperties(self, siteConfig, propertyDefinitions, configurations, services, hosts, banned_mounts=None):
  """
  Updates properties according to recommendations for the available mount points.
  propertyDefinitions is a list of tuples (property name, component name, initial value, recommendation type):
    property name - name of the property
    component name - name of the component this property belongs to
    initial value - initial path
    recommendation type - "multi" recommends every shared mount (uses all available
                          disks); anything else recommends only the first one
  A property is recommended only when the user has not set it, or has left it
  at its initial value. The recommended paths are written comma-joined via putProperty.
  :type propertyDefinitions list
  :type siteConfig str
  :type configurations dict
  :type services dict
  :type hosts dict
  :type banned_mounts list
  """
  if banned_mounts is None:
    banned_mounts = []
  props = self.getServicesSiteProperties(services, siteConfig)
  put_f = self.putProperty(configurations, siteConfig, services)
  for name, component, default_value, rc_type in propertyDefinitions:
    # Respect a value the user changed away from the default.
    if props is not None and name in props and props[name] != default_value:
      continue
    if rc_type == "multi":
      recommendation = self.getMountPathVariations(default_value, component, services, hosts, banned_mounts)
    else:
      recommendation = self.getMountPathVariation(default_value, component, services, hosts, banned_mounts)
    if recommendation:
      put_f(name, ",".join(recommendation))
def getHostNamesWithComponent(self, serviceName, componentName, services):
  """
  Returns the host names on which the service's component is installed.
  :return: the component's own hostnames list (not a copy), or a new empty list
           when services is None or the service/component is missing or has no hosts
  """
  if services is None:
    return []
  matchingServices = [entry for entry in services["services"]
                      if entry["StackServices"]["service_name"] == serviceName]
  if not matchingServices:
    return []
  components = [entry for entry in matchingServices[0]["components"]
                if entry["StackServiceComponents"]["component_name"] == componentName]
  if not components:
    return []
  hostnames = components[0]["StackServiceComponents"]["hostnames"]
  return hostnames if len(hostnames) > 0 else []
def getHostsWithComponent(self, serviceName, componentName, services, hosts):
  """
  Returns the host records (entries of hosts["items"]) on which the service's
  component is installed; [] when services/hosts is None or nothing matches.
  """
  if services is None or hosts is None:
    return []
  matchingServices = [entry for entry in services["services"]
                      if entry["StackServices"]["service_name"] == serviceName]
  if not matchingServices:
    return []
  components = [entry for entry in matchingServices[0]["components"]
                if entry["StackServiceComponents"]["component_name"] == componentName]
  if not components:
    return []
  componentHostnames = components[0]["StackServiceComponents"]["hostnames"]
  if len(componentHostnames) == 0:
    return []
  return [record for record in hosts["items"]
          if record["Hosts"]["host_name"] in componentHostnames]
def getHostWithComponent(self, serviceName, componentName, services, hosts):
  """Returns the first host record from getHostsWithComponent, or None if there is none."""
  componentHosts = self.getHostsWithComponent(serviceName, componentName, services, hosts)
  return componentHosts[0] if componentHosts else None
def getHostComponentsByCategories(self, hostname, categories, services, hosts):
  """
  Returns the component entries, across all services, whose category is in
  categories and that are placed on hostname. Returns [] when services or
  hosts is None.
  """
  components = []
  if services is None or hosts is None:
    return components
  for service in services["services"]:
    for entry in service["components"]:
      serviceComponent = entry["StackServiceComponents"]
      if serviceComponent["component_category"] in categories and hostname in serviceComponent["hostnames"]:
        components.append(entry)
  return components
def get_services_list(self, services):
  """
  Returns the names of the services in the request.
  :type services dict
  :rtype list
  :return: service names in order; [] when services is empty
  """
  service_names = []
  if services:
    for entry in services["services"]:
      service_names.append(entry["StackServices"]["service_name"])
  return service_names
def get_service_component_meta(self, service, component, services):
  """
  Retrieves a component's metadata (its "StackServiceComponents" dict) from services.
  Returns an empty dict if the service or component is not found.
  Return value example:
    "advertise_version" : true,
    "bulk_commands_display_name" : "",
    "bulk_commands_master_component_name" : "",
    "cardinality" : "1+",
    "component_category" : "CLIENT",
    "component_name" : "HBASE_CLIENT",
    "custom_commands" : [ ],
    "decommission_allowed" : false,
    "display_name" : "HBase Client",
    "has_bulk_commands_definition" : false,
    "is_client" : true,
    "is_master" : false,
    "reassign_allowed" : false,
    "recovery_enabled" : false,
    "service_name" : "HBASE",
    "stack_name" : "HDP",
    "stack_version" : "2.5",
    "hostnames" : [ "host1", "host2" ]
  :type service str
  :type component str
  :type services dict
  :rtype dict
  """
  if not services:
    return {}
  service_meta = next((entry for entry in services["services"]
                       if entry["StackServices"]["service_name"] == service), None)
  if service_meta is None:
    return {}
  component_meta = next((entry for entry in service_meta["components"]
                         if entry["StackServiceComponents"]["component_name"] == component), None)
  return {} if component_meta is None else component_meta["StackServiceComponents"]
#region HDFS
def getHadoopProxyUsersValidationItems(self, properties, services, hosts, configurations):
  """
  Validates that the proxyuser properties for every Hadoop proxy user are non-empty.
  hadoop.proxyuser.<user>.hosts is always checked; hadoop.proxyuser.<user>.groups
  is checked only for users that define "propertyGroups".
  :return: list of {"config-name": ..., "item": ...} validation entries
  """
  validationItems = []
  proxyUsers = self.getHadoopProxyUsers(services, hosts, configurations)
  for user_name, user_properties in proxyUsers.items():
    propertyNames = ["hadoop.proxyuser.{0}.hosts".format(user_name)]
    if "propertyGroups" in user_properties:
      propertyNames.append("hadoop.proxyuser.{0}.groups".format(user_name))
    validationItems.extend({"config-name": propertyName, "item": self.validatorNotEmpty(properties, propertyName)}
                           for propertyName in propertyNames)
  return validationItems
def getHadoopProxyUsers(self, services, hosts, configurations):
  """
  Gets Hadoop proxy user recommendations for every service listed in
  getServiceHadoopProxyUsersConfigurationDict, merging the per-service results.
  When two services yield the same user, the later one wins.
  :return: dict mapping user name -> proxyuser settings (see _getHadoopProxyUsersForService)
  """
  users = {}
  # .items() (rather than Py2-only .iteritems()) works on both Python 2 and 3.
  for serviceName, serviceUserComponents in self.getServiceHadoopProxyUsersConfigurationDict().items():
    users.update(self._getHadoopProxyUsersForService(serviceName, serviceUserComponents, services, hosts, configurations))
  return users
def getServiceHadoopProxyUsersConfigurationDict(self):
  """
  Returns the map used by 'getHadoopProxyUsers' to find each service's user
  properties and related components, from which proxyuser recommendations are computed.
  Stack advisors for later stacks can override this method to add services or
  change the previous logic.
  Example of the map format:
  {
    "serviceName": [
      ("configTypeName1", "userPropertyName1", {"propertyHosts": "*", "propertyGroups": "exact string value"}),
      ("configTypeName2", "userPropertyName2", {"propertyHosts": ["COMPONENT1", "COMPONENT2", "COMPONENT3"], "propertyGroups": "*"}),
      ("configTypeName3", "userPropertyName3", {"propertyHosts": ["COMPONENT1", "COMPONENT2", "COMPONENT3"]}, filterFunction)
    ],
    "serviceName2": [
      ...
    ]
  }
  The third element of each tuple maps a proxyuser property to its value.
  The key is either 'propertyHosts' or 'propertyGroups' (both are optional).
  If the map value is a string, it is used as the proxyuser value
  (e.g. 'hadoop.proxyuser.{user}.hosts' = '*').
  Otherwise the map value should be a list or a tuple of component names. All
  hosts running any of those components are added to the property
  (e.g. 'hadoop.proxyuser.{user}.hosts' = 'host1,host2,host3').
  The fourth element of the tuple is optional. If provided, it must be a function
  taking two arguments, services and hosts; if it returns False, no proxyusers
  are added for that tuple.
  """
  ALL_WILDCARD = "*"
  HOSTS_PROPERTY = "propertyHosts"
  GROUPS_PROPERTY = "propertyGroups"
  return {
    "HDFS": [("hadoop-env", "hdfs_user", {HOSTS_PROPERTY: ALL_WILDCARD, GROUPS_PROPERTY: ALL_WILDCARD})],
    "OOZIE": [("oozie-env", "oozie_user", {HOSTS_PROPERTY: ["OOZIE_SERVER"], GROUPS_PROPERTY: ALL_WILDCARD})],
    "HIVE": [("hive-env", "hive_user", {HOSTS_PROPERTY: ["HIVE_SERVER", "HIVE_SERVER_INTERACTIVE"], GROUPS_PROPERTY: ALL_WILDCARD}),
             ("hive-env", "webhcat_user", {HOSTS_PROPERTY: ["WEBHCAT_SERVER"], GROUPS_PROPERTY: ALL_WILDCARD})],
    # YARN's proxyuser is only needed when more than one ResourceManager is present.
    "YARN": [("yarn-env", "yarn_user", {HOSTS_PROPERTY: ["RESOURCEMANAGER"]}, lambda services, hosts: len(self.getHostsWithComponent("YARN", "RESOURCEMANAGER", services, hosts)) > 1)],
    "FALCON": [("falcon-env", "falcon_user", {HOSTS_PROPERTY: ALL_WILDCARD, GROUPS_PROPERTY: ALL_WILDCARD})],
    "SPARK": [("livy-env", "livy_user", {HOSTS_PROPERTY: ALL_WILDCARD, GROUPS_PROPERTY: ALL_WILDCARD})],
    "SPARK2": [("livy2-env", "livy2_user", {HOSTS_PROPERTY: ALL_WILDCARD, GROUPS_PROPERTY: ALL_WILDCARD})]
  }
def _getHadoopProxyUsersForService(self, serviceName, serviceUserComponents, services, hosts, configurations):
  """
  Computes Hadoop proxyuser values for the users of one service.
  :param serviceName: service whose users are processed; nothing is computed
                      unless it appears in get_services_list(services)
  :param serviceUserComponents: tuples of (config type, user property, host selector map[, filter function]),
                                as documented in getServiceHadoopProxyUsersConfigurationDict
  :param services: request structure; user names are read from its "configurations"
  :param hosts: hosts structure, used to resolve the hosts of components
  :param configurations: passed to get_data_for_proxyuser to obtain previously configured values
  :return: dict mapping user name -> {"config": <config type>, "propertyName": <user property>,
           plus one entry per key of the host selector map (e.g. "propertyHosts", "propertyGroups")}
  """
  self.logger.info("Calculating Hadoop Proxy User recommendations for {0} service.".format(serviceName))
  servicesList = self.get_services_list(services)
  resultUsers = {}
  if serviceName in servicesList:
    usersComponents = {}
    for values in serviceUserComponents:
      # Filter, if 4th argument is present in the tuple
      filterFunction = values[3:]
      if filterFunction and not filterFunction[0](services, hosts):
        continue
      userNameConfig, userNameProperty, hostSelectorMap = values[:3]
      user = get_from_dict(services, ("configurations", userNameConfig, "properties", userNameProperty), None)
      # Tuples whose user-name lookup is falsy are skipped; when two tuples resolve
      # to the same user name, the later tuple replaces the earlier one.
      if user:
        usersComponents[user] = (userNameConfig, userNameProperty, hostSelectorMap)
    for user, (userNameConfig, userNameProperty, hostSelectorMap) in usersComponents.iteritems():
      proxyUsers = {"config": userNameConfig, "propertyName": userNameProperty}
      for proxyPropertyName, hostSelector in hostSelectorMap.iteritems():
        # A string selector is used verbatim (e.g. "*"); a list/tuple of component
        # names is expanded to host names below; any other type falls back to '*'.
        componentHostNamesString = hostSelector if isinstance(hostSelector, basestring) else '*'
        if isinstance(hostSelector, (list, tuple)):
          # NOTE(review): the second value from get_data_for_proxyuser is extended with
          # .add() below, so it is presumably a set of already-configured host names -- confirm.
          _, componentHostNames = self.get_data_for_proxyuser(user, services, configurations) # preserve old values
          for component in hostSelector:
            componentHosts = self.getHostsWithComponent(serviceName, component, services, hosts)
            if componentHosts is not None:
              for componentHost in componentHosts:
                componentHostName = componentHost["Hosts"]["host_name"]
                componentHostNames.add(componentHostName)
          # Sorted so the resulting property value is deterministic.
          componentHostNamesString = ",".join(sorted(componentHostNames))
          self.logger.info("Host List for [service='{0}'; user='{1}'; components='{2}']: {3}".format(serviceName, user, ','.join(hostSelector), componentHostNamesString))
        if not proxyPropertyName in proxyUsers:
          proxyUsers[proxyPropertyName] = componentHostNamesString
      if not user in resultUsers:
        resultUsers[user] = proxyUsers
  return resultUsers
def recommendHadoopProxyUsers(self, configurations, services, hosts):
    """
    Recommend core-site hadoop.proxyuser.<user>.hosts / .groups properties.

    For every user returned by getHadoopProxyUsers:
      * the hosts value is merged into core-site via put_proxyuser_value;
      * the groups value is merged the same way, when present.

    If the user-name property has an old value in services["changed-configurations"]
    that differs from the current name, the user was renamed. In that case the old
    user's proxyuser properties are marked for deletion, and both the old and new
    property names are appended to services["forced-configurations"].

    For HIVE, the hive user's hosts property is forced whenever a hosts value was
    computed for it. Finally the Ambari user is handled by
    recommendAmbariProxyUsersForHDFS.
    """
    servicesList = self.get_services_list(services)
    services.setdefault("forced-configurations", [])

    def forceCoreSiteProperty(propertyName):
        services["forced-configurations"].append({"type" : "core-site", "name" : propertyName})

    putCoreSiteProperty = self.putProperty(configurations, "core-site", services)
    putCoreSitePropertyAttribute = self.putPropertyAttribute(configurations, "core-site")
    users = self.getHadoopProxyUsers(services, hosts, configurations)

    # Force dependencies for HIVE
    if "HIVE" in servicesList:
        hive_user = get_from_dict(services, ("configurations", "hive-env", "properties", "hive_user"), default_value=None)
        if hive_user and get_from_dict(users, (hive_user, "propertyHosts"), default_value=None):
            forceCoreSiteProperty("hadoop.proxyuser.{0}.hosts".format(hive_user))

    for userName, userProperties in users.items():
        hostsValue = userProperties["propertyHosts"]
        # Add "hadoop.proxyuser.*.hosts" (and ".groups" when known) to core-site.
        self.put_proxyuser_value(userName, hostsValue, services=services, configurations=configurations, put_function=putCoreSiteProperty)
        self.logger.info("Updated hadoop.proxyuser.{0}.hosts as : {1}".format(userName, hostsValue))
        hasGroups = "propertyGroups" in userProperties
        if hasGroups:
            self.put_proxyuser_value(userName, userProperties["propertyGroups"], is_groups=True, services=services, configurations=configurations, put_function=putCoreSiteProperty)

        # A renamed user leaves stale properties behind: delete them.
        previousName = self.getOldValue(services, userProperties["config"], userProperties["propertyName"])
        if previousName is None or previousName == userName:
            continue
        suffixes = ("hosts", "groups") if hasGroups else ("hosts",)
        for suffix in suffixes:
            putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.{1}".format(previousName, suffix), 'delete', 'true')
            forceCoreSiteProperty("hadoop.proxyuser.{0}.{1}".format(previousName, suffix))
            forceCoreSiteProperty("hadoop.proxyuser.{0}.{1}".format(userName, suffix))

    self.recommendAmbariProxyUsersForHDFS(services, configurations, servicesList, putCoreSiteProperty, putCoreSitePropertyAttribute)
def recommendAmbariProxyUsersForHDFS(self, services, configurations, servicesList, putCoreSiteProperty, putCoreSitePropertyAttribute):
    """
    Recommend core-site proxyuser properties for the Ambari user when HDFS is installed.

    The current Ambari user (getAmbariUser) is allowed to impersonate from this host
    (socket.getfqdn()) and for all groups ("*"). The proxyuser properties of the
    previous Ambari user (getOldAmbariUser) are marked for deletion, but only when
    that user differs from the current one. getOldAmbariUser can return the same
    name, e.g. when security is enabled but no ambari_principal_name is configured.
    Deleting in that case would remove the properties recommended just above.

    :param services: services dictionary given to the stack advisor
    :param configurations: configurations recommended in this run
    :param servicesList: names of the installed services
    :param putCoreSiteProperty: callable(name, value) writing a core-site property
    :param putCoreSitePropertyAttribute: callable(name, attribute, value) writing a
        core-site property attribute
    """
    if "HDFS" not in servicesList:
        return
    ambari_user = self.getAmbariUser(services)
    ambariHostName = socket.getfqdn()
    self.put_proxyuser_value(ambari_user, ambariHostName, services=services, configurations=configurations, put_function=putCoreSiteProperty)
    self.put_proxyuser_value(ambari_user, "*", is_groups=True, services=services, configurations=configurations, put_function=putCoreSiteProperty)
    old_ambari_user = self.getOldAmbariUser(services)
    # Same "renamed only if different" rule as the user-rename handling in recommendHadoopProxyUsers.
    if old_ambari_user is not None and old_ambari_user != ambari_user:
        putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.hosts".format(old_ambari_user), 'delete', 'true')
        putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.groups".format(old_ambari_user), 'delete', 'true')
def getAmbariUser(self, services):
    """
    Return the user name Ambari uses towards the cluster.

    This is services['ambari-server-properties']['ambari-server.user'], unless
    cluster-env defines ambari_principal_name and security_enabled is "true"
    (case-insensitive). In that case it is the principal name with any "@REALM"
    suffix removed.
    """
    serverUser = services['ambari-server-properties']['ambari-server.user']
    allConfigs = services["configurations"]
    if "cluster-env" not in allConfigs:
        return serverUser
    clusterEnv = allConfigs["cluster-env"]["properties"]
    kerberized = ("ambari_principal_name" in clusterEnv
                  and "security_enabled" in clusterEnv
                  and clusterEnv["security_enabled"].lower() == "true")
    if not kerberized:
        return serverUser
    return clusterEnv["ambari_principal_name"].split('@')[0]
def getOldAmbariUser(self, services):
    """
    Return the Ambari user whose proxyuser properties may be stale, or None.

    recommendAmbariProxyUsersForHDFS marks this user's proxyuser properties for
    deletion. The result depends on cluster-env:
      * security_enabled == "true" (case-insensitive): the plain
        ambari-server.user server property;
      * otherwise, if ambari_principal_name is set: that principal without its
        "@REALM" suffix;
      * otherwise, or when cluster-env is absent: None.
    """
    allConfigs = services["configurations"]
    if "cluster-env" not in allConfigs:
        return None
    clusterEnv = allConfigs["cluster-env"]["properties"]
    if "security_enabled" in clusterEnv and clusterEnv["security_enabled"].lower() == "true":
        return services['ambari-server-properties']['ambari-server.user']
    if "ambari_principal_name" in clusterEnv:
        return clusterEnv["ambari_principal_name"].split('@')[0]
    return None
#endregion
#region Generic
# Patterns for ${...} placeholders in proxyuser host/group lists. preserve_special_values
# swaps them for random tokens before such a list is split on ",", because the optional
# "| ..." part may itself contain commas.
PROXYUSER_SPECIAL_RE = [
    r"\$\{"                    # opening "${"
    r"(?:([\w\-\.]+)/)?"       # group 1: optional prefix terminated by "/"
    r"([\w\-\.]+)"             # group 2: name
    r"(?:\s*\|\s*(.+?))?"      # group 3: optional text after "|" (shortest match)
    r"\}"                      # closing "}"
]
@classmethod
def preserve_special_values(cls, value):
    """
    Replace matches of PROXYUSER_SPECIAL_RE in value with random 20-character tokens.

    Each distinct matched string gets exactly one token. Every occurrence of it is
    replaced, and the mapping only contains tokens that actually appear in the
    result. Before this, a placeholder occurring twice produced a second, unused
    token, and restore_special_values failed on it with KeyError.

    :param value: input string
    :return: tuple (result string, dict mapping each random token to the original matched text)
    """
    alphabet = string.digits + string.ascii_letters

    def gen_random_str():
        return ''.join(random.choice(alphabet) for _ in range(20))

    result = value
    replacements_dict = {}
    for regexp in cls.PROXYUSER_SPECIAL_RE:
        for match in re.finditer(regexp, value):
            matched_string = match.group(0)
            if matched_string not in result:
                # Already replaced (repeated occurrence, or consumed by an earlier pattern).
                continue
            rand_str = gen_random_str()
            result = result.replace(matched_string, rand_str)
            replacements_dict[rand_str] = matched_string
    return result, replacements_dict
@staticmethod
def restore_special_values(data, replacement_dict):
    """
    Put the original text back in place of the random tokens in data, in place.

    A token is restored whether it is a whole item (e.g. "<token>") or only part of
    one (e.g. "host-<token>"). Tokens that occur in no item are ignored; the old
    set.remove() raised KeyError for them.

    :param data: set of strings (typically a comma-split proxyuser value), modified in place
    :param replacement_dict: mapping random token -> original text, as returned by
        preserve_special_values
    :return: None
    """
    if not replacement_dict:
        return
    for item in list(data):
        restored = item
        for token, original in replacement_dict.items():
            if token in restored:
                restored = restored.replace(token, original)
        if restored != item:
            data.discard(item)
            data.add(restored)
def put_proxyuser_value(self, user_name, value, is_groups=False, services=None, configurations=None, put_function=None):
    """
    Merge value into hadoop.proxyuser.<user_name>.hosts (or .groups) and store it.

    The current value is read with get_data_for_proxyuser (an existing "*"
    contributes nothing) and merged with value via merge_proxyusers_values. Empty
    items are dropped and the rest are sorted and comma-joined. When the merged set
    is empty the property becomes "*". A merged set holding only empty strings
    yields "".

    :param put_function: callable(property_name, value) that stores the result
    """
    suffix = "groups" if is_groups else "hosts"
    _, existing = self.get_data_for_proxyuser(user_name, services, configurations, is_groups)
    merged = self.merge_proxyusers_values(existing, value)
    if merged:
        result_value = ",".join(sorted(item for item in merged if item))
    else:
        result_value = "*"
    put_function("hadoop.proxyuser.{0}.{1}".format(user_name, suffix), result_value)
def get_data_for_proxyuser(self, user_name, services, configurations, groups=False):
    """
    Return the current values of a proxyuser property of user_name.

    The property is hadoop.proxyuser.<user_name>.hosts, or .groups when groups is True.
    It is looked up in services["configurations"]["core-site"]. If it is absent there,
    (False, set()) is returned, even if configurations recommends a value.
    If it is "*", (True, set()) is returned.

    Otherwise the value is split on "," and stripped, after ${...} placeholders are
    protected with preserve_special_values. If configurations["core-site"] also holds
    the property, its items are added. Placeholders are restored at the end.

    :param user_name: proxy user name
    :param services: services dictionary given to the stack advisor
    :param configurations: configurations recommended in this run
    :param groups: if true, read the groups property instead of the hosts property
    :return: tuple (wildcard_value, set[values]); wildcard_value is True only for "*"
    """
    suffix = "groups" if groups else "hosts"
    property_name = "hadoop.proxyuser.{0}.{1}".format(user_name, suffix)
    serviceConfigs = services["configurations"]
    currentCoreSite = serviceConfigs["core-site"]['properties'] if "core-site" in serviceConfigs else {}
    if property_name not in currentCoreSite:
        return False, set()
    currentValue = currentCoreSite[property_name]
    if currentValue == "*":
        return True, set()

    protectedValue, tokens = self.preserve_special_values(currentValue)
    items = set(part.strip() for part in protectedValue.split(","))
    if "core-site" in configurations and property_name in configurations["core-site"]['properties']:
        protectedExtra, extraTokens = self.preserve_special_values(
            configurations["core-site"]['properties'][property_name]
        )
        tokens.update(extraTokens)
        items = items.union(part.strip() for part in protectedExtra.split(","))
    self.restore_special_values(items, tokens)
    return False, items
def merge_proxyusers_values(self, first, second):
    """
    Return the union of the items of first and second as a set.

    Each argument is either a string or any other iterable of items. A string is
    split on "," without stripping, and a string that is exactly "*" contributes
    nothing. Other iterables are added item by item.
    """
    merged = set()
    for data in (first, second):
        # Short-circuit keeps `unicode` (Python 2 only) from being evaluated for str input.
        if isinstance(data, str) or isinstance(data, unicode):
            if data != "*":
                merged.update(data.split(","))
        else:
            merged.update(data)
    return merged
def getOldValue(self, services, configType, propertyName):
    """
    Return the previous value of configType/propertyName, or None.

    The value comes from the first services["changed-configurations"] entry that
    matches type and name and carries an "old_value". None is returned when services
    is empty, has no "changed-configurations", or no such entry exists.
    """
    if not services or 'changed-configurations' not in services:
        return None
    for change in services["changed-configurations"]:
        if change["type"] == configType and change["name"] == propertyName and "old_value" in change:
            return change["old_value"]
    return None
@classmethod
def isSecurePort(cls, port):
    """
    Return True if port is a privileged (root-owned, below 1024) port on *nix systems.

    None is treated as not secure.
    """
    return port is not None and port < 1024
@classmethod
def getPort(cls, address):
    """
    Extract the port from an address such as "0.0.0.0:1019" or "http://host:8080".

    :return: the port as an int, or None when address is None or has no host:port part
    """
    if address is None:
        return None
    found = re.search(r'(?:http(?:s)?://)?([\w\d.]*):(\d{1,5})', address)
    return None if found is None else int(found.group(2))
#endregion
#region Validators
def validateXmxValue(self, properties, recommendedDefaults, propertyName):
    """
    Validate a JVM-options property against the -Xmx of its recommended default.

    :param properties: properties being validated
    :param recommendedDefaults: recommended values, keyed by property name
    :param propertyName: property to check
    :return: one of the following:
        * an error item when the property is missing;
        * an error item when its recommended default is missing or None (previously
          a missing default escaped as KeyError);
        * an error item when the default has a single -Xmx but the value does not;
        * a warning item when the value's -Xmx is smaller than the default's;
        * otherwise None. In particular, a default without -Xmx yields None.
    """
    if propertyName not in properties:
        return self.getErrorItem("Value should be set")
    value = properties[propertyName]
    defaultValue = recommendedDefaults.get(propertyName)
    if defaultValue is None:
        return self.getErrorItem("Config's default value can't be null or undefined")
    valueHasXmx = self.checkXmxValueFormat(value)
    defaultHasXmx = self.checkXmxValueFormat(defaultValue)
    if not defaultHasXmx:
        # If the default value does not contain Xmx, there is no point in validating the value.
        return None
    if not valueHasXmx:
        # Xmx is in the default value but not in the value.
        return self.getErrorItem('Invalid value format')
    valueInt = self.formatXmxSizeToBytes(self.getXmxSize(value))
    defaultValueXmx = self.getXmxSize(defaultValue)
    defaultValueInt = self.formatXmxSizeToBytes(defaultValueXmx)
    if valueInt < defaultValueInt:
        return self.getWarnItem("Value is less than the recommended default of -Xmx" + defaultValueXmx)
    return None
def validatorLessThenDefaultValue(self, properties, recommendedDefaults, propertyName):
    """
    Warn when a numeric property is below its recommended default.

    :return: one of the following:
        * None when there is no recommended default for the property;
        * an error item when the property is missing or its value has no digits;
        * a warning item when the value is lower than a numeric default;
        * otherwise None.
    """
    # The same name may live in two config types (e.g. hbase-env and hbase-site). It then
    # appears in `properties` without necessarily having a recommended default; skip it.
    if propertyName not in recommendedDefaults:
        return None
    if propertyName not in properties:
        return self.getErrorItem("Value should be set")
    current = self.to_number(properties[propertyName])
    if current is None:
        return self.getErrorItem("Value should be integer")
    recommended = self.to_number(recommendedDefaults[propertyName])
    if recommended is not None and current < recommended:
        return self.getWarnItem("Value is less than the recommended default of {0}".format(recommended))
    return None
def validatorGreaterThenDefaultValue(self, properties, recommendedDefaults, propertyName):
    """
    Warn when a numeric property is above its recommended default.

    :return: one of the following:
        * None when there is no recommended default for the property;
        * an error item when the property is missing or its value has no digits;
        * a warning item when the value is higher than a numeric default;
        * otherwise None.
    """
    # The same name may live in two config types (e.g. hbase-env and hbase-site). It then
    # appears in `properties` without necessarily having a recommended default; skip it.
    if propertyName not in recommendedDefaults:
        return None
    if propertyName not in properties:
        return self.getErrorItem("Value should be set")
    current = self.to_number(properties[propertyName])
    if current is None:
        return self.getErrorItem("Value should be integer")
    recommended = self.to_number(recommendedDefaults[propertyName])
    if recommended is not None and current > recommended:
        return self.getWarnItem("Value is greater than the recommended default of {0}".format(recommended))
    return None
def validatorEqualsPropertyItem(self, properties1, propertyName1,
                                properties2, propertyName2,
                                emptyAllowed=False):
    """
    Warn when two properties (possibly from different config types) differ.

    :param emptyAllowed: when False, a None value of either property is an error
    :return: one of the following:
        * an error item when either property is missing (first one checked first);
        * an error item when a value is None and emptyAllowed is False;
        * a warning item when the values differ;
        * otherwise None.
    """
    for props, name in ((properties1, propertyName1), (properties2, propertyName2)):
        if name not in props:
            return self.getErrorItem("Value should be set for %s" % name)
    first = properties1.get(propertyName1)
    if first is None and not emptyAllowed:
        return self.getErrorItem("Empty value for %s" % propertyName1)
    second = properties2.get(propertyName2)
    if second is None and not emptyAllowed:
        return self.getErrorItem("Empty value for %s" % propertyName2)
    if first != second:
        return self.getWarnItem("It is recommended to set equal values "
                                "for properties {0} and {1}".format(propertyName1, propertyName2))
    return None
def validatorEqualsToRecommendedItem(self, properties, recommendedDefaults,
                                     propertyName):
    """
    Warn when a property differs from its recommended value.

    :return: one of the following:
        * an error item when the property is missing or has no recommended value;
        * a warning item when the value differs from the recommendation;
        * otherwise None.
    """
    if propertyName not in properties:
        return self.getErrorItem("Value should be set for %s" % propertyName)
    actual = properties.get(propertyName)
    if propertyName not in recommendedDefaults:
        return self.getErrorItem("Value should be recommended for %s" % propertyName)
    expected = recommendedDefaults.get(propertyName)
    if actual != expected:
        return self.getWarnItem("It is recommended to set value {0} "
                                "for property {1}".format(expected, propertyName))
    return None
def validatorNotEmpty(self, properties, propertyName):
    """
    Flag a missing property (error) or a falsy/empty value (warning); otherwise return None.
    """
    if propertyName not in properties:
        return self.getErrorItem("Value should be set for {0}".format(propertyName))
    if properties.get(propertyName):
        return None
    return self.getWarnItem("Empty value for {0}".format(propertyName))
def validatorNotRootFs(self, properties, recommendedDefaults, propertyName, hostInfo):
    """
    Warn when a file:// directory property resolves to the root ("/") mount point.

    Values that are not file:// URIs, or that equal the recommended default, are not
    checked. No warning is given when "/" is also the host's first preferred mount
    point (self.getPreferredMountPoints).

    :param hostInfo: host info dict with a "disk_info" list of dicts having "mountpoint"
    :return: error item if the property is missing, warning item as described, else None
    """
    if propertyName not in properties:
        return self.getErrorItem("Value should be set")
    uri = properties[propertyName]
    if not uri.startswith("file://") or uri == recommendedDefaults.get(propertyName):
        return None
    location = re.sub("^file://", "", uri, count=1)
    mounts = [disk["mountpoint"] for disk in hostInfo["disk_info"]]
    mountPoint = DefaultStackAdvisor.getMountPointForDir(location, mounts)
    if mountPoint == "/" and self.getPreferredMountPoints(hostInfo)[0] != mountPoint:
        return self.getWarnItem("It is not recommended to use root partition for {0}".format(propertyName))
    return None
def validatorEnoughDiskSpace(self, properties, propertyName, hostInfo, reqiuredDiskSpace):
    """
    Warn when the partition holding a file:// directory has less space than required.

    :param propertyName: property holding the directory; only "file://" values are checked
    :param hostInfo: host info dict with "host_name" and a "disk_info" list of dicts
        having "mountpoint" and "available"
    :param reqiuredDiskSpace: required space, compared with the "available" numbers. The
        message divides it by 1048576 and labels the result "G", which suggests both are
        in KB -- TODO confirm against the disk_info producer.
    :return: one of the following:
        * an error item when the property is missing;
        * an error item when the path is empty;
        * an error item when there is no disk info, or no mount point contains the path;
        * a warning item when space is insufficient;
        * otherwise None.
    """
    if propertyName not in properties:
        return self.getErrorItem("Value should be set")
    uri = properties[propertyName]
    if not uri.startswith("file://"):
        return None
    location = re.sub("^file://", "", uri, count=1)
    if not location:
        return self.getErrorItem("Value has wrong format")
    spaceByMount = {}
    for disk in hostInfo["disk_info"]:
        spaceByMount[disk["mountpoint"]] = self.to_number(disk["available"])
    mountPoint = DefaultStackAdvisor.getMountPointForDir(location, spaceByMount.keys())
    if not spaceByMount:
        return self.getErrorItem("No disk info found on host %s" % hostInfo["host_name"])
    if mountPoint is None:
        return self.getErrorItem("No mount point in directory %s. Mount points: %s" % (location, ', '.join(spaceByMount.keys())))
    if spaceByMount[mountPoint] < reqiuredDiskSpace:
        template = ("Ambari Metrics disk space requirements not met. \n"
                    "Recommended disk space for partition {0} is {1}G")
        return self.getWarnItem(template.format(mountPoint, reqiuredDiskSpace/1048576))  # in Gb
    return None
@classmethod
def is_valid_host_port_authority(cls, target):
    """
    Return True if target parses to both a host name and a port, e.g. "host:8080".

    A dummy scheme is prepended when target has none, because urlparse only
    populates hostname/port from the "//authority" part. Parse errors (ValueError,
    e.g. a non-numeric port) yield False.
    """
    candidate = target if "://" in target else "dummyscheme://" + target
    try:
        parsed = urlparse(candidate)
        return parsed.hostname is not None and parsed.port is not None
    except ValueError:
        return False
#endregion
#region YARN and MAPREDUCE
def validatorYarnQueue(self, properties, recommendedDefaults, propertyName, services):
    """
    Report an error when a queue property names no known YARN leaf queue.

    Returns None when the property is absent, when no leaf queues can be determined
    from the capacity-scheduler configs, or when the queue is a known leaf.
    """
    if propertyName not in properties:
        return None
    schedulerProps, _ = self.getCapacitySchedulerProperties(services)
    leafQueues = self.getAllYarnLeafQueues(schedulerProps)
    if leafQueues and properties[propertyName] not in leafQueues:
        return self.getErrorItem("Queue does not exist or correspond to an existing YARN leaf queue")
    return None
def recommendYarnQueue(self, services, catalog_name=None, queue_property=None):
    """
    Choose a YARN queue to recommend for catalog_name/queue_property.

    :return: one of the following:
        * "default" when no leaf queues are known;
        * the last leaf queue in sorted order when the current value is unset or is
          not a leaf queue;
        * None (no change) when the current value already is a leaf queue. The only
          exception is an empty-string current value, which yields "default".
    """
    current = None
    if services and 'configurations' in services:
        configurations = services["configurations"]
        if catalog_name in configurations and queue_property in configurations[catalog_name]["properties"]:
            current = configurations[catalog_name]["properties"][queue_property]
    schedulerProps, _ = self.getCapacitySchedulerProperties(services)
    leafQueues = sorted(self.getAllYarnLeafQueues(schedulerProps))
    if not leafQueues:
        return "default"
    if current is None or current not in leafQueues:
        return leafQueues[-1]
    return None if current else "default"
def isConfigPropertiesChanged(self, services, config_type, config_names, all_exists=True):
    """
    Check whether the given properties of config_type appear in services["changed-configurations"].

    A missing or None "changed-configurations" entry is treated as "nothing changed"
    instead of raising KeyError.

    :argument services: Configuration information for the cluster
    :argument config_type: Type of the configuration
    :argument config_names: Properties to check for changes.
    :argument all_exists: If True, return True only if every property in config_names was
                          changed (vacuously True for an empty config_names).
                          If False, return True if at least one of them was changed.
    :type services: dict
    :type config_type: str
    :type config_names: list|set
    :type all_exists: bool
    :rtype: bool
    """
    changedConfigs = services.get("changed-configurations") or []
    changed_config_names_set = set(changedConfig['name'] for changedConfig in changedConfigs
                                   if changedConfig['type'] == config_type)
    config_names_set = set(config_names)
    configs_intersection = changed_config_names_set & config_names_set
    if all_exists:
        return configs_intersection == config_names_set
    return len(configs_intersection) > 0
def getCapacitySchedulerProperties(self, services):
    """
    Returns the dictionary of configs for 'capacity-scheduler'.

    Two input shapes are handled:
      * a single newline-separated "key=value" string stored under
        services['configurations']['capacity-scheduler']['properties']['capacity-scheduler'].
        It is parsed into a new dict.
      * a plain properties dict (per the comment below, generally on the 1st invocation).
        It is returned as-is: the same object as
        services['configurations']['capacity-scheduler']['properties'], not a copy.
    The string shape is used whenever parsing it yields at least one entry.

    :param services: services dictionary given to the stack advisor
    :return: tuple (capacity_scheduler_properties, received_as_key_value_pair).
        received_as_key_value_pair is False only when the properties were parsed from
        the single string. If 'capacity-scheduler' is missing from
        services['configurations'], an error is logged and ({}, True) is returned.
    """
    capacity_scheduler_properties = dict()
    received_as_key_value_pair = True
    if "capacity-scheduler" in services['configurations']:
        if "capacity-scheduler" in services['configurations']["capacity-scheduler"]["properties"]:
            cap_sched_props_as_str = services['configurations']["capacity-scheduler"]["properties"]["capacity-scheduler"]
            if cap_sched_props_as_str:
                cap_sched_props_as_str = str(cap_sched_props_as_str).split('\n')
                # split() always returns at least one element, so effectively only the 'null' check matters.
                if len(cap_sched_props_as_str) > 0 and cap_sched_props_as_str[0] != 'null':
                    # Received configs as one "\n" separated string.
                    # Each line is split at its first "="; a line without "=" becomes a key with an empty value.
                    for property in cap_sched_props_as_str:
                        key, sep, value = property.partition("=")
                        capacity_scheduler_properties[key] = value
                    self.logger.info("'capacity-scheduler' configs is passed-in as a single '\\n' separated string. "
                                     "count(services['configurations']['capacity-scheduler']['properties']['capacity-scheduler']) = "
                                     "{0}".format(len(capacity_scheduler_properties)))
                    received_as_key_value_pair = False
                else:
                    self.logger.info("Passed-in services['configurations']['capacity-scheduler']['properties']['capacity-scheduler'] is 'null'.")
            else:
                self.logger.info("'capacity-scheduler' configs not passed-in as single '\\n' string in "
                                 "services['configurations']['capacity-scheduler']['properties']['capacity-scheduler'].")
        if not capacity_scheduler_properties:
            # Received configs as a dictionary (Generally on 1st invocation).
            # NOTE: this is the services' own dict, not a copy.
            capacity_scheduler_properties = services['configurations']["capacity-scheduler"]["properties"]
            self.logger.info("'capacity-scheduler' configs is passed-in as a dictionary. "
                             "count(services['configurations']['capacity-scheduler']['properties']) = {0}".format(len(capacity_scheduler_properties)))
    else:
        self.logger.error("Couldn't retrieve 'capacity-scheduler' from services.")
    self.logger.info("Retrieved 'capacity-scheduler' received as dictionary : '{0}'. configs : {1}" \
                     .format(received_as_key_value_pair, capacity_scheduler_properties.items()))
    return capacity_scheduler_properties, received_as_key_value_pair
def getAllYarnLeafQueues(self, capacitySchedulerProperties):
    """
    Gets all YARN leaf queues.

    Walks the queue hierarchy starting at 'yarn.scheduler.capacity.root.queues'.
    A queue whose 'yarn.scheduler.capacity.root.<path>.queues' property lists children
    is a parent, and its children are visited by full path. Any other queue is a leaf,
    including one whose child list is empty.

    Only the last path segment of each leaf is returned, as leaf queue names are unique
    in YARN. E.g. for the leaf queues root.a1.b1.c1.d1, root.a1.b1.c2 and root.default
    the result is {"d1", "c2", "default"}.

    Queue lists are split on ",", entries are whitespace-trimmed and empty entries
    (e.g. from a trailing comma) are ignored.

    :param capacitySchedulerProperties: dict of capacity-scheduler properties
    :return: set of leaf queue names; empty when the root queue list is missing or empty
    """
    def _split_queue_list(raw):
        # "a, b,,c" -> ["a", "b", "c"]; None/"" -> []
        return [name.strip() for name in (raw or "").split(",") if name.strip()]

    leafQueueNames = set()
    toProcessQueues = _split_queue_list(capacitySchedulerProperties.get('yarn.scheduler.capacity.root.queues'))
    while toProcessQueues:
        queue = toProcessQueues.pop()
        queueKey = "yarn.scheduler.capacity.root." + queue + ".queues"
        subQueues = _split_queue_list(capacitySchedulerProperties.get(queueKey))
        if subQueues:
            # Parent queue: add its children, addressed by their full path.
            toProcessQueues.extend(queue + "." + subQueue for subQueue in subQueues)
        else:
            # Leaf queue: keep only the last path segment.
            leafQueueNames.add(queue.split(".")[-1])
    return leafQueueNames
#endregion
@classmethod
def getMountPointForDir(cls, dir, mountPoints):
    """
    Return the mount point from mountPoints that contains dir.

    :param dir: Directory to check, optionally prefixed with "file://"; it does not
        have to exist.
    :param mountPoints: iterable of mount point paths
    :return: the matching mount point with the most path segments, exactly as it
        appears in mountPoints. If the path is "/hadoop/hdfs/data", candidate mounts
        could be "/", "/hadoop/hdfs" and "/hadoop/hdfs/data"; the last one wins.
        Returns None when dir is empty/None or no mount point contains it. "/" is
        returned only when it is one of mountPoints.

    Matching is case-insensitive on both sides. dir was always lower-cased, but mount
    points were not, so a mount point with upper-case letters could never match.
    """
    bestMountFound = None
    if not dir:
        return bestMountFound
    normalizedDir = os.path.join(re.sub("^file://", "", dir, count=1).strip().lower(), "")
    for mountPoint in mountPoints:
        # Compare with a trailing separator on both sides, so the mount point
        # "/hadoop" does not match the path "/hadoop1".
        normalizedMount = os.path.join(mountPoint.lower(), "")
        if not normalizedDir.startswith(normalizedMount):
            continue
        if bestMountFound is None or \
                os.path.join(bestMountFound, "").count(os.path.sep) < normalizedMount.count(os.path.sep):
            bestMountFound = mountPoint
    return bestMountFound
def validateMinMemorySetting(self, properties, defaultValue, propertyName):
    """
    Warn when a memory property is below a minimum recommended value.

    :param properties: properties being validated
    :param defaultValue: minimum recommended value (an int or a numeric string)
    :param propertyName: property to check
    :return: one of the following:
        * an error item when the property is missing, defaultValue is None, or the
          value is None;
        * a warning item when the numeric value is below defaultValue;
        * None otherwise, including when either side cannot be converted/compared.
    """
    if propertyName not in properties:
        return self.getErrorItem("Value should be set")
    if defaultValue is None:
        return self.getErrorItem("Config's default value can't be null or undefined")
    value = properties[propertyName]
    if value is None:
        return self.getErrorItem("Value can't be null or undefined")
    try:
        valueInt = self.to_number(value)
        # TODO: generify for other use cases
        defaultValueInt = int(str(defaultValue).strip())
        # Compared inside the try: to_number returns None when there are no digits,
        # and None < int raises TypeError on Python 3.
        isBelowMinimum = valueInt < defaultValueInt
    except (TypeError, ValueError):
        # Unconvertible values are skipped (best effort), as before. Only conversion
        # errors are caught now, instead of a bare `except:` that also swallowed
        # unrelated bugs and KeyboardInterrupt/SystemExit.
        return None
    if isBelowMinimum:
        return self.getWarnItem("Value is less than the minimum recommended default of -Xmx" + str(defaultValue))
    return None
@classmethod
def checkXmxValueFormat(cls, value):
    """
    Return True if value contains exactly one "-Xmx<digits>[unit]" option.

    The unit is one of b, k, m, g, p, t in either case. Non-string values (e.g.
    None) return False instead of raising TypeError from the regex engine.
    """
    try:
        matches = re.findall(r'-Xmx(\d+)(b|k|m|g|p|t|B|K|M|G|P|T)?', value)
    except TypeError:
        return False
    return len(matches) == 1
@classmethod
def getXmxSize(cls, value):
    """
    Return the size of the first -Xmx option in value as a string.

    The result is the digits plus the single character that follows them,
    lower-cased. That character is a unit letter ("-Xmx1024M" -> "1024m"), a space
    ("-Xmx512 -Xms1g" -> "512 "), or nothing at the end of the string
    ("-Xmx1024" -> "1024").
    The old `len(result) > 1` check was always true (findall returns a 2-tuple per
    match) and has been dropped.

    :raises IndexError: if value contains no -Xmx option
    """
    digits, suffix = re.findall(r"-Xmx(\d+)(.?)", value)[0]
    return digits + suffix.lower()
@classmethod
def formatXmxSizeToBytes(cls, value):
    """
    Convert a size string such as "1024m" or "2G" (as produced by getXmxSize) to bytes.

    The last character selects the unit: b, k, m, g, t, p, case-insensitive, powers
    of 1024. A trailing digit or space means bytes. An empty string yields 0.

    This replaces the previous boolean-keyed dict trick with a plain unit table; the
    results are unchanged.

    :raises KeyError: if the last character is not a known unit, digit or space
    """
    multipliers = {
        'b': 1,
        'k': 1024,
        'm': 1024 ** 2,
        'g': 1024 ** 3,
        't': 1024 ** 4,
        'p': 1024 ** 5,
    }
    value = value.lower()
    if len(value) == 0:
        return 0
    modifier = value[-1]
    if modifier == ' ' or modifier in "0123456789":
        modifier = 'b'
    return cls.to_number(value) * multipliers[modifier]
@classmethod
def to_number(cls, s):
    """
    Return the integer formed by all digits in s, or None if s contains no digits.

    Every non-digit character is discarded, including a sign or decimal point:
    "1024m" -> 1024, "-5" -> 5, "1.5" -> 15. Non-string input raises TypeError.
    """
    try:
        digitsOnly = re.sub(r"\D", "", s)
        return int(digitsOnly)
    except ValueError:
        return None
def get_service_and_component_lists(self, services):
    """
    Flatten stack services into a list of service names and a list of component names.

    :param services: list of dicts, each with ["StackServices"]["service_name"] and a
        "components" list of dicts with ["StackServiceComponents"]["component_name"];
        a falsy value yields two empty lists
    :return: tuple (service names, component names), both in input order
    """
    serviceList = []
    componentList = []
    for service in services or []:
        serviceList.append(service["StackServices"]["service_name"])
        componentList.extend(component["StackServiceComponents"]["component_name"]
                             for component in (service["components"] or []))
    return serviceList, componentList