blob: ae853bd948c270efd489ce6e1809135059fe2233 [file] [log] [blame]
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
try:
import os
import sys
import re
import logging, time
import subprocess
import threading
import Queue
import signal
import getpass
from optparse import OptionParser
from gppylib.gplog import setup_hawq_tool_logging, quiet_stdout_logging, enable_verbose_logging
from gppylib.commands.unix import getLocalHostname, getUserName
from gppylib.commands import gp
from gppylib.commands.gp import SEGMENT_TIMEOUT_DEFAULT
from gppylib import userinput
from gppylib.db import catalog
from gppylib.commands import unix
from hawqpylib.hawqlib import *
from hawqpylib.HAWQ_HELP import *
from gppylib.db import dbconn
from pygresql.pg import DatabaseError
except ImportError, e:
sys.exit('ERROR: Cannot import modules. Please check that you '
'have sourced greenplum_path.sh. Detail: ' + str(e))
class HawqInit:
    """Implements 'hawq init' for master, standby, segment and cluster targets.

    Validates required hawq-site.xml settings, persists them to
    $GPHOME/etc/_mgmt_config for the shell-side helpers, and invokes
    $GPHOME/bin/lib/hawqinit.sh on the relevant hosts.

    NOTE(review): depends on module-level names supplied elsewhere in this
    file (logger, log_filename, source_hawq_env) and on the helpers pulled
    in via the hawqpylib star imports (local_ssh, remote_ssh,
    check_return_code, parse_hosts_file, ...).
    """

    def __init__(self, opts, hawq_dict):
        """Cache option values, then validate config, write _mgmt_config
        and collect host IP addresses.

        :param opts:      OptionParser result from main()
        :param hawq_dict: parsed key/value pairs from hawq-site.xml
        """
        self.node_type = opts.node_type
        self.hawq_command = opts.hawq_command
        self.user = opts.user
        self.GPHOME = opts.GPHOME
        self.stop_mode = opts.stop_mode
        self.log_dir = opts.log_dir
        self.timeout = opts.timeout_seconds
        self.no_update = opts.no_update
        self.remove_standby = opts.remove_standby
        self.new_standby_hostname = opts.new_standby_hostname
        self.hawq_dict = hawq_dict
        self.locale = opts.hawq_locale
        self.quiet = opts.quiet_run
        self.hawq_lc_collate = opts.hawq_lc_collate
        self.hawq_lc_ctype = opts.hawq_lc_ctype
        self.hawq_lc_messages = opts.hawq_lc_messages
        self.hawq_lc_monetary = opts.hawq_lc_monetary
        self.hawq_lc_numeric = opts.hawq_lc_numeric
        self.hawq_lc_time = opts.hawq_lc_time
        self.max_connections = opts.max_connections
        self.shared_buffers = opts.shared_buffers
        self.default_hash_table_bucket_number = opts.default_hash_table_bucket_number
        self.tde_keyname = opts.tde_keyname
        self.lock = threading.Lock()
        self.ignore_bad_hosts = opts.ignore_bad_hosts
        self._get_config()
        self._write_config()
        self._get_ips()

    def _get_config(self):
        """Validate mandatory hawq-site.xml items and cache them on self.

        Exits the process when a mandatory item is missing or when the
        standby host clashes with the master host.
        """
        check_items = ('hawq_master_address_host', 'hawq_master_address_port',
                       'hawq_master_directory', 'hawq_segment_directory',
                       'hawq_segment_address_port', 'hawq_dfs_url',
                       'hawq_master_temp_directory', 'hawq_segment_temp_directory')
        if self.node_type in ['master', 'segment', 'standby']:
            for item in check_items:
                if item in self.hawq_dict:
                    logger.info("Check: %s is set" % item)
                else:
                    sys.exit("Check: %s not configured in hawq-site.xml" % item)
        self.master_host_name = self.hawq_dict['hawq_master_address_host']
        self.master_port = self.hawq_dict['hawq_master_address_port']
        self.master_data_directory = self.hawq_dict['hawq_master_directory']
        self.master_address = self.master_host_name + ":" + self.master_port
        self.segment_data_directory = self.hawq_dict['hawq_segment_directory']
        self.segment_port = self.hawq_dict['hawq_segment_address_port']
        self.hawq_master_temp_directory = self.hawq_dict['hawq_master_temp_directory']
        self.hawq_segment_temp_directory = self.hawq_dict['hawq_segment_temp_directory']
        self.dfs_url = self.hawq_dict['hawq_dfs_url']
        self.host_list = parse_hosts_file(self.GPHOME)
        self.hosts_count_number = len(self.host_list)
        if 'hawq_standby_address_host' in self.hawq_dict:
            self.standby_host_name = self.hawq_dict['hawq_standby_address_host']
            # Standby always listens on the master port.
            self.standby_port = self.master_port
            self.standby_address = self.standby_host_name + ":" + self.standby_port
            if self.standby_host_name in (self.master_host_name, 'localhost', '127.0.0.1'):
                logger.error("Standby host should not be the same as master host")
                sys.exit(1)
        else:
            self.standby_host_name = ''
        # Command-line value wins; otherwise fall back to hawq-site.xml.
        if not self.default_hash_table_bucket_number and 'default_hash_table_bucket_number' in self.hawq_dict:
            self.default_hash_table_bucket_number = self.hawq_dict['default_hash_table_bucket_number']
        # 'hawq init standby -s <host>' overrides the configured standby.
        if self.new_standby_hostname != 'none':
            self.standby_host_name = self.new_standby_hostname
        if self.standby_host_name.lower() in ('', 'none'):
            logger.info("No standby host configured, skip it")
        if 'enable_secure_filesystem' in self.hawq_dict:
            self.enable_secure_filesystem = self.hawq_dict['enable_secure_filesystem']
        else:
            self.enable_secure_filesystem = 'off'
        if 'krb_server_keyfile' in self.hawq_dict:
            self.krb_server_keyfile = self.hawq_dict['krb_server_keyfile']
        else:
            self.krb_server_keyfile = ''
        if 'krb_srvname' in self.hawq_dict:
            self.krb_srvname = self.hawq_dict['krb_srvname']
        else:
            self.krb_srvname = 'postgres'

    def _write_config(self):
        """Write the settings consumed by the lib/*.sh shell helpers.

        Opening with mode 'w' truncates any stale file left by a
        previous run before everything is written out.
        """
        configFile = "%s/etc/_mgmt_config" % self.GPHOME
        with open(configFile, 'w') as f:
            f.write("GPHOME=%s\n" % self.GPHOME)
            f.write("hawqUser=%s\n" % self.user)
            f.write("master_host_name=%s\n" % self.master_host_name)
            f.write("master_port=%s\n" % self.master_port)
            f.write("master_data_directory=%s\n" % self.master_data_directory)
            f.write("segment_data_directory=%s\n" % self.segment_data_directory)
            f.write("segment_port=%s\n" % self.segment_port)
            f.write("hawq_master_temp_directory=%s\n" % self.hawq_master_temp_directory)
            f.write("hawq_segment_temp_directory=%s\n" % self.hawq_segment_temp_directory)
            f.write("locale=%s\n" % self.locale)
            f.write("hawq_lc_collate=%s\n" % self.hawq_lc_collate)
            f.write("hawq_lc_ctype=%s\n" % self.hawq_lc_ctype)
            f.write("hawq_lc_messages=%s\n" % self.hawq_lc_messages)
            f.write("hawq_lc_monetary=%s\n" % self.hawq_lc_monetary)
            f.write("hawq_lc_numeric=%s\n" % self.hawq_lc_numeric)
            f.write("hawq_lc_time=%s\n" % self.hawq_lc_time)
            f.write("max_connections=%s\n" % self.max_connections)
            f.write("shared_buffers=%s\n" % self.shared_buffers)
            f.write("dfs_url=%s\n" % self.dfs_url)
            # log_filename is a module-level global set up in main().
            f.write("log_filename=%s\n" % log_filename)
            f.write("log_dir=%s\n" % self.log_dir)
            if self.standby_host_name.lower() not in ('', 'none'):
                f.write("standby_host_name=%s\n" % self.standby_host_name)

    def _get_ips(self):
        """Resolve master (and standby, if any) IP addresses via the helper script."""
        cmd = "%s/bin/lib/get_ip_addresses_of_host.sh %s master_ip_address_all" % (self.GPHOME, self.master_host_name)
        local_ssh(cmd, logger)
        if self.standby_host_name.lower() not in ('', 'none') and not self.remove_standby:
            cmd = "%s/bin/lib/get_ip_addresses_of_host.sh %s standby_ip_address_all" % (self.GPHOME, self.standby_host_name)
            local_ssh(cmd, logger)

    def check_hdfs_path(self):
        """Verify the configured HDFS URL is usable (optionally with TDE).

        Exits via check_return_code() when gpcheckhdfs fails.
        """
        if self.tde_keyname:
            cmd = "%s/bin/gpcheckhdfs hdfs %s %s %s %s --with-tde %s" % \
                  (self.GPHOME, self.dfs_url, self.enable_secure_filesystem, \
                   self.krb_srvname, self.krb_server_keyfile, self.tde_keyname)
        else:
            cmd = "%s/bin/gpcheckhdfs hdfs %s %s %s %s" % \
                  (self.GPHOME, self.dfs_url, self.enable_secure_filesystem, \
                   self.krb_srvname, self.krb_server_keyfile)
        logger.info("Check if hdfs path is available")
        logger.debug("Check hdfs: %s" % cmd)
        check_return_code(local_ssh(cmd, logger, warning = True), logger, "Check hdfs failed, please verify your hdfs settings")
        if self.tde_keyname:
            logger.info("Create encryption zone successfully, key_name:%s" % (self.tde_keyname))

    def set_new_standby_host(self):
        """Record the new standby host name in hawq-site.xml via 'hawq config'.

        :return: exit status of the 'hawq config' invocation (0 on success)
        """
        if self.new_standby_hostname == self.master_host_name:
            logger.error("Standby host name can't be the same as master host name")
            sys.exit(1)
        cmd = "%s; hawq config -c hawq_standby_address_host -v %s --skipvalidation -q > /dev/null" % \
              (source_hawq_env, self.new_standby_hostname)
        result = local_ssh(cmd, logger)
        if result != 0:
            logger.warn("Set standby host name failed")
        return result

    def sync_hdfs_client(self):
        """Push etc/hdfs-client.xml to every segment (and the standby) via 'hawq scp'.

        Exits the process when the copy fails.
        """
        sync_host_str = ""
        for node in self.host_list:
            sync_host_str += " -h %s" % node
        if 'hawq_standby_address_host' in self.hawq_dict and self.standby_host_name.lower() not in ('', 'none'):
            sync_host_str += " -h %s" % self.standby_host_name
        # Pass logger for consistency with every other local_ssh call here.
        result = local_ssh("hawq scp %s %s/etc/hdfs-client.xml =:%s/etc/" % (sync_host_str, self.GPHOME, self.GPHOME), logger)
        if result != 0:
            logger.error("Sync hdfs-client.xml failed.")
            sys.exit(1)

    def set_replace_datanode_on_failure(self):
        """Tune output.replace-datanode-on-failure by cluster size.

        HDFS pipeline recovery needs spare datanodes; clusters smaller than
        the threshold disable replacement, larger ones enable it.

        :return: 0 when the property already matches or was updated
                 successfully, 1 when the post-sync validation fails
        """
        xml_file = "%s/etc/hdfs-client.xml" % self.GPHOME
        property_name = 'output.replace-datanode-on-failure'
        pexist, pname, pvalue = check_property_exist_xml(xml_file, property_name)
        datanodes_threshold = 4
        if self.hosts_count_number < datanodes_threshold:
            property_value = 'false'
        else:
            property_value = 'true'
        if pvalue != property_value:
            logger.info("Set output.replace-datanode-on-failure to %s" % property_value)
            # update_xml_property handles both the add and the update case.
            if pexist:
                logger.debug("Update output.replace-datanode-on-failure to %s" % property_value)
            else:
                logger.debug("Add output.replace-datanode-on-failure as %s" % property_value)
            update_xml_property(xml_file, property_name, property_value)
            self.sync_hdfs_client()
            # Check hdfs-client.xml again to validate changes
            pexist, pname, pvalue = check_property_exist_xml(xml_file, property_name)
            if pvalue == property_value:
                result = 0
            else:
                result = 1
        else:
            result = 0
        return result

    def set_default_hash_table_bucket_number(self):
        """Derive (when not supplied) and persist default_hash_table_bucket_number.

        The bucket count is segments * factor, with factor clamped to
        [1, 6] and the whole value capped by hawq_rm_nvseg_perquery_limit.

        :return: exit status of the 'hawq config' invocation (0 on success)
        """
        if not self.default_hash_table_bucket_number:
            if 'hawq_rm_nvseg_perquery_limit' in self.hawq_dict:
                hawq_rm_nvseg_perquery_limit = self.hawq_dict['hawq_rm_nvseg_perquery_limit']
            else:
                hawq_rm_nvseg_perquery_limit = 512
            factor_min = 1
            factor_max = 6
            limit = int(hawq_rm_nvseg_perquery_limit)
            if int(self.hosts_count_number) == 0:
                segments_num = 1
            else:
                segments_num = int(self.hosts_count_number)
            # Integer division on purpose: factor is a whole multiplier.
            factor = limit // segments_num
            # if too many segments or default limit is too low --> stick with the limit
            if factor < factor_min:
                buckets = limit
            # if the limit is large and results in factor > max --> limit factor to max
            elif factor > factor_max:
                buckets = factor_max * segments_num
            else:
                buckets = factor * segments_num
            self.default_hash_table_bucket_number = buckets
        logger.info("Set default_hash_table_bucket_number as: %s" % self.default_hash_table_bucket_number)
        # BUG FIX: previously read the module-level global 'opts' instead of
        # the value captured in __init__, breaking non-script callers.
        ignore_bad_hosts = '--ignore-bad-hosts' if self.ignore_bad_hosts else ''
        cmd = "hawq config -c default_hash_table_bucket_number -v %s --skipvalidation -q %s > /dev/null" % \
              (self.default_hash_table_bucket_number, ignore_bad_hosts)
        result = local_ssh(cmd, logger)
        if result != 0:
            logger.error("Set default_hash_table_bucket_number failed")
        return result

    def _get_master_init_cmd(self):
        """Build the shell command that initializes the master node."""
        cmd = "%s/bin/lib/hawqinit.sh master '%s'" % \
              (self.GPHOME, self.GPHOME)
        return cmd

    def _get_standby_init_cmd(self):
        """Build the shell command that initializes the standby master."""
        cmd = "%s/bin/lib/hawqinit.sh standby '%s'" % \
              (self.GPHOME, self.GPHOME)
        return cmd

    def hawq_remove_standby(self):
        """Removes the standby master.

        Looks up the running standby in gp_segment_configuration, restarts
        the cluster in utility mode to drop it from the catalog, scrubs
        hawq-site.xml and (when reachable) cleans the standby's data files.
        """
        running_standby_host = ''
        try:
            dburl = dbconn.DbURL(port=self.master_port, username=self.user, dbname='template1')
            conn = dbconn.connect(dburl, True)
            query = "select role, hostname from gp_segment_configuration where role = 's';"
            rows = dbconn.execSQL(conn, query)
            conn.close()
        except DatabaseError as ex:
            logger.error("Failed to connect to database, this script can only be run when the database is up")
            sys.exit(1)
        for row in rows:
            if row[0] == 's':
                running_standby_host = row[1]
        if running_standby_host:
            logger.info("running standby host is %s" % running_standby_host)
            # Ignore Ctrl-C while the cluster is in a half-stopped state.
            signal.signal(signal.SIGINT,signal.SIG_IGN)
            logger.info("Stop HAWQ cluster")
            cmd = "%s; hawq stop master -a -M fast -q" % source_hawq_env
            check_return_code(local_ssh(cmd, logger), logger, "Stop HAWQ master failed, exit")
            ignore_bad_hosts = '--ignore-bad-hosts' if self.ignore_bad_hosts else ''
            cmd = "%s; hawq stop allsegments -a -q %s" % (source_hawq_env, ignore_bad_hosts)
            check_return_code(local_ssh(cmd, logger), logger, "Stop HAWQ segments failed, exit")
            logger.info("Start HAWQ master")
            # Master-only utility mode so the catalog can be updated safely.
            cmd = "%s; hawq start master -m -q" % source_hawq_env
            check_return_code(local_ssh(cmd, logger), logger, "Start HAWQ master failed, exit")
            try:
                logger.info('Remove standby from Database catalog.')
                cmd = 'env PGOPTIONS="-c gp_session_role=utility" %s/bin/psql -p %s -d template1 -o /dev/null -c \
                      "select gp_remove_master_standby();"' % (self.GPHOME, self.master_port)
                check_return_code(local_ssh(cmd, logger), logger, \
                                  "Update catalog failed, exit", "Catalog updated successfully.")
                logger.info("Stop HAWQ master")
                cmd = "%s; hawq stop master -a -M fast" % source_hawq_env
                check_return_code(local_ssh(cmd, logger), logger, "Stop hawq master failed, exit")
            except DatabaseError as ex:
                logger.error("Failed to connect to database, this script can only be run when the database is up")
                cmd = "%s; hawq stop master -a -M fast" % source_hawq_env
                check_return_code(local_ssh(cmd, logger), logger, "Stop hawq master failed, exit")
            remove_property_xml("hawq_standby_address_host", "%s/etc/hawq-site.xml" % self.GPHOME, self.quiet)
            host_list = parse_hosts_file(self.GPHOME)
            sync_hawq_site(self.GPHOME, host_list)
            if is_node_alive(self.standby_host_name, self.user, logger):
                logger.info("Check if gpsyncmaster running on %s" % running_standby_host)
                gpsyncmaster_pid = gp.getSyncmasterPID(running_standby_host, self.master_data_directory)
                if gpsyncmaster_pid > 0:
                    # stop it
                    logger.info('Stopping gpsyncmaster on %s' % running_standby_host)
                    gp.SegmentStop.remote('stop gpsyncmaster',
                                          running_standby_host,
                                          self.master_data_directory)
                # Wipe data and temp directories on the old standby.
                tmp_dir_list = self.hawq_master_temp_directory.replace(',', ' ')
                logger.debug("rm -rf %s/* %s/*" % (self.master_data_directory, tmp_dir_list))
                cmd = "rm -rf %s/* %s/*" % (self.master_data_directory, tmp_dir_list)
                result = remote_ssh(cmd, self.standby_host_name, self.user)
                if result != 0:
                    logger.warn('Remove data files on standby master failed')
            else:
                logger.warn('Not able to connect to Standby master, skip node clean')
            signal.signal(signal.SIGINT,signal.default_int_handler)
            logger.info('Remove standby master finished')
        else:
            logger.info("Do not find a running standby master")

    def _check_master_recovery(self):
        """Return 1 when the master is mid-recovery (standby init must abort), else 0."""
        cmd = "%s; %s/bin/pg_controldata %s |grep 'Database cluster state';" % \
              (source_hawq_env, self.GPHOME, self.master_data_directory)
        result, stdout, stderr = remote_ssh_output(cmd, self.master_host_name, '')
        if stdout.find("recovery") != -1:
            logger.info('Master is doing recovery start, abort init standby')
            return 1
        return 0

    def _init_standby(self):
        """Sync config files to the standby host and kick off its init script.

        :return: value propagated from check_return_code on the remote init
        """
        logger.info("Start to init standby master: '%s'" % self.standby_host_name)
        logger.info("This might take a couple of minutes, please wait...")
        check_return_code(self._check_master_recovery())
        # Sync config files from master.
        scpcmd = "scp %s/etc/_mgmt_config %s:%s/etc/_mgmt_config > /dev/null" % \
                 (self.GPHOME, self.standby_host_name, self.GPHOME)
        check_return_code(local_ssh(scpcmd, logger, warning = True), \
                          logger, "Sync _mgmt_config failed")
        scpcmd = "scp %s/etc/hawq-site.xml %s:%s/etc/hawq-site.xml > /dev/null" % \
                 (self.GPHOME, self.standby_host_name, self.GPHOME)
        check_return_code(remote_ssh(scpcmd, self.master_host_name, self.user), \
                          logger, "Sync hawq-site.xml failed")
        scpcmd = "scp %s/etc/slaves %s:%s/etc/slaves > /dev/null" % \
                 (self.GPHOME, self.standby_host_name, self.GPHOME)
        check_return_code(remote_ssh(scpcmd, self.master_host_name, self.user), \
                          logger, "Sync slaves file failed")
        # Sync rps configuration
        if os.path.exists("%s/ranger/" % (self.GPHOME)):
            scpcmd = "scp %s/ranger/etc/* %s:%s/ranger/etc/ > /dev/null" % \
                     (self.GPHOME, self.standby_host_name, self.GPHOME)
            check_return_code(remote_ssh(scpcmd, self.master_host_name, self.user), \
                              logger, "Sync rps configuration files failed")
        standby_init_cmd = self._get_standby_init_cmd()
        return check_return_code(remote_ssh_nowait(standby_init_cmd, self.standby_host_name, self.user))

    def _resync_standby(self):
        """Force-resync the standby by stopping the master and rsyncing its datadir.

        :return: exit status of the final 'hawq start master' (0 on success)
        """
        logger.info("Re-sync standby")
        cmd = "%s; hawq stop master -a -M %s;" % (source_hawq_env, self.stop_mode)
        check_return_code(local_ssh(cmd, logger), logger, "Stop hawq cluster failed, exit")
        cmd = "cd %s; %s; %s/bin/lib/pysync.py -x gpperfmon/data -x pg_log -x db_dumps %s %s:%s" % \
              (self.master_data_directory, source_hawq_env, self.GPHOME, self.master_data_directory,
               self.standby_host_name, self.master_data_directory)
        result = local_ssh(cmd, logger)
        check_return_code(result, logger, "Re-sync standby master failed, exit")
        cmd = "%s; hawq start master -a" % source_hawq_env
        result = local_ssh(cmd, logger)
        check_return_code(result, logger, "Start hawq cluster failed")
        return result

    def _get_segment_init_cmd(self):
        """Build the shell command that initializes a segment node."""
        cmd = "%s/bin/lib/hawqinit.sh segment '%s'" % \
              (self.GPHOME, self.GPHOME)
        return cmd

    def _init_cluster(self):
        """Init master, optional standby and all segments; exit on any failure."""
        logger.info("%s segment hosts defined" % self.hosts_count_number)
        check_return_code(self.set_default_hash_table_bucket_number())
        check_return_code(self.set_replace_datanode_on_failure())
        master_cmd = self._get_master_init_cmd()
        logger.info("Start to init master node: '%s'" % self.master_host_name)
        check_return_code(local_ssh(master_cmd, logger, warning = True), logger, \
                          "Master init failed, exit", \
                          "Master init successfully")
        if self.standby_host_name.lower() not in ('', 'none'):
            check_return_code(self._init_standby(), logger, \
                              "Init standby failed, exit", \
                              "Init standby successfully")
        check_return_code(self._init_all_segments(), logger, \
                          "Segments init failed, exit", \
                          "Segments init successfully on nodes '%s'" % self.host_list)
        logger.info("Init HAWQ cluster successfully")

    def _init_all_segments(self):
        """Run the segment init script on every host in parallel threads.

        :return: aggregated non-zero flag from HawqCommands when any host fails
        """
        segment_cmd_str = self._get_segment_init_cmd()
        # Execute segment init command on each segment nodes.
        logger.info("Init segments in list: %s" % self.host_list)
        work_list = []
        q = Queue.Queue()
        for host in self.host_list:
            logger.debug("Start to init segment on node '%s'" % host)
            scpcmd = "scp %s/etc/_mgmt_config %s:%s/etc/_mgmt_config > /dev/null" % (self.GPHOME, host, self.GPHOME)
            local_ssh(scpcmd, logger)
            work_list.append({"func":remote_ssh,"args":(segment_cmd_str, host, self.user, q)})
        logger.info("Total segment number is: %s" % len(self.host_list))
        # Progress reporter consumes the queue the workers feed.
        work_list.append({"func":check_progress,"args":(q, self.hosts_count_number, 'init', 0, self.quiet)})
        node_init = HawqCommands(name='HAWQ', action_name = 'init', logger = logger)
        node_init.get_function_list(work_list)
        node_init.start()
        return node_init.return_flag

    def run(self):
        """Dispatch to the init routine selected by --node-type."""
        if self.new_standby_hostname != 'none':
            check_return_code(self.set_new_standby_host())
        if self.node_type == "master":
            self.check_hdfs_path()
            logger.info("%s segment hosts defined" % self.hosts_count_number)
            check_return_code(self.set_default_hash_table_bucket_number())
            check_return_code(self.set_replace_datanode_on_failure())
            logger.info("Start to init master")
            cmd = self._get_master_init_cmd()
            check_return_code(local_ssh(cmd, logger, warning = True), logger, \
                              "Master init failed, exit", "Master init successfully")
        elif self.node_type == "standby" and self.remove_standby is True:
            logger.info("Try to remove standby master")
            self.hawq_remove_standby()
        elif self.node_type == "standby":
            if self.standby_host_name.lower() in ('', 'none'):
                logger.info("No standby host found")
                logger.info("Please check your standby host name")
                sys.exit(1)
            if self.no_update:
                check_return_code(self._resync_standby(), logger, \
                                  "Standby master re-sync failed, exit", \
                                  "Standby master re-sync successfully")
            else:
                check_return_code(self._init_standby(), logger, \
                                  "Init standby failed, exit", \
                                  "Init standby successfully")
        elif self.node_type == "segment":
            cmd = self._get_segment_init_cmd()
            check_return_code(local_ssh(cmd, logger, warning = True), logger, \
                              "Segment init failed, exit", \
                              "Segment init successfully")
        elif self.node_type == "cluster":
            self.check_hdfs_path()
            self._init_cluster()
        else:
            sys.exit('hawq init object should be one of master/standby/segment/cluster')
        return None
