blob: f450287666ec2d335923cd72721e50956e9158e8 [file] [log] [blame]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest
import requests
from shell.ImpalaHttpClient import ImpalaHttpClient
from shell.impala_client import ImpalaHS2Client
from tests.common.impala_test_suite import IMPALAD_HS2_HTTP_HOST_PORT
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from time import sleep
class FaultInjectingHttpClient(ImpalaHttpClient, object):
  """ImpalaHttpClient whose responses can be overridden with an injected HTTP fault.

  Faults are switched on with 'enable_fault' and off with 'disable_fault'. The 'flush'
  method is overridden: it first performs the real flush, then optionally overwrites
  the response code and message with the injected values, and finally raises an
  Exception if the resulting code is 300 or above.

  NOTE(review): 'object' is listed as an explicit base, presumably so that super()
  works even if ImpalaHttpClient is an old-style class -- confirm before removing."""

  def __init__(self, *args, **kwargs):
    """Forwards all arguments to ImpalaHttpClient and starts with faults disabled."""
    super(FaultInjectingHttpClient, self).__init__(*args, **kwargs)
    # Code and message that replace the real response when a fault is injected.
    self.fault_code = None
    self.fault_message = None
    # Fraction of requests to fault; only meaningful while fault_enabled is True.
    self.fault_frequency = 0
    self.fault_enabled = False
    # Number of flushes seen since construction or the last enable_fault() call.
    self.num_requests = 0

  def enable_fault(self, http_code, http_message, fault_frequency):
    """Inject fault with given code and message at the given frequency.
    As an example, if frequency is 20% then inject fault for 1 out of every 5
    requests.

    'fault_frequency' must be in the range (0, 1]; otherwise an AssertionError is
    raised and the client's fault configuration is left unchanged. The request
    counter is reset, so the injection pattern restarts from this call."""
    # Validate before touching any state so that a rejected call cannot leave the
    # client half-configured.
    assert fault_frequency > 0 and fault_frequency <= 1
    self.fault_enabled = True
    self.fault_code = http_code
    self.fault_message = http_message
    self.fault_frequency = fault_frequency
    self.num_requests = 0

  def disable_fault(self):
    """Stops injecting faults. The configured code, message and frequency are kept."""
    self.fault_enabled = False

  def _check_code(self):
    """Raises an Exception carrying the code and message if self.code is >= 300."""
    if self.code >= 300:
      # Report any http response code that is not 1XX (informational response) or
      # 2XX (successful).
      raise Exception("HTTP code {}: {}".format(self.code, self.message))

  def _inject_fault(self):
    """Returns True if the current request should have the fault injected.

    With a frequency of 1 every request is faulted. Otherwise the period is
    1 / fault_frequency and a request is faulted when its count modulo the period
    rounds to 1. Because 'flush' increments the counter before calling this, the
    first request after enable_fault() is faulted (e.g. requests 1, 3, 5, ... for a
    frequency of 0.5)."""
    if not self.fault_enabled:
      return False
    if self.fault_frequency == 1:
      return True
    if round(self.num_requests % (1 / self.fault_frequency)) == 1:
      return True
    return False

  def flush(self):
    """Sends the request via ImpalaHttpClient.flush(), then applies any injected fault.

    Raises an Exception (see _check_code) if the real or injected response code is
    300 or above."""
    # The real request always goes out first; the fault only rewrites what the
    # client reports afterwards.
    ImpalaHttpClient.flush(self)
    self.num_requests += 1
    # Override code and message with the injected fault
    if self.fault_code is not None and self._inject_fault():
      self.code = self.fault_code
      self.message = self.fault_message
    self._check_code()
