| # SPDX-License-Identifier: Apache-2.0 |
| # |
| # Modifications by Apache Solr contributors; see git log for details. |
| # Licensed under the Apache License, Version 2.0. |
| # |
| # The OpenSearch Contributors require contributions made to |
| # this file be licensed under the Apache-2.0 license or a |
| # compatible open source license. |
| # Modifications Copyright OpenSearch Contributors. See |
| # GitHub history for details. |
| # Licensed to Elasticsearch B.V. under one or more contributor |
| # license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright |
| # ownership. Elasticsearch B.V. licenses this file to you under |
| # the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import errno |
| import functools |
| import json |
| import os |
| import random |
| import socket |
| import time |
| import datetime |
| |
| import pytest |
| |
| from solrorbit import client, config, version, paths |
| from solrorbit.utils import process |
| |
| CONFIG_NAMES = ["in-memory-it"] |
| DISTRIBUTIONS = ["9.10.1", "10.1.0"] |
| WORKLOADS = ["geonames", "nyc_taxis"] |
| BASE_COMMANDS = ["solr-orbit"] |
| ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) |
| |
| |
| def all_benchmark_configs(t): |
| @functools.wraps(t) |
| @pytest.mark.parametrize("cfg", CONFIG_NAMES) |
| def wrapper(cfg, *args, **kwargs): |
| t(cfg, *args, **kwargs) |
| |
| return wrapper |
| |
| |
| def random_benchmark_config(t): |
| @functools.wraps(t) |
| @pytest.mark.parametrize("cfg", [random.choice(CONFIG_NAMES)]) |
| def wrapper(cfg, *args, **kwargs): |
| t(cfg, *args, **kwargs) |
| |
| return wrapper |
| |
| |
| def benchmark_in_mem(t): |
| @functools.wraps(t) |
| @pytest.mark.parametrize("cfg", ["in-memory-it"]) |
| def wrapper(cfg, *args, **kwargs): |
| t(cfg, *args, **kwargs) |
| |
| return wrapper |
| |
| |
| def benchmark_os(t): |
| @functools.wraps(t) |
| @pytest.mark.parametrize("cfg", ["os-it"]) |
| def wrapper(cfg, *args, **kwargs): |
| t(cfg, *args, **kwargs) |
| |
| return wrapper |
| |
| |
| def solrorbit_command_line_for(cfg, command_line): |
| return f"{random.choice(BASE_COMMANDS)} {command_line} --configuration-name='{cfg}'" |
| |
| |
| def solrorbit(cfg, command_line): |
| """ |
| This method should be used for benchmark invocations of the all commands besides test_run. |
| These commands may have different CLI options than test_run. |
| """ |
| cmd = solrorbit_command_line_for(cfg, command_line) |
| print(f'\n{datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")} Invoking solr-orbit: {cmd}') |
| err, retcode = process.run_subprocess_with_stderr(cmd) |
| if retcode != 0: |
| print(err) |
| return retcode |
| |
| |
| def run_test(cfg, command_line): |
| """ |
| This method should be used for benchmark invocations of the test_run command. |
| It sets up some defaults for how the integration tests expect to run test_runs. |
| """ |
| return solrorbit(cfg, f"run {command_line} --kill-running-processes --on-error='abort'") |
| |
| |
| def shell_cmd(command_line): |
| """ |
| Executes a given command_line in a subshell. |
| |
| :param command_line: (str) The command to execute |
| :return: (int) the exit code |
| """ |
| |
| return os.system(command_line) |
| |
| |
| def command_in_docker(command_line, python_version): |
| docker_command = f"docker run --rm -v {ROOT_DIR}:/benchmark_ro:ro python:{python_version} bash -c '{command_line}'" |
| |
| return shell_cmd(docker_command) |
| |
| |
| def wait_until_port_is_free(port_number=39200, timeout=120): |
| start = time.perf_counter() |
| end = start + timeout |
| while time.perf_counter() < end: |
| c = socket.socket() |
| connect_result = c.connect_ex(("127.0.0.1", port_number)) |
| # noinspection PyBroadException |
| try: |
| if connect_result == errno.ECONNREFUSED: |
| c.close() |
| return |
| else: |
| c.close() |
| time.sleep(0.5) |
| except Exception: |
| pass |
| |
| raise TimeoutError(f"Port [{port_number}] is occupied after [{timeout}] seconds") |
| |
| |
| def check_prerequisites(): |
| if process.run_subprocess_with_logging("docker ps") != 0: |
| raise AssertionError("Docker must be installed and the daemon must be up and running to run integration tests.") |
| if process.run_subprocess_with_logging("docker-compose --help") != 0: |
| raise AssertionError("Docker Compose is required to run integration tests.") |
| |
| |
| class ConfigFile: |
| def __init__(self, config_name): |
| self.user_home = os.getenv("BENCHMARK_HOME", os.path.expanduser("~")) |
| self.benchmark_home = paths.benchmark_confdir() |
| if config_name is not None: |
| self.config_file_name = f"benchmark-{config_name}.ini" |
| else: |
| self.config_file_name = "benchmark.ini" |
| self.source_path = os.path.join(os.path.dirname(__file__), "resources", self.config_file_name) |
| self.target_path = os.path.join(self.benchmark_home, self.config_file_name) |
| |
| |
| class TestCluster: |
| def __init__(self, cfg): |
| self.cfg = cfg |
| self.installation_id = None |
| self.http_port = None |
| |
| def install(self, distribution_version, node_name, cluster_config, http_port): |
| self.http_port = http_port |
| transport_port = http_port + 100 |
| try: |
| err, retcode = process.run_subprocess_with_stderr( |
| "solr-orbit install --configuration-name={cfg} --distribution-version={dist} --build-type=tar " |
| "--http-port={http_port} --node={node_name} --master-nodes=" |
| "{node_name} --cluster-config={cluster_config} " |
| "--seed-hosts=\"127.0.0.1:{transport_port}\"".format(cfg=self.cfg, |
| dist=distribution_version, |
| http_port=http_port, |
| node_name=node_name, |
| cluster_config=cluster_config, |
| transport_port=transport_port)) |
| if retcode != 0: |
| raise AssertionError("Failed to install node {}.".format(distribution_version), err) |
| self.installation_id = json.loads(err)["installation-id"] |
| except BaseException as e: |
| raise AssertionError("Failed to install node {}.".format(distribution_version), e) |
| |
| def start(self, test_run_id): |
| cmd = "start --runtime-jdk=\"bundled\" --installation-id={} --test-run-id={}".format(self.installation_id, test_run_id) |
| if solrorbit(self.cfg, cmd) != 0: |
| raise AssertionError("Failed to start test cluster.") |
| solr_client = client.ClientFactory(hosts=[{"host": "127.0.0.1", "port": self.http_port}], client_options={}).create() |
| client.wait_for_rest_layer(solr_client) |
| |
| def stop(self): |
| if self.installation_id: |
| if solrorbit(self.cfg, "stop --installation-id={}".format(self.installation_id)) != 0: |
| raise AssertionError("Failed to stop test cluster.") |
| |
| def __str__(self): |
| return f"TestCluster[installation-id={self.installation_id}]" |
| |
| |
| class MetricsStore: |
| VERSION = "1.3.9" |
| |
| def __init__(self): |
| self.cluster = TestCluster("in-memory-it") |
| |
| def start(self): |
| self.cluster.install(distribution_version=MetricsStore.VERSION, |
| node_name="metrics-store", |
| cluster_config="defaults", |
| http_port=10200) |
| self.cluster.start(test_run_id="metrics-store") |
| |
| def stop(self): |
| self.cluster.stop() |
| |
| |
| def install_integration_test_config(): |
| def copy_config(name): |
| source_path = os.path.join(os.path.dirname(__file__), "resources", f"benchmark-{name}.ini") |
| f = config.ConfigFile(name) |
| f.store_default_config(template_path=source_path) |
| |
| for n in CONFIG_NAMES: |
| copy_config(n) |
| |
| |
| def remove_integration_test_config(): |
| for config_name in CONFIG_NAMES: |
| os.remove(config.ConfigFile(config_name).location) |
| |
| |
| METRICS_STORE = MetricsStore() |
| |
| |
| def get_license(): |
| with open(os.path.join(ROOT_DIR, 'LICENSE')) as license_file: |
| return license_file.readlines()[1].strip() |
| |
| |
| def build_docker_image(): |
| benchmark_version = version.__version__ |
| |
| env_variables = os.environ.copy() |
| env_variables['BENCHMARK_VERSION'] = benchmark_version |
| env_variables['BENCHMARK_LICENSE'] = get_license() |
| |
| command = f"docker build -t apache/solr-orbit:{benchmark_version}" \ |
| f" --build-arg BENCHMARK_VERSION --build-arg BENCHMARK_LICENSE " \ |
| f"-f {ROOT_DIR}/docker/Dockerfiles/Dockerfile-dev {ROOT_DIR}" |
| |
| if process.run_subprocess_with_logging(command, env=env_variables) != 0: |
| raise AssertionError("It was not possible to build the docker image from Dockerfile-dev") |
| |
| |
| def setup_module(): |
| check_prerequisites() |
| install_integration_test_config() |
| METRICS_STORE.start() |
| |
| |
| def teardown_module(): |
| METRICS_STORE.stop() |
| remove_integration_test_config() |