| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| from tests.common.environ import ImpalaTestClusterFlagsDetector |
| from tests.common.file_utils import grep_dir |
| from tests.common.skip import SkipIfBuildType, SkipIfCatalogV2 |
| from tests.common.impala_cluster import ImpalaCluster |
| from tests.common.impala_connection import FINISHED, RUNNING |
| from tests.common.impala_test_suite import ImpalaTestSuite |
| from tests.common.test_vector import HS2 |
| from tests.util.filesystem_utils import supports_storage_ids |
| from tests.util.parse_util import parse_duration_string_ms |
| from tests.util.web_pages_util import ( |
| cancel, wait_for_state, assert_query_stopped, start, join) |
| from datetime import datetime |
| from prometheus_client.parser import text_string_to_metric_families |
| from multiprocessing import Process |
| import itertools |
| import json |
| import logging |
| import os |
| import pytest |
| import re |
| import requests |
| import time |
| |
| LOG = logging.getLogger(__name__) |
| |
| |
| class TestWebPage(ImpalaTestSuite): |
  # URL templates for the debug web UI endpoints exercised by these tests. Each
  # template takes the daemon's web UI port as its only format argument ({0}).
  ROOT_URL = "http://localhost:{0}/"
  SET_JAVA_LOGLEVEL_URL = "http://localhost:{0}/set_java_loglevel"
  RESET_JAVA_LOGLEVEL_URL = "http://localhost:{0}/reset_java_loglevel"
  SET_GLOG_LOGLEVEL_URL = "http://localhost:{0}/set_glog_level"
  RESET_GLOG_LOGLEVEL_URL = "http://localhost:{0}/reset_glog_level"
  CATALOG_URL = "http://localhost:{0}/catalog"
  CATALOG_OBJECT_URL = "http://localhost:{0}/catalog_object"
  TABLE_METRICS_URL = "http://localhost:{0}/table_metrics"
  QUERY_BACKENDS_URL = "http://localhost:{0}/query_backends"
  QUERY_FINSTANCES_URL = "http://localhost:{0}/query_finstances"
  RPCZ_URL = "http://localhost:{0}/rpcz"
  THREAD_GROUP_URL = "http://localhost:{0}/thread-group"
  MEMZ_URL = "http://localhost:{0}/memz"
  METRICS_URL = "http://localhost:{0}/metrics"
  JMX_URL = "http://localhost:{0}/jmx"
  ADMISSION_URL = "http://localhost:{0}/admission"
  RESET_RESOURCE_POOL_STATS_URL = "http://localhost:{0}/resource_pool_reset"
  BACKENDS_URL = "http://localhost:{0}/backends"
  PROMETHEUS_METRICS_URL = "http://localhost:{0}/metrics_prometheus"
  QUERIES_URL = "http://localhost:{0}/queries"
  QUERY_PLAN = "http://localhost:{0}/query_plan"
  HEALTHZ_URL = "http://localhost:{0}/healthz"
  EVENT_PROCESSOR_URL = "http://localhost:{0}/events"
  HADOOP_VARZ_URL = "http://localhost:{0}/hadoop-varz"
  JVM_THREADZ_URL = "http://localhost:{0}/jvm-threadz"
  STACKS_URL = "http://localhost:{0}/stacks"

  # Web UI ports: 25000 is the impalad, 25010 the statestored and 25020 the catalogd.
  # log4j changes do not apply to the statestore because it has no embedded JVM, so
  # there are two port sets for the log level endpoints: one without the statestore
  # port and one with it.
  TEST_PORTS_WITHOUT_SS = ["25000", "25020"]
  TEST_PORTS_WITH_SS = ["25000", "25010", "25020"]
  # Single-daemon port lists, used to target only the impalad or only the catalogd.
  IMPALAD_TEST_PORT = ["25000"]
  CATALOG_TEST_PORT = ["25020"]
