| #!/usr/bin/env python |
| """ |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| |
| """ |
| import os |
| from resource_management.libraries.functions import format |
| from resource_management.libraries.script.script import Script |
| from resource_management.libraries.functions.version import format_stack_version |
| from resource_management.libraries.functions import StackFeature |
| from resource_management.libraries.functions.stack_features import check_stack_feature |
| from resource_management.libraries.functions.stack_features import get_stack_feature_version |
| from resource_management.libraries.functions.default import default |
| from utils import get_bare_principal |
| from resource_management.libraries.functions.get_stack_version import get_stack_version |
| from resource_management.libraries.functions.is_empty import is_empty |
| import status_params |
| from resource_management.libraries.resources.hdfs_resource import HdfsResource |
| from resource_management.libraries.functions import stack_select |
| from resource_management.libraries.functions import conf_select |
| from resource_management.libraries.functions import get_kinit_path |
| from resource_management.libraries.functions.get_not_managed_resources import get_not_managed_resources |
| |
# server configurations
# Values obtained directly from the Script helper and the command config.
config = Script.get_config()
tmp_dir = Script.get_tmp_dir()
stack_root = Script.get_stack_root()
hostname = config['hostname']

# ---- values read from /hostLevelParams ----
stack_name = default("/hostLevelParams/stack_name", None)
host_sys_prepped = default("/hostLevelParams/host_sys_prepped", False)

# Version that is CURRENT.
current_version = default("/hostLevelParams/current_version", None)

# Raw stack version string and its normalized form (used for feature checks
# further down in this module).
stack_version_unformatted = config['hostLevelParams']['stack_version']
stack_version_formatted = format_stack_version(stack_version_unformatted)

# ---- values read from /commandParams ----
retryAble = default("/commandParams/command_retry_enabled", False)

# Version being upgraded/downgraded to
version = default("/commandParams/version", None)
upgrade_direction = default("/commandParams/upgrade_direction", None)

# When downgrading the 'version' and 'current_version' are both pointing to the downgrade-target version
# downgrade_from_version provides the source-version the downgrade is happening from
downgrade_from_version = default("/commandParams/downgrade_from_version", None)

# ---- stack feature switches ----
# get the correct version to use for checking stack features
version_for_stack_feature_checks = get_stack_feature_version(config)

stack_supports_ranger_kerberos = check_stack_feature(
    StackFeature.RANGER_KERBEROS_SUPPORT, version_for_stack_feature_checks)
stack_supports_ranger_audit_db = check_stack_feature(
    StackFeature.RANGER_AUDIT_DB_SUPPORT, version_for_stack_feature_checks)
| |
# limits.d location plus the nofile/nproc values configured in kafka-env
limits_conf_dir = "/etc/security/limits.d"
kafka_user_nofile_limit = config['configurations']['kafka-env']['kafka_user_nofile_limit']
kafka_user_nproc_limit = config['configurations']['kafka-env']['kafka_user_nproc_limit']

# Used while upgrading the stack in a kerberized cluster and running kafka-acls.sh
zookeeper_connect = default("/configurations/kafka-broker/zookeeper.connect", None)

# Kafka install layout: stack-managed paths when the stack reports the
# ROLLING_UPGRADE feature, otherwise the legacy fixed locations.
if stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted):
    # parameters for 2.2+
    kafka_home = os.path.join(stack_root, "current", "kafka-broker")
    kafka_bin = os.path.join(kafka_home, "bin", "kafka")
    conf_dir = os.path.join(kafka_home, "config")
else:
    # default kafka parameters
    kafka_home = '/usr/lib/kafka'
    kafka_bin = kafka_home + '/bin/kafka'
    conf_dir = "/etc/kafka/conf"

# Accounts, JDK and the kafka-env template content
kafka_user = config['configurations']['kafka-env']['kafka_user']
user_group = config['configurations']['cluster-env']['user_group']
java64_home = config['hostLevelParams']['java_home']
kafka_env_sh_template = config['configurations']['kafka-env']['content']

# Log and pid locations; the pid directory comes from status_params
kafka_log_dir = config['configurations']['kafka-env']['kafka_log_dir']
kafka_pid_dir = status_params.kafka_pid_dir
kafka_pid_file = kafka_pid_dir + "/kafka.pid"

