| #!/usr/bin/env python3 |
| ''' |
| Usage: gpcheckcat [<option>] [dbname] |
| |
| -? | --help : help menu |
| -B parallel : number of worker threads |
| -g dir : generate SQL to rectify catalog corruption, put it in dir |
| -p port : DB port number |
| -P passwd : DB password |
| -U uname : DB User Name |
| -v : verbose |
| -A : all databases |
| -S option : shared table options (none, only) |
| -O : Online |
| -l : list all tests |
| -R test | 'test1, test2' : run only the specified test(s) (quoted, comma-separated list for multiple tests) |
| -s test | 'test1, test2' : skip the specified test(s) (quoted, comma-separated list for multiple tests) |
| -C catname : run cross consistency, FK and ACL tests for this catalog table |
| -x : set session level GUCs |
| |
| Test subset options are mutually exclusive; use only one of '-R', '-s', or '-C'. |
| |
| ''' |
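| # Example invocations (illustrative; run with -l to list the available test names): |
| #   gpcheckcat -p 5432 mydb                     run all tests against 'mydb' |
| #   gpcheckcat -p 5432 -A                       run all tests against every connectable database |
| #   gpcheckcat -p 5432 -R 'test1, test2' mydb   run only the named tests against 'mydb' |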
| import datetime |
| import getopt |
| import logging |
| import re |
| import sys |
| import time |
| from functools import reduce |
| |
| try: |
| from gppylib import gplog |
| from gppylib.db import dbconn |
| from gppylib.gpcatalog import * |
| from gppylib.commands.unix import * |
| from gppylib.commands.gp import conflict_with_gpexpand |
| from gppylib.system.info import * |
| from pgdb import DatabaseError |
| from gpcheckcat_modules.unique_index_violation_check import UniqueIndexViolationCheck |
| from gpcheckcat_modules.leaked_schema_dropper import LeakedSchemaDropper |
| from gpcheckcat_modules.repair import Repair |
| from gpcheckcat_modules.foreign_key_check import ForeignKeyCheck |
| from gpcheckcat_modules.orphaned_toast_tables_check import OrphanedToastTablesCheck |
| |
| import pg |
| |
| except ImportError as e: |
| sys.exit('Error: unable to import module: ' + str(e)) |
| |
| # OID -> object name cache |
| oidmap = {} |
| |
| # ------------------------------------------------------------------------------- |
| EXECNAME = os.path.split(__file__)[-1] |
| gplog.setup_tool_logging(EXECNAME, getLocalHostname(), getUserName()) |
| gplog.very_quiet_stdout_logging() |
| logger = gplog.get_default_logger() |
| |
| sysinfo = SystemInfo(logger) |
| |
| parallelism = get_max_available_thread_count() |
| #------------------------------------------------------------------------------- |
| |
| ################ |
| def parse_int(val): |
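| """Parse val as an int; return 0 if it cannot be parsed.""" |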
| try: |
| val = int(val) |
| except ValueError: |
| val = 0 |
| return val |
| |
| |
| ############################### |
| def quote_value(name): |
| """Add surrounding single quote, double interior single quote.""" |
| assert isinstance(name, str) |
| return "'" + "''".join(name.split("'")) + "'" |
| |
| |
| SUCCESS = 0 |
| ERROR_REMOVE = 1 |
| ERROR_RESYNC = 2 |
| ERROR_NOREPAIR = 3 |
| |
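| # FIRST_NORMAL_OID is the lowest OID assigned to user-created objects; everything |
| # below it is a built-in catalog object. PG_CATALOG_OID is the OID of the |
| # pg_catalog namespace. |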
| FIRST_NORMAL_OID = 16384 |
| PG_CATALOG_OID = 11 |
| |
| def setError(level): |
| ''' |
| Raises the error level to the specified level; if the specified level is |
| lower than the existing level, no change is made. |
| |
| error level 0 => success |
| error level 1 => error, with repair script removes objects |
| error level 2 => error, with repair script that resynchronizes objects |
| error level 3 => error, no repair script |
| ''' |
| GV.retcode = max(level, GV.retcode) |
| |
| def setScriptRetCode(level): |
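| '''Raise the repair-script return code to the given level; like setError, it never lowers it.''' |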
| GV.script_retcode = max(level,GV.script_retcode) |
| |
| |
| ############################### |
| class Global(): |
| def __init__(self): |
| self.timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") |
| |
| self.retcode = SUCCESS |
| self.opt = {} |
| self.opt['-h'] = None |
| self.opt['-p'] = None |
| self.opt['-P'] = None |
| self.opt['-U'] = None |
| self.opt['-v'] = False |
| self.opt['-V'] = False |
| self.opt['-t'] = False |
| |
| self.opt['-g'] = 'gpcheckcat.repair.' + self.timestamp |
| self.opt['-B'] = parallelism |
| self.opt['-T'] = None |
| |
| self.opt['-A'] = False |
| self.opt['-S'] = None |
| self.opt['-O'] = False |
| |
| self.opt['-R'] = None |
| self.opt['-s'] = None |
| self.opt['-C'] = None |
| self.opt['-l'] = False |
| |
| self.opt['-E'] = False |
| self.opt['-x'] = [] |
| |
| self.coordinator_dbid = None |
| self.cfg = None |
| self.dbname = None |
| self.firstdb = None |
| self.alldb = [] |
| self.db = {} |
| self.tmpdir = None |
| |
| self.reset_stmt_queues() |
| |
| self.home = os.environ.get('HOME') |
| if self.home is None: |
| usage('Error: $HOME must be set') |
| self.user = os.environ.get('USER') or os.environ.get('LOGNAME') |
| if self.user is None: |
| usage('Error: either $USER or $LOGNAME must be set') |
| |
| self.catalog = None |
| self.max_content = 0 |
| self.report_cfg = {} |
| self.script_retcode = SUCCESS |
| |
| def reset_stmt_queues(self): |
| |
| # dictionary of SQL statements. Key is dbid |
| self.Remove = {} |
| |
| # dictionary of SQL statements. Key is dbid |
| self.AdjustConname = {} |
| |
| # dictionary of SQL statements. Key is dbid |
| self.DemoteConstraint = {} |
| |
| # dictionary of SQL statements. Key is dbid |
| self.ReSync = {} |
| |
| # constraints which are actually unenforceable |
| self.Constraints = [] |
| |
| # ownership fixups |
| self.Owners = [] |
| |
| # fix distribution policies |
| self.Policies = [] |
| |
| # Indexes to rebuild |
| self.Reindex = [] |
| |
| self.missingEntryStatus = None |
| self.inconsistentEntryStatus = None |
| self.foreignKeyStatus = None |
| self.aclStatus = None |
| |
| # the following variables used for reporting purposes |
| self.elapsedTime = 0 |
| self.totalCheckRun = 0 |
| self.checkStatus = True |
| self.failedChecks = [] |
| self.missing_attr_tables = [] |
| self.extra_attr_tables = [] |
| |
| |
| GV = Global() |
| |
| |
| ############################### |
| def usage(exitarg=None): |
| print(__doc__) |
| sys.exit(exitarg) |
| |
| |
| ############################### |
| |
| def getversion(): |
| db = connect() |
| curs = db.query(''' |
| select regexp_replace(version(), |
| E'.*PostgreSQL [^ ]+ .Apache Cloudberry ([1-9]+.[0-9]+|main).*', |
| E'\\\\1') as ver;''') |
| |
| row = curs.getresult()[0] |
| version = row[0] |
| |
| logger.debug('got version %s' % version) |
| return version |
| |
| |
| ############################### |
| def getalldbs(): |
| """ |
| get all connectable databases |
| """ |
| db = connect() |
| curs = db.query(''' |
| select datname from pg_database where datallowconn order by datname ''') |
| row = curs.getresult() |
| return row |
| |
| |
| ############################### |
| def parseCommandLine(): |
| try: |
| # A colon following the flag indicates an argument is expected |
| (options, args) = getopt.getopt(sys.argv[1:], '?x:p:P:U:B:vg:t:AOS:R:s:C:lE', 'help') |
| except Exception as e: |
| usage('Error: ' + str(e)) |
| |
| for (switch, val) in options: |
| if switch in ('-?', '--help'): |
| usage(0) |
| elif switch[1] in 'pBPUgSRCs': |
| GV.opt[switch] = val |
| elif switch[1] in 'vtAOlE': |
| GV.opt[switch] = True |
| elif switch[1] in 'x': |
| GV.opt[switch].append(val) |
| |
| def setdef(x, v): |
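| # Fall back to the environment variable v when option x was not given on the command line. |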
| if not GV.opt[x]: |
| t = os.environ.get(v) |
| GV.opt[x] = t |
| if t: |
| logger.debug('%s not specified; default to %s (from %s)' % |
| (x, t, v)) |
| |
| if GV.opt['-l']: return |
| |
| if GV.opt['-v']: |
| gplog.enable_verbose_logging() |
| |
| setdef('-P', 'PGPASSWORD') |
| setdef('-U', 'PGUSER') |
| setdef('-U', 'USER') |
| setdef('-p', 'PGPORT') |
| if not GV.opt['-p']: |
| usage('Please specify -p port') |
| GV.opt['-p'] = parse_int(GV.opt['-p']) |
| |
| GV.opt['-h'] = 'localhost' |
| logger.debug('Set default hostname to %s' % GV.opt['-h']) |
| |
| if GV.opt['-R']: |
| logger.debug('Run this test: %s' % GV.opt['-R']) |
| |
| if GV.opt['-s']: |
| logger.debug('Skip these tests: %s' % GV.opt['-s']) |
| |
| if GV.opt['-C']: |
| GV.opt['-C'] = GV.opt['-C'].lower() |
| logger.debug('Run cross consistency test for table: %s' % GV.opt['-C']) |
| |
| if len(args) > 1: |
| usage('Too many arguments') |
| |
| if len(args) == 0: |
| GV.dbname = os.environ.get('PGDATABASE') |
| if GV.dbname is None: |
| GV.dbname = 'template1' |
| logger.debug('database is %s' % GV.dbname) |
| else: |
| GV.dbname = args[0] |
| |
| GV.firstdb = GV.dbname |
| GV.alldb.append(GV.firstdb) |
| |
| # build list of all connectable databases |
| if GV.opt['-A']: |
| alldb = getalldbs() |
| GV.alldb.pop() |
| for adbname in alldb: |
| GV.alldb.append(adbname[0]) |
| |
| if GV.opt['-S']: |
| if not re.match("none|only", GV.opt['-S'], re.I): |
| usage('Error: invalid value \'%s\' for shared table option. Legal values are (none, only)' % GV.opt['-S']) |
| GV.opt['-S'] = GV.opt['-S'].lower() |
| |
| logger.debug('coordinator at host %s, port %s, user %s, database %s' % |
| (GV.opt['-h'], GV.opt['-p'], GV.opt['-U'], GV.dbname)) |
| |
| try: |
| GV.opt['-B'] = int(GV.opt['-B']) |
| except Exception as e: |
| usage('Error: ' + str(e)) |
| |
| if GV.opt['-B'] < 1: |
| usage('Error: parallelism must be 1 or greater') |
| |
| logger.debug('degree of parallelism: %s' % GV.opt['-B']) |
| |
| |
| ############# |
| def connect(user=None, password=None, host=None, port=None, |
| database=None, utilityMode=False): |
| '''Connect to DB using parameters in GV''' |
| # unset search path due to CVE-2018-1058 |
| options = '-c search_path=' |
| if utilityMode: |
| options += ' -c gp_role=utility' |
| for val in GV.opt['-x']: |
| options += ' -c {}'.format(val) |
| |
| if not user: user = GV.opt['-U'] |
| if not password: password = GV.opt['-P'] |
| if not host: host = GV.opt['-h'] |
| if not port: port = GV.opt['-p'] |
| if not database: database = GV.dbname |
| |
| try: |
| logger.debug('connecting to %s:%s %s' % (host, port, database)) |
| db = pg.connect(host=host, port=port, user=user, |
| passwd=password, dbname=database, opt=options) |
| |
| except pg.InternalError as ex: |
| logger.fatal('could not connect to %s: "%s"' % |
| (database, str(ex).strip())) |
| exit(1) |
| |
| logger.debug('connected with %s:%s %s' % (host, port, database)) |
| return db |
| |
| |
| ############# |
| def connect2(cfgrec, user=None, password=None, database=None, utilityMode=True): |
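| ''' |
| Connect to the segment described by cfgrec. Connections are cached in GV.db, |
| keyed by host, port, datadir, user, password, database and utility mode. |
| The coordinator (content -1) is always connected in non-utility mode. |
| ''' |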
| host = cfgrec['address'] |
| port = cfgrec['port'] |
| datadir = cfgrec['datadir'] |
| logger.debug('connect %s:%s:%s' % (host, port, datadir)) |
| if not database: database = GV.dbname |
| if cfgrec['content'] == -1: |
| utilityMode = False |
| |
| key = "%s.%s.%s.%s.%s.%s.%s" % (host, port, datadir, user, password, database, |
| str(utilityMode)) |
| conns = GV.db.get(key) |
| if conns: |
| return conns[0] |
| |
| conn = connect(host=host, port=port, user=user, password=password, |
| database=database, utilityMode=utilityMode) |
| if conn: |
| GV.db[key] = [conn, cfgrec] |
| |
| return conn |
| |
| |
| class execThread(Thread): |
| def __init__(self, cfg, db, qry): |
| self.cfg = cfg |
| self.db = db |
| self.qry = qry |
| self.curs = None |
| self.error = None |
| Thread.__init__(self) |
| |
| def run(self): |
| try: |
| self.curs = self.db.query(self.qry) |
| except BaseException as e: |
| self.error = e |
| |
| |
| def processThread(threads): |
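| ''' |
| Join the given query threads. Failed threads are reported and raise the error |
| level; each successful thread contributes a [cfg, cursor] pair to the returned batch. |
| ''' |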
| batch = [] |
| for th in threads: |
| logger.debug('waiting on thread %s' % th.name) |
| th.join() |
| if th.error: |
| setError(ERROR_NOREPAIR) |
| myprint("%s:%d:%s : %s" % |
| (th.cfg['hostname'], |
| th.cfg['port'], |
| th.cfg['datadir'], |
| str(th.error))) |
| else: |
| batch.append([th.cfg, th.curs]) |
| return batch |
| |
| |
| ############# |
| def connect2run(qry, col=None): |
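| ''' |
| Run qry against the coordinator and every primary segment in GV.cfg, at most |
| -B connections at a time, and return a list of [cfg, columns, row] entries |
| for every row returned. |
| ''' |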
| logger.debug('%s' % qry) |
| |
| batch = [] |
| threads = [] |
| i = 1 |
| |
| # parallelise queries |
| for dbid in GV.cfg: |
| c = GV.cfg[dbid] |
| db = connect2(c) |
| |
| thread = execThread(c, db, qry) |
| thread.start() |
| logger.debug('launching query thread %s for dbid %i' % |
| (thread.name, dbid)) |
| threads.append(thread) |
| |
| # we don't want too much going on at once |
| if (i % GV.opt['-B']) == 0: |
| # process this batch of threads |
| batch.extend(processThread(threads)) |
| threads = [] |
| |
| i += 1 |
| |
| # process the rest of threads |
| batch.extend(processThread(threads)) |
| |
| err = [] |
| for [cfg, curs] in batch: |
| if col is None: |
| col = curs.listfields() |
| for row in curs.dictresult(): |
| err.append([cfg, col, row]) |
| |
| return err |
| |
| |
| def formatErr(c, col, row): |
| s = ('%s:%s:%s, content %s, dbid %s' % |
| (c['hostname'], c['port'], c['datadir'], |
| c['content'], c['dbid'])) |
| for i in col: |
| s = '%s, %s %s' % (s, i, row[i]) |
| return s |
| |
| |
| ############# |
| def getGPConfiguration(): |
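| ''' |
| Return a dict keyed by dbid describing the coordinator and every primary |
| segment, read from gp_segment_configuration (the standby coordinator is skipped). |
| ''' |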
| cfg = {} |
| db = connect() |
| # note that in 4.0, sql commands cannot be run against the segment mirrors directly |
| # so we filter out non-primary segment databases in the query |
| qry = ''' |
| SELECT content, preferred_role = 'p' as definedprimary, |
| dbid, role = 'p' as isprimary, hostname, address, port, |
| datadir |
| FROM gp_segment_configuration |
| WHERE (role = 'p' or content < 0 ) |
| ''' |
| curs = db.query(qry) |
| for row in curs.dictresult(): |
| if row['content'] == -1 and not row['isprimary']: |
| continue # skip standby coordinator |
| cfg[row['dbid']] = row |
| db.close() |
| return cfg |
| |
| def checkDistribPolicy(): |
| logger.info('-----------------------------------') |
| logger.info('Checking constraints on randomly distributed tables') |
| qry = ''' |
| select n.nspname, rel.relname, pk.conname as constraint |
| from pg_constraint pk |
| join pg_class rel on (pk.conrelid = rel.oid) |
| join pg_namespace n on (rel.relnamespace = n.oid) |
| join gp_distribution_policy d on (rel.oid = d.localoid) |
| where pk.contype in('p', 'u') and d.policytype = 'p' and d.distkey = '' |
| ''' |
| |
| db = connect2(GV.cfg[GV.coordinator_dbid]) |
| try: |
| curs = db.query(qry) |
| err = [] |
| for row in curs.dictresult(): |
| err.append([GV.cfg[GV.coordinator_dbid], ('nspname', 'relname', 'constraint'), row]) |
| |
| if not err: |
| logger.info('[OK] randomly distributed tables') |
| else: |
| GV.checkStatus = False |
| setError(ERROR_REMOVE) |
| logger.info('[FAIL] randomly distributed tables') |
| logger.error('pg_constraint has %d issue(s)' % len(err)) |
| logger.error(qry) |
| for e in err: |
| logger.error(formatErr(e[0], e[1], e[2])) |
| for e in err: |
| cons = e[2] |
| removeIndexConstraint(cons['nspname'], cons['relname'], |
| cons['constraint']) |
| except Exception as e: |
| setError(ERROR_NOREPAIR) |
| myprint('[ERROR] executing test: checkDistribPolicy') |
| myprint(' Execution error: ' + str(e)) |
| |
| logger.info('-----------------------------------') |
| logger.info('Checking that unique constraints are only on distribution columns') |
| |
| # final part of the WHERE clause here is a little tricky: we want to make |
| # sure that the set of distribution columns is a left subset of the |
| # constraint. |
| qry = ''' |
| select n.nspname, rel.relname, pk.conname as constraint |
| from pg_constraint pk |
| join pg_class rel on (pk.conrelid = rel.oid) |
| join pg_namespace n on (rel.relnamespace = n.oid) |
| join gp_distribution_policy d on (rel.oid = d.localoid) |
| where pk.contype in ('p', 'u') and d.policytype = 'p' |
| and not d.distkey::int2[] operator(pg_catalog.<@) pk.conkey |
| ''' |
| try: |
| curs = db.query(qry) |
| |
| err = [] |
| for row in curs.dictresult(): |
| err.append([GV.cfg[GV.coordinator_dbid], ('nspname', 'relname', 'constraint'), row]) |
| |
| if not err: |
| logger.info('[OK] unique constraints') |
| else: |
| GV.checkStatus = False |
| setError(ERROR_REMOVE) |
| logger.info('[FAIL] unique constraints') |
| logger.error('pg_constraint has %d issue(s)' % len(err)) |
| logger.error(qry) |
| for e in err: logger.error(formatErr(e[0], e[1], e[2])) |
| for e in err: |
| cons = e[2] |
| removeIndexConstraint(cons['nspname'], cons['relname'], |
| cons['constraint']) |
| except Exception as e: |
| setError(ERROR_NOREPAIR) |
| myprint('[ERROR] executing test: checkDistribPolicy') |
| myprint(' Execution error: ' + str(e)) |
| |
| checkConstraintsRepair() |
| |
| ############# |
| def checkPartitionIntegrity(): |
| logger.info('-----------------------------------') |
| logger.info('Checking pg_partition ...') |
| err = [] |
| db = connect() |
| |
| # Compare the numsegments values of parent and child partitions in the gp_distribution_policy table |
| qry = ''' |
| select inhparent::regclass, inhrelid::regclass, parent.numsegments as numsegments_parent, child.numsegments as numsegments_child |
| from pg_inherits inner join pg_partitioned_table on (pg_inherits.inhparent = pg_partitioned_table.partrelid) |
| inner join gp_distribution_policy parent on (parent.localoid = inhparent) |
| inner join gp_distribution_policy child on (child.localoid = inhrelid) |
| where parent.numsegments is distinct from child.numsegments |
| and not (inhrelid in (select ftrelid from pg_catalog.pg_foreign_table) and child.numsegments = NULL); |
| ''' |
| try: |
| curs = db.query(qry) |
| cols = ('inhparent', 'inhrelid', 'numsegments_parent', 'numsegments_child') |
| col_names = { |
| 'inhparent': 'table', |
| 'inhrelid': 'affected child', |
| 'numsegments_parent': 'parent numsegments value', |
| 'numsegments_child': 'child numsegments value', |
| } |
| |
| err = [] |
| for row in curs.dictresult(): |
| err.append([GV.cfg[GV.coordinator_dbid], cols, row]) |
| |
| if not err: |
| logger.info('[OK] partition numsegments check') |
| else: |
| err_count = len(err) |
| GV.checkStatus = False |
| setError(ERROR_REMOVE) |
| logger.info('[FAIL] partition numsegments check') |
| logger.error('partition numsegments check found %d issue(s)' % err_count) |
| if err_count > 100: |
| logger.error(qry) |
| |
| myprint( |
| '[ERROR]: child partition(s) have different numsegments value ' |
| 'from the root partition. Check the gpcheckcat log for details.' |
| ) |
| logger.error('The following tables have different numsegments value (showing at most 100 rows):') |
| |
| # report at most 100 rows, for brevity |
| err = err[:100] |
| |
| for index, e in enumerate(err): |
| cfg = e[0] |
| col = e[1] |
| row = e[2] |
| |
| if index == 0: |
| logger.error("--------") |
| logger.error(" " + " | ".join(map(col_names.get, col))) |
| logger.error(" " + "-+-".join(['-' * len(col_names[x]) for x in col])) |
| |
| logger.error(" " + " | ".join([str(row[x]) for x in col])) |
| |
| if err_count > 100: |
| logger.error(" ...") |
| |
| except Exception as e: |
| setError(ERROR_NOREPAIR) |
| myprint('[ERROR] executing test: checkPartitionIntegrity') |
| myprint(' Execution error: ' + str(e)) |
| |
| # Check for the distribution policy of parent and child partitions based on the following conditions: |
| # 1. If a root is randomly distributed, then all middle, leaf level partitions must also be randomly distributed |
| # 2. If a root is hash distributed, then middle level should be same as root and |
| # leaf level partition can be hash on same key or randomly distributed |
| qry = ''' |
| select inhparent::regclass, inhrelid::regclass, |
| pg_get_table_distributedby(inhparent) as dby_parent, |
| pg_get_table_distributedby(inhrelid) as dby_child |
| from pg_inherits inner join pg_partitioned_table on (pg_inherits.inhparent = pg_partitioned_table.partrelid) |
| where pg_get_table_distributedby(inhrelid) is distinct from pg_get_table_distributedby(inhparent) |
| and not (pg_get_table_distributedby(inhparent) = 'DISTRIBUTED RANDOMLY' and pg_get_table_distributedby(inhrelid) = '') |
| and not (inhrelid in (select ftrelid from pg_catalog.pg_foreign_table) and pg_get_table_distributedby(inhrelid) = '') |
| and not (pg_get_table_distributedby(inhparent) like 'DISTRIBUTED BY%' and pg_get_table_distributedby(inhrelid) = 'DISTRIBUTED RANDOMLY' |
| and (select isleaf from pg_partition_tree(inhparent) where relid = inhrelid)); |
| ''' |
| try: |
| curs = db.query(qry) |
| cols = ('inhparent', 'inhrelid', 'dby_parent', 'dby_child') |
| col_names = { |
| 'inhparent': 'table', |
| 'inhrelid': 'affected child', |
| 'dby_parent': 'table distribution key', |
| 'dby_child': 'child distribution key', |
| } |
| |
| err = [] |
| for row in curs.dictresult(): |
| err.append([GV.cfg[GV.coordinator_dbid], cols, row]) |
| |
| if not err: |
| logger.info('[OK] partition distribution policy check') |
| else: |
| GV.checkStatus = False |
| setError(ERROR_REMOVE) |
| logger.info('[FAIL] partition distribution policy check') |
| logger.error('partition distribution policy check found %d issue(s)' % len(err)) |
| if len(err) > 100: |
| logger.error(qry) |
| |
| myprint( |
| '[ERROR]: child partition(s) are distributed differently from ' |
| 'the root partition, and must be manually redistributed, for ' |
| 'some tables. Check the gpcheckcat log for details.' |
| ) |
| logger.error('The following tables must be manually redistributed:') |
| |
| count = 0 |
| for e in err: |
| cfg = e[0] |
| col = e[1] |
| row = e[2] |
| |
| # TODO: generate a repair script for this row. This is |
| # difficult, since we can't redistribute child partitions |
| # directly. |
| |
| # report at most 100 rows, for brevity |
| if count == 100: |
| logger.error("...") |
| count += 1 |
| if count > 100: |
| continue |
| |
| if count == 0: |
| logger.error("--------") |
| logger.error(" " + " | ".join(map(col_names.get, col))) |
| logger.error(" " + "-+-".join(['-' * len(col_names[x]) for x in col])) |
| |
| logger.error(" " + " | ".join([str(row[x]) for x in col])) |
| count += 1 |
| |
| logger.error( |
| 'Execute an ALTER TABLE ... SET DISTRIBUTED BY statement, with ' |
| 'the desired distribution key, on the partition root for each ' |
| 'affected table.' |
| ) |
| |
| except Exception as e: |
| setError(ERROR_NOREPAIR) |
| myprint('[ERROR] executing test: checkPartitionIntegrity') |
| myprint(' Execution error: ' + str(e)) |
| |
| db.close() |
| |
| checkPoliciesRepair() |
| |
| ############# |
| def checkPGClass(): |
| logger.info('-----------------------------------') |
| logger.info('Checking pg_class ...') |
| qry = ''' |
| SELECT relname, relkind, tc.oid as oid |
| FROM pg_class tc left outer join |
| pg_attribute ta on (tc.oid = ta.attrelid) |
| WHERE ta.attrelid is NULL and tc.relnatts != 0 |
| ''' |
| err = connect2run(qry, ('relname', 'relkind', 'oid')) |
| if not err: |
| logger.info('[OK] pg_class') |
| else: |
| GV.checkStatus = False |
| setError(ERROR_NOREPAIR) |
| logger.info('[FAIL] pg_class') |
| logger.error('pg_class has %d issue(s)' % len(err)) |
| logger.error(qry) |
| for e in err[0:100]: |
| logger.error(formatErr(e[0], e[1], e[2])) |
| if len(err) > 100: |
| logger.error("...") |
| |
| |
| ############# |
| def checkPGNamespace(): |
| # Check for objects in various catalogs that are in a schema that has |
| # been dropped. |
| logger.info('Checking missing schema definitions ...') |
| qry = ''' |
| SELECT o.catalog, o.nsp |
| FROM pg_namespace n right outer join |
| (select 'pg_class' as catalog, relnamespace as nsp from pg_class |
| union |
| select 'pg_type' as catalog, typnamespace as nsp from pg_type |
| union |
| select 'pg_operator' as catalog, oprnamespace as nsp from pg_operator |
| union |
| select 'pg_proc' as catalog,pronamespace as nsp from pg_proc) o on |
| (n.oid = o.nsp) |
| WHERE n.oid is NULL |
| ''' |
| err = connect2run(qry, ('catalog', 'nsp')) |
| if not err: |
| logger.info('[OK] missing schema definitions') |
| else: |
| GV.checkStatus = False |
| setError(ERROR_NOREPAIR) |
| logger.info('[FAIL] missing schema definitions') |
| logger.error('found %d references to non-existent schemas' % len(err)) |
| logger.error(qry) |
| for e in err: |
| logger.error(formatErr(e[0], e[1], e[2])) |
| |
| |
| ############# |
| ''' |
| Produce repair scripts to remove dangling entries of gp_fastsequence: |
| - one file per segment dbid |
| - contains DELETE statements to remove catalog entry |
| ''' |
| |
| |
| def removeFastSequence(db): |
| ''' |
| MPP-14758: gp_fastsequence does not get cleaned up after a failed transaction (AO/CO) |
| Note: this is slightly different from the normal foreign key check |
| because it stretches the cross reference across segments. |
| This makes it safe in the event of cross consistency issues with pg_class, |
| but it may not repair some issues when cross consistency problems exist. |
| ''' |
| try: |
| qry = """ |
| SELECT dbid, objid |
| FROM gp_segment_configuration AS cfg JOIN |
| (SELECT gp_segment_id, objid |
| FROM (select gp_segment_id, objid from gp_fastsequence |
| union |
| select gp_segment_id, objid from gp_dist_random('gp_fastsequence')) AS fs |
| LEFT OUTER JOIN |
| (select oid from pg_class |
| union |
| select oid from gp_dist_random('pg_class')) AS c |
| ON (fs.objid = c.oid) |
| WHERE c.oid IS NULL |
| ) AS r |
| ON r.gp_segment_id = cfg.content |
| WHERE cfg.role = 'p'; |
| """ |
| curs = db.query(qry) |
| for row in curs.dictresult(): |
| seg = row['dbid'] # dbid of targeted segment |
| name = 'gp_fastsequence tuple' # for comment purposes |
| table = 'gp_fastsequence' # table name |
| cols = {'objid': row['objid']} # column name and value |
| objname = 'gp_fastsequence' # for comment purposes |
| buildRemove(seg, name, table, cols, objname) |
| except Exception as e: |
| logger.error('removeFastSequence: ' + str(e)) |
| |
| |
| ############# |
| |
| def removeIndexConstraint(nspname, relname, constraint): |
| GV.Constraints.append('ALTER TABLE "%s"."%s" DROP CONSTRAINT "%s" CASCADE;' % \ |
| (nspname, relname, constraint)) |
| |
| |
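| # Build one commented DELETE statement for a dangling catalog row and queue it on |
| # the repair list for segment 'seg' (see addRemove).  The generated SQL looks |
| # roughly like this (the objid value is illustrative): |
| #   --Object Name: gp_fastsequence |
| #   --Remove gp_fastsequence tuple for 16384 |
| #   DELETE FROM gp_fastsequence WHERE objid = '16384'; |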
| def buildRemove(seg, name, table, cols, objname): |
| first = True |
| fullstr = '' |
| predicate = '' |
| for col in cols: |
| if first: |
| fullstr = '--Object Name: %s\n--Remove %s for %i\n' % \ |
| (objname, name, cols[col]) |
| first = False |
| if len(predicate) > 0: |
| predicate += ' or ' |
| predicate += '%s = \'%s\'' % (col, cols[col]) |
| fullstr += 'DELETE FROM %s WHERE %s;' % (table, predicate) |
| addRemove(seg, fullstr) |
| |
| |
| def addRemove(seg, line): |
| if not seg in GV.Remove: |
| GV.Remove[seg] = [] |
| GV.Remove[seg].append(line) |
| |
| |
| def buildAdjustConname(seg, relname, relid, oldconname, newconname): |
| # relname text |
| # relid oid as int |
| # old/newconname text |
| stmt = '--Constraint Name: %s on %s becomes %s\n' % \ |
| (oldconname, relname, newconname) |
| stmt += 'UPDATE pg_constraint\n' |
| stmt += 'SET conname = %s\n' % newconname |
| stmt += 'WHERE conrelid = %d and conname = %s;' % (relid, oldconname) |
| addAdjustConname(seg, stmt) |
| |
| |
| def addAdjustConname(seg, stmt): |
| if not seg in GV.AdjustConname: |
| GV.AdjustConname[seg] = [] |
| GV.AdjustConname[seg].append(stmt) |
| |
| |
| def addDemoteConstraint(seg, repair_sequence): |
| if not seg in GV.DemoteConstraint: |
| GV.DemoteConstraint[seg] = [] |
| GV.DemoteConstraint[seg].append(repair_sequence) |
| |
| |
| ############# |
| def drop_leaked_schemas(leaked_schema_dropper, dbname): |
| logger.info('-----------------------------------') |
| logger.info('Checking for leaked temporary schemas') |
| |
| db_connection = connect(database=dbname) |
| try: |
| dropped_schemas = leaked_schema_dropper.drop_leaked_schemas(db_connection) |
| if not dropped_schemas: |
| logger.info('[OK] temporary schemas') |
| else: |
| logger.info('[FAIL] temporary schemas') |
| myprint("Found and dropped %d unbound temporary schemas" % len(dropped_schemas)) |
| logger.error('Dropped leaked schemas \'%s\' in the database \'%s\'' % (dropped_schemas, dbname)) |
| except Exception as e: |
| setError(ERROR_NOREPAIR) |
| myprint(' Execution error: ' + str(e)) |
| finally: |
| db_connection.close() |
| |
| def checkDepend(): |
| # Check for dependencies on non-existent objects |
| logger.info('-----------------------------------') |
| logger.info('Checking Object Dependencies') |
| |
| db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False) |
| |
| # Catalogs that link up to pg_depend/pg_shdepend |
| qry = """ |
| select relname from pg_class c |
| where relkind='r' |
| and relnamespace=%d |
| and exists (select 1 from pg_attribute a where attname = 'oid' and a.attrelid = c.oid) |
| """ % PG_CATALOG_OID |
| curs = db.query(qry) |
| catalogs = [] |
| for row in curs.getresult(): |
| catalogs.append(row[0]) |
| |
| checkDependJoinCatalog(catalogs) |
| checkCatalogJoinDepend(catalogs) |
| |
| def checkDependJoinCatalog(catalogs): |
| # Construct subquery that will verify that all (classid, objid) |
| # and (refclassid, refobjid) pairs existing in pg_depend actually |
| # exist in that catalog table (classid::regclass or |
| # refclassid::regclass) |
| deps = [] |
| for cat in catalogs: |
| qry = """ |
| SELECT '{catalog}' as catalog, objid FROM ( |
| SELECT objid FROM pg_depend |
| WHERE classid = '{catalog}'::regclass |
| UNION ALL |
| SELECT refobjid FROM pg_depend |
| WHERE refclassid = '{catalog}'::regclass |
| ) d |
| LEFT OUTER JOIN {catalog} c on (d.objid = c.oid) |
| WHERE c.oid is NULL |
| UNION ALL |
| SELECT '{catalog}' as catalog, objid FROM ( |
| SELECT dbid, objid FROM pg_shdepend |
| WHERE classid = '{catalog}'::regclass |
| UNION ALL |
| SELECT dbid, refobjid FROM pg_shdepend |
| WHERE refclassid = '{catalog}'::regclass |
| ) d JOIN pg_database db |
| ON (d.dbid = db.oid and datname= current_database()) |
| LEFT OUTER JOIN {catalog} c on (d.objid = c.oid) |
| WHERE c.oid is NULL |
| """.format(catalog=cat) |
| deps.append(qry) |
| |
| qry = """ |
| SELECT distinct catalog, objid |
| FROM (%s |
| ) q |
| """ % "UNION ALL".join(deps) |
| |
| try: |
| err = connect2run(qry) |
| if not err: |
| logger.info('[OK] extra object dependencies') |
| else: |
| GV.checkStatus = False |
| setError(ERROR_NOREPAIR) |
| logger.info('[FAIL] extra object dependencies') |
| logger.error(' found %d dependencies on dropped objects' % len(err)) |
| for config, column, row in err: |
| gpObj = getGPObject(row['objid'], row['catalog']) |
| issue = CatDependencyIssue(row['catalog'], row['objid'], config['content']) |
| gpObj.addDependencyIssue(issue) |
| |
| except Exception as e: |
| setError(ERROR_NOREPAIR) |
| myprint("[ERROR] executing test: extra object dependencies") |
| myprint(" Execution error: " + str(e)) |
| myprint(qry) |
| |
| def checkCatalogJoinDepend(catalogs): |
| # Construct subqueries to aggregate all OIDs from catalog tables |
| # not in the DEPENDENCY_EXCLUSION |
| deps = [] |
| for cat in catalogs: |
| if cat not in DEPENDENCY_EXCLUSION: |
| qry = """ |
| SELECT '{catalog}' AS catalog, oid FROM {catalog} WHERE oid >= {oid} |
| """.format(catalog=cat, oid=FIRST_NORMAL_OID) |
| |
| # Exclude pg_proc entries in pg_catalog namespace. This is |
| # mainly to avoid flagging language handler functions that |
| # linger after the language is dropped. Extensions (which |
| # do drop their proclang handler functions) will be |
| # replacing language so this exclusion should be fine. |
| if str(cat) == 'pg_proc': |
| qry += """AND pronamespace != {oid} |
| """.format(oid=PG_CATALOG_OID) |
| |
| # Exclude pg_am entries which depend on internal handlers. |
| if str(cat) == 'pg_am': |
| qry = """ |
| SELECT '{catalog}' AS catalog, pg_am.oid AS oid FROM {catalog} JOIN pg_proc ON pg_am.amhandler::text = pg_proc.proname WHERE pg_am.oid >= {oid} AND pg_proc.oid >= {oid} |
| """.format(catalog=cat, oid=FIRST_NORMAL_OID) |
| |
| deps.append(qry) |
| |
| # Construct query that will check that each OID in our aggregated |
| # catalog OIDs list has a reference in pg_depend under objid or |
| # refobjid column |
| qry = """ |
| SELECT distinct catalog, c.oid as objid |
| FROM ( |
| {subquery} |
| ) c |
| LEFT OUTER JOIN |
| ( |
| SELECT objid AS oid FROM pg_depend WHERE objid >= {oid} |
| UNION |
| SELECT refobjid AS oid FROM pg_depend WHERE refobjid >= {oid} |
| ) d |
| ON (c.oid = d.oid) |
| WHERE d.oid IS NULL; |
| """.format(subquery="UNION ALL".join(deps), oid=FIRST_NORMAL_OID) |
| |
| try: |
| err = connect2run(qry) |
| if not err: |
| logger.info('[OK] missing object dependencies') |
| else: |
| GV.checkStatus = False |
| setError(ERROR_NOREPAIR) |
| logger.info('[FAIL] missing object dependencies') |
| logger.error(' found %d existing objects without dependencies' % len(err)) |
| for config, column, row in err: |
| gpObj = getGPObject(row['objid'], row['catalog']) |
| issue = CatDependencyIssue(row['catalog'], row['objid'], config['content']) |
| gpObj.addDependencyIssue(issue) |
| |
| except Exception as e: |
| setError(ERROR_NOREPAIR) |
| myprint("[ERROR] executing test: missing object dependencies") |
| myprint(" Execution error: " + str(e)) |
| myprint(qry) |
| |
| def fixupowners(nspname, relname, oldrole, newrole): |
| # Must first alter the table to the wrong owner, then back to the right |
| # owner. The purpose of this is that AT doesn't dispatch the change unless |
| # it thinks the table actually changed owner. |
| # |
| # Note: this means that the Owners list must be run in the order given. |
| # |
| GV.Owners.append('-- owner for "%s"."%s"' % (nspname, relname)) |
| GV.Owners.append('ALTER TABLE "%s"."%s" OWNER TO "%s";' % |
| (nspname, relname, oldrole)) |
| GV.Owners.append('ALTER TABLE "%s"."%s" OWNER TO "%s";' % |
| (nspname, relname, newrole)) |
| |
| |
| def checkOwners(): |
| logger.info('-----------------------------------') |
| logger.info('Checking table ownership') |
| |
| # Check owners in pg_class |
| # |
| # - Report each table that has an inconsistency with the coordinator database; |
| # this can be on the table directly, or on one of the table subobjects |
| # (pg_toast, pg_aoseg, etc) |
| # |
| # - Theoretically this could result in multiple corrections for a given |
| # table based on having multiple "wrong" owners. Realistically, in most |
| # of the problems we have seen, the wrong owner is |
| # almost always the gpadmin user, so this is not expected. If it does |
| # occur it won't be a problem, but we will issue more ALTER TABLE |
| # commands than is strictly necessary. |
| # |
| # - Between 3.3 and 4.0 the ao segment columns migrated from pg_class |
| # to pg_appendonly. |
| db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False) |
| qry = ''' |
| select distinct n.nspname, coalesce(o.relname, c.relname) as relname, |
| a.rolname, m.rolname as coordinator_rolname |
| from gp_dist_random('pg_class') r |
| join pg_class c on (c.oid = r.oid) |
| left join pg_appendonly ao on (c.oid = ao.segrelid or |
| c.oid = ao.blkdirrelid or |
| c.oid = ao.blkdiridxid) |
| left join pg_class o on (o.oid = ao.relid or |
| o.reltoastrelid = c.oid) |
| join pg_authid a on (a.oid = r.relowner) |
| join pg_authid m on (m.oid = coalesce(o.relowner, c.relowner)) |
| join pg_namespace n on (n.oid = coalesce(o.relnamespace, c.relnamespace)) |
| where c.relowner <> r.relowner |
| ''' |
| try: |
| curs = db.query(qry) |
| |
| rows = [] |
| for row in curs.dictresult(): |
| rows.append(row) |
| |
| if len(rows) == 0: |
| logger.info('[OK] table ownership') |
| else: |
| GV.checkStatus = False |
| setError(ERROR_REMOVE) |
| logger.info('[FAIL] table ownership') |
| logger.error('found %d table ownership issue(s)' % len(rows)) |
| logger.error('%s' % qry) |
| for row in rows[0:100]: |
| logger.error(' %s.%s relowner %s != %s' |
| % (row['nspname'], row['relname'], row['rolname'], |
| row['coordinator_rolname'])) |
| if len(rows) > 100: |
| logger.error("...") |
| for row in rows: |
| fixupowners(row['nspname'], row['relname'], row['rolname'], |
| row['coordinator_rolname']) |
| except Exception as e: |
| setError(ERROR_NOREPAIR) |
| myprint("[ERROR] executing: check table ownership") |
| myprint(" Execution error: " + str(e)) |
| myprint(qry) |
| |
| # TODO: add a check that subobject owners agree with the main object owner. |
| # (The above checks only compare coordinator vs segment) |
| |
| |
| # Check owners in pg_type |
| # - Ignore implementation types of pg_class entries - they should be |
| # in the check above since ALTER TABLE is required to fix them, not |
| # ALTER TYPE. |
| db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False) |
| qry = ''' |
| select distinct n.nspname, t.typname, a.rolname, m.rolname as coordinator_rolname |
| from gp_dist_random('pg_type') r |
| join pg_type t on (t.oid = r.oid) |
| join pg_namespace n on (n.oid = t.typnamespace) |
| join pg_authid a on (a.oid = r.typowner) |
| join pg_authid m on (m.oid = t.typowner) |
| where r.typowner <> t.typowner |
| ''' |
| try: |
| curs = db.query(qry) |
| |
| rows = [] |
| for row in curs.dictresult(): |
| rows.append(row) |
| |
| if len(rows) == 0: |
| logger.info('[OK] type ownership') |
| else: |
| GV.checkStatus = False |
| setError(ERROR_NOREPAIR) |
| logger.info('[FAIL] type ownership') |
| logger.error('found %d type ownership issue(s)' % len(rows)) |
| logger.error('%s' % qry) |
| for row in rows[0:100]: |
| logger.error(' %s.%s typeowner %s != %s' |
| % (row['nspname'], row['typname'], row['rolname'], |
| row['coordinator_rolname'])) |
| if len(rows) > 100: |
| logger.error("...") |
| |
| except Exception as e: |
| setError(ERROR_NOREPAIR) |
| myprint("[ERROR] executing test: check type ownership") |
| myprint(" Execution error: " + str(e)) |
| myprint(qry) |
| |
| # FIXME: add repair script |
| # NOTE: types with typrelid probably will be repaired by the |
| # script generated by the table ownership check above. |
| |
| # TODO: |
| # Check owners in pg_proc |
| # Check owners in pg_database |
| # Check owners in pg_tablespace |
| # Check owners in pg_namespace |
| # Check owners in pg_operator |
| # Check owners in pg_opclass |
| # ... |
| |
| checkOwnersRepair() |
| |
| |
| def closeDbs(): |
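| '''Close every connection cached in GV.db and reset the cache.''' |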
| for key, conns in GV.db.items(): |
| db = conns[0] |
| db.close() |
| GV.db = {} # remove everything |
| |
| |
| # ------------------------------------------------------------------------------- |
| def getCatObj(namestr): |
| db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False) |
| try: |
| cat = GV.catalog.getCatalogTable(namestr) |
| except Exception as e: |
| myprint("No such catalog table: %s\n" % str(namestr)) |
| raise |
| return cat |
| |
| |
| # ------------------------------------------------------------------------------- |
| def checkACL(): |
| logger.info('-----------------------------------') |
| logger.info('Performing cross database ACL tests') |
| |
| # looks up information in the catalog: |
| tables = GV.catalog.getCatalogTables() |
| |
| for cat in tables: |
| checkTableACL(cat) |
| # ------------------------------------------------------------------------------- |
| |
| |
| def checkTableACL(cat): |
| catname = cat.getTableName() |
| pkey = cat.getPrimaryKey() |
| coordinator = cat.isCoordinatorOnly() |
| isShared = cat.isShared() |
| acl = cat.getTableAcl() |
| |
| if GV.aclStatus is None: |
| GV.aclStatus = True |
| |
| # Skip: |
| # - coordinator only tables |
| # - tables without a primary key |
| # - tables without acls |
| if coordinator or pkey == [] or acl is None: |
| return |
| |
| # skip shared/non-shared tables |
| if GV.opt['-S']: |
| if re.match("none", GV.opt['-S'], re.I) and isShared: |
| return |
| if re.match("only", GV.opt['-S'], re.I) and not isShared: |
| return |
| |
| # Comparing ACLs cannot be done with a simple equality comparison |
| # since it is valid for the ACLs to have different order. Instead |
| # we compare that both acls are subsets of each other => equality. |
| |
| mPkey = ['m.' + i for i in pkey] |
| if cat.tableHasConsistentOids(): |
| mPkey = ['m.oid'] |
| |
| qry = """ |
| SELECT s.gp_segment_id as segid, {mPkey}, |
| m.{acl} as coordinator_acl, s.{acl} as segment_acl |
| FROM {catalog} m |
| JOIN gp_dist_random('{catalog}') s using ({primary_key}) |
| WHERE not (m.{acl} @> s.{acl} and s.{acl} @> m.{acl}) or |
| (m.{acl} is null and s.{acl} is not null) or |
| (s.{acl} is null and m.{acl} is not null) |
| ORDER BY {primary_key}, s.gp_segment_id |
| """.format(catalog=catname, |
| primary_key=', '.join(pkey), |
| mPkey=', '.join(mPkey), |
| acl=acl) |
| |
| # Execute the query |
| try: |
| db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False) |
| curs = db.query(qry) |
| nrows = curs.ntuples() |
| |
| if nrows == 0: |
| logger.info('[OK] Cross consistency acl check for ' + catname) |
| else: |
| GV.checkStatus = False |
| setError(ERROR_NOREPAIR) |
| GV.aclStatus = False |
| logger.info('[FAIL] Cross consistency acl check for ' + catname) |
| logger.error(' %s acl check has %d issue(s)' % (catname, nrows)) |
| |
| fields = curs.listfields() |
| gplog.log_literal(logger, logging.ERROR, " " + " | ".join(fields)) |
| for row in curs.getresult(): |
| gplog.log_literal(logger, logging.ERROR, " " + " | ".join(map(str, row))) |
| processACLResult(catname, fields, curs.getresult()) |
| |
| except Exception as e: |
| setError(ERROR_NOREPAIR) |
| GV.aclStatus = False |
| myprint('[ERROR] executing: Cross consistency check for ' + catname) |
| myprint(' Execution error: ' + str(e)) |
| myprint(qry) |
| # ------------------------------------------------------------------------------- |
| |
| |
| def checkForeignKey(cat_tables=None): |
| logger.info('-----------------------------------') |
| logger.info('Performing foreign key tests') |
| |
| if GV.foreignKeyStatus is None: |
| GV.foreignKeyStatus = True |
| |
| # looks up information in the catalog: |
| if not cat_tables: |
| cat_tables = GV.catalog.getCatalogTables() |
| |
| db_connection = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False) |
| try: |
| foreign_key_check = ForeignKeyCheck(db_connection, logger, GV.opt['-S'], autoCast) |
| foreign_key_issues = foreign_key_check.runCheck(cat_tables) |
| if foreign_key_issues: |
| GV.checkStatus = False |
| |
| for catname, tuples in foreign_key_issues.items(): |
| for pkcatname, fields, results in tuples: |
| processForeignKeyResult(catname, pkcatname, fields, results) |
| if catname == 'gp_fastsequence' and pkcatname == 'pg_class': |
| setError(ERROR_REMOVE) |
| removeFastSequence(db_connection) |
| else: |
| setError(ERROR_NOREPAIR) |
| except Exception as ex: |
| setError(ERROR_NOREPAIR) |
| GV.foreignKeyStatus = False |
| myprint(' Execution error: ' + str(ex)) |
| # ------------------------------------------------------------------------------- |
| |
| |
| def checkMissingEntry(): |
| logger.info('-----------------------------------') |
| logger.info('Performing cross consistency tests: check for missing or extraneous issues') |
| |
| if GV.missingEntryStatus is None: |
| GV.missingEntryStatus = True |
| |
| catalog_issues = _checkAllTablesForMissingEntries() |
| |
| if len(catalog_issues) == 0: |
| return |
| |
| if GV.opt['-E']: |
| do_repair_for_extra(catalog_issues) |
| else: |
| setError(ERROR_NOREPAIR) |
| |
| |
| def _checkAllTablesForMissingEntries(): |
| catalog_issues = {} |
| tables = GV.catalog.getCatalogTables() |
| for catalog_table_obj in tables: |
| issues = checkTableMissingEntry(catalog_table_obj) |
| if not issues: |
| continue |
| catalog_name = catalog_table_obj.getTableName() |
| _, pk_name = getPrimaryKeyColumn(catalog_name, catalog_table_obj.getPrimaryKey()) |
| # We do this check here rather than in getPrimaryKeyColumn because that |
| # function is also used in checkACL and we do not want to modify its behavior |
| catalog_issues[(catalog_table_obj, pk_name)] = issues |
| return catalog_issues |
| # ------------------------------------------------------------------------------- |
| |
| def checkTableMissingEntry(cat): |
| catname = cat.getTableName() |
| pkey = cat.getPrimaryKey() |
| coordinator = cat.isCoordinatorOnly() |
| isShared = cat.isShared() |
| coltypes = cat.getTableColtypes() |
| |
| # Skip coordinator only tables |
| if coordinator: |
| return |
| |
| # Skip gp_segment_configuration |
| if catname == "gp_segment_configuration": |
| return |
| |
| # Skip gp_matview_aux or gp_matview_tables |
| if catname == "gp_matview_aux" or catname == "gp_matview_tables": |
| return |
| |
| # skip shared/non-shared tables |
| if GV.opt['-S']: |
| if re.match("none", GV.opt['-S'], re.I) and isShared: |
| return |
| if re.match("only", GV.opt['-S'], re.I) and not isShared: |
| return |
| |
| # Skip catalogs without primary key |
| if len(pkey) == 0: |
| logger.warning("[WARN] Skipped missing/extra entry check for %s" % catname) |
| return |
| |
| castedPkey = cat.getPrimaryKey() |
| castedPkey = [c + autoCast.get(coltypes[c], '') for c in castedPkey] |
| |
| if cat.tableHasConsistentOids(): |
| qry = missingEntryQuery(GV.max_content, catname, ['oid'], ['oid']) |
| else: |
| qry = missingEntryQuery(GV.max_content, catname, pkey, castedPkey) |
| |
| # Execute the query |
| try: |
| db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False) |
| curs = db.query(qry) |
| nrows = curs.ntuples() |
| results = curs.getresult() |
| fields = curs.listfields() |
| |
| if nrows != 0: |
| results = filterSpuriousFailures(catname, fields, results) |
| nrows = len(results) |
| |
| if nrows == 0: |
| logger.info('[OK] Checking for missing or extraneous entries for ' + catname) |
| else: |
| if catname in ['pg_constraint']: |
| logger_with_level = logger.warning |
| log_level = logging.WARNING |
| else: |
| GV.checkStatus = False |
| GV.missingEntryStatus = False |
| logger_with_level = logger.error |
| log_level = logging.ERROR |
| |
| logger.info(('[%s] Checking for missing or extraneous entries for ' + catname) % |
| ('WARNING' if log_level == logging.WARNING else 'FAIL')) |
| logger_with_level(' %s has %d issue(s)' % (catname, nrows)) |
| gplog.log_literal(logger, log_level, " " + " | ".join(fields)) |
| for row in results: |
| gplog.log_literal(logger, log_level, " " + " | ".join(map(str, row))) |
| processMissingDuplicateEntryResult(catname, fields, results, "missing") |
| if catname == 'pg_type': |
| generateVerifyFile(catname, fields, results, 'missing_extraneous') |
| |
| return results |
| |
| except Exception as e: |
| setError(ERROR_NOREPAIR) |
| GV.missingEntryStatus = False |
| myprint('[ERROR] executing: Missing or extraneous entries check for ' + catname) |
| myprint(' Execution error: ' + str(e)) |
| myprint(qry) |
| |
| |
| class checkAOSegVpinfoThread(execThread): |
| def __init__(self, cfg, db): |
| execThread.__init__(self, cfg, db, None) |
| |
| def run(self): |
| aoseg_query = """ |
| SELECT a.relname, a.relid, a.segrelid, cl.relname, a.relnatts |
| FROM (SELECT p.relid, p.segrelid, c.relname, c.relnatts FROM pg_appendonly p LEFT JOIN pg_class c ON p.relid = c.oid LEFT JOIN pg_am a ON (a.oid = c.relam) |
| WHERE a.amname = 'ao_column') a |
| LEFT JOIN pg_class cl ON a.segrelid = cl.oid; |
| """ |
| |
| try: |
| # Read the list of aoseg tables from the database |
| curs = self.db.query(aoseg_query) |
| |
| for relname, relid, segrelid, segrelname, attr_count in curs.getresult(): |
| # We check vpinfo consistency only for segs that are in state |
| # AOSEG_STATE_DEFAULT and which are not RESERVED_SEGNO. |
| # RESERVED_SEGNO can have a different number of attributes than |
| # the other segments. For example, if we ALTER TABLE ADD COLUMN in a |
| # transaction, and then insert rows in the same transaction, and |
| # then abort, the RESERVED_SEGNO will hold the inserted rows. |
| # The vpinfo for RESERVED_SEGNO will have more columns than |
| # relnatts in that case. |
| qry = "SELECT distinct(length(vpinfo)) FROM pg_aoseg.%s where state = 1 and segno <> 0;" % (segrelname) |
| vpinfo_curs = self.db.query(qry) |
| nrows = vpinfo_curs.ntuples() |
| if nrows == 0: |
| continue |
| elif nrows > 1: |
| GV.checkStatus = False |
| setError(ERROR_NOREPAIR) |
| logger.info('[FAIL] inconsistent vpinfo') |
| logger.error("found {nrows} vpinfo(s) with different length in 'pg_aoseg.{segrelname}' of table '{relname}' on segment {content}" |
| .format(nrows = nrows, |
| segrelname = segrelname, |
| relname = relname, |
| content = self.cfg['content'])) |
| logger.error(qry) |
| continue |
| |
| vpinfo_length = vpinfo_curs.getresult()[0][0] |
| |
| # vpinfo is of bytea type; the length of the first 3 fields is 12 bytes, and the size of AOCSVPInfoEntry is 16 |
| # typedef struct AOCSVPInfo |
| # { |
| # int32 _len; |
| # int32 version; |
| # int32 nEntry; |
| # |
| # AOCSVPInfoEntry entry[1]; |
| # } AOCSVPInfo; |
| vpinfo_attr_count = (vpinfo_length - 12) / 16 |
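| # e.g. by this layout a 3-column table stores 12 + 3*16 = 60 bytes of vpinfo |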
| if vpinfo_attr_count != attr_count: |
| GV.checkStatus = False |
| setError(ERROR_NOREPAIR) |
| logger.info('[FAIL] inconsistent vpinfo') |
| logger.error("vpinfo in 'pg_aoseg.{segrelname}' of table '{relname}' contains {vpinfo_attr_count} attributes, while pg_class has {attr_count} attributes on segment {content}" |
| .format(segrelname = segrelname, |
| relname = relname, |
| vpinfo_attr_count = vpinfo_attr_count, |
| attr_count = attr_count, |
| content = self.cfg['content'])) |
| logger.error(qry) |
| except Exception as e: |
| GV.checkStatus = False |
| self.error = e |
| |
| def checkAOSegVpinfo(): |
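| '''Run checkAOSegVpinfoThread against every segment in GV.cfg, at most -B at a time, and wait for the results.''' |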
| threads = [] |
| i = 1 |
| # parallelise check |
| for dbid in GV.cfg: |
| cfg = GV.cfg[dbid] |
| db_connection = connect2(cfg) |
| thread = checkAOSegVpinfoThread(cfg, db_connection) |
| thread.start() |
| logger.debug('launching check thread %s for dbid %i' % |
| (thread.name, dbid)) |
| threads.append(thread) |
| |
| if (i % GV.opt['-B']) == 0: |
| processThread(threads) |
| threads = [] |
| |
| i += 1 |
| |
| processThread(threads) |
| |
| # ------------------------------------------------------------------------------- |
| |
| # Exclude these tuples from the catalog table scan |
| # pg_depend: classid 2603 (pg_amproc) is excluded because their OIDs can be |
| # nonsynchronous (catalog.c:RelationNeedsSynchronizedOIDs()) |
| MISSING_ENTRY_EXCLUDE = {'pg_depend': 'WHERE classid != 2603'} |
| |
| def missingEntryQuery(max_content, catname, pkey, castedPkey): |
| # ================= |
| # Missing / Extra |
| # ================= |
| # |
| # (Fetch all the entries from segments. For each entry, collect the |
| # segment IDs of all the segments where the entry is present, in an array) |
| # |
| # Full outer join |
| # |
| # (Fetch all entries in coordinator) |
| # |
| # |
| # The WHERE clause at the bottom filters out all the boring rows, leaving |
| # only rows that are missing from one of the segments, or from the coordinator. |
| |
| catalog_exclude = MISSING_ENTRY_EXCLUDE.get(catname, "") |
| |
| qry = """ |
| SELECT {primary_key}, |
| case when coordinator is null then segids |
| when segids is null then array[coordinator.segid] |
| else coordinator.segid || segids end as segids |
| FROM |
| ( |
| SELECT {primary_key}, array_agg(gp_segment_id order by gp_segment_id) as segids |
| FROM gp_dist_random('{catalog}') {catalog_exclude} GROUP BY {primary_key} |
| ) as seg |
| FULL OUTER JOIN |
| ( |
| SELECT gp_segment_id as segid, {primary_key} FROM {catalog} {catalog_exclude} |
| ) as coordinator |
| USING ({primary_key}) |
| WHERE coordinator.segid is null |
| OR segids is null |
| OR NOT segids @> (select array_agg(content::int4) from gp_segment_configuration WHERE content >= 0) |
| """.format(primary_key=','.join(pkey), |
| catalog=catname, |
| catalog_exclude=catalog_exclude) |
| |
| return qry |
| |
| |
| # ------------------------------------------------------------------------------- |
| def checkInconsistentEntry(): |
| logger.info('-----------------------------------') |
| logger.info('Performing cross consistency test: check for inconsistent entries') |
| |
| if GV.inconsistentEntryStatus is None: |
| GV.inconsistentEntryStatus = True |
| |
| # looks up information in the catalog: |
| tables = GV.catalog.getCatalogTables() |
| |
| for cat in tables: |
| checkTableInconsistentEntry(cat) |
| |
| |
| # ------------------------------------------------------------------------------- |
| def checkTableInconsistentEntry(cat): |
| catname = cat.getTableName() |
| pkey = cat.getPrimaryKey() |
| coordinator = cat.isCoordinatorOnly() |
| isShared = cat.isShared() |
| columns = cat.getTableColumns(with_acl=False) |
| coltypes = cat.getTableColtypes() |
| |
| # Skip coordinator only tables |
| if coordinator: |
| return |
| |
| # Skip gp_segment_configuration or pg_appendonly |
| if catname == "gp_segment_configuration" or catname == "pg_appendonly": |
| return |
| |
| # Skip gp_matview_aux or gp_matview_tables |
| if catname == "gp_matview_aux" or catname == "gp_matview_tables": |
| return |
| |
| # skip shared/non-shared tables |
| if GV.opt['-S']: |
| if re.match("none", GV.opt['-S'], re.I) and isShared: |
| return |
| if re.match("only", GV.opt['-S'], re.I) and not isShared: |
| return |
| |
| # Skip catalogs without primary key |
| if pkey == []: |
| logger.warning("[WARN] Skipped cross consistency check for %s" % |
| catname) |
| return |
| |
| castedPkey = cat.getPrimaryKey() |
| castedPkey = [c + autoCast.get(coltypes[c], '') for c in castedPkey] |
| castcols = [c + autoCast.get(coltypes[c], '') for c in columns] |
| |
| # pg_attribute.attmissingval has pseudo-type 'any', which doesn't allow |
| # many operations on it. Cast it to text by replacing it with |
| # "attmissingval::text" in the columns list. |
| columns = list(map(lambda b: b.replace("attmissingval","attmissingval::text"), columns)) |
| castcols = list(map(lambda b: b.replace("attmissingval","attmissingval::text"), castcols)) |
| |
| # pg_dependencies and pg_mcv_list disallow input; convert them to text for the query |
| columns = list(map(lambda b: b.replace("stxddependencies","stxddependencies::text"), columns)) |
| castcols = list(map(lambda b: b.replace("stxddependencies","stxddependencies::text"), castcols)) |
| columns = list(map(lambda b: b.replace("stxdmcv","stxdmcv::text"), columns)) |
| castcols = list(map(lambda b: b.replace("stxdmcv","stxdmcv::text"), castcols)) |
| # fixme: we should support the rolpassword column when the password is encrypted with SCRAM-SHA-256 |
| if catname == "pg_authid": |
| columns.remove("rolpassword") |
| castcols.remove("rolpassword") |
| columns.remove("rolpasswordsetat") |
| castcols.remove("rolpasswordsetat") |
| columns.remove("rollockdate") |
| castcols.remove("rollockdate") |
| columns.remove("rolpasswordexpire") |
| castcols.remove("rolpasswordexpire") |
| |
| if catname == "pg_password_history": |
| columns.remove("passhistpasswordsetat") |
| castcols.remove("passhistpasswordsetat") |
| |
| if catname == "pg_directory_table": |
| columns.remove("dtlocation") |
| castcols.remove("dtlocation") |
| |
| if cat.tableHasConsistentOids(): |
| qry = inconsistentEntryQuery(GV.max_content, catname, ['oid'], columns, castcols) |
| else: |
| qry = inconsistentEntryQuery(GV.max_content, catname, castedPkey, columns, castcols) |
| |
| # Execute the query |
| try: |
| db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False) |
| curs = db.query(qry) |
| nrows = curs.ntuples() |
| |
| if nrows == 0: |
| logger.info('[OK] Checking for inconsistent entries for ' + catname) |
| else: |
| GV.checkStatus = False |
| setError(ERROR_NOREPAIR) |
| GV.inconsistentEntryStatus = False |
| logger.info('[FAIL] Checking for inconsistent entries for ' + catname) |
| logger.error(' %s has %d issue(s)' % (catname, nrows)) |
| |
| fields = curs.listfields() |
| gplog.log_literal(logger, logging.ERROR, " " + " | ".join(fields)) |
| for row in curs.getresult(): |
| gplog.log_literal(logger, logging.ERROR, " " + " | ".join(map(str, row))) |
| results = curs.getresult() |
| processInconsistentEntryResult(catname, pkey, fields, results) |
| if catname == 'pg_type': |
| generateVerifyFile(catname, fields, results, 'duplicate') |
| |
| |
| except Exception as e: |
| setError(ERROR_NOREPAIR) |
| GV.inconsistentEntryStatus = False |
| myprint('[ERROR] executing: Inconsistent entries check for ' + catname) |
| myprint(' Execution error: ' + str(e)) |
| myprint(qry) |
| |
| |
| # ------------------------------------------------------------------------------- |
| def inconsistentEntryQuery(max_content, catname, pkey, columns, castcols): |
| # -- ============== |
| # -- Inconsistent |
| # -- ============== |
| # -- Build set of all values in the instance |
| # -- |
| # -- Count number of unique values for the primary key |
| # -- - Ignore rows where this does not equal number of segments |
| # -- (These are the missing/extra rows and are covered by other checks) |
| # -- |
| # -- Count number of unique values for all columns |
| # -- - Ignore rows where this equals number of segments |
| # -- |
| # -- Group the majority opinion into one bucket and report everyone who disagrees with |
| # -- the majority individually. |
| # -- |
| # -- Majority gets reported with gp_segment_id == null |
| |
| qry = """ |
| SET TRANSACTION ISOLATION LEVEL SERIALIZABLE; |
| |
| -- distribute catalog table from coordinator, so that we can avoid to gather |
| CREATE TEMPORARY TABLE _tmp_coordinator ON COMMIT DROP AS |
| SELECT gp_segment_id AS segid, {columns} FROM {catalog} DISTRIBUTED BY (segid); |
| SELECT {columns}, array_agg(gp_segment_id order by gp_segment_id) as segids |
| FROM |
| ( |
| SELECT DISTINCT |
| case when dcount <= ({max_content}+2)/2.0 then gp_segment_id else null end as gp_segment_id, {columns} |
| FROM |
| ( |
| select count(*) over (partition by {columns}) as dcount, |
| count(*) over (partition by {pkey}) as pcount, |
| gp_segment_id, {columns} |
| from ( |
| select segid gp_segment_id, {castcols} from _tmp_coordinator |
| union all |
| select gp_segment_id, {castcols} |
| from gp_dist_random('{catalog}') |
| ) all_segments |
| ) counted_segments |
| WHERE dcount != {max_content}+2 and pcount = {max_content}+2 |
| ORDER BY {pkey}, gp_segment_id |
| ) rowresult |
| GROUP BY {columns} |
| ORDER BY {pkey}, segids; |
| """.format(pkey=','.join(pkey), |
| catalog=catname, |
| columns=','.join(columns), |
| castcols=','.join(castcols), |
| max_content=max_content) |
| |
| return qry |
| |
| |
| # ------------------------------------------------------------------------------- |
| def checkDuplicateEntry(): |
| logger.info('-----------------------------------') |
| logger.info('Performing test: checking for duplicate entries') |
| |
| # looks up information in the catalog: |
| tables = GV.catalog.getCatalogTables() |
| |
| for cat in tables: |
| ## pg_depend does not care about duplicates at the moment |
| if str(cat) == 'pg_depend': |
| continue |
| checkTableDuplicateEntry(cat) |
| |
| |
| # ------------------------------------------------------------------------------- |
| def checkTableDuplicateEntry(cat): |
| catname = cat.getTableName() |
| pkey = cat.getPrimaryKey() |
| coordinator = cat.isCoordinatorOnly() |
| isShared = cat.isShared() |
| columns = cat.getTableColumns(with_acl=False) |
| coltypes = cat.getTableColtypes() |
| |
| # Skip coordinator only tables |
| if coordinator: |
| return |
| |
| # skip shared/non-shared tables |
| if GV.opt['-S']: |
| if re.match("none", GV.opt['-S'], re.I) and isShared: |
| return |
| if re.match("only", GV.opt['-S'], re.I) and not isShared: |
| return |
| |
| # Skip catalogs without primary key |
| if pkey == []: |
| logger.warning("[WARN] Skipped duplicate check for %s" % |
| catname) |
| return |
| |
| pkey = [c + autoCast.get(coltypes[c], '') for c in pkey] |
| if cat.tableHasConsistentOids(): |
| qry = duplicateEntryQuery(catname, ['oid']) |
| else: |
| qry = duplicateEntryQuery(catname, pkey) |
| |
| # Execute the query |
| try: |
| db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False) |
| curs = db.query(qry) |
| nrows = curs.ntuples() |
| |
| if nrows == 0: |
| logger.info('[OK] Checking for duplicate entries for ' + catname) |
| else: |
| GV.checkStatus = False |
| setError(ERROR_NOREPAIR) |
| logger.error('[FAIL] Checking for duplicate entries for ' + catname) |
| logger.error(' %s has %d issue(s)' % (catname, nrows)) |
| |
| fields = curs.listfields() |
| gplog.log_literal(logger, logging.ERROR, " " + " | ".join(fields)) |
| results = curs.getresult() |
| for row in results: |
| gplog.log_literal(logger, logging.ERROR, " " + " | ".join(map(str, row))) |
| processMissingDuplicateEntryResult(catname, fields, results, "duplicate") |
| if catname == 'pg_type': |
| generateVerifyFile(catname, fields, results, 'duplicate') |
| except Exception as e: |
| setError(ERROR_NOREPAIR) |
| myprint('[ERROR] executing: duplicate entries check for ' + catname) |
| myprint(' Execution error: ' + str(e)) |
| myprint(qry) |
| |
| |
| # ------------------------------------------------------------------------------- |
| def duplicateEntryQuery(catname, pkey): |
| # -- =========== |
| # -- Duplicate |
| # -- =========== |
| # -- Return any rows having count > 1 for a given segid, {unique_key} pair |
| # -- |
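| # -- For illustration (hypothetical oid), a result row such as |
| # --   18853 | 2 | {-1,1} |
| # -- means oid 18853 appears 2 times on each of the coordinator (-1) and segment 1. |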
| |
| qry = """ |
| SET TRANSACTION ISOLATION LEVEL SERIALIZABLE; |
| |
| -- distribute the catalog table from the coordinator so that we can avoid a gather motion |
| CREATE TEMPORARY TABLE _tmp_coordinator ON COMMIT DROP AS |
| SELECT gp_segment_id AS segid, {pkey} FROM {catalog} DISTRIBUTED BY (segid); |
| SELECT {pkey}, total, array_agg(segid order by segid) as segids |
| FROM ( |
| SELECT segid, {pkey}, count(*) as total |
| FROM ( |
| select segid, {pkey} FROM _tmp_coordinator |
| union all |
| select gp_segment_id as segid, {pkey} FROM gp_dist_random('{catalog}') |
| ) all_segments |
| GROUP BY segid, {pkey} |
| HAVING count(*) > 1 |
| ) rowresult |
| GROUP BY {pkey}, total |
| """.format(catalog=catname, |
| pkey=','.join(pkey)) |
| |
| return qry |
| |
| |
| # ------------------------------------------------------------------------------- |
| def checkUniqueIndexViolation(): |
| logger.info('-----------------------------------') |
| logger.info('Performing check: checking for violated unique indexes') |
| db_connection = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False) |
| |
| violations = UniqueIndexViolationCheck().runCheck(db_connection) |
| |
| checkname = 'unique index violation(s)' |
| if violations: |
| logger.info('[FAIL] %s' % checkname) |
| GV.checkStatus = False |
| setError(ERROR_NOREPAIR) |
| log_unique_index_violations(violations) |
| else: |
| logger.info('[OK] %s' % checkname) |
| |
| def log_unique_index_violations(violations): |
| log_output = ['\n segment_id | index_name | table_name | column_names'] |
| log_output.append(' ---------------------------------------------------') |
| |
| for violation in violations: |
| error_object = getGPObject(violation['table_oid'], violation['table_name']) |
| issue = CatUniqueIndexViolationIssue(violation['table_name'], |
| violation['index_name']) |
| error_object.addDuplicateIssue(issue) |
| |
| for seg_id in violation['violated_segments']: |
| log_output.append(' %s | %s | %s | %s' % ( |
| seg_id, |
| violation['index_name'], |
| violation['table_name'], |
| violation['column_names'], |
| )) |
| |
| logger.error('\n'.join(log_output)) |
| |
| |
| # ------------------------------------------------------------------------------- |
| def checkOrphanedToastTables(): |
| logger.info('-----------------------------------') |
| logger.info('Performing check: checking for orphaned TOAST tables') |
| |
| db_connection = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False) |
| checker = OrphanedToastTablesCheck() |
| check_passed = checker.runCheck(db_connection) |
| |
| checkname = 'orphaned toast table(s)' |
| if check_passed: |
| logger.info('[OK] %s' % checkname) |
| else: |
| logger.info('[FAIL] %s' % checkname) |
| GV.checkStatus = False |
| setError(ERROR_REMOVE) |
| |
| # log raw orphan toast table query results only to the log file |
| log_output = ['Orphan toast tables detected for the following scenarios:'] |
| for issue_type in checker.issue_types(): |
| log_output.append('\n' + issue_type.header) |
| log_output.append('content_id | toast_table_oid | toast_table_name | expected_table_oid | expected_table_name | dependent_table_oid | dependent_table_name') |
| log_output.append('-----------+-----------------+------------------+--------------------+---------------------+---------------------+---------------------') |
| for row in checker.rows_of_type(issue_type): |
| log_output.append('{} | {} | {} | {} | {} | {} | {}'.format( |
| row['content_id'], |
| row['toast_table_oid'], |
| row['toast_table_name'], |
| row['expected_table_oid'], |
| row['expected_table_name'], |
| row['dependent_table_oid'], |
| row['dependent_table_name'])) |
| log_output = '\n'.join(log_output) |
| log_output = log_output + '\n' |
| logger.error(log_output) |
| |
| # log fix instructions for the orphaned toast tables to stdout and log file |
| gplog.log_literal(logger, logging.CRITICAL, checker.get_fix_text()) |
| |
| # log per-orphan table issue and cause to stdout and log file |
| for issue, segments in checker.iterate_issues(): |
| cat_issue_obj = CatOrphanToastTableIssue(issue.table.oid, |
| issue.table.catname, |
| issue, |
| segments) |
| error_object = getGPObject(issue.table.oid, issue.table.catname) |
| error_object.addOrphanToastTableIssue(cat_issue_obj) |
| |
| do_repair_for_segments(segments_to_repair_statements=checker.add_repair_statements(GV.cfg), |
| issue_type="orphaned_toast_tables", |
| description='Repairing orphaned TOAST tables') |
| |
| |
| ############################################################################ |
| # Help populating repair part for all checked types |
| # All functions dependent on results stored in global variable GV |
| ############################################################################ |
| name_repair = { |
| "segment": |
| { |
| "description": "Check if segment needs repair", |
| "fn": lambda maybeRemove, catmod_guc, seg: checkSegmentRepair(maybeRemove, catmod_guc, seg) |
| }, |
| } |
| |
| ############################################################################ |
| # version = X.X : run this check as part of running all checks on gpdb <= X.X; |
| #                 run individually, any check should be OK on any version |
| # online  = True: OK to run this check while gpcheckcat is in online mode |
| # order   = X   : the order in which the check is run when running all checks |
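| # defaultSkip = True : skip this check when running all checks, unless it is |
| #                      requested explicitly with -R |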
| ############################################################################ |
| |
| all_checks = { |
| "unique_index_violation": |
| { |
| "description": "Check for violated unique indexes", |
| "fn": lambda: checkUniqueIndexViolation(), |
| "version": "main", |
| "order": 1, |
| "online": True |
| }, |
| "duplicate": |
| { |
| "description": "Check for duplicate entries", |
| "fn": lambda: checkDuplicateEntry(), |
| "version": 'main', |
| "order": 2, |
| "online": True |
| }, |
| "missing_extraneous": |
| { |
| "description": "Cross consistency check for missing or extraneous entries", |
| "fn": lambda: checkMissingEntry(), |
| "version": 'main', |
| "order": 3, |
| "online": True |
| }, |
| "inconsistent": |
| { |
| "description": "Cross consistency check for coordinator segment inconsistency", |
| "fn": lambda: checkInconsistentEntry(), |
| "version": 'main', |
| "order": 4, |
| "online": True |
| }, |
| "foreign_key": |
| { |
| "description": "Check foreign keys", |
| "fn": lambda: checkForeignKey(), |
| "version": 'main', |
| "order": 5, |
| "online": True |
| }, |
| "acl": |
| { |
| "description": "Cross consistency check for access control privileges", |
| "fn": lambda: checkACL(), |
| "version": 'main', |
| "order": 6, |
| "online": True, |
| "defaultSkip": True |
| }, |
| "pgclass": |
| { |
| "description": "Check for pg_class entries that do not have any corresponding pg_attribute entry", |
| "fn": lambda: checkPGClass(), |
| "version": 'main', |
| "order": 7, |
| "online": False |
| }, |
| "namespace": |
| { |
| "description": "Check for schemas with a missing schema definition", |
| "fn": lambda: checkPGNamespace(), |
| "version": 'main', |
| "order": 8, |
| "online": False |
| }, |
| "distribution_policy": |
| { |
| "description": "Check constraints on randomly distributed tables", |
| "fn": lambda: checkDistribPolicy(), |
| "version": 'main', |
| "order": 9, |
| "online": False |
| }, |
| "dependency": |
| { |
| "description": "Check for dependency on non-existent objects", |
| "fn": lambda: checkDepend(), |
| "version": 'main', |
| "order": 10, |
| "online": False |
| }, |
| "owner": |
| { |
| "description": "Check table ownership that is inconsistent with the coordinator database", |
| "fn": lambda: checkOwners(), |
| "version": 'main', |
| "order": 11, |
| "online": True, |
| "defaultSkip": True |
| }, |
| "part_integrity": |
| { |
| "description": "Check pg_partition branch integrity, partition with oids, partition distribution policy", |
| "fn": lambda: checkPartitionIntegrity(), |
| "version": 'main', |
| "order": 12, |
| "online": True |
| }, |
| "orphaned_toast_tables": |
| { |
| "description": "Check pg_class and pg_depend for orphaned TOAST tables", |
| "fn": checkOrphanedToastTables, |
| "version": 'main', |
| "order": 13, |
| "online": True |
| }, |
| "aoseg_table": |
| { |
| "description": "Check that vpinfo in aoseg table is consistent with pg_attribute", |
| "fn": lambda: checkAOSegVpinfo(), |
| "version": 'main', |
| "order": 15, |
| "online": False |
| } |
| } |
| |
| |
| #------------------------------------------------------------------------------- |
| def listAllChecks(): |
| |
| myprint('\nList of gpcheckcat tests:\n') |
| for name in sorted(all_checks, key=lambda x: all_checks[x]["order"]): |
| myprint(" %24s: %s" % \ |
| (name, all_checks[name]["description"])) |
| myprint('') |
| |
| |
| #------------------------------------------------------------------------------- |
| def runCheckCatname(catalog_table_obj): |
| |
| checks = {'missing_extraneous': lambda: checkTableMissingEntry(catalog_table_obj), |
| 'inconsistent': lambda: checkTableInconsistentEntry(catalog_table_obj), |
| # We assume that everything is passed as a list to checkForeignKey |
| 'foreign_key': lambda: checkForeignKey([catalog_table_obj]), |
| 'duplicate': lambda: checkTableDuplicateEntry(catalog_table_obj), |
| 'acl': lambda: checkTableACL(catalog_table_obj)} |
| |
| for check in sorted(checks): |
| myprint("Performing test '%s' for %s" % (check, catalog_table_obj.getTableName())) |
| stime = time.time() |
| checks[check]() |
| etime = time.time() |
| elapsed = etime - stime |
| GV.elapsedTime += elapsed |
| elapsed = str(datetime.timedelta(seconds=elapsed))[:-4] |
| GV.totalCheckRun += 1 |
| myprint("Total runtime for test '%s': %s" % (check, elapsed)) |
| |
| |
| #------------------------------------------------------------------------------- |
| def runOneCheck(name): |
| |
| if GV.opt['-O'] and not all_checks[name]["online"]: |
| logger.info("%s: Skip this test in online mode" % (name)) |
| return |
| else: |
| myprint("Performing test '%s'" % name) |
| GV.totalCheckRun += 1 |
| GV.checkStatus = True |
| stime = time.time() |
| all_checks[name]["fn"]() |
| etime = time.time() |
| elapsed = etime - stime |
| GV.elapsedTime += elapsed |
| elapsed = str(datetime.timedelta(seconds=elapsed))[:-4] |
| myprint("Total runtime for test '%s': %s" % (name, elapsed)) |
| if GV.checkStatus == False: |
| GV.failedChecks.append(name) |
| |
| |
| #------------------------------------------------------------------------------- |
| def runAllChecks(run_tests): |
| ''' |
| perform catalog check for specified database |
| ''' |
| for name in sorted(all_checks, key=lambda x: all_checks[x]["order"]): |
| if all_checks[name]["version"] >= GV.version and name in run_tests: |
| if (not GV.opt['-R'] and "defaultSkip" in all_checks[name] and all_checks[name]["defaultSkip"]): |
| logger.debug("Default skipping test: "+name) |
| else: |
| runOneCheck(name) |
| |
| closeDbs() |
| logger.info("------------------------------------") |
| fixes = (len(GV.Remove) > 0 or |
| len(GV.AdjustConname) > 0 or |
| len(GV.DemoteConstraint) > 0 or |
| len(GV.ReSync) > 0) |
| if GV.opt['-g'] != None and fixes: |
| repair_obj = Repair(context=GV) |
| repair_dir_path = repair_obj.create_repair_dir() |
| logger.debug('Building catalog repair scripts') |
| # build a script to run these files |
| script = '' |
| catmod_guc = "-c allow_system_table_mods=true" |
| |
| # don't remove anything if ReSync possible -- |
| # comment out the delete statements from the repair scripts |
| maybeRemove = "" |
| if len(GV.ReSync): |
| maybeRemove = "# " |
| |
| # use the per-dbid loop for removal and for constraint |
| # name adjustments |
| |
| dbids = set(GV.Remove.keys()) |
| dbids = dbids.union(GV.AdjustConname.keys()) |
| dbids = dbids.union(GV.DemoteConstraint.keys()) |
| |
| for seg in dbids: |
| script += '\n%secho "Repairing segment %i"\n' % (maybeRemove, seg) |
| segment_repair_script = name_repair["segment"]["fn"](maybeRemove, catmod_guc, seg) |
| script += segment_repair_script |
| |
| repair_obj.append_content_to_bash_script(repair_dir_path, script) |
| |
| print_repair_issues(repair_dir_path) |
| |
| if GV.retcode >= ERROR_NOREPAIR: |
| logger.warning('[WARN]: unable to generate repairs for some issues') |
| logger.info("Check complete") |
| |
| |
| def do_repair(sql_repair_contents, issue_type, description): |
| logger.info("Starting repair of %s with %d issues" % (description, len(sql_repair_contents))) |
| if len(sql_repair_contents) == 0: |
| return |
| |
| repair_dir_path = "" |
| try: |
| repair_obj = Repair(context=GV, issue_type=issue_type, desc=description) |
| repair_dir_path = repair_obj.create_repair(sql_repair_contents=sql_repair_contents) |
| except Exception as ex: |
| logger.fatal(str(ex)) |
| |
| print_repair_issues(repair_dir_path) |
| |
| def do_repair_for_segments(segments_to_repair_statements, issue_type, description): |
| logger.info("Starting repair of %s" % description) |
| |
| repair_dir_path = "" |
| try: |
| repair_obj = Repair(context=GV, issue_type=issue_type, desc=description) |
| repair_dir_path = repair_obj.create_segment_repair_scripts(segments_to_repair_statements) |
| except Exception as ex: |
| logger.fatal(str(ex)) |
| |
| print_repair_issues(repair_dir_path) |
| |
| |
| def do_repair_for_extra(catalog_issues): |
| setError(ERROR_REMOVE) |
| repair_dir_path = "" |
| for (catalog_table_obj, pk_name), issues in catalog_issues.items(): |
| try: |
| repair_obj = Repair(context=GV, issue_type="extra", desc="Removing extra entries") |
| repair_dir_path = repair_obj.create_repair_for_extra_missing(catalog_table_obj=catalog_table_obj, |
| issues=issues, |
| pk_name=pk_name, |
| segments=GV.cfg) |
| except Exception as ex: |
| logger.fatal(str(ex)) |
| |
| print_repair_issues(repair_dir_path) |
| |
| |
| def print_repair_issues(repair_dir_path): |
| myprint('') |
| myprint("catalog issue(s) found, repair script(s) generated in dir {0}".format(repair_dir_path)) |
| myprint('') |
| |
| |
| def checkReindexRepair(): |
| if len(GV.Reindex) > 0: |
| # dbname.type.timestamp.sql |
| filename = '%s.%s.%s.sql' % (GV.dbname, "reindex", GV.timestamp) |
| fullpath = '%s/%s' % (GV.opt['-g'], filename) |
| try: |
| file = open(fullpath, 'w') |
| except Exception as e: |
| logger.fatal('Unable to create file "%s": %s' % (fullpath, str(e))) |
| sys.exit(1) |
| |
| description = '\necho "Reindexing potentially damaged bitmap indexes"\n' |
| for r in GV.Reindex: |
| file.write('REINDEX INDEX %s;\n' % r) |
| file.close() |
| return description, filename |
| else: |
| return None, None |
| |
| def checkConstraintsRepair(): |
| do_repair(GV.Constraints, issue_type="removeconstraints", description='Dropping invalid unique constraints') |
| |
| def checkOwnersRepair(): |
| # Previously we removed duplicate statements from the "Owners" list |
| # by casting it through a "list(set(...))", however this does not work |
| # since the order of items in the list is important for correct results. |
| do_repair(GV.Owners, issue_type="fixowner", description='Correcting table ownership') |
| |
| def checkPoliciesRepair(): |
| # changes to distribution policies |
| do_repair(GV.Policies, issue_type="fixdistribution", description='Fixing distribution policies') |
| |
| def checkSegmentRepair(maybeRemove, catmod_guc, seg): |
| # dbid.host.port.dbname.timestamp.sql |
| c = GV.cfg[seg] |
| filename = '%i.%s.%i.%s.%s.sql' % (seg, c['hostname'], c['port'], GV.dbname, GV.timestamp) |
| filename = filename.replace(' ', '_') |
| fullpath = '%s/%s' % (GV.opt['-g'], filename) |
| try: |
| file = open(fullpath, 'w') |
| except Exception as e: |
| logger.fatal('Unable to create file "%s": %s' % (fullpath, str(e))) |
| sys.exit(1) |
| |
| # unique |
| lines = set() |
| if seg in GV.Remove:#foreign_key check |
| lines = lines.union(GV.Remove[seg]) |
| if seg in GV.AdjustConname:#part_constraint |
| lines = lines.union(GV.AdjustConname[seg]) |
| if seg in GV.DemoteConstraint: |
| lines = lines.union(GV.DemoteConstraint[seg]) |
| |
| # make sure it is sorted |
| li = list(lines) |
| li.sort() |
| file.write('BEGIN;\n') |
| for line in li: |
| file.write(line + '\n') |
| file.write('COMMIT;\n') |
| file.close() |
| |
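| # For illustration (hypothetical host, port and file name), the generated line looks like: |
| # env PGOPTIONS="-c gp_role=utility -c allow_system_table_mods=true" psql -X -a -h sdw1 -p 6001 -f 2.sdw1.6001.testdb.<timestamp>.sql "testdb" > 2.sdw1.6001.testdb.<timestamp>.sql.out |
| # (the whole line is prefixed with '# ' when a resync makes the removals unnecessary) |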
| run_psql_script = '''{maybe}env PGOPTIONS="-c gp_role=utility {guc}" psql -X -a -h {hostname} -p {port} -f {fname} "{dbname}" > {fname}.out'''.format( |
| maybe=maybeRemove, guc=catmod_guc, |
| hostname=c['hostname'], |
| port=c['port'], |
| fname=filename, |
| dbname=GV.dbname) |
| |
| return run_psql_script |
| |
| |
| # ------------------------------------------------------------------------------- |
| def getCatalog(): |
| # Establish a connection to the coordinator and look up info in the catalog |
| db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False) |
| return GPCatalog(db) |
| |
| |
| # ------------------------------------------------------------------------------- |
| def getReportConfiguration(): |
| cfg = {} |
| for _, each in GV.cfg.items(): |
| cfg[each['content']] = {'segname': "content " + str(each['content']), |
| 'hostname': each['hostname'], |
| 'port': each['port']} |
| return cfg |
| |
| |
| # =============================================================================== |
| # GPCHECKCAT REPORT |
| # =============================================================================== |
| |
| GPObjects = {} # key = (catname, oid), value = GPObject |
| GPObjectGraph = {} # key = GPObject, value=[GPObject] |
| # ------------------------------------------------------------------------------- |
| |
| |
| # TableMainColumn[catname] = [colname, catname2] |
| # - report this catname's issues as part of catname2 |
| # - catname.colname references catname2.oid |
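| #  - For illustration: an issue found in pg_attribute for attrelid 16385 (a hypothetical |
| #    oid) is reported under the pg_class object with oid 16385 |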
| # ------------------------------------------------------------------------------- |
| TableMainColumn = {} |
| # Table with no OID |
| TableMainColumn['gp_distribution_policy'] = ['localoid', 'pg_class'] |
| TableMainColumn['gp_version_at_initdb'] = ['schemaversion', 'gp_version_at_initdb'] |
| TableMainColumn['pg_aggregate'] = ['aggfnoid', 'pg_proc'] |
| TableMainColumn['pg_appendonly'] = ['relid', 'pg_class'] |
| TableMainColumn['pg_attribute'] = ['attrelid', 'pg_class'] |
| TableMainColumn['pg_attribute_encoding'] = ['attrelid', 'pg_class'] |
| TableMainColumn['pg_auth_members'] = ['roleid', 'pg_authid'] |
| TableMainColumn['pg_autovacuum'] = ['vacrelid', 'pg_class'] |
| TableMainColumn['pg_compression'] = ['compname', 'pg_compression'] |
| TableMainColumn['pg_depend'] = ['objid', 'pg_class'] |
| TableMainColumn['pg_foreign_table'] = ['ftrelid', 'pg_class'] |
| TableMainColumn['pg_index'] = ['indexrelid', 'pg_class'] |
| TableMainColumn['pg_inherits'] = ['inhrelid', 'pg_class'] |
| TableMainColumn['pg_largeobject'] = ['loid', 'pg_largeobject'] |
| TableMainColumn['pg_pltemplate'] = ['tmplname', 'pg_pltemplate'] |
| TableMainColumn['pg_proc_callback'] = ['profnoid', 'pg_proc'] |
| TableMainColumn['pg_statistic_ext_data'] = ['stxoid', 'pg_statistic_ext'] |
| TableMainColumn['pg_type_encoding'] = ['typid', 'pg_type'] |
| TableMainColumn['pg_window'] = ['winfnoid', 'pg_proc'] |
| TableMainColumn['pg_password_history'] = ['passhistroleid', 'pg_authid'] |
| TableMainColumn['pg_description'] = ['objoid', 'pg_description'] |
| |
| # Table with OID (special case), these OIDs are known to be inconsistent |
| TableMainColumn['pg_attrdef'] = ['adrelid', 'pg_class'] |
| TableMainColumn['pg_constraint'] = ['conrelid', 'pg_class'] |
| TableMainColumn['pg_trigger'] = ['tgrelid', 'pg_class'] |
| TableMainColumn['pg_rewrite'] = ['ev_class', 'pg_class'] |
| |
| # Cast OID alias types to OID since our graph of objects is based on OID |
| # int2vector is also cast to int2[] due to the lack of a <(int2vector, int2vector) operator |
| autoCast = { |
| 'regproc': '::oid', |
| 'regprocedure': '::oid', |
| 'regoper': '::oid', |
| 'regoperator': '::oid', |
| 'regclass': '::oid', |
| 'regtype': '::oid', |
| 'int2vector': '::int2[]' |
| } |
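| # For illustration: a primary key column of type regproc, such as pg_aggregate.aggfnoid, |
| # becomes "aggfnoid::oid" when the check queries are built (see checkTableDuplicateEntry) |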
| |
| |
| # ------------------------------------------------------------------------------- |
| class QueryException(Exception): pass |
| |
| |
| # ------------------------------------------------------------------------------- |
| # Get gpObj from GPObjects, instantiate a new one & add to GPObjects if not found |
| def getGPObject(oid, catname): |
| gpObj = GPObjects.get((oid, catname), None) |
| if gpObj is None: |
| if catname == 'pg_class': |
| gpObj = RelationObject(oid, catname) |
| else: |
| gpObj = GPObject(oid, catname) |
| GPObjects[(oid, catname)] = gpObj |
| return gpObj |
| |
| |
| # ------------------------------------------------------------------------------- |
| def getOidFromPK(catname, pkeys): |
| # pkeys: name-value pair |
| pkeystrList = ["%s='%s'" % (key, value) for key, value in pkeys.items()] |
| pkeystr = ' and '.join(pkeystrList) |
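| # For illustration: pkeys {'proname': 'add', 'pronamespace': 2200} becomes |
| # the predicate "proname='add' and pronamespace='2200'" |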
| |
| qry = """ |
| SELECT oid |
| FROM ( |
| SELECT oid FROM {catname} |
| WHERE {pkeystr} |
| UNION ALL |
| SELECT oid FROM gp_dist_random('{catname}') |
| WHERE {pkeystr} |
| ) alloids |
| GROUP BY oid |
| ORDER BY count(*) desc, oid |
| """.format(catname=catname, |
| pkeystr=pkeystr) |
| |
| try: |
| db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False) |
| curs = db.query(qry) |
| if (len(curs.dictresult()) == 0): |
| raise QueryException("No such entry '%s' in %s" % (pkeystr, catname)) |
| |
| return curs.dictresult().pop()['oid'] |
| |
| except Exception as e: |
| setError(ERROR_NOREPAIR) |
| myprint(' Execution error: ' + str(e)) |
| myprint(qry) |
| |
| |
| # ------------------------------------------------------------------------------- |
| def getClassOidForRelfilenode(relfilenode): |
| qry = "SELECT oid FROM pg_class WHERE relfilenode = %d;" % (relfilenode) |
| try: |
| dburl = dbconn.DbURL(hostname=GV.opt['-h'], port=GV.opt['-p'], dbname=GV.dbname) |
| conn = dbconn.connect(dburl) |
| oid = dbconn.queryRow(conn, qry)[0] |
| return oid |
| except Exception as e: |
| setError(ERROR_NOREPAIR) |
| myprint(' Execution error: ' + str(e)) |
| |
| |
| # ------------------------------------------------------------------------------- |
| def getResourceTypeOid(oid): |
| qry = """ |
| SELECT oid |
| FROM ( |
| SELECT oid FROM pg_resourcetype WHERE restypid = %d |
| UNION ALL |
| SELECT oid FROM gp_dist_random('pg_resourcetype') |
| WHERE restypid = %d |
| ) alloids |
| GROUP BY oid ORDER BY count(*) desc LIMIT 1 |
| """ % (oid, oid) |
| |
| try: |
| db = connect() |
| curs = db.query(qry) |
| if len(curs.dictresult()) == 0: return 0 |
| return curs.dictresult().pop()['oid'] |
| except Exception as e: |
| setError(ERROR_NOREPAIR) |
| myprint(' Execution error: ' + str(e)) |
| |
| |
| def filterSpuriousFailures(catname, colnames, results): |
| if catname == 'pg_depend': |
| # Catalog tables having content only on coordinator |
| # (e.g. pg_statistic*) should be excluded from pg_depend |
| # comparisons between coordinator and segments. |
| legitFailures = [] |
| coordinatorOnlyOids = list(map(lambda cat: cat._oid, |
| filter(lambda cat: cat.isCoordinatorOnly(), |
| GV.catalog.getCatalogTables()))) |
| for row in results: |
| legit = True |
| if row[colnames.index('classid')] in coordinatorOnlyOids: |
| logger.info('excluding coordinator-only entry from missing_pg_depend: %s' % str(row)) |
| legit = False |
| if legit: |
| legitFailures.append(row) |
| else: |
| legitFailures = results |
| |
| return legitFailures |
| |
| |
| #------------------------------------------------------------------------------- |
| # Process results of checks (CC and FK) for a particular catalog: |
| # - Categorize entry into GPObject |
| # - Instantiate GPObject if needed |
| # ------------------------------------------------------------------------------- |
| def processMissingDuplicateEntryResult(catname, colname, allValues, type): |
| # type = {"missing" | "duplicate"} |
| ''' |
| colname: proname | pronamespace | proargtypes | segids |
| allValues: add | 2200 | 23 23 | {2,3} |
| scube | 2200 | 1700 | {-1,0,1,3} |
| scube_accum | 2200 | 1700 1700 | {-1,0,1,3} |
| |
| colname: oid | total | segids |
| allValues: 18853 | 2 | {-1,1} |
| 18853 | 3 | {0} |
| ''' |
| gpObjName = catname |
| gpColName = None |
| pknames = [i for i in colname[:-1]] # Everything except the last column |
| |
| # This catname may not be a GPObject (e.g. pg_attribute) |
| # If these catnames have oids, they are known to be inconsistent |
| if catname in TableMainColumn: |
| gpColName = TableMainColumn[catname][0] |
| gpObjName = TableMainColumn[catname][1] |
| elif 'oid' in pknames: |
| gpColName = 'oid' |
| |
| # Process entries |
| for row in allValues: |
| rowObjName = gpObjName |
| # pygresql 4 to 5 update means that this segid is now a list of int (i.e. [-1, 0, 1]) instead of a string like "{-1,0,1}" |
| segids = "{" + ",".join(str(i) for i in row[-1]) + "}" |
| pkeys = dict((pk, row[colname.index(pk)]) for pk in pknames) |
| |
| if gpColName != None: |
| oid = row[colname.index(gpColName)] |
| else: |
| oid = getOidFromPK(catname, pkeys) |
| pkeys = {'oid': oid} |
| |
| # Special case: constraint for domain, report pg_type.contypid |
| if catname == 'pg_constraint' and oid == 0: |
| oid = row[colname.index('contypid')] |
| rowObjName = 'pg_type' |
| |
| gpObj = getGPObject(oid, rowObjName) |
| |
| if type == "missing": |
| |
| # In how many segments (counting coordinator as segment -1) was the |
| # entry present? |
| # |
| # If it was present on half or more of the instances, then report the ones |
| # where it was *not* present as 'missing'. If it was present on half |
| # or fewer, then report the ones where it *was* present as 'extra'. |
| # Note that if it was present in exactly half of the segments, |
| # we will report it as both missing and extra. |
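| # For illustration: with max_content = 2 (4 instances), an entry seen only on |
| # segids {0,1} is reported both as 'extra' on {0,1} and as 'missing' on {-1,2}. |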
| |
| ids = [int(i) for i in segids[1:-1].split(',')] |
| |
| if len(ids) <= (GV.max_content + 2) / 2: |
| issue = CatMissingIssue(catname, pkeys, segids, 'extra') |
| gpObj.addMissingIssue(issue) |
| |
| if len(ids) >= (GV.max_content + 2) / 2: |
| allids = [int(i) for i in range(-1, GV.max_content+1)] |
| diffids = set(allids) - set(ids) |
| # convert the difference set back into a string that looks |
| # like a PostgreSQL array. |
| segids = ",".join(str(x) for x in diffids) |
| segids = "{%s}" % (segids) |
| issue = CatMissingIssue(catname, pkeys, segids, 'missing') |
| gpObj.addMissingIssue(issue) |
| |
| else: |
| assert (type == "duplicate") |
| issue = CatDuplicateIssue(catname, pkeys, segids, row[-2]) |
| gpObj.addDuplicateIssue(issue) |
| |
| |
| # ------------------------------------------------------------------------------- |
| def processInconsistentEntryResult(catname, pknames, colname, allValues): |
| # If the table does not have consistent OIDs, colname does not include an oid column |
| ''' |
| 17365 | test10 | 2200 | 17366 | 0 | 0 | 17366 | 0 | 0 | 0 | f | r | h | 3 | 0 | 0 | 0 | 0 | 0 | f | f | f | f | None | [0] -- row1 |
| 17365 | test10 | 2200 | 17366 | 0 | 0 | 0 | 0 | 0 | 0 | f | r | h | 2 | 0 | 0 | 0 | 0 | 0 | f | f | f | f | None | [None] -- row2 |
| |
| 176954 | test1 | 2200 | 176955 | 0 | 0 | 176956 | 0 | 0 | 0 | f | r | h | 4 | 0 | 0 | 0 | 0 | 0 | f | f | f | f | None | [-1] |
| 176954 | test1 | 2200 | 176955 | 0 | 0 | 176956 | 0 | 0 | 0 | f | r | h | 2 | 0 | 0 | 0 | 0 | 0 | f | f | f | f | None | [0,1] |
| 176954 | test1 | 2200 | 176955 | 0 | 0 | 176956 | 0 | 0 | 0 | f | r | h | 3 | 0 | 0 | 0 | 0 | 0 | f | f | f | f | None | [2,3] |
| ''' |
| |
| # Group allValues rows by its key (oid or primary key) into a dictionary: |
| groupedValues = {} |
| for row in allValues: |
| if 'oid' in colname: |
| keys = row[colname.index('oid')] |
| else: |
| keys = tuple((row[colname.index(n)]) for n in pknames) |
| if keys in groupedValues: |
| groupedValues[keys].append(row) |
| else: |
| groupedValues[keys] = [row] |
| |
| gpObjName = catname |
| gpColName = None |
| |
| # This catname may not be a GPObject (e.g. pg_attribute) |
| # If these catnames have oids, they are known to be inconsistent |
| if catname in TableMainColumn: |
| gpColName = TableMainColumn[catname][0] |
| gpObjName = TableMainColumn[catname][1] |
| elif 'oid' in colname: |
| gpColName = 'oid' |
| |
| for keys, values in groupedValues.items(): |
| rowObjName = gpObjName |
| pkeys = dict((n, values[0][colname.index(n)]) for n in pknames) |
| |
| if gpColName != None: |
| oid = values[0][colname.index(gpColName)] |
| else: |
| oid = getOidFromPK(catname, pkeys) |
| issue = CatInconsistentIssue(catname, colname, list(values)) |
| |
| # Special case: constraint for domain, report pg_type.contypid |
| if catname == 'pg_constraint' and oid == 0: |
| oid = values[0][colname.index('contypid')] |
| rowObjName = 'pg_type' |
| |
| gpObj = getGPObject(oid, rowObjName) |
| gpObj.addInconsistentIssue(issue) |
| |
| |
| # ------------------------------------------------------------------------------- |
| def processForeignKeyResult(catname, pkcatname, colname, allValues): |
| ''' |
| colname: pg_attribute_attrelid | pg_attribute_attname | missing_catalog | present_key | pg_class_oid | segids |
| allValues: None | None | pg_attribute | oid | 190683 | {-1} |
| or |
| allValues: 12344 | foo | pg_class | attrelid | 190683 | {-1} |
| ''' |
| |
| gpObjName = pkcatname |
| gpColName = colname[-2].rsplit('_', 1)[1] |
| fkeytab = catname |
| # The foreign keys are all entries except the last 4 |
| # NOTE: We can't assume the first few rows are the expected issues anymore... |
| fkeylist = [name.rsplit(catname+'_', 1)[1] for name in colname[:-4]] |
| |
| # This catname may not be a GPObject (e.g. pg_attribute) |
| # 1. pg_attrdef(adrelid) referencing pg_attribute(attrelid) |
| # 2. pg_rewrite(ev_class) referencing pg_attribute(attrelid) |
| if pkcatname in TableMainColumn: |
| if gpColName == TableMainColumn[catname][0]: |
| gpObjName = TableMainColumn[catname][1] |
| |
| # Process each row |
| for row in allValues: |
| rowObjName = gpObjName |
| segids = row[-1] |
| |
| oid = row[len(colname) - 2] |
| missing_cat = row[-4] |
| |
| # Build fkeys dict: used to find oid of pg_class |
| fkeys = dict((n, row[colname.index(catname + '_' + n)]) for n in fkeylist) |
| |
| # Must get third column from end since fkeylist length can vary |
| if not fkeys: |
| fkeys = row[-3] |
| |
| # We are flipping the values of the primary key and the foreign key |
| # depending on the missing catalog, since we want to print out the |
| # missing catalog at the front of the stderr |
| existing_cat = pkcatname |
| if missing_cat == pkcatname: |
| existing_cat = fkeytab |
| |
| missing_pkeys={gpColName: oid} |
| existing_fkeys=fkeys |
| if catname == missing_cat: |
| missing_pkeys=fkeys |
| existing_fkeys={gpColName: oid} |
| |
| issue = CatForeignKeyIssue(catname=missing_cat, pkeys=missing_pkeys, |
| segids=segids, fcatname=existing_cat, fkeys=existing_fkeys) |
| |
| # Special cases: |
| # 1. pg_class(oid) referencing pg_type(oid) - relation & composite type |
| # 2. pg_resqueuecapability(restypid) referencing pg_resourcetype(restypid) |
| if pkcatname == 'pg_type' and fkeytab == 'pg_class': |
| rowObjName = 'pg_class' |
| oid = getOidFromPK(rowObjName, fkeys) |
| elif pkcatname == 'pg_resourcetype' and gpColName == 'restypid': |
| oid = getResourceTypeOid(oid) |
| gpObj = getGPObject(oid, rowObjName) |
| gpObj.addForeignKeyIssue(issue) |
| |
| def getPrimaryKeyColumn(catname, pknames): |
| # Determine which column identifies the object to report (e.g. pg_pltemplate has no oid) |
| gpObjName = catname |
| gpColName = None |
| if catname in TableMainColumn: |
| gpColName = TableMainColumn[catname][0] |
| gpObjName = TableMainColumn[catname][1] |
| elif 'oid' in pknames: |
| gpColName = 'oid' |
| return gpObjName, gpColName |
| |
| # ------------------------------------------------------------------------------- |
| def processACLResult(catname, colname, allValues): |
| ''' |
| colname: segid | datname | coordinator_acl | segment_acl |
| allValues: 2 | acldb | {=Tc/nmiller,nmiller=CTc/nmiller,person1=C/nmiller} | {=Tc/nmiller,nmiller=CTc/nmiller} |
| 1 | gptest | None | {=Tc/nmiller,nmiller=CTc/nmiller,person2=CTc/nmiller} |
| ''' |
| |
| pknames = [i for i in colname[1:-2]] |
| |
| gpObjName, gpColName = getPrimaryKeyColumn(catname, pknames) |
| |
| # Process entries |
| for row in allValues: |
| segid = row[0] |
| pkeys = dict((pk, row[colname.index(pk)]) for pk in pknames) |
| |
| if gpColName != None: |
| oid = row[colname.index(gpColName)] |
| else: |
| oid = getOidFromPK(catname, pkeys) |
| pkeys = {'oid': oid} |
| |
| macl = row[-2] if row[-2] != None else [] |
| sacl = row[-1] if row[-1] != None else [] |
| macl_only = list(set(macl).difference(sacl)) |
| sacl_only = list(set(sacl).difference(macl)) |
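| # For illustration (hypothetical ACL items): macl = {u1=arwd/u1, u2=r/u1} and |
| # sacl = {u1=arwd/u1} yield macl_only = ['u2=r/u1'] and sacl_only = [] |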
| |
| issue = CatACLIssue(catname, pkeys, segid, macl_only, sacl_only) |
| gpObj = getGPObject(oid, gpObjName) |
| gpObj.addACLIssue(issue) |
| |
| |
| # ------------------------------------------------------------------------------- |
| class CatInconsistentIssue: |
| def __init__(self, catname, columns, rows): |
| |
| self.catname = catname |
| self.columns = columns |
| self.rows = rows |
| |
| def report(self): |
| |
| def format_segids(segids): |
| if segids == [None]: |
| idstr = 'all other segments' |
| else: |
| ids = segids |
| idstr = '' |
| for i in ids: |
| idstr += '%s (%s:%d) ' % \ |
| (GV.report_cfg[i]['segname'], GV.report_cfg[i]['hostname'], |
| GV.report_cfg[i]['port']) |
| return idstr |
| |
| for i in range(0, len(self.columns) - 1): |
| colset = set(str(j[i]) for j in self.rows) |
| if len(colset) > 1: |
| for row in self.rows: |
| myprint("%20s is '%s' on %s" % (self.columns[i], row[i], format_segids(row[-1]))) |
| |
| |
| # ------------------------------------------------------------------------------- |
| class CatForeignKeyIssue: |
| def __init__(self, catname, pkeys, segids, fcatname, fkeys): |
| |
| self.catname = catname |
| self.pkeys = pkeys # string of pkey=xxxx or oid=xxxx |
| self.segids = segids # list of affected segids |
| self.fcatname = fcatname # checked object catname |
| self.fkeys = fkeys # string of fkeys=xxxx |
| |
| def report(self): |
| |
| ids = self.segids |
| idstr = '' |
| if len(ids) == len(GV.report_cfg): |
| idstr = 'all segments' |
| else: |
| for i in ids: |
| idstr += '%s (%s:%d) ' % \ |
| (GV.report_cfg[i]['segname'], GV.report_cfg[i]['hostname'], |
| GV.report_cfg[i]['port']) |
| |
| myprint(" No %s %s entry for %s %s on %s" \ |
| % (self.catname, self.pkeys, self.fcatname, self.fkeys, idstr)) |
| |
| |
| # ------------------------------------------------------------------------------- |
| class CatMissingIssue: |
| def __init__(self, catname, pkeys, segids, type): |
| |
| self.catname = catname |
| self.pkeys = pkeys # string of pkey=xxxx or oid=xxxx |
| self.segids = segids # list of affected segids |
| self.type = type # missing or extra |
| assert (self.type in ['missing', 'extra']) |
| |
| def report(self): |
| |
| ids = [int(i) for i in self.segids[1:-1].split(',')] |
| idstr = '' |
| if len(ids) == len(GV.report_cfg): |
| idstr = 'all segments' |
| else: |
| for i in ids: |
| idstr += '%s (%s:%d) ' % \ |
| (GV.report_cfg[i]['segname'], GV.report_cfg[i]['hostname'], |
| GV.report_cfg[i]['port']) |
| |
| if self.catname == 'pg_attribute': |
| myprint(" %s column '%s' on %s" \ |
| % (self.type.capitalize(), self.pkeys['attname'], idstr)) |
| elif self.catname == 'pg_class': |
| myprint(' %s relation metadata for %s on %s' \ |
| % (self.type.capitalize(), str(self.pkeys), idstr)) |
| else: |
| myprint(' %s %s metadata of %s on %s' \ |
| % (self.type.capitalize(), self.catname[3:], str(self.pkeys), idstr)) |
| |
| |
| # ------------------------------------------------------------------------------- |
| class CatDuplicateIssue: |
| def __init__(self, catname, pkeys, segids, enum): |
| self.catname = catname |
| self.pkeys = pkeys # string of pkey=xxxx or oid=xxxx |
| self.segids = segids # list of affected segids |
| self.enum = enum # number of entries |
| |
| def report(self): |
| """ Found # catname entries of "oid=XXXXX" on segment {Y,Z} """ |
| myprint(' Found %d %s entries of %s on segment %s' \ |
| % (self.enum, self.catname, str(self.pkeys), str(self.segids))) |
| |
| |
| # ------------------------------------------------------------------------------- |
| class CatACLIssue: |
| def __init__(self, catname, pkeys, segid, macl_only, sacl_only): |
| |
| self.catname = catname |
| self.pkeys = pkeys # string of pkey=xxxx or oid=xxxx |
| self.segid = segid # the affected segid |
| self.macl_only = macl_only # acl appears only on coordinator |
| self.sacl_only = sacl_only # acl appears only on segment |
| |
| def report(self): |
| """ Coordinator (host:port) and seg# (host:port) have different ACL: |
| Exist(s) on coordinator only: [...........] |
| Exist(s) on seg# only: [...........] |
| """ |
| mstr = 'Coordinator (%s:%s)' % (GV.report_cfg[-1]['hostname'], GV.report_cfg[-1]['port']) |
| sstr = '%s (%s:%s)' % (GV.report_cfg[self.segid]['segname'], |
| GV.report_cfg[self.segid]['hostname'], |
| GV.report_cfg[self.segid]['port']) |
| myprint(' %s and %s have different ACLs:' % (mstr, sstr)) |
| if len(self.macl_only) > 0: |
| myprint(' Exist(s) on coordinator only: %s' % (self.macl_only)) |
| if len(self.sacl_only) > 0: |
| myprint(' Exist(s) on %s only: %s ' % \ |
| (GV.report_cfg[self.segid]['segname'], self.sacl_only)) |
| |
| # ------------------------------------------------------------------------------- |
| class CatUniqueIndexViolationIssue: |
| def __init__(self, table_name, index_name): |
| self.catname = table_name |
| self.index_name = index_name |
| |
| def report(self): |
| myprint( |
| ' Table %s has a violated unique index: %s' |
| % (self.catname, self.index_name) |
| ) |
| |
| class CatDependencyIssue: |
| def __init__(self, table_name, oid, content): |
| self.catname = table_name |
| self.oid = oid |
| self.content = content |
| |
| def report(self): |
| myprint( |
| ' Table %s has a dependency issue on oid %s at content %s' |
| % (self.catname, self.oid, self.content) |
| ) |
| |
| # ------------------------------------------------------------------------------- |
| class CatOrphanToastTableIssue: |
| def __init__(self, oid, catname, issue, segments): |
| self.oid = oid |
| self.catname = catname |
| self.issue = issue |
| self.segments = segments |
| |
| def report(self): |
| myprint('%s' % self.issue.description) |
| myprint('''On segment(s) %s table '%s' (oid: %s) %s''' % (', '.join(map(str, sorted(self.segments))), self.catname, self.oid, self.issue.cause)) |
| |
| # ------------------------------------------------------------------------------- |
| class GPObject: |
| def __init__(self, oid, catname): |
| self.oid = oid |
| self.catname = catname |
| self.missingIssues = {} # key=issue.catname, value=list of catMissingIssue |
| self.inconsistentIssues = {} # key=issue.catname, value=list of catInconsistentIssue |
| self.duplicateIssues = {} # key=issue.catname, value=list of catDuplicateIssue |
| self.aclIssues = {} # key=issue.catname, value=list of catACLIssue |
| self.foreignkeyIssues = {} # key=issue.catname, value=list of catForeignKeyIssue |
| self.dependencyIssues = {} # key=issue.catname, value=list of catDependencyIssue |
| self.orphanToastTableIssues = {} # key=issue.catname, value=list of orphanToastTableIssues |
| |
| def addDependencyIssue(self, issue): |
| if issue.catname in self.dependencyIssues: |
| self.dependencyIssues[issue.catname].append(issue) |
| else: |
| self.dependencyIssues[issue.catname] = [issue] |
| |
| def addMissingIssue(self, issue): |
| if issue.catname in self.missingIssues: |
| self.missingIssues[issue.catname].append(issue) |
| else: |
| self.missingIssues[issue.catname] = [issue] |
| |
| def addInconsistentIssue(self, issue): |
| if issue.catname in self.inconsistentIssues: |
| self.inconsistentIssues[issue.catname].append(issue) |
| else: |
| self.inconsistentIssues[issue.catname] = [issue] |
| |
| def addDuplicateIssue(self, issue): |
| if issue.catname in self.duplicateIssues: |
| self.duplicateIssues[issue.catname].append(issue) |
| else: |
| self.duplicateIssues[issue.catname] = [issue] |
| |
| def addACLIssue(self, issue): |
| if issue.catname in self.aclIssues: |
| self.aclIssues[issue.catname].append(issue) |
| else: |
| self.aclIssues[issue.catname] = [issue] |
| |
| def addForeignKeyIssue(self, issue): |
| if issue.catname in self.foreignkeyIssues: |
| self.foreignkeyIssues[issue.catname].append(issue) |
| else: |
| self.foreignkeyIssues[issue.catname] = [issue] |
| |
| def addOrphanToastTableIssue(self, issue): |
| if issue.catname in self.orphanToastTableIssues: |
| self.orphanToastTableIssues[issue.catname].append(issue) |
| else: |
| self.orphanToastTableIssues[issue.catname] = [issue] |
| |
| def isTopLevel(self): |
| return True |
| |
| def reportAllIssues(self): |
| """ |
| We want to prevent OIDs from certain catalog tables from appearing on |
| stdout. Hence we create a local wrapper for myprint which |
| logs those OID inconsistencies as WARNING instead of |
| CRITICAL so that they still appear in the log file. |
| """ |
| |
| def __myprint(string): |
| if self.catname in ['pg_constraint']: |
| _myprint(string, logging.WARNING) |
| else: |
| _myprint(string) |
| |
| global myprint |
| myprint = __myprint |
| |
| if self.__class__ == GPObject: |
| myprint('') |
| myprint('----------------------------------------------------') |
| myprint('Object oid: %s' % (self.oid)) |
| myprint('Table name: %s' % (self.catname)) |
| myprint('') |
| |
| # Report dependency issues |
| if len(self.dependencyIssues): |
| for catname, issues in self.dependencyIssues.items(): |
| myprint(' Name of test which found this issue: dependency_%s' % catname) |
| for each in issues: |
| each.report() |
| myprint('') |
| |
| # Report inconsistent issues |
| if len(self.inconsistentIssues): |
| for catname, issues in self.inconsistentIssues.items(): |
| myprint(' Name of test which found this issue: inconsistent_%s' % catname) |
| for each in issues: |
| each.report() |
| myprint('') |
| |
| # Report missing issues |
| if len(self.missingIssues): |
| omitlist = ['pg_attribute', 'pg_attribute_encoding', 'pg_type', 'pg_appendonly', 'pg_index', 'pg_password_history', 'pg_directory_table'] |
| if 'pg_class' in self.missingIssues: |
| myprint(' Name of test which found this issue: missing_extraneous_pg_class') |
| for name in omitlist: |
| if name in self.missingIssues: |
| myprint(' Name of test which found this issue: missing_extraneous_%s' % name) |
| for each in self.missingIssues['pg_class']: |
| each.report() |
| |
| for catname, issues in self.missingIssues.items(): |
| if catname != 'pg_class' and catname not in omitlist: |
| myprint(' Name of test which found this issue: missing_extraneous_%s' % catname) |
| for each in issues: |
| each.report() |
| else: |
| for catname, issues in self.missingIssues.items(): |
| myprint(' Name of test which found this issue: missing_extraneous_%s' % catname) |
| for each in issues: |
| each.report() |
| myprint('') |
| |
| |
| |
| # Report foreign key issues |
| if len(self.foreignkeyIssues): |
| for catname, issues in self.foreignkeyIssues.items(): |
| myprint(' Name of test which found this issue: foreign_key_%s' % catname) |
| for each in issues: |
| each.report() |
| myprint('') |
| |
| # Report duplicate issues |
| if len(self.duplicateIssues): |
| for catname, issues in self.duplicateIssues.items(): |
| myprint(' Name of test which found this issue: duplicate_%s' % catname) |
| for each in issues: |
| each.report() |
| myprint('') |
| |
| # Report ACL issues |
| if len(self.aclIssues): |
| for catname, issues in self.aclIssues.items(): |
| myprint(' Name of test which found this issue: acl_%s' % catname) |
| for each in issues: |
| each.report() |
| myprint('') |
| |
| # Report Orphan Toast Table issues |
| if len(self.orphanToastTableIssues): |
| for catname, issues in self.orphanToastTableIssues.items(): |
| for each in issues: |
| each.report() |
| myprint('') |
| |
| myprint = _myprint |
| |
| # Collect all tables with missing issues for later reporting |
| if len(self.missingIssues): |
| db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False) |
| oid_query = "select (select nspname from pg_namespace where oid=relnamespace) || '.' || relname from pg_class where oid=%d" |
| type_query = "select (select nspname from pg_namespace where oid=relnamespace) || '.' || relname from pg_class where reltype=%d" |
| for issues in self.missingIssues.values() : |
| for issue in issues: |
| # Get schemaname.tablename corresponding to oid |
| for key in issue.pkeys: |
| if 'relid' in key or key in ['ev_class', 'reloid']: |
| table_list = db.query(oid_query % issue.pkeys[key]).getresult() |
| if table_list: |
| if issue.type == 'missing': |
| GV.missing_attr_tables.append( (table_list[0][0], issue.segids) ) |
| else: |
| GV.extra_attr_tables.append( (table_list[0][0], issue.segids) ) |
| elif key == 'oid': |
| table_list = db.query(type_query % issue.pkeys[key]).getresult() |
| if table_list: |
| if issue.type == 'missing': |
| GV.missing_attr_tables.append( (table_list[0][0], issue.segids) ) |
| else: |
| GV.extra_attr_tables.append( (table_list[0][0], issue.segids) ) |
| |
| |
| def __cmp__(self, other): |
| if isinstance(other, GPObject): |
| return cmp((self.oid, self.catname), (other.oid, other.catname)) |
| else: |
| return NotImplemented |
| |
| def __hash__(self): |
| return hash((self.oid, self.catname)) |
| |
| |
| # ------------------------------------------------------------------------------- |
| class RelationObject(GPObject): |
| def __init__(self, oid, catname): |
| GPObject.__init__(self, oid, catname) |
| self.relname = None |
| self.nspname = None |
| self.relkind = None |
| self.paroid = None |
| |
| def reportAllIssues(self): |
| oid = 'N/A' if self.oid is None else str(self.oid) |
| nspname = 'N/A' if self.nspname is None else self.nspname |
| relname = 'N/A' if self.relname is None else self.relname |
| |
| if self.isTopLevel(): |
| myprint('') |
| myprint('----------------------------------------------------') |
| |
| objname = 'Type' if self.relkind == 'c' else 'Relation' |
| myprint('%s oid: %s' % (objname, oid)) |
| myprint('%s schema: %s' % (objname, nspname)) |
| myprint('%s name: %s' % (objname, relname)) |
| |
| else: |
| myprint(' Sub-object: ') |
| myprint(' ----------------------------------------------------') |
| myprint(' Relation oid: %s' % oid) |
| myprint(' Relation schema: %s' % nspname) |
| myprint(' Relation name: %s' % relname) |
| myprint('') |
| GPObject.reportAllIssues(self) |
| |
| def setRelInfo(self, relname, nspname, relkind, paroid): |
| self.relname = relname |
| self.nspname = nspname |
| self.relkind = relkind |
| self.paroid = paroid |
| |
| def isTopLevel(self): |
| if self.relkind == 'i' or self.relkind == 't' or self.relkind == 'o': |
| return False |
| return True |
| |
| |
| # ------------------------------------------------------------------------------- |
| def getRelInfo(objects): |
| # Get relinfo: relname, relkind, par oid for all oids |
| # get relname, relkind |
| # left outer join with union of |
| # - get par oid for toast |
| # - get par oid for aoco |
| # - get par oid for index |
| # for each query: if there is inconsistency, majority wins |
| |
| if objects == {}: return |
| |
| oids = [oid for (oid, catname) in objects if catname == 'pg_class'] |
| if oids == [] or None in oids: return |
| |
| qry = """ |
| SELECT oid, relname, nspname, relkind, paroid |
| FROM ( |
| SELECT oid, relname, nspname, relkind |
| FROM ( |
| SELECT oid, relname, nspname, relkind, rank() over (partition by oid order by count(*) desc) |
| FROM |
| ( |
| SELECT c.oid, c.relname, c.relkind, n.nspname |
| FROM pg_class c |
| left outer join pg_namespace n |
| on (c.relnamespace = n.oid) |
| WHERE c.oid in ({oids}) |
| UNION ALL |
| SELECT c.oid, c.relname, c.relkind, n.nspname |
| FROM gp_dist_random('pg_class') c |
| left outer join gp_dist_random('pg_namespace') n |
| on (c.relnamespace = n.oid and c.gp_segment_id = n.gp_segment_id) |
| WHERE c.oid in ({oids}) |
| ) allrelinfo |
| GROUP BY oid, relname, nspname, relkind |
| ) relinfo |
| WHERE rank=1 |
| ) relinfo |
| |
| LEFT OUTER JOIN |
| |
| ( |
| SELECT reltoastrelid as childoid, oid as paroid |
| FROM ( |
| SELECT reltoastrelid, oid, rank() over (partition by reltoastrelid order by count(*) desc) |
| FROM |
| ( SELECT reltoastrelid, oid FROM pg_class |
| WHERE reltoastrelid in ({oids}) |
| UNION ALL |
| SELECT reltoastrelid, oid FROM gp_dist_random('pg_class') |
| WHERE reltoastrelid in ({oids}) |
| ) allpar_toast |
| GROUP BY reltoastrelid, oid |
| ) par_toast |
| WHERE rank=1 |
| |
| UNION ALL |
| |
| SELECT segrelid as childoid, relid as paroid |
| FROM ( |
| SELECT segrelid, relid, rank() over (partition by segrelid order by count(*) desc) |
| FROM |
| ( SELECT segrelid, relid FROM pg_appendonly |
| WHERE segrelid in ({oids}) |
| UNION ALL |
| SELECT segrelid, relid FROM gp_dist_random('pg_appendonly') |
| WHERE segrelid in ({oids}) |
| ) allpar_aoco |
| GROUP BY segrelid, relid |
| ) par_aoco |
| WHERE rank=1 |
| |
| UNION ALL |
| |
| SELECT indexrelid as childoid, indrelid as paroid |
| FROM ( |
| SELECT indexrelid, indrelid, rank() over (partition by indexrelid order by count(*) desc) |
| FROM |
| ( SELECT indexrelid, indrelid FROM pg_index |
| WHERE indexrelid in ({oids}) |
| UNION ALL |
| SELECT indexrelid, indrelid FROM gp_dist_random('pg_index') |
| WHERE indexrelid in ({oids}) |
| ) allpar_index |
| GROUP BY indexrelid, indrelid |
| ) par_index |
| WHERE rank=1 |
| ) par ON childoid = oid |
| """.format(oids=','.join(map(str, oids))) |
| |
| try: |
| db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False) |
| curs = db.query(qry) |
| for row in curs.getresult(): |
| (oid, relname, nspname, relkind, paroid) = row |
| objects[oid, 'pg_class'].setRelInfo(relname, nspname, relkind, paroid) |
| |
| except Exception as e: |
| setError(ERROR_NOREPAIR) |
| myprint(' Execution error: ' + str(e)) |
| myprint(qry) |
| |
| |
| # ------------------------------------------------------------------------------- |
| def buildGraph(): |
| def buildGraphRecursive(objects, graph): |
| if objects == {}: return |
| getRelInfo(objects) |
| localObjects = {} |
| for (oid, catname), obj in objects.items(): |
| # Top level object, add to graph |
| if obj.isTopLevel(): |
| if obj not in graph: |
| graph[obj] = [] |
| # Not top level object, find parent, add to localObjects if needed |
| else: |
| parobj = objects.get((obj.paroid, catname), None) |
| if parobj is None: |
| parobj = localObjects.get((obj.paroid, catname), None) |
| if parobj is None: |
| parobj = RelationObject(obj.paroid, catname) |
| localObjects[(obj.paroid, catname)] = parobj |
| assert (parobj is not None) |
| # Add obj and parobj to graph |
| if parobj not in graph: |
| graph[parobj] = [] |
| if obj not in graph[parobj]: |
| graph[parobj].append(obj) |
| buildGraphRecursive(localObjects, graph) |
| |
| buildGraphRecursive(GPObjects, GPObjectGraph) |
| |
| |
| # ------------------------------------------------------------------------------- |
| def checkcatReport(): |
| def reportAllIssuesRecursive(par, graph): |
| par.reportAllIssues() |
| if par not in graph: |
| return |
| for child in graph[par]: |
| reportAllIssuesRecursive(child, graph) |
| |
| buildGraph() |
| reportedCheck = ['duplicate','missing_extraneous','inconsistent','foreign_key','acl', 'orphaned_toast_tables'] |
| myprint('') |
| myprint('SUMMARY REPORT: %s' % ('FAILED' if len(GV.failedChecks) else 'PASSED')) |
| myprint('===================================================================') |
| elapsed = datetime.timedelta(seconds=int(GV.elapsedTime)) |
| endTime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') |
| myprint('Completed %d test(s) on database \'%s\' at %s with elapsed time %s' \ |
| % (GV.totalCheckRun, GV.dbname, endTime, elapsed)) |
| |
| if len(GPObjectGraph) == 0 and len(GV.failedChecks) == 0: |
| myprint('Found no catalog issue\n') |
| assert(GV.retcode == SUCCESS) |
| return |
| |
| if len(GPObjectGraph) > 0: |
| total = 0 |
| for par in GPObjectGraph: |
| if par.isTopLevel() and par.catname not in ['pg_constraint']: total += 1 |
| if total == 0: |
| myprint('Found no catalog issue\n') |
| if par.catname in ['pg_constraint']: |
| myprint('Warnings were generated, see %s for detail\n' % gplog.get_logfile()) |
| else: |
| myprint('Found a total of %d issue(s)' % total) |
| |
| for par in GPObjectGraph: |
| if par.isTopLevel(): |
| reportAllIssuesRecursive(par, GPObjectGraph) |
| myprint('') |
| |
| # Report tables with missing attributes in a more usable format |
| if len(GV.missing_attr_tables) or len(GV.extra_attr_tables): |
| # Expand partition tables |
| db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False) |
| |
| GV.missing_attr_tables = list(set(GV.missing_attr_tables)) |
| if len(GV.missing_attr_tables) > 0: |
| myprint('----------------------------------------------------') |
| myprint(" Tables with missing issues:") |
| myprint(" Format [database name].[schema name].[table name].[segment id]:") |
| for table, segids in sorted(GV.missing_attr_tables): |
| for id in segids[1:-1].split(','): |
| myprint(" Table %s.%s.%s" % (GV.dbname, table, id)) |
| myprint('') |
| |
| GV.extra_attr_tables = list(set(GV.extra_attr_tables)) |
| if len(GV.extra_attr_tables) > 0: |
| myprint('----------------------------------------------------') |
| myprint(" Tables with extra issues:") |
| myprint(" Format [database name].[schema name].[table name].[segment id]:") |
| for table, segids in sorted(GV.extra_attr_tables): |
|             for segid in segids[1:-1].split(','):
|                 myprint("  Table %s.%s.%s" % (GV.dbname, table, segid))
| myprint('') |
| myprint('') |
| |
| notReported = set(GV.failedChecks).difference(reportedCheck) |
| if len(notReported) > 0: |
| myprint('Failed test(s) that are not reported here: %s' % (', '.join(notReported))) |
| myprint('See %s for detail\n' % gplog.get_logfile()) |
| |
| |
| def _myprint(msg, level=logging.CRITICAL):
|     gplog.log_literal(logger, level, msg)
| |
| |
| myprint = _myprint |
| |
| def generateVerifyFile(catname, fields, results, checkname): |
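|     '''
|     Write a gpcheckcat.verify.<dbname>.<catalog>.<check>.<timestamp>.sql
|     file containing a query that lists, on the coordinator and on every
|     segment (via gp_dist_random), the pg_class rows whose reltype matches
|     the OIDs reported by the check.  Failure to write the file is only
|     logged as a warning.
|     '''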
|     oid_idx = fields.index('oid')
|     in_clause = ','.join(str(row[oid_idx]) for row in results)
| verify_sql = ''' |
| SELECT * |
| FROM ( |
| SELECT relname, oid FROM pg_class WHERE reltype IN ({in_clause}) |
| UNION ALL |
| SELECT relname, oid FROM gp_dist_random('pg_class') WHERE reltype IN ({in_clause}) |
| ) alltyprelids |
| GROUP BY relname, oid ORDER BY count(*) desc |
| '''.format(in_clause=in_clause) |
| filename = 'gpcheckcat.verify.%s.%s.%s.%s.sql' % (GV.dbname, catname, checkname, GV.timestamp) |
| try: |
| with open(filename, 'w') as fp: |
| fp.write(verify_sql + '\n') |
| except Exception as e: |
| logger.warning('Unable to generate verify file for %s (%s)' % (catname, str(e))) |
| |
| def truncate_batch_size(primaries): |
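|     '''Cap the -B batch size at the number of primary segments.'''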
| if GV.opt['-B'] > primaries: |
| GV.opt['-B'] = primaries |
| myprint("Truncated batch size to number of primaries: %d" % primaries) |
| |
| def check_gpexpand(): |
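|     '''
|     Refuse to run while a gpexpand phase 1 is in progress (phase 2 is
|     allowed) and exit with status 1 in that case.
|     '''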
| check_result, msg = conflict_with_gpexpand("gpcheckcat", |
| refuse_phase1=True, |
| refuse_phase2=False) |
| if not check_result: |
| myprint(msg) |
| sys.exit(1) |
| |
| def check_test_subset_parameter_count(): |
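|     '''Exit with ERROR_NOREPAIR if more than one of -R, -s, -C was given.'''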
| test_option_count = 0 |
| if GV.opt['-R']: |
| test_option_count += 1 |
| if GV.opt['-s']: |
| test_option_count += 1 |
| if GV.opt['-C']: |
| test_option_count += 1 |
| if test_option_count > 1: |
| myprint("Error: multiple test subset options are selected. Please pass only one of [-R, -s, -C] if necessary.\n") |
| setError(ERROR_NOREPAIR) |
| sys.exit(GV.retcode) |
| return |
| |
| def check_test_names_parsed(tests): |
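|     '''
|     Return the subset of the given test names that exist in all_checks,
|     printing a message for every name that is not a valid test.
|     '''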
| correct_tests = [] |
| for t in tests: |
| if t not in all_checks: |
| myprint("'%s' is not a valid test" % t) |
| else: |
| correct_tests.append(t) |
| return correct_tests |
| |
| |
| def main(): |
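|     '''
|     Entry point: parse options, verify the environment (gpexpand state,
|     segment configuration), then run the selected checks against each
|     target database and print a per-database summary report.  Unless -S
|     was given, shared-table checks are skipped after the first database,
|     and the process exits with the worst error level seen across all
|     databases.
|     '''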
| parseCommandLine() |
| if GV.opt['-l']: |
| listAllChecks() |
| sys.exit(GV.retcode) |
| |
| GV.version = getversion() |
|     # Every server version this tool ships with supports gpcheckcat, so the
|     # version check below is left disabled.
| #if GV.version < "4.0": |
| # myprint("Error: only Apache Cloudberry version >= 4.0 are supported\n") |
| # sys.exit(GV.retcode) |
| |
| check_test_subset_parameter_count() |
| |
| # gpcheckcat should check gpexpand running status |
| check_gpexpand() |
| |
| GV.cfg = getGPConfiguration() |
| truncate_batch_size(len(GV.cfg.keys())) |
| |
| GV.report_cfg = getReportConfiguration() |
| GV.max_content = max([GV.cfg[dbid]['content'] for dbid in GV.cfg]) |
| |
| for dbid in GV.cfg: |
| if (GV.cfg[dbid]['content'] == -1): |
| GV.coordinator_dbid = dbid |
| break |
| |
| if GV.coordinator_dbid is None: |
| myprint("Error: coordinator configuration info not found in gp_segment_configuration\n") |
| setError(ERROR_NOREPAIR) |
| sys.exit(GV.retcode) |
| |
| GV.catalog = getCatalog() |
| leaked_schema_dropper = LeakedSchemaDropper() |
| |
| for dbname in GV.alldb: |
| |
| # Reset global variables |
| GV.retcode=SUCCESS |
| GV.reset_stmt_queues() |
| GPObjects.clear() |
| GPObjectGraph.clear() |
| GV.dbname = dbname |
| myprint('') |
| myprint("Connected as user \'%s\' to database '%s', port '%d', gpdb version '%s'" \ |
| % (GV.opt['-U'], GV.dbname, GV.report_cfg[-1]['port'], GV.version)) |
| myprint('-------------------------------------------------------------------') |
| myprint('Batch size: %s' % GV.opt['-B']) |
| |
| drop_leaked_schemas(leaked_schema_dropper, dbname) |
| |
| if GV.opt['-C']: |
| namestr = GV.opt['-C'] |
| try: |
| catalog_table_obj = getCatObj(namestr) |
| runCheckCatname(catalog_table_obj) |
|             except Exception as e:
|                 setError(ERROR_NOREPAIR)
|                 myprint('Error: ' + str(e))
|                 sys.exit(GV.retcode)
| else: |
| if GV.opt['-R']: |
| run_names = GV.opt['-R'] |
| run_names_array = [x.strip() for x in run_names.split(",")] |
| run_tests = check_test_names_parsed(run_names_array) |
| elif GV.opt['-s']: |
| skip_names = GV.opt['-s'] |
| skip_names_array = [x.strip() for x in skip_names.split(",")] |
| skip_tests = check_test_names_parsed(skip_names_array) |
| run_tests = [x for x in all_checks if x not in skip_tests] |
| else: |
| run_tests = all_checks.keys() |
| |
| runAllChecks(run_tests) |
| |
| checkcatReport() |
| setScriptRetCode(GV.retcode) |
| |
| # skip shared tables on subsequent passes |
| if not GV.opt['-S']: |
| GV.opt['-S'] = "none" |
| |
| sys.exit(GV.script_retcode) |
| |
| ############# |
| if __name__ == '__main__': |
| main() |