blob: 6f9b7c4b99e8fec2ed14d9ebb2802e13332dc946 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Tests admission control
import itertools
import logging
import math
import os
import pytest
import re
import shutil
import sys
import threading
from copy import copy
from time import sleep, time
from beeswaxd.BeeswaxService import QueryState
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.environ import build_flavor_timeout, ImpalaTestClusterProperties
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.resource_pool_config import ResourcePoolConfig
from tests.common.skip import (
SkipIfS3,
SkipIfABFS,
SkipIfADLS,
SkipIfEC,
SkipIfNotHdfsMinicluster,
SkipIfOS)
from tests.common.test_dimensions import (
create_single_exec_option_dimension,
create_uncompressed_text_dimension)
from tests.common.test_vector import ImpalaTestDimension
from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session
from tests.util.web_pages_util import (
get_num_completed_backends,
get_mem_admitted_backends_debug_page)
from tests.verifiers.mem_usage_verifier import MemUsageVerifier
from tests.verifiers.metric_verifier import MetricVerifier
from ImpalaService import ImpalaHiveServer2Service
from TCLIService import TCLIService
# Module-level logger shared by the admission control tests in this file.
LOG = logging.getLogger('admission_test')
# The query used for testing. It is important that this query returns many rows
# while keeping fragments active on all backends. This allows a thread to keep
# the query active and consuming resources by fetching one row at a time. The
# where clause is for debugging purposes; each thread will insert its id so
# that running queries can be correlated with the thread that submitted them.
# The template is 30 copies of the same select joined by "union all"; callers fill in
# the "{0}" placeholder with str.format().
QUERY = " union all ".join(["select * from functional.alltypesagg where id != {0}"] * 30)
# The statestore heartbeat and topic update frequency (ms). Set low for testing.
STATESTORE_RPC_FREQUENCY_MS = 100
# Time to sleep (in milliseconds) between issuing queries. When the delay is at least
# the statestore heartbeat frequency, all state should be visible by every impalad by
# the time the next query is submitted. Otherwise the different impalads will see stale
# state for some admission decisions.
# The candidate delays are: no delay, half the statestore frequency, and one and a half
# times the statestore frequency.
SUBMISSION_DELAY_MS = \
[0, STATESTORE_RPC_FREQUENCY_MS / 2, STATESTORE_RPC_FREQUENCY_MS * 3 / 2]
# The number of queries to submit. The test does not support fewer queries than
# MAX_NUM_CONCURRENT_QUERIES + MAX_NUM_QUEUED_QUERIES to keep some validation logic
# simple.
NUM_QUERIES = [15, 30, 50]
# Whether we will submit queries to all available impalads (in a round-robin fashion)
ROUND_ROBIN_SUBMISSION = [True, False]
# The query pool to use. The impalads should be configured to recognize this
# pool with the parameters below.
POOL_NAME = "default-pool"
# Stress test timeout (seconds). The timeout needs to be significantly higher for
# slow builds like code coverage and ASAN (IMPALA-3790, IMPALA-6241).
STRESS_TIMEOUT = build_flavor_timeout(90, slow_build_timeout=600)
# The number of queries that can execute concurrently in the pool POOL_NAME.
MAX_NUM_CONCURRENT_QUERIES = 5
# The number of queries that can be queued in the pool POOL_NAME
MAX_NUM_QUEUED_QUERIES = 10
# Mem limit (bytes) used in the mem limit test. Evaluates to 12 GiB.
MEM_TEST_LIMIT = 12 * 1024 * 1024 * 1024
# Statestored startup flags that set both the heartbeat frequency and the priority
# update frequency to STATESTORE_RPC_FREQUENCY_MS.
_STATESTORED_ARGS = ("-statestore_heartbeat_frequency_ms={freq_ms} "
"-statestore_priority_update_frequency_ms={freq_ms}").format(
freq_ms=STATESTORE_RPC_FREQUENCY_MS)
# Name of the subscriber metric tracking the admission control update interval.
REQUEST_QUEUE_UPDATE_INTERVAL =\
'statestore-subscriber.topic-impala-request-queue.update-interval'
# Key in the query profile for the query options. The trailing ": " is part of the key.
PROFILE_QUERY_OPTIONS_KEY = "Query Options (set by configuration): "
# The different ways that a query thread can end its query.
QUERY_END_BEHAVIORS = ['EOS', 'CLIENT_CANCEL', 'QUERY_TIMEOUT', 'CLIENT_CLOSE']
# The timeout (in seconds) used for the QUERY_TIMEOUT end behaviour
QUERY_END_TIMEOUT_S = 1
# Value used for --admission_control_stale_topic_threshold_ms in tests.
STALE_TOPIC_THRESHOLD_MS = 500
# Regex that matches the first part of the profile info string added when a query is
# queued.
INITIAL_QUEUE_REASON_REGEX = \
"Initial admission queue reason: waited [0-9]* ms, reason: .*"
# The path to resources directory which contains the admission control config files.
# Resolved at import time from the IMPALA_HOME environment variable; importing this
# module raises KeyError if IMPALA_HOME is not set.
RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test", "resources")
def impalad_admission_ctrl_flags(max_requests, max_queued, pool_max_mem,
                                 proc_mem_limit=None, queue_wait_timeout_ms=None,
                                 admission_control_slots=None, executor_groups=None):
  """Builds the impalad flag string that configures the default admission control pool.

  'max_requests', 'max_queued' and 'pool_max_mem' are always emitted. Each of the
  remaining arguments is appended as its own flag only when it is not None, in the
  order: mem_limit, queue_wait_timeout_ms, admission_control_slots, executor_groups.
  Returns the flags as a single space-separated string."""
  # (flag template, value) pairs; the order here fixes the order in the output.
  optional_flags = (
      (" -mem_limit={0}", proc_mem_limit),
      (" -queue_wait_timeout_ms={0}", queue_wait_timeout_ms),
      (" -admission_control_slots={0}", admission_control_slots),
      (" -executor_groups={0}", executor_groups))
  extra_flags = "".join(
      template.format(value) for template, value in optional_flags if value is not None)
  required_flags = ("-vmodule admission-controller=3 -default_pool_max_requests {0} "
                    "-default_pool_max_queued {1} -default_pool_mem_limit {2} {3}")
  return required_flags.format(max_requests, max_queued, pool_max_mem, extra_flags)
def impalad_admission_ctrl_config_args(fs_allocation_file, llama_site_file,
                                       additional_args="", make_copy=False):
  """Builds the impalad flag string that points admission control at config files.

  'fs_allocation_file' and 'llama_site_file' are file names resolved against
  RESOURCES_DIR. If 'make_copy' is True, both files are first copied (with shutil.copy2)
  to "copy-<name>" in the same directory and the flags reference the copies instead.
  'additional_args' is appended verbatim to the returned string."""
  file_names = (fs_allocation_file, llama_site_file)
  config_paths = [os.path.join(RESOURCES_DIR, name) for name in file_names]
  if make_copy:
    copy_paths = [os.path.join(RESOURCES_DIR, "copy-" + name) for name in file_names]
    for original_path, copy_path in zip(config_paths, copy_paths):
      shutil.copy2(original_path, copy_path)
    config_paths = copy_paths
  fs_allocation_path, llama_site_path = config_paths
  return ("-vmodule admission-controller=3 -fair_scheduler_allocation_path %s "
          "-llama_site_path %s %s" % (fs_allocation_path, llama_site_path,
                                      additional_args))
def log_metrics(log_prefix, metrics):
  """Logs the admission counters in 'metrics' at INFO level, prefixed by 'log_prefix'.

  'metrics' must contain the keys 'admitted', 'queued', 'dequeued', 'rejected',
  'released' and 'timed-out'."""
  counter_names = ('admitted', 'queued', 'dequeued', 'rejected', 'released', 'timed-out')
  counter_values = [metrics[name] for name in counter_names]
  LOG.info("%sadmitted=%s, queued=%s, dequeued=%s, rejected=%s, "
           "released=%s, timed-out=%s", log_prefix, *counter_values)
def compute_metric_deltas(m2, m1):
  """Returns a dictionary holding, for every key of m2, the value in m2 minus the value
  in m1. A key missing from m1 is treated as 0; keys only present in m1 are ignored."""
  deltas = {}
  for name in m2.keys():
    deltas[name] = m2.get(name, 0) - m1.get(name, 0)
  return deltas
def metric_key(pool_name, metric_name):
  """Builds an admission controller metric key. Note that the metric name comes before
  the pool name in the resulting key."""
  key_parts = (metric_name, pool_name)
  return "admission-controller.%s.%s" % key_parts
class TestAdmissionControllerBase(CustomClusterTestSuite):
  """Common base for the admission control test classes: fixes the workload and narrows
  the test matrix to a single exec option set on uncompressed text."""

  @classmethod
  def get_workload(cls):
    """Returns the name of the workload these tests run against."""
    return 'functional-query'

  @classmethod
  def add_test_dimensions(cls):
    """Adds the single-exec-option and uncompressed-text dimensions on top of the
    dimensions added by the parent class."""
    super(TestAdmissionControllerBase, cls).add_test_dimensions()
    exec_option_dimension = create_single_exec_option_dimension()
    cls.ImpalaTestMatrix.add_dimension(exec_option_dimension)
    # There's no reason to test this on other file formats/compression codecs right now
    table_format_dimension = create_uncompressed_text_dimension(cls.get_workload())
    cls.ImpalaTestMatrix.add_dimension(table_format_dimension)
class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
def __check_pool_rejected(self, client, pool, expected_error_re):
  """Sets 'request_pool' to 'pool' on 'client', runs "select 1" and asserts that it
  fails with an ImpalaBeeswaxException whose text matches the regex
  'expected_error_re'. Fails the test if no exception is raised."""
  try:
    client.set_configuration({'request_pool': pool})
    client.execute("select 1")
  except ImpalaBeeswaxException as e:
    assert re.search(expected_error_re, str(e))
  else:
    assert False, "Query should return error"
def __check_query_options(self, profile, expected_query_options):
  """Validate that the expected per-pool query options were set on the specified
  profile. expected_query_options is a list of "KEY=VALUE" strings, e.g.
  ["MEM_LIMIT=1", ...]

  Only the first profile line containing PROFILE_QUERY_OPTIONS_KEY is inspected. The
  comparison is case-insensitive and passes when every expected option is present;
  extra options in the profile are ignored. Raises AssertionError listing the missing
  options otherwise."""
  confs = []
  for line in profile.split("\n"):
    if PROFILE_QUERY_OPTIONS_KEY in line:
      # Split once on the key itself rather than on every ": " so that an option value
      # that happens to contain ": " does not truncate the option list.
      rhs = line.split(PROFILE_QUERY_OPTIONS_KEY, 1)[1]
      confs = rhs.split(",")
      break
  expected_set = set(x.lower() for x in expected_query_options)
  confs_set = set(x.lower() for x in confs)
  missing = expected_set - confs_set
  assert not missing, "Query options %s not found in profile query options %s" % (
      sorted(missing), sorted(confs_set))
def __check_hs2_query_opts(self, pool_name, mem_limit=None, expected_options=None):
  """Runs "select 1" through HS2 in pool 'pool_name' and verifies the query options
  recorded in its runtime profile.

  If 'mem_limit' is not None it is sent as the 'mem_limit' entry of the statement's
  confOverlay. 'expected_options' is the list of "KEY=VALUE" strings that must appear
  in the profile. One row is fetched and the operation is closed before the profile is
  requested."""
  conf_overlay = {'request_pool': pool_name}
  if mem_limit is not None:
    conf_overlay['mem_limit'] = mem_limit

  # Submit the statement.
  exec_req = TCLIService.TExecuteStatementReq()
  exec_req.sessionHandle = self.session_handle
  exec_req.confOverlay = conf_overlay
  exec_req.statement = "select 1"
  exec_resp = self.hs2_client.ExecuteStatement(exec_req)
  HS2TestSuite.check_response(exec_resp)
  operation_handle = exec_resp.operationHandle

  # Fetch a single row.
  fetch_req = TCLIService.TFetchResultsReq()
  fetch_req.operationHandle = operation_handle
  fetch_req.maxRows = 1
  fetch_resp = self.hs2_client.FetchResults(fetch_req)
  HS2TestSuite.check_response(fetch_resp)

  # Close the operation.
  close_req = TCLIService.TCloseOperationReq()
  close_req.operationHandle = operation_handle
  close_resp = self.hs2_client.CloseOperation(close_req)
  HS2TestSuite.check_response(close_resp)

  # Retrieve the profile and check the options it reports.
  profile_req = ImpalaHiveServer2Service.TGetRuntimeProfileReq()
  profile_req.operationHandle = operation_handle
  profile_req.sessionHandle = self.session_handle
  profile_resp = self.hs2_client.GetRuntimeProfile(profile_req)
  HS2TestSuite.check_response(profile_resp)
  self.__check_query_options(profile_resp.profile, expected_options)
