blob: b264fbe236f1b443ae75b5a8f8e196ba555b48b0 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from tests.common.environ import ImpalaTestClusterFlagsDetector
from tests.common.file_utils import grep_dir
from tests.common.skip import SkipIfBuildType
from tests.common.impala_cluster import ImpalaCluster
from tests.common.impala_test_suite import ImpalaTestSuite
import itertools
import json
import os
import pytest
import re
import requests
class TestWebPage(ImpalaTestSuite):
  """Tests that issue HTTP requests against the debug web pages of the locally running
  Impala daemons (impalad, statestored and catalogd) and check the responses."""

  # URL templates for the debug pages. The '{0}' placeholder is filled in with the
  # webserver port of the daemon under test before the request is sent.
  ROOT_URL = "http://localhost:{0}/"
  GET_JAVA_LOGLEVEL_URL = "http://localhost:{0}/get_java_loglevel"
  SET_JAVA_LOGLEVEL_URL = "http://localhost:{0}/set_java_loglevel"
  RESET_JAVA_LOGLEVEL_URL = "http://localhost:{0}/reset_java_loglevel"
  SET_GLOG_LOGLEVEL_URL = "http://localhost:{0}/set_glog_level"
  RESET_GLOG_LOGLEVEL_URL = "http://localhost:{0}/reset_glog_level"
  CATALOG_URL = "http://localhost:{0}/catalog"
  CATALOG_OBJECT_URL = "http://localhost:{0}/catalog_object"
  TABLE_METRICS_URL = "http://localhost:{0}/table_metrics"
  QUERY_BACKENDS_URL = "http://localhost:{0}/query_backends"
  QUERY_FINSTANCES_URL = "http://localhost:{0}/query_finstances"
  RPCZ_URL = "http://localhost:{0}/rpcz"
  THREAD_GROUP_URL = "http://localhost:{0}/thread-group"
  MEMZ_URL = "http://localhost:{0}/memz"
  METRICS_URL = "http://localhost:{0}/metrics"
  JMX_URL = "http://localhost:{0}/jmx"
  ADMISSION_URL = "http://localhost:{0}/admission"
  RESET_RESOURCE_POOL_STATS_URL = "http://localhost:{0}/resource_pool_reset"
  BACKENDS_URL = "http://localhost:{0}/backends"
  PROMETHEUS_METRICS_URL = "http://localhost:{0}/metrics_prometheus"
  # log4j changes do not apply to the statestore since it doesn't
  # have an embedded JVM. So we make two sets of ports to test the
  # log level endpoints, one without the statestore port and the
  # one with it.
  # Ports are kept as strings; they are only ever substituted into the URL templates.
  TEST_PORTS_WITHOUT_SS = ["25000", "25020"]
  TEST_PORTS_WITH_SS = ["25000", "25010", "25020"]
  # Single-daemon port lists for checks that target only the impalad or only the
  # catalogd web UI.
  IMPALAD_TEST_PORT = ["25000"]
  CATALOG_TEST_PORT = ["25020"]
def test_get_root_url(self):
  """Fetches the root page of every daemon and expects an OK response from each."""
  self.get_and_check_status(url=self.ROOT_URL)
def test_get_build_flags(self):
  """Checks that each daemon's root page reports exactly three build flags and that
  every flag carries one of its known values."""
  # Checked in this order: presence of the flag first, then its value.
  flag_names = ("is_ndebug", "cmake_build_type", "library_link_type")
  allowed_values = {
    "is_ndebug": ["true", "false"],
    "cmake_build_type": ["debug", "release", "address_sanitizer", "tidy", "ubsan",
                         "ubsan_full", "tsan", "code_coverage_release",
                         "code_coverage_debug"],
    "library_link_type": ["dynamic", "static"],
  }
  for port in self.TEST_PORTS_WITH_SS:
    root_page = self.ROOT_URL.format(port)
    flags = ImpalaTestClusterFlagsDetector.get_build_flags_from_web_ui(root_page)
    assert len(flags) == 3
    for name in flag_names:
      assert name in flags
      assert flags[name] in allowed_values[name]
@SkipIfBuildType.remote
def test_root_correct_build_flags(self, cluster_properties):
  """Compares the build type and link type shown on each daemon's root page with the
  values reported by 'cluster_properties'. Expects a non-remote cluster."""
  assert not cluster_properties.is_remote_cluster()
  for port in self.TEST_PORTS_WITH_SS:
    root_page = self.ROOT_URL.format(port)
    flags = ImpalaTestClusterFlagsDetector.get_build_flags_from_web_ui(root_page)
    assert flags["cmake_build_type"] == cluster_properties.build_flavor
    assert flags["library_link_type"] == cluster_properties.library_link_type
