blob: 9703d1f984cf6fac4a8b79bb394598d4cd2e1767 [file] [log] [blame]
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import json
import os
from datetime import datetime
from metron_security import kinit
from resource_management.core.exceptions import ComponentIsNotRunning
from resource_management.core.exceptions import Fail
from resource_management.core.logger import Logger
from resource_management.core.resources.system import Directory, File
from resource_management.core.resources.system import Execute
from resource_management.core.source import InlineTemplate
from resource_management.core.source import Template
from resource_management.libraries.functions import format as ambari_format
from resource_management.libraries.functions.get_user_call_output import \
get_user_call_output
from resource_management.libraries.script import Script
def is_zk_configured(params):
    """
    Returns True if the flag file marking ZooKeeper as already configured exists.
    :param params: The Ambari params object; provides zk_configured_flag_file.
    """
    flag_file = params.zk_configured_flag_file
    return os.path.isfile(flag_file)
def init_zk_config(params):
    """
    Pushes the full local Metron configuration into ZooKeeper. Intended to be
    executed by Ambari only once, at initial install time.
    :param params: The Ambari params object.
    """
    Logger.info('Loading ALL Metron config into ZooKeeper - this command should ONLY be executed by Ambari on initial install.')
    push_cmd = ambari_format(
        "{metron_home}/bin/zk_load_configs.sh --zk_quorum {zookeeper_quorum} --mode PUSH --input_dir {metron_zookeeper_config_path}")
    Execute(push_cmd, path=ambari_format("{java_home}/bin"))
def set_configured(user, flag_file, log_msg):
    """
    Creates a flag file used to record that a one-time configuration step has
    completed.
    :param user: The user that will own the flag file.
    :param flag_file: The path of the flag file to create.
    :param log_msg: The message logged before creating the file.
    """
    Logger.info(log_msg)
    File(flag_file,
         content="This file created on: " + datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
         owner=user,
         # FIX: 0o755 instead of 0755 - the bare-leading-zero octal literal is
         # Python 2-only syntax; 0o755 is valid on Python 2.6+ and Python 3.
         mode=0o755)
def set_zk_configured(params):
    """
    Marks ZooKeeper as configured by creating the flag file.
    :param params: The Ambari params object.
    """
    message = "Setting Zookeeper configured to true"
    set_configured(params.metron_user, params.zk_configured_flag_file, message)
def solr_global_config_patches():
    """
    Builds the global configuration patches required for Solr.
    :return: A string containing the RFC 6902 patch operation(s), suitable for
      embedding in a JSON patch array.
    """
    solr_patches = """
  {
    "op": "add",
    "path": "/solr.zookeeper",
    "value": "{{solr_zookeeper_url}}"
  }
  """
    return solr_patches
def elasticsearch_global_config_patches():
    """
    Builds the global configuration patches required for Elasticsearch.
    :return: A string of comma-separated RFC 6902 patch operations, suitable
      for embedding in a JSON patch array.
    """
    es_patches = """
  {
    "op": "add",
    "path": "/es.clustername",
    "value": "{{ es_cluster_name }}"
  },
  {
    "op": "add",
    "path": "/es.ip",
    "value": "{{ es_url }}"
  },
  {
    "op": "add",
    "path": "/es.date.format",
    "value": "{{es_date_format}}"
  }
  """
    return es_patches