def _execute_and_collect_profiles(self, queries, timeout_s, config_options=None,
    allow_query_failure=False):
  """Submit the query statements in 'queries' in parallel to the first impalad in
  the cluster. After submission, the results are fetched from the queries in
  sequence and their profiles are collected. Wait for up to timeout_s for
  each query to finish. If 'allow_query_failure' is True, succeeds if the query
  completes successfully or ends up in the EXCEPTION state. Otherwise expects the
  queries to complete successfully.
  'config_options' is a dict of query options applied to the submitting client before
  any query is issued; None (the default) means no extra options.
  Returns the profile strings, one per query, in the order of 'queries'."""
  # A None default avoids sharing one mutable dict object across calls.
  if config_options is None:
    config_options = {}
  client = self.cluster.impalads[0].service.create_beeswax_client()
  expected_states = [client.QUERY_STATES['FINISHED']]
  if allow_query_failure:
    expected_states.append(client.QUERY_STATES['EXCEPTION'])
  try:
    client.set_configuration(config_options)
    # Submit everything first so that the queries are in flight concurrently.
    handles = [client.execute_async(query) for query in queries]
    profiles = []
    for query, handle in zip(queries, handles):
      state = self.wait_for_any_state(handle, expected_states, timeout_s)
      # NOTE(review): results and profiles are read through self.client rather than the
      # local 'client' that submitted the queries - confirm this is intentional.
      if state == client.QUERY_STATES['FINISHED']:
        self.client.fetch(query, handle)
      profiles.append(self.client.get_runtime_profile(handle))
    return profiles
  finally:
    client.close()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
    impalad_args=impalad_admission_ctrl_config_args(
      fs_allocation_file="fair-scheduler-test2.xml",
      llama_site_file="llama-site-test2.xml"),
    default_query_options=[('mem_limit', 200000000)],
    statestored_args=_STATESTORED_ARGS)
@needs_session(conf_overlay={'batch_size': '100'})
def test_set_request_pool(self):
  """Tests setting the REQUEST_POOL with the pool placement policy configured
  to require a specific pool, and validate that the per-pool configurations were
  applied. The first part runs through a dedicated beeswax client, the second part
  through the HS2 session opened by the needs_session decorator."""
  impalad = self.cluster.impalads[0]
  client = impalad.service.create_beeswax_client()
  # Expected default mem limit for queueA, used in several tests below
  queueA_mem_limit = "MEM_LIMIT=%s" % (128 * 1024 * 1024)
  try:
    for pool in ['', 'not_a_pool_name']:
      # Raw string: '\S' is a regex character class, not a Python string escape.
      expected_error =\
          r"No mapping found for request from user '\S+' with requested pool '%s'"\
          % (pool)
      self.__check_pool_rejected(client, pool, expected_error)

    # Check rejected if user does not have access.
    expected_error = r"Request from user '\S+' with requested pool 'root.queueC' "\
        "denied access to assigned pool 'root.queueC'"
    self.__check_pool_rejected(client, 'root.queueC', expected_error)

    # Also try setting a valid pool
    client.set_configuration({'request_pool': 'root.queueB'})
    result = client.execute("select 1")
    # Query should execute in queueB which doesn't have a default mem limit set in the
    # llama-site.xml, so it should inherit the value from the default process query
    # options.
    self.__check_query_options(result.runtime_profile,
        ['MEM_LIMIT=200000000', 'REQUEST_POOL=root.queueB'])

    # Try setting the pool for a queue with a very low queue timeout.
    # queueA allows only 1 running query and has a queue timeout of 50ms, so the
    # second concurrent query should time out quickly.
    client.set_configuration({'request_pool': 'root.queueA'})
    handle = client.execute_async("select sleep(1000)")
    self.__check_pool_rejected(client, 'root.queueA', "exceeded timeout")
    assert client.get_state(handle) == client.QUERY_STATES['FINISHED']
    # queueA has default query options mem_limit=128m,query_timeout_s=5
    self.__check_query_options(client.get_runtime_profile(handle),
        [queueA_mem_limit, 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA'])
    client.close_query(handle)

    # Should be able to set query options via the set command (overriding defaults if
    # applicable). mem_limit overrides the pool default. abort_on_error has no
    # proc/pool default.
    client.execute("set mem_limit=31337")
    client.execute("set abort_on_error=1")
    result = client.execute("select 1")
    self.__check_query_options(result.runtime_profile,
        ['MEM_LIMIT=31337', 'ABORT_ON_ERROR=1', 'QUERY_TIMEOUT_S=5',
         'REQUEST_POOL=root.queueA'])

    # Should be able to set query options (overriding defaults if applicable) with the
    # config overlay sent with the query RPC. mem_limit is a pool-level override and
    # abort_on_error, which has no proc/pool default, is still expected from the "set"
    # command issued above.
    client.set_configuration({'request_pool': 'root.queueA', 'mem_limit': '12345'})
    result = client.execute("select 1")
    self.__check_query_options(result.runtime_profile,
        ['MEM_LIMIT=12345', 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA',
         'ABORT_ON_ERROR=1'])

    # Once options are reset to their defaults, the queue
    # configuration should kick back in. We'll see the
    # queue-configured mem_limit, and we won't see
    # abort on error, because it's back to being the default.
    client.execute('set mem_limit=""')
    client.execute('set abort_on_error=""')
    client.set_configuration({'request_pool': 'root.queueA'})
    result = client.execute("select 1")
    self.__check_query_options(result.runtime_profile,
        [queueA_mem_limit, 'REQUEST_POOL=root.queueA', 'QUERY_TIMEOUT_S=5'])
  finally:
    client.close()

  # HS2 tests:
  # batch_size is set in the HS2 OpenSession() call via the needs_session() test
  # decorator, so that is included in all test cases below.
  batch_size = "BATCH_SIZE=100"

  # Check HS2 query in queueA gets the correct query options for the pool.
  self.__check_hs2_query_opts("root.queueA", None,
      [queueA_mem_limit, 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA', batch_size])
  # Check overriding the mem limit sent in the confOverlay with the query.
  self.__check_hs2_query_opts("root.queueA", '12345',
      ['MEM_LIMIT=12345', 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA', batch_size])
  # Check HS2 query in queueB gets the process-wide default query options
  self.__check_hs2_query_opts("root.queueB", None,
      ['MEM_LIMIT=200000000', 'REQUEST_POOL=root.queueB', batch_size])
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
    impalad_args=impalad_admission_ctrl_config_args(
      fs_allocation_file="fair-scheduler-test2.xml",
      llama_site_file="llama-site-test2.xml",
      additional_args="-require_username"),
    statestored_args=_STATESTORED_ARGS)
def test_require_user(self):
  """Opens an HS2 session with an empty username against impalads started with
  -require_username and checks that a statement ends in ERROR_STATE with a
  "User must be specified" error message. The session is always closed afterwards."""
  session_req = TCLIService.TOpenSessionReq()
  session_req.username = ""
  session_resp = self.hs2_client.OpenSession(session_req)
  TestAdmissionController.check_response(session_resp)
  session_handle = session_resp.sessionHandle

  try:
    statement_req = TCLIService.TExecuteStatementReq()
    statement_req.sessionHandle = session_handle
    statement_req.statement = "select count(1) from functional.alltypes"
    statement_resp = self.hs2_client.ExecuteStatement(statement_req)
    operation_handle = statement_resp.operationHandle
    self.wait_for_operation_state(operation_handle,
        TCLIService.TOperationState.ERROR_STATE)
    status_resp = self.get_operation_status(operation_handle)
    assert "User must be specified" in status_resp.errorMessage
  finally:
    close_session_req = TCLIService.TCloseSessionReq()
    close_session_req.sessionHandle = session_handle
    close_session_resp = self.hs2_client.CloseSession(close_session_req)
    TestAdmissionController.check_response(close_session_resp)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
    impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
        pool_max_mem=10 * 1024 * 1024, proc_mem_limit=1024 * 1024 * 1024),
    statestored_args=_STATESTORED_ARGS)
def test_trivial_coord_query_limits(self):
  """Tests that trivial coordinator only queries have negligible resource requirements.

  With a 10 MB pool, constant-only and "limit 0" queries must be admitted while
  queries that need to scan data must be rejected. Runs only in the exhaustive
  exploration strategy."""
  if self.exploration_strategy() != 'exhaustive':
    pytest.skip('runs only in exhaustive')
  # Queries with only constant exprs or limit 0 should be admitted.
  self.execute_query_expect_success(self.client, "select 1")
  self.execute_query_expect_success(self.client,
      "select * from functional.alltypes limit 0")

  non_trivial_queries = [
      "select * from functional.alltypesagg limit 1",
      "select * from functional.alltypestiny"]
  # Raw strings: '\(' and '\)' are regex escapes, not Python string escapes.
  expected_rejection_re = (
      r"Rejected query from pool default-pool: request memory needed "
      r".* is greater than pool max mem resources 10.00 MB \(configured "
      r"statically\)")
  for query in non_trivial_queries:
    ex = self.execute_query_expect_failure(self.client, query)
    assert re.search(expected_rejection_re, str(ex)), str(ex)
@SkipIfS3.hdfs_block_size
@SkipIfABFS.hdfs_block_size
@SkipIfADLS.hdfs_block_size
@SkipIfEC.fix_later
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
    impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
        pool_max_mem=40 * 1024 * 1024, proc_mem_limit=1024 * 1024 * 1024),
    statestored_args=_STATESTORED_ARGS)
def test_memory_rejection(self, vector):
  """Checks that queries are rejected based on their minimum reservation and on their
  memory estimates. The test depends on scanner memory estimates, which differ on
  remote filesystems with different (synthetic) block sizes."""
  # The first test file covers rejection by admission control when:
  # a) the largest per-backend min buffer reservation is larger than the query mem limit
  # b) the largest per-backend min buffer reservation is larger than the
  #    buffer_pool_limit query option
  # c) the cluster-wide min-buffer reservation size is larger than the pool memory
  #    resources.
  self.run_test_case('QueryTest/admission-reject-min-reservation', vector)

  # The second test file covers rejection based on memory estimates. num_nodes=1 avoids
  # unpredictability from scheduling on different backends.
  vector.get_value('exec_option')['num_nodes'] = 1
  self.run_test_case('QueryTest/admission-reject-mem-estimate', vector)
# Process mem_limit in bytes (1 GiB). Used by test_mem_limit_upper_bound and also by the
# cluster flags and query options of the other mem-limit tests in this class.
PROC_MEM_TEST_LIMIT = 1024 * 1024 * 1024
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
    impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
        pool_max_mem=10 * PROC_MEM_TEST_LIMIT, proc_mem_limit=PROC_MEM_TEST_LIMIT))
def test_mem_limit_upper_bound(self, vector):
  """Checks that a query whose mem_limit equals the process mem limit is admitted."""
  options = vector.get_value('exec_option')
  # Request exactly as much memory as the process mem limit.
  options['mem_limit'] = self.PROC_MEM_TEST_LIMIT
  self.execute_query_expect_success(
      self.client, "select * from functional.alltypesagg limit 1", options)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
    impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
        pool_max_mem=10 * PROC_MEM_TEST_LIMIT, proc_mem_limit=PROC_MEM_TEST_LIMIT),
    num_exclusive_coordinators=1)
def test_mem_limit_dedicated_coordinator(self, vector):
  """Regression test for IMPALA-8469: coordinator fragment should be admitted on
  dedicated coordinator. For both a distributed (num_nodes=0) and a single-node
  (num_nodes=1) plan, a mem_limit equal to the process mem limit must be admitted and
  a mem_limit 10% above it must be rejected."""
  query = "select * from functional.alltypesagg limit 1"
  exec_options = vector.get_value('exec_option')
  # Test both single-node and distributed plans
  for num_nodes in [0, 1]:
    # Memory just fits in memory limits
    exec_options['mem_limit'] = self.PROC_MEM_TEST_LIMIT
    exec_options['num_nodes'] = num_nodes
    self.execute_query_expect_success(self.client, query, exec_options)
    # A bit too much memory to run on coordinator.
    # int() truncates exactly like long() did and also exists on Python 3.
    exec_options['mem_limit'] = int(self.PROC_MEM_TEST_LIMIT * 1.1)
    ex = self.execute_query_expect_failure(self.client, query, exec_options)
    assert ("Rejected query from pool default-pool: request memory needed "
            "1.10 GB is greater than memory available for admission 1.00 GB" in
            str(ex)), str(ex)
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
  impalad_args=impalad_admission_ctrl_config_args(
    fs_allocation_file="mem-limit-test-fair-scheduler.xml",
    llama_site_file="mem-limit-test-llama-site.xml"), num_exclusive_coordinators=1,
  cluster_size=2)
