| use crate::{ |
| actors::consumer::Consumer, |
| args::common::IggyBenchArgs, |
| benchmarks::{CONSUMER_GROUP_BASE_ID, CONSUMER_GROUP_NAME_PREFIX}, |
| rate_limiter::RateLimiter, |
| }; |
| use async_trait::async_trait; |
| use iggy::{client::ConsumerGroupClient, clients::client::IggyClient, error::IggyError}; |
| use iggy_benchmark_report::benchmark_kind::BenchmarkKind; |
| use integration::test_server::{login_root, ClientFactory}; |
| use std::sync::Arc; |
| use tracing::{error, info}; |
| |
| use super::benchmark::{BenchmarkFutures, Benchmarkable}; |
| |
| pub struct ConsumerGroupBenchmark { |
| args: Arc<IggyBenchArgs>, |
| client_factory: Arc<dyn ClientFactory>, |
| } |
| |
| impl ConsumerGroupBenchmark { |
| pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn ClientFactory>) -> Self { |
| Self { |
| args, |
| client_factory, |
| } |
| } |
| |
| pub async fn init_consumer_groups(&self, consumer_groups_count: u32) -> Result<(), IggyError> { |
| let start_stream_id = self.args().start_stream_id(); |
| let topic_id: u32 = 1; |
| let client = self.client_factory().create_client().await; |
| let client = IggyClient::create(client, None, None); |
| login_root(&client).await; |
| for i in 1..=consumer_groups_count { |
| let consumer_group_id = CONSUMER_GROUP_BASE_ID + i; |
| let stream_id = start_stream_id + i; |
| let consumer_group_name = |
| format!("{}-{}", CONSUMER_GROUP_NAME_PREFIX, consumer_group_id); |
| info!( |
| "Creating test consumer group with name: {}, id: {}, stream id: {}, topic id: {}", |
| consumer_group_name, consumer_group_id, stream_id, topic_id |
| ); |
| |
| let cg = client |
| .create_consumer_group( |
| &stream_id.try_into().unwrap(), |
| &topic_id.try_into().unwrap(), |
| &consumer_group_name, |
| Some(consumer_group_id), |
| ) |
| .await; |
| if cg.is_err() { |
| let error = cg.err().unwrap(); |
| match error { |
| IggyError::ConsumerGroupIdAlreadyExists(_, _) => { |
| continue; |
| } |
| _ => error!("Error when creating consumer group : {error}"), |
| } |
| } |
| } |
| |
| Ok(()) |
| } |
| } |
| |
| #[async_trait] |
| impl Benchmarkable for ConsumerGroupBenchmark { |
| async fn run(&mut self) -> BenchmarkFutures { |
| self.check_streams().await?; |
| let consumer_groups_count = self.args.number_of_consumer_groups(); |
| self.init_consumer_groups(consumer_groups_count) |
| .await |
| .expect("Failed to init consumer group"); |
| |
| let start_stream_id = self.args.start_stream_id(); |
| let start_consumer_group_id = CONSUMER_GROUP_BASE_ID; |
| let consumers = self.args.consumers(); |
| let messages_per_batch = self.args.messages_per_batch(); |
| let message_batches = self.args.message_batches(); |
| let warmup_time = self.args.warmup_time(); |
| let mut futures: BenchmarkFutures = Ok(Vec::with_capacity((consumers) as usize)); |
| |
| for consumer_id in 1..=consumers { |
| let consumer_group_id = |
| start_consumer_group_id + 1 + (consumer_id % consumer_groups_count); |
| let stream_id = start_stream_id + 1 + (consumer_id % consumer_groups_count); |
| |
| let consumer = Consumer::new( |
| self.client_factory.clone(), |
| consumer_id, |
| Some(consumer_group_id), |
| stream_id, |
| messages_per_batch, |
| message_batches, |
| warmup_time, |
| self.args.sampling_time(), |
| self.args.moving_average_window(), |
| self.args |
| .rate_limit() |
| .map(|rl| RateLimiter::new(rl.as_bytes_u64())), |
| self.args.polling_kind(), |
| ); |
| let future = Box::pin(async move { consumer.run().await }); |
| futures.as_mut().unwrap().push(future); |
| } |
| info!( |
| "Starting consumer group benchmark with {} messages", |
| self.total_messages() |
| ); |
| futures |
| } |
| |
| fn kind(&self) -> BenchmarkKind { |
| BenchmarkKind::ConsumerGroupPoll |
| } |
| |
| fn args(&self) -> &IggyBenchArgs { |
| &self.args |
| } |
| |
| fn client_factory(&self) -> &Arc<dyn ClientFactory> { |
| &self.client_factory |
| } |
| } |