#!/usr/bin/env impala-python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import logging
import pytest
from time import sleep, time

from tests.util.auto_scaler import AutoScaler
from tests.util.concurrent_workload import ConcurrentWorkload
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import SkipIfEC

LOG = logging.getLogger("test_auto_scaling")


class TestAutoScaling(CustomClusterTestSuite):
  """This class contains tests that exercise the logic related to scaling clusters up and
  down by adding and removing groups of executors."""

  @classmethod
  def get_workload(cls):
    return 'functional-query'

  @classmethod
  def setup_class(cls):
    if cls.exploration_strategy() != 'exhaustive':
      pytest.skip('runs only in exhaustive')
    super(TestAutoScaling, cls).setup_class()

  # Time to wait after starting the auto scaler for the base cluster (coordinator,
  # catalogd, statestored) to come up.
  INITIAL_STARTUP_TIME_S = 10
  # Timeout used when waiting for cluster membership changes, e.g. executors starting
  # up or shutting down.
  STATE_CHANGE_TIMEOUT_S = 45
  # This query will scan two partitions (month = 1, 2) and thus will have 1 fragment
  # instance per executor on groups of size 2. Each partition has 2 rows and each row
  # evaluates sleep(500), so each fragment instance, and thus the query, takes around
  # 1 second to complete.
  QUERY = """select * from functional_parquet.alltypestiny where month < 3
             and id + random() < sleep(500)"""

  def _get_total_admitted_queries(self):
    return self.impalad_test_service.get_total_admitted_queries("default-pool")

  def _get_num_backends(self):
    return self.impalad_test_service.get_metric_value("cluster-membership.backends.total")

  def _get_num_running_queries(self):
    return self.impalad_test_service.get_num_running_queries("default-pool")

  @SkipIfEC.fix_later
  def test_single_workload(self):
    """This test exercises the auto-scaling logic in the admission controller. It spins up
    a base cluster (coordinator, catalog, statestore), runs some queries to observe that
    new executors are started, then stops the workload and observes that the cluster gets
    shut down."""
    GROUP_SIZE = 2
    EXECUTOR_SLOTS = 3
    auto_scaler = AutoScaler(executor_slots=EXECUTOR_SLOTS, group_size=GROUP_SIZE)
    workload = None
    try:
      auto_scaler.start()
      sleep(self.INITIAL_STARTUP_TIME_S)

      workload = ConcurrentWorkload(self.QUERY, num_streams=5)
      LOG.info("Starting workload")
      workload.start()

      # Wait for workers to spin up
      cluster_size = GROUP_SIZE + 1  # +1 to include coordinator.
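      # The generator below polls once per second: sleep(1) returns None (falsy), so
      # any() only stops early once the backend count reaches the expected cluster size,
      # and otherwise gives up after STATE_CHANGE_TIMEOUT_S iterations. The same idiom is
      # used for the other waits below.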
      assert any(self._get_num_backends() >= cluster_size or sleep(1)
                 for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
          "Number of backends did not increase within %s s" % self.STATE_CHANGE_TIMEOUT_S
      assert self.impalad_test_service.get_metric_value(
          "cluster-membership.executor-groups.total-healthy") >= 1

      # Wait until we admitted at least 10 queries
      assert any(self._get_total_admitted_queries() >= 10 or sleep(1)
                 for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
          "Did not admit enough queries within %s s" % self.STATE_CHANGE_TIMEOUT_S

      # Wait for second executor group to start
      cluster_size = (2 * GROUP_SIZE) + 1
      assert any(self._get_num_backends() >= cluster_size or sleep(1)
                 for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
          "Number of backends did not reach %s within %s s" % (
              cluster_size, self.STATE_CHANGE_TIMEOUT_S)
      assert self.impalad_test_service.get_metric_value(
          "cluster-membership.executor-groups.total-healthy") >= 2

      # Wait for query rate to reach the maximum for a single executor group plus 20%
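      # Each query runs for roughly 1 second (see QUERY above) and a single group can run
      # EXECUTOR_SLOTS queries concurrently, so a sustained rate above
      # 1.2 * EXECUTOR_SLOTS queries/s indicates that the second group is doing work.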
      min_query_rate = 1.2 * EXECUTOR_SLOTS
      max_query_rate = 0
      # This barrier has been flaky in the past so we wait 2x as long as for the other
      # checks.
      end = time() + 2 * self.STATE_CHANGE_TIMEOUT_S
      while time() < end:
        current_rate = workload.get_query_rate()
        LOG.info("Current rate: %s" % current_rate)
        max_query_rate = max(max_query_rate, current_rate)
        if max_query_rate >= min_query_rate:
          break
        sleep(1)

      assert max_query_rate >= min_query_rate, "Query rate did not reach %s within %s " \
          "s. Maximum was %s. Cluster size is %s." % (min_query_rate,
          2 * self.STATE_CHANGE_TIMEOUT_S, max_query_rate, cluster_size)

      LOG.info("Stopping workload")
      workload.stop()

      # Wait for workers to spin down
      self.impalad_test_service.wait_for_metric_value(
          "cluster-membership.backends.total", 1,
          timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)
      assert self.impalad_test_service.get_metric_value(
          "cluster-membership.executor-groups.total") == 0

    finally:
      if workload:
        workload.stop()
      LOG.info("Stopping auto scaler")
      auto_scaler.stop()

  @SkipIfEC.fix_later
  def test_single_group_maxed_out(self):
    """This test starts an auto scaler and limits it to a single executor group. It then
    makes sure that the query throughput does not exceed the expected limit."""
    GROUP_SIZE = 2
    EXECUTOR_SLOTS = 3
    auto_scaler = AutoScaler(executor_slots=EXECUTOR_SLOTS, group_size=GROUP_SIZE,
                             max_groups=1, coordinator_slots=EXECUTOR_SLOTS)
    workload = None
    try:
      auto_scaler.start()
      sleep(self.INITIAL_STARTUP_TIME_S)

      workload = ConcurrentWorkload(self.QUERY, num_streams=5)
      LOG.info("Starting workload")
      workload.start()

      # Wait for workers to spin up
      cluster_size = GROUP_SIZE + 1  # +1 to include coordinator.
      self.impalad_test_service.wait_for_metric_value(
          "cluster-membership.backends.total", cluster_size,
          timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)

      # Wait until we admitted at least 10 queries
      assert any(self._get_total_admitted_queries() >= 10 or sleep(1)
                 for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
          "Did not admit enough queries within %s s" % self.STATE_CHANGE_TIMEOUT_S

      # Sample the number of running queries for a while
      SAMPLE_NUM_RUNNING_S = 30
      end_time = time() + SAMPLE_NUM_RUNNING_S
      num_running = []
      while time() < end_time:
        num_running.append(self._get_num_running_queries())
        sleep(1)

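      # With num_streams=5 concurrent streams and only EXECUTOR_SLOTS admission slots in
      # the single group, admission control should keep exactly EXECUTOR_SLOTS queries
      # running once the group is saturated.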
      # Must reach EXECUTOR_SLOTS but not exceed it
      assert max(num_running) == EXECUTOR_SLOTS, \
          "Unexpected number of running queries: %s" % num_running

      # Check that only a single group started
      assert self.impalad_test_service.get_metric_value(
          "cluster-membership.executor-groups.total-healthy") == 1

      LOG.info("Stopping workload")
      workload.stop()

      # Wait for workers to spin down
      self.impalad_test_service.wait_for_metric_value(
          "cluster-membership.backends.total", 1,
          timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)
      assert self.impalad_test_service.get_metric_value(
          "cluster-membership.executor-groups.total") == 0

    finally:
      if workload:
        workload.stop()
      LOG.info("Stopping auto scaler")
      auto_scaler.stop()

  @SkipIfEC.fix_later
  def test_sequential_startup(self):
    """This test starts an executor group sequentially and observes that no queries are
    admitted until the group has been fully started."""
    # Larger group size so that startup takes a while; start_batch_size=1 makes the auto
    # scaler add the executors one at a time.
    GROUP_SIZE = 4
    EXECUTOR_SLOTS = 3
    auto_scaler = AutoScaler(executor_slots=EXECUTOR_SLOTS, group_size=GROUP_SIZE,
                             start_batch_size=1, max_groups=1)
    workload = None
    try:
      auto_scaler.start()
      sleep(self.INITIAL_STARTUP_TIME_S)

      workload = ConcurrentWorkload(self.QUERY, num_streams=5)
      LOG.info("Starting workload")
      workload.start()

      # Wait for first executor to start up
      self.impalad_test_service.wait_for_metric_value(
          "cluster-membership.executor-groups.total", 1,
          timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)

      # Wait for remaining executors to start up and make sure that no queries are
      # admitted during startup
      end_time = time() + self.STATE_CHANGE_TIMEOUT_S
      startup_complete = False
      cluster_size = GROUP_SIZE + 1  # +1 to include coordinator.
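      # While the group is still below its expected size of GROUP_SIZE executors it is not
      # considered healthy, so no queries should be admitted until all of its executors
      # have registered.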
      while time() < end_time:
        num_admitted = self._get_total_admitted_queries()
        num_backends = self._get_num_backends()
        if num_backends < cluster_size:
          assert num_admitted == 0, "%s/%s backends started but %s queries have " \
              "already been admitted." % (num_backends, cluster_size, num_admitted)
        if num_admitted > 0:
          assert num_backends == cluster_size
          startup_complete = True
          break
        sleep(1)

      assert startup_complete, "Did not start up in %s s" % self.STATE_CHANGE_TIMEOUT_S

      LOG.info("Stopping workload")
      workload.stop()

      # Wait for workers to spin down
      self.impalad_test_service.wait_for_metric_value(
          "cluster-membership.backends.total", 1,
          timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)
      assert self.impalad_test_service.get_metric_value(
          "cluster-membership.executor-groups.total") == 0

    finally:
      if workload:
        workload.stop()
      LOG.info("Stopping auto scaler")
      auto_scaler.stop()