| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| from __future__ import absolute_import, division, print_function |
| |
| import os |
| import pytest |
| import string |
| import tempfile |
| |
| from random import choice, randint |
| from signal import SIGRTMIN |
| from tests.common.custom_cluster_test_suite import CustomClusterTestSuite |
| from tests.common.test_vector import ImpalaTestDimension |
| from tests.util.retry import retry |
| from tests.util.workload_management import assert_query, COMPRESSED_BYTES_SPILLED, \ |
| BYTES_READ_CACHE_TOTAL |
| from time import sleep, time |
| |
| |
class TestQueryLogTableBase(CustomClusterTestSuite):
  """Shared parent of the query log table test classes. Adds a 'protocol' dimension to
  the test matrix whose values are the Beeswax and HS2 client protocols."""

  # Workload management database and the fully qualified name of the completed queries
  # table inside it.
  WM_DB = "sys"
  QUERY_TBL = "{0}.impala_query_log".format(WM_DB)

  @classmethod
  def add_test_dimensions(cls):
    super(TestQueryLogTableBase, cls).add_test_dimensions()
    protocols = ('beeswax', 'hs2')
    protocol_dimension = ImpalaTestDimension('protocol', *protocols)
    cls.ImpalaTestMatrix.add_dimension(protocol_dimension)
