| #!/usr/bin/env python |
| |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| |
| try: |
| import os |
| import sys |
| import re |
| import logging, time |
| import subprocess |
| import threading |
| import Queue |
| import signal |
| import getpass |
| import socket |
| from optparse import OptionParser |
| from gppylib.gplog import setup_hawq_tool_logging, quiet_stdout_logging, enable_verbose_logging |
| from gppylib.commands.unix import getLocalHostname, getUserName |
| from gppylib.commands import gp |
| from gppylib.commands import unix |
| from hawqpylib.hawqlib import * |
| except ImportError, e: |
| sys.exit('ERROR: Cannot import modules. Please check that you ' |
| 'have sourced greenplum_path.sh. Detail: ' + str(e)) |
| |
| START_HELP = """ |
| magma start|stop|restart <object> |
| |
| The "objects" are: |
| cluster Start all magma nodes. |
| node Start local magma node |
| """ |
| |
| DEFAULT_REPLICA = 3 |
| MAGMA_RANGE_NUM = 2 |
| MAGMA_DIR_CAPACITY = 3 |
| |
# Maximum number of availability probes after a cluster start; probes are about one second apart, so the default of 600 is roughly 10 minutes
| DEFAULT_CHECK_MAGMA_TIME = 600 |
| |
def create_parser():
    """Parse the command line and return ``(options, args)``.

    Only one option is defined, ``--user`` (stored as ``options.user``,
    default ``""``).  At least two positional arguments are required; when
    fewer are given the usage text is printed and the process exits with
    status 1.
    """
    option_parser = OptionParser(usage=START_HELP)
    option_parser.add_option("--user", dest="user", default="",
                             help="Sets MAGMA user")
    options, positional = option_parser.parse_args()

    if len(positional) >= 2:
        return (options, positional)

    # Not enough positional arguments: show usage and bail out.
    print("Usage:")
    print(START_HELP)
    sys.exit(1)
| |
| |
def get_args():
    """Validate the command line, set up logging and load magma-site.xml.

    Side effects: binds the module-level globals ``logger``,
    ``log_filename`` and ``source_hawq_env``.

    Returns:
        ``(opts, magma_dict)`` where ``opts`` carries ``magma_command``,
        ``node_type``, ``GPHOME`` and ``user``, and ``magma_dict`` is the
        ``hawq_dict`` of the parsed magma-site.xml.

    Exits with status 1 on an unknown command/object, when ``GPHOME`` is not
    set in the environment, or when the effective user is ``root``.
    """
    global logger, log_filename, source_hawq_env

    opts, positional = create_parser()
    opts.magma_command = positional[0]
    opts.node_type = positional[1]

    known_object = opts.node_type in ['cluster', 'node']
    known_command = opts.magma_command in ['start', 'stop', 'restart']
    if not (known_object and known_command):
        print("Usage:")
        print(START_HELP)
        sys.exit(1)

    # Logging is only initialised once the request is known to be valid.
    logger, log_filename = setup_hawq_tool_logging(
        'magma_%s' % opts.magma_command, getLocalHostname(), getUserName())
    logger.info("Prepare to do 'magma %s'" % opts.magma_command)
    logger.info("You can find log in:")
    logger.info(log_filename)

    opts.GPHOME = os.getenv('GPHOME')
    if not opts.GPHOME:
        logger.error("Didn't get GPHOME value, exit")
        sys.exit(1)
    logger.info("GPHOME is set to:")
    logger.info(opts.GPHOME)
    source_hawq_env = "source %s/greenplum_path.sh" % opts.GPHOME

    # Fall back to the invoking user when --user was not supplied.
    if opts.user == "":
        opts.user = getpass.getuser()
    if opts.user == "root":
        logger.error("'root' user is not allowed")
        sys.exit(1)
    logger.debug("Current user is '%s'" % opts.user)
    logger.debug("Parsing config file:")
    logger.debug("%s/etc/magma-site.xml" % opts.GPHOME)

    site_config = HawqXMLParser(opts.GPHOME, 'magma-site.xml')
    site_config.get_all_values()
    return opts, site_config.hawq_dict
