blob: 5ab1876ec8ae97a7280d2dec89a591225dc29de3 [file] [log] [blame]
use crate::analytics::metrics::individual::from_records;
use crate::analytics::record::BenchmarkRecord;
use iggy::client::{ConsumerGroupClient, MessageClient};
use iggy::clients::client::IggyClient;
use iggy::consumer::Consumer as IggyConsumer;
use iggy::error::IggyError;
use iggy::messages::poll_messages::{PollingKind, PollingStrategy};
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::duration::IggyDuration;
use iggy::utils::sizeable::Sizeable;
use iggy_benchmark_report::actor_kind::ActorKind;
use iggy_benchmark_report::benchmark_kind::BenchmarkKind;
use iggy_benchmark_report::individual_metrics::BenchmarkIndividualMetrics;
use integration::test_server::{login_root, ClientFactory};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
use tracing::{error, info, warn};
/// A polling benchmark actor: repeatedly polls batches of messages from a
/// stream and records per-batch latency/throughput samples.
pub struct Consumer {
    // Factory that creates the underlying transport client (TCP/QUIC/HTTP —
    // determined by the factory implementation).
    client_factory: Arc<dyn ClientFactory>,
    // Identifier for this consumer; also used as the standalone consumer id
    // when no consumer group is configured.
    consumer_id: u32,
    // When `Some`, the consumer joins this consumer group and the server
    // chooses the partition; when `None`, it polls partition 1 directly.
    consumer_group_id: Option<u32>,
    // Numeric id of the stream to poll from (topic id is hard-coded to 1 in `run`).
    stream_id: u32,
    // Number of messages requested per poll call.
    messages_per_batch: u32,
    // Number of batches to poll during the measured phase.
    message_batches: u32,
    // Duration of the unmeasured warmup phase; zero disables warmup.
    warmup_time: IggyDuration,
    // Sampling window passed through to metrics aggregation (`from_records`).
    sampling_time: IggyDuration,
    // Moving-average window size passed through to metrics aggregation.
    moving_average_window: u32,
    // Offset-based (manual position tracking) or Next-based (server-side
    // offset, auto-commit) polling; other kinds are rejected at runtime.
    polling_kind: PollingKind,
}
impl Consumer {
    /// Creates a new benchmark consumer. See the struct-level field notes for
    /// the meaning of each parameter; this constructor only stores them.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        client_factory: Arc<dyn ClientFactory>,
        consumer_id: u32,
        consumer_group_id: Option<u32>,
        stream_id: u32,
        messages_per_batch: u32,
        message_batches: u32,
        warmup_time: IggyDuration,
        sampling_time: IggyDuration,
        moving_average_window: u32,
        polling_kind: PollingKind,
    ) -> Self {
        Self {
            client_factory,
            consumer_id,
            consumer_group_id,
            stream_id,
            messages_per_batch,
            message_batches,
            warmup_time,
            sampling_time,
            moving_average_window,
            polling_kind,
        }
    }

    /// Runs the poll benchmark: connects and logs in as root, optionally joins
    /// a consumer group, performs an unmeasured warmup, then polls
    /// `messages_per_batch * message_batches` messages while recording a
    /// `BenchmarkRecord` per successful batch, and finally aggregates the
    /// records into `BenchmarkIndividualMetrics`.
    ///
    /// # Errors
    /// Propagates any `IggyError` from polling, except `TopicIdNotFound`,
    /// which is retried up to 1000 times (the topic may not exist yet if this
    /// consumer starts before the producer has created it).
    ///
    /// # Panics
    /// Panics on an unsupported `PollingKind` and if joining the consumer
    /// group fails.
    pub async fn run(&self) -> Result<BenchmarkIndividualMetrics, IggyError> {
        // Benchmark convention: a single topic (id 1); partition 1 unless a
        // consumer group lets the server pick the partition.
        let topic_id: u32 = 1;
        let default_partition_id: u32 = 1;
        let message_batches = self.message_batches as u64;
        let messages_per_batch = self.messages_per_batch;
        // Widen each factor before multiplying: the previous u32 * u32 product
        // could overflow for large benchmark configurations before the cast.
        let total_messages = u64::from(self.messages_per_batch) * u64::from(self.message_batches);
        let client = self.client_factory.create_client().await;
        let client = IggyClient::create(client, None, None);
        login_root(&client).await;
        let stream_id = self.stream_id.try_into().unwrap();
        let topic_id = topic_id.try_into().unwrap();
        // Group members must not pin a partition — the server assigns one.
        let partition_id = if self.consumer_group_id.is_some() {
            None
        } else {
            Some(default_partition_id)
        };
        let consumer = match self.consumer_group_id {
            Some(consumer_group_id) => {
                client
                    .join_consumer_group(
                        &stream_id,
                        &topic_id,
                        &consumer_group_id.try_into().unwrap(),
                    )
                    .await
                    .expect("Failed to join consumer group");
                IggyConsumer::group(consumer_group_id.try_into().unwrap())
            }
            None => IggyConsumer::new(self.consumer_id.try_into().unwrap()),
        };
        let mut total_user_data_bytes = IggyByteSize::default();
        let mut total_bytes = IggyByteSize::default();
        let mut current_iteration: u64 = 0;
        let mut received_messages = 0;
        let mut topic_not_found_counter = 0;

        // Warmup phase: poll without recording anything until the deadline.
        if self.warmup_time.get_duration() != Duration::from_millis(0) {
            if let Some(cg_id) = self.consumer_group_id {
                info!(
                    "Consumer #{}, part of consumer group #{}, → warming up for {}...",
                    self.consumer_id, cg_id, self.warmup_time
                );
            } else {
                info!(
                    "Consumer #{} → warming up for {}...",
                    self.consumer_id, self.warmup_time
                );
            }
            let warmup_end = Instant::now() + self.warmup_time.get_duration();
            while Instant::now() < warmup_end {
                let offset = current_iteration * messages_per_batch as u64;
                // Offset polling tracks position manually (no commit); Next
                // polling lets the server advance the stored offset (commit).
                let (strategy, auto_commit) = match self.polling_kind {
                    PollingKind::Offset => (PollingStrategy::offset(offset), false),
                    PollingKind::Next => (PollingStrategy::next(), true),
                    _ => panic!(
                        "Unsupported polling kind for benchmark: {:?}",
                        self.polling_kind
                    ),
                };
                let polled_messages = client
                    .poll_messages(
                        &stream_id,
                        &topic_id,
                        partition_id,
                        &consumer,
                        &strategy,
                        messages_per_batch,
                        auto_commit,
                    )
                    .await?;
                if polled_messages.messages.is_empty() {
                    warn!(
                        "Consumer: {} - Messages are empty for offset: {}, retrying...",
                        self.consumer_id, offset
                    );
                    continue;
                }
                current_iteration += 1;
            }
        }

        if let Some(cg_id) = self.consumer_group_id {
            info!(
                "Consumer #{}, part of consumer group #{} → polling {} messages in {} batches of {} messages...",
                self.consumer_id, cg_id, total_messages, message_batches, messages_per_batch
            );
        } else {
            info!(
                "Consumer #{} → polling {} messages in {} batches of {} messages...",
                self.consumer_id, total_messages, message_batches, messages_per_batch
            );
        }

        // Measured phase: restart the iteration counter so Offset polling
        // begins from offset 0 again.
        current_iteration = 0;
        let start_timestamp = Instant::now();
        let mut records = Vec::with_capacity(message_batches as usize);
        let mut batch_user_size_bytes = 0;
        let mut batch_size_total_bytes = 0;
        while received_messages < total_messages {
            let offset = current_iteration * messages_per_batch as u64;
            let (strategy, auto_commit) = match self.polling_kind {
                PollingKind::Offset => (PollingStrategy::offset(offset), false),
                PollingKind::Next => (PollingStrategy::next(), true),
                _ => panic!(
                    "Unsupported polling kind for benchmark: {:?}",
                    self.polling_kind
                ),
            };
            let before_poll = Instant::now();
            let polled_messages = client
                .poll_messages(
                    &stream_id,
                    &topic_id,
                    partition_id,
                    &consumer,
                    &strategy,
                    messages_per_batch,
                    auto_commit,
                )
                .await;
            let latency = before_poll.elapsed();
            if let Err(e) = polled_messages {
                // Tolerate a bounded number of TopicIdNotFound errors — the
                // topic may not have been created yet; any other error (or
                // exceeding the retry budget) aborts the benchmark.
                if matches!(e, IggyError::TopicIdNotFound(_, _)) {
                    topic_not_found_counter += 1;
                    if topic_not_found_counter > 1000 {
                        return Err(e);
                    }
                } else {
                    return Err(e);
                }
                error!("Unexpected error: {:?}", e);
                continue;
            }
            let polled_messages = polled_messages.unwrap();
            if polled_messages.messages.is_empty() {
                warn!(
                    "Consumer: {} - Messages are empty for offset: {}, retrying...",
                    self.consumer_id, offset
                );
                continue;
            }
            // A short batch is retried without advancing the iteration, so an
            // Offset poll re-reads the same offset until it is fully available.
            if polled_messages.messages.len() != messages_per_batch as usize {
                warn!(
                    "Consumer #{} → expected {} messages, but got {} messages, retrying...",
                    self.consumer_id,
                    messages_per_batch,
                    polled_messages.messages.len()
                );
                continue;
            }
            received_messages += polled_messages.messages.len() as u64;
            // We don't need to calculate the size whole batch every time by iterating over it - just always use the size of the first message
            if batch_user_size_bytes == 0 || batch_size_total_bytes == 0 {
                batch_user_size_bytes =
                    polled_messages.messages[0].payload.len() as u64 * messages_per_batch as u64;
                batch_size_total_bytes =
                    polled_messages.messages[0].get_size_bytes().as_bytes_u64()
                        * messages_per_batch as u64;
            }
            total_user_data_bytes += IggyByteSize::from(batch_user_size_bytes);
            total_bytes += IggyByteSize::from(batch_size_total_bytes);
            current_iteration += 1;
            let message_batches = current_iteration;
            records.push(BenchmarkRecord {
                elapsed_time_us: start_timestamp.elapsed().as_micros() as u64,
                latency_us: latency.as_micros() as u64,
                messages: received_messages,
                message_batches,
                user_data_bytes: total_user_data_bytes.as_bytes_u64(),
                total_bytes: total_bytes.as_bytes_u64(),
            });
        }

        let metrics = from_records(
            records,
            BenchmarkKind::Poll,
            ActorKind::Consumer,
            self.consumer_id,
            self.sampling_time,
            self.moving_average_window,
        );
        Self::log_consumer_statistics(
            self.consumer_id,
            total_messages,
            message_batches as u32,
            messages_per_batch,
            &metrics,
        );
        Ok(metrics)
    }

    /// Logs a one-line human-readable summary (counts, total size, throughput,
    /// and latency percentiles) of a finished consumer run.
    pub fn log_consumer_statistics(
        consumer_id: u32,
        total_messages: u64,
        message_batches: u32,
        messages_per_batch: u32,
        stats: &BenchmarkIndividualMetrics,
    ) {
        info!(
            "Consumer #{} polled {} messages, {} batches of {} messages in {:.2} s, total size: {}, average throughput: {:.2} MB/s, \
    p50 latency: {:.2} ms, p90 latency: {:.2} ms, p95 latency: {:.2} ms, p99 latency: {:.2} ms, p999 latency: {:.2} ms, \
    average latency: {:.2} ms, median latency: {:.2} ms",
            consumer_id,
            total_messages,
            message_batches,
            messages_per_batch,
            stats.summary.total_time_secs,
            IggyByteSize::from(stats.summary.total_user_data_bytes),
            stats.summary.throughput_megabytes_per_second,
            stats.summary.p50_latency_ms,
            stats.summary.p90_latency_ms,
            stats.summary.p95_latency_ms,
            stats.summary.p99_latency_ms,
            stats.summary.p999_latency_ms,
            stats.summary.avg_latency_ms,
            stats.summary.median_latency_ms
        );
    }
}