blob: 574991e373c85de88a140cfedbf14f022667bb00 [file]
# SPDX-License-Identifier: Apache-2.0
#
# Modifications by Apache Solr contributors; see git log for details.
# Licensed under the Apache License, Version 2.0.
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import argparse
import datetime
import logging
import os
import platform
import sys
import time
import uuid
import shutil
# macOS fork-safety: Python programs that use multiprocessing (Thespian) can trigger
# an Objective-C runtime crash when forking after any thread has started (e.g. from
# background telemetry polling). Setting this env-var before any fork suppresses
# the crash. This must be done before the actor system (and its admin process) is
# bootstrapped. See: https://bugs.python.org/issue33725
if sys.platform == "darwin" and "OBJC_DISABLE_INITIALIZE_FORK_SAFETY" not in os.environ:
    # Only fill in the value when it is absent so that an explicit setting
    # already present in the environment is left untouched.
    os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"
import thespian.actors
from solrorbit import PROGRAM_NAME, BANNER, FORUM_LINK, SKULL, check_python_version, doc_link, telemetry
from solrorbit import version, actor, config, paths, \
test_run_orchestrator, publisher, \
metrics, workload, exceptions, log
from solrorbit.builder import cluster_config, builder
from solrorbit.synthetic_data_generator import synthetic_data_generator_orchestrator
from solrorbit.workload_generator import workload_generator
from solrorbit.utils import io, convert, process, console, net, opts, versions
from solrorbit import aggregator
def create_arg_parser():
    """Build the top-level command line parser including all subcommands.

    The parser is assembled from one sub-parser per subcommand (``run``, ``list``,
    ``info``, ``generate-data``, ``create-workload``, ``convert-workload``,
    ``compare``, ``visualize``, ``aggregate``, ``download``, ``install``,
    ``start`` and ``stop``). The default of ``--preserve-install`` is read from
    the ``defaults`` section of the configuration file when one is present.

    Side effect: when the program was invoked without any command line argument
    (``len(sys.argv) == 1``) the help text is printed and the process exits with
    status 1.

    :return: The fully configured ``argparse.ArgumentParser``.
    """

    def positive_number(v):
        # Converts ``v`` to an int and rejects zero and negative values.
        # A non-numeric ``v`` surfaces as the ValueError raised by ``int``.
        value = int(v)
        if value <= 0:
            raise argparse.ArgumentTypeError(f"must be positive but was {value}")
        return value

    def non_empty_list(arg):
        # argparse ``type=`` callable: splits a comma-separated value and insists on at least one entry.
        lst = opts.csv_to_list(arg)
        if len(lst) < 1:
            # ArgumentTypeError is what argparse expects from a ``type=`` callable; it also lets
            # argparse prefix the message with the name of the offending option.
            raise argparse.ArgumentTypeError("At least one argument required!")
        return lst

    def runtime_jdk(v):
        # argparse ``type=`` callable: accepts the literal "bundled" or a positive integer.
        if v == "bundled":
            return v
        try:
            return positive_number(v)
        except (ValueError, argparse.ArgumentTypeError):
            # ValueError covers non-numeric input (raised by ``int``), ArgumentTypeError covers
            # zero / negative numbers. Both get the same, more helpful message.
            raise argparse.ArgumentTypeError(f"must be a positive number or 'bundled' but was {v}") from None

    def supported_os_version(v):
        # argparse ``type=`` callable: an empty value is passed through unchecked, any other
        # value must not be lower than the minimum supported Solr version.
        if v:
            min_solr_version = versions.Version.from_string(version.minimum_solr_version())
            specified_version = versions.Version.from_string(v)
            if specified_version < min_solr_version:
                raise argparse.ArgumentTypeError(f"must be at least {min_solr_version} but was {v}")
        return v

    def add_workload_source(subparser):
        # Adds the options that select where a workload is loaded from. Repository and path
        # are mutually exclusive; the revision is an independent option.
        workload_source_group = subparser.add_mutually_exclusive_group()
        workload_source_group.add_argument(
            "--workload-repository",
            help="Define the repository from where solr-orbit will load workloads (default: default).",
            # argparse is smart enough to use this default only if the user did not use --workload-path and also did not specify anything
            default="default"
        )
        workload_source_group.add_argument(
            "--workload-path",
            help="Define the path to a workload.")
        subparser.add_argument(
            "--workload-revision",
            help="Define a specific revision in the workload repository that solr-orbit should use.",
            default=None)

    # try to preload configurable defaults, but this does not work together with `--configuration-name` (which is undocumented anyway)
    cfg = config.Config()
    if cfg.config_present():
        cfg.load_config()
        preserve_install = cfg.opts("defaults", "preserve_benchmark_candidate", default_value=False, mandatory=False)
    else:
        preserve_install = False

    parser = argparse.ArgumentParser(prog=PROGRAM_NAME,
                                     description=BANNER + "\n\n A macrobenchmarking tool for Apache Solr",
                                     epilog="Find out more at {}".format(console.format.link(doc_link())),
                                     formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('-v', '--version', action='version', version="%(prog)s " + version.version())

    # NOTE(review): this runs before any subcommand is registered, so the help printed here
    # does not list the subcommands - confirm whether that is intended.
    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit(1)

    subparsers = parser.add_subparsers(
        title="subcommands",
        dest="subcommand",
        help="")

    test_run_parser = subparsers.add_parser("run", help="Run a benchmark")

    # change in favor of "list telemetry", "list workloads", "list pipelines"
    list_parser = subparsers.add_parser("list", help="List configuration options")
    list_parser.add_argument(
        "configuration",
        metavar="configuration",
        help="The configuration for which the tool should show the available options. "
             "Possible values are: telemetry, workloads, pipelines, test-runs, aggregated-results, cluster-configs",
        choices=["telemetry", "workloads", "pipelines", "test-runs", "aggregated-results",
                 "cluster-configs"])
    list_parser.add_argument(
        "--limit",
        help="Limit the number of search results for recent test-runs (default: 10).",
        default=10,
    )
    add_workload_source(list_parser)

    info_parser = subparsers.add_parser("info", help="Show info about a workload")
    add_workload_source(info_parser)
    info_parser.add_argument(
        "--workload",
        "-w",
        help=f"Define the workload to use. List possible workloads with `{PROGRAM_NAME} list workloads`."
        # we set the default value later on because we need to determine whether the user has provided this value.
        # default="geonames"
    )
    info_parser.add_argument(
        "--workload-params",
        "-wp",
        help="Define a comma-separated list of key:value pairs that are injected verbatim to the workload as variables.",
        default=""
    )
    info_parser.add_argument(
        "--test-procedure",
        help=f"Define the test_procedure to use. List possible test_procedures for workloads with `{PROGRAM_NAME} list workloads`."
    )
    info_task_filter_group = info_parser.add_mutually_exclusive_group()
    info_task_filter_group.add_argument(
        "--include-tasks",
        help="Defines a comma-separated list of tasks to run. By default all tasks of a test_procedure are run.")
    info_task_filter_group.add_argument(
        "--exclude-tasks",
        help="Defines a comma-separated list of tasks not to run. By default all tasks of a test_procedure are run.")

    synthetic_data_generator_parser = subparsers.add_parser(
        "generate-data",
        help="Generate synthetic data based on existing index mappings or custom module. " +
             "This data can be ported into Solr Orbit workloads.")
    exclusive_file_inputs = synthetic_data_generator_parser.add_mutually_exclusive_group(required=True)
    exclusive_file_inputs.add_argument(
        "--index-mappings",
        "-i",
        help="Index mappings (JSON) to generate synthetic data from."
    )
    exclusive_file_inputs.add_argument(
        "--custom-module",
        "-m",
        help="Custom Python module that defines how to generate documents. " +
             "It can contain function definitions and even class definitions. " +
             "This gives users more granular control over how data is generated. " +
             "This module must contain generate_synthetic_document() definition."
    )
    exclusive_params = synthetic_data_generator_parser.add_mutually_exclusive_group(required=True)
    exclusive_params.add_argument(
        "--total-size",
        "-s",
        type=int,
        help="Total size in GB of synthetically generated data corpora"
    )
    synthetic_data_generator_parser.add_argument(
        "--index-name",
        "-n",
        required=True,
        help="Index name associated with generated corpora"
    )
    synthetic_data_generator_parser.add_argument(
        "--output-path",
        "-p",
        default=os.path.join(os.getcwd(), "generated_corpora"),
        help="Output path for data corpora. Data corpora will be written in a directory."
    )
    synthetic_data_generator_parser.add_argument(
        "--custom-config",
        "-c",
        default=None,
        help="Optional config where users can specify overrides for mapping synthetic data generator or values that module should use."
    )
    synthetic_data_generator_parser.add_argument(
        "--test-document",
        "-t",
        default=False,
        action="store_true",
        help="Generates a single synthetic document and displays it to the console so that users can validate generated values and output."
    )

    create_workload_parser = subparsers.add_parser("create-workload", help="Create a workload from existing data")
    create_workload_parser.add_argument(
        "--workload",
        "-w",
        required=True,
        help="Name of the generated workload")
    create_workload_parser.add_argument(
        "--indices",
        "-i",
        type=non_empty_list,
        required=True,
        help="Comma-separated list of indices to include in the workload")
    create_workload_parser.add_argument(
        "--target-hosts",
        "-t",
        default="",
        required=True,
        help="Comma-separated list of host:port pairs which should be targeted")
    create_workload_parser.add_argument(
        "--client-options",
        "-c",
        default=opts.ClientOptions.DEFAULT_CLIENT_OPTIONS,
        help=f"Comma-separated list of client options to use. (default: {opts.ClientOptions.DEFAULT_CLIENT_OPTIONS})")
    create_workload_parser.add_argument(
        "--output-path",
        default=os.path.join(os.getcwd(), "workloads"),
        help="Workload output directory (default: workloads/)")
    create_workload_parser.add_argument(
        "--custom-queries",
        type=argparse.FileType('r'),
        help="Input JSON file to use containing custom workload queries that override the default match_all query")
    create_workload_parser.add_argument(
        "--number-of-docs",
        action=opts.StoreKeyPairAsDict,
        nargs='+',
        metavar="KEY:VAL",
        help="Map of index name and integer doc count to extract. Ensure that index name also exists in --indices parameter. " +
             "To specify several indices and doc counts, use format: <index1>:<doc_count1> <index2>:<doc_count2> ...")
    create_workload_parser.add_argument(
        "--sample-frequency",
        action=opts.StoreKeyPairAsDict,
        nargs='+',
        metavar="KEY:VAL",
        help="Map of index name and an integer, representing the sample frequency of docs that should be extracted per index. " +
             "Ensure that index name also exists in --indices parameter. " +
             "To specify several indices and doc counts, use format: <index1>:<sample-frequency-1> <index2>:<sample-frequency-2> ...")

    convert_workload_parser = subparsers.add_parser(
        "convert-workload",
        help="Convert an OpenSearch Benchmark workload to Solr-native format"
    )
    convert_workload_parser.add_argument(
        "--workload-path",
        required=True,
        help="Path to the source OpenSearch Benchmark workload directory (must contain workload.json)."
    )
    convert_workload_parser.add_argument(
        "--output-path",
        default=None,
        help="Path where the converted Solr workload will be written "
             "(default: <workload-path>-solr)."
    )
    convert_workload_parser.add_argument(
        "--force",
        action="store_true",
        default=False,
        help="Overwrite an existing converted workload directory."
    )

    compare_parser = subparsers.add_parser("compare", help="Compare two test_runs")
    compare_parser.add_argument(
        "--baseline",
        "-b",
        required=True,
        help=f"TestRun ID of the baseline (see {PROGRAM_NAME} list test-runs).")
    compare_parser.add_argument(
        "--contender",
        "-c",
        required=True,
        help=f"TestRun ID of the contender (see {PROGRAM_NAME} list test-runs).")
    compare_parser.add_argument(
        "--percentiles",
        help=f"A comma-separated list of percentiles to report latency and service time. "
             f"(default: {metrics.GlobalStatsCalculator.DEFAULT_LATENCY_PERCENTILES}).",
        default=metrics.GlobalStatsCalculator.DEFAULT_LATENCY_PERCENTILES)
    compare_parser.add_argument(
        "--results-format",
        help="Define the output format for the command line results (default: markdown).",
        choices=["markdown", "csv"],
        default="markdown")
    compare_parser.add_argument(
        "--results-numbers-align",
        help="Define the output column number alignment for the command line results (default: right).",
        choices=["right", "center", "left", "decimal"],
        default="right")
    compare_parser.add_argument(
        "--results-file",
        help="Write the command line results also to the provided file.",
        default="")
    compare_parser.add_argument(
        "--show-in-results",
        help="Whether to include the comparison in the results file.",
        default=True)

    visualize_parser = subparsers.add_parser("visualize", help="Generate HTML visualization for a test run")
    visualize_parser.add_argument(
        "--test-run-id",
        "-tid",
        dest="test_run_id",
        required=True,
        help=f"TestRun ID to visualize (see {PROGRAM_NAME} list test-runs).")
    visualize_parser.add_argument(
        "--output-path",
        dest="output_path",
        help="Path where the HTML report should be saved. If not specified, it will be saved in the test run directory, where test_run.json can be found.",
        default=None)

    aggregate_parser = subparsers.add_parser("aggregate", help="Aggregate multiple test-runs")
    aggregate_parser.add_argument(
        "--test-runs",
        type=non_empty_list,
        required=True,
        help="Comma-separated list of TestRun IDs to aggregate")
    aggregate_parser.add_argument(
        "--test-runs-id",
        "-tid",
        help="Define a unique id for this aggregated test-run.",
        default="")
    aggregate_parser.add_argument(
        "--results-file",
        help="Write the aggregated results to the provided file.",
        default="")
    aggregate_parser.add_argument(
        "--workload-repository",
        help="Define the repository from where solr-orbit will load workloads (default: default).",
        default="default")

    download_parser = subparsers.add_parser("download", help="Downloads an artifact")
    download_parser.add_argument(
        "--cluster-config-repository",
        help="Define the repository from where solr-orbit will load cluster-configs (default: default).",
        default="default")
    download_parser.add_argument(
        "--cluster-config-revision",
        help="Define a specific revision in the cluster-config repository that solr-orbit should use.",
        default=None)
    download_parser.add_argument(
        "--cluster-config-path",
        help="Define the path to the cluster-config and plugin configurations to use.")
    download_parser.add_argument(
        "--distribution-version",
        type=supported_os_version,
        help="Define the version of the distribution to download. "
             "Check https://projects.apache.org/project.html?solr for released versions.",
        default="")
    download_parser.add_argument(
        "--distribution-repository",
        help="Define the repository from where the distribution should be downloaded (default: release).",
        default="release")
    download_parser.add_argument(
        "--cluster-config",
        help=f"Define the cluster-config to use. List possible "
             f"cluster-configs with `{PROGRAM_NAME} list "
             f"cluster-configs` (default: defaults).",
        default="defaults")  # optimized for local usage
    download_parser.add_argument(
        "--cluster-config-params",
        help="Define a comma-separated list of key:value pairs that are injected verbatim as variables for the cluster-config.",
        default=""
    )

    install_parser = subparsers.add_parser("install", help="Installs a Solr node locally")
    install_parser.add_argument(
        "--revision",
        help="Define the source code revision for building the benchmark candidate. 'current' uses the source tree as is,"
             " 'latest' fetches the latest version on main. It is also possible to specify a commit id or a timestamp."
             " The timestamp must be specified as: \"@ts\" where \"ts\" must be a valid ISO 8601 timestamp, "
             "e.g. \"@2013-07-27T10:37:00Z\" (default: current).",
        default="current")  # optimized for local usage, don't fetch sources
    # Intentionally undocumented as we do not consider Docker a fully supported option.
    install_parser.add_argument(
        "--build-type",
        help=argparse.SUPPRESS,
        choices=["tar", "docker"],
        default="tar")
    install_parser.add_argument(
        "--cluster-config-repository",
        help="Define the repository from where solr-orbit will load cluster-configs (default: default).",
        default="default")
    install_parser.add_argument(
        "--cluster-config-revision",
        help="Define a specific revision in the cluster-config repository that solr-orbit should use.",
        default=None)
    install_parser.add_argument(
        "--cluster-config-path",
        help="Define the path to the cluster-config and plugin configurations to use.")
    install_parser.add_argument(
        "--runtime-jdk",
        type=runtime_jdk,
        help="The major version of the runtime JDK to use during installation.",
        default=None)
    install_parser.add_argument(
        "--distribution-repository",
        help="Define the repository from where the distribution should be downloaded (default: release).",
        default="release")
    install_parser.add_argument(
        "--distribution-version",
        type=supported_os_version,
        help="Define the version of the distribution to download. "
             "Check https://archive.apache.org/dist/solr/solr/ for released versions.",
        default="")
    install_parser.add_argument(
        "--cluster-config",
        help=f"Define the cluster-config to use. List possible "
             f"cluster-configs with `{PROGRAM_NAME} list "
             f"cluster-configs` (default: defaults).",
        default="defaults")  # optimized for local usage
    install_parser.add_argument(
        "--cluster-config-params",
        help="Define a comma-separated list of key:value pairs that are injected verbatim as variables for the cluster-config.",
        default=""
    )
    install_parser.add_argument(
        "--solr-modules",
        help="Comma-separated list of Solr modules to enable (sets SOLR_MODULES). "
             "Example: --solr-modules=analytics,extraction",
        default="")
    install_parser.add_argument(
        "--plugin-params",
        help="Define a comma-separated list of key:value pairs that are injected verbatim to all plugins as variables.",
        default=""
    )
    install_parser.add_argument(
        "--network-host",
        help="The IP address to bind to and publish",
        default="127.0.0.1"
    )
    install_parser.add_argument(
        "--http-port",
        help="The port to expose for HTTP traffic",
        default="38983"
    )
    install_parser.add_argument(
        "--node-name",
        help="The name of this Solr node",
        default="benchmark-node-0"
    )
    install_parser.add_argument(
        "--master-nodes",
        help="A comma-separated list of the initial master node names",
        default=""
    )
    install_parser.add_argument(
        "--seed-hosts",
        help="A comma-separated list of the initial seed host IPs",
        default=""
    )

    start_parser = subparsers.add_parser("start", help="Starts a Solr node locally")
    start_parser.add_argument(
        "--installation-id",
        required=True,
        help="The id of the installation to start",
        # the default will be dynamically derived by
        # test_run_orchestrator based on the
        # presence / absence of other command line options
        default="")
    start_parser.add_argument(
        "--test-run-id",
        "-tid",
        required=True,
        help="Define a unique id for this test_run.",
        default="")
    start_parser.add_argument(
        "--runtime-jdk",
        type=runtime_jdk,
        help="The major version of the runtime JDK to use.",
        default=None)
    start_parser.add_argument(
        "--telemetry",
        help=f"Enable the provided telemetry devices, provided as a comma-separated list. List possible telemetry "
             f"devices with `{PROGRAM_NAME} list telemetry`.",
        default="")
    start_parser.add_argument(
        "--telemetry-params",
        help="Define a comma-separated list of key:value pairs that are injected verbatim to the telemetry devices as parameters.",
        default=""
    )

    stop_parser = subparsers.add_parser("stop", help="Stops a Solr node locally")
    stop_parser.add_argument(
        "--installation-id",
        required=True,
        help="The id of the installation to stop",
        # the default will be dynamically derived by
        # test_run_orchestrator based on the
        # presence / absence of other command line options
        default="")
    stop_parser.add_argument(
        "--preserve-install",
        help=f"Keep the benchmark candidate and its index. (default: {str(preserve_install).lower()}).",
        default=preserve_install,
        action="store_true")

    # options shared by "list" and "run"
    for p in [list_parser, test_run_parser]:
        p.add_argument(
            "--distribution-version",
            type=supported_os_version,
            help="Define the version of the distribution to download.",
            default="")
        p.add_argument(
            "--cluster-config-path",
            help="Define the path to the cluster-config and plugin configurations to use.")
        p.add_argument(
            "--cluster-config-repository",
            help="Define repository from where solr-orbit will load cluster-configs (default: default).",
            default="default")
        p.add_argument(
            "--cluster-config-revision",
            help="Define a specific revision in the cluster-config repository that solr-orbit should use.",
            default=None)

    test_run_parser.add_argument(
        "--test-run-id",
        "-tid",
        help="Define a unique id for this test-run.",
        default=str(uuid.uuid4()))
    test_run_parser.add_argument(
        "--pipeline",
        help="Select the pipeline to run.",
        # the default will be dynamically derived by
        # test_run_orchestrator based on the
        # presence / absence of other command line options
        default="")
    test_run_parser.add_argument(
        "--revision",
        help="Define the source code revision for building the benchmark candidate. 'current' uses the source tree as is,"
             " 'latest' fetches the latest version on main. It is also possible to specify a commit id or a timestamp."
             " The timestamp must be specified as: \"@ts\" where \"ts\" must be a valid ISO 8601 timestamp, "
             "e.g. \"@2013-07-27T10:37:00Z\" (default: current).",
        default="current")  # optimized for local usage, don't fetch sources
    add_workload_source(test_run_parser)
    test_run_parser.add_argument(
        "--workload",
        "-w",
        help=f"Define the workload to use. List possible workloads with `{PROGRAM_NAME} list workloads`."
    )
    test_run_parser.add_argument(
        "--workload-params",
        "-wp",
        help="Define a comma-separated list of key:value pairs that are injected verbatim to the workload as variables.",
        default=""
    )
    test_run_parser.add_argument(
        "--test-procedure",
        help=f"Define the test_procedure to use. List possible test_procedures for workloads with `{PROGRAM_NAME} list workloads`.")
    test_run_parser.add_argument(
        "--cluster-config",
        help=f"Define the cluster-config to use. List possible "
             f"cluster-configs with `{PROGRAM_NAME} list "
             f"cluster-configs` (default: defaults).",
        default="defaults")  # optimized for local usage
    test_run_parser.add_argument(
        "--cluster-config-params",
        help="Define a comma-separated list of key:value pairs that are injected verbatim as variables for the cluster-config.",
        default=""
    )
    test_run_parser.add_argument(
        "--runtime-jdk",
        type=runtime_jdk,
        help="The major version of the runtime JDK to use.",
        default=None)
    test_run_parser.add_argument(
        "--solr-modules",
        help="Comma-separated list of Solr modules to enable (sets SOLR_MODULES). "
             "Example: --solr-modules=analytics,extraction",
        default="")
    test_run_parser.add_argument(
        "--plugin-params",
        help="Define a comma-separated list of key:value pairs that are injected verbatim to all plugins as variables.",
        default=""
    )
    test_run_parser.add_argument(
        "--target-hosts",
        "-t",
        help="Define a comma-separated list of host:port pairs which should be targeted if using the pipeline 'benchmark-only' "
             "(default: localhost:9200).",
        default="")  # actually the default is pipeline specific and it is set later
    test_run_parser.add_argument(
        "--worker-ips",
        help="Define a comma-separated list of hosts which should generate load (default: localhost).",
        default="localhost")
    test_run_parser.add_argument(
        "--grpc-target-hosts",
        help="Define a comma-separated list of host:port pairs for gRPC endpoints "
             "(default: localhost:9400).",
        default="")
    test_run_parser.add_argument(
        "--client-options",
        "-c",
        help=f"Define a comma-separated list of client options to use. The options will be passed to the benchmark "
             f"client (default: {opts.ClientOptions.DEFAULT_CLIENT_OPTIONS}).",
        default=opts.ClientOptions.DEFAULT_CLIENT_OPTIONS)
    test_run_parser.add_argument("--on-error",
                                 choices=["continue", "abort"],
                                 help="Controls how solr-orbit behaves on response errors (default: continue).",
                                 default="continue")
    test_run_parser.add_argument(
        "--telemetry",
        help=f"Enable the provided telemetry devices, provided as a comma-separated list. List possible telemetry "
             f"devices with `{PROGRAM_NAME} list telemetry`.",
        default="")
    test_run_parser.add_argument(
        "--telemetry-params",
        help="Define a comma-separated list of key:value pairs that are injected verbatim to the telemetry devices as parameters.",
        default=""
    )
    test_run_parser.add_argument(
        "--distribution-repository",
        help="Define the repository from where the distribution should be downloaded (default: release).",
        default="release")

    task_filter_group = test_run_parser.add_mutually_exclusive_group()
    task_filter_group.add_argument(
        "--include-tasks",
        help="Defines a comma-separated list of tasks to run. By default all tasks of a test_procedure are run.")
    task_filter_group.add_argument(
        "--exclude-tasks",
        help="Defines a comma-separated list of tasks not to run. By default all tasks of a test_procedure are run.")
    test_run_parser.add_argument(
        "--user-tag",
        help="Define a user-specific key-value pair (separated by ':'). It is added to each metric record as meta info. "
             "Example: intention:baseline-ticket-12345",
        default="")
    test_run_parser.add_argument(
        "--results-format",
        help="Define the output format for the command line results (default: markdown).",
        choices=["markdown", "csv"],
        default="markdown")
    test_run_parser.add_argument(
        "--results-numbers-align",
        help="Define the output column number alignment for the command line results (default: right).",
        choices=["right", "center", "left", "decimal"],
        default="right")
    test_run_parser.add_argument(
        "--show-in-results",
        help="Define which values are shown in the summary results published (default: available).",
        choices=["available", "all-percentiles", "all"],
        default="available")
    test_run_parser.add_argument(
        "--results-file",
        help="Write the command line results also to the provided file.",
        default="")
    test_run_parser.add_argument(
        "--preserve-install",
        help=f"Keep the benchmark candidate and its index. (default: {str(preserve_install).lower()}).",
        default=preserve_install,
        action="store_true")
    test_run_parser.add_argument(
        "--test-mode",
        help="Runs the given workload in 'test mode'. Meant to check a workload for errors but not for real benchmarks (default: false).",
        default=False,
        action="store_true")
    test_run_parser.add_argument(
        "--enable-worker-coordinator-profiling",
        help="Enables a profiler for analyzing the performance of calls in solr-orbit's worker coordinator (default: false).",
        default=False,
        action="store_true")
    test_run_parser.add_argument(
        "--enable-assertions",
        help="Enables assertion checks for tasks (default: false).",
        default=False,
        action="store_true")
    test_run_parser.add_argument(
        "--kill-running-processes",
        "-k",
        action="store_true",
        default=False,
        help="If any processes is running, it is going to kill them and allow solr-orbit to continue to run."
    )
    test_run_parser.add_argument(
        "--latency-percentiles",
        help=f"A comma-separated list of percentiles to report for latency "
             f"(default: {metrics.GlobalStatsCalculator.DEFAULT_LATENCY_PERCENTILES}).",
        default=metrics.GlobalStatsCalculator.DEFAULT_LATENCY_PERCENTILES
    )
    test_run_parser.add_argument(
        "--throughput-percentiles",
        help=f"A comma-separated list of percentiles to report for throughput, in addition to mean/median/max/min "
             f"(default: {metrics.GlobalStatsCalculator.DEFAULT_THROUGHPUT_PERCENTILES}).",
        default=metrics.GlobalStatsCalculator.DEFAULT_THROUGHPUT_PERCENTILES
    )
    test_run_parser.add_argument(
        "--randomization-enabled",
        help="Runs the given workload with query randomization enabled (default: false).",
        default=False,
        action="store_true")
    test_run_parser.add_argument(
        "--randomization-repeat-frequency",
        help=f"The repeat_frequency for query randomization. Ignored if randomization is off "
             f"(default: {workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_RF}).",
        default=workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_RF)
    test_run_parser.add_argument(
        "--randomization-n",
        help=f"The number of standard values to generate for each field for query randomization. "
             f"Ignored if randomization is off (default: {workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_N}).",
        default=workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_N)
    test_run_parser.add_argument(
        "--randomization-alpha",
        help=f"The alpha parameter used for the Zipf distribution for query randomization. Low values spread the distribution out, "
             f"high values favor the most common queries. "
             f"Ignored if randomization is off (default: {workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_ALPHA}).",
        default=workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_ALPHA)
    test_run_parser.add_argument(
        "--test-iterations",
        help="The number of times to run the workload (default: 1).",
        default=1)
    test_run_parser.add_argument(
        "--aggregate",
        type=lambda x: (str(x).lower() in ['true', '1', 'yes', 'y']),
        help="Aggregate the results of multiple test runs (default: true).",
        default=True)
    test_run_parser.add_argument(
        "--sleep-timer",
        help="Sleep for the specified number of seconds before starting the next test run (default: 5).",
        default=5)
    test_run_parser.add_argument(
        "--cancel-on-error",
        action="store_true",
        help="Stop running tests if an error occurs in one of the test iterations (default: false).",
    )
    test_run_parser.add_argument(
        "--load-test-qps",
        help="Run a load test on your cluster, up to a certain QPS value (default: 0)",
        default=0
    )
    test_run_parser.add_argument(
        "--redline-test",
        help="Run a redline test on your cluster, up to a certain QPS value (default: 1000)",
        nargs='?',
        const=1000,  # Value to use when flag is present but no value given
        default=0,   # Value to use when flag is not present
        type=int
    )
    test_run_parser.add_argument(
        "--redline-scale-step",
        type=int,
        help="How many clients to add while scaling up during redline testing (default: 5).",
        default=None
    )
    test_run_parser.add_argument(
        "--redline-scaledown-percentage",
        type=float,
        help="What percentage of clients to remove when errors occur (default: 10%%).",
        default=None
    )
    test_run_parser.add_argument(
        "--redline-post-scaledown-sleep",
        type=int,
        help="How many seconds to wait before scaling up again after a scale down (default: 30).",
        default=None
    )
    test_run_parser.add_argument(
        "--redline-max-clients",
        type=int,
        help="Maximum number of clients to allow during redline testing. If not set, will default to clients defined in the test procedure.",
        default=None
    )
    test_run_parser.add_argument(
        "--redline-max-cpu-usage",
        type=int,
        help="Maximum CPU utilization before scaling back client numbers. Used to activate CPU-based feedback in solr-orbit.",
        default=None
    )
    test_run_parser.add_argument(
        "--redline-cpu-window-seconds",
        type=int,
        help="How many seconds the window for average CPU load should be in seconds during CPU-based redline testing. (Default: 30)",
        default=None
    )
    test_run_parser.add_argument(
        "--redline-cpu-check-interval",
        type=int,
        help="How many seconds between CPU checks there should be during CPU-based redline testing. (Default: 30)",
        default=None
    )
    test_run_parser.add_argument(
        "--visualize",
        help="Generate HTML visualizations for benchmark results. Stored in the test runs directory by default",
        action="store_true",
        default=False
    )
    test_run_parser.add_argument(
        "--visualize-output-path",
        help="Path where the HTML visualization should be saved when --visualize is enabled. If not specified, it will be saved in the test run directory.",
        default=None
    )

    ###############################################################################
    #
    # The options below are undocumented and can be removed or changed at any time.
    #
    ###############################################################################
    # This option is intended to tell solr-orbit to assume a different start date than 'now'. This is effectively just useful for things like
    # backtesting or a benchmark run across environments (think: comparison of EC2 and bare metal) but never for the typical user.
    test_run_parser.add_argument(
        "--effective-start-date",
        help=argparse.SUPPRESS,
        type=lambda s: datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S"),
        default=None)
    # Skips checking that the REST API is available before proceeding with the benchmark
    test_run_parser.add_argument(
        "--skip-rest-api-check",
        help=argparse.SUPPRESS,
        action="store_true",
        default=False)

    # options available on every subcommand
    for p in [list_parser, test_run_parser, compare_parser, aggregate_parser,
              download_parser, install_parser, start_parser, stop_parser, info_parser,
              synthetic_data_generator_parser, create_workload_parser, visualize_parser,
              convert_workload_parser]:
        # This option is needed to support a separate configuration for the integration tests on the same machine
        p.add_argument(
            "--configuration-name",
            help=argparse.SUPPRESS,
            default=None)
        p.add_argument(
            "--quiet",
            help="Suppress as much as output as possible (default: false).",
            default=False,
            action="store_true")
        p.add_argument(
            "--offline",
            help="Assume that solr-orbit has no connection to the Internet (default: false).",
            default=False,
            action="store_true")

    return parser
def dispatch_list(cfg):
    """
    Print the listing that was requested via the ``list`` subcommand.

    The kind of listing is read from the ``system`` / ``list.config.option`` setting of ``cfg``.

    :param cfg: The configuration object holding the requested listing kind.
    :raises exceptions.SystemSetupError: If the requested listing kind is not one of the supported ones.
    """
    requested = cfg.opts("system", "list.config.option")
    # maps each supported listing kind to the function that prints it and the arguments it is called with
    listings = {
        "telemetry": (telemetry.list_telemetry, ()),
        "workloads": (workload.list_workloads, (cfg,)),
        "pipelines": (test_run_orchestrator.list_pipelines, ()),
        "test-runs": (metrics.list_test_runs, (cfg,)),
        "aggregated-results": (metrics.list_aggregated_results, (cfg,)),
        "cluster-configs": (cluster_config.list_cluster_configs, (cfg,)),
    }
    if requested not in listings:
        raise exceptions.SystemSetupError("Cannot list unknown configuration option [%s]" % requested)
    list_function, list_args = listings[requested]
    list_function(*list_args)
def dispatch_visualize(cfg):
    """
    Create the HTML report for a stored test run and optionally copy it to a user-defined location.

    The test run id is read from ``system`` / ``test_run.id`` and the optional copy destination from
    ``visualize`` / ``output.path``.

    :param cfg: The configuration object.
    :raises exceptions.SystemSetupError: If no test run with the given id exists or if creating or copying the
                                         report fails for any other reason.
    """
    test_run_id = cfg.opts("system", "test_run.id")
    output_path = cfg.opts("visualize", "output.path", mandatory=False, default_value=None)

    store = metrics.test_run_store(cfg)
    try:
        # load the run
        test_run = store.find_by_test_run_id(test_run_id)

        # for a composite store the HTML report is created via its file store
        html_store = store.file_store if isinstance(store, metrics.CompositeTestRunStore) else store
        html_path = html_store.store_html_results(test_run)

        # if the user asked for --output-path, just copy the file there
        if output_path:
            dest = os.path.expanduser(output_path)
            # A bare file name has no directory part; os.makedirs("") would raise FileNotFoundError.
            dest_dir = os.path.dirname(dest)
            if dest_dir:
                os.makedirs(dest_dir, exist_ok=True)
            shutil.copy2(html_path, dest)
            console.info(f"HTML report copied to: {dest}")
    except exceptions.NotFound:
        raise exceptions.SystemSetupError(f"No test run with id [{test_run_id}]")
    except Exception as e:
        raise exceptions.SystemSetupError(f"Error visualizing test run: {e}") from e
def print_help_on_errors():
    """Print a short list of pointers for getting further help to the console."""
    emit = console.println
    title = "Getting further help:"

    emit(console.format.bold(title))
    emit(console.format.underline_for(title))
    emit(f"* Check the log files in {paths.logs()} for errors.")
    emit(f"* Read the documentation at {console.format.link(doc_link())}.")
    emit(f"* Ask a question on the forum at {console.format.link(FORUM_LINK)}.")
    emit(
        f"* Raise an issue in the project issue tracker "
        f"and include the log files in {paths.logs()}."
    )
def run_test(cfg, kill_running_processes=False):
    """
    Run a benchmark within an actor system after making sure that this is the only benchmark on the machine.

    :param cfg: The configuration object for this test run.
    :param kill_running_processes: If truthy, other running solr-orbit processes are killed (best effort) before
                                   the benchmark starts. Otherwise the presence of other solr-orbit processes
                                   is an error.
    :raises exceptions.BenchmarkError: If other solr-orbit processes are running and they should not be killed.
    :raises exceptions.SystemSetupError: If a maximum CPU usage is configured for redline testing but the metrics
                                         store is the in-memory store.
    """
    logger = logging.getLogger(__name__)

    if not kill_running_processes:
        running_instances = process.find_all_other_benchmark_processes()
        if running_instances:
            pids = [instance.pid for instance in running_instances]
            raise exceptions.BenchmarkError(
                f"There are other solr-orbit processes running on this machine (PIDs: {pids}) but only one "
                "benchmark is allowed to run at the same time.\n\nYou can use --kill-running-processes flag "
                "to kill running processes automatically and allow solr-orbit to continue to run a new benchmark. "
                "Otherwise, you need to manually kill them."
            )
    else:
        logger.info("Killing running solr-orbit processes")
        # Kill any lingering solr-orbit processes before attempting to continue - the actor system needs to be a
        # singleton on this machine
        # noinspection PyBroadException
        try:
            process.kill_running_benchmark_instances()
        except BaseException:
            logger.exception(
                "Could not terminate potentially running solr-orbit instances correctly. Attempting to go on anyway.")

    # redline testing: check metrics store type before running cpu based feedback test
    max_cpu_usage = cfg.opts("workload", "redline.max_cpu_usage", default_value=None, mandatory=False)
    if max_cpu_usage is not None:
        metrics_store = metrics.metrics_store(cfg, read_only=False)
        try:
            if isinstance(metrics_store, metrics.InMemoryMetricsStore):
                raise exceptions.SystemSetupError(
                    "CPU-based feedback requires a metrics store, but you're using the in-memory store. "
                    "Specify a metrics store in your benchmark.ini or via CLI to continue."
                )
        finally:
            # the store was only opened for the type check
            metrics_store.close()

    with_actor_system(test_run_orchestrator.run, cfg)
def _shutdown_internal_actor_system(actors, logger):
    """Shut down the given actor system, giving up after a timeout or after the user interrupted twice."""
    finished = False
    interrupts = 0
    # give some time for any outstanding messages to be delivered to the actor system
    time.sleep(3)
    while not finished and interrupts < 2:
        try:
            logger.info("Attempting to shutdown internal actor system.")
            actors.shutdown()
            # note that this check will only evaluate to True for a TCP-based actor system.
            seconds_left = 15
            while actor.actor_system_already_running() and seconds_left > 0:
                logger.info("Actor system is still running. Waiting...")
                time.sleep(1)
                seconds_left -= 1
            if seconds_left <= 0:
                logger.warning("Shutdown timed out. Actor system is still running.")
                break
            finished = True
            logger.info("Shutdown completed.")
        except KeyboardInterrupt:
            interrupts += 1
            logger.warning("User interrupted shutdown of internal actor system.")
            console.info("Please wait a moment for solr-orbit's internal components to shutdown.")

    if finished:
        return
    if interrupts > 0:
        logger.warning("Terminating after user has interrupted actor system shutdown explicitly for [%d] times.",
                       interrupts)
        console.println("")
        console.warn("Terminating now at the risk of leaving child processes behind.")
        console.println("")
        console.warn("The next test_run may fail due to an unclean shutdown.")
        console.println("")
        console.println(SKULL)
        console.println("")
    else:
        console.warn("Could not terminate all internal processes within timeout. Please check and force-terminate "
                     "all solr-orbit processes.")


def with_actor_system(runnable, cfg):
    """
    Call ``runnable(cfg)`` with a bootstrapped actor system.

    If the regular bootstrap fails because the actor address is invalid or no external socket address can be
    determined, an offline actor system is used instead. An actor system that was not already running before this
    call is shut down again afterwards, regardless of whether ``runnable`` succeeded.

    :param runnable: A callable that is invoked with ``cfg`` as its only argument.
    :param cfg: The configuration object. ``system`` / ``remote.benchmarking.supported`` is recorded on it when
                the regular bootstrap succeeds.
    """
    logger = logging.getLogger(__name__)
    was_running = actor.actor_system_already_running()
    logger.info("Actor system already running locally? [%s]", str(was_running))
    try:
        actors = actor.bootstrap_actor_system(try_join=was_running, prefer_local_only=not was_running)
        # We can only support remote benchmarks if we have a dedicated daemon that is not only bound to 127.0.0.1
        cfg.add(config.Scope.application, "system", "remote.benchmarking.supported", was_running)
    # This happens when the admin process could not be started, e.g. because it could not open a socket.
    except thespian.actors.InvalidActorAddress:
        logger.info("Falling back to offline actor system.")
        actor.use_offline_actor_system()
        actors = actor.bootstrap_actor_system(try_join=False, prefer_local_only=True)
    except Exception as e:
        logger.exception("Could not bootstrap actor system.")
        # only the missing socket address is recoverable; everything else is passed on to the caller
        if str(e) != "Unable to determine valid external socket address.":
            raise
        console.warn("Could not determine a socket address. Are you running without any network? Switching to degraded mode.",
                     logger=logger)
        logger.info("Falling back to offline actor system.")
        actor.use_offline_actor_system()
        actors = actor.bootstrap_actor_system(try_join=False, prefer_local_only=True)

    try:
        runnable(cfg)
    finally:
        # We only shutdown the actor system if it was not already running before
        if not was_running:
            _shutdown_internal_actor_system(actors, logger)
def configure_telemetry_params(args, cfg):
    """
    Transfer the telemetry related command line arguments to the ``telemetry`` section of ``cfg``.

    :param args: The parsed command line arguments (``telemetry`` and ``telemetry_params`` are read).
    :param cfg: The configuration object that receives the settings.
    """
    scope = config.Scope.applicationOverride

    devices = opts.csv_to_list(args.telemetry)
    cfg.add(scope, "telemetry", "devices", devices)

    device_params = opts.to_dict(args.telemetry_params)
    cfg.add(scope, "telemetry", "params", device_params)
def configure_workload_params(arg_parser, args, cfg, command_requires_workload=True):
    """
    Transfer the workload related command line arguments to the ``workload`` section of ``cfg``.

    A workload is located either via a local path (``--workload-path``) or via a workload repository. Invalid
    argument combinations are reported with ``arg_parser.error``.

    :param arg_parser: The argument parser that is used to report invalid argument combinations.
    :param args: The parsed command line arguments.
    :param cfg: The configuration object that receives the settings.
    :param command_requires_workload: Whether the command operates on one specific workload. If ``False``, only
                                      the workload location (path or repository) is configured.
    """
    scope = config.Scope.applicationOverride
    cfg.add(scope, "workload", "repository.revision", args.workload_revision)
    # We can assume here that if a workload-path is given, the user did not specify a repository either (although
    # argparse sets it to its default value)
    if args.workload_path:
        cfg.add(scope, "workload", "workload.path", os.path.abspath(io.normalize_path(args.workload_path)))
        cfg.add(scope, "workload", "repository.name", None)
        if args.workload_revision:
            # stay as close as possible to argparse errors although we have a custom validation.
            arg_parser.error("argument --workload-revision not allowed with argument --workload-path")
        if command_requires_workload and args.workload:
            # stay as close as possible to argparse errors although we have a custom validation.
            arg_parser.error("argument --workload not allowed with argument --workload-path")
    else:
        cfg.add(scope, "workload", "repository.name", args.workload_repository)
        if command_requires_workload:
            if not args.workload:
                # error() reports the problem and exits on its own; it has no return value that could be raised.
                arg_parser.error("argument --workload is required")
            cfg.add(scope, "workload", "workload.name", args.workload)

    if command_requires_workload:
        cfg.add(scope, "workload", "params", opts.to_dict(args.workload_params))
        cfg.add(scope, "workload", "test_procedure.name", args.test_procedure)
        cfg.add(scope, "workload", "include.tasks", opts.csv_to_list(args.include_tasks))
        cfg.add(scope, "workload", "exclude.tasks", opts.csv_to_list(args.exclude_tasks))
# Transfer the builder (cluster provisioning) command line arguments to ``cfg`` at the applicationOverride scope.
#
# args: the parsed command line arguments.
# cfg: the configuration object that receives the settings.
# command_requires_cluster_config: defaults to True; the "list" subcommand passes False. When it is truthy the
#   distribution and cluster config settings are stored in the "builder" section as well.
#
# Raises SystemExit when the pipeline is "benchmark-only" and --cluster-config differs from "defaults".
#
# NOTE(review): the indentation of this block is not visible in this view, so it cannot be told which of the
# statements after `if command_requires_cluster_config:` (in particular the "solr" / "modules" setting and the
# pipeline check) are guarded by it - confirm against the original file before restructuring.
def configure_builder_params(args, cfg, command_requires_cluster_config=True):
if args.cluster_config_path:
# A local cluster config path is stored as a normalized absolute path; repository name and revision are
# explicitly set to None in this case.
cfg.add(
config.Scope.applicationOverride, "builder",
"cluster_config.path", os.path.abspath(
io.normalize_path(args.cluster_config_path)))
cfg.add(config.Scope.applicationOverride, "builder", "repository.name", None)
cfg.add(config.Scope.applicationOverride, "builder", "repository.revision", None)
else:
# Without a local path the cluster configs are located via repository name and revision.
cfg.add(config.Scope.applicationOverride, "builder", "repository.name", args.cluster_config_repository)
cfg.add(config.Scope.applicationOverride, "builder", "repository.revision", args.cluster_config_revision)
if command_requires_cluster_config:
# The distribution version is only stored when one was provided.
if args.distribution_version:
cfg.add(config.Scope.applicationOverride, "builder", "distribution.version", args.distribution_version)
cfg.add(config.Scope.applicationOverride, "builder", "distribution.repository", args.distribution_repository)
cfg.add(config.Scope.applicationOverride, "builder",
"cluster_config.names", opts.csv_to_list(
args.cluster_config))
cfg.add(config.Scope.applicationOverride, "builder",
"cluster_config.params", opts.to_dict(
args.cluster_config_params))
# getattr with a default is used because "solr_modules" and "pipeline" may be absent from args.
cfg.add(config.Scope.applicationOverride, "solr", "modules", getattr(args, "solr_modules", ""))
pipeline = getattr(args, "pipeline", None)
# Any cluster config other than "defaults" is rejected for the "benchmark-only" pipeline.
if pipeline == "benchmark-only" and args.cluster_config != "defaults":
raise SystemExit(
"ERROR: --cluster-config is only valid for provisioning pipelines "
"(from-distribution, docker, from-sources). "
"It cannot be used with the 'benchmark-only' pipeline."
)
def configure_connection_params(arg_parser, args, cfg):
    """
    Transfer the connection related command line arguments to the ``client`` section of ``cfg``.

    Stores the target hosts, the client options and the (optional) gRPC target hosts. A warning is printed when
    the target hosts contain a comma and an info message when the default client options contain no timeout.

    :param arg_parser: The argument parser; used to report mismatching keys of target hosts and client options.
    :param args: The parsed command line arguments.
    :param cfg: The configuration object that receives the settings.
    """
    scope = config.Scope.applicationOverride

    # Check if multiple hosts are specified using comma separator
    raw_hosts = args.target_hosts
    if raw_hosts and "," in raw_hosts:
        console.warn(
            "WARNING: Benchmark runs with multiple target hosts should be passed in as a JSON file such as:\n"
            "{\n"
            ' "default": [\n'
            ' {"host": "127.0.0.1", "port": 9200} # Specify nodes for cluster 1\n'
            " ],\n"
            ' "remote":[\n'
            ' {"host": "10.127.0.3", "port": 9200} # Specify nodes for cluster 2\n'
            " ]\n"
            "}"
        )

    # Also needed by builder (-> telemetry) - duplicate by module?
    target_hosts = opts.TargetHosts(args.target_hosts)
    cfg.add(scope, "client", "hosts", target_hosts)
    client_options = opts.ClientOptions(args.client_options, target_hosts=target_hosts)
    cfg.add(scope, "client", "options", client_options)

    # Configure gRPC target hosts; they are optional and not every command defines the argument
    raw_grpc_hosts = getattr(args, "grpc_target_hosts", None)
    grpc_target_hosts = opts.TargetHosts(raw_grpc_hosts) if raw_grpc_hosts else None
    cfg.add(scope, "client", "grpc_hosts", grpc_target_hosts)

    if "timeout" not in client_options.default:
        console.info("You did not provide an explicit timeout in the client options. Assuming default of 10 seconds.")

    host_keys = list(target_hosts.all_hosts)
    option_keys = list(client_options.all_client_options)
    if host_keys != option_keys:
        arg_parser.error("--target-hosts and --client-options must define the same keys for multi cluster setups.")
def configure_reporting_params(args, cfg):
    """
    Transfer the results reporting command line arguments to the ``reporting`` section of ``cfg``.

    :param args: The parsed command line arguments.
    :param cfg: The configuration object that receives the settings.
    """
    def override(key, value):
        cfg.add(config.Scope.applicationOverride, "reporting", key, value)

    override("format", args.results_format)
    override("values", args.show_in_results)
    override("output.path", args.results_file)
    override("numbers.align", args.results_numbers_align)
def prepare_test_runs_dict(args, cfg):
    """
    Record the results file in ``cfg`` and collect the ids of the test runs to aggregate.

    :param args: The parsed command line arguments (``results_file`` and ``test_runs`` are read).
    :param cfg: The configuration object; ``reporting`` / ``output.path`` is set on it.
    :return: A dict whose keys are the non-blank, whitespace-stripped entries of ``args.test_runs`` in their
             original order. All values are ``None``.
    """
    cfg.add(config.Scope.applicationOverride, "reporting", "output.path", args.results_file)
    if not args.test_runs:
        return {}
    stripped_ids = (run_id.strip() for run_id in args.test_runs)
    return dict.fromkeys(run_id for run_id in stripped_ids if run_id)
# Transfer all command line arguments of the "run" subcommand to ``cfg``.
#
# The values are stored at the applicationOverride scope in the "system", "test_run", "worker_coordinator",
# "workload" and "builder" sections. Workload, connection, telemetry, builder and reporting settings are
# delegated to the respective configure_* helpers. The test run id is printed first.
#
# arg_parser: the argument parser; handed on to the helpers that report invalid argument combinations.
# args: the parsed command line arguments.
# cfg: the configuration object that receives the settings.
#
# NOTE(review): the indentation of this block is not visible in this view, so it is unclear how many of the
# "redline.*" settings are guarded by `if args.redline_test:` - confirm against the original file before
# restructuring.
def configure_test(arg_parser, args, cfg):
# As the run command is doing more work than necessary at the moment, we duplicate several parameters
# in this section that actually belong to dedicated subcommands (like install, start or stop). Over time
# these duplicated parameters will vanish as we move towards dedicated subcommands and use "run" only
# to run the actual benchmark (i.e. generating load).
print_test_run_id(args)
# An explicitly provided effective start date is stored as the start time of the run.
if args.effective_start_date:
cfg.add(config.Scope.applicationOverride, "system", "time.start", args.effective_start_date)
cfg.add(config.Scope.applicationOverride, "system", "test_run.id", args.test_run_id)
# use the test-run id implicitly also as the install id.
cfg.add(config.Scope.applicationOverride, "system", "install.id", args.test_run_id)
cfg.add(config.Scope.applicationOverride, "test_run", "pipeline", args.pipeline)
cfg.add(config.Scope.applicationOverride, "test_run", "user.tag", args.user_tag)
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "profiling", args.enable_worker_coordinator_profiling)
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "assertions", args.enable_assertions)
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "on.error", args.on_error)
cfg.add(
config.Scope.applicationOverride,
"worker_coordinator",
"worker_ips",
opts.csv_to_list(args.worker_ips))
cfg.add(config.Scope.applicationOverride, "workload", "test.mode.enabled", args.test_mode)
# The value of --load-test-qps is converted to int and stored under the key "load.test.clients".
cfg.add(config.Scope.applicationOverride, "workload", "load.test.clients", int(args.load_test_qps))
# Redline testing settings.
if args.redline_test:
cfg.add(config.Scope.applicationOverride, "workload", "redline.test", int(args.redline_test))
cfg.add(config.Scope.applicationOverride, "workload", "redline.scale_step", args.redline_scale_step)
cfg.add(config.Scope.applicationOverride, "workload", "redline.scale_down_pct", args.redline_scaledown_percentage)
cfg.add(config.Scope.applicationOverride, "workload", "redline.sleep_seconds", args.redline_post_scaledown_sleep)
cfg.add(config.Scope.applicationOverride, "workload", "redline.max_clients", args.redline_max_clients)
cfg.add(config.Scope.applicationOverride, "workload", "redline.max_cpu_usage", args.redline_max_cpu_usage)
cfg.add(config.Scope.applicationOverride, "workload", "redline.cpu_window_seconds", args.redline_cpu_window_seconds)
cfg.add(config.Scope.applicationOverride, "workload", "redline.cpu_check_interval", args.redline_cpu_check_interval)
# Percentile and query randomization settings.
cfg.add(config.Scope.applicationOverride, "workload", "latency.percentiles", args.latency_percentiles)
cfg.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", args.throughput_percentiles)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.enabled", args.randomization_enabled)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.repeat_frequency", args.randomization_repeat_frequency)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.n", args.randomization_n)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.alpha", args.randomization_alpha)
# Visualization settings (--visualize and --visualize-output-path).
cfg.add(config.Scope.applicationOverride, "workload", "visualize", args.visualize)
cfg.add(config.Scope.applicationOverride, "workload", "visualize.output.path", args.visualize_output_path)
# Delegate the settings that are shared with other subcommands.
configure_workload_params(arg_parser, args, cfg)
configure_connection_params(arg_parser, args, cfg)
configure_telemetry_params(args, cfg)
configure_builder_params(args, cfg)
cfg.add(config.Scope.applicationOverride, "builder", "runtime.jdk", args.runtime_jdk)
cfg.add(config.Scope.applicationOverride, "builder", "source.revision", args.revision)
# cfg.add(config.Scope.applicationOverride, "builder",
# "cluster_config_instance.plugins", opts.csv_to_list(
# args.opensearch_plugins))
cfg.add(config.Scope.applicationOverride, "builder", "plugin.params", opts.to_dict(args.plugin_params))
cfg.add(config.Scope.applicationOverride, "builder", "preserve.install", convert.to_bool(args.preserve_install))
cfg.add(config.Scope.applicationOverride, "builder", "skip.rest.api.check", convert.to_bool(args.skip_rest_api_check))
configure_reporting_params(args, cfg)
def print_test_run_id(args):
    """
    Print the id of the current test run to the console.

    :param args: The parsed command line arguments (``test_run_id`` is read).
    """
    test_run_id = args.test_run_id
    console.info(f"[Test Run ID]: {test_run_id}")
