| # SPDX-License-Identifier: Apache-2.0 |
| # |
| # Modifications by Apache Solr contributors; see git log for details. |
| # Licensed under the Apache License, Version 2.0. |
| # |
| # The OpenSearch Contributors require contributions made to |
| # this file be licensed under the Apache-2.0 license or a |
| # compatible open source license. |
| # Modifications Copyright OpenSearch Contributors. See |
| # GitHub history for details. |
| # Licensed to Elasticsearch B.V. under one or more contributor |
| # license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright |
| # ownership. Elasticsearch B.V. licenses this file to you under |
| # the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import configparser |
| import logging |
| import os |
| from enum import Enum |
| |
| import tabulate |
| |
| from solrorbit import exceptions, PROGRAM_NAME |
| from solrorbit.builder.models.cluster_flavors import ClusterFlavor |
| from solrorbit.builder.models.cluster_infra_providers import ClusterInfraProvider |
| from solrorbit.utils import console, repo, io, modules |
| |
| CLUSTER_CONFIG_FORMAT_VERSION = 1 |
| |
| |
| def _path_for(cluster_config_root_path, cluster_config_member_type): |
| root_path = os.path.join(cluster_config_root_path, cluster_config_member_type, "v{}".format(CLUSTER_CONFIG_FORMAT_VERSION)) |
| if not os.path.exists(root_path): |
| raise exceptions.SystemSetupError("Path {} for {} does not exist.".format(root_path, cluster_config_member_type)) |
| return root_path |
| |
| |
def list_cluster_configs(cfg):
    """
    Prints a table of all available cluster-configs to the console.

    Rows are ordered by type and, within one type, by name.

    :param cfg: The config object; it is used to resolve the cluster-config repository path.
    """
    loader = ClusterConfigInstanceLoader(cluster_config_path(cfg))
    descriptors = [loader.load_cluster_config(config_name) for config_name in loader.cluster_config_names()]
    # one stable sort on the (type, name) pair orders by type first and by name within a type
    descriptors.sort(key=lambda d: (d.type, d.name))
    rows = [[d.name, d.type, d.description] for d in descriptors]
    console.println("Available cluster-configs:\n")
    console.println(tabulate.tabulate(rows, headers=["Name", "Type", "Description"]))
| |
| |
def load_cluster_config(repo, name, cluster_config_params=None):
    """
    Loads the given cluster-configs and merges them into one ``ClusterConfigInstance``.

    :param repo: Root path of the cluster-config repository (passed to ``ClusterConfigInstanceLoader``).
    :param name: An iterable of cluster-config names. Each one is loaded and merged in iteration order.
    :param cluster_config_params: Optional dict of variables. The loader applies it on top of the variables
                                  that each cluster-config defines itself.
    :return: A ``ClusterConfigInstance`` holding the collected config paths, the root path that provides
             bootstrap hooks (``None`` if no config base has loadable hooks) and the merged variables.
    :raises exceptions.SystemSetupError: If loadable hooks are found below more than one root path or if none
                                         of the cluster-configs defines a config base.
    """
    class HookProbe:
        # minimal stand-in exposing the two attributes that BootstrapHookHandler reads from a component
        def __init__(self, root_path, entry_point):
            self.root_path = root_path
            self.entry_point = entry_point

    hook_root_path = None
    # a list (not a set) preserves order as we append to existing config files later during provisioning.
    merged_config_paths = []
    base_vars = {}
    own_vars = {}

    for config_name in name:
        descriptor = ClusterConfigInstanceLoader(repo).load_cluster_config(config_name, cluster_config_params)
        for config_path in descriptor.config_paths:
            if config_path not in merged_config_paths:
                merged_config_paths.append(config_path)
        for candidate in descriptor.root_paths:
            # probe whether this root path provides bootstrap hooks
            probe = BootstrapHookHandler(HookProbe(root_path=candidate, entry_point=ClusterConfigInstance.entry_point))
            if not probe.can_load():
                continue
            if not hook_root_path:
                hook_root_path = candidate
            # several cluster_configs may be based on the same hook but not on different ones
            elif hook_root_path != candidate:
                raise exceptions.SystemSetupError(
                    "Invalid cluster_config: {}. Multiple bootstrap hooks are forbidden.".format(name))
        base_vars.update(descriptor.config_base_variables)
        own_vars.update(descriptor.variables)

    if not merged_config_paths:
        raise exceptions.SystemSetupError("At least one config base is required for cluster_config {}".format(name))
    # cluster_config variables *always* take precedence over config base variables
    variables = {**base_vars, **own_vars}

    return ClusterConfigInstance(name, hook_root_path, merged_config_paths, variables=variables)
