| #!/usr/bin/env python2 |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| # This tool allows tests to be submitted to a distributed testing |
| # service running on shared infrastructure. |
| # |
| # See dist_test.py --help for usage information. |
| |
| import argparse |
| import glob |
| try: |
| import simplejson as json |
| except: |
| import json |
| import logging |
| import os |
| import pprint |
| import re |
| import sys |
| import shlex |
| import shutil |
| import subprocess |
| import time |
| |
# Per-task test timeout, in seconds. Overridable via the TEST_TIMEOUT_SECS
# environment variable. create_task_json() pads this by 30s for the service's
# task timeout, and create_archive_input() shortens it by 30s for the in-test
# timeout, so the test itself fails first and produces useful output.
TEST_TIMEOUT_SECS = int(os.environ.get('TEST_TIMEOUT_SECS', '900'))
# Glob patterns (relative to the isolated tree) of artifacts to archive from
# each task (e.g. test log output).
ARTIFACT_ARCHIVE_GLOBS = ["build/*/test-logs/**/*"]
# URL of the isolate server used to archive test inputs.
ISOLATE_SERVER = os.environ.get('ISOLATE_SERVER',
                                "http://isolate.cloudera.org:4242/")
# Local checkout of the dist_test client tooling; 'bin/client' under this
# directory is used by submit_tasks().
DIST_TEST_HOME = os.environ.get('DIST_TEST_HOME',
                                os.path.expanduser("~/dist_test"))

# The number of times that flaky tests will be retried.
# Our non-distributed implementation sets a number of _attempts_, not a number
# of retries, so we have to subtract 1.
FLAKY_TEST_RETRIES = int(os.environ.get('KUDU_FLAKY_TEST_ATTEMPTS', 1)) - 1

# Path from this script's directory to the repository root.
PATH_TO_REPO = "../"

# Matches the 'Test command: ...' lines in 'ctest -V -N' output.
TEST_COMMAND_RE = re.compile('Test command: (.+)$')
# Matches resolved-library lines in 'ldd' output, capturing the library path.
LDD_RE = re.compile(r'^\s+.+? => (\S+) \(0x.+\)')

# Files shipped with every test task, regardless of the test binary's own
# dynamic-library dependencies.
DEPS_FOR_ALL = \
    ["build-support/stacktrace_addr2line.pl",
     "build-support/run-test.sh",
     "build-support/run_dist_test.py",
     "build-support/tsan-suppressions.txt",
     "build-support/lsan-suppressions.txt",

     # The LLVM symbolizer is necessary for suppressions to work
     "thirdparty/installed/uninstrumented/bin/llvm-symbolizer",

     # Tests that use the external minicluster require these.
     # TODO: declare these dependencies per-test.
     "build/latest/bin/kudu-tserver",
     "build/latest/bin/kudu-master",

     # parser-test requires these data files.
     # TODO: again, we should do this with some per-test metadata file.
     # TODO: these are broken now that we separate source and build trees.
     #".../example-deletes.txt",
     #".../example-tweets.txt",

     # Tests that require tooling require this.
     "build/latest/bin/kudu",
    ]

# The number of shards to split tests into. This is set on a per-test basis
# since it's only worth doing when a test has lots of separate cases and
# more than one of them runs relatively long.
NUM_SHARDS_BY_TEST = {
  'cfile-test': 4,
  'client-test': 8,
  'delete_table-test': 8,
  'flex_partitioning-itest': 8,
  'mt-tablet-test': 4,
  'raft_consensus-itest': 8
}
| |
| |
class StagingDir(object):
  """Scratch directory where .isolate and .gen.json files are staged."""

  @staticmethod
  def new():
    """Create a fresh staging directory under the build tree.

    Any leftovers from a previous run are wiped first.
    """
    staging_path = rel_to_abs("build/isolate")
    if os.path.isdir(staging_path):
      shutil.rmtree(staging_path)
    os.makedirs(staging_path)
    return StagingDir(staging_path)

  def __init__(self, dir):
    # Absolute path of the staging directory.
    self.dir = dir

  def archive_dump_path(self):
    """Path of the JSON dump written by 'isolate batcharchive'."""
    return os.path.join(self.dir, "dump.json")

  def gen_json_paths(self):
    """List of all .gen.json files currently staged."""
    return glob.glob(os.path.join(self.dir, "*.gen.json"))

  def tasks_json_path(self):
    """Path of the task list to submit to the dist-test service."""
    return os.path.join(self.dir, "tasks.json")
