#!/usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

CLI_HELP = """\
Runs tests inside of docker containers, parallelizing different types of
tests. This script first creates a docker container, checks out this repo
into it, bootstraps the container with Impala dependencies, and builds Impala
and its test data.  Then, it saves the resulting container, and launches new
containers to run tests in parallel.  An HTML, visual timeline is generated
as part of the build, in logs/docker/*/timeline.html.
"""

# To execute run:
#   docker/test-with-docker.py
# After waiting for that to finish, inspect results in logs/docker.
#
# Visually, the timeline looks as follows, produced on a 32-core, 100GB RAM
# machine:
# .......                      1h05m Checkout, setup machine, build (8m with ccache),
#                                    generate testdata (52m); missing ccache
#                                    adds about 7 minutes (very sensitive to number
#                                    of available cores)
#        ...                     11m Commit the Docker container
#           .                    10m FE tests
#           .                    10m JDBC tests
#           ....                 45m serial EE tests
#           ......             1h02m cluster tests
#           ...                  31m BE (C++) tests
#           ....                 36m parallel EE tests
# Total time: 2h25m.
#
# CPU usage is sustained high for the parallel EE tests and for
# the C++ compile (when it's not ccache'd), but is otherwise low.
# Because every parallel track consumes memory (a cluster),
# increasing parallelism and memory must be balanced.
#
# Memory usage is thorny. The minicluster memory can
# be tweaked somewhat by how much memory to give to the JVM
# and what to set --mem_limit too. Furthermore, parallel
# cluster tests use more memory when more parallelism happens.
#
# The code that runs inside of the containers is in entrypoint.sh,
# whereas the code that invokes docker is here.
#
# We avoid using Dockerfile and "docker build": they make it hard or impossible
# to cross-mount host directories into containers or use --privileged, and using
# them would require generating them dynamically. They're more trouble than
# they're worth for this use case.
#
# In practice, the containers are about 100GB (with 45GB
# being test data and ~40GB being the tests).
#
# Requirements:
#  * Docker
#    This has been tested on Ubuntu16.04 with Docker
#    from the Ubuntu repos, i.e., Docker 1.13.1.
#  * About 150 GB of disk space available to Docker.
#  * 75GB of RAM.
#
# This script tries to clean up images and containers created by this process, though
# this can be disabled for debugging.
#
# To clean up containers and images manually, you can use:
#   for x in $(docker ps -aq --filter label=pwd=$IMPALA_HOME); do
#       docker stop $x; docker rm $x; done
#   for x in $(docker images -q --filter label=pwd=$IMPALA_HOME); do docker rmi $x; done
#
# Core dumps:
# On an Ubuntu host, core dumps and Docker don't mix by default, because apport is not
# running inside of the container. See https://github.com/moby/moby/issues/11740
# To enable core dumps, run the following command on the host:
#   $echo 'core.%e.%p' | sudo tee /proc/sys/kernel/core_pattern
#
# TODOs:
#  - Support for executing other flavors, like exhaustive, or file systems,
#    like S3.
#
# Suggested speed improvement TODOs:
#   - Speed up testdata generation
#   - Skip generating test data for variants not being run
#   - Make container image smaller
#   - Analyze .xml junit files to find slow tests; eradicate
#     or move to different suite.
#   - Run BE tests earlier (during data load)

# We do not use Impala's python environment here, nor do we depend on
# non-standard python libraries to avoid needing extra build steps before
# triggering this.
import argparse
import datetime
import logging
import multiprocessing
import multiprocessing.pool
import os
import re
import subprocess
import sys
import tempfile
import threading
import time

if __name__ == '__main__' and __package__ is None:
  sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  import monitor

base = os.path.dirname(os.path.abspath(__file__))

LOG_FORMAT="%(asctime)s %(threadName)s: %(message)s"


