| #!/usr/bin/env python |
| # -*- coding: utf-8 -*- |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| |
| # import mainUtils FIRST to get python version check |
| from gppylib.mainUtils import * |
| from optparse import OptionParser |
| from Queue import Queue,Empty |
| import os |
| import sys |
| import shutil |
| import fnmatch |
| import tempfile |
| import time |
| from threading import Thread |
| import threading |
| from datetime import datetime |
| |
| try: |
| from gppylib import gplog, pgconf, userinput |
| from gppylib.commands import gp, unix |
| from gppylib.commands.base import Command, WorkerPool, Worker |
| from gppylib.operations import Operation |
| from gppylib.gpversion import GpVersion |
| from gppylib.db import dbconn |
| from gppylib.operations.unix import CheckDir, CheckFile, MakeDir |
| from gppylib.operations.dump_analyzedb import get_partition_state, \ |
| get_pgstatlastoperations_dict, compare_metadata, compare_dict, \ |
| write_lines_to_file, verify_lines_in_file, ValidateIncludeTargets, \ |
| ValidateSchemaExists |
| from gppylib.operations.backup_utils import execute_sql, get_lines_from_file |
| |
| except ImportError, e: |
| sys.exit('Cannot import modules. Please check that you have sourced greenplum_path.sh. Detail: ' + str(e)) |
| |
| EXECNAME = 'analyzedb' |
| ANALYZE_SQL = """psql %s -p %s -c 'analyze %s'""" |
| ANALYZE_ROOT_SQL = """psql %s -p %s -c 'analyze rootpartition %s'""" |
| STATEFILE_DIR = 'db_analyze' |
| logger = gplog.get_default_logger() |
| CURR_TIME = None |
| |
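| # PG_PARTITIONS_SURROGATE reproduces the subset of the pg_partitions view needed here: |
| # each partition with its schema, its root table and, for sub-partitions, its parent. |
| # The queries below use it to tell root, mid-level and leaf partitions apart. |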
| PG_PARTITIONS_SURROGATE = """ |
| SELECT n.nspname AS schemaname, cl.relname AS tablename, n2.nspname AS partitionschemaname, cl2.relname AS partitiontablename, cl3.relname AS parentpartitiontablename |
| FROM pg_namespace n, pg_namespace n2, pg_class cl, pg_class cl2, pg_partition pp, |
| pg_partition_rule pr1 |
| LEFT JOIN pg_partition_rule pr2 ON pr1.parparentrule = pr2.oid |
| LEFT JOIN pg_class cl3 ON pr2.parchildrelid = cl3.oid |
| WHERE pp.paristemplate = false AND pp.parrelid = cl.oid AND pr1.paroid = pp.oid AND cl2.oid = pr1.parchildrelid AND cl.relnamespace = n.oid AND cl2.relnamespace = n2.oid |
| """ |
| |
| GET_ALL_DATA_TABLES_SQL = """ select n.nspname as schemaname, c.relname as tablename from pg_class c, pg_namespace n where |
| c.relnamespace = n.oid and c.relkind='r'::char and (c.relnamespace > 16384 or n.nspname = 'public' or n.nspname = 'pg_catalog') and c.oid not in (select reloid from pg_exttable) |
| EXCEPT |
| select distinct schemaname, tablename from (%s) AS pps1 |
| EXCEPT |
| select distinct partitionschemaname, parentpartitiontablename from (%s) AS pps2 where parentpartitiontablename is not NULL |
| """ % (PG_PARTITIONS_SURROGATE, PG_PARTITIONS_SURROGATE) |
| |
| GET_VALID_DATA_TABLES_SQL = """ |
| select n.nspname as schemaname, c.relname as tablename from pg_class c, pg_namespace n where |
| c.relnamespace = n.oid and c.oid in (%s) and c.relkind='r'::char and (c.relnamespace > 16384 or n.nspname = 'public' or n.nspname = 'pg_catalog') and c.oid not in (select reloid from pg_exttable) |
| """ |
| |
| GET_REQUESTED_AO_DATA_TABLE_INFO_SQL = """ |
| SELECT ALL_DATA_TABLES.oid, ALL_DATA_TABLES.schemaname, ALL_DATA_TABLES.tablename, OUTER_PG_CLASS.relname as tupletable FROM |
| ( |
| select c.oid as oid, n.nspname as schemaname, c.relname as tablename from pg_class c, pg_namespace n where |
| c.relnamespace = n.oid and c.oid in (%s) |
| ) as ALL_DATA_TABLES, pg_appendonly, pg_class OUTER_PG_CLASS |
| WHERE ALL_DATA_TABLES.oid = pg_appendonly.relid |
| AND OUTER_PG_CLASS.oid = pg_appendonly.segrelid |
| """ |
| |
| GET_REQUESTED_LAST_OP_INFO_SQL = """ |
| SELECT PGN.nspname, PGC.relname, objid, staactionname, stasubtype, statime FROM pg_stat_last_operation, pg_class PGC, pg_namespace PGN |
| WHERE objid = PGC.oid |
| AND objid in (%s) |
| AND PGC.relnamespace = PGN.oid |
| AND staactionname IN ('CREATE', 'ALTER', 'TRUNCATE') |
| ORDER BY objid, staactionname |
| """ |
| |
| GET_ALL_DATA_TABLES_IN_SCHEMA_SQL = """ |
| select n.nspname as schemaname, c.relname as tablename from pg_class c, pg_namespace n where |
| c.relnamespace = n.oid and c.relkind='r'::char and (c.relnamespace > 16384 or n.nspname = 'public' or n.nspname = 'pg_catalog') and c.oid not in (select reloid from pg_exttable) |
| and n.nspname = '%s' |
| EXCEPT |
| select distinct schemaname, tablename from (%s) AS pps1 |
| EXCEPT |
| select distinct partitionschemaname, parentpartitiontablename from (%s) AS pps2 where parentpartitiontablename is not NULL |
| """ % ('%s', PG_PARTITIONS_SURROGATE, PG_PARTITIONS_SURROGATE) |
| |
| GET_LEAF_PARTITIONS_SQL = """ |
| select partitionschemaname || '.' || partitiontablename from (%s) AS pps1 where schemaname = '%s' and tablename = '%s' |
| EXCEPT |
| select distinct partitionschemaname || '.' || parentpartitiontablename from (%s) AS pps2 where parentpartitiontablename is not NULL |
| and schemaname = '%s' and tablename = '%s' |
| """ % (PG_PARTITIONS_SURROGATE, '%s', '%s', PG_PARTITIONS_SURROGATE, '%s', '%s') |
| |
| GET_MID_LEVEL_PARTITIONS_SQL = """ |
| select distinct partitionschemaname, parentpartitiontablename from (%s) AS pps1 where parentpartitiontablename is not NULL |
| """ % PG_PARTITIONS_SURROGATE |
| |
| GET_REQUESTED_NON_AO_TABLES_SQL = """ |
| select n.nspname as schemaname, c.relname as tablename from pg_class c, pg_namespace n where |
| c.relnamespace = n.oid and c.relkind='r'::char and (c.relnamespace > 16384 or n.nspname = 'public' or n.nspname = 'pg_catalog') |
| and c.oid not in (select relid from pg_appendonly) and c.oid in (%s) and c.oid not in (select reloid from pg_exttable) |
| EXCEPT |
| select distinct schemaname, tablename from (%s) AS pps1 |
| EXCEPT |
| select distinct partitionschemaname, parentpartitiontablename from (%s) AS pps2 where parentpartitiontablename is not NULL |
| """ % ('%s', PG_PARTITIONS_SURROGATE, PG_PARTITIONS_SURROGATE) |
| |
| GET_COLUMN_NAMES_SQL = """ |
| SELECT attname FROM pg_attribute WHERE attrelid = '%s'::regclass AND attnum > 0 AND NOT attisdropped |
| """ |
| |
| GET_INCLUDED_COLUMNS_FROM_EXCLUDE_SQL = """ |
| SELECT attname FROM pg_attribute WHERE attrelid = '%s'::regclass AND attname NOT IN (%s) AND attnum > 0 AND NOT attisdropped |
| """ |
| |
| VALIDATE_COLUMN_NAMES_SQL = """ |
| SELECT count(*) FROM pg_attribute WHERE attrelid = '%s'::regclass AND attname IN (%s) AND attnum > 0 AND NOT attisdropped |
| """ |
| |
| VALIDATE_TABLE_NAMES_SQL = """ |
| SELECT %s |
| """ |
| |
| GET_LEAF_ROOT_MAPPING_SQL = """ |
| SELECT n.nspname || '.' || c2.relname, n.nspname || '.' || c.relname from pg_class c, pg_class c2, pg_namespace n, pg_partition pp, pg_partition_rule ppr |
| WHERE ppr.parchildrelid in (%s) AND ppr.paroid = pp.oid AND pp.parrelid = c.oid AND c.relnamespace = n.oid AND ppr.parchildrelid = c2.oid; |
| """ |
| |
| GET_ALL_ROOT_PARTITION_TABLES_SQL = """ |
| SELECT distinct n.nspname || '.' || c.relname from pg_class c, pg_namespace n, pg_partition pp |
| WHERE pp.parrelid = c.oid AND c.relnamespace = n.oid AND pp.paristemplate = false |
| """ |
| |
| ORDER_CANDIDATES_BY_OID_SQL = """ |
| SELECT schemaname, tablename FROM |
| (SELECT c.oid as tableoid, n.nspname as schemaname, c.relname as tablename FROM pg_class c, pg_namespace n where c.relnamespace=n.oid and c.oid in (%s)) AS foo |
| ORDER BY tableoid DESC; |
| """ |
| |
| |
| class AnalyzeDb(Operation): |
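| """ |
| Implements incremental ANALYZE for a database: determine which tables, partitions and |
| columns have stale statistics, ANALYZE them in parallel, and persist state files under |
| <master_data_dir>/db_analyze/<dbname>/<timestamp> so that the next run can skip |
| append-only tables that have not changed since. |
| """ |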
| def __init__(self, options, args): |
| |
| if args: |
| logger.warn("Please note that some of the arguments (%s) aren't valid and will be ignored.", args) |
| if options.masterDataDirectory is None: |
| options.masterDataDirectory = gp.get_masterdatadir() |
| self.master_datadir = options.masterDataDirectory |
| self.analyze_dir = STATEFILE_DIR |
| self.pg_port = gp.get_master_port() |
| self.dbname = options.dbname |
| self.schema = options.schema |
| self.single_table = options.single_table |
| self.config_file = options.config_file |
| self.entire_db = False |
| self.include_cols = options.include_cols |
| self.exclude_cols = options.exclude_cols |
| self.full_analyze = options.full_analyze |
| self.dry_run = options.dry_run |
| self.parallel_level = options.parallel_level |
| self.rootstats = True |
| self.silent = options.silent |
| self.verbose = options.verbose |
| self.clean_last = options.clean_last |
| self.clean_all = options.clean_all |
| |
| self.success_list = [] |
| |
| self._validate_options() |
| self._preprocess_options() |
| |
| self.conn = dbconn.connect(dbconn.DbURL(port=self.pg_port, dbname=self.dbname), utility=False) |
| |
| def _validate_options(self): |
| """ |
| Validates the options passed in to the application. |
| """ |
| if not self.dbname: |
| raise ProgramArgumentValidationException("option -d required. Please see 'analyzedb -?' for usage.") |
| |
| if self.clean_all + self.clean_last > 1: |
| raise ProgramArgumentValidationException('options --clean_last and --clean_all are mutually exclusive') |
| |
| if (self.schema is not None) + (self.single_table is not None) \ |
| + (self.config_file is not None) > 1: |
| raise ProgramArgumentValidationException('options -s, -t and -f are mutually exclusive') |
| |
| if (self.include_cols is not None) + (self.exclude_cols is not None) > 1: |
| raise ProgramArgumentValidationException('options -i and -x are mutually exclusive') |
| |
| if (self.include_cols is not None or self.exclude_cols is not None) and self.single_table is None: |
| raise ProgramArgumentValidationException('option -i or -x can only be used together with -t') |
| |
| if self.parallel_level < 1 or self.parallel_level > 10: |
| raise ProgramArgumentValidationException('option -p requires a value between 1 and 10') |
| |
| |
| def _preprocess_options(self): |
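| """ |
| Handle options that require preparation before the main run: --clean_all and --clean_last |
| delete previously generated state file directories, the -i/-x column lists are split into |
| Python lists, a whole-database run is flagged when none of -t/-s/-f was given, and |
| verbose mode lowers the log level to DEBUG. |
| """ |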
| |
| if self.clean_all: |
| analyze_folder = os.path.join(self.master_datadir, self.analyze_dir, self.dbname) |
| if os.path.exists(analyze_folder): |
| if not self.silent and not userinput.ask_yesno(None, "\nDeleting all files and folders in %s ?" % analyze_folder, 'N'): |
| raise UserAbortedException() |
| for f in os.listdir(analyze_folder): |
| f_path = os.path.join(analyze_folder, f) |
| if os.path.isdir(f_path): |
| shutil.rmtree(f_path) |
| else: |
| os.remove(f_path) |
| else: |
| logger.warning("Folder %s does not exist. Exiting..." % analyze_folder) |
| |
| if self.clean_last: |
| last_analyze_timestamp = get_lastest_analyze_timestamp(self.master_datadir, self.analyze_dir, self.dbname) |
| if last_analyze_timestamp is not None: |
| analyze_folder = os.path.join(self.master_datadir, self.analyze_dir, self.dbname, last_analyze_timestamp) |
| if os.path.exists(analyze_folder): |
| if not self.silent and not userinput.ask_yesno(None, "\nDeleting folder %s and all files inside?" % analyze_folder, 'N'): |
| raise UserAbortedException() |
| shutil.rmtree(analyze_folder) |
| else: |
| logger.warning("Folder %s does not exist. Exiting..." % analyze_folder) |
| else: |
| logger.warning("No valid state files directories exist. Exiting...") |
| |
| if self.include_cols is not None: |
| self.include_cols = self.include_cols.strip().split(',') |
| |
| if self.exclude_cols is not None: |
| self.exclude_cols = self.exclude_cols.strip().split(',') |
| |
| if self.single_table is None and self.schema is None and self.config_file is None: |
| self.entire_db = True |
| |
| if self.verbose: |
| logger.setLevel(10) |
| |
| def execute(self): |
| try: |
| if self.clean_all or self.clean_last: |
| return 0 |
| |
| global CURR_TIME |
| CURR_TIME = generate_timestamp() |
| |
| # The input_col_dict keeps track of the requested columns to analyze. |
| # key: table name (e.g. 'public.foo') |
| # value: a set of requested column names (e.g. set(['col1','col2'])), or set(['-1']) indicating all columns |
| input_col_dict = {} |
| |
| # parse input and update input column dictionary |
| input_tables = self._get_input_tables(input_col_dict) # ['public.foo', 'public.bar', ...] |
| input_tables_set = set(input_tables) |
| |
| if len(input_tables_set) == 0: |
| logger.warning("There are no tables or partitions to be analyzed. Exiting...") |
| return 0 |
| |
| logger.info("Checking for tables with stale stats...") |
| # get all heap tables in the requested tables. all heap tables are regarded as dirty |
| heap_partitions = get_heap_tables_set(self.conn, input_tables_set) # set(['schema.table1', ...]) |
| |
| # get the current state of the requested tables |
| # curr_ao_state contains the number of DML commands that have been executed on an AO table |
| # curr_last_op contains the timestamp of the last DDL command (CREATE, ALTER, TRUNCATE) on each requested table |
| curr_ao_state = self._get_ao_state(input_tables_set) # ['schema, table, modcount', ...] |
| curr_last_op = self._get_lastop_state(input_tables_set) # e.g. ['public,ao_tab,67468,ALTER,ADD COLUMN,2014-10-15 14:49:27.658777-07', ...] |
| last_analyze_timestamp = get_lastest_analyze_timestamp(self.master_datadir, self.analyze_dir, self.dbname) |
| |
| # get the previous state of the database |
| prev_ao_state = get_prev_ao_state(last_analyze_timestamp, self.master_datadir, self.analyze_dir, self.dbname) |
| prev_last_op = get_prev_last_op(last_analyze_timestamp, self.master_datadir, self.analyze_dir, self.dbname) |
| |
| # compare two states to get dirty tables |
| dirty_partitions = self._get_dirty_data_tables(heap_partitions, curr_ao_state, curr_last_op, prev_ao_state, prev_last_op) |
| |
| # get the previous column states |
| # read from the file on disk which contains info about what columns had up-to-date stats after the last time the table was analyzed |
| prev_col_dict = get_prev_col_state(last_analyze_timestamp, self.master_datadir, self.analyze_dir, self.dbname) |
| |
| candidates = set() # set(['public.foo', 'public.bar', ...]) |
| |
| if self.full_analyze: # full analyze does not support column-level increments and will invalidate previous column-level state |
| candidates = input_tables_set |
| else: # incremental |
| for table in input_tables_set: |
| if table in dirty_partitions: # for dirty partitions, we invalidate all previous column-level state |
| candidates.add(table) |
| else: |
| # figure out which columns need to be analyzed |
| self._update_input_col_dict_with_column_increments(table, input_col_dict, prev_col_dict) |
| if len(input_col_dict[table]) > 0: |
| candidates.add(table) |
| |
| candidates = self._get_valid_candidates(candidates) |
| if len(candidates) < 1: |
| logger.warning("There are no tables or partitions to be analyzed. Exiting...") |
| return 0 |
| |
| root_partition_col_dict = {} |
| # root_partition_col_dict contains the mapping between the root partitions |
| # and their corresponding columns to be analyzed |
| # key: name of the root partition whose stats need to be refreshed |
| # value: a set of column names to be analyzed, or '-1' meaning all columns of that table |
| if self.rootstats: |
| root_partition_col_dict = self._get_root_partition_col_dict(candidates, input_col_dict) |
| |
| ordered_candidates = self._get_ordered_candidates(candidates, root_partition_col_dict) |
| target_list = [] |
| logger.info("---------------------------------------------------") |
| logger.info("Tables or partitions to be analyzed") |
| logger.info("---------------------------------------------------") |
| for can in ordered_candidates: |
| if can in candidates: |
| target = self._get_tablename_with_cols(can, input_col_dict) |
| else: # can in root_partition_col_dict |
| target = self._get_tablename_with_cols(can, root_partition_col_dict) |
| logger.info(target) |
| target_list.append(target) |
| logger.info("---------------------------------------------------") |
| |
| if self.dry_run: |
| return 0 |
| |
| if not self.silent and not userinput.ask_yesno(None, "\nContinue with Analyze", 'N'): |
| raise UserAbortedException() |
| |
| num_workers = min(self.parallel_level, len(ordered_candidates)) |
| logger.info("Starting analyze with %d workers..." % num_workers) |
| pool = AnalyzeWorkerPool(numWorkers=num_workers) |
| for can in ordered_candidates: |
| if can in candidates: |
| target = self._get_tablename_with_cols(can, input_col_dict) |
| cmd = Command('analyze %s' % target, ANALYZE_SQL % (self.dbname, self.pg_port, quote_tbl(target))) |
| else: # can in root_partition_col_dict |
| target = self._get_tablename_with_cols(can, root_partition_col_dict) |
| cmd = Command('analyze rootpartition %s' % target, ANALYZE_ROOT_SQL % (self.dbname, self.pg_port, quote_tbl(target))) |
| pool.addCommand(cmd) |
| |
| wait_count = len(ordered_candidates) |
| try: |
| start_time = time.time() |
| while wait_count > 0: |
| done_cmd = pool.completed_queue.get() |
| if done_cmd.was_successful(): |
| subject = self._get_tablename_from_cmd_name(done_cmd.name) |
| self.success_list.append(subject) |
| if wait_count % 10 == 0: |
| logger.info("progress status: completed %d out of %d tables or partitions" % |
| (len(self.success_list), len(ordered_candidates))) |
| wait_count -= 1 |
| |
| for worker in pool.workers: |
| worker.join(1) |
| except: |
| pool.haltWork() |
| for worker in pool.workers: |
| worker.join(1) |
| raise |
| |
| finally: |
| self._write_back(curr_ao_state, curr_last_op, prev_ao_state, prev_last_op, heap_partitions, |
| input_col_dict, prev_col_dict, root_partition_col_dict, self.full_analyze, dirty_partitions, |
| target_list) |
| end_time = time.time() |
| logger.info("Total elapsed time: %d seconds. Analyzed %d out of %d table(s) or partition(s) successfully." |
| % (int(end_time-start_time), len(self.success_list), len(ordered_candidates))) |
| logger.info("Done.") |
| except: |
| raise |
| |
| finally: |
| if self.conn: |
| self.conn.close() |
| |
| return 0 |
| |
| def _get_input_tables(self, col_dict): |
| """ |
| Depending on the way this program was invoked, gather all the requested tables to be analyzed. |
| At the same time, parse the requested columns and populate the col_dict. |
| If a requested table is partitioned, expand all the leaf partitions. |
| """ |
| logger.info("Getting and verifying input tables...") |
| if self.single_table: |
| validate_include_tables(self.conn, [self.single_table], None) |
| # for single table, we always try to expand it to avoid getting all root partitions in the database |
| self._parse_column(col_dict, self.single_table, self.include_cols, self.exclude_cols, True) |
| |
| elif self.schema: # all tables in a schema |
| ValidateSchemaExists(self.dbname, self.schema, self.pg_port).run() |
| logger.debug("getting all tables in the schema...") |
| all_schema_tables = run_sql(self.conn, GET_ALL_DATA_TABLES_IN_SCHEMA_SQL % self.schema) |
| # convert table name from ['public','foo'] to 'public.foo' and populate col_dict as all columns requested |
| for table in all_schema_tables: |
| col_dict['.'.join(table)] = set(['-1']) |
| |
| elif self.config_file: |
| validate_include_tables(self.conn, None, self.config_file) |
| all_root_partitions = run_sql(self.conn, GET_ALL_ROOT_PARTITION_TABLES_SQL) |
| cfg_file = open(self.config_file, 'rU') |
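| # each config file line has the form: <schema>.<table> [-i <col1>,<col2>,... | -x <col1>,<col2>,...] |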
| for line in cfg_file: |
| toks = line.strip().split() |
| table = toks[0] |
| included_cols = self._get_include_or_exclude_cols(toks, '-i') |
| excluded_cols = self._get_include_or_exclude_cols(toks, '-x') |
| self._parse_column(col_dict, table, included_cols, excluded_cols, [table] in all_root_partitions) |
| |
| else: # all user tables in database |
| alltables = run_sql(self.conn, GET_ALL_DATA_TABLES_SQL) |
| for table in alltables: |
| col_dict['.'.join(table)] = set(['-1']) |
| |
| return col_dict.keys() |
| |
| def _get_include_or_exclude_cols(self, line_tokens, option_str): |
| """ |
| Get included or excluded columns from a line in the config file |
| :param line_tokens: tokenized line from the config file, e.g., ['<schema>.<table>', '-i', '<col1>,<col2>,...'] |
| :param option_str: '-i' (include columns) or '-x' (exclude columns) |
| :return: a list of included or excluded columns |
| """ |
| pos = line_tokens.index(option_str) if option_str in line_tokens else -1 |
| if pos < 0: |
| cols = None |
| else: |
| if pos+1 >= len(line_tokens): |
| raise Exception("No %s columns specified." % option_str) |
| cols = line_tokens[pos+1].split(',') |
| |
| return cols |
| |
| def _get_valid_candidates(self, candidates): |
| """ |
| Validate candidates for the following: |
| 1. reject table names containing invalid characters |
| 2. skip mid-level partitions |
| 3. for -t or -f input, keep only valid data tables (views and external tables are skipped) |
| """ |
| ret = [] |
| mid_level_partitions = run_sql(self.conn, GET_MID_LEVEL_PARTITIONS_SQL) |
| for can in candidates: |
| if '\n' in can or ',' in can or ':' in can: |
| raise Exception('Table name has an invalid character "\\n", ":", "," : "%s"' % can) |
| if can.split('.') in mid_level_partitions: |
| logger.warning("Skipping mid-level partition %s" % can) |
| else: |
| ret.append(can) |
| |
| if self.config_file is not None or self.single_table is not None: |
| valid_tables = [] |
| if len(ret) > 0: |
| oid_str = get_oid_str(ret) |
| qresult = run_sql(self.conn, GET_VALID_DATA_TABLES_SQL % oid_str) |
| for tbl in qresult: |
| valid_tables.append('.'.join(tbl)) |
| return valid_tables |
| |
| return ret |
| |
| def _get_dirty_data_tables(self, heap_tables_set, curr_ao_state, curr_last_op, prev_ao_state, prev_last_op): |
| """ |
| dirty data tables include: |
| - heap tables |
| - ao tables that have gone through DML or DDL |
| """ |
| logger.debug("getting dirty data tables...") |
| dirty_heap_tables = heap_tables_set |
| dirty_ao_tables = self._get_dirty_ao_state_tables(curr_ao_state, prev_ao_state) |
| dirty_metadata_set = self._get_dirty_lastop_tables(curr_last_op, prev_last_op) |
| return dirty_heap_tables | dirty_ao_tables | dirty_metadata_set |
| |
| def _get_ao_state(self, input_tables_set): |
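| """ |
| Return the current state of the requested append-only tables as 'schema, table, modcount' |
| entries, computed from their pg_aoseg tuple tables. |
| """ |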
| logger.debug("getting ao state...") |
| oid_str = get_oid_str(input_tables_set) |
| ao_partition_info = run_sql(self.conn, GET_REQUESTED_AO_DATA_TABLE_INFO_SQL % oid_str) |
| return get_partition_state(self.pg_port, self.dbname, 'pg_aoseg', ao_partition_info) |
| |
| def _get_lastop_state(self, input_tables_set): |
| # oid, action, subtype, timestamp |
| logger.debug("getting last operation states...") |
| oid_str = get_oid_str(input_tables_set) |
| rows = run_sql(self.conn, GET_REQUESTED_LAST_OP_INFO_SQL % oid_str) |
| data = [] |
| for row in rows: |
| line = "%s,%s,%d,%s,%s,%s" % (row[0], row[1], row[2], row[3], row[4], row[5]) |
| data.append(line) |
| return data |
| |
| def _write_back(self, curr_ao_state, curr_last_op, prev_ao_state, prev_last_op, heap_partitions, |
| input_col_dict, prev_col_dict, root_partition_col_dict, is_full, dirty_partitions, target_list): |
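| """ |
| Persist the post-run state under the CURR_TIME directory: the AO modcount state file, the |
| last-operation (DDL) state file, the column state file and a human-readable report. |
| State entries are only refreshed for tables that were analyzed successfully in this run. |
| """ |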
| validate_dir("%s/%s/%s/%s" % (self.master_datadir, self.analyze_dir, self.dbname, CURR_TIME)) |
| |
| curr_ao_state_dict = create_ao_state_dict(curr_ao_state) |
| curr_last_op_dict = create_last_op_dict(curr_last_op) |
| prev_ao_state_dict = create_ao_state_dict(prev_ao_state) |
| prev_last_op_dict = create_last_op_dict(prev_last_op) |
| |
| for table in (x for x in self.success_list if x not in heap_partitions and x not in root_partition_col_dict): |
| # update modcount for tables that are successfully analyzed |
| new_modcount = curr_ao_state_dict[table] |
| prev_ao_state_dict[table] = new_modcount |
| |
| # update last op for tables that are successfully analyzed |
| last_op_info = curr_last_op_dict[table] # {'CREATE':'<entry>', 'ALTER':'<entry>', ...} |
| prev_last_op_dict[table] = last_op_info |
| |
| # update column dict |
| if is_full or table in dirty_partitions or table not in prev_col_dict or '-1' in input_col_dict[table]: |
| prev_col_dict[table] = input_col_dict[table] |
| else: |
| prev_col_dict[table] = prev_col_dict[table] | input_col_dict[table] |
| |
| ao_state_output = construct_entries_from_dict_aostate(prev_ao_state_dict) |
| last_op_output = construct_entries_from_dict_lastop(prev_last_op_dict) |
| col_state_output = construct_entries_from_dict_colstate(prev_col_dict) |
| |
| if len(ao_state_output) > 0: |
| ao_state_filename = generate_statefile_name('ao', self.master_datadir, self.analyze_dir, self.dbname, CURR_TIME) |
| logger.info("Writing ao state file %s" % ao_state_filename) |
| write_lines_to_file(ao_state_filename, ao_state_output) |
| logger.debug("Verifying ao state file ...") |
| verify_lines_in_file(ao_state_filename, ao_state_output) |
| |
| if len(last_op_output) > 0: |
| last_operation_filename = generate_statefile_name('lastop', self.master_datadir, self.analyze_dir, self.dbname, CURR_TIME) |
| logger.info("Writing last operation file %s" % last_operation_filename) |
| write_lines_to_file(last_operation_filename, last_op_output) |
| logger.debug("Verifying last operation file ...") |
| verify_lines_in_file(last_operation_filename, last_op_output) |
| |
| if len(prev_col_dict) > 0: |
| col_state_filename = generate_statefile_name('col', self.master_datadir, self.analyze_dir, self.dbname, CURR_TIME) |
| logger.info("Writing column state file %s" % col_state_filename) |
| write_lines_to_file(col_state_filename, col_state_output) |
| logger.debug("Verifying column state ...") |
| verify_lines_in_file(col_state_filename, col_state_output) |
| |
| report_filename = generate_statefile_name('report', self.master_datadir, self.analyze_dir, self.dbname, CURR_TIME) |
| logger.info("Writing report file %s" % report_filename) |
| with open(report_filename, 'w') as fp: |
| fp.write("%s:%s:%s:%s %s:%s:analyzedb %s\n\n" % (CURR_TIME[:8], CURR_TIME[8:10], CURR_TIME[10:12], CURR_TIME[12:14], |
| unix.getLocalHostname(), unix.getUserName(), ' '.join(sys.argv[1:]))) |
| fp.write("Tables or partitions to analyze:\n---------------------------------------\n") |
| for target in target_list: |
| fp.write("%s\n" % target.strip()) |
| fp.write("\n\nTables or partitions successfully analyzed:\n--------------------------------------------\n") |
| for tbl in self.success_list: |
| fp.write("%s\n" % tbl.strip()) |
| fp.write("\n%d out of %d tables are analyzed.\n" % (len(self.success_list), len(target_list))) |
| if len(target_list) == len(self.success_list): |
| fp.write("\nanalyzedb finished successfully.\n") |
| |
| def _get_dirty_lastop_tables(self, curr_last_op, prev_last_op): |
| old_pgstatoperations_dict = get_pgstatlastoperations_dict(prev_last_op) |
| dirty_tables = compare_metadata(old_pgstatoperations_dict, curr_last_op) |
| return dirty_tables |
| |
| def _get_dirty_ao_state_tables(self, curr_ao_state, prev_ao_state): |
| last_state_dict = create_ao_state_dict(prev_ao_state) |
| curr_state_dict = create_ao_state_dict(curr_ao_state) |
| return compare_dict(last_state_dict, curr_state_dict) |
| |
| def _parse_column(self, col_dict, table, include_cols, exclude_cols, is_root_partition): |
| """ |
| Given a list of included or excluded columns of a table, populate the column dictionary. |
| If the table is partitioned, expand it into all leaf partitions. |
| If both include_cols and exclude_cols are empty, use '-1' as the value indicating 'all columns'. |
| """ |
| if include_cols is not None: |
| validate_columns(self.conn, table, include_cols) |
| elif exclude_cols is not None: |
| validate_columns(self.conn, table, exclude_cols) |
| included_column_set = get_include_cols_from_exclude(self.conn, table, exclude_cols) |
| if len(included_column_set) == 0: |
| raise Exception("All columns have been excluded from table %s" % table) |
| if is_root_partition: |
| logger.debug("expanding partition tables...") |
| tbl_parts = self._expand_partition_tables([table]) |
| else: |
| tbl_parts = [table] |
| |
| for tbl in tbl_parts: |
| if include_cols is not None: |
| col_dict[tbl] = set(include_cols) |
| elif exclude_cols is not None: |
| col_dict[tbl] = included_column_set |
| else: # all columns |
| col_dict[tbl] = set(['-1']) |
| |
| def _update_input_col_dict_with_column_increments(self, table, input_col_dict, prev_col_dict): |
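| """ |
| For a table that is not dirty, shrink the requested column set to the columns not already |
| covered by the previous run; an empty result means the table can be skipped entirely. |
| """ |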
| if table in prev_col_dict: |
| # since expanding the default '-1' to all column names is expensive, we avoid it whenever possible |
| if '-1' not in input_col_dict[table] or '-1' not in prev_col_dict[table]: |
| input_col_set = self._expand_columns(input_col_dict, table) |
| prev_col_set = self._expand_columns(prev_col_dict, table) |
| pending_cols = input_col_set - prev_col_set # set difference |
| input_col_dict[table] = pending_cols |
| else: # both previous and current runs are without column specification |
| input_col_dict[table] = set() |
| |
| def _get_tablename_with_cols(self, table, col_dict): |
| if '-1' in col_dict[table]: |
| return table |
| else: |
| return table + '(' + ','.join(sorted(col_dict[table])) + ')' |
| |
| def _get_tablename_from_cmd_name(self, cmdName): |
| if '(' in cmdName: |
| subject = cmdName.split()[-1].split('(')[0] |
| else: |
| subject = cmdName.split()[-1] |
| |
| return subject |
| |
| def _expand_partition_tables(self, table_list): |
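| """ |
| Replace every partitioned table in table_list with its leaf partitions; |
| non-partitioned tables are kept as they are. |
| """ |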
| ret = [] |
| for table in table_list: |
| (schema, parent) = table.split('.') |
| qresult = run_sql(self.conn, GET_LEAF_PARTITIONS_SQL % (schema, parent, schema, parent)) |
| child_parts = [x[0] for x in qresult] |
| if len(child_parts) == 0: |
| ret.append(table) |
| else: |
| ret += child_parts |
| |
| return ret |
| |
| def _get_root_partition_col_dict(self, candidates, input_col_dict): |
| """ |
| Examine the candidates and figure out the root partitions whose stats need refreshing and |
| what columns need to be analyzed on those root partitions. |
| If the program is invoked on a whole schema or the whole database, then we know all columns have |
| been requested. Thus we can use one query to obtain the root partitions associated with the |
| candidates. |
| If the program is invoked with '-t' or '-f', we issue a query to get the leaf-root |
| relationship. Then the columns to be analyzed at the root |
| level are the set union of the columns to be analyzed for all leaf partitions. |
| """ |
| logger.debug("getting mapping between leaf and root partition tables...") |
| ret = {} |
| # The leaf_root_dict keeps track of the mapping between a leaf partition and its root partition |
| # for the use of refreshing root stats. |
| leaf_root_dict = {} |
| oid_str = get_oid_str(candidates) |
| qresult = run_sql(self.conn, GET_LEAF_ROOT_MAPPING_SQL % oid_str) |
| for mapping in qresult: |
| leaf_root_dict[mapping[0]] = mapping[1] |
| |
| for can in candidates: |
| if can in leaf_root_dict: # this is a leaf partition |
| if leaf_root_dict[can] not in ret: |
| ret[leaf_root_dict[can]] = input_col_dict[can].copy() |
| else: |
| ret[leaf_root_dict[can]] |= input_col_dict[can].copy() |
| ## TODO: do we need column expansion here? |
| return ret |
| |
| def _get_ordered_candidates(self, candidates, root_partition_col_dict): |
| """ |
| Take all tables in candidates and root_partition_col_dict and order them |
| by descending order of their OIDs. This gives us two important benefits: |
| 1. The root partition will be analyzed right after the leaves |
| 2. The leaf partitions (if range partitioned, especially by date) will be ordered in descending |
| order of the partition key, so that newer partitions can be analyzed first. |
| """ |
| candidate_regclass_str = get_oid_str(candidates+root_partition_col_dict.keys()) |
| qresult = run_sql(self.conn, ORDER_CANDIDATES_BY_OID_SQL % candidate_regclass_str) |
| return ['.'.join(x) for x in qresult] |
| |
| def _expand_columns(self, col_dict, table): |
| if '-1' in col_dict[table]: |
| cols = run_sql(self.conn, GET_COLUMN_NAMES_SQL % quote_tbl(table)) |
| return set([x[0] for x in cols]) |
| else: |
| return col_dict[table] |
| |
| |
| def run_sql(conn, query): |
| try: |
| cursor = dbconn.execSQL(conn, query) |
| res = cursor.fetchall() |
| except Exception, db_err: |
| raise ExceptionNoStackTraceNeeded(str(db_err)) |
| cursor.close() |
| return res |
| |
| def generate_timestamp(): |
| timestamp = datetime.now() |
| return timestamp.strftime("%Y%m%d%H%M%S") |
| |
| def quote_tbl(target): |
| """ |
| Quote schema and table names before passing them to query strings. |
| If the target contains column names, e.g. public.foo(a,b), each column name is quoted separately. |
| E.g. public.foo(a,b) will become "public"."foo"("a","b") |
| """ |
| sch, tbl_raw = target.split('.') |
| cols_str = '' |
| if '(' in tbl_raw: |
| tbl = tbl_raw.split('(')[0] |
| cols = tbl_raw.split('(')[1][:-1].split(',') |
| cols_str = '(' + ','.join(['"%s"' % x for x in cols]) + ')' |
| else: |
| tbl = tbl_raw |
| |
| return '"%s"."%s"' % (sch, tbl) + cols_str |
| |
| |
| def get_oid_str(table_list): |
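| """ |
| Build a comma-separated list of '"schema"."table"'::regclass literals for use in the |
| IN (...) clauses of the catalog queries above. |
| """ |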
| return ','.join(["""'\"%s\".\"%s\"'::regclass""" % (x.split('.')[0], x.split('.')[1]) for x in table_list]) |
| |
| |
| def get_heap_tables_set(conn, input_tables_set): |
| logger.debug("getting heap tables...") |
| oid_str = get_oid_str(input_tables_set) |
| dirty_tables = set() |
| qresult = run_sql(conn, GET_REQUESTED_NON_AO_TABLES_SQL % oid_str) |
| for row in qresult: |
| dirty_tables.add('.'.join(row)) |
| return dirty_tables |
| |
| def get_lastest_analyze_timestamp(master_datadir, statefile_dir, dbname): |
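| """ |
| Scan the state file directories (newest first) and return the timestamp of the most recent |
| analyzedb run that left a report file behind, or None if no such run exists. |
| """ |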
| analyze_dirs = get_analyze_dirs(master_datadir, statefile_dir, dbname) |
| |
| for analyze_dir in analyze_dirs: |
| files = sorted(os.listdir(analyze_dir)) |
| |
| if len(files) == 0: |
| logger.warn('Analyze state file directory %s is empty. Ignoring this directory...' % analyze_dir) |
| continue |
| |
| analyze_report_files = fnmatch.filter(files, 'analyze_[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]_report') |
| |
| if len(analyze_report_files) == 0: |
| logger.warn('No analyze report files found in analyze directory %s. Ignoring this directory...' % analyze_dir) |
| continue |
| for report_file in analyze_report_files: |
| return report_file.split('_')[1] |
| |
| return None |
| |
| def get_prev_ao_state(timestamp, master_datadir, analyze_dir, dbname): |
| logger.debug("getting previous ao state...") |
| prev_state_filename = generate_statefile_name('ao', master_datadir, analyze_dir, dbname, timestamp) |
| if not os.path.isfile(prev_state_filename): |
| return [] |
| return get_lines_from_file(prev_state_filename) |
| |
| def get_prev_last_op(timestamp, master_datadir, analyze_dir, dbname): |
| logger.debug("getting previous last operation...") |
| old_pgstatoperations_file = generate_statefile_name('lastop', master_datadir, analyze_dir, dbname, timestamp) |
| if not os.path.isfile(old_pgstatoperations_file): |
| old_pgstatoperations = [] |
| else: |
| old_pgstatoperations = get_lines_from_file(old_pgstatoperations_file) |
| return old_pgstatoperations |
| |
| def get_prev_col_state(timestamp, master_datadir, analyze_dir, dbname): |
| logger.debug("getting previous column states...") |
| prev_col_state_file = generate_statefile_name('col', master_datadir, analyze_dir, dbname, timestamp) |
| if not os.path.isfile(prev_col_state_file): |
| return {} |
| lines = get_lines_from_file(prev_col_state_file) |
| prev_col_dict = {} |
| for line in lines: |
| toks = line.strip().split(',') |
| toks = map(str.strip, toks) |
| prev_col_dict['.'.join(toks[:2])] = set(toks[2:]) |
| |
| return prev_col_dict |
| |
| def create_ao_state_dict(ao_state_entries): |
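| """ |
| Turn 'schema,table,modcount' state file entries into a dict keyed by 'schema.table'. |
| """ |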
| ao_state_dict = dict() |
| for entry in ao_state_entries: |
| fields = entry.split(',') |
| if len(fields) != 3: |
| raise Exception('Invalid ao state file format %s' % entry) |
| key = '%s.%s' % (fields[0].strip(), fields[1].strip()) |
| ao_state_dict[key] = fields[2].strip() |
| |
| return ao_state_dict |
| |
| def create_last_op_dict(last_op_entries): |
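| """ |
| Turn 'schema,table,oid,action,subtype,timestamp' entries into a nested dict of the form |
| {'schema.table': {'CREATE': <entry>, 'ALTER': <entry>, ...}}. |
| """ |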
| last_op_dict = {} |
| for entry in last_op_entries: |
| toks = entry.split(',') |
| if len(toks) != 6: |
| raise Exception('Wrong number of tokens in last_operation data for last backup: "%s"' % entry) |
| key = '%s.%s' % (toks[0],toks[1]) |
| op = toks[3] |
| if key not in last_op_dict: |
| last_op_dict[key] = {op:entry} |
| else: |
| last_op_dict[key][op] = entry |
| |
| return last_op_dict |
| |
| def construct_entries_from_dict_aostate(ao_state_dict): |
| ret = [] |
| for key, item in ao_state_dict.iteritems(): |
| token = ','.join(key.split('.')) |
| ret.append(','.join([token, item])) |
| return ret |
| |
| def construct_entries_from_dict_lastop(last_op_dict): |
| ret = [] |
| for value in last_op_dict.itervalues(): |
| for entry in value.itervalues(): |
| ret.append(entry) |
| return ret |
| |
| def construct_entries_from_dict_colstate(prev_col_dict): |
| ret = [] |
| for table, col_set in prev_col_dict.iteritems(): |
| cols = ','.join(col_set) |
| ret.append(','.join([','.join(table.split('.')), cols])) # public,foo,a,b,c |
| return ret |
| |
| def generate_statefile_name(type_str, master_data_dir, analyze_dir, dbname, timestamp): |
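| """ |
| Build the full path of an 'ao', 'lastop', 'col' or 'report' state file for the given |
| database and run timestamp. |
| """ |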
| use_dir = "%s/%s/%s/%s" % (master_data_dir, analyze_dir, dbname, timestamp) |
| if type_str == 'lastop': |
| ret_str = "%s/analyze_%s_last_operation" |
| elif type_str == 'ao': |
| ret_str = "%s/analyze_%s_ao_state_file" |
| elif type_str == 'col': |
| ret_str = "%s/analyze_%s_col_state_file" |
| elif type_str == 'report': |
| ret_str = "%s/analyze_%s_report" |
| else: |
| raise Exception("Invalid type string for generating state file name") |
| return ret_str % (use_dir, timestamp) |
| |
| def get_analyze_dirs(master_datadir, statefile_dir, dbname): |
| analyze_path = os.path.join(master_datadir, statefile_dir, dbname) |
| |
| if not os.path.isdir(analyze_path): |
| return [] |
| |
| initial_list = os.listdir(analyze_path) |
| initial_list = fnmatch.filter(initial_list, '[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]') |
| |
| dirnames = [] |
| for d in initial_list: |
| pth = os.path.join(analyze_path, d) |
| if os.path.isdir(pth): |
| dirnames.append(pth) |
| |
| if len(dirnames) == 0: |
| return [] |
| dirnames = sorted(dirnames, key=lambda x: int(os.path.basename(x)), reverse=True) |
| return dirnames |
| |
| def validate_dir(path): |
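| """ |
| Ensure that the state file directory exists and is writable, creating it if necessary. |
| """ |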
| exists = CheckDir(path).run() |
| if exists: |
| logger.info("Directory %s exists" % path) |
| else: |
| try: |
| MakeDir(path).run() |
| except OSError, e: |
| logger.exception("Could not create directory %s" % path) |
| raise AnalyzeDirCreateFailed() |
| else: |
| logger.info("Created %s" % path) |
| try: |
| with tempfile.TemporaryFile(dir=path) as f: |
| pass |
| except Exception, e: |
| logger.exception("Cannot write to %s" % path) |
| raise AnalyzeDirNotWritable() |
| |
| def validate_include_tables(conn, table_list, include_file): |
| """ |
| table_list needs to be a list of 'schema.table's |
| include_file lines may also contain column options (-i or -x); only the table names are validated here |
| """ |
| tables = {} |
| if table_list is not None: |
| for tbl in table_list: |
| if '.' not in tbl: |
| raise ExceptionNoStackTraceNeeded("No schema name supplied for table %s" % tbl) |
| tables[tbl] = 0 |
| |
| if include_file is not None: |
| in_file = open(include_file, 'rU') |
| line_no = 1 |
| for line in in_file: |
| toks = line.strip().split() |
| if len(toks) == 0: # empty line |
| continue |
| if len(toks) > 3: # we are expecting <schema>.<table> --in(ex)clude_column <col1>,<col2>,... |
| raise ExceptionNoStackTraceNeeded("Wrong input arguments in line %d of config file. Please check usage." % line_no) |
| if '.' not in toks[0]: |
| raise ExceptionNoStackTraceNeeded("No schema name supplied for table %s" % toks[0]) |
| if toks[0] in tables: |
| raise ExceptionNoStackTraceNeeded("Duplicate table name in line %d of config file." % line_no) |
| tables[toks[0]] = 0 |
| line_no += 1 |
| |
| # since the number of target entries cannot be greater than 1664 in GPDB/HAWQ, |
| # we validate the tables in batches of 1500 |
| tablenames = tables.keys() |
| batch_size = 1500 |
| nbatches = (len(tables)-1)/batch_size + 1 |
| curr_batch = 0 |
| |
| while curr_batch < nbatches: |
| oid_str = get_oid_str(tablenames[curr_batch*batch_size:(curr_batch+1)*batch_size]) |
| run_sql(conn, VALIDATE_TABLE_NAMES_SQL % oid_str) |
| curr_batch += 1 |
| |
| def get_include_cols_from_exclude(conn, table, exclude_cols): |
| """ |
| Given a list of excluded columns of a table, get the list of included columns |
| """ |
| quoted_exclude_cols = ','.join(["'%s'" % x for x in exclude_cols]) |
| cols = run_sql(conn, GET_INCLUDED_COLUMNS_FROM_EXCLUDE_SQL % (quote_tbl(table), quoted_exclude_cols)) |
| |
| return set([x[0] for x in cols]) |
| |
| def validate_columns(conn, table, column_list): |
| """ |
| Check whether all column names in a list are valid for a table |
| """ |
| if len(column_list) == 0: |
| return |
| valid_col_count = dbconn.execSQLForSingleton(conn, VALIDATE_COLUMN_NAMES_SQL % (quote_tbl(table), ','.join(["'%s'" % x for x in column_list]))) |
| |
| if int(valid_col_count) != len(column_list): |
| raise Exception("Invalid input columns for table %s." % table) |
| |
| def create_parser(): |
| parser = OptionParser(version='%prog version 1.0', |
| description= |
| "Analyze a database incrementally. 'Incremental' means that if a table or partition has not been modified by " |
| "DML or DDL commands since the last analyzedb run, it will be automatically skipped because its statistics " |
| "must be up to date. Some restrictions apply: " |
| "1. The incremental semantics only apply to append-only tables or partitions. All heap tables are regarded " |
| "as having stale stats every time analyzedb is run, because we use AO metadata to check for DML or " |
| "DDL events, and that metadata is not available for heap tables. " |
| "2. Views, indices and external tables are automatically skipped. " |
| "3. Table names or schema names containing a comma or period are not supported yet." |
| ) |
| parser.set_usage('%prog [options] ') |
| parser.remove_option('-h') |
| |
| parser.add_option('-d', dest='dbname', metavar="<database name>", |
| help="Database name. Required.") |
| parser.add_option('-s', dest='schema', metavar="<schema name>", |
| help="Specify a schema to analyze. All tables in the schema will be analyzed.") |
| parser.add_option('-t', dest='single_table', metavar="<schema name>.<table name>", |
| help="Analyze a single table. Table name needs to be qualified with schema name.") |
| parser.add_option('-i', type='string', dest='include_cols', metavar="<column1>,<column2>,...", |
| help="Columns to be analyzed, separated by commas. If not specified, all columns will be analyzed. Only valid together with -t.") |
| parser.add_option('-x', type='string', dest='exclude_cols', metavar="<column1>,<column2>,...", |
| help="Columns to be excluded from analyze, separated by commas. If not specified, all columns will be analyzed. Only valid together with -t.") |
| parser.add_option('-f', '--file', dest='config_file', metavar="<config_file>", |
| help="Config file that includes a list of tables to be analyzed. " |
| "Table names must be qualified with schema name. Optionally a list of columns (separated by " |
| "comma) can be specified using -i or -x.") |
| parser.add_option('-l', '--list', action='store_true', dest='dry_run', default=False, |
| help="List the tables to be analyzed without actually running analyze (dry run).") |
| parser.add_option('-p', type='int', dest='parallel_level', default=5, metavar="<parallel level>", |
| help="Parallel level, i.e. the number of tables to be analyzed in parallel. Valid numbers are between 1 and 10. Default value is 5.") |
| parser.add_option('--full', action='store_true', dest='full_analyze', default=False, |
| help="Analyze without using incremental. All tables requested by the user will be analyzed.") |
| parser.add_option('--clean_last', action='store_true', dest='clean_last', default=False, |
| help="Clean the state files generated by last analyzedb run. All other options except -d will be ignored.") |
| parser.add_option('--clean_all', action='store_true', dest='clean_all', default=False, |
| help="Clean all the state files generated by analyzedb. All other options except -d will be ignored.") |
| parser.add_option('-h', '-?', '--help', action='help', |
| help='Show this help message and exit.') |
| parser.add_option('-v', '--verbose', action='store_true', dest='verbose', help='Print debug messages.') |
| parser.add_option('-a', action='store_true', dest='silent', default=False, |
| help="Quiet mode. Do not prompt for user confirmation.") |
| |
| return parser |
| |
| class AnalyzeDirCreateFailed(Exception): pass |
| class AnalyzeDirNotWritable(Exception): pass |
| |
| class AnalyzeWorkerPool(WorkerPool): |
| """ |
| a custom worker pool for analyze workers |
| """ |
| def __init__(self, numWorkers=5, items=None): |
| self.workers=[] |
| self.work_queue=Queue() |
| self.completed_queue=Queue() |
| self.num_assigned=0 |
| if items is not None: |
| for item in items: |
| self.work_queue.put(item) |
| self.num_assigned += 1 |
| |
| for i in range(0,numWorkers): |
| # use AnalyzeWorker instead of Worker |
| w = AnalyzeWorker("worker%d" % i,self) |
| self.workers.append(w) |
| w.start() |
| self.numWorkers = numWorkers |
| self.logger = logger |
| |
| |
| class AnalyzeWorker(Thread): |
| """ |
| a custom worker thread for Analyze |
| """ |
| pool=None |
| cmd=None |
| name=None |
| logger=None |
| |
| def __init__(self,name,pool,timeout=0.05): |
| self.name=name |
| self.pool=pool |
| self.timeout=timeout |
| self.logger=logger |
| self.stoprequest = threading.Event() |
| Thread.__init__(self) |
| |
| def run(self): |
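| """ |
| Pull ANALYZE commands off the pool's work queue until the queue is empty or haltWork() |
| is requested, logging each command's start, completion and elapsed time. |
| """ |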
| while not self.stoprequest.isSet(): |
| try: |
| self.cmd = self.pool.getNextWorkItem(timeout=self.timeout) |
| |
| if self.cmd is not None: |
| # tablename = self.cmd.cmdStr.split()[-1][:-1] |
| self.logger.info("[%s] started %s" % (self.name, self.cmd.name)) |
| start_time = time.time() |
| self.cmd.run() |
| end_time = time.time() |
| stderr = self.cmd.get_stderr_lines() |
| if len(stderr) > 0: # emit stderr if there is any |
| self.logger.warning('\n'.join(stderr)) |
| if self.cmd.was_successful(): |
| self.logger.info("[%s] finished %s. Elapsed time: %d seconds." % (self.name, self.cmd.name, |
| int(end_time-start_time))) |
| self.pool.addFinishedWorkItem(self.cmd) |
| self.cmd=None |
| |
| except Empty: |
| return |
| |
| except: |
| if self.cmd: |
| self.logger.debug("[%s] finished cmd with exception: %s" % (self.name, self.cmd)) |
| self.pool.addFinishedWorkItem(self.cmd) |
| self.cmd=None |
| raise |
| |
| def haltWork(self): |
| self.stoprequest.set() |
| c = self.cmd |
| if c is not None: |
| c.interrupt() |
| c.cancel() |
| |
| |
| if __name__ == '__main__': |
| sys.argv[0] = EXECNAME |
| simple_main(create_parser, AnalyzeDb) |