blob: 00c2e4b21f37473240e6c3b45001934eda2a67d2 [file] [log] [blame]
#!/usr/bin/env python3
'''
Usage: gpcheckcat [<option>] [dbname]
-? | --help : help menu
-B parallel : number of worker threads
-g dir : generate SQL to rectify catalog corruption, put it in dir
-p port : DB port number
-P passwd : DB password
-U uname : DB User Name
-v : verbose
-A : all databases
-S option : shared table options (none, only)
-O : Online
-l : list all tests
-R test | 'test1, test2' : run this particular test(s) (quoted, comma seperated list for multiple tests)
-s test | 'test1, test2' : skip this particular test(s) (quoted, comma seperated list for multiple tests)
-C catname : run cross consistency, FK and ACL tests for this catalog table
-x : set session level GUCs
Test subset options are mutually exclusive, use only one of '-R', '-s', or '-C'.
'''
import datetime
import getopt
import logging
import re
import sys
import time
from functools import reduce
try:
from gppylib import gplog
from gppylib.db import dbconn
from gppylib.gpcatalog import *
from gppylib.commands.unix import *
from gppylib.commands.gp import conflict_with_gpexpand
from gppylib.system.info import *
from pgdb import DatabaseError
from gpcheckcat_modules.unique_index_violation_check import UniqueIndexViolationCheck
from gpcheckcat_modules.leaked_schema_dropper import LeakedSchemaDropper
from gpcheckcat_modules.repair import Repair
from gpcheckcat_modules.foreign_key_check import ForeignKeyCheck
from gpcheckcat_modules.orphaned_toast_tables_check import OrphanedToastTablesCheck
import pg
except ImportError as e:
sys.exit('Error: unable to import module: ' + str(e))
# Cache mapping catalog object OIDs to object names.
oidmap = {}
# -------------------------------------------------------------------------------
# Tool-wide setup performed at import time: derive the executable name,
# configure logging, and compute the default degree of parallelism (used as
# the default for the '-B' option in Global.__init__).
# NOTE(review): `os` is not imported explicitly in this file; presumably it is
# provided by one of the star imports above -- confirm.
EXECNAME = os.path.split(__file__)[-1]
gplog.setup_tool_logging(EXECNAME, getLocalHostname(), getUserName())
gplog.very_quiet_stdout_logging()
logger = gplog.get_default_logger()
sysinfo = SystemInfo(logger)
parallelism = get_max_available_thread_count()
#-------------------------------------------------------------------------------
################
def parse_int(val):
    """Convert val to an int, falling back to 0 when conversion is impossible.

    Both malformed text (ValueError) and non-convertible objects such as
    None (TypeError) yield 0, so callers never see an exception.
    """
    try:
        return int(val)
    except (TypeError, ValueError):
        return 0
###############################
def quote_value(name):
    """Return name wrapped in single quotes, with every embedded single
    quote doubled."""
    assert isinstance(name, str)
    escaped = name.replace("'", "''")
    return "'%s'" % escaped
# Error levels recorded through setError(), in increasing order of severity.
SUCCESS = 0  # success
ERROR_REMOVE = 1  # error, with repair script that removes objects
ERROR_RESYNC = 2  # error, with repair script that resynchronizes objects
ERROR_NOREPAIR = 3  # error, no repair script
# Lower bound applied to OIDs by the dependency checks ("oid >= ..." filters).
FIRST_NORMAL_OID = 16384
# Namespace OID used in the catalog queries to select or exclude objects that
# live in the pg_catalog schema.
PG_CATALOG_OID = 11
def setError(level):
    '''
    Raise the recorded error level (GV.retcode) to `level`; a level that is
    not higher than the one already recorded leaves it untouched.
    error level 0 => success
    error level 1 => error, with repair script removes objects
    error level 2 => error, with repair script that resynchronizes objects
    error level 3 => error, no repair script
    '''
    if level > GV.retcode:
        GV.retcode = level
def setScriptRetCode(level):
    """Raise GV.script_retcode to `level` unless it is already at least that high."""
    if level > GV.script_retcode:
        GV.script_retcode = level
###############################
class Global():
    """Holder for the tool's shared state: parsed options, connection cache,
    queued repair statements and reporting counters."""

    def __init__(self):
        self.timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
        self.retcode = SUCCESS
        # Command-line options keyed by switch; populated by parseCommandLine().
        self.opt = {}
        self.opt['-h'] = None
        self.opt['-p'] = None
        self.opt['-P'] = None
        self.opt['-U'] = None
        self.opt['-v'] = False
        self.opt['-V'] = False
        self.opt['-t'] = False
        self.opt['-g'] = 'gpcheckcat.repair.' + self.timestamp
        self.opt['-B'] = parallelism
        self.opt['-T'] = None
        self.opt['-A'] = False
        self.opt['-S'] = None
        self.opt['-O'] = False
        self.opt['-R'] = None
        self.opt['-s'] = None
        self.opt['-C'] = None
        self.opt['-l'] = False
        self.opt['-E'] = False
        self.opt['-x'] = []
        self.coordinator_dbid = None
        self.cfg = None
        self.dbname = None
        self.firstdb = None
        self.alldb = []
        # Connection cache filled by connect2(): key -> [connection, cfgrec]
        self.db = {}
        self.tmpdir = None
        self.reset_stmt_queues()
        self.home = os.environ.get('HOME')
        if self.home is None:
            self._usage_exit('Error: $HOME must be set')
        self.user = os.environ.get('USER') or os.environ.get('LOGNAME')
        if self.user is None:
            self._usage_exit('Error: either $USER or $LOGNAME must be set')
        self.catalog = None
        self.max_content = 0
        self.report_cfg = {}
        self.script_retcode = SUCCESS

    @staticmethod
    def _usage_exit(message):
        """Print the module help text and exit with `message`.

        The module-level usage() helper is defined *after* the module-level
        `GV = Global()` statement runs, so calling it from __init__ would
        raise NameError; this does the same work without that dependency.
        """
        print(__doc__)
        sys.exit(message)

    def reset_stmt_queues(self):
        """Reset every repair-statement queue and per-run reporting field."""
        # dictionary of SQL statements. Key is dbid
        self.Remove = {}
        # dictionary of SQL statements. Key is dbid
        self.AdjustConname = {}
        # dictionary of SQL statements. Key is dbid
        self.DemoteConstraint = {}
        # dictionary of SQL statements. Key is dbid
        self.ReSync = {}
        # constraints which are actually unenforcable
        self.Constraints = []
        # ownership fixups
        self.Owners = []
        # fix distribution policies
        self.Policies = []
        # Indexes to rebuild
        self.Reindex = []
        self.missingEntryStatus = None
        self.inconsistentEntryStatus = None
        self.foreignKeyStatus = None
        self.aclStatus = None
        # the following variables used for reporting purposes
        self.elapsedTime = 0
        self.totalCheckRun = 0
        self.checkStatus = True
        self.failedChecks = []
        self.missing_attr_tables = []
        self.extra_attr_tables = []
# Module-level instance holding all shared state; created at import time.
GV = Global()
###############################
def usage(exitarg=None):
    """Print the module help text, then terminate with `exitarg` as the
    exit status (or message)."""
    print(__doc__)
    raise SystemExit(exitarg)
###############################
def getversion():
    """Return the version token extracted from the server's version() string.

    The extraction is done server-side with regexp_replace. The connection
    opened for the query is always closed before returning, whether or not
    the query succeeds.
    """
    db = connect()
    try:
        curs = db.query('''
select regexp_replace(version(),
E'.*PostgreSQL [^ ]+ .Apache Cloudberry ([1-9]+.[0-9]+|main).*',
E'\\\\1') as ver;''')
        row = curs.getresult()[0]
    finally:
        # This connection is private to the call; do not leak it.
        db.close()
    version = row[0]
    logger.debug('got version %s' % version)
    return version
###############################
def getalldbs():
    """
    get all connectable databases

    Returns the raw result rows (one row per database name, ordered by
    name). The connection opened for the lookup is closed before returning.
    """
    db = connect()
    try:
        curs = db.query('''
select datname from pg_database where datallowconn order by datname ''')
        row = curs.getresult()
    finally:
        # This connection is private to the call; do not leak it.
        db.close()
    return row
###############################
def parseCommandLine():
    """Parse sys.argv into GV.opt / GV.dbname / GV.alldb.

    Unset connection options fall back to the PG* / USER environment
    variables. Any invalid input ends the program through usage(). When
    '-l' (list tests) is given, parsing stops right after the switches are
    recorded, because no connection settings are needed.
    """
    try:
        # A colon following the flag indicates an argument is expected
        # NOTE(review): 't:' makes -t consume an argument, yet the loop below
        # stores it as a plain boolean flag -- confirm which is intended.
        (options, args) = getopt.getopt(sys.argv[1:], '?x:p:P:U:B:vg:t:AOS:R:s:C:lE', 'help')
    except getopt.GetoptError as e:
        usage('Error: ' + str(e))

    for (switch, val) in options:
        if switch in ('-?', '--help'):
            usage(0)
        elif switch[1] in 'pBPUgSRCs':
            GV.opt[switch] = val
        elif switch[1] in 'vtAOlE':
            GV.opt[switch] = True
        elif switch[1] in 'x':
            GV.opt[switch].append(val)

    def setdef(x, v):
        # Fill option x from environment variable v when it is still unset.
        if not GV.opt[x]:
            t = os.environ.get(v)
            GV.opt[x] = t
            if t:
                logger.debug('%s not specified; default to %s (from %s)' %
                             (x, t, v))

    if GV.opt['-l']:
        return
    if GV.opt['-v']:
        gplog.enable_verbose_logging()
    setdef('-P', 'PGPASSWORD')
    setdef('-U', 'PGUSER')
    setdef('-U', 'USER')
    setdef('-p', 'PGPORT')
    if not GV.opt['-p']:
        usage('Please specify -p port')
    GV.opt['-p'] = parse_int(GV.opt['-p'])
    GV.opt['-h'] = 'localhost'
    logger.debug('Set default hostname to %s' % GV.opt['-h'])
    if GV.opt['-R']:
        logger.debug('Run this test: %s' % GV.opt['-R'])
    if GV.opt['-s']:
        logger.debug('Skip these tests: %s' % GV.opt['-s'])
    if GV.opt['-C']:
        GV.opt['-C'] = GV.opt['-C'].lower()
        logger.debug('Run cross consistency test for table: %s' % GV.opt['-C'])
    if (len(args) != 0 and len(args) != 1):
        usage('Too many arguments')
    if len(args) == 0:
        GV.dbname = os.environ.get('PGDATABASE')
        if GV.dbname is None:
            GV.dbname = 'template1'
        logger.debug('database is %s' % GV.dbname)
    else:
        GV.dbname = args[0]
    GV.firstdb = GV.dbname
    GV.alldb.append(GV.firstdb)
    # build list of all connectable databases
    if GV.opt['-A']:
        alldb = getalldbs()
        # drop the entry added above; the full list replaces it
        GV.alldb.pop()
        for adbname in alldb:
            GV.alldb.append(adbname[0])
    if GV.opt['-S']:
        # fullmatch: the whole value must be "none" or "only"; a prefix match
        # would let values such as "nonexistent" through unnoticed.
        if not re.fullmatch("none|only", GV.opt['-S'], re.I):
            usage('Error: invalid value \'%s\' for shared table option. Legal values are (none, only)' % GV.opt['-S'])
        GV.opt['-S'] = GV.opt['-S'].lower()
    logger.debug('coordinator at host %s, port %s, user %s, database %s' %
                 (GV.opt['-h'], GV.opt['-p'], GV.opt['-U'], GV.dbname))
    try:
        GV.opt['-B'] = int(GV.opt['-B'])
    except (TypeError, ValueError) as e:
        usage('Error: ' + str(e))
    if GV.opt['-B'] < 1:
        usage('Error: parallelism must be 1 or greater')
    logger.debug('degree of parallelism: %s' % GV.opt['-B'])
#############
def connect(user=None, password=None, host=None, port=None,
            database=None, utilityMode=False):
    '''Connect to DB using parameters in GV

    Any argument left unset falls back to the matching value in GV
    (GV.opt for user/password/host/port, GV.dbname for the database).
    Session options always clear search_path, optionally switch to utility
    mode, and append every '-x' setting as an extra "-c" option.
    A failed connection is logged as fatal and terminates the process with
    exit status 1.
    '''
    # unset search path due to CVE-2018-1058
    options = '-c search_path='
    if utilityMode:
        options += ' -c gp_role=utility'
    for val in GV.opt['-x']:
        options += ' -c {}'.format(val)
    if not user: user = GV.opt['-U']
    if not password: password = GV.opt['-P']
    if not host: host = GV.opt['-h']
    if not port: port = GV.opt['-p']
    if not database: database = GV.dbname
    try:
        logger.debug('connecting to %s:%s %s' % (host, port, database))
        db = pg.connect(host=host, port=port, user=user,
                        passwd=password, dbname=database, opt=options)
    except pg.InternalError as ex:
        logger.fatal('could not connect to %s: "%s"' %
                     (database, str(ex).strip()))
        # sys.exit rather than the builtin exit(): the latter is injected by
        # the `site` module and is not guaranteed to exist in every run mode.
        sys.exit(1)
    logger.debug('connected with %s:%s %s' % (host, port, database))
    return db
#############
def connect2(cfgrec, user=None, password=None, database=None, utilityMode=True):
    """Return a connection to the segment described by cfgrec, reusing a
    cached one from GV.db when the same parameters were used before.

    A record whose content is -1 is never connected to in utility mode.
    """
    host, port, datadir = cfgrec['address'], cfgrec['port'], cfgrec['datadir']
    logger.debug('connect %s:%s:%s' % (host, port, datadir))
    if not database:
        database = GV.dbname
    if cfgrec['content'] == -1:
        utilityMode = False
    key_parts = (host, port, datadir, user, password, database, str(utilityMode))
    key = '.'.join(str(part) for part in key_parts)
    cached = GV.db.get(key)
    if cached:
        return cached[0]
    conn = connect(host=host, port=port, user=user, password=password,
                   database=database, utilityMode=utilityMode)
    if conn:
        # remember the connection together with its configuration record
        GV.db[key] = [conn, cfgrec]
    return conn
class execThread(Thread):
    """Thread that runs a single query on one connection and keeps either
    the resulting cursor (`curs`) or the raised exception (`error`)."""

    def __init__(self, cfg, db, qry):
        Thread.__init__(self)
        self.cfg, self.db, self.qry = cfg, db, qry
        self.curs = None
        self.error = None

    def run(self):
        try:
            result = self.db.query(self.qry)
        except BaseException as exc:
            # stored so the joining thread can report it
            self.error = exc
        else:
            self.curs = result
def processThread(threads):
    """Join every thread and return [cfg, cursor] pairs for those that
    succeeded; failures are printed and recorded as ERROR_NOREPAIR."""
    batch = []
    for th in threads:
        logger.debug('waiting on thread %s' % th.name)
        th.join()
        if not th.error:
            batch.append([th.cfg, th.curs])
            continue
        setError(ERROR_NOREPAIR)
        location = (th.cfg['hostname'], th.cfg['port'], th.cfg['datadir'],
                    str(th.error))
        myprint("%s:%d:%s : %s" % location)
    return batch
#############
def connect2run(qry, col=None):
    """Run qry on every configured segment, at most GV.opt['-B'] at a time,
    and return a list of [cfg, col, row] entries, one per result row.

    When col is None the field list of the first result cursor is used.
    """
    logger.debug('%s' % qry)
    finished = []
    pending = []
    # parallelise queries
    for launched, (dbid, segcfg) in enumerate(GV.cfg.items(), 1):
        conn = connect2(segcfg)
        worker = execThread(segcfg, conn, qry)
        worker.start()
        logger.debug('launching query thread %s for dbid %i' %
                     (worker.name, dbid))
        pending.append(worker)
        # we don't want too much going on at once
        if launched % GV.opt['-B'] == 0:
            # process this batch of threads
            finished.extend(processThread(pending))
            pending = []
    # process the rest of threads
    finished.extend(processThread(pending))
    err = []
    for cfg, curs in finished:
        if col is None:
            col = curs.listfields()
        err.extend([[cfg, col, row] for row in curs.dictresult()])
    return err
def formatErr(c, col, row):
    """Format one error row: the segment location from c followed by
    ", <column> <value>" for each column name in col."""
    location = '%s:%s:%s, content %s, dbid %s' % (
        c['hostname'], c['port'], c['datadir'], c['content'], c['dbid'])
    details = ''.join(', %s %s' % (name, row[name]) for name in col)
    return location + details
#############
def getGPConfiguration():
    """Return {dbid: configuration row} for the rows selected from
    gp_segment_configuration, leaving out the standby coordinator."""
    # note that in 4.0, sql commands cannot be run against the segment mirrors directly
    # so we filter out non-primary segment databases in the query
    qry = '''
SELECT content, preferred_role = 'p' as definedprimary,
dbid, role = 'p' as isprimary, hostname, address, port,
datadir
FROM gp_segment_configuration
WHERE (role = 'p' or content < 0 )
'''
    db = connect()
    cfg = {}
    for row in db.query(qry).dictresult():
        # content -1 that is not primary is the standby coordinator: skip it
        if row['content'] != -1 or row['isprimary']:
            cfg[row['dbid']] = row
    db.close()
    return cfg
