| #!/usr/bin/env ambari-python-wrap |
| """ |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| """ |
| import os |
| import re |
| import fnmatch |
| import socket |
| |
| class HDP23StackAdvisor(HDP22StackAdvisor): |
| |
| def createComponentLayoutRecommendations(self, services, hosts): |
| parentComponentLayoutRecommendations = super(HDP23StackAdvisor, self).createComponentLayoutRecommendations(services, hosts) |
| |
| hostsList = [host["Hosts"]["host_name"] for host in hosts["items"]] |
| hostGroups = parentComponentLayoutRecommendations["blueprint"]["host_groups"] |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| componentsList = [component for service in services["services"] for component in service["components"]] |
| |
| if "HAWQ" in servicesList: |
| # remove HAWQSTANDBY on a single node |
| if len(hostsList) == 1: |
| components = parentComponentLayoutRecommendations["blueprint"]["host_groups"][0]["components"] |
| components = [component for component in components if component["name"] != 'HAWQSTANDBY'] |
| parentComponentLayoutRecommendations["blueprint"]["host_groups"][0]["components"] = components |
| |
| # co-locate HAWQSEGMENT with DATANODE, if no hosts have been allocated for HAWQSEGMENT |
| hawqSegment = [component for component in componentsList if component["StackServiceComponents"]["component_name"] == "HAWQSEGMENT"][0] |
| if not self.isComponentHostsPopulated(hawqSegment): |
| for host_group in hostGroups: |
| if {"name": "DATANODE"} in host_group["components"] and {"name": "HAWQSEGMENT"} not in host_group["components"]: |
| host_group["components"].append({"name": "HAWQSEGMENT"}) |
| if {"name": "DATANODE"} not in host_group["components"] and {"name": "HAWQSEGMENT"} in host_group["components"]: |
| host_group["components"].remove({"name": "HAWQSEGMENT"}) |
| |
| if "PXF" in servicesList: |
| # co-locate PXF with NAMENODE and DATANODE, if no hosts have been allocated for PXF |
| pxf = [component for component in componentsList if component["StackServiceComponents"]["component_name"] == "PXF"][0] |
| if not self.isComponentHostsPopulated(pxf): |
| for host_group in hostGroups: |
| if ({"name": "NAMENODE"} in host_group["components"] or {"name": "DATANODE"} in host_group["components"]) \ |
| and {"name": "PXF"} not in host_group["components"]: |
| host_group["components"].append({"name": "PXF"}) |
| if ({"name": "NAMENODE"} not in host_group["components"] and {"name": "DATANODE"} not in host_group["components"]) \ |
| and {"name": "PXF"} in host_group["components"]: |
| host_group["components"].remove({"name": "PXF"}) |
| |
| return parentComponentLayoutRecommendations |
| |
| def getComponentLayoutValidations(self, services, hosts): |
| parentItems = super(HDP23StackAdvisor, self).getComponentLayoutValidations(services, hosts) |
| |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| componentsListList = [service["components"] for service in services["services"]] |
| componentsList = [item["StackServiceComponents"] for sublist in componentsListList for item in sublist] |
| childItems = [] |
| |
| if "HAWQ" in servicesList: |
| hostsList = [host["Hosts"]["host_name"] for host in hosts["items"]] |
| hostsCount = len(hostsList) |
| |
| hawqMasterHosts = self.__getHosts(componentsList, "HAWQMASTER") |
| hawqStandbyHosts = self.__getHosts(componentsList, "HAWQSTANDBY") |
| hawqSegmentHosts = self.__getHosts(componentsList, "HAWQSEGMENT") |
| datanodeHosts = self.__getHosts(componentsList, "DATANODE") |
| |
| # Generate WARNING if any HAWQSEGMENT is not colocated with a DATANODE |
| mismatchHosts = sorted(set(hawqSegmentHosts).symmetric_difference(set(datanodeHosts))) |
| if len(mismatchHosts) > 0: |
| hostsString = ', '.join(mismatchHosts) |
| message = "HAWQ Segment must be installed on all DataNodes. " \ |
| "The following {0} host(s) do not satisfy the colocation recommendation: {1}".format(len(mismatchHosts), hostsString) |
| childItems.append( { "type": 'host-component', "level": 'WARN', "message": message, "component-name": 'HAWQSEGMENT' } ) |
| |
| # single node case is not analyzed because HAWQ Standby Master will not be present in single node topology due to logic in createComponentLayoutRecommendations() |
| if len(hawqMasterHosts) == 1 and len(hawqStandbyHosts) == 1 and hawqMasterHosts == hawqStandbyHosts: |
| message = "HAWQ Master and HAWQ Standby Master cannot be deployed on the same host." |
| childItems.append( { "type": 'host-component', "level": 'ERROR', "message": message, "component-name": 'HAWQSTANDBY', "host": hawqStandbyHosts[0] } ) |
| |
| if len(hawqMasterHosts) == 1 and hostsCount > 1 and self.isLocalHost(hawqMasterHosts[0]): |
| message = "The default Postgres port (5432) on the Ambari Server conflicts with the default HAWQ Masters port. " \ |
| "If you are using port 5432 for Postgres, you must either deploy the HAWQ Master on a different host " \ |
| "or configure a different port for the HAWQ Masters in the HAWQ Configuration page." |
| childItems.append( { "type": 'host-component', "level": 'WARN', "message": message, "component-name": 'HAWQMASTER', "host": hawqMasterHosts[0] } ) |
| |
| if len(hawqStandbyHosts) == 1 and hostsCount > 1 and self.isLocalHost(hawqStandbyHosts[0]): |
| message = "The default Postgres port (5432) on the Ambari Server conflicts with the default HAWQ Masters port. " \ |
| "If you are using port 5432 for Postgres, you must either deploy the HAWQ Standby Master on a different host " \ |
| "or configure a different port for the HAWQ Masters in the HAWQ Configuration page." |
| childItems.append( { "type": 'host-component', "level": 'WARN', "message": message, "component-name": 'HAWQSTANDBY', "host": hawqStandbyHosts[0] } ) |
| |
| if "PXF" in servicesList: |
| pxfHosts = self.__getHosts(componentsList, "PXF") |
| expectedPxfHosts = set(self.__getHosts(componentsList, "NAMENODE") + self.__getHosts(componentsList, "DATANODE")) |
| |
| # Generate WARNING if any PXF is not colocated with NAMENODE or DATANODE |
| mismatchHosts = sorted(expectedPxfHosts.symmetric_difference(set(pxfHosts))) |
| if len(mismatchHosts) > 0: |
| hostsString = ', '.join(mismatchHosts) |
| message = "PXF must be installed on the NameNode, Standby NameNode and all DataNodes. " \ |
| "The following {0} host(s) do not satisfy the colocation recommendation: {1}".format(len(mismatchHosts), hostsString) |
| childItems.append( { "type": 'host-component', "level": 'WARN', "message": message, "component-name": 'PXF' } ) |
| |
| if "SPARK" in servicesList: |
| if "SPARK_THRIFTSERVER" in servicesList: |
| if not "HIVE_SERVER" in servicesList: |
| message = "SPARK_THRIFTSERVER requires HIVE services to be selected." |
| childItems.append( {"type": 'host-component', "level": 'ERROR', "message": message, "component-name": 'SPARK_THRIFTSERVER'} ) |
| |
| hmsHosts = self.__getHosts(componentsList, "HIVE_METASTORE") if "HIVE" in servicesList else [] |
| sparkTsHosts = self.__getHosts(componentsList, "SPARK_THRIFTSERVER") if "SPARK" in servicesList else [] |
| |
| # if Spark Thrift Server is deployed but no Hive Server is deployed |
| if len(sparkTsHosts) > 0 and len(hmsHosts) == 0: |
| message = "SPARK_THRIFTSERVER requires HIVE_METASTORE to be selected/deployed." |
| childItems.append( { "type": 'host-component', "level": 'ERROR', "message": message, "component-name": 'SPARK_THRIFTSERVER' } ) |
| |
| parentItems.extend(childItems) |
| return parentItems |
| |
| def __getHosts(self, componentsList, componentName): |
| host_lists = [component["hostnames"] for component in componentsList if |
| component["component_name"] == componentName] |
| if host_lists and len(host_lists) > 0: |
| return host_lists[0] |
| else: |
| return [] |
| |
| def getNotPreferableOnServerComponents(self): |
| parentComponents = super(HDP23StackAdvisor, self).getNotPreferableOnServerComponents() |
| parentComponents.extend(['HAWQMASTER', 'HAWQSTANDBY']) |
| return parentComponents |
| |
| def getComponentLayoutSchemes(self): |
| parentSchemes = super(HDP23StackAdvisor, self).getComponentLayoutSchemes() |
| # key is max number of cluster hosts + 1, value is index in host list where to put the component |
| childSchemes = { |
| 'HAWQMASTER' : {6: 2, 31: 1, "else": 5}, |
| 'HAWQSTANDBY': {6: 1, 31: 2, "else": 3} |
| } |
| parentSchemes.update(childSchemes) |
| return parentSchemes |
| |
| def getServiceConfigurationRecommenderDict(self): |
| parentRecommendConfDict = super(HDP23StackAdvisor, self).getServiceConfigurationRecommenderDict() |
| childRecommendConfDict = { |
| "TEZ": self.recommendTezConfigurations, |
| "HDFS": self.recommendHDFSConfigurations, |
| "YARN": self.recommendYARNConfigurations, |
| "HIVE": self.recommendHIVEConfigurations, |
| "HBASE": self.recommendHBASEConfigurations, |
| "KAFKA": self.recommendKAFKAConfigurations, |
| "RANGER": self.recommendRangerConfigurations, |
| "RANGER_KMS": self.recommendRangerKMSConfigurations, |
| "HAWQ": self.recommendHAWQConfigurations |
| } |
| parentRecommendConfDict.update(childRecommendConfDict) |
| return parentRecommendConfDict |
| |
| def recommendTezConfigurations(self, configurations, clusterData, services, hosts): |
| super(HDP23StackAdvisor, self).recommendTezConfigurations(configurations, clusterData, services, hosts) |
| |
| putTezProperty = self.putProperty(configurations, "tez-site") |
| # remove 2gb limit for tez.runtime.io.sort.mb |
| # in HDP 2.3 "tez.runtime.sorter.class" is set by default to PIPELINED, in other case comment calculation code below |
| taskResourceMemory = clusterData['mapMemory'] if clusterData['mapMemory'] > 2048 else int(clusterData['reduceMemory']) |
| taskResourceMemory = min(clusterData['containers'] * clusterData['ramPerContainer'], taskResourceMemory) |
| putTezProperty("tez.runtime.io.sort.mb", int(taskResourceMemory * 0.4)) |
| |
| if "tez-site" in services["configurations"] and "tez.runtime.sorter.class" in services["configurations"]["tez-site"]["properties"]: |
| if services["configurations"]["tez-site"]["properties"]["tez.runtime.sorter.class"] == "LEGACY": |
| putTezAttribute = self.putPropertyAttribute(configurations, "tez-site") |
| putTezAttribute("tez.runtime.io.sort.mb", "maximum", 2047) |
| pass |
| |
| serverProperties = services["ambari-server-properties"] |
| latest_tez_jar_version = None |
| |
| server_host = socket.getfqdn() |
| server_port = '8080' |
| server_protocol = 'http' |
| views_dir = '/var/lib/ambari-server/resources/views/' |
| |
| if serverProperties: |
| if 'client.api.port' in serverProperties: |
| server_port = serverProperties['client.api.port'] |
| if 'views.dir' in serverProperties: |
| views_dir = serverProperties['views.dir'] |
| if 'api.ssl' in serverProperties: |
| if serverProperties['api.ssl'].lower() == 'true': |
| server_protocol = 'https' |
| |
| views_work_dir = os.path.join(views_dir, 'work') |
| |
| if os.path.exists(views_work_dir) and os.path.isdir(views_work_dir): |
| last_version = '0.0.0' |
| for file in os.listdir(views_work_dir): |
| if fnmatch.fnmatch(file, 'TEZ{*}'): |
| current_version = file.lstrip("TEZ{").rstrip("}") # E.g.: TEZ{0.7.0.2.3.0.0-2154} |
| if self.versionCompare(current_version.replace("-", "."), last_version.replace("-", ".")) >= 0: |
| latest_tez_jar_version = current_version |
| last_version = current_version |
| pass |
| pass |
| pass |
| pass |
| |
| if latest_tez_jar_version: |
| tez_url = '{0}://{1}:{2}/#/main/views/TEZ/{3}/TEZ_CLUSTER_INSTANCE'.format(server_protocol, server_host, server_port, latest_tez_jar_version) |
| putTezProperty("tez.tez-ui.history-url.base", tez_url) |
| pass |
| |
| # TEZ JVM options |
| jvmGCParams = "-XX:+UseParallelGC" |
| if "ambari-server-properties" in services and "java.home" in services["ambari-server-properties"]: |
| # JDK8 needs different parameters |
| match = re.match(".*\/jdk(1\.\d+)[\-\_\.][^/]*$", services["ambari-server-properties"]["java.home"]) |
| if match and len(match.groups()) > 0: |
| # Is version >= 1.8 |
| versionSplits = re.split("\.", match.group(1)) |
| if versionSplits and len(versionSplits) > 1 and int(versionSplits[0]) > 0 and int(versionSplits[1]) > 7: |
| jvmGCParams = "-XX:+UseG1GC -XX:+ResizeTLAB" |
| putTezProperty('tez.am.launch.cmd-opts', "-XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps -XX:+UseNUMA " + jvmGCParams) |
| putTezProperty('tez.task.launch.cmd-opts', "-XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps -XX:+UseNUMA " + jvmGCParams) |
| |
| |
| def recommendHBASEConfigurations(self, configurations, clusterData, services, hosts): |
| super(HDP23StackAdvisor, self).recommendHBASEConfigurations(configurations, clusterData, services, hosts) |
| putHbaseSiteProperty = self.putProperty(configurations, "hbase-site", services) |
| putHbaseSitePropertyAttributes = self.putPropertyAttribute(configurations, "hbase-site") |
| putHbaseEnvProperty = self.putProperty(configurations, "hbase-env", services) |
| putHbaseEnvPropertyAttributes = self.putPropertyAttribute(configurations, "hbase-env") |
| |
| # bucket cache for 1.x is configured slightly differently, HBASE-11520 |
| threshold = 23 # 2 Gb is reserved for other offheap memory |
| if (int(clusterData["hbaseRam"]) > threshold): |
| # To enable cache - calculate values |
| regionserver_total_ram = int(clusterData["hbaseRam"]) * 1024 |
| regionserver_heap_size = 20480 |
| regionserver_max_direct_memory_size = regionserver_total_ram - regionserver_heap_size |
| hfile_block_cache_size = '0.4' |
| block_cache_heap = 8192 # int(regionserver_heap_size * hfile_block_cache_size) |
| hbase_regionserver_global_memstore_size = '0.4' |
| reserved_offheap_memory = 2048 |
| bucketcache_offheap_memory = regionserver_max_direct_memory_size - reserved_offheap_memory |
| hbase_bucketcache_size = bucketcache_offheap_memory |
| |
| # Set values in hbase-site |
| putHbaseSiteProperty('hfile.block.cache.size', hfile_block_cache_size) |
| putHbaseSiteProperty('hbase.regionserver.global.memstore.size', hbase_regionserver_global_memstore_size) |
| putHbaseSiteProperty('hbase.bucketcache.ioengine', 'offheap') |
| putHbaseSiteProperty('hbase.bucketcache.size', hbase_bucketcache_size) |
| # 2.2 stack method was called earlier, unset |
| putHbaseSitePropertyAttributes('hbase.bucketcache.percentage.in.combinedcache', 'delete', 'true') |
| |
| # Enable in hbase-env |
| putHbaseEnvProperty('hbase_max_direct_memory_size', regionserver_max_direct_memory_size) |
| putHbaseEnvProperty('hbase_regionserver_heapsize', regionserver_heap_size) |
| else: |
| # Disable |
| putHbaseSitePropertyAttributes('hbase.bucketcache.ioengine', 'delete', 'true') |
| putHbaseSitePropertyAttributes('hbase.bucketcache.size', 'delete', 'true') |
| putHbaseSitePropertyAttributes('hbase.bucketcache.percentage.in.combinedcache', 'delete', 'true') |
| |
| putHbaseEnvPropertyAttributes('hbase_max_direct_memory_size', 'delete', 'true') |
| |
| if 'hbase-env' in services['configurations'] and 'phoenix_sql_enabled' in services['configurations']['hbase-env']['properties'] and \ |
| 'true' == services['configurations']['hbase-env']['properties']['phoenix_sql_enabled'].lower(): |
| putHbaseSiteProperty("hbase.rpc.controllerfactory.class", "org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory") |
| putHbaseSiteProperty("hbase.region.server.rpc.scheduler.factory.class", "org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory") |
| else: |
| putHbaseSitePropertyAttributes('hbase.region.server.rpc.scheduler.factory.class', 'delete', 'true') |
| |
| |
| def recommendHIVEConfigurations(self, configurations, clusterData, services, hosts): |
| super(HDP23StackAdvisor, self).recommendHIVEConfigurations(configurations, clusterData, services, hosts) |
| putHiveSiteProperty = self.putProperty(configurations, "hive-site", services) |
| putHiveServerProperty = self.putProperty(configurations, "hiveserver2-site", services) |
| putHiveSitePropertyAttribute = self.putPropertyAttribute(configurations, "hive-site") |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| # hive_security_authorization == 'ranger' |
| if str(configurations["hive-env"]["properties"]["hive_security_authorization"]).lower() == "ranger": |
| putHiveServerProperty("hive.security.authorization.manager", "org.apache.ranger.authorization.hive.authorizer.RangerHiveAuthorizerFactory") |
| |
| # TEZ JVM options |
| jvmGCParams = "-XX:+UseParallelGC" |
| if "ambari-server-properties" in services and "java.home" in services["ambari-server-properties"]: |
| # JDK8 needs different parameters |
| match = re.match(".*\/jdk(1\.\d+)[\-\_\.][^/]*$", services["ambari-server-properties"]["java.home"]) |
| if match and len(match.groups()) > 0: |
| # Is version >= 1.8 |
| versionSplits = re.split("\.", match.group(1)) |
| if versionSplits and len(versionSplits) > 1 and int(versionSplits[0]) > 0 and int(versionSplits[1]) > 7: |
| jvmGCParams = "-XX:+UseG1GC -XX:+ResizeTLAB" |
| putHiveSiteProperty('hive.tez.java.opts', "-server -Djava.net.preferIPv4Stack=true -XX:NewRatio=8 -XX:+UseNUMA " + jvmGCParams + " -XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps") |
| |
| # if hive using sqla db, then we should add DataNucleus property |
| sqla_db_used = 'hive-env' in services['configurations'] and 'hive_database' in services['configurations']['hive-env']['properties'] and \ |
| services['configurations']['hive-env']['properties']['hive_database'] == 'Existing SQL Anywhere Database' |
| if sqla_db_used: |
| putHiveSiteProperty('datanucleus.rdbms.datastoreAdapterClassName','org.datanucleus.store.rdbms.adapter.SQLAnywhereAdapter') |
| else: |
| putHiveSitePropertyAttribute('datanucleus.rdbms.datastoreAdapterClassName', 'delete', 'true') |
| |
| # atlas |
| hooks_property = "hive.exec.post.hooks" |
| if hooks_property in configurations["hive-site"]["properties"]: |
| hooks_value = configurations["hive-site"]["properties"][hooks_property] |
| else: |
| hooks_value = " " |
| |
| include_atlas = "ATLAS" in servicesList |
| atlas_hook_class = "org.apache.atlas.hive.hook.HiveHook" |
| if include_atlas and atlas_hook_class not in hooks_value: |
| if hooks_value == " ": |
| hooks_value = atlas_hook_class |
| else: |
| hooks_value = hooks_value + "," + atlas_hook_class |
| if not include_atlas and atlas_hook_class in hooks_value: |
| hooks_classes = [] |
| for hook_class in hooks_value.split(","): |
| if hook_class != atlas_hook_class and hook_class != " ": |
| hooks_classes.append(hook_class) |
| if hooks_classes: |
| hooks_value = ",".join(hooks_classes) |
| else: |
| hooks_value = " " |
| putHiveSiteProperty(hooks_property, hooks_value) |
| |
| atlas_server_host_info = self.getHostWithComponent("ATLAS", "ATLAS_SERVER", services, hosts) |
| if include_atlas and atlas_server_host_info: |
| cluster_name = 'default' |
| putHiveSiteProperty('atlas.cluster.name', cluster_name) |
| atlas_rest_host = atlas_server_host_info['Hosts']['host_name'] |
| scheme = "http" |
| metadata_port = "21000" |
| if 'application-properties' in services['configurations']: |
| tls_enabled = services['configurations']['application-properties']['properties']['atlas.enableTLS'] |
| metadata_port = services['configurations']['application-properties']['properties']['atlas.server.http.port'] |
| if tls_enabled.lower() == "true": |
| scheme = "https" |
| metadata_port = services['configurations']['application-properties']['properties']['atlas.server.https.port'] |
| putHiveSiteProperty('atlas.rest.address', '{0}://{1}:{2}'.format(scheme, atlas_rest_host, metadata_port)) |
| else: |
| putHiveSitePropertyAttribute('atlas.cluster.name', 'delete', 'true') |
| putHiveSitePropertyAttribute('atlas.rest.address', 'delete', 'true') |
| |
| def recommendHDFSConfigurations(self, configurations, clusterData, services, hosts): |
| super(HDP23StackAdvisor, self).recommendHDFSConfigurations(configurations, clusterData, services, hosts) |
| |
| putHdfsSiteProperty = self.putProperty(configurations, "hdfs-site", services) |
| putHdfsSitePropertyAttribute = self.putPropertyAttribute(configurations, "hdfs-site") |
| |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| if "HAWQ" in servicesList: |
| # Set dfs.allow.truncate to true |
| putHdfsSiteProperty('dfs.allow.truncate', 'true') |
| |
| if ('ranger-hdfs-plugin-properties' in services['configurations']) and ('ranger-hdfs-plugin-enabled' in services['configurations']['ranger-hdfs-plugin-properties']['properties']): |
| rangerPluginEnabled = '' |
| if 'ranger-hdfs-plugin-properties' in configurations and 'ranger-hdfs-plugin-enabled' in configurations['ranger-hdfs-plugin-properties']['properties']: |
| rangerPluginEnabled = configurations['ranger-hdfs-plugin-properties']['properties']['ranger-hdfs-plugin-enabled'] |
| elif 'ranger-hdfs-plugin-properties' in services['configurations'] and 'ranger-hdfs-plugin-enabled' in services['configurations']['ranger-hdfs-plugin-properties']['properties']: |
| rangerPluginEnabled = services['configurations']['ranger-hdfs-plugin-properties']['properties']['ranger-hdfs-plugin-enabled'] |
| |
| if rangerPluginEnabled and (rangerPluginEnabled.lower() == 'Yes'.lower()): |
| putHdfsSiteProperty("dfs.namenode.inode.attributes.provider.class",'org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer') |
| else: |
| putHdfsSitePropertyAttribute('dfs.namenode.inode.attributes.provider.class', 'delete', 'true') |
| else: |
| putHdfsSitePropertyAttribute('dfs.namenode.inode.attributes.provider.class', 'delete', 'true') |
| |
| def recommendKAFKAConfigurations(self, configurations, clusterData, services, hosts): |
| kafka_broker = getServicesSiteProperties(services, "kafka-broker") |
| |
| # kerberos security for kafka is decided from `security.inter.broker.protocol` property value |
| security_enabled = (kafka_broker is not None and 'security.inter.broker.protocol' in kafka_broker |
| and 'SASL' in kafka_broker['security.inter.broker.protocol']) |
| putKafkaBrokerProperty = self.putProperty(configurations, "kafka-broker", services) |
| putKafkaLog4jProperty = self.putProperty(configurations, "kafka-log4j", services) |
| putKafkaBrokerAttributes = self.putPropertyAttribute(configurations, "kafka-broker") |
| |
| #If AMS is part of Services, use the KafkaTimelineMetricsReporter for metric reporting. Default is ''. |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| if "AMBARI_METRICS" in servicesList: |
| putKafkaBrokerProperty('kafka.metrics.reporters', 'org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter') |
| |
| if "ranger-env" in services["configurations"] and "ranger-kafka-plugin-properties" in services["configurations"] and \ |
| "ranger-kafka-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]: |
| putKafkaRangerPluginProperty = self.putProperty(configurations, "ranger-kafka-plugin-properties", services) |
| rangerEnvKafkaPluginProperty = services["configurations"]["ranger-env"]["properties"]["ranger-kafka-plugin-enabled"] |
| putKafkaRangerPluginProperty("ranger-kafka-plugin-enabled", rangerEnvKafkaPluginProperty) |
| |
| if 'ranger-kafka-plugin-properties' in services['configurations'] and ('ranger-kafka-plugin-enabled' in services['configurations']['ranger-kafka-plugin-properties']['properties']): |
| kafkaLog4jRangerLines = [{ |
| "name": "log4j.appender.rangerAppender", |
| "value": "org.apache.log4j.DailyRollingFileAppender" |
| }, |
| { |
| "name": "log4j.appender.rangerAppender.DatePattern", |
| "value": "'.'yyyy-MM-dd-HH" |
| }, |
| { |
| "name": "log4j.appender.rangerAppender.File", |
| "value": "${kafka.logs.dir}/ranger_kafka.log" |
| }, |
| { |
| "name": "log4j.appender.rangerAppender.layout", |
| "value": "org.apache.log4j.PatternLayout" |
| }, |
| { |
| "name": "log4j.appender.rangerAppender.layout.ConversionPattern", |
| "value": "%d{ISO8601} %p [%t] %C{6} (%F:%L) - %m%n" |
| }, |
| { |
| "name": "log4j.logger.org.apache.ranger", |
| "value": "INFO, rangerAppender" |
| }] |
| |
| rangerPluginEnabled='' |
| if 'ranger-kafka-plugin-properties' in configurations and 'ranger-kafka-plugin-enabled' in configurations['ranger-kafka-plugin-properties']['properties']: |
| rangerPluginEnabled = configurations['ranger-kafka-plugin-properties']['properties']['ranger-kafka-plugin-enabled'] |
| elif 'ranger-kafka-plugin-properties' in services['configurations'] and 'ranger-kafka-plugin-enabled' in services['configurations']['ranger-kafka-plugin-properties']['properties']: |
| rangerPluginEnabled = services['configurations']['ranger-kafka-plugin-properties']['properties']['ranger-kafka-plugin-enabled'] |
| |
| if rangerPluginEnabled and rangerPluginEnabled.lower() == "Yes".lower(): |
| # recommend authorizer.class.name |
| putKafkaBrokerProperty("authorizer.class.name", 'org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer') |
| # change kafka-log4j when ranger plugin is installed |
| |
| if 'kafka-log4j' in services['configurations'] and 'content' in services['configurations']['kafka-log4j']['properties']: |
| kafkaLog4jContent = services['configurations']['kafka-log4j']['properties']['content'] |
| for item in range(len(kafkaLog4jRangerLines)): |
| if kafkaLog4jRangerLines[item]["name"] not in kafkaLog4jContent: |
| kafkaLog4jContent+= '\n' + kafkaLog4jRangerLines[item]["name"] + '=' + kafkaLog4jRangerLines[item]["value"] |
| putKafkaLog4jProperty("content",kafkaLog4jContent) |
| |
| |
| else: |
| # Kerberized Cluster with Ranger plugin disabled |
| if security_enabled and 'kafka-broker' in services['configurations'] and 'authorizer.class.name' in services['configurations']['kafka-broker']['properties'] and \ |
| services['configurations']['kafka-broker']['properties']['authorizer.class.name'] == 'org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer': |
| putKafkaBrokerProperty("authorizer.class.name", 'kafka.security.auth.SimpleAclAuthorizer') |
| # Non-kerberos Cluster with Ranger plugin disabled |
| else: |
| putKafkaBrokerAttributes('authorizer.class.name', 'delete', 'true') |
| |
| # Non-Kerberos Cluster without Ranger |
| elif not security_enabled: |
| putKafkaBrokerAttributes('authorizer.class.name', 'delete', 'true') |
| |
| def recommendRangerKMSConfigurations(self, configurations, clusterData, services, hosts): |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| putRangerKmsDbksProperty = self.putProperty(configurations, "dbks-site", services) |
| putRangerKmsProperty = self.putProperty(configurations, "kms-properties", services) |
| |
| if 'kms-properties' in services['configurations'] and ('DB_FLAVOR' in services['configurations']['kms-properties']['properties']): |
| |
| rangerKmsDbFlavor = services['configurations']["kms-properties"]["properties"]["DB_FLAVOR"] |
| ranger_kms_sql_connector_dict = { |
| 'MYSQL': '/usr/share/java/mysql-connector-java.jar', |
| 'ORACLE': '/usr/share/java/ojdbc6.jar', |
| 'POSTGRES': '/usr/share/java/postgresql.jar', |
| 'MSSQL': '/usr/share/java/sqljdbc4.jar', |
| 'SQLA': '/path_to_driver/sqla-client-jdbc.tar.gz' |
| } |
| |
| rangerKmsSqlConnectorProperty = ranger_kms_sql_connector_dict.get(rangerKmsDbFlavor, ranger_kms_sql_connector_dict['MYSQL']) |
| putRangerKmsProperty('SQL_CONNECTOR_JAR', rangerKmsSqlConnectorProperty) |
| |
| if ('db_host' in services['configurations']['kms-properties']['properties']) and ('db_name' in services['configurations']['kms-properties']['properties']): |
| |
| rangerKmsDbHost = services['configurations']["kms-properties"]["properties"]["db_host"] |
| rangerKmsDbName = services['configurations']["kms-properties"]["properties"]["db_name"] |
| |
| ranger_kms_db_url_dict = { |
| 'MYSQL': {'ranger.ks.jpa.jdbc.driver': 'com.mysql.jdbc.Driver', 'ranger.ks.jpa.jdbc.url': 'jdbc:mysql://' + rangerKmsDbHost + '/' + rangerKmsDbName}, |
| 'ORACLE': {'ranger.ks.jpa.jdbc.driver': 'oracle.jdbc.driver.OracleDriver', 'ranger.ks.jpa.jdbc.url': 'jdbc:oracle:thin:@//' + rangerKmsDbHost}, |
| 'POSTGRES': {'ranger.ks.jpa.jdbc.driver': 'org.postgresql.Driver', 'ranger.ks.jpa.jdbc.url': 'jdbc:postgresql://' + rangerKmsDbHost + '/' + rangerKmsDbName}, |
| 'MSSQL': {'ranger.ks.jpa.jdbc.driver': 'com.microsoft.sqlserver.jdbc.SQLServerDriver', 'ranger.ks.jpa.jdbc.url': 'jdbc:sqlserver://' + rangerKmsDbHost + ';databaseName=' + rangerKmsDbName}, |
| 'SQLA': {'ranger.ks.jpa.jdbc.driver': 'sap.jdbc4.sqlanywhere.IDriver', 'ranger.ks.jpa.jdbc.url': 'jdbc:sqlanywhere:host=' + rangerKmsDbHost + ';database=' + rangerKmsDbName} |
| } |
| |
| rangerKmsDbProperties = ranger_kms_db_url_dict.get(rangerKmsDbFlavor, ranger_kms_db_url_dict['MYSQL']) |
| for key in rangerKmsDbProperties: |
| putRangerKmsDbksProperty(key, rangerKmsDbProperties.get(key)) |
| |
| def recommendRangerConfigurations(self, configurations, clusterData, services, hosts): |
| super(HDP23StackAdvisor, self).recommendRangerConfigurations(configurations, clusterData, services, hosts) |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| putRangerAdminProperty = self.putProperty(configurations, "ranger-admin-site", services) |
| putRangerEnvProperty = self.putProperty(configurations, "ranger-env", services) |
| putRangerUgsyncSite = self.putProperty(configurations, "ranger-ugsync-site", services) |
| |
| if 'admin-properties' in services['configurations'] and ('DB_FLAVOR' in services['configurations']['admin-properties']['properties'])\ |
| and ('db_host' in services['configurations']['admin-properties']['properties']) and ('db_name' in services['configurations']['admin-properties']['properties']): |
| |
| rangerDbFlavor = services['configurations']["admin-properties"]["properties"]["DB_FLAVOR"] |
| rangerDbHost = services['configurations']["admin-properties"]["properties"]["db_host"] |
| rangerDbName = services['configurations']["admin-properties"]["properties"]["db_name"] |
| ranger_db_url_dict = { |
| 'MYSQL': {'ranger.jpa.jdbc.driver': 'com.mysql.jdbc.Driver', 'ranger.jpa.jdbc.url': 'jdbc:mysql://' + rangerDbHost + '/' + rangerDbName}, |
| 'ORACLE': {'ranger.jpa.jdbc.driver': 'oracle.jdbc.driver.OracleDriver', 'ranger.jpa.jdbc.url': 'jdbc:oracle:thin:@//' + rangerDbHost + ':1521/' + rangerDbName}, |
| 'POSTGRES': {'ranger.jpa.jdbc.driver': 'org.postgresql.Driver', 'ranger.jpa.jdbc.url': 'jdbc:postgresql://' + rangerDbHost + ':5432/' + rangerDbName}, |
| 'MSSQL': {'ranger.jpa.jdbc.driver': 'com.microsoft.sqlserver.jdbc.SQLServerDriver', 'ranger.jpa.jdbc.url': 'jdbc:sqlserver://' + rangerDbHost + ';databaseName=' + rangerDbName}, |
| 'SQLA': {'ranger.jpa.jdbc.driver': 'sap.jdbc4.sqlanywhere.IDriver', 'ranger.jpa.jdbc.url': 'jdbc:sqlanywhere:host=' + rangerDbHost + ';database=' + rangerDbName} |
| } |
| rangerDbProperties = ranger_db_url_dict.get(rangerDbFlavor, ranger_db_url_dict['MYSQL']) |
| for key in rangerDbProperties: |
| putRangerAdminProperty(key, rangerDbProperties.get(key)) |
| |
| if 'admin-properties' in services['configurations'] and ('DB_FLAVOR' in services['configurations']['admin-properties']['properties']) \ |
| and ('db_host' in services['configurations']['admin-properties']['properties']): |
| |
| rangerDbFlavor = services['configurations']["admin-properties"]["properties"]["DB_FLAVOR"] |
| rangerDbHost = services['configurations']["admin-properties"]["properties"]["db_host"] |
| ranger_db_privelege_url_dict = { |
| 'MYSQL': {'ranger_privelege_user_jdbc_url': 'jdbc:mysql://' + rangerDbHost}, |
| 'ORACLE': {'ranger_privelege_user_jdbc_url': 'jdbc:oracle:thin:@//' + rangerDbHost + ':1521'}, |
| 'POSTGRES': {'ranger_privelege_user_jdbc_url': 'jdbc:postgresql://' + rangerDbHost + ':5432/postgres'}, |
| 'MSSQL': {'ranger_privelege_user_jdbc_url': 'jdbc:sqlserver://' + rangerDbHost + ';'}, |
| 'SQLA': {'ranger_privelege_user_jdbc_url': 'jdbc:sqlanywhere:host=' + rangerDbHost + ';'} |
| } |
| rangerPrivelegeDbProperties = ranger_db_privelege_url_dict.get(rangerDbFlavor, ranger_db_privelege_url_dict['MYSQL']) |
| for key in rangerPrivelegeDbProperties: |
| putRangerEnvProperty(key, rangerPrivelegeDbProperties.get(key)) |
| |
| # Recommend ldap settings based on ambari.properties configuration |
| if 'ambari-server-properties' in services and \ |
| 'ambari.ldap.isConfigured' in services['ambari-server-properties'] and \ |
| services['ambari-server-properties']['ambari.ldap.isConfigured'].lower() == "true": |
| serverProperties = services['ambari-server-properties'] |
| if 'authentication.ldap.baseDn' in serverProperties: |
| putRangerUgsyncSite('ranger.usersync.ldap.searchBase', serverProperties['authentication.ldap.baseDn']) |
| if 'authentication.ldap.groupMembershipAttr' in serverProperties: |
| putRangerUgsyncSite('ranger.usersync.group.memberattributename', serverProperties['authentication.ldap.groupMembershipAttr']) |
| if 'authentication.ldap.groupNamingAttr' in serverProperties: |
| putRangerUgsyncSite('ranger.usersync.group.nameattribute', serverProperties['authentication.ldap.groupNamingAttr']) |
| if 'authentication.ldap.groupObjectClass' in serverProperties: |
| putRangerUgsyncSite('ranger.usersync.group.objectclass', serverProperties['authentication.ldap.groupObjectClass']) |
| if 'authentication.ldap.managerDn' in serverProperties: |
| putRangerUgsyncSite('ranger.usersync.ldap.binddn', serverProperties['authentication.ldap.managerDn']) |
| if 'authentication.ldap.primaryUrl' in serverProperties: |
| ldap_protocol = 'ldap://' |
| if 'authentication.ldap.useSSL' in serverProperties and serverProperties['authentication.ldap.useSSL'] == 'true': |
| ldap_protocol = 'ldaps://' |
| ldapUrl = ldap_protocol + serverProperties['authentication.ldap.primaryUrl'] if serverProperties['authentication.ldap.primaryUrl'] else serverProperties['authentication.ldap.primaryUrl'] |
| putRangerUgsyncSite('ranger.usersync.ldap.url', ldapUrl) |
| if 'authentication.ldap.userObjectClass' in serverProperties: |
| putRangerUgsyncSite('ranger.usersync.ldap.user.objectclass', serverProperties['authentication.ldap.userObjectClass']) |
| if 'authentication.ldap.usernameAttribute' in serverProperties: |
| putRangerUgsyncSite('ranger.usersync.ldap.user.nameattribute', serverProperties['authentication.ldap.usernameAttribute']) |
| |
| |
| # Recommend Ranger Authentication method |
| authMap = { |
| 'org.apache.ranger.unixusersync.process.UnixUserGroupBuilder': 'UNIX', |
| 'org.apache.ranger.ldapusersync.process.LdapUserGroupBuilder': 'LDAP' |
| } |
| |
| if 'ranger-ugsync-site' in services['configurations'] and 'ranger.usersync.source.impl.class' in services['configurations']["ranger-ugsync-site"]["properties"]: |
| rangerUserSyncClass = services['configurations']["ranger-ugsync-site"]["properties"]["ranger.usersync.source.impl.class"] |
| if rangerUserSyncClass in authMap: |
| rangerSqlConnectorProperty = authMap.get(rangerUserSyncClass) |
| putRangerAdminProperty('ranger.authentication.method', rangerSqlConnectorProperty) |
| |
| |
| if 'ranger-env' in services['configurations'] and 'is_solrCloud_enabled' in services['configurations']["ranger-env"]["properties"]: |
| isSolrCloudEnabled = services['configurations']["ranger-env"]["properties"]["is_solrCloud_enabled"] == "true" |
| else: |
| isSolrCloudEnabled = False |
| |
| if isSolrCloudEnabled: |
| zookeeper_host_port = self.getZKHostPortString(services) |
| ranger_audit_zk_port = '' |
| if zookeeper_host_port: |
| ranger_audit_zk_port = '{0}/{1}'.format(zookeeper_host_port, 'ranger_audits') |
| putRangerAdminProperty('ranger.audit.solr.zookeepers', ranger_audit_zk_port) |
| else: |
| putRangerAdminProperty('ranger.audit.solr.zookeepers', 'NONE') |
| |
| # Recommend ranger.audit.solr.zookeepers and xasecure.audit.destination.hdfs.dir |
| include_hdfs = "HDFS" in servicesList |
| if include_hdfs: |
| if 'core-site' in services['configurations'] and ('fs.defaultFS' in services['configurations']['core-site']['properties']): |
| default_fs = services['configurations']['core-site']['properties']['fs.defaultFS'] |
| putRangerEnvProperty('xasecure.audit.destination.hdfs.dir', '{0}/{1}/{2}'.format(default_fs,'ranger','audit')) |
| |
| # Recommend Ranger supported service's audit properties |
| ranger_services = [ |
| {'service_name': 'HDFS', 'audit_file': 'ranger-hdfs-audit'}, |
| {'service_name': 'YARN', 'audit_file': 'ranger-yarn-audit'}, |
| {'service_name': 'HBASE', 'audit_file': 'ranger-hbase-audit'}, |
| {'service_name': 'HIVE', 'audit_file': 'ranger-hive-audit'}, |
| {'service_name': 'KNOX', 'audit_file': 'ranger-knox-audit'}, |
| {'service_name': 'KAFKA', 'audit_file': 'ranger-kafka-audit'}, |
| {'service_name': 'STORM', 'audit_file': 'ranger-storm-audit'} |
| ] |
| |
| for item in range(len(ranger_services)): |
| if ranger_services[item]['service_name'] in servicesList: |
| component_audit_file = ranger_services[item]['audit_file'] |
| if component_audit_file in services["configurations"]: |
| ranger_audit_dict = [ |
| {'filename': 'ranger-env', 'configname': 'xasecure.audit.destination.db', 'target_configname': 'xasecure.audit.destination.db'}, |
| {'filename': 'ranger-env', 'configname': 'xasecure.audit.destination.hdfs', 'target_configname': 'xasecure.audit.destination.hdfs'}, |
| {'filename': 'ranger-env', 'configname': 'xasecure.audit.destination.hdfs.dir', 'target_configname': 'xasecure.audit.destination.hdfs.dir'}, |
| {'filename': 'ranger-env', 'configname': 'xasecure.audit.destination.solr', 'target_configname': 'xasecure.audit.destination.solr'}, |
| {'filename': 'ranger-admin-site', 'configname': 'ranger.audit.solr.urls', 'target_configname': 'xasecure.audit.destination.solr.urls'}, |
| {'filename': 'ranger-admin-site', 'configname': 'ranger.audit.solr.zookeepers', 'target_configname': 'xasecure.audit.destination.solr.zookeepers'} |
| ] |
| putRangerAuditProperty = self.putProperty(configurations, component_audit_file, services) |
| |
| for item in ranger_audit_dict: |
| if item['filename'] in services["configurations"] and item['configname'] in services["configurations"][item['filename']]["properties"]: |
| if item['filename'] in configurations and item['configname'] in configurations[item['filename']]["properties"]: |
| rangerAuditProperty = configurations[item['filename']]["properties"][item['configname']] |
| else: |
| rangerAuditProperty = services["configurations"][item['filename']]["properties"][item['configname']] |
| putRangerAuditProperty(item['target_configname'], rangerAuditProperty) |
| |
| |
| |
def recommendYARNConfigurations(self, configurations, clusterData, services, hosts):
    """
    Extend the parent stack's YARN recommendations with HDP 2.3 specifics.

    After delegating to the parent advisor this method recommends:
      * the timeline-service entity-group plugin class: the Tez plugin when
        "tez-site" is among the submitted configurations, empty otherwise;
      * "ranger-yarn-plugin-enabled" in ranger-yarn-plugin-properties, copied
        from ranger-env when both config types were submitted;
      * yarn.acl.enable / yarn.authorization-provider when the Ranger YARN
        plugin flag reads "yes" (case-insensitive); otherwise the provider
        property is flagged with the 'delete' attribute;
      * yarn.resourcemanager.proxy-user-privileges.enabled = 'false' when
        RANGER_KMS and KERBEROS are both installed and yarn-site already
        carries that property.

    :param configurations: recommended configurations being built; written
                           through the put-property helpers
    :param clusterData: forwarded to the parent implementation
    :param services: advisor request; "services" and "configurations" are read
    :param hosts: forwarded to the parent implementation
    """
    super(HDP23StackAdvisor, self).recommendYARNConfigurations(configurations, clusterData, services, hosts)
    setYarnSite = self.putProperty(configurations, "yarn-site", services)
    setYarnSiteAttribute = self.putPropertyAttribute(configurations, "yarn-site")
    installedServices = [svc["StackServices"]["service_name"] for svc in services["services"]]
    submittedConfigs = services["configurations"]

    # Timeline-service plugin depends only on whether Tez configs are present.
    pluginClassesKey = 'yarn.timeline-service.entity-group-fs-store.group-id-plugin-classes'
    if "tez-site" in submittedConfigs:
        pluginClasses = 'org.apache.tez.dag.history.logging.ats.TimelineCachePluginImpl'
    else:
        pluginClasses = ''
    setYarnSite(pluginClassesKey, pluginClasses)

    pluginSite = 'ranger-yarn-plugin-properties'
    pluginFlag = 'ranger-yarn-plugin-enabled'

    # Mirror the flag from ranger-env into the YARN plugin properties.
    if "ranger-env" in submittedConfigs and pluginSite in submittedConfigs:
        rangerEnv = submittedConfigs["ranger-env"]["properties"]
        if pluginFlag in rangerEnv:
            setPluginProperty = self.putProperty(configurations, pluginSite, services)
            setPluginProperty(pluginFlag, rangerEnv[pluginFlag])

    # A value recommended in this run takes precedence over the submitted one.
    pluginEnabled = ''
    for source in (configurations, submittedConfigs):
        if pluginSite in source and pluginFlag in source[pluginSite]['properties']:
            pluginEnabled = source[pluginSite]['properties'][pluginFlag]
            break

    if pluginEnabled and pluginEnabled.lower() == 'Yes'.lower():
        setYarnSite('yarn.acl.enable', 'true')
        setYarnSite('yarn.authorization-provider', 'org.apache.ranger.authorization.yarn.authorizer.RangerYarnAuthorizer')
    else:
        setYarnSiteAttribute('yarn.authorization-provider', 'delete', 'true')

    proxyKey = 'yarn.resourcemanager.proxy-user-privileges.enabled'
    kmsWithKerberos = 'RANGER_KMS' in installedServices and 'KERBEROS' in installedServices
    if kmsWithKerberos and 'yarn-site' in submittedConfigs and proxyKey in submittedConfigs["yarn-site"]["properties"]:
        setYarnSite(proxyKey, 'false')
