blob: 8cb2cd87f0e50a11fd90125e826c2547d6fbff8b [file] [log] [blame]
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
try:
import os
import sys
import re
import logging, time
import subprocess
import threading
import Queue
import signal
import getpass
import socket
from optparse import OptionParser
from gppylib.gplog import setup_hawq_tool_logging, quiet_stdout_logging, enable_verbose_logging
from gppylib.commands.unix import getLocalHostname, getUserName
from gppylib.commands import gp
from gppylib.commands import unix
from hawqpylib.hawqlib import *
except ImportError, e:
sys.exit('ERROR: Cannot import modules. Please check that you '
'have sourced greenplum_path.sh. Detail: ' + str(e))
# Usage text: OptionParser usage string, and printed by create_parser() /
# get_args() when the command line is invalid.
START_HELP = """
magma start|stop|restart <object>
The "objects" are:
  cluster    Start, stop or restart all magma nodes listed in the nodes file.
  node       Start, stop or restart the magma node on the local host.
"""
# Defaults used when magma-site.xml does not set the corresponding key.
DEFAULT_REPLICA = 3       # magma_replica_number   -> magma_ctl --replica
MAGMA_RANGE_NUM = 2       # magma_range_number     -> magma_ctl --range
MAGMA_DIR_CAPACITY = 3    # magma_datadir_capacity -> magma_ctl --datadir_capacity
# Number of availability probes made after starting the cluster.  Each failed
# probe is followed by a one-second sleep, so the default of 600 gives magma
# roughly 10 minutes to become available.
DEFAULT_CHECK_MAGMA_TIME = 600
def create_parser():
    """Parse ``sys.argv`` for this script.

    Recognises the ``--user`` option (default "").  When fewer than two
    positional arguments (command and object) are given, prints START_HELP
    and exits with status 1.

    Returns:
        The ``(options, args)`` pair produced by optparse.
    """
    parser = OptionParser(usage=START_HELP)
    parser.add_option("--user", dest="user", default="",
                      help="Sets MAGMA user")
    options, positional = parser.parse_args()
    if len(positional) >= 2:
        return (options, positional)
    print("Usage:")
    print(START_HELP)
    sys.exit(1)
def get_args():
    """Parse the command line, initialise logging and read magma-site.xml.

    Besides returning values, this defines the module globals ``logger``,
    ``log_filename`` and ``source_hawq_env`` used throughout the script.  It
    exits with status 1 on an unknown command/object pair, when GPHOME is
    unset, or when the effective user is root.

    Returns:
        (opts, magma_dict): the optparse options extended with
        ``magma_command``, ``node_type``, ``GPHOME`` and ``user``, and the
        settings dict loaded from $GPHOME/etc/magma-site.xml.
    """
    global logger, log_filename, source_hawq_env

    opts, positional = create_parser()
    opts.magma_command = positional[0]
    opts.node_type = positional[1]

    known_object = opts.node_type in ['cluster', 'node']
    known_command = opts.magma_command in ['start', 'stop', 'restart']
    if not (known_object and known_command):
        print("Usage:")
        print(START_HELP)
        sys.exit(1)

    logger, log_filename = setup_hawq_tool_logging('magma_%s' % opts.magma_command,
                                                   getLocalHostname(), getUserName())
    for message in ("Prepare to do 'magma %s'" % opts.magma_command,
                    "You can find log in:",
                    log_filename):
        logger.info(message)

    opts.GPHOME = os.getenv('GPHOME')
    if not opts.GPHOME:
        logger.error("Didn't get GPHOME value, exit")
        sys.exit(1)
    logger.info("GPHOME is set to:")
    logger.info(opts.GPHOME)
    # Prefix for every magma_ctl command so the remote shell gets the HAWQ env.
    source_hawq_env = "source %s/greenplum_path.sh" % opts.GPHOME

    # An empty --user means "the user running this script".
    opts.user = opts.user or getpass.getuser()
    if opts.user == "root":
        logger.error("'root' user is not allowed")
        sys.exit(1)
    logger.debug("Current user is '%s'" % opts.user)

    logger.debug("Parsing config file:")
    logger.debug("%s/etc/magma-site.xml" % opts.GPHOME)
    site_config = HawqXMLParser(opts.GPHOME, 'magma-site.xml')
    site_config.get_all_values()
    return opts, site_config.hawq_dict
