Add polling kind to consumer in order to fix consumer group poll bench (#1469)
This commit introduces a new `polling_kind` field to the `Consumer`
struct. The `PollingKind` enum is used to determine the polling strategy, either
`Offset` or `Next`, and automatically sets the `auto_commit` flag accordingly.
diff --git a/bench/src/actors/consumer.rs b/bench/src/actors/consumer.rs
index 30b2555..5ab1876 100644
--- a/bench/src/actors/consumer.rs
+++ b/bench/src/actors/consumer.rs
@@ -4,7 +4,7 @@
use iggy::clients::client::IggyClient;
use iggy::consumer::Consumer as IggyConsumer;
use iggy::error::IggyError;
-use iggy::messages::poll_messages::PollingStrategy;
+use iggy::messages::poll_messages::{PollingKind, PollingStrategy};
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::duration::IggyDuration;
use iggy::utils::sizeable::Sizeable;
@@ -27,6 +27,7 @@
warmup_time: IggyDuration,
sampling_time: IggyDuration,
moving_average_window: u32,
+ polling_kind: PollingKind,
}
impl Consumer {
@@ -41,6 +42,7 @@
warmup_time: IggyDuration,
sampling_time: IggyDuration,
moving_average_window: u32,
+ polling_kind: PollingKind,
) -> Self {
Self {
client_factory,
@@ -52,6 +54,7 @@
warmup_time,
sampling_time,
moving_average_window,
+ polling_kind,
}
}
@@ -92,7 +95,6 @@
let mut current_iteration: u64 = 0;
let mut received_messages = 0;
let mut topic_not_found_counter = 0;
- let mut strategy = PollingStrategy::offset(0);
if self.warmup_time.get_duration() != Duration::from_millis(0) {
if let Some(cg_id) = self.consumer_group_id {
@@ -109,7 +111,14 @@
let warmup_end = Instant::now() + self.warmup_time.get_duration();
while Instant::now() < warmup_end {
let offset = current_iteration * messages_per_batch as u64;
- strategy.set_value(offset);
+ let (strategy, auto_commit) = match self.polling_kind {
+ PollingKind::Offset => (PollingStrategy::offset(offset), false),
+ PollingKind::Next => (PollingStrategy::next(), true),
+ _ => panic!(
+ "Unsupported polling kind for benchmark: {:?}",
+ self.polling_kind
+ ),
+ };
let polled_messages = client
.poll_messages(
&stream_id,
@@ -118,7 +127,7 @@
&consumer,
&strategy,
messages_per_batch,
- false,
+ auto_commit,
)
.await?;
@@ -135,9 +144,9 @@
if let Some(cg_id) = self.consumer_group_id {
info!(
- "Consumer #{}, part of consumer group #{} → polling {} messages in {} batches of {} messages...",
- self.consumer_id, cg_id, total_messages, message_batches, messages_per_batch
- );
+ "Consumer #{}, part of consumer group #{} → polling {} messages in {} batches of {} messages...",
+ self.consumer_id, cg_id, total_messages, message_batches, messages_per_batch
+ );
} else {
info!(
"Consumer #{} → polling {} messages in {} batches of {} messages...",
@@ -153,6 +162,14 @@
while received_messages < total_messages {
let offset = current_iteration * messages_per_batch as u64;
+ let (strategy, auto_commit) = match self.polling_kind {
+ PollingKind::Offset => (PollingStrategy::offset(offset), false),
+ PollingKind::Next => (PollingStrategy::next(), true),
+ _ => panic!(
+ "Unsupported polling kind for benchmark: {:?}",
+ self.polling_kind
+ ),
+ };
let before_poll = Instant::now();
let polled_messages = client
.poll_messages(
@@ -160,9 +177,9 @@
&topic_id,
partition_id,
&consumer,
- &PollingStrategy::offset(offset),
+ &strategy,
messages_per_batch,
- false,
+ auto_commit,
)
.await;
let latency = before_poll.elapsed();
diff --git a/bench/src/benchmarks/consumer_group_benchmark.rs b/bench/src/benchmarks/consumer_group_benchmark.rs
index 9a47d47..9300bfd 100644
--- a/bench/src/benchmarks/consumer_group_benchmark.rs
+++ b/bench/src/benchmarks/consumer_group_benchmark.rs
@@ -4,7 +4,10 @@
benchmarks::{CONSUMER_GROUP_BASE_ID, CONSUMER_GROUP_NAME_PREFIX},
};
use async_trait::async_trait;
-use iggy::{client::ConsumerGroupClient, clients::client::IggyClient, error::IggyError};
+use iggy::{
+ client::ConsumerGroupClient, clients::client::IggyClient, error::IggyError,
+ messages::poll_messages::PollingKind,
+};
use iggy_benchmark_report::benchmark_kind::BenchmarkKind;
use integration::test_server::{login_root, ClientFactory};
use std::sync::Arc;
@@ -96,6 +99,7 @@
warmup_time,
self.args.sampling_time(),
self.args.moving_average_window(),
+ PollingKind::Next,
);
let future = Box::pin(async move { consumer.run().await });
futures.as_mut().unwrap().push(future);
diff --git a/bench/src/benchmarks/poll_benchmark.rs b/bench/src/benchmarks/poll_benchmark.rs
index 751f825..15bc74a 100644
--- a/bench/src/benchmarks/poll_benchmark.rs
+++ b/bench/src/benchmarks/poll_benchmark.rs
@@ -2,6 +2,7 @@
use crate::actors::consumer::Consumer;
use crate::args::common::IggyBenchArgs;
use async_trait::async_trait;
+use iggy::messages::poll_messages::PollingKind;
use iggy_benchmark_report::benchmark_kind::BenchmarkKind;
use integration::test_server::ClientFactory;
use std::sync::Arc;
@@ -55,6 +56,7 @@
warmup_time,
args.sampling_time(),
args.moving_average_window(),
+ PollingKind::Offset,
);
let future = Box::pin(async move { consumer.run().await });
diff --git a/bench/src/benchmarks/send_and_poll_benchmark.rs b/bench/src/benchmarks/send_and_poll_benchmark.rs
index 332a006..7fdec65 100644
--- a/bench/src/benchmarks/send_and_poll_benchmark.rs
+++ b/bench/src/benchmarks/send_and_poll_benchmark.rs
@@ -3,6 +3,7 @@
use crate::actors::producer::Producer;
use crate::args::common::IggyBenchArgs;
use async_trait::async_trait;
+use iggy::messages::poll_messages::PollingKind;
use iggy_benchmark_report::benchmark_kind::BenchmarkKind;
use integration::test_server::ClientFactory;
use std::sync::Arc;
@@ -75,6 +76,7 @@
warmup_time,
self.args.sampling_time(),
self.args.moving_average_window(),
+ PollingKind::Offset,
);
let future = Box::pin(async move { consumer.run().await });
futures.as_mut().unwrap().push(future);