blob: 6e80dcbc5465230dffc5b60ef8503b2e30459a0e [file] [log] [blame]
use crate::args::simple::BenchmarkKind;
use crate::benchmark_result::{BenchmarkResult, LatencyPercentiles};
use iggy::client::{ConsumerGroupClient, MessageClient};
use iggy::clients::client::IggyClient;
use iggy::consumer::Consumer as IggyConsumer;
use iggy::error::IggyError;
use iggy::messages::poll_messages::PollingStrategy;
use iggy::utils::duration::IggyDuration;
use integration::test_server::{login_root, ClientFactory};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
use tracing::{error, info, warn};
pub struct Consumer {
client_factory: Arc<dyn ClientFactory>,
consumer_id: u32,
consumer_group_id: Option<u32>,
stream_id: u32,
messages_per_batch: u32,
message_batches: u32,
warmup_time: IggyDuration,
}
impl Consumer {
pub fn new(
client_factory: Arc<dyn ClientFactory>,
consumer_id: u32,
consumer_group_id: Option<u32>,
stream_id: u32,
messages_per_batch: u32,
message_batches: u32,
warmup_time: IggyDuration,
) -> Self {
Self {
client_factory,
consumer_id,
consumer_group_id,
stream_id,
messages_per_batch,
message_batches,
warmup_time,
}
}
pub async fn run(&self) -> Result<BenchmarkResult, IggyError> {
let topic_id: u32 = 1;
let default_partition_id: u32 = 1;
let total_messages = (self.messages_per_batch * self.message_batches) as u64;
let client = self.client_factory.create_client().await;
let client = IggyClient::create(client, None, None);
login_root(&client).await;
let stream_id = self.stream_id.try_into().unwrap();
let topic_id = topic_id.try_into().unwrap();
let partition_id = if self.consumer_group_id.is_some() {
None
} else {
Some(default_partition_id)
};
let consumer = match self.consumer_group_id {
Some(consumer_group_id) => {
client
.join_consumer_group(
&stream_id,
&topic_id,
&consumer_group_id.try_into().unwrap(),
)
.await
.expect("Failed to join consumer group");
IggyConsumer::group(consumer_group_id.try_into().unwrap())
}
None => IggyConsumer::new(self.consumer_id.try_into().unwrap()),
};
let mut latencies: Vec<Duration> = Vec::with_capacity(self.message_batches as usize);
let mut total_size_bytes = 0;
let mut current_iteration: u64 = 0;
let mut received_messages = 0;
let mut topic_not_found_counter = 0;
let mut strategy = PollingStrategy::offset(0);
if self.warmup_time.get_duration() != Duration::from_millis(0) {
if let Some(cg_id) = self.consumer_group_id {
info!(
"Consumer #{}, part of consumer group #{}, → warming up for {}...",
self.consumer_id, cg_id, self.warmup_time
);
} else {
info!(
"Consumer #{} → warming up for {}...",
self.consumer_id, self.warmup_time
);
}
let warmup_end = Instant::now() + self.warmup_time.get_duration();
while Instant::now() < warmup_end {
let offset = current_iteration * self.messages_per_batch as u64;
strategy.set_value(offset);
let polled_messages = client
.poll_messages(
&stream_id,
&topic_id,
partition_id,
&consumer,
&strategy,
self.messages_per_batch,
false,
)
.await?;
if polled_messages.messages.is_empty() {
warn!(
"Consumer: {} - Messages are empty for offset: {}, retrying...",
self.consumer_id, offset
);
continue;
}
current_iteration += 1;
}
}
if let Some(cg_id) = self.consumer_group_id {
info!(
"Consumer #{}, part of consumer group #{} → polling {} messages in {} batches of {} messages...",
self.consumer_id, cg_id, total_messages, self.message_batches, self.messages_per_batch
);
} else {
info!(
"Consumer #{} → polling {} messages in {} batches of {} messages...",
self.consumer_id, total_messages, self.message_batches, self.messages_per_batch
);
}
current_iteration = 0;
let start_timestamp = Instant::now();
while received_messages < total_messages {
let offset = current_iteration * self.messages_per_batch as u64;
let latency_start = Instant::now();
let polled_messages = client
.poll_messages(
&stream_id,
&topic_id,
partition_id,
&consumer,
&PollingStrategy::offset(offset),
self.messages_per_batch,
false,
)
.await;
let latency_end = latency_start.elapsed();
if let Err(e) = polled_messages {
if let IggyError::InvalidResponse(code, _, _) = e {
if code == 2010 {
topic_not_found_counter += 1;
if topic_not_found_counter > 1000 {
return Err(e);
}
}
} else {
return Err(e);
}
error!("Unexpected error: {:?}", e);
continue;
}
let polled_messages = polled_messages.unwrap();
if polled_messages.messages.is_empty() {
warn!(
"Consumer: {} - Messages are empty for offset: {}, retrying...",
self.consumer_id, offset
);
continue;
}
if polled_messages.messages.len() != self.messages_per_batch as usize {
warn!(
"Consumer #{} → expected {} messages, but got {} messages, retrying...",
self.consumer_id,
self.messages_per_batch,
polled_messages.messages.len()
);
continue;
}
latencies.push(latency_end);
received_messages += polled_messages.messages.len() as u64;
for message in polled_messages.messages {
total_size_bytes += message.get_size_bytes() as u64;
}
current_iteration += 1;
}
let end_timestamp = Instant::now();
latencies.sort();
let last_idx = latencies.len() - 1;
let p50 = latencies[last_idx / 2];
let p90 = latencies[last_idx * 9 / 10];
let p95 = latencies[last_idx * 95 / 100];
let p99 = latencies[last_idx * 99 / 100];
let p999 = latencies[last_idx * 999 / 1000];
let latency_percentiles = LatencyPercentiles {
p50,
p90,
p95,
p99,
p999,
};
let duration = end_timestamp - start_timestamp;
let average_latency: Duration = latencies.iter().sum::<Duration>() / latencies.len() as u32;
let average_throughput = total_size_bytes as f64 / duration.as_secs_f64() / 1e6;
info!(
"Consumer #{} → polled {} messages {} batches of {} messages in {:.2} s, total size: {} bytes, average throughput: {:.2} MB/s, p50 latency: {:.2} ms, p90 latency: {:.2} ms, p95 latency: {:.2} ms, p99 latency: {:.2} ms, p999 latency: {:.2} ms, average latency: {:.2} ms",
self.consumer_id,
total_messages,
self.message_batches,
self.messages_per_batch,
duration.as_secs_f64(),
total_size_bytes,
average_throughput,
p50.as_secs_f64() * 1000.0,
p90.as_secs_f64() * 1000.0,
p95.as_secs_f64() * 1000.0,
p99.as_secs_f64() * 1000.0,
p999.as_secs_f64() * 1000.0,
average_latency.as_secs_f64() * 1000.0
);
Ok(BenchmarkResult {
kind: BenchmarkKind::Poll,
start_timestamp,
end_timestamp,
average_latency,
latency_percentiles,
total_size_bytes,
total_messages,
})
}
}