blob: bb6925e7d26f1a7c16e68d1f33678531d1bdb8a0 [file] [log] [blame]
#!/usr/bin/env impala-python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import argparse
import time
import logging
import os
import pipes
from subprocess import check_call
from tests.common.impala_cluster import ImpalaCluster
from threading import Event, Thread
IMPALA_HOME = os.environ["IMPALA_HOME"]
class AutoScaler(object):
  """This class implements a simple autoscaling algorithm: if queries queue up for a
  configurable duration, a new executor group is started. Likewise, if the number of
  concurrently running queries indicates that an executor group can be removed, such
  measure is taken.

  Users of this class can start an auto scaler by calling start() and must call stop()
  before exiting (see main() below for an example).

  This class only uses the default admission control pool.
  """
  # All queries run in this admission control pool; executor group names must be
  # prefixed with it (see group_name()).
  DEFAULT_POOL_NAME = "default-pool"

  def __init__(self, executor_slots, group_size, start_batch_size=0, max_groups=0,
               wait_up_s=0, wait_down_s=0, coordinator_slots=128):
    # Number of queries that can run concurrently on each executor
    self.executor_slots = executor_slots
    # Number of admission slots on the dedicated coordinator
    self.coordinator_slots = coordinator_slots
    # Number of executors per executor group
    self.group_size = group_size
    # New executor groups will be started in increments of this size; by default a
    # whole group is started at once.
    self.start_batch_size = group_size
    if start_batch_size > 0:
      self.start_batch_size = start_batch_size
    # Maximum number of executor groups. We only have 10 TCP ports free on our
    # miniclusters and we need one for the dedicated coordinator. Use explicit floor
    # division so the result is an int under both Python 2 and 3; with true division
    # this would be a float (e.g. 4.5), which would allow one group too many.
    self.max_groups = 9 // self.group_size
    # max_groups can further bound the maximum number of groups we are going to start,
    # but we won't start more than possible.
    if max_groups > 0 and max_groups < self.max_groups:
      self.max_groups = max_groups
    # Number of seconds to wait before scaling up/down
    self.scale_wait_up_s = 5
    if wait_up_s > 0:
      self.scale_wait_up_s = wait_up_s
    self.scale_wait_down_s = 5
    if wait_down_s > 0:
      self.scale_wait_down_s = wait_down_s
    self.groups = []
    # Number of executor groups currently running
    self.num_groups = 0
    # Stopwatches to track how long the conditions for scaling up/down have been met.
    self.scale_up_sw = time.time()
    self.scale_down_sw = time.time()
    # Thread running the control loop; created by start()
    self.loop_thread = None
    # Event to signal that the control loop should exit
    self.stop_ev = Event()

  def get_cluster(self):
    """Returns a handle to the currently running end-to-end test minicluster."""
    return ImpalaCluster.get_e2e_test_cluster()

  def get_coordinator(self):
    """Returns the first impalad of the cluster, which acts as the dedicated
    coordinator."""
    cluster = self.get_cluster()
    assert len(cluster.impalads) > 0
    return cluster.get_first_impalad()

  def get_service(self):
    """Returns the service handle of the coordinator."""
    return self.get_coordinator().service

  def get_client(self):
    """Creates and returns a new HS2 client connected to the coordinator."""
    return self.get_coordinator().service.create_hs2_client()

  def group_name(self, idx):
    """Returns the name for executor group number 'idx'."""
    # By convention, group names must start with their associated resource pool name
    # followed by a "-".
    return "%s-group-%s" % (self.DEFAULT_POOL_NAME, idx)

  def start_base_cluster(self):
    """Starts the base cluster consisting of an exclusive coordinator, catalog, and
    statestore. Does not add any executors."""
    logging.info("Starting base cluster (coordinator, catalog, statestore)")
    cluster_args = ["--impalad_args=-executor_groups=coordinator"]
    self._start_impala_cluster(cluster_args, cluster_size=1,
                               executor_slots=self.coordinator_slots,
                               expected_num_executors=0, add_executors=False)
    logging.info("Done, number of running executor groups: %s" % self.num_groups)

  def start_group(self):
    """Starts an executor group. The name of the group is automatically determined based
    on the current number of total executor groups. Executors in the group will be started
    in batches of size 'self.start_batch_size'."""
    self.num_groups += 1
    name = self.group_name(self.num_groups)
    desc = "%s:%s" % (name, self.group_size)
    logging.info("Starting executor group %s with %s members" % (name, self.group_size))
    cluster_args = ["--impalad_args=-executor_groups=%s" % desc]
    batch_size = self.start_batch_size
    num_started = 0
    # Executors of all previously started groups are expected to still be online.
    num_expected = (self.num_groups - 1) * self.group_size
    while (num_started < self.group_size):
      to_start = min(batch_size, self.group_size - num_started)
      num_expected += to_start
      if to_start == 1:
        start_msg = "Starting executor %s" % (num_started + 1)
      else:
        start_msg = "Starting executors %s-%s" % (num_started + 1,
                                                  num_started + to_start)
      logging.info(start_msg)
      self._start_impala_cluster(cluster_args, cluster_size=to_start,
                                 executor_slots=self.executor_slots,
                                 expected_num_executors=num_expected, add_executors=True)
      num_started += to_start
    logging.info("Done, number of running executor groups: %s" % self.num_groups)

  def stop_group(self):
    """Stops the executor group that was added last."""
    name = self.group_name(self.num_groups)
    group_hosts = self.get_groups()[name]
    logging.info("Stopping executor group %s" % name)
    for host in group_hosts:
      logging.debug("Stopping host %s" % host)
      query = ":shutdown('%s');" % host
      self.execute(query)
    self.wait_for_group_gone(name)
    self.num_groups -= 1
    logging.info("Done, number of running executor groups: %s" % self.num_groups)

  def wait_for_group_gone(self, group_name, timeout=120):
    """Waits until all executors in group 'group_name' have unregistered themselves from
    the coordinator's cluster membership view. Asserts on timeout (seconds)."""
    end = time.time() + timeout
    while time.time() < end:
      groups = self.get_groups()
      if group_name not in groups:
        return
      time.sleep(0.5)
    assert False, "Timeout waiting for group %s to shut down" % group_name

  def get_groups(self):
    """Returns a dict mapping executor group names to their member hosts, as seen by
    the coordinator."""
    return self.get_service().get_executor_groups()

  def execute(self, query):
    """Executes 'query' through a new HS2 client on the coordinator."""
    return self.get_client().execute(query)

  def get_num_queued_queries(self):
    """Returns the number of queries currently queued in the default pool on the
    coordinator."""
    return self.get_service().get_num_queued_queries(pool_name=self.DEFAULT_POOL_NAME)

  def get_num_running_queries(self):
    """Returns the number of queries currently running in the default pool on the
    coordinator."""
    return self.get_service().get_num_running_queries(self.DEFAULT_POOL_NAME)

  def loop(self):
    """Controls whether new executor groups need to be started or existing ones need to be
    stopped, based on the number of queries that are currently queued and running.
    """
    while not self.stop_ev.is_set():
      now = time.time()
      num_queued = self.get_num_queued_queries()
      num_running = self.get_num_running_queries()
      # Total number of queries that can run concurrently on the current groups.
      capacity = self.executor_slots * self.num_groups
      logging.debug("queued: %s, running: %s, capacity: %s" % (num_queued, num_running,
                                                               capacity))
      # The scale-up stopwatch measures for how long queries have been queued
      # continuously; reset it whenever the queue is empty.
      if num_queued == 0:
        self.scale_up_sw = now
      scale_up = self.scale_up_sw < now - self.scale_wait_up_s
      if scale_up and self.num_groups < self.max_groups:
        self.start_group()
        # Restart both stopwatches after changing the cluster shape.
        self.scale_up_sw = time.time()
        self.scale_down_sw = self.scale_up_sw
        continue
      # Scale down once a whole group's worth of slots has been idle long enough.
      surplus = capacity - num_running
      if surplus < self.executor_slots:
        self.scale_down_sw = now
      if self.scale_down_sw < now - self.scale_wait_down_s:
        self.stop_group()
        self.scale_up_sw = time.time()
        self.scale_down_sw = self.scale_up_sw
        continue
      time.sleep(1)

  def start(self):
    """Starts a base cluster with coordinator and statestore and the control loop to start
    and stop additional executor groups."""
    self.start_base_cluster()
    assert self.loop_thread is None
    self.loop_thread = Thread(target=self.loop)
    self.loop_thread.start()

  def stop(self):
    """Stops the AutoScaler and its cluster. Safe to call multiple times."""
    if self.stop_ev.is_set():
      return
    self.stop_ev.set()
    if self.loop_thread:
      self.loop_thread.join()
      self.loop_thread = None
    self._kill_whole_cluster()

  def _start_impala_cluster(self, options, cluster_size, executor_slots,
                            expected_num_executors, add_executors):
    """Starts an Impala cluster and waits for all impalads to come online.

    If 'add_executors' is True, new executors will be added to the cluster and the
    existing daemons will not be restarted. In that case 'cluster_size' must specify the
    number of nodes that will be added and 'expected_num_executors' must be the total
    expected number of executors after the additional ones have started.

    If 'add_executors' is false, 'cluster_size' must be 1 and a single exclusive
    coordinator will be started (together with catalog and statestore).
    """
    assert cluster_size > 0, "cluster_size cannot be 0"
    impala_log_dir = os.getenv("LOG_DIR", "/tmp/")
    cmd = [os.path.join(IMPALA_HOME, "bin/start-impala-cluster.py"),
           "--cluster_size=%d" % cluster_size,
           "--log_dir=%s" % impala_log_dir,
           "--log_level=1"]
    if add_executors:
      cmd.append("--add_executors")
    else:
      assert expected_num_executors == 0
      assert cluster_size == 1
      cmd.append("--use_exclusive_coordinators")
    impalad_args = [
      "-vmodule=admission-controller=3,cluster-membership-mgr=3",
      "-admission_control_slots=%s" % executor_slots,
      "-shutdown_grace_period_s=2"]
    # Build a new list instead of using "+=", which would mutate the caller's list.
    options = options + ["--impalad_args=%s" % a for a in impalad_args]
    logging.debug("Starting cluster with command: %s" %
                  " ".join(pipes.quote(arg) for arg in cmd + options))
    log_debug = logging.getLogger().getEffectiveLevel() == logging.DEBUG
    log_file = None
    try:
      if not log_debug:
        # Discard subprocess output unless debug logging is enabled.
        log_file = open(os.devnull, "w")
      check_call(cmd + options, close_fds=True, stdout=log_file, stderr=log_file)
    finally:
      # Close the handle explicitly so it does not leak across cluster restarts.
      if log_file is not None:
        log_file.close()
    # The number of statestore subscribers is
    # cluster_size (# of impalad) + 1 (for catalogd).
    if expected_num_executors > 0:
      # Executors plus the dedicated coordinator, plus catalogd.
      expected_subscribers = expected_num_executors + 2
      expected_backends = expected_num_executors + 1
    else:
      expected_subscribers = cluster_size + 1
      expected_backends = 1
    cluster = self.get_cluster()
    statestored = cluster.statestored
    if statestored is None:
      raise Exception("statestored was not found")
    logging.debug("Waiting for %s subscribers to come online" % expected_subscribers)
    statestored.service.wait_for_live_subscribers(expected_subscribers, timeout=60)
    for impalad in cluster.impalads:
      logging.debug("Waiting for %s executors to come online" % expected_backends)
      impalad.service.wait_for_num_known_live_backends(expected_backends, timeout=60)

  def _kill_whole_cluster(self):
    """Terminates the whole cluster, i.e. all impalads, catalogd, and statestored."""
    logging.info("terminating cluster")
    check_call([os.path.join(IMPALA_HOME, "bin/start-impala-cluster.py"), "--kill_only"])