| |
| |
class MagmaControl(object):
    """Drive ``magma_ctl`` to start, stop or restart magma nodes.

    An instance is configured from the parsed command line (``opts``) and
    the key/value pairs read from magma-site.xml (``magma_dict``).  Methods
    use the module-level ``logger``, ``log_filename`` and ``source_hawq_env``
    globals that ``get_args()`` binds, so ``get_args()`` must have run before
    an instance is created.
    """

    # check_magma_status() probes at most this many node addresses.
    MAX_STATUS_CHECK_NODES = 10

    def __init__(self, opts, magma_dict):
        """Store the request and load the node configuration.

        Args:
            opts: object with ``node_type``, ``magma_command``, ``user`` and
                ``GPHOME`` attributes.
            magma_dict: mapping of magma-site.xml property names to values.
        """
        self.node_type = opts.node_type
        self.magma_command = opts.magma_command
        self.user = opts.user
        self.GPHOME = opts.GPHOME
        self.magma_dict = magma_dict
        self._get_config()

    def run(self):
        """Execute the requested command on the requested object.

        ``restart`` is a stop followed by a start.  After a cluster-wide
        start the service availability is verified with
        ``check_magma_status()``.  Exits with status 1 when the command or
        the object is not recognised.
        """
        if self.magma_command not in ("start", "stop", "restart") or \
                self.node_type not in ("cluster", "node"):
            # Never report success for a request that did nothing.
            logger.error("Unsupported request: magma %s %s"
                         % (self.magma_command, self.node_type))
            sys.exit(1)

        on_cluster = self.node_type == "cluster"

        if self.magma_command in ("stop", "restart"):
            if on_cluster:
                self._stop_magma_on_allnodes()
            else:
                self.stop_magma(getLocalHostname())

        if self.magma_command in ("start", "restart"):
            if on_cluster:
                self._start_magma_on_allnodes()
                self.check_magma_status()
            else:
                self.start_magma(getLocalHostname())

        logger.info("Magma service %s successfully" % self.magma_command)
        return None

    def _nodes_file_path(self):
        """Return the path of the nodes file below ``$GPHOME/etc``."""
        return self.GPHOME + "/etc/%s" % self.magma_dict['nodes_file']

    def _optional_setting(self, key, default):
        """Return ``magma_dict[key]`` when configured, else ``default``."""
        if key in self.magma_dict:
            return self.magma_dict[key]
        return default

    def _get_config(self):
        """Validate magma-site.xml values and copy them onto the instance.

        Exits when a mandatory property is missing or the nodes file does
        not exist.
        """
        required_items = ('nodes_file', 'node_data_directory', 'node_log_directory',
                          'node_address_port')
        for item in required_items:
            if item not in self.magma_dict:
                sys.exit("Check: %s not configured in magma-site.xml" % item)
            logger.info("Check: %s is set" % item)

        # Use the instance's own configuration rather than module globals so
        # the class works no matter how the caller named its variables.
        nodes_file = self._nodes_file_path()
        if not os.path.exists(nodes_file):
            logger.error("Cannot find nodes_file files %s, exit." % nodes_file)
            sys.exit(1)

        self.nodes_host_list = parse_hosts_file(self.GPHOME, self.magma_dict['nodes_file'])
        self.nodes_count = len(self.nodes_host_list)
        self.node_data_directory = self.magma_dict['node_data_directory']
        self.node_log_directory = self.magma_dict['node_log_directory']
        self.node_port = self.magma_dict['node_address_port']

        self.magma_range_number = self._optional_setting('magma_range_number', MAGMA_RANGE_NUM)
        self.magma_replica_number = self._optional_setting('magma_replica_number', DEFAULT_REPLICA)
        self.magma_datadir_capacity = self._optional_setting('magma_datadir_capacity',
                                                             MAGMA_DIR_CAPACITY)

        self.check_magma_time = DEFAULT_CHECK_MAGMA_TIME

    def first_seg_host(self):
        """Return the first host named in the nodes file.

        Text after ``#`` on a line is ignored, and lines that are empty once
        the comment is removed are skipped.  Returns ``""`` when the file
        names no host.
        """
        with open(self._nodes_file_path(), 'r') as f:
            for line in f:
                host = line.split("#", 1)[0].strip()
                if host:
                    return host
        return ""

    def _join_address(self, host, first_seg_host):
        """Return the ``--join`` address for ``host``.

        The first host of the nodes file starts without ``--join`` (``None``);
        every other host joins the first host.
        """
        if host == first_seg_host:
            return None
        return first_seg_host + ":" + self.node_port

    def start_magma(self, host):
        """Start the magma node on ``host`` over ssh and check the result."""
        logger.info("Start magma service at %s" % (host))
        join_address = self._join_address(host, self.first_seg_host())
        listen_address = host + ":" + self.node_port
        cmd_str = self._start_magma_cmd(join_address, listen_address,
                                        self.node_data_directory,
                                        self.node_log_directory,
                                        self.magma_replica_number,
                                        self.magma_range_number,
                                        self.magma_datadir_capacity)
        result = remote_ssh(cmd_str, host, self.user)

        check_return_code(result, logger,
                          "Magma service start failed",
                          "Magma service start succesfully on %s" % (host))
        return None

    def _start_magma_on_allnodes(self):
        """Start magma on every host of the nodes file in parallel.

        Exits with status 1 when the worker threads report a failure.
        """
        logger.info("start magma service on all nodes")
        q = Queue.Queue()
        work_list = []
        first_seg_host = self.first_seg_host()
        for host in self.nodes_host_list:
            join_address = self._join_address(host, first_seg_host)
            listen_url = host + ":" + self.node_port
            magma_cmd_str = self._start_magma_cmd(join_address, listen_url,
                                                  self.node_data_directory,
                                                  self.node_log_directory,
                                                  self.magma_replica_number,
                                                  self.magma_range_number,
                                                  self.magma_datadir_capacity)
            logger.info("Start magma node on host: %s" % host)
            work_list.append({"func": remote_ssh, "args": (magma_cmd_str, host, self.user, q)})

        # One extra worker reports progress from the shared result queue.
        work_list.append({"func": check_progress,
                          "args": (q, self.nodes_count, 'start', 0, True, "magma nodes")})
        node_init = HawqCommands(name='Magma', action_name='start', logger=logger)
        node_init.get_function_list(work_list)
        node_init.start()
        logger.debug("Total magma threads return value is : %d" % node_init.return_flag)
        if node_init.return_flag != 0:
            logger.error("Magma service start failed")
            sys.exit(1)
        return None

    def _start_magma_cmd(self, join_address, listen_address, datadirs, log_dir, replica, range_number,
                         datadir_capacity):
        """Build the shell command that starts one magma node.

        ``--join`` is only emitted when ``join_address`` is not ``None``.
        The command's output is appended to ``log_filename``.
        """
        join_option = "" if join_address is None else "--join=%s " % join_address
        cmd_str = "%s; %s/bin/magma_ctl --ctl=start %s--listen=%s " \
                  "--datadirs=%s --logdir=%s --replica=%s --range=%s --datadir_capacity=%s >> %s 2>&1" % \
                  (source_hawq_env, self.GPHOME, join_option,
                   listen_address, datadirs,
                   log_dir, replica,
                   range_number, datadir_capacity, log_filename)
        logger.info(cmd_str)
        return cmd_str

    def _stop_magma_on_allnodes(self):
        """Stop magma on every host where it is currently running.

        Exits with status 1 when the worker threads report a failure.
        """
        logger.info("stop all magma nodes")
        process_running_host, _stopped_host = self._running_magma_nodes_list(self.nodes_host_list)
        magma_cmd_str = self._stop_magma_cmd()
        work_list = []
        q = Queue.Queue()
        for host in process_running_host:
            work_list.append({"func": remote_ssh, "args": (magma_cmd_str, host, self.user, q)})
        work_list.append({"func": check_progress,
                          "args": (q, len(process_running_host), 'stop', 0, True, "magma nodes")})
        node_stop = HawqCommands(name='Magma', action_name='stop', logger=logger)
        node_stop.get_function_list(work_list)
        node_stop.start()
        if node_stop.return_flag != 0:
            logger.error("Magma service stop failed")
            sys.exit(1)
        return None

    def stop_magma(self, host):
        """Stop the magma node on ``host`` over ssh.

        Returns 0 without doing anything when the node is not running,
        otherwise ``None`` after the return code has been checked.
        """
        logger.info("Stop magma service on %s" % (host))
        # NOTE(review): _running_magma_nodes_list() passes the log directory
        # through os.path.expanduser() while this call does not - confirm
        # which form check_magma_running() expects.
        _node_host, magma_node_running = check_magma_running(host, self.node_log_directory,
                                                             self.node_port, self.user, logger)
        if not magma_node_running:
            logger.warn('Magma node on %s is not running, skip' % (host))
            return 0

        cmd_str = self._stop_magma_cmd()
        logger.info("Stop magma service with command: %s" % cmd_str)
        result = remote_ssh(cmd_str, host, self.user)

        check_return_code(result, logger,
                          "Magma service stop failed ",
                          "Magma service stop succesfully on %s" % (host))
        return None

    def _stop_magma_cmd(self):
        """Build the shell command that stops one magma node (fast mode)."""
        cmd_str = "%s; %s/bin/magma_ctl --ctl=stop --stopmode=fast --logdir=%s >> %s 2>&1" % (
            source_hawq_env, self.GPHOME, self.node_log_directory, log_filename)
        logger.info(cmd_str)
        return cmd_str

    def check_magma_status(self):
        """Poll ``hawq_magma_tool -c`` until the service answers.

        At most ``check_magma_time`` probes are made, with a one second
        pause after each failed probe; a dot is printed per failed probe.
        Only the first ``MAX_STATUS_CHECK_NODES`` hosts are passed to the
        tool.  Exits with status 1 when no probe succeeds.
        """
        probe_hosts = list(self.nodes_host_list)[:self.MAX_STATUS_CHECK_NODES]
        nodes_url = ",".join([host + ":" + self.node_port for host in probe_hosts])
        check_cmd = "hawq_magma_tool -c %s >> %s 2>&1" % (nodes_url, log_filename)

        # Track success explicitly: a probe that succeeds on the very last
        # attempt must not be mistaken for an exhausted retry budget.
        available = False
        for _attempt in range(self.check_magma_time):
            if local_run(check_cmd) == 0:
                available = True
                break
            sys.stdout.write('.')
            sys.stdout.flush()
            time.sleep(1)
        sys.stdout.write("\n")

        if not available:
            logger.error("Magma service is unavailable after started")
            sys.exit(1)
        return None

    def _running_magma_nodes_list(self, host_list):
        """Split ``host_list`` into hosts with and without a running node.

        Each host is probed with ``check_magma_running`` in its own worker.

        Returns:
            ``(running_host, stopped_host)`` - two lists of host names.
        """
        work_list = []
        running_host = []
        stopped_host = []
        seg_check_q = Queue.Queue()

        for host in host_list:
            work_list.append({"func": check_magma_running,
                              "args": (host, os.path.expanduser(self.node_log_directory),
                                       self.node_port, self.user, logger)})

        node_checks = threads_with_return(name='MAGMA', action_name='check', logger=logger,
                                          return_values=seg_check_q)
        node_checks.get_function_list(work_list)
        node_checks.start()
        while not seg_check_q.empty():
            item = seg_check_q.get()
            # item is (host, running); truthiness matches how stop_magma()
            # interprets the same flag.
            if item[1]:
                running_host.append(item[0])
            else:
                stopped_host.append(item[0])

        return running_host, stopped_host
