blob: c6b9f59fc0b309b856ecf31a93c21154891afce9 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from tests.common.environ import ImpalaTestClusterFlagsDetector
from tests.common.file_utils import grep_dir
from tests.common.skip import SkipIfBuildType, SkipIfCatalogV2
from tests.common.impala_cluster import ImpalaCluster
from tests.common.impala_connection import FINISHED, RUNNING
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.test_vector import HS2
from tests.util.filesystem_utils import supports_storage_ids
from tests.util.parse_util import parse_duration_string_ms
from tests.util.web_pages_util import (
cancel, wait_for_state, assert_query_stopped, start, join)
from datetime import datetime
from prometheus_client.parser import text_string_to_metric_families
from multiprocessing import Process
import itertools
import json
import logging
import os
import pytest
import re
import requests
import time
LOG = logging.getLogger(__name__)
class TestWebPage(ImpalaTestSuite):
# URL templates for the debug web pages exercised by this suite. The "{0}"
# placeholder is filled in with a webserver port via str.format() before each request.
ROOT_URL = "http://localhost:{0}/"
SET_JAVA_LOGLEVEL_URL = "http://localhost:{0}/set_java_loglevel"
RESET_JAVA_LOGLEVEL_URL = "http://localhost:{0}/reset_java_loglevel"
SET_GLOG_LOGLEVEL_URL = "http://localhost:{0}/set_glog_level"
RESET_GLOG_LOGLEVEL_URL = "http://localhost:{0}/reset_glog_level"
CATALOG_URL = "http://localhost:{0}/catalog"
CATALOG_OBJECT_URL = "http://localhost:{0}/catalog_object"
TABLE_METRICS_URL = "http://localhost:{0}/table_metrics"
QUERY_BACKENDS_URL = "http://localhost:{0}/query_backends"
QUERY_FINSTANCES_URL = "http://localhost:{0}/query_finstances"
RPCZ_URL = "http://localhost:{0}/rpcz"
THREAD_GROUP_URL = "http://localhost:{0}/thread-group"
MEMZ_URL = "http://localhost:{0}/memz"
METRICS_URL = "http://localhost:{0}/metrics"
JMX_URL = "http://localhost:{0}/jmx"
ADMISSION_URL = "http://localhost:{0}/admission"
RESET_RESOURCE_POOL_STATS_URL = "http://localhost:{0}/resource_pool_reset"
BACKENDS_URL = "http://localhost:{0}/backends"
PROMETHEUS_METRICS_URL = "http://localhost:{0}/metrics_prometheus"
QUERIES_URL = "http://localhost:{0}/queries"
QUERY_PLAN = "http://localhost:{0}/query_plan"
HEALTHZ_URL = "http://localhost:{0}/healthz"
EVENT_PROCESSOR_URL = "http://localhost:{0}/events"
HADOOP_VARZ_URL = "http://localhost:{0}/hadoop-varz"
JVM_THREADZ_URL = "http://localhost:{0}/jvm-threadz"
STACKS_URL = "http://localhost:{0}/stacks"
# log4j changes do not apply to the statestore since it doesn't
# have an embedded JVM. So we make two sets of ports to test the
# log level endpoints: one without the statestore port and one
# with it.
TEST_PORTS_WITHOUT_SS = ["25000", "25020"]
TEST_PORTS_WITH_SS = ["25000", "25010", "25020"]
# Single-element port lists, usable directly as a 'ports_to_test' argument, for
# checks that target only the impalad or only the catalogd web UI.
IMPALAD_TEST_PORT = ["25000"]
CATALOG_TEST_PORT = ["25020"]
def test_get_root_url(self):
  """Checks that the root debug page loads successfully on every default test port."""
  root_url_template = self.ROOT_URL
  self.get_and_check_status(root_url_template)
def test_content_encoding(self):
  """Checks that the root page is only gzip-encoded when the client asks for gzip."""
  # An empty Accept-Encoding header must produce a response without Content-Encoding.
  uncompressed = self.get_and_check_status(
      self.ROOT_URL, headers={"Accept-Encoding": ""})
  for resp in uncompressed:
    assert "Content-Encoding" not in resp.headers

  # Explicitly accepting gzip must produce a gzip-encoded response.
  compressed = self.get_and_check_status(
      self.ROOT_URL, headers={"Accept-Encoding": "gzip"})
  for resp in compressed:
    assert "Content-Encoding" in resp.headers
    assert resp.headers["Content-Encoding"] == "gzip"
def test_get_build_flags(self):
  """Checks that the build flags shown on each root page have well-formed values."""
  # (flag name, accepted values), checked in this order for every port.
  accepted_flag_values = [
    ("is_ndebug", ["true", "false"]),
    ("cmake_build_type", ["debug", "release", "address_sanitizer",
        "tidy", "ubsan", "ubsan_full", "tsan", "tsan_full", "code_coverage_release",
        "code_coverage_debug", "debug_noopt"]),
    ("library_link_type", ["dynamic", "static"]),
  ]
  for port in self.TEST_PORTS_WITH_SS:
    root_url = self.ROOT_URL.format(port)
    flags = ImpalaTestClusterFlagsDetector.get_build_flags_from_web_ui(root_url)
    # Exactly the three flags above are expected, nothing more.
    assert len(flags) == 3
    for flag_name, valid_values in accepted_flag_values:
      assert flag_name in flags
      assert flags[flag_name] in valid_values
@SkipIfBuildType.remote
def test_root_correct_build_flags(self, cluster_properties):
  """Checks that the build flags on each root page match the cluster properties."""
  assert not cluster_properties.is_remote_cluster()
  for port in self.TEST_PORTS_WITH_SS:
    root_url = self.ROOT_URL.format(port)
    reported = ImpalaTestClusterFlagsDetector.get_build_flags_from_web_ui(root_url)
    assert reported["cmake_build_type"] == cluster_properties.build_flavor
    assert reported["library_link_type"] == cluster_properties.library_link_type
def test_root_consistent_build_flags(self):
  """Checks that 'is_ndebug' and 'cmake_build_type' on each root page agree."""
  for port in self.TEST_PORTS_WITH_SS:
    root_url = self.ROOT_URL.format(port)
    flags = ImpalaTestClusterFlagsDetector.get_build_flags_from_web_ui(root_url)
    ndebug = flags["is_ndebug"] == "true"
    build_type = flags["cmake_build_type"]
    # A build without NDEBUG cannot be a release build ...
    if not ndebug:
      assert build_type != "release"
    # ... and a debug build cannot have NDEBUG set.
    if build_type == "debug":
      assert not ndebug
def test_root_other_info(self):
  """Checks that the root page JSON exposes the effective locale and glibc version."""
  required_keys = ("effective_locale", "glibc_version")
  for port in self.TEST_PORTS_WITH_SS:
    json_url = self.ROOT_URL.format(port) + "/?json"
    root_info = json.loads(requests.get(json_url).text)
    for key in required_keys:
      assert key in root_info
def test_jvm_threadz(self):
  """Checks that the jvm-threadz overview has four entries including a timestamp."""
  for port in self.TEST_PORTS_WITHOUT_SS:
    resp = requests.get(self.JVM_THREADZ_URL.format(port) + "?json")
    assert resp.status_code == requests.codes.ok
    overview = json.loads(resp.text)["overview"]
    assert len(overview) == 4
    assert "timestamp" in overview
def test_memz(self):
  """Checks that /memz answers GET and HEAD on impalad / statestored / catalogd."""
  # All GET requests are issued first, then all HEAD requests, port by port.
  for send_request in (requests.get, requests.head):
    for port in self.TEST_PORTS_WITH_SS:
      page = send_request(self.MEMZ_URL.format(port))
      assert page.status_code == requests.codes.ok
def test_memz_shows_fragment_instance_id(self):
  """Checks that the detailed /memz breakdown lists fragment instance IDs while a
  query is running."""
  finstance_re = re.compile("Fragment [0-9a-f]{16}:[0-9a-f]{16}")
  # The sleep() predicate keeps the query running while the page is fetched.
  slow_query = \
      "select count(*) from functional_parquet.alltypes where bool_col = sleep(100)"
  handle = self.client.execute_async(slow_query)
  try:
    self.client.wait_for_impala_state(handle, RUNNING, 1000)
    detailed_breakdown = self._get_debug_page(self.MEMZ_URL)['detailed']
    assert finstance_re.search(detailed_breakdown), detailed_breakdown
  finally:
    # Always release the query, even if one of the checks above failed.
    self.client.close_query(handle)
def test_query_profile_encoded_unknown_query_id(self):
  """Checks that /query_profile_encoded, when given an unknown query id, returns an
  error message that begins directly with the expected text (no leading whitespace).
  """
  expected_prefix = "Could not obtain runtime profile: Query id"
  impalad = ImpalaCluster.get_e2e_test_cluster().get_any_impalad()
  page = impalad.service.read_debug_webpage("query_profile_encoded?query_id=123")
  assert page.startswith(expected_prefix)
def test_jmx_endpoint(self):
  """Tests that the /jmx endpoint on the Catalog and Impalads returns a valid json.

  For each port in TEST_PORTS_WITHOUT_SS the response must have status OK, an
  'application/json' content type, and a JSON body containing a "beans" entry.
  """
  for port in self.TEST_PORTS_WITHOUT_SS:
    input_url = self.JMX_URL.format(port)
    response = requests.get(input_url)
    assert response.status_code == requests.codes.ok
    assert "application/json" == response.headers['Content-Type']
    try:
      jmx_json = json.loads(response.text)
    except ValueError:
      # Nothing was parsed, so report the raw payload that failed to parse.
      assert False, \
          "Invalid JSON returned from /jmx endpoint: %s" % response.text
    assert "beans" in jmx_json, "Ill formatted JSON returned: %s" % jmx_json
