blob: a7c2f165a83b3a04f23e02833fd7b9c05918f3f3 [file] [log] [blame]
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
'''
Usage: gpcheckcat [<option>] [dbname]
-?
-B parallel: number of worker threads
-g dir : generate SQL to rectify catalog corruption, put it in dir
-p port : DB port number
-P passwd : DB password
-U uname : DB User Name
-v : verbose
-A : all databases
-S option : shared table options (none, only)
-O : Online
-l : list all tests
-R test : run this particular test
-C catname : run cross consistency, FK and ACL tests for this catalog table
'''
import sys, os, subprocess, tempfile, stat, re
from threading import Thread
from datetime import datetime
from time import localtime, strftime
import time
import getopt
try:
from gppylib.db import dbconn
from gppylib.gplog import *
from gppylib.gpcatalog import *
from gppylib.commands.unix import *
from pygresql.pgdb import DatabaseError
from pygresql import pg
except ImportError, e:
sys.exit('Error: unable to import module: ' + str(e))
parallelism = 8
# cache OID -> object name cache
oidmap = {}
#-------------------------------------------------------------------------------
EXECNAME = os.path.split(__file__)[-1]
setup_tool_logging(EXECNAME,getLocalHostname(),getUserName())
very_quiet_stdout_logging()
logger = get_default_logger()
#-------------------------------------------------------------------------------
################
def parse_int(val):
try: val = int(val)
except ValueError: val = 0
return val
###############################
def quote_value(name):
"""Add surrounding single quote, double interior single quote."""
assert isinstance(name,str)
return "'" + "''".join(name.split("'")) + "'"
SUCCESS=0
ERROR_REMOVE=1
ERROR_RESYNC=2
ERROR_NOREPAIR=3
def setError(level):
'''
Increases the error level to the specified level, if specified level is
lower than the existing level no change is made.
error level 0 => success
error level 1 => error, with repair script removes objects
error level 2 => error, with repair script that resynchronizes objects
error level 3 => error, no repair script
'''
GV.retcode = max(level, GV.retcode)
###############################
class Global():
def __init__(self):
self.retcode = SUCCESS
self.opt = {}
self.opt['-h'] = None
self.opt['-p'] = None
self.opt['-P'] = None
self.opt['-U'] = None
self.opt['-v'] = False
self.opt['-V'] = False
self.opt['-t'] = False
self.opt['-g'] = 'gpcheckcat.repair.' + strftime('%Y-%m-%d.%H.%M.%S', localtime())
self.opt['-B'] = parallelism
self.opt['-T'] = None
self.opt['-A'] = False
self.opt['-S'] = None
self.opt['-O'] = False
self.opt['-R'] = None
self.opt['-C'] = None
self.opt['-l'] = False
self.cfg = None
self.dbname = None
self.firstdb = None
self.alldb = []
self.db = {}
self.tmpdir = None
self.reset_stmt_queues()
self.home = os.environ.get('HOME')
if self.home == None:
usage('Error: $HOME must be set')
self.user = os.environ.get('USER') or os.environ.get('LOGNAME')
if self.user == None:
usage('Error: either $USER or $LOGNAME must be set')
self.catalog = None
self.max_content = 0
self.report_cfg = {}
def reset_stmt_queues(self):
# dictionary of SQL statements. Key is dbid
self.Remove = {}
# dictionary of SQL statements. Key is dbid
self.AdjustConname = {}
# dictionary of SQL statements. Key is dbid
self.DemoteConstraint = {}
# array of SQL statements. Key is dbid
self.ReSync = {}
# Indexes to rebuild
self.Reindex = []
# constraints which are actually unenforcable
self.Constraints = []
# ownership fixups
self.Owners = []
# drop leaked temporary schemas
self.Schemas = []
# fix distribution policies
self.Policies = []
self.missingEntryStatus = None
self.inconsistentEntryStatus = None
self.foreignKeyStatus = None
self.aclStatus = None
# the following variables used for reporting purposes
self.elapsedTime = 0
self.totalTestRun = 0
self.testStatus = True
self.failedTest = []
GV = Global()
###############################
def usage(exitarg = None):
print __doc__
sys.exit(exitarg)
###############################
def getversion():
versions = ["3.2", "3.3", "4.0", "4.1", "main"]
db = connect()
curs = db.query('''
select regexp_replace(version(),
E'.*PostgreSQL [^ ]+ .Greenplum Database ([1-9]+.[0-9]+|main).*',
E'\\\\1') as ver;''')
row = curs.getresult()[0]
version = row[0]
logger.debug('got version %s' % version)
return version
###############################
def getalldbs():
"""
get all connectable databases
"""
db = connect()
curs = db.query('''
select datname from pg_database where datallowconn order by datname ''')
row = curs.getresult()
return row
###############################
def parseCommandLine():
try:
(options, args) = getopt.getopt(sys.argv[1:], '?p:P:U:B:vg:t:AOS:R:C:l')
except Exception, e:
usage('Error: ' + str(e))
for (switch, val) in options:
if switch == '-?': usage(0)
elif switch[1] in 'pBPUgSRC': GV.opt[switch] = val
elif switch[1] in 'vtAOl': GV.opt[switch] = True
def setdef(x, v):
if not GV.opt[x]:
t = os.environ.get(v)
GV.opt[x] = t
if t:
logger.debug('%s not specified; default to %s (from %s)' %
(x, t, v))
if GV.opt['-l']: return
if GV.opt['-v']:
enable_verbose_logging()
setdef('-P', 'PGPASSWORD')
setdef('-U', 'PGUSER')
setdef('-U', 'USER')
setdef('-p', 'PGPORT')
if not GV.opt['-p']:
usage('Please specify -p port')
GV.opt['-p'] = parse_int(GV.opt['-p'])
GV.opt['-h'] = 'localhost'
logger.debug('Set default hostname to %s' % GV.opt['-h'])
if GV.opt['-R']:
logger.debug('Run this test: %s' % GV.opt['-R'])
if GV.opt['-C']:
GV.opt['-C'] = GV.opt['-C'].lower()
logger.debug('Run cross consistency test for table: %s' % GV.opt['-C'])
if (len(args) != 0 and len(args) != 1):
usage('Too many arguments')
if len(args) == 0:
GV.dbname = os.environ.get('PGDATABASE')
if GV.dbname is None:
GV.dbname = 'template1'
logger.debug('database is %s' % GV.dbname)
else:
GV.dbname = args[0]
GV.firstdb = GV.dbname
GV.alldb.append(GV.firstdb)
# build list of all connectable databases
if GV.opt['-A']:
alldb = getalldbs()
GV.alldb.pop()
for adbname in alldb:
GV.alldb.append(adbname[0])
if GV.opt['-S']:
if not re.match("none|only", GV.opt['-S'], re.I):
usage('Error: invalid value \'%s\' for shared table option. Legal values are (none, only)'% GV.opt['-S']);
GV.opt['-S'] = GV.opt['-S'].lower()
logger.debug('master at host %s, port %s, user %s, database %s' %
(GV.opt['-h'], GV.opt['-p'], GV.opt['-U'], GV.dbname))
try:
GV.opt['-B'] = int(GV.opt['-B'])
except Exception, e:
usage('Error: ' + str(e))
if GV.opt['-B'] < 1:
usage('Error: parallelism must be 1 or greater')
logger.debug('degree of parallelism: %s' % GV.opt['-B'])
#############
def connect(user=None, password=None, host=None, port=None,
database=None, utilityMode=False):
'''Connect to DB using parameters in GV'''
options = utilityMode and '-c gp_session_role=utility' or None
if not user: user = GV.opt['-U']
if not password: password = GV.opt['-P']
if not host: host = GV.opt['-h']
if not port: port = GV.opt['-p']
if not database: database = GV.dbname
try:
logger.debug('connecting to %s:%s %s' % (host, port, database))
db = pg.connect(host=host, port=port, user=user,
passwd=password, dbname=database, opt=options)
except pg.InternalError, ex:
logger.fatal('could not connect to %s: "%s"' %
(database, str(ex).strip()))
exit(1)
logger.debug('connected with %s:%s %s' % (host, port, database))
return db
#############
def connect2(cfgrec, user=None, password=None, database=None, utilityMode=True):
host = cfgrec['address']
port = cfgrec['port']
datadir = cfgrec['datadir']
logger.debug('connect %s:%s:%s' % (host, port, datadir))
if not database: database = GV.dbname
if cfgrec['content'] == -1:
utilityMode = False
key = "%s.%s.%s.%s.%s.%s.%s" % (host, port, datadir, user, password, database,
str(utilityMode))
conns = GV.db.get(key)
if conns:
return conns[0]
conn = connect(host=host, port=port, user=user, password=password,
database=database, utilityMode=utilityMode)
if conn:
GV.db[key] = [conn, cfgrec]
return conn
class execThread(Thread):
def __init__(self, cfg, db, qry):
self.cfg = cfg
self.db = db
self.qry = qry
self.curs = None
self.error = None
Thread.__init__(self)
def run(self):
try:
self.curs = self.db.query(self.qry)
except BaseException, e:
self.error = e
def processThread(threads):
batch = []
for th in threads:
logger.debug('waiting on thread %s' % th.getName())
th.join()
if th.error:
setError(ERROR_NOREPAIR)
myprint("%s:%d:%s : %s" %
(th.cfg['hostname'],
th.cfg['port'],
th.cfg['datadir'],
str(th.error)))
else:
batch.append([th.cfg, th.curs])
return batch
#############
def connect2run(qry, col=None):
logger.debug('%s' % qry)
batch = []
threads = []
i = 1
# parallelise queries
for dbid in GV.cfg:
c = GV.cfg[dbid]
db = connect2(c)
thread = execThread(c, db, qry)
thread.start()
logger.debug('launching query thread %s for dbid %i' %
(thread.getName(), dbid))
threads.append(thread)
# we don't want too much going on at once
if (i % GV.opt['-B']) == 0:
# process this batch of threads
batch.extend(processThread(threads))
threads = []
i += 1
# process the rest of threads
batch.extend(processThread(threads))
err = []
for [cfg, curs] in batch:
if col == None:
col = curs.listfields()
for row in curs.dictresult():
err.append([cfg, col, row])
return err
def formatErr(c, col, row):
s = ('%s:%s:%s, content %s, dbid %s' %
(c['hostname'], c['port'], c['datadir'],
c['content'], c['dbid']))
for i in col:
s = '%s, %s %s' % (s, i, row[i])
return s
#############
def getGPConfiguration():
cfg = {}
db = connect()
# note that in 4.0, sql commands cannot be run against the segment mirrors directly
# so we filter out non-primary segment databases in the query
qry = '''
SELECT content, preferred_role = 'p' as definedprimary,
dbid, role = 'p' as isprimary, hostname, address, port,
fselocation as datadir
FROM gp_segment_configuration JOIN pg_filespace_entry on (dbid = fsedbid)
WHERE fsefsoid = (select oid from pg_filespace where fsname='pg_system')
AND (role = 'p' or content < 0 )
'''
curs = db.query(qry)
for row in curs.dictresult():
if row['content'] == -1 and row['isprimary'] != 't':
continue # skip standby master
cfg[row['dbid']] = row
db.close()
return cfg
def checkDistribPolicy():
logger.info('-----------------------------------')
logger.info('Checking constraints on randomly distributed tables')
qry = '''
select n.nspname, rel.relname, pk.conname as constraint
from pg_constraint pk
join pg_class rel on (pk.conrelid = rel.oid)
join pg_namespace n on (rel.relnamespace = n.oid)
join gp_distribution_policy d on (rel.oid = d.localoid)
where pk.contype in('p', 'u') and d.attrnums is null
'''
db = connect2(GV.cfg[1])
try:
curs = db.query(qry)
err = []
for row in curs.dictresult():
err.append([GV.cfg[1], ('nspname', 'relname', 'constraint'), row])
if not err:
logger.info('[OK] randomly distributed tables')
else:
GV.testStatus = False
setError(ERROR_REMOVE)
logger.info('[FAIL] randomly distributed tables')
logger.error('pg_constraint has %d issue(s)' % len(err))
logger.error(qry)
for e in err:
logger.error(formatErr(e[0], e[1], e[2]))
for e in err:
cons = e[2]
removeIndexConstraint(cons['nspname'], cons['relname'],
cons['constraint'])
except Exception, e:
setError(ERROR_NOREPAIR)
myprint('[ERROR] executing test: checkDistribPolicy')
myprint(' Execution error: ' + str(e))
logger.info('-----------------------------------')
logger.info('Checking that unique constraints are only on distribution columns')
# final part of the WHERE clause here is a little tricky: we want to make
# sure that the set of distribution columns is a left subset of the
# constraint.
qry = '''
select n.nspname, rel.relname, pk.conname as constraint
from pg_constraint pk
join pg_class rel on (pk.conrelid = rel.oid)
join pg_namespace n on (rel.relnamespace = n.oid)
join gp_distribution_policy d on (rel.oid = d.localoid)
where pk.contype in ('p', 'u') and
(d.attrnums is null or not
d.attrnums operator(pg_catalog.<@) pk.conkey)
'''
try:
curs = db.query(qry)
err = []
for row in curs.dictresult():
err.append([GV.cfg[1], ('nspname', 'relname', 'constraint'), row])
if not err:
logger.info('[OK] unique constraints')
else:
GV.testStatus = False
setError(ERROR_REMOVE)
logger.info('[FAIL] unique constraints')
logger.error('pg_constraint has %d issue(s)' % len(err))
logger.error(qry)
for e in err: logger.error(formatErr(e[0], e[1], e[2]))
for e in err:
cons = e[2]
removeIndexConstraint(cons['nspname'], cons['relname'],
cons['constraint'])
except Exception, e:
setError(ERROR_NOREPAIR)
myprint('[ERROR] executing test: checkDistribPolicy')
myprint(' Execution error: ' + str(e))
#############
def checkPartitionIntegrity():
#
# MPP-8558: look for data in branch (not leaf) partitions
#
logger.info('-----------------------------------')
logger.info('Checking pg_partition ...')
err = []
db = connect()
qry = '''
select distinct
quote_ident(n.nspname) || '.' || quote_ident(c.relname) as parname
from pg_partition_rule r1
join pg_partition_rule r2 on (r1.oid = r2.parparentrule)
join pg_class c on (r1.parchildrelid = c.oid)
join pg_namespace n on (c.relnamespace = n.oid)
where r1.parchildrelid <> 0 and r2.parchildrelid <> 0
'''
try:
curs = db.query(qry)
for row in curs.dictresult():
qy2 = ('select count(*) as cc from (select 1 from only %s limit(2)) q'
% (row['parname']))
# logger.info(qy2)
curs2 = db.query(qy2)
for row2 in curs2.dictresult():
if row2['cc'] == 0 :
continue
err.append([row['parname']])
if not err:
logger.info('[OK] pg_partition branch integrity')
else:
GV.testStatus = False
setError(ERROR_NOREPAIR)
logger.info('[FAIL] pg_partition branch integrity')
logger.error('pg_partition has %d issue(s)' % len(err))
logger.error('Discovered row data in branch partitions')
for e in err:
logger.error(' partition table name: %s' % (e[0]))
except Exception, e:
setError(ERROR_NOREPAIR)
myprint('[ERROR] executing test: checkPartitionIntegrity')
myprint(' Execution error: ' + str(e))
# MPP-9283: look for partitions create "with oids".
qry = '''
select n1.nspname, pc.relname, pc.oid
from pg_class pc, pg_partition_rule pr, pg_namespace n1
where pr.parchildrelid = pc.oid and pc.relnamespace = n1.oid and pc.relhasoids
union
select n1.nspname, pc.relname, pc.oid
from pg_class pc, pg_partition pp, pg_namespace n1
where pp.parrelid = pc.oid and pc.relnamespace = n1.oid and pc.relhasoids
'''
try:
curs = db.query(qry)
err = []
for row in curs.dictresult():
err.append([GV.cfg[1], ('nspname', 'relname', 'oid'), row])
if not err:
logger.info('[OK] partition with oids check')
else:
GV.testStatus = False
setError(ERROR_NOREPAIR)
logger.info('[FAIL] partition with oids check')
logger.error('partition with oids check found %d issue(s)' % len(err))
logger.error(qry)
for e in err[0:100]:
row = e[2]
logger.error(" table %s.%s oid %s" %
(row['nspname'], row['relname'], row['oid']))
if len(err) > 100:
logger.error("...")
except Exception, e:
setError(ERROR_NOREPAIR)
myprint('[ERROR] executing test: checkPartitionIntegrity')
myprint(' Execution error: ' + str(e))
# TODO: add repair script
# MPP-11120: check for child partitions with different
# distribution policies The complexity of this query is due to
# having to account for the possibility of partitions with "holes"
# which would make similar distribution policies appear different
# in gp_distribution_policy. This is handled by unnesting both
# distribution
qry = '''
select
parrelid,
parchildrelid,
d1.attrnums as parpolicy,
d2.attrnums as parchildpolicy
from
(
select distinct
coalesce(p1.parrelid, p2.parrelid)::regclass as parrelid,
coalesce(p1.parchildrelid, p2.parchildrelid)::regclass as parchildrelid
from
(
select parrelid, parchildrelid, g1.attname, g1.index
from pg_partition_rule pr
join pg_partition p on (pr.paroid = p.oid)
join (
select localoid, attname, index
from (
select
localoid,
unnest(attrnums) as offset,
generate_series(1, array_upper(attrnums, 1)) as index
from
gp_distribution_policy
) d
join pg_attribute a on (a.attrelid = d.localoid and a.attnum = d.offset)
) g1 on (parrelid = g1.localoid)
where parchildrelid != 0
) p1
full outer join
(
select parrelid, parchildrelid, g2.attname, g2.index
from pg_partition_rule pr
join pg_partition p on (pr.paroid = p.oid)
join (
select localoid, attname, index
from (
select
localoid,
unnest(attrnums) as offset,
generate_series(1, array_upper(attrnums, 1)) as index
from
gp_distribution_policy
) d
join pg_attribute a on (a.attrelid = d.localoid and a.attnum = d.offset)
) g2 on (parchildrelid = g2.localoid)
where parchildrelid != 0
) p2
on (p1.parrelid = p2.parrelid and
p1.parchildrelid = p2.parchildrelid and
p1.index = p2.index)
where p1.attname is distinct from p2.attname
) p
join gp_distribution_policy d1 on (d1.localoid = p.parrelid)
join gp_distribution_policy d2 on (d2.localoid = p.parchildrelid)
where d2.attrnums is not null
'''
try:
curs = db.query(qry)
cols = ('parrelid', 'parchildrelid', 'parpolicy', 'parchildpolicy')
err = []
for row in curs.dictresult():
err.append([GV.cfg[1], cols, row])
if not err:
logger.info('[OK] partition distribution policy check')
else:
GV.testStatus = False
setError(ERROR_REMOVE)
logger.info('[FAIL] partition distribution policy check')
logger.error('partition distribution policy check found %d issue(s)' % len(err))
if len(err) > 100:
logger.error(qry)
count = 0
for e in err:
cfg = e[0]
col = e[1]
row = e[2]
# generate repair script for this row
distributeRandomly(row['parchildrelid'])
# report at most 100 rows, for brevity
if count == 100:
logger.error("...")
count += 1
if count > 100:
continue
if count == 0:
logger.error("--------")
logger.error(" " + " | ".join(map(str, col)))
logger.error(" " + "-+-".join(['-'*len(x) for x in col]))
logger.error(" " + " | ".join([str(row[x]) for x in col]))
count += 1
except Exception, e:
setError(ERROR_NOREPAIR)
myprint('[ERROR] executing test: checkPartitionIntegrity')
myprint(' Execution error: ' + str(e))
db.close()
#############
# A dictionary of SQL statements for checkPartitionRegularity()
partitionRegularityChecks = {
# Query: irreg_user_constraint
#
# A row represents an irregular user-defined constraint.
#
# This view just filters ptable_user_con_info so that it returns no rows
# if no irregular user-defined constraints exist.
#
# An irregular user-constraint is a constraint whose definition (DDL expression
# format) appears on the root table of a partitioned table, and doesn't appear
# on every part table of the partitioned table. It is possible to construct
# these using sequences of DDL operations on release prior to Rio. In Rio,
# it is possible to construct them with FOREIGN KEY constraints, but not with
# other constraint types.
#
# tableid: pg_class.oid of the partitioned table
# conname: name of the constraint on the partitioned table
# contype: type of constraint (same on part and table)
# condef: specification of constraint (same on part and table)
# numexpected: number of constraint occurrences expected (incl table)
# numactual: number of constraint occurrences observed (incl table)
'irreg_user_constraint' :
"""select
tableid,
tableconname,
contype,
condef,
numexpected,
numactual
from
(
select
c.tableid,
c.tableconname,
c.contype,
c.condef,
i.tableparts + 1,
count(c.partid),
count(distinct c.partconname)
from
(
select
u.tableid,
u.conname,
u.contype,
u.condef,
p.partid,
p.conid,
coalesce(p.conname)
from
(
select
p.tableid,
c.conname,
c.contype,
pg_get_constraintdef(c.oid) as condef
from
(
select
parrelid::regclass,
max(parlevel)+1
from
pg_partition
group by parrelid
) as p(tableid, tabledepth) ,
pg_constraint c
where
p.tableid = c.conrelid
) as u(tableid, conname, contype, condef)
join
(
select
x.tableid::regclass as tableid,
c.conrelid::regclass as partid,
c.oid as conid,
c.conname,
c.contype,
pg_get_constraintdef(c.oid) as condef
from
pg_constraint c,
(
select
tableid,
tabledepth,
tableid::regclass partid,
0 as partdepth,
0 as partordinal,
'r'::char as partstatus
from
(
select
parrelid::regclass,
max(parlevel)+1
from
pg_partition
group by parrelid
) as ptable(tableid, tabledepth)
union all
select
parrelid::regclass as tableid,
t.tabledepth as tabledepth,
r.parchildrelid::regclass partid,
p.parlevel + 1 as partdepth,
r.parruleord as partordinal,
case
when t.tabledepth = p.parlevel + 1 then 'l'::char
else 'i'::char
end as partstatus
from
pg_partition p,
pg_partition_rule r,
(
select
parrelid::regclass,
max(parlevel)+1
from
pg_partition
group by parrelid
) as t(tableid, tabledepth)
where
p.oid = r.paroid
and not p.paristemplate
and p.parrelid = t.tableid
) as x(tableid, tabledepth, partid, partdepth, partordinal, partstatus)
where
x.partid = c.conrelid
) as p(tableid, partid, conid, conname, contype, condef)
on (
u.tableid = p.tableid and
u.contype = p.contype and
u.condef = p.condef
)
) as c(tableid, tableconname, contype, condef, partid, partconid, partconname) ,
(
select
t.tableid,
t.tabledepth,
n.nparts as tableparts,
r.nspname,
r.relname
from
(
select
parrelid::regclass,
max(parlevel)+1
from
pg_partition
group by parrelid
) as t(tableid, tabledepth) ,
(
select
c.oid::regclass as relid,
n.nspname,
c.relname,
c.relkind
from
pg_class c,
pg_namespace n
where
c.relnamespace = n.oid
) as r(relid, nspname, relname, relkind) ,
(
select tableid, count(*)
from (
select
t.tableid::regclass,
p.parchildrelid::regclass as partid
from
(
select pg_partition.parrelid, pg_partition.oid
from pg_partition
where not pg_partition.paristemplate
) t(tableid, partid),
pg_partition_rule p
where p.paroid = t.partid
) as contains_part(tableid, partid)
group by tableid
) n(tableid, nparts)
where
t.tableid = r.relid and
t.tableid = n.tableid
) as i(tableid, tabledepth, tableparts, nspname, relname)
where
c.tableid = i.tableid
group by
c.tableid,
c.tableconname,
c.contype,
c.condef,
i.tableparts
) as ptable_user_con_info(tableid, tableconname, contype, condef, numexpected, numactual, numnames)
where
contype != 'f' and -- no FK consistency in Rio
numexpected != numactual""",
# Query: irreg_sys_constraint
#
# A row represents an irregular system-defined constraint.
#
# An irregular system constraint is a constraint whose definition (DDL
# expression format) appears on some part tables and that is not a system-
# defined partition constraint.
#
# This view doesn't actually test this. Instead, it checks for an
# expected pattern of system-defined constraints.
# 1. The constraints checked are those that don't appear on the root.
# 2. Each part has as many constraints as its depth in the partition
# less any default parts on its path.
# Violators are listed as results.
#
# tableid: pg_class.oid of the partitioned table
# partid: pg_class.oid of the (constrained) part table
# expected: number of constraint occurrences expected
# actual: number of constraint occurrences observed
'irreg_sys_constraint' :
"""select
coalesce(x.partid, a.partid) as partid,
x.tableid,
x.expected,
coalesce(a.actual, 0) as actual
from
(
{expected}
) x
full join
(
{actual}
) a
on (x.partid = a.partid)
where
x.expected != a.actual or
x.expected is null or
x.expected != coalesce(a.actual,0);""",
# Query based on view: unenforced_constraint_info
#
# a row represents an unenforced unique constraint to be fixed.
#
# tableid
# partid
# indexid
# partoid
# indexoid
# contype
# conname
# refobjid
'unenforced_constraint_info' :
"""select
c.tableid,
c.partid,
b.indexid,
c.partid::int as partoid,
b.indexid::int as indexoid,
b.contype,
b.conname,
b.refobjid
from
(
select
k.tableid,
k.distkey,
k.partkey,
c.conid,
c.conname,
c.contype,
i.indrelid::regclass,
i.indkey::smallint[]
from
pg_index i,
(
select
k.tableid,
d.attrcnt,
d.attrnums,
sum(k.partkeylen),
array_agg(k.partkey[g.g])
from
(
select
p.parrelid::regclass,
t.tabledepth,
p.parlevel,
p.parnatts,
p.paratts
from
pg_partition p,
(
select
parrelid::regclass,
max(parlevel)+1
from
pg_partition
group by parrelid
) as t(tableid, tabledepth)
where
p.parrelid = t.tableid and
p.paristemplate is false
) as k(tableid, tabledepth, partdepth, partkeylen, partkey) ,
generate_series(0, (select max(partkeylen) from (
select
p.parrelid::regclass,
t.tabledepth,
p.parlevel,
p.parnatts,
p.paratts
from
pg_partition p,
(
select
parrelid::regclass,
max(parlevel)+1
from
pg_partition
group by parrelid
) as t(tableid, tabledepth)
where
p.parrelid = t.tableid and
p.paristemplate is false
) as partition_level(tableid, tabledepth, partdepth, partkeylen, partkey) )-1) g(g),
(
select
localoid::regclass,
attrnums,
coalesce(1+array_upper(attrnums,1)-array_lower(attrnums,1),0)
from
gp_distribution_policy
) as d(relid, attrnums, attrcnt)
where
g.g < k.partkeylen and
k.tableid = d.relid
group by
k.tableid,
d.attrcnt,
d.attrnums
) as k(tableid, distkeylen, distkey, partkeylen, partkey) ,
(
select
relid,
conid,
conname,
contype,
indexid
from
(
select
r.oid::regclass as relid,
c.oid as conid,
c.conname,
c.contype,
c.consrc,
pg_get_constraintdef(c.oid) as condef,
d.objid::regclass as indexid
from
(
pg_class r
join
pg_constraint c
on
r.oid = c.conrelid
and r.relkind = 'r'
)
left join
pg_depend d
on
d.refobjid = c.oid
and d.classid = 'pg_class'::regclass
and d.refclassid = 'pg_constraint'::regclass
and d.deptype = 'i'
) as relconstraint(relid, conid, conname, contype, consrc, condef, indexid)
where
indexid is not null
or contype in ('p', 'u')
) as c(relid, conid, conname, contype, indexid)
where
c.relid = k.tableid and
i.indrelid = k.tableid and
i.indexrelid = c.indexid and
not i.indkey::smallint[] @> array_cat(k.distkey, k.partkey)
) as x(tableid, distkey, partkey, conid, conname, contype, indrelid, indkey) ,
(
select
tableid,
tabledepth,
tableid::regclass partid,
0 as partdepth,
0 as partordinal,
'r'::char as partstatus
from
(
select
parrelid::regclass,
max(parlevel)+1
from
pg_partition
group by parrelid
) as ptable(tableid, tabledepth)
union all
select
parrelid::regclass as tableid,
t.tabledepth as tabledepth,
r.parchildrelid::regclass partid,
p.parlevel + 1 as partdepth,
r.parruleord as partordinal,
case
when t.tabledepth = p.parlevel + 1 then 'l'::char
else 'i'::char
end as partstatus
from
pg_partition p,
pg_partition_rule r,
(
select
parrelid::regclass,
max(parlevel)+1
from
pg_partition
group by parrelid
) as t(tableid, tabledepth)
where
p.oid = r.paroid
and not p.paristemplate
and p.parrelid = t.tableid
) as c(tableid, tabledepth, partid, partdepth, partordinal, partstatus) ,
(
select
r.oid::regclass as relid,
i.oid::regclass as indexid,
c.oid as conid,
c.contype,
c.conname,
pg_get_constraintdef(c.oid) as condef,
d.classid,
d.objid,
d.objsubid,
d.refclassid,
d.refobjid,
d.refobjsubid,
d.deptype
from
pg_constraint c,
pg_class i,
pg_depend d,
pg_index x,
pg_class r
where
d.classid = 'pg_class'::regclass and
d.objid = i.oid and
d.objsubid = 0 and
d.refclassid = 'pg_constraint'::regclass and
d.refobjid = c.oid and
d.refobjsubid = 0 and
d.deptype = 'i' and
i.relkind = 'i' and
i.oid = x.indexrelid and
r.oid = x.indrelid
) as b(relid, indexid, conid, contype, conname, condef, classid, objid, objsubid, refclassid, refobjid, refobjsubid, deptype)
where
x.tableid = c.tableid and
c.partid = b.relid and
b.classid = 'pg_class'::regclass and
b.objsubid = 0 and
b.refclassid = 'pg_constraint'::regclass and
b.refobjsubid = 0 and
b.deptype = 'i'""",
# Repair sequence for issues identified by unenforced_constraint_info.
# Must substitute values from a row of that query, before running.
'unenforced_constraint_repairs' :
"""--Demote type-{contype} constraint {conname} on part {partid}
--of partitioned table {tableid} to simple unique index.
--Disconnect index {indexid} from constraint {conname}
delete
from
pg_depend
where
(classid, objid, objsubid, refclassid, refobjid, refobjsubid, deptype) =
(
select
d.classid,
d.objid,
d.objsubid,
d.refclassid,
d.refobjid,
d.refobjsubid,
d.deptype
from
pg_index i,
pg_depend d
where
i.indexrelid = {indexoid} and
d.classid = 'pg_class'::regclass and
d.objid = i.indexrelid and
d.objsubid = 0 and
d.refclassid = 'pg_constraint'::regclass and
d.refobjid = (
select oid
from pg_constraint
where
conrelid = {partoid} and
conname = {quoted_conname} and
contypid = 0 and
contype = {quoted_contype}
) and
d.refobjsubid = 0 and
d.deptype = 'i'
);
--Replace dependency of constraint {conname} on table columns of {partid}
--with dependency of index {indexoid} on same.
update
pg_depend
set
classid = 'pg_class'::regclass,
objid = {indexoid}
where
classid = 'pg_constraint'::regclass and
objid = (
select oid
from pg_constraint
where
conrelid = {partoid} and
conname = {quoted_conname} and
contypid = 0 and
contype = {quoted_contype}
) and
objsubid = 0 and
refclassid = 'pg_class'::regclass and
refobjid = {partoid} and
refobjsubid <> 0 and
deptype = 'a';
--Delete constraint {conname} leaving its index {indexid}
delete
from
pg_constraint
where
conrelid = {partoid} and
conname = {quoted_conname} and
contypid = 0 and
contype = {quoted_contype};""",
# Query: fix_ill_named_constraint
#
# a row represents the data for a fixup of an ill-named constraint
#
# tableid: pg_class.oid of the partitioned table as regclass
# tableconname: pg_constraint.conname on the partitioned table
# partid: pg_class.oid of the part table as regclass
# partrelid: pg_class.oid of the part table as int
# partconname: pg_constraint.conname on the part or partitioned table
'fix_ill_named_constraint' :
"""select
tableid,
tableconname,
partid,
partid::int as partrelid,
partconname
from (
select
u.tableid,
u.conname,
u.contype,
u.condef,
p.partid,
p.conid,
coalesce(p.conname)
from
(
select
p.tableid,
c.conname,
c.contype,
pg_get_constraintdef(c.oid) as condef
from
(
select
parrelid::regclass,
max(parlevel)+1
from
pg_partition
group by parrelid
) as p(tableid, tabledepth) ,
pg_constraint c
where
p.tableid = c.conrelid
) as u(tableid, conname, contype, condef)
join
(
select
x.tableid::regclass as tableid,
c.conrelid::regclass as partid,
c.oid as conid,
c.conname,
c.contype,
pg_get_constraintdef(c.oid) as condef
from
pg_constraint c,
(
select
tableid,
tabledepth,
tableid::regclass partid,
0 as partdepth,
0 as partordinal,
'r'::char as partstatus
from
(
select
parrelid::regclass,
max(parlevel)+1
from
pg_partition
group by parrelid
) as ptable(tableid, tabledepth)
union all
select
parrelid::regclass as tableid,
t.tabledepth as tabledepth,
r.parchildrelid::regclass partid,
p.parlevel + 1 as partdepth,
r.parruleord as partordinal,
case
when t.tabledepth = p.parlevel + 1 then 'l'::char
else 'i'::char
end as partstatus
from
pg_partition p,
pg_partition_rule r,
(
select
parrelid::regclass,
max(parlevel)+1
from
pg_partition
group by parrelid
) as t(tableid, tabledepth)
where
p.oid = r.paroid
and not p.paristemplate
and p.parrelid = t.tableid
) as x(tableid, tabledepth, partid, partdepth, partordinal, partstatus)
where
x.partid = c.conrelid
) as p(tableid, partid, conid, conname, contype, condef)
on (
u.tableid = p.tableid and
u.contype = p.contype and
u.condef = p.condef
)
) as part_user_constraint(tableid, tableconname, contype, condef, partid, partconid, partconname)
where partconname != tableconname""",
# Query based on view: part_sys_actual
# tableid
# partid
# actual observed number of system-constraints
"actual_part_con_query" :
"""select
tableid,
partid,
count(*) as actual
from (
select
tableid,
partid,
conid,
conname,
contype,
condef
from (
select
x.tableid::regclass as tableid,
c.conrelid::regclass as partid,
c.oid as conid,
c.conname,
c.contype,
pg_get_constraintdef(c.oid) as condef
from
pg_constraint c,
(
select
tableid,
tabledepth,
tableid::regclass partid,
0 as partdepth,
0 as partordinal,
'r'::char as partstatus
from
(
select
parrelid::regclass,
max(parlevel)+1
from
pg_partition
group by parrelid
) as ptable(tableid, tabledepth)
union all
select
parrelid::regclass as tableid,
t.tabledepth as tabledepth,
r.parchildrelid::regclass partid,
p.parlevel + 1 as partdepth,
r.parruleord as partordinal,
case
when t.tabledepth = p.parlevel + 1 then 'l'::char
else 'i'::char
end as partstatus
from
pg_partition p,
pg_partition_rule r,
(
select
parrelid::regclass,
max(parlevel)+1
from
pg_partition
group by parrelid
) as t(tableid, tabledepth)
where
p.oid = r.paroid
and not p.paristemplate
and p.parrelid = t.tableid
) as x(tableid, tabledepth, partid, partdepth, partordinal, partstatus)
where
x.partid = c.conrelid
) as part_constraint(tableid, partid, conid, conname, contype, condef)
where
(partid, condef) not in
(
select partid, condef
from (
select
u.tableid,
u.conname,
u.contype,
u.condef,
p.partid,
p.conid,
coalesce(p.conname)
from
(
select
p.tableid,
c.conname,
c.contype,
pg_get_constraintdef(c.oid) as condef
from
(
select
parrelid::regclass,
max(parlevel)+1
from
pg_partition
group by parrelid
) as p(tableid, tabledepth) ,
pg_constraint c
where
p.tableid = c.conrelid
) as u(tableid, conname, contype, condef)
join
(
select
x.tableid::regclass as tableid,
c.conrelid::regclass as partid,
c.oid as conid,
c.conname,
c.contype,
pg_get_constraintdef(c.oid) as condef
from
pg_constraint c,
(
select
tableid,
tabledepth,
tableid::regclass partid,
0 as partdepth,
0 as partordinal,
'r'::char as partstatus
from
(
select
parrelid::regclass,
max(parlevel)+1
from
pg_partition
group by parrelid
) as ptable(tableid, tabledepth)
union all
select
parrelid::regclass as tableid,
t.tabledepth as tabledepth,
r.parchildrelid::regclass partid,
p.parlevel + 1 as partdepth,
r.parruleord as partordinal,
case
when t.tabledepth = p.parlevel + 1 then 'l'::char
else 'i'::char
end as partstatus
from
pg_partition p,
pg_partition_rule r,
(
select
parrelid::regclass,
max(parlevel)+1
from
pg_partition
group by parrelid
) as t(tableid, tabledepth)
where
p.oid = r.paroid
and not p.paristemplate
and p.parrelid = t.tableid
) as x(tableid, tabledepth, partid, partdepth, partordinal, partstatus)
where
x.partid = c.conrelid
) as p(tableid, partid, conid, conname, contype, condef)
on (
u.tableid = p.tableid and
u.contype = p.contype and
u.condef = p.condef
)
) as part_user_constraint(tableid, tableconname, contype, condef, partid, partconid, partconname)
)
) as part_sys_constraint(tableid, partid, conid, conname, contype, condef)
group by tableid, partid""",
# Query contains_directly has a row per step along a path from
# root to leaf. The columns are
# tableid - pg_class.oid of the partitioned table.
# parentid - pg_class.oid of the "from" table.
# childid - pg_class.oid of the "to" table, the target of the step.
# npcon - constraint count for this step: 0 if "to" is default, else 1.
#
'contains_directly' :
"""(
select
r.parrelid::regclass as tableid,
coalesce(p.parchildrelid, r.parrelid)::regclass as parentid,
c.parchildrelid::regclass as childid,
case when c.parisdefault then 0 else 1 end as npcon
from
pg_partition r,
pg_partition_rule c
left join pg_partition_rule p on (c.parparentrule = p.oid)
where
not r.paristemplate and
not c.parchildrelid = 0 and
c.paroid = r.oid
) {alias}""",
# The base case is the length 1 path from root or branch to branch
# or leaf. This is always the first union term.
'base_query' :
"""select
start.tableid,
start.parentid,
start.childid,
start.npcon
from
{contains_directly_start}""",
# A union term is an N-way self join of contains_directly and
# identifies paths of length N. Note that the length of a path
# to a part at depth N+1 is N.
'union_term' :
"""select
start.tableid,
start.parentid,
stop.childid,
start.npcon -- stop.npcon
from
{contains_directly_start},
{contains_directly_stop}{intermediate_path_from}
where
start.childid {intermediate_path_where}
= stop.parentid""",
#
'union_query' :
"""select tableid, childid as partid, sum(npcon) as expected
from (
{open_union}
) r
group by tableid, childid"""
}
#############
# For partitionRegularityChecks.
def make_union_term(depth):
assert depth > 0
union_term = partitionRegularityChecks['union_term']
contains_directly = partitionRegularityChecks['contains_directly']
path_from = []
path_where = []
if depth > 1: # for later .join()
path_from.append('')
path_where.append('')
for i in range(1, depth):
path_alias = "path%d" % i
path_from.append(contains_directly.format(alias = path_alias))
path_where.append(" = %s.parentid and %s.childid"
% (path_alias, path_alias) )
return union_term.format(
contains_directly_start = contains_directly.format(alias = 'start'),
contains_directly_stop = contains_directly.format(alias = 'stop'),
intermediate_path_from = ",\n ".join(path_from),
intermediate_path_where = "\n".join(path_where)
)
# For partitionRegularityChecks.
def expected_part_con_query(maxdepth):
cds = partitionRegularityChecks['contains_directly'].format(alias = 'start')
base_query = partitionRegularityChecks['base_query']
terms = [base_query.format(contains_directly_start=cds)]
for depth in range(1, maxdepth+1):
terms.append(make_union_term(depth))
return partitionRegularityChecks['union_query'].format(
open_union = '\n\nunion all\n\n'.join(terms))
# For partitionRegularityChecks.
def make_irregular_sys_con_query(maxdepth):
expected = expected_part_con_query(maxdepth)
actual = partitionRegularityChecks['actual_part_con_query']
qry = partitionRegularityChecks['irreg_sys_constraint']
return qry.format(expected = expected, actual = actual)
#############
def checkPartitionRegularity():
#
# Rio partitioning changes introduce new regularity requirements for constraints
# on partitioned tables. These are the checks.
#
okayToRun = GV.missingEntryStatus and GV.inconsistentEntryStatus and GV.foreignKeyStatus
irregular = False
logger.info('-----------------------------------')
logger.info('Checking pg_partition/pg_constraint regularity ...')
if not okayToRun:
logger.warn('unable to run pg_partition/pg_constraint regularity checks')
logger.warn('prerequisite tests failed or not run')
return
db = connect()
testname = 'user-defined constraint regularity on partitioned tables'
qry = partitionRegularityChecks['irreg_user_constraint']
err = []
try:
curs = db.query(qry)
for row in curs.dictresult():
err.append( (row['tableconname'], row['tableid'], row['numactual'], row['numexpected']) )
if not err:
logger.info('[OK] %s' % testname)
else:
irregular = True
GV.testStatus = False
setError(ERROR_NOREPAIR)
logger.info('[FAIL] %s' % testname)
logger.error(' %s test has %d issue(s)' % (testname, len(err)) )
efmt = ' Constraint "%s" of partitioned table "%s" occurs %d times, %d expected.'
for e in err:
logger.error(efmt % e)
except Exception, e:
setError(ERROR_NOREPAIR)
myprint('[ERROR] executing test: %s' % testname)
myprint(' Execution error: ' + str(e))
testname = 'system-defined partition constraint regularity on partitioned tables'
# Determine maximum number of partitioning levels in this database
try:
qry = """select max(parlevel)+1 as maxdepth from pg_partition;"""
curs = db.query(qry)
assert len(curs.dictresult()) == 1 # scalar aggregation!
maxdepth = curs.dictresult()[0]['maxdepth']
err = []
if isinstance(maxdepth, int):
qry = make_irregular_sys_con_query(maxdepth)
curs = db.query(qry)
for row in curs.dictresult():
err.append( (row['partid'], row['tableid'], row['actual'], row['expected']) )
if not err:
logger.info('[OK] %s' % testname)
else:
irregular = True
GV.testStatus = False
setError(ERROR_NOREPAIR)
logger.info('[FAIL] %s' % testname)
logger.error(' %s test has %d issue(s)' % ( testname, len(err)) )
efmt = ' Part table "%s" of partitioned table "%s" has %d system-defined constraints, %d expected.'
for e in err:
logger.error(efmt % e)
except Exception, e:
setError(ERROR_NOREPAIR)
myprint('[ERROR] executing test: %s' % testname)
myprint(' Execution error: ' + str(e))
testname = 'unenforced unique constraints on partitioned tables'
if irregular:
logger.warn('skipping test %s due to constraint irregularities' % testname)
elif GV.Remove:
logger.warn('skipping test %s due to required removals' % testname)
else:
# logger.info('Checking %s ...' % testname)
qry = partitionRegularityChecks['unenforced_constraint_info']
err = []
try:
curs = db.query(qry)
for row in curs.dictresult():
row['con_type_phrase'] = 'primary key' if row['contype'] == 'p' else 'unique'
row['quoted_conname'] = quote_value(row['conname'])
row['quoted_contype'] = quote_value(row['contype'])
err.append(row)
if not err:
logger.info('[OK] %s' % testname)
else:
GV.testStatus = False
setError(ERROR_REMOVE)
logger.info('[FAIL] %s' % testname)
logger.error(' %s test has %d issue(s)' % ( testname, len(err) ) )
efmt = ' partitioned table "{tableid}" has unenforced {con_type_phrase} constraint: "{partid}"'
repair_fmt = partitionRegularityChecks['unenforced_constraint_repairs']
n = 0
for e in err:
n = n+1
if n <= 100:
logger.error(efmt.format(**e))
repair_sequence = repair_fmt.format(**e)
for dbid in GV.cfg:
addDemoteConstraint(dbid, repair_sequence)
if len(err) > 100:
logger.error("...")
except Exception, e:
setError(ERROR_NOREPAIR)
myprint('[ERROR] executing test: %s' % testname)
myprint(' Execution error: ' + str(e))
testname = 'inconsistently named constraint in partitioned table'
if irregular:
logger.warn('skipping test %s due to constraint irregularities' % testname)
elif GV.Remove:
logger.warn('skipping test %s due to required removals' % testname)
elif GV.DemoteConstraint:
logger.warn('skipping test %s due to unenforced constraint removals' % testname)
else:
# logger.info('Checking %s ...' % testname)
qry = partitionRegularityChecks['fix_ill_named_constraint']
err = []
try:
curs = db.query(qry)
for row in curs.dictresult():
err.append( row )
if not err:
logger.info('[OK] %s' % testname)
else:
GV.testStatus = False
setError(ERROR_REMOVE)
logger.info('[FAIL] %s' % testname)
logger.error(' %s test has %d issue(s)' % ( testname, len(err) ) )
efmt = ' part table "%s" has constraint named "%s"; should be "%s"'
n = 0
for e in err:
n = n+1
if n <= 100:
# part name, wrong constraint name, right constraint name
logger.error(efmt % (e['partid'], e['partconname'], e['tableconname']))
for dbid in GV.cfg:
buildAdjustConname(
dbid, e['tableid'], int(e['partrelid']),
quote_value(e['partconname']),
quote_value(e['tableconname']) )
if len(err) > 100:
logger.error("...")
except Exception, e:
setError(ERROR_NOREPAIR)
myprint('[ERROR] executing test: %s' % testname)
myprint(' Execution error: ' + str(e))
db.close()
#############
def checkPGClass():
logger.info('-----------------------------------')
logger.info('Checking pg_class ...')
qry = '''
SELECT relname, relkind, tc.oid as oid,
reltoastrelid, reltoastidxid
FROM pg_class tc left outer join
pg_attribute ta on (tc.oid = ta.attrelid)
WHERE ta.attrelid is NULL
'''
err = connect2run(qry, ('relname', 'relkind', 'oid'))
if not err:
logger.info('[OK] pg_class')
else:
GV.testStatus = False
setError(ERROR_NOREPAIR)
logger.info('[FAIL] pg_class')
logger.error('pg_class has %d issue(s)' % len(err))
logger.error(qry)
for e in err[0:100]:
logger.error(formatErr(e[0], e[1], e[2]))
if len(err) > 100:
logger.error("...")
#############
def checkPGNamespace():
logger.info('-----------------------------------')
logger.info('Checking for leaked temporary schemas')
db = connect2(GV.cfg[1], utilityMode=False)
# The simpler form of this query that pushed the union into the
# inner select does not run correctly on 3.2.x
qry = '''
SELECT distinct nspname as schema
FROM (
SELECT nspname, replace(nspname, 'pg_temp_','')::int as sess_id
FROM gp_dist_random('pg_namespace')
WHERE nspname ~ '^pg_temp_[0-9]+'
) n LEFT OUTER JOIN pg_stat_activity x using (sess_id)
WHERE x.sess_id is null
UNION
SELECT nspname as schema
FROM (
SELECT nspname, replace(nspname, 'pg_temp_','')::int as sess_id
FROM pg_namespace
WHERE nspname ~ '^pg_temp_[0-9]+'
) n LEFT OUTER JOIN pg_stat_activity x using (sess_id)
WHERE x.sess_id is null
'''
try:
curs = db.query(qry)
if curs.ntuples() == 0:
logger.info('[OK] temporary schemas')
else:
GV.testStatus = False
setError(ERROR_REMOVE)
logger.info('[FAIL] temporary schemas')
logger.error('found %d unbound temporary schemas' % curs.ntuples())
for row in curs.getresult():
logger.error(" ... %s" % row[0])
GV.Schemas.append(row[0])
except Exception, e:
setError(ERROR_NOREPAIR)
myprint('[ERROR] executing test: checkPGNamespace')
myprint(' Execution error: ' + str(e))
# Check for objects in various catalogs that are in a schema that has
# been dropped.
logger.info('Checking missing schema definitions ...')
qry = '''
SELECT o.catalog, o.nsp
FROM pg_namespace n right outer join
(select 'pg_class' as catalog, relnamespace as nsp from pg_class
union
select 'pg_type' as catalog, typnamespace as nsp from pg_type
union
select 'pg_operator' as catalog, oprnamespace as nsp from pg_operator
union
select 'pg_proc' as catalog,pronamespace as nsp from pg_proc) o on
(n.oid = o.nsp)
WHERE n.oid is NULL
'''
err = connect2run(qry, ('catalog', 'nsp'))
if not err:
logger.info('[OK] missing schema definitions')
else:
GV.testStatus = False
setError(ERROR_NOREPAIR)
logger.info('[FAIL] missing schema definitons')
logger.error('found %d references to non-existent schemas' % len(err))
logger.error(qry)
for e in err:
logger.error(formatErr(e[0], e[1], e[2]))
#############
'''
Produce repair scripts to remove dangling entries of gp_fastsequence:
- one file per segment dbid
- contains DELETE statements to remove catalog entry
'''
def removeFastSequence(db):
'''
MPP-14758: gp_fastsequence does not get cleanup after a failed transaction (AO/CO)
Note: this is slightly different from the normal foreign key check
because it streches the cross reference across segments.
This makes it safe in the event of cross consistency issues with pg_class,
but may not repair some issues when there are cross consistency problems
'''
try:
qry = """
SELECT dbid, objid
FROM gp_segment_configuration AS cfg JOIN
(SELECT gp_segment_id, objid
FROM (select gp_segment_id, objid from gp_fastsequence
union
select gp_segment_id, objid from gp_dist_random('gp_fastsequence')) AS fs
LEFT OUTER JOIN
(select oid from pg_class
union
select oid from gp_dist_random('pg_class')) AS c
ON (fs.objid = c.oid)
WHERE c.oid IS NULL) AS r
ON r.gp_segment_id = cfg.content
WHERE cfg.role = 'p';
"""
curs = db.query(qry)
for row in curs.dictresult():
seg = row['dbid'] # dbid of targetted segment
name = 'gp_fastsequence tuple' # for comment purposes
table = 'gp_fastsequence' # table name
cols = {'objid': row['objid']} # column name and value
objname = 'gp_fastsequence' # for comment purposes
buildRemove(seg, name, table, cols, objname)
except Exception, e:
logger.error('removeFastSequence: ' + str(e))
#############
def removeIndexConstraint(nspname, relname, constraint):
GV.Constraints.append('ALTER TABLE "%s"."%s" DROP CONSTRAINT "%s" CASCADE;' % \
(nspname, relname, constraint))
def distributeRandomly(rel):
GV.Policies.append('ALTER TABLE %s SET DISTRIBUTED RANDOMLY;' % rel)
def buildRemove(seg, name, table, cols, objname):
first = False
fullstr = ''
str = ''
for col in cols:
if not first:
fullstr = '--Object Name: %s\n--Remove %s for %i\n' % \
(objname, name, cols[col])
if len(str) > 0:
str += ' or '
str += '%s = \'%s\'' % (col, cols[col])
fullstr += 'DELETE FROM %s WHERE %s;' % (table, str)
addRemove(seg, fullstr)
def addRemove(seg, line):
if not GV.Remove.has_key(seg):
GV.Remove[seg] = []
GV.Remove[seg].append(line)
def buildAdjustConname(seg, relname, relid, oldconname, newconname):
# relname text
# relid oid as int
# old/newconname text
stmt = '--Constraint Name: %s on %s becomes %s\n' % \
(oldconname, relname, newconname)
stmt += 'UPDATE pg_constraint\n'
stmt += 'SET conname = %s\n' % newconname
stmt += 'WHERE conrelid = %d and conname = %s;' % (relid, oldconname)
addAdjustConname(seg, stmt)
def addAdjustConname(seg, stmt):
if not GV.AdjustConname.has_key(seg):
GV.AdjustConname[seg] = []
GV.AdjustConname[seg].append(stmt)
def addDemoteConstraint(seg, repair_sequence):
if not GV.DemoteConstraint.has_key(seg):
GV.DemoteConstraint[seg] = []
GV.DemoteConstraint[seg].append(repair_sequence)
#############
def checkDepend():
# Check for dependencies on non-existent objects
logger.info('-----------------------------------')
logger.info('Checking Object Dependencies')
db = connect2(GV.cfg[1], utilityMode=False)
# Catalogs that link up to pg_depend/pg_shdepend
qry = "select relname from pg_class where relnamespace=11 and relhasoids"
curs = db.query(qry)
catalogs = []
for row in curs.getresult():
catalogs.append(row[0])
# For each catalog construct the subquery for that catalog
#
# It would be desireable to switch these queries to a gp_dist_random
# type query rather than issueing it in utility mode on every segment,
# however because there are numerous issues remaining with oid
# inconsistencies (see crossDBCheck()) that doesn't work out well.
deps = []
for cat in catalogs:
qry = """
SELECT '{catalog}' as catalog, objid FROM (
SELECT objid FROM pg_depend
WHERE classid = '{catalog}'::regclass
UNION ALL
SELECT refobjid FROM pg_depend
WHERE refclassid = '{catalog}'::regclass
) d
LEFT OUTER JOIN {catalog} c on (d.objid = c.oid)
WHERE c.oid is NULL
UNION ALL
SELECT '{catalog}' as catalog, objid FROM (
SELECT dbid, objid FROM pg_shdepend
WHERE classid = '{catalog}'::regclass
UNION ALL
SELECT dbid, refobjid FROM pg_shdepend
WHERE refclassid = '{catalog}'::regclass
) d JOIN pg_database db
ON (d.dbid = db.oid and datname= current_database())
LEFT OUTER JOIN {catalog} c on (d.objid = c.oid)
WHERE c.oid is NULL
""".format(catalog=cat)
deps.append(qry)
qry = """
SELECT distinct catalog, objid
FROM (%s
) q
""" % "UNION ALL".join(deps)
try:
err = connect2run(qry)
if not err:
logger.info('[OK] basic object dependencies')
else:
GV.testStatus = False
setError(ERROR_NOREPAIR)
logger.info('[FAIL] basic object dependencies')
logger.error(' found %d dependencies on dropped objects' % len(err))
# logger.info(qry)
except Exception, e:
setError(ERROR_NOREPAIR)
myprint("[ERROR] executing test: basic object dependencies")
myprint(" Execution error: " + str(e))
myprint(qry)
def fixupowners(nspname, relname, oldrole, newrole):
# Must first alter the table to the wrong owner, then back to the right
# owner. The purpose of this is that AT doesn't dispatch the change unless
# it think the table actually changed owner.
#
# Note: this means that the Owners list must be run in the order given.
#
GV.Owners.append('-- owner for "%s"."%s"' % (nspname, relname))
GV.Owners.append('ALTER TABLE "%s"."%s" OWNER TO "%s";' %
(nspname, relname, oldrole))
GV.Owners.append('ALTER TABLE "%s"."%s" OWNER TO "%s";' %
(nspname, relname, newrole))
def checkOwners():
logger.info('-----------------------------------')
logger.info('Checking table ownership')
# Check owners in pg_class
#
# - Report each table that has an inconsistency with the master database,
# this can be on the table directly, or on one of the table subobjects
# (pg_toast, pg_aoseg, etc)
#
# - Theoretically this could result in multiple corrections for a given
# table based on having multiple "wrong" owners. Realistically most
# of the problems we have seen with this problem the wrong owner is
# almost always the gpadmin user, so this is not expected. If it does
# occur it won't be a problem, but we will issue more ALTER TABLE
# commands than is strictly necessary.
#
# - Between 3.3 and 4.0 the ao segment columns migrated from pg_class
# to pg_appendonly.
db = connect2(GV.cfg[1], utilityMode=False)
qry = '''
select distinct n.nspname, coalesce(o.relname, c.relname) as relname,
a.rolname, m.rolname as master_rolname
from gp_dist_random('pg_class') r
join pg_class c on (c.oid = r.oid)
left join pg_appendonly ao on (c.oid = ao.segrelid or
c.oid = ao.segidxid or
c.oid = ao.blkdirrelid or
c.oid = ao.blkdiridxid)
left join pg_class t on (t.reltoastidxid = c.oid)
left join pg_class o on (o.oid = ao.relid or
o.reltoastrelid = t.oid or
o.reltoastrelid = c.oid)
join pg_authid a on (a.oid = r.relowner)
join pg_authid m on (m.oid = coalesce(o.relowner, c.relowner))
join pg_namespace n on (n.oid = coalesce(o.relnamespace, c.relnamespace))
where c.relowner <> r.relowner
'''
try:
curs = db.query(qry)
rows = []
for row in curs.dictresult():
rows.append(row)
if len(rows) == 0:
logger.info('[OK] table ownership')
else:
GV.testStatus = False
setError(ERROR_REMOVE)
logger.info('[FAIL] table ownership')
logger.error('found %d table ownership issue(s)' % len(rows))
logger.error('%s' % qry)
for row in rows[0:100]:
logger.error(' %s.%s relowner %s != %s'
% (row['nspname'], row['relname'], row['rolname'],
row['master_rolname']))
if len(rows) > 100:
logger.error("...")
for row in rows:
fixupowners(row['nspname'], row['relname'], row['rolname'],
row['master_rolname'])
except Exception, e:
setError(ERROR_NOREPAIR)
myprint("[ERROR] executing: check table ownership")
myprint(" Execution error: " + str(e))
myprint(qry)
# TODO: add a check that subobject owners agree with the main object owner.
# (The above checks only compare master vs segment)
# Check owners in pg_type
# - Ignore implementation types of pg_class entries - they should be
# in the check above since ALTER TABLE is required to fix them, not
# ALTER TYPE.
db = connect2(GV.cfg[1], utilityMode=False)
qry = '''
select distinct n.nspname, t.typname, a.rolname, m.rolname as master_rolname
from gp_dist_random('pg_type') r
join pg_type t on (t.oid = r.oid)
join pg_namespace n on (n.oid = t.typnamespace)
join pg_authid a on (a.oid = r.typowner)
join pg_authid m on (m.oid = t.typowner)
where r.typowner <> t.typowner and r.typrelid = 0
'''
try:
curs = db.query(qry)
rows = []
for row in curs.dictresult():
rows.append(row)
if len(rows) == 0:
logger.info('[OK] type ownership')
else:
GV.testStatus = False
setError(ERROR_NOREPAIR)
logger.info('[FAIL] type ownership')
logger.error('found %d type ownership issue(s)' % len(rows))
logger.error('%s' % qry)
for row in rows[0:100]:
logger.error(' %s.%s typeowner %s != %s'
% (row['nspname'], row['typname'], row['rolname'],
row['master_rolname']))
if len(rows) > 100:
logger.error("...")
except Exception, e:
setError(ERROR_NOREPAIR)
myprint("[ERROR] executing test: check type ownership")
myprint(" Execution error: " + str(e))
myprint(qry)
# FIXME: add repair script
# NOTE: types with typrelid probably will be repaired by the
# script generated by the table ownership test above.
# TODO:
# Check owners in pg_proc
# Check owners in pg_database
# Check owners in pg_tablespace
# Check owners in pg_filespace
# Check owners in pg_namespace
# Check owners in pg_operator
# Check owners in pg_opclass
# ...
def checkPersistentTables():
logger.info('-----------------------------------')
logger.info('Checking persistent tables')
# Some of the persistency checks modified depending on if we are in
# change tracking or not. For instance when we are in-sync we
# shouldn't have persistent entries stuck in "drop pending", but this
# is a perfectly valid state to maintain while in change-tracking.
db = connect2(GV.cfg[1], utilityMode=False)
qry = "SELECT count(*) FROM gp_segment_configuration WHERE mode != 's'"
curs = db.query(qry)
in_sync = (curs.getresult()[0][0] == 0)
if in_sync:
logger.info("[OK] System is synchronized")
else:
logger.warn("System requires recovery via gprecoverseg")
queries = []
# Checks on FILESPACE
qname = "gp_persistent_filespace_node state check"
qry = """
SELECT p.filespace_oid,
case when p.persistent_state = 0 then 'free'
when p.persistent_state = 1 then 'create pending'
when p.persistent_state = 2 then 'created'
when p.persistent_state = 3 then 'drop pending'
when p.persistent_state = 4 then 'abort create'
when p.persistent_state = 5 then 'JIT create pending'
when p.persistent_state = 6 then 'bulk load create pending'
else 'unknown state: ' || p.persistent_state
end as persistent_state,
case when p.mirror_existence_state = 0 then 'mirror free'
when p.mirror_existence_state = 1 then 'not mirrored'
when p.mirror_existence_state = 2 then 'mirror create pending'
when p.mirror_existence_state = 3 then 'mirror created'
when p.mirror_existence_state = 4 then 'mirror down before create'
when p.mirror_existence_state = 5 then 'mirror down during create'
when p.mirror_existence_state = 6 then 'mirror drop pending'
when p.mirror_existence_state = 7 then 'mirror only drop remains'
else 'unknown state: ' || p.mirror_existence_state
end as mirror_existence_state
FROM gp_persistent_filespace_node p
WHERE p.persistent_state not in (0, 2)
or p.mirror_existence_state not in (0,1,3)
"""
if in_sync:
queries.append([qname, qry])
qname = "gp_persistent_filespace_node <=> pg_filespace"
qry = """
SELECT coalesce(f.oid, p.filespace_oid) as filespace_oid,
f.fsname as "filespace"
FROM (SELECT * FROM gp_persistent_filespace_node
WHERE persistent_state = 2) p
FULL OUTER JOIN (SELECT oid, fsname FROM pg_filespace
WHERE oid != 3052) f
ON (p.filespace_oid = f.oid)
WHERE (p.filespace_oid is NULL OR f.oid is NULL)
"""
queries.append([qname, qry])
qname = "gp_persistent_filespace_node <=> gp_global_sequence"
qry = """
SELECT p.filespace_oid, f.fsname as "filespace",
case when p.persistent_state = 0 then 'free'
when p.persistent_state = 1 then 'create pending'
when p.persistent_state = 2 then 'created'
when p.persistent_state = 3 then 'drop pending'
when p.persistent_state = 4 then 'abort create'
when p.persistent_state = 5 then 'JIT create pending'
when p.persistent_state = 6 then 'bulk load create pending'
else 'unknown state: ' || p.persistent_state
end as persistent_state,
p.persistent_serial_num, s.sequence_num
FROM gp_global_sequence s, gp_persistent_filespace_node p
LEFT JOIN pg_filespace f ON (f.oid = p.filespace_oid)
WHERE s.ctid = '(0,4)' and p.persistent_serial_num > s.sequence_num
"""
queries.append([qname, qry])
# Checks on DATABASE
qname = "gp_persistent_database_node state check"
qry = """
SELECT p.tablespace_oid, p.database_oid,
case when p.persistent_state = 0 then 'free'
when p.persistent_state = 1 then 'create pending'
when p.persistent_state = 2 then 'created'
when p.persistent_state = 3 then 'drop pending'
when p.persistent_state = 4 then 'abort create'
when p.persistent_state = 5 then 'JIT create pending'
when p.persistent_state = 6 then 'bulk load create pending'
else 'unknown state: ' || p.persistent_state
end as persistent_state,
case when p.mirror_existence_state = 0 then 'mirror free'
when p.mirror_existence_state = 1 then 'not mirrored'
when p.mirror_existence_state = 2 then 'mirror create pending'
when p.mirror_existence_state = 3 then 'mirror created'
when p.mirror_existence_state = 4 then 'mirror down before create'
when p.mirror_existence_state = 5 then 'mirror down during create'
when p.mirror_existence_state = 6 then 'mirror drop pending'
when p.mirror_existence_state = 7 then 'mirror only drop remains'
else 'unknown state: ' || p.mirror_existence_state
end as mirror_existence_state
FROM gp_persistent_database_node p
WHERE p.persistent_state not in (0, 2)
or p.mirror_existence_state not in (0,1,3)
"""
if in_sync:
queries.append([qname, qry])
qname = "gp_persistent_database_node <=> pg_database"
qry = """
SELECT coalesce(d.oid, p.database_oid) as database_oid,
d.datname as database
FROM (SELECT * FROM gp_persistent_database_node
WHERE persistent_state = 2) p
FULL OUTER JOIN pg_database d
ON (d.oid = p.database_oid)
WHERE (d.datname is null or p.database_oid is null)
"""
queries.append([qname, qry])
qname = "gp_persistent_database_node <=> pg_tablespace"
qry = """
SELECT coalesce(t.oid, p.database_oid) as database_oid,
t.spcname as tablespace
FROM (SELECT * FROM gp_persistent_database_node
WHERE persistent_state = 2) p
LEFT OUTER JOIN (SELECT oid, spcname FROM pg_tablespace
WHERE oid != 1664) t
ON (t.oid = p.tablespace_oid)
WHERE t.spcname is null
"""
queries.append([qname, qry])
qname = "gp_persistent_database_node <=> gp_global_sequence"
qry = """
SELECT p.database_oid, p.tablespace_oid, d.datname as "database",
case when p.persistent_state = 0 then 'free'
when p.persistent_state = 1 then 'create pending'
when p.persistent_state = 2 then 'created'
when p.persistent_state = 3 then 'drop pending'
when p.persistent_state = 4 then 'abort create'
when p.persistent_state = 5 then 'JIT create pending'
when p.persistent_state = 6 then 'bulk load create pending'
else 'unknown state: ' || p.persistent_state
end as persistent_state,
p.persistent_serial_num, s.sequence_num
FROM gp_global_sequence s, gp_persistent_database_node p
LEFT JOIN pg_database d ON (d.oid = p.database_oid)
WHERE s.ctid = '(0,2)' and p.persistent_serial_num > s.sequence_num
"""
queries.append([qname, qry])
# Checks on TABLESPACE
qname = "gp_persistent_tablespace_node state check"
qry = """
SELECT p.filespace_oid, p.tablespace_oid,
case when p.persistent_state = 0 then 'free'
when p.persistent_state = 1 then 'create pending'
when p.persistent_state = 2 then 'created'
when p.persistent_state = 3 then 'drop pending'
when p.persistent_state = 4 then 'abort create'
when p.persistent_state = 5 then 'JIT create pending'
when p.persistent_state = 6 then 'bulk load create pending'
else 'unknown state: ' || p.persistent_state
end as persistent_state,
case when p.mirror_existence_state = 0 then 'mirror free'
when p.mirror_existence_state = 1 then 'not mirrored'
when p.mirror_existence_state = 2 then 'mirror create pending'
when p.mirror_existence_state = 3 then 'mirror created'
when p.mirror_existence_state = 4 then 'mirror down before create'
when p.mirror_existence_state = 5 then 'mirror down during create'
when p.mirror_existence_state = 6 then 'mirror drop pending'
when p.mirror_existence_state = 7 then 'mirror only drop remains'
else 'unknown state: ' || p.mirror_existence_state
end as mirror_existence_state
FROM gp_persistent_tablespace_node p
WHERE p.persistent_state not in (0, 2)
or p.mirror_existence_state not in (0,1,3)
"""
if in_sync:
queries.append([qname, qry])
qname = "gp_persistent_tablespace_node <=> pg_tablespace"
qry = """
SELECT coalesce(t.oid, p.tablespace_oid) as tablespace_oid,
t.spcname as tablespace
FROM (SELECT * FROM gp_persistent_tablespace_node
WHERE persistent_state = 2) p
FULL OUTER JOIN (
SELECT oid, spcname FROM pg_tablespace WHERE oid not in (1663, 1664)
) t ON (t.oid = p.tablespace_oid)
WHERE t.spcname is null or p.tablespace_oid is null
"""
queries.append([qname, qry])
qname = "gp_persistent_tablespace_node <=> pg_filespace"
qry = """
SELECT p.filespace_oid, f.fsname as "filespace"
FROM (SELECT * FROM gp_persistent_tablespace_node
WHERE persistent_state = 2) p
LEFT OUTER JOIN pg_filespace f
ON (f.oid = p.filespace_oid)
WHERE f.fsname is null
"""
queries.append([qname, qry])
qname = "gp_persistent_tablespace_node <=> gp_global_sequence"
qry = """
SELECT p.filespace_oid, p.tablespace_oid, t.spcname as "tablespace",
case when p.persistent_state = 0 then 'free'
when p.persistent_state = 1 then 'create pending'
when p.persistent_state = 2 then 'created'
when p.persistent_state = 3 then 'drop pending'
when p.persistent_state = 4 then 'abort create'
when p.persistent_state = 5 then 'JIT create pending'
when p.persistent_state = 6 then 'bulk load create pending'
else 'unknown state: ' || p.persistent_state
end as persistent_state,
p.persistent_serial_num, s.sequence_num
FROM gp_global_sequence s, gp_persistent_tablespace_node p
LEFT JOIN pg_tablespace t ON (t.oid = p.tablespace_oid)
WHERE s.ctid = '(0,3)' and p.persistent_serial_num > s.sequence_num
"""
queries.append([qname, qry])
# Checks on RELATION
qname = "gp_persistent_relation_node state check"
qry = """
SELECT p.tablespace_oid, p.relfilenode_oid, p.segment_file_num,
case when p.persistent_state = 0 then 'free'
when p.persistent_state = 1 then 'create pending'
when p.persistent_state = 2 then 'created'
when p.persistent_state = 3 then 'drop pending'
when p.persistent_state = 4 then 'abort create'
when p.persistent_state = 5 then 'JIT create pending'
when p.persistent_state = 6 then 'bulk load create pending'
else 'unknown state: ' || p.persistent_state
end as persistent_state,
case when p.mirror_existence_state = 0 then 'mirror free'
when p.mirror_existence_state = 1 then 'not mirrored'
when p.mirror_existence_state = 2 then 'mirror create pending'
when p.mirror_existence_state = 3 then 'mirror created'
when p.mirror_existence_state = 4 then 'mirror down before create'
when p.mirror_existence_state = 5 then 'mirror down during create'
when p.mirror_existence_state = 6 then 'mirror drop pending'
when p.mirror_existence_state = 7 then 'mirror only drop remains'
else 'unknown state: ' || p.mirror_existence_state
end as mirror_existence_state
FROM gp_persistent_relation_node p
WHERE (p.persistent_state not in (0, 2)
or p.mirror_existence_state not in (0,1,3))
and p.database_oid in (
SELECT oid FROM pg_database WHERE datname = current_database()
)
"""
if in_sync:
queries.append([qname, qry])
qname = "gp_persistent_relation_node <=> pg_tablespace"
qry = """
SELECT distinct p.tablespace_oid
FROM (SELECT * FROM gp_persistent_relation_node
WHERE persistent_state = 2
AND database_oid in (
SELECT oid FROM pg_database
WHERE datname = current_database()
UNION ALL
SELECT 0)) p
LEFT OUTER JOIN pg_tablespace t
ON (t.oid = p.tablespace_oid)
WHERE t.oid is null
"""
queries.append([qname, qry])
qname = "gp_persistent_relation_node <=> pg_database"
qry = """
SELECT datname, oid, count(*)
FROM (
SELECT d.datname as datname, p.database_oid as oid
FROM (SELECT * FROM gp_persistent_relation_node
WHERE database_oid != 0 and persistent_state = 2
) p
full outer join pg_database d ON (d.oid = p.database_oid)
) x
GROUP BY 1,2
HAVING datname is null or oid is null or count(*) < 100
"""
queries.append([qname, qry])
qname = "gp_persistent_relation_node <=> gp_relation_node"
qry = """
SELECT coalesce(p.relfilenode_oid, r.relfilenode_oid) as relfilenode,
p.ctid, r.persistent_tid
FROM (
SELECT p.ctid, p.* FROM gp_persistent_relation_node p
WHERE persistent_state = 2 AND p.database_oid in (
SELECT oid FROM pg_database WHERE datname = current_database()
UNION ALL
SELECT 0
)
) p
FULL OUTER JOIN gp_relation_node r
ON (p.relfilenode_oid = r.relfilenode_oid and
p.segment_file_num = r.segment_file_num)
WHERE (p.relfilenode_oid is NULL OR
r.relfilenode_oid is NULL OR
p.ctid != r.persistent_tid)
"""
queries.append([qname, qry])
qname = "gp_persistent_relation_node <=> pg_class"
qry = """
SELECT coalesce(p.relfilenode_oid, c.relfilenode) as relfilenode,
c.nspname, c.relname, c.relkind, c.relstorage
FROM (
SELECT * FROM gp_persistent_relation_node
WHERE persistent_state = 2 AND database_oid in (
SELECT oid FROM pg_database WHERE datname = current_database()
UNION ALL
SELECT 0
)
) p
FULL OUTER JOIN (
SELECT n.nspname, c.relname, c.relfilenode, c.relstorage, c.relkind
FROM pg_class c
LEFT OUTER JOIN pg_namespace n ON (c.relnamespace = n.oid)
WHERE c.relstorage not in ('v', 'x', 'f')
) c ON (p.relfilenode_oid = c.relfilenode)
WHERE p.relfilenode_oid is NULL OR c.relfilenode is NULL
"""
queries.append([qname, qry])
qname = "gp_persistent_relation_node <=> gp_global_sequence"
qry = """
SELECT p.tablespace_oid, p.database_oid, p.relfilenode_oid,
p.segment_file_num,
case when p.persistent_state = 0 then 'free'
when p.persistent_state = 1 then 'create pending'
when p.persistent_state = 2 then 'created'
when p.persistent_state = 3 then 'drop pending'
when p.persistent_state = 4 then 'abort create'
when p.persistent_state = 5 then 'JIT create pending'
when p.persistent_state = 6 then 'bulk load create pending'
else 'unknown state: ' || p.persistent_state
end as persistent_state,
p.persistent_serial_num, s.sequence_num
FROM gp_global_sequence s, gp_persistent_relation_node p
LEFT JOIN pg_tablespace t ON (t.oid = p.tablespace_oid)
WHERE s.ctid = '(0,1)' and p.persistent_serial_num > s.sequence_num
"""
queries.append([qname, qry])
# Look for extra/missing files in the filesystem
#
# Note: heap tables only ever store segment_file_num 0 in the persistent
# tables, while ao/co tables will store every segment_file_num that they
# use. This results in the segment_file_num/relstorage where clause.
qname = "gp_persistent_relation_node <=> filesystem"
qry = """
SELECT coalesce(a.tablespace_oid, b.tablespace_oid) as tablespace_oid,
coalesce(a.database_oid, b.database_oid) as database_oid,
coalesce(a.relfilenode_oid, b.relfilenode_oid) as relfilenode_oid,
coalesce(a.segment_file_num, b.segment_file_num) as segment_file_num,
a.relfilenode_oid is null as filesystem,
b.relfilenode_oid is null as persistent,
b.relkind, b.relstorage
FROM gp_persistent_relation_node a
FULL OUTER JOIN (
SELECT p.*, c.relkind, c.relstorage
FROM gp_persistent_relation_node_check() p
LEFT OUTER JOIN pg_class c
ON (p.relfilenode_oid = c.relfilenode)
WHERE (p.segment_file_num = 0 or c.relstorage != 'h')
) b ON (a.tablespace_oid = b.tablespace_oid and
a.database_oid = b.database_oid and
a.relfilenode_oid = b.relfilenode_oid and
a.segment_file_num = b.segment_file_num)
WHERE (a.relfilenode_oid is null OR
(a.persistent_state = 2 and b.relfilenode_oid is null)) and
coalesce(a.database_oid, b.database_oid) in (
SELECT oid FROM pg_database WHERE datname = current_database()
UNION ALL
SELECT 0
);
"""
queries.append([qname, qry])
# Look for databases in the filesystem that have been dropped but still
# have dangling files.
qname = "pg_database <=> filesystem"
qry = """
SELECT tablespace_oid, database_oid, count(*)
FROM gp_persistent_relation_node_check() p
LEFT OUTER JOIN pg_database d
ON (p.database_oid = d.oid)
WHERE d.oid is null and database_oid != 0
GROUP BY tablespace_oid, database_oid;
"""
queries.append([qname, qry])
# Phew, that was a lot of queries, eh?
# now execute them on every segment.
for (qname, qry) in queries:
err = connect2run(qry)
if not err:
logger.info('[OK] ' + qname)
else:
GV.testStatus = False
setError(ERROR_NOREPAIR)
logger.info('[FAIL] ' + qname)
logger.error('%s found %d issue(s)' % (qname, len(err)))
logger.error(qry)
# Report at most 100 rows per segment, for brevity
last = None
count = 0
for e in err:
cfg = e[0]
col = e[1]
row = e[2]
# If this is a new host start again
if last != cfg:
count = 0
last = cfg
if count == 100:
logger.error("...")
count += 1
if count > 100:
continue
# Actual formatting could be prettied up more
if count == 0:
logger.error("--------")
logger.error("%s:%s:%s" % (cfg['hostname'],
cfg['port'],
cfg['datadir']))
logger.error(" " + " | ".join(map(str, col)))
logger.error(" " + " | ".join([str(row[x]) for x in col]))
count += 1
return
def closeDbs():
for key, conns in GV.db.iteritems():
db = conns[0]
db.close()
GV.db = {} # remove everything
#-------------------------------------------------------------------------------
def getCatObj(namestr):
db = connect2(GV.cfg[1], utilityMode=False)
try:
cat = GV.catalog.getCatalogTable(namestr)
except Exception, e:
myprint("No such catalog table: %s\n" % str(namestr))
raise
return cat
#-------------------------------------------------------------------------------
def checkACL():
logger.info('-----------------------------------')
logger.info('Performing cross database ACL tests')
# looks up information in the catalog:
tables = GV.catalog.getCatalogTables()
for cat in sorted(tables):
checkTableACL(cat)
#-------------------------------------------------------------------------------
def checkTableACL(cat):
catname = cat.getTableName()
pkey = cat.getPrimaryKey()
master = cat.isMasterOnly()
isShared = cat.isShared()
acl = cat.getTableAcl()
if GV.aclStatus == None:
GV.aclStatus = True
# Skip:
# - master only tables
# - tables without a primary key
# - tables without acls
if master or pkey == [] or acl == None:
return
# skip shared/non-shared tables
if GV.opt['-S']:
if re.match("none", GV.opt['-S'], re.I) and isShared:
return
if re.match("only", GV.opt['-S'], re.I) and not isShared:
return
# Comparing ACLs cannot be done with a simple equality comparison
# since it is valid for the ACLs to have different order. Instead
# we compare that both acls are subsets of each other => equality.
mPkey = ['m.'+i for i in pkey]
if cat.tableHasConsistentOids():
mPkey = ['m.oid']
qry = """
SELECT s.gp_segment_id as segid, {mPkey},
m.{acl} as master_acl, s.{acl} as segment_acl
FROM {catalog} m
JOIN gp_dist_random('{catalog}') s using ({primary_key})
WHERE not (m.{acl} @> s.{acl} and s.{acl} @> m.{acl}) or
(m.{acl} is null and s.{acl} is not null) or
(s.{acl} is null and m.{acl} is not null)
ORDER BY {primary_key}, s.gp_segment_id
""".format(catalog = catname,
primary_key = ', '.join(pkey),
mPkey = ', '.join(mPkey),
acl = acl)
# Execute the query
try:
db = connect2(GV.cfg[1], utilityMode=False)
curs = db.query(qry)
nrows = curs.ntuples()
if nrows == 0:
logger.info('[OK] Cross consistency acl check for ' + catname)
else:
GV.testStatus = False
setError(ERROR_NOREPAIR)
GV.aclStatus = False
logger.info('[FAIL] Cross consistency acl check for ' + catname)
logger.error(' %s acl check has %d issue(s)' % (catname, nrows))
# logger.error(qry)
fields = curs.listfields()
log_literal(logger,logging.ERROR," " + " | ".join(fields))
for row in curs.getresult():
log_literal(logger,logging.ERROR," " + " | ".join(map(str, row)))
processACLResult(catname, fields, curs.getresult())
except Exception, e:
setError(ERROR_NOREPAIR)
GV.aclStatus = False
myprint('[ERROR] executing: Cross consistency check for ' + catname)
myprint(' Execution error: ' + str(e))
myprint(qry)
#-------------------------------------------------------------------------------
def checkForeignKey():
logger.info('-----------------------------------')
logger.info('Performing foreign key tests')
if GV.foreignKeyStatus == None:
GV.foreignKeyStatus = True
# looks up information in the catalog:
tables = GV.catalog.getCatalogTables()
for cat in sorted(tables):
checkTableForeignKey(cat)
#-------------------------------------------------------------------------------
def checkTableForeignKey(cat):
catname = cat.getTableName()
fkeylist = cat.getForeignKeys()
isShared = cat.isShared()
pkeylist = cat.getPrimaryKey()
coltypes = cat.getTableColtypes()
# skip tables without fkey
if len(fkeylist) <= 0:
return
if len(cat.getPrimaryKey()) <= 0:
return
# skip these master-only tables
skipped_masteronly = ['gp_relation_node', 'pg_description', 'pg_shdescription',
'pg_stat_last_operation', 'pg_stat_last_shoperation', 'pg_statistic']
if catname in skipped_masteronly:
return
# skip shared/non-shared tables
if GV.opt['-S']:
if re.match("none", GV.opt['-S'], re.I) and isShared:
return
if re.match("only", GV.opt['-S'], re.I) and not isShared:
return
# primary key lists
cat1pkeys = []
pkeys = []
# build array of catalog primary keys (with aliases) and
# primary key alias list
for pk in pkeylist:
cat1pkeys.append('cat1.' + pk + ' as %s_%s' % (catname,pk))
pkeys.append('%s_%s' % (catname,pk))
logger.info('Building %d queries to check FK constraint on table %s' % (len(fkeylist),catname))
for fkeydef in fkeylist:
castedFkey = [c+autoCast.get(coltypes[c],'') for c in fkeydef.getColumns()]
fkeystr = ', '.join(castedFkey)
cat1fkeystr = 'cat1.' + ', cat1.'.join(fkeydef.getColumns())
pkeystr = ', '.join(fkeydef.getPKey())
pkcatname = fkeydef.getPkeyTableName()
qry = fkQuery(catname, pkcatname, fkeystr, pkeystr, pkeys, cat1pkeys)
# Execute the query
try:
db = connect2(GV.cfg[1], utilityMode=False)
curs = db.query(qry)
nrows = curs.ntuples()
if nrows == 0:
logger.info('[OK] Foreign key check for %s(%s) referencing %s(%s)'%
(catname, fkeystr, pkcatname, pkeystr))
else:
GV.testStatus = False
setError(ERROR_NOREPAIR)
GV.foreignKeyStatus = False
logger.info('[FAIL] Foreign key check for %s(%s) referencing %s(%s)'%
(catname, fkeystr, pkcatname, pkeystr))
logger.error(' %s has %d issue(s): entry has NULL reference of %s(%s)'%
(catname, nrows, pkcatname, pkeystr))
fields = curs.listfields()
log_literal(logger,logging.ERROR," " + " | ".join(fields))
for row in curs.getresult():
log_literal(logger,logging.ERROR," " + " | ".join(map(str, row)))
processForeignKeyResult(catname, pkcatname, fields, curs.getresult())
if catname == 'gp_fastsequence' and pkcatname == 'pg_class':
setError(ERROR_REMOVE)
removeFastSequence(db)
except Exception, e:
setError(ERROR_NOREPAIR)
GV.foreignKeyStatus = False
myprint('[ERROR] executing: Foreign key check for ' + catname)
myprint(' Execution error: ' + str(e))
myprint(qry)
#-------------------------------------------------------------------------------
def fkQuery(catname, pkcatname, fkeystr, pkeystr, pkeys, cat1pkeys):
qry = """
SELECT {primary_key_alias}, {cat2_dot_pk},
array_agg(gp_segment_id order by gp_segment_id) as segids
FROM (
SELECT cat1.gp_segment_id, {cat1_dot_pk}, cat1.{FK1} as {cat2_dot_pk}
FROM
gp_dist_random('{CATALOG1}') cat1 LEFT OUTER JOIN
gp_dist_random('{CATALOG2}') cat2
ON (cat1.gp_segment_id = cat2.gp_segment_id AND
cat1.{FK1} = cat2.{PK2} )
WHERE cat2.{PK2} is NULL
AND cat1.{FK1} != 0
UNION ALL
SELECT -1 as gp_segment_id, {cat1_dot_pk}, cat1.{FK1} as {cat2_dot_pk}
FROM
{CATALOG1} cat1 LEFT OUTER JOIN
{CATALOG2} cat2
ON (cat1.gp_segment_id = cat2.gp_segment_id AND
cat1.{FK1} = cat2.{PK2} )
WHERE cat2.{PK2} is NULL
AND cat1.{FK1} != 0
ORDER BY {primary_key_alias}, gp_segment_id
) allresults
GROUP BY {primary_key_alias}, {cat2_dot_pk}
""".format(FK1 = fkeystr,
PK2 = pkeystr,
CATALOG1 = catname,
CATALOG2 = pkcatname,
cat1_dot_pk = ', '.join(cat1pkeys) ,
cat2_dot_pk = '%s_%s' % (pkcatname,pkeystr),
primary_key_alias = ', '.join(pkeys))
return qry
#-------------------------------------------------------------------------------
def checkMissingEntry():
logger.info('-----------------------------------')
logger.info('Performing cross consistency tests: check for missing or extraneous entries')
if GV.missingEntryStatus == None:
GV.missingEntryStatus = True
# looks up information in the catalog:
tables = GV.catalog.getCatalogTables()
for cat in sorted(tables):
checkTableMissingEntry(cat)
#-------------------------------------------------------------------------------
def checkTableMissingEntry(cat):
catname = cat.getTableName()
pkey = cat.getPrimaryKey()
master = cat.isMasterOnly()
isShared = cat.isShared()
coltypes = cat.getTableColtypes()
# Skip master only tables
if master:
return
# skip shared/non-shared tables
if GV.opt['-S']:
if re.match("none", GV.opt['-S'], re.I) and isShared:
return
if re.match("only", GV.opt['-S'], re.I) and not isShared:
return
# Skip catalogs without primary key
if pkey == []:
logger.warn("[WARN] Skipped missing/extra entry check for %s" %
catname)
return
castedPkey = cat.getPrimaryKey()
castedPkey = [ c+autoCast.get(coltypes[c],'') for c in castedPkey ]
if cat.tableHasConsistentOids():
qry = missingEntryQuery(GV.max_content, catname, ['oid'], ['oid'])
else:
qry = missingEntryQuery(GV.max_content, catname, pkey, castedPkey)
# Execute the query
try:
db = connect2(GV.cfg[1], utilityMode=False)
curs = db.query(qry)
nrows = curs.ntuples()
if nrows == 0:
logger.info('[OK] Checking for missing or extraneous entries for ' + catname)
else:
GV.testStatus = False
setError(ERROR_NOREPAIR)
GV.missingEntryStatus = False
logger.info('[FAIL] Checking for missing or extraneous entries for ' + catname)
logger.error(' %s has %d issue(s)' % (catname, nrows))
fields = curs.listfields()
log_literal(logger,logging.ERROR," " + " | ".join(fields))
for row in curs.getresult():
log_literal(logger,logging.ERROR," " + " | ".join(map(str, row)))
processMissingDuplicateEntryResult(catname, fields, curs.getresult(), "missing")
except Exception, e:
setError(ERROR_NOREPAIR)
GV.missingEntryStatus = False
myprint('[ERROR] executing: Missing or extraneous entries check for ' + catname)
myprint(' Execution error: ' + str(e))
myprint(qry)
#-------------------------------------------------------------------------------
def missingEntryQuery(max_content, catname, pkey, castedPkey):
# =================
# Missing / Extra
# =================
# Cross product
# (all unique {primary_key} present on any segment)
# (all unique segment_ids in the system)
# Left join
# (actual ({primary_key}, segment_id) present in the catalog)
#
# Count number not null vs number null in the join:
# If the nulls are <= 1/2 the result report those segments as "missing"
# If the non-nulls are <= 1/2 the result report those segments as "extra"
qry = """
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
-- distribute catalog table from master, so that we can avoid to gather
CREATE TEMPORARY TABLE _tmp_master ON COMMIT DROP AS
SELECT gp_segment_id segid, {primary_key} FROM {catalog};
SELECT {oidcasted_pk}, exists, array_agg(segid order by segid) as segids
FROM
(
SELECT ideal.*,
case when actual.segid is null then 'missing' else 'extra' end as exists,
count(*) over (partition by {primary_key}, actual.segid is null) as subcount
FROM (
SELECT segid, {primary_key}
FROM( SELECT distinct {primary_key} FROM _tmp_master
UNION
SELECT distinct {primary_key} FROM gp_dist_random('{catalog}') ) all_pks,
( SELECT distinct content as segid from gp_segment_configuration) all_segs
) ideal
LEFT OUTER JOIN
( SELECT segid, {primary_key} FROM _tmp_master
UNION ALL
SELECT gp_segment_id as segid, {primary_key} FROM gp_dist_random('{catalog}')
) actual USING (segid, {primary_key})
) missing_extra
WHERE subcount <= ({max_content}+2)/2.0
GROUP BY {oidcasted_pk}, exists;
""".format(oidcasted_pk = ','.join(castedPkey),
primary_key = ','.join(pkey),
catalog = catname,
max_content = max_content)
return qry
def transformTextArrayCols(catname, columns, colnames, coltypes):
# Distributing by hash(text[]) is potentially dangerous,
# so if the table has text[] column, then we need array_to_string
#
# We should fix the text[] hash expression issue, but
# for this script, this seems like a reasonable workaround.
transformed = []
for i in range(len(columns)):
if coltypes[colnames[i]] == '_text':
transformed.append("array_to_string(" + columns[i] + ", ',')")
else:
transformed.append(columns[i])
return transformed
#-------------------------------------------------------------------------------
def checkInconsistentEntry():
logger.info('-----------------------------------')
logger.info('Performing cross consistency test: check for inconsistent entries')
if GV.inconsistentEntryStatus == None:
GV.inconsistentEntryStatus = True
# looks up information in the catalog:
tables = GV.catalog.getCatalogTables()
for cat in sorted(tables):
checkTableInconsistentEntry(cat)
#-------------------------------------------------------------------------------
def checkTableInconsistentEntry(cat):
catname = cat.getTableName()
pkey = cat.getPrimaryKey()
master = cat.isMasterOnly()
isShared = cat.isShared()
columns = cat.getTableColumns(with_acl=False)
coltypes = cat.getTableColtypes()
# Skip master only tables
if master:
return
# skip shared/non-shared tables
if GV.opt['-S']:
if re.match("none", GV.opt['-S'], re.I) and isShared:
return
if re.match("only", GV.opt['-S'], re.I) and not isShared:
return
# Skip catalogs without primary key
if pkey == []:
logger.warn("[WARN] Skipped cross consistency check for %s" %
catname)
return
castedPkey = cat.getPrimaryKey()
castedPkey = [ c+autoCast.get(coltypes[c],'') for c in castedPkey ]
castcols = [ c+autoCast.get(coltypes[c],'') for c in columns ]
# transform text[] cols for sure. See comments on transformTextArrayCols
castcols = transformTextArrayCols(catname, castcols, columns, cat.getTableColtypes())
castcols = [castcols[i] + ' AS ' + columns[i] for i in range(len(columns))]
if cat.tableHasConsistentOids():
qry = inconsistentEntryQuery(GV.max_content, catname, ['oid'], columns, castcols)
else:
qry = inconsistentEntryQuery(GV.max_content, catname, castedPkey, columns, castcols)
# Execute the query
try:
db = connect2(GV.cfg[1], utilityMode=False)
curs = db.query(qry)
nrows = curs.ntuples()
if nrows == 0:
logger.info('[OK] Checking for inconsistent entries for ' + catname)
else:
GV.testStatus = False
setError(ERROR_NOREPAIR)
GV.inconsistentEntryStatus = False
logger.info('[FAIL] Checking for inconsistent entries for ' + catname)
logger.error(' %s has %d issue(s)' % (catname, nrows))
fields = curs.listfields()
log_literal(logger,logging.ERROR," " + " | ".join(fields))
for row in curs.getresult():
log_literal(logger,logging.ERROR," " + " | ".join(map(str, row)))
processInconsistentEntryResult(catname, pkey, fields, curs.getresult())
except Exception, e:
setError(ERROR_NOREPAIR)
GV.inconsistentEntryStatus = False
myprint('[ERROR] executing: Inconsistent entries check for ' + catname)
myprint(' Execution error: ' + str(e))
myprint(qry)
#-------------------------------------------------------------------------------
def inconsistentEntryQuery(max_content, catname, pkey, columns, castcols):
#-- ==============
#-- Inconsistent
#-- ==============
#-- Build set of all values in the instance
#--
#-- Count number of unique values for the primary key
#-- - Ignore rows where this does not equal number of segments
#-- (These are the missing/extra rows and are covered by other checks)
#--
#-- Count number of unique values for all columns
#-- - Ignore rows where this equals number of segments
#--
#-- Group the majority opinion into one bucket and report everyone who disagrees with
#-- the majority individually.
#--
#-- Majority gets reported with gp_segment_id == null
#--
#-- SET GUC due to MPP-14531:
qry = """
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
SET gp_enable_mk_sort=off;
-- distribute catalog table from master, so that we can avoid to gather
CREATE TEMPORARY TABLE _tmp_master ON COMMIT DROP AS
SELECT gp_segment_id segid, {columns} FROM {catalog};
SELECT {columns}, array_agg(gp_segment_id order by gp_segment_id) as segids
FROM
(
SELECT DISTINCT
case when dcount <= ({max_content}+2)/2.0 then gp_segment_id else null end as gp_segment_id, {columns}
FROM
(
select count(*) over (partition by {columns}) as dcount,
count(*) over (partition by {pkey}) as pcount,
gp_segment_id, {columns}
from (
select segid gp_segment_id, {castcols} from _tmp_master
union all
select gp_segment_id, {castcols}
from gp_dist_random('{catalog}')
) all_segments
) counted_segments
WHERE dcount != {max_content}+2 and pcount = {max_content}+2
ORDER BY {pkey}, gp_segment_id
) rowresult
GROUP BY {columns}
ORDER BY {pkey}, segids;
""".format(pkey = ','.join(pkey),
catalog = catname,
columns = ','.join(columns),
castcols = ','.join(castcols),
max_content = max_content)
return qry
#-------------------------------------------------------------------------------
def checkDuplicateEntry():
logger.info('-----------------------------------')
logger.info('Performing test: checking for duplicate entries')
# looks up information in the catalog:
tables = GV.catalog.getCatalogTables()
for cat in sorted(tables):
checkTableDuplicateEntry(cat)
#-------------------------------------------------------------------------------
def checkTableDuplicateEntry(cat):
catname = cat.getTableName()
pkey = cat.getPrimaryKey()
master = cat.isMasterOnly()
isShared = cat.isShared()
columns = cat.getTableColumns(with_acl=False)
coltypes = cat.getTableColtypes()
# Skip master only tables
if master:
return
# skip shared/non-shared tables
if GV.opt['-S']:
if re.match("none", GV.opt['-S'], re.I) and isShared:
return
if re.match("only", GV.opt['-S'], re.I) and not isShared:
return
# Skip catalogs without primary key
if pkey == []:
logger.warn("[WARN] Skipped duplicate check for %s" %
catname)
return
pkey = [ c+autoCast.get(coltypes[c],'') for c in pkey ]
if cat.tableHasConsistentOids():
qry = duplicateEntryQuery(catname, ['oid'])
else:
qry = duplicateEntryQuery(catname, pkey)
# Execute the query
try:
db = connect2(GV.cfg[1], utilityMode=False)
curs = db.query(qry)
nrows = curs.ntuples()
if nrows == 0:
logger.info('[OK] Checking for duplicate entries for ' + catname)
else:
GV.testStatus = False
setError(ERROR_NOREPAIR)
logger.error('[FAIL] Checking for duplicate entries for ' + catname)
logger.error(' %s has %d issue(s)' % (catname, nrows))
fields = curs.listfields()
log_literal(logger,logging.ERROR," " + " | ".join(fields))
for row in curs.getresult():
log_literal(logger,logging.ERROR," " + " | ".join(map(str, row)))
processMissingDuplicateEntryResult(catname, fields, curs.getresult(), "duplicate")
except Exception, e:
setError(ERROR_NOREPAIR)
myprint('[ERROR] executing: duplicate entries check for ' + catname)
myprint(' Execution error: ' + str(e))
myprint(qry)
#-------------------------------------------------------------------------------
def duplicateEntryQuery(catname, pkey):
#-- ===========
#-- Duplicate
#-- ===========
#-- Return any rows having count > 1 for a given segid, {unique_key} pair
#--
qry = """
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
-- distribute catalog table from master, so that we can avoid to gather
CREATE TEMPORARY TABLE _tmp_master ON COMMIT DROP AS
SELECT gp_segment_id segid, {pkey} FROM {catalog};
SELECT {pkey}, total, array_agg(segid order by segid) as segids
FROM (
SELECT segid, {pkey}, count(*) as total
FROM (
select segid, {pkey} FROM _tmp_master
union all
select gp_segment_id as segid, {pkey} FROM gp_dist_random('{catalog}')
) all_segments
GROUP BY segid, {pkey}
HAVING count(*) > 1
) rowresult
GROUP BY {pkey}, total
""".format(catalog = catname,
pkey = ','.join(pkey))
return qry
#-------------------------------------------------------------------------------
def checkDuplicatePersistentEntry():
"""
MPP-18178: check duplicate entries in gp_persistent_relation_node
have the same: tablespace_oid, database_oid, relfilenode_oid, segment_file_num
except mirror_existence_state != 6 (MIRROR DROP PENDING)
"""
catname = 'gp_persistent_relation_node'
fields = ['tablespace_oid', 'database_oid', 'relfilenode_oid', 'segment_file_num']
excol = 'mirror_existence_state'
state = 6
qry = duplicatePersistentEntryQuery(catname, fields, excol, state)
try:
db = connect2(GV.cfg[1], utilityMode=False)
curs = db.query(qry)
nrows = curs.ntuples()
if nrows == 0:
logger.info('[OK] Checking for duplicate persistent entries for ' + catname)
else:
GV.testStatus = False
setError(ERROR_NOREPAIR)
logger.error('[FAIL] Checking for duplicate persistent entries for ' + catname)
logger.error(' %s has %d issue(s)' % (catname, nrows))
fields = curs.listfields()
log_literal(logger, logging.ERROR, " " + " | ".join(fields))
for row in curs.getresult():
log_literal(logger, logging.ERROR, " " + " | ".join(map(str, row)))
processMissingDuplicateEntryResult(catname, fields, curs.getresult(), "duplicate")
except Exception, e:
setError(ERROR_NOREPAIR)
myprint('[ERROR] Executing: duplicate persistent entries check for ' + catname)
myprint(' Execution error: ' + str(e))
myprint(qry)
def duplicatePersistentEntryQuery(catname, pkey, excol, state):
"""
Check for duplicate persistent table entries:
the query is similar to the duplicate catalog entries query, except an extra condition
and the check is limited to the connected database for reporting purposes
"""
qry = """
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
-- distribute catalog table from master, so that we can avoid to gather
CREATE TEMPORARY TABLE _tmp_master ON COMMIT DROP AS
SELECT gp_segment_id segid, {pkey}, {excol} FROM {catalog};
SELECT {pkey}, total, array_agg(segid order by segid) as segids
FROM (
SELECT segid, {pkey}, count(*) as total
FROM (
select segid, {pkey} from _tmp_master
where {excol} != {state}
union all
select gp_segment_id as segid, {pkey} from gp_dist_random('{catalog}')
where {excol} != {state}
) all_segments
LEFT OUTER JOIN pg_database d ON (all_segments.database_oid = d.oid)
WHERE d.datname = '{dbname}'
GROUP BY segid, {pkey}
HAVING count(*) > 1
) rowresult
GROUP BY {pkey}, total
""".format(catalog=catname, pkey=','.join(pkey), excol=excol, state=state, dbname=GV.dbname)
return qry
############################################################################
# version = X.X : run this test as part of running all tests on gpdb <= X.X
# individually, any test should be ok run on any version
# online = True: okie to run gpcheckcat online
# order = X : the order test should be run when running all tests
############################################################################
name_test = {
"duplicate":
{
"description": "Check for duplicate entries",
"fn": lambda: checkDuplicateEntry(),
"version": 'main',
"order": 1,
"online": True
},
"missing_extraneous":
{
"description": "Cross consistency check for missing or extraneous entries",
"fn": lambda: checkMissingEntry(),
"version": 'main',
"order": 2,
"online": True
},
"inconsistent":
{
"description": "Cross consistency check for master segment inconsistency",
"fn": lambda: checkInconsistentEntry(),
"version": 'main',
"order": 3,
"online": True
},
"foreign_key":
{
"description": "Check foreign keys",
"fn": lambda: checkForeignKey(),
"version": 'main',
"order": 4,
"online": True
},
"acl":
{
"description": "Cross consistency check for access control privileges",
"fn": lambda: checkACL(),
"version": 'main',
"order": 5,
"online": True
},
"persistent":
{
"description": "Check persistent tables",
"fn": lambda: checkPersistentTables(),
"version": 'main',
"order": 6,
"online": False
},
"pgclass":
{
"description": "Check pg_class entry that does not have any correspond pg_attribute entry",
"fn": lambda: checkPGClass(),
"version": 'main',
"order": 7,
"online": False
},
"namespace":
{
"description": "Check for leaked temporary schema and missing schema definition",
"fn": lambda: checkPGNamespace(),
"version": 'main',
"order": 8,
"online": False
},
"distribution_policy":
{
"description": "Check constraints on randomly distributed tables",
"fn": lambda: checkDistribPolicy(),
"version": 'main',
"order": 9,
"online": False
},
"dependency":
{
"description": "Check for dependency on non-existent objects",
"fn": lambda: checkDepend(),
"version": 'main',
"order": 10,
"online": False
},
"owner":
{
"description": "Check table ownership that is inconsistent with the master database",
"fn": lambda: checkOwners(),
"version": 'main',
"order": 11,
"online": True
},
"part_integrity":
{
"description": "Check pg_partition branch integrity, partition with oids, partition distribution policy",
"fn": lambda: checkPartitionIntegrity(),
"version": 'main',
"order": 13,
"online": True
},
"part_constraint":
{
"description": "Check constraints on partitioned tables",
"fn": lambda: checkPartitionRegularity(),
"version": 'main',
"order": 14,
"online": True
},
"duplicate_persistent":
{
"description": "Check for duplicate gp_persistent_relation_node entries",
"fn": lambda: checkDuplicatePersistentEntry(),
"version": 'main',
"order": 16,
"online": True
}
}
#-------------------------------------------------------------------------------
def listAllTests():
myprint('\nList of gpcheckcat tests:\n')
for name in sorted(name_test, key=lambda x: name_test[x]["order"]):
myprint(" %24s: %s" % \
(name, name_test[name]["description"]))
myprint('')
#-------------------------------------------------------------------------------
def runTestCatname(cat):
tests = {'missing_extraneous': lambda: checkTableMissingEntry(cat),
'inconsistent': lambda: checkTableInconsistentEntry(cat),
'foreign_key': lambda: checkTableForeignKey(cat),
'duplicate': lambda: checkTableDuplicateEntry(cat),
'acl': lambda: checkTableACL(cat)}
for t in sorted(tests):
myprint("Performing test '%s' for %s" % (t,cat.getTableName()))
stime = time.time()
tests[t]()
etime = time.time()
elapsed = etime - stime
GV.elapsedTime += elapsed
elapsed = str(datetime.timedelta(seconds=elapsed))[:-4]
GV.totalTestRun += 1
myprint('Total runtime for this test: %s' % (elapsed))
#-------------------------------------------------------------------------------
def runOneTest(name):
if not name_test.has_key(name):
logger.info("'%s' is not a valid test" % (name))
raise LookupError
if GV.opt['-O'] and not name_test[name]["online"]:
logger.info("%s: Skip this test in online mode" % (name))
return
else:
myprint("Performing test '%s'" % name)
GV.totalTestRun += 1
GV.testStatus = True
stime = time.time()
name_test[name]["fn"]()
etime = time.time()
elapsed = etime - stime
GV.elapsedTime += elapsed
elapsed = str(datetime.timedelta(seconds=elapsed))[:-4]
myprint('Total runtime for this test: %s' % (elapsed))
if GV.testStatus == False:
GV.failedTest.append(name)
#-------------------------------------------------------------------------------
def runAllTests():
'''
perform catalog check for specified database
'''
for name in sorted(name_test, key=lambda x: name_test[x]["order"]):
if name_test[name]["version"] >= GV.version:
runOneTest(name)
closeDbs()
logger.info("------------------------------------")
fixes = (len(GV.Remove) > 0 or
len(GV.AdjustConname) > 0 or
len(GV.DemoteConstraint) > 0 or
len(GV.ReSync) > 0 or
len(GV.Reindex) > 0 or
len(GV.Constraints) > 0 or
len(GV.Owners) > 0 or
len(GV.Schemas) > 0 or
len(GV.Policies) > 0)
if GV.opt['-g'] != None and fixes:
try:
if not os.path.exists(GV.opt['-g']):
os.mkdir(GV.opt['-g'])
except Exception, e:
logger.fatal('Unable to create directory "%s": %s' %
(GV.opt['-g'], str(e)))
sys.exit(1)
logger.debug('Building catalog repair scripts')
# set allow_system_table_mods GUC for gpdb >= 4.2
catmod_guc = ""
if GV.version >= "4.2":
catmod_guc = "-c allow_system_table_mods=dml"
# build a script to run these files
script = '#!/bin/bash\ncd $(dirname $0)\n'
# don't remove anything if ReSync possible --
# comment out the delete statements from the repair scripts
maybeRemove = "";
if len(GV.ReSync):
maybeRemove="# "
# use the per-dbid loop for removal and for contraint
# name adjustments
dbids = set(GV.Remove.keys())
dbids = dbids.union(GV.AdjustConname.keys())
dbids = dbids.union(GV.DemoteConstraint.keys())
for seg in dbids:
# dbid.host.port.dbname.sql
c = GV.cfg[seg]
filename = '%i.%s.%i.%s.sql' % (seg, c['hostname'],
c['port'], GV.dbname)
filename = filename.replace(' ', '_')
fullpath = '%s/%s' % (GV.opt['-g'], filename)
try:
file = open(fullpath, 'w')
except Exception, e:
logger.fatal('Unable to create file "%s": %s' %
(fullpath, str(e)))
sys.exit(1)
#unique
lines = set()
if GV.Remove.has_key(seg):
lines = lines.union(GV.Remove[seg])
if GV.AdjustConname.has_key(seg):
lines = lines.union(GV.AdjustConname[seg])
if GV.DemoteConstraint.has_key(seg):
lines = lines.union(GV.DemoteConstraint[seg])
# make sure it is sorted
li = list(lines)
li.sort()
file.write('BEGIN;\n');
for line in li:
file.write(line + '\n')
file.write('COMMIT;\n');
file.close()
script += '\n%secho "Repairing segment %i"' % (maybeRemove, seg)
script += '''
{maybe}env PGOPTIONS="-c gp_session_role=utility {guc}" psql -X -a -h {hostname} -p {port} -f {fname} "{dbname}" > {fname}.out
'''.format(
maybe = maybeRemove,
guc = catmod_guc,
hostname = c['hostname'],
port = c['port'],
fname = filename,
dbname = GV.dbname)
if len(GV.Reindex) > 0:
filename = 'reindex.sql'
fullpath = '%s/%s' % (GV.opt['-g'], filename)
try:
file = open(fullpath, 'w')
except Exception, e:
logger.fatal('Unable to create file "%s": %s' %
(fullpath, str(e)))
sys.exit(1)
for r in GV.Reindex:
file.write('REINDEX INDEX %s;\n' % r)
file.close()
script += 'echo "Reindexing potentially damaged bitmap indexes"\n'
c = GV.cfg[1]
script += 'psql -X -a -h %s -p %i -f %s "%s" > %s.out 2>&1\n' % \
(c['hostname'], c['port'], filename, GV.dbname, filename)
if len(GV.Constraints) > 0:
GV.Constraints = list(set(GV.Constraints))
filename = 'removeconstraints.sql'
fullpath = '%s/%s' % (GV.opt['-g'], filename)
try:
file = open(fullpath, 'w')
except Exception, e:
logger.fatal('Unable to create file "%s": %s' %
(fullpath, str(e)))
sys.exit(1)
for r in GV.Constraints:
file.write('%s\n' % r)
file.close()
script += 'echo "Dropping invalid unique constraints"\n'
c = GV.cfg[1]
script += 'psql -X -a -h %s -p %i -f %s "%s" > %s.out 2>&1\n' % \
(c['hostname'], c['port'], filename, GV.dbname, filename)
if len(GV.Owners) > 0:
# Previously we removed duplicate statements from the "Owners" list
# by casting it through a "list(set(...))", however this does not work
# since the order of items in the list is important for correct
# results.
filename = 'fixowners.sql'
fullpath = '%s/%s' % (GV.opt['-g'], filename)
try:
file = open(fullpath, 'w')
except Exception, e:
logger.fatal('Unable to create file "%s": %s' %
(fullpath, str(e)))
sys.exit(1)
for r in GV.Owners:
file.write('%s\n' % r)
file.close()
script += 'echo "Correcting table ownership"\n'
c = GV.cfg[1]
script += 'psql -X -a -h %s -p %i -f %s "%s" > %s.out 2>&1\n' % \
(c['hostname'], c['port'], filename, GV.dbname, filename)
if len(GV.Schemas) > 0:
# Remove leaked temporary schemas
filename = 'fixnamespace.sql'
fullpath = '%s/%s' % (GV.opt['-g'], filename)
try:
file = open(fullpath, 'w')
except Exception, e:
logger.fatal('Unable to create file "%s": %s' %
(fullpath, str(e)))
sys.exit(1)
for s in GV.Schemas:
file.write('DROP SCHEMA IF EXISTS "%s" CASCADE;\n' % s)
file.close()
script += 'echo "Dropping temporary schemas"\n'
c = GV.cfg[1]
script += 'psql -X -a -h %s -p %i -f %s "%s" > %s.out 2>&1\n' % \
(c['hostname'], c['port'], filename, GV.dbname, filename)
if len(GV.Policies) > 0:
# changes to distribution policies
filename = 'fixdistribution.sql'
fullpath = '%s/%s' % (GV.opt['-g'], filename)
try:
file = open(fullpath, 'w')
except Exception, e:
logger.fatal('Unable to create file "%s": %s' %
(fullpath, str(e)))
sys.exit(1)
for r in GV.Policies:
file.write('%s\n' % r)
file.close()
script += 'echo "Fixing distribution policies"\n'
c = GV.cfg[1]
script += 'psql -X -a -h %s -p %i -f %s "%s" > %s.out 2>&1\n' % \
(c['hostname'], c['port'], filename, GV.dbname, filename)
try:
path = '%s/runsql.sh' % GV.opt['-g']
file = open(path, 'w')
file.write(script)
file.close()
os.chmod(path, stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR)
except Exception, e:
logger.fatal('Could not write script: %s' % str(e))
sys.exit(1)
myprint('')
myprint('[INFO]: repair scripts generated in directory %s' % GV.opt['-g'])
if GV.retcode >= ERROR_NOREPAIR:
logger.warn('[WARN]: unable to generate repairs for some issues')
logger.info("Check complete")
#-------------------------------------------------------------------------------
def getCatalog():
# Establish a connection to the master & looks up info in the catalog
db = connect2(GV.cfg[1], utilityMode=False)
return GPCatalog(db)
#-------------------------------------------------------------------------------
def getReportConfiguration():
cfg = {}
for dbid, each in GV.cfg.iteritems():
if each['content'] == -1:
segname = 'master'
else:
segname = 'seg'+str(each['content']+1)
cfg[each['content']] = {'segname': segname,
'hostname': each['hostname'],
'port': each['port']}
return cfg
#===============================================================================
# GPCHECKCAT REPORT
#===============================================================================
GPObjects = {} # key = (catname, oid), value = GPObject
GPObjectGraph = {} # key = GPObject, value=[GPObject]
#-------------------------------------------------------------------------------
# TableMainColumn[catname] = [colname, catname2]
# - report this catname's issues as part of catname2
# - catname.colname references catname2.oid
#-------------------------------------------------------------------------------
TableMainColumn = {}
# Table with no OID
TableMainColumn['gp_verification_history'] = ['vertoken','gp_verification_history']
TableMainColumn['gp_version_at_initdb'] = ['schemaversion', 'gp_version_at_initdb']
TableMainColumn['pg_aggregate'] = ['aggfnoid','pg_proc']
TableMainColumn['pg_amop'] = ['amopclaid','pg_opclass']
TableMainColumn['pg_amproc'] = ['amopclaid','pg_opclass']
TableMainColumn['pg_appendonly'] = ['relid','pg_class']
TableMainColumn['pg_appendonly_alter_column'] = ['relid','pg_class']
TableMainColumn['pg_attribute'] = ['attrelid','pg_class']
TableMainColumn['pg_attribute_encoding'] = ['attrelid', 'pg_class']
TableMainColumn['pg_auth_member'] = ['roleid','pg_authid']
TableMainColumn['pg_autovacuum'] = ['vacrelid','pg_class']
TableMainColumn['pg_exttable'] = ['reloid','pg_class']
TableMainColumn['pg_foreign_table'] = ['reloid','pg_class']
TableMainColumn['pg_index'] = ['indexrelid','pg_class']
TableMainColumn['pg_inherits'] = ['inhrelid','pg_class']
TableMainColumn['pg_largeobject'] = ['loid','pg_largeobject']
TableMainColumn['pg_pltemplate'] = ['tmplname', 'pg_pltemplate']
TableMainColumn['pg_proc_callback'] = ['profnoid', 'pg_proc']
TableMainColumn['pg_type_encoding'] = ['typid', 'pg_type']
TableMainColumn['pg_window'] = ['winfnoid','pg_proc']
# Table with OID (special case), these OIDs are known to be inconsistent
TableMainColumn['pg_attrdef'] = ['adrelid','pg_class']
TableMainColumn['pg_constraint'] = ['conrelid','pg_class']
TableMainColumn['pg_trigger'] = ['tgrelid','pg_class']
TableMainColumn['pg_rewrite'] = ['ev_class','pg_class']
# Persistent table
TableMainColumn['gp_persistent_relation_node'] = ['relfilenode_oid', 'pg_class']
# Cast OID alias type to OID since our graph of objects is based on OID
# int2vector is also casted to int2[] due to the lack of an <(int2vector, int2vector) operator
autoCast = {
'regproc': '::oid',
'regprocedure': '::oid',
'regoper': '::oid',
'regoperator': '::oid',
'regclass': '::oid',
'regtype': '::oid',
'int2vector': '::int2[]'
}
#-------------------------------------------------------------------------------
class QueryException(Exception): pass
#-------------------------------------------------------------------------------
# Get gpObj from GPObjects, instantiate a new one & add to GPObjects if not found
def getGPObject(oid, catname):
gpObj = GPObjects.get((oid, catname),None)
if gpObj is None:
if catname == 'pg_class':
gpObj = RelationObject(oid, catname)
else:
gpObj = GPObject(oid, catname)
GPObjects[(oid,catname)] = gpObj
return gpObj
#-------------------------------------------------------------------------------
def getOidFromPK(catname, pkeys):
# pkeys: name-value pair
pkeystrList = ["%s='%s'" % (key,value) for key, value in pkeys.iteritems()]
pkeystr = ' and '.join(pkeystrList)
qry = """
SELECT oid
FROM (
SELECT oid FROM {catname}
WHERE {pkeystr}
UNION ALL
SELECT oid FROM gp_dist_random('{catname}')
WHERE {pkeystr}
) alloids
GROUP BY oid
ORDER BY count(*) desc, oid
""" .format(catname = catname,
pkeystr = pkeystr)
try:
db = connect2(GV.cfg[1], utilityMode=False)
curs = db.query(qry)
if (len(curs.dictresult()) == 0):
raise QueryException("No such entry '%s' in %s" % (pkeystr,catname))
return curs.dictresult().pop()['oid']
except Exception, e:
setError(ERROR_NOREPAIR)
myprint(' Execution error: ' + str(e))
myprint(qry)
#-------------------------------------------------------------------------------
# TODO: use execSQLForSingletonRow
#-------------------------------------------------------------------------------
def getClassOidForType(oid):
# For FK check: pg_type entry is missing, so we need to use pg_class
qry = """
SELECT oid
FROM (
SELECT oid FROM pg_class WHERE reltype=%d
UNION ALL
SELECT oid FROM gp_dist_random('pg_class') WHERE reltype=%d
) alltyprelids
GROUP BY oid ORDER BY count(*) desc LIMIT 1
""" % (oid, oid)
try:
db = connect()
curs = db.query(qry)
if len(curs.dictresult()) == 0: return 0
return curs.dictresult().pop()['oid']
except Exception, e:
setError(ERROR_NOREPAIR)
myprint(' Execution error: ' + str(e))
#-------------------------------------------------------------------------------
def getClassOidForRelfilenode(relfilenode):
qry = "SELECT oid FROM pg_class WHERE relfilenode = %d;" % (relfilenode)
try:
dburl = dbconn.DbURL(hostname=GV.opt['-h'], port=GV.opt['-p'], dbname=GV.dbname)
conn = dbconn.connect(dburl)
oid = dbconn.execSQLForSingletonRow(conn, qry)[0]
return oid
except Exception, e:
setError(ERROR_NOREPAIR)
myprint(' Execution error: ' + str(e))
#-------------------------------------------------------------------------------
# Process results of tests (CC and FK) for a particular catalog:
# - Categorize entry into GPObject
# - Instantiate GPObject if needed
#-------------------------------------------------------------------------------
def processMissingDuplicateEntryResult(catname, colname, allValues, type):
# type = {"missing" | "duplicate"}
'''
colname: proname | pronamespace | proargtypes | exists | segids
allValues: add | 2200 | 23 23 | extra | {2,3}
scube | 2200 | 1700 | missing | {2}
scube_accum | 2200 | 1700 1700 | missing | {2}
colname: oid | total | segids
allValues: 18853 | 2 | {-1,1}
18853 | 3 | {0}
'''
gpObjName = catname
gpColName = None
pknames = [i for i in colname[:-2]] # Everything except the last two columns
# This catname may not be a GPObject (e.g. pg_attribute)
# If these catnames have oids, they are known to be inconsistent
if catname in TableMainColumn:
gpColName = TableMainColumn[catname][0]
gpObjName = TableMainColumn[catname][1]
elif 'oid' in pknames:
gpColName = 'oid'
# Process entries
for row in allValues:
rowObjName = gpObjName
segids = row[-1]
pkeys = dict((pk, row[colname.index(pk)]) for pk in pknames)
if gpColName != None:
oid = row[colname.index(gpColName)]
else:
oid = getOidFromPK(catname, pkeys)
pkeys = {'oid': oid}
# Special case: constraint for domain, report pg_type.contypid
if catname == 'pg_constraint' and oid == 0:
oid = row[colname.index('contypid')]
rowObjName = 'pg_type'
# Special case: if composite type, report as part of RelationObject
if rowObjName == 'pg_type':
typrelid = getClassOidForType(oid)
if typrelid != 0:
oid = typrelid
rowObjName = 'pg_class'
# Special case: persistent table
if catname == 'gp_persistent_relation_node':
relfilenode = oid
oid = getClassOidForRelfilenode(relfilenode)
gpObj = getGPObject(oid, rowObjName)
if type == "missing":
issue = CatMissingIssue(catname, pkeys, segids, row[-2])
gpObj.setMissingIssue(issue)
else:
assert (type == "duplicate")
issue = CatDuplicateIssue(catname, pkeys, segids, row[-2])
gpObj.setDuplicateIssue(issue)
#-------------------------------------------------------------------------------
def processInconsistentEntryResult(catname, pknames, colname, allValues):
# If tableHasInconsistentOid, columns does not have oid
'''
17365 | test10 | 2200 | 17366 | 0 | 0 | 17366 | 0 | 0 | 0 | f | r | h | 3 | 0 | 0 | 0 | 0 | 0 | f | f | f | f | None | {0} -- row1
17365 | test10 | 2200 | 17366 | 0 | 0 | 0 | 0 | 0 | 0 | f | r | h | 2 | 0 | 0 | 0 | 0 | 0 | f | f | f | f | None | {NULL} -- row2
176954 | test1 | 2200 | 176955 | 0 | 0 | 176956 | 0 | 0 | 0 | f | r | h | 4 | 0 | 0 | 0 | 0 | 0 | f | f | f | f | None | {-1}
176954 | test1 | 2200 | 176955 | 0 | 0 | 176956 | 0 | 0 | 0 | f | r | h | 2 | 0 | 0 | 0 | 0 | 0 | f | f | f | f | None | {0,1}
176954 | test1 | 2200 | 176955 | 0 | 0 | 176956 | 0 | 0 | 0 | f | r | h | 3 | 0 | 0 | 0 | 0 | 0 | f | f | f | f | None | {2,3}
'''
# Group allValues rows by its key (oid or primary key) into a dictionary:
groupedValues = {}
for row in allValues:
if 'oid' in colname:
keys = row[colname.index('oid')]
else:
keys = tuple((row[colname.index(n)]) for n in pknames)
if keys in groupedValues:
groupedValues[keys].append(row)
else:
groupedValues[keys] = [row]
gpObjName = catname
gpColName = None
# This catname may not be a GPObject (e.g. pg_attribute)
# If these catnames have oids, they are known to be inconsistent
if catname in TableMainColumn:
gpColName = TableMainColumn[catname][0]
gpObjName = TableMainColumn[catname][1]
elif 'oid' in colname:
gpColName = 'oid'
for keys,values in groupedValues.iteritems():
rowObjName = gpObjName
pkeys = dict((n,values[0][colname.index(n)]) for n in pknames)
if gpColName != None:
oid = values[0][colname.index(gpColName)]
else:
oid = getOidFromPK(catname, pkeys)
issue = CatInconsistentIssue(catname, colname, list(values))
# Special case: constraint for domain, report pg_type.contypid
if catname == 'pg_constraint' and oid == 0:
oid = values[0][colname.index('contypid')]
rowObjName = 'pg_type'
# Special case: if composite type, report as part of RelationObject
if rowObjName == 'pg_type':
typrelid = getClassOidForType(oid)
if typrelid != 0:
oid = typrelid
rowObjName = 'pg_class'
gpObj = getGPObject(oid, rowObjName)
gpObj.setInconsistentIssue(issue)
#-------------------------------------------------------------------------------
def processForeignKeyResult(catname, pkcatname, colname, allValues):
'''
colname: pg_class_relname | pg_class_relnamespace | pg_type_oid | segids
allValues: test11 | 2200 | 17389 | {-1,0,1,2,3}
test4 | 2200 | 17077 | {-1,0,1,2,3}
'''
gpObjName = pkcatname
gpColName = colname[-2].rsplit('_',1)[1]
fkeytab = catname
fkeylist = [name.rsplit('_',1)[1] for name in colname[:-2]]
# This catname may not be a GPObject (e.g. pg_attribute)
# 1. pg_attrdef(adrelid) referencing pg_attribute(attrelid)
# 2. pg_rewrite(ev_class) referencing pg_attribute(attrelid)
if pkcatname in TableMainColumn:
if gpColName == TableMainColumn[catname][0]:
gpObjName = TableMainColumn[catname][1]
# Process each row
for row in allValues:
rowObjName = gpObjName
segids = row[-1]
oid = row[len(colname)-2]
# Build fkeys dict: used to find oid of pg_class
fkeys = dict((n,row[colname.index(fkeytab+'_'+n)]) for n in fkeylist)
issue = CatForeignKeyIssue(pkcatname, {gpColName: oid}, segids, catname, fkeys)
# Special cases:
# 1. pg_class(oid) referencing pg_type(oid) - relation & composite type
if pkcatname == 'pg_type' and fkeytab == 'pg_class':
rowObjName = 'pg_class'
oid = getOidFromPK(rowObjName, fkeys)
gpObj = getGPObject(oid, rowObjName)
gpObj.setForeignKeyIssue(issue)
#-------------------------------------------------------------------------------
def processACLResult(catname, colname, allValues):
'''
colname: segid | datname | master_acl | segment_acl
allValues: 2 | acldb | {=Tc/nmiller,nmiller=CTc/nmiller,person1=C/nmiller} | {=Tc/nmiller,nmiller=CTc/nmiller}
1 | gptest | None | {=Tc/nmiller,nmiller=CTc/nmiller,person2=CTc/nmiller}
'''
gpObjName = catname
gpColName = None
pknames = [i for i in colname[1:-2]]
# We have to look up key to report (pg_pltemplate)
if catname in TableMainColumn:
gpColName = TableMainColumn[catname][0]
gpObjName = TableMainColumn[catname][1]
elif 'oid' in pknames:
gpColName = 'oid'
# Process entries
for row in allValues:
segid = row[0]
pkeys = dict((pk, row[colname.index(pk)]) for pk in pknames)
if gpColName != None:
oid = row[colname.index(gpColName)]
else:
oid = getOidFromPK(catname, pkeys)
pkeys = {'oid': oid}
macl = row[-2][1:-1].split(',') if row[-2] != None else []
sacl = row[-1][1:-1].split(',') if row[-1] != None else []
macl_only = list(set(macl).difference(sacl))
sacl_only = list(set(sacl).difference(macl))
issue = CatACLIssue(catname, pkeys, segid, macl_only, sacl_only)
gpObj = getGPObject(oid, gpObjName)
gpObj.setACLIssue(issue)
#-------------------------------------------------------------------------------
class CatInconsistentIssue:
def __init__(self, catname, columns, rows):
self.catname = catname
self.columns = columns
self.rows = rows
def report(self):
def format_segids(segids):
if segids == '{NULL}':
idstr = 'all other segments'
else:
ids = [int(i) for i in segids[1:-1].split(',')]
idstr = ''
for i in ids:
idstr += '%s (%s:%d) ' % \
(GV.report_cfg[i]['segname'], GV.report_cfg[i]['hostname'],
GV.report_cfg[i]['port'])
return idstr
for i in range(0,len(self.columns)-1):
colset = set(j[i] for j in self.rows)
if len(colset) > 1:
for row in self.rows:
myprint("%20s is '%s' on %s" % (self.columns[i],row[i],format_segids(row[-1])))
#-------------------------------------------------------------------------------
class CatForeignKeyIssue:
def __init__(self, catname, pkeys, segids, fcatname, fkeys):
self.catname = catname
self.pkeys = pkeys # string of pkey=xxxx or oid=xxxx
self.segids = segids # list of affected segids
self.fcatname = fcatname # checked object catname
self.fkeys = fkeys # string of fkeys=xxxx
def report(self):
ids = [int(i) for i in self.segids[1:-1].split(',')]
idstr = ''
if len(ids) == len(GV.report_cfg):
idstr = 'all segments'
else:
for i in ids:
idstr += '%s (%s:%d) ' % \
(GV.report_cfg[i]['segname'], GV.report_cfg[i]['hostname'],
GV.report_cfg[i]['port'])
if self.fcatname == 'pg_attribute':
myprint(" No %s entry for %s column '%s' on %s" \
% (self.catname, self.fcatname,self.fkeys['attname'],idstr))
else:
myprint(" No %s entry for %s %s on %s" \
% (self.catname, self.fcatname,self.fkeys,idstr))
#-------------------------------------------------------------------------------
class CatMissingIssue:
def __init__(self, catname, pkeys, segids, type ):
self.catname = catname
self.pkeys = pkeys # string of pkey=xxxx or oid=xxxx
self.segids = segids # list of affected segids
self.type = type # missing or extra
assert(self.type in ['missing','extra'])
def report(self):
ids = [int(i) for i in self.segids[1:-1].split(',')]
idstr = ''
if len(ids) == len(GV.report_cfg):
idstr = 'all segments'
else:
for i in ids:
idstr += '%s (%s:%d) ' % \
(GV.report_cfg[i]['segname'], GV.report_cfg[i]['hostname'],
GV.report_cfg[i]['port'])
if self.catname == 'pg_attribute':
myprint(" %s column '%s' on %s" \
% (self.type.capitalize(), self.pkeys['attname'],idstr))
elif self.catname == 'pg_class':
myprint(' %s relation metadata for %s on %s' \
% (self.type.capitalize(), str(self.pkeys), idstr))
else:
myprint(' %s %s metadata of %s on %s' \
% (self.type.capitalize(), self.catname[3:], str(self.pkeys), idstr))
#-------------------------------------------------------------------------------
class CatDuplicateIssue:
def __init__(self, catname, pkeys, segids, enum ):
self.catname = catname
self.pkeys = pkeys # string of pkey=xxxx or oid=xxxx
self.segids = segids # list of affected segids
self.enum = enum # number of entries
def report(self):
""" Found # catname entries of "oid=XXXXX" on segment {Y,Z} """
myprint(' Found %d %s entries of %s on segment %s' \
% (self.enum, self.catname, str(self.pkeys), str(self.segids)))
#-------------------------------------------------------------------------------
class CatACLIssue:
def __init__(self, catname, pkeys, segid, macl_only, sacl_only ):
self.catname = catname
self.pkeys = pkeys # string of pkey=xxxx or oid=xxxx
self.segid = segid # the affected segid
self.macl_only = macl_only # acl appears only on master
self.sacl_only = sacl_only # acl appears only on segment
def report(self):
""" Master (host:port) and seg# (host:port) have different ACL:
Exist(s) on master only: [...........]
Exist(s) on seg# only: [...........]
"""
mstr = 'Master (%s:%s)' % (GV.report_cfg[-1]['hostname'],GV.report_cfg[-1]['port'])
sstr = '%s (%s:%s)' % (GV.report_cfg[self.segid]['segname'],
GV.report_cfg[self.segid]['hostname'],
GV.report_cfg[self.segid]['port'])
myprint(' %s and %s have different ACLs:' % (mstr, sstr))
if len(self.macl_only) > 0:
myprint(' Exist(s) on master only: %s' % (self.macl_only))
if len(self.sacl_only) > 0:
myprint(' Exist(s) on %s only: %s ' % \
(GV.report_cfg[self.segid]['segname'], self.sacl_only))
#-------------------------------------------------------------------------------
class GPObject:
def __init__(self, oid, catname):
self.oid = oid
self.catname = catname
self.missingIssues = {} # key=issue.catname, value=list of catMissingIssue
self.inconsistentIssues = {} # key=issue.catname, value=list of catInconsistentIssue
self.duplicateIssues = {} # key=issue.catname, value=list of catDuplicateIssue
self.aclIssues = {} # key=issue.catname, value=list of catACLIssue
self.foreignkeyIssues = {} # key=issue.catname, value=list of catForeignKeyIssue
def setMissingIssue(self, issue):
if issue.catname in self.missingIssues:
self.missingIssues[issue.catname].append(issue)
else:
self.missingIssues[issue.catname] = [issue]
def setInconsistentIssue(self, issue):
if issue.catname in self.inconsistentIssues:
self.inconsistentIssues[issue.catname].append(issue)
else:
self.inconsistentIssues[issue.catname] = [issue]
def setDuplicateIssue(self, issue):
if issue.catname in self.duplicateIssues:
self.duplicateIssues[issue.catname].append(issue)
else:
self.duplicateIssues[issue.catname] = [issue]
def setACLIssue(self, issue):
if issue.catname in self.aclIssues:
self.aclIssues[issue.catname].append(issue)
else:
self.aclIssues[issue.catname] = [issue]
def setForeignKeyIssue(self, issue):
if issue.catname in self.foreignkeyIssues:
self.foreignkeyIssues[issue.catname].append(issue)
else:
self.foreignkeyIssues[issue.catname] = [issue]
def isTopLevel(self):
return True
def reportAllIssues(self):
if self.__class__ == GPObject:
myprint('')
myprint('----------------------------------------------------')
myprint('Object oid: %s' % (self.oid))
myprint('Table name: %s' % (self.catname))
myprint('')
# Report inconsistent issues
if len(self.inconsistentIssues):
for catname,issues in self.inconsistentIssues.iteritems():
myprint(' Name of test which found this issue: inconsistent_%s' % catname)
for each in issues:
each.report()
myprint('')
# Report missing issues
if len(self.missingIssues):
omitlist = ['pg_attribute','pg_attribute_encoding','pg_type','pg_appendonly','pg_index']
if 'pg_class' in self.missingIssues:
myprint(' Name of test which found this issue: missing_extraneous_pg_class')
for name in omitlist:
if name in self.missingIssues:
myprint(' Name of test which found this issue: missing_extraneous_%s' % name)
for each in self.missingIssues['pg_class']:
each.report()
for catname, issues in self.missingIssues.iteritems():
if catname != 'pg_class' and catname not in omitlist:
myprint(' Name of test which found this issue: missing_extraneous_%s' % catname)
for each in issues:
each.report()
else:
for catname,issues in self.missingIssues.iteritems():
myprint(' Name of test which found this issue: missing_extraneous_%s' % catname)
for each in issues:
each.report()
myprint('')
# Report foreign key issues
if len(self.foreignkeyIssues):
for catname,issues in self.foreignkeyIssues.iteritems():
myprint(' Name of test which found this issue: foreign_key_%s' % catname)
for each in issues:
each.report()
myprint('')
if len(self.duplicateIssues):
for catname,issues in self.duplicateIssues.iteritems():
myprint(' Name of test which found this issue: duplicate_%s' % catname)
for each in issues:
each.report()
myprint('')
# Report ACL issues
if len(self.aclIssues):
for catname,issues in self.aclIssues.iteritems():
myprint(' Name of test which found this issue: acl_%s' % catname)
for each in issues:
each.report()
myprint('')
def __cmp__(self, other):
if isinstance(other, GPObject):
return cmp((self.oid, self.catname), (other.oid, other.catname))
else:
return NotImplemented
def __hash__(self):
return hash((self.oid, self.catname))
#-------------------------------------------------------------------------------
class RelationObject(GPObject):
def __init__(self, oid, catname):
GPObject.__init__(self, oid, catname)
self.relname = None
self.nspname = None
self.relkind = None
self.paroid = None
def reportAllIssues(self):
if self.isTopLevel():
myprint('')
myprint('----------------------------------------------------')
if self.relkind == 'c':
myprint('Type oid: %d' % (self.oid))
myprint('Type name: %s.%s' % (self.nspname, self.relname))
else:
myprint('Relation oid: %d' % (self.oid))
myprint('Relation name: %s.%s' % (self.nspname, self.relname))
else:
myprint(' Sub-object: ')
myprint(' ----------------------------------------------------')
myprint(' Relation oid: %d' % (self.oid))
myprint(' Relation name: %s.%s' % (self.nspname, self.relname))
myprint('')
GPObject.reportAllIssues(self)
def setRelInfo(self, relname, nspname, relkind, paroid):
self.relname = relname
self.nspname = nspname
self.relkind = relkind
self.paroid = paroid
def isTopLevel(self):
if self.relkind == 'i' or self.relkind == 't' or self.relkind == 'o':
return False
return True
#-------------------------------------------------------------------------------
def getRelInfo(objects):
# Get relinfo: relname, relkind, par oid for all oids
# get relname, relkind
# left outer join with union of
# - get par oid for toast
# - get par oid for aoco
# - get par oid for index
# for each query: if there is inconsistency, majority wins
if objects == {}: return
oids = [oid for (oid, catname) in objects if catname=='pg_class']
if oids == []: return
qry = """
SELECT oid, relname, nspname, relkind, paroid
FROM (
SELECT oid, relname, nspname, relkind
FROM (
SELECT oid, relname, nspname, relkind, rank() over (partition by oid order by count(*) desc)
FROM
(
SELECT c.oid, c.relname, c.relkind, n.nspname
FROM pg_class c
left outer join pg_namespace n
on (c.relnamespace = n.oid)
WHERE c.oid in ({oids})
UNION ALL
SELECT c.oid, c.relname, c.relkind, n.nspname
FROM gp_dist_random('pg_class') c
left outer join gp_dist_random('pg_namespace') n
on (c.relnamespace = n.oid and c.gp_segment_id = n.gp_segment_id)
WHERE c.oid in ({oids})
) allrelinfo
GROUP BY oid, relname, nspname, relkind
) relinfo
WHERE rank=1
) relinfo
LEFT OUTER JOIN
(
SELECT reltoastrelid as childoid, oid as paroid
FROM (
SELECT reltoastrelid, oid, rank() over (partition by reltoastrelid order by count(*) desc)
FROM
( SELECT reltoastrelid, oid FROM pg_class
WHERE reltoastrelid in ({oids})
UNION ALL
SELECT reltoastrelid, oid FROM gp_dist_random('pg_class')
WHERE reltoastrelid in ({oids})
) allpar_toast
GROUP BY reltoastrelid, oid
) par_toast
WHERE rank=1
UNION ALL
SELECT segrelid as childoid, relid as paroid
FROM (
SELECT segrelid, relid, rank() over (partition by segrelid order by count(*) desc)
FROM
( SELECT segrelid, relid FROM pg_appendonly
WHERE segrelid in ({oids})
UNION ALL
SELECT segrelid, relid FROM gp_dist_random('pg_appendonly')
WHERE segrelid in ({oids})
) allpar_aoco
GROUP BY segrelid, relid
) par_aoco
WHERE rank=1
UNION ALL
SELECT indexrelid as childoid, indrelid as paroid
FROM (
SELECT indexrelid, indrelid, rank() over (partition by indexrelid order by count(*) desc)
FROM
( SELECT indexrelid, indrelid FROM pg_index
WHERE indexrelid in ({oids})
UNION ALL
SELECT indexrelid, indrelid FROM gp_dist_random('pg_index')
WHERE indexrelid in ({oids})
) allpar_index
GROUP BY indexrelid, indrelid
) par_index
WHERE rank=1
) par ON childoid = oid
""".format( oids = ','.join(map(str,oids)))
try:
db = connect2(GV.cfg[1], utilityMode=False)
curs = db.query(qry)
for row in curs.getresult():
(oid, relname, nspname, relkind, paroid) = row
objects[oid,'pg_class'].setRelInfo(relname, nspname, relkind, paroid)
except Exception, e:
setError(ERROR_NOREPAIR)
myprint(' Execution error: ' + str(e))
myprint(qry)
#-------------------------------------------------------------------------------
def buildGraph():
def buildGraphRecursive(objects, graph):
if objects == {}: return
getRelInfo (objects)
localObjects = {}
for (oid, catname),obj in objects.iteritems():
# Top level object, add to graph
if obj.isTopLevel():
if obj not in graph:
graph[obj] = []
# Not top level object, find parent, add to localObjects if needed
else:
parobj = objects.get((obj.paroid, catname), None)
if parobj is None:
parobj = localObjects.get((obj.paroid, catname), None)
if parobj is None:
parobj = RelationObject(obj.paroid, catname)
localObjects[(obj.paroid, catname)] = parobj
assert (parobj is not None)
# Add obj and parobj to graph
if parobj not in graph:
graph[parobj] = []
if obj not in graph[parobj]:
graph[parobj].append(obj)
buildGraphRecursive(localObjects, graph)
buildGraphRecursive(GPObjects, GPObjectGraph)
#-------------------------------------------------------------------------------
def checkcatReport():
def reportAllIssuesRecursive(par, graph):
par.reportAllIssues()
if par not in graph:
return
for child in sorted(graph[par]):
reportAllIssuesRecursive(child, graph)
buildGraph()
reportedTest = ['duplicate','missing_extraneous','inconsistent','foreign_key','acl', 'duplicate_persistent']
myprint('')
myprint('SUMMARY REPORT')
myprint('===================================================================')
elapsed = str(datetime.timedelta(seconds=GV.elapsedTime))[:-4]
myprint('Total runtime for %d test(s): %s' % (GV.totalTestRun, elapsed))
if len(GPObjectGraph) == 0 and len(GV.failedTest) == 0:
myprint('Found no catalog issue\n')
return
if len(GPObjectGraph) > 0:
total = 0
for par in GPObjectGraph:
if par.isTopLevel(): total += 1
myprint('Found a total of %d issue(s)' % total)
for par in sorted(GPObjectGraph):
if par.isTopLevel():
reportAllIssuesRecursive(par, GPObjectGraph)
myprint('')
notReported = set(GV.failedTest).difference(reportedTest)
if len(notReported) > 0:
myprint('Failed test(s) that are not reported here: %s' % (', '.join(notReported)))
myprint('See %s for detail\n' % get_logfile())
def myprint(str):
log_literal(logger,logging.CRITICAL,str)
#############
if __name__ == '__main__':
parseCommandLine()
if GV.opt['-l']:
listAllTests()
sys.exit(GV.retcode)
GV.version = getversion()
if GV.version < "4.0":
myprint("Error: only Greenplum database version >= 4.0 are supported\n")
sys.exit(GV.retcode)
GV.cfg = getGPConfiguration()
GV.report_cfg = getReportConfiguration()
GV.max_content = max([GV.cfg[dbid]['content'] for dbid in GV.cfg])
GV.catalog = getCatalog()
for dbname in GV.alldb:
# Reset global variables
GV.reset_stmt_queues()
GPObjects = {}
GPObjectGraph = {}
GV.dbname = dbname
myprint('')
myprint("Connected as user \'%s\' to database '%s', port '%d', gpdb version '%s'" \
% (GV.opt['-U'], GV.dbname, GV.report_cfg[-1]['port'], GV.version))
myprint('-------------------------------------------------------------------')
if GV.opt['-R']:
name = GV.opt['-R']
try:
runOneTest(name)
except LookupError, e:
myprint("'%s' is not a valid test\n\n" % (name))
setError(ERROR_NOREPAIR)
sys.exit(GV.retcode)
elif GV.opt['-C']:
namestr = GV.opt['-C']
try:
cat = getCatObj(namestr)
runTestCatname(cat)
except Exception, e:
setError(ERROR_NOREPAIR)
sys.exit(GV.retcode)
else:
runAllTests()
checkcatReport()
# skip shared tables on subsequent passes
if not GV.opt['-S']:
GV.opt['-S'] = "none"
sys.exit(GV.retcode)