def test_root_consistent_build_flags(self):
  """Checks that 'is_ndebug' and 'cmake_build_type' on each root page do not
  contradict each other: a build without NDEBUG must not be 'release', and a 'debug'
  build must not have NDEBUG set."""
  for port in self.TEST_PORTS_WITH_SS:
    root_page = self.ROOT_URL.format(port)
    flags = ImpalaTestClusterFlagsDetector.get_build_flags_from_web_ui(root_page)
    ndebug = flags["is_ndebug"] == "true"
    if not ndebug:
      assert flags["cmake_build_type"] != "release"
    if flags["cmake_build_type"] == "debug":
      assert not ndebug
def test_memz(self):
  """Tests /memz at impalad / statestored / catalogd"""
  # Go through the shared helper and the class-level URL template / port list rather
  # than hard-coded URLs, so a failure reports the URL and the response body.
  self.get_and_check_status(self.MEMZ_URL, ports_to_test=self.TEST_PORTS_WITH_SS)
def test_memz_shows_fragment_instance_id(self):
  """Starts a slow query and, once it is running, checks that the 'detailed' memory
  breakdown from /memz mentions a fragment instance id."""
  sql = "select count(*) from functional_parquet.alltypes where bool_col = sleep(100)"
  handle = self.client.execute_async(sql)
  try:
    self.wait_for_state(handle, self.client.QUERY_STATES['RUNNING'], 1000)
    detailed = self.get_debug_page(self.MEMZ_URL)['detailed']
    # The breakdown is passed as the assert message so it is shown on failure.
    assert re.search("Fragment [0-9a-f]{16}:[0-9a-f]{16}", detailed), detailed
  finally:
    # Always release the query, even if a check above failed.
    self.client.close_query(handle)
def test_query_profile_encoded_unknown_query_id(self):
  """Requests /query_profile_encoded for a query id that does not exist and checks
  that the error text begins with the expected message, with nothing (such as
  whitespace) in front of it."""
  any_impalad = ImpalaCluster.get_e2e_test_cluster().get_any_impalad()
  page = any_impalad.service.read_debug_webpage("query_profile_encoded?query_id=123")
  assert page.startswith("Could not obtain runtime profile: Query id")
def test_jmx_endpoint(self):
  """Tests that the /jmx endpoint on the Catalog and Impalads returns a valid json."""
  for port in self.TEST_PORTS_WITHOUT_SS:
    input_url = self.JMX_URL.format(port)
    response = requests.get(input_url)
    assert response.status_code == requests.codes.ok
    assert "application/json" == response.headers['Content-Type']
    # Only the parse is guarded, and the failure message shows the raw body that could
    # not be parsed (no parsed value exists at that point).
    try:
      jmx_json = json.loads(response.text)
    except ValueError:
      assert False, "Invalid JSON returned from /jmx endpoint: %s" % response.text
    assert "beans" in jmx_json, "Ill formatted JSON returned: %s" % jmx_json
def get_and_check_status(
    self, url, string_to_search="", ports_to_test=None, regex=False, headers=None):
  """Requests 'url' once per port and asserts that each response has an OK status and
  contains 'string_to_search'.

  'url' is a template whose '{0}' placeholder receives the port. When 'ports_to_test'
  is None, TEST_PORTS_WITH_SS is used. When 'regex' is true, 'string_to_search' is
  treated as a regular expression searched for in the response text, otherwise as a
  plain substring. 'headers' is passed through to the HTTP request. Returns the list
  of responses, one per port, in port order."""
  ports = self.TEST_PORTS_WITH_SS if ports_to_test is None else ports_to_test
  collected = []
  for port in ports:
    target = url.format(port)
    reply = requests.get(target, headers=headers)
    # Same diagnostic for both checks: what was requested, what was expected, and
    # what came back.
    details = "URL: {0} Str:'{1}'\nResp:{2}".format(
        target, string_to_search, reply.text)
    assert reply.status_code == requests.codes.ok, details
    if regex:
      found = re.search(string_to_search, reply.text)
    else:
      found = string_to_search in reply.text
    assert found, details
    collected.append(reply)
  return collected
def get_debug_page(self, page_url, port=25000):
  """Fetches the '?json' variant of 'page_url' from the daemon at 'port' and returns
  the parsed JSON document."""
  pages = self.get_and_check_status(page_url + "?json", ports_to_test=[port])
  assert len(pages) == 1
  page = pages[0]
  assert "application/json" in page.headers['Content-Type']
  return json.loads(page.text)
def get_and_check_status_jvm(self, url, string_to_search=""):
  """Same as get_and_check_status(), restricted to the ports in TEST_PORTS_WITHOUT_SS
  (impalad and catalogd)."""
  jvm_ports = self.TEST_PORTS_WITHOUT_SS
  return self.get_and_check_status(url, string_to_search, ports_to_test=jvm_ports)
