blob: 160caa5febacf1b9a2ce02cba98c8922f28a4b43 [file]
# SPDX-License-Identifier: Apache-2.0
#
# Modifications by Apache Solr contributors; see git log for details.
# Licensed under the Apache License, Version 2.0.
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import contextlib
import json
import logging
import os
import pickle
import sys
import traceback
from collections import defaultdict
import thespian.actors
class NotFoundError(Exception):
    """Generic "not found" error type exposed by this module.

    NOTE(review): nothing in the visible part of this module raises or catches it
    (``stop`` catches ``exceptions.NotFound`` instead); confirm external users before
    removing it.
    """
from solrorbit import (PROGRAM_NAME, actor, client, config, exceptions,
metrics, paths)
from solrorbit.builder import (launcher, provisioner,
supplier)
from solrorbit.builder import cluster_config as cc
from solrorbit.utils import console, net
# Seconds between two system-metrics flushes in NodeBuilderActor: after starting its nodes and
# after every flush it re-arms a wakeup timer with this interval.
METRIC_FLUSH_INTERVAL_SECONDS = 30
def download(cfg):
    """Supply the distribution binaries for the configured cluster config and print them.

    The supplier is created with ``sources=False`` (no source build); whatever it returns is
    printed as indented JSON, regardless of console quiet mode (``force=True``).

    :param cfg: The current config object.
    """
    cluster_config = load_cluster_config(cfg, external=False)[0]
    fetch_binaries = supplier.create(cfg, sources=False, cluster_config=cluster_config)
    console.println(json.dumps(fetch_binaries(), indent=2), force=True)
def install(cfg):
    """Provision a single node into the installation root without starting it.

    Network and node settings come from the ``builder`` config section. The node is prepared
    either from a binary (``build.type`` "tar") or as a Docker container ("docker"). The
    resulting node configuration is persisted under the installation root, and the installation
    id is printed as JSON on stderr.

    :param cfg: The current config object.
    :raises exceptions.SystemSetupError: if plugins are requested for a Docker build or the build
        type is unknown.
    """
    install_root = paths.install_root(cfg)
    cluster_config, plugins = load_cluster_config(cfg, external=False)
    # A non-empty distribution-version selects a released distribution; otherwise build from sources.
    sources = not cfg.opts("builder", "distribution.version", mandatory=False)
    build_type = cfg.opts("builder", "build.type")
    ip = cfg.opts("builder", "network.host")
    http_port = int(cfg.opts("builder", "network.http.port"))
    node_name = cfg.opts("builder", "node.name")
    master_nodes = cfg.opts("builder", "master.nodes")
    seed_hosts = cfg.opts("builder", "seed.hosts")

    # The node has to be one of the initial master nodes; otherwise fall back to using this
    # node as the sole initial master.
    if node_name not in master_nodes:
        print(
            f"The provided --node-name '{node_name}' and --master-nodes '{master_nodes}' are different. "
            f"Using '{node_name}' for both node name and initial master node."
        )
        master_nodes = [node_name]

    if build_type not in ("tar", "docker"):
        raise exceptions.SystemSetupError(f"Unknown build type [{build_type}]")

    if build_type == "docker":
        if len(plugins) > 0:
            raise exceptions.SystemSetupError(
                "You cannot specify any plugins for Docker clusters. Please remove \"--plugins\" and try again.")
        docker_provisioner = provisioner.docker(
            cfg=cfg, cluster_config=cluster_config,
            ip=ip, http_port=http_port, target_root=install_root, node_name=node_name)
        # Docker has no binary that could be downloaded / built upfront.
        node_config = docker_provisioner.prepare(binary=None)
    else:
        binary_supplier = supplier.create(cfg, sources, cluster_config)
        local_provisioner = provisioner.local(
            cfg=cfg, cluster_config=cluster_config, ip=ip, http_port=http_port,
            all_node_ips=seed_hosts, all_node_names=master_nodes, target_root=install_root,
            node_name=node_name)
        node_config = local_provisioner.prepare(binary=binary_supplier())

    provisioner.save_node_configuration(install_root, node_config)
    installation = {"installation-id": cfg.opts("system", "install.id")}
    console.println(json.dumps(installation, indent=2), force=True, stderr=True)
