#!/usr/bin/env python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
# 
#   http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


try:
    import os
    import sys
    import re
    import logging, time
    import subprocess
    import threading
    import Queue
    import signal
    import getpass
    import socket
    from optparse import OptionParser
    from gppylib.gplog import setup_hawq_tool_logging, quiet_stdout_logging, enable_verbose_logging
    from gppylib.commands.unix import getLocalHostname, getUserName
    from gppylib.commands import gp
    from gppylib.commands.gp import SEGMENT_TIMEOUT_DEFAULT
    from gppylib import userinput
    from gppylib.db import catalog
    from gppylib.commands import unix
    from hawqpylib.hawqlib import *
    from hawqpylib.HAWQ_HELP import *
    from gppylib.db import dbconn
    from pygresql.pg import DatabaseError
except ImportError, e:
    sys.exit('ERROR: Cannot import modules.  Please check that you '
             'have sourced greenplum_path.sh.  Detail: ' + str(e))

class HawqInit:
    """Initialize HAWQ nodes: master, standby, segment(s) or a whole cluster.

    Construction reads the hawq-site.xml values held in ``hawq_dict``, writes
    ``$GPHOME/etc/_mgmt_config`` (consumed by ``bin/lib/hawqinit.sh`` and
    friends) and runs ``get_ip_addresses_of_host.sh`` for the master and, when
    configured, the standby.  ``run()`` then dispatches on ``node_type``.

    Helpers such as ``logger``, ``local_ssh``, ``remote_ssh``,
    ``check_return_code``, ``source_hawq_env`` and ``log_filename`` are
    module-level names provided elsewhere in this file or by the
    ``hawqpylib.hawqlib`` star import.
    """

    def __init__(self, opts, hawq_dict):
        """Copy CLI options, then load config, write _mgmt_config and get IPs.

        :param opts: parsed command-line options.
        :param hawq_dict: mapping of hawq-site.xml property names to values.
        """
        self.node_type = opts.node_type
        self.hawq_command = opts.hawq_command
        self.user = opts.user
        self.GPHOME = opts.GPHOME
        self.stop_mode = opts.stop_mode
        self.log_dir = opts.log_dir
        self.timeout = opts.timeout_seconds
        self.no_update = opts.no_update
        self.remove_standby = opts.remove_standby
        self.new_standby_hostname = opts.new_standby_hostname
        self.hawq_dict = hawq_dict
        self.locale = opts.hawq_locale
        self.quiet = opts.quiet_run
        self.hawq_lc_collate = opts.hawq_lc_collate
        self.hawq_lc_ctype = opts.hawq_lc_ctype
        self.hawq_lc_messages = opts.hawq_lc_messages
        self.hawq_lc_monetary = opts.hawq_lc_monetary
        self.hawq_lc_numeric = opts.hawq_lc_numeric
        self.hawq_lc_time = opts.hawq_lc_time
        self.max_connections = opts.max_connections
        self.shared_buffers = opts.shared_buffers
        self.default_hash_table_bucket_number = opts.default_hash_table_bucket_number
        self.tde_keyname = opts.tde_keyname
        self.lock = threading.Lock()
        self.ignore_bad_hosts = opts.ignore_bad_hosts
        self.with_magma = opts.with_magma
        self._get_config()
        self._write_config()
        self._get_ips()

    # ------------------------------------------------------------------
    # Small predicates shared by several methods.
    # ------------------------------------------------------------------
    def _hdfs_enabled(self):
        """Return True when hawq_init_with_hdfs is 'on' or 'true'."""
        return self.hawq_init_with_hdfs in ('on', 'true')

    def _standby_configured(self):
        """Return True when a standby host name is set (not '' or 'none')."""
        return self.standby_host_name.lower() not in ('', 'none')

    def _ignore_bad_hosts_flag(self):
        """Return the ``hawq`` CLI flag matching the ignore_bad_hosts option."""
        return '--ignore-bad-hosts' if self.ignore_bad_hosts else ''

    def _get_config(self):
        """Read and validate the hawq-site.xml settings needed for init.

        Exits the process when a required setting is missing, when
        hawq_dfs_url is missing while HDFS init is enabled, or when the
        configured standby host is the master host / localhost.
        """
        check_items = ('hawq_master_address_host', 'hawq_master_address_port',
                       'hawq_master_directory', 'hawq_segment_directory',
                       'hawq_segment_address_port',
                       'hawq_master_temp_directory', 'hawq_segment_temp_directory')

        # Every node type (including 'cluster') reads all of these keys
        # below, so validate them up front instead of failing with KeyError.
        for item in check_items:
            if item not in self.hawq_dict:
                sys.exit("Check: %s not configured in hawq-site.xml" % item)
            logger.info("Check: %s is set" % item)

        self.master_host_name = self.hawq_dict['hawq_master_address_host']
        self.master_port = self.hawq_dict['hawq_master_address_port']
        self.master_data_directory = self.hawq_dict['hawq_master_directory']
        self.master_address = self.master_host_name + ":" + self.master_port
        self.segment_data_directory = self.hawq_dict['hawq_segment_directory']
        self.segment_port = self.hawq_dict['hawq_segment_address_port']
        self.hawq_master_temp_directory = self.hawq_dict['hawq_master_temp_directory']
        self.hawq_segment_temp_directory = self.hawq_dict['hawq_segment_temp_directory']
        self.host_list = parse_hosts_file(self.GPHOME)
        self.hosts_count_number = len(self.host_list)

        if 'hawq_standby_address_host' in self.hawq_dict:
            self.standby_host_name = self.hawq_dict['hawq_standby_address_host']
            self.standby_port = self.master_port
            self.standby_address = self.standby_host_name + ":" + self.standby_port
            if self.standby_host_name in (self.master_host_name, 'localhost', '127.0.0.1'):
                logger.error("Standby host should not be the same as master host")
                sys.exit(1)
        else:
            self.standby_host_name = ''

        # A bucket number given on the command line wins over hawq-site.xml.
        if not self.default_hash_table_bucket_number and 'default_hash_table_bucket_number' in self.hawq_dict:
            self.default_hash_table_bucket_number = self.hawq_dict['default_hash_table_bucket_number']

        # A standby passed on the command line overrides the configured one.
        if self.new_standby_hostname != 'none':
            self.standby_host_name = self.new_standby_hostname

        if not self._standby_configured():
            logger.info("No standby host configured, skip it")

        if 'enable_secure_filesystem' in self.hawq_dict:
            self.enable_secure_filesystem = self.hawq_dict['enable_secure_filesystem']
        else:
            self.enable_secure_filesystem = 'off'

        if 'hawq_init_with_hdfs' in self.hawq_dict:
            self.hawq_init_with_hdfs = self.hawq_dict['hawq_init_with_hdfs']
        else:
            self.hawq_init_with_hdfs = 'on'

        if self._hdfs_enabled():
            if 'hawq_dfs_url' in self.hawq_dict:
                self.dfs_url = self.hawq_dict['hawq_dfs_url']
                logger.info("Check: hawq_dfs_url is set")
            else:
                sys.exit("Check: hawq_dfs_url not configured in hawq-site.xml")

        if 'krb_server_keyfile' in self.hawq_dict:
            self.krb_server_keyfile = self.hawq_dict['krb_server_keyfile']
        else:
            self.krb_server_keyfile = ''

        if 'krb_srvname' in self.hawq_dict:
            self.krb_srvname = self.hawq_dict['krb_srvname']
        else:
            self.krb_srvname = 'postgres'

        if 'enable_master_auto_ha' in self.hawq_dict:
            self.enable_master_auto_ha = self.hawq_dict['enable_master_auto_ha']
        else:
            self.enable_master_auto_ha = 'off'

        # NOTE(review): upgrade_mode is only set when present in hawq-site.xml;
        # it is not read anywhere in this class.
        if 'upgrade_mode' in self.hawq_dict:
            self.upgrade_mode = self.hawq_dict['upgrade_mode']

    def _write_config(self):
        """(Re)write $GPHOME/etc/_mgmt_config as ``key=value`` lines.

        The file is truncated on every call.  dfs_url is only written when
        HDFS init is enabled and standby_host_name only when a standby is set.
        """
        config_file = "%s/etc/_mgmt_config" % self.GPHOME
        entries = [
            ('GPHOME', self.GPHOME),
            ('hawqUser', self.user),
            ('master_host_name', self.master_host_name),
            ('master_port', self.master_port),
            ('master_data_directory', self.master_data_directory),
            ('segment_data_directory', self.segment_data_directory),
            ('segment_port', self.segment_port),
            ('hawq_master_temp_directory', self.hawq_master_temp_directory),
            ('hawq_segment_temp_directory', self.hawq_segment_temp_directory),
            ('locale', self.locale),
            ('hawq_lc_collate', self.hawq_lc_collate),
            ('hawq_lc_ctype', self.hawq_lc_ctype),
            ('hawq_lc_messages', self.hawq_lc_messages),
            ('hawq_lc_monetary', self.hawq_lc_monetary),
            ('hawq_lc_numeric', self.hawq_lc_numeric),
            ('hawq_lc_time', self.hawq_lc_time),
            ('max_connections', self.max_connections),
            ('shared_buffers', self.shared_buffers),
        ]
        if self._hdfs_enabled():
            entries.append(('dfs_url', self.dfs_url))
        entries.append(('log_filename', log_filename))
        entries.append(('log_dir', self.log_dir))
        if self._standby_configured():
            entries.append(('standby_host_name', self.standby_host_name))

        with open(config_file, 'w') as f:
            f.writelines("%s=%s\n" % (key, value) for key, value in entries)

    def _get_ips(self):
        """Run get_ip_addresses_of_host.sh for the master and, if any, the standby."""
        cmd = "%s/bin/lib/get_ip_addresses_of_host.sh %s master_ip_address_all" % (self.GPHOME, self.master_host_name)
        local_ssh(cmd, logger)
        if self._standby_configured() and not self.remove_standby:
            cmd = "%s/bin/lib/get_ip_addresses_of_host.sh %s standby_ip_address_all" % (self.GPHOME, self.standby_host_name)
            local_ssh(cmd, logger)

    def check_hdfs_path(self):
        """Verify the HDFS path with gpcheckhdfs (optionally creating a TDE zone)."""
        cmd = "%s/bin/gpcheckhdfs hdfs %s %s %s %s" % \
              (self.GPHOME, self.dfs_url, self.enable_secure_filesystem,
               self.krb_srvname, self.krb_server_keyfile)
        if self.tde_keyname:
            cmd += " --with-tde %s" % self.tde_keyname

        logger.info("Check if hdfs path is available")
        logger.debug("Check hdfs: %s" % cmd)
        check_return_code(local_ssh(cmd, logger, warning=True), logger,
                          "Check hdfs failed, please verify your hdfs settings")
        if self.tde_keyname:
            logger.info("Create encryption zone successfully, key_name:%s" % self.tde_keyname)

    def set_init_with_magma(self):
        """Set hawq_init_with_magma=true via ``hawq config``; return its exit code."""
        logger.info("Set hawq_init_with_magma as: true")
        cmd = "hawq config -c hawq_init_with_magma -v true --skipvalidation -q %s > /dev/null" % \
              self._ignore_bad_hosts_flag()
        result = local_ssh(cmd, logger)
        if result != 0:
            logger.error("Set hawq_init_with_magma failed")
        return result

    def set_new_standby_host(self):
        """Store new_standby_hostname as hawq_standby_address_host; return exit code.

        Exits the process if the new standby equals the master host name.
        """
        if self.new_standby_hostname == self.master_host_name:
            logger.error("Standby host name can't be the same as master host name")
            sys.exit(1)
        cmd = "%s; hawq config -c hawq_standby_address_host -v %s --skipvalidation -q > /dev/null" % \
              (source_hawq_env, self.new_standby_hostname)
        result = local_ssh(cmd, logger)
        if result != 0:
            logger.warning("Set standby host name failed")
        return result

    def sync_hdfs_client(self):
        """Copy $GPHOME/etc/hdfs-client.xml to all segment hosts and the standby.

        Exits the process with status 1 if the copy fails.
        """
        targets = list(self.host_list)
        if 'hawq_standby_address_host' in self.hawq_dict and self._standby_configured():
            targets.append(self.standby_host_name)
        sync_host_str = "".join(" -h %s" % node for node in targets)

        result = local_ssh("hawq scp %s %s/etc/hdfs-client.xml =:%s/etc/" %
                           (sync_host_str, self.GPHOME, self.GPHOME), logger)
        if result != 0:
            logger.error("Sync hdfs-client.xml failed.")
            sys.exit(1)

    def set_replace_datanode_on_failure(self):
        """Set output.replace-datanode-on-failure in hdfs-client.xml.

        The value is 'false' for fewer than 4 segment hosts and 'true'
        otherwise.  When it changes, the file is synced to all hosts.

        :returns: 0 when unchanged or verified after update, 1 otherwise.
        """
        xml_file = "%s/etc/hdfs-client.xml" % self.GPHOME
        property_name = 'output.replace-datanode-on-failure'
        datanodes_threshold = 4
        pexist, _, pvalue = check_property_exist_xml(xml_file, property_name)

        property_value = 'false' if self.hosts_count_number < datanodes_threshold else 'true'
        if pvalue == property_value:
            return 0

        logger.info("Set output.replace-datanode-on-failure to %s" % property_value)
        if pexist:
            logger.debug("Update output.replace-datanode-on-failure to %s" % property_value)
        else:
            logger.debug("Add output.replace-datanode-on-failure as %s" % property_value)
        update_xml_property(xml_file, property_name, property_value)

        self.sync_hdfs_client()

        # Re-read hdfs-client.xml to validate the change.
        _, _, pvalue = check_property_exist_xml(xml_file, property_name)
        return 0 if pvalue == property_value else 1

    @staticmethod
    def _compute_default_bucket_number(nvseg_limit, hosts_count, factor_min=1, factor_max=8):
        """Return the default hash table bucket number for a cluster.

        The per-segment factor is ``limit // segments`` clamped to
        ``factor_max``.  If it falls below ``factor_min`` (too many segments
        for the limit), the limit itself is used.  Zero hosts count as one.

        :param nvseg_limit: hawq_rm_nvseg_perquery_limit (int or numeric str).
        :param hosts_count: number of segment hosts.
        """
        limit = int(nvseg_limit)
        segments_num = int(hosts_count) or 1
        factor = limit // segments_num
        if factor < factor_min:
            return limit
        return min(factor, factor_max) * segments_num

    def set_default_hash_table_bucket_number(self):
        """Compute (if not given) and store default_hash_table_bucket_number.

        :returns: exit code of the ``hawq config`` command.
        """
        if not self.default_hash_table_bucket_number:
            if 'hawq_rm_nvseg_perquery_limit' in self.hawq_dict:
                hawq_rm_nvseg_perquery_limit = self.hawq_dict['hawq_rm_nvseg_perquery_limit']
            else:
                hawq_rm_nvseg_perquery_limit = 512
            self.default_hash_table_bucket_number = self._compute_default_bucket_number(
                hawq_rm_nvseg_perquery_limit, self.hosts_count_number)

        logger.info("Set default_hash_table_bucket_number as: %s" % self.default_hash_table_bucket_number)
        cmd = "hawq config -c default_hash_table_bucket_number -v %s --skipvalidation -q %s > /dev/null" % \
              (self.default_hash_table_bucket_number, self._ignore_bad_hosts_flag())
        result = local_ssh(cmd, logger)
        if result != 0:
            logger.error("Set default_hash_table_bucket_number failed")
        return result

    def _get_master_init_cmd(self):
        """Return the hawqinit.sh command line for the master."""
        return "%s/bin/lib/hawqinit.sh master '%s' '%s'" % \
               (self.GPHOME, self.GPHOME, self.hawq_init_with_hdfs)

    def _get_standby_init_cmd(self):
        """Return the hawqinit.sh command line for the standby."""
        return "%s/bin/lib/hawqinit.sh standby '%s'" % (self.GPHOME, self.GPHOME)

    def hawq_remove_standby(self):
        """Remove the standby master from a running cluster.

        Queries gp_segment_configuration for a standby (role 's').  If one is
        found, restarts the master in utility mode, calls
        gp_remove_master_standby(), removes hawq_standby_address_host from
        hawq-site.xml, syncs it to the hosts and stops gpsyncmaster on the
        standby when reachable.  Exits if the database is not reachable.
        """
        running_standby_host = ''

        try:
            dburl = dbconn.DbURL(port=self.master_port, username=self.user, dbname='template1')
            conn = dbconn.connect(dburl, True)
            try:
                query = "select role, hostname from gp_segment_configuration where role = 's';"
                rows = dbconn.execSQL(conn, query)
            finally:
                conn.close()
        except DatabaseError:
            logger.error("Failed to connect to database, this script can only be run when the database is up")
            sys.exit(1)

        for row in rows:
            if row[0] == 's':
                running_standby_host = row[1]

        if not running_standby_host:
            logger.info("Do not find a running standby master")
            return

        logger.info("running standby host is %s" % running_standby_host)
        # Ignore Ctrl-C while the cluster is being reconfigured; the finally
        # clause restores the default handler even if a step exits.
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        try:
            logger.info("Stop HAWQ cluster")
            cmd = "%s; hawq stop master -a -M fast -q" % source_hawq_env
            check_return_code(local_ssh(cmd, logger), logger, "Stop HAWQ master failed, exit")
            cmd = "%s; hawq stop allsegments -a -q %s" % (source_hawq_env, self._ignore_bad_hosts_flag())
            check_return_code(local_ssh(cmd, logger), logger, "Stop HAWQ segments failed, exit")
            logger.info("Start HAWQ master")
            cmd = "%s; hawq start master -m -q" % source_hawq_env
            check_return_code(local_ssh(cmd, logger), logger, "Start HAWQ master failed, exit")

            try:
                logger.info('Remove standby from Database catalog.')
                cmd = 'env PGOPTIONS="-c gp_session_role=utility" %s/bin/psql -p %s -d template1 -o /dev/null -c \
                    "select gp_remove_master_standby();"' % (self.GPHOME, self.master_port)
                check_return_code(local_ssh(cmd, logger), logger,
                                  "Update catalog failed, exit", "Catalog updated successfully.")
                logger.info("Stop HAWQ master")
                cmd = "%s; hawq stop master -a -M fast" % source_hawq_env
                check_return_code(local_ssh(cmd, logger), logger, "Stop hawq master failed, exit")
            except DatabaseError:
                logger.error("Failed to connect to database, this script can only be run when the database is up")
                cmd = "%s; hawq stop master -a -M fast" % source_hawq_env
                check_return_code(local_ssh(cmd, logger), logger, "Stop hawq master failed, exit")

            remove_property_xml("hawq_standby_address_host", "%s/etc/hawq-site.xml" % self.GPHOME, self.quiet)
            host_list = parse_hosts_file(self.GPHOME)
            sync_hawq_site(self.GPHOME, host_list)
            # Probe the host actually registered as standby; all following
            # operations target it.
            if is_node_alive(running_standby_host, self.user, logger):
                logger.info("Check if gpsyncmaster running on %s" % running_standby_host)
                gpsyncmaster_pid = gp.getSyncmasterPID(running_standby_host, self.master_data_directory)
                if gpsyncmaster_pid > 0:
                    logger.info('Stopping gpsyncmaster on %s' % running_standby_host)
                    gp.SegmentStop.remote('stop gpsyncmaster',
                                          running_standby_host,
                                          self.master_data_directory)
            else:
                logger.warning('Not able to connect to Standby master, skip node clean')
        finally:
            signal.signal(signal.SIGINT, signal.default_int_handler)
        logger.info('Remove standby master finished')

    def _check_master_recovery(self):
        """Return 1 if pg_controldata reports the master in recovery, else 0."""
        cmd = "%s; %s/bin/pg_controldata %s |grep 'Database cluster state';" % \
              (source_hawq_env, self.GPHOME, self.master_data_directory)
        _, stdout, _ = remote_ssh_output(cmd, self.master_host_name, '')

        if "recovery" in stdout:
            logger.info('Master is doing recovery start, abort init standby')
            return 1
        return 0

    def _init_standby(self):
        """Sync config files to the standby host and launch hawqinit.sh standby."""
        logger.info("Start to init standby master: '%s'" % self.standby_host_name)
        logger.info("This might take a couple of minutes, please wait...")
        check_return_code(self._check_master_recovery())
        # Sync config files from master.
        scpcmd = "scp %s/etc/_mgmt_config %s:%s/etc/_mgmt_config > /dev/null" % \
                 (self.GPHOME, self.standby_host_name, self.GPHOME)
        check_return_code(local_ssh(scpcmd, logger, warning=True),
                          logger, "Sync _mgmt_config failed")
        scpcmd = "scp %s/etc/hawq-site.xml %s:%s/etc/hawq-site.xml > /dev/null" % \
                 (self.GPHOME, self.standby_host_name, self.GPHOME)
        check_return_code(remote_ssh(scpcmd, self.master_host_name, self.user),
                          logger, "Sync hawq-site.xml failed")
        scpcmd = "scp %s/etc/slaves %s:%s/etc/slaves > /dev/null" % \
                 (self.GPHOME, self.standby_host_name, self.GPHOME)
        check_return_code(remote_ssh(scpcmd, self.master_host_name, self.user),
                          logger, "Sync slaves file failed")
        # Sync rps configuration when a ranger directory exists locally.
        if os.path.exists("%s/ranger/" % self.GPHOME):
            scpcmd = "scp %s/ranger/etc/* %s:%s/ranger/etc/ > /dev/null" % \
                     (self.GPHOME, self.standby_host_name, self.GPHOME)
            check_return_code(remote_ssh(scpcmd, self.master_host_name, self.user),
                              logger, "Sync rps configuration files failed")

        standby_init_cmd = self._get_standby_init_cmd()
        return check_return_code(remote_ssh_nowait(standby_init_cmd, self.standby_host_name, self.user))

    def _resync_standby(self):
        """Stop the master, pysync its data directory to the standby, restart."""
        logger.info("Re-sync standby")
        cmd = "%s; hawq stop master -a -M %s;" % (source_hawq_env, self.stop_mode)
        check_return_code(local_ssh(cmd, logger), logger, "Stop hawq cluster failed, exit")
        cmd = "cd %s; %s; %s/bin/lib/pysync.py -x gpperfmon/data -x pg_log -x db_dumps %s %s:%s" % \
              (self.master_data_directory, source_hawq_env, self.GPHOME, self.master_data_directory,
               self.standby_host_name, self.master_data_directory)
        check_return_code(local_ssh(cmd, logger), logger, "Re-sync standby master failed, exit")
        cmd = "%s; hawq start master -a" % source_hawq_env
        result = local_ssh(cmd, logger)
        check_return_code(result, logger, "Start hawq cluster failed")
        return result

    def _get_segment_init_cmd(self):
        """Return the hawqinit.sh command line for a segment."""
        return "%s/bin/lib/hawqinit.sh segment '%s'" % (self.GPHOME, self.GPHOME)

    def _init_cluster(self):
        """Init master, then the standby (if configured), then all segments."""
        logger.info("%s segment hosts defined" % self.hosts_count_number)
        check_return_code(self.set_default_hash_table_bucket_number())
        check_return_code(self.set_replace_datanode_on_failure())
        if self.with_magma:
            check_return_code(self.set_init_with_magma())

        master_cmd = self._get_master_init_cmd()
        logger.info("Start to init master node: '%s'" % self.master_host_name)
        check_return_code(local_ssh(master_cmd, logger, warning=True), logger,
                          "Master init failed, exit",
                          "Master init successfully")
        self.start_heartbeat_and_monitor(self.master_host_name, "master")
        if self._standby_configured():
            check_return_code(self._init_standby(), logger,
                              "Init standby failed, exit",
                              "Init standby successfully")
        check_return_code(self._init_all_segments(), logger,
                          "Segments init failed, exit",
                          "Segments init successfully on nodes '%s'" % self.host_list)
        logger.info("Init HAWQ cluster successfully")

    def start_heartbeat_and_monitor(self, host, role):
        """Start the auto-HA heartbeat process for ``role`` on ``host``.

        Does nothing and returns 0 when no standby is configured or
        enable_master_auto_ha is 'off'.  Role 'master' starts the heartbeat
        sender and 'standby' the heartbeat monitor.  Any other role is logged
        and returns 1.  Otherwise returns remote_ssh's result.
        """
        if not self._standby_configured() or self.enable_master_auto_ha == 'off':
            return 0
        if role == "master":
            logger.info("Start heartbeat sender")
            component = "heartbeat_sender"
        elif role == "standby":
            logger.info("Start heartbeat monitor")
            component = "heartbeat_monitor"
        else:
            logger.error("Unknown heartbeat role: %s" % role)
            return 1
        admin_log_dir = os.path.expanduser("~") + "/hawqAdminLogs"
        cmd_str = "%s; nohup %s/bin/autoswitch.sh start %s %s/heartbeat_log" % \
                  (source_hawq_env, self.GPHOME, component, admin_log_dir)
        return remote_ssh(cmd_str, host, self.user)

    def _init_all_segments(self):
        """Run hawqinit.sh segment on every reachable segment host in parallel.

        With ignore_bad_hosts, hosts failing the SSH test are skipped and
        hosts_count_number is reduced accordingly.

        :returns: 1 when no host is reachable, else HawqCommands.return_flag.
        """
        segment_cmd_str = self._get_segment_init_cmd()
        logger.info("Init segments in list: %s" % self.host_list)
        working_hosts = self.host_list
        if self.ignore_bad_hosts:
            working_hosts, bad_hosts = exclude_bad_hosts(self.host_list)
            if bad_hosts and not working_hosts:
                logger.error("Unable to SSH on any of the hosts, skipping segment init operation")
                return 1
            if bad_hosts:
                logger.warning("Skipping init of segments in the list {0}, SSH test failed".format(bad_hosts))
                self.hosts_count_number -= len(bad_hosts)

        work_list = []
        q = Queue.Queue()
        # Only the reachable hosts are initialized, so check_progress waits
        # for exactly as many results as tasks queued.
        for host in working_hosts:
            logger.debug("Start to init segment on node '%s'" % host)
            scpcmd = "scp %s/etc/_mgmt_config %s:%s/etc/_mgmt_config > /dev/null" % (self.GPHOME, host, self.GPHOME)
            local_ssh(scpcmd, logger)
            work_list.append({"func": remote_ssh, "args": (segment_cmd_str, host, self.user, q)})
        logger.info("Total segment number is: %s" % len(working_hosts))
        work_list.append({"func": check_progress, "args": (q, self.hosts_count_number, 'init', 0, self.quiet)})
        node_init = HawqCommands(name='HAWQ', action_name='init', logger=logger)
        node_init.get_function_list(work_list)
        node_init.start()
        return node_init.return_flag

    def run(self):
        """Dispatch to the init routine selected by ``node_type``.

        Exits the process for an unknown node type.
        """
        if self.new_standby_hostname != 'none':
            check_return_code(self.set_new_standby_host())

        if self.node_type == "master":
            if self._hdfs_enabled():
                self.check_hdfs_path()
            logger.info("%s segment hosts defined" % self.hosts_count_number)
            check_return_code(self.set_default_hash_table_bucket_number())
            check_return_code(self.set_replace_datanode_on_failure())
            logger.info("Start to init master")
            cmd = self._get_master_init_cmd()
            check_return_code(local_ssh(cmd, logger, warning=True), logger,
                              "Master init failed, exit", "Master init successfully")
        elif self.node_type == "standby" and self.remove_standby is True:
            logger.info("Try to remove standby master")
            self.hawq_remove_standby()
        elif self.node_type == "standby":
            if not self._standby_configured():
                logger.info("No standby host found")
                logger.info("Please check your standby host name")
                sys.exit(1)
            if self.no_update:
                check_return_code(self._resync_standby(), logger,
                                  "Standby master re-sync failed, exit",
                                  "Standby master re-sync successfully")
            else:
                check_return_code(self._init_standby(), logger,
                                  "Init standby failed, exit",
                                  "Init standby successfully")
        elif self.node_type == "segment":
            cmd = self._get_segment_init_cmd()
            check_return_code(local_ssh(cmd, logger, warning=True), logger,
                              "Segment init failed, exit",
                              "Segment init successfully")
        elif self.node_type == "cluster":
            if self._hdfs_enabled():
                self.check_hdfs_path()
            self._init_cluster()
        else:
            sys.exit('hawq init object should be one of master/standby/segment/cluster')
        return None


