blob: 8072b4ff3fbbf6cec0375a9f317bc35293329e9e [file]
use crate::args::simple::BenchmarkKind;
use crate::benchmark_result::BenchmarkResult;
use iggy::client::MessageClient;
use iggy::clients::client::{IggyClient, IggyClientBackgroundConfig};
use iggy::consumer::Consumer as IggyConsumer;
use iggy::error::IggyError;
use iggy::messages::poll_messages::PollingStrategy;
use iggy::utils::duration::IggyDuration;
use integration::test_server::{login_root, ClientFactory};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
use tracing::{error, info, warn};
/// Benchmark actor that polls messages from a single stream and reports
/// latency and size statistics for the run (see `Consumer::run`).
pub struct Consumer {
/// Factory used by `run` to create the underlying transport client.
client_factory: Arc<dyn ClientFactory>,
/// Numeric ID converted into the Iggy consumer identity; also used to tag log lines.
consumer_id: u32,
/// Numeric ID of the stream that is polled.
stream_id: u32,
/// Number of messages requested by every poll call.
messages_per_batch: u32,
/// Number of full batches polled during the measured phase.
message_batches: u32,
/// Length of the unmeasured warm-up polling phase; a zero duration skips it.
warmup_time: IggyDuration,
}
impl Consumer {
    /// Builds a consumer benchmark actor from its configuration.
    ///
    /// * `client_factory` - creates the transport client when [`Consumer::run`] starts.
    /// * `consumer_id` - numeric consumer identity, also used to tag log lines.
    /// * `stream_id` - numeric ID of the stream to poll.
    /// * `messages_per_batch` - number of messages requested by each poll call.
    /// * `message_batches` - number of full batches polled in the measured phase.
    /// * `warmup_time` - duration of the unmeasured warm-up phase (zero disables it).
    pub fn new(
        client_factory: Arc<dyn ClientFactory>,
        consumer_id: u32,
        stream_id: u32,
        messages_per_batch: u32,
        message_batches: u32,
        warmup_time: IggyDuration,
    ) -> Self {
        Self {
            client_factory,
            consumer_id,
            stream_id,
            messages_per_batch,
            message_batches,
            warmup_time,
        }
    }