| |
| |
def isHawqMasterComponentOnAmbariServer(self, services):
    """
    Tell whether any HAWQMASTER or HAWQSTANDBY host is local.

    Collects the hostnames of every HAWQMASTER / HAWQSTANDBY component found
    in services["services"] and returns True when self.isLocalHost reports
    True for at least one of them (presumably meaning the Ambari server host,
    going by this method's name). Returns False when no such host exists.
    """
    masterHosts = []
    for service in services["services"]:
        for component in service["components"]:
            info = component["StackServiceComponents"]
            if info["component_name"] in ("HAWQMASTER", "HAWQSTANDBY"):
                masterHosts.extend(info["hostnames"])
    # Every host is probed (no short-circuit), as before.
    return any([self.isLocalHost(hostname) for hostname in masterHosts])
| |
| |
def recommendHAWQConfigurations(self, configurations, clusterData, services, hosts):
    """
    Recommend hawq-site and HAWQ hdfs-client values.

    Nothing is done unless "hawq-site" or "hdfs-client" is among the submitted
    configurations. Otherwise:
      * hawq_master_address_port is recommended as '' when a HAWQ master or
        standby is co-located with the Ambari server;
      * default_hash_table_bucket_number is derived from
        hawq_rm_nvseg_perquery_limit and the number of HAWQSEGMENT hosts;
      * the HAWQ YARN address properties are copied from yarn-site when YARN
        is installed;
      * output.replace-datanode-on-failure is "true" for more than three
        segment hosts and "false" otherwise.

    :param configurations: recommended configurations being built; written
                           through self.putProperty
    :param clusterData: unused here
    :param services: advisor request; "services" and "configurations" are read
    :param hosts: unused here
    """
    submittedConfigs = services["configurations"]
    if not any(x in submittedConfigs for x in ["hawq-site", "hdfs-client"]):
        return

    componentsListList = [service["components"] for service in services["services"]]
    componentsList = [item["StackServiceComponents"] for sublist in componentsListList for item in sublist]
    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
    numSegments = len(self.__getHosts(componentsList, "HAWQSEGMENT"))

    if "hawq-site" in submittedConfigs:
        hawq_site = submittedConfigs["hawq-site"]["properties"]
        putHawqSiteProperty = self.putProperty(configurations, "hawq-site", services)

        # remove master port when master is colocated with Ambari server
        if self.isHawqMasterComponentOnAmbariServer(services) and "hawq_master_address_port" in hawq_site:
            putHawqSiteProperty('hawq_master_address_port', '')

        # update query limits if segments are deployed
        if numSegments and "default_hash_table_bucket_number" in hawq_site and "hawq_rm_nvseg_perquery_limit" in hawq_site:
            factor_min = 1
            factor_max = 6
            limit = int(hawq_site["hawq_rm_nvseg_perquery_limit"])
            # Explicit floor division: the bucket count must stay an integer
            # regardless of which division semantics the interpreter uses.
            factor = limit // numSegments
            if factor < factor_min:
                # too many segments or default limit is too low --> stick with the limit
                buckets = limit
            elif factor > factor_max:
                # the limit is large and results in factor > max --> limit factor to max
                buckets = factor_max * numSegments
            else:
                buckets = factor * numSegments
            putHawqSiteProperty('default_hash_table_bucket_number', buckets)

        # update YARN RM urls with the values from yarn-site if YARN is installed
        if "YARN" in servicesList and "yarn-site" in submittedConfigs:
            yarn_site = submittedConfigs["yarn-site"]["properties"]
            for hs_prop, ys_prop in self.getHAWQYARNPropertyMapping().items():
                if hs_prop in hawq_site and ys_prop in yarn_site:
                    putHawqSiteProperty(hs_prop, yarn_site[ys_prop])

    # set output.replace-datanode-on-failure in HAWQ hdfs-client depending on the cluster size
    if "hdfs-client" in submittedConfigs:
        hdfs_client = submittedConfigs["hdfs-client"]["properties"]
        if "output.replace-datanode-on-failure" in hdfs_client:
            propertyValue = "true" if numSegments > 3 else "false"
            putHdfsClientProperty = self.putProperty(configurations, "hdfs-client", services)
            putHdfsClientProperty("output.replace-datanode-on-failure", propertyValue)
