blob: 10849784fa54368dee38ca1aa5ef073be8f17444 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
import logging
import os
import pytest
import re
import subprocess
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.test_dimensions import create_client_protocol_dimension
from tests.common.test_vector import ImpalaTestVector
from tests.shell.util import run_impala_shell_cmd, get_impalad_port
LOG = logging.getLogger('test_client_keepalive')
class TestClientKeepalive(CustomClusterTestSuite):
"""Tests for enabling server-side keepalive for client connections.
The mechanism is slightly different SSL and non-SSL, so this tests both."""
CERT_DIR = "%s/be/src/testutil" % os.environ['IMPALA_HOME']
SSL_ARGS = ("--ssl_client_ca_certificate=%s/server-cert.pem "
"--ssl_server_certificate=%s/server-cert.pem "
"--ssl_private_key=%s/server-key.pem "
"--hostname=localhost " # Required to match hostname in certificate
% (CERT_DIR, CERT_DIR, CERT_DIR))
KEEPALIVE_ARGS = ("--client_keepalive_probe_period_s=600")
def get_ss_command(self):
# HACK: Most systems have ss on the PATH, but sometimes the PATH is misconfigured
# while ss is still available in /usr/sbin. This tries the PATH and then falls back
# to trying /usr/sbin/ss.
possible_ss_commands = ['ss', '/usr/sbin/ss']
with open(os.devnull, "w") as devnull:
for ss_command in possible_ss_commands:
try:
retcode = subprocess.call([ss_command], stdout=devnull, stderr=devnull)
LOG.info("{0} returns {1}".format(ss_command, retcode))
if retcode == 0:
return ss_command
except Exception as e:
LOG.info(e)
pass
raise Exception("No valid ss executable. Tried: {0}".format(possible_ss_commands))
def check_keepalive(self, vector, ssl):
ss = self.get_ss_command()
impalad_port = get_impalad_port(vector)
# Sleep 1 second to make sure the connection is idle, then use the ss utility
# to print information about keepalive.
# -H disables the header
# -t limits it to TCP connections
# -o prints the timer information which includes keepalive
# -n uses numeric addresses to avoid DNS lookups
# sport X - limit to connections for the impalad port that we are using
ss_command = "sleep 1 && {0} -Hton sport = {1}".format(ss, impalad_port)
LOG.info("Command: {0}".format(ss_command))
shell_options = ["-q", "shell {0}".format(ss_command)]
if ssl:
shell_options.append("--ssl")
result = run_impala_shell_cmd(vector, shell_options)
LOG.info("STDOUT: {0} STDERR: {1}".format(result.stdout, result.stderr))
# The message is of the form "timer:(keepalive,$TIME,$NUM_RETRIES)"
# e.g. "timer:(keepalive,9min58sec,0)" or "timer:(keepalive,10min,0)"
KEEPALIVE_REGEX = r"timer:\(keepalive,([0-9]+)min([0-9]+sec)?,([0-9])\)"
match = re.search(KEEPALIVE_REGEX, result.stdout)
assert match, "Could not find keepalive information in {0}".format(result.stdout)
num_minutes = int(match.group(1))
num_retries = int(match.group(3))
assert num_minutes == 9 or num_minutes == 10
assert num_retries == 0
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(impalad_args=(SSL_ARGS + KEEPALIVE_ARGS),
statestored_args=SSL_ARGS,
catalogd_args=SSL_ARGS)
def test_ssl_keepalive(self, vector):
# Keepalive applies to all client ports / protocols, so test all protocols
# Iterate over test vector within test function to avoid restart cluster
for vector in\
[ImpalaTestVector([value]) for value in create_client_protocol_dimension()]:
self.check_keepalive(vector, ssl=True)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(impalad_args=KEEPALIVE_ARGS)
def test_nonssl_keepalive(self, vector):
# Keepalive applies to all client ports / protocols, so test all protocols
# Iterate over test vector within test function to avoid restart cluster
for vector in\
[ImpalaTestVector([value]) for value in create_client_protocol_dimension()]:
self.check_keepalive(vector, ssl=False)