def test_stacks_endpoint(self):
  """Checks that /stacks on every daemon returns plain text that looks like a stack
  dump."""
  # (substring that must be present, failure message template taking the port).
  # Stack frames are expected to show up as lines containing '@'.
  required_fragments = [
    ("Collected stacks from", "Missing collection summary in response from port %s"),
    ("threads in", "Missing thread count in response from port %s"),
    ("TID", "Missing thread IDs in response from port %s"),
    ("@", "Missing stack frames in response from port %s"),
  ]
  for port in self.TEST_PORTS_WITH_SS:
    resp = requests.get(self.STACKS_URL.format(port))
    assert resp.status_code == requests.codes.ok
    # The /stacks endpoint returns plain text
    assert "text/plain" in resp.headers['Content-Type']
    dump = resp.text
    for fragment, failure_template in required_fragments:
      assert fragment in dump, failure_template % port
def test_stacks_endpoint_format(self):
  """Checks the line-level format of the /stacks output on every daemon."""
  # Summary header, e.g. "Collected stacks from 42 threads in 0.012s".
  header_re = re.compile(r'Collected stacks from \d+ threads in \d+\.\d+s')
  # Thread ID line, e.g. "TID 1911294 (statestored):" or
  # "TID 262192 (subscriber-priority-update-worker(2:10)):" with nested parens.
  # The greedy match captures everything between "TID <num> (" and "):".
  tid_re = re.compile(r'TID \d+ \(.+\):')
  # Stack frame line: starts with '@' followed by a hex address.
  frame_re = re.compile(r'^\s*@\s+0x[0-9a-f]+\s+.*')
  # Grouping header, e.g. "3 threads with same stack:". Only present when several
  # threads share a stack.
  group_re = re.compile(r'^\d+ threads with same stack:')

  for port in self.TEST_PORTS_WITH_SS:
    resp = requests.get(self.STACKS_URL.format(port))
    assert resp.status_code == requests.codes.ok
    lines = resp.text.split('\n')

    assert any(header_re.match(text) for text in lines), \
        "Missing or malformed collection summary header"
    assert any(tid_re.match(text) for text in lines), \
        "Missing or malformed thread ID lines"
    assert any(frame_re.match(text) for text in lines), \
        "Missing or malformed stack frame lines"

    # Only the first group header (if any) is inspected.
    first_group = next(
        (idx for idx, text in enumerate(lines) if group_re.match(text)), None)
    if first_group is None:
      continue
    assert first_group + 1 < len(lines), "Group header at end of output"
    # At least one TID line must appear within the nine lines after the header.
    following = lines[first_group + 1:first_group + 10]
    assert any(tid_re.match(text) for text in following), \
        "No TID found after group header"
def get_and_check_status(
    self, url, string_to_search="", ports_to_test=None, regex=False, headers=None):
  """Requests 'url' on each port and asserts that the page is served correctly.

  'url' is a template whose "{0}" is replaced by the port. For every port a HEAD and
  then a GET request is sent; both must return status OK, the GET body must contain
  'string_to_search' (treated as a regular expression if 'regex' is true), and the GET
  response must carry a Content-Security-Policy header. 'ports_to_test' defaults to
  TEST_PORTS_WITH_SS and 'headers' is passed through to both requests.
  Returns the list of GET responses, one per port.
  """
  def failure_details(target_url, resp):
    # Only evaluated when an assertion fails.
    return "URL: {0} Str:'{1}'\nResp:{2}".format(
        target_url, string_to_search, resp.text)

  ports = self.TEST_PORTS_WITH_SS if ports_to_test is None else ports_to_test
  collected = []
  for port in ports:
    target_url = url.format(port)

    head_resp = requests.head(target_url, headers=headers)
    assert head_resp.status_code == requests.codes.ok, \
        failure_details(target_url, head_resp)

    get_resp = requests.get(target_url, headers=headers)
    assert get_resp.status_code == requests.codes.ok, \
        failure_details(target_url, get_resp)

    if regex:
      found = re.search(string_to_search, get_resp.text)
    else:
      found = string_to_search in get_resp.text
    assert found, failure_details(target_url, get_resp)

    collected.append(get_resp)
    assert 'Content-Security-Policy' in get_resp.headers, "CSP header missing"
  return collected
def post_and_check_status(self, url, data=None, string_to_search="",
    ports_to_test=None):
  """Helper method that posts to a given url, then asserts the return code is ok and
  the response contains the expected string.

  'url' is a template whose "{0}" is replaced by the port. For every port a HEAD
  request and then a POST with form fields 'data' is sent; both must return status
  OK, the POST body must contain 'string_to_search' and the POST response must carry
  a Content-Security-Policy header. 'data' defaults to an empty form and
  'ports_to_test' defaults to TEST_PORTS_WITH_SS.
  Returns the list of POST responses, one per port.
  """
  # A None default avoids sharing one mutable dict object between calls.
  if data is None:
    data = {}
  if ports_to_test is None:
    ports_to_test = self.TEST_PORTS_WITH_SS

  responses = []
  for port in ports_to_test:
    input_url = url.format(port)
    response = requests.head(input_url)
    assert response.status_code == requests.codes.ok, "URL: {0} Str:'{1}'\nResp:{2}"\
        .format(input_url, string_to_search, response.text)
    response = requests.post(input_url, data=data)
    assert response.status_code == requests.codes.ok, "URL: {0} Str:'{1}'\nResp:{2}"\
        .format(input_url, string_to_search, response.text)
    assert string_to_search in response.text, "URL: {0} Str:'{1}'\nResp:{2}".format(
        input_url, string_to_search, response.text)
    responses.append(response)
    assert 'Content-Security-Policy' in response.headers, "CSP header missing"
  return responses
def _get_debug_page(self, page_url, port=25000):
  """Fetches the JSON variant ("?json") of debug page 'page_url' from 'port' and
  returns the parsed content."""
  json_url = page_url + "?json"
  responses = self.get_and_check_status(json_url, ports_to_test=[port])
  assert len(responses) == 1
  page = responses[0]
  assert "application/json" in page.headers['Content-Type']
  return json.loads(page.text)
def get_and_check_status_jvm(self, url, string_to_search=""):
  """Runs get_and_check_status() against the ports in TEST_PORTS_WITHOUT_SS only
  (impalad and catalogd) and returns its result."""
  jvm_ports = self.TEST_PORTS_WITHOUT_SS
  return self.get_and_check_status(
      url, string_to_search=string_to_search, ports_to_test=jvm_ports)
def post_and_check_status_jvm(self, url, data=None, string_to_search=""):
  """Calls post_and_check_status() for impalad and catalogd only.

  'data' holds the form fields to post and defaults to an empty form. Returns the
  list of POST responses, one per port in TEST_PORTS_WITHOUT_SS.
  """
  # A None default avoids sharing one mutable dict object between calls.
  if data is None:
    data = {}
  return self.post_and_check_status(url, data, string_to_search,
      ports_to_test=self.TEST_PORTS_WITHOUT_SS)
def test_content_type(self):
  """Checks that each rendering of the /metrics page is served with the matching
  Content-Type header."""
  # (URL suffix selecting the rendering, expected MIME type).
  suffix_to_mime = [
    ("?json", "application/json"),
    ("?raw", "text/plain; charset=UTF-8"),
    ("", "text/html; charset=UTF-8"),
  ]
  for port in self.TEST_PORTS_WITH_SS:
    metrics_url = self.METRICS_URL.format(port)
    for suffix, expected_mime in suffix_to_mime:
      actual_mime = requests.get(metrics_url + suffix).headers['Content-Type']
      assert expected_mime == actual_mime
def test_log_level(self):
  """Exercises the log level endpoints with well-formed and malformed inputs and
  checks the text of the resulting pages. This does not verify that the log level
  changes actually take effect."""
  hdfs_table_class = "org.apache.impala.catalog.HdfsTable"
  set_java = self.SET_JAVA_LOGLEVEL_URL
  reset_java = self.RESET_JAVA_LOGLEVEL_URL
  set_glog = self.SET_GLOG_LOGLEVEL_URL
  reset_glog = self.RESET_GLOG_LOGLEVEL_URL
  # Java endpoints are only checked on daemons with a JVM; glog ones on all daemons.
  post_jvm = self.post_and_check_status_jvm
  post_all = self.post_and_check_status

  # All four endpoints must be reachable with an empty request.
  post_jvm(set_java)
  post_jvm(reset_java)
  post_all(set_glog)
  post_all(reset_glog)

  # Setting a class to TRACE must be reflected in the page.
  post_jvm(set_java, {"class": hdfs_table_class, "level": "trace"},
      hdfs_table_class + " : TRACE")
  # Reset Java logging levels.
  post_jvm(reset_java, {}, "Java log levels reset.")

  # A new glog level must be reported as applied.
  post_all(set_glog, {"glog": 3}, "val(3)")
  # Resetting the glog defaults again must still report the current level.
  post_all(reset_glog, {}, "Current backend log level: ")

  # Empty class name for the Java log level.
  post_jvm(set_java, {"class": ""})
  # Empty value for the glog level.
  post_all(set_glog, {"glog": ""})

  # A non-existent level on a valid class: log4j falls back to DEBUG, which is the
  # behavior of the Level.toLevel() method.
  post_jvm(set_java, {"class": hdfs_table_class, "level": "foo"},
      hdfs_table_class + " : DEBUG")
  # An invalid glog level must be rejected with an error message.
  post_all(set_glog, {"glog": "foo"}, "Bad glog level input")
  # An unknown form field on the glog endpoint must still return OK.
  post_all(set_glog, {"badurl": "foo"})
