# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
from tests.common.impala_cluster import ImpalaCluster
import logging
import pytest
import re
import time

class TestObservability(ImpalaTestSuite):
  @classmethod
  def get_workload(cls):
    return 'functional-query'

  def test_merge_exchange_num_rows(self):
    """Regression test for IMPALA-1473 - checks that the exec summary for a merging
    exchange with a limit reports the number of rows returned as equal to the limit,
    and that the coordinator fragment portion of the runtime profile reports the number
    of rows returned correctly."""
    query = """select tinyint_col, count(*) from functional.alltypes
        group by tinyint_col order by tinyint_col limit 5"""
    result = self.execute_query(query)
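    # exec_summary is a list of per-operator summary rows ordered root-first, so entry 0
    # is the plan root - the merging exchange in this plan.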
    assert result.exec_summary[0]['operator'] == '05:MERGING-EXCHANGE'
    assert result.exec_summary[0]['num_rows'] == 5
    assert result.exec_summary[0]['est_num_rows'] == 5
    assert result.exec_summary[0]['peak_mem'] > 0

    for line in result.runtime_profile.split('\n'):
      # The first 'RowsProduced' we find is for the coordinator fragment.
      if 'RowsProduced' in line:
        assert '(5)' in line
        break

  def test_broadcast_num_rows(self):
    """Regression test for IMPALA-3002 - checks that the num_rows for a broadcast node
    in the exec summary is correctly set as the max over all instances, not the sum."""
    query = """select distinct a.int_col, a.string_col from functional.alltypes a
        inner join functional.alltypessmall b on (a.id = b.id)
        where a.year = 2009 and b.month = 2"""
    result = self.execute_query(query)
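    # Row 5 of the summary is the broadcast exchange feeding the build side of the join.
    # Each instance receives the same 25 rows, so summing over instances would report a
    # multiple of 25.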
    assert result.exec_summary[5]['operator'] == '04:EXCHANGE'
    assert result.exec_summary[5]['num_rows'] == 25
    assert result.exec_summary[5]['est_num_rows'] == 25
    assert result.exec_summary[5]['peak_mem'] > 0

  @SkipIfS3.hbase
  @SkipIfLocal.hbase
  @SkipIfIsilon.hbase
  @SkipIfADLS.hbase
  def test_scan_summary(self):
    """IMPALA-4499: Checks that the exec summary for scans shows the table name."""
    # HDFS table
    query = "select count(*) from functional.alltypestiny"
    result = self.execute_query(query)
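    # The scan is the leaf of the plan, so it is always the last row of the summary.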
    scan_idx = len(result.exec_summary) - 1
    assert result.exec_summary[scan_idx]['operator'] == '00:SCAN HDFS'
    assert result.exec_summary[scan_idx]['detail'] == 'functional.alltypestiny'

    # Kudu table
    query = "select count(*) from functional_kudu.alltypestiny"
    result = self.execute_query(query)
    scan_idx = len(result.exec_summary) - 1
    assert result.exec_summary[scan_idx]['operator'] == '00:SCAN KUDU'
    assert result.exec_summary[scan_idx]['detail'] == 'functional_kudu.alltypestiny'

    # HBase table
    query = "select count(*) from functional_hbase.alltypestiny"
    result = self.execute_query(query)
    scan_idx = len(result.exec_summary) - 1
    assert result.exec_summary[scan_idx]['operator'] == '00:SCAN HBASE'
    assert result.exec_summary[scan_idx]['detail'] == 'functional_hbase.alltypestiny'

  def test_query_states(self):
    """Tests that the query profile shows expected query states."""
    query = "select count(*) from functional.alltypes"
    handle = self.execute_query_async(query,
        {"debug_action": "SLEEP_BEFORE_ADMISSION_MS:1000"})
    # If ExecuteStatement() has completed and the query is paused in the admission
    # control phase, then the query must be in COMPILED state.
    profile = self.client.get_runtime_profile(handle)
    assert "Query State: COMPILED" in profile, profile
    # After completion of the admission control phase, the query must have at least
    # reached RUNNING state.
    self.client.wait_for_admission_control(handle)
    profile = self.client.get_runtime_profile(handle)
    assert "Query State: RUNNING" in profile or \
        "Query State: FINISHED" in profile, profile

    self.client.fetch(query, handle)
    profile = self.client.get_runtime_profile(handle)
    # After fetching the results, the query must be in state FINISHED.
    assert "Query State: FINISHED" in profile, profile

  def test_query_options(self):
    """Test that the query profile shows the expected non-default query options, both
    those set explicitly through the client and those set by the planner."""
    # Set mem_limit to a non-default value and runtime_filter_wait_time_ms to its
    # default value.
    query_opts = {'mem_limit': 8589934592, 'runtime_filter_wait_time_ms': 0}
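    # Only MEM_LIMIT (8589934592 bytes == 8GB) differs from its default, so it should be
    # the only option reported as set by the client configuration.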
| profile = self.execute_query("select 1", query_opts).runtime_profile |
| assert "Query Options (set by configuration): MEM_LIMIT=8589934592" in profile,\ |
| profile |
| # For this query, the planner sets NUM_NODES=1, NUM_SCANNER_THREADS=1, |
| # RUNTIME_FILTER_MODE=0 and MT_DOP=0 |
| assert "Query Options (set by configuration and planner): MEM_LIMIT=8589934592," \ |
| "NUM_NODES=1,NUM_SCANNER_THREADS=1,RUNTIME_FILTER_MODE=0,MT_DOP=0\n" \ |
| in profile |
| |
  def test_exec_summary(self):
    """Test that the exec summary is populated correctly in every query state."""
    query = "select count(*) from functional.alltypes"
    handle = self.execute_query_async(query,
        {"debug_action": "SLEEP_BEFORE_ADMISSION_MS:1000"})
    # If ExecuteStatement() has completed and the query is paused in the admission
    # control phase, then the coordinator has not started yet and exec_summary should
    # be empty.
    exec_summary = self.client.get_exec_summary(handle)
    assert exec_summary is not None and exec_summary.nodes is None
    # After completion of the admission control phase, the coordinator will have started
    # and we should get a populated exec_summary.
    self.client.wait_for_admission_control(handle)
    exec_summary = self.client.get_exec_summary(handle)
    assert exec_summary is not None and exec_summary.nodes is not None

    self.client.fetch(query, handle)
    exec_summary = self.client.get_exec_summary(handle)
    # After fetching the results and reaching the FINISHED state, we should still be
    # able to fetch an exec_summary.
    assert exec_summary is not None and exec_summary.nodes is not None

  @SkipIfLocal.multiple_impalad
  @pytest.mark.xfail(reason="IMPALA-6338")
  def test_profile_fragment_instances(self):
    """IMPALA-6081: Test that the expected number of fragment instances and their exec
    nodes appear in the runtime profile, even when fragments may be quickly cancelled
    when all results are already returned."""
    results = self.execute_query("""
        with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
        select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
        join (select * from l LIMIT 2000000) b on a.l_orderkey = -b.l_orderkey;""")
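    # The LIMITs cut both join inputs short, so some scan fragments are cancelled while
    # they still have rows to produce, exercising the cancellation path described above.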
    # There are 3 scan nodes and each appears in the profile 4 times (for 3 fragment
    # instances + the averaged fragment).
    assert results.runtime_profile.count("HDFS_SCAN_NODE") == 12
    # There are 3 exchange nodes and each appears in the profile 2 times (for 1 fragment
    # instance + the averaged fragment).
    assert results.runtime_profile.count("EXCHANGE_NODE") == 6
    # The following appear only in the root fragment, which has 1 instance.
    assert results.runtime_profile.count("HASH_JOIN_NODE") == 2
    assert results.runtime_profile.count("AGGREGATION_NODE") == 2
    assert results.runtime_profile.count("PLAN_ROOT_SINK") == 2

  def test_query_profile_contains_query_events(self):
    """Test that the expected events show up in a query profile."""
    event_regexes = [r'Query Timeline:',
                     r'Query submitted:',
                     r'Planning finished:',
                     r'Submit for admission:',
                     r'Completed admission:',
                     r'Ready to start on .* backends:',
                     r'All .* execution backends \(.* fragment instances\) started:',
                     r'Rows available:',
                     r'First row fetched:',
                     r'Last row fetched:',
                     r'Released admission control resources:']
    query = "select * from functional.alltypes"
    runtime_profile = self.execute_query(query).runtime_profile
    self.__verify_profile_event_sequence(event_regexes, runtime_profile)

  def test_query_profile_contains_instance_events(self):
    """Test that /query_profile_encoded contains an event timeline for fragment
    instances, even when there are errors."""
    event_regexes = [r'Fragment Instance Lifecycle Event Timeline',
                     r'Prepare Finished',
                     r'Open Finished',
                     r'First Batch Produced',
                     r'First Batch Sent',
                     r'ExecInternal Finished']
    query = "select count(*) from functional.alltypes"
    runtime_profile = self.execute_query(query).runtime_profile
    self.__verify_profile_event_sequence(event_regexes, runtime_profile)

  def __verify_profile_event_sequence(self, event_regexes, runtime_profile):
    """Check that 'event_regexes' appear in a consecutive series of lines in
    'runtime_profile'."""
    lines = runtime_profile.splitlines()
    event_regex_index = 0

    # Check that the strings appear in the above order with no gaps in the profile.
    for line in lines:
      match = re.search(event_regexes[event_regex_index], line)
      if match is not None:
        event_regex_index += 1
        if event_regex_index == len(event_regexes):
          # Found all the lines - we're done.
          return
      else:
        # Non-matching lines are only allowed before the first regex has been found.
        assert event_regex_index == 0, \
            event_regexes[event_regex_index] + " not in " + line + "\n" + runtime_profile
    assert event_regex_index == len(event_regexes), \
        "Didn't find all events in profile: \n" + runtime_profile

class TestThriftProfile(ImpalaTestSuite):
  @classmethod
  def get_workload(cls):
    return 'functional-query'

  # IMPALA-6399: Run this test serially to avoid a delay over the wait time in fetching
  # the profile.
  # This test needs to call self.client.close() to force computation of the query end
  # time, so it has to be in its own suite (IMPALA-6498).
  @pytest.mark.execute_serially
  def test_query_profile_thrift_timestamps(self):
    """Test that the query profile start and end time date-time strings have
    nanosecond precision. Nanosecond precision is expected by management API clients
    that consume Impala debug webpages."""
    query = "select sleep(5)"
    handle = self.client.execute_async(query)
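    # Capture the query id up front: it is needed to fetch the thrift profile from the
    # debug webpage after the client is closed below.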
    query_id = handle.get_handle().id
    self.client.fetch(query, handle)
    self.client.close()

    MAX_WAIT = 300
    start = time.time()
    end = start + MAX_WAIT
    while time.time() <= end:
      # Sleep before trying to fetch the profile. This helps to prevent a warning when
      # the profile is not yet available. Sleeping at the top of the loop also ensures
      # that future 'continue' statements cannot accidentally skip the sleep.
      time.sleep(1)
      tree = self.impalad_test_service.get_thrift_profile(query_id)
      if not tree:
        continue

      # tree.nodes[1] corresponds to ClientRequestState::summary_profile_.
      # See be/src/service/client-request-state.[h|cc].
      start_time = tree.nodes[1].info_strings["Start Time"]
      end_time = tree.nodes[1].info_strings["End Time"]
      # Start and End Times are of the form "2017-12-07 22:26:52.167711000".
      start_time_sub_sec_str = start_time.split('.')[-1]
      end_time_sub_sec_str = end_time.split('.')[-1]
      if len(end_time_sub_sec_str) == 0:
        elapsed = time.time() - start
        logging.info("end_time_sub_sec_str hasn't shown up yet, elapsed=%d", elapsed)
        continue

      assert len(end_time_sub_sec_str) == 9, end_time
      assert len(start_time_sub_sec_str) == 9, start_time
      return True

    # If we're here, we didn't get the final thrift profile from the debug web page.
    # This could happen due to heavy system load, making the test inconclusive, so
    # fail the run with a diagnostic message.
    dbg_str = "Debug thrift profile for query {0} not available in {1} seconds".format(
        query_id, MAX_WAIT)
    assert False, dbg_str