def start(cfg):
    """Launch the node previously prepared by ``install``.

    Refuses to start while a node file from an earlier start is still present. Otherwise the
    node is launched with the launcher matching its build type. The started nodes and the
    current test run id are then pickled into the node file, so that ``stop`` can find them.

    :param cfg: The current config object.
    :raises exceptions.SystemSetupError: if a node file already exists or the build type is unknown.
    """
    install_root = paths.install_root(cfg)
    test_run_id = cfg.opts("system", "test_run.id")

    # Avoid double-launching: the node file is expected to be absent.
    try:
        _load_node_file(install_root)
    except FileNotFoundError:
        pass
    else:
        install_id = cfg.opts("system", "install.id")
        raise exceptions.SystemSetupError(
            f"A node with this installation id is already running. Please stop it first "
            f"with {PROGRAM_NAME} stop --installation-id={install_id}")

    node_config = provisioner.load_node_configuration(install_root)
    launcher_classes = {"tar": launcher.ProcessLauncher, "docker": launcher.DockerLauncher}
    launcher_class = launcher_classes.get(node_config.build_type)
    if launcher_class is None:
        raise exceptions.SystemSetupError(f"Unknown build type [{node_config.build_type}]")

    started_nodes = launcher_class(cfg).start([node_config])
    _store_node_file(install_root, (started_nodes, test_run_id))
def stop(cfg):
    """Stop the node started by ``start``, persist its system metrics and clean up.

    System metrics are only persisted when the test run recorded in the node file can be
    found in the test run store. Otherwise the launcher is stopped without a metrics store.
    The node file is always deleted after stopping. Finally, the installation and data paths
    are cleaned up, honouring ``builder``/``preserve.install``.

    :param cfg: The current config object.
    :raises exceptions.SystemSetupError: if the stored build type is unknown.
    """
    install_root = paths.install_root(cfg)
    node_config = provisioner.load_node_configuration(install_root)
    launcher_classes = {"tar": launcher.ProcessLauncher, "docker": launcher.DockerLauncher}
    launcher_class = launcher_classes.get(node_config.build_type)
    if launcher_class is None:
        raise exceptions.SystemSetupError(f"Unknown build type [{node_config.build_type}]")
    node_launcher = launcher_class(cfg)

    nodes, test_run_id = _load_node_file(install_root)
    metrics_store = metrics.metrics_store_class(cfg)(cfg)
    test_run_store = metrics.test_run_store(cfg)
    try:
        test_run = test_run_store.find_by_test_run_id(test_run_id)
        metrics_store.open(
            test_run_id=test_run.test_run_id,
            test_run_timestamp=test_run.test_run_timestamp,
            workload_name=test_run.workload_name,
            test_procedure_name=test_run.test_procedure_name
        )
    except exceptions.NotFound:
        logging.getLogger(__name__).info(
            "Could not find test_run [%s] and will thus not persist system metrics.", test_run_id)
        # Without the test run we cannot derive the meta-data system metrics require.
        test_run = None
        metrics_store = None

    node_launcher.stop(nodes, metrics_store)
    _delete_node_file(install_root)

    if test_run:
        metrics_store.flush(refresh=True)
        for node in nodes:
            test_run.add_results(metrics.calculate_system_results(metrics_store, node.node_name))
        metrics.results_store(cfg).store_results(test_run)
        metrics_store.close()

    # TODO: Do we need to expose this as a separate command as well?
    provisioner.cleanup(preserve=cfg.opts("builder", "preserve.install"),
                        install_dir=node_config.binary_path,
                        data_paths=node_config.data_paths)
def _load_node_file(root_path):
with open(os.path.join(root_path, "node"), "rb") as f:
return pickle.load(f)
def _store_node_file(root_path, data):
with open(os.path.join(root_path, "node"), "wb") as f:
pickle.dump(data, f)
def _delete_node_file(root_path):
os.remove(os.path.join(root_path, "node"))
##############################
# Public Messages
##############################
class StartEngine:
    """Request asking the BuilderActor to start (or attach to) the benchmark candidate.

    Besides the constructor arguments, BuilderActor attaches a ``hosts`` attribute before
    forwarding the message to its Dispatcher.
    """

    def __init__(self, cfg, open_metrics_context, sources, distribution, external, docker, ip=None, port=None,
                 node_id=None):
        # Configuration and metrics context shared with every node.
        self.cfg, self.open_metrics_context = cfg, open_metrics_context
        # Provisioning mode flags.
        self.sources, self.distribution = sources, distribution
        self.external, self.docker = external, docker
        # Optional node addressing.
        self.ip, self.port, self.node_id = ip, port, node_id

    def for_nodes(self, all_node_ips=None, all_node_ids=None, ip=None, port=None, node_ids=None):
        """
        Creates a StartNodes instance for a concrete IP, port and their associated node_ids.

        :param all_node_ips: The IPs of all nodes in the cluster (including the current one).
        :param all_node_ids: The numeric id of all nodes in the cluster (including the current one).
        :param ip: The IP to set.
        :param port: The port number to set.
        :param node_ids: A list of node id to set.
        :return: A corresponding ``StartNodes`` message with the specified IP, port number and node ids.
        """
        return StartNodes(
            cfg=self.cfg,
            open_metrics_context=self.open_metrics_context,
            sources=self.sources,
            distribution=self.distribution,
            external=self.external,
            docker=self.docker,
            all_node_ips=all_node_ips,
            all_node_ids=all_node_ids,
            ip=ip,
            port=port,
            node_ids=node_ids,
        )