def test_content_type(self):
  """Checks the Content-Type header of the /metrics page for its JSON, raw and HTML
  variants on every daemon."""
  # (query string appended to the page URL, expected MIME type)
  expected_mime_types = (
    ("?json", "application/json"),
    ("?raw", "text/plain; charset=UTF-8"),
    ("", "text/html; charset=UTF-8"),
  )
  for port in self.TEST_PORTS_WITH_SS:
    metrics_page = self.METRICS_URL.format(port)
    for suffix, mime_type in expected_mime_types:
      reply = requests.get(metrics_page + suffix)
      assert mime_type == reply.headers['Content-Type']
def test_log_level(self):
  """Test that the /log_level page outputs are as expected and work well on basic and
  malformed inputs. This however does not test that the log level changes are actually
  in effect.

  Java log level endpoints are only exercised on impalad and catalogd (via
  get_and_check_status_jvm()); glog endpoints are exercised on all daemons."""
  hdfs_table_class = "?class=org.apache.impala.catalog.HdfsTable"
  # Check that the log_level end points are accessible.
  self.get_and_check_status_jvm(self.GET_JAVA_LOGLEVEL_URL)
  self.get_and_check_status_jvm(self.SET_JAVA_LOGLEVEL_URL)
  self.get_and_check_status_jvm(self.RESET_JAVA_LOGLEVEL_URL)
  self.get_and_check_status(self.SET_GLOG_LOGLEVEL_URL)
  self.get_and_check_status(self.RESET_GLOG_LOGLEVEL_URL)
  # Try getting log level of a class.
  get_loglevel_url = self.GET_JAVA_LOGLEVEL_URL + hdfs_table_class
  self.get_and_check_status_jvm(get_loglevel_url, "DEBUG")
  # Set the log level of a class to TRACE and confirm the setting is in place
  set_loglevel_url = self.SET_JAVA_LOGLEVEL_URL + hdfs_table_class + "&level=trace"
  self.get_and_check_status_jvm(set_loglevel_url, "Effective log level: TRACE")
  get_loglevel_url = self.GET_JAVA_LOGLEVEL_URL + hdfs_table_class
  self.get_and_check_status_jvm(get_loglevel_url, "TRACE")
  # Check the log level of a different class and confirm it is still DEBUG
  get_loglevel_url = (self.GET_JAVA_LOGLEVEL_URL +
      "?class=org.apache.impala.catalog.HdfsPartition")
  self.get_and_check_status_jvm(get_loglevel_url, "DEBUG")
  # Reset Java logging levels and check the logging level of the class again
  self.get_and_check_status_jvm(self.RESET_JAVA_LOGLEVEL_URL, "Java log levels reset.")
  get_loglevel_url = self.GET_JAVA_LOGLEVEL_URL + hdfs_table_class
  self.get_and_check_status_jvm(get_loglevel_url, "DEBUG")
  # Set a new glog level and make sure the setting has been applied.
  set_glog_url = self.SET_GLOG_LOGLEVEL_URL + "?glog=3"
  self.get_and_check_status(set_glog_url, "v set to 3")
  # Try resetting the glog logging defaults again.
  self.get_and_check_status(self.RESET_GLOG_LOGLEVEL_URL, "v set to ")
  # Try to get the log level of an empty class input
  get_loglevel_url = self.GET_JAVA_LOGLEVEL_URL + "?class="
  self.get_and_check_status_jvm(get_loglevel_url)
  # Same as above, for set log level request. This must request the *set* URL; it
  # used to re-request the get URL, leaving the set endpoint untested for empty input.
  set_loglevel_url = self.SET_JAVA_LOGLEVEL_URL + "?class="
  self.get_and_check_status_jvm(set_loglevel_url)
  # Empty input for setting a glog level request
  set_glog_url = self.SET_GLOG_LOGLEVEL_URL + "?glog="
  self.get_and_check_status(set_glog_url)
  # Try setting a non-existent log level on a valid class. In such cases,
  # log4j automatically sets it as DEBUG. This is the behavior of
  # Level.toLevel() method.
  set_loglevel_url = self.SET_JAVA_LOGLEVEL_URL + hdfs_table_class + "&level=foo&"
  self.get_and_check_status_jvm(set_loglevel_url, "Effective log level: DEBUG")
  # Try setting an invalid glog level.
  set_glog_url = self.SET_GLOG_LOGLEVEL_URL + "?glog=foo"
  self.get_and_check_status(set_glog_url, "Bad glog level input")
  # Try a non-existent endpoint on log_level URL.
  bad_loglevel_url = self.SET_GLOG_LOGLEVEL_URL + "?badurl=foo"
  self.get_and_check_status(bad_loglevel_url)