| |
| |
def load_plugin(repo, name, config, plugin_params=None):
    """
    Loads a single plugin descriptor from the given repository.

    :param repo: Root path of the cluster-config repository (passed to ``PluginLoader``).
    :param name: The plugin name.
    :param config: The plugin configuration name(s) or ``None``; passed on as ``config_names``.
    :param plugin_params: Optional dict of plugin variables.
    :return: The result of ``PluginLoader.load_plugin``.
    """
    loader = PluginLoader(repo)
    return loader.load_plugin(name, config, plugin_params)
| |
| |
def load_plugins(repo, plugin_names, plugin_params=None):
    """
    Loads the descriptors for a list of plugin specifications.

    :param repo: Root path of the cluster-config repository.
    :param plugin_names: Plugin specifications of the form ``PLUGIN_NAME`` or ``PLUGIN_NAME:PLUGIN_CONFIG``.
                         Several configurations can be joined with ``+``. May be ``None`` or empty.
    :param plugin_params: Optional dict of plugin variables; it is passed to every plugin.
    :return: A list with one loaded plugin per specification; empty if no specifications are given.
    :raises ValueError: If a specification contains more than one colon.
    """
    def parse_spec(spec):
        # "name" -> (name, None); "name:cfg1+cfg2" -> (name, ["cfg1", "cfg2"])
        parts = spec.split(":")
        if len(parts) > 2:
            raise ValueError("Unrecognized plugin specification [%s]. Use either 'PLUGIN_NAME' or 'PLUGIN_NAME:PLUGIN_CONFIG'." % spec)
        if len(parts) == 2:
            return parts[0], parts[1].split("+")
        return parts[0], None

    if not plugin_names:
        return []
    descriptors = []
    for spec in plugin_names:
        plugin_name, config_names = parse_spec(spec)
        descriptors.append(load_plugin(repo, plugin_name, config_names, plugin_params))
    return descriptors
| |
| |
def cluster_config_path(cfg):
    """
    Determines the root path of the cluster-config repository.

    An explicitly configured ``cluster_config.path`` in the "builder" section wins. Otherwise the path is
    taken from a ``repo.BenchmarkRepository`` that is set up from the repository settings in ``cfg``.

    :param cfg: The config object.
    :return: The root path of the cluster-config repository.
    """
    explicit_path = cfg.opts("builder", "cluster_config.path", mandatory=False)
    if explicit_path:
        return explicit_path

    version = cfg.opts("builder", "distribution.version", mandatory=False)
    repository_name = cfg.opts("builder", "repository.name")
    revision = cfg.opts("builder", "repository.revision")
    offline_mode = cfg.opts("system", "offline.mode")
    default_dir = cfg.opts("cluster_configs", "%s.dir" % repository_name, mandatory=False)
    node_root = cfg.opts("node", "root.dir")
    repositories_dir = cfg.opts("builder", "cluster_config.repository.dir")

    benchmark_repo = repo.BenchmarkRepository(
        default_dir,
        os.path.join(node_root, repositories_dir),
        repository_name,
        "cluster_configs",
        offline_mode)
    # NOTE(review): presumably this selects the revision matching the distribution version - confirm in utils.repo
    benchmark_repo.set_cluster_configs_dir(revision, version, cfg)
    return benchmark_repo.repo_dir