def build_global_config_patch(params, patch_file):
    """
    Build the file used to patch the global configuration.
    See RFC 6902 at https://tools.ietf.org/html/rfc6902
    :param params: The Ambari params object; ra_indexing_writer selects the
      indexer-specific patches, metron_user/metron_group own the patch file.
    :param patch_file: The path where the patch file will be created.
    """
    # The indexer-specific patches differ: Solr needs only its ZooKeeper URL,
    # while Elasticsearch needs cluster name, hosts and date format.
    if params.ra_indexing_writer == 'Solr':
        indexing_patches = solr_global_config_patches()
    else:
        indexing_patches = elasticsearch_global_config_patches()
    # Patches applied regardless of which indexer is in use. The {{...}}
    # placeholders are filled in later by InlineTemplate below.
    other_patches = """
  {
    "op": "add",
    "path": "/profiler.client.period.duration",
    "value": "{{profiler_period_duration}}"
  },
  {
    "op": "add",
    "path": "/profiler.client.period.duration.units",
    "value": "{{profiler_period_units}}"
  },
  {
    "op": "add",
    "path": "/parser.error.topic",
    "value": "{{parser_error_topic}}"
  },
  {
    "op": "add",
    "path": "/enrichment.list.hbase.provider.impl",
    "value": "{{enrichment_list_hbase_provider_impl}}"
  },
  {
    "op": "add",
    "path": "/enrichment.list.hbase.table",
    "value": "{{enrichment_list_hbase_table}}"
  },
  {
    "op": "add",
    "path": "/enrichment.list.hbase.cf",
    "value": "{{enrichment_list_hbase_cf}}"
  },
  {
    "op": "add",
    "path": "/update.hbase.table",
    "value": "{{update_hbase_table}}"
  },
  {
    "op": "add",
    "path": "/update.hbase.cf",
    "value": "{{update_hbase_cf}}"
  },
  {
    "op": "add",
    "path": "/user.settings.hbase.table",
    "value": "{{user_settings_hbase_table}}"
  },
  {
    "op": "add",
    "path": "/user.settings.hbase.cf",
    "value": "{{user_settings_hbase_cf}}"
  },
  {
    "op": "add",
    "path": "/bootstrap.servers",
    "value": "{{kafka_brokers}}"
  },
  {
    "op": "add",
    "path": "/source.type.field",
    "value": "{{source_type_field}}"
  },
  {
    "op": "add",
    "path": "/threat.triage.score.field",
    "value": "{{threat_triage_score_field}}"
  },
  {
    "op": "add",
    "path": "/enrichment.writer.batchSize",
    "value": "{{enrichment_kafka_writer_batch_size}}"
  },
  {
    "op": "add",
    "path": "/enrichment.writer.batchTimeout",
    "value": "{{enrichment_kafka_writer_batch_timeout}}"
  },
  {
    "op": "add",
    "path": "/profiler.writer.batchSize",
    "value": "{{profiler_kafka_writer_batch_size}}"
  },
  {
    "op": "add",
    "path": "/profiler.writer.batchTimeout",
    "value": "{{profiler_kafka_writer_batch_timeout}}"
  }
  """
    # ambari_format substitutes {indexing_patches}/{other_patches} here; the
    # remaining {{...}} placeholders survive for InlineTemplate to resolve.
    patch_template = ambari_format(
    """
  [
    {indexing_patches},
    {other_patches}
  ]
  """)
    File(patch_file,
         content=InlineTemplate(patch_template),
         owner=params.metron_user,
         group=params.metron_group)
def patch_global_config(params):
    """
    Patches the global configuration stored in ZooKeeper using a temporary
    RFC 6902 JSON patch file built from the current Ambari parameters.
    :param params: The Ambari params object.
    """
    patch_file = "/tmp/metron-global-config-patch.json"
    Logger.info("Setup temporary global config JSON patch (formatting per RFC6902): " + patch_file)
    build_global_config_patch(params, patch_file)

    Logger.info('Patching global config in ZooKeeper')
    patch_cmd = ambari_format(
        "{metron_home}/bin/zk_load_configs.sh --zk_quorum {zookeeper_quorum} --mode PATCH --config_type GLOBAL --patch_file " + patch_file)
    Execute(patch_cmd, path=ambari_format("{java_home}/bin"))
    Logger.info("Done patching global config")
