|  | use super::benchmark::{BenchmarkFutures, Benchmarkable}; | 
|  | use crate::actors::consumer::Consumer; | 
|  | use crate::args::common::IggyBenchArgs; | 
|  | use crate::rate_limiter::RateLimiter; | 
|  | use async_trait::async_trait; | 
|  | use iggy_benchmark_report::benchmark_kind::BenchmarkKind; | 
|  | use integration::test_server::ClientFactory; | 
|  | use std::sync::Arc; | 
|  | use tracing::info; | 
|  |  | 
|  | pub struct PollMessagesBenchmark { | 
|  | args: Arc<IggyBenchArgs>, | 
|  | client_factory: Arc<dyn ClientFactory>, | 
|  | } | 
|  |  | 
|  | impl PollMessagesBenchmark { | 
|  | pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn ClientFactory>) -> Self { | 
|  | Self { | 
|  | args, | 
|  | client_factory, | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | #[async_trait] | 
|  | impl Benchmarkable for PollMessagesBenchmark { | 
|  | async fn run(&mut self) -> BenchmarkFutures { | 
|  | self.check_streams().await?; | 
|  | let clients_count = self.args.consumers(); | 
|  | info!("Creating {} client(s)...", clients_count); | 
|  | let messages_per_batch = self.args.messages_per_batch(); | 
|  | let message_batches = self.args.message_batches(); | 
|  |  | 
|  | let mut futures: BenchmarkFutures = Ok(Vec::with_capacity(clients_count as usize)); | 
|  | for client_id in 1..=clients_count { | 
|  | let args = self.args.clone(); | 
|  | let client_factory = self.client_factory.clone(); | 
|  | info!("Executing the benchmark on client #{}...", client_id); | 
|  | let args = args.clone(); | 
|  | let start_stream_id = args.start_stream_id(); | 
|  | let client_factory = client_factory.clone(); | 
|  | let parallel_consumer_streams = !args.disable_parallel_consumer_streams(); | 
|  | let stream_id = match parallel_consumer_streams { | 
|  | true => start_stream_id + client_id, | 
|  | false => start_stream_id + 1, | 
|  | }; | 
|  | let warmup_time = args.warmup_time(); | 
|  |  | 
|  | let consumer = Consumer::new( | 
|  | client_factory, | 
|  | client_id, | 
|  | None, | 
|  | stream_id, | 
|  | messages_per_batch, | 
|  | message_batches, | 
|  | warmup_time, | 
|  | args.sampling_time(), | 
|  | args.moving_average_window(), | 
|  | self.args | 
|  | .rate_limit() | 
|  | .map(|rl| RateLimiter::new(rl.as_bytes_u64())), | 
|  | self.args.polling_kind(), | 
|  | ); | 
|  |  | 
|  | let future = Box::pin(async move { consumer.run().await }); | 
|  | futures.as_mut().unwrap().push(future); | 
|  | } | 
|  | info!("Created {} client(s).", clients_count); | 
|  | futures | 
|  | } | 
|  |  | 
|  | fn kind(&self) -> BenchmarkKind { | 
|  | BenchmarkKind::Poll | 
|  | } | 
|  |  | 
|  | fn args(&self) -> &IggyBenchArgs { | 
|  | &self.args | 
|  | } | 
|  |  | 
|  | fn client_factory(&self) -> &Arc<dyn ClientFactory> { | 
|  | &self.client_factory | 
|  | } | 
|  | } |