blob: a41a431dc0c03f79c7f091b9a44239ff005759e7 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
import os
import pytest
import re
import tempfile
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.test_dimensions import create_client_protocol_http_transport
from time import sleep
from tests.shell.util import run_impala_shell_cmd
class TestImpalaShellCommandLine(CustomClusterTestSuite):
  """Runs tests of the Impala shell by first standing up an Impala cluster with
  specific startup flags. Then, the Impala shell is launched with specific arguments
  in a separate process. Assertions are done by scanning the shell output and Impala
  server logs for expected strings."""

  # Log directories handed to the cluster through '-log_dir'. They are created while the
  # class body is evaluated because the with_args decorators below reference them.
  # NOTE(review): nothing in this class removes these directories afterwards.
  LOG_DIR_HTTP_TRACING = tempfile.mkdtemp(prefix="http_tracing")
  LOG_DIR_HTTP_TRACING_OFF = tempfile.mkdtemp(prefix="http_tracing_off")

  # Capturing regex fragment for ids written as '<16 hex digits>:<16 hex digits>'. It is
  # used to match the session id and query id in the logs and the query id in the
  # profile printed by the shell.
  IMPALA_ID_RE = "([0-9a-f]{16}:[0-9a-f]{16})"

  # Text searched for in impalad.INFO to identify HTTP connection tracing log lines.
  _TRACING_LOG_MARKER = "HTTP Connection Tracing Headers"

  @classmethod
  def get_workload(cls):
    """Returns the name of the workload these tests run against."""
    return 'functional-query'

  @classmethod
  def add_test_dimensions(cls):
    """Overrides all other add_dimension methods in super classes up the entire class
    hierarchy ensuring that each test in this class only gets run once using the
    hs2-http protocol."""
    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_http_transport())

  def _run_version_query_and_stop_cluster(self, vector, extra_shell_args=None):
    """Runs 'select version();profile' through the impala shell using the protocol from
    'vector', stops the cluster, and asserts that the shell output shows the query and
    the profile. 'extra_shell_args' is an optional list of shell arguments inserted
    between the protocol arguments and the query. Returns the shell command result."""
    args = ['--protocol', vector.get_value('protocol')]
    if extra_shell_args:
      args.extend(extra_shell_args)
    args.extend(['-q', 'select version();profile'])
    result = run_impala_shell_cmd(vector, args)

    # Shut down cluster to ensure logs flush to disk.
    sleep(5)
    self._stop_impala_cluster()

    # Ensure the query ran successfully.
    for expected in ("version()", "impalad version", "Query Runtime Profile"):
      assert expected in result.stdout, \
          "did not find '{0}' in impala shell stdout".format(expected)
    return result

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args("-log_dir={0} -v 2".format(LOG_DIR_HTTP_TRACING))
  def test_http_tracing_headers(self, vector):
    """Asserts that tracing headers are automatically added by the impala shell to
    all calls to the backend impala engine made using the hs2 over http protocol.
    The impala coordinator logs are searched to ensure these tracing headers were added
    and also were passed through to the coordinator."""
    result = self._run_version_query_and_stop_cluster(vector)

    request_id_base = ""
    request_id_serialnum = 0
    session_id = ""
    query_id = ""
    last_known_query_id = ""
    tracing_lines_count = 0

    request_id_re = re.compile(r"x-request-id=([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-"
                               r"[0-9a-f]{4}-[0-9a-f]{12})-(\d+)")
    session_id_re = re.compile("x-session-id={0}".format(self.IMPALA_ID_RE))
    query_id_re = re.compile("x-query-id={0}".format(self.IMPALA_ID_RE))
    profile_query_id_re = re.compile(r"Query \(id={0}\)".format(self.IMPALA_ID_RE))

    # Find all HTTP Connection Tracing log lines.
    with open(os.path.join(self.LOG_DIR_HTTP_TRACING, "impalad.INFO")) as log_file:
      for line in log_file:
        if self._TRACING_LOG_MARKER not in line:
          continue
        tracing_lines_count += 1

        # The impala shell builds a request_id that consists of the same randomly
        # generated uuid and a serially increasing integer appended on the end.
        # Ensure both these conditions are met.
        m = request_id_re.search(line)
        assert m is not None, \
            "did not find request id in HTTP connection tracing log line '{0}'" \
            .format(line)

        if request_id_base == "":
          # The current line is the very first HTTP connection tracing line in the logs.
          request_id_base = m.group(1)
        else:
          assert request_id_base == m.group(1), \
              "base request id expected '{0}', actual '{1}'" \
              .format(request_id_base, m.group(1))

        # Serial numbers are expected to start at 1 and grow by one per tracing line.
        request_id_serialnum += 1
        assert request_id_serialnum == int(m.group(2)), \
            "request id serial number expected '{0}', actual '{1}'" \
            .format(request_id_serialnum, m.group(2))

        # The session_id is generated by impala and must be the same once it
        # appears in a tracing log line.
        m = session_id_re.search(line)
        if m is not None:
          if session_id == "":
            session_id = m.group(1)
          else:
            assert session_id == m.group(1), \
                "session id expected '{0}', actual '{1}'".format(session_id, m.group(1))

        # The query_id is generated by impala and must be the same for the
        # duration of the query. A tracing line without a query id resets the tracked
        # id so that the next query id seen is accepted as a new one.
        m = query_id_re.search(line)
        if m is None:
          query_id = ""
        elif query_id == "":
          query_id = m.group(1)
          last_known_query_id = query_id
        else:
          assert query_id == m.group(1), \
              "query id expected '{0}', actual '{1}'".format(query_id, m.group(1))

    # Assert that multiple HTTP connection tracing log lines were found.
    assert tracing_lines_count > 10, \
        "did not find enough HTTP connection tracing log lines, found {0} lines" \
        .format(tracing_lines_count)

    # Ensure the last found query id matches the actual query id
    # from the impala query profile.
    m = profile_query_id_re.search(result.stdout)
    if m is None:
      pytest.fail("did not find Impala query id in shell stdout")
    assert last_known_query_id == m.group(1), \
        "impala query profile id, expected '{0}', actual '{1}'" \
        .format(last_known_query_id, m.group(1))

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args("-log_dir={0} -v 2".format(LOG_DIR_HTTP_TRACING_OFF))
  def test_http_tracing_headers_off(self, vector):
    """Asserts the impala shell command line parameter to prevent the addition of http
    tracing headers actually leaves out those tracing headers."""
    self._run_version_query_and_stop_cluster(vector, ['--no_http_tracing'])

    # Find all HTTP Connection Tracing log lines (there should not be any).
    with open(os.path.join(self.LOG_DIR_HTTP_TRACING_OFF, "impalad.INFO")) as log_file:
      for line in log_file:
        if self._TRACING_LOG_MARKER in line:
          pytest.fail("found HTTP connection tracing line: {0}".format(line))