blob: bb0c90e89996d51fb7d59ddd3fcbc4881a7fe2e7 [file]
# SPDX-License-Identifier: Apache-2.0
#
# Modifications by Apache Solr contributors; see git log for details.
# Licensed under the Apache License, Version 2.0.
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
import logging
import os
import re
import threading
from abc import abstractmethod
import tabulate
from solrorbit import metrics, time, exceptions
from solrorbit.utils import io, sysstats, console, process
def list_telemetry():
    """Print the catalogue of telemetry devices to the console.

    Devices are shown in three tables (always-on Solr devices, optional REST
    devices, optional JVM/process devices), each followed by short usage notes.
    """

    def print_table(device_classes):
        # One row per device class: CLI command, display name and help text.
        rows = [[cls.command, cls.human_name, cls.help] for cls in device_classes]
        console.println(tabulate.tabulate(rows, ["Command", "Name", "Description"]))

    console.println("Available telemetry devices:\n")

    # --- Solr-native devices (always enabled) ---
    console.println("Always-enabled Solr devices (no --telemetry flag needed):\n")
    print_table([SolrJvmStats, SolrNodeStats, SolrCollectionStats,
                 SolrQueryStats, SolrIndexingStats, SolrCacheStats])
    console.println("\nAll always-on devices poll /solr/admin/metrics (JSON on Solr 9.x, Prometheus text on Solr 10.x).")

    # --- Optional REST devices (all pipelines) ---
    console.println("\n\nOptional REST devices (all pipelines — enable with --telemetry <command>):\n")
    print_table([SegmentStats, ShardStats, ClusterEnvironmentInfo])

    # --- Optional JVM/process devices (provisioned pipelines only) ---
    console.println("\n\nOptional JVM/process devices (docker or from-distribution pipelines only):\n")
    print_table([FlightRecorder, Gc, JitCompiler, Heapdump])

    for note in (
        "\nJVM/process devices inject flags into SOLR_OPTS before Solr starts.",
        "They are silently skipped when pipeline is benchmark-only.",
        "\nNote: disk-io (disk I/O byte counters) is always active on provisioned pipelines.",
    ):
        console.println(note)
class Telemetry:
    """Dispatch lifecycle callbacks to every enabled telemetry device.

    A device is enabled when its ``internal`` attribute is truthy or when its
    ``command`` appears in ``enabled_devices``. Devices are visited in list order.
    """

    def __init__(self, enabled_devices=None, devices=None):
        self.enabled_devices = enabled_devices if enabled_devices is not None else []
        self.devices = devices if devices is not None else []

    def _active_devices(self):
        """Lazily yield enabled devices; each one is checked right before it is used."""
        return (device for device in self.devices if self._enabled(device))

    def instrument_candidate_java_opts(self):
        """Concatenate the JVM options of all enabled devices.

        Options are appended in device order; no merging or de-duplication takes place.
        """
        collected = []
        for device in self._active_devices():
            collected.extend(device.instrument_java_opts())
        return collected

    def on_pre_node_start(self, node_name):
        for device in self._active_devices():
            device.on_pre_node_start(node_name)

    def attach_to_node(self, node):
        for device in self._active_devices():
            device.attach_to_node(node)

    def detach_from_node(self, node, running):
        for device in self._active_devices():
            device.detach_from_node(node, running)

    def on_benchmark_start(self):
        for device in self._active_devices():
            device.on_benchmark_start()

    def on_benchmark_stop(self):
        for device in self._active_devices():
            device.on_benchmark_stop()

    def store_system_metrics(self, node, metrics_store):
        for device in self._active_devices():
            device.store_system_metrics(node, metrics_store)

    def _enabled(self, device):
        # internal devices are always on; the rest must be requested by command name
        return device.internal or (device.command in self.enabled_devices)
