blob: 0f9213fb49df02d01f72353b22a2f21bbe1d0557 [file] [log] [blame]
use super::benchmark::{BenchmarkFutures, Benchmarkable};
use crate::actors::consumer::Consumer;
use crate::args::common::IggyBenchArgs;
use crate::rate_limiter::RateLimiter;
use async_trait::async_trait;
use iggy_benchmark_report::benchmark_kind::BenchmarkKind;
use integration::test_server::ClientFactory;
use std::sync::Arc;
use tracing::info;
pub struct PollMessagesBenchmark {
args: Arc<IggyBenchArgs>,
client_factory: Arc<dyn ClientFactory>,
}
impl PollMessagesBenchmark {
pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn ClientFactory>) -> Self {
Self {
args,
client_factory,
}
}
}
#[async_trait]
impl Benchmarkable for PollMessagesBenchmark {
async fn run(&mut self) -> BenchmarkFutures {
self.check_streams().await?;
let clients_count = self.args.consumers();
info!("Creating {} client(s)...", clients_count);
let messages_per_batch = self.args.messages_per_batch();
let message_batches = self.args.message_batches();
let mut futures: BenchmarkFutures = Ok(Vec::with_capacity(clients_count as usize));
for client_id in 1..=clients_count {
let args = self.args.clone();
let client_factory = self.client_factory.clone();
info!("Executing the benchmark on client #{}...", client_id);
let args = args.clone();
let start_stream_id = args.start_stream_id();
let client_factory = client_factory.clone();
let parallel_consumer_streams = !args.disable_parallel_consumer_streams();
let stream_id = match parallel_consumer_streams {
true => start_stream_id + client_id,
false => start_stream_id + 1,
};
let warmup_time = args.warmup_time();
let consumer = Consumer::new(
client_factory,
client_id,
None,
stream_id,
messages_per_batch,
message_batches,
warmup_time,
args.sampling_time(),
args.moving_average_window(),
self.args
.rate_limit()
.map(|rl| RateLimiter::new(rl.as_bytes_u64())),
self.args.polling_kind(),
);
let future = Box::pin(async move { consumer.run().await });
futures.as_mut().unwrap().push(future);
}
info!("Created {} client(s).", clients_count);
futures
}
fn kind(&self) -> BenchmarkKind {
BenchmarkKind::Poll
}
fn args(&self) -> &IggyBenchArgs {
&self.args
}
fn client_factory(&self) -> &Arc<dyn ClientFactory> {
&self.client_factory
}
}