class HawqStart:
    """Implements 'hawq start' for master, standby, segment, allsegments and cluster.

    Builds pg_ctl / gpsyncmaster command lines from hawq-site.xml settings
    and runs them locally or over ssh, optionally starting the Ranger
    plugin service first when hawq_acl_type is 'ranger'.

    NOTE(review): depends on module-level names supplied elsewhere in this
    file (logger, log_filename, source_hawq_env) and on the hawqpylib
    helpers pulled in via star imports.
    """

    def __init__(self, opts, hawq_dict):
        """Cache option values and validate the configuration.

        :param opts:      OptionParser result from main()
        :param hawq_dict: parsed key/value pairs from hawq-site.xml
        """
        self.node_type = opts.node_type
        self.hawq_command = opts.hawq_command
        self.user = opts.user
        self.GPHOME = opts.GPHOME
        self.quiet = opts.quiet_run
        self.stop_mode = opts.stop_mode
        self.log_dir = opts.log_dir
        self.timeout = opts.timeout_seconds
        self.hawq_dict = hawq_dict
        self.lock = threading.Lock()
        self.max_connections = opts.max_connections
        self.masteronly = opts.masteronly
        self.special_mode = opts.special_mode
        self.restrict = opts.restrict
        self.ignore_bad_hosts = opts.ignore_bad_hosts
        self._get_config()
        self.hawq_acl_type = self.check_hawq_acl_type()

    def _get_config(self):
        """Validate mandatory hawq-site.xml items and cache them on self.

        Exits the process when a mandatory item is missing.
        """
        logger.info("Gathering information and validating the environment...")
        check_items = ('hawq_master_address_host', 'hawq_master_address_port',
                       'hawq_master_directory', 'hawq_segment_directory',
                       'hawq_segment_address_port', 'hawq_dfs_url',
                       'hawq_master_temp_directory', 'hawq_segment_temp_directory')
        for item in check_items:
            if item not in self.hawq_dict:
                logger.error("Check: %s not configured in hawq-site.xml" % item)
                sys.exit()
        self.master_host_name = self.hawq_dict['hawq_master_address_host']
        self.master_port = self.hawq_dict['hawq_master_address_port']
        self.master_data_directory = self.hawq_dict['hawq_master_directory']
        self.master_address = self.master_host_name + ":" + self.master_port
        self.segment_data_directory = self.hawq_dict['hawq_segment_directory']
        self.segment_port = self.hawq_dict['hawq_segment_address_port']
        self.dfs_url = self.hawq_dict['hawq_dfs_url']
        self.host_list = parse_hosts_file(self.GPHOME)
        self.hosts_count_number = len(self.host_list)
        if 'hawq_standby_address_host' in self.hawq_dict:
            self.standby_host_name = self.hawq_dict['hawq_standby_address_host']
            # Standby always listens on the master port.
            self.standby_port = self.master_port
            self.standby_address = self.standby_host_name + ":" + self.standby_port
        else:
            logger.info("No standby host configured")
            self.standby_host_name = ''

    def check_hawq_acl_type(self):
        """Return 'ranger' when hawq-site.xml selects Ranger ACLs, else 'standalone'."""
        if 'hawq_acl_type' in self.hawq_dict and self.hawq_dict['hawq_acl_type'].lower() == 'ranger':
            return "ranger"
        return "standalone"

    def _check_recovery_start(self):
        """Classify the master's pg_controldata cluster state.

        :return: one of 'recoverying', 'starting_up', 'recovery_success',
                 'recovery_failed'
        """
        cmd = "%s; %s/bin/pg_controldata %s |grep 'Database cluster state';" % (source_hawq_env, self.GPHOME, self.master_data_directory)
        result, stdout, stderr = remote_ssh_output(cmd, self.master_host_name, '')
        if stdout.find("recovery") != -1:
            hawq_recovery_state = 'recoverying'
        elif stdout.find("starting up") != -1:
            hawq_recovery_state = 'starting_up'
        elif stdout.find("in production") != -1:
            hawq_recovery_state = 'recovery_success'
        else:
            hawq_recovery_state = 'recovery_failed'
        return hawq_recovery_state

    def _start_master_cmd(self):
        """Build the remote pg_ctl command that starts the master.

        Mode flags are mutually exclusive and chosen in priority order:
        masteronly (utility), upgrade, maintenance, restricted connections,
        then normal start.
        """
        logger.info("Start master service")
        if self.masteronly:
            start_options = "-i -M master -p %s --silent-mode=true -c gp_role=utility" % self.master_port
        elif self.special_mode == 'upgrade':
            start_options = "-i -M master -p %s --silent-mode=true -U" % self.master_port
        elif self.special_mode == 'maintenance':
            start_options = "-i -M master -p %s --silent-mode=true -m" % self.master_port
        elif self.restrict:
            start_options = "-i -M master -p %s --silent-mode=true -c superuser_reserved_connections=%s" % (self.master_port, self.max_connections)
        else:
            start_options = "-i -M master -p %s --silent-mode=true" % self.master_port
        cmd_str = "%s; %s/bin/pg_ctl start -w -t %s -D %s -l %s/pg_log/startup.log -o \\\"%s\\\" >> %s" \
                  % (source_hawq_env, self.GPHOME, self.timeout, self.master_data_directory, self.master_data_directory, start_options, log_filename)
        return cmd_str

    def start_master(self):
        """Start the master, waiting out WAL recovery, then check standby sync.

        :return: 0 on success, 1 when recovery fails
        """
        cmd = self._start_master_cmd()
        result = remote_ssh(cmd, self.master_host_name, self.user)
        if result != 0:
            hawq_recovery_state = self._check_recovery_start()
            if hawq_recovery_state == 'recoverying':
                recovery_sleep_time = 3
                logger.warn("Master is doing recovery, this might take minutes to hours")
                logger.warn("Please do not interrupt it and wait patient")
                sys.stdout.write("\r")
                # Poll with backoff (capped at ~60s) until recovery resolves.
                while True:
                    sys.stdout.write(".")
                    sys.stdout.flush()
                    time.sleep(recovery_sleep_time)
                    if recovery_sleep_time < 60:
                        recovery_sleep_time = recovery_sleep_time + 1
                    hawq_recovery_state = self._check_recovery_start()
                    if hawq_recovery_state == 'recovery_success':
                        result = 0
                        break
                    elif hawq_recovery_state == 'recovery_failed':
                        logger.error("HAWQ master recoverying failed")
                        result = 1
                        break
            else:
                # NOTE(review): any non-recovery state (including
                # 'recovery_failed' reported immediately) clears the pg_ctl
                # failure here — confirm this is intended before changing.
                result = 0
            sys.stdout.write("\n")
        if self.node_type == "cluster" and (self.standby_host_name.lower() not in ('', 'none')):
            logger.info("Checking if standby is synced with master")
            sync_result = self._check_standby_sync()
            if sync_result == 3:
                logger.warn("Standby is not synced as: Standby master too far behind")
                logger.warn("Use 'hawq init standby -n' to do force sync")
            elif sync_result == 2:
                check_standby_sync_count = 0
                # Re-check every 3 seconds, at most 20 times (~60s total).
                while self._check_standby_sync() == 2:
                    logger.info("Waiting standby to get synced for 3 seconds...")
                    time.sleep(3)
                    check_standby_sync_count += 1
                    if check_standby_sync_count > 20:
                        break
                sync_result = self._check_standby_sync()
                if sync_result != 0:
                    logger.warn("Standby is not synced after 60 seconds")
                    logger.warn("Use 'hawq init standby -n' to do force sync")
                else:
                    logger.info("Standby master is synced")
            else:
                logger.info("Standby master is synced")
        return result

    def _start_standby_cmd(self):
        """Build the remote command that launches gpsyncmaster on the standby."""
        logger.info("Start standby master service")
        cmd_str = "%s; %s/bin/gpsyncmaster -D %s -i -p %s >> %s 2>&1 &" \
                  % (source_hawq_env, self.GPHOME, self.master_data_directory, self.standby_port, log_filename)
        return cmd_str

    def start_standby(self):
        """Start gpsyncmaster on the standby and its watchdog (fire-and-forget).

        :return: status of launching hawqstandbywatch.py
        """
        cmd = self._start_standby_cmd()
        check_return_code(remote_ssh(cmd, self.standby_host_name, self.user))
        cmd = "%s; %s/sbin/hawqstandbywatch.py %s debug" % (source_hawq_env, self.GPHOME, self.master_data_directory)
        result = remote_ssh_nowait(cmd, self.standby_host_name, self.user)
        return result

    def _check_standby_sync(self):
        """Query gp_master_mirroring for the standby sync state.

        :return: 0 synchronized, 2 not synchronized (or undetermined),
                 3 standby too far behind
        """
        try:
            dburl = dbconn.DbURL(port=self.master_port, username=self.user, dbname='template1')
            conn = dbconn.connect(dburl, True)
            query = "select summary_state, detail_state from gp_master_mirroring;"
            rows = dbconn.execSQL(conn, query)
            conn.close()
        except DatabaseError as ex:
            logger.error("Failed to connect to database, this script can only be run when the database is up")
            # Fall back to psql so 'too far behind' can still be detected.
            cmd = '%s/bin/psql -p %s -d template1 -c \
                  "select summary_state, detail_state from gp_master_mirroring;"' % (self.GPHOME, self.master_port)
            (result, stdout, stderr) = local_ssh_output(cmd)
            if stdout.find('Standby master too far behind') != -1:
                return 3
            # BUG FIX: previously fell through to the loop below where 'rows'
            # is undefined (NameError). Report "not synced" so callers retry.
            return 2
        # Only the first row matters; query returns a single summary row.
        for row in rows:
            if row[0] != 'Synchronized':
                return 2
            else:
                return 0

    def _start_segment_cmd(self):
        """Build the pg_ctl command that starts the local segment."""
        logger.info("Start segment service")
        cmd_str = "%s; %s/bin/pg_ctl start -w -t %s -D %s -l %s/pg_log/startup.log -o \\\" -i -M %s -p %s --silent-mode=true\\\" >> %s" \
                  % (source_hawq_env, self.GPHOME, self.timeout, self.segment_data_directory, self.segment_data_directory,
                     "segment", self.segment_port, log_filename)
        return cmd_str

    def start_segment(self):
        """Start the segment on localhost; return the ssh exit status."""
        cmd = self._start_segment_cmd()
        result = remote_ssh(cmd, 'localhost', self.user)
        return result

    def _start_all_nodes(self):
        """Start standby (if any), master, then all segments.

        :return: aggregated segment start flag (0 on full success)
        """
        logger.info("Start all the nodes in hawq cluster")
        if self.hawq_acl_type == 'ranger':
            self.start_rps(self.master_host_name)
        if self.standby_host_name.lower() not in ('', 'none'):
            if self.hawq_acl_type == 'ranger':
                self.start_rps(self.standby_host_name)
            logger.info("Starting standby master '%s'" % self.standby_host_name)
            check_return_code(self.start_standby(), logger, "Standby master start failed, exit",
                              "Standby master started successfully")
        logger.info("Starting master node '%s'" % self.master_host_name)
        check_return_code(self.start_master(), logger, "Master start failed, exit", \
                          "Master started successfully")
        segments_return_flag = self._start_all_segments()
        if segments_return_flag == 0:
            logger.info("HAWQ cluster started successfully")
        return segments_return_flag

    def _start_all_segments(self):
        """Start every segment host in parallel threads.

        :return: aggregated non-zero flag when any host fails; None when
                 every host failed the SSH pre-check (skipped entirely)
        """
        logger.info("Start all the segments in hawq cluster")
        logger.info("Start segments in list: %s" % self.host_list)
        bad_hosts = []
        working_hosts = self.host_list
        if self.ignore_bad_hosts:
            working_hosts, bad_hosts = exclude_bad_hosts(self.host_list)
            if len(bad_hosts) == len(self.host_list):
                logger.error("Unable to SSH on any of the hosts, skipping segment start operation")
                # NOTE(review): bare return (None) reaches check_return_code
                # in run(); confirm its handling of None before changing.
                return
            if len(bad_hosts) > 0:
                logger.warning("Skipping starting segments in the list {0}, SSH test failed".format(bad_hosts))
                self.hosts_count_number -= len(bad_hosts)
        segment_cmd_str = self._start_segment_cmd()
        q = Queue.Queue()
        work_list = []
        for host in working_hosts:
            work_list.append({"func":remote_ssh,"args":(segment_cmd_str, host, self.user, q)})
        logger.info("Total segment number is: %s" % len(self.host_list))
        # Progress reporter consumes the queue the workers feed.
        work_list.append({"func":check_progress,"args":(q, self.hosts_count_number, 'start', len(bad_hosts), self.quiet)})
        node_init = HawqCommands(name = 'HAWQ', action_name = 'start', logger = logger)
        node_init.get_function_list(work_list)
        node_init.start()
        logger.debug("Total threads return value is : %d" % node_init.return_flag)
        if node_init.return_flag != 0:
            logger.error("Segments start failed")
        else:
            logger.info("Segments started successfully")
        return node_init.return_flag

    def _start_rps(self, rps_hostname):
        """Start the Ranger plugin service on the given host; return ssh status."""
        logger.info("Start ranger plugin service on %s" % rps_hostname)
        cmd_str = "%s/ranger/bin/rps.sh start" % (self.GPHOME)
        result = remote_ssh(cmd_str, rps_hostname, self.user)
        return result

    def start_rps(self, rps_hostname):
        """Start RPS on a host, exiting via check_return_code on failure."""
        check_return_code(self._start_rps(rps_hostname), logger, \
                          "Ranger plugin service start on %s failed, exit" % rps_hostname, \
                          "Ranger plugin service started on %s successfully" % rps_hostname)

    def run(self):
        """Dispatch to the start routine selected by --node-type."""
        if self.node_type == "master":
            if self.hawq_acl_type == 'ranger':
                self.start_rps(self.master_host_name)
            check_return_code(self.start_master(), logger, \
                              "Master start failed, exit", "Master started successfully")
        elif self.node_type == "standby":
            if self.standby_host_name == '':
                sys.exit(1)
            if self.hawq_acl_type == 'ranger':
                self.start_rps(self.standby_host_name)
            check_return_code(self.start_standby(), logger, "Standby master start failed, exit",
                              "Standby master started successfully")
        elif self.node_type == "segment":
            check_return_code(self.start_segment(), logger, \
                              "Segment start failed, exit", "Segment started successfully")
        elif self.node_type == "cluster":
            check_return_code(self._start_all_nodes())
        elif self.node_type == "allsegments":
            check_return_code(self._start_all_segments())
        else:
            sys.exit('Node object should be in [master, standby, segment, allsegments, cluster]')
        return None
