| # SPDX-License-Identifier: Apache-2.0 |
| # |
| # Modifications by Apache Solr contributors; see git log for details. |
| # Licensed under the Apache License, Version 2.0. |
| # |
| # The OpenSearch Contributors require contributions made to |
| # this file be licensed under the Apache-2.0 license or a |
| # compatible open source license. |
| # Modifications Copyright OpenSearch Contributors. See |
| # GitHub history for details. |
| # Licensed to Elasticsearch B.V. under one or more contributor |
| # license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright |
| # ownership. Elasticsearch B.V. licenses this file to you under |
| # the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import glob |
| import json |
| import logging |
| import os |
| import shutil |
| import uuid |
| |
| import jinja2 |
| from jinja2 import select_autoescape |
| |
| from solrorbit import exceptions |
| from solrorbit.builder import cluster_config, java_resolver |
| from solrorbit.utils import console, io |
| |
| |
| def local(cfg, cluster_config, ip, http_port, all_node_ips, all_node_names, target_root, node_name): |
| distribution_version = cfg.opts("builder", "distribution.version", mandatory=False) |
| |
| node_root_dir = os.path.join(target_root, node_name) |
| |
| runtime_jdk = cluster_config.mandatory_var("runtime.jdk") |
| _, java_home = java_resolver.java_home(runtime_jdk, cfg.opts("builder", "runtime.jdk")) |
| |
| os_installer = NodeInstaller( |
| cluster_config, java_home, node_name, |
| node_root_dir, all_node_ips, all_node_names, ip, http_port) |
| return BareProvisioner(os_installer, distribution_version=distribution_version) |
| |
| |
| def docker(cfg, cluster_config, ip, http_port, target_root, node_name): |
| distribution_version = cfg.opts("builder", "distribution.version", mandatory=False) |
| benchmark_root = cfg.opts("node", "benchmark.root") |
| |
| node_root_dir = os.path.join(target_root, node_name) |
| |
| return DockerProvisioner(cluster_config, node_name, ip, http_port, node_root_dir, distribution_version, benchmark_root) |
| |
| |
| class NodeConfiguration: |
| def __init__(self, build_type, cluster_config_runtime_jdks, ip, node_name, node_root_path, binary_path, data_paths): |
| self.build_type = build_type |
| self.cluster_config_runtime_jdks = cluster_config_runtime_jdks |
| self.ip = ip |
| self.node_name = node_name |
| self.node_root_path = node_root_path |
| self.binary_path = binary_path |
| self.data_paths = data_paths |
| |
| def as_dict(self): |
| return { |
| "build-type": self.build_type, |
| "cluster-config-instance-runtime-jdks": self.cluster_config_runtime_jdks, |
| "ip": self.ip, |
| "node-name": self.node_name, |
| "node-root-path": self.node_root_path, |
| "binary-path": self.binary_path, |
| "data-paths": self.data_paths |
| } |
| |
| @staticmethod |
| def from_dict(d): |
| return NodeConfiguration( |
| d["build-type"], d["cluster-config-instance-runtime-jdks"], d["ip"], |
| d["node-name"], d["node-root-path"], d["binary-path"], d["data-paths"]) |
| |
| |
| def save_node_configuration(path, n): |
| with open(os.path.join(path, "node-config.json"), "wt") as f: |
| json.dump(n.as_dict(), f, indent=2) |
| |
| |
| def load_node_configuration(path): |
| with open(os.path.join(path, "node-config.json"), "rt") as f: |
| return NodeConfiguration.from_dict(json.load(f)) |
| |
| |
| class ConfigLoader: |
| def __init__(self): |
| pass |
| |
| def load(self): |
| pass |
| |
| |
| def _render_template(env, variables, file_name): |
| try: |
| template = env.get_template(io.basename(file_name)) |
| # force a new line at the end. Jinja seems to remove it. |
| return template.render(variables) + "\n" |
| except jinja2.exceptions.TemplateSyntaxError as e: |
| raise exceptions.InvalidSyntax("%s in %s" % (str(e), file_name)) |
| except BaseException as e: |
| raise exceptions.SystemSetupError("%s in %s" % (str(e), file_name)) |
| |
| |
| def plain_text(file): |
| _, ext = io.splitext(file) |
| return ext in [".ini", ".txt", ".json", ".yml", ".yaml", ".options", ".properties"] |
| |
| |
| def cleanup(preserve, install_dir, data_paths): |
| def delete_path(p): |
| if os.path.exists(p): |
| try: |
| logger.debug("Deleting [%s].", p) |
| shutil.rmtree(p) |
| except OSError: |
| logger.exception("Could not delete [%s]. Skipping...", p) |
| |
| logger = logging.getLogger(__name__) |
| if preserve: |
| console.info("Preserving benchmark candidate installation at [{}].".format(install_dir), logger=logger) |
| else: |
| logger.info("Wiping benchmark candidate installation at [%s].", install_dir) |
| for path in data_paths: |
| delete_path(path) |
| |
| delete_path(install_dir) |
| |
| |
| def _apply_config(source_root_path, target_root_path, config_vars): |
| logger = logging.getLogger(__name__) |
| for root, _, files in os.walk(source_root_path): |
| |
| env = jinja2.Environment(loader=jinja2.FileSystemLoader(root), autoescape=select_autoescape(['html', 'xml'])) |
| relative_root = root[len(source_root_path) + 1:] |
| absolute_target_root = os.path.join(target_root_path, relative_root) |
| io.ensure_dir(absolute_target_root) |
| |
| for name in files: |
| source_file = os.path.join(root, name) |
| target_file = os.path.join(absolute_target_root, name) |
| if plain_text(source_file): |
| logger.info("Reading config template file [%s] and writing to [%s].", source_file, target_file) |
| with open(target_file, mode="a", encoding="utf-8") as f: |
| f.write(_render_template(env, config_vars, source_file)) |
| else: |
| logger.info("Treating [%s] as binary and copying as is to [%s].", source_file, target_file) |
| shutil.copy(source_file, target_file) |
| |
| |
| class BareProvisioner: |
| """ |
| The provisioner prepares the runtime environment for running the benchmark. It prepares all configuration files and copies the binary |
| of the benchmark candidate to the appropriate place. |
| """ |
| |
| def __init__(self, os_installer, distribution_version=None, apply_config=_apply_config): |
| self.os_installer = os_installer |
| self.distribution_version = distribution_version |
| self.apply_config = apply_config |
| self.logger = logging.getLogger(__name__) |
| |
| def prepare(self, binary): |
| self.os_installer.install(binary["solr"]) |
| # we need to immediately delete it as plugins may copy their configuration during installation. |
| self.os_installer.delete_pre_bundled_configuration() |
| |
| # determine after installation because some variables will depend on the install directory |
| target_root_path = self.os_installer.os_home_path |
| provisioner_vars = self._provisioner_variables() |
| for p in self.os_installer.config_source_paths: |
| self.apply_config(p, target_root_path, provisioner_vars) |
| |
| # Never let install hooks modify our original provisioner variables and just provide a copy! |
| self.os_installer.invoke_install_hook(cluster_config.BootstrapPhase.post_install, provisioner_vars.copy()) |
| |
| return NodeConfiguration("tar", self.os_installer.cluster_config.mandatory_var("runtime.jdk"), |
| self.os_installer.node_ip, self.os_installer.node_name, |
| self.os_installer.node_root_dir, self.os_installer.os_home_path, |
| self.os_installer.data_paths) |
| |
| def _provisioner_variables(self): |
| provisioner_vars = {} |
| provisioner_vars.update(self.os_installer.variables) |
| provisioner_vars["cluster_settings"] = {} |
| |
| return provisioner_vars |
| |
| |
| class NodeInstaller: |
| def __init__(self, cluster_config, java_home, node_name, node_root_dir, all_node_ips, all_node_names, ip, http_port, |
| hook_handler_class=cluster_config.BootstrapHookHandler): |
| self.cluster_config = cluster_config |
| self.java_home = java_home |
| self.node_name = node_name |
| self.node_root_dir = node_root_dir |
| self.install_dir = os.path.join(node_root_dir, "install") |
| self.node_log_dir = os.path.join(node_root_dir, "logs", "server") |
| self.heap_dump_dir = os.path.join(node_root_dir, "heapdump") |
| self.all_node_ips = all_node_ips |
| self.all_node_names = all_node_names |
| self.node_ip = ip |
| self.http_port = http_port |
| self.hook_handler = hook_handler_class(self.cluster_config) |
| if self.hook_handler.can_load(): |
| self.hook_handler.load() |
| self.os_home_path = None |
| self.data_paths = None |
| self.logger = logging.getLogger(__name__) |
| |
| def install(self, binary): |
| self.logger.info("Preparing candidate locally in [%s].", self.install_dir) |
| io.ensure_dir(self.install_dir) |
| io.ensure_dir(self.node_log_dir) |
| io.ensure_dir(self.heap_dump_dir) |
| |
| self.logger.info("Unzipping %s to %s", binary, self.install_dir) |
| io.decompress(binary, self.install_dir) |
| self.os_home_path = glob.glob(os.path.join(self.install_dir, "solr*"))[0] |
| self.data_paths = self._data_paths() |
| |
| def delete_pre_bundled_configuration(self): |
| # Solr doesn't have a pre-bundled config directory like OpenSearch |
| # Configuration is managed through configsets in server/solr/configsets/ |
| config_path = os.path.join(self.os_home_path, "config") |
| if os.path.exists(config_path): |
| self.logger.info("Deleting pre-bundled configuration at [%s]", config_path) |
| shutil.rmtree(config_path) |
| else: |
| self.logger.info("No pre-bundled config directory found at [%s], skipping deletion", config_path) |
| |
| def invoke_install_hook(self, phase, variables): |
| env = {} |
| if self.java_home: |
| env["JAVA_HOME"] = self.java_home |
| self.hook_handler.invoke(phase.name, variables=variables, env=env) |
| |
| @property |
| def variables(self): |
| # bind as specifically as possible |
| network_host = self.node_ip |
| |
| defaults = { |
| "cluster_name": "benchmark-provisioned-cluster", |
| "node_name": self.node_name, |
| "data_paths": self.data_paths, |
| "log_path": self.node_log_dir, |
| "heap_dump_path": self.heap_dump_dir, |
| # this is the node's IP address as specified by the user when invoking OSB |
| "node_ip": self.node_ip, |
| # this is the IP address that the node will be bound to. OSB will bind to the node's IP address (but not to 0.0.0.0). The |
| "network_host": network_host, |
| "http_port": str(self.http_port), |
| "zookeeper_port": str(self.http_port + 1000), |
| "all_node_ips": "[\"%s\"]" % "\",\"".join(self.all_node_ips), |
| "all_node_names": "[\"%s\"]" % "\",\"".join(self.all_node_names), |
| # at the moment we are strict and enforce that all nodes are master eligible nodes |
| "minimum_master_nodes": len(self.all_node_ips), |
| "install_root_path": self.os_home_path |
| } |
| variables = {} |
| variables.update(self.cluster_config.variables) |
| variables.update(defaults) |
| return variables |
| |
| @property |
| def config_source_paths(self): |
| return self.cluster_config.config_paths |
| |
| def _data_paths(self): |
| if "data_paths" in self.cluster_config.variables: |
| data_paths = self.cluster_config.variables["data_paths"] |
| if isinstance(data_paths, str): |
| return [data_paths] |
| elif isinstance(data_paths, list): |
| return data_paths |
| else: |
| raise exceptions.SystemSetupError("Expected [data_paths] to be either a string or a list but was [%s]." % type(data_paths)) |
| else: |
| return [os.path.join(self.os_home_path, "data")] |
| |
| |
| class DockerProvisioner: |
| def __init__(self, cluster_config, node_name, ip, http_port, node_root_dir, distribution_version, benchmark_root): |
| self.cluster_config = cluster_config |
| self.node_name = node_name |
| self.node_ip = ip |
| self.http_port = http_port |
| self.node_root_dir = node_root_dir |
| self.node_log_dir = os.path.join(node_root_dir, "logs", "server") |
| self.heap_dump_dir = os.path.join(node_root_dir, "heapdump") |
| self.distribution_version = distribution_version |
| self.benchmark_root = benchmark_root |
| self.binary_path = os.path.join(node_root_dir, "install") |
| # use a random subdirectory to isolate multiple runs because an external (non-root) user cannot clean it up. |
| self.data_paths = [os.path.join(node_root_dir, "data", str(uuid.uuid4()))] |
| self.logger = logging.getLogger(__name__) |
| |
| provisioner_defaults = { |
| "cluster_name": "benchmark-provisioned-cluster", |
| "node_name": self.node_name, |
| # we bind-mount the directories below on the host to these ones. |
| "install_root_path": "/var/solr", |
| "data_paths": ["/var/solr/data"], |
| "log_path": "/var/solr/logs", |
| "heap_dump_path": "/var/solr/heapdump", |
| # Docker container needs to expose service on external interfaces |
| "network_host": "0.0.0.0", |
| "discovery_type": "single-node", |
| "http_port": str(self.http_port), |
| "zookeeper_port": str(self.http_port + 1000), |
| "cluster_settings": {} |
| } |
| |
| self.config_vars = {} |
| self.config_vars.update(self.cluster_config.variables) |
| self.config_vars.update(provisioner_defaults) |
| |
| def prepare(self, binary): |
| # we need to allow other users to write to these directories due to Docker. |
| # |
| # Although os.mkdir passes 0o777 by default, mkdir(2) uses `mode & ~umask & 0777` to determine the final flags and |
| # hence we need to modify the process' umask here. For details see https://linux.die.net/man/2/mkdir. |
| previous_umask = os.umask(0) |
| try: |
| io.ensure_dir(self.binary_path) |
| io.ensure_dir(self.node_log_dir) |
| io.ensure_dir(self.heap_dump_dir) |
| io.ensure_dir(self.data_paths[0]) |
| finally: |
| os.umask(previous_umask) |
| |
| mounts = {} |
| |
| for cluster_config_config_path in self.cluster_config.config_paths: |
| for root, _, files in os.walk(cluster_config_config_path): |
| env = jinja2.Environment(loader=jinja2.FileSystemLoader(root), autoescape=select_autoescape(['html', 'xml'])) |
| |
| relative_root = root[len(cluster_config_config_path) + 1:] |
| absolute_target_root = os.path.join(self.binary_path, relative_root) |
| io.ensure_dir(absolute_target_root) |
| |
| for name in files: |
| source_file = os.path.join(root, name) |
| target_file = os.path.join(absolute_target_root, name) |
| mounts[target_file] = os.path.join("/var/solr", relative_root, name) |
| if plain_text(source_file): |
| self.logger.info("Reading config template file [%s] and writing to [%s].", source_file, target_file) |
| with open(target_file, mode="a", encoding="utf-8") as f: |
| f.write(_render_template(env, self.config_vars, source_file)) |
| else: |
| self.logger.info("Treating [%s] as binary and copying as is to [%s].", source_file, target_file) |
| shutil.copy(source_file, target_file) |
| |
| docker_cfg = self._render_template_from_file(self.docker_vars(mounts)) |
| self.logger.info("Starting Docker container with configuration:\n%s", docker_cfg) |
| |
| with open(os.path.join(self.binary_path, "docker-compose.yml"), mode="wt", encoding="utf-8") as f: |
| f.write(docker_cfg) |
| |
| return NodeConfiguration("docker", self.cluster_config.mandatory_var("runtime.jdk"), |
| self.node_ip, self.node_name, self.node_root_dir, self.binary_path, self.data_paths) |
| |
| def docker_vars(self, mounts): |
| # Determine Docker image based on version type |
| # SNAPSHOT versions use apache/solr-nightly, release versions use solr |
| base_docker_image = self.cluster_config.mandatory_var("docker_image") |
| if self.distribution_version and "-SNAPSHOT" in self.distribution_version: |
| # For SNAPSHOT versions, use apache/solr-nightly |
| docker_image = "apache/solr-nightly" |
| else: |
| # For release versions, use the configured image (typically "solr") |
| docker_image = base_docker_image |
| |
| v = { |
| "solr_version": self.distribution_version, |
| "docker_image": docker_image, |
| "http_port": self.http_port, |
| "solr_data_dir": self.data_paths[0], |
| "solr_log_dir": self.node_log_dir, |
| "solr_heap_dump_dir": self.heap_dump_dir, |
| "mounts": mounts |
| } |
| self._add_if_defined_for_cluster_config(v, "docker_mem_limit") |
| self._add_if_defined_for_cluster_config(v, "docker_cpu_count") |
| return v |
| |
| def _add_if_defined_for_cluster_config(self, variables, key): |
| if key in self.cluster_config.variables: |
| variables[key] = self.cluster_config.variables[key] |
| |
| def _render_template(self, loader, template_name, variables): |
| try: |
| env = jinja2.Environment(loader=loader, autoescape=select_autoescape(['html', 'xml'])) |
| for k, v in variables.items(): |
| env.globals[k] = v |
| template = env.get_template(template_name) |
| |
| return template.render() |
| except jinja2.exceptions.TemplateSyntaxError as e: |
| raise exceptions.InvalidSyntax("%s in %s" % (str(e), template_name)) |
| except BaseException as e: |
| raise exceptions.SystemSetupError("%s in %s" % (str(e), template_name)) |
| |
| def _render_template_from_file(self, variables): |
| compose_file = os.path.join(self.benchmark_root, "resources", "docker-compose.yml.j2") |
| return self._render_template(loader=jinja2.FileSystemLoader(io.dirname(compose_file)), |
| template_name=io.basename(compose_file), |
| variables=variables) |