blob: e870852b1805b61d650aa865d9f9288204524066 [file] [log] [blame]
#!/usr/bin/env impala-python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Starts up an Impala cluster (ImpalaD + State Store) with the specified number of
# ImpalaD instances. Each ImpalaD runs on a different port allowing this to be run
# on a single machine.
import os
import psutil
import sys
from getpass import getuser
from time import sleep, time
from optparse import OptionParser, SUPPRESS_HELP
from testdata.common import cgroups
# Kudu master address(es) passed to impalad via -kudu_master_hosts; can be a
# comma-separated list. Overridable through the KUDU_MASTER_HOSTS env var.
KUDU_MASTER_HOSTS = os.getenv('KUDU_MASTER_HOSTS', '127.0.0.1')
# Default for glog's -max_log_files rotation limit. Note: if the env var is set
# this is a string, otherwise the int 10; both are only ever %s-formatted below.
DEFAULT_IMPALA_MAX_LOG_FILES = os.environ.get('IMPALA_MAX_LOG_FILES', 10)
# Options
parser = OptionParser()
# -- Cluster topology ---------------------------------------------------------
parser.add_option("-s", "--cluster_size", type="int", dest="cluster_size", default=3,
                  help="Size of the cluster (number of impalad instances to start).")
parser.add_option("-c", "--num_coordinators", type="int", dest="num_coordinators",
                  default=3, help="Number of coordinators.")
parser.add_option("--use_exclusive_coordinators", dest="use_exclusive_coordinators",
                  action="store_true", default=False, help="If true, coordinators only "
                  "coordinate queries and execute coordinator fragments. If false, "
                  "coordinators also act as executors.")
parser.add_option("--build_type", dest="build_type", default= 'latest',
                  help="Build type to use - debug / release / latest")
# -- Extra daemon arguments (append-style: flag may be repeated) --------------
parser.add_option("--impalad_args", dest="impalad_args", action="append", type="string",
                  default=[],
                  help="Additional arguments to pass to each Impalad during startup")
parser.add_option("--state_store_args", dest="state_store_args", action="append",
                  type="string", default=[],
                  help="Additional arguments to pass to State Store during startup")
parser.add_option("--catalogd_args", dest="catalogd_args", action="append",
                  type="string", default=[],
                  help="Additional arguments to pass to the Catalog Service at startup")
parser.add_option("--disable_krpc", dest="disable_krpc", action="store_true",
                  default=False, help="Disable KRPC DataStream service during startup.")
# -- Lifecycle control --------------------------------------------------------
parser.add_option("--kill", "--kill_only", dest="kill_only", action="store_true",
                  default=False, help="Instead of starting the cluster, just kill all"
                  " the running impalads and the statestored.")
parser.add_option("--force_kill", dest="force_kill", action="store_true", default=False,
                  help="Force kill impalad and statestore processes.")
parser.add_option("-r", "--restart_impalad_only", dest="restart_impalad_only",
                  action="store_true", default=False,
                  help="Restarts only the impalad processes")
parser.add_option("--in-process", dest="inprocess", action="store_true", default=False,
                  help="Start all Impala backends and state store in a single process.")
# -- Logging ------------------------------------------------------------------
# Note: IMPALA_CLUSTER_LOGS_DIR must be set in the environment (it is exported
# by Impala's shell setup scripts); a missing value raises KeyError at import.
parser.add_option("--log_dir", dest="log_dir",
                  default=os.environ['IMPALA_CLUSTER_LOGS_DIR'],
                  help="Directory to store output logs to.")
parser.add_option('--max_log_files', default=DEFAULT_IMPALA_MAX_LOG_FILES,
                  help='Max number of log files before rotation occurs.')
parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False,
                  help="Prints all output to stderr/stdout.")
parser.add_option("--log_level", type="int", dest="log_level", default=1,
                  help="Set the impalad backend logging level")
parser.add_option("--jvm_args", dest="jvm_args", default="",
                  help="Additional arguments to pass to the JVM(s) during startup.")
parser.add_option("--kudu_master_hosts", default=KUDU_MASTER_HOSTS,
                  help="The host name or address of the Kudu master. Multiple masters "
                  "can be specified using a comma separated list.")
# For testing: list of comma-separated delays, in milliseconds, that delay impalad catalog
# replica initialization. The ith delay is applied to the ith impalad.
parser.add_option("--catalog_init_delays", dest="catalog_init_delays", default="",
                  help=SUPPRESS_HELP)