def pull_config(params):
    """
    Pulls every Metron configuration from ZooKeeper down to the local file
    system, forcibly overwriting the local config directory.
    :param params: The Ambari params object.
    """
    Logger.info('Pulling all Metron configs down from ZooKeeper to local file system')
    Logger.info('NOTE - THIS IS OVERWRITING THE LOCAL METRON CONFIG DIR WITH ZOOKEEPER CONTENTS: ' + params.metron_zookeeper_config_path)
    pull_cmd = ambari_format(
        "{metron_home}/bin/zk_load_configs.sh --zk_quorum {zookeeper_quorum} --mode PULL --output_dir {metron_zookeeper_config_path} --force")
    Execute(pull_cmd, path=ambari_format("{java_home}/bin"))
def refresh_configs(params):
    """
    Re-applies the global configuration patches in ZooKeeper and pulls the
    result down to the local file system. Skipped (with a warning) when the
    ZooKeeper-configured flag file does not exist.
    :param params: The Ambari params object.
    """
    if not is_zk_configured(params):
        # FIX: added the missing space before 'indicating' so the warning
        # reads correctly instead of running into the quoted file name.
        Logger.warning("The expected flag file '" + params.zk_configured_flag_file + "' indicating that Zookeeper has been configured does not exist. Skipping patching. An administrator should look into this.")
        return

    check_indexer_parameters()
    patch_global_config(params)
    pull_config(params)
def get_running_topologies(params):
    """
    Queries the Storm REST API for the currently running topologies.
    :param params: The Ambari params object.
    :return: A dict mapping topology name -> status; empty dict on any failure
      (non-zero curl exit code or unparseable JSON).
    """
    Logger.info('Getting Running Storm Topologies from Storm REST Server')
    Logger.info('Security enabled? ' + str(params.security_enabled))

    # Want to sudo to the metron user and kinit as them so we aren't polluting
    # root with Metron's Kerberos tickets. This is because we need to run a
    # command with a return as the metron user. Sigh
    negotiate = '--negotiate -u : ' if params.security_enabled else ''
    cmd = ambari_format(
        'curl --max-time 3 ' + negotiate + '{storm_rest_addr}/api/v1/topology/summary')

    if params.security_enabled:
        kinit(params.kinit_path_local,
              params.metron_keytab_path,
              params.metron_principal_name,
              execute_user=params.metron_user)

    Logger.info('Running cmd: ' + cmd)
    return_code, stdout, stderr = get_user_call_output(cmd,
                                                       user=params.metron_user,
                                                       is_checked_call=False)
    if return_code != 0:
        return {}

    try:
        stormjson = json.loads(stdout)
    # FIX: 'except ValueError, e' is Python 2-only syntax; the 'as' form is
    # valid on Python 2.6+ and Python 3.
    except ValueError as e:
        Logger.info('Stdout: ' + str(stdout))
        Logger.info('Stderr: ' + str(stderr))
        Logger.exception(str(e))
        return {}

    topologiesDict = {}
    for topology in stormjson['topologies']:
        topologiesDict[topology['name']] = topology['status']

    Logger.info("Topologies: " + str(topologiesDict))
    return topologiesDict
def init_kafka_topics(params, topics):
    """
    Creates the given Kafka topics; topics that already exist are left alone.
    :param params: The Ambari params object; supplies the Kafka bin dir,
      ZooKeeper quorum, retention (GB) and the Kafka user.
    :param topics: A list of topic names to create.
    """
    Logger.info('Creating Kafka topics')

    # Create the topics. All the components need indexing (for errors), so we pass '--if-not-exists'.
    command_template = """{0}/kafka-topics.sh \
            --zookeeper {1} \
            --create \
            --if-not-exists \
            --topic {2} \
            --partitions {3} \
            --replication-factor {4} \
            --config retention.bytes={5}"""

    num_partitions = 1
    replication_factor = 1
    retention_gigabytes = int(params.metron_topic_retention)
    retention_bytes = retention_gigabytes * 1024 * 1024 * 1024
    for topic in topics:
        # FIX: added the missing space before the quoted topic name in the log
        # message ("Creating topic'foo'" -> "Creating topic 'foo'").
        Logger.info("Creating topic '{0}'".format(topic))
        Execute(command_template.format(params.kafka_bin_dir,
                                        params.zookeeper_quorum,
                                        topic,
                                        num_partitions,
                                        replication_factor,
                                        retention_bytes),
                user=params.kafka_user, tries=3, try_sleep=5, logoutput=True)
    Logger.info("Done creating Kafka topics")