@pytest.mark.execute_serially
def test_uda_with_log_level(self):
  """IMPALA-7903: Impala crashed when running an aggregate query with the glog level
  set to 3. Runs serially so that it does not interfere with other tests that change
  the log level."""
  set_glog = self.SET_GLOG_LOGLEVEL_URL
  reset_glog = self.RESET_GLOG_LOGLEVEL_URL
  post = self.post_and_check_status

  # The glog level endpoints must be reachable.
  post(set_glog)
  post(reset_glog)
  # Raise the log level to 3.
  post(set_glog, {"glog": 3}, "val(3)")
  # An aggregating query must complete without crashing Impala.
  self.client.execute("select avg(int_col) from functional.alltypessmall")
  # Restore the default log level.
  post(reset_glog, {}, "Current backend log level: ")
def test_catalog(self, cluster_properties, unique_database):
  """Creates a set of tables in 'unique_database' and checks the /catalog,
  /catalog_object, /table_metrics and /events pages against them."""
  def run_statements(sql_templates):
    # Runs each statement in order after substituting the database name for "{0}".
    for template in sql_templates:
      self.execute_query(template.format(unique_database))

  run_statements([
    # Non-partitioned table
    "create table {0}.foo (id int, val int)",
    "insert into {0}.foo values (1, 200)",
    # Partitioned table
    "create table {0}.foo_part (id int, val int) partitioned by ("
    "year int)",
    "insert into {0}.foo_part partition (year=2010) values "
    "(1, 200)",
    # Kudu table
    "create table {0}.foo_kudu (id int, val int, primary key (id)) "
    "stored as kudu",
    "insert into {0}.foo_kudu values (1, 200)",
    # Partitioned parquet table
    "create table {0}.foo_part_parquet (id int, val int) partitioned "
    "by (year int) stored as parquet",
    "insert into {0}.foo_part_parquet partition (year=2010) "
    "values (1, 200)",
  ])

  self.get_and_check_status_jvm(self.CATALOG_URL, unique_database)
  self.get_and_check_status_jvm(self.CATALOG_URL, "foo_part")

  # IMPALA-5028: Test toThrift() of a partitioned table via the WebUI code path.
  for tbl in ("foo_part", "foo_kudu", "foo_part_parquet", "foo"):
    self.__test_catalog_object(unique_database, tbl, cluster_properties)

  self.__test_json_db_object(unique_database)
  for tbl in ("foo", "foo_part", "foo_part_parquet"):
    self.__test_json_table_object(unique_database, tbl)

  table_metrics = ("total-file-size-bytes", "num-files", "alter-duration",
      "events-process-duration")
  for metric in table_metrics:
    self.__test_table_metrics(unique_database, "foo_part", metric)

  self.__test_catalog_tablesfilesusage(unique_database, "foo_part", "1")
  self.__test_catalog_tables_loading_time(unique_database, "foo_part")
  self.get_and_check_status(self.EVENT_PROCESSOR_URL, "events-consuming-delay",
      ports_to_test=self.CATALOG_TEST_PORT)

  run_statements([
    # Multi-key partitioned table
    "create table {0}.foo_multi_part (id int, val int) "
    "partitioned by (year int, month int, day int)",
    "insert into {0}.foo_multi_part partition "
    "(year=2024, month=12, day=25) values (1, 200)",
    # Table with string partition that contains special characters
    "create table {0}.foo_slash_part (id int, val int) "
    "partitioned by (ds string)",
    "insert into {0}.foo_slash_part partition "
    "(ds='2024/12/25') values (1, 200)",
  ])

  # Test partition catalog objects (IMPALA-9935).
  partitions_to_check = [
    # Single-key partition
    ("foo_part", "year=2010"),
    # Multi-key partition
    ("foo_multi_part", "year=2024/month=12/day=25"),
    # Partition value with slash. The name is passed pre-encoded to match Hive's HDFS
    # directory format: Hive stores "ds=2024/12/25" as directory "ds=2024%2F12%2F25".
    # The helper methods URL-encode it again for HTTP transmission (double-encoding).
    ("foo_slash_part", "ds=2024%2F12%2F25"),
  ]
  for tbl, partition in partitions_to_check:
    self.__test_catalog_partition_object(unique_database, tbl, partition,
        cluster_properties)
    self.__test_json_partition_object(unique_database, tbl, partition,
        cluster_properties)
def __test_catalog_object(self, db_name, tbl_name, cluster_properties):
  """Checks the /catalog_object page for table 'db_name.tbl_name', first right after
  invalidating its metadata (unloaded) and again after querying it (loaded)."""
  query_string = "?object_type=TABLE&object_name={0}.{1}".format(db_name, tbl_name)
  obj_url = self.CATALOG_OBJECT_URL + query_string

  if cluster_properties.is_catalog_v2_cluster():
    # Catalog object endpoint is disabled in local catalog mode, so the impalad must
    # answer with a "not found" page.
    check_impalad = self.check_endpoint_is_disabled
    impalad_expected_str = "No URI handler for &apos;/catalog_object&apos;"
  else:
    check_impalad = self.get_and_check_status
    impalad_expected_str = tbl_name

  # The first statement leaves the table unloaded, the second one loads it.
  for sql_template in ("invalidate metadata %s.%s", "select count(*) from %s.%s"):
    self.client.execute(sql_template % (db_name, tbl_name))
    self.get_and_check_status(obj_url, tbl_name, ports_to_test=self.CATALOG_TEST_PORT)
    check_impalad(obj_url, impalad_expected_str,
        ports_to_test=self.IMPALAD_TEST_PORT)
def __test_json_db_object(self, db_name):
  """Checks the catalogd's /catalog_object?json output for database 'db_name'."""
  query_string = "?json&object_type=DATABASE&object_name={0}".format(db_name)
  responses = self.get_and_check_status(
      self.CATALOG_OBJECT_URL + query_string, ports_to_test=self.CATALOG_TEST_PORT)
  # The page wraps the serialized catalog object in a "json_string" field.
  envelope = json.loads(responses[0].text)
  catalog_obj = json.loads(envelope["json_string"])
  assert catalog_obj["type"] == 2, "type should be DATABASE"
  assert "catalog_version" in catalog_obj, \
      "TCatalogObject should have catalog_version"
  db_obj = catalog_obj["db"]
  assert db_obj["db_name"] == db_name
  assert "metastore_db" in db_obj, "Loaded database should have metastore_db"
def __test_json_table_object(self, db_name, tbl_name):
  """Checks the catalogd's /catalog_object?json output for table 'db_name.tbl_name',
  first while the table is unloaded and again after it has been loaded."""
  obj_url = self.CATALOG_OBJECT_URL + \
      "?json&object_type=TABLE&object_name={0}.{1}".format(db_name, tbl_name)

  def fetch_table_object():
    # Fetches the catalog object, runs the checks common to both states and returns
    # its "table" member.
    responses = self.get_and_check_status(
        obj_url, ports_to_test=self.CATALOG_TEST_PORT)
    catalog_obj = json.loads(json.loads(responses[0].text)["json_string"])
    assert catalog_obj["type"] == 3, "type should be TABLE"
    assert "catalog_version" in catalog_obj, \
        "TCatalogObject should have catalog_version"
    table = catalog_obj["table"]
    assert table["db_name"] == db_name
    assert table["tbl_name"] == tbl_name
    return table

  # Unloaded table: no HDFS details yet.
  self.client.execute("invalidate metadata %s.%s" % (db_name, tbl_name))
  unloaded_tbl = fetch_table_object()
  assert "hdfs_table" not in unloaded_tbl, "Unloaded table should not have hdfs_table"

  # Loaded table: full metadata is expected.
  self.client.execute("refresh %s.%s" % (db_name, tbl_name))
  loaded_tbl = fetch_table_object()
  assert "columns" in loaded_tbl, "Loaded TTable should have columns"
  assert loaded_tbl["table_type"] == 0, "table_type should be HDFS_TABLE"
  assert "metastore_table" in loaded_tbl
  hdfs_tbl_obj = loaded_tbl["hdfs_table"]
  expected_hdfs_fields = ("hdfsBaseDir", "colNames", "nullPartitionKeyValue",
      "nullColumnValue", "partitions", "prototype_partition")
  for field in expected_hdfs_fields:
    assert field in hdfs_tbl_obj
def __verify_catalog_partition_html_response(self, response_text):
  """Asserts that 'response_text' reports no CatalogException and mentions the Thrift
  structures expected for a partition catalog object."""
  # Include the start of the page in the message to show the reported error.
  assert "CatalogException" not in response_text, \
      "Response should not contain error: " + response_text[:200]
  for struct_name in ("TCatalogObject", "THdfsPartition", "THdfsStorageDescriptor"):
    assert struct_name in response_text, "Response should contain " + struct_name
def __test_catalog_partition_object(self, db_name, tbl_name, partition_name,
    cluster_properties):
  """Tests the /catalog_object endpoint for the given db/table/partition.

  The page is always checked on the catalogd. On the impalad it must be disabled
  for a Catalog V2 (local catalog) cluster, and must show the same partition object
  otherwise.
  """
  # Import the submodule explicitly: a bare 'import urllib' does not guarantee that
  # 'urllib.parse' is available as an attribute.
  from urllib.parse import quote
  # URL encode the entire object name (db.table:partition).
  # This is necessary when partition values contain slashes (e.g., "ds=2024/12/25").
  object_name = "{0}.{1}:{2}".format(db_name, tbl_name, partition_name)
  encoded_object_name = quote(object_name, safe='')
  obj_url = self.CATALOG_OBJECT_URL + \
      "?object_type=HDFS_PARTITION&object_name={0}".format(encoded_object_name)

  # Make sure the table is loaded
  self.client.execute("describe %s.%s" % (db_name, tbl_name))
  is_catalog_v2 = cluster_properties.is_catalog_v2_cluster()

  # The endpoint works on catalogd in both Catalog V1 and V2.
  responses = self.get_and_check_status(obj_url, partition_name,
      ports_to_test=self.CATALOG_TEST_PORT)
  self.__verify_catalog_partition_html_response(responses[0].text)

  if is_catalog_v2:
    # Catalog object endpoint is disabled in local catalog mode on impalad
    impalad_expected_str = "No URI handler for &apos;/catalog_object&apos;"
    self.check_endpoint_is_disabled(obj_url, impalad_expected_str,
        ports_to_test=self.IMPALAD_TEST_PORT)
  else:
    # In Catalog V1, the endpoint also works on impalad
    responses = self.get_and_check_status(obj_url, partition_name,
        ports_to_test=self.IMPALAD_TEST_PORT)
    self.__verify_catalog_partition_html_response(responses[0].text)
