blob: bbed25eda13cc8958f869eebea5d0a0c09109a62 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
# Basic object model of an Impala cluster (set of Impala processes).
import json
import logging
import os
import pipes
import psutil
import socket
import sys
import time
from getpass import getuser
from random import choice
from signal import SIGKILL
from subprocess import check_call
from time import sleep
import tests.common.environ
from tests.common.impala_service import (
from tests.util.shell_util import exec_process, exec_process_async
if sys.version_info >= (2, 7):
# We use some functions in the docker code that don't exist in Python 2.6.
from subprocess import check_output
LOG = logging.getLogger('impala_cluster')
START_DAEMON_PATH = os.path.join(IMPALA_HOME, 'bin/')
# Timeout to use when waiting for a cluster to start up. Set quite high to avoid test
# flakiness.
# Represents a set of Impala processes.
# Handles two cases:
# * The traditional minicluster with many processes running as the current user on
# the local system. In this case various settings are detected based on command
# line options(beeswax_port, webserver_port, etc)
# * The docker minicluster with one container per process connected to a user-defined
# bridge network.
class ImpalaCluster(object):
def __init__(self, docker_network=None):
self.docker_network = docker_network
def get_e2e_test_cluster(cls):
"""Within end-to-end tests, get the cluster under test with settings detected from
the environment."""
return ImpalaCluster(docker_network=tests.common.environ.docker_network)
def refresh(self):
""" Re-loads the impalad/statestored/catalogd processes if they exist.
Helpful to confirm that processes have been killed.
if self.docker_network is None:
self.__impalads, self.__statestoreds, self.__catalogd =\
self.__impalads, self.__statestoreds, self.__catalogd =\
LOG.debug("Found %d impalad/%d statestored/%d catalogd process(es)" %
(len(self.__impalads), len(self.__statestoreds), 1 if self.__catalogd else 0))
def statestored(self):
Returns the statestore process
Note: Currently we expectly a single statestore process, in the future this might
change in which case this should return the "active" statestore.
# If no statestored process exists, return None.
return self.__statestoreds[0] if len(self.__statestoreds) > 0 else None
def impalads(self):
"""Returns a list of the known impalad processes"""
return self.__impalads
def catalogd(self):
"""Returns the catalogd process, or None if no catalogd process was found"""
return self.__catalogd
def get_first_impalad(self):
return self.impalads[0]
def get_any_impalad(self):
"""Selects a random impalad from the list of known processes"""
return choice(self.impalads)
def get_different_impalad(self, other_impalad):
"""Selects an impalad that is different from the given impalad"""
if len(self.impalads) <= 1:
assert 0, "Only %d impalads available to choose from" % len(self.impalads)"other_impalad: " + str(other_impalad))"Cluster: " + str(len(self.impalads)))"Cluster: " + str(self.impalads))
return choice([impalad for impalad in self.impalads if impalad != other_impalad])
def num_responsive_coordinators(self):
"""Find the number of impalad coordinators that can evaluate a test query."""
n = 0
for impalad in self.impalads:
client = impalad.service.create_beeswax_client()
result = client.execute("select 1")
assert result.success
except Exception as e: print e
return n
def wait_until_ready(self, expected_num_impalads=1, expected_num_ready_impalads=None):
"""Waits for this 'cluster' to be ready to submit queries.
A cluster is deemed "ready" if:
- expected_num_impalads impala processes are up (or, if not specified, at least
one impalad is up).
- expected_num_ready_impalads backends are registered with the statestore.
expected_num_ready_impalads defaults to expected_num_impalads.
- All impalads knows about all other ready impalads.
- Each coordinator impalad's catalog cache is ready.
This information is retrieved by querying the statestore debug webpage
and each individual impalad's metrics webpage.
# TODO: fix this for coordinator-only nodes as well.
if expected_num_ready_impalads is None:
expected_num_ready_impalads = len(self.impalads)
def check_processes_still_running():
"""Check that the processes we waited for above (i.e. impalads, statestored,
catalogd) are still running. Throw an exception otherwise."""
# The number of impalad processes may temporarily increase if breakpad forked a
# process to write a minidump.
assert len(self.impalads) >= expected_num_impalads
assert self.statestored is not None
assert self.catalogd is not None
for impalad in self.impalads:
if (impalad._get_arg_value("is_coordinator", default="true") == "true" and
impalad._get_arg_value("stress_catalog_init_delay_ms", default=0) == 0):
def wait_for_num_impalads(self, num_impalads, retries=10):
"""Checks that at least 'num_impalads' impalad processes are running, along with
the statestored and catalogd.
Refresh until the number running impalad processes reaches the expected
number based on num_impalads, or the retry limit is hit. Failing this, raise a
for i in range(retries):
if len(self.impalads) < num_impalads or not self.statestored or not self.catalogd:
msg = ""
if len(self.impalads) < num_impalads:
msg += "Expected {expected_num} impalad(s), only {actual_num} found\n".format(
expected_num=num_impalads, actual_num=len(self.impalads))
if not self.statestored:
msg += "statestored failed to start.\n"
if not self.catalogd:
msg += "catalogd failed to start.\n"
if msg:
raise RuntimeError(msg)
def __build_impala_process_lists(self):
Gets all the running Impala procs (with start arguments) on the machine.
Note: This currently only works for the local case. To support running in a cluster
environment this would need to enumerate each machine in the cluster.
impalads = list()
statestored = list()
catalogd = None
for binary, process in find_user_processes(['impalad', 'catalogd', 'statestored']):
# IMPALA-6889: When a process shuts down and becomes a zombie its cmdline becomes
# empty for a brief moment, before it gets reaped by its parent (see man proc). We
# copy the cmdline to prevent it from changing between the following checks and
# the construction of the *Process objects.
cmdline = ''
cmdline = process.cmdline()
except psutil.NoSuchProcess:
# IMPALA-8320: psutil.Process.cmdline is a property and the process could have
# disappeared between the time we built the process list and now.
if len(cmdline) == 0:
if binary == 'impalad':
elif binary == 'statestored':
elif binary == 'catalogd':
catalogd = CatalogdProcess(cmdline)
return impalads, statestored, catalogd
def __find_docker_containers(self):
Gets all the running Impala containers on self.docker_network.
impalads = []
statestoreds = []
catalogd = None
output = check_output(["docker", "network", "inspect", self.docker_network])
# Only one network should be present in the top level array.
for container_id in json.loads(output)[0]["Containers"]:
container_info = get_container_info(container_id)
if container_info["State"]["Status"] != "running":
# Skip over stopped containers.
args = container_info["Args"]
executable = os.path.basename(args[0])
port_map = {}
for k, v in container_info["NetworkSettings"]["Ports"].iteritems():
# Key looks like "25000/tcp"..
port = int(k.split("/")[0])
# Value looks like { "HostPort": "25002", "HostIp": "" }.
host_port = int(v[0]["HostPort"])
port_map[port] = host_port
if executable == 'impalad':
impalads.append(ImpaladProcess(args, container_id=container_id,
elif executable == 'statestored':
statestoreds.append(StateStoreProcess(args, container_id=container_id,
elif executable == 'catalogd':
assert catalogd is None
catalogd = CatalogdProcess(args, container_id=container_id,
return impalads, statestoreds, catalogd
def __sort_impalads(self, impalads):
"""Does an in-place sort of a list of ImpaladProcess objects into a canonical order.
We order them by their HS2 port, so that get_first_impalad() always returns the
first one. We need to use a port that is exposed and mapped to a host port for
the containerised cluster."""
impalads.sort(key=lambda i: i.service.hs2_port)
# Represents a process running on a machine and common actions that can be performed
# on a process such as restarting or killing. The process may be the main process in
# a Docker container, if the cluster is containerised (in this case container_id must
# be provided). Note that containerised processes are really just processes running
# on the local system with some additional virtualisation, so some operations are
# the same for both containerised and non-containerised cases.
# For containerised processes, 'port_map' should be provided to map from the container's
# ports to ports on the host. Methods from this class always return the host port.
class Process(object):
def __init__(self, cmd, container_id=None, port_map=None):
assert cmd is not None and len(cmd) >= 1,\
'Process object must be created with valid command line argument list'
assert container_id is None or port_map is not None,\
"Must provide port_map for containerised process"
self.cmd = cmd
self.container_id = container_id
self.port_map = port_map
def __class_name(self):
return self.__class__.__name__
def __str__(self):
return "<%s PID: %s (%s)>" % (self.__class_name(), self.__get_pid(),
' '.join(self.cmd))
def __repr__(self):
return str(self)
def get_pid(self):
"""Gets the PID of the process. Returns None if the PID cannot be determined"""
pid = self.__get_pid()
if pid:"Found PID %s for %s" % (pid, " ".join(self.cmd)))
else:"No PID found for process cmdline: %s. Process is dead?" %
" ".join(self.cmd))
return pid
def get_pids(self):
"""Gets the PIDs of the process. In some circumstances, a process can run multiple
times, e.g. when it forks in the Breakpad crash handler. Returns an empty list if no
PIDs can be determined."""
pids = self.__get_pids()
if pids:"Found PIDs %s for %s" % (", ".join(map(str, pids)), " ".join(self.cmd)))
else:"No PID found for process cmdline: %s. Process is dead?" %
" ".join(self.cmd))
return pids
def __get_pid(self):
pids = self.__get_pids()
assert len(pids) < 2, "Expected single pid but found %s" % ", ".join(map(str, pids))
return len(pids) == 1 and pids[0] or None
def __get_pids(self):
if self.container_id is not None:
container_info = get_container_info(self.container_id)
if container_info["State"]["Status"] != "running":
return []
return [container_info["State"]["Pid"]]
# In non-containerised case, search for process based on matching command lines.
pids = []
for pid in psutil.pids():
process = psutil.Process(pid)
if set(self.cmd) == set(process.cmdline()):
except psutil.NoSuchProcess:
# A process from psutil.pids() no longer exists, continue. We don't log this
# error since it can refer to arbitrary processes outside of our testing code.
return pids
def kill(self, signal=SIGKILL):
Kills the given processes.
if self.container_id is None:
pid = self.__get_pid()
assert pid is not None, "No processes for %s" % self'Killing %s with signal %s' % (self, signal))
exec_process("kill -%d %d" % (signal, pid))
else:"Stopping container: {0}".format(self.container_id))
check_call(["docker", "container", "stop", self.container_id])
def start(self):
"""Start the process with the same arguments after it was stopped."""
if self.container_id is None:
binary = os.path.basename(self.cmd[0])
restart_args = self.cmd[1:]"Starting {0} with arguments".format(binary, restart_args))
run_daemon(binary, restart_args)
else:"Starting container: {0}".format(self.container_id))
check_call(["docker", "container", "start", self.container_id])
def restart(self):
"""Kills and restarts the process"""
def wait_for_exit(self):
"""Wait until the process exits (or return immediately if it already has exited."""'Waiting for exit: {0} (PID: {1})'.format(
' '.join(self.cmd), self.get_pid()))
while self.__get_pid() is not None:
def kill_and_wait_for_exit(self, signal=SIGKILL):
"""Kill the process and wait for it to exit"""
# Base class for all Impala processes
class BaseImpalaProcess(Process):
def __init__(self, cmd, container_id=None, port_map=None):
super(BaseImpalaProcess, self).__init__(cmd, container_id, port_map)
self.hostname = self._get_hostname()
self.webserver_interface = self._get_webserver_interface()
def get_webserver_port(self):
"""Return the port for the webserver of this process."""
return int(self._get_port('webserver_port', self._get_default_webserver_port()))
def _get_default_webserver_port(self):
"""Different daemons have different defaults. Subclasses must override."""
raise NotImplementedError()
def _get_webserver_certificate_file(self):
# TODO: if this is containerised, the path will likely not be the same on the host.
return self._get_arg_value("webserver_certificate_file", "")
def _get_hostname(self):
return self._get_arg_value("hostname", socket.gethostname())
def _get_webserver_interface(self):
return self._get_arg_value("webserver_interface", socket.gethostname())
def _get_arg_value(self, arg_name, default=None):
"""Gets the argument value for given argument name"""
for arg in self.cmd:
if ('%s=' % arg_name) in arg.strip().lstrip('-'):
return arg.split('=')[1]
if default is None:
assert 0, "Argument '{0}' not found in cmd '{1}'.".format(arg_name, self.cmd)
return default
def _get_port(self, arg_name, default):
"""Return the host port for the specified by the command line argument 'arg_name'.
If 'self.port_map' is set, maps from container ports to host ports."""
port = int(self._get_arg_value(arg_name, default))
if self.port_map is not None:
port = self.port_map.get(port, port)
return port
# Represents an impalad process
class ImpaladProcess(BaseImpalaProcess):
def __init__(self, cmd, container_id=None, port_map=None):
super(ImpaladProcess, self).__init__(cmd, container_id, port_map)
self.service = ImpaladService(self.hostname, self.webserver_interface,
self.get_webserver_port(), self.__get_beeswax_port(), self.__get_be_port(),
self.__get_krpc_port(), self.__get_hs2_port(), self.__get_hs2_http_port(),
def _get_default_webserver_port(self):
def __get_beeswax_port(self):
return int(self._get_port('beeswax_port', DEFAULT_BEESWAX_PORT))
def __get_be_port(self):
return int(self._get_port('be_port', DEFAULT_BE_PORT))
def __get_krpc_port(self):
return int(self._get_port('krpc_port', DEFAULT_KRPC_PORT))
def __get_hs2_port(self):
return int(self._get_port('hs2_port', DEFAULT_HS2_PORT))
def __get_hs2_http_port(self):
return int(self._get_port('hs2_http_port', DEFAULT_HS2_HTTP_PORT))
def start(self, wait_until_ready=True):
"""Starts the impalad and waits until the service is ready to accept connections."""
restart_args = self.cmd[1:]"Starting Impalad process with args: {0}".format(restart_args))
run_daemon("impalad", restart_args)
if wait_until_ready:
expected_value=1, timeout=30)
def wait_for_catalog(self):
"""Waits for a catalog copy to be received by the impalad. When its received,
additionally waits for client ports to be opened."""
start_time = time.time()
beeswax_port_is_open = False
hs2_port_is_open = False
num_dbs = 0
num_tbls = 0
while ((time.time() - start_time < CLUSTER_WAIT_TIMEOUT_IN_SECONDS) and
not (beeswax_port_is_open and hs2_port_is_open)):
num_dbs, num_tbls = self.service.get_metric_values(
["catalog.num-databases", "catalog.num-tables"])
beeswax_port_is_open = self.service.beeswax_port_is_open()
hs2_port_is_open = self.service.hs2_port_is_open()
except Exception:
LOG.exception(("Client services not ready. Waiting for catalog cache: "
"({num_dbs} DBs / {num_tbls} tables). Trying again ...").format(
if not hs2_port_is_open or not beeswax_port_is_open:
raise RuntimeError(
"Unable to open client ports within {num_seconds} seconds.".format(
# Represents a statestored process
class StateStoreProcess(BaseImpalaProcess):
def __init__(self, cmd, container_id=None, port_map=None):
super(StateStoreProcess, self).__init__(cmd, container_id, port_map)
self.service = StateStoredService(self.hostname, self.webserver_interface,
self.get_webserver_port(), self._get_webserver_certificate_file())
def _get_default_webserver_port(self):
# Represents a catalogd process
class CatalogdProcess(BaseImpalaProcess):
def __init__(self, cmd, container_id=None, port_map=None):
super(CatalogdProcess, self).__init__(cmd, container_id, port_map)
self.service = CatalogdService(self.hostname, self.webserver_interface,
self.get_webserver_port(), self._get_webserver_certificate_file(),
def _get_default_webserver_port(self):
def __get_port(self):
return int(self._get_port('catalog_service_port', DEFAULT_CATALOG_SERVICE_PORT))
def start(self, wait_until_ready=True):
"""Starts catalogd and waits until the service is ready to accept connections."""
restart_args = self.cmd[1:]"Starting Catalogd process: {0}".format(restart_args))
run_daemon("catalogd", restart_args)
if wait_until_ready:
expected_value=1, timeout=30)
def find_user_processes(binaries):
"""Returns an iterator over all processes owned by the current user with a matching
binary name from the provided list. Return a iterable of tuples, with each tuple
containing the binary name and the psutil.Process object."""
for pid in psutil.pids():
process = psutil.Process(pid)
cmdline = process.cmdline()
if process.username() != getuser() or len(cmdline) == 0:
# IMPALA-8820 - sometimes the process name does not reflect the executed binary
# because the process can change its own name at runtime. Checking the command
# line is more robust.
binary_name = os.path.basename(cmdline[0])
if binary_name in binaries:
yield binary_name, process
except KeyError, e:
if "uid not found" not in str(e):
except psutil.NoSuchProcess, e:
# Ignore the case when a process no longer exists.
def run_daemon(daemon_binary, args, build_type="latest", env_vars={}, output_file=None):
"""Starts up an impalad with the specified command line arguments. args must be a list
of strings. An optional build_type parameter can be passed to determine the build type
to use for the impalad instance. Any values in the env_vars override environment
variables inherited from this process. If output_file is specified, stdout and stderr
are redirected to that file.
bin_path = os.path.join(IMPALA_HOME, "be", "build", build_type, "service",
redirect = ""
if output_file is not None:
redirect = "1>{0} 2>&1".format(output_file)
cmd = [START_DAEMON_PATH, bin_path] + args
# Use os.system() to start 'cmd' in the background via a shell so its parent will be
# init after the shell exits. Otherwise, the parent of 'cmd' will be py.test and we
# cannot cleanly kill it until py.test exits. In theory, Popen(shell=True) should
# achieve the same thing but it doesn't work on some platforms for some reasons.
sys_cmd = ("{set_cmds} {cmd} {redirect} &".format(
set_cmds=''.join(["export {0}={1};".format(k, pipes.quote(v))
for k, v in env_vars.iteritems()]),
cmd=' '.join([pipes.quote(tok) for tok in cmd]),
def get_container_info(container_id):
"""Get the output of "docker container inspect" as a python data structure."""
containers = json.loads(
check_output(["docker", "container", "inspect", container_id]))
# Only one container should be present in the top level array.
assert len(containers) == 1, json.dumps(containers, indent=4)
return containers[0]