def main():
  logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)

  default_parallel_test_concurrency, default_suite_concurrency, default_memlimit_gb = \
      _compute_defaults()
  parser = argparse.ArgumentParser(
      description=CLI_HELP, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
  group = parser.add_mutually_exclusive_group()
  group.add_argument('--cleanup-containers', dest="cleanup_containers",
                     action='store_true', default=True,
                     help='Removes containers when finished.')
  group.add_argument('--no-cleanup-containers',
                     dest="cleanup_containers", action='store_false')
  group = parser.add_mutually_exclusive_group()
  parser.add_argument(
      '--parallel-test-concurrency', type=int,
      default=default_parallel_test_concurrency,
      help='For the ee-test-parallel suite, how many tests to run concurrently.')
  parser.add_argument(
      '--suite-concurrency', type=int, default=default_suite_concurrency,
      help='Number of concurrent suites to run in parallel.')
  parser.add_argument(
      '--impalad-mem-limit-bytes', type=int, default=default_memlimit_gb,
      help='Memlimit to pass to impalad for miniclusters.')
  group.add_argument(
      '--cleanup-image', dest="cleanup_image",
      action='store_true', default=True,
      help="Whether to remove image when done.")
  group.add_argument('--no-cleanup-image', dest="cleanup_image", action='store_false')
  parser.add_argument(
      '--build-image', metavar='IMAGE',
      help='Skip building, and run tests on pre-existing image.')

  suite_group = parser.add_mutually_exclusive_group()
  suite_group.add_argument(
      '--suite', metavar='VARIANT', action='append',
      help="""
        Run specific test suites; can be specified multiple times.
        Test-with-docker may shard some suites to improve parallelism.
        If not specified, default tests are run.
        Default: %s, All Choices: %s
        """ % (",".join([ s.name for s in DEFAULT_SUITES]),
          ",".join([ s.name for s in ALL_SUITES ])))
  suite_group.add_argument('--all-suites', action='store_true', default=False,
      help="If set, run all available suites.")
  parser.add_argument(
      '--name', metavar='NAME',
      help="Use a specific name for the test run. The name is used " +
      "as a prefix for the container and image names, and " +
      "as part of the log directory naming. Defaults to include a timestamp.",
      default=datetime.datetime.now().strftime("i-%Y%m%d-%H%M%S"))
  parser.add_argument('--ccache-dir', metavar='DIR',
                      help="CCache directory to use",
                      default=os.path.expanduser("~/.ccache"))
  parser.add_argument('--tail', action="store_true",
      help="Run tail on all container log files.")
  parser.add_argument('--test', action="store_true")
  args = parser.parse_args()

  if not args.suite:
    if args.all_suites:
      # Ignore "NOOP" tasks, as they are just for testing.
      args.suite = [ s.name for s in ALL_SUITES if not s.name.startswith("NOOP") ]
    else:
      args.suite = [ s.name for s in DEFAULT_SUITES ]
  t = TestWithDocker(
      build_image=args.build_image, suite_names=args.suite,
      name=args.name, cleanup_containers=args.cleanup_containers,
      cleanup_image=args.cleanup_image, ccache_dir=args.ccache_dir, test_mode=args.test,
      parallel_test_concurrency=args.parallel_test_concurrency,
      suite_concurrency=args.suite_concurrency,
      impalad_mem_limit_bytes=args.impalad_mem_limit_bytes,
      tail=args.tail)

  fh = logging.FileHandler(os.path.join(_make_dir_if_not_exist(t.log_dir), "log.txt"))
  fh.setFormatter(logging.Formatter(LOG_FORMAT))
  logging.getLogger('').addHandler(fh)

  logging.info("Arguments: %s", args)

  ret = t.run()
  t.create_timeline()
  t.log_summary()

  if not ret:
    sys.exit(1)


def _compute_defaults():
  """Compute default config options based on memory.

  The goal is to work reasonably on machines with
  about 60GB of memory, like Amazon's c4.8xlarge (36 CPUs, 60GB)
  or c5.9xlarge (36 CPUs, 72GB) or m4.4xlarge (16 CPUs, 64 GB).

  Based on some experiments, we set up defaults for different
  machine sizes based on memory, with an eye towards
  having reasonable runtimes as well.

  Experiments on memory usage:

  suite               parallelism usage
                    Xmx    memlimit
  ee-test-parallel  4GB  8  5GB   33GB
  ee-test-parallel  4GB 16  7GB   37GB
  ee-test-serial    4GB  -  5GB   18GB
  cluster-test      4GB  -    -   13GB
  be-test           4GB  - 10GB   19GB
  fe-test           4GB  - 10GB    9GB
  """
  total_memory_gb = monitor.total_memory()
  cpus = multiprocessing.cpu_count()
  logging.info("CPUs: %s Memory (GB): %s", cpus, total_memory_gb)

  parallel_test_concurrency = min(cpus, 8)
  memlimit_gb = 8

  if total_memory_gb >= 95:
    suite_concurrency = 4
    memlimit_gb = 11
    parallel_test_concurrency = min(cpus, 12)
  elif total_memory_gb >= 65:
    suite_concurrency = 3
  elif total_memory_gb >= 35:
    suite_concurrency = 2
  else:
    logging.warning("This tool should be run on a machine with more memory.")
    suite_concurrency = 1

  return parallel_test_concurrency, suite_concurrency, memlimit_gb * 1024 * 1024 * 1024

class Suite(object):
  """Encapsulates a test suite.

  A test suite is a named thing that the user can select to run,
  and it runs in its own container, in parallel with other suites.
  The actual running happens from entrypoint.sh and is controlled
  mostly by environment variables. When complexity is easier
  to handle in Python (with its richer data types), we prefer
  it here.
  """
  def __init__(self, name, **envs):
    """Create suite with given name and environment."""
    self.name = name
    self.envs = dict(
        FE_TEST="false",
        BE_TEST="false",
        EE_TEST="false",
        JDBC_TEST="false",
        CLUSTER_TEST="false")
    # If set, this suite is sharded past a certain suite concurrency threshold.
    self.shard_at_concurrency = None
    # Variable to which to append --shard_tests
    self.sharding_variable = None
    self.envs[name] = "true"
    self.envs.update(envs)
    self.timeout_minutes = 120

  def copy(self, name, **envs):
    """Duplicates current suite allowing for environment updates."""
    v = dict()
    v.update(self.envs)
    v.update(envs)
    ret = Suite(name, **v)
    ret.shard_at_concurrency = self.shard_at_concurrency
    ret.sharding_variable = self.sharding_variable
    ret.timeout_minutes = self.timeout_minutes
    return ret

  def exhaustive(self):
    """Returns an "exhaustive" copy of the suite."""
    r = self.copy(self.name + "_EXHAUSTIVE", EXPLORATION_STRATEGY="exhaustive")
    r.timeout_minutes = 240
    return r

  def asan(self):
    """Returns an ASAN copy of this suite."""
    r = self.copy(self.name + "_ASAN", REBUILD_ASAN="true")
    r.timeout_minutes = self.timeout_minutes * 2.0 + 10
    return r

  def sharded(self, shards):
    """Returns a list of sharded copies of the list.

    key is the name of the variable which needs to be appended with "--shard-tests=..."
    """
    # RUN_TESTS_ARGS
    ret = []
    for i in range(1, shards + 1):
      s = self.copy("%s_%d_of_%d" % (self.name, i, shards))
      s.envs[self.sharding_variable] = self.envs.get(self.sharding_variable, "") \
          + " --shard_tests=%s/%s" % (i, shards)
      ret.append(s)
    return ret

# Definitions of all known suites:
be_test = Suite("BE_TEST")
ee_test_serial = Suite("EE_TEST_SERIAL", EE_TEST="true",
    RUN_TESTS_ARGS="--skip-parallel --skip-stress")
ee_test_serial.shard_at_concurrency = 4
ee_test_serial.sharding_variable = "RUN_TESTS_ARGS"
ee_test_serial_exhaustive = ee_test_serial.exhaustive()
ee_test_parallel = Suite("EE_TEST_PARALLEL", EE_TEST="true",
    RUN_TESTS_ARGS="--skip-serial")
ee_test_parallel_exhaustive = ee_test_parallel.exhaustive()
cluster_test = Suite("CLUSTER_TEST")
cluster_test.shard_at_concurrency = 4
cluster_test.sharding_variable = "RUN_CUSTOM_CLUSTER_TESTS_ARGS"
cluster_test_exhaustive = cluster_test.exhaustive()

# Default supported suites. These are organized slowest-to-fastest, so that,
# when parallelism is limited, the total time is least impacted.
DEFAULT_SUITES = [
    ee_test_serial,
    ee_test_parallel,
    cluster_test,
    Suite("BE_TEST"),
    Suite("FE_TEST"),
    Suite("JDBC_TEST")
]

OTHER_SUITES = [
    ee_test_parallel_exhaustive,
    ee_test_serial_exhaustive,
    cluster_test_exhaustive,

    # ASAN
    be_test.asan(),
    cluster_test.asan(),
    ee_test_parallel.asan(),
    ee_test_serial.asan(),

    Suite("RAT_CHECK"),
    # These are used for testing this script
    Suite("NOOP"),
    Suite("NOOP_FAIL"),
    Suite("NOOP_SLEEP_FOREVER")
]
ALL_SUITES = DEFAULT_SUITES + OTHER_SUITES

def _call(args, check=True):
  """Wrapper for calling a subprocess.

  args is the first argument of subprocess.Popen, typically
  an array, e.g., ["echo", "hi"].

  If check is set, raise an exception on failure.
  """
  logging.info("Calling: %s", args)
  if check:
    subprocess.check_call(args, stdin=None)
  else:
    return subprocess.call(args, stdin=None)


def _check_output(*args, **kwargs):
  """Wrapper for subprocess.check_output, with logging."""
  logging.info("Running: %s, %s; cmdline: %s.", args, kwargs, " ".join(*args))
  return subprocess.check_output(*args, **kwargs)


def _make_dir_if_not_exist(*parts):
  d = os.path.join(*parts)
  if not os.path.exists(d):
    os.makedirs(d)
  return d


class Container(object):
  """Encapsulates a container, with some metadata."""

  def __init__(self, id_, name, logfile, exitcode=None, running=None):
    self.id = id_
    self.name = name
    self.logfile = logfile
    self.exitcode = exitcode
    self.running = running
    self.start = None
    self.end = None
    self.removed = False

    # Protects multiple calls to "docker rm <self.id>"
    self.lock = threading.Lock()

    # Updated by Timeline class
    self.total_user_cpu = -1
    self.total_system_cpu = -1
    self.peak_total_rss = -1

  def runtime_seconds(self):
    if self.start and self.end:
      return self.end - self.start

  def __str__(self):
    return "Container<" + \
        ",".join(["%s=%s" % (k, v) for k, v in self.__dict__.items()]) \
        + ">"


class TestWithDocker(object):
  """Tests Impala using Docker containers for parallelism."""

  def __init__(self, build_image, suite_names, name, cleanup_containers,
               cleanup_image, ccache_dir, test_mode,
               suite_concurrency, parallel_test_concurrency,
               impalad_mem_limit_bytes, tail):
    self.build_image = build_image
    self.name = name
    self.containers = []
    self.git_root = _check_output(["git", "rev-parse", "--show-toplevel"]).strip()

    # If using worktrees, we need to find $GIT_COMMON_DIR; rev-parse
    # supports finding it as of vesion 2.5.0; for older versions, we
    # use $GIT_DIR.
    git_common_dir = _check_output(["git", "rev-parse", "--git-common-dir"]).strip()
    if git_common_dir == "--git-common-dir":
      git_common_dir = _check_output(["git", "rev-parse", "--git-dir"]).strip()
    self.git_common_dir = os.path.realpath(git_common_dir)
    assert os.path.exists(self.git_common_dir)

    self.git_head_rev = _check_output(
        ["git", "rev-parse", "--abbrev-ref", "HEAD"]).strip()
    assert self.git_head_rev, \
        "Could not get reference to HEAD using git rev-parse --abbrev-ref HEAD."
    self.cleanup_containers = cleanup_containers
    self.cleanup_image = cleanup_image
    self.image = None
    if build_image and cleanup_image:
      # Refuse to clean up external image.
      raise Exception("cleanup_image and build_image cannot be both specified")
    self.ccache_dir = ccache_dir
    self.log_dir = os.path.join(self.git_root, "logs", "docker", self.name)
    self.monitoring_output_file = os.path.join(self.log_dir, "metrics.txt")
    self.monitor = monitor.ContainerMonitor(self.monitoring_output_file)
    self.test_mode = test_mode
    self.suite_concurrency = suite_concurrency
    self.parallel_test_concurrency = parallel_test_concurrency
    self.impalad_mem_limit_bytes = impalad_mem_limit_bytes
    self.tail = tail

    # Map suites back into objects; we ignore case for this mapping.
    suites = []
    suites_by_name = {}
    for suite in ALL_SUITES:
      suites_by_name[suite.name.lower()] = suite
    for suite_name in suite_names:
      suites.append(suites_by_name[suite_name.lower()])

    # If we have enough concurrency, shard some suites into two halves.
    suites2 = []
    for suite in suites:
      if suite.shard_at_concurrency is not None and \
          suite_concurrency >= suite.shard_at_concurrency:
        suites2.extend(suite.sharded(2))
      else:
        suites2.append(suite)
    suites = suites2

    self.suite_runners = [TestSuiteRunner(self, suite) for suite in suites]

  def _create_container(self, image, name, logdir, logname, entrypoint, extras=None):
    """Returns a new container.

    logdir - subdirectory to create under self.log_dir,
      which will get mounted to /logs
    logname - name of file in logdir that will be created
    extras - extra arguments to pass to docker
    entrypoint - entrypoint arguments, as a list.
    """
    if extras is None:
      extras = []
    if self.test_mode:
      extras = ["-e", "TEST_TEST_WITH_DOCKER=true"] + extras

    container_id = _check_output([
        "docker", "create",
        # Required for some of the ntp handling in bootstrap and Kudu;
        # requirement may be lifted in newer Docker versions.
        "--privileged",
        "--name", name,
        "--hostname", name,
        # Label with the git root directory for easier cleanup
        "--label=pwd=" + self.git_root,
        # Consistent locales
        "-e", "LC_ALL=C",
        "-e", "IMPALAD_MEM_LIMIT_BYTES=" +
        str(self.impalad_mem_limit_bytes),
        # Mount the git directory so that clones can be local.
        # We use /repo to have access to certain scripts,
        # and we use /git_common_dir to have local clones,
        # even when "git worktree" is being used.
        "-v", self.git_root + ":/repo:ro",
        "-v", self.git_common_dir + ":/git_common_dir:ro",
        "-e", "GIT_HEAD_REV=" + self.git_head_rev,
        "-v", self.ccache_dir + ":/ccache",
        # Share timezone between host and container
        "-v", "/etc/localtime:/mnt/localtime",
        "-v", _make_dir_if_not_exist(self.log_dir,
                                     logdir) + ":/logs",
        "-v", base + ":/mnt/base:ro"]
        + extras
        + [image]
        + entrypoint).strip()
    ctr = Container(name=name, id_=container_id,
                     logfile=os.path.join(self.log_dir, logdir, logname))
    logging.info("Created container %s", ctr)
    return ctr

  def _run_container(self, container):
    """Runs container, and returns True if the container had a successful exit value.

    This blocks while the container is running. The container output is
    run through annotate.py to add timestamps and saved into the container's log file.
    """
    container.running = True
    tailer = None

    with open(container.logfile, "aw") as log_output:
      if self.tail:
        tailer = subprocess.Popen(
            ["tail", "-f", "--pid", str(os.getpid()), "-v", container.logfile])

      container.start = time.time()
      # Sets up a "docker start ... | annotate.py > logfile" pipeline using
      # subprocess.
      annotate = subprocess.Popen(
          [os.path.join(self.git_root, "docker", "annotate.py")],
          stdin=subprocess.PIPE,
          stdout=log_output,
          stderr=log_output)

      logging.info("Starting container %s; logging to %s", container.name,
                   container.logfile)
      docker = subprocess.Popen(["docker", "start", "--attach", container.id],
                                stdin=None, stdout=annotate.stdin, stderr=annotate.stdin)

      ret = docker.wait()
      annotate.stdin.close()
      annotate.wait()

      logging.info("Container %s returned %s", container, ret)
      container.exitcode = ret
      container.running = False
      container.end = time.time()
      if tailer:
        tailer.kill()
      return ret == 0

  @staticmethod
  def _stop_container(container):
    """Stops container. Ignores errors (e.g., if it's already exited)."""
    if container.running:
      _call(["docker", "stop", container.id], check=False)
      container.end = time.time()
      container.running = False

  @staticmethod
  def _rm_container(container):
    """Removes container."""
    # We can have multiple threads trying to call "docker rm" on a container.
    # Docker will fail one of those with "already running", but we actually
    # want to block until it's removed. Using a lock to serialize the "docker # rm"
    # calls handles that.
    with container.lock:
      if not container.removed:
        _call(["docker", "rm", container.id], check=False)
      container.removed = True

  def _create_build_image(self):
    """Creates the "build image", with Impala compiled and data loaded."""
    container = self._create_container(
        image="ubuntu:16.04", name=self.name,
        logdir="build",
        logname="log-build.txt",
        # entrypoint.sh will create a user with our uid; this
        # allows the shared file systems to work seamlessly
        entrypoint=["/mnt/base/entrypoint.sh", "build", str(os.getuid())])
    self.containers.append(container)
    self.monitor.add(container)
    try:
      logging.info("Docker container for build: %s", container)
      _check_output(["docker", "start", container.id])
      if not self._run_container(container):
        raise Exception("Build container failed.")
      logging.info("Committing docker container.")
      self.image = _check_output(
          ["docker", "commit",
           "-c", "LABEL pwd=" + self.git_root,
           container.id, "impala:built-" + self.name]).strip()
      logging.info("Committed docker image: %s", self.image)
    finally:
      if self.cleanup_containers:
        self._stop_container(container)
        self._rm_container(container)

  def _run_tests(self):
    pool = multiprocessing.pool.ThreadPool(processes=self.suite_concurrency)
    outstanding_suites = []
    for suite in self.suite_runners:
      suite.task = pool.apply_async(suite.run)
      outstanding_suites.append(suite)

    ret = True
    try:
      while len(outstanding_suites) > 0:
        for suite in list(outstanding_suites):
          if suite.timed_out():
            msg = "Task %s not finished within timeout %s" % (suite.name,
                suite.suite.timeout_minutes,)
            logging.error(msg)
            raise Exception(msg)
          task = suite.task
          if task.ready():
            this_task_ret = task.get()
            outstanding_suites.remove(suite)
            if this_task_ret:
              logging.info("Suite %s succeeded.", suite.name)
            else:
              logging.info("Suite %s failed.", suite.name)
              ret = False
        time.sleep(5)
    except KeyboardInterrupt:
      logging.info("\n\nDetected KeyboardInterrupt; shutting down!\n\n")
      raise
    finally:
      pool.terminate()
    return ret

  def run(self):
    # Create logs directories and ccache dir.
    _make_dir_if_not_exist(self.ccache_dir)
    _make_dir_if_not_exist(self.log_dir)

    self.monitor.start()
    try:
      if not self.build_image:
        self._create_build_image()
      else:
        self.image = self.build_image
      ret = self._run_tests()
      return ret
    finally:
      self.monitor.stop()
      if self.cleanup_containers:
        for c in self.containers:
          self._stop_container(c)
          self._rm_container(c)
      if self.cleanup_image and self.image:
        _call(["docker", "rmi", self.image], check=False)
      logging.info("Memory usage: %s GB min, %s GB max",
                   self.monitor.min_memory_usage_gb,
                   self.monitor.max_memory_usage_gb)

  # Strings (really, regular expressions) pulled out into to the visual timeline.
  _INTERESTING_STRINGS = [
      ">>> ",
  ]
  _INTERESTING_RE = re.compile("|".join("(%s)" % (s,) for s in _INTERESTING_STRINGS))

  def create_timeline(self):
    """Creates timeline into log directory."""
    timeline = monitor.Timeline(
        monitor_file=self.monitoring_output_file,
        containers=self.containers,
        interesting_re=self._INTERESTING_RE)
    timeline.create(os.path.join(self.log_dir, "timeline.html"))

  def log_summary(self):
    logging.info("Containers:")
    def to_success_string(exitcode):
      if exitcode == 0:
        return "SUCCESS"
      return "FAILURE"

    for c in self.containers:
      logging.info("%s %s %s %0.1fm wall, %0.1fm user, %0.1fm system, " +
            "%0.1fx parallelism, %0.1f GB peak RSS",
          to_success_string(c.exitcode), c.name, c.logfile,
          c.runtime_seconds() / 60.0,
          c.total_user_cpu / 60.0,
          c.total_system_cpu / 60.0,
          (c.total_user_cpu + c.total_system_cpu) / max(c.runtime_seconds(), 0.0001),
          c.peak_total_rss / 1024.0 / 1024.0 / 1024.0)


class TestSuiteRunner(object):
  """Runs a single test suite."""

  def __init__(self, test_with_docker, suite):
    self.test_with_docker = test_with_docker
    self.suite = suite
    self.task = None
    self.name = suite.name.lower()
    # Set at the beginning of run and facilitates enforcing timeouts
    # for individual suites.
    self.deadline = None

  def timed_out(self):
    return self.deadline is not None and time.time() > self.deadline

  def run(self):
    """Runs given test. Returns true on success, based on exit code."""
    self.deadline = time.time() + self.suite.timeout_minutes * 60
    test_with_docker = self.test_with_docker
    suite = self.suite
    envs = ["-e", "NUM_CONCURRENT_TESTS=" + str(test_with_docker.parallel_test_concurrency)]
    for k, v in sorted(suite.envs.iteritems()):
      envs.append("-e")
      envs.append("%s=%s" % (k, v))

    self.start = time.time()

    # io-file-mgr-test expects a real-ish file system at /tmp;
    # we mount a temporary directory into the container to appease it.
    tmpdir = tempfile.mkdtemp(prefix=test_with_docker.name + "-" + self.name)
    # Container names are sometimes used as hostnames, and DNS names shouldn't
    # have underscores.
    container_name = test_with_docker.name + "-" + self.name.replace("_", "-")

    container = test_with_docker._create_container(
        image=test_with_docker.image,
        name=container_name,
        extras=[
            "-v", tmpdir + ":/tmp",
            "-u", str(os.getuid())
        ] + envs,
        logdir=self.name,
        logname="log-test-" + self.suite.name + ".txt",
        entrypoint=["/mnt/base/entrypoint.sh", "test_suite", suite.name])

    test_with_docker.containers.append(container)
    test_with_docker.monitor.add(container)
    try:
      return test_with_docker._run_container(container)
    except:
      return False
    finally:
      logging.info("Cleaning up containers for %s" % (suite.name,))
      test_with_docker._stop_container(container)
      if test_with_docker.cleanup_containers:
        test_with_docker._rm_container(container)


if __name__ == "__main__":
  main()
