blob: 208cc358f7868911a3b6d2384e0853dc437f8bcd [file] [log] [blame]
#!/usr/bin/env impala-python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Starts up an Impala cluster (ImpalaD + State Store) with the specified number of
# ImpalaD instances. Each ImpalaD runs on a different port allowing this to be run
# on a single machine.
import logging
import os
import psutil
import sys
from datetime import datetime
from getpass import getuser
from time import sleep, time
from optparse import OptionParser, SUPPRESS_HELP
from testdata.common import cgroups
from tests.common.environ import specific_build_type_timeout
# Root logger stays at ERROR to suppress third-party noise; this script's own
# logger (LOG, below) is explicitly raised to DEBUG so progress messages show.
logging.basicConfig(level=logging.ERROR, format="%(asctime)s %(threadName)s: %(message)s",
    datefmt="%H:%M:%S")
# Name the logger after this script's file basename (e.g. "start-impala-cluster").
LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
LOG.setLevel(level=logging.DEBUG)

# Default Kudu master address; overridable via the environment or --kudu_master_hosts.
KUDU_MASTER_HOSTS = os.getenv("KUDU_MASTER_HOSTS", "127.0.0.1")
# NOTE(review): when IMPALA_MAX_LOG_FILES is set this is a string, otherwise the
# int 10 — both are only ever interpolated into a flag string, so either works.
DEFAULT_IMPALA_MAX_LOG_FILES = os.environ.get("IMPALA_MAX_LOG_FILES", 10)

# Options
parser = OptionParser()
parser.add_option("-s", "--cluster_size", type="int", dest="cluster_size", default=3,
    help="Size of the cluster (number of impalad instances to start).")
parser.add_option("-c", "--num_coordinators", type="int", dest="num_coordinators",
    default=3, help="Number of coordinators.")
parser.add_option("--use_exclusive_coordinators", dest="use_exclusive_coordinators",
    action="store_true", default=False, help="If true, coordinators only "
    "coordinate queries and execute coordinator fragments. If false, "
    "coordinators also act as executors.")
parser.add_option("--build_type", dest="build_type", default= "latest",
    help="Build type to use - debug / release / latest")
# The three *_args options use action="append" so the flag can be repeated; each
# occurrence is collected into a list and later joined with spaces.
parser.add_option("--impalad_args", dest="impalad_args", action="append", type="string",
    default=[],
    help="Additional arguments to pass to each Impalad during startup")
parser.add_option("--state_store_args", dest="state_store_args", action="append",
    type="string", default=[],
    help="Additional arguments to pass to State Store during startup")
parser.add_option("--catalogd_args", dest="catalogd_args", action="append",
    type="string", default=[],
    help="Additional arguments to pass to the Catalog Service at startup")
parser.add_option("--kill", "--kill_only", dest="kill_only", action="store_true",
    default=False, help="Instead of starting the cluster, just kill all"
    " the running impalads and the statestored.")
parser.add_option("--force_kill", dest="force_kill", action="store_true", default=False,
    help="Force kill impalad and statestore processes.")
parser.add_option("-r", "--restart_impalad_only", dest="restart_impalad_only",
    action="store_true", default=False,
    help="Restarts only the impalad processes")
parser.add_option("--in-process", dest="inprocess", action="store_true", default=False,
    help="Start all Impala backends and state store in a single process.")
parser.add_option("--log_dir", dest="log_dir",
    default=os.environ["IMPALA_CLUSTER_LOGS_DIR"],
    help="Directory to store output logs to.")
parser.add_option("--max_log_files", default=DEFAULT_IMPALA_MAX_LOG_FILES,
    help="Max number of log files before rotation occurs.")
parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False,
    help="Prints all output to stderr/stdout.")
parser.add_option("--log_level", type="int", dest="log_level", default=1,
    help="Set the impalad backend logging level")
parser.add_option("--jvm_args", dest="jvm_args", default="",
    help="Additional arguments to pass to the JVM(s) during startup.")
parser.add_option("--kudu_master_hosts", default=KUDU_MASTER_HOSTS,
    help="The host name or address of the Kudu master. Multiple masters "
    "can be specified using a comma separated list.")
