| # SPDX-License-Identifier: Apache-2.0 |
| # |
| # Modifications by Apache Solr contributors; see git log for details. |
| # Licensed under the Apache License, Version 2.0. |
| # |
| # The OpenSearch Contributors require contributions made to |
| # this file be licensed under the Apache-2.0 license or a |
| # compatible open source license. |
| # Modifications Copyright OpenSearch Contributors. See |
| # GitHub history for details. |
| # Licensed to Elasticsearch B.V. under one or more contributor |
| # license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright |
| # ownership. Elasticsearch B.V. licenses this file to you under |
| # the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import argparse |
| import datetime |
| import logging |
| import os |
| import platform |
| import sys |
| import time |
| import uuid |
| import shutil |
| |
# macOS fork-safety. Thespian's actor system is built on multiprocessing, and on macOS
# forking after some other thread is already running (for example background telemetry
# polling) can crash in the Objective-C runtime. Exporting
# OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES suppresses that crash. It has to be in place
# before the actor system (and its admin process) is bootstrapped, which is why this
# runs ahead of `import thespian.actors`. A value the user exported explicitly is kept.
# See: https://bugs.python.org/issue33725
# NOTE(review): the Objective-C runtime may only read this variable at process start, so
# it presumably protects processes launched after this point rather than the current one
# — confirm on the supported macOS / Python versions.
if sys.platform == "darwin" and "OBJC_DISABLE_INITIALIZE_FORK_SAFETY" not in os.environ:
    os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"
| |
| import thespian.actors |
| |
| from solrorbit import PROGRAM_NAME, BANNER, FORUM_LINK, SKULL, check_python_version, doc_link, telemetry |
| from solrorbit import version, actor, config, paths, \ |
| test_run_orchestrator, publisher, \ |
| metrics, workload, exceptions, log |
| from solrorbit.builder import cluster_config, builder |
| from solrorbit.synthetic_data_generator import synthetic_data_generator_orchestrator |
| from solrorbit.workload_generator import workload_generator |
| from solrorbit.utils import io, convert, process, console, net, opts, versions |
| from solrorbit import aggregator |
| |
| def create_arg_parser(): |
def positive_number(v):
    """argparse ``type=`` converter that accepts only integers greater than zero.

    :param v: Raw option value.
    :return: ``v`` converted with ``int()``.
    :raises ValueError: If ``int(v)`` fails; argparse reports this as an invalid value.
    :raises argparse.ArgumentTypeError: If the parsed integer is zero or negative.
    """
    number = int(v)
    if number > 0:
        return number
    raise argparse.ArgumentTypeError(f"must be positive but was {number}")
| |
def non_empty_list(arg):
    """argparse ``type=`` converter: turn a comma-separated option value into a non-empty list.

    :param arg: Raw comma-separated option value.
    :return: The list produced by ``opts.csv_to_list(arg)``.
    :raises argparse.ArgumentTypeError: If the value yields no entries.
    """
    lst = opts.csv_to_list(arg)
    if not lst:
        # ArgumentTypeError is what argparse expects from a ``type=`` callable: it re-raises it
        # as an ArgumentError bound to the offending option, so the usage error names the
        # option (e.g. --indices). Raising ArgumentError(argument=None, ...) directly, as before,
        # dropped the option name from the message.
        raise argparse.ArgumentTypeError("At least one argument required!")
    return lst
| |
def runtime_jdk(v):
    """argparse ``type=`` converter for ``--runtime-jdk``.

    :param v: Raw option value.
    :return: The literal ``"bundled"`` unchanged, or the JDK major version as a positive ``int``.
    :raises argparse.ArgumentTypeError: For any other value, including non-numeric text.
    """
    if v == "bundled":
        return v
    try:
        return positive_number(v)
    except (ValueError, argparse.ArgumentTypeError):
        # positive_number() lets int()'s ValueError escape for non-numeric input such as
        # "latest" or "17.0". Previously only ArgumentTypeError was caught, so those inputs got
        # argparse's generic "invalid runtime_jdk value" instead of the explanation below.
        raise argparse.ArgumentTypeError(
            f"must be a positive number or 'bundled' but was {v}") from None
| |
def supported_os_version(v):
    """argparse ``type=`` validator for ``--distribution-version``.

    A falsy value is returned untouched. argparse also runs ``type=`` callables on string
    defaults, and the options using this validator default to ``""``, so the empty string must
    pass. Otherwise ``v`` is compared against ``version.minimum_solr_version()``.

    :param v: Raw option value.
    :return: ``v`` unchanged.
    :raises argparse.ArgumentTypeError: If ``v`` is older than the minimum supported version.
    """
    # NOTE(review): if Version.from_string signals a malformed version with an exception other
    # than ValueError/TypeError/ArgumentTypeError, argparse will not turn it into a usage
    # error — confirm against solrorbit.utils.versions.
    if not v:
        return v
    min_solr = versions.Version.from_string(version.minimum_solr_version())
    requested = versions.Version.from_string(v)
    if requested < min_solr:
        raise argparse.ArgumentTypeError(f"must be at least {min_solr} but was {v}")
    return v
| |
def add_workload_source(subparser):
    """Register the options that say where workloads come from on ``subparser``.

    ``--workload-repository`` and ``--workload-path`` share a mutually exclusive group.
    ``--workload-revision`` is a separate option and can be combined with either.

    :param subparser: An ``argparse.ArgumentParser`` (typically a sub-command parser).
    """
    source_group = subparser.add_mutually_exclusive_group()
    # argparse fills this "default" value into the namespace even when --workload-path is
    # given. The group only rejects passing both options explicitly, so consumers have to give
    # workload_path precedence themselves.
    source_group.add_argument(
        "--workload-repository",
        default="default",
        help="Define the repository from where solr-orbit will load workloads (default: default).",
    )
    source_group.add_argument("--workload-path", help="Define the path to a workload.")
    subparser.add_argument(
        "--workload-revision",
        default=None,
        help="Define a specific revision in the workload repository that solr-orbit should use.",
    )
