| #!/usr/bin/env ambari-python-wrap |
| """ |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| """ |
| |
| import math |
| from math import floor |
| from urlparse import urlparse |
| import os |
| import fnmatch |
| import socket |
| import re |
| import xml.etree.ElementTree as ET |
| |
| |
| class HDP22StackAdvisor(HDP21StackAdvisor): |
| |
| def getServiceConfigurationRecommenderDict(self): |
| parentRecommendConfDict = super(HDP22StackAdvisor, self).getServiceConfigurationRecommenderDict() |
| childRecommendConfDict = { |
| "HDFS": self.recommendHDFSConfigurations, |
| "HIVE": self.recommendHIVEConfigurations, |
| "HBASE": self.recommendHBASEConfigurations, |
| "MAPREDUCE2": self.recommendMapReduce2Configurations, |
| "TEZ": self.recommendTezConfigurations, |
| "AMBARI_METRICS": self.recommendAmsConfigurations, |
| "YARN": self.recommendYARNConfigurations, |
| "STORM": self.recommendStormConfigurations, |
| "KNOX": self.recommendKnoxConfigurations, |
| "RANGER": self.recommendRangerConfigurations, |
| "LOGSEARCH" : self.recommendLogsearchConfigurations, |
| "SPARK": self.recommendSparkConfigurations, |
| } |
| parentRecommendConfDict.update(childRecommendConfDict) |
| return parentRecommendConfDict |
| |
| |
| def recommendSparkConfigurations(self, configurations, clusterData, services, hosts): |
| """ |
| :type configurations dict |
| :type clusterData dict |
| :type services dict |
| :type hosts dict |
| """ |
| putSparkProperty = self.putProperty(configurations, "spark-defaults", services) |
| putSparkProperty("spark.yarn.queue", self.recommendYarnQueue(services)) |
| |
| # add only if spark supports this config |
| if "configurations" in services and "spark-thrift-sparkconf" in services["configurations"]: |
| putSparkThriftSparkConf = self.putProperty(configurations, "spark-thrift-sparkconf", services) |
| putSparkThriftSparkConf("spark.yarn.queue", self.recommendYarnQueue(services)) |
| |
| |
  def recommendYARNConfigurations(self, configurations, clusterData, services, hosts):
    """
    Recommend yarn-site properties for HDP 2.2 on top of the parent stack's
    recommendations: vcore sizing from the NodeManager host's CPU count,
    container-executor selection (Kerberos / CGroups) and the SLIDER-driven
    hadoop.registry.rm.enabled flag.

    :type configurations dict
    :type clusterData dict
    :type services dict
    :type hosts dict
    """
    super(HDP22StackAdvisor, self).recommendYARNConfigurations(configurations, clusterData, services, hosts)
    putYarnProperty = self.putProperty(configurations, "yarn-site", services)
    # Start from the cluster-wide CPU estimate; refined below once the actual
    # NodeManager host is known.
    putYarnProperty('yarn.nodemanager.resource.cpu-vcores', clusterData['cpu'])
    putYarnProperty('yarn.scheduler.minimum-allocation-vcores', 1)
    putYarnProperty('yarn.scheduler.maximum-allocation-vcores', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.cpu-vcores"])
    # Property Attributes
    putYarnPropertyAttribute = self.putPropertyAttribute(configurations, "yarn-site")
    nodeManagerHost = self.getHostWithComponent("YARN", "NODEMANAGER", services, hosts)
    if (nodeManagerHost is not None):
      # Cap vcores at the configured physical-CPU percentage (defaults to 80%).
      cpuPercentageLimit = 0.8
      if "yarn.nodemanager.resource.percentage-physical-cpu-limit" in configurations["yarn-site"]["properties"]:
        cpuPercentageLimit = float(configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.percentage-physical-cpu-limit"])
      cpuLimit = max(1, int(floor(nodeManagerHost["Hosts"]["cpu_count"] * cpuPercentageLimit)))
      putYarnProperty('yarn.nodemanager.resource.cpu-vcores', str(cpuLimit))
      # Re-read the value just written above so the scheduler maximum tracks
      # the recommended vcore count.
      putYarnProperty('yarn.scheduler.maximum-allocation-vcores', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.cpu-vcores"])
      putYarnPropertyAttribute('yarn.nodemanager.resource.memory-mb', 'maximum', int(nodeManagerHost["Hosts"]["total_mem"] / 1024)) # total_mem in kb
      # Attribute ceiling of 2x physical cores allows CPU oversubscription.
      putYarnPropertyAttribute('yarn.nodemanager.resource.cpu-vcores', 'maximum', nodeManagerHost["Hosts"]["cpu_count"] * 2)
      putYarnPropertyAttribute('yarn.scheduler.minimum-allocation-vcores', 'maximum', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.cpu-vcores"])
      putYarnPropertyAttribute('yarn.scheduler.maximum-allocation-vcores', 'maximum', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.cpu-vcores"])
      putYarnPropertyAttribute('yarn.scheduler.minimum-allocation-mb', 'maximum', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.memory-mb"])
      putYarnPropertyAttribute('yarn.scheduler.maximum-allocation-mb', 'maximum', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.memory-mb"])

      # Above is the default calculated 'maximum' values derived purely from hosts.
      # However, there are 'maximum' and other attributes that actually change based on the values
      # of other configs. We need to update those values.
      if ("yarn-site" in services["configurations"]):
        if ("yarn.nodemanager.resource.memory-mb" in services["configurations"]["yarn-site"]["properties"]):
          putYarnPropertyAttribute('yarn.scheduler.maximum-allocation-mb', 'maximum', services["configurations"]["yarn-site"]["properties"]["yarn.nodemanager.resource.memory-mb"])
          putYarnPropertyAttribute('yarn.scheduler.minimum-allocation-mb', 'maximum', services["configurations"]["yarn-site"]["properties"]["yarn.nodemanager.resource.memory-mb"])
        if ("yarn.nodemanager.resource.cpu-vcores" in services["configurations"]["yarn-site"]["properties"]):
          putYarnPropertyAttribute('yarn.scheduler.maximum-allocation-vcores', 'maximum', services["configurations"]["yarn-site"]["properties"]["yarn.nodemanager.resource.cpu-vcores"])
          putYarnPropertyAttribute('yarn.scheduler.minimum-allocation-vcores', 'maximum', services["configurations"]["yarn-site"]["properties"]["yarn.nodemanager.resource.cpu-vcores"])

    # Kerberos requires the LinuxContainerExecutor regardless of CGroups settings.
    kerberos_authentication_enabled = self.isSecurityEnabled(services)
    if kerberos_authentication_enabled:
      putYarnProperty('yarn.nodemanager.container-executor.class',
                      'org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor')

    if "yarn-env" in services["configurations"] and "yarn_cgroups_enabled" in services["configurations"]["yarn-env"]["properties"]:
      yarn_cgroups_enabled = services["configurations"]["yarn-env"]["properties"]["yarn_cgroups_enabled"].lower() == "true"
      if yarn_cgroups_enabled:
        # CGroups on: LinuxContainerExecutor plus the CGroups resource handler
        # and mount settings.
        putYarnProperty('yarn.nodemanager.container-executor.class', 'org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor')
        putYarnProperty('yarn.nodemanager.linux-container-executor.group', 'hadoop')
        putYarnProperty('yarn.nodemanager.linux-container-executor.resources-handler.class', 'org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler')
        putYarnProperty('yarn.nodemanager.linux-container-executor.cgroups.hierarchy', '/yarn')
        putYarnProperty('yarn.nodemanager.linux-container-executor.cgroups.mount', 'true')
        putYarnProperty('yarn.nodemanager.linux-container-executor.cgroups.mount-path', '/cgroup')
      else:
        # CGroups off: fall back to the default executor (unless Kerberos
        # forced LinuxContainerExecutor above) and delete the CGroups-only
        # properties.
        if not kerberos_authentication_enabled:
          putYarnProperty('yarn.nodemanager.container-executor.class', 'org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor')
        putYarnPropertyAttribute('yarn.nodemanager.linux-container-executor.resources-handler.class', 'delete', 'true')
        putYarnPropertyAttribute('yarn.nodemanager.linux-container-executor.cgroups.hierarchy', 'delete', 'true')
        putYarnPropertyAttribute('yarn.nodemanager.linux-container-executor.cgroups.mount', 'delete', 'true')
        putYarnPropertyAttribute('yarn.nodemanager.linux-container-executor.cgroups.mount-path', 'delete', 'true')
    # recommend hadoop.registry.rm.enabled based on SLIDER in services
    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
    if "SLIDER" in servicesList:
      putYarnProperty('hadoop.registry.rm.enabled', 'true')
    else:
      putYarnProperty('hadoop.registry.rm.enabled', 'false')
| |
| def recommendHDFSConfigurations(self, configurations, clusterData, services, hosts): |
| super(HDP22StackAdvisor, self).recommendHDFSConfigurations(configurations, clusterData, services, hosts) |
| putHdfsSiteProperty = self.putProperty(configurations, "hdfs-site", services) |
| putHdfsSitePropertyAttribute = self.putPropertyAttribute(configurations, "hdfs-site") |
| putHdfsSiteProperty("dfs.datanode.max.transfer.threads", 16384 if clusterData["hBaseInstalled"] else 4096) |
| |
| dataDirsCount = 1 |
| # Use users 'dfs.datanode.data.dir' first |
| if "hdfs-site" in services["configurations"] and "dfs.datanode.data.dir" in services["configurations"]["hdfs-site"]["properties"]: |
| dataDirsCount = len(str(services["configurations"]["hdfs-site"]["properties"]["dfs.datanode.data.dir"]).split(",")) |
| elif "dfs.datanode.data.dir" in configurations["hdfs-site"]["properties"]: |
| dataDirsCount = len(str(configurations["hdfs-site"]["properties"]["dfs.datanode.data.dir"]).split(",")) |
| if dataDirsCount <= 2: |
| failedVolumesTolerated = 0 |
| elif dataDirsCount <= 4: |
| failedVolumesTolerated = 1 |
| else: |
| failedVolumesTolerated = 2 |
| putHdfsSiteProperty("dfs.datanode.failed.volumes.tolerated", failedVolumesTolerated) |
| |
| namenodeHosts = self.getHostsWithComponent("HDFS", "NAMENODE", services, hosts) |
| |
| # 25 * # of cores on NameNode |
| nameNodeCores = 4 |
| if namenodeHosts is not None and len(namenodeHosts): |
| nameNodeCores = int(namenodeHosts[0]['Hosts']['cpu_count']) |
| putHdfsSiteProperty("dfs.namenode.handler.count", 25 * nameNodeCores) |
| if 25 * nameNodeCores > 200: |
| putHdfsSitePropertyAttribute("dfs.namenode.handler.count", "maximum", 25 * nameNodeCores) |
| |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| if ('ranger-hdfs-plugin-properties' in services['configurations']) and ('ranger-hdfs-plugin-enabled' in services['configurations']['ranger-hdfs-plugin-properties']['properties']): |
| rangerPluginEnabled = services['configurations']['ranger-hdfs-plugin-properties']['properties']['ranger-hdfs-plugin-enabled'] |
| if ("RANGER" in servicesList) and (rangerPluginEnabled.lower() == 'Yes'.lower()): |
| putHdfsSiteProperty("dfs.permissions.enabled",'true') |
| |
| putHdfsSiteProperty("dfs.namenode.safemode.threshold-pct", "0.999" if len(namenodeHosts) > 1 else "1.000") |
| |
| putHdfsEnvProperty = self.putProperty(configurations, "hadoop-env", services) |
| putHdfsEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hadoop-env") |
| |
| putHdfsEnvProperty('namenode_heapsize', max(int(clusterData['totalAvailableRam'] / 2), 1024)) |
| |
| nn_heapsize_limit = None |
| if (namenodeHosts is not None and len(namenodeHosts) > 0): |
| if len(namenodeHosts) > 1: |
| nn_max_heapsize = min(int(namenodeHosts[0]["Hosts"]["total_mem"]), int(namenodeHosts[1]["Hosts"]["total_mem"])) / 1024 |
| masters_at_host = max(self.getHostComponentsByCategories(namenodeHosts[0]["Hosts"]["host_name"], ["MASTER"], services, hosts), |
| self.getHostComponentsByCategories(namenodeHosts[1]["Hosts"]["host_name"], ["MASTER"], services, hosts)) |
| else: |
| nn_max_heapsize = int(namenodeHosts[0]["Hosts"]["total_mem"] / 1024) # total_mem in kb |
| masters_at_host = self.getHostComponentsByCategories(namenodeHosts[0]["Hosts"]["host_name"], ["MASTER"], services, hosts) |
| |
| putHdfsEnvPropertyAttribute('namenode_heapsize', 'maximum', max(nn_max_heapsize, 1024)) |
| |
| nn_heapsize_limit = nn_max_heapsize |
| nn_heapsize_limit -= clusterData["reservedRam"] |
| if len(masters_at_host) > 1: |
| nn_heapsize_limit = int(nn_heapsize_limit/2) |
| |
| putHdfsEnvProperty('namenode_heapsize', max(nn_heapsize_limit, 1024)) |
| |
| |
| datanodeHosts = self.getHostsWithComponent("HDFS", "DATANODE", services, hosts) |
| if datanodeHosts is not None and len(datanodeHosts) > 0: |
| min_datanode_ram_kb = 1073741824 # 1 TB |
| for datanode in datanodeHosts: |
| ram_kb = datanode['Hosts']['total_mem'] |
| min_datanode_ram_kb = min(min_datanode_ram_kb, ram_kb) |
| |
| datanodeFilesM = len(datanodeHosts)*dataDirsCount/10 # in millions, # of files = # of disks * 100'000 |
| nn_memory_configs = [ |
| {'nn_heap':1024, 'nn_opt':128}, |
| {'nn_heap':3072, 'nn_opt':512}, |
| {'nn_heap':5376, 'nn_opt':768}, |
| {'nn_heap':9984, 'nn_opt':1280}, |
| {'nn_heap':14848, 'nn_opt':2048}, |
| {'nn_heap':19456, 'nn_opt':2560}, |
| {'nn_heap':24320, 'nn_opt':3072}, |
| {'nn_heap':33536, 'nn_opt':4352}, |
| {'nn_heap':47872, 'nn_opt':6144}, |
| {'nn_heap':59648, 'nn_opt':7680}, |
| {'nn_heap':71424, 'nn_opt':8960}, |
| {'nn_heap':94976, 'nn_opt':8960} |
| ] |
| index = { |
| datanodeFilesM < 1 : 0, |
| 1 <= datanodeFilesM < 5 : 1, |
| 5 <= datanodeFilesM < 10 : 2, |
| 10 <= datanodeFilesM < 20 : 3, |
| 20 <= datanodeFilesM < 30 : 4, |
| 30 <= datanodeFilesM < 40 : 5, |
| 40 <= datanodeFilesM < 50 : 6, |
| 50 <= datanodeFilesM < 70 : 7, |
| 70 <= datanodeFilesM < 100 : 8, |
| 100 <= datanodeFilesM < 125 : 9, |
| 125 <= datanodeFilesM < 150 : 10, |
| 150 <= datanodeFilesM : 11 |
| }[1] |
| |
| nn_memory_config = nn_memory_configs[index] |
| |
| #override with new values if applicable |
| if nn_heapsize_limit is not None and nn_memory_config['nn_heap'] <= nn_heapsize_limit: |
| putHdfsEnvProperty('namenode_heapsize', nn_memory_config['nn_heap']) |
| |
| putHdfsEnvPropertyAttribute('dtnode_heapsize', 'maximum', int(min_datanode_ram_kb/1024)) |
| |
| nn_heapsize = int(configurations["hadoop-env"]["properties"]["namenode_heapsize"]) |
| putHdfsEnvProperty('namenode_opt_newsize', max(int(nn_heapsize / 8), 128)) |
| putHdfsEnvProperty('namenode_opt_maxnewsize', max(int(nn_heapsize / 8), 128)) |
| |
| putHdfsSitePropertyAttribute = self.putPropertyAttribute(configurations, "hdfs-site") |
| putHdfsSitePropertyAttribute('dfs.datanode.failed.volumes.tolerated', 'maximum', dataDirsCount) |
| |
| keyserverHostsString = None |
| keyserverPortString = None |
| if "hadoop-env" in services["configurations"] and "keyserver_host" in services["configurations"]["hadoop-env"]["properties"] and "keyserver_port" in services["configurations"]["hadoop-env"]["properties"]: |
| keyserverHostsString = services["configurations"]["hadoop-env"]["properties"]["keyserver_host"] |
| keyserverPortString = services["configurations"]["hadoop-env"]["properties"]["keyserver_port"] |
| |
| # Irrespective of what hadoop-env has, if Ranger-KMS is installed, we use its values. |
| rangerKMSServerHosts = self.getHostsWithComponent("RANGER_KMS", "RANGER_KMS_SERVER", services, hosts) |
| if rangerKMSServerHosts is not None and len(rangerKMSServerHosts) > 0: |
| rangerKMSServerHostsArray = [] |
| for rangeKMSServerHost in rangerKMSServerHosts: |
| rangerKMSServerHostsArray.append(rangeKMSServerHost["Hosts"]["host_name"]) |
| keyserverHostsString = ";".join(rangerKMSServerHostsArray) |
| if "kms-env" in services["configurations"] and "kms_port" in services["configurations"]["kms-env"]["properties"]: |
| keyserverPortString = services["configurations"]["kms-env"]["properties"]["kms_port"] |
| |
| if keyserverHostsString is not None and len(keyserverHostsString.strip()) > 0: |
| urlScheme = "http" |
| if "ranger-kms-site" in services["configurations"] and \ |
| "ranger.service.https.attrib.ssl.enabled" in services["configurations"]["ranger-kms-site"]["properties"] and \ |
| services["configurations"]["ranger-kms-site"]["properties"]["ranger.service.https.attrib.ssl.enabled"].lower() == "true": |
| urlScheme = "https" |
| |
| if keyserverPortString is None or len(keyserverPortString.strip()) < 1: |
| keyserverPortString = ":9292" |
| else: |
| keyserverPortString = ":" + keyserverPortString.strip() |
| putCoreSiteProperty = self.putProperty(configurations, "core-site", services) |
| kmsPath = "kms://" + urlScheme + "@" + keyserverHostsString.strip() + keyserverPortString + "/kms" |
| putCoreSiteProperty("hadoop.security.key.provider.path", kmsPath) |
| putHdfsSiteProperty("dfs.encryption.key.provider.uri", kmsPath) |
| |
| if "ranger-env" in services["configurations"] and "ranger-hdfs-plugin-properties" in services["configurations"] and \ |
| "ranger-hdfs-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]: |
| putHdfsRangerPluginProperty = self.putProperty(configurations, "ranger-hdfs-plugin-properties", services) |
| rangerEnvHdfsPluginProperty = services["configurations"]["ranger-env"]["properties"]["ranger-hdfs-plugin-enabled"] |
| putHdfsRangerPluginProperty("ranger-hdfs-plugin-enabled", rangerEnvHdfsPluginProperty) |
| |
| putHdfsSitePropertyAttribute = self.putPropertyAttribute(configurations, "hdfs-site") |
| putCoreSitePropertyAttribute = self.putPropertyAttribute(configurations, "core-site") |
| if not "RANGER_KMS" in servicesList: |
| putCoreSitePropertyAttribute('hadoop.security.key.provider.path','delete','true') |
| putHdfsSitePropertyAttribute('dfs.encryption.key.provider.uri','delete','true') |
| |
| def recommendHIVEConfigurations(self, configurations, clusterData, services, hosts): |
| super(HDP22StackAdvisor, self).recommendHiveConfigurations(configurations, clusterData, services, hosts) |
| |
| putHiveServerProperty = self.putProperty(configurations, "hiveserver2-site", services) |
| putHiveEnvProperty = self.putProperty(configurations, "hive-env", services) |
| putHiveSiteProperty = self.putProperty(configurations, "hive-site", services) |
| putWebhcatSiteProperty = self.putProperty(configurations, "webhcat-site", services) |
| putHiveSitePropertyAttribute = self.putPropertyAttribute(configurations, "hive-site") |
| putHiveEnvPropertyAttributes = self.putPropertyAttribute(configurations, "hive-env") |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| |
| # Storage |
| putHiveEnvProperty("hive_exec_orc_storage_strategy", "SPEED") |
| putHiveSiteProperty("hive.exec.orc.encoding.strategy", configurations["hive-env"]["properties"]["hive_exec_orc_storage_strategy"]) |
| putHiveSiteProperty("hive.exec.orc.compression.strategy", configurations["hive-env"]["properties"]["hive_exec_orc_storage_strategy"]) |
| |
| putHiveSiteProperty("hive.exec.orc.default.stripe.size", "67108864") |
| putHiveSiteProperty("hive.exec.orc.default.compress", "ZLIB") |
| putHiveSiteProperty("hive.optimize.index.filter", "true") |
| putHiveSiteProperty("hive.optimize.sort.dynamic.partition", "false") |
| |
| # Vectorization |
| putHiveSiteProperty("hive.vectorized.execution.enabled", "true") |
| putHiveSiteProperty("hive.vectorized.execution.reduce.enabled", "false") |
| |
| # Transactions |
| putHiveEnvProperty("hive_txn_acid", "off") |
| if str(configurations["hive-env"]["properties"]["hive_txn_acid"]).lower() == "on": |
| putHiveSiteProperty("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager") |
| putHiveSiteProperty("hive.support.concurrency", "true") |
| putHiveSiteProperty("hive.compactor.initiator.on", "true") |
| putHiveSiteProperty("hive.compactor.worker.threads", "1") |
| putHiveSiteProperty("hive.enforce.bucketing", "true") |
| putHiveSiteProperty("hive.exec.dynamic.partition.mode", "nonstrict") |
| else: |
| putHiveSiteProperty("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager") |
| putHiveSiteProperty("hive.support.concurrency", "false") |
| putHiveSiteProperty("hive.compactor.initiator.on", "false") |
| putHiveSiteProperty("hive.compactor.worker.threads", "0") |
| putHiveSiteProperty("hive.enforce.bucketing", "false") |
| putHiveSiteProperty("hive.exec.dynamic.partition.mode", "strict") |
| |
| hiveMetastoreHost = self.getHostWithComponent("HIVE", "HIVE_METASTORE", services, hosts) |
| if hiveMetastoreHost is not None and len(hiveMetastoreHost) > 0: |
| putHiveSiteProperty("hive.metastore.uris", "thrift://" + hiveMetastoreHost["Hosts"]["host_name"] + ":9083") |
| |
| # ATS |
| putHiveEnvProperty("hive_timeline_logging_enabled", "true") |
| |
| hooks_properties = ["hive.exec.pre.hooks", "hive.exec.post.hooks", "hive.exec.failure.hooks"] |
| include_ats_hook = str(configurations["hive-env"]["properties"]["hive_timeline_logging_enabled"]).lower() == "true" |
| |
| ats_hook_class = "org.apache.hadoop.hive.ql.hooks.ATSHook" |
| for hooks_property in hooks_properties: |
| if hooks_property in configurations["hive-site"]["properties"]: |
| hooks_value = configurations["hive-site"]["properties"][hooks_property] |
| else: |
| hooks_value = " " |
| if include_ats_hook and ats_hook_class not in hooks_value: |
| if hooks_value == " ": |
| hooks_value = ats_hook_class |
| else: |
| hooks_value = hooks_value + "," + ats_hook_class |
| if not include_ats_hook and ats_hook_class in hooks_value: |
| hooks_classes = [] |
| for hook_class in hooks_value.split(","): |
| if hook_class != ats_hook_class and hook_class != " ": |
| hooks_classes.append(hook_class) |
| if hooks_classes: |
| hooks_value = ",".join(hooks_classes) |
| else: |
| hooks_value = " " |
| |
| putHiveSiteProperty(hooks_property, hooks_value) |
| |
| # Tez Engine |
| if "TEZ" in servicesList: |
| putHiveSiteProperty("hive.execution.engine", "tez") |
| else: |
| putHiveSiteProperty("hive.execution.engine", "mr") |
| |
| container_size = "512" |
| |
| if not "yarn-site" in configurations: |
| self.recommendYARNConfigurations(configurations, clusterData, services, hosts) |
| #properties below should be always present as they are provided in HDP206 stack advisor at least |
| yarnMaxAllocationSize = min(30 * int(configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]), int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"])) |
| #duplicate tez task resource calc logic, direct dependency doesn't look good here (in case of Hive without Tez) |
| container_size = clusterData['mapMemory'] if clusterData['mapMemory'] > 2048 else int(clusterData['reduceMemory']) |
| container_size = min(clusterData['containers'] * clusterData['ramPerContainer'], container_size, yarnMaxAllocationSize) |
| |
| putHiveSiteProperty("hive.tez.container.size", min(int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"]), container_size)) |
| |
| putHiveSitePropertyAttribute("hive.tez.container.size", "minimum", int(configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"])) |
| putHiveSitePropertyAttribute("hive.tez.container.size", "maximum", int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"])) |
| |
| if "yarn-site" in services["configurations"]: |
| if "yarn.scheduler.minimum-allocation-mb" in services["configurations"]["yarn-site"]["properties"]: |
| putHiveSitePropertyAttribute("hive.tez.container.size", "minimum", int(services["configurations"]["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"])) |
| if "yarn.scheduler.maximum-allocation-mb" in services["configurations"]["yarn-site"]["properties"]: |
| putHiveSitePropertyAttribute("hive.tez.container.size", "maximum", int(services["configurations"]["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"])) |
| |
| putHiveSiteProperty("hive.prewarm.enabled", "false") |
| putHiveSiteProperty("hive.prewarm.numcontainers", "3") |
| putHiveSiteProperty("hive.tez.auto.reducer.parallelism", "true") |
| putHiveSiteProperty("hive.tez.dynamic.partition.pruning", "true") |
| |
| container_size = configurations["hive-site"]["properties"]["hive.tez.container.size"] |
| container_size_bytes = int(int(container_size)*0.8*1024*1024) # Xmx == 80% of container |
| # Memory |
| putHiveSiteProperty("hive.auto.convert.join.noconditionaltask.size", int(round(container_size_bytes/3))) |
| putHiveSitePropertyAttribute("hive.auto.convert.join.noconditionaltask.size", "maximum", container_size_bytes) |
| putHiveSiteProperty("hive.exec.reducers.bytes.per.reducer", "67108864") |
| |
| # CBO |
| if "hive-site" in services["configurations"] and "hive.cbo.enable" in services["configurations"]["hive-site"]["properties"]: |
| hive_cbo_enable = services["configurations"]["hive-site"]["properties"]["hive.cbo.enable"] |
| putHiveSiteProperty("hive.stats.fetch.partition.stats", hive_cbo_enable) |
| putHiveSiteProperty("hive.stats.fetch.column.stats", hive_cbo_enable) |
| |
| putHiveSiteProperty("hive.compute.query.using.stats", "true") |
| |
| # Interactive Query |
| putHiveSiteProperty("hive.server2.tez.initialize.default.sessions", "false") |
| putHiveSiteProperty("hive.server2.tez.sessions.per.default.queue", "1") |
| putHiveSiteProperty("hive.server2.enable.doAs", "true") |
| |
| yarn_queues = "default" |
| capacitySchedulerProperties = {} |
| if "capacity-scheduler" in services['configurations']: |
| if "capacity-scheduler" in services['configurations']["capacity-scheduler"]["properties"]: |
| properties = str(services['configurations']["capacity-scheduler"]["properties"]["capacity-scheduler"]).split('\n') |
| for property in properties: |
| key,sep,value = property.partition("=") |
| capacitySchedulerProperties[key] = value |
| if "yarn.scheduler.capacity.root.queues" in capacitySchedulerProperties: |
| yarn_queues = str(capacitySchedulerProperties["yarn.scheduler.capacity.root.queues"]) |
| elif "yarn.scheduler.capacity.root.queues" in services['configurations']["capacity-scheduler"]["properties"]: |
| yarn_queues = services['configurations']["capacity-scheduler"]["properties"]["yarn.scheduler.capacity.root.queues"] |
| # Interactive Queues property attributes |
| putHiveServerPropertyAttribute = self.putPropertyAttribute(configurations, "hiveserver2-site") |
| toProcessQueues = yarn_queues.split(",") |
| leafQueueNames = set() # Remove duplicates |
| while len(toProcessQueues) > 0: |
| queue = toProcessQueues.pop() |
| queueKey = "yarn.scheduler.capacity.root." + queue + ".queues" |
| if queueKey in capacitySchedulerProperties: |
| # This is a parent queue - need to add children |
| subQueues = capacitySchedulerProperties[queueKey].split(",") |
| for subQueue in subQueues: |
| toProcessQueues.append(queue + "." + subQueue) |
| else: |
| # This is a leaf queue |
| queueName = queue.split(".")[-1] # Fully qualified queue name does not work, we should use only leaf name |
| leafQueueNames.add(queueName) |
| leafQueues = [{"label": str(queueName) + " queue", "value": queueName} for queueName in leafQueueNames] |
| leafQueues = sorted(leafQueues, key=lambda q:q['value']) |
| putHiveSitePropertyAttribute("hive.server2.tez.default.queues", "entries", leafQueues) |
| putHiveSiteProperty("hive.server2.tez.default.queues", ",".join([leafQueue['value'] for leafQueue in leafQueues])) |
| putWebhcatSiteProperty("templeton.hadoop.queue.name", self.recommendYarnQueue(services)) |
| |
| |
| # Recommend Ranger Hive authorization as per Ranger Hive plugin property |
| if "ranger-env" in services["configurations"] and "hive-env" in services["configurations"] and \ |
| "ranger-hive-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]: |
| rangerEnvHivePluginProperty = services["configurations"]["ranger-env"]["properties"]["ranger-hive-plugin-enabled"] |
| if (rangerEnvHivePluginProperty.lower() == "yes"): |
| putHiveEnvProperty("hive_security_authorization", "RANGER") |
| |
| # Security |
| if ("configurations" not in services) or ("hive-env" not in services["configurations"]) or \ |
| ("properties" not in services["configurations"]["hive-env"]) or \ |
| ("hive_security_authorization" not in services["configurations"]["hive-env"]["properties"]) or \ |
| str(services["configurations"]["hive-env"]["properties"]["hive_security_authorization"]).lower() == "none": |
| putHiveEnvProperty("hive_security_authorization", "None") |
| else: |
| putHiveEnvProperty("hive_security_authorization", services["configurations"]["hive-env"]["properties"]["hive_security_authorization"]) |
| |
| |
| # Recommend Ranger Hive authorization as per Ranger Hive plugin property |
| if "ranger-env" in services["configurations"] and "hive-env" in services["configurations"] and \ |
| "ranger-hive-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]: |
| rangerEnvHivePluginProperty = services["configurations"]["ranger-env"]["properties"]["ranger-hive-plugin-enabled"] |
| rangerEnvHiveAuthProperty = services["configurations"]["hive-env"]["properties"]["hive_security_authorization"] |
| if (rangerEnvHivePluginProperty.lower() == "yes"): |
| putHiveEnvProperty("hive_security_authorization", "Ranger") |
| elif (rangerEnvHiveAuthProperty.lower() == "ranger"): |
| putHiveEnvProperty("hive_security_authorization", "None") |
| |
| # hive_security_authorization == 'none' |
| # this property is unrelated to Kerberos |
| if str(configurations["hive-env"]["properties"]["hive_security_authorization"]).lower() == "none": |
| putHiveSiteProperty("hive.security.authorization.manager", "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory") |
| if ("hive.security.authorization.manager" in configurations["hiveserver2-site"]["properties"]) or \ |
| ("hiveserver2-site" not in services["configurations"]) or \ |
| ("hiveserver2-site" in services["configurations"] and "hive.security.authorization.manager" in services["configurations"]["hiveserver2-site"]["properties"]): |
| putHiveServerPropertyAttribute("hive.security.authorization.manager", "delete", "true") |
| if ("hive.security.authenticator.manager" in configurations["hiveserver2-site"]["properties"]) or \ |
| ("hiveserver2-site" not in services["configurations"]) or \ |
| ("hiveserver2-site" in services["configurations"] and "hive.security.authenticator.manager" in services["configurations"]["hiveserver2-site"]["properties"]): |
| putHiveServerPropertyAttribute("hive.security.authenticator.manager", "delete", "true") |
| if ("hive.conf.restricted.list" in configurations["hiveserver2-site"]["properties"]) or \ |
| ("hiveserver2-site" not in services["configurations"]) or \ |
| ("hiveserver2-site" in services["configurations"] and "hive.conf.restricted.list" in services["configurations"]["hiveserver2-site"]["properties"]): |
| putHiveServerPropertyAttribute("hive.conf.restricted.list", "delete", "true") |
| if "KERBEROS" not in servicesList: # Kerberos security depends on this property |
| putHiveSiteProperty("hive.security.authorization.enabled", "false") |
| else: |
| putHiveSiteProperty("hive.security.authorization.enabled", "true") |
| |
| try: |
| auth_manager_value = str(configurations["hive-env"]["properties"]["hive.security.metastore.authorization.manager"]) |
| except KeyError: |
| auth_manager_value = 'org.apache.hadoop.hive.ql.security.authorization.StorageBasedAuthorizationProvider' |
| pass |
| auth_manager_values = auth_manager_value.split(",") |
| sqlstdauth_class = "org.apache.hadoop.hive.ql.security.authorization.MetaStoreAuthzAPIAuthorizerEmbedOnly" |
| |
| putHiveSiteProperty("hive.server2.enable.doAs", "true") |
| |
| # hive_security_authorization == 'sqlstdauth' |
| if str(configurations["hive-env"]["properties"]["hive_security_authorization"]).lower() == "sqlstdauth": |
| putHiveSiteProperty("hive.server2.enable.doAs", "false") |
| putHiveServerProperty("hive.security.authorization.enabled", "true") |
| putHiveServerProperty("hive.security.authorization.manager", "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory") |
| putHiveServerProperty("hive.security.authenticator.manager", "org.apache.hadoop.hive.ql.security.SessionStateUserAuthenticator") |
| putHiveServerProperty("hive.conf.restricted.list", "hive.security.authenticator.manager,hive.security.authorization.manager,hive.users.in.admin.role") |
| putHiveSiteProperty("hive.security.authorization.manager", "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory") |
| if sqlstdauth_class not in auth_manager_values: |
| auth_manager_values.append(sqlstdauth_class) |
| elif sqlstdauth_class in auth_manager_values: |
| #remove item from csv |
| auth_manager_values = [x for x in auth_manager_values if x != sqlstdauth_class] |
| pass |
| putHiveSiteProperty("hive.security.metastore.authorization.manager", ",".join(auth_manager_values)) |
| |
| # hive_security_authorization == 'ranger' |
| if str(configurations["hive-env"]["properties"]["hive_security_authorization"]).lower() == "ranger": |
| putHiveSiteProperty("hive.server2.enable.doAs", "false") |
| putHiveServerProperty("hive.security.authorization.enabled", "true") |
| putHiveServerProperty("hive.security.authorization.manager", "com.xasecure.authorization.hive.authorizer.XaSecureHiveAuthorizerFactory") |
| putHiveServerProperty("hive.security.authenticator.manager", "org.apache.hadoop.hive.ql.security.SessionStateUserAuthenticator") |
| putHiveServerProperty("hive.conf.restricted.list", "hive.security.authorization.enabled,hive.security.authorization.manager,hive.security.authenticator.manager") |
| |
| putHiveSiteProperty("hive.server2.use.SSL", "false") |
| |
| #Hive authentication |
| hive_server2_auth = None |
| if "hive-site" in services["configurations"] and "hive.server2.authentication" in services["configurations"]["hive-site"]["properties"]: |
| hive_server2_auth = str(services["configurations"]["hive-site"]["properties"]["hive.server2.authentication"]).lower() |
| elif "hive.server2.authentication" in configurations["hive-site"]["properties"]: |
| hive_server2_auth = str(configurations["hive-site"]["properties"]["hive.server2.authentication"]).lower() |
| |
| if hive_server2_auth == "ldap": |
| putHiveSiteProperty("hive.server2.authentication.ldap.url", "") |
| else: |
| if ("hive.server2.authentication.ldap.url" in configurations["hive-site"]["properties"]) or \ |
| ("hive-site" not in services["configurations"]) or \ |
| ("hive-site" in services["configurations"] and "hive.server2.authentication.ldap.url" in services["configurations"]["hive-site"]["properties"]): |
| putHiveSitePropertyAttribute("hive.server2.authentication.ldap.url", "delete", "true") |
| |
| if hive_server2_auth == "kerberos": |
| if "hive-site" in services["configurations"] and "hive.server2.authentication.kerberos.keytab" not in services["configurations"]["hive-site"]["properties"]: |
| putHiveSiteProperty("hive.server2.authentication.kerberos.keytab", "") |
| if "hive-site" in services["configurations"] and "hive.server2.authentication.kerberos.principal" not in services["configurations"]["hive-site"]["properties"]: |
| putHiveSiteProperty("hive.server2.authentication.kerberos.principal", "") |
| elif "KERBEROS" not in servicesList: # Since 'hive_server2_auth' cannot be relied on within the default, empty recommendations request |
| if ("hive.server2.authentication.kerberos.keytab" in configurations["hive-site"]["properties"]) or \ |
| ("hive-site" not in services["configurations"]) or \ |
| ("hive-site" in services["configurations"] and "hive.server2.authentication.kerberos.keytab" in services["configurations"]["hive-site"]["properties"]): |
| putHiveSitePropertyAttribute("hive.server2.authentication.kerberos.keytab", "delete", "true") |
| if ("hive.server2.authentication.kerberos.principal" in configurations["hive-site"]["properties"]) or \ |
| ("hive-site" not in services["configurations"]) or \ |
| ("hive-site" in services["configurations"] and "hive.server2.authentication.kerberos.principal" in services["configurations"]["hive-site"]["properties"]): |
| putHiveSitePropertyAttribute("hive.server2.authentication.kerberos.principal", "delete", "true") |
| |
| if hive_server2_auth == "pam": |
| putHiveSiteProperty("hive.server2.authentication.pam.services", "") |
| else: |
| if ("hive.server2.authentication.pam.services" in configurations["hive-site"]["properties"]) or \ |
| ("hive-site" not in services["configurations"]) or \ |
| ("hive-site" in services["configurations"] and "hive.server2.authentication.pam.services" in services["configurations"]["hive-site"]["properties"]): |
| putHiveSitePropertyAttribute("hive.server2.authentication.pam.services", "delete", "true") |
| |
| if hive_server2_auth == "custom": |
| putHiveSiteProperty("hive.server2.custom.authentication.class", "") |
| else: |
| if ("hive.server2.authentication" in configurations["hive-site"]["properties"]) or \ |
| ("hive-site" not in services["configurations"]) or \ |
| ("hive-site" in services["configurations"] and "hive.server2.custom.authentication.class" in services["configurations"]["hive-site"]["properties"]): |
| putHiveSitePropertyAttribute("hive.server2.custom.authentication.class", "delete", "true") |
| |
| # HiveServer, Client, Metastore heapsize |
| hs_heapsize_multiplier = 3.0/8 |
| hm_heapsize_multiplier = 1.0/8 |
| # HiveServer2 and HiveMetastore located on the same host |
| hive_server_hosts = self.getHostsWithComponent("HIVE", "HIVE_SERVER", services, hosts) |
| hive_client_hosts = self.getHostsWithComponent("HIVE", "HIVE_CLIENT", services, hosts) |
| |
| if hive_server_hosts is not None and len(hive_server_hosts): |
| hs_host_ram = hive_server_hosts[0]["Hosts"]["total_mem"]/1024 |
| putHiveEnvProperty("hive.metastore.heapsize", max(512, int(hs_host_ram*hm_heapsize_multiplier))) |
| putHiveEnvProperty("hive.heapsize", max(512, int(hs_host_ram*hs_heapsize_multiplier))) |
| putHiveEnvPropertyAttributes("hive.metastore.heapsize", "maximum", max(1024, hs_host_ram)) |
| putHiveEnvPropertyAttributes("hive.heapsize", "maximum", max(1024, hs_host_ram)) |
| |
| if hive_client_hosts is not None and len(hive_client_hosts): |
| putHiveEnvProperty("hive.client.heapsize", 1024) |
| putHiveEnvPropertyAttributes("hive.client.heapsize", "maximum", max(1024, int(hive_client_hosts[0]["Hosts"]["total_mem"]/1024))) |
| |
| |
  def recommendHBASEConfigurations(self, configurations, clusterData, services, hosts):
    """Recommend hbase-site / hbase-env property values and attributes for HDP 2.2.

    Extends the HDP 2.1 recommendations with:
      * heap-size maximums derived from the master / region-server host RAM,
      * Phoenix-dependent WAL codec selection,
      * Ranger HBase plugin wiring (plugin toggle, policy user, coprocessors),
      * off-heap bucket-cache sizing for large-memory region servers,
      * coprocessor class lists driven by authorization/authentication settings.

    Mutates ``configurations`` in place through the put* helper closures.
    """
    super(HDP22StackAdvisor, self).recommendHbaseConfigurations(configurations, clusterData, services, hosts)
    putHbaseEnvPropertyAttributes = self.putPropertyAttribute(configurations, "hbase-env")

    # Cap hbase_master_heapsize at the master host's physical RAM.
    # NOTE(review): total_mem appears to be in KB here (divided by 1024 to get
    # MB) -- confirm against the hosts API before relying on it.
    hmaster_host = self.getHostWithComponent("HBASE", "HBASE_MASTER", services, hosts)
    if hmaster_host is not None:
      host_ram = hmaster_host["Hosts"]["total_mem"]
      putHbaseEnvPropertyAttributes('hbase_master_heapsize', 'maximum', max(1024, int(host_ram/1024)))

    # Cap hbase_regionserver_heapsize at 80% of the *smallest* region-server
    # host's RAM so the recommendation is safe on every RS host.
    rs_hosts = self.getHostsWithComponent("HBASE", "HBASE_REGIONSERVER", services, hosts)
    if rs_hosts is not None and len(rs_hosts) > 0:
      min_ram = rs_hosts[0]["Hosts"]["total_mem"]
      for host in rs_hosts:
        host_ram = host["Hosts"]["total_mem"]
        min_ram = min(min_ram, host_ram)

      putHbaseEnvPropertyAttributes('hbase_regionserver_heapsize', 'maximum', max(1024, int(min_ram*0.8/1024)))

    putHbaseSiteProperty = self.putProperty(configurations, "hbase-site", services)
    putHbaseSitePropertyAttributes = self.putPropertyAttribute(configurations, "hbase-site")
    putHbaseSiteProperty("hbase.regionserver.global.memstore.size", '0.4')

    # Phoenix requires the indexed WAL edit codec; without Phoenix fall back to
    # the plain codec and delete the Phoenix-only properties.
    if 'hbase-env' in services['configurations'] and 'phoenix_sql_enabled' in services['configurations']['hbase-env']['properties'] and \
       'true' == services['configurations']['hbase-env']['properties']['phoenix_sql_enabled'].lower():
      putHbaseSiteProperty("hbase.regionserver.wal.codec", 'org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec')
      putHbaseSiteProperty("phoenix.functions.allowUserDefinedFunctions", 'true')
    else:
      putHbaseSiteProperty("hbase.regionserver.wal.codec", 'org.apache.hadoop.hbase.regionserver.wal.WALCellCodec')
      # Remove Phoenix leftovers possibly set by a previous recommendation.
      if ('hbase.rpc.controllerfactory.class' in configurations["hbase-site"]["properties"]) or \
         ('hbase-site' in services['configurations'] and 'hbase.rpc.controllerfactory.class' in services['configurations']["hbase-site"]["properties"]):
        putHbaseSitePropertyAttributes('hbase.rpc.controllerfactory.class', 'delete', 'true')
      if ('phoenix.functions.allowUserDefinedFunctions' in configurations["hbase-site"]["properties"]) or \
         ('hbase-site' in services['configurations'] and 'phoenix.functions.allowUserDefinedFunctions' in services['configurations']["hbase-site"]["properties"]):
        putHbaseSitePropertyAttributes('phoenix.functions.allowUserDefinedFunctions', 'delete', 'true')

    # Mirror the cluster-wide ranger-env toggle into the HBase plugin config,
    # and default the policy user to the cluster smoke user.
    if "ranger-env" in services["configurations"] and "ranger-hbase-plugin-properties" in services["configurations"] and \
       "ranger-hbase-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]:
      putHbaseRangerPluginProperty = self.putProperty(configurations, "ranger-hbase-plugin-properties", services)
      rangerEnvHbasePluginProperty = services["configurations"]["ranger-env"]["properties"]["ranger-hbase-plugin-enabled"]
      putHbaseRangerPluginProperty("ranger-hbase-plugin-enabled", rangerEnvHbasePluginProperty)
      if "cluster-env" in services["configurations"] and "smokeuser" in services["configurations"]["cluster-env"]["properties"]:
        smoke_user = services["configurations"]["cluster-env"]["properties"]["smokeuser"]
        putHbaseRangerPluginProperty("policy_user", smoke_user)
    # Freshly-computed value in `configurations` wins over the persisted one.
    rangerPluginEnabled = ''
    if 'ranger-hbase-plugin-properties' in configurations and 'ranger-hbase-plugin-enabled' in configurations['ranger-hbase-plugin-properties']['properties']:
      rangerPluginEnabled = configurations['ranger-hbase-plugin-properties']['properties']['ranger-hbase-plugin-enabled']
    elif 'ranger-hbase-plugin-properties' in services['configurations'] and 'ranger-hbase-plugin-enabled' in services['configurations']['ranger-hbase-plugin-properties']['properties']:
      rangerPluginEnabled = services['configurations']['ranger-hbase-plugin-properties']['properties']['ranger-hbase-plugin-enabled']

    if rangerPluginEnabled and rangerPluginEnabled.lower() == 'Yes'.lower():
      putHbaseSiteProperty('hbase.security.authorization','true')

    # Recommend configs for bucket cache (only worthwhile on big-RAM hosts).
    threshold = 23 # 2 Gb is reserved for other offheap memory
    mb = 1024
    if (int(clusterData["hbaseRam"]) > threshold):
      # To enable cache - calculate values
      regionserver_total_ram = int(clusterData["hbaseRam"]) * mb
      regionserver_heap_size = 20480
      regionserver_max_direct_memory_size = regionserver_total_ram - regionserver_heap_size
      hfile_block_cache_size = '0.4'
      block_cache_heap = 8192 # int(regionserver_heap_size * hfile_block_cache_size)
      hbase_regionserver_global_memstore_size = '0.4'
      reserved_offheap_memory = 2048
      bucketcache_offheap_memory = regionserver_max_direct_memory_size - reserved_offheap_memory
      hbase_bucketcache_size = bucketcache_offheap_memory
      # NOTE(review): numerator and denominator are identical, so as written
      # this ratio is always 1.0 (formatted as "1.0000" below).
      hbase_bucketcache_percentage_in_combinedcache = float(bucketcache_offheap_memory) / hbase_bucketcache_size
      hbase_bucketcache_percentage_in_combinedcache_str = "{0:.4f}".format(math.ceil(hbase_bucketcache_percentage_in_combinedcache * 10000) / 10000.0)

      # Set values in hbase-site
      putHbaseSiteProperty('hfile.block.cache.size', hfile_block_cache_size)
      putHbaseSiteProperty('hbase.regionserver.global.memstore.size', hbase_regionserver_global_memstore_size)
      putHbaseSiteProperty('hbase.bucketcache.ioengine', 'offheap')
      putHbaseSiteProperty('hbase.bucketcache.size', hbase_bucketcache_size)
      putHbaseSiteProperty('hbase.bucketcache.percentage.in.combinedcache', hbase_bucketcache_percentage_in_combinedcache_str)

      # Enable in hbase-env
      putHbaseEnvProperty = self.putProperty(configurations, "hbase-env", services)
      putHbaseEnvProperty('hbase_max_direct_memory_size', regionserver_max_direct_memory_size)
      putHbaseEnvProperty('hbase_regionserver_heapsize', regionserver_heap_size)
    else:
      # Disable: delete any bucket-cache properties set previously.
      if ('hbase.bucketcache.ioengine' in configurations["hbase-site"]["properties"]) or \
         ('hbase-site' in services['configurations'] and 'hbase.bucketcache.ioengine' in services['configurations']["hbase-site"]["properties"]):
        putHbaseSitePropertyAttributes('hbase.bucketcache.ioengine', 'delete', 'true')
      if ('hbase.bucketcache.size' in configurations["hbase-site"]["properties"]) or \
         ('hbase-site' in services['configurations'] and 'hbase.bucketcache.size' in services['configurations']["hbase-site"]["properties"]):
        putHbaseSitePropertyAttributes('hbase.bucketcache.size', 'delete', 'true')
      if ('hbase.bucketcache.percentage.in.combinedcache' in configurations["hbase-site"]["properties"]) or \
         ('hbase-site' in services['configurations'] and 'hbase.bucketcache.percentage.in.combinedcache' in services['configurations']["hbase-site"]["properties"]):
        putHbaseSitePropertyAttributes('hbase.bucketcache.percentage.in.combinedcache', 'delete', 'true')
      if ('hbase_max_direct_memory_size' in configurations["hbase-env"]["properties"]) or \
         ('hbase-env' in services['configurations'] and 'hbase_max_direct_memory_size' in services['configurations']["hbase-env"]["properties"]):
        putHbaseEnvPropertyAttributes('hbase_max_direct_memory_size', 'delete', 'true')

    # Authorization: seed the coprocessor lists from the current values
    # (calculated `configurations` takes priority over persisted ones).
    hbaseCoProcessorConfigs = {
      'hbase.coprocessor.region.classes': [],
      'hbase.coprocessor.regionserver.classes': [],
      'hbase.coprocessor.master.classes': []
    }
    for key in hbaseCoProcessorConfigs:
      hbase_coprocessor_classes = None
      if key in configurations["hbase-site"]["properties"]:
        hbase_coprocessor_classes = configurations["hbase-site"]["properties"][key].strip()
      elif 'hbase-site' in services['configurations'] and key in services['configurations']["hbase-site"]["properties"]:
        hbase_coprocessor_classes = services['configurations']["hbase-site"]["properties"][key].strip()
      if hbase_coprocessor_classes:
        hbaseCoProcessorConfigs[key] = hbase_coprocessor_classes.split(',')

    # If configurations has it - it has priority as it is calculated. Then, the service's configurations will be used.
    hbase_security_authorization = None
    if 'hbase-site' in configurations and 'hbase.security.authorization' in configurations['hbase-site']['properties']:
      hbase_security_authorization = configurations['hbase-site']['properties']['hbase.security.authorization']
    elif 'hbase-site' in services['configurations'] and 'hbase.security.authorization' in services['configurations']['hbase-site']['properties']:
      hbase_security_authorization = services['configurations']['hbase-site']['properties']['hbase.security.authorization']
    if hbase_security_authorization:
      if 'true' == hbase_security_authorization.lower():
        # Authorization on: AccessController everywhere, plus secure bulk load.
        hbaseCoProcessorConfigs['hbase.coprocessor.master.classes'].append('org.apache.hadoop.hbase.security.access.AccessController')
        hbaseCoProcessorConfigs['hbase.coprocessor.regionserver.classes'].append('org.apache.hadoop.hbase.security.access.AccessController')
        # regional classes when hbase authorization is enabled
        authRegionClasses = ['org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint', 'org.apache.hadoop.hbase.security.access.AccessController']
        for item in range(len(authRegionClasses)):
          hbaseCoProcessorConfigs['hbase.coprocessor.region.classes'].append(authRegionClasses[item])
      else:
        # Authorization explicitly off: strip AccessController, keep bulk load.
        if 'org.apache.hadoop.hbase.security.access.AccessController' in hbaseCoProcessorConfigs['hbase.coprocessor.region.classes']:
          hbaseCoProcessorConfigs['hbase.coprocessor.region.classes'].remove('org.apache.hadoop.hbase.security.access.AccessController')
        if 'org.apache.hadoop.hbase.security.access.AccessController' in hbaseCoProcessorConfigs['hbase.coprocessor.master.classes']:
          hbaseCoProcessorConfigs['hbase.coprocessor.master.classes'].remove('org.apache.hadoop.hbase.security.access.AccessController')

        hbaseCoProcessorConfigs['hbase.coprocessor.region.classes'].append("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint")
        if ('hbase.coprocessor.regionserver.classes' in configurations["hbase-site"]["properties"]) or \
           ('hbase-site' in services['configurations'] and 'hbase.coprocessor.regionserver.classes' in services['configurations']["hbase-site"]["properties"]):
          putHbaseSitePropertyAttributes('hbase.coprocessor.regionserver.classes', 'delete', 'true')
    else:
      # No authorization setting at all: same treatment as explicitly off.
      hbaseCoProcessorConfigs['hbase.coprocessor.region.classes'].append("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint")
      if ('hbase.coprocessor.regionserver.classes' in configurations["hbase-site"]["properties"]) or \
         ('hbase-site' in services['configurations'] and 'hbase.coprocessor.regionserver.classes' in services['configurations']["hbase-site"]["properties"]):
        putHbaseSitePropertyAttributes('hbase.coprocessor.regionserver.classes', 'delete', 'true')

    # Authentication: kerberos needs SecureBulkLoadEndpoint + TokenProvider.
    if 'hbase-site' in services['configurations'] and 'hbase.security.authentication' in services['configurations']['hbase-site']['properties']:
      if 'kerberos' == services['configurations']['hbase-site']['properties']['hbase.security.authentication'].lower():
        if 'org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint' not in hbaseCoProcessorConfigs['hbase.coprocessor.region.classes']:
          hbaseCoProcessorConfigs['hbase.coprocessor.region.classes'].append('org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint')
        if 'org.apache.hadoop.hbase.security.token.TokenProvider' not in hbaseCoProcessorConfigs['hbase.coprocessor.region.classes']:
          hbaseCoProcessorConfigs['hbase.coprocessor.region.classes'].append('org.apache.hadoop.hbase.security.token.TokenProvider')
      else:
        if 'org.apache.hadoop.hbase.security.token.TokenProvider' in hbaseCoProcessorConfigs['hbase.coprocessor.region.classes']:
          hbaseCoProcessorConfigs['hbase.coprocessor.region.classes'].remove('org.apache.hadoop.hbase.security.token.TokenProvider')

    # Remove duplicates (and leftover template placeholders), then store.
    # NOTE(review): the trailing set() discards the ordering the dedup loop
    # just preserved, so the emitted class order is not deterministic.
    for key in hbaseCoProcessorConfigs:
      uniqueCoprocessorRegionClassList = []
      [uniqueCoprocessorRegionClassList.append(i)
       for i in hbaseCoProcessorConfigs[key] if
       not i in uniqueCoprocessorRegionClassList
       and (i.strip() not in ['{{hbase_coprocessor_region_classes}}', '{{hbase_coprocessor_master_classes}}', '{{hbase_coprocessor_regionserver_classes}}'])]
      putHbaseSiteProperty(key, ','.join(set(uniqueCoprocessorRegionClassList)))


    # Ranger 0.4.0 shipped the coprocessor under the legacy XASecure package.
    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
    rangerServiceVersion=''
    if 'RANGER' in servicesList:
      rangerServiceVersion = [service['StackServices']['service_version'] for service in services["services"] if service['StackServices']['service_name'] == 'RANGER'][0]

    if rangerServiceVersion and rangerServiceVersion == '0.4.0':
      rangerClass = 'com.xasecure.authorization.hbase.XaSecureAuthorizationCoprocessor'
    else:
      rangerClass = 'org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor'

    nonRangerClass = 'org.apache.hadoop.hbase.security.access.AccessController'
    hbaseClassConfigs = hbaseCoProcessorConfigs.keys()

    # Swap AccessController <-> Ranger coprocessor per the plugin toggle.
    for item in range(len(hbaseClassConfigs)):
      if 'hbase-site' in services['configurations']:
        if hbaseClassConfigs[item] in services['configurations']['hbase-site']['properties']:
          if 'hbase-site' in configurations and hbaseClassConfigs[item] in configurations['hbase-site']['properties']:
            coprocessorConfig = configurations['hbase-site']['properties'][hbaseClassConfigs[item]]
          else:
            coprocessorConfig = services['configurations']['hbase-site']['properties'][hbaseClassConfigs[item]]
          coprocessorClasses = coprocessorConfig.split(",")
          coprocessorClasses = filter(None, coprocessorClasses) # Removes empty string elements from array
          if rangerPluginEnabled and rangerPluginEnabled.lower() == 'Yes'.lower():
            if nonRangerClass in coprocessorClasses:
              coprocessorClasses.remove(nonRangerClass)
            if not rangerClass in coprocessorClasses:
              coprocessorClasses.append(rangerClass)
            putHbaseSiteProperty(hbaseClassConfigs[item], ','.join(coprocessorClasses))
          elif rangerPluginEnabled and rangerPluginEnabled.lower() == 'No'.lower():
            if rangerClass in coprocessorClasses:
              coprocessorClasses.remove(rangerClass)
            if not nonRangerClass in coprocessorClasses:
              coprocessorClasses.append(nonRangerClass)
            putHbaseSiteProperty(hbaseClassConfigs[item], ','.join(coprocessorClasses))
        elif rangerPluginEnabled and rangerPluginEnabled.lower() == 'Yes'.lower():
          # Property absent entirely: seed it with the Ranger coprocessor.
          putHbaseSiteProperty(hbaseClassConfigs[item], rangerClass)
| |
| |
| def recommendTezConfigurations(self, configurations, clusterData, services, hosts): |
| if not "yarn-site" in configurations: |
| self.recommendYARNConfigurations(configurations, clusterData, services, hosts) |
| #properties below should be always present as they are provided in HDP206 stack advisor |
| yarnMaxAllocationSize = min(30 * int(configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]), int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"])) |
| |
| putTezProperty = self.putProperty(configurations, "tez-site", services) |
| putTezProperty("tez.am.resource.memory.mb", min(int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"]), int(clusterData['amMemory']) * 2 if int(clusterData['amMemory']) < 3072 else int(clusterData['amMemory']))) |
| |
| taskResourceMemory = clusterData['mapMemory'] if clusterData['mapMemory'] > 2048 else int(clusterData['reduceMemory']) |
| taskResourceMemory = min(clusterData['containers'] * clusterData['ramPerContainer'], taskResourceMemory, yarnMaxAllocationSize) |
| putTezProperty("tez.task.resource.memory.mb", min(int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"]), taskResourceMemory)) |
| taskResourceMemory = int(configurations["tez-site"]["properties"]["tez.task.resource.memory.mb"]) |
| putTezProperty("tez.runtime.io.sort.mb", min(int(taskResourceMemory * 0.4), 2047)) |
| putTezProperty("tez.runtime.unordered.output.buffer.size-mb", int(taskResourceMemory * 0.075)) |
| putTezProperty("tez.session.am.dag.submit.timeout.secs", "600") |
| putTezProperty("tez.queue.name", self.recommendYarnQueue(services)) |
| |
| serverProperties = services["ambari-server-properties"] |
| latest_tez_jar_version = None |
| |
| server_host = socket.getfqdn() |
| for host in hosts["items"]: |
| if server_host == host["Hosts"]["host_name"]: |
| server_host = host["Hosts"]["public_host_name"] |
| server_port = '8080' |
| server_protocol = 'http' |
| views_dir = '/var/lib/ambari-server/resources/views/' |
| |
| if serverProperties: |
| if 'client.api.port' in serverProperties: |
| server_port = serverProperties['client.api.port'] |
| if 'views.dir' in serverProperties: |
| views_dir = serverProperties['views.dir'] |
| if 'api.ssl' in serverProperties: |
| if serverProperties['api.ssl'].lower() == 'true': |
| server_protocol = 'https' |
| |
| views_work_dir = os.path.join(views_dir, 'work') |
| |
| if os.path.exists(views_work_dir) and os.path.isdir(views_work_dir): |
| last_version = '0.0.0' |
| for file in os.listdir(views_work_dir): |
| if fnmatch.fnmatch(file, 'TEZ{*}'): |
| current_version = file.lstrip("TEZ{").rstrip("}") # E.g.: TEZ{0.7.0.2.3.0.0-2154} |
| if self.versionCompare(current_version.replace("-", "."), last_version.replace("-", ".")) >= 0: |
| latest_tez_jar_version = current_version |
| last_version = current_version |
| pass |
| pass |
| pass |
| pass |
| |
| if latest_tez_jar_version: |
| tez_url = '{0}://{1}:{2}/#/main/views/TEZ/{3}/TEZ_CLUSTER_INSTANCE'.format(server_protocol, server_host, server_port, latest_tez_jar_version) |
| putTezProperty("tez.tez-ui.history-url.base", tez_url) |
| pass |
| |
| def recommendStormConfigurations(self, configurations, clusterData, services, hosts): |
| super(HDP22StackAdvisor, self).recommendStormConfigurations(configurations, clusterData, services, hosts) |
| putStormSiteProperty = self.putProperty(configurations, "storm-site", services) |
| putStormSiteAttributes = self.putPropertyAttribute(configurations, "storm-site") |
| storm_site = getServicesSiteProperties(services, "storm-site") |
| security_enabled = (storm_site is not None and "storm.zookeeper.superACL" in storm_site) |
| if "ranger-env" in services["configurations"] and "ranger-storm-plugin-properties" in services["configurations"] and \ |
| "ranger-storm-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]: |
| putStormRangerPluginProperty = self.putProperty(configurations, "ranger-storm-plugin-properties", services) |
| rangerEnvStormPluginProperty = services["configurations"]["ranger-env"]["properties"]["ranger-storm-plugin-enabled"] |
| putStormRangerPluginProperty("ranger-storm-plugin-enabled", rangerEnvStormPluginProperty) |
| |
| rangerPluginEnabled = '' |
| if 'ranger-storm-plugin-properties' in configurations and 'ranger-storm-plugin-enabled' in configurations['ranger-storm-plugin-properties']['properties']: |
| rangerPluginEnabled = configurations['ranger-storm-plugin-properties']['properties']['ranger-storm-plugin-enabled'] |
| elif 'ranger-storm-plugin-properties' in services['configurations'] and 'ranger-storm-plugin-enabled' in services['configurations']['ranger-storm-plugin-properties']['properties']: |
| rangerPluginEnabled = services['configurations']['ranger-storm-plugin-properties']['properties']['ranger-storm-plugin-enabled'] |
| |
| nonRangerClass = 'backtype.storm.security.auth.authorizer.SimpleACLAuthorizer' |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| rangerServiceVersion='' |
| if 'RANGER' in servicesList: |
| rangerServiceVersion = [service['StackServices']['service_version'] for service in services["services"] if service['StackServices']['service_name'] == 'RANGER'][0] |
| |
| if rangerServiceVersion and rangerServiceVersion == '0.4.0': |
| rangerClass = 'com.xasecure.authorization.storm.authorizer.XaSecureStormAuthorizer' |
| else: |
| rangerClass = 'org.apache.ranger.authorization.storm.authorizer.RangerStormAuthorizer' |
| # Cluster is kerberized |
| if security_enabled: |
| if rangerPluginEnabled and (rangerPluginEnabled.lower() == 'Yes'.lower()): |
| putStormSiteProperty('nimbus.authorizer',rangerClass) |
| elif (services["configurations"]["storm-site"]["properties"]["nimbus.authorizer"] == rangerClass): |
| putStormSiteProperty('nimbus.authorizer', nonRangerClass) |
| else: |
| putStormSiteAttributes('nimbus.authorizer', 'delete', 'true') |
| |
  def recommendKnoxConfigurations(self, configurations, clusterData, services, hosts):
    """Recommend Knox configuration: plugin toggle and topology content.

    Mirrors the ranger-env Knox plugin toggle into the plugin properties,
    then rewrites the XML `topology/content` so its authorization provider
    matches the plugin state: "XASecurePDPKnox" when the Ranger plugin is
    enabled, "AclsAuthz" otherwise. If no authorization provider exists at
    all, a new one is appended. Mutates ``configurations`` in place.
    """
    if "ranger-env" in services["configurations"] and "ranger-knox-plugin-properties" in services["configurations"] and \
       "ranger-knox-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]:
      putKnoxRangerPluginProperty = self.putProperty(configurations, "ranger-knox-plugin-properties", services)
      rangerEnvKnoxPluginProperty = services["configurations"]["ranger-env"]["properties"]["ranger-knox-plugin-enabled"]
      putKnoxRangerPluginProperty("ranger-knox-plugin-enabled", rangerEnvKnoxPluginProperty)

    if 'topology' in services["configurations"] and 'content' in services["configurations"]["topology"]["properties"]:
      putKnoxTopologyContent = self.putProperty(configurations, "topology", services)
      # Freshly-computed value in `configurations` wins over the persisted one.
      rangerPluginEnabled = ''
      if 'ranger-knox-plugin-properties' in configurations and 'ranger-knox-plugin-enabled' in configurations['ranger-knox-plugin-properties']['properties']:
        rangerPluginEnabled = configurations['ranger-knox-plugin-properties']['properties']['ranger-knox-plugin-enabled']
      elif 'ranger-knox-plugin-properties' in services['configurations'] and 'ranger-knox-plugin-enabled' in services['configurations']['ranger-knox-plugin-properties']['properties']:
        rangerPluginEnabled = services['configurations']['ranger-knox-plugin-properties']['properties']['ranger-knox-plugin-enabled']

      # check if authorization provider already added
      topologyContent = services["configurations"]["topology"]["properties"]["content"]
      authorizationProviderExists = False
      authNameChanged = False
      # NOTE(review): ET.fromstring raises ParseError on malformed content;
      # there is no guard here -- confirm callers always supply valid XML.
      root = ET.fromstring(topologyContent)
      if root is not None:
        gateway = root.find("gateway")
        if gateway is not None:
          for provider in gateway.findall('provider'):
            role = provider.find('role')
            if role is not None and role.text and role.text.lower() == "authorization":
              authorizationProviderExists = True

            # Flip the provider name in place when it disagrees with the
            # Ranger plugin state (AclsAuthz <-> XASecurePDPKnox).
            name = provider.find('name')
            if name is not None and name.text == "AclsAuthz" and rangerPluginEnabled \
               and rangerPluginEnabled.lower() == "Yes".lower():
              newAuthName = "XASecurePDPKnox"
              authNameChanged = True
            elif name is not None and (((not rangerPluginEnabled) or rangerPluginEnabled.lower() != "Yes".lower()) \
               and name.text == 'XASecurePDPKnox'):
              newAuthName = "AclsAuthz"
              authNameChanged = True

            if authNameChanged:
              name.text = newAuthName
              putKnoxTopologyContent('content', ET.tostring(root))

            # Stop once an authorization provider has been seen (and, if
            # needed, renamed in the same iteration above).
            if authorizationProviderExists:
              break

      if not authorizationProviderExists:
        # No authorization provider at all: append one under <gateway>.
        if root is not None:
          gateway = root.find("gateway")
          if gateway is not None:
            provider = ET.SubElement(gateway, 'provider')

            role = ET.SubElement(provider, 'role')
            role.text = "authorization"

            name = ET.SubElement(provider, 'name')
            if rangerPluginEnabled and rangerPluginEnabled.lower() == "Yes".lower():
              name.text = "XASecurePDPKnox"
            else:
              name.text = "AclsAuthz"

            enabled = ET.SubElement(provider, 'enabled')
            enabled.text = "true"

            #TODO add pretty format for newly added provider
            putKnoxTopologyContent('content', ET.tostring(root))
| |
| |
| |
| def recommendRangerConfigurations(self, configurations, clusterData, services, hosts): |
| super(HDP22StackAdvisor, self).recommendRangerConfigurations(configurations, clusterData, services, hosts) |
| putRangerEnvProperty = self.putProperty(configurations, "ranger-env") |
| cluster_env = getServicesSiteProperties(services, "cluster-env") |
| security_enabled = cluster_env is not None and "security_enabled" in cluster_env and \ |
| cluster_env["security_enabled"].lower() == "true" |
| if "ranger-env" in configurations and not security_enabled: |
| putRangerEnvProperty("ranger-storm-plugin-enabled", "No") |
| |
| def getServiceConfigurationValidators(self): |
| parentValidators = super(HDP22StackAdvisor, self).getServiceConfigurationValidators() |
| childValidators = { |
| "HDFS": {"hdfs-site": self.validateHDFSConfigurations, |
| "hadoop-env": self.validateHDFSConfigurationsEnv, |
| "ranger-hdfs-plugin-properties": self.validateHDFSRangerPluginConfigurations}, |
| "YARN": {"yarn-env": self.validateYARNEnvConfigurations, |
| "ranger-yarn-plugin-properties": self.validateYARNRangerPluginConfigurations}, |
| "HIVE": {"hiveserver2-site": self.validateHiveServer2Configurations, |
| "hive-site": self.validateHiveConfigurations, |
| "hive-env": self.validateHiveConfigurationsEnv, |
| "webhcat-site": self.validateWebhcatConfigurations}, |
| "HBASE": {"hbase-site": self.validateHBASEConfigurations, |
| "hbase-env": self.validateHBASEEnvConfigurations, |
| "ranger-hbase-plugin-properties": self.validateHBASERangerPluginConfigurations}, |
| "KNOX": {"ranger-knox-plugin-properties": self.validateKnoxRangerPluginConfigurations}, |
| "KAFKA": {"ranger-kafka-plugin-properties": self.validateKafkaRangerPluginConfigurations}, |
| "STORM": {"ranger-storm-plugin-properties": self.validateStormRangerPluginConfigurations}, |
| "MAPREDUCE2": {"mapred-site": self.validateMapReduce2Configurations}, |
| "TEZ": {"tez-site": self.validateTezConfigurations}, |
| "RANGER": {"ranger-env": self.validateRangerConfigurationsEnv}, |
| "SPARK": {"spark-defaults": self.validateSparkDefaults, |
| "spark-thrift-sparkconf": self.validateSparkThriftSparkConf} |
| } |
| self.mergeValidators(parentValidators, childValidators) |
| return parentValidators |
| |
| def recommendLogsearchConfigurations(self, configurations, clusterData, services, hosts): |
| putLogsearchProperty = self.putProperty(configurations, "logsearch-properties", services) |
| logsearchSolrHosts = self.getComponentHostNames(services, "LOGSEARCH", "LOGSEARCH_SOLR") |
| |
| if logsearchSolrHosts is not None and len(logsearchSolrHosts) > 0 \ |
| and "logsearch-properties" in services["configurations"]: |
| recommendedMinShards = len(logsearchSolrHosts) |
| recommendedShards = 2 * len(logsearchSolrHosts) |
| recommendedMaxShards = 3 * len(logsearchSolrHosts) |
| # recommend number of shard |
| putLogsearchAttribute = self.putPropertyAttribute(configurations, "logsearch-properties") |
| putLogsearchAttribute('logsearch.collection.service.logs.numshards', 'minimum', recommendedMinShards) |
| putLogsearchAttribute('logsearch.collection.service.logs.numshards', 'maximum', recommendedMaxShards) |
| putLogsearchProperty("logsearch.collection.service.logs.numshards", recommendedShards) |
| |
| putLogsearchAttribute('logsearch.collection.audit.logs.numshards', 'minimum', recommendedMinShards) |
| putLogsearchAttribute('logsearch.collection.audit.logs.numshards', 'maximum', recommendedMaxShards) |
| putLogsearchProperty("logsearch.collection.audit.logs.numshards", recommendedShards) |
| # recommend replication factor |
| replicationReccomendFloat = math.log(len(logsearchSolrHosts), 5) |
| recommendedReplicationFactor = int(1 + math.floor(replicationReccomendFloat)) |
| putLogsearchProperty("logsearch.collection.service.logs.replication.factor", recommendedReplicationFactor) |
| putLogsearchProperty("logsearch.collection.audit.logs.replication.factor", recommendedReplicationFactor) |
| |
| def validateTezConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): |
| validationItems = [ {"config-name": 'tez.am.resource.memory.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'tez.am.resource.memory.mb')}, |
| {"config-name": 'tez.task.resource.memory.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'tez.task.resource.memory.mb')}, |
| {"config-name": 'tez.runtime.io.sort.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'tez.runtime.io.sort.mb')}, |
| {"config-name": 'tez.runtime.unordered.output.buffer.size-mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'tez.runtime.unordered.output.buffer.size-mb')}, |
| {"config-name": 'tez.queue.name', "item": self.validatorYarnQueue(properties, recommendedDefaults, 'tez.queue.name', services)} ] |
| if "tez.tez-ui.history-url.base" in recommendedDefaults: |
| validationItems.append({"config-name": 'tez.tez-ui.history-url.base', "item": self.validatorEqualsToRecommendedItem(properties, recommendedDefaults, 'tez.tez-ui.history-url.base')}) |
| |
| tez_site = properties |
| prop_name1 = 'tez.am.resource.memory.mb' |
| prop_name2 = 'tez.task.resource.memory.mb' |
| yarnSiteProperties = getSiteProperties(configurations, "yarn-site") |
| if yarnSiteProperties: |
| yarnMaxAllocationSize = min(30 * int(configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]),int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"])) |
| if int(tez_site[prop_name1]) > yarnMaxAllocationSize: |
| validationItems.append({"config-name": prop_name1, |
| "item": self.getWarnItem( |
| "{0} should be less than YARN max allocation size ({1})".format(prop_name1, yarnMaxAllocationSize))}) |
| if int(tez_site[prop_name2]) > yarnMaxAllocationSize: |
| validationItems.append({"config-name": prop_name2, |
| "item": self.getWarnItem( |
| "{0} should be less than YARN max allocation size ({1})".format(prop_name2, yarnMaxAllocationSize))}) |
| |
| return self.toConfigurationValidationProblems(validationItems, "tez-site") |
| |
  def recommendMapReduce2Configurations(self, configurations, clusterData, services, hosts):
    """Recommend mapred-site container sizes derived from the YARN recommendations.

    Runs the YARN recommender first so yarn-site scheduler min/max
    allocation values are populated, then sizes the MR AM, map and
    reduce containers within those bounds. Each -Xmx is 80% of the
    corresponding container size; io.sort.mb is 70% of the map Xmx,
    capped at the Hadoop limit of 2047.

    NOTE: the order of the putMapredProperty calls matters — later
    computations read values back out of configurations["mapred-site"]
    that earlier calls just wrote.
    """
    self.recommendYARNConfigurations(configurations, clusterData, services, hosts)
    putMapredProperty = self.putProperty(configurations, "mapred-site", services)
    nodemanagerMinRam = 1048576 # 1TB in mb
    if "referenceNodeManagerHost" in clusterData:
      # Cap by the reference NodeManager's memory; presumably total_mem is in
      # KB and /1024 converts to MB — TODO confirm against clusterData producer.
      nodemanagerMinRam = min(clusterData["referenceNodeManagerHost"]["total_mem"]/1024, nodemanagerMinRam)
    # AM container gets the YARN minimum allocation; its Xmx is 80% of that,
    # read back from the value written on the previous line.
    putMapredProperty('yarn.app.mapreduce.am.resource.mb', configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"])
    putMapredProperty('yarn.app.mapreduce.am.command-opts', "-Xmx" + str(int(0.8 * int(configurations["mapred-site"]["properties"]["yarn.app.mapreduce.am.resource.mb"]))) + "m" + " -Dhdp.version=${hdp.version}")
    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
    # When PIG is installed and the cluster has >= 4 GB available RAM,
    # enforce larger floors for the map/reduce container and map Xmx sizes.
    min_mapreduce_map_memory_mb = 0
    min_mapreduce_reduce_memory_mb = 0
    min_mapreduce_map_java_opts = 0
    if ("PIG" in servicesList) and clusterData["totalAvailableRam"] >= 4096:
      min_mapreduce_map_memory_mb = 1536
      min_mapreduce_reduce_memory_mb = 1536
      min_mapreduce_map_java_opts = 1024
    # Map container: clamp between max(PIG floor, YARN minimum) and the YARN maximum.
    putMapredProperty('mapreduce.map.memory.mb', min(int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"]), max(min_mapreduce_map_memory_mb, int(configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]))))
    # Reduce container: 2x the YARN minimum (capped by NodeManager RAM), clamped likewise.
    putMapredProperty('mapreduce.reduce.memory.mb', min(int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"]), max(min_mapreduce_reduce_memory_mb, min(2*int(configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]), int(nodemanagerMinRam)))))
    # 80% of the map container size just written above.
    mapredMapXmx = int(0.8*int(configurations["mapred-site"]["properties"]["mapreduce.map.memory.mb"]));
    putMapredProperty('mapreduce.map.java.opts', "-Xmx" + str(max(min_mapreduce_map_java_opts, mapredMapXmx)) + "m")
    putMapredProperty('mapreduce.reduce.java.opts', "-Xmx" + str(int(0.8*int(configurations["mapred-site"]["properties"]["mapreduce.reduce.memory.mb"]))) + "m")
    putMapredProperty('mapreduce.task.io.sort.mb', str(min(int(0.7*mapredMapXmx), 2047)))
    # Property Attributes: expose the valid range for the container sizes in the UI.
    putMapredPropertyAttribute = self.putPropertyAttribute(configurations, "mapred-site")
    yarnMinAllocationSize = int(configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"])
    # UI maximum: 30x the scheduler minimum, never above the scheduler maximum.
    yarnMaxAllocationSize = min(30 * int(configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]), int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"]))
    putMapredPropertyAttribute("mapreduce.map.memory.mb", "maximum", yarnMaxAllocationSize)
    putMapredPropertyAttribute("mapreduce.map.memory.mb", "minimum", yarnMinAllocationSize)
    putMapredPropertyAttribute("mapreduce.reduce.memory.mb", "maximum", yarnMaxAllocationSize)
    putMapredPropertyAttribute("mapreduce.reduce.memory.mb", "minimum", yarnMinAllocationSize)
    putMapredPropertyAttribute("yarn.app.mapreduce.am.resource.mb", "maximum", yarnMaxAllocationSize)
    putMapredPropertyAttribute("yarn.app.mapreduce.am.resource.mb", "minimum", yarnMinAllocationSize)
    # Hadoop MR limitation
    putMapredPropertyAttribute("mapreduce.task.io.sort.mb", "maximum", "2047")
    putMapredProperty("mapreduce.job.queuename", self.recommendYarnQueue(services))
| |
| def validateMapReduce2Configurations(self, properties, recommendedDefaults, configurations, services, hosts): |
| validationItems = [ {"config-name": 'mapreduce.map.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'mapreduce.map.java.opts')}, |
| {"config-name": 'mapreduce.reduce.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'mapreduce.reduce.java.opts')}, |
| {"config-name": 'mapreduce.task.io.sort.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'mapreduce.task.io.sort.mb')}, |
| {"config-name": 'mapreduce.map.memory.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'mapreduce.map.memory.mb')}, |
| {"config-name": 'mapreduce.reduce.memory.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'mapreduce.reduce.memory.mb')}, |
| {"config-name": 'yarn.app.mapreduce.am.resource.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'yarn.app.mapreduce.am.resource.mb')}, |
| {"config-name": 'yarn.app.mapreduce.am.command-opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'yarn.app.mapreduce.am.command-opts')}, |
| {"config-name": 'mapreduce.job.queuename', "item": self.validatorYarnQueue(properties, recommendedDefaults, 'mapreduce.job.queuename', services)} ] |
| |
| if 'mapreduce.map.java.opts' in properties and \ |
| checkXmxValueFormat(properties['mapreduce.map.java.opts']): |
| mapreduceMapJavaOpts = formatXmxSizeToBytes(getXmxSize(properties['mapreduce.map.java.opts'])) / (1024.0 * 1024) |
| mapreduceMapMemoryMb = to_number(properties['mapreduce.map.memory.mb']) |
| if mapreduceMapJavaOpts > mapreduceMapMemoryMb: |
| validationItems.append({"config-name": 'mapreduce.map.java.opts', "item": self.getWarnItem("mapreduce.map.java.opts Xmx should be less than mapreduce.map.memory.mb ({0})".format(mapreduceMapMemoryMb))}) |
| |
| if 'mapreduce.reduce.java.opts' in properties and \ |
| checkXmxValueFormat(properties['mapreduce.reduce.java.opts']): |
| mapreduceReduceJavaOpts = formatXmxSizeToBytes(getXmxSize(properties['mapreduce.reduce.java.opts'])) / (1024.0 * 1024) |
| mapreduceReduceMemoryMb = to_number(properties['mapreduce.reduce.memory.mb']) |
| if mapreduceReduceJavaOpts > mapreduceReduceMemoryMb: |
| validationItems.append({"config-name": 'mapreduce.reduce.java.opts', "item": self.getWarnItem("mapreduce.reduce.java.opts Xmx should be less than mapreduce.reduce.memory.mb ({0})".format(mapreduceReduceMemoryMb))}) |
| |
| if 'yarn.app.mapreduce.am.command-opts' in properties and \ |
| checkXmxValueFormat(properties['yarn.app.mapreduce.am.command-opts']): |
| yarnAppMapreduceAmCommandOpts = formatXmxSizeToBytes(getXmxSize(properties['yarn.app.mapreduce.am.command-opts'])) / (1024.0 * 1024) |
| yarnAppMapreduceAmResourceMb = to_number(properties['yarn.app.mapreduce.am.resource.mb']) |
| if yarnAppMapreduceAmCommandOpts > yarnAppMapreduceAmResourceMb: |
| validationItems.append({"config-name": 'yarn.app.mapreduce.am.command-opts', "item": self.getWarnItem("yarn.app.mapreduce.am.command-opts Xmx should be less than yarn.app.mapreduce.am.resource.mb ({0})".format(yarnAppMapreduceAmResourceMb))}) |
| |
| return self.toConfigurationValidationProblems(validationItems, "mapred-site") |
| |
| def validateHDFSConfigurationsEnv(self, properties, recommendedDefaults, configurations, services, hosts): |
| validationItems = [ {"config-name": 'namenode_heapsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_heapsize')}, |
| {"config-name": 'namenode_opt_newsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_opt_newsize')}, |
| {"config-name": 'namenode_opt_maxnewsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_opt_maxnewsize')}] |
| return self.toConfigurationValidationProblems(validationItems, "hadoop-env") |
| |
| def validateHDFSRangerPluginConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): |
| validationItems = [] |
| ranger_plugin_properties = getSiteProperties(configurations, "ranger-hdfs-plugin-properties") |
| ranger_plugin_enabled = ranger_plugin_properties['ranger-hdfs-plugin-enabled'] if ranger_plugin_properties else 'No' |
| if (ranger_plugin_enabled.lower() == 'yes'): |
| # ranger-hdfs-plugin must be enabled in ranger-env |
| ranger_env = getServicesSiteProperties(services, 'ranger-env') |
| if not ranger_env or not 'ranger-hdfs-plugin-enabled' in ranger_env or \ |
| ranger_env['ranger-hdfs-plugin-enabled'].lower() != 'yes': |
| validationItems.append({"config-name": 'ranger-hdfs-plugin-enabled', |
| "item": self.getWarnItem( |
| "ranger-hdfs-plugin-properties/ranger-hdfs-plugin-enabled must correspond ranger-env/ranger-hdfs-plugin-enabled")}) |
| return self.toConfigurationValidationProblems(validationItems, "ranger-hdfs-plugin-properties") |
| |
| |
  def validateHDFSConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
    """Validate hdfs-site.

    In addition to the parent (HDP 2.1) checks this validates that:
      * NameNode HTTP(S) addresses are valid host:port authorities,
      * dfs.permissions.enabled is true when the Ranger HDFS plugin is on,
      * in a Kerberos-secured cluster (and only when wire encryption is
        off) the DataNode port privileges, dfs.http.policy and
        dfs.data.transfer.protection settings are mutually consistent.
    """
    parentValidationProblems = super(HDP22StackAdvisor, self).validateHDFSConfigurations(properties, recommendedDefaults, configurations, services, hosts)
    # We can not access property hadoop.security.authentication from the
    # other config (core-site). That's why we are using another heuristics here
    hdfs_site = properties
    core_site = getSiteProperties(configurations, "core-site")

    dfs_encrypt_data_transfer = 'dfs.encrypt.data.transfer' # Hadoop Wire encryption
    # EAFP: the property may simply be absent.
    try:
      wire_encryption_enabled = hdfs_site[dfs_encrypt_data_transfer] == "true"
    except KeyError:
      wire_encryption_enabled = False

    HTTP_ONLY = 'HTTP_ONLY'
    HTTPS_ONLY = 'HTTPS_ONLY'
    HTTP_AND_HTTPS = 'HTTP_AND_HTTPS'

    VALID_HTTP_POLICY_VALUES = [HTTP_ONLY, HTTPS_ONLY, HTTP_AND_HTTPS]
    VALID_TRANSFER_PROTECTION_VALUES = ['authentication', 'integrity', 'privacy']

    validationItems = []
    # Only the NameNode addresses are currently validated; the commented-out
    # entries were deliberately left out of the check.
    address_properties = [
      # "dfs.datanode.address",
      # "dfs.datanode.http.address",
      # "dfs.datanode.https.address",
      # "dfs.datanode.ipc.address",
      # "dfs.journalnode.http-address",
      # "dfs.journalnode.https-address",
      # "dfs.namenode.rpc-address",
      # "dfs.namenode.secondary.http-address",
      "dfs.namenode.http-address",
      "dfs.namenode.https-address",
    ]
    #Validating *address properties for correct values

    for address_property in address_properties:
      if address_property in hdfs_site:
        value = hdfs_site[address_property]
        if not is_valid_host_port_authority(value):
          validationItems.append({"config-name" : address_property, "item" :
            self.getErrorItem(address_property + " does not contain a valid host:port authority: " + value)})

    #Adding Ranger Plugin logic here
    ranger_plugin_properties = getSiteProperties(configurations, "ranger-hdfs-plugin-properties")
    ranger_plugin_enabled = ranger_plugin_properties['ranger-hdfs-plugin-enabled'] if ranger_plugin_properties else 'No'
    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
    # Ranger authorization requires HDFS permission checking to be on.
    if ("RANGER" in servicesList) and (ranger_plugin_enabled.lower() == 'Yes'.lower()):
      if 'dfs.permissions.enabled' in hdfs_site and \
        hdfs_site['dfs.permissions.enabled'] != 'true':
        validationItems.append({"config-name": 'dfs.permissions.enabled',
                                    "item": self.getWarnItem(
                                      "dfs.permissions.enabled needs to be set to true if Ranger HDFS Plugin is enabled.")})

    if (not wire_encryption_enabled and   # If wire encryption is enabled at Hadoop, it disables all our checks
          'hadoop.security.authentication' in core_site and
          core_site['hadoop.security.authentication'] == 'kerberos' and
          'hadoop.security.authorization' in core_site and
          core_site['hadoop.security.authorization'] == 'true'):
      # security is enabled

      dfs_http_policy = 'dfs.http.policy'
      dfs_datanode_address = 'dfs.datanode.address'
      datanode_http_address = 'dfs.datanode.http.address'
      datanode_https_address = 'dfs.datanode.https.address'
      data_transfer_protection = 'dfs.data.transfer.protection'

      try: # Params may be absent
        privileged_dfs_dn_port = isSecurePort(getPort(hdfs_site[dfs_datanode_address]))
      except KeyError:
        privileged_dfs_dn_port = False
      try:
        privileged_dfs_http_port = isSecurePort(getPort(hdfs_site[datanode_http_address]))
      except KeyError:
        privileged_dfs_http_port = False
      try:
        privileged_dfs_https_port = isSecurePort(getPort(hdfs_site[datanode_https_address]))
      except KeyError:
        privileged_dfs_https_port = False
      try:
        dfs_http_policy_value = hdfs_site[dfs_http_policy]
      except KeyError:
        dfs_http_policy_value = HTTP_ONLY  # Default
      try:
        data_transfer_protection_value = hdfs_site[data_transfer_protection]
      except KeyError:
        data_transfer_protection_value = None

      if dfs_http_policy_value not in VALID_HTTP_POLICY_VALUES:
        validationItems.append({"config-name": dfs_http_policy,
                                "item": self.getWarnItem(
                                  "Invalid property value: {0}. Valid values are {1}".format(
                                    dfs_http_policy_value, VALID_HTTP_POLICY_VALUES))})

      # determine whether we use secure ports
      # `message` is only assigned inside a branch that also extends
      # address_properties_with_warnings, so the warning loop below never
      # reads it unassigned.
      address_properties_with_warnings = []
      if dfs_http_policy_value == HTTPS_ONLY:
        if not privileged_dfs_dn_port and (privileged_dfs_https_port or datanode_https_address not in hdfs_site):
          important_properties = [dfs_datanode_address, datanode_https_address]
          # NOTE(review): format args {0}/{1} (policy name and value) are
          # passed but unused by this template.
          message = "You set up datanode to use some non-secure ports. " \
                    "If you want to run Datanode under non-root user in a secure cluster, " \
                    "you should set all these properties {2} " \
                    "to use non-secure ports (if property {3} does not exist, " \
                    "just add it). You may also set up property {4} ('{5}' is a good default value). " \
                    "Also, set up WebHDFS with SSL as " \
                    "described in manual in order to be able to " \
                    "use HTTPS.".format(dfs_http_policy, dfs_http_policy_value, important_properties,
                                        datanode_https_address, data_transfer_protection,
                                        VALID_TRANSFER_PROTECTION_VALUES[0])
          address_properties_with_warnings.extend(important_properties)
      else:  # dfs_http_policy_value == HTTP_AND_HTTPS or HTTP_ONLY
        # We don't enforce datanode_https_address to use privileged ports here
        any_nonprivileged_ports_are_in_use = not privileged_dfs_dn_port or not privileged_dfs_http_port
        if any_nonprivileged_ports_are_in_use:
          important_properties = [dfs_datanode_address, datanode_http_address]
          message = "You have set up datanode to use some non-secure ports, but {0} is set to {1}. " \
                    "In a secure cluster, Datanode forbids using non-secure ports " \
                    "if {0} is not set to {3}. " \
                    "Please make sure that properties {2} use secure ports.".format(
                      dfs_http_policy, dfs_http_policy_value, important_properties, HTTPS_ONLY)
          address_properties_with_warnings.extend(important_properties)

      # Generate port-related warnings if any
      for prop in address_properties_with_warnings:
        validationItems.append({"config-name": prop,
                                "item": self.getWarnItem(message)})

      # Check if it is appropriate to use dfs.data.transfer.protection
      if data_transfer_protection_value is not None:
        if dfs_http_policy_value in [HTTP_ONLY, HTTP_AND_HTTPS]:
          validationItems.append({"config-name": data_transfer_protection,
                                  "item": self.getWarnItem(
                                    "{0} property can not be used when {1} is set to any "
                                    "value other then {2}. Tip: When {1} property is not defined, it defaults to {3}".format(
                                    data_transfer_protection, dfs_http_policy, HTTPS_ONLY, HTTP_ONLY))})
        elif not data_transfer_protection_value in VALID_TRANSFER_PROTECTION_VALUES:
          validationItems.append({"config-name": data_transfer_protection,
                                  "item": self.getWarnItem(
                                    "Invalid property value: {0}. Valid values are {1}.".format(
                                      data_transfer_protection_value, VALID_TRANSFER_PROTECTION_VALUES))})
    validationProblems = self.toConfigurationValidationProblems(validationItems, "hdfs-site")
    validationProblems.extend(parentValidationProblems)
    return validationProblems
| |
| def validateHiveServer2Configurations(self, properties, recommendedDefaults, configurations, services, hosts): |
| hive_server2 = properties |
| validationItems = [] |
| #Adding Ranger Plugin logic here |
| ranger_plugin_properties = getSiteProperties(configurations, "ranger-hive-plugin-properties") |
| hive_env_properties = getSiteProperties(configurations, "hive-env") |
| ranger_plugin_enabled = 'hive_security_authorization' in hive_env_properties and hive_env_properties['hive_security_authorization'].lower() == 'ranger' |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| ##Add stack validations only if Ranger is enabled. |
| if ("RANGER" in servicesList): |
| ##Add stack validations for Ranger plugin enabled. |
| if ranger_plugin_enabled: |
| prop_name = 'hive.security.authorization.manager' |
| prop_val = "com.xasecure.authorization.hive.authorizer.XaSecureHiveAuthorizerFactory" |
| if prop_name not in hive_server2 or hive_server2[prop_name] != prop_val: |
| validationItems.append({"config-name": prop_name, |
| "item": self.getWarnItem( |
| "If Ranger Hive Plugin is enabled."\ |
| " {0} under hiveserver2-site needs to be set to {1}".format(prop_name,prop_val))}) |
| prop_name = 'hive.security.authenticator.manager' |
| prop_val = "org.apache.hadoop.hive.ql.security.SessionStateUserAuthenticator" |
| if prop_name not in hive_server2 or hive_server2[prop_name] != prop_val: |
| validationItems.append({"config-name": prop_name, |
| "item": self.getWarnItem( |
| "If Ranger Hive Plugin is enabled."\ |
| " {0} under hiveserver2-site needs to be set to {1}".format(prop_name,prop_val))}) |
| prop_name = 'hive.security.authorization.enabled' |
| prop_val = 'true' |
| if prop_name in hive_server2 and hive_server2[prop_name] != prop_val: |
| validationItems.append({"config-name": prop_name, |
| "item": self.getWarnItem( |
| "If Ranger Hive Plugin is enabled."\ |
| " {0} under hiveserver2-site needs to be set to {1}".format(prop_name, prop_val))}) |
| prop_name = 'hive.conf.restricted.list' |
| prop_vals = 'hive.security.authorization.enabled,hive.security.authorization.manager,hive.security.authenticator.manager'.split(',') |
| current_vals = [] |
| missing_vals = [] |
| if hive_server2 and prop_name in hive_server2: |
| current_vals = hive_server2[prop_name].split(',') |
| current_vals = [x.strip() for x in current_vals] |
| |
| for val in prop_vals: |
| if not val in current_vals: |
| missing_vals.append(val) |
| |
| if missing_vals: |
| validationItems.append({"config-name": prop_name, |
| "item": self.getWarnItem("If Ranger Hive Plugin is enabled."\ |
| " {0} under hiveserver2-site needs to contain missing value {1}".format(prop_name, ','.join(missing_vals)))}) |
| ##Add stack validations for Ranger plugin disabled. |
| elif not ranger_plugin_enabled: |
| prop_name = 'hive.security.authorization.manager' |
| prop_val = "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory" |
| if prop_name in hive_server2 and hive_server2[prop_name] != prop_val: |
| validationItems.append({"config-name": prop_name, |
| "item": self.getWarnItem( |
| "If Ranger Hive Plugin is disabled."\ |
| " {0} needs to be set to {1}".format(prop_name,prop_val))}) |
| prop_name = 'hive.security.authenticator.manager' |
| prop_val = "org.apache.hadoop.hive.ql.security.SessionStateUserAuthenticator" |
| if prop_name in hive_server2 and hive_server2[prop_name] != prop_val: |
| validationItems.append({"config-name": prop_name, |
| "item": self.getWarnItem( |
| "If Ranger Hive Plugin is disabled."\ |
| " {0} needs to be set to {1}".format(prop_name,prop_val))}) |
| return self.toConfigurationValidationProblems(validationItems, "hiveserver2-site") |
| |
| def validateWebhcatConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): |
| validationItems = [{"config-name": 'templeton.hadoop.queue.name', "item": self.validatorYarnQueue(properties, recommendedDefaults, 'templeton.hadoop.queue.name', services)}] |
| return self.toConfigurationValidationProblems(validationItems, "webhcat-site") |
| |
| |
| def validateHiveConfigurationsEnv(self, properties, recommendedDefaults, configurations, services, hosts): |
| validationItems = [] |
| hive_env = properties |
| hive_site = getSiteProperties(configurations, "hive-site") |
| if "hive_security_authorization" in hive_env and \ |
| str(hive_env["hive_security_authorization"]).lower() == "none" \ |
| and str(hive_site["hive.security.authorization.enabled"]).lower() == "true": |
| authorization_item = self.getErrorItem("hive_security_authorization should not be None " |
| "if hive.security.authorization.enabled is set") |
| validationItems.append({"config-name": "hive_security_authorization", "item": authorization_item}) |
| if "hive_security_authorization" in hive_env and \ |
| str(hive_env["hive_security_authorization"]).lower() == "ranger": |
| # ranger-hive-plugin must be enabled in ranger-env |
| ranger_env = getServicesSiteProperties(services, 'ranger-env') |
| if not ranger_env or not 'ranger-hive-plugin-enabled' in ranger_env or \ |
| ranger_env['ranger-hive-plugin-enabled'].lower() != 'yes': |
| validationItems.append({"config-name": 'hive_security_authorization', |
| "item": self.getWarnItem( |
| "ranger-env/ranger-hive-plugin-enabled must be enabled when hive_security_authorization is set to Ranger")}) |
| return self.toConfigurationValidationProblems(validationItems, "hive-env") |
| |
| def validateHiveConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): |
| parentValidationProblems = super(HDP22StackAdvisor, self).validateHiveConfigurations(properties, recommendedDefaults, configurations, services, hosts) |
| hive_site = properties |
| validationItems = [] |
| stripe_size_values = [8388608, 16777216, 33554432, 67108864, 134217728, 268435456] |
| stripe_size_property = "hive.exec.orc.default.stripe.size" |
| if stripe_size_property in properties and \ |
| int(properties[stripe_size_property]) not in stripe_size_values: |
| validationItems.append({"config-name": stripe_size_property, |
| "item": self.getWarnItem("Correct values are {0}".format(stripe_size_values)) |
| } |
| ) |
| authentication_property = "hive.server2.authentication" |
| ldap_baseDN_property = "hive.server2.authentication.ldap.baseDN" |
| ldap_domain_property = "hive.server2.authentication.ldap.Domain" |
| if authentication_property in properties and properties[authentication_property].lower() == "ldap" \ |
| and not (ldap_baseDN_property in properties or ldap_domain_property in properties): |
| validationItems.append({"config-name" : authentication_property, "item" : |
| self.getWarnItem("According to LDAP value for " + authentication_property + ", you should add " + |
| ldap_domain_property + " property, if you are using AD, if not, then " + ldap_baseDN_property + "!")}) |
| |
| |
| configurationValidationProblems = self.toConfigurationValidationProblems(validationItems, "hive-site") |
| configurationValidationProblems.extend(parentValidationProblems) |
| return configurationValidationProblems |
| |
| def validateSparkDefaults(self, properties, recommendedDefaults, configurations, services, hosts): |
| validationItems = [ |
| { |
| "config-name": 'spark.yarn.queue', |
| "item": self.validatorYarnQueue(properties, recommendedDefaults, 'spark.yarn.queue', services) |
| } |
| ] |
| return self.toConfigurationValidationProblems(validationItems, "spark-defaults") |
| |
| def validateSparkThriftSparkConf(self, properties, recommendedDefaults, configurations, services, hosts): |
| validationItems = [ |
| { |
| "config-name": 'spark.yarn.queue', |
| "item": self.validatorYarnQueue(properties, recommendedDefaults, 'spark.yarn.queue', services) |
| } |
| ] |
| return self.toConfigurationValidationProblems(validationItems, "spark-thrift-sparkconf") |
| |
  def validateHBASEConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
    """Validate hbase-site.

    Checks: memstore + block-cache fractions are numeric and their sum
    stays within 0.8; Ranger HBase plugin requires authorization and the
    XaSecure coprocessors; bucket-cache ioengine value and its dependent
    size/percentage properties; hbase.security.authentication=kerberos
    requires a Kerberos-secured core-site.
    """
    hbase_site = properties
    validationItems = []

    prop_name1 = 'hbase.regionserver.global.memstore.size'
    prop_name2 = 'hfile.block.cache.size'
    props_max_sum = 0.8

    # elif chain: only the first failing check in this sequence is reported
    # (e.g. if prop_name1 is non-numeric, prop_name2 is not checked).
    if prop_name1 in hbase_site and not is_number(hbase_site[prop_name1]):
      validationItems.append({"config-name": prop_name1,
                              "item": self.getWarnItem(
                              "{0} should be float value".format(prop_name1))})
    elif prop_name2 in hbase_site and not is_number(hbase_site[prop_name2]):
      validationItems.append({"config-name": prop_name2,
                              "item": self.getWarnItem(
                              "{0} should be float value".format(prop_name2))})
    elif prop_name1 in hbase_site and prop_name2 in hbase_site and \
          float(hbase_site[prop_name1]) + float(hbase_site[prop_name2]) > props_max_sum:
      validationItems.append({"config-name": prop_name1,
                              "item": self.getWarnItem(
                              "{0} and {1} sum should not exceed {2}".format(prop_name1, prop_name2, props_max_sum))})

    #Adding Ranger Plugin logic here
    ranger_plugin_properties = getSiteProperties(configurations, "ranger-hbase-plugin-properties")
    ranger_plugin_enabled = ranger_plugin_properties['ranger-hbase-plugin-enabled'] if ranger_plugin_properties else 'No'
    prop_name = 'hbase.security.authorization'
    prop_val = "true"
    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
    if ("RANGER" in servicesList) and (ranger_plugin_enabled.lower() == 'Yes'.lower()):
      # NOTE(review): the direct hbase_site[...] lookups below raise KeyError
      # if the property is absent — presumably hbase-site always carries these
      # stack defaults; confirm before relying on this path.
      if hbase_site[prop_name] != prop_val:
        validationItems.append({"config-name": prop_name,
                                "item": self.getWarnItem(
                                "If Ranger HBase Plugin is enabled."\
                                "{0} needs to be set to {1}".format(prop_name,prop_val))})
      # The XaSecure coprocessor must replace (not coexist with) the stock
      # AccessController in both the master and region coprocessor lists.
      prop_name = "hbase.coprocessor.master.classes"
      prop_val = "com.xasecure.authorization.hbase.XaSecureAuthorizationCoprocessor"
      exclude_val = "org.apache.hadoop.hbase.security.access.AccessController"
      if (prop_val in hbase_site[prop_name] and exclude_val not in hbase_site[prop_name]):
        pass
      else:
        validationItems.append({"config-name": prop_name,
                                "item": self.getWarnItem(
                                "If Ranger HBase Plugin is enabled."\
                                " {0} needs to contain {1} instead of {2}".format(prop_name,prop_val,exclude_val))})
      prop_name = "hbase.coprocessor.region.classes"
      prop_val = "com.xasecure.authorization.hbase.XaSecureAuthorizationCoprocessor"
      if (prop_val in hbase_site[prop_name] and exclude_val not in hbase_site[prop_name]):
        pass
      else:
        validationItems.append({"config-name": prop_name,
                                "item": self.getWarnItem(
                                "If Ranger HBase Plugin is enabled."\
                                " {0} needs to contain {1} instead of {2}".format(prop_name,prop_val,exclude_val))})

    # Validate bucket cache correct config
    prop_name = "hbase.bucketcache.ioengine"
    prop_val = "offheap"
    if prop_name in hbase_site and not (not hbase_site[prop_name] or hbase_site[prop_name] == prop_val):
      validationItems.append({"config-name": prop_name,
                              "item": self.getWarnItem(
                              "Recommended values of " \
                              " {0} is empty or '{1}'".format(prop_name,prop_val))})

    # When an ioengine is configured, its size and combined-cache percentage
    # must be configured too.
    prop_name1 = "hbase.bucketcache.ioengine"
    prop_name2 = "hbase.bucketcache.size"
    prop_name3 = "hbase.bucketcache.percentage.in.combinedcache"

    if prop_name1 in hbase_site and prop_name2 in hbase_site and hbase_site[prop_name1] and not hbase_site[prop_name2]:
      validationItems.append({"config-name": prop_name2,
                              "item": self.getWarnItem(
                              "If bucketcache ioengine is enabled, {0} should be set".format(prop_name2))})
    if prop_name1 in hbase_site and prop_name3 in hbase_site and hbase_site[prop_name1] and not hbase_site[prop_name3]:
      validationItems.append({"config-name": prop_name3,
                              "item": self.getWarnItem(
                              "If bucketcache ioengine is enabled, {0} should be set".format(prop_name3))})

    # Validate hbase.security.authentication.
    # Kerberos works only when security enabled.
    if "hbase.security.authentication" in properties:
      hbase_security_kerberos = properties["hbase.security.authentication"].lower() == "kerberos"
      core_site_properties = getSiteProperties(configurations, "core-site")
      security_enabled = False
      if core_site_properties:
        security_enabled = core_site_properties['hadoop.security.authentication'] == 'kerberos' and core_site_properties['hadoop.security.authorization'] == 'true'
      if not security_enabled and hbase_security_kerberos:
        validationItems.append({"config-name": "hbase.security.authentication",
                              "item": self.getWarnItem("Cluster must be secured with Kerberos before hbase.security.authentication's value of kerberos will have effect")})

    return self.toConfigurationValidationProblems(validationItems, "hbase-site")
| |
| def validateHBASEEnvConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): |
| hbase_env = properties |
| validationItems = [ {"config-name": 'hbase_regionserver_heapsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'hbase_regionserver_heapsize')}, |
| {"config-name": 'hbase_master_heapsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'hbase_master_heapsize')} ] |
| prop_name = "hbase_max_direct_memory_size" |
| hbase_site_properties = getSiteProperties(configurations, "hbase-site") |
| prop_name1 = "hbase.bucketcache.ioengine" |
| |
| if prop_name1 in hbase_site_properties and prop_name in hbase_env and hbase_site_properties[prop_name1] and hbase_site_properties[prop_name1] == "offheap" and not hbase_env[prop_name]: |
| validationItems.append({"config-name": prop_name, |
| "item": self.getWarnItem( |
| "If bucketcache ioengine is enabled, {0} should be set".format(prop_name))}) |
| |
| return self.toConfigurationValidationProblems(validationItems, "hbase-env") |
| |
| def validateHBASERangerPluginConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): |
| validationItems = [] |
| ranger_plugin_properties = getSiteProperties(configurations, "ranger-hbase-plugin-properties") |
| ranger_plugin_enabled = ranger_plugin_properties['ranger-hbase-plugin-enabled'] if ranger_plugin_properties else 'No' |
| if ranger_plugin_enabled.lower() == 'yes': |
| # ranger-hdfs-plugin must be enabled in ranger-env |
| ranger_env = getServicesSiteProperties(services, 'ranger-env') |
| if not ranger_env or not 'ranger-hbase-plugin-enabled' in ranger_env or \ |
| ranger_env['ranger-hbase-plugin-enabled'].lower() != 'yes': |
| validationItems.append({"config-name": 'ranger-hbase-plugin-enabled', |
| "item": self.getWarnItem( |
| "ranger-hbase-plugin-properties/ranger-hbase-plugin-enabled must correspond ranger-env/ranger-hbase-plugin-enabled")}) |
| return self.toConfigurationValidationProblems(validationItems, "ranger-hbase-plugin-properties") |
| |
| def validateKnoxRangerPluginConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): |
| validationItems = [] |
| ranger_plugin_properties = getSiteProperties(configurations, "ranger-knox-plugin-properties") |
| ranger_plugin_enabled = ranger_plugin_properties['ranger-knox-plugin-enabled'] if ranger_plugin_properties else 'No' |
| if ranger_plugin_enabled.lower() == 'yes': |
| # ranger-hdfs-plugin must be enabled in ranger-env |
| ranger_env = getServicesSiteProperties(services, 'ranger-env') |
| if not ranger_env or not 'ranger-knox-plugin-enabled' in ranger_env or \ |
| ranger_env['ranger-knox-plugin-enabled'].lower() != 'yes': |
| validationItems.append({"config-name": 'ranger-knox-plugin-enabled', |
| "item": self.getWarnItem( |
| "ranger-knox-plugin-properties/ranger-knox-plugin-enabled must correspond ranger-env/ranger-knox-plugin-enabled")}) |
| return self.toConfigurationValidationProblems(validationItems, "ranger-knox-plugin-properties") |
| |
| def validateKafkaRangerPluginConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): |
| validationItems = [] |
| ranger_plugin_properties = getSiteProperties(configurations, "ranger-kafka-plugin-properties") |
| ranger_plugin_enabled = ranger_plugin_properties['ranger-kafka-plugin-enabled'] if ranger_plugin_properties else 'No' |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| security_enabled = self.isSecurityEnabled(services) |
| if ranger_plugin_enabled.lower() == 'yes': |
| # ranger-hdfs-plugin must be enabled in ranger-env |
| ranger_env = getServicesSiteProperties(services, 'ranger-env') |
| if not ranger_env or not 'ranger-kafka-plugin-enabled' in ranger_env or \ |
| ranger_env['ranger-kafka-plugin-enabled'].lower() != 'yes': |
| validationItems.append({"config-name": 'ranger-kafka-plugin-enabled', |
| "item": self.getWarnItem( |
| "ranger-kafka-plugin-properties/ranger-kafka-plugin-enabled must correspond ranger-env/ranger-kafka-plugin-enabled")}) |
| |
| if ("RANGER" in servicesList) and (ranger_plugin_enabled.lower() == 'yes') and not security_enabled: |
| validationItems.append({"config-name": "ranger-kafka-plugin-enabled", |
| "item": self.getWarnItem( |
| "Ranger Kafka plugin should not be enabled in non-kerberos environment.")}) |
| return self.toConfigurationValidationProblems(validationItems, "ranger-kafka-plugin-properties") |
| |
| def validateStormRangerPluginConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): |
| validationItems = [] |
| ranger_plugin_properties = getSiteProperties(configurations, "ranger-storm-plugin-properties") |
| ranger_plugin_enabled = ranger_plugin_properties['ranger-storm-plugin-enabled'] if ranger_plugin_properties else 'No' |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| security_enabled = self.isSecurityEnabled(services) |
| if ranger_plugin_enabled.lower() == 'yes': |
| # ranger-hdfs-plugin must be enabled in ranger-env |
| ranger_env = getServicesSiteProperties(services, 'ranger-env') |
| if not ranger_env or not 'ranger-storm-plugin-enabled' in ranger_env or \ |
| ranger_env['ranger-storm-plugin-enabled'].lower() != 'yes': |
| validationItems.append({"config-name": 'ranger-storm-plugin-enabled', |
| "item": self.getWarnItem( |
| "ranger-storm-plugin-properties/ranger-storm-plugin-enabled must correspond ranger-env/ranger-storm-plugin-enabled")}) |
| if ("RANGER" in servicesList) and (ranger_plugin_enabled.lower() == 'Yes'.lower()) and not security_enabled: |
| validationItems.append({"config-name": "ranger-storm-plugin-enabled", |
| "item": self.getWarnItem( |
| "Ranger Storm plugin should not be enabled in non-kerberos environment.")}) |
| |
| return self.toConfigurationValidationProblems(validationItems, "ranger-storm-plugin-properties") |
| |
| def validateYARNEnvConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): |
| parentValidationProblems = super(HDP22StackAdvisor, self).validateYARNEnvConfigurations(properties, recommendedDefaults, configurations, services, hosts) |
| validationItems = [] |
| if "yarn_cgroups_enabled" in properties: |
| yarn_cgroups_enabled = properties["yarn_cgroups_enabled"].lower() == "true" |
| core_site_properties = getSiteProperties(configurations, "core-site") |
| security_enabled = False |
| if core_site_properties: |
| security_enabled = core_site_properties['hadoop.security.authentication'] == 'kerberos' and core_site_properties['hadoop.security.authorization'] == 'true' |
| if not security_enabled and yarn_cgroups_enabled: |
| validationItems.append({"config-name": "yarn_cgroups_enabled", |
| "item": self.getWarnItem("CPU Isolation should only be enabled if security is enabled")}) |
| validationProblems = self.toConfigurationValidationProblems(validationItems, "yarn-env") |
| validationProblems.extend(parentValidationProblems) |
| return validationProblems |
| |
| def validateYARNRangerPluginConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): |
| validationItems = [] |
| ranger_plugin_properties = getSiteProperties(configurations, "ranger-yarn-plugin-properties") |
| ranger_plugin_enabled = ranger_plugin_properties['ranger-yarn-plugin-enabled'] if ranger_plugin_properties else 'No' |
| if ranger_plugin_enabled.lower() == 'yes': |
| # ranger-hdfs-plugin must be enabled in ranger-env |
| ranger_env = getServicesSiteProperties(services, 'ranger-env') |
| if not ranger_env or not 'ranger-yarn-plugin-enabled' in ranger_env or \ |
| ranger_env['ranger-yarn-plugin-enabled'].lower() != 'yes': |
| validationItems.append({"config-name": 'ranger-yarn-plugin-enabled', |
| "item": self.getWarnItem( |
| "ranger-yarn-plugin-properties/ranger-yarn-plugin-enabled must correspond ranger-env/ranger-yarn-plugin-enabled")}) |
| return self.toConfigurationValidationProblems(validationItems, "ranger-yarn-plugin-properties") |
| |
| def validateRangerConfigurationsEnv(self, properties, recommendedDefaults, configurations, services, hosts): |
| ranger_env_properties = properties |
| validationItems = [] |
| servicesList = [service["StackServices"]["service_name"] for service in services["services"]] |
| if "ranger-storm-plugin-enabled" in ranger_env_properties and ranger_env_properties['ranger-storm-plugin-enabled'].lower() == 'yes' and not 'KERBEROS' in servicesList: |
| validationItems.append({"config-name": "ranger-storm-plugin-enabled", |
| "item": self.getWarnItem("Ranger Storm plugin should not be enabled in non-kerberos environment.")}) |
| return self.toConfigurationValidationProblems(validationItems, "ranger-env") |
| |
| def getMastersWithMultipleInstances(self): |
| result = super(HDP22StackAdvisor, self).getMastersWithMultipleInstances() |
| result.extend(['METRICS_COLLECTOR']) |
| return result |
| |
| def getNotValuableComponents(self): |
| result = super(HDP22StackAdvisor, self).getNotValuableComponents() |
| result.extend(['METRICS_MONITOR']) |
| return result |
| |
| def getCardinalitiesDict(self): |
| result = super(HDP22StackAdvisor, self).getCardinalitiesDict() |
| result['METRICS_COLLECTOR'] = {"min": 1} |
| return result |
| |
| def getAffectedConfigs(self, services): |
| affectedConfigs = super(HDP22StackAdvisor, self).getAffectedConfigs(services) |
| |
| # There are configs that are not defined in the stack but added/removed by |
| # stack-advisor. Here we add such configs in order to clear the config |
| # filtering down in base class |
| configsList = [affectedConfig["type"] + "/" + affectedConfig["name"] for affectedConfig in affectedConfigs] |
| if 'yarn-env/yarn_cgroups_enabled' in configsList: |
| if 'yarn-site/yarn.nodemanager.container-executor.class' not in configsList: |
| affectedConfigs.append({"type": "yarn-site", "name": "yarn.nodemanager.container-executor.class"}) |
| if 'yarn-site/yarn.nodemanager.container-executor.group' not in configsList: |
| affectedConfigs.append({"type": "yarn-site", "name": "yarn.nodemanager.container-executor.group"}) |
| if 'yarn-site/yarn.nodemanager.container-executor.resources-handler.class' not in configsList: |
| affectedConfigs.append({"type": "yarn-site", "name": "yarn.nodemanager.container-executor.resources-handler.class"}) |
| if 'yarn-site/yarn.nodemanager.container-executor.cgroups.hierarchy' not in configsList: |
| affectedConfigs.append({"type": "yarn-site", "name": "yarn.nodemanager.container-executor.cgroups.hierarchy"}) |
| if 'yarn-site/yarn.nodemanager.container-executor.cgroups.mount' not in configsList: |
| affectedConfigs.append({"type": "yarn-site", "name": "yarn.nodemanager.container-executor.cgroups.mount"}) |
| if 'yarn-site/yarn.nodemanager.linux-container-executor.cgroups.mount-path' not in configsList: |
| affectedConfigs.append({"type": "yarn-site", "name": "yarn.nodemanager.linux-container-executor.cgroups.mount-path"}) |
| |
| return affectedConfigs; |
| |
def is_number(s):
  """Return True if *s* can be converted to a float, False otherwise.

  Fixes: the original caught only ValueError, so non-string/non-numeric
  input such as None or a list raised TypeError instead of returning False.
  float() raises ValueError for unparsable strings and TypeError for
  unsupported types; both now map to False.
  """
  try:
    float(s)
    return True
  except (ValueError, TypeError):
    return False
| |
def is_valid_host_port_authority(target):
  """Return True when *target* parses as host:port, with or without a scheme."""
  # urlparse only splits out hostname/port when a scheme is present,
  # so prefix a dummy one for bare "host:port" authorities.
  if "://" not in target:
    target = "dummyscheme://" + target
  try:
    # .port raises ValueError on a malformed (non-numeric/out-of-range) port,
    # so the attribute accesses stay inside the try block.
    parsed = urlparse(target)
    return parsed.hostname is not None and parsed.port is not None
  except ValueError:
    return False