# Parsed at import time so the module-level constants below can use the values.
options, args = parser.parse_args()
# Root of the Impala checkout; must be set in the environment.
IMPALA_HOME = os.environ['IMPALA_HOME']
KNOWN_BUILD_TYPES = ['debug', 'release', 'latest']
# Wrapper-script invocations (script path + -build_type flag baked in) used to
# launch each daemon for the selected build type.
IMPALAD_PATH = os.path.join(IMPALA_HOME,
                            'bin/start-impalad.sh -build_type=%s' % options.build_type)
STATE_STORE_PATH = os.path.join(IMPALA_HOME,
                                'bin/start-statestored.sh -build_type=%s' % options.build_type)
CATALOGD_PATH = os.path.join(IMPALA_HOME,
                             'bin/start-catalogd.sh -build_type=%s' % options.build_type)
MINI_IMPALA_CLUSTER_PATH = IMPALAD_PATH + " -in-process"
IMPALA_SHELL = os.path.join(IMPALA_HOME, 'bin/impala-shell.sh')
# Flag templates filled in per impalad instance / per daemon below.
IMPALAD_PORTS = ("-beeswax_port=%d -hs2_port=%d -be_port=%d -krpc_port=%d "
                 "-state_store_subscriber_port=%d -webserver_port=%d")
JVM_ARGS = "-jvm_debug_port=%s -jvm_args=%s"
BE_LOGGING_ARGS = "-log_filename=%s -log_dir=%s -v=%s -logbufsecs=5 -max_log_files=%s"
# How long cluster-readiness checks poll before giving up.
CLUSTER_WAIT_TIMEOUT_IN_SECONDS = 240
# Kills have a timeout to prevent automated scripts from hanging indefinitely.
# It is set to a high value to avoid failing if processes are slow to shut down.
KILL_TIMEOUT_IN_SECONDS = 240
def find_user_processes(binaries):
  """Yields every process owned by the current user whose binary name appears in
  'binaries'. Processes that disappear mid-scan are silently skipped."""
  # NOTE: uses the old psutil API (get_pid_list, attribute-style .username/.name)
  # pinned by the impala-python environment — do not modernize without bumping psutil.
  current_user = getuser()
  for pid in psutil.get_pid_list():
    try:
      candidate = psutil.Process(pid)
      if candidate.username == current_user and candidate.name in binaries:
        yield candidate
    except KeyError as e:
      # Some system pids map to uids without a passwd entry; ignore only that case.
      if "uid not found" not in str(e):
        raise
    except psutil.NoSuchProcess:
      # The process exited between listing and inspection; skip it.
      pass
def check_process_exists(binary, attempts=1):
  """Returns True if a process with binary name 'binary' is running for the
  current user. Polls up to 'attempts' times, sleeping one second after each
  miss, so a freshly launched process has time to settle."""
  remaining = attempts
  while remaining > 0:
    if any(True for _ in find_user_processes([binary])):
      return True
    sleep(1)
    remaining -= 1
  return False
def exec_impala_process(cmd, args, stderr_log_file_path):
  """Launches 'cmd' with 'args' in the background through the shell. In verbose
  mode output stays on the console (via -logtostderr); otherwise stdout is sent
  to 'stderr_log_file_path' and stderr is merged into stdout."""
  if options.verbose:
    args += ' -logtostderr=1'
    redirect = ''
  else:
    redirect = "1>%s" % stderr_log_file_path
  os.system('%s %s %s 2>&1 &' % (cmd, args, redirect))
def kill_cluster_processes(force=False):
  """Kills all catalogd/impalad/statestored processes owned by the current user,
  forcefully when 'force' is set."""
  kill_matching_processes(['catalogd', 'impalad', 'statestored'], force)
def kill_matching_processes(binary_names, force=False):
  """Kills all processes with the given binary names and waits for each to exit.
  Raises RuntimeError if any process survives KILL_TIMEOUT_IN_SECONDS."""
  victims = list(find_user_processes(binary_names))
  # Signal everything first so the processes can shut down in parallel.
  for victim in victims:
    signal_it = victim.kill if force else victim.terminate
    try:
      signal_it()
    except psutil.NoSuchProcess:
      pass
  # Now wait for each one; a timeout here means the kill did not take.
  for victim in victims:
    try:
      victim.wait(KILL_TIMEOUT_IN_SECONDS)
    except psutil.TimeoutExpired:
      raise RuntimeError("Unable to kill %s (pid %d) after %d seconds." % (
          victim.name, victim.pid, KILL_TIMEOUT_IN_SECONDS))
