Add polling kind to consumer in order to fix consumer group poll bench (#1469)

This commit introduces a new `polling_kind` field to the `Consumer`
struct. The `PollingKind` enum is used to determine the polling strategy, either
`Offset` or `Next`, and automatically sets the `auto_commit` flag accordingly.
diff --git a/bench/src/actors/consumer.rs b/bench/src/actors/consumer.rs
index 30b2555..5ab1876 100644
--- a/bench/src/actors/consumer.rs
+++ b/bench/src/actors/consumer.rs
@@ -4,7 +4,7 @@
 use iggy::clients::client::IggyClient;
 use iggy::consumer::Consumer as IggyConsumer;
 use iggy::error::IggyError;
-use iggy::messages::poll_messages::PollingStrategy;
+use iggy::messages::poll_messages::{PollingKind, PollingStrategy};
 use iggy::utils::byte_size::IggyByteSize;
 use iggy::utils::duration::IggyDuration;
 use iggy::utils::sizeable::Sizeable;
@@ -27,6 +27,7 @@
     warmup_time: IggyDuration,
     sampling_time: IggyDuration,
     moving_average_window: u32,
+    polling_kind: PollingKind,
 }
 
 impl Consumer {
@@ -41,6 +42,7 @@
         warmup_time: IggyDuration,
         sampling_time: IggyDuration,
         moving_average_window: u32,
+        polling_kind: PollingKind,
     ) -> Self {
         Self {
             client_factory,
@@ -52,6 +54,7 @@
             warmup_time,
             sampling_time,
             moving_average_window,
+            polling_kind,
         }
     }
 
@@ -92,7 +95,6 @@
         let mut current_iteration: u64 = 0;
         let mut received_messages = 0;
         let mut topic_not_found_counter = 0;
-        let mut strategy = PollingStrategy::offset(0);
 
         if self.warmup_time.get_duration() != Duration::from_millis(0) {
             if let Some(cg_id) = self.consumer_group_id {
@@ -109,7 +111,14 @@
             let warmup_end = Instant::now() + self.warmup_time.get_duration();
             while Instant::now() < warmup_end {
                 let offset = current_iteration * messages_per_batch as u64;
-                strategy.set_value(offset);
+                let (strategy, auto_commit) = match self.polling_kind {
+                    PollingKind::Offset => (PollingStrategy::offset(offset), false),
+                    PollingKind::Next => (PollingStrategy::next(), true),
+                    _ => panic!(
+                        "Unsupported polling kind for benchmark: {:?}",
+                        self.polling_kind
+                    ),
+                };
                 let polled_messages = client
                     .poll_messages(
                         &stream_id,
@@ -118,7 +127,7 @@
                         &consumer,
                         &strategy,
                         messages_per_batch,
-                        false,
+                        auto_commit,
                     )
                     .await?;
 
@@ -135,9 +144,9 @@
 
         if let Some(cg_id) = self.consumer_group_id {
             info!(
-            "Consumer #{}, part of consumer group #{} → polling {} messages in {} batches of {} messages...",
-            self.consumer_id, cg_id, total_messages, message_batches, messages_per_batch
-        );
+                "Consumer #{}, part of consumer group #{} → polling {} messages in {} batches of {} messages...",
+                self.consumer_id, cg_id, total_messages, message_batches, messages_per_batch
+            );
         } else {
             info!(
                 "Consumer #{} → polling {} messages in {} batches of {} messages...",
@@ -153,6 +162,14 @@
         while received_messages < total_messages {
             let offset = current_iteration * messages_per_batch as u64;
 
+            let (strategy, auto_commit) = match self.polling_kind {
+                PollingKind::Offset => (PollingStrategy::offset(offset), false),
+                PollingKind::Next => (PollingStrategy::next(), true),
+                _ => panic!(
+                    "Unsupported polling kind for benchmark: {:?}",
+                    self.polling_kind
+                ),
+            };
             let before_poll = Instant::now();
             let polled_messages = client
                 .poll_messages(
@@ -160,9 +177,9 @@
                     &topic_id,
                     partition_id,
                     &consumer,
-                    &PollingStrategy::offset(offset),
+                    &strategy,
                     messages_per_batch,
-                    false,
+                    auto_commit,
                 )
                 .await;
             let latency = before_poll.elapsed();
diff --git a/bench/src/benchmarks/consumer_group_benchmark.rs b/bench/src/benchmarks/consumer_group_benchmark.rs
index 9a47d47..9300bfd 100644
--- a/bench/src/benchmarks/consumer_group_benchmark.rs
+++ b/bench/src/benchmarks/consumer_group_benchmark.rs
@@ -4,7 +4,10 @@
     benchmarks::{CONSUMER_GROUP_BASE_ID, CONSUMER_GROUP_NAME_PREFIX},
 };
 use async_trait::async_trait;
-use iggy::{client::ConsumerGroupClient, clients::client::IggyClient, error::IggyError};
+use iggy::{
+    client::ConsumerGroupClient, clients::client::IggyClient, error::IggyError,
+    messages::poll_messages::PollingKind,
+};
 use iggy_benchmark_report::benchmark_kind::BenchmarkKind;
 use integration::test_server::{login_root, ClientFactory};
 use std::sync::Arc;
@@ -96,6 +99,7 @@
                 warmup_time,
                 self.args.sampling_time(),
                 self.args.moving_average_window(),
+                PollingKind::Next,
             );
             let future = Box::pin(async move { consumer.run().await });
             futures.as_mut().unwrap().push(future);
diff --git a/bench/src/benchmarks/poll_benchmark.rs b/bench/src/benchmarks/poll_benchmark.rs
index 751f825..15bc74a 100644
--- a/bench/src/benchmarks/poll_benchmark.rs
+++ b/bench/src/benchmarks/poll_benchmark.rs
@@ -2,6 +2,7 @@
 use crate::actors::consumer::Consumer;
 use crate::args::common::IggyBenchArgs;
 use async_trait::async_trait;
+use iggy::messages::poll_messages::PollingKind;
 use iggy_benchmark_report::benchmark_kind::BenchmarkKind;
 use integration::test_server::ClientFactory;
 use std::sync::Arc;
@@ -55,6 +56,7 @@
                 warmup_time,
                 args.sampling_time(),
                 args.moving_average_window(),
+                PollingKind::Offset,
             );
 
             let future = Box::pin(async move { consumer.run().await });
diff --git a/bench/src/benchmarks/send_and_poll_benchmark.rs b/bench/src/benchmarks/send_and_poll_benchmark.rs
index 332a006..7fdec65 100644
--- a/bench/src/benchmarks/send_and_poll_benchmark.rs
+++ b/bench/src/benchmarks/send_and_poll_benchmark.rs
@@ -3,6 +3,7 @@
 use crate::actors::producer::Producer;
 use crate::args::common::IggyBenchArgs;
 use async_trait::async_trait;
+use iggy::messages::poll_messages::PollingKind;
 use iggy_benchmark_report::benchmark_kind::BenchmarkKind;
 use integration::test_server::ClientFactory;
 use std::sync::Arc;
@@ -75,6 +76,7 @@
                 warmup_time,
                 self.args.sampling_time(),
                 self.args.moving_average_window(),
+                PollingKind::Offset,
             );
             let future = Box::pin(async move { consumer.run().await });
             futures.as_mut().unwrap().push(future);