# This is hardcoded on the kafka bash process lifecycle on which we have no control over
kafka_managed_pid_dir = "/var/run/kafka"
kafka_managed_log_dir = "/var/log/kafka"

# Host lists; each list object taken from the config is sorted in place
kafka_hosts = config['clusterHostInfo']['kafka_broker_hosts']
kafka_hosts.sort()

zookeeper_hosts = config['clusterHostInfo']['zookeeper_hosts']
zookeeper_hosts.sort()
| |
# log4j properties content, or None when kafka-log4j/content is not configured
log4j_props = None
if 'kafka-log4j' in config['configurations'] and 'content' in config['configurations']['kafka-log4j']:
    log4j_props = config['configurations']['kafka-log4j']['content']

# Ganglia counts as installed when clusterHostInfo lists at least one server host.
# ganglia_server and ganglia_report_interval are only bound in that case.
ganglia_installed = ('ganglia_server_host' in config['clusterHostInfo'] and
                     len(config['clusterHostInfo']['ganglia_server_host']) > 0)
if ganglia_installed:
    ganglia_server = config['clusterHostInfo']['ganglia_server_host'][0]
    ganglia_report_interval = 60
| |
# Metrics collector settings.
# Truststore values come from ams-ssl-client and default to empty strings.
metric_truststore_path = default("/configurations/ams-ssl-client/ssl.client.truststore.location", "")
metric_truststore_type = default("/configurations/ams-ssl-client/ssl.client.truststore.type", "")
metric_truststore_password = default("/configurations/ams-ssl-client/ssl.client.truststore.password", "")

ams_collector_hosts = default("/clusterHostInfo/metrics_collector_hosts", [])
has_metric_collector = len(ams_collector_hosts) != 0

# Host, port and protocol stay empty strings when no collector host is listed.
metric_collector_host = ""
metric_collector_port = ""
metric_collector_protocol = ""

if has_metric_collector:
    # Host: a VIP host configured in cluster-env wins over the first collector host.
    if ('cluster-env' in config['configurations'] and
            'metrics_collector_vip_host' in config['configurations']['cluster-env']):
        metric_collector_host = config['configurations']['cluster-env']['metrics_collector_vip_host']
    else:
        metric_collector_host = ams_collector_hosts[0]

    # Port: a VIP port configured in cluster-env wins; otherwise take the part
    # after ':' in the ams-site webapp address, falling back to '6188'.
    if ('cluster-env' in config['configurations'] and
            'metrics_collector_vip_port' in config['configurations']['cluster-env']):
        metric_collector_port = config['configurations']['cluster-env']['metrics_collector_vip_port']
    else:
        metric_collector_web_address = default("/configurations/ams-site/timeline.metrics.service.webapp.address", "localhost:6188")
        if ':' in metric_collector_web_address:
            metric_collector_port = metric_collector_web_address.split(':')[1]
        else:
            metric_collector_port = '6188'

    # Protocol: https only when the ams-site http policy is exactly HTTPS_ONLY.
    metric_collector_protocol = (
        'https'
        if default("/configurations/ams-site/timeline.metrics.service.http.policy", "HTTP_ONLY") == "HTTPS_ONLY"
        else 'http'
    )
# Security-related params
security_enabled = config['configurations']['cluster-env']['security_enabled']

# True when kafka-broker sets security.inter.broker.protocol to one of the two
# SASL-over-plaintext spellings checked here.
kafka_kerberos_enabled = (('security.inter.broker.protocol' in config['configurations']['kafka-broker']) and
                          ((config['configurations']['kafka-broker']['security.inter.broker.protocol'] == "PLAINTEXTSASL") or
                           (config['configurations']['kafka-broker']['security.inter.broker.protocol'] == "SASL_PLAINTEXT")))


# The JAAS settings are populated only when security is on, the stack version
# is known, a principal name is configured in kafka-env and the stack reports
# the KAFKA_KERBEROS feature.
if security_enabled and stack_version_formatted != "" and 'kafka_principal_name' in config['configurations']['kafka-env'] \
        and check_stack_feature(StackFeature.KAFKA_KERBEROS, stack_version_formatted):
    _hostname_lowercase = config['hostname'].lower()
    _kafka_principal_name = config['configurations']['kafka-env']['kafka_principal_name']
    # Substitute the lower-cased host name for the _HOST placeholder.
    kafka_jaas_principal = _kafka_principal_name.replace('_HOST', _hostname_lowercase)
    kafka_keytab_path = config['configurations']['kafka-env']['kafka_keytab']
    kafka_bare_jaas_principal = get_bare_principal(_kafka_principal_name)
    kafka_kerberos_params = "-Djava.security.auth.login.config=" + conf_dir + "/kafka_jaas.conf"
