blob: 6cba3859096efa58cf55e1c6a7a428ed1a25da62 [file] [log] [blame]
#!/usr/bin/env impala-python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Utility functions used by the stress test to save and load runtime info
# about queries to and from JSON files.
from collections import defaultdict
import json
import logging
import os
import sys
from tests.stress.queries import Query
LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
# The version of the file format containing the collected query runtime info.
RUNTIME_INFO_FILE_VERSION = 3
def save_runtime_info(path, query, impala):
"""Updates the file at 'path' with the given query information."""
store = None
if os.path.exists(path):
with open(path) as file:
store = json.load(file)
_check_store_version(store)
if not store:
store = {
"host_names": list(), "db_names": dict(), "version": RUNTIME_INFO_FILE_VERSION}
with open(path, "w+") as file:
store["host_names"] = sorted([i.host_name for i in impala.impalads])
queries = store["db_names"].get(query.db_name, dict())
query_by_options = queries.get(query.sql, dict())
query_by_options[str(sorted(query.options.items()))] = query
queries[query.sql] = query_by_options
store["db_names"][query.db_name] = queries
class JsonEncoder(json.JSONEncoder):
def default(self, obj):
data = dict(obj.__dict__)
# Queries are stored by sql, so remove the duplicate data. Also don't store
# profiles as JSON values, but instead separately.
for k in ("sql", "solo_runtime_profile_with_spilling",
"solo_runtime_profile_without_spilling"):
if k in data:
del data[k]
return data
json.dump(
store, file, cls=JsonEncoder, sort_keys=True, indent=2, separators=(',', ': '))
def load_runtime_info(path, impala=None):
"""Reads the query runtime information at 'path' and returns a
dict<db_name, dict<sql, Query>>. Returns an empty dict if the hosts in the 'impala'
instance do not match the data in 'path'.
"""
queries_by_db_and_sql = defaultdict(lambda: defaultdict(dict))
if not os.path.exists(path):
return queries_by_db_and_sql
with open(path) as file:
store = json.load(file)
_check_store_version(store)
if (
impala and
store.get("host_names") != sorted([i.host_name for i in impala.impalads])
):
return queries_by_db_and_sql
for db_name, queries_by_sql in store["db_names"].iteritems():
for sql, queries_by_options in queries_by_sql.iteritems():
for options, json_query in queries_by_options.iteritems():
query = Query()
query.__dict__.update(json_query)
query.sql = sql
queries_by_db_and_sql[db_name][sql][options] = query
return queries_by_db_and_sql
def _check_store_version(store):
"""Clears 'store' if the version is too old or raises an error if the version is too
new.
"""
if store["version"] < RUNTIME_INFO_FILE_VERSION:
LOG.warn("Runtime file info version is old and will be ignored")
store.clear()
elif store["version"] > RUNTIME_INFO_FILE_VERSION:
raise Exception(
"Unexpected runtime file info version %s expected %s"
% (store["version"], RUNTIME_INFO_FILE_VERSION))
def print_runtime_info_comparison(old_runtime_info, new_runtime_info):
# TODO: Provide a way to call this from the CLI. This was hard coded to run from main()
# when it was used.
print(",".join([
"Database", "Query",
"Old Mem MB w/Spilling",
"New Mem MB w/Spilling",
"Diff %",
"Old Runtime w/Spilling",
"New Runtime w/Spilling",
"Diff %",
"Old Mem MB wout/Spilling",
"New Mem MB wout/Spilling",
"Diff %",
"Old Runtime wout/Spilling",
"New Runtime wout/Spilling",
"Diff %"]))
for db_name, old_queries in old_runtime_info.iteritems():
new_queries = new_runtime_info.get(db_name)
if not new_queries:
continue
for sql, old_query in old_queries.iteritems():
new_query = new_queries.get(sql)
if not new_query:
continue
sys.stdout.write(old_query["db_name"])
sys.stdout.write(",")
sys.stdout.write(old_query["name"])
sys.stdout.write(",")
for attr in [
"required_mem_mb_with_spilling", "solo_runtime_secs_with_spilling",
"required_mem_mb_without_spilling", "solo_runtime_secs_without_spilling"
]:
old_value = old_query[attr]
sys.stdout.write(str(old_value))
sys.stdout.write(",")
new_value = new_query[attr]
sys.stdout.write(str(new_value))
sys.stdout.write(",")
if old_value and new_value is not None:
sys.stdout.write("%0.2f%%" % (100 * float(new_value - old_value) / old_value))
else:
sys.stdout.write("N/A")
sys.stdout.write(",")
print()