| #!/usr/bin/env python |
| """ |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| |
| """ |
| |
| import os |
| import re |
| import subprocess |
| import time |
| |
| from resource_management.core.logger import Logger |
| from resource_management.core.resources.system import Execute, File |
| |
| import metron_service |
| |
| |
| # Wrap major operations and functionality in this class |
| class ParserCommands: |
| __params = None |
| __parser_list = None |
| __configured = False |
| |
| def __init__(self, params): |
| if params is None: |
| raise ValueError("params argument is required for initialization") |
| self.__params = params |
| self.__parser_list = self.__get_parsers(params) |
| self.__configured = os.path.isfile(self.__params.parsers_configured_flag_file) |
| |
| # get list of parsers |
| def __get_parsers(self, params): |
| return params.parsers.replace(' ', '').split(',') |
| |
| def is_configured(self): |
| return self.__configured |
| |
| def set_configured(self): |
| File(self.__params.parsers_configured_flag_file, |
| content="", |
| owner=self.__params.metron_user, |
| mode=0775) |
| |
| def init_parsers(self): |
| Logger.info( |
| "Copying grok patterns from local directory '{0}' to HDFS '{1}'".format(self.__params.local_grok_patterns_dir, |
| self.__params.metron_apps_dir)) |
| self.__params.HdfsResource(self.__params.metron_apps_dir, |
| type="directory", |
| action="create_on_execute", |
| owner=self.__params.metron_user, |
| mode=0775, |
| source=self.__params.local_grok_patterns_dir) |
| |
| Logger.info("Done initializing parser configuration") |
| |
| def get_parser_list(self): |
| return self.__parser_list |
| |
| def setup_repo(self): |
| def local_repo(): |
| Logger.info("Setting up local repo") |
| Execute("yum -y install createrepo") |
| Execute("createrepo /localrepo") |
| Execute("chmod -R o-w+r /localrepo") |
| Execute("echo \"[METRON-0.2.1BETA]\n" |
| "name=Metron 0.2.1BETA packages\n" |
| "baseurl=file:///localrepo\n" |
| "gpgcheck=0\n" |
| "enabled=1\" > /etc/yum.repos.d/local.repo") |
| |
| def remote_repo(): |
| print('Using remote repo') |
| |
| yum_repo_types = { |
| 'local': local_repo, |
| 'remote': remote_repo |
| } |
| repo_type = self.__params.yum_repo_type |
| if repo_type in yum_repo_types: |
| yum_repo_types[repo_type]() |
| else: |
| raise ValueError("Unsupported repo type '{0}'".format(repo_type)) |
| |
| def init_kafka_topics(self): |
| Logger.info('Creating Kafka topics') |
| command_template = """{0}/kafka-topics.sh \ |
| --zookeeper {1} \ |
| --create \ |
| --topic {2} \ |
| --partitions {3} \ |
| --replication-factor {4} \ |
| --config retention.bytes={5}""" |
| num_partitions = 1 |
| replication_factor = 1 |
| retention_gigabytes = int(self.__params.metron_topic_retention) |
| retention_bytes = retention_gigabytes * 1024 * 1024 * 1024 |
| Logger.info("Creating main topics for parsers") |
| for parser_name in self.get_parser_list(): |
| Logger.info("Creating topic'{0}'".format(parser_name)) |
| Execute(command_template.format(self.__params.kafka_bin_dir, |
| self.__params.zookeeper_quorum, |
| parser_name, |
| num_partitions, |
| replication_factor, |
| retention_bytes)) |
| Logger.info("Creating topics for error handling") |
| Execute(command_template.format(self.__params.kafka_bin_dir, |
| self.__params.zookeeper_quorum, |
| "parser_invalid", |
| num_partitions, |
| replication_factor, |
| retention_bytes)) |
| Execute(command_template.format(self.__params.kafka_bin_dir, |
| self.__params.zookeeper_quorum, |
| "parser_error", |
| num_partitions, replication_factor, |
| retention_bytes)) |
| Logger.info("Done creating Kafka topics") |
| |
| def start_parser_topologies(self): |
| Logger.info("Starting Metron parser topologies: {0}".format(self.get_parser_list())) |
| start_cmd_template = """{0}/bin/start_parser_topology.sh \ |
| -k {1} \ |
| -z {2} \ |
| -s {3}""" |
| for parser in self.get_parser_list(): |
| Logger.info('Starting ' + parser) |
| Execute(start_cmd_template.format(self.__params.metron_home, self.__params.kafka_brokers, |
| self.__params.zookeeper_quorum, parser)) |
| |
| Logger.info('Finished starting parser topologies') |
| |
| def stop_parser_topologies(self): |
| Logger.info('Stopping parsers') |
| for parser in self.get_parser_list(): |
| Logger.info('Stopping ' + parser) |
| stop_cmd = 'storm kill ' + parser |
| Execute(stop_cmd) |
| Logger.info('Done stopping parser topologies') |
| |
| def restart_parser_topologies(self, env): |
| Logger.info('Restarting the parser topologies') |
| self.stop_parser_topologies() |
| attempt_count = 0 |
| while self.topologies_running(env): |
| if attempt_count > 2: |
| raise Exception("Unable to kill topologies") |
| attempt_count += 1 |
| time.sleep(10) |
| self.start_parser_topologies() |
| Logger.info('Done restarting the parser topologies') |
| |
| def topologies_exist(self): |
| cmd_open = subprocess.Popen(["storm", "list"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| (stdout, stderr) = cmd_open.communicate() |
| stdout_lines = stdout.splitlines() |
| if stdout_lines: |
| status_lines = self.__get_status_lines(stdout_lines) |
| for parser in self.get_parser_list(): |
| for line in status_lines: |
| items = re.sub('[\s]+', ' ', line).split() |
| if items and items[0] == parser: |
| return True |
| return False |
| |
| def topologies_running(self, env): |
| env.set_params(self.__params) |
| all_running = True |
| topologies = metron_service.get_running_topologies() |
| for parser in self.get_parser_list(): |
| parser_found = False |
| is_running = False |
| if parser in topologies: |
| parser_found = True |
| is_running = topologies[parser] in ['ACTIVE', 'REBALANCING'] |
| all_running &= parser_found and is_running |
| return all_running |
| |
| def __get_status_lines(self, lines): |
| status_lines = [] |
| do_stat = False |
| skipped = 0 |
| for line in lines: |
| if line.startswith("Topology_name"): |
| do_stat = True |
| if do_stat and skipped == 2: |
| status_lines += [line] |
| elif do_stat: |
| skipped += 1 |
| return status_lines |
| |
| def __is_running(self, status): |
| return status in ['ACTIVE', 'REBALANCING'] |