| |
| def test_get_root_url(self): |
| """Tests that the root URL is accessible and loads properly""" |
| self.get_and_check_status(self.ROOT_URL) |
| |
| def test_content_encoding(self): |
| responses = self.get_and_check_status(self.ROOT_URL, headers={"Accept-Encoding": ""}) |
| for response in responses: |
| assert "Content-Encoding" not in response.headers |
| responses = self.get_and_check_status(self.ROOT_URL, |
| headers={"Accept-Encoding": "gzip"}) |
| for response in responses: |
| assert "Content-Encoding" in response.headers |
| assert response.headers["Content-Encoding"] == "gzip" |
| |
| def test_get_build_flags(self): |
| """Tests that the build flags on the root page contain valid values""" |
| for port in self.TEST_PORTS_WITH_SS: |
| build_flags = ImpalaTestClusterFlagsDetector.\ |
| get_build_flags_from_web_ui(self.ROOT_URL.format(port)) |
| |
| assert len(build_flags) == 3 |
| assert "is_ndebug" in build_flags |
| assert build_flags["is_ndebug"] in ["true", "false"] |
| assert "cmake_build_type" in build_flags |
| assert build_flags["cmake_build_type"] in ["debug", "release", "address_sanitizer", |
| "tidy", "ubsan", "ubsan_full", "tsan", "tsan_full", "code_coverage_release", |
| "code_coverage_debug", "debug_noopt"] |
| assert "library_link_type" in build_flags |
| assert build_flags["library_link_type"] in ["dynamic", "static"] |
| |
| @SkipIfBuildType.remote |
| def test_root_correct_build_flags(self, cluster_properties): |
| """Tests that the build flags on the root page contain correct values""" |
| assert not cluster_properties.is_remote_cluster() |
| for port in self.TEST_PORTS_WITH_SS: |
| build_flags = ImpalaTestClusterFlagsDetector.\ |
| get_build_flags_from_web_ui(self.ROOT_URL.format(port)) |
| |
| assert build_flags["cmake_build_type"] == cluster_properties.build_flavor |
| assert build_flags["library_link_type"] == cluster_properties.library_link_type |
| |
| def test_root_consistent_build_flags(self): |
| """Tests that the build flags on the root page contain consistent values""" |
| for port in self.TEST_PORTS_WITH_SS: |
| build_flags = ImpalaTestClusterFlagsDetector.\ |
| get_build_flags_from_web_ui(self.ROOT_URL.format(port)) |
| |
| is_ndebug = build_flags["is_ndebug"] == "true" |
| |
| if not is_ndebug: |
| assert not build_flags["cmake_build_type"] in ["release"] |
| |
| if build_flags["cmake_build_type"] in ["debug"]: |
| assert not is_ndebug |
| |
| def test_root_other_info(self): |
| """Tests to check glibc version and locale is available""" |
| for port in self.TEST_PORTS_WITH_SS: |
| other_info_page = requests.get(self.ROOT_URL.format(port) + "/?json").text |
| other_info = json.loads(other_info_page) |
| assert "effective_locale" in other_info |
| assert "glibc_version" in other_info |
| |
| def test_jvm_threadz(self): |
| """Tests if timestamp is available in jvm-threadz page""" |
| for port in self.TEST_PORTS_WITHOUT_SS: |
| response = requests.get(self.JVM_THREADZ_URL.format(port) + "?json") |
| assert response.status_code == requests.codes.ok |
| jvm_threadz_page = json.loads(response.text) |
| overview = jvm_threadz_page["overview"] |
| assert len(overview) == 4 |
| assert "timestamp" in overview |
| |
| def test_memz(self): |
| """Tests /memz at impalad / statestored / catalogd""" |
| |
| page = requests.get("http://localhost:25000/memz") |
| assert page.status_code == requests.codes.ok |
| page = requests.get("http://localhost:25010/memz") |
| assert page.status_code == requests.codes.ok |
| page = requests.get("http://localhost:25020/memz") |
| assert page.status_code == requests.codes.ok |
| page = requests.head("http://localhost:25000/memz") |
| assert page.status_code == requests.codes.ok |
| page = requests.head("http://localhost:25010/memz") |
| assert page.status_code == requests.codes.ok |
| page = requests.head("http://localhost:25020/memz") |
| assert page.status_code == requests.codes.ok |
| |
| def test_memz_shows_fragment_instance_id(self): |
| """Tests that the memory breakdown on memz shows fragment instance IDs.""" |
| query = "select count(*) from functional_parquet.alltypes where bool_col = sleep(100)" |
| query_handle = self.client.execute_async(query) |
| try: |
| self.client.wait_for_impala_state(query_handle, RUNNING, 1000) |
| memz_breakdown = self._get_debug_page(self.MEMZ_URL)['detailed'] |
| finstance_re = re.compile("Fragment [0-9a-f]{16}:[0-9a-f]{16}") |
| assert finstance_re.search(memz_breakdown), memz_breakdown |
| finally: |
| self.client.close_query(query_handle) |
| |
| def test_query_profile_encoded_unknown_query_id(self): |
| """Test that /query_profile_encoded error message starts with the expected line in |
| case of missing query and does not contain any leading whitespace. |
| """ |
| cluster = ImpalaCluster.get_e2e_test_cluster() |
| impalad = cluster.get_any_impalad() |
| result = impalad.service.read_debug_webpage("query_profile_encoded?query_id=123") |
| assert result.startswith("Could not obtain runtime profile: Query id") |
| |
| def test_jmx_endpoint(self): |
| """Tests that the /jmx endpoint on the Catalog and Impalads returns a valid json.""" |
| for port in self.TEST_PORTS_WITHOUT_SS: |
| input_url = self.JMX_URL.format(port) |
| response = requests.get(input_url) |
| assert response.status_code == requests.codes.ok |
| assert "application/json" == response.headers['Content-Type'] |
| jmx_json = "" |
| try: |
| jmx_json = json.loads(response.text) |
| assert "beans" in jmx_json.keys(), "Ill formatted JSON returned: %s" % jmx_json |
| except ValueError: |
| assert False, "Invalid JSON returned from /jmx endpoint: %s" % jmx_json |
| |
| def test_stacks_endpoint(self): |
| """Tests that the /stacks endpoint on all daemons returns valid stack traces.""" |
| for port in self.TEST_PORTS_WITH_SS: |
| input_url = self.STACKS_URL.format(port) |
| response = requests.get(input_url) |
| assert response.status_code == requests.codes.ok |
| # The /stacks endpoint returns plain text |
| assert "text/plain" in response.headers['Content-Type'] |
| |
| # Verify the response contains expected stack trace elements |
| stack_text = response.text |
| assert "Collected stacks from" in stack_text, \ |
| "Missing collection summary in response from port %s" % port |
| assert "threads in" in stack_text, \ |
| "Missing thread count in response from port %s" % port |
| assert "TID" in stack_text, \ |
| "Missing thread IDs in response from port %s" % port |
| |
| # Verify it contains at least one thread with a stack trace |
| # Stack traces should have lines starting with '@' for frame information |
| assert "@" in stack_text, \ |
| "Missing stack frames in response from port %s" % port |
| |
| def test_stacks_endpoint_format(self): |
| """Tests that the /stacks endpoint returns properly formatted stack traces.""" |
| for port in self.TEST_PORTS_WITH_SS: |
| input_url = self.STACKS_URL.format(port) |
| response = requests.get(input_url) |
| assert response.status_code == requests.codes.ok |
| |
| stack_text = response.text |
| lines = stack_text.split('\n') |
| |
| # Verify the header line format |
| header_pattern = re.compile(r'Collected stacks from \d+ threads in \d+\.\d+s') |
| assert any(header_pattern.match(line) for line in lines), \ |
| "Missing or malformed collection summary header" |
| |
| # Verify thread ID format (e.g., "TID 1911294 (statestored):" or |
| # "TID 262192 (subscriber-priority-update-worker(2:10)):" with nested parens) |
| # Use a greedy match that captures everything between "TID <num> (" and "):" |
| tid_pattern = re.compile(r'TID \d+ \(.+\):') |
| assert any(tid_pattern.match(line) for line in lines), \ |
| "Missing or malformed thread ID lines" |
| |
| # Verify stack frame format (lines starting with '@' and containing addresses) |
| frame_pattern = re.compile(r'^\s*@\s+0x[0-9a-f]+\s+.*') |
| assert any(frame_pattern.match(line) for line in lines), \ |
| "Missing or malformed stack frame lines" |
| |
| # Check for thread grouping format (e.g., "3 threads with same stack:") |
| # This may or may not be present depending on whether there are duplicate stacks |
| group_pattern = re.compile(r'^\d+ threads with same stack:') |
| has_grouped_threads = any(group_pattern.match(line) for line in lines) |
| |
| # If there are grouped threads, verify that the TIDs are listed before the stack |
| if has_grouped_threads: |
| # Find a group header |
| for i, line in enumerate(lines): |
| if group_pattern.match(line): |
| # The next few lines should be TID lines |
| assert i + 1 < len(lines), "Group header at end of output" |
| # At least one TID should follow |
| found_tid = False |
| for j in range(i + 1, min(i + 10, len(lines))): |
| if tid_pattern.match(lines[j]): |
| found_tid = True |
| break |
| assert found_tid, "No TID found after group header" |
| break |
| |
| def get_and_check_status( |
| self, url, string_to_search="", ports_to_test=None, regex=False, headers=None): |
| """Helper method that polls a given url and asserts the return code is ok and |
| the response contains the input string.""" |
| if ports_to_test is None: |
| ports_to_test = self.TEST_PORTS_WITH_SS |
| |
| responses = [] |
| for port in ports_to_test: |
| input_url = url.format(port) |
| response = requests.head(input_url, headers=headers) |
| assert response.status_code == requests.codes.ok, "URL: {0} Str:'{1}'\nResp:{2}"\ |
| .format(input_url, string_to_search, response.text) |
| response = requests.get(input_url, headers=headers) |
| assert response.status_code == requests.codes.ok, "URL: {0} Str:'{1}'\nResp:{2}"\ |
| .format(input_url, string_to_search, response.text) |
| if regex: |
| assert re.search(string_to_search, response.text), "URL: {0} Str:'{1}'\nResp:{2}"\ |
| .format(input_url, string_to_search, response.text) |
| else: |
| assert string_to_search in response.text, "URL: {0} Str:'{1}'\nResp:{2}".format( |
| input_url, string_to_search, response.text) |
| responses.append(response) |
| assert 'Content-Security-Policy' in response.headers, "CSP header missing" |
| return responses |
| |
| def post_and_check_status(self, url, data={}, string_to_search="", ports_to_test=None): |
| """Helper method that posts to a given url, then asserts the return code is ok and |
| the response contains the expected string.""" |
| if ports_to_test is None: |
| ports_to_test = self.TEST_PORTS_WITH_SS |
| |
| responses = [] |
| for port in ports_to_test: |
| input_url = url.format(port) |
| response = requests.head(input_url) |
| assert response.status_code == requests.codes.ok, "URL: {0} Str:'{1}'\nResp:{2}"\ |
| .format(input_url, string_to_search, response.text) |
| response = requests.post(input_url, data=data) |
| assert response.status_code == requests.codes.ok, "URL: {0} Str:'{1}'\nResp:{2}"\ |
| .format(input_url, string_to_search, response.text) |
| assert string_to_search in response.text, "URL: {0} Str:'{1}'\nResp:{2}".format( |
| input_url, string_to_search, response.text) |
| responses.append(response) |
| assert 'Content-Security-Policy' in response.headers, "CSP header missing" |
| return responses |
| |
| def _get_debug_page(self, page_url, port=25000): |
| """Returns the content of the debug page 'page_url' as json.""" |
| responses = self.get_and_check_status(page_url + "?json", ports_to_test=[port]) |
| assert len(responses) == 1 |
| assert "application/json" in responses[0].headers['Content-Type'] |
| return json.loads(responses[0].text) |
| |
| def get_and_check_status_jvm(self, url, string_to_search=""): |
| """Calls get_and_check_status() for impalad and catalogd only""" |
| return self.get_and_check_status(url, string_to_search, |
| ports_to_test=self.TEST_PORTS_WITHOUT_SS) |
| |
| def post_and_check_status_jvm(self, url, data={}, string_to_search=""): |
| """Calls post_and_check_status() for impalad and catalogd only""" |
| return self.post_and_check_status(url, data, string_to_search, |
| ports_to_test=self.TEST_PORTS_WITHOUT_SS) |
| |
| def test_content_type(self): |
| """Checks that an appropriate content-type is set for various types of pages.""" |
| # Mapping from each page to its MIME type. |
| page_to_mime =\ |
| {"?json": "application/json", "?raw": "text/plain; charset=UTF-8", |
| "": "text/html; charset=UTF-8"} |
| for port in self.TEST_PORTS_WITH_SS: |
| for page, content_type in page_to_mime.items(): |
| url = self.METRICS_URL.format(port) + page |
| assert content_type == requests.get(url).headers['Content-Type'] |
| |
| def test_log_level(self): |
| """Test that the /log_level page outputs are as expected and work well on basic and |
| malformed inputs. This however does not test that the log level changes are actually |
| in effect.""" |
| # Check that the log_level end points are accessible. |
| self.post_and_check_status_jvm(self.SET_JAVA_LOGLEVEL_URL) |
| self.post_and_check_status_jvm(self.RESET_JAVA_LOGLEVEL_URL) |
| self.post_and_check_status(self.SET_GLOG_LOGLEVEL_URL) |
| self.post_and_check_status(self.RESET_GLOG_LOGLEVEL_URL) |
| |
| # Set the log level of a class to TRACE and confirm the setting is in place |
| self.post_and_check_status_jvm(self.SET_JAVA_LOGLEVEL_URL, |
| {"class": "org.apache.impala.catalog.HdfsTable", "level": "trace"}, |
| "org.apache.impala.catalog.HdfsTable : TRACE") |
| |
| # Reset Java logging levels |
| self.post_and_check_status_jvm(self.RESET_JAVA_LOGLEVEL_URL, {}, |
| "Java log levels reset.") |
| |
| # Set a new glog level and make sure the setting has been applied. |
| self.post_and_check_status(self.SET_GLOG_LOGLEVEL_URL, {"glog": 3}, "val(3)") |
| |
| # Try resetting the glog logging defaults again. |
| self.post_and_check_status(self.RESET_GLOG_LOGLEVEL_URL, {}, |
| "Current backend log level: ") |
| |
| # Try to set the log level with an empty class input |
| self.post_and_check_status_jvm(self.SET_JAVA_LOGLEVEL_URL, {"class": ""}) |
| |
| # Empty input for setting a glog level request |
| self.post_and_check_status(self.SET_GLOG_LOGLEVEL_URL, {"glog": ""}) |
| |
| # Try setting a non-existent log level on a valid class. In such cases, |
| # log4j automatically sets it as DEBUG. This is the behavior of |
| # Level.toLevel() method. |
| self.post_and_check_status_jvm(self.SET_JAVA_LOGLEVEL_URL, |
| {"class": "org.apache.impala.catalog.HdfsTable", "level": "foo"}, |
| "org.apache.impala.catalog.HdfsTable : DEBUG") |
| |
| # Try setting an invalid glog level. |
| self.post_and_check_status(self.SET_GLOG_LOGLEVEL_URL, {"glog": "foo"}, |
| "Bad glog level input") |
| |
| # Try a non-existent endpoint on log_level URL. |
| self.post_and_check_status(self.SET_GLOG_LOGLEVEL_URL, {"badurl": "foo"}) |
| |
| @pytest.mark.execute_serially |
| def test_uda_with_log_level(self): |
| """IMPALA-7903: Impala crashes when executing an aggregate query with log level set |
| to 3. Running this test serially not to interfere with other tests setting the log |
| level.""" |
| # Check that the log_level end points are accessible. |
| self.post_and_check_status(self.SET_GLOG_LOGLEVEL_URL) |
| self.post_and_check_status(self.RESET_GLOG_LOGLEVEL_URL) |
| # Set log level to 3. |
| self.post_and_check_status(self.SET_GLOG_LOGLEVEL_URL, {"glog": 3}, "val(3)") |
| # Check that Impala doesn't crash when running a query that aggregates. |
| self.client.execute("select avg(int_col) from functional.alltypessmall") |
| # Reset log level. |
| self.post_and_check_status(self.RESET_GLOG_LOGLEVEL_URL, {}, |
| "Current backend log level: ") |
| |
| def test_catalog(self, cluster_properties, unique_database): |
| """Tests the /catalog and /catalog_object endpoints.""" |
| # Non-partitioned table |
| query = "create table {0}.foo (id int, val int)".format(unique_database) |
| self.execute_query(query) |
| insert_query = "insert into {0}.foo values (1, 200)".format(unique_database) |
| self.execute_query(insert_query) |
| # Partitioned table |
| partitioned_query = "create table {0}.foo_part (id int, val int) partitioned by (" \ |
| "year int)".format(unique_database) |
| self.execute_query(partitioned_query) |
| partition_insert_query = "insert into {0}.foo_part partition (year=2010) values "\ |
| "(1, 200)".format(unique_database) |
| self.execute_query(partition_insert_query) |
| # Kudu table |
| kudu_query = "create table {0}.foo_kudu (id int, val int, primary key (id)) " \ |
| "stored as kudu".format(unique_database) |
| self.execute_query(kudu_query) |
| kudu_insert_query = "insert into {0}.foo_kudu values (1, 200)".format(unique_database) |
| self.execute_query(kudu_insert_query) |
| # Partitioned parquet table |
| parquet_query = "create table {0}.foo_part_parquet (id int, val int) partitioned " \ |
| "by (year int) stored as parquet".format(unique_database) |
| self.execute_query(parquet_query) |
| parquet_insert_query = "insert into {0}.foo_part_parquet partition (year=2010) " \ |
| "values (1, 200)".format(unique_database) |
| self.execute_query(parquet_insert_query) |
| |
| self.get_and_check_status_jvm(self.CATALOG_URL, unique_database) |
| self.get_and_check_status_jvm(self.CATALOG_URL, "foo_part") |
| # IMPALA-5028: Test toThrift() of a partitioned table via the WebUI code path. |
| self.__test_catalog_object(unique_database, "foo_part", cluster_properties) |
| self.__test_catalog_object(unique_database, "foo_kudu", cluster_properties) |
| self.__test_catalog_object(unique_database, "foo_part_parquet", cluster_properties) |
| self.__test_catalog_object(unique_database, "foo", cluster_properties) |
| self.__test_json_db_object(unique_database) |
| self.__test_json_table_object(unique_database, "foo") |
| self.__test_json_table_object(unique_database, "foo_part") |
| self.__test_json_table_object(unique_database, "foo_part_parquet") |
| self.__test_table_metrics(unique_database, "foo_part", "total-file-size-bytes") |
| self.__test_table_metrics(unique_database, "foo_part", "num-files") |
| self.__test_table_metrics(unique_database, "foo_part", "alter-duration") |
| self.__test_table_metrics(unique_database, "foo_part", "events-process-duration") |
| self.__test_catalog_tablesfilesusage(unique_database, "foo_part", "1") |
| self.__test_catalog_tables_loading_time(unique_database, "foo_part") |
| self.get_and_check_status(self.EVENT_PROCESSOR_URL, "events-consuming-delay", |
| ports_to_test=self.CATALOG_TEST_PORT) |
| # Multi-key partitioned table |
| multi_part_query = "create table {0}.foo_multi_part (id int, val int) " \ |
| "partitioned by (year int, month int, day int)".format(unique_database) |
| self.execute_query(multi_part_query) |
| multi_part_insert_query = "insert into {0}.foo_multi_part partition " \ |
| "(year=2024, month=12, day=25) values (1, 200)".format(unique_database) |
| self.execute_query(multi_part_insert_query) |
| # Table with string partition that contains special characters |
| slash_part_query = "create table {0}.foo_slash_part (id int, val int) " \ |
| "partitioned by (ds string)".format(unique_database) |
| self.execute_query(slash_part_query) |
| slash_part_insert_query = "insert into {0}.foo_slash_part partition " \ |
| "(ds='2024/12/25') values (1, 200)".format(unique_database) |
| self.execute_query(slash_part_insert_query) |
| |
| # Test partition catalog objects (IMPALA-9935) |
| self.__test_catalog_partition_object(unique_database, "foo_part", "year=2010", |
| cluster_properties) |
| self.__test_json_partition_object(unique_database, "foo_part", "year=2010", |
| cluster_properties) |
| # Test multi-key partition |
| self.__test_catalog_partition_object(unique_database, "foo_multi_part", |
| "year=2024/month=12/day=25", cluster_properties) |
| self.__test_json_partition_object(unique_database, "foo_multi_part", |
| "year=2024/month=12/day=25", cluster_properties) |
| # Test partition value with slash |
| # Note: Pass the pre-encoded partition name that matches Hive's HDFS directory format. |
| # Hive stores "ds=2024/12/25" as directory "ds=2024%2F12%2F25" in HDFS. |
| # The test methods will URL-encode this again for HTTP transmission (double-encoding). |
| self.__test_catalog_partition_object(unique_database, "foo_slash_part", |
| "ds=2024%2F12%2F25", cluster_properties) |
| self.__test_json_partition_object(unique_database, "foo_slash_part", |
| "ds=2024%2F12%2F25", cluster_properties) |
| |
| def __test_catalog_object(self, db_name, tbl_name, cluster_properties): |
| """Tests the /catalog_object endpoint for the given db/table. Runs |
| against an unloaded as well as a loaded table.""" |
| obj_url = self.CATALOG_OBJECT_URL + \ |
| "?object_type=TABLE&object_name={0}.{1}".format(db_name, tbl_name) |
| |
| if cluster_properties.is_catalog_v2_cluster(): |
| impalad_expected_str = \ |
| "No URI handler for '/catalog_object'" |
| self.client.execute("invalidate metadata %s.%s" % (db_name, tbl_name)) |
| self.get_and_check_status(obj_url, tbl_name, ports_to_test=self.CATALOG_TEST_PORT) |
| # Catalog object endpoint is disabled in local catalog mode. |
| self.check_endpoint_is_disabled(obj_url, impalad_expected_str, |
| ports_to_test=self.IMPALAD_TEST_PORT) |
| self.client.execute("select count(*) from %s.%s" % (db_name, tbl_name)) |
| self.get_and_check_status(obj_url, tbl_name, ports_to_test=self.CATALOG_TEST_PORT) |
| self.check_endpoint_is_disabled(obj_url, impalad_expected_str, |
| ports_to_test=self.IMPALAD_TEST_PORT) |
| else: |
| impalad_expected_str = tbl_name |
| self.client.execute("invalidate metadata %s.%s" % (db_name, tbl_name)) |
| self.get_and_check_status(obj_url, tbl_name, ports_to_test=self.CATALOG_TEST_PORT) |
| self.get_and_check_status(obj_url, impalad_expected_str, |
| ports_to_test=self.IMPALAD_TEST_PORT) |
| self.client.execute("select count(*) from %s.%s" % (db_name, tbl_name)) |
| |
| self.get_and_check_status(obj_url, tbl_name, ports_to_test=self.CATALOG_TEST_PORT) |
| self.get_and_check_status(obj_url, impalad_expected_str, |
| ports_to_test=self.IMPALAD_TEST_PORT) |
| |
| def __test_json_db_object(self, db_name): |
| """Tests the /catalog_object?json endpoint of catalogd for the given db.""" |
| obj_url = self.CATALOG_OBJECT_URL + \ |
| "?json&object_type=DATABASE&object_name={0}".format(db_name) |
| responses = self.get_and_check_status(obj_url, ports_to_test=self.CATALOG_TEST_PORT) |
| obj = json.loads(json.loads(responses[0].text)["json_string"]) |
| assert obj["type"] == 2, "type should be DATABASE" |
| assert "catalog_version" in obj, "TCatalogObject should have catalog_version" |
| db_obj = obj["db"] |
| assert db_obj["db_name"] == db_name |
| assert "metastore_db" in db_obj, "Loaded database should have metastore_db" |
| |
| def __test_json_table_object(self, db_name, tbl_name): |
| """Tests the /catalog_object?json endpoint of catalogd for the given db/table. Runs |
| against an unloaded as well as a loaded table.""" |
| obj_url = self.CATALOG_OBJECT_URL + \ |
| "?json&object_type=TABLE&object_name={0}.{1}".format(db_name, tbl_name) |
| self.client.execute("invalidate metadata %s.%s" % (db_name, tbl_name)) |
| responses = self.get_and_check_status(obj_url, ports_to_test=self.CATALOG_TEST_PORT) |
| obj = json.loads(json.loads(responses[0].text)["json_string"]) |
| assert obj["type"] == 3, "type should be TABLE" |
| assert "catalog_version" in obj, "TCatalogObject should have catalog_version" |
| tbl_obj = obj["table"] |
| assert tbl_obj["db_name"] == db_name |
| assert tbl_obj["tbl_name"] == tbl_name |
| assert "hdfs_table" not in tbl_obj, "Unloaded table should not have hdfs_table" |
| |
| self.client.execute("refresh %s.%s" % (db_name, tbl_name)) |
| responses = self.get_and_check_status(obj_url, ports_to_test=self.CATALOG_TEST_PORT) |
| obj = json.loads(json.loads(responses[0].text)["json_string"]) |
| assert obj["type"] == 3, "type should be TABLE" |
| assert "catalog_version" in obj, "TCatalogObject should have catalog_version" |
| tbl_obj = obj["table"] |
| assert tbl_obj["db_name"] == db_name |
| assert tbl_obj["tbl_name"] == tbl_name |
| assert "columns" in tbl_obj, "Loaded TTable should have columns" |
| assert tbl_obj["table_type"] == 0, "table_type should be HDFS_TABLE" |
| assert "metastore_table" in tbl_obj |
| hdfs_tbl_obj = tbl_obj["hdfs_table"] |
| assert "hdfsBaseDir" in hdfs_tbl_obj |
| assert "colNames" in hdfs_tbl_obj |
| assert "nullPartitionKeyValue" in hdfs_tbl_obj |
| assert "nullColumnValue" in hdfs_tbl_obj |
| assert "partitions" in hdfs_tbl_obj |
| assert "prototype_partition" in hdfs_tbl_obj |
| |
| def __verify_catalog_partition_html_response(self, response_text): |
| """Verify HTML catalog partition response contains expected Thrift structures.""" |
| assert "CatalogException" not in response_text, \ |
| "Response should not contain error: " + response_text[:200] |
| assert "TCatalogObject" in response_text, "Response should contain TCatalogObject" |
| assert "THdfsPartition" in response_text, "Response should contain THdfsPartition" |
| assert "THdfsStorageDescriptor" in response_text, \ |
| "Response should contain THdfsStorageDescriptor" |
| |
| def __test_catalog_partition_object(self, db_name, tbl_name, partition_name, |
| cluster_properties): |
| """Tests the /catalog_object endpoint for the given db/table/partition.""" |
| import urllib |
| # URL encode the entire object name (db.table:partition). |
| # This is necessary when partition values contain slashes (e.g., "ds=2024/12/25"). |
| object_name = "{0}.{1}:{2}".format(db_name, tbl_name, partition_name) |
| encoded_object_name = urllib.parse.quote(object_name, safe='') |
| obj_url = self.CATALOG_OBJECT_URL + \ |
| "?object_type=HDFS_PARTITION&object_name={0}".format(encoded_object_name) |
| |
| # Make sure the table is loaded |
| self.client.execute("describe %s.%s" % (db_name, tbl_name)) |
| |
| if cluster_properties.is_catalog_v2_cluster(): |
| # In Catalog V2 (local catalog), endpoint only works on catalogd |
| responses = self.get_and_check_status(obj_url, partition_name, |
| ports_to_test=self.CATALOG_TEST_PORT) |
| self.__verify_catalog_partition_html_response(responses[0].text) |
| # Catalog object endpoint is disabled in local catalog mode on impalad |
| impalad_expected_str = "No URI handler for '/catalog_object'" |
| self.check_endpoint_is_disabled(obj_url, impalad_expected_str, |
| ports_to_test=self.IMPALAD_TEST_PORT) |
| else: |
| # In Catalog V1, endpoint works on both catalogd and impalad |
| responses = self.get_and_check_status(obj_url, partition_name, |
| ports_to_test=self.CATALOG_TEST_PORT) |
| self.__verify_catalog_partition_html_response(responses[0].text) |
| |
| responses = self.get_and_check_status(obj_url, partition_name, |
| ports_to_test=self.IMPALAD_TEST_PORT) |
| self.__verify_catalog_partition_html_response(responses[0].text) |
| |
| def __test_json_partition_object(self, db_name, tbl_name, partition_name, |
| cluster_properties): |
| """Tests the /catalog_object?json endpoint for the given db/table/partition.""" |
| import urllib |
| # URL encode the entire object name (db.table:partition). |
| # This is necessary when partition values contain slashes (e.g., "ds=2024/12/25"). |
| object_name = "{0}.{1}:{2}".format(db_name, tbl_name, partition_name) |
| encoded_object_name = urllib.parse.quote(object_name, safe='') |
| obj_url = self.CATALOG_OBJECT_URL + \ |
| "?json&object_type=HDFS_PARTITION&object_name={0}".format(encoded_object_name) |
| |
| # Make sure the table is loaded |
| self.client.execute("describe %s.%s" % (db_name, tbl_name)) |
| |
| # Test catalogd endpoint (works in both V1 and V2) |
| responses = self.get_and_check_status(obj_url, ports_to_test=self.CATALOG_TEST_PORT) |
| response_json = json.loads(responses[0].text) |
| assert "json_string" in response_json, "Response should contain json_string" |
| obj = json.loads(response_json["json_string"]) |
| assert obj["type"] == 11, "type should be HDFS_PARTITION (11)" |
| assert "catalog_version" in obj, "TCatalogObject should have catalog_version" |
| part_obj = obj["hdfs_partition"] |
| assert part_obj["db_name"] == db_name |
| assert part_obj["tbl_name"] == tbl_name |
| assert part_obj["partition_name"] == partition_name |
| |
| # In Catalog V1, also test impalad endpoint |
| if not cluster_properties.is_catalog_v2_cluster(): |
| responses = self.get_and_check_status(obj_url, ports_to_test=self.IMPALAD_TEST_PORT) |
| response_json = json.loads(responses[0].text) |
| assert "json_string" in response_json, "Response should contain json_string" |
| obj = json.loads(response_json["json_string"]) |
| assert obj["type"] == 11, "type should be HDFS_PARTITION (11)" |
| part_obj = obj["hdfs_partition"] |
| assert part_obj["db_name"] == db_name |
| assert part_obj["tbl_name"] == tbl_name |
| assert part_obj["partition_name"] == partition_name |
| |
| def check_endpoint_is_disabled(self, url, string_to_search="", ports_to_test=None): |
| """Helper method that verifies the given url does not exist.""" |
| if ports_to_test is None: |
| ports_to_test = self.TEST_PORTS_WITH_SS |
| for port in ports_to_test: |
| input_url = url.format(port) |
| response = requests.head(input_url) |
| assert response.status_code == requests.codes.not_found, "URL: {0} Str:'{" \ |
| "1}'\nResp:{2}".format(input_url, string_to_search, response.text) |
| response = requests.get(input_url) |
| assert response.status_code == requests.codes.not_found, "URL: {0} Str:'{" \ |
| "1}'\nResp:{2}".format(input_url, string_to_search, response.text) |
| assert string_to_search in response.text, "URL: {0} Str:'{1}'\nResp:{2}".format( |
| input_url, string_to_search, response.text) |
| |
| def __test_table_metrics(self, db_name, tbl_name, metric): |
| self.client.execute("refresh %s.%s" % (db_name, tbl_name)) |
| self.get_and_check_status(self.TABLE_METRICS_URL |
| + "?name=%s.%s" % (db_name, tbl_name), metric, ports_to_test=self.CATALOG_TEST_PORT) |
| |
| def __test_catalog_tables_loading_time(self, db_name, tbl_name): |
| """Test the list of tables with the longest loading time in the catalog page. |
| Make sure the table exists. And the table is not empty""" |
| self.client.execute("refresh %s.%s" % (db_name, tbl_name)) |
| self.get_and_check_status(self.CATALOG_URL, |
| "Tables with Longest Metadata Loading Time", ports_to_test=self.CATALOG_TEST_PORT) |
| response = self.get_and_check_status(self.CATALOG_URL + "?json", |
| "longest_loading_tables", ports_to_test=self.CATALOG_TEST_PORT) |
| response_json = json.loads(response[0].text) |
| assert "longest_loading_tables" in response_json, \ |
| "Response {0}".format(response_json) |
| loading_tables = response_json["longest_loading_tables"] |
| assert len(loading_tables) > 0 |
| members = ["median_metadata_loading_time_ns", "max_metadata_loading_time_ns", |
| "p75_loading_time_ns", "p95_loading_time_ns", "p99_loading_time_ns"] |
| for member in members: |
| if member not in loading_tables[0]: |
| assert False, "{0} not in loading tables {1}".format(member, loading_tables) |
| |
| def __test_catalog_tablesfilesusage(self, db_name, tbl_name, numfiles): |
| """Test the list of tables with most number of files in the catalog page. |
| Make sure the loaded table has correct file count.""" |
| self.client.execute("refresh %s.%s" % (db_name, tbl_name)) |
| response = self.get_and_check_status(self.CATALOG_URL, |
| "Tables with Most Number of Files", ports_to_test=self.CATALOG_TEST_PORT) |
| list_file_str = re.search('<table id="high-file-count-tables"( .*?)</table>', |
| response[0].text, re.MULTILINE | re.DOTALL) |
| target_metric = "%s.%s-metric" % (db_name, tbl_name) |
| list_files = re.findall('<tr>(.*?)</tr>', list_file_str.group(0), |
| re.MULTILINE | re.DOTALL) |
| for trow in list_files: |
| # Find the entry for the db table and verify its file count. |
| if re.search(target_metric, trow) is not None: |
| # Get the number following <td> in the entry |
| nfiles = re.search(r'(?<=\<td\>)\d+', trow) |
| assert nfiles.group(0) == numfiles |
| response = self.get_and_check_status(self.CATALOG_URL + "?json", |
| "high_file_count_tables", ports_to_test=self.CATALOG_TEST_PORT) |
| response_json = json.loads(response[0].text) |
| high_filecount_tbls = response_json["high_file_count_tables"] |
| tbl_fname = "%s.%s" % (db_name, tbl_name) |
| assert len(high_filecount_tbls) > 0 |
| # The expected table might not be in the Top-N list, we may not find it |
| # in the list. Just make sure the file count is right if it is in the |
| # list. |
| for tblinfo in high_filecount_tbls: |
| if tblinfo["name"] == tbl_fname: |
| assert tblinfo["num_files"] == int(numfiles) |
| |
| def test_query_stmt(self): |
| """Create a long select query then check if it is truncated in the response json.""" |
| # The input query is a select + 450 "x " long, which is long enough to get truncated. |
| query = "select \"{0}\"".format("x " * 450) |
| # The expected result query should be 253 long and contains the first 250 |
| # chars + "..." |
| expected_result = "select \"{0}...".format("x " * 121) |
| check_if_contains = False |
| (_, response_json) = self.run_query_and_get_debug_page( |
| query, self.QUERIES_URL, expected_state=FINISHED) |
| # Search the json for the expected value. |
| # The query can be in in_flight_queries even though it is in FINISHED state. |
| for json_part in itertools.chain( |
| response_json['completed_queries'], response_json['in_flight_queries']): |
| if expected_result in json_part['stmt']: |
| check_if_contains = True |
| break |
| |
| assert check_if_contains, "No matching statement found in the jsons at {}: {}".format( |
| datetime.now(), json.dumps(response_json, sort_keys=True, indent=4)) |
| |
| def test_failing_ctas(self, unique_database): |
| """Regression test for IMPALA-14791: Verify that a failing CTAS query does not |
| crash Impala when the plan is retrieved.""" |
| target_tbl = unique_database + ".ctas_target_fail" |
| ctas_query = """create table {0} stored as iceberg as |
| select 100000""".format(target_tbl) |
| debug_action = {'debug_action': 'CATALOGD_ICEBERG_CREATE:EXCEPTION@' |
| 'IcebergAlreadyExistsException@Table was created concurrently'} |
| (_, response_json) = self.run_query_and_get_debug_page( |
| ctas_query, self.QUERY_PLAN, query_options=debug_action, |
| expected_state='ERROR') |
| # Verify that we have a non-empty plan_json. |
| assert 'plan_json' in response_json |
| assert 'plan_nodes' in response_json['plan_json'] |
| assert 'label' in response_json['plan_json']['plan_nodes'][0] |
| # Summary is empty as execution could not start. |
| assert 'summary' in response_json |
| assert response_json['summary'] == '' |
| # Verify error message |
| assert "Table already exists" in response_json['status'] |
| |
| @pytest.mark.xfail(run=False, reason="IMPALA-8059") |
| def test_backend_states(self, unique_database): |
| """Test that /query_backends returns the list of backend states for DML or |
| queries; nothing for DDL statements""" |
| sleep_query = "select sleep(10000) from functional.alltypes limit 1" |
| ctas_sleep_query = "create table {0}.foo as {1}".format(unique_database, sleep_query) |
| running_state = RUNNING |
| backend_state_properties = ['cpu_user_s', 'rpc_latency', 'num_remaining_instances', |
| 'num_instances', 'peak_per_host_mem_consumption', |
| 'time_since_last_heard_from', 'status', 'host', |
| 'cpu_sys_s', 'done', 'bytes_read', 'rpc_size'] |
| |
| for query in [sleep_query, ctas_sleep_query]: |
| (_, response_json) = self.run_query_and_get_debug_page( |
| query, self.QUERY_BACKENDS_URL, expected_state=running_state) |
| |
| assert 'backend_states' in response_json |
| backend_states = response_json['backend_states'] |
| assert len(backend_states) > 0 |
| |
| for backend_state in backend_states: |
| for backend_state_property in backend_state_properties: |
| assert backend_state_property in backend_state |
| assert backend_state['status'] == 'OK' |
| assert not backend_state['done'] |
| |
| (_, response_json) = self.run_query_and_get_debug_page( |
| "describe functional.alltypes", self.QUERY_BACKENDS_URL) |
| assert 'backend_states' not in response_json |
| |
| @pytest.mark.xfail(run=False, reason="IMPALA-8059") |
| def test_backend_instances(self, unique_database, query_options=None): |
| """Test that /query_finstances returns the list of fragment instances for DML or queries; |
| nothing for DDL statements""" |
| sleep_query = "select sleep(10000) from functional.alltypes limit 1" |
| ctas_sleep_query = "create table {0}.foo as {1}".format(unique_database, sleep_query) |
| running_state = RUNNING |
| instance_stats_properties = ['fragment_name', 'time_since_last_heard_from', |
| 'current_state', 'first_status_update_received', |
| 'instance_id', 'done'] |
| |
| for query in [sleep_query, ctas_sleep_query]: |
| (_, response_json) = self.run_query_and_get_debug_page( |
| query, self.QUERY_FINSTANCES_URL, query_options=query_options, |
| expected_state=running_state) |
| |
| assert 'backend_instances' in response_json |
| backend_instances = response_json['backend_instances'] |
| assert len(backend_instances) > 0 |
| |
| for backend_instance in backend_instances: |
| assert 'host' in backend_instance |
| assert 'instance_stats' in backend_instance |
| |
| instances_stats = backend_instance['instance_stats'] |
| assert len(instances_stats) > 0 |
| for instance_stats in instances_stats: |
| for instance_stats_property in instance_stats_properties: |
| assert instance_stats_property in instance_stats |
| assert not instance_stats['done'] |
| |
| (_, response_json) = self.run_query_and_get_debug_page( |
| "describe functional.alltypes", self.QUERY_BACKENDS_URL, |
| query_options=query_options) |
| assert 'backend_instances' not in response_json |
| |
| @pytest.mark.xfail(run=False, reason="IMPALA-8059") |
| def test_backend_instances_mt_dop(self, unique_database): |
| """Test that accessing /query_finstances does not crash the backend when running with |
| mt_dop.""" |
| self.test_backend_instances(unique_database, query_options=dict(mt_dop=4)) |
| |
| def test_io_mgr_threads(self): |
| """Test that IoMgr threads have readable names. This test assumed that all systems we |
| support have a disk called 'sda'.""" |
| responses = self.get_and_check_status( |
| self.THREAD_GROUP_URL + "?group=disk-io-mgr&json", ports_to_test=[25000]) |
| assert len(responses) == 1 |
| response_json = json.loads(responses[0].text) |
| # Verify metric keys for each thread |
| for t in response_json['threads']: |
| assert "name" in t |
| assert "id" in t |
| assert "user_s" in t |
| assert "kernel_s" in t |
| assert "iowait_s" in t |
| thread_names = [t["name"] for t in response_json['threads']] |
| expected_name_patterns = ["ADLS remote", "S3 remote", "HDFS remote"] |
| for pattern in expected_name_patterns: |
| assert any(pattern in t for t in thread_names), \ |
| "Could not find thread matching '%s'" % pattern |
| |
| def test_rpc_read_write_metrics(self): |
| """Test that read/write metrics are exposed in /rpcz""" |
| rpcz = self._get_debug_page(self.RPCZ_URL) |
| |
| def create_histogram_regex(value_regex): |
| return ("Count: [0-9]+, sum: " + value_regex |
| + ", min / max: " + value_regex + " / " + value_regex |
| + ", 25th %-ile: " + value_regex |
| + ", 50th %-ile: " + value_regex |
| + ", 75th %-ile: " + value_regex |
| + ", 90th %-ile: " + value_regex |
| + ", 95th %-ile: " + value_regex |
| + ", 99.9th %-ile: " + value_regex) |
| rpc_hist_time_regex = create_histogram_regex("[0-9][0-9numsh.]*") |
| rpc_hist_unit_regex = create_histogram_regex("[0-9][0-9KM.]*") |
| assert len(rpcz['servers']) > 0 |
| for s in rpcz['servers']: |
| for m in s['methods']: |
| assert re.search(rpc_hist_time_regex, m["summary"]) |
| assert re.search(rpc_hist_time_regex, m["read"]) |
| assert re.search(rpc_hist_time_regex, m["write"]) |
| assert re.search(rpc_hist_time_regex, rpcz['reactor_active_latency']) |
| assert re.search(rpc_hist_unit_regex, rpcz['reactor_load_percent']) |
| |
| def test_krpc_rpcz(self): |
| """Test that KRPC metrics are exposed in /rpcz and that they are updated when |
| executing a query.""" |
| TEST_QUERY = "select count(c2.string_col) from \ |
| functional.alltypestiny join functional.alltypessmall c2" |
| SVC_NAME = 'impala.DataStreamService' |
| |
| def is_krpc_use_unix_domain_socket(): |
| rpcz = self._get_debug_page(self.RPCZ_URL) |
| return rpcz['rpc_use_unix_domain_socket'] |
| |
| def get_per_conn_metrics(inbound): |
| """Get inbound or outbound per-connection metrics""" |
| rpcz = self._get_debug_page(self.RPCZ_URL) |
| if inbound: |
| key = "inbound_per_conn_metrics" |
| else: |
| key = "per_conn_metrics" |
| conns = rpcz[key] |
| return conns |
| |
| def get_svc_metrics(svc_name): |
| rpcz = self._get_debug_page(self.RPCZ_URL) |
| assert len(rpcz['services']) > 0 |
| for s in rpcz['services']: |
| if s['service_name'] == svc_name: |
| assert s['rpcs_timed_out_in_queue'] == 0 |
| assert len(s['rpc_method_metrics']) > 0, '%s metrics are empty' % svc_name |
| return sorted(s['rpc_method_metrics'], key=lambda m: m['method_name']) |
| assert False, 'Could not find metrics for %s' % svc_name |
| |
| krpc_use_uds = is_krpc_use_unix_domain_socket() |
| |
| svc_before = get_svc_metrics(SVC_NAME) |
| inbound_before = get_per_conn_metrics(True) |
| outbound_before = get_per_conn_metrics(False) |
| self.client.execute(TEST_QUERY) |
| svc_after = get_svc_metrics(SVC_NAME) |
| inbound_after = get_per_conn_metrics(True) |
| outbound_after = get_per_conn_metrics(False) |
| |
| assert svc_before != svc_after |
| if not krpc_use_uds: |
| assert inbound_before != inbound_after |
| assert outbound_before != outbound_after |
| |
| # Some connections should have metrics after executing query |
| assert len(inbound_after) > 0 |
| assert len(outbound_after) > 0 |
| # Spot-check some fields, including socket stats. |
| for conn in itertools.chain(inbound_after, outbound_after): |
| assert conn["remote_addr"] != "" |
| assert conn["num_calls_in_flight"] >= 0 |
| assert conn["num_calls_in_flight"] == len(conn["calls_in_flight"]) |
| # Check rtt, which should be present in 'struct tcp_info' even in old kernels |
| # like 2.6.32. |
| # Skip these checking if using UDS. |
| if not krpc_use_uds: |
| assert conn["socket_stats"]["rtt"] > 0, conn |
| # send_queue_bytes uses TIOCOUTQ, which is also present in 2.6.32 and even older |
| # kernels. |
| assert conn["socket_stats"]["send_queue_bytes"] >= 0, conn |
| |
| @pytest.mark.execute_serially |
| def test_admission_page(self): |
| """Sanity check for the admission debug page's http end points (both admission and |
| reset stats end points).""" |
| # Make sure at least one query is submitted to the default pool since impala startup, |
| # so that it shows up in the admission control debug page. Checks for both with and |
| # without the pool_name search string. |
| self.client.execute("select 1") |
| response_json = self.__fetch_resource_pools_json() |
| |
| # Find the default pool. It is either "root.default" if a fair-scheduler.xml file |
| # is provided or "default-pool" otherwise. |
| default_pool = None |
| for pool_json in response_json: |
| pool_name = pool_json['pool_name'] |
| if pool_name in ['default-pool', 'root.default']: |
| default_pool = pool_name |
| break |
| assert default_pool is not None, \ |
| "Expected a default pool to be present in {0}".format(response_json) |
| |
| response_json = self.__fetch_resource_pools_json(default_pool) |
| assert response_json[0]['pool_name'] == default_pool |
| |
| # Make sure the reset informational stats endpoint works, both with and without the |
| # pool_name search string. |
| assert response_json[0]['total_admitted'] > 0 |
| self.get_and_check_status( |
| self.RESET_RESOURCE_POOL_STATS_URL + "?pool_name={0}".format(default_pool), |
| ports_to_test=[25000]) |
| response_json = self.__fetch_resource_pools_json(default_pool) |
| assert response_json[0]['total_admitted'] == 0 |
| |
| self.client.execute("select 1") |
| response_json = self.__fetch_resource_pools_json(default_pool) |
| assert response_json[0]['total_admitted'] > 0 |
| self.get_and_check_status(self.RESET_RESOURCE_POOL_STATS_URL, ports_to_test=[25000]) |
| response_json = self.__fetch_resource_pools_json(default_pool) |
| pool_config = response_json[0] |
| assert pool_config['total_admitted'] == 0 |
| |
| # check that metrics exist |
| assert 'max_query_mem_limit' in pool_config |
| assert 'min_query_mem_limit' in pool_config |
| assert 'clamp_mem_limit_query_option' in pool_config |
| |
| def __fetch_resource_pools_json(self, pool_name=None): |
| """Helper method used to fetch the resource pool json from the admission debug page. |
| If a 'pool_name' is passed to this method, it adds the pool_name search string to the |
| http request.""" |
| search_string = "?json" |
| if pool_name is not None: |
| search_string += "&pool_name=" + pool_name |
| responses = self.get_and_check_status(self.ADMISSION_URL + search_string, |
| ports_to_test=[25000]) |
| assert len(responses) == 1 |
| response_json = json.loads(responses[0].text) |
| assert 'resource_pools' in response_json |
| return response_json['resource_pools'] |
| |
| @SkipIfBuildType.remote |
| def test_backends_page(self): |
| """Sanity check for the backends debug page's http end point""" |
| responses = self.get_and_check_status(self.BACKENDS_URL + '?json', |
| ports_to_test=[25000]) |
| assert len(responses) == 1 |
| response_json = json.loads(responses[0].text) |
| assert 'backends' in response_json |
| # When this test runs, all impalads would have already started. |
| assert len(response_json['backends']) == 3 |
| assert response_json['num_active_backends'] == 3 |
| assert 'num_quiescing_backends' not in response_json |
| assert 'num_blacklisted_backends' not in response_json |
| |
| # Look at results for a single backend - they are not sorted. |
| backend_row = response_json['backends'][0] |
| |
| assert len(backend_row['webserver_url']) > 0 |
| webserver_ports = ('25000', '25001', '25002') |
| assert backend_row['webserver_url'].endswith(webserver_ports) |
| |
| # The 'address' column is the backend port of the impalad. |
| assert len(backend_row['address']) > 0 |
| krpc_ports = ('27000', '27001', '27002') |
| assert backend_row['address'].endswith(krpc_ports) |
| |
| # The 'krpc_address' is the krpc address of the impalad. |
| assert len(backend_row['krpc_address']) > 0 |
| krpc_ports = ('27000', '27001', '27002') |
| assert backend_row['krpc_address'].endswith(krpc_ports) |
| |
| assert backend_row['is_coordinator'] |
| assert backend_row['is_executor'] |
| assert not backend_row['is_quiescing'] |
| assert not backend_row['is_blacklisted'] |
| assert len(backend_row['admit_mem_limit']) > 0 |
| assert len(backend_row['process_start_time']) > 0 |
| assert len(backend_row['version']) > 0 |
| |
| def test_download_profile(self): |
| """Test download text profile for a query""" |
| query = "select count(*) from functional.alltypes" |
| query_id = self.client.execute(query).query_id |
| profile_page_url = "{0}query_profile?query_id={1}".format( |
| self.ROOT_URL, query_id) |
| # Check the download tag is there. |
| for profile_format in ["Text", "Json"]: |
| responses = self.get_and_check_status( |
| profile_page_url, profile_format, |
| ports_to_test=self.IMPALAD_TEST_PORT) |
| |
| assert len(responses) == 1 |
| |
| if profile_format == 'Text': |
| download_link = "query_profile_plain_text?query_id={0}".format(query_id) |
| assert download_link in responses[0].text |
| # Get the response from download link and validate it by checking |
| # the query is in the file. |
| responses = self.get_and_check_status( |
| self.ROOT_URL + download_link, query, self.IMPALAD_TEST_PORT) |
| # Check the query id is in the content of the response. |
| assert len(responses) == 1 |
| assert query_id in responses[0].text |
| elif profile_format == 'Json': |
| download_link = "query_profile_json?query_id={0}".format(query_id) |
| assert download_link in responses[0].text |
| # Get the response from download link and validate it by checking |
| # the query is in the file. |
| responses = self.get_and_check_status( |
| self.ROOT_URL + download_link, query, self.IMPALAD_TEST_PORT) |
| |
| assert len(responses) == 1 |
| # Check the return content is valid json |
| try: |
| json_res = json.loads(responses[0].text) |
| except ValueError: |
| assert False, "Downloaded JSON format query profile cannot be parsed. " \ |
| "Json profile:{0}".format(responses[0].text) |
| # Find the query id in json |
| assert query_id in json_res["contents"]["profile_name"], json_res |
| |
| def test_prometheus_metrics(self): |
| """Test to check prometheus metrics""" |
| resp = self.get_and_check_status(self.PROMETHEUS_METRICS_URL) |
| assert len(resp) == 3 |
| # check if metric shows up |
| assert 'impala_statestore_subscriber_heartbeat_interval_time_min' in resp[0].text |
| |
| for response in resp: |
| # Parse Prometheus text format using prometheus_client library. |
| for _ in text_string_to_metric_families(response.text): |
| # We have to loop through the result to force the lazy parsing function |
| # to parse every line. |
| continue |
| |
| def test_healthz_endpoint(self): |
| """Test to check that the /healthz endpoint returns 200 OK.""" |
| for port in self.TEST_PORTS_WITH_SS: |
| page = requests.get(self.HEALTHZ_URL.format(port)) |
| assert page.status_code == requests.codes.ok |
| page = requests.head(self.HEALTHZ_URL.format(port)) |
| assert page.status_code == requests.codes.ok |
| |
| def test_knox_compatibility(self): |
| """Checks that the template files conform to the requirements for compatibility with |
| the Apache Knox service definition.""" |
| # Matches all 'a' links with an 'href' that doesn't start with either '#' (which stays |
| # on the same page and so doesn't need to the hostname) or '{{ __common__.host-url }}' |
| # Note that if we ever need to add a link that doesn't conform to this, we will |
| # probably also have to change the Knox service definition. |
| href_regex = "<(a|link) .*? href=['\"](?!({{ __common__.host-url }})|#)" |
| # Matches all 'script' tags that aren't absolute urls. |
| script_regex = "<script .*?src=['\"](?!({{ __common__.host-url }})|http)" |
| # Matches all 'form' tags that are not followed by including the hidden inputs. |
| form_regex = "<form [^{]*?>(?!{{>www/form-hidden-inputs.tmpl}})" |
| # Matches XMLHttpRequest.open() in javascript that are not followed with make_url(). |
| javascript_regex = r"open\(['\"]GET['\"], (?!make_url)" |
| # Matches urls in json parameters passed to DataTables. |
| datatables_regex = "url: ['\"](?!make_url)" |
| # Matches all references of paths that contain '/www/' but are not fully qualified. |
| path_regex = r"((?<!{{ __common__.host-url }})(?<!make_url\(\")/www/.*)" |
| # Matches all links with an 'href' attribute that doesn't start with 'URL', or '`$', |
| # or '`{{ __common__.host-url }}'. |
| link_regex = r"(.*link.*\.href[\s]=[\s](?!(`{{ __common__.host-url }})|(`\$)|URL))" |
| regex = "(%s)|(%s)|(%s)|(%s)|(%s)|(%s)|(%s)" % \ |
| (href_regex, script_regex, form_regex, javascript_regex, datatables_regex, |
| path_regex, link_regex) |
| results = grep_dir(os.path.join(os.environ['IMPALA_HOME'], "www"), regex, |
| r".*\.tmpl") |
| assert len(results) == 0, \ |
| "All links on the webui must include the webserver host: %s" % results |
| |
| # Check that when Knox integration is not being used, the links are relative, by |
| # checking for the root link from the header. |
| self.get_and_check_status(self.ROOT_URL, "href='/'", self.IMPALAD_TEST_PORT) |
| # Check that if the 'x-forwarded-context' header is present in the request, the links |
| # are written as absolute. |
| self.get_and_check_status(self.ROOT_URL, |
| "href='http://.*:%s/'" % self.IMPALAD_TEST_PORT[0], self.IMPALAD_TEST_PORT, |
| regex=True, headers={'X-Forwarded-Context': '/gateway'}) |
| |
| def test_catalog_operations_endpoint(self): |
| """Test to check that the /operations endpoint returns 200 OK.""" |
| page = requests.get("http://localhost:25020/operations") |
| assert page.status_code == requests.codes.ok |
| page = requests.head("http://localhost:25020/operations") |
| assert page.status_code == requests.codes.ok |
| |
| def test_catalog_operation_fields(self, unique_database): |
| """Verify the CREATE_DATABASE operation is consistent with the statement shown in the |
| /queries page of impalad.""" |
| catalog_operations_page = requests.get("http://localhost:25020/operations?json").text |
| catalog_operations = json.loads(catalog_operations_page) |
| assert "finished_catalog_operations" in catalog_operations |
| |
| queries_page = requests.get("http://localhost:25000/queries?json").text |
| queries = json.loads(queries_page) |
| assert "completed_queries" in queries |
| |
| # Find the CREATE_DATABASE operation in catalogd |
| ts_format = "%Y-%m-%d %H:%M:%S.%f" |
| found = False |
| for op in catalog_operations["finished_catalog_operations"]: |
| if op["target_name"] == unique_database \ |
| and op["catalog_op_name"] == "CREATE_DATABASE": |
| catalog_op_query_id = op["query_id"] |
| catalog_op_user = op["user"] |
| catalog_op_start_time = datetime.strptime(op["start_time"], ts_format) |
| catalog_op_end_time = datetime.strptime(op["finish_time"], ts_format) |
| catalog_op_duration = parse_duration_string_ms(op["duration"]) |
| found = True |
| LOG.info("Found query id in catalog operations: " + catalog_op_query_id) |
| break |
| assert found |
| |
| def verify_query_record(query): |
| assert "CREATE DATABASE" in query["stmt"] |
| assert unique_database in query["stmt"] |
| assert catalog_op_user == query["effective_user"] |
| assert datetime.strptime(query["start_time"], ts_format) <= catalog_op_start_time |
| assert datetime.strptime(query["end_time"], ts_format) >= catalog_op_end_time |
| assert parse_duration_string_ms(query["duration"]) >= catalog_op_duration |
| |
| # Find the query in impalad |
| matched = False |
| for query in queries["completed_queries"]: |
| if query["query_id"] == catalog_op_query_id: |
| verify_query_record(query) |
| matched = True |
| if not matched: |
| LOG.info("Query id {0} not found in the completed queries list".format( |
| catalog_op_query_id)) |
| # Try to find the query in the in-flight queries list. It could be waiting to |
| # be closed. |
| for query in queries["in_flight_queries"]: |
| if query["query_id"] == catalog_op_query_id: |
| verify_query_record(query) |
| matched = True |
| |
| # Dump web pages for debug |
| if not matched: |
| LOG.info("Query id {0} not found in queries page!".format(catalog_op_query_id)) |
| LOG.info("Catalog operations: " + catalog_operations_page) |
| LOG.info("Queries: " + queries_page) |
| assert matched |
| |
| def test_catalog_operation_detail_endpoint(self, unique_database): |
| """Test the /operation_detail endpoint shows detailed information about catalog |
| operations including thrift request, timeline, and byte sizes.""" |
| # Execute a DDL to generate a catalog operation |
| # Use CTAS to generate multiple catalog operations |
| result = self.execute_query( |
| "create table {0}.test_detail as select * from functional.alltypes limit 10" |
| .format(unique_database)) |
| expected_query_id = result.query_id |
| |
| # Get the operation from /operations endpoint |
| catalog_operations_page = requests.get("http://localhost:25020/operations?json").text |
| catalog_operations = json.loads(catalog_operations_page) |
| assert "finished_catalog_operations" in catalog_operations |
| assert len(catalog_operations["finished_catalog_operations"]) > 0 |
| |
| # Find finished CTAS operations - there should be two: |
| # CREATE_TABLE_AS_SELECT and FINALIZE_CREATE_TABLE_AS_SELECT |
| ctas_operations = [] |
| for op in catalog_operations["finished_catalog_operations"]: |
| if expected_query_id in op["query_id"] and \ |
| ("CREATE_TABLE_AS_SELECT" in op["catalog_op_name"] |
| or "FINALIZE_CREATE_TABLE_AS_SELECT" in op["catalog_op_name"]): |
| ctas_operations.append(op) |
| |
| assert len(ctas_operations) == 2, \ |
| "Expected 2 CTAS operations (CREATE_TABLE_AS_SELECT and " \ |
| "FINALIZE_CREATE_TABLE_AS_SELECT), found {0}. " \ |
| "Operations: {1}".format(len(ctas_operations), |
| ", ".join([op["catalog_op_name"] for op in ctas_operations])) |
| |
| # Verify both operations have the correct query_id |
| for test_op in ctas_operations: |
| query_id = test_op["query_id"] |
| thread_id = test_op["thread_id"] |
| start_time_ms = test_op["start_time_ms"] |
| |
| # Verify query_id matches what we got from execute_query |
| assert expected_query_id in query_id, \ |
| "Query ID mismatch: expected {0}, got {1}".format(expected_query_id, query_id) |
| |
| LOG.info("Testing operation detail for query_id: {0}, thread_id: {1}, op: {2}" |
| .format(query_id, thread_id, test_op["catalog_op_name"])) |
| |
| # Test HTML endpoint returns 200 OK (requires start_time_ms for finished operations) |
| html_response = requests.get( |
| "http://localhost:25020/operation_detail?query_id={0}&thread_id={1}" |
| "&start_time_ms={2}".format(query_id, thread_id, start_time_ms)) |
| assert html_response.status_code == requests.codes.ok |
| assert "Operation Information" in html_response.text |
| assert "Execution Timeline" in html_response.text |
| |
| # Test JSON endpoint returns proper data |
| json_response = requests.get( |
| "http://localhost:25020/operation_detail?query_id={0}&thread_id={1}" |
| "&start_time_ms={2}&json".format(query_id, thread_id, start_time_ms)) |
| assert json_response.status_code == requests.codes.ok |
| detail_data = json.loads(json_response.text) |
| |
| # Verify basic operation fields are present |
| assert "query_id" in detail_data |
| assert detail_data["query_id"] == query_id |
| assert "catalog_op_name" in detail_data |
| assert test_op["catalog_op_name"] in detail_data["catalog_op_name"] |
| assert "target_name" in detail_data |
| assert "user" in detail_data |
| assert "status" in detail_data |
| assert "start_time" in detail_data |
| assert "finish_time" in detail_data |
| assert "duration" in detail_data |
| assert "timeline" in detail_data, "timeline field missing" |
| assert "request_size_bytes" in detail_data, "request_size_bytes field missing" |
| assert "response_size_bytes" in detail_data, "response_size_bytes field missing" |
| |
| # Verify the fields contain actual data |
| assert len(detail_data["timeline"]) > 0, "timeline should not be empty" |
| assert detail_data["request_size_bytes"] > 0, \ |
| "request_size_bytes should be positive" |
| assert detail_data["response_size_bytes"] > 0, \ |
| "response_size_bytes should be positive" |
| |
| # Verify timeline is formatted text (pre-formatted on server side) |
| timeline_str = detail_data["timeline"] |
| assert isinstance(timeline_str, str), "timeline should be a formatted string" |
| |
| # Check for expected timeline format from RuntimeProfile |
| assert "Catalog Server Operation:" in timeline_str, \ |
| "timeline should have the operation name" |
| # Check that it contains typical timeline events |
| assert ("Got" in timeline_str or "DDL" in timeline_str |
| or "finished" in timeline_str), "timeline should contain at least one event" |
| |
def test_catalog_operation_detail_invalid_query_id(self):
  """Test that /operation_detail handles invalid or incomplete requests gracefully.

  Every request below must return HTTP 200 with a JSON body containing an "error" key:
    - a non-existent query_id together with a thread_id,
    - a non-existent query_id without thread_id (the error must mention thread_id),
    - no query_id at all,
    - a finished operation requested without start_time_ms (the error must mention
      start_time_ms). This last case is only checked when catalogd's /operations
      page lists at least one finished operation.
  """
  # Test with a non-existent query_id and thread_id
  response = requests.get(
      "http://localhost:25020/operation_detail"
      "?query_id=nonexistent:123456789abcd"
      "&thread_id=999&json"
  )
  assert response.status_code == requests.codes.ok
  detail_data = json.loads(response.text)
  assert "error" in detail_data

  # Test without thread_id (should return error about missing parameter)
  response = requests.get(
      "http://localhost:25020/operation_detail?query_id=nonexistent:123456789abcd&json")
  assert response.status_code == requests.codes.ok
  detail_data = json.loads(response.text)
  assert "error" in detail_data
  assert "thread_id" in detail_data["error"].lower(), \
      "Should return error for missing thread_id parameter"

  # Test with missing query_id parameter
  response = requests.get("http://localhost:25020/operation_detail?json")
  assert response.status_code == requests.codes.ok
  detail_data = json.loads(response.text)
  assert "error" in detail_data, \
      "Should return error when query_id parameter is missing"

  # Test with missing start_time_ms for a finished operation.
  # First, execute a simple DDL to get a finished operation. The database is dropped
  # in the 'finally' clause so the test does not leave it behind in the catalog.
  db_name = "test_invalid_param_db"
  self.execute_query("create database if not exists {0}".format(db_name))
  try:
    catalog_operations_page = requests.get("http://localhost:25020/operations?json").text
    catalog_operations = json.loads(catalog_operations_page)
    finished_ops = catalog_operations.get("finished_catalog_operations", [])
    if finished_ops:
      finished_op = finished_ops[0]
      query_id = finished_op["query_id"]
      thread_id = finished_op["thread_id"]
      # Try to access finished operation without start_time_ms (should fail)
      response = requests.get(
          "http://localhost:25020/operation_detail?query_id={0}&thread_id={1}&json"
          .format(query_id, thread_id))
      assert response.status_code == requests.codes.ok
      detail_data = json.loads(response.text)
      assert "error" in detail_data, \
          "Should return error when start_time_ms is missing for finished operation"
      assert "start_time_ms" in detail_data["error"].lower(), \
          "Error message should mention start_time_ms"
  finally:
    self.execute_query("drop database if exists {0} cascade".format(db_name))
