blob: cf0527be71d86745b015beb53e93e4f4b3fd5da1 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
class TestObservability(ImpalaTestSuite):
    """Checks what the exec summary and runtime profile report for queries."""

    @classmethod
    def get_workload(cls):
        """Return the workload identifier used by this suite."""
        return 'functional-query'

    def test_merge_exchange_num_rows(self):
        """Regression test for IMPALA-1473.

        Checks that the exec summary for a merging exchange with a limit reports
        the number of rows returned as equal to the limit, and that the
        coordinator fragment portion of the runtime profile reports the number
        of rows returned correctly.
        """
        query = ("select tinyint_col, count(*) from functional.alltypes\n"
                 "group by tinyint_col order by tinyint_col limit 5")
        result = self.execute_query(query)

        merge_exchange = result.exec_summary[0]
        assert merge_exchange['operator'] == '05:MERGING-EXCHANGE'
        assert merge_exchange['num_rows'] == 5
        assert merge_exchange['est_num_rows'] == 5

        # The first 'RowsProduced' we find is for the coordinator fragment.
        rows_produced = next(
            (line for line in result.runtime_profile.split('\n')
             if 'RowsProduced' in line),
            None)
        # A profile without any 'RowsProduced' counter must fail the test rather
        # than silently skip the row-count check.
        assert rows_produced is not None, result.runtime_profile
        assert '(5)' in rows_produced, rows_produced

    def test_broadcast_num_rows(self):
        """Regression test for IMPALA-3002.

        Checks that the num_rows for a broadcast node in the exec summary is
        correctly set as the max over all instances, not the sum.
        """
        query = ("select distinct a.int_col, a.string_col from functional.alltypes a\n"
                 "inner join functional.alltypessmall b on (a.id = b.id)\n"
                 "where a.year = 2009 and b.month = 2")
        result = self.execute_query(query)

        broadcast = result.exec_summary[5]
        assert broadcast['operator'] == '04:EXCHANGE'
        assert broadcast['num_rows'] == 25
        assert broadcast['est_num_rows'] == 25

    @SkipIfS3.hbase
    @SkipIfLocal.hbase
    @SkipIfIsilon.hbase
    @SkipIfADLS.hbase
    def test_scan_summary(self):
        """IMPALA-4499: Checks that the exec summary for scans show the table name."""
        # (table, expected scan operator) for each storage type, checked in
        # this order: HDFS, KUDU, HBASE.
        expected_scans = [
            ('functional.alltypestiny', '00:SCAN HDFS'),
            ('functional_kudu.alltypestiny', '00:SCAN KUDU'),
            ('functional_hbase.alltypestiny', '00:SCAN HBASE'),
        ]
        for table, operator in expected_scans:
            result = self.execute_query("select count(*) from %s" % table)
            # The scan is expected to be the last entry of the exec summary.
            scan = result.exec_summary[len(result.exec_summary) - 1]
            assert scan['operator'] == operator
            assert scan['detail'] == table

    def test_query_states(self):
        """Tests that the query profile shows expected query states."""
        query = "select count(*) from functional.alltypes"
        handle = self.execute_query_async(query, dict())

        # If ExecuteStatement() has completed but the results haven't been fetched
        # yet, the query must have at least reached RUNNING.
        profile = self.client.get_runtime_profile(handle)
        assert ("Query State: RUNNING" in profile or
                "Query State: FINISHED" in profile), profile

        # The fetched rows themselves are not inspected; only the resulting
        # query state matters here.
        # NOTE(review): the handle is never explicitly closed in this test -
        # confirm that suite teardown releases it.
        self.client.fetch(query, handle)

        # After fetching the results, the query must be in state FINISHED.
        profile = self.client.get_runtime_profile(handle)
        assert "Query State: FINISHED" in profile, profile

    def test_query_options(self):
        """Test that the query profile shows expected non-default query options.

        Covers both options set explicitly through the client and those set by
        the planner.
        """
        # Set a query option explicitly through client
        # NOTE(review): these SET statements are not reverted afterwards -
        # confirm they cannot leak into other tests sharing the client.
        self.execute_query("set MEM_LIMIT = 8589934592")
        # Make sure explicitly set default values are not shown in the profile
        self.execute_query("set MAX_IO_BUFFERS = 0")
        runtime_profile = self.execute_query("select 1").runtime_profile

        assert ("Query Options (set by configuration): MEM_LIMIT=8589934592"
                in runtime_profile)
        # For this query, the planner sets NUM_NODES=1, NUM_SCANNER_THREADS=1,
        # RUNTIME_FILTER_MODE=0 and MT_DOP=0
        assert ("Query Options (set by configuration and planner): MEM_LIMIT=8589934592,"
                "NUM_NODES=1,NUM_SCANNER_THREADS=1,RUNTIME_FILTER_MODE=0,MT_DOP=0\n"
                in runtime_profile)