blob: df203b3f6bf7c98ed2c6ee02b4d925f9b2a5d63f [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Tests for query expiration.
import pytest
import threading
from time import sleep, time
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
class TestQueryExpiration(CustomClusterTestSuite):
"""Tests query expiration logic"""
def _check_num_executing(self, impalad, expected):
in_flight_queries = impalad.service.get_in_flight_queries()
actual = 0
for query in in_flight_queries:
if query["executing"]:
actual += 1
else:
assert query["waiting"]
assert actual == expected
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--idle_query_timeout=6 --logbuflevel=-1")
def test_query_expiration(self, vector):
"""Confirm that single queries expire if not fetched"""
impalad = self.cluster.get_first_impalad()
client = impalad.service.create_beeswax_client()
num_expired = impalad.service.get_metric_value('impala-server.num-queries-expired')
handle = client.execute_async("SELECT SLEEP(1000000)")
client.execute("SET QUERY_TIMEOUT_S=1")
handle2 = client.execute_async("SELECT SLEEP(2000000)")
# Set a huge timeout, to check that the server bounds it by --idle_query_timeout
client.execute("SET QUERY_TIMEOUT_S=1000")
handle3 = client.execute_async("SELECT SLEEP(3000000)")
self._check_num_executing(impalad, 3)
before = time()
sleep(4)
# Query with timeout of 1 should have expired, other query should still be running.
assert num_expired + 1 == impalad.service.get_metric_value(
'impala-server.num-queries-expired')
self._check_num_executing(impalad, 2)
self.assert_impalad_log_contains('INFO', "Expiring query due to client inactivity: "
"[0-9a-f]+:[0-9a-f]+, last activity was at: \d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d")
impalad.service.wait_for_metric_value('impala-server.num-queries-expired',
num_expired + 3)
# Check that we didn't wait too long to be expired (double the timeout is sufficiently
# large to avoid most noise in measurement)
assert time() - before < 12
assert client.get_state(handle) == client.QUERY_STATES['EXCEPTION']
client.execute("SET QUERY_TIMEOUT_S=0")
# Synchronous execution; calls fetch() and query should not time out.
# Note: could be flakey if execute() takes too long to call fetch() etc after the
# query completes.
handle = client.execute("SELECT SLEEP(2500)")
# Confirm that no extra expirations happened
assert impalad.service.get_metric_value('impala-server.num-queries-expired') \
== num_expired + 3
self._check_num_executing(impalad, 0)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--idle_query_timeout=0")
def test_query_expiration_no_default(self, vector):
"""Confirm that single queries expire if no default is set, but a per-query
expiration is set"""
impalad = self.cluster.get_any_impalad()
client = impalad.service.create_beeswax_client()
num_expired = impalad.service.get_metric_value('impala-server.num-queries-expired')
client.execute("SET QUERY_TIMEOUT_S=1")
handle = client.execute_async("SELECT SLEEP(1000000)")
# Set a huge timeout, server should not expire the query while this test is running
client.execute("SET QUERY_TIMEOUT_S=1000")
handle3 = client.execute_async("SELECT SLEEP(2000000)")
before = time()
sleep(4)
# Query with timeout of 1 should have expired, other query should still be running.
assert num_expired + 1 == impalad.service.get_metric_value(
'impala-server.num-queries-expired')
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--idle_query_timeout=1")
def test_concurrent_query_expiration(self, vector):
"""Confirm that multiple concurrent queries are correctly expired if not fetched"""
class ExpiringQueryThread(threading.Thread):
def __init__(self, client):
super(ExpiringQueryThread, self).__init__()
self.client = client
self.success = False
def run(self):
self.handle = self.client.execute_async("SELECT SLEEP(3000000)")
class NonExpiringQueryThread(threading.Thread):
def __init__(self, client):
super(NonExpiringQueryThread, self).__init__()
self.client = client
self.success = False
def run(self):
result = self.client.execute("SELECT SLEEP(2500)")
self.success = result.success
impalad = self.cluster.get_any_impalad()
client = impalad.service.create_beeswax_client()
num_expired = impalad.service.get_metric_value('impala-server.num-queries-expired')
non_expiring_threads = \
[NonExpiringQueryThread(impalad.service.create_beeswax_client())
for _ in xrange(5)]
expiring_threads = [ExpiringQueryThread(impalad.service.create_beeswax_client())
for _ in xrange(5)]
all_threads = zip(non_expiring_threads, expiring_threads)
for n, e in all_threads:
n.start()
e.start()
for n, e in all_threads:
n.join()
e.join()
impalad.service.wait_for_metric_value('impala-server.num-queries-expired',
num_expired + 5)
for n, e in all_threads:
assert n.success
assert client.get_state(e.handle) == client.QUERY_STATES['EXCEPTION']