blob: 48e29b947a94e09cb647560fc2ef069d41acdad4 [file] [log] [blame]
use super::defaults::*;
use super::examples::print_examples;
use super::props::BenchmarkKindProps;
use super::transport::BenchmarkTransportCommand;
use super::{common::IggyBenchArgs, simple::BenchmarkKind};
use clap::{error::ErrorKind, CommandFactory, Parser, Subcommand};
use core::panic;
use std::num::NonZeroU32;
#[derive(Subcommand, Debug)]
pub enum BenchmarkKindCommand {
Send(SendArgs),
Poll(PollArgs),
SendAndPoll(SendAndPollArgs),
ConsumerGroupPoll(ConsumerGroupArgs),
/// Prints examples
Examples,
}
impl BenchmarkKindCommand {
pub fn as_simple_kind(&self) -> BenchmarkKind {
match self {
BenchmarkKindCommand::Send(_) => BenchmarkKind::Send,
BenchmarkKindCommand::Poll(_) => BenchmarkKind::Poll,
BenchmarkKindCommand::SendAndPoll(_) => BenchmarkKind::SendAndPoll,
BenchmarkKindCommand::ConsumerGroupPoll(_) => BenchmarkKind::ConsumerGroupPoll,
BenchmarkKindCommand::Examples => {
print_examples();
std::process::exit(0);
}
}
}
}
impl BenchmarkKindProps for BenchmarkKindCommand {
fn message_size(&self) -> u32 {
self.inner().message_size()
}
fn messages_per_batch(&self) -> u32 {
self.inner().messages_per_batch()
}
fn message_batches(&self) -> u32 {
self.inner().message_batches()
}
fn number_of_streams(&self) -> u32 {
self.inner().number_of_streams()
}
fn number_of_partitions(&self) -> u32 {
self.inner().number_of_partitions()
}
fn consumers(&self) -> u32 {
self.inner().consumers()
}
fn producers(&self) -> u32 {
self.inner().producers()
}
fn disable_parallel_producer_streams(&self) -> bool {
self.inner().disable_parallel_producer_streams()
}
fn disable_parallel_consumer_streams(&self) -> bool {
self.inner().disable_parallel_consumer_streams()
}
fn transport_command(&self) -> &BenchmarkTransportCommand {
self.inner().transport_command()
}
fn number_of_consumer_groups(&self) -> u32 {
self.inner().number_of_consumer_groups()
}
fn inner(&self) -> &dyn BenchmarkKindProps {
match self {
BenchmarkKindCommand::Send(args) => args,
BenchmarkKindCommand::Poll(args) => args,
BenchmarkKindCommand::SendAndPoll(args) => args,
BenchmarkKindCommand::ConsumerGroupPoll(args) => args,
BenchmarkKindCommand::Examples => {
print_examples();
std::process::exit(0);
}
}
}
fn validate(&self) {
self.inner().validate()
}
}
/// Sending (writing) benchmark
#[derive(Parser, Debug)]
pub struct SendArgs {
#[command(subcommand)]
pub transport: BenchmarkTransportCommand,
/// Number of messages per batch
#[arg(long, default_value_t = DEFAULT_MESSAGES_PER_BATCH)]
pub messages_per_batch: NonZeroU32,
/// Number of message batches
#[arg(long, default_value_t = DEFAULT_MESSAGE_BATCHES)]
pub message_batches: NonZeroU32,
/// Message size in bytes
#[arg(long, default_value_t = DEFAULT_MESSAGE_SIZE)]
pub message_size: NonZeroU32,
/// Number of producers
#[arg(long, default_value_t = DEFAULT_NUMBER_OF_PRODUCERS)]
pub producers: NonZeroU32,
/// Number of streams
#[arg(long, default_value_t = DEFAULT_NUMBER_OF_STREAMS)]
pub streams: NonZeroU32,
/// Number of partitions
#[arg(long, default_value_t = DEFAULT_NUMBER_OF_PARTITIONS)]
pub partitions: NonZeroU32,
/// Flag, disables parallel producers
#[arg(long, default_value_t = DEFAULT_DISABLE_PARALLEL_PRODUCER_STREAMS)]
pub disable_parallel_producers: bool,
}
impl BenchmarkKindProps for SendArgs {
fn message_size(&self) -> u32 {
self.message_size.get()
}
fn message_batches(&self) -> u32 {
self.message_batches.get()
}
fn messages_per_batch(&self) -> u32 {
self.messages_per_batch.get()
}
fn number_of_streams(&self) -> u32 {
self.streams.get()
}
fn number_of_partitions(&self) -> u32 {
self.partitions.get()
}
fn consumers(&self) -> u32 {
panic!("")
}
fn producers(&self) -> u32 {
self.producers.get()
}
fn disable_parallel_producer_streams(&self) -> bool {
self.disable_parallel_producers
}
fn disable_parallel_consumer_streams(&self) -> bool {
false
}
fn transport_command(&self) -> &BenchmarkTransportCommand {
&self.transport
}
fn number_of_consumer_groups(&self) -> u32 {
panic!("No consumer groups in send benchmark");
}
fn validate(&self) {
let streams = self.streams.get();
let producers = self.producers.get();
let mut cmd = IggyBenchArgs::command();
if !self.disable_parallel_producers && streams < producers {
cmd.error(
ErrorKind::ArgumentConflict,
format!("Without parallel producers flag, the number of streams ({streams}) must be greater than or equal to the number of producers ({producers}).",
))
.exit();
}
}
}
/// Sending and Polling benchmark with consumer group
#[derive(Parser, Debug)]
pub struct ConsumerGroupArgs {
#[command(subcommand)]
pub transport: BenchmarkTransportCommand,
/// Number of messages per batch
#[arg(long, default_value_t = DEFAULT_MESSAGES_PER_BATCH)]
pub messages_per_batch: NonZeroU32,
/// Number of message batches
#[arg(long, default_value_t = DEFAULT_MESSAGE_BATCHES)]
pub message_batches: NonZeroU32,
/// Message size in bytes
#[arg(long, default_value_t = DEFAULT_MESSAGE_SIZE)]
pub message_size: NonZeroU32,
/// Number of streams
#[arg(long, default_value_t = DEFAULT_NUMBER_OF_STREAMS_CONSUMER_GROUP)]
pub streams: NonZeroU32,
/// Number of consumers
#[arg(long, default_value_t = DEFAULT_NUMBER_OF_CONSUMERS)]
pub consumers: NonZeroU32,
/// Number of consumers
#[arg(long, default_value_t = DEFAULT_NUMBER_OF_CONSUMER_GROUPS)]
pub consumer_groups: NonZeroU32,
}
impl BenchmarkKindProps for ConsumerGroupArgs {
fn message_size(&self) -> u32 {
self.message_size.get()
}
fn messages_per_batch(&self) -> u32 {
self.messages_per_batch.get()
}
fn message_batches(&self) -> u32 {
self.message_batches.get()
}
fn number_of_streams(&self) -> u32 {
self.streams.get()
}
fn number_of_partitions(&self) -> u32 {
panic!("No partitions in consumer group benchmark");
}
fn consumers(&self) -> u32 {
self.consumers.get()
}
fn producers(&self) -> u32 {
panic!("No producers in consumer group");
}
fn disable_parallel_producer_streams(&self) -> bool {
panic!("No parallel producer for consumer group");
}
fn disable_parallel_consumer_streams(&self) -> bool {
panic!("No parallel consumer for consumer group");
}
fn transport_command(&self) -> &BenchmarkTransportCommand {
&self.transport
}
fn number_of_consumer_groups(&self) -> u32 {
self.consumer_groups.get()
}
fn validate(&self) {
let cg_number = self.consumer_groups.get();
let consumers_number = self.consumers.get();
let mut cmd = IggyBenchArgs::command();
if cg_number < 1 {
cmd.error(
ErrorKind::ArgumentConflict,
"Consumer groups number must be greater than 0 for a consumer groups benchmark.",
)
.exit();
}
if consumers_number < 1 {
cmd.error(
ErrorKind::ArgumentConflict,
"Consumers number must be greater than 0 for a consumer groups benchmark.",
)
.exit();
}
}
}
/// Polling (reading) benchmark
#[derive(Parser, Debug)]
pub struct PollArgs {
#[command(subcommand)]
pub transport: BenchmarkTransportCommand,
/// Number of messages per batch
#[arg(long, default_value_t = DEFAULT_MESSAGES_PER_BATCH)]
pub messages_per_batch: NonZeroU32,
/// Number of message batches
#[arg(long, default_value_t = DEFAULT_MESSAGE_BATCHES)]
pub message_batches: NonZeroU32,
/// Message size in bytes
#[arg(long, default_value_t = DEFAULT_MESSAGE_SIZE)]
pub message_size: NonZeroU32,
/// Number of consumers
#[arg(long, default_value_t = DEFAULT_NUMBER_OF_CONSUMERS)]
pub consumers: NonZeroU32,
/// Number of streams
#[arg(long, default_value_t = DEFAULT_NUMBER_OF_STREAMS)]
pub streams: NonZeroU32,
/// Number of streams
#[arg(long, default_value_t = DEFAULT_NUMBER_OF_PARTITIONS)]
pub partitions: NonZeroU32,
/// Flag, disables parallel consumers
#[arg(long, default_value_t = DEFAULT_DISABLE_PARALLEL_CONSUMER_STREAMS)]
pub disable_parallel_consumers: bool,
}
impl BenchmarkKindProps for PollArgs {
fn message_size(&self) -> u32 {
self.message_size.get()
}
fn message_batches(&self) -> u32 {
self.message_batches.get()
}
fn messages_per_batch(&self) -> u32 {
self.messages_per_batch.get()
}
fn number_of_streams(&self) -> u32 {
self.streams.get()
}
fn number_of_partitions(&self) -> u32 {
self.partitions.get()
}
fn consumers(&self) -> u32 {
self.consumers.get()
}
fn producers(&self) -> u32 {
panic!("")
}
fn disable_parallel_producer_streams(&self) -> bool {
false
}
fn disable_parallel_consumer_streams(&self) -> bool {
self.disable_parallel_consumers
}
fn transport_command(&self) -> &BenchmarkTransportCommand {
&self.transport
}
fn number_of_consumer_groups(&self) -> u32 {
panic!("No consumer groups in poll benchmark");
}
fn validate(&self) {
let streams = self.streams.get();
let consumers = self.consumers.get();
let mut cmd = IggyBenchArgs::command();
if !self.disable_parallel_consumers && streams < consumers {
cmd.error(
ErrorKind::ArgumentConflict,
format!("Without parallel consumers flag, the number of streams ({streams}) must be greater than or equal to the number of consumers ({consumers})."),
)
.exit();
}
}
}
/// Parallel sending and polling benchmark
#[derive(Parser, Debug)]
pub struct SendAndPollArgs {
#[command(subcommand)]
pub transport: BenchmarkTransportCommand,
/// Number of messages per batch
#[arg(long, default_value_t = DEFAULT_MESSAGES_PER_BATCH)]
pub messages_per_batch: NonZeroU32,
/// Number of message batches
#[arg(long, default_value_t = DEFAULT_MESSAGE_BATCHES)]
pub message_batches: NonZeroU32,
/// Message size in bytes
#[arg(long, default_value_t = DEFAULT_MESSAGE_SIZE)]
pub message_size: NonZeroU32,
/// Number of producers
#[arg(long, default_value_t = DEFAULT_NUMBER_OF_PRODUCERS)]
pub producers: NonZeroU32,
/// Number of consumers
#[arg(long, default_value_t = DEFAULT_NUMBER_OF_CONSUMERS)]
pub consumers: NonZeroU32,
/// Number of streams
#[arg(long, default_value_t = DEFAULT_NUMBER_OF_STREAMS)]
pub streams: NonZeroU32,
/// Number of partitions
#[arg(long, default_value_t = DEFAULT_NUMBER_OF_PARTITIONS)]
pub partitions: NonZeroU32,
/// Flag, disables parallel producers
#[arg(long, default_value_t = DEFAULT_DISABLE_PARALLEL_PRODUCER_STREAMS)]
pub disable_parallel_producers: bool,
/// Flag, disables parallel consumers
#[arg(long, default_value_t = DEFAULT_DISABLE_PARALLEL_CONSUMER_STREAMS)]
pub disable_parallel_consumers: bool,
}
impl BenchmarkKindProps for SendAndPollArgs {
fn message_size(&self) -> u32 {
self.message_size.get()
}
fn number_of_streams(&self) -> u32 {
self.streams.get()
}
fn number_of_partitions(&self) -> u32 {
self.partitions.get()
}
fn message_batches(&self) -> u32 {
self.message_batches.get()
}
fn messages_per_batch(&self) -> u32 {
self.messages_per_batch.get()
}
fn consumers(&self) -> u32 {
self.consumers.get()
}
fn producers(&self) -> u32 {
self.producers.get()
}
fn disable_parallel_producer_streams(&self) -> bool {
self.disable_parallel_producers
}
fn disable_parallel_consumer_streams(&self) -> bool {
self.disable_parallel_consumers
}
fn transport_command(&self) -> &BenchmarkTransportCommand {
&self.transport
}
fn number_of_consumer_groups(&self) -> u32 {
panic!("No consumer groups in send and poll benchmark");
}
fn validate(&self) {
let streams = self.streams.get();
let consumers = self.consumers.get();
let producers = self.producers.get();
let mut cmd = IggyBenchArgs::command();
if !self.disable_parallel_consumers && streams < consumers {
cmd.error(
ErrorKind::ArgumentConflict,
format!("Without parallel consumers flag, the number of streams ({streams}) must be greater than or equal to the number of consumers ({consumers})."),
)
.exit();
}
if !self.disable_parallel_producers && streams < producers {
cmd.error(
ErrorKind::ArgumentConflict,
format!("Without parallel producers flag, the number of streams ({streams}) must be greater than or equal to the number of producers ({producers}).",
))
.exit();
}
}
}