def init_kafka_acls(params, topics):
    """
    Grants the Metron user ACLs on each of the given Kafka topics.
    :param params: The Ambari params object.
    :param topics: A list of topic names.
    """
    Logger.info('Creating Kafka topic ACLs')
    acl_template = """{0}/kafka-acls.sh \
        --authorizer kafka.security.auth.SimpleAclAuthorizer \
        --authorizer-properties zookeeper.connect={1} \
        --add \
        --allow-principal User:{2} \
        --topic {3}"""
    for topic in topics:
        Logger.info("Creating ACL for topic '{0}'".format(topic))
        acl_cmd = acl_template.format(params.kafka_bin_dir,
                                      params.zookeeper_quorum,
                                      params.metron_user,
                                      topic)
        Execute(acl_cmd, user=params.kafka_user, tries=3, try_sleep=5, logoutput=True)
def init_kafka_acl_groups(params, groups):
    """
    Grants the Metron user ACLs on each of the given Kafka consumer groups.
    :param params: The Ambari params object.
    :param groups: A list of consumer group names.
    """
    Logger.info('Creating Kafka group ACLs')
    acl_template = """{0}/kafka-acls.sh \
        --authorizer kafka.security.auth.SimpleAclAuthorizer \
        --authorizer-properties zookeeper.connect={1} \
        --add \
        --allow-principal User:{2} \
        --group {3}"""
    for group in groups:
        Logger.info("Creating ACL for group '{0}'".format(group))
        acl_cmd = acl_template.format(params.kafka_bin_dir,
                                      params.zookeeper_quorum,
                                      params.metron_user,
                                      group)
        Execute(acl_cmd, user=params.kafka_user, tries=3, try_sleep=5, logoutput=True)
def execute(cmd, user, err_msg=None, tries=3, try_sleep=5, logoutput=True, path='/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin'):
    """
    Executes a command and raises an appropriate error message if the command
    fails.
    :param cmd: The command to execute.
    :param user: The user to execute the command as.
    :param err_msg: The error message to display if the command fails.
    :param tries: The number of attempts to execute the command.
    :param try_sleep: The time between attempts.
    :param logoutput: If true, log the command output.
    :param path: The path use when running the command.
    :raises Fail: If the command fails after all attempts.
    """
    try:
        Execute(cmd, tries=tries, try_sleep=try_sleep, logoutput=logoutput, user=user, path=path)
    # FIX: narrowed the bare 'except:' to 'except Exception' so that
    # SystemExit/KeyboardInterrupt are not swallowed and re-labelled as a
    # command failure.
    except Exception:
        if err_msg is None:
            err_msg = "Execution failed: cmd={0}, user={1}".format(cmd, user)
        raise Fail(err_msg)
def check_kafka_topics(params, topics):
    """
    Validates that the Kafka topics exist. An exception is raised if any of the
    topics do not exist.
    :param params:
    :param topics: A list of topic names.
    """
    # if needed kinit as 'metron'
    if params.security_enabled:
        kinit(params.kinit_path_local,
              params.metron_keytab_path,
              params.metron_principal_name,
              execute_user=params.metron_user)

    # awk exits 0 only when the topic name appears in the listing
    template = """{0}/kafka-topics.sh \
        --zookeeper {1} \
        --list | \
        awk 'BEGIN {{cnt=0;}} /{2}/ {{cnt++}} END {{if (cnt > 0) {{exit 0}} else {{exit 1}}}}'"""
    for topic in topics:
        Logger.info("Checking existence of Kafka topic '{0}'".format(topic))
        check_cmd = template.format(params.kafka_bin_dir, params.zookeeper_quorum, topic)
        missing_msg = "Missing Kafka topic; topic={0}".format(topic)
        execute(check_cmd, user=params.kafka_user, err_msg=missing_msg)