def __test_json_partition_object(self, db_name, tbl_name, partition_name,
    cluster_properties):
  """Tests the /catalog_object?json endpoint for the given db/table/partition.

  The catalogd is always checked. The impalad is checked as well unless the cluster
  runs in Catalog V2 mode.
  """
  # Import the submodule explicitly: a bare 'import urllib' does not guarantee that
  # 'urllib.parse' is available as an attribute.
  from urllib.parse import quote
  # URL encode the entire object name (db.table:partition).
  # This is necessary when partition values contain slashes (e.g., "ds=2024/12/25").
  object_name = "{0}.{1}:{2}".format(db_name, tbl_name, partition_name)
  encoded_object_name = quote(object_name, safe='')
  obj_url = self.CATALOG_OBJECT_URL + \
      "?json&object_type=HDFS_PARTITION&object_name={0}".format(encoded_object_name)

  def check_partition_json(ports, expect_catalog_version):
    # Fetches the partition object from 'ports' and verifies its identifying fields.
    responses = self.get_and_check_status(obj_url, ports_to_test=ports)
    response_json = json.loads(responses[0].text)
    assert "json_string" in response_json, "Response should contain json_string"
    obj = json.loads(response_json["json_string"])
    assert obj["type"] == 11, "type should be HDFS_PARTITION (11)"
    if expect_catalog_version:
      assert "catalog_version" in obj, "TCatalogObject should have catalog_version"
    part_obj = obj["hdfs_partition"]
    assert part_obj["db_name"] == db_name
    assert part_obj["tbl_name"] == tbl_name
    assert part_obj["partition_name"] == partition_name

  # Make sure the table is loaded
  self.client.execute("describe %s.%s" % (db_name, tbl_name))

  # Test catalogd endpoint (works in both V1 and V2)
  check_partition_json(self.CATALOG_TEST_PORT, expect_catalog_version=True)

  # In Catalog V1, also test impalad endpoint
  if not cluster_properties.is_catalog_v2_cluster():
    check_partition_json(self.IMPALAD_TEST_PORT, expect_catalog_version=False)
def check_endpoint_is_disabled(self, url, string_to_search="", ports_to_test=None):
  """Asserts that 'url' is not served on the given ports.

  'url' is a template whose "{0}" is replaced by the port. For every port both a
  HEAD and a GET request must return "not found", and the GET body must contain
  'string_to_search'. 'ports_to_test' defaults to TEST_PORTS_WITH_SS.
  """
  def failure_details(target_url, resp):
    # Only evaluated when an assertion fails.
    return "URL: {0} Str:'{1}'\nResp:{2}".format(
        target_url, string_to_search, resp.text)

  ports = self.TEST_PORTS_WITH_SS if ports_to_test is None else ports_to_test
  for port in ports:
    target_url = url.format(port)

    head_resp = requests.head(target_url)
    assert head_resp.status_code == requests.codes.not_found, \
        failure_details(target_url, head_resp)

    get_resp = requests.get(target_url)
    assert get_resp.status_code == requests.codes.not_found, \
        failure_details(target_url, get_resp)
    assert string_to_search in get_resp.text, failure_details(target_url, get_resp)
def __test_table_metrics(self, db_name, tbl_name, metric):
  """Refreshes table 'db_name.tbl_name' and checks that the catalogd's /table_metrics
  page for it mentions 'metric'."""
  qualified_name = "%s.%s" % (db_name, tbl_name)
  self.client.execute("refresh " + qualified_name)
  metrics_url = self.TABLE_METRICS_URL + "?name=" + qualified_name
  self.get_and_check_status(metrics_url, metric, ports_to_test=self.CATALOG_TEST_PORT)
def __test_catalog_tables_loading_time(self, db_name, tbl_name):
  """Refreshes table 'db_name.tbl_name', then checks that the catalogd's /catalog page
  lists the tables with the longest metadata loading time, that the list is not
  empty, and that its first entry carries all loading time statistics."""
  catalogd = self.CATALOG_TEST_PORT
  self.client.execute("refresh %s.%s" % (db_name, tbl_name))

  # HTML rendering: only the section title is checked.
  self.get_and_check_status(self.CATALOG_URL,
      "Tables with Longest Metadata Loading Time", ports_to_test=catalogd)

  # JSON rendering: check the list itself.
  responses = self.get_and_check_status(self.CATALOG_URL + "?json",
      "longest_loading_tables", ports_to_test=catalogd)
  catalog_json = json.loads(responses[0].text)
  assert "longest_loading_tables" in catalog_json, \
      "Response {0}".format(catalog_json)
  loading_tables = catalog_json["longest_loading_tables"]
  assert len(loading_tables) > 0

  first_entry = loading_tables[0]
  expected_members = ("median_metadata_loading_time_ns",
      "max_metadata_loading_time_ns", "p75_loading_time_ns", "p95_loading_time_ns",
      "p99_loading_time_ns")
  for member in expected_members:
    assert member in first_entry, \
        "{0} not in loading tables {1}".format(member, loading_tables)
def __test_catalog_tablesfilesusage(self, db_name, tbl_name, numfiles):
  """Test the list of tables with most number of files in the catalog page.
  Make sure the loaded table has correct file count.

  'numfiles' is the expected file count as a string: it is compared verbatim with the
  number scraped from the HTML page and converted with int() for the JSON page. In
  both pages the count is only verified if the table is present in the list."""
  self.client.execute("refresh %s.%s" % (db_name, tbl_name))
  response = self.get_and_check_status(self.CATALOG_URL,
      "Tables with Most Number of Files", ports_to_test=self.CATALOG_TEST_PORT)
  page_text = response[0].text
  table_match = re.search('<table id="high-file-count-tables"( .*?)</table>',
      page_text, re.MULTILINE | re.DOTALL)
  # Fail with a readable message instead of an AttributeError on a None match.
  assert table_match is not None, \
      "Table 'high-file-count-tables' not found in catalog page: {0}".format(page_text)
  # Escape the name: the '.' between database and table (and any other regex
  # metacharacter) must be matched literally, not as a wildcard.
  target_metric = re.escape("%s.%s-metric" % (db_name, tbl_name))
  list_files = re.findall('<tr>(.*?)</tr>', table_match.group(0),
      re.MULTILINE | re.DOTALL)
  for trow in list_files:
    # Find the entry for the db table and verify its file count.
    if re.search(target_metric, trow) is not None:
      # Get the number following <td> in the entry
      nfiles = re.search(r'(?<=\<td\>)\d+', trow)
      assert nfiles is not None, "No file count found in table row: {0}".format(trow)
      assert nfiles.group(0) == numfiles

  response = self.get_and_check_status(self.CATALOG_URL + "?json",
      "high_file_count_tables", ports_to_test=self.CATALOG_TEST_PORT)
  response_json = json.loads(response[0].text)
  high_filecount_tbls = response_json["high_file_count_tables"]
  tbl_fname = "%s.%s" % (db_name, tbl_name)
  assert len(high_filecount_tbls) > 0
  # The expected table might not be in the Top-N list, we may not find it
  # in the list. Just make sure the file count is right if it is in the
  # list.
  for tblinfo in high_filecount_tbls:
    if tblinfo["name"] == tbl_fname:
      assert tblinfo["num_files"] == int(numfiles)
def test_query_stmt(self):
  """Run a very long select and verify that the /queries JSON reports a truncated
  statement for it."""
  # 450 repetitions of "x " make the statement long enough to be truncated.
  long_query = "select \"{0}\"".format("x " * 450)
  # The truncated statement keeps the first 250 characters and appends "...",
  # i.e. it is 253 characters long.
  truncated_stmt = "select \"{0}...".format("x " * 121)
  (_, response_json) = self.run_query_and_get_debug_page(
      long_query, self.QUERIES_URL, expected_state=FINISHED)
  # A FINISHED query may still be listed under in_flight_queries, so both lists are
  # searched.
  candidates = itertools.chain(
      response_json['completed_queries'], response_json['in_flight_queries'])
  stmt_found = any(truncated_stmt in entry['stmt'] for entry in candidates)
  assert stmt_found, "No matching statement found in the jsons at {}: {}".format(
      datetime.now(), json.dumps(response_json, sort_keys=True, indent=4))
def test_failing_ctas(self, unique_database):
  """Regression test for IMPALA-14791: retrieving the plan of a CTAS query that
  failed must not crash Impala."""
  target_tbl = unique_database + ".ctas_target_fail"
  ctas_query = ("create table {0} stored as iceberg as\n"
                "select 100000").format(target_tbl)
  # The debug action makes the Iceberg table creation in catalogd throw.
  failing_options = {
      'debug_action': 'CATALOGD_ICEBERG_CREATE:EXCEPTION@'
                      'IcebergAlreadyExistsException@Table was created concurrently'}
  (_, plan_page) = self.run_query_and_get_debug_page(
      ctas_query, self.QUERY_PLAN, query_options=failing_options,
      expected_state='ERROR')

  # A non-empty plan_json must still be available.
  assert 'plan_json' in plan_page
  plan_json = plan_page['plan_json']
  assert 'plan_nodes' in plan_json
  assert 'label' in plan_json['plan_nodes'][0]
  # Execution never started, so the summary is present but empty.
  assert 'summary' in plan_page
  assert plan_page['summary'] == ''
  # The error surfaced to the user.
  assert "Table already exists" in plan_page['status']
