| #!/usr/bin/env impala-python |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| from __future__ import absolute_import, division, print_function |
| import argparse |
| import time |
| import logging |
| import os |
| import pipes |
| from subprocess import check_call |
| from tests.common.impala_cluster import ImpalaCluster |
| from tests.util.filesystem_utils import IS_EC |
| from threading import Event, Thread |
| |
# Root of the Impala checkout; used to locate bin/start-impala-cluster.py.
# Importing this module raises KeyError if the IMPALA_HOME environment variable is unset.
IMPALA_HOME = os.environ["IMPALA_HOME"]
| |
| |
class AutoScaler(object):
  """Implements a simple autoscaling algorithm for an Impala minicluster.

  If queries stay queued for a configurable duration, a new executor group is started.
  Likewise, if the number of concurrently running queries indicates for a configurable
  duration that an executor group can be removed, the most recently added group is
  stopped.

  Users of this class can start an auto scaler by calling start() and must call stop()
  before exiting (see main() below for an example).

  This class only uses the default admission control pool.
  """
  DEFAULT_POOL_NAME = "default-pool"

  def __init__(self, executor_slots, group_size, start_batch_size=0, max_groups=0,
               wait_up_s=0, wait_down_s=0, coordinator_slots=128):
    """Configures the auto scaler. Does not start any processes.

    Args:
      executor_slots: number of queries that can run concurrently on each executor
        (passed as -admission_control_slots to the executors).
      group_size: number of executors per executor group.
      start_batch_size: executors of a new group are started in batches of this size;
        values <= 0 mean the whole group is started in one batch.
      max_groups: optional upper bound on the number of executor groups; values <= 0,
        or values above 9 // group_size, are ignored.
      wait_up_s: seconds queries must stay queued before scaling up; <= 0 means 5.
      wait_down_s: seconds of surplus capacity before scaling down; <= 0 means 5.
      coordinator_slots: admission control slots for the dedicated coordinator.
    """
    # Number of queries that can run concurrently on each executor
    self.executor_slots = executor_slots
    self.coordinator_slots = coordinator_slots
    # Number of executors per executor group
    self.group_size = group_size
    # New executor groups will be started in increments of this size
    self.start_batch_size = start_batch_size if start_batch_size > 0 else group_size
    # Maximum number of executor groups. We only have 10 TCP ports free on our
    # miniclusters and we need one for the dedicated coordinator.
    self.max_groups = 9 // self.group_size
    # max_groups can further bound the maximum number of groups we are going to start,
    # but we won't start more than possible.
    if 0 < max_groups < self.max_groups:
      self.max_groups = max_groups
    # Number of seconds to wait before scaling up/down
    self.scale_wait_up_s = wait_up_s if wait_up_s > 0 else 5
    self.scale_wait_down_s = wait_down_s if wait_down_s > 0 else 5
    self.groups = []
    self.num_groups = 0
    # Stopwatches: the last time the condition for scaling up/down was NOT met. The
    # control loop scales once a stopwatch is older than the respective wait time.
    self.scale_up_sw = time.time()
    self.scale_down_sw = time.time()

    self.loop_thread = None
    # Event to signal that the control loop should exit
    self.stop_ev = Event()

  def get_cluster(self):
    """Returns the ImpalaCluster for the running e2e test cluster (looked up on every
    call)."""
    return ImpalaCluster.get_e2e_test_cluster()

  def get_coordinator(self):
    """Returns the first impalad of the cluster, which is expected to be the dedicated
    coordinator started by start_base_cluster()."""
    cluster = self.get_cluster()
    assert len(cluster.impalads) > 0
    return cluster.get_first_impalad()

  def get_service(self):
    """Returns the service handle of the coordinator impalad."""
    return self.get_coordinator().service

  def get_client(self):
    """Returns a new HS2 client connected to the coordinator. The caller must close it."""
    return self.get_coordinator().service.create_hs2_client()

  def group_name(self, idx):
    """Returns the name of the executor group with 1-based index 'idx'."""
    # By convention, group names must start with their associated resource pool name
    # followed by a "-".
    return "%s-group-%s" % (self.DEFAULT_POOL_NAME, idx)

  def start_base_cluster(self):
    """Starts the base cluster consisting of an exclusive coordinator, catalog, and
    statestore. Does not add any executors."""
    logging.info("Starting base cluster (coordinator, catalog, statestore)")
    cluster_args = ["--impalad_args=-executor_groups=coordinator"]
    self._start_impala_cluster(cluster_args, cluster_size=1,
                               executor_slots=self.coordinator_slots,
                               expected_num_executors=0, add_executors=False)
    logging.info("Done, number of running executor groups: %s", self.num_groups)

  def start_group(self):
    """Starts an executor group. The name of the group is automatically determined based
    on the current number of total executor groups. Executors in the group are started
    in batches of 'start_batch_size'."""
    self.num_groups += 1
    name = self.group_name(self.num_groups)
    desc = "%s:%s" % (name, self.group_size)
    logging.info("Starting executor group %s with %s members", name, self.group_size)
    cluster_args = ["--impalad_args=-executor_groups=%s" % desc]
    batch_size = self.start_batch_size
    num_started = 0
    # Executors of all previously started groups are already running.
    num_expected = (self.num_groups - 1) * self.group_size
    while num_started < self.group_size:
      to_start = min(batch_size, self.group_size - num_started)
      num_expected += to_start
      if to_start == 1:
        start_msg = "Starting executor %s" % (num_started + 1)
      else:
        start_msg = "Starting executors %s-%s" % (num_started + 1,
                                                  num_started + to_start)
      logging.info(start_msg)
      self._start_impala_cluster(cluster_args, cluster_size=to_start,
                                 executor_slots=self.executor_slots,
                                 expected_num_executors=num_expected, add_executors=True)
      num_started += to_start
    logging.info("Done, number of running executor groups: %s", self.num_groups)

  def stop_group(self):
    """Stops the executor group that was added last by sending a ':shutdown()' for each
    of its hosts to the coordinator and waiting until the group has disappeared."""
    name = self.group_name(self.num_groups)
    group_hosts = self.get_groups()[name]
    logging.info("Stopping executor group %s", name)
    for host in group_hosts:
      logging.debug("Stopping host %s", host)
      query = ":shutdown('%s');" % host
      self.execute(query)
    self.wait_for_group_gone(name)
    self.num_groups -= 1
    logging.info("Done, number of running executor groups: %s", self.num_groups)

  def wait_for_group_gone(self, group_name, timeout=120):
    """Waits until all executors in group 'group_name' have unregistered themselves from
    the coordinator's cluster membership view.

    Raises:
      AssertionError: if the group is still present after 'timeout' seconds. Raised
        explicitly so that the check also works under 'python -O'.
    """
    end = time.time() + timeout
    while time.time() < end:
      groups = self.get_groups()
      if group_name not in groups:
        return
      time.sleep(0.5)
    raise AssertionError("Timeout waiting for group %s to shut down" % group_name)

  def get_groups(self):
    """Returns the coordinator's current executor groups, keyed by group name."""
    return self.get_service().get_executor_groups()

  def execute(self, query):
    """Runs 'query' on the coordinator over a new HS2 connection and returns its result.

    The connection is closed afterwards so that repeated calls (one per host in
    stop_group()) do not leak client connections.
    """
    client = self.get_client()
    try:
      return client.execute(query)
    finally:
      client.close()

  def get_num_queued_queries(self):
    """Returns the number of queries currently queued in the default pool on the
    coordinator."""
    return self.get_service().get_num_queued_queries(pool_name=self.DEFAULT_POOL_NAME)

  def get_num_running_queries(self):
    """Returns the number of queries currently running in the default pool on the
    coordinator."""
    return self.get_service().get_num_running_queries(self.DEFAULT_POOL_NAME)

  def loop(self):
    """Controls whether new executor groups need to be started or existing ones need to be
    stopped, based on the number of queries that are currently queued and running.

    Runs until stop() is called. A group is started once queries have been queued
    continuously for longer than 'scale_wait_up_s' and fewer than 'max_groups' groups
    exist. The most recently added group is stopped once at least 'executor_slots' slots
    have been idle for longer than 'scale_wait_down_s'.
    """
    while not self.stop_ev.is_set():
      now = time.time()
      num_queued = self.get_num_queued_queries()
      num_running = self.get_num_running_queries()
      # Total number of concurrently running queries the current groups can admit.
      capacity = self.executor_slots * self.num_groups

      logging.debug("queued: %s, running: %s, capacity: %s", num_queued, num_running,
                    capacity)

      # Nothing queued: reset the scale-up stopwatch.
      if num_queued == 0:
        self.scale_up_sw = now

      scale_up = self.scale_up_sw < now - self.scale_wait_up_s
      if scale_up and self.num_groups < self.max_groups:
        self.start_group()
        self.scale_up_sw = time.time()
        self.scale_down_sw = self.scale_up_sw
        continue

      # Less than one group's worth of idle slots: reset the scale-down stopwatch.
      surplus = capacity - num_running
      if surplus < self.executor_slots:
        self.scale_down_sw = now

      scale_down = self.scale_down_sw < now - self.scale_wait_down_s
      # Never try to stop a group when none is running (possible e.g. with
      # executor_slots <= 0); stop_group() would look up a nonexistent group.
      if scale_down and self.num_groups > 0:
        self.stop_group()
        self.scale_up_sw = time.time()
        self.scale_down_sw = self.scale_up_sw
        continue

      # Poll again in a second, but return promptly once stop() sets the event.
      self.stop_ev.wait(1)

  def start(self):
    """Starts a base cluster with coordinator and statestore and the control loop to start
    and stop additional executor groups."""
    self.start_base_cluster()
    assert self.loop_thread is None
    self.loop_thread = Thread(target=self.loop)
    self.loop_thread.start()

  def stop(self):
    """Stops the AutoScaler and its cluster. Calling it more than once is a no-op."""
    if self.stop_ev.is_set():
      return
    self.stop_ev.set()
    if self.loop_thread:
      self.loop_thread.join()
      self.loop_thread = None
    self._kill_whole_cluster()

  def _start_impala_cluster(self, options, cluster_size, executor_slots,
                            expected_num_executors, add_executors):
    """Starts an Impala cluster and waits for all impalads to come online.

    If 'add_executors' is True, new executors will be added to the cluster and the
    existing daemons will not be restarted. In that case 'cluster_size' must specify the
    number of nodes that will be added and 'expected_num_executors' must be the total
    expected number of executors after the additional ones have started.

    If 'add_executors' is false, 'cluster_size' must be 1 and a single exclusive
    coordinator will be started (together with catalog and statestore).

    'options' holds extra arguments for start-impala-cluster.py; it is not modified.
    """
    try:
      from shlex import quote  # Python 3.3+
    except ImportError:  # Python 2
      from pipes import quote

    assert cluster_size > 0, "cluster_size cannot be 0"
    impala_log_dir = os.getenv("LOG_DIR", "/tmp/")
    cmd = [os.path.join(IMPALA_HOME, "bin/start-impala-cluster.py"),
           "--cluster_size=%d" % cluster_size,
           "--log_dir=%s" % impala_log_dir,
           "--log_level=1"]
    if add_executors:
      cmd.append("--add_executors")
    else:
      assert expected_num_executors == 0
      assert cluster_size == 1
      cmd.append("--use_exclusive_coordinators")

    impalad_args = [
        "-vmodule=admission-controller=3,cluster-membership-mgr=3",
        "-admission_control_slots=%s" % executor_slots,
        "-shutdown_grace_period_s=2"]

    # Build a new list rather than extending 'options' in place: start_group() passes
    # the same list for every batch, so in-place mutation made the flags accumulate.
    full_cmd = cmd + options + ["--impalad_args=%s" % a for a in impalad_args]

    logging.debug("Starting cluster with command: %s",
                  " ".join(quote(arg) for arg in full_cmd))
    # Only show the script's output when debug logging is enabled.
    log_debug = logging.getLogger().getEffectiveLevel() == logging.DEBUG
    log_file = None
    if not log_debug:
      log_file = open(os.devnull, "w")
    try:
      check_call(full_cmd, close_fds=True, stdout=log_file, stderr=log_file)
    finally:
      if log_file is not None:
        log_file.close()

    # Statestore subscribers are all impalads plus the catalogd. After executors were
    # added, the impalads are the executors plus the dedicated coordinator; otherwise only
    # the coordinator ('cluster_size' == 1) runs. Live backends are all impalads.
    if expected_num_executors > 0:
      expected_subscribers = expected_num_executors + 2
      expected_backends = expected_num_executors + 1
    else:
      expected_subscribers = cluster_size + 1
      expected_backends = 1

    cluster = self.get_cluster()
    statestored = cluster.statestored
    if statestored is None:
      raise Exception("statestored was not found")

    logging.debug("Waiting for %s subscribers to come online", expected_subscribers)
    statestored.service.wait_for_live_subscribers(expected_subscribers, timeout=60)
    for impalad in cluster.impalads:
      logging.debug("Waiting for %s executors to come online", expected_backends)
      impalad.service.wait_for_num_known_live_backends(expected_backends, timeout=60)

  def _kill_whole_cluster(self):
    """Terminates the whole cluster, i.e. all impalads, catalogd, and statestored."""
    logging.info("terminating cluster")
    check_call([os.path.join(IMPALA_HOME, "bin/start-impala-cluster.py"), "--kill_only"])