| |
| |
def check_progress(q, total_num, action, skip_num=0, quiet=False, service="segments"):
    """Wait until ``total_num`` workers have reported on ``q`` and log a summary.

    Messages are indexable; a message whose first element is ``"done"``
    marks one finished worker and its third element is that worker's return
    code (0 counts as success).  The queue is drained once per second, and a
    dot is printed per round unless ``quiet`` is set.

    Args:
        q: queue the workers put their messages on.
        total_num: number of ``"done"`` messages to wait for.
        action: verb used in the summary line (e.g. ``'start'``).
        skip_num: number of skipped items to mention in the summary.
        quiet: suppress the progress dots and the trailing newline.
        service: noun used in the summary line.

    Returns:
        Always 0.
    """
    remaining = total_num
    succeeded = 0
    sys.stdout.write("\r")

    while remaining > 0:
        while not q.empty():
            report = q.get()
            if report[0] != "done":
                continue
            remaining -= 1
            if report[2] == 0:
                succeeded += 1
        if not quiet:
            sys.stdout.write(".")
            sys.stdout.flush()
        time.sleep(1)

    if not quiet:
        sys.stdout.write("\n")

    if skip_num == 0:
        summary = "%s %d of %d %s" % (action, succeeded, total_num, service)
    else:
        summary = "%s %d of %d %s, %d %s %s skipped" % (
            action, succeeded, total_num, service, skip_num, service, action)
    logger.info(summary)
    return 0