def test_dedicated_coordinator_mem_accounting(self, vector):
  """Verify that when using dedicated coordinators, the memory admitted for and the
  mem limit applied to the query fragments running on the coordinator is different than
  the ones on executors. Runs on a 2 node cluster with one exclusive coordinator and
  delegates all checks to __verify_mem_accounting."""
  self.__verify_mem_accounting(vector, using_dedicated_coord_estimates=True)
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
  impalad_args=impalad_admission_ctrl_config_args(
    fs_allocation_file="mem-limit-test-fair-scheduler.xml",
    llama_site_file="mem-limit-test-llama-site.xml")
  + " -use_dedicated_coordinator_estimates false",
  num_exclusive_coordinators=1,
  cluster_size=2)
def test_dedicated_coordinator_legacy_mem_accounting(self, vector):
  """Verify that when using dedicated coordinators with specialized dedicated coord
  estimates turned off using a hidden startup param, the memory admitted for and the
  mem limit applied to the query fragments running on the coordinator is the same
  (as expected from legacy behavior). Runs on a 2 node cluster with one exclusive
  coordinator and delegates all checks to __verify_mem_accounting."""
  # The helper skips the "coordinator differs from executor" check when this is False.
  self.__verify_mem_accounting(vector, using_dedicated_coord_estimates=False)
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
  impalad_args=impalad_admission_ctrl_config_args(
    fs_allocation_file="mem-limit-test-fair-scheduler.xml",
    llama_site_file="mem-limit-test-llama-site.xml"), num_exclusive_coordinators=1,
  cluster_size=2)
def test_sanity_checks_dedicated_coordinator(self, vector, unique_database):
  """Sanity tests for verifying targeted dedicated coordinator memory estimations and
  behavior. Covers three cases in sequence: MAX_MEM_ESTIMATE_FOR_ADMISSION capping the
  coordinator and executor estimates, a NUM_NODES=1 query admitting no executor memory,
  and a query whose runtime filters must be aggregated at the coordinator."""
  self.client.set_configuration_option('request_pool', "root.regularPool")
  ImpalaTestSuite.change_database(self.client, vector.get_value('table_format'))
  exec_options = vector.get_value('exec_option')
  # Make sure query option MAX_MEM_ESTIMATE_FOR_ADMISSION is enforced on the dedicated
  # coord estimates. Without this query option the estimate would be > 100MB.
  expected_mem = 60 * (1 << 20)  # 60MB
  exec_options['MAX_MEM_ESTIMATE_FOR_ADMISSION'] = expected_mem
  self.client.set_configuration(exec_options)
  handle = self.client.execute_async(QUERY.format(1))
  self.client.wait_for_finished_timeout(handle, 1000)
  # The debug page is read while the query is still open; the helper asserts that
  # exactly one query is listed as running.
  mem_to_admit = self.__get_mem_limits_admission_debug_page()
  assert abs(mem_to_admit['coordinator'] - expected_mem) < 0.0001,\
    "mem_to_admit:" + str(mem_to_admit)
  assert abs(mem_to_admit['executor'] - expected_mem) < 0.0001, \
    "mem_to_admit:" + str(mem_to_admit)
  self.client.close_query(handle)

  # If the query is only scheduled on the coordinator then the mem to admit on executor
  # should be zero.
  exec_options['NUM_NODES'] = 1
  self.client.set_configuration(exec_options)
  handle = self.client.execute_async(QUERY.format(1))
  self.client.wait_for_finished_timeout(handle, 1000)
  mem_to_admit = self.__get_mem_limits_admission_debug_page()
  assert abs(mem_to_admit['coordinator'] - expected_mem) < 0.0001, \
    "mem_to_admit:" + str(mem_to_admit)
  assert abs(mem_to_admit['executor'] - 0) < 0.0001, \
    "mem_to_admit:" + str(mem_to_admit)
  self.client.close_query(handle)

  # Make sure query execution works perfectly for a query that does not have any
  # fragments scheduled on the coordinator, but has runtime-filters that need to be
  # aggregated at the coordinator.
  # Note: this is the same exec_option object that was modified above, so the options
  # set earlier in this test are still present in it.
  exec_options = vector.get_value('exec_option')
  exec_options['RUNTIME_FILTER_WAIT_TIME_MS'] = 30000
  query = """CREATE TABLE {0}.temp_tbl AS SELECT STRAIGHT_JOIN o_orderkey
FROM tpch_parquet.lineitem INNER JOIN [SHUFFLE] tpch_parquet.orders
ON o_orderkey = l_orderkey GROUP BY 1""".format(unique_database)
  result = self.execute_query_expect_success(self.client, query, exec_options)
  assert "Runtime filters: All filters arrived" in result.runtime_profile
def __verify_mem_accounting(self, vector, using_dedicated_coord_estimates):
  """Helper method used by test_dedicated_coordinator_*_mem_accounting that verifies
  the actual vs expected values for mem admitted and mem limit for both coord and
  executor. Also verifies that those memory values are different if
  'using_dedicated_coord_estimates' is true.

  The query is submitted to pool root.regularPool and is closed before returning, even
  if one of the checks fails. Raises AssertionError (with all collected values in the
  message) when a check fails."""
  self.client.set_configuration_option('request_pool', "root.regularPool")
  ImpalaTestSuite.change_database(self.client, vector.get_value('table_format'))
  handle = self.client.execute_async(QUERY.format(1))
  try:
    self.client.wait_for_finished_timeout(handle, 1000)
    expected_mem_limits = self.__get_mem_limits_admission_debug_page()
    actual_mem_limits = self.__get_mem_limits_memz_debug_page(handle.get_handle().id)
    mem_admitted = get_mem_admitted_backends_debug_page(self.cluster)
    debug_string = " expected_mem_limits:" + str(
      expected_mem_limits) + " actual_mem_limits:" + str(
      actual_mem_limits) + " mem_admitted:" + str(mem_admitted)
    MB = 1 << 20
    # Easiest way to check float in-equality.
    assert abs(expected_mem_limits['coordinator'] - expected_mem_limits[
      'executor']) > 0.0001 or not using_dedicated_coord_estimates, debug_string
    # There may be some rounding errors so keep a margin of 5MB when verifying
    assert abs(actual_mem_limits['coordinator'] - expected_mem_limits[
      'coordinator']) < 5 * MB, debug_string
    assert abs(actual_mem_limits['executor'] - expected_mem_limits[
      'executor']) < 5 * MB, debug_string
    assert abs(mem_admitted['coordinator'] - expected_mem_limits[
      'coordinator']) < 5 * MB, debug_string
    assert abs(
      mem_admitted['executor'][0] - expected_mem_limits['executor']) < 5 * MB, \
      debug_string
  finally:
    # Do not leave the query open (and its memory admitted) once verification is done.
    self.client.close_query(handle)
def __get_mem_limits_admission_debug_page(self):
  """Reads the "admission" debug page of the first impalad and returns the memory values
  it reports for the single running query, as a dictionary with the keys 'coordinator'
  (from "coord_mem_to_admit") and 'executor' (from "mem_limit"), both as floats.

  Assumes a 2 node cluster using a dedicated coordinator. Asserts that the page lists
  exactly one resource pool with exactly one running query."""
  # Based on how the cluster is setup, the first impalad in the cluster is the
  # coordinator.
  coordinator = self.cluster.impalads[0]
  admission_json = coordinator.service.get_debug_webpage_json("admission")
  assert 'resource_pools' in admission_json
  pools = admission_json['resource_pools']
  assert len(pools) == 1
  running_queries = pools[0]['running_queries']
  assert running_queries
  assert len(running_queries) == 1
  query_info = running_queries[0]
  coordinator_mem = float(query_info["coord_mem_to_admit"])
  executor_mem = float(query_info["mem_limit"])
  return {'coordinator': coordinator_mem, 'executor': executor_mem}
def __get_mem_limits_memz_debug_page(self, query_id):
  """Returns the mem limits enforced on the query identified by 'query_id', as reported
  by MemUsageVerifier for the "Query(<query_id>)" entry on every impalad.

  Assumes a 2 node cluster using a dedicated coordinator. The result is a dictionary
  with the keys 'coordinator' (first impalad) and 'executor' (second impalad) holding
  the respective 'limit' values as floats."""
  tracker_name = "Query({0})".format(query_id)
  usage_per_impalad = []
  for impalad in self.cluster.impalads:
    verifier = MemUsageVerifier(impalad.service)
    usage_per_impalad.append(verifier.get_mem_usage_values(tracker_name))
  # Based on how the cluster is setup, the first impalad in the cluster is the
  # coordinator.
  coordinator_usage = usage_per_impalad[0]
  executor_usage = usage_per_impalad[1]
  return {'coordinator': float(coordinator_usage['limit']),
          'executor': float(executor_usage['limit'])}
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(num_exclusive_coordinators=1)
def test_dedicated_coordinator_planner_estimates(self, vector, unique_database):
  """Runs the 'QueryTest/dedicated-coord-mem-estimates' test file, which covers the
  planner's coordinator estimates when using dedicated coordinators as well as the
  cluster memory admitted."""
  planner_vector = copy(vector)
  options = planner_vector.get_value('exec_option')
  # A single scanner thread makes the estimates consistently reproducible.
  options['num_scanner_threads'] = 1
  # num_nodes must not be preset, so that the test case runner can set it in one of
  # the test cases.
  del options['num_nodes']
  self.run_test_case(
      'QueryTest/dedicated-coord-mem-estimates', planner_vector, unique_database)
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(num_exclusive_coordinators=1, cluster_size=2)
def test_mem_limit_executors(self, vector, unique_database):
  """Verify that the query option mem_limit_executors is only enforced on the
  executors: the admission debug page must report exactly that value for the executor
  and a smaller value for the coordinator. The query is closed before returning, even
  if a check fails."""
  expected_exec_mem_limit = "999999999"
  ImpalaTestSuite.change_database(self.client, vector.get_value('table_format'))
  self.client.set_configuration({"MEM_LIMIT_EXECUTORS": expected_exec_mem_limit})
  handle = self.client.execute_async(QUERY.format(1))
  try:
    self.client.wait_for_finished_timeout(handle, 1000)
    expected_mem_limits = self.__get_mem_limits_admission_debug_page()
    assert expected_mem_limits['executor'] > expected_mem_limits[
      'coordinator'], expected_mem_limits
    assert expected_mem_limits['executor'] == float(
      expected_exec_mem_limit), expected_mem_limits
  finally:
    # Do not leave the query open (and its memory admitted) once verification is done.
    self.client.close_query(handle)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
    impalad_args=impalad_admission_ctrl_flags(max_requests=2, max_queued=1,
        pool_max_mem=10 * PROC_MEM_TEST_LIMIT,
        queue_wait_timeout_ms=2 * STATESTORE_RPC_FREQUENCY_MS),
    start_args="--per_impalad_args=-mem_limit=3G;-mem_limit=3G;-mem_limit=2G",
    statestored_args=_STATESTORED_ARGS)
def test_heterogeneous_proc_mem_limit(self, vector):
  """ Test to ensure that the admission controller takes into account the actual proc
  mem limits of each impalad. Starts a cluster where the last impalad has a smaller
  proc mem limit than other impalads and runs queries where admission/rejection decision
  depends on the coordinator knowing the other impalad's mem limits.
  The queue_wait_timeout_ms has been set to be more than the prioritized statestore
  update time, so that the queries don't time out before receiving updates to pool
  stats"""
  # Choose a query that runs on all 3 backends.
  query = "select * from functional.alltypesagg, (select 1) B limit 1"
  # Successfully run a query with mem limit equal to the lowest process memory among
  # impalads
  exec_options = copy(vector.get_value('exec_option'))
  exec_options['mem_limit'] = "2G"
  self.execute_query_expect_success(self.client, query, exec_options)
  # Test that a query scheduled to run on a single node and submitted to the impalad
  # with higher proc mem limit succeeds.
  exec_options = copy(vector.get_value('exec_option'))
  exec_options['mem_limit'] = "3G"
  exec_options['num_nodes'] = "1"
  self.execute_query_expect_success(self.client, query, exec_options)
  # Exercise rejection checks in admission controller.
  # NOTE(review): if the query unexpectedly succeeds, no assertion fires here; the
  # message is only validated when an ImpalaBeeswaxException is raised.
  try:
    exec_options = copy(vector.get_value('exec_option'))
    exec_options['mem_limit'] = "3G"
    self.execute_query(query, exec_options)
  except ImpalaBeeswaxException as e:
    assert re.search(r"Rejected query from pool \S+: request memory needed 3.00 GB"
        r" is greater than memory available for admission 2.00 GB of \S+", str(e)), \
        str(e)
  # Exercise queuing checks in admission controller.
  # Bound up front so that the cleanup in 'finally' cannot hit an unbound name when the
  # metric wait or the client creation below raises.
  impalad_with_2g_mem = None
  try:
    # Wait for previous queries to finish to avoid flakiness.
    for impalad in self.cluster.impalads:
      impalad.service.wait_for_metric_value("impala-server.num-fragments-in-flight", 0)
    impalad_with_2g_mem = self.cluster.impalads[2].service.create_beeswax_client()
    impalad_with_2g_mem.set_configuration_option('mem_limit', '1G')
    impalad_with_2g_mem.execute_async("select sleep(1000)")
    # Wait for statestore update to update the mem admitted in each node.
    sleep(STATESTORE_RPC_FREQUENCY_MS / 1000)
    exec_options = copy(vector.get_value('exec_option'))
    exec_options['mem_limit'] = "2G"
    # Since Queuing is synchronous and we can't close the previous query till this
    # returns, we wait for this to timeout instead.
    self.execute_query(query, exec_options)
  except ImpalaBeeswaxException as e:
    assert re.search(r"Queued reason: Not enough memory available on host \S+.Needed "
        r"2.00 GB but only 1.00 GB out of 2.00 GB was available.", str(e)), str(e)
  finally:
    if impalad_with_2g_mem is not None:
      impalad_with_2g_mem.close()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
    impalad_args="--logbuflevel=-1 " + impalad_admission_ctrl_flags(max_requests=1,
        max_queued=1, pool_max_mem=PROC_MEM_TEST_LIMIT),
    statestored_args=_STATESTORED_ARGS)