class FaultInjectingImpalaHS2Client(ImpalaHS2Client):
  """ImpalaHS2Client variant whose transport is a FaultInjectingHttpClient, so that
  tests can make individual rpcs fail with an injected HTTP error."""

  def __init__(self, *args, **kwargs):
    """Initializes the base client, then builds and opens the fault injecting HTTP
    transport. Asserts that the client was configured for HTTP with an http_path."""
    super(FaultInjectingImpalaHS2Client, self).__init__(*args, **kwargs)
    assert self.use_http_base_transport
    authority = "{0}:{1}".format(self.impalad_host, self.impalad_port)
    assert self.http_path
    url = "http://{0}/{1}".format(authority, self.http_path)
    http_client = FaultInjectingHttpClient(url)
    http_client.open()
    # Only publish the transport once it has been opened successfully.
    self.transport = http_client

  def _get_http_transport(self, connect_timeout_ms):
    """Re-opens and returns the transport created in __init__ instead of building a
    new one. 'connect_timeout_ms' is accepted but not used."""
    self.transport.open()
    return self.transport

  def ping(self):
    """Returns the result of the client's _ping_impala_service() call."""
    return self._ping_impala_service()
class TestHS2FaultInjection(CustomClusterTestSuite):
  """Class for testing the http fault injection in various rpcs used by the
  impala-shell client.

  Each test enables a fault on the shared transport, runs one client operation and
  then checks the messages that the client wrote to stderr (captured via 'capsys').
  NOTE(review): the assertions start at index 1 of the captured stderr lines, so the
  first line is presumably unrelated output -- confirm against the client."""

  def setup(self):
    """Creates a fault injecting HS2-over-HTTP client and keeps a handle on its
    transport so that tests can enable and disable faults."""
    impalad = IMPALAD_HS2_HTTP_HOST_PORT.split(":")
    self.custom_hs2_http_client = FaultInjectingImpalaHS2Client(impalad, 1024,
        kerberos_host_fqdn=None, use_http_base_transport=True, http_path='cliservice')
    self.transport = self.custom_hs2_http_client.transport

  def teardown(self):
    """Disables fault injection before closing the connection."""
    self.transport.disable_fault()
    self.custom_hs2_http_client.close_connection()

  def connect(self):
    """Connects the client and asserts that the connection succeeded."""
    self.custom_hs2_http_client.connect()
    assert self.custom_hs2_http_client.connected

  def close_query(self, query_handle):
    """Closes 'query_handle' with fault injection turned off."""
    self.transport.disable_fault()
    self.custom_hs2_http_client.close_query(query_handle)

  def __expect_msg_retry(self, impala_rpc_name):
    """Returns expected log message for rpcs which can be retried"""
    return ("Caught exception HTTP code 502: Injected Fault, "
            "type=<type 'exceptions.Exception'> in {0}. "
            "Num remaining tries: 3".format(impala_rpc_name))

  def __expect_msg_no_retry(self, impala_rpc_name):
    """Returns expected log message for rpcs which can not be retried"""
    return ("Caught exception HTTP code 502: Injected Fault, "
            "type=<type 'exceptions.Exception'> in {0}. ".format(impala_rpc_name))

  @pytest.mark.execute_serially
  def test_connect(self, capsys):
    """Tests fault injection in ImpalaHS2Client's connect().
    OpenSession and CloseImpalaOperation rpcs fail.
    Retries result in a successful connection."""
    self.transport.enable_fault(502, "Injected Fault", 0.20)
    self.connect()
    output = capsys.readouterr()[1].splitlines()
    assert output[1] == self.__expect_msg_retry("OpenSession")
    assert output[2] == self.__expect_msg_retry("CloseImpalaOperation")

  @pytest.mark.execute_serially
  def test_close_connection(self, capsys):
    """Tests fault injection in ImpalaHS2Client's close_connection().
    CloseSession rpc fails due to the fault, but succeeds anyway since exceptions
    are ignored."""
    self.connect()
    self.transport.enable_fault(502, "Injected Fault", 0.50)
    self.custom_hs2_http_client.close_connection()
    output = capsys.readouterr()[1].splitlines()
    assert output[1] == self.__expect_msg_no_retry("CloseSession")

  @pytest.mark.execute_serially
  def test_ping(self, capsys):
    """Tests fault injection in ImpalaHS2Client's _ping_impala_service().
    PingImpalaHS2Service rpc fails due to the fault, but succeeds after retry"""
    self.connect()
    self.transport.enable_fault(502, "Injected Fault", 0.50)
    version, webserver_address = self.custom_hs2_http_client.ping()
    assert version is not None
    assert webserver_address is not None
    # The returned webserver address must be usable: its health endpoint responds OK.
    page = requests.get('{0}/{1}'.format(webserver_address, 'healthz'))
    assert page.status_code == requests.codes.ok
    output = capsys.readouterr()[1].splitlines()
    assert output[1] == self.__expect_msg_retry("PingImpalaHS2Service")

  @pytest.mark.execute_serially
  def test_execute_query(self, capsys):
    """Tests fault injection in ImpalaHS2Client's execute_query().
    ExecuteStatement rpc fails and results in error since retries are not supported."""
    self.connect()
    self.transport.enable_fault(502, "Injected Fault", 0.50)
    query_handle = None
    try:
      query_handle = self.custom_hs2_http_client.execute_query('select 1', {})
    except Exception as e:
      assert str(e) == 'HTTP code 502: Injected Fault'
    assert query_handle is None
    output = capsys.readouterr()[1].splitlines()
    assert output[1] == self.__expect_msg_no_retry("ExecuteStatement")

  @pytest.mark.execute_serially
  def test_fetch(self, capsys):
    """Tests fault injection in ImpalaHS2Client's fetch().
    FetchResults rpc fails and results in error since retries are not supported."""
    self.connect()
    query_handle = self.custom_hs2_http_client.execute_query('select 1', {})
    self.transport.enable_fault(502, "Injected Fault", 0.50)
    # Count from 0 rather than None: if a fetch unexpectedly succeeds, the failure is
    # then reported by the row count assertion instead of a TypeError being swallowed
    # by the except clause below.
    num_rows = 0
    rows_fetched = self.custom_hs2_http_client.fetch(query_handle)
    try:
      for rows in rows_fetched:
        num_rows += len(rows)
    except Exception as e:
      assert str(e) == 'HTTP code 502: Injected Fault'
    assert num_rows == 0
    self.close_query(query_handle)
    output = capsys.readouterr()[1].splitlines()
    assert output[1] == self.__expect_msg_no_retry("FetchResults")

  @pytest.mark.execute_serially
  def test_close_dml(self, unique_database, capsys):
    """Tests fault injection in ImpalaHS2Client's close_dml().
    CloseImpalaOperation rpc fails and results in error since retries are not
    supported."""
    table = '{0}.{1}'.format(unique_database, 'tbl')
    self.client.execute('create table {0} (c1 int, c2 int)'.format(table))
    self.connect()
    dml = 'insert into {0} values (1,1)'.format(table)
    query_handle = self.custom_hs2_http_client.execute_query(dml, {})
    self.custom_hs2_http_client.wait_to_finish(query_handle)
    self.transport.enable_fault(502, "Injected Fault", 0.50)
    (num_rows, num_row_errors) = None, None
    try:
      (num_rows, num_row_errors) = self.custom_hs2_http_client.close_dml(query_handle)
    except Exception as e:
      assert str(e) == 'HTTP code 502: Injected Fault'
    assert num_rows is None
    assert num_row_errors is None
    output = capsys.readouterr()[1].splitlines()
    assert output[1] == self.__expect_msg_no_retry("CloseImpalaOperation")

  @pytest.mark.execute_serially
  def test_close_query(self, capsys):
    """Tests fault injection in ImpalaHS2Client's close_query().
    CloseImpalaOperation rpc fails due to fault, but succeeds after a retry"""
    self.connect()
    query_handle = self.custom_hs2_http_client.execute_query('select 1', {})
    self.custom_hs2_http_client.wait_to_finish(query_handle)
    self.transport.enable_fault(502, "Injected Fault", 0.50)
    # Call the client directly: the close_query() helper on this class would disable
    # the fault first.
    self.custom_hs2_http_client.close_query(query_handle)
    output = capsys.readouterr()[1].splitlines()
    assert output[1] == self.__expect_msg_retry("CloseImpalaOperation")

  @pytest.mark.execute_serially
  def test_cancel_query(self, capsys):
    """Tests fault injection in ImpalaHS2Client's cancel_query().
    CancelOperation rpc fails due to fault, but succeeds after a retry"""
    self.connect()
    query_handle = self.custom_hs2_http_client.execute_query('select sleep(50000)', {})
    # Wait before cancelling, presumably so that the query is running by then.
    sleep(5)
    self.transport.enable_fault(502, "Injected Fault", 0.50)
    success = self.custom_hs2_http_client.cancel_query(query_handle)
    assert success
    self.close_query(query_handle)
    output = capsys.readouterr()[1].splitlines()
    assert output[1] == self.__expect_msg_retry("CancelOperation")

  @pytest.mark.execute_serially
  def test_get_query_state(self, capsys):
    """Tests fault injection in ImpalaHS2Client's get_query_state().
    GetOperationStatus rpc fails due to fault, but succeeds after a retry"""
    self.connect()
    query_handle = self.custom_hs2_http_client.execute_query('select 1', {})
    self.transport.enable_fault(502, "Injected Fault", 0.50)
    self.custom_hs2_http_client.wait_to_finish(query_handle)
    query_state = self.custom_hs2_http_client.get_query_state(query_handle)
    assert query_state == self.custom_hs2_http_client.FINISHED_STATE
    self.close_query(query_handle)
    output = capsys.readouterr()[1].splitlines()
    assert output[1] == self.__expect_msg_retry("GetOperationStatus")

  @pytest.mark.execute_serially
  def test_get_runtime_profile_summary(self, capsys):
    """Tests fault injection in ImpalaHS2Client's get_runtime_profile().
    GetRuntimeProfile and GetExecSummary rpcs fail due to fault, but succeed
    after a retry"""
    self.connect()
    query_handle = self.custom_hs2_http_client.execute_query('select 1', {})
    self.custom_hs2_http_client.wait_to_finish(query_handle)
    self.custom_hs2_http_client.close_query(query_handle)
    self.transport.enable_fault(502, "Injected Fault", 0.50)
    profile = self.custom_hs2_http_client.get_runtime_profile(query_handle)
    assert profile is not None
    summary = self.custom_hs2_http_client.get_summary(query_handle)
    assert summary is not None
    output = capsys.readouterr()[1].splitlines()
    assert output[1] == self.__expect_msg_retry("GetRuntimeProfile")
    assert output[2] == self.__expect_msg_retry("GetExecSummary")

  @pytest.mark.execute_serially
  def test_get_warning_log(self, capsys):
    """Tests fault injection in ImpalaHS2Client's get_warning_log().
    GetLog rpc fails due to fault, but succeeds after a retry"""
    self.connect()
    # The unrecognized join hint makes the query produce a warning to fetch.
    sql = ("select * from functional.alltypes a inner join /* +foo */ "
           "functional.alltypes b on a.id=b.id")
    query_handle = self.custom_hs2_http_client.execute_query(sql, {})
    self.custom_hs2_http_client.wait_to_finish(query_handle)
    self.transport.enable_fault(502, "Injected Fault", 0.50)
    warning_log = self.custom_hs2_http_client.get_warning_log(query_handle)
    assert warning_log == 'WARNINGS: JOIN hint not recognized: foo'
    self.close_query(query_handle)
    output = capsys.readouterr()[1].splitlines()
    assert output[1] == self.__expect_msg_retry("GetLog")

  @pytest.mark.execute_serially
  def test_connection_drop(self):
    """Tests connection drop between rpcs. Each rpc starts a new connection so dropping
    connections between rpcs should have no effect"""
    self.connect()
    self.transport.close()
    query_handle = self.custom_hs2_http_client.execute_query('select 1', {})
    self.transport.close()
    self.custom_hs2_http_client.wait_to_finish(query_handle)
    self.transport.close()
    query_state = self.custom_hs2_http_client.get_query_state(query_handle)
    assert query_state == self.custom_hs2_http_client.FINISHED_STATE
    num_rows = 0
    self.transport.close()
    rows_fetched = self.custom_hs2_http_client.fetch(query_handle)
    for rows in rows_fetched:
      num_rows += len(rows)
    assert num_rows == 1
    self.transport.close()
    self.close_query(query_handle)
    self.transport.close()
    profile = self.custom_hs2_http_client.get_runtime_profile(query_handle)
    assert profile is not None
    self.transport.close()
    summary = self.custom_hs2_http_client.get_summary(query_handle)
    assert summary is not None