| |
| |
def main():
  """Command-line entry point: runs an AutoScaler until the user presses Ctrl-C.

  The auto scaler and the cluster it started are always torn down on exit. This includes
  the cases where startup fails or an unexpected exception occurs.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument("-n", "--executor_slots", help="Concurrent queries per executor "
                      "group", type=int, default=3)
  parser.add_argument("-g", "--group_size", help="Number of executors per group",
                      type=int, default=2)
  parser.add_argument("-b", "--batch_size", help="Start executors of a group "
                      "in batches instead of all at once", type=int, default=0)
  parser.add_argument("-m", "--max_groups", help="Maximum number of groups to start",
                      type=int, default=0)
  parser.add_argument("-d", "--wait_down", help="Time to wait before scaling down (s)",
                      type=int, default=5)
  parser.add_argument("-u", "--wait_up", help="Time to wait before scaling up (s)",
                      type=int, default=5)
  parser.add_argument("-v", "--verbose", help="Verbose logging", action="store_true")
  args = parser.parse_args()

  # Restrict some logging for command line usage
  logging.getLogger("impala_cluster").setLevel(logging.INFO)
  logging.getLogger("requests").setLevel(logging.WARNING)
  if args.verbose:
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger("impala.hiveserver2").setLevel(logging.INFO)
  else:
    logging.basicConfig(level=logging.INFO)
    # Also restrict other modules' debug output
    logging.getLogger("impala_connection").setLevel(logging.WARNING)
    logging.getLogger("impala_service").setLevel(logging.WARNING)
    logging.getLogger("impala.hiveserver2").setLevel(logging.WARNING)

  a = AutoScaler(executor_slots=args.executor_slots, group_size=args.group_size,
                 start_batch_size=args.batch_size, max_groups=args.max_groups,
                 wait_up_s=args.wait_up, wait_down_s=args.wait_down)
  try:
    # start() lives inside the try so a partially started cluster is cleaned up too.
    a.start()
    # The control loop runs in a background thread; keep the main thread alive.
    while True:
      time.sleep(1)
  except KeyboardInterrupt:
    logging.debug("Caught KeyboardInterrupt, stopping autoscaler")
  finally:
    # Without this, any non-KeyboardInterrupt exception would leave the cluster running
    # and the (non-daemon) control loop thread keeping the process alive.
    a.stop()


if __name__ == "__main__":
  main()