@pytest.mark.execute_serially
def test_uda_with_log_level(self):
  """Regression test for IMPALA-7903, where Impala crashed when executing an aggregate
  query with the glog level set to 3. Expected to run serially so that it does not
  interfere with other tests that change the log level."""
  set_url = self.SET_GLOG_LOGLEVEL_URL
  reset_url = self.RESET_GLOG_LOGLEVEL_URL
  # Both glog end points must be reachable before anything is changed.
  self.get_and_check_status(set_url)
  self.get_and_check_status(reset_url)
  # Raise the glog level to 3 and run an aggregating query; it must complete.
  self.get_and_check_status(set_url + "?glog=3", "v set to 3")
  self.client.execute("select avg(int_col) from functional.alltypessmall")
  # Put the glog level back.
  self.get_and_check_status(reset_url, "v set to ")
def test_catalog(self, cluster_properties, unique_database):
  """Creates and populates several tables in 'unique_database', then exercises the
  /catalog, /catalog_object and /table_metrics pages against them."""
  # Executed in order; '{0}' is replaced with the database name.
  setup_statements = (
    # Non-partitioned table
    "create table {0}.foo (id int, val int)",
    "insert into {0}.foo values (1, 200)",
    # Partitioned table
    "create table {0}.foo_part (id int, val int) partitioned by (year int)",
    "insert into {0}.foo_part partition (year=2010) values (1, 200)",
    # Kudu table
    "create table {0}.foo_kudu (id int, val int, primary key (id)) stored as kudu",
    "insert into {0}.foo_kudu values (1, 200)",
    # Partitioned parquet table
    "create table {0}.foo_part_parquet (id int, val int) partitioned "
    "by (year int) stored as parquet",
    "insert into {0}.foo_part_parquet partition (year=2010) values (1, 200)",
  )
  for statement in setup_statements:
    self.execute_query(statement.format(unique_database))

  self.get_and_check_status_jvm(self.CATALOG_URL, unique_database)
  self.get_and_check_status_jvm(self.CATALOG_URL, "foo_part")
  # IMPALA-5028: Test toThrift() of a partitioned table via the WebUI code path.
  for table in ("foo_part", "foo_kudu", "foo_part_parquet", "foo"):
    self.__test_catalog_object(unique_database, table, cluster_properties)
  for metric in ("total-file-size-bytes", "num-files", "alter-duration"):
    self.__test_table_metrics(unique_database, "foo_part", metric)
  self.__test_catalog_tablesfilesusage(unique_database, "foo_part", "1")
  self.__test_catalog_tables_loading_time(unique_database, "foo_part")
def __test_catalog_object(self, db_name, tbl_name, cluster_properties):
  """Exercises /catalog_object for the table 'db_name.tbl_name', first right after
  invalidating its metadata and again after a query has touched the table.

  The catalogd must always serve the object. The impalad must serve it too, unless
  the cluster runs in catalog v2 mode, where the impalad endpoint is expected to be
  absent."""
  obj_url = self.CATALOG_OBJECT_URL + \
      "?object_type=TABLE&object_name={0}.{1}".format(db_name, tbl_name)

  if cluster_properties.is_catalog_v2_cluster():
    # Catalog object endpoint is disabled in local catalog mode.
    def check_impalad():
      self.check_endpoint_is_disabled(obj_url, "No URI handler for '/catalog_object'",
          ports_to_test=self.IMPALAD_TEST_PORT)
  else:
    def check_impalad():
      self.get_and_check_status(obj_url, tbl_name,
          ports_to_test=self.IMPALAD_TEST_PORT)

  # First statement drops the cached metadata, second one triggers a load.
  for statement in ("invalidate metadata %s.%s", "select count(*) from %s.%s"):
    self.client.execute(statement % (db_name, tbl_name))
    self.get_and_check_status(obj_url, tbl_name, ports_to_test=self.CATALOG_TEST_PORT)
    check_impalad()
def check_endpoint_is_disabled(self, url, string_to_search="", ports_to_test=None):
  """Requests 'url' once per port and asserts that each response is a 404 (not found)
  whose text contains 'string_to_search'. Uses TEST_PORTS_WITH_SS when
  'ports_to_test' is None."""
  ports = self.TEST_PORTS_WITH_SS if ports_to_test is None else ports_to_test
  for port in ports:
    target = url.format(port)
    reply = requests.get(target)
    details = "URL: {0} Str:'{1}'\nResp:{2}".format(
        target, string_to_search, reply.text)
    assert reply.status_code == requests.codes.not_found, details
    assert string_to_search in reply.text, details
def __test_table_metrics(self, db_name, tbl_name, metric):
  """Refreshes the table, then checks that the catalogd /table_metrics page for it
  mentions 'metric'."""
  qualified_name = "%s.%s" % (db_name, tbl_name)
  self.client.execute("refresh %s" % qualified_name)
  metrics_url = self.TABLE_METRICS_URL + "?name=%s" % qualified_name
  self.get_and_check_status(metrics_url, metric, ports_to_test=self.CATALOG_TEST_PORT)
