#!/usr/bin/env impala-python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
# This script is used as the driver to run performance benchmarks.
# It does the following:
#   - Parses the user-defined options and validates them.
#   - Matches each workload to its set of queries and constructs the required objects.
#   - Runs each workload in serial order (a workload is a combination of dataset and
#     scale factor).
#   - Pretty-prints the results of each query's execution.
#   - Stores the execution details in JSON format.
#

from __future__ import absolute_import, division, print_function
import getpass
import json
import logging
import os
import prettytable

from collections import deque
from copy import deepcopy
from datetime import datetime
from decimal import Decimal
from itertools import groupby
from optparse import OptionParser
from random import shuffle
from sys import exit

from tests.common.test_dimensions import TableFormatInfo
from tests.performance.query import Query, HiveQueryResult
from tests.performance.query_executor import QueryExecConfig
from tests.performance.workload_runner import WorkloadRunner
from tests.performance.workload import Workload
from tests.util.plugin_runner import PluginRunner

parser = OptionParser()
parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
                  default=False, help="If set, outputs all benchmark diagnostics.")
parser.add_option("--exploration_strategy", dest="exploration_strategy", default="core",
                  help=("The exploration strategy to use for running the benchmark: "
                        "'core', 'pairwise', or 'exhaustive'."))
parser.add_option("-w", "--workloads", dest="workloads", default="tpcds",
                  help=("The workload(s) and scale factors to run, in a comma-separated "
                        "list format. An optional scale factor for each workload is "
                        "specified using a colon. For example: -w tpcds,tpch:400gb,"
                        "tpch:1gb. Some valid workloads: 'tpch', 'tpcds', ..."))
parser.add_option("--impalads", dest="impalads", default="localhost",
                  help=("A comma-separated list of impalad instances to run the "
                        "workload against."))
parser.add_option("--exec_options", dest="exec_options", default=str(),
                  help=("Query exec option string "
                        "(formatted as 'opt1:val1;opt2:val2')."))
parser.add_option("--results_json_file", dest="results_json_file",
                  default=os.environ['IMPALA_HOME'] + "/benchmark_results.json",
                  help="The output file where benchmark results are saved.")
parser.add_option("-i", "--query_iterations", type="int", dest="query_iterations",
                  default=1, help="Number of times to run each query within a workload.")
parser.add_option("-x", "--workload_iterations", type="int", dest="workload_iterations",
                  default=1, help="Number of times to run each workload.")
parser.add_option("--num_clients", type="int", dest="num_clients", default=1,
                  help="Number of clients (threads) to use when executing each query.")
parser.add_option("--query_names", dest="query_names", default=str(),
                  help=("A comma-separated list of regular expressions. A query is "
                        "executed if it matches any of the expressions."))
parser.add_option("--table_formats", dest="table_formats", default=str(),
                  help=("Override the default test vectors and run using only the "
                        "specified table formats. Ex. --table_formats=seq/snap/block,"
                        "text/none"))
parser.add_option("--shuffle_query_exec_order", dest="shuffle_queries",
                  action="store_true", default=False,
                  help=("Randomizes the order of query execution. Useful when the "
                        "execution scope is a workload."))
parser.add_option("--plan_first", dest="plan_first", action="store_true", default=False,
                  help=("Runs EXPLAIN before running the query so that metadata loading "
                        "is excluded from the timing."))

|  | parser.add_option("--use_kerberos", dest="use_kerberos", action="store_true", | 
|  | default=False, help="If set, enables talking to a kerberized impalad") | 
|  | parser.add_option("--continue_on_query_error", dest="continue_on_query_error", | 
|  | action="store_true", default=False, | 
|  | help="If set, continue execution on each query error.") | 
|  | parser.add_option("-c", "--client_type", dest="client_type", default='beeswax', | 
|  | choices=['beeswax', 'jdbc', 'hs2'], | 
|  | help="Client type. Valid options are 'beeswax' or 'jdbc' or 'hs2'") | 
|  | parser.add_option("--plugin_names", dest="plugin_names", default=None, | 
|  | help=("Set of comma-separated plugin names with scope; Plugins are" | 
|  | " specified as <plugin_name>[:<scope>]. If no scope if specified," | 
|  | " it defaults to Query. Plugin names are case sensitive")) | 
|  | parser.add_option("--exec_engine", dest="exec_engine", default="impala", | 
|  | choices=['impala', 'hive'], | 
|  | help=("Which SQL engine to use - impala, hive are valid options")) | 
|  | parser.add_option("--hiveserver", dest="hiveserver", default="localhost", | 
|  | help=("Host that has HiveServers2 service running")) | 
|  | parser.add_option("--user", dest="user", default=getpass.getuser(), | 
|  | help=("User account under which workload/query will run")) | 
|  | parser.add_option("--get_password", dest="get_password", default=False, | 
|  | action="store_true", help=("Prompt for password for user account")) | 
|  | parser.add_option("--use_ssl", dest="use_ssl", action="store_true", default=False, | 
|  | help=("Whether to use SSL or not")) | 
|  |  | 
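# A hypothetical invocation, shown for illustration only (hosts, scale factors, and
# option values are not real):
#   run-workload.py -w tpch:1gb,tpcds --impalads=host1:21000,host2:21000 \
#       --exec_options="mem_limit:2g" -i 3 --query_names="TPCH-Q1.*"
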
options, args = parser.parse_args()

options.password = None
if options.get_password:
  options.password = getpass.getpass()
options.get_password = None

LOG = logging.getLogger('run-workload')


class WorkloadConfig(object):
  """Converts the options dict into a class"""
  def __init__(self, **config):
    self.__dict__.update(config)


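# WorkloadConfig simply exposes the option dict's keys as attributes; e.g.
# WorkloadConfig(num_clients=1).num_clients evaluates to 1 (illustrative value).
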
class CustomJSONEncoder(json.JSONEncoder):
  """Override the JSONEncoder's default method.

  This class is needed for two reasons:
  - JSON does not have a datetime type. We intercept a datetime object and convert it
    into a standard ISO string.
  - JSON does not know how to serialize arbitrary objects. We intercept the objects and
    provide their __dict__ representations.
  """
  def default(self, obj):
    if isinstance(obj, Decimal):
      return str(obj)
    if isinstance(obj, datetime):
      # Convert the datetime into a standard ISO string.
      return obj.isoformat()
    elif isinstance(obj, (Query, HiveQueryResult, QueryExecConfig, TableFormatInfo)):
      # Serialize these objects manually by returning their __dict__ attributes.
      return obj.__dict__
    else:
      return super(CustomJSONEncoder, self).default(obj)


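# A minimal sketch of CustomJSONEncoder's behavior (values are illustrative):
#   json.dumps(Decimal('1.5'), cls=CustomJSONEncoder)       -> '"1.5"'
#   json.dumps(datetime(2020, 1, 1), cls=CustomJSONEncoder) -> '"2020-01-01T00:00:00"'
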
def prettytable_print(results, failed=False):
  """Print a list of query results as a prettytable"""
  column_names = ['Query', 'Start Time', 'Time Taken (s)', 'Client ID']
  if failed: column_names.append('Error')
  table = prettytable.PrettyTable(column_names)
  table.align = 'l'
  table.float_format = '.2'
  # Group the results by table format.
  for table_format_str, gr in groupby(results, lambda x: x.query.table_format_str):
    print("Table Format: %s" % table_format_str)
    for result in gr:
      start_time = (result.start_time.strftime("%Y-%m-%d %H:%M:%S")
                    if result.start_time is not None else '-')
      row = [result.query.name, start_time, result.time_taken, result.client_name]
      if failed: row.append(result.query_error)
      table.add_row(row)
    print(table.get_string(sortby='Client ID'))
    table.clear_rows()
    print()


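# The table printed by prettytable_print() above looks roughly like this (the row shown
# is illustrative, not from a real run):
#   Table Format: text/none
#   +---------+---------------------+----------------+-----------+
#   | Query   | Start Time          | Time Taken (s) | Client ID |
#   +---------+---------------------+----------------+-----------+
#   | TPCH-Q1 | 2020-01-01 00:00:00 | 1.23           | 1         |
#   +---------+---------------------+----------------+-----------+
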
def print_result_summary(results):
  """Print failed and successful queries for a given result list"""
  failed_results = [x for x in results if not x.success]
  successful_results = [x for x in results if x.success]
  prettytable_print(successful_results)
  if failed_results: prettytable_print(failed_results, failed=True)


def get_workload_scale_factor():
  """Extract the workload -> scale factor mapping from the command line.

  The expected string is "workload_1[:scale_factor_1],...,workload_n[:scale_factor_n]"
  """
  workload_str = options.workloads
  workload_tuples = split_and_strip(workload_str)
  assert len(workload_tuples) > 0, "At least one workload must be specified"
  for workload_tuple in workload_tuples:
    # Each member should conform to workload[:scale_factor]
    workload_tuple = split_and_strip(workload_tuple, delim=":")
    assert len(workload_tuple) in [1, 2], "Error parsing workload:scale_factor"
    if len(workload_tuple) == 1: workload_tuple.append(str())
    yield workload_tuple


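# For example, --workloads=tpcds,tpch:400gb makes the generator above yield
# ['tpcds', ''] followed by ['tpch', '400gb'] (values illustrative).
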
def split_and_strip(input_string, delim=","):
  """Convert a string into a list using the given delimiter"""
  if not input_string: return list()
  return list(map(str.strip, input_string.split(delim)))


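# For example, split_and_strip(" tpch , tpcds ") returns ['tpch', 'tpcds'].
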
def create_workload_config():
  """Parse command line inputs.

  Some user inputs need to be transformed from delimited strings to lists in order to
  be consumed by the performance framework. Additionally, plugin_names are converted
  into objects, which need to be added to the config.
  """
  config = deepcopy(vars(options))
  # We don't need workloads and query_names in the config map as they're already
  # specified in the workload object.
  del config['workloads']
  del config['query_names']
  config['plugin_runner'] = plugin_runner
  # Transform a few options from strings to lists.
  config['table_formats'] = split_and_strip(config['table_formats'])
  impalads = split_and_strip(config['impalads'])
  # Randomize the order of impalads.
  shuffle(impalads)
  config['impalads'] = deque(impalads)
  return WorkloadConfig(**config)


def _validate_options():
  """Basic validation for some command line options"""
  # The sasl module must be importable on a secure setup.
  if options.use_kerberos: import sasl

  # If Hive is the exec engine, hs2 is the only supported interface.
  if options.exec_engine.lower() == "hive" and options.client_type != "hs2":
    raise RuntimeError("The only supported client type for the Hive engine is hs2")

  # Check for duplicate workload/scale_factor combinations.
  workloads = split_and_strip(options.workloads)
  if not len(set(workloads)) == len(workloads):
    raise RuntimeError("Duplicate workload/scale factor combinations are not allowed")

  # The list of impalads must be provided as a comma-separated list of either host:port
  # combinations or just hosts.
  for impalad in split_and_strip(options.impalads):
    if len(impalad.split(":")) not in [1, 2]:
      raise RuntimeError("Impalads must be of the form host:port or host.")


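# For example, _validate_options() above accepts --impalads=host1:21000,host2 but
# rejects --impalads=host1:21000:21001, which splits into three colon-separated parts
# (hosts and ports are illustrative).
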
if __name__ == "__main__":
  logging.basicConfig(level=logging.INFO, format='[%(name)s]: %(message)s')
  # Check for badly formed user options.
  _validate_options()

  # Initialize the PluginRunner.
  plugin_runner = None
  if options.plugin_names:
    plugin_runner = PluginRunner(split_and_strip(options.plugin_names))

  # Initialize the workloads.
  workload_runners = list()
  query_name_filters = split_and_strip(options.query_names)
  # Create a workload config object for each workload/scale factor pair.
  for workload_name, scale_factor in get_workload_scale_factor():
    config = create_workload_config()
    workload = Workload(workload_name, query_name_filters=query_name_filters)
    workload_runners.append(WorkloadRunner(workload, scale_factor, config))

  # Run all the workloads serially.
  result_map = dict()
  exit_code = 0
  for workload_runner in workload_runners:
    try:
      if plugin_runner: plugin_runner.run_plugins_pre(scope="Workload")
      workload_runner.run()
      if plugin_runner: plugin_runner.run_plugins_post(scope="Workload")
    finally:
      key = "%s_%s" % (workload_runner.workload.name, workload_runner.scale_factor)
      result_map[key] = workload_runner.results

    if not all(result.success for result in workload_runner.results): exit_code = 1

    # Print the results.
    print("\nWorkload: {0}, Scale Factor: {1}\n".format(
        workload_runner.workload.name.upper(), workload_runner.scale_factor))
    print_result_summary(workload_runner.results)

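  # Keys in result_map have the form "<workload>_<scale factor>", e.g. "tpch_400gb"
  # (example key shown for illustration).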
  # Store the results.
  with open(options.results_json_file, 'w') as f:
    json.dump(result_map, f, cls=CustomJSONEncoder, ensure_ascii=False)

  exit(exit_code)