| #!/usr/bin/env impala-python |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| # This script provides help with parsing and reporting of perf results. It currently |
| # provides three main capabilities: |
# 1) Printing perf results to the console in 'pretty' format
# 2) Comparing two perf result sets and displaying the comparison on the console
# 3) Outputting the perf results in JUnit format, which is useful for plugging into
#    Jenkins perf reporting.
| |
# In Python 2, dividing an int by another int (5 / 2) truncates the result to an int (2).
# The following import enables true division, so a float (2.5) is returned when
# necessary.
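# For example, with true division 7 / 2 evaluates to 3.5, while floor division is still
# available as 7 // 2 == 3.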
| |
| from __future__ import division |
| import difflib |
| import json |
| import logging |
| import os |
| import prettytable |
| import re |
| from collections import defaultdict |
| from datetime import date, datetime |
| from optparse import OptionParser |
| from tests.util.calculation_util import ( |
| calculate_tval, calculate_avg, calculate_stddev, calculate_geomean, calculate_mwu) |
| |
| LOG = logging.getLogger(__name__) |
| |
| # String constants |
| AVG = 'avg' |
| AVG_TIME = 'avg_time' |
| BASELINE_AVG = 'baseline_avg' |
| BASELINE_MAX = 'baseline_max' |
| CLIENT_NAME = 'client_name' |
| COMPRESSION_CODEC = 'compression_codec' |
| COMPRESSION_TYPE = 'compression_type' |
| DELTA_AVG = 'delta_avg' |
| DELTA_MAX = 'delta_max' |
| DELTA_RSTD = 'delta_rstd' |
| DETAIL = 'detail' |
| EST_NUM_ROWS = 'est_num_rows' |
| EST_PEAK_MEM = 'est_peak_mem' |
| EXECUTOR_NAME = 'executor_name' |
| EXEC_SUMMARY = 'exec_summary' |
| FILE_FORMAT = 'file_format' |
| GEOMEAN = 'geomean' |
| ITERATIONS = 'iterations' |
| MAX_TIME = 'max_time' |
| NAME = 'name' |
| NUM_CLIENTS = 'num_clients' |
| NUM_HOSTS = 'num_hosts' |
| NUM_INSTANCES = 'num_instances' |
| NUM_ROWS = 'num_rows' |
| OPERATOR = 'operator' |
| PEAK_MEM = 'peak_mem' |
| PERCENT_OF_QUERY = 'percent_of_query' |
| PREFIX = 'prefix' |
| QUERY = 'query' |
| QUERY_STR = 'query_str' |
| REF_RSTD = 'ref_rstd' |
| RESULT_LIST = 'result_list' |
| RSTD = 'rstd' |
| RUNTIME_PROFILE = 'runtime_profile' |
| SCALE_FACTOR = 'scale_factor' |
| SORTED = 'sorted' |
| STDDEV = 'stddev' |
| STDDEV_TIME = 'stddev_time' |
| TEST_VECTOR = 'test_vector' |
| TIME_TAKEN = 'time_taken' |
| WORKLOAD_NAME = 'workload_name' |
| |
| parser = OptionParser() |
| parser.add_option("--input_result_file", dest="result_file", |
| default=os.environ['IMPALA_HOME'] + '/benchmark_results.json', |
| help="The input JSON file with benchmark results") |
| parser.add_option("--hive_results", dest="hive_results", action="store_true", |
                  help="Process results generated from queries run against Hive")
| parser.add_option("--reference_result_file", dest="reference_result_file", |
| default=os.environ['IMPALA_HOME'] + '/reference_benchmark_results.json', |
| help="The input JSON file with reference benchmark results") |
| parser.add_option("--junit_output_file", dest="junit_output_file", default='', |
                  help='If set, outputs results in JUnit format to the specified file')
| parser.add_option("--no_output_table", dest="no_output_table", action="store_true", |
                  default=False, help='If set, suppresses printing the results tables '
                  'to the console')
| parser.add_option("--report_description", dest="report_description", default=None, |
| help='Optional description for the report.') |
| parser.add_option("--cluster_name", dest="cluster_name", default='UNKNOWN', |
| help="Name of the cluster the results are from (ex. Bolt)") |
| parser.add_option("--verbose", "-v", dest="verbose", action="store_true", |
                  default=False, help='Outputs to console with increased verbosity')
| parser.add_option("--output_all_summary_nodes", dest="output_all_summary_nodes", |
| action="store_true", default=False, |
| help='Print all execution summary nodes') |
| parser.add_option("--build_version", dest="build_version", default='UNKNOWN', |
| help="Build/version info about the Impalad instance results are from.") |
| parser.add_option("--lab_run_info", dest="lab_run_info", default='UNKNOWN', |
| help="Information about the lab run (name/id) that published "\ |
| "the results.") |
| parser.add_option("--run_user_name", dest="run_user_name", default='anonymous', |
| help="User name that this run is associated with in the perf database") |
| parser.add_option("--tval_threshold", dest="tval_threshold", default=3.0, |
                  type="float", help="The t-test t-value at which a performance change "\
                  "will be flagged as significant.")
| parser.add_option("--zval_threshold", dest="zval_threshold", default=3.0, type="float", |
| help="The Mann-Whitney Z-value at which a performance change will be " |
                  "flagged as significant.")
| parser.add_option("--min_percent_change_threshold", |
| dest="min_percent_change_threshold", default=5.0, |
| type="float", help="Any performance changes below this threshold" \ |
| " will not be classified as significant. If the user specifies an" \ |
| " empty value, the threshold will be set to 0") |
| parser.add_option("--max_percent_change_threshold", |
| dest="max_percent_change_threshold", default=float("inf"), |
| type="float", help="Any performance changes above this threshold"\ |
| " will be classified as significant. If the user specifies an" \ |
| " empty value, the threshold will be set to positive infinity") |
| parser.add_option("--allowed_latency_diff_secs", |
| dest="allowed_latency_diff_secs", default=0.0, type="float", |
                  help="If specified, only a timing change that differs by more than "
                  "this value will be considered significant.")
| options, args = parser.parse_args() |
| |
| |
| def get_dict_from_json(filename): |
| """Given a JSON file, return a nested dictionary. |
| |
| Everything in this file is based on the nested dictionary data structure. The dictionary |
| is structured as follows: Top level maps to workload. Each workload maps to file_format. |
| Each file_format maps to queries. Each query contains a key "result_list" that maps to a |
  list of ImpalaQueryResult (see query.py) dictionaries. The compute stats method adds
  additional keys such as "avg" or "stddev" here.
| |
  Here's how the keys are structured:
| To get a workload, the key looks like this: |
| (('workload_name', 'tpch'), ('scale_factor', '300gb')) |
  Within each workload, each file format has a key that looks like this:
| (('file_format', 'text'), ('compression_codec', 'zip'), |
| ('compression_type', 'block')) |
| Each Query has a key like this: |
    (('name', 'TPCH_Q10'),)
| |
| This is useful for finding queries in a certain category and computing stats |
| |
| Args: |
| filename (str): path to the JSON file |
| |
| returns: |
| dict: a nested dictionary with grouped queries |
| """ |
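  # Illustrative lookup (hypothetical key values, shown here only for documentation):
  #   workload_key = (('workload_name', 'tpch'), ('scale_factor', '300gb'))
  #   file_format_key = (('file_format', 'text'), ('compression_codec', 'none'),
  #                      ('compression_type', 'none'))
  #   query_key = (('name', 'TPCH_Q10'),)
  #   result_list = grouped[workload_key][file_format_key][query_key][RESULT_LIST]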
| |
| def add_result(query_result): |
| """Add query to the dictionary. |
| |
| Automatically finds the path in the nested dictionary and adds the result to the |
| appropriate list. |
| |
    TODO: This method is hard to reason about and should be streamlined.
| """ |
| |
| def get_key(level_num): |
| """Build a key for a particular nesting level. |
| |
| The key is built by extracting the appropriate values from query_result. |
| """ |
| |
| level = list() |
| # In the outer layer, we group by workload name and scale factor |
| level.append([('query', 'workload_name'), ('query', 'scale_factor')]) |
| # In the middle layer, we group by file format and compression type |
| level.append([('query', 'test_vector', 'file_format'), |
| ('query', 'test_vector', 'compression_codec'), |
| ('query', 'test_vector', 'compression_type')]) |
| # In the bottom layer, we group by query name |
| level.append([('query', 'name')]) |
| |
| key = [] |
| |
| def get_nested_val(path): |
        """Given a path to a variable in query_result, extract the value.

        For example, to extract compression_type from the query_result, we need to
        follow this path in the nested dictionary:
| "query_result" -> "query" -> "test_vector" -> "compression_type" |
| """ |
| cur = query_result |
| for step in path: |
| cur = cur[step] |
| return cur |
| |
| for path in level[level_num]: |
| key.append((path[-1], get_nested_val(path))) |
| |
| return tuple(key) |
| |
| # grouped is the nested dictionary defined in the outer function get_dict_from_json. |
| # It stores all the results grouped by query name and other parameters. |
| cur = grouped |
| # range(3) because there are 3 levels of nesting, as defined in get_key |
| for level_num in range(3): |
| cur = cur[get_key(level_num)] |
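    # After indexing through all three levels, cur is the innermost defaultdict(list),
    # so the raw query_result dict is appended under the 'result_list' key.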
| cur[RESULT_LIST].append(query_result) |
| |
| with open(filename, "r") as f: |
| data = json.load(f) |
    grouped = defaultdict(lambda: defaultdict(
        lambda: defaultdict(lambda: defaultdict(list))))
| for workload_name, workload in data.items(): |
| for query_result in workload: |
| if query_result['success']: |
| add_result(query_result) |
| calculate_time_stats(grouped) |
| return grouped |
| |
| def all_query_results(grouped): |
| for workload_scale, workload in grouped.items(): |
| for file_format, queries in workload.items(): |
| for query_name, results in queries.items(): |
        yield results
| |
| |
| def get_commit_date(commit_sha): |
| import urllib2 |
| |
| url = 'https://api.github.com/repos/apache/impala/commits/' + commit_sha |
| try: |
| request = urllib2.Request(url) |
| response = urllib2.urlopen(request).read() |
| data = json.loads(response.decode('utf8')) |
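    # The GitHub commits API returns the commit timestamp in ISO-8601 form under
    # commit -> committer -> date; only the leading YYYY-MM-DD portion is kept.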
| return data['commit']['committer']['date'][:10] |
  except Exception:
| return '' |
| |
| def get_impala_version(grouped): |
| """Figure out Impala version by looking at query profile.""" |
| first_result = all_query_results(grouped).next() |
| profile = first_result['result_list'][0]['runtime_profile'] |
  match = re.search(r'Impala Version:\s(.*)\s\(build\s(.*)\)', profile)
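  # The profile line matched above looks like this (illustrative example):
  #   Impala Version: impalad version 4.0.0-RELEASE (build <commit sha>)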
| version = match.group(1) |
| commit_sha = match.group(2) |
| commit_date = get_commit_date(commit_sha) |
| return '{0} ({1})'.format(version, commit_date) |
| |
| def calculate_time_stats(grouped): |
| """ |
| Add statistics to the nested dictionary. |
| |
| Each query name is supplemented with the average, standard deviation, number of clients, |
| iterations, and a sorted list of the time taken to complete each run. |
| """ |
| |
| def remove_first_run(result_list): |
| """We want to remove the first result because the performance is much worse on the |
| first run. |
| """ |
| if len(result_list) > 1: |
      # We want to remove the first result only if there is more than one result
| result_list.remove(min(result_list, key=lambda result: result['start_time'])) |
| |
| for workload_scale, workload in grouped.items(): |
| for file_format, queries in workload.items(): |
| for query_name, results in queries.items(): |
| result_list = results[RESULT_LIST] |
| remove_first_run(result_list) |
| avg = calculate_avg( |
| [query_results[TIME_TAKEN] for query_results in result_list]) |
| dev = calculate_stddev( |
| [query_results[TIME_TAKEN] for query_results in result_list]) |
| num_clients = max( |
| int(query_results[CLIENT_NAME]) for query_results in result_list) |
| |
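        # len(result_list) + 1 adds back the first run dropped by remove_first_run(),
        # so this recovers the per-client iteration count.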
| iterations = int((len(result_list) + 1) / num_clients) |
| results[AVG] = avg |
| results[STDDEV] = dev |
| results[NUM_CLIENTS] = num_clients |
| results[ITERATIONS] = iterations |
| results[SORTED] = [query_results[TIME_TAKEN] for query_results in result_list] |
| results[SORTED].sort() |
| |
| class Report(object): |
| |
| significant_perf_change = False |
| |
| class FileFormatComparisonRow(object): |
| """Represents a row in the overview table, where queries are grouped together and |
| average and geomean are calculated per workload and file format (first table in the |
| report). |
| """ |
| def __init__(self, workload_scale, file_format, queries, ref_queries): |
| |
| time_list = [] |
| ref_time_list = [] |
| for query_name, results in queries.items(): |
| if query_name in ref_queries: |
          # We want to calculate the average and geomean of the query only if it is
          # present in both the current results and the reference results
| for query_results in results[RESULT_LIST]: |
| time_list.append(query_results[TIME_TAKEN]) |
| ref_results = ref_queries[query_name] |
| for ref_query_results in ref_results[RESULT_LIST]: |
| ref_time_list.append(ref_query_results[TIME_TAKEN]) |
| |
| |
| self.workload_name = '{0}({1})'.format( |
| workload_scale[0][1].upper(), workload_scale[1][1]) |
| |
| self.file_format = '{0} / {1} / {2}'.format( |
| file_format[0][1], file_format[1][1], file_format[2][1]) |
| |
| self.avg = calculate_avg(time_list) |
| ref_avg = calculate_avg(ref_time_list) |
| |
| self.delta_avg = calculate_change(self.avg, ref_avg) |
| |
| self.geomean = calculate_geomean(time_list) |
| ref_geomean = calculate_geomean(ref_time_list) |
| |
| self.delta_geomean = calculate_change(self.geomean, ref_geomean) |
| |
| class QueryComparisonRow(object): |
| """Represents a row in the table where individual queries are shown (second table in |
| the report). |
| """ |
| def __init__(self, results, ref_results): |
| self.workload_name = '{0}({1})'.format( |
| results[RESULT_LIST][0][QUERY][WORKLOAD_NAME].upper(), |
| results[RESULT_LIST][0][QUERY][SCALE_FACTOR]) |
| self.query_name = results[RESULT_LIST][0][QUERY][NAME] |
| self.file_format = '{0} / {1} / {2}'.format( |
| results[RESULT_LIST][0][QUERY][TEST_VECTOR][FILE_FORMAT], |
        results[RESULT_LIST][0][QUERY][TEST_VECTOR][COMPRESSION_CODEC],
| results[RESULT_LIST][0][QUERY][TEST_VECTOR][COMPRESSION_TYPE]) |
| self.avg = results[AVG] |
| self.rsd = results[STDDEV] / self.avg if self.avg > 0 else 0.0 |
      self.significant_variability = self.rsd > 0.1
| self.num_clients = results[NUM_CLIENTS] |
| self.iters = results[ITERATIONS] |
| |
| if ref_results is None: |
| self.perf_change = False |
| self.zval = 0 |
| self.tval = 0 |
        # If reference results are not present, the comparison columns are set to -inf
| self.base_avg = float('-inf') |
| self.base_rsd = float('-inf') |
| self.delta_avg = float('-inf') |
| self.perf_change_str = '' |
| else: |
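        # median_diff is the median of all pairwise (current - reference) time
        # differences, expressed relative to the median current run time.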
| median = results[SORTED][int(len(results[SORTED]) / 2)] |
| all_diffs = [x - y for x in results[SORTED] for y in ref_results[SORTED]] |
| all_diffs.sort() |
| self.median_diff = all_diffs[int(len(all_diffs) / 2)] / median |
| self.perf_change, self.zval, self.tval = ( |
| self.__check_perf_change_significance(results, ref_results)) |
| self.base_avg = ref_results[AVG] |
| self.base_rsd = ref_results[STDDEV] / self.base_avg if self.base_avg > 0 else 0.0 |
| self.delta_avg = calculate_change(self.avg, self.base_avg) |
| if self.perf_change: |
| self.perf_change_str = self.__build_perf_change_str( |
| results, ref_results, self.zval, self.tval) |
| Report.significant_perf_change = True |
| else: |
| self.perf_change_str = '' |
| |
| if not options.hive_results: |
| try: |
| save_runtime_diffs(results, ref_results, self.perf_change, self.zval, self.tval) |
| except Exception as e: |
| LOG.error('Could not generate an html diff: {0}'.format(e)) |
| |
| def __check_perf_change_significance(self, stat, ref_stat): |
| zval = calculate_mwu(stat[SORTED], ref_stat[SORTED]) |
| tval = calculate_tval(stat[AVG], stat[STDDEV], stat[ITERATIONS], |
| ref_stat[AVG], ref_stat[STDDEV], ref_stat[ITERATIONS]) |
| try: |
| percent_difference = abs(ref_stat[AVG] - stat[AVG]) * 100 / ref_stat[AVG] |
| except ZeroDivisionError: |
| percent_difference = 0.0 |
| absolute_difference = abs(ref_stat[AVG] - stat[AVG]) |
| if absolute_difference < options.allowed_latency_diff_secs: |
| return False, zval, tval |
| if percent_difference < options.min_percent_change_threshold: |
| return False, zval, tval |
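      # Flag the change only when either statistical test exceeds its threshold; the
      # absolute and percentage filters above screen out small differences first.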
| return (abs(zval) > options.zval_threshold or abs(tval) > options.tval_threshold, |
| zval, tval) |
| |
| def __build_perf_change_str(self, result, ref_result, zval, tval): |
| """Build a performance change string. |
| |
| For example: |
| Regression: TPCDS-Q52 [parquet/none/none] (1.390s -> 1.982s [+42.59%]) |
| """ |
      perf_change_type = ("(R) Regression" if zval >= 0 and tval >= 0
                          else "(I) Improvement" if zval <= 0 and tval <= 0
                          else "(?) Anomaly")
| query = result[RESULT_LIST][0][QUERY] |
| |
| workload_name = '{0}({1})'.format( |
| query[WORKLOAD_NAME].upper(), |
| query[SCALE_FACTOR]) |
| query_name = query[NAME] |
| file_format = query[TEST_VECTOR][FILE_FORMAT] |
| compression_codec = query[TEST_VECTOR][COMPRESSION_CODEC] |
| compression_type = query[TEST_VECTOR][COMPRESSION_TYPE] |
| |
| template = ("{perf_change_type}: " |
| "{workload_name} {query_name} " |
| "[{file_format} / {compression_codec} / {compression_type}] " |
| "({ref_avg:.2f}s -> {avg:.2f}s [{delta:+.2%}])\n") |
| |
| perf_change_str = template.format( |
| perf_change_type = perf_change_type, |
| workload_name = workload_name, |
| query_name = query_name, |
| file_format = file_format, |
| compression_codec = compression_codec, |
| compression_type = compression_type, |
| ref_avg = ref_result[AVG], |
| avg = result[AVG], |
| delta = calculate_change(result[AVG], ref_result[AVG])) |
| |
| perf_change_str += build_exec_summary_str(result, ref_result) |
| |
| return perf_change_str + '\n' |
| |
| class QueryVariabilityRow(object): |
| """Represents a row in the query variability table. |
| """ |
| def __init__(self, results, ref_results): |
| |
| if ref_results is None: |
| self.base_rel_stddev = float('inf') |
| else: |
        self.base_rel_stddev = ref_results[STDDEV] / ref_results[AVG]\
            if ref_results[AVG] > 0 else 0.0
| |
| self.workload_name = '{0}({1})'.format( |
| results[RESULT_LIST][0][QUERY][WORKLOAD_NAME].upper(), |
| results[RESULT_LIST][0][QUERY][SCALE_FACTOR]) |
| |
| self.query_name = results[RESULT_LIST][0][QUERY][NAME] |
| self.file_format = results[RESULT_LIST][0][QUERY][TEST_VECTOR][FILE_FORMAT] |
| self.compression = results[RESULT_LIST][0][QUERY][TEST_VECTOR][COMPRESSION_CODEC]\ |
| + ' / ' + results[RESULT_LIST][0][QUERY][TEST_VECTOR][COMPRESSION_TYPE] |
| |
| self.rel_stddev = results[STDDEV] / results[AVG] if results[AVG] > 0 else 0.0 |
| self.significant_variability = self.rel_stddev > 0.1 |
| |
| variability_template = ("(V) Significant Variability: " |
| "{workload_name} {query_name} [{file_format} / {compression}] " |
| "({base_rel_stddev:.2%} -> {rel_stddev:.2%})\n") |
| |
| if self.significant_variability and ref_results: |
| #If ref_results do not exist, variability analysis will not be conducted |
| self.variability_str = variability_template.format( |
| workload_name = self.workload_name, |
| query_name = self.query_name, |
| file_format = self.file_format, |
| compression = self.compression, |
| base_rel_stddev = self.base_rel_stddev, |
| rel_stddev = self.rel_stddev) |
| |
| self.exec_summary_str = build_exec_summary_str( |
| results, ref_results, for_variability = True) |
| else: |
| self.variability_str = str() |
| self.exec_summary_str = str() |
| |
| def __str__(self): |
| return self.variability_str + self.exec_summary_str |
| |
| def __init__(self, grouped, ref_grouped): |
| self.grouped = grouped |
| self.ref_grouped = ref_grouped |
| self.query_comparison_rows = [] |
| self.file_format_comparison_rows = [] |
| self.query_variability_rows = [] |
| self.__analyze() |
| |
| def __analyze(self): |
    """Generate the comparison data that is printed later."""
| |
| for workload_scale, workload in self.grouped.items(): |
| for file_format, queries in workload.items(): |
        if self.ref_grouped is not None and workload_scale in self.ref_grouped and\
            file_format in self.ref_grouped[workload_scale]:
| ref_queries = self.ref_grouped[workload_scale][file_format] |
| self.file_format_comparison_rows.append(Report.FileFormatComparisonRow( |
| workload_scale, file_format, queries, ref_queries)) |
| else: |
| #If not present in reference results, set to None |
| ref_queries = None |
| for query_name, results in queries.items(): |
| if self.ref_grouped is not None and workload_scale in self.ref_grouped and\ |
| file_format in self.ref_grouped[workload_scale] and query_name in\ |
| self.ref_grouped[workload_scale][file_format]: |
| ref_results = self.ref_grouped[workload_scale][file_format][query_name] |
| query_comparison_row = Report.QueryComparisonRow(results, ref_results) |
| self.query_comparison_rows.append(query_comparison_row) |
| |
| query_variability_row = Report.QueryVariabilityRow(results, ref_results) |
| self.query_variability_rows.append(query_variability_row) |
| else: |
| #If not present in reference results, set to None |
| ref_results = None |
| |
| def __str__(self): |
| output = str() |
| |
| #per file format analysis overview table |
| table = prettytable.PrettyTable(['Workload', 'File Format', 'Avg (s)', 'Delta(Avg)', |
| 'GeoMean(s)', 'Delta(GeoMean)']) |
| table.float_format = '.2' |
| table.align = 'l' |
| self.file_format_comparison_rows.sort( |
| key = lambda row: row.delta_geomean, reverse = True) |
| for row in self.file_format_comparison_rows: |
| table_row = [ |
| row.workload_name, |
| row.file_format, |
| row.avg, |
| '{0:+.2%}'.format(row.delta_avg), |
| row.geomean, |
| '{0:+.2%}'.format(row.delta_geomean)] |
| table.add_row(table_row) |
| |
| output += str(table) + '\n\n' |
| |
| #main comparison table |
| detailed_performance_change_analysis_str = str() |
| table = prettytable.PrettyTable(['Workload', 'Query', 'File Format', 'Avg(s)', |
| 'Base Avg(s)', 'Delta(Avg)', 'StdDev(%)', |
| 'Base StdDev(%)', 'Iters', 'Median Diff(%)', |
| 'MW Zval', 'Tval']) |
| table.float_format = '.2' |
| table.align = 'l' |
| #Sort table from worst to best regression |
| self.query_comparison_rows.sort(key=lambda row: row.delta_avg + row.median_diff, |
| reverse=True) |
| for row in self.query_comparison_rows: |
| delta_avg_template = ' {0:+.2%}' if not row.perf_change else ( |
| 'R {0:+.2%}' if row.zval >= 0 and row.tval >= 0 else 'I {0:+.2%}' if row.zval <= 0 |
| and row.tval <= 0 else '? {0:+.2%}') |
| |
| table_row = [ |
| row.workload_name, |
| row.query_name, |
| row.file_format, |
| row.avg, |
| row.base_avg if row.base_avg != float('-inf') else 'N/A', |
| ' N/A' if row.delta_avg == float('-inf') else delta_avg_template.format( |
| row.delta_avg), |
| ('* {0:.2%} *' if row.rsd > 0.1 else ' {0:.2%} ').format(row.rsd), |
| ' N/A' if row.base_rsd == float('-inf') else ( |
| '* {0:.2%} *' if row.base_rsd > 0.1 else ' {0:.2%} ').format(row.base_rsd), |
| row.iters, |
| ' N/A' if row.median_diff == float('-inf') else delta_avg_template.format( |
| row.median_diff), |
| row.zval, |
| row.tval] |
| table.add_row(table_row) |
| detailed_performance_change_analysis_str += row.perf_change_str |
| |
| output += str(table) + '\n\n' |
| output += detailed_performance_change_analysis_str |
| |
| variability_analysis_str = str() |
| self.query_variability_rows.sort(key = lambda row: row.rel_stddev, reverse = True) |
| for row in self.query_variability_rows: |
| variability_analysis_str += str(row) |
| |
| output += variability_analysis_str |
| |
| if Report.significant_perf_change: |
| output += 'Significant perf change detected' |
| |
| return output |
| |
| class CombinedExecSummaries(object): |
| """All execution summaries for each query are combined into this object. |
| |
| The overall average time is calculated for each node by averaging the average time |
  from each execution summary. The max time is calculated by taking the max of the
  max times.
| |
| This object can be compared to another one and ExecSummaryComparison can be generated. |
| |
| Args: |
    exec_summaries (list of list of dict): A list of exec summaries (a list of dicts is
      how it is received from the beeswax client).
| |
| Attributes: |
    rows (list of dict): each dict represents a row in the summary table and has the
      following keys:
| prefix (str) |
| operator (str) |
| num_hosts (int) |
| num_instances (int) |
| num_rows (int) |
| est_num_rows (int) |
| detail (str) |
      avg_time (float): average of the average times in all the execution summaries
| stddev_time: standard deviation of times in all the execution summaries |
| max_time: maximum of max times in all the execution summaries |
| peak_mem (int) |
| est_peak_mem (int) |
| """ |
| |
| def __init__(self, exec_summaries): |
| # We want to make sure that all execution summaries have the same structure before |
| # we can combine them. If not, err_str will contain the reason why we can't combine |
| # the exec summaries. |
| ok, err_str = self.__check_exec_summary_schema(exec_summaries) |
| self.error_str = err_str |
| |
| self.rows = [] |
| if ok: |
| self.__build_rows(exec_summaries) |
| |
| def __build_rows(self, exec_summaries): |
| |
| first_exec_summary = exec_summaries[0] |
| |
| for row_num, row in enumerate(first_exec_summary): |
| combined_row = {} |
| # Copy fixed values from the first exec summary |
| for key in [PREFIX, OPERATOR, NUM_HOSTS, NUM_INSTANCES, NUM_ROWS, EST_NUM_ROWS, |
| DETAIL]: |
| combined_row[key] = row[key] |
| |
| avg_times = [exec_summary[row_num][AVG_TIME] for exec_summary in exec_summaries] |
| max_times = [exec_summary[row_num][MAX_TIME] for exec_summary in exec_summaries] |
| peak_mems = [exec_summary[row_num][PEAK_MEM] for exec_summary in exec_summaries] |
| est_peak_mems = [exec_summary[row_num][EST_PEAK_MEM] |
| for exec_summary in exec_summaries] |
| |
| # Set the calculated values |
| combined_row[AVG_TIME] = calculate_avg(avg_times) |
| combined_row[STDDEV_TIME] = calculate_stddev(avg_times) |
| combined_row[MAX_TIME] = max(max_times) |
| combined_row[PEAK_MEM] = max(peak_mems) |
| combined_row[EST_PEAK_MEM] = max(est_peak_mems) |
| self.rows.append(combined_row) |
| |
| def is_same_schema(self, reference): |
| """Check if the reference CombinedExecSummaries summary has the same schema as this |
| one. (For example, the operator names are the same for each node). |
| |
| The purpose of this is to check if it makes sense to combine this object with a |
| reference one to produce ExecSummaryComparison. |
| |
| Args: |
      reference (CombinedExecSummaries): the reference summary to compare against.
| |
| Returns: |
      bool: True if the schemas are similar enough to be compared, False otherwise.
| """ |
| |
| if len(self.rows) != len(reference.rows): return False |
| |
| for row_num, row in enumerate(self.rows): |
| ref_row = reference.rows[row_num] |
| |
| if row[OPERATOR] != ref_row[OPERATOR]: |
| return False |
| |
| return True |
| |
| def __str__(self): |
| if self.error_str: return self.error_str |
| |
| table = prettytable.PrettyTable( |
| ["Operator", |
| "#Hosts", |
       "#Inst",
| "Avg Time", |
| "Std Dev", |
| "Max Time", |
| "#Rows", |
| "Est #Rows"]) |
| table.align = 'l' |
| table.float_format = '.2' |
| |
| for row in self.rows: |
| table_row = [ row[PREFIX] + row[OPERATOR], |
| prettyprint_values(row[NUM_HOSTS]), |
| prettyprint_values(row[NUM_INSTANCES]), |
| prettyprint_time(row[AVG_TIME]), |
| prettyprint_time(row[STDDEV_TIME]), |
| prettyprint_time(row[MAX_TIME]), |
| prettyprint_values(row[NUM_ROWS]), |
| prettyprint_values(row[EST_NUM_ROWS])] |
| table.add_row(table_row) |
| |
| return str(table) |
| |
| @property |
| def total_runtime(self): |
| return sum([row[AVG_TIME] for row in self.rows]) |
| |
| def __check_exec_summary_schema(self, exec_summaries): |
| """Check if all given exec summaries have the same structure. |
| |
    This method is called to check whether a single CombinedExecSummaries can be built
    from the list of exec_summaries. (For example, all exec summaries must have the same
    number of nodes.)
| |
| This method is somewhat similar to is_same_schema. The difference is that |
| is_same_schema() checks if two CombinedExecSummaries have the same structure and this |
| method checks if all exec summaries in the list have the same structure. |
| |
| Args: |
| exec_summaries (list of dict): each dict represents an exec_summary |
| |
| Returns: |
| (bool, str): True if all exec summaries have the same structure, otherwise False |
| followed by a string containing the explanation. |
| """ |
| |
| err = 'Summaries cannot be combined: ' |
| |
| if len(exec_summaries) < 1: |
      return False, err + 'no exec summaries found'
| |
| first_exec_summary = exec_summaries[0] |
| |
| # This check is for Metadata queries which don't have summaries |
| if len(first_exec_summary) < 1: |
| return False, err + 'exec summary contains no nodes' |
| |
| for exec_summary in exec_summaries: |
| if len(exec_summary) != len(first_exec_summary): |
| return False, err + 'different number of nodes in exec summaries' |
| |
| for row_num, row in enumerate(exec_summary): |
| comp_row = first_exec_summary[row_num] |
| if row[OPERATOR] != comp_row[OPERATOR]: |
| return False, err + 'different operator' |
| |
| return True, str() |
| |
| |
| class ExecSummaryComparison(object): |
| """Represents a comparison between two CombinedExecSummaries. |
| |
| Args: |
| combined_summary (CombinedExecSummaries): current summary. |
    ref_combined_summary (CombinedExecSummaries): reference summary.
| |
| Attributes: |
| rows (list of dict): Each dict represents a single row. Each dict has the following |
| keys: |
| prefix (str) |
| operator (str) |
| num_hosts (int) |
| num_instances (int) |
| avg_time (float) |
| stddev_time (float) |
| avg_time_change (float): % change in avg time compared to reference |
| avg_time_change_total (float): % change in avg time compared to total of the query |
| max_time (float) |
| max_time_change (float): % change in max time compared to reference |
| peak_mem (int) |
| peak_mem_change (float): % change compared to reference |
| num_rows (int) |
| est_num_rows (int) |
| est_peak_mem (int) |
| detail (str) |
| combined_summary (CombinedExecSummaries): original combined summary |
| ref_combined_summary (CombinedExecSummaries): original reference combined summary. |
| If the comparison cannot be constructed, these summaries can be printed. |
| |
| Another possible way to implement this is to generate this object when we call |
| CombinedExecSummaries.compare(reference). |
| """ |
| |
| def __init__(self, combined_summary, ref_combined_summary, for_variability = False): |
| |
| # Store the original summaries, in case we can't build a comparison |
| self.combined_summary = combined_summary |
| self.ref_combined_summary = ref_combined_summary |
| |
| # If some error happened during calculations, store it here |
| self.error_str = str() |
| self.for_variability = for_variability |
| |
| self.rows = [] |
| |
| def __build_rows(self): |
| |
| if self.combined_summary.is_same_schema(self.ref_combined_summary): |
| for i, row in enumerate(self.combined_summary.rows): |
| ref_row = self.ref_combined_summary.rows[i] |
| |
| comparison_row = {} |
| for key in [PREFIX, OPERATOR, NUM_HOSTS, NUM_INSTANCES, AVG_TIME, STDDEV_TIME, |
| MAX_TIME, PEAK_MEM, NUM_ROWS, EST_NUM_ROWS, EST_PEAK_MEM, DETAIL]: |
| comparison_row[key] = row[key] |
| |
| comparison_row[PERCENT_OF_QUERY] = row[AVG_TIME] /\ |
| self.combined_summary.total_runtime\ |
| if self.combined_summary.total_runtime > 0 else 0.0 |
| |
| comparison_row[RSTD] = row[STDDEV_TIME] / row[AVG_TIME]\ |
| if row[AVG_TIME] > 0 else 0.0 |
| |
| comparison_row[BASELINE_AVG] = ref_row[AVG_TIME] |
| |
| comparison_row[DELTA_AVG] = calculate_change( |
| row[AVG_TIME], ref_row[AVG_TIME]) |
| |
| comparison_row[BASELINE_MAX] = ref_row[MAX_TIME] |
| |
| comparison_row[DELTA_MAX] = calculate_change( |
| row[MAX_TIME], ref_row[MAX_TIME]) |
| |
| self.rows.append(comparison_row) |
| else: |
| self.error_str = 'Execution summary structures are different' |
| |
| def __str__(self): |
| """Construct a PrettyTable containing the comparison""" |
| if self.for_variability: |
| return str(self.__build_table_variability()) |
| else: |
| return str(self.__build_table()) |
| |
| def __build_rows_variability(self): |
| if self.ref_combined_summary and self.combined_summary.is_same_schema( |
| self.ref_combined_summary): |
| for i, row in enumerate(self.combined_summary.rows): |
| ref_row = self.ref_combined_summary.rows[i] |
| comparison_row = {} |
| comparison_row[OPERATOR] = row[OPERATOR] |
| |
| comparison_row[PERCENT_OF_QUERY] = row[AVG_TIME] /\ |
| self.combined_summary.total_runtime\ |
| if self.combined_summary.total_runtime > 0 else 0.0 |
| |
| comparison_row[RSTD] = row[STDDEV_TIME] / row[AVG_TIME]\ |
| if row[AVG_TIME] > 0 else 0.0 |
| |
| comparison_row[REF_RSTD] = ref_row[STDDEV_TIME] / ref_row[AVG_TIME]\ |
| if ref_row[AVG_TIME] > 0 else 0.0 |
| |
| comparison_row[DELTA_RSTD] = calculate_change( |
| comparison_row[RSTD], comparison_row[REF_RSTD]) |
| |
| comparison_row[NUM_HOSTS] = row[NUM_HOSTS] |
| comparison_row[NUM_INSTANCES] = row[NUM_INSTANCES] |
| comparison_row[NUM_ROWS] = row[NUM_ROWS] |
| comparison_row[EST_NUM_ROWS] = row[EST_NUM_ROWS] |
| |
| self.rows.append(comparison_row) |
| else: |
| self.error_str = 'Execution summary structures are different' |
| |
| def __build_table_variability(self): |
| |
| def is_significant(row): |
      """Check whether the node's variability is significant enough to be shown."""
| return options.output_all_summary_nodes or ( |
| row[RSTD] > 0.1 and row[PERCENT_OF_QUERY] > 0.02) |
| |
| self.__build_rows_variability() |
| |
| if self.error_str: |
| # If the summary comparison could not be constructed, output both summaries |
| output = self.error_str + '\n' |
| output += 'Execution Summary: \n' |
| output += str(self.combined_summary) + '\n' |
| output += 'Reference Execution Summary: \n' |
| output += str(self.ref_combined_summary) |
| return output |
| |
| table = prettytable.PrettyTable( |
| ['Operator', |
| '% of Query', |
| 'StdDev(%)', |
| 'Base StdDev(%)', |
| 'Delta(StdDev(%))', |
| '#Hosts', |
| '#Inst', |
| '#Rows', |
| 'Est #Rows']) |
| table.align = 'l' |
| table.float_format = '.2' |
| table_contains_at_least_one_row = False |
    for row in filter(is_significant, self.rows):
| table_row = [row[OPERATOR], |
| '{0:.2%}'.format(row[PERCENT_OF_QUERY]), |
| '{0:.2%}'.format(row[RSTD]), |
| '{0:.2%}'.format(row[REF_RSTD]), |
| '{0:+.2%}'.format(row[DELTA_RSTD]), |
| prettyprint_values(row[NUM_HOSTS]), |
| prettyprint_values(row[NUM_INSTANCES]), |
| prettyprint_values(row[NUM_ROWS]), |
| prettyprint_values(row[EST_NUM_ROWS]) ] |
| |
| table_contains_at_least_one_row = True |
| table.add_row(table_row) |
| |
| if table_contains_at_least_one_row: |
| return str(table) + '\n' |
| else: |
| return 'No Nodes with significant StdDev %\n' |
| |
| def __build_table(self): |
| |
| def is_significant(row): |
      """Check whether the node is significant enough to be included in the table."""
| return options.output_all_summary_nodes or ( |
| row[MAX_TIME] > 100000000 and |
| row[PERCENT_OF_QUERY] > 0.02) |
| |
| self.__build_rows() |
| if self.error_str: |
| # If the summary comparison could not be constructed, output both summaries |
| output = self.error_str + '\n' |
| output += 'Execution Summary: \n' |
| output += str(self.combined_summary) + '\n' |
| output += 'Reference Execution Summary: \n' |
| output += str(self.ref_combined_summary) |
| return output |
| |
| table = prettytable.PrettyTable( |
| ['Operator', |
| '% of Query', |
| 'Avg', |
| 'Base Avg', |
| 'Delta(Avg)', |
| 'StdDev(%)', |
| 'Max', |
| 'Base Max', |
| 'Delta(Max)', |
| '#Hosts', |
| '#Inst', |
| '#Rows', |
| 'Est #Rows']) |
| table.align = 'l' |
| table.float_format = '.2' |
| |
| for row in self.rows: |
| if is_significant(row): |
| |
| table_row = [row[OPERATOR], |
| '{0:.2%}'.format(row[PERCENT_OF_QUERY]), |
| prettyprint_time(row[AVG_TIME]), |
| prettyprint_time(row[BASELINE_AVG]), |
| prettyprint_percent(row[DELTA_AVG]), |
| ('* {0:.2%} *' if row[RSTD] > 0.1 else ' {0:.2%} ').format(row[RSTD]), |
| prettyprint_time(row[MAX_TIME]), |
| prettyprint_time(row[BASELINE_MAX]), |
| prettyprint_percent(row[DELTA_MAX]), |
| prettyprint_values(row[NUM_HOSTS]), |
| prettyprint_values(row[NUM_INSTANCES]), |
| prettyprint_values(row[NUM_ROWS]), |
| prettyprint_values(row[EST_NUM_ROWS])] |
| |
| table.add_row(table_row) |
| |
| return str(table) |
| |
| def calculate_change(val, ref_val): |
  """Calculate the relative change of val with respect to ref_val."""
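  # For example, calculate_change(12.0, 10.0) == 0.2, i.e. val is 20% higher than ref_val.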
| return (val - ref_val) / ref_val if ref_val != 0 else 0.0 |
| |
| def prettyprint(val, units, divisor): |
  """Format a value in human-readable form along with its unit.
| |
| We start at the leftmost unit in the list and keep dividing the value by divisor until |
| the value is less than divisor. The value is then printed along with the unit type. |
| |
| Args: |
| val (int or float): Value to be printed. |
| units (list of str): Unit names for different sizes. |
| divisor (float): ratio between two consecutive units. |
| """ |
| for unit in units: |
| if abs(val) < divisor: |
| if unit == units[0]: |
| return "%d%s" % (val, unit) |
| else: |
| return "%3.2f%s" % (val, unit) |
| val /= divisor |
| |
| def prettyprint_bytes(byte_val): |
| return prettyprint(byte_val, ['B', 'KB', 'MB', 'GB', 'TB'], 1024.0) |
| |
| def prettyprint_values(unit_val): |
| return prettyprint(unit_val, ["", "K", "M", "B"], 1000.0) |
| |
| def prettyprint_time(time_val): |
| return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0) |
| |
| def prettyprint_percent(percent_val): |
| return '{0:+.2%}'.format(percent_val) |
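# Illustrative outputs of the formatting helpers above:
#   prettyprint_time(1500000.0) -> '1.50ms'
#   prettyprint_values(1234)    -> '1.23K'
#   prettyprint_bytes(2048)     -> '2.00KB'
#   prettyprint_percent(0.05)   -> '+5.00%'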
| |
| |
| def save_runtime_diffs(results, ref_results, change_significant, zval, tval): |
| """Given results and reference results, generate and output an HTML file |
| containing the Runtime Profile diff. |
| """ |
| |
| diff = difflib.HtmlDiff(wrapcolumn=90, linejunk=difflib.IS_LINE_JUNK) |
| |
  # We compare the last result in each run because it should have the most stable
  # performance (unlike the first run).
| runtime_profile = results[RESULT_LIST][-1][RUNTIME_PROFILE] |
| ref_runtime_profile = ref_results[RESULT_LIST][-1][RUNTIME_PROFILE] |
| |
| template = ('{prefix}-{query_name}-{scale_factor}-{file_format}-{compression_codec}' |
| '-{compression_type}-runtime_profile.html') |
| |
| query = results[RESULT_LIST][-1][QUERY] |
| |
| # Neutral - no improvement or regression |
| prefix = 'neu' |
| if change_significant: |
| prefix = '???' |
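    # '???' is kept only when the change is significant but the z-test and t-test
    # disagree on its direction.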
| if zval >= 0 and tval >= 0: |
| prefix = 'reg' |
| elif zval <= 0 and tval <= 0: |
| prefix = 'imp' |
| |
| runtime_profile_file_name = template.format( |
| prefix = prefix, |
| query_name = query[NAME], |
| scale_factor = query[SCALE_FACTOR], |
| file_format = query[TEST_VECTOR][FILE_FORMAT], |
| compression_codec = query[TEST_VECTOR][COMPRESSION_CODEC], |
| compression_type = query[TEST_VECTOR][COMPRESSION_TYPE]) |
| |
| # Go into results dir |
| dir_path = os.path.join(os.environ["IMPALA_HOME"], 'results') |
| |
| if not os.path.exists(dir_path): |
| os.mkdir(dir_path) |
| elif not os.path.isdir(dir_path): |
    raise RuntimeError(
        "Unable to create $IMPALA_HOME/results: a file with that name already exists")
| |
| runtime_profile_file_path = os.path.join(dir_path, runtime_profile_file_name) |
| |
| runtime_profile_diff = diff.make_file( |
| ref_runtime_profile.splitlines(), |
| runtime_profile.splitlines(), |
| fromdesc = "Baseline Runtime Profile", |
| todesc = "Current Runtime Profile") |
| |
| with open(runtime_profile_file_path, 'w+') as f: |
| f.write(runtime_profile_diff) |
| |
| |
| def build_exec_summary_str(results, ref_results, for_variability=False): |
| # There is no summary available for Hive after query execution |
| # Metadata queries don't have execution summary |
| exec_summaries = [result[EXEC_SUMMARY] for result in results[RESULT_LIST]] |
| |
| if options.hive_results or exec_summaries[0] is None: |
| return "" |
| |
| combined_summary = CombinedExecSummaries(exec_summaries) |
| |
| if ref_results is None: |
| ref_exec_summaries = None |
| ref_combined_summary = None |
| else: |
| ref_exec_summaries = [result[EXEC_SUMMARY] |
| for result in ref_results[RESULT_LIST]] |
| ref_combined_summary = CombinedExecSummaries(ref_exec_summaries) |
| |
| comparison = ExecSummaryComparison( |
| combined_summary, ref_combined_summary, for_variability) |
| |
| return str(comparison) + '\n' |
| |
| |
| def build_summary_header(current_impala_version, ref_impala_version): |
| summary = "Report Generated on {0}\n".format(date.today()) |
| if options.report_description: |
| summary += 'Run Description: {0}\n'.format(options.report_description) |
| if options.cluster_name: |
| summary += '\nCluster Name: {0}\n'.format(options.cluster_name) |
| if options.lab_run_info: |
| summary += 'Lab Run Info: {0}\n'.format(options.lab_run_info) |
| if not options.hive_results: |
| summary += 'Impala Version: {0}\n'.format(current_impala_version) |
| summary += 'Baseline Impala Version: {0}\n'.format(ref_impala_version) |
| return summary |
| |
| |
| if __name__ == "__main__": |
  """Workflow:
  1. Build a nested dictionary for the current result JSON and reference result JSON.
  2. Calculate runtime statistics for each query for both results and reference results.
  3. Save performance statistics to the performance database.
  4. Construct a string with an overview of workload runtime and a detailed performance
     comparison for queries with significant performance change.
  """
| |
| logging.basicConfig(level=logging.DEBUG if options.verbose else logging.INFO) |
| |
| # Generate a dictionary based on the JSON file |
| grouped = get_dict_from_json(options.result_file) |
| |
| try: |
| # Generate a dictionary based on the reference JSON file |
| ref_grouped = get_dict_from_json(options.reference_result_file) |
| except Exception as e: |
| # If reference result file could not be read we can still continue. The result can |
| # be saved to the performance database. |
| LOG.error('Could not read reference result file: {0}'.format(e)) |
| ref_grouped = None |
| |
| report = Report(grouped, ref_grouped) |
| |
| ref_impala_version = 'N/A' |
| if options.hive_results: |
| current_impala_version = 'N/A' |
| else: |
| current_impala_version = get_impala_version(grouped) |
| if ref_grouped: |
| ref_impala_version = get_impala_version(ref_grouped) |
| |
| print build_summary_header(current_impala_version, ref_impala_version) |
| print report |