# For testing: list of comma-separated delays, in milliseconds, that delay impalad catalog
# replica initialization. The ith delay is applied to the ith impalad.
parser.add_option("--catalog_init_delays", dest="catalog_init_delays", default="",
    help=SUPPRESS_HELP)
# For testing: Semi-colon separated list of startup arguments to be passed per impalad.
# The ith group of options is applied to the ith impalad.
parser.add_option("--per_impalad_args", dest="per_impalad_args", type="string"
    ,default="", help=SUPPRESS_HELP)
options, args = parser.parse_args()

# Wrapper scripts used to launch each daemon; -build_type selects the binary dir.
IMPALA_HOME = os.environ["IMPALA_HOME"]
KNOWN_BUILD_TYPES = ["debug", "release", "latest"]
IMPALAD_PATH = os.path.join(IMPALA_HOME,
    "bin/start-impalad.sh -build_type={build_type}".format(
        build_type=options.build_type))
STATE_STORE_PATH = os.path.join(IMPALA_HOME,
    "bin/start-statestored.sh -build_type={build_type}".format(
        build_type=options.build_type))
CATALOGD_PATH = os.path.join(IMPALA_HOME,
    "bin/start-catalogd.sh -build_type={build_type}".format(
        build_type=options.build_type))
MINI_IMPALA_CLUSTER_PATH = IMPALAD_PATH + " -in-process"

CLUSTER_WAIT_TIMEOUT_IN_SECONDS = 240
# Kills have a timeout to prevent automated scripts from hanging indefinitely.
# It is set to a high value to avoid failing if processes are slow to shut down.
KILL_TIMEOUT_IN_SECONDS = 240
# For build types like ASAN, modify the default Kudu rpc timeout.
KUDU_RPC_TIMEOUT = specific_build_type_timeout(0, slow_build_timeout=60000)
def find_user_processes(binaries):
  """Returns an iterator over all processes owned by the current user with a matching
  binary name from the provided list.

  Processes that disappear mid-scan are silently skipped, as are pids whose owning
  uid no longer resolves to a user name (old psutil raises KeyError there).
  """
  # NOTE(review): get_pid_list() and attribute-style username/name are the old
  # psutil (<2.0) API shipped with impala-python; newer psutil spells these
  # pids()/username()/name() -- confirm before upgrading the environment.
  for pid in psutil.get_pid_list():
    try:
      process = psutil.Process(pid)
      if process.username == getuser() and process.name in binaries:
        yield process
    # FIX: 'except X as e' replaces the Python-2-only 'except X, e' form; the
    # 'as' spelling is accepted by Python >= 2.6 and required by Python 3.
    except KeyError as e:
      # "uid not found" means the process owner was deleted; anything else is
      # unexpected and should propagate.
      if "uid not found" not in str(e):
        raise
    except psutil.NoSuchProcess:
      # Ignore the case when a process no longer exists.
      pass
def check_process_exists(binary, attempts=1):
  """Checks if a process exists given the binary name. The `attempts` count allows us to
  control the time a process needs to settle until it becomes available. Between
  failed tries the script sleeps for one second and retries. Returns True if it
  exists and False otherwise.
  """
  for attempt in range(attempts):
    # Stop at the first matching process; the iterator is lazy so this does not
    # scan the remaining pids once a match is found.
    if next(find_user_processes([binary]), None) is not None:
      return True
    # FIX: only sleep *between* attempts -- the original also slept after the
    # last failed attempt, adding a pointless second of latency to failures.
    if attempt != attempts - 1:
      sleep(1)
  return False
def exec_impala_process(cmd, args, stderr_log_file_path):
  """Launches an Impala daemon in the background via the shell.

  In verbose mode the daemon is told to log to stderr and no redirection is set
  up; otherwise stdout goes to the given error-log file. In both cases stderr is
  merged into stdout and the process is detached with '&'.
  """
  if options.verbose:
    args += " -logtostderr=1"
    redirection = ""
  else:
    redirection = "1>{stderr_log_file_path}".format(
        stderr_log_file_path=stderr_log_file_path)
  full_cmd = "{cmd} {args} {redirect_output} 2>&1 &".format(
      cmd=cmd,
      args=args,
      redirect_output=redirection)
  os.system(full_cmd)
