| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| from collections import defaultdict |
| from datetime import datetime |
| from tests.beeswax.impala_beeswax import ImpalaBeeswaxException |
| from tests.common.impala_test_suite import ImpalaTestSuite |
| from tests.common.skip import (SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, |
| SkipIfLocal, SkipIfNotHdfsMinicluster) |
| from tests.util.filesystem_utils import IS_EC |
| from time import sleep |
| from RuntimeProfile.ttypes import TRuntimeProfileFormat |
| import pytest |
| import re |
| |
| |
| class TestObservability(ImpalaTestSuite): |
| @classmethod |
| def get_workload(cls): |
| return 'functional-query' |
| |
| def test_merge_exchange_num_rows(self): |
| """Regression test for IMPALA-1473 - checks that the exec summary for a merging |
| exchange with a limit reports the number of rows returned as equal to the limit, |
| and that the coordinator fragment portion of the runtime profile reports the number |
| of rows returned correctly.""" |
| query = """select tinyint_col, count(*) from functional.alltypes |
| group by tinyint_col order by tinyint_col limit 5""" |
| result = self.execute_query(query) |
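| # Index 1 of the exec summary is expected to be the topmost merging exchange; the |
| # assertions below verify the operator name and row counts. |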
| exchange = result.exec_summary[1] |
| assert exchange['operator'] == '05:MERGING-EXCHANGE' |
| assert exchange['num_rows'] == 5 |
| assert exchange['est_num_rows'] == 5 |
| assert exchange['peak_mem'] > 0 |
| |
| for line in result.runtime_profile.split('\n'): |
| # The first 'RowsProduced' we find is for the coordinator fragment. |
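| # The counter line includes the raw row count in parentheses, e.g. 'RowsProduced: 5 (5)'. |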
| if 'RowsProduced' in line: |
| assert '(5)' in line |
| break |
| |
| def test_broadcast_num_rows(self): |
| """Regression test for IMPALA-3002 - checks that the num_rows for a broadcast node |
| in the exec summary is correctly set as the max over all instances, not the sum.""" |
| query = """select distinct a.int_col, a.string_col from functional.alltypes a |
| inner join functional.alltypessmall b on (a.id = b.id) |
| where a.year = 2009 and b.month = 2""" |
| result = self.execute_query(query) |
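| # Index 8 is expected to be the broadcast exchange; the assertions below verify the |
| # operator name and that num_rows reflects the per-instance maximum, not the sum. |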
| exchange = result.exec_summary[8] |
| assert exchange['operator'] == '04:EXCHANGE' |
| assert exchange['num_rows'] == 25 |
| assert exchange['est_num_rows'] == 25 |
| assert exchange['peak_mem'] > 0 |
| |
| def test_report_time(self): |
| """ Regression test for IMPALA-6741 - checks that last reporting time exists in |
| profiles of fragment instances.""" |
| query = """select count(distinct a.int_col) from functional.alltypes a |
| inner join functional.alltypessmall b on (a.id = b.id + cast(sleep(15) as INT))""" |
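| # The sleep() in the join condition keeps the query running long enough for the |
| # polling loop below to observe intermediate status reports. |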
| handle = self.hs2_client.execute_async(query) |
| |
| num_validated = 0 |
| tree = self.hs2_client.get_runtime_profile(handle, TRuntimeProfileFormat.THRIFT) |
| while not self.hs2_client.state_is_finished(handle): |
| assert tree, num_validated |
| for node in tree.nodes: |
| if node.name.startswith('Instance '): |
| info_strings_key = 'Last report received time' |
| assert info_strings_key in node.info_strings |
| report_time_str = node.info_strings[info_strings_key].split(".")[0] |
| # Try converting the string to make sure it's in the expected format |
| assert datetime.strptime(report_time_str, '%Y-%m-%d %H:%M:%S') |
| num_validated += 1 |
| tree = self.hs2_client.get_runtime_profile(handle, TRuntimeProfileFormat.THRIFT) |
| # Let's not hit the backend too hard |
| sleep(0.1) |
| assert num_validated > 0 |
| self.hs2_client.close_query(handle) |
| |
| @SkipIfS3.hbase |
| @SkipIfLocal.hbase |
| @SkipIfIsilon.hbase |
| @SkipIfABFS.hbase |
| @SkipIfADLS.hbase |
| def test_scan_summary(self): |
| """IMPALA-4499: Checks that the exec summary for scans show the table name.""" |
| # HDFS table |
| query = "select count(*) from functional.alltypestiny" |
| result = self.execute_query(query) |
| scan_idx = len(result.exec_summary) - 1 |
| assert result.exec_summary[scan_idx]['operator'] == '00:SCAN HDFS' |
| assert result.exec_summary[scan_idx]['detail'] == 'functional.alltypestiny' |
| |
| # KUDU table |
| query = "select count(*) from functional_kudu.alltypestiny" |
| result = self.execute_query(query) |
| scan_idx = len(result.exec_summary) - 1 |
| assert result.exec_summary[scan_idx]['operator'] == '00:SCAN KUDU' |
| assert result.exec_summary[scan_idx]['detail'] == 'functional_kudu.alltypestiny' |
| |
| # HBASE table |
| query = "select count(*) from functional_hbase.alltypestiny" |
| result = self.execute_query(query) |
| scan_idx = len(result.exec_summary) - 1 |
| assert result.exec_summary[scan_idx]['operator'] == '00:SCAN HBASE' |
| assert result.exec_summary[scan_idx]['detail'] == 'functional_hbase.alltypestiny' |
| |
| def test_sink_summary(self, unique_database): |
| """IMPALA-1048: Checks that the exec summary contains sinks.""" |
| # SELECT query. |
| query = "select count(*) from functional.alltypes" |
| result = self.execute_query(query) |
| # Sanity-check the root sink. |
| root_sink = result.exec_summary[0] |
| assert root_sink['operator'] == 'F01:ROOT' |
| assert root_sink['max_time'] >= 0 |
| assert root_sink['num_rows'] == -1 |
| assert root_sink['est_num_rows'] == -1 |
| assert root_sink['peak_mem'] >= 0 |
| assert root_sink['est_peak_mem'] >= 0 |
| # Sanity-check the exchange sink. |
| found_exchange_sender = False |
| for row in result.exec_summary[1:]: |
| if 'EXCHANGE SENDER' not in row['operator']: |
| continue |
| found_exchange_sender = True |
| assert re.match("F[0-9]+:EXCHANGE SENDER", row['operator']) |
| assert row['max_time'] >= 0 |
| assert row['num_rows'] == -1 |
| assert row['est_num_rows'] == -1 |
| assert row['peak_mem'] >= 0 |
| assert row['est_peak_mem'] >= 0 |
| assert found_exchange_sender, result |
| |
| # INSERT query. |
| query = "create table {0}.tmp as select count(*) from functional.alltypes".format( |
| unique_database) |
| result = self.execute_query(query) |
| # Sanity-check the HDFS writer sink. |
| assert result.exec_summary[0]['operator'] == 'F01:HDFS WRITER' |
| assert result.exec_summary[0]['max_time'] >= 0 |
| assert result.exec_summary[0]['num_rows'] == -1 |
| assert result.exec_summary[0]['est_num_rows'] == -1 |
| assert result.exec_summary[0]['peak_mem'] >= 0 |
| assert result.exec_summary[0]['est_peak_mem'] >= 0 |
| |
| def test_query_options(self): |
| """Test that the query profile shows expected non-default query options, both set |
| explicitly through client and those set by planner""" |
| # Set mem_limit and runtime_filter_wait_time_ms to non-default and default value. |
| query_opts = {'mem_limit': 8589934592, 'runtime_filter_wait_time_ms': 0} |
| profile = self.execute_query("select 1", query_opts).runtime_profile |
| assert "Query Options (set by configuration): MEM_LIMIT=8589934592" in profile,\ |
| profile |
| assert "CLIENT_IDENTIFIER=" + \ |
| "query_test/test_observability.py::TestObservability::()::test_query_options" \ |
| in profile |
| # Get the TIMEZONE value. |
| server_timezone = None |
| for row in self.execute_query("set", query_opts).data: |
| name, val, _ = row.split("\t") |
| if name == "TIMEZONE": |
| server_timezone = val |
| break |
| assert server_timezone is not None |
| |
| # For this query, the planner sets NUM_NODES=1, NUM_SCANNER_THREADS=1, |
| # RUNTIME_FILTER_MODE=0 and MT_DOP=0 |
| expected_str = ("Query Options (set by configuration and planner): " |
| "MEM_LIMIT=8589934592," |
| "NUM_NODES=1,NUM_SCANNER_THREADS=1," |
| "RUNTIME_FILTER_MODE=0,MT_DOP=0,{erasure_coding}TIMEZONE={timezone}," |
| "CLIENT_IDENTIFIER=" |
| "query_test/test_observability.py::TestObservability::()::test_query_options" |
| "\n") |
| expected_str = expected_str.format( |
| erasure_coding="ALLOW_ERASURE_CODED_FILES=1," if IS_EC else "", |
| timezone=server_timezone) |
| assert expected_str in profile, profile |
| |
| def test_exec_summary(self): |
| """Test that the exec summary is populated correctly in every query state""" |
| query = "select count(*) from functional.alltypes" |
| handle = self.execute_query_async(query, |
| {"debug_action": "CRS_BEFORE_ADMISSION:SLEEP@1000"}) |
| # If ExecuteStatement() has completed and the query is paused in the admission control |
| # phase, then the coordinator has not started yet and exec_summary should be empty. |
| exec_summary = self.client.get_exec_summary(handle) |
| assert exec_summary is not None and exec_summary.nodes is None |
| # After completion of the admission control phase, the coordinator would have started |
| # and we should get a populated exec_summary. |
| self.client.wait_for_admission_control(handle) |
| exec_summary = self.client.get_exec_summary(handle) |
| assert exec_summary is not None and exec_summary.nodes is not None |
| |
| self.client.fetch(query, handle) |
| exec_summary = self.client.get_exec_summary(handle) |
| # After fetching the results and reaching finished state, we should still be able to |
| # fetch an exec_summary. |
| assert exec_summary is not None and exec_summary.nodes is not None |
| |
| def test_exec_summary_in_runtime_profile(self): |
| """Test that the exec summary is populated in runtime profile correctly in every |
| query state""" |
| query = "select count(*) from functional.alltypes" |
| handle = self.execute_query_async(query, |
| {"debug_action": "CRS_BEFORE_ADMISSION:SLEEP@1000"}) |
| |
| # If ExecuteStatement() has completed and the query is paused in the admission control |
| # phase, then the coordinator has not started yet and exec_summary should be empty. |
| profile = self.client.get_runtime_profile(handle) |
| assert "ExecSummary:" not in profile, profile |
| # After completion of the admission control phase, the coordinator would have started |
| # and we should get a populated exec_summary. |
| self.client.wait_for_admission_control(handle) |
| profile = self.client.get_runtime_profile(handle) |
| assert "ExecSummary:" in profile, profile |
| |
| self.client.fetch(query, handle) |
| # After fetching the results and reaching finished state, we should still be able to |
| # fetch an exec_summary in profile. |
| profile = self.client.get_runtime_profile(handle) |
| assert "ExecSummary:" in profile, profile |
| |
| @SkipIfLocal.multiple_impalad |
| def test_profile_fragment_instances(self): |
| """IMPALA-6081: Test that the expected number of fragment instances and their exec |
| nodes appear in the runtime profile, even when fragments may be quickly cancelled when |
| all results are already returned.""" |
| results = self.execute_query(""" |
| with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem) |
| select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a |
| join (select * from l LIMIT 2000000) b on a.l_orderkey = -b.l_orderkey;""") |
| # There are 3 scan nodes and each appears in the profile n+1 times (for n fragment |
| # instances + the averaged fragment). n depends on how the data is loaded and on the |
| # scheduler's decisions. |
| n = results.runtime_profile.count("HDFS_SCAN_NODE") |
| assert n > 0 and n % 3 == 0 |
| # There are 3 exchange nodes and each appears in the profile 2 times (for 1 fragment |
| # instance + the averaged fragment). |
| assert results.runtime_profile.count("EXCHANGE_NODE") == 6 |
| # The following appear only in the root fragment which has 1 instance. |
| assert results.runtime_profile.count("HASH_JOIN_NODE") == 2 |
| assert results.runtime_profile.count("AGGREGATION_NODE") == 2 |
| assert results.runtime_profile.count("PLAN_ROOT_SINK") == 2 |
| |
| def test_query_profile_contains_query_compilation_static_events(self): |
| """Test that the expected events show up in a query profile. These lines are static |
| and should appear in this exact order.""" |
| event_regexes = [ |
| r'Analysis finished:', |
| r'Authorization finished (.*):', |
| r'Value transfer graph computed:', |
| r'Single node plan created:', |
| r'Runtime filters computed:', |
| r'Distributed plan created:'] |
| query = "select * from functional.alltypes" |
| runtime_profile = self.execute_query(query).runtime_profile |
| self.__verify_profile_event_sequence(event_regexes, runtime_profile) |
| |
| def test_query_profile_contains_query_compilation_metadata_load_events(self, |
| cluster_properties): |
| """Test that the Metadata load started and finished events appear in the query |
| profile when the table's metadata has been evicted from the catalog cache.""" |
| invalidate_query = "invalidate metadata functional.alltypes" |
| select_query = "select * from functional.alltypes" |
| self.execute_query(invalidate_query).runtime_profile |
| runtime_profile = self.execute_query(select_query).runtime_profile |
| # Depending on whether this is a catalog-v2 cluster or not, some of the metadata |
| # loading events differ. |
| if not cluster_properties.is_catalog_v2_cluster(): |
| load_event_regexes = [r'Query Compilation:', r'Metadata load started:', |
| r'Metadata load finished. loaded-tables=.*/.* load-requests=.* ' |
| r'catalog-updates=.*:', |
| r'Analysis finished:'] |
| else: |
| load_event_regexes = [ |
| r'Frontend:', |
| r'CatalogFetch.ColumnStats.Hits', |
| r'CatalogFetch.ColumnStats.Misses', |
| r'CatalogFetch.ColumnStats.Requests', |
| r'CatalogFetch.ColumnStats.Time', |
| r'CatalogFetch.Config.Hits', |
| r'CatalogFetch.Config.Misses', |
| r'CatalogFetch.Config.Requests', |
| r'CatalogFetch.Config.Time', |
| r'CatalogFetch.DatabaseList.Hits', |
| r'CatalogFetch.DatabaseList.Misses', |
| r'CatalogFetch.DatabaseList.Requests', |
| r'CatalogFetch.DatabaseList.Time', |
| r'CatalogFetch.PartitionLists.Hits', |
| r'CatalogFetch.PartitionLists.Misses', |
| r'CatalogFetch.PartitionLists.Requests', |
| r'CatalogFetch.PartitionLists.Time', |
| r'CatalogFetch.Partitions.Hits', |
| r'CatalogFetch.Partitions.Misses', |
| r'CatalogFetch.Partitions.Requests', |
| r'CatalogFetch.Partitions.Time', |
| r'CatalogFetch.RPCs.Bytes', |
| r'CatalogFetch.RPCs.Requests', |
| r'CatalogFetch.RPCs.Time', |
| r'CatalogFetch.StorageLoad.Time', |
| r'CatalogFetch.TableNames.Hits', |
| r'CatalogFetch.TableNames.Misses', |
| r'CatalogFetch.TableNames.Requests', |
| r'CatalogFetch.TableNames.Time', |
| r'CatalogFetch.Tables.Hits', |
| r'CatalogFetch.Tables.Misses', |
| r'CatalogFetch.Tables.Requests', |
| r'CatalogFetch.Tables.Time'] |
| self.__verify_profile_event_sequence(load_event_regexes, runtime_profile) |
| |
| def test_query_profile_contains_query_compilation_metadata_cached_event(self): |
| """Test that the Metadata cache available event appears in the query profile when |
| the table's metadata is already cached.""" |
| refresh_query = "refresh functional.alltypes" |
| select_query = "select * from functional.alltypes" |
| self.execute_query(refresh_query).runtime_profile |
| runtime_profile = self.execute_query(select_query).runtime_profile |
| event_regexes = [r'Query Compilation:', |
| r'Metadata of all .* tables cached:', |
| r'Analysis finished:'] |
| self.__verify_profile_event_sequence(event_regexes, runtime_profile) |
| |
| def test_query_profile_contains_query_compilation_lineage_event(self): |
| """Test that the lineage information appears in the profile in the right place. This |
| event depends on whether the lineage_event_log_dir is configured.""" |
| impalad = self.impalad_test_service |
| lineage_event_log_dir_value = impalad.get_flag_current_value("lineage_event_log_dir") |
| assert lineage_event_log_dir_value is not None |
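| # An empty flag value means lineage logging is disabled, so the 'Lineage info |
| # computed' event is not expected in the timeline. |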
| if lineage_event_log_dir_value == "": |
| event_regexes = [ |
| r'Distributed plan created:', |
| r'Planning finished:'] |
| else: |
| event_regexes = [ |
| r'Distributed plan created:', |
| r'Lineage info computed:', |
| r'Planning finished:'] |
| query = "select * from functional.alltypes" |
| runtime_profile = self.execute_query(query).runtime_profile |
| self.__verify_profile_event_sequence(event_regexes, runtime_profile) |
| |
| def test_query_profile_contains_query_timeline_events(self): |
| """Test that the expected events show up in a query profile.""" |
| event_regexes = [r'Query Timeline:', |
| r'Query submitted:', |
| r'Planning finished:', |
| r'Submit for admission:', |
| r'Completed admission:', |
| r'Ready to start on .* backends:', |
| r'All .* execution backends \(.* fragment instances\) started:', |
| r'Rows available:', |
| r'First row fetched:', |
| r'Last row fetched:', |
| r'Released admission control resources:'] |
| query = "select * from functional.alltypes" |
| # Use no-limits pool so that it cannot get queued in admission control (which would |
| # add an extra event to the above timeline). |
| query_opts = {'request_pool': 'root.no-limits'} |
| runtime_profile = self.execute_query(query, query_opts).runtime_profile |
| self.__verify_profile_event_sequence(event_regexes, runtime_profile) |
| |
| def test_query_profile_contains_instance_events(self): |
| """Test that /query_profile_encoded contains an event timeline for fragment |
| instances, even when there are errors.""" |
| event_regexes = [r'Fragment Instance Lifecycle Event Timeline', |
| r'Prepare Finished', |
| r'Open Finished', |
| r'First Batch Produced', |
| r'First Batch Sent', |
| r'ExecInternal Finished'] |
| query = "select count(*) from functional.alltypes" |
| runtime_profile = self.execute_query(query).runtime_profile |
| self.__verify_profile_event_sequence(event_regexes, runtime_profile) |
| |
| def test_query_profile_contains_node_events(self): |
| """Test that ExecNode events show up in a profile.""" |
| event_regexes = [r'Node Lifecycle Event Timeline', |
| r'Open Started', |
| r'Open Finished', |
| r'First Batch Requested', |
| r'First Batch Returned', |
| r'Last Batch Returned', |
| r'Closed'] |
| query = "select count(*) from functional.alltypes" |
| runtime_profile = self.execute_query(query).runtime_profile |
| self.__verify_profile_event_sequence(event_regexes, runtime_profile) |
| |
| def __verify_profile_event_sequence(self, event_regexes, runtime_profile): |
| """Check that 'event_regexes' appear in a consecutive series of lines in |
| 'runtime_profile'""" |
| lines = runtime_profile.splitlines() |
| event_regex_index = 0 |
| |
| # Check that the strings appear in the above order with no gaps in the profile. |
| for line in lines: |
| match = re.search(event_regexes[event_regex_index], line) |
| if match is not None: |
| event_regex_index += 1 |
| if event_regex_index == len(event_regexes): |
| # Found all the lines - we're done. |
| return |
| else: |
| # Haven't found the first regex yet. |
| assert event_regex_index == 0, \ |
| event_regexes[event_regex_index] + " not in " + line + "\n" + runtime_profile |
| assert event_regex_index == len(event_regexes), \ |
| "Didn't find all events in profile: \n" + runtime_profile |
| |
| def test_query_profile_contains_all_events(self, unique_database): |
| """Test that the expected events show up in a query profile for various queries""" |
| # make a data file to load data from |
| path = "test-warehouse/{0}.db/data_file".format(unique_database) |
| self.filesystem_client.create_file(path, "1") |
| use_query = "use {0}".format(unique_database) |
| self.execute_query(use_query) |
| # all the events we will see for every query |
| event_regexes = [ |
| r'Query Compilation:', |
| r'Query Timeline:', |
| r'Planning finished' |
| ] |
| # queries that explore different code paths in Frontend compilation |
| queries = [ |
| 'create table if not exists impala_6568 (i int)', |
| 'select * from impala_6568', |
| 'explain select * from impala_6568', |
| 'describe impala_6568', |
| 'alter table impala_6568 set tblproperties(\'numRows\'=\'10\')', |
| "load data inpath '/{0}' into table impala_6568".format(path) |
| ] |
| # run each query... |
| for query in queries: |
| runtime_profile = self.execute_query(query).runtime_profile |
| # and check that all the expected events appear in the resulting profile |
| self.__verify_profile_contains_every_event(event_regexes, runtime_profile, query) |
| |
| def __verify_profile_contains_every_event(self, event_regexes, runtime_profile, query): |
| """Test that all the expected events show up in a given query profile.""" |
| for regex in event_regexes: |
| assert any(re.search(regex, line) for line in runtime_profile.splitlines()), \ |
| "Didn't find event '" + regex + "' for query '" + query + \ |
| "' in profile: \n" + runtime_profile |
| |
| def test_compute_stats_profile(self, unique_database): |
| """Test that the profile for a 'compute stats' query contains three unique query ids: |
| one for the parent 'compute stats' query and one each for the two child queries.""" |
| table_name = "%s.test_compute_stats_profile" % unique_database |
| self.execute_query( |
| "create table %s as select * from functional.alltypestiny" % table_name) |
| results = self.execute_query("compute stats %s" % table_name) |
| # Search for all query ids (max length 33) in the profile. |
| matches = re.findall("Query \(id=.{,33}\)", results.runtime_profile) |
| query_ids = [] |
| for query_id in matches: |
| if query_id not in query_ids: |
| query_ids.append(query_id) |
| assert len(query_ids) == 3, results.runtime_profile |
| |
| def test_global_resource_counters_in_profile(self): |
| """Test that a set of global resource usage counters show up in the profile.""" |
| query = "select count(*) from functional.alltypes" |
| profile = self.execute_query(query).runtime_profile |
| expected_counters = ["TotalBytesRead", "TotalBytesSent", "TotalScanBytesSent", |
| "TotalInnerBytesSent", "ExchangeScanRatio", |
| "InnerNodeSelectivityRatio"] |
| assert all(counter in profile for counter in expected_counters) |
| |
| @SkipIfNotHdfsMinicluster.tuned_for_minicluster |
| def test_global_exchange_counters(self): |
| """Test that global exchange counters are set correctly.""" |
| query = """select count(*) from tpch_parquet.orders o inner join tpch_parquet.lineitem |
| l on o.o_orderkey = l.l_orderkey group by o.o_clerk limit 10""" |
| profile = self.execute_query(query).runtime_profile |
| |
| # TimeSeriesCounter should be prefixed with a hyphen. |
| assert " MemoryUsage" not in profile |
| assert "- MemoryUsage" in profile |
| |
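| # The exact ratio below is specific to the tuned minicluster data layout (see the |
| # SkipIfNotHdfsMinicluster decorator on this test). |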
| assert "ExchangeScanRatio: 3.19" in profile |
| |
| keys = ["TotalBytesSent", "TotalScanBytesSent", "TotalInnerBytesSent"] |
| counters = defaultdict(int) |
| for line in profile.splitlines(): |
| for key in keys: |
| if key in line: |
| # Match byte count within parentheses |
| m = re.search("\(([0-9]+)\)", line) |
| assert m, "Cannot match pattern for key %s in line '%s'" % (key, line) |
| # Only keep first (query-level) counter |
| if counters[key] == 0: |
| counters[key] = int(m.group(1)) |
| |
| # All counters have values |
| assert all(counters[key] > 0 for key in keys) |
| |
| assert counters["TotalBytesSent"] == (counters["TotalScanBytesSent"] + |
| counters["TotalInnerBytesSent"]) |
| |
| def test_query_profile_contains_host_resource_usage(self): |
| """Tests that the profile contains a sub-profile with per node resource usage.""" |
| result = self.execute_query("select count(*), sleep(1000) from functional.alltypes") |
| profile = result.runtime_profile |
| expected_str = "Per Node Profiles:" |
| assert any(expected_str in line for line in profile.splitlines()) |
| |
| def test_query_profile_host_resource_metrics_off(self): |
| """Tests that the query profile does not contain resource usage metrics by default or |
| when disabled explicitly.""" |
| query = "select count(*), sleep(1000) from functional.alltypes" |
| for query_opts in [None, {'resource_trace_ratio': 0.0}]: |
| profile = self.execute_query(query, query_opts).runtime_profile |
| # Assert that no host resource counters exist in the profile |
| for line in profile.splitlines(): |
| assert not re.search("HostCpu.*Percentage", line) |
| assert not re.search("HostNetworkRx", line) |
| assert not re.search("HostDiskReadThroughput", line) |
| |
| def test_query_profile_contains_host_resource_metrics(self): |
| """Tests that the query profile contains various CPU and network metrics.""" |
| query_opts = {'resource_trace_ratio': 1.0} |
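| # resource_trace_ratio=1.0 ensures resource usage is sampled for this query; by |
| # default it is off (see the previous test). |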
| query = "select count(*), sleep(1000) from functional.alltypes" |
| profile = self.execute_query(query, query_opts).runtime_profile |
| # We check for 500ms because a query with 1s duration won't hit the 64 values limit |
| # that would trigger resampling. |
| expected_strs = ["HostCpuIoWaitPercentage (500.000ms):", |
| "HostCpuSysPercentage (500.000ms):", |
| "HostCpuUserPercentage (500.000ms):", |
| "HostNetworkRx (500.000ms):", |
| "HostNetworkTx (500.000ms):", |
| "HostDiskReadThroughput (500.000ms):", |
| "HostDiskWriteThroughput (500.000ms):"] |
| |
| # Assert that all expected counters exist in the profile. |
| for expected_str in expected_strs: |
| assert any(expected_str in line for line in profile.splitlines()), expected_str |
| |
| # Check that there are some values for each counter. |
| for line in profile.splitlines(): |
| if not any(key in line for key in expected_strs): |
| continue |
| values = line.split(':')[1].strip().split(',') |
| assert len(values) > 0 |
| |
| def _find_ts_counters_in_thrift_profile(self, profile, name): |
| """Finds all time series counters in 'profile' with a matching name.""" |
| counters = [] |
| for node in profile.nodes: |
| for counter in node.time_series_counters or []: |
| if counter.name == name: |
| counters.append(counter) |
| return counters |
| |
| @pytest.mark.execute_serially |
| def test_thrift_profile_contains_host_resource_metrics(self): |
| """Tests that the thrift profile contains time series counters for CPU and network |
| resource usage.""" |
| query_opts = {'resource_trace_ratio': 1.0} |
| self.hs2_client.set_configuration(query_opts) |
| result = self.hs2_client.execute("select sleep(2000)", |
| profile_format=TRuntimeProfileFormat.THRIFT) |
| thrift_profile = result.profile |
| |
| expected_keys = ["HostCpuUserPercentage", "HostNetworkRx", "HostDiskReadThroughput"] |
| for key in expected_keys: |
| counters = self._find_ts_counters_in_thrift_profile(thrift_profile, key) |
| # The query runs on a single node, so the counter should appear exactly once. |
| assert len(counters) == 1 |
| counter = counters[0] |
| assert len(counter.values) > 0 |
| |
| @pytest.mark.execute_serially |
| def test_query_profile_thrift_timestamps(self): |
| """Test that the query profile start and end time date-time strings have |
| nanosecond precision. Nanosecond precision is expected by management API clients |
| that consume Impala debug webpages.""" |
| query = "select sleep(5)" |
| result = self.hs2_client.execute(query, profile_format=TRuntimeProfileFormat.THRIFT) |
| tree = result.profile |
| |
| # tree.nodes[1] corresponds to ClientRequestState::summary_profile_ |
| # See be/src/service/client-request-state.[h|cc]. |
| start_time = tree.nodes[1].info_strings["Start Time"] |
| end_time = tree.nodes[1].info_strings["End Time"] |
| # Start and End Times are of the form "2017-12-07 22:26:52.167711000" |
| start_time_sub_sec_str = start_time.split('.')[-1] |
| end_time_sub_sec_str = end_time.split('.')[-1] |
| |
| assert len(end_time_sub_sec_str) == 9, end_time |
| assert len(start_time_sub_sec_str) == 9, start_time |
| |
| @pytest.mark.execute_serially |
| def test_end_time(self): |
| """ Test that verifies that the end time of a query with a coordinator is set once |
| the coordinator releases its admission control resources. This ensures that the |
| duration of the query will be determined by the time taken to do real work rather |
| than the duration for which the query remains open. On the other hand, for queries |
| without coordinators, the End Time is set only when UnregisterQuery() is called.""" |
| # Test the end time of a query with a coordinator. |
| query = "select 1" |
| handle = self.hs2_client.execute_async(query) |
| result = self.hs2_client.fetch(query, handle) |
| # Ensure that the query returns a non-empty result set. |
| assert result is not None |
| # Once the results have been fetched, the query End Time must be set. |
| tree = self.hs2_client.get_runtime_profile(handle, TRuntimeProfileFormat.THRIFT) |
| end_time = tree.nodes[1].info_strings["End Time"] |
| assert end_time |
| self.hs2_client.close_query(handle) |
| |
| # Test the end time of a query without a coordinator. |
| query = "describe functional.alltypes" |
| handle = self.hs2_client.execute_async(query) |
| result = self.hs2_client.fetch(query, handle) |
| # Ensure that the query returns a non-empty result set. |
| assert result |
| # The query End Time must not be set until the query is unregistered. |
| tree = self.hs2_client.get_runtime_profile(handle, TRuntimeProfileFormat.THRIFT) |
| end_time = tree.nodes[1].info_strings["End Time"] |
| assert len(end_time) == 0, end_time |
| # Save the last operation to be able to retrieve the profile after closing the query |
| last_op = handle.get_handle()._last_operation |
| self.hs2_client.close_query(handle) |
| tree = last_op.get_profile(TRuntimeProfileFormat.THRIFT) |
| end_time = tree.nodes[1].info_strings["End Time"] |
| assert end_time is not None |
| |
| def test_query_profile_contains_number_of_fragment_instance(self): |
| """Test that the expected section for number of fragment instance in |
| a query profile.""" |
| event_regexes = [r'Per Host Number of Fragment Instances'] |
| query = "select count (*) from functional.alltypes" |
| runtime_profile = self.execute_query(query).runtime_profile |
| self.__verify_profile_event_sequence(event_regexes, runtime_profile) |
| |
| def test_query_profile_contains_executor_group(self): |
| """Test that the profile contains an info string with the executor group that was |
| picked by admission control.""" |
| query = "select count (*) from functional.alltypes" |
| runtime_profile = self.execute_query(query).runtime_profile |
| assert "Executor Group:" in runtime_profile |
| |
| def test_query_profile_storage_load_time_filesystem(self, unique_database, |
| cluster_properties): |
| """Test that when a query needs load metadata for table(s), the |
| storage load time should be in the profile. Tests file systems.""" |
| table_name = 'ld_prof' |
| self.execute_query( |
| "create table {0}.{1}(col1 int)".format(unique_database, table_name)) |
| self.__check_query_profile_storage_load_time(unique_database, table_name, |
| cluster_properties) |
| |
| @SkipIfS3.hbase |
| @SkipIfLocal.hbase |
| @SkipIfIsilon.hbase |
| @SkipIfABFS.hbase |
| @SkipIfADLS.hbase |
| @pytest.mark.execute_serially |
| def test_query_profile_storage_load_time(self, cluster_properties): |
| """Test that when a query needs load metadata for table(s), the |
| storage load time should be in the profile. Tests kudu and hbase.""" |
| # KUDU table |
| self.__check_query_profile_storage_load_time("functional_kudu", "alltypes", |
| cluster_properties) |
| |
| # HBASE table |
| self.__check_query_profile_storage_load_time("functional_hbase", "alltypes", |
| cluster_properties) |
| |
| def __check_query_profile_storage_load_time(self, db_name, table_name, |
| cluster_properties): |
| """Check query profile for storage load time with a given database.""" |
| self.execute_query("invalidate metadata {0}.{1}".format(db_name, table_name)) |
| query = "select count (*) from {0}.{1}".format(db_name, table_name) |
| runtime_profile = self.execute_query(query).runtime_profile |
| if cluster_properties.is_catalog_v2_cluster(): |
| storageLoadTime = "StorageLoad.Time" |
| else: |
| storageLoadTime = "storage-load-time" |
| assert storageLoadTime in runtime_profile |
| # Run the query a second time; no metastore loading should be needed. |
| # Only check this part in catalog V1 because V2's caching behavior is non-deterministic. |
| if not cluster_properties.is_catalog_v2_cluster(): |
| runtime_profile = self.execute_query(query).runtime_profile |
| assert storageLoadTime not in runtime_profile |
| |
| def __verify_hashtable_stats_profile(self, runtime_profile): |
| assert "Hash Table" in runtime_profile |
| assert "Probes:" in runtime_profile |
| assert "Travel:" in runtime_profile |
| assert "HashCollisions:" in runtime_profile |
| assert "Resizes:" in runtime_profile |
| nprobes = re.search(r'Probes:.*\((\d+)\)', runtime_profile) |
| # Probes and travel can be 0. The pretty-printed number can be an integer or a |
| # unit-suffixed value (e.g. 1.2K); the raw count is captured from the parentheses. |
| assert nprobes and len(nprobes.groups()) == 1 and int(nprobes.group(1)) >= 0 |
| ntravel = re.search(r'Travel:.*\((\d+)\)', runtime_profile) |
| assert ntravel and len(ntravel.groups()) == 1 and int(ntravel.group(1)) >= 0 |
| |
| def test_query_profile_hashtable(self): |
| """Test that the profile for join/aggregate contains hash table related |
| information.""" |
| # Join |
| query = """select a.int_col, a.string_col from functional.alltypes a |
| inner join functional.alltypessmall b on a.id = b.id""" |
| result = self.execute_query(query) |
| assert result.success |
| self.__verify_hashtable_stats_profile(result.runtime_profile) |
| # Group by |
| query = """select year, count(*) from |
| functional.alltypesagg where int_col < 7 and year = 2010 group by year""" |
| result = self.execute_query(query) |
| assert result.success |
| self.__verify_hashtable_stats_profile(result.runtime_profile) |
| |
| |
| class TestQueryStates(ImpalaTestSuite): |
| """Test that the 'Query State' and 'Impala Query State' are set correctly in the |
| runtime profile.""" |
| |
| @classmethod |
| def get_workload(cls): |
| return 'functional-query' |
| |
| def test_query_states(self): |
| """Tests that the query profile shows expected query states.""" |
| query = "select count(*) from functional.alltypes where bool_col = sleep(10)" |
| handle = self.execute_query_async(query, |
| {"debug_action": "CRS_BEFORE_ADMISSION:SLEEP@1000"}) |
| # If ExecuteStatement() has completed and the query is paused in the admission control |
| # phase, then the query must be in COMPILED state. |
| profile = self.client.get_runtime_profile(handle) |
| assert self.__is_line_in_profile("Query State: COMPILED", profile) |
| assert self.__is_line_in_profile("Impala Query State: PENDING", profile) |
| # After completion of the admission control phase, the query must have at least |
| # reached RUNNING state. |
| self.client.wait_for_admission_control(handle) |
| profile = self.client.get_runtime_profile(handle) |
| assert self.__is_line_in_profile("Query State: RUNNING", profile), profile |
| assert self.__is_line_in_profile("Impala Query State: RUNNING", profile), profile |
| |
| self.client.fetch(query, handle) |
| profile = self.client.get_runtime_profile(handle) |
| # After fetching the results, the query must be in state FINISHED. |
| assert self.__is_line_in_profile("Query State: FINISHED", profile), profile |
| assert self.__is_line_in_profile("Impala Query State: FINISHED", profile), profile |
| |
| def test_error_query_state(self): |
| """Tests that the query profile shows the proper error state.""" |
| query = "select * from functional.alltypes limit 10" |
| handle = self.execute_query_async(query, {"abort_on_error": "1", |
| "debug_action": "0:GETNEXT:FAIL"}) |
| |
| def assert_finished(): |
| profile = self.client.get_runtime_profile(handle) |
| return self.__is_line_in_profile("Query State: FINISHED", profile) and \ |
| self.__is_line_in_profile("Impala Query State: FINISHED", profile) |
| |
| self.assert_eventually(30, 1, assert_finished, |
| lambda: self.client.get_runtime_profile(handle)) |
| |
| try: |
| self.client.fetch(query, handle) |
| assert False |
| except ImpalaBeeswaxException: |
| pass |
| |
| profile = self.client.get_runtime_profile(handle) |
| assert self.__is_line_in_profile("Query State: EXCEPTION", profile), profile |
| assert self.__is_line_in_profile("Impala Query State: ERROR", profile), profile |
| |
| def __is_line_in_profile(self, line, profile): |
| """Returns true if the given 'line' is in the given 'profile'. A single line of the |
| profile must exactly match the given 'line' (excluding whitespaces).""" |
| return re.search("^\s*{0}\s*$".format(line), profile, re.M) |