blob: e8ee8ebd3d63102fb7f24263dbb11b11770f9572 [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use bench_report::{
benchmark_kind::BenchmarkKind, individual_metrics::BenchmarkIndividualMetrics,
numeric_parameter::BenchmarkNumericParameter, params::BenchmarkParams,
transport::BenchmarkTransport,
};
use iggy::prelude::*;
use std::{fs, path::Path, sync::Arc};
use tracing::{error, info};
use crate::args::{
common::IggyBenchArgs,
defaults::{
DEFAULT_BALANCED_NUMBER_OF_PARTITIONS, DEFAULT_BALANCED_NUMBER_OF_STREAMS,
DEFAULT_HTTP_SERVER_ADDRESS, DEFAULT_MESSAGE_BATCHES, DEFAULT_MESSAGE_SIZE,
DEFAULT_MESSAGES_PER_BATCH, DEFAULT_NUMBER_OF_CONSUMER_GROUPS, DEFAULT_NUMBER_OF_CONSUMERS,
DEFAULT_NUMBER_OF_PRODUCERS, DEFAULT_PINNED_NUMBER_OF_PARTITIONS,
DEFAULT_PINNED_NUMBER_OF_STREAMS, DEFAULT_QUIC_SERVER_ADDRESS, DEFAULT_TCP_SERVER_ADDRESS,
DEFAULT_TOTAL_MESSAGES_SIZE, DEFAULT_WARMUP_TIME,
},
};
pub use client_factory::{ClientFactory, login_root};
pub mod batch_generator;
pub mod client_factory;
pub mod cpu_name;
pub mod finish_condition;
pub mod rate_limiter;
/// Returns the combined size of all messages in `polled_messages`.
///
/// Each message contributes `get_size_bytes().as_bytes_u64()`; an empty batch yields `0`.
pub fn batch_total_size_bytes(polled_messages: &PolledMessages) -> u64 {
    let mut total_bytes: u64 = 0;
    for message in polled_messages.messages.iter() {
        total_bytes += message.get_size_bytes().as_bytes_u64();
    }
    total_bytes
}
/// Returns the combined payload length of all messages in `polled_messages`.
///
/// Only `payload.len()` of each message is counted; an empty batch yields `0`.
pub fn batch_user_size_bytes(polled_messages: &PolledMessages) -> u64 {
    polled_messages
        .messages
        .iter()
        .fold(0u64, |payload_bytes, message| {
            payload_bytes + message.payload.len() as u64
        })
}
/// Fetches the server statistics through a freshly created client.
///
/// A new client is obtained from `client_factory`, connected, and logged in with
/// `DEFAULT_ROOT_USERNAME` / `DEFAULT_ROOT_PASSWORD` before `get_stats` is called.
///
/// # Errors
///
/// Propagates the `IggyError` returned by connecting, logging in, or requesting the stats.
pub async fn get_server_stats(client_factory: &Arc<dyn ClientFactory>) -> Result<Stats, IggyError> {
    let transport_client = client_factory.create_client().await;
    let iggy_client = IggyClient::create(transport_client, None, None);

    iggy_client.connect().await?;
    iggy_client
        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
        .await?;

    let stats = iggy_client.get_stats().await?;
    Ok(stats)
}
pub async fn collect_server_logs_and_save_to_file(
client_factory: &Arc<dyn ClientFactory>,
output_dir: &Path,
) -> Result<(), IggyError> {
let client = client_factory.create_client().await;
let client = IggyClient::create(client, None, None);
client.connect().await?;
client
.login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
.await?;
let snapshot = client
.snapshot(
SnapshotCompression::Deflated,
vec![SystemSnapshotType::ServerLogs],
)
.await?
.0;
fs::write(output_dir.join("server_logs.zip"), snapshot).map_err(|e| {
error!("Failed to write server logs to file: {:?}", e);
IggyError::CannotWriteToFile
})
}
/// Adds up `summary.total_message_batches` over all entries of `individual_metrics`.
fn message_batches_from_metrics(individual_metrics: &[BenchmarkIndividualMetrics]) -> u64 {
    let mut total_batches: u64 = 0;
    for actor_metrics in individual_metrics {
        total_batches += actor_metrics.summary.total_message_batches;
    }
    total_batches
}
/// Builds the `BenchmarkParams` describing a benchmark run.
///
/// Most fields are read straight from `args`. `message_batches` is the sum of the
/// batches recorded in `metrics`, and `bench_command` is a reconstructed command line.
///
/// `params_identifier` joins the following with `_`, in this order: benchmark kind,
/// transport, remark (spaces replaced by `_`, or `no_remark` when absent), messages per
/// batch, data volume identifier, message size, producers, consumers, streams,
/// partitions, and consumer groups.
pub fn params_from_args_and_metrics(
    args: &IggyBenchArgs,
    metrics: &[BenchmarkIndividualMetrics],
) -> BenchmarkParams {
    let benchmark_kind = args.benchmark_kind.as_simple_kind();

    // The transport is mapped by hand so that `bench-report` does not have to depend on
    // `iggy` or `integration`.
    let transport = match args.transport() {
        TransportProtocol::Tcp => BenchmarkTransport::Tcp,
        TransportProtocol::Quic => BenchmarkTransport::Quic,
        TransportProtocol::Http => BenchmarkTransport::Http,
        TransportProtocol::WebSocket => BenchmarkTransport::WebSocket,
    };

    let remark = args.remark();
    let messages_per_batch = args.messages_per_batch();
    let message_size = args.message_size();
    let producers = args.producers();
    let consumers = args.consumers();
    let streams = args.streams();
    let partitions = args.number_of_partitions();
    let consumer_groups = args.number_of_consumer_groups();

    let pretty_name = args.generate_pretty_name();
    let bench_command = recreate_bench_command(args);

    let remark_for_identifier = remark
        .as_deref()
        .unwrap_or("no_remark")
        .replace(' ', "_");
    let params_identifier = [
        benchmark_kind.to_string(),
        transport.to_string(),
        remark_for_identifier,
        messages_per_batch.to_string(),
        args.data_volume_identifier(),
        message_size.to_string(),
        producers.to_string(),
        consumers.to_string(),
        streams.to_string(),
        partitions.to_string(),
        consumer_groups.to_string(),
    ]
    .join("_");

    BenchmarkParams {
        benchmark_kind,
        transport,
        server_address: args.server_address().to_string(),
        remark,
        extra_info: args.extra_info(),
        gitref: args.gitref(),
        gitref_date: args.gitref_date(),
        messages_per_batch,
        message_batches: message_batches_from_metrics(metrics),
        message_size,
        producers,
        consumers,
        streams,
        partitions,
        consumer_groups,
        rate_limit: args.rate_limit().map(|limit| limit.to_string()),
        pretty_name,
        bench_command,
        params_identifier,
    }
}
/// Reconstructs a command line for the benchmark described by `args`.
///
/// The space-separated pieces are emitted in this order: `IGGY_*` environment
/// variables (when applicable), the `iggy-bench` binary name, basic arguments, the
/// benchmark kind with its actor arguments, infrastructure arguments, and finally the
/// output arguments.
fn recreate_bench_command(args: &IggyBenchArgs) -> String {
    let mut command_parts: Vec<String> = Vec::new();

    add_environment_variables(&mut command_parts, args.server_address());
    command_parts.push(String::from("iggy-bench"));
    add_basic_arguments(&mut command_parts, args);
    add_benchmark_kind_arguments(&mut command_parts, args);
    add_infrastructure_arguments(&mut command_parts, args);
    add_output_arguments(&mut command_parts, args);

    command_parts.join(" ")
}
/// Appends `KEY=VALUE` entries for every `IGGY_`-prefixed environment variable.
///
/// Variables are only collected when the host part of `server_address` (the text
/// before the first `:`) is exactly `localhost` or `127.0.0.1`; for any other address
/// `parts` is left untouched. When at least one variable is found, the list is also
/// logged at info level.
///
/// The environment is read through `std::env::vars_os` rather than `std::env::vars`:
/// the latter panics while iterating as soon as *any* variable (even an unrelated one)
/// has a key or value that is not valid Unicode. Entries that cannot be represented as
/// UTF-8 are skipped.
fn add_environment_variables(parts: &mut Vec<String>, server_address: &str) {
    let is_localhost = server_address
        .split(':')
        .next()
        .is_some_and(|host| host == "localhost" || host == "127.0.0.1");
    if !is_localhost {
        return;
    }

    let iggy_vars: Vec<(String, String)> = std::env::vars_os()
        .filter_map(|(key, value)| {
            let key = key.into_string().ok()?;
            if !key.starts_with("IGGY_") {
                return None;
            }
            // A non-Unicode value cannot be rendered into the command string; skip it.
            let value = value.into_string().ok()?;
            Some((key, value))
        })
        .collect();

    if !iggy_vars.is_empty() {
        info!("Found env vars starting with IGGY_: {:?}", iggy_vars);
        parts.extend(iggy_vars.into_iter().map(|(k, v)| format!("{k}={v}")));
    }
}
/// Appends the message-related flags to `parts`.
///
/// `--messages-per-batch`, `--message-batches`, `--total-messages-size`,
/// `--message-size` and `--warmup-time` are emitted only when the value differs from
/// its default; `--rate-limit` is emitted whenever a rate limit is set.
fn add_basic_arguments(parts: &mut Vec<String>, args: &IggyBenchArgs) {
    let default_messages_per_batch =
        BenchmarkNumericParameter::Value(DEFAULT_MESSAGES_PER_BATCH.get());
    let messages_per_batch = args.messages_per_batch();
    if messages_per_batch != default_messages_per_batch {
        parts.push(format!("--messages-per-batch {messages_per_batch}"));
    }

    let custom_message_batches = args
        .message_batches()
        .filter(|batches| *batches != DEFAULT_MESSAGE_BATCHES);
    if let Some(message_batches) = custom_message_batches {
        parts.push(format!("--message-batches {message_batches}"));
    }

    let custom_total_size = args
        .total_data()
        .filter(|size| *size != DEFAULT_TOTAL_MESSAGES_SIZE);
    if let Some(total_messages_size) = custom_total_size {
        parts.push(format!("--total-messages-size {total_messages_size}"));
    }

    let default_message_size = BenchmarkNumericParameter::Value(DEFAULT_MESSAGE_SIZE.get());
    let message_size = args.message_size();
    if message_size != default_message_size {
        parts.push(format!("--message-size {message_size}"));
    }

    if let Some(rate_limit) = args.rate_limit() {
        parts.push(format!("--rate-limit \'{rate_limit}\'"));
    }

    // The default warmup time is given as text, so the comparison is done on the
    // rendered value, which is then reused for the flag itself.
    let warmup_time = args.warmup_time().to_string();
    if warmup_time != DEFAULT_WARMUP_TIME {
        parts.push(format!("--warmup-time \'{warmup_time}\'"));
    }
}
/// Appends the benchmark-kind subcommand name followed by its actor arguments.
///
/// The match is intentionally exhaustive so that a newly added `BenchmarkKind`
/// variant fails to compile here until it is given a subcommand name.
fn add_benchmark_kind_arguments(parts: &mut Vec<String>, args: &IggyBenchArgs) {
    let kind = args.benchmark_kind.as_simple_kind();
    let subcommand: &str = match kind {
        BenchmarkKind::PinnedProducer => "pinned-producer",
        BenchmarkKind::PinnedConsumer => "pinned-consumer",
        BenchmarkKind::PinnedProducerAndConsumer => "pinned-producer-and-consumer",
        BenchmarkKind::BalancedProducer => "balanced-producer",
        BenchmarkKind::BalancedConsumerGroup => "balanced-consumer-group",
        BenchmarkKind::BalancedProducerAndConsumerGroup => "balanced-producer-and-consumer-group",
        BenchmarkKind::EndToEndProducingConsumer => "end-to-end-producing-consumer",
        BenchmarkKind::EndToEndProducingConsumerGroup => "end-to-end-producing-consumer-group",
    };

    parts.push(String::from(subcommand));
    add_actor_arguments(parts, args);
}
/// Appends `--producers`, `--consumers` and `--consumer-groups` as applicable.
///
/// Which flags apply depends on the benchmark kind; an applicable flag is emitted
/// only when its value differs from the default. Flags are always emitted in the
/// order producers, consumers, consumer groups.
fn add_actor_arguments(parts: &mut Vec<String>, args: &IggyBenchArgs) {
    let producers = args.producers();
    let consumers = args.consumers();
    let number_of_consumer_groups = args.number_of_consumer_groups();

    // (producers flag, consumers flag, consumer-groups flag) applicable to each kind.
    let (takes_producers, takes_consumers, takes_consumer_groups) =
        match args.benchmark_kind.as_simple_kind() {
            BenchmarkKind::PinnedProducer
            | BenchmarkKind::BalancedProducer
            | BenchmarkKind::EndToEndProducingConsumer => (true, false, false),
            BenchmarkKind::PinnedConsumer | BenchmarkKind::BalancedConsumerGroup => {
                (false, true, false)
            }
            BenchmarkKind::PinnedProducerAndConsumer
            | BenchmarkKind::BalancedProducerAndConsumerGroup => (true, true, false),
            BenchmarkKind::EndToEndProducingConsumerGroup => (true, true, true),
        };

    if takes_producers && producers != DEFAULT_NUMBER_OF_PRODUCERS.get() {
        parts.push(format!("--producers {producers}"));
    }
    if takes_consumers && consumers != DEFAULT_NUMBER_OF_CONSUMERS.get() {
        parts.push(format!("--consumers {consumers}"));
    }
    if takes_consumer_groups
        && number_of_consumer_groups != DEFAULT_NUMBER_OF_CONSUMER_GROUPS.get()
    {
        parts.push(format!("--consumer-groups {number_of_consumer_groups}"));
    }
}
/// Appends stream/partition/consumer-group flags, the transport subcommand and the
/// server address.
///
/// * `--streams` / `--partitions` are compared against the balanced defaults for the
///   three balanced kinds and against the pinned defaults for every other kind.
/// * `--consumer-groups` is only considered for `BalancedConsumerGroup` and
///   `BalancedProducerAndConsumerGroup`.
/// * `--max-topic-size` is emitted whenever it is set.
/// * The lowercase transport name is always emitted; `--server-address` follows when
///   the address differs from that transport's default. Transports other than `tcp`,
///   `quic` and `http` have no default listed here, so their address is compared
///   against the empty string.
fn add_infrastructure_arguments(parts: &mut Vec<String>, args: &IggyBenchArgs) {
    let kind = args.benchmark_kind.as_simple_kind();
    let is_balanced = matches!(
        kind,
        BenchmarkKind::BalancedProducerAndConsumerGroup
            | BenchmarkKind::BalancedConsumerGroup
            | BenchmarkKind::BalancedProducer
    );
    let (default_streams, default_partitions) = if is_balanced {
        (
            DEFAULT_BALANCED_NUMBER_OF_STREAMS.get(),
            DEFAULT_BALANCED_NUMBER_OF_PARTITIONS.get(),
        )
    } else {
        (
            DEFAULT_PINNED_NUMBER_OF_STREAMS.get(),
            DEFAULT_PINNED_NUMBER_OF_PARTITIONS.get(),
        )
    };

    let streams = args.streams();
    if streams != default_streams {
        parts.push(format!("--streams {streams}"));
    }

    let partitions = args.number_of_partitions();
    if partitions != default_partitions {
        parts.push(format!("--partitions {partitions}"));
    }

    let takes_consumer_groups = matches!(
        kind,
        BenchmarkKind::BalancedConsumerGroup | BenchmarkKind::BalancedProducerAndConsumerGroup
    );
    let consumer_groups = args.number_of_consumer_groups();
    if takes_consumer_groups && consumer_groups != DEFAULT_NUMBER_OF_CONSUMER_GROUPS.get() {
        parts.push(format!("--consumer-groups {consumer_groups}"));
    }

    if let Some(max_topic_size) = args.max_topic_size() {
        parts.push(format!("--max-topic-size \'{max_topic_size}\'"));
    }

    let transport = args.transport().to_string().to_lowercase();
    let default_address = match transport.as_str() {
        "tcp" => DEFAULT_TCP_SERVER_ADDRESS,
        "quic" => DEFAULT_QUIC_SERVER_ADDRESS,
        "http" => DEFAULT_HTTP_SERVER_ADDRESS,
        _ => "",
    };
    // The transport subcommand must precede its `--server-address` option.
    parts.push(transport);

    let server_address = args.server_address();
    if server_address != default_address {
        parts.push(format!("--server-address {server_address}"));
    }
}
/// Appends the `output` subcommand, the fixed `-o performance_results` option and,
/// when a remark is set, a single-quoted `--remark`.
///
/// The remark is free text, so any single quote inside it is rewritten as `'\''`
/// (close the quoted word, emit an escaped quote, reopen the quoted word). Without
/// this, a remark containing `'` would end the quoted word early and the recreated
/// command could not be pasted back into a shell.
fn add_output_arguments(parts: &mut Vec<String>, args: &IggyBenchArgs) {
    parts.push("output".to_string());
    parts.push("-o performance_results".to_string());
    if let Some(remark) = args.remark() {
        let escaped_remark = remark.replace('\'', r"'\''");
        parts.push(format!("--remark '{escaped_remark}'"));
    }
}