class EngineStarted:
    """Reply from the BuilderActor once all nodes are started (or an external cluster is attached).

    :param cluster_config_revision: The ``builder``/``repository.revision`` value read by the
        BuilderActor when it received StartEngine.
    """

    def __init__(self, cluster_config_revision):
        self.cluster_config_revision = cluster_config_revision
class StopEngine:
    """Request asking the BuilderActor to stop the benchmark candidate, from any state."""
class EngineStopped:
    """Reply from the BuilderActor to the test run orchestrator once all nodes have stopped."""
class ResetRelativeTime:
    """Request to reset the relative time of the system metrics stores.

    :param reset_in_seconds: Delay before the reset. BuilderActor defers positive values via a
        wakeup timer and resets immediately otherwise.
    """

    def __init__(self, reset_in_seconds):
        self.reset_in_seconds = reset_in_seconds
##############################
# Builder internal messages
##############################
class StartNodes:
    """Per-host start request created by ``StartEngine.for_nodes`` and handled by NodeBuilderActor.

    The Dispatcher additionally attaches ``reply_to`` so that the reply reaches the BuilderActor.
    """

    def __init__(self, cfg, open_metrics_context, sources, distribution, external, docker,
                 all_node_ips, all_node_ids, ip, port, node_ids):
        # Copied from the originating StartEngine message.
        self.cfg, self.open_metrics_context = cfg, open_metrics_context
        self.sources, self.distribution = sources, distribution
        self.external, self.docker = external, docker
        # Cluster-wide topology.
        self.all_node_ips, self.all_node_ids = all_node_ips, all_node_ids
        # The nodes handled by the receiving host.
        self.ip, self.port, self.node_ids = ip, port, node_ids
class NodesStarted:
    """Reply from a NodeBuilderActor once the nodes on its host have been started."""
class StopNodes:
    """Request from the BuilderActor to its NodeBuilderActor children to stop their nodes."""
class NodesStopped:
    """Reply from a NodeBuilderActor once the nodes on its host have been stopped."""
def cluster_distribution_version(cfg, client_factory=client.ClientFactory):
    """
    Attempt to get the cluster's distribution version even before it is actually started (which makes only sense for externally
    provisioned clusters).

    :param cfg: The current config object.
    :param client_factory: Factory class that creates the client.
    :return: ``"9.10.1"`` if the created client is a ``client.SolrClient``, otherwise ``None``.
    """
    hosts = cfg.opts("client", "hosts").default
    client_options = cfg.opts("client", "options").default
    created_client = client_factory(hosts, client_options).create()
    # NOTE(review): the version is hard-coded rather than queried from the cluster - confirm intended.
    return "9.10.1" if isinstance(created_client, client.SolrClient) else None
def to_ip_port(hosts):
    """Resolve host dicts into ``(ip, port)`` tuples.

    Each entry must only contain ``host`` (required) and optionally ``port``, which defaults to
    8983. Host names are resolved via ``net.resolve``. The input dicts are not modified.

    :param hosts: Iterable of dicts such as ``{"host": "localhost", "port": 8983}``.
    :return: A list of ``(ip, port)`` tuples in input order.
    :raises exceptions.SystemSetupError: if an entry carries keys other than ``host``/``port``.
    """
    resolved = []
    for host_spec in hosts:
        leftover = host_spec.copy()
        address = leftover.pop("host")
        port = leftover.pop("port", 8983)
        if leftover:
            raise exceptions.SystemSetupError("When specifying nodes to be managed by solr-orbit you can only supply "
                                              "hostname:port pairs (e.g. 'localhost:8983'), any additional options cannot "
                                              "be supported.")
        resolved.append((net.resolve(address), port))
    return resolved
def extract_all_node_ips(ip_port_pairs):
    """Return the set of distinct IPs found in an iterable of ``(ip, port)`` pairs."""
    return {ip for ip, _port in ip_port_pairs}
def extract_all_node_ids(all_nodes_by_host):
    """Return the set of all node ids across every host in *all_nodes_by_host*.

    :param all_nodes_by_host: Mapping of host key to an iterable of node ids (see ``nodes_by_host``).
    """
    return {node_id for ids_on_host in all_nodes_by_host.values() for node_id in ids_on_host}