| |
| # try to preload configurable defaults, but this does not work together with `--configuration-name` (which is undocumented anyway) |
| cfg = config.Config() |
| if cfg.config_present(): |
| cfg.load_config() |
| preserve_install = cfg.opts("defaults", "preserve_benchmark_candidate", default_value=False, mandatory=False) |
| else: |
| preserve_install = False |
| |
| parser = argparse.ArgumentParser(prog=PROGRAM_NAME, |
| description=BANNER + "\n\n A macrobenchmarking tool for Apache Solr", |
| epilog="Find out more at {}".format(console.format.link(doc_link())), |
| formatter_class=argparse.RawDescriptionHelpFormatter) |
| parser.add_argument('-v', '--version', action='version', version="%(prog)s " + version.version()) |
| |
| if len(sys.argv) == 1: |
| parser.print_help() |
| sys.exit(1) |
| |
| subparsers = parser.add_subparsers( |
| title="subcommands", |
| dest="subcommand", |
| help="") |
| |
| test_run_parser = subparsers.add_parser("run", help="Run a benchmark") |
| # change in favor of "list telemetry", "list workloads", "list pipelines" |
| list_parser = subparsers.add_parser("list", help="List configuration options") |
| list_parser.add_argument( |
| "configuration", |
| metavar="configuration", |
| help="The configuration for which the tool should show the available options. " |
| "Possible values are: telemetry, workloads, pipelines, test-runs, cluster-configs", |
| choices=["telemetry", "workloads", "pipelines", "test-runs", "aggregated-results", |
| "cluster-configs"]) |
| list_parser.add_argument( |
| "--limit", |
| help="Limit the number of search results for recent test-runs (default: 10).", |
| default=10, |
| ) |
| add_workload_source(list_parser) |
| |
| info_parser = subparsers.add_parser("info", help="Show info about a workload") |
| add_workload_source(info_parser) |
| info_parser.add_argument( |
| "--workload", |
| "-w", |
| help=f"Define the workload to use. List possible workloads with `{PROGRAM_NAME} list workloads`." |
| # we set the default value later on because we need to determine whether the user has provided this value. |
| # default="geonames" |
| ) |
| |
| info_parser.add_argument( |
| "--workload-params", |
| "-wp", |
| help="Define a comma-separated list of key:value pairs that are injected verbatim to the workload as variables.", |
| default="" |
| ) |
| info_parser.add_argument( |
| "--test-procedure", |
| help=f"Define the test_procedure to use. List possible test_procedures for workloads with `{PROGRAM_NAME} list workloads`." |
| ) |
| info_task_filter_group = info_parser.add_mutually_exclusive_group() |
| info_task_filter_group.add_argument( |
| "--include-tasks", |
| help="Defines a comma-separated list of tasks to run. By default all tasks of a test_procedure are run.") |
| info_task_filter_group.add_argument( |
| "--exclude-tasks", |
| help="Defines a comma-separated list of tasks not to run. By default all tasks of a test_procedure are run.") |
| |
| synthetic_data_generator_parser = subparsers.add_parser("generate-data", |
| help="Generate synthetic data based on existing index mappings or custom module." + |
| "This data can be ported into Solr Orbit workloads." ) |
| |
| exclusive_file_inputs = synthetic_data_generator_parser.add_mutually_exclusive_group(required=True) |
| exclusive_file_inputs.add_argument( |
| "--index-mappings", |
| "-i", |
| help="Index mappings (JSON) to generate synthetic data from." |
| ) |
| exclusive_file_inputs.add_argument( |
| "--custom-module", |
| "-m", |
| help="Custom Python module that defines how to generate documents. " + |
| "It can contain function definitions and even class definitions. " + |
| "This gives users more granular control over how data is generated. " + |
| "This module must contain generate_synthetic_document() definition." |
| ) |
| |
| exclusive_params = synthetic_data_generator_parser.add_mutually_exclusive_group(required=True) |
| exclusive_params.add_argument( |
| "--total-size", |
| "-s", |
| type=int, |
| help="Total size in GB of synthetically generated data corpora" |
| ) |
| synthetic_data_generator_parser.add_argument( |
| "--index-name", |
| "-n", |
| required=True, |
| help="Index name associated with generated corpora" |
| ) |
| synthetic_data_generator_parser.add_argument( |
| "--output-path", |
| "-p", |
| default=os.path.join(os.getcwd(), "generated_corpora"), |
| help="Output path for data corpora. Data corpora will be written in a directory." |
| ) |
| synthetic_data_generator_parser.add_argument( |
| "--custom-config", |
| "-c", |
| default=None, |
| help="Optional config where users can specify overrides for mapping synthetic data generator or values that module should use." |
| ) |
| synthetic_data_generator_parser.add_argument( |
| "--test-document", |
| "-t", |
| default=False, |
| action="store_true", |
| help="Generates a single synthetic document and displays it to the console so that users can validate generated values and output." |
| ) |
| |
| create_workload_parser = subparsers.add_parser("create-workload", help="Create a workload from existing data") |
| create_workload_parser.add_argument( |
| "--workload", |
| "-w", |
| required=True, |
| help="Name of the generated workload") |
| create_workload_parser.add_argument( |
| "--indices", |
| "-i", |
| type=non_empty_list, |
| required=True, |
| help="Comma-separated list of indices to include in the workload") |
| create_workload_parser.add_argument( |
| "--target-hosts", |
| "-t", |
| default="", |
| required=True, |
| help="Comma-separated list of host:port pairs which should be targeted") |
| create_workload_parser.add_argument( |
| "--client-options", |
| "-c", |
| default=opts.ClientOptions.DEFAULT_CLIENT_OPTIONS, |
| help=f"Comma-separated list of client options to use. (default: {opts.ClientOptions.DEFAULT_CLIENT_OPTIONS})") |
| create_workload_parser.add_argument( |
| "--output-path", |
| default=os.path.join(os.getcwd(), "workloads"), |
| help="Workload output directory (default: workloads/)") |
| create_workload_parser.add_argument( |
| "--custom-queries", |
| type=argparse.FileType('r'), |
| help="Input JSON file to use containing custom workload queries that override the default match_all query") |
| create_workload_parser.add_argument( |
| "--number-of-docs", |
| action=opts.StoreKeyPairAsDict, |
| nargs='+', |
| metavar="KEY:VAL", |
| help="Map of index name and integer doc count to extract. Ensure that index name also exists in --indices parameter. " + |
| "To specify several indices and doc counts, use format: <index1>:<doc_count1> <index2>:<doc_count2> ...") |
| create_workload_parser.add_argument( |
| "--sample-frequency", |
| action=opts.StoreKeyPairAsDict, |
| nargs='+', |
| metavar="KEY:VAL", |
| help="Map of index name and an integer, representing the sample frequency of docs that should be extracted per index. " + |
| "Ensure that index name also exists in --indices parameter. " + |
| "To specify several indices and doc counts, use format: <index1>:<sample-frequency-1> <index2>:<sample-frequency-2> ...") |
| |
| convert_workload_parser = subparsers.add_parser( |
| "convert-workload", |
| help="Convert an OpenSearch Benchmark workload to Solr-native format" |
| ) |
| convert_workload_parser.add_argument( |
| "--workload-path", |
| required=True, |
| help="Path to the source OpenSearch Benchmark workload directory (must contain workload.json)." |
| ) |
| convert_workload_parser.add_argument( |
| "--output-path", |
| default=None, |
| help="Path where the converted Solr workload will be written " |
| "(default: <workload-path>-solr)." |
| ) |
| convert_workload_parser.add_argument( |
| "--force", |
| action="store_true", |
| default=False, |
| help="Overwrite an existing converted workload directory." |
| ) |
| |
| compare_parser = subparsers.add_parser("compare", help="Compare two test_runs") |
| compare_parser.add_argument( |
| "--baseline", |
| "-b", |
| required=True, |
| help=f"TestRun ID of the baseline (see {PROGRAM_NAME} list test-runs).") |
| compare_parser.add_argument( |
| "--contender", |
| "-c", |
| required=True, |
| help=f"TestRun ID of the contender (see {PROGRAM_NAME} list test-runs).") |
| compare_parser.add_argument( |
| "--percentiles", |
| help=f"A comma-separated list of percentiles to report latency and service time." |
| f"(default: {metrics.GlobalStatsCalculator.DEFAULT_LATENCY_PERCENTILES}).", |
| default=metrics.GlobalStatsCalculator.DEFAULT_LATENCY_PERCENTILES) |
| compare_parser.add_argument( |
| "--results-format", |
| help="Define the output format for the command line results (default: markdown).", |
| choices=["markdown", "csv"], |
| default="markdown") |
| compare_parser.add_argument( |
| "--results-numbers-align", |
| help="Define the output column number alignment for the command line results (default: right).", |
| choices=["right", "center", "left", "decimal"], |
| default="right") |
| compare_parser.add_argument( |
| "--results-file", |
| help="Write the command line results also to the provided file.", |
| default="") |
| compare_parser.add_argument( |
| "--show-in-results", |
| help="Whether to include the comparison in the results file.", |
| default=True) |
| |
| visualize_parser = subparsers.add_parser("visualize", help="Generate HTML visualization for a test run") |
| visualize_parser.add_argument( |
| "--test-run-id", |
| "-tid", |
| dest="test_run_id", |
| required=True, |
| help=f"TestRun ID to visualize (see {PROGRAM_NAME} list test_runs).") |
| visualize_parser.add_argument( |
| "--output-path", |
| dest="output_path", |
| help="Path where the HTML report should be saved. If not specified, it will be saved in the test run directory, where test_run.json can be found.", |
| default=None) |
| |
| aggregate_parser = subparsers.add_parser("aggregate", help="Aggregate multiple test-runs") |
| aggregate_parser.add_argument( |
| "--test-runs", |
| type=non_empty_list, |
| required=True, |
| help="Comma-separated list of TestRun IDs to aggregate") |
| aggregate_parser.add_argument( |
| "--test-runs-id", |
| "-tid", |
| help="Define a unique id for this aggregated test-run.", |
| default="") |
| aggregate_parser.add_argument( |
| "--results-file", |
| help="Write the aggregated results to the provided file.", |
| default="") |
| aggregate_parser.add_argument( |
| "--workload-repository", |
| help="Define the repository from where solr-orbit will load workloads (default: default).", |
| default="default") |
| |
| download_parser = subparsers.add_parser("download", help="Downloads an artifact") |
| download_parser.add_argument( |
| "--cluster-config-repository", |
| help="Define the repository from where solr-orbit will load cluster-configs (default: default).", |
| default="default") |
| download_parser.add_argument( |
| "--cluster-config-revision", |
| help="Define a specific revision in the cluster-config repository that solr-orbit should use.", |
| default=None) |
| download_parser.add_argument( |
| "--cluster-config-path", |
| help="Define the path to the cluster-config and plugin configurations to use.") |
| download_parser.add_argument( |
| "--distribution-version", |
| type=supported_os_version, |
| help="Define the version of the distribution to download. " |
| "Check https://projects.apache.org/project.html?solr for released versions.", |
| default="") |
| download_parser.add_argument( |
| "--distribution-repository", |
| help="Define the repository from where the distribution should be downloaded (default: release).", |
| default="release") |
| download_parser.add_argument( |
| "--cluster-config", |
| help=f"Define the cluster-config to use. List possible " |
| f"cluster-configs with `{PROGRAM_NAME} list " |
| f"cluster-configs` (default: defaults).", |
| default="defaults") # optimized for local usage |
| download_parser.add_argument( |
| "--cluster-config-params", |
| help="Define a comma-separated list of key:value pairs that are injected verbatim as variables for the cluster-config.", |
| default="" |
| ) |
| |
| install_parser = subparsers.add_parser("install", help="Installs a Solr node locally") |
| install_parser.add_argument( |
| "--revision", |
| help="Define the source code revision for building the benchmark candidate. 'current' uses the source tree as is," |
| " 'latest' fetches the latest version on main. It is also possible to specify a commit id or a timestamp." |
| " The timestamp must be specified as: \"@ts\" where \"ts\" must be a valid ISO 8601 timestamp, " |
| "e.g. \"@2013-07-27T10:37:00Z\" (default: current).", |
| default="current") # optimized for local usage, don't fetch sources |
| # Intentionally undocumented as we do not consider Docker a fully supported option. |
| install_parser.add_argument( |
| "--build-type", |
| help=argparse.SUPPRESS, |
| choices=["tar", "docker"], |
| default="tar") |
| install_parser.add_argument( |
| "--cluster-config-repository", |
| help="Define the repository from where solr-orbit will load cluster-configs (default: default).", |
| default="default") |
| install_parser.add_argument( |
| "--cluster-config-revision", |
| help="Define a specific revision in the cluster-config repository that solr-orbit should use.", |
| default=None) |
| install_parser.add_argument( |
| "--cluster-config-path", |
| help="Define the path to the cluster-config and plugin configurations to use.") |
| install_parser.add_argument( |
| "--runtime-jdk", |
| type=runtime_jdk, |
| help="The major version of the runtime JDK to use during installation.", |
| default=None) |
| install_parser.add_argument( |
| "--distribution-repository", |
| help="Define the repository from where the distribution should be downloaded (default: release).", |
| default="release") |
| install_parser.add_argument( |
| "--distribution-version", |
| type=supported_os_version, |
| help="Define the version of the distribution to download. " |
| "Check https://archive.apache.org/dist/solr/solr/ for released versions.", |
| default="") |
| install_parser.add_argument( |
| "--cluster-config", |
| help=f"Define the cluster-config to use. List possible " |
| f"cluster-configs with `{PROGRAM_NAME} list " |
| f"cluster-configs` (default: defaults).", |
| default="defaults") # optimized for local usage |
| install_parser.add_argument( |
| "--cluster-config-params", |
| help="Define a comma-separated list of key:value pairs that are injected verbatim as variables for the cluster-config.", |
| default="" |
| ) |
| install_parser.add_argument( |
| "--solr-modules", |
| help="Comma-separated list of Solr modules to enable (sets SOLR_MODULES). " |
| "Example: --solr-modules=analytics,extraction", |
| default="") |
| install_parser.add_argument( |
| "--plugin-params", |
| help="Define a comma-separated list of key:value pairs that are injected verbatim to all plugins as variables.", |
| default="" |
| ) |
| install_parser.add_argument( |
| "--network-host", |
| help="The IP address to bind to and publish", |
| default="127.0.0.1" |
| ) |
| install_parser.add_argument( |
| "--http-port", |
| help="The port to expose for HTTP traffic", |
| default="38983" |
| ) |
| install_parser.add_argument( |
| "--node-name", |
| help="The name of this Solr node", |
| default="benchmark-node-0" |
| ) |
| install_parser.add_argument( |
| "--master-nodes", |
| help="A comma-separated list of the initial master node names", |
| default="" |
| ) |
| install_parser.add_argument( |
| "--seed-hosts", |
| help="A comma-separated list of the initial seed host IPs", |
| default="" |
| ) |
| |
| start_parser = subparsers.add_parser("start", help="Starts a Solr node locally") |
| start_parser.add_argument( |
| "--installation-id", |
| required=True, |
| help="The id of the installation to start", |
| # the default will be dynamically derived by |
| # test_run_orchestrator based on the |
| # presence / absence of other command line options |
| default="") |
| start_parser.add_argument( |
| "--test-run-id", |
| "-tid", |
| required=True, |
| help="Define a unique id for this test_run.", |
| default="") |
| start_parser.add_argument( |
| "--runtime-jdk", |
| type=runtime_jdk, |
| help="The major version of the runtime JDK to use.", |
| default=None) |
| start_parser.add_argument( |
| "--telemetry", |
| help=f"Enable the provided telemetry devices, provided as a comma-separated list. List possible telemetry " |
| f"devices with `{PROGRAM_NAME} list telemetry`.", |
| default="") |
| start_parser.add_argument( |
| "--telemetry-params", |
| help="Define a comma-separated list of key:value pairs that are injected verbatim to the telemetry devices as parameters.", |
| default="" |
| ) |
| |
| stop_parser = subparsers.add_parser("stop", help="Stops a Solr node locally") |
| stop_parser.add_argument( |
| "--installation-id", |
| required=True, |
| help="The id of the installation to stop", |
| # the default will be dynamically derived by |
| # test_run_orchestrator based on the |
| # presence / absence of other command line options |
| default="") |
| stop_parser.add_argument( |
| "--preserve-install", |
| help=f"Keep the benchmark candidate and its index. (default: {str(preserve_install).lower()}).", |
| default=preserve_install, |
| action="store_true") |
| |
| for p in [list_parser, test_run_parser]: |
| p.add_argument( |
| "--distribution-version", |
| type=supported_os_version, |
| help="Define the version of the distribution to download.", |
| default="") |
| p.add_argument( |
| "--cluster-config-path", |
| help="Define the path to the cluster-config and plugin configurations to use.") |
| p.add_argument( |
| "--cluster-config-repository", |
| help="Define repository from where solr-orbit will load cluster-configs (default: default).", |
| default="default") |
| p.add_argument( |
| "--cluster-config-revision", |
| help="Define a specific revision in the cluster-config repository that solr-orbit should use.", |
| default=None) |
| |
| test_run_parser.add_argument( |
| "--test-run-id", |
| "-tid", |
| help="Define a unique id for this test-run.", |
| default=str(uuid.uuid4())) |
| test_run_parser.add_argument( |
| "--pipeline", |
| help="Select the pipeline to run.", |
| # the default will be dynamically derived by |
| # test_run_orchestrator based on the |
| # presence / absence of other command line options |
| default="") |
| test_run_parser.add_argument( |
| "--revision", |
| help="Define the source code revision for building the benchmark candidate. 'current' uses the source tree as is," |
| " 'latest' fetches the latest version on main. It is also possible to specify a commit id or a timestamp." |
| " The timestamp must be specified as: \"@ts\" where \"ts\" must be a valid ISO 8601 timestamp, " |
| "e.g. \"@2013-07-27T10:37:00Z\" (default: current).", |
| default="current") # optimized for local usage, don't fetch sources |
| add_workload_source(test_run_parser) |
| test_run_parser.add_argument( |
| "--workload", |
| "-w", |
| help=f"Define the workload to use. List possible workloads with `{PROGRAM_NAME} list workloads`." |
| ) |
| test_run_parser.add_argument( |
| "--workload-params", |
| "-wp", |
| help="Define a comma-separated list of key:value pairs that are injected verbatim to the workload as variables.", |
| default="" |
| ) |
| test_run_parser.add_argument( |
| "--test-procedure", |
| help=f"Define the test_procedure to use. List possible test_procedures for workloads with `{PROGRAM_NAME} list workloads`.") |
| test_run_parser.add_argument( |
| "--cluster-config", |
| help=f"Define the cluster-config to use. List possible " |
| f"cluster-configs with `{PROGRAM_NAME} list " |
| f"cluster-configs` (default: defaults).", |
| default="defaults") # optimized for local usage |
| test_run_parser.add_argument( |
| "--cluster-config-params", |
| help="Define a comma-separated list of key:value pairs that are injected verbatim as variables for the cluster-config.", |
| default="" |
| ) |
| test_run_parser.add_argument( |
| "--runtime-jdk", |
| type=runtime_jdk, |
| help="The major version of the runtime JDK to use.", |
| default=None) |
| test_run_parser.add_argument( |
| "--solr-modules", |
| help="Comma-separated list of Solr modules to enable (sets SOLR_MODULES). " |
| "Example: --solr-modules=analytics,extraction", |
| default="") |
| test_run_parser.add_argument( |
| "--plugin-params", |
| help="Define a comma-separated list of key:value pairs that are injected verbatim to all plugins as variables.", |
| default="" |
| ) |
| test_run_parser.add_argument( |
| "--target-hosts", |
| "-t", |
| help="Define a comma-separated list of host:port pairs which should be targeted if using the pipeline 'benchmark-only' " |
| "(default: localhost:9200).", |
| default="") # actually the default is pipeline specific and it is set later |
| test_run_parser.add_argument( |
| "--worker-ips", |
| help="Define a comma-separated list of hosts which should generate load (default: localhost).", |
| default="localhost") |
| test_run_parser.add_argument( |
| "--grpc-target-hosts", |
| help="Define a comma-separated list of host:port pairs for gRPC endpoints " |
| "(default: localhost:9400).", |
| default="") |
| test_run_parser.add_argument( |
| "--client-options", |
| "-c", |
| help=f"Define a comma-separated list of client options to use. The options will be passed to the benchmark " |
| f"client (default: {opts.ClientOptions.DEFAULT_CLIENT_OPTIONS}).", |
| default=opts.ClientOptions.DEFAULT_CLIENT_OPTIONS) |
| test_run_parser.add_argument("--on-error", |
| choices=["continue", "abort"], |
| help="Controls how solr-orbit behaves on response errors (default: continue).", |
| default="continue") |
| test_run_parser.add_argument( |
| "--telemetry", |
| help=f"Enable the provided telemetry devices, provided as a comma-separated list. List possible telemetry " |
| f"devices with `{PROGRAM_NAME} list telemetry`.", |
| default="") |
| test_run_parser.add_argument( |
| "--telemetry-params", |
| help="Define a comma-separated list of key:value pairs that are injected verbatim to the telemetry devices as parameters.", |
| default="" |
| ) |
| test_run_parser.add_argument( |
| "--distribution-repository", |
| help="Define the repository from where the distribution should be downloaded (default: release).", |
| default="release") |
| |
| task_filter_group = test_run_parser.add_mutually_exclusive_group() |
| task_filter_group.add_argument( |
| "--include-tasks", |
| help="Defines a comma-separated list of tasks to run. By default all tasks of a test_procedure are run.") |
| task_filter_group.add_argument( |
| "--exclude-tasks", |
| help="Defines a comma-separated list of tasks not to run. By default all tasks of a test_procedure are run.") |
| test_run_parser.add_argument( |
| "--user-tag", |
| help="Define a user-specific key-value pair (separated by ':'). It is added to each metric record as meta info. " |
| "Example: intention:baseline-ticket-12345", |
| default="") |
| test_run_parser.add_argument( |
| "--results-format", |
| help="Define the output format for the command line results (default: markdown).", |
| choices=["markdown", "csv"], |
| default="markdown") |
| test_run_parser.add_argument( |
| "--results-numbers-align", |
| help="Define the output column number alignment for the command line results (default: right).", |
| choices=["right", "center", "left", "decimal"], |
| default="right") |
| test_run_parser.add_argument( |
| "--show-in-results", |
| help="Define which values are shown in the summary results published (default: available).", |
| choices=["available", "all-percentiles", "all"], |
| default="available") |
| test_run_parser.add_argument( |
| "--results-file", |
| help="Write the command line results also to the provided file.", |
| default="") |
| test_run_parser.add_argument( |
| "--preserve-install", |
| help=f"Keep the benchmark candidate and its index. (default: {str(preserve_install).lower()}).", |
| default=preserve_install, |
| action="store_true") |
| test_run_parser.add_argument( |
| "--test-mode", |
| help="Runs the given workload in 'test mode'. Meant to check a workload for errors but not for real benchmarks (default: false).", |
| default=False, |
| action="store_true") |
| test_run_parser.add_argument( |
| "--enable-worker-coordinator-profiling", |
| help="Enables a profiler for analyzing the performance of calls in solr-orbit's worker coordinator (default: false).", |
| default=False, |
| action="store_true") |
| test_run_parser.add_argument( |
| "--enable-assertions", |
| help="Enables assertion checks for tasks (default: false).", |
| default=False, |
| action="store_true") |
| test_run_parser.add_argument( |
| "--kill-running-processes", |
| "-k", |
| action="store_true", |
| default=False, |
| help="If any processes is running, it is going to kill them and allow solr-orbit to continue to run." |
| ) |
| test_run_parser.add_argument( |
| "--latency-percentiles", |
| help=f"A comma-separated list of percentiles to report for latency " |
| f"(default: {metrics.GlobalStatsCalculator.DEFAULT_LATENCY_PERCENTILES}).", |
| default=metrics.GlobalStatsCalculator.DEFAULT_LATENCY_PERCENTILES |
| ) |
| test_run_parser.add_argument( |
| "--throughput-percentiles", |
| help=f"A comma-separated list of percentiles to report for throughput, in addition to mean/median/max/min " |
| f"(default: {metrics.GlobalStatsCalculator.DEFAULT_THROUGHPUT_PERCENTILES}).", |
| default=metrics.GlobalStatsCalculator.DEFAULT_THROUGHPUT_PERCENTILES |
| ) |
| test_run_parser.add_argument( |
| "--randomization-enabled", |
| help="Runs the given workload with query randomization enabled (default: false).", |
| default=False, |
| action="store_true") |
| test_run_parser.add_argument( |
| "--randomization-repeat-frequency", |
| help=f"The repeat_frequency for query randomization. Ignored if randomization is off" |
| f"(default: {workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_RF}).", |
| default=workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_RF) |
| test_run_parser.add_argument( |
| "--randomization-n", |
| help=f"The number of standard values to generate for each field for query randomization." |
| f"Ignored if randomization is off (default: {workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_N}).", |
| default=workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_N) |
| test_run_parser.add_argument( |
| "--randomization-alpha", |
| help=f"The alpha parameter used for the Zipf distribution for query randomization. Low values spread the distribution out, " |
| f"high values favor the most common queries. " |
| f"Ignored if randomization is off (default: {workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_ALPHA}).", |
| default=workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_ALPHA) |
| test_run_parser.add_argument( |
| "--test-iterations", |
| help="The number of times to run the workload (default: 1).", |
| default=1) |
| test_run_parser.add_argument( |
| "--aggregate", |
| type=lambda x: (str(x).lower() in ['true', '1', 'yes', 'y']), |
| help="Aggregate the results of multiple test runs (default: true).", |
| default=True) |
| test_run_parser.add_argument( |
| "--sleep-timer", |
| help="Sleep for the specified number of seconds before starting the next test run (default: 5).", |
| default=5) |
| test_run_parser.add_argument( |
| "--cancel-on-error", |
| action="store_true", |
| help="Stop running tests if an error occurs in one of the test iterations (default: false).", |
| ) |
| test_run_parser.add_argument( |
| "--load-test-qps", |
| help="Run a load test on your cluster, up to a certain QPS value (default: 0)", |
| default=0 |
| ) |
| test_run_parser.add_argument( |
| "--redline-test", |
| help="Run a redline test on your cluster, up to a certain QPS value (default: 1000)", |
| nargs='?', |
| const=1000, # Value to use when flag is present but no value given |
| default=0, # Value to use when flag is not present |
| type=int |
| ) |
| test_run_parser.add_argument( |
| "--redline-scale-step", |
| type=int, |
| help="How many clients to add while scaling up during redline testing (default: 5).", |
| default=None |
| ) |
| test_run_parser.add_argument( |
| "--redline-scaledown-percentage", |
| type=float, |
| help="What percentage of clients to remove when errors occur (default: 10%%).", |
| default=None |
| ) |
| test_run_parser.add_argument( |
| "--redline-post-scaledown-sleep", |
| type=int, |
| help="How many seconds to wait before scaling up again after a scale down (default: 30).", |
| default=None |
| ) |
| test_run_parser.add_argument( |
| "--redline-max-clients", |
| type=int, |
| help="Maximum number of clients to allow during redline testing. If not set, will default to clients defined in the test procedure.", |
| default=None |
| ) |
| test_run_parser.add_argument( |
| "--redline-max-cpu-usage", |
| type=int, |
| help="Maximum CPU utilization before scaling back client numbers. Used to activate CPU-based feedback in solr-orbit.", |
| default=None |
| ) |
| test_run_parser.add_argument( |
| "--redline-cpu-window-seconds", |
| type=int, |
| help="How many seconds the window for average CPU load should be in seconds during CPU-based redline testing. (Default: 30)", |
| default=None |
| ) |
| test_run_parser.add_argument( |
| "--redline-cpu-check-interval", |
| type=int, |
| help="How many seconds between CPU checks there should be during CPU-based redline testing. (Default: 30)", |
| default=None |
| ) |
| test_run_parser.add_argument( |
| "--visualize", |
| help="Generate HTML visualizations for benchmark results. Stored in the test runs directory by default", |
| action="store_true", |
| default=False |
| ) |
| test_run_parser.add_argument( |
| "--visualize-output-path", |
| help="Path where the HTML visualization should be saved when --visualize is enabled. If not specified, it will be saved in the test run directory.", |
| default=None |
| ) |
| |
| ############################################################################### |
| # |
| # The options below are undocumented and can be removed or changed at any time. |
| # |
| ############################################################################### |
| # This option is intended to tell solr-orbit to assume a different start date than 'now'. This is effectively just useful for things like |
| # backtesting or a benchmark run across environments (think: comparison of EC2 and bare metal) but never for the typical user. |
| test_run_parser.add_argument( |
| "--effective-start-date", |
| help=argparse.SUPPRESS, |
| type=lambda s: datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S"), |
| default=None) |
| # Skips checking that the REST API is available before proceeding with the benchmark |
| test_run_parser.add_argument( |
| "--skip-rest-api-check", |
| help=argparse.SUPPRESS, |
| action="store_true", |
| default=False) |
| |
| for p in [list_parser, test_run_parser, compare_parser, aggregate_parser, |
| download_parser, install_parser, start_parser, stop_parser, info_parser, |
| synthetic_data_generator_parser, create_workload_parser, visualize_parser, |
| convert_workload_parser]: |
| # This option is needed to support a separate configuration for the integration tests on the same machine |
| p.add_argument( |
| "--configuration-name", |
| help=argparse.SUPPRESS, |
| default=None) |
| p.add_argument( |
| "--quiet", |
| help="Suppress as much as output as possible (default: false).", |
| default=False, |
| action="store_true") |
| p.add_argument( |
| "--offline", |
| help="Assume that solr-orbit has no connection to the Internet (default: false).", |
| default=False, |
| action="store_true") |
| |
| return parser |
| |
| |
def dispatch_list(cfg):
    """Print the entities named by the ``system/list.config.option`` setting.

    :param cfg: the configuration object; its ``list.config.option`` selects what to list.
    :raises exceptions.SystemSetupError: if the option is not one of the supported kinds.
    """
    option = cfg.opts("system", "list.config.option")
    # Each entry is wrapped in a lambda so the module attribute is only looked up for the
    # option that was actually requested.
    listers = {
        "telemetry": lambda: telemetry.list_telemetry(),
        "workloads": lambda: workload.list_workloads(cfg),
        "pipelines": lambda: test_run_orchestrator.list_pipelines(),
        "test-runs": lambda: metrics.list_test_runs(cfg),
        "aggregated-results": lambda: metrics.list_aggregated_results(cfg),
        "cluster-configs": lambda: cluster_config.list_cluster_configs(cfg),
    }
    lister = listers.get(option)
    if lister is None:
        raise exceptions.SystemSetupError("Cannot list unknown configuration option [%s]" % option)
    lister()