| |
| |
class ClusterConfigInstanceLoader:
    """
    Reads cluster-config definitions (``*.ini`` files) from the ``cluster_configs`` directory of a repository.
    """

    def __init__(self, cluster_config_root_path):
        """
        :param cluster_config_root_path: Root directory of the cluster-config repository.
        :raises exceptions.SystemSetupError: If the versioned ``cluster_configs`` directory does not exist.
        """
        self.cluster_configs_dir = _path_for(cluster_config_root_path, "cluster_configs")
        self.logger = logging.getLogger(__name__)

    def cluster_config_names(self):
        """
        :return: A lazy iterable with the names (file name without extension) of all ``.ini`` files that are
                 located directly in the cluster-configs directory.
        """
        def extension_of(path):
            _, extension = io.splitext(path)
            return extension

        def name_of(path):
            stem, _ = io.splitext(path)
            return io.basename(stem)

        return (name_of(entry) for entry in os.listdir(self.cluster_configs_dir) if extension_of(entry) == ".ini")

    def _cluster_config_file(self, name):
        """
        :return: The path of the ``.ini`` file that defines the cluster-config ``name``.
        """
        file_name = "{}.ini".format(name)
        return os.path.join(self.cluster_configs_dir, file_name)

    def load_cluster_config(self, name, cluster_config_params=None):
        """
        Loads the definition of a single cluster-config.

        :param name: The name of the cluster-config (its ``.ini`` file name without extension).
        :param cluster_config_params: Optional dict; its entries override the variables of the cluster-config.
        :return: A ``ClusterConfigInstanceDescriptor`` for the cluster-config.
        :raises exceptions.SystemSetupError: If there is no definition file for ``name``.
        """
        definition_file = self._cluster_config_file(name)
        if not io.exists(definition_file):
            raise exceptions.SystemSetupError(
                "Unknown cluster-config [{}]. List the available "
                "cluster-configs with {} list cluster-configs.".format(name, PROGRAM_NAME))
        config = self._config_loader(definition_file)
        description = self._value(config, ["meta", "description"], default="")
        cluster_config_type = self._value(config, ["meta", "type"], default="cluster-config-instance")
        # empty entries (e.g. when no base is defined at all) are dropped
        base_names = [base for base in self._value(config, ["config", "base"], default="").split(",") if base]

        root_paths = []
        config_paths = []
        config_base_vars = {}
        for base_name in base_names:
            base_root = os.path.join(self.cluster_configs_dir, base_name)
            root_paths.append(base_root)
            config_paths.append(os.path.join(base_root, "templates"))
            # a config base may optionally provide variables of its own
            base_config_file = os.path.join(base_root, "config.ini")
            if io.exists(base_config_file):
                self._copy_section(self._config_loader(base_config_file), "variables", config_base_vars)

        # it's possible that some cluster_configs don't have a config base, e.g. mixins which only override variables
        if not config_paths:
            self.logger.info("ClusterConfigInstance [%s] does not define any config paths. Assuming that it is used as a mixin.", name)
        variables = self._copy_section(config, "variables", {})
        # add all cluster_config params here to override any defaults
        if cluster_config_params:
            variables.update(cluster_config_params)

        return ClusterConfigInstanceDescriptor(
            name, description, cluster_config_type,
            root_paths, config_paths, config_base_vars, variables)

    def _config_loader(self, file_name):
        """
        :return: A ``ConfigParser`` with extended interpolation that has read ``file_name``.
        """
        parser = configparser.ConfigParser(interpolation=configparser.ExtendedInterpolation())
        # Do not modify the case of option keys but read them as is
        parser.optionxform = str
        parser.read(file_name)
        return parser

    def _value(self, cfg, section_path, default=None):
        """
        Walks down ``cfg`` along the given keys.

        :param cfg: The (nested) mapping to read from.
        :param section_path: A single key or a list of keys.
        :param default: The value to return if any key on the path is missing.
        :return: The value at the end of the path or ``default``.
        """
        keys = section_path if not isinstance(section_path, str) else [section_path]
        node = cfg
        for key in keys:
            if key not in node:
                return default
            node = node[key]
        return node

    def _copy_section(self, cfg, section, target):
        """
        Copies all options of ``section`` into the dict ``target`` (if the section exists).

        :return: ``target``.
        """
        if section in cfg.sections():
            target.update(cfg[section].items())
        return target