| |
| |
class TestQueryLogTableBeeswax(TestQueryLogTableBase):
  """Tests to assert the query log table is correctly populated when using the Beeswax
  client protocol."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestQueryLogTableBeeswax, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('protocol') == 'beeswax')

  # These attributes are referenced by the with_args decorators below, so they are
  # evaluated once when the class body executes.
  CACHE_DIR = tempfile.mkdtemp(prefix="cache_dir")
  MAX_SQL_PLAN_LEN = 2000
  LOG_DIR_MAX_WRITES = tempfile.mkdtemp(prefix="max_writes")
  FLUSH_MAX_RECORDS_CLUSTER_ID = "test_query_log_max_records_" + str(int(time()))
  FLUSH_MAX_RECORDS_QUERY_COUNT = 2
  OTHER_TBL = "completed_queries_table_{0}".format(int(time()))

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=1 "
                                                 "--cluster_id=test_max_select "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60 "
                                                 "--query_log_max_sql_length={0} "
                                                 "--query_log_max_plan_length={0}"
                                                 .format(MAX_SQL_PLAN_LEN),
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=True)
  def test_query_log_table_lower_max_sql_plan(self, vector):
    """Asserts that lower limits on the sql and plan columns in the completed queries
    table are respected."""
    client = self.create_impala_client(protocol=vector.get_value('protocol'))
    # Wrapped in "select '...'", this literal makes the statement longer than
    # MAX_SQL_PLAN_LEN characters.
    rand_long_str = "".join(choice(string.ascii_letters) for _ in
        range(self.MAX_SQL_PLAN_LEN))

    try:
      handle = client.execute_async("select '{0}'".format(rand_long_str))
      query_id = handle.get_handle().id
      client.wait_for_finished_timeout(handle, 10)
      client.close_query(handle)

      self.cluster.get_first_impalad().service.wait_for_metric_value(
          "impala-server.completed-queries.written", 1, 60)

      # Force Impala to process the inserts to the completed queries table.
      client.execute("refresh " + self.QUERY_TBL)

      res = client.execute("select length(sql),plan from {0} where query_id='{1}'"
          .format(self.QUERY_TBL, query_id))
      assert res.success
      assert len(res.data) == 1

      # The single result row is a tab-separated string: "<length(sql)>\t<plan>".
      data = res.data[0].split("\t")
      assert len(data) == 2
      assert int(data[0]) == self.MAX_SQL_PLAN_LEN - 1, "incorrect sql statement length"
      # Newline characters are not counted by Impala's length function.
      assert len(data[1]) == self.MAX_SQL_PLAN_LEN - data[1].count("\n") - 1, \
          "incorrect plan length"
    finally:
      client.close()

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=1 "
                                                 "--cluster_id=test_max_select "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60",
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=True)
  def test_query_log_table_over_max_sql_plan(self, vector):
    """Asserts that very long queries have their corresponding plan and sql columns
    shortened in the completed queries table."""
    client = self.create_impala_client(protocol=vector.get_value('protocol'))
    # Longer than the 16777215 characters the sql column is expected to hold below.
    rand_long_str = "".join(choice(string.ascii_letters) for _ in range(16778200))

    try:
      # The statement is ~16.78M bytes, so the statement length limit is raised above it.
      client.set_configuration_option("MAX_STATEMENT_LENGTH_BYTES", 16780000)
      handle = client.execute_async("select '{0}'".format(rand_long_str))
      query_id = handle.get_handle().id
      client.wait_for_finished_timeout(handle, 10)
      client.close_query(handle)

      self.cluster.get_first_impalad().service.wait_for_metric_value(
          "impala-server.completed-queries.written", 1, 60)

      # Force Impala to process the inserts to the completed queries table.
      client.execute("refresh " + self.QUERY_TBL)

      # The returned row carries ~16M characters of plan text, so the row size limit is
      # raised accordingly.
      client.set_configuration_option("MAX_ROW_SIZE", 35000000)
      res = client.execute("select length(sql),plan from {0} where query_id='{1}'"
          .format(self.QUERY_TBL, query_id))
      assert res.success
      assert len(res.data) == 1
      data = res.data[0].split("\t")
      assert len(data) == 2
      assert data[0] == "16777215"

      # Newline characters are not counted by Impala's length function.
      assert len(data[1]) == 16777216 - data[1].count("\n") - 1
    finally:
      client.close()

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=1 "
                                                 "--cluster_id=test_query_hist_1 "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60 "
                                                 "--query_log_size=0 "
                                                 "--query_log_size_in_bytes=0",
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=True)
  def test_query_log_table_no_query_log_select(self, vector):
    """Asserts queries are written to the completed queries table when the query log is
    turned off."""
    client = self.create_impala_client(protocol=vector.get_value('protocol'))

    try:
      # Run a select query.
      random_val = randint(1, 1000000)
      select_sql = "select {0}".format(random_val)
      res = client.execute(select_sql)
      assert res.success
      self.cluster.get_first_impalad().service.wait_for_metric_value(
          "impala-server.completed-queries.written", 1, 60)

      # Force Impala to process the inserts to the completed queries table.
      client.execute("refresh " + self.QUERY_TBL)

      actual = client.execute("select sql from {0} where query_id='{1}'".format(
          self.QUERY_TBL, res.query_id))
      assert actual.success
      assert len(actual.data) == 1
      assert actual.data[0] == select_sql
    finally:
      client.close()

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=1 "
                                                 "--cluster_id=test_query_hist_2 "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60 "
                                                 "--always_use_data_cache "
                                                 "--data_cache={0}:5GB".format(CACHE_DIR),
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=True,
                                    cluster_size=1)
  def test_query_log_table_query_cache(self, vector):
    """Asserts the values written to the query log table match the values from the
    query profile. Specifically focuses on the data cache metrics."""
    tbl_name = "default.test_query_log_cache" + str(int(time()))
    client = self.create_impala_client(protocol=vector.get_value('protocol'))

    try:
      # Create the test table.
      create_tbl_sql = "create table {0} (id INT, product_name STRING) " \
          "partitioned by (category INT)".format(tbl_name)
      assert client.execute(create_tbl_sql).success

      # Insert some rows into the test table using a single multi-row insert.
      rows = []
      for i in range(1, 11):
        for j in range(1, 11):
          random_product_name = "".join(choice(string.ascii_letters)
              for _ in range(10))
          rows.append("({0},{1},'{2}')".format((i * j), i, random_product_name))
      insert_sql = "insert into {0} (id,category,product_name) VALUES ".format(tbl_name)
      assert client.execute(insert_sql + ",".join(rows)).success

      # Select all rows from the test table. Run the query multiple times to ensure data
      # is cached.
      select_sql = "select * from {0}".format(tbl_name)
      for _ in range(3):
        res = client.execute(select_sql)
        assert res.success
        sleep(1)

      # The create, the insert and the three selects.
      self.cluster.get_first_impalad().service.wait_for_metric_value(
          "impala-server.completed-queries.written", 5, 60)

      # Allow some time for the cache to be written to disk.
      sleep(10)

      # Run the same query again so results are read from the data cache.
      res = client.execute(select_sql, fetch_profile_after_close=True)
      assert res.success
      self.cluster.get_first_impalad().service.wait_for_metric_value(
          "impala-server.completed-queries.written", 6, 60)

      data = assert_query(self.QUERY_TBL, client, "test_query_hist_2",
          res.runtime_profile)

      # Since the assert_query function only asserts that the bytes read from cache
      # column is equal to the bytes read from cache in the profile, there is a potential
      # for this test to not actually assert anything different than other tests. Thus, an
      # additional assert is needed to ensure that there actually was data read from the
      # cache.
      assert data[BYTES_READ_CACHE_TOTAL] != "0", "bytes read from cache total was " \
          "zero, test did not assert anything"
    finally:
      # Close the client even if the cleanup statement fails.
      try:
        client.execute("drop table if exists {0}".format(tbl_name))
      finally:
        client.close()

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=5 "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60 "
                                                 "--log_dir={0}"
                                                 .format(LOG_DIR_MAX_WRITES),
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=True)
  def test_query_log_max_attempts_exceeded(self, vector):
    """Asserts that completed queries are only attempted 3 times to be inserted into the
    completed queries table. This test deletes the completed queries table thus it must
    not come last otherwise the table stays deleted. Subsequent tests will re-create
    the table."""

    print("USING LOG DIRECTORY: {0}".format(self.LOG_DIR_MAX_WRITES))

    impalad = self.cluster.get_first_impalad()
    client = self.create_impala_client(protocol=vector.get_value('protocol'))

    try:
      # Drop the completed queries table so that writing this drop statement's own
      # record to it cannot succeed.
      res = client.execute("drop table {0} purge".format(self.QUERY_TBL))
      assert res.success
      impalad.service.wait_for_metric_value(
          "impala-server.completed-queries.scheduled-writes", 3, 60)
      impalad.service.wait_for_metric_value("impala-server.completed-queries.failure", 3,
          60)

      # Allow time for logs to be written to disk.
      sleep(5)

      # The write error for the drop statement's record must be logged exactly once.
      expected_log_msg = 'could not write completed query table="{0}" query_id="{1}"' \
          .format(self.QUERY_TBL, res.query_id)
      with open(os.path.join(self.LOG_DIR_MAX_WRITES, "impalad.ERROR")) as log_file:
        query_count = sum(1 for line in log_file if expected_log_msg in line)

      assert query_count == 1

      assert impalad.service.get_metric_value(
          "impala-server.completed-queries.max-records-writes") == 0
      assert impalad.service.get_metric_value(
          "impala-server.completed-queries.queued") == 0
      assert impalad.service.get_metric_value(
          "impala-server.completed-queries.failure") == 3
      assert impalad.service.get_metric_value(
          "impala-server.completed-queries.scheduled-writes") == 4
      assert impalad.service.get_metric_value(
          "impala-server.completed-queries.written") == 0
    finally:
      client.close()

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_max_queued={0} "
                                                 "--query_log_write_interval_s=9999 "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60 "
                                                 "--cluster_id={1}"
                                                 .format(FLUSH_MAX_RECORDS_QUERY_COUNT,
                                                 FLUSH_MAX_RECORDS_CLUSTER_ID),
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=True)
  def test_query_log_flush_max_records(self, vector):
    """Asserts that queries that have completed are written to the query log table when
    the maximum number of queued records is reached."""

    impalad = self.cluster.get_first_impalad()
    client = self.create_impala_client(protocol=vector.get_value('protocol'))

    rand_str = "{0}-{1}".format(vector.get_value('protocol'), time())

    test_sql = "select '{0}','{1}'".format(rand_str,
        self.FLUSH_MAX_RECORDS_CLUSTER_ID)
    # Counts completed queries whose sql exactly equals test_sql. The single quotes of
    # test_sql are backslash-escaped so it can be embedded in a string literal.
    test_sql_assert = "select '{0}', count(*) from {1} where sql='{2}'".format(
        rand_str, self.QUERY_TBL, test_sql.replace("'", r"\'"))

    try:
      for _ in range(0, self.FLUSH_MAX_RECORDS_QUERY_COUNT):
        res = client.execute(test_sql)
        assert res.success

      # Running this query results in the number of queued completed queries exceeding
      # the max and thus all completed queries will be written to the query log table.
      res = client.execute(test_sql_assert)
      assert res.success
      assert 1 == len(res.data)
      assert "0" == res.data[0].split("\t")[1]

      # Wait until the completed queries have all been written out because the max queued
      # count was exceeded.
      impalad.service.wait_for_metric_value(
          "impala-server.completed-queries.max-records-writes", 1, 60)

      # Force Impala to process the inserts to the completed queries table.
      sleep(5)
      client.execute("refresh " + self.QUERY_TBL)

      # This query will remain queued due to the long write interval and max queued
      # records limit not being reached. The expected count of 3 covers the test_sql
      # runs plus test_sql_assert, all of which start with "select '<rand_str>'".
      res = client.execute(r"select count(*) from {0} where sql like 'select \'{1}\'%'"
          .format(self.QUERY_TBL, rand_str))
      assert res.success
      assert 1 == len(res.data)
      assert "3" == res.data[0]
      impalad.service.wait_for_metric_value(
          "impala-server.completed-queries.queued", 2, 60)
    finally:
      client.close()

    assert impalad.service.get_metric_value(
        "impala-server.completed-queries.max-records-writes") == 1
    assert impalad.service.get_metric_value(
        "impala-server.completed-queries.scheduled-writes") == 0
    assert impalad.service.get_metric_value("impala-server.completed-queries.written") \
        == self.FLUSH_MAX_RECORDS_QUERY_COUNT + 1
    assert impalad.service.get_metric_value(
        "impala-server.completed-queries.queued") == 2

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=1 "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=30 "
                                                 "--blacklisted_dbs=information_schema "
                                                 "--query_log_table_name={0}"
                                                 .format(OTHER_TBL),
                                    catalogd_args="--enable_workload_mgmt "
                                                  "--blacklisted_dbs=information_schema",
                                    impalad_graceful_shutdown=True)
  def test_query_log_table_different_table(self, vector):
    """Asserts that when --query_log_table_name is set, a table with that name exists in
    the workload management database."""

    client = self.create_impala_client(protocol=vector.get_value('protocol'))

    try:
      res = client.execute("show tables in {0}".format(self.WM_DB))
      assert res.success
      assert len(res.data) > 0, "could not find any tables in database {0}" \
          .format(self.WM_DB)

      tbl_found = any(tbl.startswith(self.OTHER_TBL) for tbl in res.data)
      assert tbl_found, "could not find table '{0}' in database '{1}'" \
          .format(self.OTHER_TBL, self.WM_DB)
    finally:
      try:
        # "if exists" keeps a missing table (the failure case above) from raising here
        # and hiding the original assertion error.
        client.execute("drop table if exists {0}.{1} purge".format(self.WM_DB,
            self.OTHER_TBL))
      finally:
        client.close()
| |
| |
class TestQueryLogTableHS2(TestQueryLogTableBase):
  """Tests to assert the query log table is correctly populated when using the HS2
  client protocol."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestQueryLogTableHS2, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('protocol') == 'hs2')

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=1 "
                                                 "--cluster_id=test_query_hist_mult "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60",
                                    catalogd_args="--enable_workload_mgmt",
                                    cluster_size=2,
                                    impalad_graceful_shutdown=True)
  def test_query_log_table_query_multiple(self, vector):
    """Asserts the values written to the query log table match the values from the
    query profile for a query that reads from multiple tables."""
    tbl_name = "default.test_query_log_" + str(int(time()))
    client = self.create_impala_client(protocol=vector.get_value('protocol'))
    client2 = None

    try:
      # Create the first test table.
      create_tbl_sql = "create table {0}_products (id INT, product_name STRING)" \
          .format(tbl_name)
      assert client.execute(create_tbl_sql).success

      # Insert some rows into the test products table.
      product_rows = []
      for i in range(1, 11):
        for j in range(1, 11):
          random_product_name = "".join(choice(string.ascii_letters) for _ in range(10))
          product_rows.append("({0},'{1}')".format((i * j), random_product_name))
      insert_sql = "insert into {0}_products (id,product_name) VALUES ".format(tbl_name)
      assert client.execute(insert_sql + ",".join(product_rows)).success

      # Create the second test table.
      create_tbl_sql = "create table {0}_customers (id INT, name STRING) " \
          .format(tbl_name)
      assert client.execute(create_tbl_sql).success

      # Insert rows into the test customers table.
      customer_rows = []
      for i in range(1, 11):
        rand_cust_name = "".join(choice(string.ascii_letters) for _ in range(10))
        customer_rows.append("({0},'{1}')".format(i, rand_cust_name))
      insert_sql = "insert into {0}_customers (id,name) VALUES ".format(tbl_name)
      assert client.execute(insert_sql + ",".join(customer_rows)).success

      # Create the third test table.
      create_tbl_sql = "create table {0}_sales (id INT, product_id INT, " \
          "customer_id INT) ".format(tbl_name)
      assert client.execute(create_tbl_sql).success

      # Insert rows into the test sales table. Each sale's id is its own loop index.
      sales_rows = ["({0},{1},{2})".format(i, randint(1, 100), randint(1, 10))
          for i in range(1, 1001)]
      insert_sql = "insert into {0}_sales (id, product_id, customer_id) VALUES " \
          .format(tbl_name)
      assert client.execute(insert_sql + ",".join(sales_rows)).success

      # Select all rows from the test table.
      client.set_configuration_option("MAX_MEM_ESTIMATE_FOR_ADMISSION", "10MB")
      res = client.execute("select s.id, p.product_name, c.name from {0}_sales s "
          "inner join {0}_products p on s.product_id=p.id "
          "inner join {0}_customers c on s.customer_id=c.id".format(tbl_name),
          fetch_profile_after_close=True)
      assert res.success
      # Three creates, three inserts and the join query.
      self.cluster.get_first_impalad().service.wait_for_metric_value(
          "impala-server.completed-queries.written", 7, 60)

      # Read back the query log through a client connected to the impalad at index 1.
      client2 = self.create_client_for_nth_impalad(1, vector.get_value('protocol'))
      assert client2 is not None
      # 10485760 bytes == the 10MB MAX_MEM_ESTIMATE_FOR_ADMISSION set above.
      assert_query(self.QUERY_TBL, client2, "test_query_hist_mult", res.runtime_profile,
          max_mem_for_admission=10485760)
    finally:
      # Close the clients even if a cleanup statement fails.
      try:
        client.execute("drop table if exists {0}_sales".format(tbl_name))
        client.execute("drop table if exists {0}_customers".format(tbl_name))
        client.execute("drop table if exists {0}_products".format(tbl_name))
      finally:
        client.close()
        if client2 is not None:
          client2.close()

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=1 "
                                                 "--cluster_id=test_query_hist_3 "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60",
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=True)
  def test_query_log_table_query_insert_select(self, vector):
    """Asserts the values written to the query log table match the values from the
    query profile for a query that insert selects."""
    tbl_name = "default.test_query_log_insert_select" + str(int(time()))
    client = self.create_impala_client(protocol=vector.get_value('protocol'))
    client2 = None

    try:
      # Create the source test table.
      assert client.execute("create table {0}_source (id INT, product_name STRING) "
          .format(tbl_name)).success, "could not create source table"

      # Insert some rows into the test table.
      source_rows = []
      for i in range(1, 100):
        random_product_name = "".join(choice(string.ascii_letters) for _ in range(10))
        source_rows.append("({0},'{1}')".format(i, random_product_name))
      insert_sql = "insert into {0}_source (id,product_name) VALUES ".format(tbl_name)
      assert client.execute(insert_sql + ",".join(source_rows)).success, \
          "could not insert rows"

      # Create the destination test table.
      assert client.execute("create table {0}_dest (id INT, product_name STRING) "
          .format(tbl_name)).success, "could not create destination table"

      # Insert select from the source table to the destination table.
      res = client.execute("insert into {0}_dest (id, product_name) select id, "
          "product_name from {0}_source".format(tbl_name), fetch_profile_after_close=True)
      assert res.success, "could not insert select"

      # Two creates, the values insert and the insert select.
      self.cluster.get_first_impalad().service.wait_for_metric_value(
          "impala-server.completed-queries.written", 4, 60)

      client2 = self.create_client_for_nth_impalad(2, vector.get_value('protocol'))
      assert client2 is not None
      assert_query(self.QUERY_TBL, client2, "test_query_hist_3", res.runtime_profile)
    finally:
      # Close the clients even if a cleanup statement fails.
      try:
        client.execute("drop table if exists {0}_source".format(tbl_name))
        client.execute("drop table if exists {0}_dest".format(tbl_name))
      finally:
        client.close()
        if client2 is not None:
          client2.close()

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=15 "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60",
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=True)
  def test_query_log_table_flush_interval(self, vector):
    """Asserts that queries that have completed are written to the query log table
    after the specified write interval elapses."""

    client = self.create_impala_client(protocol=vector.get_value('protocol'))

    try:
      query_count = 10

      for _ in range(query_count):
        res = client.execute("select sleep(1000)")
        assert res.success

      # At least 10 seconds have already elapsed, wait up to 10 more seconds for the
      # queries to be written to the completed queries table.
      self.cluster.get_first_impalad().service.wait_for_metric_value(
          "impala-server.completed-queries.written", query_count, 10)
    finally:
      client.close()

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=9999 "
                                                 "--shutdown_grace_period_s=30 "
                                                 "--shutdown_deadline_s=30",
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=False)
  def test_query_log_table_flush_on_shutdown(self, vector):
    """Asserts that queries that have completed but are not yet written to the query
    log table are flushed to the table before the coordinator exits."""

    impalad = self.cluster.get_first_impalad()
    client = self.create_impala_client(protocol=vector.get_value('protocol'))
    # Initialized up front so the finally clause can tell whether it was created.
    client2 = None

    try:
      # Execute sql statements to ensure all get written to the query log table.
      sql1 = client.execute("select 1")
      assert sql1.success

      sql2 = client.execute("select 2")
      assert sql2.success

      sql3 = client.execute("select 3")
      assert sql3.success

      # With a 9999s write interval the three queries are still queued at this point.
      impalad.service.wait_for_metric_value("impala-server.completed-queries.queued", 3,
          60)

      impalad.kill_and_wait_for_exit(SIGRTMIN)

      # The first impalad is gone, so read the table through the impalad at index 1.
      client2 = self.create_client_for_nth_impalad(1, vector.get_value('protocol'))
      assert client2 is not None

      def assert_func(last_iteration):
        results = client2.execute("select query_id,sql from {0} where query_id in "
            "('{1}','{2}','{3}')".format(self.QUERY_TBL,
            sql1.query_id, sql2.query_id, sql3.query_id))

        success = len(results.data) == 3
        if last_iteration:
          assert len(results.data) == 3

        return success

      assert retry(func=assert_func, max_attempts=5, sleep_time_s=5)
    finally:
      try:
        client.close()
      finally:
        if client2 is not None:
          client2.close()