| |
@pytest.mark.execute_serially
def test_catalog_operation_detail_in_flight(self, unique_database):
  """Test that /operation_detail shows real-time timeline for in-flight operations.
  Uses a debug action to inject a delay in DDL execution.

  The debug action is set on the shared client only while the REFRESH is submitted
  and is cleared immediately afterwards, so it does not leak into statements that
  later run on the same client. The REFRESH handle is always waited on and closed,
  even if an assertion fails."""
  # Create a table first, then refresh it with a delay
  # Use catalogd_refresh_hdfs_listing_delay which triggers during REFRESH
  self.client.execute("create table {0}.test_in_flight (id int)"
      .format(unique_database))

  delay_action = "catalogd_refresh_hdfs_listing_delay:SLEEP@100"

  # Start the REFRESH asynchronously
  refresh_stmt = "refresh {0}.test_in_flight".format(unique_database)
  # Set the debug action for this statement only. The client configuration is
  # cleared as soon as the statement has been submitted (or submission failed).
  self.client.set_configuration({"debug_action": delay_action})
  try:
    handle = self.client.execute_async(refresh_stmt)
  finally:
    self.client.clear_configuration()

  try:
    # Get the query_id from the handle
    expected_query_id = self.client.get_query_id(handle)

    # Poll for the in-flight operation (with timeout)
    inflight_op = None
    query_id = None
    max_attempts = 20
    attempt = 0
    target_name = "{0}.test_in_flight".format(unique_database)

    while attempt < max_attempts and inflight_op is None:
      attempt += 1

      # Get in-flight operations
      catalog_operations_page = requests.get(
          "http://localhost:25020/operations?json").text
      catalog_operations = json.loads(catalog_operations_page)

      # Find the in-flight REFRESH operation
      for op in catalog_operations.get("inflight_catalog_operations", []):
        if op["catalog_op_name"] == "REFRESH" and op["target_name"] == target_name:
          inflight_op = op
          query_id = op["query_id"]
          break

      # If not found yet, sleep before next attempt
      if inflight_op is None and attempt < max_attempts:
        time.sleep(0.1)

    # Assert that we found the in-flight operation
    assert inflight_op is not None, \
        "In-flight REFRESH operation not found after {0} attempts".format(max_attempts)
    thread_id = inflight_op["thread_id"]

    # Verify query_id matches what we got from the handle
    assert expected_query_id in query_id, \
        "Query ID mismatch: expected {0}, got {1}".format(expected_query_id, query_id)

    LOG.info("Found in-flight operation with query_id: {0}, thread_id: {1}".format(
        query_id, thread_id))

    # Test the operation_detail page for the in-flight operation
    json_response = requests.get(
        "http://localhost:25020/operation_detail?query_id={0}&thread_id={1}&json"
        .format(query_id, thread_id))
    assert json_response.status_code == requests.codes.ok
    detail_data = json.loads(json_response.text)

    # Verify the operation is in STARTED status
    assert detail_data["status"] == "STARTED", \
        "In-flight operation should have STARTED status"

    # Verify timeline is present for in-flight operation
    assert "timeline" in detail_data, \
        "Timeline should be present for in-flight operation"
    assert len(detail_data["timeline"]) > 0, \
        "Timeline should not be empty for in-flight operation"

    # Verify timeline is formatted text (not JSON)
    timeline_str = detail_data["timeline"]
    assert isinstance(timeline_str, str), "Timeline should be a formatted string"

    # Check for expected timeline format from RuntimeProfile
    assert "Catalog Server Operation:" in timeline_str, \
        "Timeline should have the operation name"
    assert ("Got Metastore client" in timeline_str or "Got catalog version"
        in timeline_str), "Timeline should contain at least one event"

    LOG.info("In-flight operation detail test passed")

  finally:
    # Wait for the query to complete, and close it even if waiting fails.
    try:
      self.client.wait_for_finished_timeout(handle, 30)
    finally:
      self.client.close_query(handle)
