blob: 62b703886120cbb739913a0c8d503926fe4865cf [file] [log] [blame]
#!/usr/bin/env impala-python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# This module implements helpers for representing queries to be executed by the
# stress test, loading them and generating them.
import logging
import os
from textwrap import dedent
from tests.comparison.db_types import Int, TinyInt, SmallInt, BigInt
from tests.comparison.query_generator import QueryGenerator
from tests.comparison.query_profile import DefaultProfile
from tests.comparison.model_translator import SqlWriter
from tests.util.parse_util import match_memory_estimate, parse_mem_to_mb
import tests.util.test_file_parser as test_file_parser
LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
class QueryType(object):
class Query(object):
"""Contains a SQL statement along with expected runtime information.
This class is used as a struct, with the fields filled out by calling
def __init__(self): = None
self.sql = None
# In order to be able to make good estimates for DML queries in the binary search,
# we need to bring the table to a good initial state before excuting the sql. Running
# set_up_sql accomplishes this task.
self.set_up_sql = None
self.db_name = None
self.result_hash = None
self.required_mem_mb_with_spilling = None
self.required_mem_mb_without_spilling = None
self.solo_runtime_profile_with_spilling = None
self.solo_runtime_profile_without_spilling = None
self.solo_runtime_secs_with_spilling = None
self.solo_runtime_secs_without_spilling = None
# Query options to set before running the query.
self.options = {}
# Determines the order in which we will populate query runtime info. Queries with the
# lowest population_order property will be handled first.
self.population_order = 0
# Type of query. Can have the following values: SELECT, COMPUTE_STATS, INSERT, UPDATE,
self.query_type = QueryType.SELECT
self._logical_query_id = None
def __repr__(self):
return dedent("""
Mem: %(required_mem_mb_with_spilling)s
Mem no-spilling: %(required_mem_mb_without_spilling)s
Solo Runtime: %(solo_runtime_secs_with_spilling)s
Solo Runtime no-spilling: %(solo_runtime_secs_without_spilling)s
DB: %(db_name)s
Options: %(options)s
Set up SQL: %(set_up_sql)s>
SQL: %(sql)s>
Population order: %(population_order)r>
""".strip() % self.__dict__)
def logical_query_id(self):
Return a meanginful unique str identifier for the query.
Example: "tpcds_300_decimal_parquet_q21"
if self._logical_query_id is None:
self._logical_query_id = '{0}_{1}'.format(self.db_name,
return self._logical_query_id
def write_runtime_info_profiles(self, directory):
"""Write profiles for spilling and non-spilling into directory (str)."""
profiles_to_write = [
(self.logical_query_id + "_profile_with_spilling.txt",
(self.logical_query_id + "_profile_without_spilling.txt",
for filename, profile in profiles_to_write:
if profile is None:
LOG.debug("No profile recorded for {0}".format(filename))
with open(os.path.join(directory, filename), "w") as fh:
def load_tpc_queries(workload):
"""Returns a list of TPC queries. 'workload' should either be 'tpch' or 'tpcds'.""""Loading %s queries", workload)
queries = []
for query_name, query_sql in test_file_parser.load_tpc_queries(workload,
query = Query() = query_name
query.sql = query_sql
return queries
def load_queries_from_test_file(file_path, db_name=None):
LOG.debug("Loading queries from %s", file_path)
test_cases = test_file_parser.parse_query_test_file(file_path)
queries = list()
for test_case in test_cases:
query = Query()
query.sql = test_file_parser.remove_comments(test_case["QUERY"])
query.db_name = db_name
return queries
def generate_compute_stats_queries(cursor):
"""For each table in the database that cursor is connected to, generate several compute
stats queries. Each query will have a different value for the MT_DOP query option.
""""Generating Compute Stats queries")
tables = [cursor.describe_table(t) for t in cursor.list_table_names()
if not t.endswith("_original")]
result = []
mt_dop_values = [str(2**k) for k in range(5)]
for table in tables:
for mt_dop_value in mt_dop_values:
compute_query = Query()
compute_query.population_order = 1
compute_query.query_type = QueryType.COMPUTE_STATS
compute_query.sql = "COMPUTE STATS {0}".format(
compute_query.options["MT_DOP"] = mt_dop_value
compute_query.db_name = cursor.db_name = "compute_stats_{0}_mt_dop_{1}".format(, compute_query.options["MT_DOP"])
LOG.debug("Added compute stats query: {0}".format(compute_query))
return result
def generate_DML_queries(cursor, dml_mod_values):
"""Generate insert, upsert, update, delete DML statements.
For each table in the database that cursor is connected to, create 4 DML queries
(insert, upsert, update, delete) for each mod value in 'dml_mod_values'. This value
controls which rows will be affected. The generated queries assume that for each table
in the database, there exists a table with a '_original' suffix that is never modified.
This function has some limitations:
1. Only generates DML statements against Kudu tables, and ignores non-Kudu tables.
2. Requires that the type of the first column of the primary key is an integer type.
""""Generating DML queries")
tables = [cursor.describe_table(t) for t in cursor.list_table_names()
if not t.endswith("_original")]
result = []
for table in tables:
if not table.primary_keys:
# Skip non-Kudu tables. If a table has no primary keys, then it cannot be a Kudu
# table.
LOG.debug("Skipping table '{0}' because it has no primary keys.".format(
if len(table.primary_keys) > 1:
# TODO(IMPALA-4665): Add support for tables with multiple primary keys.
LOG.debug("Skipping table '{0}' because it has more than "
"1 primary key column.".format(
primary_key = table.primary_keys[0]
if primary_key.exact_type not in (Int, TinyInt, SmallInt, BigInt):
# We want to be able to apply the modulo operation on the primary key. If the
# the first primary key column happens to not be an integer, we will skip
# generating queries for this table
LOG.debug("Skipping table '{0}' because the first column '{1}' in the "
"primary key is not an integer.".format(,
for mod_value in dml_mod_values:
# Insert
insert_query = Query()
# Populate runtime info for Insert and Upsert queries before Update and Delete
# queries because tables remain in original state after running the Insert and
# Upsert queries. During the binary search in runtime info population for the
# Insert query, we first delete some rows and then reinsert them, so the table
# remains in the original state. For the delete, the order is reversed, so the table
# is not in the original state after running the the delete (or update) query. This
# is why population_order is smaller for Insert and Upsert queries.
insert_query.population_order = 1
insert_query.query_type = QueryType.INSERT = "insert_{0}".format(
insert_query.db_name = cursor.db_name
insert_query.sql = (
"INSERT INTO TABLE {0} SELECT * FROM {0}_original "
"WHERE {1} % {2} = 0").format(,, mod_value)
# Upsert
upsert_query = Query()
upsert_query.population_order = 1
upsert_query.query_type = QueryType.UPSERT = "upsert_{0}".format(
upsert_query.db_name = cursor.db_name
upsert_query.sql = (
"FROM {0}_original WHERE {1} % {2} = 0").format(,, mod_value)
# Update
update_query = Query()
update_query.population_order = 2
update_query.query_type = QueryType.UPDATE = "update_{0}".format(
update_query.db_name = cursor.db_name
update_list = ', '.join(
'a.{0} = b.{0}'.format(
for col in table.cols if not col.is_primary_key)
update_query.sql = (
"UPDATE a SET {update_list} FROM {table_name} a JOIN {table_name}_original b "
"ON a.{pk} = b.{pk} + 1 WHERE a.{pk} % {mod_value} = 0").format(,, mod_value=mod_value,
# Delete
delete_query = Query()
delete_query.population_order = 2
delete_query.query_type = QueryType.DELETE = "delete_{0}".format(
delete_query.db_name = cursor.db_name
delete_query.sql = ("DELETE FROM {0} WHERE {1} % {2} = 0").format(,, mod_value)
if + "_original" in set( for table in tables):
insert_query.set_up_sql = "DELETE FROM {0} WHERE {1} % {2} = 0".format(,, mod_value)
upsert_query.set_up_sql = insert_query.set_up_sql
update_query.set_up_sql = (
"UPSERT INTO TABLE {0} SELECT * FROM {0}_original "
"WHERE {1} % {2} = 0").format(,, mod_value)
delete_query.set_up_sql = update_query.set_up_sql
LOG.debug("Added insert query: {0}".format(insert_query))
LOG.debug("Added update query: {0}".format(update_query))
LOG.debug("Added upsert query: {0}".format(upsert_query))
LOG.debug("Added delete query: {0}".format(delete_query))
assert len(result) > 0, "No DML queries were added."
return result
def generate_random_queries(impala, random_db):
"""Generator function to produce random queries. 'impala' is the Impala service
object. random_db is the name of the database that queries should be
generated for."""
with impala.cursor(db_name=random_db) as cursor:
tables = [cursor.describe_table(t) for t in cursor.list_table_names()]
query_generator = QueryGenerator(DefaultProfile())
model_translator = SqlWriter.create()
while True:
query_model = query_generator.generate_statement(tables)
sql = model_translator.write_query(query_model)
query = Query()
query.sql = sql
query.db_name = random_db
yield query
def estimate_query_mem_mb_usage(query, impalad_conn):
"""Runs an explain plan then extracts and returns the estimated memory needed to run
the query.
with impalad_conn.cursor() as cursor:
LOG.debug("Using %s database", query.db_name)
if query.db_name:
cursor.execute('USE ' + query.db_name)
if query.query_type == QueryType.COMPUTE_STATS:
# Running "explain" on compute stats is not supported by Impala.
LOG.debug("Explaining query\n%s", query.sql)
cursor.execute('EXPLAIN ' + query.sql)
explain_rows = cursor.fetchall()
explain_lines = [row[0] for row in explain_rows]
mem_limit, units = match_memory_estimate(explain_lines)
return parse_mem_to_mb(mem_limit, units)