blob: f43073d31e8d3a24b5f4435c4eaa8c7a8e1363bf [file] [log] [blame]
#!/usr/bin/env python
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import time
from resource_management.core.logger import Logger
from resource_management.core.resources.system import Execute, File
from resource_management.libraries.resources.hdfs_resource import HdfsResource
import metron_service
# Wrap major operations and functionality in this class
class EnrichmentCommands:
__params = None
__enrichment_topology = None
__enrichment_topic = None
__configured = False
def __init__(self, params):
if params is None:
raise ValueError("params argument is required for initialization")
self.__params = params
self.__enrichment_topology = params.metron_enrichment_topology
self.__enrichment_topic = params.metron_enrichment_topic
self.__configured = os.path.isfile(self.__params.enrichment_configured_flag_file)
def is_configured(self):
return self.__configured
def set_configured(self):
File(self.__params.enrichment_configured_flag_file,
content="",
owner=self.__params.metron_user,
mode=0775)
def setup_repo(self):
def local_repo():
Logger.info("Setting up local repo")
Execute("yum -y install createrepo")
Execute("createrepo /localrepo")
Execute("chmod -R o-w+r /localrepo")
def remote_repo():
Logger.info('Using remote repo')
yum_repo_types = {
'local': local_repo,
'remote': remote_repo
}
repo_type = self.__params.yum_repo_type
if repo_type in yum_repo_types:
yum_repo_types[repo_type]()
Logger.info("Writing out repo file")
repo_template = ("echo \"[METRON-0.3.0]\n"
"name=Metron 0.3.0 packages\n"
"baseurl={0}\n"
"gpgcheck=0\n"
"enabled=1\n\""
" > /etc/yum.repos.d/metron.repo")
Execute(repo_template.format(self.__params.repo_url))
else:
raise ValueError("Unsupported repo type '{0}'".format(repo_type))
def init_kafka_topics(self):
Logger.info('Creating Kafka topics')
command_template = """{0}/kafka-topics.sh \
--zookeeper {1} \
--create \
--topic {2} \
--partitions {3} \
--replication-factor {4} \
--config retention.bytes={5}"""
num_partitions = 1
replication_factor = 1
retention_gigabytes = int(self.__params.metron_topic_retention)
retention_bytes = retention_gigabytes * 1024 * 1024 * 1024
Logger.info("Creating topics for enrichment")
Logger.info("Creating topic'{0}'".format(self.__enrichment_topic))
Execute(command_template.format(self.__params.kafka_bin_dir,
self.__params.zookeeper_quorum,
self.__enrichment_topic,
num_partitions,
replication_factor,
retention_bytes))
Logger.info("Done creating Kafka topics")
def init_hdfs_dir(self):
self.__params.HdfsResource(self.__params.metron_apps_enrichment_dir,
type="directory",
action="create_on_execute",
owner=self.__params.metron_user,
group=self.__params.user_group,
mode=0775,
)
def start_enrichment_topology(self):
Logger.info("Starting Metron enrichment topology: {0}".format(self.__enrichment_topology))
start_cmd_template = """{0}/bin/start_enrichment_topology.sh \
-s {1} \
-z {2}"""
Logger.info('Starting ' + self.__enrichment_topology)
Execute(start_cmd_template.format(self.__params.metron_home, self.__enrichment_topology, self.__params.zookeeper_quorum))
Logger.info('Finished starting enrichment topology')
def stop_enrichment_topology(self):
Logger.info('Stopping ' + self.__enrichment_topology)
stop_cmd = 'storm kill ' + self.__enrichment_topology
Execute(stop_cmd)
Logger.info('Done stopping enrichment topologies')
def restart_enrichment_topology(self, env):
Logger.info('Restarting the enrichment topologies')
self.stop_enrichment_topology()
# Wait for old topology to be cleaned up by Storm, before starting again.
retries = 0
topology_active = self.is_topology_active(env)
while topology_active and retries < 3:
Logger.info('Existing topology still active. Will wait and retry')
time.sleep(40)
topology_active = self.is_topology_active(env)
retries += 1
if not topology_active:
self.start_enrichment_topology()
Logger.info('Done restarting the enrichment topology')
else:
Logger.warning('Retries exhausted. Existing topology not cleaned up. Aborting topology start.')
def is_topology_active(self, env):
env.set_params(self.__params)
active = True
topologies = metron_service.get_running_topologies()
is_running = False
if self.__enrichment_topology in topologies:
is_running = topologies[self.__enrichment_topology] in ['ACTIVE', 'REBALANCING']
active &= is_running
return active
def create_hbase_tables(self):
add_enrichment_cmd = "echo \"create '{0}','{1}'\" | hbase shell -n".format(self.__params.enrichment_table, self.__params.enrichment_cf)
Execute(add_enrichment_cmd,
tries=3,
try_sleep=5,
logoutput=False,
path='/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin'
)
add_threatintel_cmd = "echo \"create '{0}','{1}'\" | hbase shell -n".format(self.__params.threatintel_table, self.__params.threatintel_cf)
Execute(add_threatintel_cmd,
tries=3,
try_sleep=5,
logoutput=False,
path='/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin'
)