| #!/usr/bin/env python |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| ''' |
| Usage: gpcheckcat [<option>] [dbname] |
| |
| -? |
| -B parallel: number of worker threads |
| -g dir : generate SQL to rectify catalog corruption, put it in dir |
| -p port : DB port number |
| -P passwd : DB password |
| -U uname : DB User Name |
| -v : verbose |
| -A : all databases |
| -S option : shared table options (none, only) |
| -O : Online |
| -l : list all tests |
| -R test : run this particular test |
| -C catname : run cross consistency, FK and ACL tests for this catalog table |
| |
| ''' |
| import sys, os, subprocess, tempfile, stat, re |
| from threading import Thread |
| from datetime import datetime |
| from time import localtime, strftime |
| import time |
| import getopt |
| |
# Project and driver modules are imported under a guard so that a missing
# dependency produces a one-line error instead of a traceback.
try:
    from gppylib.db import dbconn
    from gppylib.gplog import *
    from gppylib.gpcatalog import *
    from gppylib.commands.unix import *
    from pygresql.pgdb import DatabaseError
    from pygresql import pg
except ImportError as e:
    sys.exit('Error: unable to import module: ' + str(e))
| |
# Default number of worker threads; Global.__init__ uses it as the initial
# value of the -B option.
parallelism = 8

# OID -> object name cache
oidmap = {}

#-------------------------------------------------------------------------------
# EXECNAME is this script's file name; it is handed to setup_tool_logging
# together with the local host name and the current user name.
EXECNAME = os.path.basename(__file__)
setup_tool_logging(EXECNAME, getLocalHostname(), getUserName())
very_quiet_stdout_logging()
logger = get_default_logger()
#-------------------------------------------------------------------------------
| #------------------------------------------------------------------------------- |
| |
| ################ |
def parse_int(val):
    """Convert val to an int, returning 0 when it cannot be converted.

    Malformed text raises ValueError inside int() and objects such as None
    raise TypeError; both cases yield 0 instead of propagating.
    """
    try:
        return int(val)
    except (TypeError, ValueError):
        return 0
| |
| ############################### |
def quote_value(name):
    """Return name wrapped in single quotes, with each embedded single
    quote doubled."""
    assert isinstance(name,str)
    escaped = name.replace("'", "''")
    return "'%s'" % escaped
| |
| |
# Error levels, ordered by severity (see setError).
SUCCESS = 0
ERROR_REMOVE = 1
ERROR_RESYNC = 2
ERROR_NOREPAIR = 3


def setError(level):
    '''
    Raise GV.retcode to the given level. A level that is not higher than
    the current GV.retcode leaves it unchanged.

    error level 0 => success
    error level 1 => error, with repair script removes objects
    error level 2 => error, with repair script that resynchronizes objects
    error level 3 => error, no repair script
    '''
    if level > GV.retcode:
        GV.retcode = level
| |
| |
| ############################### |
class Global():
    """Holder for command-line options and the state of the current run.

    A single instance is created at module level as GV.
    """

    def __init__(self):
        self.retcode = SUCCESS

        # Option table keyed by command-line switch.
        self.opt = {
            '-h': None,
            '-p': None,
            '-P': None,
            '-U': None,
            '-v': False,
            '-V': False,
            '-t': False,

            '-g': 'gpcheckcat.repair.' + strftime('%Y-%m-%d.%H.%M.%S', localtime()),
            '-B': parallelism,
            '-T': None,

            '-A': False,
            '-S': None,
            '-O': False,

            '-R': None,
            '-C': None,
            '-l': False,
        }

        self.cfg = None
        self.dbname = None
        self.firstdb = None
        self.alldb = []
        self.db = {}
        self.tmpdir = None

        self.reset_stmt_queues()

        self.home = os.environ.get('HOME')
        if self.home is None:
            self._missing_env('Error: $HOME must be set')
        self.user = os.environ.get('USER') or os.environ.get('LOGNAME')
        if self.user is None:
            self._missing_env('Error: either $USER or $LOGNAME must be set')

        self.catalog = None
        self.max_content = 0
        self.report_cfg = {}

    def _missing_env(self, message):
        """Print the module usage text and exit with message."""
        # The module-level usage() function is defined after the GV instance
        # is created, so calling it from __init__ would raise NameError.
        # Do the same work (print the module docstring, then exit) directly.
        print(__doc__)
        sys.exit(message)

    def reset_stmt_queues(self):
        """Reset the queued repair statements, check statuses and report
        counters to their initial values."""

        # dictionary of SQL statements. Key is dbid
        self.Remove = {}

        # dictionary of SQL statements. Key is dbid
        self.AdjustConname = {}

        # dictionary of SQL statements. Key is dbid
        self.DemoteConstraint = {}

        # dictionary of SQL statements. Key is dbid
        self.ReSync = {}

        # Indexes to rebuild
        self.Reindex = []

        # constraints which are actually unenforceable
        self.Constraints = []

        # ownership fixups
        self.Owners = []

        # drop leaked temporary schemas
        self.Schemas = []

        # fix distribution policies
        self.Policies = []

        self.missingEntryStatus = None
        self.inconsistentEntryStatus = None
        self.foreignKeyStatus = None
        self.aclStatus = None

        # the following variables are used for reporting purposes
        self.elapsedTime = 0
        self.totalTestRun = 0
        self.testStatus = True
        self.failedTest = []

GV = Global()
| |
| |
| |
| ############################### |
def usage(exitarg = None):
    """Print the module docstring (the usage text) and exit.

    exitarg is passed straight to sys.exit: an int becomes the exit status,
    a string is printed as an error message, None means success.
    """
    # Parenthesised single-argument form prints identically under the
    # Python 2 print statement and the Python 3 print function.
    print(__doc__)
    sys.exit(exitarg)
| |
| |
| ############################### |
| |
def getversion():
    """Return the server version string.

    Runs version() through the regexp_replace shown in the query and
    returns the first column of the first result row. The connection
    opened for the query is closed before returning.
    """
    db = connect()
    try:
        curs = db.query('''
select regexp_replace(version(),
E'.*PostgreSQL [^ ]+ .Greenplum Database ([1-9]+.[0-9]+|main).*',
E'\\\\1') as ver;''')

        version = curs.getresult()[0][0]
    finally:
        db.close()

    logger.debug('got version %s' % version)
    return version
| |
| ############################### |
def getalldbs():
    """
    get all connectable databases

    Returns the result rows of the pg_database query (one row per database,
    database name in the first column). The connection opened for the query
    is closed before returning.
    """
    db = connect()
    try:
        curs = db.query('''
select datname from pg_database where datallowconn order by datname ''')
        return curs.getresult()
    finally:
        db.close()
| |
| ############################### |
def parseCommandLine():
    """Parse sys.argv into GV.opt / GV.dbname / GV.alldb.

    Options that were not given fall back to environment variables
    (PGPASSWORD, PGUSER/USER, PGPORT, PGDATABASE). Invalid input ends the
    program through usage(). When -l is given, parsing stops right after
    the switches are recorded.
    """
    try:
        (options, args) = getopt.getopt(sys.argv[1:], '?p:P:U:B:vg:t:AOS:R:C:l')
    except getopt.GetoptError as e:
        usage('Error: ' + str(e))

    # NOTE(review): the getopt spec declares -t as taking an argument, but it
    # is stored below as a plain boolean and its value is discarded - confirm.
    for (switch, val) in options:
        if switch == '-?':
            usage(0)
        elif switch[1] in 'pBPUgSRC':
            GV.opt[switch] = val
        elif switch[1] in 'vtAOl':
            GV.opt[switch] = True

    def setdef(x, v):
        # Fill option x from environment variable v when x is still unset.
        if not GV.opt[x]:
            t = os.environ.get(v)
            GV.opt[x] = t
            if t:
                logger.debug('%s not specified; default to %s (from %s)' %
                             (x, t, v))

    if GV.opt['-l']:
        return

    if GV.opt['-v']:
        enable_verbose_logging()

    setdef('-P', 'PGPASSWORD')
    setdef('-U', 'PGUSER')
    setdef('-U', 'USER')
    setdef('-p', 'PGPORT')
    if not GV.opt['-p']:
        usage('Please specify -p port')
    GV.opt['-p'] = parse_int(GV.opt['-p'])

    GV.opt['-h'] = 'localhost'
    logger.debug('Set default hostname to %s' % GV.opt['-h'])

    if GV.opt['-R']:
        logger.debug('Run this test: %s' % GV.opt['-R'])

    if GV.opt['-C']:
        GV.opt['-C'] = GV.opt['-C'].lower()
        logger.debug('Run cross consistency test for table: %s' % GV.opt['-C'])

    if len(args) > 1:
        usage('Too many arguments')

    if len(args) == 0:
        GV.dbname = os.environ.get('PGDATABASE')
        if GV.dbname is None:
            GV.dbname = 'template1'
        logger.debug('database is %s' % GV.dbname)
    else:
        GV.dbname = args[0]

    GV.firstdb = GV.dbname
    GV.alldb.append(GV.firstdb)

    # build list of all connectable databases
    if GV.opt['-A']:
        alldb = getalldbs()
        GV.alldb.pop()
        for adbname in alldb:
            GV.alldb.append(adbname[0])

    if GV.opt['-S']:
        # Compare the whole value: an unanchored regex match would also
        # accept strings that merely start with "none" or "only".
        shared = GV.opt['-S'].lower()
        if shared not in ('none', 'only'):
            usage('Error: invalid value \'%s\' for shared table option. Legal values are (none, only)'% GV.opt['-S'])
        GV.opt['-S'] = shared

    logger.debug('master at host %s, port %s, user %s, database %s' %
                 (GV.opt['-h'], GV.opt['-p'], GV.opt['-U'], GV.dbname))

    try:
        GV.opt['-B'] = int(GV.opt['-B'])
    except (TypeError, ValueError) as e:
        usage('Error: ' + str(e))

    if GV.opt['-B'] < 1:
        usage('Error: parallelism must be 1 or greater')

    logger.debug('degree of parallelism: %s' % GV.opt['-B'])
| |
| ############# |
def connect(user=None, password=None, host=None, port=None,
            database=None, utilityMode=False):
    '''Connect to DB using parameters in GV

    Any argument left empty is taken from GV (-U, -P, -h, -p and GV.dbname).
    With utilityMode the session is opened with gp_session_role=utility.
    Returns the pg connection; logs a fatal message and exits with status 1
    when pg.connect raises pg.InternalError.
    '''
    options = '-c gp_session_role=utility' if utilityMode else None
    if not user: user = GV.opt['-U']
    if not password: password = GV.opt['-P']
    if not host: host = GV.opt['-h']
    if not port: port = GV.opt['-p']
    if not database: database = GV.dbname
    try:
        logger.debug('connecting to %s:%s %s' % (host, port, database))
        db = pg.connect(host=host, port=port, user=user,
                        passwd=password, dbname=database, opt=options)
    except pg.InternalError as ex:
        logger.fatal('could not connect to %s: "%s"' %
                     (database, str(ex).strip()))
        # sys.exit rather than the interactive-only exit() helper, which is
        # injected by the site module and is not guaranteed to exist.
        sys.exit(1)

    logger.debug('connected with %s:%s %s' % (host, port, database))
    return db
| |
| ############# |
def connect2(cfgrec, user=None, password=None, database=None, utilityMode=True):
    """Return a connection for the segment described by cfgrec, reusing a
    cached one when an identical request was made before.

    Connections are cached in GV.db under a key built from address, port,
    datadir, user, password, database and the utility-mode flag. Entries
    whose content is -1 are always connected without utility mode.
    """
    host, port, datadir = cfgrec['address'], cfgrec['port'], cfgrec['datadir']
    logger.debug('connect %s:%s:%s' % (host, port, datadir))

    if not database:
        database = GV.dbname
    if cfgrec['content'] == -1:
        utilityMode = False

    key = "%s.%s.%s.%s.%s.%s.%s" % (host, port, datadir, user, password,
                                    database, utilityMode)
    cached = GV.db.get(key)
    if cached:
        return cached[0]

    conn = connect(host=host, port=port, user=user, password=password,
                   database=database, utilityMode=utilityMode)
    if conn:
        GV.db[key] = [conn, cfgrec]
    return conn
| |
class execThread(Thread):
    """Thread that runs one query on one connection.

    After the thread has finished, curs holds the value returned by
    db.query(qry), or error holds the exception that query raised.
    cfg is kept only so the caller can tell which segment this was.
    """

    def __init__(self, cfg, db, qry):
        Thread.__init__(self)
        self.cfg = cfg
        self.db = db
        self.qry = qry
        self.curs = None
        self.error = None

    def run(self):
        try:
            self.curs = self.db.query(self.qry)
        except BaseException as e:
            # Nothing may escape the thread; the caller inspects .error.
            self.error = e
| |
| |
def processThread(threads):
    """Join every thread and collect the results of those that succeeded.

    Returns a list of [cfg, curs] pairs. For a thread whose error is set,
    the error level is raised to ERROR_NOREPAIR and the error is printed
    with the segment's hostname, port and datadir.
    """
    finished = []
    for worker in threads:
        logger.debug('waiting on thread %s' % worker.getName())
        worker.join()
        if not worker.error:
            finished.append([worker.cfg, worker.curs])
            continue

        setError(ERROR_NOREPAIR)
        segment = worker.cfg
        myprint("%s:%d:%s : %s" %
                (segment['hostname'],
                 segment['port'],
                 segment['datadir'],
                 str(worker.error)))
    return finished
| |
| |
| ############# |
def connect2run(qry, col=None):
    """Run qry on every entry of GV.cfg and return the rows found.

    Queries run in threads, at most GV.opt['-B'] at a time. The result is a
    list of [cfg, col, row] triples, one per result row; when col is not
    given it is taken from the field list of the first cursor processed.
    """
    logger.debug('%s' % qry)

    completed = []
    pending = []

    # parallelise queries
    for launched, dbid in enumerate(GV.cfg, 1):
        segment = GV.cfg[dbid]
        conn = connect2(segment)

        worker = execThread(segment, conn, qry)
        worker.start()
        logger.debug('launching query thread %s for dbid %i' %
                     (worker.getName(), dbid))
        pending.append(worker)

        # we don't want too much going on at once: drain a full batch
        if launched % GV.opt['-B'] == 0:
            completed.extend(processThread(pending))
            pending = []

    # process the rest of the threads
    completed.extend(processThread(pending))

    found = []
    for cfg, curs in completed:
        if col is None:
            col = curs.listfields()
        for row in curs.dictresult():
            found.append([cfg, col, row])

    return found
| |
def formatErr(c, col, row):
    """Format one error row for logging.

    The text starts with the segment location taken from c (hostname, port,
    datadir, content, dbid) and is followed by ", <name> <value>" for each
    column name in col, with the value looked up in row.
    """
    location = ('%s:%s:%s, content %s, dbid %s' %
                (c['hostname'], c['port'], c['datadir'],
                 c['content'], c['dbid']))
    fields = ''.join(', %s %s' % (name, row[name]) for name in col)
    return location + fields
| |
| ############# |
def getGPConfiguration():
    """Return the segment configuration as a dict keyed by dbid.

    Each value is the result row (content, definedprimary, dbid, isprimary,
    hostname, address, port, datadir). Rows with content -1 that are not
    primary (the standby master) are left out.
    """
    conn = connect()
    # note that in 4.0, sql commands cannot be run against the segment mirrors directly
    # so we filter out non-primary segment databases in the query
    qry = '''
SELECT content, preferred_role = 'p' as definedprimary,
dbid, role = 'p' as isprimary, hostname, address, port,
fselocation as datadir
FROM gp_segment_configuration JOIN pg_filespace_entry on (dbid = fsedbid)
WHERE fsefsoid = (select oid from pg_filespace where fsname='pg_system')
AND (role = 'p' or content < 0 )
'''
    segments = {}
    for seg in conn.query(qry).dictresult():
        is_standby_master = seg['content'] == -1 and seg['isprimary'] != 't'
        if not is_standby_master:
            segments[seg['dbid']] = seg
    conn.close()
    return segments
| |
def _checkConstraintQuery(db, qry, label):
    """Run one pg_constraint check query and queue a repair per row found.

    label names the check in the '[OK] ...' / '[FAIL] ...' log lines. Any
    exception is reported through myprint and raises the error level to
    ERROR_NOREPAIR instead of propagating.
    """
    cols = ('nspname', 'relname', 'constraint')
    try:
        err = [[GV.cfg[1], cols, row] for row in db.query(qry).dictresult()]

        if not err:
            logger.info('[OK] %s' % label)
            return

        GV.testStatus = False
        setError(ERROR_REMOVE)
        logger.info('[FAIL] %s' % label)
        logger.error('pg_constraint has %d issue(s)' % len(err))
        logger.error(qry)
        for item in err:
            logger.error(formatErr(item[0], item[1], item[2]))
        for item in err:
            cons = item[2]
            removeIndexConstraint(cons['nspname'], cons['relname'],
                                  cons['constraint'])
    except Exception as ex:
        setError(ERROR_NOREPAIR)
        myprint('[ERROR] executing test: checkDistribPolicy')
        myprint(' Execution error: ' + str(ex))


def checkDistribPolicy():
    """Check primary-key/unique constraints against distribution policies.

    Two queries are run on the connection for GV.cfg[1]
    (NOTE(review): dbid 1 is presumably the master - confirm):
      1. constraints on tables whose distribution policy has no columns;
      2. constraints whose key does not contain the distribution columns.
    Every row found is logged and passed to removeIndexConstraint.
    """
    logger.info('-----------------------------------')
    logger.info('Checking constraints on randomly distributed tables')
    qry = '''
select n.nspname, rel.relname, pk.conname as constraint
from pg_constraint pk
join pg_class rel on (pk.conrelid = rel.oid)
join pg_namespace n on (rel.relnamespace = n.oid)
join gp_distribution_policy d on (rel.oid = d.localoid)
where pk.contype in('p', 'u') and d.attrnums is null
'''

    db = connect2(GV.cfg[1])
    _checkConstraintQuery(db, qry, 'randomly distributed tables')

    logger.info('-----------------------------------')
    logger.info('Checking that unique constraints are only on distribution columns')

    # final part of the WHERE clause here is a little tricky: we want to make
    # sure that the set of distribution columns is a left subset of the
    # constraint.
    qry = '''
select n.nspname, rel.relname, pk.conname as constraint
from pg_constraint pk
join pg_class rel on (pk.conrelid = rel.oid)
join pg_namespace n on (rel.relnamespace = n.oid)
join gp_distribution_policy d on (rel.oid = d.localoid)
where pk.contype in ('p', 'u') and
(d.attrnums is null or not
d.attrnums operator(pg_catalog.<@) pk.conkey)
'''
    _checkConstraintQuery(db, qry, 'unique constraints')
| |
| |
| ############# |
def checkPartitionIntegrity():
    """Run three partition checks on a fresh connection.

      1. MPP-8558:  branch (non-leaf) partitions that contain rows.
      2. MPP-9283:  partitions or partitioned tables created "with oids".
      3. MPP-11120: child partitions whose distribution policy differs from
                    the parent's; each one is passed to distributeRandomly.

    Each check logs its own [OK]/[FAIL] line. An exception inside a check is
    reported through myprint, raises the error level to ERROR_NOREPAIR and
    does not stop the following checks. The connection is closed at the end.
    """
    #
    # MPP-8558: look for data in branch (not leaf) partitions
    #
    logger.info('-----------------------------------')
    logger.info('Checking pg_partition ...')
    err = []
    db = connect()
    qry = '''
select distinct
quote_ident(n.nspname) || '.' || quote_ident(c.relname) as parname
from pg_partition_rule r1
join pg_partition_rule r2 on (r1.oid = r2.parparentrule)
join pg_class c on (r1.parchildrelid = c.oid)
join pg_namespace n on (c.relnamespace = n.oid)
where r1.parchildrelid <> 0 and r2.parchildrelid <> 0
'''
    try:
        curs = db.query(qry)
        for row in curs.dictresult():
            # "only" restricts the count to rows stored in the branch itself
            qy2 = ('select count(*) as cc from (select 1 from only %s limit(2)) q'
                   % (row['parname']))

            curs2 = db.query(qy2)
            for row2 in curs2.dictresult():
                if row2['cc'] == 0:
                    continue
                err.append([row['parname']])

        if not err:
            logger.info('[OK] pg_partition branch integrity')
        else:
            GV.testStatus = False
            setError(ERROR_NOREPAIR)
            logger.info('[FAIL] pg_partition branch integrity')
            logger.error('pg_partition has %d issue(s)' % len(err))
            logger.error('Discovered row data in branch partitions')
            for item in err:
                logger.error(' partition table name: %s' % (item[0]))

    except Exception as ex:
        setError(ERROR_NOREPAIR)
        myprint('[ERROR] executing test: checkPartitionIntegrity')
        myprint(' Execution error: ' + str(ex))

    # MPP-9283: look for partitions created "with oids".
    qry = '''
select n1.nspname, pc.relname, pc.oid
from pg_class pc, pg_partition_rule pr, pg_namespace n1
where pr.parchildrelid = pc.oid and pc.relnamespace = n1.oid and pc.relhasoids
union
select n1.nspname, pc.relname, pc.oid
from pg_class pc, pg_partition pp, pg_namespace n1
where pp.parrelid = pc.oid and pc.relnamespace = n1.oid and pc.relhasoids
'''
    try:
        curs = db.query(qry)
        err = []
        for row in curs.dictresult():
            err.append([GV.cfg[1], ('nspname', 'relname', 'oid'), row])

        if not err:
            logger.info('[OK] partition with oids check')
        else:
            GV.testStatus = False
            setError(ERROR_NOREPAIR)
            logger.info('[FAIL] partition with oids check')
            logger.error('partition with oids check found %d issue(s)' % len(err))
            logger.error(qry)
            # report at most 100 rows, for brevity
            for item in err[0:100]:
                row = item[2]
                logger.error(" table %s.%s oid %s" %
                             (row['nspname'], row['relname'], row['oid']))
            if len(err) > 100:
                logger.error("...")
    except Exception as ex:
        setError(ERROR_NOREPAIR)
        myprint('[ERROR] executing test: checkPartitionIntegrity')
        myprint(' Execution error: ' + str(ex))

    # TODO: add repair script


    # MPP-11120: check for child partitions with different
    # distribution policies. The complexity of this query is due to
    # having to account for the possibility of partitions with "holes"
    # which would make similar distribution policies appear different
    # in gp_distribution_policy. This is handled by unnesting both
    # distribution policies and comparing them by attribute name.
    qry = '''
