Add rate limiting and browser chart opening features

This commit introduces rate limiting for both producers and consumers.
The new --rate-limit argument accepts a bytes-per-second value in
human-readable form (e.g. "50KB", "10MB", "1GB") and applies per
individual producer/consumer rather than as an aggregate. The rate
limiter uses a linger-based algorithm: before each batch it computes
how long the batch should take at the configured rate and sleeps for
the portion of that time that has not already elapsed, keeping
throughput smooth.
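
As a rough sketch of the pacing math (illustrative only; the actual
implementation is the new RateLimiter in bench/src/rate_limiter/mod.rs
in the diff below, which guards the timestamp with a tokio Mutex and
awaits tokio::time::sleep instead of blocking):

    use std::time::{Duration, Instant};

    // Single-threaded, blocking sketch of linger-based pacing.
    // The Pacer name is illustrative and not part of this change.
    struct Pacer {
        bytes_per_second: u64,
        last_operation: Instant,
    }

    impl Pacer {
        fn wait_and_consume(&mut self, bytes: u64) {
            // How long this many bytes should take at the target rate.
            let target =
                Duration::from_secs_f64(bytes as f64 / self.bytes_per_second as f64);
            // Sleep only for the part of that budget not already spent.
            let elapsed = self.last_operation.elapsed();
            if elapsed < target {
                std::thread::sleep(target - elapsed);
            }
            self.last_operation = Instant::now();
        }
    }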

Additionally, a new --open-charts flag automatically opens the
generated throughput and latency charts in the system's default
browser once the benchmark finishes, giving immediate access to the
visualized results.
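
For example, both options could be combined on a bench invocation
(the binary and subcommand names here are illustrative; only the
--rate-limit and --open-charts flags come from this change):

    iggy-bench --rate-limit 10MB --open-charts send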

The latency time-series calculation is also updated, switching from
average latency to worst latency per bucket so that latency spikes are
no longer hidden by averaging.
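
A condensed sketch of the worst-latency-per-bucket reduction (the
function name and record representation here are illustrative, not
taken from the codebase):

    /// Keep the worst latency observed in each time bucket instead of
    /// averaging latencies across the bucket.
    /// `records` are (elapsed_time_us, latency_us) pairs.
    fn worst_latency_per_bucket(records: &[(u64, u64)], bucket_size_us: u64) -> Vec<u64> {
        let max_time_us = records.iter().map(|r| r.0).max().unwrap_or(0);
        let num_buckets = max_time_us.div_ceil(bucket_size_us) as usize;
        let mut worst = vec![0u64; num_buckets];
        for &(elapsed_time_us, latency_us) in records {
            let bucket = (elapsed_time_us / bucket_size_us) as usize;
            if bucket < num_buckets {
                worst[bucket] = worst[bucket].max(latency_us);
            }
        }
        worst
    }
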
diff --git a/bench/src/actors/consumer.rs b/bench/src/actors/consumer.rs
index 5ab1876..45e7340 100644
--- a/bench/src/actors/consumer.rs
+++ b/bench/src/actors/consumer.rs
@@ -1,5 +1,6 @@
 use crate::analytics::metrics::individual::from_records;
 use crate::analytics::record::BenchmarkRecord;
+use crate::rate_limiter::RateLimiter;
 use iggy::client::{ConsumerGroupClient, MessageClient};
 use iggy::clients::client::IggyClient;
 use iggy::consumer::Consumer as IggyConsumer;
@@ -27,6 +28,7 @@
     warmup_time: IggyDuration,
     sampling_time: IggyDuration,
     moving_average_window: u32,
+    rate_limiter: Option<RateLimiter>,
     polling_kind: PollingKind,
 }
 
@@ -42,6 +44,7 @@
         warmup_time: IggyDuration,
         sampling_time: IggyDuration,
         moving_average_window: u32,
+        rate_limiter: Option<RateLimiter>,
         polling_kind: PollingKind,
     ) -> Self {
         Self {
@@ -54,6 +57,7 @@
             warmup_time,
             sampling_time,
             moving_average_window,
+            rate_limiter,
             polling_kind,
         }
     }
@@ -162,6 +166,11 @@
         while received_messages < total_messages {
             let offset = current_iteration * messages_per_batch as u64;
 
+            // Apply rate limiting if configured
+            if let Some(limiter) = &self.rate_limiter {
+                limiter.wait_and_consume(batch_size_total_bytes).await;
+            }
+
             let (strategy, auto_commit) = match self.polling_kind {
                 PollingKind::Offset => (PollingStrategy::offset(offset), false),
                 PollingKind::Next => (PollingStrategy::next(), true),
diff --git a/bench/src/actors/producer.rs b/bench/src/actors/producer.rs
index e4cc214..3b3e74b 100644
--- a/bench/src/actors/producer.rs
+++ b/bench/src/actors/producer.rs
@@ -1,5 +1,6 @@
 use crate::analytics::metrics::individual::from_records;
 use crate::analytics::record::BenchmarkRecord;
+use crate::rate_limiter::RateLimiter;
 use iggy::client::MessageClient;
 use iggy::clients::client::IggyClient;
 use iggy::error::IggyError;
@@ -28,6 +29,7 @@
     warmup_time: IggyDuration,
     sampling_time: IggyDuration,
     moving_average_window: u32,
+    rate_limiter: Option<RateLimiter>,
 }
 
 impl Producer {
@@ -43,6 +45,7 @@
         warmup_time: IggyDuration,
         sampling_time: IggyDuration,
         moving_average_window: u32,
+        rate_limiter: Option<RateLimiter>,
     ) -> Self {
         Producer {
             client_factory,
@@ -55,6 +58,7 @@
             warmup_time,
             sampling_time,
             moving_average_window,
+            rate_limiter,
         }
     }
 
@@ -117,6 +121,10 @@
         let mut latencies: Vec<Duration> = Vec::with_capacity(message_batches as usize);
         let mut records = Vec::with_capacity(message_batches as usize);
         for i in 1..=message_batches {
+            // Apply rate limiting if configured
+            if let Some(limiter) = &self.rate_limiter {
+                limiter.wait_and_consume(batch_total_bytes).await;
+            }
             let before_send = Instant::now();
             client
                 .send_messages(&stream_id, &topic_id, &partitioning, &mut messages)
diff --git a/bench/src/analytics/time_series/calculators/latency.rs b/bench/src/analytics/time_series/calculators/latency.rs
index 91c9220..6c60ab8 100644
--- a/bench/src/analytics/time_series/calculators/latency.rs
+++ b/bench/src/analytics/time_series/calculators/latency.rs
@@ -8,6 +8,54 @@
 pub struct LatencyTimeSeriesCalculator;
 
 impl TimeSeriesCalculation for LatencyTimeSeriesCalculator {
+    // fn calculate(&self, records: &[BenchmarkRecord], bucket_size: IggyDuration) -> TimeSeries {
+    //     if records.len() < 2 {
+    //         warn!("Not enough records to calculate latency");
+    //         return TimeSeries {
+    //             points: Vec::new(),
+    //             kind: TimeSeriesKind::Latency,
+    //         };
+    //     }
+
+    //     let bucket_size_us = bucket_size.as_micros();
+    //     let max_time_us = records.iter().map(|r| r.elapsed_time_us).max().unwrap();
+    //     let num_buckets = max_time_us.div_ceil(bucket_size_us);
+    //     let mut worst_latency_per_bucket = vec![0u64; num_buckets as usize];
+
+    //     for window in records.windows(2) {
+    //         let (prev, current) = (&window[0], &window[1]);
+    //         let bucket_index = current.elapsed_time_us / bucket_size_us;
+    //         if bucket_index >= num_buckets {
+    //             continue;
+    //         }
+
+    //         let delta_messages = current.messages.saturating_sub(prev.messages);
+    //         if delta_messages == 0 {
+    //             continue;
+    //         }
+
+    //         let current_latency = current.latency_us;
+    //         worst_latency_per_bucket[bucket_index as usize] =
+    //             worst_latency_per_bucket[bucket_index as usize].max(current_latency);
+    //     }
+
+    //     let points = (0..num_buckets)
+    //         .filter(|&i| worst_latency_per_bucket[i as usize] > 0)
+    //         .map(|i| {
+    //             let time_s = (i * bucket_size_us) as f64 / 1_000_000.0;
+    //             let worst_latency_us = worst_latency_per_bucket[i as usize] as f64;
+    //             let worst_latency_ms = worst_latency_us / 1000.0;
+    //             let rounded_worst_latency_ms = (worst_latency_ms * 1000.0).round() / 1000.0;
+    //             TimePoint::new(time_s, rounded_worst_latency_ms)
+    //         })
+    //         .collect();
+
+    //     TimeSeries {
+    //         points,
+    //         kind: TimeSeriesKind::Latency,
+    //     }
+    // }
+
     fn calculate(&self, records: &[BenchmarkRecord], bucket_size: IggyDuration) -> TimeSeries {
         if records.len() < 2 {
             warn!("Not enough records to calculate latency");
diff --git a/bench/src/args/common.rs b/bench/src/args/common.rs
index 71a7ed9..2986646 100644
--- a/bench/src/args/common.rs
+++ b/bench/src/args/common.rs
@@ -4,6 +4,7 @@
 use clap::error::ErrorKind;
 use clap::{CommandFactory, Parser};
 use iggy::messages::poll_messages::PollingKind;
+use iggy::utils::byte_size::IggyByteSize;
 use iggy::utils::duration::IggyDuration;
 use iggy_benchmark_report::benchmark_kind::BenchmarkKind;
 use iggy_benchmark_report::params::BenchmarkParams;
@@ -49,6 +50,11 @@
     #[arg(long, default_value_t = DEFAULT_MOVING_AVERAGE_WINDOW)]
     pub moving_average_window: u32,
 
+    /// Optional rate limit per individual producer/consumer in bytes per second (not aggregate).
+    /// Accepts human-readable formats like "50KB", "10MB", or "1GB"
+    #[arg(long)]
+    pub rate_limit: Option<IggyByteSize>,
+
     /// Skip server start
     #[arg(long, short = 'k', default_value_t = DEFAULT_SKIP_SERVER_START)]
     pub skip_server_start: bool,
@@ -76,6 +82,10 @@
     /// Git reference date used for note in the benchmark results, preferably merge date of the commit
     #[arg(long)]
     pub gitref_date: Option<String>,
+
+    /// Open generated charts in browser after benchmark is finished
+    #[arg(long, default_value_t = false)]
+    pub open_charts: bool,
 }
 
 fn validate_server_executable_path(v: &str) -> Result<String, String> {
@@ -202,6 +212,10 @@
         self.moving_average_window
     }
 
+    pub fn rate_limit(&self) -> Option<IggyByteSize> {
+        self.rate_limit
+    }
+
     pub fn output_dir(&self) -> Option<String> {
         self.output_dir.clone()
     }
@@ -423,6 +437,10 @@
         parts.push(format!("--consumer-groups {}", consumer_groups));
     }
 
+    if let Some(rate_limit) = args.rate_limit() {
+        parts.push(format!("--rate-limit {}", rate_limit));
+    }
+
     parts.join(" ")
 }
 
diff --git a/bench/src/benchmarks/consumer_group_benchmark.rs b/bench/src/benchmarks/consumer_group_benchmark.rs
index 59376cf..f8f6c00 100644
--- a/bench/src/benchmarks/consumer_group_benchmark.rs
+++ b/bench/src/benchmarks/consumer_group_benchmark.rs
@@ -2,6 +2,7 @@
     actors::consumer::Consumer,
     args::common::IggyBenchArgs,
     benchmarks::{CONSUMER_GROUP_BASE_ID, CONSUMER_GROUP_NAME_PREFIX},
+    rate_limiter::RateLimiter,
 };
 use async_trait::async_trait;
 use iggy::{client::ConsumerGroupClient, clients::client::IggyClient, error::IggyError};
@@ -96,6 +97,9 @@
                 warmup_time,
                 self.args.sampling_time(),
                 self.args.moving_average_window(),
+                self.args
+                    .rate_limit()
+                    .map(|rl| RateLimiter::new(rl.as_bytes_u64())),
                 self.args.polling_kind(),
             );
             let future = Box::pin(async move { consumer.run().await });
diff --git a/bench/src/benchmarks/poll_benchmark.rs b/bench/src/benchmarks/poll_benchmark.rs
index 076e70d..0f9213f 100644
--- a/bench/src/benchmarks/poll_benchmark.rs
+++ b/bench/src/benchmarks/poll_benchmark.rs
@@ -1,6 +1,7 @@
 use super::benchmark::{BenchmarkFutures, Benchmarkable};
 use crate::actors::consumer::Consumer;
 use crate::args::common::IggyBenchArgs;
+use crate::rate_limiter::RateLimiter;
 use async_trait::async_trait;
 use iggy_benchmark_report::benchmark_kind::BenchmarkKind;
 use integration::test_server::ClientFactory;
@@ -55,6 +56,9 @@
                 warmup_time,
                 args.sampling_time(),
                 args.moving_average_window(),
+                self.args
+                    .rate_limit()
+                    .map(|rl| RateLimiter::new(rl.as_bytes_u64())),
                 self.args.polling_kind(),
             );
 
diff --git a/bench/src/benchmarks/send_and_poll_benchmark.rs b/bench/src/benchmarks/send_and_poll_benchmark.rs
index bdfb990..6811687 100644
--- a/bench/src/benchmarks/send_and_poll_benchmark.rs
+++ b/bench/src/benchmarks/send_and_poll_benchmark.rs
@@ -2,6 +2,7 @@
 use crate::actors::consumer::Consumer;
 use crate::actors::producer::Producer;
 use crate::args::common::IggyBenchArgs;
+use crate::rate_limiter::RateLimiter;
 use async_trait::async_trait;
 use iggy_benchmark_report::benchmark_kind::BenchmarkKind;
 use integration::test_server::ClientFactory;
@@ -55,6 +56,9 @@
                 warmup_time,
                 self.args.sampling_time(),
                 self.args.moving_average_window(),
+                self.args
+                    .rate_limit()
+                    .map(|rl| RateLimiter::new(rl.as_bytes_u64())),
             );
             let future = Box::pin(async move { producer.run().await });
             futures.as_mut().unwrap().push(future);
@@ -75,6 +79,9 @@
                 warmup_time,
                 self.args.sampling_time(),
                 self.args.moving_average_window(),
+                self.args
+                    .rate_limit()
+                    .map(|rl| RateLimiter::new(rl.as_bytes_u64())),
                 self.args.polling_kind(),
             );
             let future = Box::pin(async move { consumer.run().await });
diff --git a/bench/src/benchmarks/send_benchmark.rs b/bench/src/benchmarks/send_benchmark.rs
index 6b7ffea..bc3b4e6 100644
--- a/bench/src/benchmarks/send_benchmark.rs
+++ b/bench/src/benchmarks/send_benchmark.rs
@@ -1,6 +1,7 @@
 use super::benchmark::{BenchmarkFutures, Benchmarkable};
 use crate::actors::producer::Producer;
 use crate::args::common::IggyBenchArgs;
+use crate::rate_limiter::RateLimiter;
 use async_trait::async_trait;
 use iggy_benchmark_report::benchmark_kind::BenchmarkKind;
 use integration::test_server::ClientFactory;
@@ -60,6 +61,8 @@
                 warmup_time,
                 args.sampling_time(),
                 args.moving_average_window(),
+                args.rate_limit()
+                    .map(|rl| RateLimiter::new(rl.as_bytes_u64())),
             );
             let future = Box::pin(async move { producer.run().await });
             futures.as_mut().unwrap().push(future);
diff --git a/bench/src/main.rs b/bench/src/main.rs
index 8caea14..9acc1fd 100644
--- a/bench/src/main.rs
+++ b/bench/src/main.rs
@@ -3,6 +3,7 @@
 mod args;
 mod benchmarks;
 mod plot;
+mod rate_limiter;
 mod runner;
 mod utils;
 
diff --git a/bench/src/plot.rs b/bench/src/plot.rs
index c265744..056ab90 100644
--- a/bench/src/plot.rs
+++ b/bench/src/plot.rs
@@ -3,6 +3,7 @@
 use iggy::utils::byte_size::IggyByteSize;
 use iggy_benchmark_report::report::BenchmarkReport;
 use std::path::Path;
+use std::process::Command;
 use std::time::Instant;
 use tracing::info;
 
@@ -43,28 +44,56 @@
     }
 }
 
+fn open_in_browser(path: &str) -> std::io::Result<()> {
+    #[cfg(target_os = "linux")]
+    {
+        Command::new("xdg-open").arg(path).spawn().map(|_| ())
+    }
+
+    #[cfg(target_os = "macos")]
+    {
+        Command::new("open").arg(path).spawn().map(|_| ())
+    }
+
+    #[cfg(target_os = "windows")]
+    {
+        Command::new("cmd")
+            .args(["/C", "start", path])
+            .spawn()
+            .map(|_| ())
+    }
+}
+
 pub fn plot_chart(
     report: &BenchmarkReport,
     output_directory: &str,
     chart_type: ChartType,
+    should_open_in_browser: bool,
 ) -> std::io::Result<()> {
     let data_processing_start = Instant::now();
     let chart = (chart_type.create_chart())(report, true); // Use dark theme by default
     let data_processing_time = data_processing_start.elapsed();
 
     let chart_render_start = Instant::now();
-    let chart_path = format!("{}/{}.html", output_directory, chart_type.name());
-    save_chart(&chart, chart_type.name(), output_directory, 1600, 1200)?;
-    let chart_render_time = chart_render_start.elapsed();
+    let file_name = chart_type.name();
+    save_chart(&chart, file_name, output_directory, 1600, 1200)?;
+
+    if should_open_in_browser {
+        let chart_path = format!("{}/{}.html", output_directory, file_name);
+        open_in_browser(&chart_path)?;
+    }
 
     let total_samples = chart_type.get_samples(report);
     let report_path = format!("{}/report.json", output_directory);
     let report_size = IggyByteSize::from(std::fs::metadata(&report_path)?.len());
 
+    let chart_render_time = chart_render_start.elapsed();
+
     info!(
-        "Generated {} plot at: {} ({} samples, report.json size: {}, data processing: {:.2?}, chart render: {:.2?})",
-        chart_type.name(),
-        chart_path,
+        "Generated {} plot at: {}/{}.html ({} samples, report.json size: {}, data processing: {:.2?}, chart render: {:.2?})",
+        file_name,
+        output_directory,
+        file_name,
         total_samples,
         report_size,
         data_processing_time,
diff --git a/bench/src/rate_limiter/mod.rs b/bench/src/rate_limiter/mod.rs
new file mode 100644
index 0000000..d76a77b
--- /dev/null
+++ b/bench/src/rate_limiter/mod.rs
@@ -0,0 +1,67 @@
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+use tokio::sync::Mutex;
+use tokio::time::sleep;
+
+/// Thread-safe rate limiter using linger-based algorithm
+pub struct RateLimiter {
+    bytes_per_second: u64,
+    last_operation: Arc<Mutex<Instant>>,
+}
+
+impl RateLimiter {
+    pub fn new(bytes_per_second: u64) -> Self {
+        Self {
+            bytes_per_second,
+            last_operation: Arc::new(Mutex::new(Instant::now())),
+        }
+    }
+
+    /// Wait for the required time based on the desired throughput rate
+    pub async fn wait_and_consume(&self, bytes: u64) {
+        let now = Instant::now();
+        let mut last_op = self.last_operation.lock().await;
+
+        // Calculate time per byte in seconds
+        let time_per_byte = 1.0 / self.bytes_per_second as f64;
+
+        // Calculate target duration for these bytes
+        let target_duration = Duration::from_secs_f64(bytes as f64 * time_per_byte);
+
+        // Calculate how long it's been since last operation
+        let elapsed = now.duration_since(*last_op);
+
+        // If we need to wait longer, sleep for the remaining time
+        if elapsed < target_duration {
+            let sleep_duration = target_duration - elapsed;
+            // Update last_op before sleeping to account for operation time
+            *last_op = now + sleep_duration;
+            drop(last_op); // Release the lock before sleeping
+            sleep(sleep_duration).await;
+        } else {
+            // If we don't need to wait, just update the timestamp
+            *last_op = now;
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    async fn test_rate_limiter() {
+        let limiter = RateLimiter::new(1000); // 1000 bytes per second
+        let start = Instant::now();
+
+        // Try to send 100 bytes 5 times
+        for _ in 0..5 {
+            limiter.wait_and_consume(100).await;
+        }
+
+        // Should take approximately 0.5 seconds (500ms) to send 500 bytes at 1000 bytes/sec
+        let elapsed = start.elapsed();
+        assert!(elapsed >= Duration::from_millis(450)); // Allow some wiggle room
+        assert!(elapsed <= Duration::from_millis(550));
+    }
+}
diff --git a/bench/src/runner.rs b/bench/src/runner.rs
index 72e364d..72557b0 100644
--- a/bench/src/runner.rs
+++ b/bench/src/runner.rs
@@ -28,6 +28,7 @@
 
     pub async fn run(&mut self) -> Result<(), IggyError> {
         let mut args = self.args.take().unwrap();
+        let should_open_charts = args.open_charts;
         self.test_server = start_server_if_needed(&mut args).await;
 
         let transport = args.transport();
@@ -81,11 +82,23 @@
             report.dump_to_json(&full_output_path);
 
             // Generate the plots
-            plot_chart(&report, &full_output_path, ChartType::Throughput).map_err(|e| {
+            plot_chart(
+                &report,
+                &full_output_path,
+                ChartType::Throughput,
+                should_open_charts,
+            )
+            .map_err(|e| {
                 error!("Failed to generate plots: {e}");
                 IggyError::CannotWriteToFile
             })?;
-            plot_chart(&report, &full_output_path, ChartType::Latency).map_err(|e| {
+            plot_chart(
+                &report,
+                &full_output_path,
+                ChartType::Latency,
+                should_open_charts,
+            )
+            .map_err(|e| {
                 error!("Failed to generate plots: {e}");
                 IggyError::CannotWriteToFile
             })?;