| #!/usr/bin/env python3 |
| # |
| # Copyright (c) Greenplum Inc 2008. All Rights Reserved. |
| # |
| # |
| # THIS IMPORT MUST COME FIRST |
| # |
| # import mainUtils FIRST to get python version check |
import os
import signal
import subprocess
import sys
import time
| from optparse import OptionGroup, SUPPRESS_HELP |
| |
| from gppylib.mainUtils import * |
| |
| try: |
| import pickle |
| |
| from gppylib.db import dbconn |
| from gppylib.gpparseopts import OptParser, OptChecker |
| from gppylib.gparray import * |
| from gppylib.gplog import get_default_logger, log_to_file_only |
| from gppylib import userinput |
| from gppylib.db import catalog |
| from gppylib.commands import unix |
| from gppylib.commands import gp |
| from gppylib.commands.gp import SEGMENT_TIMEOUT_DEFAULT |
| from gppylib.commands import base |
| from gppylib.commands import pg |
| from gppylib.commands import dca |
| from gppylib import pgconf |
| from gppylib.heapchecksum import HeapChecksum |
| from gppylib.commands.pg import PgControlData |
| from gppylib.operations.startSegments import * |
| from gppylib.operations.detect_unreachable_hosts import get_unreachable_segment_hosts, mark_segments_down_for_unreachable_hosts |
| from gppylib.utils import TableLogger |
| from gppylib.gp_era import GpEraFile |
| except ImportError as e: |
| sys.exit('Cannot import modules. Please check that you have sourced cloudberry-env.sh. Detail: ' + str(e)) |
| |
| logger = get_default_logger() |
| |
| |
| # --------------------------------------------------------------- |
| class GpStart: |
| ###### |
| def __init__(self, specialMode, restricted, start_standby, coordinator_datadir, |
| wrapper, |
| wrapper_args, |
| skip_standby_check, |
| parallel=gp.DEFAULT_GPSTART_NUM_WORKERS, |
| quiet=False, |
| coordinatoronly=False, |
| interactive=False, |
| timeout=SEGMENT_TIMEOUT_DEFAULT, |
| logfileDirectory=False, |
| skip_heap_checksum_validation=False, |
| fts_hosts=None, |
| etcd_hosts=None, |
| is_external_fts=False |
| ): |
| assert (specialMode in [None, 'maintenance']) |
| self.specialMode = specialMode |
| self.restricted = restricted |
| self.start_standby = start_standby |
| self.pool = None |
| self.parallel = parallel |
| self.attempt_standby_start = False |
| self.quiet = quiet |
| self.coordinatoronly = coordinatoronly |
| self.coordinator_datadir = coordinator_datadir |
| self.interactive = interactive |
| self.timeout = timeout |
| self.wrapper = wrapper |
| self.wrapper_args = wrapper_args |
| self.skip_standby_check = skip_standby_check |
| self.logfileDirectory = logfileDirectory |
| self.skip_heap_checksum_validation = skip_heap_checksum_validation |
| self.fts_hosts = fts_hosts |
| self.etcd_hosts = etcd_hosts |
| self.is_external_fts = is_external_fts |
| self.singlenodemode = False |
| |
| # |
| # Some variables that are set during execution |
| # |
| self.era = None |
| self.gpversion = None |
| self.gparray = None |
| self.port = None |
| self.gphome = None |
| self.dburl = None |
| self.max_connections = None |
| logger.debug("Setting level of parallelism to: %d" % self.parallel) |
| ###### |
| def _start_all_fts(self): |
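        """Copy the etcd config to every FTS host, start an FTS process on
        each, then verify after a grace period that each one is running.
        A one-host file is treated as a demo deployment."""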
| if not os.path.exists(self.fts_hosts): |
| raise gp.GpError("FTS host file %s does not exist." % self.fts_hosts) |
| fts_host_machine_list = read_hosts(self.fts_hosts) |
| |
| etcd_config_tmp_file = "/tmp/cbdb_etcd.conf" |
| coordinator_data_directory = os.environ.get("COORDINATOR_DATA_DIRECTORY") |
        etcd_conf_path = os.path.join(coordinator_data_directory, 'config', 'cbdb_etcd.conf')
        copy_etcd_config_file_cmd = f"gpsync -f {self.fts_hosts} {etcd_conf_path} =:{etcd_config_tmp_file}"
        subprocess.check_output(copy_etcd_config_file_cmd, shell=True)
| logger.info("Begin to start all FTS process.") |
| isdemo = (len(fts_host_machine_list) == 1) |
| for fts in fts_host_machine_list: |
| start_fts(fts, isdemo) |
        time.sleep(30)  # give the FTS processes time to come up before checking them
| for fts in fts_host_machine_list: |
| if not check_fts(fts): |
| raise gp.GpError("FTS process on %s is not running." % fts) |
| logger.info("start all FTS process successfully") |
| |
    def _generate_etcd_url(self, etcd_port):
        """Build the comma-separated etcd initial-cluster peer list, e.g.
        'etcd-0=http://10.0.0.1:2380,etcd-1=http://10.0.0.2:2380'."""
        etcd_host_machine_list = read_hosts(self.etcd_hosts)
        peers = []
        for etcd_num, etcd in enumerate(etcd_host_machine_list):
            etcd_ip = resolve_hostname(etcd)
            peers.append(f"etcd-{etcd_num}=http://{etcd_ip}:{etcd_port}")
        return ",".join(peers)
| |
| def _start_all_etcd(self): |
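        """Start the etcd cluster listed in the host file: a single host is
        started standalone, multiple hosts are started as peers of one
        another, then each member is verified after a grace period."""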
| if not os.path.exists(self.etcd_hosts): |
| raise gp.GpError("ETCD host file %s does not exist." % self.etcd_hosts) |
| etcd_host_machine_list = read_hosts(self.etcd_hosts) |
| |
| logger.info("Begin to start all ETCD process.") |
| if len(etcd_host_machine_list) == 1: |
| start_single_etcd(etcd_host_machine_list[0]) |
| else: |
| etcd_home_num = 0 |
            etcd_host_list = self._generate_etcd_url(2380)  # 2380 is etcd's standard peer port
| for etcd in etcd_host_machine_list: |
| start_etcd(etcd, etcd_home_num, etcd_host_list) |
| etcd_home_num += 1 |
            time.sleep(60)  # allow the etcd cluster time to form before checking its members
| for etcd in etcd_host_machine_list: |
| if not check_etcd(etcd): |
| raise gp.GpError("ETCD process on %s is not running." % etcd) |
| logger.info("start all ETCD process successfully") |
| |
| ###### |
| def run(self): |
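        """Drive the full startup sequence: start the coordinator alone in
        utility mode to read the segment configuration, check that a standby
        has not been activated, shut the coordinator back down, start the
        segments in parallel, then restart the coordinator in dispatch mode
        and finally the standby. Returns 0 on success and 1 on failure."""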
| self._prepare() |
| |
| if self.etcd_hosts is not None: |
| self._start_all_etcd() |
| |
| if self.coordinatoronly: |
| if os.getenv('GPSTART_INTERNAL_COORDINATOR_ONLY'): |
| logger.info('Coordinator-only start requested for management utilities.') |
| else: |
| logger.warning("****************************************************************************") |
| logger.warning("Coordinator-only start requested. If a standby is configured, this command") |
| logger.warning("may lead to a split-brain condition and possible unrecoverable data loss.") |
| logger.warning("Maintenance mode should only be used with direction from Cloudberry Support.") |
| logger.warning("****************************************************************************") |
| if self.interactive: |
| if not userinput.ask_yesno(None, "\nContinue with coordinator-only startup", 'N'): |
| raise UserAbortedException() |
| |
| try: |
| # Disable Ctrl-C |
| signal.signal(signal.SIGINT, signal.SIG_IGN) |
| |
| self._startCoordinator() |
| logger.info("Coordinator Started...") |
| |
| self.singlenodemode = self.gparray.is_singlenode |
| if self.singlenodemode: |
| logger.warning("SinglenodeMode has been enabled, no segment will be created.") |
| standby_was_started = self._start_standby() |
| return 0 |
| |
| if self.coordinatoronly: |
| if self.is_external_fts: |
| self._start_all_fts() |
| return 0 |
| |
| num_workers = min(len(self.gparray.get_hostlist()), self.parallel) |
| hosts = set(self.gparray.get_hostlist(includeCoordinator=False)) |
| # We check for unreachable segment hosts first thing, because if a host is down but its segments |
| # are marked up, later checks can return invalid or misleading results and the cluster may not |
| # start in a good state. |
| unreachable_hosts = get_unreachable_segment_hosts(hosts, num_workers) |
| if unreachable_hosts: |
| mark_segments_down_for_unreachable_hosts(self.gparray.segmentPairs, unreachable_hosts) |
| |
| if self.skip_heap_checksum_validation: |
| self.coordinator_checksum_value = None |
| logger.warning("Because of --skip-heap-checksum-validation, the GUC for data_checksums " |
| "will not be checked between coordinator and segments") |
| else: |
| self.coordinator_checksum_value = HeapChecksum(gparray=self.gparray, num_workers=num_workers, |
| logger=logger).get_coordinator_value() |
| |
| if not self.skip_standby_check: |
| self.check_standby() |
| else: |
| logger.info("Skipping Standby activation status checking.") |
| |
| logger.info("Shutting down coordinator") |
| self.shutdown_coordinator_only() |
| # TODO: check results of command. |
| |
| finally: |
| # Reenable Ctrl-C |
| signal.signal(signal.SIGINT, signal.default_int_handler) |
| |
| (segmentsToStart, invalidSegments) = self._prepare_segment_start() |
| |
| if self.interactive: |
| self._summarize_actions(segmentsToStart) |
| if not userinput.ask_yesno(None, "\nContinue with Cloudberry instance startup", 'N'): |
| raise UserAbortedException() |
| |
| try: |
| # Disable Ctrl-C |
| signal.signal(signal.SIGINT, signal.SIG_IGN) |
| |
| success = self._start(segmentsToStart, invalidSegments) |
| finally: |
| # Reenable Ctrl-C |
| signal.signal(signal.SIGINT, signal.default_int_handler) |
| |
| if dca.is_dca_appliance(): |
| logger.info("Initializing DCA settings") |
| dca.DcaGpdbInitialized.local() |
| logger.info("DCA settings initialized") |
| if self.is_external_fts: |
| self._start_all_fts() |
| return 0 if success else 1 |
| ###### |
| def cleanup(self): |
| if self.pool: |
| self.pool.haltWork() |
| |
| # ------------------------------- Internal Helper -------------------------------- |
| ###### |
| def _cbdb_precheck(self): |
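        """Sanity checks for external-FTS deployments: no FTS process may
        already be running on any FTS host, and, when no -E host file was
        given, the etcd cluster recorded in the config must already be up."""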
| logger.info("checking fts hosts...") |
| if not os.path.exists(self.fts_hosts): |
| raise gp.GpError("FTS host file %s does not exist." % self.fts_hosts) |
| |
| fts_host_machine_list = read_hosts(self.fts_hosts) |
| for fts in fts_host_machine_list: |
| if check_fts(fts): |
| raise gp.GpError("FTS host %s has alived fts process, please cleanup the environment first." % fts) |
| |
| if self.etcd_hosts is None: |
| logger.info("checking etcd hosts...") |
| coordinator_datadir = os.environ.get("COORDINATOR_DATA_DIRECTORY") |
| etcd_host_machine_list = read_hosts(coordinator_datadir + "/config/etcd_host") |
| for etcd in etcd_host_machine_list: |
| if not check_etcd(etcd): |
| raise gp.GpError("ETCD host %s has no alived etcd process, please cleanup the environment first." % etcd) |
| |
| ###### |
| def _prepare(self): |
| logger.info("Gathering information and validating the environment...") |
| if self.is_external_fts: |
| self._cbdb_precheck() |
| self._basic_setup() |
| |
| self._check_version() |
| self._check_coordinator_running() |
| |
| ###### |
| def _basic_setup(self): |
| self.gphome = gp.get_gphome() |
| if self.coordinator_datadir is None: |
| self.coordinator_datadir = gp.get_coordinatordatadir() |
| self.user = unix.getUserName() |
| gp.check_permissions(self.user) |
| self._read_postgresqlconf() |
| |
| ###### |
| def _read_postgresqlconf(self): |
| logger.debug("Obtaining coordinator's port from coordinator data directory") |
| pgconf_dict = pgconf.readfile(self.coordinator_datadir + "/postgresql.conf") |
| self.port = pgconf_dict.int('port') |
| logger.debug("Read from postgresql.conf port=%s" % self.port) |
| self.max_connections = pgconf_dict.int('max_connections') |
| logger.debug("Read from postgresql.conf max_connections=%s" % self.max_connections) |
| |
| ###### |
| def _check_version(self): |
| self.gpversion = gp.GpVersion.local('local CBDB software version check', self.gphome) |
| logger.info("Cloudberry Binary Version: '%s'" % self.gpversion) |
| |
        # It would be nice to work out the catalog version => greenplum version
        # calculation so that we can print out nicer error messages when the
        # versions don't match.
        bin_catversion = gp.GpCatVersion.local('local CBDB software catalog version check', self.gphome)
| logger.info("Cloudberry Catalog Version: '%s'" % bin_catversion) |
| |
| dir_catversion = gp.GpCatVersionDirectory.local('local CBDB directory catalog version check', self.coordinator_datadir) |
| |
| if bin_catversion != dir_catversion: |
| logger.info("COORDINATOR_DIRECTORY Catalog Version: '%s'" % dir_catversion) |
| logger.info("Catalog Version of coordinator directory incompatible with binaries") |
| raise ExceptionNoStackTraceNeeded("Catalog Versions are incompatible") |
| |
| ###### |
| def _check_coordinator_running(self): |
| logger.debug("Check if Coordinator is already running...") |
| if os.path.exists(self.coordinator_datadir + '/postmaster.pid'): |
| logger.warning("postmaster.pid file exists on Coordinator, checking if recovery startup required") |
| self._recovery_startup() |
| |
| self._remove_postmaster_tmpfile(self.port) |
| |
| def shutdown_coordinator_only(self): |
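        """Stop only the coordinator instance (fast mode), leaving segments untouched."""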
| cmd = gp.GpStop("Shutting down coordinator", coordinatorOnly=True, |
| fast=True, quiet=logging_is_quiet(), |
| verbose=logging_is_verbose(), |
| datadir=self.coordinator_datadir, |
| parallel=self.parallel, |
| logfileDirectory=self.logfileDirectory) |
| cmd.run() |
| logger.debug("results of forcing coordinator shutdown: %s" % cmd) |
| |
| def fetch_tli(self, data_dir_path, remoteHost=None): |
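        """Return the latest checkpoint TimeLineID from pg_controldata,
        run locally or on remoteHost."""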
| if not remoteHost: |
| controldata = PgControlData("fetching pg_controldata locally", data_dir_path) |
| else: |
| controldata = PgControlData("fetching pg_controldata remotely", data_dir_path, REMOTE, remoteHost) |
| |
| controldata.run(validateAfter=True) |
| return int(controldata.get_value("Latest checkpoint's TimeLineID")) |
| |
| class StandbyUnreachable(Exception): |
| pass |
| |
| def _standby_activated(self): |
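        """A promoted standby begins a new timeline, so a standby whose
        TimeLineID exceeds the primary's has been activated."""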
| logger.debug("Checking if standby has been activated...") |
| |
| if not self.gparray.standbyCoordinator: |
| return False |
| |
| # fetch timelineids for both primary and standby (post-promote) |
| primary_tli = self.fetch_tli(self.coordinator_datadir) |
| try: |
| standby_tli = self.fetch_tli(self.gparray.standbyCoordinator.getSegmentDataDirectory(), |
| self.gparray.standbyCoordinator.getSegmentHostName()) |
| except base.ExecutionError as err: |
| raise GpStart.StandbyUnreachable(err) |
| |
| logger.debug("Primary TLI = %d" % primary_tli) |
| logger.debug("Standby TLI = %d" % standby_tli) |
| return primary_tli < standby_tli |
| |
| def check_standby(self): |
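        """Abort startup (after stopping the utility-mode coordinator) if the
        standby has been activated; if the standby is unreachable, let the
        user decide whether to continue without it."""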
| try: |
| standby_activated = self._standby_activated() |
| except GpStart.StandbyUnreachable as err: |
| logger.warning("Standby host is unreachable, cannot determine whether the standby is currently acting as the coordinator. Received error: %s" % err) |
| logger.warning("Continue only if you are certain that the standby is not acting as the coordinator.") |
| if not self.interactive or not userinput.ask_yesno(None, "\nContinue with startup", 'N'): |
| if not self.interactive: |
| logger.warning("Non interactive mode detected. Not starting the cluster. Start the cluster in interactive mode.") |
| self.shutdown_coordinator_only() |
| raise UserAbortedException() |
| |
| # If the user wants to continue when the standby is unreachable, |
| # set start_standby to False to prevent starting the unreachable |
            # standby later in the startup process.
| self.start_standby = False |
| return |
| |
| if not standby_activated: |
| return |
| |
| # stop the coordinator we've started up. |
| cmd = gp.GpStop("Shutting down coordinator", coordinatorOnly=True, |
| fast=True, quiet=logging_is_quiet(), |
| verbose=logging_is_verbose(), |
| datadir=self.coordinator_datadir, |
| parallel=self.parallel) |
| cmd.run(validateAfter=True) |
| logger.info("Coordinator Stopped...") |
| raise ExceptionNoStackTraceNeeded("Standby activated, this node no more can act as coordinator.") |
| |
| ###### |
| def _recovery_startup(self): |
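        """Recover from a stale postmaster.pid: if nothing is actually
        listening on the coordinator port, clear stale lock/pid files and do a
        throwaway start/forced-stop cycle so the instance shuts down cleanly."""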
| logger.info("Commencing recovery startup checks") |
| |
| lockfile = "/tmp/.s.PGSQL.%s" % self.port |
| tmpfile_exists = os.path.exists(lockfile) |
| |
| ss_port_active = unix.PgPortIsActive.local('check ss for coordinator port', |
| lockfile, self.port) |
| if tmpfile_exists and ss_port_active: |
| logger.info("Have lock file %s and a process running on port %s" % (lockfile, self.port)) |
| raise ExceptionNoStackTraceNeeded("Coordinator instance process running") |
| elif tmpfile_exists and not ss_port_active: |
| logger.info("Have lock file %s but no process running on port %s" % (lockfile, self.port)) |
| elif not tmpfile_exists and ss_port_active: |
| logger.info("No lock file %s but a process running on port %s" % (lockfile, self.port)) |
| raise ExceptionNoStackTraceNeeded("Port %s is already in use" % self.port) |
| elif not tmpfile_exists and not ss_port_active: |
| logger.info("No socket connection or lock file in /tmp found for port=%s" % self.port) |
| |
| logger.info("No Coordinator instance process, entering recovery startup mode") |
| |
| if tmpfile_exists: |
| logger.info("Clearing Coordinator instance lock files") |
| os.remove(lockfile) |
| |
| postmaster_pid_file = "%s/postmaster.pid" % self.coordinator_datadir |
| if os.path.exists(postmaster_pid_file): |
| logger.info("Clearing Coordinator instance pid file") |
| os.remove("%s/postmaster.pid" % self.coordinator_datadir) |
| |
| self._startCoordinator() |
| |
| logger.info("Commencing forced instance shutdown") |
| |
| gp.GpStop.local("forcing coordinator shutdown", coordinatorOnly=True, |
                        verbose=logging_is_verbose(),
| quiet=self.quiet, fast=False, |
| force=True, datadir=self.coordinator_datadir, parallel=self.parallel) |
| |
| ###### |
| def _remove_postmaster_tmpfile(self, port): |
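        """Remove a stale /tmp/.s.PGSQL.<port> lock file if one is present."""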
| lockfile = "/tmp/.s.PGSQL.%s" % port |
| tmpfile_exists = os.path.exists(lockfile) |
| |
        if tmpfile_exists:
            logger.info("Clearing Coordinator instance lock files")
            os.remove(lockfile)
| |
| ###### |
| def _summarize_actions(self, segmentsToStart): |
| logger.info("--------------------------") |
| logger.info("Coordinator instance parameters") |
| logger.info("--------------------------") |
| logger.info("Database = %s" % self.dburl.pgdb) |
| logger.info("Coordinator Port = %s" % self.port) |
| logger.info("Coordinator directory = %s" % self.coordinator_datadir) |
| logger.info("Timeout = %d seconds" % self.timeout) |
| if self.gparray.standbyCoordinator: |
| if self.start_standby: |
| logger.info("Coordinator standby start = On") |
| else: |
| logger.info("Coordinator standby start = Off") |
| else: |
| logger.info("Coordinator standby = Off ") |
| |
| logger.info("--------------------------------------") |
| logger.info("Segment instances that will be started") |
| logger.info("--------------------------------------") |
| |
| isFileReplication = self.gparray.hasMirrors |
| |
| tabLog = TableLogger().setWarnWithArrows(True) |
| header = ["Host", "Datadir", "Port"] |
| if isFileReplication: |
| header.append("Role") |
| tabLog.info(header) |
| for db in segmentsToStart: |
| line = [db.getSegmentHostName(), db.getSegmentDataDirectory(), str(db.getSegmentPort())] |
| if isFileReplication: |
| line.append("Primary" if db.isSegmentPrimary(True) else "Mirror") |
| tabLog.info(line) |
| tabLog.outputTable() |
| |
| ###### |
    def _get_format_string(self):
        seg_dbs = self.gparray.getSegDbList()
        host_len = max((len(db.hostname) for db in seg_dbs), default=0)
        dir_len = max((len(db.datadir) for db in seg_dbs), default=0)
        port_len = max((len(str(db.port)) for db in seg_dbs), default=0)

        return "%-" + str(host_len) + "s %-" + str(dir_len) + "s %-" + str(port_len) + "s %s"
| |
| ###### |
| def _startCoordinator(self): |
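        """Start the coordinator in utility mode, load the segment
        configuration from its catalog, and stamp a new startup era."""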
| if self.restricted: |
| logger.info("Starting Coordinator instance in admin and RESTRICTED mode") |
| else: |
| logger.info("Starting Coordinator instance in admin mode") |
| |
| cmd = gp.CoordinatorStart('Coordinator in utility mode with restricted set to {0}'.format(self.restricted), |
| self.coordinator_datadir, self.port, self.era, wrapper=self.wrapper, |
| wrapper_args=self.wrapper_args, specialMode=self.specialMode, |
| restrictedMode=self.restricted, timeout=self.timeout, utilityMode=True, |
| max_connections=self.max_connections) |
| cmd.run() |
| |
| if cmd.get_results().rc != 0: |
| if self.restricted: |
| logger.fatal("Failed to start Coordinator instance in admin and RESTRICTED mode") |
| else: |
| logger.fatal("Failed to start Coordinator instance in admin mode") |
| cmd.validate() |
| |
| logger.info("Obtaining Cloudberry Coordinator catalog information") |
| |
| logger.info("Obtaining Segment details from coordinator...") |
| self.dburl = dbconn.DbURL(port=self.port, dbname='template1') |
| self.gparray = GpArray.initFromCatalog(self.dburl, utility=True) |
| |
| logger.info("Setting new coordinator era") |
| e = GpEraFile(self.coordinator_datadir, logger=get_logger_if_verbose()) |
| e.new_era(self.gparray.coordinator.hostname, self.port, time.strftime('%y%m%d%H%M%S')) |
| self.era = e.era |
| |
| ###### |
| def _start(self, segmentsToStart, invalidSegments): |
| """ starts all of the segments, the coordinator and the standby coordinator |
| |
| returns whether all segments that should be started were started successfully |
| |
| note that the parameters do not list coordinator/standby, they only list data segments |
| """ |
| workers = min(len(self.gparray.get_hostlist()), self.parallel) |
| self.pool = base.WorkerPool(numWorkers=workers) |
| |
| if os.path.exists(self.coordinator_datadir + "/gpexpand.status") and not self.restricted: |
| raise ExceptionNoStackTraceNeeded( |
| "Found a System Expansion Setup in progress. Please run 'gpexpand --rollback'") |
| |
| logger.debug("gparray does%s have mirrors" % ("" if self.gparray.hasMirrors else " not")) |
| |
| if self.gparray.hasMirrors: |
| startMode = START_AS_PRIMARY_OR_MIRROR |
| else: |
| startMode = START_AS_MIRRORLESS |
| |
| # this will eventually start gpsegstart.py |
| segmentStartOp = StartSegmentsOperation(self.pool, self.quiet, self.gpversion, |
| self.gphome, self.coordinator_datadir, self.coordinator_checksum_value, |
| self.timeout, self.specialMode, self.wrapper, self.wrapper_args, self.parallel, |
| logfileDirectory=self.logfileDirectory) |
| segmentStartResult = segmentStartOp.startSegments(self.gparray, segmentsToStart, startMode, self.era) |
| |
| # see if we have at least one segment per content |
| willShutdownSegments = not self._verify_enough_segments(segmentStartResult, self.gparray) |
| |
| # process the result of segment startup |
| self._print_segment_start(segmentStartResult, invalidSegments, willShutdownSegments) |
| |
| if willShutdownSegments: |
| # go through and remove any segments that we did start so that we keep everything |
| # shutdown cleanly |
| self._shutdown_segments(segmentStartResult) |
| raise ExceptionNoStackTraceNeeded("Do not have enough valid segments to start the array.") |
| |
| failedToStart = segmentStartResult.getFailedSegmentObjs() |
| coordinator_result, message = self._start_final_coordinator() |
| if not coordinator_result: |
| return False |
| |
| # start standby after coordinator in dispatch mode comes up |
| standby_was_started = self._start_standby() |
| |
| # report if we complete operations |
| return self._check_final_result( |
| not standby_was_started and self.attempt_standby_start, |
| failedToStart, invalidSegments, message) |
| |
| ###### |
| def _prepare_segment_start(self): |
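        """Split the configured segments into those to start and those marked
        down in the configuration (which are skipped with a warning)."""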
| segs = self.gparray.get_valid_segdbs() |
| |
| logger.debug("gp_segment_configuration indicates following valid segments") |
| for seg in segs: |
| logger.debug("SegDB: %s" % seg) |
| |
| # segments marked down |
| invalid_segs = self.gparray.get_invalid_segdbs() |
| |
| for seg in invalid_segs: |
| logger.warning("Skipping startup of segment marked down in configuration: on %s directory %s <<<<<" % \ |
| (seg.getSegmentHostName(), seg.getSegmentDataDirectory())) |
| |
| return (segs, invalid_segs) |
| |
| #### |
| def _verify_enough_segments(self, startResult, gparray): |
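        """Return True only if every content id is covered: either its primary
        started, or a synchronized mirror started that we could fail over to."""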
| successfulSegments = startResult.getSuccessfulSegments() |
| |
| allSegmentsByContent = GpArray.getSegmentsByContentId(gparray.getSegDbList()) |
| successfulSegmentsByDbId = GpArray.getSegmentsGroupedByValue(successfulSegments, Segment.getSegmentDbId) |
| |
| # |
| # look at each content, see if there is a segment available (or one |
| # which can be made available by failing over) |
| # |
| for primary in gparray.getSegDbList(): |
| |
| if not primary.isSegmentPrimary(current_role=True): |
| continue |
| |
| # find the mirror |
| segs = allSegmentsByContent[primary.getSegmentContentId()] |
| mirror = None |
| if len(segs) > 1: |
| mirror = [s for s in segs if s.isSegmentMirror(current_role=True)][0] |
| |
| if primary.getSegmentDbId() in successfulSegmentsByDbId: |
| # good, we can continue! |
| continue |
| |
| if mirror is not None \ |
| and mirror.getSegmentDbId() in successfulSegmentsByDbId \ |
| and primary.isSegmentModeSynchronized(): |
| # |
| # we could fail over to that mirror, so it's okay to start up like this |
| # |
| continue |
| |
| logger.error("No segment started for content: %d." % primary.getSegmentContentId()) |
| logger.info("dumping success segments: %s" % [s.__str__() for s in startResult.getSuccessfulSegments()]) |
| return False |
| return True |
| |
| ###### |
| def _shutdown_segments(self, segmentStartResult): |
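        """Immediate-mode stop of every segment we attempted to start, used to
        back out of a failed startup."""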
| |
| logger.info("Commencing parallel segment instance shutdown, please wait...") |
| |
| # |
| # Note that a future optimization would be to only stop the segments that we actually started. |
| # This requires knowing which ones are left in a partially up state |
| # |
| # |
| # gather the list of those that we actually tried to start |
| toStop = [] |
| toStop.extend(segmentStartResult.getSuccessfulSegments()) |
| toStop.extend([f.getSegment() for f in segmentStartResult.getFailedSegmentObjs()]) |
| |
| segmentsByHost = GpArray.getSegmentsByHostName(toStop) |
| |
| # |
| # stop them, stopping primaries before mirrors |
| # |
        for seg_role in ["primary", "mirror"]:
            for hostName, segments in segmentsByHost.items():

                if seg_role == "primary":
                    segments = [seg for seg in segments if seg.isSegmentPrimary(current_role=True)]
                else:
                    segments = [seg for seg in segments if seg.isSegmentMirror(current_role=True)]

                if len(segments) > 0:
                    logger.debug("Dispatching command to shut down %s segments on host: %s" % (seg_role, hostName))
| cmd = gp.GpSegStopCmd("remote segment stop on host '%s'" % hostName, |
| self.gphome, self.gpversion, |
| mode='immediate', dbs=segments, |
| verbose=logging_is_verbose(), |
| ctxt=base.REMOTE, remoteHost=hostName) |
| self.pool.addCommand(cmd) |
| |
| if self.quiet: |
| self.pool.join() |
| else: |
| base.join_and_indicate_progress(self.pool) |
| |
| ###### |
| def _print_segment_start(self, segmentStartResult, invalidSegments, willShutdownSegments): |
| """ |
| Print the results of segment startup |
| |
| segmentStartResult is the StartSegmentsResult from the actual start |
| invalidSegments are those that we didn't even try to start because they are marked as down or should otherwise |
| not be started |
| """ |
| started = len(segmentStartResult.getSuccessfulSegments()) |
| failed = len(segmentStartResult.getFailedSegmentObjs()) |
| totalTriedToStart = started + failed |
| |
| if failed or logging_is_verbose(): |
| logger.info("----------------------------------------------------") |
| for failure in segmentStartResult.getFailedSegmentObjs(): |
| segment = failure.getSegment() |
| logger.info("DBID:%d FAILED host:'%s' datadir:'%s' with reason:'%s'" % ( |
| segment.getSegmentDbId(), segment.getSegmentHostName(), |
| segment.getSegmentDataDirectory(), failure.getReason())) |
| for segment in segmentStartResult.getSuccessfulSegments(): |
| logger.debug("DBID:%d STARTED" % segment.getSegmentDbId()) |
| logger.info("----------------------------------------------------\n\n") |
| |
| tableLog = TableLogger().setWarnWithArrows(True) |
| |
| tableLog.addSeparator() |
| tableLog.info(["Successful segment starts", "= %d" % started]) |
| |
| tableLog.infoOrWarn(failed, ["Failed segment starts", "= %d" % failed]) |
| |
| tableLog.infoOrWarn(len(invalidSegments) > 0, |
| ["Skipped segment starts (segments are marked down in configuration)", |
| "= %d" % len(invalidSegments)]) |
| tableLog.addSeparator() |
| tableLog.outputTable() |
| |
| attentionFlag = "<<<<<<<<" if started != totalTriedToStart else "" |
| if len(invalidSegments) > 0: |
| skippedMsg = ", skipped %s other segments" % len(invalidSegments) |
| else: |
| skippedMsg = "" |
| logger.info("Successfully started %d of %d segment instances%s %s" % |
| (started, totalTriedToStart, skippedMsg, attentionFlag)) |
| logger.info("----------------------------------------------------") |
| |
| if failed: |
| logger.warning("Segment instance startup failures reported") |
| logger.warning("Failed start %d of %d segment instances %s" % \ |
| (failed, totalTriedToStart, attentionFlag)) |
| logger.warning("Review %s" % get_logfile()) |
| if not willShutdownSegments: |
| logger.warning("For more details on segment startup failure(s)") |
| logger.warning("Run gpstate -s to review current segment instance status") |
| |
| logger.info("----------------------------------------------------") |
| |
| if len(invalidSegments) > 0: |
| logger.warning("****************************************************************************") |
| logger.warning("There are %d segment(s) marked down in the database" % len(invalidSegments)) |
| logger.warning("To recover from this current state, review usage of the gprecoverseg") |
| logger.warning("management utility which will recover failed segment instance databases.") |
| logger.warning("****************************************************************************") |
| |
| ###### |
| def _check_final_result(self, standbyFailure, |
| failedToStart, invalidSegments, msg): |
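        """Summarize the startup: returns False on standby or segment start
        failures, True otherwise (pointing the user at gpstate when anything
        was skipped or reported a message)."""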
| if standbyFailure: |
| logger.warning("Standby Coordinator could not be started") |
| if len(failedToStart) > 0: |
| logger.warning("Number of segments which failed to start: %d", len(failedToStart)) |
| |
| if standbyFailure or len(failedToStart) > 0: |
| return False |
| |
| if len(invalidSegments) > 0: |
| logger.warning("Number of segments not attempted to start: %d", len(invalidSegments)) |
| |
| if (len(failedToStart) > 0 or len(invalidSegments) > 0 or |
| msg is not None): |
| logger.info("Check status of database with gpstate utility") |
| else: |
| logger.info("Database successfully started") |
| |
| return True |
| |
| ###### |
| def _start_final_coordinator(self): |
| """ Last item in the startup sequence is to start the coordinator. |
| |
        After starting the coordinator we connect to it. This is done both as a check that the system
        has actually started and because certain backend processes don't get kickstarted until the
        first connection. The DTM is an example of this; connecting here lets those initialization
        messages end up in the gpstart log rather than in the user's psql session.
| Returns a tuple of (result[bool], message[string]) |
| """ |
| restrict_txt = "" |
| if self.restricted: |
| restrict_txt = "in RESTRICTED mode" |
| |
| logger.info("Starting Coordinator instance %s directory %s %s" % ( |
| self.gparray.coordinator.hostname, self.coordinator_datadir, restrict_txt)) |
| |
| # attempt to start coordinator |
| gp.CoordinatorStart.local("Starting Coordinator instance", |
| self.coordinator_datadir, self.port, self.era, |
| wrapper=self.wrapper, wrapper_args=self.wrapper_args, |
| specialMode=self.specialMode, restrictedMode=self.restricted, timeout=self.timeout, |
| max_connections=self.max_connections |
| ) |
| |
| # check that coordinator is running now |
| if not pg.DbStatus.local('coordinator instance', self.gparray.coordinator): |
| logger.warning("Command pg_ctl reports Coordinator %s on port %d not running" % ( |
| self.gparray.coordinator.datadir, self.gparray.coordinator.port)) |
| logger.warning("Coordinator could not be started") |
| return False, None |
| |
| logger.info("Command pg_ctl reports Coordinator %s instance active" % self.gparray.coordinator.hostname) |
| |
| msg = None |
| try: |
| self.dburl.retries = 4 |
| self.dburl.timeout = 15 |
| coordinatorconn = dbconn.connect(self.dburl) |
| coordinatorconn.close() |
| |
| except Exception as e: |
            # MPP-14016. While certain fts scenarios will trigger initial connection failures,
            # we still need to watch for PANIC events.
| msg = str(e) |
| if 'PANIC' in msg: |
| logger.critical(msg) |
                return False, msg
| logger.warning(msg) |
| |
| # set the era we used when starting the segments |
| e = GpEraFile(self.coordinator_datadir, logger=get_logger_if_verbose()) |
| e.set_era(self.era) |
| |
| return True, msg |
| |
| ###### |
| def _start_standby(self): |
| """ used to start the standbycoordinator if necessary. |
| |
| returns if the standby coordinator was started or not |
| """ |
| if self.start_standby and self.gparray.standbyCoordinator is not None: |
| try: |
| self.attempt_standby_start = True |
| host = self.gparray.standbyCoordinator.hostname |
| datadir = self.gparray.standbyCoordinator.datadir |
| port = self.gparray.standbyCoordinator.port |
| return gp.start_standbycoordinator(host, datadir, port, era=self.era, |
| wrapper=self.wrapper, |
| wrapper_args=self.wrapper_args) |
| except base.ExecutionError as e: |
| logger.warning("Error occured while starting the standby coordinator: %s" % e) |
| return False |
| else: |
| logger.info("No standby coordinator configured. skipping...") |
| return False |
| |
| # ----------------------- Command line option parser ---------------------- |
| @staticmethod |
| def createParser(): |
| parser = OptParser(option_class=OptChecker, |
| description="Starts a CBDB Array.", |
| version='%prog version $Revision$') |
| parser.setHelp([]) |
| |
| addStandardLoggingAndHelpOptions(parser, includeNonInteractiveOption=True) |
| |
| addTo = OptionGroup(parser, 'Connection options') |
| parser.add_option_group(addTo) |
| addCoordinatorDirectoryOptionForSingleClusterProgram(addTo) |
| |
| addTo = OptionGroup(parser, 'Database startup options: ') |
| parser.add_option_group(addTo) |
| addTo.add_option('-U', '--specialMode', type='choice', choices=['maintenance'], |
| metavar='maintenance', action='store', default=None, |
| help=SUPPRESS_HELP) |
| addTo.add_option('-m', '--master_only', '-c', '--coordinator_only', dest="coordinator_only", action='store_true', |
| help='start coordinator instance only in maintenance mode') |
| addTo.add_option('-y', '--no_standby', dest="start_standby", action='store_false', default=True, |
| help='do not start coordinator standby server') |
| addTo.add_option('-B', '--parallel', type="int", default=gp.DEFAULT_GPSTART_NUM_WORKERS, metavar="<parallel_processes>", |
| help='number of segment hosts to run in parallel. Default is %d' % gp.DEFAULT_GPSTART_NUM_WORKERS) |
| addTo.add_option('-R', '--restricted', action='store_true', |
| help='start in restricted mode. Only users with superuser privilege are allowed to connect.') |
| addTo.add_option('-t', '--timeout', dest='timeout', default=SEGMENT_TIMEOUT_DEFAULT, type='int', |
| help='time to wait for segment startup (in seconds)') |
| addTo.add_option('', '--wrapper', dest="wrapper", default=None, type='string') |
| addTo.add_option('', '--wrapper-args', dest="wrapper_args", default=None, type='string') |
| addTo.add_option('-S', '--skip_standby_check', dest="skip_standby_check", action='store_true', default=False) |
| addTo.add_option('--skip-heap-checksum-validation', dest='skip_heap_checksum_validation', |
| action='store_true', default=False, help='Skip the validation of data_checksums GUC. ' |
| 'Note: Starting up the cluster without this ' |
| 'validation could lead to data loss.') |
        addTo.add_option('-F', dest='fts_hosts', type='string', default=None,
                         help='specify the file that contains all fts hosts. If this argument is set, `gpstart` will '
                              'attempt to start all fts in the specified hosts; if not, `gpstart` will start all fts '
                              'specified in $COORDINATOR_DATA_DIRECTORY/config/fts_host')
        addTo.add_option('-E', dest='etcd_hosts', type='string', default=None,
                         help='specify the file that contains all etcd hosts. If this argument is set, `gpstart` will '
                              'attempt to start all etcd in the specified hosts')
| |
| parser.set_defaults(verbose=False, filters=[], slice=(None, None)) |
| |
| return parser |
| |
| @staticmethod |
| def createProgram(options, args): |
| logfileDirectory = options.ensure_value("logfileDirectory", False) |
| proccount = os.environ.get('GP_MGMT_PROCESS_COUNT') |
| external_fts = is_external_fts() |
        if options.parallel == gp.DEFAULT_GPSTART_NUM_WORKERS and proccount is not None:
| options.parallel = int(proccount) |
| |
        if not external_fts and (options.fts_hosts or options.etcd_hosts):
            raise ProgramArgumentValidationException("internal fts does not support -F and -E")
| |
        # -B sanity check
| if options.parallel > 128 or options.parallel < 1: |
| raise ProgramArgumentValidationException("Invalid value for parallel degree: %s" % options.parallel) |
| |
| if args: |
| raise ProgramArgumentValidationException( |
| "Argument %s is invalid. Is an option missing a parameter?" % args[-1]) |
| |
        if external_fts:
| if options.fts_hosts is None: |
| coordinator_data_directory = gp.get_coordinatordatadir() |
| options.fts_hosts = coordinator_data_directory + '/config' + '/fts_host' |
| |
| return GpStart(options.specialMode, options.restricted, |
| options.start_standby, |
| coordinator_datadir=options.coordinatorDataDirectory, |
| parallel=options.parallel, |
| quiet=options.quiet, |
| coordinatoronly=options.coordinator_only, |
| interactive=options.interactive, |
| timeout=options.timeout, |
| wrapper=options.wrapper, |
| wrapper_args=options.wrapper_args, |
| skip_standby_check=options.skip_standby_check, |
| logfileDirectory=logfileDirectory, |
| skip_heap_checksum_validation=options.skip_heap_checksum_validation, |
| fts_hosts=options.fts_hosts, |
| etcd_hosts=options.etcd_hosts, |
| is_external_fts=external_fts |
| ) |
| |
| |
| if __name__ == '__main__': |
| simple_main(GpStart.createParser, GpStart.createProgram) |