#!/usr/bin/env python2
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# This tool allows tests to be submitted to a distributed testing
# service running on shared infrastructure.
#
# See dist_test.py --help for usage information.
import argparse
from collections import deque
import glob
try:
import simplejson as json
except ImportError:
import json
import logging
import os
import pprint
import re
import sys
import shlex
import shutil
import subprocess
import time
from kudu_util import init_logging
TEST_TIMEOUT_SECS = int(os.environ.get('TEST_TIMEOUT_SECS', '900'))
ARTIFACT_ARCHIVE_GLOBS = ["build/*/test-logs/**/*"]
ISOLATE_SERVER = os.environ.get('ISOLATE_SERVER',
"http://isolate.cloudera.org:4242/")
DIST_TEST_HOME = os.environ.get('DIST_TEST_HOME',
os.path.expanduser("~/dist_test"))
# Impose a limit so someone doesn't accidentally try to loop all of the
# tests 10,000 times and cost a bunch of money. If someone really has a good
# reason to do this, they can always edit this constant locally.
MAX_TASKS_PER_JOB = 10000
# The number of times that flaky tests will be retried.
# Our non-distributed implementation sets a number of _attempts_, not a number
# of retries, so we have to subtract 1.
FLAKY_TEST_RETRIES = int(os.environ.get('KUDU_FLAKY_TEST_ATTEMPTS', 1)) - 1
# Whether to retry all failed C++ tests, rather than just known flaky tests.
# Since Java flaky tests are not reported by the test server, Java tests are
# always retried, regardless of this value.
RETRY_ALL_TESTS = int(os.environ.get('KUDU_RETRY_ALL_FAILED_TESTS', 0))
# Flags to include when running Gradle tasks
GRADLE_FLAGS = os.environ.get('EXTRA_GRADLE_FLAGS', "")
PATH_TO_REPO = "../"
# Matches the command line listings in 'ctest -V -N'. For example:
# 262: Test command: /src/kudu/build-support/run-test.sh "/src/kudu/build/debug/bin/jsonwriter-test"
TEST_COMMAND_RE = re.compile('Test command: (.+)$')
# Matches the environment variable listings in 'ctest -V -N'. For example:
# 262: GTEST_TOTAL_SHARDS=1
TEST_ENV_RE = re.compile(r'^\d+: (\S+)=(.+)')
# Matches the output lines of 'ldd'. For example:
# libcrypto.so.10 => /path/to/usr/lib64/libcrypto.so.10 (0x00007fb0cb0a5000)
LDD_RE = re.compile(r'^\s+.+? => (\S+) \(0x.+\)')
DEPS_FOR_ALL = \
["build-support/stacktrace_addr2line.pl",
"build-support/run-test.sh",
"build-support/run_dist_test.py",
"build-support/java-home-candidates.txt",
# The LLVM symbolizer is necessary for suppressions to work
"thirdparty/installed/uninstrumented/bin/llvm-symbolizer",
# Tests that use the external minicluster require these.
# TODO: declare these dependencies per-test.
"build/latest/bin/kudu-tserver",
"build/latest/bin/kudu-master",
# Tests that require tooling require this.
"build/latest/bin/kudu",
# The HMS and Sentry tests require the Hadoop, Hive, and Sentry libraries.
# These files are just symlinks, but dist-test will copy the entire
# directories they point to. The symlinks themselves won't be recreated,
# so we point to them with environment variables in run_dist_test.py.
"build/latest/bin/hive-home",
"build/latest/bin/hadoop-home",
"build/latest/bin/sentry-home",
# Add the Kudu HMS plugin.
"build/latest/bin/hms-plugin.jar",
]
class StagingDir(object):
  @staticmethod
  def new():
    path = rel_to_abs("build/isolate")
    if os.path.isdir(path):
      shutil.rmtree(path)
    os.makedirs(path)
    return StagingDir(path)
def __init__(self, dir):
self.dir = dir
def archive_dump_path(self):
return os.path.join(self.dir, "dump.json")
def gen_json_paths(self):
return glob.glob(os.path.join(self.dir, "*.gen.json"))
def tasks_json_path(self):
return os.path.join(self.dir, "tasks.json")
class TestExecution(object):
"""
An individual test execution that will be run.
One instance exists for each shard of a test case.
"""
def __init__(self, argv=None, env=None):
self.argv = argv or []
self.env = env or {}
@property
def test_name(self):
return "%s.%d" % (os.path.basename(self.argv[1]), self.shard())
def shard(self):
return int(self.env.get("GTEST_SHARD_INDEX", "0"))
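# As a sketch (values assumed for illustration): a ctest entry that runs
#   run-test.sh ".../bin/jsonwriter-test"
# with GTEST_TOTAL_SHARDS=2 and GTEST_SHARD_INDEX=1 in its environment becomes
# a TestExecution whose test_name property is "jsonwriter-test.1".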
def rel_to_abs(rel_path):
dirname, _ = os.path.split(os.path.abspath(__file__))
abs = os.path.abspath(os.path.join(dirname, PATH_TO_REPO, rel_path))
if rel_path.endswith('/') and not abs.endswith('/'):
abs += '/'
return abs
def abs_to_rel(abs_path, staging):
rel = os.path.relpath(abs_path, staging.dir)
if abs_path.endswith('/') and not rel.endswith('/'):
rel += '/'
return rel
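# Example of the two helpers above (assuming a checkout at /src/kudu and a
# staging directory of /src/kudu/build/isolate):
#   rel_to_abs("build/latest")  -> "/src/kudu/build/latest"
#   abs_to_rel("/src/kudu/build/latest/bin/foo-test", staging)
#     -> "../latest/bin/foo-test"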
def get_test_executions(options):
"""
Return an array of TestExecution objects.
"""
  ctest_bin = rel_to_abs("thirdparty/installed/common/bin/ctest")
ctest_argv = [ctest_bin, "-V", "-N", "-LE", "no_dist_test"]
if options.tests_regex:
ctest_argv.extend(['-R', options.tests_regex])
p = subprocess.Popen(ctest_argv,
stdout=subprocess.PIPE,
cwd=rel_to_abs("build/latest"))
out, err = p.communicate()
if p.returncode != 0:
logging.error("Unable to list tests with ctest")
sys.exit(1)
lines = deque(out.splitlines())
execs = []
# Output looks like:
# 262: Test command: /src/kudu/build-support/run-test.sh "/src/kudu/build/debug/bin/jsonwriter-test"
# 262: Environment variables:
# 262: KUDU_TEST_TIMEOUT=900
# 262: GTEST_TOTAL_SHARDS=1
# 262: GTEST_SHARD_INDEX=0
# Test #262: jsonwriter-test.0
#
# 263: Test command ...
# ...
while lines:
# Advance to the beginning of the next test block.
m = None
while lines and not m:
m = TEST_COMMAND_RE.search(lines.popleft())
if not m:
break
argv = shlex.split(m.group(1))
    # Next line should be the 'Environment variables:' heading.
l = lines.popleft()
if "Environment variables:" not in l:
raise Exception("Unexpected line in ctest -V output: %s" % l)
# Following lines should be environment variable pairs.
env = {}
while lines:
m = TEST_ENV_RE.match(lines[0])
if not m:
break
lines.popleft()
env[m.group(1)] = m.group(2)
execs.append(TestExecution(argv=argv, env=env))
return execs
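# For the sample ctest block shown above, the parser yields (illustrative):
#   TestExecution(
#       argv=["/src/kudu/build-support/run-test.sh",
#             "/src/kudu/build/debug/bin/jsonwriter-test"],
#       env={"KUDU_TEST_TIMEOUT": "900",
#            "GTEST_TOTAL_SHARDS": "1",
#            "GTEST_SHARD_INDEX": "0"})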
def is_lib_blacklisted(lib):
# No need to ship things like libc, libstdcxx, etc.
  return lib.startswith("/lib") or lib.startswith("/usr")
def get_base_deps():
deps = []
for d in DEPS_FOR_ALL:
d = os.path.realpath(rel_to_abs(d))
if os.path.isdir(d):
d += "/"
deps.append(d)
# DEPS_FOR_ALL may include binaries whose dependencies are not dependencies
# of the test executable. We must include those dependencies in the archive
# for the binaries to be usable.
deps.extend(ldd_deps(d))
return deps
def is_outside_of_tree(path):
repo_dir = rel_to_abs("./")
rel = os.path.relpath(path, repo_dir)
return rel.startswith("../")
def copy_system_library(lib):
"""
For most system libraries, we expect them to be installed on the test
machines. However, a couple are shipped from the submitter machine
to the cluster by putting them in a special directory inside the
isolated build tree.
This function copies such libraries into that directory.
"""
sys_lib_dir = rel_to_abs("build/dist-test-system-libs")
if not os.path.exists(sys_lib_dir):
os.makedirs(sys_lib_dir)
dst = os.path.join(sys_lib_dir, os.path.basename(lib))
# Copy if it doesn't exist, or the mtimes don't match.
# Using shutil.copy2 preserves the mtime after the copy (like cp -p)
if not os.path.exists(dst) or os.stat(dst).st_mtime != os.stat(lib).st_mtime:
logging.info("Copying system library %s to %s...", lib, dst)
shutil.copy2(rel_to_abs(lib), dst)
return dst
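# For example (library path assumed for illustration):
#   copy_system_library("/opt/mytoolchain/lib64/libstdc++.so.6")
# copies the file into build/dist-test-system-libs/ and returns the new path,
# skipping the copy when an up-to-date copy (matching mtime) already exists.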
LDD_CACHE = {}
def ldd_deps(exe):
"""
Runs 'ldd' on the provided 'exe' path, returning a list of
any libraries it depends on. Blacklisted libraries are
removed from this list.
If the provided 'exe' is not a binary executable, returns
an empty list.
"""
if (exe.endswith(".jar") or
exe.endswith(".pl") or
exe.endswith(".py") or
exe.endswith(".sh") or
exe.endswith(".txt") or
os.path.isdir(exe)):
return []
if exe not in LDD_CACHE:
p = subprocess.Popen(["ldd", exe], stdout=subprocess.PIPE)
out, err = p.communicate()
LDD_CACHE[exe] = (out, err, p.returncode)
out, err, rc = LDD_CACHE[exe]
if rc != 0:
logging.warning("failed to run ldd on %s", exe)
return []
ret = []
for l in out.splitlines():
m = LDD_RE.match(l)
if not m:
continue
    lib = m.group(1)
    if is_lib_blacklisted(lib):
      continue
    ret.append(lib)
    # ldd will often point to symlinks. We need to upload the symlink
    # as well as whatever it's pointing to, recursively.
    path = lib
    while os.path.islink(path):
      path = os.path.join(os.path.dirname(path), os.readlink(path))
      ret.append(path)
return ret
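# Illustrative sketch (library names assumed): if ldd reports
#   libsnappy.so.1 => /src/kudu/thirdparty/installed/lib/libsnappy.so.1 (0x...)
# and that path is a symlink to libsnappy.so.1.2.3, the returned list contains
# both the symlink and its target, so the archive can reproduce the chain.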
def create_archive_input(staging, execution,
collect_tmpdir=False):
"""
Generates .gen.json and .isolate files corresponding to the
test 'execution', which must be a TestExecution instance.
The outputs are placed in the specified staging directory.
"""
argv = execution.argv
  if len(argv) < 2 or not argv[0].endswith('run-test.sh'):
logging.warning("Unable to handle test: %s", argv)
return
abs_test_exe = os.path.realpath(argv[1])
rel_test_exe = abs_to_rel(abs_test_exe, staging)
argv[1] = rel_test_exe
files = []
files.append(rel_test_exe)
deps = ldd_deps(abs_test_exe)
deps.extend(get_base_deps())
# Deduplicate dependencies included via DEPS_FOR_ALL.
for d in set(deps):
    # System libraries resolve to paths outside of the source tree.
    # We need to copy those into the build tree somewhere.
if is_outside_of_tree(d):
d = copy_system_library(d)
files.append(abs_to_rel(d, staging))
# Add data file dependencies.
if 'KUDU_DATA_FILES' in execution.env:
for data_file in execution.env['KUDU_DATA_FILES'].split(","):
# Paths are relative to the test binary.
path = os.path.join(os.path.dirname(abs_test_exe), data_file)
files.append(abs_to_rel(path, staging))
out_archive = os.path.join(staging.dir, '%s.gen.json' % (execution.test_name))
out_isolate = os.path.join(staging.dir, '%s.isolate' % (execution.test_name))
command = ['../../build-support/run_dist_test.py',
'-e', 'KUDU_TEST_TIMEOUT=%d' % (TEST_TIMEOUT_SECS - 30),
'-e', 'KUDU_ALLOW_SLOW_TESTS=%s' % os.environ.get('KUDU_ALLOW_SLOW_TESTS', 1),
'-e', 'KUDU_COMPRESS_TEST_OUTPUT=%s' % \
os.environ.get('KUDU_COMPRESS_TEST_OUTPUT', 0)]
for k, v in execution.env.iteritems():
if k == 'KUDU_TEST_TIMEOUT':
# Currently we don't respect the test timeouts specified in ctest, since
# we want to make sure that the dist-test task timeout and the
# underlying test timeout are coordinated.
continue
command.extend(['-e', '%s=%s' % (k, v)])
if collect_tmpdir:
command += ["--collect-tmpdir"]
command.append('--')
command += argv[1:]
archive_json = dict(args=["-i", out_isolate,
"-s", out_isolate + "d"],
dir=rel_to_abs("."),
version=1)
isolate_dict = dict(variables=dict(command=command,
files=files))
with open(out_archive, "w") as f:
json.dump(archive_json, f)
with open(out_isolate, "w") as f:
pprint.pprint(isolate_dict, f)
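# Sketch of the two files generated above for a shard named foo-test.0
# (contents abridged, paths assumed):
#   foo-test.0.gen.json:
#     {"args": ["-i", ".../foo-test.0.isolate", "-s", ".../foo-test.0.isolated"],
#      "dir": "/src/kudu", "version": 1}
#   foo-test.0.isolate (written with pprint, i.e. a Python literal):
#     {'variables': {'command': ['../../build-support/run_dist_test.py', ...],
#                    'files': ['../latest/bin/foo-test', ...]}}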
def create_task_json(staging,
replicate_tasks=1,
flaky_test_set=set(),
retry_all_tests=False):
"""
Create a task JSON file suitable for submitting to the distributed
test execution service.
If 'replicate_tasks' is higher than one, each .isolate file will be
submitted multiple times. This can be useful for looping tests.
"""
tasks = []
  with open(staging.archive_dump_path(), "r") as isolate_dump:
inmap = json.load(isolate_dump)
# Some versions of 'isolate batcharchive' directly list the items in
# the dumped JSON. Others list it in an 'items' dictionary.
items = inmap.get('items', inmap)
for k, v in items.iteritems():
    # The key is 'foo-test.<shard>', so chop off the last component
    # to get the test name.
    test_name = ".".join(k.split(".")[:-1])
max_retries = 0
if test_name in flaky_test_set or retry_all_tests:
max_retries = FLAKY_TEST_RETRIES
tasks += [{"isolate_hash": str(v),
"description": str(k),
"artifact_archive_globs": ARTIFACT_ARCHIVE_GLOBS,
"timeout": TEST_TIMEOUT_SECS + 30,
"max_retries": max_retries
}] * replicate_tasks
if len(tasks) > MAX_TASKS_PER_JOB:
logging.error("Job contains %d tasks which is more than the maximum %d",
len(tasks), MAX_TASKS_PER_JOB)
sys.exit(1)
outmap = {"tasks": tasks}
  with open(staging.tasks_json_path(), "wt") as f:
json.dump(outmap, f)
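# Sketch of the resulting tasks.json with the default 900-second timeout
# (hash and description assumed):
#   {"tasks": [{"isolate_hash": "abc123...",
#               "description": "foo-test.0",
#               "artifact_archive_globs": ["build/*/test-logs/**/*"],
#               "timeout": 930,
#               "max_retries": 0}]}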
def run_isolate(staging):
"""
Runs 'isolate batcharchive' to archive all of the .gen.json files in
the provided staging directory.
Throws an exception if the call fails.
"""
isolate_path = "isolate"
try:
subprocess.check_call([isolate_path,
'batcharchive',
'-isolate-server=' + ISOLATE_SERVER,
'-dump-json=' + staging.archive_dump_path(),
'--'] + staging.gen_json_paths())
except:
logging.error("Failed to run %s", isolate_path)
raise
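# The call above is equivalent to running, from the repo root (illustrative):
#   isolate batcharchive -isolate-server=http://isolate.cloudera.org:4242/ \
#       -dump-json=build/isolate/dump.json -- build/isolate/*.gen.json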
def submit_tasks(staging, options):
"""
Runs the distributed testing tool to submit the tasks in the
provided staging directory.
This requires that the tasks JSON file has already been generated
by 'create_task_json()'.
"""
if not os.path.exists(DIST_TEST_HOME):
logging.error("Cannot find dist_test tools at path %s " \
"Set the DIST_TEST_HOME environment variable to the path to the dist_test directory. ",
DIST_TEST_HOME)
raise OSError("Cannot find path to dist_test tools")
client_py_path = os.path.join(DIST_TEST_HOME, "bin", "client")
try:
cmd = [client_py_path, "submit"]
if options.no_wait:
cmd.append('--no-wait')
cmd.append(staging.tasks_json_path())
subprocess.check_call(cmd)
except:
logging.error("Failed to run %s", client_py_path)
raise
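# Equivalent shell invocation (illustrative):
#   $DIST_TEST_HOME/bin/client submit [--no-wait] build/isolate/tasks.json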
def get_flakies():
  path = os.getenv('KUDU_FLAKY_TEST_LIST')
  if not path:
    return set()
  with open(path) as f:
    return set(l.strip() for l in f)
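# The file named by KUDU_FLAKY_TEST_LIST is expected to contain one test name
# per line, e.g. (names assumed for illustration):
#   raft_consensus-itest
#   client-test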
def run_tests(parser, options):
"""
Gets all of the test command lines from 'ctest', isolates them,
creates a task list, and submits the tasks to the testing service.
"""
executions = get_test_executions(options)
if options.extra_args:
if options.extra_args[0] == '--':
del options.extra_args[0]
for e in executions:
e.argv.extend(options.extra_args)
staging = StagingDir.new()
for execution in executions:
create_archive_input(staging, execution,
collect_tmpdir=options.collect_tmpdir)
run_isolate(staging)
retry_all = RETRY_ALL_TESTS > 0
create_task_json(staging,
flaky_test_set=get_flakies(),
replicate_tasks=options.num_instances,
retry_all_tests=retry_all)
submit_tasks(staging, options)
def add_run_subparser(subparsers):
p = subparsers.add_parser('run', help='Run the dist-test-enabled tests')
p.add_argument("--tests-regex", "-R", dest="tests_regex", type=str,
metavar="REGEX",
help="Only run tests matching regular expression. For example, " +
"'run -R consensus' will run any tests with the word consensus in " +
"their names.")
p.add_argument("--num-instances", "-n", dest="num_instances", type=int,
default=1, metavar="NUM",
help="Number of times to submit each matching test. This can be used to " +
"loop a suite of tests to test for flakiness. Typically this should be used " +
"in conjunction with the --tests-regex option above to select a small number " +
"of tests.")
p.add_argument("extra_args", nargs=argparse.REMAINDER,
help=("Optional arguments to append to the command line for all " +
"submitted tasks. Passing a '--' argument before the list of " +
"arguments to pass may be helpful."))
p.set_defaults(func=run_tests)
def loop_test(parser, options):
"""
Runs many instances of a user-provided test case on the testing service.
"""
if options.num_instances < 1:
parser.error("--num-instances must be >= 1")
execution = TestExecution(["run-test.sh", options.cmd] + options.args)
staging = StagingDir.new()
create_archive_input(staging, execution,
collect_tmpdir=options.collect_tmpdir)
run_isolate(staging)
create_task_json(staging, options.num_instances)
submit_tasks(staging, options)
def add_loop_test_subparser(subparsers):
p = subparsers.add_parser('loop',
help='Run many instances of the same test, specified by its full path',
epilog="NOTE: if you would like to loop an entire suite of tests, you may " +
"prefer to use the 'run' command instead. The 'run' command will automatically " +
"shard bigger test suites into more granular tasks based on the shard count " +
"configured in CMakeLists.txt. For example: " +
"dist_test.py run -R '^raft_consensus-itest' -n 1000")
p.add_argument("--num-instances", "-n", dest="num_instances", type=int,
metavar="NUM",
help="number of test instances to start. If passing arguments to the " +
"test, you may want to use a '--' argument before <test-path>. " +
"e.g: loop -- build/latest/bin/foo-test --gtest_opt=123",
default=100)
p.add_argument("cmd", help="the path to the test binary (e.g. build/latest/bin/foo-test)")
p.add_argument("args", nargs=argparse.REMAINDER, help="test arguments")
p.set_defaults(func=loop_test)
def run_java_tests(parser, options):
subprocess.check_call([rel_to_abs("java/gradlew")] + GRADLE_FLAGS.split() +
["distTest"],
cwd=rel_to_abs("java"))
staging = StagingDir(rel_to_abs("java/build/dist-test"))
run_isolate(staging)
# TODO(ghenke): Add Java tests to the flaky dashboard
  # KUDU_FLAKY_TEST_LIST doesn't include Java tests.
# Instead we will retry all Java tests in case they are flaky.
create_task_json(staging, 1, retry_all_tests=True)
submit_tasks(staging, options)
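# Roughly equivalent to running (illustrative):
#   cd java && ./gradlew $EXTRA_GRADLE_FLAGS distTest
# followed by archiving and submitting the .gen.json files Gradle wrote
# under java/build/dist-test.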
def loop_java_test(parser, options):
"""
Runs many instances of a user-provided Java test class on the testing service.
"""
if options.num_instances < 1:
parser.error("--num-instances must be >= 1")
subprocess.check_call([rel_to_abs("java/gradlew")] + GRADLE_FLAGS.split() +
["distTest", "--classes", "**/%s" % options.pattern],
cwd=rel_to_abs("java"))
staging = StagingDir(rel_to_abs("java/build/dist-test"))
run_isolate(staging)
create_task_json(staging, options.num_instances)
submit_tasks(staging, options)
def add_java_subparser(subparsers):
p = subparsers.add_parser('java', help='Run java tests via dist-test')
sp = p.add_subparsers()
run_all = sp.add_parser("run-all",
help="Run all of the Java tests via dist-test")
run_all.set_defaults(func=run_java_tests)
loop = sp.add_parser("loop", help="Loop a single Java test")
loop.add_argument("--num-instances", "-n", dest="num_instances", type=int,
help="number of test instances to start", metavar="NUM",
default=100)
loop.add_argument("pattern", help="Pattern matching a Java test class to run")
loop.set_defaults(func=loop_java_test)
def dump_base_deps(parser, options):
  print(json.dumps(get_base_deps()))
def add_internal_commands(subparsers):
p = subparsers.add_parser('internal', help="[Internal commands not for users]")
p.add_subparsers().add_parser('dump_base_deps').set_defaults(func=dump_base_deps)
def main(argv):
p = argparse.ArgumentParser()
p.add_argument("--collect-tmpdir", dest="collect_tmpdir", action="store_true",
help="Collect the test tmpdir of failed tasks as test artifacts", default=False)
p.add_argument("--no-wait", dest="no_wait", action="store_true",
help="Return without waiting for the job to complete", default=False)
sp = p.add_subparsers()
add_loop_test_subparser(sp)
add_run_subparser(sp)
add_java_subparser(sp)
add_internal_commands(sp)
args = p.parse_args(argv)
args.func(p, args)
if __name__ == "__main__":
init_logging()
main(sys.argv[1:])