blob: 90db9e8a404feace1a821a4ae9390dec07f46cef [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import re
from time import sleep
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.test_dimensions import extend_exec_option_dimension
from tests.util.parse_util import parse_duration_string_ms
class TestFetch(ImpalaTestSuite):
  """Fetch tests whose expectations hold regardless of the result spooling setting."""

  @classmethod
  def get_workload(cls):
    """Return the name of the workload these tests run against."""
    return 'functional-query'

  @classmethod
  def add_test_dimensions(cls):
    """Restrict the inherited test matrix to the Parquet table format."""
    super(TestFetch, cls).add_test_dimensions()
    # File format should have no effect on result fetching, so covering a single
    # format (Parquet) is enough.
    cls.ImpalaTestMatrix.add_constraint(
        lambda vec: vec.get_value('table_format').file_format == 'parquet')

  def test_rows_sent_counters(self, vector):
    """Check that ClientFetchWaitTimer, NumRowsFetched, RowMaterializationRate and
    RowMaterializationTimer in the ImpalaServer section of the runtime profile hold
    valid values once the results of a query have been fetched."""
    expected_rows = 25
    sql = "select sleep(100) from functional.alltypes limit {0}".format(expected_rows)
    query_handle = self.execute_query_async(sql, vector.get_value('exec_option'))
    try:
      # Results can only be fetched once the query has reached the 'FINISHED' state.
      finished_state = self.client.QUERY_STATES['FINISHED']
      self.wait_for_state(query_handle, finished_state, 30)

      # Leave the results unfetched for 2.5 seconds so that ClientFetchWaitTimer
      # exceeds the 1s threshold asserted below.
      sleep(2.5)

      # The fetch is what causes the fetch related counters to be updated.
      fetch_result = self.client.fetch(sql, query_handle)
      assert fetch_result.success
      profile = self.client.get_runtime_profile(query_handle)

      wait_match = re.search("ClientFetchWaitTimer: (.*)", profile)
      assert wait_match
      assert len(wait_match.groups()) == 1
      assert parse_duration_string_ms(wait_match.group(1)) > 1000

      rows_fetched_entry = "NumRowsFetched: {0} ({0})".format(expected_rows)
      assert rows_fetched_entry in profile
      assert re.search("RowMaterializationRate: [1-9]", profile)

      # Materializing all rows is expected to take at least 1s, because the query
      # should sleep for at least 1s while its rows are materialized.
      materialization_match = re.search("RowMaterializationTimer: (.*)", profile)
      assert materialization_match
      assert len(materialization_match.groups()) == 1
      assert parse_duration_string_ms(materialization_match.group(1)) > 1000
    finally:
      # Always release the query, even when one of the assertions above fails.
      self.client.close_query(query_handle)
class TestFetchAndSpooling(ImpalaTestSuite):
  """Tests that apply when result spooling is enabled or disabled."""

  @classmethod
  def add_test_dimensions(cls):
    """Restrict the test matrix to Parquet and add a spool_query_results='true'
    variant to the exec option dimension."""
    super(TestFetchAndSpooling, cls).add_test_dimensions()
    # Result fetching should be independent of file format, so only test against
    # Parquet files.
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == 'parquet')
    extend_exec_option_dimension(cls, 'spool_query_results', 'true')

  @classmethod
  def get_workload(cls):
    """Return the name of the workload these tests run against."""
    return 'functional-query'

  def test_rows_sent_counters(self, vector):
    """Validate that RowsSent and RowsSentRate are set to valid values in
    the PLAN_ROOT_SINK section of the runtime profile."""
    num_rows = 10
    exec_option = vector.get_value('exec_option')

    # Query option values are usually strings, and any non-empty string (including
    # 'false') is truthy, so the flag has to be compared explicitly rather than passed
    # through bool(). A missing option is treated as "spooling disabled".
    spool_setting = str(exec_option.get('spool_query_results', 'false')).strip().lower()
    spooling_enabled = spool_setting in ('true', '1')

    # NOTE(review): the two debug actions presumably inject a 1000ms sleep into the
    # spooling and the non-spooling result sink respectively - confirm against the
    # backend debug action names.
    if spooling_enabled:
      exec_option['debug_action'] = "BPRS_BEFORE_ADD_BATCH:SLEEP@1000"
    else:
      exec_option['debug_action'] = "BPRS_BEFORE_ADD_ROWS:SLEEP@1000"

    result = self.execute_query("select id from functional.alltypes limit {0}"
        .format(num_rows), exec_option)
    assert "RowsSent: {0} ({0})".format(num_rows) in result.runtime_profile

    # Raw string so that \d and \. reach the regex engine instead of being treated as
    # (invalid) string escape sequences.
    rows_sent_rate = re.search(r"RowsSentRate: (\d*\.?\d*)", result.runtime_profile)
    assert rows_sent_rate
    assert float(rows_sent_rate.group(1)) > 0
class TestFetchTimeout(ImpalaTestSuite):
  """Basic, protocol-agnostic coverage of FETCH_ROWS_TIMEOUT_MS (the HS2 specific tests
  live in tests/hs2/test_fetch_timeout.py). They are needed because part of the
  FETCH_ROWS_TIMEOUT_MS code is HS2/Beeswax specific. In contrast to the tests in
  hs2/test_fetch_timeout.py, nothing here checks that individual RPC calls time out;
  a low timeout is configured and the query is only expected to work end-to-end."""

  @classmethod
  def get_workload(cls):
    """Return the name of the workload these tests run against."""
    return 'functional-query'

  @classmethod
  def add_test_dimensions(cls):
    """Restrict the test matrix to Parquet and add a spool_query_results='true'
    variant to the exec option dimension."""
    super(TestFetchTimeout, cls).add_test_dimensions()
    # File format should have no effect on result fetching, so covering a single
    # format (Parquet) is enough.
    cls.ImpalaTestMatrix.add_constraint(
        lambda vec: vec.get_value('table_format').file_format == 'parquet')
    extend_exec_option_dimension(cls, 'spool_query_results', 'true')

  def test_fetch_timeout(self, vector):
    """Run a query with a low fetch timeout while RowBatch production is being delayed,
    and check that it still succeeds and returns the expected number of rows."""
    expected_rows = 100
    sql = "select * from functional.alltypes limit {0}".format(expected_rows)
    options = vector.get_value('exec_option')
    options['batch_size'] = 1
    options['fetch_rows_timeout_ms'] = 1
    options['debug_action'] = '0:GETNEXT:DELAY'
    outcome = self.execute_query(sql, options)
    assert outcome.success
    assert len(outcome.data) == expected_rows

  def test_fetch_before_finished_timeout(self, vector):
    """Check that the FETCH_ROWS_TIMEOUT_MS timeout also applies to queries that have not
    reached the 'finished' state yet. Counterpart of tests/hs2/test_fetch_timeout.py::
    TestFetchTimeout::test_fetch_before_finished_timeout(_with_result_spooling)."""
    expected_rows = 10
    sql = "select * from functional.alltypes limit {0}".format(expected_rows)
    options = vector.get_value('exec_option')
    # NOTE(review): presumably holds the query back for 5000ms before the coordinator
    # starts, i.e. longer than the 1000ms fetch timeout set below - confirm.
    options['debug_action'] = 'CRS_BEFORE_COORD_STARTS:SLEEP@5000'
    options['fetch_rows_timeout_ms'] = '1000'
    outcome = self.execute_query(sql, options)
    assert outcome.success
    assert len(outcome.data) == expected_rows