blob: 07e36db74e1cf371f41e99ff3dd5e6b423a621e7 [file] [log] [blame]
#!/usr/bin/env python
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import re
import subprocess
import time
from resource_management.core.logger import Logger
from resource_management.core.resources.system import Execute, File
import metron_service
# Wrap major operations and functionality in this class
class ParserCommands:
__params = None
__parser_list = None
__configured = False
def __init__(self, params):
if params is None:
raise ValueError("params argument is required for initialization")
self.__params = params
self.__parser_list = self.__get_parsers(params)
self.__configured = os.path.isfile(self.__params.parsers_configured_flag_file)
# get list of parsers
def __get_parsers(self, params):
return params.parsers.replace(' ', '').split(',')
def is_configured(self):
return self.__configured
def set_configured(self):
File(self.__params.parsers_configured_flag_file,
content="",
owner=self.__params.metron_user,
mode=0775)
def init_parsers(self):
Logger.info(
"Copying grok patterns from local directory '{0}' to HDFS '{1}'".format(self.__params.local_grok_patterns_dir,
self.__params.metron_apps_dir))
self.__params.HdfsResource(self.__params.metron_apps_dir,
type="directory",
action="create_on_execute",
owner=self.__params.metron_user,
mode=0775,
source=self.__params.local_grok_patterns_dir)
Logger.info("Done initializing parser configuration")
def get_parser_list(self):
return self.__parser_list
def setup_repo(self):
def local_repo():
Logger.info("Setting up local repo")
Execute("yum -y install createrepo")
Execute("createrepo /localrepo")
Execute("chmod -R o-w+r /localrepo")
Execute("echo \"[METRON-0.2.1BETA]\n"
"name=Metron 0.2.1BETA packages\n"
"baseurl=file:///localrepo\n"
"gpgcheck=0\n"
"enabled=1\" > /etc/yum.repos.d/local.repo")
def remote_repo():
print('Using remote repo')
yum_repo_types = {
'local': local_repo,
'remote': remote_repo
}
repo_type = self.__params.yum_repo_type
if repo_type in yum_repo_types:
yum_repo_types[repo_type]()
else:
raise ValueError("Unsupported repo type '{0}'".format(repo_type))
def init_kafka_topics(self):
Logger.info('Creating Kafka topics')
command_template = """{0}/kafka-topics.sh \
--zookeeper {1} \
--create \
--topic {2} \
--partitions {3} \
--replication-factor {4} \
--config retention.bytes={5}"""
num_partitions = 1
replication_factor = 1
retention_gigabytes = int(self.__params.metron_topic_retention)
retention_bytes = retention_gigabytes * 1024 * 1024 * 1024
Logger.info("Creating main topics for parsers")
for parser_name in self.get_parser_list():
Logger.info("Creating topic'{0}'".format(parser_name))
Execute(command_template.format(self.__params.kafka_bin_dir,
self.__params.zookeeper_quorum,
parser_name,
num_partitions,
replication_factor,
retention_bytes))
Logger.info("Creating topics for error handling")
Execute(command_template.format(self.__params.kafka_bin_dir,
self.__params.zookeeper_quorum,
"parser_invalid",
num_partitions,
replication_factor,
retention_bytes))
Execute(command_template.format(self.__params.kafka_bin_dir,
self.__params.zookeeper_quorum,
"parser_error",
num_partitions, replication_factor,
retention_bytes))
Logger.info("Done creating Kafka topics")
def start_parser_topologies(self):
Logger.info("Starting Metron parser topologies: {0}".format(self.get_parser_list()))
start_cmd_template = """{0}/bin/start_parser_topology.sh \
-k {1} \
-z {2} \
-s {3}"""
for parser in self.get_parser_list():
Logger.info('Starting ' + parser)
Execute(start_cmd_template.format(self.__params.metron_home, self.__params.kafka_brokers,
self.__params.zookeeper_quorum, parser))
Logger.info('Finished starting parser topologies')
def stop_parser_topologies(self):
Logger.info('Stopping parsers')
for parser in self.get_parser_list():
Logger.info('Stopping ' + parser)
stop_cmd = 'storm kill ' + parser
Execute(stop_cmd)
Logger.info('Done stopping parser topologies')
def restart_parser_topologies(self, env):
Logger.info('Restarting the parser topologies')
self.stop_parser_topologies()
attempt_count = 0
while self.topologies_running(env):
if attempt_count > 2:
raise Exception("Unable to kill topologies")
attempt_count += 1
time.sleep(10)
self.start_parser_topologies()
Logger.info('Done restarting the parser topologies')
def topologies_exist(self):
cmd_open = subprocess.Popen(["storm", "list"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdout, stderr) = cmd_open.communicate()
stdout_lines = stdout.splitlines()
if stdout_lines:
status_lines = self.__get_status_lines(stdout_lines)
for parser in self.get_parser_list():
for line in status_lines:
items = re.sub('[\s]+', ' ', line).split()
if items and items[0] == parser:
return True
return False
def topologies_running(self, env):
env.set_params(self.__params)
all_running = True
topologies = metron_service.get_running_topologies()
for parser in self.get_parser_list():
parser_found = False
is_running = False
if parser in topologies:
parser_found = True
is_running = topologies[parser] in ['ACTIVE', 'REBALANCING']
all_running &= parser_found and is_running
return all_running
def __get_status_lines(self, lines):
status_lines = []
do_stat = False
skipped = 0
for line in lines:
if line.startswith("Topology_name"):
do_stat = True
if do_stat and skipped == 2:
status_lines += [line]
elif do_stat:
skipped += 1
return status_lines
def __is_running(self, status):
return status in ['ACTIVE', 'REBALANCING']