class MagmaControl:
    """Start, stop or restart magma on the local node or on every listed node.

    Settings are taken from the dict parsed out of $GPHOME/etc/magma-site.xml
    and remote commands are run over ssh as ``opts.user``.  The class relies on
    the module globals ``logger``, ``log_filename`` and ``source_hawq_env``
    defined by get_args().
    """

    def __init__(self, opts, magma_dict):
        """Store the command-line options and validate the magma settings.

        Args:
            opts: options object from get_args() (node_type, magma_command,
                user and GPHOME are read).
            magma_dict: settings read from magma-site.xml.
        """
        self.node_type = opts.node_type
        self.magma_command = opts.magma_command
        self.user = opts.user
        self.GPHOME = opts.GPHOME
        self.magma_dict = magma_dict
        self._get_config()

    def run(self):
        """Perform ``magma_command`` on the whole cluster or the local node.

        A cluster start/restart is followed by check_magma_status().  Exits
        with status 1 when ``node_type`` is neither 'cluster' nor 'node'.
        """
        if self.magma_command == "start":
            if self.node_type == "cluster":
                self._start_magma_on_allnodes()
                self.check_magma_status()
            elif self.node_type == "node":
                self.start_magma(getLocalHostname())
            else:
                sys.exit(1)
        elif self.magma_command == "stop":
            if self.node_type == "cluster":
                self._stop_magma_on_allnodes()
            elif self.node_type == "node":
                self.stop_magma(getLocalHostname())
            else:
                sys.exit(1)
        elif self.magma_command == "restart":
            if self.node_type == "cluster":
                self._stop_magma_on_allnodes()
                self._start_magma_on_allnodes()
                self.check_magma_status()
            elif self.node_type == "node":
                self.stop_magma(getLocalHostname())
                self.start_magma(getLocalHostname())
            else:
                sys.exit(1)
        logger.info("Magma service %s successfully" % self.magma_command)
        return None

    def _nodes_file_path(self):
        """Return $GPHOME/etc/<nodes_file>, the nodes file named in magma-site.xml."""
        return self.GPHOME + "/etc/%s" % self.magma_dict['nodes_file']

    def _optional_setting(self, key, default):
        """Return magma_dict[key] when the key is set, else ``default``."""
        if key in self.magma_dict:
            return self.magma_dict[key]
        return default

    def _get_config(self):
        """Validate required magma-site.xml keys and cache the settings on self.

        Exits the process when a required key is missing or the nodes file
        does not exist.  Optional keys fall back to the module defaults.
        """
        check_items = ('nodes_file', 'node_data_directory', 'node_log_directory',
                       'node_address_port')
        for item in check_items:
            if item in self.magma_dict:
                logger.info("Check: %s is set" % item)
            else:
                sys.exit("Check: %s not configured in magma-site.xml" % item)
        # Read everything through self instead of the module-level ``opts`` /
        # ``magma_dict`` names, which only exist when the script is __main__.
        nodes_file = self._nodes_file_path()
        if not os.path.exists(nodes_file):
            logger.error("Cannot find nodes_file files %s, exit." % nodes_file)
            sys.exit(1)
        self.nodes_host_list = parse_hosts_file(self.GPHOME, self.magma_dict['nodes_file'])
        self.nodes_count = len(self.nodes_host_list)
        self.node_data_directory = self.magma_dict['node_data_directory']
        self.node_log_directory = self.magma_dict['node_log_directory']
        self.node_port = self.magma_dict['node_address_port']
        self.magma_range_number = self._optional_setting('magma_range_number',
                                                         MAGMA_RANGE_NUM)
        self.magma_replica_number = self._optional_setting('magma_replica_number',
                                                           DEFAULT_REPLICA)
        self.magma_datadir_capacity = self._optional_setting('magma_datadir_capacity',
                                                             MAGMA_DIR_CAPACITY)
        self.check_magma_time = DEFAULT_CHECK_MAGMA_TIME

    def first_seg_host(self):
        """Return the first host listed in the nodes file, or "" if none.

        Text after '#' and blank lines are ignored, so a leading comment or
        empty line no longer yields "" (which would make every other node try
        to join ":<port>").
        """
        with open(self._nodes_file_path(), 'r') as f:
            for line in f:
                host = line.split("#", 1)[0].strip()
                if host:
                    return host
        return ""

    def _join_address(self, host, first_host):
        """Return None for ``first_host`` itself, else "first_host:port".

        The first host is started without --join; every other host joins it.
        """
        if host == first_host:
            return None
        return first_host + ":" + self.node_port

    def start_magma(self, host):
        """Start the magma node on ``host`` over ssh; ``host`` is also its listen host."""
        logger.info("Start magma service at %s" % (host))
        join_address = self._join_address(host, self.first_seg_host())
        listen_address = host + ":" + self.node_port
        cmd_str = self._start_magma_cmd(join_address, listen_address,
                                         self.node_data_directory, self.node_log_directory,
                                         self.magma_replica_number, self.magma_range_number,
                                         self.magma_datadir_capacity)
        result = remote_ssh(cmd_str, host, self.user)
        check_return_code(result, logger,
                          "Magma service start failed",
                          "Magma service start succesfully on %s" % (host))
        return None

    def _start_magma_on_allnodes(self):
        """Start magma on every host in the nodes file; exit(1) on any failure."""
        logger.info("start magma service on all nodes")
        # Each remote_ssh worker posts ("done", host, rc) on q; check_progress
        # consumes those to count completions.
        q = Queue.Queue()
        work_list = []
        first_host = self.first_seg_host()
        for host in self.nodes_host_list:
            listen_url = host + ":" + self.node_port
            magma_cmd_str = self._start_magma_cmd(self._join_address(host, first_host),
                                                  listen_url,
                                                  self.node_data_directory,
                                                  self.node_log_directory,
                                                  self.magma_replica_number,
                                                  self.magma_range_number,
                                                  self.magma_datadir_capacity)
            logger.info("Start magma node on host: %s" % host)
            work_list.append({"func": remote_ssh, "args": (magma_cmd_str, host, self.user, q)})
        work_list.append({"func": check_progress,
                          "args": (q, self.nodes_count, 'start', 0, True, "magma nodes")})
        # NOTE(review): presumably HawqCommands runs the work items in
        # parallel threads and sets return_flag != 0 on failure — see hawqlib.
        node_init = HawqCommands(name='Magma', action_name='start', logger=logger)
        node_init.get_function_list(work_list)
        node_init.start()
        logger.debug("Total magma threads return value is : %d" % node_init.return_flag)
        if node_init.return_flag != 0:
            logger.error("Magma service start failed")
            sys.exit(1)
        return None

    def _start_magma_cmd(self, join_address, listen_address, datadirs, log_dir, replica, range_number,
                         datadir_capacity):
        """Build the shell command that starts one magma node via magma_ctl.

        ``--join`` is emitted only when ``join_address`` is not None.  The
        ``>> log_filename`` redirection is part of the command string that is
        sent over ssh, so it is evaluated by the remote shell.
        """
        join_opt = "" if join_address is None else "--join=%s " % join_address
        cmd_str = ("%s; %s/bin/magma_ctl --ctl=start %s--listen=%s "
                   "--datadirs=%s --logdir=%s --replica=%s --range=%s "
                   "--datadir_capacity=%s >> %s 2>&1"
                   % (source_hawq_env, self.GPHOME, join_opt, listen_address,
                      datadirs, log_dir, replica, range_number,
                      datadir_capacity, log_filename))
        logger.info(cmd_str)
        return cmd_str

    def _stop_magma_on_allnodes(self):
        """Stop magma on every host where it is detected as running; exit(1) on failure."""
        logger.info("stop all magma nodes")
        process_running_host, _stopped_host = self._running_magma_nodes_list(self.nodes_host_list)
        magma_cmd_str = self._stop_magma_cmd()
        work_list = []
        q = Queue.Queue()
        for host in process_running_host:
            work_list.append({"func": remote_ssh, "args": (magma_cmd_str, host, self.user, q)})
        work_list.append({"func": check_progress,
                          "args": (q, len(process_running_host), 'stop', 0, True, "magma nodes")})
        node_stop = HawqCommands(name='Magma', action_name='stop', logger=logger)
        node_stop.get_function_list(work_list)
        node_stop.start()
        if node_stop.return_flag != 0:
            logger.error("Magma service stop failed")
            sys.exit(1)
        return None

    def stop_magma(self, host):
        """Stop the magma node on ``host``.

        Returns 0 without doing anything when the node is not running.
        """
        logger.info("Stop magma service on %s" % (host))
        magma_node_host, magma_node_running = check_magma_running(host, self.node_log_directory,
                                                                  self.node_port, self.user, logger)
        if not magma_node_running:
            logger.warn('Magma node on %s is not running, skip' % (host))
            return 0
        cmd_str = self._stop_magma_cmd()
        logger.info("Stop magma service with command: %s" % cmd_str)
        result = remote_ssh(cmd_str, host, self.user)
        check_return_code(result, logger,
                          "Magma service stop failed ",
                          "Magma service stop succesfully on %s" % (host))
        return None

    def _stop_magma_cmd(self):
        """Build the magma_ctl fast-stop command (run remotely via ssh)."""
        cmd_str = "%s; %s/bin/magma_ctl --ctl=stop --stopmode=fast --logdir=%s >> %s 2>&1" % (
            source_hawq_env, self.GPHOME, self.node_log_directory, log_filename)
        logger.info(cmd_str)
        return cmd_str

    def check_magma_status(self):
        """Probe magma until it answers or ``check_magma_time`` attempts are used.

        Probes with ``hawq_magma_tool -c`` against at most the first 10 hosts
        of the nodes list, sleeping one second after each failed attempt.
        Exits with status 1 if no attempt succeeds.
        """
        nodes_url = ",".join(host + ":" + self.node_port
                             for host in list(self.nodes_host_list)[:10])
        attempts_left = self.check_magma_time
        available = False
        while attempts_left > 0:
            attempts_left -= 1
            ret = local_run("hawq_magma_tool -c %s >> %s 2>&1" % (nodes_url, log_filename))
            if ret == 0:
                # Track success explicitly: a success on the final attempt
                # also leaves attempts_left == 0.
                available = True
                break
            sys.stdout.write('.')
            sys.stdout.flush()
            time.sleep(1)
        sys.stdout.write("\n")
        if not available:
            logger.error("Magma service is unavailable after started")
            sys.exit(1)
        return None

    def _running_magma_nodes_list(self, host_list):
        """Split ``host_list`` into (running_hosts, stopped_hosts).

        Runs check_magma_running for every host through threads_with_return;
        each queued result is read as a (host, running) pair.
        """
        work_list = []
        running_host = []
        stopped_host = []
        seg_check_q = Queue.Queue()
        for host in host_list:
            work_list.append({"func": check_magma_running,
                              "args": (host, os.path.expanduser(self.node_log_directory),
                                       self.node_port, self.user, logger)})
        node_checks = threads_with_return(name='MAGMA', action_name='check', logger=logger,
                                          return_values=seg_check_q)
        node_checks.get_function_list(work_list)
        node_checks.start()
        while not seg_check_q.empty():
            item = seg_check_q.get()
            if item[1] == True:
                running_host.append(item[0])
            else:
                stopped_host.append(item[0])
        return running_host, stopped_host
