blob: e5b1e149679308bd17d377b914de4ba8c87891aa [file] [log] [blame]
use crate::args::simple::BenchmarkKind;
use crate::benchmark_result::BenchmarkResult;
use iggy::client::MessageClient;
use iggy::clients::client::{IggyClient, IggyClientConfig};
use iggy::error::IggyError;
use iggy::identifier::Identifier;
use iggy::messages::append_messages::{AppendMessages, AppendableMessage, Partitioning};
use integration::test_server::{login_root, ClientFactory};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
use tracing::info;
pub struct Producer {
client_factory: Arc<dyn ClientFactory>,
producer_id: u32,
stream_id: u32,
messages_per_batch: u32,
message_batches: u32,
message_size: u32,
}
impl Producer {
pub fn new(
client_factory: Arc<dyn ClientFactory>,
producer_id: u32,
stream_id: u32,
messages_per_batch: u32,
message_batches: u32,
message_size: u32,
) -> Self {
Producer {
client_factory,
producer_id,
stream_id,
messages_per_batch,
message_batches,
message_size,
}
}
pub async fn run(&self) -> Result<BenchmarkResult, IggyError> {
let topic_id: u32 = 1;
let partition_id: u32 = 1;
let total_messages = (self.messages_per_batch * self.message_batches) as u64;
let client = self.client_factory.create_client().await;
let client = IggyClient::create(client, IggyClientConfig::default(), None, None, None);
login_root(&client).await;
info!(
"Producer #{} → preparing the test messages...",
self.producer_id
);
let payload = Self::create_payload(self.message_size);
let mut messages = Vec::with_capacity(self.messages_per_batch as usize);
for _ in 0..self.messages_per_batch {
let message = AppendableMessage::from_str(&payload).unwrap();
messages.push(message);
}
let mut send_messages = AppendMessages {
stream_id: Identifier::numeric(self.stream_id)?,
topic_id: Identifier::numeric(topic_id)?,
partitioning: Partitioning::partition_id(partition_id),
messages,
};
info!(
"Producer #{} → sending {} messages in {} batches of {} messages...",
self.producer_id, total_messages, self.message_batches, self.messages_per_batch
);
let start_timestamp = Instant::now();
let mut latencies: Vec<Duration> = Vec::with_capacity(self.message_batches as usize);
for _ in 0..self.message_batches {
let latency_start = Instant::now();
client.send_messages(&mut send_messages).await?;
let latency_end = latency_start.elapsed();
latencies.push(latency_end);
}
let end_timestamp = Instant::now();
let duration = end_timestamp - start_timestamp;
let average_latency: Duration = latencies.iter().sum::<Duration>() / latencies.len() as u32;
let total_size_bytes = total_messages * self.message_size as u64;
let average_throughput = total_size_bytes as f64 / duration.as_secs_f64() / 1e6;
info!(
"Producer #{} → sent {} messages in {} batches of {} messages in {:.2} s, total size: {} bytes, average latency: {:.2} ms, average throughput: {:.2} MB/s",
self.producer_id,
total_messages,
self.message_batches,
self.messages_per_batch,
duration.as_secs_f64(),
total_size_bytes,
average_latency.as_secs_f64() * 1000.0,
average_throughput
);
Ok(BenchmarkResult {
kind: BenchmarkKind::Send,
start_timestamp,
end_timestamp,
average_latency,
total_size_bytes,
total_messages,
})
}
fn create_payload(size: u32) -> String {
let mut payload = String::with_capacity(size as usize);
for i in 0..size {
let char = (i % 26 + 97) as u8 as char;
payload.push(char);
}
payload
}
}