blob: b84b54dd27e4ca894321d39fbe4cbac53ce5a02e [file]
# SPDX-License-Identifier: Apache-2.0
#
# Modifications by Apache Solr contributors; see git log for details.
# Licensed under the Apache License, Version 2.0.
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import datetime
import glob
import logging
import os
import re
import shutil
import urllib.error
from solrorbit import exceptions, paths, PROGRAM_NAME
from solrorbit.exceptions import BuildError, SystemSetupError
from solrorbit.utils import git, io, process, net, jvm, convert
# e.g. my-plugin:current - we cannot simply use String#split(":") as this would not work for timestamp-based revisions
REVISION_PATTERN = r"(\w.*?):(.*)"
def create(cfg, sources, cluster_config):
logger = logging.getLogger(__name__)
caching_enabled = cfg.opts("source", "cache", mandatory=False, default_value=True)
revisions = _extract_revisions(cfg.opts("builder", "source.revision", mandatory=sources))
distribution_version = cfg.opts("builder", "distribution.version", mandatory=False)
supply_requirements = _supply_requirements(sources, revisions, distribution_version)
build_needed = any([build for _, _, build in supply_requirements.values()])
os_supplier_type, os_version, _ = supply_requirements["solr"]
src_config = cfg.all_opts("source")
suppliers = []
template_renderer = TemplateRenderer(version=os_version)
if build_needed:
raw_build_jdk = cluster_config.mandatory_var("build.jdk")
try:
build_jdk = int(raw_build_jdk)
except ValueError:
raise exceptions.SystemSetupError(f"ClusterConfigInstance config key [build.jdk] is invalid: [{raw_build_jdk}] (must be int)")
os_src_dir = os.path.join(_src_dir(cfg), _config_value(src_config, "src.subdir"))
builder = Builder(os_src_dir, build_jdk, paths.logs())
else:
builder = None
distributions_root = os.path.join(cfg.opts("node", "root.dir"), cfg.opts("source", "distribution.dir"))
dist_cfg = {}
dist_cfg.update(cluster_config.variables)
dist_cfg.update(cfg.all_opts("distributions"))
if caching_enabled:
logger.info("Enabling source artifact caching.")
max_age_days = int(cfg.opts("source", "cache.days", mandatory=False, default_value=7))
if max_age_days <= 0:
raise exceptions.SystemSetupError(f"cache.days must be a positive number but is {max_age_days}")
source_distributions_root = os.path.join(distributions_root, "src")
_prune(source_distributions_root, max_age_days)
else:
logger.info("Disabling source artifact caching.")
source_distributions_root = None
if os_supplier_type == "source":
os_src_dir = os.path.join(_src_dir(cfg), _config_value(src_config, "src.subdir"))
source_supplier = SourceSupplier(os_version,
os_src_dir,
remote_url=cfg.opts("source", "remote.repo.url"),
cluster_config=cluster_config,
builder=builder,
template_renderer=template_renderer)
if caching_enabled:
os_file_resolver = FileNameResolver(dist_cfg, template_renderer)
source_supplier = CachedSourceSupplier(source_distributions_root,
source_supplier,
os_file_resolver)
suppliers.append(source_supplier)
else:
repo = DistributionRepository(name=cfg.opts("builder", "distribution.repository"),
distribution_config=dist_cfg,
template_renderer=template_renderer)
suppliers.append(DistributionSupplier(repo, os_version, distributions_root))
return CompositeSupplier(suppliers)
def _required_version(version):
if not version or version.strip() == "":
raise exceptions.SystemSetupError("Could not determine version. Please specify the Solr distribution "
"to download with the command line parameter --distribution-version.")
else:
return version
def _required_revision(revisions, key, name=None):
try:
return revisions[key]
except KeyError:
n = name if name is not None else key
raise exceptions.SystemSetupError("No revision specified for %s" % n)
def _supply_requirements(sources, revisions, distribution_version):
# * key: artifact
# * value: ("source" | "distribution", distribution_version | revision, build = True | False)
supply_requirements = {}
# can only build Solr with source-related pipelines -> ignore revision in that case
if "solr" in revisions and sources:
supply_requirements["solr"] = ("source", _required_revision(revisions, "solr", "Solr"), True)
else:
# no revision given or explicitly specified that it's from a distribution -> must use a distribution
supply_requirements["solr"] = ("distribution", _required_version(distribution_version), False)
# Solr does not support plugin installation via the benchmark tool
return supply_requirements
def _src_dir(cfg, mandatory=True):
# Don't let this spread across the whole module
try:
return cfg.opts("node", "src.root.dir", mandatory=mandatory)
except exceptions.ConfigError:
raise exceptions.SystemSetupError("You cannot benchmark Solr from sources. Did you install Gradle? Please install"
" all prerequisites and reconfigure with %s configure" % PROGRAM_NAME)
def _prune(root_path, max_age_days):
"""
Removes files that are older than ``max_age_days`` from ``root_path``. Subdirectories are not traversed.
:param root_path: A directory which should be checked.
:param max_age_days: Files that have been created more than ``max_age_days`` ago are deleted.
"""
logger = logging.getLogger(__name__)
if not os.path.exists(root_path):
logger.info("[%s] does not exist. Skipping pruning.", root_path)
return
for f in os.listdir(root_path):
artifact = os.path.join(root_path, f)
if os.path.isfile(artifact):
max_age = datetime.datetime.now() - datetime.timedelta(days=max_age_days)
try:
created_at = datetime.datetime.fromtimestamp(os.lstat(artifact).st_ctime)
if created_at < max_age:
logger.info("Deleting [%s] from artifact cache (reached max age).", f)
os.remove(artifact)
else:
logger.debug("Keeping [%s] (max age not yet reached)", f)
except OSError:
logger.exception("Could not check whether [%s] needs to be deleted from artifact cache.", artifact)
else:
logger.info("Skipping [%s] (not a file).", artifact)
class TemplateRenderer:
def __init__(self, version):
self.version = version
def render(self, template):
return template.replace("{{VERSION}}", str(self.version))
class CompositeSupplier:
def __init__(self, suppliers):
self.suppliers = suppliers
self.logger = logging.getLogger(__name__)
self.logger.info("Suppliers: %s", self.suppliers)
def __call__(self, *args, **kwargs):
binaries = {}
for supplier in self.suppliers:
supplier.fetch()
for supplier in self.suppliers:
supplier.prepare()
for supplier in self.suppliers:
supplier.add(binaries)
return binaries
class FileNameResolver:
def __init__(self, distribution_config, template_renderer):
self.cfg = distribution_config
self.template_renderer = template_renderer
@property
def revision(self):
return self.template_renderer.version
@revision.setter
def revision(self, revision):
self.template_renderer.version = revision
@property
def file_name(self):
# Solr distributions never include a JDK, so we always use release_url
url_key = "release_url"
url = self.template_renderer.render(self.cfg[url_key])
return url[url.rfind("/") + 1:]
@property
def artifact_key(self):
return "solr"
def to_artifact_path(self, file_system_path):
return file_system_path
def to_file_system_path(self, artifact_path):
return artifact_path
class CachedSourceSupplier:
def __init__(self, distributions_root, source_supplier, file_resolver):
self.distributions_root = distributions_root
self.source_supplier = source_supplier
self.file_resolver = file_resolver
self.cached_path = None
self.logger = logging.getLogger(__name__)
@property
def file_name(self):
return self.file_resolver.file_name
@property
def cached(self):
return self.cached_path is not None and os.path.exists(self.cached_path)
def fetch(self):
# Can we already resolve the artifact without fetching the source tree at all? This is the case when a specific
# revision (instead of a meta-revision like "current") is provided and the artifact is already cached. This is
# also needed if an external process pushes artifacts to OSB's cache which might have been built from a
# fork. In that case the provided commit hash would not be present in any case in the main OS repo.
maybe_an_artifact = os.path.join(self.distributions_root, self.file_name)
if os.path.exists(maybe_an_artifact):
self.cached_path = maybe_an_artifact
else:
resolved_revision = self.source_supplier.fetch()
if resolved_revision:
# ensure we use the resolved revision for rendering the artifact
self.file_resolver.revision = resolved_revision
self.cached_path = os.path.join(self.distributions_root, self.file_name)
def prepare(self):
if not self.cached:
self.source_supplier.prepare()
def add(self, binaries):
if self.cached:
self.logger.info("Using cached artifact in [%s]", self.cached_path)
binaries[self.file_resolver.artifact_key] = self.file_resolver.to_artifact_path(self.cached_path)
else:
self.source_supplier.add(binaries)
original_path = self.file_resolver.to_file_system_path(binaries[self.file_resolver.artifact_key])
# this can be None if the OpenSearch does not reside in a git repo and the user has only
# copied all source files. In that case, we cannot resolve a revision hash and thus we cannot cache.
if self.cached_path:
try:
io.ensure_dir(io.dirname(self.cached_path))
shutil.copy(original_path, self.cached_path)
self.logger.info("Caching artifact in [%s]", self.cached_path)
binaries[self.file_resolver.artifact_key] = self.file_resolver.to_artifact_path(self.cached_path)
except OSError:
self.logger.exception("Not caching [%s].", original_path)
else:
self.logger.info("Not caching [%s] (no revision info).", original_path)
class SourceSupplier:
def __init__(self, revision, os_src_dir, remote_url, cluster_config, builder, template_renderer):
self.revision = revision
self.src_dir = os_src_dir
self.remote_url = remote_url
self.cluster_config = cluster_config
self.builder = builder
self.template_renderer = template_renderer
def fetch(self):
return SourceRepository("Solr", self.remote_url, self.src_dir).fetch(self.revision)
def prepare(self):
if self.builder:
self.builder.build([
self.template_renderer.render(self.cluster_config.mandatory_var("clean_command")),
self.template_renderer.render(self.cluster_config.mandatory_var("system.build_command"))
])
def add(self, binaries):
binaries["solr"] = self.resolve_binary()
def resolve_binary(self):
try:
path = os.path.join(self.src_dir,
self.template_renderer.render(self.cluster_config.mandatory_var("system.artifact_path_pattern")))
return glob.glob(path)[0]
except IndexError:
raise SystemSetupError("Couldn't find a tar.gz distribution. Please run Solr Orbit with the pipeline 'from-sources'.")
class DistributionSupplier:
def __init__(self, repo, version, distributions_root):
self.repo = repo
self.version = version
self.distributions_root = distributions_root
# will be defined in the prepare phase
self.distribution_path = None
self.logger = logging.getLogger(__name__)
def fetch(self):
io.ensure_dir(self.distributions_root)
download_url = self.repo.download_url
distribution_path = os.path.join(self.distributions_root, self.repo.file_name)
self.logger.info("Resolved download URL [%s] for version [%s]", download_url, self.version)
if not os.path.isfile(distribution_path) or not self.repo.cache:
try:
self.logger.info("Starting download of Solr [%s]", self.version)
progress = net.Progress("[INFO] Downloading Solr %s" % self.version)
net.download(download_url, distribution_path, progress_indicator=progress)
progress.finish()
self.logger.info("Successfully downloaded Solr [%s].", self.version)
except urllib.error.HTTPError:
self.logger.exception("Cannot download Solr distribution for version [%s] from [%s].", self.version, download_url)
raise exceptions.SystemSetupError("Cannot download Solr distribution from [%s]. Please check that the specified "
"version [%s] is correct." % (download_url, self.version))
else:
self.logger.info("Skipping download for version [%s]. Found an existing binary at [%s].", self.version, distribution_path)
self.distribution_path = distribution_path
def prepare(self):
pass
def add(self, binaries):
binaries["solr"] = self.distribution_path
def _config_value(src_config, key):
try:
return src_config[key]
except KeyError:
raise exceptions.SystemSetupError("Mandatory config key [%s] is undefined. Please add it in the [source] section of the "
"config file." % key)
def _extract_revisions(revision):
revisions = revision.split(",") if revision else []
if len(revisions) == 1:
r = revisions[0]
if r.startswith("solr:"):
r = r[len("solr:"):]
# may as well be just a single plugin
m = re.match(REVISION_PATTERN, r)
if m:
return {
m.group(1): m.group(2)
}
else:
return {
"solr": r,
# use a catch-all value
"all": r
}
else:
results = {}
for r in revisions:
m = re.match(REVISION_PATTERN, r)
if m:
results[m.group(1)] = m.group(2)
else:
raise exceptions.SystemSetupError("Revision [%s] does not match expected format [name:revision]." % r)
return results
class SourceRepository:
"""
Supplier fetches the benchmark candidate source tree from the remote repository.
"""
def __init__(self, name, remote_url, src_dir):
self.name = name
self.remote_url = remote_url
self.src_dir = src_dir
self.logger = logging.getLogger(__name__)
def fetch(self, revision):
# if and only if we want to benchmark the current revision, OSB may skip repo initialization (if it is already present)
self._try_init(may_skip_init=revision == "current")
return self._update(revision)
def has_remote(self):
return self.remote_url is not None
def _try_init(self, may_skip_init=False):
if not git.is_working_copy(self.src_dir):
if self.has_remote():
self.logger.info("Downloading sources for %s from %s to %s.", self.name, self.remote_url, self.src_dir)
git.clone(self.src_dir, self.remote_url)
elif os.path.isdir(self.src_dir) and may_skip_init:
self.logger.info("Skipping repository initialization for %s.", self.name)
else:
exceptions.SystemSetupError("A remote repository URL is mandatory for %s" % self.name)
def _update(self, revision):
if self.has_remote() and revision == "latest":
self.logger.info("Fetching latest sources for %s from origin.", self.name)
git.pull(self.src_dir)
elif revision == "current":
self.logger.info("Skip fetching sources for %s.", self.name)
elif self.has_remote() and revision.startswith("@"):
# convert timestamp annotated for OSB to something git understands -> we strip leading and trailing " and the @.
git_ts_revision = revision[1:]
self.logger.info("Fetching from remote and checking out revision with timestamp [%s] for %s.", git_ts_revision, self.name)
git.pull_ts(self.src_dir, git_ts_revision)
elif self.has_remote(): # assume a git commit hash
self.logger.info("Fetching from remote and checking out revision [%s] for %s.", revision, self.name)
git.pull_revision(self.src_dir, revision)
else:
self.logger.info("Checking out local revision [%s] for %s.", revision, self.name)
git.checkout(self.src_dir, revision)
if git.is_working_copy(self.src_dir):
git_revision = git.head_revision(self.src_dir)
self.logger.info("User-specified revision [%s] for [%s] results in git revision [%s]", revision, self.name, git_revision)
return git_revision
else:
self.logger.info("Skipping git revision resolution for %s (%s is not a git repository).", self.name, self.src_dir)
return None
@classmethod
def is_commit_hash(cls, revision):
return revision != "latest" and revision != "current" and not revision.startswith("@")
class Builder:
"""
A builder is responsible for creating an installable binary from the source files.
It is not intended to be used directly but should be triggered by its builder.
"""
def __init__(self, src_dir, build_jdk=None, log_dir=None):
self.src_dir = src_dir
self.build_jdk = build_jdk
self._java_home = None
self.log_dir = log_dir
self.logger = logging.getLogger(__name__)
@property
def java_home(self):
if not self._java_home:
_, self._java_home = jvm.resolve_path(self.build_jdk)
return self._java_home
def build(self, commands, override_src_dir=None):
for command in commands:
self.run(command, override_src_dir)
def run(self, command, override_src_dir=None):
src_dir = self.src_dir if override_src_dir is None else override_src_dir
io.ensure_dir(self.log_dir)
log_file = os.path.join(self.log_dir, "build.log")
# we capture all output to a dedicated build log file
build_cmd = "export JAVA_HOME={}; cd {}; {} < /dev/null > {} 2>&1".format(self.java_home, src_dir, command, log_file)
self.logger.info("Running build command [%s]", build_cmd)
if process.run_subprocess(build_cmd):
msg = "Executing '{}' failed. The last 20 lines in the build log file are:\n".format(command)
msg += "=========================================================================================================\n"
with open(log_file, "r", encoding="utf-8") as f:
msg += "\t"
msg += "\t".join(f.readlines()[-20:])
msg += "=========================================================================================================\n"
msg += "The full build log is available at [{}].".format(log_file)
raise BuildError(msg)
class DistributionRepository:
def __init__(self, name, distribution_config, template_renderer):
self.name = name
self.cfg = distribution_config
self.template_renderer = template_renderer
self.logger = logging.getLogger(__name__)
@property
def download_url(self):
# Solr distributions never include a JDK, so we always use simple key names
default_key = "{}_url".format(self.name)
# benchmark.ini
override_key = "{}.url".format(self.name)
self.logger.info("keys: [%s] and [%s]", override_key, default_key)
return self._url_for(override_key, default_key)
@property
def file_name(self):
url = self.download_url
return url[url.rfind("/") + 1:]
def plugin_download_url(self, plugin_name):
# cluster_config repo
default_key = "plugin_{}_{}_url".format(plugin_name, self.name)
# benchmark.ini
override_key = "plugin.{}.{}.url".format(plugin_name, self.name)
return self._url_for(override_key, default_key, mandatory=False)
def _url_for(self, user_defined_key, default_key, mandatory=True):
try:
if user_defined_key in self.cfg:
url_template = self.cfg[user_defined_key]
else:
url_template = self.cfg[default_key]
except KeyError:
if mandatory:
raise exceptions.SystemSetupError("Neither config key [{}] nor [{}] is defined.".format(user_defined_key, default_key))
else:
return None
return self.template_renderer.render(url_template)
@property
def cache(self):
k = "{}.cache".format(self.name)
try:
raw_value = self.cfg[k]
except KeyError:
raise exceptions.SystemSetupError("Mandatory config key [%s] is undefined." % k)
try:
return convert.to_bool(raw_value)
except ValueError:
raise exceptions.SystemSetupError("Value [%s] for config key [%s] is not a valid boolean value." % (raw_value, k))