blob: a17e76c1f562e07ed110819260b43c112e92a043 [file]
# SPDX-License-Identifier: Apache-2.0
#
# Modifications by Apache Solr contributors; see git log for details.
# Licensed under the Apache License, Version 2.0.
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
# Licensed to Elasticsearch B.V. under one or more
# contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import os
import shlex
import subprocess
import psutil
from solrorbit import time, exceptions, telemetry
from solrorbit.builder import cluster, java_resolver
from solrorbit.utils import io, opts, process
class DockerLauncher:
    """
    Starts and stops benchmark candidate nodes that run in Docker, driven by ``docker-compose``.

    For every node, ``binary_path`` is used as the directory that contains the ``docker-compose.yml`` file.
    """
    # May download a Docker image and that can take some time
    PROCESS_WAIT_TIMEOUT_SECONDS = 10 * 60

    def __init__(self, cfg, clock=time.Clock):
        """
        :param cfg: The configuration object. It is only stored on the instance.
        :param clock: Object providing ``stop_watch()``; used to enforce the container startup timeout.
        """
        self.cfg = cfg
        self.clock = clock
        self.logger = logging.getLogger(__name__)

    def start(self, node_configurations):
        """
        Starts one Docker-based node per node configuration.

        :param node_configurations: Iterable of node configurations; ``node_name``, ``ip`` and ``binary_path`` are read.
        :return: A list of ``cluster.Node`` instances, one per configuration, in input order.
        :raises exceptions.LaunchError: If ``docker-compose up`` fails or no healthy container shows up in time.
        """
        nodes = []
        for node_configuration in node_configurations:
            node_name = node_configuration.node_name
            host_name = node_configuration.ip
            binary_path = node_configuration.binary_path
            self.logger.info("Starting node [%s] in Docker.", node_name)
            self._start_process(binary_path)
            node_telemetry = [
                # Don't attach any telemetry devices for now but keep the infrastructure in place
            ]
            t = telemetry.Telemetry(devices=node_telemetry)
            # The first argument is 0 because no host PID is tracked for containerized nodes.
            node = cluster.Node(0, binary_path, host_name, node_name, t)
            t.attach_to_node(node)
            nodes.append(node)
        return nodes

    def _start_process(self, binary_path):
        """
        Runs ``docker-compose up -d`` and blocks until the resulting container is running and healthy.

        :param binary_path: Directory containing ``docker-compose.yml``.
        :raises exceptions.LaunchError: On a non-zero exit code, a missing container or a startup timeout.
        """
        compose_cmd = self._docker_compose(binary_path, "up -d")
        ret = process.run_subprocess_with_logging(compose_cmd)
        if ret != 0:
            msg = "Docker daemon startup failed with exit code [{}]".format(ret)
            self.logger.error(msg)
            raise exceptions.LaunchError(msg)
        container_id = self._get_container_id(binary_path)
        self._wait_for_healthy_running_container(container_id, DockerLauncher.PROCESS_WAIT_TIMEOUT_SECONDS)

    def _docker_compose(self, compose_config, cmd):
        """
        Builds a ``docker-compose`` command line.

        :param compose_config: Directory containing ``docker-compose.yml``.
        :param cmd: The docker-compose sub-command including its arguments, e.g. ``"up -d"``.
        :return: The complete command line as a single string.
        """
        return "docker-compose -f {} {}".format(os.path.join(compose_config, "docker-compose.yml"), cmd)

    def _get_container_id(self, compose_config):
        """
        Determines the id of the first container reported by ``docker-compose ps -q``.

        :param compose_config: Directory containing ``docker-compose.yml``.
        :return: The first entry of the command's output.
        :raises exceptions.LaunchError: If the command did not report any container.
        """
        compose_ps_cmd = self._docker_compose(compose_config, "ps -q")
        container_ids = process.run_subprocess_with_output(compose_ps_cmd)
        # Fail with a launch error instead of an IndexError when docker-compose reports nothing.
        if not container_ids:
            msg = "docker-compose did not report any container for [{}]".format(compose_config)
            self.logger.error(msg)
            raise exceptions.LaunchError(msg)
        return container_ids[0]

    def _wait_for_healthy_running_container(self, container_id, timeout):
        """
        Polls ``docker ps`` every 0.5 seconds until the container is reported as running and healthy.

        :param container_id: Id of the container to wait for.
        :param timeout: Maximum time to wait, compared against the stop watch's split time (seconds).
        :raises exceptions.LaunchError: If the container is not running and healthy before the timeout elapses.
        """
        cmd = 'docker ps -a --filter "id={}" --filter "status=running" --filter "health=healthy" -q'.format(container_id)
        stop_watch = self.clock.stop_watch()
        stop_watch.start()
        while stop_watch.split_time() < timeout:
            # Any output means that the filters matched our container.
            if process.run_subprocess_with_output(cmd):
                return
            time.sleep(0.5)
        msg = "No healthy running container after {} seconds!".format(timeout)
        self.logger.error(msg)
        raise exceptions.LaunchError(msg)

    def stop(self, nodes, metrics_store):
        """
        Stops all provided nodes with ``docker-compose down``.

        :param nodes: The nodes to stop.
        :param metrics_store: Metrics store for node metadata and system metrics. Nothing is recorded if it is falsy.
        """
        self.logger.info("Shutting down [%d] nodes running in Docker on this host.", len(nodes))
        for node in nodes:
            self.logger.info("Stopping node [%s].", node.node_name)
            if metrics_store:
                telemetry.add_metadata_for_node(metrics_store, node.node_name, node.host_name)
            # Telemetry is detached in two phases: once while the node is still up and once after it is down.
            node.telemetry.detach_from_node(node, running=True)
            process.run_subprocess_with_logging(self._docker_compose(node.binary_path, "down"))
            node.telemetry.detach_from_node(node, running=False)
            if metrics_store:
                node.telemetry.store_system_metrics(node, metrics_store)