| |
def dispatch_visualize(cfg):
    """Render the HTML report of a stored test run and optionally copy it to a user-given path.

    Reads the test run id from ``system/test_run.id`` and the optional destination from
    ``visualize/output.path``.

    :param cfg: the configuration object.
    :raises exceptions.SystemSetupError: if no test run with the given id exists, or if loading,
        rendering or copying the report fails for any other reason.
    """
    test_run_id = cfg.opts("system", "test_run.id")
    output_path = cfg.opts("visualize", "output.path", mandatory=False, default_value=None)
    store = metrics.test_run_store(cfg)

    try:
        # load the run
        te = store.find_by_test_run_id(test_run_id)

        # render and write the HTML; for a composite store this is done by its file store
        html_path = (
            store.file_store.store_html_results(te)
            if isinstance(store, metrics.CompositeTestRunStore)
            else store.store_html_results(te)
        )

        # if the user asked for --output-path, just copy the file there
        if output_path:
            dest = os.path.expanduser(output_path)
            dest_dir = os.path.dirname(dest)
            # A bare file name (e.g. "report.html") has no directory part, and os.makedirs("")
            # raises FileNotFoundError, so only create a directory when there is one.
            if dest_dir:
                os.makedirs(dest_dir, exist_ok=True)
            shutil.copy2(html_path, dest)
            console.info(f"HTML report copied to: {dest}")

    except exceptions.NotFound as e:
        raise exceptions.SystemSetupError(f"No test run with id [{test_run_id}]") from e
    except Exception as e:
        raise exceptions.SystemSetupError(f"Error visualizing test run: {e}") from e