def __test_catalog_tables_loading_time(self, db_name, tbl_name):
  """Refreshes the table, then checks that the catalogd /catalog page lists the tables
  with the longest metadata loading time, that the JSON list is non-empty, and that
  its first entry carries all of the loading time statistics."""
  self.client.execute("refresh %s.%s" % (db_name, tbl_name))
  catalog_port = self.CATALOG_TEST_PORT
  self.get_and_check_status(self.CATALOG_URL,
      "Tables with Longest Metadata Loading Time", ports_to_test=catalog_port)
  json_pages = self.get_and_check_status(self.CATALOG_URL + "?json",
      "longest_loading_tables", ports_to_test=catalog_port)
  catalog_json = json.loads(json_pages[0].text)
  assert "longest_loading_tables" in catalog_json, "Response {0}".format(catalog_json)
  slowest_tables = catalog_json["longest_loading_tables"]
  assert len(slowest_tables) > 0
  first_entry = slowest_tables[0]
  expected_fields = ("median_metadata_loading_time_ns", "max_metadata_loading_time_ns",
                     "p75_loading_time_ns", "p95_loading_time_ns",
                     "p99_loading_time_ns")
  for field in expected_fields:
    assert field in first_entry, \
        "{0} not in loading tables {1}".format(field, slowest_tables)
def __test_catalog_tablesfilesusage(self, db_name, tbl_name, numfiles):
  """Test the list of tables with most number of files in the catalog page.
  Make sure the loaded table has correct file count.

  'numfiles' is the expected file count as a string. It is compared textually with
  the HTML page and numerically with the JSON page. The table is only checked if it
  appears in the respective list."""
  self.client.execute("refresh %s.%s" % (db_name, tbl_name))
  response = self.get_and_check_status(self.CATALOG_URL,
      "Tables with Most Number of Files", ports_to_test=self.CATALOG_TEST_PORT)
  page_text = response[0].text
  list_file_str = re.search(r'<table id="high-file-count-tables"( .*?)</table>',
      page_text, re.MULTILINE | re.DOTALL)
  # Fail with a readable message instead of an AttributeError on None.
  assert list_file_str is not None, \
      "No high-file-count-tables table in catalog page: %s" % page_text
  target_metric = "%s.%s-metric" % (db_name, tbl_name)
  list_files = re.findall(r'<tr>(.*?)</tr>', list_file_str.group(0),
      re.MULTILINE | re.DOTALL)
  for trow in list_files:
    # Find the entry for the db table and verify its file count. A plain substring
    # test is used so that '.' in the name is not treated as a regex wildcard.
    if target_metric in trow:
      # Get the number following <td> in the entry
      nfiles = re.search(r'(?<=\<td\>)\d+', trow)
      assert nfiles is not None, "No file count found in table row: %s" % trow
      assert nfiles.group(0) == numfiles
  response = self.get_and_check_status(self.CATALOG_URL + "?json",
      "high_file_count_tables", ports_to_test=self.CATALOG_TEST_PORT)
  response_json = json.loads(response[0].text)
  high_filecount_tbls = response_json["high_file_count_tables"]
  tbl_fname = "%s.%s" % (db_name, tbl_name)
  assert len(high_filecount_tbls) > 0
  # The expected table might not be in the Top-N list, we may not find it
  # in the list. Just make sure the file count is right if it is in the
  # list.
  for tblinfo in high_filecount_tbls:
    if tblinfo["name"] == tbl_fname:
      assert tblinfo["num_files"] == int(numfiles)
def __run_query_and_get_debug_page(self, query, page_url, query_options=None,
                                   expected_state=None):
  """Starts 'query' asynchronously, fetches the JSON debug page 'page_url' for it from
  the impalad on port 25000, cancels the query and returns the parsed JSON.

  If 'query_options' is given it is applied to the client before the query is
  submitted. If 'expected_state' is given, the page is only fetched after the query
  has reached that state."""
  if query_options:
    self.client.set_configuration(query_options)
  handle = self.client.execute_async(query)
  try:
    if expected_state:
      self.wait_for_state(handle, expected_state, 100)
    query_page_url = page_url + "?query_id=%s&json" % handle.get_handle().id
    pages = self.get_and_check_status(query_page_url, ports_to_test=[25000])
    assert len(pages) == 1
    return json.loads(pages[0].text)
  finally:
    # Runs on success and on failure, so the query never outlives this call.
    self.client.cancel(handle)
