# blob: 4324b532eda3380ec280ab59d6488dea200f7957 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Tests for query expiration.
import pytest
import re
import threading
from time import sleep, time
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
class TestQueryExpiration(CustomClusterTestSuite):
  """Custom-cluster tests for impalad query expiration.

  Covers queries that expire because of client inactivity (the
  --idle_query_timeout flag and the QUERY_TIMEOUT_S query option) and because
  of an execution time limit (the EXEC_TIME_LIMIT_S query option)."""
def _check_num_executing(self, impalad, expected):
  """Assert that exactly 'expected' of impalad's in-flight queries are executing.

  Every in-flight query that is not executing must be in the waiting state.

  Args:
    impalad: impalad whose in-flight queries are inspected through
      impalad.service.get_in_flight_queries(); each entry is read through its
      "executing" and "waiting" keys.
    expected: exact number of in-flight queries expected to be executing.

  Raises:
    AssertionError: if there are fewer than 'expected' in-flight queries, if a
      non-executing query is not waiting, or if the executing count differs
      from 'expected'.
  """
  in_flight_queries = impalad.service.get_in_flight_queries()
  # Guard against too few in-flight queries.
  assert expected <= len(in_flight_queries), \
      'expected at least %s in-flight queries, found %s' \
      % (expected, len(in_flight_queries))
  actual = 0
  for query in in_flight_queries:
    if query["executing"]:
      actual += 1
    else:
      # An in-flight query that is not executing must be waiting.
      assert query["waiting"], \
          'in-flight query is neither executing nor waiting: %s' % (query,)
  assert actual == expected, '%s out of %s queries with expected (%s) status' \
      % (actual, len(in_flight_queries), expected)
