| #!/usr/bin/env python3 |
| # |
| # Copyright (c) Greenplum Inc 2008. All Rights Reserved. |
| # |
| # |
| # THIS IMPORT MUST COME FIRST |
| # |
| # import mainUtils FIRST to get python version check |
| from gppylib.mainUtils import * |
| |
| import os, sys |
| import subprocess |
| import signal |
| import time |
| import pprint |
| from optparse import Option, OptionGroup, OptionParser, OptionValueError, SUPPRESS_USAGE |
| import socket |
| from gppylib.mainUtils import parseStatusLine |
| |
| try: |
| from gppylib.gpparseopts import OptParser, OptChecker |
| from gppylib.gplog import * |
| from gppylib.db import dbconn |
| from gppylib.db import catalog |
| from gppylib.gparray import * |
| from gppylib import userinput |
| from gppylib import pgconf |
| from gppylib.commands import unix |
| from gppylib.commands import gp |
| from gppylib.commands.gp import SEGMENT_STOP_TIMEOUT_DEFAULT, SegmentStop, GpSegStopCmd |
| from gppylib.commands import base |
| from gppylib.commands import pg |
| from gppylib.commands import dca |
| from gppylib.utils import TableLogger |
| from gppylib.gp_era import GpEraFile |
| from gppylib.operations.utils import ParallelOperation, RemoteOperation |
| from gppylib.operations.rebalanceSegments import ReconfigDetectionSQLQueryCommand |
| from gppylib.operations.detect_unreachable_hosts import get_unreachable_segment_hosts |
| except ImportError as e: |
| sys.exit('ERROR: Cannot import modules. Please check that you have sourced cloudberry-env.sh. Detail: ' + str(e)) |
| |
| DEFAULT_NUM_WORKERS = 64 |
| logger = get_default_logger() |
| |
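| # pg_ctl status exit codes: 0 means the server is running, 3 means no server is running |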
| PG_CTL_STATUS_RUNNING = 0 |
| PG_CTL_STATUS_STOPPED = 3 |
| PG_CTL_STATUS_SLEEP_INTERVAL = 1.0 # number of seconds to sleep before issuing next pg_ctl status |
| |
| |
| # --------------------------------------------------------------- |
| class SegStopStatus: |
| """Tracks result of trying to stop an individual segment database""" |
| |
| def __init__(self, db, stopped=False, reason=None, failedCmd=None, timedOut=False): |
| self.db = db |
| self.stopped = stopped |
| self.reason = reason |
| self.failedCmd = failedCmd |
| self.timedOut = timedOut |
| |
| def __str__(self): |
| if self.stopped: |
| return "DBID:%d STOPPED" % self.db.dbid |
| else: |
| return "DBID:%d FAILED host:'%s' datadir:'%s' with reason:'%s'" % ( |
| self.db.dbid, self.db.hostname, self.db.datadir, self.reason) |
| |
| |
| def print_progress(pool, interval=10): |
| """ |
| Waits for a WorkerPool to complete, printing a progress percentage marker |
| once at the beginning of the call, and thereafter at the provided interval |
| (default ten seconds). A final 100% marker is printed upon completion. |
| """ |
| def print_completed_percentage(): |
| # pool.completed can change asynchronously; save its value. |
| completed = pool.completed |
| |
| pct = 0 |
| if pool.assigned: |
| pct = float(completed) / pool.assigned |
| |
| pool.logger.info('%0.2f%% of jobs completed' % (pct * 100)) |
| return completed >= pool.assigned |
| |
| # print_completed_percentage() returns True if we're done. |
| while not print_completed_percentage(): |
| pool.join(interval) |
| |
| |
| # --------------------------------------------------------------- |
| class GpStop: |
| ###### |
| def __init__(self, mode, coordinator_datadir=None, |
| parallel=DEFAULT_NUM_WORKERS, quiet=False, coordinatoronly=False, sighup=False, |
| interactive=False, stopstandby=False, restart=False, |
| timeout=SEGMENT_STOP_TIMEOUT_DEFAULT, logfileDirectory=False, onlyThisHost=None, |
| fts_hosts=None, etcd_hosts=None, is_external_fts=False): |
| self.mode = mode |
| self.coordinator_datadir = coordinator_datadir |
| self.pool = None |
| self.parallel = parallel |
| self.quiet = quiet |
| self.pid = 0 |
| self.coordinatoronly = coordinatoronly |
| self.sighup = sighup |
| self.interactive = interactive |
| self.stopstandby = stopstandby |
| self.restart = restart |
| self.hadFailures = False |
| self.timeout = timeout |
| self.logfileDirectory = logfileDirectory |
| self.onlyThisHost = onlyThisHost |
| self.fts_hosts = fts_hosts |
| self.etcd_hosts = etcd_hosts |
| # some variables that will be assigned during run() |
| self.gphome = None |
| self.port = None |
| self.dburl = None |
| self.conn = None |
| self.gparray = None |
| self.gpversion = None |
| self.is_external_fts = is_external_fts |
| |
| logger.debug("Setting level of parallelism to: %d" % self.parallel) |
| pass |
| |
| |
| ##### |
| def _find_any_primary_and_mirror_on_same_host(self, segs): |
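| """Return the (primary, mirror) pair for the first content id that appears twice in segs, i.e. both |
| members of a mirror pair are present; otherwise return (None, None).""" |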
| map_content_id_to_segment = {} |
| for seg in segs: |
| seg_content_id = seg.getSegmentContentId() |
| if seg_content_id in map_content_id_to_segment: |
| if seg.isSegmentPrimary(True): |
| return seg, map_content_id_to_segment[seg_content_id] |
| else: |
| return map_content_id_to_segment[seg_content_id], seg |
| else: |
| map_content_id_to_segment[seg_content_id] = seg |
| |
| return None, None |
| |
| def _filter_segments_for_single_host_stop(self): |
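| """Return the up segments on self.onlyThisHost, refusing to proceed if the host holds both members of any |
| mirror pair or if any of its segments is not synchronized, since stopping them could leave data unavailable.""" |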
| host_to_segments_mapping = self.gparray.getSegmentsByHostName(self.gparray.getSegDbList()) |
| segments_on_host = host_to_segments_mapping[self.onlyThisHost] |
| up_segments_on_host = [seg for seg in segments_on_host if seg.isSegmentUp()] |
| matching_primary, matching_mirror = self._find_any_primary_and_mirror_on_same_host(up_segments_on_host) |
| if matching_primary or matching_mirror: |
| raise Exception("Segment host '%s' has both a primary '%s' and its corresponding mirror '%s'. Aborting." |
| % (matching_primary.getSegmentHostName(), matching_primary, matching_mirror)) |
| |
| for seg in up_segments_on_host: |
| if seg.isSegmentModeSynchronized(): |
| continue |
| raise Exception("Segment '%s' not synchronized. Aborting." % seg) |
| |
| return up_segments_on_host |
| |
| |
| def run(self): |
| """ |
| Run and return the exitCode for the program |
| """ |
| self._prepare() |
| |
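| # A cluster with no segments is treated as a coordinator-only shutdown |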
| if self.gparray.get_segment_count() == 0: |
| self.coordinatoronly = True |
| |
| if self.coordinatoronly: |
| if self.sighup: |
| return self._sighup_cluster() |
| if self.interactive: |
| if not userinput.ask_yesno(None, "\nContinue with coordinator-only shutdown", 'N'): |
| raise UserAbortedException() |
| try: |
| # Disable Ctrl-C |
| signal.signal(signal.SIGINT, signal.SIG_IGN) |
| |
| self._stop_coordinator() |
| finally: |
| # Reenable Ctrl-C |
| self.cleanup() |
| signal.signal(signal.SIGINT, signal.default_int_handler) |
| if self.is_external_fts: |
| self._clean_fts() |
| if self.etcd_hosts is not None: |
| self._clean_etcd() |
| if self.restart: |
| logger.info("Restarting System...") |
| gp.NewGpStart.local('restarting system', verbose=logging_is_verbose(), coordinatorOnly=self.coordinatoronly, |
| nostandby=not self.stopstandby, coordinatorDirectory=self.coordinator_datadir) |
| else: |
| if self.sighup: |
| return self._sighup_cluster() |
| else: |
| |
| segs = [] |
| if self.onlyThisHost: |
| if self.onlyThisHost in self.gparray.get_coordinator_host_names(): |
| raise Exception("Specified host '%s' has the coordinator or standby coordinator on it. This node can only be stopped as part of a full-cluster gpstop, without '--host'." % |
| self.onlyThisHost) |
| if not self.gparray.hasMirrors: |
| raise Exception("Cannot perform host-specific gpstop on a cluster without segment mirroring.") |
| segs = self._filter_segments_for_single_host_stop() |
| else: |
| segs = self.gparray.getSegDbList() |
| |
| if self.interactive: |
| self._summarize_actions(segs) |
| if not userinput.ask_yesno(None, "\nContinue with Cloudberry instance shutdown", 'N'): |
| raise UserAbortedException() |
| |
| try: |
| # Disable Ctrl-C |
| signal.signal(signal.SIGINT, signal.SIG_IGN) |
| |
| if self.onlyThisHost is None: |
| self._stop_coordinator() |
| self._stop_standby() |
| self._stop_segments(segs) |
| finally: |
| # Reenable Ctrl-C |
| self.cleanup() |
| signal.signal(signal.SIGINT, signal.default_int_handler) |
| |
| if self.onlyThisHost: |
| logger.info("Recognizing new cluster state...") |
| try: |
| # currently responsible for triggering an update to gp_segment_configuration |
| # because dbconn.connect() internally calls commit() |
| self.conn = dbconn.connect(self.dburl) |
| |
| # backup in case connect() does not do a commit |
| ReconfigDetectionSQLQueryCommand(self.conn).run() |
| except Exception as e: |
| logger.debug('query trying to start a transaction failed: %s' % str(e)) |
| logger.debug('expected: the purpose was that by attempting a transaction, gp_segment_configuration would be updated') |
| finally: |
| if self.conn: |
| self.conn.close() |
| if self.is_external_fts: |
| self._clean_fts() |
| if self.etcd_hosts is not None: |
| self._clean_etcd() |
| |
| if self.restart: |
| logger.info("Restarting System...") |
| gp.NewGpStart.local('restarting system', verbose=logging_is_verbose(), |
| nostandby=not self.stopstandby, coordinatorDirectory=self.coordinator_datadir) |
| else: |
| if dca.is_dca_appliance(): |
| logger.info("Unregistering with DCA") |
| dca.DcaGpdbStopped.local() |
| logger.info("Unregistered with DCA") |
| |
| if self.hadFailures: |
| # MPP-15208 |
| return 2 |
| return 0 |
| |
| def _clean_fts(self): |
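| """Kill the external FTS processes on every host listed in the fts_hosts file and verify they are gone.""" |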
| if not os.path.exists(self.fts_hosts): |
| raise gp.GpError("FTS host file %s does not exist." % self.fts_hosts) |
| fts_host_machine_list = read_hosts(self.fts_hosts) |
| logger.info("Start to kill FTS processes...") |
| for fts in fts_host_machine_list: |
| kill_fts(fts) |
| time.sleep(5) |
| for fts in fts_host_machine_list: |
| if check_fts(fts): |
| raise gp.GpError("FTS process on %s is still alive" % fts) |
| logger.info("kill all FTS processes successfully.") |
| |
| |
| def _clean_etcd(self): |
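| """Kill the ETCD processes on every host listed in the etcd_hosts file and verify they are gone.""" |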
| if not os.path.exists(self.etcd_hosts): |
| raise gp.GpError("ETCD host file %s does not exist." % self.etcd_hosts) |
| etcd_host_machine_list = read_hosts(self.etcd_hosts) |
| logger.info("Start to kill ETCD processes...") |
| for etcd in etcd_host_machine_list: |
| kill_etcd(etcd) |
| |
| time.sleep(60) |
| for etcd in etcd_host_machine_list: |
| if check_etcd(etcd): |
| raise gp.GpError("ETCD process on %s is still alive" % etcd) |
| logger.info("kill all ETCD processes successfully.") |
| |
| |
| ###### |
| def cleanup(self): |
| if self.pool: |
| self.pool.haltWork() |
| |
| ###### |
| |
| def _prepare(self): |
| logger.info("Gathering information and validating the environment...") |
| self.gphome = gp.get_gphome() |
| if self.coordinator_datadir is None: |
| self.coordinator_datadir = gp.get_coordinatordatadir() |
| self.user = unix.getUserName() |
| gp.check_permissions(self.user) |
| self._read_postgresqlconf() |
| self._check_db_running() |
| self._build_gparray() |
| if self.onlyThisHost: |
| self._is_hostname_valid() |
| self._check_version() |
| |
| ###### |
| def _is_hostname_valid(self): |
| segments = self.gparray.getSegmentsByHostName(self.gparray.getDbList()) |
| host_names = list(segments.keys()) |
| if self.onlyThisHost not in host_names: |
| logger.error("host '%s' is not found in gp_segment_configuration" % self.onlyThisHost) |
| logger.error("hosts in cluster config: %s" % host_names) |
| raise SystemExit(1) |
| |
| ###### |
| def _check_version(self): |
| self.gpversion = gp.GpVersion.local('local Cloudberry version check', self.gphome) |
| logger.info("Cloudberry Version: '%s'" % self.gpversion) |
| |
| ###### |
| def _read_postgresqlconf(self): |
| logger.debug("Obtaining coordinator's port from coordinator data directory") |
| pgconf_dict = pgconf.readfile(self.coordinator_datadir + "/postgresql.conf") |
| self.port = pgconf_dict.int('port') |
| logger.debug("Read from postgresql.conf port=%s" % self.port) |
| |
| |
| ###### |
| |
| def _check_db_running(self): |
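| """Verify the coordinator postmaster is running. If only a stale postmaster.pid remains, remove the pid and |
| /tmp lock files and switch the shutdown mode to 'fast'; raise if no postmaster.pid exists at all.""" |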
| if os.path.exists(self.coordinator_datadir + '/postmaster.pid'): |
| self.pid = gp.read_postmaster_pidfile(self.coordinator_datadir) |
| if not unix.check_pid(self.pid): |
| logger.warning("Have a postmaster.pid file but no Coordinator segment process running") |
| logger.info("Clearing postmaster.pid file and /tmp lock files") |
| |
| lockfile = "/tmp/.s.PGSQL.%s" % self.port |
| logger.info("Clearing Coordinator instance lock files") |
| os.remove(lockfile) |
| |
| logger.info("Clearing Coordinator instance pid file") |
| os.remove("%s/postmaster.pid" % self.coordinator_datadir) |
| |
| logger.info("Setting recovery parameters") |
| self.mode = 'fast' |
| logger.info("Commencing forced shutdown") |
| pass |
| else: |
| raise ExceptionNoStackTraceNeeded( |
| 'postmaster.pid file does not exist. Is the Cloudberry instance already stopped?') |
| |
| ###### |
| def _build_gparray(self): |
| logger.info("Obtaining Cloudberry Coordinator catalog information") |
| |
| logger.info("Obtaining Segment details from coordinator...") |
| self.dburl = dbconn.DbURL(port=self.port, dbname='template1') |
| self.gparray = GpArray.initFromCatalog(self.dburl, utility=True) |
| |
| class _SigIntHandler(object): |
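| """Context manager that temporarily installs a SIGINT handler which records Ctrl-C in self.interrupted |
| instead of raising KeyboardInterrupt, so the smart-shutdown wait loop can offer an escalation prompt.""" |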
| def __init__(self): |
| self.interrupted = False |
| |
| def _handler(self, signum, stack_frame): |
| self.interrupted = True |
| |
| def enable(self): |
| self.interrupted = False |
| signal.signal(signal.SIGINT, self._handler) |
| |
| def disable(self): |
| signal.signal(signal.SIGINT, signal.SIG_IGN) |
| |
| def __enter__(self): |
| self._prev_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) |
| |
| # Try to prevent easy mistakes by ensuring that SIGINT was |
| # ignored before we entered the manager. |
| if self._prev_handler != signal.SIG_IGN: |
| signal.signal(signal.SIGINT, self._prev_handler) |
| raise Exception("SIGINT disposition must be SIG_IGN before entering SigIntHandler") |
| return self |
| |
| def __exit__(self, exc_type, exc_value, traceback): |
| # Put back the previous handler. |
| signal.signal(signal.SIGINT, self._prev_handler) |
| return False |
| |
| |
| def _is_coordinator_stopped(self): |
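| """Return True if pg_ctl reports the coordinator as stopped, False if it is running; raise on any other status.""" |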
| with open(os.devnull, 'w') as devnull: |
| ret = subprocess.call(['pg_ctl', 'status', '-D', self.coordinator_datadir], |
| stdout=devnull) |
| if ret == PG_CTL_STATUS_STOPPED: |
| return True |
| if ret != PG_CTL_STATUS_RUNNING: |
| raise Exception('pg_ctl status failed with return code %d' % ret) |
| return False |
| |
| |
| |
| # Send a no-wait stop to the coordinator, and then poll for it to stop. If the user gets impatient or |
| # the operation times out, show the connections and allow the user to upgrade the stop to fast or |
| # immediate mode. |
| def _stop_coordinator_smart(self): |
| logger.info("Stopping coordinator segment and waiting for user connections to finish ...") |
| |
| self.conn = dbconn.connect(self.dburl, utility=True) |
| user_connections_header, user_connections = catalog.getUserConnectionInfo(self.conn) |
| self.conn.close() |
| |
| ret = subprocess.call(['pg_ctl', 'stop', '-W', '-m', 'smart', '-D', self.coordinator_datadir]) |
| if ret: |
| raise Exception('pg_ctl stop smart failed') |
| |
| time_waited = 0 |
| wait_forever = False |
| with GpStop._SigIntHandler() as handler: |
| while not self._is_coordinator_stopped(): |
| # time.sleep() can be safely interrupted (https://docs.python.org/2/library/time.html#time.sleep) |
| handler.enable() |
| time.sleep(PG_CTL_STATUS_SLEEP_INTERVAL) |
| time_waited = time_waited + PG_CTL_STATUS_SLEEP_INTERVAL |
| handler.disable() |
| |
| if handler.interrupted or (time_waited >= self.timeout and not wait_forever): |
| handler.interrupted = False |
| |
| warn_text = """Smart mode failed to shutdown the coordinator.\n""" |
| if len(user_connections): |
| warn_text += """ There were {} user connections at the start of the shutdown. Note: These connections may be outdated. |
| The following connections were found before shutting down:\n""".format(len(user_connections)) |
| for user_conn in user_connections: |
| warn_text += " %s" % pprint.pformat(user_conn) |
| |
| logger.warning(warn_text) |
| logger.info("Continue waiting in smart mode, stop in fast mode, or stop in immediate mode.") |
| mode_selection = userinput.ask_string("Your_choice", |
| "['(s)mart_mode', '(f)ast_mode', '(i)mmediate_mode']", |
| 's', |
| ['s', 'f', 'i']) |
| logger.info("Your choice was '%s'" % mode_selection) |
| if mode_selection == 'f': |
| self.mode = 'fast' |
| return False |
| elif mode_selection == 'i': |
| self.mode = 'immediate' |
| return False |
| else: |
| wait_forever = True |
| logger.info("Continuing waiting for user connections to finish ...") |
| |
| |
| ###### |
| # NOTE: we must make sure the coordinator is down before leaving this routine |
| def _stop_coordinator(self, coordinatorOnly=False): |
| '''Shuts down the coordinator.''' |
| |
| logger.info("Commencing Coordinator instance shutdown with mode='%s'" % self.mode) |
| logger.info("Coordinator segment instance directory=%s" % self.coordinator_datadir) |
| |
| e = GpEraFile(self.coordinator_datadir, logger=get_logger_if_verbose()) |
| e.end_era() |
| |
| # Get a list of postgres processes running before stopping the server |
| postgres_pids = gp.get_postgres_segment_processes(self.coordinator_datadir, self.gparray.coordinator.hostname) |
| logger.debug("Postgres processes running on coordinator host: {0}".format(postgres_pids)) |
| |
| try: |
| if self.mode == 'smart': |
| self._stop_coordinator_smart() |
| |
| # NOTE: _stop_coordinator_smart() can change self.mode |
| if self.mode != 'smart': |
| cmd = gp.CoordinatorStop("stopping coordinator", self.coordinator_datadir, mode=self.mode, timeout=self.timeout) |
| cmd.run(validateAfter=True) |
| except: |
| # Didn't stop in timeout or pg_ctl failed. So try kill |
| (succeeded, mypid, file_datadir) = pg.ReadPostmasterTempFile.local("Read coordinator tmp file", |
| self.dburl.pgport).getResults() |
| if succeeded and file_datadir == self.coordinator_datadir: |
| if unix.check_pid(mypid): |
| logger.info("Failed to shutdown coordinator with pg_ctl.") |
| logger.info("Sending SIGQUIT signal...") |
| os.kill(mypid, signal.SIGQUIT) |
| time.sleep(5) |
| |
| # Still not gone... try SIGABRT |
| if unix.check_pid(mypid): |
| logger.info("Sending SIGABRT signal...") |
| os.kill(mypid, signal.SIGABRT) |
| time.sleep(5) |
| |
| if not unix.check_pid(mypid): |
| # Clean up files |
| lockfile = "/tmp/.s.PGSQL.%s" % self.dburl.pgport |
| if os.path.exists(lockfile): |
| logger.info("Clearing segment instance lock files") |
| os.remove(lockfile) |
| |
| # Use the process list and make sure that all the processes are killed at the end |
| logger.info('Attempting forceful termination of any leftover coordinator process') |
| unix.kill_9_segment_processes(self.coordinator_datadir, postgres_pids, self.gparray.coordinator.hostname) |
| |
| logger.debug("Successfully shutdown the Coordinator instance in admin mode") |
| |
| ###### |
| def _stop_standby(self): |
| """ assumes prepare() has been called """ |
| if not self.stopstandby: |
| return True |
| |
| if self.gparray.standbyCoordinator: |
| standby = self.gparray.standbyCoordinator |
| |
| if get_unreachable_segment_hosts([standby.hostname], 1): |
| logger.warning("Standby is unreachable, skipping shutdown on standby") |
| return True |
| |
| # Get a list of postgres processes running before stopping the server |
| postgres_pids = gp.get_postgres_segment_processes(standby.datadir, standby.hostname) |
| logger.debug("Postgres processes running on standby host: {0}".format(postgres_pids)) |
| |
| logger.info("Stopping coordinator standby host %s mode=%s" % (standby.hostname, self.mode)) |
| try: |
| cmd = SegmentStop("stopping coordinator standby", |
| standby.datadir, mode=self.mode, |
| timeout=self.timeout, |
| ctxt=base.REMOTE, |
| remoteHost=standby.hostname) |
| cmd.run(validateAfter=True) |
| except base.ExecutionError as e: |
| logger.warning("Error occured while stopping the standby coordinator: %s" % e) |
| |
| if not pg.DbStatus.remote('checking status of standby coordinator instance', standby, standby.hostname): |
| logger.info("Successfully shutdown standby process on %s" % standby.hostname) |
| return True |
| else: |
| logger.warning("Process coordinator standby still running, will issue fast shutdown with immediate") |
| try: |
| cmd = SegmentStop("stopping coordinator standby", standby.datadir, mode='immediate', |
| timeout=self.timeout, |
| ctxt=base.REMOTE, remoteHost=standby.hostname) |
| cmd.run(validateAfter=True) |
| except base.ExecutionError as e: |
| logger.warning("Error occured while stopping the standby coordinator: %s" % e) |
| |
| if not pg.DbStatus.remote('checking status of standby coordinator instance', standby, standby.hostname): |
| logger.info("Successfully shutdown coordinator standby process on %s" % standby.hostname) |
| return True |
| else: |
| logger.error('Failed to stop standby. Attempting forceful termination of standby process') |
| |
| # Use the process list and make sure that all the processes are killed at the end |
| unix.kill_9_segment_processes(standby.datadir, postgres_pids, standby.hostname) |
| |
| if not pg.DbStatus.remote('checking status of standby coordinator instance', standby, standby.hostname): |
| logger.info("Successfully shutdown coordinator standby process on %s" % standby.hostname) |
| return True |
| else: |
| logger.error("Unable to stop coordinator standby on host: %s" % standby.hostname) |
| return False |
| else: |
| logger.info("No standby coordinator host configured") |
| return True |
| |
| ###### |
| def _stop_segments(self, segs): |
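| """Stop the given segments in parallel. When mirrors are configured, primaries are stopped first and |
| mirrors second; otherwise all segments are stopped in a single pass.""" |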
| failed_seg_status = [] |
| workers = min(len(self.gparray.get_hostlist()), self.parallel) |
| self.pool = base.WorkerPool(numWorkers=workers, logger=logger) |
| |
| logger.info("Targeting dbid %s for shutdown" % [seg.getSegmentDbId() for seg in segs]) |
| |
| |
| if self.gparray.hasMirrors: |
| # stop primaries |
| logger.info("Commencing parallel primary segment instance shutdown, please wait...") |
| try: |
| self._stopseg_cmds(True, False, segs=segs) |
| finally: |
| self.pool.join() |
| primary_success_seg_status = self._process_segment_stop(failed_seg_status) |
| |
| # stop mirrors |
| logger.info("Commencing parallel mirror segment instance shutdown, please wait...") |
| try: |
| self._stopseg_cmds(False, True, segs=segs) |
| finally: |
| self.pool.join() |
| mirror_success_seg_status = self._process_segment_stop(failed_seg_status) |
| |
| success_seg_status = primary_success_seg_status + mirror_success_seg_status |
| self._print_segment_stop(segs, failed_seg_status, success_seg_status) |
| |
| else: |
| logger.info("Commencing parallel segment instance shutdown, please wait...") |
| # There are no active-mirrors |
| try: |
| self._stopseg_cmds(True, False, segs=segs) |
| finally: |
| self.pool.join() |
| success_seg_status = self._process_segment_stop(failed_seg_status) |
| |
| self._print_segment_stop(segs, failed_seg_status, success_seg_status) |
| pass |
| |
| ###### |
| def _stopseg_cmds(self, includePrimaries, includeMirrors, segs): |
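| """Group the segments by host and dispatch one GpSegStopCmd per host to the worker pool, limited to |
| primaries and/or mirrors as requested.""" |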
| host_segs_map = {} |
| for seg in segs: |
| if seg.getSegmentHostName() in list(host_segs_map.keys()): |
| host_segs_map[seg.getSegmentHostName()].append(seg) |
| else: |
| host_segs_map[seg.getSegmentHostName()] = [seg] |
| |
| for hostname, gpdb_objs in host_segs_map.items(): |
| dbs = [] |
| for db in gpdb_objs: |
| role = db.getSegmentRole() |
| if role == 'p' and includePrimaries: |
| dbs.append(db) |
| elif role != 'p' and includeMirrors: |
| dbs.append(db) |
| |
| # If we have no dbs then we have no segments of the type primary |
| # or mirror. This will occur when you have an entire host fail |
| # when using group mirroring. This is because all the mirror segs |
| # on the alive host will be marked primary (or vice-versa) |
| if len(dbs) == 0: |
| continue |
| |
| logger.debug("Dispatching command to shutdown %d segments on host: %s" % (len(dbs), hostname)) |
| cmd = GpSegStopCmd("remote segment starts on host '%s'" % hostname, self.gphome, self.gpversion, |
| mode=self.mode, dbs=dbs, timeout=self.timeout, |
| verbose=logging_is_verbose(), ctxt=base.REMOTE, remoteHost=hostname, |
| logfileDirectory=self.logfileDirectory) |
| self.pool.addCommand(cmd) |
| |
| print_progress(self.pool) |
| |
| ###### |
| def _process_segment_stop(self, failed_seg_status): |
| '''reviews results of gpsegstop commands ''' |
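| # Parses the STATUS lines from each gpsegstop command's stdout, appends failures to failed_seg_status |
| # (modified in place), and returns the list of successful SegStopStatus objects. |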
| success_seg_status = [] |
| seg_timed_out = False |
| cmds = self.pool.getCompletedItems() |
| |
| for cmd in cmds: |
| if cmd.get_results().rc == 0 or cmd.get_results().rc == 1: |
| cmdout = cmd.get_results().stdout |
| lines = cmdout.split('\n') |
| for line in lines: |
| if line.startswith("STATUS"): |
| reasonStr, started, dir = parseStatusLine(line, isStop=True)[1:] |
| |
| if started.lower() == 'false': |
| success = False |
| else: |
| success = True |
| |
| for db in cmd.dblist: |
| if db.datadir == dir: |
| if success: |
| success_seg_status.append( |
| SegStopStatus(db, stopped=True, reason=reasonStr, failedCmd=cmd)) |
| else: |
| # dbs that are marked invalid are 'skipped' but we dispatch to them |
| # anyway since we want to try to shut down any runaway pg processes. |
| failed_seg_status.append( |
| SegStopStatus(db, stopped=False, reason=reasonStr, failedCmd=cmd)) |
| |
| elif line.strip().startswith('stderr: pg_ctl: server does not shut down'): |
| # We are assuming that we know what segment failed beforehand. |
| if failed_seg_status: |
| failed_seg_status[-1].timedOut = True |
| else: |
| logger.debug("No failed segments to time out") |
| else: |
| for db in cmd.dblist: |
| # dbs that are marked invalid are 'skipped' but we dispatch to them |
| # anyway since we want to try to shut down any runaway pg processes. |
| if db.valid: |
| failed_seg_status.append( |
| SegStopStatus(db, stopped=False, reason=cmd.get_results(), failedCmd=cmd)) |
| |
| self.pool.empty_completed_items() |
| return success_seg_status |
| |
| |
| ###### |
| |
| def _print_segment_stop(self, segs, failed_seg_status, success_seg_status): |
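| """Log a per-segment summary of the stop results and set self.hadFailures accordingly.""" |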
| stopped = len(segs) - len(failed_seg_status) |
| failed = len([x for x in failed_seg_status if x.db.valid]) |
| invalid = self.gparray.get_invalid_segdbs() |
| total_segs = len(self.gparray.getSegDbList()) |
| timed_out = len([x for x in failed_seg_status if x.timedOut]) |
| |
| if failed > 0 or logging_is_verbose(): |
| logger.info("------------------------------------------------") |
| if logging_is_verbose(): |
| logger.info("Segment Stop Information") |
| else: |
| logger.info("Failed Segment Stop Information ") |
| |
| logger.info("------------------------------------------------") |
| if failed > 0: |
| for failure in failed_seg_status: |
| logger.info(failure) |
| if logging_is_verbose(): |
| for stat in success_seg_status: |
| logger.debug(stat) |
| |
| tabLog = TableLogger(logger=logger).setWarnWithArrows(True) |
| tabLog.addSeparator() |
| tabLog.info(["Segments stopped successfully", "= %d" % stopped]) |
| tabLog.infoOrWarn(failed > 0, ["Segments with errors during stop", "= %d" % failed]) |
| if invalid: |
| tabLog.info([]) |
| tabLog.warn(["Segments that are currently marked down in configuration", "= %d" % len(invalid)]) |
| tabLog.info([" (stop was still attempted on these segments)"]) |
| tabLog.addSeparator() |
| |
| tabLog.outputTable() |
| |
| flag = "" if failed == 0 else "<<<<<<<<" |
| logger.info("Successfully shutdown %d of %d segment instances %s" % (stopped, total_segs, flag)) |
| |
| if failed > 0: |
| self.hadFailures = True |
| logger.warning("------------------------------------------------") |
| logger.warning("Segment instance shutdown failures reported") |
| logger.warning("Failed to shutdown %d of %d segment instances <<<<<" % (failed, total_segs)) |
| if timed_out > 0: |
| logger.warning("%d segments did not complete their shutdown in the allowed" % timed_out) |
| logger.warning("timeout of %d seconds. These segments are still in the process" % self.timeout) |
| logger.warning("of shutting down. You will not be able to restart the database") |
| logger.warning("until all processes have terminated.") |
| logger.warning("A total of %d errors were encountered" % failed) |
| logger.warning("Review logfile %s" % get_logfile()) |
| logger.warning("For more details on segment shutdown failure(s)") |
| logger.warning("------------------------------------------------") |
| else: |
| self.hadFailures = False |
| logger.info("Database successfully shutdown with no errors reported") |
| pass |
| |
| ###### |
| def _sighup_cluster(self): |
| """ assumes prepare() has been called """ |
| workers = min(len(self.gparray.get_hostlist()), self.parallel) |
| |
| class SighupWorkerPool(base.WorkerPool): |
| """ |
| This pool knows all the commands are calls to pg_ctl. |
| The failed list collects segments without a running postmaster. |
| """ |
| |
| def __init__(self, numWorkers): |
| base.WorkerPool.__init__(self, numWorkers) |
| self.failed = [] |
| |
| def check_results(self): |
| while not self.completed_queue.empty(): |
| item = self.completed_queue.get(False) |
| results = item.get_results() |
| if results.wasSuccessful(): |
| continue |
| self.failed.append(item.db) |
| |
| self.pool = SighupWorkerPool(numWorkers=workers) |
| dbList = self.gparray.getDbList() |
| hostname = socket.gethostname() |
| logger.info("Signalling all postmaster processes to reload") |
| for db in dbList: |
| ctxt = base.REMOTE |
| remote_host = db.getSegmentHostName() |
| if db.getSegmentHostName() == hostname: |
| ctxt = base.LOCAL |
| remote_host = None |
| cmd = pg.ReloadDbConf(name="reload segment number " + str(db.getSegmentDbId()), |
| db=db, |
| ctxt=ctxt, |
| remoteHost=remote_host |
| ) |
| self.pool.addCommand(cmd) |
| |
| if self.quiet: |
| self.pool.join() |
| else: |
| base.join_and_indicate_progress(self.pool) |
| |
| self.pool.check_results() |
| self.pool.empty_completed_items() |
| |
| if len(self.pool.failed) < 1: |
| return 0 |
| |
| logger.info("--------------------------------------------") |
| logger.info("Some segment postmasters were not reloaded") |
| logger.info("--------------------------------------------") |
| tabLog = TableLogger().setWarnWithArrows(True) |
| tabLog.info(["Host", "Datadir", "Port", "Status"]) |
| for db in self.pool.failed: |
| tup = [db.getSegmentHostName(), db.getSegmentDataDirectory(), str(db.getSegmentPort()), |
| db.getSegmentStatus()] |
| tabLog.info(tup) |
| tabLog.outputTable() |
| logger.info("--------------------------------------------") |
| return 1 |
| |
| ###### |
| def _summarize_actions(self, segs): |
| logger.info("--------------------------------------------") |
| logger.info("Coordinator instance parameters") |
| logger.info("--------------------------------------------") |
| |
| tabLog = TableLogger(logger=logger).setWarnWithArrows(True) |
| tabLog.info(["Coordinator Cloudberry instance process active PID", "= %s" % self.pid]) |
| tabLog.info(["Database", "= %s" % self.dburl.pgdb]) |
| tabLog.info(["Coordinator port", "= %s" % self.port]) |
| tabLog.info(["Coordinator directory", "= %s" % self.coordinator_datadir]) |
| tabLog.info(["Shutdown mode", "= %s" % self.mode]) |
| tabLog.info(["Timeout", "= %s" % self.timeout]) |
| |
| standbyMsg = "On" if self.gparray.standbyCoordinator and self.stopstandby else "Off" |
| tabLog.info(["Shutdown Coordinator standby host", "= %s" % standbyMsg]) |
| |
| tabLog.outputTable() |
| |
| logger.info("--------------------------------------------") |
| logger.info("Segment instances that will be shutdown:") |
| logger.info("--------------------------------------------") |
| |
| tabLog = TableLogger(logger=logger).setWarnWithArrows(True) |
| tabLog.info(["Host", "Datadir", "Port", "Status"]) |
| |
| for db in segs: |
| tabLog.info([db.getSegmentHostName(), db.getSegmentDataDirectory(), |
| str(db.getSegmentPort()), db.getSegmentStatus()]) |
| tabLog.outputTable() |
| |
| # ----------------------- Command line option parser ---------------------- |
| @staticmethod |
| def createParser(): |
| parser = OptParser(option_class=OptChecker, |
| description="Stops a CBDB Array.", |
| version='%prog version $Revision$') |
| parser.setHelp([]) |
| |
| addStandardLoggingAndHelpOptions(parser, includeNonInteractiveOption=True, includeUsageOption=True) |
| |
| addTo = OptionGroup(parser, 'Connection options') |
| parser.add_option_group(addTo) |
| addCoordinatorDirectoryOptionForSingleClusterProgram(addTo) |
| |
| addTo = OptionGroup(parser, 'Instance shutdown options: ') |
| parser.add_option_group(addTo) |
| addTo.add_option('-f', '--fast', action='store_true', default=False, |
| help="<deprecated> Fast shutdown, active transactions interrupted and rolled back") |
| addTo.add_option('-i', '--immediate', action='store_true', default=False, |
| help="<deprecated> Immediate shutdown, active transactions aborted.") |
| addTo.add_option('-s', '--smart', action='store_true', |
| help="<deprecated> Smart shutdown, wait for active transactions to complete. [default]") |
| addTo.add_option('-z', '--force', action='store_true', default=False, |
| help="<deprecated> Force shutdown of segment instances marked as invalid. Kill postmaster PID, " \ |
| "delete /tmp lock files and remove segment instance postmaster.pid file.") |
| addTo.add_option('-M', '--mode', type='choice', choices=['fast', 'immediate', 'smart'], |
| metavar='fast|immediate|smart', action='store', default='smart', |
| help='set the method of shutdown') |
| |
| addTo.add_option('-r', '--restart', action='store_true', |
| help='Restart Apache Cloudberry instance after successful gpstop.') |
| addTo.add_option('-m', '--master_only', '-c', '--coordinator_only', dest='coordinator_only', action='store_true', |
| help='stop a coordinator instance that was started in maintenance mode') |
| addTo.add_option('-y', dest="stop_standby", action='store_false', default=True, |
| help='Do not stop the standby coordinator process.') |
| addTo.add_option('-u', dest="request_sighup", action='store_true', |
| help="upload new coordinator postgresql.conf settings, does not stop Cloudberry array," \ |
| "issues a signal to the coordinator segment postmaster process to reload") |
| |
| addTo.add_option('-B', '--parallel', type="int", default=DEFAULT_NUM_WORKERS, metavar="<parallel_processes>", |
| help='number of segment hosts to run in parallel. Default is %d' % DEFAULT_NUM_WORKERS) |
| addTo.add_option('-t', '--timeout', dest='timeout', default=SEGMENT_STOP_TIMEOUT_DEFAULT, type='int', |
| help='time to wait for segment stop (in seconds)') |
| addTo.add_option('--host', dest='only_this_host', type='string', |
| help='stop all segments on this host (this will only complete if failover segments are available). ' |
| 'Hostname as displayed in gp_segment_configuration.') |
| addTo.add_option('--skipvalidation', action='store_true', default=False, |
| help="<development testing only> DO NOT USE") |
| addTo.add_option('-F', dest='fts_hosts', type='string', default=None, |
| help='specify the file that contains all FTS hosts. If this argument is set, `gpstop` will attempt ' |
| 'to stop the FTS processes on the specified hosts; otherwise, `gpstop` will stop the FTS processes specified in ' |
| '$COORDINATOR_DATA_DIRECTORY/config/fts_host') |
| addTo.add_option('-E', dest='etcd_hosts', type='string', default=None, |
| help='specify the file that contains all ETCD hosts. If this argument is set, `gpstop` will attempt ' |
| 'to stop the ETCD processes on the specified hosts') |
| parser.set_defaults(verbose=False, filters=[], slice=(None, None)) |
| return parser |
| |
| @staticmethod |
| def createProgram(options, args): |
| logfileDirectory = options.ensure_value("logfileDirectory", False) |
| external_fts = is_external_fts() |
| |
| if not external_fts and (options.fts_hosts or options.etcd_hosts): |
| raise ProgramArgumentValidationException("Internal FTS does not support the -F and -E options") |
| if options.mode != 'smart': |
| if options.fast or options.immediate: |
| raise ProgramArgumentValidationException("Cannot mix the --mode option with the older deprecated '-f', '-i', '-s' options") |
| |
| if options.coordinator_only and options.only_this_host: |
| raise ProgramArgumentValidationException("Incompatible flags. Cannot mix '--host' option with stopping " |
| "coordinator-only.") |
| |
| if options.restart and options.only_this_host: |
| raise ProgramArgumentValidationException("Incompatible flags. Cannot mix '--host' option with '-r' for " |
| "restart.") |
| |
| if options.request_sighup and options.only_this_host: |
| raise ProgramArgumentValidationException("Incompatible flags. Cannot mix '--host' option with '-u' for " |
| "config reload.") |
| |
| if (not options.stop_standby) and options.only_this_host: |
| raise ProgramArgumentValidationException("Incompatible flags. Cannot mix '--host' option with '-y' for " |
| "skipping standby.") |
| |
| if options.fast: |
| options.mode = "fast" |
| if options.immediate: |
| options.mode = "immediate" |
| if options.smart: |
| options.mode = "smart" |
| |
| # The force option is deprecated: it no longer does a kill -9; |
| # just make it stop in fast mode instead. |
| if options.force: |
| options.mode = "fast" |
| |
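| # GP_MGMT_PROCESS_COUNT overrides the worker count when it is still at its default value |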
| proccount = os.environ.get('GP_MGMT_PROCESS_COUNT') |
| if options.parallel == DEFAULT_NUM_WORKERS and proccount is not None: |
| options.parallel = int(proccount) |
| |
| # -B (parallel) sanity check |
| if options.parallel > 128 or options.parallel < 1: |
| raise ProgramArgumentValidationException("Invalid value for parallel degree: %s" % options.parallel) |
| |
| # Don't allow them to go below default |
| if options.timeout < SEGMENT_STOP_TIMEOUT_DEFAULT and not options.skipvalidation: |
| raise ProgramArgumentValidationException( |
| "Invalid timeout value. Must be at least %s seconds." % SEGMENT_STOP_TIMEOUT_DEFAULT) |
| |
| if external_fts: |
| if options.fts_hosts is None: |
| coordinator_data_directory = gp.get_coordinatordatadir() |
| options.fts_hosts = coordinator_data_directory + '/config/fts_host' |
| |
| if args: |
| raise ProgramArgumentValidationException( |
| "Argument %s is invalid. Is an option missing a parameter?" % args[-1]) |
| |
| return GpStop(options.mode, |
| coordinator_datadir=options.coordinatorDataDirectory, |
| parallel=options.parallel, |
| quiet=options.quiet, |
| coordinatoronly=options.coordinator_only, |
| sighup=options.request_sighup, |
| interactive=options.interactive, |
| stopstandby=options.stop_standby, |
| restart=options.restart, |
| timeout=options.timeout, |
| logfileDirectory=logfileDirectory, |
| onlyThisHost=options.only_this_host, |
| fts_hosts=options.fts_hosts, |
| etcd_hosts=options.etcd_hosts, |
| is_external_fts=external_fts) |
| |
| if __name__ == '__main__': |
| simple_main(GpStop.createParser, GpStop.createProgram) |