#!/usr/bin/env impala-python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# This script is used as the driver to run performance benchmarks.
# It does the following:
# - Parses the user-defined options and validates them.
# - Matches each workload to its set of queries and constructs the required objects.
# - Runs each workload in serial order (a workload is a combination of dataset and scale
#   factor).
# - Pretty-prints the results of each query's execution.
# - Stores the execution details in JSON format.
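#
# Example invocation (host names, ports, and scale factor below are illustrative):
#   ./run-workload.py -w tpch:300gb,tpcds --impalads=host1:21000,host2:21000 \
#     --query_iterations=3 --results_json_file=/tmp/benchmark_results.json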
#
import getpass
import json
import logging
import os
import prettytable
from collections import deque
from copy import deepcopy
from datetime import datetime
from decimal import Decimal
from itertools import groupby
from optparse import OptionParser
from random import shuffle
from sys import exit
from tests.common.test_dimensions import TableFormatInfo
from tests.performance.query import Query, HiveQueryResult, ImpalaQueryResult
from tests.performance.query_executor import QueryExecConfig
from tests.performance.workload_runner import WorkloadRunner
from tests.performance.workload import Workload
from tests.util.plugin_runner import PluginRunner
parser = OptionParser()
parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
default=False, help="If set, outputs all benchmark diagnostics.")
parser.add_option("--exploration_strategy", dest="exploration_strategy", default="core",
help=("The exploration strategy to use for running benchmark: 'core', "
"'pairwise', or 'exhaustive'"))
parser.add_option("-w", "--workloads", dest="workloads", default="tpcds",
help=("The workload(s) and scale factors to run in a comma-separated "
" list format. Optional scale factors for each workload are specified"
" using colons. For example: -w tpcds,tpch:400gb,tpch:1gb. "
"Some valid workloads:'tpch', 'tpcds', ..."))
parser.add_option("--impalads", dest="impalads", default="localhost",
help=("A comma-separated list of impalad instances to run the "
"workload against."))
parser.add_option("--exec_options", dest="exec_options", default=str(),
help="Runquery exec option string.")
parser.add_option("--results_json_file", dest="results_json_file",
default=os.environ['IMPALA_HOME'] + "/benchmark_results.json",
help="The output file where benchmark results are saved")
parser.add_option("-i", "--query_iterations", type="int", dest="query_iterations",
default=1, help="Number of times to run each query within a workload")
parser.add_option("-x", "--workload_iterations", type="int", dest="workload_iterations",
default=1, help="Number of times to run each workload.")
parser.add_option("--num_clients", type="int", dest="num_clients", default=1,
help="Number of clients (threads) to use when executing each query.")
parser.add_option("--query_names", dest="query_names", default=str(),
help="A comma-separated list of regular expressions. A query is"
" executed if it matches any of the expressions.")
parser.add_option("--table_formats", dest="table_formats", default=str(),
help=("Override the default test vectors and run using only the"
" specified table formats. Ex. --table_formats=seq/snap/block"
",text/none"))
parser.add_option("--shuffle_query_exec_order", dest="shuffle_queries",
action="store_true", default=False, help=("Randomizes the order "
"of query execution. Useful when the execution scope is a workload"))
parser.add_option("--plan_first", dest="plan_first", action="store_true", default=False,
help=("Runs EXPLAIN before running the query so that metadata loading"
" is excluded from the timing"))
parser.add_option("--use_kerberos", dest="use_kerberos", action="store_true",
default=False, help="If set, enables talking to a kerberized impalad")
parser.add_option("--continue_on_query_error", dest="continue_on_query_error",
action="store_true", default=False,
help="If set, continue execution on each query error.")
parser.add_option("-c", "--client_type", dest="client_type", default='beeswax',
choices=['beeswax', 'jdbc', 'hs2'],
help="Client type. Valid options are 'beeswax' or 'jdbc' or 'hs2'")
parser.add_option("--plugin_names", dest="plugin_names", default=None,
help=("Set of comma-separated plugin names with scope; Plugins are"
" specified as <plugin_name>[:<scope>]. If no scope if specified,"
" it defaults to Query. Plugin names are case sensitive"))
parser.add_option("--exec_engine", dest="exec_engine", default="impala",
choices=['impala', 'hive'],
help=("Which SQL engine to use - impala, hive are valid options"))
parser.add_option("--hiveserver", dest="hiveserver", default="localhost",
help=("Host that has HiveServers2 service running"))
parser.add_option("--user", dest="user", default=getpass.getuser(),
help=("User account under which workload/query will run"))
parser.add_option("--get_password", dest="get_password", default=False,
action="store_true", help=("Prompt for password for user account"))
parser.add_option("--use_ssl", dest="use_ssl", action="store_true", default=False,
help=("Whether to use SSL or not"))
options, args = parser.parse_args()
options.password = None
if options.get_password:
options.password = getpass.getpass()
options.get_password = None
logging.basicConfig(level=logging.INFO, format='[%(name)s]: %(message)s')
LOG = logging.getLogger('run-workload')
class WorkloadConfig(object):
"""Converts the options dict into a class"""
def __init__(self, **config):
self.__dict__.update(config)
class CustomJSONEncoder(json.JSONEncoder):
"""Override the JSONEncoder's default method.
This class is needed for two reasons:
- JSON does have a datetime field. We intercept a datetime object and convert it into
a standard iso string.
- JSON does not know how to serialize object. We intercept the objects and
provide their __dict__ representations
"""
  def default(self, obj):
    if isinstance(obj, Decimal):
      return str(obj)
    if isinstance(obj, datetime):
      # Convert datetime into a standard ISO 8601 string.
      return obj.isoformat()
    elif isinstance(obj, (Query, HiveQueryResult, QueryExecConfig, TableFormatInfo)):
      # Serialize these objects manually by returning their __dict__ attributes.
      return obj.__dict__
    else:
      # Defer to the base class, which raises TypeError for unsupported types.
      return super(CustomJSONEncoder, self).default(obj)
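# Minimal usage sketch for the encoder (output shape is illustrative):
#   json.dumps({"start_time": datetime.now()}, cls=CustomJSONEncoder)
#   returns a string like '{"start_time": "2024-01-01T12:34:56.789012"}'.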
def prettytable_print(results, failed=False):
"""Print a list of query results in prettytable"""
column_names = ['Query', 'Start Time', 'Time Taken (s)', 'Client ID']
if failed: column_names.append('Error')
table = prettytable.PrettyTable(column_names)
table.align = 'l'
table.float_format = '.2'
# Group the results by table format.
for table_format_str, gr in groupby(results, lambda x: x.query.table_format_str):
print "Table Format: %s" % table_format_str
for result in gr:
start_time = result.start_time.strftime("%Y-%m-%d %H:%M:%S") if result.start_time \
is not None else '-'
row = [result.query.name, start_time, result.time_taken, result.client_name]
if failed: row.append(result.query_error)
table.add_row(row)
print table.get_string(sortby='Client ID')
table.clear_rows()
    print
def print_result_summary(results):
"""Print failed and successfull queries for a given result list"""
failed_results = filter(lambda x: x.success == False, results)
successful_results = filter(lambda x: x.success == True, results)
prettytable_print(successful_results)
if failed_results: prettytable_print(failed_results, failed=True)
def get_workload_scale_factor():
"""Extract the workload -> scale factor mapping from the command line
The expected string is "workload_1[:scale_factor_1],...,workload_n[:scale_factor_n]"
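  For example, "tpch:300gb,tpcds" yields ['tpch', '300gb'] and ['tpcds', ''].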
"""
workload_str = options.workloads
workload_tuples = split_and_strip(workload_str)
assert len(workload_tuples) > 0, "At least one workload must be specified"
for workload_tuple in workload_tuples:
# Each member should conform to workload[:scale_factor]
workload_tuple = split_and_strip(workload_tuple, delim=":")
    assert len(workload_tuple) in [1, 2], "Error parsing workload:scale_factor"
if len(workload_tuple) == 1: workload_tuple.append(str())
yield workload_tuple
def split_and_strip(input_string, delim=","):
"""Convert a string into a list using the given delimiter"""
if not input_string: return list()
return map(str.strip, input_string.split(delim))
def create_workload_config():
"""Parse command line inputs.
  Some user inputs need to be transformed from delimited strings to lists in order to be
  consumed by the performance framework. Additionally, plugin_names are converted into
  objects and need to be added to the config.
"""
config = deepcopy(vars(options))
# We don't need workloads and query_names in the config map as they're already specified
# in the workload object.
del config['workloads']
del config['query_names']
config['plugin_runner'] = plugin_runner
# transform a few options from strings to lists
config['table_formats'] = split_and_strip(config['table_formats'])
impalads = split_and_strip(config['impalads'])
# Randomize the order of impalads.
shuffle(impalads)
config['impalads'] = deque(impalads)
return WorkloadConfig(**config)
def _validate_options():
"""Basic validation for some commandline options"""
  # The sasl module must be importable on a secure setup.
  if options.use_kerberos: import sasl
  # If Hive is the exec engine, hs2 is the only supported interface.
if options.exec_engine.lower() == "hive" and options.client_type != "hs2":
raise RuntimeError("The only supported client type for Hive engine is hs2")
# Check for duplicate workload/scale_factor combinations
workloads = split_and_strip(options.workloads)
if not len(set(workloads)) == len(workloads):
raise RuntimeError("Duplicate workload/scale factor combinations are not allowed")
# The list of Impalads must be provided as a comma separated list of either host:port
# combination or just host.
for impalad in split_and_strip(options.impalads):
if len(impalad.split(":")) not in [1,2]:
raise RuntimeError("Impalads must be of the form host:port or host.")
if __name__ == "__main__":
# Check for badly formed user options.
_validate_options()
  # Initialize the PluginRunner.
plugin_runner = None
if options.plugin_names:
plugin_runner = PluginRunner(split_and_strip(options.plugin_names))
  # Initialize workloads.
workload_runners = list()
query_name_filters = split_and_strip(options.query_names)
# Create a workload config object.
for workload_name, scale_factor in get_workload_scale_factor():
config = create_workload_config()
workload = Workload(workload_name, query_name_filters=query_name_filters)
workload_runners.append(WorkloadRunner(workload, scale_factor, config))
# Run all the workloads serially
result_map = dict()
exit_code = 0
for workload_runner in workload_runners:
try:
if plugin_runner: plugin_runner.run_plugins_pre(scope="Workload")
workload_runner.run()
if plugin_runner: plugin_runner.run_plugins_post(scope="Workload")
finally:
key = "%s_%s" % (workload_runner.workload.name, workload_runner.scale_factor)
result_map[key] = workload_runner.results
if not all(result.success for result in workload_runner.results): exit_code = 1
# Print the results
print "\nWorkload: {0}, Scale Factor: {1}\n".format(
workload_runner.workload.name.upper(), workload_runner.scale_factor)
print_result_summary(workload_runner.results)
# Store the results
with open(options.results_json_file, 'w') as f:
json.dump(result_map, f, cls=CustomJSONEncoder)
exit(exit_code)