@pytest.mark.xfail(run=False, reason="IMPALA-8059")
def test_backend_states(self, unique_database):
  """Check /query_backends: running queries and DML list their backend states, while
  DDL statements list none."""
  sleep_query = "select sleep(10000) from functional.alltypes limit 1"
  ctas_sleep_query = "create table {0}.foo as {1}".format(unique_database, sleep_query)
  expected_properties = (
      'cpu_user_s', 'rpc_latency', 'num_remaining_instances',
      'num_instances', 'peak_per_host_mem_consumption',
      'time_since_last_heard_from', 'status', 'host',
      'cpu_sys_s', 'done', 'bytes_read', 'rpc_size')

  for stmt in (sleep_query, ctas_sleep_query):
    (_, page_json) = self.run_query_and_get_debug_page(
        stmt, self.QUERY_BACKENDS_URL, expected_state=RUNNING)
    assert 'backend_states' in page_json
    states = page_json['backend_states']
    assert len(states) > 0
    for state in states:
      for prop in expected_properties:
        assert prop in state
      # The query is still running, so every backend is healthy and unfinished.
      assert state['status'] == 'OK'
      assert not state['done']

  (_, page_json) = self.run_query_and_get_debug_page(
      "describe functional.alltypes", self.QUERY_BACKENDS_URL)
  assert 'backend_states' not in page_json
@pytest.mark.xfail(run=False, reason="IMPALA-8059")
def test_backend_instances(self, unique_database, query_options=None):
  """Test that /query_finstances returns the list of fragment instances for DML or
  queries; nothing for DDL statements.

  'query_options' is forwarded to every query that is run, so that callers can repeat
  the same checks under different query options."""
  sleep_query = "select sleep(10000) from functional.alltypes limit 1"
  ctas_sleep_query = "create table {0}.foo as {1}".format(unique_database, sleep_query)
  instance_stats_properties = ['fragment_name', 'time_since_last_heard_from',
                               'current_state', 'first_status_update_received',
                               'instance_id', 'done']

  for query in [sleep_query, ctas_sleep_query]:
    (_, response_json) = self.run_query_and_get_debug_page(
        query, self.QUERY_FINSTANCES_URL, query_options=query_options,
        expected_state=RUNNING)
    assert 'backend_instances' in response_json
    backend_instances = response_json['backend_instances']
    assert len(backend_instances) > 0

    for backend_instance in backend_instances:
      assert 'host' in backend_instance
      assert 'instance_stats' in backend_instance
      instances_stats = backend_instance['instance_stats']
      assert len(instances_stats) > 0
      for instance_stats in instances_stats:
        for instance_stats_property in instance_stats_properties:
          assert instance_stats_property in instance_stats
        # The query is still running, so no instance may report completion.
        assert not instance_stats['done']

  # The DDL check must hit the same /query_finstances endpoint as the checks above.
  # Querying the /query_backends page here would make the assertion below pass
  # regardless of what /query_finstances returns.
  (_, response_json) = self.run_query_and_get_debug_page(
      "describe functional.alltypes", self.QUERY_FINSTANCES_URL,
      query_options=query_options)
  assert 'backend_instances' not in response_json
@pytest.mark.xfail(run=False, reason="IMPALA-8059")
def test_backend_instances_mt_dop(self, unique_database):
  """Repeat the /query_finstances checks with mt_dop enabled to make sure that
  accessing the page does not crash the backend."""
  mt_dop_options = {'mt_dop': 4}
  self.test_backend_instances(unique_database, query_options=mt_dop_options)
def test_io_mgr_threads(self):
  """Test that IoMgr threads have readable names and expose per-thread metrics.

  The 'disk-io-mgr' thread group is fetched as JSON from port 25000. Every thread
  entry must carry the name, id and CPU/IO-wait counters, and at least one thread
  name must match each of the 'ADLS remote', 'S3 remote' and 'HDFS remote' patterns.
  The check does not rely on any particular local disk name."""
  responses = self.get_and_check_status(
      self.THREAD_GROUP_URL + "?group=disk-io-mgr&json", ports_to_test=[25000])
  assert len(responses) == 1
  threads = json.loads(responses[0].text)['threads']

  # Verify metric keys for each thread
  expected_keys = ("name", "id", "user_s", "kernel_s", "iowait_s")
  for thread in threads:
    for key in expected_keys:
      assert key in thread, "Thread entry {0} is missing '{1}'".format(thread, key)

  thread_names = [thread["name"] for thread in threads]
  expected_name_patterns = ["ADLS remote", "S3 remote", "HDFS remote"]
  for pattern in expected_name_patterns:
    assert any(pattern in name for name in thread_names), \
        "Could not find thread matching '%s'" % pattern
def test_rpc_read_write_metrics(self):
  """Verify that /rpcz exposes histogram summaries for the read/write metrics of
  every RPC method and for the reactor metrics."""
  rpcz = self._get_debug_page(self.RPCZ_URL)

  def create_histogram_regex(value_regex):
    """Build the regex for one histogram summary line whose individual values are
    matched by 'value_regex'."""
    fields = [
        "Count: [0-9]+",
        "sum: " + value_regex,
        "min / max: " + value_regex + " / " + value_regex]
    for percentile in ("25th", "50th", "75th", "90th", "95th", "99.9th"):
      fields.append("{0} %-ile: {1}".format(percentile, value_regex))
    return ", ".join(fields)

  # Durations are printed with time unit suffixes, plain quantities with K/M.
  time_hist = create_histogram_regex("[0-9][0-9numsh.]*")
  unit_hist = create_histogram_regex("[0-9][0-9KM.]*")

  assert len(rpcz['servers']) > 0
  for server in rpcz['servers']:
    for method in server['methods']:
      for histogram_key in ("summary", "read", "write"):
        assert re.search(time_hist, method[histogram_key])
  assert re.search(time_hist, rpcz['reactor_active_latency'])
  assert re.search(unit_hist, rpcz['reactor_load_percent'])
def test_krpc_rpcz(self):
  """Test that KRPC metrics are exposed in /rpcz and that they are updated when
  executing a query.

  Service metrics and per-connection metrics are sampled from /rpcz before and after
  running a join query, and the two samples are compared."""
  TEST_QUERY = "select count(c2.string_col) from \
functional.alltypestiny join functional.alltypessmall c2"
  SVC_NAME = 'impala.DataStreamService'

  def is_krpc_use_unix_domain_socket():
    """Return the 'rpc_use_unix_domain_socket' value reported by /rpcz."""
    rpcz = self._get_debug_page(self.RPCZ_URL)
    return rpcz['rpc_use_unix_domain_socket']

  def get_per_conn_metrics(inbound):
    """Get inbound or outbound per-connection metrics"""
    rpcz = self._get_debug_page(self.RPCZ_URL)
    if inbound:
      key = "inbound_per_conn_metrics"
    else:
      key = "per_conn_metrics"
    conns = rpcz[key]
    return conns

  def get_svc_metrics(svc_name):
    """Return the method metrics of service 'svc_name', sorted by method name.
    Fails the test if /rpcz lists no such service."""
    rpcz = self._get_debug_page(self.RPCZ_URL)
    assert len(rpcz['services']) > 0
    for s in rpcz['services']:
      if s['service_name'] == svc_name:
        assert s['rpcs_timed_out_in_queue'] == 0
        assert len(s['rpc_method_metrics']) > 0, '%s metrics are empty' % svc_name
        # Sorting makes the before/after snapshots comparable with '!='.
        return sorted(s['rpc_method_metrics'], key=lambda m: m['method_name'])
    assert False, 'Could not find metrics for %s' % svc_name

  # Snapshot the metrics, run the query, then snapshot them again.
  krpc_use_uds = is_krpc_use_unix_domain_socket()
  svc_before = get_svc_metrics(SVC_NAME)
  inbound_before = get_per_conn_metrics(True)
  outbound_before = get_per_conn_metrics(False)
  self.client.execute(TEST_QUERY)
  svc_after = get_svc_metrics(SVC_NAME)
  inbound_after = get_per_conn_metrics(True)
  outbound_after = get_per_conn_metrics(False)

  assert svc_before != svc_after
  # The per-connection snapshots are only required to differ when KRPC is not
  # running over a Unix domain socket.
  if not krpc_use_uds:
    assert inbound_before != inbound_after
    assert outbound_before != outbound_after
  # Some connections should have metrics after executing query
  assert len(inbound_after) > 0
  assert len(outbound_after) > 0
  # Spot-check some fields, including socket stats.
  for conn in itertools.chain(inbound_after, outbound_after):
    assert conn["remote_addr"] != ""
    assert conn["num_calls_in_flight"] >= 0
    assert conn["num_calls_in_flight"] == len(conn["calls_in_flight"])
    # Check rtt, which should be present in 'struct tcp_info' even in old kernels
    # like 2.6.32.
    # Skip these checking if using UDS.
    if not krpc_use_uds:
      assert conn["socket_stats"]["rtt"] > 0, conn
      # send_queue_bytes uses TIOCOUTQ, which is also present in 2.6.32 and even older
      # kernels.
      assert conn["socket_stats"]["send_queue_bytes"] >= 0, conn