| |
| |
class ClusterConfigInstanceDescriptor:
    """
    Plain description of a single cluster-config definition. Identity (equality and hash) is based on the name only.
    """

    def __init__(self, name, description, type, root_paths, config_paths, config_base_variables, variables):
        """
        :param name: The name of the cluster-config.
        :param description: A description of the cluster-config.
        :param type: The type of the cluster-config.
        :param root_paths: The root paths of all config bases.
        :param config_paths: The template paths of all config bases.
        :param config_base_variables: Variables that are defined by the config bases.
        :param variables: Variables that are defined by the cluster-config itself.
        """
        self.name = name
        self.description = description
        self.type = type
        self.root_paths = root_paths
        self.config_paths = config_paths
        self.config_base_variables = config_base_variables
        self.variables = variables

    def __hash__(self):
        # consistent with __eq__: only the name contributes
        return hash(self.name)

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return False
        return self.name == other.name
| |
| |
class ClusterConfigInstance:
    # name of the initial Python file to load for cluster_configs.
    entry_point = "config"

    def __init__(self, names, root_path, config_paths, provider=ClusterInfraProvider.LOCAL,
                 flavor=ClusterFlavor.SELF_MANAGED, variables=None):
        """
        Creates new settings for a benchmark candidate.

        :param names: Descriptive name(s) for this cluster_config. A single string is wrapped into a list.
        :param root_path: The root path from which bootstrap hooks should be loaded if any. May be ``None``.
        :param config_paths: A non-empty list of paths where the raw config can be found.
        :param provider: The infrastructure provider for the cluster.
        :param flavor: The flavor of cluster to be provisioned.
        :param variables: A dict containing variable definitions that need to be replaced.
        """
        self.names = [names] if isinstance(names, str) else names
        self.root_path = root_path
        self.config_paths = config_paths
        self.provider = provider
        self.flavor = flavor
        self.variables = {} if variables is None else variables

    def mandatory_var(self, name):
        """
        :param name: The name of a variable.
        :return: The value of the variable.
        :raises exceptions.SystemSetupError: If the variable is not defined.
        """
        try:
            value = self.variables[name]
        except KeyError:
            raise exceptions.SystemSetupError("ClusterConfigInstance \"{}\" requires config key \"{}\"".format(self.name, name))
        return value

    @property
    def name(self):
        # all names joined by "+"
        separator = "+"
        return separator.join(self.names)

    # Adapter method for BootstrapHookHandler
    @property
    def config(self):
        return self.name

    @property
    def safe_name(self):
        # all names joined by "_" instead of "+"
        separator = "_"
        return separator.join(self.names)

    def __str__(self):
        return self.name
