blob: 7ae8e00cae6c47f6b1321c143ce949b67f8cd9b9 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Injects failures at specific locations in each of the plan nodes. Currently supports
# two types of failures - cancellation of the query and a failure test hook.
import pytest
import os
import re
from collections import defaultdict
from time import sleep
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
from tests.common.impala_test_suite import ImpalaTestSuite, LOG
from tests.common.skip import SkipIf, SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, \
from tests.common.test_dimensions import create_exec_option_dimension
from tests.common.test_vector import ImpalaTestDimension
# Not included:
# - SCANNER_ERROR, because it only fires if the query already hit an error.
# Map debug actions to their corresponding query options' values.
MT_DOP_VALUES = [0, 4]
# Queries should cover all exec nodes.
"select * from alltypes",
"select count(*) from alltypessmall",
"select count(int_col) from alltypessmall group by id",
"select 1 from alltypessmall a join alltypessmall b on =",
"select 1 from alltypessmall a join alltypessmall b on !=",
"select 1 from alltypessmall order by id",
"select 1 from alltypessmall order by id limit 100",
"select * from alltypessmall union all select * from alltypessmall",
"select row_number() over (partition by int_col order by id) from alltypessmall",
"select c from (select id c from alltypessmall order by id limit 10) v where c = 1"
@SkipIf.skip_hbase # -skip_hbase argument specified
@SkipIfS3.hbase # S3: missing coverage: failures
@SkipIfIsilon.hbase # ISILON: missing coverage: failures.
class TestFailpoints(ImpalaTestSuite):
def get_workload(cls):
return 'functional-query'
def add_test_dimensions(cls):
super(TestFailpoints, cls).add_test_dimensions()
ImpalaTestDimension('query', *QUERIES))
ImpalaTestDimension('action', *FAILPOINT_ACTIONS))
ImpalaTestDimension('location', *FAILPOINT_LOCATIONS))
ImpalaTestDimension('mt_dop', *MT_DOP_VALUES))
create_exec_option_dimension([0], [False], [0]))
# Don't create PREPARE:WAIT debug actions because cancellation is not checked until
# after the prepare phase once execution is started.
lambda v: not (v.get_value('action') == 'CANCEL'
and v.get_value('location') == 'PREPARE'))
# Don't create CLOSE:WAIT debug actions to avoid leaking plan fragments (there's no
# way to cancel a plan fragment once Close() has been called)
lambda v: not (v.get_value('action') == 'CANCEL'
and v.get_value('location') == 'CLOSE'))
# Run serially because this can create enough memory pressure to invoke the Linux OOM
# killer on machines with 30GB RAM. This makes the test run in 4 minutes instead of 1-2.
def test_failpoints(self, vector):
query = vector.get_value('query')
action = vector.get_value('action')
location = vector.get_value('location')
vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
plan_node_ids = self.__parse_plan_nodes_from_explain(query, vector)
except ImpalaBeeswaxException as e:
if "MT_DOP not supported" in str(e):
pytest.xfail(reason="MT_DOP not supported.")
raise e
for node_id in plan_node_ids:
debug_action = '%d:%s:%s' % (node_id, location, FAILPOINT_ACTION_MAP[action])
# IMPALA-7046: add jitter to backend startup to exercise various failure paths.
debug_action += '|COORD_BEFORE_EXEC_RPC:JITTER@100@0.3''Current debug action: SET DEBUG_ACTION=%s' % debug_action)
vector.get_value('exec_option')['debug_action'] = debug_action
if action == 'CANCEL':
self.__execute_cancel_action(query, vector)
elif action == 'FAIL' or action == 'MEM_LIMIT_EXCEEDED':
self.__execute_fail_action(query, vector)
assert 0, 'Unknown action: %s' % action
# We should be able to execute the same query successfully when no failures are
# injected.
del vector.get_value('exec_option')['debug_action']
self.execute_query(query, vector.get_value('exec_option'))
def __parse_plan_nodes_from_explain(self, query, vector):
"""Parses the EXPLAIN <query> output and returns a list of node ids.
Expects format of <ID>:<NAME>"""
explain_result =\
self.execute_query("explain " + query, vector.get_value('exec_option'),
node_ids = []
for row in
match ='\s*(?P<node_id>\d+)\:(?P<node_type>\S+\s*\S+)', row)
if match is not None:
return node_ids
def test_lifecycle_failures(self):
"""Test that targeted failure injections in the query lifecycle do not cause crashes
or hangs"""
query = "select * from tpch.lineitem limit 10000"
# Test that the admission controller handles scheduler errors correctly.
result = self.execute_query_expect_failure(self.client, query,
query_options={'debug_action': debug_action})
assert "Error during scheduling" in str(result)
# Fail the Prepare() phase of all fragment instances.
debug_action = 'FIS_IN_PREPARE:FAIL@1.0'
self.execute_query_expect_failure(self.client, query,
query_options={'debug_action': debug_action})
# Fail the Open() phase of all fragment instances.
debug_action = 'FIS_IN_OPEN:FAIL@1.0'
self.execute_query_expect_failure(self.client, query,
query_options={'debug_action': debug_action})
# Fail the ExecInternal() phase of all fragment instances.
debug_action = 'FIS_IN_EXEC_INTERNAL:FAIL@1.0'
self.execute_query_expect_failure(self.client, query,
query_options={'debug_action': debug_action})
# Fail the fragment instance thread creation with a 0.5 probability.
debug_action = 'FIS_FAIL_THREAD_CREATION:FAIL@0.5'
# We want to test the behavior when only some fragment instance threads fail to be
# created, so we set the probability of fragment instance thread creation failure to
# 0.5. Since there's only a 50% chance of fragment instance thread creation failure,
# we attempt to catch a query failure up to a very conservative maximum of 50 tries.
for i in range(50):
query_options={'debug_action': debug_action})
except ImpalaBeeswaxException as e:
assert 'Debug Action: FIS_FAIL_THREAD_CREATION:FAIL@0.5' \
in str(e), str(e)
def __execute_fail_action(self, query, vector):
self.execute_query(query, vector.get_value('exec_option'),
assert 'Expected Failure'
except ImpalaBeeswaxException as e:
# IMPALA-5197: None of the debug actions should trigger corrupted file message
assert 'Corrupt Parquet file' not in str(e)
def __execute_cancel_action(self, query, vector):'Starting async query execution')
handle = self.execute_query_async(query, vector.get_value('exec_option'),
cancel_result = self.client.cancel(handle)
assert cancel_result.status_code == 0,\
'Unexpected status code from cancel request: %s' % cancel_result