class HawqStop:
    """Driver for 'hawq stop' and 'hawq stop --reload'.

    One instance targets a single node_type (master, standby, segment,
    allsegments or cluster).  With --reload, the same code paths issue
    'pg_ctl reload' instead of 'pg_ctl stop'.  All per-host work is done
    by shelling out over ssh (see remote_ssh), with pg_ctl output appended
    to the tool log file.
    """

    def __init__(self, opts, hawq_dict):
        # Copy parsed command-line options onto the instance.
        self.node_type = opts.node_type
        self.hawq_command = opts.hawq_command
        self.user = opts.user
        self.GPHOME = opts.GPHOME
        self.stop_mode = opts.stop_mode
        self.quiet = opts.quiet_run
        self.log_dir = opts.log_dir
        self.timeout = opts.timeout_seconds
        self.hawq_dict = hawq_dict
        self.hawq_reload = opts.hawq_reload
        # Verb forms used throughout the log messages
        # ("stop ... stopped" vs "reload ... reloaded").
        if self.hawq_reload:
            self.stop_action = 'reload'
            self.stop_action_past = 'reloaded'
        else:
            self.stop_action = 'stop'
            self.stop_action_past = 'stopped'
        self.lock = threading.Lock()
        self.dburl = None
        self.conn = None
        # Cache host/port/directory settings from hawq-site.xml; exits on
        # missing mandatory items.
        self._get_config()
        self.ignore_bad_hosts = opts.ignore_bad_hosts
        # 'ranger', 'standalone', or 'unknown' when the DB is unreachable.
        self.hawq_acl_type = self.check_hawq_acl_type()

    def _get_config(self):
        """Validate and cache required hawq-site.xml settings on self.

        Calls sys.exit() if any mandatory configuration item is missing.
        """
        check_items = ('hawq_master_address_host', 'hawq_master_address_port',
                       'hawq_master_directory', 'hawq_segment_directory',
                       'hawq_segment_address_port', 'hawq_dfs_url',
                       'hawq_master_temp_directory', 'hawq_segment_temp_directory')
        for item in check_items:
            if item not in self.hawq_dict:
                sys.exit("Check: %s not configured in hawq-site.xml" % item)
        self.master_host_name = self.hawq_dict['hawq_master_address_host']
        self.master_port = self.hawq_dict['hawq_master_address_port']
        self.master_data_directory = self.hawq_dict['hawq_master_directory']
        # String concatenation implies master_port arrives as a string from
        # the XML parser.
        self.master_address = self.master_host_name + ":" + self.master_port
        self.segment_data_directory = self.hawq_dict['hawq_segment_directory']
        self.segment_port = self.hawq_dict['hawq_segment_address_port']
        self.dfs_url = self.hawq_dict['hawq_dfs_url']
        # Segment inventory comes from the hosts file under $GPHOME.
        self.host_list = parse_hosts_file(self.GPHOME)
        self.hosts_count_number = len(self.host_list)
        if 'hawq_standby_address_host' in self.hawq_dict:
            self.standby_host_name = self.hawq_dict['hawq_standby_address_host']
            # The standby listens on the same port as the master.
            self.standby_port = self.master_port
            self.standby_address = self.standby_host_name + ":" + self.standby_port
        else:
            logger.info("No standby host configured")
            self.standby_host_name = ''

    def check_hawq_acl_type(self):
        """Query the running master for the hawq_acl_type GUC.

        Returns 'ranger' or 'standalone', or 'unknown' when the database
        cannot be reached (in which case callers conservatively still try
        to stop the ranger plugin service).
        """
        try:
            dburl = dbconn.DbURL(port=self.master_port, username=self.user, dbname='template1')
            conn = dbconn.connect(dburl, True)
            query = "select name, setting from pg_catalog.pg_settings where name='hawq_acl_type';"
            rows = dbconn.execSQL(conn, query)
            conn.close()
        except DatabaseError, ex:
            logger.warning("Failed to connect to database, cannot get hawq_acl_type")
            return "unknown"
        # NOTE(review): rows is iterated after conn.close(); this assumes
        # execSQL returns a result that stays usable after the connection
        # is closed -- confirm against gppylib.db.dbconn.
        for row in rows:
            if row[1].lower() == 'ranger':
                return "ranger"
        return "standalone"

    def _stop_master_checks(self):
        """Pre-shutdown checks on the master: refuse a smart stop while
        user sessions are connected; log the mode that will be used.

        Exits the process if the master is unreachable or if smart mode is
        blocked by active connections.
        """
        try:
            total_connections = 0
            self.dburl = dbconn.DbURL(hostname=self.master_host_name, port=self.master_port, username=self.user, dbname='template1')
            self.conn = dbconn.connect(self.dburl, utility=True)
            total_connections = len(catalog.getUserPIDs(self.conn))
            self.conn.close()
            logger.info("There are %d connections to the database" % total_connections)
        except DatabaseError, ex:
            logger.error("Failed to connect to the running database, please check master status")
            logger.error("Or you can check hawq stop --help for other stop options")
            sys.exit(1)
        if total_connections > 0 and self.stop_mode == 'smart':
            logger.warning("There are other connections to this instance, shutdown mode smart aborted")
            logger.warning("Either remove connections, or use 'hawq stop master -M fast' or 'hawq stop master -M immediate'")
            logger.warning("See hawq stop --help for all options")
            logger.error("Active connections. Aborting shutdown...")
            sys.exit(1)
        logger.info("Commencing Master instance shutdown with mode='%s'" % self.stop_mode)
        logger.info("Master host=%s" % self.master_host_name)
        if self.stop_mode == 'smart':
            pass
        elif self.stop_mode == 'fast':
            logger.info("Detected %d connections to database" % total_connections)
            if total_connections > 0:
                logger.info("Switching to WAIT mode")
                logger.info("Will wait for shutdown to complete, this may take some time if")
                logger.info("there are a large number of active complex transactions, please wait...")
            else:
                if self.timeout == SEGMENT_TIMEOUT_DEFAULT:
                    logger.info("Using standard WAIT mode of %s seconds" % SEGMENT_TIMEOUT_DEFAULT)
                else:
                    logger.info("Using WAIT mode of %s seconds" % self.timeout)
        pass

    def _stop_master_cmd(self):
        """Build the shell command that stops (or reloads) the master via
        pg_ctl, with output appended to the tool log file."""
        logger.info("%s hawq master" % self.stop_action.title())
        if self.hawq_reload:
            cmd_str = "%s; %s/bin/pg_ctl reload -D %s >> %s" % \
                (source_hawq_env, self.GPHOME, self.master_data_directory, log_filename)
            return cmd_str
        else:
            cmd_str = "%s; %s/bin/pg_ctl stop -w -t %s -D %s -l %s/pg_log/startup.log -m %s >> %s" % \
                (source_hawq_env, self.GPHOME, self.timeout, self.master_data_directory,
                 self.master_data_directory, self.stop_mode, log_filename)
            return cmd_str

    def _stop_master(self):
        """Stop/reload the master if it is running; return the remote exit
        status (0 when it was already stopped)."""
        master_host, master_running = check_hawq_running(self.master_host_name, self.master_data_directory, self.master_port, self.user, logger)
        if master_running:
            # The connection checks only matter for a graceful stop:
            # immediate mode and reload both skip them.
            if self.stop_mode != 'immediate' and not self.hawq_reload:
                self._stop_master_checks()
            cmd = self._stop_master_cmd()
            result = remote_ssh(cmd, self.master_host_name, self.user)
            return result
        else:
            logger.warn('Master is not running, skip')
            return 0

    def _stop_segment_cmd(self):
        """Build the shell command that stops (or reloads) the local
        segment via pg_ctl."""
        logger.info("%s hawq segment" % self.stop_action.title())
        if self.hawq_reload:
            cmd_str = "%s; %s/bin/pg_ctl reload -D %s >> %s" % \
                (source_hawq_env, self.GPHOME, self.segment_data_directory, log_filename)
            return cmd_str
        else:
            # If stop cluster, firstly master stopped, then segment maybe hung at recv() from master.
            # So here change segment stop mode from smart to fast, so that segment don't wait
            # connection from master to be closed.
            seg_stop_mod = self.stop_mode
            if(self.node_type == 'cluster' and seg_stop_mod == 'smart'):
                seg_stop_mod = 'fast'
            cmd_str = "%s; %s/bin/pg_ctl stop -w -t %s -D %s -l %s/pg_log/startup.log -m %s >> %s" % \
                (source_hawq_env, self.GPHOME, self.timeout, self.segment_data_directory,
                 self.segment_data_directory, seg_stop_mod, log_filename)
            return cmd_str

    def _stop_segment(self):
        """Stop/reload the segment on the local host; return its exit
        status (0 when it was not running)."""
        seg_host, segment_running = check_hawq_running('localhost', self.segment_data_directory, self.segment_port, self.user, logger)
        if segment_running:
            cmd = self._stop_segment_cmd()
            result = remote_ssh(cmd, 'localhost', self.user)
            return result
        else:
            #logger.warning('')
            return 0

    def _stop_standby_cmd(self):
        """Build the shell command that stops (or reloads) the standby
        master; the standby shares the master data directory layout."""
        logger.info("%s hawq standby master" % self.stop_action.title())
        if self.hawq_reload:
            cmd_str = "%s; %s/bin/pg_ctl reload -D %s >> %s" % \
                (source_hawq_env, self.GPHOME, self.master_data_directory, log_filename)
            return cmd_str
        else:
            cmd_str = "%s; %s/bin/pg_ctl stop -w -t %s -D %s -l %s/pg_log/startup.log -m %s >> %s" % \
                (source_hawq_env, self.GPHOME, self.timeout, self.master_data_directory,
                 self.master_data_directory, self.stop_mode, log_filename)
            return cmd_str

    def _stop_standby(self):
        """Stop/reload the standby syncmaster if it is running; return the
        remote exit status (0 when it was not running)."""
        if check_syncmaster_running(self.master_data_directory, '', self.standby_host_name, logger):
            cmd = self._stop_standby_cmd()
            result = remote_ssh(cmd, self.standby_host_name, self.user)
            return result
        else:
            logger.warn("Standby master is not running, skip")
            return 0

    def _stopAll(self):
        """Stop the whole cluster: master first, then standby, then the
        ranger plugin services (when applicable), then every segment.

        Returns 0 only if every component stopped cleanly.
        """
        logger.info("%s hawq cluster" % self.stop_action.title())
        master_result = 0
        standby_result = 0
        segments_return_flag = 0
        master_result = self._stop_master()
        if master_result != 0:
            logger.error("Master %s failed" % self.stop_action)
        else:
            logger.info("Master %s successfully" % self.stop_action_past)
        if self.standby_host_name.lower() not in ('', 'none'):
            standby_result = self._stop_standby()
            if standby_result != 0:
                logger.error("Standby master %s failed" % self.stop_action)
            else:
                logger.info("Standby master %s successfully" % self.stop_action_past)
        # RPS is stopped even for 'unknown' acl type: if we could not query
        # the DB we err on the side of stopping a possibly-running RPS.
        if self.hawq_acl_type in ['ranger', 'unknown']:
            self._stop_rps(self.master_host_name, self.hawq_acl_type)
            if self.standby_host_name.lower() not in ('', 'none'):
                self._stop_rps(self.standby_host_name, self.hawq_acl_type)
        # Execute segment stop command on each node.
        segments_return_flag = self._stopAllSegments()
        cluster_result = master_result + standby_result + segments_return_flag
        if cluster_result != 0:
            logger.error("Cluster %s failed" % self.stop_action)
        else:
            logger.info("Cluster %s successfully" % self.stop_action_past)
        return cluster_result

    def _running_segments_list(self, host_list):
        """Check every host in host_list in parallel and partition it into
        (running_host, stopped_host) lists.

        NOTE(review): the queue is drained immediately after
        node_checks.start(); this assumes threads_with_return.start()
        blocks until all worker threads have finished -- confirm in
        hawqpylib.
        """
        work_list = []
        running_host = []
        stopped_host = []
        seg_check_q = Queue.Queue()
        for host in host_list:
            work_list.append({"func":check_hawq_running,"args":(host, self.segment_data_directory, self.segment_port, self.user, logger)})
        node_checks = threads_with_return(name = 'HAWQ', action_name = 'check', logger = logger, return_values = seg_check_q)
        node_checks.get_function_list(work_list)
        node_checks.start()
        while not seg_check_q.empty():
            item = seg_check_q.get()
            if item[1] == True:
                running_host.append(item[0])
            else:
                stopped_host.append(item[0])
        return running_host, stopped_host

    def _stopAllSegments(self):
        """Stop/reload every segment host in parallel.

        Hosts that fail the SSH test are skipped when --ignore-bad-hosts
        was given, and counted as failures otherwise.  Returns 0 when all
        (non-skipped) segments stopped cleanly.
        """
        bad_hosts = []
        working_hosts = self.host_list
        segment_cmd_str = self._stop_segment_cmd()
        logger.info("%s segments in list: %s" % (self.stop_action.title(), self.host_list))
        working_hosts, bad_hosts = exclude_bad_hosts(self.host_list)
        if len(bad_hosts) == len(self.host_list):
            logger.error("Unable to SSH on any of the hosts, skipping segment %s operation" % self.stop_action)
            return 1
        process_running_host, stopped_host = self._running_segments_list(working_hosts)
        # Execute segment stop command on specified nodes.
        if self.ignore_bad_hosts:
            if len(bad_hosts) > 0:
                logger.warning("Skipping {1} segments in the list {0}, SSH test failed".format(bad_hosts, self.stop_action))
            skip_host_list = bad_hosts + stopped_host
        else:
            skip_host_list = stopped_host
        work_list = []
        q = Queue.Queue()
        for host in process_running_host:
            work_list.append({"func":remote_ssh,"args":(segment_cmd_str, host, self.user, q)})
        logger.info("Total segment number is: %s" % len(self.host_list))
        # check_progress runs as one more worker, consuming the completion
        # messages the remote_ssh workers push onto q.
        work_list.append({"func":check_progress,"args":(q, len(process_running_host), self.stop_action, len(skip_host_list), self.quiet)})
        node_init = HawqCommands(name = 'HAWQ', action_name = self.stop_action, logger = logger)
        node_init.get_function_list(work_list)
        node_init.start()
        if self.ignore_bad_hosts:
            total_return_flag = node_init.return_flag
        else:
            if len(bad_hosts) > 0:
                logger.error("%s segment %s failed, SSH test failed on %s" % (len(bad_hosts), self.stop_action, bad_hosts))
            # Unreachable hosts count as failures when not ignored.
            total_return_flag = node_init.return_flag + len(bad_hosts)
        if total_return_flag != 0:
            logger.error("Segments %s failed" % self.stop_action)
        else:
            logger.info("Segments %s successfully" % self.stop_action_past)
        return total_return_flag

    def _stop_rps(self, rps_hostname, acl_type):
        """Stop the ranger plugin service (RPS) on rps_hostname.

        No-op under --reload.  When acl_type is 'unknown' the stop is
        attempted best-effort and its return code is not checked.
        """
        if self.hawq_reload:
            # not stop RPS when running 'hawq stop --reload'.
            return
        check_ret = True
        if acl_type == 'unknown':
            logger.warning("Try to stop RPS when hawq_acl_type is unknown")
            check_ret = False
        logger.info("Stop ranger plugin service on %s" % rps_hostname)
        cmd_str = "%s/ranger/bin/rps.sh stop" % (self.GPHOME)
        result = remote_ssh(cmd_str, rps_hostname, self.user)
        if check_ret:
            check_return_code(result, logger, \
                "Ranger plugin service stop failed on %s, exit" % rps_hostname, \
                "Ranger plugin service stopped on %s successfully" % rps_hostname)

    def run(self):
        """Dispatch to the stop routine matching self.node_type; exits the
        process with a message for an unrecognized node type."""
        if self.node_type == "master":
            check_return_code(self._stop_master(), logger, \
                "Master %s failed, exit" % self.stop_action, "Master %s successfully" % self.stop_action_past)
            if self.hawq_acl_type in ['ranger', 'unknown']:
                self._stop_rps(self.master_host_name, self.hawq_acl_type)
        elif self.node_type == "standby":
            if self.standby_host_name.lower() not in ('', 'none'):
                check_return_code(self._stop_standby(), logger, \
                    "Standby master %s failed, exit" % self.stop_action, "Standby master %s successfully" % self.stop_action_past)
                if self.hawq_acl_type in ['ranger', 'unknown']:
                    self._stop_rps(self.standby_host_name, self.hawq_acl_type)
        elif self.node_type == "segment":
            check_return_code(self._stop_segment(), logger, \
                "Segment %s failed, exit" % self.stop_action, "Segment %s successfully" % self.stop_action_past)
        elif self.node_type == "cluster":
            check_return_code(self._stopAll())
        elif self.node_type == "allsegments":
            check_return_code(self._stopAllSegments())
        else:
            sys.exit('Node object should be in [master, standby, segment, allsegments, cluster]')
        return None