def nodes_by_host(ip_port_pairs):
    """Group consecutive node ids by their ``(ip, port)`` pair.

    Node ids are assigned from 0 in input order across all pairs, so a repeated pair collects
    several ids. Keys keep their first-seen order.

    :return: A dict mapping each ``(ip, port)`` pair to the list of its node ids.
    """
    grouped = {}
    for node_id, ip_port in enumerate(ip_port_pairs):
        grouped.setdefault(ip_port, []).append(node_id)
    return grouped
class BuilderActor(actor.BenchmarkActor):
    """
    This actor coordinates all associated builders on remote hosts (which do the actual work).

    For externally provisioned clusters it only reports the engine as started / stopped.
    Otherwise it hands the start request to a ``Dispatcher``, which creates one
    ``NodeBuilderActor`` per target host, and tracks those children.
    """
    # Wakeup payload used to defer a ResetRelativeTime request (see receiveMsg_ResetRelativeTime).
    # Defined after the docstring so that the docstring actually becomes ``__doc__``.
    WAKEUP_RESET_RELATIVE_TIME = "relative_time"

    def __init__(self):
        super().__init__()
        self.cfg = None
        self.test_run_orchestrator = None
        self.cluster_launcher = None
        self.cluster = None
        self.cluster_config = None
        self.cluster_config_revision = None
        self.externally_provisioned = False

    def receiveUnrecognizedMessage(self, msg, sender):
        self.logger.info("BuilderActor#receiveMessage unrecognized(msg = [%s] sender = [%s])", str(type(msg)), str(sender))

    def receiveMsg_ChildActorExited(self, msg, sender):
        # Children are expected to exit while (or after) the engine stops; at any other time it is a failure.
        if self.is_current_status_expected(["cluster_stopping", "cluster_stopped"]):
            self.logger.info("Child actor exited while engine is stopping or stopped: [%s]", msg)
            return
        failmsg = "Child actor exited with [%s] while in status [%s]." % (msg, self.status)
        self.logger.error(failmsg)
        self.send(self.test_run_orchestrator, actor.BenchmarkFailure(failmsg))

    def receiveMsg_PoisonMessage(self, msg, sender):
        self.logger.info("BuilderActor#receiveMessage poison(msg = [%s] sender = [%s])", str(msg.poisonMessage), str(sender))
        # something went wrong with a child actor (or another actor with which we have communicated)
        if isinstance(msg.poisonMessage, StartEngine):
            failmsg = "Could not start benchmark candidate. Are Solr Orbit daemons on all targeted machines running?"
        else:
            failmsg = msg.details
        self.logger.error(failmsg)
        self.send(self.test_run_orchestrator, actor.BenchmarkFailure(failmsg))

    @actor.no_retry("builder")  # pylint: disable=no-value-for-parameter
    def receiveMsg_StartEngine(self, msg, sender):
        """Start the benchmark candidate, or attach to it when it is externally provisioned."""
        self.logger.info("Received signal from test run orchestrator to start engine.")
        self.test_run_orchestrator = sender
        self.cfg = msg.cfg
        self.cluster_config, _ = load_cluster_config(self.cfg, msg.external)
        # TODO: This is implicitly set by #load_cluster_config() - can we gather this elsewhere?
        self.cluster_config_revision = self.cfg.opts("builder", "repository.revision")

        # In our startup procedure we first create all builders. Only if this succeeds we'll continue.
        hosts = self.cfg.opts("client", "hosts").default
        if len(hosts) == 0:
            raise exceptions.LaunchError("No target hosts are configured.")

        self.externally_provisioned = msg.external
        if self.externally_provisioned:
            self.logger.info("Cluster will not be provisioned by Solr Orbit.")
            self.status = "nodes_started"
            self.received_responses = []
            self.on_all_nodes_started()
            self.status = "cluster_started"
        else:
            console.info("Preparing for test run ...", flush=True)
            self.logger.info("Cluster consisting of %s will be provisioned by Solr Orbit.", hosts)
            msg.hosts = hosts
            # One placeholder per host so that we wait for a NodesStarted reply from each of them.
            self.children = [None] * len(nodes_by_host(to_ip_port(hosts)))
            self.send(self.createActor(Dispatcher), msg)
            self.status = "starting"
            self.received_responses = []

    @actor.no_retry("builder")  # pylint: disable=no-value-for-parameter
    def receiveMsg_NodesStarted(self, msg, sender):
        # Initially the addresses of the children are not known and there is just a None
        # placeholder in the array. As addresses become known, fill them in.
        if sender not in self.children:
            # Length-limited FIFO: push the new address in front and drop one placeholder from the end.
            self.children.insert(0, sender)
            self.children.pop()
        self.transition_when_all_children_responded(sender, msg, "starting", "cluster_started", self.on_all_nodes_started)

    @actor.no_retry("builder")  # pylint: disable=no-value-for-parameter
    def receiveMsg_ResetRelativeTime(self, msg, sender):
        # A positive delay is deferred via a wakeup; otherwise reset right away.
        if msg.reset_in_seconds > 0:
            self.wakeupAfter(msg.reset_in_seconds, payload=BuilderActor.WAKEUP_RESET_RELATIVE_TIME)
        else:
            self.reset_relative_time()

    def receiveMsg_WakeupMessage(self, msg, sender):
        if msg.payload == BuilderActor.WAKEUP_RESET_RELATIVE_TIME:
            self.reset_relative_time()
        else:
            raise exceptions.BenchmarkAssertionError("Unknown wakeup reason [{}]".format(msg.payload))

    def receiveMsg_BenchmarkFailure(self, msg, sender):
        # Forward failures (e.g. from the Dispatcher or node builders) to the orchestrator.
        self.send(self.test_run_orchestrator, msg)

    @actor.no_retry("builder")  # pylint: disable=no-value-for-parameter
    def receiveMsg_StopEngine(self, msg, sender):
        # we might have experienced a launch error or the user has cancelled the benchmark. Hence we need to allow to stop the
        # cluster from various states and we don't check here for a specific one.
        if self.externally_provisioned:
            self.on_all_nodes_stopped()
        else:
            self.send_to_children_and_transition(sender, StopNodes(), [], "cluster_stopping")

    @actor.no_retry("builder")  # pylint: disable=no-value-for-parameter
    def receiveMsg_NodesStopped(self, msg, sender):
        self.transition_when_all_children_responded(sender, msg, "cluster_stopping", "cluster_stopped", self.on_all_nodes_stopped)

    def on_all_nodes_started(self):
        self.send(self.test_run_orchestrator, EngineStarted(self.cluster_config_revision))

    def reset_relative_time(self):
        """Ask every known child to reset the relative time of its metrics store immediately."""
        for child in self.children:
            # Skip placeholders of children that have not reported NodesStarted yet; there is no address to send to.
            if child is not None:
                self.send(child, ResetRelativeTime(0))

    def on_all_nodes_stopped(self):
        self.send(self.test_run_orchestrator, EngineStopped())
        # clear all state as the builder might get reused later
        for child in self.children:
            # Placeholders (None) have no actor behind them that could be asked to exit.
            if child is not None:
                self.send(child, thespian.actors.ActorExitRequest())
        self.children = []
        # do not self-terminate, let the parent actor handle this