def kill_cluster_processes(force=False):
  """Terminates (or, with force=True, kills) every catalogd, impalad and
  statestored process owned by the current user."""
  kill_matching_processes(["catalogd", "impalad", "statestored"], force)
def kill_matching_processes(binary_names, force=False):
  """Kills all processes with the given binary name, waiting for them to exit"""
  matched = list(find_user_processes(binary_names))

  # Deliver every signal first so the processes can clean up in parallel.
  for proc in matched:
    try:
      proc.kill() if force else proc.terminate()
    except psutil.NoSuchProcess:
      pass

  # Then reap each process, bounding the wait so automation cannot hang forever.
  for proc in matched:
    try:
      proc.wait(KILL_TIMEOUT_IN_SECONDS)
    except psutil.TimeoutExpired:
      raise RuntimeError(("Unable to kill {process_name} (pid {process_pid}) "
          "after {num_seconds} seconds.").format(
          process_name=proc.name,
          process_pid=proc.pid,
          num_seconds=KILL_TIMEOUT_IN_SECONDS))
def start_statestore():
  """Launches the statestored daemon and verifies that it stays up."""
  LOG.info("Starting State Store logging to {log_dir}/statestored.INFO".format(
      log_dir=options.log_dir))
  error_log = os.path.join(options.log_dir, "statestore-error.log")
  launch_args = "{0} {1}".format(
      build_impalad_logging_args(0, "statestored"),
      " ".join(options.state_store_args))
  exec_impala_process(STATE_STORE_PATH, launch_args, error_log)
  if not check_process_exists("statestored", 10):
    raise RuntimeError("Unable to start statestored. Check log or file permissions"
        " for more details.")
def start_catalogd():
  """Launches the catalogd daemon and verifies that it stays up."""
  LOG.info("Starting Catalog Service logging to {log_dir}/catalogd.INFO".format(
      log_dir=options.log_dir))
  error_log = os.path.join(options.log_dir, "catalogd-error.log")
  launch_args = "{0} {1} {2}".format(
      build_impalad_logging_args(0, "catalogd"),
      " ".join(options.catalogd_args),
      build_jvm_args(options.cluster_size))
  exec_impala_process(CATALOGD_PATH, launch_args, error_log)
  if not check_process_exists("catalogd", 10):
    raise RuntimeError("Unable to start catalogd. Check log or file permissions"
        " for more details.")
def build_impalad_port_args(instance_num):
  """Returns the port flags for the impalad with the given instance number.

  Each impalad gets a distinct set of ports by adding the instance number to a
  fixed base port per service, so several impalads can share one machine.
  """
  flag_bases = [
      ("beeswax_port", 21000),
      ("hs2_port", 21050),
      ("be_port", 22000),
      ("krpc_port", 27000),
      ("state_store_subscriber_port", 23000),
      ("webserver_port", 25000),
  ]
  return " ".join("-{0}={1}".format(flag, base + instance_num)
                  for flag, base in flag_bases)
def build_impalad_logging_args(instance_num, service_name):
  """Returns the glog-related startup flags for the named service.

  instance_num is part of the signature for symmetry with the other build_*
  helpers but does not affect the result; log files are distinguished solely by
  service_name.
  """
  flags = [
      "-log_filename={0}".format(service_name),
      "-log_dir={0}".format(options.log_dir),
      "-v={0}".format(options.log_level),
      "-logbufsecs=5",
      "-max_log_files={0}".format(options.max_log_files),
  ]
  return " ".join(flags)
def build_jvm_args(instance_num):
  """Returns JVM-related startup flags; each instance gets its own debug port
  offset from a fixed base so debuggers can attach to a specific daemon."""
  DEBUG_PORT_BASE = 30000
  return "-jvm_debug_port={0} -jvm_args={1}".format(
      DEBUG_PORT_BASE + instance_num, options.jvm_args)