| |
| |
def getServiceConfigurationValidators(self):
    """
    Return the validators of the parent stack with the HDP 2.3 ones merged in.

    The result maps a service name to a dict of config type -> validator
    method. The parent's mapping is fetched first and this stack's entries are
    merged into it with self.mergeValidators; the merged parent mapping is
    returned.
    """
    parentValidators = super(HDP23StackAdvisor, self).getServiceConfigurationValidators()
    childValidators = {
        "HDFS": {"hdfs-site": self.validateHDFSConfigurations},
        "HIVE": {"hiveserver2-site": self.validateHiveServer2Configurations,
                 "hive-site": self.validateHiveConfigurations},
        "HBASE": {"hbase-site": self.validateHBASEConfigurations},
        # The key has to be the service name "KAFKA" (as used for the Ranger
        # audit settings in this file); a misspelled key leaves the Kafka
        # validator registered under a service that does not exist.
        "KAFKA": {"kafka-broker": self.validateKAFKAConfigurations},
        "YARN": {"yarn-site": self.validateYARNConfigurations},
        "HAWQ": {"hawq-site": self.validateHAWQSiteConfigurations,
                 "hdfs-client": self.validateHAWQHdfsClientConfigurations}
    }
    self.mergeValidators(parentValidators, childValidators)
    return parentValidators
