| #!/usr/bin/env python |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| |
| try: |
| import os |
| import sys |
| import re |
| import logging, time |
| import subprocess |
| import threading |
| import Queue |
| import signal |
| import getpass |
| from optparse import OptionParser |
| from gppylib.gplog import setup_hawq_tool_logging, quiet_stdout_logging, enable_verbose_logging |
| from gppylib.commands.unix import getLocalHostname, getUserName |
| from gppylib.commands import gp |
| from gppylib.commands.gp import SEGMENT_TIMEOUT_DEFAULT |
| from gppylib import userinput |
| from gppylib.db import catalog |
| from gppylib.commands import unix |
| from hawqpylib.hawqlib import * |
| from hawqpylib.HAWQ_HELP import * |
| from gppylib.db import dbconn |
| from pygresql.pg import DatabaseError |
| except ImportError, e: |
| sys.exit('ERROR: Cannot import modules. Please check that you ' |
| 'have sourced greenplum_path.sh. Detail: ' + str(e)) |
| |
| |
| class HawqInit: |
| def __init__(self, opts, hawq_dict): |
| self.node_type = opts.node_type |
| self.hawq_command = opts.hawq_command |
| self.user= opts.user |
| self.GPHOME = opts.GPHOME |
| self.stop_mode = opts.stop_mode |
| self.log_dir = opts.log_dir |
| self.timeout = opts.timeout_seconds |
| self.no_update = opts.no_update |
| self.remove_standby = opts.remove_standby |
| self.new_standby_hostname = opts.new_standby_hostname |
| self.hawq_dict = hawq_dict |
| self.locale = opts.hawq_locale |
| self.quiet = opts.quiet_run |
| self.hawq_lc_collate= opts.hawq_lc_collate |
| self.hawq_lc_ctype = opts.hawq_lc_ctype |
| self.hawq_lc_messages = opts.hawq_lc_messages |
| self.hawq_lc_monetary = opts.hawq_lc_monetary |
| self.hawq_lc_numeric = opts.hawq_lc_numeric |
| self.hawq_lc_time = opts.hawq_lc_time |
| self.max_connections = opts.max_connections |
| self.shared_buffers = opts.shared_buffers |
| self.default_hash_table_bucket_number = opts.default_hash_table_bucket_number |
| self.lock = threading.Lock() |
| self.ignore_bad_hosts = opts.ignore_bad_hosts |
| self._get_config() |
| self._write_config() |
| self._get_ips() |
| |
| def _get_config(self): |
| check_items = ('hawq_master_address_host', 'hawq_master_address_port', |
| 'hawq_master_directory', 'hawq_segment_directory', |
| 'hawq_segment_address_port', 'hawq_dfs_url', |
| 'hawq_master_temp_directory', 'hawq_segment_temp_directory') |
| |
| if self.node_type in ['master', 'segment', 'standby']: |
| for item in check_items: |
| if item in self.hawq_dict: |
| logger.info("Check: %s is set" % item) |
| else: |
| sys.exit("Check: %s not configured in hawq-site.xml" % item) |
| |
| self.master_host_name = self.hawq_dict['hawq_master_address_host'] |
| self.master_port = self.hawq_dict['hawq_master_address_port'] |
| self.master_data_directory = self.hawq_dict['hawq_master_directory'] |
| self.master_address = self.master_host_name + ":" + self.master_port |
| self.segment_data_directory = self.hawq_dict['hawq_segment_directory'] |
| self.segment_port = self.hawq_dict['hawq_segment_address_port'] |
| self.hawq_master_temp_directory = self.hawq_dict['hawq_master_temp_directory'] |
| self.hawq_segment_temp_directory = self.hawq_dict['hawq_segment_temp_directory'] |
| self.dfs_url = self.hawq_dict['hawq_dfs_url'] |
| self.host_list = parse_hosts_file(self.GPHOME) |
| self.hosts_count_number = len(self.host_list) |
| |
| if 'hawq_standby_address_host' in self.hawq_dict: |
| self.standby_host_name = self.hawq_dict['hawq_standby_address_host'] |
| self.standby_port = self.master_port |
| self.standby_address = self.standby_host_name + ":" + self.standby_port |
| if self.standby_host_name in (self.master_host_name, 'localhost', '127.0.0.1'): |
| logger.error("Standby host should not be the same as master host") |
| sys.exit(1) |
| else: |
| self.standby_host_name = '' |
| |
| if not self.default_hash_table_bucket_number and 'default_hash_table_bucket_number' in self.hawq_dict: |
| self.default_hash_table_bucket_number = self.hawq_dict['default_hash_table_bucket_number'] |
| |
| if self.new_standby_hostname != 'none': |
| self.standby_host_name = self.new_standby_hostname |
| |
| if self.standby_host_name.lower() in ('', 'none'): |
| logger.info("No standby host configured, skip it") |
| |
| if 'enable_secure_filesystem' in self.hawq_dict: |
| self.enable_secure_filesystem = self.hawq_dict['enable_secure_filesystem'] |
| else: |
| self.enable_secure_filesystem = 'off' |
| |
| if 'krb_server_keyfile' in self.hawq_dict: |
| self.krb_server_keyfile = self.hawq_dict['krb_server_keyfile'] |
| else: |
| self.krb_server_keyfile = '' |
| |
| if 'krb_srvname' in self.hawq_dict: |
| self.krb_srvname = self.hawq_dict['krb_srvname'] |
| else: |
| self.krb_srvname = 'postgres' |
| |
| def _write_config(self): |
| configFile = "%s/etc/_mgmt_config" % self.GPHOME |
| # Clean configFile while the first write in. |
| with open(configFile, 'w') as f: |
| f.write("GPHOME=%s\n" % self.GPHOME) |
| with open(configFile, 'a') as f: |
| f.write("hawqUser=%s\n" % self.user) |
| f.write("master_host_name=%s\n" % self.master_host_name) |
| f.write("master_port=%s\n" % self.master_port) |
| f.write("master_data_directory=%s\n" % self.master_data_directory) |
| f.write("segment_data_directory=%s\n" % self.segment_data_directory) |
| f.write("segment_port=%s\n" % self.segment_port) |
| f.write("hawq_master_temp_directory=%s\n" % self.hawq_master_temp_directory) |
| f.write("hawq_segment_temp_directory=%s\n" % self.hawq_segment_temp_directory) |
| f.write("locale=%s\n" % self.locale) |
| f.write("hawq_lc_collate=%s\n" % self.hawq_lc_collate) |
| f.write("hawq_lc_ctype=%s\n" % self.hawq_lc_ctype) |
| f.write("hawq_lc_messages=%s\n" % self.hawq_lc_messages) |
| f.write("hawq_lc_monetary=%s\n" % self.hawq_lc_monetary) |
| f.write("hawq_lc_numeric=%s\n" % self.hawq_lc_numeric) |
| f.write("hawq_lc_time=%s\n" % self.hawq_lc_time) |
| f.write("max_connections=%s\n" % self.max_connections) |
| f.write("shared_buffers=%s\n" % self.shared_buffers) |
| f.write("dfs_url=%s\n" % self.dfs_url) |
| f.write("log_filename=%s\n" % log_filename) |
| f.write("log_dir=%s\n" % self.log_dir) |
| if self.standby_host_name.lower() not in ('', 'none'): |
| f.write("standby_host_name=%s\n" % self.standby_host_name) |
| |
| def _get_ips(self): |
| cmd = "%s/bin/lib/get_ip_addresses_of_host.sh %s master_ip_address_all" % (self.GPHOME, self.master_host_name) |
| local_ssh(cmd, logger) |
| if self.standby_host_name.lower() not in ('', 'none') and not self.remove_standby: |
| cmd = "%s/bin/lib/get_ip_addresses_of_host.sh %s standby_ip_address_all" % (self.GPHOME, self.standby_host_name) |
| local_ssh(cmd, logger) |
| |
| def check_hdfs_path(self): |
| cmd = "%s/bin/gpcheckhdfs hdfs %s %s %s %s" % \ |
| (self.GPHOME, self.dfs_url, self.enable_secure_filesystem, self.krb_srvname, self.krb_server_keyfile) |
| logger.info("Check if hdfs path is available") |
| logger.debug("Check hdfs: %s" % cmd) |
| check_return_code(local_ssh(cmd, logger, warning = True), logger, "Check hdfs failed, please verify your hdfs settings") |
| |
| def set_new_standby_host(self): |
| if self.new_standby_hostname == self.master_host_name: |
| logger.error("Standby host name can't be the same as master host name") |
| sys.exit(1) |
| cmd = "%s; hawq config -c hawq_standby_address_host -v %s --skipvalidation -q > /dev/null" % \ |
| (source_hawq_env, self.new_standby_hostname) |
| result = local_ssh(cmd, logger) |
| if result != 0: |
| logger.warn("Set standby host name failed") |
| return result |
| |
| def sync_hdfs_client(self): |
| sync_host_str = "" |
| for node in self.host_list: |
| sync_host_str += " -h %s" % node |
| |
| if 'hawq_standby_address_host' in self.hawq_dict and self.standby_host_name.lower() not in ('', 'none'): |
| sync_host_str += " -h %s" % self.standby_host_name |
| |
| result = local_ssh("hawq scp %s %s/etc/hdfs-client.xml =:%s/etc/" % (sync_host_str, self.GPHOME, self.GPHOME)) |
| if result != 0: |
| logger.error("Sync hdfs-client.xml failed.") |
| sys.exit(1) |
| |
| def set_replace_datanode_on_failure(self): |
| xml_file = "%s/etc/hdfs-client.xml" % self.GPHOME |
| property_name = 'output.replace-datanode-on-failure' |
| pexist, pname, pvalue = check_property_exist_xml(xml_file, property_name) |
| datanodes_threshold = 4 |
| |
| if self.hosts_count_number < datanodes_threshold: |
| property_value = 'false' |
| else: |
| property_value = 'true' |
| |
| if pvalue != property_value: |
| logger.info("Set output.replace-datanode-on-failure to %s" % property_value) |
| if pexist: |
| logger.debug("Update output.replace-datanode-on-failure to %s" % property_value) |
| update_xml_property(xml_file, property_name, property_value) |
| else: |
| logger.debug("Add output.replace-datanode-on-failure as %s" % property_value) |
| update_xml_property(xml_file, property_name, property_value) |
| |
| self.sync_hdfs_client() |
| |
| # Check hdfs-client.xml again to validate changes |
| pexist, pname, pvalue = check_property_exist_xml(xml_file, property_name) |
| if pvalue == property_value: |
| result = 0 |
| else: |
| result = 1 |
| else: |
| result = 0 |
| |
| return result |
| |
| def set_default_hash_table_bucket_number(self): |
| if not self.default_hash_table_bucket_number: |
| if 'hawq_rm_nvseg_perquery_limit' in self.hawq_dict: |
| hawq_rm_nvseg_perquery_limit = self.hawq_dict['hawq_rm_nvseg_perquery_limit'] |
| else: |
| hawq_rm_nvseg_perquery_limit = 512 |
| |
| factor_min = 1 |
| factor_max = 6 |
| limit = int(hawq_rm_nvseg_perquery_limit) |
| if int(self.hosts_count_number) == 0: |
| segments_num = 1 |
| else: |
| segments_num = int(self.hosts_count_number) |
| |
| factor = limit / segments_num |
| # if too many segments or default limit is too low --> stick with the limit |
| if factor < factor_min: |
| buckets = limit |
| # if the limit is large and results in factor > max --> limit factor to max |
| elif factor > factor_max: |
| buckets = factor_max * segments_num |
| else: |
| buckets = factor * segments_num |
| |
| self.default_hash_table_bucket_number = buckets |
| |
| logger.info("Set default_hash_table_bucket_number as: %s" % self.default_hash_table_bucket_number) |
| ignore_bad_hosts = '--ignore-bad-hosts' if opts.ignore_bad_hosts else '' |
| cmd = "hawq config -c default_hash_table_bucket_number -v %s --skipvalidation -q %s > /dev/null" % \ |
| (self.default_hash_table_bucket_number, ignore_bad_hosts) |
| result = local_ssh(cmd, logger) |
| if result != 0: |
| logger.error("Set default_hash_table_bucket_number failed") |
| return result |
| |
| def _get_master_init_cmd(self): |
| cmd = "%s/bin/lib/hawqinit.sh master '%s'" % \ |
| (self.GPHOME, self.GPHOME) |
| return cmd |
| |
| def _get_standby_init_cmd(self): |
| cmd = "%s/bin/lib/hawqinit.sh standby '%s'" % \ |
| (self.GPHOME, self.GPHOME) |
| return cmd |
| |
| def hawq_remove_standby(self): |
| """Removes the standby master""" |
| running_standby_host = '' |
| |
| try: |
| dburl = dbconn.DbURL(port=self.master_port, username=self.user, dbname='template1') |
| conn = dbconn.connect(dburl, True) |
| query = "select role, hostname from gp_segment_configuration where role = 's';" |
| rows = dbconn.execSQL(conn, query) |
| conn.close() |
| except DatabaseError, ex: |
| logger.error("Failed to connect to database, this script can only be run when the database is up") |
| sys.exit(1) |
| |
| for row in rows: |
| if row[0] == 's': |
| running_standby_host = row[1] |
| |
| if running_standby_host: |
| logger.info("running standby host is %s" % running_standby_host) |
| signal.signal(signal.SIGINT,signal.SIG_IGN) |
| logger.info("Stop HAWQ cluster") |
| cmd = "%s; hawq stop master -a -M fast -q" % source_hawq_env |
| check_return_code(local_ssh(cmd, logger), logger, "Stop HAWQ master failed, exit") |
| ignore_bad_hosts = '--ignore-bad-hosts' if self.ignore_bad_hosts else '' |
| cmd = "%s; hawq stop allsegments -a -q %s" % (source_hawq_env, ignore_bad_hosts) |
| check_return_code(local_ssh(cmd, logger), logger, "Stop HAWQ segments failed, exit") |
| logger.info("Start HAWQ master") |
| cmd = "%s; hawq start master -m -q" % source_hawq_env |
| check_return_code(local_ssh(cmd, logger), logger, "Start HAWQ master failed, exit") |
| |
| try: |
| logger.info('Remove standby from Database catalog.') |
| #dburl = dbconn.DbURL(port=self.master_port, dbname='template1') |
| #conn = dbconn.connect(dburl, utility=True, verbose=True) |
| #query = "select gp_remove_master_standby();" |
| #rows = dbconn.execSQL(conn, query) |
| #for row in rows: |
| # print row |
| #conn.close() |
| cmd = 'env PGOPTIONS="-c gp_session_role=utility" %s/bin/psql -p %s -d template1 -o /dev/null -c \ |
| "select gp_remove_master_standby();"' % (self.GPHOME, self.master_port) |
| check_return_code(local_ssh(cmd, logger), logger, \ |
| "Update catalog failed, exit", "Catalog updated successfully.") |
| logger.info("Stop HAWQ master") |
| cmd = "%s; hawq stop master -a -M fast" % source_hawq_env |
| check_return_code(local_ssh(cmd, logger), logger, "Stop hawq master failed, exit") |
| except DatabaseError, ex: |
| logger.error("Failed to connect to database, this script can only be run when the database is up") |
| cmd = "%s; hawq stop master -a -M fast" % source_hawq_env |
| check_return_code(local_ssh(cmd, logger), logger, "Stop hawq master failed, exit") |
| |
| remove_property_xml("hawq_standby_address_host", "%s/etc/hawq-site.xml" % self.GPHOME, self.quiet) |
| host_list = parse_hosts_file(self.GPHOME) |
| sync_hawq_site(self.GPHOME, host_list) |
| if is_node_alive(self.standby_host_name, self.user, logger): |
| logger.info("Check if gpsyncmaster running on %s" % running_standby_host) |
| gpsyncmaster_pid = gp.getSyncmasterPID(running_standby_host, self.master_data_directory) |
| if gpsyncmaster_pid > 0: |
| # stop it |
| logger.info('Stopping gpsyncmaster on %s' % running_standby_host) |
| gp.SegmentStop.remote('stop gpsyncmaster', |
| running_standby_host, |
| self.master_data_directory) |
| |
| tmp_dir_list = self.hawq_master_temp_directory.replace(',', ' ') |
| |
| logger.debug("rm -rf %s/* %s/*" % (self.master_data_directory, tmp_dir_list)) |
| cmd = "rm -rf %s/* %s/*" % (self.master_data_directory, tmp_dir_list) |
| result = remote_ssh(cmd, self.standby_host_name, self.user) |
| if result != 0: |
| logger.warn('Remove data files on standby master failed') |
| else: |
| logger.warn('Not able to connect to Standby master, skip node clean') |
| |
| signal.signal(signal.SIGINT,signal.default_int_handler) |
| logger.info('Remove standby master finished') |
| else: |
| logger.info("Do not find a running standby master") |
| |
| def _check_master_recovery(self): |
| cmd = "%s; %s/bin/pg_controldata %s |grep 'Database cluster state';" % \ |
| (source_hawq_env, self.GPHOME, self.master_data_directory) |
| result, stdout, stderr = remote_ssh_output(cmd, self.master_host_name, '') |
| |
| if stdout.find("recovery") != -1: |
| logger.info('Master is doing recovery start, abort init standby') |
| return 1 |
| |
| return 0 |
| |
| def _init_standby(self): |
| logger.info("Start to init standby master: '%s'" % self.standby_host_name) |
| logger.info("This might take a couple of minutes, please wait...") |
| check_return_code(self._check_master_recovery()) |
| # Sync config files from master. |
| scpcmd = "scp %s/etc/_mgmt_config %s:%s/etc/_mgmt_config > /dev/null" % \ |
| (self.GPHOME, self.standby_host_name, self.GPHOME) |
| check_return_code(local_ssh(scpcmd, logger, warning = True), \ |
| logger, "Sync _mgmt_config failed") |
| scpcmd = "scp %s/etc/hawq-site.xml %s:%s/etc/hawq-site.xml > /dev/null" % \ |
| (self.GPHOME, self.standby_host_name, self.GPHOME) |
| check_return_code(remote_ssh(scpcmd, self.master_host_name, self.user), \ |
| logger, "Sync hawq-site.xml failed") |
| scpcmd = "scp %s/etc/slaves %s:%s/etc/slaves > /dev/null" % \ |
| (self.GPHOME, self.standby_host_name, self.GPHOME) |
| check_return_code(remote_ssh(scpcmd, self.master_host_name, self.user), \ |
| logger, "Sync slaves file failed") |
| |
| standby_init_cmd = self._get_standby_init_cmd() |
| |
| return check_return_code(remote_ssh_nowait(standby_init_cmd, self.standby_host_name, self.user)) |
| |
| def _resync_standby(self): |
| logger.info("Re-sync standby") |
| cmd = "%s; hawq stop master -a;" % source_hawq_env |
| check_return_code(local_ssh(cmd, logger), logger, "Stop hawq cluster failed, exit") |
| cmd = "cd %s; %s; %s/bin/lib/pysync.py -x gpperfmon/data -x pg_log -x db_dumps %s %s:%s" % \ |
| (self.master_data_directory, source_hawq_env, self.GPHOME, self.master_data_directory, |
| self.standby_host_name, self.master_data_directory) |
| result = local_ssh(cmd, logger) |
| check_return_code(result, logger, "Re-sync standby master failed, exit") |
| cmd = "%s; hawq start master -a" % source_hawq_env |
| result = local_ssh(cmd, logger) |
| check_return_code(result, logger, "Start hawq cluster failed") |
| |
| return result |
| |
| def _get_segment_init_cmd(self): |
| cmd = "%s/bin/lib/hawqinit.sh segment '%s'" % \ |
| (self.GPHOME, self.GPHOME) |
| return cmd |
| |
| def _init_cluster(self): |
| logger.info("%s segment hosts defined" % self.hosts_count_number) |
| check_return_code(self.set_default_hash_table_bucket_number()) |
| check_return_code(self.set_replace_datanode_on_failure()) |
| |
| master_cmd = self._get_master_init_cmd() |
| logger.info("Start to init master node: '%s'" % self.master_host_name) |
| check_return_code(local_ssh(master_cmd, logger, warning = True), logger, \ |
| "Master init failed, exit", \ |
| "Master init successfully") |
| if self.standby_host_name.lower() not in ('', 'none'): |
| check_return_code(self._init_standby(), logger, \ |
| "Init standby failed, exit", \ |
| "Init standby successfully") |
| check_return_code(self._init_all_segments(), logger, \ |
| "Segments init failed, exit", \ |
| "Segments init successfully on nodes '%s'" % self.host_list) |
| logger.info("Init HAWQ cluster successfully") |
| |
| def _init_all_segments(self): |
| segment_cmd_str = self._get_segment_init_cmd() |
| # Execute segment init command on each segment nodes. |
| logger.info("Init segments in list: %s" % self.host_list) |
| work_list = [] |
| q = Queue.Queue() |
| for host in self.host_list: |
| logger.debug("Start to init segment on node '%s'" % host) |
| scpcmd = "scp %s/etc/_mgmt_config %s:%s/etc/_mgmt_config > /dev/null" % (self.GPHOME, host, self.GPHOME) |
| local_ssh(scpcmd, logger) |
| work_list.append({"func":remote_ssh,"args":(segment_cmd_str, host, self.user, q)}) |
| logger.info("Total segment number is: %s" % len(self.host_list)) |
| work_list.append({"func":check_progress,"args":(q, self.hosts_count_number, 'init', 0, self.quiet)}) |
| node_init = HawqCommands(name='HAWQ', action_name = 'init', logger = logger) |
| node_init.get_function_list(work_list) |
| node_init.start() |
| |
| return node_init.return_flag |
| |
| def run(self): |
| if self.new_standby_hostname != 'none': |
| check_return_code(self.set_new_standby_host()) |
| |
| if self.node_type == "master": |
| self.check_hdfs_path() |
| logger.info("%s segment hosts defined" % self.hosts_count_number) |
| check_return_code(self.set_default_hash_table_bucket_number()) |
| check_return_code(self.set_replace_datanode_on_failure()) |
| logger.info("Start to init master") |
| cmd = self._get_master_init_cmd() |
| check_return_code(local_ssh(cmd, logger, warning = True), logger, \ |
| "Master init failed, exit", "Master init successfully") |
| elif self.node_type == "standby" and self.remove_standby is True: |
| logger.info("Try to remove standby master") |
| self.hawq_remove_standby() |
| elif self.node_type == "standby": |
| if self.standby_host_name.lower() in ('', 'none'): |
| logger.info("No standby host found") |
| logger.info("Please check your standby host name") |
| sys.exit(1) |
| if self.no_update: |
| check_return_code(self._resync_standby(), logger, \ |
| "Standby master re-sync failed, exit", \ |
| "Standby master re-sync successfully") |
| else: |
| check_return_code(self._init_standby(), logger, \ |
| "Init standby failed, exit", \ |
| "Init standby successfully") |
| |
| elif self.node_type == "segment": |
| cmd = self._get_segment_init_cmd() |
| check_return_code(local_ssh(cmd, logger, warning = True), logger, \ |
| "Segment init failed, exit", \ |
| "Segment init successfully") |
| |
| elif self.node_type == "cluster": |
| self.check_hdfs_path() |
| self._init_cluster() |
| else: |
| sys.exit('hawq init object should be one of master/standby/segment/cluster') |
| return None |
| |
| |
| class HawqStart: |
| def __init__(self, opts, hawq_dict): |
| self.node_type = opts.node_type |
| self.hawq_command = opts.hawq_command |
| self.user= opts.user |
| self.GPHOME = opts.GPHOME |
| self.quiet = opts.quiet_run |
| self.stop_mode = opts.stop_mode |
| self.log_dir = opts.log_dir |
| self.timeout = opts.timeout_seconds |
| self.hawq_dict = hawq_dict |
| self.lock = threading.Lock() |
| self.max_connections = opts.max_connections |
| self.masteronly = opts.masteronly |
| self.special_mode = opts.special_mode |
| self.restrict = opts.restrict |
| self.ignore_bad_hosts = opts.ignore_bad_hosts |
| |
| self._get_config() |
| |
| def _get_config(self): |
| logger.info("Gathering information and validating the environment...") |
| check_items = ('hawq_master_address_host', 'hawq_master_address_port', |
| 'hawq_master_directory', 'hawq_segment_directory', |
| 'hawq_segment_address_port', 'hawq_dfs_url', |
| 'hawq_master_temp_directory', 'hawq_segment_temp_directory') |
| |
| for item in check_items: |
| if item not in self.hawq_dict: |
| logger.error("Check: %s not configured in hawq-site.xml" % item) |
| sys.exit() |
| |
| self.master_host_name = self.hawq_dict['hawq_master_address_host'] |
| self.master_port = self.hawq_dict['hawq_master_address_port'] |
| self.master_data_directory = self.hawq_dict['hawq_master_directory'] |
| self.master_address = self.master_host_name + ":" + self.master_port |
| self.segment_data_directory = self.hawq_dict['hawq_segment_directory'] |
| self.segment_port = self.hawq_dict['hawq_segment_address_port'] |
| self.dfs_url = self.hawq_dict['hawq_dfs_url'] |
| self.host_list = parse_hosts_file(self.GPHOME) |
| self.hosts_count_number = len(self.host_list) |
| |
| if 'hawq_standby_address_host' in self.hawq_dict: |
| self.standby_host_name = self.hawq_dict['hawq_standby_address_host'] |
| self.standby_port = self.master_port |
| self.standby_address = self.standby_host_name + ":" + self.standby_port |
| else: |
| logger.info("No standby host configured") |
| self.standby_host_name = '' |
| |
| def _check_recovery_start(self): |
| cmd = "%s; %s/bin/pg_controldata %s |grep 'Database cluster state';" % (source_hawq_env, self.GPHOME, self.master_data_directory) |
| result, stdout, stderr = remote_ssh_output(cmd, self.master_host_name, '') |
| |
| if stdout.find("recovery") != -1: |
| hawq_recovery_state = 'recoverying' |
| elif stdout.find("starting up") != -1: |
| hawq_recovery_state = 'starting_up' |
| elif stdout.find("in production") != -1: |
| hawq_recovery_state = 'recovery_success' |
| else: |
| hawq_recovery_state = 'recovery_failed' |
| |
| return hawq_recovery_state |
| |
| def _start_master_cmd(self): |
| logger.info("Start master service") |
| if self.masteronly: |
| start_options = "-i -M master -p %s --silent-mode=true -c gp_role=utility" % self.master_port |
| elif self.special_mode == 'upgrade': |
| start_options = "-i -M master -p %s --silent-mode=true -U" % self.master_port |
| elif self.special_mode == 'maintenance': |
| start_options = "-i -M master -p %s --silent-mode=true -m" % self.master_port |
| elif self.restrict: |
| start_options = "-i -M master -p %s --silent-mode=true -c superuser_reserved_connections=%s" % (self.master_port, self.max_connections) |
| else: |
| start_options = "-i -M master -p %s --silent-mode=true" % self.master_port |
| |
| cmd_str = "%s; %s/bin/pg_ctl start -w -t %s -D %s -l %s/pg_log/startup.log -o \\\"%s\\\" >> %s" \ |
| % (source_hawq_env, self.GPHOME, self.timeout, self.master_data_directory, self.master_data_directory, start_options, log_filename) |
| return cmd_str |
| |
| def start_master(self): |
| cmd = self._start_master_cmd() |
| result = remote_ssh(cmd, self.master_host_name, self.user) |
| if result != 0: |
| hawq_recovery_state = self._check_recovery_start() |
| if hawq_recovery_state == 'recoverying': |
| recovery_sleep_time = 3 |
| logger.warn("Master is doing recovery, this might take minutes to hours") |
| logger.warn("Please do not interrupt it and wait patient") |
| sys.stdout.write("\r") |
| |
| while True: |
| sys.stdout.write(".") |
| sys.stdout.flush() |
| time.sleep(recovery_sleep_time) |
| if recovery_sleep_time < 60: |
| recovery_sleep_time = recovery_sleep_time + 1 |
| hawq_recovery_state = self._check_recovery_start() |
| |
| if hawq_recovery_state == 'recovery_success': |
| result = 0 |
| break |
| elif hawq_recovery_state == 'recovery_failed': |
| logger.error("HAWQ master recoverying failed") |
| result = 1 |
| break |
| else: |
| result = 0 |
| |
| sys.stdout.write("\n") |
| |
| if self.node_type == "cluster" and (self.standby_host_name.lower() not in ('', 'none')): |
| logger.info("Checking if standby is synced with master") |
| sync_result = self._check_standby_sync() |
| if sync_result == 3: |
| logger.warn("Standby is not synced as: Standby master too far behind") |
| logger.warn("Use 'hawq init standby -n' to do force sync") |
| elif sync_result == 2: |
| check_standby_sync_count = 0 |
| while self._check_standby_sync() == 2: |
| logger.info("Waiting standby to get synced for 3 seconds...") |
| time.sleep(3) |
| check_standby_sync_count += 1 |
| if check_standby_sync_count > 20: |
| break |
| |
| sync_result = self._check_standby_sync() |
| if sync_result != 0: |
| logger.warn("Standby is not synced after 60 seconds") |
| logger.warn("Use 'hawq init standby -n' to do force sync") |
| else: |
| logger.info("Standby master is synced") |
| else: |
| logger.info("Standby master is synced") |
| |
| return result |
| |
| def _start_standby_cmd(self): |
| logger.info("Start standby master service") |
| cmd_str = "%s; %s/bin/gpsyncmaster -D %s -i -p %s >> %s 2>&1 &" \ |
| % (source_hawq_env, self.GPHOME, self.master_data_directory, self.standby_port, log_filename) |
| return cmd_str |
| |
| def start_standby(self): |
| cmd = self._start_standby_cmd() |
| check_return_code(remote_ssh(cmd, self.standby_host_name, self.user)) |
| cmd = "%s; %s/sbin/hawqstandbywatch.py %s debug" % (source_hawq_env, self.GPHOME, self.master_data_directory) |
| result = remote_ssh_nowait(cmd, self.standby_host_name, self.user) |
| return result |
| |
| def _check_standby_sync(self): |
| try: |
| dburl = dbconn.DbURL(port=self.master_port, username=self.user, dbname='template1') |
| conn = dbconn.connect(dburl, True) |
| query = "select summary_state, detail_state from gp_master_mirroring;" |
| rows = dbconn.execSQL(conn, query) |
| conn.close() |
| except DatabaseError, ex: |
| logger.error("Failed to connect to database, this script can only be run when the database is up") |
| |
| cmd = '%s/bin/psql -p %s -d template1 -c \ |
| "select summary_state, detail_state from gp_master_mirroring;"' % (self.GPHOME, self.master_port) |
| (resutl, stdout, stderr) = local_ssh_output(cmd) |
| if stdout.find('Standby master too far behind') != -1: |
| return 3 |
| |
| for row in rows: |
| if row[0] != 'Synchronized': |
| return 2 |
| else: |
| return 0 |
| |
| def _start_segment_cmd(self): |
| logger.info("Start segment service") |
| cmd_str = "%s; %s/bin/pg_ctl start -w -t %s -D %s -l %s/pg_log/startup.log -o \\\" -i -M %s -p %s --silent-mode=true\\\" >> %s" \ |
| % (source_hawq_env, self.GPHOME, self.timeout, self.segment_data_directory, self.segment_data_directory, |
| "segment", self.segment_port, log_filename) |
| |
| return cmd_str |
| |
| def start_segment(self): |
| cmd = self._start_segment_cmd() |
| result = remote_ssh(cmd, 'localhost', self.user) |
| return result |
| |
| def _start_all_nodes(self): |
| logger.info("Start all the nodes in hawq cluster") |
| |
| if self.standby_host_name.lower() not in ('', 'none'): |
| logger.info("Starting standby master '%s'" % self.standby_host_name) |
| check_return_code(self.start_standby(), logger, "Standby master start failed, exit", |
| "Standby master started successfully") |
| |
| logger.info("Starting master node '%s'" % self.master_host_name) |
| check_return_code(self.start_master(), logger, "Master start failed, exit", \ |
| "Master started successfully") |
| |
| segments_return_flag = self._start_all_segments() |
| if segments_return_flag == 0: |
| logger.info("HAWQ cluster started successfully") |
| return segments_return_flag |
| |
| def _start_all_segments(self): |
| logger.info("Start all the segments in hawq cluster") |
| logger.info("Start segments in list: %s" % self.host_list) |
| bad_hosts = [] |
| working_hosts = self.host_list |
| if self.ignore_bad_hosts: |
| working_hosts, bad_hosts = exclude_bad_hosts(self.host_list) |
| if len(bad_hosts) == len(self.host_list): |
| logger.error("Unable to SSH on any of the hosts, skipping segment start operation") |
| return |
| if len(bad_hosts) > 0: |
| logger.warning("Skipping starting segments in the list {0}, SSH test failed".format(bad_hosts)) |
| self.hosts_count_number -= len(bad_hosts) |
| |
| segment_cmd_str = self._start_segment_cmd() |
| q = Queue.Queue() |
| work_list = [] |
| for host in working_hosts: |
| work_list.append({"func":remote_ssh,"args":(segment_cmd_str, host, self.user, q)}) |
| logger.info("Total segment number is: %s" % len(self.host_list)) |
| work_list.append({"func":check_progress,"args":(q, self.hosts_count_number, 'start', len(bad_hosts), self.quiet)}) |
| node_init = HawqCommands(name = 'HAWQ', action_name = 'start', logger = logger) |
| node_init.get_function_list(work_list) |
| node_init.start() |
| logger.debug("Total threads return value is : %d" % node_init.return_flag) |
| if node_init.return_flag != 0: |
| logger.error("Segments start failed") |
| else: |
| logger.info("Segments started successfully") |
| return node_init.return_flag |
| |
| def run(self): |
| if self.node_type == "master": |
| check_return_code(self.start_master(), logger, \ |
| "Master start failed, exit", "Master started successfully") |
| elif self.node_type == "standby": |
| if self.standby_host_name == '': |
| sys.exit(1) |
| check_return_code(self.start_standby(), logger, "Standby master start failed, exit", |
| "Standby master started successfully") |
| elif self.node_type == "segment": |
| check_return_code(self.start_segment(), logger, \ |
| "Segment start failed, exit", "Segment started successfully") |
| elif self.node_type == "cluster": |
| check_return_code(self._start_all_nodes()) |
| elif self.node_type == "allsegments": |
| check_return_code(self._start_all_segments()) |
| else: |
| sys.exit('Node object should be in [master, standby, segment, allsegments, cluster]') |
| return None |
| |
| |
| class HawqStop: |
| def __init__(self, opts, hawq_dict): |
| self.node_type = opts.node_type |
| self.hawq_command = opts.hawq_command |
| self.user= opts.user |
| self.GPHOME = opts.GPHOME |
| self.stop_mode = opts.stop_mode |
| self.quiet = opts.quiet_run |
| self.log_dir = opts.log_dir |
| self.timeout = opts.timeout_seconds |
| self.hawq_dict = hawq_dict |
| self.hawq_reload = opts.hawq_reload |
| if self.hawq_reload: |
| self.stop_action = 'reload' |
| self.stop_action_past = 'reloaded' |
| else: |
| self.stop_action = 'stop' |
| self.stop_action_past = 'stopped' |
| |
| self.lock = threading.Lock() |
| self.dburl = None |
| self.conn = None |
| self._get_config() |
| self.ignore_bad_hosts = opts.ignore_bad_hosts |
| |
| def _get_config(self): |
| check_items = ('hawq_master_address_host', 'hawq_master_address_port', |
| 'hawq_master_directory', 'hawq_segment_directory', |
| 'hawq_segment_address_port', 'hawq_dfs_url', |
| 'hawq_master_temp_directory', 'hawq_segment_temp_directory') |
| |
| for item in check_items: |
| if item not in self.hawq_dict: |
| sys.exit("Check: %s not configured in hawq-site.xml" % item) |
| |
| self.master_host_name = self.hawq_dict['hawq_master_address_host'] |
| self.master_port = self.hawq_dict['hawq_master_address_port'] |
| self.master_data_directory = self.hawq_dict['hawq_master_directory'] |
| self.master_address = self.master_host_name + ":" + self.master_port |
| self.segment_data_directory = self.hawq_dict['hawq_segment_directory'] |
| self.segment_port = self.hawq_dict['hawq_segment_address_port'] |
| self.dfs_url = self.hawq_dict['hawq_dfs_url'] |
| self.host_list = parse_hosts_file(self.GPHOME) |
| self.hosts_count_number = len(self.host_list) |
| |
| if 'hawq_standby_address_host' in self.hawq_dict: |
| self.standby_host_name = self.hawq_dict['hawq_standby_address_host'] |
| self.standby_port = self.master_port |
| self.standby_address = self.standby_host_name + ":" + self.standby_port |
| else: |
| logger.info("No standby host configured") |
| self.standby_host_name = '' |
| |
| def _stop_master_checks(self): |
| try: |
| total_connections = 0 |
| self.dburl = dbconn.DbURL(hostname=self.master_host_name, port=self.master_port, username=self.user, dbname='template1') |
| self.conn = dbconn.connect(self.dburl, utility=True) |
| total_connections=len(catalog.getUserPIDs(self.conn)) |
| self.conn.close() |
| logger.info("There are %d connections to the database" % total_connections) |
| except DatabaseError, ex: |
| logger.error("Failed to connect to the running database, please check master status") |
| logger.error("Or you can check hawq stop --help for other stop options") |
| sys.exit(1) |
| |
| if total_connections > 0 and self.stop_mode=='smart': |
| logger.warning("There are other connections to this instance, shutdown mode smart aborted") |
| logger.warning("Either remove connections, or use 'hawq stop master -M fast' or 'hawq stop master -M immediate'") |
| logger.warning("See hawq stop --help for all options") |
| logger.error("Active connections. Aborting shutdown...") |
| sys.exit(1) |
| |
| logger.info("Commencing Master instance shutdown with mode='%s'" % self.stop_mode) |
| logger.info("Master host=%s" % self.master_host_name) |
| |
| if self.stop_mode == 'smart': |
| pass |
| elif self.stop_mode == 'fast': |
| logger.info("Detected %d connections to database" % total_connections) |
| if total_connections > 0: |
| logger.info("Switching to WAIT mode") |
| logger.info("Will wait for shutdown to complete, this may take some time if") |
| logger.info("there are a large number of active complex transactions, please wait...") |
| else: |
| if self.timeout == SEGMENT_TIMEOUT_DEFAULT: |
| logger.info("Using standard WAIT mode of %s seconds" % SEGMENT_TIMEOUT_DEFAULT) |
| else: |
| logger.info("Using WAIT mode of %s seconds" % self.timeout) |
| pass |
| |
| def _stop_master_cmd(self): |
| logger.info("%s hawq master" % self.stop_action.title()) |
| if self.hawq_reload: |
| cmd_str = "%s; %s/bin/pg_ctl reload -D %s >> %s" % \ |
| (source_hawq_env, self.GPHOME, self.master_data_directory, log_filename) |
| return cmd_str |
| else: |
| cmd_str = "%s; %s/bin/pg_ctl stop -w -t %s -D %s -l %s/pg_log/startup.log -m %s >> %s" % \ |
| (source_hawq_env, self.GPHOME, self.timeout, self.master_data_directory, |
| self.master_data_directory, self.stop_mode, log_filename) |
| return cmd_str |
| |
| def _stop_master(self): |
| master_host, master_running = check_hawq_running(self.master_host_name, self.master_data_directory, self.master_port, self.user, logger) |
| if master_running: |
| if self.stop_mode != 'immediate' and not self.hawq_reload: |
| self._stop_master_checks() |
| cmd = self._stop_master_cmd() |
| result = remote_ssh(cmd, self.master_host_name, self.user) |
| return result |
| else: |
| logger.warn('Master is not running, skip') |
| return 0 |
| |
| def _stop_segment_cmd(self): |
| logger.info("%s hawq segment" % self.stop_action.title()) |
| if self.hawq_reload: |
| cmd_str = "%s; %s/bin/pg_ctl reload -D %s >> %s" % \ |
| (source_hawq_env, self.GPHOME, self.segment_data_directory, log_filename) |
| return cmd_str |
| else: |
| cmd_str = "%s; %s/bin/pg_ctl stop -w -t %s -D %s -l %s/pg_log/startup.log -m %s >> %s" % \ |
| (source_hawq_env, self.GPHOME, self.timeout, self.segment_data_directory, |
| self.segment_data_directory, self.stop_mode, log_filename) |
| return cmd_str |
| |
| def _stop_segment(self): |
| seg_host, segment_running = check_hawq_running('localhost', self.segment_data_directory, self.segment_port, self.user, logger) |
| if segment_running: |
| cmd = self._stop_segment_cmd() |
| result = remote_ssh(cmd, 'localhost', self.user) |
| return result |
| else: |
| #logger.warning('') |
| return 0 |
| |
| def _stop_standby_cmd(self): |
| logger.info("%s hawq standby master" % self.stop_action.title()) |
| if self.hawq_reload: |
| cmd_str = "%s; %s/bin/pg_ctl reload -D %s >> %s" % \ |
| (source_hawq_env, self.GPHOME, self.master_data_directory, log_filename) |
| return cmd_str |
| else: |
| cmd_str = "%s; %s/bin/pg_ctl stop -w -t %s -D %s -l %s/pg_log/startup.log -m %s >> %s" % \ |
| (source_hawq_env, self.GPHOME, self.timeout, self.master_data_directory, |
| self.master_data_directory, self.stop_mode, log_filename) |
| return cmd_str |
| |
| def _stop_standby(self): |
| if check_syncmaster_running(self.master_data_directory, '', self.standby_host_name, logger): |
| cmd = self._stop_standby_cmd() |
| result = remote_ssh(cmd, self.standby_host_name, self.user) |
| return result |
| else: |
| logger.warn("Standby master is not running, skip") |
| return 0 |
| |
| def _stopAll(self): |
| logger.info("%s hawq cluster" % self.stop_action.title()) |
| master_result = 0 |
| standby_result = 0 |
| segments_return_flag = 0 |
| master_result = self._stop_master() |
| if master_result != 0: |
| logger.error("Master %s failed" % self.stop_action) |
| else: |
| logger.info("Master %s successfully" % self.stop_action_past) |
| if self.standby_host_name.lower() not in ('', 'none'): |
| standby_result = self._stop_standby() |
| if standby_result != 0: |
| logger.error("Standby master %s failed" % self.stop_action) |
| else: |
| logger.info("Standby master %s successfully" % self.stop_action_past) |
| |
| # Execute segment stop command on each node. |
| segments_return_flag = self._stopAllSegments() |
| cluster_result = master_result + standby_result + segments_return_flag |
| if cluster_result != 0: |
| logger.error("Cluster %s failed" % self.stop_action) |
| else: |
| logger.info("Cluster %s successfully" % self.stop_action_past) |
| return cluster_result |
| |
| def _running_segments_list(self, host_list): |
| work_list = [] |
| running_host = [] |
| stopped_host = [] |
| seg_check_q = Queue.Queue() |
| |
| for host in host_list: |
| work_list.append({"func":check_hawq_running,"args":(host, self.segment_data_directory, self.segment_port, self.user, logger)}) |
| |
| node_checks = threads_with_return(name = 'HAWQ', action_name = 'check', logger = logger, return_values = seg_check_q) |
| node_checks.get_function_list(work_list) |
| node_checks.start() |
| while not seg_check_q.empty(): |
| item = seg_check_q.get() |
| if item[1] == True: |
| running_host.append(item[0]) |
| else: |
| stopped_host.append(item[0]) |
| |
| return running_host, stopped_host |
| |
| |
| def _stopAllSegments(self): |
| bad_hosts = [] |
| working_hosts = self.host_list |
| segment_cmd_str = self._stop_segment_cmd() |
| logger.info("%s segments in list: %s" % (self.stop_action.title(), self.host_list)) |
| |
| working_hosts, bad_hosts = exclude_bad_hosts(self.host_list) |
| if len(bad_hosts) == len(self.host_list): |
| logger.error("Unable to SSH on any of the hosts, skipping segment %s operation" % self.stop_action) |
| return 1 |
| |
| process_running_host, stopped_host = self._running_segments_list(working_hosts) |
| |
| # Execute segment stop command on specified nodes. |
| if self.ignore_bad_hosts: |
| if len(bad_hosts) > 0: |
| logger.warning("Skipping {1} segments in the list {0}, SSH test failed".format(bad_hosts, self.stop_action)) |
| skip_host_list = bad_hosts + stopped_host |
| else: |
| skip_host_list = stopped_host |
| |
| work_list = [] |
| q = Queue.Queue() |
| for host in process_running_host: |
| work_list.append({"func":remote_ssh,"args":(segment_cmd_str, host, self.user, q)}) |
| logger.info("Total segment number is: %s" % len(self.host_list)) |
| work_list.append({"func":check_progress,"args":(q, len(process_running_host), self.stop_action, len(skip_host_list), self.quiet)}) |
| node_init = HawqCommands(name = 'HAWQ', action_name = self.stop_action, logger = logger) |
| node_init.get_function_list(work_list) |
| node_init.start() |
| if self.ignore_bad_hosts: |
| total_return_flag = node_init.return_flag |
| else: |
| if len(bad_hosts) > 0: |
| logger.error("%s segment %s failed, SSH test failed on %s" % (len(bad_hosts), self.stop_action, bad_hosts)) |
| total_return_flag = node_init.return_flag + len(bad_hosts) |
| |
| if total_return_flag != 0: |
| logger.error("Segments %s failed" % self.stop_action) |
| else: |
| logger.info("Segments %s successfully" % self.stop_action_past) |
| return total_return_flag |
| |
| def run(self): |
| if self.node_type == "master": |
| check_return_code(self._stop_master(), logger, \ |
| "Master %s failed, exit" % self.stop_action, "Master %s successfully" % self.stop_action_past) |
| elif self.node_type == "standby": |
| if self.standby_host_name.lower() not in ('', 'none'): |
| check_return_code(self._stop_standby(), logger, \ |
| "Standby master %s failed, exit" % self.stop_action, "Standby master %s successfully" % self.stop_action_past) |
| elif self.node_type == "segment": |
| check_return_code(self._stop_segment(), logger, \ |
| "Segment %s failed, exit" % self.stop_action, "Segment %s successfully" % self.stop_action_past) |
| elif self.node_type == "cluster": |
| check_return_code(self._stopAll()) |
| elif self.node_type == "allsegments": |
| check_return_code(self._stopAllSegments()) |
| else: |
| sys.exit('Node object should be in [master, standby, segment, allsegments, cluster]') |
| return None |
| |
| |
def create_logger(logname='hawq logger', outputFile='/tmp/hawq_mgmt.log'):
    """Build a DEBUG-level logger writing to *outputFile*.

    A console StreamHandler is prepared but deliberately not attached,
    so output goes to the log file only.
    """
    new_logger = logging.getLogger(logname)
    new_logger.setLevel(logging.DEBUG)
    log_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    file_handler = logging.FileHandler(outputFile)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(log_format)
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG)
    console_handler.setFormatter(log_format)
    new_logger.addHandler(file_handler)
    #new_logger.addHandler(console_handler)
    return new_logger