| |
def print_help_on_errors():
    """Print a short "where to get help" section: log location, docs, forum and issue tracker."""
    title = "Getting further help:"
    fmt = console.format
    console.println(fmt.bold(title))
    console.println(fmt.underline_for(title))
    hints = (
        f"* Check the log files in {paths.logs()} for errors.",
        f"* Read the documentation at {fmt.link(doc_link())}.",
        f"* Ask a question on the forum at {fmt.link(FORUM_LINK)}.",
        f"* Raise an issue in the project issue tracker and include the log files in {paths.logs()}.",
    )
    for hint in hints:
        console.println(hint)
| |
| |
def run_test(cfg, kill_running_processes=False):
    """Run a benchmark through the actor system, making sure solr-orbit runs only once per machine.

    :param cfg: the configuration object of this run.
    :param kill_running_processes: when True, other solr-orbit processes are terminated first
        (best effort, failures are only logged); when False, their presence aborts the run.
    :raises exceptions.BenchmarkError: if other solr-orbit processes are running and
        ``kill_running_processes`` is False.
    :raises exceptions.SystemSetupError: if ``workload/redline.max_cpu_usage`` is set while the
        in-memory metrics store is in use.
    """
    logger = logging.getLogger(__name__)

    if not kill_running_processes:
        competitors = process.find_all_other_benchmark_processes()
        if competitors:
            competitor_pids = [competitor.pid for competitor in competitors]
            raise exceptions.BenchmarkError(
                f"There are other solr-orbit processes running on this machine (PIDs: {competitor_pids}) but only one "
                f"benchmark is allowed to run at the same time.\n\nYou can use --kill-running-processes flag "
                f"to kill running processes automatically and allow solr-orbit to continue to run a new benchmark. "
                f"Otherwise, you need to manually kill them.")
    else:
        logger.info("Killing running solr-orbit processes")
        # The actor system must be a singleton on this machine, so lingering instances are cleared
        # first; a failure to do so is logged and deliberately tolerated.
        # noinspection PyBroadException
        try:
            process.kill_running_benchmark_instances()
        except BaseException:
            logger.exception(
                "Could not terminate potentially running solr-orbit instances correctly. Attempting to go on anyway.")

    # CPU-based redline feedback cannot work with the in-memory metrics store; reject it up front.
    max_cpu_usage = cfg.opts("workload", "redline.max_cpu_usage", default_value=None, mandatory=False)
    if max_cpu_usage is not None:
        probe_store = metrics.metrics_store(cfg, read_only=False)
        try:
            if isinstance(probe_store, metrics.InMemoryMetricsStore):
                raise exceptions.SystemSetupError(
                    "CPU-based feedback requires a metrics store, but you're using the in-memory store. "
                    "Specify a metrics store in your benchmark.ini or via CLI to continue."
                )
        finally:
            probe_store.close()

    with_actor_system(test_run_orchestrator.run, cfg)
