// blob: 2986646dae544714c4acb7f1f10665a92a75ba44 [file] [log] [blame]
use super::kind::BenchmarkKindCommand;
use super::props::{BenchmarkKindProps, BenchmarkTransportProps};
use super::{defaults::*, transport::BenchmarkTransportCommand};
use clap::error::ErrorKind;
use clap::{CommandFactory, Parser};
use iggy::messages::poll_messages::PollingKind;
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::duration::IggyDuration;
use iggy_benchmark_report::benchmark_kind::BenchmarkKind;
use iggy_benchmark_report::params::BenchmarkParams;
use iggy_benchmark_report::transport::BenchmarkTransport;
use integration::test_server::Transport;
use std::net::SocketAddr;
use std::path::Path;
use std::str::FromStr;
use tracing::info;
// Top-level command-line arguments of the benchmark tool, parsed with clap's derive API.
// The `///` comments on the fields are picked up by clap as the `--help` text, so reviewer
// notes in this struct are written as plain `//` comments to leave the CLI output untouched.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct IggyBenchArgs {
// The subcommand selects the benchmark kind; the transport command and the per-kind
// parameters are reached through it (see the accessor methods on `IggyBenchArgs`).
/// Benchmark kind
#[command(subcommand)]
pub benchmark_kind: BenchmarkKindCommand,
// The value is checked by `validate_server_executable_path`, which rejects paths that do
// not exist.
// NOTE(review): `default_value = None` looks redundant on an `Option` field — confirm.
/// iggy-server executable path
#[arg(long, short='e', default_value = None, value_parser = validate_server_executable_path)]
pub server_executable_path: Option<String>,
// `validate()` rejects this flag when the server address is not a loopback address.
/// Shutdown iggy-server and remove server local_data directory after the benchmark is finished
#[arg(long, short='c', default_value_t = DEFAULT_PERFORM_CLEANUP)]
pub cleanup: bool,
// NOTE(review): the help text mentions a temporary directory, but this field always holds a
// value (DEFAULT_SERVER_SYSTEM_PATH when the flag is omitted) — confirm which one is accurate.
/// Server local data directory path, if not provided a temporary directory will be created
#[arg(long, short='s', default_value_t = DEFAULT_SERVER_SYSTEM_PATH.to_owned())]
pub server_system_path: String,
// `validate()` rejects this flag when the server address is not a loopback address.
/// Server stdout visibility
#[arg(long, short='v', default_value_t = DEFAULT_SERVER_STDOUT_VISIBILITY)]
pub verbose: bool,
// The default is parsed from DEFAULT_WARMUP_TIME; the `unwrap` panics if that constant is
// not a valid duration string.
/// Warmup time in human readable format, e.g. "1s", "2m", "3h"
#[arg(long, short = 'w', default_value_t = IggyDuration::from_str(DEFAULT_WARMUP_TIME).unwrap())]
pub warmup_time: IggyDuration,
// Same pattern as `warmup_time`, with the value parser spelled out explicitly.
/// Sampling time for metrics collection. It is also used as bucket size for time series calculations.
#[arg(long, default_value_t = IggyDuration::from_str(DEFAULT_SAMPLING_TIME).unwrap(), value_parser = IggyDuration::from_str)]
pub sampling_time: IggyDuration,
/// Window size for moving average calculations in time series data
#[arg(long, default_value_t = DEFAULT_MOVING_AVERAGE_WINDOW)]
pub moving_average_window: u32,
// No default and not marked `global`, so on the command line this flag has to be given
// before the benchmark-kind subcommand.
/// Optional rate limit per individual producer/consumer in bytes per second (not aggregate).
/// Accepts human-readable formats like "50KB", "10MB", or "1GB"
#[arg(long)]
pub rate_limit: Option<IggyByteSize>,
/// Skip server start
#[arg(long, short = 'k', default_value_t = DEFAULT_SKIP_SERVER_START)]
pub skip_server_start: bool,
// `validate()` requires this option whenever identifier, remark, extra_info, gitref or
// gitref_date is set.
/// Output directory path for storing benchmark results
#[arg(long, short = 'o')]
pub output_dir: Option<String>,
// NOTE(review): the help text promises a hostname fallback, but `get_identifier_or_hostname`
// returns the given value unchanged and only runs when the flag is present — the fallback
// presumably happens elsewhere; confirm.
/// Identifier for the benchmark run (defaults to hostname if not provided)
#[arg(long, value_parser = get_identifier_or_hostname)]
pub identifier: Option<String>,
/// Additional remark for the benchmark (e.g., no-cache)
#[arg(long)]
pub remark: Option<String>,
/// Extra information for future use
#[arg(long)]
pub extra_info: Option<String>,
// The long flag derived from this field name is `--gitref` (no dash inside).
/// Git reference (commit hash, branch or tag) used for note in the benchmark results
#[arg(long)]
pub gitref: Option<String>,
// The long flag derived from this field name is `--gitref-date`.
/// Git reference date used for note in the benchmark results, preferably merge date of the commit
#[arg(long)]
pub gitref_date: Option<String>,
/// Open generated charts in browser after benchmark is finished
#[arg(long, default_value_t = false)]
pub open_charts: bool,
}
/// clap value parser for the server executable path.
///
/// Returns the path as an owned `String` when something exists at that location on the
/// filesystem, and an error message naming the path otherwise.
fn validate_server_executable_path(v: &str) -> Result<String, String> {
    // Guard clause: bail out first, so the success path is the last expression.
    if !Path::new(v).exists() {
        return Err(format!("Provided server executable '{v}' does not exist."));
    }
    Ok(String::from(v))
}
/// clap value parser for the benchmark identifier.
///
/// Always succeeds and hands back an owned copy of the supplied value; no hostname lookup
/// is performed in this function.
fn get_identifier_or_hostname(v: &str) -> Result<String, String> {
    let identifier = String::from(v);
    Ok(identifier)
}
impl IggyBenchArgs {
pub fn transport_command(&self) -> &BenchmarkTransportCommand {
self.benchmark_kind.transport_command()
}
pub fn transport(&self) -> &Transport {
self.benchmark_kind.transport_command().transport()
}
pub fn server_address(&self) -> &str {
self.benchmark_kind
.inner()
.transport_command()
.server_address()
}
pub fn start_stream_id(&self) -> u32 {
self.benchmark_kind.transport_command().start_stream_id()
}
pub fn validate(&self) {
let server_address = self.server_address().parse::<SocketAddr>().unwrap();
if (self.cleanup || self.verbose) && !server_address.ip().is_loopback() {
IggyBenchArgs::command()
.error(
ErrorKind::ArgumentConflict,
format!(
"Cannot use cleanup or verbose flags with a non-loopback server address: {}",
self.server_address()
),
)
.exit();
}
if self.output_dir.is_none()
&& (self.gitref.is_some()
|| self.identifier.is_some()
|| self.remark.is_some()
|| self.extra_info.is_some()
|| self.gitref_date.is_some())
{
IggyBenchArgs::command()
.error(
ErrorKind::ArgumentConflict,
"--git-ref, --git-ref-date, --identifier, --remark, --extra-info can only be used with --output-dir",
)
.exit();
}
self.benchmark_kind.inner().validate()
}
pub fn messages_per_batch(&self) -> u32 {
self.benchmark_kind.inner().messages_per_batch()
}
pub fn message_batches(&self) -> u32 {
self.benchmark_kind.inner().message_batches()
}
pub fn message_size(&self) -> u32 {
self.benchmark_kind.inner().message_size()
}
pub fn number_of_streams(&self) -> u32 {
self.benchmark_kind.inner().number_of_streams()
}
pub fn number_of_partitions(&self) -> u32 {
self.benchmark_kind.inner().number_of_partitions()
}
pub fn consumers(&self) -> u32 {
self.benchmark_kind.inner().consumers()
}
pub fn producers(&self) -> u32 {
self.benchmark_kind.inner().producers()
}
pub fn disable_parallel_producer_streams(&self) -> bool {
self.benchmark_kind
.inner()
.disable_parallel_producer_streams()
}
pub fn disable_parallel_consumer_streams(&self) -> bool {
self.benchmark_kind
.inner()
.disable_parallel_consumer_streams()
}
pub fn polling_kind(&self) -> PollingKind {
self.benchmark_kind.inner().polling_kind()
}
pub fn number_of_consumer_groups(&self) -> u32 {
self.benchmark_kind.inner().number_of_consumer_groups()
}
pub fn warmup_time(&self) -> IggyDuration {
self.warmup_time
}
pub fn sampling_time(&self) -> IggyDuration {
self.sampling_time
}
pub fn moving_average_window(&self) -> u32 {
self.moving_average_window
}
pub fn rate_limit(&self) -> Option<IggyByteSize> {
self.rate_limit
}
pub fn output_dir(&self) -> Option<String> {
self.output_dir.clone()
}
pub fn identifier(&self) -> Option<String> {
self.identifier.clone()
}
pub fn remark(&self) -> Option<String> {
self.remark.clone()
}
pub fn extra_info(&self) -> Option<String> {
self.extra_info.clone()
}
pub fn gitref(&self) -> Option<String> {
self.gitref.clone()
}
pub fn gitref_date(&self) -> Option<String> {
self.gitref_date.clone()
}
/// Generates the output directory name based on benchmark parameters.
pub fn generate_dir_name(&self) -> String {
let benchmark_kind = match &self.benchmark_kind {
BenchmarkKindCommand::Send(_) => "send",
BenchmarkKindCommand::Poll(_) => "poll",
BenchmarkKindCommand::SendAndPoll(_) => "send_and_poll",
BenchmarkKindCommand::ConsumerGroupPoll(_) => "consumer_group_poll",
BenchmarkKindCommand::Examples => unreachable!(),
};
let transport = match self.transport_command() {
BenchmarkTransportCommand::Tcp(_) => "tcp",
BenchmarkTransportCommand::Quic(_) => "quic",
BenchmarkTransportCommand::Http(_) => "http",
};
let mut parts = vec![
benchmark_kind.to_string(),
match benchmark_kind {
"send" => self.producers().to_string(),
_ => self.consumers().to_string(),
},
self.message_size().to_string(),
self.messages_per_batch().to_string(),
self.message_batches().to_string(),
transport.to_string(),
];
if let Some(remark) = &self.remark {
parts.push(remark.to_string());
}
if let Some(gitref) = &self.gitref {
parts.push(gitref.to_string());
}
if let Some(identifier) = &self.identifier {
parts.push(identifier.to_string());
}
parts.join("_")
}
/// Generates a human-readable pretty name for the benchmark
pub fn generate_pretty_name(&self) -> String {
let consumer_or_producer = match &self.benchmark_kind {
BenchmarkKindCommand::Send(_) => format!("{} producers", self.producers()),
_ => format!("{} consumers", self.consumers()),
};
let mut name = format!(
"{}, {}B msgs, {} msgs/batch",
consumer_or_producer,
self.message_size(),
self.messages_per_batch(),
);
if let Some(remark) = &self.remark {
name.push_str(&format!(" ({})", remark));
}
name
}
}
fn recreate_bench_command(args: &IggyBenchArgs) -> String {
let mut parts = Vec::new();
// If using localhost, add env vars
let server_address = args.server_address();
let is_localhost = server_address
.split(':')
.next()
.map(|host| host == "localhost" || host == "127.0.0.1")
.unwrap_or(false);
if is_localhost {
// Get all env vars starting with IGGY_
let iggy_vars: Vec<_> = std::env::vars()
.filter(|(k, _)| k.starts_with("IGGY_"))
.collect();
if !iggy_vars.is_empty() {
info!("Found env vars starting with IGGY_: {:?}", iggy_vars);
parts.extend(iggy_vars.into_iter().map(|(k, v)| format!("{}={}", k, v)));
}
}
parts.push("iggy-bench".to_string());
// Add optional global args
if let Some(ref remark) = args.remark() {
parts.push(format!("--remark \'{}\'", remark));
}
if let Some(ref output_dir) = args.output_dir() {
parts.push(format!("--output-dir \'{}\'", output_dir));
}
// Add warmup time if not default
if args.warmup_time().to_string() != DEFAULT_WARMUP_TIME {
parts.push(format!("--warmup-time {}", args.warmup_time()));
}
// Add benchmark kind
let kind_str = match args.benchmark_kind.as_simple_kind() {
BenchmarkKind::Send => "send",
BenchmarkKind::Poll => "poll",
BenchmarkKind::SendAndPoll => "send-and-poll",
BenchmarkKind::ConsumerGroupPoll => "consumer-group-poll",
};
parts.push(kind_str.to_string());
// Add benchmark params, skipping defaults
let producers = args.producers();
let consumers = args.consumers();
match args.benchmark_kind.as_simple_kind() {
BenchmarkKind::Send => {
if producers != DEFAULT_NUMBER_OF_PRODUCERS.get() {
parts.push(format!("--producers {}", producers));
}
}
BenchmarkKind::Poll | BenchmarkKind::ConsumerGroupPoll => {
if consumers != DEFAULT_NUMBER_OF_CONSUMERS.get() {
parts.push(format!("--consumers {}", consumers));
}
}
BenchmarkKind::SendAndPoll => {
if producers != DEFAULT_NUMBER_OF_PRODUCERS.get() {
parts.push(format!("--producers {}", producers));
}
if consumers != DEFAULT_NUMBER_OF_CONSUMERS.get() {
parts.push(format!("--consumers {}", consumers));
}
}
}
let messages_per_batch = args.messages_per_batch();
if messages_per_batch != DEFAULT_MESSAGES_PER_BATCH.get() {
parts.push(format!("--messages-per-batch {}", messages_per_batch));
}
let message_batches = args.message_batches();
if message_batches != DEFAULT_MESSAGE_BATCHES.get() {
parts.push(format!("--message-batches {}", message_batches));
}
let message_size = args.message_size();
if message_size != DEFAULT_MESSAGE_SIZE.get() {
parts.push(format!("--message-size {}", message_size));
}
let streams = args.number_of_streams();
let default_streams = match args.benchmark_kind.as_simple_kind() {
BenchmarkKind::ConsumerGroupPoll => DEFAULT_NUMBER_OF_STREAMS_CONSUMER_GROUP.get(),
_ => DEFAULT_NUMBER_OF_STREAMS.get(),
};
if streams != default_streams {
parts.push(format!("--streams {}", streams));
}
let partitions = args.number_of_partitions();
if partitions != DEFAULT_NUMBER_OF_PARTITIONS.get() {
parts.push(format!("--partitions {}", partitions));
}
// Add transport and server address, skipping if default
let transport = args.transport().to_string().to_lowercase();
parts.push(transport.clone());
let default_address = match transport.as_str() {
"tcp" => DEFAULT_TCP_SERVER_ADDRESS,
"quic" => DEFAULT_QUIC_SERVER_ADDRESS,
"http" => DEFAULT_HTTP_SERVER_ADDRESS,
_ => "",
};
if server_address != default_address {
parts.push(format!("--server-address {}", server_address));
}
// Add optional flags, skipping defaults
if args.disable_parallel_producer_streams() != DEFAULT_DISABLE_PARALLEL_PRODUCER_STREAMS {
parts.push("--disable-parallel-producer-streams".to_string());
}
if args.disable_parallel_consumer_streams() != DEFAULT_DISABLE_PARALLEL_CONSUMER_STREAMS {
parts.push("--disable-parallel-consumer-streams".to_string());
}
// Only add consumer groups for consumer group poll
let consumer_groups = args.number_of_consumer_groups();
if args.benchmark_kind.as_simple_kind() == BenchmarkKind::ConsumerGroupPoll
&& consumer_groups != DEFAULT_NUMBER_OF_CONSUMER_GROUPS.get()
{
parts.push(format!("--consumer-groups {}", consumer_groups));
}
if let Some(rate_limit) = args.rate_limit() {
parts.push(format!("--rate-limit {}", rate_limit));
}
parts.join(" ")
}
impl From<&IggyBenchArgs> for BenchmarkParams {
    /// Builds the report parameters from the parsed command-line arguments.
    ///
    /// Besides copying the individual settings, this derives three strings: the pretty
    /// name, the recreated command line, and `params_identifier`, which is the `_`-joined
    /// list of kind, transport, remark (spaces replaced by `_`, `no_remark` when absent)
    /// and the numeric benchmark parameters.
    fn from(args: &IggyBenchArgs) -> Self {
        let benchmark_kind = args.benchmark_kind.as_simple_kind();
        // Mapped by hand so this module does not have to depend on other modules for the
        // conversion.
        let transport = match args.transport() {
            Transport::Tcp => BenchmarkTransport::Tcp,
            Transport::Quic => BenchmarkTransport::Quic,
            Transport::Http => BenchmarkTransport::Http,
        };

        let remark = args.remark();
        let messages_per_batch = args.messages_per_batch();
        let message_batches = args.message_batches();
        let message_size = args.message_size();
        let producers = args.producers();
        let consumers = args.consumers();
        let streams = args.number_of_streams();
        let partitions = args.number_of_partitions();
        let number_of_consumer_groups = args.number_of_consumer_groups();

        let remark_slug = remark.as_deref().unwrap_or("no_remark").replace(' ', "_");
        let params_identifier = [
            benchmark_kind.to_string(),
            transport.to_string(),
            remark_slug,
            messages_per_batch.to_string(),
            message_batches.to_string(),
            message_size.to_string(),
            producers.to_string(),
            consumers.to_string(),
            streams.to_string(),
            partitions.to_string(),
            number_of_consumer_groups.to_string(),
        ]
        .join("_");

        Self {
            benchmark_kind,
            transport,
            server_address: args.server_address().to_string(),
            remark,
            extra_info: args.extra_info(),
            gitref: args.gitref(),
            gitref_date: args.gitref_date(),
            messages_per_batch,
            message_batches,
            message_size,
            producers,
            consumers,
            streams,
            partitions,
            number_of_consumer_groups,
            disable_parallel_consumers: args.disable_parallel_consumer_streams(),
            disable_parallel_producers: args.disable_parallel_producer_streams(),
            pretty_name: args.generate_pretty_name(),
            bench_command: recreate_bench_command(args),
            params_identifier,
        }
    }
}