#!/usr/bin/env python
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
# Python Imports
import os
import sys
# Local Imports
from resource_management import get_bare_principal
from status_params import *
from resource_management import format_stack_version, Script
from resource_management.libraries.functions import format
from resource_management.libraries.functions.default import default
from resource_management.libraries.functions.stack_features import check_stack_feature
from resource_management.libraries.functions import StackFeature
from resource_management.libraries.functions.is_empty import is_empty
from resource_management.libraries.functions.expect import expect
def configs_for_ha(atlas_hosts, metadata_port, is_atlas_ha_enabled):
"""
Return a dictionary of additional configs to merge if Atlas HA is enabled.
:param atlas_hosts: List of hostnames that contain Atlas
:param metadata_port: Port number
:param is_atlas_ha_enabled: None, True, or False
:return: Dictionary with additional configs to merge to application-properties if HA is enabled.
"""
additional_props = {}
if atlas_hosts is None or len(atlas_hosts) == 0 or metadata_port is None:
return additional_props
# Sort to guarantee each host sees the same values, assuming restarted at the same time.
atlas_hosts = sorted(atlas_hosts)
# E.g., id1,id2,id3,...,idn
_server_id_list = ["id" + str(i) for i in range(1, len(atlas_hosts) + 1)]
atlas_server_ids = ",".join(_server_id_list)
additional_props["atlas.server.ids"] = atlas_server_ids
i = 0
for curr_hostname in atlas_hosts:
id = _server_id_list[i]
prop_name = "atlas.server.address." + id
prop_value = curr_hostname + ":" + metadata_port
additional_props[prop_name] = prop_value
i += 1
# This may override the existing property
if i == 1 or (i > 1 and is_atlas_ha_enabled is False):
additional_props["atlas.server.ha.enabled"] = "false"
elif i > 1:
additional_props["atlas.server.ha.enabled"] = "true"
return additional_props
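
# Illustrative example (not executed; hostnames are hypothetical). For a two-node cluster,
#   configs_for_ha(["host2.example.com", "host1.example.com"], "21000", None)
# returns, after sorting the hosts:
#   {"atlas.server.ids": "id1,id2",
#    "atlas.server.address.id1": "host1.example.com:21000",
#    "atlas.server.address.id2": "host2.example.com:21000",
#    "atlas.server.ha.enabled": "true"}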
# server configurations
config = Script.get_config()
exec_tmp_dir = Script.get_tmp_dir()
stack_root = Script.get_stack_root()
# Needed since this is an Atlas Hook service.
cluster_name = config['clusterName']
java_version = expect("/hostLevelParams/java_version", int)
if security_enabled:
  _hostname_lowercase = config['hostname'].lower()
  _atlas_principal_name = config['configurations']['application-properties']['atlas.authentication.principal']
  atlas_jaas_principal = _atlas_principal_name.replace('_HOST', _hostname_lowercase)
  atlas_keytab_path = config['configurations']['application-properties']['atlas.authentication.keytab']
# New Cluster Stack Version that is defined during the RESTART of a Stack Upgrade
version = default("/commandParams/version", None)
# stack version
stack_version_unformatted = config['hostLevelParams']['stack_version']
stack_version_formatted = format_stack_version(stack_version_unformatted)
metadata_home = os.environ['METADATA_HOME_DIR'] if 'METADATA_HOME_DIR' in os.environ else format('{stack_root}/current/atlas-server')
metadata_bin = format("{metadata_home}/bin")
python_binary = os.environ['PYTHON_EXE'] if 'PYTHON_EXE' in os.environ else sys.executable
metadata_start_script = format("{metadata_bin}/atlas_start.py")
metadata_stop_script = format("{metadata_bin}/atlas_stop.py")
# metadata local directory structure
log_dir = config['configurations']['atlas-env']['metadata_log_dir']
# service locations
hadoop_conf_dir = os.path.join(os.environ["HADOOP_HOME"], "conf") if 'HADOOP_HOME' in os.environ else '/etc/hadoop/conf'
# some commands may need to supply the JAAS location when running as atlas
atlas_jaas_file = format("{conf_dir}/atlas_jaas.conf")
# user
user_group = config['configurations']['cluster-env']['user_group']
# metadata env
java64_home = config['hostLevelParams']['java_home']
env_sh_template = config['configurations']['atlas-env']['content']
# credential provider
credential_provider = format("jceks://file@{conf_dir}/atlas-site.jceks")
# command line args
ssl_enabled = default("/configurations/application-properties/atlas.enableTLS", False)
http_port = default("/configurations/application-properties/atlas.server.http.port", "21000")
https_port = default("/configurations/application-properties/atlas.server.https.port", "21443")
if ssl_enabled:
  metadata_port = https_port
  metadata_protocol = 'https'
else:
  metadata_port = http_port
  metadata_protocol = 'http'
metadata_host = config['hostname']
atlas_hosts = sorted(default('/clusterHostInfo/atlas_server_hosts', []))
metadata_server_host = atlas_hosts[0] if len(atlas_hosts) > 0 else "UNKNOWN_HOST"
# application properties
application_properties = dict(config['configurations']['application-properties'])
application_properties["atlas.server.bind.address"] = metadata_host
if check_stack_feature(StackFeature.ATLAS_UPGRADE_SUPPORT, version_for_stack_feature_checks):
  metadata_server_url = application_properties["atlas.rest.address"]
else:
  # In HDP 2.3 and 2.4 the property was computed and saved to the local config, but did not exist in the database.
  metadata_server_url = format('{metadata_protocol}://{metadata_server_host}:{metadata_port}')
  application_properties["atlas.rest.address"] = metadata_server_url
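# For example (illustrative hostname), with TLS disabled and the default port this resolves
# to something like 'http://host1.example.com:21000'.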
# If Atlas HA is enabled, the following properties should be populated:
#   atlas.server.ids = id1,id2,...,idn
#   atlas.server.address.id# = host#:port
# The user should not have to modify atlas.server.ha.enabled, but overriding it to False is
# still allowed when multiple Atlas servers exist.
# This can be None, True, or False.
is_atlas_ha_enabled = default("/configurations/application-properties/atlas.server.ha.enabled", None)
additional_ha_props = configs_for_ha(atlas_hosts, metadata_port, is_atlas_ha_enabled)
for k, v in additional_ha_props.iteritems():
  application_properties[k] = v
metadata_env_content = config['configurations']['atlas-env']['content']
metadata_opts = config['configurations']['atlas-env']['metadata_opts']
metadata_classpath = config['configurations']['atlas-env']['metadata_classpath']
data_dir = format("{stack_root}/current/atlas-server/data")
expanded_war_dir = os.environ['METADATA_EXPANDED_WEBAPP_DIR'] if 'METADATA_EXPANDED_WEBAPP_DIR' in os.environ else format("{stack_root}/current/atlas-server/server/webapp")
metadata_log4j_content = config['configurations']['atlas-log4j']['content']
metadata_solrconfig_content = default("/configurations/atlas-solrconfig/content", None)
atlas_log_level = config['configurations']['atlas-log4j']['atlas_log_level']
audit_log_level = config['configurations']['atlas-log4j']['audit_log_level']
# smoke test
smoke_test_user = config['configurations']['cluster-env']['smokeuser']
smoke_test_password = 'smoke'
smokeuser_principal = config['configurations']['cluster-env']['smokeuser_principal_name']
smokeuser_keytab = config['configurations']['cluster-env']['smokeuser_keytab']
security_check_status_file = format('{log_dir}/security_check.status')
if security_enabled:
  smoke_cmd = format('curl --negotiate -u : -b ~/cookiejar.txt -c ~/cookiejar.txt -s -o /dev/null -w "%{{http_code}}" {metadata_protocol}://{metadata_host}:{metadata_port}/')
else:
  smoke_cmd = format('curl -s -o /dev/null -w "%{{http_code}}" {metadata_protocol}://{metadata_host}:{metadata_port}/')
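# Note: the doubled braces escape to a literal '{http_code}' for curl's -w option, so the
# non-secure command expands to something like (hostname illustrative):
#   curl -s -o /dev/null -w "%{http_code}" http://host1.example.com:21000/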
# hbase
hbase_conf_dir = "/etc/hbase/conf"
atlas_search_backend = default("/configurations/application-properties/atlas.graph.index.search.backend", "")
search_backend_solr = atlas_search_backend.startswith('solr')
# infra solr
infra_solr_znode = default("/configurations/infra-solr-env/infra_solr_znode", None)
infra_solr_hosts = default("/clusterHostInfo/infra_solr_hosts", [])
infra_solr_replication_factor = 2 if len(infra_solr_hosts) > 1 else 1
atlas_solr_shards = default("/configurations/atlas-env/atlas_solr-shards", 1)
has_infra_solr = len(infra_solr_hosts) > 0
# zookeeper
zookeeper_hosts = config['clusterHostInfo']['zookeeper_hosts']
zookeeper_port = default('/configurations/zoo.cfg/clientPort', None)
# Build a comma-separated list of ZooKeeper host:port pairs from clusterHostInfo.
zookeeper_quorum = ",".join(
  host + ":" + str(zookeeper_port) if zookeeper_port is not None else host
  for host in zookeeper_hosts
)
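# For example (illustrative), zookeeper_hosts = ['zk1.example.com', 'zk2.example.com'] with
# clientPort 2181 yields zookeeper_quorum = 'zk1.example.com:2181,zk2.example.com:2181'.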
# Atlas Ranger plugin configurations
stack_supports_atlas_ranger_plugin = check_stack_feature(StackFeature.ATLAS_RANGER_PLUGIN_SUPPORT, version_for_stack_feature_checks)
stack_supports_ranger_kerberos = check_stack_feature(StackFeature.RANGER_KERBEROS_SUPPORT, version_for_stack_feature_checks)
retry_enabled = default("/commandParams/command_retry_enabled", False)
ranger_admin_hosts = default("/clusterHostInfo/ranger_admin_hosts", [])
has_ranger_admin = not len(ranger_admin_hosts) == 0
xml_configurations_supported = config['configurations']['ranger-env']['xml_configurations_supported']
enable_ranger_atlas = False
atlas_server_xmx = default("/configurations/atlas-env/atlas_server_xmx", 2048)
atlas_server_max_new_size = default("/configurations/atlas-env/atlas_server_max_new_size", 614)
hbase_master_hosts = default('/clusterHostInfo/hbase_master_hosts', [])
has_hbase_master = not len(hbase_master_hosts) == 0
atlas_hbase_setup = format("{exec_tmp_dir}/atlas_hbase_setup.rb")
atlas_kafka_setup = format("{exec_tmp_dir}/atlas_kafka_acl.sh")
atlas_graph_storage_hbase_table = default('/configurations/application-properties/atlas.graph.storage.hbase.table', None)
atlas_audit_hbase_tablename = default('/configurations/application-properties/atlas.audit.hbase.tablename', None)
hbase_user_keytab = default('/configurations/hbase-env/hbase_user_keytab', None)
hbase_principal_name = default('/configurations/hbase-env/hbase_principal_name', None)
enable_ranger_hbase = False
# ToDo: Kafka port to Atlas
# Used while upgrading the stack in a kerberized cluster and running kafka-acls.sh
hosts_with_kafka = default('/clusterHostInfo/kafka_broker_hosts', [])
host_with_kafka = hostname in hosts_with_kafka
ranger_tagsync_hosts = default("/clusterHostInfo/ranger_tagsync_hosts", [])
has_ranger_tagsync = len(ranger_tagsync_hosts) > 0
rangertagsync_user = "rangertagsync"
kafka_keytab = default('/configurations/kafka-env/kafka_keytab', None)
kafka_principal_name = default('/configurations/kafka-env/kafka_principal_name', None)
default_replication_factor = default('/configurations/application-properties/atlas.notification.replicas', None)
kafka_env_sh_template = config['configurations']['kafka-env']['content']
kafka_home = os.path.join(stack_root, "current", "kafka-broker")
kafka_conf_dir = os.path.join(kafka_home, "config")
kafka_zk_endpoint = default("/configurations/kafka-broker/zookeeper.connect", None)
kafka_kerberos_enabled = (('security.inter.broker.protocol' in config['configurations']['kafka-broker']) and
                          ((config['configurations']['kafka-broker']['security.inter.broker.protocol'] == "PLAINTEXTSASL") or
                           (config['configurations']['kafka-broker']['security.inter.broker.protocol'] == "SASL_PLAINTEXT")))
if security_enabled and stack_version_formatted != "" and 'kafka_principal_name' in config['configurations']['kafka-env'] \
    and check_stack_feature(StackFeature.KAFKA_KERBEROS, stack_version_formatted):
  _hostname_lowercase = config['hostname'].lower()
  _kafka_principal_name = config['configurations']['kafka-env']['kafka_principal_name']
  kafka_jaas_principal = _kafka_principal_name.replace('_HOST', _hostname_lowercase)
  kafka_keytab_path = config['configurations']['kafka-env']['kafka_keytab']
  kafka_bare_jaas_principal = get_bare_principal(_kafka_principal_name)
  kafka_kerberos_params = "-Djava.security.auth.login.config={0}/kafka_jaas.conf".format(kafka_conf_dir)
else:
  kafka_kerberos_params = ''
  kafka_jaas_principal = None
  kafka_keytab_path = None
if has_ranger_admin and stack_supports_atlas_ranger_plugin:
  # For create_hdfs_directory
  namenode_host = set(default("/clusterHostInfo/namenode_host", []))
  has_namenode = not len(namenode_host) == 0
  hdfs_user = config['configurations']['hadoop-env']['hdfs_user'] if has_namenode else None
  hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab'] if has_namenode else None
  hdfs_principal_name = config['configurations']['hadoop-env']['hdfs_principal_name'] if has_namenode else None
  hdfs_site = config['configurations']['hdfs-site']
  default_fs = config['configurations']['core-site']['fs.defaultFS']
  dfs_type = default("/commandParams/dfs_type", "")

  import functools
  from resource_management.libraries.resources.hdfs_resource import HdfsResource
  from resource_management.libraries.functions.get_not_managed_resources import get_not_managed_resources

  # Create a partial function with common arguments for every HdfsResource call,
  # so that creating an HDFS directory only requires calling params.HdfsResource.
  HdfsResource = functools.partial(
    HdfsResource,
    user = hdfs_user,
    hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore",
    security_enabled = security_enabled,
    keytab = hdfs_user_keytab,
    kinit_path_local = kinit_path_local,
    hadoop_bin_dir = hadoop_bin_dir,
    hadoop_conf_dir = hadoop_conf_dir,
    principal_name = hdfs_principal_name,
    hdfs_site = hdfs_site,
    default_fs = default_fs,
    immutable_paths = get_not_managed_resources(),
    dfs_type = dfs_type
  )
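
  # Illustrative usage of the partial (the real calls live in the service scripts; the
  # directory shown here is hypothetical):
  #   params.HdfsResource("/apps/atlas", type="directory", action="create_on_execute", owner=metadata_user)
  #   params.HdfsResource(None, action="execute")  # flush the queued HDFS operations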
  repo_name = str(config['clusterName']) + '_atlas'
  ssl_keystore_password = unicode(config['configurations']['ranger-atlas-policymgr-ssl']['xasecure.policymgr.clientssl.keystore.password'])
  ssl_truststore_password = unicode(config['configurations']['ranger-atlas-policymgr-ssl']['xasecure.policymgr.clientssl.truststore.password'])
  credential_file = format('/etc/ranger/{repo_name}/cred.jceks')
  xa_audit_hdfs_is_enabled = default('/configurations/ranger-atlas-audit/xasecure.audit.destination.hdfs', False)

  enable_ranger_atlas = config['configurations']['ranger-atlas-plugin-properties']['ranger-atlas-plugin-enabled']
  enable_ranger_atlas = not is_empty(enable_ranger_atlas) and enable_ranger_atlas.lower() == 'yes'
  enable_ranger_hbase = config['configurations']['ranger-hbase-plugin-properties']['ranger-hbase-plugin-enabled']
  enable_ranger_hbase = not is_empty(enable_ranger_hbase) and enable_ranger_hbase.lower() == 'yes'

  policymgr_mgr_url = config['configurations']['admin-properties']['policymgr_external_url']

  downloaded_custom_connector = None
  driver_curl_source = None
  driver_curl_target = None

  ranger_env = config['configurations']['ranger-env']
  ranger_plugin_properties = config['configurations']['ranger-atlas-plugin-properties']

  ranger_atlas_audit = config['configurations']['ranger-atlas-audit']
  ranger_atlas_audit_attrs = config['configuration_attributes']['ranger-atlas-audit']
  ranger_atlas_security = config['configurations']['ranger-atlas-security']
  ranger_atlas_security_attrs = config['configuration_attributes']['ranger-atlas-security']
  ranger_atlas_policymgr_ssl = config['configurations']['ranger-atlas-policymgr-ssl']
  ranger_atlas_policymgr_ssl_attrs = config['configuration_attributes']['ranger-atlas-policymgr-ssl']

  policy_user = config['configurations']['ranger-atlas-plugin-properties']['policy_user']

  atlas_repository_configuration = {
    'username' : config['configurations']['ranger-atlas-plugin-properties']['REPOSITORY_CONFIG_USERNAME'],
    'password' : unicode(config['configurations']['ranger-atlas-plugin-properties']['REPOSITORY_CONFIG_PASSWORD']),
    'atlas.rest.address' : metadata_server_url,
    'commonNameForCertificate' : config['configurations']['ranger-atlas-plugin-properties']['common.name.for.certificate'],
    'ambari.service.check.user' : policy_user
  }

  if security_enabled:
    atlas_repository_configuration['policy.download.auth.users'] = metadata_user
    atlas_repository_configuration['tag.download.auth.users'] = metadata_user

  atlas_ranger_plugin_repo = {
    'isEnabled': 'true',
    'configs': atlas_repository_configuration,
    'description': 'atlas repo',
    'name': repo_name,
    'type': 'atlas',
  }
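
  # This repo dict is typically what gets sent to the Ranger admin to create/verify the
  # 'atlas' service repository; the consumer is the Ranger plugin setup logic (e.g.
  # setup_ranger_atlas.py in this package).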