| |
| |
def with_actor_system(runnable, cfg):
    """Run ``runnable(cfg)`` inside a bootstrapped Thespian actor system.

    Joins an actor system that is already running locally; otherwise bootstraps one with
    ``prefer_local_only=True``. Falls back to the offline actor system when the admin process
    cannot be started (``InvalidActorAddress``) or when no external socket address can be
    determined. An actor system started here is shut down again afterwards; one that was
    already running beforehand is left untouched.

    :param runnable: callable invoked with ``cfg`` once the actor system is available.
    :param cfg: the configuration object. ``system/remote.benchmarking.supported`` is set on it
        only when the first bootstrap attempt succeeds.
    :raises: any bootstrap error other than the two fallback cases above, and anything raised
        by ``runnable``.
    """
    logger = logging.getLogger(__name__)
    already_running = actor.actor_system_already_running()
    logger.info("Actor system already running locally? [%s]", str(already_running))
    try:
        actors = actor.bootstrap_actor_system(try_join=already_running, prefer_local_only=not already_running)
        # We can only support remote benchmarks if we have a dedicated daemon that is not only bound to 127.0.0.1
        cfg.add(config.Scope.application, "system", "remote.benchmarking.supported", already_running)
    # This happens when the admin process could not be started, e.g. because it could not open a socket.
    except thespian.actors.InvalidActorAddress:
        logger.info("Falling back to offline actor system.")
        actor.use_offline_actor_system()
        actors = actor.bootstrap_actor_system(try_join=False, prefer_local_only=True)
    except Exception as e:
        logger.exception("Could not bootstrap actor system.")
        # Only this specific failure (no usable network) is degraded to the offline actor system;
        # every other bootstrap error is re-raised.
        if str(e) == "Unable to determine valid external socket address.":
            console.warn("Could not determine a socket address. Are you running without any network? Switching to degraded mode.",
                         logger=logger)
            logger.info("Falling back to offline actor system.")
            actor.use_offline_actor_system()
            actors = actor.bootstrap_actor_system(try_join=False, prefer_local_only=True)
        else:
            raise
    try:
        runnable(cfg)
    finally:
        # We only shutdown the actor system if it was not already running before
        if not already_running:
            shutdown_complete = False
            times_interrupted = 0
            # give some time for any outstanding messages to be delivered to the actor system
            time.sleep(3)
            # Retry the shutdown after a Ctrl-C, but give up after the second interruption.
            while not shutdown_complete and times_interrupted < 2:
                try:
                    logger.info("Attempting to shutdown internal actor system.")
                    actors.shutdown()
                    # note that this check will only evaluate to True for a TCP-based actor system.
                    # Poll once per second for at most 15 seconds until the actor system is gone.
                    timeout = 15
                    while actor.actor_system_already_running() and timeout > 0:
                        logger.info("Actor system is still running. Waiting...")
                        time.sleep(1)
                        timeout -= 1
                    # NOTE(review): if the actor system stops right after the 15th wait, timeout is 0
                    # here and the shutdown is reported as timed out although it completed.
                    if timeout > 0:
                        shutdown_complete = True
                        logger.info("Shutdown completed.")
                    else:
                        logger.warning("Shutdown timed out. Actor system is still running.")
                        break
                except KeyboardInterrupt:
                    times_interrupted += 1
                    logger.warning("User interrupted shutdown of internal actor system.")
                    console.info("Please wait a moment for solr-orbit's internal components to shutdown.")
            if not shutdown_complete and times_interrupted > 0:
                logger.warning("Terminating after user has interrupted actor system shutdown explicitly for [%d] times.",
                               times_interrupted)
                console.println("")
                console.warn("Terminating now at the risk of leaving child processes behind.")
                console.println("")
                console.warn("The next test_run may fail due to an unclean shutdown.")
                console.println("")
                console.println(SKULL)
                console.println("")
            elif not shutdown_complete:
                console.warn("Could not terminate all internal processes within timeout. Please check and force-terminate "
                             "all solr-orbit processes.")