class HawqStart:
    """Start HAWQ components: master, standby, one segment, all segments or the cluster.

    Settings are read from ``hawq_dict`` (values that the error messages refer
    to as hawq-site.xml settings) and from the parsed command-line ``opts``.
    Methods rely on the module-level ``logger``, ``log_filename`` and
    ``source_hawq_env`` globals that ``get_args`` assigns.
    """

    def __init__(self, opts, hawq_dict):
        self.node_type = opts.node_type
        self.hawq_command = opts.hawq_command
        self.user = opts.user
        self.GPHOME = opts.GPHOME
        self.quiet = opts.quiet_run
        self.stop_mode = opts.stop_mode
        self.log_dir = opts.log_dir
        self.timeout = opts.timeout_seconds
        self.hawq_dict = hawq_dict
        self.lock = threading.Lock()
        self.max_connections = opts.max_connections
        self.masteronly = opts.masteronly
        self.special_mode = opts.special_mode
        self.restrict = opts.restrict
        self.ignore_bad_hosts = opts.ignore_bad_hosts

        self._get_config()
        self.hawq_acl_type = self.check_hawq_acl_type()

    def _get_config(self):
        """Copy required settings from ``hawq_dict`` onto the instance.

        Exits the process with status 1 if a mandatory key is missing, or if
        auto-HA is enabled without a ZooKeeper quorum.
        """
        logger.info("Gathering information and validating the environment...")
        check_items = ('hawq_master_address_host', 'hawq_master_address_port',
                       'hawq_master_directory', 'hawq_segment_directory',
                       'hawq_segment_address_port', 'hawq_dfs_url',
                       'hawq_master_temp_directory', 'hawq_segment_temp_directory')

        for item in check_items:
            if item not in self.hawq_dict:
                logger.error("Check: %s not configured in hawq-site.xml" % item)
                # A bare sys.exit() reports success (status 0); this is a failure.
                sys.exit(1)

        self.master_host_name = self.hawq_dict['hawq_master_address_host']
        self.master_port = self.hawq_dict['hawq_master_address_port']
        self.master_data_directory = self.hawq_dict['hawq_master_directory']
        self.master_address = self.master_host_name + ":" + self.master_port
        self.segment_data_directory = self.hawq_dict['hawq_segment_directory']
        self.segment_port = self.hawq_dict['hawq_segment_address_port']
        self.dfs_url = self.hawq_dict['hawq_dfs_url']
        self.host_list = parse_hosts_file(self.GPHOME)
        self.hosts_count_number = len(self.host_list)

        if 'hawq_standby_address_host' in self.hawq_dict:
            self.standby_host_name = self.hawq_dict['hawq_standby_address_host']
            self.standby_port = self.master_port
            self.standby_address = self.standby_host_name + ":" + self.standby_port
        else:
            logger.info("No standby host configured")
            self.standby_host_name = ''

        if 'enable_master_auto_ha' in self.hawq_dict:
            self.enable_master_auto_ha = self.hawq_dict['enable_master_auto_ha']
        else:
            self.enable_master_auto_ha = 'off'

        if self.enable_master_auto_ha == 'on' and 'ha_zookeeper_quorum' not in self.hawq_dict:
            logger.error("ha_zookeeper_quorum not found in hawq-site.xml, but enable_master_auto_ha is on")
            sys.exit(1)

        if 'upgrade_mode' in self.hawq_dict:
            self.upgrade_mode = self.hawq_dict['upgrade_mode']

    def check_hawq_acl_type(self):
        """Return "ranger" if hawq_acl_type is set to ranger (case-insensitive), else "standalone"."""
        if 'hawq_acl_type' in self.hawq_dict and self.hawq_dict['hawq_acl_type'].lower() == 'ranger':
            return "ranger"
        return "standalone"

    def _check_recovery_start(self):
        """Classify the master's pg_controldata "Database cluster state" line.

        Returns one of 'recoverying', 'starting_up', 'recovery_success'
        (state reads "in production") or 'recovery_failed' (anything else).
        """
        cmd = "%s; %s/bin/pg_controldata %s |grep 'Database cluster state';" % (source_hawq_env, self.GPHOME, self.master_data_directory)
        _, stdout, _ = remote_ssh_output(cmd, self.master_host_name, '')

        if stdout.find("recovery") != -1:
            hawq_recovery_state = 'recoverying'
        elif stdout.find("starting up") != -1:
            hawq_recovery_state = 'starting_up'
        elif stdout.find("in production") != -1:
            hawq_recovery_state = 'recovery_success'
        else:
            hawq_recovery_state = 'recovery_failed'

        return hawq_recovery_state

    def _start_master_cmd(self):
        """Build the shell command that runs ``pg_ctl start`` for the master in the selected mode."""
        logger.info("Start master service")
        if self.masteronly:
            start_options = "-i -M master -p %s --silent-mode=true -c gp_role=utility" % self.master_port
        elif self.special_mode == 'upgrade':
            start_options = "-i -M master -p %s --silent-mode=true -U" % self.master_port
        elif self.special_mode == 'maintenance':
            start_options = "-i -M master -p %s --silent-mode=true -m" % self.master_port
        elif self.restrict:
            start_options = "-i -M master -p %s --silent-mode=true -c superuser_reserved_connections=%s" % (self.master_port, self.max_connections)
        else:
            start_options = "-i -M master -p %s --silent-mode=true" % self.master_port

        cmd_str = "%s; %s/bin/pg_ctl start -w -t %s -D %s -l %s/pg_log/startup.log -o \\\"%s\\\" >> %s" \
                 % (source_hawq_env, self.GPHOME, self.timeout, self.master_data_directory, self.master_data_directory, start_options, log_filename)
        return cmd_str

    def start_master(self):
        """Start the master, wait out crash recovery, then report standby sync state.

        Returns 0 on success, non-zero on failure. The standby sync check only
        runs once the master is up, because it needs a database connection.
        """
        cmd = self._start_master_cmd()
        result = remote_ssh(cmd, self.master_host_name, self.user)
        if result != 0:
            result = self._wait_for_master_recovery(result)
        if result != 0:
            return result

        if self.node_type == "cluster" and self.standby_host_name.lower() not in ('', 'none'):
            self._report_standby_sync()

        return self.start_heartbeat_and_monitor(self.master_host_name, "master")

    def _wait_for_master_recovery(self, result):
        """After a failed ``pg_ctl start``, poll pg_controldata while the master recovers.

        Returns *result* unchanged if the master is not in recovery. Otherwise
        returns 0 once recovery succeeds, or 1 if it fails.
        """
        if self._check_recovery_start() != 'recoverying':
            return result

        recovery_sleep_time = 3
        logger.warn("Master is doing recovery, this might take minutes to hours")
        logger.warn("Please do not interrupt it and wait patient")
        sys.stdout.write("\r")

        while True:
            sys.stdout.write(".")
            sys.stdout.flush()
            time.sleep(recovery_sleep_time)
            # Back off gradually; the poll interval is capped at 60 seconds.
            if recovery_sleep_time < 60:
                recovery_sleep_time += 1
            state = self._check_recovery_start()
            if state == 'recovery_success':
                result = 0
                break
            if state == 'recovery_failed':
                logger.error("HAWQ master recoverying failed")
                result = 1
                break

        sys.stdout.write("\n")
        return result

    def _report_standby_sync(self):
        """Log whether the standby is synced, waiting up to ~60 seconds for it to catch up."""
        logger.info("Checking if standby is synced with master")
        sync_result = self._check_standby_sync()
        if sync_result == 3:
            logger.warn("Standby is not synced as: Standby master too far behind")
            logger.warn("Use 'hawq init standby -n' to do force sync")
            return

        wait_count = 0
        while sync_result == 2 and wait_count < 20:
            logger.info("Waiting standby to get synced for 3 seconds...")
            time.sleep(3)
            wait_count += 1
            sync_result = self._check_standby_sync()

        if sync_result == 0:
            logger.info("Standby master is synced")
        elif sync_result == 1:
            logger.warn("Unable to check standby sync status")
        else:
            logger.warn("Standby is not synced after 60 seconds")
            logger.warn("Use 'hawq init standby -n' to do force sync")

    def _start_standby_cmd(self):
        """Build the shell command that launches gpsyncmaster in the background."""
        logger.info("Start standby master service")
        cmd_str = "%s; %s/bin/gpsyncmaster -D %s -i -p %s >> %s 2>&1 &" \
                 % (source_hawq_env, self.GPHOME, self.master_data_directory, self.standby_port, log_filename)
        return cmd_str

    def start_standby(self):
        """Start the standby sync process and its watcher, then the heartbeat monitor."""
        cmd = self._start_standby_cmd()
        check_return_code(remote_ssh(cmd, self.standby_host_name, self.user))
        cmd = "%s; %s/sbin/hawqstandbywatch.py %s debug" % (source_hawq_env, self.GPHOME, self.master_data_directory)
        result = remote_ssh_nowait(cmd, self.standby_host_name, self.user)
        if result != 0:
            return result
        result = self.start_heartbeat_and_monitor(self.standby_host_name, "standby")
        return result

    def _check_standby_sync(self):
        """Query gp_master_mirroring for the standby sync state.

        Returns:
            0 -- the first row's summary_state is 'Synchronized'
            1 -- the state could not be determined (query failed or no rows)
            2 -- the standby is not yet synchronized
            3 -- psql output reports 'Standby master too far behind'
        """
        query = "select summary_state, detail_state from gp_master_mirroring;"
        try:
            dburl = dbconn.DbURL(port=self.master_port, username=self.user, dbname='template1')
            conn = dbconn.connect(dburl, True)
            try:
                # Materialise the rows before the connection is closed.
                rows = list(dbconn.execSQL(conn, query))
            finally:
                conn.close()
        except DatabaseError as ex:
            logger.error("Failed to connect to database, this script can only be run when the database is up")
            logger.debug(str(ex))
            return 1

        cmd = '%s/bin/psql -p %s -d template1 -c "%s"' % (self.GPHOME, self.master_port, query)
        _, stdout, _ = local_ssh_output(cmd)
        if stdout.find('Standby master too far behind') != -1:
            return 3

        if not rows:
            return 1
        # Only the first row decides the state.
        if rows[0][0] == 'Synchronized':
            return 0
        return 2

    def _start_segment_cmd(self):
        """Build the shell command that runs ``pg_ctl start`` for a segment."""
        logger.info("Start segment service")
        cmd_str = "%s; %s/bin/pg_ctl start -w -t %s -D %s -l %s/pg_log/startup.log -o \\\" -i -M %s -p %s --silent-mode=true\\\" >> %s" \
             % (source_hawq_env, self.GPHOME, self.timeout, self.segment_data_directory, self.segment_data_directory,
                "segment", self.segment_port, log_filename)
        return cmd_str

    def start_segment(self):
        """Start the segment on the local host and return the command's status."""
        cmd = self._start_segment_cmd()
        result = remote_ssh(cmd, 'localhost', self.user)
        return result

    def _start_all_nodes(self):
        """Start RPS (if ranger), then the standby and master, then all segments."""
        logger.info("Start all the nodes in hawq cluster")

        if self.hawq_acl_type == 'ranger':
            self.start_rps(self.master_host_name)
        if self.standby_host_name.lower() not in ('', 'none'):
            if self.hawq_acl_type == 'ranger':
                self.start_rps(self.standby_host_name)
            logger.info("Starting standby master '%s'" % self.standby_host_name)
            check_return_code(self.start_standby(), logger, "Standby master start failed, exit",
                              "Standby master started successfully")

        logger.info("Starting master node '%s'" % self.master_host_name)
        check_return_code(self.start_master(), logger, "Master start failed, exit", \
                          "Master started successfully")

        segments_return_flag = self._start_all_segments()
        if segments_return_flag == 0:
            logger.info("HAWQ cluster started successfully")
        return segments_return_flag

    def _start_all_segments(self):
        """Start segments on every reachable host in parallel.

        Returns the combined thread return flag (0 means success). Returns 1 if
        ``ignore_bad_hosts`` is set and no host passes the SSH test.
        """
        logger.info("Start all the segments in hawq cluster")
        logger.info("Start segments in list: %s" % self.host_list)
        bad_hosts = []
        working_hosts = self.host_list
        if self.ignore_bad_hosts:
            working_hosts, bad_hosts = exclude_bad_hosts(self.host_list)
            # Guard on bad_hosts so an empty host list is not reported as "all bad".
            if bad_hosts and len(bad_hosts) == len(self.host_list):
                logger.error("Unable to SSH on any of the hosts, skipping segment start operation")
                return 1
            if len(bad_hosts) > 0:
                logger.warning("Skipping starting segments in the list {0}, SSH test failed".format(bad_hosts))
                self.hosts_count_number -= len(bad_hosts)

        # Start segments
        segment_cmd_str = self._start_segment_cmd()
        q = Queue.Queue()
        work_list = []
        for host in working_hosts:
            work_list.append({"func": remote_ssh, "args": (segment_cmd_str, host, self.user, q)})
        logger.info("Total segment number is: %s" % len(self.host_list))
        work_list.append({"func": check_progress, "args": (q, self.hosts_count_number, 'start', len(bad_hosts), self.quiet)})
        node_init = HawqCommands(name='HAWQ', action_name='start', logger=logger)
        node_init.get_function_list(work_list)
        node_init.start()
        logger.debug("Total threads return value is : %d" % node_init.return_flag)
        if node_init.return_flag != 0:
            logger.error("Segments start failed")
        else:
            logger.info("Segments started successfully")
        return node_init.return_flag

    def start_heartbeat_and_monitor(self, host, role):
        """Start the auto-switch heartbeat sender (master) or monitor (standby) on *host*.

        Returns 0 without doing anything when no standby is configured or
        enable_master_auto_ha is 'off'. Returns 1 for an unrecognised *role*.
        """
        if self.standby_host_name.lower() in ('', 'none') or self.enable_master_auto_ha == 'off':
            return 0
        if role == "master":
            logger.info("Start heartbeat sender")
            service = "heartbeat_sender"
        elif role == "standby":
            logger.info("Start heartbeat monitor")
            service = "heartbeat_monitor"
        else:
            # Previously this fell through with cmd_str unbound (NameError).
            logger.error("Heartbeat start is not applicable on role %s, must be master or standby" % role)
            return 1
        log_dir = os.path.expanduser("~") + "/hawqAdminLogs"
        cmd_str = "%s; nohup %s/bin/autoswitch.sh start %s %s/heartbeat_log" % (source_hawq_env, self.GPHOME, service, log_dir)
        return remote_ssh(cmd_str, host, self.user)

    def _start_rps(self, rps_hostname):
        """Run ``rps.sh start`` on *rps_hostname* and return the command's status."""
        logger.info("Start ranger plugin service on %s" % rps_hostname)
        cmd_str = "%s/ranger/bin/rps.sh start" % (self.GPHOME)
        result = remote_ssh(cmd_str, rps_hostname, self.user)
        return result

    def start_rps(self, rps_hostname):
        """Start the ranger plugin service and report the result through check_return_code."""
        check_return_code(self._start_rps(rps_hostname), logger, \
                          "Ranger plugin service start on %s failed, exit" % rps_hostname, \
                          "Ranger plugin service started on %s successfully" % rps_hostname)

    def run(self):
        """Dispatch on ``node_type`` to start the requested HAWQ component(s)."""
        if self.node_type == "master":
            if self.hawq_acl_type == 'ranger':
                self.start_rps(self.master_host_name)
            check_return_code(self.start_master(), logger, \
                              "Master start failed, exit", "Master started successfully")
        elif self.node_type == "standby":
            if self.standby_host_name.lower() in ('', 'none'):
                logger.error("No standby host configured, cannot start standby master")
                sys.exit(1)
            if self.hawq_acl_type == 'ranger':
                self.start_rps(self.standby_host_name)
            check_return_code(self.start_standby(), logger, "Standby master start failed, exit",
                              "Standby master started successfully")
        elif self.node_type == "segment":
            check_return_code(self.start_segment(), logger, \
                              "Segment start failed, exit", "Segment started successfully")
        elif self.node_type == "cluster":
            check_return_code(self._start_all_nodes())
        elif self.node_type == "allsegments":
            check_return_code(self._start_all_segments())
        else:
            sys.exit('Node object should be in [master, standby, segment, allsegments, cluster]')
        return None