| |
| |
| class TestQueryLogTableAll(TestQueryLogTableBase): |
| """Tests to assert the query log table is correctly populated when using all the |
| client protocols.""" |
| |
# Temporary directory passed to the impalads as "--scratch_dirs=<dir>:5G" by
# test_query_log_table_query_select. Created once, when the class body executes.
SCRATCH_DIR = tempfile.mkdtemp(prefix="scratch_dir")
| |
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                               "--query_log_write_interval_s=1 "
                                               "--cluster_id=test_query_hist_2 "
                                               "--shutdown_grace_period_s=10 "
                                               "--shutdown_deadline_s=60",
                                  catalogd_args="--enable_workload_mgmt",
                                  impalad_graceful_shutdown=True)
def test_query_log_table_ddl(self, vector):
  """Asserts the values written to the query log table match the values from the
  query profile for a DDL query."""
  tbl_name = "default.test_query_log_ddl_" + str(int(time()))
  create_tbl_sql = "create table {0} (id INT, product_name STRING) " \
      "partitioned by (category INT)".format(tbl_name)
  client = self.create_impala_client(protocol=vector.get_value('protocol'))
  # Initialized up front so the finally clause can tell whether it was created.
  client2 = None

  try:
    res = client.execute(create_tbl_sql, fetch_profile_after_close=True)
    assert res.success

    self.cluster.get_first_impalad().service.wait_for_metric_value(
        "impala-server.completed-queries.written", 1, 60)

    # Verify the query log contents through a client connected to the impalad at
    # index 2.
    client2 = self.create_client_for_nth_impalad(2, vector.get_value('protocol'))
    assert client2 is not None
    assert_query(self.QUERY_TBL, client2, "test_query_hist_2", res.runtime_profile)
  finally:
    # Close the clients even if the cleanup statement fails.
    try:
      client.execute("drop table if exists {0}".format(tbl_name))
    finally:
      client.close()
      if client2 is not None:
        client2.close()