| |
def test_catalog_metrics(self):
  """Checks that catalogd's /metrics JSON page reports every expected catalog metric."""
  required_metrics = (
      "catalog-server.metadata.file.num-loading-threads",
      "catalog-server.metadata.file.num-loading-tasks",
      "catalog-server.metadata.table.num-loading-file-metadata",
      "catalog-server.metadata.table.num-loading-metadata",
      "catalog-server.metadata.table.async-loading.num-in-progress",
      "catalog-server.metadata.table.async-loading.queue-len",
      "catalog.num-databases",
      "catalog.num-tables",
      "catalog.num-functions",
      "catalog.hms-client-pool.num-idle",
      "catalog.hms-client-pool.num-in-use",
      "catalog.num-loaded-tables",
  )
  metrics_url = self.METRICS_URL.format(*self.CATALOG_TEST_PORT) + "?json"
  metrics_page = json.loads(requests.get(metrics_url).text)
  reported_names = set(entry["name"] for entry in metrics_page["metric_group"]["metrics"])
  # Check in declaration order so the first missing metric is the one reported.
  for metric_name in required_metrics:
    assert metric_name in reported_names
| |
def test_iceberg_table_metrics(self):
  """Checks catalogd's table metrics for functional_parquet.iceberg_non_partitioned.

  'num-blocks' is only checked when the target filesystem provides block locations,
  and the expected 'memory-estimate-bytes' value differs between the two cases."""
  def table_metric(name):
    # Each lookup refreshes the table and re-reads the metrics page.
    return self.__get_table_metric(
        "functional_parquet", "iceberg_non_partitioned", name)

  assert '23448' == table_metric("total-file-size-bytes")
  assert '20' == table_metric("num-files")
  if not supports_storage_ids():
    # Target FS doesn't have block locations, so 'memory-estimate-bytes' differ.
    assert '10000' == table_metric("memory-estimate-bytes")
    return
  # Target FS has block location information.
  assert '20' == table_metric("num-blocks")
  assert '13000' == table_metric("memory-estimate-bytes")