| |
| |
def rel_to_abs(rel_path):
  """Convert a repo-root-relative path into an absolute path.

  A trailing '/' on the input is preserved on the output, since callers
  use it to mark directory dependencies for isolate.
  """
  script_dir = os.path.dirname(os.path.abspath(__file__))
  abs_path = os.path.abspath(os.path.join(script_dir, PATH_TO_REPO, rel_path))
  if rel_path.endswith('/') and not abs_path.endswith('/'):
    abs_path += '/'
  return abs_path
| |
| |
def abs_to_rel(abs_path, staging):
  """Express 'abs_path' relative to the staging directory.

  A trailing '/' on the input is preserved on the output (os.path.relpath
  strips it, but isolate needs it to mark directories).
  """
  rel_path = os.path.relpath(abs_path, staging.dir)
  if abs_path.endswith('/') and not rel_path.endswith('/'):
    rel_path += '/'
  return rel_path
| |
| |
def get_test_commandlines():
  """Ask ctest for the set of test command lines to run.

  Tests labeled 'no_dist_test' are excluded. Exits the process if ctest
  cannot be run. Returns a list of argv lists.
  """
  ctest_bin = os.path.join(rel_to_abs("thirdparty/installed/common/bin/ctest"))
  proc = subprocess.Popen([ctest_bin, "-V", "-N", "-LE", "no_dist_test"],
                          stdout=subprocess.PIPE)
  out, _ = proc.communicate()
  if proc.returncode != 0:
    sys.stderr.write("Unable to list tests with ctest\n")
    sys.exit(1)
  # Keep only the 'Test command: ...' lines, split into argv form.
  matches = (TEST_COMMAND_RE.search(line) for line in out.splitlines())
  return [shlex.split(m.group(1)) for m in matches if m]
| |
| |
def is_lib_blacklisted(lib):
  """Return True if 'lib' should NOT be shipped to the remote nodes.

  System libraries under /lib or /usr (libc, libstdc++, etc.) are expected
  to already be installed on the test machines, with the exception of a
  few (e.g. liboauth) which we ship anyway.
  """
  if "oauth" in lib:
    return False
  return lib.startswith(("/lib", "/usr"))
| |
| |
def is_outside_of_tree(path):
  """Return True if 'path' lives outside the source repository."""
  repo_root = rel_to_abs("./")
  return os.path.relpath(path, repo_root).startswith("../")
| |
def copy_system_library(lib):
  """
  For most system libraries, we expect them to be installed on the test
  machines. However, a couple are shipped from the submitter machine
  to the cluster by putting them in a special directory inside the
  isolated build tree.

  This function copies such libraries into that directory, returning the
  destination path.
  """
  sys_lib_dir = rel_to_abs("build/dist-test-system-libs")
  if not os.path.exists(sys_lib_dir):
    os.makedirs(sys_lib_dir)
  dst = os.path.join(sys_lib_dir, os.path.basename(lib))
  # shutil.copy2 preserves the source mtime (like 'cp -p'), so an equal
  # mtime means the cached copy is already up to date and can be reused.
  if os.path.exists(dst) and os.stat(dst).st_mtime == os.stat(lib).st_mtime:
    return dst
  logging.info("Copying system library %s to %s...", lib, dst)
  shutil.copy2(rel_to_abs(lib), dst)
  return dst