| |
| |
| def get_args(): |
| (opts, ARGS) = create_parser() |
| if ARGS[0]: |
| opts.hawq_command = ARGS[0] |
| if ARGS[1]: |
| opts.node_type = ARGS[1] |
| |
| if opts.node_type in ['master', 'standby', 'segment', 'cluster', 'allsegments'] and opts.hawq_command in ['start', 'stop', 'restart', 'init', 'activate']: |
| if opts.log_dir and not os.path.exists(opts.log_dir): |
| os.makedirs(opts.log_dir) |
| |
| global logger, log_filename |
| if opts.verbose: |
| enable_verbose_logging() |
| if opts.quiet_run: |
| quiet_stdout_logging() |
| logger, log_filename = setup_hawq_tool_logging('hawq_%s' % opts.hawq_command,getLocalHostname(),getUserName(), opts.log_dir) |
| logger.info("Prepare to do 'hawq %s'" % opts.hawq_command) |
| logger.info("You can find log in:") |
| logger.info(log_filename) |
| else: |
| print COMMON_HELP |
| sys.exit(1) |
| |
| args_lens = len(ARGS) |
| |
| if args_lens != 2: |
| sys.exit('Only support two arguements.') |
| |
| if opts.verbose and opts.quiet_run: |
| logger.error("Multiple actions specified. See the --help info.") |
| sys.exit(1) |
| |
| if opts.special_mode and (opts.hawq_command != 'start'): |
| logger.error("['-U', '--special-mode'] only apply to 'hawq start'. See the --help info.") |
| sys.exit(1) |
| |
| opts.GPHOME = os.getenv('GPHOME') |
| if not opts.GPHOME: |
| logger.error("Didn't get GPHOME value, exit") |
| sys.exit() |
| logger.info("GPHOME is set to:") |
| logger.info(opts.GPHOME) |
| global source_hawq_env |
| source_hawq_env = ". %s/greenplum_path.sh" % opts.GPHOME |
| |
| if opts.user == "": |
| opts.user = getpass.getuser() |
| if opts.user == "root": |
| logger.error("'root' user is not allowed") |
| sys.exit() |
| logger.debug("Current user is '%s'" % opts.user) |
| logger.debug("Parsing config file:") |
| logger.debug("%s/etc/hawq-site.xml" % opts.GPHOME) |
| hawqsite = HawqXMLParser(opts.GPHOME) |
| hawqsite.get_all_values() |
| hawq_dict = hawqsite.hawq_dict |
| cluster_host_list = list() |
| cluster_host_list.append(hawq_dict['hawq_master_address_host']) |
| |
| if 'hawq_standby_address_host' in hawq_dict: |
| cluster_host_list.append(hawq_dict['hawq_standby_address_host']) |
| |
| segments_host_list = parse_hosts_file(opts.GPHOME) |
| for host in segments_host_list: |
| cluster_host_list.append(host) |
| |
| if opts.log_dir: |
| logger.debug("Check and create log directory %s on all hosts" % opts.log_dir) |
| create_success_host, create_failed_host = create_cluster_directory(opts.log_dir, cluster_host_list) |
| if len(create_failed_host) > 0: |
| logger.error("Create log directory %s failed on hosts %s" % (opts.log_dir, create_failed_host)) |
| logger.error("Please check directory permission") |
| |
| if opts.hawq_reload: |
| logger.info("Reloading configuration without restarting hawq cluster") |
| else: |
| logger.info("%s hawq with args: %s" % (opts.hawq_command.title(), ARGS)) |
| return opts, hawq_dict |
| |
| |
| def remote_ssh(cmd_str, host, user, q=None): |
| if user == "": |
| remote_cmd_str = "ssh -o 'StrictHostKeyChecking no' %s \"%s\"" % (host, cmd_str) |
| else: |
| remote_cmd_str = "ssh -o 'StrictHostKeyChecking no' %s@%s \"%s\"" % (user, host, cmd_str) |
| try: |
| result = subprocess.Popen(remote_cmd_str, shell=True, stdout = subprocess.PIPE, stderr = subprocess.PIPE) |
| stdout,stderr = result.communicate() |
| except subprocess.CalledProcessError: |
| print "Execute shell command on %s failed" % host |
| pass |
| |
| if stdout and stdout != '': |
| logger.info(stdout.strip()) |
| if stderr and stderr != '': |
| logger.info(stderr.strip()) |
| if q: |
| q.put(("done", host, result.returncode)) |
| return result.returncode |
| |
def remote_ssh_nowait(cmd, host, user):
    """Run *cmd* on *host* over ssh, streaming output straight to the
    terminal, and return the remote exit status."""
    target = host if user == "" else "%s@%s" % (user, host)
    remote_cmd_str = "ssh -o 'StrictHostKeyChecking no' %s \"%s\"" % (target, cmd)
    return subprocess.Popen(remote_cmd_str, shell=True).wait()