def start_impalad_instances(cluster_size, num_coordinators, use_exclusive_coordinators):
  """Start 'cluster_size' impalad instances. The first 'num_coordinator' instances will
  act as coordinators. 'use_exclusive_coordinators' specifies whether the coordinators
  will only execute coordinator fragments.

  Flag ordering within 'args' is significant: later occurrences of a flag
  override earlier ones, so defaults are placed first and per-impalad overrides
  are appended last.
  """
  if cluster_size == 0:
    # No impalad instances should be started.
    return

  # Set mem_limit of each impalad to the smaller of 12GB or
  # 1/cluster_size (typically 1/3) of 70% of system memory.
  #
  # The default memory limit for an impalad is 80% of the total system memory. On a
  # mini-cluster with 3 impalads that means 240%. Since having an impalad be OOM killed
  # is very annoying, the mem limit will be reduced. This can be overridden using the
  # --impalad_args flag. virtual_memory().total returns the total physical memory.
  # The exact ratio to use is somewhat arbitrary. Peak memory usage during
  # tests depends on the concurrency of parallel tests as well as their ordering.
  # On the other hand, to avoid using too much memory, we limit the
  # memory choice here to max out at 12GB. This should be sufficient for tests.
  #
  # Beware that ASAN builds use more memory than regular builds.
  mem_limit = int(0.7 * psutil.virtual_memory().total / cluster_size)
  mem_limit = min(12 * 1024 * 1024 * 1024, mem_limit)

  # Per-instance catalog-init delays (ms) parsed from --catalog_init_delays;
  # the ith delay applies to the ith impalad.
  delay_list = []
  if options.catalog_init_delays != "":
    delay_list = [delay.strip() for delay in options.catalog_init_delays.split(",")]

  # Per-instance extra startup arguments parsed from --per_impalad_args
  # (';'-separated); the ith group applies to the ith impalad.
  per_impalad_args = []
  if options.per_impalad_args != "":
    per_impalad_args = [args.strip() for args in options.per_impalad_args.split(";")]

  # Start each impalad instance and optionally redirect the output to a log file.
  for i in range(cluster_size):
    if i == 0:
      # The first impalad always logs to impalad.INFO
      service_name = "impalad"
    else:
      service_name = "impalad_node{node_num}".format(node_num=i)

    # Sleep between instance startup: simultaneous starts hurt the minikdc
    # Yes, this is a hack, but it's easier than modifying the minikdc...
    # TODO: is this really necessary?
    sleep(1)

    LOG.info("Starting Impala Daemon logging to {log_dir}/{service_name}.INFO".format(
        log_dir=options.log_dir,
        service_name=service_name))

    # impalad args from the --impalad_args flag. Also replacing '#ID' with the instance.
    param_args = (" ".join(options.impalad_args)).replace("#ID", str(i))
    args = ("--mem_limit={mem_limit} "
        "{impala_logging_args} "
        "{jvm_args} "
        "{impala_port_args} "
        "{param_args}").format(
        mem_limit=mem_limit,  # Goes first so --impalad_args will override it.
        impala_logging_args=build_impalad_logging_args(i, service_name),
        jvm_args=build_jvm_args(i),
        impala_port_args=build_impalad_port_args(i),
        param_args=param_args)

    if options.kudu_master_hosts:
      # Must be prepended, otherwise the java options interfere.
      args = "-kudu_master_hosts {kudu_master_hosts} {args}".format(
          kudu_master_hosts=options.kudu_master_hosts,
          args=args)

    # Apply the build-type-specific Kudu RPC timeout unless the caller already
    # supplied one via --impalad_args.
    if "kudu_client_rpc_timeout" not in args:
      args = "-kudu_client_rpc_timeout_ms {kudu_rpc_timeout} {args}".format(
          kudu_rpc_timeout=KUDU_RPC_TIMEOUT,
          args=args)

    # Instances beyond the first num_coordinators are pure executors.
    if i >= num_coordinators:
      args = "-is_coordinator=false {args}".format(args=args)
    elif use_exclusive_coordinators:
      # Coordinator instance that doesn't execute non-coordinator fragments
      args = "-is_executor=false {args}".format(args=args)

    if i < len(delay_list):
      args = "-stress_catalog_init_delay_ms={delay} {args}".format(
          delay=delay_list[i],
          args=args)

    # Appended at the end so they can override previous args.
    if i < len(per_impalad_args):
      args = "{args} {per_impalad_args}".format(
          args=args,
          per_impalad_args=per_impalad_args[i])

    stderr_log_file_path = os.path.join(
        options.log_dir,
        "{service_name}-error.log".format(service_name=service_name))
    exec_impala_process(IMPALAD_PATH, args, stderr_log_file_path)
