blob: f982802bb32accefed72f94ef85d45ac437ad3dd [file] [log] [blame]
#!/usr/bin/env impala-python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Utility for computing table statistics of tables in the Hive Metastore
from contextlib import contextmanager
from optparse import OptionParser
import logging
import multiprocessing
import multiprocessing.pool
from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient
def compute_stats_table(client_factory, db, table, continue_on_error):
    """
    Runs 'compute stats' on a single table using a freshly created client.

    client_factory: zero-argument callable returning a context manager that
        yields a client object exposing execute(statement).
    db, table: names combined into the "<db>.<table>" target of the statement.
    continue_on_error: if True, an exception raised while computing statistics
        is logged and swallowed; if False it is logged and re-raised.

    Only Exception subclasses are ever swallowed. KeyboardInterrupt and
    SystemExit always propagate so the run can still be interrupted.
    """
    with client_factory() as impala_client:
        db_table = "%s.%s" % (db, table)
        statement = "compute stats %s" % (db_table,)
        logging.info('Executing: %s', statement)
        try:
            result = impala_client.execute(statement)
            logging.info(" %s -> %s", db_table, ' '.join(result.data).strip())
        except Exception:
            # Log with traceback in either mode; only the re-raise is optional.
            logging.exception(' Failed on table %s', db_table)
            if not continue_on_error:
                raise
def compute_stats(client_factory, db_names=None, table_names=None,
                  continue_on_error=False, parallelism=multiprocessing.cpu_count()):
    """
    Runs COMPUTE STATS over the selected tables. The target tables can be filtered by
    specifying a list of databases and/or table names. If no filters are specified this
    will run COMPUTE STATS on all tables in all databases.

    client_factory: zero-argument callable returning a context manager that yields a
        client exposing execute(statement); one client is used for enumeration and a
        separate one is created for every table.
    db_names, table_names: optional iterables of names. Names are matched exactly but
        case-insensitively, because the names reported by the server are lower-cased
        before comparison.
    continue_on_error: passed through to compute_stats_table.
    parallelism: size of the thread pool to which compute_stats_table calls are sent.

    The first exception raised by a task (when continue_on_error is False) is re-raised
    here; work still queued at that point is discarded.
    """
    logging.info("Enumerating databases and tables for compute stats.")
    # Server-side names are lower-cased below, so normalize the filters the same way;
    # otherwise a mixed-case filter would silently match nothing.
    db_filter = None if db_names is None else set(n.lower() for n in db_names)
    table_filter = None if table_names is None else set(n.lower() for n in table_names)

    pool = multiprocessing.pool.ThreadPool(processes=parallelism)
    try:
        futures = []
        with client_factory() as impala_client:
            all_dbs = set(name.split('\t')[0].lower() for name
                          in impala_client.execute("show databases").data)
            target_dbs = all_dbs if db_filter is None else all_dbs & db_filter
            for db in target_dbs:
                all_tables = set(
                    t.lower() for t
                    in impala_client.execute("show tables in %s" % db).data)
                if table_filter is None:
                    target_tables = all_tables
                else:
                    target_tables = all_tables & table_filter
                for table in target_tables:
                    # Submit command to threadpool
                    futures.append(pool.apply_async(
                        compute_stats_table,
                        (client_factory, db, table, continue_on_error)))
        # Wait for all stats commands to finish; get() re-raises a task's exception.
        for f in futures:
            f.get()
    finally:
        # Always release the pool's threads. After a clean run the queue is already
        # empty; after a failure this drops work that has not started yet.
        pool.terminate()
# Command-line entry point: parse the options, build a client factory bound to the
# requested impalad and run compute_stats over the selected databases/tables.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(threadName)s: %(message)s')
    parser = OptionParser()
    # --continue_on_error and --stop_on_error share one destination; continuing is
    # the default, so only --stop_on_error changes behavior.
    parser.add_option("--continue_on_error", dest="continue_on_error",
                      action="store_true", default=True,
                      help="If True, continue "
                      "if there is an error executing the compute stats statement.")
    parser.add_option("--stop_on_error", dest="continue_on_error",
                      action="store_false", default=True,
                      help="If True, stop "
                      "if there is an error executing the compute stats statement.")
    parser.add_option("--impalad", dest="impalad", default="localhost:21000",
                      help="Impala daemon <host:port> to connect to.")
    parser.add_option("--use_kerberos", action="store_true", default=False,
                      help="Compute stats on a kerberized cluster.")
    parser.add_option("--use_ssl", action="store_true", default=False,
                      help="Compute stats on a cluster with SSL enabled.")
    parser.add_option("--parallelism", type=int, default=multiprocessing.cpu_count(),
                      help="Number of parallel compute stats commands.")
    parser.add_option("--db_names", dest="db_names", default=None,
                      help="Comma-separated list of database names for which to "
                      "compute stats. Can be used in conjunction with the "
                      "--table_names flag. If not specified, compute stats will run "
                      "on tables from all databases.")
    # The filter is an exact set intersection on lower-cased names, so the help text
    # must not promise substring matching.
    parser.add_option("--table_names", dest="table_names", default=None,
                      help="Comma-separated list of table names to compute stats "
                      "over. Names are matched exactly, ignoring case. If no tables "
                      "are specified stats are computed across all tables.")
    options, args = parser.parse_args()

    table_names = None
    if options.table_names is not None:
        table_names = [name.lower().strip() for name in options.table_names.split(',')]
    db_names = None
    if options.db_names is not None:
        db_names = [name.lower().strip() for name in options.db_names.split(',')]

    @contextmanager
    def client_factory():
        """Yields a connected ImpalaBeeswaxClient and always closes it afterwards."""
        impala_client = ImpalaBeeswaxClient(
            options.impalad, use_kerberos=options.use_kerberos,
            use_ssl=options.use_ssl)
        impala_client.connect()
        try:
            yield impala_client
        finally:
            # Without the finally, an exception in the with-body would skip the close
            # and leak the connection.
            impala_client.close_connection()

    compute_stats(client_factory, db_names=db_names, table_names=table_names,
                  continue_on_error=options.continue_on_error,
                  parallelism=options.parallelism)