blob: 070e34cddc61005a253a83b93271e83831b4ed5f [file]
# SPDX-License-Identifier: Apache-2.0
#
# Modifications by Apache Solr contributors; see git log for details.
# Licensed under the Apache License, Version 2.0.
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import collections
import glob
import json
import logging
import math
import os
import pickle
import statistics
import sys
import zlib
import webbrowser
from enum import Enum, IntEnum
import psutil
import tabulate
from solrorbit import time, exceptions, version, paths
from solrorbit.utils import convert, console, io, versions
from solrorbit.visualizations.benchmark_report_renderer import render_results_html
class MetaInfoScope(Enum):
"""
Defines the scope of a meta-information. Meta-information provides more context for a metric, for example the concrete version
of OpenSearch that has been benchmarked or environment information like CPU model or OS.
"""
cluster = 1
"""
Cluster level meta-information is valid for all nodes in the cluster (e.g. the benchmarked OpenSearch version)
"""
node = 3
"""
Node level meta-information is valid for a single node (e.g. GC times)
"""
def calculate_results(store, test_run):
calc = GlobalStatsCalculator(
store,
test_run.workload,
test_run.test_procedure,
latency_percentiles=test_run.latency_percentiles,
throughput_percentiles=test_run.throughput_percentiles
)
return calc()
def calculate_system_results(store, node_name):
calc = SystemStatsCalculator(store, node_name)
return calc()
def metrics_store(cfg, read_only=True, workload=None, test_procedure=None, cluster_config=None, meta_info=None):
"""
Creates a proper metrics store based on the current configuration.
:param cfg: Config object.
:param read_only: Whether to open the metrics store only for reading (Default: True).
:return: A metrics store implementation.
"""
store_type = cfg.opts("reporting", "datastore.type", mandatory=False, default_value="in-memory")
cls = FilesystemMetricsStore if store_type == "filesystem" else InMemoryMetricsStore
store = cls(cfg=cfg, meta_info=meta_info)
logging.getLogger(__name__).info("Creating %s", str(store))
test_run_id = cfg.opts("system", "test_run.id")
test_run_timestamp = cfg.opts("system", "time.start")
selected_cluster_config = cfg.opts("builder", "cluster_config.names") \
if cluster_config is None else cluster_config
store.open(
test_run_id, test_run_timestamp,
workload, test_procedure, selected_cluster_config,
create=not read_only)
return store
def extract_user_tags_from_config(cfg):
"""
Extracts user tags into a structured dict
:param cfg: The current configuration object.
:return: A dict containing user tags. If no user tags are given, an empty dict is returned.
"""
user_tags = cfg.opts("test_run", "user.tag", mandatory=False)
return extract_user_tags_from_string(user_tags)
def extract_user_tags_from_string(user_tags):
"""
Extracts user tags into a structured dict
:param user_tags: A string containing user tags (tags separated by comma, key and value separated by colon).
:return: A dict containing user tags. If no user tags are given, an empty dict is returned.
"""
user_tags_dict = {}
if user_tags and user_tags.strip() != "":
try:
for user_tag in user_tags.split(","):
user_tag_key, user_tag_value = user_tag.split(":")
user_tags_dict[user_tag_key] = user_tag_value
except ValueError:
msg = "User tag keys and values have to separated by a ':'. Invalid value [%s]" % user_tags
logging.getLogger(__name__).exception(msg)
raise exceptions.SystemSetupError(msg)
return user_tags_dict
class SampleType(IntEnum):
Warmup = 0
Normal = 1
class MetricsStore:
"""
Abstract metrics store
"""
def __init__(self, cfg, clock=time.Clock, meta_info=None):
"""
Creates a new metrics store.
:param cfg: The config object. Mandatory.
:param clock: This parameter is optional and needed for testing.
:param meta_info: This parameter is optional and intended for creating a metrics store with a previously serialized meta-info.
"""
self._config = cfg
self._test_run_id = None
self._test_run_timestamp = None
self._workload = None
self._workload_params = cfg.opts("workload", "params", default_value={}, mandatory=False)
self._test_procedure = None
self._cluster_config = None
self._cluster_config_name = None
self._environment_name = cfg.opts("system", "env.name")
self.opened = False
if meta_info is None:
self._meta_info = {}
else:
self._meta_info = meta_info
# ensure mandatory keys are always present
if MetaInfoScope.cluster not in self._meta_info:
self._meta_info[MetaInfoScope.cluster] = {}
if MetaInfoScope.node not in self._meta_info:
self._meta_info[MetaInfoScope.node] = {}
self._clock = clock
self._stop_watch = self._clock.stop_watch()
self.logger = logging.getLogger(__name__)
def open(self, test_run_id=None, test_run_timestamp=None, workload_name=None,\
test_procedure_name=None, cluster_config_name=None, ctx=None,\
create=False):
"""
Opens a metrics store for a specific test_run, workload, test_procedure and cluster_config.
:param test_run_id: The test run id. This attribute is sufficient to uniquely identify a test_run.
:param test_run_timestamp: The test run timestamp as a datetime.
:param workload_name: Workload name.
:param test_procedure_name: TestProcedure name.
:param cluster_config_name: ClusterConfigInstance name.
:param ctx: An metrics store open context retrieved from another metrics store with ``#open_context``.
:param create: True if an index should be created (if necessary). This is typically True, when attempting to write metrics and
False when it is just opened for reading (as we can assume all necessary indices exist at this point).
"""
if ctx:
self._test_run_id = ctx["test-run-id"]
self._test_run_timestamp = ctx["test-run-timestamp"]
self._workload = ctx["workload"]
self._test_procedure = ctx["test_procedure"]
self._cluster_config = ctx["cluster-config-instance"]
else:
self._test_run_id = test_run_id
self._test_run_timestamp = time.to_iso8601(test_run_timestamp)
self._workload = workload_name
self._test_procedure = test_procedure_name
self._cluster_config = cluster_config_name
assert self._test_run_id is not None, "Attempting to open metrics store without a test run id"
assert self._test_run_timestamp is not None, "Attempting to open metrics store without a test run timestamp"
self._cluster_config_name = "+".join(self._cluster_config) \
if isinstance(self._cluster_config, list) \
else self._cluster_config
self.logger.info("Opening metrics store for test run timestamp=[%s], workload=[%s],"
"test_procedure=[%s], cluster_config=[%s]",
self._test_run_timestamp, self._workload, self._test_procedure, self._cluster_config_name)
user_tags = extract_user_tags_from_config(self._config)
for k, v in user_tags.items():
# prefix user tag with "tag_" in order to avoid clashes with our internal meta data
self.add_meta_info(MetaInfoScope.cluster, None, "tag_%s" % k, v)
# Don't store it for each metrics record as it's probably sufficient on test run level
# self.add_meta_info(MetaInfoScope.cluster, None, "benchmark_version", version.version())
self._stop_watch.start()
self.opened = True
def reset_relative_time(self):
"""
Resets the internal relative-time counter to zero.
"""
self._stop_watch.start()
def flush(self, refresh=True):
"""
Explicitly flushes buffered metrics to the metric store. It is not required to flush before closing the metrics store.
"""
raise NotImplementedError("abstract method")
def close(self):
"""
Closes the metric store. Note that it is mandatory to close the metrics store when it is no longer needed as it only persists
metrics on close (in order to avoid additional latency during the benchmark).
"""
self.logger.info("Closing metrics store.")
self.flush()
self._clear_meta_info()
self.opened = False
def add_meta_info(self, scope, scope_key, key, value):
"""
Adds new meta information to the metrics store. All metrics entries that are created after calling this method are guaranteed to
contain the added meta info (provided is on the same level or a level below, e.g. a cluster level metric will not contain node
level meta information but all cluster level meta information will be contained in a node level metrics record).
:param scope: The scope of the meta information. See MetaInfoScope.
:param scope_key: The key within the scope. For cluster level metrics None is expected, for node level metrics the node name.
:param key: The key of the meta information.
:param value: The value of the meta information.
"""
if scope == MetaInfoScope.cluster:
self._meta_info[MetaInfoScope.cluster][key] = value
elif scope == MetaInfoScope.node:
if scope_key not in self._meta_info[MetaInfoScope.node]:
self._meta_info[MetaInfoScope.node][scope_key] = {}
self._meta_info[MetaInfoScope.node][scope_key][key] = value
else:
raise exceptions.SystemSetupError("Unknown meta info scope [%s]" % scope)
def _clear_meta_info(self):
"""
Clears all internally stored meta-info. This is considered Solr Orbit internal API and not intended for normal client consumption.
"""
self._meta_info = {
MetaInfoScope.cluster: {},
MetaInfoScope.node: {}
}
@property
def open_context(self):
return {
"test-run-id": self._test_run_id,
"test-run-timestamp": self._test_run_timestamp,
"workload": self._workload,
"test_procedure": self._test_procedure,
"cluster-config-instance": self._cluster_config
}
def put_value_cluster_level(self, name, value, unit=None, task=None, operation=None, operation_type=None, sample_type=SampleType.Normal,
absolute_time=None, relative_time=None, meta_data=None):
"""
Adds a new cluster level value metric.
:param name: The name of the metric.
:param value: The metric value. It is expected to be numeric.
:param unit: The unit of this metric value (e.g. ms, docs/s). Optional. Defaults to None.
:param task: The task name to which this value applies. Optional. Defaults to None.
:param operation: The operation name to which this value applies. Optional. Defaults to None.
:param operation_type: The operation type to which this value applies. Optional. Defaults to None.
:param sample_type: Whether this is a warmup or a normal measurement sample. Defaults to SampleType.Normal.
:param absolute_time: The absolute timestamp in seconds since epoch when this metric record is stored. Defaults to None. The metrics
store will derive the timestamp automatically.
:param relative_time: The relative timestamp in seconds since the start of the benchmark when this metric record is stored.
Defaults to None. The metrics store will derive the timestamp automatically.
:param meta_data: A dict, containing additional key-value pairs. Defaults to None.
"""
self._put_metric(MetaInfoScope.cluster, None, name, value, unit, task, operation, operation_type, sample_type, absolute_time,
relative_time, meta_data)
def put_value_node_level(self, node_name, name, value, unit=None, task=None, operation=None, operation_type=None,
sample_type=SampleType.Normal, absolute_time=None, relative_time=None, meta_data=None):
"""
Adds a new node level value metric.
:param name: The name of the metric.
:param node_name: The name of the cluster node for which this metric has been determined.
:param value: The metric value. It is expected to be numeric.
:param unit: The unit of this metric value (e.g. ms, docs/s). Optional. Defaults to None.
:param task: The task name to which this value applies. Optional. Defaults to None.
:param operation: The operation name to which this value applies. Optional. Defaults to None.
:param operation_type: The operation type to which this value applies. Optional. Defaults to None.
:param sample_type: Whether this is a warmup or a normal measurement sample. Defaults to SampleType.Normal.
:param absolute_time: The absolute timestamp in seconds since epoch when this metric record is stored. Defaults to None. The metrics
store will derive the timestamp automatically.
:param relative_time: The relative timestamp in seconds since the start of the benchmark when this metric record is stored.
Defaults to None. The metrics store will derive the timestamp automatically.
:param meta_data: A dict, containing additional key-value pairs. Defaults to None.
"""
self._put_metric(MetaInfoScope.node, node_name, name, value, unit, task, operation, operation_type, sample_type, absolute_time,
relative_time, meta_data)
def _put_metric(self, level, level_key, name, value, unit, task, operation, operation_type, sample_type, absolute_time=None,
relative_time=None, meta_data=None):
if level == MetaInfoScope.cluster:
meta = self._meta_info[MetaInfoScope.cluster].copy()
elif level == MetaInfoScope.node:
meta = self._meta_info[MetaInfoScope.cluster].copy()
if level_key in self._meta_info[MetaInfoScope.node]:
meta.update(self._meta_info[MetaInfoScope.node][level_key])
else:
raise exceptions.SystemSetupError("Unknown meta info level [%s] for metric [%s]" % (level, name))
if meta_data:
meta.update(meta_data)
if absolute_time is None:
absolute_time = self._clock.now()
if relative_time is None:
relative_time = self._stop_watch.split_time()
doc = {
"@timestamp": time.to_epoch_millis(absolute_time),
"relative-time-ms": convert.seconds_to_ms(relative_time),
"test-run-id": self._test_run_id,
"test-run-timestamp": self._test_run_timestamp,
"environment": self._environment_name,
"workload": self._workload,
"test_procedure": self._test_procedure,
"cluster-config-instance": self._cluster_config_name,
"name": name,
"value": value,
"unit": unit,
"sample-type": sample_type.name.lower(),
"meta": meta
}
if task:
doc["task"] = task
if operation:
doc["operation"] = operation
if operation_type:
doc["operation-type"] = operation_type
if self._workload_params:
doc["workload-params"] = self._workload_params
self._add(doc)
def put_doc(self, doc, level=None, node_name=None, meta_data=None, absolute_time=None, relative_time=None):
"""
Adds a new document to the metrics store. It will merge additional properties into the doc such as timestamps or workload info.
:param doc: The raw document as a ``dict``. Ownership is transferred to the metrics store (i.e. don't reuse that object).
:param level: Whether these are cluster or node-level metrics. May be ``None`` if not applicable.
:param node_name: The name of the node in case metrics are on node level.
:param meta_data: A dict, containing additional key-value pairs. Defaults to None.
:param absolute_time: The absolute timestamp in seconds since epoch when this metric record is stored. Defaults to None. The metrics
store will derive the timestamp automatically.
:param relative_time: The relative timestamp in seconds since the start of the benchmark when this metric record is stored.
Defaults to None. The metrics store will derive the timestamp automatically.
"""
if level == MetaInfoScope.cluster:
meta = self._meta_info[MetaInfoScope.cluster].copy()
elif level == MetaInfoScope.node:
meta = self._meta_info[MetaInfoScope.cluster].copy()
if node_name in self._meta_info[MetaInfoScope.node]:
meta.update(self._meta_info[MetaInfoScope.node][node_name])
elif level is None:
meta = None
else:
raise exceptions.SystemSetupError("Unknown meta info level [{}]".format(level))
if meta and meta_data:
meta.update(meta_data)
if absolute_time is None:
absolute_time = self._clock.now()
if relative_time is None:
relative_time = self._stop_watch.split_time()
doc.update({
"@timestamp": time.to_epoch_millis(absolute_time),
"relative-time-ms": convert.seconds_to_ms(relative_time),
"test-run-id": self._test_run_id,
"test-run-timestamp": self._test_run_timestamp,
"environment": self._environment_name,
"workload": self._workload,
"test_procedure": self._test_procedure,
"cluster-config-instance": self._cluster_config_name,
})
if meta:
doc["meta"] = meta
if self._workload_params:
doc["workload-params"] = self._workload_params
self._add(doc)
def bulk_add(self, memento):
"""
Adds raw metrics store documents previously created with #to_externalizable()
:param memento: The external representation as returned by #to_externalizable().
"""
if memento:
self.logger.debug("Restoring in-memory representation of metrics store.")
for doc in pickle.loads(zlib.decompress(memento)):
self._add(doc)
def to_externalizable(self, clear=False):
raise NotImplementedError("abstract method")
def _add(self, doc):
"""
Adds a new document to the metrics store
:param doc: The new document.
"""
raise NotImplementedError("abstract method")
def get_one(self, name, sample_type=None, node_name=None, task=None, mapper=lambda doc: doc["value"],
sort_key=None, sort_reverse=False):
"""
Gets one value for the given metric name (even if there should be more than one).
:param name: The metric name to query.
:param sample_type The sample type to query. Optional. By default, all samples are considered.
:param node_name The name of the node where this metric was gathered. Optional.
:param task The task name to query. Optional.
:param sort_key The key to sort the docs before returning the first value. Optional.
:param sort_reverse The flag to reverse the sort. Optional.
:return: The corresponding value for the given metric name or None if there is no value.
"""
raise NotImplementedError("abstract method")
@staticmethod
def _first_or_none(values):
return values[0] if values else None
def get(self, name, task=None, operation_type=None, sample_type=None, node_name=None):
"""
Gets all raw values for the given metric name.
:param name: The metric name to query.
:param task The task name to query. Optional.
:param operation_type The operation type to query. Optional.
:param sample_type The sample type to query. Optional. By default, all samples are considered.
:param node_name The name of the node where this metric was gathered. Optional.
:return: A list of all values for the given metric.
"""
return self._get(name, task, operation_type, sample_type, node_name, lambda doc: doc["value"])
def get_raw(self, name, task=None, operation_type=None, sample_type=None, node_name=None, mapper=lambda doc: doc):
"""
Gets all raw records for the given metric name.
:param name: The metric name to query.
:param task The task name to query. Optional.
:param operation_type The operation type to query. Optional.
:param sample_type The sample type to query. Optional. By default, all samples are considered.
:param node_name The name of the node where this metric was gathered. Optional.
:param mapper A record mapper. By default, the complete record is returned.
:return: A list of all raw records for the given metric.
"""
return self._get(name, task, operation_type, sample_type, node_name, mapper)
def get_unit(self, name, task=None, operation_type=None, node_name=None):
"""
Gets the unit for the given metric name.
:param name: The metric name to query.
:param task The task name to query. Optional.
:param operation_type The operation type to query. Optional.
:param node_name The name of the node where this metric was gathered. Optional.
:return: The corresponding unit for the given metric name or None if no metric record is available.
"""
# does not make too much sense to ask for a sample type here
return self._first_or_none(self._get(name, task, operation_type, None, node_name, lambda doc: doc["unit"]))
def _get(self, name, task, operation_type, sample_type, node_name, mapper):
raise NotImplementedError("abstract method")
def get_error_rate(self, task, operation_type=None, sample_type=None):
"""
Gets the error rate for a specific task.
:param task The task name to query.
:param operation_type The operation type to query. Optional.
:param sample_type The sample type to query. Optional. By default, all samples are considered.
:return: A float between 0.0 and 1.0 (inclusive) representing the error rate.
"""
raise NotImplementedError("abstract method")
def get_stats(self, name, task=None, operation_type=None, sample_type=None):
"""
Gets standard statistics for the given metric.
:param name: The metric name to query.
:param task The task name to query. Optional.
:param operation_type The operation type to query. Optional.
:param sample_type The sample type to query. Optional. By default, all samples are considered.
:return: A metric_stats structure.
"""
raise NotImplementedError("abstract method")
def get_percentiles(self, name, task=None, operation_type=None, sample_type=None, percentiles=None):
"""
Retrieves percentile metrics for the given metric.
:param name: The metric name to query.
:param task The task name to query. Optional.
:param operation_type The operation type to query. Optional.
:param sample_type The sample type to query. Optional. By default, all samples are considered.
:param percentiles: An optional list of percentiles to show. If None is provided, by default the 99th, 99.9th and 100th percentile
are determined. Ensure that there are enough data points in the metrics store (e.g. it makes no sense to retrieve a 99.9999
percentile when there are only 10 values).
:return: An ordered dictionary of the determined percentile values in ascending order. Key is the percentile, value is the
determined value at this percentile. If no percentiles could be determined None is returned.
"""
raise NotImplementedError("abstract method")
def get_median(self, name, task=None, operation_type=None, sample_type=None):
"""
Retrieves median value of the given metric.
:param name: The metric name to query.
:param task The task name to query. Optional.
:param operation_type The operation type to query. Optional.
:param sample_type The sample type to query. Optional. By default, all samples are considered.
:return: The median value.
"""
median = "50.0"
percentiles = self.get_percentiles(name, task, operation_type, sample_type, percentiles=[median])
return percentiles[median] if percentiles else None
def get_mean(self, name, task=None, operation_type=None, sample_type=None):
"""
Retrieves mean of the given metric.
:param name: The metric name to query.
:param task The task name to query. Optional.
:param operation_type The operation type to query. Optional.
:param sample_type The sample type to query. Optional. By default, all samples are considered.
:return: The mean.
"""
stats = self.get_stats(name, task, operation_type, sample_type)
return stats["avg"] if stats else None
class InMemoryMetricsStore(MetricsStore):
# Note that this implementation can run out of memory; generally, this can occur when ingesting very large corpora.
# Approx size of a metrics doc (after tracking memory consumption during ingestion.
DOC_SIZE_IN_BYTES = 1500
# Warn when memory consumption crosses this threshold (percentage usage).
MEMORY_WARNING_THRESHOLD = 85
# Check memory usage every-so-many docs.
MEMORY_CHECK_FREQUENCY = 10000
def __init__(self, cfg, clock=time.Clock, meta_info=None):
"""
Creates a new metrics store.
:param cfg: The config object. Mandatory.
:param clock: This parameter is optional and needed for testing.
:param meta_info: This parameter is optional and intended for creating a metrics store with a previously serialized meta-info.
"""
super().__init__(cfg=cfg, clock=clock, meta_info=meta_info)
self.docs = []
self.doc_count = 0
self.logger = logging.getLogger(__name__)
self.out_of_memory = False
self.memory_available_threshold = psutil.virtual_memory().total * (100 - self.MEMORY_WARNING_THRESHOLD) / 100
def __del__(self):
"""
Deletes the metrics store instance.
"""
del self.docs
def _add(self, doc):
if self.out_of_memory:
return
if self.doc_count % self.MEMORY_CHECK_FREQUENCY == 0 and psutil.virtual_memory().available < self.memory_available_threshold:
console.warn("Memory threshold exceeded by in-memory metrics store, not adding additional entries", logger=self.logger)
self.out_of_memory = True
return
self.docs.append(doc)
self.doc_count += 1
def flush(self, refresh=True):
pass
def to_externalizable(self, clear=False):
docs = self.docs
if clear:
self.docs = []
self.doc_count = 0
self.out_of_memory = False
if len(docs) * self.DOC_SIZE_IN_BYTES > psutil.virtual_memory().available - self.memory_available_threshold:
console.warn("Memory threshold exceeded by in-memory metrics store, skipping summary generation for current operation",
logger=self.logger)
return None
compressed = zlib.compress(pickle.dumps(docs))
self.logger.debug("Compression changed size of metric store from [%d] bytes to [%d] bytes",
sys.getsizeof(docs, -1), sys.getsizeof(compressed, -1))
return compressed
def get_percentiles(self, name, task=None, operation_type=None, sample_type=None, percentiles=None):
if percentiles is None:
percentiles = [99, 99.9, 100]
result = collections.OrderedDict()
values = self.get(name, task, operation_type, sample_type)
if len(values) > 0:
sorted_values = sorted(values)
for percentile in percentiles:
result[percentile] = self.percentile_value(sorted_values, percentile)
return result
@staticmethod
def percentile_value(sorted_values, percentile):
"""
Calculates a percentile value for a given list of values and a percentile.
The implementation is based on http://onlinestatbook.com/2/introduction/percentiles.html
:param sorted_values: A sorted list of raw values for which a percentile should be calculated.
:param percentile: A percentile between [0, 100]
:return: the corresponding percentile value.
"""
rank = float(percentile) / 100.0 * (len(sorted_values) - 1)
if rank == int(rank):
return sorted_values[int(rank)]
else:
lr = math.floor(rank)
lr_next = math.ceil(rank)
fr = rank - lr
lower_score = sorted_values[lr]
higher_score = sorted_values[lr_next]
return lower_score + (higher_score - lower_score) * fr
def get_error_rate(self, task, operation_type=None, sample_type=None):
error = 0
total_count = 0
for doc in self.docs:
# we can use any request metrics record (i.e. service time or latency)
if doc["name"] == "service_time" and doc["task"] == task and \
(operation_type is None or doc["operation-type"] == operation_type) and \
(sample_type is None or doc["sample-type"] == sample_type.name.lower()):
total_count += 1
if doc["meta"]["success"] is False:
error += 1
if total_count > 0:
return error / total_count
else:
return 0.0
def get_stats(self, name, task=None, operation_type=None, sample_type=SampleType.Normal):
values = self.get(name, task, operation_type, sample_type)
sorted_values = sorted(values)
if len(sorted_values) > 0:
return {
"count": len(sorted_values),
"min": sorted_values[0],
"max": sorted_values[-1],
"avg": statistics.mean(sorted_values),
"sum": sum(sorted_values)
}
else:
return None
def _get(self, name, task, operation_type, sample_type, node_name, mapper):
return [mapper(doc)
for doc in self.docs
if doc["name"] == name and
(task is None or doc["task"] == task) and
(operation_type is None or doc["operation-type"] == operation_type) and
(sample_type is None or doc["sample-type"] == sample_type.name.lower()) and
(node_name is None or doc.get("meta", {}).get("node_name") == node_name)
]
def get_one(self, name, sample_type=None, node_name=None, task=None, mapper=lambda doc: doc["value"],
sort_key=None, sort_reverse=False):
if sort_key:
docs = sorted(self.docs, key=lambda k: k[sort_key], reverse=sort_reverse)
else:
docs = self.docs
for doc in docs:
if (doc["name"] == name and (task is None or doc["task"] == task) and
(sample_type is None or doc["sample-type"] == sample_type.name.lower()) and
(node_name is None or doc.get("meta", {}).get("node_name") == node_name)):
return mapper(doc)
return None
def __str__(self):
return "in-memory metrics store"
class FilesystemMetricsStore(InMemoryMetricsStore):
"""Extends InMemoryMetricsStore to also persist each metric document to disk as a JSONL file."""
def __init__(self, cfg, clock=time.Clock, meta_info=None):
super().__init__(cfg=cfg, clock=clock, meta_info=meta_info)
self._metrics_file = None
def open(self, test_run_id=None, test_run_timestamp=None, workload_name=None,
test_procedure_name=None, cluster_config_name=None, ctx=None, create=False):
super().open(test_run_id, test_run_timestamp, workload_name,
test_procedure_name, cluster_config_name, ctx, create)
if create:
run_dir = paths.test_run_root(self._config, test_run_id=self._test_run_id)
io.ensure_dir(run_dir)
metrics_path = os.path.join(run_dir, "metrics.jsonl")
# line-buffered so each metric line is flushed immediately
self._metrics_file = open(metrics_path, "wt", encoding="utf-8", buffering=1) # pylint: disable=consider-using-with
self.logger.info("FilesystemMetricsStore writing raw metrics to %s", metrics_path)
def _add(self, doc):
super()._add(doc)
if self._metrics_file is not None:
self._metrics_file.write(json.dumps(doc) + "\n")
def flush(self, refresh=True):
if self._metrics_file is not None:
self._metrics_file.flush()
def close(self):
super().close()
if self._metrics_file is not None:
self._metrics_file.close()
self._metrics_file = None
def __str__(self):
return "filesystem metrics store"
def test_run_store(cfg):
"""
Creates a proper test_run store based on the current configuration.
:param cfg: Config object. Mandatory.
:return: A test_run store implementation.
"""
logger = logging.getLogger(__name__)
logger.info("Creating file test_run store")
return FileTestRunStore(cfg)
def results_store(cfg):
"""
Creates a proper test_run store based on the current configuration.
:param cfg: Config object. Mandatory.
:return: A test_run store implementation.
"""
logger = logging.getLogger(__name__)
logger.info("Creating no-op results store")
return NoopResultsStore()
def list_test_helper(store_item, title):
def format_dict(d):
if d:
items = sorted(d.items())
return ", ".join(["%s=%s" % (k, v) for k, v in items])
else:
return None
test_runs = []
for test_run in store_item:
test_runs.append([
test_run.test_run_id,
time.to_iso8601(test_run.test_run_timestamp),
test_run.workload,
format_dict(test_run.workload_params),
test_run.test_procedure_name,
test_run.cluster_config_name,
format_dict(test_run.user_tags),
test_run.workload_revision,
test_run.cluster_config_revision])
if len(test_runs) > 0:
console.println(f"\nRecent {title}:\n")
console.println(tabulate.tabulate(
test_runs,
headers=[
"TestRun ID",
"TestRun Timestamp",
"Workload",
"Workload Parameters",
"TestProcedure",
"ClusterConfigInstance",
"User Tags",
"workload Revision",
"Cluster Config Revision"
]))
else:
console.println("")
console.println(f"No recent {title} found.")
def list_test_runs(cfg):
list_test_helper(test_run_store(cfg).list(), "test-runs")
def list_aggregated_results(cfg):
list_test_helper(test_run_store(cfg).list_aggregations(), "aggregated-results")
def create_test_run(cfg, workload, test_procedure, workload_revision=None):
cluster_config = cfg.opts("builder", "cluster_config.names")
environment = cfg.opts("system", "env.name")
test_run_id = cfg.opts("system", "test_run.id")
test_run_timestamp = cfg.opts("system", "time.start")
user_tags = extract_user_tags_from_config(cfg)
pipeline = cfg.opts("test_run", "pipeline")
workload_params = cfg.opts("workload", "params")
cluster_config_params = cfg.opts("builder", "cluster_config.params")
plugin_params = cfg.opts("builder", "plugin.params")
benchmark_version = version.version()
benchmark_revision = version.revision()
latency_percentiles = cfg.opts("workload", "latency.percentiles", mandatory=False,
default_value=GlobalStatsCalculator.DEFAULT_LATENCY_PERCENTILES)
throughput_percentiles = cfg.opts("workload", "throughput.percentiles", mandatory=False,
default_value=GlobalStatsCalculator.DEFAULT_THROUGHPUT_PERCENTILES)
# In tests, we don't get the default command-line arg value for percentiles,
# so supply them as defaults here as well
# Get cluster_config_instance if available (stored during provisioning)
cluster_config_instance = cfg.opts("builder", "cluster_config.instance", mandatory=False, default_value=None)
return TestRun(benchmark_version, benchmark_revision,
environment, test_run_id, test_run_timestamp,
pipeline, user_tags, workload,
workload_params, test_procedure, cluster_config, cluster_config_params,
plugin_params, workload_revision, latency_percentiles=latency_percentiles,
throughput_percentiles=throughput_percentiles, cluster_config_instance=cluster_config_instance)
class TestRun:
def __init__(self, benchmark_version, benchmark_revision, environment_name,
test_run_id, test_run_timestamp, pipeline, user_tags,
workload, workload_params, test_procedure, cluster_config,
cluster_config_params, plugin_params,
workload_revision=None, cluster_config_revision=None,
distribution_version=None, distribution_flavor=None,
revision=None, results=None, meta_data=None, latency_percentiles=None, throughput_percentiles=None,
cluster_config_instance=None):
if results is None:
results = {}
# this happens when the test run is created initially
if meta_data is None:
meta_data = {}
if workload:
meta_data.update(workload.meta_data)
if test_procedure:
meta_data.update(test_procedure.meta_data)
if latency_percentiles:
# split comma-separated string into list of floats
latency_percentiles = [float(value) for value in latency_percentiles.split(",")]
if throughput_percentiles:
throughput_percentiles = [float(value) for value in throughput_percentiles.split(",")]
self.benchmark_version = benchmark_version
self.benchmark_revision = benchmark_revision
self.environment_name = environment_name
self.test_run_id = test_run_id
self.test_run_timestamp = test_run_timestamp
self.pipeline = pipeline
self.user_tags = user_tags
self.workload = workload
self.workload_params = workload_params
self.test_procedure = test_procedure
self.cluster_config = cluster_config
self.cluster_config_params = cluster_config_params
self.plugin_params = plugin_params
self.workload_revision = workload_revision
self.cluster_config_revision = cluster_config_revision
self.distribution_version = distribution_version
self.distribution_flavor = distribution_flavor
self.revision = revision
self.results = results
self.meta_data = meta_data
self.latency_percentiles = latency_percentiles
self.throughput_percentiles = throughput_percentiles
self.cluster_config_instance = cluster_config_instance
@property
def workload_name(self):
return str(self.workload)
@property
def test_procedure_name(self):
return str(self.test_procedure) if self.test_procedure else None
@property
def cluster_config_name(self):
return "+".join(self.cluster_config) \
if isinstance(self.cluster_config, list) \
else self.cluster_config
def add_results(self, results):
self.results = results
def as_dict(self):
"""
:return: A dict representation suitable for persisting this test run instance as JSON.
"""
d = {
"benchmark-version": self.benchmark_version,
"benchmark-revision": self.benchmark_revision,
"environment": self.environment_name,
"test-run-id": self.test_run_id,
"test-run-timestamp": time.to_iso8601(self.test_run_timestamp),
"pipeline": self.pipeline,
"user-tags": self.user_tags,
"workload": self.workload_name,
"cluster-config-instance": self.cluster_config,
"cluster": {
"revision": self.revision,
"distribution-version": self.distribution_version,
"distribution-flavor": self.distribution_flavor,
"cluster-config-revision": self.cluster_config_revision,
}
}
if self.results:
# if results was loaded from JSON it’s already a dict
if isinstance(self.results, dict):
d["results"] = self.results
elif hasattr(self.results, "as_dict"):
d["results"] = self.results.as_dict()
else:
# Fallback: just stick whatever it is in there
d["results"] = self.results
if self.workload_revision:
d["workload-revision"] = self.workload_revision
d["test_procedure"] = self.test_procedure_name
if self.workload_params:
d["workload-params"] = self.workload_params
if self.cluster_config_params:
d["cluster-config-instance-params"] = self.cluster_config_params
if self.plugin_params:
d["plugin-params"] = self.plugin_params
# Add complete cluster-config specification for result portal display and time-series analysis
if self.cluster_config_instance:
d["cluster-config-spec"] = {
"names": self.cluster_config_instance.names if hasattr(self.cluster_config_instance, "names") else [],
"variables": self.cluster_config_instance.variables if hasattr(self.cluster_config_instance, "variables") else {},
"config_paths": self.cluster_config_instance.config_paths if hasattr(self.cluster_config_instance, "config_paths") else [],
"root_path": self.cluster_config_instance.root_path if hasattr(self.cluster_config_instance, "root_path") else None,
"provider": str(self.cluster_config_instance.provider) if hasattr(self.cluster_config_instance, "provider") else None,
"flavor": str(self.cluster_config_instance.flavor) if hasattr(self.cluster_config_instance, "flavor") else None,
}
return d
def to_result_dicts(self):
"""
:return: a list of dicts, suitable for persisting the results of this test run in a format that is Kibana-friendly.
"""
result_template = {
"benchmark-version": self.benchmark_version,
"benchmark-revision": self.benchmark_revision,
"environment": self.environment_name,
"test-run-id": self.test_run_id,
"test-run-timestamp": time.to_iso8601(self.test_run_timestamp),
"distribution-version": self.distribution_version,
"distribution-flavor": self.distribution_flavor,
"user-tags": self.user_tags,
"workload": self.workload_name,
"test_procedure": self.test_procedure_name,
"cluster-config-instance": self.cluster_config_name,
# allow to logically delete records, e.g. for UI purposes when we only want to show the latest result
"active": True
}
if self.distribution_version:
result_template["distribution-major-version"] = versions.major_version(self.distribution_version)
if self.cluster_config_revision:
result_template["cluster-config-revision"] = self.cluster_config_revision
if self.workload_revision:
result_template["workload-revision"] = self.workload_revision
if self.workload_params:
result_template["workload-params"] = self.workload_params
if self.cluster_config_params:
result_template["cluster-config-instance-params"] = self.cluster_config_params
if self.plugin_params:
result_template["plugin-params"] = self.plugin_params
if self.meta_data:
result_template["meta"] = self.meta_data
all_results = []
for item in self.results.as_flat_list():
result = result_template.copy()
result.update(item)
all_results.append(result)
return all_results
@classmethod
def from_dict(cls, d):
user_tags = d.get("user-tags", {})
# TODO: cluster is optional for BWC. This can be removed after some grace period.
cluster = d.get("cluster", {})
return TestRun(d["benchmark-version"], d.get("benchmark-revision"), d["environment"], d["test-run-id"],
time.from_is8601(d["test-run-timestamp"]),
d["pipeline"], user_tags, d["workload"], d.get("workload-params"),
d.get("test_procedure"), d["cluster-config-instance"],
d.get("cluster-config-instance-params"), d.get("plugin-params"),
workload_revision=d.get("workload-revision"),
cluster_config_revision=cluster.get("cluster-config-revision"),
distribution_version=cluster.get("distribution-version"),
distribution_flavor=cluster.get("distribution-flavor"),
revision=cluster.get("revision"), results=d.get("results"), meta_data=d.get("meta", {}))
class TestRunStore:
def __init__(self, cfg):
self.cfg = cfg
self.environment_name = cfg.opts("system", "env.name")
def find_by_test_run_id(self, test_run_id):
raise NotImplementedError("abstract method")
def list(self):
raise NotImplementedError("abstract method")
def store_test_run(self, test_run):
raise NotImplementedError("abstract method")
def _max_results(self):
return int(self.cfg.opts("system", "list.test_runs.max_results"))
# NOT USED — retained as placeholder for a future external store (e.g. S3TestRunStore).
class CompositeTestRunStore:
"""
Placeholder delegating test-run store for a future external store integration
(e.g. S3TestRunStore). Delegates writes to both an external store and the local
file store; reads are served from the external store.
Not wired into any active code path. Does not inherit from TestRunStore —
it is a delegator with the same API.
"""
def __init__(self, external_store, file_store):
self.external_store = external_store
self.file_store = file_store
def find_by_test_run_id(self, test_run_id):
return self.external_store.find_by_test_run_id(test_run_id)
def store_test_run(self, test_run):
self.file_store.store_test_run(test_run)
self.external_store.store_test_run(test_run)
def store_html_results(self, test_run):
self.file_store.store_html_results(test_run)
def list(self):
return self.external_store.list()
class FileTestRunStore(TestRunStore):
_browser_opened = {}
def __init__(self, cfg):
super().__init__(cfg)
self._max_results = lambda: int(cfg.opts("system", "list.test_runs.max_results"))
def store_test_run(self, test_run):
open_browser = False
doc = test_run.as_dict()
test_run_path = paths.test_run_root(self.cfg, test_run_id=test_run.test_run_id)
io.ensure_dir(test_run_path)
with open(self._test_run_file(), mode="wt", encoding="utf-8") as f:
f.write(json.dumps(doc, indent=True, ensure_ascii=False))
# Check if visualization is enabled
if self.cfg.opts("workload", "visualize", mandatory=False, default_value=False):
# Reset the browser opened flag when we're storing the final results
if hasattr(test_run, "results") and test_run.results:
FileTestRunStore._browser_opened[test_run.test_run_id] = False
open_browser = True
self.store_html_results(test_run, open_browser)
def store_aggregated_run(self, test_run):
doc = test_run.as_dict()
aggregated_execution_path = paths.aggregated_results_root(self.cfg, test_run_id=test_run.test_run_id)
io.ensure_dir(aggregated_execution_path)
aggregated_file = os.path.join(aggregated_execution_path, "aggregated_test_run.json")
with open(aggregated_file, mode="wt", encoding="utf-8") as f:
f.write(json.dumps(doc, indent=True, ensure_ascii=False))
def store_html_results(self, test_run, open_browser=True):
# Check if custom output path is specified
custom_output_path = self.cfg.opts("workload", "visualize.output.path", mandatory=False, default_value=None)
html_content = render_results_html(test_run, self.cfg)
if custom_output_path:
dest = os.path.expanduser(custom_output_path)
os.makedirs(os.path.dirname(dest), exist_ok=True)
print("[DEBUG]: ", dest)
with open(dest, 'w', encoding='utf-8') as f:
f.write(html_content)
console.info(f"HTML report saved to: {dest}")
if open_browser:
webbrowser.open(f"file://{dest}")
return dest
else:
# Use default behavior
report_file = os.path.join(paths.test_run_root(self.cfg, test_run_id=test_run.test_run_id), "test_run_report.html")
print("[DEBUG]: ", report_file)
with open(report_file, mode="w", encoding="utf-8") as f:
f.write(html_content)
if open_browser:
webbrowser.open(f"file://{report_file}")
console.info(f"HTML report saved to: {report_file}")
return report_file
def _test_run_file(self, test_run_id=None, is_aggregated=False):
if is_aggregated:
return os.path.join(paths.aggregated_results_root(cfg=self.cfg, test_run_id=test_run_id),
"aggregated_test_run.json")
else:
return os.path.join(paths.test_run_root(cfg=self.cfg, test_run_id=test_run_id), "test_run.json")
def list(self):
results = glob.glob(self._test_run_file(test_run_id="*"))
all_test_runs = self._to_test_runs(results)
return all_test_runs[:self._max_results()]
def list_aggregations(self):
aggregated_results = glob.glob(self._test_run_file(test_run_id="*", is_aggregated=True))
return self._to_test_runs(aggregated_results)
def find_by_test_run_id(self, test_run_id):
is_aggregated = test_run_id.startswith('aggregate')
test_run_file = self._test_run_file(test_run_id=test_run_id, is_aggregated=is_aggregated)
if io.exists(test_run_file):
test_runs = self._to_test_runs([test_run_file])
if test_runs:
return test_runs[0]
raise exceptions.NotFound("No test run with test run id [{}]".format(test_run_id))
def _to_test_runs(self, results):
test_runs = []
for result in results:
# noinspection PyBroadException
try:
with open(result, mode="rt", encoding="utf-8") as f:
test_runs.append(TestRun.from_dict(json.loads(f.read())))
except BaseException:
logging.getLogger(__name__).exception("Could not load test_run file [%s] (incompatible format?) Skipping...", result)
return sorted(test_runs, key=lambda r: r.test_run_timestamp, reverse=True)
class NoopResultsStore:
"""
Does not store any results separately as these are stored as part of the test_run on the file system.
"""
def store_results(self, test_run):
pass
# helper function for encoding and decoding float keys so that the OpenSearch metrics store can save them.
def encode_float_key(k):
# ensure that the key is indeed a float to unify the representation (e.g. 50 should be represented as "50_0")
return str(float(k)).replace(".", "_")
def filter_percentiles_by_sample_size(sample_size, percentiles):
# Don't show percentiles if there aren't enough samples for the value to be distinct.
# For example, we should only show p99.9, p45.6, or p0.01 if there are at least 1000 values.
# If nothing is suitable, default to just returning [100] rather than an empty list.
if sample_size < 1:
raise AssertionError("Percentiles require at least one sample")
filtered_percentiles = []
# Treat the cases below 10 separately, to return p25, 50, 75, 100 if present
if sample_size == 1:
filtered_percentiles = [100]
elif sample_size < 4:
for p in [50, 100]:
if p in percentiles:
filtered_percentiles.append(p)
elif sample_size < 10:
for p in [25, 50, 75, 100]:
if p in percentiles:
filtered_percentiles.append(p)
else:
effective_sample_size = 10 ** (int(math.log10(sample_size))) # round down to nearest power of ten
delta = 0.000001 # If (p / 100) * effective_sample_size is within this value of a whole number,
# assume the discrepancy is due to floating point and allow it
for p in percentiles:
fraction = p / 100
# check if fraction * effective_sample_size is close enough to a whole number
if abs((effective_sample_size * fraction) - round(effective_sample_size*fraction)) < delta or p in [25, 75]:
filtered_percentiles.append(p)
# if no percentiles are suitable, just return 100
if len(filtered_percentiles) == 0:
return [100]
return filtered_percentiles
def percentiles_for_sample_size(sample_size, percentiles_list=None):
# If latency_percentiles is present, as a list, display those values instead (assuming there are enough samples)
percentiles = []
if percentiles_list:
percentiles = percentiles_list # Defaults get overridden if a value is provided
percentiles.sort()
return filter_percentiles_by_sample_size(sample_size, percentiles)
class GlobalStatsCalculator:
DEFAULT_LATENCY_PERCENTILES = "50,90,99,99.9,99.99,100"
DEFAULT_LATENCY_PERCENTILES_LIST = [float(value) for value in DEFAULT_LATENCY_PERCENTILES.split(",")]
DEFAULT_THROUGHPUT_PERCENTILES = ""
DEFAULT_THROUGHPUT_PERCENTILES_LIST = []
OTHER_PERCENTILES = [50,90,99,99.9,99.99,100]
# Use these percentiles when the single_latency fn is called for something other than latency
def __init__(self, store, workload, test_procedure, latency_percentiles=None, throughput_percentiles=None):
self.store = store
self.logger = logging.getLogger(__name__)
self.workload = workload
self.test_procedure = test_procedure
self.latency_percentiles = latency_percentiles
self.throughput_percentiles = throughput_percentiles
def __call__(self):
result = GlobalStats()
for tasks in self.test_procedure.schedule:
for task in tasks:
task_name = task.name
op_type = task.operation.type
error_rate = self.error_rate(task_name, op_type)
duration = self.duration(task_name)
if task.operation.include_in_reporting or error_rate > 0:
self.logger.debug("Gathering request metrics for [%s].", task_name)
result.add_op_metrics(
task_name,
task.operation.name,
self.summary_stats("throughput", task_name, op_type, percentiles_list=self.throughput_percentiles),
self.single_latency(task_name, op_type),
self.single_latency(task_name, op_type, metric_name="service_time"),
self.single_latency(task_name, op_type, metric_name="client_processing_time"),
self.single_latency(task_name, op_type, metric_name="processing_time"),
error_rate,
duration,
self.merge(
self.workload.meta_data,
self.test_procedure.meta_data,
task.operation.meta_data,
task.meta_data,
),
)
result.add_correctness_metrics(
task_name,
task.operation.name,
self.single_latency(task_name, op_type, metric_name="recall@k"),
self.single_latency(task_name, op_type, metric_name="recall@1"),
error_rate,
duration
)
profile_metrics = task.operation.params.get("profile-metrics", None)
if profile_metrics:
profile_metrics.append("query_time")
result.add_profile_metrics(
task_name,
task.operation.name,
{name: self.single_latency(task_name, op_type, metric_name=name) for name in profile_metrics}
)
self.logger.debug("Gathering indexing metrics.")
result.total_time = self.sum("indexing_total_time")
result.total_time_per_shard = self.shard_stats("indexing_total_time")
result.indexing_throttle_time = self.sum("indexing_throttle_time")
result.indexing_throttle_time_per_shard = self.shard_stats("indexing_throttle_time")
result.merge_time = self.sum("merges_total_time")
result.merge_time_per_shard = self.shard_stats("merges_total_time")
result.merge_count = self.sum("merges_total_count")
result.refresh_time = self.sum("refresh_total_time")
result.refresh_time_per_shard = self.shard_stats("refresh_total_time")
result.refresh_count = self.sum("refresh_total_count")
result.flush_time = self.sum("flush_total_time")
result.flush_time_per_shard = self.shard_stats("flush_total_time")
result.flush_count = self.sum("flush_total_count")
result.merge_throttle_time = self.sum("merges_total_throttled_time")
result.merge_throttle_time_per_shard = self.shard_stats("merges_total_throttled_time")
self.logger.debug("Gathering ML max processing times.")
result.ml_processing_time = self.ml_processing_time_stats()
self.logger.debug("Gathering garbage collection metrics.")
result.young_gc_time = self.sum("node_total_young_gen_gc_time")
result.young_gc_count = self.sum("node_total_young_gen_gc_count")
result.old_gc_time = self.sum("node_total_old_gen_gc_time")
result.old_gc_count = self.sum("node_total_old_gen_gc_count")
self.logger.debug("Gathering segment memory metrics.")
result.memory_segments = self.median("segments_memory_in_bytes")
result.memory_doc_values = self.median("segments_doc_values_memory_in_bytes")
result.memory_terms = self.median("segments_terms_memory_in_bytes")
result.memory_norms = self.median("segments_norms_memory_in_bytes")
result.memory_points = self.median("segments_points_memory_in_bytes")
result.memory_stored_fields = self.median("segments_stored_fields_memory_in_bytes")
result.store_size = self.sum("store_size_in_bytes")
result.translog_size = self.sum("translog_size_in_bytes")
# convert to int, fraction counts are senseless
median_segment_count = self.median("segments_count")
result.segment_count = int(median_segment_count) if median_segment_count is not None else median_segment_count
self.logger.debug("Gathering transform processing times.")
result.total_transform_processing_times = self.total_transform_metric("total_transform_processing_time")
result.total_transform_index_times = self.total_transform_metric("total_transform_index_time")
result.total_transform_search_times = self.total_transform_metric("total_transform_search_time")
result.total_transform_throughput = self.total_transform_metric("total_transform_throughput")
return result
def merge(self, *args):
# This is similar to dict(collections.ChainMap(args)) except that we skip `None` in our implementation.
result = {}
for arg in args:
if arg is not None:
result.update(arg)
return result
def sum(self, metric_name):
values = self.store.get(metric_name)
if values:
return sum(values)
else:
return None
def one(self, metric_name):
return self.store.get_one(metric_name)
def summary_stats(self, metric_name, task_name, operation_type, percentiles_list=None):
mean = self.store.get_mean(metric_name, task=task_name, operation_type=operation_type, sample_type=SampleType.Normal)
median = self.store.get_median(metric_name, task=task_name, operation_type=operation_type, sample_type=SampleType.Normal)
unit = self.store.get_unit(metric_name, task=task_name, operation_type=operation_type)
stats = self.store.get_stats(metric_name, task=task_name, operation_type=operation_type, sample_type=SampleType.Normal)
result = {}
if mean and median and stats:
result = {
"min": stats["min"],
"mean": mean,
"median": median,
"max": stats["max"],
"unit": unit
}
else:
result = {
"min": None,
"mean": None,
"median": None,
"max": None,
"unit": unit
}
if percentiles_list: # modified from single_latency()
sample_size = stats["count"]
percentiles = self.store.get_percentiles(metric_name,
task=task_name,
operation_type=operation_type,
sample_type=SampleType.Normal,
percentiles=percentiles_for_sample_size(
sample_size,
percentiles_list=percentiles_list))
for k, v in percentiles.items():
# safely encode so we don't have any dots in field names
result[encode_float_key(k)] = v
return result
def shard_stats(self, metric_name):
values = self.store.get_raw(metric_name, mapper=lambda doc: doc["per-shard"])
unit = self.store.get_unit(metric_name)
if values:
flat_values = [w for v in values for w in v]
return {
"min": min(flat_values),
"median": statistics.median(flat_values),
"max": max(flat_values),
"unit": unit
}
else:
return {}
def ml_processing_time_stats(self):
values = self.store.get_raw("ml_processing_time")
result = []
if values:
for v in values:
result.append({
"job": v["job"],
"min": v["min"],
"mean": v["mean"],
"median": v["median"],
"max": v["max"],
"unit": v["unit"]
})
return result
def total_transform_metric(self, metric_name):
values = self.store.get_raw(metric_name)
result = []
if values:
for v in values:
transform_id = v.get("meta", {}).get("transform_id")
if transform_id is not None:
result.append({
"id": transform_id,
"mean": v["value"],
"unit": v["unit"]
})
return result
def error_rate(self, task_name, operation_type):
return self.store.get_error_rate(task=task_name, operation_type=operation_type, sample_type=SampleType.Normal)
def duration(self, task_name):
return self.store.get_one("service_time", task=task_name, mapper=lambda doc: doc["relative-time-ms"],
sort_key="relative-time-ms", sort_reverse=True)
def median(self, metric_name, task_name=None, operation_type=None, sample_type=None):
return self.store.get_median(metric_name, task=task_name, operation_type=operation_type, sample_type=sample_type)
def single_latency(self, task, operation_type, metric_name="latency"):
sample_type = SampleType.Normal
stats = self.store.get_stats(metric_name, task=task, operation_type=operation_type, sample_type=sample_type)
sample_size = stats["count"] if stats else 0
percentiles_list = self.OTHER_PERCENTILES
if metric_name == "latency":
percentiles_list = self.latency_percentiles
if sample_size > 0:
# The custom latency percentiles have to be supplied here as the workload runs,
# or else they aren't present when results are published
percentiles = self.store.get_percentiles(metric_name,
task=task,
operation_type=operation_type,
sample_type=sample_type,
percentiles=percentiles_for_sample_size(
sample_size,
percentiles_list=percentiles_list
))
mean = self.store.get_mean(metric_name,
task=task,
operation_type=operation_type,
sample_type=sample_type)
unit = self.store.get_unit(metric_name, task=task, operation_type=operation_type)
stats = collections.OrderedDict()
for k, v in percentiles.items():
# safely encode so we don't have any dots in field names
stats[encode_float_key(k)] = v
stats["mean"] = mean
stats["unit"] = unit
return stats
else:
return {}
class GlobalStats:
def __init__(self, d=None):
self.op_metrics = self.v(d, "op_metrics", default=[])
self.correctness_metrics = self.v(d, "correctness_metrics", default=[])
self.profile_metrics = self.v(d, "profile_metrics", default=[])
self.total_time = self.v(d, "total_time")
self.total_time_per_shard = self.v(d, "total_time_per_shard", default={})
self.indexing_throttle_time = self.v(d, "indexing_throttle_time")
self.indexing_throttle_time_per_shard = self.v(d, "indexing_throttle_time_per_shard", default={})
self.merge_time = self.v(d, "merge_time")
self.merge_time_per_shard = self.v(d, "merge_time_per_shard", default={})
self.merge_count = self.v(d, "merge_count")
self.refresh_time = self.v(d, "refresh_time")
self.refresh_time_per_shard = self.v(d, "refresh_time_per_shard", default={})
self.refresh_count = self.v(d, "refresh_count")
self.flush_time = self.v(d, "flush_time")
self.flush_time_per_shard = self.v(d, "flush_time_per_shard", default={})
self.flush_count = self.v(d, "flush_count")
self.merge_throttle_time = self.v(d, "merge_throttle_time")
self.merge_throttle_time_per_shard = self.v(d, "merge_throttle_time_per_shard", default={})
self.ml_processing_time = self.v(d, "ml_processing_time", default=[])
self.young_gc_time = self.v(d, "young_gc_time")
self.young_gc_count = self.v(d, "young_gc_count")
self.old_gc_time = self.v(d, "old_gc_time")
self.old_gc_count = self.v(d, "old_gc_count")
self.memory_segments = self.v(d, "memory_segments")
self.memory_doc_values = self.v(d, "memory_doc_values")
self.memory_terms = self.v(d, "memory_terms")
self.memory_norms = self.v(d, "memory_norms")
self.memory_points = self.v(d, "memory_points")
self.memory_stored_fields = self.v(d, "memory_stored_fields")
self.store_size = self.v(d, "store_size")
self.translog_size = self.v(d, "translog_size")
self.segment_count = self.v(d, "segment_count")
self.total_transform_search_times = self.v(d, "total_transform_search_times")
self.total_transform_index_times = self.v(d, "total_transform_index_times")
self.total_transform_processing_times = self.v(d, "total_transform_processing_times")
self.total_transform_throughput = self.v(d, "total_transform_throughput")
def as_dict(self):
return self.__dict__
def as_flat_list(self):
def op_metrics(op_item, key, single_value=False):
doc = {
"task": op_item["task"],
"operation": op_item["operation"],
"name": key
}
if single_value:
doc["value"] = {"single": op_item[key]}
else:
doc["value"] = op_item[key]
if "meta" in op_item:
doc["meta"] = op_item["meta"]
return doc
all_results = []
for metric, value in self.as_dict().items():
if metric == "op_metrics":
for item in value:
if "throughput" in item:
all_results.append(op_metrics(item, "throughput"))
if "latency" in item:
all_results.append(op_metrics(item, "latency"))
if "service_time" in item:
all_results.append(op_metrics(item, "service_time"))
if "client_processing_time" in item:
all_results.append(op_metrics(item, "client_processing_time"))
if "processing_time" in item:
all_results.append(op_metrics(item, "processing_time"))
if "error_rate" in item:
all_results.append(op_metrics(item, "error_rate", single_value=True))
if "duration" in item:
all_results.append(op_metrics(item, "duration", single_value=True))
elif metric == "ml_processing_time":
for item in value:
all_results.append({
"job": item["job"],
"name": "ml_processing_time",
"value": {
"min": item["min"],
"mean": item["mean"],
"median": item["median"],
"max": item["max"]
}
})
elif metric == "correctness_metrics":
for item in value:
for knn_metric in ["recall@k", "recall@1"]:
if knn_metric in item:
all_results.append({
"task": item["task"],
"operation": item["operation"],
"name": knn_metric,
"value": item[knn_metric]
})
elif metric == "profile_metrics":
for item in value:
for metric_name in item.keys():
if metric_name not in ["task", "operation", "error_rate", "duration"]:
all_results.append({
"task": item["task"],
"operation": item["operation"],
"name": metric_name,
"value": item[metric_name]
})
elif metric.startswith("total_transform_") and value is not None:
for item in value:
all_results.append({
"id": item["id"],
"name": metric,
"value": {
"single": item["mean"]
}
})
elif metric.endswith("_time_per_shard"):
if value:
all_results.append({"name": metric, "value": value})
elif value is not None:
result = {
"name": metric,
"value": {
"single": value
}
}
all_results.append(result)
# sorting is just necessary to have a stable order for tests. As we just have a small number of metrics, the overhead is neglible.
return sorted(all_results, key=lambda m: m["name"])
def v(self, d, k, default=None):
return d.get(k, default) if d else default
def add_op_metrics(self, task, operation, throughput, latency, service_time, client_processing_time,
processing_time, error_rate, duration, meta):
doc = {
"task": task,
"operation": operation,
"throughput": throughput,
"latency": latency,
"service_time": service_time,
"client_processing_time": client_processing_time,
"processing_time": processing_time,
"error_rate": error_rate,
"duration": duration
}
if meta:
doc["meta"] = meta
self.op_metrics.append(doc)
def add_correctness_metrics(self, task, operation, recall_at_k_stats, recall_at_1_stats, error_rate, duration):
self.correctness_metrics.append({
"task": task,
"operation": operation,
"recall@k": recall_at_k_stats,
"recall@1":recall_at_1_stats,
"error_rate": error_rate,
"duration": duration,
})
def add_profile_metrics(self, task, operation, profile_metrics):
self.profile_metrics.append({
"task": task,
"operation": operation,
"metrics": profile_metrics
})
def tasks(self):
# ensure we can read test_run.json files before Solr Orbit 0.8.0
return [v.get("task", v["operation"]) for v in self.op_metrics]
def metrics(self, task):
# ensure we can read test_run.json files before Solr Orbit 0.8.0
for r in self.op_metrics:
if r.get("task", r["operation"]) == task:
return r
return None
class SystemStatsCalculator:
def __init__(self, store, node_name):
self.store = store
self.logger = logging.getLogger(__name__)
self.node_name = node_name
def __call__(self):
result = SystemStats()
self.logger.debug("Calculating system metrics for [%s]", self.node_name)
self.logger.debug("Gathering disk metrics.")
self.add(result, "final_index_size_bytes", "index_size")
self.add(result, "disk_io_write_bytes", "bytes_written")
self.logger.debug("Gathering node startup time metrics.")
self.add(result, "node_startup_time", "startup_time")
return result
def add(self, result, raw_metric_key, summary_metric_key):
metric_value = self.store.get_one(raw_metric_key, node_name=self.node_name)
metric_unit = self.store.get_unit(raw_metric_key, node_name=self.node_name)
if metric_value:
self.logger.debug("Adding record for [%s] with value [%s].", raw_metric_key, str(metric_value))
result.add_node_metrics(self.node_name, summary_metric_key, metric_value, metric_unit)
else:
self.logger.debug("Skipping incomplete [%s] record.", raw_metric_key)
class SystemStats:
def __init__(self, d=None):
self.node_metrics = self.v(d, "node_metrics", default=[])
def v(self, d, k, default=None):
return d.get(k, default) if d else default
def add_node_metrics(self, node, name, value, unit):
metric = {
"node": node,
"name": name,
"value": value
}
if unit:
metric["unit"] = unit
self.node_metrics.append(metric)
def as_flat_list(self):
all_results = []
for v in self.node_metrics:
all_results.append({"node": v["node"], "name": v["name"], "value": {"single": v["value"]}})
# Sort for a stable order in tests.
return sorted(all_results, key=lambda m: m["name"])