def wait_for_impala_process_count(impala_cluster, retries=10):
  """Checks that the desired number of impalad/statestored processes are running.

  Refresh until the number running impalad/statestored processes reaches the
  expected number based on CLUSTER_SIZE, or the retry limit is hit. Failing
  this, raise a RuntimeError describing every missing component.
  """
  for _ in range(retries):
    still_missing = (len(impala_cluster.impalads) < options.cluster_size
                     or not impala_cluster.statestored
                     or not impala_cluster.catalogd)
    if still_missing:
      sleep(1)
      impala_cluster.refresh()

  # Collect every failure into one message so the caller sees them all at once.
  problems = []
  num_impalads = len(impala_cluster.impalads)
  if num_impalads < options.cluster_size:
    problems.append(
        "Expected {expected_num} impalad(s), only {actual_num} found\n".format(
            expected_num=options.cluster_size,
            actual_num=num_impalads))
  if not impala_cluster.statestored:
    problems.append("statestored failed to start.\n")
  if not impala_cluster.catalogd:
    problems.append("catalogd failed to start.\n")
  if problems:
    raise RuntimeError("".join(problems))
def wait_for_cluster_web(timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
  """Checks if the cluster is "ready"

  A cluster is deemed "ready" if:
    - All backends are registered with the statestore.
    - Each impalad knows about all other impalads.
    - Each coordinator impalad's catalog cache is ready.
  This information is retrieved by querying the statestore debug webpage
  and each individual impalad's metrics webpage.
  """
  impala_cluster = ImpalaCluster()
  # impalad processes may take a while to come up.
  wait_for_impala_process_count(impala_cluster)

  # TODO: fix this for coordinator-only nodes as well.
  expected_num_backends = options.cluster_size
  if options.catalog_init_delays != "":
    # Impalads whose catalog init is artificially delayed (for stress testing)
    # won't have registered yet, so don't count them as expected backends.
    for delay in options.catalog_init_delays.split(","):
      if int(delay.strip()) != 0: expected_num_backends -= 1

  for impalad in impala_cluster.impalads:
    impalad.service.wait_for_num_known_live_backends(expected_num_backends,
        timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2)
    # Only undelayed coordinators are expected to serve a catalog cache.
    # NOTE(review): _get_arg_value is a private ImpalaCluster helper that
    # presumably reads the daemon's startup flags -- confirm against
    # tests/common/impala_cluster.py before relying on its semantics.
    if impalad._get_arg_value("is_coordinator", default="true") == "true" and \
       impalad._get_arg_value("stress_catalog_init_delay_ms", default=0) == 0:
      wait_for_catalog(impalad)
def wait_for_catalog(impalad, timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
  """Waits for a catalog copy to be received by the impalad. When its received,
  additionally waits for client ports to be opened.

  Raises RuntimeError if both client ports could not be opened within
  timeout_in_seconds.
  """
  start_time = time()
  client_beeswax = None
  client_hs2 = None
  num_dbs = 0
  num_tbls = 0
  while (time() - start_time < timeout_in_seconds):
    try:
      num_dbs, num_tbls = impalad.service.get_metric_values(
          ["catalog.num-databases", "catalog.num-tables"])
      # Opening both clients proves the impalad is accepting connections.
      client_beeswax = impalad.service.create_beeswax_client()
      client_hs2 = impalad.service.create_hs2_client()
      break
    except Exception:
      LOG.exception(("Client services not ready. Waiting for catalog cache: "
          "({num_dbs} DBs / {num_tbls} tables). Trying again ...").format(
          num_dbs=num_dbs,
          num_tbls=num_tbls))
    finally:
      # The clients are only probes; close whichever were opened.
      # FIX: the original closed only client_beeswax, leaking the hs2 client
      # (and its connection) on every iteration and on success.
      if client_beeswax is not None: client_beeswax.close()
      if client_hs2 is not None: client_hs2.close()
    sleep(0.5)

  if client_beeswax is None or client_hs2 is None:
    raise RuntimeError("Unable to open client ports within {num_seconds} seconds.".format(
        num_seconds=timeout_in_seconds))
def wait_for_cluster_cmdline(timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
  """Checks if the cluster is "ready" by executing a simple query in a loop"""
  deadline = time() + timeout_in_seconds
  shell_path = os.path.join(IMPALA_HOME, "bin/impala-shell.sh")
  cmd = "{impala_shell} -i localhost:21000 -q '{query}'".format(
      impala_shell=shell_path,
      query="select 1")
  # Keep probing until the shell query succeeds or the deadline passes.
  while os.system(cmd) != 0:
    if time() > deadline:
      raise RuntimeError("Cluster did not start within {num_seconds} seconds".format(
          num_seconds=timeout_in_seconds))
    LOG.info("Cluster not yet available. Sleeping...")
    sleep(2)
if __name__ == "__main__":
  # --kill/--kill_only short-circuits everything else.
  if options.kill_only:
    kill_cluster_processes(force=options.force_kill)
    sys.exit(0)

  # Validate the command-line configuration before touching any processes.
  if options.build_type not in KNOWN_BUILD_TYPES:
    LOG.error("Invalid build type {0}".format(options.build_type))
    LOG.error("Valid values: {0}".format(", ".join(KNOWN_BUILD_TYPES)))
    sys.exit(1)

  if options.cluster_size < 0:
    LOG.error("Please specify a cluster size >= 0")
    sys.exit(1)

  if options.num_coordinators <= 0:
    LOG.error("Please specify a valid number of coordinators > 0")
    sys.exit(1)

  if (options.use_exclusive_coordinators and
      options.num_coordinators >= options.cluster_size):
    LOG.error("Cannot start an Impala cluster with no executors")
    sys.exit(1)

  if not os.path.isdir(options.log_dir):
    LOG.error("Log dir does not exist or is not a directory: {log_dir}".format(
        log_dir=options.log_dir))
    sys.exit(1)

  # Kill existing cluster processes based on the current configuration.
  if options.restart_impalad_only:
    if options.inprocess:
      LOG.error(
          "Cannot perform individual component restarts using an in-process cluster")
      sys.exit(1)
    kill_matching_processes(["impalad"], force=options.force_kill)
  else:
    kill_cluster_processes(force=options.force_kill)

  # Prefer the web-based readiness check; fall back to the command-line check
  # if json is unavailable.
  try:
    import json
    wait_for_cluster = wait_for_cluster_web
  except ImportError:
    LOG.exception("json module not found, checking "
        "for cluster startup through the command-line")
    wait_for_cluster = wait_for_cluster_cmdline

  # If ImpalaCluster cannot be imported, fall back to the command-line to check
  # whether impalads/statestore are up.
  try:
    from tests.common.impala_cluster import ImpalaCluster
    if options.restart_impalad_only:
      impala_cluster = ImpalaCluster()
      if not impala_cluster.statestored or not impala_cluster.catalogd:
        LOG.info("No running statestored or catalogd detected. "
            "Restarting entire cluster.")
        options.restart_impalad_only = False
  except ImportError:
    LOG.exception("ImpalaCluster module not found.")
    # TODO: Update this code path to work similar to the ImpalaCluster code path when
    # restarting only impalad processes. Specifically, we should do a full cluster
    # restart if either the statestored or catalogd processes are down, even if
    # restart_only_impalad=True.
    wait_for_cluster = wait_for_cluster_cmdline

  try:
    if not options.restart_impalad_only:
      start_statestore()
      start_catalogd()
    start_impalad_instances(options.cluster_size, options.num_coordinators,
        options.use_exclusive_coordinators)
    # Sleep briefly to reduce log spam: the cluster takes some time to start up.
    sleep(3)
    # Check for the cluster to be ready.
    wait_for_cluster()
  # FIX: 'except ... as e' replaces the Python-2-only 'except Exception, e'
  # form; the 'as' spelling works on Python >= 2.6 and is required by Python 3.
  except Exception as e:
    LOG.exception("Error starting cluster")
    sys.exit(1)

  # FIX: idiomatic truth test instead of comparing to True with '=='.
  if options.use_exclusive_coordinators:
    executors = options.cluster_size - options.num_coordinators
  else:
    executors = options.cluster_size
  LOG.info(("Impala Cluster Running with {num_nodes} nodes "
      "({num_coordinators} coordinators, {num_executors} executors).").format(
      num_nodes=options.cluster_size,
      num_coordinators=min(options.cluster_size, options.num_coordinators),
      num_executors=executors))