else:
    kafka_kerberos_params = ''
    kafka_jaas_principal = None
    kafka_keytab_path = None
    # Bind this name on the non-kerberos path as well, so that every name set
    # in the branch above exists (as None) regardless of which branch ran.
    kafka_bare_jaas_principal = None
| |
# *********************** RANGER PLUGIN CHANGES ***********************
# ranger host
# **********************************************************************
# Ranger Admin is considered present when clusterHostInfo lists at least one host.
ranger_admin_hosts = default("/clusterHostInfo/ranger_admin_hosts", [])
has_ranger_admin = len(ranger_admin_hosts) != 0
ranger_admin_log_dir = default("/configurations/ranger-env/ranger_admin_log_dir", "/var/log/ranger/admin")

# Flags read from ranger-env and kafka-env; they gate the Ranger plugin
# parameters computed further down in this module.
xml_configurations_supported = config['configurations']['ranger-env']['xml_configurations_supported']
is_supported_kafka_ranger = config['configurations']['kafka-env']['is_supported_kafka_ranger']

# First entry of the ambari_server_host list
ambari_server_hostname = config['clusterHostInfo']['ambari_server_host'][0]
| |
# ranger kafka properties
# Every name below is bound only when a Ranger Admin host exists and kafka-env
# flags the Ranger plugin as supported.
if has_ranger_admin and is_supported_kafka_ranger:
    # Imported locally so this block brings its own dependency into scope.
    from resource_management.core.exceptions import Fail

    # Plugin switch: true only for a non-empty value equal to 'yes' (case-insensitive).
    enable_ranger_kafka = config['configurations']['ranger-kafka-plugin-properties']['ranger-kafka-plugin-enabled']
    enable_ranger_kafka = not is_empty(enable_ranger_kafka) and enable_ranger_kafka.lower() == 'yes'

    # Policy manager URL with trailing '/' characters removed.
    # NOTE(review): the membership checks run after the value has already been
    # looked up; presumably the config object tolerates missing keys and the
    # checks protect the endswith() call -- confirm before reordering.
    policymgr_mgr_url = config['configurations']['admin-properties']['policymgr_external_url']
    if 'admin-properties' in config['configurations'] and 'policymgr_external_url' in config['configurations']['admin-properties'] and policymgr_mgr_url.endswith('/'):
        policymgr_mgr_url = policymgr_mgr_url.rstrip('/')

    # Audit database settings taken from admin-properties.
    xa_audit_db_flavor = config['configurations']['admin-properties']['DB_FLAVOR']
    xa_audit_db_flavor = xa_audit_db_flavor.lower() if xa_audit_db_flavor else None
    xa_audit_db_name = default('/configurations/admin-properties/audit_db_name', 'ranger_audits')
    xa_audit_db_user = default('/configurations/admin-properties/audit_db_user', 'rangerlogger')
    # The password stays empty unless one is configured and the stack supports audit-to-DB.
    xa_audit_db_password = ''
    if not is_empty(config['configurations']['admin-properties']['audit_db_password']) and stack_supports_ranger_audit_db:
        xa_audit_db_password = unicode(config['configurations']['admin-properties']['audit_db_password'])
    xa_db_host = config['configurations']['admin-properties']['db_host']
    repo_name = str(config['clusterName']) + '_kafka'

    # Whole config sections (and their attributes) exposed as-is.
    ranger_env = config['configurations']['ranger-env']
    ranger_plugin_properties = config['configurations']['ranger-kafka-plugin-properties']

    ranger_kafka_audit = config['configurations']['ranger-kafka-audit']
    ranger_kafka_audit_attrs = config['configuration_attributes']['ranger-kafka-audit']
    ranger_kafka_security = config['configurations']['ranger-kafka-security']
    ranger_kafka_security_attrs = config['configuration_attributes']['ranger-kafka-security']
    ranger_kafka_policymgr_ssl = config['configurations']['ranger-kafka-policymgr-ssl']
    ranger_kafka_policymgr_ssl_attrs = config['configuration_attributes']['ranger-kafka-policymgr-ssl']

    policy_user = config['configurations']['ranger-kafka-plugin-properties']['policy_user']

    # Repository definition: connection settings plus the descriptor that embeds them.
    ranger_plugin_config = {
        'username': config['configurations']['ranger-kafka-plugin-properties']['REPOSITORY_CONFIG_USERNAME'],
        'password': unicode(config['configurations']['ranger-kafka-plugin-properties']['REPOSITORY_CONFIG_PASSWORD']),
        'zookeeper.connect': config['configurations']['ranger-kafka-plugin-properties']['zookeeper.connect'],
        'commonNameForCertificate': config['configurations']['ranger-kafka-plugin-properties']['common.name.for.certificate']
    }

    kafka_ranger_plugin_repo = {
        'isEnabled': 'true',
        'configs': ranger_plugin_config,
        'description': 'kafka repo',
        'name': repo_name,
        'repositoryType': 'kafka',
        'type': 'kafka',
        'assetType': '1'
    }

    # Extra repository settings added only on a kerberized cluster whose stack
    # reports Ranger Kerberos support. The dict is mutated after being embedded
    # above, so kafka_ranger_plugin_repo['configs'] sees these keys too.
    if stack_supports_ranger_kerberos and security_enabled:
        ranger_plugin_config['policy.download.auth.users'] = kafka_user
        ranger_plugin_config['tag.download.auth.users'] = kafka_user
        ranger_plugin_config['ambari.service.check.user'] = policy_user

    # For curl command in ranger plugin to get db connector
    jdk_location = config['hostLevelParams']['jdk_location']
    java_share_dir = '/usr/share/java'
    previous_jdbc_jar_name = None

    if stack_supports_ranger_audit_db:
        # Pick the JDBC jar name, URL and driver class for the configured flavor.
        if xa_audit_db_flavor == 'mysql':
            jdbc_jar_name = default("/hostLevelParams/custom_mysql_jdbc_name", None)
            previous_jdbc_jar_name = default("/hostLevelParams/previous_custom_mysql_jdbc_name", None)
            audit_jdbc_url = format('jdbc:mysql://{xa_db_host}/{xa_audit_db_name}')
            jdbc_driver = "com.mysql.jdbc.Driver"
        elif xa_audit_db_flavor == 'oracle':
            jdbc_jar_name = default("/hostLevelParams/custom_oracle_jdbc_name", None)
            previous_jdbc_jar_name = default("/hostLevelParams/previous_custom_oracle_jdbc_name", None)
            # The URL form depends on how many ':' separators db_host contains:
            # zero or two use the '@host' form, anything else the '@//host' form.
            colon_count = xa_db_host.count(':')
            if colon_count == 2 or colon_count == 0:
                audit_jdbc_url = format('jdbc:oracle:thin:@{xa_db_host}')
            else:
                audit_jdbc_url = format('jdbc:oracle:thin:@//{xa_db_host}')
            jdbc_driver = "oracle.jdbc.OracleDriver"
        elif xa_audit_db_flavor == 'postgres':
            jdbc_jar_name = default("/hostLevelParams/custom_postgres_jdbc_name", None)
            previous_jdbc_jar_name = default("/hostLevelParams/previous_custom_postgres_jdbc_name", None)
            audit_jdbc_url = format('jdbc:postgresql://{xa_db_host}/{xa_audit_db_name}')
            jdbc_driver = "org.postgresql.Driver"
        elif xa_audit_db_flavor == 'mssql':
            jdbc_jar_name = default("/hostLevelParams/custom_mssql_jdbc_name", None)
            previous_jdbc_jar_name = default("/hostLevelParams/previous_custom_mssql_jdbc_name", None)
            audit_jdbc_url = format('jdbc:sqlserver://{xa_db_host};databaseName={xa_audit_db_name}')
            jdbc_driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        elif xa_audit_db_flavor == 'sqla':
            jdbc_jar_name = default("/hostLevelParams/custom_sqlanywhere_jdbc_name", None)
            previous_jdbc_jar_name = default("/hostLevelParams/previous_custom_sqlanywhere_jdbc_name", None)
            audit_jdbc_url = format('jdbc:sqlanywhere:database={xa_audit_db_name};host={xa_db_host}')
            jdbc_driver = "sap.jdbc4.sqlanywhere.IDriver"
        else:
            # Without this branch jdbc_jar_name / audit_jdbc_url / jdbc_driver
            # would stay unbound and the path formatting just below would fail
            # on the missing name instead of naming the real problem.
            raise Fail(format("'{xa_audit_db_flavor}' db flavor not supported."))

        # Connector locations: download target in tmp_dir, source under
        # jdk_location, install target and previous jar under kafka_home/libs.
        downloaded_custom_connector = format("{tmp_dir}/{jdbc_jar_name}")
        driver_curl_source = format("{jdk_location}/{jdbc_jar_name}")
        driver_curl_target = format("{kafka_home}/libs/{jdbc_jar_name}")
        previous_jdbc_jar = format("{kafka_home}/libs/{previous_jdbc_jar_name}")
    else:
        downloaded_custom_connector = None
        driver_curl_source = None
        driver_curl_target = None
        previous_jdbc_jar = None

    # Audit destinations and SSL credentials.
    xa_audit_db_is_enabled = False
    ranger_audit_solr_urls = config['configurations']['ranger-admin-site']['ranger.audit.solr.urls']
    if xml_configurations_supported and stack_supports_ranger_audit_db:
        xa_audit_db_is_enabled = config['configurations']['ranger-kafka-audit']['xasecure.audit.destination.db']
    xa_audit_hdfs_is_enabled = default('/configurations/ranger-kafka-audit/xasecure.audit.destination.hdfs', False)
    ssl_keystore_password = unicode(config['configurations']['ranger-kafka-policymgr-ssl']['xasecure.policymgr.clientssl.keystore.password']) if xml_configurations_supported else None
    ssl_truststore_password = unicode(config['configurations']['ranger-kafka-policymgr-ssl']['xasecure.policymgr.clientssl.truststore.password']) if xml_configurations_supported else None
    credential_file = format('/etc/ranger/{repo_name}/cred.jceks') if xml_configurations_supported else None

    # Source and target of the kafka-ranger-env.sh script.
    stack_version = get_stack_version('kafka-broker')
    setup_ranger_env_sh_source = format('{stack_root}/{stack_version}/ranger-kafka-plugin/install/conf.templates/enable/kafka-ranger-env.sh')
    setup_ranger_env_sh_target = format("{conf_dir}/kafka-ranger-env.sh")

    # For SQLA explicitly disable audit to DB for Ranger
    if xa_audit_db_flavor == 'sqla':
        xa_audit_db_is_enabled = False