@pytest.mark.xfail(run=False, reason="IMPALA-8059")
def test_backend_states(self, unique_database):
  """Checks that /query_backends lists per-backend state for a running query and a
  running CTAS statement, and that it lists nothing for a DDL statement."""
  select_sql = "select sleep(10000) from functional.alltypes limit 1"
  ctas_sql = "create table {0}.foo as {1}".format(unique_database, select_sql)
  running = self.client.QUERY_STATES['RUNNING']
  required_fields = ['cpu_user_s', 'rpc_latency', 'num_remaining_instances',
                     'num_instances', 'peak_per_host_mem_consumption',
                     'time_since_last_heard_from', 'status', 'host',
                     'cpu_sys_s', 'done', 'bytes_read']

  for sql in (select_sql, ctas_sql):
    page = self.__run_query_and_get_debug_page(
        sql, self.QUERY_BACKENDS_URL, expected_state=running)
    assert 'backend_states' in page
    states = page['backend_states']
    assert len(states) > 0
    for state in states:
      for field in required_fields:
        assert field in state
      # The query is still sleeping, so every backend must be healthy and unfinished.
      assert state['status'] == 'OK'
      assert not state['done']

  ddl_page = self.__run_query_and_get_debug_page(
      "describe functional.alltypes", self.QUERY_BACKENDS_URL)
  assert 'backend_states' not in ddl_page
@pytest.mark.xfail(run=False, reason="IMPALA-8059")
def test_backend_instances(self, unique_database, query_options=None):
  """Test that /query_finstances returns the list of fragment instances for DML or
  queries; nothing for DDL statements.

  'query_options', if given, is applied to the queries that are expected to have
  fragment instances (used by test_backend_instances_mt_dop())."""
  sleep_query = "select sleep(10000) from functional.alltypes limit 1"
  ctas_sleep_query = "create table {0}.foo as {1}".format(unique_database, sleep_query)
  running_state = self.client.QUERY_STATES['RUNNING']
  instance_stats_properties = ['fragment_name', 'time_since_last_heard_from',
                               'current_state', 'first_status_update_received',
                               'instance_id', 'done']
  for query in [sleep_query, ctas_sleep_query]:
    # Forward the caller's query options; they used to be silently dropped, so the
    # mt_dop variant of this test never actually ran with mt_dop set.
    response_json = self.__run_query_and_get_debug_page(query,
                                                        self.QUERY_FINSTANCES_URL,
                                                        query_options=query_options,
                                                        expected_state=running_state)
    assert 'backend_instances' in response_json
    backend_instances = response_json['backend_instances']
    assert len(backend_instances) > 0
    for backend_instance in backend_instances:
      assert 'host' in backend_instance
      assert 'instance_stats' in backend_instance
      instances_stats = backend_instance['instance_stats']
      assert len(instances_stats) > 0
      for instance_stats in instances_stats:
        for instance_stats_property in instance_stats_properties:
          assert instance_stats_property in instance_stats
        assert not instance_stats['done']
  # The DDL check must hit /query_finstances as well; asserting the absence of
  # 'backend_instances' on the /query_backends page could never fail.
  response_json = self.__run_query_and_get_debug_page("describe functional.alltypes",
                                                      self.QUERY_FINSTANCES_URL)
  assert 'backend_instances' not in response_json
@pytest.mark.xfail(run=False, reason="IMPALA-8059")
def test_backend_instances_mt_dop(self, unique_database):
  """Re-runs test_backend_instances() with the query option mt_dop=4, to check that
  accessing /query_finstances does not crash the backend in that mode."""
  mt_dop_options = dict(mt_dop=4)
  self.test_backend_instances(unique_database, query_options=mt_dop_options)
def test_io_mgr_threads(self):
  """Checks that the 'disk-io-mgr' thread group on the impalad contains threads with
  readable names for the ADLS, S3 and HDFS remote queues."""
  pages = self.get_and_check_status(
      self.THREAD_GROUP_URL + "?group=disk-io-mgr&json", ports_to_test=[25000])
  assert len(pages) == 1
  threads = json.loads(pages[0].text)['threads']
  names = [thread["name"] for thread in threads]
  for expected in ("ADLS remote", "S3 remote", "HDFS remote"):
    matches = [name for name in names if expected in name]
    assert len(matches) > 0, "Could not find thread matching '%s'" % expected