| |
def validateHDFSConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
    """
    Validate hdfs-site for HDP 2.3 and return the validation problems.

    Two checks are added on top of the parent stack's validation:
      * when RANGER is installed and the Ranger HDFS plugin is enabled,
        dfs.namenode.inode.attributes.provider.class must name the Ranger
        HDFS authorizer (compared case-insensitively);
      * when HAWQ is installed, dfs.allow.truncate in the submitted hdfs-site
        must be 'true'.

    The parent's problems are appended to this method's own, the same way
    validateHiveConfigurations does it, so they are not lost.

    :param properties: hdfs-site properties being validated
    :param recommendedDefaults: forwarded to the parent implementation
    :param configurations: all configurations; read for the Ranger plugin site
    :param services: advisor request; "services" and "configurations" are read
    :param hosts: forwarded to the parent implementation
    :return: result of self.toConfigurationValidationProblems for "hdfs-site",
             extended with the parent's problems
    """
    parentValidationProblems = super(HDP23StackAdvisor, self).validateHDFSConfigurations(properties, recommendedDefaults, configurations, services, hosts)

    hdfs_site = properties
    validationItems = []
    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]

    # Ranger plugin logic: a missing plugin site or a missing flag both mean "not enabled".
    ranger_plugin_properties = getSiteProperties(configurations, "ranger-hdfs-plugin-properties")
    if ranger_plugin_properties:
        ranger_plugin_enabled = ranger_plugin_properties.get('ranger-hdfs-plugin-enabled', 'No')
    else:
        ranger_plugin_enabled = 'No'

    provider_prop = 'dfs.namenode.inode.attributes.provider.class'
    ranger_authorizer = 'org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer'
    if ("RANGER" in servicesList) and (ranger_plugin_enabled.lower() == 'Yes'.lower()):
        if provider_prop not in hdfs_site or hdfs_site[provider_prop].lower() != ranger_authorizer.lower():
            validationItems.append({"config-name": provider_prop,
                                    "item": self.getWarnItem(
                                        "dfs.namenode.inode.attributes.provider.class needs to be set to 'org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer' if Ranger HDFS Plugin is enabled.")})

    # Check if dfs.allow.truncate is true
    if "HAWQ" in servicesList:
        submitted_hdfs_site = services["configurations"]["hdfs-site"]["properties"]
        truncate_allowed = "dfs.allow.truncate" in submitted_hdfs_site and \
            submitted_hdfs_site["dfs.allow.truncate"].lower() == 'true'
        if not truncate_allowed:
            validationItems.append({"config-name": "dfs.allow.truncate",
                                    "item": self.getWarnItem("HAWQ requires dfs.allow.truncate in hdfs-site.xml set to True.")})

    validationProblems = self.toConfigurationValidationProblems(validationItems, "hdfs-site")
    # Keep whatever the parent stack reported instead of dropping it.
    if parentValidationProblems:
        validationProblems.extend(parentValidationProblems)
    return validationProblems