def _reportConstraintViolations(db, qry, label):
    """Run one pg_constraint query on db, log the outcome under label and
    queue a DROP CONSTRAINT repair for every offending constraint."""
    try:
        curs = db.query(qry)
        err = [[GV.cfg[GV.coordinator_dbid], ('nspname', 'relname', 'constraint'), row]
               for row in curs.dictresult()]
        if err:
            GV.checkStatus = False
            setError(ERROR_REMOVE)
            logger.info('[FAIL] %s' % label)
            logger.error('pg_constraint has %d issue(s)' % len(err))
            logger.error(qry)
            for cfg, col, row in err:
                logger.error(formatErr(cfg, col, row))
            for _cfg, _col, cons in err:
                removeIndexConstraint(cons['nspname'], cons['relname'],
                                      cons['constraint'])
        else:
            logger.info('[OK] %s' % label)
    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint('[ERROR] executing test: checkDistribPolicy')
        myprint(' Execution error: ' + str(e))


def checkDistribPolicy():
    """Check primary-key/unique constraints against table distribution:
    first constraints on randomly distributed tables, then constraints that
    do not cover the distribution columns; finally emit the repair."""
    logger.info('-----------------------------------')
    logger.info('Checking constraints on randomly distributed tables')
    random_qry = '''
select n.nspname, rel.relname, pk.conname as constraint
from pg_constraint pk
join pg_class rel on (pk.conrelid = rel.oid)
join pg_namespace n on (rel.relnamespace = n.oid)
join gp_distribution_policy d on (rel.oid = d.localoid)
where pk.contype in('p', 'u') and d.policytype = 'p' and d.distkey = ''
'''
    db = connect2(GV.cfg[GV.coordinator_dbid])
    _reportConstraintViolations(db, random_qry, 'randomly distributed tables')

    logger.info('-----------------------------------')
    logger.info('Checking that unique constraints are only on distribution columns')
    # final part of the WHERE clause here is a little tricky: we want to make
    # sure that the set of distribution columns is a left subset of the
    # constraint.
    unique_qry = '''
select n.nspname, rel.relname, pk.conname as constraint
from pg_constraint pk
join pg_class rel on (pk.conrelid = rel.oid)
join pg_namespace n on (rel.relnamespace = n.oid)
join gp_distribution_policy d on (rel.oid = d.localoid)
where pk.contype in ('p', 'u') and d.policytype = 'p'
and not d.distkey::int2[] operator(pg_catalog.<@) pk.conkey
'''
    _reportConstraintViolations(db, unique_qry, 'unique constraints')
    checkConstraintsRepair()
#############
def checkPartitionIntegrity():
    """Check that child partitions agree with their parent partitioned table.

    Two independent checks run on one connection:
      1. numsegments of parent and child in gp_distribution_policy;
      2. distribution policy (pg_get_table_distributedby) of parent and child.
    Each failure sets ERROR_REMOVE and is logged, with at most 100 rows shown.
    Query errors are reported and recorded as ERROR_NOREPAIR. The connection
    is closed afterwards and checkPoliciesRepair() is called.
    """
    logger.info('-----------------------------------')
    logger.info('Checking pg_partition ...')
    err = []
    db = connect()
    # Check for the numsegments value of parent and child partition from the gp_distribution_policy table
    # NOTE(review): the query compares "child.numsegments = NULL"; in SQL a
    # comparison with NULL using "=" never evaluates to true, so "IS NULL" was
    # presumably intended -- confirm before changing the query.
    qry = '''
select inhparent::regclass, inhrelid::regclass, parent.numsegments as numsegments_parent, child.numsegments as numsegments_child
from pg_inherits inner join pg_partitioned_table on (pg_inherits.inhparent = pg_partitioned_table.partrelid)
inner join gp_distribution_policy parent on (parent.localoid = inhparent)
inner join gp_distribution_policy child on (child.localoid = inhrelid)
where parent.numsegments is distinct from child.numsegments
and not (inhrelid in (select ftrelid from pg_catalog.pg_foreign_table) and child.numsegments = NULL);
'''
    try:
        curs = db.query(qry)
        cols = ('inhparent', 'inhrelid', 'numsegments_parent', 'numsegments_child')
        # display heading for each result column
        col_names = {
            'inhparent': 'table',
            'inhrelid': 'affected child',
            'numsegments_parent': 'parent numsegments value',
            'numsegments_child': 'child numsegments value',
        }
        err = []
        for row in curs.dictresult():
            err.append([GV.cfg[GV.coordinator_dbid], cols, row])
        if not err:
            logger.info('[OK] partition numsegments check')
        else:
            # remember the full count: err is truncated below
            err_count = len(err)
            GV.checkStatus = False
            setError(ERROR_REMOVE)
            logger.info('[FAIL] partition numsegments check')
            logger.error('partition numsegments check found %d issue(s)' % err_count)
            if err_count > 100:
                logger.error(qry)
            myprint(
                '[ERROR]: child partition(s) have different numsegments value '
                'from the root partition. Check the gpcheckcat log for details.'
            )
            logger.error('The following tables have different numsegments value (showing at most 100 rows):')
            # report at most 100 rows, for brevity
            err = err[:100]
            for index, e in enumerate(err):
                cfg = e[0]
                col = e[1]
                row = e[2]
                if index == 0:
                    # table header, printed once before the first row
                    logger.error("--------")
                    logger.error(" " + " | ".join(map(col_names.get, col)))
                    logger.error(" " + "-+-".join(['-' * len(col_names[x]) for x in col]))
                logger.error(" " + " | ".join([str(row[x]) for x in col]))
            if err_count > 100:
                logger.error(" ...")
    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint('[ERROR] executing test: checkPartitionIntegrity')
        myprint(' Execution error: ' + str(e))
    # Check for the distribution policy of parent and child partitions based on the following conditions:
    # 1. If a root is randomly distributed, then all middle, leaf level partitions must also be randomly distributed
    # 2. If a root is hash distributed, then middle level should be same as root and
    # leaf level partition can be hash on same key or randomly distributed
    qry = '''
select inhparent::regclass, inhrelid::regclass,
pg_get_table_distributedby(inhparent) as dby_parent,
pg_get_table_distributedby(inhrelid) as dby_child
from pg_inherits inner join pg_partitioned_table on (pg_inherits.inhparent = pg_partitioned_table.partrelid)
where pg_get_table_distributedby(inhrelid) is distinct from pg_get_table_distributedby(inhparent)
and not (pg_get_table_distributedby(inhparent) = 'DISTRIBUTED RANDOMLY' and pg_get_table_distributedby(inhrelid) = '')
and not (inhrelid in (select ftrelid from pg_catalog.pg_foreign_table) and pg_get_table_distributedby(inhrelid) = '')
and not (pg_get_table_distributedby(inhparent) like 'DISTRIBUTED BY%' and pg_get_table_distributedby(inhrelid) = 'DISTRIBUTED RANDOMLY'
and (select isleaf from pg_partition_tree(inhparent) where relid = inhrelid));
'''
    try:
        curs = db.query(qry)
        cols = ('inhparent', 'inhrelid', 'dby_parent', 'dby_child')
        # display heading for each result column
        col_names = {
            'inhparent': 'table',
            'inhrelid': 'affected child',
            'dby_parent': 'table distribution key',
            'dby_child': 'child distribution key',
        }
        err = []
        for row in curs.dictresult():
            err.append([GV.cfg[GV.coordinator_dbid], cols, row])
        if not err:
            logger.info('[OK] partition distribution policy check')
        else:
            GV.checkStatus = False
            setError(ERROR_REMOVE)
            logger.info('[FAIL] partition distribution policy check')
            logger.error('partition distribution policy check found %d issue(s)' % len(err))
            if len(err) > 100:
                logger.error(qry)
            myprint(
                '[ERROR]: child partition(s) are distributed differently from '
                'the root partition, and must be manually redistributed, for '
                'some tables. Check the gpcheckcat log for details.'
            )
            logger.error('The following tables must be manually redistributed:')
            # count = number of rows already logged; once the "..." marker is
            # logged it is bumped to 101 so every remaining row is skipped.
            count = 0
            for e in err:
                cfg = e[0]
                col = e[1]
                row = e[2]
                # TODO: generate a repair script for this row. This is
                # difficult, since we can't redistribute child partitions
                # directly.
                # report at most 100 rows, for brevity
                if count == 100:
                    logger.error("...")
                    count += 1
                if count > 100:
                    continue
                if count == 0:
                    # table header, printed once before the first row
                    logger.error("--------")
                    logger.error(" " + " | ".join(map(col_names.get, col)))
                    logger.error(" " + "-+-".join(['-' * len(col_names[x]) for x in col]))
                logger.error(" " + " | ".join([str(row[x]) for x in col]))
                count += 1
            logger.error(
                'Execute an ALTER TABLE ... SET DISTRIBUTED BY statement, with '
                'the desired distribution key, on the partition root for each '
                'affected table.'
            )
    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint('[ERROR] executing test: checkPartitionIntegrity')
        myprint(' Execution error: ' + str(e))
    db.close()
    checkPoliciesRepair()
#############
def checkPGClass():
    """Report pg_class rows that declare attributes (relnatts != 0) but have
    no matching pg_attribute rows, on every segment."""
    logger.info('-----------------------------------')
    logger.info('Checking pg_class ...')
    qry = '''
SELECT relname, relkind, tc.oid as oid
FROM pg_class tc left outer join
pg_attribute ta on (tc.oid = ta.attrelid)
WHERE ta.attrelid is NULL and tc.relnatts != 0
'''
    err = connect2run(qry, ('relname', 'relkind', 'oid'))
    if not err:
        logger.info('[OK] pg_class')
        return
    GV.checkStatus = False
    setError(ERROR_NOREPAIR)
    logger.info('[FAIL] pg_class')
    logger.error('pg_class has %d issue(s)' % len(err))
    logger.error(qry)
    # only the first 100 issues are listed
    for cfg, col, row in err[:100]:
        logger.error(formatErr(cfg, col, row))
    if len(err) > 100:
        logger.error("...")
#############
def checkPGNamespace():
    """Report catalog entries (pg_class, pg_type, pg_operator, pg_proc) whose
    namespace OID has no row in pg_namespace, on every segment."""
    # Check for objects in various catalogs that are in a schema that has
    # been dropped.
    logger.info('Checking missing schema definitions ...')
    qry = '''
SELECT o.catalog, o.nsp
FROM pg_namespace n right outer join
(select 'pg_class' as catalog, relnamespace as nsp from pg_class
union
select 'pg_type' as catalog, typnamespace as nsp from pg_type
union
select 'pg_operator' as catalog, oprnamespace as nsp from pg_operator
union
select 'pg_proc' as catalog,pronamespace as nsp from pg_proc) o on
(n.oid = o.nsp)
WHERE n.oid is NULL
'''
    err = connect2run(qry, ('catalog', 'nsp'))
    if not err:
        logger.info('[OK] missing schema definitions')
        return
    GV.checkStatus = False
    setError(ERROR_NOREPAIR)
    logger.info('[FAIL] missing schema definitons')
    logger.error('found %d references to non-existent schemas' % len(err))
    logger.error(qry)
    for cfg, col, row in err:
        logger.error(formatErr(cfg, col, row))
#############
'''
Produce repair scripts to remove dangling entries of gp_fastsequence:
- one file per segment dbid
- contains DELETE statements to remove catalog entry
'''
def removeFastSequence(db):
    '''
    Queue DELETE statements for gp_fastsequence rows whose objid matches no
    pg_class row, keyed by the dbid of the segment that holds them.

    MPP-14758: gp_fastsequence does not get cleanup after a failed transaction (AO/CO)
    Note: this is slightly different from the normal foreign key check
    because it stretches the cross reference across segments.
    This makes it safe in the event of cross consistency issues with pg_class,
    but may not repair some issues when there are cross consistency problems

    Any error is logged and otherwise ignored.
    '''
    qry = """
SELECT dbid, objid
FROM gp_segment_configuration AS cfg JOIN
(SELECT gp_segment_id, objid
FROM (select gp_segment_id, objid from gp_fastsequence
union
select gp_segment_id, objid from gp_dist_random('gp_fastsequence')) AS fs
LEFT OUTER JOIN
(select oid from pg_class
union
select oid from gp_dist_random('pg_class')) AS c
ON (fs.objid = c.oid)
WHERE c.oid IS NULL
) AS r
ON r.gp_segment_id = cfg.content
WHERE cfg.role = 'p';
"""
    try:
        for row in db.query(qry).dictresult():
            buildRemove(
                row['dbid'],                 # dbid of targeted segment
                'gp_fastsequence tuple',     # for comment purposes
                'gp_fastsequence',           # table name
                {'objid': row['objid']},     # column name and value
                'gp_fastsequence',           # for comment purposes
            )
    except Exception as e:
        logger.error('removeFastSequence: ' + str(e))
#############
def removeIndexConstraint(nspname, relname, constraint):
    """Queue an ALTER TABLE ... DROP CONSTRAINT ... CASCADE statement in
    GV.Constraints for the given constraint."""
    stmt = 'ALTER TABLE "%s"."%s" DROP CONSTRAINT "%s" CASCADE;' % (
        nspname, relname, constraint)
    GV.Constraints.append(stmt)
def buildRemove(seg, name, table, cols, objname):
    """Queue a DELETE statement for segment `seg` via addRemove().

    seg     -- dbid of the targeted segment
    name    -- description of what is removed (used in the SQL comment)
    table   -- table to delete from
    cols    -- mapping of column name -> value; the conditions are OR-ed
    objname -- object name (used in the SQL comment)

    The comment header is produced once, from the first column's value,
    which is formatted with %i and therefore must be numeric.
    """
    header = ''
    conditions = []
    for col in cols:
        if not header:
            # build the header only for the first column
            header = '--Object Name: %s\n--Remove %s for %i\n' % \
                     (objname, name, cols[col])
        conditions.append('%s = \'%s\'' % (col, cols[col]))
    stmt = header + 'DELETE FROM %s WHERE %s;' % (table, ' or '.join(conditions))
    addRemove(seg, stmt)
def addRemove(seg, line):
    """Append line to the removal statements queued for segment seg."""
    GV.Remove.setdefault(seg, []).append(line)
def buildAdjustConname(seg, relname, relid, oldconname, newconname):
    """Queue an UPDATE of pg_constraint that renames a constraint on seg.

    relname    -- text
    relid      -- oid as int
    oldconname -- text
    newconname -- text
    """
    pieces = [
        '--Constraint Name: %s on %s becomes %s\n' % (oldconname, relname, newconname),
        'UPDATE pg_constraint\n',
        'SET conname = %s\n' % newconname,
        'WHERE conrelid = %d and conname = %s;' % (relid, oldconname),
    ]
    addAdjustConname(seg, ''.join(pieces))
def addAdjustConname(seg, stmt):
    """Append stmt to the constraint-rename statements queued for seg."""
    GV.AdjustConname.setdefault(seg, []).append(stmt)
def addDemoteConstraint(seg, repair_sequence):
    """Append repair_sequence to the constraint demotions queued for seg."""
    GV.DemoteConstraint.setdefault(seg, []).append(repair_sequence)
#############
def drop_leaked_schemas(leaked_schema_dropper, dbname):
    """Ask leaked_schema_dropper to drop leaked temporary schemas in dbname
    and report what was dropped; the connection is always closed."""
    logger.info('-----------------------------------')
    logger.info('Checking for leaked temporary schemas')
    db_connection = connect(database=dbname)
    try:
        dropped_schemas = leaked_schema_dropper.drop_leaked_schemas(db_connection)
        if dropped_schemas:
            logger.info('[FAIL] temporary schemas')
            myprint("Found and dropped %d unbound temporary schemas" % len(dropped_schemas))
            logger.error('Dropped leaked schemas \'%s\' in the database \'%s\'' % (dropped_schemas, dbname))
        else:
            logger.info('[OK] temporary schemas')
    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint(' Execution error: ' + str(e))
    finally:
        db_connection.close()
def checkDepend():
    """Collect the pg_catalog tables that have an 'oid' column and run both
    dependency checks against them."""
    # Check for dependencies on non-existent objects
    logger.info('-----------------------------------')
    logger.info('Checking Object Dependencies')
    db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False)
    # Catalogs that link up to pg_depend/pg_shdepend
    qry = """
select relname from pg_class c
where relkind='r'
and relnamespace=%d
and exists (select 1 from pg_attribute a where attname = 'oid' and a.attrelid = c.oid)
""" % PG_CATALOG_OID
    catalogs = [row[0] for row in db.query(qry).getresult()]
    checkDependJoinCatalog(catalogs)
    checkCatalogJoinDepend(catalogs)
def checkDependJoinCatalog(catalogs):
    """Report pg_depend / pg_shdepend entries that reference an object no
    longer present in its catalog table ("extra" dependencies)."""
    # Construct subquery that will verify that all (classid, objid)
    # and (refclassid, refobjid) pairs existing in pg_depend actually
    # exist in that catalog table (classid::regclass or
    # refclassid::regclass)
    per_catalog = """
SELECT '{catalog}' as catalog, objid FROM (
SELECT objid FROM pg_depend
WHERE classid = '{catalog}'::regclass
UNION ALL
SELECT refobjid FROM pg_depend
WHERE refclassid = '{catalog}'::regclass
) d
LEFT OUTER JOIN {catalog} c on (d.objid = c.oid)
WHERE c.oid is NULL
UNION ALL
SELECT '{catalog}' as catalog, objid FROM (
SELECT dbid, objid FROM pg_shdepend
WHERE classid = '{catalog}'::regclass
UNION ALL
SELECT dbid, refobjid FROM pg_shdepend
WHERE refclassid = '{catalog}'::regclass
) d JOIN pg_database db
ON (d.dbid = db.oid and datname= current_database())
LEFT OUTER JOIN {catalog} c on (d.objid = c.oid)
WHERE c.oid is NULL
"""
    deps = [per_catalog.format(catalog=cat) for cat in catalogs]
    qry = """
SELECT distinct catalog, objid
FROM (%s
) q
""" % "UNION ALL".join(deps)
    try:
        err = connect2run(qry)
        if err:
            GV.checkStatus = False
            setError(ERROR_NOREPAIR)
            logger.info('[FAIL] extra object dependencies')
            logger.error(' found %d dependencies on dropped objects' % len(err))
            for config, column, row in err:
                # attach the issue to the object it concerns
                target = getGPObject(row['objid'], row['catalog'])
                target.addDependencyIssue(
                    CatDependencyIssue(row['catalog'], row['objid'], config['content']))
        else:
            logger.info('[OK] extra object dependencies')
    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint("[ERROR] executing test: extra object dependencies")
        myprint(" Execution error: " + str(e))
        myprint(qry)
