| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| from tests.common.impala_cluster import ImpalaCluster |
| from tests.common.impala_test_suite import ImpalaTestSuite |
| import json |
| import requests |
| |
class TestWebPage(ImpalaTestSuite):
    """Tests for the debug web pages served by impalad, statestored and catalogd.

    Every test issues HTTP requests against daemons on localhost, using the web
    server ports listed in the constants below.
    """

    # URL templates; "{0}" is replaced with the web server port of the daemon
    # under test (see get_and_check_status()).
    MEMZ_URL = "http://localhost:{0}/memz"
    GET_JAVA_LOGLEVEL_URL = "http://localhost:{0}/get_java_loglevel"
    SET_JAVA_LOGLEVEL_URL = "http://localhost:{0}/set_java_loglevel"
    RESET_JAVA_LOGLEVEL_URL = "http://localhost:{0}/reset_java_loglevel"
    SET_GLOG_LOGLEVEL_URL = "http://localhost:{0}/set_glog_level"
    RESET_GLOG_LOGLEVEL_URL = "http://localhost:{0}/reset_glog_level"
    CATALOG_URL = "http://localhost:{0}/catalog"
    CATALOG_OBJECT_URL = "http://localhost:{0}/catalog_object"
    TABLE_METRICS_URL = "http://localhost:{0}/table_metrics"
    QUERY_BACKENDS_URL = "http://localhost:{0}/query_backends"
    QUERY_FINSTANCES_URL = "http://localhost:{0}/query_finstances"
    THREAD_GROUP_URL = "http://localhost:{0}/thread-group"
    # log4j changes do not apply to the statestore since it doesn't have an
    # embedded JVM. So we keep two sets of ports to test the log level endpoints:
    # one without the statestore port and one with it.
    TEST_PORTS_WITHOUT_SS = ["25000", "25020"]
    TEST_PORTS_WITH_SS = ["25000", "25010", "25020"]
    CATALOG_TEST_PORT = ["25020"]
    IMPALAD_TEST_PORT = ["25000"]

    # Query shared by the /query_backends and /query_finstances tests.
    # NOTE(review): presumably chosen because it runs long enough to still be
    # registered when the debug page is fetched - confirm.
    CROSS_JOIN_QUERY = (
        "select count(*) from functional.alltypes a "
        "CROSS JOIN functional.alltypes b CROSS JOIN functional.alltypes c")

    def test_memz(self):
        """Test /memz at impalad / statestored / catalogd."""
        self.get_and_check_status(self.MEMZ_URL, ports_to_test=self.TEST_PORTS_WITH_SS)

    def test_query_profile_encoded_unknown_query_id(self):
        """Test that the /query_profile_encoded error message starts with the expected
        line in case of a missing query and does not contain any leading whitespace.
        """
        impalad = ImpalaCluster().get_any_impalad()
        result = impalad.service.read_debug_webpage(
            "query_profile_encoded?query_id=123")
        assert result.startswith("Could not obtain runtime profile: Query id")

    def get_and_check_status(self, url, string_to_search="", ports_to_test=None):
        """Fetch 'url' from every port and check each response.

        'url' is a template whose "{0}" placeholder is filled in with each entry of
        'ports_to_test' in turn; when 'ports_to_test' is None, TEST_PORTS_WITH_SS is
        used. For every port the request must return an OK status code and the
        response body must contain 'string_to_search'.

        Returns the body of the response from the last port that was tested.
        """
        if ports_to_test is None:
            ports_to_test = self.TEST_PORTS_WITH_SS
        # Without at least one port there is no response to return; fail with a clear
        # message instead of an UnboundLocalError at the return statement.
        assert ports_to_test, "No ports to test for url: " + url
        response_text = None
        for port in ports_to_test:
            input_url = url.format(port)
            response = requests.get(input_url)
            # Checked separately so that a failure says which expectation was violated.
            assert response.status_code == requests.codes.ok, (
                "Offending url: %s (status code %s)" % (input_url, response.status_code))
            response_text = response.text
            assert string_to_search in response_text, (
                "Offending url: %s (response does not contain '%s')"
                % (input_url, string_to_search))
        return response_text

    def get_and_check_status_jvm(self, url, string_to_search=""):
        """Calls get_and_check_status() for impalad and catalogd only."""
        return self.get_and_check_status(
            url, string_to_search, ports_to_test=self.TEST_PORTS_WITHOUT_SS)

    def test_log_level(self):
        """Test that the /log_level page outputs are as expected and work well on basic
        and malformed inputs. This however does not test that the log level changes are
        actually in effect."""
        hdfs_table_param = "?class=org.apache.impala.catalog.HdfsTable"
        get_table_level_url = self.GET_JAVA_LOGLEVEL_URL + hdfs_table_param

        # Check that the log_level end points are accessible.
        self.get_and_check_status_jvm(self.GET_JAVA_LOGLEVEL_URL)
        self.get_and_check_status_jvm(self.SET_JAVA_LOGLEVEL_URL)
        self.get_and_check_status_jvm(self.RESET_JAVA_LOGLEVEL_URL)
        self.get_and_check_status(self.SET_GLOG_LOGLEVEL_URL)
        self.get_and_check_status(self.RESET_GLOG_LOGLEVEL_URL)

        # Try getting log level of a class.
        self.get_and_check_status_jvm(get_table_level_url, "DEBUG")

        # Set the log level of a class to TRACE and confirm the setting is in place.
        set_loglevel_url = self.SET_JAVA_LOGLEVEL_URL + hdfs_table_param + "&level=trace"
        self.get_and_check_status_jvm(set_loglevel_url, "Effective log level: TRACE")
        self.get_and_check_status_jvm(get_table_level_url, "TRACE")

        # Check the log level of a different class and confirm it is still DEBUG.
        get_partition_level_url = (
            self.GET_JAVA_LOGLEVEL_URL + "?class=org.apache.impala.catalog.HdfsPartition")
        self.get_and_check_status_jvm(get_partition_level_url, "DEBUG")

        # Reset Java logging levels and check the logging level of the class again.
        self.get_and_check_status_jvm(
            self.RESET_JAVA_LOGLEVEL_URL, "Java log levels reset.")
        self.get_and_check_status_jvm(get_table_level_url, "DEBUG")

        # Set a new glog level and make sure the setting has been applied.
        self.get_and_check_status(self.SET_GLOG_LOGLEVEL_URL + "?glog=3", "v set to 3")

        # Try resetting the glog logging defaults again.
        self.get_and_check_status(self.RESET_GLOG_LOGLEVEL_URL, "v set to ")

        # Try to get the log level of an empty class input.
        self.get_and_check_status_jvm(self.GET_JAVA_LOGLEVEL_URL + "?class=")

        # Same as above, for the set log level request. This must hit the *set*
        # endpoint; requesting the get URL again would leave this case untested.
        self.get_and_check_status_jvm(self.SET_JAVA_LOGLEVEL_URL + "?class=")

        # Empty input for setting a glog level request.
        self.get_and_check_status(self.SET_GLOG_LOGLEVEL_URL + "?glog=")

        # Try setting a non-existent log level on a valid class. In such cases,
        # log4j automatically sets it as DEBUG. This is the behavior of
        # Level.toLevel() method.
        set_loglevel_url = self.SET_JAVA_LOGLEVEL_URL + hdfs_table_param + "&level=foo&"
        self.get_and_check_status_jvm(set_loglevel_url, "Effective log level: DEBUG")

        # Try setting an invalid glog level.
        self.get_and_check_status(
            self.SET_GLOG_LOGLEVEL_URL + "?glog=foo", "Bad glog level input")

        # Try an unrecognized query parameter on the glog level URL.
        self.get_and_check_status(self.SET_GLOG_LOGLEVEL_URL + "?badurl=foo")

    def test_catalog(self):
        """Tests the /catalog and /catalog_object endpoints."""
        self.get_and_check_status_jvm(self.CATALOG_URL, "functional")
        self.get_and_check_status_jvm(self.CATALOG_URL, "alltypes")
        # IMPALA-5028: Test toThrift() of a partitioned table via the WebUI code path.
        self.__test_catalog_object("functional", "alltypes")
        self.__test_catalog_object("functional_parquet", "alltypes")
        self.__test_catalog_object("functional", "alltypesnopart")
        self.__test_catalog_object("functional_kudu", "alltypes")
        self.__test_table_metrics("functional", "alltypes", "total-file-size-bytes")
        self.__test_table_metrics("functional_kudu", "alltypes", "alter-duration")

    def __test_catalog_object(self, db_name, tbl_name):
        """Tests the /catalog_object endpoint for the given db/table. Runs
        against an unloaded as well as a loaded table."""
        full_tbl_name = "%s.%s" % (db_name, tbl_name)
        object_url = (self.CATALOG_OBJECT_URL +
                      "?object_type=TABLE&object_name=" + full_tbl_name)
        # First request: right after the table's metadata has been invalidated.
        self.client.execute("invalidate metadata " + full_tbl_name)
        self.get_and_check_status_jvm(object_url, tbl_name)
        # Second request: after a query has run against the table.
        self.client.execute("select count(*) from " + full_tbl_name)
        self.get_and_check_status_jvm(object_url, tbl_name)

    def __test_table_metrics(self, db_name, tbl_name, metric):
        """Refreshes the given table and checks that the /table_metrics page on the
        catalog port mentions 'metric' for it."""
        full_tbl_name = "%s.%s" % (db_name, tbl_name)
        self.client.execute("refresh " + full_tbl_name)
        self.get_and_check_status(
            self.TABLE_METRICS_URL + "?name=" + full_tbl_name, metric,
            ports_to_test=self.CATALOG_TEST_PORT)

    def __run_query_and_get_debug_page(self, query, page_url):
        """Runs a query to obtain the content of the debug page pointed to by page_url,
        then cancels the query. Returns the page content parsed from JSON."""
        query_handle = self.client.execute_async(query)
        try:
            response = self.get_and_check_status(
                page_url + "?query_id=%s&json" % query_handle.get_handle().id,
                ports_to_test=self.IMPALAD_TEST_PORT)
            return json.loads(response)
        finally:
            # Always cancel, even if fetching or parsing the page failed.
            self.client.cancel(query_handle)

    def __check_query_debug_page(self, unique_database, page_url, json_key):
        """Checks that the per-query debug page at 'page_url' reports a non-empty
        'json_key' entry for a query and for DML, and no such entry for DDL."""
        ctas_query = "CREATE TABLE {0}.foo AS {1}".format(
            unique_database, self.CROSS_JOIN_QUERY)
        # Pairs of (statement, whether 'json_key' is expected in the page's JSON).
        cases = [(self.CROSS_JOIN_QUERY, True),
                 (ctas_query, True),
                 ("DESCRIBE functional.alltypes", False)]
        for query, expect_entries in cases:
            response_json = self.__run_query_and_get_debug_page(query, page_url)
            if expect_entries:
                assert response_json.get(json_key), (
                    "Expected non-empty '%s' for query: %s" % (json_key, query))
            else:
                assert json_key not in response_json, (
                    "Unexpected '%s' for query: %s" % (json_key, query))

    def test_backend_states(self, unique_database):
        """Test that /query_backends returns the list of backend states for DML or
        queries; nothing for DDL statements"""
        self.__check_query_debug_page(
            unique_database, self.QUERY_BACKENDS_URL, 'backend_states')

    def test_backend_instances(self, unique_database):
        """Test that /query_finstances returns the list of fragment instances for DML
        or queries; nothing for DDL statements"""
        self.__check_query_debug_page(
            unique_database, self.QUERY_FINSTANCES_URL, 'backend_instances')

    def test_io_mgr_threads(self):
        """Test that IoMgr threads have readable names. This test assumes that all
        systems we support have a disk called 'sda'."""
        response = self.get_and_check_status(
            self.THREAD_GROUP_URL + "?group=disk-io-mgr&json",
            ports_to_test=self.IMPALAD_TEST_PORT)
        thread_names = [t["name"] for t in json.loads(response)['threads']]
        expected_name_patterns = ["ADLS remote", "S3 remote", "HDFS remote", "sda"]
        for pattern in expected_name_patterns:
            assert any(pattern in name for name in thread_names), \
                "Could not find thread matching '%s'" % pattern