| |
| |
def validateHiveConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
    """
    Validate hive-site for HDP 2.3 and return the validation problems.

    When hive-env selects 'Existing SQL Anywhere Database' as hive_database,
    datanucleus.rdbms.datastoreAdapterClassName must be present in hive-site
    and set to the SQL Anywhere adapter class; a warning is produced
    otherwise. The parent stack's problems are appended to the result.

    :param properties: hive-site properties being validated
    :param recommendedDefaults: forwarded to the parent implementation
    :param configurations: all configurations; read for hive-env
    :param services: forwarded to the parent implementation
    :param hosts: forwarded to the parent implementation
    :return: this method's problems for "hive-site" followed by the parent's
    """
    parentValidationProblems = super(HDP23StackAdvisor, self).validateHiveConfigurations(properties, recommendedDefaults, configurations, services, hosts)
    hive_site = properties
    # getSiteProperties may yield nothing when hive-env is absent; treat that
    # as "no database selected" rather than failing on a membership test.
    hive_env_properties = getSiteProperties(configurations, "hive-env") or {}
    validationItems = []
    sqla_db_used = hive_env_properties.get('hive_database') == 'Existing SQL Anywhere Database'
    prop_name = "datanucleus.rdbms.datastoreAdapterClassName"
    prop_value = "org.datanucleus.store.rdbms.adapter.SQLAnywhereAdapter"
    if sqla_db_used:
        if prop_name not in hive_site:
            validationItems.append({"config-name": prop_name,
                                    "item": self.getWarnItem(
                                        "If Hive using SQL Anywhere db." \
                                        " {0} needs to be added with value {1}".format(prop_name, prop_value))})
        elif hive_site[prop_name] != prop_value:
            validationItems.append({"config-name": prop_name,
                                    "item": self.getWarnItem(
                                        "If Hive using SQL Anywhere db." \
                                        " {0} needs to be set to {1}".format(prop_name, prop_value))})

    configurationValidationProblems = self.toConfigurationValidationProblems(validationItems, "hive-site")
    configurationValidationProblems.extend(parentValidationProblems)
    return configurationValidationProblems