| |
| |
def check_progress(q, total_num, action, skip_num = 0, quiet=False):
    """Consume ("done", host, returncode) messages from *q* and report progress.

    Blocks (polling once per second) until *total_num* completion messages
    have arrived, printing a dot per completed host unless *quiet*.  Logs a
    final summary including *skip_num* skipped segments.  Always returns 0.
    """
    working_num = total_num
    success_num = 0
    if not quiet:
        # Bugfix: the leading carriage return used to be written even in
        # quiet mode, unlike every other progress write in this function.
        sys.stdout.write("\r")
    while working_num > 0:
        while not q.empty():
            msg = q.get()
            if msg[0] == "done":
                working_num = working_num - 1
                if msg[2] == 0:
                    success_num = success_num + 1
                if not quiet:
                    sys.stdout.write(".")
                    sys.stdout.flush()
        time.sleep(1)
    if not quiet:
        sys.stdout.write("\n")
    if skip_num != 0:
        logger.info("%d of %d segments %s successfully, %d segments %s skipped" % (success_num, total_num, action, skip_num, action))
    else:
        logger.info("%d of %d segments %s successfully" % (success_num, total_num, action))
    return 0
| |
| |
def start_hawq(opts, hawq_dict):
    """Create a HawqStart driver for the parsed options and execute it."""
    HawqStart(opts, hawq_dict).run()
    return None