| |
def __get_table_metric(self, db_name, tbl_name, metric):
  """Return the value of table metric 'metric' for table 'db_name.tbl_name'.

  Refreshes the table, fetches catalogd's table metrics page as JSON and scans its
  'table_metrics' text for a line of the form "<metric>: <value>".

  Returns the value as a string, or None if no line carries that exact metric name.
  """
  self.client.execute("refresh %s.%s" % (db_name, tbl_name))
  responses = self.get_and_check_status(self.TABLE_METRICS_URL + "?name=%s.%s&json" %
      (db_name, tbl_name), metric, ports_to_test=self.CATALOG_TEST_PORT)
  response_json = json.loads(responses[0].text)
  metrics_text = response_json['table_metrics']
  # Match the full metric name plus the separator. A bare startswith(metric) would
  # also match a longer metric name that merely begins with 'metric' and return that
  # metric's value instead.
  prefix = metric + ": "
  for line in metrics_text.split('\n'):
    if line.startswith(prefix):
      return line[len(prefix):]
  return None
| |
def test_query_progress(self):
  """Checks that the /queries debug page reports progress for a running query."""
  stmt = "select count(*) from functional_parquet.alltypes where bool_col = sleep(100)"
  query_id, page_json = self.run_query_and_get_debug_page(
      stmt, self.QUERIES_URL, expected_state=RUNNING)
  seen = False
  for entry in page_json['in_flight_queries']:
    if query_id not in entry['query_id']:
      continue
    seen = True
    assert entry["state"] == "RUNNING"
    assert entry["waiting"] is False
    assert entry["executing"] is True
    assert entry["query_progress"] == "0 / 4 ( 0%)"
  assert seen, "Query {} not found in response_json\n{}".format(
      query_id, json.dumps(page_json, sort_keys=True, indent=4))