| |
def validateHiveServer2Configurations(self, properties, recommendedDefaults, configurations, services, hosts):
    """
    Validate hiveserver2-site for HDP 2.3 and return the validation problems.

    The checks only run when RANGER is installed. The Ranger Hive plugin
    counts as enabled when hive-env's hive_security_authorization equals
    'ranger' (case-insensitive).
      * Plugin enabled: the authorization manager, authenticator manager and
        authorization.enabled properties, when present, must hold the Ranger
        values, and hive.conf.restricted.list must contain those three
        property names.
      * Plugin disabled: the authorization manager and authenticator manager,
        when present, must hold the SQL-standard values.

    The parent stack's problems are appended to this method's own, the same
    way validateHiveConfigurations does it, so they are not lost.

    :param properties: hiveserver2-site properties being validated
    :param recommendedDefaults: forwarded to the parent implementation
    :param configurations: all configurations; read for hive-env
    :param services: advisor request; "services" is read
    :param hosts: forwarded to the parent implementation
    :return: result of self.toConfigurationValidationProblems for
             "hiveserver2-site", extended with the parent's problems
    """
    parentValidationProblems = super(HDP23StackAdvisor, self).validateHiveServer2Configurations(properties, recommendedDefaults, configurations, services, hosts)
    hive_server2 = properties
    validationItems = []

    # A missing hive-env is treated as "Ranger authorization not selected".
    hive_env_properties = getSiteProperties(configurations, "hive-env") or {}
    ranger_plugin_enabled = hive_env_properties.get('hive_security_authorization', '').lower() == 'ranger'
    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]

    authenticator_manager = ('hive.security.authenticator.manager',
                             "org.apache.hadoop.hive.ql.security.SessionStateUserAuthenticator")

    # Add stack validations only if Ranger is installed.
    if "RANGER" in servicesList:
        if ranger_plugin_enabled:
            expected_values = [
                ('hive.security.authorization.manager',
                 "org.apache.ranger.authorization.hive.authorizer.RangerHiveAuthorizerFactory"),
                authenticator_manager,
                ('hive.security.authorization.enabled', 'true'),
            ]
            for prop_name, prop_val in expected_values:
                if prop_name in hive_server2 and hive_server2[prop_name] != prop_val:
                    validationItems.append({"config-name": prop_name,
                                            "item": self.getWarnItem(
                                                "If Ranger Hive Plugin is enabled."\
                                                " {0} under hiveserver2-site needs to be set to {1}".format(prop_name, prop_val))})

            # The restricted list has to contain the three properties above.
            prop_name = 'hive.conf.restricted.list'
            prop_vals = 'hive.security.authorization.enabled,hive.security.authorization.manager,hive.security.authenticator.manager'.split(',')
            current_vals = []
            if hive_server2 and prop_name in hive_server2:
                current_vals = [x.strip() for x in hive_server2[prop_name].split(',')]
            missing_vals = [val for val in prop_vals if val not in current_vals]

            if missing_vals:
                validationItems.append({"config-name": prop_name,
                                        "item": self.getWarnItem("If Ranger Hive Plugin is enabled."\
                                                                 " {0} under hiveserver2-site needs to contain missing value {1}".format(prop_name, ','.join(missing_vals)))})
        else:
            expected_values = [
                ('hive.security.authorization.manager',
                 "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"),
                authenticator_manager,
            ]
            for prop_name, prop_val in expected_values:
                if prop_name in hive_server2 and hive_server2[prop_name] != prop_val:
                    validationItems.append({"config-name": prop_name,
                                            "item": self.getWarnItem(
                                                "If Ranger Hive Plugin is disabled."\
                                                " {0} needs to be set to {1}".format(prop_name, prop_val))})

    validationProblems = self.toConfigurationValidationProblems(validationItems, "hiveserver2-site")
    # Keep whatever the parent stack reported instead of dropping it.
    if parentValidationProblems:
        validationProblems.extend(parentValidationProblems)
    return validationProblems
