blob: df8a7fc97e2178b8b9fea8c7cdc9a1889eb1791c [file] [log] [blame]
#!/usr/bin/env python
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import metron_service
from metron_security import kinit
from resource_management.core.logger import Logger
from resource_management.core.resources.system import Directory, Execute, File
from resource_management.libraries.functions import get_user_call_output
from resource_management.libraries.functions.format import format
from resource_management.libraries.functions.show_logs import show_logs
# Wrap major operations and functionality in this class
class RestCommands:
__params = None
__kafka_configured = False
__kafka_acl_configured = False
__hbase_configured = False
__hbase_acl_configured = False
__metron_user_hdfs_dir_configured = False
def __init__(self, params):
if params is None:
raise ValueError("params argument is required for initialization")
self.__params = params
self.__kafka_configured = os.path.isfile(self.__params.rest_kafka_configured_flag_file)
self.__kafka_acl_configured = os.path.isfile(self.__params.rest_kafka_acl_configured_flag_file)
self.__hbase_configured = os.path.isfile(self.__params.rest_hbase_configured_flag_file)
self.__hbase_acl_configured = os.path.isfile(self.__params.rest_hbase_acl_configured_flag_file)
self.__metron_user_hdfs_dir_configured = os.path.isfile(self.__params.metron_user_hdfs_dir_configured_flag_file)
Directory(params.metron_rest_pid_dir,
mode=0755,
owner=params.metron_user,
group=params.metron_group,
create_parents=True
)
Directory(params.metron_log_dir,
mode=0755,
owner=params.metron_user,
group=params.metron_group,
create_parents=True
)
def __get_topics(self):
return [self.__params.metron_escalation_topic]
def is_kafka_configured(self):
return self.__kafka_configured
def is_kafka_acl_configured(self):
return self.__kafka_acl_configured
def is_hbase_configured(self):
return self.__hbase_configured
def is_hbase_acl_configured(self):
return self.__hbase_acl_configured
def is_metron_user_hdfs_dir_configured(self):
return self.__metron_user_hdfs_dir_configured
def set_kafka_configured(self):
metron_service.set_configured(self.__params.metron_user, self.__params.rest_kafka_configured_flag_file, "Setting Kafka configured to True for rest")
def set_kafka_acl_configured(self):
metron_service.set_configured(self.__params.metron_user, self.__params.rest_kafka_acl_configured_flag_file, "Setting Kafka ACL configured to True for rest")
def set_hbase_configured(self):
metron_service.set_configured(self.__params.metron_user, self.__params.rest_hbase_configured_flag_file, "Setting HBase configured to True for rest")
def set_hbase_acl_configured(self):
metron_service.set_configured(self.__params.metron_user, self.__params.rest_hbase_acl_configured_flag_file, "Setting HBase ACL configured to True for rest")
def set_metron_user_hdfs_dir_configured(self):
metron_service.set_configured(self.__params.metron_user, self.__params.metron_user_hdfs_dir_configured_flag_file, "Setting Metron user HDFS directory configured to True")
def init_kafka_topics(self):
Logger.info('Creating Kafka topics for rest')
metron_service.init_kafka_topics(self.__params, self.__get_topics())
def init_kafka_acls(self):
Logger.info('Creating Kafka ACLs for rest')
# The following topics must be permissioned for the rest application list operation
topics = self.__get_topics() + [self.__params.ambari_kafka_service_check_topic, self.__params.consumer_offsets_topic]
metron_service.init_kafka_acls(self.__params, topics)
groups = ['metron-rest']
metron_service.init_kafka_acl_groups(self.__params, groups)
def create_metron_user_hdfs_dir(self):
Logger.info("Creating HDFS location for Metron user")
self.__params.HdfsResource(self.__params.metron_user_hdfs_dir,
type="directory",
action="create_on_execute",
owner=self.__params.metron_user,
group=self.__params.metron_group,
mode=0755,
)
def start_rest_application(self):
"""
Start the REST application
"""
Logger.info('Starting REST application')
if self.__params.security_enabled:
kinit(self.__params.kinit_path_local,
self.__params.metron_keytab_path,
self.__params.metron_principal_name,
execute_user=self.__params.metron_user)
# Get the PID associated with the service
pid_file = format("{metron_rest_pid_dir}/{metron_rest_pid}")
pid = get_user_call_output.get_user_call_output(format("cat {pid_file}"), user=self.__params.metron_user, is_checked_call=False)[1]
process_id_exists_command = format("ls {pid_file} >/dev/null 2>&1 && ps -p {pid} >/dev/null 2>&1")
# Set the password with env variable instead of param to avoid it showing in ps
cmd = format((
"export METRON_JDBC_PASSWORD={metron_jdbc_password!p};"
"export JAVA_HOME={java_home};"
"export METRON_REST_CLASSPATH={metron_rest_classpath};"
"export METRON_INDEX_CP={metron_indexing_classpath};"
"export METRON_LOG_DIR={metron_log_dir};"
"export METRON_PID_FILE={pid_file};"
"export HDP_VERSION={hdp_version};"
"export METRON_RA_INDEXING_WRITER={ra_indexing_writer};"
"export METRON_LDAP_PASSWORD={metron_ldap_password!p};"
"export METRON_LDAP_SSL_TRUSTSTORE_PASSWORD={metron_ldap_ssl_truststore_password!p};"
"{metron_home}/bin/metron-rest.sh;"
"unset METRON_JDBC_PASSWORD;"
"unset METRON_LDAP_PASSWORD;"
"unset METRON_LDAP_SSL_TRUSTSTORE_PASSWORD;"
))
Execute(cmd,
user = self.__params.metron_user,
logoutput=True,
not_if = process_id_exists_command,
timeout=60)
Logger.info('Done starting REST application')
def stop_rest_application(self):
"""
Stop the REST application
"""
Logger.info('Stopping REST application')
# Get the pid associated with the service
pid_file = format("{metron_rest_pid_dir}/{metron_rest_pid}")
pid = get_user_call_output.get_user_call_output(format("cat {pid_file}"), user=self.__params.metron_user, is_checked_call=False)[1]
process_id_exists_command = format("ls {pid_file} >/dev/null 2>&1 && ps -p {pid} >/dev/null 2>&1")
if self.__params.security_enabled:
kinit(self.__params.kinit_path_local,
self.__params.metron_keytab_path,
self.__params.metron_principal_name,
execute_user=self.__params.metron_user)
# Politely kill
kill_cmd = ('kill', format("{pid}"))
Execute(kill_cmd,
sudo=True,
not_if = format("! ({process_id_exists_command})")
)
# Violently kill
hard_kill_cmd = ('kill', '-9', format("{pid}"))
wait_time = 5
Execute(hard_kill_cmd,
not_if = format("! ({process_id_exists_command}) || ( sleep {wait_time} && ! ({process_id_exists_command}) )"),
sudo=True,
ignore_failures = True
)
try:
# check if stopped the process, else fail the task
Execute(format("! ({process_id_exists_command})"),
tries=20,
try_sleep=3,
)
except:
show_logs(self.__params.metron_log_dir, self.__params.metron_user)
raise
File(pid_file, action = "delete")
Logger.info('Done stopping REST application')
def restart_rest_application(self, env):
"""
Restart the REST application
:param env: Environment
"""
Logger.info('Restarting the REST application')
self.stop_rest_application()
self.start_rest_application()
Logger.info('Done restarting the REST application')
def status_rest_application(self, env):
"""
Performs a status check for the REST application
:param env: Environment
"""
Logger.info('Status check the REST application')
metron_service.check_http(
self.__params.metron_rest_host,
self.__params.metron_rest_port,
self.__params.metron_user)
def create_hbase_tables(self):
Logger.info("Creating HBase Tables")
metron_service.create_hbase_table(self.__params,
self.__params.user_settings_hbase_table,
self.__params.user_settings_hbase_cf)
Logger.info("Done creating HBase Tables")
self.set_hbase_configured()
def set_hbase_acls(self):
Logger.info("Setting HBase ACLs")
if self.__params.security_enabled:
kinit(self.__params.kinit_path_local,
self.__params.hbase_keytab_path,
self.__params.hbase_principal_name,
execute_user=self.__params.hbase_user)
cmd = "echo \"grant '{0}', 'RW', '{1}'\" | hbase shell -n"
add_rest_acl_cmd = cmd.format(self.__params.metron_user, self.__params.user_settings_hbase_table)
Execute(add_rest_acl_cmd,
tries=3,
try_sleep=5,
logoutput=False,
path='/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin',
user=self.__params.hbase_user
)
Logger.info("Done setting HBase ACLs")
self.set_hbase_acl_configured()
def service_check(self, env):
"""
Performs a service check for the REST application
:param env: Environment
"""
Logger.info('Checking connectivity to REST application')
metron_service.check_http(
self.__params.metron_rest_host,
self.__params.metron_rest_port,
self.__params.metron_user)
Logger.info('Checking Kafka topics for the REST application')
metron_service.check_kafka_topics(self.__params, self.__get_topics())
if self.__params.security_enabled:
Logger.info('Checking Kafka topic ACL for the REST application')
metron_service.check_kafka_acls(self.__params, self.__get_topics())
Logger.info("REST application service check completed successfully")