def test_cancellation(self):
  """ Test to confirm that all Async cancellation windows are hit and are able to
  successfully cancel the query.

  Each step below sets a 'debug_action' that sleeps at a given point, submits a query
  asynchronously, closes (or cancels) it one second later and then checks the impalad
  INFO log and/or the runtime profile for the message expected for that window. The
  final asserts check the default-pool admission metrics."""
  impalad = self.cluster.impalads[0]
  client = impalad.service.create_beeswax_client()
  try:
    # Window: closed while sleeping before admission, with mem_limit set to
    # PROC_MEM_TEST_LIMIT + 1. Expects the "Ready to be Rejected" log line.
    client.set_configuration_option("debug_action", "CRS_BEFORE_ADMISSION:SLEEP@2000")
    client.set_configuration_option("mem_limit", self.PROC_MEM_TEST_LIMIT + 1)
    handle = client.execute_async("select 1")
    sleep(1)
    client.close_query(handle)
    self.assert_impalad_log_contains('INFO',
        "Ready to be Rejected but already cancelled, query id=")
    # Window: same sleep point, but with the mem_limit cleared. Expects the
    # "Ready to be Admitted immediately" log line.
    client.clear_configuration()
    client.set_configuration_option("debug_action", "CRS_BEFORE_ADMISSION:SLEEP@2000")
    handle = client.execute_async("select 2")
    sleep(1)
    client.close_query(handle)
    self.assert_impalad_log_contains('INFO',
        "Ready to be Admitted immediately but already cancelled, query id=")
    # Window: closed while sleeping at CRS_BEFORE_COORD_STARTS.
    client.set_configuration_option("debug_action",
        "CRS_BEFORE_COORD_STARTS:SLEEP@2000")
    handle = client.execute_async("select 3")
    sleep(1)
    client.close_query(handle)
    self.assert_impalad_log_contains('INFO',
        "Cancelled right after starting the coordinator query id=")
    # Window: closed while sleeping at CRS_AFTER_COORD_STARTS. The same log message is
    # checked again, this time passing 2 as the extra argument.
    client.set_configuration_option("debug_action", "CRS_AFTER_COORD_STARTS:SLEEP@2000")
    handle = client.execute_async("select 4")
    sleep(1)
    client.close_query(handle)
    self.assert_impalad_log_contains('INFO',
        "Cancelled right after starting the coordinator query id=", 2)
    # Window: a second query is queued behind a long-running one (max_requests=1) and
    # is cancelled while AC_AFTER_ADMISSION_OUTCOME sleeps.
    client.clear_configuration()
    handle = client.execute_async("select sleep(10000)")
    client.set_configuration_option("debug_action",
        "AC_AFTER_ADMISSION_OUTCOME:SLEEP@2000")
    queued_query_handle = client.execute_async("select 5")
    sleep(1)
    assert client.get_state(queued_query_handle) == QueryState.COMPILED
    assert "Admission result: Queued" in client.get_runtime_profile(queued_query_handle)
    # Only cancel the queued query, because close will wait till it unregisters, this
    # gives us a chance to close the running query and allow the dequeue thread to
    # dequeue the queue query
    client.cancel(queued_query_handle)
    client.close_query(handle)
    client.close_query(queued_query_handle)
    queued_profile = client.get_runtime_profile(queued_query_handle)
    assert "Admission result: Cancelled (queued)" in queued_profile
    self.assert_impalad_log_contains('INFO', "Dequeued cancelled query=")
    # Window: a queued query is closed directly, with no debug action set.
    client.clear_configuration()
    handle = client.execute_async("select sleep(10000)")
    queued_query_handle = client.execute_async("select 6")
    sleep(1)
    assert client.get_state(queued_query_handle) == QueryState.COMPILED
    assert "Admission result: Queued" in client.get_runtime_profile(queued_query_handle)
    client.close_query(queued_query_handle)
    client.close_query(handle)
    queued_profile = client.get_runtime_profile(queued_query_handle)
    assert "Admission result: Cancelled (queued)" in queued_profile
    # Wait for every impalad to report no fragments in flight before reading the
    # admission controller metrics of the first impalad.
    for i in self.cluster.impalads:
      i.service.wait_for_metric_value("impala-server.num-fragments-in-flight", 0)
    assert self.cluster.impalads[0].service.get_metric_value(
        "admission-controller.agg-num-running.default-pool") == 0
    assert self.cluster.impalads[0].service.get_metric_value(
        "admission-controller.total-admitted.default-pool") == 4
    assert self.cluster.impalads[0].service.get_metric_value(
        "admission-controller.total-queued.default-pool") == 2
  finally:
    client.close()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
    impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=10,
        pool_max_mem=1024 * 1024 * 1024),
    statestored_args=_STATESTORED_ARGS)
def test_queue_reasons_num_queries(self):
  """Checks that the profile carries the queue details when queries are queued because
  of the limit on the number of running queries."""
  query_count = 5
  timeout_s = 60
  latest_reason = \
      "Latest admission queue reason: number of running queries 1 is at or over limit 1"
  # One query should be admitted straight away; the others should be dequeued
  # one-by-one.
  profiles = self._execute_and_collect_profiles(
      ["select sleep(1000)"] * query_count, timeout_s)

  def dump():
    # Only rendered when an assertion below fails.
    return '\n===\n'.join(profiles)

  with_latest_reason = [p for p in profiles if latest_reason in p]
  assert len(with_latest_reason) == query_count - 1, \
      "All queries except first should have been queued: " + dump()
  initial_reasons = self.__extract_init_queue_reasons(profiles)
  assert len(initial_reasons) == query_count - 1, \
      "All queries except first should have been queued: " + dump()
  hit_limit = [r for r in initial_reasons if 'number of running queries' in r]
  assert len(hit_limit) == 1, \
      "One query initially queued because of num_queries: " + dump()
  behind_queue = [r for r in initial_reasons if 'queue is not empty' in r]
  assert len(behind_queue) == query_count - 2, \
      "Others queued because of non-empty queue: " + dump()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
    impalad_args=impalad_admission_ctrl_flags(max_requests=10, max_queued=10,
        pool_max_mem=10 * 1024 * 1024),
    statestored_args=_STATESTORED_ARGS)
def test_queue_reasons_memory(self):
  """Checks that the profile carries the queue details when queries are queued because
  of the pool memory limit."""
  query_count = 5
  timeout_s = 60
  latest_reason = "Latest admission queue reason: Not enough aggregate memory " \
      "available in pool default-pool with max mem resources 10.00 MB (configured " \
      "statically). Needed 9.00 MB but only 1.00 MB was available."
  # mem_limit is chosen so that only one query fits at a time: one should be admitted
  # straight away and the others dequeued one-by-one.
  profiles = self._execute_and_collect_profiles(
      ["select sleep(100)"] * query_count, timeout_s, {'mem_limit': '9mb'})

  def dump():
    # Only rendered when an assertion below fails.
    return '\n===\n'.join(profiles)

  with_latest_reason = [p for p in profiles if latest_reason in p]
  assert len(with_latest_reason) == query_count - 1, \
      "All queries except first should have been queued: " + dump()
  initial_reasons = self.__extract_init_queue_reasons(profiles)
  assert len(initial_reasons) == query_count - 1, \
      "All queries except first should have been queued: " + dump()
  out_of_memory = [r for r in initial_reasons
                   if 'Not enough aggregate memory available' in r]
  assert len(out_of_memory) == 1, \
      "One query initially queued because of memory: " + dump()
  behind_queue = [r for r in initial_reasons if 'queue is not empty' in r]
  assert len(behind_queue) == query_count - 2, \
      "Others queued because of non-empty queue: " + dump()
def __extract_init_queue_reasons(self, profiles):
  """Returns, in order, the full text matched by INITIAL_QUEUE_REASON_REGEX in each
  entry of 'profiles'. Profiles without a match contribute nothing."""
  reasons = []
  for text in profiles:
    found = re.search(INITIAL_QUEUE_REASON_REGEX, text)
    if found is not None:
      reasons.append(found.group(0))
  return reasons
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
    impalad_args=impalad_admission_ctrl_flags(max_requests=100, max_queued=10,
        pool_max_mem=-1, admission_control_slots=4,
        executor_groups="default-pool-group1"),
    statestored_args=_STATESTORED_ARGS)
def test_queue_reasons_slots(self):
  """Checks that the profile carries the queue details when queries are queued because
  of the number of admission control slots."""
  query_count = 5
  timeout_s = 60
  stmt = "select min(ss_wholesale_cost) from tpcds_parquet.store_sales"
  latest_reason = "Latest admission queue reason: Not enough admission control " \
      "slots available on host"
  # One query should be admitted straight away; the others should be dequeued
  # one-by-one.
  profiles = self._execute_and_collect_profiles(
      [stmt] * query_count, timeout_s, config_options={"mt_dop": 4})

  def dump():
    # Only rendered when an assertion below fails.
    return '\n===\n'.join(profiles)

  with_latest_reason = [p for p in profiles if latest_reason in p]
  assert len(with_latest_reason) == query_count - 1, \
      "All queries except first should have been queued: " + dump()
  initial_reasons = self.__extract_init_queue_reasons(profiles)
  assert len(initial_reasons) == query_count - 1, \
      "All queries except first should have been queued: " + dump()
  out_of_slots = [r for r in initial_reasons
                  if "Not enough admission control slots available on host" in r]
  assert len(out_of_slots) == 1, \
      "One query initially queued because of slots: " + dump()
  behind_queue = [r for r in initial_reasons if 'queue is not empty' in r]
  assert len(behind_queue) == query_count - 2, \
      "Others queued because of non-empty queue: " + dump()
  # Confirm that the cluster quiesces and all metrics return to zero.
  for node in self.cluster.impalads:
    MetricVerifier(node.service).wait_for_backend_admission_control_state()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
    impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=10,
        pool_max_mem=1024 * 1024 * 1024),
    statestored_args=_STATESTORED_ARGS)
def test_query_locations_correctness(self, vector):
  """Regression test for IMPALA-7516: checks that the query locations and the
  in-flight queries stay correct for the different admission results that can affect
  them. Only runs with the exhaustive exploration strategy."""
  if self.exploration_strategy() != 'exhaustive':
    pytest.skip('runs only in exhaustive')
  # A query that runs on all 3 backends.
  stmt = "select * from functional.alltypesagg A, (select sleep(10000)) B limit 1"

  # Case 1: a query that runs successfully.
  only_handle = self.client.execute_async(stmt)
  self.__assert_num_queries_accounted(1)
  self.close_query(only_handle)
  self.__assert_num_queries_accounted(0)

  # Case 2: a query that is queued and then cancelled.
  running = self.client.execute_async(stmt)
  self.client.wait_for_admission_control(running)
  queued = self.client.execute_async(stmt)
  self.client.wait_for_admission_control(queued)
  self.impalad_test_service.wait_for_metric_value(
      "admission-controller.total-queued.default-pool", 1)
  # Queued queries don't show up on backends.
  self.__assert_num_queries_accounted(1, 1)
  # The queued query is closed first, then the running one.
  self.close_query(queued)
  self.close_query(running)
  self.__assert_num_queries_accounted(0)

  # Case 3: a query that gets rejected.
  rejecting_options = copy(vector.get_value('exec_option'))
  rejecting_options['mem_limit'] = "1b"
  self.execute_query_expect_failure(self.client, stmt, rejecting_options)
  self.__assert_num_queries_accounted(0)