| |
def validateHBASEConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
    """
    Validate hbase-site for HDP 2.3 and return the validation problems.

    When RANGER is installed and the Ranger HBase plugin is enabled:
      * hbase.security.authorization must be "true";
      * hbase.coprocessor.master.classes and hbase.coprocessor.region.classes
        must contain the Ranger coprocessor and must not contain the HBase
        AccessController.
    A property that is absent from hbase-site is reported with the same
    warning as a wrong value instead of aborting validation.

    The parent stack's problems are appended to this method's own, the same
    way validateHiveConfigurations does it, so they are not lost.

    :param properties: hbase-site properties being validated
    :param recommendedDefaults: forwarded to the parent implementation
    :param configurations: all configurations; read for the Ranger plugin site
    :param services: advisor request; "services" is read
    :param hosts: forwarded to the parent implementation
    :return: result of self.toConfigurationValidationProblems for "hbase-site",
             extended with the parent's problems
    """
    parentValidationProblems = super(HDP23StackAdvisor, self).validateHBASEConfigurations(properties, recommendedDefaults, configurations, services, hosts)
    hbase_site = properties
    validationItems = []

    # Ranger plugin logic: a missing plugin site or a missing flag both mean "not enabled".
    ranger_plugin_properties = getSiteProperties(configurations, "ranger-hbase-plugin-properties")
    if ranger_plugin_properties:
        ranger_plugin_enabled = ranger_plugin_properties.get('ranger-hbase-plugin-enabled', 'No')
    else:
        ranger_plugin_enabled = 'No'

    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
    if ("RANGER" in servicesList) and (ranger_plugin_enabled.lower() == 'Yes'.lower()):
        prop_name = 'hbase.security.authorization'
        prop_val = "true"
        if hbase_site.get(prop_name) != prop_val:
            validationItems.append({"config-name": prop_name,
                                    "item": self.getWarnItem(
                                        "If Ranger HBase Plugin is enabled."\
                                        "{0} needs to be set to {1}".format(prop_name, prop_val))})

        prop_val = "org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor"
        exclude_val = "org.apache.hadoop.hbase.security.access.AccessController"
        for prop_name in ("hbase.coprocessor.master.classes", "hbase.coprocessor.region.classes"):
            # An absent or empty coprocessor list cannot contain the Ranger class.
            current_classes = hbase_site.get(prop_name) or ''
            if prop_val not in current_classes or exclude_val in current_classes:
                validationItems.append({"config-name": prop_name,
                                        "item": self.getWarnItem(
                                            "If Ranger HBase Plugin is enabled."\
                                            " {0} needs to contain {1} instead of {2}".format(prop_name, prop_val, exclude_val))})

    validationProblems = self.toConfigurationValidationProblems(validationItems, "hbase-site")
    # Keep whatever the parent stack reported instead of dropping it.
    if parentValidationProblems:
        validationProblems.extend(parentValidationProblems)
    return validationProblems
| |
def validateKAFKAConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
    """
    Validate kafka-broker and return the validation problems.

    When RANGER is installed and the Ranger Kafka plugin is enabled,
    authorizer.class.name must be the Ranger Kafka authorizer; a warning is
    produced when it holds another value or is absent.

    A missing ranger-kafka-plugin-properties site, or a site without the
    'ranger-kafka-plugin-enabled' flag, is treated as "plugin not enabled",
    matching how the HDFS and HBase validators handle their plugin sites.

    :param properties: kafka-broker properties being validated
    :param recommendedDefaults: unused here
    :param configurations: all configurations; read for the Ranger plugin site
    :param services: advisor request; "services" is read
    :param hosts: unused here
    :return: result of self.toConfigurationValidationProblems for "kafka-broker"
    """
    kafka_broker = properties
    validationItems = []

    # Ranger plugin logic
    ranger_plugin_properties = getSiteProperties(configurations, "ranger-kafka-plugin-properties")
    if ranger_plugin_properties:
        ranger_plugin_enabled = ranger_plugin_properties.get('ranger-kafka-plugin-enabled', 'No')
    else:
        ranger_plugin_enabled = 'No'

    prop_name = 'authorizer.class.name'
    prop_val = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer"
    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
    if ("RANGER" in servicesList) and (ranger_plugin_enabled.lower() == 'Yes'.lower()):
        # .get(): an absent authorizer is reported, not raised as KeyError.
        if kafka_broker.get(prop_name) != prop_val:
            validationItems.append({"config-name": prop_name,
                                    "item": self.getWarnItem(
                                        "If Ranger Kafka Plugin is enabled."\
                                        "{0} needs to be set to {1}".format(prop_name, prop_val))})

    return self.toConfigurationValidationProblems(validationItems, "kafka-broker")
| |
def validateYARNConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
    """
    Validate yarn-site and return the validation problems.

    When both RANGER_KMS and KERBEROS are installed, a warning is produced if
    yarn.resourcemanager.proxy-user-privileges.enabled is 'true'
    (case-insensitive). A yarn-site without that property produces no warning
    instead of failing with a KeyError.

    :param properties: yarn-site properties being validated
    :param recommendedDefaults: unused here
    :param configurations: unused here
    :param services: advisor request; "services" is read
    :param hosts: unused here
    :return: result of self.toConfigurationValidationProblems for "yarn-site"
    """
    yarn_site = properties
    validationItems = []
    proxy_prop = 'yarn.resourcemanager.proxy-user-privileges.enabled'
    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
    if 'RANGER_KMS' in servicesList and 'KERBEROS' in servicesList:
        yarn_resource_proxy_enabled = yarn_site.get(proxy_prop)
        if yarn_resource_proxy_enabled and yarn_resource_proxy_enabled.lower() == 'true':
            validationItems.append({"config-name": proxy_prop,
                                    "item": self.getWarnItem("If Ranger KMS service is installed set yarn.resourcemanager.proxy-user-privileges.enabled "\
                                                             "property value as false under yarn-site"
                                                             )})

    return self.toConfigurationValidationProblems(validationItems, "yarn-site")