def start_statestore():
print "Starting State Store logging to %s/statestored.INFO" % options.log_dir
stderr_log_file_path = os.path.join(options.log_dir, "statestore-error.log")
args = "%s %s" % (build_impalad_logging_args(0, "statestored"),
" ".join(options.state_store_args))
exec_impala_process(STATE_STORE_PATH, args, stderr_log_file_path)
if not check_process_exists("statestored", 10):
raise RuntimeError("Unable to start statestored. Check log or file permissions"
" for more details.")
def start_catalogd():
print "Starting Catalog Service logging to %s/catalogd.INFO" % options.log_dir
stderr_log_file_path = os.path.join(options.log_dir, "catalogd-error.log")
args = "%s %s %s" % (build_impalad_logging_args(0, "catalogd"),
" ".join(options.catalogd_args),
build_jvm_args(options.cluster_size))
exec_impala_process(CATALOGD_PATH, args, stderr_log_file_path)
if not check_process_exists("catalogd", 10):
raise RuntimeError("Unable to start catalogd. Check log or file permissions"
" for more details.")
def build_impalad_port_args(instance_num):
  """Returns the port flags for the impalad with index 'instance_num'. Every
  base port is offset by the instance number so several impalads can run side
  by side on a single machine."""
  base_ports = (21000,  # beeswax
                21050,  # hs2
                22000,  # be
                27000,  # krpc
                23000,  # state store subscriber
                25000)  # webserver
  return IMPALAD_PORTS % tuple(port + instance_num for port in base_ports)
def build_impalad_logging_args(instance_num, service_name):
  """Returns the glog flags for a daemon: log filename 'service_name', directory
  and verbosity from the parsed options. 'instance_num' is currently unused but
  kept for signature symmetry with the other build_* helpers."""
  log_settings = (service_name, options.log_dir, options.log_level,
                  options.max_log_files)
  return BE_LOGGING_ARGS % log_settings
def build_jvm_args(instance_num):
  """Returns the JVM flags for instance 'instance_num': a per-instance debug
  port (base 30000) plus any user-supplied --jvm_args passthrough."""
  debug_port = 30000 + instance_num
  return JVM_ARGS % (debug_port, options.jvm_args)
def start_impalad_instances(cluster_size, num_coordinators, use_exclusive_coordinators):
  """Start 'cluster_size' impalad instances. The first 'num_coordinator' instances will
  act as coordinators. 'use_exclusive_coordinators' specifies whether the coordinators
  will only execute coordinator fragments.

  Flags are assembled so that later prepends take precedence over earlier ones;
  the ordering of the prepends below is therefore significant."""
  if cluster_size == 0:
    # No impalad instances should be started.
    return

  # Set mem_limit of each impalad to the smaller of 12GB or
  # 1/cluster_size (typically 1/3) of 70% of system memory.
  #
  # The default memory limit for an impalad is 80% of the total system memory. On a
  # mini-cluster with 3 impalads that means 240%. Since having an impalad be OOM killed
  # is very annoying, the mem limit will be reduced. This can be overridden using the
  # --impalad_args flag. virtual_memory().total returns the total physical memory.
  # The exact ratio to use is somewhat arbitrary. Peak memory usage during
  # tests depends on the concurrency of parallel tests as well as their ordering.
  # On the other hand, to avoid using too much memory, we limit the
  # memory choice here to max out at 12GB. This should be sufficient for tests.
  #
  # Beware that ASAN builds use more memory than regular builds.
  mem_limit = int(0.7 * psutil.virtual_memory().total / cluster_size)
  mem_limit = min(12 * 1024 * 1024 * 1024, mem_limit)

  # Per-instance catalog-init delays for stress testing (see --catalog_init_delays).
  delay_list = []
  if options.catalog_init_delays != "":
    delay_list = [delay.strip() for delay in options.catalog_init_delays.split(",")]

  # Start each impalad instance and optionally redirect the output to a log file.
  for i in range(cluster_size):
    if i == 0:
      # The first impalad always logs to impalad.INFO
      service_name = "impalad"
    else:
      service_name = "impalad_node%s" % i
      # Sleep between instance startup: simultaneous starts hurt the minikdc
      # Yes, this is a hack, but it's easier than modifying the minikdc...
      # TODO: is this really necessary?
      sleep(1)

    print "Starting Impala Daemon logging to %s/%s.INFO" % (options.log_dir,
        service_name)

    # impalad args from the --impalad_args flag. Also replacing '#ID' with the instance.
    param_args = (" ".join(options.impalad_args)).replace("#ID", str(i))
    args = "--mem_limit=%s %s %s %s %s" %\
          (mem_limit,  # Goes first so --impalad_args will override it.
           build_impalad_logging_args(i, service_name), build_jvm_args(i),
           build_impalad_port_args(i), param_args)
    if options.kudu_master_hosts:
      # Must be prepended, otherwise the java options interfere.
      args = "-kudu_master_hosts %s %s" % (options.kudu_master_hosts, args)
    # Instances past the first 'num_coordinators' are pure executors; exclusive
    # coordinators additionally opt out of executing non-coordinator fragments.
    if i >= num_coordinators:
      args = "-is_coordinator=false %s" % (args)
    elif use_exclusive_coordinators:
      # Coordinator instance that doesn't execute non-coordinator fragments
      args = "-is_executor=false %s" % (args)
    if i < len(delay_list):
      args = "-stress_catalog_init_delay_ms=%s %s" % (delay_list[i], args)
    if options.disable_krpc:
      args = "-use_krpc=false %s" % (args)

    stderr_log_file_path = os.path.join(options.log_dir, '%s-error.log' % service_name)
    exec_impala_process(IMPALAD_PATH, args, stderr_log_file_path)