def __assert_num_queries_accounted(self, num_running, num_queued=0):
  """Checks that the number of queries accounted for by the query locations and by the
  in-flight queries is as expected.

  :param num_running: number of queries every entry of the query locations must report.
  :param num_queued: number of queued queries; these count as in-flight but are not
      expected in the per-host query locations.
  """
  # Wait for queries to start/un-register.
  num_inflight = num_running + num_queued
  assert self.impalad_test_service.wait_for_num_in_flight_queries(num_inflight)
  query_locations = self.impalad_test_service.get_query_locations()
  for num_q in query_locations.values():
    # The message reports both the expected count and the observed locations; the
    # previous text formatted the locations into both placeholders.
    assert num_q == num_running, "There should be {0} running queries on either " \
        "impalads: {1}".format(num_running, query_locations)
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
    impalad_args=impalad_admission_ctrl_config_args(
        fs_allocation_file="mem-limit-test-fair-scheduler.xml",
        llama_site_file="mem-limit-test-llama-site.xml", make_copy=True),
    statestored_args=_STATESTORED_ARGS)
def test_pool_mem_limit_configs(self, vector):
  """Functional tests for the max/min_query_mem_limit pool config attributes, driven
  by the 'admission-max-min-mem-limits' test file."""
  options = vector.get_value('exec_option')
  # This is the default value.
  options['exec_single_node_rows_threshold'] = 100
  # With a single node the per_host and per_cluster values used in the test map
  # one-to-one, which makes them easier to follow.
  options['num_nodes'] = 1
  self.run_test_case('QueryTest/admission-max-min-mem-limits', vector)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
    impalad_args=impalad_admission_ctrl_config_args(
        fs_allocation_file="mem-limit-test-fair-scheduler.xml",
        llama_site_file="mem-limit-test-llama-site.xml",
        additional_args="-default_pool_max_requests 1", make_copy=True),
    statestored_args=_STATESTORED_ARGS)
def test_pool_config_change_while_queued(self, vector):
  """Tests that the invalid checks work even if the query is queued. Makes sure that a
  queued query is dequeued and rejected if the config is invalid.

  Two scenarios are run against pool 'invalidTestPool': first max-query-mem-limit is
  set to 1 while a query is queued, then it is set to a value below what the queued
  query's largest min_reservation needs. In both, the queued query is expected to
  reach the EXCEPTION state once the running query is closed."""
  pool_name = "invalidTestPool"
  config_str = "max-query-mem-limit"
  self.client.set_configuration_option('request_pool', pool_name)
  # Setup to queue a query.
  sleep_query_handle = self.client.execute_async("select sleep(10000)")
  self.client.wait_for_admission_control(sleep_query_handle)
  self.__wait_for_change_to_profile(sleep_query_handle,
      "Admission result: Admitted immediately")
  queued_query_handle = self.client.execute_async("select 2")
  self.__wait_for_change_to_profile(queued_query_handle, "Admission result: Queued")
  # Change config to be invalid.
  llama_site_path = os.path.join(RESOURCES_DIR, "copy-mem-limit-test-llama-site.xml")
  config = ResourcePoolConfig(self.cluster.impalads[0].service, llama_site_path)
  config.set_config_value(pool_name, config_str, 1)
  # Close running query so the queued one gets a chance.
  self.client.close_query(sleep_query_handle)
  # Observe that the queued query fails.
  self.wait_for_state(queued_query_handle, QueryState.EXCEPTION, 20)
  self.close_query(queued_query_handle)
  # Change the config back to a valid value
  config.set_config_value(pool_name, config_str, 0)
  # Now do the same thing for change to pool.max-query-mem-limit such that it can no
  # longer accommodate the largest min_reservation.
  # Setup to queue a query.
  sleep_query_handle = self.client.execute_async("select sleep(10000)")
  self.client.wait_for_admission_control(sleep_query_handle)
  queued_query_handle = self.client.execute_async(
      "select * from functional_parquet.alltypes limit 1")
  self.__wait_for_change_to_profile(queued_query_handle, "Admission result: Queued")
  # Change config to something less than what is required to accommodate the
  # largest min_reservation (which in this case is 32.09 MB).
  config.set_config_value(pool_name, config_str, 25 * 1024 * 1024)
  # Close running query so the queued one gets a chance.
  self.client.close_query(sleep_query_handle)
  # Observe that the queued query fails.
  self.wait_for_state(queued_query_handle, QueryState.EXCEPTION, 20)
  self.close_query(queued_query_handle)
def __wait_for_change_to_profile(self, query_handle, search_string, timeout=20):
  """Polls the runtime profile of 'query_handle' every 0.1 seconds until it contains
  'search_string'.

  :param query_handle: handle passed to self.client.get_runtime_profile().
  :param search_string: text that must appear in the profile.
  :param timeout: controls the number of polls (timeout * 10).
  Fails with an assertion if the text never appears.
  """
  # Bound up front so that the failure message below can still be built when the loop
  # body never runs (timeout <= 0).
  profile = None
  for _ in range(timeout * 10):
    profile = self.client.get_runtime_profile(query_handle)
    if search_string in profile:
      return
    sleep(0.1)
  assert False, "Timed out waiting for change to profile\nSearch " \
      "String: {0}\nProfile:\n{1}".format(search_string, str(profile))
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
    impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=10,
        pool_max_mem=1024 * 1024 * 1024))
@needs_session()
def test_queuing_status_through_query_log_and_exec_summary(self):
  """Test to verify that the HS2 client's GetLog() call and the ExecSummary expose
  the query's queuing status, that is, whether the query was queued and what was the
  latest queuing reason."""
  # Start a long running query.
  long_query_resp = self.execute_statement("select sleep(10000)")
  # Ensure that the query has started executing.
  self.wait_for_admission_control(long_query_resp.operationHandle)
  # Submit another query.
  queued_query_resp = self.execute_statement("select 1")
  # Wait until the query is queued.
  self.wait_for_operation_state(queued_query_resp.operationHandle,
      TCLIService.TOperationState.PENDING_STATE)
  # Check whether the query log message correctly exposes the queuing status.
  get_log_req = TCLIService.TGetLogReq()
  get_log_req.operationHandle = queued_query_resp.operationHandle
  log = self.hs2_client.GetLog(get_log_req).log
  assert "Admission result : Queued" in log, log
  # The two literals must be parenthesised: without an open bracket the statement ends
  # after the first literal, which turns the check into an always-true assert followed
  # by a discarded expression.
  assert ("Latest admission queue reason : number of running queries 1 is at or over "
          "limit 1") in log, log
  # Now check the same for ExecSummary.
  summary_req = ImpalaHiveServer2Service.TGetExecSummaryReq()
  summary_req.operationHandle = queued_query_resp.operationHandle
  summary_req.sessionHandle = self.session_handle
  exec_summary_resp = self.hs2_client.GetExecSummary(summary_req)
  assert exec_summary_resp.summary.is_queued
  assert "number of running queries 1 is at or over limit 1" in \
      exec_summary_resp.summary.queued_reason,\
      exec_summary_resp.summary.queued_reason
  # Close the running query.
  self.close(long_query_resp.operationHandle)
  # Close the queued query.
  self.close(queued_query_resp.operationHandle)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
    impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=3,
        pool_max_mem=1024 * 1024 * 1024) +
    " --admission_control_stale_topic_threshold_ms={0}".format(
        STALE_TOPIC_THRESHOLD_MS),
    statestored_args=_STATESTORED_ARGS)
def test_statestore_outage(self):
  """Behaviour with a failed statestore: queries should still be admitted, but
  diagnostics about the stale topic should be produced."""
  self.cluster.statestored.kill()
  first_impalad = self.cluster.impalads[0]
  # Sleep until the update should be definitely stale.
  sleep(STALE_TOPIC_THRESHOLD_MS / 1000. * 1.5)
  admission_json = first_impalad.service.get_debug_webpage_json('/admission')
  age_ms = admission_json["statestore_admission_control_time_since_last_update_ms"]
  assert age_ms > STALE_TOPIC_THRESHOLD_MS
  assert ("Warning: admission control information from statestore is stale:" in
      admission_json["statestore_update_staleness_detail"])

  # Submit a batch of queries. One should get to run, one will be rejected because
  # of the full queue, and the others will run after being queued.
  query_count = 5
  timeout_s = 60
  profiles = self._execute_and_collect_profiles(
      ["select sleep(100)"] * query_count, timeout_s, allow_query_failure=True)
  stale_warning = "Warning: admission control information from statestore is stale"
  stale_profile_entry = "Admission control state staleness: " + stale_warning

  def lines_with(text, needle):
    """Returns the lines of 'text' that contain 'needle'."""
    return [line for line in text.split("\n") if needle in line]

  immediate = 0
  queued = 0
  rejected = 0
  for profile in profiles:
    if "Admission result: Admitted immediately" in profile:
      assert stale_profile_entry in profile, profile
      immediate += 1
    elif "Admission result: Rejected" in profile:
      rejected += 1
      # The rejection error returned to the client must contain a warning.
      statuses = lines_with(profile, "Query Status:")
      assert len(statuses) == 1, profile
      assert stale_warning in statuses[0]
    else:
      assert "Admission result: Admitted (queued)" in profile, profile
      assert stale_profile_entry in profile, profile
      # The queued reason must contain a warning.
      reasons = lines_with(profile, "Initial admission queue reason:")
      assert len(reasons) == 1, profile
      assert stale_warning in reasons[0]
      queued += 1
  assert immediate == 1
  assert queued == 3
  assert rejected == query_count - immediate - queued
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(impalad_args=impalad_admission_ctrl_config_args(
    fs_allocation_file="fair-scheduler-test2.xml",
    llama_site_file="llama-site-test2.xml"),
    statestored_args=_STATESTORED_ARGS)
def test_scalable_config(self, vector):
  """
  Test that the scalable configuration parameters scale as the cluster size changes.

  The cluster is taken through 3 -> 2 -> 3 -> 1 -> 3 live impalads by killing and
  restarting the second and third impalads. After every change the test waits for the
  first impalad to see the expected number of live backends and then re-runs the
  count-based and memory-based admission checks.
  """
  # Start with 3 Impalads
  coordinator = self.cluster.impalads[0]
  impalad_1 = self.cluster.impalads[1]
  impalad_2 = self.cluster.impalads[2]
  self.__check_admission_by_counts(expected_num_impalads=3)
  # The mem_limit values that are passed to __check_admission_by_memory are based on
  # the memory used by the query when run on clusters of varying sizes, but mem_limit
  # is used in the test to make the test deterministic in the presence of changing
  # memory estimates.
  self.__check_admission_by_memory(True, '85M')
  # Kill an Impalad, now there are 2.
  impalad_1.kill_and_wait_for_exit()
  coordinator.service.wait_for_num_known_live_backends(2)
  self.__check_admission_by_counts(expected_num_impalads=2)
  # With 2 backends the query is expected to be rejected with the given reason.
  self.__check_admission_by_memory(False, '125M',
      'is greater than pool max mem resources 200.00 MB'
      ' (calculated as 2 backends each with 100.00 MB)')
  # Restart an Impalad, now there are 3 again.
  impalad_1.start(wait_until_ready=True)
  coordinator.service.wait_for_num_known_live_backends(3)
  self.__check_admission_by_counts(expected_num_impalads=3)
  self.__check_admission_by_memory(True, '85M')
  # Kill 2 Impalads, now there are 1.
  impalad_1.kill_and_wait_for_exit()
  impalad_2.kill_and_wait_for_exit()
  coordinator.service.wait_for_num_known_live_backends(1)
  self.__check_admission_by_counts(expected_num_impalads=1)
  # With 1 backend the query is expected to be rejected with the given reason.
  self.__check_admission_by_memory(False, '135M',
      'is greater than pool max mem resources 100.00 MB'
      ' (calculated as 1 backends each with 100.00 MB)')
  # Restart 2 Impalads, now there are 3 again.
  impalad_1.start(wait_until_ready=True)
  impalad_2.start(wait_until_ready=True)
  coordinator.service.wait_for_num_known_live_backends(3)
  self.__check_admission_by_counts(expected_num_impalads=3)
  self.__check_admission_by_memory(True, '85M')
def __check_admission_by_memory(self, expected_admission, mem_limit,
    expected_rejection_reason=None):
  """
  Runs one query in pool root.queueE and checks whether it was admitted.
  :param expected_admission: True if the query is expected to be admitted immediately.
  :param mem_limit: value of the mem_limit query option used for the query.
  :param expected_rejection_reason: text that must appear in the rejection reason when
      the query is rejected.
  A query that ends up queued always fails the check.
  """
  stmt = "select * from functional.alltypesagg order by int_col limit 1"
  options = {'request_pool': 'root.queueE', 'mem_limit': mem_limit}
  profiles = self._execute_and_collect_profiles([stmt], timeout_s=60,
      allow_query_failure=True, config_options=options)
  assert len(profiles) == 1
  profile = profiles[0]
  if "Admission result: Admitted immediately" in profile:
    was_admitted = True
  elif "Admission result: Rejected" in profile:
    was_admitted = False
    rejected_count, reasons = self.parse_profiles_rejected(profiles)
    assert rejected_count == 1
    first_reason = reasons[0]
    assert expected_rejection_reason is not None, first_reason
    assert expected_rejection_reason in first_reason, profile
  else:
    assert "Admission result: Admitted (queued)" in profile, profile
    assert 0, "should not queue based on memory"
  assert was_admitted == expected_admission, profile