def create_hbase_table(params, table, cf):
    """
    Creates an HBase table, if the table does not currently exist.
    :param params: The Ambari params object.
    :param table: The name of the HBase table.
    :param cf: The column family.
    """
    # if needed kinit as 'hbase'
    if params.security_enabled:
        kinit(params.kinit_path_local,
              params.hbase_keytab_path,
              params.hbase_principal_name,
              execute_user=params.hbase_user)

    # only issue the 'create' when the shell reports the table does not exist
    cmd = """if [[ $(echo \"exists '{0}'\" | hbase shell | grep 'not exist') ]]; \
    then echo \"create '{0}','{1}'\" | hbase shell -n; fi"""
    add_update_cmd = cmd.format(table, cf)
    Execute(add_update_cmd,
            tries=3,
            try_sleep=5,
            logoutput=False,
            path='/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin',
            user=params.hbase_user)
def check_hbase_table(params, table):
    """
    Validates that an HBase table exists. An exception is raised if the table
    does not exist.
    :param params:
    :param table: The name of the HBase table.
    """
    Logger.info("Checking HBase table '{0}'".format(table))

    # if needed kinit as 'hbase'
    if params.security_enabled:
        kinit(params.kinit_path_local,
              params.hbase_keytab_path,
              params.hbase_principal_name,
              execute_user=params.hbase_user)

    # grep the shell output for the 'does exist' confirmation
    template = "echo \"exists '{0}'\" | hbase shell -n | grep 'Table {1} does exist'"
    check_cmd = template.format(table, table)
    missing_msg = "Missing HBase table; table={0}".format(table)
    execute(check_cmd, user=params.hbase_user, err_msg=missing_msg)
def check_hbase_column_family(params, table, column_family):
    """
    Validates that an HBase column family exists. An exception is raised if the
    column family does not exist.
    :param params:
    :param table: The name of the HBase table.
    :param column_family: The name of the HBase column family.
    """
    Logger.info("Checking column family '{0}:{1}'".format(table, column_family))

    # if needed kinit as 'hbase'
    if params.security_enabled:
        kinit(params.kinit_path_local,
              params.hbase_keytab_path,
              params.hbase_principal_name,
              execute_user=params.hbase_user)

    # the column family shows up as NAME => '<cf>' in the table description
    template = "echo \"desc '{0}'\" | hbase shell -n | grep \"NAME => '{1}'\""
    check_cmd = template.format(table, column_family)
    missing_msg = "Missing HBase column family; table={0}, cf={1}".format(table, column_family)
    execute(check_cmd, user=params.hbase_user, err_msg=missing_msg)
def check_hbase_acls(params, table, user=None, permissions="READ,WRITE"):
    """
    Validates that HBase table permissions exist for a user. An exception is
    raised if the permissions do not exist.
    :param params:
    :param table: The name of the HBase table.
    :param user: The name of the user; defaults to the Metron user.
    :param permissions: The permissions that should exist.
    """
    if user is None:
        user = params.metron_user
    Logger.info("Checking HBase ACLs; table={0}, user={1}, permissions={2}".format(table, user, permissions))

    # if needed kinit as 'hbase'
    if params.security_enabled:
        kinit(params.kinit_path_local,
              params.hbase_keytab_path,
              params.hbase_principal_name,
              execute_user=params.hbase_user)

    # look for the user's row and the expected actions in user_permission output
    template = """echo "user_permission '{0}'" | \
      hbase shell -n | \
      grep " {1} " | \
      grep "actions={2}"
  """
    check_cmd = template.format(table, user, permissions)
    missing_msg = "Missing HBase access; table={0}, user={1}, permissions={2}".format(table, user, permissions)
    execute(check_cmd, user=params.hbase_user, err_msg=missing_msg)
