| #!/usr/bin/env python |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| ''' |
| minirepro utility |
| |
| USAGE |
| |
| minirepro <db-name> [-h <master-host>] [-U <username>] [-p <port>] |
| -q <SQL-file> -f <repo-data-file> |
| |
| minirepro -? |
| |
| |
| DESCRIPTION |
| |
| For any SQL commands, the minirepro utility generates HAWQ Database |
| information for the commands. The information can be analyzed to |
| perform root cause analysis. |
| |
| The minirepro utility reads the input SQL file, passes the input SQL |
| command to hawq_toolkit function hawq_dump_query_oids() to get the dependent |
| object ids. Then the utility uses pg_dump to dump the object DDLs, and |
| queries the system catalog to collect statistics of these relations. |
| The information is written to the output file. The output includes a minimal |
| set of DDLs and statistics of relations and functions that are related |
| to the input SQL commands. |
| |
| |
| PARAMETERS |
| |
| <db-name> |
| Name of the HAWQ Database. |
| |
| -h <master-host> |
| HAWQ Database master host. Default is localhost. |
| |
| -U <username> |
| HAWQ Database user name to log into the database and run the |
| SQL command. Default is the PGUSER environment variable. If PGUSER |
| is not defined, OS user name running the utility is used. |
| |
| -p <port> |
| Port that is used to connect to HAWQ Database. |
| Default is the PGPORT environment variable. If PGPORT is not defined, |
| the default value is 5432. |
| |
| -q <SQL-file> |
| A text file that contains SQL commands. The commands can be on |
| multiple lines. |
| |
| -f <repo-data-file> |
| The output file that contains DDLs and statistics of relations |
| and functions that are related to the SQL commands. |
| |
| -? Show this help text and exit. |
| |
| |
| EXAMPLE |
| |
| minirepro gptest -h localhost -U gpadmin -p 4444 -q ~/in.sql -f ~/out.sql |
| ''' |
| |
| import os, sys, re, json, platform, subprocess |
| from optparse import OptionParser |
| from pygresql import pgdb |
| from datetime import datetime |
| |
# Version string shown by --version and written into the output file header.
version = '1.10'

# Scratch location for intermediate files; main() appends a timestamped
# sub-directory to this prefix before any file is created under it.
PATH_PREFIX = '/tmp/'

# File name (under PATH_PREFIX) that receives the pg_dump output.
PGDUMP_FILE = 'pg_dump_out.sql'

# Schemas excluded from the catalog queries; rendered below as a parenthesised
# SQL list that is spliced after "NOT IN".
_SYSTEM_SCHEMAS = ('pg_toast', 'pg_bitmapindex', 'pg_catalog',
                   'information_schema', 'hawq_toolkit', 'pg_aoseg')
sysnslist = "(%s)" % ', '.join(["'%s'" % nsp for nsp in _SYSTEM_SCHEMAS])

# Options field of the connection string handed to pgdb.connect().
pgoptions = '-c gp_session_role=utility'
| |
class MRQuery(object):
    '''Holder for the objects a query depends on.

    relids  -- relation oids
    funcids -- function oids
    schemas -- names of the schemas owning those relations and functions

    All three start out as independent empty lists.
    '''

    def __init__(self):
        self.relids = []
        self.funcids = []
        self.schemas = []
| |
def E(query_str):
    '''Return query_str escaped by pgdb.escape_string().'''
    escaped = pgdb.escape_string(query_str)
    return escaped
| |
def generate_timestamp():
    '''Return the current local time formatted as YYYYmmddHHMMSS.'''
    return datetime.now().strftime("%Y%m%d%H%M%S")
| |
def result_iter(cursor, arraysize=1000):
    '''Yield rows one at a time, fetching them in batches via fetchmany.

    cursor    -- object exposing fetchmany(size)
    arraysize -- number of rows requested per fetchmany call

    Iteration stops at the first empty batch.
    '''
    batch = cursor.fetchmany(arraysize)
    while batch:
        for row in batch:
            yield row
        batch = cursor.fetchmany(arraysize)
| |
def get_server_version(cursor):
    '''Return the first column of "select version()" run on cursor.

    On pgdb.DatabaseError the error is written to stderr and the process
    exits with status 1.
    '''
    try:
        cursor.execute("select version()")
        row = cursor.fetchone()
        return row[0]
    except pgdb.DatabaseError as err:
        sys.stderr.write('\nError while trying to find HAWQ/GPDB version.\n\n' + str(err) + '\n\n')
        sys.exit(1)
| |
def parse_cmd_line():
    '''Build and return the OptionParser for the minirepro command line.

    conflict_handler="resolve" lets -h be reused for --host instead of
    optparse's built-in help flag; help is exposed as -? / --help.
    '''
    parser = OptionParser(usage='Usage: %prog <database> [options]',
                          version='%prog ' + version,
                          conflict_handler="resolve")

    # (flags, keyword settings) in registration order
    option_specs = [
        (('-?', '--help'),
         dict(action='help', help='Show this help message and exit')),
        (('-h', '--host'),
         dict(action='store', dest='host', help='Specify a remote host')),
        (('-p', '--port'),
         dict(action='store', dest='port', help='Specify a port other than 5432')),
        (('-U', '--user'),
         dict(action='store', dest='user', help='Connect as someone other than current user')),
        (('-q',),
         dict(action='store', dest='query_file', help='file name that contains the query')),
        (('-f',),
         dict(action='store', dest='output_file', help='minirepro output file name')),
    ]
    for flags, settings in option_specs:
        parser.add_option(*flags, **settings)
    return parser
| |
def dump_query(connectionInfo, query_file):
    '''Run hawq_toolkit.hawq_dump_query_oids() on the query text via psql.

    connectionInfo -- (host, port, user, db) tuple
    query_file     -- path of the file holding the SQL text

    The SQL text is escaped, wrapped in a call to the toolkit function and
    written to PATH_PREFIX + 'toolkit.sql', which psql then executes.

    Returns whatever psql wrote to standard output. If psql exits non-zero
    or writes anything to standard error, the error text is written to
    stderr and the process exits with status 1.
    '''
    (host, port, user, db) = connectionInfo
    print("Extracting metadata from query file %s ..." % query_file)

    with open(query_file, 'r') as query_f:
        sql_text = query_f.read()
    query = "select hawq_toolkit.hawq_dump_query_oids('%s')" % E(sql_text)

    toolkit_sql = PATH_PREFIX + 'toolkit.sql'
    with open(toolkit_sql, 'w') as toolkit_f:
        toolkit_f.write(query)

    # Pass an argument list (no shell) so that names containing spaces or
    # shell metacharacters reach psql unchanged.
    psql_args = ['%s' % arg for arg in
                 ('psql', db, '--pset', 'footer', '-Atq', '-h', host,
                  '-p', port, '-U', user, '-f', toolkit_sql)]
    print(' '.join(psql_args))

    p = subprocess.Popen(psql_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=os.environ)

    # communicate() drains both pipes while waiting; calling wait() first can
    # deadlock once the child fills a pipe buffer.
    outmsg, errormsg = p.communicate()
    if p.returncode != 0 or errormsg:
        sys.stderr.write('\nError when executing function hawq_toolkit.hawq_dump_query_oids.\n\n' + errormsg + '\n\n')
        sys.exit(1)
    return outmsg
| |
| # relation and function oids will be extracted from the dump string |
def parse_oids(cursor, json_oids):
    '''Parse the dumped oid JSON and look up the schemas that are involved.

    cursor    -- open database cursor used for the catalog lookups
    json_oids -- JSON text with 'relids' and 'funcids' entries; each value is
                 spliced verbatim into an SQL "IN (...)" list

    Returns an MRQuery whose relids/funcids come from the JSON (an empty
    value is replaced by '0' so the IN list is never empty) and whose
    schemas lists, without duplicates, every non-system schema owning one
    of those relations or functions.
    '''
    oids = json.loads(json_oids)

    result = MRQuery()
    result.relids = oids['relids'] or '0'
    result.funcids = oids['funcids'] or '0'

    cat_queries = [
        "SELECT distinct(nspname) FROM pg_class c, pg_namespace n WHERE "
        "c.relnamespace = n.oid AND c.oid IN (%s) "
        "AND n.nspname NOT IN %s" % (result.relids, sysnslist),

        "SELECT distinct(nspname) FROM pg_proc p, pg_namespace n WHERE "
        "p.pronamespace = n.oid AND p.oid IN (%s) "
        "AND n.nspname NOT IN %s" % (result.funcids, sysnslist),
    ]

    for cat_query in cat_queries:
        cursor.execute(cat_query)
        for vals in result_iter(cursor):
            # A schema can own both relations and functions; record it once so
            # callers do not emit the same CREATE SCHEMA twice.
            if vals[0] not in result.schemas:
                result.schemas.append(vals[0])

    return result
| |
def pg_dump_object(mr_query, connectionInfo, envOpts):
    '''Run pg_dump for the relations and functions listed in mr_query.

    mr_query       -- object whose relids and funcids attributes are passed
                      to pg_dump's --relation-oids / --function-oids options
    connectionInfo -- (host, port, user, db) tuple
    envOpts        -- environment mapping for the pg_dump process

    The dump is written to PATH_PREFIX + PGDUMP_FILE. If pg_dump exits
    non-zero, its standard error is written to stderr and the process exits
    with status 1.
    '''
    (host, port, user, db) = connectionInfo
    out_file = PATH_PREFIX + PGDUMP_FILE

    # Argument list, no shell: values are handed to pg_dump verbatim, so no
    # quoting or escaping of the output path is needed.
    dump_args = ['%s' % arg for arg in
                 ('pg_dump', '-h', host, '-p', port, '-U', user, '-sxO', db,
                  '--relation-oids', mr_query.relids,
                  '--function-oids', mr_query.funcids,
                  '-f', out_file)]
    print(' '.join(dump_args))

    p = subprocess.Popen(dump_args, stderr=subprocess.PIPE, env=envOpts)
    # communicate() drains stderr while waiting; wait() alone can block
    # forever if pg_dump fills the pipe buffer.
    errormsg = p.communicate()[1]
    if p.returncode != 0:
        sys.stderr.write('\nError while dumping schema.\n\n' + errormsg + '\n\n')
        sys.exit(1)
| |
def dump_tuple_count(cur, oid_str, f_out):
    '''Write one "UPDATE pg_class" statement per relation to f_out.

    cur     -- database cursor
    oid_str -- relation oids, spliced verbatim into an SQL "IN (...)" list
    f_out   -- writable file object

    Each statement sets relpages and reltuples to the values read from the
    catalog for that relation. Relations in system schemas (sysnslist) and
    in pg_temp_* schemas are skipped.
    '''
    update_template = ("-- Table: {1}\n"
                       "UPDATE pg_class\nSET\n"
                       "{0}\n"
                       "WHERE relname = '{1}' AND relnamespace = "
                       "(SELECT oid FROM pg_namespace WHERE nspname = '{2}');\n\n")
    cast_types = ['int', 'real']

    count_query = ("SELECT pgc.relname, pgn.nspname, pgc.relpages, pgc.reltuples FROM pg_class pgc, pg_namespace pgn "
                   "WHERE pgc.relnamespace = pgn.oid and pgc.oid in (%s) and pgn.nspname NOT LIKE 'pg_temp_%%' "
                   "and pgn.nspname NOT IN %s") % (oid_str, sysnslist)
    cur.execute(count_query)

    # The first two result columns identify the table; the rest are the stats.
    stat_columns = [desc[0] for desc in cur.description][2:]
    for row in result_iter(cur):
        # i.e. relpages = 1::int, reltuples = 1.0::real
        assignments = ['\t%s = %s::%s' % triple
                       for triple in zip(stat_columns, row[2:], cast_types)]
        statement = update_template.format(E(',\n'.join(assignments)), E(row[0]), E(row[1]))
        f_out.writelines(statement)
| |
def dump_stats(cur, oid_str, f_out):
    '''Write one "INSERT INTO pg_statistic" statement per statistics row.

    cur     -- database cursor
    oid_str -- relation oids, spliced verbatim into an SQL "IN (...)" list
    f_out   -- writable file object

    Rows are read from pg_statistic joined with the owning relation, schema,
    attribute and attribute type, skipping system schemas (sysnslist) and
    pg_temp_* schemas. Every value is written with an explicit cast.
    '''
    stats_query = (
        "SELECT pgc.relname, pgn.nspname, pga.attname, pgt.typname, pgs.* "
        "FROM pg_class pgc, pg_statistic pgs, pg_namespace pgn, pg_attribute pga, pg_type pgt "
        "WHERE pgc.relnamespace = pgn.oid and pgc.oid in (%s) and pgn.nspname NOT IN %s "
        "and pgn.nspname NOT LIKE 'pg_temp_%%' "
        "and pgc.oid = pgs.starelid "
        "and pga.attrelid = pgc.oid "
        "and pga.attnum = pgs.staattnum "
        "and pga.atttypid = pgt.oid "
        "ORDER BY pgc.relname, pgs.staattnum"
    ) % (oid_str, sysnslist)

    insert_template = ("--\n"
                       "-- Table: {0}, Attribute: {1}\n"
                       "--\n"
                       "INSERT INTO pg_statistic VALUES (\n"
                       "{2});\n\n")

    # Casts for the fixed-type columns, starting at staattnum.
    fixed_types = (['smallint', 'real', 'integer', 'real']
                   + ['smallint'] * 4
                   + ['oid'] * 4
                   + ['real[]'] * 4)

    cur.execute(stats_query)

    for row in result_iter(cur):
        relname, nspname, attname, typname = row[0], row[1], row[2], row[3]

        # row[4], the first pgs.* column, is not copied; a regclass literal
        # built from schema and table name takes its place.
        literals = ["\t'%s.%s'::regclass" % (E(nspname), E(relname))]

        # The last four columns are cast to the attribute's array type; a
        # type name starting with '_' is used as-is, otherwise '[]' is added.
        if typname[0] == '_':
            value_type = typname
        else:
            value_type = typname + '[]'
        column_types = fixed_types + [value_type] * 4

        for val, typ in zip(row[5:], column_types):
            if val is None:
                val = 'NULL'
            elif isinstance(val, (str, unicode)) and val[0] == '{':
                # array literal: emit as an escaped E'...' string
                val = "E'%s'" % E(val)
            literals.append('\t{0}::{1}'.format(val, typ))

        f_out.writelines(insert_template.format(E(relname), E(attname), ',\n'.join(literals)))
| |
def _default_user(envOpts):
    '''Return PGUSER from envOpts if set, else the OS user running the utility.'''
    user = envOpts.get('PGUSER')
    if user:
        return user
    try:
        return os.getlogin()
    except OSError:
        # os.getlogin() needs a controlling terminal and fails under cron,
        # nohup and the like; fall back to the environment/passwd lookup.
        import getpass
        return getpass.getuser()


def _write_repro_file(f_out, cursor, db, server_ver, mr_query, query_file):
    '''Write header, DDLs, statistics and the commented query text to f_out.'''
    ts = datetime.today()
    f_out.writelines(['-- MiniRepro ' + version,
                      '\n-- Copyright 2016, The Apache Software Foundation',
                      '\n-- Database: ' + db,
                      '\n-- Date: ' + ts.date().isoformat(),
                      '\n-- Time: ' + ts.time().isoformat(),
                      '\n-- CmdLine: ' + ' '.join(sys.argv),
                      '\n-- Version: ' + server_ver + '\n\n'])

    # make sure we connect with the right database
    f_out.write('\\connect ' + db + '\n\n')

    # first create schema DDLs
    print("Writing schema DDLs ...")
    f_out.writelines(["CREATE SCHEMA %s;\n" % E(schema)
                      for schema in mr_query.schemas if schema != 'public'])

    # write relation and function DDLs produced earlier by pg_dump
    print("Writing relation and function DDLs ...")
    with open(PATH_PREFIX + PGDUMP_FILE, 'r') as f_pgdump:
        f_out.writelines(f_pgdump)

    # explicitly allow editing of these pg_class & pg_statistic tables
    f_out.writelines(['\n-- ',
                      '\n-- Allow system table modifications',
                      '\n-- ',
                      '\nset allow_system_table_mods="DML";\n\n'])

    # dump table stats
    print("Writing table statistics ...")
    dump_tuple_count(cursor, mr_query.relids, f_out)

    # dump column stats
    print("Writing column statistics ...")
    dump_stats(cursor, mr_query.relids, f_out)

    # attach query text, commented out line by line
    print("Attaching raw query text ...")
    f_out.writelines(['\n-- ',
                      '\n-- Query text',
                      '\n-- \n\n'])
    with open(query_file, 'r') as query_f:
        for line in query_f:
            f_out.write('-- ' + line)

    f_out.write('\n-- MiniRepro completed.\n')


def main():
    '''Entry point: parse the command line and produce the minirepro file.

    Steps: validate arguments, create a timestamped scratch directory under
    PATH_PREFIX (the global is updated in place), connect to the database,
    collect the oids the query depends on, dump their DDL with pg_dump, and
    write DDLs, statistics and the query text to the output file.

    Invalid arguments are reported through parser.error(), which prints the
    message and terminates the process.
    '''
    global PATH_PREFIX

    parser = parse_cmd_line()
    options, args = parser.parse_args()
    if len(args) != 1:
        parser.error("No database specified")

    # setup all the arguments & options
    envOpts = os.environ
    db = args[0]
    host = options.host or platform.node()
    user = options.user or _default_user(envOpts)
    port = options.port or envOpts.get('PGPORT') or '5432'
    query_file = options.query_file
    output_file = options.output_file

    if query_file is None:
        parser.error("No query file specified.")
    if output_file is None:
        parser.error("No output file specified.")
    if not os.path.isfile(query_file):
        parser.error('Query file %s does not exist.' % query_file)
    output_file = os.path.abspath(output_file)

    # create the per-run scratch directory if not already there
    # NOTE(review): the directory name is predictable and is never removed;
    # consider tempfile.mkdtemp() plus cleanup.
    PATH_PREFIX = PATH_PREFIX + generate_timestamp() + '/'
    if not os.path.isdir(PATH_PREFIX):
        os.mkdir(PATH_PREFIX)

    # setup the connection info tuple with options
    connectionInfo = (host, port, user, db)
    connectionString = ':'.join([host, port, db, user, '', pgoptions, ''])
    print("Connecting to database: host=%s, port=%s, user=%s, db=%s ..." % connectionInfo)
    conn = pgdb.connect(connectionString)
    cursor = conn.cursor()
    try:
        # get server version, which is dumped to minirepro output file
        server_ver = get_server_version(cursor)

        # invoke hawq_toolkit UDF, dump object oids as json text
        json_str = dump_query(connectionInfo, query_file)

        # parse json oids string, collect all things that need to be dumped
        # (MRQuery with schemas, funcids, relids)
        mr_query = parse_oids(cursor, json_str)

        # dump relations and functions
        print("Invoking pg_dump to dump DDL ...")
        pg_dump_object(mr_query, connectionInfo, envOpts)

        # start writing the minirepro output file
        output_dir = os.path.dirname(output_file)
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        with open(output_file, 'w') as f_out:
            _write_repro_file(f_out, cursor, db, server_ver, mr_query, query_file)
    finally:
        # release the database handles even when a step above exits or raises
        cursor.close()
        conn.close()

    print("--- MiniRepro completed! ---")
| |
# Run the utility only when this file is executed as a script.
if __name__ == '__main__':
    main()