| |
# CatalogV2 doesn't have the delay loading metadata after invalidate, so this test
# is only applicable to CatalogV1. test_query_cancel_load_tables is sufficient for V2.
@SkipIfCatalogV2.catalog_v1_test()
@pytest.mark.execute_serially
def test_query_cancel_load_metadata(self):
  """Checks that a query in the CREATED state can be cancelled from the debug web UI
  while catalogd is loading metadata. Invalidating the table first introduces a
  delay while catalogd reloads its metadata."""
  service = ImpalaCluster.get_e2e_test_cluster().impalads[0].service
  wait_for_state(service, None)
  invalidate_result = self.execute_query("invalidate metadata functional_parquet.alltypes")
  assert invalidate_result.success

  # Start the query completely async. The server doesn't return a response until
  # the query has exited the CREATED state, so we need to get the query ID another way.
  query_proc, result_queue = start(self, "select count(*) from functional_parquet.alltypes")
  wait_for_state(service, 'CREATED')

  queries = service.get_debug_webpage_json('queries')['in_flight_queries']
  assert len(queries) == 1
  created_query = queries[0]
  assert created_query['state'] == 'CREATED'
  query_id = created_query['query_id']
  cancel(service, query_id)

  # Verify query was cancelled. Cancel and fetch requests can return before cancellation
  # is finalized, so wait for the original request to return.
  query_output = join(query_proc, result_queue)
  assert "UserCancelledException: Query cancelled by user request" in query_output
  wait_for_state(service, None)
  assert_query_stopped(service, query_id)

  profile_text = service.read_debug_webpage(
      "query_profile_plain_text?query_id={}".format(query_id))
  expected_note = "Cancelled from Impala's debug web interface by user: 'anonymous' at"
  assert expected_note in profile_text