def create_logger(logname='hawq logger', outputFile='/tmp/hawq_mgmt.log'):
    """Return a DEBUG-level logger named logname writing to outputFile.

    Fix: logging.getLogger() returns the same logger object for the same
    name, so the original implementation attached a new FileHandler on
    every call, duplicating every record in the log file.  Handlers are
    now attached only once per logger.  Also removed the StreamHandler
    that was built but never added (its addHandler call was commented
    out), which leaked an unused handler object.
    """
    logger = logging.getLogger(logname)
    logger.setLevel(logging.DEBUG)
    if not logger.handlers:
        fh = logging.FileHandler(outputFile)
        fh.setLevel(logging.DEBUG)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        fh.setFormatter(formatter)
        logger.addHandler(fh)
    return logger
def get_args():
(opts, ARGS) = create_parser()
if ARGS[0]:
opts.hawq_command = ARGS[0]
if ARGS[1]:
opts.node_type = ARGS[1]
if opts.node_type in ['master', 'standby', 'segment', 'cluster', 'allsegments'] and opts.hawq_command in ['start', 'stop', 'restart', 'init', 'activate']:
if opts.log_dir and not os.path.exists(opts.log_dir):
os.makedirs(opts.log_dir)
global logger, log_filename
if opts.verbose:
enable_verbose_logging()
if opts.quiet_run:
quiet_stdout_logging()
logger, log_filename = setup_hawq_tool_logging('hawq_%s' % opts.hawq_command,getLocalHostname(),getUserName(), opts.log_dir)
logger.info("Prepare to do 'hawq %s'" % opts.hawq_command)
logger.info("You can find log in:")
logger.info(log_filename)
else:
print COMMON_HELP
sys.exit(1)
args_lens = len(ARGS)
if args_lens != 2:
sys.exit('Only support two arguements.')
if opts.verbose and opts.quiet_run:
logger.error("Multiple actions specified. See the --help info.")
sys.exit(1)
if opts.special_mode and (opts.hawq_command != 'start'):
logger.error("['-U', '--special-mode'] only apply to 'hawq start'. See the --help info.")
sys.exit(1)
opts.GPHOME = os.getenv('GPHOME')
if not opts.GPHOME:
logger.error("Didn't get GPHOME value, exit")
sys.exit()
logger.info("GPHOME is set to:")
logger.info(opts.GPHOME)
global source_hawq_env
source_hawq_env = ". %s/greenplum_path.sh" % opts.GPHOME
if opts.user == "":
opts.user = getpass.getuser()
if opts.user == "root":
logger.error("'root' user is not allowed")
sys.exit()
logger.debug("Current user is '%s'" % opts.user)
logger.debug("Parsing config file:")
logger.debug("%s/etc/hawq-site.xml" % opts.GPHOME)
hawqsite = HawqXMLParser(opts.GPHOME)
hawqsite.get_all_values()
hawq_dict = hawqsite.hawq_dict
cluster_host_list = list()
cluster_host_list.append(hawq_dict['hawq_master_address_host'])
if 'hawq_standby_address_host' in hawq_dict:
cluster_host_list.append(hawq_dict['hawq_standby_address_host'])
segments_host_list = parse_hosts_file(opts.GPHOME)
for host in segments_host_list:
cluster_host_list.append(host)
if opts.log_dir:
logger.debug("Check and create log directory %s on all hosts" % opts.log_dir)
create_success_host, create_failed_host = create_cluster_directory(opts.log_dir, cluster_host_list)
if len(create_failed_host) > 0:
logger.error("Create log directory %s failed on hosts %s" % (opts.log_dir, create_failed_host))
logger.error("Please check directory permission")
if opts.hawq_reload:
logger.info("Reloading configuration without restarting hawq cluster")
else:
logger.info("%s hawq with args: %s" % (opts.hawq_command.title(), ARGS))
return opts, hawq_dict
def remote_ssh(cmd_str, host, user, q=None):
if user == "":
remote_cmd_str = "ssh -o 'StrictHostKeyChecking no' %s \"%s\"" % (host, cmd_str)
else:
remote_cmd_str = "ssh -o 'StrictHostKeyChecking no' %s@%s \"%s\"" % (user, host, cmd_str)
try:
result = subprocess.Popen(remote_cmd_str, shell=True, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
stdout,stderr = result.communicate()
except subprocess.CalledProcessError:
print "Execute shell command on %s failed" % host
pass
if stdout and stdout != '':
logger.info(stdout.strip())
if stderr and stderr != '':
logger.info(stderr.strip())
if q:
q.put(("done", host, result.returncode))
return result.returncode
def remote_ssh_nowait(cmd, host, user):
    """Run cmd on host over ssh and return the remote exit status.

    NOTE: despite the name this call blocks until the remote command
    finishes (Popen(...).wait()); unlike remote_ssh it does not capture,
    log, or queue the command's output.
    """
    target = host if user == "" else "%s@%s" % (user, host)
    full_cmd = "ssh -o 'StrictHostKeyChecking no' %s \"%s\"" % (target, cmd)
    return subprocess.Popen(full_cmd, shell=True).wait()
def check_progress(q, total_num, action, skip_num = 0, quiet=False):
    """Consume ("done", host, rc) messages from q until total_num hosts
    have reported, printing a dot per second unless quiet, then log a
    summary.  Always returns 0 (worker-thread convention).

    Fixes: the original slept one extra second after the last host had
    already reported, and wrote the leading carriage return even in quiet
    mode; an unused 'pnum' local is also dropped.
    """
    working_num = total_num
    success_num = 0
    if not quiet:
        sys.stdout.write("\r")
    while working_num > 0:
        while not q.empty():
            msg = q.get()
            # Messages other than "done" are ignored.
            if msg[0] == "done":
                working_num = working_num - 1
                if msg[2] == 0:
                    success_num = success_num + 1
        if working_num <= 0:
            # All hosts reported: skip the pointless final sleep.
            break
        if not quiet:
            sys.stdout.write(".")
            sys.stdout.flush()
        time.sleep(1)
    if not quiet:
        sys.stdout.write("\n")
    if skip_num != 0:
        logger.info("%d of %d segments %s successfully, %d segments %s skipped" % (success_num, total_num, action, skip_num, action))
    else:
        logger.info("%d of %d segments %s successfully" % (success_num, total_num, action))
    return 0
def start_hawq(opts, hawq_dict):
    """Build a HawqStart driver from the parsed options and run it."""
    HawqStart(opts, hawq_dict).run()
    return None
def stop_hawq(opts, hawq_dict):
    """Build a HawqStop driver from the parsed options and run it."""
    HawqStop(opts, hawq_dict).run()
    return None
def hawq_init(opts, hawq_dict):
    """Build a HawqInit driver from the parsed options and run it."""
    HawqInit(opts, hawq_dict).run()
    return None
def hawq_activate_standby(opts, hawq_dict):
    """Promote the configured standby master to be the active master.

    Sequence: stop the old master (fast, with an immediate-mode retry),
    stop all segments and the standby syncmaster, rewrite hawq-site.xml
    so the standby host becomes hawq_master_address_host, remove the old
    standby entry from the catalog, then restart master and segments.
    Exits the process on any unrecoverable failure.

    Fix: when no valid standby is configured the original only logged an
    error (or, if the key was absent, logged nothing) and fell through,
    later crashing with NameError on old_standby_host_name.  It now
    aborts explicitly.
    """
    old_master_host_name = hawq_dict['hawq_master_address_host']
    hawq_master_directory = hawq_dict['hawq_master_directory']
    if 'hawq_standby_address_host' in hawq_dict and \
       hawq_dict['hawq_standby_address_host'].lower() not in ['none', '', 'localhost']:
        old_standby_host_name = hawq_dict['hawq_standby_address_host']
        new_master_host_name = hawq_dict['hawq_standby_address_host']
        logger.info("Starting to activate standby master '%s'" % old_standby_host_name)
    else:
        logger.error("No valid standby host name found, skip activate standby")
        sys.exit(1)
    # Try to stop hawq cluster before doing standby activate.
    if check_postgres_running(hawq_master_directory, '', old_master_host_name, logger):
        logger.info("Try to stop hawq master before activate standby")
        cmd = "%s; hawq stop master -a -M fast -q;" % source_hawq_env
        result = remote_ssh(cmd, old_master_host_name, '')
        if result != 0:
            logger.error("Stop master failed, try again with immediate mode")
            cmd = "%s; hawq stop master -a -M immediate -q;" % source_hawq_env
            return_result = remote_ssh(cmd, old_master_host_name, '')
            if return_result != 0:
                logger.error("Stop master failed, abort")
                logger.error("Please manually bring hawq cluster down, then do activate standby again")
                sys.exit(1)
        logger.info("Master is stopped")
    else:
        logger.info("HAWQ master is not running, skip")
    ignore_bad_hosts = '--ignore-bad-hosts' if opts.ignore_bad_hosts else ''
    logger.info("Stopping all the running segments")
    cmd = "%s; hawq stop allsegments -a -M fast -q %s;" % (source_hawq_env, ignore_bad_hosts)
    result = remote_ssh(cmd, old_standby_host_name, '')
    if result != 0:
        logger.error("Stop segments failed, abort")
        logger.error("Please manually bring hawq cluster down, then do activate standby again")
        sys.exit(1)
    logger.info("Stopping running standby")
    if check_syncmaster_running(hawq_master_directory, '', old_standby_host_name, logger):
        cmd = "%s; hawq stop standby -a -M fast -q;" % source_hawq_env
        result = remote_ssh(cmd, old_standby_host_name, '')
        if result != 0:
            logger.error("Stop standby failed, abort")
            logger.error("Please manually bring hawq cluster down, then do activate standby again")
            sys.exit(1)
    else:
        logger.info("Standby master is not running, skip")
    # Set current standby host name as the new master host name in configuration.
    logger.info("Update master host name in hawq-site.xml")
    cmd = "%s; hawq config -c hawq_master_address_host -v %s --skipvalidation -q %s" % (source_hawq_env, hawq_dict['hawq_standby_address_host'], ignore_bad_hosts)
    check_return_code(remote_ssh(cmd, old_standby_host_name, ''), logger, "Set hawq_master_address_host failed")
    # Remove the old standby host configuration from hawq-site.xml.
    logger.info("Remove current standby from hawq-site.xml")
    cmd = "%s; hawq config -r hawq_standby_address_host --skipvalidation -q %s" % (source_hawq_env, ignore_bad_hosts)
    check_return_code(remote_ssh(cmd, old_standby_host_name, ''), logger, "Remove hawq_standby_address_host from configuration failed")
    # One-shot GUC, removed again below once the cluster is back up;
    # presumably lets the promoted master repair the persistent global
    # sequence on first start -- confirm against HAWQ server docs.
    cmd = '''echo "gp_persistent_repair_global_sequence = true" >> %s/%s''' % (hawq_dict['hawq_master_directory'], 'postgresql.conf')
    check_return_code(remote_ssh(cmd, old_standby_host_name, ''), logger, "Set gp_persistent_repair_global_sequence = true failed")
    # Start the new master in master only mode.
    logger.info("Start master in master only mode")
    cmd = "%s; hawq start master --masteronly -q" % source_hawq_env
    check_return_code(remote_ssh(cmd, new_master_host_name, ''), logger, "Start master in master only mode failed")
    # Remove the old standby information in database.  (SQL whitespace
    # differs trivially from the original backslash-continued literal.)
    logger.info("Remove current standby from catalog")
    cmd = ("%s; env PGOPTIONS=\\\"-c gp_session_role=utility\\\" psql -p %s -d template1 -o /dev/null "
           "-c \\\"select gp_remove_master_standby() where "
           "(select count(*) from gp_segment_configuration where role='s') = 1;\\\""
           % (source_hawq_env, hawq_dict['hawq_master_address_port']))
    result = remote_ssh(cmd, new_master_host_name, '')
    # Try to restart hawq cluster.
    logger.info("Stop hawq master")
    cmd = "%s; hawq stop master -a -M fast -q" % source_hawq_env
    check_return_code(remote_ssh(cmd, new_master_host_name, ''), logger, "Stop master failed")
    logger.info("Start hawq cluster")
    cmd = "%s; hawq start master" % source_hawq_env
    check_return_code(remote_ssh(cmd, new_master_host_name, ''), logger, "Start master failed")
    cmd = "%s; hawq start allsegments %s" % (source_hawq_env, ignore_bad_hosts)
    check_return_code(remote_ssh(cmd, new_master_host_name, ''), logger, "Start all the segments failed")
    # Drop the one-shot repair setting again.
    cmd = '''sed -i "/gp_persistent_repair_global_sequence/d" %s/%s''' % (hawq_dict['hawq_master_directory'], 'postgresql.conf')
    check_return_code(remote_ssh(cmd, new_master_host_name, ''))
    logger.info("HAWQ activate standby successfully")
    return None
def restart_hawq(opts, hawq_dict):
    """Restart hawq: a full stop followed by a fresh start."""
    logger.info("Restarting hawq:")
    for phase in (stop_hawq, start_hawq):
        phase(opts, hawq_dict)
    return None
def create_parser():
    """Build the shared OptionParser for every 'hawq' subcommand and parse
    sys.argv.

    Returns (options, args) where args holds the positional arguments
    (<command> <node_type>).  Prints help and exits when no positional
    argument was given.  Note the option list is shared by all
    subcommands: many options (locales, bucket number, standby host, ...)
    are only meaningful for 'hawq init'.
    """
    parser = OptionParser(usage="HAWQ management scripts options")
    # --- general behavior flags -------------------------------------
    parser.add_option("-a", "--prompt",
                      action="store_false",
                      dest="prompt",
                      default=True,
                      help="Execute automatically")
    parser.add_option("-M", "--mode",
                      choices=['smart', 'immediate', 'fast'],
                      dest="stop_mode",
                      default="smart",
                      help="HAWQ stop mode: smart/fast/immediate")
    parser.add_option("-v", "--verbose",
                      action="store_true",
                      dest="verbose",
                      help="Execute with verbose output")
    parser.add_option("-q", "--quiet",
                      action="store_true",
                      dest="quiet_run",
                      help="Execute in quiet mode")
    parser.add_option("-l", "--logdir",
                      dest="log_dir",
                      help="Sets the directory for log files")
    # NOTE(review): default is a string; confirm that comparisons against
    # SEGMENT_TIMEOUT_DEFAULT elsewhere compare like types.
    parser.add_option("-t", "--timeout",
                      dest="timeout_seconds",
                      default="600",
                      help="Set the timeout seconds, default is 600")
    parser.add_option("--user",
                      dest="user",
                      default="",
                      help="Sets HAWQ user")
    # --- start/stop behavior ----------------------------------------
    parser.add_option("-u", "--reload",
                      dest="hawq_reload",
                      action="store_true",
                      default=False,
                      help="Reload hawq configuration")
    parser.add_option("-m", "--masteronly",
                      dest="masteronly",
                      action="store_true",
                      default=False,
                      help="Start hawq in utility mode")
    parser.add_option("-U", "--special-mode",
                      choices=['upgrade', 'maintenance'],
                      dest="special_mode",
                      help="Start hawq in upgrade/maintenance mode")
    parser.add_option("-R", "--restrict",
                      dest="restrict",
                      action="store_true",
                      default=False,
                      help="Start hawq in restrict mode")
    # --- standby management -----------------------------------------
    parser.add_option('-r', '--remove-standby', action='store_true',
                      dest='remove_standby', default=False,
                      help='Delete hawq standby master node.')
    parser.add_option('-s', '--standby-host', type='string',
                      dest='new_standby_hostname', default='none',
                      help='Hostname of system to create standby master on')
    parser.add_option('-n', '--no-update', action='store_true',
                      dest='no_update', default=False,
                      help='Do not update system catalog tables.')
    parser.add_option('-i', '--ignore-bad-hosts',
                      dest='ignore_bad_hosts', action='store_true',
                      default=False,
                      help='Skips syncing configuration files on hosts on which SSH fails')
    # --- init-time tuning and locale settings -----------------------
    parser.add_option("--bucket_number",
                      type="int",
                      dest="default_hash_table_bucket_number",
                      help="Sets maximum number of virtual segments per node")
    parser.add_option("--tde_keyname",
                      dest="tde_keyname",
                      default="",
                      help="Sets the encryption zone key(EZK) name for the hawq directory(hawq_dfs_url)")
    parser.add_option("--locale",
                      dest="hawq_locale",
                      default="en_US.utf8",
                      help="Sets the locale name")
    parser.add_option("--lc-collate",
                      dest="hawq_lc_collate",
                      default="en_US.utf8",
                      help="Sets the string sort order")
    parser.add_option("--lc-ctype",
                      dest="hawq_lc_ctype",
                      default="en_US.utf8",
                      help="Sets character classification")
    parser.add_option("--lc-messages",
                      dest="hawq_lc_messages",
                      default="en_US.utf8",
                      help="Sets the language in which messages are displayed")
    parser.add_option("--lc-monetary",
                      dest="hawq_lc_monetary",
                      default="en_US.utf8",
                      help="Sets the locale to use for formatting monetary amounts")
    parser.add_option("--lc-numeric",
                      dest="hawq_lc_numeric",
                      default="en_US.utf8",
                      help="Sets the locale to use for formatting numbers")
    parser.add_option("--lc-time",
                      dest="hawq_lc_time",
                      default="en_US.utf8",
                      help="Sets the locale to use for formatting dates and times")
    parser.add_option("--max_connections",
                      dest="max_connections",
                      default="1280",
                      help="Sets the max_connections for formatting hawq database")
    parser.add_option("--shared_buffers",
                      dest="shared_buffers",
                      default="128000kB",
                      help="Sets the shared_buffers for formatting hawq database")
    (options, args) = parser.parse_args()
    # A bare 'hawq' with no subcommand gets the full help text.
    if len(args) == 0:
        parser.print_help()
        sys.exit(1)
    return (options, args)
if __name__ == '__main__':
(opts, hawq_dict) = get_args()
if opts.hawq_command == 'start':
start_hawq(opts, hawq_dict)
elif opts.hawq_command == 'stop':
if opts.prompt:
if not userinput.ask_yesno(None, "\nContinue with HAWQ service stop", 'N'):
sys.exit(1)
stop_hawq(opts, hawq_dict)
elif opts.hawq_command == 'restart':
restart_hawq(opts, hawq_dict)
elif opts.hawq_command == 'init':
if opts.node_type == 'standby':
warn_msg = "\nMaster will get restarted, continue with standby init"
else:
warn_msg = "\nContinue with HAWQ init"
if opts.prompt:
if not userinput.ask_yesno(None, warn_msg, 'N'):
sys.exit(1)
hawq_init(opts, hawq_dict)
elif opts.hawq_command == 'activate':
if opts.prompt:
if not userinput.ask_yesno(None, "\nContinue with HAWQ standby master activate", 'N'):
sys.exit(1)
hawq_activate_standby(opts, hawq_dict)
else:
print COMMON_HELP