#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# import mainUtils FIRST to get python version check
from gppylib.mainUtils import *
from optparse import OptionParser
from Queue import Queue,Empty
import os
import sys
import shutil
import fnmatch
import tempfile
import time
from threading import Thread
import threading
from datetime import datetime
try:
from gppylib import gplog, pgconf, userinput
    from gppylib.commands.base import Command, WorkerPool, Worker
    from gppylib.commands import gp, unix
from gppylib.operations import Operation
from gppylib.gpversion import GpVersion
from gppylib.db import dbconn
from gppylib.operations.unix import CheckDir, CheckFile, MakeDir
from gppylib.operations.dump_analyzedb import get_partition_state, \
get_pgstatlastoperations_dict, compare_metadata, compare_dict, \
write_lines_to_file, verify_lines_in_file, ValidateIncludeTargets, \
ValidateSchemaExists
from gppylib.operations.backup_utils import execute_sql, get_lines_from_file
except ImportError, e:
sys.exit('Cannot import modules. Please check that you have sourced greenplum_path.sh. Detail: ' + str(e))
EXECNAME = 'analyzedb'
ANALYZE_SQL = """psql %s -p %s -c 'analyze %s'"""
ANALYZE_ROOT_SQL = """psql %s -p %s -c 'analyze rootpartition %s'"""
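# For illustration only: with a hypothetical database 'mydb', port 5432 and a target that
# has already been quoted by quote_tbl() below, ANALYZE_SQL renders to a plain psql call:
#   ANALYZE_SQL % ('mydb', 5432, '"public"."foo"("a","b")')
#   -> psql mydb -p 5432 -c 'analyze "public"."foo"("a","b")'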
STATEFILE_DIR = 'db_analyze'
logger = gplog.get_default_logger()
CURR_TIME = None
PG_PARTITIONS_SURROGATE = """
SELECT n.nspname AS schemaname, cl.relname AS tablename, n2.nspname AS partitionschemaname, cl2.relname AS partitiontablename, cl3.relname AS parentpartitiontablename
FROM pg_namespace n, pg_namespace n2, pg_class cl, pg_class cl2, pg_partition pp,
pg_partition_rule pr1
LEFT JOIN pg_partition_rule pr2 ON pr1.parparentrule = pr2.oid
LEFT JOIN pg_class cl3 ON pr2.parchildrelid = cl3.oid
WHERE pp.paristemplate = false AND pp.parrelid = cl.oid AND pr1.paroid = pp.oid AND cl2.oid = pr1.parchildrelid AND cl.relnamespace = n.oid AND cl2.relnamespace = n2.oid
"""
GET_ALL_DATA_TABLES_SQL = """ select n.nspname as schemaname, c.relname as tablename from pg_class c, pg_namespace n where
c.relnamespace = n.oid and c.relkind='r'::char and (c.relnamespace > 16384 or n.nspname = 'public' or n.nspname = 'pg_catalog') and c.oid not in (select reloid from pg_exttable)
EXCEPT
select distinct schemaname, tablename from (%s) AS pps1
EXCEPT
select distinct partitionschemaname, parentpartitiontablename from (%s) AS pps2 where parentpartitiontablename is not NULL
""" % (PG_PARTITIONS_SURROGATE, PG_PARTITIONS_SURROGATE)
GET_VALID_DATA_TABLES_SQL = """
select n.nspname as schemaname, c.relname as tablename from pg_class c, pg_namespace n where
c.relnamespace = n.oid and c.oid in (%s) and c.relkind='r'::char and (c.relnamespace > 16384 or n.nspname = 'public' or n.nspname = 'pg_catalog') and c.oid not in (select reloid from pg_exttable)
"""
GET_REQUESTED_AO_DATA_TABLE_INFO_SQL = """
SELECT ALL_DATA_TABLES.oid, ALL_DATA_TABLES.schemaname, ALL_DATA_TABLES.tablename, OUTER_PG_CLASS.relname as tupletable FROM
(
select c.oid as oid, n.nspname as schemaname, c.relname as tablename from pg_class c, pg_namespace n where
c.relnamespace = n.oid and c.oid in (%s)
) as ALL_DATA_TABLES, pg_appendonly, pg_class OUTER_PG_CLASS
WHERE ALL_DATA_TABLES.oid = pg_appendonly.relid
AND OUTER_PG_CLASS.oid = pg_appendonly.segrelid
"""
GET_REQUESTED_LAST_OP_INFO_SQL = """
SELECT PGN.nspname, PGC.relname, objid, staactionname, stasubtype, statime FROM pg_stat_last_operation, pg_class PGC, pg_namespace PGN
WHERE objid = PGC.oid
AND objid in (%s)
AND PGC.relnamespace = PGN.oid
AND staactionname IN ('CREATE', 'ALTER', 'TRUNCATE')
ORDER BY objid, staactionname
"""
GET_ALL_DATA_TABLES_IN_SCHEMA_SQL = """
select n.nspname as schemaname, c.relname as tablename from pg_class c, pg_namespace n where
c.relnamespace = n.oid and c.relkind='r'::char and (c.relnamespace > 16384 or n.nspname = 'public' or n.nspname = 'pg_catalog') and c.oid not in (select reloid from pg_exttable)
and n.nspname = '%s'
EXCEPT
select distinct schemaname, tablename from (%s) AS pps1
EXCEPT
select distinct partitionschemaname, parentpartitiontablename from (%s) AS pps2 where parentpartitiontablename is not NULL
""" % ('%s', PG_PARTITIONS_SURROGATE, PG_PARTITIONS_SURROGATE)
GET_LEAF_PARTITIONS_SQL = """
select partitionschemaname || '.' || partitiontablename from (%s) AS pps1 where schemaname = '%s' and tablename = '%s'
EXCEPT
select distinct partitionschemaname || '.' || parentpartitiontablename from (%s) AS pps2 where parentpartitiontablename is not NULL
and schemaname = '%s' and tablename = '%s'
""" % (PG_PARTITIONS_SURROGATE, '%s', '%s', PG_PARTITIONS_SURROGATE, '%s', '%s')
GET_MID_LEVEL_PARTITIONS_SQL = """
select distinct partitionschemaname, parentpartitiontablename from (%s) AS pps1 where parentpartitiontablename is not NULL
""" % PG_PARTITIONS_SURROGATE
GET_REQUESTED_NON_AO_TABLES_SQL = """
select n.nspname as schemaname, c.relname as tablename from pg_class c, pg_namespace n where
c.relnamespace = n.oid and c.relkind='r'::char and (c.relnamespace > 16384 or n.nspname = 'public' or n.nspname = 'pg_catalog')
and c.oid not in (select relid from pg_appendonly) and c.oid in (%s) and c.oid not in (select reloid from pg_exttable)
EXCEPT
select distinct schemaname, tablename from (%s) AS pps1
EXCEPT
select distinct partitionschemaname, parentpartitiontablename from (%s) AS pps2 where parentpartitiontablename is not NULL
""" % ('%s', PG_PARTITIONS_SURROGATE, PG_PARTITIONS_SURROGATE)
GET_COLUMN_NAMES_SQL = """
SELECT attname FROM pg_attribute WHERE attrelid = '%s'::regclass AND attnum > 0 AND NOT attisdropped
"""
GET_INCLUDED_COLUMNS_FROM_EXCLUDE_SQL = """
SELECT attname FROM pg_attribute WHERE attrelid = '%s'::regclass AND attname NOT IN (%s) AND attnum > 0 AND NOT attisdropped
"""
VALIDATE_COLUMN_NAMES_SQL = """
SELECT count(*) FROM pg_attribute WHERE attrelid = '%s'::regclass AND attname IN (%s) AND attnum > 0 AND NOT attisdropped
"""
VALIDATE_TABLE_NAMES_SQL = """
SELECT %s
"""
GET_LEAF_ROOT_MAPPING_SQL = """
SELECT n.nspname || '.' || c2.relname, n.nspname || '.' || c.relname from pg_class c, pg_class c2, pg_namespace n, pg_partition pp, pg_partition_rule ppr
WHERE ppr.parchildrelid in (%s) AND ppr.paroid = pp.oid AND pp.parrelid = c.oid AND c.relnamespace = n.oid AND ppr.parchildrelid = c2.oid;
"""
GET_ALL_ROOT_PARTITION_TABLES_SQL = """
SELECT distinct n.nspname || '.' || c.relname from pg_class c, pg_namespace n, pg_partition pp
WHERE pp.parrelid = c.oid AND c.relnamespace = n.oid AND pp.paristemplate = false
"""
ORDER_CANDIDATES_BY_OID_SQL = """
SELECT schemaname, tablename FROM
(SELECT c.oid as tableoid, n.nspname as schemaname, c.relname as tablename FROM pg_class c, pg_namespace n where c.relnamespace=n.oid and c.oid in (%s)) AS foo
ORDER BY tableoid DESC;
"""
class AnalyzeDb(Operation):
def __init__(self, options, args):
if args:
logger.warn("Please note that some of the arguments (%s) aren't valid and will be ignored.", args)
if options.masterDataDirectory is None:
options.masterDataDirectory = gp.get_masterdatadir()
self.master_datadir = options.masterDataDirectory
self.analyze_dir = STATEFILE_DIR
self.pg_port = gp.get_master_port()
self.dbname = options.dbname
self.schema = options.schema
self.single_table = options.single_table
self.config_file = options.config_file
self.entire_db = False
self.include_cols = options.include_cols
self.exclude_cols = options.exclude_cols
self.full_analyze = options.full_analyze
self.dry_run = options.dry_run
self.parallel_level = options.parallel_level
self.rootstats = True
self.silent = options.silent
self.verbose = options.verbose
self.clean_last = options.clean_last
self.clean_all = options.clean_all
self.success_list = []
self._validate_options()
self._preprocess_options()
self.conn = dbconn.connect(dbconn.DbURL(port=self.pg_port, dbname=self.dbname), utility=False)
def _validate_options(self):
"""
Validates the options passed in to the application.
"""
if not self.dbname:
raise ProgramArgumentValidationException("option -d required. Please see 'analyzedb -?' for usage.")
if self.clean_all + self.clean_last > 1:
raise ProgramArgumentValidationException('options --clean_last and --clean_all are mutually exclusive')
if (self.schema is not None) + (self.single_table is not None) \
+ (self.config_file is not None) > 1:
raise ProgramArgumentValidationException('options -s, -t and -f are mutually exclusive')
if (self.include_cols is not None) + (self.exclude_cols is not None) > 1:
raise ProgramArgumentValidationException('options -i and -x are mutually exclusive')
if (self.include_cols is not None or self.exclude_cols is not None) and self.single_table is None:
raise ProgramArgumentValidationException('option -i or -x can only be used together with -t')
if self.parallel_level < 1 or self.parallel_level > 10:
raise ProgramArgumentValidationException('option -p requires a value between 1 and 10')
def _preprocess_options(self):
if self.clean_all:
analyze_folder = os.path.join(self.master_datadir, self.analyze_dir, self.dbname)
if os.path.exists(analyze_folder):
if not self.silent and not userinput.ask_yesno(None, "\nDeleting all files and folders in %s ?" % analyze_folder, 'N'):
raise UserAbortedException()
for f in os.listdir(analyze_folder):
f_path = os.path.join(analyze_folder, f)
if os.path.isdir(f_path):
shutil.rmtree(f_path)
else:
os.remove(f_path)
else:
logger.warning("Folder %s does not exist. Exiting...")
if self.clean_last:
last_analyze_timestamp = get_lastest_analyze_timestamp(self.master_datadir, self.analyze_dir, self.dbname)
if last_analyze_timestamp is not None:
analyze_folder = os.path.join(self.master_datadir, self.analyze_dir, self.dbname, last_analyze_timestamp)
if os.path.exists(analyze_folder):
if not self.silent and not userinput.ask_yesno(None, "\nDeleting folder %s and all files inside?" % analyze_folder, 'N'):
raise UserAbortedException()
shutil.rmtree(analyze_folder)
else:
logger.warning("Folder %s does not exist. Exiting...")
else:
logger.warning("No valid state files directories exist. Exiting...")
if self.include_cols is not None:
self.include_cols = self.include_cols.strip().split(',')
if self.exclude_cols is not None:
self.exclude_cols = self.exclude_cols.strip().split(',')
if self.single_table is None and self.schema is None and self.config_file is None:
self.entire_db = True
if self.verbose:
logger.setLevel(10)
def execute(self):
try:
if self.clean_all or self.clean_last:
return 0
global CURR_TIME
CURR_TIME = generate_timestamp()
# The input_col_dict keeps track of the requested columns to analyze.
# key: table name (e.g. 'public.foo')
            # value: a set of requested column names (e.g. set(['col1','col2'])), or '-1' indicating all columns
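            # For example, a hypothetical request covering two tables might look like
            #   {'public.foo': set(['col1', 'col2']), 'public.bar': set(['-1'])}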
input_col_dict = {}
# parse input and update input column dictionary
input_tables = self._get_input_tables(input_col_dict) # ['public.foo', 'public.bar', ...]
input_tables_set = set(input_tables)
if len(input_tables_set) == 0:
logger.warning("There are no tables or partitions to be analyzed. Exiting...")
return 0
logger.info("Checking for tables with stale stats...")
# get all heap tables in the requested tables. all heap tables are regarded as dirty
heap_partitions = get_heap_tables_set(self.conn, input_tables_set) # set(['schema.table1', ...])
# get the current state of the requested tables
# curr_ao_state contains the number of DML commands that have been executed on an AO table
            # curr_last_op contains the timestamp of the last DDL command (CREATE, ALTER, TRUNCATE) on an AO table
curr_ao_state = self._get_ao_state(input_tables_set) # ['schema, table, modcount', ...]
curr_last_op = self._get_lastop_state(input_tables_set) # e.g. ['public,ao_tab,67468,ALTER,ADD COLUMN,2014-10-15 14:49:27.658777-07', ...]
last_analyze_timestamp = get_lastest_analyze_timestamp(self.master_datadir, self.analyze_dir, self.dbname)
# get the previous state of the database
prev_ao_state = get_prev_ao_state(last_analyze_timestamp, self.master_datadir, self.analyze_dir, self.dbname)
prev_last_op = get_prev_last_op(last_analyze_timestamp, self.master_datadir, self.analyze_dir, self.dbname)
# compare two states to get dirty tables
dirty_partitions = self._get_dirty_data_tables(heap_partitions, curr_ao_state, curr_last_op, prev_ao_state, prev_last_op)
# get the previous column states
# read from the file on disk which contains info about what columns had up-to-date stats after the last time the table was analyzed
prev_col_dict = get_prev_col_state(last_analyze_timestamp, self.master_datadir, self.analyze_dir, self.dbname)
candidates = set() # set(['public.foo', 'public.bar', ...])
if self.full_analyze: # full analyze does not support column-level increments and will invalidate previous column-level state
candidates = input_tables_set
else: # incremental
for table in input_tables_set:
if table in dirty_partitions: # for dirty partitions, we invalidate all previous column-level state
candidates.add(table)
else:
# figure out which columns need to be analyzed
self._update_input_col_dict_with_column_increments(table, input_col_dict, prev_col_dict)
if len(input_col_dict[table]) > 0:
candidates.add(table)
candidates = self._get_valid_candidates(candidates)
if len(candidates) < 1:
logger.warning("There are no tables or partitions to be analyzed. Exiting...")
return 0
root_partition_col_dict = {}
# root_partition_col_dict contains the mapping between the root partitions
# and its corresponding columns to be analyzed
            # key: name of the root partition whose stats need to be refreshed
# value: a set of column names to be analyzed, or '-1' meaning all columns of that table
if self.rootstats:
root_partition_col_dict = self._get_root_partition_col_dict(candidates, input_col_dict)
ordered_candidates = self._get_ordered_candidates(candidates, root_partition_col_dict)
target_list = []
logger.info("---------------------------------------------------")
logger.info("Tables or partitions to be analyzed")
logger.info("---------------------------------------------------")
for can in ordered_candidates:
if can in candidates:
target = self._get_tablename_with_cols(can, input_col_dict)
else: # can in root_partition_col_dict
target = self._get_tablename_with_cols(can, root_partition_col_dict)
logger.info(target)
target_list.append(target)
logger.info("---------------------------------------------------")
if self.dry_run:
return 0
if not self.silent and not userinput.ask_yesno(None, "\nContinue with Analyze", 'N'):
raise UserAbortedException()
num_workers = min(self.parallel_level, len(ordered_candidates))
logger.info("Starting analyze with %d workers..." % num_workers)
pool = AnalyzeWorkerPool(numWorkers=num_workers)
for can in ordered_candidates:
if can in candidates:
target = self._get_tablename_with_cols(can, input_col_dict)
cmd = Command('analyze %s' % target, ANALYZE_SQL % (self.dbname, self.pg_port, quote_tbl(target)))
else: # can in root_partition_col_dict
target = self._get_tablename_with_cols(can, root_partition_col_dict)
cmd = Command('analyze rootpartition %s' % target, ANALYZE_ROOT_SQL % (self.dbname, self.pg_port, quote_tbl(target)))
pool.addCommand(cmd)
wait_count = len(ordered_candidates)
try:
start_time = time.time()
while wait_count > 0:
done_cmd = pool.completed_queue.get()
if done_cmd.was_successful():
subject = self._get_tablename_from_cmd_name(done_cmd.name)
self.success_list.append(subject)
if wait_count % 10 == 0:
logger.info("progress status: completed %d out of %d tables or partitions" %
(len(self.success_list), len(ordered_candidates)))
wait_count -= 1
for worker in pool.workers:
worker.join(1)
except:
pool.haltWork()
for worker in pool.workers:
worker.join(1)
raise
finally:
self._write_back(curr_ao_state, curr_last_op, prev_ao_state, prev_last_op, heap_partitions,
input_col_dict, prev_col_dict, root_partition_col_dict, self.full_analyze, dirty_partitions,
target_list)
end_time = time.time()
logger.info("Total elapsed time: %d seconds. Analyzed %d out of %d table(s) or partition(s) successfully."
% (int(end_time-start_time), len(self.success_list), len(ordered_candidates)))
logger.info("Done.")
except:
raise
finally:
if self.conn:
self.conn.close()
return 0
def _get_input_tables(self, col_dict):
"""
Depending on the way this program was invoked, gather all the requested tables to be analyzed.
At the same time, parse the requested columns and populate the col_dict.
If a requested table is partitioned, expand all the leaf partitions.
"""
logger.info("Getting and verifying input tables...")
if self.single_table:
validate_include_tables(self.conn, [self.single_table], None)
# for single table, we always try to expand it to avoid getting all root partitions in the database
self._parse_column(col_dict, self.single_table, self.include_cols, self.exclude_cols, True)
elif self.schema: # all tables in a schema
ValidateSchemaExists(self.dbname, self.schema, self.pg_port).run()
logger.debug("getting all tables in the schema...")
all_schema_tables = run_sql(self.conn, GET_ALL_DATA_TABLES_IN_SCHEMA_SQL % self.schema)
# convert table name from ['public','foo'] to 'public.foo' and populate col_dict as all columns requested
for table in all_schema_tables:
col_dict['.'.join(table)] = set(['-1'])
elif self.config_file:
validate_include_tables(self.conn, None, self.config_file)
all_root_partitions = run_sql(self.conn, GET_ALL_ROOT_PARTITION_TABLES_SQL)
cfg_file = open(self.config_file, 'rU')
for line in cfg_file:
toks = line.strip().split()
table = toks[0]
included_cols = self._get_include_or_exclude_cols(toks, '-i')
excluded_cols = self._get_include_or_exclude_cols(toks, '-x')
self._parse_column(col_dict, table, included_cols, excluded_cols, [table] in all_root_partitions)
else: # all user tables in database
alltables = run_sql(self.conn, GET_ALL_DATA_TABLES_SQL)
for table in alltables:
col_dict['.'.join(table)] = set(['-1'])
return col_dict.keys()
def _get_include_or_exclude_cols(self, line_tokens, option_str):
"""
Get included or excluded columns from a line in the config file
        :param line_tokens: tokenized line in the config file, e.g., ['<schema>.<table>', '-i', '<col1>,<col2>,...']
        :param option_str: '-i' or '-x'
:return: a list of included or excluded columns
"""
pos = line_tokens.index(option_str) if option_str in line_tokens else -1
if pos < 0:
cols = None
else:
if pos+1 >= len(line_tokens):
raise Exception("No %s columns specified." % option_str)
cols = line_tokens[pos+1].split(',')
return cols
def _get_valid_candidates(self, candidates):
"""
Validate candidates for the following:
1. check whether candidates set is empty
2. check invalid characters in table names
3. skip views and external tables
"""
ret = []
mid_level_partitions = run_sql(self.conn, GET_MID_LEVEL_PARTITIONS_SQL)
for can in candidates:
if '\n' in can or ',' in can or ':' in can:
raise Exception('Table name has an invalid character "\\n", ":", "," : "%s"' % can)
if can.split('.') in mid_level_partitions:
logger.warning("Skipping mid-level partition %s" % can)
else:
ret.append(can)
if self.config_file is not None or self.single_table is not None:
valid_tables = []
if len(ret) > 0:
oid_str = get_oid_str(ret)
qresult = run_sql(self.conn, GET_VALID_DATA_TABLES_SQL % oid_str)
for tbl in qresult:
valid_tables.append('.'.join(tbl))
return valid_tables
return ret
def _get_dirty_data_tables(self, heap_tables_set, curr_ao_state, curr_last_op, prev_ao_state, prev_last_op):
"""
dirty data tables include:
- heap tables
- ao tables that have gone through DML or DDL
"""
logger.debug("getting dirty data tables...")
dirty_heap_tables = heap_tables_set
dirty_ao_tables = self._get_dirty_ao_state_tables(curr_ao_state, prev_ao_state)
dirty_metadata_set = self._get_dirty_lastop_tables(curr_last_op, prev_last_op)
return dirty_heap_tables | dirty_ao_tables | dirty_metadata_set
def _get_ao_state(self, input_tables_set):
logger.debug("getting ao state...")
oid_str = get_oid_str(input_tables_set)
ao_partition_info = run_sql(self.conn, GET_REQUESTED_AO_DATA_TABLE_INFO_SQL % oid_str)
return get_partition_state(self.pg_port, self.dbname, 'pg_aoseg', ao_partition_info)
def _get_lastop_state(self, input_tables_set):
# oid, action, subtype, timestamp
logger.debug("getting last operation states...")
oid_str = get_oid_str(input_tables_set)
rows = run_sql(self.conn, GET_REQUESTED_LAST_OP_INFO_SQL % oid_str)
data = []
for row in rows:
line = "%s,%s,%d,%s,%s,%s" % (row[0], row[1], row[2], row[3], row[4], row[5])
data.append(line)
return data
def _write_back(self, curr_ao_state, curr_last_op, prev_ao_state, prev_last_op, heap_partitions,
input_col_dict, prev_col_dict, root_partition_col_dict, is_full, dirty_partitions, target_list):
validate_dir("%s/%s/%s/%s" % (self.master_datadir, self.analyze_dir, self.dbname, CURR_TIME))
curr_ao_state_dict = create_ao_state_dict(curr_ao_state)
curr_last_op_dict = create_last_op_dict(curr_last_op)
prev_ao_state_dict = create_ao_state_dict(prev_ao_state)
prev_last_op_dict = create_last_op_dict(prev_last_op)
for table in (x for x in self.success_list if x not in heap_partitions and x not in root_partition_col_dict):
# update modcount for tables that are successfully analyzed
new_modcount = curr_ao_state_dict[table]
prev_ao_state_dict[table] = new_modcount
# update last op for tables that are successfully analyzed
last_op_info = curr_last_op_dict[table] # {'CREATE':'<entry>', 'ALTER':'<entry>', ...}
prev_last_op_dict[table] = last_op_info
# update column dict
if is_full or table in dirty_partitions or table not in prev_col_dict or '-1' in input_col_dict[table]:
prev_col_dict[table] = input_col_dict[table]
else:
prev_col_dict[table] = prev_col_dict[table] | input_col_dict[table]
ao_state_output = construct_entries_from_dict_aostate(prev_ao_state_dict)
last_op_output = construct_entries_from_dict_lastop(prev_last_op_dict)
col_state_output = construct_entries_from_dict_colstate(prev_col_dict)
if len(ao_state_output) > 0:
ao_state_filename = generate_statefile_name('ao', self.master_datadir, self.analyze_dir, self.dbname, CURR_TIME)
logger.info("Writing ao state file %s" % ao_state_filename)
write_lines_to_file(ao_state_filename, ao_state_output)
logger.debug("Verifying ao state file ...")
verify_lines_in_file(ao_state_filename, ao_state_output)
if len(last_op_output) > 0:
last_operation_filename = generate_statefile_name('lastop', self.master_datadir, self.analyze_dir, self.dbname, CURR_TIME)
logger.info("Writing last operation file %s" % last_operation_filename)
write_lines_to_file(last_operation_filename, last_op_output)
logger.debug("Verifying last operation file ...")
verify_lines_in_file(last_operation_filename, last_op_output)
if len(prev_col_dict) > 0:
col_state_filename = generate_statefile_name('col', self.master_datadir, self.analyze_dir, self.dbname, CURR_TIME)
logger.info("Writing column state file %s" % col_state_filename)
write_lines_to_file(col_state_filename, col_state_output)
logger.debug("Verifying column state ...")
verify_lines_in_file(col_state_filename, col_state_output)
report_filename = generate_statefile_name('report', self.master_datadir, self.analyze_dir, self.dbname, CURR_TIME)
logger.info("Writing report file %s" % report_filename)
with open(report_filename, 'w') as fp:
fp.write("%s:%s:%s:%s %s:%s:analyzedb %s\n\n" % (CURR_TIME[:8], CURR_TIME[8:10], CURR_TIME[10:12], CURR_TIME[12:14],
unix.getLocalHostname(), unix.getUserName(), ' '.join(sys.argv[1:])))
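            # With a hypothetical CURR_TIME of '20141015144927', the header above renders as
            #   20141015:14:49:27 <hostname>:<username>:analyzedb <original command line arguments>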
fp.write("Tables or partitions to analyze:\n---------------------------------------\n")
for target in target_list:
fp.write("%s\n" % target.strip())
fp.write("\n\nTables or partitions successfully analyzed:\n--------------------------------------------\n")
for tbl in self.success_list:
fp.write("%s\n" % tbl.strip())
fp.write("\n%d out of %d tables are analyzed.\n" % (len(self.success_list), len(target_list)))
if len(target_list) == len(self.success_list):
fp.write("\nanalyzedb finished successfully.\n")
def _get_dirty_lastop_tables(self, curr_last_op, prev_last_op):
old_pgstatoperations_dict = get_pgstatlastoperations_dict(prev_last_op)
dirty_tables = compare_metadata(old_pgstatoperations_dict, curr_last_op)
return dirty_tables
def _get_dirty_ao_state_tables(self, curr_ao_state, prev_ao_state):
last_state_dict = create_ao_state_dict(prev_ao_state)
curr_state_dict = create_ao_state_dict(curr_ao_state)
return compare_dict(last_state_dict, curr_state_dict)
def _parse_column(self, col_dict, table, include_cols, exclude_cols, is_root_partition):
"""
Given a list of included or excluded columns of a table, populate the column dictionary.
If the table is partitioned, expand it into all leaf partitions.
If both include_cols and exclude_cols are empty, use '-1' as the value indicating 'all columns'.
"""
if include_cols is not None:
validate_columns(self.conn, table, include_cols)
elif exclude_cols is not None:
validate_columns(self.conn, table, exclude_cols)
included_column_set = get_include_cols_from_exclude(self.conn, table, exclude_cols)
if len(included_column_set) == 0:
raise Exception("All columns have been excluded from table %s" % table)
if is_root_partition:
logger.debug("expanding partition tables...")
tbl_parts = self._expand_partition_tables([table])
else:
tbl_parts = [table]
for tbl in tbl_parts:
if include_cols is not None:
col_dict[tbl] = set(include_cols)
elif exclude_cols is not None:
col_dict[tbl] = included_column_set
else: # all columns
col_dict[tbl] = set(['-1'])
def _update_input_col_dict_with_column_increments(self, table, input_col_dict, prev_col_dict):
if table in prev_col_dict:
            # since expanding the default '-1' to all column names is expensive, we avoid this as much as possible
if '-1' not in input_col_dict[table] or '-1' not in prev_col_dict[table]:
input_col_set = self._expand_columns(input_col_dict, table)
prev_col_set = self._expand_columns(prev_col_dict, table)
pending_cols = input_col_set - prev_col_set # set difference
input_col_dict[table] = pending_cols
else: # both previous and current runs are without column specification
input_col_dict[table] = set()
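        # For example (hypothetical sets): if the previous run covered set(['a', 'b']) and the
        # current request is set(['a', 'b', 'c']), only set(['c']) remains to be analyzed.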
def _get_tablename_with_cols(self, table, col_dict):
if '-1' in col_dict[table]:
return table
else:
return table + '(' + ','.join(sorted(col_dict[table])) + ')'
def _get_tablename_from_cmd_name(self, cmdName):
if '(' in cmdName:
subject = cmdName.split()[-1].split('(')[0]
else:
subject = cmdName.split()[-1]
return subject
def _expand_partition_tables(self, table_list):
ret = []
for table in table_list:
(schema, parent) = table.split('.')
qresult = run_sql(self.conn, GET_LEAF_PARTITIONS_SQL % (schema, parent, schema, parent))
child_parts = [x[0] for x in qresult]
if len(child_parts) == 0:
ret.append(table)
else:
ret += child_parts
return ret
def _get_root_partition_col_dict(self, candidates, input_col_dict):
"""
Examine the candidates and figure out the root partitions whose stats need refreshing and
what columns need to be analyzed on those root partitions.
If the program is invoked on whole schema or whole database, then we know all columns have
been requested. Thus we can use one query to obtain the root partitions associated with the
candidates.
        If the program is invoked with '-t' or '-f', we issue a query to get the
        leaf-root relationship. Then the columns to be analyzed on the root
level are the set union of the columns to be analyzed for all leaf partitions.
"""
logger.debug("getting mapping between leaf and root partition tables...")
ret = {}
# The leaf_root_dict keeps track of the mapping between a leaf partition and its root partition
# for the use of refreshing root stats.
leaf_root_dict = {}
oid_str = get_oid_str(candidates)
qresult = run_sql(self.conn, GET_LEAF_ROOT_MAPPING_SQL % oid_str)
for mapping in qresult:
leaf_root_dict[mapping[0]] = mapping[1]
for can in candidates:
if can in leaf_root_dict: # this is a leaf partition
if leaf_root_dict[can] not in ret:
ret[leaf_root_dict[can]] = input_col_dict[can].copy()
else:
ret[leaf_root_dict[can]] |= input_col_dict[can].copy()
## TODO: do we need column expansion here?
return ret
def _get_ordered_candidates(self, candidates, root_partition_col_dict):
"""
Take all tables in candidates and root_partition_col_dict and order them
by descending order of their OIDs. This gives us two important benefits:
1. The root partition will be analyzed right after the leaves
2. The leaf partitions (if range partitioned, especially by date) will be ordered in descending
order of the partition key, so that newer partitions can be analyzed first.
"""
candidate_regclass_str = get_oid_str(candidates+root_partition_col_dict.keys())
qresult = run_sql(self.conn, ORDER_CANDIDATES_BY_OID_SQL % candidate_regclass_str)
return ['.'.join(x) for x in qresult]
def _expand_columns(self, col_dict, table):
if '-1' in col_dict[table]:
cols = run_sql(self.conn, GET_COLUMN_NAMES_SQL % quote_tbl(table))
return set([x[0] for x in cols])
else:
return col_dict[table]
def run_sql(conn, query):
try:
cursor = dbconn.execSQL(conn, query)
res = cursor.fetchall()
except Exception, db_err:
raise ExceptionNoStackTraceNeeded("%s" % db_err.__str__()) #.split('\n')[0])
cursor.close()
return res
def generate_timestamp():
timestamp = datetime.now()
return timestamp.strftime("%Y%m%d%H%M%S")
def quote_tbl(target):
"""
    Quote schema and table names before passing them to query strings.
    If the target contains column names, e.g. public.foo(a,b), quote each column name separately.
    E.g. public.foo(a,b) becomes "public"."foo"("a","b")
"""
sch, tbl_raw = target.split('.')
cols_str = ''
if '(' in tbl_raw:
tbl = tbl_raw.split('(')[0]
cols = tbl_raw.split('(')[1][:-1].split(',')
cols_str = '(' + ','.join(['"%s"' % x for x in cols]) + ')'
else:
tbl = tbl_raw
return '"%s"."%s"' % (sch, tbl) + cols_str
def get_oid_str(table_list):
return ','.join(["""'\"%s\".\"%s\"'::regclass""" % (x.split('.')[0], x.split('.')[1]) for x in table_list])
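# Minimal sketch of the quoting helpers above; it is not called by analyzedb itself and the
# table names are hypothetical. quote_tbl() double-quotes identifiers (and any column list),
# while get_oid_str() builds the ::regclass list substituted into the SQL templates.
def _illustrate_identifier_quoting():
    assert quote_tbl('public.foo(a,b)') == '"public"."foo"("a","b")'
    assert quote_tbl('sales.orders') == '"sales"."orders"'
    expected = '\'"public"."foo"\'::regclass,\'"sales"."orders"\'::regclass'
    assert get_oid_str(['public.foo', 'sales.orders']) == expected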
def get_heap_tables_set(conn, input_tables_set):
logger.debug("getting heap tables...")
oid_str = get_oid_str(input_tables_set)
dirty_tables = set()
qresult = run_sql(conn, GET_REQUESTED_NON_AO_TABLES_SQL % oid_str)
for row in qresult:
dirty_tables.add('.'.join(row))
return dirty_tables
def get_lastest_analyze_timestamp(master_datadir, statefile_dir, dbname):
analyze_dirs = get_analyze_dirs(master_datadir, statefile_dir, dbname)
for analyze_dir in analyze_dirs:
files = sorted(os.listdir(analyze_dir))
if len(files) == 0:
logger.warn('Analyze state file directory %s is empty. Ignoring this directory...' % analyze_dir)
continue
analyze_report_files = fnmatch.filter(files, 'analyze_[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]_report')
if len(analyze_report_files) == 0:
logger.warn('No analyze report files found in analyze directory %s. Ignoring this directory...' % analyze_dir)
continue
for report_file in analyze_report_files:
return report_file.split('_')[1]
return None
def get_prev_ao_state(timestamp, master_datadir, analyze_dir, dbname):
logger.debug("getting previous ao state...")
prev_state_filename = generate_statefile_name('ao', master_datadir, analyze_dir, dbname, timestamp)
if not os.path.isfile(prev_state_filename):
return []
return get_lines_from_file(prev_state_filename)
def get_prev_last_op(timestamp, master_datadir, analyze_dir, dbname):
logger.debug("getting previous last operation...")
old_pgstatoperations_file = generate_statefile_name('lastop', master_datadir, analyze_dir, dbname, timestamp)
if not os.path.isfile(old_pgstatoperations_file):
old_pgstatoperations = []
else:
old_pgstatoperations = get_lines_from_file(old_pgstatoperations_file)
return old_pgstatoperations
def get_prev_col_state(timestamp, master_datadir, analyze_dir, dbname):
logger.debug("getting previous column states...")
prev_col_state_file = generate_statefile_name('col', master_datadir, analyze_dir, dbname, timestamp)
if not os.path.isfile(prev_col_state_file):
return {}
lines = get_lines_from_file(prev_col_state_file)
prev_col_dict = {}
for line in lines:
toks = line.strip().split(',')
toks = map(str.strip, toks)
prev_col_dict['.'.join(toks[:2])] = set(toks[2:])
return prev_col_dict
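# A hypothetical column state file holds one line per table, parsed by the function above, e.g.
#   public,foo,a,b   -> prev_col_dict['public.foo'] = set(['a', 'b'])
#   public,bar,-1    -> prev_col_dict['public.bar'] = set(['-1'])  (all columns)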
def create_ao_state_dict(ao_state_entries):
ao_state_dict = dict()
for entry in ao_state_entries:
fields = entry.split(',')
if len(fields) != 3:
raise Exception('Invalid ao state file format %s' % entry)
key = '%s.%s' % (fields[0].strip(), fields[1].strip())
ao_state_dict[key] = fields[2].strip()
return ao_state_dict
def create_last_op_dict(last_op_entries):
last_op_dict = {}
for entry in last_op_entries:
toks = entry.split(',')
if len(toks) != 6:
raise Exception('Wrong number of tokens in last_operation data for last backup: "%s"' % entry)
key = '%s.%s' % (toks[0],toks[1])
op = toks[3]
if key not in last_op_dict:
last_op_dict[key] = {op:entry}
else:
last_op_dict[key][op] = entry
return last_op_dict
def construct_entries_from_dict_aostate(ao_state_dict):
ret = []
for key, item in ao_state_dict.iteritems():
token = ','.join(key.split('.'))
ret.append(','.join([token, item]))
return ret
def construct_entries_from_dict_lastop(last_op_dict):
ret = []
for value in last_op_dict.itervalues():
for entry in value.itervalues():
ret.append(entry)
return ret
def construct_entries_from_dict_colstate(prev_col_dict):
ret = []
for table, col_set in prev_col_dict.iteritems():
cols = ','.join(col_set)
ret.append(','.join([','.join(table.split('.')), cols])) # public,foo,a,b,c
return ret
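# Minimal sketch (never called at runtime, entries are hypothetical) of how ao state
# entries round-trip through the dict helpers above.
def _illustrate_ao_state_round_trip():
    entries = ['public, ao_sales, 3', 'public, ao_orders, 0']
    state = create_ao_state_dict(entries)
    assert state['public.ao_sales'] == '3'
    assert sorted(construct_entries_from_dict_aostate(state)) == ['public,ao_orders,0',
                                                                  'public,ao_sales,3']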
def generate_statefile_name(type_str, master_data_dir, analyze_dir, dbname, timestamp):
use_dir = "%s/%s/%s/%s" % (master_data_dir, analyze_dir, dbname, timestamp)
if type_str == 'lastop':
ret_str = "%s/analyze_%s_last_operation"
elif type_str == 'ao':
ret_str = "%s/analyze_%s_ao_state_file"
elif type_str == 'col':
ret_str = "%s/analyze_%s_col_state_file"
elif type_str == 'report':
ret_str = "%s/analyze_%s_report"
else:
raise Exception("Invalid type string for generating state file name")
return ret_str % (use_dir, timestamp)
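# Minimal sketch (hypothetical master data directory, database and timestamp) of the
# state file layout produced by generate_statefile_name() above.
def _illustrate_statefile_name():
    expected = ('/data/master/gpseg-1/db_analyze/mydb/20141015144927/'
                'analyze_20141015144927_ao_state_file')
    assert generate_statefile_name('ao', '/data/master/gpseg-1', STATEFILE_DIR,
                                   'mydb', '20141015144927') == expected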
def get_analyze_dirs(master_datadir, statefile_dir, dbname):
analyze_path = os.path.join(master_datadir, statefile_dir, dbname)
if not os.path.isdir(analyze_path):
return []
initial_list = os.listdir(analyze_path)
initial_list = fnmatch.filter(initial_list, '[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]')
dirnames = []
for d in initial_list:
pth = os.path.join(analyze_path, d)
if os.path.isdir(pth):
dirnames.append(pth)
if len(dirnames) == 0:
return []
dirnames = sorted(dirnames, key=lambda x: int(os.path.basename(x)), reverse=True)
return dirnames
def validate_dir(path):
exists = CheckDir(path).run()
if exists:
logger.info("Directory %s exists" % path)
else:
try:
MakeDir(path).run()
except OSError, e:
logger.exception("Could not create directory %s" % path)
raise AnalyzeDirCreateFailed()
else:
logger.info("Created %s" % path)
try:
with tempfile.TemporaryFile(dir=path) as f:
pass
except Exception, e:
logger.exception("Cannot write to %s" % path)
raise AnalyzeDirNotWritable()
def validate_include_tables(conn, table_list, include_file):
"""
    table_list needs to be a list of 'schema.table' names
    include_file may also specify per-table columns via -i or -x
"""
tables = {}
if table_list is not None:
for tbl in table_list:
if '.' not in tbl:
raise ExceptionNoStackTraceNeeded("No schema name supplied for table %s" % tbl)
tables[tbl] = 0
if include_file is not None:
in_file = open(include_file, 'rU')
line_no = 1
for line in in_file:
toks = line.strip().split()
            if len(toks) == 0: # empty line
                continue
            if len(toks) > 3: # we are expecting <schema>.<table> [-i|-x <col1>,<col2>,...]
raise ExceptionNoStackTraceNeeded("Wrong input arguments in line %d of config file. Please check usage." % line_no)
if '.' not in toks[0]:
raise ExceptionNoStackTraceNeeded("No schema name supplied for table %s" % toks[0])
if toks[0] in tables:
raise ExceptionNoStackTraceNeeded("Duplicate table name in line %d of config file." % line_no)
tables[toks[0]] = 0
line_no += 1
# since the number of target entries cannot be greater than 1664 in GPDB/HAWQ,
# we validate the tables in batches of 1500
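    # For example (hypothetical count): 3200 requested tables give nbatches = (3200-1)/1500 + 1 = 3,
    # validated via slices [0:1500], [1500:3000] and [3000:4500] (the last slice simply runs short).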
tablenames = tables.keys()
batch_size = 1500
nbatches = (len(tables)-1)/batch_size + 1
curr_batch = 0
while curr_batch < nbatches:
oid_str = get_oid_str(tablenames[curr_batch*batch_size:(curr_batch+1)*batch_size])
run_sql(conn, VALIDATE_TABLE_NAMES_SQL % oid_str)
curr_batch += 1
def get_include_cols_from_exclude(conn, table, exclude_cols):
"""
Given a list of excluded columns of a table, get the list of included columns
"""
quoted_exclude_cols = ','.join(["'%s'" % x for x in exclude_cols])
cols = run_sql(conn, GET_INCLUDED_COLUMNS_FROM_EXCLUDE_SQL % (quote_tbl(table), quoted_exclude_cols))
return set([x[0] for x in cols])
def validate_columns(conn, table, column_list):
"""
Check whether all column names in a list are valid for a table
"""
if len(column_list) == 0:
return
valid_col_count = dbconn.execSQLForSingleton(conn, VALIDATE_COLUMN_NAMES_SQL % (quote_tbl(table), ','.join(["'%s'" % x for x in column_list])))
if int(valid_col_count) != len(column_list):
raise Exception("Invalid input columns for table %s." % table)
def create_parser():
parser = OptionParser(version='%prog version 1.0',
description=
"Analyze a database incrementally. 'Incremental' means if a table or partition has not been modified by"
"DML or DDL commands since the last analyzedb run, it will be automatically skipped since its statistics"
"must be up to date. Some restrictions apply: "
"1. The incremental semantics only applies to append-only tables or partitions. All heap tables are regarded"
"as having stale stats every time analyzedb is run. This is because we use AO metadata to check for DML or"
"DDL events, which is not available to heap tables. "
"2. Views, indices and external tables are automatically skipped. "
"3. Table names or schema names containing comma or period is not supported yet."
)
parser.set_usage('%prog [options] ')
parser.remove_option('-h')
parser.add_option('-d', dest='dbname', metavar="<database name>",
help="Database name. Required.")
parser.add_option('-s', dest='schema', metavar="<schema name>",
help="Specify a schema to analyze. All tables in the schema will be analyzed.")
parser.add_option('-t', dest='single_table', metavar="<schema name>.<table name>",
help="Analyze a single table. Table name needs to be qualified with schema name.")
    parser.add_option('-i', type='string', dest='include_cols', metavar="<column1>,<column2>,...",
                      help="Comma-separated list of columns to include in the analysis. All columns are analyzed if not specified.")
    parser.add_option('-x', type='string', dest='exclude_cols', metavar="<column1>,<column2>,...",
                      help="Comma-separated list of columns to exclude from the analysis. All columns are analyzed if not specified.")
parser.add_option('-f', '--file', dest='config_file', metavar="<config_file>",
help="Config file that includes a list of tables to be analyzed. "
"Table names must be qualified with schema name. Optionally a list of columns (separated by "
"comma) can be specified using -i or -x.")
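    # A hypothetical config file passed via -f might contain lines such as:
    #   public.sales -i region,amount
    #   public.customers -x notes
    #   myschema.orders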
parser.add_option('-l', '--list', action='store_true', dest='dry_run', default=False,
help="List the tables to be analyzed without actually running analyze (dry run).")
parser.add_option('-p', type='int', dest='parallel_level', default=5, metavar="<parallel level>",
help="Parallel level, i.e. the number of tables to be analyzed in parallel. Valid numbers are between 1 and 10. Default value is 5.")
parser.add_option('--full', action='store_true', dest='full_analyze', default=False,
help="Analyze without using incremental. All tables requested by the user will be analyzed.")
parser.add_option('--clean_last', action='store_true', dest='clean_last', default=False,
help="Clean the state files generated by last analyzedb run. All other options except -d will be ignored.")
parser.add_option('--clean_all', action='store_true', dest='clean_all', default=False,
help="Clean all the state files generated by analyzedb. All other options except -d will be ignored.")
parser.add_option('-h', '-?', '--help', action='help',
help='Show this help message and exit.')
parser.add_option('-v', '--verbose', action='store_true', dest='verbose', help='Print debug messages.')
parser.add_option('-a', action='store_true', dest='silent', default=False,
help="Quiet mode. Do not prompt for user confirmation.")
return parser
class AnalyzeDirCreateFailed(Exception): pass
class AnalyzeDirNotWritable(Exception): pass
class AnalyzeWorkerPool(WorkerPool):
"""
a custom worker pool for analyze workers
"""
def __init__(self, numWorkers=5, items=None):
self.workers=[]
self.work_queue=Queue()
self.completed_queue=Queue()
self.num_assigned=0
if items is not None:
for item in items:
self.work_queue.put(item)
self.num_assigned += 1
for i in range(0,numWorkers):
# use AnalyzeWorker instead of Worker
w = AnalyzeWorker("worker%d" % i,self)
self.workers.append(w)
w.start()
self.numWorkers = numWorkers
self.logger = logger
class AnalyzeWorker(Thread):
"""
a custom worker thread for Analyze
"""
pool=None
cmd=None
name=None
logger=None
def __init__(self,name,pool,timeout=0.05):
self.name=name
self.pool=pool
self.timeout=timeout
self.logger=logger
self.stoprequest = threading.Event()
Thread.__init__(self)
def run(self):
while not self.stoprequest.isSet():
try:
self.cmd = self.pool.getNextWorkItem(timeout=self.timeout)
if self.cmd is not None:
# tablename = self.cmd.cmdStr.split()[-1][:-1]
self.logger.info("[%s] started %s" % (self.name, self.cmd.name))
start_time = time.time()
self.cmd.run()
end_time = time.time()
stderr = self.cmd.get_stderr_lines()
if len(stderr) > 0: # emit stderr if there is any
self.logger.warning('\n'.join(stderr))
if self.cmd.was_successful():
self.logger.info("[%s] finished %s. Elapsed time: %d seconds." % (self.name, self.cmd.name,
int(end_time-start_time)))
self.pool.addFinishedWorkItem(self.cmd)
self.cmd=None
except Empty:
return
except:
if self.cmd:
self.logger.debug("[%s] finished cmd with exception: %s" % (self.name, self.cmd))
self.pool.addFinishedWorkItem(self.cmd)
self.cmd=None
raise
def haltWork(self):
self.stoprequest.set()
c = self.cmd
if c is not None:
c.interrupt()
c.cancel()
if __name__ == '__main__':
sys.argv[0] = EXECNAME
simple_main(create_parser, AnalyzeDb)