| |
| |
def ldd_deps(exe):
  """
  Runs 'ldd' on the provided 'exe' path, returning a list of
  any libraries it depends on. Blacklisted libraries are
  removed from this list.

  If the provided 'exe' is not a binary executable, returns
  an empty list.
  """
  # Scripts and data files carry no dynamic-library dependencies.
  if exe.endswith((".pl", ".py", ".sh", ".txt")):
    return []
  proc = subprocess.Popen(["ldd", exe], stdout=subprocess.PIPE)
  out, _ = proc.communicate()
  if proc.returncode != 0:
    sys.stderr.write("failed to run ldd on  %s\n" % exe)
    return []
  deps = []
  for line in out.splitlines():
    m = LDD_RE.match(line)
    if not m:
      continue
    lib = m.group(1)
    if is_lib_blacklisted(lib):
      continue
    deps.append(lib)
    # ldd will often point to symlinks. We need to upload the symlink
    # as well as whatever it's pointing to, recursively.
    target = lib
    while os.path.islink(target):
      target = os.path.join(os.path.dirname(target), os.readlink(target))
      deps.append(target)
  return deps
| |
| |
def create_archive_input(staging, argv,
                         disable_sharding=False,
                         collect_tmpdir=False):
  """
  Generates .gen.json and .isolate files corresponding to the
  test command 'argv'. The outputs are placed in the specified
  staging directory.

  Some larger tests are automatically sharded into several tasks.
  If 'disable_sharding' is True, this behavior will be suppressed.

  If 'collect_tmpdir' is True, the remote wrapper is asked to archive
  the test's tmpdir as an artifact on failure.

  NOTE: mutates 'argv' in place -- argv[1] is rewritten to be relative
  to the staging directory.
  """
  # Only commands of the form 'run-test.sh <test-binary> ...' are supported.
  if not argv[0].endswith('run-test.sh') or len(argv) < 2:
    print >>sys.stderr, "Unable to handle test: ", argv
    return
  test_name = os.path.basename(argv[1])
  abs_test_exe = os.path.realpath(argv[1])
  rel_test_exe = abs_to_rel(abs_test_exe, staging)
  # Rewrite the test path so it resolves from inside the isolated tree.
  argv[1] = rel_test_exe
  files = []
  files.append(rel_test_exe)
  deps = ldd_deps(abs_test_exe)
  for d in DEPS_FOR_ALL:
    d = os.path.realpath(rel_to_abs(d))
    # Isolate marks directory dependencies with a trailing slash.
    if os.path.isdir(d):
      d += "/"
    deps.append(d)
    # DEPS_FOR_ALL may include binaries whose dependencies are not dependencies
    # of the test executable. We must include those dependencies in the archive
    # for the binaries to be usable.
    deps.extend(ldd_deps(d))

  # Deduplicate dependencies included via DEPS_FOR_ALL.
  for d in set(deps):
    # System libraries will end up being relative paths out
    # of the build tree. We need to copy those into the build
    # tree somewhere.
    if is_outside_of_tree(d):
      d = copy_system_library(d)
    files.append(abs_to_rel(d, staging))

  if disable_sharding:
    num_shards = 1
  else:
    num_shards = NUM_SHARDS_BY_TEST.get(test_name, 1)
  for shard in xrange(0, num_shards):
    # One .gen.json/.isolate pair is emitted per shard.
    out_archive = os.path.join(staging.dir, '%s.%d.gen.json' % (test_name, shard))
    out_isolate = os.path.join(staging.dir, '%s.%d.isolate' % (test_name, shard))

    # run_dist_test.py wraps the test on the remote node. Sharding and
    # timeouts are communicated via gtest/Kudu environment variables; the
    # in-test timeout is 30s shorter than the task timeout so the test can
    # fail cleanly (with output) before the service kills the task.
    command = ['../../build-support/run_dist_test.py',
               '-e', 'GTEST_SHARD_INDEX=%d' % shard,
               '-e', 'GTEST_TOTAL_SHARDS=%d' % num_shards,
               '-e', 'KUDU_TEST_TIMEOUT=%d' % (TEST_TIMEOUT_SECS - 30),
               '-e', 'KUDU_ALLOW_SLOW_TESTS=%s' % os.environ.get('KUDU_ALLOW_SLOW_TESTS', 1),
               '-e', 'KUDU_COMPRESS_TEST_OUTPUT=%s' % \
                    os.environ.get('KUDU_COMPRESS_TEST_OUTPUT', 0)]
    if collect_tmpdir:
      command += ["--collect-tmpdir"]
    command.append('--')
    command += argv[1:]

    # The .gen.json tells 'isolate batcharchive' how to archive this shard.
    archive_json = dict(args=["-i", out_isolate,
                              "-s", out_isolate + "d"],
                        dir=rel_to_abs("."),
                        name='%s.%d/%d' % (test_name, shard + 1, num_shards),
                        version=1)
    isolate_dict = dict(variables=dict(command=command,
                                       files=files))
    with open(out_archive, "w") as f:
      json.dump(archive_json, f)
    with open(out_isolate, "w") as f:
      # .isolate files use Python-literal syntax rather than strict JSON,
      # so pprint (which emits repr) is used instead of json.dump.
      pprint.pprint(isolate_dict, f)