| |
| |
| |
def configure_telemetry_params(args, cfg):
    """Store the requested telemetry devices and their parameters under the ``telemetry`` section."""
    scope = config.Scope.applicationOverride
    devices = opts.csv_to_list(args.telemetry)
    cfg.add(scope, "telemetry", "devices", devices)
    device_params = opts.to_dict(args.telemetry_params)
    cfg.add(scope, "telemetry", "params", device_params)
| |
| |
def configure_workload_params(arg_parser, args, cfg, command_requires_workload=True):
    """Record the workload selection and workload-related options in ``cfg``.

    A workload is selected either by local path (``--workload-path``) or by name from a
    repository (``--workload`` / ``--workload-repository``). Combining ``--workload-path`` with
    ``--workload`` or ``--workload-revision`` is reported as a usage error.

    :param arg_parser: the parser used to report usage errors; ``ArgumentParser.error`` exits.
    :param args: the parsed command line arguments.
    :param cfg: the configuration object to populate.
    :param command_requires_workload: when False, the workload name is not required and the
        workload params, test procedure and task include/exclude filters are not recorded.
    """
    cfg.add(config.Scope.applicationOverride, "workload", "repository.revision", args.workload_revision)
    # We can assume here that if a workload-path is given, the user did not specify a repository either (although argparse sets it to
    # its default value)
    if args.workload_path:
        cfg.add(config.Scope.applicationOverride, "workload", "workload.path", os.path.abspath(io.normalize_path(args.workload_path)))
        cfg.add(config.Scope.applicationOverride, "workload", "repository.name", None)
        if args.workload_revision:
            # stay as close as possible to argparse errors although we have a custom validation.
            arg_parser.error("argument --workload-revision not allowed with argument --workload-path")
        if command_requires_workload and args.workload:
            # stay as close as possible to argparse errors although we have a custom validation.
            arg_parser.error("argument --workload not allowed with argument --workload-path")
    else:
        cfg.add(config.Scope.applicationOverride, "workload", "repository.name", args.workload_repository)
        if command_requires_workload:
            if not args.workload:
                # ArgumentParser.error() prints the usage message and exits; it never returns a
                # value, so there is nothing to ``raise`` here.
                arg_parser.error("argument --workload is required")
            cfg.add(config.Scope.applicationOverride, "workload", "workload.name", args.workload)

    if command_requires_workload:
        cfg.add(config.Scope.applicationOverride, "workload", "params", opts.to_dict(args.workload_params))
        cfg.add(config.Scope.applicationOverride, "workload", "test_procedure.name", args.test_procedure)
        cfg.add(config.Scope.applicationOverride, "workload", "include.tasks", opts.csv_to_list(args.include_tasks))
        cfg.add(config.Scope.applicationOverride, "workload", "exclude.tasks", opts.csv_to_list(args.exclude_tasks))
| |
| |
def configure_builder_params(args, cfg, command_requires_cluster_config=True):
    """Record cluster-config source and provisioning options under the ``builder`` section.

    A local ``--cluster-config-path`` takes precedence over a cluster-config repository. When
    ``command_requires_cluster_config`` is True, distribution, cluster-config names/params and
    Solr modules are recorded as well. Using ``--cluster-config`` with the ``benchmark-only``
    pipeline aborts via ``SystemExit``.
    """
    override = config.Scope.applicationOverride
    if not args.cluster_config_path:
        cfg.add(override, "builder", "repository.name", args.cluster_config_repository)
        cfg.add(override, "builder", "repository.revision", args.cluster_config_revision)
    else:
        local_path = os.path.abspath(io.normalize_path(args.cluster_config_path))
        cfg.add(override, "builder", "cluster_config.path", local_path)
        cfg.add(override, "builder", "repository.name", None)
        cfg.add(override, "builder", "repository.revision", None)

    if not command_requires_cluster_config:
        return

    if args.distribution_version:
        cfg.add(override, "builder", "distribution.version", args.distribution_version)
    cfg.add(override, "builder", "distribution.repository", args.distribution_repository)
    cfg.add(override, "builder", "cluster_config.names", opts.csv_to_list(args.cluster_config))
    cfg.add(override, "builder", "cluster_config.params", opts.to_dict(args.cluster_config_params))
    cfg.add(override, "solr", "modules", getattr(args, "solr_modules", ""))

    # Cluster configs only make sense when solr-orbit provisions the cluster itself.
    if getattr(args, "pipeline", None) == "benchmark-only" and args.cluster_config != "defaults":
        raise SystemExit(
            "ERROR: --cluster-config is only valid for provisioning pipelines "
            "(from-distribution, docker, from-sources). "
            "It cannot be used with the 'benchmark-only' pipeline."
        )
| |
| |
def configure_connection_params(arg_parser, args, cfg):
    """Record target hosts, client options and optional gRPC hosts under the ``client`` section.

    Warns when ``--target-hosts`` is a comma-separated list, because a JSON file is the suggested
    format for multiple clusters. Reports a usage error through ``arg_parser`` when
    ``--target-hosts`` and ``--client-options`` do not define the same keys.
    """
    override = config.Scope.applicationOverride
    if args.target_hosts and "," in args.target_hosts:
        console.warn(
            "WARNING: Benchmark runs with multiple target hosts should be passed in as a JSON file such as:\n"
            "{\n"
            ' "default": [\n'
            ' {"host": "127.0.0.1", "port": 9200} # Specify nodes for cluster 1\n'
            " ],\n"
            ' "remote":[\n'
            ' {"host": "10.127.0.3", "port": 9200} # Specify nodes for cluster 2\n'
            " ]\n"
            "}"
        )
    # Also needed by builder (-> telemetry) - duplicate by module?
    hosts = opts.TargetHosts(args.target_hosts)
    cfg.add(override, "client", "hosts", hosts)
    options = opts.ClientOptions(args.client_options, target_hosts=hosts)
    cfg.add(override, "client", "options", options)

    # gRPC target hosts are optional; None is recorded when the argument is absent or empty.
    grpc_hosts = None
    if getattr(args, "grpc_target_hosts", None):
        grpc_hosts = opts.TargetHosts(args.grpc_target_hosts)
    cfg.add(override, "client", "grpc_hosts", grpc_hosts)

    if "timeout" not in options.default:
        console.info("You did not provide an explicit timeout in the client options. Assuming default of 10 seconds.")
    if list(hosts.all_hosts) != list(options.all_client_options):
        arg_parser.error("--target-hosts and --client-options must define the same keys for multi cluster setups.")
| |
| |
def configure_reporting_params(args, cfg):
    """Copy the result-reporting command line options into the ``reporting`` section of ``cfg``."""
    # (config key, argparse attribute) pairs, written in this order.
    reporting_options = (
        ("format", "results_format"),
        ("values", "show_in_results"),
        ("output.path", "results_file"),
        ("numbers.align", "results_numbers_align"),
    )
    for key, attribute in reporting_options:
        cfg.add(config.Scope.applicationOverride, "reporting", key, getattr(args, attribute))
| |
def prepare_test_runs_dict(args, cfg):
    """Record the results output path and build the set of test run ids to aggregate.

    :return: an insertion-ordered dict whose keys are the whitespace-stripped, non-empty ids from
        ``args.test_runs`` (duplicates collapse to one key) and whose values are all None.
    """
    cfg.add(config.Scope.applicationOverride, "reporting", "output.path", args.results_file)
    stripped_ids = (raw_id.strip() for raw_id in (args.test_runs or ()))
    return dict.fromkeys(run_id for run_id in stripped_ids if run_id)