class HawqStop:
    """Stop, or reload (``--reload``), HAWQ master, standby, segment(s) or the whole cluster.

    Settings come from ``hawq_dict`` and ``opts``. Methods rely on the
    module-level ``logger``, ``log_filename`` and ``source_hawq_env`` globals
    that ``get_args`` assigns.
    """

    def __init__(self, opts, hawq_dict):
        self.node_type = opts.node_type
        self.hawq_command = opts.hawq_command
        self.user = opts.user
        self.GPHOME = opts.GPHOME
        self.stop_mode = opts.stop_mode
        self.quiet = opts.quiet_run
        self.log_dir = opts.log_dir
        self.timeout = opts.timeout_seconds
        self.hawq_dict = hawq_dict
        self.hawq_reload = opts.hawq_reload
        # Verb forms used in log messages for the chosen action.
        if self.hawq_reload:
            self.stop_action = 'reload'
            self.stop_action_past = 'reloaded'
        else:
            self.stop_action = 'stop'
            self.stop_action_past = 'stopped'

        self.lock = threading.Lock()
        self.dburl = None
        self.conn = None
        self._get_config()
        self.ignore_bad_hosts = opts.ignore_bad_hosts
        self.hawq_acl_type = self.check_hawq_acl_type()

    def _get_config(self):
        """Copy required settings from ``hawq_dict``; exit with a message if one is missing."""
        check_items = ('hawq_master_address_host', 'hawq_master_address_port',
                       'hawq_master_directory', 'hawq_segment_directory',
                       'hawq_segment_address_port',
                       'hawq_master_temp_directory', 'hawq_segment_temp_directory')

        for item in check_items:
            if item not in self.hawq_dict:
                sys.exit("Check: %s not configured in hawq-site.xml" % item)

        self.master_host_name = self.hawq_dict['hawq_master_address_host']
        self.master_port = self.hawq_dict['hawq_master_address_port']
        self.master_data_directory = self.hawq_dict['hawq_master_directory']
        self.master_address = self.master_host_name + ":" + self.master_port
        self.segment_data_directory = self.hawq_dict['hawq_segment_directory']
        self.segment_port = self.hawq_dict['hawq_segment_address_port']
        self.host_list = parse_hosts_file(self.GPHOME)
        self.hosts_count_number = len(self.host_list)

        if 'hawq_standby_address_host' in self.hawq_dict:
            self.standby_host_name = self.hawq_dict['hawq_standby_address_host']
            self.standby_port = self.master_port
            self.standby_address = self.standby_host_name + ":" + self.standby_port
        else:
            logger.info("No standby host configured")
            self.standby_host_name = ''

        if 'enable_master_auto_ha' in self.hawq_dict:
            self.enable_master_auto_ha = self.hawq_dict['enable_master_auto_ha']
        else:
            self.enable_master_auto_ha = 'off'

    def check_hawq_acl_type(self):
        """Read hawq_acl_type from the running database's pg_settings.

        Returns "ranger" or "standalone", or "unknown" if the query fails.
        The connection is closed even if the query raises.
        """
        query = "select name, setting from pg_catalog.pg_settings where name='hawq_acl_type';"
        try:
            dburl = dbconn.DbURL(port=self.master_port, username=self.user, dbname='template1')
            conn = dbconn.connect(dburl, True)
            try:
                rows = list(dbconn.execSQL(conn, query))
            finally:
                conn.close()
        except DatabaseError:
            logger.warning("Failed to connect to database, cannot get hawq_acl_type")
            return "unknown"

        for row in rows:
            if row[1].lower() == 'ranger':
                return "ranger"
        return "standalone"

    def _stop_master_checks(self):
        """Count user connections and abort (exit 1) if a smart stop would block on them."""
        try:
            total_connections = 0
            self.dburl = dbconn.DbURL(hostname=self.master_host_name, port=self.master_port, username=self.user, dbname='template1')
            self.conn = dbconn.connect(self.dburl, utility=True)
            total_connections = len(catalog.getUserPIDs(self.conn))
            self.conn.close()
            logger.info("There are %d connections to the database" % total_connections)
        except DatabaseError:
            logger.error("Failed to connect to the running database, please check master status")
            logger.error("Or you can check hawq stop --help for other stop options")
            sys.exit(1)

        if total_connections > 0 and self.stop_mode == 'smart':
            logger.warning("There are other connections to this instance, shutdown mode smart aborted")
            logger.warning("Either remove connections, or use 'hawq stop master -M fast' or 'hawq stop master -M immediate'")
            logger.warning("See hawq stop --help for all options")
            logger.error("Active connections. Aborting shutdown...")
            sys.exit(1)

        logger.info("Commencing Master instance shutdown with mode='%s'" % self.stop_mode)
        logger.info("Master host=%s" % self.master_host_name)

        if self.stop_mode == 'fast':
            logger.info("Detected %d connections to database" % total_connections)
            if total_connections > 0:
                logger.info("Switching to WAIT mode")
                logger.info("Will wait for shutdown to complete, this may take some time if")
                logger.info("there are a large number of active complex transactions, please wait...")
            elif self.timeout == SEGMENT_TIMEOUT_DEFAULT:
                logger.info("Using standard WAIT mode of %s seconds" % SEGMENT_TIMEOUT_DEFAULT)
            else:
                logger.info("Using WAIT mode of %s seconds" % self.timeout)

    def _stop_master_cmd(self):
        """Build the ``pg_ctl reload`` or ``pg_ctl stop`` command for the master."""
        logger.info("%s hawq master" % self.stop_action.title())
        if self.hawq_reload:
            cmd_str = "%s; %s/bin/pg_ctl reload -D %s >> %s" % \
                      (source_hawq_env, self.GPHOME, self.master_data_directory, log_filename)
            return cmd_str
        else:
            cmd_str = "%s; %s/bin/pg_ctl stop -w -t %s -D %s -l %s/pg_log/startup.log -m %s >> %s" % \
                      (source_hawq_env, self.GPHOME, self.timeout, self.master_data_directory,
                       self.master_data_directory, self.stop_mode, log_filename)
            return cmd_str

    def _stop_master(self):
        """Stop or reload the master if it is running, then stop its auto-switch service."""
        _, master_running = check_hawq_running(self.master_host_name, self.master_data_directory, self.master_port, self.user, logger)
        if master_running:
            if self.stop_mode != 'immediate' and not self.hawq_reload:
                self._stop_master_checks()
            cmd = self._stop_master_cmd()
            result = remote_ssh(cmd, self.master_host_name, self.user)
            if result != 0:
                return result
        else:
            logger.warn('Master is not running, skip')

        result = self.stop_heartbeat_and_monitor(self.master_host_name, "master")
        return result

    def _stop_segment_cmd(self):
        """Build the ``pg_ctl reload`` or ``pg_ctl stop`` command for a segment."""
        logger.info("%s hawq segment" % self.stop_action.title())
        if self.hawq_reload:
            cmd_str = "%s; %s/bin/pg_ctl reload -D %s >> %s" % \
                      (source_hawq_env, self.GPHOME, self.segment_data_directory, log_filename)
            return cmd_str
        else:
            # When the whole cluster stops, the master goes down first and a
            # segment may hang in recv() from it. Promote smart to fast so the
            # segment does not wait for the master connection to close.
            seg_stop_mod = self.stop_mode
            if self.node_type == 'cluster' and seg_stop_mod == 'smart':
                seg_stop_mod = 'fast'

            cmd_str = "%s; %s/bin/pg_ctl stop -w -t %s -D %s -l %s/pg_log/startup.log -m %s >> %s" % \
                      (source_hawq_env, self.GPHOME, self.timeout, self.segment_data_directory,
                       self.segment_data_directory, seg_stop_mod, log_filename)
            return cmd_str

    def _stop_segment(self):
        """Stop or reload the local segment if it is running; return 0 when already down."""
        _, segment_running = check_hawq_running('localhost', self.segment_data_directory, self.segment_port, self.user, logger)
        if segment_running:
            cmd = self._stop_segment_cmd()
            result = remote_ssh(cmd, 'localhost', self.user)
            return result
        else:
            logger.warn('HAWQ segment is not running, skip')
            return 0

    def _stop_standby_cmd(self):
        """Build the ``pg_ctl reload`` or ``pg_ctl stop`` command for the standby master."""
        logger.info("%s hawq standby master" % self.stop_action.title())
        if self.hawq_reload:
            cmd_str = "%s; %s/bin/pg_ctl reload -D %s >> %s" % \
                      (source_hawq_env, self.GPHOME, self.master_data_directory, log_filename)
            return cmd_str
        else:
            cmd_str = "%s; %s/bin/pg_ctl stop -w -t %s -D %s -l %s/pg_log/startup.log -m %s >> %s" % \
                      (source_hawq_env, self.GPHOME, self.timeout, self.master_data_directory,
                       self.master_data_directory, self.stop_mode, log_filename)
            return cmd_str

    def _stop_standby(self):
        """Stop or reload the standby if its sync process runs, then stop the auto-switch service."""
        if check_syncmaster_running(self.master_data_directory, '', self.standby_host_name, logger):
            cmd = self._stop_standby_cmd()
            result = remote_ssh(cmd, self.standby_host_name, self.user)
            if result != 0:
                return result
        else:
            logger.warn('Standby master is not running, skip')

        result = self.stop_heartbeat_and_monitor(self.standby_host_name, "standby")
        return result

    def _stopAll(self):
        """Stop master, standby, RPS and all segments; return the sum of their status codes."""
        logger.info("%s hawq cluster" % self.stop_action.title())
        standby_result = 0
        master_result = self._stop_master()
        if master_result != 0:
            logger.error("Master %s failed" % self.stop_action)
        else:
            logger.info("Master %s successfully" % self.stop_action_past)
        if self.standby_host_name.lower() not in ('', 'none'):
            standby_result = self._stop_standby()
            if standby_result != 0:
                logger.error("Standby master %s failed" % self.stop_action)
            else:
                logger.info("Standby master %s successfully" % self.stop_action_past)
        if self.hawq_acl_type in ['ranger', 'unknown']:
            self._stop_rps(self.master_host_name, self.hawq_acl_type)
            if self.standby_host_name.lower() not in ('', 'none'):
                self._stop_rps(self.standby_host_name, self.hawq_acl_type)

        # Execute segment stop command on each node.
        segments_return_flag = self._stopAllSegments()
        cluster_result = master_result + standby_result + segments_return_flag
        if cluster_result != 0:
            logger.error("Cluster %s failed" % self.stop_action)
        else:
            logger.info("Cluster %s successfully" % self.stop_action_past)
        return cluster_result

    def _running_segments_list(self, host_list):
        """Check segment status on *host_list* in parallel; return (running_hosts, stopped_hosts)."""
        work_list = []
        running_host = []
        stopped_host = []
        seg_check_q = Queue.Queue()

        for host in host_list:
            work_list.append({"func": check_hawq_running, "args": (host, self.segment_data_directory, self.segment_port, self.user, logger)})

        node_checks = threads_with_return(name='HAWQ', action_name='check', logger=logger, return_values=seg_check_q)
        node_checks.get_function_list(work_list)
        node_checks.start()
        while not seg_check_q.empty():
            item = seg_check_q.get()
            if item[1] == True:
                running_host.append(item[0])
            else:
                stopped_host.append(item[0])

        return running_host, stopped_host

    def _stopAllSegments(self):
        """Stop or reload segments on every reachable host that is running one.

        Returns the combined thread return flag, plus the number of
        unreachable hosts unless ``ignore_bad_hosts`` is set. Returns 1 if
        no host passes the SSH test, and 0 if the host list is empty.
        """
        segment_cmd_str = self._stop_segment_cmd()
        logger.info("%s segments in list: %s" % (self.stop_action.title(), self.host_list))

        if not self.host_list:
            # Previously reported as "Unable to SSH on any of the hosts".
            logger.warning("No segment hosts configured, nothing to %s" % self.stop_action)
            return 0

        working_hosts, bad_hosts = exclude_bad_hosts(self.host_list)
        if len(bad_hosts) == len(self.host_list):
            logger.error("Unable to SSH on any of the hosts, skipping segment %s operation" % self.stop_action)
            return 1

        process_running_host, stopped_host = self._running_segments_list(working_hosts)

        # Hosts that are skipped still count towards the progress total.
        if self.ignore_bad_hosts:
            if len(bad_hosts) > 0:
                logger.warning("Skipping {1} segments in the list {0}, SSH test failed".format(bad_hosts, self.stop_action))
            skip_host_list = bad_hosts + stopped_host
        else:
            skip_host_list = stopped_host

        # Stop segments
        work_list = []
        q = Queue.Queue()
        for host in process_running_host:
            work_list.append({"func": remote_ssh, "args": (segment_cmd_str, host, self.user, q)})
        logger.info("Total segment number is: %s" % len(self.host_list))
        work_list.append({"func": check_progress, "args": (q, len(process_running_host), self.stop_action, len(skip_host_list), self.quiet)})
        node_init = HawqCommands(name='HAWQ', action_name=self.stop_action, logger=logger)
        node_init.get_function_list(work_list)
        node_init.start()
        if self.ignore_bad_hosts:
            total_return_flag = node_init.return_flag
        else:
            if len(bad_hosts) > 0:
                logger.error("%s segment %s failed, SSH test failed on %s" % (len(bad_hosts), self.stop_action, bad_hosts))
            total_return_flag = node_init.return_flag + len(bad_hosts)

        if total_return_flag != 0:
            logger.error("Segments %s failed" % self.stop_action)
        else:
            logger.info("Segments %s successfully" % self.stop_action_past)
        return total_return_flag

    def stop_heartbeat_and_monitor(self, host, role):
        """Stop the auto-switch service on *host* for a master or standby *role*.

        Returns 0 without doing anything on reload. Returns 1 for an
        unsupported *role*. Otherwise returns the remote command's status.
        """
        if self.hawq_reload:
            # Do not stop the auto-switch service on 'hawq stop --reload'.
            return 0
        if role not in ("master", "standby"):
            # Previously this fell through with cmd_str unbound (NameError).
            logger.error("heartbeat or its monitor stop is not applicable on role %s. Must be master or standby" % role)
            return 1

        logger.info("Stop auto-switch service ")
        cmd_str = "%s; %s/bin/autoswitch.sh stop " % (source_hawq_env, self.GPHOME)
        result = remote_ssh(cmd_str, host, self.user)
        check_return_code(result, logger, \
                          "auto-switch service stop failed", \
                          "auto-switch service stop successfully")
        return result

    def _stop_rps(self, rps_hostname, acl_type):
        """Stop the ranger plugin service on *rps_hostname*.

        Skipped on reload. When *acl_type* is 'unknown' the attempt is
        best-effort and its result is not checked.
        """
        if self.hawq_reload:
            # Do not stop RPS on 'hawq stop --reload'.
            return
        check_ret = True
        if acl_type == 'unknown':
            logger.warning("Try to stop RPS when hawq_acl_type is unknown")
            check_ret = False
        logger.info("Stop ranger plugin service on %s" % rps_hostname)
        cmd_str = "%s/ranger/bin/rps.sh stop" % (self.GPHOME)
        result = remote_ssh(cmd_str, rps_hostname, self.user)
        if check_ret:
            check_return_code(result, logger, \
                          "Ranger plugin service stop failed on %s, exit" % rps_hostname, \
                          "Ranger plugin service stopped on %s successfully" % rps_hostname)

    def run(self):
        """Dispatch on ``node_type`` to stop or reload the requested HAWQ component(s)."""
        if self.node_type == "master":
            check_return_code(self._stop_master(), logger, \
                              "Master %s failed, exit" % self.stop_action, "Master %s successfully" % self.stop_action_past)
            if self.hawq_acl_type in ['ranger', 'unknown']:
                self._stop_rps(self.master_host_name, self.hawq_acl_type)
        elif self.node_type == "standby":
            if self.standby_host_name.lower() not in ('', 'none'):
                check_return_code(self._stop_standby(), logger, \
                                  "Standby master %s failed, exit" % self.stop_action, "Standby master %s successfully" % self.stop_action_past)
                if self.hawq_acl_type in ['ranger', 'unknown']:
                    self._stop_rps(self.standby_host_name, self.hawq_acl_type)
        elif self.node_type == "segment":
            check_return_code(self._stop_segment(), logger, \
                              "Segment %s failed, exit" % self.stop_action, "Segment %s successfully" % self.stop_action_past)
        elif self.node_type == "cluster":
            check_return_code(self._stopAll())
        elif self.node_type == "allsegments":
            check_return_code(self._stopAllSegments())
        else:
            sys.exit('Node object should be in [master, standby, segment, allsegments, cluster]')
        return None