| |
| |
def create_task_json(staging,
                     replicate_tasks=1,
                     flaky_test_set=None):
  """
  Create a task JSON file suitable for submitting to the distributed
  test execution service.

  If 'replicate_tasks' is higher than one, each .isolate file will be
  submitted multiple times. This can be useful for looping tests.

  Tests whose names appear in 'flaky_test_set' are given
  FLAKY_TEST_RETRIES retries; all other tests get none.
  """
  # Use None as the default rather than a shared mutable set().
  if flaky_test_set is None:
    flaky_test_set = set()
  tasks = []
  # open() rather than the py2-only file() builtin.
  with open(staging.archive_dump_path(), "r") as isolate_dump:
    inmap = json.load(isolate_dump)

  # Some versions of 'isolate batcharchive' directly list the items in
  # the dumped JSON. Others list it in an 'items' dictionary.
  items = inmap.get('items', inmap)
  for k, v in items.items():
    # The key is 'foo-test.<shard>'. So, chop off the last component
    # to get the test name
    test_name = ".".join(k.split(".")[:-1])
    max_retries = 0
    if test_name in flaky_test_set:
      max_retries = FLAKY_TEST_RETRIES

    # The task timeout is padded by 30s over the in-test timeout so the
    # test's own timeout fires first and produces useful output.
    tasks += [{"isolate_hash": str(v),
               "description": str(k),
               "artifact_archive_globs": ARTIFACT_ARCHIVE_GLOBS,
               "timeout": TEST_TIMEOUT_SECS + 30,
               "max_retries": max_retries
               }] * replicate_tasks

  outmap = {"tasks": tasks}

  with open(staging.tasks_json_path(), "wt") as f:
    json.dump(outmap, f)
| |
| |
def run_isolate(staging):
  """
  Runs 'isolate batcharchive' to archive all of the .gen.json files in
  the provided staging directory.

  Throws an exception if the call fails.
  """
  isolate_path = "isolate"
  cmd = [isolate_path,
         'batcharchive',
         '-isolate-server=' + ISOLATE_SERVER,
         '-dump-json=' + staging.archive_dump_path(),
         '--']
  cmd.extend(staging.gen_json_paths())
  try:
    subprocess.check_call(cmd)
  except:
    # Report which binary failed, then let the error propagate.
    sys.stderr.write("Failed to run %s\n" % isolate_path)
    raise
| |
def submit_tasks(staging, options):
  """
  Runs the distributed testing tool to submit the tasks in the
  provided staging directory.

  This requires that the tasks JSON file has already been generated
  by 'create_task_json()'.
  """
  # Fail fast with guidance when the client tooling is missing.
  if not os.path.exists(DIST_TEST_HOME):
    sys.stderr.write("Cannot find dist_test tools at path %s "
                     "Set the DIST_TEST_HOME environment variable to the path to the dist_test directory. "
                     % DIST_TEST_HOME)
    raise OSError("Cannot find path to dist_test tools")
  client_py_path = os.path.join(DIST_TEST_HOME, "bin", "client")
  try:
    cmd = [client_py_path, "submit"]
    if options.no_wait:
      cmd.append('--no-wait')
    cmd.append(staging.tasks_json_path())
    subprocess.check_call(cmd)
  except:
    # Report which binary failed, then let the error propagate.
    sys.stderr.write("Failed to run %s\n" % client_py_path)
    raise