| |
| |
def stop_hawq(opts, hawq_dict):
    """Create a HawqStop driver for the parsed options and execute it."""
    HawqStop(opts, hawq_dict).run()
    return None
| |
| |
def hawq_init(opts, hawq_dict):
    """Create a HawqInit driver for the parsed options and execute it."""
    HawqInit(opts, hawq_dict).run()
    return None
| |
| |
def hawq_activate_standby(opts, hawq_dict):
    """Promote the configured standby master to be the new master.

    Stops the whole cluster, rewrites hawq-site.xml so the standby host
    becomes the master, removes the old standby from the catalog, and
    restarts the cluster from the new master.  Exits the process when any
    mandatory step fails.
    """
    old_master_host_name = hawq_dict['hawq_master_address_host']
    hawq_master_directory = hawq_dict['hawq_master_directory']
    if 'hawq_standby_address_host' in hawq_dict and \
            hawq_dict['hawq_standby_address_host'].lower() not in ['none', '', 'localhost']:
        old_standby_host_name = hawq_dict['hawq_standby_address_host']
        new_master_host_name = hawq_dict['hawq_standby_address_host']
        logger.info("Starting to activate standby master '%s'" % old_standby_host_name)
    else:
        # Bugfix: execution used to continue after logging this message
        # (and when the key was missing entirely), crashing later with
        # NameError on old_standby_host_name.  Exit here instead.
        logger.error("No valid standby host name found, skip activate standby")
        sys.exit(1)

    # Try to stop hawq cluster before doing standby activate.
    if check_postgres_running(hawq_master_directory, '', old_master_host_name, logger):
        logger.info("Try to stop hawq master before activate standby")
        cmd = "%s; hawq stop master -a -M fast -q;" % source_hawq_env
        result = remote_ssh(cmd, old_master_host_name, '')
        if result != 0:
            logger.error("Stop master failed, try again with immediate mode")
            cmd = "%s; hawq stop master -a -M immediate -q;" % source_hawq_env
            return_result = remote_ssh(cmd, old_master_host_name, '')
            if return_result != 0:
                logger.error("Stop master failed, abort")
                logger.error("Please manually bring hawq cluster down, then do activate standby again")
                sys.exit(1)

        logger.info("Master is stopped")
    else:
        logger.info("HAWQ master is not running, skip")

    ignore_bad_hosts = '--ignore-bad-hosts' if opts.ignore_bad_hosts else ''
    logger.info("Stopping all the running segments")
    cmd = "%s; hawq stop allsegments -a -M fast -q %s;" % (source_hawq_env, ignore_bad_hosts)
    result = remote_ssh(cmd, old_standby_host_name, '')
    if result != 0:
        logger.error("Stop segments failed, abort")
        logger.error("Please manually bring hawq cluster down, then do activate standby again")
        sys.exit(1)

    logger.info("Stopping running standby")
    if check_syncmaster_running(hawq_master_directory, '', old_standby_host_name, logger):
        cmd = "%s; hawq stop standby -a -M fast -q;" % source_hawq_env
        result = remote_ssh(cmd, old_standby_host_name, '')
        if result != 0:
            logger.error("Stop standby failed, abort")
            logger.error("Please manually bring hawq cluster down, then do activate standby again")
            sys.exit(1)
    else:
        logger.info("Standby master is not running, skip")

    # Set current standby host name as the new master host name in configuration.
    logger.info("Update master host name in hawq-site.xml")
    cmd = "%s; hawq config -c hawq_master_address_host -v %s --skipvalidation -q %s" % (source_hawq_env, hawq_dict['hawq_standby_address_host'], ignore_bad_hosts)
    check_return_code(remote_ssh(cmd, old_standby_host_name, ''), logger, "Set hawq_master_address_host failed")

    # Remove the old standby host configuration from hawq-site.xml.
    logger.info("Remove current standby from hawq-site.xml")
    cmd = "%s; hawq config -r hawq_standby_address_host --skipvalidation -q %s" % (source_hawq_env, ignore_bad_hosts)
    check_return_code(remote_ssh(cmd, old_standby_host_name, ''), logger, "Remove hawq_standby_address_host from configuration failed")

    # Allow persistent-table global sequence repair during the promotion start.
    cmd = '''echo "gp_persistent_repair_global_sequence = true" >> %s/%s''' % (hawq_dict['hawq_master_directory'], 'postgresql.conf')
    check_return_code(remote_ssh(cmd, old_standby_host_name, ''), logger, "Set gp_persistent_repair_global_sequence = true failed")

    # Start the new master in master only mode.
    logger.info("Start master in master only mode")
    cmd = "%s; hawq start master --masteronly -q" % source_hawq_env
    check_return_code(remote_ssh(cmd, new_master_host_name, ''), logger, "Start master in master only mode failed")

    # Remove the old standby information in database.
    logger.info("Remove current standby from catalog")
    cmd = "%s; env PGOPTIONS=\\\"-c gp_session_role=utility\\\" psql -p %s -d template1 -o /dev/null -c \\\"select gp_remove_master_standby()\
 where (select count(*) from gp_segment_configuration where role='s') = 1;\\\"" % (source_hawq_env, hawq_dict['hawq_master_address_port'])
    result = remote_ssh(cmd, new_master_host_name, '')

    # Try to restart hawq cluster.
    logger.info("Stop hawq master")
    cmd = "%s; hawq stop master -a -M fast -q" % source_hawq_env
    check_return_code(remote_ssh(cmd, new_master_host_name, ''), logger, "Stop master failed")
    logger.info("Start hawq cluster")
    cmd = "%s; hawq start master" % source_hawq_env
    check_return_code(remote_ssh(cmd, new_master_host_name, ''), logger, "Start master failed")
    cmd = "%s; hawq start allsegments %s" % (source_hawq_env, ignore_bad_hosts)
    check_return_code(remote_ssh(cmd, new_master_host_name, ''), logger, "Start all the segments failed")
    # Clean up the temporary repair setting added above.
    cmd = '''sed -i "/gp_persistent_repair_global_sequence/d" %s/%s''' % (hawq_dict['hawq_master_directory'], 'postgresql.conf')
    check_return_code(remote_ssh(cmd, new_master_host_name, ''))
    logger.info("HAWQ activate standby successfully")
    return None