def flush_status(charc, seconds):
    """Write *charc* to stdout immediately (no buffering delay), then sleep *seconds*."""
    stream = sys.stdout
    stream.write(charc)
    stream.flush()
    time.sleep(seconds)


def write_out_tofile(out, err, file):
    """Append labelled stdout and stderr text to *file*, one line each, stdout first."""
    with open(file, 'a+') as handle:
        for label, text in (("stdout: ", out), ("stderr: ", err)):
            handle.write(label + text + '\n')


def create_logger(logname='hawq logger', outputFile='/tmp/hawq_mgmt.log', console=False):
    """Return a DEBUG-level logger named *logname* that appends to *outputFile*.

    ``logging.getLogger`` returns the same object for the same name. Repeated
    calls therefore reuse an existing handler for the same file instead of
    attaching a duplicate, which would repeat every record and leak a file
    descriptor.

    :param logname: logger name passed to ``logging.getLogger``.
    :param outputFile: path of the log file (FileHandler's default append mode).
    :param console: when True, also emit records to stderr. This replaces the
        previously commented-out console handler.
    :returns: the configured ``logging.Logger``.
    """
    log = logging.getLogger(logname)
    log.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    # FileHandler stores the absolute path in baseFilename.
    target = os.path.abspath(outputFile)
    has_file_handler = any(isinstance(h, logging.FileHandler) and h.baseFilename == target
                           for h in log.handlers)
    if not has_file_handler:
        fh = logging.FileHandler(outputFile)
        fh.setLevel(logging.DEBUG)
        fh.setFormatter(formatter)
        log.addHandler(fh)

    # FileHandler subclasses StreamHandler, so test the exact type here.
    if console and not any(type(h) is logging.StreamHandler for h in log.handlers):
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        ch.setFormatter(formatter)
        log.addHandler(ch)
    return log