def checkCatalogJoinDepend(catalogs):
    """Report catalog objects (OID >= FIRST_NORMAL_OID) that have no entry at
    all in pg_depend ("missing" dependencies)."""
    # Construct subqueries to aggregate all OIDs from catalog tables
    # not in the DEPENDENCY_EXCLUSION
    deps = []
    for cat in catalogs:
        if cat in DEPENDENCY_EXCLUSION:
            continue
        catalog_name = str(cat)
        subqry = """
SELECT '{catalog}' AS catalog, oid FROM {catalog} WHERE oid >= {oid}
""".format(catalog=cat, oid=FIRST_NORMAL_OID)
        # Exclude pg_proc entries in pg_catalog namespace. This is
        # mainly to avoid flagging language handler functions that
        # linger after the language is dropped. Extensions (which
        # do drop their proclang handler functions) will be
        # replacing language so this exclusion should be fine.
        if catalog_name == 'pg_proc':
            subqry += """AND pronamespace != {oid}
""".format(oid=PG_CATALOG_OID)
        # Exclude pg_am entries which depend on internal handlers.
        if catalog_name == 'pg_am':
            subqry = """
SELECT '{catalog}' AS catalog, pg_am.oid AS oid FROM {catalog} JOIN pg_proc ON pg_am.amhandler::text = pg_proc.proname WHERE pg_am.oid >= {oid} AND pg_proc.oid >= {oid}
""".format(catalog=cat, oid=FIRST_NORMAL_OID)
        deps.append(subqry)
    # Construct query that will check that each OID in our aggregated
    # catalog OIDs list has a reference in pg_depend under objid or
    # refobjid column
    qry = """
SELECT distinct catalog, c.oid as objid
FROM (
{subquery}
) c
LEFT OUTER JOIN
(
SELECT objid AS oid FROM pg_depend WHERE objid >= {oid}
UNION
SELECT refobjid AS oid FROM pg_depend WHERE refobjid >= {oid}
) d
ON (c.oid = d.oid)
WHERE d.oid IS NULL;
""".format(subquery="UNION ALL".join(deps), oid=FIRST_NORMAL_OID)
    try:
        err = connect2run(qry)
        if err:
            GV.checkStatus = False
            setError(ERROR_NOREPAIR)
            logger.info('[FAIL] missing object dependencies')
            logger.error(' found %d existing objects without dependencies' % len(err))
            for config, column, row in err:
                # attach the issue to the object it concerns
                target = getGPObject(row['objid'], row['catalog'])
                target.addDependencyIssue(
                    CatDependencyIssue(row['catalog'], row['objid'], config['content']))
        else:
            logger.info('[OK] missing object dependencies')
    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint("[ERROR] executing test: missing object dependencies")
        myprint(" Execution error: " + str(e))
        myprint(qry)
def fixupowners(nspname, relname, oldrole, newrole):
    """Queue the statements in GV.Owners that reset a table's owner."""
    # Must first alter the table to the wrong owner, then back to the right
    # owner. The purpose of this is that AT doesn't dispatch the change unless
    # it think the table actually changed owner.
    #
    # Note: this means that the Owners list must be run in the order given.
    #
    alter_owner = 'ALTER TABLE "%s"."%s" OWNER TO "%s";'
    GV.Owners.extend([
        '-- owner for "%s"."%s"' % (nspname, relname),
        alter_owner % (nspname, relname, oldrole),
        alter_owner % (nspname, relname, newrole),
    ])
def checkOwners():
    """Compare object owners between the coordinator and the segments for
    tables (pg_class) and types (pg_type), then emit the ownership repair.

    Table mismatches are repairable (ERROR_REMOVE, queued via fixupowners);
    type mismatches are only reported (ERROR_NOREPAIR).
    """
    logger.info('-----------------------------------')
    logger.info('Checking table ownership')
    # Check owners in pg_class
    #
    # - Report each table that has an inconsistency with the coordinator database,
    # this can be on the table directly, or on one of the table subobjects
    # (pg_toast, pg_aoseg, etc)
    #
    # - Theoretically this could result in multiple corrections for a given
    # table based on having multiple "wrong" owners. Realistically most
    # of the problems we have seen with this problem the wrong owner is
    # almost always the gpadmin user, so this is not expected. If it does
    # occur it won't be a problem, but we will issue more ALTER TABLE
    # commands than is strictly necessary.
    #
    # - Between 3.3 and 4.0 the ao segment columns migrated from pg_class
    # to pg_appendonly.
    db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False)
    qry = '''
select distinct n.nspname, coalesce(o.relname, c.relname) as relname,
a.rolname, m.rolname as coordinator_rolname
from gp_dist_random('pg_class') r
join pg_class c on (c.oid = r.oid)
left join pg_appendonly ao on (c.oid = ao.segrelid or
c.oid = ao.blkdirrelid or
c.oid = ao.blkdiridxid)
left join pg_class o on (o.oid = ao.relid or
o.reltoastrelid = c.oid)
join pg_authid a on (a.oid = r.relowner)
join pg_authid m on (m.oid = coalesce(o.relowner, c.relowner))
join pg_namespace n on (n.oid = coalesce(o.relnamespace, c.relnamespace))
where c.relowner <> r.relowner
'''
    try:
        rows = list(db.query(qry).dictresult())
        if rows:
            GV.checkStatus = False
            setError(ERROR_REMOVE)
            logger.info('[FAIL] table ownership')
            logger.error('found %d table ownership issue(s)' % len(rows))
            logger.error('%s' % qry)
            # log at most 100 mismatches, but queue a fix for every one
            for row in rows[:100]:
                logger.error(' %s.%s relowner %s != %s'
                             % (row['nspname'], row['relname'], row['rolname'],
                                row['coordinator_rolname']))
            if len(rows) > 100:
                logger.error("...")
            for row in rows:
                fixupowners(row['nspname'], row['relname'], row['rolname'],
                            row['coordinator_rolname'])
        else:
            logger.info('[OK] table ownership')
    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint("[ERROR] executing: check table ownership")
        myprint(" Execution error: " + str(e))
        myprint(qry)
    # TODO: add a check that subobject owners agree with the main object owner.
    # (The above checks only compare coordinator vs segment)
    # Check owners in pg_type
    # - Ignore implementation types of pg_class entries - they should be
    # in the check above since ALTER TABLE is required to fix them, not
    # ALTER TYPE.
    db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False)
    qry = '''
select distinct n.nspname, t.typname, a.rolname, m.rolname as coordinator_rolname
from gp_dist_random('pg_type') r
join pg_type t on (t.oid = r.oid)
join pg_namespace n on (n.oid = t.typnamespace)
join pg_authid a on (a.oid = r.typowner)
join pg_authid m on (m.oid = t.typowner)
where r.typowner <> t.typowner
'''
    try:
        rows = list(db.query(qry).dictresult())
        if rows:
            GV.checkStatus = False
            setError(ERROR_NOREPAIR)
            logger.info('[FAIL] type ownership')
            logger.error('found %d type ownership issue(s)' % len(rows))
            logger.error('%s' % qry)
            for row in rows[:100]:
                logger.error(' %s.%s typeowner %s != %s'
                             % (row['nspname'], row['typname'], row['rolname'],
                                row['coordinator_rolname']))
            if len(rows) > 100:
                logger.error("...")
        else:
            logger.info('[OK] type ownership')
    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint("[ERROR] executing test: check type ownership")
        myprint(" Execution error: " + str(e))
        myprint(qry)
    # FIXME: add repair script
    # NOTE: types with typrelid probably will be repaired by the
    # script generated by the table ownership check above.
    # TODO:
    # Check owners in pg_proc
    # Check owners in pg_database
    # Check owners in pg_tablespace
    # Check owners in pg_namespace
    # Check owners in pg_operator
    # Check owners in pg_opclass
    # ...
    checkOwnersRepair()
def closeDbs():
    """Close every connection cached in GV.db, then reset the cache to empty."""
    # Each cache value is indexable; its first element is the connection object.
    for cached in GV.db.values():
        cached[0].close()
    GV.db = {}  # remove everything
# -------------------------------------------------------------------------------
def getCatObj(namestr):
    """Return the catalog table object registered under ``namestr``.

    The lookup goes through ``GV.catalog.getCatalogTable``.  If it raises,
    a "No such catalog table" message is printed and the original exception
    is re-raised unchanged.
    """
    # The connection returned here is not used by this function; the call is
    # kept because the original performs it before the lookup (presumably so
    # that a coordinator connection exists -- TODO confirm).
    connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False)
    try:
        return GV.catalog.getCatalogTable(namestr)
    except Exception:
        myprint("No such catalog table: %s\n" % str(namestr))
        raise
# -------------------------------------------------------------------------------
def checkACL():
    """Run the per-table ACL consistency check on every known catalog table."""
    for banner in ('-----------------------------------',
                   'Performing cross database ACL tests'):
        logger.info(banner)

    # looks up information in the catalog:
    for catalog_table in GV.catalog.getCatalogTables():
        checkTableACL(catalog_table)
# -------------------------------------------------------------------------------
def checkTableACL(cat):
    """Compare the ACL column of one catalog table between coordinator and segments.

    ``cat`` is a catalog table object providing getTableName(),
    getPrimaryKey(), isCoordinatorOnly(), isShared(), getTableAcl() and
    tableHasConsistentOids().

    Mismatching rows are logged and handed to processACLResult(); the
    failure is recorded in GV.checkStatus / GV.aclStatus and via
    setError(ERROR_NOREPAIR).  Query errors are reported with myprint()
    and also mark the ACL check as failed.  Returns None.
    """
    catname = cat.getTableName()
    pkey = cat.getPrimaryKey()
    is_coordinator_only = cat.isCoordinatorOnly()
    is_shared = cat.isShared()
    acl = cat.getTableAcl()

    # First ACL check of the run: start from "passing".
    if GV.aclStatus is None:
        GV.aclStatus = True

    # Skip:
    #   - coordinator only tables
    #   - tables without a primary key
    #   - tables without acls
    if is_coordinator_only or pkey == [] or acl is None:
        return

    # skip shared/non-shared tables
    shared_option = GV.opt['-S']
    if shared_option:
        if is_shared and re.match("none", shared_option, re.I):
            return
        if not is_shared and re.match("only", shared_option, re.I):
            return

    # Comparing ACLs cannot be done with a simple equality comparison
    # since it is valid for the ACLs to have different order.  Instead
    # we compare that both acls are subsets of each other => equality.
    if cat.tableHasConsistentOids():
        selected_keys = ['m.oid']
    else:
        selected_keys = ['m.' + column for column in pkey]

    qry = """
SELECT s.gp_segment_id as segid, {mPkey},
m.{acl} as coordinator_acl, s.{acl} as segment_acl
FROM {catalog} m
JOIN gp_dist_random('{catalog}') s using ({primary_key})
WHERE not (m.{acl} @> s.{acl} and s.{acl} @> m.{acl}) or
(m.{acl} is null and s.{acl} is not null) or
(s.{acl} is null and m.{acl} is not null)
ORDER BY {primary_key}, s.gp_segment_id
""".format(catalog=catname,
           primary_key=', '.join(pkey),
           mPkey=', '.join(selected_keys),
           acl=acl)

    # Execute the query
    try:
        db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False)
        curs = db.query(qry)
        nrows = curs.ntuples()

        if nrows != 0:
            GV.checkStatus = False
            setError(ERROR_NOREPAIR)
            GV.aclStatus = False
            logger.info('[FAIL] Cross consistency acl check for ' + catname)
            logger.error(' %s acl check has %d issue(s)' % (catname, nrows))

            fields = curs.listfields()
            gplog.log_literal(logger, logging.ERROR, " " + " | ".join(fields))
            for row in curs.getresult():
                gplog.log_literal(logger, logging.ERROR, " " + " | ".join(map(str, row)))
            processACLResult(catname, fields, curs.getresult())
        else:
            logger.info('[OK] Cross consistency acl check for ' + catname)

    except Exception as e:
        setError(ERROR_NOREPAIR)
        GV.aclStatus = False
        myprint('[ERROR] executing: Cross consistency check for ' + catname)
        myprint(' Execution error: ' + str(e))
        myprint(qry)
# -------------------------------------------------------------------------------
def checkForeignKey(cat_tables=None):
    """Run the foreign key check over ``cat_tables`` (all catalog tables by default).

    Every issue tuple reported by ForeignKeyCheck.runCheck() is passed to
    processForeignKeyResult().  A gp_fastsequence -> pg_class issue is
    flagged ERROR_REMOVE and handed to removeFastSequence(); any other
    issue is flagged ERROR_NOREPAIR.  An exception marks the foreign key
    check as failed and is printed.  Returns None.
    """
    logger.info('-----------------------------------')
    logger.info('Performing foreign key tests')

    # First foreign key check of the run: start from "passing".
    if GV.foreignKeyStatus is None:
        GV.foreignKeyStatus = True

    # looks up information in the catalog:
    if not cat_tables:
        cat_tables = GV.catalog.getCatalogTables()

    db_connection = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False)
    try:
        checker = ForeignKeyCheck(db_connection, logger, GV.opt['-S'], autoCast)
        issues_by_catalog = checker.runCheck(cat_tables)
        if issues_by_catalog:
            GV.checkStatus = False
            for catname, issue_tuples in issues_by_catalog.items():
                for pkcatname, fields, results in issue_tuples:
                    processForeignKeyResult(catname, pkcatname, fields, results)
                    removable = (catname == 'gp_fastsequence' and pkcatname == 'pg_class')
                    if not removable:
                        setError(ERROR_NOREPAIR)
                        continue
                    setError(ERROR_REMOVE)
                    removeFastSequence(db_connection)
    except Exception as ex:
        setError(ERROR_NOREPAIR)
        GV.foreignKeyStatus = False
        myprint(' Execution error: ' + str(ex))
# -------------------------------------------------------------------------------
def checkMissingEntry():
    """Check all catalog tables for missing/extraneous rows and react to findings.

    When issues are found, either generate repairs for them (option '-E'
    set) or flag the run as ERROR_NOREPAIR.
    """
    logger.info('-----------------------------------')
    logger.info('Performing cross consistency tests: check for missing or extraneous issues')

    # First missing-entry check of the run: start from "passing".
    if GV.missingEntryStatus is None:
        GV.missingEntryStatus = True

    catalog_issues = _checkAllTablesForMissingEntries()
    if len(catalog_issues) == 0:
        return

    if not GV.opt['-E']:
        setError(ERROR_NOREPAIR)
        return
    do_repair_for_extra(catalog_issues)
def _checkAllTablesForMissingEntries():
    """Collect missing/extraneous-entry issues for every catalog table.

    Returns a dict keyed by ``(catalog_table_obj, pk_name)`` whose values are
    the (truthy) results of checkTableMissingEntry() for that table.  Tables
    without issues are left out.
    """
    issues_by_table = {}
    for catalog_table_obj in GV.catalog.getCatalogTables():
        issues = checkTableMissingEntry(catalog_table_obj)
        if issues:
            catalog_name = catalog_table_obj.getTableName()
            # We do this check in this function rather than getPrimaryKeyColumn as
            # it is used in checkACL and we do not want to modify that functionality
            _, pk_name = getPrimaryKeyColumn(catalog_name, catalog_table_obj.getPrimaryKey())
            issues_by_table[(catalog_table_obj, pk_name)] = issues
    return issues_by_table
# -------------------------------------------------------------------------------
def checkTableMissingEntry(cat):
    """Look for rows of one catalog table that are missing on some instance.

    ``cat`` is a catalog table object.  Coordinator-only tables,
    gp_segment_configuration, gp_matview_aux, gp_matview_tables, tables
    excluded by the '-S' option and tables without a primary key are
    skipped (None is returned).

    Returns the (filtered) result rows of the query, or None when the table
    was skipped or the query raised.  Findings for pg_constraint are logged
    as warnings only; for any other table they mark the check as failed.
    """
    catname = cat.getTableName()
    pkey = cat.getPrimaryKey()
    is_coordinator_only = cat.isCoordinatorOnly()
    is_shared = cat.isShared()
    coltypes = cat.getTableColtypes()

    # Skip coordinator only tables
    if is_coordinator_only:
        return

    # Skip gp_segment_configuration, gp_matview_aux and gp_matview_tables
    if catname in ("gp_segment_configuration", "gp_matview_aux", "gp_matview_tables"):
        return

    # skip shared/non-shared tables
    shared_option = GV.opt['-S']
    if shared_option:
        if is_shared and re.match("none", shared_option, re.I):
            return
        if not is_shared and re.match("only", shared_option, re.I):
            return

    # Skip catalogs without primary key
    if len(pkey) == 0:
        logger.warning("[WARN] Skipped missing/extra entry check for %s" % catname)
        return

    castedPkey = [c + autoCast.get(coltypes[c], '') for c in cat.getPrimaryKey()]

    if cat.tableHasConsistentOids():
        qry = missingEntryQuery(GV.max_content, catname, ['oid'], ['oid'])
    else:
        qry = missingEntryQuery(GV.max_content, catname, pkey, castedPkey)

    # Execute the query
    try:
        db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False)
        curs = db.query(qry)
        nrows = curs.ntuples()
        results = curs.getresult()
        fields = curs.listfields()

        # Only non-empty results need to go through the spurious-failure filter.
        if nrows != 0:
            results = filterSpuriousFailures(catname, fields, results)
            nrows = len(results)

        if nrows == 0:
            logger.info('[OK] Checking for missing or extraneous entries for ' + catname)
            return results

        if catname in ['pg_constraint']:
            # pg_constraint findings are reported but do not fail the check.
            log_level = logging.WARNING
            logger_with_level = logger.warning
        else:
            GV.checkStatus = False
            GV.missingEntryStatus = False
            log_level = logging.ERROR
            logger_with_level = logger.error

        logger.info(('[%s] Checking for missing or extraneous entries for ' + catname) %
                    ('WARNING' if log_level == logging.WARNING else 'FAIL'))
        logger_with_level(' %s has %d issue(s)' % (catname, nrows))

        gplog.log_literal(logger, log_level, " " + " | ".join(fields))
        for row in results:
            gplog.log_literal(logger, log_level, " " + " | ".join(map(str, row)))
        processMissingDuplicateEntryResult(catname, fields, results, "missing")
        if catname == 'pg_type':
            generateVerifyFile(catname, fields, results, 'missing_extraneous')

        return results

    except Exception as e:
        setError(ERROR_NOREPAIR)
        GV.missingEntryStatus = False
        myprint('[ERROR] executing: Missing or extraneous entries check for ' + catname)
        myprint(' Execution error: ' + str(e))
        myprint(qry)