| |
| |
class PluginLoader:
    """
    Discovers and loads plugin descriptors from the ``plugins`` directory of a cluster-config repository.
    """

    def __init__(self, cluster_config_root_path):
        """
        :param cluster_config_root_path: Root directory of the cluster-config repository.
        :raises exceptions.SystemSetupError: If the versioned ``plugins`` directory does not exist.
        """
        self.plugins_root_path = _path_for(cluster_config_root_path, "plugins")
        self.logger = logging.getLogger(__name__)

    def plugins(self, variables=None):
        """
        :param variables: Optional dict of variables that is attached to every returned descriptor.
        :return: A list of all core plugins and all configured plugins, sorted by plugin name.
        """
        known_plugins = self._core_plugins(variables) + self._configured_plugins(variables)
        # sorted() does not sort in place, its result is what needs to be returned
        return sorted(known_plugins, key=lambda p: p.name)

    def _core_plugins(self, variables=None):
        """
        :return: One descriptor per entry in ``core-plugins.txt``; an empty list if that file does not exist.
        """
        core_plugins = []
        core_plugins_path = os.path.join(self.plugins_root_path, "core-plugins.txt")
        if os.path.exists(core_plugins_path):
            with open(core_plugins_path, mode="rt", encoding="utf-8") as f:
                for line in f:
                    entry = line.strip()
                    # skip comments and blank lines (a blank line would otherwise produce a plugin with an empty name)
                    if not entry or entry.startswith("#"):
                        continue
                    # be forward compatible and allow additional values (comma-separated). At the moment, we only use the plugin name.
                    values = entry.split(",")
                    core_plugins.append(PluginDescriptor(name=values[0].strip(), core_plugin=True, variables=variables))
        return core_plugins

    def _configured_plugins(self, variables=None):
        """
        :return: One descriptor per ``.ini`` file that is located directly in a plugin directory.
        """
        configured_plugins = []
        # each directory is a plugin, each .ini is a config (just go one level deep)
        for entry in os.listdir(self.plugins_root_path):
            plugin_path = os.path.join(self.plugins_root_path, entry)
            if not os.path.isdir(plugin_path):
                continue
            plugin_name = self._file_to_plugin_name(entry)
            for child_entry in os.listdir(plugin_path):
                if os.path.isfile(os.path.join(plugin_path, child_entry)) and io.has_extension(child_entry, ".ini"):
                    f, _ = io.splitext(child_entry)
                    configured_plugins.append(PluginDescriptor(name=plugin_name, config=io.basename(f), variables=variables))
        return configured_plugins

    def _plugin_file(self, name, config):
        """
        :return: The path of the ``.ini`` file for configuration ``config`` of plugin ``name``.
        """
        return os.path.join(self._plugin_root_path(name), "%s.ini" % config)

    def _plugin_root_path(self, name):
        """
        :return: The directory of plugin ``name`` below the plugins root path.
        """
        return os.path.join(self.plugins_root_path, self._plugin_name_to_file(name))

    # As we allow to store Python files in the plugin directory and the plugin directory also serves as the root path of the corresponding
    # module, we need to adhere to the Python restrictions here. For us, this is that hyphens in module names are not allowed. Hence, we
    # need to switch from underscores to hyphens and vice versa.
    #
    # We are implicitly assuming that plugin names stick to the convention of hyphen separation to simplify implementation and usage a bit.
    def _file_to_plugin_name(self, file_name):
        return file_name.replace("_", "-")

    def _plugin_name_to_file(self, plugin_name):
        return plugin_name.replace("-", "_")

    def _core_plugin(self, name, variables=None):
        """
        :return: The core plugin descriptor with the given name and no configuration or ``None`` if there is none.
        """
        return next((p for p in self._core_plugins(variables) if p.name == name and p.config is None), None)

    def _read_config(self, config_file):
        # Reads a plugin configuration file; extended interpolation, option keys keep their case.
        config = configparser.ConfigParser(interpolation=configparser.ExtendedInterpolation())
        # Do not modify the case of option keys but read them as is
        config.optionxform = lambda option: option
        config.read(config_file)
        return config

    def load_plugin(self, name, config_names, plugin_params=None):
        """
        Loads the descriptor of a single plugin.

        :param name: The plugin name.
        :param config_names: A list of configuration names for this plugin. ``None`` or empty to load the plugin
                             without an explicit configuration.
        :param plugin_params: Optional dict of variables. They override variables from the configuration files.
        :return: A ``PluginDescriptor``.
        :raises exceptions.SystemSetupError: If a requested configuration file does not exist or if none of the
                                             requested configurations defines a config base.
        """
        if config_names is not None:
            self.logger.info("Loading plugin [%s] with configuration(s) [%s].", name, config_names)
        else:
            self.logger.info("Loading plugin [%s] with default configuration.", name)

        root_path = self._plugin_root_path(name)
        # used to determine whether this is a core plugin. The params are passed along so they are not lost when
        # the core plugin descriptor is returned as is below.
        core_plugin = self._core_plugin(name, plugin_params)
        if not config_names:
            # maybe we only have a config folder but nothing else (e.g. if there is only an install hook)
            if io.exists(root_path):
                return PluginDescriptor(name=name,
                                        core_plugin=core_plugin is not None,
                                        config=config_names,
                                        root_path=root_path,
                                        variables=plugin_params)
            if core_plugin:
                return core_plugin
            # If we just have a plugin name then we assume that this is a community plugin and the user has specified a download URL
            self.logger.info("The plugin [%s] is neither a configured nor an official plugin. Assuming that this is a community "
                             "plugin not requiring any configuration and you have set a proper download URL.", name)
            return PluginDescriptor(name, variables=plugin_params)

        variables = {}
        config_paths = []
        # used for deduplication
        known_config_bases = set()

        for config_name in config_names:
            config_file = self._plugin_file(name, config_name)
            # Do we have an explicit configuration for this plugin?
            if not io.exists(config_file):
                if core_plugin:
                    raise exceptions.SystemSetupError("Plugin [%s] does not provide configuration [%s]. List the available plugins "
                                                      "and configurations with %s list cluster-configs "
                                                      "--distribution-version=VERSION." % (name, config_name, PROGRAM_NAME))
                raise exceptions.SystemSetupError("Unknown plugin [%s]. List the available plugins with %s list "
                                                  "cluster-configs --distribution-version=VERSION." % (name, PROGRAM_NAME))

            config = self._read_config(config_file)
            if "config" in config and "base" in config["config"]:
                for base in config["config"]["base"].split(","):
                    if base and base not in known_config_bases:
                        config_paths.append(os.path.join(root_path, base, "templates"))
                        known_config_bases.add(base)

            if "variables" in config.sections():
                variables.update(config["variables"].items())

        # add all plugin params last so they override any defaults from the configuration files
        if plugin_params:
            variables.update(plugin_params)

        # maybe one of the configs is really just for providing variables. However, we still require one config base overall.
        if not config_paths:
            raise exceptions.SystemSetupError("At least one config base is required for plugin [%s]" % name)
        return PluginDescriptor(name=name, core_plugin=core_plugin is not None, config=config_names, root_path=root_path,
                                config_paths=config_paths, variables=variables)