@pytest.mark.execute_serially
def test_admission_page(self):
  """Sanity check of the admission debug page and of the endpoint that resets the
  informational pool statistics, each with and without a pool_name argument."""
  # Submit one query so that the default pool is guaranteed to show up on the
  # admission control debug page.
  self.client.execute("select 1")
  all_pools = self.__fetch_resource_pools_json()
  # The default pool is called "root.default" when a fair-scheduler.xml file is
  # provided and "default-pool" otherwise.
  default_pool = next(
      (pool['pool_name'] for pool in all_pools
       if pool['pool_name'] in ('default-pool', 'root.default')),
      None)
  assert default_pool is not None, \
      "Expected a default pool to be present in {0}".format(all_pools)

  pools = self.__fetch_resource_pools_json(default_pool)
  assert pools[0]['pool_name'] == default_pool
  assert pools[0]['total_admitted'] > 0

  # Reset the stats of the default pool only.
  reset_url = self.RESET_RESOURCE_POOL_STATS_URL
  self.get_and_check_status(
      reset_url + "?pool_name={0}".format(default_pool), ports_to_test=[25000])
  pools = self.__fetch_resource_pools_json(default_pool)
  assert pools[0]['total_admitted'] == 0

  # Admit another query, then reset the stats of all pools.
  self.client.execute("select 1")
  pools = self.__fetch_resource_pools_json(default_pool)
  assert pools[0]['total_admitted'] > 0
  self.get_and_check_status(reset_url, ports_to_test=[25000])
  pool_config = self.__fetch_resource_pools_json(default_pool)[0]
  assert pool_config['total_admitted'] == 0

  # check that metrics exist
  for config_key in ('max_query_mem_limit', 'min_query_mem_limit',
                     'clamp_mem_limit_query_option'):
    assert config_key in pool_config
def __fetch_resource_pools_json(self, pool_name=None):
  """Return the 'resource_pools' entry of the admission debug page JSON fetched
  from port 25000. If 'pool_name' is given, it is added to the request as the
  pool_name search string."""
  query_args = ["json"]
  if pool_name is not None:
    query_args.append("pool_name=" + pool_name)
  admission_url = self.ADMISSION_URL + "?" + "&".join(query_args)
  responses = self.get_and_check_status(admission_url, ports_to_test=[25000])
  assert len(responses) == 1
  admission_json = json.loads(responses[0].text)
  assert 'resource_pools' in admission_json
  return admission_json['resource_pools']
@SkipIfBuildType.remote
def test_backends_page(self):
  """Sanity check of the JSON served by the backends debug page."""
  responses = self.get_and_check_status(self.BACKENDS_URL + '?json',
                                        ports_to_test=[25000])
  assert len(responses) == 1
  backends_json = json.loads(responses[0].text)
  assert 'backends' in backends_json
  # When this test runs, all impalads would have already started.
  assert len(backends_json['backends']) == 3
  assert backends_json['num_active_backends'] == 3
  for absent_key in ('num_quiescing_backends', 'num_blacklisted_backends'):
    assert absent_key not in backends_json

  # The backends are not sorted, so the row may belong to any of the impalads;
  # every address column is therefore matched against all three candidate ports.
  backend_row = backends_json['backends'][0]
  krpc_ports = ('27000', '27001', '27002')
  address_columns = (
      ('webserver_url', ('25000', '25001', '25002')),
      ('address', krpc_ports),
      ('krpc_address', krpc_ports))
  for column, candidate_ports in address_columns:
    assert len(backend_row[column]) > 0
    assert backend_row[column].endswith(candidate_ports)

  assert backend_row['is_coordinator']
  assert backend_row['is_executor']
  assert not backend_row['is_quiescing']
  assert not backend_row['is_blacklisted']
  for column in ('admit_mem_limit', 'process_start_time', 'version'):
    assert len(backend_row[column]) > 0
def test_download_profile(self):
  """Check that the query profile page links to the text and JSON profile downloads
  and that both downloads contain the query."""
  query = "select count(*) from functional.alltypes"
  query_id = self.client.execute(query).query_id
  profile_page_url = "{0}query_profile?query_id={1}".format(self.ROOT_URL, query_id)
  # Label shown on the profile page -> download link for that format.
  download_links = (
      ("Text", "query_profile_plain_text?query_id={0}".format(query_id)),
      ("Json", "query_profile_json?query_id={0}".format(query_id)))

  for profile_format, download_link in download_links:
    # The profile page must mention the format and carry the download link.
    pages = self.get_and_check_status(
        profile_page_url, profile_format, ports_to_test=self.IMPALAD_TEST_PORT)
    assert len(pages) == 1
    assert download_link in pages[0].text

    # Follow the link; get_and_check_status verifies that the query text is part
    # of the downloaded profile.
    downloads = self.get_and_check_status(
        self.ROOT_URL + download_link, query, self.IMPALAD_TEST_PORT)
    assert len(downloads) == 1
    downloaded_text = downloads[0].text

    if profile_format == "Text":
      assert query_id in downloaded_text
      continue

    # The JSON download must parse and name the query in its profile.
    try:
      json_res = json.loads(downloaded_text)
    except ValueError:
      assert False, "Downloaded JSON format query profile cannot be parsed. " \
          "Json profile:{0}".format(downloaded_text)
    assert query_id in json_res["contents"]["profile_name"], json_res
def test_prometheus_metrics(self):
  """Check that the Prometheus metrics pages are served and are parseable."""
  resp = self.get_and_check_status(self.PROMETHEUS_METRICS_URL)
  assert len(resp) == 3
  # check if metric shows up
  assert 'impala_statestore_subscriber_heartbeat_interval_time_min' in resp[0].text
  for response in resp:
    # The prometheus_client parser is lazy: the result has to be iterated to the
    # end to force it to parse every line.
    metric_families = text_string_to_metric_families(response.text)
    for _family in metric_families:
      pass
def test_healthz_endpoint(self):
  """Check that /healthz answers 200 OK to GET and HEAD requests on every port in
  self.TEST_PORTS_WITH_SS."""
  for port in self.TEST_PORTS_WITH_SS:
    healthz_url = self.HEALTHZ_URL.format(port)
    for http_method in (requests.get, requests.head):
      page = http_method(healthz_url)
      assert page.status_code == requests.codes.ok
def test_knox_compatibility(self):
  """Checks that the template files conform to the requirements for compatibility with
  the Apache Knox service definition."""
  # Each pattern below matches a construct that is NOT Knox compatible. If a link
  # that does not conform ever has to be added, the Knox service definition will
  # probably have to change as well.
  offending_patterns = [
      # 'a'/'link' tags whose 'href' starts neither with '#' (which stays on the same
      # page and so doesn't need the hostname) nor with '{{ __common__.host-url }}'.
      "<(a|link) .*? href=['\"](?!({{ __common__.host-url }})|#)",
      # 'script' tags that aren't absolute urls.
      "<script .*?src=['\"](?!({{ __common__.host-url }})|http)",
      # 'form' tags that are not followed by including the hidden inputs.
      "<form [^{]*?>(?!{{>www/form-hidden-inputs.tmpl}})",
      # XMLHttpRequest.open() in javascript that are not followed with make_url().
      r"open\(['\"]GET['\"], (?!make_url)",
      # Urls in json parameters passed to DataTables.
      "url: ['\"](?!make_url)",
      # References of paths that contain '/www/' but are not fully qualified.
      r"((?<!{{ __common__.host-url }})(?<!make_url\(\")/www/.*)",
      # Links with an 'href' attribute that doesn't start with 'URL', or '`$',
      # or '`{{ __common__.host-url }}'.
      r"(.*link.*\.href[\s]=[\s](?!(`{{ __common__.host-url }})|(`\$)|URL))",
  ]
  regex = "|".join("(%s)" % pattern for pattern in offending_patterns)
  www_dir = os.path.join(os.environ['IMPALA_HOME'], "www")
  results = grep_dir(www_dir, regex, r".*\.tmpl")
  assert len(results) == 0, \
      "All links on the webui must include the webserver host: %s" % results

  # Without Knox integration the links are relative; the root link from the header
  # is used as the probe.
  self.get_and_check_status(self.ROOT_URL, "href='/'", self.IMPALAD_TEST_PORT)
  # With the 'x-forwarded-context' header present in the request, the links are
  # written as absolute.
  absolute_root_link = "href='http://.*:%s/'" % self.IMPALAD_TEST_PORT[0]
  self.get_and_check_status(
      self.ROOT_URL, absolute_root_link, self.IMPALAD_TEST_PORT,
      regex=True, headers={'X-Forwarded-Context': '/gateway'})
def test_catalog_operations_endpoint(self):
  """Check that the /operations endpoint on port 25020 answers 200 OK to both GET
  and HEAD requests."""
  operations_url = "http://localhost:25020/operations"
  for http_method in (requests.get, requests.head):
    page = http_method(operations_url)
    assert page.status_code == requests.codes.ok
def test_catalog_operation_fields(self, unique_database):
  """Verify that the CREATE_DATABASE operation listed by /operations on port 25020
  is consistent with the matching statement on the /queries page on port 25000."""
  catalog_operations_page = requests.get("http://localhost:25020/operations?json").text
  catalog_operations = json.loads(catalog_operations_page)
  assert "finished_catalog_operations" in catalog_operations
  queries_page = requests.get("http://localhost:25000/queries?json").text
  queries = json.loads(queries_page)
  assert "completed_queries" in queries

  # Locate the CREATE_DATABASE operation for 'unique_database'.
  create_db_op = None
  for op in catalog_operations["finished_catalog_operations"]:
    if op["target_name"] == unique_database \
        and op["catalog_op_name"] == "CREATE_DATABASE":
      create_db_op = op
      break
  assert create_db_op is not None

  ts_format = "%Y-%m-%d %H:%M:%S.%f"
  catalog_op_query_id = create_db_op["query_id"]
  catalog_op_user = create_db_op["user"]
  catalog_op_start_time = datetime.strptime(create_db_op["start_time"], ts_format)
  catalog_op_end_time = datetime.strptime(create_db_op["finish_time"], ts_format)
  catalog_op_duration = parse_duration_string_ms(create_db_op["duration"])
  LOG.info("Found query id in catalog operations: " + catalog_op_query_id)

  def verify_query_record(query):
    """Check one /queries record against the catalog operation. The time span of
    the query must enclose the one of the catalog operation."""
    assert "CREATE DATABASE" in query["stmt"]
    assert unique_database in query["stmt"]
    assert catalog_op_user == query["effective_user"]
    assert datetime.strptime(query["start_time"], ts_format) <= catalog_op_start_time
    assert datetime.strptime(query["end_time"], ts_format) >= catalog_op_end_time
    assert parse_duration_string_ms(query["duration"]) >= catalog_op_duration

  def verify_all_matching(records):
    """Verify every record carrying the catalog operation's query id. Returns True
    if at least one such record exists."""
    any_match = False
    for record in records:
      if record["query_id"] == catalog_op_query_id:
        verify_query_record(record)
        any_match = True
    return any_match

  matched = verify_all_matching(queries["completed_queries"])
  if not matched:
    LOG.info("Query id {0} not found in the completed queries list".format(
        catalog_op_query_id))
    # The query could still be waiting to be closed, in which case it is listed
    # among the in-flight queries.
    matched = verify_all_matching(queries["in_flight_queries"])
  if not matched:
    # Dump web pages for debug
    LOG.info("Query id {0} not found in queries page!".format(catalog_op_query_id))
    LOG.info("Catalog operations: " + catalog_operations_page)
    LOG.info("Queries: " + queries_page)
  assert matched