@CustomClusterTestSuite.with_args("--idle_query_timeout=8 --logbuflevel=-1")
def test_query_expiration(self, vector):
"""Confirm that single queries expire if not fetched.

Starts four async queries that are not fetched, each set to expire through a
different path: the 8s --idle_query_timeout default, a 3s EXEC_TIME_LIMIT_S, a
3s QUERY_TIMEOUT_S, and a 1000s QUERY_TIMEOUT_S that the server is expected to
bound by --idle_query_timeout. Checks the impala-server.num-queries-expired
metric, the client-visible query states, the fetch errors and the impalad INFO
log as the queries expire. Then checks that a synchronous (fetched) query does
not expire.
"""
impalad = self.cluster.get_first_impalad()
client = impalad.service.create_beeswax_client()
# Baseline for the expiration counter; expected counts below are relative to it.
num_expired = impalad.service.get_metric_value('impala-server.num-queries-expired')
handles = []
# NOTE(review): nothing in this view appends to 'handles', so len(handles) is 0
# wherever it is used below. The checks read as if each execute_async() handle was
# meant to be appended to it -- TODO confirm.
# This query will time out with the default idle timeout (8s).
query1 = "SELECT SLEEP(1000000)"
default_timeout_expire_handle = client.execute_async(query1)
# This query will hit a lower time limit.
client.execute("SET EXEC_TIME_LIMIT_S=3")
time_limit_expire_handle = client.execute_async(query1);
# This query will hit a lower idle timeout instead of the default timeout or time
# limit.
client.execute("SET EXEC_TIME_LIMIT_S=5")
client.execute("SET QUERY_TIMEOUT_S=3")
short_timeout_expire_handle = client.execute_async("SELECT SLEEP(2000000)")
# Reset EXEC_TIME_LIMIT_S before the next query (0 presumably means "no limit").
client.execute("SET EXEC_TIME_LIMIT_S=0")
# Set a huge timeout, to check that the server bounds it by --idle_query_timeout
client.execute("SET QUERY_TIMEOUT_S=1000")
default_timeout_expire_handle2 = client.execute_async("SELECT SLEEP(3000000)")
self._check_num_executing(impalad, len(handles))
# Start of the window used by the "expired in time" check further down.
before = time()
# NOTE(review): no wait (e.g. sleep()) is visible between here and the metric check
# below, although that check expects the 3s queries to have expired already --
# TODO confirm.
# Queries with an idle timeout or time limit of 3s should have expired, other
# queries should still be running.
# NOTE(review): the continuation lines of the following assertions (metric name,
# expected client states) are missing from this view.
assert num_expired + 2 == impalad.service.get_metric_value(
assert (client.get_state(short_timeout_expire_handle) ==
assert (client.get_state(time_limit_expire_handle) ==
assert (client.get_state(default_timeout_expire_handle) ==
assert (client.get_state(default_timeout_expire_handle2) ==
# Fetching from an expired query must fail with an error naming the expiry reason.
self.__expect_expired(client, query1, short_timeout_expire_handle,
"Query [0-9a-f]+:[0-9a-f]+ expired due to client inactivity \(timeout is 3s000ms\)")
self.__expect_expired(client, query1, time_limit_expire_handle,
"Query [0-9a-f]+:[0-9a-f]+ expired due to execution time limit of 3s000ms")
# Exactly two in-flight queries are expected to still be executing at this point.
self._check_num_executing(impalad, 2)
self.assert_impalad_log_contains('INFO', "Expiring query due to client inactivity: "
"[0-9a-f]+:[0-9a-f]+, last activity was at: \d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d")
# NOTE(review): the line below is a bare string. Its enclosing call (presumably a
# second assert_impalad_log_contains('INFO', ...)) is missing -- TODO confirm.
"Expiring query [0-9a-f]+:[0-9a-f]+ due to execution time limit of 3s")
# Wait until the remaining queries expire. The time limit query will have hit
# expirations but only one should be counted.
# NOTE(review): the call this closing argument belongs to (presumably a wait for
# 'impala-server.num-queries-expired' to reach this value) is missing from this view.
num_expired + len(handles))
# The metric and client state are not atomically maintained. Since the
# expiration metric has just been reached, accessing the client state
# is guarded in a loop to avoid flaky false negatives.
self.__expect_client_state(client, default_timeout_expire_handle,
self.__expect_client_state(client, default_timeout_expire_handle2,
# Check that we didn't wait too long to be expired (double the timeout is sufficiently
# large to avoid most noise in measurement)
assert time() - before < 16
client.execute("SET QUERY_TIMEOUT_S=0")
# Synchronous execution; calls fetch() and query should not time out.
# Note: could be flaky if execute() takes too long to call fetch() etc after the
# query completes.
# NOTE(review): this result is bound to 'handle', which the loop below reuses as
# its loop variable, so the value is never read.
handle = client.execute("SELECT SLEEP(2500)")
# Confirm that no extra expirations happened
# NOTE(review): unlike the earlier checks, this compares the absolute metric value
# rather than num_expired + len(handles). The two only agree if the counter started
# at 0 -- TODO confirm which is intended.
assert impalad.service.get_metric_value('impala-server.num-queries-expired') \
== len(handles)
self._check_num_executing(impalad, 0)
for handle in handles:
# NOTE(review): the 'try:' and the per-handle cleanup call that the 'except' below
# belongs to are missing from this view.
except Exception, e:
# We fetched from some cancelled handles above, which unregistered the queries.
assert 'Invalid or unknown query handle' in str(e)
def test_query_expiration_no_default(self, vector):
"""Confirm that single queries expire if no default is set, but a per-query
expiration or time limit is set.

Starts three async queries (QUERY_TIMEOUT_S=1, EXEC_TIME_LIMIT_S=1 and
QUERY_TIMEOUT_S=1000). The first two must expire, each with its own error
message. The third must not."""
# NOTE(review): no @CustomClusterTestSuite.with_args(...) decorator is visible for
# this test, so the "no default" idle timeout it relies on is not established in
# this view -- TODO confirm.
impalad = self.cluster.get_any_impalad()
client = impalad.service.create_beeswax_client()
# Baseline for the expiration counter; the expected count below is relative to it.
num_expired = impalad.service.get_metric_value('impala-server.num-queries-expired')
query = "SELECT SLEEP(1000000)"
# Expected to expire through the 1s per-query idle timeout.
client.execute("SET QUERY_TIMEOUT_S=1")
timeout_handle = client.execute_async(query)
client.execute("SET QUERY_TIMEOUT_S=0")
# Expected to expire through the 1s execution time limit.
client.execute("SET EXEC_TIME_LIMIT_S=1")
time_limit_handle = client.execute_async(query)
client.execute("SET EXEC_TIME_LIMIT_S=0")
# Set a huge timeout, server should not expire the query while this test is running
client.execute("SET QUERY_TIMEOUT_S=1000")
no_timeout_handle = client.execute_async(query)
# NOTE(review): 'before' is not read anywhere in this view, and no wait is visible
# before the assertions below, even though they expect two queries to have expired
# -- TODO confirm whether a sleep() was dropped here.
before = time()
# The queries with a 1s idle timeout and a 1s time limit should have expired; the
# query with the 1000s timeout should not have.
# NOTE(review): the continuation line (the metric name) of the next assertion is
# missing from this view.
assert num_expired + 2 == impalad.service.get_metric_value(
assert client.get_state(timeout_handle) == client.QUERY_STATES['EXCEPTION']
assert client.get_state(time_limit_handle) == client.QUERY_STATES['EXCEPTION']
assert client.get_state(no_timeout_handle) == client.QUERY_STATES['FINISHED']
# Fetching from each expired query must fail with an error naming the expiry reason.
self.__expect_expired(client, query, timeout_handle,
"Query [0-9a-f]+:[0-9a-f]+ expired due to client inactivity \(timeout is 1s000ms\)")
self.__expect_expired(client, query, time_limit_handle,
"Query [0-9a-f]+:[0-9a-f]+ expired due to execution time limit of 1s000ms")
def __expect_expired(self, client, query, handle, exception_regex):
  """Check that the query handle expired, with an error matching exception_regex.

  Args:
    client: client used to fetch from the query.
    query: query text passed through to client.fetch().
    handle: handle of the query that is expected to have expired.
    exception_regex: regular expression that must be found (via re.search) in
      the text of the exception raised by the fetch.

  Raises:
    AssertionError: if the fetch succeeds, or if the error it raises does not
      match exception_regex.
  """
  try:
    client.fetch(query, handle)
  except Exception as e:
    assert re.search(exception_regex, str(e)), \
        'error %r does not match %r' % (str(e), exception_regex)
  else:
    # Kept outside the try block so this failure is not caught by the except
    # clause and misreported as a regex mismatch.
    assert False, 'expected fetch from expired query to fail'
def __expect_client_state(self, client, handle, expected_state, timeout=0.1):
  """Poll client.get_state(handle) until it reports 'expected_state'.

  The state is read once immediately. It is then re-read for as long as it
  differs from 'expected_state' and less than 'timeout' seconds have passed.
  Fails with AssertionError if the last state read is not 'expected_state'.
  """
  started = time()
  while True:
    observed = client.get_state(handle)
    # Stop on a match, or once the polling window has elapsed.
    if observed == expected_state or time() - started >= timeout:
      break
  assert expected_state == observed
def test_concurrent_query_expiration(self, vector):
"""Confirm that multiple concurrent queries are correctly expired if not fetched.

Runs five threads of each of the four kinds below:
- NonExpiringQueryThread: synchronous queries that are fetched.
- ExpiringQueryThread: async queries that are never fetched.
- TimeLimitThread: queries run under a 1s EXEC_TIME_LIMIT_S.
- NonExpiringTimeLimitThread: a quick query run under a 10s EXEC_TIME_LIMIT_S."""
# NOTE(review): no @CustomClusterTestSuite.with_args(...) decorator is visible for
# this test, so the idle timeout the expiring queries depend on is not set in this
# view -- TODO confirm.
class ExpiringQueryThread(threading.Thread):
"""Thread that runs a query and does not fetch so it will time out."""
def __init__(self, client):
super(ExpiringQueryThread, self).__init__()
self.client = client
self.success = False
def run(self):
# Submit asynchronously and keep the handle; the results are never fetched.
self.handle = self.client.execute_async("SELECT SLEEP(3000000)")
class NonExpiringQueryThread(threading.Thread):
"""Thread that runs a query that does not hit the idle timeout."""
def __init__(self, client):
super(NonExpiringQueryThread, self).__init__()
self.client = client
self.success = False
def run(self):
# execute() is synchronous and fetches the results, so the query is not idle.
result = self.client.execute("SELECT SLEEP(2500)")
self.success = result.success
class TimeLimitThread(threading.Thread):
"""Thread that runs a query that hits a time limit."""
def __init__(self, client):
super(TimeLimitThread, self).__init__()
self.client = client
self.success = False
def run(self):
# Query will not be idle but will hit time limit.
# NOTE(review): the 'try:' matching the 'except' below is missing from this view.
self.client.execute("SET EXEC_TIME_LIMIT_S=1")
result = self.client.execute("SELECT SLEEP(2500)")
# NOTE(review): asserting a non-empty string literal always passes, so this line
# never fails even if the time limit is not hit. It presumably should be
# 'assert False, "Expected to hit time limit"' -- TODO confirm.
assert "Expected to hit time limit"
except Exception, e:
# Stored for the main thread to inspect after the thread finishes.
self.exception = e
class NonExpiringTimeLimitThread(threading.Thread):
"""Thread that runs a query that finishes before time limit."""
def __init__(self, client):
super(NonExpiringTimeLimitThread, self).__init__()
self.client = client
# NOTE(review): the next line is garbled in this view (a chained assignment to the
# literal False); presumably it initialised a separate result attribute to None.
self.success = False = None
def run(self):
# Query will complete before time limit.
self.client.execute("SET EXEC_TIME_LIMIT_S=10")
result = self.client.execute("SELECT count(*) FROM functional.alltypes")
# NOTE(review): the next line is garbled in this view (its right-hand side is
# missing).
self.success = result.success =
impalad = self.cluster.get_any_impalad()
client = impalad.service.create_beeswax_client()
# Baseline for the expiration counter; the expected count below is relative to it.
num_expired = impalad.service.get_metric_value('impala-server.num-queries-expired')
# NOTE(review): the list-comprehension heads for non_expiring_threads and
# non_expiring_time_limit_threads, and the tail of the all_threads expression, are
# missing from this view.
non_expiring_threads = \
for _ in xrange(5)]
expiring_threads = [ExpiringQueryThread(impalad.service.create_beeswax_client())
for _ in xrange(5)]
time_limit_threads = [TimeLimitThread(impalad.service.create_beeswax_client())
for _ in xrange(5)]
non_expiring_time_limit_threads = [
for _ in xrange(5)]
all_threads = non_expiring_threads + expiring_threads + time_limit_threads +\
for t in all_threads:
# NOTE(review): the bodies of these two loops (presumably starting and then joining
# every thread) are missing from this view.
for t in all_threads:
# 10 = the 5 ExpiringQueryThread queries + the 5 TimeLimitThread queries.
# NOTE(review): the call this closing argument belongs to (presumably a wait on the
# expiration metric) is missing from this view.
num_expired + 10)
for t in non_expiring_threads:
assert t.success
# Expiry is read from the client state, polled for up to the helper's default
# timeout.
for t in expiring_threads:
self.__expect_client_state(client, t.handle, client.QUERY_STATES['EXCEPTION'])
# NOTE(review): the check this regex argument belongs to (presumably matching each
# thread's stored exception) is missing from this view.
for t in time_limit_threads:
"Query [0-9a-f]+:[0-9a-f]+ expired due to execution time limit of 1s000ms",
for t in non_expiring_time_limit_threads:
assert t.success
# NOTE(review): this assertion has lost the expression it indexes into (presumably
# the thread's stored query result) -- TODO confirm.
assert[0] == '7300' # Number of rows in alltypes.