def test_krpc_rpcz(self):
  """Checks that /rpcz exposes KRPC service and per-connection metrics, and that those
  metrics change after a query has been executed."""
  join_query = ("select count(c2.string_col) from "
                "functional.alltypestiny join functional.alltypessmall c2")
  service = 'impala.DataStreamService'

  def connection_metrics(inbound):
    """Returns the inbound or the outbound per-connection metrics from /rpcz."""
    section = "inbound_per_conn_metrics" if inbound else "per_conn_metrics"
    return self.get_debug_page(self.RPCZ_URL)[section]

  def method_metrics(svc_name):
    """Returns the RPC method metrics of 'svc_name', sorted by method name."""
    services = self.get_debug_page(self.RPCZ_URL)['services']
    assert len(services) > 0
    # First service with a matching name, or None.
    found = next((svc for svc in services if svc['service_name'] == svc_name), None)
    assert found is not None, 'Could not find metrics for %s' % svc_name
    methods = found['rpc_method_metrics']
    assert len(methods) > 0, '%s metrics are empty' % svc_name
    return sorted(methods, key=lambda m: m['method_name'])

  # Snapshot, run the query, snapshot again.
  svc_before = method_metrics(service)
  inbound_before = connection_metrics(True)
  outbound_before = connection_metrics(False)
  self.client.execute(join_query)
  svc_after = method_metrics(service)
  inbound_after = connection_metrics(True)
  outbound_after = connection_metrics(False)

  assert svc_before != svc_after
  assert inbound_before != inbound_after
  assert outbound_before != outbound_after
  # Some connections should have metrics after executing query
  assert len(inbound_after) > 0
  assert len(outbound_after) > 0
  # Spot-check some fields, including socket stats.
  for conn in itertools.chain(inbound_after, outbound_after):
    in_flight = conn["num_calls_in_flight"]
    assert conn["remote_ip"] != ""
    assert in_flight >= 0
    assert in_flight == len(conn["calls_in_flight"])
    socket_stats = conn["socket_stats"]
    # Check rtt, which should be present in 'struct tcp_info' even in old kernels
    # like 2.6.32.
    assert socket_stats["rtt"] > 0, conn
    # send_queue_bytes uses TIOCOUTQ, which is also present in 2.6.32 and even older
    # kernels.
    assert socket_stats["send_queue_bytes"] >= 0, conn
@pytest.mark.execute_serially
def test_admission_page(self):
  """Sanity-checks the admission debug page and the resource pool stats reset end
  point, each both with and without a 'pool_name' argument."""
  # Make sure at least one query is submitted to the default pool since impala startup,
  # so that it shows up in the admission control debug page.
  self.client.execute("select 1")
  all_pools = self.__fetch_resource_pools_json()
  # Find the default pool. It is either "root.default" if a fair-scheduler.xml file
  # is provided or "default-pool" otherwise.
  default_names = ('default-pool', 'root.default')
  default_pool = next(
      (pool['pool_name'] for pool in all_pools if pool['pool_name'] in default_names),
      None)
  assert default_pool is not None, \
      "Expected a default pool to be present in {0}".format(all_pools)

  def default_pool_stats():
    """Returns the first entry of the admission page filtered to the default pool."""
    return self.__fetch_resource_pools_json(default_pool)[0]

  stats = default_pool_stats()
  assert stats['pool_name'] == default_pool
  assert stats['total_admitted'] > 0

  # Reset with an explicit pool name.
  self.get_and_check_status(
      self.RESET_RESOURCE_POOL_STATS_URL + "?pool_name={0}".format(default_pool),
      ports_to_test=[25000])
  assert default_pool_stats()['total_admitted'] == 0

  # Admit another query, then reset without a pool name.
  self.client.execute("select 1")
  assert default_pool_stats()['total_admitted'] > 0
  self.get_and_check_status(self.RESET_RESOURCE_POOL_STATS_URL, ports_to_test=[25000])
  pool_config = default_pool_stats()
  assert pool_config['total_admitted'] == 0

  # check that metrics exist
  for metric in ('max_query_mem_limit', 'min_query_mem_limit',
                 'max_running_queries_multiple', 'max_memory_multiple',
                 'clamp_mem_limit_query_option', 'max_running_queries_derived',
                 'max_queued_queries_derived', 'max_memory_derived'):
    assert metric in pool_config
def __fetch_resource_pools_json(self, pool_name=None):
  """Fetches the admission debug page as JSON from the impalad on port 25000 and
  returns its 'resource_pools' entry. When 'pool_name' is given it is added to the
  request as the 'pool_name' argument."""
  pool_filter = "" if pool_name is None else "&pool_name=" + pool_name
  pages = self.get_and_check_status(self.ADMISSION_URL + "?json" + pool_filter,
                                    ports_to_test=[25000])
  assert len(pages) == 1
  admission_json = json.loads(pages[0].text)
  assert 'resource_pools' in admission_json
  return admission_json['resource_pools']
@SkipIfBuildType.remote
def test_backends_page(self):
  """Sanity-checks the JSON form of the /backends page: three backends are listed and
  the first listed one has plausible addresses and role flags."""
  pages = self.get_and_check_status(self.BACKENDS_URL + '?json', ports_to_test=[25000])
  assert len(pages) == 1
  backends_json = json.loads(pages[0].text)
  assert 'backends' in backends_json
  backends = backends_json['backends']
  # When this test runs, all impalads would have already started.
  assert len(backends) == 3
  # Look at results for a single backend - they are not sorted, so any of the three
  # port suffixes is acceptable for each address.
  backend = backends[0]
  assert len(backend['webserver_url']) > 0
  assert backend['webserver_url'].endswith(('25000', '25001', '25002'))
  # The 'address' column is the backend port of the impalad.
  assert len(backend['address']) > 0
  assert backend['address'].endswith(('22000', '22001', '22002'))
  # The 'krpc_address' is the krpc address of the impalad.
  assert len(backend['krpc_address']) > 0
  assert backend['krpc_address'].endswith(('27000', '27001', '27002'))
  assert backend['is_coordinator']
  assert backend['is_executor']
  assert not backend['is_quiescing']
  assert len(backend['admit_mem_limit']) > 0