def check_hdfs_dir_exists(params, path, user=None):
    """
    Validate that a directory exists in HDFS.
    :param params:
    :param path: The directory path in HDFS.
    :param user: The user to execute the check under; defaults to the Metron user.
    """
    if user is None:
        user = params.metron_user
    Logger.info("Checking HDFS; directory={0} user={1}".format(path, user))

    # if needed kinit as 'metron'
    if params.security_enabled:
        kinit(params.kinit_path_local,
              params.metron_keytab_path,
              params.metron_principal_name,
              execute_user=params.metron_user)

    template = "{0}/hdfs dfs -test -d {1}"
    cmd = template.format(params.hadoop_bin_dir, path)
    err_msg = "Missing directory in HDFS: directory={0} user={1}".format(path, user)
    # FIX: run the check as the requested 'user' - previously the 'user'
    # argument was ignored and the check always ran as the Metron user,
    # inconsistent with check_hdfs_file_exists.
    execute(cmd, user=user, err_msg=err_msg)
def check_hdfs_file_exists(params, path, user=None):
    """
    Validate that a file exists in HDFS.
    :param params:
    :param path: The file path in HDFS.
    :param user: The user to execute the check under; defaults to the Metron user.
    """
    if user is None:
        user = params.metron_user
    Logger.info("Checking HDFS; file={0}, user={1}".format(path, user))

    # if needed kinit as 'metron'
    if params.security_enabled:
        kinit(params.kinit_path_local,
              params.metron_keytab_path,
              params.metron_principal_name,
              execute_user=params.metron_user)

    # 'hdfs dfs -test -f' exits non-zero when the file is absent
    check_cmd = "{0}/hdfs dfs -test -f {1}".format(params.hadoop_bin_dir, path)
    missing_msg = "Missing file in HDFS; file={0}".format(path)
    execute(check_cmd, user=user, err_msg=missing_msg)
def check_kafka_acls(params, topics, user=None):
    """
    Validate that permissions have been granted for a list of Kafka topics.
    :param params:
    :param topics: A list of topic names.
    :param user: The user whose access is checked; defaults to the Metron user.
    """
    if user is None:
        user = params.metron_user
    template = """{0}/kafka-acls.sh \
        --authorizer kafka.security.auth.SimpleAclAuthorizer \
        --authorizer-properties zookeeper.connect={1} \
        --topic {2} \
        --list | grep 'User:{3}'"""
    for topic in topics:
        # FIX: removed the stray trailing quote that left the log message
        # unbalanced ("user=metron'").
        Logger.info("Checking ACL; topic={0}, user={1}".format(topic, user))
        cmd = template.format(params.kafka_bin_dir, params.zookeeper_quorum, topic, user)
        err_msg = "Missing Kafka access; topic={0}, user={1}".format(topic, user)
        execute(cmd, user=params.kafka_user, err_msg=err_msg)
def check_kafka_acl_groups(params, groups, user=None):
    """
    Validate that Kafka group permissions have been granted.
    :param params:
    :param groups: A list of group names.
    :param user: The user whose access is checked; defaults to the Metron user.
    """
    if user is None:
        user = params.metron_user
    template = """{0}/kafka-acls.sh \
        --authorizer kafka.security.auth.SimpleAclAuthorizer \
        --authorizer-properties zookeeper.connect={1} \
        --group {2} \
        --list | grep 'User:{3}'"""
    for group in groups:
        # FIX: the log message previously said "topic" although this function
        # iterates consumer groups; reworded to match the ACL being checked.
        Logger.info("Checking group ACL; group={0}, user={1}".format(group, user))
        cmd = template.format(params.kafka_bin_dir, params.zookeeper_quorum, group, user)
        err_msg = "Missing Kafka group access; group={0}, user={1}".format(group, user)
        execute(cmd, user=params.kafka_user, err_msg=err_msg)