@thespian.actors.requireCapability('coordinator')
class Dispatcher(actor.BenchmarkActor):
    """This Actor receives a copy of the startmsg (with the computed hosts
    attached) and creates a NodeBuilderActor on each targeted
    remote host. It uses Thespian SystemRegistration to get
    notification of when remote nodes are available. As a special
    case, if an IP address is localhost, the NodeBuilderActor is
    immediately created locally. Once all NodeBuilderActors are
    started, it will send them all their startup message, with a
    reply-to back to the actor that made the request of the
    Dispatcher.
    """

    def __init__(self):
        super().__init__()
        # Actor that sent StartEngine; failures are reported back to it.
        self.start_sender = None
        # (NodeBuilderActor address, StartNodes message) pairs waiting to be sent.
        self.pending = None
        # Remote IP -> StartNodes messages for hosts that have not checked in yet.
        self.remotes = None

    @actor.no_retry("builder dispatcher")  # pylint: disable=no-value-for-parameter
    def receiveMsg_StartEngine(self, startmsg, sender):
        self.start_sender = sender
        self.pending = []
        self.remotes = defaultdict(list)
        all_ips_and_ports = to_ip_port(startmsg.hosts)
        all_node_ips = extract_all_node_ips(all_ips_and_ports)
        all_nodes_by_host = nodes_by_host(all_ips_and_ports)
        all_node_ids = extract_all_node_ids(all_nodes_by_host)

        for (ip, port), node in all_nodes_by_host.items():
            submsg = startmsg.for_nodes(all_node_ips, all_node_ids, ip, port, node)
            # Node builders reply directly to the original requester, not to the Dispatcher.
            submsg.reply_to = sender
            if ip == '127.0.0.1':
                m = self.createActor(NodeBuilderActor,
                                     targetActorRequirements={"coordinator": True})
                self.pending.append((m, submsg))
            else:
                self.remotes[ip].append(submsg)

        if self.remotes:
            # Now register with the ActorSystem to be told about all
            # remote nodes (via the ActorSystemConventionUpdate below).
            self.notifyOnSystemRegistrationChanges(True)
        else:
            self.send_all_pending()

        # Could also initiate a wakeup message to fail this if not all
        # remotes come online within the expected amount of time... TBD

    def receiveMsg_ActorSystemConventionUpdate(self, convmsg, sender):
        if not convmsg.remoteAdded:
            self.logger.warning("Remote Solr Orbit node [%s] exited during NodeBuilderActor startup process.", convmsg.remoteAdminAddress)
            # start_sender is an actor address, not a callable: the failure has to be delivered via send().
            self.send(self.start_sender, actor.BenchmarkFailure(
                "Remote Solr Orbit node [%s] has been shutdown prematurely." % convmsg.remoteAdminAddress))
        else:
            remote_ip = convmsg.remoteCapabilities.get('ip', None)
            self.logger.info("Remote Solr Orbit node [%s] has started.", remote_ip)

            # pop() fetches this host's start messages and marks it as checked in in one step,
            # without inserting an empty entry into the defaultdict for unrelated remotes.
            for eachmsg in self.remotes.pop(remote_ip, []):
                self.pending.append((self.createActor(NodeBuilderActor,
                                                      targetActorRequirements={"ip": remote_ip}),
                                     eachmsg))
            if not self.remotes:
                # Notifications are no longer needed
                self.notifyOnSystemRegistrationChanges(False)
                self.send_all_pending()

    def send_all_pending(self):
        # Invoked when all remotes have checked in and self.pending is
        # the list of remote NodeBuilder actors and messages to send.
        for each in self.pending:
            self.send(*each)
        self.pending = []

    def receiveMsg_BenchmarkFailure(self, msg, sender):
        self.send(self.start_sender, msg)

    def receiveMsg_PoisonMessage(self, msg, sender):
        self.send(self.start_sender, actor.BenchmarkFailure(msg.details))

    def receiveUnrecognizedMessage(self, msg, sender):
        self.logger.info("builder.Dispatcher#receiveMessage unrecognized(msg = [%s] sender = [%s])", str(type(msg)), str(sender))