class checkAOSegVpinfoThread(execThread):
    """Worker thread that validates aoseg ``vpinfo`` for 'ao_column' tables.

    For every table returned by the aoseg query it checks, on the instance
    behind ``self.db``, that all eligible aoseg rows share one vpinfo length
    and that the attribute count derived from that length equals the
    ``relnatts`` value selected for the table.

    Any exception raised while running is stored in ``self.error`` and
    marks ``GV.checkStatus`` as failed.
    """

    def __init__(self, cfg, db):
        """Store the instance configuration ``cfg`` and its connection ``db``."""
        execThread.__init__(self, cfg, db, None)

    def run(self):
        """Run the vpinfo consistency check for every 'ao_column' table."""
        aoseg_query = """
SELECT a.relname, a.relid, a.segrelid, cl.relname, a.relnatts
FROM (SELECT p.relid, p.segrelid, c.relname, c.relnatts FROM pg_appendonly p LEFT JOIN pg_class c ON p.relid = c.oid LEFT JOIN pg_am a ON (a.oid = c.relam)
WHERE a.amname = 'ao_column') a
LEFT JOIN pg_class cl ON a.segrelid = cl.oid;
"""
        try:
            # Read the list of aoseg tables from the database
            curs = self.db.query(aoseg_query)

            for relname, relid, segrelid, segrelname, attr_count in curs.getresult():
                # We check vpinfo consistency only for segs that are in state
                # AOSEG_STATE_DEFAULT and which are not RESERVED_SEGNO.
                # RESERVED_SEGNO can have a different number of attributes than
                # the other segments. For eg. if we ALTER TABLE ADD COLUMN in a
                # transaction, and then insert rows in the same transaction, and
                # then abort, the RESERVED_SEGNO will hold the inserted rows.
                # The vpinfo for RESERVED_SEGNO will have more columns than
                # relnatts in that case.
                qry = "SELECT distinct(length(vpinfo)) FROM pg_aoseg.%s where state = 1 and segno <> 0;" % (segrelname)
                vpinfo_curs = self.db.query(qry)
                nrows = vpinfo_curs.ntuples()
                if nrows == 0:
                    continue
                elif nrows > 1:
                    # More than one distinct length: the rows disagree with each other.
                    GV.checkStatus = False
                    setError(ERROR_NOREPAIR)
                    logger.info('[FAIL] inconsistent vpinfo')
                    logger.error("found {nrows} vpinfo(s) with different length in 'pg_aoseg.{segrelname}' of table '{relname}' on segment {content}"
                                 .format(nrows = nrows,
                                         segrelname = segrelname,
                                         relname = relname,
                                         content = self.cfg['content']))
                    logger.error(qry)
                    continue

                vpinfo_length = vpinfo_curs.getresult()[0][0]

                # vpinfo is bytea type, the length of the first 3 fields is 12 bytes, and the size of AOCSVPInfoEntry is 16
                # typedef struct AOCSVPInfo
                # {
                #   int32 _len;
                #   int32 version;
                #   int32 nEntry;
                #
                #   AOCSVPInfoEntry entry[1];
                # } AOCSVPInfo;
                #
                # Use an integer count when the payload is a whole number of
                # entries so the failure message reads "3 attributes" rather
                # than "3.0 attributes".  A payload that is not a multiple of
                # 16 keeps its fractional value, which can never equal the
                # integer attr_count and is therefore still reported.
                entry_count, leftover = divmod(vpinfo_length - 12, 16)
                if leftover == 0:
                    vpinfo_attr_count = entry_count
                else:
                    vpinfo_attr_count = (vpinfo_length - 12) / 16

                if vpinfo_attr_count != attr_count:
                    GV.checkStatus = False
                    setError(ERROR_NOREPAIR)
                    logger.info('[FAIL] inconsistent vpinfo')
                    logger.error("vpinfo in 'pg_aoseg.{segrelname}' of table '{relname}' contains {vpinfo_attr_count} attributes, while pg_class has {attr_count} attributes on segment {content}"
                                 .format(segrelname = segrelname,
                                         relname = relname,
                                         vpinfo_attr_count = vpinfo_attr_count,
                                         attr_count = attr_count,
                                         content = self.cfg['content']))
                    logger.error(qry)
        except Exception as e:
            GV.checkStatus = False
            self.error = e
def checkAOSegVpinfo():
    """Run checkAOSegVpinfoThread once per entry of GV.cfg, in batches.

    One thread is started per dbid.  After every GV.opt['-B'] launches the
    pending threads are handed to processThread(); whatever remains at the
    end is handed over in a final call.
    """
    pending = []

    # parallelise check
    for launched, dbid in enumerate(GV.cfg, start=1):
        cfg = GV.cfg[dbid]
        db_connection = connect2(cfg)
        worker = checkAOSegVpinfoThread(cfg, db_connection)
        worker.start()
        logger.debug('launching check thread %s for dbid %i' %
                     (worker.name, dbid))
        pending.append(worker)

        # Batch is full: process it before launching more threads.
        if (launched % GV.opt['-B']) == 0:
            processThread(pending)
            pending = []

    processThread(pending)
# -------------------------------------------------------------------------------
# Exclude these tuples from the catalog table scan
# pg_depend: classid 2603 (pg_amproc) is excluded because their OIDs can be
# nonsynchronous (catalog.c:RelationNeedsSynchronizedOIDs())
MISSING_ENTRY_EXCLUDE = {'pg_depend': 'WHERE classid != 2603'}


def missingEntryQuery(max_content, catname, pkey, castedPkey):
    """Build the SQL that finds rows missing from the coordinator or a segment.

    ``catname`` is the catalog table name and ``pkey`` the list of key column
    names used for grouping and joining.  ``max_content`` and ``castedPkey``
    are accepted but not used when building the query text.

    Returns the query string.
    """
    # =================
    # Missing / Extra
    # =================
    #
    # (Fetch all the entries from segments. For each entry, collect the
    # segment IDs of all the segments where the entry is present, in an array)
    #
    # Full outer join
    #
    # (Fetch all entries in coordinator)
    #
    #
    # The WHERE clause at the bottom filters out all the boring rows, leaving
    # only rows that are missing from one of the segments, or from the coordinator.
    sql_template = """
SELECT {primary_key},
case when coordinator is null then segids
when segids is null then array[coordinator.segid]
else coordinator.segid || segids end as segids
FROM
(
SELECT {primary_key}, array_agg(gp_segment_id order by gp_segment_id) as segids
FROM gp_dist_random('{catalog}') {catalog_exclude} GROUP BY {primary_key}
) as seg
FULL OUTER JOIN
(
SELECT gp_segment_id as segid, {primary_key} FROM {catalog} {catalog_exclude}
) as coordinator
USING ({primary_key})
WHERE coordinator.segid is null
OR segids is null
OR NOT segids @> (select array_agg(content::int4) from gp_segment_configuration WHERE content >= 0)
"""
    key_list = ','.join(pkey)
    # Optional per-catalog WHERE clause; empty for catalogs without an exclusion.
    exclusion_clause = MISSING_ENTRY_EXCLUDE.get(catname, "")
    return sql_template.format(primary_key=key_list,
                               catalog=catname,
                               catalog_exclude=exclusion_clause)
# -------------------------------------------------------------------------------
def checkInconsistentEntry():
    """Run the inconsistent-entry check on every known catalog table."""
    logger.info('-----------------------------------')
    logger.info('Performing cross consistency test: check for inconsistent entries')

    # First inconsistent-entry check of the run: start from "passing".
    if GV.inconsistentEntryStatus is None:
        GV.inconsistentEntryStatus = True

    # looks up information in the catalog:
    for catalog_table in GV.catalog.getCatalogTables():
        checkTableInconsistentEntry(catalog_table)
# -------------------------------------------------------------------------------
def checkTableInconsistentEntry(cat):
    """Check one catalog table for rows whose contents differ between instances.

    ``cat`` is a catalog table object.  Coordinator-only tables,
    gp_segment_configuration, pg_appendonly, gp_matview_aux,
    gp_matview_tables, tables excluded by the '-S' option and tables
    without a primary key are skipped.

    Findings are logged, passed to processInconsistentEntryResult() and
    recorded in GV.checkStatus / GV.inconsistentEntryStatus and via
    setError(ERROR_NOREPAIR).  Query errors are printed and also mark the
    check as failed.  Returns None.
    """
    catname = cat.getTableName()
    pkey = cat.getPrimaryKey()
    is_coordinator_only = cat.isCoordinatorOnly()
    is_shared = cat.isShared()
    columns = cat.getTableColumns(with_acl=False)
    coltypes = cat.getTableColtypes()

    # Skip coordinator only tables
    if is_coordinator_only:
        return

    # Skip gp_segment_configuration, pg_appendonly, gp_matview_aux and
    # gp_matview_tables
    if catname in ("gp_segment_configuration", "pg_appendonly",
                   "gp_matview_aux", "gp_matview_tables"):
        return

    # skip shared/non-shared tables
    shared_option = GV.opt['-S']
    if shared_option:
        if is_shared and re.match("none", shared_option, re.I):
            return
        if not is_shared and re.match("only", shared_option, re.I):
            return

    # Skip catalogs without primary key
    if pkey == []:
        logger.warning("[WARN] Skipped cross consistency check for %s" %
                       catname)
        return

    castedPkey = [c + autoCast.get(coltypes[c], '') for c in cat.getPrimaryKey()]
    castcols = [c + autoCast.get(coltypes[c], '') for c in columns]

    # pg_attribute.attmissingval has pseudo-type 'any', which doesn't allow
    # much operations on it. Cast it to text, by replacing it with
    # "attmissingval::text" in the columns list.
    # pg_dependencies and pg_mcv_list disallow inputs, convert to text for the query
    for colname in ("attmissingval", "stxddependencies", "stxdmcv"):
        as_text = colname + "::text"
        columns = [c.replace(colname, as_text) for c in columns]
        castcols = [c.replace(colname, as_text) for c in castcols]

    # Columns left out of the comparison for specific catalogs.
    # fixme: we should support the rolpassword column when the password is encrypted with SCRAM-SHA-256
    ignored_columns = {
        "pg_authid": ("rolpassword", "rolpasswordsetat",
                      "rollockdate", "rolpasswordexpire"),
        "pg_password_history": ("passhistpasswordsetat",),
        "pg_directory_table": ("dtlocation",),
    }
    for colname in ignored_columns.get(catname, ()):
        columns.remove(colname)
        castcols.remove(colname)

    if cat.tableHasConsistentOids():
        key_columns = ['oid']
    else:
        key_columns = castedPkey
    qry = inconsistentEntryQuery(GV.max_content, catname, key_columns, columns, castcols)

    # Execute the query
    try:
        db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False)
        curs = db.query(qry)
        nrows = curs.ntuples()

        if nrows != 0:
            GV.checkStatus = False
            setError(ERROR_NOREPAIR)
            GV.inconsistentEntryStatus = False
            logger.info('[FAIL] Checking for inconsistent entries for ' + catname)
            logger.error(' %s has %d issue(s)' % (catname, nrows))

            fields = curs.listfields()
            gplog.log_literal(logger, logging.ERROR, " " + " | ".join(fields))
            for row in curs.getresult():
                gplog.log_literal(logger, logging.ERROR, " " + " | ".join(map(str, row)))
            results = curs.getresult()
            processInconsistentEntryResult(catname, pkey, fields, results)
            if catname == 'pg_type':
                # NOTE(review): the label passed here is 'duplicate' although
                # this is the inconsistent-entry check -- confirm intended.
                generateVerifyFile(catname, fields, results, 'duplicate')
        else:
            logger.info('[OK] Checking for inconsistent entries for ' + catname)

    except Exception as e:
        setError(ERROR_NOREPAIR)
        GV.inconsistentEntryStatus = False
        myprint('[ERROR] executing: Inconsistent entries check for ' + catname)
        myprint(' Execution error: ' + str(e))
        myprint(qry)
# -------------------------------------------------------------------------------
def inconsistentEntryQuery(max_content, catname, pkey, columns, castcols):
    """Build the SQL script that reports rows differing between instances.

    ``catname`` is the catalog table; ``pkey``, ``columns`` and ``castcols``
    are lists of SQL column expressions that are comma-joined into the
    query; ``max_content`` is substituted into the row-count thresholds.

    Returns the query string (several statements separated by ';').
    """
    # -- ==============
    # -- Inconsistent
    # -- ==============
    # -- Build set of all values in the instance
    # --
    # -- Count number of unique values for the primary key
    # --   - Ignore rows where this does not equal number of segments
    # --     (These are the missing/extra rows and are covered by other checks)
    # --
    # -- Count number of unique values for all columns
    # --   - Ignore rows where this equals number of segments
    # --
    # -- Group the majority opinion into one bucket and report everyone who disagrees with
    # -- the majority individually.
    # --
    # -- Majority gets reported with gp_segment_id == null
    sql_template = """
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
-- distribute catalog table from coordinator, so that we can avoid to gather
CREATE TEMPORARY TABLE _tmp_coordinator ON COMMIT DROP AS
SELECT gp_segment_id AS segid, {columns} FROM {catalog} DISTRIBUTED BY (segid);
SELECT {columns}, array_agg(gp_segment_id order by gp_segment_id) as segids
FROM
(
SELECT DISTINCT
case when dcount <= ({max_content}+2)/2.0 then gp_segment_id else null end as gp_segment_id, {columns}
FROM
(
select count(*) over (partition by {columns}) as dcount,
count(*) over (partition by {pkey}) as pcount,
gp_segment_id, {columns}
from (
select segid gp_segment_id, {castcols} from _tmp_coordinator
union all
select gp_segment_id, {castcols}
from gp_dist_random('{catalog}')
) all_segments
) counted_segments
WHERE dcount != {max_content}+2 and pcount = {max_content}+2
ORDER BY {pkey}, gp_segment_id
) rowresult
GROUP BY {columns}
ORDER BY {pkey}, segids;
"""
    substitutions = {
        'pkey': ','.join(pkey),
        'catalog': catname,
        'columns': ','.join(columns),
        'castcols': ','.join(castcols),
        'max_content': max_content,
    }
    return sql_template.format(**substitutions)
# -------------------------------------------------------------------------------
def checkDuplicateEntry():
    """Run the duplicate-entry check on every catalog table except pg_depend."""
    logger.info('-----------------------------------')
    logger.info('Performing test: checking for duplicate entries')

    # looks up information in the catalog:
    for catalog_table in GV.catalog.getCatalogTables():
        ## pg_depend does not care about duplicates at the moment
        if str(catalog_table) != 'pg_depend':
            checkTableDuplicateEntry(catalog_table)
# -------------------------------------------------------------------------------
def checkTableDuplicateEntry(cat):
    """Check one catalog table for duplicated key values on any instance.

    ``cat`` is a catalog table object.  Coordinator-only tables, tables
    excluded by the '-S' option and tables without a primary key are
    skipped.

    Findings are logged, passed to processMissingDuplicateEntryResult() and
    recorded in GV.checkStatus and via setError(ERROR_NOREPAIR).  Query
    errors are printed.  Returns None.
    """
    catname = cat.getTableName()
    pkey = cat.getPrimaryKey()
    is_coordinator_only = cat.isCoordinatorOnly()
    is_shared = cat.isShared()
    # The column list is not used by this check; the call is retained from
    # the original.
    cat.getTableColumns(with_acl=False)
    coltypes = cat.getTableColtypes()

    # Skip coordinator only tables
    if is_coordinator_only:
        return

    # skip shared/non-shared tables
    shared_option = GV.opt['-S']
    if shared_option:
        if is_shared and re.match("none", shared_option, re.I):
            return
        if not is_shared and re.match("only", shared_option, re.I):
            return

    # Skip catalogs without primary key
    if pkey == []:
        logger.warning("[WARN] Skipped duplicate check for %s" %
                       catname)
        return

    casted_keys = [c + autoCast.get(coltypes[c], '') for c in pkey]
    key_columns = ['oid'] if cat.tableHasConsistentOids() else casted_keys
    qry = duplicateEntryQuery(catname, key_columns)

    # Execute the query
    try:
        db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False)
        curs = db.query(qry)
        nrows = curs.ntuples()

        if nrows != 0:
            GV.checkStatus = False
            setError(ERROR_NOREPAIR)
            logger.error('[FAIL] Checking for duplicate entries for ' + catname)
            logger.error(' %s has %d issue(s)' % (catname, nrows))

            fields = curs.listfields()
            gplog.log_literal(logger, logging.ERROR, " " + " | ".join(fields))
            results = curs.getresult()
            for row in results:
                gplog.log_literal(logger, logging.ERROR, " " + " | ".join(map(str, row)))
            processMissingDuplicateEntryResult(catname, fields, results, "duplicate")
            if catname == 'pg_type':
                generateVerifyFile(catname, fields, results, 'duplicate')
        else:
            logger.info('[OK] Checking for duplicate entries for ' + catname)

    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint('[ERROR] executing: duplicate entries check for ' + catname)
        myprint(' Execution error: ' + str(e))
        myprint(qry)