| |
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                               "--query_log_write_interval_s=1 "
                                               "--cluster_id=test_query_hist_3 "
                                               "--shutdown_grace_period_s=10 "
                                               "--shutdown_deadline_s=60",
                                  catalogd_args="--enable_workload_mgmt",
                                  impalad_graceful_shutdown=True)
def test_query_log_table_dml(self, vector):
  """Asserts the values written to the query log table match the values from the
  query profile for a DML query."""
  tbl_name = "default.test_query_log_dml_" + str(int(time()))
  create_tbl_sql = "create table {0} (id INT, product_name STRING) " \
      "partitioned by (category INT)".format(tbl_name)
  client = self.create_impala_client(protocol=vector.get_value('protocol'))
  # Initialized up front so the finally clause can tell whether it was created.
  client2 = None

  try:
    # Create the test table.
    create_tbl_results = client.execute(create_tbl_sql)
    assert create_tbl_results.success

    insert_sql = "insert into {0} (id,category,product_name) values " \
        "(0,1,'the product')".format(tbl_name)
    res = client.execute(insert_sql, fetch_profile_after_close=True)
    assert res.success

    # The create table and the insert.
    self.cluster.get_first_impalad().service.wait_for_metric_value(
        "impala-server.completed-queries.written", 2, 60)

    client2 = self.create_client_for_nth_impalad(2, vector.get_value('protocol'))
    assert client2 is not None
    assert_query(self.QUERY_TBL, client2, "test_query_hist_3", res.runtime_profile)
  finally:
    # Close the clients even if the cleanup statement fails.
    try:
      client.execute("drop table if exists {0}".format(tbl_name))
    finally:
      client.close()
      if client2 is not None:
        client2.close()
