blob: 495305b18552e43cbb6709ce8f7c19d84827080a [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import SkipIf, SkipIfBuildType
@SkipIfBuildType.not_dev_build
class TestRPCException(CustomClusterTestSuite):
"""Tests Impala exception handling in TransmitData() RPC to make sure no
duplicated row batches are sent. """
# DataStreamService rpc names
TRANSMIT_DATA_RPC = "TransmitData"
END_DATA_STREAM_RPC = "EndDataStream"
# Error to specify for ImpalaServicePool to reject rpcs with a 'server too busy' error.
REJECT_TOO_BUSY_MSG = "REJECT_TOO_BUSY"
# The BE krpc port of the impalad these tests simulate rpc errors at.
KRPC_PORT = 27002
# This query ends up calling TransmitData() more than 2048 times to ensure
# proper test coverage.
TEST_QUERY = "select count(*) from tpch_parquet.lineitem t1, tpch_parquet.lineitem t2 \
where t1.l_orderkey = t2.l_orderkey"
EXPECTED_RESULT = ['30012985']
@classmethod
def get_workload(self):
return 'functional-query'
@classmethod
def setup_class(cls):
if cls.exploration_strategy() != 'exhaustive':
pytest.skip('runs only in exhaustive')
super(TestRPCException, cls).setup_class()
def _get_num_fails(self, impalad):
num_fails = impalad.service.get_metric_value("impala.debug_action.fail")
if num_fails is None:
return 0
return num_fails
# Execute TEST_QUERY repeatedly until the FAIL debug action has been hit. If
# 'exception_string' is None, it's expected to always complete sucessfully with result
# matching EXPECTED_RESULT. Otherwise, it's expected to either succeed or fail with
# the error 'exception_string'.
def execute_test_query(self, exception_string):
impalad = self.cluster.impalads[2]
assert impalad.service.krpc_port == self.KRPC_PORT
# Re-run the query until the metrics show that we hit the debug action or we've run 10
# times. Each test in this file has at least a 50% chance of hitting the action per
# run, so there's at most a (1/2)^10 chance that this loop will fail spuriously.
i = 0
while self._get_num_fails(impalad) == 0 and i < 10:
i += 1
try:
result = self.client.execute(self.TEST_QUERY)
assert result.data == self.EXPECTED_RESULT, "Query returned unexpected results."
except ImpalaBeeswaxException as e:
if exception_string is None:
raise e
assert exception_string in str(e), "Query failed with unexpected exception."
assert self._get_num_fails(impalad) > 0, "Debug action wasn't hit after 10 iters."
def _get_fail_action(rpc, error=None, port=KRPC_PORT, p=0.1):
"""Returns a debug action that causes rpcs with the name 'rpc' that are sent to the
impalad at 'port' to FAIL with probability 'p' and return 'error' if specified."""
debug_action = "IMPALA_SERVICE_POOL:127.0.0.1:{port}:{rpc}:FAIL@{probability}" \
.format(rpc=rpc, probability=p, port=port)
if error is not None:
debug_action += "@" + error
return debug_action
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--debug_actions=" +
_get_fail_action(rpc=TRANSMIT_DATA_RPC, error=REJECT_TOO_BUSY_MSG))
def test_transmit_data_retry(self):
"""Run a query where TransmitData may fail with a "server too busy" error. We should
always retry in this case, so the query should always eventually succeed."""
self.execute_test_query(None)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--debug_actions=" +
_get_fail_action(rpc=TRANSMIT_DATA_RPC, error=REJECT_TOO_BUSY_MSG) +
"|" + _get_fail_action(rpc=TRANSMIT_DATA_RPC))
def test_transmit_data_error(self):
"""Run a query where TransmitData may fail with a "server too busy" or with a generic
error. The query should either succeed or fail with the given error."""
self.execute_test_query("Debug Action: IMPALA_SERVICE_POOL:FAIL@0.1")
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--debug_actions=" +
_get_fail_action(rpc=END_DATA_STREAM_RPC, error=REJECT_TOO_BUSY_MSG, p=0.5))
def test_end_data_stream_retry(self):
"""Run a query where EndDataStream may fail with a "server too busy" error. We should
always retry in this case, so the query should always eventually succeed."""
self.execute_test_query(None)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--debug_actions=" +
_get_fail_action(rpc=END_DATA_STREAM_RPC, error=REJECT_TOO_BUSY_MSG, p=0.5) +
"|" + _get_fail_action(rpc=END_DATA_STREAM_RPC, p=0.5))
def test_end_data_stream_error(self):
"""Run a query where EndDataStream may fail with a "server too busy" or with a generic
error. The query should either succeed or fail with the given error."""
self.execute_test_query("Debug Action: IMPALA_SERVICE_POOL:FAIL@0.5")