# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import logging
import os
import psutil
import pytest
import re
import signal
import socket
import threading
import time

from subprocess import check_call
from tests.common.environ import build_flavor_timeout
from time import sleep

from impala.error import HiveServer2Error
from TCLIService import TCLIService

from beeswaxd.BeeswaxService import QueryState
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import SkipIfNotHdfsMinicluster
from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session

LOG = logging.getLogger(__name__)


class TestRestart(CustomClusterTestSuite):
  @classmethod
  def get_workload(cls):
    return 'functional-query'

  @pytest.mark.execute_serially
  def test_restart_statestore(self, cursor):
    """Regression test for IMPALA-6973. After the statestore restarts, the metadata
    should eventually recover after being cleared by the new statestore.
    """

    self.cluster.statestored.restart()
    # We need to wait for the impalad to register to the new statestored and for a
    # non-empty catalog update from the new statestored. It cannot be expressed with the
    # existing metrics yet so we wait for some time here.
    wait_time_s = build_flavor_timeout(60, slow_build_timeout=100)
    sleep(wait_time_s)
    for retry in xrange(wait_time_s):
      try:
        cursor.execute("describe database functional")
        return
      except HiveServer2Error as e:
        assert "AnalysisException: Database does not exist: functional" in e.message,\
            "Unexpected exception: " + e.message
        sleep(1)
    assert False, "Coordinator never received non-empty metadata from the restarted " \
        "statestore after {0} seconds".format(wait_time_s)

  @pytest.mark.execute_serially
  def test_restart_impala(self):
    """Restarts impalad executor nodes between queries to exercise the cluster
    membership callback, which removes stale connections to the restarted nodes."""

    self._start_impala_cluster([], num_coordinators=1, cluster_size=3)
    assert len(self.cluster.impalads) == 3

    client = self.cluster.impalads[0].service.create_beeswax_client()
    assert client is not None

    for i in xrange(5):
      self.execute_query_expect_success(client, "select * from functional.alltypes")
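      # Alternate between restarting the second and third impalad. impalads[0] is left
      # untouched because the beeswax client above is connected to it.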
      node_to_restart = 1 + (i % 2)
      self.cluster.impalads[node_to_restart].restart()
      # Sleep for a bit for the statestore change in membership to propagate. The min
      # update frequency for the statestore is 100ms, but we use a larger sleep time
      # here as certain builds (e.g. ASAN) can be really slow.
      sleep(3)

    client.close()

  @pytest.mark.execute_serially
  def test_catalog_connection_retries(self):
    """Test that connections to the catalogd are retried, both new connections and
    cached connections."""
    # Since this is a custom cluster test, each impalad should start off with no cached
    # connections to the catalogd. So the first call to __test_catalog_connection_retries
    # should test that new connections are retried.
    coordinator_service = self.cluster.impalads[0].service
    assert coordinator_service.get_metric_value(
        "catalog.server.client-cache.total-clients") == 0
    self.__test_catalog_connection_retries()

    # Since a query was just run that required loading metadata from the catalogd, there
    # should be a cached connection to the catalogd, so the second call to
    # __test_catalog_connection_retries should assert that broken cached connections are
    # retried.
    assert coordinator_service.get_metric_value(
        "catalog.server.client-cache.total-clients") == 1
    self.__test_catalog_connection_retries()

  def __test_catalog_connection_retries(self):
    """Test that a query retries connecting to the catalogd. Kills the catalogd,
    launches a query that requires catalogd access, starts the catalogd, and then
    validates that the query eventually finishes successfully."""
    self.cluster.catalogd.kill_and_wait_for_exit()

    query = "select * from functional.alltypes limit 10"
    query_handle = []

    # self.execute_query_async has to be run in a dedicated thread because it does not
    # truly run the query asynchronously: query compilation has to complete before
    # execute_query_async can return. Since compilation requires catalogd access,
    # execute_query_async won't return until the catalogd is up and running.
    def execute_query_async():
      query_handle.append(self.execute_query_async(query))

    thread = threading.Thread(target=execute_query_async)
    thread.start()
    # Sleep until the query actually tries to access the catalogd. Use a generous value
    # to avoid any race conditions. The connection is retried 3 times by default with a
    # 10 second interval, so a long sleep here should not cause any timeouts.
    sleep(5)

    self.cluster.catalogd.start()
    thread.join()
    self.wait_for_state(query_handle[0], QueryState.FINISHED, 30000)

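  # Flag values for test_restart_statestore_query_resilience below: the statestore
  # subscriber timeout, and the post-recovery grace period during which an apparent
  # membership change must not cause queries to be cancelled.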
  SUBSCRIBER_TIMEOUT_S = 2
  CANCELLATION_GRACE_PERIOD_S = 5

  @pytest.mark.execute_serially
  @SkipIfNotHdfsMinicluster.scheduling
  @CustomClusterTestSuite.with_args(
      impalad_args="--statestore_subscriber_timeout_seconds={timeout_s} "
                   "--statestore_subscriber_recovery_grace_period_ms={recovery_period_ms}"
                   .format(timeout_s=SUBSCRIBER_TIMEOUT_S,
                           recovery_period_ms=(CANCELLATION_GRACE_PERIOD_S * 1000)),
      catalogd_args="--statestore_subscriber_timeout_seconds={timeout_s}".format(
          timeout_s=SUBSCRIBER_TIMEOUT_S))
  def test_restart_statestore_query_resilience(self):
    """IMPALA-7665: Test that a momentarily inconsistent cluster membership state
    after restarting the statestore does not result in query cancellation. Also make
    sure that queries do get cancelled if a backend actually went down while the
    statestore was down or during the grace period."""
    slow_query = \
        "select distinct * from tpch_parquet.lineitem where l_orderkey > sleep(1000)"
    impalad = self.cluster.impalads[0]
    client = impalad.service.create_beeswax_client()
    try:
      handle = client.execute_async(slow_query)
      # Make sure the query starts running.
      self.wait_for_state(handle, QueryState.RUNNING, 1000)
      profile = client.get_runtime_profile(handle)
      assert "NumBackends: 3" in profile, profile
      # Restart the statestore and wait until the grace period ends, plus some buffer.
      self.cluster.statestored.restart()
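      # All four statestore subscribers (the three impalads and the catalogd) should
      # re-register with the restarted statestore.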
      self.cluster.statestored.service.wait_for_live_subscribers(4)
      sleep(self.CANCELLATION_GRACE_PERIOD_S + 1)
      assert client.get_state(handle) == QueryState.RUNNING
      # Now kill the statestore, kill a backend while the statestore is down, and make
      # sure the query fails once the statestore comes back up.
      start_time = time.time()
      self.cluster.statestored.kill()
      self.cluster.impalads[1].kill()
      self.cluster.statestored.start()
      try:
        client.wait_for_finished_timeout(handle, 100)
        assert False, "Query expected to fail"
      except ImpalaBeeswaxException as e:
        assert "Failed due to unreachable impalad" in str(e), str(e)
        assert time.time() - start_time > self.CANCELLATION_GRACE_PERIOD_S + \
            self.SUBSCRIBER_TIMEOUT_S, \
            "Query got cancelled earlier than the cancellation grace period"
      # Now restart the statestore and kill a backend after it comes back up, and make
      # sure the query eventually fails.
      # Make sure the new statestore has received an update from the catalogd and sent
      # it to the impalad.
      catalogd_version = self.cluster.catalogd.service.get_catalog_version()
      impalad.service.wait_for_metric_value("catalog.curr-version", catalogd_version)
      handle = client.execute_async(slow_query)
      self.wait_for_state(handle, QueryState.RUNNING, 1000)
      profile = client.get_runtime_profile(handle)
      assert "NumBackends: 2" in profile, profile
      start_time = time.time()
      self.cluster.statestored.restart()
      # Make sure it has connected to the remaining impalads before killing one.
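      # (3 live subscribers: the two remaining impalads plus the catalogd.)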
      self.cluster.statestored.service.wait_for_live_subscribers(3)
      self.cluster.impalads[2].kill()
      try:
        client.wait_for_finished_timeout(handle, 100)
        assert False, "Query expected to fail"
      except ImpalaBeeswaxException as e:
        assert "Failed due to unreachable impalad" in str(e), str(e)
        assert time.time() - start_time > self.CANCELLATION_GRACE_PERIOD_S + \
            self.SUBSCRIBER_TIMEOUT_S, \
            "Query got cancelled earlier than the cancellation grace period"
    finally:
      client.close()


def parse_shutdown_result(result):
  """Parse the shutdown result string and return the strings (grace left,
  deadline left, queries registered, queries executing)."""
  assert len(result.data) == 1
  summary = result.data[0]
  match = re.match(r'shutdown grace period left: ([0-9ms]*), deadline left: ([0-9ms]*), '
                   r'queries registered on coordinator: ([0-9]*), queries executing: '
                   r'([0-9]*), fragment instances: [0-9]*', summary)
  assert match is not None, summary
  return match.groups()


class TestGracefulShutdown(CustomClusterTestSuite, HS2TestSuite):
  IDLE_SHUTDOWN_GRACE_PERIOD_S = 1
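  # Impala daemons treat IMPALA_SHUTDOWN_SIGNAL (SIGRTMIN) as a request to start a
  # graceful shutdown with the default grace period and deadline, as the signal tests
  # below verify.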
  IMPALA_SHUTDOWN_SIGNAL = signal.SIGRTMIN

  @classmethod
  def get_workload(cls):
    return 'functional-query'

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args="--shutdown_grace_period_s={grace_period} \
          --hostname={hostname}".format(grace_period=IDLE_SHUTDOWN_GRACE_PERIOD_S,
            hostname=socket.gethostname()))
  def test_shutdown_idle(self):
    """Test that idle impalads shut down in a timely manner after the shutdown grace
    period elapses."""
    impalad1 = psutil.Process(self.cluster.impalads[0].get_pid())
    impalad2 = psutil.Process(self.cluster.impalads[1].get_pid())
    impalad3 = psutil.Process(self.cluster.impalads[2].get_pid())

    # Test that a shutdown request aimed at a bogus host or port fails gracefully.
    ex = self.execute_query_expect_failure(self.client,
        ":shutdown('e6c00ca5cd67b567eb96c6ecfb26f05')")
    assert "Could not find IPv4 address for:" in str(ex)
    ex = self.execute_query_expect_failure(self.client, ":shutdown('localhost:100000')")
    assert "Invalid port:" in str(ex)
    assert ("This may be because the port specified is wrong.") not in str(ex)

    # Test that pointing to the wrong thrift service (the HS2 port) fails gracefully-ish.
    thrift_ports = [21051, 22001]  # HS2 port, old backend port.
    for port in thrift_ports:
      ex = self.execute_query_expect_failure(self.client,
          ":shutdown('localhost:{0}')".format(port))
      assert ("failed with error 'RemoteShutdown() RPC failed") in str(ex)
      assert ("This may be because the port specified is wrong.") in str(ex)

    # Test RPC error handling with a debug action.
    ex = self.execute_query_expect_failure(self.client, ":shutdown('localhost:27001')",
        query_options={'debug_action': 'CRS_SHUTDOWN_RPC:FAIL'})
    assert 'Rpc to 127.0.0.1:27001 failed with error \'Debug Action: ' \
        'CRS_SHUTDOWN_RPC:FAIL' in str(ex)

    # Test remote shutdown.
    LOG.info("Start remote shutdown {0}".format(time.time()))
    self.execute_query_expect_success(self.client, ":shutdown('localhost:27001')",
        query_options={})

    # Remote shutdown does not require the statestore.
    self.cluster.statestored.kill()
    self.cluster.statestored.wait_for_exit()
    self.execute_query_expect_success(self.client, ":shutdown('localhost:27002')",
        query_options={})

    # Test local shutdown, which should succeed even with an injected RPC error.
    LOG.info("Start local shutdown {0}".format(time.time()))
    self.execute_query_expect_success(self.client,
        ":shutdown('{0}:27000')".format(socket.gethostname()),
        query_options={'debug_action': 'CRS_SHUTDOWN_RPC:FAIL'})

    # Make sure that the impala daemons exit after the shutdown grace period plus a 10
    # second margin of error.
    start_time = time.time()
    LOG.info("Waiting for impalads to exit {0}".format(start_time))
    impalad1.wait()
    LOG.info("First impalad exited {0}".format(time.time()))
    impalad2.wait()
    LOG.info("Second impalad exited {0}".format(time.time()))
    impalad3.wait()
    LOG.info("Third impalad exited {0}".format(time.time()))
    shutdown_duration = time.time() - start_time
    assert shutdown_duration <= self.IDLE_SHUTDOWN_GRACE_PERIOD_S + 10

  EXEC_SHUTDOWN_GRACE_PERIOD_S = 5
  EXEC_SHUTDOWN_DEADLINE_S = 10

  @pytest.mark.execute_serially
  @SkipIfNotHdfsMinicluster.scheduling
  @CustomClusterTestSuite.with_args(
      impalad_args="--shutdown_grace_period_s={grace_period} \
          --shutdown_deadline_s={deadline} \
          --hostname={hostname}".format(grace_period=EXEC_SHUTDOWN_GRACE_PERIOD_S,
            deadline=EXEC_SHUTDOWN_DEADLINE_S, hostname=socket.gethostname()))
  def test_shutdown_executor(self):
    self.do_test_shutdown_executor(fetch_delay_s=0)

  @pytest.mark.execute_serially
  @SkipIfNotHdfsMinicluster.scheduling
  @CustomClusterTestSuite.with_args(
      impalad_args="--shutdown_grace_period_s={grace_period} \
          --shutdown_deadline_s={deadline} \
          --stress_status_report_delay_ms={status_report_delay_ms} \
          --hostname={hostname}".format(grace_period=EXEC_SHUTDOWN_GRACE_PERIOD_S,
            deadline=EXEC_SHUTDOWN_DEADLINE_S, status_report_delay_ms=5000,
            hostname=socket.gethostname()))
  def test_shutdown_executor_with_delay(self):
    """Regression test for IMPALA-7931 that adds delays to status reporting and to
    fetching of results to trigger races that previously resulted in query failures."""
    if self.exploration_strategy() != 'exhaustive':
      pytest.skip()
    self.do_test_shutdown_executor(fetch_delay_s=5)

  def do_test_shutdown_executor(self, fetch_delay_s):
    """Implementation of test that shuts down and then restarts an executor. This
    should not disrupt any queries that start after the shutdown or complete before the
    shutdown time limit. The test is parameterized by 'fetch_delay_s', the amount to
    delay before fetching from the query that must survive shutdown of an executor."""
    # Add sleeps to make sure that the query takes a couple of seconds to execute on the
    # executors.
    QUERY = "select count(*) from functional_parquet.alltypes where sleep(1) = bool_col"
    # Subtle: use a splittable file format like text for lineitem so that each backend
    # is guaranteed to get scan ranges that contain some actual rows. With Parquet on
    # S3, the files get broken into 32MB scan ranges and a backend might get unlucky
    # and only get scan ranges that don't contain the midpoint of any row group, and
    # therefore not actually produce any rows.
    SLOW_QUERY = "select count(*) from tpch.lineitem where sleep(1) = l_orderkey"
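    # ':shutdown()' takes a backend address; in the minicluster, port 27001 is the KRPC
    # port of the second impalad (self.cluster.impalads[1]).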
    SHUTDOWN_EXEC2 = ": shutdown('localhost:27001')"

    # Run this query before the shutdown and make sure that it executes successfully on
    # all executors through the shutdown grace period without disruption.
    before_shutdown_handle = self.__exec_and_wait_until_running(QUERY)

    # Run this query, which simulates getting stuck in admission control until after
    # the shutdown grace period expires. This demonstrates that queries don't get
    # cancelled if the cluster membership changes while they're waiting for admission.
    before_shutdown_admission_handle = self.execute_query_async(QUERY,
        {'debug_action': 'CRS_BEFORE_ADMISSION:SLEEP@30000'})

    # Shut down and wait for the shutdown state to propagate through the statestore.
    result = self.execute_query_expect_success(self.client, SHUTDOWN_EXEC2)
    assert parse_shutdown_result(result) == (
        "{0}s000ms".format(self.EXEC_SHUTDOWN_GRACE_PERIOD_S),
        "{0}s000ms".format(self.EXEC_SHUTDOWN_DEADLINE_S), "0", "1")

    # Check that the status is reflected on the debug page.
    web_json = self.cluster.impalads[1].service.get_debug_webpage_json("")
    assert web_json.get('is_quiescing', None) is True, web_json
    assert 'shutdown_status' in web_json, web_json

    self.impalad_test_service.wait_for_num_known_live_backends(2,
        timeout=self.EXEC_SHUTDOWN_GRACE_PERIOD_S + 5, interval=0.2,
        include_shutting_down=False)

    # Run another query, which shouldn't get scheduled on the quiescing executor. We'll
    # let this query continue running through the full shutdown and restart cycle.
    after_shutdown_handle = self.__exec_and_wait_until_running(QUERY)

    # Wait for the impalad to exit.
    self.cluster.impalads[1].wait_for_exit()

    # Finish fetching results from the first query (which will be buffered on the
    # coordinator) after the backend exits. Add a delay before fetching to ensure
    # that the query is not torn down on the coordinator when the failure is
    # detected by the statestore (see IMPALA-7931).
    assert self.__fetch_and_get_num_backends(
        QUERY, before_shutdown_handle, delay_s=fetch_delay_s) == 3

    # Confirm that the query stuck in admission succeeded.
    assert self.__fetch_and_get_num_backends(
        QUERY, before_shutdown_admission_handle, timeout_s=30) == 2

    # Start the impalad back up and run another query, which should be scheduled on it
    # again.
    self.cluster.impalads[1].start()
    self.impalad_test_service.wait_for_num_known_live_backends(
        3, timeout=30, interval=0.2, include_shutting_down=False)
    after_restart_handle = self.__exec_and_wait_until_running(QUERY)

    # The query started while the backend was shut down should not run on that backend,
    # but the query started after the restart should.
    assert self.__fetch_and_get_num_backends(QUERY, after_shutdown_handle) == 2
    assert self.__fetch_and_get_num_backends(QUERY, after_restart_handle) == 3

    # Test that a query fails if the executor is still running it when the shutdown
    # deadline expires.
    deadline_expiry_handle = self.__exec_and_wait_until_running(SLOW_QUERY)
    result = self.execute_query_expect_success(self.client, SHUTDOWN_EXEC2)
    assert parse_shutdown_result(result) == (
        "{0}s000ms".format(self.EXEC_SHUTDOWN_GRACE_PERIOD_S),
        "{0}s000ms".format(self.EXEC_SHUTDOWN_DEADLINE_S), "0", "1")
    self.cluster.impalads[1].wait_for_exit()
    self.__check_deadline_expired(SLOW_QUERY, deadline_expiry_handle)

    # Test that we can reduce the deadline after setting it to a high value.
    # Run a query that will fail as a result of the reduced deadline.
    deadline_expiry_handle = self.__exec_and_wait_until_running(SLOW_QUERY)
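    # The optional second argument to ':shutdown()' overrides the shutdown deadline (in
    # seconds) on the targeted backend; 27002 is the third impalad (impalads[2]).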
    SHUTDOWN_EXEC3 = ": shutdown('localhost:27002', {0})"
    VERY_HIGH_DEADLINE = 5000
    HIGH_DEADLINE = 1000
    LOW_DEADLINE = 5
    result = self.execute_query_expect_success(
        self.client, SHUTDOWN_EXEC3.format(HIGH_DEADLINE))
    grace, deadline, _, _ = parse_shutdown_result(result)
    assert grace == "{0}s000ms".format(self.EXEC_SHUTDOWN_GRACE_PERIOD_S)
    assert deadline == "{0}m{1}s".format(HIGH_DEADLINE / 60, HIGH_DEADLINE % 60)

    result = self.execute_query_expect_success(
        self.client, SHUTDOWN_EXEC3.format(VERY_HIGH_DEADLINE))
    _, deadline, _, _ = parse_shutdown_result(result)
    LOG.info("Deadline is {0}".format(deadline))
    min_string, sec_string = re.match("([0-9]*)m([0-9]*)s", deadline).groups()
    assert int(min_string) * 60 + int(sec_string) <= HIGH_DEADLINE, \
        "Cannot increase deadline " + deadline

    result = self.execute_query_expect_success(
        self.client, SHUTDOWN_EXEC3.format(LOW_DEADLINE))
    _, deadline, _, queries_executing = parse_shutdown_result(result)
    assert deadline == "{0}s000ms".format(LOW_DEADLINE)
    assert int(queries_executing) > 0, "Slow query should still be running."
    self.cluster.impalads[2].wait_for_exit()
    self.__check_deadline_expired(SLOW_QUERY, deadline_expiry_handle)

  COORD_SHUTDOWN_GRACE_PERIOD_S = 5
  COORD_SHUTDOWN_DEADLINE_S = 120

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args="--shutdown_grace_period_s={grace_period} \
          --shutdown_deadline_s={deadline} \
          --hostname={hostname}".format(
            grace_period=COORD_SHUTDOWN_GRACE_PERIOD_S,
            deadline=COORD_SHUTDOWN_DEADLINE_S, hostname=socket.gethostname()),
      default_query_options=[("num_scanner_threads", "1")])
  @needs_session(TCLIService.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6,
                 close_session=False)
  def test_shutdown_coordinator(self):
    """Test that shuts down the coordinator. Running queries should finish but new
    requests should be rejected."""
    # Start a query running. This should complete successfully and keep the coordinator
    # up until it finishes. We set NUM_SCANNER_THREADS=1 above to make the runtime more
    # predictable.
    SLOW_QUERY = """select * from tpch_parquet.lineitem where sleep(1) < l_orderkey"""
    SHUTDOWN = ": shutdown()"
    SHUTDOWN_ERROR_PREFIX = 'Server is being shut down:'

    before_shutdown_handle = self.__exec_and_wait_until_running(SLOW_QUERY)
    before_shutdown_hs2_handle = self.execute_statement(SLOW_QUERY).operationHandle

    # Shut down the coordinator. Operations that start after this point should fail.
    result = self.execute_query_expect_success(self.client, SHUTDOWN)
    grace, deadline, registered, _ = parse_shutdown_result(result)
    assert grace == "{0}s000ms".format(self.COORD_SHUTDOWN_GRACE_PERIOD_S)
    assert deadline == "{0}m".format(self.COORD_SHUTDOWN_DEADLINE_S / 60), deadline
    assert registered == "3"

    # Expect that the beeswax shutdown error occurs when calling fn().
    def expect_beeswax_shutdown_error(fn):
      try:
        fn()
      except ImpalaBeeswaxException as e:
        assert SHUTDOWN_ERROR_PREFIX in str(e)
    expect_beeswax_shutdown_error(lambda: self.client.execute("select 1"))
    expect_beeswax_shutdown_error(lambda: self.client.execute_async("select 1"))

    # Test that the HS2 shutdown error occurs for various HS2 operations.
    self.execute_statement("select 1", None, TCLIService.TStatusCode.ERROR_STATUS,
        SHUTDOWN_ERROR_PREFIX)

    def check_hs2_shutdown_error(hs2_response):
      HS2TestSuite.check_response(hs2_response, TCLIService.TStatusCode.ERROR_STATUS,
          SHUTDOWN_ERROR_PREFIX)
    check_hs2_shutdown_error(self.hs2_client.OpenSession(TCLIService.TOpenSessionReq()))
    check_hs2_shutdown_error(self.hs2_client.GetInfo(TCLIService.TGetInfoReq(
        self.session_handle, TCLIService.TGetInfoType.CLI_MAX_DRIVER_CONNECTIONS)))
    check_hs2_shutdown_error(self.hs2_client.GetTypeInfo(
        TCLIService.TGetTypeInfoReq(self.session_handle)))
    check_hs2_shutdown_error(self.hs2_client.GetCatalogs(
        TCLIService.TGetCatalogsReq(self.session_handle)))
    check_hs2_shutdown_error(self.hs2_client.GetSchemas(
        TCLIService.TGetSchemasReq(self.session_handle)))
    check_hs2_shutdown_error(self.hs2_client.GetTables(
        TCLIService.TGetTablesReq(self.session_handle)))
    check_hs2_shutdown_error(self.hs2_client.GetTableTypes(
        TCLIService.TGetTableTypesReq(self.session_handle)))
    check_hs2_shutdown_error(self.hs2_client.GetColumns(
        TCLIService.TGetColumnsReq(self.session_handle)))
    check_hs2_shutdown_error(self.hs2_client.GetFunctions(
        TCLIService.TGetFunctionsReq(self.session_handle, functionName="")))

    # Operations on the already-running HS2 query still work.
    self.fetch_until(before_shutdown_hs2_handle,
        TCLIService.TFetchOrientation.FETCH_NEXT, 10)
    HS2TestSuite.check_response(self.hs2_client.CancelOperation(
        TCLIService.TCancelOperationReq(before_shutdown_hs2_handle)))
    HS2TestSuite.check_response(self.hs2_client.CloseOperation(
        TCLIService.TCloseOperationReq(before_shutdown_hs2_handle)))

    # Make sure the beeswax query submitted before shutdown still runs to completion,
    # then close it to allow the coordinator to shut down.
    self.impalad_test_service.wait_for_query_state(self.client, before_shutdown_handle,
        self.client.QUERY_STATES['FINISHED'], timeout=20)
    self.client.close_query(before_shutdown_handle)
    self.cluster.impalads[0].wait_for_exit()

  def __exec_and_wait_until_running(self, query, timeout=20):
    """Execute 'query' with self.client and wait until it is in the RUNNING state.
    'timeout' controls how long we will wait."""
    # Fix the number of scanner threads to make the runtime more deterministic.
    handle = self.execute_query_async(query, {'num_scanner_threads': 1})
    self.impalad_test_service.wait_for_query_state(self.client, handle,
        self.client.QUERY_STATES['RUNNING'], timeout=timeout)
    return handle

  def __fetch_and_get_num_backends(self, query, handle, delay_s=0, timeout_s=20):
    """Fetch the results of 'query' from the beeswax handle 'handle', close the
    query and return the number of backends obtained from the runtime profile."""
    self.impalad_test_service.wait_for_query_state(self.client, handle,
        self.client.QUERY_STATES['FINISHED'], timeout=timeout_s)
    if delay_s > 0:
      LOG.info("Sleeping for {0}s".format(delay_s))
      time.sleep(delay_s)
    self.client.fetch(query, handle)
    profile = self.client.get_runtime_profile(handle)
    self.client.close_query(handle)
    backends_match = re.search("NumBackends: ([0-9]*)", profile)
    assert backends_match is not None, profile
    return int(backends_match.group(1))

  def __check_deadline_expired(self, query, handle):
    """Check that the query with 'handle' fails because of a backend hitting the
    deadline and shutting down."""
    try:
      self.client.fetch(query, handle)
      assert False, "Expected query to fail"
    except Exception as e:
      assert 'Failed due to unreachable impalad(s)' in str(e)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args="--shutdown_grace_period_s={grace_period} \
          --hostname={hostname}".format(grace_period=IDLE_SHUTDOWN_GRACE_PERIOD_S,
            hostname=socket.gethostname()), cluster_size=1)
  def test_shutdown_signal(self):
    """Test that an idle impalad shuts down in a timely manner after the shutdown grace
    period elapses."""
    impalad = psutil.Process(self.cluster.impalads[0].get_pid())
    LOG.info(
        "Sending IMPALA_SHUTDOWN_SIGNAL (SIGRTMIN = {0}) to impalad PID = {1}".format(
            self.IMPALA_SHUTDOWN_SIGNAL, impalad.pid))
    impalad.send_signal(self.IMPALA_SHUTDOWN_SIGNAL)
    # Make sure that the impala daemon exits after the shutdown grace period plus a 10
    # second margin of error.
    start_time = time.time()
    LOG.info("Waiting for impalad to exit {0}".format(start_time))
    impalad.wait()
    shutdown_duration = time.time() - start_time
    assert shutdown_duration <= self.IDLE_SHUTDOWN_GRACE_PERIOD_S + 10
    # Make sure the signal was received and the grace period and deadline are as
    # expected.
    self.assert_impalad_log_contains('INFO',
        "Shutdown signal received. Current Shutdown Status: shutdown grace period left: "
        "{0}s000ms, deadline left: 8760h".format(self.IDLE_SHUTDOWN_GRACE_PERIOD_S))

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(cluster_size=1)
  def test_sending_multiple_shutdown_signals(self):
    """Test that multiple IMPALA_SHUTDOWN_SIGNAL signals are all handled without
    crashing the process."""
    impalad = psutil.Process(self.cluster.impalads[0].get_pid())
    NUM_SIGNALS_TO_SEND = 10
    LOG.info(
        "Sending {0} IMPALA_SHUTDOWN_SIGNAL (SIGRTMIN = {1}) signals to impalad "
        "PID = {2}".format(NUM_SIGNALS_TO_SEND, self.IMPALA_SHUTDOWN_SIGNAL, impalad.pid))
    for i in range(NUM_SIGNALS_TO_SEND):
      impalad.send_signal(self.IMPALA_SHUTDOWN_SIGNAL)
    # Give the shutdown thread some time to wake up and handle all the signals to avoid
    # flakiness.
    sleep(5)
    # Make sure all signals were received and the process is still up.
    self.assert_impalad_log_contains('INFO', "Shutdown signal received.",
        NUM_SIGNALS_TO_SEND)
    assert impalad.is_running(), "Impalad process should still be running."

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args="--shutdown_grace_period_s={grace_period} \
          --hostname={hostname}".format(grace_period=IDLE_SHUTDOWN_GRACE_PERIOD_S,
            hostname=socket.gethostname()), cluster_size=1)
  def test_graceful_shutdown_script(self):
    impalad = psutil.Process(self.cluster.impalads[0].get_pid())
    script = os.path.join(os.environ['IMPALA_HOME'], 'bin',
        'graceful_shutdown_backends.sh')
    start_time = time.time()
    check_call([script, str(self.IDLE_SHUTDOWN_GRACE_PERIOD_S)])
    LOG.info("Waiting for impalad to exit {0}".format(start_time))
    impalad.wait()
    shutdown_duration = time.time() - start_time
    assert shutdown_duration <= self.IDLE_SHUTDOWN_GRACE_PERIOD_S + 10