| |
| |
def isHawqMasterPortConflict(self, configurations):
    """
    Tell whether hawq-site's hawq_master_address_port equals 5432.

    Returns False when hawq-site or the property is absent, and also when the
    value is not an integer. The latter matters because
    recommendHAWQConfigurations recommends an empty string for this property
    when the master is co-located with the Ambari server, and int('') raises.

    :param configurations: configurations dict; only "hawq-site" is read
    :return: True when the configured port is 5432, False otherwise
    """
    prop_name = 'hawq_master_address_port'
    default_ambari_port = 5432
    hawq_site_properties = (configurations.get("hawq-site") or {}).get("properties") or {}
    if prop_name not in hawq_site_properties:
        return False

    try:
        portValue = int(hawq_site_properties[prop_name])
    except (TypeError, ValueError):
        # Empty or non-numeric port: cannot collide with 5432.
        return False
    return portValue == default_ambari_port
| |
| |
def validateIfRootDir(self, properties, validationItems, prop_name, display_name):
    """
    Append a warning to validationItems when a directory property is '/'.

    The property value is stripped of surrounding whitespace before the
    comparison. Nothing is appended when the property is absent or points
    anywhere else.

    :param properties: properties dict that may contain prop_name
    :param validationItems: list that receives the warning entry (mutated)
    :param prop_name: name of the directory property to inspect
    :param display_name: human-readable name used in the warning text
    """
    root_dir = '/'
    if prop_name not in properties:
        return
    if properties[prop_name].strip() != root_dir:
        return

    warning = "It is not advisable to have " + display_name + " at " + root_dir + ". Consider creating a sub directory for HAWQ"
    validationItems.append({"config-name": prop_name, "item": self.getWarnItem(warning)})
| |
| |
def checkForMultipleDirs(self, properties, validationItems, prop_name, display_name):
    """
    Append an error to validationItems when a directory property lists more
    than one directory.

    Space, comma, colon and semi-colon are treated as delimiters. Whitespace
    surrounding the whole value is ignored first, so a single directory that
    merely has leading or trailing blanks is not reported as multiple
    directories.

    :param properties: properties dict that may contain prop_name
    :param validationItems: list that receives the error entry (mutated)
    :param prop_name: name of the directory property to inspect
    :param display_name: human-readable name used in the error text
    """
    if prop_name not in properties:
        return

    # check for delimiters space, comma, colon and semi-colon
    entries = re.split(r'[ ,;:]', properties[prop_name].strip())
    if len(entries) > 1:
        validationItems.append({"config-name": prop_name,
                                "item": self.getErrorItem(
                                    "Multiple directories for " + display_name + " are not allowed.")})
| |
| |
def validateHAWQSiteConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
    """
    Validate hawq-site and return the validation problems.

    Checks performed:
      1. warn when a HAWQ master/standby shares a host with the Ambari server
         and the HAWQ master port conflicts with port 5432;
      2. warn when a master/segment (temp) directory is the root directory,
         and report an error when a master or segment directory holds more
         than one path;
      3. warn when a HAWQ YARN address property differs from its yarn-site
         counterpart (only if YARN is installed and yarn-site is present);
      4. report an error when hawq_global_rm_type is YARN but YARN is not
         installed;
      5. report an error when default_hash_table_bucket_number exceeds
         hawq_rm_nvseg_perquery_limit.

    :param properties: hawq-site properties being validated
    :param recommendedDefaults: unused here
    :param configurations: all configurations; read for hawq-site and yarn-site
    :param services: advisor request; "services" is read
    :param hosts: unused here
    :return: result of self.toConfigurationValidationProblems for "hawq-site"
    """
    hawq_site = properties
    validationItems = []

    # 1. Check if HAWQ master/standby port numbers don't conflict with Ambari ports. Both Ambari and HAWQ use postgres DB and 5432 port.
    if self.isHawqMasterComponentOnAmbariServer(services) and self.isHawqMasterPortConflict(configurations):
        prop_name = 'hawq_master_address_port'
        validationItems.append({"config-name": prop_name,
                                "item": self.getWarnItem(
                                    "The default Postgres port (5432) on the Ambari Server conflicts with the default HAWQ Masters port. "
                                    "If you are using port 5432 for Postgres, you must either deploy the HAWQ Masters on a different host "
                                    "or configure a different port for the HAWQ Masters in the HAWQ Configuration page.")})

    # 2. Check if any data directories are pointing to root dir '/'
    directories = {
        'hawq_master_directory': 'HAWQ Master directory',
        'hawq_master_temp_directory': 'HAWQ Master temp directory',
        'hawq_segment_directory': 'HAWQ Segment directory',
        'hawq_segment_temp_directory': 'HAWQ Segment temp directory'
    }
    # .items() rather than .iteritems(): available on every Python version.
    for property_name, display_name in directories.items():
        self.validateIfRootDir(properties, validationItems, property_name, display_name)

    # 2.1 Check if any master or segment directories has multiple values
    directories = {
        'hawq_master_directory': 'HAWQ Master directory',
        'hawq_segment_directory': 'HAWQ Segment directory'
    }
    for property_name, display_name in directories.items():
        self.checkForMultipleDirs(properties, validationItems, property_name, display_name)

    # 3. Check YARN RM address properties
    YARN = "YARN"
    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
    if YARN in servicesList and "yarn-site" in configurations:
        yarn_site = getSiteProperties(configurations, "yarn-site")
        for hs_prop, ys_prop in self.getHAWQYARNPropertyMapping().items():
            if hs_prop in hawq_site and ys_prop in yarn_site and hawq_site[hs_prop] != yarn_site[ys_prop]:
                message = "Expected value: {0} (this property should have the same value as the property {1} in yarn-site)".format(yarn_site[ys_prop], ys_prop)
                validationItems.append({"config-name": hs_prop, "item": self.getWarnItem(message)})

    # 4. Check HAWQ Resource Manager type
    HAWQ_GLOBAL_RM_TYPE = "hawq_global_rm_type"
    if YARN not in servicesList and HAWQ_GLOBAL_RM_TYPE in hawq_site and hawq_site[HAWQ_GLOBAL_RM_TYPE].upper() == YARN:
        message = "{0} must be set to none if YARN service is not installed".format(HAWQ_GLOBAL_RM_TYPE)
        validationItems.append({"config-name": HAWQ_GLOBAL_RM_TYPE, "item": self.getErrorItem(message)})

    # 5. Check query limits
    if ("default_hash_table_bucket_number" in hawq_site and
            "hawq_rm_nvseg_perquery_limit" in hawq_site and
            int(hawq_site["default_hash_table_bucket_number"]) > int(hawq_site["hawq_rm_nvseg_perquery_limit"])):
        message = "Default buckets for Hash Distributed tables parameter value should not be greater than the value of Virtual Segments Limit per Query (Total) parameter, currently set to {0}.".format(hawq_site["hawq_rm_nvseg_perquery_limit"])
        validationItems.append({"config-name": "default_hash_table_bucket_number", "item": self.getErrorItem(message)})

    return self.toConfigurationValidationProblems(validationItems, "hawq-site")
| |
def validateHAWQHdfsClientConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
    """
    Validate HAWQ's hdfs-client and return the validation problems.

    Only output.replace-datanode-on-failure is inspected, and only when it is
    present. With more than three HAWQSEGMENT hosts the value is expected to
    be true; with three or fewer it is expected to be false (compared
    case-insensitively). A mismatch produces a warning.

    :param properties: hdfs-client properties being validated
    :param recommendedDefaults: unused here
    :param configurations: unused here
    :param services: advisor request; "services" is read for components
    :param hosts: unused here
    :return: result of self.toConfigurationValidationProblems for "hdfs-client"
    """
    PROP_NAME = "output.replace-datanode-on-failure"
    validationItems = []

    if PROP_NAME in properties:
        currentValue = properties[PROP_NAME].upper()

        # Flatten every service's components down to their StackServiceComponents.
        stackComponents = []
        for service in services["services"]:
            for component in service["components"]:
                stackComponents.append(component["StackServiceComponents"])
        segmentCount = len(self.__getHosts(stackComponents, "HAWQSEGMENT"))

        limit = 3
        if segmentCount > limit:
            expectedValue = 'TRUE'
            template = "{0} should be set to true (checked) for clusters with more than {1} HAWQ Segments"
        else:
            expectedValue = 'FALSE'
            template = "{0} should be set to false (unchecked) for clusters with {1} or less HAWQ Segments"

        if currentValue != expectedValue:
            validationItems.append({"config-name": PROP_NAME, "item": self.getWarnItem(template.format(PROP_NAME, str(limit)))})

    return self.toConfigurationValidationProblems(validationItems, "hdfs-client")
| |
def isComponentUsingCardinalityForLayout(self, componentName):
    """
    Return True when componentName is NFS_GATEWAY, PHOENIX_QUERY_SERVER or
    SPARK_THRIFTSERVER, False for any other name.

    NOTE(review): going by the method name, callers presumably use this to
    decide whether layout follows the component's cardinality -- confirm
    against the caller.
    """
    cardinalityComponents = ('NFS_GATEWAY', 'PHOENIX_QUERY_SERVER', 'SPARK_THRIFTSERVER')
    return componentName in cardinalityComponents
| |
def getHAWQYARNPropertyMapping(self):
    """
    Return the mapping from hawq-site property names (keys) to the yarn-site
    property names (values) whose values they are expected to mirror.
    """
    mapping = {}
    mapping["hawq_rm_yarn_address"] = "yarn.resourcemanager.address"
    mapping["hawq_rm_yarn_scheduler_address"] = "yarn.resourcemanager.scheduler.address"
    return mapping