| |
# HDFS-related parameters; all of them are None when no namenode host is listed.
namenode_hosts = default("/clusterHostInfo/namenode_host", [])
has_namenode = len(namenode_hosts) != 0

if has_namenode:
    hdfs_user = config['configurations']['hadoop-env']['hdfs_user']
    hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab']
    hdfs_principal_name = config['configurations']['hadoop-env']['hdfs_principal_name']
    hdfs_site = config['configurations']['hdfs-site']
    default_fs = config['configurations']['core-site']['fs.defaultFS']
    hadoop_bin_dir = stack_select.get_hadoop_dir("bin")
    hadoop_conf_dir = conf_select.get_hadoop_conf_dir()
else:
    hdfs_user = None
    hdfs_user_keytab = None
    hdfs_principal_name = None
    hdfs_site = None
    default_fs = None
    hadoop_bin_dir = None
    hadoop_conf_dir = None

# Resolved regardless of whether a namenode is present
kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))

import functools

# Rebind HdfsResource to a partial that pre-fills the arguments shared by every
# call; code that needs to create/delete an hdfs directory/file or copy from
# local is expected to go through params.HdfsResource.
_hdfs_resource_common_args = dict(
    user=hdfs_user,
    hdfs_resource_ignore_file="/var/lib/ambari-agent/data/.hdfs_resource_ignore",
    security_enabled=security_enabled,
    keytab=hdfs_user_keytab,
    kinit_path_local=kinit_path_local,
    hadoop_bin_dir=hadoop_bin_dir,
    hadoop_conf_dir=hadoop_conf_dir,
    principal_name=hdfs_principal_name,
    hdfs_site=hdfs_site,
    default_fs=default_fs,
    immutable_paths=get_not_managed_resources(),
)
HdfsResource = functools.partial(HdfsResource, **_hdfs_resource_common_args)