def __check_admission_by_counts(self, expected_num_impalads):
  """
  Run some queries, find how many were admitted, queued or rejected, and check that
  AdmissionController correctly enforces query count limits based in the configuration
  in llama-site-test2.xml.
  :param expected_num_impalads: number of live impalads; the expected numbers of
      admitted, queued and rejected queries are derived from it.
  """
  NUM_QUERIES = 6
  # Set expected values based on expected_num_impalads.
  # We can run 1 query per backend for queueE from llama-site-test2.xml.
  expected_num_admitted = expected_num_impalads
  # The value of max-queued-queries-multiple for queueE from llama-site-test2.xml.
  QUERIES_MULTIPLE = 0.6
  expected_num_queued = int(math.ceil(expected_num_impalads * QUERIES_MULTIPLE))
  expected_num_rejected = NUM_QUERIES - (expected_num_admitted + expected_num_queued)
  impalad = self.cluster.impalads[0]
  client = impalad.service.create_beeswax_client()
  try:
    client.set_configuration({'request_pool': 'root.queueE'})
    result = client.execute("select 1")
    # Query should execute in queueE.
    self.__check_query_options(result.runtime_profile, ['REQUEST_POOL=root.queueE'])
  finally:
    # This client is only needed for the probe query above; close it so that one
    # connection is not left open per call.
    client.close()
  STMT = "select sleep(1000)"
  TIMEOUT_S = 60
  profiles = self._execute_and_collect_profiles([STMT for i in xrange(NUM_QUERIES)],
      TIMEOUT_S, allow_query_failure=True,
      config_options={'request_pool': 'root.queueE'})
  # Check admitted queries
  num_admitted_immediately = self.parse_profiles_admitted(profiles)
  assert num_admitted_immediately == expected_num_admitted
  # Check queued queries.
  num_queued, queued_reasons = self.parse_profiles_queued(profiles)
  assert num_queued == expected_num_queued
  assert len(queued_reasons) == num_queued
  expected_queue_reason = (
      "number of running queries {0} is at or over limit {1}".format(
          expected_num_admitted, expected_num_admitted)
  )
  # The first query to get queued sees expected_queue_reason.
  assert len([s for s in queued_reasons if
      expected_queue_reason in s]) == 1
  # Subsequent queries that are queued see that the queue is not empty.
  expected_see_non_empty_queue = max(expected_num_queued - 1, 0)
  assert len([s for s in queued_reasons if
      "queue is not empty" in s]) == expected_see_non_empty_queue
  # Check rejected queries
  num_rejected, rejected_reasons = self.parse_profiles_rejected(profiles)
  assert num_rejected == expected_num_rejected
  expected_rejection_reason = (
      "Rejected query from pool root.queueE: queue full, "
      "limit={0} (calculated as {1} backends each with 0.6 queries)".format(
          expected_num_queued, expected_num_impalads)
  )
  assert len([s for s in rejected_reasons if
      expected_rejection_reason in s]) == expected_num_rejected
def parse_profiles_queued(self, profiles):
  """
  Scan a list of profile strings for queries that were admitted after being queued.
  :param profiles: a list of query profiles to parse.
  :return: a tuple (number queued, list with the 'Initial admission queue reason:'
      line of each queued profile, in input order).
  Asserts that every queued profile has exactly one such line.
  """
  reasons = []
  for text in profiles:
    if "Admission result: Admitted (queued)" not in text:
      continue
    reason_lines = [ln for ln in text.split("\n")
                    if "Initial admission queue reason:" in ln]
    assert len(reason_lines) == 1, text
    reasons.append(reason_lines[0])
  return len(reasons), reasons
def parse_profiles_admitted(self, profiles):
  """
  Count the profiles of queries that were admitted immediately.
  :param profiles: a list of query profiles to parse.
  :return: The number of profiles containing 'Admission result: Admitted immediately'.
  """
  marker = "Admission result: Admitted immediately"
  return sum(1 for text in profiles if marker in text)
def parse_profiles_rejected(self, profiles):
  """
  Scan a list of profile strings for queries that were rejected.
  :param profiles: a list of query profiles to parse.
  :return: a tuple (number rejected, list with the 'Query Status:' line of each
      rejected profile, in input order).
  Asserts that every rejected profile has exactly one such line.
  """
  statuses = []
  for text in profiles:
    if "Admission result: Rejected" not in text:
      continue
    status_lines = [ln for ln in text.split("\n") if "Query Status:" in ln]
    assert len(status_lines) == 1, text
    statuses.append(status_lines[0])
  return len(statuses), statuses
@pytest.mark.execute_serially
def test_impala_server_startup_delay(self):
  """Verifies that queries get queued when the coordinator is already accepting client
  connections during startup while the local backend descriptor is not yet
  available."""
  delay_s = 20
  debug_flag = "--debug_actions=IMPALA_SERVER_END_OF_START:SLEEP@%s" % (
      1000 * delay_s)

  def launch_cluster():
    # The cluster is started here rather than in setup_method() so that the start can
    # run on a separate thread.
    LOG.info("Starting cluster")
    self._start_impala_cluster(['--impalad_args=%s' % debug_flag])

  # Kick off the cluster start in the background.
  launcher = threading.Thread(target=launch_cluster)
  launcher.start()
  # Give the server some time to arrive at IMPALA_SERVER_END_OF_START.
  sleep(delay_s)
  # A query from a new client should get queued and ultimately succeed.
  fresh_client = self.create_impala_client()
  result = self.execute_query_expect_success(fresh_client, "select 1")
  launcher.join()
  queue_reasons = self.__extract_init_queue_reasons([result.runtime_profile])
  assert len(queue_reasons) == 1
  assert "Local backend has not started up yet." in queue_reasons[0]
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(num_exclusive_coordinators=1)
def test_release_backends(self, vector):
  """Test that executor backends are shutdown when they complete, that completed
  executor backends release their admitted memory, and that
  NumCompletedBackends is updated each time an executor backend completes.
  Only runs with the exhaustive exploration strategy."""
  if self.exploration_strategy() != 'exhaustive':
    pytest.skip('runs only in exhaustive')
  # Craft a query where part of the executor backends completes, while the rest remain
  # running indefinitely. The query forces the 'lineitem' table to be treated as the
  # small table even though it is bigger than the 'customer' table. This forces the
  # small table scan ('lineitem' scan) to run on two nodes and the big table scan
  # ('customers' scan) to run on a single node. By using debug actions to force the
  # big table scan to hang indefinitely, the small table scan should finish quickly.
  # This causes one executor backend to complete quickly, and causes the other one to
  # hang.
  vector.get_value('exec_option')['debug_action'] = '0:GETNEXT:WAIT'
  query = "select STRAIGHT_JOIN * from tpch.customer JOIN /* +BROADCAST */ " \
          "tpch.lineitem where customer.c_custkey = lineitem.l_orderkey limit 100"
  # Amount of time to wait for the query to reach the running state before throwing a
  # Timeout exception.
  timeout = 10
  handle = self.execute_query_async(query, vector.get_value('exec_option'))
  try:
    # Wait for the query to reach the running state (it should never reach the finished
    # state because of the 'WAIT' debug action), wait for the 'lineitem' scan to
    # complete, and then validate that one of the executor backends shutdowns and
    # releases its admitted memory.
    self.wait_for_state(handle, self.client.QUERY_STATES['RUNNING'], timeout)
    # Once the 'lineitem' scan completes, NumCompletedBackends should be 1.
    self.assert_eventually(60, 1, lambda: "NumCompletedBackends: 1 (1)"
        in self.client.get_runtime_profile(handle))
    # The comparison is asserted; previously its result was computed and discarded, so
    # this check could never fail.
    assert get_num_completed_backends(self.cluster.impalads[0].service,
        handle.get_handle().id) == 1
    mem_admitted = get_mem_admitted_backends_debug_page(self.cluster)
    num_executor_zero_admitted = 0
    for executor_mem_admitted in mem_admitted['executor']:
      if executor_mem_admitted == 0:
        num_executor_zero_admitted += 1
    assert num_executor_zero_admitted == 1
  finally:
    # Once the query is closed, validate that all backends have shutdown.
    self.client.close_query(handle)
    mem_admitted = get_mem_admitted_backends_debug_page(self.cluster)
    assert mem_admitted['coordinator'] == 0
    for executor_mem_admitted in mem_admitted['executor']:
      assert executor_mem_admitted == 0
class TestAdmissionControllerStress(TestAdmissionControllerBase):
"""Submits a number of queries (parameterized) with some delay between submissions
(parameterized) and the ability to submit to one impalad or many in a round-robin
fashion. Each query is submitted on a separate thread. After admission, the query
thread will block with the query open and wait for the main thread to notify it to
end its query. The query thread can end its query by fetching to the end, cancelling
itself, closing itself, or waiting for the query timeout to take effect. Depending
on the test parameters a varying number of queries will be admitted, queued, and
rejected. After the queries are admitted, the main thread will request each admitted
query thread to end its query and allow queued queries to be admitted.
The test tracks the state of the admission controller using the metrics from each
impalad to do the following:
(1) After submitting all queries, check that the change in metrics for the number of
admitted, queued, and rejected requests sums to the number of queries and that the
values are reasonable given the test parameters.
(2) While there are running queries:
* Request the currently running queries to end and wait for the queries to end.
Verify the metric for the number of completed queries. The threads that
submitted those queries will keep their connections open until the entire test
completes. This verifies that admission control is tied to the end of the query
and does not depend on closing the connection.
* Check that queued requests are then dequeued and verify using the metric for the
number of dequeued requests. The threads that were waiting to submit the query
should then insert themselves into a list of currently running queries and then
wait for a notification from the main thread.
(3) After all queries have completed, check that the final number of admitted,
queued, and rejected requests are reasonable given the test parameters. When
submitting to a single impalad, we know exactly what the values should be,
otherwise we just check that they are within reasonable bounds.
"""
@classmethod
def add_test_dimensions(cls):
  """Registers the stress-test dimensions and the constraints that trim the matrix.

  Adds the 'num_queries', 'round_robin_submission' and 'submission_delay_ms'
  dimensions on top of those of the parent class. Code coverage builds are pinned
  to 15 queries; otherwise the 'core' exploration strategy is pinned to 30 queries,
  no submission delay and round-robin submission."""
  super(TestAdmissionControllerStress, cls).add_test_dimensions()
  matrix = cls.ImpalaTestMatrix
  dimensions = (
      ('num_queries', NUM_QUERIES),
      ('round_robin_submission', ROUND_ROBIN_SUBMISSION),
      ('submission_delay_ms', SUBMISSION_DELAY_MS),
  )
  for dimension_name, dimension_values in dimensions:
    matrix.add_dimension(ImpalaTestDimension(dimension_name, *dimension_values))

  # Extra constraints for code coverage jobs and for the core strategy.
  pinned_num_queries = None
  if ImpalaTestClusterProperties.get_instance().has_code_coverage():
    # Code coverage builds can't cope with the higher concurrency.
    pinned_num_queries = 15
  elif cls.exploration_strategy() == 'core':
    pinned_num_queries = 30
    matrix.add_constraint(lambda v: v.get_value('submission_delay_ms') == 0)
    matrix.add_constraint(lambda v: v.get_value('round_robin_submission'))

  if pinned_num_queries is None:
    return
  matrix.add_constraint(lambda v: v.get_value('num_queries') == pinned_num_queries)
def setup(self):
# All threads are stored in this list and it's used just to make sure we clean up
# properly in teardown.
self.all_threads = list()
# Each submission thread will append() itself to this list if the query begins
# execution. The main thread will access this list to determine which threads are
# executing queries that can be cancelled (it will pop() elements from the front of
# the list). The individual operations on the list are atomic and thread-safe thanks
# to the GIL.
self.executing_threads = list()
def teardown(self):
  """Signals every submission thread to shut down, then waits for them to exit.

  For each thread in self.all_threads the shutdown flag is set while holding the
  thread's lock; if the thread still owns a query handle, the query is cancelled
  through a fresh beeswax client. Each thread is then joined with a 5 second timeout
  and the outcome is logged."""
  # Set shutdown for all threads (cancel if needed)
  for thread in self.all_threads:
    # 'with' only releases the lock if it was actually acquired.
    with thread.lock:
      thread.shutdown = True
      if thread.query_handle is not None:
        LOG.debug("Attempt to clean up thread executing query %s (state %s)",
            thread.query_num, thread.query_state)
        client = thread.impalad.service.create_beeswax_client()
        try:
          client.cancel(thread.query_handle)
        finally:
          client.close()
  # Wait for all threads to exit
  for thread in self.all_threads:
    thread.join(5)
    # is_alive() exists on Python 2.6+ and 3.x; the isAlive() alias was removed in 3.9.
    LOG.debug("Join thread for query num %s %s", thread.query_num,
        "TIMED OUT" if thread.is_alive() else "")
