blob: 1d14c97900bab09dda8deaf5e3325e52180fe146 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
import re
from getpass import getuser
from signal import SIGRTMIN
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.impala_cluster import DEFAULT_KRPC_PORT
from tests.common.impala_connection import ERROR, FINISHED, PENDING
from tests.common.test_vector import HS2
from tests.util.workload_management import assert_query, redaction_rules_file
from time import sleep
class TestQueryLive(CustomClusterTestSuite):
"""Tests to assert the query live table is correctly populated.
This test class does not extend from WorkloadManagementTestSuite due to long
--query_log_write_interval_s requirement in most test method."""
  @classmethod
  def default_test_protocol(cls):
    """Run these tests over the HS2 protocol by default."""
    return HS2
  def setup_method(self, method):
    """Per-test setup: starts the custom cluster, then blocks until workload
    management initialization has completed so sys.impala_query_live is queryable."""
    super(TestQueryLive, self).setup_method(method)
    self.wait_for_wm_init_complete()
def assert_describe_extended(self):
describe_ext_result = self.execute_query('describe extended sys.impala_query_live')
# Alter can add additional event fields. Filter them out.
describe_ext_data = [
line for line in describe_ext_result.data if 'impala.events.catalog' not in line]
assert len(describe_ext_data) == 88
system_table_re = re.compile(r'__IMPALA_SYSTEM_TABLE\s+true')
assert list(filter(system_table_re.search, describe_ext_data))
external_re = re.compile(r'EXTERNAL\s+TRUE')
assert list(filter(external_re.search, describe_ext_data))
external_table_re = re.compile(r'Table Type:\s+EXTERNAL_TABLE')
assert list(filter(external_table_re.search, describe_ext_data))
def assert_impalads(self, profile, present=[0, 1, 2], absent=[]):
for port_idx in present:
assert ":" + str(DEFAULT_KRPC_PORT + port_idx) + ":" in profile
for port_idx in absent:
assert ":" + str(DEFAULT_KRPC_PORT + port_idx) not in profile
def assert_only_coordinators(self, profile, coords=[0, 1], execs=[2]):
self.assert_impalads(profile, coords, execs)
assert "SYSTEM_TABLE_SCAN_NODE (id=0) [{} instances]".format(len(coords)) in profile
def assert_fragment_instances(self, profile, expected):
"""Asserts that the per host number of fragment instances is as expected."""
hosts = ['{}({})'.format(DEFAULT_KRPC_PORT + i, expect)
for i, expect in enumerate(expected)]
actual_hosts = re.search(r'Per Host Number of Fragment Instances: (.*)', profile)
assert actual_hosts is not None
# Split and remove hostname
actual_hosts = [host.split(':')[1] for host in actual_hosts.group(1).split(' ')]
assert len(hosts) == len(actual_hosts)
for host in hosts:
if host in actual_hosts:
actual_hosts.remove(host)
else:
assert False, "did not find host {}".format(host)
assert len(actual_hosts) == 0, "did not find all expected hosts"
  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live",
                                    workload_mgmt=True,
                                    disable_log_buffering=True)
  def test_query_live_hs2(self):
    """Asserts the query live table shows and allows filtering queries. Uses the hs2
    client to connect to Impala."""
    # Use a query that reads data from disk for the 1st one, as more representative and a
    # better fit for assert_query.
    result1 = self.hs2_client.execute("select * from functional.alltypes",
                                      fetch_profile_after_close=True)
    # Compare the query's row in sys.impala_query_live against its runtime profile.
    assert_query('sys.impala_query_live', self.hs2_client, 'test_query_live',
                 result1.runtime_profile)
  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live",
                                    workload_mgmt=True,
                                    disable_log_buffering=True)
  def test_query_live(self):
    """Asserts the query live table shows and allows filtering queries. Uses the default
    client to connect to Impala.

    NOTE: the row-count assertions below depend on the exact number of queries run so
    far in this test method (including the in-flight query itself, and the query that
    assert_query presumably issues); do not add or reorder statements without updating
    them."""
    # Use a query that reads data from disk for the 1st one, as more representative and a
    # better fit for assert_query.
    result1 = self.client.execute("select * from functional.alltypes",
                                  fetch_profile_after_close=True)
    assert_query('sys.impala_query_live', self.client, 'test_query_live',
                 result1.runtime_profile)
    # pending queries
    result2 = self.execute_query(
        "select query_id, cluster_id from sys.impala_query_live order by start_time_utc")
    assert len(result2.data) == 3
    assert "{}\t{}".format(result1.query_id, "test_query_live") == result2.data[0]
    # The in-flight query sees itself in the live table.
    assert "{}\t{}".format(result2.query_id, "test_query_live") == result2.data[2]
    # Expect new metrics for the impala_query_live table scanner.
    assert "ActiveQueryCollectionTime: " in result2.runtime_profile
    assert "PendingQueryCollectionTime: " in result2.runtime_profile
    # query filtering
    result3 = self.execute_query(
        "select query_id from sys.impala_query_live "
        "where total_time_ms > 0.0 order by start_time_utc")
    assert len(result3.data) == 4
    assert result1.query_id == result3.data[0]
    assert result2.query_id == result3.data[2]
    assert result3.query_id == result3.data[3]
    result4 = self.execute_query(
        "select db_name, db_user, count(*) as query_count from sys.impala_query_live "
        "group by db_name, db_user order by db_name")
    assert len(result4.data) == 1
    assert "default\t{}\t5".format(getuser()) == result4.data[0]
    # Filtering on a non-matching cluster_id returns nothing.
    result5 = self.execute_query(
        'select * from sys.impala_query_live where cluster_id = "test_query_live_0"')
    assert len(result5.data) == 0
    # Run a multi-table join so tables_queried has several entries.
    result = self.execute_query("""
      select count(*) from functional.alltypestiny a
      inner join functional.alltypes b on a.id = b.id
      inner join functional.alltypessmall c on b.id = c.id
      """)
    # NOTE: 'result5' is reused here for the tables_queried lookup.
    result5 = self.execute_query(
        'select tables_queried from sys.impala_query_live where query_id = "'
        + result.query_id + '"')
    assert len(result5.data) == 1
    assert result5.data[0] == \
        "functional.alltypes,functional.alltypestiny,functional.alltypessmall"
    # describe query
    describe_result = self.execute_query('describe sys.impala_query_live')
    assert len(describe_result.data) == 56
    self.assert_describe_extended()
    # show create table
    show_create_tbl = self.execute_query('show create table sys.impala_query_live')
    assert len(show_create_tbl.data) == 1
    assert 'CREATE EXTERNAL TABLE sys.impala_query_live' in show_create_tbl.data[0]
    assert "'__IMPALA_SYSTEM_TABLE'='true'" in show_create_tbl.data[0]
    # cannot compute stats or perform write operations
    compute_stats_result = self.execute_query_expect_failure(self.client,
        'compute stats sys.impala_query_live')
    assert 'AnalysisException: COMPUTE STATS not supported for system table: '\
        'sys.impala_query_live' in str(compute_stats_result)
    create_result = self.execute_query_expect_failure(self.client,
        'create table sys.impala_query_live (i int)')
    assert 'AnalysisException: Table already exists: sys.impala_query_live'\
        in str(create_result)
    insert_result = self.execute_query_expect_failure(self.client,
        'insert into sys.impala_query_live select * from sys.impala_query_live limit 1')
    assert 'UnsupportedOperationException: Cannot create data sink into table of type: '\
        'org.apache.impala.catalog' in str(insert_result)
    assert 'SystemTable' in str(insert_result)
    update_result = self.execute_query_expect_failure(self.client,
        'update sys.impala_query_live set query_id = ""')
    assert 'AnalysisException: Impala only supports modifying Kudu and Iceberg tables, '\
        'but the following table is neither: sys.impala_query_live'\
        in str(update_result)
    delete_result = self.execute_query_expect_failure(self.client,
        'delete from sys.impala_query_live')
    assert 'AnalysisException: Impala only supports modifying Kudu and Iceberg tables, '\
        'but the following table is neither: sys.impala_query_live'\
        in str(delete_result)
    # Drop table at the end, it's only recreated on impalad startup.
    self.execute_query_expect_success(self.client, 'drop table sys.impala_query_live')
  # Must come directly after "drop table sys.impala_query_live" (test_query_live drops
  # the table, and it is only recreated on impalad startup).
  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live "
                                                 "--use_local_catalog=true",
                                    catalogd_args="--catalog_topic_mode=minimal",
                                    workload_mgmt=True,
                                    default_query_options=[
                                        ('default_transactional_type', 'insert_only')],
                                    disable_log_buffering=True)
  def test_default_transactional(self):
    """Asserts the query live table works when impala is started with
    default_transactional_type=insert_only. assert_describe_extended verifies the
    recreated table is still an external system table."""
    result = self.client.execute("select * from functional.alltypes",
                                 fetch_profile_after_close=True)
    assert_query('sys.impala_query_live', self.client, 'test_query_live',
                 result.runtime_profile)
    self.assert_describe_extended()
  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live "
                                                 "--use_local_catalog=true",
                                    catalogd_args="--catalog_topic_mode=minimal",
                                    workload_mgmt=True,
                                    disable_log_buffering=True)
  def test_local_catalog(self):
    """Asserts the query live table works with local catalog mode."""
    # Compare the query's row in sys.impala_query_live against its runtime profile.
    result = self.client.execute("select * from functional.alltypes",
                                 fetch_profile_after_close=True)
    assert_query('sys.impala_query_live', self.client, 'test_query_live',
                 result.runtime_profile)
  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live "
                                                 "--redaction_rules_file={}"
                                                 .format(redaction_rules_file()),
                                    workload_mgmt=True,
                                    disable_log_buffering=True)
  def test_redaction(self):
    """Asserts the query live table redacts the statement."""
    # The distinctive literal gives the redaction rules something to match; assert_query
    # then compares the statement stored in the table against the runtime profile.
    result = self.client.execute(
        "select *, 'supercalifragilisticexpialidocious' from functional.alltypes",
        fetch_profile_after_close=True)
    assert_query('sys.impala_query_live', self.client, 'test_query_live',
                 result.runtime_profile)
  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live",
                                    workload_mgmt=True,
                                    disable_log_buffering=True)
  def test_alter(self):
    """Asserts alter works on query live table: a column can be added (reading as NULL
    since nothing populates it), triggers an unknown-column warning, and can be dropped
    again, restoring the original 56-column schema."""
    column_desc = 'test_alter\tstring\t'
    add_column = self.execute_query(
        'alter table sys.impala_query_live add columns(test_alter string)')
    assert add_column.data == ['New column(s) have been added to the table.']
    try:
      describe_column = self.execute_query('describe sys.impala_query_live')
      assert len(describe_column.data) == 57
      assert column_desc in describe_column.data
      # The added column has no backing data, so it reads as NULL.
      select_column = self.execute_query(
          'select test_alter from sys.impala_query_live limit 1')
      assert select_column.data == ['NULL']
      self.assert_impalad_log_contains('WARNING', r'Unknown column \(position 56\)'
          + ' added to table IMPALA_QUERY_LIVE; check if a coordinator was upgraded')
    finally:
      # Ensure new column is dropped in case of test failure
      drop_column = self.execute_query(
          'alter table sys.impala_query_live drop test_alter')
      assert drop_column.data == ['Column has been dropped.']
      describe_column2 = self.execute_query('describe sys.impala_query_live')
      assert len(describe_column2.data) == 56
      assert column_desc not in describe_column2.data
      # The table remains queryable after the add/drop round trip.
      select_column2 = self.execute_query('select * from sys.impala_query_live')
      assert len(select_column2.data) > 1
  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live",
                                    workload_mgmt=True,
                                    cluster_size=3,
                                    num_exclusive_coordinators=2,
                                    disable_log_buffering=True)
  def test_dedicated_coordinators(self):
    """Asserts scans are performed only on coordinators."""
    # Use a query that reads data from disk for the 1st one, as more representative and a
    # better fit for assert_query.
    result = self.client.execute("select * from functional.alltypes",
                                 fetch_profile_after_close=True)
    assert_query('sys.impala_query_live', self.client, 'test_query_live',
                 result.runtime_profile)
    client2 = self.create_client_for_nth_impalad(1)
    query = "select query_id, impala_coordinator from sys.impala_query_live " \
        "order by start_time_utc"
    # Run the same select concurrently from both coordinators so each sees the other.
    handle1 = self.execute_query_async(query)
    handle2 = client2.execute_async(query)
    result1 = self.client.fetch(query, handle1)
    result2 = client2.fetch(query, handle2)
    assert len(result1.data) == 4
    # Both coordinators must return an identical view of the live queries.
    assert result1.data == result2.data
    # Neither query's scan may be scheduled on the executor.
    profile1 = self.client.get_runtime_profile(handle1)
    self.assert_only_coordinators(profile1)
    profile2 = client2.get_runtime_profile(handle2)
    self.assert_only_coordinators(profile2)
    self.close_query(handle1)
    client2.close_query(handle2)
    client2.close()
  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live",
                                    workload_mgmt=True,
                                    cluster_size=3,
                                    num_exclusive_coordinators=2,
                                    disable_log_buffering=True)
  def test_executor_groups(self):
    """Asserts scans are performed only on coordinators with multiple executor groups."""
    # Add a (non-dedicated) coordinator and executor in a different executor group.
    self._start_impala_cluster(options=['--impalad_args=--executor_groups=extra '
                                        '--enable_workload_mgmt '
                                        '--query_log_write_interval_s=300 '
                                        '--shutdown_grace_period_s=0 '
                                        '--shutdown_deadline_s=15 '
                                        '--cluster_id=test_query_live '],
                               cluster_size=1,
                               add_executors=True,
                               expected_num_impalads=4)
    result = self.client.execute(
        "select query_id, impala_coordinator from sys.impala_query_live",
        fetch_profile_after_close=True)
    assert len(result.data) == 1
    # Scan must run on the coordinators (0, 1) and on neither executor (2, 3),
    # regardless of executor group.
    self.assert_only_coordinators(result.runtime_profile, coords=[0, 1], execs=[2, 3])
  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live",
                                    workload_mgmt=True,
                                    disable_log_buffering=True)
  def test_query_entries_are_unique(self):
    """Asserts queries in the query live table are unique."""
    # Start a query and close it with a delay between CloseClientRequestState and
    # Unregister. Then query sys.impala_query_live from multiple coordinators.
    client2 = self.create_client_for_nth_impalad(1)
    async_handle = self.execute_query_async("select count(*) from functional.alltypes",
        {"debug_action": "CLOSED_NOT_UNREGISTERED:SLEEP@1000"})
    self.close_query(async_handle)
    query = "select * from sys.impala_query_live order by start_time_utc"
    handle = self.execute_query_async(query)
    handle2 = client2.execute_async(query)
    result = self.client.fetch(query, handle)
    result2 = client2.fetch(query, handle2)
    # 3 rows: the closed query plus the two concurrent selects; the
    # closed-but-not-unregistered query must not appear twice.
    assert len(result.data) == 3
    assert result.data[0] == result2.data[0]
    def remove_dynamic_fields(fields):
      """Drops the columns of a row that change while a query runs."""
      # Excludes QUERY_STATE, IMPALA_QUERY_END_STATE, QUERY_TYPE, TOTAL_TIME_MS, and
      # everything after QUERY_OPTS_CONFIG as they change over the course of compiling
      # and running the query.
      return fields[:10] + fields[13:15] + fields[16:17]
    # Compare cluster_id and query_id. Not all fields will match as they're queried on a
    # live query at different times. Order is only guaranteed with minicluster on a
    # single machine, where they use the same clock.
    data1a = remove_dynamic_fields(result.data[1].split('\t'))
    data1b = remove_dynamic_fields(result.data[2].split('\t'))
    data2a = remove_dynamic_fields(result2.data[1].split('\t'))
    data2b = remove_dynamic_fields(result2.data[2].split('\t'))
    assert data1a == data2a
    assert data1b == data2b
    self.close_query(handle)
    client2.close_query(handle2)
    client2.close()
  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live ",
                                    workload_mgmt=True,
                                    cluster_size=3,
                                    num_exclusive_coordinators=2,
                                    disable_log_buffering=True)
  def test_missing_coordinator(self):
    """Asserts scans finish if a coordinator disappears mid-schedule. Depends on
    test config of statestore_heartbeat_frequency_ms=50."""
    query = "select query_id, impala_coordinator from sys.impala_query_live"
    # Delay admission so the second coordinator can be killed before scheduling.
    handle = self.execute_query_async(query, query_options={
        'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@3000'})
    # Wait for query to compile and assign ranges, then kill impalad
    # in non-graceful manner during debug delay.
    self.client.wait_for_impala_state(handle, PENDING, 3)
    self.cluster.impalads[1].kill()
    # Wait for query to pass admission control before fetching.
    self.client.wait_for_any_impala_state(handle, [FINISHED, ERROR], 60)
    result = self.client.fetch(query, handle)
    # Only this query's own row comes back; the scan still finishes.
    assert len(result.data) == 1
    # Both the log and the profile record that the dead coordinator was skipped.
    expected_message = 'is no longer available for system table scan assignment'
    self.assert_impalad_log_contains('WARNING', expected_message)
    assert expected_message in self.client.get_runtime_profile(handle)
    self.close_query(handle)
  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live",
                                    workload_mgmt=True,
                                    disable_log_buffering=True)
  def test_shutdown_coordinator(self):
    """Asserts query fails if a coordinator disappears after scheduling. Depends on
    test config of statestore_heartbeat_frequency_ms=50."""
    query = "select query_id, impala_coordinator from sys.impala_query_live"
    handle = self.execute_query_async(query, query_options={
        'debug_action': 'CRS_BEFORE_COORD_STARTS:SLEEP@3000'})
    # Wait for query to compile.
    self.client.wait_for_impala_state(handle, PENDING, 3)
    # Ensure enough time for scheduling to assign ranges.
    sleep(1)
    # Kill impalad in non-graceful manner during debug delay.
    self.cluster.impalads[1].kill()
    try:
      # Wait for query to pass admission control before fetching.
      self.client.wait_for_any_impala_state(handle, [FINISHED, ERROR], 60)
      self.client.fetch(query, handle)
      assert False, "fetch should fail"
    except Exception as e:
      # The scan was already assigned to the killed impalad, so the RPC fails.
      assert "Network error: Client connection negotiation failed" in str(e)
    # client closes the query on failure.
@CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
"--cluster_id=test_query_live "
# Override flag set by
# workload_mgmt arg.
"--shutdown_grace_period_s=120",
workload_mgmt=True,
disable_log_buffering=True)
def test_graceful_shutdown_coordinator(self):
"""Asserts query succeeds if another coordinator is shutdown gracefully after
scheduling. Depends on test config of statestore_heartbeat_frequency_ms=50."""
query = "select query_id from sys.impala_query_live"
handle = self.execute_query_async(query, query_options={
'debug_action': 'CRS_BEFORE_COORD_STARTS:SLEEP@3000'})
# Wait for query to compile and assign ranges, then gracefully shutdown impalad.
self.client.wait_for_impala_state(handle, PENDING, 3)
self.cluster.impalads[1].kill(SIGRTMIN)
self.client.wait_for_impala_state(handle, FINISHED, 10)
# Allow time for statestore update to propagate. Shutdown grace period is 120s.
sleep(1)
# Coordinator in graceful shutdown should not be scheduled in new queries.
shutdown = self.execute_query(query)
result = self.client.fetch(query, handle)
assert len(result.data) == 1
assert result.query_id == result.data[0]
self.client.get_runtime_profile(handle)
self.assert_impalads(self.client.get_runtime_profile(handle))
self.close_query(handle)
assert len(shutdown.data) == 2
self.assert_impalads(shutdown.runtime_profile, present=[0, 2], absent=[1])
  @CustomClusterTestSuite.with_args(impalad_args="--cluster_id=test_query_live ",
                                    workload_mgmt=True,
                                    cluster_size=3,
                                    num_exclusive_coordinators=2,
                                    disable_log_buffering=True)
  def test_multi_table_union(self):
    """Asserts only system table scan fragments are scheduled to coordinators."""
    utc_timestamp = self.execute_query('select utc_timestamp()')
    assert len(utc_timestamp.data) == 1
    start_time = utc_timestamp.data[0]
    # Wait for the query to be written to the log to ensure there's something in it.
    logged = self.execute_query_expect_success(self.client,
        'select count(*) from functional.alltypes')
    self.cluster.get_first_impalad().service.wait_for_metric_value(
        "impala-server.completed-queries.written", 2, 30, allow_greater=True)
    # Union live and logged queries, restricted to those started by this test.
    query = """select query_id from
        (select query_id, start_time_utc from sys.impala_query_live live
          where start_time_utc > "{0}" union
          select query_id, start_time_utc from sys.impala_query_log
          where start_time_utc > "{0}")
        as history order by start_time_utc""".format(start_time)
    result = self.client.execute(query, fetch_profile_after_close=True)
    # Exactly 3 queries started after start_time: the timestamp query, the logged
    # count(*), and this union query itself.
    assert len(result.data) == 3
    assert utc_timestamp.query_id == result.data[0]
    assert logged.query_id == result.data[1]
    assert result.query_id == result.data[2]
    # Unions run in a single fragment, and on the union of selected scan nodes (even
    # though some may not have actual scan ranges to evaluate). So all nodes are involved
    # in scans.
    self.assert_fragment_instances(result.runtime_profile, [3, 2, 2])
  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live ",
                                    workload_mgmt=True,
                                    cluster_size=3,
                                    num_exclusive_coordinators=2,
                                    disable_log_buffering=True)
  def test_multi_table_join(self, unique_database):
    """Asserts only system table scan fragments are scheduled to coordinators."""
    self.execute_query('create table {}.users (user string)'.format(unique_database))
    self.execute_query('insert into {}.users values ("alice"), ("bob"), ("{}")'
        .format(unique_database, getuser()))
    result = self.client.execute('select straight_join count(*) from {}.users, '
        'sys.impala_query_live where user = db_user and start_time_utc > utc_timestamp()'
        .format(unique_database), fetch_profile_after_close=True)
    assert len(result.data) == 1
    # Exactly one match: the currently-running join query itself, whose db_user is the
    # getuser() row inserted above.
    assert '1' == result.data[0]
    # HDFS scan runs on 1 node, System Table scan on 2
    assert 2 == result.runtime_profile.count('HDFS_SCAN_NODE')
    assert 3 == result.runtime_profile.count('SYSTEM_TABLE_SCAN_NODE')
    # impala_query_live is assigned to build side, so executor has HDFS scan fragment and
    # aggregation, coordinators have System Table scan fragments, and initial coordinator
    # has the root fragment.
    self.assert_fragment_instances(result.runtime_profile, [2, 1, 1])