class NodeBuilderActor(actor.BenchmarkActor):
    """
    One instance of this actor is run on each target host and coordinates the actual work of starting / stopping all nodes that should run
    on this host.
    """

    def __init__(self):
        super().__init__()
        # Builder for the nodes on this host; set while handling StartNodes, cleared once stopped.
        self.builder = None
        # IP of this host (from StartNodes), used to label error reports.
        self.host = None

    def receiveMsg_StartNodes(self, msg, sender):
        """Create a Builder for this host's nodes, start them and reply with NodesStarted.

        Replies go to ``msg.reply_to`` when present, otherwise to *sender*. Errors are reported
        as a BenchmarkFailure rather than raised.
        """
        try:
            self.host = msg.ip
            if msg.external:
                self.logger.info("Connecting to externally provisioned nodes on [%s].", msg.ip)
            else:
                self.logger.info("Starting node(s) %s on [%s].", msg.node_ids, msg.ip)

            # Load node-specific configuration
            cfg = config.auto_load_local_config(msg.cfg, additional_sections=[
                # only copy the relevant bits
                "workload", "builder", "client", "telemetry",
                # allow metrics store to extract test_run meta-data
                "test_run",
                "source"
            ])
            # set root path (normally done by the main entry point)
            cfg.add(config.Scope.application, "node", "benchmark.root", paths.benchmark_root())
            if not msg.external:
                cfg.add(config.Scope.benchmark, "provisioning", "node.ids", msg.node_ids)

            cls = metrics.metrics_store_class(cfg)
            metrics_store = cls(cfg)
            metrics_store.open(ctx=msg.open_metrics_context)
            # avoid follow-up errors in case we receive an unexpected ActorExitRequest due to an early failure in a parent actor.

            self.builder = create(cfg, metrics_store, msg.ip, msg.port, msg.all_node_ips, msg.all_node_ids,
                                  msg.sources, msg.distribution, msg.external, msg.docker)
            self.builder.start_engine()
            # Periodically flush system metrics (handled in receiveUnrecognizedMessage).
            self.wakeupAfter(METRIC_FLUSH_INTERVAL_SECONDS)
            self.send(getattr(msg, "reply_to", sender), NodesStarted())
        except Exception:
            self.logger.exception("Cannot process message [%s]", msg)
            # avoid "can't pickle traceback objects"
            _, ex_value, _ = sys.exc_info()
            self.send(getattr(msg, "reply_to", sender), actor.BenchmarkFailure(ex_value, traceback.format_exc()))

    def receiveMsg_PoisonMessage(self, msg, sender):
        # Don't report poison messages we sent to ourselves back to ourselves.
        if sender != self.myAddress:
            self.send(sender, actor.BenchmarkFailure(msg.details))

    def receiveMsg_BenchmarkFailure(self, msg, sender):
        self.send(getattr(msg, "reply_to", sender), msg)

    def receiveUnrecognizedMessage(self, msg, sender):
        # at the moment, we implement all message handling blocking. This is not ideal but simple to get started with. Besides, the caller
        # needs to block anyway. The only reason we implement builder as an actor is to distribute them.
        # noinspection PyBroadException
        try:
            self.logger.debug("NodeBuilderActor#receiveMessage(msg = [%s] sender = [%s])", str(type(msg)), str(sender))
            if isinstance(msg, ResetRelativeTime) and self.builder:
                self.builder.reset_relative_time()
            elif isinstance(msg, thespian.actors.WakeupMessage) and self.builder:
                # Flush and re-arm the timer; once the builder is gone the timer is no longer renewed.
                self.builder.flush_metrics()
                self.wakeupAfter(METRIC_FLUSH_INTERVAL_SECONDS)
            elif isinstance(msg, StopNodes):
                # There is nothing to stop if no builder exists (e.g. a repeated StopNodes). Still
                # acknowledge it, so the BuilderActor's wait for all children can complete.
                if self.builder:
                    self.builder.stop_engine()
                self.send(sender, NodesStopped())
                self.builder = None
            elif isinstance(msg, thespian.actors.ActorExitRequest):
                if self.builder:
                    self.builder.stop_engine()
                    self.builder = None
        except BaseException as e:
            self.logger.exception("Cannot process message [%s]", msg)
            self.send(getattr(msg, "reply_to", sender), actor.BenchmarkFailure("Error on host %s" % str(self.host), e))