| |
def get_flakies():
  """
  Return the set of known-flaky test names listed in the file named by the
  KUDU_FLAKY_TEST_LIST environment variable (one test name per line,
  whitespace-stripped).

  Returns an empty set if the variable is unset or empty.
  """
  path = os.getenv('KUDU_FLAKY_TEST_LIST')
  if not path:
    return set()
  # Use a context manager so the handle is closed promptly; the original
  # used the py2-only file() builtin and never closed the file.
  with open(path) as f:
    return set(l.strip() for l in f)
| |
def run_all_tests(parser, options):
  """
  Gets all of the test command lines from 'ctest', isolates them,
  creates a task list, and submits the tasks to the testing service.
  """
  # List the tests first: if ctest fails, we exit before wiping the
  # staging directory.
  test_argvs = get_test_commandlines()
  staging = StagingDir.new()
  for test_argv in test_argvs:
    create_archive_input(staging, test_argv,
                         disable_sharding=options.disable_sharding,
                         collect_tmpdir=options.collect_tmpdir)
  run_isolate(staging)
  create_task_json(staging, flaky_test_set=get_flakies())
  submit_tasks(staging, options)
| |
def add_run_all_subparser(subparsers):
  """Register the 'run-all' subcommand on the given subparser set."""
  sub = subparsers.add_parser('run-all', help='Run all of the dist-test-enabled tests')
  sub.set_defaults(func=run_all_tests)
| |
def loop_test(parser, options):
  """
  Runs many instances of a user-provided test case on the testing service.
  """
  if options.num_instances < 1:
    parser.error("--num-instances must be >= 1")
  # Wrap the user's test binary in run-test.sh, as ctest would.
  test_argv = ["run-test.sh", options.cmd] + options.args
  staging = StagingDir.new()
  create_archive_input(staging, test_argv,
                       disable_sharding=options.disable_sharding,
                       collect_tmpdir=options.collect_tmpdir)
  run_isolate(staging)
  # Replicate the single task num_instances times.
  create_task_json(staging, options.num_instances)
  submit_tasks(staging, options)
| |
def add_loop_test_subparser(subparsers):
  """Register the 'loop' subcommand and its flags."""
  sub = subparsers.add_parser(
      'loop', help='Run many instances of the same test',
      epilog="if passing arguments to the test, you may want to use a '--' " +
             "argument before <test-path>. e.g: loop -- foo-test --gtest_opt=123")
  sub.add_argument("--num-instances", "-n", dest="num_instances", type=int,
                   help="number of test instances to start", metavar="NUM",
                   default=100)
  sub.add_argument("cmd", help="test binary")
  sub.add_argument("args", nargs=argparse.REMAINDER, help="test arguments")
  sub.set_defaults(func=loop_test)
| |
| |
def main(argv):
  """Entry point: parse global flags and dispatch to the chosen subcommand."""
  logging.basicConfig(level=logging.INFO)
  parser = argparse.ArgumentParser()
  parser.add_argument("--disable-sharding", dest="disable_sharding", action="store_true",
                      help="Disable automatic sharding of tests", default=False)
  parser.add_argument("--collect-tmpdir", dest="collect_tmpdir", action="store_true",
                      help="Collect the test tmpdir of failed tasks as test artifacts", default=False)
  parser.add_argument("--no-wait", dest="no_wait", action="store_true",
                      help="Return without waiting for the job to complete", default=False)
  subparsers = parser.add_subparsers()
  add_loop_test_subparser(subparsers)
  add_run_all_subparser(subparsers)
  # Each subcommand installs its handler via set_defaults(func=...).
  parsed = parser.parse_args(argv)
  parsed.func(parser, parsed)
| |
| |
# Script entry point: pass along everything after the program name.
if __name__ == "__main__":
  main(sys.argv[1:])