| |
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                               "--query_log_write_interval_s=1 "
                                               "--cluster_id=test_query_hist_1 "
                                               "--shutdown_grace_period_s=10 "
                                               "--shutdown_deadline_s=60 "
                                               "--scratch_dirs={0}:5G"
                                               .format(SCRATCH_DIR),
                                  catalogd_args="--enable_workload_mgmt",
                                  impalad_graceful_shutdown=True)
@pytest.mark.parametrize("buffer_pool_limit", [None, "16.05MB"])
def test_query_log_table_query_select(self, vector, buffer_pool_limit):
  """Asserts the values written to the query log table match the values from the
  query profile. If the buffer_pool_limit parameter is not None, then this test
  requires that the query spills to disk to assert that the spill metrics are correct
  in the completed queries table."""
  tbl_name = "default.test_query_log_" + str(int(time()))
  client = self.create_impala_client(protocol=vector.get_value('protocol'))
  # Initialized up front so the finally clause can tell whether it was created.
  client2 = None
  # Number of queries this test runs; used to wait until all of them are written.
  query_cnt = 0

  try:
    # Create the test table.
    create_tbl_sql = "create table {0} (id INT, product_name STRING, create_dt STRING" \
        ",descr STRING) partitioned by (category INT)".format(tbl_name)
    print("CREATE TABLE SQL: {0}".format(create_tbl_sql))
    create_tbl_results = client.execute(create_tbl_sql)
    assert create_tbl_results.success
    query_cnt += 1

    def run_insert(rows):
      """Runs one insert statement containing the given pre-formatted value tuples."""
      insert_results = client.execute("insert into {0} (id,category,product_name,"
          "create_dt,descr) VALUES ({1})".format(tbl_name, ",".join(rows)))
      assert insert_results.success

    # When buffer pool limit is not None, the test is forcing the query to spill. Thus,
    # a large number of records is needed to force the spilling. Note that
    # range(1, record_count_to_insert) yields record_count_to_insert - 1 rows.
    record_count_to_insert = 99
    if buffer_pool_limit is not None:
      record_count_to_insert = 24999

    # Rows are sent in insert statements of at most 500 rows each.
    pending_rows = []
    for i in range(1, record_count_to_insert):
      random_product_name = "".join(choice(string.ascii_letters) for _ in range(100))
      random_dt = "{:}-{:0>2}-{:0>2}".format(randint(1982, 2022), randint(1, 12),
          randint(1, 31))
      random_desc = "".join(choice(string.ascii_letters) for _ in range(1000))
      pending_rows.append("({0},{1},'{2}','{3}','{4}')".format(i, (i % 50),
          random_product_name, random_dt, random_desc))

      if i % 500 == 0:
        run_insert(pending_rows)
        query_cnt += 1
        pending_rows = []

    # Insert the remaining rows. Skipped when the last batch was already sent above,
    # because an insert with an empty value list would be invalid SQL.
    if pending_rows:
      run_insert(pending_rows)
      query_cnt += 1

    # Set up query configuration
    client.set_configuration_option("MAX_MEM_ESTIMATE_FOR_ADMISSION", "10MB")
    if buffer_pool_limit is not None:
      client.set_configuration_option("BUFFER_POOL_LIMIT", buffer_pool_limit)
      client.set_configuration_option("SPOOL_QUERY_RESULTS", "TRUE")

    # Select all rows from the test table.
    res = client.execute("select * from {0} order by create_dt".format(tbl_name),
        fetch_profile_after_close=True)
    assert res.success
    query_cnt += 1

    self.cluster.get_first_impalad().service.wait_for_metric_value(
        "impala-server.completed-queries.written", query_cnt, 60)

    client2 = self.create_client_for_nth_impalad(2, vector.get_value('protocol'))
    assert client2 is not None
    # 10485760 bytes == the 10MB MAX_MEM_ESTIMATE_FOR_ADMISSION set above.
    data = assert_query(self.QUERY_TBL, client2, "test_query_hist_1",
        res.runtime_profile, max_mem_for_admission=10485760)

    if buffer_pool_limit is not None:
      # Since the assert_query function only asserts that the compressed bytes spilled
      # column is equal to the compressed bytes spilled in the profile, there is a
      # potential for this test to not actually assert anything different than other
      # tests. Thus, an additional assert is needed to ensure that there actually was
      # data that was spilled.
      assert data[COMPRESSED_BYTES_SPILLED] != "0", "compressed bytes spilled total " \
          "was zero, test did not assert anything"
  finally:
    # Close the clients even if the cleanup statement fails.
    try:
      client.execute("drop table if exists {0}".format(tbl_name))
    finally:
      client.close()
      if client2 is not None:
        client2.close()