#####################################################
# Internal API (only used by the actor and for tests)
#####################################################
def load_cluster_config(cfg, external):
    """Load the cluster config and plugins selected in the ``builder`` config section.

    The loaded cluster config (``None`` for external clusters) is also stored under
    ``builder``/``cluster_config.instance``, so that the test run can read it for result metadata.

    :param cfg: The current config object.
    :param external: Whether the cluster is externally provisioned.
    :return: A ``(cluster_config, plugins)`` tuple; ``(None, [])`` for externally provisioned clusters.
    """
    if external:
        # Externally provisioned clusters support neither cluster configs nor plugins.
        cluster_config, plugins = None, []
    else:
        config_path = cc.cluster_config_path(cfg)
        cluster_config = cc.load_cluster_config(config_path,
                                                cfg.opts("builder", "cluster_config.names"),
                                                cfg.opts("builder", "cluster_config.params"))
        plugins = cc.load_plugins(config_path,
                                  cfg.opts("builder", "cluster_config.plugins", mandatory=False),
                                  cfg.opts("builder", "plugin.params", mandatory=False))
    cfg.add(config.Scope.applicationOverride, "builder", "cluster_config.instance", cluster_config)
    return cluster_config, plugins
def create(cfg, metrics_store, node_ip, node_http_port, all_node_ips, all_node_ids, sources=False, distribution=False,
           external=False, docker=False):
    """Create a :class:`Builder` for the nodes listed in ``provisioning``/``node.ids`` on this host.

    Node names are ``<node.name.prefix>-<node id>``. For source or distribution builds, one local
    provisioner per node is created and a ``ProcessLauncher`` is used. For Docker, one Docker
    provisioner per node is created and a ``DockerLauncher`` is used; no binary is supplied upfront.

    :param cfg: The current (node-local) config object.
    :param metrics_store: An opened metrics store for system metrics.
    :param node_ip: IP the nodes on this host bind to.
    :param node_http_port: HTTP port of the nodes on this host.
    :param all_node_ips: IPs of all nodes in the cluster.
    :param all_node_ids: Ids of all nodes in the cluster; used to derive all node names.
    :return: A :class:`Builder` instance.
    :raises exceptions.BenchmarkAssertionError: if called for an externally provisioned cluster.
    :raises exceptions.SystemSetupError: if plugins are requested for a Docker cluster.
    :raises RuntimeError: if none of ``sources``, ``distribution``, ``docker`` or ``external`` is set.
    """
    test_run_root_path = paths.test_run_root(cfg)
    node_ids = cfg.opts("provisioning", "node.ids", mandatory=False)
    node_name_prefix = cfg.opts("provisioning", "node.name.prefix")
    cluster_config, plugins = load_cluster_config(cfg, external)

    def node_name_for(node_id):
        return "%s-%s" % (node_name_prefix, node_id)

    if sources or distribution:
        supply = supplier.create(cfg, sources, cluster_config)
        all_node_names = [node_name_for(n) for n in all_node_ids]
        provisioners = [
            provisioner.local(cfg, cluster_config, node_ip, node_http_port, all_node_ips,
                              all_node_names, test_run_root_path, node_name_for(node_id))
            for node_id in node_ids
        ]
        node_launcher = launcher.ProcessLauncher(cfg)
    elif external:
        raise exceptions.BenchmarkAssertionError("Externally provisioned clusters should not need to be managed by Solr Orbit's builder")
    elif docker:
        if len(plugins) > 0:
            # Point users at the option that actually selects plugins (consistent with ``install``);
            # plugin parameters alone do not trigger this error.
            raise exceptions.SystemSetupError("You cannot specify any plugins for Docker clusters. Please remove "
                                              "\"--plugins\" and try again.")
        # there is no binary for Docker that can be downloaded / built upfront
        supply = lambda: None  # noqa: E731
        provisioners = [
            provisioner.docker(cfg, cluster_config, node_ip, node_http_port, test_run_root_path, node_name_for(node_id))
            for node_id in node_ids
        ]
        node_launcher = launcher.DockerLauncher(cfg)
    else:
        # It is a programmer error (and not a user error) if this function is called with wrong parameters
        raise RuntimeError("One of sources, distribution, docker or external must be True")

    return Builder(cfg, metrics_store, supply, provisioners, node_launcher)