def wait_for_impala_process_count(impala_cluster, retries=10):
  """Checks that the desired number of impalad/statestored processes are running.

  Refreshes 'impala_cluster' until the number of running impalads reaches
  options.cluster_size and both the statestored and the catalogd are present,
  or until 'retries' refresh attempts (one second apart) are exhausted.
  Raises RuntimeError describing every missing component on failure.
  """
  for _ in range(retries):
    if (len(impala_cluster.impalads) >= options.cluster_size and
        impala_cluster.statestored and impala_cluster.catalogd):
      # Fix: previously the loop kept iterating after the cluster was already
      # complete; stop polling as soon as everything expected is running.
      break
    sleep(1)
    impala_cluster.refresh()

  # Collect every failure into one message so the caller sees the full picture.
  msg = ""
  if len(impala_cluster.impalads) < options.cluster_size:
    msg += "Expected %d impalad(s), only %d found\n" %\
        (options.cluster_size, len(impala_cluster.impalads))
  if not impala_cluster.statestored:
    msg += "statestored failed to start.\n"
  if not impala_cluster.catalogd:
    msg += "catalogd failed to start.\n"
  if msg:
    raise RuntimeError(msg)
def wait_for_cluster_web(timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
  """Checks if the cluster is "ready"

  A cluster is deemed "ready" if:
    - All backends are registered with the statestore.
    - Each impalad knows about all other impalads.
    - Each coordinator impalad's catalog cache is ready.
  This information is retrieved by querying the statestore debug webpage
  and each individual impalad's metrics webpage.

  Raises on timeout; 'timeout_in_seconds' bounds each per-impalad wait.
  """
  impala_cluster = ImpalaCluster()
  # impalad processes may take a while to come up.
  wait_for_impala_process_count(impala_cluster)

  # TODO: fix this for coordinator-only nodes as well.
  expected_num_backends = options.cluster_size
  if options.catalog_init_delays != "":
    # Backends whose catalog init is artificially delayed won't register in time.
    for delay in options.catalog_init_delays.split(","):
      if int(delay.strip()) != 0: expected_num_backends -= 1

  for impalad in impala_cluster.impalads:
    # Fix: honor the caller-supplied timeout instead of always using the
    # module-level CLUSTER_WAIT_TIMEOUT_IN_SECONDS constant.
    impalad.service.wait_for_num_known_live_backends(expected_num_backends,
        timeout=timeout_in_seconds, interval=2)
    # Only coordinators with an undelayed catalog are expected to have a
    # usable catalog cache and open client ports.
    if (impalad._get_arg_value('is_coordinator', default='true') == 'true' and
        impalad._get_arg_value('stress_catalog_init_delay_ms', default=0) == 0):
      wait_for_catalog(impalad, timeout_in_seconds=timeout_in_seconds)
def wait_for_catalog(impalad, timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
"""Waits for a catalog copy to be received by the impalad. When its received,
additionally waits for client ports to be opened."""
start_time = time()
client_beeswax = None
client_hs2 = None
num_dbs = 0
num_tbls = 0
while (time() - start_time < timeout_in_seconds):
try:
num_dbs = impalad.service.get_metric_value('catalog.num-databases')
num_tbls = impalad.service.get_metric_value('catalog.num-tables')
client_beeswax = impalad.service.create_beeswax_client()
client_hs2 = impalad.service.create_hs2_client()
break
except Exception as e:
print 'Client services not ready.'
print 'Waiting for catalog cache: (%s DBs / %s tables). Trying again ...' %\
(num_dbs, num_tbls)
finally:
if client_beeswax is not None: client_beeswax.close()
sleep(0.5)
if client_beeswax is None or client_hs2 is None:
raise RuntimeError('Unable to open client ports within %s seconds.'\
% timeout_in_seconds)
def wait_for_cluster_cmdline(timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
"""Checks if the cluster is "ready" by executing a simple query in a loop"""
start_time = time()
while os.system('%s -i localhost:21000 -q "%s"' % (IMPALA_SHELL, 'select 1')) != 0:
if time() - timeout_in_seconds > start_time:
raise RuntimeError('Cluster did not start within %d seconds' % timeout_in_seconds)
print 'Cluster not yet available. Sleeping...'
sleep(2)
if __name__ == "__main__":
  # Kill-only mode: tear everything down and exit without starting anything.
  if options.kill_only:
    kill_cluster_processes(force=options.force_kill)
    sys.exit(0)

  # Validate the command-line configuration before touching any processes.
  if options.build_type not in KNOWN_BUILD_TYPES:
    print 'Invalid build type %s' % options.build_type
    print 'Valid values: %s' % ', '.join(KNOWN_BUILD_TYPES)
    sys.exit(1)

  if options.cluster_size < 0:
    print 'Please specify a cluster size >= 0'
    sys.exit(1)

  if options.num_coordinators <= 0:
    print 'Please specify a valid number of coordinators > 0'
    sys.exit(1)

  # With exclusive coordinators, at least one non-coordinator executor is needed.
  if options.use_exclusive_coordinators and options.num_coordinators >= options.cluster_size:
    print 'Cannot start an Impala cluster with no executors'
    sys.exit(1)

  if not os.path.isdir(options.log_dir):
    print 'Log dir does not exist or is not a directory: %s' % options.log_dir
    sys.exit(1)

  # Kill existing cluster processes based on the current configuration.
  if options.restart_impalad_only:
    if options.inprocess:
      print 'Cannot perform individual component restarts using an in-process cluster'
      sys.exit(1)
    kill_matching_processes(['impalad'], force=options.force_kill)
  else:
    kill_cluster_processes(force=options.force_kill)

  # The web-based readiness check needs json; fall back to probing via the shell.
  try:
    import json
    wait_for_cluster = wait_for_cluster_web
  except ImportError:
    print "json module not found, checking for cluster startup through the command-line"
    wait_for_cluster = wait_for_cluster_cmdline

  # If ImpalaCluster cannot be imported, fall back to the command-line to check
  # whether impalads/statestore are up.
  try:
    from tests.common.impala_cluster import ImpalaCluster
    if options.restart_impalad_only:
      impala_cluster = ImpalaCluster()
      if not impala_cluster.statestored or not impala_cluster.catalogd:
        print 'No running statestored or catalogd detected. Restarting entire cluster.'
        options.restart_impalad_only = False
  except ImportError:
    print 'ImpalaCluster module not found.'
    # TODO: Update this code path to work similar to the ImpalaCluster code path when
    # restarting only impalad processes. Specifically, we should do a full cluster
    # restart if either the statestored or catalogd processes are down, even if
    # restart_only_impalad=True.
    wait_for_cluster = wait_for_cluster_cmdline

  try:
    # Statestore and catalogd must be up before the impalads can register.
    if not options.restart_impalad_only:
      start_statestore()
      start_catalogd()
    start_impalad_instances(options.cluster_size, options.num_coordinators,
                            options.use_exclusive_coordinators)
    # Sleep briefly to reduce log spam: the cluster takes some time to start up.
    sleep(3)

    # Check for the cluster to be ready.
    wait_for_cluster()
  except Exception, e:
    print 'Error starting cluster: %s' % e
    sys.exit(1)

  # Summarize the resulting topology; with exclusive coordinators the
  # coordinators are not counted as executors.
  if options.use_exclusive_coordinators == True:
    executors = options.cluster_size - options.num_coordinators
  else:
    executors = options.cluster_size

  print 'Impala Cluster Running with %d nodes (%d coordinators, %d executors).' % (
      options.cluster_size,
      min(options.cluster_size, options.num_coordinators),
      executors)