def dispatch_sub_command(arg_parser, args, cfg):
    """
    Configure ``cfg`` for the chosen subcommand and execute it.

    Errors are not propagated to the caller: they are logged, printed to the console together with pointers for
    getting further help, and reported via the return value.

    :param arg_parser: The argument parser; used to report invalid argument combinations.
    :param args: The parsed command line arguments. ``args.subcommand`` selects the action.
    :param cfg: The configuration object that is populated from ``args``.
    :return: ``True`` if the subcommand completed without raising, ``False`` otherwise.
    """
    sub_command = args.subcommand

    cfg.add(config.Scope.application, "system", "quiet.mode", args.quiet)
    cfg.add(config.Scope.application, "system", "offline.mode", args.offline)

    try:
        if sub_command == "compare":
            configure_reporting_params(args, cfg)
            cfg.add(config.Scope.applicationOverride, "reporting", "percentiles", args.percentiles)
            publisher.compare(cfg, args.baseline, args.contender)
        elif sub_command == "aggregate":
            test_runs_dict = prepare_test_runs_dict(args, cfg)
            aggregator_instance = aggregator.Aggregator(cfg, test_runs_dict, args)
            aggregator_instance.aggregate()
        elif sub_command == "list":
            cfg.add(config.Scope.applicationOverride, "system", "list.config.option", args.configuration)
            cfg.add(config.Scope.applicationOverride, "system", "list.test_runs.max_results", args.limit)
            configure_builder_params(args, cfg, command_requires_cluster_config=False)
            configure_workload_params(arg_parser, args, cfg, command_requires_workload=False)
            dispatch_list(cfg)
        elif sub_command == "download":
            configure_builder_params(args, cfg)
            builder.download(cfg)
        elif sub_command == "install":
            cfg.add(config.Scope.applicationOverride, "system", "install.id", str(uuid.uuid4()))
            cfg.add(config.Scope.applicationOverride, "builder", "network.host", args.network_host)
            cfg.add(config.Scope.applicationOverride, "builder", "network.http.port", args.http_port)
            cfg.add(config.Scope.applicationOverride, "builder", "source.revision", args.revision)
            cfg.add(config.Scope.applicationOverride, "builder", "build.type", args.build_type)
            cfg.add(config.Scope.applicationOverride, "builder", "runtime.jdk", args.runtime_jdk)
            cfg.add(config.Scope.applicationOverride, "builder", "node.name", args.node_name)
            cfg.add(config.Scope.applicationOverride, "builder", "master.nodes", opts.csv_to_list(args.master_nodes))
            cfg.add(config.Scope.applicationOverride, "builder", "seed.hosts", opts.csv_to_list(args.seed_hosts))
            # cfg.add(config.Scope.applicationOverride, "builder",
            # "cluster_config.plugins", opts.csv_to_list(
            # args.opensearch_plugins))
            cfg.add(config.Scope.applicationOverride, "builder", "plugin.params", opts.to_dict(args.plugin_params))
            configure_builder_params(args, cfg)
            builder.install(cfg)
        elif sub_command == "start":
            print_test_run_id(args)
            cfg.add(config.Scope.applicationOverride, "system", "test_run.id", args.test_run_id)
            cfg.add(config.Scope.applicationOverride, "system", "install.id", args.installation_id)
            cfg.add(config.Scope.applicationOverride, "builder", "runtime.jdk", args.runtime_jdk)
            configure_telemetry_params(args, cfg)
            builder.start(cfg)
        elif sub_command == "stop":
            cfg.add(config.Scope.applicationOverride, "builder", "preserve.install", convert.to_bool(args.preserve_install))
            cfg.add(config.Scope.applicationOverride, "system", "install.id", args.installation_id)
            builder.stop(cfg)
        elif sub_command == "run":
            iterations = int(args.test_iterations)
            if iterations > 1:
                test_runs = []
                for iteration in range(1, iterations + 1):
                    try:
                        configure_test(arg_parser, args, cfg)
                        run_test(cfg, args.kill_running_processes)
                        time.sleep(int(args.sleep_timer))
                        test_runs.append(args.test_run_id)
                        # every iteration is stored under its own test run id
                        args.test_run_id = str(uuid.uuid4())
                    except Exception as e:
                        console.error(f"Error occurred during test run {iteration}: {str(e)}")
                        if args.cancel_on_error:
                            console.info("Cancelling remaining test runs.")
                            break

                if args.aggregate:
                    # only the iterations that completed are aggregated
                    args.test_runs = test_runs
                    test_runs_dict = prepare_test_runs_dict(args, cfg)
                    aggregator_instance = aggregator.Aggregator(cfg, test_runs_dict, args)
                    aggregator_instance.aggregate()
            # Compare the converted value: the raw argument may be the string "1", which never equals the int 1.
            elif iterations == 1:
                configure_test(arg_parser, args, cfg)
                run_test(cfg, args.kill_running_processes)
            else:
                console.info("Please enter a valid number of test iterations")
        elif sub_command == "generate-data":
            cfg.add(config.Scope.applicationOverride, "synthetic_data_generator", "index_name", args.index_name)
            cfg.add(config.Scope.applicationOverride, "synthetic_data_generator", "index_mappings", args.index_mappings)
            cfg.add(config.Scope.applicationOverride, "synthetic_data_generator", "custom_module", args.custom_module)
            cfg.add(config.Scope.applicationOverride, "synthetic_data_generator", "custom_config", args.custom_config)
            cfg.add(config.Scope.applicationOverride, "synthetic_data_generator", "output_path", args.output_path)
            cfg.add(config.Scope.applicationOverride, "synthetic_data_generator", "total_size", args.total_size)
            cfg.add(config.Scope.applicationOverride, "synthetic_data_generator", "test_document", args.test_document)
            synthetic_data_generator_orchestrator.orchestrate_data_generation(cfg)
        elif sub_command == "create-workload":
            cfg.add(config.Scope.applicationOverride, "generator", "indices", args.indices)
            cfg.add(config.Scope.applicationOverride, "generator", "number_of_docs", args.number_of_docs)
            cfg.add(config.Scope.applicationOverride, "generator", "output.path", args.output_path)
            cfg.add(config.Scope.applicationOverride, "workload", "workload.name", args.workload)
            cfg.add(config.Scope.applicationOverride, "workload", "custom_queries", args.custom_queries)
            cfg.add(config.Scope.applicationOverride, "generator", "sample_frequency", args.sample_frequency)
            configure_connection_params(arg_parser, args, cfg)
            workload_generator.create_workload(cfg)
        elif sub_command == "visualize":
            cfg.add(config.Scope.applicationOverride, "system", "test_run.id", args.test_run_id)
            cfg.add(config.Scope.applicationOverride, "visualize", "output.path", args.output_path)
            # Always set visualize to true for the visualize command
            cfg.add(config.Scope.applicationOverride, "workload", "visualize", True)
            dispatch_visualize(cfg)
        elif sub_command == "convert-workload":
            from solrorbit.conversion import workload_converter
            source_dir = os.path.abspath(args.workload_path)
            # without an explicit output path the result is written next to the source with a "-solr" suffix
            output_dir = os.path.abspath(args.output_path) if args.output_path else source_dir.rstrip("/") + "-solr"
            force = getattr(args, "force", False)
            if workload_converter.is_already_converted(output_dir) and not force:
                console.info(
                    f"Workload already converted at: {output_dir}\n"
                    "Use --force to overwrite."
                )
                return True
            console.info(f"Converting workload: {source_dir} → {output_dir}")
            result = workload_converter.convert_opensearch_workload(source_dir, output_dir)
            console.println(f"\nConversion complete: {result['output_dir']}")
            if result["skipped"]:
                console.println(f" Skipped operations ({len(result['skipped'])}): {', '.join(result['skipped'])}")
            if result["issues"]:
                console.println(f" Issues ({len(result['issues'])}):")
                for issue in result["issues"]:
                    console.println(f" - {issue}")
            # NOTE(review): this pointer is assumed to be printed after every conversion, not only when issues
            # were found - confirm the intended nesting.
            console.println(f" See {os.path.join(result['output_dir'], workload_converter.CONVERTED_MARKER)} for details.")
        elif sub_command == "info":
            configure_workload_params(arg_parser, args, cfg)
            workload.workload_info(cfg)
        else:
            raise exceptions.SystemSetupError(f"Unknown subcommand [{sub_command}]")
        return True
    except exceptions.BenchmarkError as e:
        logging.getLogger(__name__).exception("Cannot run subcommand [%s].", sub_command)
        msg = str(e.message)
        # append the messages of all nested causes, indented by one more tab per nesting level
        nesting = 0
        while hasattr(e, "cause") and e.cause:
            nesting += 1
            e = e.cause
            if hasattr(e, "message"):
                msg += "\n%s%s" % ("\t" * nesting, e.message)
            else:
                msg += "\n%s%s" % ("\t" * nesting, str(e))

        console.error(f"❌ Cannot {sub_command}. {msg}")
        console.println("")
        print_help_on_errors()
        return False
    except BaseException as e:
        logging.getLogger(__name__).exception("A fatal error occurred while running subcommand [%s].", sub_command)
        console.error(f"❌ Cannot {sub_command}. {e}.")
        console.println("")
        print_help_on_errors()
        return False