| |
| @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " |
| "--query_log_write_interval_s=1 " |
| "--cluster_id=test_query_hist_2 " |
| "--shutdown_grace_period_s=10 " |
| "--shutdown_deadline_s=60 ", |
| catalogd_args="--enable_workload_mgmt", |
| impalad_graceful_shutdown=True) |
| def test_query_log_table_invalid_query(self, vector): |
| """Asserts correct values are written to the completed queries table for a failed |
| query. The query profile is used as the source of expected values.""" |
| client = self.create_impala_client(protocol=vector.get_value('protocol')) |
| |
| # Assert an invalid query |
| unix_now = time() |
| try: |
| client.execute("{0}".format(unix_now)) |
| except Exception as _: |
| pass |
| |
| # Get the query id from the completed queries table since the call to execute errors |
| # instead of return the results object which contains the query id. |
| impalad = self.cluster.get_first_impalad() |
| impalad.service.wait_for_metric_value("impala-server.completed-queries.written", 1, |
| 60) |
| |
| result = client.execute("select query_id from {0} where sql='{1}'" |
| .format(self.QUERY_TBL, unix_now), |
| fetch_profile_after_close=True) |
| assert result.success |
| assert len(result.data) == 1 |
| |
| assert_query(query_tbl=self.QUERY_TBL, client=client, |
| expected_cluster_id="test_query_hist_2", impalad=impalad, query_id=result.data[0]) |
| |
| @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " |
| "--query_log_write_interval_s=1 " |
| "--shutdown_grace_period_s=10 " |
| "--shutdown_deadline_s=60", |
| catalogd_args="--enable_workload_mgmt", |
| impalad_graceful_shutdown=True) |
| def test_query_log_ignored_sqls(self, vector): |
| """Asserts that expected queries are not written to the query log table.""" |
| client = self.create_impala_client(protocol=vector.get_value('protocol')) |
| |
| sqls = {} |
| sqls["use default"] = False |
| sqls["USE default"] = False |
| sqls["uSe default"] = False |
| sqls["--mycomment\nuse default"] = False |
| sqls["/*mycomment*/ use default"] = False |
| |
| sqls["set all"] = False |
| sqls["SET all"] = False |
| sqls["SeT all"] = False |
| sqls["--mycomment\nset all"] = False |
| sqls["/*mycomment*/ set all"] = False |
| |
| sqls["show tables"] = False |
| sqls["SHOW tables"] = False |
| sqls["ShoW tables"] = False |
| sqls["ShoW create table {0}".format(self.QUERY_TBL)] = False |
| sqls["show databases"] = False |
| sqls["SHOW databases"] = False |
| sqls["ShoW databases"] = False |
| sqls["show schemas"] = False |
| sqls["SHOW schemas"] = False |
| sqls["ShoW schemas"] = False |
| sqls["--mycomment\nshow tables"] = False |
| sqls["/*mycomment*/ show tables"] = False |
| sqls["/*mycomment*/ show tables"] = False |
| sqls["/*mycomment*/ show create table {0}".format(self.QUERY_TBL)] = False |
| sqls["/*mycomment*/ show files in {0}".format(self.QUERY_TBL)] = False |
| sqls["/*mycomment*/ show functions"] = False |
| sqls["/*mycomment*/ show data sources"] = False |
| sqls["/*mycomment*/ show views"] = False |
| |
| sqls["describe database default"] = False |
| sqls["/*mycomment*/ describe database default"] = False |
| sqls["describe {0}".format(self.QUERY_TBL)] = False |
| sqls["/*mycomment*/ describe {0}".format(self.QUERY_TBL)] = False |
| sqls["describe history {0}".format(self.QUERY_TBL)] = False |
| sqls["/*mycomment*/ describe history {0}".format(self.QUERY_TBL)] = False |
| sqls["select 1"] = True |
| |
| control_queries_count = 0 |
| try: |
| for sql, experiment_control in sqls.items(): |
| results = client.execute(sql) |
| assert results.success, "could not execute query '{0}'".format(sql) |
| sqls[sql] = results.query_id |
| |
| # Ensure at least one sql statement was written to the completed queries table |
| # to avoid false negatives where the sql statements that are ignored are not |
| # written to the completed queries table because of another issue. |
| if experiment_control: |
| control_queries_count += 1 |
| sql_results = None |
| for _ in range(6): |
| sql_results = client.execute("select * from {0} where query_id='{1}'".format( |
| self.QUERY_TBL, results.query_id)) |
| control_queries_count += 1 |
| if sql_results.success and len(sql_results.data) == 1: |
| break |
| else: |
| sleep(5) |
| assert sql_results.success |
| assert len(sql_results.data) == 1, "query not found in completed queries table" |
| sqls.pop(sql) |
| |
| for sql, query_id in sqls.items(): |
| log_results = client.execute("select * from {0} where query_id='{1}'" |
| .format(self.QUERY_TBL, query_id)) |
| assert log_results.success |
| assert len(log_results.data) == 0, "found query in query log table: {0}" \ |
| .format(sql) |
| finally: |
| client.close() |
| |
| # Assert there was one query per sql item written to the query log table. The queries |
| # inserted into the completed queries table are the queries used to assert the ignored |
| # queries were not written to the table. |
| self.cluster.get_first_impalad().service.wait_for_metric_value( |
| "impala-server.completed-queries.written", len(sqls) + control_queries_count, 60) |
| assert self.cluster.get_first_impalad().service.get_metric_value( |
| "impala-server.completed-queries.failure") == 0 |
| |
| @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " |
| "--query_log_write_interval_s=1 " |
| "--shutdown_grace_period_s=10 " |
| "--shutdown_deadline_s=60", |
| catalogd_args="--enable_workload_mgmt", |
| impalad_graceful_shutdown=True) |
| def test_query_log_table_sql_injection(self, vector): |
| tbl_name = "default.test_query_log_sql_injection_" + str(int(time())) |
| client = self.create_impala_client(protocol=vector.get_value('protocol')) |
| |
| try: |
| # Create the test table. |
| create_tbl_sql = "create table {0} (id INT, product_name STRING) " \ |
| "partitioned by (category INT)".format(tbl_name) |
| create_tbl_results = client.execute(create_tbl_sql) |
| assert create_tbl_results.success |
| |
| # Insert some rows into the test table. |
| insert_sql = "insert into {0} (id,category,product_name) VALUES ".format(tbl_name) |
| for i in range(1, 11): |
| for j in range(1, 11): |
| if i * j > 1: |
| insert_sql += "," |
| |
| insert_sql += "({0},{1},'{2}')".format((i * j), i, |
| "product-{0}-{1}".format(i, j)) |
| |
| insert_results = client.execute(insert_sql) |
| assert insert_results.success |
| |
| impalad = self.cluster.get_first_impalad() |
| |
| # Try a sql injection attack with closing quotes. |
| sql1_str = "select * from {0} where product_name='product-2-3'".format(tbl_name) |
| self.__run_sql_inject(impalad, client, sql1_str, "closing quotes", 3) |
| |
| # Try a sql inject attack with terminating quote and semicolon. |
| sql2_str = "select 1'); drop table {0}; select('" \ |
| .format(self.QUERY_TBL) |
| self.__run_sql_inject(impalad, client, sql2_str, "terminating semicolon", 6) |
| |
| # Attempt to cause an error using multiline comments. |
| sql3_str = "select 1' /* foo" |
| self.__run_sql_inject(impalad, client, sql3_str, "multiline comments", 9, False) |
| |
| # Attempt to cause an error using single line comments. |
| sql4_str = "select 1' -- foo" |
| self.__run_sql_inject(impalad, client, sql4_str, "single line comments", 12, False) |
| |
| finally: |
| client.execute("drop table if exists {0}".format(tbl_name)) |
| client.close() |
| |
| def __run_sql_inject(self, impalad, client, sql, test_case, expected_writes, |
| expect_success=True): |
| sql_result = None |
| try: |
| sql_result = client.execute(sql) |
| except Exception as e: |
| if expect_success: |
| raise e |
| |
| if expect_success: |
| assert sql_result.success |
| |
| impalad.service.wait_for_metric_value( |
| "impala-server.completed-queries.written", expected_writes, 60) |
| |
| # Force Impala to process the inserts to the completed queries table. |
| client.execute("refresh " + self.QUERY_TBL) |
| |
| if expect_success: |
| sql_verify = client.execute( |
| "select sql from {0} where query_id='{1}'" |
| .format(self.QUERY_TBL, sql_result.query_id)) |
| |
| assert sql_verify.success, test_case |
| assert len(sql_verify.data) == 1, "did not find query '{0}' in query log " \ |
| "table for test case '{1}" \ |
| .format(sql_result.query_id, test_case) |
| assert sql_verify.data[0] == sql, test_case |
| else: |
| esc_sql = sql.replace("'", "\\'") |
| sql_verify = client.execute("select sql from {0} where sql='{1}' " |
| "and start_time_utc > " |
| "date_sub(utc_timestamp(), interval 25 seconds);" |
| .format(self.QUERY_TBL, esc_sql)) |
| assert sql_verify.success, test_case |
| assert len(sql_verify.data) == 1, "did not find query '{0}' in query log " \ |
| "table for test case '{1}" \ |
| .format(esc_sql, test_case) |