    /// Runs the polling benchmark and returns its aggregated result.
    ///
    /// The method creates a client, logs in as root, optionally polls for
    /// `warmup_time` without measuring, and then polls topic `1`, partition `1`
    /// of the configured stream by explicit offset until
    /// `messages_per_batch * message_batches` messages have been received.
    /// Only polls that return a full batch are counted; empty or short batches
    /// are retried at the same offset.
    ///
    /// # Errors
    ///
    /// Returns the underlying [`IggyError`] when:
    /// * any poll during the warm-up phase fails,
    /// * a measured poll fails with anything other than
    ///   `IggyError::InvalidResponse`,
    /// * `IggyError::InvalidResponse` with code `2010` is seen more than 1000 times.
    ///
    /// NOTE: other `InvalidResponse` codes are logged and retried without a limit.
    ///
    /// # Panics
    ///
    /// Panics if the numeric consumer, stream or topic ID cannot be converted
    /// into the identifier type expected by the client.
    pub async fn run(&self) -> Result<BenchmarkResult, IggyError> {
        let topic_id: u32 = 1;
        let partition_id: u32 = 1;
        // Widen before multiplying: a `u32 * u32` product can exceed `u32::MAX`,
        // which would panic in debug builds and wrap in release builds.
        let batch_size = u64::from(self.messages_per_batch);
        let total_messages = batch_size * u64::from(self.message_batches);

        let client = self.client_factory.create_client().await;
        let client = IggyClient::create(
            client,
            IggyClientBackgroundConfig::default(),
            None,
            None,
            None,
        );
        login_root(&client).await;

        let consumer = IggyConsumer::new(self.consumer_id.try_into().unwrap());
        let stream_id = self.stream_id.try_into().unwrap();
        let topic_id = topic_id.try_into().unwrap();
        let partition_id = Some(partition_id);

        let mut latencies: Vec<Duration> = Vec::with_capacity(self.message_batches as usize);
        let mut total_size_bytes: u64 = 0;
        let mut current_iteration: u64 = 0;
        let mut received_messages: u64 = 0;
        let mut topic_not_found_counter = 0;
        let mut strategy = PollingStrategy::offset(0);

        if self.warmup_time.get_duration() != Duration::from_millis(0) {
            info!(
                "Consumer #{} → warming up for {}...",
                self.consumer_id, self.warmup_time
            );
            let warmup_end = Instant::now() + self.warmup_time.get_duration();
            while Instant::now() < warmup_end {
                let offset = current_iteration * batch_size;
                strategy.set_value(offset);
                // Warm-up results are discarded; only errors are propagated.
                client
                    .poll_messages(
                        &stream_id,
                        &topic_id,
                        partition_id,
                        &consumer,
                        &strategy,
                        self.messages_per_batch,
                        false,
                    )
                    .await?;
                current_iteration += 1;
            }
        }

        info!(
            "Consumer #{} → polling {} messages in {} batches of {} messages...",
            self.consumer_id, total_messages, self.message_batches, self.messages_per_batch
        );

        // The measured phase starts again from offset 0.
        current_iteration = 0;
        let start_timestamp = Instant::now();
        while received_messages < total_messages {
            let offset = current_iteration * batch_size;
            let latency_start = Instant::now();
            let polled_messages = client
                .poll_messages(
                    &stream_id,
                    &topic_id,
                    partition_id,
                    &consumer,
                    &PollingStrategy::offset(offset),
                    self.messages_per_batch,
                    false,
                )
                .await;
            let latency_end = latency_start.elapsed();

            let polled_messages = match polled_messages {
                Ok(polled_messages) => polled_messages,
                Err(e) => {
                    if let IggyError::InvalidResponse(code, _, _) = e {
                        // Code 2010 is presumably the server's "topic not found"
                        // response (judging by the counter name); it is tolerated
                        // a bounded number of times before giving up.
                        if code == 2010 {
                            topic_not_found_counter += 1;
                            if topic_not_found_counter > 1000 {
                                return Err(e);
                            }
                        }
                    } else {
                        return Err(e);
                    }
                    error!("Unexpected error: {:?}", e);
                    continue;
                }
            };

            if polled_messages.messages.is_empty() {
                warn!("Messages are empty for offset: {}, retrying...", offset);
                continue;
            }
            if polled_messages.messages.len() != self.messages_per_batch as usize {
                warn!(
                    "Consumer #{} → expected {} messages, but got {} messages, retrying...",
                    self.consumer_id,
                    self.messages_per_batch,
                    polled_messages.messages.len()
                );
                continue;
            }

            // Only full batches contribute to the statistics.
            latencies.push(latency_end);
            received_messages += polled_messages.messages.len() as u64;
            for message in polled_messages.messages {
                total_size_bytes += message.get_size_bytes() as u64;
            }
            current_iteration += 1;
        }
        let end_timestamp = Instant::now();
        let duration = end_timestamp - start_timestamp;

        // `checked_div` yields `None` for a zero divisor, so a run that polled no
        // batches reports a zero latency instead of panicking. One latency is
        // recorded per full batch, so the length never exceeds the `u32`
        // `message_batches` value.
        let average_latency: Duration = latencies
            .iter()
            .sum::<Duration>()
            .checked_div(latencies.len() as u32)
            .unwrap_or_default();

        // Avoid NaN/inf in the log when the measured phase took no measurable time.
        let elapsed_secs = duration.as_secs_f64();
        let average_throughput = if elapsed_secs > 0.0 {
            total_size_bytes as f64 / elapsed_secs / 1e6
        } else {
            0.0
        };

        info!(
            "Consumer #{} → polled {} messages ({} batches of {} messages in {} ms, total size: {} bytes, average latency: {:.2} ms, average throughput: {:.2} MB/s",
            self.consumer_id,
            total_messages,
            self.message_batches,
            self.messages_per_batch,
            duration.as_millis(),
            total_size_bytes,
            // Fractional milliseconds: an integer `as_millis()` ignores `{:.2}` and
            // rounds sub-millisecond latencies down to 0.
            average_latency.as_secs_f64() * 1000.0,
            average_throughput
        );

        Ok(BenchmarkResult {
            kind: BenchmarkKind::Poll,
            start_timestamp,
            end_timestamp,
            average_latency,
            total_size_bytes,
            total_messages,
        })
    }
}