def test_download_profile(self):
  """Runs a query, then checks that its profile page offers text and JSON download
  links and that both downloads contain the query text and the query id."""
  sql = "select count(*) from functional.alltypes"
  query_id = self.client.execute(sql).query_id
  profile_page_url = "{0}query_profile?query_id={1}".format(self.ROOT_URL, query_id)
  # (label expected on the profile page, template of the download link)
  downloads = (("Text", "query_profile_plain_text?query_id={0}"),
               ("Json", "query_profile_json?query_id={0}"))
  for profile_format, link_template in downloads:
    # Check the download tag is there.
    pages = self.get_and_check_status(
        profile_page_url, profile_format, ports_to_test=self.IMPALAD_TEST_PORT)
    assert len(pages) == 1
    download_link = link_template.format(query_id)
    assert download_link in pages[0].text
    # Get the response from download link and validate it by checking
    # the query is in the file.
    profiles = self.get_and_check_status(
        self.ROOT_URL + download_link, sql, self.IMPALAD_TEST_PORT)
    assert len(profiles) == 1
    profile_text = profiles[0].text
    if profile_format == 'Text':
      # Check the query id is in the content of the response.
      assert query_id in profile_text
    else:
      # Check the return content is valid json
      try:
        json_res = json.loads(profile_text)
      except ValueError:
        assert False, "Downloaded JSON format query profile cannot be parsed. " \
            "Json profile:{0}".format(profile_text)
      # Find the query id in json
      assert query_id in json_res["contents"]["profile_name"], json_res
def test_prometheus_metrics(self):
  """Fetches /metrics_prometheus from all three daemons and checks that the response
  for the first port contains a known metric."""
  pages = self.get_and_check_status(self.PROMETHEUS_METRICS_URL)
  assert len(pages) == 3
  # check if metric shows up
  first_page = pages[0].text
  assert 'impala_statestore_subscriber_heartbeat_interval_time_min' in first_page
def test_healthz_endpoint(self):
  """Checks that /healthz on port 25000 answers with 200 OK."""
  status = requests.get("http://localhost:25000/healthz").status_code
  assert status == requests.codes.ok
def test_knox_compatibility(self):
  """Checks that the template files conform to the requirements for compatibility with
  the Apache Knox service definition.

  Greps every '.tmpl' file under $IMPALA_HOME/www for links, scripts, forms and
  javascript URLs that are not routed through the webserver host, then checks the
  links rendered by the impalad with and without the 'X-Forwarded-Context' header."""
  # Matches all 'a' links with an 'href' that doesn't start with either '#' (which stays
  # on the same page and so doesn't need the hostname) or '{{ __common__.host-url }}'
  # Note that if we ever need to add a link that doesn't conform to this, we will
  # probably also have to change the Knox service definition.
  href_regex = "<(a|link) .*? href=['\"](?!({{ __common__.host-url }})|#)"
  # Matches all 'script' tags that aren't absolute urls.
  script_regex = "<script .*?src=['\"](?!({{ __common__.host-url }})|http)"
  # Matches all 'form' tags that are not followed by including the hidden inputs.
  form_regex = "<form [^{]*?>(?!{{>www/form-hidden-inputs.tmpl}})"
  # Matches XMLHttpRequest.open() in javascript that are not followed with make_url().
  # Raw string: '\(' is a regex escape, not a Python string escape.
  javascript_regex = r"""open\(['"]GET['"], (?!make_url)"""
  # Matches urls in json parameters passed to DataTables.
  datatables_regex = "url: ['\"](?!make_url)"
  regex = "(%s)|(%s)|(%s)|(%s)|(%s)" % \
      (href_regex, script_regex, form_regex, javascript_regex, datatables_regex)
  results = grep_dir(os.path.join(os.environ['IMPALA_HOME'], "www"), regex, r".*\.tmpl")
  assert len(results) == 0, \
      "All links on the webui must include the webserver host: %s" % results
  # Check that when Knox integration is not being used, the links are relative, by
  # checking for the root link from the header.
  self.get_and_check_status(self.ROOT_URL, "href='/'", self.IMPALAD_TEST_PORT)
  # Check that if the 'x-forwarded-context' header is present in the request, the links
  # are written as absolute.
  self.get_and_check_status(self.ROOT_URL,
      "href='http://.*:%s/'" % self.IMPALAD_TEST_PORT[0], self.IMPALAD_TEST_PORT,
      regex=True, headers={'X-Forwarded-Context': '/gateway'})