# -------------------------------------------------------------------------------
def duplicateEntryQuery(catname, pkey):
    """Build the SQL script that reports key values duplicated within an instance.

    ``catname`` is the catalog table and ``pkey`` the list of key column
    expressions, comma-joined into the query.  Returns the query string
    (several statements separated by ';').
    """
    # -- ===========
    # -- Duplicate
    # -- ===========
    # -- Return any rows having count > 1 for a given segid, {unique_key} pair
    # --
    sql_template = """
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
-- distribute catalog table from coordinator, so that we can avoid to gather
CREATE TEMPORARY TABLE _tmp_coordinator ON COMMIT DROP AS
SELECT gp_segment_id AS segid, {pkey} FROM {catalog} DISTRIBUTED BY (segid);
SELECT {pkey}, total, array_agg(segid order by segid) as segids
FROM (
SELECT segid, {pkey}, count(*) as total
FROM (
select segid, {pkey} FROM _tmp_coordinator
union all
select gp_segment_id as segid, {pkey} FROM gp_dist_random('{catalog}')
) all_segments
GROUP BY segid, {pkey}
HAVING count(*) > 1
) rowresult
GROUP BY {pkey}, total
"""
    key_list = ','.join(pkey)
    return sql_template.format(catalog=catname, pkey=key_list)
# -------------------------------------------------------------------------------
def checkUniqueIndexViolation():
    """Run UniqueIndexViolationCheck and log its outcome.

    Any reported violations mark the check as failed
    (GV.checkStatus, setError(ERROR_NOREPAIR)) and are logged in detail.
    """
    logger.info('-----------------------------------')
    logger.info('Performing check: checking for violated unique indexes')
    db_connection = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False)

    violations = UniqueIndexViolationCheck().runCheck(db_connection)

    checkname = 'unique index violation(s)'
    if not violations:
        logger.info('[OK] %s' % checkname)
        return

    logger.info('[FAIL] %s' % checkname)
    GV.checkStatus = False
    setError(ERROR_NOREPAIR)
    log_unique_index_violations(violations)
def log_unique_index_violations(violations):
    """Record and log unique index violations.

    Each element of ``violations`` is read by the keys 'table_oid',
    'table_name', 'index_name', 'column_names' and 'violated_segments'.
    For every violation a CatUniqueIndexViolationIssue is attached to the
    corresponding GP object, and one table line per violated segment is
    added to a report that is logged as a single error message.
    """
    report_lines = [
        '\n segment_id | index_name | table_name | column_names',
        ' ---------------------------------------------------',
    ]
    for violation in violations:
        table_name = violation['table_name']
        index_name = violation['index_name']

        error_object = getGPObject(violation['table_oid'], table_name)
        error_object.addDuplicateIssue(
            CatUniqueIndexViolationIssue(table_name, index_name))

        for seg_id in violation['violated_segments']:
            report_lines.append(' %s | %s | %s | %s' % (
                seg_id,
                violation['index_name'],
                violation['table_name'],
                violation['column_names'],
            ))

    logger.error('\n'.join(report_lines))
# -------------------------------------------------------------------------------
def checkOrphanedToastTables():
    """Run OrphanedToastTablesCheck, report findings and generate repairs.

    On failure the check is flagged ERROR_REMOVE, the raw rows are logged
    grouped by issue type, the checker's fix text is logged, every issue is
    attached to its GP object and per-segment repair scripts are generated
    through do_repair_for_segments().
    """
    logger.info('-----------------------------------')
    logger.info('Performing check: checking for orphaned TOAST tables')

    db_connection = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False)
    checker = OrphanedToastTablesCheck()
    check_passed = checker.runCheck(db_connection)

    checkname = 'orphaned toast table(s)'
    if check_passed:
        logger.info('[OK] %s' % checkname)
        return

    logger.info('[FAIL] %s' % checkname)
    GV.checkStatus = False
    setError(ERROR_REMOVE)

    # log raw orphan toast table query results only to the log file
    row_columns = ('content_id',
                   'toast_table_oid',
                   'toast_table_name',
                   'expected_table_oid',
                   'expected_table_name',
                   'dependent_table_oid',
                   'dependent_table_name')
    report_lines = ['Orphan toast tables detected for the following scenarios:']
    for issue_type in checker.issue_types():
        report_lines.append('\n' + issue_type.header)
        report_lines.append('content_id | toast_table_oid | toast_table_name | expected_table_oid | expected_table_name | dependent_table_oid | dependent_table_name')
        report_lines.append('-----------+-----------------+------------------+--------------------+---------------------+---------------------+---------------------')
        for row in checker.rows_of_type(issue_type):
            report_lines.append('{} | {} | {} | {} | {} | {} | {}'.format(
                *[row[column] for column in row_columns]))
    logger.error('\n'.join(report_lines) + '\n')

    # log fix instructions for the orphaned toast tables to stdout and log file
    gplog.log_literal(logger, logging.CRITICAL, checker.get_fix_text())

    # log per-orphan table issue and cause to stdout and log file
    for issue, segments in checker.iterate_issues():
        table = issue.table
        cat_issue_obj = CatOrphanToastTableIssue(table.oid,
                                                 table.catname,
                                                 issue,
                                                 segments)
        error_object = getGPObject(issue.table.oid, issue.table.catname)
        error_object.addOrphanToastTableIssue(cat_issue_obj)

    repair_statements = checker.add_repair_statements(GV.cfg)
    do_repair_for_segments(segments_to_repair_statements=repair_statements,
                           issue_type="orphaned_toast_tables",
                           description='Repairing orphaned TOAST tables')
############################################################################
# Helpers that populate the repair part for all checked types.
# All functions depend on results stored in the global variable GV.
#
# Maps a repair name to a dict with:
#   "description" : human-readable summary of the repair step
#   "fn"          : callable taking (maybeRemove, catmod_guc, seg); the
#                   lambda looks up checkSegmentRepair only when called.
############################################################################
name_repair = {
    "segment":
        {
            "description": "Check if needs repairing segment",
            "fn": lambda maybeRemove, catmod_guc, seg: checkSegmentRepair(maybeRemove, catmod_guc, seg)
        },
}
############################################################################
# Registry of every gpcheckcat test, keyed by test name.  Each entry has:
#
# description   : text shown when listing the tests
# fn            : zero-argument callable that performs the check
# version = X.X : run this check as part of running all checks on gpdb <= X.X
#                 individually, any check should be ok run on any version
# online = True : OK to run gpcheckcat online
# order = X     : the order check should be run when running all checks
#                 (no entry currently uses order 14)
# defaultSkip   : optional; when True the check is skipped by runAllChecks
#                 unless the '-R' option was given
#
# Most "fn" values are lambdas, so the check function's name is looked up
# only when the check is actually run.
############################################################################
all_checks = {
    "unique_index_violation":
        {
            "description": "Check for violated unique indexes",
            "fn": lambda: checkUniqueIndexViolation(),
            "version": "main",
            "order": 1,
            "online": True
        },
    "duplicate":
        {
            "description": "Check for duplicate entries",
            "fn": lambda: checkDuplicateEntry(),
            "version": 'main',
            "order": 2,
            "online": True
        },
    "missing_extraneous":
        {
            "description": "Cross consistency check for missing or extraneous entries",
            "fn": lambda: checkMissingEntry(),
            "version": 'main',
            "order": 3,
            "online": True
        },
    "inconsistent":
        {
            "description": "Cross consistency check for coordinator segment inconsistency",
            "fn": lambda: checkInconsistentEntry(),
            "version": 'main',
            "order": 4,
            "online": True
        },
    "foreign_key":
        {
            "description": "Check foreign keys",
            "fn": lambda: checkForeignKey(),
            "version": 'main',
            "order": 5,
            "online": True
        },
    "acl":
        {
            "description": "Cross consistency check for access control privileges",
            "fn": lambda: checkACL(),
            "version": 'main',
            "order": 6,
            "online": True,
            "defaultSkip": True
        },
    "pgclass":
        {
            "description": "Check pg_class entry that does not have any correspond pg_attribute entry",
            "fn": lambda: checkPGClass(),
            "version": 'main',
            "order": 7,
            "online": False
        },
    "namespace":
        {
            "description": "Check for schemas with a missing schema definition",
            "fn": lambda: checkPGNamespace(),
            "version": 'main',
            "order": 8,
            "online": False
        },
    "distribution_policy":
        {
            "description": "Check constraints on randomly distributed tables",
            "fn": lambda: checkDistribPolicy(),
            "version": 'main',
            "order": 9,
            "online": False
        },
    "dependency":
        {
            "description": "Check for dependency on non-existent objects",
            "fn": lambda: checkDepend(),
            "version": 'main',
            "order": 10,
            "online": False
        },
    "owner":
        {
            "description": "Check table ownership that is inconsistent with the coordinator database",
            "fn": lambda: checkOwners(),
            "version": 'main',
            "order": 11,
            "online": True,
            "defaultSkip": True
        },
    "part_integrity":
        {
            "description": "Check pg_partition branch integrity, partition with oids, partition distribution policy",
            "fn": lambda: checkPartitionIntegrity(),
            "version": 'main',
            "order": 12,
            "online": True
        },
    "orphaned_toast_tables":
        {
            "description": "Check pg_class and pg_depend for orphaned TOAST tables",
            "fn": checkOrphanedToastTables,
            "version": 'main',
            "order": 13,
            "online": True
        },
    "aoseg_table":
        {
            "description": "Check that vpinfo in aoseg table is consistent with pg_attribute",
            "fn": lambda: checkAOSegVpinfo(),
            "version": 'main',
            "order": 15,
            "online": False
        }
}
#-------------------------------------------------------------------------------
def listAllChecks():
    """Print the name and description of every registered test, in run order."""
    myprint('\nList of gpcheckcat tests:\n')
    ordered_names = sorted(all_checks, key=lambda check_name: all_checks[check_name]["order"])
    for name in ordered_names:
        description = all_checks[name]["description"]
        myprint(" %24s: %s" % (name, description))
    myprint('')
#-------------------------------------------------------------------------------
def runCheckCatname(catalog_table_obj):
    """Run every per-table check against a single catalog table.

    The checks run in alphabetical order of their names.  For each one the
    elapsed time is added to GV.elapsedTime, GV.totalCheckRun is
    incremented and the runtime is printed.
    """
    checks = {'missing_extraneous': lambda: checkTableMissingEntry(catalog_table_obj),
              'inconsistent': lambda: checkTableInconsistentEntry(catalog_table_obj),
              # We assume that everything is passed as a list to checkForeignKey
              'foreign_key': lambda: checkForeignKey([catalog_table_obj]),
              'duplicate': lambda: checkTableDuplicateEntry(catalog_table_obj),
              'acl': lambda: checkTableACL(catalog_table_obj)}

    for check in sorted(checks):
        myprint("Performing test '%s' for %s" % (check, catalog_table_obj.getTableName()))
        stime = time.time()
        checks[check]()
        etime = time.time()
        elapsed = etime - stime
        GV.elapsedTime += elapsed
        # str(timedelta) only carries a ".UUUUUU" suffix when the microsecond
        # part is non-zero.  Slicing off four characters unconditionally would
        # turn "0:00:01" into "0:0", so the centisecond suffix is added
        # explicitly in that case.
        elapsed_text = str(datetime.timedelta(seconds=elapsed))
        if '.' in elapsed_text:
            elapsed_text = elapsed_text[:-4]
        else:
            elapsed_text += '.00'
        GV.totalCheckRun += 1
        myprint("Total runtime for test '%s': %s" % (check, elapsed_text))
#-------------------------------------------------------------------------------
def runOneCheck(name):
    """Run the registered check ``name`` and record its outcome.

    In online mode (option '-O') a check whose "online" flag is false is
    skipped with a log message.  Otherwise the check function is called,
    its runtime is added to GV.elapsedTime and printed, and ``name`` is
    appended to GV.failedChecks if the check left GV.checkStatus False.
    """
    if GV.opt['-O'] and not all_checks[name]["online"]:
        logger.info("%s: Skip this test in online mode" % (name))
        return

    myprint("Performing test '%s'" % name)

    GV.totalCheckRun += 1
    # Each check starts from "passing"; the check itself flips this on failure.
    GV.checkStatus = True
    stime = time.time()
    all_checks[name]["fn"]()
    etime = time.time()
    elapsed = etime - stime
    GV.elapsedTime += elapsed
    # str(timedelta) only carries a ".UUUUUU" suffix when the microsecond
    # part is non-zero.  Slicing off four characters unconditionally would
    # turn "0:00:01" into "0:0", so the centisecond suffix is added
    # explicitly in that case.
    elapsed_text = str(datetime.timedelta(seconds=elapsed))
    if '.' in elapsed_text:
        elapsed_text = elapsed_text[:-4]
    else:
        elapsed_text += '.00'
    myprint("Total runtime for test '%s': %s" % (name, elapsed_text))
    if GV.checkStatus == False:
        GV.failedChecks.append(name)
#-------------------------------------------------------------------------------
def runAllChecks(run_tests):
    '''
    perform catalog check for specified database

    Runs, in "order" sequence, every registered check that is listed in
    run_tests and whose "version" compares >= GV.version.  Checks marked
    "defaultSkip" only run when the '-R' option was given.  Afterwards all
    cached connections are closed and, if option '-g' is set and fixes were
    collected in GV, a repair script is generated.
    '''
    ordered_names = sorted(all_checks, key=lambda check_name: all_checks[check_name]["order"])
    for name in ordered_names:
        check = all_checks[name]
        if not (check["version"] >= GV.version and name in run_tests):
            continue
        if (not GV.opt['-R'] and "defaultSkip" in check and check["defaultSkip"]):
            logger.debug("Default skipping test: "+name)
        else:
            runOneCheck(name)

    closeDbs()
    logger.info("------------------------------------")

    fixes = (len(GV.Remove) > 0 or
             len(GV.AdjustConname) > 0 or
             len(GV.DemoteConstraint) > 0 or
             len(GV.ReSync) > 0)

    if GV.opt['-g'] is not None and fixes:
        repair_obj = Repair(context=GV)
        repair_dir_path = repair_obj.create_repair_dir()
        logger.debug('Building catalog repair scripts')
        catmod_guc = "-c allow_system_table_mods=true"

        # don't remove anything if ReSync possible --
        # comment out the delete statements from the repair scripts
        maybeRemove = "# " if len(GV.ReSync) else ""

        # use the per-dbid loop for removal and for constraint
        # name adjustments
        dbids = set(GV.Remove.keys())
        dbids = dbids.union(GV.AdjustConname.keys())
        dbids = dbids.union(GV.DemoteConstraint.keys())

        # build a script to run these files
        script_parts = []
        for seg in dbids:
            script_parts.append('\n%secho "Repairing segment %i"\n' % (maybeRemove, seg))
            script_parts.append(name_repair["segment"]["fn"](maybeRemove, catmod_guc, seg))
        script = ''.join(script_parts)

        repair_obj.append_content_to_bash_script(repair_dir_path, script)
        print_repair_issues(repair_dir_path)

    if GV.retcode >= ERROR_NOREPAIR:
        logger.warning('[WARN]: unable to generate repairs for some issues')

    logger.info("Check complete")
def do_repair(sql_repair_contents, issue_type, description):
    """
    Generate a repair for the given SQL statements.

    Nothing is generated when sql_repair_contents is empty. A failure while
    creating the repair is logged as fatal; the summary message is still
    printed, with an empty directory path.
    """
    issue_count = len(sql_repair_contents)
    logger.info("Starting repair of %s with %d issues" % (description, issue_count))
    if issue_count == 0:
        return

    repair_dir_path = ""
    try:
        builder = Repair(context=GV, issue_type=issue_type, desc=description)
        repair_dir_path = builder.create_repair(sql_repair_contents=sql_repair_contents)
    except Exception as ex:
        logger.fatal(str(ex))

    print_repair_issues(repair_dir_path)
def do_repair_for_segments(segments_to_repair_statements, issue_type, description):
    """
    Generate per-segment repair scripts from segments_to_repair_statements.

    A failure while creating the scripts is logged as fatal; the summary
    message is still printed, with an empty directory path.
    """
    logger.info("Starting repair of %s" % description)

    repair_dir_path = ""
    try:
        builder = Repair(context=GV, issue_type=issue_type, desc=description)
        repair_dir_path = builder.create_segment_repair_scripts(segments_to_repair_statements)
    except Exception as ex:
        logger.fatal(str(ex))

    print_repair_issues(repair_dir_path)
def do_repair_for_extra(catalog_issues):
    """
    Generate repairs that remove extra catalog entries.

    catalog_issues: dict keyed by (catalog_table_obj, pk_name) whose values
    are the issues found for that catalog table.

    Flags ERROR_REMOVE up front. Each table is handled independently: a
    failure for one table is logged as fatal and the loop moves on. Only the
    path returned by the last successful call is reported.
    """
    setError(ERROR_REMOVE)

    repair_dir_path = ""
    for table_and_pk, issues in catalog_issues.items():
        catalog_table_obj, pk_name = table_and_pk
        try:
            builder = Repair(context=GV, issue_type="extra", desc="Removing extra entries")
            repair_dir_path = builder.create_repair_for_extra_missing(
                catalog_table_obj=catalog_table_obj,
                issues=issues,
                pk_name=pk_name,
                segments=GV.cfg)
        except Exception as ex:
            logger.fatal(str(ex))

    print_repair_issues(repair_dir_path)