def test_catalog_operation_detail_endpoint(self, unique_database):
  """Check that /operation_detail reports detailed information (timeline, request
  and response byte sizes) for the finished catalog operations of a CTAS."""
  # A CTAS is used because it generates more than one catalog operation.
  ctas_result = self.execute_query(
      "create table {0}.test_detail as select * from functional.alltypes limit 10"
      .format(unique_database))
  expected_query_id = ctas_result.query_id

  operations_text = requests.get("http://localhost:25020/operations?json").text
  operations_json = json.loads(operations_text)
  assert "finished_catalog_operations" in operations_json
  finished_ops = operations_json["finished_catalog_operations"]
  assert len(finished_ops) > 0

  # Two finished operations are expected for the CTAS:
  # CREATE_TABLE_AS_SELECT and FINALIZE_CREATE_TABLE_AS_SELECT
  ctas_operations = [
      op for op in finished_ops
      if expected_query_id in op["query_id"]
      and ("CREATE_TABLE_AS_SELECT" in op["catalog_op_name"]
           or "FINALIZE_CREATE_TABLE_AS_SELECT" in op["catalog_op_name"])]
  assert len(ctas_operations) == 2, \
      "Expected 2 CTAS operations (CREATE_TABLE_AS_SELECT and " \
      "FINALIZE_CREATE_TABLE_AS_SELECT), found {0}. " \
      "Operations: {1}".format(len(ctas_operations),
      ", ".join([op["catalog_op_name"] for op in ctas_operations]))

  for ctas_op in ctas_operations:
    query_id = ctas_op["query_id"]
    thread_id = ctas_op["thread_id"]
    op_name = ctas_op["catalog_op_name"]
    assert expected_query_id in query_id, \
        "Query ID mismatch: expected {0}, got {1}".format(expected_query_id, query_id)
    LOG.info("Testing operation detail for query_id: {0}, thread_id: {1}, op: {2}"
        .format(query_id, thread_id, op_name))

    # start_time_ms is passed along with the ids when addressing a finished
    # operation.
    detail_url = (
        "http://localhost:25020/operation_detail?query_id={0}&thread_id={1}"
        "&start_time_ms={2}".format(query_id, thread_id, ctas_op["start_time_ms"]))

    # HTML flavour of the page.
    html_response = requests.get(detail_url)
    assert html_response.status_code == requests.codes.ok
    for section_title in ("Operation Information", "Execution Timeline"):
      assert section_title in html_response.text

    # JSON flavour of the page.
    json_response = requests.get(detail_url + "&json")
    assert json_response.status_code == requests.codes.ok
    detail_data = json.loads(json_response.text)

    # Basic operation fields.
    assert "query_id" in detail_data
    assert detail_data["query_id"] == query_id
    assert "catalog_op_name" in detail_data
    assert op_name in detail_data["catalog_op_name"]
    for field in ("target_name", "user", "status", "start_time", "finish_time",
                  "duration"):
      assert field in detail_data
    assert "timeline" in detail_data, "timeline field missing"
    assert "request_size_bytes" in detail_data, "request_size_bytes field missing"
    assert "response_size_bytes" in detail_data, "response_size_bytes field missing"

    # The detail fields must carry actual data.
    timeline_str = detail_data["timeline"]
    assert len(timeline_str) > 0, "timeline should not be empty"
    assert detail_data["request_size_bytes"] > 0, \
        "request_size_bytes should be positive"
    assert detail_data["response_size_bytes"] > 0, \
        "response_size_bytes should be positive"

    # The timeline is expected as a single formatted string.
    assert isinstance(timeline_str, str), "timeline should be a formatted string"
    assert "Catalog Server Operation:" in timeline_str, \
        "timeline should have the operation name"
    # At least one typical timeline event marker has to be present.
    has_event = ("Got" in timeline_str or "DDL" in timeline_str
                 or "finished" in timeline_str)
    assert has_event, "timeline should contain at least one event"
def test_catalog_operation_detail_invalid_query_id(self):
  """Test that /operation_detail handles invalid or incomplete requests gracefully.

  Every malformed request made here is expected to be answered with HTTP 200 and a
  JSON document that carries an 'error' field. The database created to obtain a
  finished catalog operation is dropped again at the end of the test."""
  detail_url = "http://localhost:25020/operation_detail"

  def fetch_detail_json(query_string):
    """GET the detail page with 'query_string' appended, check that the request
    succeeded and return the parsed JSON body."""
    response = requests.get(detail_url + query_string)
    assert response.status_code == requests.codes.ok
    return json.loads(response.text)

  # Test with a non-existent query_id and thread_id
  detail_data = fetch_detail_json(
      "?query_id=nonexistent:123456789abcd&thread_id=999&json")
  assert "error" in detail_data, "Should return error for non-existent query_id"

  # Test without thread_id (should return error about missing parameter)
  detail_data = fetch_detail_json("?query_id=nonexistent:123456789abcd&json")
  assert "error" in detail_data
  assert "thread_id" in detail_data["error"].lower(), \
      "Should return error about the missing thread_id parameter"

  # Test with missing query_id parameter
  detail_data = fetch_detail_json("?json")
  assert "error" in detail_data, \
      "Should return error when query_id parameter is missing"

  # Test with missing start_time_ms for a finished operation
  # First, execute a simple DDL to get a finished operation
  test_db = "test_invalid_param_db"
  self.execute_query("create database if not exists " + test_db)
  try:
    catalog_operations_page = requests.get(
        "http://localhost:25020/operations?json").text
    catalog_operations = json.loads(catalog_operations_page)
    finished_ops = catalog_operations.get("finished_catalog_operations", [])
    if finished_ops:
      finished_op = finished_ops[0]
      # Try to access finished operation without start_time_ms (should fail)
      detail_data = fetch_detail_json("?query_id={0}&thread_id={1}&json".format(
          finished_op["query_id"], finished_op["thread_id"]))
      assert "error" in detail_data, \
          "Should return error when start_time_ms is missing for finished operation"
      assert "start_time_ms" in detail_data["error"].lower(), \
          "Error message should mention start_time_ms"
  finally:
    # Do not leave the helper database behind for later tests or test runs.
    self.execute_query("drop database if exists " + test_db)
@pytest.mark.execute_serially
def test_catalog_operation_detail_in_flight(self, unique_database):
  """Test that /operation_detail shows real-time timeline for in-flight operations.
  Uses a debug action to inject a delay in DDL execution.

  A REFRESH is started asynchronously, /operations is polled until the REFRESH shows
  up as in-flight, and its detail page is then checked. The query handle is always
  closed, even if waiting for the query fails."""
  # Create a table first, then refresh it with a delay
  table_name = "{0}.test_in_flight".format(unique_database)
  self.client.execute("create table {0} (id int)".format(table_name))
  # Use catalogd_refresh_hdfs_listing_delay which triggers during REFRESH
  delay_action = "catalogd_refresh_hdfs_listing_delay:SLEEP@100"
  refresh_stmt = "refresh {0}".format(table_name)
  # Set the debug action before executing
  # NOTE(review): the debug_action stays set on self.client after this test unless
  # the suite resets the client configuration between tests - confirm.
  self.client.set_configuration({"debug_action": delay_action})
  # Start the REFRESH asynchronously
  handle = self.client.execute_async(refresh_stmt)
  try:
    expected_query_id = self.client.get_query_id(handle)

    # Poll for the in-flight operation (with a bounded number of attempts)
    inflight_op = None
    max_attempts = 20
    for attempt in range(1, max_attempts + 1):
      catalog_operations = json.loads(
          requests.get("http://localhost:25020/operations?json").text)
      for op in catalog_operations.get("inflight_catalog_operations", []):
        if op["catalog_op_name"] == "REFRESH" and op["target_name"] == table_name:
          inflight_op = op
          break
      if inflight_op is not None:
        break
      # If not found yet, sleep before the next attempt
      if attempt < max_attempts:
        time.sleep(0.1)
    assert inflight_op is not None, \
        "In-flight REFRESH operation not found after {0} attempts".format(max_attempts)

    query_id = inflight_op["query_id"]
    thread_id = inflight_op["thread_id"]
    # Verify query_id matches what we got from the handle
    assert expected_query_id in query_id, \
        "Query ID mismatch: expected {0}, got {1}".format(expected_query_id, query_id)
    LOG.info("Found in-flight operation with query_id: {0}, thread_id: {1}".format(
        query_id, thread_id))

    # Test the operation_detail page for the in-flight operation
    json_response = requests.get(
        "http://localhost:25020/operation_detail?query_id={0}&thread_id={1}&json"
        .format(query_id, thread_id))
    assert json_response.status_code == requests.codes.ok
    detail_data = json.loads(json_response.text)

    # Verify the operation is in STARTED status
    assert detail_data["status"] == "STARTED", \
        "In-flight operation should have STARTED status"
    # Verify timeline is present for in-flight operation
    assert "timeline" in detail_data, \
        "Timeline should be present for in-flight operation"
    assert len(detail_data["timeline"]) > 0, \
        "Timeline should not be empty for in-flight operation"

    # Verify timeline is formatted text (not JSON)
    timeline_str = detail_data["timeline"]
    assert isinstance(timeline_str, str), "Timeline should be a formatted string"
    # Check for expected timeline format from RuntimeProfile
    assert "Catalog Server Operation:" in timeline_str, \
        "Timeline should have the operation name"
    assert ("Got Metastore client" in timeline_str or "Got catalog version"
            in timeline_str), "Timeline should contain at least one event"
    LOG.info("In-flight operation detail test passed")
  finally:
    # Wait for the query to complete and close it. The nested 'finally' guarantees
    # that the handle is closed even if waiting raises.
    try:
      self.client.wait_for_finished_timeout(handle, 30)
    finally:
      self.client.close_query(handle)
