blob: 8ca6f35b4926671805a474e87fe123cd2c36e14b [file] [log] [blame]
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
""" Provides Access Utilities for Examining and Modifying the GP Catalog.
"""
import copy
import dbconn
from gppylib import gplog
logger=gplog.get_default_logger()
class CatalogError(Exception): pass
def basicSQLExec(conn,sql):
cursor=None
try:
cursor=dbconn.execSQL(conn,sql)
rows=cursor.fetchall()
return rows
finally:
if cursor:
cursor.close()
def getSessionGUC(conn,gucname):
sql = "SHOW %s" % gucname
return basicSQLExec(conn,sql)[0][0]
def getUserDatabaseList(conn):
sql = "SELECT datname FROM pg_catalog.pg_database WHERE datname NOT IN ('postgres','template1','template0') ORDER BY 1"
return basicSQLExec(conn,sql)
def getDatabaseList(conn):
sql = "SELECT datname FROM pg_catalog.pg_database"
return basicSQLExec(conn,sql)
def getActiveUserPIDs(conn):
sql = """
SELECT
procpid
FROM
pg_stat_activity
WHERE
procpid != pg_backend_pid() AND current_query<>'idle'
"""
rows = basicSQLExec(conn,sql)
return rows
def getUserPIDs(conn):
"""dont count ourselves"""
sql = """SELECT procpid FROM pg_stat_activity WHERE procpid != pg_backend_pid()"""
return basicSQLExec(conn,sql)
def cancelBackendPIDs(conn):
sql = """
SELECT
pg_cancel_backend(procpid)
FROM
(SELECT procpid FROM pg_stat_activity WHERE procpid != pg_backend_pid()) as s;
"""
rows = basicSQLExec(conn,sql)
def getCollationSettings(conn):
sql = """
SELECT current_setting('lc_collate') as lc_collate,
current_setting('lc_monetary') as lc_monetary,
current_setting('lc_numeric') as lc_numeric;
"""
rows = basicSQLExec(conn,sql)
return (rows[0][0],rows[0][1],rows[0][2])
def doesSchemaExist(conn,schemaname):
sql = "SELECT nspname FROM pg_catalog.pg_namespace WHERE nspname = '%s'" % schemaname
cursor=None
try:
cursor=dbconn.execSQL(conn,sql)
numrows = cursor.rowcount
if numrows == 0:
return False
elif numrows == 1:
return True
else:
raise CatalogError("more than one entry in pg_namespace for '%s'" % schemaname)
finally:
if cursor:
cursor.close()
def get_master_filespace_map(conn):
"""Returns an array of [fsname, fspath] arrays that represents
all the filespaces on the master."""
sql = """SELECT fsname, fselocation
FROM pg_filespace, pg_filespace_entry
WHERE pg_filespace.oid = pg_filespace_entry.fsefsoid
AND fsedbid = 1"""
cursor = None
try:
cursor = dbconn.execSQL(conn, sql)
return cursor.fetchall()
finally:
if cursor:
cursor.close()
def get_standby_filespace_map(conn):
"""Returns an array of [fsname, fspath] arrays that represents
all the filespaces on the standby master."""
sql = """SELECT fsname, fselocation
FROM pg_filespace, pg_filespace_entry
WHERE pg_filespace.oid = pg_filespace_entry.fsefsoid
AND fsedbid = (SELECT MAX(dbid) FROM gp_segment_configuration)"""
cursor = None
try:
cursor = dbconn.execSQL(conn, sql)
return cursor.fetchall()
finally:
if cursor:
cursor.close()
def get_catalogtable_list(conn):
sql = """SELECT schemaname || '.' || tablename
FROM pg_tables
WHERE schemaname = 'pg_catalog'
"""
cursor=None
try:
cursor=dbconn.execSQL(conn,sql)
return cursor.fetchall()
finally:
if cursor:
cursor.close()
def get_usertable_list(conn, include_external_tables=True):
if include_external_tables:
sql = """SELECT schemaname || '.' || tablename
FROM pg_tables
WHERE schemaname <> 'pg_catalog'
"""
else:
sql = """SELECT schemaname || '.' || tablename FROM
(SELECT
n.nspname AS schemaname, c.relname AS tablename,
pg_get_userbyid(c.relowner) AS tableowner, t.spcname AS "tablespace",
c.relhasindex AS hasindexes, c.relhasrules AS hasrules, c.reltriggers > 0 AS hastriggers
FROM pg_class c
LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
LEFT JOIN pg_tablespace t ON t.oid = c.reltablespace
WHERE c.relkind = 'r'::"char" AND c.relstorage <> 'x'::"char" AND n.nspname <> 'pg_catalog') foo;
"""
return basicSQLExec(conn,sql)
def vacuum_catalog(dburl,conn,full=False,utility=False):
""" Will use the provided connection to enumerate the list of databases
and then connect to each one in turn and vacuum full all of the
catalog files
TODO: There are a few tables that are cluster-wide that strictly speaking
don't need to be vacuumed for each database. These are most likely
small and so perhaps isn't worth the added complexity to optimize them.
WARNING: doing a vacuum full on the catalog requires that
there aren't any users idle in a transaction as they typically
hold catalog share locks. The result is this vacuum will wait forever on
getting the lock. This method is best called when no one else
is connected to the system. our own connections are typically idle
in transactiona and so are especially bad.
"""
dblist = getDatabaseList(conn)
catlist = get_catalogtable_list(conn)
conn.commit()
for db in dblist:
test_url = copy.deepcopy(dburl)
test_url.pgdb = db[0]
if db[0] == 'template0' or db[0] == 'postgres':
continue
vac_conn = dbconn.connect(test_url,utility)
vac_curs = vac_conn.cursor()
vac_curs.execute("COMMIT")
vac_curs.execute("SET CLIENT_MIN_MESSAGES='ERROR'")
for table in catlist:
logger.debug('Vacuuming %s %s' % (db[0],table[0]) )
if full:
sql = "VACUUM FULL %s" % table[0]
else:
sql = "VACUUM %s" % table[0]
vac_curs.execute(sql)
vac_curs.execute(sql)
vac_conn.commit()
vac_conn.close()
class CatalogTable():
def __init__(self):
raise CatalogError("Programmer Error! CatalogTable is an abstract class")
def update_db(self,conn):
try:
cursor=conn.cursor()
cursor.execute(self.get_update_sql())
finally:
if cursor: cursor.close()
def get_update_sql(self):
raise CatalogError("Programmer Error! subclasses should override")
class GpVersionAtInitdbTable(CatalogTable):
""" pg_catalog.gp_version_at_initdb
template1=# \d pg_catalog.gp_version_at_initdb;
Table "pg_catalog.gp_version_at_initdb"
Column | Type | Modifiers
----------------+----------+-----------
schemaversion | smallint | not null
productversion | text |
Tablespace: "pg_global"
"""
schemaversion=-1
productversion=None
def __init__(self,conn):
try:
cursor = conn.cursor()
cursor.execute(self.get_select_sql())
if cursor.rowcount != 1:
raise CatalogError("pgcatalog.gp_version_at_initdb had unexpected row count "\
"of: %d. Was expecting 1." % cursor.rowcount)
row = cursor.fetchone()
self.schemaversion = int(row[0])
self.productversion = row[1]
finally:
if cursor: cursor.close()
def get_select_sql(self):
return "SELECT * FROM pg_catalog.gp_version_at_initdb"
def get_update_sql(self):
return "UPDATE pg_catalog.gp_version_at_initdb SET schemaversion = %d, "\
" productversion = '%s' " % (self.schemaversion,self.productversion)
class GpConfigurationTable(CatalogTable):
"""pg_catalog.gp_segment_configuration
postgres=# \d gp_segment_configuration
Table "pg_catalog.gp_segment_configuration"
Column | Type | Modifiers
------------------+------------+-----------
dbid | smallint | not null
content | smallint | not null
role | "char" | not null
preferred_role | "char" | not null
mode | "char" | not null
status | "char" | not null
hostname | name | not null
address | name | not null
port | integer | not null
replication_port | integer |
san_mounts | int2vector |
Indexes:
"gp_segment_config_content_preferred_role_index" UNIQUE, btree (content, preferred_role), tablespace "pg_global"
"gp_segment_config_dbid_index" UNIQUE, btree (dbid), tablespace "pg_global"
Tablespace: "pg_global"
"""
@staticmethod
def deleteAllRows(conn):
try:
cursor=conn.cursor()
cursor.execute("DELETE FROM gp_segment_configuration")
conn.commit()
finally:
if cursor: cursor.close()
# XXX: THIS IS OBVIOUSLY BROKEN IN 4.0 for SAN.
@staticmethod
def addRows(conn, segments):
try:
cursor = conn.cursor()
for seg in segments:
if seg.replicationPort != None:
replicationPort = str(seg.replicationPort)
if replicationPort.isdigit() == False:
replicationPort = 'null'
else:
replicationPort = 'null'
sql = """
INSERT INTO pg_catalog.gp_segment_configuration
( dbid , content , role , preferred_role
, mode , status , port , hostname
, address , replication_port
)
VALUES
( %d , %d , \'%s\' , \'%s\'
, \'%s\' , \'%s\' , %d , \'%s\'
, \'%s\' , %s
)
""" % ( int(seg.dbid), int(seg.content), seg.role , seg.preferred_role
, seg.mode , seg.status , int(seg.port), seg.hostname
, seg.hostname , replicationPort
)
cursor.execute(sql)
conn.commit()
except Exception, e:
raise CatalogError(e)
finally:
if cursor: cursor.close()
#------------------------------------------------------------------------------
#TODO:
class GpMasterMirroring(CatalogTable): pass
class GpDistributionPolicy(CatalogTable): pass