def print_repair_issues(repair_dir_path):
    """Print the repair-script location notice, framed by blank lines."""
    notice = "catalog issue(s) found , repair script(s) generated in dir {0}".format(repair_dir_path)
    for text in ('', notice, ''):
        myprint(text)
def checkReindexRepair():
    """
    Write a SQL file containing one REINDEX INDEX statement per entry in
    GV.Reindex.

    Returns (description, filename), where description is the shell echo
    line for the repair script and filename is the bare name of the SQL file
    created under the '-g' directory. Returns (None, None) when there is
    nothing to reindex. Exits the process with status 1 if the file cannot
    be created.
    """
    if not len(GV.Reindex) > 0:
        return None, None

    # dbname.type.timestamp.sql
    filename = '%s.%s.%s.sql' % (GV.dbname, "reindex", GV.timestamp)
    fullpath = '%s/%s' % (GV.opt['-g'], filename)
    try:
        sql_file = open(fullpath, 'w')
    except Exception as e:
        logger.fatal('Unable to create file "%s": %s' % (fullpath, str(e)))
        sys.exit(1)

    # The context manager guarantees the handle is closed even if a write
    # fails part-way through.
    with sql_file:
        for r in GV.Reindex:
            sql_file.write('REINDEX INDEX %s;\n' % r)

    description = '\necho "Reindexing potentially damaged bitmap indexes"\n'
    return description, filename
def checkConstraintsRepair():
    """Generate the repair for the statements queued in GV.Constraints."""
    do_repair(sql_repair_contents=GV.Constraints,
              issue_type="removeconstraints",
              description='Dropping invalid unique constraints')
def checkOwnersRepair():
    """Generate the repair for the statements queued in GV.Owners."""
    # The "Owners" list is deliberately passed through as-is. It used to be
    # de-duplicated with "list(set(...))", but that loses the item order,
    # which matters for correct results.
    do_repair(sql_repair_contents=GV.Owners,
              issue_type="fixowner",
              description='Correcting table ownership')
def checkPoliciesRepair():
    """Generate the repair for distribution policy changes queued in GV.Policies."""
    do_repair(sql_repair_contents=GV.Policies,
              issue_type="fixdistribution",
              description='Fixing distribution policies')
def checkSegmentRepair(maybeRemove, catmod_guc, seg):
    """
    Write the repair SQL file for one segment and return the shell command
    that runs it.

    maybeRemove: prefix placed in front of the returned psql command ("# "
                 comments it out, "" leaves it active).
    catmod_guc:  extra PGOPTIONS text added after "-c gp_role=utility".
    seg:         dbid of the segment; key into GV.cfg and the GV.Remove /
                 GV.AdjustConname / GV.DemoteConstraint maps.

    The file holds the de-duplicated, sorted statements for the segment,
    wrapped in BEGIN/COMMIT. Exits the process with status 1 if the file
    cannot be created.
    """
    # dbid.host.port.dbname.timestamp.sql
    c = GV.cfg[seg]
    filename = '%i.%s.%i.%s.%s.sql' % (seg, c['hostname'], c['port'], GV.dbname, GV.timestamp)
    filename = filename.replace(' ', '_')
    fullpath = '%s/%s' % (GV.opt['-g'], filename)
    try:
        sql_file = open(fullpath, 'w')
    except Exception as e:
        logger.fatal('Unable to create file "%s": %s' % (fullpath, str(e)))
        sys.exit(1)

    # The context manager guarantees the handle is closed even if gathering
    # or writing the statements fails.
    with sql_file:
        # A set keeps each statement unique.
        lines = set()
        if seg in GV.Remove:  # foreign_key check
            lines = lines.union(GV.Remove[seg])
        if seg in GV.AdjustConname:  # part_constraint
            lines = lines.union(GV.AdjustConname[seg])
        if seg in GV.DemoteConstraint:
            lines = lines.union(GV.DemoteConstraint[seg])

        sql_file.write('BEGIN;\n')
        # Sorted so the file content is deterministic.
        for line in sorted(lines):
            sql_file.write(line + '\n')
        sql_file.write('COMMIT;\n')

    run_psql_script = '''{maybe}env PGOPTIONS="-c gp_role=utility {guc}" psql -X -a -h {hostname} -p {port} -f {fname} "{dbname}" > {fname}.out'''.format(
        maybe=maybeRemove, guc=catmod_guc,
        hostname=c['hostname'],
        port=c['port'],
        fname=filename,
        dbname=GV.dbname)
    return run_psql_script
# -------------------------------------------------------------------------------
def getCatalog():
    """Connect to the coordinator (non-utility mode) and return a GPCatalog built on that connection."""
    coordinator_cfg = GV.cfg[GV.coordinator_dbid]
    return GPCatalog(connect2(coordinator_cfg, utilityMode=False))
# -------------------------------------------------------------------------------
def getReportConfiguration():
    """
    Build the reporting map from GV.cfg: content id -> dict holding
    'segname', 'hostname' and 'port'. When several GV.cfg entries share a
    content id, the last one iterated wins.
    """
    return {
        seg['content']: {'segname': "content " + str(seg['content']),
                         'hostname': seg['hostname'],
                         'port': seg['port']}
        for seg in GV.cfg.values()
    }
# ===============================================================================
# GPCHECKCAT REPORT
# ===============================================================================
GPObjects = {} # key = (oid, catname), value = GPObject
GPObjectGraph = {} # key = GPObject, value=[GPObject]
# -------------------------------------------------------------------------------
# TableMainColumn[catname] = [colname, catname2]
# - report this catname's issues as part of catname2
# - catname.colname references catname2.oid
# -------------------------------------------------------------------------------
TableMainColumn = {
    # Tables with no OID
    'gp_distribution_policy': ['localoid', 'pg_class'],
    'gp_version_at_initdb': ['schemaversion', 'gp_version_at_initdb'],
    'pg_aggregate': ['aggfnoid', 'pg_proc'],
    'pg_appendonly': ['relid', 'pg_class'],
    'pg_attribute': ['attrelid', 'pg_class'],
    'pg_attribute_encoding': ['attrelid', 'pg_class'],
    'pg_auth_members': ['roleid', 'pg_authid'],
    'pg_autovacuum': ['vacrelid', 'pg_class'],
    'pg_compression': ['compname', 'pg_compression'],
    'pg_depend': ['objid', 'pg_class'],
    'pg_foreign_table': ['ftrelid', 'pg_class'],
    'pg_index': ['indexrelid', 'pg_class'],
    'pg_inherits': ['inhrelid', 'pg_class'],
    'pg_largeobject': ['loid', 'pg_largeobject'],
    'pg_pltemplate': ['tmplname', 'pg_pltemplate'],
    'pg_proc_callback': ['profnoid', 'pg_proc'],
    'pg_statistic_ext_data': ['stxoid', 'pg_statistic_ext'],
    'pg_type_encoding': ['typid', 'pg_type'],
    'pg_window': ['winfnoid', 'pg_proc'],
    'pg_password_history': ['passhistroleid', 'pg_authid'],
    'pg_description': ['objoid', 'pg_description'],
    # Tables with OID (special case): these OIDs are known to be inconsistent
    'pg_attrdef': ['adrelid', 'pg_class'],
    'pg_constraint': ['conrelid', 'pg_class'],
    'pg_trigger': ['tgrelid', 'pg_class'],
    'pg_rewrite': ['ev_class', 'pg_class'],
}
# Cast OID alias types to OID since our graph of objects is based on OID.
# int2vector is also cast to int2[] because there is no <(int2vector, int2vector) operator.
# Every OID alias type gets the same '::oid' cast suffix.
autoCast = {
    alias_type: '::oid'
    for alias_type in ('regproc', 'regprocedure', 'regoper',
                       'regoperator', 'regclass', 'regtype')
}
autoCast['int2vector'] = '::int2[]'
# -------------------------------------------------------------------------------
class QueryException(Exception):
    """Error raised when a catalog lookup query does not find the requested entry."""
# -------------------------------------------------------------------------------
# Get gpObj from GPObjects, instantiate a new one & add to GPObjects if not found
def getGPObject(oid, catname):
    """
    Return the object registered in GPObjects under (oid, catname), creating
    and registering it on first use. A 'pg_class' catname produces a
    RelationObject; any other catname produces a plain GPObject.
    """
    key = (oid, catname)
    existing = GPObjects.get(key, None)
    if existing is not None:
        return existing

    factory = RelationObject if catname == 'pg_class' else GPObject
    created = factory(oid, catname)
    GPObjects[key] = created
    return created
# -------------------------------------------------------------------------------
def getOidFromPK(catname, pkeys):
    """
    Look up the oid of the catname row identified by pkeys (a dict of
    column name -> value), querying both the coordinator copy and
    gp_dist_random(catname).

    On any error (including no matching row) ERROR_NOREPAIR is flagged, the
    error and the query are printed, and None is returned implicitly.
    """
    # pkeys: name-value pairs turned into "col='value'" predicates
    pkeystr = ' and '.join("%s='%s'" % (key, value) for key, value in pkeys.items())
    qry = """
SELECT oid
FROM (
SELECT oid FROM {catname}
WHERE {pkeystr}
UNION ALL
SELECT oid FROM gp_dist_random('{catname}')
WHERE {pkeystr}
) alloids
GROUP BY oid
ORDER BY count(*) desc, oid
""".format(catname=catname,
           pkeystr=pkeystr)
    try:
        db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False)
        rows = db.query(qry).dictresult()
        if len(rows) == 0:
            raise QueryException("No such entry '%s' in %s" % (pkeystr, catname))
        # NOTE(review): pop() takes the LAST row of a result ordered by
        # count(*) desc -- confirm whether the first (most common) oid was
        # intended instead.
        return rows.pop()['oid']
    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint(' Execution error: ' + str(e))
        myprint(qry)
# -------------------------------------------------------------------------------
def getClassOidForRelfilenode(relfilenode):
    """
    Return the pg_class oid whose relfilenode equals the given number.

    Connects with the '-h' / '-p' options and GV.dbname. On any error
    ERROR_NOREPAIR is flagged, the error is printed, and None is returned
    implicitly. The connection opened here is always closed before
    returning.
    """
    qry = "SELECT oid FROM pg_class WHERE relfilenode = %d;" % (relfilenode)
    conn = None
    try:
        dburl = dbconn.DbURL(hostname=GV.opt['-h'], port=GV.opt['-p'], dbname=GV.dbname)
        conn = dbconn.connect(dburl)
        oid = dbconn.queryRow(conn, qry)[0]
        return oid
    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint(' Execution error: ' + str(e))
    finally:
        # The connection is private to this call; release it so repeated
        # lookups do not accumulate open sessions.
        if conn is not None:
            conn.close()
# -------------------------------------------------------------------------------
def getResourceTypeOid(oid):
    """
    Return the pg_resourcetype oid most frequently associated with the given
    restypid across the coordinator copy and gp_dist_random, or 0 when no
    row matches.

    On error ERROR_NOREPAIR is flagged, the error is printed, and None is
    returned implicitly.
    """
    qry = """
SELECT oid
FROM (
SELECT oid FROM pg_resourcetype WHERE restypid = %d
UNION ALL
SELECT oid FROM gp_dist_random('pg_resourcetype')
WHERE restypid = %d
) alloids
GROUP BY oid ORDER BY count(*) desc LIMIT 1
""" % (oid, oid)
    try:
        rows = connect().query(qry).dictresult()
        if not len(rows):
            return 0
        return rows.pop()['oid']
    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint(' Execution error: ' + str(e))
def filterSpuriousFailures(catname, colnames, results):
    """
    Drop result rows that are expected differences rather than corruption.

    Only 'pg_depend' is filtered: rows whose 'classid' is the oid of a
    coordinator-only catalog table are removed (and logged). For any other
    catname the results object is returned unchanged.
    """
    if catname != 'pg_depend':
        return results

    # Catalog tables that have content only on the coordinator (e.g.
    # pg_statistic*) should be excluded from pg_depend comparisons between
    # coordinator and segments.
    coordinatorOnlyOids = [cat._oid
                           for cat in GV.catalog.getCatalogTables()
                           if cat.isCoordinatorOnly()]
    legitFailures = []
    for row in results:
        if row[colnames.index('classid')] in coordinatorOnlyOids:
            logger.info('excluding coordinator-only entry from missing_pg_depend: %s' % str(row))
            continue
        legitFailures.append(row)
    return legitFailures
#-------------------------------------------------------------------------------
# Process results of checks (CC and FK) for a particular catalog:
# - Categorize entry into GPObject
# - Instantiate GPObject if needed
# -------------------------------------------------------------------------------
def processMissingDuplicateEntryResult(catname, colname, allValues, type):
    """
    Turn the rows of a missing/duplicate check into issues attached to the
    owning GPObject.

    type is "missing" or "duplicate". The last column of every row holds the
    segment ids; the preceding columns are the primary key. Examples:

    colname:   proname     | pronamespace | proargtypes | segids
    allValues: add         | 2200         | 23 23       | {2,3}
               scube       | 2200         | 1700        | {-1,0,1,3}
               scube_accum | 2200         | 1700 1700   | {-1,0,1,3}

    colname:   oid   | total | segids
    allValues: 18853 | 2     | {-1,1}
               18853 | 3     | {0}
    """
    pknames = list(colname[:-1])  # everything except the last column

    # This catname may not be a GPObject itself (e.g. pg_attribute); in that
    # case the issue is reported under the main table it belongs to.
    gpObjName, gpColName = getPrimaryKeyColumn(catname, pknames)

    for row in allValues:
        rowObjName = gpObjName

        # Since the pygresql 4 -> 5 update the segids arrive as a list of int
        # (e.g. [-1, 0, 1]) rather than a string such as "{-1,0,1}".
        segids = "{" + ",".join(str(i) for i in row[-1]) + "}"
        pkeys = dict((pk, row[colname.index(pk)]) for pk in pknames)

        if gpColName is not None:
            oid = row[colname.index(gpColName)]
        else:
            oid = getOidFromPK(catname, pkeys)
            pkeys = {'oid': oid}

        # Special case: constraint for a domain, report pg_type.contypid
        if catname == 'pg_constraint' and oid == 0:
            oid = row[colname.index('contypid')]
            rowObjName = 'pg_type'

        gpObj = getGPObject(oid, rowObjName)

        if type != "missing":
            assert (type == "duplicate")
            gpObj.addDuplicateIssue(CatDuplicateIssue(catname, pkeys, segids, row[-2]))
            continue

        # Count in how many segments (coordinator counts as segment -1) the
        # entry was present. Present in half or fewer: those segments hold an
        # 'extra' entry. Present in half or more: the remaining segments are
        # 'missing' it. Exactly half yields both reports.
        ids = [int(i) for i in segids[1:-1].split(',')]
        if len(ids) <= (GV.max_content + 2) / 2:
            gpObj.addMissingIssue(CatMissingIssue(catname, pkeys, segids, 'extra'))
        if len(ids) >= (GV.max_content + 2) / 2:
            allids = [int(i) for i in range(-1, GV.max_content+1)]
            diffids = set(allids) - set(ids)
            # Render the difference set back into a PostgreSQL-array-looking string.
            segids = "{%s}" % (",".join(str(x) for x in diffids))
            gpObj.addMissingIssue(CatMissingIssue(catname, pkeys, segids, 'missing'))
# -------------------------------------------------------------------------------
def processInconsistentEntryResult(catname, pknames, colname, allValues):
    """
    Group the rows of an inconsistency check by their key and attach one
    CatInconsistentIssue per group to the owning GPObject.

    Rows are keyed by the 'oid' column when colname has one, otherwise by the
    tuple of pknames values. The last column of every row is the list of
    segment ids sharing that version of the row ([None] means "all other
    segments"), e.g.:

    176954 | test1 | 2200 | ... | 4 | ... | [-1]
    176954 | test1 | 2200 | ... | 2 | ... | [0,1]
    176954 | test1 | 2200 | ... | 3 | ... | [2,3]
    """
    def row_key(row):
        # Tables with an inconsistent oid do not have 'oid' among the columns.
        if 'oid' in colname:
            return row[colname.index('oid')]
        return tuple((row[colname.index(n)]) for n in pknames)

    groupedValues = {}
    for row in allValues:
        groupedValues.setdefault(row_key(row), []).append(row)

    # This catname may not be a GPObject itself (e.g. pg_attribute); tables
    # listed in TableMainColumn are reported under their main table.
    if catname in TableMainColumn:
        gpColName, gpObjName = TableMainColumn[catname][0], TableMainColumn[catname][1]
    elif 'oid' in colname:
        gpColName, gpObjName = 'oid', catname
    else:
        gpColName, gpObjName = None, catname

    for keys, values in groupedValues.items():
        first_row = values[0]
        rowObjName = gpObjName
        pkeys = dict((n, first_row[colname.index(n)]) for n in pknames)

        if gpColName is None:
            oid = getOidFromPK(catname, pkeys)
        else:
            oid = first_row[colname.index(gpColName)]

        issue = CatInconsistentIssue(catname, colname, list(values))

        # Special case: constraint for a domain, report pg_type.contypid
        if catname == 'pg_constraint' and oid == 0:
            oid = first_row[colname.index('contypid')]
            rowObjName = 'pg_type'

        getGPObject(oid, rowObjName).addInconsistentIssue(issue)