def get_args():
    """Parse the command line, set up logging and load the cluster configuration.

    Expects exactly two positional arguments, ``<command> <node type>``
    (for example ``start cluster``).  When either is missing or not
    recognised, the common help text is printed and the process exits
    with status 1.

    Side effects:
        * sets the module globals ``logger``, ``log_filename`` and
          ``source_hawq_env``;
        * creates ``--logdir`` locally and attempts to create it on every
          cluster host (failures are only logged).

    Returns:
        (opts, hawq_dict): the parsed options, with ``hawq_command``,
        ``node_type``, ``GPHOME`` and ``user`` filled in, and the values
        parsed from hawq-site.xml.
    """
    global logger, log_filename, source_hawq_env

    (opts, ARGS) = create_parser()
    # create_parser() exits when there are no positional arguments, but a
    # single argument used to hit ARGS[1] -> IndexError.  An empty argument
    # also left the attribute unset, causing AttributeError below.  Falling
    # back to None routes both cases to the help text instead.
    opts.hawq_command = ARGS[0] if len(ARGS) > 0 and ARGS[0] else None
    opts.node_type = ARGS[1] if len(ARGS) > 1 and ARGS[1] else None

    if opts.node_type in ['master', 'standby', 'segment', 'cluster', 'allsegments'] and opts.hawq_command in ['start', 'stop', 'restart', 'init', 'activate']:
        if opts.log_dir and not os.path.exists(opts.log_dir):
            os.makedirs(opts.log_dir)

        if opts.verbose:
            enable_verbose_logging()
        if opts.quiet_run:
            quiet_stdout_logging()
        logger, log_filename = setup_hawq_tool_logging('hawq_%s' % opts.hawq_command,getLocalHostname(),getUserName(), opts.log_dir)
        logger.info("Prepare to do 'hawq %s'" % opts.hawq_command)
        logger.info("You can find log in:")
        logger.info(log_filename)
    else:
        print(COMMON_HELP)
        sys.exit(1)

    if len(ARGS) != 2:
        sys.exit('Only support two arguments.')

    if opts.verbose and opts.quiet_run:
        logger.error("Multiple actions specified.  See the --help info.")
        sys.exit(1)

    if opts.special_mode and (opts.hawq_command != 'start'):
        logger.error("['-U', '--special-mode'] only apply to 'hawq start'.  See the --help info.")
        sys.exit(1)

    opts.GPHOME = os.getenv('GPHOME')
    if not opts.GPHOME:
        logger.error("Didn't get GPHOME value, exit")
        # Non-zero status: a bare sys.exit() reported success to the caller.
        sys.exit(1)
    logger.info("GPHOME is set to:")
    logger.info(opts.GPHOME)
    source_hawq_env = "source %s/greenplum_path.sh" % opts.GPHOME

    if opts.user == "":
        opts.user = getpass.getuser()
    if opts.user == "root":
        logger.error("'root' user is not allowed")
        sys.exit(1)
    logger.debug("Current user is '%s'" % opts.user)
    logger.debug("Parsing config file:")
    logger.debug("%s/etc/hawq-site.xml" % opts.GPHOME)
    hawqsite = HawqXMLParser(opts.GPHOME)
    hawqsite.get_all_values()
    hawq_dict = hawqsite.hawq_dict

    # Every host that may need the log directory: the master, the standby
    # (when configured) and all hosts listed by parse_hosts_file().
    cluster_host_list = [hawq_dict['hawq_master_address_host']]
    if 'hawq_standby_address_host' in hawq_dict:
        cluster_host_list.append(hawq_dict['hawq_standby_address_host'])
    cluster_host_list.extend(parse_hosts_file(opts.GPHOME))

    if opts.log_dir:
        logger.debug("Check and create log directory %s on all hosts" % opts.log_dir)
        create_success_host, create_failed_host = create_cluster_directory(opts.log_dir, cluster_host_list)
        # Best effort: report hosts where the directory could not be
        # created, but carry on with the requested command.
        if len(create_failed_host) > 0:
            logger.error("Create log directory %s failed on hosts %s" % (opts.log_dir, create_failed_host))
            logger.error("Please check directory permission")

    if opts.hawq_reload:
        logger.info("Reloading configuration without restarting hawq cluster")
    else:
        logger.info("%s hawq with args: %s" % (opts.hawq_command.title(), ARGS))
    return opts, hawq_dict


