#!/usr/bin/env impala-python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import logging
from multiprocessing import Value
import os
import re
from textwrap import dedent
from time import sleep, time
from sys import maxint

from tests.stress.queries import QueryType
from tests.stress.util import create_and_start_daemon_thread, increment
from tests.util.thrift_util import op_handle_to_query_id

LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])

# Metrics collected while the stress test runs.
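# The number of queries dequeued from the query queue to be run.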
NUM_QUERIES_DEQUEUED = "num_queries_dequeued"
# The number of queries that were submitted to a query runner.
NUM_QUERIES_SUBMITTED = "num_queries_submitted"
# The number of queries that have entered the RUNNING state (i.e. got through Impala's
# admission control and started executing) or were cancelled or hit an error.
NUM_QUERIES_STARTED_RUNNING_OR_CANCELLED = "num_queries_started_running_or_cancelled"
NUM_QUERIES_FINISHED = "num_queries_finished"
NUM_QUERIES_EXCEEDED_MEM_LIMIT = "num_queries_exceeded_mem_limit"
NUM_QUERIES_AC_REJECTED = "num_queries_ac_rejected"
NUM_QUERIES_AC_TIMEDOUT = "num_queries_ac_timedout"
NUM_QUERIES_CANCELLED = "num_queries_cancelled"
NUM_RESULT_MISMATCHES = "num_result_mismatches"
NUM_OTHER_ERRORS = "num_other_errors"


class CancelMechanism:
  """Pseudo-enum to specify how a query should be cancelled."""
  # Cancel via the stress test client issuing an RPC to the server.
  VIA_CLIENT = "CANCEL_VIA_CLIENT"
  # Cancel by setting the EXEC_TIME_LIMIT_S query option.
  VIA_OPTION = "CANCEL_VIA_OPTION"

  ALL_MECHS = [VIA_CLIENT, VIA_OPTION]


RESULT_HASHES_DIR = "result_hashes"


class QueryTimeout(Exception):
  pass


class QueryRunner(object):
  """Encapsulates functionality to run a query and provide a runtime report."""

  SPILLED_PATTERNS = [re.compile("ExecOption:.*Spilled"), re.compile("SpilledRuns: [^0]")]
  BATCH_SIZE = 1024

  def __init__(self, impalad, results_dir, use_kerberos, common_query_options,
      test_admission_control, check_if_mem_was_spilled=False):
    """Creates a new instance, but does not start the process."""
    self.impalad = impalad
    self.use_kerberos = use_kerberos
    self.results_dir = results_dir
    self.check_if_mem_was_spilled = check_if_mem_was_spilled
    self.common_query_options = common_query_options
    self.test_admission_control = test_admission_control
    # proc is filled out by the caller.
    self.proc = None
    # impalad_conn is initialised in connect().
    self.impalad_conn = None

    # All these values are shared between processes, so that the parent process that
    # started this QueryRunner can read them for operational purposes.
    self._metrics = {
        NUM_QUERIES_DEQUEUED: Value("i", 0),
        NUM_QUERIES_SUBMITTED: Value("i", 0),
        NUM_QUERIES_STARTED_RUNNING_OR_CANCELLED: Value("i", 0),
        NUM_QUERIES_FINISHED: Value("i", 0),
        NUM_QUERIES_EXCEEDED_MEM_LIMIT: Value("i", 0),
        NUM_QUERIES_AC_REJECTED: Value("i", 0),
        NUM_QUERIES_AC_TIMEDOUT: Value("i", 0),
        NUM_QUERIES_CANCELLED: Value("i", 0),
        NUM_RESULT_MISMATCHES: Value("i", 0),
        NUM_OTHER_ERRORS: Value("i", 0)}
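  # A minimal sketch of the shared-counter pattern used above (a standalone example,
  # not part of this module):
  #
  #   from multiprocessing import Process, Value
  #   counter = Value("i", 0)  # shared 32-bit int, protected by an internal lock
  #   def inc(c):
  #     with c.get_lock():
  #       c.value += 1
  #   p = Process(target=inc, args=(counter,))
  #   p.start()
  #   p.join()
  #   assert counter.value == 1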

  def connect(self):
    """Connect to the server and start the query runner thread."""
    self.impalad_conn = self.impalad.impala.connect(impalad=self.impalad)

  def run_query(self, query, mem_limit_mb, run_set_up=False,
      timeout_secs=maxint, cancel_mech=None, retain_profile=False):
    """Run a query and return an execution report. If 'run_set_up' is True, the query's
    set-up SQL is executed before the main query; this should be the case during the
    binary search phase of the stress test. 'cancel_mech' is optionally a
    CancelMechanism value that should be used to cancel the query after 'timeout_secs'.
    If 'cancel_mech' is provided, the query profile is not fetched for timed-out
    queries, because the query was purposely cancelled rather than having some problem
    that needs investigation.
    """
    assert self.impalad_conn, "connect() must be called before run_query()"
    assert cancel_mech is None or cancel_mech in CancelMechanism.ALL_MECHS

    timeout_unix_time = time() + timeout_secs
    report = QueryReport(query)
    try:
      with self.impalad_conn.cursor() as cursor:
        start_time = time()
        self._set_db_and_options(cursor, query, run_set_up, mem_limit_mb, timeout_secs,
            cancel_mech)
        error = None
        try:
          cursor.execute_async(
              "/* Mem: %s MB. Coordinator: %s. */\n"
              % (mem_limit_mb, self.impalad.host_name) + query.sql)
          report.query_id = op_handle_to_query_id(
              cursor._last_operation.handle if cursor._last_operation else None)
          LOG.debug("Query id is %s", report.query_id)
          self._wait_until_fetchable(cursor, report, timeout_unix_time)

          if query.query_type == QueryType.SELECT:
            report.result_hash = self._fetch_and_hash_result(cursor, timeout_unix_time,
                query)
          else:
            # If the query is in an error state, this will raise an exception.
            cursor._wait_to_finish()

          if (retain_profile or
              (query.result_hash and report.result_hash != query.result_hash)):
            fetch_and_set_profile(cursor, report)
        except QueryTimeout:
          if not cancel_mech:
            fetch_and_set_profile(cursor, report)
          # Cancel from this client if a) we hit the timeout unexpectedly or b) we're
          # deliberately cancelling the query via the client.
          if not cancel_mech or cancel_mech == CancelMechanism.VIA_CLIENT:
            self._cancel(cursor, report)
          else:
            # Wait until the query is cancelled by a different mechanism.
            self._wait_until_cancelled(cursor, report.query_id)
          report.timed_out = True
          return report
        except Exception as error:
          report.query_id = op_handle_to_query_id(
              cursor._last_operation.handle if cursor._last_operation else None)
          LOG.debug("Error running query with id %s: %s", report.query_id, error)
          if (cancel_mech == CancelMechanism.VIA_OPTION and
              is_exec_time_limit_error(error)):
            # A timeout triggered via the query option counts as a timeout.
            report.timed_out = True
          else:
            self._check_for_memory_errors(report, cursor, error)
          if report.has_query_error():
            return report

        report.runtime_secs = time() - start_time
        if cursor.execution_failed() or self.check_if_mem_was_spilled:
          fetch_and_set_profile(cursor, report)
          report.mem_was_spilled = any([
              pattern.search(report.profile) is not None
              for pattern in QueryRunner.SPILLED_PATTERNS])
          # TODO: is this needed? Memory errors are generally caught by the try/except.
          report.not_enough_memory = "Memory limit exceeded" in report.profile
    except Exception as error:
      # A mem limit error would have been caught above, no need to check for that here.
      LOG.debug("Caught error: %s", error)
      report.other_error = error
    return report
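  # A minimal usage sketch (the query object and connection details are assumptions,
  # not part of this method's contract):
  #
  #   runner = QueryRunner(impalad, "/tmp/stress_results", use_kerberos=False,
  #       common_query_options={}, test_admission_control=False)
  #   runner.connect()
  #   report = runner.run_query(some_query, mem_limit_mb=1024, timeout_secs=600)
  #   if report.has_query_error():
  #     LOG.error("Query failed: %s", report.other_error)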

  def _set_db_and_options(self, cursor, query, run_set_up, mem_limit_mb, timeout_secs,
      cancel_mech):
    """Set up a new cursor for running a query by switching to the correct database and
    setting query options."""
    if query.db_name:
      LOG.debug("Using %s database", query.db_name)
      cursor.execute("USE %s" % query.db_name)
    if run_set_up and query.set_up_sql:
      LOG.debug("Running set up query:\n%s", query.set_up_sql)
      cursor.execute(query.set_up_sql)
    for query_option, value in self.common_query_options.iteritems():
      cursor.execute(
          "SET {query_option}={value}".format(query_option=query_option, value=value))
    for query_option, value in query.options.iteritems():
      cursor.execute(
          "SET {query_option}={value}".format(query_option=query_option, value=value))
    # Set a time limit if it is the expected method of cancellation, or as an
    # additional method of cancellation if the query runs for too long. This is useful
    # if, e.g., the client is blocked in a fetch() call and can't initiate the
    # cancellation.
    if not cancel_mech or cancel_mech == CancelMechanism.VIA_OPTION:
      # EXEC_TIME_LIMIT_S is an int32, so we need to cap the value.
      cursor.execute("SET EXEC_TIME_LIMIT_S={timeout_secs}".format(
          timeout_secs=min(timeout_secs, 2**31 - 1)))
    cursor.execute("SET ABORT_ON_ERROR=1")
    if self.test_admission_control:
      LOG.debug(
          "Running query without mem limit at %s with timeout secs %s:\n%s",
          self.impalad.host_name, timeout_secs, query.sql)
    else:
      LOG.debug("Setting mem limit to %s MB", mem_limit_mb)
      cursor.execute("SET MEM_LIMIT=%sM" % mem_limit_mb)
      LOG.debug(
          "Running query with %s MB mem limit at %s with timeout secs %s:\n%s",
          mem_limit_mb, self.impalad.host_name, timeout_secs, query.sql)
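  # For example, with no common or per-query options, mem_limit_mb=1024,
  # timeout_secs=600, no cancel_mech, and test_admission_control=False, the statements
  # issued are roughly:
  #
  #   SET EXEC_TIME_LIMIT_S=600;
  #   SET ABORT_ON_ERROR=1;
  #   SET MEM_LIMIT=1024M;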

  def _wait_until_fetchable(self, cursor, report, timeout_unix_time):
    """Wait until the query results can be fetched (if it's a SELECT query) or until it
    has finished executing (if it's a different query type like DML), up to
    'timeout_unix_time'. Raises a QueryTimeout exception if the timeout expires."""
    # Loop until the query gets to the right state or the timeout expires.
    sleep_secs = 0.1
    secs_since_log = 0
    # True if we incremented num_queries_started_running_or_cancelled for this query.
    started_running_or_cancelled = False
    while True:
      query_state = cursor.status()
      # Check if the query got past the PENDING/INITIALIZED states, either because
      # it's executing or hit an error.
      if (not started_running_or_cancelled and query_state not in ('PENDING_STATE',
          'INITIALIZED_STATE')):
        started_running_or_cancelled = True
        increment(self._metrics[NUM_QUERIES_STARTED_RUNNING_OR_CANCELLED])
      # Return if we're ready to fetch results (in the FINISHED state) or we are in
      # another terminal state like EXCEPTION.
      if query_state not in ('PENDING_STATE', 'INITIALIZED_STATE', 'RUNNING_STATE'):
        return

      if time() > timeout_unix_time:
        if not started_running_or_cancelled:
          increment(self._metrics[NUM_QUERIES_STARTED_RUNNING_OR_CANCELLED])
        raise QueryTimeout()
      if secs_since_log > 5:
        secs_since_log = 0
        LOG.debug("Waiting for query to execute")
      sleep(sleep_secs)
      secs_since_log += sleep_secs

  def _wait_until_cancelled(self, cursor, query_id):
    """Wait until 'cursor' is in a CANCELED or ERROR state."""
    last_log_time = 0
    status = cursor.status()
    while status not in ["CANCELED_STATE", "ERROR_STATE"]:
      # Work around IMPALA-7561: queries don't transition out of the FINISHED state
      # once they hit eos.
      if status == "FINISHED_STATE" and "Last row fetched" in cursor.get_profile():
        return
      if time() - last_log_time > 5:
        LOG.debug("Waiting for query with id {query_id} to be cancelled".format(
            query_id=query_id))
        last_log_time = time()
      sleep(0.1)
      status = cursor.status()

  def update_from_query_report(self, report):
    LOG.debug("Updating runtime stats (Query Runner PID: {0})".format(self.proc.pid))
    increment(self._metrics[NUM_QUERIES_FINISHED])
    if report.not_enough_memory:
      increment(self._metrics[NUM_QUERIES_EXCEEDED_MEM_LIMIT])
    if report.ac_rejected:
      increment(self._metrics[NUM_QUERIES_AC_REJECTED])
    if report.ac_timedout:
      increment(self._metrics[NUM_QUERIES_AC_TIMEDOUT])
    if report.was_cancelled:
      increment(self._metrics[NUM_QUERIES_CANCELLED])

  def _cancel(self, cursor, report):
    if not report.query_id:
      return

    try:
      LOG.debug("Attempting cancellation of query with id %s", report.query_id)
      cursor.cancel_operation()
      LOG.debug("Sent cancellation request for query with id %s", report.query_id)
    except Exception as e:
      LOG.debug("Error cancelling query with id %s: %s", report.query_id, e)
      try:
        LOG.debug("Attempting to cancel query through the web server.")
        self.impalad.cancel_query(report.query_id)
      except Exception as e:
        LOG.debug("Error cancelling query %s through the web server: %s",
            report.query_id, e)

  def _check_for_memory_errors(self, report, cursor, caught_exception):
    """To be called after a query failure, to check for signs that the query failed
    due to a mem limit or an admission control rejection/timeout. The report is
    updated accordingly.
    """
    fetch_and_set_profile(cursor, report)
    caught_msg = str(caught_exception).lower().strip()
    # Distinguish error conditions based on string fragments. The AC rejection and
    # out-of-memory conditions actually overlap (since some memory checks happen in
    # admission control) so check the out-of-memory conditions first.
    if "memory limit exceeded" in caught_msg or \
       "repartitioning did not reduce the size of a spilled partition" in caught_msg or \
       "failed to get minimum memory reservation" in caught_msg or \
       "minimum memory reservation is greater than" in caught_msg or \
       "minimum memory reservation needed is greater than" in caught_msg:
      report.not_enough_memory = True
      return
    if "rejected query from pool" in caught_msg:
      report.ac_rejected = True
      return
    if "admission for query exceeded timeout" in caught_msg:
      report.ac_timedout = True
      return

    LOG.debug("Non-mem limit error for query with id %s: %s", report.query_id,
        caught_exception, exc_info=True)
    report.other_error = caught_exception

  def _fetch_and_hash_result(self, cursor, timeout_unix_time, query):
    """Fetches results from 'cursor' and returns a hash that is independent of row
    order. Raises QueryTimeout() if we couldn't fetch all rows from the query before
    time() reaches 'timeout_unix_time'.
    'query' is only used for debug logging purposes (if the result is not as expected,
    a log file is left under RESULT_HASHES_DIR in the results directory for
    investigation).
    """
    query_id = op_handle_to_query_id(
        cursor._last_operation.handle if cursor._last_operation else None)
    result_log_filename = None
    curr_hash = 1
    try:
      with self._open_result_file(query, query_id) as result_log:
        result_log.write(query.sql)
        result_log.write("\n")
        result_log_filename = result_log.name
        while True:
          # Check for timeout before each fetch call. Fetching can block indefinitely,
          # e.g. if the query is a selective scan, so we may miss the timeout, but we
          # will live with that limitation for now.
          # TODO: IMPALA-8289: use a non-blocking fetch when support is added
          if time() >= timeout_unix_time:
            LOG.debug("Hit timeout before fetch for query with id {0} {1} >= {2}".format(
                query_id, time(), timeout_unix_time))
            raise QueryTimeout()
          LOG.debug("Fetching result for query with id {0}".format(query_id))
          rows = cursor.fetchmany(self.BATCH_SIZE)
          if not rows:
            LOG.debug("No more results for query with id {0}".format(query_id))
            break

          for row in rows:
            curr_hash = _add_row_to_hash(row, curr_hash)
            if result_log:
              result_log.write(",".join([str(val) for val in row]))
              result_log.write("\thash=")
              result_log.write(str(curr_hash))
              result_log.write("\n")
    except Exception:
      # Don't retain result files for failed queries.
      if result_log_filename is not None:
        os.remove(result_log_filename)
      raise

    # Only retain result files with incorrect results.
    if result_log_filename is not None and curr_hash == query.result_hash:
      os.remove(result_log_filename)
    return curr_hash
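  # Each data line written to the result log has the form
  # "<val1>,<val2>,...\thash=<running hash>", so a two-column row might be logged as
  # "42,foo\thash=1234567" (values here are illustrative).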

  def _open_result_file(self, query, query_id):
    """Opens and returns a file under RESULT_HASHES_DIR to write query results to."""
    file_name = '_'.join([query.logical_query_id, query_id.replace(":", "_")])
    if query.result_hash is None:
      file_name += "_initial"
    file_name += "_results.txt"
    result_log = open(os.path.join(self.results_dir, RESULT_HASHES_DIR, file_name),
        "w")
    return result_log

  def get_metric_val(self, name):
    """Get the current value of the metric called 'name'."""
    return self._metrics[name].value

  def get_metric_vals(self):
    """Get the current values of all the metrics as a list of (k, v) pairs."""
    return [(k, v.value) for k, v in self._metrics.iteritems()]

  def increment_metric(self, name):
    """Increment the current value of the metric called 'name'."""
    increment(self._metrics[name])
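  # The parent process can poll these counters while the runner executes queries,
  # e.g. (a sketch; the runner variable is an assumption):
  #
  #   finished = runner.get_metric_val(NUM_QUERIES_FINISHED)
  #   errors = runner.get_metric_val(NUM_OTHER_ERRORS)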


class QueryReport(object):
  """Holds information about a single query run."""

  def __init__(self, query):
    self.query = query
    self.result_hash = None
    self.runtime_secs = None
    self.mem_was_spilled = False
    # not_enough_memory includes conditions like "Memory limit exceeded", admission
    # control rejecting because there is not enough memory, etc.
    self.not_enough_memory = False
    # ac_rejected is true if the query was rejected by admission control.
    # It is mutually exclusive with not_enough_memory - if the query is rejected by
    # admission control because the memory limit is too low, it is counted as
    # not_enough_memory.
    # TODO: reconsider whether they should be mutually exclusive
    self.ac_rejected = False
    self.ac_timedout = False
    self.other_error = None
    self.timed_out = False
    self.was_cancelled = False
    self.profile = None
    self.query_id = None

  def __str__(self):
    return dedent("""
        <QueryReport
        result_hash: %(result_hash)s
        runtime_secs: %(runtime_secs)s
        mem_was_spilled: %(mem_was_spilled)s
        not_enough_memory: %(not_enough_memory)s
        ac_rejected: %(ac_rejected)s
        ac_timedout: %(ac_timedout)s
        other_error: %(other_error)s
        timed_out: %(timed_out)s
        was_cancelled: %(was_cancelled)s
        query_id: %(query_id)s
        >
        """.strip() % self.__dict__)

  def has_query_error(self):
    """Return true if any kind of error status was returned for the query (i.e. the
    query failed, as opposed to running to completion, timing out or being
    cancelled)."""
    return (self.not_enough_memory or self.ac_rejected or self.ac_timedout
            or self.other_error)

  def write_query_profile(self, directory, prefix=None):
    """
    Write out the query profile bound to this object to a given directory.

    The file name is generated and will contain the query ID. Use the optional prefix
    parameter to set a prefix on the filename.

    Example return:
      tpcds_300_decimal_parquet_q21_00000001_a38c8331_profile.txt

    Parameters:
      directory (str): Directory to write profile.
      prefix (str): Prefix for filename.
    """
    if not (self.profile and self.query_id):
      return

    if prefix is not None:
      file_name = prefix + '_'
    else:
      file_name = ''
    file_name += self.query.logical_query_id + '_'
    file_name += self.query_id.replace(":", "_") + "_profile.txt"
    profile_log_path = os.path.join(directory, file_name)
    with open(profile_log_path, "w") as profile_log:
      profile_log.write(self.profile)


def _add_row_to_hash(row, curr_hash):
  """Hashes 'row' and accumulates it into 'curr_hash'. The result is independent of
  the order in which rows are added to the hash, since many queries we run through
  the stress test do not have a canonical order in which they return rows."""
  for idx, val in enumerate(row):
    curr_hash += _hash_val(idx, val)
    # Modulo the result to keep it "small", otherwise the math ops can be slow
    # since Python does infinite precision math.
    curr_hash %= maxint
  return curr_hash
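# Order-invariance follows because per-value contributions are combined with modular
# addition, which is commutative. A sketch (row values are illustrative):
#
#   h1 = _add_row_to_hash((1, "a"), 1)
#   h1 = _add_row_to_hash((2, "b"), h1)
#   h2 = _add_row_to_hash((2, "b"), 1)
#   h2 = _add_row_to_hash((1, "a"), h2)
#   assert h1 == h2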


def _hash_val(col_idx, val):
  """Compute a numeric hash of 'val', which is at column index 'col_idx'."""
  if val is None:
    # The hash() of None can change from run to run since it's based on a memory
    # address, so a fixed value is used instead.
    val = 38463209
  elif isinstance(val, float):
    # Floats returned by Impala may not be deterministic: the trailing insignificant
    # digits may differ between runs. Only the first 6 significant digits are kept
    # after rounding.
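    # For example, for val=1234.5678901: "%f" % val gives "1234.567890", dot_idx is 4,
    # and round(val, 6 - 4) yields 1234.57 (6 significant digits in total).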
    sval = "%f" % val
    dot_idx = sval.find(".")
    val = round(val, 6 - dot_idx)
  return (col_idx + 1) * hash(val)


def fetch_and_set_profile(cursor, report):
  """Set the report's query profile using the given cursor.
  Producing a query profile can be somewhat expensive. A VTune profile of impalad
  showed 10% of CPU time spent generating query profiles.
  """
  if not report.profile and cursor._last_operation:
    try:
      report.profile = cursor.get_profile()
    except Exception as e:
      LOG.debug("Error getting profile for query with id %s: %s", report.query_id, e)


def is_exec_time_limit_error(error):
  """Return true if this is an error hit as a result of hitting EXEC_TIME_LIMIT_S."""
  return "expired due to execution time limit" in str(error)