########################################################################################
#
# Telemetry devices
#
########################################################################################
class TelemetryDevice:
    """Base class for telemetry devices: every lifecycle hook is a no-op by default.

    Subclasses override only the hooks they need. ``Telemetry`` reads the class
    attributes ``internal`` and ``command`` to decide whether a device is enabled.
    Instances are picklable; the logger is excluded from the pickled state and
    re-created on unpickling.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def instrument_java_opts(self):
        """Return extra JVM options for the node; none by default.

        Returns a list to match what subclasses return and what callers ``extend``.
        """
        return []

    def on_pre_node_start(self, node_name):
        pass

    def attach_to_node(self, node):
        pass

    def detach_from_node(self, node, running):
        pass

    def on_benchmark_start(self):
        pass

    def on_benchmark_stop(self):
        pass

    def store_system_metrics(self, node, metrics_store):
        pass

    def __getstate__(self):
        state = self.__dict__.copy()
        # tolerate instances whose logger was never set or was removed
        state.pop("logger", None)
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self.logger = logging.getLogger(__name__)
class InternalTelemetryDevice(TelemetryDevice):
    """Telemetry device that is always enabled, regardless of the requested ``--telemetry`` devices."""

    internal: bool = True
class SamplerThread(threading.Thread):
    """Background thread calling ``recorder.record()`` every ``recorder.sample_interval`` seconds.

    Sampling ends when ``finish()`` is called, or when ``record()`` raises; in the
    latter case the exception is logged and the thread exits.
    """

    def __init__(self, recorder):
        threading.Thread.__init__(self)
        self.recorder = recorder
        # An Event (rather than a flag plus sleep) lets finish() interrupt the wait
        # immediately instead of blocking for up to one full sample interval.
        self._stop_requested = threading.Event()

    @property
    def stop(self):
        """True once a stop has been requested (kept for backwards compatibility)."""
        return self._stop_requested.is_set()

    @stop.setter
    def stop(self, value):
        if value:
            self._stop_requested.set()
        else:
            self._stop_requested.clear()

    def finish(self):
        """Request the sampler to stop and wait for the thread to exit."""
        self._stop_requested.set()
        self.join()

    def run(self):
        # noinspection PyBroadException
        try:
            while not self._stop_requested.is_set():
                self.recorder.record()
                self._stop_requested.wait(self.recorder.sample_interval)
        except BaseException:
            logging.getLogger(__name__).exception("Could not determine %s", self.recorder)
class FlightRecorder(TelemetryDevice):
    """Adds Java Flight Recorder options to the JVM command line of provisioned Solr nodes."""

    internal = False
    command = "jfr"
    human_name = "Flight Recorder"
    help = "Enables Java Flight Recorder (requires OpenJDK 11+); injected into SOLR_OPTS."

    def __init__(self, telemetry_params, log_root, java_major_version):
        super().__init__()
        self.telemetry_params = telemetry_params
        self.log_root = log_root
        # stored on the instance; not consulted by this device
        self.java_major_version = java_major_version

    def instrument_java_opts(self):
        """Return the JFR JVM options, or an empty list for benchmark-only pipelines.

        Creates ``log_root`` so that ``profile.jfr`` can be written into it.
        """
        benchmark_only = self.telemetry_params.get("pipeline", "") == "benchmark-only"
        if benchmark_only:
            self.logger.warning("jfr: Solr was not provisioned by Solr Orbit; skipping JFR flags.")
            return []
        io.ensure_dir(self.log_root)
        recording_file = os.path.join(self.log_root, "profile.jfr")
        console.info("%s: Writing flight recording to [%s]" % (self.human_name, recording_file), logger=self.logger)
        opts = self.java_opts(recording_file)
        self.logger.info("jfr: Adding JVM arguments: [%s].", opts)
        return opts

    def java_opts(self, log_file):
        """Build JVM options that start an on-disk flight recording, dumped on exit to ``log_file``.

        When the ``recording-template`` telemetry parameter is set, it is passed as the
        JFR ``settings`` value; otherwise the JVM's default template applies.
        """
        template = self.telemetry_params.get("recording-template")
        start_recording = (
            "-XX:StartFlightRecording=maxsize=0,maxage=0s,disk=true,dumponexit=true,filename={}".format(log_file)
        )
        if template:
            self.logger.info("jfr: Using recording template [%s].", template)
            start_recording = "{},settings={}".format(start_recording, template)
        else:
            self.logger.info("jfr: Using default recording template.")
        return ["-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints", start_recording]
class JitCompiler(TelemetryDevice):
    """Adds JIT-compilation logging options to the JVM command line of provisioned Solr nodes."""

    internal = False
    command = "jit"
    human_name = "JIT Compiler Profiler"
    help = "Enables JIT compiler logs; injected into SOLR_OPTS."

    def __init__(self, log_root, telemetry_params=None):
        super().__init__()
        self.log_root = log_root
        self.telemetry_params = telemetry_params or {}

    def instrument_java_opts(self):
        """Return JVM options writing the JIT compilation log to ``<log_root>/jit.log``.

        Returns an empty list (and logs a warning) for benchmark-only pipelines.
        """
        if self.telemetry_params.get("pipeline", "") != "benchmark-only":
            io.ensure_dir(self.log_root)
            jit_log = os.path.join(self.log_root, "jit.log")
            console.info("%s: Writing JIT compiler log to [%s]" % (self.human_name, jit_log), logger=self.logger)
            # NOTE(review): -XX:+PrintAssembly typically needs the hsdis disassembler plugin
            # on the JVM's library path to emit assembly — confirm in the target environment.
            return [
                "-XX:+UnlockDiagnosticVMOptions",
                "-XX:+TraceClassLoading",
                "-XX:+LogCompilation",
                "-XX:LogFile={}".format(jit_log),
                "-XX:+PrintAssembly",
            ]
        self.logger.warning("jit: Solr was not provisioned by Solr Orbit; skipping JIT flags.")
        return []
class Gc(TelemetryDevice):
    """Adds unified GC logging (``-Xlog``) to the JVM command line of provisioned Solr nodes."""

    internal = False
    command = "gc"
    human_name = "GC log"
    help = "Enables GC logs (Java 9+ -Xlog: format); injected into SOLR_OPTS."

    def __init__(self, telemetry_params, log_root, java_major_version):
        super().__init__()
        self.telemetry_params = telemetry_params
        self.log_root = log_root
        # stored on the instance; not consulted by this device
        self.java_major_version = java_major_version

    def instrument_java_opts(self):
        """Return a single ``-Xlog`` option that writes GC logs to ``<log_root>/gc.log``.

        Log selectors default to ``gc*=info,safepoint=info,age*=trace`` and can be
        overridden with the ``gc-log-config`` telemetry parameter. Returns an empty
        list for benchmark-only pipelines.
        """
        params = self.telemetry_params
        if params.get("pipeline", "") == "benchmark-only":
            self.logger.warning("gc: Solr was not provisioned by Solr Orbit; skipping GC flags.")
            return []
        io.ensure_dir(self.log_root)
        gc_log = os.path.join(self.log_root, "gc.log")
        console.info("%s: Writing GC log to [%s]" % (self.human_name, gc_log), logger=self.logger)
        selectors = params.get("gc-log-config", "gc*=info,safepoint=info,age*=trace")
        # see https://docs.oracle.com/javase/9/tools/java.htm#JSWOR-GUID-BE93ABDC-999C-4CB5-A88B-1994AAAC74D5
        # (per the -Xlog documentation, filecount=0 turns off log rotation)
        return [f"-Xlog:{selectors}:file={gc_log}:utctime,uptimemillis,level,tags:filecount=0"]
class Heapdump(TelemetryDevice):
    """Writes a heap dump of the Solr JVM with ``jmap`` when a still-running node is detached."""

    internal = False
    command = "heapdump"
    human_name = "Heap Dump"
    help = "Captures a heap dump from the Solr JVM on benchmark stop."

    def __init__(self, log_root, docker_container=None):
        super().__init__()
        self.log_root = log_root
        # when set, jmap is executed inside this container via ``docker exec``
        self.docker_container = docker_container

    def detach_from_node(self, node, running):
        """Dump the heap of ``node.pid`` to ``<log_root>/heap_at_exit_<pid>.hprof`` if the node runs.

        Failures of the dump command are logged rather than raised.
        """
        if not running:
            return
        heap_dump_file = os.path.join(self.log_root, "heap_at_exit_{}.hprof".format(node.pid))
        console.info("{}: Writing heap dump to [{}]".format(self.human_name, heap_dump_file), logger=self.logger)
        # noinspection PyBroadException
        try:
            if self.docker_container:
                # NOTE(review): jmap runs inside the container, so heap_dump_file is a path in the
                # container's filesystem — confirm log_root is mounted at the same path there.
                cmd = "docker exec {} jmap -dump:format=b,file={} {}".format(
                    self.docker_container, heap_dump_file, node.pid)
            else:
                # Make sure the target directory exists before jmap writes into it,
                # as the other file-writing devices in this module do.
                io.ensure_dir(self.log_root)
                cmd = "jmap -dump:format=b,file={} {}".format(heap_dump_file, node.pid)
            # a truthy return value from the helper is treated as failure
            if process.run_subprocess_with_logging(cmd):
                self.logger.warning("Could not write heap dump to [%s]", heap_dump_file)
        except BaseException:
            self.logger.exception("Could not write heap dump to [%s]", heap_dump_file)
class SegmentStats(TelemetryDevice):
    """At benchmark end, writes one JSON line of Luke index statistics per collection to ``segment_stats.log``."""

    internal = False
    command = "segment-stats"
    human_name = "Segment Stats"
    help = "Captures per-collection segment stats (numDocs, deletedDocs, segmentCount, sizeInBytes) via the Solr Luke API."

    # Luke index fields copied into every output row, in output order.
    _LUKE_FIELDS = ("numDocs", "maxDoc", "deletedDocs", "segmentCount", "sizeInBytes")

    def __init__(self, log_root, admin_client):
        super().__init__()
        self.log_root = log_root
        self.admin_client = admin_client

    def on_benchmark_stop(self):
        """Write the stats file; any failure is logged and never propagated."""
        # noinspection PyBroadException
        try:
            collection_names = self.admin_client.list_collections()
            stats_file = os.path.join(self.log_root, "segment_stats.log")
            console.info(f"{self.human_name}: Writing segment stats to [{stats_file}]", logger=self.logger)
            io.ensure_dir(self.log_root)
            with open(stats_file, "wt") as out:
                for collection in collection_names:
                    self._write_collection_row(out, collection)
        except BaseException:
            self.logger.exception("Could not retrieve segment stats.")

    def _write_collection_row(self, out, collection):
        """Append the Luke stats of ``collection`` as one JSON line; log a warning and skip on failure."""
        # noinspection PyBroadException
        try:
            luke_index = self.admin_client.get_luke_stats(collection)
            row = {"collection": collection}
            for field in self._LUKE_FIELDS:
                row[field] = luke_index.get(field)
            out.write(json.dumps(row) + "\n")
        except BaseException:
            self.logger.warning("Could not retrieve Luke stats for collection [%s].", collection)
class ShardStats(TelemetryDevice):
    """
    Collects per-shard document count and index size for SolrCloud clusters.
    Skipped silently on standalone Solr (no cluster.collections in CLUSTERSTATUS).
    """

    internal = False
    command = "shard-stats"
    human_name = "Shard Stats"
    help = "Regularly samples per-shard document count and index size (SolrCloud only)."

    def __init__(self, telemetry_params, admin_client, metrics_store):
        """
        :param telemetry_params: May optionally specify
            ``shard-stats-sample-interval``: positive integer, seconds between polls. Default: 60.
        :param admin_client: A SolrClient instance used for admin API calls.
        :param metrics_store: The configured metrics store we write to.
        :raises exceptions.SystemSetupError: if the sample interval is not greater than zero.
        """
        super().__init__()
        self.admin_client = admin_client
        self.metrics_store = metrics_store
        interval = telemetry_params.get("shard-stats-sample-interval", 60)
        self.sample_interval = interval
        if interval <= 0:
            raise exceptions.SystemSetupError(
                f"The telemetry parameter 'shard-stats-sample-interval' must be greater than zero but was {interval}."
            )
        self.samplers = []

    def on_benchmark_start(self):
        """Start a daemon sampler thread if CLUSTERSTATUS lists collections; otherwise do nothing."""
        # noinspection PyBroadException
        try:
            data = self.admin_client.get_clusterstatus()
        except BaseException:
            self.logger.exception("ShardStats: could not retrieve CLUSTERSTATUS; device will not run.")
            return
        is_cloud = "cluster" in data and "collections" in data.get("cluster", {})
        if not is_cloud:
            self.logger.info("ShardStats: no cluster.collections in CLUSTERSTATUS — skipping (standalone Solr).")
            return
        sampler = SamplerThread(ShardStatsRecorder(self.admin_client, self.metrics_store, self.sample_interval))
        sampler.daemon = True
        self.samplers.append(sampler)
        sampler.start()

    def on_benchmark_stop(self):
        """Stop every sampler started by this device and wait for each to exit."""
        for running_sampler in self.samplers:
            running_sampler.finish()
class ShardStatsRecorder:
    """
    Polls CLUSTERSTATUS and Core STATUS for each shard leader; pushes metrics per shard.
    """

    def __init__(self, admin_client, metrics_store, sample_interval):
        self.admin_client = admin_client
        self.metrics_store = metrics_store
        # read by SamplerThread to pace successive record() calls
        self.sample_interval = sample_interval
        self.logger = logging.getLogger(__name__)

    def __str__(self):
        return "shard stats"

    def record(self):
        """Publish ``shard_<name>_num_docs`` and ``shard_<name>_size_bytes`` for every shard.

        Shard names such as ``shard1`` repeat across collections, so each value also
        carries ``collection`` and ``shard`` meta data to keep the series apart.
        Retrieval errors are logged rather than raised.
        """
        # noinspection PyBroadException
        try:
            data = self.admin_client.get_clusterstatus()
            collections = data.get("cluster", {}).get("collections", {})
        except BaseException:
            self.logger.exception("ShardStats: could not retrieve CLUSTERSTATUS.")
            return
        for coll_name, coll_data in collections.items():
            for shard_name, shard_data in coll_data.get("shards", {}).items():
                core_name = self._leader_core(shard_data)
                if core_name:
                    self._record_shard(coll_name, shard_name, core_name)

    @staticmethod
    def _leader_core(shard_data):
        """Return the core name of the first active leader replica that has one, else None."""
        for replica in shard_data.get("replicas", {}).values():
            # CLUSTERSTATUS reports the leader flag as the string "true"
            if replica.get("state") == "active" and replica.get("leader") == "true":
                core_name = replica.get("core")
                if core_name:
                    return core_name
        return None

    def _record_shard(self, coll_name, shard_name, core_name):
        """Fetch Core STATUS for ``core_name`` and publish its doc count and index size."""
        meta = {"collection": coll_name, "shard": shard_name}
        # noinspection PyBroadException
        try:
            core_status = self.admin_client.get_core_status(core_name)
            idx = core_status.get("index", {})
            self.metrics_store.put_value_cluster_level(
                f"shard_{shard_name}_num_docs", idx.get("numDocs", 0), "", meta_data=dict(meta))
            self.metrics_store.put_value_cluster_level(
                f"shard_{shard_name}_size_bytes", idx.get("sizeInBytes", 0), "byte", meta_data=dict(meta))
        except BaseException:
            self.logger.warning("ShardStats: could not get core STATUS for [%s].", core_name)
class StartupTime(InternalTelemetryDevice):
    """Always-on device that measures how long a node takes to start.

    The stopwatch starts in ``on_pre_node_start`` and stops in ``attach_to_node``.
    Its total is published as the node-level ``node_startup_time`` metric, in seconds.
    """

    def __init__(self, stopwatch=time.StopWatch):
        super().__init__()
        # ``stopwatch`` is a zero-argument factory; solrorbit's time.StopWatch by default
        self.timer = stopwatch()

    def on_pre_node_start(self, node_name):
        self.timer.start()

    def attach_to_node(self, node):
        self.timer.stop()

    def store_system_metrics(self, node, metrics_store):
        target_node = node.node_name
        elapsed = self.timer.total_time()
        metrics_store.put_value_node_level(target_node, "node_startup_time", elapsed, "s")
class DiskIo(InternalTelemetryDevice):
    """
    Gathers disk I/O stats.

    A baseline of bytes read/written is taken in ``attach_to_node``. The delta is
    computed in ``detach_from_node`` while the node is still running, and published
    by ``store_system_metrics`` as ``disk_io_read_bytes`` / ``disk_io_write_bytes``.
    Process-level counters are preferred. Host-wide disk counters are the fallback
    and are divided evenly between the nodes on the host. The baseline and end
    values always come from the same source.
    """

    def __init__(self, node_count_on_host):
        super().__init__()
        self.node_count_on_host = node_count_on_host
        self.read_bytes = None
        self.write_bytes = None
        # Source of the baseline: True = process counters, False = disk counters,
        # None = no usable baseline.
        self.process_based = None

    def _process_counters(self, node):
        """Return the process I/O counters for ``node.pid``, or None if unavailable or not permitted."""
        # noinspection PyBroadException
        try:
            os_process = sysstats.setup_process_stats(node.pid)
            return sysstats.process_io_counters(os_process)
        except BaseException:
            # psutil might raise AccessDenied (derived from Exception); don't let that abort attaching.
            self.logger.exception("Could not determine process I/O counters for pid [%s].", node.pid)
            return None

    def attach_to_node(self, node):
        self.read_bytes = None
        self.write_bytes = None
        self.process_based = None
        process_start = self._process_counters(node)
        if process_start:
            self.read_bytes = process_start.read_bytes
            self.write_bytes = process_start.write_bytes
            self.process_based = True
            self.logger.info("Using more accurate process-based I/O counters.")
            return
        # noinspection PyBroadException
        try:
            disk_start = sysstats.disk_io_counters()
            self.read_bytes = disk_start.read_bytes
            self.write_bytes = disk_start.write_bytes
            self.process_based = False
            self.logger.warning("Process I/O counters are not supported on this platform. Falling back to less "
                                "accurate disk I/O counters.")
        except BaseException:
            self.logger.exception("Could not determine I/O stats at benchmark start.")

    def detach_from_node(self, node, running):
        if not running:
            return
        if self.process_based is None:
            # attach_to_node could not record a complete baseline; a delta would be meaningless
            self.logger.warning("No I/O baseline was recorded for [%s]; not reporting disk I/O.", node.node_name)
            self.read_bytes = None
            self.write_bytes = None
            return
        # Be aware the semantics of write counts etc. are different for disk and process statistics.
        # Thus we're conservative and only publish I/O bytes now.
        # noinspection PyBroadException
        try:
            if self.process_based:
                # we have process-based disk counters, no need to worry how many nodes are on this host
                process_end = self._process_counters(node)
                if not process_end:
                    # mixing a process baseline with disk counters would yield bogus numbers
                    raise RuntimeError("process I/O counters are unavailable at benchmark end")
                self.read_bytes = process_end.read_bytes - self.read_bytes
                self.write_bytes = process_end.write_bytes - self.write_bytes
            else:
                disk_end = sysstats.disk_io_counters()
                if self.node_count_on_host > 1:
                    self.logger.info("There are [%d] nodes on this host and Solr Orbit fell back to disk I/O counters. "
                                     "Attributing [1/%d] of total I/O to [%s].",
                                     self.node_count_on_host, self.node_count_on_host, node.node_name)
                self.read_bytes = (disk_end.read_bytes - self.read_bytes) // self.node_count_on_host
                self.write_bytes = (disk_end.write_bytes - self.write_bytes) // self.node_count_on_host
        # Catching RuntimeException is not sufficient: psutil might raise AccessDenied (derived from Exception)
        except BaseException:
            self.logger.exception("Could not determine I/O stats at benchmark end.")
            # reset all counters so we don't attempt to write inconsistent numbers to the metrics store later on
            self.read_bytes = None
            self.write_bytes = None

    def store_system_metrics(self, node, metrics_store):
        if self.write_bytes is not None:
            metrics_store.put_value_node_level(node.node_name, "disk_io_write_bytes", self.write_bytes, "byte")
        if self.read_bytes is not None:
            metrics_store.put_value_node_level(node.node_name, "disk_io_read_bytes", self.read_bytes, "byte")
def store_node_attribute_metadata(metrics_store, nodes_info):
    """Record node attributes as node-level meta info.

    Every attribute whose value is identical on all nodes that define it is also
    promoted to cluster level. Each meta key is ``attribute_<name>``.

    :param metrics_store: store providing ``add_meta_info(scope, node_name, key, value)``.
    :param nodes_info: iterable of node dicts with a ``name`` and an optional ``attributes`` mapping.
    """
    distinct_values = {}
    for node in nodes_info:
        if "attributes" not in node:
            continue
        for attr_name, attr_value in node["attributes"].items():
            meta_key = "attribute_" + str(attr_name)
            metrics_store.add_meta_info(metrics.MetaInfoScope.node, node["name"], meta_key, attr_value)
            distinct_values.setdefault(meta_key, set()).add(attr_value)
    for meta_key, values in distinct_values.items():
        if len(values) != 1:
            continue
        (shared_value,) = values
        metrics_store.add_meta_info(metrics.MetaInfoScope.cluster, None, meta_key, shared_value)
def store_plugin_metadata(metrics_store, nodes_info):
    """Record each node's plugin names as node-level meta info.

    The plugin list is also promoted to cluster level when every node has the same
    (non-empty) set of plugins. Order is ignored for the comparison.

    :param metrics_store: store providing ``add_meta_info(scope, node_name, key, value)``.
    :param nodes_info: iterable of node dicts with a ``name`` and an optional ``plugins`` list
        whose entries may carry a ``name``.
    """
    # The first node's plugin list is the baseline, even when it is empty, so a node
    # without plugins followed by nodes with plugins is correctly detected as "not all same".
    baseline = None
    all_same = True
    for node in nodes_info:
        plugins = [p["name"] for p in extract_value(node, ["plugins"], fallback=[]) if "name" in p]
        if baseline is None:
            baseline = plugins.copy()
        elif set(baseline) != set(plugins):
            # order does not matter so we do a set comparison
            all_same = False
        if plugins:
            metrics_store.add_meta_info(metrics.MetaInfoScope.node, node["name"], "plugins", plugins)
    if all_same and baseline:
        metrics_store.add_meta_info(metrics.MetaInfoScope.cluster, None, "plugins", baseline)
def extract_value(node, path, fallback="unknown"):
    """Follow ``path`` through nested containers starting at ``node`` and return the value found.

    :param node: root container (typically a dict).
    :param path: sequence of keys / indices applied one after another.
    :param fallback: returned when a step is missing (KeyError, IndexError) or the
        intermediate value cannot be indexed that way (TypeError, e.g. ``None``).
    :return: the value at ``path``, or ``fallback``.
    """
    value = node
    try:
        for k in path:
            value = value[k]
    except (KeyError, IndexError, TypeError):
        return fallback
    return value
class ClusterEnvironmentInfo(TelemetryDevice):
    """
    Gathers static environment information on a cluster level (Solr version, JVM, CPU).
    Called once at benchmark start; stores results as run metadata.
    """

    internal = False
    command = "cluster-environment-info"
    human_name = "Cluster Environment Info"
    help = "Stores Solr version, JVM version, and CPU core count as benchmark metadata."

    def __init__(self, admin_client, metrics_store):
        super().__init__()
        self.admin_client = admin_client
        self.metrics_store = metrics_store

    def on_benchmark_start(self):
        """Store version/CPU meta info from ``/api/node/system`` and the live node count.

        Missing fields are stored as ``"unknown"`` (or ``-1`` for the core count).
        Request failures are logged and never raised.
        """
        # noinspection PyBroadException
        try:
            resp = self.admin_client.raw_request("GET", "/api/node/system")
            resp.raise_for_status()
            data = resp.json()
        except BaseException:
            self.logger.exception("ClusterEnvironmentInfo: could not retrieve /api/node/system")
            return
        lucene = data.get("lucene", {})
        jvm = data.get("jvm", {})
        system = data.get("system", {})
        cluster_meta = (
            ("distribution_version", lucene.get("solr-spec-version", "unknown")),
            ("jvm_version", jvm.get("version", "unknown")),
            ("jvm_vendor", jvm.get("name", "unknown")),
            ("cpu_logical_cores", system.get("availableProcessors", -1)),
        )
        for key, value in cluster_meta:
            self.metrics_store.add_meta_info(metrics.MetaInfoScope.cluster, None, key, value)
        self._store_cluster_node_count()

    def _store_cluster_node_count(self):
        """Store the number of live nodes reported by CLUSTERSTATUS; log a warning on failure."""
        # noinspection PyBroadException
        try:
            cs_resp = self.admin_client.raw_request("GET", "/solr/admin/collections?action=CLUSTERSTATUS&wt=json")
            cs_resp.raise_for_status()
            cluster = cs_resp.json().get("cluster", {})
            # CLUSTERSTATUS lists live nodes under "live_nodes"; "liveNodes" is kept as a
            # fallback for the key this code read previously.
            live_nodes = cluster.get("live_nodes", cluster.get("liveNodes", []))
            self.metrics_store.add_meta_info(metrics.MetaInfoScope.cluster, None, "cluster_node_count", len(live_nodes))
        except BaseException:
            self.logger.warning("ClusterEnvironmentInfo: could not retrieve CLUSTERSTATUS node count.")
def add_metadata_for_node(metrics_store, node_name, host_name):
    """
    Record static host details (OS name/version, CPU cores and model) plus the node
    and host name as node-level meta info for a benchmark-provisioned node.
    """
    # Each probe is called lazily, right before its value is stored, in this order.
    probes = (
        ("os_name", sysstats.os_name),
        ("os_version", sysstats.os_version),
        ("cpu_logical_cores", sysstats.logical_cpu_cores),
        ("cpu_physical_cores", sysstats.physical_cpu_cores),
        ("cpu_model", sysstats.cpu_model),
        ("node_name", lambda: node_name),
        ("host_name", lambda: host_name),
    )
    for meta_key, probe in probes:
        metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, meta_key, probe())
class IndexSize(InternalTelemetryDevice):
    """
    Measures the final size of the index.

    The size is the sum of ``io.get_size`` over all ``data_paths``. It is taken when
    an attached node is detached after it stopped running, and published as the
    node-level ``final_index_size_bytes`` metric.
    """

    def __init__(self, data_paths):
        super().__init__()
        self.data_paths = data_paths
        self.attached = False
        self.index_size_bytes = None

    def attach_to_node(self, node):
        self.attached = True

    def detach_from_node(self, node, running):
        # we need to gather the file size after the node has terminated so we can be sure that it has written all its buffers.
        if running or not self.attached or not self.data_paths:
            return
        self.attached = False
        self.index_size_bytes = sum(io.get_size(data_path) for data_path in self.data_paths)

    def store_system_metrics(self, node, metrics_store):
        # 0 bytes is a real measurement (e.g. an empty index); only skip when nothing was measured.
        if self.index_size_bytes is not None:
            metrics_store.put_value_node_level(node.node_name, "final_index_size_bytes", self.index_size_bytes, "byte")
# ===========================================================================
# Solr telemetry devices
# ===========================================================================
# ---------------------------------------------------------------------------
# Prometheus text format parser (shared with runner.py)
# ---------------------------------------------------------------------------
def _parse_prometheus_text(text: str) -> dict:
"""
Parse Prometheus exposition text format into a flat dict of {metric_name: float}.
Lines starting with '#' are comments/help/type headers and are skipped.
Handles optional labels: metric_name{label="value"} value [timestamp]
When multiple series share the same base metric name (different labels),
values are accumulated (summed).
"""
parsed_metrics = {}
for line in text.splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
parts = line.split()
if len(parts) < 2:
continue
name_part = parts[0]
try:
value = float(parts[1])
except ValueError:
continue
base_name = re.sub(r"\{[^}]*\}", "", name_part)
parsed_metrics[base_name] = parsed_metrics.get(base_name, 0.0) + value
return parsed_metrics
# ---------------------------------------------------------------------------
# Base class
# ---------------------------------------------------------------------------
class SolrTelemetryDevice(TelemetryDevice):
    """
    Abstract base for Solr telemetry polling devices.
    Extends TelemetryDevice so that Solr devices integrate seamlessly with
    the existing Telemetry wrapper. Setting ``internal = True`` means the
    device is always enabled (not filtered by the ``--telemetry`` flag).
    Subclasses implement `_collect()` which is called periodically on a
    background thread between `on_benchmark_start()` and `on_benchmark_stop()`.

    :param admin_client: client whose ``get_node_metrics()`` feeds the dual-format helpers.
    :param metrics_store: store with ``put_value_cluster_level``; any mutable mapping is also
        accepted, in which case ``_put`` stores ``{"value", "unit"}`` entries by metric name.
    :param sample_interval_s: seconds to wait between two ``_collect()`` calls.
    """

    internal = True
    command = None
    human_name = "Solr Telemetry"
    help = "Solr-specific background polling telemetry device."

    def __init__(self, admin_client, metrics_store, sample_interval_s: float = 5.0):
        super().__init__()
        self._client = admin_client
        self._metrics_store = metrics_store
        self._sample_interval = sample_interval_s
        self._thread = None
        self._stop_event = threading.Event()

    def __getstate__(self):
        # Threads and events cannot be pickled; drop them and re-create them on unpickling.
        state = super().__getstate__()
        state.pop("_thread", None)
        state.pop("_stop_event", None)
        return state

    def __setstate__(self, state):
        super().__setstate__(state)
        self._thread = None
        self._stop_event = threading.Event()

    def on_benchmark_start(self) -> None:
        """Start the background polling thread (a daemon thread)."""
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._poll_loop, daemon=True)
        self._thread.start()

    def on_benchmark_stop(self) -> None:
        """Signal the polling thread to stop and wait a bounded time for it to exit.

        No extra collection happens here; metrics are only written by ``_collect()``.
        """
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join(timeout=self._sample_interval * 2 + 5)
            if self._thread.is_alive():
                self.logger.warning("%s: polling thread did not stop within the join timeout.",
                                    self.__class__.__name__)

    def _poll_loop(self) -> None:
        # Errors in one collection round are logged and polling continues.
        while not self._stop_event.is_set():
            try:
                self._collect()
            except Exception as exc:
                self.logger.warning("%s: collection error: %s", self.__class__.__name__, exc)
            self._stop_event.wait(self._sample_interval)

    # NOTE: this class does not use ABCMeta, so @abstractmethod is documentation only;
    # a subclass that forgets _collect() silently collects nothing.
    @abstractmethod
    def _collect(self) -> None:
        """Collect metrics and store them via self._metrics_store."""

    # ------------------------------------------------------------------
    # Dual-format helpers
    # ------------------------------------------------------------------
    def _fetch_node_metrics_parsed(self):
        """
        Fetch /admin/metrics and return ``(format_str, data_dict)``.
        ``format_str`` is ``"json"`` or ``"prometheus"``: a ``str`` response is parsed as
        Prometheus text, a dict is returned as-is, anything else becomes ``{}``.
        """
        raw = self._client.get_node_metrics()
        if isinstance(raw, str):
            return "prometheus", _parse_prometheus_text(raw)
        return "json", raw if isinstance(raw, dict) else {}

    @staticmethod
    def _get_metric_json(data: dict, *keys, default=None):
        """Navigate a nested dict using successive key lookups; ``default`` on any miss."""
        current = data
        for key in keys:
            if not isinstance(current, dict):
                return default
            current = current.get(key)
            if current is None:
                return default
        return current

    @staticmethod
    def _get_metric_prometheus(data: dict, metric_name: str, default=None):
        """Look up a metric by exact base name from a parsed Prometheus dict."""
        return data.get(metric_name, default)

    def _put(self, name: str, value, unit: str, task: str = "", meta: dict = None) -> None:
        """Write a single cluster-level metric (operation type ``telemetry``) to the metrics store."""
        if not hasattr(self._metrics_store, "put_value_cluster_level"):
            self._metrics_store[name] = {"value": value, "unit": unit}
            return
        self._metrics_store.put_value_cluster_level(
            name=name, value=value, unit=unit,
            task=task, operation_type="telemetry",
            meta_data=meta or {},
        )
# ---------------------------------------------------------------------------
# Device: SolrJvmStats
# ---------------------------------------------------------------------------
class SolrJvmStats(SolrTelemetryDevice):
    """
    Collect JVM heap, GC, thread, and buffer pool metrics from Solr.
    Metrics: jvm_heap_used_bytes, jvm_heap_max_bytes, jvm_gc_count, jvm_gc_time_ms,
    jvm_gc_young_count, jvm_gc_young_time_ms, jvm_gc_old_count, jvm_gc_old_time_ms,
    jvm_thread_count, jvm_thread_peak_count, jvm_buffer_pool_direct_bytes,
    jvm_buffer_pool_mapped_bytes
    """

    human_name = "Solr JVM Stats"
    help = "JVM heap, GC (total/young/old), threads, and buffer pool metrics"

    # (key in the solr.jvm JSON registry, published metric, unit), in publishing order.
    _JSON_GAUGES = (
        ("memory.heap.used", "jvm_heap_used_bytes", "bytes"),
        ("memory.heap.max", "jvm_heap_max_bytes", "bytes"),
        ("threads.count", "jvm_thread_count", ""),
        ("threads.peak.count", "jvm_thread_peak_count", ""),
        ("buffers.direct.MemoryUsed", "jvm_buffer_pool_direct_bytes", "bytes"),
        ("buffers.mapped.MemoryUsed", "jvm_buffer_pool_mapped_bytes", "bytes"),
    )

    # Prometheus base metric name -> (published metric, unit).
    _PROMETHEUS_GAUGES = {
        "jvm_memory_heap_used_bytes": ("jvm_heap_used_bytes", "bytes"),
        "jvm_memory_heap_max_bytes": ("jvm_heap_max_bytes", "bytes"),
        "jvm_gc_collection_count": ("jvm_gc_count", ""),
        "jvm_gc_collection_time_ms": ("jvm_gc_time_ms", "ms"),
        "jvm_threads_current": ("jvm_thread_count", ""),
        "jvm_threads_peak": ("jvm_thread_peak_count", ""),
        "jvm_buffer_pool_used_bytes": ("jvm_buffer_pool_direct_bytes", "bytes"),
    }

    # Substrings (matched case-insensitively) that classify a GC collector as young or old.
    _YOUNG_MARKERS = ("young", "minor", "eden")
    _OLD_MARKERS = ("old", "major", "tenured")

    def _collect(self) -> None:
        fmt, data = self._fetch_node_metrics_parsed()
        if fmt != "prometheus":
            self._collect_json(data)
        else:
            self._collect_prometheus(data)

    def _collect_json(self, data: dict) -> None:
        """Publish gauges and aggregated GC counters from the ``solr.jvm`` JSON registry."""
        jvm = self._get_metric_json(data, "metrics", "solr.jvm") or {}
        for source_key, metric_name, unit in self._JSON_GAUGES:
            reading = jvm.get(source_key)
            if reading is not None:
                self._put(metric_name, reading, unit)

        # Sum "gc.*.count" / "gc.*.time" over all collectors; also split by generation
        # when the collector name says so. None means "no such key seen".
        totals = {"count": None, "time": None}
        young = {"count": None, "time": None}
        old = {"count": None, "time": None}
        for key, reading in jvm.items():
            if reading is None or "gc." not in key:
                continue
            if key.endswith(".count"):
                kind = "count"
            elif key.endswith(".time"):
                kind = "time"
            else:
                continue
            totals[kind] = (totals[kind] or 0) + reading
            lowered = key.lower()
            if any(marker in lowered for marker in self._YOUNG_MARKERS):
                young[kind] = (young[kind] or 0) + reading
            elif any(marker in lowered for marker in self._OLD_MARKERS):
                old[kind] = (old[kind] or 0) + reading

        for metric_name, reading, unit in (
            ("jvm_gc_count", totals["count"], ""),
            ("jvm_gc_time_ms", totals["time"], "ms"),
            ("jvm_gc_young_count", young["count"], ""),
            ("jvm_gc_young_time_ms", young["time"], "ms"),
            ("jvm_gc_old_count", old["count"], ""),
            ("jvm_gc_old_time_ms", old["time"], "ms"),
        ):
            if reading is not None:
                self._put(metric_name, reading, unit)

    def _collect_prometheus(self, data: dict) -> None:
        """Publish every mapped Prometheus metric that is present in ``data``."""
        for prom_name, (metric_name, unit) in self._PROMETHEUS_GAUGES.items():
            reading = self._get_metric_prometheus(data, prom_name)
            if reading is not None:
                self._put(metric_name, reading, unit)
# ---------------------------------------------------------------------------
# Device: SolrNodeStats
# ---------------------------------------------------------------------------
class SolrNodeStats(SolrTelemetryDevice):
    """
    Collect OS, file-descriptor, HTTP, and query-handler metrics from Solr.
    Metrics: cpu_usage_percent, os_memory_free_bytes, node_file_descriptors_open,
    node_file_descriptors_max, node_http_requests_total,
    query_handler_requests_total, query_handler_errors_total,
    query_handler_avg_latency_ms
    """

    human_name = "Solr Node Stats"
    help = "CPU usage, OS memory, file descriptors, HTTP requests, and query handler latency"

    def _collect(self) -> None:
        # The two sources are independent: each logs its own errors, so a failure in
        # one does not skip the other.
        self._collect_system_stats()
        self._collect_metrics_stats()

    @staticmethod
    def _pick_cpu_load(os_data: dict):
        """Return ``processCpuLoad``, else ``systemCpuLoad``; None when neither is usable.

        0.0 is a valid (idle) reading and must not fall through to the next key.
        Negative values are treated as "not available" (the JVM's OperatingSystemMXBean
        reports a negative load when it cannot measure one).
        """
        for key in ("processCpuLoad", "systemCpuLoad"):
            load = os_data.get(key)
            if load is not None and load >= 0:
                return load
        return None

    def _collect_system_stats(self) -> None:
        """Publish CPU, free memory and file-descriptor gauges from ``/api/node/system``."""
        try:
            resp = self._client._get("/api/node/system")
            system = resp.json()
            os_data = system.get("system", {})
            cpu = self._pick_cpu_load(os_data)
            if cpu is not None:
                # the load is a fraction; publish it as a percentage
                self._put("cpu_usage_percent", cpu * 100.0, "%")
            free_mem = os_data.get("freePhysicalMemorySize")
            if free_mem is not None:
                self._put("os_memory_free_bytes", free_mem, "bytes")
            open_fds = os_data.get("openFileDescriptorCount")
            max_fds = os_data.get("maxFileDescriptorCount")
            if open_fds is not None:
                self._put("node_file_descriptors_open", open_fds, "")
            if max_fds is not None:
                self._put("node_file_descriptors_max", max_fds, "")
        except Exception as exc:
            logging.getLogger(__name__).debug("SolrNodeStats: /api/node/system error: %s", exc)

    def _collect_metrics_stats(self) -> None:
        """Publish query-handler and HTTP counters from /admin/metrics (JSON or Prometheus)."""
        try:
            fmt, data = self._fetch_node_metrics_parsed()
            if fmt == "prometheus":
                self._collect_metrics_prometheus(data)
            else:
                self._collect_metrics_json(data)
        except Exception as exc:
            logging.getLogger(__name__).debug("SolrNodeStats: metrics error: %s", exc)

    def _collect_metrics_json(self, data: dict) -> None:
        # NOTE(review): Solr's /admin/metrics usually names core registries
        # "solr.core.<collection>..." and renders timers as nested objects (e.g. "mean_ms").
        # Confirm get_node_metrics() really returns the flat "solr.core" / "...requestTimes.mean"
        # keys read here; otherwise these metrics are never published.
        core = self._get_metric_json(data, "metrics", "solr.core") or {}
        requests = core.get("QUERY./select.requests")
        errors = core.get("QUERY./select.errors")
        avg_latency = core.get("QUERY./select.requestTimes.mean")
        if requests is not None:
            self._put("query_handler_requests_total", requests, "")
        if errors is not None:
            self._put("query_handler_errors_total", errors, "")
        if avg_latency is not None:
            self._put("query_handler_avg_latency_ms", avg_latency, "ms")
        jetty = self._get_metric_json(data, "metrics", "solr.jetty") or {}
        # NOTE(review): confirm the Jetty handler class name used in the solr.jetty registry.
        http_requests = jetty.get(
            "org.eclipse.jetty.server.handler.StatisticsHandler.requests"
        )
        if http_requests is not None:
            self._put("node_http_requests_total", http_requests, "")

    def _collect_metrics_prometheus(self, data: dict) -> None:
        mapping = {
            "solr_metrics_core_query_requests_total": ("query_handler_requests_total", ""),
            "solr_metrics_core_query_errors_total": ("query_handler_errors_total", ""),
            "solr_metrics_core_query_request_times_mean_ms": ("query_handler_avg_latency_ms", "ms"),
            "solr_metrics_jetty_requests_total": ("node_http_requests_total", ""),
        }
        for prom_name, (osb_name, unit) in mapping.items():
            val = self._get_metric_prometheus(data, prom_name)
            if val is not None:
                self._put(osb_name, val, unit)
# ---------------------------------------------------------------------------
# Device: SolrCollectionStats
# ---------------------------------------------------------------------------
class SolrCollectionStats(SolrTelemetryDevice):
    """
    Collect per-collection document count, index size, segment count, and deleted docs.

    Metrics (per collection): num_docs, index_size_bytes, segment_count, num_deleted_docs

    Every metric is tagged with ``meta={"collection": <name>}``.
    """
    human_name = "Solr Collection Stats"
    help = "Per-collection: doc count, deleted docs, index size, and segment count (30 s interval)"

    def __init__(self, admin_client, metrics_store,
                 collections: list = None, sample_interval_s: float = 30.0):
        """
        :param admin_client: client used for every HTTP call this device makes.
        :param metrics_store: metrics store passed through to the base device.
        :param collections: collections to sample; ``None`` (or empty) means every
            collection listed in the cluster status at sample time.
        :param sample_interval_s: sampling interval in seconds, passed to the base device.
        """
        super().__init__(admin_client, metrics_store, sample_interval_s)
        self._collections = collections

    def _collect(self) -> None:
        """
        Sample every target collection.

        A failure to read the cluster status aborts this sample and is logged at
        debug level; per-collection failures are handled inside
        ``_collect_collection``.
        """
        try:
            cluster = self._client.get_cluster_status()
            col_state = cluster.get("collections", {})
            target_collections = self._collections or list(col_state.keys())
            for col_name in target_collections:
                self._collect_collection(col_name)
        except Exception as exc:
            logging.getLogger(__name__).debug("SolrCollectionStats: cluster status error: %s", exc)

    def _collect_collection(self, collection: str) -> None:
        """
        Sample one collection: summed per-core properties, then Luke index stats.

        ``num_docs`` comes from the summed core properties when that call returns
        at least one core; only otherwise does the Luke request supply it. This
        avoids recording two conflicting ``num_docs`` samples per tick. Luke is
        always queried, because it is the only source of deleted-doc and
        segment counts here.
        """
        num_docs_reported = False
        try:
            resp = self._client._get(f"/api/collections/{collection}/core-properties")
            cores = resp.json().get("core-properties") or {}
            num_docs = 0
            index_size = 0
            for props in cores.values():
                num_docs += props.get("numDocs", 0)
                # NOTE(review): in Solr's CoreAdmin STATUS output indexHeapUsageBytes is
                # heap usage and sizeInBytes is on-disk size; confirm which one
                # index_size_bytes is meant to track.
                index_size += props.get("indexHeapUsageBytes", 0)
            if cores:
                # Only report when cores were actually returned; an empty response
                # would otherwise produce a misleading num_docs=0 sample.
                self._put("num_docs", num_docs, "docs", meta={"collection": collection})
                num_docs_reported = True
            if index_size:
                self._put("index_size_bytes", index_size, "bytes",
                          meta={"collection": collection})
        except Exception as exc:
            logging.getLogger(__name__).debug(
                "SolrCollectionStats: core-properties failed for %s: %s", collection, exc)
        self._fetch_luke_stats(collection, report_num_docs=not num_docs_reported)

    def _fetch_luke_stats(self, collection: str, report_num_docs: bool = True) -> None:
        """
        Read the ``index`` section of the collection's Luke handler.

        :param collection: collection name used in the request path.
        :param report_num_docs: when False, ``num_docs`` is not emitted from Luke
            (it was already reported from another source this tick).

        Emits ``num_docs`` (optionally), ``num_deleted_docs`` and ``segment_count``
        for whichever fields are present. A count of ``0`` is a valid value and
        is reported. Errors are logged at debug level.
        """
        try:
            resp = self._client._get(
                f"/solr/{collection}/admin/luke?numTerms=0&wt=json"
            )
            info = resp.json().get("index", {})
            num_docs = info.get("numDocs")
            # Explicit None check: "deletedDocs" == 0 is a real value and must not
            # fall through to the alternate key.
            deleted_docs = info.get("deletedDocs")
            if deleted_docs is None:
                deleted_docs = info.get("numDeletedDocs")
            segment_count = info.get("segmentCount")
            if report_num_docs and num_docs is not None:
                self._put("num_docs", num_docs, "docs", meta={"collection": collection})
            if deleted_docs is not None:
                self._put("num_deleted_docs", deleted_docs, "docs",
                          meta={"collection": collection})
            if segment_count is not None:
                self._put("segment_count", segment_count, "",
                          meta={"collection": collection})
        except Exception as exc:
            logging.getLogger(__name__).debug("SolrCollectionStats: luke fallback failed for %s: %s",
                                              collection, exc)
# ---------------------------------------------------------------------------
# Device: SolrQueryStats
# ---------------------------------------------------------------------------
class SolrQueryStats(SolrTelemetryDevice):
    """
    Report /select handler latency percentiles, request/error totals and a
    cache hit ratio.

    Metrics: query_latency_p50_ms, query_latency_p99_ms, query_latency_p999_ms,
             query_requests_total, query_errors_total, query_cache_hit_ratio

    In the JSON format the hit ratio is taken from ``filterCache``; in the
    Prometheus format it comes from ``solr_metrics_core_cache_hitratio``.
    """
    human_name = "Solr Query Stats"
    help = "Query latency percentiles (p50/p99/p999), cache hit ratio, request and error totals"

    def _collect(self) -> None:
        """Fetch node metrics once and hand them to the format-specific parser."""
        fmt, data = self._fetch_node_metrics_parsed()
        parse = self._collect_prometheus if fmt == "prometheus" else self._collect_json
        parse(data)

    def _collect_json(self, data: dict) -> None:
        """Emit query metrics from the ``solr.core`` registry of a JSON payload."""
        core_metrics = self._get_metric_json(data, "metrics", "solr.core") or {}

        p999 = core_metrics.get("QUERY./select.requestTimes.p_99_9")
        if p999 is None:
            # Accept the alternative "p_999" spelling of the 99.9th-percentile key.
            p999 = core_metrics.get("QUERY./select.requestTimes.p_999")

        # Emitted in this order; entries whose value is None are skipped.
        samples = (
            ("query_latency_p50_ms", core_metrics.get("QUERY./select.requestTimes.p_50"), "ms"),
            ("query_latency_p99_ms", core_metrics.get("QUERY./select.requestTimes.p_99"), "ms"),
            ("query_latency_p999_ms", p999, "ms"),
            ("query_requests_total", core_metrics.get("QUERY./select.requests"), ""),
            ("query_errors_total", core_metrics.get("QUERY./select.errors"), ""),
            ("query_cache_hit_ratio", core_metrics.get("CACHE.searcher.filterCache.hitratio"), ""),
        )
        for metric_name, value, unit in samples:
            if value is not None:
                self._put(metric_name, value, unit)

    def _collect_prometheus(self, data: dict) -> None:
        """Emit query metrics from Prometheus text metrics; absent series are skipped."""
        series = (
            ("solr_metrics_core_query_request_times_p50_ms", "query_latency_p50_ms", "ms"),
            ("solr_metrics_core_query_request_times_p99_ms", "query_latency_p99_ms", "ms"),
            ("solr_metrics_core_query_request_times_p999_ms", "query_latency_p999_ms", "ms"),
            ("solr_metrics_core_query_requests_total", "query_requests_total", ""),
            ("solr_metrics_core_query_errors_total", "query_errors_total", ""),
            ("solr_metrics_core_cache_hitratio", "query_cache_hit_ratio", ""),
        )
        for prom_name, metric_name, unit in series:
            value = self._get_metric_prometheus(data, prom_name)
            if value is None:
                continue
            self._put(metric_name, value, unit)
# ---------------------------------------------------------------------------
# Device: SolrIndexingStats
# ---------------------------------------------------------------------------
class SolrIndexingStats(SolrTelemetryDevice):
    """
    Report /update handler throughput and running-merge counters.

    Metrics: indexing_requests_total, indexing_errors_total, indexing_avg_time_ms,
             index_merge_major_running, index_merge_minor_running
    """
    human_name = "Solr Indexing Stats"
    help = "Indexing request counts, average indexing time, and major/minor merge activity"

    def _collect(self) -> None:
        """Fetch node metrics once and hand them to the format-specific parser."""
        fmt, data = self._fetch_node_metrics_parsed()
        parse = self._collect_prometheus if fmt == "prometheus" else self._collect_json
        parse(data)

    def _collect_json(self, data: dict) -> None:
        """Emit indexing metrics from the ``solr.core`` registry of a JSON payload."""
        core_metrics = self._get_metric_json(data, "metrics", "solr.core") or {}
        # (registry key, reported metric name, unit), emitted in this order.
        keys = (
            ("UPDATE./update.requests", "indexing_requests_total", ""),
            ("UPDATE./update.errors", "indexing_errors_total", ""),
            ("UPDATE./update.requestTimes.mean", "indexing_avg_time_ms", "ms"),
            ("INDEX.merge.major.running", "index_merge_major_running", ""),
            ("INDEX.merge.minor.running", "index_merge_minor_running", ""),
        )
        for registry_key, metric_name, unit in keys:
            value = core_metrics.get(registry_key)
            if value is None:
                continue
            self._put(metric_name, value, unit)

    def _collect_prometheus(self, data: dict) -> None:
        """Emit indexing metrics from Prometheus text metrics; absent series are skipped."""
        series = (
            ("solr_metrics_core_update_requests_total", "indexing_requests_total", ""),
            ("solr_metrics_core_update_errors_total", "indexing_errors_total", ""),
            ("solr_metrics_core_update_request_times_mean_ms", "indexing_avg_time_ms", "ms"),
            ("solr_metrics_core_index_merge_major_running", "index_merge_major_running", ""),
            ("solr_metrics_core_index_merge_minor_running", "index_merge_minor_running", ""),
        )
        for prom_name, metric_name, unit in series:
            value = self._get_metric_prometheus(data, prom_name)
            if value is None:
                continue
            self._put(metric_name, value, unit)
# ---------------------------------------------------------------------------
# Device: SolrCacheStats
# ---------------------------------------------------------------------------
class SolrCacheStats(SolrTelemetryDevice):
"""
Collect Solr internal cache statistics for the three primary caches.
Metrics (per cache): cache_hits_total, cache_inserts_total, cache_evictions_total,
cache_memory_bytes, cache_hit_ratio
"""
human_name = "Solr Cache Stats"
help = "Per-cache hits, inserts, evictions, memory, and hit ratio (query/filter/document caches)"
CACHE_NAMES = ["queryResultCache", "filterCache", "documentCache"]
def _collect(self) -> None:
fmt, data = self._fetch_node_metrics_parsed()
if fmt == "prometheus":
self._collect_prometheus(data)
else:
self._collect_json(data)
def _collect_json(self, data: dict) -> None:
core = self._get_metric_json(data, "metrics", "solr.core") or {}
for cache_name in self.CACHE_NAMES:
prefix = f"CACHE.searcher.{cache_name}."
hits = core.get(f"{prefix}hits")
inserts = core.get(f"{prefix}inserts")
evictions = core.get(f"{prefix}evictions")
ram_bytes = core.get(f"{prefix}ramBytesUsed")
hitratio = core.get(f"{prefix}hitratio")
meta = {"cache": cache_name}
if hits is not None:
self._put("cache_hits_total", hits, "", meta=meta)
if inserts is not None:
self._put("cache_inserts_total", inserts, "", meta=meta)
if evictions is not None:
self._put("cache_evictions_total", evictions, "", meta=meta)
if ram_bytes is not None:
self._put("cache_memory_bytes", ram_bytes, "bytes", meta=meta)
if hitratio is not None:
self._put("cache_hit_ratio", hitratio, "", meta=meta)
def _collect_prometheus(self, data: dict) -> None:
aggregate_mappings = {
"solr_metrics_core_cache_hits_total": ("cache_hits_total", ""),
"solr_metrics_core_cache_inserts_total": ("cache_inserts_total", ""),
"solr_metrics_core_cache_evictions_total": ("cache_evictions_total", ""),
"solr_metrics_core_cache_ram_bytes_used": ("cache_memory_bytes", "bytes"),
"solr_metrics_core_cache_hitratio": ("cache_hit_ratio", ""),
}
for prom_name, (osb_name, unit) in aggregate_mappings.items():
val = self._get_metric_prometheus(data, prom_name)
if val is not None:
self._put(osb_name, val, unit, meta={"cache": "aggregate"})