def check_progress(q, total_num, action, skip_num=0, quiet=False, service="segments"):
    """Wait until ``total_num`` workers have reported completion on ``q``.

    Workers such as remote_ssh post ("done", host, returncode) tuples, and a
    zero returncode counts as a success.  The queue is drained once per polling
    round, with a one-second sleep after each round.  Unless ``quiet``, a dot
    is printed every round.  A summary line is logged at the end.

    Note: there is no timeout — this blocks until ``total_num`` "done"
    messages have arrived.

    Returns:
        Always 0.
    """
    remaining = total_num
    succeeded = 0
    sys.stdout.write("\r")
    while remaining > 0:
        while not q.empty():
            message = q.get()
            if message[0] != "done":
                continue
            remaining -= 1
            if message[2] == 0:
                succeeded += 1
        if not quiet:
            sys.stdout.write(".")
            sys.stdout.flush()
        time.sleep(1)
    if not quiet:
        sys.stdout.write("\n")
    if skip_num == 0:
        logger.info("%s %d of %d %s" % (action, succeeded, total_num, service))
    else:
        logger.info(
            "%s %d of %d %s, %d %s %s skipped" % (action, succeeded, total_num, service, skip_num, service, action))
    return 0
def remote_ssh(cmd_str, host, user, q=None):
    """Run ``cmd_str`` on ``host`` over ssh and return the exit status.

    Args:
        cmd_str: command for the remote shell.  It is wrapped in double quotes
            on the local command line, so it must not contain unescaped
            double quotes.
        host: target host name.
        user: remote user; when "" ssh picks its default user.
        q: optional queue; when given, ("done", host, returncode) is always
            put on it so check_progress can count this worker as finished.

    Returns:
        The ssh exit status, or 1 if the ssh process could not be started.
    """
    if user == "":
        remote_cmd_str = "ssh -o 'StrictHostKeyChecking no' %s \"%s\"" % (host, cmd_str)
    else:
        remote_cmd_str = "ssh -o 'StrictHostKeyChecking no' %s@%s \"%s\"" % (user, host, cmd_str)
    try:
        proc = subprocess.Popen(remote_cmd_str, shell=True,
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = proc.communicate()
        returncode = proc.returncode
    except OSError as e:
        # Popen signals launch failures with OSError (it never raises
        # CalledProcessError).  Report failure instead of crashing on unbound
        # locals, and still notify q so check_progress does not wait forever.
        logger.error("Execute shell command on %s failed: %s" % (host, e))
        stdout, stderr, returncode = None, None, 1
    if stdout:
        logger.info(stdout.strip())
    if stderr:
        logger.info(stderr.strip())
    if q is not None:
        q.put(("done", host, returncode))
    return returncode
def local_run(cmd):
    '''Run ``cmd`` through the local shell and return its exit status.

    Output is not captured; it goes wherever the command's own redirections
    (or the inherited stdout/stderr) send it.
    '''
    return subprocess.call(cmd, shell=True)
if __name__ == '__main__':
    # Entry point: get_args() parses the CLI, sets up logging and loads
    # magma-site.xml; the requested action is then carried out.
    # ``opts`` and ``magma_dict`` stay bound as module-level names.
    opts, magma_dict = get_args()
    MagmaControl(opts, magma_dict).run()