| |
def configure_test(arg_parser, args, cfg):
    """Populate ``cfg`` with every option the ``run`` subcommand needs for a single test run.

    Prints the test run id, then records system, test_run, worker_coordinator, workload,
    builder and reporting settings. It also delegates to the workload, connection, telemetry,
    builder and reporting helpers, which may report usage errors through ``arg_parser``.
    """
    # As the run command is doing more work than necessary at the moment, we duplicate several parameters
    # in this section that actually belong to dedicated subcommands (like install, start or stop). Over time
    # these duplicated parameters will vanish as we move towards dedicated subcommands and use "run" only
    # to run the actual benchmark (i.e. generating load).
    print_test_run_id(args)
    if args.effective_start_date:
        cfg.add(config.Scope.applicationOverride, "system", "time.start", args.effective_start_date)
    cfg.add(config.Scope.applicationOverride, "system", "test_run.id", args.test_run_id)
    # use the test-run id implicitly also as the install id.
    cfg.add(config.Scope.applicationOverride, "system", "install.id", args.test_run_id)
    cfg.add(config.Scope.applicationOverride, "test_run", "pipeline", args.pipeline)
    cfg.add(config.Scope.applicationOverride, "test_run", "user.tag", args.user_tag)
    cfg.add(config.Scope.applicationOverride, "worker_coordinator", "profiling", args.enable_worker_coordinator_profiling)
    cfg.add(config.Scope.applicationOverride, "worker_coordinator", "assertions", args.enable_assertions)
    cfg.add(config.Scope.applicationOverride, "worker_coordinator", "on.error", args.on_error)
    cfg.add(
        config.Scope.applicationOverride,
        "worker_coordinator",
        "worker_ips",
        opts.csv_to_list(args.worker_ips))
    cfg.add(config.Scope.applicationOverride, "workload", "test.mode.enabled", args.test_mode)
    cfg.add(config.Scope.applicationOverride, "workload", "load.test.clients", int(args.load_test_qps))
    # Redline-specific settings are only recorded when a redline test was requested.
    if args.redline_test:
        cfg.add(config.Scope.applicationOverride, "workload", "redline.test", int(args.redline_test))
        cfg.add(config.Scope.applicationOverride, "workload", "redline.scale_step", args.redline_scale_step)
        cfg.add(config.Scope.applicationOverride, "workload", "redline.scale_down_pct", args.redline_scaledown_percentage)
        cfg.add(config.Scope.applicationOverride, "workload", "redline.sleep_seconds", args.redline_post_scaledown_sleep)
        cfg.add(config.Scope.applicationOverride, "workload", "redline.max_clients", args.redline_max_clients)
        cfg.add(config.Scope.applicationOverride, "workload", "redline.max_cpu_usage", args.redline_max_cpu_usage)
        cfg.add(config.Scope.applicationOverride, "workload", "redline.cpu_window_seconds", args.redline_cpu_window_seconds)
        cfg.add(config.Scope.applicationOverride, "workload", "redline.cpu_check_interval", args.redline_cpu_check_interval)
    cfg.add(config.Scope.applicationOverride, "workload", "latency.percentiles", args.latency_percentiles)
    cfg.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", args.throughput_percentiles)
    cfg.add(config.Scope.applicationOverride, "workload", "randomization.enabled", args.randomization_enabled)
    cfg.add(config.Scope.applicationOverride, "workload", "randomization.repeat_frequency", args.randomization_repeat_frequency)
    cfg.add(config.Scope.applicationOverride, "workload", "randomization.n", args.randomization_n)
    cfg.add(config.Scope.applicationOverride, "workload", "randomization.alpha", args.randomization_alpha)
    cfg.add(config.Scope.applicationOverride, "workload", "visualize", args.visualize)
    cfg.add(config.Scope.applicationOverride, "workload", "visualize.output.path", args.visualize_output_path)
    configure_workload_params(arg_parser, args, cfg)
    configure_connection_params(arg_parser, args, cfg)
    configure_telemetry_params(args, cfg)
    configure_builder_params(args, cfg)
    cfg.add(config.Scope.applicationOverride, "builder", "runtime.jdk", args.runtime_jdk)
    cfg.add(config.Scope.applicationOverride, "builder", "source.revision", args.revision)
    # cfg.add(config.Scope.applicationOverride, "builder",
    #         "cluster_config_instance.plugins", opts.csv_to_list(
    #             args.opensearch_plugins))
    cfg.add(config.Scope.applicationOverride, "builder", "plugin.params", opts.to_dict(args.plugin_params))
    cfg.add(config.Scope.applicationOverride, "builder", "preserve.install", convert.to_bool(args.preserve_install))
    cfg.add(config.Scope.applicationOverride, "builder", "skip.rest.api.check", convert.to_bool(args.skip_rest_api_check))

    configure_reporting_params(args, cfg)
| |
def print_test_run_id(args):
    """Announce the id of the current test run on the console."""
    run_id = args.test_run_id
    console.info(f"[Test Run ID]: {run_id}")
| |
def _run_test_iterations(arg_parser, args, cfg, iterations):
    """Run the benchmark ``iterations`` times, then optionally aggregate the completed runs.

    After every attempt, successful or not, ``args.test_run_id`` is replaced by a fresh UUID, so
    a failed attempt's id is never reused by the next iteration. A failure is reported on the
    console; the remaining iterations still run unless ``--cancel-on-error`` was given. Only the
    ids of iterations that raised nothing are handed to the aggregator.
    """
    # Parse the pause once up front: a bad --sleep-timer value now fails fast instead of turning
    # every successful iteration into a reported failure.
    pause_seconds = int(args.sleep_timer)
    test_runs = []
    for iteration in range(1, iterations + 1):
        try:
            configure_test(arg_parser, args, cfg)
            run_test(cfg, args.kill_running_processes)
            test_runs.append(args.test_run_id)
        except Exception as e:
            console.error(f"Error occurred during test run {iteration}: {str(e)}")
            if args.cancel_on_error:
                console.info("Cancelling remaining test runs.")
                break
        finally:
            args.test_run_id = str(uuid.uuid4())
        # Pause only between runs; after the final run there is nothing left to wait for.
        if iteration < iterations:
            time.sleep(pause_seconds)

    if args.aggregate:
        args.test_runs = test_runs
        test_runs_dict = prepare_test_runs_dict(args, cfg)
        aggregator_instance = aggregator.Aggregator(cfg, test_runs_dict, args)
        aggregator_instance.aggregate()