# -------------------------------------------------------------------------------
def processForeignKeyResult(catname, pkcatname, colname, allValues):
    """
    Turn the rows of a foreign-key check into CatForeignKeyIssue objects
    attached to the owning GPObject.

    catname:   catalog table holding the foreign key.
    pkcatname: catalog table the foreign key refers to.

    colname:   pg_attribute_attrelid | pg_attribute_attname | missing_catalog | present_key | pg_class_oid | segids
    allValues: None                  | None                 | pg_attribute    | oid         | 190683       | {-1}
          or
    allValues: 12344                 | foo                  | pg_class        | attrelid    | 190683       | {-1}
    """
    gpObjName = pkcatname
    gpColName = colname[-2].rsplit('_', 1)[1]
    fkeytab = catname

    # The foreign keys are all entries except the last 4.
    # NOTE: We can't assume the first few rows are the expected issues anymore...
    fkeylist = [name.rsplit(catname+'_', 1)[1] for name in colname[:-4]]

    # This catname may not be a GPObject (e.g. pg_attribute)
    # 1. pg_attrdef(adrelid) referencing pg_attribute(attrelid)
    # 2. pg_rewrite(ev_class) referencing pg_attribute(attrelid)
    #
    # The membership test is on pkcatname but the lookup is on catname, so
    # the lookup is guarded: a catname that is not registered no longer
    # raises KeyError and simply leaves gpObjName untouched.
    # NOTE(review): confirm whether the lookup was meant to use pkcatname.
    main_column = TableMainColumn.get(catname)
    if pkcatname in TableMainColumn and main_column is not None:
        if gpColName == main_column[0]:
            gpObjName = main_column[1]

    # Process each row
    for row in allValues:
        rowObjName = gpObjName
        segids = row[-1]
        oid = row[len(colname) - 2]
        missing_cat = row[-4]

        # Build fkeys dict: used to find oid of pg_class
        fkeys = dict((n, row[colname.index(catname + '_' + n)]) for n in fkeylist)

        # Must get third column from end since fkeylist length can vary
        if not fkeys:
            fkeys = row[-3]

        # The primary key and the foreign key swap roles depending on which
        # catalog is missing, so the missing catalog is printed first.
        existing_cat = pkcatname
        if missing_cat == pkcatname:
            existing_cat = fkeytab

        missing_pkeys = {gpColName: oid}
        existing_fkeys = fkeys
        if catname == missing_cat:
            missing_pkeys = fkeys
            existing_fkeys = {gpColName: oid}

        issue = CatForeignKeyIssue(catname=missing_cat, pkeys=missing_pkeys,
                                   segids=segids, fcatname=existing_cat, fkeys=existing_fkeys)

        # Special cases:
        # 1. pg_class(oid) referencing pg_type(oid) - relation & composite type
        # 2. pg_resqueuecapability(restypid) referencing pg_resourcetype(restypid)
        if pkcatname == 'pg_type' and fkeytab == 'pg_class':
            rowObjName = 'pg_class'
            oid = getOidFromPK(rowObjName, fkeys)
        elif pkcatname == 'pg_resourcetype' and gpColName == 'restypid':
            oid = getResourceTypeOid(oid)

        gpObj = getGPObject(oid, rowObjName)
        gpObj.addForeignKeyIssue(issue)
def getPrimaryKeyColumn(catname, pknames):
    """
    Return (object catalog name, key column name) used to report issues for
    catname.

    Tables registered in TableMainColumn (e.g. pg_pltemplate) report under
    their registered main table and column. Otherwise the table reports
    under itself, keyed by 'oid' when pknames contains it, else by None.
    """
    if catname in TableMainColumn:
        main_column, main_table = TableMainColumn[catname][0], TableMainColumn[catname][1]
        return main_table, main_column
    if 'oid' in pknames:
        return catname, 'oid'
    return catname, None
# -------------------------------------------------------------------------------
def processACLResult(catname, colname, allValues):
    """
    Turn the rows of an ACL check into CatACLIssue objects attached to the
    owning GPObject.

    colname:   segid | datname | coordinator_acl | segment_acl
    allValues: 2 | acldb  | {=Tc/nmiller,nmiller=CTc/nmiller,person1=C/nmiller} | {=Tc/nmiller,nmiller=CTc/nmiller}
               1 | gptest | None | {=Tc/nmiller,nmiller=CTc/nmiller,person2=CTc/nmiller}
    """
    # The key columns sit between the leading segid and the two ACL columns.
    pknames = list(colname[1:-2])
    gpObjName, gpColName = getPrimaryKeyColumn(catname, pknames)

    for row in allValues:
        segid = row[0]
        pkeys = dict((pk, row[colname.index(pk)]) for pk in pknames)

        if gpColName is None:
            oid = getOidFromPK(catname, pkeys)
            pkeys = {'oid': oid}
        else:
            oid = row[colname.index(gpColName)]

        # A missing ACL (None) is treated as an empty one.
        macl = [] if row[-2] is None else row[-2]
        sacl = [] if row[-1] is None else row[-1]
        macl_only = list(set(macl).difference(sacl))
        sacl_only = list(set(sacl).difference(macl))

        issue = CatACLIssue(catname, pkeys, segid, macl_only, sacl_only)
        getGPObject(oid, gpObjName).addACLIssue(issue)
# -------------------------------------------------------------------------------
class CatInconsistentIssue:
    """A catalog entry whose column values differ between segments."""

    def __init__(self, catname, columns, rows):
        self.catname = catname
        self.columns = columns  # column names; the last one holds the segment ids
        self.rows = rows        # one row per distinct version of the entry

    def report(self):
        """Print, for every column whose value differs across rows, the value seen on each group of segments."""
        def format_segids(segids):
            # [None] stands for every segment not listed by another row.
            if segids == [None]:
                return 'all other segments'
            return ''.join('%s (%s:%d) ' %
                           (GV.report_cfg[i]['segname'], GV.report_cfg[i]['hostname'],
                            GV.report_cfg[i]['port'])
                           for i in segids)

        # The last column (segment ids) is never compared.
        for idx, column in enumerate(self.columns[:-1]):
            distinct_values = set(str(row[idx]) for row in self.rows)
            if len(distinct_values) <= 1:
                continue
            for row in self.rows:
                myprint("%20s is '%s' on %s" % (column, row[idx], format_segids(row[-1])))
# -------------------------------------------------------------------------------
class CatForeignKeyIssue:
    """A catalog entry that is referenced by another catalog but is missing."""

    def __init__(self, catname, pkeys, segids, fcatname, fkeys):
        self.catname = catname    # catalog in which the entry is missing
        self.pkeys = pkeys        # pkey=xxxx or oid=xxxx of the missing entry
        self.segids = segids      # list of affected segids
        self.fcatname = fcatname  # checked object catname
        self.fkeys = fkeys        # fkeys=xxxx of the existing entry

    def report(self):
        """Print which entry is missing, for which existing entry, and on which segments."""
        if len(self.segids) == len(GV.report_cfg):
            where = 'all segments'
        else:
            where = ''.join('%s (%s:%d) ' %
                            (GV.report_cfg[i]['segname'], GV.report_cfg[i]['hostname'],
                             GV.report_cfg[i]['port'])
                            for i in self.segids)
        myprint(" No %s %s entry for %s %s on %s"
                % (self.catname, self.pkeys, self.fcatname, self.fkeys, where))
# -------------------------------------------------------------------------------
class CatMissingIssue:
    """A catalog entry that is missing from, or extra on, some segments."""

    def __init__(self, catname, pkeys, segids, type):
        self.catname = catname
        self.pkeys = pkeys    # pkey=xxxx or oid=xxxx
        self.segids = segids  # affected segids, formatted like "{-1,0}"
        self.type = type      # missing or extra
        assert (self.type in ['missing', 'extra'])

    def report(self):
        """Print where the entry is missing/extra, with catalog-specific wording."""
        # Strip the surrounding braces before splitting the id list.
        ids = [int(i) for i in self.segids[1:-1].split(',')]
        if len(ids) == len(GV.report_cfg):
            where = 'all segments'
        else:
            where = ''.join('%s (%s:%d) ' %
                            (GV.report_cfg[i]['segname'], GV.report_cfg[i]['hostname'],
                             GV.report_cfg[i]['port'])
                            for i in ids)

        kind = self.type.capitalize()
        if self.catname == 'pg_attribute':
            myprint(" %s column '%s' on %s"
                    % (kind, self.pkeys['attname'], where))
        elif self.catname == 'pg_class':
            myprint(' %s relation metadata for %s on %s'
                    % (kind, str(self.pkeys), where))
        else:
            # catname[3:] drops the first three characters of the catalog name.
            myprint(' %s %s metadata of %s on %s'
                    % (kind, self.catname[3:], str(self.pkeys), where))
# -------------------------------------------------------------------------------
class CatDuplicateIssue:
    """A catalog entry that appears more than once on some segments."""

    def __init__(self, catname, pkeys, segids, enum):
        self.catname = catname
        self.pkeys = pkeys    # pkey=xxxx or oid=xxxx
        self.segids = segids  # affected segids
        self.enum = enum      # number of entries

    def report(self):
        """ Found # catname entries of "oid=XXXXX" on segment {Y,Z} """
        details = (self.enum, self.catname, str(self.pkeys), str(self.segids))
        myprint(' Found %d %s entries of %s on segment %s' % details)
# -------------------------------------------------------------------------------
class CatACLIssue:
    """An ACL difference between the coordinator and one segment."""

    def __init__(self, catname, pkeys, segid, macl_only, sacl_only):
        self.catname = catname
        self.pkeys = pkeys          # pkey=xxxx or oid=xxxx
        self.segid = segid          # the affected segid
        self.macl_only = macl_only  # acl appears only on coordinator
        self.sacl_only = sacl_only  # acl appears only on segment

    def report(self):
        """ Coordinator (host:port) and seg# (host:port) have different ACL:
        Exist(s) on coordinator only: [...........]
        Exist(s) on seg# only: [...........]
        """
        coordinator = GV.report_cfg[-1]
        mstr = 'Coordinator (%s:%s)' % (coordinator['hostname'], coordinator['port'])
        sstr = '%s (%s:%s)' % (GV.report_cfg[self.segid]['segname'],
                               GV.report_cfg[self.segid]['hostname'],
                               GV.report_cfg[self.segid]['port'])
        myprint(' %s and %s have different ACLs:' % (mstr, sstr))

        if len(self.macl_only) > 0:
            myprint(' Exist(s) on coordinator only: %s' % (self.macl_only))
        if len(self.sacl_only) > 0:
            myprint(' Exist(s) on %s only: %s ' %
                    (GV.report_cfg[self.segid]['segname'], self.sacl_only))
# -------------------------------------------------------------------------------
class CatUniqueIndexViolationIssue:
    """A table whose unique index is violated."""

    def __init__(self, table_name, index_name):
        self.catname = table_name
        self.index_name = index_name

    def report(self):
        """Print the table name and the violated index."""
        details = (self.catname, self.index_name)
        myprint(' Table %s has a violated unique index: %s' % details)
class CatDependencyIssue:
    """A dependency problem found for a table on a particular content."""

    def __init__(self, table_name, oid, content):
        self.catname = table_name
        self.oid = oid
        self.content = content

    def report(self):
        """Print the table, the oid and the content the issue was found on."""
        details = (self.catname, self.oid, self.content)
        myprint(' Table %s has a dependency issue on oid %s at content %s' % details)
# -------------------------------------------------------------------------------
class CatOrphanToastTableIssue:
    """An orphaned toast table problem found on one or more segments."""

    def __init__(self, oid, catname, issue, segments):
        self.oid = oid
        self.catname = catname
        self.issue = issue        # object exposing .description and .cause
        self.segments = segments  # segments on which the problem was seen

    def report(self):
        """Print the issue description, then the affected segments, table and cause."""
        myprint('%s' % self.issue.description)
        segment_list = ', '.join(map(str, sorted(self.segments)))
        myprint('''On segment(s) %s table '%s' (oid: %s) %s''' % (segment_list, self.catname, self.oid, self.issue.cause))
# -------------------------------------------------------------------------------
class GPObject:
    """
    A catalog object identified by (oid, catname), together with every issue
    the checks recorded against it.

    Each *Issues attribute maps issue.catname to the list of issues of that
    kind. Equality, ordering and hashing are all based on (oid, catname).
    """

    def __init__(self, oid, catname):
        self.oid = oid
        self.catname = catname
        self.missingIssues = {}  # key=issue.catname, value=list of catMissingIssue
        self.inconsistentIssues = {}  # key=issue.catname, value=list of catInconsistentIssue
        self.duplicateIssues = {}  # key=issue.catname, value=list of catDuplicateIssue
        self.aclIssues = {}  # key=issue.catname, value=list of catACLIssue
        self.foreignkeyIssues = {}  # key=issue.catname, value=list of catForeignKeyIssue
        self.dependencyIssues = {}  # key=issue.catname, value=list of catDependencyIssue
        self.orphanToastTableIssues = {}  # key=issue.catname, value=list of orphanToastTableIssues

    def addDependencyIssue(self, issue):
        """Record a dependency issue under issue.catname."""
        self.dependencyIssues.setdefault(issue.catname, []).append(issue)

    def addMissingIssue(self, issue):
        """Record a missing/extra issue under issue.catname."""
        self.missingIssues.setdefault(issue.catname, []).append(issue)

    def addInconsistentIssue(self, issue):
        """Record an inconsistency issue under issue.catname."""
        self.inconsistentIssues.setdefault(issue.catname, []).append(issue)

    def addDuplicateIssue(self, issue):
        """Record a duplicate issue under issue.catname."""
        self.duplicateIssues.setdefault(issue.catname, []).append(issue)

    def addACLIssue(self, issue):
        """Record an ACL issue under issue.catname."""
        self.aclIssues.setdefault(issue.catname, []).append(issue)

    def addForeignKeyIssue(self, issue):
        """Record a foreign key issue under issue.catname."""
        self.foreignkeyIssues.setdefault(issue.catname, []).append(issue)

    def addOrphanToastTableIssue(self, issue):
        """Record an orphan toast table issue under issue.catname."""
        self.orphanToastTableIssues.setdefault(issue.catname, []).append(issue)

    def isTopLevel(self):
        """A plain GPObject is always reported as a top-level object."""
        return True

    def _reportGrouped(self, prefix, grouped):
        """Print every issue in grouped, each catalog introduced by its test name '<prefix>_<catname>'."""
        for catname, issues in grouped.items():
            myprint(' Name of test which found this issue: %s_%s' % (prefix, catname))
            for each in issues:
                each.report()
            myprint('')

    def reportAllIssues(self):
        """
        We want to prevent oids from certain catalog tables to appear on
        stdout. Hence the module-level myprint is temporarily replaced by a
        wrapper that logs the output for those tables as WARNING instead of
        CRITICAL, so it still reaches the log file. The original printer is
        restored even if a report raises.

        After reporting, every table touched by a missing/extra issue is
        recorded in GV.missing_attr_tables / GV.extra_attr_tables.
        """
        def scoped_myprint(string):
            if self.catname in ['pg_constraint']:
                _myprint(string, logging.WARNING)
            else:
                _myprint(string)

        global myprint
        myprint = scoped_myprint
        try:
            # Subclasses print their own header before delegating here.
            if self.__class__ == GPObject:
                myprint('')
                myprint('----------------------------------------------------')
                myprint('Object oid: %s' % (self.oid))
                myprint('Table name: %s' % (self.catname))
                myprint('')

            # Report dependency issues
            self._reportGrouped('dependency', self.dependencyIssues)

            # Report inconsistent issues
            self._reportGrouped('inconsistent', self.inconsistentIssues)

            # Report missing issues
            if len(self.missingIssues):
                omitlist = ['pg_attribute', 'pg_attribute_encoding', 'pg_type', 'pg_appendonly', 'pg_index', 'pg_password_history', 'pg_directory_table']
                if 'pg_class' in self.missingIssues:
                    # With a pg_class issue present, the catalogs in omitlist
                    # are only named; their individual issues are not printed.
                    myprint(' Name of test which found this issue: missing_extraneous_pg_class')
                    for name in omitlist:
                        if name in self.missingIssues:
                            myprint(' Name of test which found this issue: missing_extraneous_%s' % name)
                    for each in self.missingIssues['pg_class']:
                        each.report()

                    for catname, issues in self.missingIssues.items():
                        if catname != 'pg_class' and catname not in omitlist:
                            myprint(' Name of test which found this issue: missing_extraneous_%s' % catname)
                            for each in issues:
                                each.report()
                else:
                    for catname, issues in self.missingIssues.items():
                        myprint(' Name of test which found this issue: missing_extraneous_%s' % catname)
                        for each in issues:
                            each.report()
                myprint('')

            # Report foreign key issues
            self._reportGrouped('foreign_key', self.foreignkeyIssues)

            # Report duplicate issues
            self._reportGrouped('duplicate', self.duplicateIssues)

            # Report ACL issues
            self._reportGrouped('acl', self.aclIssues)

            # Report Orphan Toast Table issues
            for catname, issues in self.orphanToastTableIssues.items():
                for each in issues:
                    each.report()
                myprint('')
        finally:
            myprint = _myprint

        # Collect all tables with missing issues for later reporting
        if len(self.missingIssues):
            db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False)
            oid_query = "select (select nspname from pg_namespace where oid=relnamespace) || '.' || relname from pg_class where oid=%d"
            type_query = "select (select nspname from pg_namespace where oid=relnamespace) || '.' || relname from pg_class where reltype=%d"
            for issues in self.missingIssues.values():
                for issue in issues:
                    # Get schemaname.tablename corresponding to oid
                    for key in issue.pkeys:
                        if 'relid' in key or key in ['ev_class', 'reloid']:
                            lookup = oid_query
                        elif key == 'oid':
                            lookup = type_query
                        else:
                            continue
                        table_list = db.query(lookup % issue.pkeys[key]).getresult()
                        if not table_list:
                            continue
                        if issue.type == 'missing':
                            GV.missing_attr_tables.append((table_list[0][0], issue.segids))
                        else:
                            GV.extra_attr_tables.append((table_list[0][0], issue.segids))

    def _sortKey(self):
        """Identity tuple shared by equality, ordering and hashing."""
        return (self.oid, self.catname)

    def __eq__(self, other):
        if isinstance(other, GPObject):
            return self._sortKey() == other._sortKey()
        return NotImplemented

    def __lt__(self, other):
        if isinstance(other, GPObject):
            return self._sortKey() < other._sortKey()
        return NotImplemented

    def __cmp__(self, other):
        # Python 3 never calls __cmp__ and has no cmp() builtin; kept only
        # for explicit callers, implemented without cmp().
        if isinstance(other, GPObject):
            mine, theirs = self._sortKey(), other._sortKey()
            return (mine > theirs) - (mine < theirs)
        else:
            return NotImplemented

    def __hash__(self):
        return hash((self.oid, self.catname))