def remote_ssh(cmd_str, host, user, q=None):
    """Run *cmd_str* on *host* over ssh and wait for it to finish.

    Args:
        cmd_str: shell command executed remotely.  It is wrapped in double
            quotes, so it must not contain unescaped double quotes.
        host: target host name.
        user: ssh login user; ``""`` lets ssh pick its default user.
        q: optional queue.  When given, ``("done", host, returncode)`` is
            put on it once the command finishes (see ``check_progress``).

    Returns:
        The ssh exit status, or 1 if the ssh process could not be launched.

    The remote stdout/stderr are logged at INFO level.
    """
    target = host if user == "" else "%s@%s" % (user, host)
    remote_cmd_str = "ssh -o 'StrictHostKeyChecking no' %s \"%s\"" % (target, cmd_str)
    try:
        result = subprocess.Popen(remote_cmd_str, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = result.communicate()
        returncode = result.returncode
    except OSError as e:
        # Popen signals launch failures with OSError (never CalledProcessError).
        # The old handler left result/stdout/stderr unbound and crashed below.
        # Report a failure code instead, so the queue below is still notified.
        logger.error("Execute shell command on %s failed: %s" % (host, e))
        stdout, stderr, returncode = '', '', 1

    if stdout:
        logger.info(stdout.strip())
    if stderr:
        logger.info(stderr.strip())
    if q:
        q.put(("done", host, returncode))
    return returncode


def remote_ssh_nowait(cmd, host, user):
    """Run *cmd* on *host* via ssh, sharing this process's stdin/stdout/stderr.

    Output is not captured, so it appears directly on the terminal.  An
    empty *user* lets ssh choose its default login.  Returns the ssh exit
    status once the remote command completes.
    """
    destination = "%s@%s" % (user, host) if user != "" else host
    ssh_line = "ssh -o 'StrictHostKeyChecking no' %s \"%s\"" % (destination, cmd)
    proc = subprocess.Popen(ssh_line, shell=True)
    return proc.wait()


def check_progress(q, total_num, action, skip_num = 0, quiet=False, service="segments"):
    """Wait until *total_num* workers report completion on *q*, then log a summary.

    Workers report with tuples ``(status, host, returncode)``; ``remote_ssh``
    puts ``("done", host, returncode)`` when its command finishes.  Only
    ``"done"`` messages count toward completion and success.

    Args:
        q: queue the workers report on.
        total_num: number of ``"done"`` messages to wait for.
        action: verb used in the summary line (e.g. "Started").
        skip_num: number of items skipped; mentioned in the summary when non-zero.
        quiet: suppress the progress dots printed while waiting.
        service: noun used in the summary line.

    Returns:
        0 (always).
    """
    working_num = total_num
    success_num = 0
    sys.stdout.write("\r")
    while working_num > 0:
        while not q.empty():
            msg = q.get()
            # The success test belongs inside this branch.  Previously any
            # message with a zero return code was counted, "done" or not.
            if msg[0] == "done":
                working_num -= 1
                if msg[2] == 0:
                    success_num += 1
        if not quiet:
            sys.stdout.write(".")
            sys.stdout.flush()
        # Poll once per second, but don't sleep after the last report.
        if working_num > 0:
            time.sleep(1)
    if not quiet:
        sys.stdout.write("\n")
    if skip_num != 0:
        logger.info("%s %d of %d %s, %d %s %s skipped" % (action, success_num, total_num, service, skip_num, service, action))
    else:
        logger.info("%s %d of %d %s" % (action, success_num, total_num, service))
    return 0


def start_hawq(opts, hawq_dict):
    """Start the HAWQ node(s) selected by *opts* through ``HawqStart``; returns None."""
    HawqStart(opts, hawq_dict).run()


def stop_hawq(opts, hawq_dict):
    """Stop the HAWQ node(s) selected by *opts* through ``HawqStop``; returns None."""
    HawqStop(opts, hawq_dict).run()


def hawq_init(opts, hawq_dict):
    """Initialise the HAWQ node(s) selected by *opts* through ``HawqInit``; returns None."""
    HawqInit(opts, hawq_dict).run()


def hawq_activate_standby(opts, hawq_dict):
    """Promote the configured standby master to be the new HAWQ master.

    Steps, each executed remotely through ``remote_ssh`` with no explicit
    ssh user:
      1. stop the old master if it is running (fast mode, then immediate);
      2. from the standby host, stop all segments and the standby itself;
      3. update hawq-site.xml: master host := standby host, standby removed;
      4. append ``gp_persistent_repair_global_sequence = true`` to the
         master's postgresql.conf;
      5. start the new master in master-only mode and remove the standby
         entry from the catalog;
      6. restart the master and all segments, then delete the setting
         added in step 4.

    Calls ``sys.exit(1)`` when no usable standby is configured or the old
    cluster cannot be stopped.  Relies on the module globals ``logger`` and
    ``source_hawq_env``.
    """
    old_master_host_name = hawq_dict['hawq_master_address_host']
    hawq_master_directory = hawq_dict['hawq_master_directory']
    if 'hawq_standby_address_host' in hawq_dict:
        standby_host = hawq_dict['hawq_standby_address_host']
    else:
        standby_host = ''
    if standby_host.lower() in ['none', '', 'localhost']:
        logger.error("No valid standby host name found, skip activate standby")
        sys.exit(1)
    # The standby host becomes the new master host.
    old_standby_host_name = standby_host
    new_master_host_name = standby_host
    logger.info("Starting to activate standby master '%s'" % old_standby_host_name)

    # Try to stop hawq cluster before doing standby activate.
    if check_postgres_running(hawq_master_directory, '', old_master_host_name, logger):
        logger.info("Try to stop hawq master before activate standby")
        cmd = "%s; hawq_ctl stop master -a -M fast -q;" % source_hawq_env
        result = remote_ssh(cmd, old_master_host_name, '')
        if result != 0:
            logger.error("Stop master failed, try again with immediate mode")
            cmd = "%s; hawq_ctl stop master -a -M immediate -q;" % source_hawq_env
            return_result = remote_ssh(cmd, old_master_host_name, '')
            if return_result != 0:
                logger.error("Stop master failed, abort")
                logger.error("Please manually bring hawq cluster down, then do activate standby again")
                sys.exit(1)

        logger.info("Master is stopped")
    else:
        logger.info("HAWQ master is not running, skip")

    ignore_bad_hosts = '--ignore-bad-hosts' if opts.ignore_bad_hosts else ''
    logger.info("Stopping all the running segments")
    cmd = "%s; hawq_ctl stop allsegments -a -M fast -q %s;" % (source_hawq_env, ignore_bad_hosts)
    result = remote_ssh(cmd, old_standby_host_name, '')
    if result != 0:
        logger.error("Stop segments failed, abort")
        logger.error("Please manually bring hawq cluster down, then do activate standby again")
        sys.exit(1)

    logger.info("Stopping running standby")
    if check_syncmaster_running(hawq_master_directory, '', old_standby_host_name, logger):
        cmd = "%s; hawq_ctl stop standby -a -M fast -q;" % source_hawq_env
        result = remote_ssh(cmd, old_standby_host_name, '')
        if result != 0:
            logger.error("Stop standby failed, abort")
            logger.error("Please manually bring hawq cluster down, then do activate standby again")
            sys.exit(1)
    else:
        logger.info("Standby master is not running, skip")

    # Set current standby host name as the new master host name in configuration.
    logger.info("Update master host name in hawq-site.xml")
    cmd = "%s; hawqconfig -c hawq_master_address_host -v %s --skipvalidation -q %s" % (source_hawq_env, standby_host, ignore_bad_hosts)
    check_return_code(remote_ssh(cmd, old_standby_host_name, ''), logger, "Set hawq_master_address_host failed")

    # Remove the old standby host configuration from hawq-site.xml.
    logger.info("Remove current standby from hawq-site.xml")
    cmd = "%s; hawqconfig -r hawq_standby_address_host --skipvalidation -q %s" % (source_hawq_env, ignore_bad_hosts)
    check_return_code(remote_ssh(cmd, old_standby_host_name, ''), logger, "Remove hawq_standby_address_host from configuration failed")

    # Temporarily enable the setting; it is deleted again with sed at the end.
    cmd = '''echo "gp_persistent_repair_global_sequence = true" >> %s/%s''' % (hawq_dict['hawq_master_directory'], 'postgresql.conf')
    check_return_code(remote_ssh(cmd, old_standby_host_name, ''), logger, "Set gp_persistent_repair_global_sequence = true failed")

    # Start the new master in master only mode.
    logger.info("Start master in master only mode")
    cmd = "%s; hawq_ctl start master --masteronly -q" % source_hawq_env
    check_return_code(remote_ssh(cmd, new_master_host_name, ''), logger, "Start master in master only mode failed")

    # Remove the old standby information in database.  The query only acts
    # when exactly one standby row (role='s') exists.
    logger.info("Remove current standby from catalog")
    cmd = "%s; env PGOPTIONS=\\\"-c gp_session_role=utility\\\" psql -p %s -d template1 -o /dev/null -c \\\"select gp_remove_master_standby()\
            where (select count(*) from gp_segment_configuration where role='s') = 1;\\\"" % (source_hawq_env, hawq_dict['hawq_master_address_port'])
    result = remote_ssh(cmd, new_master_host_name, '')
    if result != 0:
        # Best effort, as before: report the failure instead of silently
        # discarding it, but continue with the activation.
        logger.warning("Remove standby from catalog failed with return code %s, continuing" % result)

    # Try to restart hawq cluster.
    logger.info("Stop hawq master")
    cmd = "%s; hawq_ctl stop master -a -M fast -q" % source_hawq_env
    check_return_code(remote_ssh(cmd, new_master_host_name, ''), logger, "Stop master failed")
    logger.info("Start hawq cluster")
    cmd = "%s; hawq_ctl start master" % source_hawq_env
    check_return_code(remote_ssh(cmd, new_master_host_name, ''), logger, "Start master failed")
    cmd = "%s; hawq_ctl start allsegments %s" % (source_hawq_env, ignore_bad_hosts)
    check_return_code(remote_ssh(cmd, new_master_host_name, ''), logger, "Start all the segments failed")
    # Pass logger and message like every other check in this function, so a
    # failure here is reported with context.
    cmd = '''sed -i "/gp_persistent_repair_global_sequence/d" %s/%s''' % (hawq_dict['hawq_master_directory'], 'postgresql.conf')
    check_return_code(remote_ssh(cmd, new_master_host_name, ''), logger,
                      "Remove gp_persistent_repair_global_sequence from postgresql.conf failed")
    logger.info("HAWQ activate standby successfully")
    return None


def restart_hawq(opts, hawq_dict):
    """Restart HAWQ: run a full stop, then a full start, with the same options."""
    logger.info("Restarting hawq:")
    for step in (stop_hawq, start_hawq):
        step(opts, hawq_dict)


def create_parser():
    """Build the hawq_ctl option parser and parse ``sys.argv``.

    If no positional argument is given, the help text is printed and the
    process exits with status 1.

    Returns:
        (options, args) exactly as produced by ``OptionParser.parse_args``.
    """
    parser = OptionParser(usage="HAWQ management scripts options")
    opt = parser.add_option

    # General execution behaviour.
    opt("-a", "--prompt", action="store_false", dest="prompt", default=True,
        help="Execute automatically")
    opt("-M", "--mode", choices=['smart', 'immediate', 'fast'], dest="stop_mode",
        default="smart", help="HAWQ stop mode: smart/fast/immediate")
    opt("-v", "--verbose", action="store_true", dest="verbose",
        help="Execute with verbose output")
    opt("-q", "--quiet", action="store_true", dest="quiet_run",
        help="Execute in quiet mode")
    opt("-l", "--logdir", dest="log_dir",
        help="Sets the directory for log files")
    opt("-t", "--timeout", dest="timeout_seconds", default="600",
        help="Set the timeout seconds, default is 600")
    opt("--user", dest="user", default="",
        help="Sets HAWQ user")

    # Start/stop modes.
    opt("-u", "--reload", dest="hawq_reload", action="store_true", default=False,
        help="Reload hawq configuration")
    opt("-m", "--masteronly", dest="masteronly", action="store_true", default=False,
        help="Start hawq in utility mode")
    opt("-U", "--special-mode", choices=['upgrade', 'maintenance'], dest="special_mode",
        help="Start hawq in upgrade/maintenance mode")
    opt("-R", "--restrict", dest="restrict", action="store_true", default=False,
        help="Start hawq in restrict mode")

    # Standby management and host handling.
    opt('-r', '--remove-standby', action='store_true', dest='remove_standby', default=False,
        help='Delete hawq standby master node.')
    opt('-s', '--standby-host', type='string', dest='new_standby_hostname', default='none',
        help='Hostname of system to create standby master on')
    opt('-n', '--no-update', action='store_true', dest='no_update', default=False,
        help='Do not update system catalog tables.')
    opt('-i', '--ignore-bad-hosts', dest='ignore_bad_hosts', action='store_true', default=False,
        help='Skips syncing configuration files on hosts on which SSH fails')

    # Settings used when initialising a cluster.
    opt("--bucket_number", type="int", dest="default_hash_table_bucket_number",
        help="Sets maximum number of virtual segments per node")
    opt("--tde_keyname", dest="tde_keyname", default="",
        help="Sets the encryption zone key(EZK) name for the hawq directory(hawq_dfs_url)")

    # All locale options share the same default; registered in this order.
    locale_options = (
        ("--locale", "hawq_locale", "Sets the locale name"),
        ("--lc-collate", "hawq_lc_collate", "Sets the string sort order"),
        ("--lc-ctype", "hawq_lc_ctype", "Sets character classification"),
        ("--lc-messages", "hawq_lc_messages", "Sets the language in which messages are displayed"),
        ("--lc-monetary", "hawq_lc_monetary", "Sets the locale to use for formatting monetary amounts"),
        ("--lc-numeric", "hawq_lc_numeric", "Sets the locale to use for formatting numbers"),
        ("--lc-time", "hawq_lc_time", "Sets the locale to use for formatting dates and times"),
    )
    for flag, destination, description in locale_options:
        opt(flag, dest=destination, default="en_US.utf8", help=description)

    opt("--max_connections", dest="max_connections", default="1280",
        help="Sets the max_connections for formatting hawq database")
    opt("--shared_buffers", dest="shared_buffers", default="128000kB",
        help="Sets the shared_buffers for formatting hawq database")
    opt("--with_magma", action='store_true', dest="with_magma", default=False,
        help="Start hawq cluster with magma")

    options, args = parser.parse_args()
    if not args:
        parser.print_help()
        sys.exit(1)

    return options, args

def local_run(cmd):
    '''Run *cmd* through the local shell, wait for it, and return its exit status.'''
    proc = subprocess.Popen(cmd, shell=True)
    exit_status = proc.wait()
    return exit_status

if __name__ == '__main__':
    (opts, hawq_dict) = get_args()

    # Magma integration is only supported for whole-cluster operations.
    if opts.with_magma and opts.node_type != 'cluster':
        logger.error("option --with_magma is only compatible with cluster")
        sys.exit(1)

    # Shell prefix that loads the HAWQ environment before each local command.
    source_hawq_env = ". %s/greenplum_path.sh" % opts.GPHOME

    def _confirm_or_exit(question):
        """Ask *question* unless -a was given; exit with status 1 on 'no'."""
        if opts.prompt and not userinput.ask_yesno(None, question, 'N'):
            sys.exit(1)

    def _run_magma(magma_action, failure_msg):
        """Run ``magma <magma_action> cluster`` locally when --with_magma is set."""
        if opts.with_magma:
            magma_cmd = "%s; magma %s %s" % (source_hawq_env, magma_action, 'cluster')
            check_return_code(local_run(magma_cmd), logger, failure_msg)

    action = opts.hawq_command
    if action == 'start':
        start_hawq(opts, hawq_dict)
        _run_magma('start', "hawq start with magma failed")
    elif action == 'stop':
        _confirm_or_exit("\nContinue with HAWQ service stop")
        stop_hawq(opts, hawq_dict)
        _run_magma('stop', "hawq stop with magma failed")
    elif action == 'restart':
        restart_hawq(opts, hawq_dict)
        _run_magma('restart', "hawq restart with magma failed")
    elif action == 'init':
        if opts.node_type == 'standby':
            init_question = "\nMaster will get restarted, continue with standby init"
        else:
            init_question = "\nContinue with HAWQ init"
        _confirm_or_exit(init_question)
        hawq_init(opts, hawq_dict)
        _run_magma('start', "hawq init with magma failed")
    elif action == 'activate':
        _confirm_or_exit("\nContinue with HAWQ standby master activate")
        hawq_activate_standby(opts, hawq_dict)
    else:
        print(COMMON_HELP)

