| #!/usr/bin/env python3 |
| # Line too long - pylint: disable=C0301 |
| # Invalid name - pylint: disable=C0103 |
| # |
| # Copyright (c) EMC/Greenplum Inc 2011. All Rights Reserved. |
| # Copyright (c) Greenplum Inc 2008. All Rights Reserved. |
| # |
| """ |
| Internal Use Function. |
| """ |
| |
| # THIS IMPORT MUST COME FIRST |
| from gppylib.mainUtils import simple_main, addStandardLoggingAndHelpOptions |
| |
| import os, pickle, base64, time |
| |
| from gppylib.gpparseopts import OptParser, OptChecker |
| from gppylib import gparray, gplog |
| from gppylib.commands import base, gp |
| from gppylib.utils import parseKeyColonValueLines |
| from gppylib.commands.pg import PgControlData |
| |
| logger = gplog.get_default_logger() |
| |
| |
| DESCRIPTION = """ |
| This utility is NOT SUPPORTED and is for internal-use only. |
| Starts a set of one or more segment databases. |
| """ |
| |
| HELP = [""" |
| Utility should only be used by other GP utilities. |
| |
| Return codes: |
| 0 - All segments started successfully |
| 1 - At least one segment didn't start successfully |
| |
| """] |
| |
| |
| |
| |
| class StartResult: |
| """ |
| Recorded result information from an attempt to start one segment. |
| """ |
| |
| def __init__(self, datadir, started, reason, reasoncode): |
| """ |
| @param datadir |
| @param started |
| @param reason |
| @param reasoncode one of the gp.SEGSTART_* values |
| """ |
| self.datadir = datadir |
| self.started = started |
| self.reason = reason |
| self.reasoncode = reasoncode |
| |
| def __str__(self): |
| return "".join([ |
| "STATUS", |
| "--DIR:", str(self.datadir), |
| "--STARTED:", str(self.started), |
| "--REASONCODE:", str(self.reasoncode), |
| "--REASON:", str(self.reason) |
| ]) |
| |
| |
| class OverallStatus: |
| """ |
| Mapping and segment status information for all segments on this host. |
| """ |
| |
| def __init__(self, dblist): |
| """ |
| Build the datadir->segment mapping and remember the original size. |
| Since segments which fail to start will be removed from the mapping, |
| we later test the size of the map against the original size when |
| returning the appropriate status code to the caller. |
| """ |
| self.dirmap = dict([(seg.getSegmentDataDirectory(), seg) for seg in dblist]) |
| self.original_length = len(self.dirmap) |
| self.results = [] |
| self.logger = logger |
| |
| |
| def mark_failed(self, datadir, msg, reasoncode): |
| """ |
| Mark a segment as failed during some startup process. |
| Remove the entry for the segment from dirmap. |
| |
| @param datadir |
| @param msg |
| @param reasoncode one of the gp.SEGSTART_* constant values |
| """ |
| self.logger.info("Marking failed %s, %s, %s" % (datadir, msg, reasoncode)) |
| self.results.append( StartResult(datadir=datadir, started=False, reason=msg, reasoncode=reasoncode) ) |
| del self.dirmap[datadir] |
| |
| |
| def remaining_items_succeeded(self): |
| """ |
| Add results for all remaining items in our datadir->segment map. |
| """ |
| for datadir in list(self.dirmap.keys()): |
| self.results.append( StartResult(datadir=datadir, started=True, reason="Start Succeeded", reasoncode=gp.SEGSTART_SUCCESS ) ) |
| |
| |
| def log_results(self): |
| """ |
| Log info messages with our results |
| """ |
| status = '\nCOMMAND RESULTS\n' + "\n".join([str(result) for result in self.results]) |
| self.logger.info(status) |
| |
| |
| def exit_code(self): |
| """ |
| Return an appropriate exit code: 0 if no failures, 1 if some segments failed to start. |
| """ |
| if len(self.dirmap) != self.original_length: |
| return 1 |
| return 0 |
| |
| |
| |
| class GpSegStart: |
| """ |
| Logic to start segment servers on this host. |
| """ |
| |
| def __init__(self, dblist, gpversion, mirroringMode, num_cids, era, |
| timeout, pickledTransitionData, specialMode, wrapper, wrapper_args, |
| coordinator_checksum_version, segment_batch_size, logfileDirectory=False): |
| |
| # validate/store arguments |
| # |
| self.dblist = list(map(gparray.Segment.initFromString, dblist)) |
| |
| expected_gpversion = gpversion |
| actual_gpversion = gp.GpVersion.local('local software version check', os.path.abspath(os.pardir)) |
| if actual_gpversion != expected_gpversion: |
| raise Exception("Local Software Version does not match what is expected.\n" |
| "The local software version is: '%s'\n" |
| "But we were expecting it to be: '%s'\n" |
| "Please review and correct" % (actual_gpversion, expected_gpversion)) |
| |
| self.mirroringMode = mirroringMode |
| self.num_cids = num_cids |
| self.era = era |
| self.timeout = timeout |
| self.pickledTransitionData = pickledTransitionData |
| |
| assert(specialMode in [None, 'upgrade', 'maintenance']) |
| self.specialMode = specialMode |
| |
| self.wrapper = wrapper |
| self.wrapper_args = wrapper_args |
| |
| # initialize state |
| # |
| self.pool = base.WorkerPool(numWorkers=min(len(dblist), segment_batch_size)) |
| self.logger = logger |
| self.overall_status = None |
| |
| self.logfileDirectory = logfileDirectory |
| self.coordinator_checksum_version = coordinator_checksum_version |
| |
| def getOverallStatusKeys(self): |
| return list(self.overall_status.dirmap.keys()) |
| |
| # return True if all running |
| # return False if not all running |
| def checkPostmasters(self, must_be_running): |
| """ |
| Check that segment postmasters have been started. |
| @param must_be_running True if postmasters must be running by now. |
| """ |
| self.logger.info("Checking segment postmasters... (must_be_running %s)" % must_be_running) |
| |
| all_running = True |
| for datadir in self.getOverallStatusKeys(): |
| pid = gp.read_postmaster_pidfile(datadir) |
| running = gp.check_pid(pid) |
| msg = "Postmaster %s %srunning (pid %d)" % (datadir, "is " if running else "NOT ", pid) |
| self.logger.info(msg) |
| |
| if not running: |
| all_running = False |
| |
| if must_be_running and not running: |
| reasoncode = gp.SEGSTART_ERROR_PG_CTL_FAILED |
| self.overall_status.mark_failed(datadir, msg, reasoncode) |
| |
| return all_running |
| |
| |
| def __validateDirectoriesAndSetupRecoveryStartup(self): |
| """ |
| validate that the directories all exist and run recovery startup if needed |
| """ |
| self.logger.info("Validating directories...") |
| |
| for datadir in list(self.overall_status.dirmap.keys()): |
| self.logger.info("Validating directory: %s" % datadir) |
| |
| if os.path.isdir(datadir): |
| # |
| # segment datadir exists |
| # |
| postmaster_pid = os.path.join(datadir, 'postmaster.pid') |
| if os.path.exists(postmaster_pid): |
| self.logger.warning("postmaster.pid file exists, checking if recovery startup required") |
| |
| msg = gp.recovery_startup(datadir) |
| if msg: |
| reasoncode = gp.SEGSTART_ERROR_STOP_RUNNING_SEGMENT_FAILED |
| self.overall_status.mark_failed(datadir, msg, reasoncode) |
| |
| else: |
| # |
| # segment datadir does not exist |
| # |
| msg = "Segment data directory does not exist for: '%s'" % datadir |
| self.logger.warning(msg) |
| |
| reasoncode = gp.SEGSTART_ERROR_DATA_DIRECTORY_DOES_NOT_EXIST |
| self.overall_status.mark_failed(datadir, msg, reasoncode) |
| |
| |
| def __startSegments(self): |
| """ |
| Start the segments themselves |
| """ |
| self.logger.info("Starting segments... (mirroringMode %s)" % self.mirroringMode) |
| |
| for datadir, seg in list(self.overall_status.dirmap.items()): |
| |
| if self.coordinator_checksum_version != None: |
| cmd = PgControlData(name='run pg_controldata', datadir=datadir) |
| cmd.run(validateAfter=True) |
| res = cmd.get_results() |
| |
| if res.rc != 0: |
| msg = "pg_controldata failed.\nstdout:%s\nstderr:%s\n" % (res.stdout, res.stderr) |
| reasoncode = gp.SEGSTART_ERROR_PG_CONTROLDATA_FAILED |
| self.overall_status.mark_failed(datadir, msg, reasoncode) |
| continue |
| |
| segment_heap_checksum_version = cmd.get_value('Data page checksum version') |
| if segment_heap_checksum_version != self.coordinator_checksum_version: |
| msg = "Segment checksum %s does not match coordinator checksum %s.\n" % (segment_heap_checksum_version, |
| self.coordinator_checksum_version) |
| reasoncode = gp.SEGSTART_ERROR_CHECKSUM_MISMATCH |
| self.overall_status.mark_failed(datadir, msg, reasoncode) |
| continue |
| |
| cmd = gp.SegmentStart("Starting seg at dir %s" % datadir, |
| seg, |
| self.num_cids, |
| self.era, |
| self.mirroringMode, |
| timeout=self.timeout, |
| specialMode=self.specialMode, |
| wrapper=self.wrapper, |
| wrapper_args=self.wrapper_args) |
| self.pool.addCommand(cmd) |
| |
| self.pool.join() |
| |
| for cmd in self.pool.getCompletedItems(): |
| res = cmd.get_results() |
| if res.rc != 0: |
| |
| # we should also read in last entries in startup.log here |
| |
| datadir = cmd.segment.getSegmentDataDirectory() |
| msg = "PG_CTL failed.\nstdout:%s\nstderr:%s\n" % (res.stdout, res.stderr) |
| reasoncode = gp.SEGSTART_ERROR_PG_CTL_FAILED |
| self.overall_status.mark_failed(datadir, msg, reasoncode) |
| |
| self.pool.empty_completed_items() |
| |
| def run(self): |
| """ |
| Logic to start the segments. |
| """ |
| |
| # we initialize an overall status object which maintains a mapping |
| # from each segment's data directory to the segment object as well |
| # as a list of specific success/failure results. |
| # |
| self.overall_status = OverallStatus(self.dblist) |
| |
| # Each of the next four steps executes operations which may cause segment |
| # details to be removed from the mapping and recorded as failures. |
| # |
| self.__validateDirectoriesAndSetupRecoveryStartup() |
| self.__startSegments() |
| |
| # Being paranoid, we frequently check for postmaster failures. |
| # The postmasters should be running by now. |
| # |
| self.checkPostmasters(must_be_running=True) |
| |
| # At this point any segments remaining in the mapping are assumed to |
| # have successfully started. |
| # |
| self.overall_status.remaining_items_succeeded() |
| self.overall_status.log_results() |
| return self.overall_status.exit_code() |
| |
| |
| |
| def cleanup(self): |
| """ |
| Cleanup worker pool resources |
| """ |
| if self.pool: |
| self.pool.haltWork() |
| |
| |
| @staticmethod |
| def createParser(): |
| """ |
| Create parser expected by simple_main |
| """ |
| |
| parser = OptParser(option_class=OptChecker, |
| description=' '.join(DESCRIPTION.split()), |
| version='%prog version main build dev') |
| parser.setHelp(HELP) |
| |
| # |
| # Note that this mirroringmode parameter should only be either mirrorless or quiescent. |
| # If quiescent then it is implied that there is pickled transition data that will be |
| # provided (using -p) to immediately convert to a primary or a mirror. |
| # |
| addStandardLoggingAndHelpOptions(parser, includeNonInteractiveOption=False) |
| |
| parser.add_option("-D", "--dblist", dest="dblist", action="append", type="string") |
| parser.add_option("-M", "--mirroringmode", dest="mirroringMode", type="string") |
| parser.add_option("-p", "--pickledTransitionData", dest="pickledTransitionData", type="string") |
| parser.add_option("-V", "--gp-version", dest="gpversion", metavar="GP_VERSION", help="expected software version") |
| parser.add_option("-n", "--numsegments", dest="num_cids", help="number of distinct content ids in cluster") |
| parser.add_option("", "--era", dest="era", help="coordinator era") |
| parser.add_option("-t", "--timeout", dest="timeout", type="int", default=gp.SEGMENT_TIMEOUT_DEFAULT, |
| help="seconds to wait") |
| parser.add_option('-U', '--specialMode', type='choice', choices=['upgrade', 'maintenance'], |
| metavar='upgrade|maintenance', action='store', default=None, |
| help='start the instance in upgrade or maintenance mode') |
| parser.add_option('', '--wrapper', dest="wrapper", default=None, type='string') |
| parser.add_option('', '--wrapper-args', dest="wrapper_args", default=None, type='string') |
| parser.add_option('', '--coordinator-checksum-version', dest="coordinator_checksum_version", default=None, type='string', action="store") |
| parser.add_option('-b', '--segment-batch-size', type="int", dest="segment_batch_size", default=gp.DEFAULT_GPSTART_NUM_WORKERS, |
| help='Max number of segments per host to operate on in parallel.') |
| |
| return parser |
| |
| |
| @staticmethod |
| def createProgram(options, args): |
| """ |
| Create program expected by simple_main |
| """ |
| logfileDirectory = options.ensure_value("logfileDirectory", False) |
| return GpSegStart(options.dblist, |
| options.gpversion, |
| options.mirroringMode, |
| options.num_cids, |
| options.era, |
| options.timeout, |
| options.pickledTransitionData, |
| options.specialMode, |
| options.wrapper, |
| options.wrapper_args, |
| options.coordinator_checksum_version, |
| options.segment_batch_size, |
| logfileDirectory=logfileDirectory) |
| |
| #------------------------------------------------------------------------- |
| if __name__ == '__main__': |
| mainOptions = { 'setNonuserOnToolLogger':True} |
| simple_main( GpSegStart.createParser, GpSegStart.createProgram, mainOptions ) |