def dispatch_sub_command(arg_parser, args, cfg):
    """Execute the subcommand selected on the command line.

    :param arg_parser: the top-level parser, used by helpers to report usage errors.
    :param args: the parsed command line arguments.
    :param cfg: the configuration object shared by all subcommands.
    :return: True if the subcommand completed, False if it failed. Failures, including
        ``SystemExit`` raised inside the subcommand, are logged and reported on the console
        rather than propagated.
    """
    sub_command = args.subcommand

    cfg.add(config.Scope.application, "system", "quiet.mode", args.quiet)
    cfg.add(config.Scope.application, "system", "offline.mode", args.offline)

    try:
        if sub_command == "compare":
            configure_reporting_params(args, cfg)
            cfg.add(config.Scope.applicationOverride, "reporting", "percentiles", args.percentiles)
            publisher.compare(cfg, args.baseline, args.contender)
        elif sub_command == "aggregate":
            test_runs_dict = prepare_test_runs_dict(args, cfg)
            aggregator_instance = aggregator.Aggregator(cfg, test_runs_dict, args)
            aggregator_instance.aggregate()
        elif sub_command == "list":
            cfg.add(config.Scope.applicationOverride, "system", "list.config.option", args.configuration)
            cfg.add(config.Scope.applicationOverride, "system", "list.test_runs.max_results", args.limit)
            configure_builder_params(args, cfg, command_requires_cluster_config=False)
            configure_workload_params(arg_parser, args, cfg, command_requires_workload=False)
            dispatch_list(cfg)
        elif sub_command == "download":
            configure_builder_params(args, cfg)
            builder.download(cfg)
        elif sub_command == "install":
            cfg.add(config.Scope.applicationOverride, "system", "install.id", str(uuid.uuid4()))
            cfg.add(config.Scope.applicationOverride, "builder", "network.host", args.network_host)
            cfg.add(config.Scope.applicationOverride, "builder", "network.http.port", args.http_port)
            cfg.add(config.Scope.applicationOverride, "builder", "source.revision", args.revision)
            cfg.add(config.Scope.applicationOverride, "builder", "build.type", args.build_type)
            cfg.add(config.Scope.applicationOverride, "builder", "runtime.jdk", args.runtime_jdk)
            cfg.add(config.Scope.applicationOverride, "builder", "node.name", args.node_name)
            cfg.add(config.Scope.applicationOverride, "builder", "master.nodes", opts.csv_to_list(args.master_nodes))
            cfg.add(config.Scope.applicationOverride, "builder", "seed.hosts", opts.csv_to_list(args.seed_hosts))
            # cfg.add(config.Scope.applicationOverride, "builder",
            #         "cluster_config.plugins", opts.csv_to_list(
            #             args.opensearch_plugins))
            cfg.add(config.Scope.applicationOverride, "builder", "plugin.params", opts.to_dict(args.plugin_params))
            configure_builder_params(args, cfg)
            builder.install(cfg)
        elif sub_command == "start":
            print_test_run_id(args)
            cfg.add(config.Scope.applicationOverride, "system", "test_run.id", args.test_run_id)
            cfg.add(config.Scope.applicationOverride, "system", "install.id", args.installation_id)
            cfg.add(config.Scope.applicationOverride, "builder", "runtime.jdk", args.runtime_jdk)
            configure_telemetry_params(args, cfg)
            builder.start(cfg)
        elif sub_command == "stop":
            cfg.add(config.Scope.applicationOverride, "builder", "preserve.install", convert.to_bool(args.preserve_install))
            cfg.add(config.Scope.applicationOverride, "system", "install.id", args.installation_id)
            builder.stop(cfg)
        elif sub_command == "run":
            # --test-iterations has no argparse type, so an explicit value arrives as a string;
            # always compare the parsed integer, never the raw argument.
            iterations = int(args.test_iterations)
            if iterations > 1:
                _run_test_iterations(arg_parser, args, cfg, iterations)
            elif iterations == 1:
                configure_test(arg_parser, args, cfg)
                run_test(cfg, args.kill_running_processes)
            else:
                console.info("Please enter a valid number of test iterations")
        elif sub_command == "generate-data":
            cfg.add(config.Scope.applicationOverride, "synthetic_data_generator", "index_name", args.index_name)
            cfg.add(config.Scope.applicationOverride, "synthetic_data_generator", "index_mappings", args.index_mappings)
            cfg.add(config.Scope.applicationOverride, "synthetic_data_generator", "custom_module", args.custom_module)
            cfg.add(config.Scope.applicationOverride, "synthetic_data_generator", "custom_config", args.custom_config)
            cfg.add(config.Scope.applicationOverride, "synthetic_data_generator", "output_path", args.output_path)
            cfg.add(config.Scope.applicationOverride, "synthetic_data_generator", "total_size", args.total_size)
            cfg.add(config.Scope.applicationOverride, "synthetic_data_generator", "test_document", args.test_document)

            synthetic_data_generator_orchestrator.orchestrate_data_generation(cfg)

        elif sub_command == "create-workload":
            cfg.add(config.Scope.applicationOverride, "generator", "indices", args.indices)
            cfg.add(config.Scope.applicationOverride, "generator", "number_of_docs", args.number_of_docs)
            cfg.add(config.Scope.applicationOverride, "generator", "output.path", args.output_path)
            cfg.add(config.Scope.applicationOverride, "workload", "workload.name", args.workload)
            cfg.add(config.Scope.applicationOverride, "workload", "custom_queries", args.custom_queries)
            cfg.add(config.Scope.applicationOverride, "generator", "sample_frequency", args.sample_frequency)
            configure_connection_params(arg_parser, args, cfg)

            workload_generator.create_workload(cfg)
        elif sub_command == "visualize":
            cfg.add(config.Scope.applicationOverride, "system", "test_run.id", args.test_run_id)
            cfg.add(config.Scope.applicationOverride, "visualize", "output.path", args.output_path)
            # Always set visualize to true for the visualize command
            cfg.add(config.Scope.applicationOverride, "workload", "visualize", True)
            dispatch_visualize(cfg)
        elif sub_command == "convert-workload":
            from solrorbit.conversion import workload_converter
            source_dir = os.path.abspath(args.workload_path)
            output_dir = os.path.abspath(args.output_path) if args.output_path else source_dir.rstrip("/") + "-solr"
            force = getattr(args, "force", False)

            if workload_converter.is_already_converted(output_dir) and not force:
                console.info(
                    f"Workload already converted at: {output_dir}\n"
                    "Use --force to overwrite."
                )
                return True

            console.info(f"Converting workload: {source_dir} → {output_dir}")
            result = workload_converter.convert_opensearch_workload(source_dir, output_dir)

            console.println(f"\nConversion complete: {result['output_dir']}")
            if result["skipped"]:
                console.println(f" Skipped operations ({len(result['skipped'])}): {', '.join(result['skipped'])}")
            if result["issues"]:
                console.println(f" Issues ({len(result['issues'])}):")
                for issue in result["issues"]:
                    console.println(f" - {issue}")
                console.println(f" See {os.path.join(result['output_dir'], workload_converter.CONVERTED_MARKER)} for details.")
        elif sub_command == "info":
            configure_workload_params(arg_parser, args, cfg)
            workload.workload_info(cfg)
        else:
            raise exceptions.SystemSetupError(f"Unknown subcommand [{sub_command}]")
        return True
    except exceptions.BenchmarkError as e:
        logging.getLogger(__name__).exception("Cannot run subcommand [%s].", sub_command)
        # Flatten the chain of causes into one indented message.
        msg = str(e.message)
        nesting = 0
        while hasattr(e, "cause") and e.cause:
            nesting += 1
            e = e.cause
            if hasattr(e, "message"):
                msg += "\n%s%s" % ("\t" * nesting, e.message)
            else:
                msg += "\n%s%s" % ("\t" * nesting, str(e))

        console.error(f"❌ Cannot {sub_command}. {msg}")
        console.println("")
        print_help_on_errors()
        return False
    except BaseException as e:
        logging.getLogger(__name__).exception("A fatal error occurred while running subcommand [%s].", sub_command)
        console.error(f"❌ Cannot {sub_command}. {e}.")
        console.println("")
        print_help_on_errors()
        return False
| |
| |
def handle_command_suggestions():
    """Point users of a deprecated subcommand name at its replacement, ``run``.

    Inspects ``sys.argv`` before argument parsing.

    :return: True if a suggestion was printed, False otherwise.
    """
    # check-deprecated-terms-disable-1x
    DEPRECATED_SUBCOMMANDS = ["execute-test", "execute"]
    if len(sys.argv) <= 1 or sys.argv[1] not in DEPRECATED_SUBCOMMANDS:
        return False
    suggestions = (
        "Did you mean 'run'?",
        f"Example: {PROGRAM_NAME} run --workload=geonames --test-mode",
        f"For more information, run: {PROGRAM_NAME} run --help",
    )
    for line in suggestions:
        console.info(line)
    return True
| |
| |
def main():
    """Entry point of the solr-orbit command line tool.

    Sets up logging and console output, parses the command line, and loads the configuration
    (installing the default one first if none exists). Unless ``--offline`` is given, it probes
    for an Internet connection. It then dispatches the subcommand and exits with status 0 on
    success or 64 on failure, appending the final status line to the results file when one is
    configured and the format is not CSV.
    """
    check_python_version()
    log.install_default_log_config()
    log.configure_logging()
    logger = logging.getLogger(__name__)
    start = time.time()

    # Early init of console output so we start to show everything consistently.
    console.init(quiet=False)

    # Handle command suggestions before argument parsing
    if handle_command_suggestions():
        sys.exit(1)

    arg_parser = create_arg_parser()
    args = arg_parser.parse_args()

    console.init(quiet=args.quiet)
    console.println(BANNER)

    cfg = config.Config(config_name=args.configuration_name)
    if not cfg.config_present():
        cfg.install_default_config()
    cfg.load_config(auto_upgrade=True)
    cfg.add(config.Scope.application, "system", "time.start", datetime.datetime.utcnow())
    # Local config per node
    cfg.add(config.Scope.application, "node", "benchmark.root", paths.benchmark_root())
    cfg.add(config.Scope.application, "node", "benchmark.cwd", os.getcwd())

    logger.info("OS [%s]", str(platform.uname()))
    logger.info("Python [%s]", str(sys.implementation))
    logger.info("solr-orbit version [%s]", version.version())
    logger.debug("Command line arguments: %s", args)
    # Configure networking
    net.init()
    if not args.offline:
        probing_url = cfg.opts("system", "probing.url", default_value="https://github.com", mandatory=False)
        if not net.has_internet_connection(probing_url):
            console.warn("No Internet connection detected. Automatic download of workload data sets etc. is disabled.",
                         logger=logger)
            cfg.add(config.Scope.applicationOverride, "system", "offline.mode", True)
        else:
            logger.info("Detected a working Internet connection.")

    success = dispatch_sub_command(arg_parser, args, cfg)

    end = time.time()
    if success:
        message = "✅ SUCCESS"
        status = 0
    else:
        message = "❌ FAILURE"
        status = 64
    message = message + f" (took {int(round(end - start))} seconds)"
    console.println("")
    console.info(message, overline="-", underline="-")
    if hasattr(args, "results_file") and args.results_file and getattr(args, "results_format", None) != "csv":
        # The status line contains non-ASCII symbols; write UTF-8 explicitly instead of relying on
        # the platform's default encoding, which may not be able to represent them.
        with open(args.results_file, "a", encoding="utf-8") as fh:
            print("\n", message, file=fh)

    sys.exit(status)
| |
| |
# Run the command line tool when this module is executed directly as a script.
if __name__ == "__main__":
    main()