#!/usr/bin/env python3
#
# Copyright (c) Greenplum Inc 2008. All Rights Reserved.
#
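"""gpstart: starts a Cloudberry Database cluster, i.e. the coordinator, the
standby coordinator (if configured) and all segment instances, plus the
external FTS and etcd processes when external FTS is in use.
"""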
#
# THIS IMPORT MUST COME FIRST
#
# import mainUtils FIRST to get python version check
import os
import signal
import subprocess
import sys
import time
from optparse import OptionGroup, SUPPRESS_HELP
from gppylib.mainUtils import *
try:
import pickle
from gppylib.db import dbconn
from gppylib.gpparseopts import OptParser, OptChecker
from gppylib.gparray import *
from gppylib.gplog import get_default_logger, log_to_file_only
from gppylib import userinput
from gppylib.db import catalog
from gppylib.commands import unix
from gppylib.commands import gp
from gppylib.commands.gp import SEGMENT_TIMEOUT_DEFAULT
from gppylib.commands import base
from gppylib.commands import pg
from gppylib.commands import dca
from gppylib import pgconf
from gppylib.heapchecksum import HeapChecksum
from gppylib.commands.pg import PgControlData
from gppylib.operations.startSegments import *
from gppylib.operations.detect_unreachable_hosts import get_unreachable_segment_hosts, mark_segments_down_for_unreachable_hosts
from gppylib.utils import TableLogger
from gppylib.gp_era import GpEraFile
except ImportError as e:
sys.exit('Cannot import modules. Please check that you have sourced cloudberry-env.sh. Detail: ' + str(e))
logger = get_default_logger()
# ---------------------------------------------------------------
class GpStart:
######
def __init__(self, specialMode, restricted, start_standby, coordinator_datadir,
wrapper,
wrapper_args,
skip_standby_check,
parallel=gp.DEFAULT_GPSTART_NUM_WORKERS,
quiet=False,
coordinatoronly=False,
interactive=False,
timeout=SEGMENT_TIMEOUT_DEFAULT,
logfileDirectory=False,
skip_heap_checksum_validation=False,
fts_hosts=None,
etcd_hosts=None,
is_external_fts=False
):
assert (specialMode in [None, 'maintenance'])
self.specialMode = specialMode
self.restricted = restricted
self.start_standby = start_standby
self.pool = None
self.parallel = parallel
self.attempt_standby_start = False
self.quiet = quiet
self.coordinatoronly = coordinatoronly
self.coordinator_datadir = coordinator_datadir
self.interactive = interactive
self.timeout = timeout
self.wrapper = wrapper
self.wrapper_args = wrapper_args
self.skip_standby_check = skip_standby_check
self.logfileDirectory = logfileDirectory
self.skip_heap_checksum_validation = skip_heap_checksum_validation
self.fts_hosts = fts_hosts
self.etcd_hosts = etcd_hosts
self.is_external_fts = is_external_fts
self.singlenodemode = False
#
# Some variables that are set during execution
#
self.era = None
self.gpversion = None
self.gparray = None
self.port = None
self.gphome = None
self.dburl = None
self.max_connections = None
logger.debug("Setting level of parallelism to: %d" % self.parallel)
######
def _start_all_fts(self):
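        """Start the external FTS process on every host listed in the FTS host
        file and verify that each one is running.  The shared etcd config file
        under $COORDINATOR_DATA_DIRECTORY/config is first copied to every FTS
        host with gpsync.
        """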
if not os.path.exists(self.fts_hosts):
raise gp.GpError("FTS host file %s does not exist." % self.fts_hosts)
fts_host_machine_list = read_hosts(self.fts_hosts)
        etcd_config_tmp_file = "/tmp/cbdb_etcd.conf"
        coordinator_data_directory = os.environ.get("COORDINATOR_DATA_DIRECTORY")
        etcd_config_file = os.path.join(coordinator_data_directory, 'config', 'cbdb_etcd.conf')
        copy_etcd_config_file_cmd = f"gpsync -f {self.fts_hosts} {etcd_config_file} =:{etcd_config_tmp_file}"
        subprocess.check_output(copy_etcd_config_file_cmd, shell=True)
        logger.info("Starting all FTS processes.")
        isdemo = (len(fts_host_machine_list) == 1)
        for fts in fts_host_machine_list:
            start_fts(fts, isdemo)
        # Give the FTS processes time to come up before verifying them.
        time.sleep(30)
        for fts in fts_host_machine_list:
            if not check_fts(fts):
                raise gp.GpError("FTS process on %s is not running." % fts)
        logger.info("All FTS processes started successfully")
    def _generate_etcd_url(self, etcd_port):
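        """Build the etcd initial-cluster string, e.g.
        "etcd-0=http://10.0.0.1:2380,etcd-1=http://10.0.0.2:2380", from the
        hosts listed in the etcd host file.
        """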
        etcd_host_machine_list = read_hosts(self.etcd_hosts)
        members = []
        for etcd_num, etcd in enumerate(etcd_host_machine_list):
            etcd_ip = resolve_hostname(etcd)
            members.append(f"etcd-{etcd_num}=http://{etcd_ip}:{etcd_port}")
        return ",".join(members)
def _start_all_etcd(self):
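        """Start etcd on every host listed in the etcd host file: a single
        standalone instance for a one-host (demo) deployment, otherwise one
        cluster member per host.  Then verify that each instance is running.
        """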
if not os.path.exists(self.etcd_hosts):
raise gp.GpError("ETCD host file %s does not exist." % self.etcd_hosts)
etcd_host_machine_list = read_hosts(self.etcd_hosts)
logger.info("Begin to start all ETCD process.")
if len(etcd_host_machine_list) == 1:
start_single_etcd(etcd_host_machine_list[0])
else:
etcd_home_num = 0
etcd_host_list = self._generte_etcd_url(2380)
for etcd in etcd_host_machine_list:
start_etcd(etcd, etcd_home_num, etcd_host_list)
etcd_home_num += 1
time.sleep(60)
for etcd in etcd_host_machine_list:
if not check_etcd(etcd):
raise gp.GpError("ETCD process on %s is not running." % etcd)
logger.info("start all ETCD process successfully")
######
def run(self):
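        """Top-level startup sequence: start etcd/FTS if configured, bring the
        coordinator up in utility mode to read the cluster configuration and
        check that the standby has not been activated, then start all segments
        and finally the coordinator (and standby) in dispatch mode.
        Returns 0 on success, 1 otherwise.
        """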
self._prepare()
if self.etcd_hosts is not None:
self._start_all_etcd()
if self.coordinatoronly:
if os.getenv('GPSTART_INTERNAL_COORDINATOR_ONLY'):
logger.info('Coordinator-only start requested for management utilities.')
else:
logger.warning("****************************************************************************")
logger.warning("Coordinator-only start requested. If a standby is configured, this command")
logger.warning("may lead to a split-brain condition and possible unrecoverable data loss.")
logger.warning("Maintenance mode should only be used with direction from Cloudberry Support.")
logger.warning("****************************************************************************")
if self.interactive:
if not userinput.ask_yesno(None, "\nContinue with coordinator-only startup", 'N'):
raise UserAbortedException()
try:
# Disable Ctrl-C
signal.signal(signal.SIGINT, signal.SIG_IGN)
self._startCoordinator()
logger.info("Coordinator Started...")
self.singlenodemode = self.gparray.is_singlenode
if self.singlenodemode:
logger.warning("SinglenodeMode has been enabled, no segment will be created.")
standby_was_started = self._start_standby()
return 0
if self.coordinatoronly:
if self.is_external_fts:
self._start_all_fts()
return 0
num_workers = min(len(self.gparray.get_hostlist()), self.parallel)
hosts = set(self.gparray.get_hostlist(includeCoordinator=False))
# We check for unreachable segment hosts first thing, because if a host is down but its segments
# are marked up, later checks can return invalid or misleading results and the cluster may not
# start in a good state.
unreachable_hosts = get_unreachable_segment_hosts(hosts, num_workers)
if unreachable_hosts:
mark_segments_down_for_unreachable_hosts(self.gparray.segmentPairs, unreachable_hosts)
if self.skip_heap_checksum_validation:
self.coordinator_checksum_value = None
logger.warning("Because of --skip-heap-checksum-validation, the GUC for data_checksums "
"will not be checked between coordinator and segments")
else:
self.coordinator_checksum_value = HeapChecksum(gparray=self.gparray, num_workers=num_workers,
logger=logger).get_coordinator_value()
if not self.skip_standby_check:
self.check_standby()
else:
logger.info("Skipping Standby activation status checking.")
logger.info("Shutting down coordinator")
self.shutdown_coordinator_only()
# TODO: check results of command.
finally:
# Reenable Ctrl-C
signal.signal(signal.SIGINT, signal.default_int_handler)
(segmentsToStart, invalidSegments) = self._prepare_segment_start()
if self.interactive:
self._summarize_actions(segmentsToStart)
if not userinput.ask_yesno(None, "\nContinue with Cloudberry instance startup", 'N'):
raise UserAbortedException()
try:
# Disable Ctrl-C
signal.signal(signal.SIGINT, signal.SIG_IGN)
success = self._start(segmentsToStart, invalidSegments)
finally:
# Reenable Ctrl-C
signal.signal(signal.SIGINT, signal.default_int_handler)
if dca.is_dca_appliance():
logger.info("Initializing DCA settings")
dca.DcaGpdbInitialized.local()
logger.info("DCA settings initialized")
if self.is_external_fts:
self._start_all_fts()
return 0 if success else 1
######
def cleanup(self):
if self.pool:
self.pool.haltWork()
# ------------------------------- Internal Helper --------------------------------
######
def _cbdb_precheck(self):
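        """Sanity checks for external-FTS deployments before startup: no FTS
        process may already be running on any FTS host, and (when -E is not
        given) the etcd processes recorded in the configuration must already
        be up.
        """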
logger.info("checking fts hosts...")
if not os.path.exists(self.fts_hosts):
raise gp.GpError("FTS host file %s does not exist." % self.fts_hosts)
fts_host_machine_list = read_hosts(self.fts_hosts)
for fts in fts_host_machine_list:
if check_fts(fts):
raise gp.GpError("FTS host %s has alived fts process, please cleanup the environment first." % fts)
if self.etcd_hosts is None:
logger.info("checking etcd hosts...")
coordinator_datadir = os.environ.get("COORDINATOR_DATA_DIRECTORY")
etcd_host_machine_list = read_hosts(coordinator_datadir + "/config/etcd_host")
for etcd in etcd_host_machine_list:
if not check_etcd(etcd):
raise gp.GpError("ETCD host %s has no alived etcd process, please cleanup the environment first." % etcd)
######
def _prepare(self):
logger.info("Gathering information and validating the environment...")
if self.is_external_fts:
self._cbdb_precheck()
self._basic_setup()
self._check_version()
self._check_coordinator_running()
######
def _basic_setup(self):
self.gphome = gp.get_gphome()
if self.coordinator_datadir is None:
self.coordinator_datadir = gp.get_coordinatordatadir()
self.user = unix.getUserName()
gp.check_permissions(self.user)
self._read_postgresqlconf()
######
def _read_postgresqlconf(self):
logger.debug("Obtaining coordinator's port from coordinator data directory")
pgconf_dict = pgconf.readfile(self.coordinator_datadir + "/postgresql.conf")
self.port = pgconf_dict.int('port')
logger.debug("Read from postgresql.conf port=%s" % self.port)
self.max_connections = pgconf_dict.int('max_connections')
logger.debug("Read from postgresql.conf max_connections=%s" % self.max_connections)
######
def _check_version(self):
self.gpversion = gp.GpVersion.local('local CBDB software version check', self.gphome)
logger.info("Cloudberry Binary Version: '%s'" % self.gpversion)
# It would be nice to work out the catalog version => greenplum version
# calculation so that we can print out nicer error messages when
# version doesn't match.
        bin_catversion = gp.GpCatVersion.local('local CBDB software catalog version check', self.gphome)
logger.info("Cloudberry Catalog Version: '%s'" % bin_catversion)
dir_catversion = gp.GpCatVersionDirectory.local('local CBDB directory catalog version check', self.coordinator_datadir)
if bin_catversion != dir_catversion:
logger.info("COORDINATOR_DIRECTORY Catalog Version: '%s'" % dir_catversion)
logger.info("Catalog Version of coordinator directory incompatible with binaries")
raise ExceptionNoStackTraceNeeded("Catalog Versions are incompatible")
######
def _check_coordinator_running(self):
logger.debug("Check if Coordinator is already running...")
if os.path.exists(self.coordinator_datadir + '/postmaster.pid'):
logger.warning("postmaster.pid file exists on Coordinator, checking if recovery startup required")
self._recovery_startup()
self._remove_postmaster_tmpfile(self.port)
def shutdown_coordinator_only(self):
cmd = gp.GpStop("Shutting down coordinator", coordinatorOnly=True,
fast=True, quiet=logging_is_quiet(),
verbose=logging_is_verbose(),
datadir=self.coordinator_datadir,
parallel=self.parallel,
logfileDirectory=self.logfileDirectory)
cmd.run()
logger.debug("results of forcing coordinator shutdown: %s" % cmd)
def fetch_tli(self, data_dir_path, remoteHost=None):
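        """Return the latest checkpoint TimeLineID from pg_controldata for the
        given data directory, locally or on remoteHost if one is given.
        """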
if not remoteHost:
controldata = PgControlData("fetching pg_controldata locally", data_dir_path)
else:
controldata = PgControlData("fetching pg_controldata remotely", data_dir_path, REMOTE, remoteHost)
controldata.run(validateAfter=True)
return int(controldata.get_value("Latest checkpoint's TimeLineID"))
class StandbyUnreachable(Exception):
pass
def _standby_activated(self):
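        """Return True if the standby coordinator has a higher timeline ID than
        this coordinator, which means it was promoted at some point and this
        node must not come up as the acting coordinator.  Raises
        StandbyUnreachable if the standby's pg_controldata cannot be fetched.
        """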
logger.debug("Checking if standby has been activated...")
if not self.gparray.standbyCoordinator:
return False
# fetch timelineids for both primary and standby (post-promote)
primary_tli = self.fetch_tli(self.coordinator_datadir)
try:
standby_tli = self.fetch_tli(self.gparray.standbyCoordinator.getSegmentDataDirectory(),
self.gparray.standbyCoordinator.getSegmentHostName())
except base.ExecutionError as err:
raise GpStart.StandbyUnreachable(err)
logger.debug("Primary TLI = %d" % primary_tli)
logger.debug("Standby TLI = %d" % standby_tli)
return primary_tli < standby_tli
def check_standby(self):
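        """Abort startup if the standby coordinator has been activated.  If the
        standby is unreachable, ask the user (in interactive mode) whether to
        continue; continuing disables the standby start later in the startup
        process.
        """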
try:
standby_activated = self._standby_activated()
except GpStart.StandbyUnreachable as err:
logger.warning("Standby host is unreachable, cannot determine whether the standby is currently acting as the coordinator. Received error: %s" % err)
logger.warning("Continue only if you are certain that the standby is not acting as the coordinator.")
if not self.interactive or not userinput.ask_yesno(None, "\nContinue with startup", 'N'):
if not self.interactive:
logger.warning("Non interactive mode detected. Not starting the cluster. Start the cluster in interactive mode.")
self.shutdown_coordinator_only()
raise UserAbortedException()
# If the user wants to continue when the standby is unreachable,
# set start_standby to False to prevent starting the unreachable
            # standby later in the startup process.
self.start_standby = False
return
if not standby_activated:
return
# stop the coordinator we've started up.
cmd = gp.GpStop("Shutting down coordinator", coordinatorOnly=True,
fast=True, quiet=logging_is_quiet(),
verbose=logging_is_verbose(),
datadir=self.coordinator_datadir,
parallel=self.parallel)
cmd.run(validateAfter=True)
logger.info("Coordinator Stopped...")
raise ExceptionNoStackTraceNeeded("Standby activated, this node no more can act as coordinator.")
######
def _recovery_startup(self):
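        """Inspect the postmaster lock file and the coordinator port to decide
        whether a coordinator instance is still running.  If one is, abort;
        otherwise clear any stale lock and pid files and run a start plus
        forced-stop cycle so the subsequent real start comes up cleanly.
        """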
logger.info("Commencing recovery startup checks")
lockfile = "/tmp/.s.PGSQL.%s" % self.port
tmpfile_exists = os.path.exists(lockfile)
ss_port_active = unix.PgPortIsActive.local('check ss for coordinator port',
lockfile, self.port)
if tmpfile_exists and ss_port_active:
logger.info("Have lock file %s and a process running on port %s" % (lockfile, self.port))
raise ExceptionNoStackTraceNeeded("Coordinator instance process running")
elif tmpfile_exists and not ss_port_active:
logger.info("Have lock file %s but no process running on port %s" % (lockfile, self.port))
elif not tmpfile_exists and ss_port_active:
logger.info("No lock file %s but a process running on port %s" % (lockfile, self.port))
raise ExceptionNoStackTraceNeeded("Port %s is already in use" % self.port)
elif not tmpfile_exists and not ss_port_active:
logger.info("No socket connection or lock file in /tmp found for port=%s" % self.port)
logger.info("No Coordinator instance process, entering recovery startup mode")
if tmpfile_exists:
logger.info("Clearing Coordinator instance lock files")
os.remove(lockfile)
postmaster_pid_file = "%s/postmaster.pid" % self.coordinator_datadir
if os.path.exists(postmaster_pid_file):
logger.info("Clearing Coordinator instance pid file")
os.remove("%s/postmaster.pid" % self.coordinator_datadir)
self._startCoordinator()
logger.info("Commencing forced instance shutdown")
gp.GpStop.local("forcing coordinator shutdown", coordinatorOnly=True,
verbose=logging_is_verbose,
quiet=self.quiet, fast=False,
force=True, datadir=self.coordinator_datadir, parallel=self.parallel)
######
def _remove_postmaster_tmpfile(self, port):
lockfile = "/tmp/.s.PGSQL.%s" % port
tmpfile_exists = os.path.exists(lockfile)
        if tmpfile_exists:
            logger.info("Clearing Coordinator instance lock files")
            os.remove(lockfile)
######
def _summarize_actions(self, segmentsToStart):
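        """Log the coordinator instance parameters and a table of the segment
        instances that are about to be started, for interactive confirmation.
        """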
logger.info("--------------------------")
logger.info("Coordinator instance parameters")
logger.info("--------------------------")
logger.info("Database = %s" % self.dburl.pgdb)
logger.info("Coordinator Port = %s" % self.port)
logger.info("Coordinator directory = %s" % self.coordinator_datadir)
logger.info("Timeout = %d seconds" % self.timeout)
if self.gparray.standbyCoordinator:
if self.start_standby:
logger.info("Coordinator standby start = On")
else:
logger.info("Coordinator standby start = Off")
else:
logger.info("Coordinator standby = Off ")
logger.info("--------------------------------------")
logger.info("Segment instances that will be started")
logger.info("--------------------------------------")
isFileReplication = self.gparray.hasMirrors
tabLog = TableLogger().setWarnWithArrows(True)
header = ["Host", "Datadir", "Port"]
if isFileReplication:
header.append("Role")
tabLog.info(header)
for db in segmentsToStart:
line = [db.getSegmentHostName(), db.getSegmentDataDirectory(), str(db.getSegmentPort())]
if isFileReplication:
line.append("Primary" if db.isSegmentPrimary(True) else "Mirror")
tabLog.info(line)
tabLog.outputTable()
######
def _get_format_string(self):
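        """Return a printf-style format string with columns wide enough for the
        longest hostname, data directory, and port among all segments.
        """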
host_len = 0
dir_len = 0
port_len = 0
for db in self.gparray.getSegDbList():
if len(db.hostname) > host_len:
host_len = len(db.hostname)
if len(db.datadir) > dir_len:
dir_len = len(db.datadir)
if len(str(db.port)) > port_len:
port_len = len(str(db.port))
return "%-" + str(host_len) + "s %-" + str(dir_len) + "s %-" + str(port_len) + "s %s"
######
def _startCoordinator(self):
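        """Start the coordinator instance alone in utility mode, load the
        cluster configuration from its catalog into self.gparray, and stamp a
        new era for this startup.
        """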
if self.restricted:
logger.info("Starting Coordinator instance in admin and RESTRICTED mode")
else:
logger.info("Starting Coordinator instance in admin mode")
cmd = gp.CoordinatorStart('Coordinator in utility mode with restricted set to {0}'.format(self.restricted),
self.coordinator_datadir, self.port, self.era, wrapper=self.wrapper,
wrapper_args=self.wrapper_args, specialMode=self.specialMode,
restrictedMode=self.restricted, timeout=self.timeout, utilityMode=True,
max_connections=self.max_connections)
cmd.run()
if cmd.get_results().rc != 0:
if self.restricted:
logger.fatal("Failed to start Coordinator instance in admin and RESTRICTED mode")
else:
logger.fatal("Failed to start Coordinator instance in admin mode")
cmd.validate()
logger.info("Obtaining Cloudberry Coordinator catalog information")
logger.info("Obtaining Segment details from coordinator...")
self.dburl = dbconn.DbURL(port=self.port, dbname='template1')
self.gparray = GpArray.initFromCatalog(self.dburl, utility=True)
logger.info("Setting new coordinator era")
e = GpEraFile(self.coordinator_datadir, logger=get_logger_if_verbose())
e.new_era(self.gparray.coordinator.hostname, self.port, time.strftime('%y%m%d%H%M%S'))
self.era = e.era
######
def _start(self, segmentsToStart, invalidSegments):
""" starts all of the segments, the coordinator and the standby coordinator
returns whether all segments that should be started were started successfully
note that the parameters do not list coordinator/standby, they only list data segments
"""
workers = min(len(self.gparray.get_hostlist()), self.parallel)
self.pool = base.WorkerPool(numWorkers=workers)
if os.path.exists(self.coordinator_datadir + "/gpexpand.status") and not self.restricted:
raise ExceptionNoStackTraceNeeded(
"Found a System Expansion Setup in progress. Please run 'gpexpand --rollback'")
logger.debug("gparray does%s have mirrors" % ("" if self.gparray.hasMirrors else " not"))
if self.gparray.hasMirrors:
startMode = START_AS_PRIMARY_OR_MIRROR
else:
startMode = START_AS_MIRRORLESS
# this will eventually start gpsegstart.py
segmentStartOp = StartSegmentsOperation(self.pool, self.quiet, self.gpversion,
self.gphome, self.coordinator_datadir, self.coordinator_checksum_value,
self.timeout, self.specialMode, self.wrapper, self.wrapper_args, self.parallel,
logfileDirectory=self.logfileDirectory)
segmentStartResult = segmentStartOp.startSegments(self.gparray, segmentsToStart, startMode, self.era)
# see if we have at least one segment per content
willShutdownSegments = not self._verify_enough_segments(segmentStartResult, self.gparray)
# process the result of segment startup
self._print_segment_start(segmentStartResult, invalidSegments, willShutdownSegments)
if willShutdownSegments:
# go through and remove any segments that we did start so that we keep everything
# shutdown cleanly
self._shutdown_segments(segmentStartResult)
raise ExceptionNoStackTraceNeeded("Do not have enough valid segments to start the array.")
failedToStart = segmentStartResult.getFailedSegmentObjs()
coordinator_result, message = self._start_final_coordinator()
if not coordinator_result:
return False
# start standby after coordinator in dispatch mode comes up
standby_was_started = self._start_standby()
# report if we complete operations
return self._check_final_result(
not standby_was_started and self.attempt_standby_start,
failedToStart, invalidSegments, message)
######
def _prepare_segment_start(self):
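        """Split the configured segments into those to start and those to skip
        because they are marked down in gp_segment_configuration.  Returns a
        (segmentsToStart, invalidSegments) pair.
        """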
segs = self.gparray.get_valid_segdbs()
logger.debug("gp_segment_configuration indicates following valid segments")
for seg in segs:
logger.debug("SegDB: %s" % seg)
# segments marked down
invalid_segs = self.gparray.get_invalid_segdbs()
for seg in invalid_segs:
logger.warning("Skipping startup of segment marked down in configuration: on %s directory %s <<<<<" % \
(seg.getSegmentHostName(), seg.getSegmentDataDirectory()))
return (segs, invalid_segs)
    ######
def _verify_enough_segments(self, startResult, gparray):
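        """Return True if every content id is covered: either its primary
        started successfully, or a synchronized mirror started that could be
        failed over to.  Otherwise log the uncovered content id and return
        False.
        """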
successfulSegments = startResult.getSuccessfulSegments()
allSegmentsByContent = GpArray.getSegmentsByContentId(gparray.getSegDbList())
successfulSegmentsByDbId = GpArray.getSegmentsGroupedByValue(successfulSegments, Segment.getSegmentDbId)
#
# look at each content, see if there is a segment available (or one
# which can be made available by failing over)
#
for primary in gparray.getSegDbList():
if not primary.isSegmentPrimary(current_role=True):
continue
# find the mirror
segs = allSegmentsByContent[primary.getSegmentContentId()]
mirror = None
if len(segs) > 1:
mirror = [s for s in segs if s.isSegmentMirror(current_role=True)][0]
if primary.getSegmentDbId() in successfulSegmentsByDbId:
# good, we can continue!
continue
if mirror is not None \
and mirror.getSegmentDbId() in successfulSegmentsByDbId \
and primary.isSegmentModeSynchronized():
#
# we could fail over to that mirror, so it's okay to start up like this
#
continue
logger.error("No segment started for content: %d." % primary.getSegmentContentId())
logger.info("dumping success segments: %s" % [s.__str__() for s in startResult.getSuccessfulSegments()])
return False
return True
######
def _shutdown_segments(self, segmentStartResult):
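        """Stop every segment we attempted to start (both successes and
        failures), primaries before mirrors, dispatching one immediate-mode
        stop command per host through the worker pool.
        """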
logger.info("Commencing parallel segment instance shutdown, please wait...")
#
# Note that a future optimization would be to only stop the segments that we actually started.
# This requires knowing which ones are left in a partially up state
#
#
# gather the list of those that we actually tried to start
toStop = []
toStop.extend(segmentStartResult.getSuccessfulSegments())
toStop.extend([f.getSegment() for f in segmentStartResult.getFailedSegmentObjs()])
segmentsByHost = GpArray.getSegmentsByHostName(toStop)
#
# stop them, stopping primaries before mirrors
#
for type in ["primary", "mirror"]:
for hostName, segments in segmentsByHost.items():
if type == "primary":
segments = [seg for seg in segments if seg.isSegmentPrimary(current_role=True)]
else:
segments = [seg for seg in segments if seg.isSegmentMirror(current_role=True)]
if len(segments) > 0:
logger.debug("Dispatching command to shutdown %s segments on host: %s" % (type, hostName))
cmd = gp.GpSegStopCmd("remote segment stop on host '%s'" % hostName,
self.gphome, self.gpversion,
mode='immediate', dbs=segments,
verbose=logging_is_verbose(),
ctxt=base.REMOTE, remoteHost=hostName)
self.pool.addCommand(cmd)
if self.quiet:
self.pool.join()
else:
base.join_and_indicate_progress(self.pool)
######
def _print_segment_start(self, segmentStartResult, invalidSegments, willShutdownSegments):
"""
Print the results of segment startup
segmentStartResult is the StartSegmentsResult from the actual start
invalidSegments are those that we didn't even try to start because they are marked as down or should otherwise
not be started
"""
started = len(segmentStartResult.getSuccessfulSegments())
failed = len(segmentStartResult.getFailedSegmentObjs())
totalTriedToStart = started + failed
if failed or logging_is_verbose():
logger.info("----------------------------------------------------")
for failure in segmentStartResult.getFailedSegmentObjs():
segment = failure.getSegment()
logger.info("DBID:%d FAILED host:'%s' datadir:'%s' with reason:'%s'" % (
segment.getSegmentDbId(), segment.getSegmentHostName(),
segment.getSegmentDataDirectory(), failure.getReason()))
for segment in segmentStartResult.getSuccessfulSegments():
logger.debug("DBID:%d STARTED" % segment.getSegmentDbId())
logger.info("----------------------------------------------------\n\n")
tableLog = TableLogger().setWarnWithArrows(True)
tableLog.addSeparator()
tableLog.info(["Successful segment starts", "= %d" % started])
tableLog.infoOrWarn(failed, ["Failed segment starts", "= %d" % failed])
tableLog.infoOrWarn(len(invalidSegments) > 0,
["Skipped segment starts (segments are marked down in configuration)",
"= %d" % len(invalidSegments)])
tableLog.addSeparator()
tableLog.outputTable()
attentionFlag = "<<<<<<<<" if started != totalTriedToStart else ""
if len(invalidSegments) > 0:
skippedMsg = ", skipped %s other segments" % len(invalidSegments)
else:
skippedMsg = ""
logger.info("Successfully started %d of %d segment instances%s %s" %
(started, totalTriedToStart, skippedMsg, attentionFlag))
logger.info("----------------------------------------------------")
if failed:
logger.warning("Segment instance startup failures reported")
logger.warning("Failed start %d of %d segment instances %s" % \
(failed, totalTriedToStart, attentionFlag))
logger.warning("Review %s" % get_logfile())
if not willShutdownSegments:
logger.warning("For more details on segment startup failure(s)")
logger.warning("Run gpstate -s to review current segment instance status")
logger.info("----------------------------------------------------")
if len(invalidSegments) > 0:
logger.warning("****************************************************************************")
logger.warning("There are %d segment(s) marked down in the database" % len(invalidSegments))
logger.warning("To recover from this current state, review usage of the gprecoverseg")
logger.warning("management utility which will recover failed segment instance databases.")
logger.warning("****************************************************************************")
######
def _check_final_result(self, standbyFailure,
failedToStart, invalidSegments, msg):
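        """Log a summary of standby and segment start failures plus skipped
        segments.  Returns False if the standby or any segment failed to
        start, True otherwise.
        """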
if standbyFailure:
logger.warning("Standby Coordinator could not be started")
if len(failedToStart) > 0:
logger.warning("Number of segments which failed to start: %d", len(failedToStart))
if standbyFailure or len(failedToStart) > 0:
return False
if len(invalidSegments) > 0:
logger.warning("Number of segments not attempted to start: %d", len(invalidSegments))
if (len(failedToStart) > 0 or len(invalidSegments) > 0 or
msg is not None):
logger.info("Check status of database with gpstate utility")
else:
logger.info("Database successfully started")
return True
######
def _start_final_coordinator(self):
""" Last item in the startup sequence is to start the coordinator.
After starting the coordinator we connect to it. This is done both as a check that the system is
        actually started, but it's also done because certain backend processes don't get kickstarted
until the first connection. The DTM is an example of this and it allows those initialization
messages to end up in the gpstart log as opposed to the user's psql session.
Returns a tuple of (result[bool], message[string])
"""
restrict_txt = ""
if self.restricted:
restrict_txt = "in RESTRICTED mode"
logger.info("Starting Coordinator instance %s directory %s %s" % (
self.gparray.coordinator.hostname, self.coordinator_datadir, restrict_txt))
# attempt to start coordinator
gp.CoordinatorStart.local("Starting Coordinator instance",
self.coordinator_datadir, self.port, self.era,
wrapper=self.wrapper, wrapper_args=self.wrapper_args,
specialMode=self.specialMode, restrictedMode=self.restricted, timeout=self.timeout,
max_connections=self.max_connections
)
# check that coordinator is running now
if not pg.DbStatus.local('coordinator instance', self.gparray.coordinator):
logger.warning("Command pg_ctl reports Coordinator %s on port %d not running" % (
self.gparray.coordinator.datadir, self.gparray.coordinator.port))
logger.warning("Coordinator could not be started")
return False, None
logger.info("Command pg_ctl reports Coordinator %s instance active" % self.gparray.coordinator.hostname)
msg = None
try:
self.dburl.retries = 4
self.dburl.timeout = 15
coordinatorconn = dbconn.connect(self.dburl)
coordinatorconn.close()
except Exception as e:
            # MPP-14016. While certain FTS scenarios will trigger initial connection failures,
            # we still need to watch for PANIC events.
msg = str(e)
if 'PANIC' in msg:
logger.critical(msg)
                return False, msg
logger.warning(msg)
# set the era we used when starting the segments
e = GpEraFile(self.coordinator_datadir, logger=get_logger_if_verbose())
e.set_era(self.era)
return True, msg
######
def _start_standby(self):
""" used to start the standbycoordinator if necessary.
returns if the standby coordinator was started or not
"""
if self.start_standby and self.gparray.standbyCoordinator is not None:
try:
self.attempt_standby_start = True
host = self.gparray.standbyCoordinator.hostname
datadir = self.gparray.standbyCoordinator.datadir
port = self.gparray.standbyCoordinator.port
return gp.start_standbycoordinator(host, datadir, port, era=self.era,
wrapper=self.wrapper,
wrapper_args=self.wrapper_args)
except base.ExecutionError as e:
logger.warning("Error occured while starting the standby coordinator: %s" % e)
return False
else:
logger.info("No standby coordinator configured. skipping...")
return False
# ----------------------- Command line option parser ----------------------
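    # Typical invocations (a sketch only; see the option definitions below
    # for the authoritative flags):
    #   gpstart -a          start the cluster without prompting
    #   gpstart -m          start the coordinator instance only
    #   gpstart -R          restricted mode, superuser connections only
    #   gpstart -t 120      wait up to 120 seconds for segment startup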
@staticmethod
def createParser():
parser = OptParser(option_class=OptChecker,
description="Starts a CBDB Array.",
version='%prog version $Revision$')
parser.setHelp([])
addStandardLoggingAndHelpOptions(parser, includeNonInteractiveOption=True)
addTo = OptionGroup(parser, 'Connection options')
parser.add_option_group(addTo)
addCoordinatorDirectoryOptionForSingleClusterProgram(addTo)
addTo = OptionGroup(parser, 'Database startup options: ')
parser.add_option_group(addTo)
addTo.add_option('-U', '--specialMode', type='choice', choices=['maintenance'],
metavar='maintenance', action='store', default=None,
help=SUPPRESS_HELP)
addTo.add_option('-m', '--master_only', '-c', '--coordinator_only', dest="coordinator_only", action='store_true',
help='start coordinator instance only in maintenance mode')
addTo.add_option('-y', '--no_standby', dest="start_standby", action='store_false', default=True,
help='do not start coordinator standby server')
addTo.add_option('-B', '--parallel', type="int", default=gp.DEFAULT_GPSTART_NUM_WORKERS, metavar="<parallel_processes>",
help='number of segment hosts to run in parallel. Default is %d' % gp.DEFAULT_GPSTART_NUM_WORKERS)
addTo.add_option('-R', '--restricted', action='store_true',
help='start in restricted mode. Only users with superuser privilege are allowed to connect.')
addTo.add_option('-t', '--timeout', dest='timeout', default=SEGMENT_TIMEOUT_DEFAULT, type='int',
help='time to wait for segment startup (in seconds)')
addTo.add_option('', '--wrapper', dest="wrapper", default=None, type='string')
addTo.add_option('', '--wrapper-args', dest="wrapper_args", default=None, type='string')
addTo.add_option('-S', '--skip_standby_check', dest="skip_standby_check", action='store_true', default=False)
addTo.add_option('--skip-heap-checksum-validation', dest='skip_heap_checksum_validation',
action='store_true', default=False, help='Skip the validation of data_checksums GUC. '
'Note: Starting up the cluster without this '
'validation could lead to data loss.')
        addTo.add_option('-F', dest='fts_hosts', type='string', default=None,
                         help='specify the file that contains all FTS hosts. If this argument is set, `gpstart` will attempt '
                              'to start FTS on all of the specified hosts; if not, `gpstart` will start FTS on the hosts specified in '
                              '$COORDINATOR_DATA_DIRECTORY/config/fts_host')
        addTo.add_option('-E', dest='etcd_hosts', type='string', default=None,
                         help='specify the file that contains all etcd hosts. If this argument is set, `gpstart` will attempt '
                              'to start etcd on all of the specified hosts')
parser.set_defaults(verbose=False, filters=[], slice=(None, None))
return parser
@staticmethod
def createProgram(options, args):
logfileDirectory = options.ensure_value("logfileDirectory", False)
proccount = os.environ.get('GP_MGMT_PROCESS_COUNT')
external_fts = is_external_fts()
        if options.parallel == gp.DEFAULT_GPSTART_NUM_WORKERS and proccount is not None:
options.parallel = int(proccount)
        if not external_fts and (options.fts_hosts or options.etcd_hosts):
            raise ProgramArgumentValidationException("internal FTS does not support -F and -E")
        # -B sanity check
if options.parallel > 128 or options.parallel < 1:
raise ProgramArgumentValidationException("Invalid value for parallel degree: %s" % options.parallel)
if args:
raise ProgramArgumentValidationException(
"Argument %s is invalid. Is an option missing a parameter?" % args[-1])
        if external_fts:
            if options.fts_hosts is None:
                coordinator_data_directory = gp.get_coordinatordatadir()
                options.fts_hosts = coordinator_data_directory + '/config/fts_host'
return GpStart(options.specialMode, options.restricted,
options.start_standby,
coordinator_datadir=options.coordinatorDataDirectory,
parallel=options.parallel,
quiet=options.quiet,
coordinatoronly=options.coordinator_only,
interactive=options.interactive,
timeout=options.timeout,
wrapper=options.wrapper,
wrapper_args=options.wrapper_args,
skip_standby_check=options.skip_standby_check,
logfileDirectory=logfileDirectory,
skip_heap_checksum_validation=options.skip_heap_checksum_validation,
fts_hosts=options.fts_hosts,
etcd_hosts=options.etcd_hosts,
is_external_fts=external_fts
)
if __name__ == '__main__':
simple_main(GpStart.createParser, GpStart.createProgram)