blob: aa04c3b16edc848e28806246aeb087aef8da5129 [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use clap::Parser;
use iggy::prelude::IggyDuration;
use iggy::prelude::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME};
use std::str::FromStr;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct Args {
#[arg(long, default_value = "10")]
pub message_batches_limit: u64,
#[arg(long, default_value = DEFAULT_ROOT_USERNAME)]
pub username: String,
#[arg(long, default_value = DEFAULT_ROOT_PASSWORD)]
pub password: String,
#[arg(long, default_value = "1ms")]
pub interval: String,
#[arg(long, default_value = "example-stream")]
pub stream_id: String,
#[arg(long, default_value = "example-topic")]
pub topic_id: String,
#[arg(long, default_value = "0")]
pub partition_id: u32,
#[arg(long, default_value = "1")]
pub partitions_count: u32,
#[arg(long, default_value = "1")]
pub compression_algorithm: u8,
#[arg(long, default_value = "1")]
pub consumer_kind: u8,
#[arg(long, default_value = "0")]
pub consumer_id: u32,
#[arg(long, default_value = "false")]
pub balanced_producer: bool,
#[arg(long, default_value = "1")]
pub messages_per_batch: u32,
#[arg(long, default_value = "0")]
pub offset: u64,
#[arg(long, default_value = "false")]
pub auto_commit: bool,
#[arg(long, default_value = "tcp")]
pub transport: String,
#[arg(long, default_value = "")]
pub encryption_key: String,
#[arg(long, default_value = "http://localhost:3000")]
pub http_api_url: String,
#[arg(long, default_value = "3")]
pub http_retries: u32,
#[arg(long, default_value = "true")]
pub tcp_reconnection_enabled: bool,
#[arg(long)]
pub tcp_reconnection_max_retries: Option<u32>,
#[arg(long, default_value = "1s")]
pub tcp_reconnection_interval: String,
#[arg(long, default_value = "5s")]
pub tcp_reconnection_reestablish_after: String,
#[arg(long, default_value = "5s")]
pub tcp_heartbeat_interval: String,
#[arg(long, default_value = "127.0.0.1:8090")]
pub tcp_server_address: String,
#[arg(long, default_value = "false")]
pub tcp_tls_enabled: bool,
#[arg(long, default_value = "")]
pub tcp_tls_domain: String,
#[arg(long, default_value = "")]
pub tcp_tls_ca_file: String,
#[arg(long, default_value = "false")]
pub tcp_nodelay: bool,
#[arg(long, default_value = "127.0.0.1:0")]
pub quic_client_address: String,
#[arg(long, default_value = "127.0.0.1:8080")]
pub quic_server_address: String,
#[arg(long, default_value = "localhost")]
pub quic_server_name: String,
#[arg(long, default_value = "true")]
pub quic_reconnection_enabled: bool,
#[arg(long)]
pub quic_reconnection_max_retries: Option<u32>,
#[arg(long, default_value = "1s")]
pub quic_reconnection_interval: String,
#[arg(long, default_value = "5s")]
pub quic_reconnection_reestablish_after: String,
#[arg(long, default_value = "10000")]
pub quic_max_concurrent_bidi_streams: u64,
#[arg(long, default_value = "100000")]
pub quic_datagram_send_buffer_size: u64,
#[arg(long, default_value = "1200")]
pub quic_initial_mtu: u16,
#[arg(long, default_value = "100000")]
pub quic_send_window: u64,
#[arg(long, default_value = "100000")]
pub quic_receive_window: u64,
#[arg(long, default_value = "1048576")]
pub quic_response_buffer_size: u64,
#[arg(long, default_value = "5000")]
pub quic_keep_alive_interval: u64,
#[arg(long, default_value = "10000")]
pub quic_max_idle_timeout: u64,
#[arg(long, default_value = "false")]
pub quic_validate_certificate: bool,
#[arg(long, default_value = "5s")]
pub quic_heartbeat_interval: String,
#[arg(long, default_value = "127.0.0.1:8092")]
pub websocket_server_address: String,
#[arg(long, default_value = "true")]
pub websocket_reconnection_enabled: bool,
#[arg(long)]
pub websocket_reconnection_max_retries: Option<u32>,
#[arg(long, default_value = "1s")]
pub websocket_reconnection_interval: String,
#[arg(long, default_value = "5s")]
pub websocket_reconnection_reestablish_after: String,
#[arg(long, default_value = "5s")]
pub websocket_heartbeat_interval: String,
#[arg(long, default_value = "false")]
pub websocket_tls_enabled: bool,
#[arg(long, default_value = "localhost")]
pub websocket_tls_domain: String,
#[arg(long, default_value = "false")]
pub websocket_tls_ca_file: Option<String>,
#[arg(long, default_value = "false")]
pub websocket_tls_validate_certificate: bool,
}
impl Args {
#[inline]
pub fn new(stream_id: String, topic_id: String) -> Self {
Self {
stream_id,
topic_id,
..Default::default()
}
}
}
impl Default for Args {
fn default() -> Self {
Self {
message_batches_limit: 10,
username: DEFAULT_ROOT_PASSWORD.to_string(),
password: DEFAULT_ROOT_USERNAME.to_string(),
interval: "1ms".to_string(),
stream_id: "example-stream".to_string(),
topic_id: "example-topic".to_string(),
partition_id: 1,
partitions_count: 1,
compression_algorithm: 1,
consumer_kind: 1,
consumer_id: 1,
balanced_producer: false,
messages_per_batch: 1,
offset: 0,
auto_commit: false,
transport: "tcp".to_string(),
encryption_key: String::new(),
http_api_url: "http://localhost:3000".to_string(),
http_retries: 3,
tcp_reconnection_enabled: true,
tcp_reconnection_max_retries: Some(3),
tcp_reconnection_interval: "1s".to_string(),
tcp_reconnection_reestablish_after: "5s".to_string(),
tcp_heartbeat_interval: "5s".to_string(),
tcp_server_address: "127.0.0.1:8090".to_string(),
tcp_tls_enabled: false,
tcp_tls_domain: "localhost".to_string(),
tcp_tls_ca_file: "".to_string(),
tcp_nodelay: true,
quic_client_address: "127.0.0.1:0".to_string(),
quic_server_address: "127.0.0.1:8080".to_string(),
quic_server_name: "localhost".to_string(),
quic_reconnection_enabled: true,
quic_reconnection_max_retries: None,
quic_reconnection_interval: "1s".to_string(),
quic_reconnection_reestablish_after: "5s".to_string(),
quic_max_concurrent_bidi_streams: 10000,
quic_datagram_send_buffer_size: 100000,
quic_initial_mtu: 1200,
quic_send_window: 100000,
quic_receive_window: 100000,
quic_response_buffer_size: 1048576,
quic_keep_alive_interval: 5000,
quic_max_idle_timeout: 10000,
quic_validate_certificate: false,
quic_heartbeat_interval: "5s".to_string(),
websocket_server_address: "127.0.0.1:8092".to_string(),
websocket_reconnection_enabled: true,
websocket_reconnection_max_retries: None,
websocket_reconnection_interval: "1s".to_string(),
websocket_reconnection_reestablish_after: "5s".to_string(),
websocket_heartbeat_interval: "5s".to_string(),
websocket_tls_enabled: false,
websocket_tls_domain: "localhost".to_string(),
websocket_tls_ca_file: None,
websocket_tls_validate_certificate: false,
}
}
}
impl Args {
/// Parse command line arguments and apply example-specific defaults if not overridden
pub fn parse_with_defaults(example_name: &str) -> Self {
let mut args = Self::parse();
// If stream_id was not explicitly provided (still has default value)
if args.stream_id == "example-stream" {
args.stream_id = Self::get_default_stream_id(example_name);
}
// If topic_id was not explicitly provided (still has default value)
if args.topic_id == "example-topic" {
args.topic_id = Self::get_default_topic_id(example_name);
}
args
}
/// Get default stream ID based on example name
fn get_default_stream_id(example_name: &str) -> String {
match example_name {
"basic-producer" | "basic-consumer" => "basic-stream",
"getting-started-producer" | "getting-started-consumer" => "getting-started",
"message-envelope-producer" | "message-envelope-consumer" => "envelope-stream",
"message-headers-producer" | "message-headers-consumer" => "headers-stream",
"multi-tenant-producer" | "multi-tenant-consumer" => "tenant-stream",
"new-sdk-producer" | "new-sdk-consumer" => "new-sdk-stream",
"sink-data-producer" => "sink-stream",
"stream-basic" => "stream-basic",
"stream-producer" | "stream-consumer" => "stream-example",
"stream-producer-config" | "stream-consumer-config" => "stream-config",
_ => "example-stream",
}
.to_string()
}
/// Get default topic ID based on example name
fn get_default_topic_id(example_name: &str) -> String {
match example_name {
"basic-producer" | "basic-consumer" => "basic-topic",
"getting-started-producer" | "getting-started-consumer" => "getting-started-topic",
"message-envelope-producer" | "message-envelope-consumer" => "orders",
"message-headers-producer" | "message-headers-consumer" => "orders",
"multi-tenant-producer" | "multi-tenant-consumer" => "events",
"new-sdk-producer" | "new-sdk-consumer" => "new-sdk-topic",
"sink-data-producer" => "users",
"stream-basic" => "stream-topic",
"stream-producer" | "stream-consumer" => "stream-topic",
"stream-producer-config" | "stream-consumer-config" => "config-topic",
_ => "example-topic",
}
.to_string()
}
pub fn to_sdk_args(&self) -> iggy::prelude::Args {
iggy::prelude::Args {
transport: self.transport.clone(),
encryption_key: self.encryption_key.clone(),
http_api_url: self.http_api_url.clone(),
http_retries: self.http_retries,
username: self.username.clone(),
password: self.password.clone(),
tcp_server_address: self.tcp_server_address.clone(),
tcp_reconnection_enabled: self.tcp_reconnection_enabled,
tcp_reconnection_max_retries: self.tcp_reconnection_max_retries,
tcp_reconnection_interval: self.tcp_reconnection_interval.clone(),
tcp_reconnection_reestablish_after: self.tcp_reconnection_reestablish_after.clone(),
tcp_heartbeat_interval: self.tcp_heartbeat_interval.clone(),
tcp_tls_enabled: self.tcp_tls_enabled,
tcp_tls_domain: self.tcp_tls_domain.clone(),
tcp_tls_ca_file: if self.tcp_tls_ca_file.is_empty() {
None
} else {
Some(self.tcp_tls_ca_file.clone())
},
tcp_nodelay: self.tcp_nodelay,
quic_client_address: self.quic_client_address.clone(),
quic_server_address: self.quic_server_address.clone(),
quic_server_name: self.quic_server_name.clone(),
quic_reconnection_enabled: self.quic_reconnection_enabled,
quic_reconnection_max_retries: self.quic_reconnection_max_retries,
quic_reconnection_reestablish_after: self.quic_reconnection_reestablish_after.clone(),
quic_reconnection_interval: self.quic_reconnection_interval.clone(),
quic_max_concurrent_bidi_streams: self.quic_max_concurrent_bidi_streams,
quic_datagram_send_buffer_size: self.quic_datagram_send_buffer_size,
quic_initial_mtu: self.quic_initial_mtu,
quic_send_window: self.quic_send_window,
quic_receive_window: self.quic_receive_window,
quic_response_buffer_size: self.quic_response_buffer_size,
quic_keep_alive_interval: self.quic_keep_alive_interval,
quic_max_idle_timeout: self.quic_max_idle_timeout,
quic_validate_certificate: self.quic_validate_certificate,
quic_heartbeat_interval: self.quic_heartbeat_interval.clone(),
websocket_server_address: self.websocket_server_address.clone(),
websocket_reconnection_enabled: self.websocket_reconnection_enabled,
websocket_reconnection_max_retries: self.websocket_reconnection_max_retries,
websocket_reconnection_interval: self.websocket_reconnection_interval.clone(),
websocket_reconnection_reestablish_after: self
.websocket_reconnection_reestablish_after
.clone(),
websocket_heartbeat_interval: self.websocket_heartbeat_interval.clone(),
websocket_tls_enabled: self.websocket_tls_enabled,
websocket_tls_domain: self.websocket_tls_domain.clone(),
websocket_tls_ca_file: self.websocket_tls_ca_file.clone(),
websocket_tls_validate_certificate: self.websocket_tls_validate_certificate,
}
}
pub fn get_interval(&self) -> Option<IggyDuration> {
match self.interval.to_lowercase().as_str() {
"" | "0" | "none" => None,
x => Some(IggyDuration::from_str(x).expect("Invalid interval format")),
}
}
}