# -------------------------------------------------------------------------------
class RelationObject(GPObject):
    """A GPObject that also carries the relation details filled in by setRelInfo."""

    def __init__(self, oid, catname):
        GPObject.__init__(self, oid, catname)
        # Unknown until setRelInfo() is called.
        self.relname = None
        self.nspname = None
        self.relkind = None
        self.paroid = None

    def reportAllIssues(self):
        """Print the relation header (top-level or sub-object form), then all recorded issues."""
        def or_na(value, text):
            # Unknown details are shown as 'N/A'.
            return 'N/A' if value is None else text

        oid = or_na(self.oid, str(self.oid))
        nspname = or_na(self.nspname, self.nspname)
        relname = or_na(self.relname, self.relname)

        if not self.isTopLevel():
            myprint(' Sub-object: ')
            myprint(' ----------------------------------------------------')
            myprint(' Relation oid: %s' % oid)
            myprint(' Relation schema: %s' % nspname)
            myprint(' Relation name: %s' % relname)
        else:
            myprint('')
            myprint('----------------------------------------------------')
            # relkind 'c' is labelled as a type rather than a relation.
            objname = 'Type' if self.relkind == 'c' else 'Relation'
            myprint('%s oid: %s' % (objname, oid))
            myprint('%s schema: %s' % (objname, nspname))
            myprint('%s name: %s' % (objname, relname))

        myprint('')
        GPObject.reportAllIssues(self)

    def setRelInfo(self, relname, nspname, relkind, paroid):
        """Store the relation name, schema name, relkind and parent oid."""
        self.relname, self.nspname = relname, nspname
        self.relkind, self.paroid = relkind, paroid

    def isTopLevel(self):
        """Relations with relkind 'i', 't' or 'o' are reported as sub-objects, everything else as top level."""
        is_sub_object = self.relkind == 'i' or self.relkind == 't' or self.relkind == 'o'
        return not is_sub_object
# -------------------------------------------------------------------------------
def getRelInfo(objects):
    """Populate relation info on every 'pg_class' entry of ``objects``.

    ``objects`` maps (oid, catname) keys to objects exposing setRelInfo().
    For every oid whose catname is 'pg_class' a single query fetches:

      * relname, nspname and relkind, and
      * the parent oid, taken from a left outer join against the union of
        the toast, pg_appendonly (segrelid) and index parent lookups.

    Each sub-query reads both the local catalog and gp_dist_random(...);
    when the copies disagree, the most frequent value wins (rank = 1).

    Nothing is done when ``objects`` is empty, when it has no 'pg_class'
    keys, or when one of those keys has a None oid.  Any exception while
    connecting, querying or applying the rows is reported through
    setError(ERROR_NOREPAIR) and printed together with the query text.
    """
    if not objects:
        return

    class_oids = [relid for (relid, cat) in objects if cat == 'pg_class']
    if not class_oids or None in class_oids:
        return

    oid_csv = ','.join(str(relid) for relid in class_oids)
    qry = """
SELECT oid, relname, nspname, relkind, paroid
FROM (
SELECT oid, relname, nspname, relkind
FROM (
SELECT oid, relname, nspname, relkind, rank() over (partition by oid order by count(*) desc)
FROM
(
SELECT c.oid, c.relname, c.relkind, n.nspname
FROM pg_class c
left outer join pg_namespace n
on (c.relnamespace = n.oid)
WHERE c.oid in ({oids})
UNION ALL
SELECT c.oid, c.relname, c.relkind, n.nspname
FROM gp_dist_random('pg_class') c
left outer join gp_dist_random('pg_namespace') n
on (c.relnamespace = n.oid and c.gp_segment_id = n.gp_segment_id)
WHERE c.oid in ({oids})
) allrelinfo
GROUP BY oid, relname, nspname, relkind
) relinfo
WHERE rank=1
) relinfo
LEFT OUTER JOIN
(
SELECT reltoastrelid as childoid, oid as paroid
FROM (
SELECT reltoastrelid, oid, rank() over (partition by reltoastrelid order by count(*) desc)
FROM
( SELECT reltoastrelid, oid FROM pg_class
WHERE reltoastrelid in ({oids})
UNION ALL
SELECT reltoastrelid, oid FROM gp_dist_random('pg_class')
WHERE reltoastrelid in ({oids})
) allpar_toast
GROUP BY reltoastrelid, oid
) par_toast
WHERE rank=1
UNION ALL
SELECT segrelid as childoid, relid as paroid
FROM (
SELECT segrelid, relid, rank() over (partition by segrelid order by count(*) desc)
FROM
( SELECT segrelid, relid FROM pg_appendonly
WHERE segrelid in ({oids})
UNION ALL
SELECT segrelid, relid FROM gp_dist_random('pg_appendonly')
WHERE segrelid in ({oids})
) allpar_aoco
GROUP BY segrelid, relid
) par_aoco
WHERE rank=1
UNION ALL
SELECT indexrelid as childoid, indrelid as paroid
FROM (
SELECT indexrelid, indrelid, rank() over (partition by indexrelid order by count(*) desc)
FROM
( SELECT indexrelid, indrelid FROM pg_index
WHERE indexrelid in ({oids})
UNION ALL
SELECT indexrelid, indrelid FROM gp_dist_random('pg_index')
WHERE indexrelid in ({oids})
) allpar_index
GROUP BY indexrelid, indrelid
) par_index
WHERE rank=1
) par ON childoid = oid
""".format(oids=oid_csv)

    try:
        db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False)
        rows = db.query(qry).getresult()
        for (oid, relname, nspname, relkind, paroid) in rows:
            target = objects[oid, 'pg_class']
            target.setRelInfo(relname, nspname, relkind, paroid)
    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint(' Execution error: ' + str(e))
        myprint(qry)
# -------------------------------------------------------------------------------
def buildGraph():
    """Fill GPObjectGraph with a parent -> [children] map of GPObjects.

    Top-level objects become keys of the graph.  Every other object is
    appended to the child list of its parent; parents that are not part of
    the current object set are created as RelationObject placeholders and
    resolved by a further recursive pass.
    """

    def buildGraphRecursive(objects, graph):
        if not objects:
            return
        getRelInfo(objects)

        # Parents discovered in this pass that still need their own
        # relation info looked up.
        pending = {}
        for (_, catname), obj in objects.items():
            if obj.isTopLevel():
                # Top level object: make sure it has a graph entry.
                graph.setdefault(obj, [])
                continue

            # Sub-object: find its parent among the known objects first,
            # then among the parents already created in this pass.
            parkey = (obj.paroid, catname)
            parobj = objects.get(parkey)
            if parobj is None:
                parobj = pending.get(parkey)
                if parobj is None:
                    parobj = RelationObject(obj.paroid, catname)
                    pending[parkey] = parobj
            assert (parobj is not None)

            # Link child to parent, without duplicating the child.
            children = graph.setdefault(parobj, [])
            if obj not in children:
                children.append(obj)

        buildGraphRecursive(pending, graph)

    buildGraphRecursive(GPObjects, GPObjectGraph)
# -------------------------------------------------------------------------------
def checkcatReport():
    """Print the summary report for the database that was just checked.

    Builds the object graph, prints the PASSED/FAILED banner and timing,
    then reports every top-level object (and, recursively, its children),
    the tables with missing/extra entries, and finally the failed checks
    that this report does not cover.
    """

    def reportAllIssuesRecursive(par, graph):
        # Report the object itself, then everything hanging below it.
        par.reportAllIssues()
        for child in graph.get(par, []):
            reportAllIssuesRecursive(child, graph)

    def reportAttrTables(heading, tables):
        # Print one "database.table.segment" line per affected segment.
        if len(tables) == 0:
            return
        myprint('----------------------------------------------------')
        myprint(heading)
        myprint(" Format [database name].[schema name].[table name].[segment id]:")
        for table, segids in sorted(tables):
            # The first and last characters are dropped before splitting on
            # commas -- presumably segids is a brace-wrapped list such as
            # '{0,1}'; confirm against the code that fills these lists.
            for segid in segids[1:-1].split(','):
                myprint(" Table %s.%s.%s" % (GV.dbname, table, segid))
        myprint('')

    buildGraph()
    reportedCheck = ['duplicate','missing_extraneous','inconsistent','foreign_key','acl', 'orphaned_toast_tables']

    myprint('')
    myprint('SUMMARY REPORT: %s' % ('FAILED' if len(GV.failedChecks) else 'PASSED'))
    myprint('===================================================================')
    elapsed = datetime.timedelta(seconds=int(GV.elapsedTime))
    endTime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    myprint('Completed %d test(s) on database \'%s\' at %s with elapsed time %s' \
            % (GV.totalCheckRun, GV.dbname, endTime, elapsed))

    if len(GPObjectGraph) == 0 and len(GV.failedChecks) == 0:
        myprint('Found no catalog issue\n')
        assert(GV.retcode == SUCCESS)
        return

    if len(GPObjectGraph) > 0:
        # pg_constraint entries are not counted as issues; they only
        # produce the "Warnings were generated" notice below.
        total = sum(1 for par in GPObjectGraph
                    if par.isTopLevel() and par.catname not in ['pg_constraint'])
        if total == 0:
            myprint('Found no catalog issue\n')
            # Look at every graph entry rather than whichever object a
            # previous loop happened to visit last.
            if any(par.catname in ['pg_constraint'] for par in GPObjectGraph):
                myprint('Warnings were generated, see %s for detail\n' % gplog.get_logfile())
        else:
            myprint('Found a total of %d issue(s)' % total)

        for par in GPObjectGraph:
            if par.isTopLevel():
                reportAllIssuesRecursive(par, GPObjectGraph)
        myprint('')

    # Report tables with missing attributes in a more usable format
    if len(GV.missing_attr_tables) or len(GV.extra_attr_tables):
        # Expand partition tables
        # NOTE(review): the connection is not used below; the call is kept
        # in case connect2() has side effects callers rely on -- confirm.
        db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False)

        # De-duplicate in place so the global lists stay consistent with
        # what was printed.
        GV.missing_attr_tables = list(set(GV.missing_attr_tables))
        reportAttrTables(" Tables with missing issues:", GV.missing_attr_tables)

        GV.extra_attr_tables = list(set(GV.extra_attr_tables))
        reportAttrTables(" Tables with extra issues:", GV.extra_attr_tables)

    myprint('')
    notReported = set(GV.failedChecks).difference(reportedCheck)
    if len(notReported) > 0:
        # Sorted so the message does not depend on set iteration order.
        myprint('Failed test(s) that are not reported here: %s' % (', '.join(sorted(notReported))))
        myprint('See %s for detail\n' % gplog.get_logfile())
def _myprint(str, level=logging.CRITICAL):
    """Emit ``str`` through gplog.log_literal on the tool logger.

    ``level`` is the logging level passed along; it defaults to
    logging.CRITICAL.
    """
    message = str
    gplog.log_literal(logger, level, message)


# Module-wide alias used by the rest of the tool for printing.
myprint = _myprint
def generateVerifyFile(catname, fields, results, checkname):
    """Write a verification SQL file for the given catalog check.

    The 'oid' column of every row in ``results`` (located through
    ``fields``) is collected into a comma separated IN list.  The query
    written out selects relname/oid from pg_class, locally and through
    gp_dist_random, for rows whose reltype is in that list.

    The file is created in the current directory and named from the
    database name, ``catname``, ``checkname`` and the run timestamp.  A
    failure to write it is only logged as a warning.
    """
    in_clause = ''
    for row in results:
        if in_clause:
            in_clause += ','
        in_clause += str(row[fields.index('oid')])

    verify_sql = '''
SELECT *
FROM (
SELECT relname, oid FROM pg_class WHERE reltype IN ({in_clause})
UNION ALL
SELECT relname, oid FROM gp_dist_random('pg_class') WHERE reltype IN ({in_clause})
) alltyprelids
GROUP BY relname, oid ORDER BY count(*) desc
'''.format(in_clause=in_clause)

    name_parts = (GV.dbname, catname, checkname, GV.timestamp)
    filename = 'gpcheckcat.verify.%s.%s.%s.%s.sql' % name_parts
    try:
        with open(filename, 'w') as fp:
            fp.write(verify_sql + '\n')
    except Exception as e:
        logger.warning('Unable to generate verify file for %s (%s)' % (catname, str(e)))
def truncate_batch_size(primaries):
    """Cap the '-B' batch size option at ``primaries``.

    When the configured value is larger, it is lowered to ``primaries``
    and a message announcing the truncation is printed.
    """
    too_large = GV.opt['-B'] > primaries
    if not too_large:
        return
    GV.opt['-B'] = primaries
    myprint("Truncated batch size to number of primaries: %d" % primaries)
def check_gpexpand():
    """Exit with status 1 when conflict_with_gpexpand reports a conflict.

    The helper is asked about "gpcheckcat" with refuse_phase1=True and
    refuse_phase2=False.  If the result flag it returns is falsy, the
    accompanying message is printed before exiting.
    """
    ok, message = conflict_with_gpexpand("gpcheckcat",
                                         refuse_phase1=True,
                                         refuse_phase2=False)
    if ok:
        return
    myprint(message)
    sys.exit(1)
def check_test_subset_parameter_count():
    """Abort when more than one of the -R, -s and -C options is set.

    On violation an error is printed, ERROR_NOREPAIR is recorded through
    setError() and the process exits with GV.retcode.
    """
    selected = [flag for flag in ('-R', '-s', '-C') if GV.opt[flag]]
    if len(selected) > 1:
        myprint("Error: multiple test subset options are selected. Please pass only one of [-R, -s, -C] if necessary.\n")
        setError(ERROR_NOREPAIR)
        sys.exit(GV.retcode)
    return
def check_test_names_parsed(tests):
    """Return the entries of ``tests`` that are known checks, in order.

    A name that is not present in all_checks is reported with a message
    and left out of the result.
    """
    valid = []
    for name in tests:
        if name in all_checks:
            valid.append(name)
            continue
        myprint("'%s' is not a valid test" % name)
    return valid
def main():
    """Entry point: parse options, then check every selected database.

    After the one-off setup (version, option validation, gpexpand check,
    cluster configuration, catalog), each database in GV.alldb is checked
    in turn: either the single catalog table given with -C, or the set of
    tests selected by -R / -s (all tests by default).  A report is printed
    per database and the process exits with the accumulated return code.
    """
    parseCommandLine()
    if GV.opt['-l']:
        listAllChecks()
        sys.exit(GV.retcode)

    GV.version = getversion()
    # No minimum-version gate: the current implementation always supports
    # gpcheckcat.

    check_test_subset_parameter_count()

    # gpcheckcat should check gpexpand running status
    check_gpexpand()

    GV.cfg = getGPConfiguration()
    truncate_batch_size(len(GV.cfg.keys()))
    GV.report_cfg = getReportConfiguration()
    GV.max_content = max(GV.cfg[dbid]['content'] for dbid in GV.cfg)

    # The entry with content == -1 is taken as the coordinator.
    for dbid in GV.cfg:
        if GV.cfg[dbid]['content'] == -1:
            GV.coordinator_dbid = dbid
            break
    if GV.coordinator_dbid is None:
        myprint("Error: coordinator configuration info not found in gp_segment_configuration\n")
        setError(ERROR_NOREPAIR)
        sys.exit(GV.retcode)

    GV.catalog = getCatalog()
    leaked_schema_dropper = LeakedSchemaDropper()

    for dbname in GV.alldb:
        # Reset global variables
        GV.retcode = SUCCESS
        GV.reset_stmt_queues()
        GPObjects.clear()
        GPObjectGraph.clear()
        GV.dbname = dbname

        myprint('')
        myprint("Connected as user \'%s\' to database '%s', port '%d', gpdb version '%s'" \
                % (GV.opt['-U'], GV.dbname, GV.report_cfg[-1]['port'], GV.version))
        myprint('-------------------------------------------------------------------')
        myprint('Batch size: %s' % GV.opt['-B'])

        drop_leaked_schemas(leaked_schema_dropper, dbname)

        if GV.opt['-C']:
            namestr = GV.opt['-C']
            try:
                catalog_table_obj = getCatObj(namestr)
                runCheckCatname(catalog_table_obj)
            except Exception as e:
                # Say why we are bailing out instead of exiting silently.
                myprint(' Execution error: ' + str(e))
                setError(ERROR_NOREPAIR)
                sys.exit(GV.retcode)
        else:
            if GV.opt['-R']:
                run_names = GV.opt['-R']
                run_names_array = [x.strip() for x in run_names.split(",")]
                run_tests = check_test_names_parsed(run_names_array)
            elif GV.opt['-s']:
                skip_names = GV.opt['-s']
                skip_names_array = [x.strip() for x in skip_names.split(",")]
                skip_tests = check_test_names_parsed(skip_names_array)
                run_tests = [x for x in all_checks if x not in skip_tests]
            else:
                run_tests = all_checks.keys()
            runAllChecks(run_tests)

        checkcatReport()
        setScriptRetCode(GV.retcode)

        # skip shared tables on subsequent passes
        if not GV.opt['-S']:
            GV.opt['-S'] = "none"

    sys.exit(GV.script_retcode)
#############
# Run the tool only when this file is executed as a script.
if __name__ == '__main__':
    main()