| |
@pytest.mark.execute_serially
def test_query_cancel_load_tables(self):
  """Tests that we can cancel a query in the CREATED state while loading tables.

  The 'impalad_load_tables_delay' debug action delays table loading so the query can
  be cancelled (several times, concurrently) while it is still in CREATED."""
  impalad = ImpalaCluster.get_e2e_test_cluster().impalads[0].service
  wait_for_state(impalad, None)
  delay_created_action = "impalad_load_tables_delay:SLEEP@5000"

  # Start the query completely async. The server doesn't return a response until
  # the query has exited the CREATED state, so we need to get the query ID another way.
  proc, queue = start(self, "select count(*) from functional_parquet.alltypes",
      {'debug_action': delay_created_action})
  wait_for_state(impalad, 'CREATED')

  in_flight_queries = impalad.get_debug_webpage_json('queries')['in_flight_queries']
  assert len(in_flight_queries) == 1
  assert in_flight_queries[0]['state'] == 'CREATED'
  query_id = in_flight_queries[0]['query_id']

  # Call cancel multiple times to ensure it's idempotent. Use a distinct loop variable
  # so that 'proc' still refers to the query process that join() must wait for below.
  cancel_procs = [Process(target=cancel, args=(impalad, query_id)) for _ in range(3)]
  for cancel_proc in cancel_procs:
    cancel_proc.start()
  for cancel_proc in cancel_procs:
    cancel_proc.join()

  # Verify query was cancelled. Cancel and fetch requests can return before cancellation
  # is finalized, so wait for the original request to return and retry get_queries.
  assert "UserCancelledException: Query cancelled by user request" in join(proc, queue)
  wait_for_state(impalad, None)
  assert_query_stopped(impalad, query_id)

  response = impalad.read_debug_webpage(
      "query_profile_plain_text?query_id={}".format(query_id))
  assert "Cancelled from Impala's debug web interface by user: " \
      "'anonymous' at" in response
| |
@pytest.mark.execute_serially
def test_hadoop_varz_page(self):
  """Checks that the /hadoop-varz page exposes Hadoop configuration such as the Hive
  warehouse directories and fs.defaultFS on both the catalogd and impalad web UIs."""
  expected_keys = (
      "hive.metastore.warehouse.dir",
      "hive.metastore.warehouse.external.dir",
      "fs.defaultFS",
  )
  for config_key in expected_keys:
    responses = self.get_and_check_status(self.HADOOP_VARZ_URL, config_key,
        ports_to_test=self.TEST_PORTS_WITHOUT_SS)
  # check if response size is 2 , for both catalog and impalad webUI
  assert len(responses) == 2