def get_admission_metrics(self):
  """
  Sums the admission control counters of self.pool_name over every impalad in
  self.impalads. The result is a dict keyed by the shortened metric names
  'admitted', 'queued', 'dequeued', 'rejected', 'released', and 'timed-out'.
  """
  totals = dict.fromkeys(
      ('admitted', 'queued', 'dequeued', 'rejected', 'released', 'timed-out'), 0)
  for impalad in self.impalads:
    short_names = list(totals)
    full_names = [metric_key(self.pool_name, 'total-%s' % name) for name in short_names]
    # Metrics that an impalad does not report default to 0.
    per_daemon = impalad.service.get_metric_values(full_names, [0] * len(full_names))
    for name, value in zip(short_names, per_daemon):
      totals[name] += value
  return totals
def get_consistent_admission_metrics(self, num_submitted):
  """Same as get_admission_metrics() except retries until it gets consistent metrics for
  num_submitted queries. See IMPALA-6227 for an example of problems with inconsistent
  metrics where a dequeued query is reflected in dequeued but not admitted.

  A snapshot is consistent when the queries admitted immediately (num_submitted minus
  'queued' minus 'rejected') plus 'dequeued' equals 'admitted'. Returns the first
  consistent snapshot; asserts after 5 inconsistent attempts."""
  ATTEMPTS = 5
  metrics = None
  # range() rather than the Python-2-only xrange(); the index itself is not needed.
  for _ in range(ATTEMPTS):
    metrics = self.get_admission_metrics()
    admitted_immediately = num_submitted - metrics['queued'] - metrics['rejected']
    if admitted_immediately + metrics['dequeued'] == metrics['admitted']:
      return metrics
    LOG.info("Got inconsistent metrics {0}".format(metrics))
  assert False, "Could not get consistent metrics for {0} queries after {1} attempts: "\
      "{2}".format(num_submitted, ATTEMPTS, metrics)
def wait_for_metric_changes(self, metric_names, initial, expected_delta):
  """
  Polls the aggregated admission metrics once per second until the summed change of
  the metrics listed in 'metric_names' reaches at least 'expected_delta'.

  Unlike ImpalaService.wait_for_metric_value() this works on several metrics summed
  over all impalads, for cases where only the total change is known up front (e.g.
  admitted + queued + rejected) and not how it splits across the individual metrics.

  'metric_names' lists the keys of get_admission_metrics() that are expected to move.
  'initial' is the get_admission_metrics() snapshot used as the baseline.
  'expected_delta' is the total change expected across all impalads.

  Returns a tuple (deltas, current metrics). Asserts once STRESS_TIMEOUT seconds have
  passed without reaching the expected change.
  """
  log_metrics("wait_for_metric_changes, initial=", initial)
  began = time()
  while True:
    latest = self.get_admission_metrics()
    log_metrics("wait_for_metric_changes, current=", latest)
    deltas = compute_metric_deltas(latest, initial)
    delta_sum = sum(deltas[name] for name in metric_names)
    LOG.info("DeltaSum=%s Deltas=%s (Expected=%s for metrics=%s)",
        delta_sum, deltas, expected_delta, metric_names)
    if delta_sum >= expected_delta:
      LOG.info("Found all %s metrics after %s seconds", delta_sum,
          round(time() - began, 1))
      return (deltas, latest)
    assert (time() - began < STRESS_TIMEOUT), (
        "Timed out waiting {0} seconds for metrics {1} delta {2} "
        "current {3} initial {4}".format(
            STRESS_TIMEOUT, ','.join(metric_names), expected_delta, str(latest),
            str(initial)))
    sleep(1)
def wait_for_statestore_updates(self, heartbeats):
  """Blocks until every impalad in self.impalads has reported at least 'heartbeats'
  additional admission control statestore updates, measured with the 'count' field of
  the REQUEST_QUEUE_UPDATE_INTERVAL metric. Asserts after STRESS_TIMEOUT seconds."""
  began = time()

  def update_count(daemon):
    return daemon.service.get_metric_value(REQUEST_QUEUE_UPDATE_INTERVAL)['count']

  baseline = {}
  latest = {}
  for daemon in self.impalads:
    baseline[daemon] = update_count(daemon)
    latest[daemon] = baseline[daemon]

  while True:
    progress = [latest[daemon] - baseline[daemon] for daemon in self.impalads]
    LOG.debug("wait_for_statestore_updates: curr=%s, init=%s, d=%s", latest.values(),
        baseline.values(), progress)
    if all(seen >= heartbeats for seen in progress):
      break
    for daemon in self.impalads:
      latest[daemon] = update_count(daemon)
    assert (time() - began < STRESS_TIMEOUT),\
        "Timed out waiting %s seconds for heartbeats" % (STRESS_TIMEOUT,)
    sleep(STATESTORE_RPC_FREQUENCY_MS / float(1000))
  LOG.info("Waited %s for %s heartbeats", round(time() - began, 1), heartbeats)
def wait_for_admitted_threads(self, num_threads):
  """
  Blocks until at least 'num_threads' submission threads have registered themselves
  in self.executing_threads. The admission metrics can change before the
  execute_async() calls on the query threads return, so seeing the metrics move does
  not mean the threads have added themselves yet. Asserts after STRESS_TIMEOUT seconds.
  """
  began = time()
  LOG.info("Waiting for %s threads to begin execution", num_threads)
  # No lock is needed around len(): individual list operations are thread-safe, even
  # while another thread calls append() concurrently.
  while True:
    if len(self.executing_threads) >= num_threads:
      break
    assert (time() - began < STRESS_TIMEOUT), (
        "Timed out waiting %s seconds for "
        "%s admitted client rpcs to return. Only %s executing " % (
            STRESS_TIMEOUT, num_threads, len(self.executing_threads)))
    sleep(0.1)
  LOG.info("Found all %s admitted threads after %s seconds", num_threads,
      round(time() - began, 1))
def end_admitted_queries(self, num_queries):
  """
  Requests 'num_queries' admitted queries to end and waits until they have done so.

  The threads are taken from the front of self.executing_threads and switched from
  state 'ADMITTED' to 'REQUEST_QUERY_END'; each thread ends its own query when it
  sees that state. The method returns once no thread in self.all_threads is left in
  'REQUEST_QUERY_END', and asserts if that takes longer than STRESS_TIMEOUT seconds.
  """
  assert len(self.executing_threads) >= num_queries
  LOG.info("Requesting {0} clients to end queries".format(num_queries))
  # Request admitted clients to end their queries
  for _ in range(num_queries):
    # pop() is thread-safe, it's OK if another thread is appending concurrently.
    thread = self.executing_threads.pop(0)
    LOG.info("Cancelling query %s", thread.query_num)
    assert thread.query_state == 'ADMITTED'
    thread.query_state = 'REQUEST_QUERY_END'

  # Wait for the queries to end
  start_time = time()
  while any(t.query_state == 'REQUEST_QUERY_END' for t in self.all_threads):
    assert (time() - start_time < STRESS_TIMEOUT),\
        "Timed out waiting %s seconds for query end" % (STRESS_TIMEOUT,)
    sleep(1)
class SubmitQueryThread(threading.Thread):
  """Thread that submits a single query and keeps it open until asked to end it.

  The test-only 'query_state' attribute moves through 'NOT_SUBMITTED', 'SUBMITTING',
  then either 'REJECTED' / 'TIMED OUT' (the thread exits) or 'ADMITTED'. The main
  thread sets 'REQUEST_QUERY_END' on an admitted thread, which then ends its query
  according to 'query_end_behavior' and moves to 'COMPLETED'. Any exception is stored
  in 'error' and the state becomes 'ERROR'."""

  def __init__(self, impalad, additional_query_options, vector, query_num,
      query_end_behavior, executing_threads):
    """
    executing_threads must be provided so that this thread can add itself when the
    query is admitted and begins execution.
    """
    # Name the base class explicitly: super(self.__class__, self) recurses forever as
    # soon as this class is subclassed.
    threading.Thread.__init__(self)
    self.executing_threads = executing_threads
    self.vector = vector
    self.additional_query_options = additional_query_options
    self.query_num = query_num
    self.query_end_behavior = query_end_behavior
    self.impalad = impalad
    self.error = None
    # query_state is defined and used only by the test code, not a property exposed by
    # the server
    self.query_state = 'NOT_SUBMITTED'
    # lock protects query_handle and shutdown, used by the main thread in teardown()
    self.lock = threading.RLock()
    self.query_handle = None
    self.shutdown = False  # Set by the main thread when tearing down

  def run(self):
    """Submits the query, waits for the admission decision and, if admitted, keeps the
    query open until the main thread requests its end or sets the shutdown flag."""
    client = None
    try:
      # Take the lock while query_handle is being created to avoid an unlikely race
      # condition with teardown() (i.e. if an error occurs on the main thread), and
      # check if the test is already shut down.
      with self.lock:
        try:
          if self.shutdown:
            return
          # Work on a copy: the vector's exec_option dict is shared by all submission
          # threads and must not be modified in place.
          exec_options = copy(self.vector.get_value('exec_option'))
          exec_options.update(self.additional_query_options)
          query = QUERY.format(self.query_num)
          self.query_state = 'SUBMITTING'
          client = self.impalad.service.create_beeswax_client()
          ImpalaTestSuite.change_database(client, self.vector.get_value('table_format'))
          client.set_configuration(exec_options)

          if self.query_end_behavior == 'QUERY_TIMEOUT':
            client.execute("SET QUERY_TIMEOUT_S={0}".format(QUERY_END_TIMEOUT_S))

          LOG.info("Submitting query %s", self.query_num)
          self.query_handle = client.execute_async(query)
          client.wait_for_admission_control(self.query_handle)
          admission_result = client.get_admission_result(self.query_handle)
          assert len(admission_result) > 0
          if "Rejected" in admission_result:
            LOG.info("Rejected query %s", self.query_num)
            self.query_state = 'REJECTED'
            self.query_handle = None
            return
          elif "Timed out" in admission_result:
            LOG.info("Query %s timed out", self.query_num)
            self.query_state = 'TIMED OUT'
            self.query_handle = None
            return
          LOG.info("Admission result for query %s : %s", self.query_num, admission_result)
        except ImpalaBeeswaxException as e:
          LOG.exception(e)
          # Bare raise keeps the original traceback.
          raise

      LOG.info("Admitted query %s", self.query_num)
      self.query_state = 'ADMITTED'
      # The thread becomes visible to the main thread when it is added to the
      # shared list of executing_threads. append() is atomic and thread-safe.
      self.executing_threads.append(self)

      # Synchronize with the main thread. At this point, the thread is executing a
      # query. It needs to wait until the main thread requests it to end its query.
      while not self.shutdown:
        # The QUERY_TIMEOUT needs to stay active until the main thread requests it
        # to end. Otherwise, the query may get cancelled early. Fetch rows 2 times
        # per QUERY_TIMEOUT interval to keep the query active.
        if self.query_end_behavior == 'QUERY_TIMEOUT' and \
            self.query_state != 'COMPLETED':
          fetch_result = client.fetch(query, self.query_handle, 1)
          assert len(fetch_result.data) == 1, str(fetch_result)
        if self.query_state == 'REQUEST_QUERY_END':
          self._end_query(client, query)
          # The query has released admission control resources
          self.query_state = 'COMPLETED'
          self.query_handle = None
        sleep(QUERY_END_TIMEOUT_S * 0.5)
    except Exception as e:
      LOG.exception(e)
      # Unknown errors will be raised later
      self.error = e
      self.query_state = 'ERROR'
    finally:
      LOG.info("Thread terminating in state=%s", self.query_state)
      if client is not None:
        client.close()

  def _end_query(self, client, query):
    """Bring the query to the appropriate end state defined by self.query_end_behavior.
    Returns once the query has reached that state."""
    LOG.info("Ending query %s by %s",
        str(self.query_handle.get_handle()), self.query_end_behavior)
    if self.query_end_behavior == 'QUERY_TIMEOUT':
      # Sleep and wait for the query to be cancelled. The cancellation will
      # set the state to EXCEPTION.
      start_time = time()
      while (client.get_state(self.query_handle) !=
             client.QUERY_STATES['EXCEPTION']):
        assert (time() - start_time < STRESS_TIMEOUT),\
            "Timed out waiting %s seconds for query cancel" % (STRESS_TIMEOUT,)
        sleep(1)
    elif self.query_end_behavior == 'EOS':
      # Fetch all rows so we hit eos.
      client.fetch(query, self.query_handle)
    elif self.query_end_behavior == 'CLIENT_CANCEL':
      client.cancel(self.query_handle)
    else:
      assert self.query_end_behavior == 'CLIENT_CLOSE'
      client.close_query(self.query_handle)