class Builder:
    """
    Prepares the benchmark candidate, i.e. everything that has to happen around the benchmark run:
    supplying binaries, provisioning and launching nodes, and afterwards stopping them, recording
    their system metrics and cleaning up.
    """

    def __init__(self, cfg, metrics_store, supply, provisioners, launcher):
        self.cfg = cfg
        self.preserve_install = cfg.opts("builder", "preserve.install")
        self.metrics_store = metrics_store
        self.supply = supply
        self.provisioners = provisioners
        self.launcher = launcher
        # Filled by start_engine(), reset by stop_engine().
        self.nodes = []
        self.node_configs = []
        self.logger = logging.getLogger(__name__)

    def start_engine(self):
        """Supply the binary once, prepare every provisioner with it and launch the nodes.

        :return: The nodes returned by the launcher.
        """
        binary = self.supply()
        # Filled incrementally so that configs prepared before a failure remain available for cleanup.
        self.node_configs = []
        for node_provisioner in self.provisioners:
            self.node_configs.append(node_provisioner.prepare(binary))
        self.nodes = self.launcher.start(self.node_configs)
        return self.nodes

    def reset_relative_time(self):
        self.logger.info("Resetting relative time of system metrics store.")
        self.metrics_store.reset_relative_time()

    def flush_metrics(self, refresh=False):
        self.logger.debug("Flushing system metrics.")
        self.metrics_store.flush(refresh=refresh)

    def stop_engine(self):
        """Stop all nodes, record their system metrics, close the store and clean up installations."""
        self.logger.info("Stopping nodes %s.", self.nodes)
        self.launcher.stop(self.nodes, self.metrics_store)
        self.flush_metrics(refresh=True)
        try:
            test_run = self._current_test_run()
            for stopped_node in self.nodes:
                self._add_results(test_run, stopped_node)
        except exceptions.NotFound as e:
            self.logger.warning("Cannot store system metrics: %s.", str(e))

        self.metrics_store.close()
        self.nodes = []
        for prepared in self.node_configs:
            provisioner.cleanup(preserve=self.preserve_install,
                                install_dir=prepared.binary_path,
                                data_paths=prepared.data_paths)
        self.node_configs = []

    def _current_test_run(self):
        """Look up the test run identified by ``system``/``test_run.id`` in the test run store."""
        test_run_id = self.cfg.opts("system", "test_run.id")
        store = metrics.test_run_store(self.cfg)
        return store.find_by_test_run_id(test_run_id)

    def _add_results(self, current_test_run, node):
        """Attach *node*'s system results to the test run and store the test run."""
        node_results = metrics.calculate_system_results(self.metrics_store, node.node_name)
        current_test_run.add_results(node_results)
        # NOTE(review): the whole test run is stored once per node; confirm the results store
        # tolerates (or overwrites) repeated writes.
        metrics.results_store(self.cfg).store_results(current_test_run)