| |
| |
def remote_ssh(cmd_str, host, user, q=None):
    """Run ``cmd_str`` on ``host`` through ssh and return its exit status.

    Args:
        cmd_str: shell command line to execute on the remote host.
        host: remote host name.
        user: remote login name; when ``""`` ssh picks the login itself.
        q: optional queue; when given, ``("done", host, returncode)`` is put
            on it once the command has finished (or failed to launch).

    Returns:
        The exit status of the ssh process, or 1 when ssh could not be
        launched at all.

    Captured stdout and stderr are both written to the log at info level.
    """
    target = host if user == "" else "%s@%s" % (user, host)
    # Pass an argument vector instead of a shell string: the remote command
    # reaches ssh verbatim, so quotes or '$' inside cmd_str are not
    # reinterpreted by a local shell first.
    ssh_argv = ["ssh", "-o", "StrictHostKeyChecking no", target, cmd_str]

    try:
        proc = subprocess.Popen(ssh_argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = proc.communicate()
        returncode = proc.returncode
    except OSError as e:
        # Popen signals launch failures with OSError; report the failure to
        # the caller and the queue rather than dying on unbound names.
        logger.error("Execute shell command on %s failed: %s" % (host, e))
        stdout, stderr = '', ''
        returncode = 1

    if stdout:
        logger.info(stdout.strip())
    if stderr:
        logger.info(stderr.strip())
    if q is not None:
        q.put(("done", host, returncode))
    return returncode
| |
def local_run(cmd):
    """Run ``cmd`` through the local shell and return its exit status."""
    return subprocess.call(cmd, shell=True)
| |
if __name__ == '__main__':
    # Script entry point: parse the request, then execute it.
    # `opts` and `magma_dict` are deliberately bound at module level; other
    # code in this file may read them as globals.
    opts, magma_dict = get_args()
    MagmaControl(opts, magma_dict).run()