def _check_queries_page_resource_pools(self):
"""Checks that all queries in the '/queries' webpage json have the correct resource
pool (this is called after all queries have been admitted, queued, or rejected, so
they should already have the pool set), or no pool for queries that don't go through
admission control."""
for impalad in self.impalads:
queries_json = impalad.service.get_debug_webpage_json('/queries')
for query in itertools.chain(queries_json['in_flight_queries'],
queries_json['completed_queries']):
if query['stmt_type'] == 'QUERY' or query['stmt_type'] == 'DML':
assert query['last_event'] != 'Registered' and \
query['last_event'] != 'Planning finished'
assert query['resource_pool'] == self.pool_name
else:
assert query['resource_pool'] == ''
def _get_queries_page_num_queued(self):
"""Returns the number of queries currently in the 'queued' state from the '/queries'
webpage json"""
num_queued = 0
for impalad in self.impalads:
queries_json = impalad.service.get_debug_webpage_json('/queries')
for query in queries_json['in_flight_queries']:
if query['last_event'] == 'Queued':
num_queued += 1
return num_queued
def run_admission_test(self, vector, additional_query_options):
  """Runs one stress scenario: submits 'num_queries' queries on separate threads,
  checks the initial admission decisions, then ends the running queries in waves
  until none are executing, and finally validates the aggregated metrics.

  'vector' supplies the 'round_robin_submission', 'submission_delay_ms' and
  'num_queries' values. 'additional_query_options' is handed to every submission
  thread. Any error recorded by a submission thread is re-raised at the end."""
  LOG.info("Starting test case with parameters: %s", vector)
  self.impalads = self.cluster.impalads
  round_robin_submission = vector.get_value('round_robin_submission')
  submission_delay_ms = vector.get_value('submission_delay_ms')
  if not round_robin_submission:
    # Submit everything to the first impalad only.
    self.impalads = [self.impalads[0]]

  num_queries = vector.get_value('num_queries')
  assert num_queries >= MAX_NUM_CONCURRENT_QUERIES + MAX_NUM_QUEUED_QUERIES
  initial_metrics = self.get_admission_metrics()
  log_metrics("Initial metrics: ", initial_metrics)

  # Start one submission thread per query, cycling through the impalads and through
  # the query end behaviors.
  for query_num in xrange(num_queries):
    impalad = self.impalads[query_num % len(self.impalads)]
    query_end_behavior = QUERY_END_BEHAVIORS[query_num % len(QUERY_END_BEHAVIORS)]
    thread = self.SubmitQueryThread(impalad, additional_query_options, vector,
        query_num, query_end_behavior, self.executing_threads)
    thread.start()
    self.all_threads.append(thread)
    sleep(submission_delay_ms / 1000.0)

  # Wait for the admission control to make the initial admission decision for all of
  # the queries. They should either be admitted immediately, queued, or rejected.
  # The test query is chosen so that it will remain active on all backends until the
  # test ends the query. This prevents queued queries from being dequeued in the
  # background without this thread explicitly ending them, so that the test can admit
  # queries in discrete waves.
  LOG.info("Wait for initial admission decisions")
  (metric_deltas, curr_metrics) = self.wait_for_metric_changes(
      ['admitted', 'queued', 'rejected'], initial_metrics, num_queries)
  # Also wait for the test threads that submitted the queries to start executing.
  self.wait_for_admitted_threads(metric_deltas['admitted'])

  # Check that the admission decisions are reasonable given the test parameters
  # The number of admitted and queued requests should be at least the configured limits
  # but less than or equal to those limits times the number of impalads.
  assert metric_deltas['dequeued'] == 0,\
      "Queued queries should not run until others are made to finish"
  assert metric_deltas['admitted'] >= MAX_NUM_CONCURRENT_QUERIES,\
      "Admitted fewer than expected queries"
  assert metric_deltas['admitted'] <= MAX_NUM_CONCURRENT_QUERIES * len(self.impalads),\
      "Admitted more than expected queries: at least one daemon over-admitted"
  assert metric_deltas['queued'] >=\
      min(num_queries - metric_deltas['admitted'], MAX_NUM_QUEUED_QUERIES),\
      "Should have queued more queries before rejecting them"
  assert metric_deltas['queued'] <= MAX_NUM_QUEUED_QUERIES * len(self.impalads),\
      "Queued too many queries: at least one daemon queued too many"
  assert metric_deltas['rejected'] + metric_deltas['admitted'] +\
      metric_deltas['queued'] == num_queries,\
      "Initial admission decisions don't add up to {0}: {1}".format(
      num_queries, str(metric_deltas))
  initial_metric_deltas = metric_deltas

  # Like above, check that the count from the queries webpage json is reasonable.
  queries_page_num_queued = self._get_queries_page_num_queued()
  assert queries_page_num_queued >=\
      min(num_queries - metric_deltas['admitted'], MAX_NUM_QUEUED_QUERIES)
  assert queries_page_num_queued <= MAX_NUM_QUEUED_QUERIES * len(self.impalads)
  self._check_queries_page_resource_pools()

  # Admit queries in waves until all queries are done. A new wave of admission
  # is started by killing some of the running queries.
  while len(self.executing_threads) > 0:
    curr_metrics = self.get_consistent_admission_metrics(num_queries)
    log_metrics("Main loop, curr_metrics: ", curr_metrics)
    num_to_end = len(self.executing_threads)
    LOG.info("Main loop, will request %s queries to end", num_to_end)
    self.end_admitted_queries(num_to_end)
    self.wait_for_metric_changes(['released'], curr_metrics, num_to_end)

    # Queries still waiting in the queue as of the snapshot taken at the top of this
    # iteration.
    num_queued_remaining =\
        curr_metrics['queued'] - curr_metrics['dequeued'] - curr_metrics['timed-out']
    expected_admitted = min(num_queued_remaining, MAX_NUM_CONCURRENT_QUERIES)
    (metric_deltas, _) = self.wait_for_metric_changes(
        ['admitted', 'timed-out'], curr_metrics, expected_admitted)

    # The queue timeout is set high for these tests, so we don't expect any queries to
    # time out.
    assert metric_deltas['admitted'] >= expected_admitted
    assert metric_deltas['timed-out'] == 0
    self.wait_for_admitted_threads(metric_deltas['admitted'])
    # Wait a few topic updates to ensure the admission controllers have reached a steady
    # state or we may find an impalad dequeue more requests after we capture metrics.
    self.wait_for_statestore_updates(10)

  final_metrics = self.get_consistent_admission_metrics(num_queries)
  log_metrics("Final metrics: ", final_metrics)
  metric_deltas = compute_metric_deltas(final_metrics, initial_metrics)
  assert metric_deltas['timed-out'] == 0

  if round_robin_submission:
    # With several impalads only bounds are known, plus the relation to the initial
    # admission decisions.
    min_expected_admitted = MAX_NUM_CONCURRENT_QUERIES + MAX_NUM_QUEUED_QUERIES
    assert metric_deltas['admitted'] >= min_expected_admitted
    assert metric_deltas['admitted'] <= min_expected_admitted * len(self.impalads)
    assert metric_deltas['admitted'] ==\
        initial_metric_deltas['admitted'] + initial_metric_deltas['queued']
    assert metric_deltas['queued'] == initial_metric_deltas['queued']
    assert metric_deltas['rejected'] == initial_metric_deltas['rejected']
  else:
    # We shouldn't go over the max number of queries or queue size so we can compute
    # the expected number of queries that should have been admitted (which includes the
    # number queued as they eventually get admitted as well), queued, and rejected
    expected_admitted = MAX_NUM_CONCURRENT_QUERIES + MAX_NUM_QUEUED_QUERIES
    assert metric_deltas['admitted'] == expected_admitted
    assert metric_deltas['queued'] == MAX_NUM_QUEUED_QUERIES
    assert metric_deltas['rejected'] == num_queries - expected_admitted

  # All queries should be completed by now.
  queries_page_num_queued = self._get_queries_page_num_queued()
  assert queries_page_num_queued == 0
  self._check_queries_page_resource_pools()

  # Surface the first error that any submission thread recorded.
  for thread in self.all_threads:
    if thread.error is not None:
      raise thread.error
@pytest.mark.execute_serially
@SkipIfOS.redhat6
@CustomClusterTestSuite.with_args(
    impalad_args=impalad_admission_ctrl_flags(max_requests=MAX_NUM_CONCURRENT_QUERIES,
      max_queued=MAX_NUM_QUEUED_QUERIES, pool_max_mem=-1, queue_wait_timeout_ms=600000),
    statestored_args=_STATESTORED_ARGS)
def test_admission_controller_with_flags(self, vector):
  """Runs the stress scenario against 'default-pool' configured through impalad
  flags. Only runs with the 'exhaustive' exploration strategy."""
  if self.exploration_strategy() != 'exhaustive':
    pytest.skip('runs only in exhaustive')
  self.pool_name = 'default-pool'
  # The pool has no mem resources set, so submitting queries with huge mem_limits
  # should be fine. This exercises the code that does the per-pool memory
  # accounting (see MemTracker::GetPoolMemReserved()) without actually being throttled.
  # sys.maxsize is used because sys.maxint does not exist on Python 3.
  self.run_admission_test(vector, {'request_pool': self.pool_name,
      'mem_limit': sys.maxsize})
@pytest.mark.execute_serially
@SkipIfOS.redhat6
@CustomClusterTestSuite.with_args(
    impalad_args=impalad_admission_ctrl_config_args(
      fs_allocation_file="fair-scheduler-test2.xml",
      llama_site_file="llama-site-test2.xml"),
    statestored_args=_STATESTORED_ARGS)
def test_admission_controller_with_configs(self, vector):
  """Runs the stress scenario against pool 'root.queueB', with the cluster configured
  through the fair-scheduler and llama-site test files."""
  pool = 'root.queueB'
  self.pool_name = pool
  query_options = dict(request_pool=pool)
  self.run_admission_test(vector, query_options)
def get_proc_limit(self):
  """Gets the process mem limit as reported by the impalad's mem-tracker metric.
  Raises an assertion if not all impalads have the same value, if the value is None,
  or if the cluster has no impalads."""
  limit_metrics = [
      impalad.service.get_metric_value("mem-tracker.process.limit")
      for impalad in self.cluster.impalads]
  assert limit_metrics, "No impalads to read the process limit from"
  # Compare every impalad against the first one; checking only the first and last
  # entries would miss a differing value in the middle.
  assert all(limit == limit_metrics[0] for limit in limit_metrics),\
      "Not all impalads have the same process limit: %s" % (limit_metrics,)
  assert limit_metrics[0] is not None
  return limit_metrics[0]
@pytest.mark.execute_serially
@SkipIfOS.redhat6
@CustomClusterTestSuite.with_args(
    impalad_args=impalad_admission_ctrl_flags(
      max_requests=MAX_NUM_CONCURRENT_QUERIES * 30, max_queued=MAX_NUM_QUEUED_QUERIES,
      pool_max_mem=MEM_TEST_LIMIT, proc_mem_limit=MEM_TEST_LIMIT,
      queue_wait_timeout_ms=600000),
    statestored_args=_STATESTORED_ARGS)
def test_mem_limit(self, vector):
  """Runs the stress scenario against 'default-pool' with a per-query mem_limit sized
  so that queries are queued or rejected because of the pool memory limit."""
  # Impala may set the proc mem limit lower than we think depending on the overcommit
  # settings of the OS. It should be fine to continue anyway.
  proc_limit = self.get_proc_limit()
  if proc_limit != MEM_TEST_LIMIT:
    LOG.info("Warning: Process mem limit %s is not expected val %s", proc_limit,
        MEM_TEST_LIMIT)

  self.pool_name = 'default-pool'
  # Each query mem limit (set the query option to override the per-host memory
  # estimate) should use a bit less than (total pool mem limit) / #queries so that
  # once #queries are running, the total pool mem usage is about at the limit and
  # additional incoming requests will be rejected. The actual pool limit on the number
  # of running requests is very high so that requests are only queued/rejected due to
  # the mem limit.
  num_impalads = len(self.cluster.impalads)
  # Floor division keeps the limit an integer on Python 3 as well; on Python 2 it is
  # identical to '/' for integer operands.
  query_mem_limit = (proc_limit // MAX_NUM_CONCURRENT_QUERIES // num_impalads) - 1
  self.run_admission_test(vector,
      {'request_pool': self.pool_name, 'mem_limit': query_mem_limit})