| |
| |
class PluginDescriptor:
    """
    Describes a plugin and, optionally, its configuration. Equality and hash are based on the plugin name, the
    configuration and whether it is a core plugin.
    """
    # name of the initial Python file to load for plugins.
    entry_point = "plugin"

    def __init__(self, name, core_plugin=False, config=None, root_path=None, config_paths=None, variables=None):
        """
        :param name: The plugin name.
        :param core_plugin: ``True`` if this is a core plugin.
        :param config: The configuration name, a list of configuration names or ``None``.
        :param root_path: The root directory of the plugin. May be ``None``.
        :param config_paths: A list of paths with config templates. Defaults to an empty list.
        :param variables: A dict of variables. Defaults to an empty dict.
        """
        if config_paths is None:
            config_paths = []
        if variables is None:
            variables = {}
        # __repr__ lists the attributes in the order in which they are assigned here
        self.name = name
        self.core_plugin = core_plugin
        self.config = config
        self.root_path = root_path
        self.config_paths = config_paths
        self.variables = variables

    def __str__(self):
        return "Plugin descriptor for [%s]" % self.name

    def __repr__(self):
        return ", ".join("%s = [%s]" % (prop, repr(value)) for prop, value in vars(self).items())

    def __hash__(self):
        # PluginLoader.load_plugin passes the configuration names as a list which is unhashable. Hash an equivalent
        # tuple instead so that descriptors which compare equal can be used in sets and as dict keys.
        config = tuple(self.config) if isinstance(self.config, list) else self.config
        return hash(self.name) ^ hash(config) ^ hash(self.core_plugin)

    def __eq__(self, other):
        return isinstance(other, type(self)) and (self.name, self.config, self.core_plugin) == (other.name, other.config, other.core_plugin)