| |
| |
def restart_hawq(opts, hawq_dict):
    """Restart hawq: a full stop followed by a start with the same options."""
    logger.info("Restarting hawq:")
    stop_hawq(opts, hawq_dict)
    start_hawq(opts, hawq_dict)
    return None
| |
| |
def create_parser():
    """Build the hawq management option parser and parse sys.argv.

    Returns (options, args); prints help and exits when no positional
    arguments were supplied.
    """
    parser = OptionParser(usage="HAWQ management scripts options")

    # (flag strings, add_option keyword arguments) in display order.
    option_table = [
        (("-a", "--prompt"),
         dict(action="store_false", dest="prompt", default=True,
              help="Execute automatically")),
        (("-M", "--mode"),
         dict(choices=['smart', 'immediate', 'fast'], dest="stop_mode",
              default="smart", help="HAWQ stop mode: smart/fast/immediate")),
        (("-v", "--verbose"),
         dict(action="store_true", dest="verbose",
              help="Execute with verbose output")),
        (("-q", "--quiet"),
         dict(action="store_true", dest="quiet_run",
              help="Execute in quiet mode")),
        (("-l", "--logdir"),
         dict(dest="log_dir", help="Sets the directory for log files")),
        (("-t", "--timeout"),
         dict(dest="timeout_seconds", default="600",
              help="Set the timeout seconds, default is 600")),
        (("--user",),
         dict(dest="user", default="", help="Sets HAWQ user")),
        (("-u", "--reload"),
         dict(dest="hawq_reload", action="store_true", default=False,
              help="Reload hawq configuration")),
        (("-m", "--masteronly"),
         dict(dest="masteronly", action="store_true", default=False,
              help="Start hawq in utility mode")),
        (("-U", "--special-mode"),
         dict(choices=['upgrade', 'maintenance'], dest="special_mode",
              help="Start hawq in upgrade/maintenance mode")),
        (("-R", "--restrict"),
         dict(dest="restrict", action="store_true", default=False,
              help="Start hawq in restrict mode")),
        (("-r", "--remove-standby"),
         dict(action='store_true', dest='remove_standby', default=False,
              help='Delete hawq standby master node.')),
        (("-s", "--standby-host"),
         dict(type='string', dest='new_standby_hostname', default='none',
              help='Hostname of system to create standby master on')),
        (("-n", "--no-update"),
         dict(action='store_true', dest='no_update', default=False,
              help='Do not update system catalog tables.')),
        (("-i", "--ignore-bad-hosts"),
         dict(dest='ignore_bad_hosts', action='store_true', default=False,
              help='Skips syncing configuration files on hosts on which SSH fails')),
        (("--bucket_number",),
         dict(type="int", dest="default_hash_table_bucket_number",
              help="Sets maximum number of virtual segments per node")),
        (("--locale",),
         dict(dest="hawq_locale", default="en_US.utf8",
              help="Sets the locale name")),
        (("--lc-collate",),
         dict(dest="hawq_lc_collate", default="en_US.utf8",
              help="Sets the string sort order")),
        (("--lc-ctype",),
         dict(dest="hawq_lc_ctype", default="en_US.utf8",
              help="Sets character classification")),
        (("--lc-messages",),
         dict(dest="hawq_lc_messages", default="en_US.utf8",
              help="Sets the language in which messages are displayed")),
        (("--lc-monetary",),
         dict(dest="hawq_lc_monetary", default="en_US.utf8",
              help="Sets the locale to use for formatting monetary amounts")),
        (("--lc-numeric",),
         dict(dest="hawq_lc_numeric", default="en_US.utf8",
              help="Sets the locale to use for formatting numbers")),
        (("--lc-time",),
         dict(dest="hawq_lc_time", default="en_US.utf8",
              help="Sets the locale to use for formatting dates and times")),
        (("--max_connections",),
         dict(dest="max_connections", default="1280",
              help="Sets the max_connections for formatting hawq database")),
        (("--shared_buffers",),
         dict(dest="shared_buffers", default="128000kB",
              help="Sets the shared_buffers for formatting hawq database")),
    ]
    for flags, kwargs in option_table:
        parser.add_option(*flags, **kwargs)

    (options, args) = parser.parse_args()
    if len(args) == 0:
        parser.print_help()
        sys.exit(1)

    return (options, args)
