#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
try:
import os
import sys
import re
import math
import logging, time
import subprocess
import threading
import Queue
import signal
import getpass
import socket
from optparse import OptionParser
from gppylib.gplog import setup_hawq_tool_logging, quiet_stdout_logging, enable_verbose_logging
from gppylib.commands.unix import getLocalHostname, getUserName
from gppylib.commands import gp
from gppylib.commands.gp import SEGMENT_TIMEOUT_DEFAULT
from gppylib import userinput
from gppylib.db import catalog
from gppylib.commands import unix
from hawqpylib.hawqlib import *
from hawqpylib.HAWQ_HELP import *
from gppylib.db import dbconn
from pygresql.pg import DatabaseError
except ImportError, e:
sys.exit('ERROR: Cannot import modules. Please check that you '
'have sourced greenplum_path.sh. Detail: ' + str(e))
class HawqInit:
def __init__(self, opts, hawq_dict):
self.node_type = opts.node_type
self.hawq_command = opts.hawq_command
self.user = opts.user
self.GPHOME = opts.GPHOME
self.stop_mode = opts.stop_mode
self.log_dir = opts.log_dir
self.timeout = opts.timeout_seconds
self.no_update = opts.no_update
self.remove_standby = opts.remove_standby
self.new_standby_hostname = opts.new_standby_hostname
self.hawq_dict = hawq_dict
self.locale = opts.hawq_locale
self.quiet = opts.quiet_run
self.hawq_lc_collate = opts.hawq_lc_collate
self.hawq_lc_ctype = opts.hawq_lc_ctype
self.hawq_lc_messages = opts.hawq_lc_messages
self.hawq_lc_monetary = opts.hawq_lc_monetary
self.hawq_lc_numeric = opts.hawq_lc_numeric
self.hawq_lc_time = opts.hawq_lc_time
self.max_connections = opts.max_connections
self.shared_buffers = opts.shared_buffers
self.default_hash_table_bucket_number = opts.default_hash_table_bucket_number
self.tde_keyname = opts.tde_keyname
self.lock = threading.Lock()
self.ignore_bad_hosts = opts.ignore_bad_hosts
self.with_magma = opts.with_magma
self._get_config()
self._write_config()
self._get_ips()
def _get_config(self):
check_items = ('hawq_master_address_host', 'hawq_master_address_port',
'hawq_master_directory', 'hawq_segment_directory',
'hawq_segment_address_port',
'hawq_master_temp_directory', 'hawq_segment_temp_directory')
if self.node_type in ['master', 'segment', 'standby']:
for item in check_items:
if item in self.hawq_dict:
logger.info("Check: %s is set" % item)
else:
sys.exit("Check: %s not configured in hawq-site.xml" % item)
self.master_host_name = self.hawq_dict['hawq_master_address_host']
self.master_port = self.hawq_dict['hawq_master_address_port']
self.master_data_directory = self.hawq_dict['hawq_master_directory']
self.master_address = self.master_host_name + ":" + self.master_port
self.segment_data_directory = self.hawq_dict['hawq_segment_directory']
self.segment_port = self.hawq_dict['hawq_segment_address_port']
self.hawq_master_temp_directory = self.hawq_dict['hawq_master_temp_directory']
self.hawq_segment_temp_directory = self.hawq_dict['hawq_segment_temp_directory']
self.host_list = parse_hosts_file(self.GPHOME)
self.hosts_count_number = len(self.host_list)
if 'hawq_standby_address_host' in self.hawq_dict:
self.standby_host_name = self.hawq_dict['hawq_standby_address_host']
self.standby_port = self.master_port
self.standby_address = self.standby_host_name + ":" + self.standby_port
if self.standby_host_name in (self.master_host_name, 'localhost', '127.0.0.1'):
logger.error("Standby host should not be the same as master host")
sys.exit(1)
else:
self.standby_host_name = ''
if not self.default_hash_table_bucket_number and 'default_hash_table_bucket_number' in self.hawq_dict:
self.default_hash_table_bucket_number = self.hawq_dict['default_hash_table_bucket_number']
if self.new_standby_hostname != 'none':
self.standby_host_name = self.new_standby_hostname
if self.standby_host_name.lower() in ('', 'none'):
logger.info("No standby host configured, skip it")
if 'enable_secure_filesystem' in self.hawq_dict:
self.enable_secure_filesystem = self.hawq_dict['enable_secure_filesystem']
else:
self.enable_secure_filesystem = 'off'
if 'hawq_init_with_hdfs' in self.hawq_dict:
self.hawq_init_with_hdfs = self.hawq_dict['hawq_init_with_hdfs']
else:
self.hawq_init_with_hdfs = 'on'
if self.hawq_init_with_hdfs == 'on' or self.hawq_init_with_hdfs == 'true':
if 'hawq_dfs_url' in self.hawq_dict:
self.dfs_url = self.hawq_dict['hawq_dfs_url']
logger.info("Check: hawq_dfs_url is set")
else:
sys.exit("Check: hawq_dfs_url not configured in hawq-site.xml")
if 'krb_server_keyfile' in self.hawq_dict:
self.krb_server_keyfile = self.hawq_dict['krb_server_keyfile']
else:
self.krb_server_keyfile = ''
if 'krb_srvname' in self.hawq_dict:
self.krb_srvname = self.hawq_dict['krb_srvname']
else:
self.krb_srvname = 'postgres'
if 'enable_master_auto_ha' in self.hawq_dict:
self.enable_master_auto_ha = self.hawq_dict['enable_master_auto_ha']
else:
self.enable_master_auto_ha = 'off'
if 'upgrade_mode' in self.hawq_dict:
self.upgrade_mode = self.hawq_dict['upgrade_mode']
def _write_config(self):
configFile = "%s/etc/_mgmt_config" % self.GPHOME
# Truncate configFile on the first write; later writes append.
with open(configFile, 'w') as f:
f.write("GPHOME=%s\n" % self.GPHOME)
with open(configFile, 'a') as f:
f.write("hawqUser=%s\n" % self.user)
f.write("master_host_name=%s\n" % self.master_host_name)
f.write("master_port=%s\n" % self.master_port)
f.write("master_data_directory=%s\n" % self.master_data_directory)
f.write("segment_data_directory=%s\n" % self.segment_data_directory)
f.write("segment_port=%s\n" % self.segment_port)
f.write("hawq_master_temp_directory=%s\n" % self.hawq_master_temp_directory)
f.write("hawq_segment_temp_directory=%s\n" % self.hawq_segment_temp_directory)
f.write("locale=%s\n" % self.locale)
f.write("hawq_lc_collate=%s\n" % self.hawq_lc_collate)
f.write("hawq_lc_ctype=%s\n" % self.hawq_lc_ctype)
f.write("hawq_lc_messages=%s\n" % self.hawq_lc_messages)
f.write("hawq_lc_monetary=%s\n" % self.hawq_lc_monetary)
f.write("hawq_lc_numeric=%s\n" % self.hawq_lc_numeric)
f.write("hawq_lc_time=%s\n" % self.hawq_lc_time)
f.write("max_connections=%s\n" % self.max_connections)
f.write("shared_buffers=%s\n" % self.shared_buffers)
if self.hawq_init_with_hdfs == 'on' or self.hawq_init_with_hdfs == 'true':
f.write("dfs_url=%s\n" % self.dfs_url)
f.write("log_filename=%s\n" % log_filename)
f.write("log_dir=%s\n" % self.log_dir)
if self.standby_host_name.lower() not in ('', 'none'):
f.write("standby_host_name=%s\n" % self.standby_host_name)
def _get_ips(self):
cmd = "%s/bin/lib/get_ip_addresses_of_host.sh %s master_ip_address_all" % (self.GPHOME, self.master_host_name)
local_ssh(cmd, logger)
if self.standby_host_name.lower() not in ('', 'none') and not self.remove_standby:
cmd = "%s/bin/lib/get_ip_addresses_of_host.sh %s standby_ip_address_all" % (self.GPHOME, self.standby_host_name)
local_ssh(cmd, logger)
def check_hdfs_path(self):
if self.tde_keyname:
cmd = "%s/bin/gpcheckhdfs hdfs %s %s %s %s --with-tde %s" % \
(self.GPHOME, self.dfs_url, self.enable_secure_filesystem, \
self.krb_srvname, self.krb_server_keyfile, self.tde_keyname)
else:
cmd = "%s/bin/gpcheckhdfs hdfs %s %s %s %s" % \
(self.GPHOME, self.dfs_url, self.enable_secure_filesystem, \
self.krb_srvname, self.krb_server_keyfile)
logger.info("Check if hdfs path is available")
logger.debug("Check hdfs: %s" % cmd)
check_return_code(local_ssh(cmd, logger, warning = True), logger, "Check hdfs failed, please verify your hdfs settings")
if self.tde_keyname:
logger.info("Create encryption zone successfully, key_name:%s" % (self.tde_keyname))
def set_init_with_magma(self):
logger.info("Set hawq_init_with_magma as: true")
ignore_bad_hosts = '--ignore-bad-hosts' if self.ignore_bad_hosts else ''
cmd = "hawq config -c hawq_init_with_magma -v true --skipvalidation -q %s > /dev/null" % ignore_bad_hosts
result = local_ssh(cmd, logger)
if result != 0:
logger.error("Set hawq_init_with_magma failed")
return result
def set_new_standby_host(self):
if self.new_standby_hostname == self.master_host_name:
logger.error("Standby host name can't be the same as master host name")
sys.exit(1)
cmd = "%s; hawq config -c hawq_standby_address_host -v %s --skipvalidation -q > /dev/null" % \
(source_hawq_env, self.new_standby_hostname)
result = local_ssh(cmd, logger)
if result != 0:
logger.warn("Set standby host name failed")
return result
def sync_hdfs_client(self):
sync_host_str = ""
for node in self.host_list:
sync_host_str += " -h %s" % node
if 'hawq_standby_address_host' in self.hawq_dict and self.standby_host_name.lower() not in ('', 'none'):
sync_host_str += " -h %s" % self.standby_host_name
result = local_ssh("hawq scp %s %s/etc/hdfs-client.xml =:%s/etc/" % (sync_host_str, self.GPHOME, self.GPHOME))
if result != 0:
logger.error("Sync hdfs-client.xml failed.")
sys.exit(1)
def set_replace_datanode_on_failure(self):
xml_file = "%s/etc/hdfs-client.xml" % self.GPHOME
property_name = 'output.replace-datanode-on-failure'
pexist, pname, pvalue = check_property_exist_xml(xml_file, property_name)
datanodes_threshold = 4
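# HDFS pipeline recovery replaces a failed datanode in the write pipeline
# with a spare node; on clusters with fewer than 4 datanodes there is
# typically no spare to pick, so the policy is disabled to keep writes from
# stalling. (The threshold of 4 appears to mirror the default HDFS
# replace-datanode-on-failure behavior, which assumes replication factor 3
# plus at least one spare node.)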
if self.hosts_count_number < datanodes_threshold:
property_value = 'false'
else:
property_value = 'true'
if pvalue != property_value:
logger.info("Set output.replace-datanode-on-failure to %s" % property_value)
if pexist:
logger.debug("Update output.replace-datanode-on-failure to %s" % property_value)
update_xml_property(xml_file, property_name, property_value)
else:
logger.debug("Add output.replace-datanode-on-failure as %s" % property_value)
update_xml_property(xml_file, property_name, property_value)
self.sync_hdfs_client()
# Check hdfs-client.xml again to validate changes
pexist, pname, pvalue = check_property_exist_xml(xml_file, property_name)
if pvalue == property_value:
result = 0
else:
result = 1
else:
result = 0
return result
def set_default_hash_table_bucket_number(self):
if not self.default_hash_table_bucket_number:
if 'hawq_rm_nvseg_perquery_limit' in self.hawq_dict:
hawq_rm_nvseg_perquery_limit = self.hawq_dict['hawq_rm_nvseg_perquery_limit']
else:
hawq_rm_nvseg_perquery_limit = 512
factor_min = 1
factor_max = 8
limit = int(hawq_rm_nvseg_perquery_limit)
if int(self.hosts_count_number) == 0:
segments_num = 1
else:
segments_num = int(self.hosts_count_number)
factor = limit / segments_num
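# Worked example: with the default hawq_rm_nvseg_perquery_limit of 512 and
# 100 segment hosts, factor = 512 / 100 = 5 (integer division), so
# buckets = 5 * 100 = 500; with 4 hosts, factor = 128 exceeds factor_max,
# so buckets = 8 * 4 = 32; with more than 512 hosts, factor < 1 and the
# bucket number sticks at the 512 limit.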
# if too many segments or default limit is too low --> stick with the limit
if factor < factor_min:
buckets = limit
# if the limit is large and results in factor > max --> limit factor to max
elif factor > factor_max:
buckets = factor_max * segments_num
else:
buckets = factor * segments_num
self.default_hash_table_bucket_number = buckets
logger.info("Set default_hash_table_bucket_number as: %s" % self.default_hash_table_bucket_number)
ignore_bad_hosts = '--ignore-bad-hosts' if self.ignore_bad_hosts else ''
cmd = "hawq config -c default_hash_table_bucket_number -v %s --skipvalidation -q %s > /dev/null" % \
(self.default_hash_table_bucket_number, ignore_bad_hosts)
result = local_ssh(cmd, logger)
if result != 0:
logger.error("Set default_hash_table_bucket_number failed")
return result
def set_main_disp_connections_per_thread(self):
if 'main_disp_connections_per_thread' in self.hawq_dict:
logger.info("main_disp_connections_per_thread is %s" % self.hawq_dict['main_disp_connections_per_thread'])
return 0
if int(self.hosts_count_number) == 0:
segments_num = 1
else:
segments_num = int(self.hosts_count_number)
main_disp_connections_per_thread = max(2, math.ceil(segments_num / 10))
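# Note: with Python 2 integer division, segments_num / 10 truncates before
# math.ceil runs, so e.g. 19 hosts gives ceil(1) = 1 and the max() floor of
# 2 applies; 100 hosts gives 10.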
logger.info("Set main_disp_connections_per_thread as: %s" % main_disp_connections_per_thread)
ignore_bad_hosts = '--ignore-bad-hosts' if self.ignore_bad_hosts else ''
cmd = "hawq config -c main_disp_connections_per_thread -v %s --skipvalidation -q %s > /dev/null" % \
(main_disp_connections_per_thread, ignore_bad_hosts)
result = local_ssh(cmd, logger)
if result != 0:
logger.error("Set main_disp_connections_per_thread failed")
return result
def _get_master_init_cmd(self):
cmd = "%s/bin/lib/hawqinit.sh master '%s' '%s'" % \
(self.GPHOME, self.GPHOME, self.hawq_init_with_hdfs)
return cmd
def _get_standby_init_cmd(self):
cmd = "%s/bin/lib/hawqinit.sh standby '%s'" % \
(self.GPHOME, self.GPHOME)
return cmd
def hawq_remove_standby(self):
"""Removes the standby master"""
running_standby_host = ''
try:
dburl = dbconn.DbURL(port=self.master_port, username=self.user, dbname='template1')
conn = dbconn.connect(dburl, True)
query = "select role, hostname from gp_segment_configuration where role = 's';"
rows = dbconn.execSQL(conn, query)
conn.close()
except DatabaseError, ex:
logger.error("Failed to connect to database, this script can only be run when the database is up")
sys.exit(1)
for row in rows:
if row[0] == 's':
running_standby_host = row[1]
if running_standby_host:
logger.info("running standby host is %s" % running_standby_host)
signal.signal(signal.SIGINT,signal.SIG_IGN)
logger.info("Stop HAWQ cluster")
cmd = "%s; hawq stop master -a -M fast -q" % source_hawq_env
check_return_code(local_ssh(cmd, logger), logger, "Stop HAWQ master failed, exit")
ignore_bad_hosts = '--ignore-bad-hosts' if self.ignore_bad_hosts else ''
cmd = "%s; hawq stop allsegments -a -q %s" % (source_hawq_env, ignore_bad_hosts)
check_return_code(local_ssh(cmd, logger), logger, "Stop HAWQ segments failed, exit")
logger.info("Start HAWQ master")
cmd = "%s; hawq start master -m -q" % source_hawq_env
check_return_code(local_ssh(cmd, logger), logger, "Start HAWQ master failed, exit")
try:
logger.info('Remove standby from Database catalog.')
#dburl = dbconn.DbURL(port=self.master_port, dbname='template1')
#conn = dbconn.connect(dburl, utility=True, verbose=True)
#query = "select gp_remove_master_standby();"
#rows = dbconn.execSQL(conn, query)
#for row in rows:
# print row
#conn.close()
cmd = 'env PGOPTIONS="-c gp_session_role=utility" %s/bin/psql -p %s -d template1 -o /dev/null -c \
"select gp_remove_master_standby();"' % (self.GPHOME, self.master_port)
check_return_code(local_ssh(cmd, logger), logger, \
"Update catalog failed, exit", "Catalog updated successfully.")
logger.info("Stop HAWQ master")
cmd = "%s; hawq stop master -a -M fast" % source_hawq_env
check_return_code(local_ssh(cmd, logger), logger, "Stop hawq master failed, exit")
except DatabaseError, ex:
logger.error("Failed to connect to database, this script can only be run when the database is up")
cmd = "%s; hawq stop master -a -M fast" % source_hawq_env
check_return_code(local_ssh(cmd, logger), logger, "Stop hawq master failed, exit")
remove_property_xml("hawq_standby_address_host", "%s/etc/hawq-site.xml" % self.GPHOME, self.quiet)
host_list = parse_hosts_file(self.GPHOME)
sync_hawq_site(self.GPHOME, host_list)
if is_node_alive(self.standby_host_name, self.user, logger):
logger.info("Check if gpsyncmaster running on %s" % running_standby_host)
gpsyncmaster_pid = gp.getSyncmasterPID(running_standby_host, self.master_data_directory)
if gpsyncmaster_pid > 0:
# Stop the leftover gpsyncmaster process on the old standby.
logger.info('Stopping gpsyncmaster on %s' % running_standby_host)
gp.SegmentStop.remote('stop gpsyncmaster',
running_standby_host,
self.master_data_directory)
else:
logger.warn('Not able to connect to Standby master, skip node clean')
signal.signal(signal.SIGINT,signal.default_int_handler)
logger.info('Remove standby master finished')
else:
logger.info("Do not find a running standby master")
def _check_master_recovery(self):
cmd = "%s; %s/bin/pg_controldata %s |grep 'Database cluster state';" % \
(source_hawq_env, self.GPHOME, self.master_data_directory)
result, stdout, stderr = remote_ssh_output(cmd, self.master_host_name, '')
if stdout.find("recovery") != -1:
logger.info('Master is in recovery, aborting standby init')
return 1
return 0
def _init_standby(self):
logger.info("Start to init standby master: '%s'" % self.standby_host_name)
logger.info("This might take a couple of minutes, please wait...")
check_return_code(self._check_master_recovery())
# Sync config files from master.
scpcmd = "scp %s/etc/_mgmt_config %s:%s/etc/_mgmt_config > /dev/null" % \
(self.GPHOME, self.standby_host_name, self.GPHOME)
check_return_code(local_ssh(scpcmd, logger, warning = True), \
logger, "Sync _mgmt_config failed")
scpcmd = "scp %s/etc/hawq-site.xml %s:%s/etc/hawq-site.xml > /dev/null" % \
(self.GPHOME, self.standby_host_name, self.GPHOME)
check_return_code(remote_ssh(scpcmd, self.master_host_name, self.user), \
logger, "Sync hawq-site.xml failed")
scpcmd = "scp %s/etc/slaves %s:%s/etc/slaves > /dev/null" % \
(self.GPHOME, self.standby_host_name, self.GPHOME)
check_return_code(remote_ssh(scpcmd, self.master_host_name, self.user), \
logger, "Sync slaves file failed")
# Sync rps configuration
if os.path.exists("%s/ranger/" % (self.GPHOME)):
scpcmd = "scp %s/ranger/etc/* %s:%s/ranger/etc/ > /dev/null" % \
(self.GPHOME, self.standby_host_name, self.GPHOME)
check_return_code(remote_ssh(scpcmd, self.master_host_name, self.user), \
logger, "Sync rps configuration files failed")
standby_init_cmd = self._get_standby_init_cmd()
return check_return_code(remote_ssh_nowait(standby_init_cmd, self.standby_host_name, self.user))
def _resync_standby(self):
logger.info("Re-sync standby")
cmd = "%s; hawq stop master -a -M %s;" % (source_hawq_env, self.stop_mode)
check_return_code(local_ssh(cmd, logger), logger, "Stop hawq cluster failed, exit")
cmd = "cd %s; %s; %s/bin/lib/pysync.py -x gpperfmon/data -x pg_log -x db_dumps %s %s:%s" % \
(self.master_data_directory, source_hawq_env, self.GPHOME, self.master_data_directory,
self.standby_host_name, self.master_data_directory)
result = local_ssh(cmd, logger)
check_return_code(result, logger, "Re-sync standby master failed, exit")
cmd = "%s; hawq start master -a" % source_hawq_env
result = local_ssh(cmd, logger)
check_return_code(result, logger, "Start hawq cluster failed")
return result
def _get_segment_init_cmd(self):
cmd = "%s/bin/lib/hawqinit.sh segment '%s'" % \
(self.GPHOME, self.GPHOME)
return cmd
def _init_cluster(self):
logger.info("%s segment hosts defined" % self.hosts_count_number)
check_return_code(self.set_default_hash_table_bucket_number())
check_return_code(self.set_main_disp_connections_per_thread())
check_return_code(self.set_replace_datanode_on_failure())
if self.with_magma:
check_return_code(self.set_init_with_magma())
master_cmd = self._get_master_init_cmd()
logger.info("Start to init master node: '%s'" % self.master_host_name)
check_return_code(local_ssh(master_cmd, logger, warning = True), logger, \
"Master init failed, exit", \
"Master init successfully")
self.start_heartbeat_and_monitor(self.master_host_name, "master")
if self.standby_host_name.lower() not in ('', 'none'):
check_return_code(self._init_standby(), logger, \
"Init standby failed, exit", \
"Init standby successfully")
check_return_code(self._init_all_segments(), logger, \
"Segments init failed, exit", \
"Segments init successfully on nodes '%s'" % self.host_list)
logger.info("Init HAWQ cluster successfully")
def start_heartbeat_and_monitor(self, host, role):
if self.standby_host_name in ('', 'none') or self.enable_master_auto_ha == 'off':
return 0
if role == "master":
logger.info("Start heartbeat sender")
cmd_str = "%s; nohup %s/bin/autoswitch.sh start heartbeat_sender %s/heartbeat_log" % (source_hawq_env, self.GPHOME, os.path.expanduser("~")+"/hawqAdminLogs")
elif role == "standby":
logger.info("Start heartbeat monitor")
cmd_str = "%s; nohup %s/bin/autoswitch.sh start heartbeat_monitor %s/heartbeat_log" % (source_hawq_env, self.GPHOME, os.path.expanduser("~")+"/hawqAdminLogs")
result = remote_ssh(cmd_str, host, self.user)
return result
def _init_all_segments(self):
segment_cmd_str = self._get_segment_init_cmd()
# Execute segment init command on each segment nodes.
logger.info("Init segments in list: %s" % self.host_list)
bad_hosts = []
working_hosts = self.host_list
if self.ignore_bad_hosts:
working_hosts, bad_hosts = exclude_bad_hosts(self.host_list)
if len(bad_hosts) == len(self.host_list):
logger.error("Unable to SSH on any of the hosts, skipping segment start operation")
return 1
if len(bad_hosts) > 0:
logger.warning("Skipping starting segments in the list {0}, SSH test failed".format(bad_hosts))
self.hosts_count_number -= len(bad_hosts)
work_list = []
q = Queue.Queue()
for host in working_hosts:
logger.debug("Start to init segment on node '%s'" % host)
scpcmd = "scp %s/etc/_mgmt_config %s:%s/etc/_mgmt_config > /dev/null" % (self.GPHOME, host, self.GPHOME)
local_ssh(scpcmd, logger)
work_list.append({"func":remote_ssh,"args":(segment_cmd_str, host, self.user, q)})
logger.info("Total segment number is: %s" % len(self.host_list))
work_list.append({"func":check_progress,"args":(q, self.hosts_count_number, 'init', 0, self.quiet)})
node_init = HawqCommands(name='HAWQ', action_name = 'init', logger = logger)
node_init.get_function_list(work_list)
node_init.start()
return node_init.return_flag
def run(self):
if self.new_standby_hostname != 'none':
check_return_code(self.set_new_standby_host())
if self.node_type == "master":
if self.hawq_init_with_hdfs == 'true' or self.hawq_init_with_hdfs == 'on':
self.check_hdfs_path()
check_return_code(self.set_new_standby_host())
logger.info("%s segment hosts defined" % self.hosts_count_number)
check_return_code(self.set_default_hash_table_bucket_number())
check_return_code(self.set_main_disp_connections_per_thread())
check_return_code(self.set_replace_datanode_on_failure())
logger.info("Start to init master")
cmd = self._get_master_init_cmd()
check_return_code(local_ssh(cmd, logger, warning = True), logger, \
"Master init failed, exit", "Master init successfully")
elif self.node_type == "standby" and self.remove_standby is True:
logger.info("Try to remove standby master")
self.hawq_remove_standby()
elif self.node_type == "standby":
if self.standby_host_name.lower() in ('', 'none'):
logger.info("No standby host found")
logger.info("Please check your standby host name")
sys.exit(1)
if self.no_update:
check_return_code(self._resync_standby(), logger, \
"Standby master re-sync failed, exit", \
"Standby master re-sync successfully")
else:
check_return_code(self._init_standby(), logger, \
"Init standby failed, exit", \
"Init standby successfully")
elif self.node_type == "segment":
cmd = self._get_segment_init_cmd()
check_return_code(local_ssh(cmd, logger, warning = True), logger, \
"Segment init failed, exit", \
"Segment init successfully")
elif self.node_type == "cluster":
if self.hawq_init_with_hdfs == 'true' or self.hawq_init_with_hdfs == 'on':
self.check_hdfs_path()
self._init_cluster()
else:
sys.exit('hawq init object should be one of master/standby/segment/cluster')
return None
class HawqStart:
def __init__(self, opts, hawq_dict):
self.node_type = opts.node_type
self.hawq_command = opts.hawq_command
self.user = opts.user
self.GPHOME = opts.GPHOME
self.quiet = opts.quiet_run
self.stop_mode = opts.stop_mode
self.log_dir = opts.log_dir
self.timeout = opts.timeout_seconds
self.hawq_dict = hawq_dict
self.lock = threading.Lock()
self.max_connections = opts.max_connections
self.masteronly = opts.masteronly
self.special_mode = opts.special_mode
self.restrict = opts.restrict
self.ignore_bad_hosts = opts.ignore_bad_hosts
self._get_config()
self.hawq_acl_type = self.check_hawq_acl_type()
def _get_config(self):
logger.info("Gathering information and validating the environment...")
check_items = ('hawq_master_address_host', 'hawq_master_address_port',
'hawq_master_directory', 'hawq_segment_directory',
'hawq_segment_address_port', 'hawq_dfs_url',
'hawq_master_temp_directory', 'hawq_segment_temp_directory')
for item in check_items:
if item not in self.hawq_dict:
logger.error("Check: %s not configured in hawq-site.xml" % item)
sys.exit()
self.master_host_name = self.hawq_dict['hawq_master_address_host']
self.master_port = self.hawq_dict['hawq_master_address_port']
self.master_data_directory = self.hawq_dict['hawq_master_directory']
self.master_address = self.master_host_name + ":" + self.master_port
self.segment_data_directory = self.hawq_dict['hawq_segment_directory']
self.segment_port = self.hawq_dict['hawq_segment_address_port']
self.dfs_url = self.hawq_dict['hawq_dfs_url']
self.host_list = parse_hosts_file(self.GPHOME)
self.hosts_count_number = len(self.host_list)
if 'hawq_standby_address_host' in self.hawq_dict:
self.standby_host_name = self.hawq_dict['hawq_standby_address_host']
self.standby_port = self.master_port
self.standby_address = self.standby_host_name + ":" + self.standby_port
else:
logger.info("No standby host configured")
self.standby_host_name = ''
if 'enable_master_auto_ha' in self.hawq_dict:
self.enable_master_auto_ha = self.hawq_dict['enable_master_auto_ha']
else:
self.enable_master_auto_ha = 'off'
if self.enable_master_auto_ha == 'on' and 'ha_zookeeper_quorum' not in self.hawq_dict:
logger.error("ha_zookeeper_quorum not found in hawq-site.xml, but enable_master_auto_ha is on")
sys.exit(1)
if 'upgrade_mode' in self.hawq_dict:
self.upgrade_mode = self.hawq_dict['upgrade_mode']
def check_hawq_acl_type(self):
if 'hawq_acl_type' in self.hawq_dict and self.hawq_dict['hawq_acl_type'].lower() == 'ranger':
return "ranger"
return "standalone"
def _check_recovery_start(self):
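# Maps the 'Database cluster state' reported by pg_controldata to one of:
# 'recovering', 'starting_up', 'recovery_success' ("in production"), or
# 'recovery_failed'.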
cmd = "%s; %s/bin/pg_controldata %s |grep 'Database cluster state';" % (source_hawq_env, self.GPHOME, self.master_data_directory)
result, stdout, stderr = remote_ssh_output(cmd, self.master_host_name, '')
if stdout.find("recovery") != -1:
hawq_recovery_state = 'recovering'
elif stdout.find("starting up") != -1:
hawq_recovery_state = 'starting_up'
elif stdout.find("in production") != -1:
hawq_recovery_state = 'recovery_success'
else:
hawq_recovery_state = 'recovery_failed'
return hawq_recovery_state
def _start_master_cmd(self):
logger.info("Start master service")
if self.masteronly:
start_options = "-i -M master -p %s --silent-mode=true -c gp_role=utility" % self.master_port
elif self.special_mode == 'upgrade':
start_options = "-i -M master -p %s --silent-mode=true -U" % self.master_port
elif self.special_mode == 'maintenance':
start_options = "-i -M master -p %s --silent-mode=true -m" % self.master_port
elif self.restrict:
start_options = "-i -M master -p %s --silent-mode=true -c superuser_reserved_connections=%s" % (self.master_port, self.max_connections)
else:
start_options = "-i -M master -p %s --silent-mode=true" % self.master_port
cmd_str = "%s; %s/bin/pg_ctl start -w -t %s -D %s -l %s/pg_log/startup.log -o \\\"%s\\\" >> %s" \
% (source_hawq_env, self.GPHOME, self.timeout, self.master_data_directory, self.master_data_directory, start_options, log_filename)
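# The pg_ctl -o options are wrapped in \" so the quotes survive the outer
# double quotes that remote_ssh() adds around the whole command line.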
return cmd_str
def start_master(self):
cmd = self._start_master_cmd()
result = remote_ssh(cmd, self.master_host_name, self.user)
if result != 0:
hawq_recovery_state = self._check_recovery_start()
if hawq_recovery_state == 'recovering':
recovery_sleep_time = 3
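# Poll pg_controldata until recovery finishes, printing a dot per poll;
# the poll interval backs off linearly from 3 seconds up to a 60 second cap.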
logger.warn("Master is doing recovery, this might take minutes to hours")
logger.warn("Please do not interrupt it and wait patient")
sys.stdout.write("\r")
while True:
sys.stdout.write(".")
sys.stdout.flush()
time.sleep(recovery_sleep_time)
if recovery_sleep_time < 60:
recovery_sleep_time = recovery_sleep_time + 1
hawq_recovery_state = self._check_recovery_start()
if hawq_recovery_state == 'recovery_success':
result = 0
break
elif hawq_recovery_state == 'recovery_failed':
logger.error("HAWQ master recoverying failed")
result = 1
break
else:
result = 0
sys.stdout.write("\n")
if self.node_type == "cluster" and (self.standby_host_name.lower() not in ('', 'none')):
logger.info("Checking if standby is synced with master")
sync_result = self._check_standby_sync()
if sync_result == 3:
logger.warn("Standby is not synced as: Standby master too far behind")
logger.warn("Use 'hawq init standby -n' to do force sync")
elif sync_result == 2:
check_standby_sync_count = 0
while self._check_standby_sync() == 2:
logger.info("Waiting standby to get synced for 3 seconds...")
time.sleep(3)
check_standby_sync_count += 1
if check_standby_sync_count > 20:
break
sync_result = self._check_standby_sync()
if sync_result != 0:
logger.warn("Standby is not synced after 60 seconds")
logger.warn("Use 'hawq init standby -n' to do force sync")
else:
logger.info("Standby master is synced")
else:
logger.info("Standby master is synced")
if result != 0:
return result
result = self.start_heartbeat_and_monitor(self.master_host_name, "master")
return result
def _start_standby_cmd(self):
logger.info("Start standby master service")
cmd_str = "%s; %s/bin/gpsyncmaster -D %s -i -p %s >> %s 2>&1 &" \
% (source_hawq_env, self.GPHOME, self.master_data_directory, self.standby_port, log_filename)
return cmd_str
def start_standby(self):
cmd = self._start_standby_cmd()
check_return_code(remote_ssh(cmd, self.standby_host_name, self.user))
cmd = "%s; %s/sbin/hawqstandbywatch.py %s debug" % (source_hawq_env, self.GPHOME, self.master_data_directory)
result = remote_ssh_nowait(cmd, self.standby_host_name, self.user)
if result != 0:
return result
result = self.start_heartbeat_and_monitor(self.standby_host_name, "standby")
return result
def _check_standby_sync(self):
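# Returns 0 when gp_master_mirroring reports 'Synchronized', 2 when the
# standby is still catching up, and 3 when psql reports the standby is too
# far behind to sync (callers then suggest 'hawq init standby -n').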
try:
dburl = dbconn.DbURL(port=self.master_port, username=self.user, dbname='template1')
conn = dbconn.connect(dburl, True)
query = "select summary_state, detail_state from gp_master_mirroring;"
rows = dbconn.execSQL(conn, query)
conn.close()
except DatabaseError, ex:
logger.error("Failed to connect to database, this script can only be run when the database is up")
cmd = '%s/bin/psql -p %s -d template1 -c \
"select summary_state, detail_state from gp_master_mirroring;"' % (self.GPHOME, self.master_port)
(result, stdout, stderr) = local_ssh_output(cmd)
if stdout.find('Standby master too far behind') != -1:
return 3
for row in rows:
if row[0] != 'Synchronized':
return 2
else:
return 0
def _start_segment_cmd(self):
logger.info("Start segment service")
cmd_str = "%s; %s/bin/pg_ctl start -w -t %s -D %s -l %s/pg_log/startup.log -o \\\" -i -M %s -p %s --silent-mode=true\\\" >> %s" \
% (source_hawq_env, self.GPHOME, self.timeout, self.segment_data_directory, self.segment_data_directory,
"segment", self.segment_port, log_filename)
return cmd_str
def start_segment(self):
cmd = self._start_segment_cmd()
result = remote_ssh(cmd, 'localhost', self.user)
return result
def _start_all_nodes(self):
logger.info("Start all the nodes in hawq cluster")
if self.hawq_acl_type == 'ranger':
self.start_rps(self.master_host_name)
if self.standby_host_name.lower() not in ('', 'none'):
if self.hawq_acl_type == 'ranger':
self.start_rps(self.standby_host_name)
logger.info("Starting standby master '%s'" % self.standby_host_name)
check_return_code(self.start_standby(), logger, "Standby master start failed, exit",
"Standby master started successfully")
logger.info("Starting master node '%s'" % self.master_host_name)
check_return_code(self.start_master(), logger, "Master start failed, exit", \
"Master started successfully")
segments_return_flag = self._start_all_segments()
if segments_return_flag == 0:
logger.info("HAWQ cluster started successfully")
return segments_return_flag
def _start_all_segments(self):
logger.info("Start all the segments in hawq cluster")
logger.info("Start segments in list: %s" % self.host_list)
bad_hosts = []
working_hosts = self.host_list
if self.ignore_bad_hosts:
working_hosts, bad_hosts = exclude_bad_hosts(self.host_list)
if len(bad_hosts) == len(self.host_list):
logger.error("Unable to SSH on any of the hosts, skipping segment start operation")
return 1
if len(bad_hosts) > 0:
logger.warning("Skipping starting segments in the list {0}, SSH test failed".format(bad_hosts))
self.hosts_count_number -= len(bad_hosts)
# Start segments
segment_cmd_str = self._start_segment_cmd()
q = Queue.Queue()
work_list = []
for host in working_hosts:
work_list.append({"func":remote_ssh,"args":(segment_cmd_str, host, self.user, q)})
logger.info("Total segment number is: %s" % len(self.host_list))
work_list.append({"func":check_progress,"args":(q, self.hosts_count_number, 'start', len(bad_hosts), self.quiet)})
node_init = HawqCommands(name = 'HAWQ', action_name = 'start', logger = logger)
node_init.get_function_list(work_list)
node_init.start()
logger.debug("Total threads return value is : %d" % node_init.return_flag)
if node_init.return_flag != 0:
logger.error("Segments start failed")
else:
logger.info("Segments started successfully")
return node_init.return_flag
def start_heartbeat_and_monitor(self, host, role):
if self.standby_host_name in ('', 'none') or self.enable_master_auto_ha == 'off':
return 0
if role == "master":
logger.info("Start heartbeat sender")
cmd_str = "%s; nohup %s/bin/autoswitch.sh start heartbeat_sender %s/heartbeat_log" % (source_hawq_env, self.GPHOME, os.path.expanduser("~")+"/hawqAdminLogs")
elif role == "standby":
logger.info("Start heartbeat monitor")
cmd_str = "%s; nohup %s/bin/autoswitch.sh start heartbeat_monitor %s/heartbeat_log" % (source_hawq_env, self.GPHOME, os.path.expanduser("~")+"/hawqAdminLogs")
result = remote_ssh(cmd_str, host, self.user)
return result
def _start_rps(self, rps_hostname):
logger.info("Start ranger plugin service on %s" % rps_hostname)
cmd_str = "%s/ranger/bin/rps.sh start" % (self.GPHOME)
result = remote_ssh(cmd_str, rps_hostname, self.user)
return result
def start_rps(self, rps_hostname):
check_return_code(self._start_rps(rps_hostname), logger, \
"Ranger plugin service start on %s failed, exit" % rps_hostname, \
"Ranger plugin service started on %s successfully" % rps_hostname)
def run(self):
if self.node_type == "master":
if self.hawq_acl_type == 'ranger':
self.start_rps(self.master_host_name)
check_return_code(self.start_master(), logger, \
"Master start failed, exit", "Master started successfully")
elif self.node_type == "standby":
if self.standby_host_name == '':
logger.error("No standby host configured, exit")
sys.exit(1)
if self.hawq_acl_type == 'ranger':
self.start_rps(self.standby_host_name)
check_return_code(self.start_standby(), logger, "Standby master start failed, exit",
"Standby master started successfully")
elif self.node_type == "segment":
check_return_code(self.start_segment(), logger, \
"Segment start failed, exit", "Segment started successfully")
elif self.node_type == "cluster":
check_return_code(self._start_all_nodes())
elif self.node_type == "allsegments":
check_return_code(self._start_all_segments())
else:
sys.exit('Node object should be in [master, standby, segment, allsegments, cluster]')
return None
class HawqStop:
def __init__(self, opts, hawq_dict):
self.node_type = opts.node_type
self.hawq_command = opts.hawq_command
self.user = opts.user
self.GPHOME = opts.GPHOME
self.stop_mode = opts.stop_mode
self.quiet = opts.quiet_run
self.log_dir = opts.log_dir
self.timeout = opts.timeout_seconds
self.hawq_dict = hawq_dict
self.hawq_reload = opts.hawq_reload
if self.hawq_reload:
self.stop_action = 'reload'
self.stop_action_past = 'reloaded'
else:
self.stop_action = 'stop'
self.stop_action_past = 'stopped'
self.lock = threading.Lock()
self.dburl = None
self.conn = None
self._get_config()
self.ignore_bad_hosts = opts.ignore_bad_hosts
self.hawq_acl_type = self.check_hawq_acl_type()
def _get_config(self):
check_items = ('hawq_master_address_host', 'hawq_master_address_port',
'hawq_master_directory', 'hawq_segment_directory',
'hawq_segment_address_port',
'hawq_master_temp_directory', 'hawq_segment_temp_directory')
for item in check_items:
if item not in self.hawq_dict:
sys.exit("Check: %s not configured in hawq-site.xml" % item)
self.master_host_name = self.hawq_dict['hawq_master_address_host']
self.master_port = self.hawq_dict['hawq_master_address_port']
self.master_data_directory = self.hawq_dict['hawq_master_directory']
self.master_address = self.master_host_name + ":" + self.master_port
self.segment_data_directory = self.hawq_dict['hawq_segment_directory']
self.segment_port = self.hawq_dict['hawq_segment_address_port']
self.host_list = parse_hosts_file(self.GPHOME)
self.hosts_count_number = len(self.host_list)
if 'hawq_standby_address_host' in self.hawq_dict:
self.standby_host_name = self.hawq_dict['hawq_standby_address_host']
self.standby_port = self.master_port
self.standby_address = self.standby_host_name + ":" + self.standby_port
else:
logger.info("No standby host configured")
self.standby_host_name = ''
if 'enable_master_auto_ha' in self.hawq_dict:
self.enable_master_auto_ha = self.hawq_dict['enable_master_auto_ha']
else:
self.enable_master_auto_ha = 'off'
def check_hawq_acl_type(self):
try:
dburl = dbconn.DbURL(port=self.master_port, username=self.user, dbname='template1')
conn = dbconn.connect(dburl, True)
query = "select name, setting from pg_catalog.pg_settings where name='hawq_acl_type';"
rows = dbconn.execSQL(conn, query)
conn.close()
except DatabaseError, ex:
logger.warning("Failed to connect to database, cannot get hawq_acl_type")
return "unknown"
for row in rows:
if row[1].lower() == 'ranger':
return "ranger"
return "standalone"
def _stop_master_checks(self):
try:
total_connections = 0
self.dburl = dbconn.DbURL(hostname=self.master_host_name, port=self.master_port, username=self.user, dbname='template1')
self.conn = dbconn.connect(self.dburl, utility=True)
total_connections=len(catalog.getUserPIDs(self.conn))
self.conn.close()
logger.info("There are %d connections to the database" % total_connections)
except DatabaseError, ex:
logger.error("Failed to connect to the running database, please check master status")
logger.error("Or you can check hawq stop --help for other stop options")
sys.exit(1)
if total_connections > 0 and self.stop_mode=='smart':
logger.warning("There are other connections to this instance, shutdown mode smart aborted")
logger.warning("Either remove connections, or use 'hawq stop master -M fast' or 'hawq stop master -M immediate'")
logger.warning("See hawq stop --help for all options")
logger.error("Active connections. Aborting shutdown...")
sys.exit(1)
logger.info("Commencing Master instance shutdown with mode='%s'" % self.stop_mode)
logger.info("Master host=%s" % self.master_host_name)
if self.stop_mode == 'smart':
pass
elif self.stop_mode == 'fast':
logger.info("Detected %d connections to database" % total_connections)
if total_connections > 0:
logger.info("Switching to WAIT mode")
logger.info("Will wait for shutdown to complete, this may take some time if")
logger.info("there are a large number of active complex transactions, please wait...")
else:
if self.timeout == SEGMENT_TIMEOUT_DEFAULT:
logger.info("Using standard WAIT mode of %s seconds" % SEGMENT_TIMEOUT_DEFAULT)
else:
logger.info("Using WAIT mode of %s seconds" % self.timeout)
pass
def _stop_master_cmd(self):
logger.info("%s hawq master" % self.stop_action.title())
if self.hawq_reload:
cmd_str = "%s; %s/bin/pg_ctl reload -D %s >> %s" % \
(source_hawq_env, self.GPHOME, self.master_data_directory, log_filename)
return cmd_str
else:
cmd_str = "%s; %s/bin/pg_ctl stop -w -t %s -D %s -l %s/pg_log/startup.log -m %s >> %s" % \
(source_hawq_env, self.GPHOME, self.timeout, self.master_data_directory,
self.master_data_directory, self.stop_mode, log_filename)
return cmd_str
def _stop_master(self):
master_host, master_running = check_hawq_running(self.master_host_name, self.master_data_directory, self.master_port, self.user, logger)
if master_running:
if self.stop_mode != 'immediate' and not self.hawq_reload:
self._stop_master_checks()
cmd = self._stop_master_cmd()
result = remote_ssh(cmd, self.master_host_name, self.user)
if result != 0:
return result
else:
logger.warn('Master is not running, skip')
result = self.stop_heartbeat_and_monitor(self.master_host_name, "master")
return result
def _stop_segment_cmd(self):
logger.info("%s hawq segment" % self.stop_action.title())
if self.hawq_reload:
cmd_str = "%s; %s/bin/pg_ctl reload -D %s >> %s" % \
(source_hawq_env, self.GPHOME, self.segment_data_directory, log_filename)
return cmd_str
else:
# When stopping the whole cluster the master goes down first, so a segment
# stopped in smart mode may hang in recv() waiting for its master connection
# to close. Downgrade smart to fast so segments do not wait on the master.
seg_stop_mod = self.stop_mode
if self.node_type == 'cluster' and seg_stop_mod == 'smart':
seg_stop_mod = 'fast'
cmd_str = "%s; %s/bin/pg_ctl stop -w -t %s -D %s -l %s/pg_log/startup.log -m %s >> %s" % \
(source_hawq_env, self.GPHOME, self.timeout, self.segment_data_directory,
self.segment_data_directory, seg_stop_mod, log_filename)
return cmd_str
def _stop_segment(self):
seg_host, segment_running = check_hawq_running('localhost', self.segment_data_directory, self.segment_port, self.user, logger)
if segment_running:
cmd = self._stop_segment_cmd()
result = remote_ssh(cmd, 'localhost', self.user)
return result
else:
logger.warn('HAWQ segment is not running, skip')
return 0
def _stop_standby_cmd(self):
logger.info("%s hawq standby master" % self.stop_action.title())
if self.hawq_reload:
cmd_str = "%s; %s/bin/pg_ctl reload -D %s >> %s" % \
(source_hawq_env, self.GPHOME, self.master_data_directory, log_filename)
return cmd_str
else:
cmd_str = "%s; %s/bin/pg_ctl stop -w -t %s -D %s -l %s/pg_log/startup.log -m %s >> %s" % \
(source_hawq_env, self.GPHOME, self.timeout, self.master_data_directory,
self.master_data_directory, self.stop_mode, log_filename)
return cmd_str
def _stop_standby(self):
if check_syncmaster_running(self.master_data_directory, '', self.standby_host_name, logger):
cmd = self._stop_standby_cmd()
result = remote_ssh(cmd, self.standby_host_name, self.user)
if result != 0:
return result
else:
logger.warn('Standby master is not running, skip')
result = self.stop_heartbeat_and_monitor(self.standby_host_name, "standby")
return result
def _stopAll(self):
logger.info("%s hawq cluster" % self.stop_action.title())
master_result = 0
standby_result = 0
segments_return_flag = 0
master_result = self._stop_master()
if master_result != 0:
logger.error("Master %s failed" % self.stop_action)
else:
logger.info("Master %s successfully" % self.stop_action_past)
if self.standby_host_name.lower() not in ('', 'none'):
standby_result = self._stop_standby()
if standby_result != 0:
logger.error("Standby master %s failed" % self.stop_action)
else:
logger.info("Standby master %s successfully" % self.stop_action_past)
if self.hawq_acl_type in ['ranger', 'unknown']:
self._stop_rps(self.master_host_name, self.hawq_acl_type)
if self.standby_host_name.lower() not in ('', 'none'):
self._stop_rps(self.standby_host_name, self.hawq_acl_type)
# Execute segment stop command on each node.
segments_return_flag = self._stopAllSegments()
cluster_result = master_result + standby_result + segments_return_flag
if cluster_result != 0:
logger.error("Cluster %s failed" % self.stop_action)
else:
logger.info("Cluster %s successfully" % self.stop_action_past)
return cluster_result
def _running_segments_list(self, host_list):
work_list = []
running_host = []
stopped_host = []
seg_check_q = Queue.Queue()
for host in host_list:
work_list.append({"func":check_hawq_running,"args":(host, self.segment_data_directory, self.segment_port, self.user, logger)})
node_checks = threads_with_return(name = 'HAWQ', action_name = 'check', logger = logger, return_values = seg_check_q)
node_checks.get_function_list(work_list)
node_checks.start()
while not seg_check_q.empty():
item = seg_check_q.get()
if item[1] == True:
running_host.append(item[0])
else:
stopped_host.append(item[0])
return running_host, stopped_host
def _stopAllSegments(self):
bad_hosts = []
working_hosts = self.host_list
segment_cmd_str = self._stop_segment_cmd()
logger.info("%s segments in list: %s" % (self.stop_action.title(), self.host_list))
working_hosts, bad_hosts = exclude_bad_hosts(self.host_list)
if len(bad_hosts) == len(self.host_list):
logger.error("Unable to SSH on any of the hosts, skipping segment %s operation" % self.stop_action)
return 1
process_running_host, stopped_host = self._running_segments_list(working_hosts)
# Execute segment stop command on specified nodes.
if self.ignore_bad_hosts:
if len(bad_hosts) > 0:
logger.warning("Skipping {1} segments in the list {0}, SSH test failed".format(bad_hosts, self.stop_action))
skip_host_list = bad_hosts + stopped_host
else:
skip_host_list = stopped_host
# Stop segments
work_list = []
q = Queue.Queue()
for host in process_running_host:
work_list.append({"func":remote_ssh,"args":(segment_cmd_str, host, self.user, q)})
logger.info("Total segment number is: %s" % len(self.host_list))
work_list.append({"func":check_progress,"args":(q, len(process_running_host), self.stop_action, len(skip_host_list), self.quiet)})
node_init = HawqCommands(name = 'HAWQ', action_name = self.stop_action, logger = logger)
node_init.get_function_list(work_list)
node_init.start()
if self.ignore_bad_hosts:
total_return_flag = node_init.return_flag
else:
if len(bad_hosts) > 0:
logger.error("%s segment %s failed, SSH test failed on %s" % (len(bad_hosts), self.stop_action, bad_hosts))
total_return_flag = node_init.return_flag + len(bad_hosts)
if total_return_flag != 0:
logger.error("Segments %s failed" % self.stop_action)
else:
logger.info("Segments %s successfully" % self.stop_action_past)
return total_return_flag
def stop_heartbeat_and_monitor(self, host, role):
if self.hawq_reload:
# Do not stop the auto-switch service when running 'hawq stop --reload'.
return 0
if role in [ "master", "standby" ]:
logger.info("Stop auto-switch service ")
cmd_str = "%s; %s/bin/autoswitch.sh stop " % (source_hawq_env, self.GPHOME)
else:
logger.error("heartbeat or its moniter stop is not applicable on role %s. \
Must be master or standby" % (role))
result = remote_ssh(cmd_str, host, self.user)
check_return_code(result, logger, \
"auto-switch service stop failed", \
"auto-switch service stopped successfully")
return result
def _stop_rps(self, rps_hostname, acl_type):
if self.hawq_reload:
# Do not stop RPS when running 'hawq stop --reload'.
return
check_ret = True
if acl_type == 'unknown':
logger.warning("Try to stop RPS when hawq_acl_type is unknown")
check_ret = False
logger.info("Stop ranger plugin service on %s" % rps_hostname)
cmd_str = "%s/ranger/bin/rps.sh stop" % (self.GPHOME)
result = remote_ssh(cmd_str, rps_hostname, self.user)
if check_ret:
check_return_code(result, logger, \
"Ranger plugin service stop failed on %s, exit" % rps_hostname, \
"Ranger plugin service stopped on %s successfully" % rps_hostname)
def run(self):
if self.node_type == "master":
check_return_code(self._stop_master(), logger, \
"Master %s failed, exit" % self.stop_action, "Master %s successfully" % self.stop_action_past)
if self.hawq_acl_type in ['ranger', 'unknown']:
self._stop_rps(self.master_host_name, self.hawq_acl_type)
elif self.node_type == "standby":
if self.standby_host_name.lower() not in ('', 'none'):
check_return_code(self._stop_standby(), logger, \
"Standby master %s failed, exit" % self.stop_action, "Standby master %s successfully" % self.stop_action_past)
if self.hawq_acl_type in ['ranger', 'unknown']:
self._stop_rps(self.standby_host_name, self.hawq_acl_type)
elif self.node_type == "segment":
check_return_code(self._stop_segment(), logger, \
"Segment %s failed, exit" % self.stop_action, "Segment %s successfully" % self.stop_action_past)
elif self.node_type == "cluster":
check_return_code(self._stopAll())
elif self.node_type == "allsegments":
check_return_code(self._stopAllSegments())
else:
sys.exit('Node object should be in [master, standby, segment, allsegments, cluster]')
return None
def flush_status(charc, seconds):
sys.stdout.write(charc)
sys.stdout.flush()
time.sleep(seconds)
def write_out_tofile(out, err, file):
with open(file, 'a+') as f:
f.write("stdout: " + out + '\n')
f.write("stderr: " + err + '\n')
def create_logger(logname='hawq logger', outputFile='/tmp/hawq_mgmt.log'):
logger = logging.getLogger(logname)
logger.setLevel(logging.DEBUG)
fh = logging.FileHandler(outputFile)
fh.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)
logger.addHandler(fh)
#logger.addHandler(ch)
return logger
def get_args():
(opts, ARGS) = create_parser()
if len(ARGS) > 0 and ARGS[0]:
opts.hawq_command = ARGS[0]
if len(ARGS) > 1 and ARGS[1]:
opts.node_type = ARGS[1]
if opts.node_type in ['master', 'standby', 'segment', 'cluster', 'allsegments'] and opts.hawq_command in ['start', 'stop', 'restart', 'init', 'activate']:
if opts.log_dir and not os.path.exists(opts.log_dir):
os.makedirs(opts.log_dir)
global logger, log_filename
if opts.verbose:
enable_verbose_logging()
if opts.quiet_run:
quiet_stdout_logging()
logger, log_filename = setup_hawq_tool_logging('hawq_%s' % opts.hawq_command,getLocalHostname(),getUserName(), opts.log_dir)
logger.info("Prepare to do 'hawq %s'" % opts.hawq_command)
logger.info("You can find log in:")
logger.info(log_filename)
else:
print COMMON_HELP
sys.exit(1)
args_lens = len(ARGS)
if args_lens != 2:
sys.exit('Only support two arguments.')
if opts.verbose and opts.quiet_run:
logger.error("Multiple actions specified. See the --help info.")
sys.exit(1)
if opts.special_mode and (opts.hawq_command != 'start'):
logger.error("['-U', '--special-mode'] only apply to 'hawq start'. See the --help info.")
sys.exit(1)
opts.GPHOME = os.getenv('GPHOME')
if not opts.GPHOME:
logger.error("Didn't get GPHOME value, exit")
sys.exit()
logger.info("GPHOME is set to:")
logger.info(opts.GPHOME)
global source_hawq_env
source_hawq_env = "source %s/greenplum_path.sh" % opts.GPHOME
if opts.user == "":
opts.user = getpass.getuser()
if opts.user == "root":
logger.error("'root' user is not allowed")
sys.exit()
logger.debug("Current user is '%s'" % opts.user)
logger.debug("Parsing config file:")
logger.debug("%s/etc/hawq-site.xml" % opts.GPHOME)
hawqsite = HawqXMLParser(opts.GPHOME)
hawqsite.get_all_values()
hawq_dict = hawqsite.hawq_dict
cluster_host_list = list()
cluster_host_list.append(hawq_dict['hawq_master_address_host'])
if 'hawq_standby_address_host' in hawq_dict:
cluster_host_list.append(hawq_dict['hawq_standby_address_host'])
segments_host_list = parse_hosts_file(opts.GPHOME)
for host in segments_host_list:
cluster_host_list.append(host)
if opts.log_dir:
logger.debug("Check and create log directory %s on all hosts" % opts.log_dir)
create_success_host, create_failed_host = create_cluster_directory(opts.log_dir, cluster_host_list)
if len(create_failed_host) > 0:
logger.error("Create log directory %s failed on hosts %s" % (opts.log_dir, create_failed_host))
logger.error("Please check directory permission")
if opts.hawq_reload:
logger.info("Reloading configuration without restarting hawq cluster")
else:
logger.info("%s hawq with args: %s" % (opts.hawq_command.title(), ARGS))
return opts, hawq_dict
def remote_ssh(cmd_str, host, user, q=None):
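# Run cmd_str on host over ssh, stream stdout/stderr into the logger, and,
# when a Queue is supplied, post ("done", host, returncode) so
# check_progress() can track completion across worker threads.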
if user == "":
remote_cmd_str = "ssh -o 'StrictHostKeyChecking no' %s \"%s\"" % (host, cmd_str)
else:
remote_cmd_str = "ssh -o 'StrictHostKeyChecking no' %s@%s \"%s\"" % (user, host, cmd_str)
try:
result = subprocess.Popen(remote_cmd_str, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = result.communicate()
except OSError:
# Popen itself failed (e.g. no shell available); report failure instead
# of falling through to the undefined stdout/stderr below.
print "Execute shell command on %s failed" % host
if q:
q.put(("done", host, 1))
return 1
if stdout and stdout != '':
logger.info(stdout.strip())
if stderr and stderr != '':
logger.info(stderr.strip())
if q:
q.put(("done", host, result.returncode))
return result.returncode
def remote_ssh_nowait(cmd, host, user):
if user == "":
remote_cmd_str = "ssh -o 'StrictHostKeyChecking no' %s \"%s\"" % (host, cmd)
else:
remote_cmd_str = "ssh -o 'StrictHostKeyChecking no' %s@%s \"%s\"" % (user, host, cmd)
result = subprocess.Popen(remote_cmd_str, shell=True).wait()
return result
def check_progress(q, total_num, action, skip_num = 0, quiet=False, service="segments"):
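# Drain the ("done", host, returncode) messages posted by remote_ssh()
# workers, printing a progress dot each second until all expected hosts
# have reported; skip_num only affects the final summary line.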
working_num = total_num
success_num = 0
pnum = 0
sys.stdout.write("\r")
while working_num > 0:
while not q.empty():
msg = q.get()
if msg[0] == "done":
working_num = working_num - 1
if msg[2] == 0:
success_num = success_num + 1
if not quiet:
sys.stdout.write(".")
sys.stdout.flush()
time.sleep(1)
if not quiet:
sys.stdout.write("\n")
if skip_num != 0:
logger.info("%s %d of %d %s, %d %s %s skipped" % (action, success_num, total_num, service, skip_num, service, action))
else:
logger.info("%s %d of %d %s" % (action, success_num, total_num, service))
return 0
def start_hawq(opts, hawq_dict):
instance = HawqStart(opts, hawq_dict)
instance.run()
return None
def stop_hawq(opts, hawq_dict):
instance = HawqStop(opts, hawq_dict)
instance.run()
return None
def hawq_init(opts, hawq_dict):
instance = HawqInit(opts, hawq_dict)
instance.run()
return None
def hawq_activate_standby(opts, hawq_dict):
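# Activation sequence: stop the old master (fast, then immediate as a
# fallback), stop all segments and the standby syncmaster, point
# hawq_master_address_host at the old standby and remove
# hawq_standby_address_host, start the new master in master-only mode,
# remove the standby entry from the catalog, then restart the cluster.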
old_master_host_name = hawq_dict['hawq_master_address_host']
hawq_master_directory = hawq_dict['hawq_master_directory']
if 'hawq_standby_address_host' in hawq_dict:
if hawq_dict['hawq_standby_address_host'].lower() not in ['none', '', 'localhost']:
old_standby_host_name = hawq_dict['hawq_standby_address_host']
new_master_host_name = hawq_dict['hawq_standby_address_host']
logger.info("Starting to activate standby master '%s'" % old_standby_host_name)
else:
logger.error("No valid standby host name found, skip activate standby")
sys.exit(1)
else:
logger.error("No valid standby host name found, skip activate standby")
sys.exit(1)
# Try to stop hawq cluster before doing standby activate.
if check_postgres_running(hawq_master_directory, '', old_master_host_name, logger):
logger.info("Try to stop hawq master before activate standby")
cmd = "%s; hawq_ctl stop master -a -M fast -q;" % source_hawq_env
result = remote_ssh(cmd, old_master_host_name, '')
if result != 0:
logger.error("Stop master failed, try again with immediate mode")
cmd = "%s; hawq_ctl stop master -a -M immediate -q;" % source_hawq_env
return_result = remote_ssh(cmd, old_master_host_name, '')
if return_result != 0:
logger.error("Stop master failed, abort")
logger.error("Please manually bring hawq cluster down, then do activate standby again")
sys.exit(1)
logger.info("Master is stopped")
else:
logger.info("HAWQ master is not running, skip")
ignore_bad_hosts = '--ignore-bad-hosts' if opts.ignore_bad_hosts else ''
logger.info("Stopping all the running segments")
cmd = "%s; hawq_ctl stop allsegments -a -M fast -q %s;" % (source_hawq_env, ignore_bad_hosts)
result = remote_ssh(cmd, old_standby_host_name, '')
if result != 0:
logger.error("Stop segments failed, abort")
logger.error("Please manually bring hawq cluster down, then do activate standby again")
sys.exit(1)
logger.info("Stopping running standby")
if check_syncmaster_running(hawq_master_directory, '', old_standby_host_name, logger):
cmd = "%s; hawq_ctl stop standby -a -M fast -q;" % source_hawq_env
result = remote_ssh(cmd, old_standby_host_name, '')
if result != 0:
logger.error("Stop standby failed, abort")
logger.error("Please manually bring hawq cluster down, then do activate standby again")
sys.exit(1)
else:
logger.info("Standby master is not running, skip")
# Set current standby host name as the new master host name in configuration.
logger.info("Update master host name in hawq-site.xml")
cmd = "%s; hawqconfig -c hawq_master_address_host -v %s --skipvalidation -q %s" % (source_hawq_env, hawq_dict['hawq_standby_address_host'], ignore_bad_hosts)
check_return_code(remote_ssh(cmd, old_standby_host_name, ''), logger, "Set hawq_master_address_host failed")
# Remove the old standby host configuration from hawq-site.xml.
logger.info("Remove current standby from hawq-site.xml")
cmd = "%s; hawqconfig -r hawq_standby_address_host --skipvalidation -q %s" % (source_hawq_env, ignore_bad_hosts)
check_return_code(remote_ssh(cmd, old_standby_host_name, ''), logger, "Remove hawq_standby_address_host from configuration failed")
cmd = '''echo "gp_persistent_repair_global_sequence = true" >> %s/%s''' % (hawq_dict['hawq_master_directory'], 'postgresql.conf')
check_return_code(remote_ssh(cmd, old_standby_host_name, ''), logger, "Set gp_persistent_repair_global_sequence = true failed")
# Start the new master in master only mode.
logger.info("Start master in master only mode")
cmd = "%s; hawq_ctl start master --masteronly -q" % source_hawq_env
check_return_code(remote_ssh(cmd, new_master_host_name, ''), logger, "Start master in master only mode failed")
# Remove the old standby information in database.
logger.info("Remove current standby from catalog")
cmd = "%s; env PGOPTIONS=\\\"-c gp_session_role=utility\\\" psql -p %s -d template1 -o /dev/null -c \\\"select gp_remove_master_standby()\
where (select count(*) from gp_segment_configuration where role='s') = 1;\\\"" % (source_hawq_env, hawq_dict['hawq_master_address_port'])
result = remote_ssh(cmd, new_master_host_name, '')
# Try to restart hawq cluster.
logger.info("Stop hawq master")
cmd = "%s; hawq_ctl stop master -a -M fast -q" % source_hawq_env
check_return_code(remote_ssh(cmd, new_master_host_name, ''), logger, "Stop master failed")
logger.info("Start hawq cluster")
cmd = "%s; hawq_ctl start master" % source_hawq_env
check_return_code(remote_ssh(cmd, new_master_host_name, ''), logger, "Start master failed")
cmd = "%s; hawq_ctl start allsegments %s" % (source_hawq_env, ignore_bad_hosts)
check_return_code(remote_ssh(cmd, new_master_host_name, ''), logger, "Start all the segments failed")
cmd = '''sed -i "/gp_persistent_repair_global_sequence/d" %s/%s''' % (hawq_dict['hawq_master_directory'], 'postgresql.conf')
check_return_code(remote_ssh(cmd, new_master_host_name, ''))
logger.info("HAWQ activate standby successfully")
return None
def restart_hawq(opts, hawq_dict):
logger.info("Restarting hawq:")
stop_hawq(opts, hawq_dict)
start_hawq(opts, hawq_dict)
return None
def create_parser():
parser = OptionParser(usage="HAWQ management scripts options")
parser.add_option("-a", "--prompt",
action="store_false",
dest="prompt",
default=True,
help="Execute automatically")
parser.add_option("-M", "--mode",
choices=['smart', 'immediate', 'fast'],
dest="stop_mode",
default="smart",
help="HAWQ stop mode: smart/fast/immediate")
parser.add_option("-v", "--verbose",
action="store_true",
dest="verbose",
help="Execute with verbose output")
parser.add_option("-q", "--quiet",
action="store_true",
dest="quiet_run",
help="Execute in quiet mode")
parser.add_option("-l", "--logdir",
dest="log_dir",
help="Sets the directory for log files")
parser.add_option("-t", "--timeout",
dest="timeout_seconds",
default="600",
help="Set the timeout seconds, default is 600")
parser.add_option("--user",
dest="user",
default="",
help="Sets HAWQ user")
parser.add_option("-u", "--reload",
dest="hawq_reload",
action="store_true",
default=False,
help="Reload hawq configuration")
parser.add_option("-m", "--masteronly",
dest="masteronly",
action="store_true",
default=False,
help="Start hawq in utility mode")
parser.add_option("-U", "--special-mode",
choices=['upgrade', 'maintenance'],
dest="special_mode",
help="Start hawq in upgrade/maintenance mode")
parser.add_option("-R", "--restrict",
dest="restrict",
action="store_true",
default=False,
help="Start hawq in restrict mode")
parser.add_option('-r', '--remove-standby', action='store_true',
dest='remove_standby', default=False,
help='Delete hawq standby master node.')
parser.add_option('-s', '--standby-host', type='string',
dest='new_standby_hostname', default='none',
help='Hostname of system to create standby master on')
parser.add_option('-n', '--no-update', action='store_true',
dest='no_update', default=False,
help='Do not update system catalog tables.')
parser.add_option('-i', '--ignore-bad-hosts',
dest='ignore_bad_hosts', action='store_true',
default=False,
help='Skips syncing configuration files on hosts on which SSH fails')
parser.add_option("--bucket_number",
type="int",
dest="default_hash_table_bucket_number",
help="Sets maximum number of virtual segments per node")
parser.add_option("--tde_keyname",
dest="tde_keyname",
default="",
help="Sets the encryption zone key(EZK) name for the hawq directory(hawq_dfs_url)")
parser.add_option("--locale",
dest="hawq_locale",
default="en_US.utf8",
help="Sets the locale name")
parser.add_option("--lc-collate",
dest="hawq_lc_collate",
default="en_US.utf8",
help="Sets the string sort order")
parser.add_option("--lc-ctype",
dest="hawq_lc_ctype",
default="en_US.utf8",
help="Sets character classification")
parser.add_option("--lc-messages",
dest="hawq_lc_messages",
default="en_US.utf8",
help="Sets the language in which messages are displayed")
parser.add_option("--lc-monetary",
dest="hawq_lc_monetary",
default="en_US.utf8",
help="Sets the locale to use for formatting monetary amounts")
parser.add_option("--lc-numeric",
dest="hawq_lc_numeric",
default="en_US.utf8",
help="Sets the locale to use for formatting numbers")
parser.add_option("--lc-time",
dest="hawq_lc_time",
default="en_US.utf8",
help="Sets the locale to use for formatting dates and times")
parser.add_option("--max_connections",
dest="max_connections",
default="1280",
help="Sets the max_connections for formatting hawq database")
parser.add_option("--shared_buffers",
dest="shared_buffers",
default="128000kB",
help="Sets the shared_buffers for formatting hawq database")
parser.add_option("--with_magma",
action='store_true',
dest="with_magma",
default=False,
help="Start hawq cluster with magma")
(options, args) = parser.parse_args()
if len(args) == 0:
parser.print_help()
sys.exit(1)
return (options, args)
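# Typical invocations (sketch): 'hawq start cluster', 'hawq stop master -M
# fast', 'hawq init standby -s <host>'. The <command> <object> pair arrives
# as the two positional arguments validated in get_args().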
def local_run(cmd):
'''Execute shell command on local machine.'''
result = subprocess.Popen(cmd, shell=True).wait()
return result
if __name__ == '__main__':
(opts, hawq_dict) = get_args()
if opts.with_magma and opts.node_type != 'cluster':
logger.error("option --with_magma is only compatible with cluster")
sys.exit(1)
source_hawq_env = ". %s/greenplum_path.sh" % opts.GPHOME
if opts.hawq_command == 'start':
start_hawq(opts, hawq_dict)
if opts.with_magma:
cmd = "%s; magma start %s" % (source_hawq_env, 'cluster')
check_return_code(local_run(cmd), logger, "hawq start with magma failed")
elif opts.hawq_command == 'stop':
if opts.prompt:
if not userinput.ask_yesno(None, "\nContinue with HAWQ service stop", 'N'):
sys.exit(1)
stop_hawq(opts, hawq_dict)
if opts.with_magma :
cmd = "%s; magma stop %s" % (source_hawq_env, 'cluster')
check_return_code(local_run(cmd), logger, "hawq stop with magma failed")
elif opts.hawq_command == 'restart':
restart_hawq(opts, hawq_dict)
if opts.with_magma :
cmd = "%s; magma restart %s" % (source_hawq_env, 'cluster')
check_return_code(local_run(cmd), logger, "hawq restart with magma failed")
elif opts.hawq_command == 'init':
if opts.node_type == 'standby':
warn_msg = "\nMaster will get restarted, continue with standby init"
else:
warn_msg = "\nContinue with HAWQ init"
if opts.prompt:
if not userinput.ask_yesno(None, warn_msg, 'N'):
sys.exit(1)
hawq_init(opts, hawq_dict)
if opts.with_magma :
cmd = "%s; magma start %s" % (source_hawq_env, 'cluster')
check_return_code(local_run(cmd), logger, "hawq init with magma failed")
elif opts.hawq_command == 'activate':
if opts.prompt:
if not userinput.ask_yesno(None, "\nContinue with HAWQ standby master activate", 'N'):
sys.exit(1)
hawq_activate_standby(opts, hawq_dict)
else:
print COMMON_HELP