#!/usr/bin/env python3
#
# Copyright (c) Greenplum Inc 2008. All Rights Reserved.
#
#
# THIS IMPORT MUST COME FIRST
#
# import mainUtils FIRST to get python version check
from gppylib.mainUtils import *
import os, sys
import subprocess  # used below for the pg_ctl status/stop calls
import signal
import time
import pprint
from optparse import Option, OptionGroup, OptionParser, OptionValueError, SUPPRESS_USAGE
import socket
from gppylib.mainUtils import parseStatusLine
try:
from gppylib.gpparseopts import OptParser, OptChecker
from gppylib.gplog import *
from gppylib.db import dbconn
from gppylib.db import catalog
from gppylib.gparray import *
from gppylib import userinput
from gppylib import pgconf
from gppylib.commands import unix
from gppylib.commands import gp
from gppylib.commands.gp import SEGMENT_STOP_TIMEOUT_DEFAULT, SegmentStop, GpSegStopCmd
from gppylib.commands import base
from gppylib.commands import pg
from gppylib.commands import dca
from gppylib.utils import TableLogger
from gppylib.gp_era import GpEraFile
from gppylib.operations.utils import ParallelOperation, RemoteOperation
from gppylib.operations.rebalanceSegments import ReconfigDetectionSQLQueryCommand
from gppylib.operations.detect_unreachable_hosts import get_unreachable_segment_hosts
except ImportError as e:
sys.exit('ERROR: Cannot import modules. Please check that you have sourced cloudberry-env.sh. Detail: ' + str(e))
DEFAULT_NUM_WORKERS = 64
logger = get_default_logger()
PG_CTL_STATUS_RUNNING = 0
PG_CTL_STATUS_STOPPED = 3
PG_CTL_STATUS_SLEEP_INTERVAL = 1.0 # number of seconds to sleep before issuing next pg_ctl status
# ---------------------------------------------------------------
class SegStopStatus:
"""Tracks result of trying to stop an individual segment database"""
def __init__(self, db, stopped=False, reason=None, failedCmd=None, timedOut=False):
self.db = db
self.stopped = stopped
self.reason = reason
self.failedCmd = failedCmd
self.timedOut = timedOut
def __str__(self):
if self.stopped:
return "DBID:%d STOPPED" % self.db.dbid
else:
return "DBID:%d FAILED host:'%s' datadir:'%s' with reason:'%s'" % (
self.db.dbid, self.db.hostname, self.db.datadir, self.reason)
def print_progress(pool, interval=10):
"""
Waits for a WorkerPool to complete, printing a progress percentage marker
once at the beginning of the call, and thereafter at the provided interval
(default ten seconds). A final 100% marker is printed upon completion.
"""
def print_completed_percentage():
# pool.completed can change asynchronously; save its value.
completed = pool.completed
pct = 0
if pool.assigned:
pct = float(completed) / pool.assigned
pool.logger.info('%0.2f%% of jobs completed' % (pct * 100))
return completed >= pool.assigned
# print_completed_percentage() returns True if we're done.
while not print_completed_percentage():
pool.join(interval)
# ---------------------------------------------------------------
class GpStop:
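    """
    Drives shutdown of a Cloudberry cluster: the coordinator, the optional
    standby coordinator, and all segment instances (stopped in parallel via a
    WorkerPool). Also handles config-reload-only invocations (-u),
    coordinator-only shutdown, and single-host shutdown (--host).
    """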
######
def __init__(self, mode, coordinator_datadir=None,
parallel=DEFAULT_NUM_WORKERS, quiet=False, coordinatoronly=False, sighup=False,
interactive=False, stopstandby=False, restart=False,
timeout=SEGMENT_STOP_TIMEOUT_DEFAULT, logfileDirectory=False, onlyThisHost=None,
fts_hosts=None, etcd_hosts=None, is_external_fts=False):
self.mode = mode
self.coordinator_datadir = coordinator_datadir
self.pool = None
self.parallel = parallel
self.quiet = quiet
self.pid = 0
self.coordinatoronly = coordinatoronly
self.sighup = sighup
self.interactive = interactive
self.stopstandby = stopstandby
self.restart = restart
self.hadFailures = False
self.timeout = timeout
self.logfileDirectory = logfileDirectory
self.onlyThisHost = onlyThisHost
self.fts_hosts = fts_hosts
self.etcd_hosts = etcd_hosts
# some variables that will be assigned during run()
self.gphome = None
self.port = None
self.dburl = None
self.conn = None
self.gparray = None
self.gpversion = None
self.is_external_fts=is_external_fts
logger.debug("Setting level of parallelism to: %d" % self.parallel)
#####
def _find_any_primary_and_mirror_on_same_host(self, segs):
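        """
        Scans the given segments (all on one host), mapping them by content ID.
        If two segments share a content ID (i.e. a primary and its mirror live
        on the same host), returns them as a (primary, mirror) tuple;
        otherwise returns (None, None).
        """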
map_content_id_to_segment = {}
for seg in segs:
seg_content_id = seg.getSegmentContentId()
if seg_content_id in map_content_id_to_segment:
if seg.isSegmentPrimary(True):
return seg, map_content_id_to_segment[seg_content_id]
else:
return map_content_id_to_segment[seg_content_id], seg
else:
map_content_id_to_segment[seg_content_id] = seg
return None, None
def _filter_segments_for_single_host_stop(self):
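        """
        Builds the list of up segments on self.onlyThisHost that are safe to
        stop for a --host shutdown. Raises if the host holds both a primary
        and its corresponding mirror, or if any up segment is not yet
        synchronized, since stopping those would leave their content with no
        available copy.
        """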
host_to_segments_mapping = self.gparray.getSegmentsByHostName(self.gparray.getSegDbList())
segments_on_host = host_to_segments_mapping[self.onlyThisHost]
up_segments_on_host = [seg for seg in segments_on_host if seg.isSegmentUp()]
matching_primary, matching_mirror = self._find_any_primary_and_mirror_on_same_host(up_segments_on_host)
if matching_primary or matching_mirror:
raise Exception("Segment host '%s' has both of corresponding primary "
"'%s' and mirror '%s'. Aborting." % (matching_primary.getSegmentHostName(),
matching_primary, matching_mirror))
for seg in up_segments_on_host:
if seg.isSegmentModeSynchronized():
continue
raise Exception("Segment '%s' not synchronized. Aborting." % seg)
return up_segments_on_host
def run(self):
"""
Run and return the exitCode for the program
"""
self._prepare()
if self.gparray.get_segment_count() == 0:
self.coordinatoronly = True
if self.coordinatoronly:
if self.sighup:
return self._sighup_cluster()
if self.interactive:
if not userinput.ask_yesno(None, "\nContinue with coordinator-only shutdown", 'N'):
raise UserAbortedException()
try:
# Disable Ctrl-C
signal.signal(signal.SIGINT, signal.SIG_IGN)
self._stop_coordinator()
finally:
# Reenable Ctrl-C
self.cleanup()
signal.signal(signal.SIGINT, signal.default_int_handler)
if self.is_external_fts:
self._clean_fts()
if self.etcd_hosts is not None:
self._clean_etcd()
if self.restart:
logger.info("Restarting System...")
gp.NewGpStart.local('restarting system', verbose=logging_is_verbose(), coordinatorOnly=self.coordinatoronly,
nostandby=not self.stopstandby, coordinatorDirectory=self.coordinator_datadir)
else:
if self.sighup:
return self._sighup_cluster()
else:
segs = []
if self.onlyThisHost:
if self.onlyThisHost in self.gparray.get_coordinator_host_names():
raise Exception("Specified host '%s' has the coordinator or standby coordinator on it. This node can only be stopped as part of a full-cluster gpstop, without '--host'." %
self.onlyThisHost)
if not self.gparray.hasMirrors:
raise Exception("Cannot perform host-specific gpstop on a cluster without segment mirroring.")
segs = self._filter_segments_for_single_host_stop()
else:
segs = self.gparray.getSegDbList()
if self.interactive:
self._summarize_actions(segs)
if not userinput.ask_yesno(None, "\nContinue with Cloudberry instance shutdown", 'N'):
raise UserAbortedException()
try:
# Disable Ctrl-C
signal.signal(signal.SIGINT, signal.SIG_IGN)
if self.onlyThisHost is None:
self._stop_coordinator()
self._stop_standby()
self._stop_segments(segs)
finally:
# Reenable Ctrl-C
self.cleanup()
signal.signal(signal.SIGINT, signal.default_int_handler)
if self.onlyThisHost:
logger.info("Recognizing new cluster state...")
try:
# currently responsible for triggering an update to gp_segment_configuration
# because dbconn.connect() internally calls commit()
self.conn = dbconn.connect(self.dburl)
# backup in case connect() does not do a commit
ReconfigDetectionSQLQueryCommand(self.conn).run()
except Exception as e:
logger.debug('query trying to start a transaction failed: %s' % str(e))
logger.debug('expected: the purpose was that by attempting a transaction, gp_segment_configuration would be updated')
finally:
if self.conn:
self.conn.close()
if self.is_external_fts:
self._clean_fts()
if self.etcd_hosts is not None:
self._clean_etcd()
if self.restart:
logger.info("Restarting System...")
gp.NewGpStart.local('restarting system', verbose=logging_is_verbose(),
nostandby=not self.stopstandby, coordinatorDirectory=self.coordinator_datadir)
else:
if dca.is_dca_appliance():
logger.info("Unregistering with DCA")
dca.DcaGpdbStopped.local()
logger.info("Unregistered with DCA")
if self.hadFailures:
# MPP-15208
return 2
return 0
def _clean_fts(self):
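        """
        Kills the external FTS processes on every host listed in the
        self.fts_hosts file, then verifies after a short delay that none of
        them are still alive.
        """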
if not os.path.exists(self.fts_hosts):
raise gp.GpError("FTS host file %s does not exist." % self.fts_hosts)
fts_host_machine_list = read_hosts(self.fts_hosts)
logger.info("Start to kill FTS processes...")
for fts in fts_host_machine_list:
kill_fts(fts)
time.sleep(5)
for fts in fts_host_machine_list:
if check_fts(fts):
raise gp.GpError("FTS process on %s is still alive" % fts)
logger.info("kill all FTS processes successfully.")
def _clean_etcd(self):
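        """
        Kills the ETCD processes on every host listed in the self.etcd_hosts
        file, then verifies after a delay that none of them are still alive.
        """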
if not os.path.exists(self.etcd_hosts):
raise gp.GpError("ETCD host file %s does not exist." % self.etcd_hosts)
etcd_host_machine_list = read_hosts(self.etcd_hosts)
logger.info("Start to kill ETCD processes...")
for etcd in etcd_host_machine_list:
kill_etcd(etcd)
time.sleep(60)
for etcd in etcd_host_machine_list:
if check_etcd(etcd):
raise gp.GpError("ETCD process on %s is still alive" % etcd)
logger.info("kill all ETCD processes successfully.")
######
def cleanup(self):
if self.pool:
self.pool.haltWork()
######
def _prepare(self):
logger.info("Gathering information and validating the environment...")
self.gphome = gp.get_gphome()
if self.coordinator_datadir is None:
self.coordinator_datadir = gp.get_coordinatordatadir()
self.user = unix.getUserName()
gp.check_permissions(self.user)
self._read_postgresqlconf()
self._check_db_running()
self._build_gparray()
if self.onlyThisHost:
self._is_hostname_valid()
self._check_version()
######
def _is_hostname_valid(self):
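        """Validates that self.onlyThisHost appears in gp_segment_configuration; exits with an error otherwise."""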
        segments = self.gparray.getSegmentsByHostName(self.gparray.getDbList())
host_names = list(segments.keys())
if self.onlyThisHost not in host_names:
logger.error("host '%s' is not found in gp_segment_configuration" % self.onlyThisHost)
logger.error("hosts in cluster config: %s" % host_names)
raise SystemExit(1)
######
def _check_version(self):
self.gpversion = gp.GpVersion.local('local Cloudberry version check', self.gphome)
logger.info("Cloudberry Version: '%s'" % self.gpversion)
######
def _read_postgresqlconf(self):
logger.debug("Obtaining coordinator's port from coordinator data directory")
pgconf_dict = pgconf.readfile(self.coordinator_datadir + "/postgresql.conf")
self.port = pgconf_dict.int('port')
logger.debug("Read from postgresql.conf port=%s" % self.port)
######
def _check_db_running(self):
if os.path.exists(self.coordinator_datadir + '/postmaster.pid'):
self.pid = gp.read_postmaster_pidfile(self.coordinator_datadir)
if not unix.check_pid(self.pid):
logger.warning("Have a postmaster.pid file but no Coordinator segment process running")
logger.info("Clearing postmaster.pid file and /tmp lock files")
lockfile = "/tmp/.s.PGSQL.%s" % self.port
logger.info("Clearing Coordinator instance lock files")
os.remove(lockfile)
logger.info("Clearing Coordinator instance pid file")
os.remove("%s/postmaster.pid" % self.coordinator_datadir)
logger.info("Setting recovery parameters")
self.mode = 'fast'
logger.info("Commencing forced shutdown")
else:
raise ExceptionNoStackTraceNeeded(
                'postmaster.pid file does not exist. Is the Cloudberry instance already stopped?')
######
def _build_gparray(self):
logger.info("Obtaining Cloudberry Coordinator catalog information")
logger.info("Obtaining Segment details from coordinator...")
self.dburl = dbconn.DbURL(port=self.port, dbname='template1')
self.gparray = GpArray.initFromCatalog(self.dburl, utility=True)
class _SigIntHandler(object):
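        """
        Context manager that records, rather than acts on, SIGINT while
        enabled. It must be entered with SIGINT already set to SIG_IGN; the
        caller then brackets interruptible waits with enable()/disable() and
        checks self.interrupted afterwards, as _stop_coordinator_smart() does:

            with GpStop._SigIntHandler() as handler:
                handler.enable()
                time.sleep(interval)   # interruptible wait
                handler.disable()
                if handler.interrupted:
                    ...                # react to Ctrl-C
        """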
def __init__(self):
self.interrupted = False
def _handler(self, signum, stack_frame):
self.interrupted = True
def enable(self):
self.interrupted = False
signal.signal(signal.SIGINT, self._handler)
def disable(self):
signal.signal(signal.SIGINT, signal.SIG_IGN)
def __enter__(self):
self._prev_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
# Try to prevent easy mistakes by ensuring that SIGINT was
# ignored before we entered the manager.
if self._prev_handler != signal.SIG_IGN:
signal.signal(signal.SIGINT, self._prev_handler)
raise Exception("SIGINT disposition must be SIG_IGN before entering SigIntHandler")
return self
def __exit__(self, exc_type, exc_value, traceback):
# Put back the previous handler.
signal.signal(signal.SIGINT, self._prev_handler)
return False
def _is_coordinator_stopped(self):
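        """
        Returns True if 'pg_ctl status' reports the coordinator as stopped
        (exit code PG_CTL_STATUS_STOPPED), False if it is running; any other
        exit code raises an exception.
        """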
with open(os.devnull, 'w') as devnull:
ret = subprocess.call(['pg_ctl', 'status', '-D', self.coordinator_datadir],
stdout=devnull)
if ret == PG_CTL_STATUS_STOPPED:
return True
if ret != PG_CTL_STATUS_RUNNING:
raise Exception('pg_ctl status failed with return code %d' % ret)
return False
# Send a no-wait stop to the coordinator, and then poll for it to stop. If the user gets impatient or
# the operation times out, show the connections and allow the user to upgrade the stop to fast or
# immediate mode.
def _stop_coordinator_smart(self):
logger.info("Stopping coordinator segment and waiting for user connections to finish ...")
self.conn = dbconn.connect(self.dburl, utility=True)
user_connections_header, user_connections = catalog.getUserConnectionInfo(self.conn)
self.conn.close()
ret = subprocess.call(['pg_ctl', 'stop', '-W', '-m', 'smart', '-D', self.coordinator_datadir])
if ret:
raise Exception('pg_ctl stop smart failed')
time_waited = 0
wait_forever = False
with GpStop._SigIntHandler() as handler:
while not self._is_coordinator_stopped():
                # time.sleep() can be safely interrupted (https://docs.python.org/3/library/time.html#time.sleep)
handler.enable()
time.sleep(PG_CTL_STATUS_SLEEP_INTERVAL)
time_waited = time_waited + PG_CTL_STATUS_SLEEP_INTERVAL
handler.disable()
if handler.interrupted or (time_waited >= self.timeout and not wait_forever):
handler.interrupted = False
warn_text = """Smart mode failed to shutdown the coordinator.\n"""
if len(user_connections):
warn_text += """ There were {} user connections at the start of the shutdown. Note: These connections may be outdated.
The following connections were found before shutting down:\n""".format(len(user_connections))
for user_conn in user_connections:
warn_text += " %s" % pprint.pformat(user_conn)
                    logger.warning(warn_text)
logger.info("Continue waiting in smart mode, stop in fast mode, or stop in immediate mode.")
mode_selection = userinput.ask_string("Your_choice",
"['(s)mart_mode', '(f)ast_mode', '(i)mmediate_mode']",
's',
['s', 'f', 'i'])
logger.info("Your choice was '%s'" % mode_selection)
if mode_selection == 'f':
self.mode = 'fast'
return False
elif mode_selection == 'i':
self.mode = 'immediate'
return False
else:
wait_forever = True
logger.info("Continuing waiting for user connections to finish ...")
######
# NOTE: we must make sure the coordinator is down before leaving this routine
def _stop_coordinator(self, coordinatorOnly=False):
        '''Shuts down the coordinator.'''
logger.info("Commencing Coordinator instance shutdown with mode='%s'" % self.mode)
logger.info("Coordinator segment instance directory=%s" % self.coordinator_datadir)
e = GpEraFile(self.coordinator_datadir, logger=get_logger_if_verbose())
e.end_era()
# Get a list of postgres processes running before stopping the server
postgres_pids = gp.get_postgres_segment_processes(self.coordinator_datadir, self.gparray.coordinator.hostname)
logger.debug("Postgres processes running on coordinator host: {0}".format(postgres_pids))
try:
if self.mode == 'smart':
self._stop_coordinator_smart()
# NOTE: _stop_coordinator_smart() can change self.mode
if self.mode != 'smart':
cmd = gp.CoordinatorStop("stopping coordinator", self.coordinator_datadir, mode=self.mode, timeout=self.timeout)
cmd.run(validateAfter=True)
except:
            # The stop didn't complete within the timeout or pg_ctl failed, so try to kill the process directly
(succeeded, mypid, file_datadir) = pg.ReadPostmasterTempFile.local("Read coordinator tmp file",
self.dburl.pgport).getResults()
if succeeded and file_datadir == self.coordinator_datadir:
if unix.check_pid(mypid):
logger.info("Failed to shutdown coordinator with pg_ctl.")
logger.info("Sending SIGQUIT signal...")
os.kill(mypid, signal.SIGQUIT)
time.sleep(5)
# Still not gone... try SIGABRT
if unix.check_pid(mypid):
logger.info("Sending SIGABRT signal...")
os.kill(mypid, signal.SIGABRT)
time.sleep(5)
if not unix.check_pid(mypid):
# Clean up files
lockfile = "/tmp/.s.PGSQL.%s" % self.dburl.pgport
if os.path.exists(lockfile):
logger.info("Clearing segment instance lock files")
os.remove(lockfile)
# Use the process list and make sure that all the processes are killed at the end
logger.info('Attempting forceful termination of any leftover coordinator process')
unix.kill_9_segment_processes(self.coordinator_datadir, postgres_pids, self.gparray.coordinator.hostname)
logger.debug("Successfully shutdown the Coordinator instance in admin mode")
######
def _stop_standby(self):
""" assumes prepare() has been called """
if not self.stopstandby:
return True
if self.gparray.standbyCoordinator:
standby = self.gparray.standbyCoordinator
if get_unreachable_segment_hosts([standby.hostname], 1):
logger.warning("Standby is unreachable, skipping shutdown on standby")
return True
# Get a list of postgres processes running before stopping the server
postgres_pids = gp.get_postgres_segment_processes(standby.datadir, standby.hostname)
logger.debug("Postgres processes running on standby host: {0}".format(postgres_pids))
logger.info("Stopping coordinator standby host %s mode=%s" % (standby.hostname, self.mode))
try:
cmd = SegmentStop("stopping coordinator standby",
standby.datadir, mode=self.mode,
timeout=self.timeout,
ctxt=base.REMOTE,
remoteHost=standby.hostname)
cmd.run(validateAfter=True)
except base.ExecutionError as e:
logger.warning("Error occured while stopping the standby coordinator: %s" % e)
if not pg.DbStatus.remote('checking status of standby coordinator instance', standby, standby.hostname):
logger.info("Successfully shutdown standby process on %s" % standby.hostname)
return True
else:
logger.warning("Process coordinator standby still running, will issue fast shutdown with immediate")
try:
cmd = SegmentStop("stopping coordinator standby", standby.datadir, mode='immediate',
timeout=self.timeout,
ctxt=base.REMOTE, remoteHost=standby.hostname)
cmd.run(validateAfter=True)
except base.ExecutionError as e:
logger.warning("Error occured while stopping the standby coordinator: %s" % e)
if not pg.DbStatus.remote('checking status of standby coordinator instance', standby, standby.hostname):
logger.info("Successfully shutdown coordinator standby process on %s" % standby.hostname)
return True
else:
logger.error('Failed to stop standby. Attempting forceful termination of standby process')
# Use the process list and make sure that all the processes are killed at the end
unix.kill_9_segment_processes(standby.datadir, postgres_pids, standby.hostname)
if not pg.DbStatus.remote('checking status of standby coordinator instance', standby, standby.hostname):
logger.info("Successfully shutdown coordinator standby process on %s" % standby.hostname)
return True
else:
logger.error("Unable to stop coordinator standby on host: %s" % standby.hostname)
return False
else:
logger.info("No standby coordinator host configured")
return True
######
def _stop_segments(self, segs):
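        """
        Stops the given segments through the worker pool. With mirroring
        enabled, primaries are stopped first and mirrors second; the results
        of both passes are then aggregated and summarized.
        """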
failed_seg_status = []
workers = min(len(self.gparray.get_hostlist()), self.parallel)
self.pool = base.WorkerPool(numWorkers=workers, logger=logger)
logger.info("Targeting dbid %s for shutdown" % [seg.getSegmentDbId() for seg in segs])
if self.gparray.hasMirrors:
# stop primaries
logger.info("Commencing parallel primary segment instance shutdown, please wait...")
try:
self._stopseg_cmds(True, False, segs=segs)
finally:
self.pool.join()
primary_success_seg_status = self._process_segment_stop(failed_seg_status)
# stop mirrors
logger.info("Commencing parallel mirror segment instance shutdown, please wait...")
try:
self._stopseg_cmds(False, True, segs=segs)
finally:
self.pool.join()
mirror_success_seg_status = self._process_segment_stop(failed_seg_status)
success_seg_status = primary_success_seg_status + mirror_success_seg_status
self._print_segment_stop(segs, failed_seg_status, success_seg_status)
else:
logger.info("Commencing parallel segment instance shutdown, please wait...")
            # There are no active mirrors
try:
self._stopseg_cmds(True, False, segs=segs)
finally:
self.pool.join()
success_seg_status = self._process_segment_stop(failed_seg_status)
self._print_segment_stop(segs, failed_seg_status, success_seg_status)
######
def _stopseg_cmds(self, includePrimaries, includeMirrors, segs):
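        """
        Groups the given segments by host and dispatches one GpSegStopCmd per
        host to the worker pool, including only primaries and/or mirrors as
        requested. Hosts with no segments of the requested role are skipped.
        """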
        host_segs_map = {}
        for seg in segs:
            host_segs_map.setdefault(seg.getSegmentHostName(), []).append(seg)
for hostname, gpdb_objs in host_segs_map.items():
dbs = []
for db in gpdb_objs:
role = db.getSegmentRole()
if role == 'p' and includePrimaries:
dbs.append(db)
elif role != 'p' and includeMirrors:
dbs.append(db)
# If we have no dbs then we have no segments of the type primary
# or mirror. This will occur when you have an entire host fail
# when using group mirroring. This is because all the mirror segs
# on the alive host will be marked primary (or vice-versa)
if len(dbs) == 0:
continue
logger.debug("Dispatching command to shutdown %d segments on host: %s" % (len(dbs), hostname))
cmd = GpSegStopCmd("remote segment starts on host '%s'" % hostname, self.gphome, self.gpversion,
mode=self.mode, dbs=dbs, timeout=self.timeout,
verbose=logging_is_verbose(), ctxt=base.REMOTE, remoteHost=hostname,
logfileDirectory=self.logfileDirectory)
self.pool.addCommand(cmd)
print_progress(self.pool)
######
def _process_segment_stop(self, failed_seg_status):
        '''Reviews the results of the dispatched gpsegstop commands.'''
success_seg_status = []
seg_timed_out = False
cmds = self.pool.getCompletedItems()
for cmd in cmds:
if cmd.get_results().rc == 0 or cmd.get_results().rc == 1:
cmdout = cmd.get_results().stdout
lines = cmdout.split('\n')
for line in lines:
if line.startswith("STATUS"):
                        reasonStr, started, seg_datadir = parseStatusLine(line, isStop=True)[1:]
                        success = (started.lower() != 'false')
                        for db in cmd.dblist:
                            if db.datadir == seg_datadir:
if success:
success_seg_status.append(
SegStopStatus(db, stopped=True, reason=reasonStr, failedCmd=cmd))
else:
                                    # dbs that are marked invalid are 'skipped', but we dispatch to them
                                    # anyway since we want to try to shut down any runaway pg processes.
failed_seg_status.append(
SegStopStatus(db, stopped=False, reason=reasonStr, failedCmd=cmd))
elif line.strip().startswith('stderr: pg_ctl: server does not shut down'):
# We are assuming that we know what segment failed beforehand.
if failed_seg_status:
failed_seg_status[-1].timedOut = True
else:
logger.debug("No failed segments to time out")
else:
for db in cmd.dblist:
                # dbs that are marked invalid are 'skipped', but we dispatch to them
                # anyway since we want to try to shut down any runaway pg processes.
if db.valid:
failed_seg_status.append(
SegStopStatus(db, stopped=False, reason=cmd.get_results(), failedCmd=cmd))
self.pool.empty_completed_items()
return success_seg_status
######
def _print_segment_stop(self, segs, failed_seg_status, success_seg_status):
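        """Logs a summary of stopped, failed, timed-out, and already-down segments after a shutdown attempt."""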
stopped = len(segs) - len(failed_seg_status)
failed = len([x for x in failed_seg_status if x.db.valid])
invalid = self.gparray.get_invalid_segdbs()
total_segs = len(self.gparray.getSegDbList())
timed_out = len([x for x in failed_seg_status if x.timedOut])
if failed > 0 or logging_is_verbose():
logger.info("------------------------------------------------")
if logging_is_verbose():
logger.info("Segment Stop Information")
else:
logger.info("Failed Segment Stop Information ")
logger.info("------------------------------------------------")
if failed > 0:
for failure in failed_seg_status:
logger.info(failure)
if logging_is_verbose():
for stat in success_seg_status:
logger.debug(stat)
tabLog = TableLogger(logger=logger).setWarnWithArrows(True)
tabLog.addSeparator()
tabLog.info(["Segments stopped successfully", "= %d" % stopped])
tabLog.infoOrWarn(failed > 0, ["Segments with errors during stop", "= %d" % failed])
if invalid:
tabLog.info([])
tabLog.warn(["Segments that are currently marked down in configuration", "= %d" % len(invalid)])
tabLog.info([" (stop was still attempted on these segments)"])
tabLog.addSeparator()
tabLog.outputTable()
flag = "" if failed == 0 else "<<<<<<<<"
logger.info("Successfully shutdown %d of %d segment instances %s" % (stopped, total_segs, flag))
if failed > 0:
self.hadFailures = True
logger.warning("------------------------------------------------")
logger.warning("Segment instance shutdown failures reported")
logger.warning("Failed to shutdown %d of %d segment instances <<<<<" % (failed, total_segs))
if timed_out > 0:
logger.warning("%d segments did not complete their shutdown in the allowed" % timed_out)
logger.warning("timeout of %d seconds. These segments are still in the process" % self.timeout)
logger.warning("of shutting down. You will not be able to restart the database")
logger.warning("until all processes have terminated.")
logger.warning("A total of %d errors were encountered" % failed)
logger.warning("Review logfile %s" % get_logfile())
logger.warning("For more details on segment shutdown failure(s)")
logger.warning("------------------------------------------------")
else:
self.hadFailures = False
logger.info("Database successfully shutdown with no errors reported")
######
def _sighup_cluster(self):
""" assumes prepare() has been called """
workers = min(len(self.gparray.get_hostlist()), self.parallel)
class SighupWorkerPool(base.WorkerPool):
"""
This pool knows all the commands are calls to pg_ctl.
The failed list collects segments without a running postmaster.
"""
def __init__(self, numWorkers):
base.WorkerPool.__init__(self, numWorkers)
self.failed = []
def check_results(self):
while not self.completed_queue.empty():
item = self.completed_queue.get(False)
results = item.get_results()
if results.wasSuccessful():
continue
self.failed.append(item.db)
self.pool = SighupWorkerPool(numWorkers=workers)
dbList = self.gparray.getDbList()
hostname = socket.gethostname()
logger.info("Signalling all postmaster processes to reload")
for db in dbList:
            ctxt = base.REMOTE
            remote_host = db.getSegmentHostName()
            if db.getSegmentHostName() == hostname:
                ctxt = base.LOCAL
                remote_host = None
cmd = pg.ReloadDbConf(name="reload segment number " + str(db.getSegmentDbId()),
db=db,
ctxt=ctxt,
remoteHost=remote_host
)
self.pool.addCommand(cmd)
if self.quiet:
self.pool.join()
else:
base.join_and_indicate_progress(self.pool)
self.pool.check_results()
self.pool.empty_completed_items()
if len(self.pool.failed) < 1:
return 0
logger.info("--------------------------------------------")
logger.info("Some segment postmasters were not reloaded")
logger.info("--------------------------------------------")
tabLog = TableLogger().setWarnWithArrows(True)
tabLog.info(["Host", "Datadir", "Port", "Status"])
for db in self.pool.failed:
tup = [db.getSegmentHostName(), db.getSegmentDataDirectory(), str(db.getSegmentPort()),
db.getSegmentStatus()]
tabLog.info(tup)
tabLog.outputTable()
logger.info("--------------------------------------------")
return 1
######
def _summarize_actions(self, segs):
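        """Logs the coordinator parameters and the segments that will be shut down, for interactive confirmation."""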
logger.info("--------------------------------------------")
logger.info("Coordinator instance parameters")
logger.info("--------------------------------------------")
tabLog = TableLogger(logger=logger).setWarnWithArrows(True)
tabLog.info(["Coordinator Cloudberry instance process active PID", "= %s" % self.pid])
tabLog.info(["Database", "= %s" % self.dburl.pgdb])
tabLog.info(["Coordinator port", "= %s" % self.port])
tabLog.info(["Coordinator directory", "= %s" % self.coordinator_datadir])
tabLog.info(["Shutdown mode", "= %s" % self.mode])
tabLog.info(["Timeout", "= %s" % self.timeout])
standbyMsg = "On" if self.gparray.standbyCoordinator and self.stopstandby else "Off"
tabLog.info(["Shutdown Coordinator standby host", "= %s" % standbyMsg])
tabLog.outputTable()
logger.info("--------------------------------------------")
logger.info("Segment instances that will be shutdown:")
logger.info("--------------------------------------------")
tabLog = TableLogger(logger=logger).setWarnWithArrows(True)
tabLog.info(["Host", "Datadir", "Port", "Status"])
for db in segs:
tabLog.info([db.getSegmentHostName(), db.getSegmentDataDirectory(),
str(db.getSegmentPort()), db.getSegmentStatus()])
tabLog.outputTable()
# ----------------------- Command line option parser ----------------------
@staticmethod
def createParser():
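        """Builds the optparse-based command-line parser for gpstop."""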
parser = OptParser(option_class=OptChecker,
description="Stops a CBDB Array.",
version='%prog version $Revision$')
parser.setHelp([])
addStandardLoggingAndHelpOptions(parser, includeNonInteractiveOption=True, includeUsageOption=True)
addTo = OptionGroup(parser, 'Connection options')
parser.add_option_group(addTo)
addCoordinatorDirectoryOptionForSingleClusterProgram(addTo)
addTo = OptionGroup(parser, 'Instance shutdown options: ')
parser.add_option_group(addTo)
addTo.add_option('-f', '--fast', action='store_true', default=False,
help="<deprecated> Fast shutdown, active transactions interrupted and rolled back")
        addTo.add_option('-i', '--immediate', action='store_true', default=False,
                         help="<deprecated> Immediate shutdown, active transactions aborted.")
        addTo.add_option('-s', '--smart', action='store_true',
                         help="<deprecated> Smart shutdown, wait for active transactions to complete. [default]")
addTo.add_option('-z', '--force', action='store_true', default=False,
help="<deprecated> Force shutdown of segment instances marked as invalid. Kill postmaster PID, " \
"delete /tmp lock files and remove segment instance postmaster.pid file.")
addTo.add_option('-M', '--mode', type='choice', choices=['fast', 'immediate', 'smart'],
metavar='fast|immediate|smart', action='store', default='smart',
help='set the method of shutdown')
addTo.add_option('-r', '--restart', action='store_true',
help='Restart Apache Cloudberry instance after successful gpstop.')
addTo.add_option('-m', '--master_only', '-c', '--coordinator_only', dest='coordinator_only', action='store_true',
help='stop coordinator instance started in maintenance mode')
addTo.add_option('-y', dest="stop_standby", action='store_false', default=True,
help='Do not stop the standby coordinator process.')
        addTo.add_option('-u', dest="request_sighup", action='store_true',
                         help="upload new coordinator postgresql.conf settings; does not stop the Cloudberry array, "
                              "but issues a signal to the coordinator segment postmaster process to reload")
addTo.add_option('-B', '--parallel', type="int", default=DEFAULT_NUM_WORKERS, metavar="<parallel_processes>",
help='number of segment hosts to run in parallel. Default is %d' % DEFAULT_NUM_WORKERS)
addTo.add_option('-t', '--timeout', dest='timeout', default=SEGMENT_STOP_TIMEOUT_DEFAULT, type='int',
help='time to wait for segment stop (in seconds)')
addTo.add_option('--host', dest='only_this_host', type='string',
help='stop all segments on this host (this will only complete if failover segments are available). '
'hostname as displayed in gp_segment_configuration')
addTo.add_option('--skipvalidation', action='store_true', default=False,
help="<development testing only> DO NOT USE")
        addTo.add_option('-F', dest='fts_hosts', type='string', default=None,
                         help='specify the file that contains all FTS hosts. If this argument is set, `gpstop` will '
                              'attempt to stop all FTS processes on the specified hosts; if not, `gpstop` will stop '
                              'all FTS processes specified in $COORDINATOR_DATA_DIRECTORY/config/fts_host')
        addTo.add_option('-E', dest='etcd_hosts', type='string', default=None,
                         help='specify the file that contains all ETCD hosts. If this argument is set, `gpstop` will '
                              'attempt to stop all ETCD processes on the specified hosts')
parser.set_defaults(verbose=False, filters=[], slice=(None, None))
return parser
@staticmethod
def createProgram(options, args):
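        """Validates the parsed command-line options and constructs a GpStop instance from them."""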
logfileDirectory = options.ensure_value("logfileDirectory", False)
        external_fts = is_external_fts()
        if not external_fts and (options.fts_hosts or options.etcd_hosts):
            raise ProgramArgumentValidationException("Internal FTS does not support the -F and -E options")
if options.mode != 'smart':
if options.fast or options.immediate:
raise ProgramArgumentValidationException("Can not mix --mode options with older deprecated '-f,-i,-s'")
if options.coordinator_only and options.only_this_host:
raise ProgramArgumentValidationException("Incompatible flags. Cannot mix '--host' option with stopping "
"coordinator-only.")
if options.restart and options.only_this_host:
raise ProgramArgumentValidationException("Incompatible flags. Cannot mix '--host' option with '-r' for "
"restart.")
if options.request_sighup and options.only_this_host:
raise ProgramArgumentValidationException("Incompatible flags. Cannot mix '--host' option with '-u' for "
"config reload.")
if (not options.stop_standby) and options.only_this_host:
raise ProgramArgumentValidationException("Incompatible flags. Cannot mix '--host' option with '-y' for "
"skipping standby.")
if options.fast:
options.mode = "fast"
if options.immediate:
options.mode = "immediate"
if options.smart:
options.mode = "smart"
        # The force option is deprecated; it no longer kills anything with -9.
        # Just make it stop in fast mode instead.
if options.force:
options.mode = "fast"
proccount = os.environ.get('GP_MGMT_PROCESS_COUNT')
        if options.parallel == DEFAULT_NUM_WORKERS and proccount is not None:
options.parallel = int(proccount)
        # -B/--parallel sanity check
if options.parallel > 128 or options.parallel < 1:
raise ProgramArgumentValidationException("Invalid value for parallel degree: %s" % options.parallel)
# Don't allow them to go below default
if options.timeout < SEGMENT_STOP_TIMEOUT_DEFAULT and not options.skipvalidation:
raise ProgramArgumentValidationException(
"Invalid timeout value. Must be greater than %s seconds." % SEGMENT_STOP_TIMEOUT_DEFAULT)
        if external_fts:
if options.fts_hosts is None:
coordinator_data_directory = gp.get_coordinatordatadir()
options.fts_hosts = coordinator_data_directory + '/config' + '/fts_host'
if args:
raise ProgramArgumentValidationException(
"Argument %s is invalid. Is an option missing a parameter?" % args[-1])
return GpStop(options.mode,
coordinator_datadir=options.coordinatorDataDirectory,
parallel=options.parallel,
quiet=options.quiet,
coordinatoronly=options.coordinator_only,
sighup=options.request_sighup,
interactive=options.interactive,
stopstandby=options.stop_standby,
restart=options.restart,
timeout=options.timeout,
logfileDirectory=logfileDirectory,
onlyThisHost=options.only_this_host,
fts_hosts=options.fts_hosts,
etcd_hosts=options.etcd_hosts,
is_external_fts=external_fts)
if __name__ == '__main__':
simple_main(GpStop.createParser, GpStop.createProgram)