def check_http(host, port, user):
    """
    Check for a valid HTTP response. Raises ComponentIsNotRunning when the
    endpoint cannot be reached.
    :param host: The hostname.
    :param port: The port number.
    :param user: The user to execute the HTTP request as.
    """
    cmd = "curl -sS --max-time 3 {0}:{1}".format(host, port)
    Logger.info('Checking HTTP connectivity; host={0}, port={1}, user={2} cmd={3}'.format(host, port, user, cmd))
    try:
        Execute(cmd, tries=3, try_sleep=5, logoutput=False, user=user)
    except:
        # any failure to connect is reported as the component not running
        raise ComponentIsNotRunning()
def check_indexer_parameters():
    """
    Ensure that all required parameters have been defined for the chosen
    Indexer; either Solr or Elasticsearch. Raises Fail listing any that are
    missing.
    """
    missing = []
    config = Script.get_config()
    indexer = config['configurations']['metron-indexing-env']['ra_indexing_writer']
    Logger.info('Checking parameters for indexer = ' + indexer)

    metron_env = config['configurations']['metron-env']
    if indexer == 'Solr':
        # check for all required solr parameters
        if not metron_env['solr_zookeeper_url']:
            missing.append("metron-env/solr_zookeeper_url")
    else:
        # check for all required elasticsearch parameters
        for param in ['es_cluster_name', 'es_hosts', 'es_date_format']:
            if not metron_env[param]:
                missing.append("metron-env/" + param)

    if missing:
        raise Fail("Missing required indexing parameters(s): indexer={0}, missing={1}".format(indexer, missing))
def install_metron_knox(params):
    """
    Runs the Metron Knox install script (when Knox is present on this host)
    and records that the installation completed.
    :param params: The Ambari params object.
    """
    if os.path.exists(params.knox_home):
        # export the Knox settings only for the duration of the install script
        template = """export KNOX_HOME={0}; \
        export KNOX_USER={1}; \
        export KNOX_GROUP={2}; \
        {3}/bin/install_metron_knox.sh; \
        unset KNOX_USER; \
        unset KNOX_GROUP; \
        unset KNOX_HOME;"""
        install_cmd = template.format(params.knox_home, params.knox_user, params.knox_group, params.metron_home)
        Execute(install_cmd)
        set_metron_knox_installed(params)
def is_metron_knox_installed(params):
    """
    Returns True if the flag file marking the Metron Knox install exists.
    :param params: The Ambari params object; provides metron_knox_installed_flag_file.
    """
    flag_file = params.metron_knox_installed_flag_file
    return os.path.isfile(flag_file)
def set_metron_knox_installed(params):
    """
    Marks the Metron Knox installation as complete: ensures the ZooKeeper
    config directory exists with the right ownership, then writes the flag file.
    :param params: The Ambari params object.
    """
    Directory(params.metron_zookeeper_config_path,
              # FIX: 0o755 instead of 0755 - the bare-leading-zero octal
              # literal is Python 2-only syntax; 0o755 works on 2.6+ and 3.
              mode=0o755,
              owner=params.metron_user,
              group=params.metron_group,
              create_parents=True
              )
    set_configured(params.metron_user, params.metron_knox_installed_flag_file, "Setting Metron Knox installed to true")
def metron_knox_topology_setup(params):
    """
    Writes the Metron and Metron-SSO Knox topology files from their Jinja
    templates, when Knox is present on this host.
    :param params: The Ambari params object.
    """
    if not os.path.exists(params.knox_home):
        return
    File(ambari_format("{knox_home}/conf/topologies/metron.xml"),
         content=Template("metron.xml.j2"),
         owner=params.knox_user,
         group=params.knox_group
         )
    File(ambari_format("{knox_home}/conf/topologies/metronsso.xml"),
         content=Template("metronsso.xml.j2"),
         owner=params.knox_user,
         group=params.knox_group
         )