def main():
  """Parses command line options, configures logging, and runs an AutoScaler until the
  user interrupts it with Ctrl-C. The cluster is torn down on exit."""
  parser = argparse.ArgumentParser()
  parser.add_argument("-n", "--executor_slots", help="Concurrent queries per executor "
                      "group", type=int, default=3)
  parser.add_argument("-g", "--group_size", help="Number of executors per group",
                      type=int, default=2)
  parser.add_argument("-b", "--batch_size", help="Start executors of a group "
                      "in batches instead of all at once", type=int, default=0)
  parser.add_argument("-m", "--max_groups", help="Maximum number of groups to start",
                      type=int, default=0)
  parser.add_argument("-d", "--wait_down", help="Time to wait before scaling down (s)",
                      type=int, default=5)
  parser.add_argument("-u", "--wait_up", help="Time to wait before scaling up (s)",
                      type=int, default=5)
  parser.add_argument("-v", "--verbose", help="Verbose logging", action="store_true")
  args = parser.parse_args()
  # Restrict some logging for command line usage
  logging.getLogger("impala_cluster").setLevel(logging.INFO)
  logging.getLogger("requests").setLevel(logging.WARNING)
  if args.verbose:
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger("impala.hiveserver2").setLevel(logging.INFO)
  else:
    logging.basicConfig(level=logging.INFO)
    # Also restrict other modules' debug output
    logging.getLogger("impala_connection").setLevel(logging.WARNING)
    logging.getLogger("impala_service").setLevel(logging.WARNING)
    logging.getLogger("impala.hiveserver2").setLevel(logging.WARNING)
  a = AutoScaler(executor_slots=args.executor_slots, group_size=args.group_size,
                 start_batch_size=args.batch_size, max_groups=args.max_groups,
                 wait_up_s=args.wait_up, wait_down_s=args.wait_down)
  a.start()
  try:
    # Idle until the user interrupts us; the AutoScaler does its work on a
    # background thread.
    while True:
      time.sleep(1)
  except KeyboardInterrupt:
    logging.debug("Caught KeyboardInterrupt, stopping autoscaler")
  finally:
    # Tear the cluster down on ANY exit path, not just Ctrl-C; previously an
    # unexpected exception would leave the minicluster running. stop() is
    # idempotent, so calling it here is always safe.
    a.stop()
# Entry point when executed as a script (see the impala-python shebang above).
if __name__ == "__main__":
  main()