| |
| |
class BootstrapPhase(Enum):
    """
    The phases for which bootstrap hooks can be registered.
    """
    post_install = 10

    @classmethod
    def valid(cls, name):
        """
        :param name: A phase name.
        :return: ``True`` iff ``name`` equals the name of one of the phases.
        """
        return any(phase_name == name for phase_name in cls.names())

    @classmethod
    def names(cls):
        """
        :return: A list with the names of all phases in definition order.
        """
        return [phase.name for phase in cls]
| |
| |
class BootstrapHookHandler:
    """
    Responsible for loading and executing component-specific initialization code.
    """
    def __init__(self, component, loader_class=modules.ComponentLoader):
        """
        Creates a new BootstrapHookHandler.

        :param component: The component that should be loaded.
                          In practice, this is a PluginDescriptor or a ClusterConfigInstance instance.
        :param loader_class: The implementation that loads the provided component's code.
        """
        self.component = component
        # Don't allow the loader to recurse.
        self.loader = loader_class(root_path=self.component.root_path, component_entry_point=self.component.entry_point, recurse=False)
        # maps a phase name to the list of hooks registered for it
        self.hooks = {}
        self.logger = logging.getLogger(__name__)

    def can_load(self):
        """
        :return: Whether the loader reports that the component's code can be loaded.
        """
        return self.loader.can_load()

    def load(self):
        """
        Loads the component's root module and lets it register its hooks with this handler.

        :raises exceptions.BenchmarkError: Passed through unchanged if raised by the module's ``register``.
        :raises exceptions.SystemSetupError: If ``register`` fails with any other regular exception.
        """
        root_module = self.loader.load()
        try:
            # every module needs to have a register() method
            root_module.register(self)
        except exceptions.BenchmarkError:
            # just pass our own exceptions transparently.
            raise
        except Exception as e:
            # Only regular errors are wrapped; KeyboardInterrupt and SystemExit derive from BaseException
            # and propagate unchanged.
            msg = "Could not load bootstrap hooks in [{}]".format(self.loader.root_path)
            self.logger.exception(msg)
            raise exceptions.SystemSetupError(msg) from e

    def register(self, phase, hook):
        """
        Registers a hook for a bootstrap phase. This is meant to be called by the component's ``register`` function.

        :param phase: The name of the phase; it needs to be one of ``BootstrapPhase.names()``.
        :param hook: The callable to invoke in that phase.
        :raises exceptions.SystemSetupError: If ``phase`` is not a valid phase name.
        """
        self.logger.info("Registering bootstrap hook [%s] for phase [%s] in component [%s]", hook.__name__, phase, self.component.name)
        if not BootstrapPhase.valid(phase):
            raise exceptions.SystemSetupError("Unknown bootstrap phase [{}]. Valid phases are: {}.".format(phase, BootstrapPhase.names()))
        self.hooks.setdefault(phase, []).append(hook)

    def invoke(self, phase, **kwargs):
        """
        Invokes all hooks registered for ``phase`` in registration order.

        :param phase: The name of the phase.
        :param kwargs: Keyword arguments that are passed to every hook in addition to ``config_names``.
        """
        if phase in self.hooks:
            self.logger.info("Invoking phase [%s] for component [%s] in config [%s]", phase, self.component.name, self.component.config)
            for hook in self.hooks[phase]:
                self.logger.info("Invoking bootstrap hook [%s].", hook.__name__)
                # hooks should only take keyword arguments to be forwards compatible with OSB!
                hook(config_names=self.component.config, **kwargs)
        else:
            self.logger.debug("Component [%s] in config [%s] has no hook registered for phase [%s].",
                              self.component.name, self.component.config, phase)