| |
# Entry point: parse arguments and configuration, then dispatch on the
# requested sub-command (start/stop/restart/init/activate).
if __name__ == '__main__':
    (opts, hawq_dict) = get_args()

    if opts.hawq_command == 'start':
        start_hawq(opts, hawq_dict)
    elif opts.hawq_command == 'stop':
        # Ask for confirmation unless -a/--prompt disabled interaction.
        if opts.prompt:
            if not userinput.ask_yesno(None, "\nContinue with HAWQ service stop", 'N'):
                sys.exit(1)
        stop_hawq(opts, hawq_dict)
    elif opts.hawq_command == 'restart':
        restart_hawq(opts, hawq_dict)
    elif opts.hawq_command == 'init':
        # Standby init restarts the master, so the warning differs.
        if opts.node_type == 'standby':
            warn_msg = "\nMaster will get restarted, continue with standby init"
        else:
            warn_msg = "\nContinue with HAWQ init"

        if opts.prompt:
            if not userinput.ask_yesno(None, warn_msg, 'N'):
                sys.exit(1)
        hawq_init(opts, hawq_dict)
    elif opts.hawq_command == 'activate':
        if opts.prompt:
            if not userinput.ask_yesno(None, "\nContinue with HAWQ standby master activate", 'N'):
                sys.exit(1)
        hawq_activate_standby(opts, hawq_dict)
    else:
        # Unknown sub-command: show the generic usage text.
        print COMMON_HELP