def test_catalog_metrics(self):
  """Check that the /metrics JSON of catalogd lists the expected metric names."""
  url = self.METRICS_URL.format(*self.CATALOG_TEST_PORT) + "?json"
  metrics = json.loads(requests.get(url).text)["metric_group"]["metrics"]
  metric_keys = set(metric["name"] for metric in metrics)
  expected_metrics = (
      "catalog-server.metadata.file.num-loading-threads",
      "catalog-server.metadata.file.num-loading-tasks",
      "catalog-server.metadata.table.num-loading-file-metadata",
      "catalog-server.metadata.table.num-loading-metadata",
      "catalog-server.metadata.table.async-loading.num-in-progress",
      "catalog-server.metadata.table.async-loading.queue-len",
      "catalog.num-databases",
      "catalog.num-tables",
      "catalog.num-functions",
      "catalog.hms-client-pool.num-idle",
      "catalog.hms-client-pool.num-in-use",
      "catalog.num-loaded-tables")
  for metric_name in expected_metrics:
    assert metric_name in metric_keys
def test_iceberg_table_metrics(self):
  """Compares the table metrics of functional_parquet.iceberg_non_partitioned against
  fixed expected values. The values checked depend on supports_storage_ids()."""
  db_name, tbl_name = "functional_parquet", "iceberg_non_partitioned"

  def fetch(metric):
    # Every lookup goes through __get_table_metric for the same table.
    return self.__get_table_metric(db_name, tbl_name, metric)

  assert fetch("total-file-size-bytes") == '23448'
  assert fetch("num-files") == '20'
  if not supports_storage_ids():
    # Target FS doesn't have block locations, so 'memory-estimate-bytes' differ.
    assert fetch("memory-estimate-bytes") == '10000'
  else:
    # Target FS has block location information.
    assert fetch("num-blocks") == '20'
    assert fetch("memory-estimate-bytes") == '13000'
def __get_table_metric(self, db_name, tbl_name, metric):
  """Returns the value of 'metric' for table 'db_name.tbl_name' as a string, or None
  if no line of the table metrics text starts with 'metric'.

  The table is refreshed first, then the table metrics page is requested as JSON on
  the ports in CATALOG_TEST_PORT and the first response is parsed."""
  qualified_name = (db_name, tbl_name)
  self.client.execute("refresh %s.%s" % qualified_name)
  page_url = self.TABLE_METRICS_URL + "?name=%s.%s&json" % qualified_name
  responses = self.get_and_check_status(
      page_url, metric, ports_to_test=self.CATALOG_TEST_PORT)
  table_metrics = json.loads(responses[0].text)['table_metrics']
  # The first line starting with 'metric' wins; the value is the text that follows the
  # first ": " on that line (up to the next ": ", if there is one).
  values = (row.split(": ")[1] for row in table_metrics.split('\n')
            if row.startswith(metric))
  return next(values, None)
def test_query_progress(self):
  """Tests that /queries page shows query progress.

  Runs a query through run_query_and_get_debug_page() with expected_state=RUNNING and
  checks the state and progress fields of its entry in 'in_flight_queries'."""
  sql = "select count(*) from functional_parquet.alltypes where bool_col = sleep(100)"
  (query_id, response_json) = self.run_query_and_get_debug_page(
      sql, self.QUERIES_URL, expected_state=RUNNING)
  seen = False
  for entry in response_json['in_flight_queries']:
    # Skip entries that belong to other queries.
    if query_id not in entry['query_id']:
      continue
    seen = True
    assert entry["state"] == "RUNNING"
    assert entry["waiting"] is False
    assert entry["executing"] is True
    assert entry["query_progress"] == "0 / 4 ( 0%)"
  # Dump the whole page on failure to make a missing entry easy to diagnose.
  assert seen, "Query {} not found in response_json\n{}".format(
      query_id, json.dumps(response_json, sort_keys=True, indent=4))
# CatalogV2 has no delay while loading metadata after an invalidate, so this test
# is only applicable to CatalogV1. test_query_cancel_load_tables is sufficient for V2.
@SkipIfCatalogV2.catalog_v1_test()
@pytest.mark.execute_serially
def test_query_cancel_load_metadata(self):
  """Tests that we can cancel a query in the CREATED state while catalogd loads
  metadata. Invalidate metadata introduces a delay while catalogd loads metadata.

  After cancelling, checks the error returned to the original request and the
  cancellation note in the query's plain-text profile."""
  service = ImpalaCluster.get_e2e_test_cluster().impalads[0].service
  # NOTE(review): a state of None presumably means "no in-flight query" - confirm
  # against wait_for_state().
  wait_for_state(service, None)
  outcome = self.execute_query("invalidate metadata functional_parquet.alltypes")
  assert outcome.success

  # Start the query completely async. The server doesn't return a response until
  # the query has exited the CREATED state, so we need to get the query ID another way.
  query_proc, result_queue = start(
      self, "select count(*) from functional_parquet.alltypes")
  wait_for_state(service, 'CREATED')
  queries_page = service.get_debug_webpage_json('queries')
  in_flight = queries_page['in_flight_queries']
  assert len(in_flight) == 1
  created_query = in_flight[0]
  assert created_query['state'] == 'CREATED'
  query_id = created_query['query_id']
  cancel(service, query_id)

  # Verify query was cancelled. Cancel and fetch requests can return before cancellation
  # is finalized, so wait for the original request to return.
  query_output = join(query_proc, result_queue)
  assert "UserCancelledException: Query cancelled by user request" in query_output
  wait_for_state(service, None)
  assert_query_stopped(service, query_id)

  # The profile text is matched in its HTML-escaped form (&apos; for quotes).
  profile_page = "query_profile_plain_text?query_id={}".format(query_id)
  profile_text = service.read_debug_webpage(profile_page)
  assert "Cancelled from Impala&apos;s debug web interface by user: " \
      "&apos;anonymous&apos; at" in profile_text
@pytest.mark.execute_serially
def test_query_cancel_load_tables(self):
  """Tests that we can cancel a query in the CREATED state while loading tables.

  The query is started asynchronously with a debug action that delays table loading.
  It is then cancelled from several concurrent processes, and the test checks the
  error returned to the original request and the cancellation note in the query's
  plain-text profile."""
  impalad = ImpalaCluster.get_e2e_test_cluster().impalads[0].service
  # NOTE(review): a state of None presumably means "no in-flight query" - confirm
  # against wait_for_state().
  wait_for_state(impalad, None)
  delay_created_action = "impalad_load_tables_delay:SLEEP@5000"
  # Start the query completely async. The server doesn't return a response until
  # the query has exited the CREATED state, so we need to get the query ID another way.
  query_proc, queue = start(self, "select count(*) from functional_parquet.alltypes",
      {'debug_action': delay_created_action})
  wait_for_state(impalad, 'CREATED')
  in_flight_queries = impalad.get_debug_webpage_json('queries')['in_flight_queries']
  assert len(in_flight_queries) == 1
  assert in_flight_queries[0]['state'] == 'CREATED'
  query_id = in_flight_queries[0]['query_id']
  # Call cancel multiple times to ensure it's idempotent. The cancel processes use
  # their own loop variable so that 'query_proc' keeps referring to the process that
  # runs the query; reusing one name would hand a cancel process to join() below.
  cancel_procs = [Process(target=cancel, args=(impalad, query_id)) for _ in range(3)]
  for cancel_proc in cancel_procs:
    cancel_proc.start()
  for cancel_proc in cancel_procs:
    cancel_proc.join()
  # Verify query was cancelled. Cancel and fetch requests can return before cancellation
  # is finalized, so wait for the original request to return and retry get_queries.
  query_output = join(query_proc, queue)
  assert "UserCancelledException: Query cancelled by user request" in query_output
  wait_for_state(impalad, None)
  assert_query_stopped(impalad, query_id)
  # The profile text is matched in its HTML-escaped form (&apos; for quotes).
  response = impalad.read_debug_webpage(
      "query_profile_plain_text?query_id={}".format(query_id))
  assert "Cancelled from Impala&apos;s debug web interface by user: " \
      "&apos;anonymous&apos; at" in response
@pytest.mark.execute_serially
def test_hadoop_varz_page(self):
  """Checks that the page at HADOOP_VARZ_URL exposes Hadoop configuration keys such as
  the Hive warehouse directories and fs.defaultFS."""
  config_keys = (
      "hive.metastore.warehouse.dir",
      "hive.metastore.warehouse.external.dir",
      "fs.defaultFS",
  )
  for config_key in config_keys:
    responses = self.get_and_check_status(
        self.HADOOP_VARZ_URL, config_key, ports_to_test=self.TEST_PORTS_WITHOUT_SS)
  # check if response size is 2, for both catalog and impalad webUI. Only the
  # responses of the last lookup are counted.
  assert len(responses) == 2