blob: 8bb35b0a207ec13ff01f3b66eadd8965fd35dc55 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# We do not use Impala's python environment here, nor do we depend on
# non-standard python libraries to avoid needing extra build steps before
# triggering this.
import argparse
import datetime
import itertools
import logging
import multiprocessing
import multiprocessing.pool
import os
import re
import subprocess
import sys
import tempfile
import threading
import time
CLI_HELP = """\
Runs tests inside of docker containers, parallelizing different types of
tests. This script first creates a docker container, checks out this repo
into it, bootstraps the container with Impala dependencies, and builds Impala
and its test data. Then, it saves the resulting container, and launches new
containers to run tests in parallel. An HTML, visual timeline is generated
as part of the build, in logs/docker/*/timeline.html.
# To execute run:
# docker/
# After waiting for that to finish, inspect results in logs/docker.
# Visually, the timeline looks as follows, produced on a 32-core, 100GB RAM
# machine:
# ....... 1h05m Checkout, setup machine, build (8m with ccache),
# generate testdata (52m); missing ccache
# adds about 7 minutes (very sensitive to number
# of available cores)
# ... 11m Commit the Docker container
# . 10m FE tests
# . 10m JDBC tests
# .... 45m serial EE tests
# ...... 1h02m cluster tests
# ... 31m BE (C++) tests
# .... 36m parallel EE tests
# Total time: 2h25m.
# CPU usage is sustained high for the parallel EE tests and for
# the C++ compile (when it's not ccache'd), but is otherwise low.
# Because every parallel track consumes memory (a cluster),
# increasing parallelism and memory must be balanced.
# Memory usage is thorny. The minicluster memory can
# be tweaked somewhat by how much memory to give to the JVM
# and what to set --mem_limit too. Furthermore, parallel
# cluster tests use more memory when more parallelism happens.
# The code that runs inside of the containers is in,
# whereas the code that invokes docker is here.
# We avoid using Dockerfile and "docker build": they make it hard or impossible
# to cross-mount host directories into containers or use --privileged, and using
# them would require generating them dynamically. They're more trouble than
# they're worth for this use case.
# In practice, the containers are about 100GB (with 45GB
# being test data and ~40GB being the tests).
# Requirements:
# * Docker
# This has been tested on Ubuntu16.04 with Docker
# from the Ubuntu repos, i.e., Docker 1.13.1.
# * About 150 GB of disk space available to Docker.
# * 75GB of RAM.
# This script tries to clean up images and containers created by this process, though
# this can be disabled for debugging.
# To clean up containers and images manually, you can use:
# for x in $(docker ps -aq --filter label=pwd=$IMPALA_HOME); do
# docker stop $x; docker rm $x; done
# for x in $(docker images -q --filter label=pwd=$IMPALA_HOME); do docker rmi $x; done
# Core dumps:
# On an Ubuntu host, core dumps and Docker don't mix by default, because apport is not
# running inside of the container. See
# To enable core dumps, run the following command on the host:
# $echo 'core.%e.%p' | sudo tee /proc/sys/kernel/core_pattern
# TODOs:
# - Support for executing other flavors, like exhaustive, or file systems,
# like S3.
# Suggested speed improvement TODOs:
# - Speed up testdata generation
# - Skip generating test data for variants not being run
# - Make container image smaller
# - Analyze .xml junit files to find slow tests; eradicate
# or move to different suite.
# - Run BE tests earlier (during data load)
if __name__ == '__main__' and __package__ is None:
import monitor
base = os.path.dirname(os.path.abspath(__file__))
LOG_FORMAT="%(asctime)s %(threadName)s: %(message)s"
def main():
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
default_parallel_test_concurrency, default_suite_concurrency, default_memlimit_gb = \
parser = argparse.ArgumentParser(
description=CLI_HELP, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
group = parser.add_mutually_exclusive_group()
group.add_argument('--cleanup-containers', dest="cleanup_containers",
action='store_true', default=True,
help='Removes containers when finished.')
dest="cleanup_containers", action='store_false')
group = parser.add_mutually_exclusive_group()
'--parallel-test-concurrency', type=int,
help='For the ee-test-parallel suite, how many tests to run concurrently.')
'--suite-concurrency', type=int, default=default_suite_concurrency,
help='Number of concurrent suites to run in parallel.')
'--impalad-mem-limit-bytes', type=int, default=default_memlimit_gb,
help='Memlimit to pass to impalad for miniclusters.')
'--cleanup-image', dest="cleanup_image",
action='store_true', default=True,
help="Whether to remove image when done.")
group.add_argument('--no-cleanup-image', dest="cleanup_image", action='store_false')
parser.add_argument('--base-image', dest="base_image", default="ubuntu:16.04",
help="Base OS image to use. ubuntu:16.04 and centos:6 are known to work.")
'--build-image', metavar='IMAGE',
help='Skip building, and run tests on pre-existing image.')
suite_group = parser.add_mutually_exclusive_group()
'--suite', metavar='VARIANT', action='append',
Run specific test suites; can be specified multiple times.
Test-with-docker may shard some suites to improve parallelism.
If not specified, default tests are run.
Default: %s, All Choices: %s
""" % (",".join([ for s in DEFAULT_SUITES]),
",".join([ for s in ALL_SUITES ])))
suite_group.add_argument('--all-suites', action='store_true', default=False,
help="If set, run all available suites.")
'--name', metavar='NAME',
help="Use a specific name for the test run. The name is used " +
"as a prefix for the container and image names, and " +
"as part of the log directory naming. Defaults to include a timestamp.","i-%Y%m%d-%H%M%S"))
parser.add_argument('--ccache-dir', metavar='DIR',
help="CCache directory to use",
parser.add_argument('--tail', action="store_true",
help="Run tail on all container log files.")
help="Git repo for Impala-lzo repo")
parser.add_argument('--impala-lzo-ref', default='master',
help="Branch name for Impala-lzo repo.")
parser.add_argument('--env', metavar='K=V', default=[], action='append',
help="""Passes given environment variables (expressed as KEY=VALUE)
through containers.
parser.add_argument('--test', action="store_true")
args = parser.parse_args()
if not args.suite:
if args.all_suites:
# Ignore "NOOP" tasks, as they are just for testing.
args.suite = [ for s in ALL_SUITES if not"NOOP") ]
args.suite = [ for s in DEFAULT_SUITES ]
t = TestWithDocker(
build_image=args.build_image, suite_names=args.suite,, cleanup_containers=args.cleanup_containers,
cleanup_image=args.cleanup_image, ccache_dir=args.ccache_dir, test_mode=args.test,
env=args.env, base_image=args.base_image)
fh = logging.FileHandler(os.path.join(_make_dir_if_not_exist(t.log_dir), "log.txt"))
logging.getLogger('').addHandler(fh)"Arguments: %s", args)
ret =
if not ret:
def _compute_defaults():
"""Compute default config options based on memory.
The goal is to work reasonably on machines with
about 60GB of memory, like Amazon's c4.8xlarge (36 CPUs, 60GB)
or c5.9xlarge (36 CPUs, 72GB) or m4.4xlarge (16 CPUs, 64 GB).
Based on some experiments, we set up defaults for different
machine sizes based on memory, with an eye towards
having reasonable runtimes as well.
Experiments on memory usage:
suite parallelism usage
Xmx memlimit
ee-test-parallel 4GB 8 5GB 33GB
ee-test-parallel 4GB 16 7GB 37GB
ee-test-serial 4GB - 5GB 18GB
cluster-test 4GB - - 13GB
be-test 4GB - 10GB 19GB
fe-test 4GB - 10GB 9GB
total_memory_gb = monitor.total_memory()
cpus = multiprocessing.cpu_count()"CPUs: %s Memory (GB): %s", cpus, total_memory_gb)
parallel_test_concurrency = min(cpus, 8)
memlimit_gb = 8
if total_memory_gb >= 95:
suite_concurrency = 4
memlimit_gb = 11
parallel_test_concurrency = min(cpus, 12)
elif total_memory_gb >= 65:
suite_concurrency = 3
elif total_memory_gb >= 35:
suite_concurrency = 2
logging.warning("This tool should be run on a machine with more memory.")
suite_concurrency = 1
return parallel_test_concurrency, suite_concurrency, memlimit_gb * 1024 * 1024 * 1024
class Suite(object):
"""Encapsulates a test suite.
A test suite is a named thing that the user can select to run,
and it runs in its own container, in parallel with other suites.
The actual running happens from and is controlled
mostly by environment variables. When complexity is easier
to handle in Python (with its richer data types), we prefer
it here.
def __init__(self, name, **envs):
"""Create suite with given name and environment.""" = name
self.envs = dict(
# If set, this suite is sharded past a certain suite concurrency threshold.
self.shard_at_concurrency = None
# Variable to which to append --shard_tests
self.sharding_variable = None
self.envs[name] = "true"
self.timeout_minutes = 120
def copy(self, name, **envs):
"""Duplicates current suite allowing for environment updates."""
v = dict()
ret = Suite(name, **v)
ret.shard_at_concurrency = self.shard_at_concurrency
ret.sharding_variable = self.sharding_variable
ret.timeout_minutes = self.timeout_minutes
return ret
def exhaustive(self):
"""Returns an "exhaustive" copy of the suite."""
r = self.copy( + "_EXHAUSTIVE", EXPLORATION_STRATEGY="exhaustive")
r.timeout_minutes = 240
return r
def asan(self):
"""Returns an ASAN copy of this suite."""
r = self.copy( + "_ASAN", REBUILD_ASAN="true")
r.timeout_minutes = self.timeout_minutes * 2.0 + 10
return r
def sharded(self, shards):
"""Returns a list of sharded copies of the list.
key is the name of the variable which needs to be appended with "--shard-tests=..."
ret = []
for i in range(1, shards + 1):
s = self.copy("%s_%d_of_%d" % (, i, shards))
s.envs[self.sharding_variable] = self.envs.get(self.sharding_variable, "") \
+ " --shard_tests=%s/%s" % (i, shards)
return ret
# Definitions of all known suites:
be_test = Suite("BE_TEST")
ee_test_serial = Suite("EE_TEST_SERIAL", EE_TEST="true",
RUN_TESTS_ARGS="--skip-parallel --skip-stress")
ee_test_serial.shard_at_concurrency = 4
ee_test_serial.sharding_variable = "RUN_TESTS_ARGS"
ee_test_serial_exhaustive = ee_test_serial.exhaustive()
ee_test_parallel = Suite("EE_TEST_PARALLEL", EE_TEST="true",
ee_test_parallel_exhaustive = ee_test_parallel.exhaustive()
cluster_test = Suite("CLUSTER_TEST")
cluster_test.shard_at_concurrency = 4
cluster_test.sharding_variable = "RUN_CUSTOM_CLUSTER_TESTS_ARGS"
cluster_test_exhaustive = cluster_test.exhaustive()
# Default supported suites. These are organized slowest-to-fastest, so that,
# when parallelism is limited, the total time is least impacted.
# These are used for testing this script
def _call(args, check=True):
"""Wrapper for calling a subprocess.
args is the first argument of subprocess.Popen, typically
an array, e.g., ["echo", "hi"].
If check is set, raise an exception on failure.
""""Calling: %s", args)
if check:
subprocess.check_call(args, stdin=None)
return, stdin=None)
def _check_output(*args, **kwargs):
"""Wrapper for subprocess.check_output, with logging.""""Running: %s, %s; cmdline: %s.", args, kwargs, " ".join(*args))
return subprocess.check_output(*args, **kwargs)
def _make_dir_if_not_exist(*parts):
d = os.path.join(*parts)
if not os.path.exists(d):
return d
class Container(object):
"""Encapsulates a container, with some metadata."""
def __init__(self, id_, name, logfile, exitcode=None, running=None): = id_ = name
self.logfile = logfile
self.exitcode = exitcode
self.running = running
self.start = None
self.end = None
self.removed = False
# Protects multiple calls to "docker rm <>"
self.lock = threading.Lock()
# Updated by Timeline class
self.total_user_cpu = -1
self.total_system_cpu = -1
self.peak_total_rss = -1
def runtime_seconds(self):
if self.start and self.end:
return self.end - self.start
def __str__(self):
return "Container<" + \
",".join(["%s=%s" % (k, v) for k, v in self.__dict__.items()]) \
+ ">"
class TestWithDocker(object):
"""Tests Impala using Docker containers for parallelism."""
def __init__(self, build_image, suite_names, name, cleanup_containers,
cleanup_image, ccache_dir, test_mode,
suite_concurrency, parallel_test_concurrency,
impalad_mem_limit_bytes, tail,
impala_lzo_repo, impala_lzo_ref, env, base_image):
self.build_image = build_image = name
self.containers = []
self.git_root = _check_output(["git", "rev-parse", "--show-toplevel"]).strip()
# Protects multiple concurrent calls to "docker create"
self.docker_lock = threading.Lock()
# If using worktrees, we need to find $GIT_COMMON_DIR; rev-parse
# supports finding it as of vesion 2.5.0; for older versions, we
# use $GIT_DIR.
git_common_dir = _check_output(["git", "rev-parse", "--git-common-dir"]).strip()
if git_common_dir == "--git-common-dir":
git_common_dir = _check_output(["git", "rev-parse", "--git-dir"]).strip()
self.git_common_dir = os.path.realpath(git_common_dir)
assert os.path.exists(self.git_common_dir)
self.git_head_rev = _check_output(
["git", "rev-parse", "--abbrev-ref", "HEAD"]).strip()
assert self.git_head_rev, \
"Could not get reference to HEAD using git rev-parse --abbrev-ref HEAD."
self.cleanup_containers = cleanup_containers
self.cleanup_image = cleanup_image
self.image = None
if build_image and cleanup_image:
# Refuse to clean up external image.
raise Exception("cleanup_image and build_image cannot be both specified")
self.ccache_dir = ccache_dir
self.log_dir = os.path.join(self.git_root, "logs", "docker",
self.monitoring_output_file = os.path.join(self.log_dir, "metrics.txt")
self.monitor = monitor.ContainerMonitor(self.monitoring_output_file)
self.test_mode = test_mode
self.suite_concurrency = suite_concurrency
self.parallel_test_concurrency = parallel_test_concurrency
self.impalad_mem_limit_bytes = impalad_mem_limit_bytes
self.tail = tail
self.impala_lzo_repo = impala_lzo_repo
self.impala_lzo_ref = impala_lzo_ref
self.env = env
self.base_image = base_image
# Map suites back into objects; we ignore case for this mapping.
suites = []
suites_by_name = {}
for suite in ALL_SUITES:
suites_by_name[] = suite
for suite_name in suite_names:
# If we have enough concurrency, shard some suites into two halves.
suites2 = []
for suite in suites:
if suite.shard_at_concurrency is not None and \
suite_concurrency >= suite.shard_at_concurrency:
suites = suites2
self.suite_runners = [TestSuiteRunner(self, suite) for suite in suites]
def _create_container(self, image, name, logdir, logname, entrypoint, extras=None):
"""Returns a new container.
logdir - subdirectory to create under self.log_dir,
which will get mounted to /logs
logname - name of file in logdir that will be created
extras - extra arguments to pass to docker
entrypoint - entrypoint arguments, as a list.
if extras is None:
extras = []
if self.test_mode:
extras = ["-e", "TEST_TEST_WITH_DOCKER=true"] + extras
# According to localtime(5), /etc/localtime is supposed
# to be a symlink to somewhere inside /usr/share/zoneinfo.
# Note that sometimes the symlink tree may be
# complicated, e.g.:
# /etc/localtime ->
# /usr/share/zoneinfo/America/Los_Angeles -> (readlink)
# ../US/Pacific-New (realpath)
# Using both readlink and realpath should work, but we've
# encountered one scenario (centos:6) where the Java tzdata
# database doesn't have US/Pacific-New, but has America/Los_Angeles.
# This is deemed sufficient to tip the scales to using readlink.
assert os.path.islink("/etc/localtime")
localtime_link_target = os.readlink("/etc/localtime")
assert localtime_link_target.startswith("/usr/share/zoneinfo")
# Workaround for what appears to be
# Namely, if we run too many "docker create" at the same time, one
# of them hangs forever. To avoid the issue, we serialize the invocations
# of "docker create".
with self.docker_lock:
container_id = _check_output([
"docker", "create",
# Required for some of the ntp handling in bootstrap and Kudu;
# requirement may be lifted in newer Docker versions.
"--name", name,
# Whereas the container names vary across containers, we use the same
# hostname repeatedly, so that the build container and the test
# containers have the same hostnames. Kudu errors out with "Remote
# error: Service unavailable: Timed out: could not wait for desired
# snapshot timestamp to be consistent: Tablet is lagging too much to be
# able to serve snapshot scan." if reading with READ_AT_SNAPSHOT
# if the hostnames change underneath it.
# Label with the git root directory for easier cleanup
"--label=pwd=" + self.git_root,
# Consistent locales
"-e", "LC_ALL=C",
# Mount the git directory so that clones can be local.
# We use /repo to have access to certain scripts,
# and we use /git_common_dir to have local clones,
# even when "git worktree" is being used.
"-v", self.git_root + ":/repo:ro",
"-v", self.git_common_dir + ":/git_common_dir:ro",
"-e", "GIT_HEAD_REV=" + self.git_head_rev,
"-e", "IMPALA_LZO_REPO=" + self.impala_lzo_repo,
"-e", "IMPALA_LZO_REF=" + self.impala_lzo_ref,
# Share timezone between host and container
"-e", "LOCALTIME_LINK_TARGET=" + localtime_link_target,
"-v", self.ccache_dir + ":/ccache",
"-e", "CCACHE_TEMPDIR=" + "/ccache/" + name,
"-v", _make_dir_if_not_exist(self.log_dir,
logdir) + ":/logs",
"-v", base + ":/mnt/base:ro"]
+ list(itertools.chain(*[["-e", env] for env in self.env]))
+ extras
+ [image]
+ entrypoint).strip()
ctr = Container(name=name, id_=container_id,
logfile=os.path.join(self.log_dir, logdir, logname))"Created container %s", ctr)
return ctr
def _run_container(self, container):
"""Runs container, and returns True if the container had a successful exit value.
This blocks while the container is running. The container output is
run through to add timestamps and saved into the container's log file.
container.running = True
tailer = None
with open(container.logfile, "aw") as log_output:
if self.tail:
tailer = subprocess.Popen(
["tail", "-f", "--pid", str(os.getpid()), "-v", container.logfile])
container.start = time.time()
# Sets up a "docker start ... | > logfile" pipeline using
# subprocess.
annotate = subprocess.Popen(
[os.path.join(self.git_root, "docker", "")],
stderr=log_output)"Starting container %s; logging to %s",,
docker = subprocess.Popen(["docker", "start", "--attach",],
stdin=None, stdout=annotate.stdin, stderr=annotate.stdin)
ret = docker.wait()
annotate.wait()"Container %s returned %s", container, ret)
container.exitcode = ret
container.running = False
container.end = time.time()
if tailer:
return ret == 0
def _stop_container(container):
"""Stops container. Ignores errors (e.g., if it's already exited)."""
if container.running:
_call(["docker", "stop",], check=False)
container.end = time.time()
container.running = False
def _rm_container(container):
"""Removes container."""
# We can have multiple threads trying to call "docker rm" on a container.
# Docker will fail one of those with "already running", but we actually
# want to block until it's removed. Using a lock to serialize the "docker # rm"
# calls handles that.
with container.lock:
if not container.removed:
_call(["docker", "rm",], check=False)
container.removed = True
def _create_build_image(self):
"""Creates the "build image", with Impala compiled and data loaded."""
container = self._create_container(
image=self.base_image, + "-build",
# will create a user with our uid; this
# allows the shared file systems to work seamlessly
entrypoint=["/mnt/base/", "build", str(os.getuid())])
try:"Docker container for build: %s", container)
_check_output(["docker", "start",])
if not self._run_container(container):
raise Exception("Build container failed.")"Committing docker container.")
self.image = _check_output(
["docker", "commit",
"-c", "LABEL pwd=" + self.git_root,
"-c", "USER impdev",
"-c", "WORKDIR /home/impdev/Impala",
"-c", 'CMD ["/home/impdev/Impala/docker/", "shell"]',, "impala:built-" +]).strip()"Committed docker image: %s", self.image)
if self.cleanup_containers:
def _run_tests(self):
pool = multiprocessing.pool.ThreadPool(processes=self.suite_concurrency)
outstanding_suites = []
for suite in self.suite_runners:
suite.task = pool.apply_async(
ret = True
while len(outstanding_suites) > 0:
for suite in list(outstanding_suites):
if suite.timed_out():
msg = "Task %s not finished within timeout %s" % (,
raise Exception(msg)
task = suite.task
if task.ready():
this_task_ret = task.get()
if this_task_ret:"Suite %s succeeded.",
else:"Suite %s failed.",
ret = False
except KeyboardInterrupt:"\n\nDetected KeyboardInterrupt; shutting down!\n\n")
return ret
def run(self):
# Create logs directories and ccache dir.
if not self.build_image:
self.image = self.build_image
ret = self._run_tests()
return ret
if self.cleanup_containers:
for c in self.containers:
if self.cleanup_image and self.image:
_call(["docker", "rmi", self.image], check=False)"Memory usage: %s GB min, %s GB max",
# Strings (really, regular expressions) pulled out into to the visual timeline.
">>> ",
_INTERESTING_RE = re.compile("|".join("(%s)" % (s,) for s in _INTERESTING_STRINGS))
def create_timeline(self):
"""Creates timeline into log directory."""
timeline = monitor.Timeline(
timeline.create(os.path.join(self.log_dir, "timeline.html"))
def log_summary(self):"Containers:")
def to_success_string(exitcode):
if exitcode == 0:
return "SUCCESS"
return "FAILURE"
for c in self.containers:"%s %s %s %0.1fm wall, %0.1fm user, %0.1fm system, " +
"%0.1fx parallelism, %0.1f GB peak RSS",
to_success_string(c.exitcode),, c.logfile,
c.runtime_seconds() / 60.0,
c.total_user_cpu / 60.0,
c.total_system_cpu / 60.0,
(c.total_user_cpu + c.total_system_cpu) / max(c.runtime_seconds(), 0.0001),
c.peak_total_rss / 1024.0 / 1024.0 / 1024.0)
class TestSuiteRunner(object):
"""Runs a single test suite."""
def __init__(self, test_with_docker, suite):
self.test_with_docker = test_with_docker
self.suite = suite
self.task = None =
# Set at the beginning of run and facilitates enforcing timeouts
# for individual suites.
self.deadline = None
def timed_out(self):
return self.deadline is not None and time.time() > self.deadline
def run(self):
"""Runs given test. Returns true on success, based on exit code."""
self.deadline = time.time() + self.suite.timeout_minutes * 60
test_with_docker = self.test_with_docker
suite = self.suite
envs = ["-e", "NUM_CONCURRENT_TESTS=" + str(test_with_docker.parallel_test_concurrency)]
for k, v in sorted(suite.envs.iteritems()):
envs.append("%s=%s" % (k, v))
self.start = time.time()
# io-file-mgr-test expects a real-ish file system at /tmp;
# we mount a temporary directory into the container to appease it.
tmpdir = tempfile.mkdtemp( + "-" +
os.chmod(tmpdir, 01777)
# Container names are sometimes used as hostnames, and DNS names shouldn't
# have underscores.
container_name = + "-" +"_", "-")
container = test_with_docker._create_container(
"-v", tmpdir + ":/tmp",
"-u", str(os.getuid())
] + envs,,
logname="log-test-" + + ".txt",
entrypoint=["/mnt/base/", "test_suite",])
return test_with_docker._run_container(container)
return False
finally:"Cleaning up containers for %s" % (,))
if test_with_docker.cleanup_containers:
if __name__ == "__main__":