def handle_command_suggestions():
    """
    Suggest the ``run`` subcommand if the first command line argument is a deprecated subcommand name.

    :return: ``True`` if a suggestion was printed, ``False`` otherwise.
    """
    # check-deprecated-terms-disable-1x
    deprecated_subcommands = ("execute-test", "execute")

    cli_args = sys.argv[1:]
    if not cli_args or cli_args[0] not in deprecated_subcommands:
        return False

    console.info("Did you mean 'run'?")
    console.info(f"Example: {PROGRAM_NAME} run --workload=geonames --test-mode")
    console.info(f"For more information, run: {PROGRAM_NAME} run --help")
    return True
def main():
    """
    Command line entry point.

    Sets up logging and console output, parses the command line, loads the configuration (installing the default
    configuration first if none is present), probes for an Internet connection unless ``--offline`` is given and
    dispatches the chosen subcommand.

    The process always ends via ``sys.exit``: with status 0 on success, 64 on failure and 1 if only a suggestion
    for a deprecated subcommand was printed.
    """
    check_python_version()
    log.install_default_log_config()
    log.configure_logging()
    logger = logging.getLogger(__name__)
    start = time.time()

    # Early init of console output so we start to show everything consistently.
    console.init(quiet=False)

    # Handle command suggestions before argument parsing
    if handle_command_suggestions():
        sys.exit(1)

    arg_parser = create_arg_parser()
    args = arg_parser.parse_args()

    console.init(quiet=args.quiet)
    console.println(BANNER)

    cfg = config.Config(config_name=args.configuration_name)
    if not cfg.config_present():
        cfg.install_default_config()
    cfg.load_config(auto_upgrade=True)
    cfg.add(config.Scope.application, "system", "time.start", datetime.datetime.utcnow())
    # Local config per node
    cfg.add(config.Scope.application, "node", "benchmark.root", paths.benchmark_root())
    cfg.add(config.Scope.application, "node", "benchmark.cwd", os.getcwd())

    logger.info("OS [%s]", str(platform.uname()))
    logger.info("Python [%s]", str(sys.implementation))
    logger.info("solr-orbit version [%s]", version.version())
    logger.debug("Command line arguments: %s", args)
    # Configure networking
    net.init()
    if not args.offline:
        probing_url = cfg.opts("system", "probing.url", default_value="https://github.com", mandatory=False)
        if not net.has_internet_connection(probing_url):
            console.warn("No Internet connection detected. Automatic download of workload data sets etc. is disabled.",
                         logger=logger)
            cfg.add(config.Scope.applicationOverride, "system", "offline.mode", True)
        else:
            logger.info("Detected a working Internet connection.")

    success = dispatch_sub_command(arg_parser, args, cfg)

    end = time.time()
    if success:
        message = "✅ SUCCESS"
        status = 0
    else:
        message = "❌ FAILURE"
        status = 64
    message = message + f" (took {int(round(end - start))} seconds)"

    console.println("")
    console.info(message, overline="-", underline="-")
    # the status line is also appended to the results file unless the results are written as CSV
    if hasattr(args, "results_file") and args.results_file and getattr(args, "results_format", None) != "csv":
        # The status line contains non-ASCII characters. Encode it explicitly so that a non-UTF-8 locale
        # encoding cannot raise a UnicodeEncodeError right before the exit status is reported.
        with open(args.results_file, "a", encoding="utf-8") as fh:
            print("\n", message, file=fh)

    sys.exit(status)
# Only run the command line entry point when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()