def wait_for_pidfile(pidfilename, timeout=60, clock=time.Clock):
    """
    Polls every 0.5 seconds until the pid file exists and contains a process id.

    :param pidfilename: Path of the pid file to read.
    :param timeout: Maximum time to wait, compared against the stop watch's split time (seconds). Default: 60.
    :param clock: Object providing ``stop_watch()``.
    :return: The process id read from the pid file as an ``int``.
    :raises exceptions.LaunchError: If no process id could be read from the pid file before the timeout elapsed.
    """
    stop_watch = clock.stop_watch()
    stop_watch.start()
    while stop_watch.split_time() < timeout:
        try:
            with open(pidfilename, "rb") as f:
                # int() accepts bytes and tolerates surrounding whitespace such as a trailing newline.
                return int(f.read())
        except (FileNotFoundError, ValueError):
            # The file is missing, still empty or not completely written yet (int() raises ValueError
            # for empty or non-numeric content): try again shortly.
            time.sleep(0.5)

    msg = "pid file not available after {} seconds!".format(timeout)
    logging.getLogger(__name__).error(msg)
    raise exceptions.LaunchError(msg)
class ProcessLauncher:
    """
    Launcher is responsible for starting and stopping the benchmark candidate.

    Nodes are started as local processes via ``bin/solr start`` and stopped by signalling the process whose id
    was read from Solr's pid file.
    """
    PROCESS_WAIT_TIMEOUT_SECONDS = 90.0

    def __init__(self, cfg, clock=time.Clock):
        """
        :param cfg: The configuration object. The option ``system/passenv`` (default: ``PATH``) determines which
                    environment variables of the current process are passed on to the node.
        :param clock: Object providing ``stop_watch()``; used to measure how long node shutdown takes.
        """
        self.cfg = cfg
        self._clock = clock
        self.logger = logging.getLogger(__name__)
        self.pass_env_vars = opts.csv_to_list(self.cfg.opts("system", "passenv", mandatory=False, default_value="PATH"))

    def start(self, node_configurations):
        """
        Starts one node per node configuration.

        :param node_configurations: A sized collection of node configurations for this host.
        :return: A list of ``cluster.Node`` instances, one per configuration, in input order.
        """
        node_count_on_host = len(node_configurations)
        return [self._start_node(node_configuration, node_count_on_host) for node_configuration in node_configurations]

    def _start_node(self, node_configuration, node_count_on_host):
        """
        Resolves the JDK, sets up telemetry and the environment and starts a single node.

        :param node_configuration: The configuration of the node to start.
        :param node_count_on_host: Number of nodes that are started on this host; passed to the disk I/O device.
        :return: The started ``cluster.Node`` with its telemetry attached.
        """
        host_name = node_configuration.ip
        node_name = node_configuration.node_name
        binary_path = node_configuration.binary_path
        data_paths = node_configuration.data_paths
        node_telemetry_dir = os.path.join(node_configuration.node_root_path, "telemetry")

        java_major_version, java_home = java_resolver.java_home(node_configuration.cluster_config_runtime_jdks,
                                                                self.cfg.opts("builder", "runtime.jdk"))
        self.logger.info("Java major version: %s", java_major_version)
        self.logger.info("Java home: %s", java_home)
        self.logger.info("Starting node [%s].", node_name)

        enabled_devices = self.cfg.opts("telemetry", "devices")
        telemetry_params = self.cfg.opts("telemetry", "params")
        node_telemetry = [
            telemetry.FlightRecorder(telemetry_params, node_telemetry_dir, java_major_version),
            telemetry.JitCompiler(node_telemetry_dir),
            telemetry.Gc(telemetry_params, node_telemetry_dir, java_major_version),
            telemetry.Heapdump(node_telemetry_dir),
            telemetry.DiskIo(node_count_on_host),
            telemetry.IndexSize(data_paths),
            telemetry.StartupTime(),
        ]

        t = telemetry.Telemetry(enabled_devices, devices=node_telemetry)
        env = self._prepare_env(node_name, java_home, t)
        t.on_pre_node_start(node_name)
        # Get Solr version for version-specific startup command
        distribution_version = self.cfg.opts("builder", "distribution.version", mandatory=False)
        node_pid = self._start_process(binary_path, env, distribution_version)
        self.logger.info("Successfully started node [%s] with PID [%s].", node_name, node_pid)
        node = cluster.Node(node_pid, binary_path, host_name, node_name, t)

        self.logger.info("Attaching telemetry devices to node [%s].", node_name)
        t.attach_to_node(node)

        return node

    def _prepare_env(self, node_name, java_home, t):
        """
        Builds the environment for the node process.

        :param node_name: Name of the node; only used for logging.
        :param java_home: Path to the JDK to use. If falsy, PATH and the Java home variables are left untouched.
        :param t: The telemetry instance that contributes additional Java options.
        :return: A dict with the environment variables for the node process.
        """
        # Start from the allow-listed subset of the current environment.
        env = {k: v for k, v in os.environ.items() if k in self.pass_env_vars}
        if java_home:
            self._set_env(env, "PATH", os.path.join(java_home, "bin"), separator=os.pathsep, prepend=True)
            env["SOLR_JAVA_HOME"] = java_home
            env["JAVA_HOME"] = java_home
            self.logger.info("JAVA HOME: %s", env["JAVA_HOME"])
        if not env.get("SOLR_JAVA_OPTS"):
            env["SOLR_JAVA_OPTS"] = "-XX:+ExitOnOutOfMemoryError"

        # we just blindly trust telemetry here...
        for v in t.instrument_candidate_java_opts():
            self._set_env(env, "SOLR_JAVA_OPTS", v)

        self.logger.debug("env for [%s]: %s", node_name, str(env))
        return env

    def _set_env(self, env, k, v, separator=' ', prepend=False):
        """
        Sets ``env[k]`` to ``v`` or, if the key is already present, adds ``v`` to the existing value.

        :param env: The environment dict; modified in place.
        :param k: Name of the environment variable.
        :param v: The value to set or add. ``None`` leaves ``env`` unchanged.
        :param separator: Separator between the existing value and ``v``. Default: a single space.
        :param prepend: If ``True``, ``v`` is put in front of the existing value, otherwise it is appended.
        """
        if v is not None:
            if k not in env:
                env[k] = v
            elif prepend:
                env[k] = v + separator + env[k]
            else:
                env[k] = env[k] + separator + v

    @staticmethod
    def _run_subprocess(command_line, env):
        """
        Runs the command in a new session with its output discarded and waits for it to finish.

        :param command_line: The command line as a single string; it is split with ``shlex.split``.
        :param env: The environment for the child process.
        :return: The exit code of the command.
        """
        command_line_args = shlex.split(command_line)

        with subprocess.Popen(command_line_args,
                              stdout=subprocess.DEVNULL,
                              stderr=subprocess.DEVNULL,
                              env=env,
                              start_new_session=True) as command_line_process:
            # wait for it to finish
            command_line_process.wait()
        return command_line_process.returncode

    @staticmethod
    def _requires_cloud_flag(distribution_version):
        """
        Decides whether ``bin/solr start`` needs an explicit ``--cloud`` flag.

        :param distribution_version: Solr version such as "9.10.1" or "10.0.0-SNAPSHOT". May be ``None`` or empty.
        :return: ``True`` if the major version is lower than 10. ``False`` for newer versions and also if the
                 version is not provided or cannot be parsed (a newer Solr is assumed then).
        """
        if not distribution_version:
            return False
        logger = logging.getLogger(__name__)
        # Extract major version (handle formats like "9.10.1", "10.0.0-SNAPSHOT", "11.0.0-SNAPSHOT")
        major_part = distribution_version.split("-")[0].split(".")[0]
        try:
            major_version = int(major_part)
        except ValueError:
            # If we can't parse version, assume newer Solr (no flag)
            logger.warning("Could not parse Solr version from '%s', assuming 10.x+ (no --cloud flag)",
                           distribution_version)
            return False
        if major_version < 10:
            # Solr 9.x and earlier require --cloud flag
            logger.info("Using --cloud flag for Solr %s", distribution_version)
            return True
        logger.info("Solr %s uses embedded cloud mode by default", distribution_version)
        return False

    @staticmethod
    def _start_process(binary_path, env, distribution_version=None):
        """
        Starts Solr via ``bin/solr start`` and waits for its pid file.

        Note that this changes the current working directory of the calling process to ``binary_path``.

        :param binary_path: The Solr installation directory (the one containing ``bin/solr``).
        :param env: The environment for the Solr process. ``SOLR_PORT`` (default: "8983") and ``SOLR_PID_DIR``
                    (default: ``<binary_path>/bin``) are read to locate the pid file.
        :param distribution_version: Solr version; determines whether ``--cloud`` is passed. Default: ``None``.
        :return: The process id read from Solr's pid file.
        :raises exceptions.LaunchError: If run as root on POSIX, if ``bin/solr`` exits with a non-zero code or if
                                        the pid file does not become available.
        """
        if os.name == "posix" and os.geteuid() == 0:
            raise exceptions.LaunchError("Cannot launch Solr as root. Please run as a non-root user.")
        os.chdir(binary_path)
        # Solr startup command varies by version:
        # - Solr 9.x: requires --cloud flag for SolrCloud mode
        # - Solr 10.x+: just "start" enables SolrCloud mode with embedded ZooKeeper by default
        # The bin/solr script handles daemonization and PID file creation
        cmd = [io.escape_path(os.path.join(".", "bin", "solr")), "start"]
        if ProcessLauncher._requires_cloud_flag(distribution_version):
            cmd.append("--cloud")

        ret = ProcessLauncher._run_subprocess(command_line=" ".join(cmd), env=env)
        if ret != 0:
            msg = "Daemon startup failed with exit code [{}]".format(ret)
            logging.getLogger(__name__).error(msg)
            raise exceptions.LaunchError(msg)

        # Solr creates the PID file as solr-<port>.pid; by default in its bin directory, or in SOLR_PID_DIR
        # if that variable is set in the environment that has been passed to bin/solr.
        port = env.get("SOLR_PORT", "8983")
        pid_dir = env.get("SOLR_PID_DIR") or os.path.join(binary_path, "bin")
        pid_file = os.path.join(pid_dir, f"solr-{port}.pid")
        return wait_for_pidfile(pid_file)

    def stop(self, nodes, metrics_store):
        """
        Stops all provided nodes: SIGTERM first and, if a node is still alive after 10 seconds, SIGKILL.

        :param nodes: The nodes to stop.
        :param metrics_store: Metrics store for node metadata and system metrics. Nothing is recorded if it is falsy.
        :return: A list of the nodes that have been stopped by this call. Nodes without a matching process
                 are not included.
        """
        self.logger.info("Shutting down [%d] nodes on this host.", len(nodes))
        stopped_nodes = []
        for node in nodes:
            node_name = node.node_name
            if metrics_store:
                telemetry.add_metadata_for_node(metrics_store, node_name, node.host_name)
            try:
                node_process = psutil.Process(pid=node.pid)
                node.telemetry.detach_from_node(node, running=True)
            except psutil.NoSuchProcess:
                self.logger.warning("No process found with PID [%s] for node [%s].", node.pid, node_name)
                node_process = None

            if node_process is not None:
                stop_watch = self._clock.stop_watch()
                stop_watch.start()
                try:
                    node_process.terminate()
                    node_process.wait(10.0)
                    stopped_nodes.append(node)
                except psutil.NoSuchProcess:
                    self.logger.warning("No process found with PID [%s] for node [%s].", node_process.pid, node_name)
                except psutil.TimeoutExpired:
                    self.logger.info("kill -KILL node [%s]", node_name)
                    try:
                        # kill -9
                        node_process.kill()
                        stopped_nodes.append(node)
                    except psutil.NoSuchProcess:
                        self.logger.warning("No process found with PID [%s] for node [%s].", node_process.pid, node_name)
                self.logger.info("Done shutting down node [%s] in [%.1f] s.", node_name, stop_watch.split_time())

                node.telemetry.detach_from_node(node, running=False)
            # store system metrics in any case (telemetry devices may derive system metrics while the node is running)
            if metrics_store:
                node.telemetry.store_system_metrics(node, metrics_store)
        return stopped_nodes