select
parrelid,
parchildrelid,
d1.attrnums as parpolicy,
d2.attrnums as parchildpolicy
from
(
select distinct
coalesce(p1.parrelid, p2.parrelid)::regclass as parrelid,
coalesce(p1.parchildrelid, p2.parchildrelid)::regclass as parchildrelid
from
(
select parrelid, parchildrelid, g1.attname, g1.index
from pg_partition_rule pr
join pg_partition p on (pr.paroid = p.oid)
join (
select localoid, attname, index
from (
select
localoid,
unnest(attrnums) as offset,
generate_series(1, array_upper(attrnums, 1)) as index
from
gp_distribution_policy
) d
join pg_attribute a on (a.attrelid = d.localoid and a.attnum = d.offset)
) g1 on (parrelid = g1.localoid)
where parchildrelid != 0
) p1
full outer join
(
select parrelid, parchildrelid, g2.attname, g2.index
from pg_partition_rule pr
join pg_partition p on (pr.paroid = p.oid)
join (
select localoid, attname, index
from (
select
localoid,
unnest(attrnums) as offset,
generate_series(1, array_upper(attrnums, 1)) as index
from
gp_distribution_policy
) d
join pg_attribute a on (a.attrelid = d.localoid and a.attnum = d.offset)
) g2 on (parchildrelid = g2.localoid)
where parchildrelid != 0
) p2
on (p1.parrelid = p2.parrelid and
p1.parchildrelid = p2.parchildrelid and
p1.index = p2.index)
where p1.attname is distinct from p2.attname
) p
join gp_distribution_policy d1 on (d1.localoid = p.parrelid)
join gp_distribution_policy d2 on (d2.localoid = p.parchildrelid)
where d2.attrnums is not null
'''
    try:
        curs = db.query(qry)
        cols = ('parrelid', 'parchildrelid', 'parpolicy', 'parchildpolicy')

        err = []
        for row in curs.dictresult():
            err.append([GV.cfg[1], cols, row])

        if not err:
            logger.info('[OK] partition distribution policy check')
        else:
            GV.testStatus = False
            setError(ERROR_REMOVE)
            logger.info('[FAIL] partition distribution policy check')
            logger.error('partition distribution policy check found %d issue(s)' % len(err))
            if len(err) > 100:
                logger.error(qry)

            for index, item in enumerate(err):
                col = item[1]
                row = item[2]

                # generate repair script for this row (always, even when
                # the row itself is no longer reported)
                distributeRandomly(row['parchildrelid'])

                # report at most 100 rows, for brevity: the 101st row
                # prints an ellipsis, later rows print nothing
                if index > 100:
                    continue
                if index == 100:
                    logger.error("...")
                    continue

                if index == 0:
                    logger.error("--------")
                    logger.error(" " + " | ".join(map(str, col)))
                    logger.error(" " + "-+-".join(['-'*len(x) for x in col]))

                logger.error(" " + " | ".join([str(row[x]) for x in col]))

    except Exception as ex:
        setError(ERROR_NOREPAIR)
        myprint('[ERROR] executing test: checkPartitionIntegrity')
        myprint(' Execution error: ' + str(ex))


    db.close()
| |
| ############# |
| # A dictionary of SQL statements for checkPartitionRegularity() |
| |
| partitionRegularityChecks = { |
| # Query: irreg_user_constraint |
| # |
| # A row represents an irregular user-defined constraint. |
| # |
| # This view just filters ptable_user_con_info so that it returns no rows |
| # if no irregular user-defined constraints exist. |
| # |
| # An irregular user-constraint is a constraint whose definition (DDL expression |
| # format) appears on the root table of a partitioned table, and doesn't appear |
| # on every part table of the partitioned table. It is possible to construct |
| # these using sequences of DDL operations on release prior to Rio. In Rio, |
| # it is possible to construct them with FOREIGN KEY constraints, but not with |
| # other constraint types. |
| # |
| # tableid: pg_class.oid of the partitioned table |
| # conname: name of the constraint on the partitioned table |
| # contype: type of constraint (same on part and table) |
| # condef: specification of constraint (same on part and table) |
| # numexpected: number of constraint occurrences expected (incl table) |
| # numactual: number of constraint occurrences observed (incl table) |
| 'irreg_user_constraint' : |
| """select |
| tableid, |
| tableconname, |
| contype, |
| condef, |
| numexpected, |
| numactual |
| from |
| ( |
| select |
| c.tableid, |
| c.tableconname, |
| c.contype, |
| c.condef, |
| i.tableparts + 1, |
| count(c.partid), |
| count(distinct c.partconname) |
| from |
| ( |
| select |
| u.tableid, |
| u.conname, |
| u.contype, |
| u.condef, |
| p.partid, |
| p.conid, |
| coalesce(p.conname) |
| from |
| ( |
| select |
| p.tableid, |
| c.conname, |
| c.contype, |
| pg_get_constraintdef(c.oid) as condef |
| from |
| ( |
| select |
| parrelid::regclass, |
| max(parlevel)+1 |
| from |
| pg_partition |
| group by parrelid |
| ) as p(tableid, tabledepth) , |
| pg_constraint c |
| where |
| p.tableid = c.conrelid |
| ) as u(tableid, conname, contype, condef) |
| join |
| ( |
| select |
| x.tableid::regclass as tableid, |
| c.conrelid::regclass as partid, |
| c.oid as conid, |
| c.conname, |
| c.contype, |
| pg_get_constraintdef(c.oid) as condef |
| from |
| pg_constraint c, |
| ( |
| select |
| tableid, |
| tabledepth, |
| tableid::regclass partid, |
| 0 as partdepth, |
| 0 as partordinal, |
| 'r'::char as partstatus |
| from |
| ( |
| select |
| parrelid::regclass, |
| max(parlevel)+1 |
| from |
| pg_partition |
| group by parrelid |
| ) as ptable(tableid, tabledepth) |
| union all |
| select |
| parrelid::regclass as tableid, |
| t.tabledepth as tabledepth, |
| r.parchildrelid::regclass partid, |
| p.parlevel + 1 as partdepth, |
| r.parruleord as partordinal, |
| case |
| when t.tabledepth = p.parlevel + 1 then 'l'::char |
| else 'i'::char |
| end as partstatus |
| from |
| pg_partition p, |
| pg_partition_rule r, |
| ( |
| select |
| parrelid::regclass, |
| max(parlevel)+1 |
| from |
| pg_partition |
| group by parrelid |
| ) as t(tableid, tabledepth) |
| where |
| p.oid = r.paroid |
| and not p.paristemplate |
| and p.parrelid = t.tableid |
| ) as x(tableid, tabledepth, partid, partdepth, partordinal, partstatus) |
| where |
| x.partid = c.conrelid |
| ) as p(tableid, partid, conid, conname, contype, condef) |
| on ( |
| u.tableid = p.tableid and |
| u.contype = p.contype and |
| u.condef = p.condef |
| ) |
| ) as c(tableid, tableconname, contype, condef, partid, partconid, partconname) , |
| ( |
| select |
| t.tableid, |
| t.tabledepth, |
| n.nparts as tableparts, |
| r.nspname, |
| r.relname |
| from |
| ( |
| select |
| parrelid::regclass, |
| max(parlevel)+1 |
| from |
| pg_partition |
| group by parrelid |
| ) as t(tableid, tabledepth) , |
| ( |
| select |
| c.oid::regclass as relid, |
| n.nspname, |
| c.relname, |
| c.relkind |
| from |
| pg_class c, |
| pg_namespace n |
| where |
| c.relnamespace = n.oid |
| ) as r(relid, nspname, relname, relkind) , |
| ( |
| select tableid, count(*) |
| from ( |
| select |
| t.tableid::regclass, |
| p.parchildrelid::regclass as partid |
| from |
| ( |
| select pg_partition.parrelid, pg_partition.oid |
| from pg_partition |
| where not pg_partition.paristemplate |
| ) t(tableid, partid), |
| pg_partition_rule p |
| where p.paroid = t.partid |
| ) as contains_part(tableid, partid) |
| group by tableid |
| ) n(tableid, nparts) |
| where |
| t.tableid = r.relid and |
| t.tableid = n.tableid |
| ) as i(tableid, tabledepth, tableparts, nspname, relname) |
| where |
| c.tableid = i.tableid |
| group by |
| c.tableid, |
| c.tableconname, |
| c.contype, |
| c.condef, |
| i.tableparts |
| ) as ptable_user_con_info(tableid, tableconname, contype, condef, numexpected, numactual, numnames) |
| where |
| contype != 'f' and -- no FK consistency in Rio |
| numexpected != numactual""", |
| |
| # Query: irreg_sys_constraint |
| # |
| # A row represents an irregular system-defined constraint. |
| # |
| # An irregular system constraint is a constraint whose definition (DDL |
| # expression format) appears on some part tables and that is not a system- |
| # defined partition constraint. |
| # |
| # This view doesn't actually test this. Instead, it checks for an |
| # expected pattern of system-defined constraints. |
| # 1. The constraints checked are those that don't appear on the root. |
| # 2. Each part has as many constraints as its depth in the partition |
| # less any default parts on its path. |
| # Violators are listed as results. |
| # |
| # tableid: pg_class.oid of the partitioned table |
| # partid: pg_class.oid of the (constrained) part table |
| # expected: number of constraint occurrences expected |
| # actual: number of constraint occurrences observed |
| 'irreg_sys_constraint' : |
| """select |
| coalesce(x.partid, a.partid) as partid, |
| x.tableid, |
| x.expected, |
| coalesce(a.actual, 0) as actual |
| from |
| ( |
| {expected} |
| ) x |
| full join |
| ( |
| {actual} |
| ) a |
| on (x.partid = a.partid) |
| where |
| x.expected != a.actual or |
| x.expected is null or |
| x.expected != coalesce(a.actual,0);""", |
| |
| |
| # Query based on view: unenforced_constraint_info |
| # |
| # a row represents an unenforced unique constraint to be fixed. |
| # |
| # tableid |
| # partid |
| # indexid |
| # partoid |
| # indexoid |
| # contype |
| # conname |
| # refobjid |
| |
| 'unenforced_constraint_info' : |
| """select |
| c.tableid, |
| c.partid, |
| b.indexid, |
| c.partid::int as partoid, |
| b.indexid::int as indexoid, |
| b.contype, |
| b.conname, |
| b.refobjid |
| from |
| ( |
| select |
| k.tableid, |
| k.distkey, |
| k.partkey, |
| c.conid, |
| c.conname, |
| c.contype, |
| i.indrelid::regclass, |
| i.indkey::smallint[] |
| from |
| pg_index i, |
| ( |
| select |
| k.tableid, |
| d.attrcnt, |
| d.attrnums, |
| sum(k.partkeylen), |
| array_agg(k.partkey[g.g]) |
| from |
| ( |
| select |
| p.parrelid::regclass, |
| t.tabledepth, |
| p.parlevel, |
| p.parnatts, |
| p.paratts |
| from |
| pg_partition p, |
| ( |
| select |
| parrelid::regclass, |
| max(parlevel)+1 |
| from |
| pg_partition |
| group by parrelid |
| ) as t(tableid, tabledepth) |
| where |
| p.parrelid = t.tableid and |
| p.paristemplate is false |
| ) as k(tableid, tabledepth, partdepth, partkeylen, partkey) , |
| generate_series(0, (select max(partkeylen) from ( |
| select |
| p.parrelid::regclass, |
| t.tabledepth, |
| p.parlevel, |
| p.parnatts, |
| p.paratts |
| from |
| pg_partition p, |
| ( |
| select |
| parrelid::regclass, |
| max(parlevel)+1 |
| from |
| pg_partition |
| group by parrelid |
| ) as t(tableid, tabledepth) |
| where |
| p.parrelid = t.tableid and |
| p.paristemplate is false |
| ) as partition_level(tableid, tabledepth, partdepth, partkeylen, partkey) )-1) g(g), |
| ( |
| select |
| localoid::regclass, |
| attrnums, |
| coalesce(1+array_upper(attrnums,1)-array_lower(attrnums,1),0) |
| from |
| gp_distribution_policy |
| ) as d(relid, attrnums, attrcnt) |
| where |
| g.g < k.partkeylen and |
| k.tableid = d.relid |
| group by |
| k.tableid, |
| d.attrcnt, |
| d.attrnums |
| ) as k(tableid, distkeylen, distkey, partkeylen, partkey) , |
| ( |
| select |
| relid, |
| conid, |
| conname, |
| contype, |
| indexid |
| from |
| ( |
| select |
| r.oid::regclass as relid, |
| c.oid as conid, |
| c.conname, |
| c.contype, |
| c.consrc, |
| pg_get_constraintdef(c.oid) as condef, |
| d.objid::regclass as indexid |
| from |
| ( |
| pg_class r |
| join |
| pg_constraint c |
| on |
| r.oid = c.conrelid |
| and r.relkind = 'r' |
| ) |
| left join |
| pg_depend d |
| on |
| d.refobjid = c.oid |
| and d.classid = 'pg_class'::regclass |
| and d.refclassid = 'pg_constraint'::regclass |
| and d.deptype = 'i' |
| ) as relconstraint(relid, conid, conname, contype, consrc, condef, indexid) |
| where |
| indexid is not null |
| or contype in ('p', 'u') |
| ) as c(relid, conid, conname, contype, indexid) |
| where |
| c.relid = k.tableid and |
| i.indrelid = k.tableid and |
| i.indexrelid = c.indexid and |
| not i.indkey::smallint[] @> array_cat(k.distkey, k.partkey) |
| ) as x(tableid, distkey, partkey, conid, conname, contype, indrelid, indkey) , |
| ( |
| select |
| tableid, |
| tabledepth, |
| tableid::regclass partid, |
| 0 as partdepth, |
| 0 as partordinal, |
| 'r'::char as partstatus |
| from |
| ( |
| select |
| parrelid::regclass, |
| max(parlevel)+1 |
| from |
| pg_partition |
| group by parrelid |
| ) as ptable(tableid, tabledepth) |
| union all |
| select |
| parrelid::regclass as tableid, |
| t.tabledepth as tabledepth, |
| r.parchildrelid::regclass partid, |
| p.parlevel + 1 as partdepth, |
| r.parruleord as partordinal, |
| case |
| when t.tabledepth = p.parlevel + 1 then 'l'::char |
| else 'i'::char |
| end as partstatus |
| from |
| pg_partition p, |
| pg_partition_rule r, |
| ( |
| select |
| parrelid::regclass, |
| max(parlevel)+1 |
| from |
| pg_partition |
| group by parrelid |
| ) as t(tableid, tabledepth) |
| where |
| p.oid = r.paroid |
| and not p.paristemplate |
| and p.parrelid = t.tableid |
| ) as c(tableid, tabledepth, partid, partdepth, partordinal, partstatus) , |
| ( |
| select |
| r.oid::regclass as relid, |
| i.oid::regclass as indexid, |
| c.oid as conid, |
| c.contype, |
| c.conname, |
| pg_get_constraintdef(c.oid) as condef, |
| d.classid, |
| d.objid, |
| d.objsubid, |
| d.refclassid, |
| d.refobjid, |
| d.refobjsubid, |
| d.deptype |
| from |
| pg_constraint c, |
| pg_class i, |
| pg_depend d, |
| pg_index x, |
| pg_class r |
| where |
| d.classid = 'pg_class'::regclass and |
| d.objid = i.oid and |
| d.objsubid = 0 and |
| d.refclassid = 'pg_constraint'::regclass and |
| d.refobjid = c.oid and |
| d.refobjsubid = 0 and |
| d.deptype = 'i' and |
| i.relkind = 'i' and |
| i.oid = x.indexrelid and |
| r.oid = x.indrelid |
| ) as b(relid, indexid, conid, contype, conname, condef, classid, objid, objsubid, refclassid, refobjid, refobjsubid, deptype) |
| where |
| x.tableid = c.tableid and |
| c.partid = b.relid and |
| b.classid = 'pg_class'::regclass and |
| b.objsubid = 0 and |
| b.refclassid = 'pg_constraint'::regclass and |
| b.refobjsubid = 0 and |
| b.deptype = 'i'""", |
| |
| # Repair sequence for issues identified by unenforced_constraint_info. |
| # Must substitute values from a row of that query, before running. |
| |
| 'unenforced_constraint_repairs' : |
| """--Demote type-{contype} constraint {conname} on part {partid} |
| --of partitioned table {tableid} to simple unique index. |
| |
| --Disconnect index {indexid} from constraint {conname} |
| delete |
| from |
| pg_depend |
| where |
| (classid, objid, objsubid, refclassid, refobjid, refobjsubid, deptype) = |
| ( |
| select |
| d.classid, |
| d.objid, |
| d.objsubid, |
| d.refclassid, |
| d.refobjid, |
| d.refobjsubid, |
| d.deptype |
| from |
| pg_index i, |
| pg_depend d |
| where |
| i.indexrelid = {indexoid} and |
| d.classid = 'pg_class'::regclass and |
| d.objid = i.indexrelid and |
| d.objsubid = 0 and |
| d.refclassid = 'pg_constraint'::regclass and |
| d.refobjid = ( |
| select oid |
| from pg_constraint |
| where |
| conrelid = {partoid} and |
| conname = {quoted_conname} and |
| contypid = 0 and |
| contype = {quoted_contype} |
| ) and |
| d.refobjsubid = 0 and |
| d.deptype = 'i' |
| ); |
| --Replace dependency of constraint {conname} on table columns of {partid} |
| --with dependency of index {indexoid} on same. |
| update |
| pg_depend |
| set |
| classid = 'pg_class'::regclass, |
| objid = {indexoid} |
| where |
| classid = 'pg_constraint'::regclass and |
| objid = ( |
| select oid |
| from pg_constraint |
| where |
| conrelid = {partoid} and |
| conname = {quoted_conname} and |
| contypid = 0 and |
| contype = {quoted_contype} |
| ) and |
| objsubid = 0 and |
| refclassid = 'pg_class'::regclass and |
| refobjid = {partoid} and |
| refobjsubid <> 0 and |
| deptype = 'a'; |
| --Delete constraint {conname} leaving its index {indexid} |
| delete |
| from |
| pg_constraint |
| where |
| conrelid = {partoid} and |
| conname = {quoted_conname} and |
| contypid = 0 and |
| contype = {quoted_contype};""", |
| |
| # Query: fix_ill_named_constraint |
| # |
| # a row represents the data for a fixup of an ill-named constraint |
| # |
| # tableid: pg_class.oid of the partitioned table as regclass |
| # tableconname: pg_constraint.conname on the partitioned table |
| # partid: pg_class.oid of the part table as regclass |
| # partrelid: pg_class.oid of the part table as int |
| # partconname: pg_constraint.conname on the part or partitioned table |
| |
| 'fix_ill_named_constraint' : |
| """select |
| tableid, |
| tableconname, |
| partid, |
| partid::int as partrelid, |
| partconname |
| from ( |
| select |
| u.tableid, |
| u.conname, |
| u.contype, |
| u.condef, |
| p.partid, |
| p.conid, |
| coalesce(p.conname) |
| from |
| ( |
| select |
| p.tableid, |
| c.conname, |
| c.contype, |
| pg_get_constraintdef(c.oid) as condef |
| from |
| ( |
| select |
| parrelid::regclass, |
| max(parlevel)+1 |
| from |
| pg_partition |
| group by parrelid |
| ) as p(tableid, tabledepth) , |
| pg_constraint c |
| where |
| p.tableid = c.conrelid |
| ) as u(tableid, conname, contype, condef) |
| join |
| ( |
| select |
| x.tableid::regclass as tableid, |
| c.conrelid::regclass as partid, |
| c.oid as conid, |
| c.conname, |
| c.contype, |
| pg_get_constraintdef(c.oid) as condef |
| from |
| pg_constraint c, |
| ( |
| select |
| tableid, |
| tabledepth, |
| tableid::regclass partid, |
| 0 as partdepth, |
| 0 as partordinal, |
| 'r'::char as partstatus |
| from |
| ( |
| select |
| parrelid::regclass, |
| max(parlevel)+1 |
| from |
| pg_partition |
| group by parrelid |
| ) as ptable(tableid, tabledepth) |
| union all |
| select |
| parrelid::regclass as tableid, |
| t.tabledepth as tabledepth, |
| r.parchildrelid::regclass partid, |
| p.parlevel + 1 as partdepth, |
| r.parruleord as partordinal, |
| case |
| when t.tabledepth = p.parlevel + 1 then 'l'::char |
| else 'i'::char |
| end as partstatus |
| from |
| pg_partition p, |
| pg_partition_rule r, |
| ( |
| select |
| parrelid::regclass, |
| max(parlevel)+1 |
| from |
| pg_partition |
| group by parrelid |
| ) as t(tableid, tabledepth) |
| where |
| p.oid = r.paroid |
| and not p.paristemplate |
| and p.parrelid = t.tableid |
| ) as x(tableid, tabledepth, partid, partdepth, partordinal, partstatus) |
| where |
| x.partid = c.conrelid |
| ) as p(tableid, partid, conid, conname, contype, condef) |
| on ( |
| u.tableid = p.tableid and |
| u.contype = p.contype and |
| u.condef = p.condef |
| ) |
| ) as part_user_constraint(tableid, tableconname, contype, condef, partid, partconid, partconname) |
| where partconname != tableconname""", |
| |
| # Query based on view: part_sys_actual |
| # tableid |
| # partid |
| # actual observed number of system-constraints |
| |
| "actual_part_con_query" : |
| """select |
| tableid, |
| partid, |
| count(*) as actual |
| from ( |
| select |
| tableid, |
| partid, |
| conid, |
| conname, |
| contype, |
| condef |
| from ( |
| select |
| x.tableid::regclass as tableid, |
| c.conrelid::regclass as partid, |
| c.oid as conid, |
| c.conname, |
| c.contype, |
| pg_get_constraintdef(c.oid) as condef |
| from |
| pg_constraint c, |
| ( |
| select |
| tableid, |
| tabledepth, |
| tableid::regclass partid, |
| 0 as partdepth, |
| 0 as partordinal, |
| 'r'::char as partstatus |
| from |
| ( |
| select |
| parrelid::regclass, |
| max(parlevel)+1 |
| from |
| pg_partition |
| group by parrelid |
| ) as ptable(tableid, tabledepth) |
| union all |
| select |
| parrelid::regclass as tableid, |
| t.tabledepth as tabledepth, |
| r.parchildrelid::regclass partid, |
| p.parlevel + 1 as partdepth, |
| r.parruleord as partordinal, |
| case |
| when t.tabledepth = p.parlevel + 1 then 'l'::char |
| else 'i'::char |
| end as partstatus |
| from |
| pg_partition p, |
| pg_partition_rule r, |
| ( |
| select |
| parrelid::regclass, |
| max(parlevel)+1 |
| from |
| pg_partition |
| group by parrelid |
| ) as t(tableid, tabledepth) |
| where |
| p.oid = r.paroid |
| and not p.paristemplate |
| and p.parrelid = t.tableid |
| ) as x(tableid, tabledepth, partid, partdepth, partordinal, partstatus) |
| where |
| x.partid = c.conrelid |
| ) as part_constraint(tableid, partid, conid, conname, contype, condef) |
| where |
| (partid, condef) not in |
| ( |
| select partid, condef |
| from ( |
| select |
| u.tableid, |
| u.conname, |
| u.contype, |
| u.condef, |
| p.partid, |
| p.conid, |
| coalesce(p.conname) |
| from |
| ( |
| select |
| p.tableid, |
| c.conname, |
| c.contype, |
| pg_get_constraintdef(c.oid) as condef |
| from |
| ( |
| select |
| parrelid::regclass, |
| max(parlevel)+1 |
| from |
| pg_partition |
| group by parrelid |
| ) as p(tableid, tabledepth) , |
| pg_constraint c |
| where |
| p.tableid = c.conrelid |
| ) as u(tableid, conname, contype, condef) |
| join |
| ( |
| select |
| x.tableid::regclass as tableid, |
| c.conrelid::regclass as partid, |
| c.oid as conid, |
| c.conname, |
| c.contype, |
| pg_get_constraintdef(c.oid) as condef |
| from |
| pg_constraint c, |
| ( |
| select |
| tableid, |
| tabledepth, |
| tableid::regclass partid, |
| 0 as partdepth, |
| 0 as partordinal, |
| 'r'::char as partstatus |
| from |
| ( |
| select |
| parrelid::regclass, |
| max(parlevel)+1 |
| from |
| pg_partition |
| group by parrelid |
| ) as ptable(tableid, tabledepth) |
| union all |
| select |
| parrelid::regclass as tableid, |
| t.tabledepth as tabledepth, |
| r.parchildrelid::regclass partid, |
| p.parlevel + 1 as partdepth, |
| r.parruleord as partordinal, |
| case |
| when t.tabledepth = p.parlevel + 1 then 'l'::char |
| else 'i'::char |
| end as partstatus |
| from |
| pg_partition p, |
| pg_partition_rule r, |
| ( |
| select |
| parrelid::regclass, |
| max(parlevel)+1 |
| from |
| pg_partition |
| group by parrelid |
| ) as t(tableid, tabledepth) |
| where |
| p.oid = r.paroid |
| and not p.paristemplate |
| and p.parrelid = t.tableid |
| ) as x(tableid, tabledepth, partid, partdepth, partordinal, partstatus) |
| where |
| x.partid = c.conrelid |
| ) as p(tableid, partid, conid, conname, contype, condef) |
| on ( |
| u.tableid = p.tableid and |
| u.contype = p.contype and |
| u.condef = p.condef |
| ) |
| ) as part_user_constraint(tableid, tableconname, contype, condef, partid, partconid, partconname) |
| ) |
| ) as part_sys_constraint(tableid, partid, conid, conname, contype, condef) |
| group by tableid, partid""", |
| |
| # Query contains_directly has a row per step along a path from |
| # root to leaf. The columns are |
| # tableid - pg_class.oid of the partitioned table. |
| # parentid - pg_class.oid of the "from" table. |
| # childid - pg_class.oid of the "to" table, the target of the step. |
| # npcon - constraint count for this step: 0 if "to" is default, else 1. |
| # |
| |
| 'contains_directly' : |
| """( |
| select |
| r.parrelid::regclass as tableid, |
| coalesce(p.parchildrelid, r.parrelid)::regclass as parentid, |
| c.parchildrelid::regclass as childid, |
| case when c.parisdefault then 0 else 1 end as npcon |
| from |
| pg_partition r, |
| pg_partition_rule c |
| left join pg_partition_rule p on (c.parparentrule = p.oid) |
| where |
| not r.paristemplate and |
| not c.parchildrelid = 0 and |
| c.paroid = r.oid |
| ) {alias}""", |
| |
| # The base case is the length 1 path from root or branch to branch |
| # or leaf. This is always the first union term. |
| |
| 'base_query' : |
| """select |
| start.tableid, |
| start.parentid, |
| start.childid, |
| start.npcon |
| from |
| {contains_directly_start}""", |
| |
| # A union term is an N-way self join of contains_directly and |
| # identifies paths of length N. Note that the length of a path |
| # to a part at depth N+1 is N. |
| |
| 'union_term' : |
| """select |
| start.tableid, |
| start.parentid, |
| stop.childid, |
| start.npcon -- stop.npcon |
| from |
| {contains_directly_start}, |
| {contains_directly_stop}{intermediate_path_from} |
| where |
| start.childid {intermediate_path_where} |
| = stop.parentid""", |
| |
| # |
| |
| 'union_query' : |
| """select tableid, childid as partid, sum(npcon) as expected |
| from ( |
| {open_union} |
| ) r |
| group by tableid, childid""" |
| |
| } |
| |
| ############# |
| |
| # For partitionRegularityChecks. |
def make_union_term(depth):
    """Build one union term of the expected-partition-constraint query.

    The term joins the 'contains_directly' subquery under the aliases
    'start' and 'stop' plus depth-1 intermediate aliases path1 ..
    path<depth-1>, chaining each step's childid to the next step's
    parentid.

    depth -- number of joins in the chain; must be greater than zero.

    Returns the formatted 'union_term' SQL text.
    """
    assert depth > 0

    term_template = partitionRegularityChecks['union_term']
    step_template = partitionRegularityChecks['contains_directly']

    hop_aliases = ["path%d" % hop for hop in range(1, depth)]

    # A leading empty element makes the join() below emit a separator
    # in front of the first intermediate step as well.
    lead = [''] if depth > 1 else []

    from_parts = lead + [step_template.format(alias=name)
                         for name in hop_aliases]
    where_parts = lead + [" = %s.parentid and %s.childid" % (name, name)
                          for name in hop_aliases]

    return term_template.format(
        contains_directly_start=step_template.format(alias='start'),
        contains_directly_stop=step_template.format(alias='stop'),
        intermediate_path_from=",\n ".join(from_parts),
        intermediate_path_where="\n".join(where_parts))
| |
| # For partitionRegularityChecks. |
def expected_part_con_query(maxdepth):
    """Assemble the query giving the expected constraint count per part.

    The 'base_query' term comes first, followed by one term from
    make_union_term() for every depth in 1..maxdepth; the terms are
    glued together with "union all" and wrapped in 'union_query'.

    maxdepth -- highest depth for which a union term is generated.

    Returns the SQL text.
    """
    checks = partitionRegularityChecks
    start_step = checks['contains_directly'].format(alias='start')

    union_terms = [checks['base_query'].format(
        contains_directly_start=start_step)]
    union_terms.extend(make_union_term(level)
                       for level in range(1, maxdepth + 1))

    combined = '\n\nunion all\n\n'.join(union_terms)
    return checks['union_query'].format(open_union=combined)
| |
| # For partitionRegularityChecks. |
def make_irregular_sys_con_query(maxdepth):
    """Fill the 'irreg_sys_constraint' template for the given depth.

    The template's {expected} slot receives expected_part_con_query(maxdepth)
    and its {actual} slot receives the 'actual_part_con_query' text.

    Returns the formatted SQL text.
    """
    expected_sql = expected_part_con_query(maxdepth)
    actual_sql = partitionRegularityChecks['actual_part_con_query']
    template = partitionRegularityChecks['irreg_sys_constraint']
    return template.format(expected=expected_sql, actual=actual_sql)
| |
| |
| ############# |
def _partreg_fail(testname, issue_count, error_code):
    """Mark a partition regularity test as failed and log its header lines."""
    GV.testStatus = False
    setError(error_code)
    logger.info('[FAIL] %s' % testname)
    logger.error(' %s test has %d issue(s)' % (testname, issue_count))


def _partreg_exec_error(testname, exc):
    """Report that a partition regularity test could not be executed."""
    setError(ERROR_NOREPAIR)
    myprint('[ERROR] executing test: %s' % testname)
    myprint(' Execution error: ' + str(exc))


def _partreg_check_user_constraints(db):
    """Run the user-defined constraint regularity test; return True if irregular."""
    testname = 'user-defined constraint regularity on partitioned tables'
    qry = partitionRegularityChecks['irreg_user_constraint']
    irregular = False

    try:
        err = [(row['tableconname'], row['tableid'],
                row['numactual'], row['numexpected'])
               for row in db.query(qry).dictresult()]

        if not err:
            logger.info('[OK] %s' % testname)
        else:
            irregular = True
            _partreg_fail(testname, len(err), ERROR_NOREPAIR)
            efmt = ' Constraint "%s" of partitioned table "%s" occurs %d times, %d expected.'
            for issue in err:
                logger.error(efmt % issue)

    except Exception as e:
        _partreg_exec_error(testname, e)

    return irregular


def _partreg_check_sys_constraints(db):
    """Run the system-defined constraint regularity test; return True if irregular."""
    testname = 'system-defined partition constraint regularity on partitioned tables'
    irregular = False

    try:
        # Determine maximum number of partitioning levels in this database
        qry = """select max(parlevel)+1 as maxdepth from pg_partition;"""
        depth_rows = db.query(qry).dictresult()
        assert len(depth_rows) == 1  # scalar aggregation!
        maxdepth = depth_rows[0]['maxdepth']

        err = []
        # The detailed query is only built when maxdepth came back as an int.
        if isinstance(maxdepth, int):
            qry = make_irregular_sys_con_query(maxdepth)
            for row in db.query(qry).dictresult():
                err.append((row['partid'], row['tableid'],
                            row['actual'], row['expected']))

        if not err:
            logger.info('[OK] %s' % testname)
        else:
            irregular = True
            _partreg_fail(testname, len(err), ERROR_NOREPAIR)
            efmt = ' Part table "%s" of partitioned table "%s" has %d system-defined constraints, %d expected.'
            for issue in err:
                logger.error(efmt % issue)

    except Exception as e:
        _partreg_exec_error(testname, e)

    return irregular


def _partreg_check_unenforced(db, testname):
    """Find unenforced unique/primary key constraints and queue their demotion."""
    qry = partitionRegularityChecks['unenforced_constraint_info']
    try:
        err = []
        for row in db.query(qry).dictresult():
            # Extra keys consumed by the message and repair templates below.
            row['con_type_phrase'] = 'primary key' if row['contype'] == 'p' else 'unique'
            row['quoted_conname'] = quote_value(row['conname'])
            row['quoted_contype'] = quote_value(row['contype'])
            err.append(row)

        if not err:
            logger.info('[OK] %s' % testname)
            return

        _partreg_fail(testname, len(err), ERROR_REMOVE)
        efmt = ' partitioned table "{tableid}" has unenforced {con_type_phrase} constraint: "{partid}"'
        repair_fmt = partitionRegularityChecks['unenforced_constraint_repairs']
        for position, issue in enumerate(err):
            # Only the first 100 issues are logged; every issue gets a repair.
            if position < 100:
                logger.error(efmt.format(**issue))
            repair_sequence = repair_fmt.format(**issue)
            for dbid in GV.cfg:
                addDemoteConstraint(dbid, repair_sequence)
        if len(err) > 100:
            logger.error("...")

    except Exception as e:
        _partreg_exec_error(testname, e)


def _partreg_check_constraint_names(db, testname):
    """Find part constraints named differently from the parent's and queue renames."""
    qry = partitionRegularityChecks['fix_ill_named_constraint']
    try:
        err = list(db.query(qry).dictresult())

        if not err:
            logger.info('[OK] %s' % testname)
            return

        _partreg_fail(testname, len(err), ERROR_REMOVE)
        efmt = ' part table "%s" has constraint named "%s"; should be "%s"'
        for position, issue in enumerate(err):
            # Only the first 100 issues are logged; every issue gets a repair.
            if position < 100:
                # part name, wrong constraint name, right constraint name
                logger.error(efmt % (issue['partid'], issue['partconname'],
                                     issue['tableconname']))
            for dbid in GV.cfg:
                buildAdjustConname(
                    dbid, issue['tableid'], int(issue['partrelid']),
                    quote_value(issue['partconname']),
                    quote_value(issue['tableconname']))
        if len(err) > 100:
            logger.error("...")

    except Exception as e:
        _partreg_exec_error(testname, e)


def checkPartitionRegularity():
    """Run the pg_partition/pg_constraint regularity checks.

    Rio partitioning changes introduce new regularity requirements for
    constraints on partitioned tables.  These are the checks.

    Four tests run in order on one connection obtained from connect():
    user-defined constraint regularity, system-defined constraint
    regularity, unenforced unique constraints, and inconsistently named
    constraints.  The last two are skipped when an earlier test found
    irregularities or when removals (and, for the last one, constraint
    demotions) are already pending in GV.

    Nothing is run unless GV.missingEntryStatus, GV.inconsistentEntryStatus
    and GV.foreignKeyStatus are all true.  Returns None.
    """
    okayToRun = GV.missingEntryStatus and GV.inconsistentEntryStatus and GV.foreignKeyStatus

    logger.info('-----------------------------------')
    logger.info('Checking pg_partition/pg_constraint regularity ...')

    if not okayToRun:
        logger.warn('unable to run pg_partition/pg_constraint regularity checks')
        logger.warn('prerequisite tests failed or not run')
        return

    db = connect()
    try:
        irregular = _partreg_check_user_constraints(db)
        if _partreg_check_sys_constraints(db):
            irregular = True

        testname = 'unenforced unique constraints on partitioned tables'
        if irregular:
            logger.warn('skipping test %s due to constraint irregularities' % testname)
        elif GV.Remove:
            logger.warn('skipping test %s due to required removals' % testname)
        else:
            _partreg_check_unenforced(db, testname)

        testname = 'inconsistently named constraint in partitioned table'
        if irregular:
            logger.warn('skipping test %s due to constraint irregularities' % testname)
        elif GV.Remove:
            logger.warn('skipping test %s due to required removals' % testname)
        elif GV.DemoteConstraint:
            logger.warn('skipping test %s due to unenforced constraint removals' % testname)
        else:
            _partreg_check_constraint_names(db, testname)
    finally:
        # Release the connection even if something unexpected escapes.
        db.close()
| |
| ############# |
def checkPGClass():
    """Report pg_class rows that have no matching pg_attribute row.

    The query left-outer-joins pg_class to pg_attribute and keeps the rows
    where no attribute matched.  It is executed through connect2run(); any
    result marks the test as failed with ERROR_NOREPAIR and at most the
    first 100 entries are logged.
    """
    logger.info('-----------------------------------')
    logger.info('Checking pg_class ...')
    qry = '''
    SELECT relname, relkind, tc.oid as oid,
    reltoastrelid, reltoastidxid
    FROM pg_class tc left outer join
    pg_attribute ta on (tc.oid = ta.attrelid)
    WHERE ta.attrelid is NULL
    '''
    issues = connect2run(qry, ('relname', 'relkind', 'oid'))

    if not issues:
        logger.info('[OK] pg_class')
        return

    GV.testStatus = False
    setError(ERROR_NOREPAIR)
    logger.info('[FAIL] pg_class')
    logger.error('pg_class has %d issue(s)' % len(issues))
    logger.error(qry)
    # Cap the detailed listing at 100 entries.
    for issue in issues[:100]:
        logger.error(formatErr(issue[0], issue[1], issue[2]))
    if len(issues) > 100:
        logger.error("...")
| |
| |
| ############# |
def checkPGNamespace():
    """Check pg_namespace for leaked temp schemas and missing schemas.

    Part one queries for pg_temp_<n> schemas (on the segments through
    gp_dist_random and locally) whose session id has no row in
    pg_stat_activity.  Each one found is appended to GV.Schemas and the
    test fails with ERROR_REMOVE.

    Part two runs, through connect2run(), a query for pg_class, pg_type,
    pg_operator and pg_proc entries whose namespace oid has no
    pg_namespace row; any result fails the test with ERROR_NOREPAIR.
    """
    logger.info('-----------------------------------')
    logger.info('Checking for leaked temporary schemas')
    db = connect2(GV.cfg[1], utilityMode=False)

    # The simpler form of this query that pushed the union into the
    # inner select does not run correctly on 3.2.x
    qry = '''
    SELECT distinct nspname as schema
    FROM (
    SELECT nspname, replace(nspname, 'pg_temp_','')::int as sess_id
    FROM gp_dist_random('pg_namespace')
    WHERE nspname ~ '^pg_temp_[0-9]+'
    ) n LEFT OUTER JOIN pg_stat_activity x using (sess_id)
    WHERE x.sess_id is null
    UNION
    SELECT nspname as schema
    FROM (
    SELECT nspname, replace(nspname, 'pg_temp_','')::int as sess_id
    FROM pg_namespace
    WHERE nspname ~ '^pg_temp_[0-9]+'
    ) n LEFT OUTER JOIN pg_stat_activity x using (sess_id)
    WHERE x.sess_id is null
    '''
    try:
        curs = db.query(qry)
        if curs.ntuples() == 0:
            logger.info('[OK] temporary schemas')
        else:
            GV.testStatus = False
            setError(ERROR_REMOVE)
            logger.info('[FAIL] temporary schemas')
            logger.error('found %d unbound temporary schemas' % curs.ntuples())
            for row in curs.getresult():
                logger.error(" ... %s" % row[0])
                # Remember the schema name in GV.Schemas.
                GV.Schemas.append(row[0])
    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint('[ERROR] executing test: checkPGNamespace')
        myprint(' Execution error: ' + str(e))

    # Check for objects in various catalogs that are in a schema that has
    # been dropped.
    logger.info('Checking missing schema definitions ...')
    qry = '''
    SELECT o.catalog, o.nsp
    FROM pg_namespace n right outer join
    (select 'pg_class' as catalog, relnamespace as nsp from pg_class
    union
    select 'pg_type' as catalog, typnamespace as nsp from pg_type
    union
    select 'pg_operator' as catalog, oprnamespace as nsp from pg_operator
    union
    select 'pg_proc' as catalog,pronamespace as nsp from pg_proc) o on
    (n.oid = o.nsp)
    WHERE n.oid is NULL
    '''
    err = connect2run(qry, ('catalog', 'nsp'))
    if not err:
        logger.info('[OK] missing schema definitions')
    else:
        GV.testStatus = False
        setError(ERROR_NOREPAIR)
        # Spelling fixed ("definitons") so the FAIL line matches the OK line.
        logger.info('[FAIL] missing schema definitions')
        logger.error('found %d references to non-existent schemas' % len(err))
        logger.error(qry)
        for e in err:
            logger.error(formatErr(e[0], e[1], e[2]))
| |
| ############# |
| ''' |
| Produce repair scripts to remove dangling entries of gp_fastsequence: |
| - one file per segment dbid |
| - contains DELETE statements to remove catalog entry |
| ''' |
def removeFastSequence(db):
    '''
    MPP-14758: gp_fastsequence does not get cleanup after a failed transaction (AO/CO)
    Note: this is slightly different from the normal foreign key check
    because it stretches the cross reference across segments.
    This makes it safe in the event of cross consistency issues with pg_class,
    but may not repair some issues when there are cross consistency problems

    db: connection object; its query() result must offer dictresult().

    For every gp_fastsequence objid (local or from the segments) with no
    pg_class row anywhere, buildRemove() is called for the dbid of the
    primary segment holding it.  Any exception is logged, not raised.
    '''
    try:
        qry = """
        SELECT dbid, objid
        FROM gp_segment_configuration AS cfg JOIN
        (SELECT gp_segment_id, objid
        FROM (select gp_segment_id, objid from gp_fastsequence
        union
        select gp_segment_id, objid from gp_dist_random('gp_fastsequence')) AS fs
        LEFT OUTER JOIN
        (select oid from pg_class
        union
        select oid from gp_dist_random('pg_class')) AS c
        ON (fs.objid = c.oid)
        WHERE c.oid IS NULL) AS r
        ON r.gp_segment_id = cfg.content
        WHERE cfg.role = 'p';
        """
        for row in db.query(qry).dictresult():
            buildRemove(
                row['dbid'],                # dbid of targeted segment
                'gp_fastsequence tuple',    # for comment purposes
                'gp_fastsequence',          # table name
                {'objid': row['objid']},    # column name and value
                'gp_fastsequence')          # for comment purposes
    except Exception as e:
        logger.error('removeFastSequence: ' + str(e))
| |
| |
| ############# |
| |
def removeIndexConstraint(nspname, relname, constraint):
    """Queue an ALTER TABLE ... DROP CONSTRAINT ... CASCADE statement.

    nspname, relname, constraint -- schema, table and constraint names.

    The statement is appended to GV.Constraints.  Each name is emitted as a
    double-quoted SQL identifier; embedded double quotes are doubled so a
    name containing '"' cannot terminate the identifier early.
    """
    def quoted(identifier):
        # '%s' formatting first, so non-string values behave as before.
        return '"%s"' % ('%s' % identifier).replace('"', '""')

    GV.Constraints.append('ALTER TABLE %s.%s DROP CONSTRAINT %s CASCADE;' %
                          (quoted(nspname), quoted(relname), quoted(constraint)))
| |
def distributeRandomly(rel):
    """Append an ALTER TABLE <rel> SET DISTRIBUTED RANDOMLY statement to GV.Policies.

    rel is interpolated as given, without any quoting.
    """
    statement = 'ALTER TABLE %s SET DISTRIBUTED RANDOMLY;' % rel
    GV.Policies.append(statement)
| |
def buildRemove(seg, name, table, cols, objname):
    """Build a DELETE repair statement and queue it through addRemove().

    seg     -- key under which addRemove() files the statement
    name    -- description used in the leading SQL comment
    table   -- table to delete from
    cols    -- mapping of column name to value; the conditions are OR-ed
    objname -- object name used in the leading SQL comment

    The comment header is built once, from the first column iterated (its
    value is formatted with %i).  Nothing is queued when cols is empty,
    since the DELETE would have an empty WHERE clause.
    """
    if not cols:
        return

    header = None
    conditions = []
    for col in cols:
        value = cols[col]
        if header is None:
            header = '--Object Name: %s\n--Remove %s for %i\n' % \
                     (objname, name, value)
        conditions.append('%s = \'%s\'' % (col, value))

    statement = header + 'DELETE FROM %s WHERE %s;' % \
        (table, ' or '.join(conditions))
    addRemove(seg, statement)
| |
def addRemove(seg, line):
    """Append line to the list stored under seg in GV.Remove.

    The list is created on first use.  dict.setdefault replaces the
    deprecated dict.has_key test.
    """
    GV.Remove.setdefault(seg, []).append(line)
| |
def buildAdjustConname(seg, relname, relid, oldconname, newconname):
    """Queue an UPDATE on pg_constraint that renames a constraint.

    seg        -- key under which addAdjustConname() files the statement
    relname    -- relation name as text (used in the SQL comment only)
    relid      -- relation oid as int
    oldconname -- current constraint name, interpolated as given
    newconname -- new constraint name, interpolated as given
    """
    pieces = [
        '--Constraint Name: %s on %s becomes %s' % (oldconname, relname, newconname),
        'UPDATE pg_constraint',
        'SET conname = %s' % newconname,
        'WHERE conrelid = %d and conname = %s;' % (relid, oldconname),
    ]
    addAdjustConname(seg, '\n'.join(pieces))
| |
def addAdjustConname(seg, stmt):
    """Append stmt to the list stored under seg in GV.AdjustConname.

    The list is created on first use.  dict.setdefault replaces the
    deprecated dict.has_key test.
    """
    GV.AdjustConname.setdefault(seg, []).append(stmt)
| |
def addDemoteConstraint(seg, repair_sequence):
    """Append repair_sequence to the list stored under seg in GV.DemoteConstraint.

    The list is created on first use.  dict.setdefault replaces the
    deprecated dict.has_key test.
    """
    GV.DemoteConstraint.setdefault(seg, []).append(repair_sequence)
| |
| |
| ############# |
def checkDepend():
    """Check pg_depend/pg_shdepend for references to non-existent objects.

    The catalog list is read from pg_class (relnamespace=11 and
    relhasoids).  For each catalog a subquery selects the objid/refobjid
    values of pg_depend, and of pg_shdepend restricted to the current
    database, that have no row in that catalog.  The subqueries are
    combined with UNION ALL and run through connect2run().  Any result
    fails the test with ERROR_NOREPAIR; only the count is logged.
    """
    # Check for dependencies on non-existent objects
    logger.info('-----------------------------------')
    logger.info('Checking Object Dependencies')

    db = connect2(GV.cfg[1], utilityMode=False)

    # Catalogs that link up to pg_depend/pg_shdepend
    qry = "select relname from pg_class where relnamespace=11 and relhasoids"
    catalogs = [row[0] for row in db.query(qry).getresult()]

    # For each catalog construct the subquery for that catalog
    #
    # It would be desirable to switch these queries to a gp_dist_random
    # type query rather than issuing it in utility mode on every segment,
    # however because there are numerous issues remaining with oid
    # inconsistencies (see crossDBCheck()) that doesn't work out well.
    subquery = """
    SELECT '{catalog}' as catalog, objid FROM (
    SELECT objid FROM pg_depend
    WHERE classid = '{catalog}'::regclass
    UNION ALL
    SELECT refobjid FROM pg_depend
    WHERE refclassid = '{catalog}'::regclass
    ) d
    LEFT OUTER JOIN {catalog} c on (d.objid = c.oid)
    WHERE c.oid is NULL
    UNION ALL
    SELECT '{catalog}' as catalog, objid FROM (
    SELECT dbid, objid FROM pg_shdepend
    WHERE classid = '{catalog}'::regclass
    UNION ALL
    SELECT dbid, refobjid FROM pg_shdepend
    WHERE refclassid = '{catalog}'::regclass
    ) d JOIN pg_database db
    ON (d.dbid = db.oid and datname= current_database())
    LEFT OUTER JOIN {catalog} c on (d.objid = c.oid)
    WHERE c.oid is NULL
    """
    deps = [subquery.format(catalog=cat) for cat in catalogs]

    qry = """
    SELECT distinct catalog, objid
    FROM (%s
    ) q
    """ % "UNION ALL".join(deps)
    try:
        err = connect2run(qry)
        if not err:
            logger.info('[OK] basic object dependencies')
        else:
            GV.testStatus = False
            setError(ERROR_NOREPAIR)
            logger.info('[FAIL] basic object dependencies')
            logger.error(' found %d dependencies on dropped objects' % len(err))

    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint("[ERROR] executing test: basic object dependencies")
        myprint(" Execution error: " + str(e))
        myprint(qry)
| |
| |
def fixupowners(nspname, relname, oldrole, newrole):
    """Queue the statements that reset a table's owner.

    nspname, relname -- schema and table name
    oldrole          -- the role to switch to first (the "wrong" owner)
    newrole          -- the role the table must end up owned by

    Three entries are appended to GV.Owners: a comment line and two
    ALTER TABLE ... OWNER TO statements.  All names are emitted as
    double-quoted SQL identifiers with embedded double quotes doubled.
    """
    def quoted(identifier):
        # '%s' formatting first, so non-string values behave as before.
        return '"%s"' % ('%s' % identifier).replace('"', '""')

    qualified = '%s.%s' % (quoted(nspname), quoted(relname))

    # Must first alter the table to the wrong owner, then back to the right
    # owner. The purpose of this is that AT doesn't dispatch the change unless
    # it thinks the table actually changed owner.
    #
    # Note: this means that the Owners list must be run in the order given.
    #
    GV.Owners.append('-- owner for %s' % qualified)
    GV.Owners.append('ALTER TABLE %s OWNER TO %s;' %
                     (qualified, quoted(oldrole)))
    GV.Owners.append('ALTER TABLE %s OWNER TO %s;' %
                     (qualified, quoted(newrole)))
| |
| |
def checkOwners():
    """Compare object owners on the segments with those on the master.

    Two tests are run, each on a connection from
    connect2(GV.cfg[1], utilityMode=False):

    * table ownership -- pg_class rows from gp_dist_random('pg_class')
      whose relowner differs from the local pg_class row.  Failures set
      ERROR_REMOVE and queue repair statements through fixupowners().
    * type ownership -- the same comparison for pg_type rows with
      typrelid = 0.  Failures set ERROR_NOREPAIR; no repair is generated.

    At most the first 100 issues of each test are logged.
    """
    logger.info('-----------------------------------')
    logger.info('Checking table ownership')

    # Check owners in pg_class
    #
    # - Report each table that has an inconsistency with the master database,
    #   this can be on the table directly, or on one of the table subobjects
    #   (pg_toast, pg_aoseg, etc)
    #
    # - Theoretically this could result in multiple corrections for a given
    #   table based on having multiple "wrong" owners. Realistically most
    #   of the problems we have seen with this problem the wrong owner is
    #   almost always the gpadmin user, so this is not expected. If it does
    #   occur it won't be a problem, but we will issue more ALTER TABLE
    #   commands than is strictly necessary.
    #
    # - Between 3.3 and 4.0 the ao segment columns migrated from pg_class
    #   to pg_appendonly.
    db = connect2(GV.cfg[1], utilityMode=False)
    qry = '''
    select distinct n.nspname, coalesce(o.relname, c.relname) as relname,
    a.rolname, m.rolname as master_rolname
    from gp_dist_random('pg_class') r
    join pg_class c on (c.oid = r.oid)
    left join pg_appendonly ao on (c.oid = ao.segrelid or
    c.oid = ao.segidxid or
    c.oid = ao.blkdirrelid or
    c.oid = ao.blkdiridxid)
    left join pg_class t on (t.reltoastidxid = c.oid)
    left join pg_class o on (o.oid = ao.relid or
    o.reltoastrelid = t.oid or
    o.reltoastrelid = c.oid)
    join pg_authid a on (a.oid = r.relowner)
    join pg_authid m on (m.oid = coalesce(o.relowner, c.relowner))
    join pg_namespace n on (n.oid = coalesce(o.relnamespace, c.relnamespace))
    where c.relowner <> r.relowner
    '''
    try:
        rows = list(db.query(qry).dictresult())

        if not rows:
            logger.info('[OK] table ownership')
        else:
            GV.testStatus = False
            setError(ERROR_REMOVE)
            logger.info('[FAIL] table ownership')
            logger.error('found %d table ownership issue(s)' % len(rows))
            logger.error('%s' % qry)
            for row in rows[:100]:
                logger.error(' %s.%s relowner %s != %s'
                             % (row['nspname'], row['relname'], row['rolname'],
                                row['master_rolname']))
            if len(rows) > 100:
                logger.error("...")
            # Repairs are queued for every row, not just the logged ones.
            for row in rows:
                fixupowners(row['nspname'], row['relname'], row['rolname'],
                            row['master_rolname'])
    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint("[ERROR] executing: check table ownership")
        myprint(" Execution error: " + str(e))
        myprint(qry)

    # TODO: add a check that subobject owners agree with the main object owner.
    # (The above checks only compare master vs segment)

    # Check owners in pg_type
    # - Ignore implementation types of pg_class entries - they should be
    #   in the check above since ALTER TABLE is required to fix them, not
    #   ALTER TYPE.
    db = connect2(GV.cfg[1], utilityMode=False)
    qry = '''
    select distinct n.nspname, t.typname, a.rolname, m.rolname as master_rolname
    from gp_dist_random('pg_type') r
    join pg_type t on (t.oid = r.oid)
    join pg_namespace n on (n.oid = t.typnamespace)
    join pg_authid a on (a.oid = r.typowner)
    join pg_authid m on (m.oid = t.typowner)
    where r.typowner <> t.typowner and r.typrelid = 0
    '''
    try:
        rows = list(db.query(qry).dictresult())

        if not rows:
            logger.info('[OK] type ownership')
        else:
            GV.testStatus = False
            setError(ERROR_NOREPAIR)
            logger.info('[FAIL] type ownership')
            logger.error('found %d type ownership issue(s)' % len(rows))
            logger.error('%s' % qry)
            for row in rows[:100]:
                logger.error(' %s.%s typeowner %s != %s'
                             % (row['nspname'], row['typname'], row['rolname'],
                                row['master_rolname']))
            if len(rows) > 100:
                logger.error("...")

    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint("[ERROR] executing test: check type ownership")
        myprint(" Execution error: " + str(e))
        myprint(qry)

    # FIXME: add repair script
    # NOTE: types with typrelid probably will be repaired by the
    # script generated by the table ownership test above.

    # TODO:
    # Check owners in pg_proc
    # Check owners in pg_database
    # Check owners in pg_tablespace
    # Check owners in pg_filespace
    # Check owners in pg_namespace
    # Check owners in pg_operator
    # Check owners in pg_opclass
    # ...
| |
| |
# SQL fragments shared by the persistent-table checks below.  Each renders a
# numeric state column of the row aliased "p" as a readable label, so the
# label lists live in exactly one place.
_PERSISTENT_SQL_FRAGMENTS = {
    'persistent_state': """\
case when p.persistent_state = 0 then 'free'
                  when p.persistent_state = 1 then 'create pending'
                  when p.persistent_state = 2 then 'created'
                  when p.persistent_state = 3 then 'drop pending'
                  when p.persistent_state = 4 then 'abort create'
                  when p.persistent_state = 5 then 'JIT create pending'
                  when p.persistent_state = 6 then 'bulk load create pending'
                  else 'unknown state: ' || p.persistent_state
             end as persistent_state""",
    'mirror_state': """\
case when p.mirror_existence_state = 0 then 'mirror free'
                  when p.mirror_existence_state = 1 then 'not mirrored'
                  when p.mirror_existence_state = 2 then 'mirror create pending'
                  when p.mirror_existence_state = 3 then 'mirror created'
                  when p.mirror_existence_state = 4 then 'mirror down before create'
                  when p.mirror_existence_state = 5 then 'mirror down during create'
                  when p.mirror_existence_state = 6 then 'mirror drop pending'
                  when p.mirror_existence_state = 7 then 'mirror only drop remains'
                  else 'unknown state: ' || p.mirror_existence_state
             end as mirror_existence_state""",
}


def _logPersistentIssues(err):
    """Log the failing rows of one persistent-table check.

    ``err`` is the sequence returned by connect2run(); each element is
    indexed as (segment config dict, column names, row).  Rows are expected
    to arrive grouped by segment config: a header is emitted whenever the
    config changes, and at most 100 rows are reported per group.
    """
    last = None
    count = 0
    for e in err:
        cfg = e[0]
        col = e[1]
        row = e[2]

        # A different segment config starts a new group.
        if last != cfg:
            count = 0
            last = cfg

        # After 100 rows print a single ellipsis, then stay silent for the
        # remainder of this group.
        if count == 100:
            logger.error("...")
            count += 1
        if count > 100:
            continue

        # Actual formatting could be prettied up more
        if count == 0:
            logger.error("--------")
            logger.error("%s:%s:%s" % (cfg['hostname'],
                                       cfg['port'],
                                       cfg['datadir']))
            logger.error(" " + " | ".join(map(str, col)))
        logger.error(" " + " | ".join([str(row[x]) for x in col]))
        count += 1


def checkPersistentTables():
    """Run the persistent-table consistency queries and log the outcome.

    Builds a list of (name, SQL) checks covering the gp_persistent_*_node
    tables for filespaces, databases, tablespaces and relations, plus the
    filesystem comparisons, and runs each through connect2run().  A check
    passes when connect2run() returns nothing; otherwise GV.testStatus is
    cleared, ERROR_NOREPAIR is recorded and the offending rows are logged.

    The "state check" queries are only scheduled when every row of
    gp_segment_configuration has mode 's': while the system is not in sync,
    entries such as "drop pending" are a valid state to be in.
    """
    logger.info('-----------------------------------')
    logger.info('Checking persistent tables')

    # Some of the persistency checks modified depending on if we are in
    # change tracking or not.  For instance when we are in-sync we
    # shouldn't have persistent entries stuck in "drop pending", but this
    # is a perfectly valid state to maintain while in change-tracking.
    db = connect2(GV.cfg[1], utilityMode=False)
    qry = "SELECT count(*) FROM gp_segment_configuration WHERE mode != 's'"
    curs = db.query(qry)
    in_sync = (curs.getresult()[0][0] == 0)
    if in_sync:
        logger.info("[OK] System is synchronized")
    else:
        logger.warn("System requires recovery via gprecoverseg")

    queries = []

    # Checks on FILESPACE
    if in_sync:
        queries.append(["gp_persistent_filespace_node state check", """
      SELECT p.filespace_oid,
             %(persistent_state)s,
             %(mirror_state)s
      FROM gp_persistent_filespace_node p
      WHERE p.persistent_state not in (0, 2)
         or p.mirror_existence_state not in (0,1,3)
    """ % _PERSISTENT_SQL_FRAGMENTS])

    queries.append(["gp_persistent_filespace_node <=> pg_filespace", """
      SELECT coalesce(f.oid, p.filespace_oid) as filespace_oid,
             f.fsname as "filespace"
      FROM (SELECT * FROM gp_persistent_filespace_node
            WHERE persistent_state = 2) p
      FULL OUTER JOIN (SELECT oid, fsname FROM pg_filespace
                       WHERE oid != 3052) f
        ON (p.filespace_oid = f.oid)
      WHERE (p.filespace_oid is NULL OR f.oid is NULL)
    """])

    queries.append(["gp_persistent_filespace_node <=> gp_global_sequence", """
      SELECT p.filespace_oid, f.fsname as "filespace",
             %(persistent_state)s,
             p.persistent_serial_num, s.sequence_num
      FROM gp_global_sequence s, gp_persistent_filespace_node p
      LEFT JOIN pg_filespace f ON (f.oid = p.filespace_oid)
      WHERE s.ctid = '(0,4)' and p.persistent_serial_num > s.sequence_num
    """ % _PERSISTENT_SQL_FRAGMENTS])

    # Checks on DATABASE
    if in_sync:
        queries.append(["gp_persistent_database_node state check", """
      SELECT p.tablespace_oid, p.database_oid,
             %(persistent_state)s,
             %(mirror_state)s
      FROM gp_persistent_database_node p
      WHERE p.persistent_state not in (0, 2)
         or p.mirror_existence_state not in (0,1,3)
    """ % _PERSISTENT_SQL_FRAGMENTS])

    queries.append(["gp_persistent_database_node <=> pg_database", """
      SELECT coalesce(d.oid, p.database_oid) as database_oid,
             d.datname as database
      FROM (SELECT * FROM gp_persistent_database_node
            WHERE persistent_state = 2) p
      FULL OUTER JOIN pg_database d
        ON (d.oid = p.database_oid)
      WHERE (d.datname is null or p.database_oid is null)
    """])

    queries.append(["gp_persistent_database_node <=> pg_tablespace", """
      SELECT coalesce(t.oid, p.database_oid) as database_oid,
             t.spcname as tablespace
      FROM (SELECT * FROM gp_persistent_database_node
            WHERE persistent_state = 2) p
      LEFT OUTER JOIN (SELECT oid, spcname FROM pg_tablespace
                       WHERE oid != 1664) t
        ON (t.oid = p.tablespace_oid)
      WHERE t.spcname is null
    """])

    queries.append(["gp_persistent_database_node <=> gp_global_sequence", """
      SELECT p.database_oid, p.tablespace_oid, d.datname as "database",
             %(persistent_state)s,
             p.persistent_serial_num, s.sequence_num
      FROM gp_global_sequence s, gp_persistent_database_node p
      LEFT JOIN pg_database d ON (d.oid = p.database_oid)
      WHERE s.ctid = '(0,2)' and p.persistent_serial_num > s.sequence_num
    """ % _PERSISTENT_SQL_FRAGMENTS])

    # Checks on TABLESPACE
    if in_sync:
        queries.append(["gp_persistent_tablespace_node state check", """
      SELECT p.filespace_oid, p.tablespace_oid,
             %(persistent_state)s,
             %(mirror_state)s
      FROM gp_persistent_tablespace_node p
      WHERE p.persistent_state not in (0, 2)
         or p.mirror_existence_state not in (0,1,3)
    """ % _PERSISTENT_SQL_FRAGMENTS])

    queries.append(["gp_persistent_tablespace_node <=> pg_tablespace", """
      SELECT coalesce(t.oid, p.tablespace_oid) as tablespace_oid,
             t.spcname as tablespace
      FROM (SELECT * FROM gp_persistent_tablespace_node
            WHERE persistent_state = 2) p
      FULL OUTER JOIN (
           SELECT oid, spcname FROM pg_tablespace WHERE oid not in (1663, 1664)
      ) t ON (t.oid = p.tablespace_oid)
      WHERE t.spcname is null or p.tablespace_oid is null
    """])

    queries.append(["gp_persistent_tablespace_node <=> pg_filespace", """
      SELECT p.filespace_oid, f.fsname as "filespace"
      FROM (SELECT * FROM gp_persistent_tablespace_node
            WHERE persistent_state = 2) p
      LEFT OUTER JOIN pg_filespace f
        ON (f.oid = p.filespace_oid)
      WHERE f.fsname is null
    """])

    queries.append(["gp_persistent_tablespace_node <=> gp_global_sequence", """
      SELECT p.filespace_oid, p.tablespace_oid, t.spcname as "tablespace",
             %(persistent_state)s,
             p.persistent_serial_num, s.sequence_num
      FROM gp_global_sequence s, gp_persistent_tablespace_node p
      LEFT JOIN pg_tablespace t ON (t.oid = p.tablespace_oid)
      WHERE s.ctid = '(0,3)' and p.persistent_serial_num > s.sequence_num
    """ % _PERSISTENT_SQL_FRAGMENTS])

    # Checks on RELATION
    if in_sync:
        queries.append(["gp_persistent_relation_node state check", """
      SELECT p.tablespace_oid, p.relfilenode_oid, p.segment_file_num,
             %(persistent_state)s,
             %(mirror_state)s
      FROM gp_persistent_relation_node p
      WHERE (p.persistent_state not in (0, 2)
             or p.mirror_existence_state not in (0,1,3))
        and p.database_oid in (
          SELECT oid FROM pg_database WHERE datname = current_database()
        )
    """ % _PERSISTENT_SQL_FRAGMENTS])

    queries.append(["gp_persistent_relation_node <=> pg_tablespace", """
      SELECT distinct p.tablespace_oid
      FROM (SELECT * FROM gp_persistent_relation_node
            WHERE persistent_state = 2
              AND database_oid in (
                SELECT oid FROM pg_database
                WHERE datname = current_database()
                UNION ALL
                SELECT 0)) p
      LEFT OUTER JOIN pg_tablespace t
        ON (t.oid = p.tablespace_oid)
      WHERE t.oid is null
    """])

    queries.append(["gp_persistent_relation_node <=> pg_database", """
      SELECT datname, oid, count(*)
      FROM (
        SELECT d.datname as datname, p.database_oid as oid
        FROM (SELECT * FROM gp_persistent_relation_node
              WHERE database_oid != 0 and persistent_state = 2
        ) p
        full outer join pg_database d ON (d.oid = p.database_oid)
      ) x
      GROUP BY 1,2
      HAVING datname is null or oid is null or count(*) < 100
    """])

    queries.append(["gp_persistent_relation_node <=> gp_relation_node", """
      SELECT coalesce(p.relfilenode_oid, r.relfilenode_oid) as relfilenode,
             p.ctid, r.persistent_tid
      FROM (
        SELECT p.ctid, p.* FROM gp_persistent_relation_node p
        WHERE persistent_state = 2 AND p.database_oid in (
          SELECT oid FROM pg_database WHERE datname = current_database()
          UNION ALL
          SELECT 0
        )
      ) p
      FULL OUTER JOIN gp_relation_node r
        ON (p.relfilenode_oid = r.relfilenode_oid and
            p.segment_file_num = r.segment_file_num)
      WHERE (p.relfilenode_oid is NULL OR
             r.relfilenode_oid is NULL OR
             p.ctid != r.persistent_tid)
    """])

    queries.append(["gp_persistent_relation_node <=> pg_class", """
      SELECT coalesce(p.relfilenode_oid, c.relfilenode) as relfilenode,
             c.nspname, c.relname, c.relkind, c.relstorage
      FROM (
        SELECT * FROM gp_persistent_relation_node
        WHERE persistent_state = 2 AND database_oid in (
          SELECT oid FROM pg_database WHERE datname = current_database()
          UNION ALL
          SELECT 0
        )
      ) p
      FULL OUTER JOIN (
        SELECT n.nspname, c.relname, c.relfilenode, c.relstorage, c.relkind
        FROM pg_class c
        LEFT OUTER JOIN pg_namespace n ON (c.relnamespace = n.oid)
        WHERE c.relstorage not in ('v', 'x', 'f')
      ) c ON (p.relfilenode_oid = c.relfilenode)
      WHERE p.relfilenode_oid is NULL OR c.relfilenode is NULL
    """])

    queries.append(["gp_persistent_relation_node <=> gp_global_sequence", """
      SELECT p.tablespace_oid, p.database_oid, p.relfilenode_oid,
             p.segment_file_num,
             %(persistent_state)s,
             p.persistent_serial_num, s.sequence_num
      FROM gp_global_sequence s, gp_persistent_relation_node p
      LEFT JOIN pg_tablespace t ON (t.oid = p.tablespace_oid)
      WHERE s.ctid = '(0,1)' and p.persistent_serial_num > s.sequence_num
    """ % _PERSISTENT_SQL_FRAGMENTS])

    # Look for extra/missing files in the filesystem
    #
    # Note: heap tables only ever store segment_file_num 0 in the persistent
    # tables, while ao/co tables will store every segment_file_num that they
    # use.  This results in the segment_file_num/relstorage where clause.
    queries.append(["gp_persistent_relation_node <=> filesystem", """
      SELECT coalesce(a.tablespace_oid, b.tablespace_oid) as tablespace_oid,
             coalesce(a.database_oid, b.database_oid) as database_oid,
             coalesce(a.relfilenode_oid, b.relfilenode_oid) as relfilenode_oid,
             coalesce(a.segment_file_num, b.segment_file_num) as segment_file_num,
             a.relfilenode_oid is null as filesystem,
             b.relfilenode_oid is null as persistent,
             b.relkind, b.relstorage
      FROM gp_persistent_relation_node a
      FULL OUTER JOIN (
        SELECT p.*, c.relkind, c.relstorage
        FROM gp_persistent_relation_node_check() p
        LEFT OUTER JOIN pg_class c
          ON (p.relfilenode_oid = c.relfilenode)
        WHERE (p.segment_file_num = 0 or c.relstorage != 'h')
      ) b ON (a.tablespace_oid = b.tablespace_oid and
              a.database_oid = b.database_oid and
              a.relfilenode_oid = b.relfilenode_oid and
              a.segment_file_num = b.segment_file_num)
      WHERE (a.relfilenode_oid is null OR
             (a.persistent_state = 2 and b.relfilenode_oid is null)) and
            coalesce(a.database_oid, b.database_oid) in (
              SELECT oid FROM pg_database WHERE datname = current_database()
              UNION ALL
              SELECT 0
            );
    """])

    # Look for databases in the filesystem that have been dropped but still
    # have dangling files.
    queries.append(["pg_database <=> filesystem", """
      SELECT tablespace_oid, database_oid, count(*)
      FROM gp_persistent_relation_node_check() p
      LEFT OUTER JOIN pg_database d
        ON (p.database_oid = d.oid)
      WHERE d.oid is null and database_oid != 0
      GROUP BY tablespace_oid, database_oid;
    """])

    # Run every scheduled check through connect2run() (per the original
    # comment this executes the query on every segment) and report.
    for (qname, qry) in queries:
        err = connect2run(qry)
        if not err:
            logger.info('[OK] ' + qname)
            continue

        GV.testStatus = False
        setError(ERROR_NOREPAIR)
        logger.info('[FAIL] ' + qname)
        logger.error('%s found %d issue(s)' % (qname, len(err)))
        logger.error(qry)

        # Report at most 100 rows per segment, for brevity
        _logPersistentIssues(err)
| |
| |
def closeDbs():
    """Close every connection cached in GV.db, then reset the cache."""
    for cached in GV.db.values():
        # Element 0 of each cache entry is the connection object.
        connection = cached[0]
        connection.close()
    GV.db = {}  # remove everything
| |
| |
| #------------------------------------------------------------------------------- |
def getCatObj(namestr):
    """Return the catalog table object that GV.catalog holds for ``namestr``.

    If the lookup raises, a "No such catalog table" message is printed and
    the original exception is re-raised to the caller.
    """
    # Connect to GV.cfg[1] in non-utility mode first; the returned handle is
    # not used here.
    # NOTE(review): presumably kept for its side effect - confirm.
    connect2(GV.cfg[1], utilityMode=False)
    try:
        return GV.catalog.getCatalogTable(namestr)
    except Exception:
        myprint("No such catalog table: %s\n" % str(namestr))
        raise
| |
| |
| #------------------------------------------------------------------------------- |
def checkACL():
    """Run checkTableACL() on every catalog table known to GV.catalog."""
    logger.info('-----------------------------------')
    logger.info('Performing cross database ACL tests')

    # Table metadata comes from the catalog; visit it in sorted order.
    for catalogTable in sorted(GV.catalog.getCatalogTables()):
        checkTableACL(catalogTable)
| |
| #------------------------------------------------------------------------------- |
def checkTableACL(cat):
    """Check that one catalog table's ACL column agrees across the cluster.

    The query joins the plain catalog table (alias ``m``, reported as
    master_acl) to gp_dist_random() of the same table (alias ``s``, reported
    as segment_acl) on the primary key, and returns the rows whose ACLs do
    not contain each other or where exactly one side is NULL.

    Nothing is checked for master-only tables, tables without a primary key,
    tables without an ACL column, or tables excluded by the -S option.

    On mismatch the rows are logged and handed to processACLResult(), and
    GV.testStatus / GV.aclStatus are cleared.  On a query error only
    GV.aclStatus is cleared.  Both cases record ERROR_NOREPAIR.
    """
    catname = cat.getTableName()
    pkey = cat.getPrimaryKey()
    master = cat.isMasterOnly()
    isShared = cat.isShared()
    acl = cat.getTableAcl()

    # First table visited flips the status from "not run" to "passing".
    if GV.aclStatus is None:
        GV.aclStatus = True

    # Skip:
    #  - master only tables
    #  - tables without a primary key
    #  - tables without acls
    if master or pkey == [] or acl is None:
        return

    # skip shared/non-shared tables
    if GV.opt['-S']:
        if re.match("none", GV.opt['-S'], re.I) and isShared:
            return
        if re.match("only", GV.opt['-S'], re.I) and not isShared:
            return

    # Comparing ACLs cannot be done with a simple equality comparison
    # since it is valid for the ACLs to have different order.  Instead
    # we compare that both acls are subsets of each other => equality.

    # Tables with consistent oids are reported by oid instead of by their
    # primary key columns.
    if cat.tableHasConsistentOids():
        mPkey = ['m.oid']
    else:
        mPkey = ['m.' + i for i in pkey]

    qry = """
          SELECT s.gp_segment_id as segid, {mPkey},
                 m.{acl} as master_acl, s.{acl} as segment_acl
          FROM {catalog} m
          JOIN gp_dist_random('{catalog}') s using ({primary_key})
          WHERE not (m.{acl} @> s.{acl} and s.{acl} @> m.{acl}) or
                (m.{acl} is null and s.{acl} is not null) or
                (s.{acl} is null and m.{acl} is not null)
          ORDER BY {primary_key}, s.gp_segment_id
          """.format(catalog=catname,
                     primary_key=', '.join(pkey),
                     mPkey=', '.join(mPkey),
                     acl=acl)

    # Execute the query
    try:
        db = connect2(GV.cfg[1], utilityMode=False)
        curs = db.query(qry)
        nrows = curs.ntuples()

        if nrows == 0:
            logger.info('[OK] Cross consistency acl check for ' + catname)
        else:
            GV.testStatus = False
            setError(ERROR_NOREPAIR)
            GV.aclStatus = False
            logger.info('[FAIL] Cross consistency acl check for ' + catname)
            logger.error(' %s acl check has %d issue(s)' % (catname, nrows))

            fields = curs.listfields()
            # Materialize the result set once; it is both logged and passed
            # on for post-processing.
            results = curs.getresult()
            log_literal(logger, logging.ERROR, " " + " | ".join(fields))
            for row in results:
                log_literal(logger, logging.ERROR, " " + " | ".join(map(str, row)))
            processACLResult(catname, fields, results)

    except Exception as e:
        setError(ERROR_NOREPAIR)
        GV.aclStatus = False
        myprint('[ERROR] executing: Cross consistency check for ' + catname)
        myprint(' Execution error: ' + str(e))
        myprint(qry)
| |
| |
| #------------------------------------------------------------------------------- |
def checkForeignKey():
    """Run checkTableForeignKey() on every catalog table known to GV.catalog.

    GV.foreignKeyStatus is set to True if it has not been set yet; the
    per-table check clears it when it finds a problem.
    """
    logger.info('-----------------------------------')
    logger.info('Performing foreign key tests')

    if GV.foreignKeyStatus is None:
        GV.foreignKeyStatus = True

    # Table metadata comes from the catalog; visit it in sorted order.
    for catalogTable in sorted(GV.catalog.getCatalogTables()):
        checkTableForeignKey(catalogTable)
| |
| #------------------------------------------------------------------------------- |
def checkTableForeignKey(cat):
    """Check every foreign key declared on one catalog table.

    For each foreign-key definition a query is built with fkQuery() and run
    on the GV.cfg[1] connection.  Any returned row is a reference with no
    matching primary-key entry: the rows are logged, handed to
    processForeignKeyResult(), and GV.testStatus / GV.foreignKeyStatus are
    cleared.  A failure on gp_fastsequence -> pg_class additionally records
    ERROR_REMOVE and calls removeFastSequence().

    Tables without foreign keys, without a primary key, in the hard-coded
    skip list, or excluded by the -S option are not checked.
    """
    catname = cat.getTableName()
    fkeylist = cat.getForeignKeys()
    isShared = cat.isShared()
    pkeylist = cat.getPrimaryKey()
    coltypes = cat.getTableColtypes()

    # skip tables without fkey or without primary key
    if len(fkeylist) <= 0 or len(pkeylist) <= 0:
        return

    # skip these master-only tables
    skipped_masteronly = ['gp_relation_node', 'pg_description', 'pg_shdescription',
                          'pg_stat_last_operation', 'pg_stat_last_shoperation', 'pg_statistic']

    if catname in skipped_masteronly:
        return

    # skip shared/non-shared tables
    if GV.opt['-S']:
        if re.match("none", GV.opt['-S'], re.I) and isShared:
            return
        if re.match("only", GV.opt['-S'], re.I) and not isShared:
            return

    # Primary key columns selected under "<catalog>_<column>" aliases, and
    # the bare alias list used for grouping/ordering.
    pkeys = ['%s_%s' % (catname, pk) for pk in pkeylist]
    cat1pkeys = ['cat1.' + pk + ' as ' + alias
                 for pk, alias in zip(pkeylist, pkeys)]

    logger.info('Building %d queries to check FK constraint on table %s' % (len(fkeylist), catname))
    for fkeydef in fkeylist:
        # Append the cast suffix autoCast holds for the column's type, if any.
        castedFkey = [c + autoCast.get(coltypes[c], '') for c in fkeydef.getColumns()]
        fkeystr = ', '.join(castedFkey)
        pkeystr = ', '.join(fkeydef.getPKey())
        pkcatname = fkeydef.getPkeyTableName()

        qry = fkQuery(catname, pkcatname, fkeystr, pkeystr, pkeys, cat1pkeys)

        # Execute the query
        try:
            db = connect2(GV.cfg[1], utilityMode=False)
            curs = db.query(qry)
            nrows = curs.ntuples()

            if nrows == 0:
                logger.info('[OK] Foreign key check for %s(%s) referencing %s(%s)' %
                            (catname, fkeystr, pkcatname, pkeystr))
            else:
                GV.testStatus = False
                setError(ERROR_NOREPAIR)
                GV.foreignKeyStatus = False
                logger.info('[FAIL] Foreign key check for %s(%s) referencing %s(%s)' %
                            (catname, fkeystr, pkcatname, pkeystr))
                logger.error(' %s has %d issue(s): entry has NULL reference of %s(%s)' %
                             (catname, nrows, pkcatname, pkeystr))

                fields = curs.listfields()
                # Materialize the result set once; it is both logged and
                # passed on for post-processing.
                results = curs.getresult()
                log_literal(logger, logging.ERROR, " " + " | ".join(fields))
                for row in results:
                    log_literal(logger, logging.ERROR, " " + " | ".join(map(str, row)))
                processForeignKeyResult(catname, pkcatname, fields, results)

                if catname == 'gp_fastsequence' and pkcatname == 'pg_class':
                    setError(ERROR_REMOVE)
                    removeFastSequence(db)

        except Exception as e:
            setError(ERROR_NOREPAIR)
            GV.foreignKeyStatus = False
            myprint('[ERROR] executing: Foreign key check for ' + catname)
            myprint(' Execution error: ' + str(e))
            myprint(qry)
| |
| |
| #------------------------------------------------------------------------------- |
def fkQuery(catname, pkcatname, fkeystr, pkeystr, pkeys, cat1pkeys):
    """Build the SQL that lists foreign-key values lacking a referenced row.

    The inner query is a UNION ALL of two left outer joins of ``catname``
    (cat1) against ``pkcatname`` (cat2): one over gp_dist_random() of both
    tables, one over the plain tables tagged with gp_segment_id -1.  Rows
    are kept when no cat2 match exists and the foreign key is not 0.  The
    outer query groups by the primary-key aliases plus the dangling
    reference and aggregates the segment ids into ``segids``.

    catname   -- table holding the foreign key
    pkcatname -- table being referenced
    fkeystr   -- foreign key column expression in catname
    pkeystr   -- referenced key column in pkcatname
    pkeys     -- aliases of catname's primary key columns
    cat1pkeys -- "cat1.<col> as <alias>" select items matching pkeys
    """
    substitutions = {
        'FK1': fkeystr,
        'PK2': pkeystr,
        'CATALOG1': catname,
        'CATALOG2': pkcatname,
        'cat1_dot_pk': ', '.join(cat1pkeys),
        # Output column carrying the dangling reference value.
        'cat2_dot_pk': '%s_%s' % (pkcatname, pkeystr),
        'primary_key_alias': ', '.join(pkeys),
    }

    template = """
          SELECT {primary_key_alias}, {cat2_dot_pk},
                 array_agg(gp_segment_id order by gp_segment_id) as segids
          FROM (
                SELECT cat1.gp_segment_id, {cat1_dot_pk}, cat1.{FK1} as {cat2_dot_pk}
                FROM
                    gp_dist_random('{CATALOG1}') cat1 LEFT OUTER JOIN
                    gp_dist_random('{CATALOG2}') cat2
                    ON (cat1.gp_segment_id = cat2.gp_segment_id AND
                        cat1.{FK1} = cat2.{PK2} )
                WHERE cat2.{PK2} is NULL
                  AND cat1.{FK1} != 0
                UNION ALL
                SELECT -1 as gp_segment_id, {cat1_dot_pk}, cat1.{FK1} as {cat2_dot_pk}
                FROM
                    {CATALOG1} cat1 LEFT OUTER JOIN
                    {CATALOG2} cat2
                    ON (cat1.gp_segment_id = cat2.gp_segment_id AND
                        cat1.{FK1} = cat2.{PK2} )
                WHERE cat2.{PK2} is NULL
                  AND cat1.{FK1} != 0
                ORDER BY {primary_key_alias}, gp_segment_id
          ) allresults
          GROUP BY {primary_key_alias}, {cat2_dot_pk}
          """
    return template.format(**substitutions)
| |
| |
| |
| #------------------------------------------------------------------------------- |
def checkMissingEntry():
    """Run checkTableMissingEntry() on every catalog table in GV.catalog.

    GV.missingEntryStatus is set to True if it has not been set yet; the
    per-table check clears it when it finds a problem.
    """
    logger.info('-----------------------------------')
    logger.info('Performing cross consistency tests: check for missing or extraneous entries')

    if GV.missingEntryStatus is None:
        GV.missingEntryStatus = True

    # Table metadata comes from the catalog; visit it in sorted order.
    for catalogTable in sorted(GV.catalog.getCatalogTables()):
        checkTableMissingEntry(catalogTable)
| |
| |
| #------------------------------------------------------------------------------- |
def checkTableMissingEntry(cat):
    """Check one catalog table for missing or extraneous rows.

    Builds the query with missingEntryQuery(), keyed on ``oid`` when the
    table has consistent oids and on the (cast) primary key otherwise, and
    runs it on the GV.cfg[1] connection.  Any returned row is an issue: the
    rows are logged, handed to processMissingDuplicateEntryResult() with
    the tag "missing", and GV.testStatus / GV.missingEntryStatus are
    cleared.

    Master-only tables and tables excluded by the -S option are skipped
    silently; tables without a primary key are skipped with a warning.
    """
    catname = cat.getTableName()
    pkey = cat.getPrimaryKey()
    master = cat.isMasterOnly()
    isShared = cat.isShared()
    coltypes = cat.getTableColtypes()

    # Skip master only tables
    if master:
        return

    # skip shared/non-shared tables
    if GV.opt['-S']:
        if re.match("none", GV.opt['-S'], re.I) and isShared:
            return
        if re.match("only", GV.opt['-S'], re.I) and not isShared:
            return

    # Skip catalogs without primary key
    if pkey == []:
        logger.warn("[WARN] Skipped missing/extra entry check for %s" %
                    catname)
        return

    if cat.tableHasConsistentOids():
        qry = missingEntryQuery(GV.max_content, catname, ['oid'], ['oid'])
    else:
        # Append the cast suffix autoCast holds for each key column's type.
        castedPkey = [c + autoCast.get(coltypes[c], '') for c in pkey]
        qry = missingEntryQuery(GV.max_content, catname, pkey, castedPkey)

    # Execute the query
    try:
        db = connect2(GV.cfg[1], utilityMode=False)
        curs = db.query(qry)
        nrows = curs.ntuples()

        if nrows == 0:
            logger.info('[OK] Checking for missing or extraneous entries for ' + catname)
        else:
            GV.testStatus = False
            setError(ERROR_NOREPAIR)
            GV.missingEntryStatus = False
            logger.info('[FAIL] Checking for missing or extraneous entries for ' + catname)
            logger.error(' %s has %d issue(s)' % (catname, nrows))

            fields = curs.listfields()
            # Materialize the result set once; it is both logged and passed
            # on for post-processing.
            results = curs.getresult()
            log_literal(logger, logging.ERROR, " " + " | ".join(fields))
            for row in results:
                log_literal(logger, logging.ERROR, " " + " | ".join(map(str, row)))
            processMissingDuplicateEntryResult(catname, fields, results, "missing")

    except Exception as e:
        setError(ERROR_NOREPAIR)
        GV.missingEntryStatus = False
        myprint('[ERROR] executing: Missing or extraneous entries check for ' + catname)
        myprint(' Execution error: ' + str(e))
        myprint(qry)
| |
| |
| |
| #------------------------------------------------------------------------------- |
def missingEntryQuery(max_content, catname, pkey, castedPkey):
    """Build the SQL batch that reports missing / extra catalog rows.

    Shape of the query:
      * cross product of every distinct primary key seen anywhere with every
        distinct content id in gp_segment_configuration (the "ideal" set),
      * left joined to the (segid, primary key) pairs actually present in
        the plain catalog table (copied into a temp table) and in
        gp_dist_random() of it.

    Per primary key the rows are counted separately for "no match" and
    "match"; a bucket is reported only when its size is at most
    (max_content + 2) / 2.0.  The 'missing' bucket holds segments without a
    match, the 'extra' bucket segments with one.

    max_content -- substituted into the bucket-size threshold
    catname     -- catalog table to inspect
    pkey        -- key column names used for joins and partitioning
    castedPkey  -- key column expressions used in the final select/group by
    """
    keyColumns = ','.join(pkey)
    reportedKey = ','.join(castedPkey)

    template = """
          SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;

          -- distribute catalog table from master, so that we can avoid to gather
          CREATE TEMPORARY TABLE _tmp_master ON COMMIT DROP AS
              SELECT gp_segment_id segid, {primary_key} FROM {catalog};
          SELECT {oidcasted_pk}, exists, array_agg(segid order by segid) as segids
          FROM
          (
            SELECT ideal.*,
                   case when actual.segid is null then 'missing' else 'extra' end as exists,
                   count(*) over (partition by {primary_key}, actual.segid is null) as subcount
            FROM (
                   SELECT segid, {primary_key}
                   FROM( SELECT distinct {primary_key} FROM _tmp_master
                         UNION
                         SELECT distinct {primary_key} FROM gp_dist_random('{catalog}') ) all_pks,
                       ( SELECT distinct content as segid from gp_segment_configuration) all_segs
                 ) ideal
                 LEFT OUTER JOIN
                 ( SELECT segid, {primary_key} FROM _tmp_master
                   UNION ALL
                   SELECT gp_segment_id as segid, {primary_key} FROM gp_dist_random('{catalog}')
                 ) actual USING (segid, {primary_key})
          ) missing_extra
          WHERE subcount <= ({max_content}+2)/2.0
          GROUP BY {oidcasted_pk}, exists;
          """
    return template.format(oidcasted_pk=reportedKey,
                           primary_key=keyColumns,
                           catalog=catname,
                           max_content=max_content)
| |
def transformTextArrayCols(catname, columns, colnames, coltypes):
    """Wrap text[] column expressions in array_to_string(..., ',').

    Distributing by hash(text[]) is potentially dangerous, so any column
    whose type is '_text' is flattened to a string.  The text[] hash
    expression issue should be fixed properly, but for this script this is
    a reasonable workaround.

    catname  -- unused by this function
    columns  -- column expressions, positionally matching colnames
    colnames -- column names used to look up each type
    coltypes -- mapping of column name to type name

    Returns a new list with one expression per entry of ``columns``.
    """
    def flatten(position, expression):
        # The type is looked up through the name at the same position.
        if coltypes[colnames[position]] == '_text':
            return "array_to_string(" + expression + ", ',')"
        return expression

    return [flatten(position, expression)
            for position, expression in enumerate(columns)]
| |
| #------------------------------------------------------------------------------- |
def checkInconsistentEntry():
    """Run checkTableInconsistentEntry() on every catalog table in GV.catalog.

    GV.inconsistentEntryStatus is set to True if it has not been set yet;
    the per-table check clears it when it finds a problem.
    """
    logger.info('-----------------------------------')
    logger.info('Performing cross consistency test: check for inconsistent entries')

    if GV.inconsistentEntryStatus is None:
        GV.inconsistentEntryStatus = True

    # Table metadata comes from the catalog; visit it in sorted order.
    for catalogTable in sorted(GV.catalog.getCatalogTables()):
        checkTableInconsistentEntry(catalogTable)
| |
| |
| #------------------------------------------------------------------------------- |
def checkTableInconsistentEntry(cat):
    """Check one catalog table for rows whose column values disagree.

    Builds the query with inconsistentEntryQuery() over the table's non-ACL
    columns, keyed on ``oid`` when the table has consistent oids and on the
    (cast) primary key otherwise, and runs it on the GV.cfg[1] connection.
    Any returned row is an issue: the rows are logged, handed to
    processInconsistentEntryResult(), and GV.testStatus /
    GV.inconsistentEntryStatus are cleared.

    Master-only tables and tables excluded by the -S option are skipped
    silently; tables without a primary key are skipped with a warning.
    """
    catname = cat.getTableName()
    pkey = cat.getPrimaryKey()
    master = cat.isMasterOnly()
    isShared = cat.isShared()
    columns = cat.getTableColumns(with_acl=False)
    coltypes = cat.getTableColtypes()

    # Skip master only tables
    if master:
        return

    # skip shared/non-shared tables
    if GV.opt['-S']:
        if re.match("none", GV.opt['-S'], re.I) and isShared:
            return
        if re.match("only", GV.opt['-S'], re.I) and not isShared:
            return

    # Skip catalogs without primary key
    if pkey == []:
        logger.warn("[WARN] Skipped cross consistency check for %s" %
                    catname)
        return

    # Append the cast suffix autoCast holds for each column's type, if any.
    castedPkey = [c + autoCast.get(coltypes[c], '') for c in pkey]
    castcols = [c + autoCast.get(coltypes[c], '') for c in columns]

    # transform text[] cols for sure. See comments on transformTextArrayCols
    castcols = transformTextArrayCols(catname, castcols, columns, coltypes)
    # Alias every transformed expression back to its plain column name.
    castcols = [expr + ' AS ' + name for expr, name in zip(castcols, columns)]

    if cat.tableHasConsistentOids():
        qry = inconsistentEntryQuery(GV.max_content, catname, ['oid'], columns, castcols)
    else:
        qry = inconsistentEntryQuery(GV.max_content, catname, castedPkey, columns, castcols)

    # Execute the query
    try:
        db = connect2(GV.cfg[1], utilityMode=False)
        curs = db.query(qry)
        nrows = curs.ntuples()

        if nrows == 0:
            logger.info('[OK] Checking for inconsistent entries for ' + catname)
        else:
            GV.testStatus = False
            setError(ERROR_NOREPAIR)
            GV.inconsistentEntryStatus = False
            logger.info('[FAIL] Checking for inconsistent entries for ' + catname)
            logger.error(' %s has %d issue(s)' % (catname, nrows))

            fields = curs.listfields()
            # Materialize the result set once; it is both logged and passed
            # on for post-processing.
            results = curs.getresult()
            log_literal(logger, logging.ERROR, " " + " | ".join(fields))
            for row in results:
                log_literal(logger, logging.ERROR, " " + " | ".join(map(str, row)))
            processInconsistentEntryResult(catname, pkey, fields, results)

    except Exception as e:
        setError(ERROR_NOREPAIR)
        GV.inconsistentEntryStatus = False
        myprint('[ERROR] executing: Inconsistent entries check for ' + catname)
        myprint(' Execution error: ' + str(e))
        myprint(qry)
| |
| #------------------------------------------------------------------------------- |
def inconsistentEntryQuery(max_content, catname, pkey, columns, castcols):
    """Build the SQL that reports catalog rows which disagree across instances.

    max_content: highest segment content id; the SQL compares counts against
                 max_content+2.
    catname:     catalog table name.
    pkey:        list of key column expressions used to group rows.
    columns:     list of output column names.
    castcols:    list of select expressions producing those columns.

    Outline of the generated statement:
      * build the set of all rows (master copy plus gp_dist_random copy);
      * keep keys that appear max_content+2 times (missing/extra rows are
        covered by other checks) but whose full column values do not;
      * rows whose value set is held by at most half of the instances are
        reported with their own gp_segment_id, the majority is folded into
        one bucket reported with gp_segment_id == null.

    gp_enable_mk_sort is switched off because of MPP-14531.
    Returns the SQL text.
    """
    substitutions = {
        'pkey': ','.join(pkey),
        'catalog': catname,
        'columns': ','.join(columns),
        'castcols': ','.join(castcols),
        'max_content': max_content,
    }

    template = """
          SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;

          SET gp_enable_mk_sort=off;

          -- distribute catalog table from master, so that we can avoid to gather
          CREATE TEMPORARY TABLE _tmp_master ON COMMIT DROP AS
              SELECT gp_segment_id segid, {columns} FROM {catalog};
          SELECT {columns}, array_agg(gp_segment_id order by gp_segment_id) as segids
          FROM
          (
            SELECT DISTINCT
              case when dcount <= ({max_content}+2)/2.0 then gp_segment_id else null end as gp_segment_id, {columns}
            FROM
            (
              select count(*) over (partition by {columns}) as dcount,
                     count(*) over (partition by {pkey}) as pcount,
                     gp_segment_id, {columns}
              from (
                select segid gp_segment_id, {castcols} from _tmp_master
                union all
                select gp_segment_id, {castcols}
                from gp_dist_random('{catalog}')
              ) all_segments
            ) counted_segments
            WHERE dcount != {max_content}+2 and pcount = {max_content}+2
            ORDER BY {pkey}, gp_segment_id
          ) rowresult
          GROUP BY {columns}
          ORDER BY {pkey}, segids;
          """
    return template.format(**substitutions)
| |
| |
| #------------------------------------------------------------------------------- |
def checkDuplicateEntry():
    """Run the duplicate-entry check on every catalog table, in sorted order."""
    logger.info('-----------------------------------')
    logger.info('Performing test: checking for duplicate entries')

    # looks up information in the catalog
    for table in sorted(GV.catalog.getCatalogTables()):
        checkTableDuplicateEntry(table)
| |
| |
| #------------------------------------------------------------------------------- |
def checkTableDuplicateEntry(cat):
    """Check one catalog table for keys that occur more than once on an instance.

    cat: catalog table object.

    The table is skipped when it is master-only, when it is filtered out by
    the -S option, or when it has no primary key.  Otherwise the query from
    duplicateEntryQuery() is run on the master connection; any returned row
    marks the test as failed, is logged, and is handed to
    processMissingDuplicateEntryResult() with type "duplicate".  A query
    error is recorded through setError(ERROR_NOREPAIR) and printed together
    with the SQL.
    """
    tableName = cat.getTableName()
    primaryKey = cat.getPrimaryKey()
    masterOnly = cat.isMasterOnly()
    shared = cat.isShared()
    colNames = cat.getTableColumns(with_acl=False)
    colTypes = cat.getTableColtypes()

    # Skip master only tables
    if masterOnly:
        return

    # skip shared/non-shared tables
    sharedOpt = GV.opt['-S']
    if sharedOpt:
        if re.match("none", sharedOpt, re.I) and shared:
            return
        if re.match("only", sharedOpt, re.I) and not shared:
            return

    # Skip catalogs without primary key
    if primaryKey == []:
        logger.warn("[WARN] Skipped duplicate check for %s" %
                    tableName)
        return

    # Append the cast registered in autoCast for each key column's type, if any.
    castedPkey = [c + autoCast.get(colTypes[c], '') for c in primaryKey]

    # Tables with consistent oids are checked by oid, the rest by primary key.
    if cat.tableHasConsistentOids():
        keyCols = ['oid']
    else:
        keyCols = castedPkey
    qry = duplicateEntryQuery(tableName, keyCols)

    # Execute the query
    try:
        db = connect2(GV.cfg[1], utilityMode=False)
        curs = db.query(qry)
        nrows = curs.ntuples()

        if nrows == 0:
            logger.info('[OK] Checking for duplicate entries for ' + tableName)
            return

        GV.testStatus = False
        setError(ERROR_NOREPAIR)
        logger.error('[FAIL] Checking for duplicate entries for ' + tableName)
        logger.error(' %s has %d issue(s)' % (tableName, nrows))

        fields = curs.listfields()
        log_literal(logger, logging.ERROR, " " + " | ".join(fields))
        for row in curs.getresult():
            log_literal(logger, logging.ERROR, " " + " | ".join(map(str, row)))
        processMissingDuplicateEntryResult(tableName, fields,
                                           curs.getresult(), "duplicate")

    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint('[ERROR] executing: duplicate entries check for ' + tableName)
        myprint(' Execution error: ' + str(e))
        myprint(qry)
| |
| |
| #------------------------------------------------------------------------------- |
def duplicateEntryQuery(catname, pkey):
    """Build the SQL that reports duplicated keys in a catalog table.

    catname: catalog table name.
    pkey:    list of key column expressions.

    The statement returns every key having count > 1 for a given
    (segid, key) pair, together with that count and the array of segment
    ids on which it occurs.  Returns the SQL text.
    """
    substitutions = {
        'catalog': catname,
        'pkey': ','.join(pkey),
    }

    template = """
          SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;

          -- distribute catalog table from master, so that we can avoid to gather
          CREATE TEMPORARY TABLE _tmp_master ON COMMIT DROP AS
              SELECT gp_segment_id segid, {pkey} FROM {catalog};
          SELECT {pkey}, total, array_agg(segid order by segid) as segids
          FROM (
            SELECT segid, {pkey}, count(*) as total
            FROM (
              select segid, {pkey} FROM _tmp_master
              union all
              select gp_segment_id as segid, {pkey} FROM gp_dist_random('{catalog}')
            ) all_segments
            GROUP BY segid, {pkey}
            HAVING count(*) > 1
          ) rowresult
          GROUP BY {pkey}, total
          """
    return template.format(**substitutions)
| |
| |
| #------------------------------------------------------------------------------- |
def checkDuplicatePersistentEntry():
    """
    MPP-18178: check duplicate entries in gp_persistent_relation_node
    have the same: tablespace_oid, database_oid, relfilenode_oid, segment_file_num
    except mirror_existence_state != 6 (MIRROR DROP PENDING)

    Any row returned by the query marks the test as failed, is logged, and
    is handed to processMissingDuplicateEntryResult() with type "duplicate".
    A query error is recorded through setError(ERROR_NOREPAIR) and printed
    together with the SQL.
    """
    persistentTable = 'gp_persistent_relation_node'
    keyColumns = ['tablespace_oid', 'database_oid', 'relfilenode_oid', 'segment_file_num']
    stateColumn = 'mirror_existence_state'
    dropPendingState = 6

    qry = duplicatePersistentEntryQuery(persistentTable, keyColumns,
                                        stateColumn, dropPendingState)

    try:
        db = connect2(GV.cfg[1], utilityMode=False)
        curs = db.query(qry)
        nrows = curs.ntuples()

        if nrows == 0:
            logger.info('[OK] Checking for duplicate persistent entries for ' + persistentTable)
            return

        GV.testStatus = False
        setError(ERROR_NOREPAIR)
        logger.error('[FAIL] Checking for duplicate persistent entries for ' + persistentTable)
        logger.error(' %s has %d issue(s)' % (persistentTable, nrows))

        resultFields = curs.listfields()
        log_literal(logger, logging.ERROR, " " + " | ".join(resultFields))
        for row in curs.getresult():
            log_literal(logger, logging.ERROR, " " + " | ".join(map(str, row)))
        processMissingDuplicateEntryResult(persistentTable, resultFields,
                                           curs.getresult(), "duplicate")

    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint('[ERROR] Executing: duplicate persistent entries check for ' + persistentTable)
        myprint(' Execution error: ' + str(e))
        myprint(qry)
| |
| |
def duplicatePersistentEntryQuery(catname, pkey, excol, state):
    """
    Check for duplicate persistent table entries:
    the query is similar to the duplicate catalog entries query, except an extra condition
    and the check is limited to the connected database for reporting purposes

    catname: persistent table name.
    pkey:    list of key column names.
    excol:   column used for the extra condition.
    state:   value of excol whose rows are excluded (excol != state).

    The database name is taken from GV.dbname.  Returns the SQL text.
    """
    # The database name is embedded as a SQL string literal, so any single
    # quote in it has to be doubled or the statement would be malformed.
    dbnameLiteral = str(GV.dbname).replace("'", "''")

    qry = """
          SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;

          -- distribute catalog table from master, so that we can avoid to gather
          CREATE TEMPORARY TABLE _tmp_master ON COMMIT DROP AS
              SELECT gp_segment_id segid, {pkey}, {excol} FROM {catalog};
          SELECT {pkey}, total, array_agg(segid order by segid) as segids
          FROM (
            SELECT segid, {pkey}, count(*) as total
            FROM (
              select segid, {pkey} from _tmp_master
              where {excol} != {state}
              union all
              select gp_segment_id as segid, {pkey} from gp_dist_random('{catalog}')
              where {excol} != {state}
            ) all_segments
            LEFT OUTER JOIN pg_database d ON (all_segments.database_oid = d.oid)
            WHERE d.datname = '{dbname}'
            GROUP BY segid, {pkey}
            HAVING count(*) > 1
          ) rowresult
          GROUP BY {pkey}, total
          """.format(catalog=catname, pkey=','.join(pkey), excol=excol,
                     state=state, dbname=dbnameLiteral)
    return qry
| |
| |
| |
| ############################################################################ |
| # version = X.X : run this test as part of running all tests on gpdb <= X.X |
| # individually, any test should be ok run on any version |
| # online = True: okie to run gpcheckcat online |
| # order = X : the order test should be run when running all tests |
| ############################################################################ |
| |
# Registry of the tests gpcheckcat can run, keyed by test name.
# Each entry holds:
#   description - text shown by the test listing
#   fn          - zero-argument callable running the test (kept as a lambda
#                 so the check function is looked up when the test runs)
#   version     - compared against GV.version when running all tests
#   order       - position of the test when running all tests
#   online      - whether the test may run in online mode
name_test = {
    "duplicate": dict(
        description="Check for duplicate entries",
        fn=lambda: checkDuplicateEntry(),
        version='main', order=1, online=True),
    "missing_extraneous": dict(
        description="Cross consistency check for missing or extraneous entries",
        fn=lambda: checkMissingEntry(),
        version='main', order=2, online=True),
    "inconsistent": dict(
        description="Cross consistency check for master segment inconsistency",
        fn=lambda: checkInconsistentEntry(),
        version='main', order=3, online=True),
    "foreign_key": dict(
        description="Check foreign keys",
        fn=lambda: checkForeignKey(),
        version='main', order=4, online=True),
    "acl": dict(
        description="Cross consistency check for access control privileges",
        fn=lambda: checkACL(),
        version='main', order=5, online=True),
    "persistent": dict(
        description="Check persistent tables",
        fn=lambda: checkPersistentTables(),
        version='main', order=6, online=False),
    "pgclass": dict(
        description="Check pg_class entry that does not have any correspond pg_attribute entry",
        fn=lambda: checkPGClass(),
        version='main', order=7, online=False),
    "namespace": dict(
        description="Check for leaked temporary schema and missing schema definition",
        fn=lambda: checkPGNamespace(),
        version='main', order=8, online=False),
    "distribution_policy": dict(
        description="Check constraints on randomly distributed tables",
        fn=lambda: checkDistribPolicy(),
        version='main', order=9, online=False),
    "dependency": dict(
        description="Check for dependency on non-existent objects",
        fn=lambda: checkDepend(),
        version='main', order=10, online=False),
    "owner": dict(
        description="Check table ownership that is inconsistent with the master database",
        fn=lambda: checkOwners(),
        version='main', order=11, online=True),
    "part_integrity": dict(
        description="Check pg_partition branch integrity, partition with oids, partition distribution policy",
        fn=lambda: checkPartitionIntegrity(),
        version='main', order=13, online=True),
    "part_constraint": dict(
        description="Check constraints on partitioned tables",
        fn=lambda: checkPartitionRegularity(),
        version='main', order=14, online=True),
    "duplicate_persistent": dict(
        description="Check for duplicate gp_persistent_relation_node entries",
        fn=lambda: checkDuplicatePersistentEntry(),
        version='main', order=16, online=True),
}
| |
| |
| #------------------------------------------------------------------------------- |
def listAllTests():
    """Print every registered test name and its description, sorted by "order"."""
    inRunOrder = sorted(name_test,
                        key=lambda testName: name_test[testName]["order"])

    myprint('\nList of gpcheckcat tests:\n')
    for testName in inRunOrder:
        description = name_test[testName]["description"]
        myprint(" %24s: %s" % (testName, description))
    myprint('')
| |
| |
| #------------------------------------------------------------------------------- |
def runTestCatname(cat):
    """Run every per-table test against a single catalog table.

    cat: catalog table object passed to each per-table check.

    The tests run in sorted name order.  For each one the elapsed time is
    added to GV.elapsedTime, GV.totalTestRun is incremented and the runtime
    is printed.
    """
    # Imported locally: this file only does `from datetime import datetime`,
    # so `datetime.timedelta` would resolve against the datetime *class*
    # unless some star import happens to rebind the name to the module.
    from datetime import timedelta

    tests = {'missing_extraneous': lambda: checkTableMissingEntry(cat),
             'inconsistent': lambda: checkTableInconsistentEntry(cat),
             'foreign_key': lambda: checkTableForeignKey(cat),
             'duplicate': lambda: checkTableDuplicateEntry(cat),
             'acl': lambda: checkTableACL(cat)}

    for t in sorted(tests):
        myprint("Performing test '%s' for %s" % (t, cat.getTableName()))
        stime = time.time()
        tests[t]()
        etime = time.time()
        elapsed = etime - stime
        GV.elapsedTime += elapsed
        # Drop the last four characters of the timedelta text
        # (e.g. "0:00:01.234567" -> "0:00:01.23").
        elapsed = str(timedelta(seconds=elapsed))[:-4]
        GV.totalTestRun += 1
        myprint('Total runtime for this test: %s' % (elapsed))
| |
| |
| #------------------------------------------------------------------------------- |
def runOneTest(name):
    """Run the registered test called *name* and record its outcome.

    name: key into name_test.

    Raises LookupError when the name is not registered.  In online mode
    (GV.opt['-O']) a test not flagged "online" is skipped.  Otherwise the
    test function is called, its elapsed time is added to GV.elapsedTime,
    GV.totalTestRun is incremented, and the name is appended to
    GV.failedTest when the test left GV.testStatus false.
    """
    # Imported locally: this file only does `from datetime import datetime`,
    # so `datetime.timedelta` would resolve against the datetime *class*
    # unless some star import happens to rebind the name to the module.
    from datetime import timedelta

    if name not in name_test:
        message = "'%s' is not a valid test" % (name)
        logger.info(message)
        raise LookupError(message)

    if GV.opt['-O'] and not name_test[name]["online"]:
        logger.info("%s: Skip this test in online mode" % (name))
        return

    myprint("Performing test '%s'" % name)
    GV.totalTestRun += 1
    # The individual checks flip this to False when they find a problem.
    GV.testStatus = True
    stime = time.time()
    name_test[name]["fn"]()
    etime = time.time()
    elapsed = etime - stime
    GV.elapsedTime += elapsed
    # Drop the last four characters of the timedelta text
    # (e.g. "0:00:01.234567" -> "0:00:01.23").
    elapsed = str(timedelta(seconds=elapsed))[:-4]
    myprint('Total runtime for this test: %s' % (elapsed))
    if not GV.testStatus:
        GV.failedTest.append(name)
| |
| |
| |
| #------------------------------------------------------------------------------- |
def _writeRepairSqlFile(directory, filename, lines):
    """Write each item of *lines* on its own line to directory/filename.

    Logs a fatal message and exits with status 1 when the file cannot be
    created or written.  The file is always closed.
    """
    fullpath = '%s/%s' % (directory, filename)
    try:
        with open(fullpath, 'w') as out:
            for line in lines:
                out.write('%s\n' % line)
    except Exception as e:
        logger.fatal('Unable to create file "%s": %s' %
                     (fullpath, str(e)))
        sys.exit(1)


def _masterRepairCommand(filename):
    """Return the runsql.sh line that feeds *filename* to psql on GV.cfg[1]."""
    c = GV.cfg[1]
    return 'psql -X -a -h %s -p %i -f %s "%s" > %s.out 2>&1\n' % \
        (c['hostname'], c['port'], filename, GV.dbname, filename)


def runAllTests():
    '''
    perform catalog check for specified database

    Runs every registered test whose "version" compares >= GV.version, in
    "order" sequence, then closes the database connections.  When -g was
    given and at least one fix was collected, the repair SQL files and a
    runsql.sh driver script are written to that directory; the process
    exits with status 1 if the directory or any file cannot be written.
    '''

    for name in sorted(name_test, key=lambda x: name_test[x]["order"]):
        if name_test[name]["version"] >= GV.version:
            runOneTest(name)

    closeDbs()

    logger.info("------------------------------------")

    fixes = any(len(collected) > 0 for collected in (
        GV.Remove, GV.AdjustConname, GV.DemoteConstraint, GV.ReSync,
        GV.Reindex, GV.Constraints, GV.Owners, GV.Schemas, GV.Policies))

    outdir = GV.opt['-g']
    if outdir is not None and fixes:
        try:
            if not os.path.exists(outdir):
                os.mkdir(outdir)
        except Exception as e:
            logger.fatal('Unable to create directory "%s": %s' %
                         (outdir, str(e)))
            sys.exit(1)

        logger.debug('Building catalog repair scripts')
        # set allow_system_table_mods GUC for gpdb >= 4.2
        # (NOTE: this is a plain string comparison of version labels)
        catmod_guc = ""
        if GV.version >= "4.2":
            catmod_guc = "-c allow_system_table_mods=dml"

        # build a script to run these files
        script = ['#!/bin/bash\ncd $(dirname $0)\n']

        # don't remove anything if ReSync possible --
        # comment out the delete statements from the repair scripts
        maybeRemove = ""
        if len(GV.ReSync):
            maybeRemove = "# "

        # use the per-dbid loop for removal and for constraint
        # name adjustments
        perSegmentFixes = (GV.Remove, GV.AdjustConname, GV.DemoteConstraint)
        dbids = set()
        for fixmap in perSegmentFixes:
            dbids.update(fixmap.keys())

        # sorted so that runsql.sh is generated in a stable order
        for seg in sorted(dbids):
            # dbid.host.port.dbname.sql
            c = GV.cfg[seg]
            filename = '%i.%s.%i.%s.sql' % (seg, c['hostname'],
                                            c['port'], GV.dbname)
            filename = filename.replace(' ', '_')

            # unique statements for this segment, written in sorted order
            lines = set()
            for fixmap in perSegmentFixes:
                if seg in fixmap:
                    lines.update(fixmap[seg])

            _writeRepairSqlFile(outdir, filename,
                                ['BEGIN;'] + sorted(lines) + ['COMMIT;'])

            script.append('\n%secho "Repairing segment %i"' % (maybeRemove, seg))
            script.append(
                '\n{maybe}env PGOPTIONS="-c gp_session_role=utility {guc}" '
                'psql -X -a -h {hostname} -p {port} -f {fname} "{dbname}" '
                '> {fname}.out\n'.format(
                    maybe=maybeRemove,
                    guc=catmod_guc,
                    hostname=c['hostname'],
                    port=c['port'],
                    fname=filename,
                    dbname=GV.dbname))

        if len(GV.Constraints) > 0:
            GV.Constraints = list(set(GV.Constraints))

        # Fixes applied through GV.cfg[1]:
        # (collected items, file name, statement template, banner).
        # The "Owners" list is written as collected, without removing
        # duplicates, since the order of its items is important for correct
        # results.
        masterFixes = [
            (GV.Reindex, 'reindex.sql', 'REINDEX INDEX %s;',
             'Reindexing potentially damaged bitmap indexes'),
            (GV.Constraints, 'removeconstraints.sql', '%s',
             'Dropping invalid unique constraints'),
            (GV.Owners, 'fixowners.sql', '%s',
             'Correcting table ownership'),
            # Remove leaked temporary schemas
            (GV.Schemas, 'fixnamespace.sql',
             'DROP SCHEMA IF EXISTS "%s" CASCADE;',
             'Dropping temporary schemas'),
            # changes to distribution policies
            (GV.Policies, 'fixdistribution.sql', '%s',
             'Fixing distribution policies'),
        ]
        for items, filename, statement, banner in masterFixes:
            if len(items) > 0:
                _writeRepairSqlFile(outdir, filename,
                                    [statement % item for item in items])
                script.append('echo "%s"\n' % banner)
                script.append(_masterRepairCommand(filename))

        try:
            path = '%s/runsql.sh' % outdir
            with open(path, 'w') as out:
                out.write(''.join(script))
            os.chmod(path, stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR)
        except Exception as e:
            logger.fatal('Could not write script: %s' % str(e))
            sys.exit(1)

        myprint('')
        myprint('[INFO]: repair scripts generated in directory %s' % outdir)

    if GV.retcode >= ERROR_NOREPAIR:
        logger.warn('[WARN]: unable to generate repairs for some issues')
    logger.info("Check complete")
| |
| |
| #------------------------------------------------------------------------------- |
def getCatalog():
    """Return a GPCatalog built on a non-utility-mode connection to GV.cfg[1]."""
    # Establish a connection to the master & looks up info in the catalog
    masterDb = connect2(GV.cfg[1], utilityMode=False)
    catalog = GPCatalog(masterDb)
    return catalog
| |
| |
| #------------------------------------------------------------------------------- |
def getReportConfiguration():
    """Summarise GV.cfg per content id.

    Returns a dict keyed by content id whose values are dicts with
    'segname', 'hostname' and 'port'.  Content -1 is named 'master', any
    other content N is named 'seg<N+1>'.  When several entries share a
    content id, the one visited last wins.
    """
    report = {}
    for entry in GV.cfg.values():
        content = entry['content']
        if content == -1:
            label = 'master'
        else:
            label = 'seg' + str(content + 1)
        report[content] = {'segname': label,
                           'hostname': entry['hostname'],
                           'port': entry['port']}
    return report
| |
| #=============================================================================== |
| # GPCHECKCAT REPORT |
| #=============================================================================== |
| |
GPObjects = {}  # key = (oid, catname) as built in getGPObject, value = GPObject
GPObjectGraph = {}  # key = GPObject, value=[GPObject]
| #------------------------------------------------------------------------------- |
| |
| |
| # TableMainColumn[catname] = [colname, catname2] |
| # - report this catname's issues as part of catname2 |
| # - catname.colname references catname2.oid |
| #------------------------------------------------------------------------------- |
TableMainColumn = {
    # Table with no OID
    'gp_verification_history': ['vertoken', 'gp_verification_history'],
    'gp_version_at_initdb': ['schemaversion', 'gp_version_at_initdb'],
    'pg_aggregate': ['aggfnoid', 'pg_proc'],
    'pg_amop': ['amopclaid', 'pg_opclass'],
    'pg_amproc': ['amopclaid', 'pg_opclass'],
    'pg_appendonly': ['relid', 'pg_class'],
    'pg_appendonly_alter_column': ['relid', 'pg_class'],
    'pg_attribute': ['attrelid', 'pg_class'],
    'pg_attribute_encoding': ['attrelid', 'pg_class'],
    'pg_auth_member': ['roleid', 'pg_authid'],
    'pg_autovacuum': ['vacrelid', 'pg_class'],
    'pg_exttable': ['reloid', 'pg_class'],
    'pg_foreign_table': ['reloid', 'pg_class'],
    'pg_index': ['indexrelid', 'pg_class'],
    'pg_inherits': ['inhrelid', 'pg_class'],
    'pg_largeobject': ['loid', 'pg_largeobject'],
    'pg_pltemplate': ['tmplname', 'pg_pltemplate'],
    'pg_proc_callback': ['profnoid', 'pg_proc'],
    'pg_type_encoding': ['typid', 'pg_type'],
    'pg_window': ['winfnoid', 'pg_proc'],

    # Table with OID (special case), these OIDs are known to be inconsistent
    'pg_attrdef': ['adrelid', 'pg_class'],
    'pg_constraint': ['conrelid', 'pg_class'],
    'pg_trigger': ['tgrelid', 'pg_class'],
    'pg_rewrite': ['ev_class', 'pg_class'],

    # Persistent table
    'gp_persistent_relation_node': ['relfilenode_oid', 'pg_class'],
}
| |
| # Cast OID alias type to OID since our graph of objects is based on OID |
| # int2vector is also casted to int2[] due to the lack of an <(int2vector, int2vector) operator |
# Maps a column type name to the cast suffix appended to columns of that type.
# Every OID alias type gets the same '::oid' suffix.
autoCast = dict.fromkeys(
    ('regproc', 'regprocedure', 'regoper', 'regoperator', 'regclass', 'regtype'),
    '::oid')
autoCast['int2vector'] = '::int2[]'
| |
| #------------------------------------------------------------------------------- |
class QueryException(Exception):
    """Raised by getOidFromPK when a lookup query returns no matching entry."""
| |
| |
| #------------------------------------------------------------------------------- |
| # Get gpObj from GPObjects, instantiate a new one & add to GPObjects if not found |
def getGPObject(oid, catname):
    """Return the object registered in GPObjects under (oid, catname).

    When no object is registered yet, a new one is created (a RelationObject
    for 'pg_class', a GPObject for any other catalog), stored in GPObjects
    and returned.
    """
    cacheKey = (oid, catname)
    cached = GPObjects.get(cacheKey)
    if cached is not None:
        return cached

    if catname == 'pg_class':
        created = RelationObject(oid, catname)
    else:
        created = GPObject(oid, catname)
    GPObjects[cacheKey] = created
    return created
| |
| |
| |
| #------------------------------------------------------------------------------- |
def getOidFromPK(catname, pkeys):
    """Look up the OID of the catalog row identified by a primary key.

    catname: catalog table name.
    pkeys:   dict of primary key column name -> value.

    The row is searched in the catalog table and in its gp_dist_random()
    copy.  The OIDs found are ranked by how many rows report them (ties
    broken by lowest OID) and the top ranked one is returned.

    On any failure, including no matching row, the error is recorded through
    setError(ERROR_NOREPAIR), the message and query are printed, and None is
    returned.
    """
    # pkeys: name-value pair
    # Values are embedded as SQL string literals, so single quotes inside a
    # value are doubled; otherwise e.g. a name containing an apostrophe would
    # produce a malformed statement.
    pkeystrList = ["%s='%s'" % (key, ("%s" % (value,)).replace("'", "''"))
                   for key, value in pkeys.items()]
    pkeystr = ' and '.join(pkeystrList)

    qry = """
          SELECT oid
          FROM (
                SELECT oid FROM {catname}
                WHERE {pkeystr}
                UNION ALL
                SELECT oid FROM gp_dist_random('{catname}')
                WHERE {pkeystr}
               ) alloids
          GROUP BY oid
          ORDER BY count(*) desc, oid
          """.format(catname=catname,
                     pkeystr=pkeystr)

    try:
        db = connect2(GV.cfg[1], utilityMode=False)
        rows = db.query(qry).dictresult()
        if not rows:
            raise QueryException("No such entry '%s' in %s" % (pkeystr, catname))

        # The result is ordered most-common-first, so the majority OID is the
        # FIRST row (taking the last row would return the least common one).
        return rows[0]['oid']

    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint(' Execution error: ' + str(e))
        myprint(qry)
| |
| |
| #------------------------------------------------------------------------------- |
| # TODO: use execSQLForSingletonRow |
| #------------------------------------------------------------------------------- |
def getClassOidForType(oid):
    """Return the oid of the pg_class row whose reltype equals *oid*.

    Used by the FK check when the pg_type entry is missing, so pg_class has
    to be consulted instead.  pg_class and its gp_dist_random() copy are
    searched and the most frequently reported oid is returned, or 0 when no
    row matches.  On a query error the error is recorded through
    setError(ERROR_NOREPAIR), the message is printed and None is returned.
    """
    lookupSql = """
          SELECT oid
          FROM (
                SELECT oid FROM pg_class WHERE reltype=%d
                UNION ALL
                SELECT oid FROM gp_dist_random('pg_class') WHERE reltype=%d
               ) alltyprelids
          GROUP BY oid ORDER BY count(*) desc LIMIT 1
          """ % (oid, oid)

    try:
        db = connect()
        curs = db.query(lookupSql)
        if len(curs.dictresult()) == 0:
            return 0
        lastRow = curs.dictresult().pop()
        return lastRow['oid']
    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint(' Execution error: ' + str(e))
| |
| |
| #------------------------------------------------------------------------------- |
def getClassOidForRelfilenode(relfilenode):
    """Return the oid of the pg_class row with the given relfilenode.

    relfilenode: integer relfilenode value.

    A dedicated connection is opened from GV.opt['-h'], GV.opt['-p'] and
    GV.dbname and is closed again before returning.  On any failure
    (connection error, query error, bad argument) the error is recorded
    through setError(ERROR_NOREPAIR), the message is printed and None is
    returned.
    """
    conn = None
    try:
        # Formatted inside the try so a non-integer argument is reported
        # like any other lookup failure instead of escaping as a TypeError.
        qry = "SELECT oid FROM pg_class WHERE relfilenode = %d;" % (relfilenode)
        dburl = dbconn.DbURL(hostname=GV.opt['-h'], port=GV.opt['-p'], dbname=GV.dbname)
        conn = dbconn.connect(dburl)
        oid = dbconn.execSQLForSingletonRow(conn, qry)[0]
        return oid
    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint(' Execution error: ' + str(e))
    finally:
        # This function is called once per reported row; without closing,
        # every call would leave a connection open.
        if conn is not None:
            conn.close()
| |
| #------------------------------------------------------------------------------- |
| # Process results of tests (CC and FK) for a particular catalog: |
| # - Categorize entry into GPObject |
| # - Instantiate GPObject if needed |
| #------------------------------------------------------------------------------- |
def processMissingDuplicateEntryResult(catname, colname, allValues, type):
    '''
    Attach the rows reported by a missing/extra or duplicate check to the
    object they belong to.

    catname:   catalog table the rows come from
    colname:   result column names; the last two are not key columns
    allValues: result rows, laid out like colname
    type:      "missing" or "duplicate"

    Examples of the two layouts:

    colname:   proname     | pronamespace | proargtypes | exists  | segids
    allValues: add         | 2200         | 23 23       | extra   | {2,3}
               scube       | 2200         | 1700        | missing | {2}
               scube_accum | 2200         | 1700 1700   | missing | {2}

    colname:   oid   | total | segids
    allValues: 18853 | 2     | {-1,1}
               18853 | 3     | {0}
    '''
    # Everything except the last two columns
    pknames = list(colname[:-2])

    # This catname may not be a GPObject (e.g. pg_attribute)
    # If these catnames have oids, they are known to be inconsistent
    mainColumn = TableMainColumn.get(catname)
    if mainColumn is not None:
        gpColName, gpObjName = mainColumn[0], mainColumn[1]
    elif 'oid' in pknames:
        gpColName, gpObjName = 'oid', catname
    else:
        gpColName, gpObjName = None, catname

    def valueOf(row, name):
        # Value of the result column called *name* in *row*.
        return row[colname.index(name)]

    # Process entries
    for row in allValues:
        rowObjName = gpObjName
        segids = row[-1]
        pkeys = dict((pk, valueOf(row, pk)) for pk in pknames)

        if gpColName is not None:
            oid = valueOf(row, gpColName)
        else:
            # No usable column in the result: look the oid up and report
            # the row by that oid instead of by its primary key.
            oid = getOidFromPK(catname, pkeys)
            pkeys = {'oid': oid}

        # Special case: constraint for domain, report pg_type.contypid
        if catname == 'pg_constraint' and oid == 0:
            oid = valueOf(row, 'contypid')
            rowObjName = 'pg_type'
        # Special case: if composite type, report as part of RelationObject
        if rowObjName == 'pg_type':
            typrelid = getClassOidForType(oid)
            if typrelid != 0:
                oid = typrelid
                rowObjName = 'pg_class'
        # Special case: persistent table
        if catname == 'gp_persistent_relation_node':
            oid = getClassOidForRelfilenode(oid)

        gpObj = getGPObject(oid, rowObjName)

        if type == "missing":
            gpObj.setMissingIssue(
                CatMissingIssue(catname, pkeys, segids, row[-2]))
        else:
            assert (type == "duplicate")
            gpObj.setDuplicateIssue(
                CatDuplicateIssue(catname, pkeys, segids, row[-2]))
| |
| |
| |
| #------------------------------------------------------------------------------- |
def processInconsistentEntryResult(catname, pknames, colname, allValues):
    '''
    Attach the rows reported by the inconsistency check to the object they
    belong to.

    catname:   catalog table the rows come from
    pknames:   primary key column names of that table
    colname:   result column names
    allValues: result rows, laid out like colname

    If tableHasInconsistentOid, columns does not have oid.  Sample rows
    (the last column is the segment id array):

    17365 | test10 | 2200 | 17366 | 0 | 0 | 17366 | 0 | 0 | 0 | f | r | h | 3 | 0 | 0 | 0 | 0 | 0 | f | f | f | f | None | {0} -- row1
    17365 | test10 | 2200 | 17366 | 0 | 0 | 0 | 0 | 0 | 0 | f | r | h | 2 | 0 | 0 | 0 | 0 | 0 | f | f | f | f | None | {NULL} -- row2

    176954 | test1 | 2200 | 176955 | 0 | 0 | 176956 | 0 | 0 | 0 | f | r | h | 4 | 0 | 0 | 0 | 0 | 0 | f | f | f | f | None | {-1}
    176954 | test1 | 2200 | 176955 | 0 | 0 | 176956 | 0 | 0 | 0 | f | r | h | 2 | 0 | 0 | 0 | 0 | 0 | f | f | f | f | None | {0,1}
    176954 | test1 | 2200 | 176955 | 0 | 0 | 176956 | 0 | 0 | 0 | f | r | h | 3 | 0 | 0 | 0 | 0 | 0 | f | f | f | f | None | {2,3}
    '''
    hasOidColumn = 'oid' in colname

    def valueOf(row, name):
        # Value of the result column called *name* in *row*.
        return row[colname.index(name)]

    # Group allValues rows by its key (oid or primary key) into a dictionary:
    groupedValues = {}
    for row in allValues:
        if hasOidColumn:
            groupKey = valueOf(row, 'oid')
        else:
            groupKey = tuple(valueOf(row, n) for n in pknames)
        groupedValues.setdefault(groupKey, []).append(row)

    # This catname may not be a GPObject (e.g. pg_attribute)
    # If these catnames have oids, they are known to be inconsistent
    mainColumn = TableMainColumn.get(catname)
    if mainColumn is not None:
        gpColName, gpObjName = mainColumn[0], mainColumn[1]
    elif hasOidColumn:
        gpColName, gpObjName = 'oid', catname
    else:
        gpColName, gpObjName = None, catname

    for rows in groupedValues.values():
        firstRow = rows[0]
        rowObjName = gpObjName
        pkeys = dict((n, valueOf(firstRow, n)) for n in pknames)

        if gpColName is not None:
            oid = valueOf(firstRow, gpColName)
        else:
            oid = getOidFromPK(catname, pkeys)
        issue = CatInconsistentIssue(catname, colname, list(rows))

        # Special case: constraint for domain, report pg_type.contypid
        if catname == 'pg_constraint' and oid == 0:
            oid = valueOf(firstRow, 'contypid')
            rowObjName = 'pg_type'
        # Special case: if composite type, report as part of RelationObject
        if rowObjName == 'pg_type':
            typrelid = getClassOidForType(oid)
            if typrelid != 0:
                oid = typrelid
                rowObjName = 'pg_class'

        getGPObject(oid, rowObjName).setInconsistentIssue(issue)
| |
| |
| #------------------------------------------------------------------------------- |
def processForeignKeyResult(catname, pkcatname, colname, allValues):
    '''
    Record a CatForeignKeyIssue for every row returned by a foreign key check.

    catname   : referencing catalog table (the one holding the foreign key)
    pkcatname : referenced catalog table (the one lacking the entry)
    colname   : column names of the result rows: key columns of catname
                named <catname>_<column>, then the referenced column named
                <pkcatname>_<column>, then segids
    allValues : result rows

    colname: pg_class_relname | pg_class_relnamespace | pg_type_oid | segids
    allValues: test11 | 2200 | 17389 | {-1,0,1,2,3}
               test4 | 2200 | 17077 | {-1,0,1,2,3}
    '''

    gpObjName = pkcatname
    # Strip the table prefix: 'pg_type_oid' -> 'oid'
    gpColName = colname[-2].rsplit('_', 1)[1]
    fkeytab = catname
    fkeylist = [name.rsplit('_', 1)[1] for name in colname[:-2]]

    # The referenced table may not be a GPObject (e.g. pg_attribute)
    #   1. pg_attrdef(adrelid) referencing pg_attribute(attrelid)
    #   2. pg_rewrite(ev_class) referencing pg_attribute(attrelid)
    # gpColName is a column of pkcatname, so it has to be compared with the
    # main column registered for pkcatname.  The lookup used to index
    # TableMainColumn by catname, which raised KeyError whenever only
    # pkcatname was registered and otherwise compared unrelated columns.
    if pkcatname in TableMainColumn:
        mainColumn = TableMainColumn[pkcatname]
        if gpColName == mainColumn[0]:
            gpObjName = mainColumn[1]

    # Process each row
    for row in allValues:
        rowObjName = gpObjName
        segids = row[-1]
        oid = row[len(colname) - 2]
        # Build fkeys dict: used to find oid of pg_class
        fkeys = dict((n, row[colname.index(fkeytab + '_' + n)]) for n in fkeylist)
        issue = CatForeignKeyIssue(pkcatname, {gpColName: oid}, segids, catname, fkeys)

        # Special cases:
        #   1. pg_class(oid) referencing pg_type(oid) - relation & composite type
        #      Report under the relation, looked up from its key columns.
        if pkcatname == 'pg_type' and fkeytab == 'pg_class':
            rowObjName = 'pg_class'
            oid = getOidFromPK(rowObjName, fkeys)
        gpObj = getGPObject(oid, rowObjName)
        gpObj.setForeignKeyIssue(issue)
| |
| |
| #------------------------------------------------------------------------------- |
def processACLResult(catname, colname, allValues):
    '''
    Record a CatACLIssue for every row of an ACL comparison on catname.

    The first column is the segment id, the last two are the master and
    segment ACLs, and the columns in between are the key of the entry.

    colname: segid | datname | master_acl | segment_acl
    allValues: 2 | acldb | {=Tc/nmiller,nmiller=CTc/nmiller,person1=C/nmiller} | {=Tc/nmiller,nmiller=CTc/nmiller}
               1 | gptest | None | {=Tc/nmiller,nmiller=CTc/nmiller,person2=CTc/nmiller}
    '''
    keyNames = list(colname[1:-2])

    # We have to look up key to report (pg_pltemplate)
    reportCat = catname
    reportCol = None
    if catname in TableMainColumn:
        reportCol = TableMainColumn[catname][0]
        reportCat = TableMainColumn[catname][1]
    elif 'oid' in keyNames:
        reportCol = 'oid'

    def aclItems(acl):
        # '{a,b}' -> ['a', 'b']; a missing ACL counts as no entries
        if acl is None:
            return []
        return acl[1:-1].split(',')

    # Process entries
    for row in allValues:
        segid = row[0]
        pkeys = dict((name, row[colname.index(name)]) for name in keyNames)

        if reportCol is None:
            # no reportable column: resolve the oid and report by it
            oid = getOidFromPK(catname, pkeys)
            pkeys = {'oid': oid}
        else:
            oid = row[colname.index(reportCol)]

        macl = aclItems(row[-2])
        sacl = aclItems(row[-1])
        macl_only = list(set(macl).difference(sacl))
        sacl_only = list(set(sacl).difference(macl))

        issue = CatACLIssue(catname, pkeys, segid, macl_only, sacl_only)
        getGPObject(oid, reportCat).setACLIssue(issue)
| |
| |
| #------------------------------------------------------------------------------- |
class CatInconsistentIssue:
    '''
    An entry of catalog table catname whose column values differ between
    segments.

    columns : column names of the rows
    rows    : the differing versions of the entry; the last field of each
              row is the segment id set holding that version ('{0,1}' style,
              or '{NULL}')
    '''

    def __init__(self, catname, columns, rows):
        self.catname = catname
        self.columns = columns
        self.rows = rows

    def report(self):
        '''Print, for each column whose value differs, the value per segment set.'''

        def describeSegments(segids):
            # '{NULL}' is reported as 'all other segments'
            if segids == '{NULL}':
                return 'all other segments'
            ids = [int(token) for token in segids[1:-1].split(',')]
            return ''.join('%s (%s:%d) ' % (GV.report_cfg[sid]['segname'],
                                            GV.report_cfg[sid]['hostname'],
                                            GV.report_cfg[sid]['port'])
                           for sid in ids)

        # The last column is the segment id set itself, so it is not compared
        for pos in range(len(self.columns) - 1):
            distinct = set(row[pos] for row in self.rows)
            if len(distinct) <= 1:
                continue
            for row in self.rows:
                myprint("%20s is '%s' on %s"
                        % (self.columns[pos], row[pos], describeSegments(row[-1])))
| |
| |
| #------------------------------------------------------------------------------- |
class CatForeignKeyIssue:
    '''
    A row of fcatname that refers to an entry absent from catname.

    catname  : catalog table in which the referenced entry is missing
    pkeys    : key of the missing entry (pkey=xxxx or oid=xxxx)
    segids   : affected segment ids, in '{0,1}' form
    fcatname : checked object catname (the referencing table)
    fkeys    : key of the referencing row, indexable by column name
    '''

    def __init__(self, catname, pkeys, segids, fcatname, fkeys):
        self.catname = catname
        self.pkeys = pkeys
        self.segids = segids
        self.fcatname = fcatname
        self.fkeys = fkeys

    def report(self):
        '''Print which referencing row lacks its entry, and on which segments.'''
        ids = [int(token) for token in self.segids[1:-1].split(',')]
        if len(ids) == len(GV.report_cfg):
            where = 'all segments'
        else:
            where = ''.join('%s (%s:%d) ' % (GV.report_cfg[sid]['segname'],
                                             GV.report_cfg[sid]['hostname'],
                                             GV.report_cfg[sid]['port'])
                            for sid in ids)

        # Columns are identified by name rather than by the whole key
        if self.fcatname != 'pg_attribute':
            myprint(" No %s entry for %s %s on %s"
                    % (self.catname, self.fcatname, self.fkeys, where))
        else:
            myprint(" No %s entry for %s column '%s' on %s"
                    % (self.catname, self.fcatname, self.fkeys['attname'], where))
| |
| #------------------------------------------------------------------------------- |
class CatMissingIssue:
    '''
    A catalog entry that is missing from, or extra on, some segments.

    catname : catalog table of the entry
    pkeys   : key of the entry (pkey=xxxx or oid=xxxx), indexable by column
    segids  : affected segment ids, in '{0,1}' form
    type    : 'missing' or 'extra'
    '''

    def __init__(self, catname, pkeys, segids, type ):
        self.catname = catname
        self.pkeys = pkeys
        self.segids = segids
        self.type = type
        assert(self.type in ['missing','extra'])

    def report(self):
        '''Print what is missing or extra, and on which segments.'''
        ids = [int(token) for token in self.segids[1:-1].split(',')]
        if len(ids) == len(GV.report_cfg):
            where = 'all segments'
        else:
            where = ''.join('%s (%s:%d) ' % (GV.report_cfg[sid]['segname'],
                                             GV.report_cfg[sid]['hostname'],
                                             GV.report_cfg[sid]['port'])
                            for sid in ids)

        kind = self.type.capitalize()
        if self.catname == 'pg_attribute':
            myprint(" %s column '%s' on %s" % (kind, self.pkeys['attname'], where))
            return
        if self.catname == 'pg_class':
            myprint(' %s relation metadata for %s on %s' % (kind, str(self.pkeys), where))
            return
        # catname[3:] drops the leading 'pg_' of the table name
        myprint(' %s %s metadata of %s on %s'
                % (kind, self.catname[3:], str(self.pkeys), where))
| |
| |
| #------------------------------------------------------------------------------- |
class CatDuplicateIssue:
    '''
    A catalog entry that occurs more than once.

    catname : catalog table of the entry
    pkeys   : key of the entry (pkey=xxxx or oid=xxxx)
    segids  : affected segids
    enum    : number of entries found
    '''

    def __init__(self, catname, pkeys, segids, enum ):
        self.catname = catname
        self.pkeys = pkeys
        self.segids = segids
        self.enum = enum

    def report(self):
        """ Found # catname entries of "oid=XXXXX" on segment {Y,Z} """
        details = (self.enum, self.catname, str(self.pkeys), str(self.segids))
        myprint(' Found %d %s entries of %s on segment %s' % details)
| |
| |
| #------------------------------------------------------------------------------- |
class CatACLIssue:
    '''
    An entry whose ACL on one segment differs from the ACL on the master.

    catname   : catalog table of the entry
    pkeys     : key of the entry (pkey=xxxx or oid=xxxx)
    segid     : the affected segid
    macl_only : acl items that appear only on master
    sacl_only : acl items that appear only on the segment
    '''

    def __init__(self, catname, pkeys, segid, macl_only, sacl_only ):
        self.catname = catname
        self.pkeys = pkeys
        self.segid = segid
        self.macl_only = macl_only
        self.sacl_only = sacl_only

    def report(self):
        """ Master (host:port) and seg# (host:port) have different ACL:
        Exist(s) on master only: [...........]
        Exist(s) on seg# only: [...........]
        """
        # The master is looked up under key -1 of the report configuration
        master = GV.report_cfg[-1]
        segment = GV.report_cfg[self.segid]
        mstr = 'Master (%s:%s)' % (master['hostname'], master['port'])
        sstr = '%s (%s:%s)' % (segment['segname'], segment['hostname'], segment['port'])
        myprint(' %s and %s have different ACLs:' % (mstr, sstr))
        if len(self.macl_only) > 0:
            myprint(' Exist(s) on master only: %s' % (self.macl_only))
        if len(self.sacl_only) > 0:
            myprint(' Exist(s) on %s only: %s '
                    % (GV.report_cfg[self.segid]['segname'], self.sacl_only))
| |
| |
| #------------------------------------------------------------------------------- |
class GPObject:
    '''
    A database object, identified by (oid, catname), together with every
    issue the checks recorded against it.

    Each *Issues attribute maps issue.catname to the list of issues of that
    kind found in that catalog table.
    '''

    def __init__(self, oid, catname):
        self.oid = oid
        self.catname = catname
        self.missingIssues = {}       # lists of CatMissingIssue
        self.inconsistentIssues = {}  # lists of CatInconsistentIssue
        self.duplicateIssues = {}     # lists of CatDuplicateIssue
        self.aclIssues = {}           # lists of CatACLIssue
        self.foreignkeyIssues = {}    # lists of CatForeignKeyIssue

    def setMissingIssue(self, issue):
        '''Record a missing/extra entry issue under its catalog table.'''
        self.missingIssues.setdefault(issue.catname, []).append(issue)

    def setInconsistentIssue(self, issue):
        '''Record an inconsistent entry issue under its catalog table.'''
        self.inconsistentIssues.setdefault(issue.catname, []).append(issue)

    def setDuplicateIssue(self, issue):
        '''Record a duplicate entry issue under its catalog table.'''
        self.duplicateIssues.setdefault(issue.catname, []).append(issue)

    def setACLIssue(self, issue):
        '''Record an ACL issue under its catalog table.'''
        self.aclIssues.setdefault(issue.catname, []).append(issue)

    def setForeignKeyIssue(self, issue):
        '''Record a foreign key issue under its catalog table.'''
        self.foreignkeyIssues.setdefault(issue.catname, []).append(issue)

    def isTopLevel(self):
        '''A plain GPObject always gets its own section in the report.'''
        return True

    def reportAllIssues(self):
        '''
        Print all recorded issues, grouped by kind and by catalog table.

        The object header is printed only for plain GPObject instances;
        subclasses are expected to print their own.
        '''

        def reportByCatalog(issuesByCat, header):
            # One header line per catalog table, then its issues and a blank line
            for catname, issues in issuesByCat.items():
                myprint(header % catname)
                for each in issues:
                    each.report()
                myprint('')

        if self.__class__ == GPObject:
            myprint('')
            myprint('----------------------------------------------------')
            myprint('Object oid: %s' % (self.oid))
            myprint('Table name: %s' % (self.catname))
            myprint('')

        # Report inconsistent issues
        reportByCatalog(self.inconsistentIssues,
                        ' Name of test which found this issue: inconsistent_%s')

        # Report missing issues
        if self.missingIssues:
            missingHeader = ' Name of test which found this issue: missing_extraneous_%s'
            if 'pg_class' in self.missingIssues:
                # When the pg_class entry itself is affected, the tables in
                # omitlist only get their test name listed; their individual
                # issues are not printed.
                omitlist = ['pg_attribute','pg_attribute_encoding','pg_type','pg_appendonly','pg_index']
                myprint(' Name of test which found this issue: missing_extraneous_pg_class')
                for name in omitlist:
                    if name in self.missingIssues:
                        myprint(missingHeader % name)
                for each in self.missingIssues['pg_class']:
                    each.report()

                for catname, issues in self.missingIssues.items():
                    if catname == 'pg_class' or catname in omitlist:
                        continue
                    myprint(missingHeader % catname)
                    for each in issues:
                        each.report()
            else:
                for catname, issues in self.missingIssues.items():
                    myprint(missingHeader % catname)
                    for each in issues:
                        each.report()
            myprint('')

        # Report foreign key issues
        reportByCatalog(self.foreignkeyIssues,
                        ' Name of test which found this issue: foreign_key_%s')

        # Report duplicate issues
        reportByCatalog(self.duplicateIssues,
                        ' Name of test which found this issue: duplicate_%s')

        # Report ACL issues
        reportByCatalog(self.aclIssues,
                        ' Name of test which found this issue: acl_%s')

    def __cmp__(self, other):
        # Objects compare (and sort) by oid first, then by catalog name
        if not isinstance(other, GPObject):
            return NotImplemented
        return cmp((self.oid, self.catname), (other.oid, other.catname))

    def __hash__(self):
        # Hash on the same pair that __cmp__ compares
        return hash((self.oid, self.catname))
| |
| |
| #------------------------------------------------------------------------------- |
class RelationObject(GPObject):
    '''
    A GPObject that is a relation (or, for relkind 'c', a composite type).

    relname, nspname, relkind and paroid stay None until setRelInfo() is
    called.  paroid is the oid of the parent relation for relations that are
    reported as sub-objects (relkind 'i', 't' or 'o').
    '''

    def __init__(self, oid, catname):
        GPObject.__init__(self, oid, catname)
        self.relname = None
        self.nspname = None
        self.relkind = None
        self.paroid = None

    def reportAllIssues(self):
        '''
        Print the relation header, then all recorded issues.

        The oid is formatted with %s rather than %d: buildGraph() creates
        parent objects from a child's paroid, which is None when no parent
        could be resolved, and '%d' % None raises TypeError in the middle of
        the report.  Integer oids print exactly as before.
        '''
        if self.isTopLevel():
            myprint('')
            myprint('----------------------------------------------------')
            if self.relkind == 'c':
                myprint('Type oid: %s' % (self.oid,))
                myprint('Type name: %s.%s' % (self.nspname, self.relname))
            else:
                myprint('Relation oid: %s' % (self.oid,))
                myprint('Relation name: %s.%s' % (self.nspname, self.relname))
        else:
            myprint(' Sub-object: ')
            myprint(' ----------------------------------------------------')
            myprint(' Relation oid: %s' % (self.oid,))
            myprint(' Relation name: %s.%s' % (self.nspname, self.relname))
        myprint('')
        GPObject.reportAllIssues(self)

    def setRelInfo(self, relname, nspname, relkind, paroid):
        '''Store the relation name, schema name, relkind and parent oid.'''
        self.relname = relname
        self.nspname = nspname
        self.relkind = relkind
        self.paroid = paroid

    def isTopLevel(self):
        '''Relations of relkind 'i', 't' or 'o' are reported under their parent.'''
        return self.relkind not in ('i', 't', 'o')
| |
| |
| #------------------------------------------------------------------------------- |
def getRelInfo(objects):
    '''
    Fill in relname, nspname, relkind and parent oid for the pg_class objects.

    objects : dict keyed by (oid, catname); for every key whose catname is
              'pg_class' and for which the query returns a row, setRelInfo()
              is called on the value.

    Get relinfo: relname, relkind, par oid for all oids
        get relname, relkind
        left outer join with union of
        - get par oid for toast
        - get par oid for aoco
        - get par oid for index
    for each query: if there is inconsistency, majority wins

    A query failure is reported through setError()/myprint() and leaves the
    objects untouched.
    '''

    if objects == {}: return

    # A None oid (a parent that could not be resolved) cannot match any
    # catalog row; formatting it into the IN (...) lists below would only
    # make the whole query fail for every other oid of the batch.
    oids = [oid for (oid, catname) in objects
            if catname == 'pg_class' and oid is not None]
    if oids == []: return

    qry = """
      SELECT oid, relname, nspname, relkind, paroid
      FROM (
        SELECT oid, relname, nspname, relkind
        FROM (
          SELECT oid, relname, nspname, relkind, rank() over (partition by oid order by count(*) desc)
          FROM
          (
            SELECT c.oid, c.relname, c.relkind, n.nspname
            FROM pg_class c
            left outer join pg_namespace n
            on (c.relnamespace = n.oid)
            WHERE c.oid in ({oids})
            UNION ALL
            SELECT c.oid, c.relname, c.relkind, n.nspname
            FROM gp_dist_random('pg_class') c
            left outer join gp_dist_random('pg_namespace') n
            on (c.relnamespace = n.oid and c.gp_segment_id = n.gp_segment_id)
            WHERE c.oid in ({oids})
          ) allrelinfo
          GROUP BY oid, relname, nspname, relkind
        ) relinfo
        WHERE rank=1
      ) relinfo

      LEFT OUTER JOIN

      (
        SELECT reltoastrelid as childoid, oid as paroid
        FROM (
          SELECT reltoastrelid, oid, rank() over (partition by reltoastrelid order by count(*) desc)
          FROM
          ( SELECT reltoastrelid, oid FROM pg_class
            WHERE reltoastrelid in ({oids})
            UNION ALL
            SELECT reltoastrelid, oid FROM gp_dist_random('pg_class')
            WHERE reltoastrelid in ({oids})
          ) allpar_toast
          GROUP BY reltoastrelid, oid
        ) par_toast
        WHERE rank=1

        UNION ALL

        SELECT segrelid as childoid, relid as paroid
        FROM (
          SELECT segrelid, relid, rank() over (partition by segrelid order by count(*) desc)
          FROM
          ( SELECT segrelid, relid FROM pg_appendonly
            WHERE segrelid in ({oids})
            UNION ALL
            SELECT segrelid, relid FROM gp_dist_random('pg_appendonly')
            WHERE segrelid in ({oids})
          ) allpar_aoco
          GROUP BY segrelid, relid
        ) par_aoco
        WHERE rank=1

        UNION ALL

        SELECT indexrelid as childoid, indrelid as paroid
        FROM (
          SELECT indexrelid, indrelid, rank() over (partition by indexrelid order by count(*) desc)
          FROM
          ( SELECT indexrelid, indrelid FROM pg_index
            WHERE indexrelid in ({oids})
            UNION ALL
            SELECT indexrelid, indrelid FROM gp_dist_random('pg_index')
            WHERE indexrelid in ({oids})
          ) allpar_index
          GROUP BY indexrelid, indrelid
        ) par_index
        WHERE rank=1
      ) par ON childoid = oid
    """.format(oids=','.join(map(str, oids)))

    try:
        db = connect2(GV.cfg[1], utilityMode=False)
        curs = db.query(qry)
        for row in curs.getresult():
            (oid, relname, nspname, relkind, paroid) = row
            objects[oid, 'pg_class'].setRelInfo(relname, nspname, relkind, paroid)

    except Exception as e:
        # Best effort: flag the run as failed and show the offending query
        setError(ERROR_NOREPAIR)
        myprint(' Execution error: ' + str(e))
        myprint(qry)
| |
| |
| #------------------------------------------------------------------------------- |
def buildGraph():
    '''
    Populate GPObjectGraph from GPObjects.

    Every top level object becomes a key of the graph; every other object is
    appended to the list of its parent (looked up by paroid), and the parent
    is created when it is not known yet.  Parents created in one pass are
    processed in the next pass, until a pass creates none.
    '''

    def linkLevel(objects, graph):
        # Place one batch of objects; return the parents created on the way
        getRelInfo (objects)
        created = {}
        for (oid, catname), obj in objects.items():
            # Top level object, add to graph
            if obj.isTopLevel():
                if obj not in graph:
                    graph[obj] = []
                continue

            # Not top level object, find parent, add to created if needed
            parkey = (obj.paroid, catname)
            parobj = objects.get(parkey, None)
            if parobj is None:
                parobj = created.get(parkey, None)
            if parobj is None:
                parobj = RelationObject(obj.paroid, catname)
                created[parkey] = parobj
            assert (parobj is not None)

            # Add obj and parobj to graph
            if parobj not in graph:
                graph[parobj] = []
            children = graph[parobj]
            if obj not in children:
                children.append(obj)
        return created

    pending = GPObjects
    while pending:
        pending = linkLevel(pending, GPObjectGraph)
| |
| |
| |
| #------------------------------------------------------------------------------- |
def checkcatReport():
    '''
    Print the summary report for the current database.

    Builds the object graph, prints the total runtime, then every top level
    object followed by its sub-objects, and finally lists the failed tests
    that this report does not cover.
    '''
    # Imported locally: the file's own import binds the name 'datetime' to
    # the datetime class, which has no timedelta attribute, so
    # 'datetime.timedelta' only resolves if a later star import happens to
    # rebind the name to the module.
    from datetime import timedelta

    def reportAllIssuesRecursive(par, graph):
        # Report the object itself, then its sub-objects in sorted order
        par.reportAllIssues()
        if par not in graph:
            return
        for child in sorted(graph[par]):
            reportAllIssuesRecursive(child, graph)

    buildGraph()
    reportedTest = ['duplicate','missing_extraneous','inconsistent','foreign_key','acl', 'duplicate_persistent']
    myprint('')
    myprint('SUMMARY REPORT')
    myprint('===================================================================')

    runtime = timedelta(seconds=GV.elapsedTime)
    elapsed = str(runtime)
    if runtime.microseconds:
        # 'H:MM:SS.ffffff' -> 'H:MM:SS.ff'.  Without a fractional part str()
        # yields 'H:MM:SS' and the same slice would cut off the seconds.
        elapsed = elapsed[:-4]
    myprint('Total runtime for %d test(s): %s' % (GV.totalTestRun, elapsed))

    if len(GPObjectGraph) == 0 and len(GV.failedTest) == 0:
        myprint('Found no catalog issue\n')
        return

    if len(GPObjectGraph) > 0:
        # Only top level objects count as issues; sub-objects are reported
        # under their parent
        total = sum(1 for par in GPObjectGraph if par.isTopLevel())
        myprint('Found a total of %d issue(s)' % total)

    for par in sorted(GPObjectGraph):
        if par.isTopLevel():
            reportAllIssuesRecursive(par, GPObjectGraph)
    myprint('')

    notReported = set(GV.failedTest).difference(reportedTest)
    if len(notReported) > 0:
        # sorted so the list does not depend on set iteration order
        myprint('Failed test(s) that are not reported here: %s' % (', '.join(sorted(notReported))))
        myprint('See %s for detail\n' % get_logfile())
| |
| |
def myprint(str):
    '''Pass str to log_literal on the module logger at CRITICAL level.'''
    level = logging.CRITICAL
    log_literal(logger, level, str)
| |
| |
| ############# |
if __name__ == '__main__':
    # Script entry point: parse the options, collect the cluster
    # configuration, then run the requested tests and print a report for
    # every database in GV.alldb.

    parseCommandLine()
    # -l: only list the tests, then exit
    if GV.opt['-l']:
        listAllTests()
        sys.exit(GV.retcode)

    GV.version = getversion()
    # NOTE(review): this is a string comparison; a version string such as
    # '10.x' would compare lower than "4.0" - confirm the format returned
    # by getversion().
    if GV.version < "4.0":
        myprint("Error: only Greenplum database version >= 4.0 are supported\n")
        sys.exit(GV.retcode)

    GV.cfg = getGPConfiguration()
    GV.report_cfg = getReportConfiguration()
    GV.max_content = max([GV.cfg[dbid]['content'] for dbid in GV.cfg])
    GV.catalog = getCatalog()

    for dbname in GV.alldb:

        # Reset global variables
        # GPObjects and GPObjectGraph are module-level names that
        # buildGraph() and checkcatReport() read, so each database starts
        # with empty ones.
        GV.reset_stmt_queues()
        GPObjects = {}
        GPObjectGraph = {}
        GV.dbname = dbname

        myprint('')
        myprint("Connected as user \'%s\' to database '%s', port '%d', gpdb version '%s'" \
                % (GV.opt['-U'], GV.dbname, GV.report_cfg[-1]['port'], GV.version))
        myprint('-------------------------------------------------------------------')

        if GV.opt['-R']:
            # -R: run a single named test
            name = GV.opt['-R']
            try:
                runOneTest(name)
            except LookupError, e:
                myprint("'%s' is not a valid test\n\n" % (name))
                setError(ERROR_NOREPAIR)
                sys.exit(GV.retcode)
        elif GV.opt['-C']:
            # -C: run the tests for one catalog table
            namestr = GV.opt['-C']
            try:
                cat = getCatObj(namestr)
                runTestCatname(cat)
            except Exception, e:
                # NOTE(review): the exception itself is not printed here;
                # presumably it has already been logged - confirm.
                setError(ERROR_NOREPAIR)
                sys.exit(GV.retcode)
        else:
            runAllTests()

        checkcatReport()

        # skip shared tables on subsequent passes
        if not GV.opt['-S']:
            GV.opt['-S'] = "none"

    sys.exit(GV.retcode)