blob: d334de05d672d8e0fa9ff3347bd74be025ca41c5 [file] [log] [blame]
use clap::builder::NonEmptyStringValueParser;
use clap::{ArgGroup, Args, Subcommand};
use iggy::error::IggyError;
use iggy::error::IggyError::InvalidFormat;
use iggy::identifier::Identifier;
use iggy::models::header::{HeaderKey, HeaderValue};
use std::str::FromStr;
#[derive(Debug, Clone, Subcommand)]
pub(crate) enum MessageAction {
/// Send messages to given topic ID and given stream ID
///
/// Stream ID can be specified as a stream name or ID
/// Topic ID can be specified as a topic name or ID
///
/// Examples
/// iggy message send 1 2 message
/// iggy message send stream 2 "long message"
/// iggy message send 1 topic message1 message2 message3
/// iggy message send stream topic "long message with spaces"
#[clap(verbatim_doc_comment, visible_alias = "s")]
Send(SendMessagesArgs),
/// Poll messages from given topic ID and given stream ID
///
/// Stream ID can be specified as a stream name or ID
/// Topic ID can be specified as a topic name or ID
///
/// Examples:
/// iggy message poll --offset 0 1 2 1
/// iggy message poll --offset 0 stream 2 1
/// iggy message poll --offset 0 1 topic 1
/// iggy message poll --offset 0 stream topic 1
#[clap(verbatim_doc_comment, visible_alias = "p")]
Poll(PollMessagesArgs),
/// Flush messages from given topic ID and given stream ID
///
/// Command is used to force a flush of unsaved_buffer to disk
/// for specific stream, topic and partition. If fsync is enabled
/// then the data is flushed to disk and fsynced, otherwise the
/// data is only flushed to disk.
///
/// Stream ID can be specified as a stream name or ID
/// Topic ID can be specified as a topic name or ID
///
/// Examples:
/// iggy message flush 1 2 1
/// iggy message flush stream 2 1
/// iggy message flush 1 topic 1
/// iggy message flush stream topic 1
#[clap(verbatim_doc_comment, visible_alias = "f")]
Flush(FlushMessagesArgs),
}
#[derive(Debug, Clone, Args)]
pub(crate) struct SendMessagesArgs {
/// ID of the stream to which the message will be sent
///
/// Stream ID can be specified as a stream name or ID
#[arg(value_parser = clap::value_parser!(Identifier))]
pub(crate) stream_id: Identifier,
/// ID of the topic to which the message will be sent
///
/// Topic ID can be specified as a topic name or ID
#[arg(value_parser = clap::value_parser!(Identifier))]
pub(crate) topic_id: Identifier,
/// ID of the partition to which the message will be sent
#[clap(short, long, group = "partitioning")]
pub(crate) partition_id: Option<u32>,
/// Messages key which will be used to partition the messages
///
/// Value of the key will be used by the server to calculate the partition ID
#[clap(verbatim_doc_comment)]
#[clap(short, long, group = "partitioning")]
pub(crate) message_key: Option<String>,
/// Messages to be sent
///
/// If no messages are provided, the command will read the messages from the
/// standard input and each line will be sent as a separate message.
/// If messages are provided, they will be sent as is. If message contains
/// spaces, it should be enclosed in quotes. Limit of the messages and size
/// of each message is defined by the used shell.
#[clap(verbatim_doc_comment)]
#[clap(group = "input_messages")]
pub(crate) messages: Option<Vec<String>>,
/// Comma separated list of key:kind:value, sent as header with the message
///
/// Headers are comma seperated key-value pairs that can be sent with the message.
/// Kind can be one of the following: raw, string, bool, int8, int16, int32, int64,
/// int128, uint8, uint16, uint32, uint64, uint128, float32, float64
#[clap(verbatim_doc_comment)]
#[clap(short = 'H', long, value_parser = parse_key_val, value_delimiter = ',')]
pub(crate) headers: Vec<(HeaderKey, HeaderValue)>,
/// Input file with messages to be sent
///
/// File should contain messages stored in binary format. If the file does
/// not exist, the command will fail. If the file is not specified, the command
/// will read the messages from the standard input and each line will
/// be sent as a separate message. If the file is specified, the messages
/// will be read from the file and sent as is. Option cannot be used
/// with the messages option (messages given as command line arguments).
#[clap(verbatim_doc_comment)]
#[clap(long, value_parser = NonEmptyStringValueParser::new(), group = "input_messages")]
pub(crate) input_file: Option<String>,
}
/// Parse Header Key, Kind and Value from the string separated by a ':'
fn parse_key_val(s: &str) -> Result<(HeaderKey, HeaderValue), IggyError> {
let lower = s.to_lowercase();
let parts = lower.split(':').collect::<Vec<_>>();
if parts.len() != 3 {
Err(InvalidFormat)?;
}
let key = HeaderKey::from_str(parts[0])?;
let value = HeaderValue::from_kind_str_and_value_str(parts[1], parts[2])?;
Ok((key, value))
}
#[derive(Debug, Clone, Args)]
#[command(group = ArgGroup::new("polling_strategy").required(true))]
pub(crate) struct PollMessagesArgs {
/// ID of the stream from which message will be polled
///
/// Stream ID can be specified as a stream name or ID
#[arg(value_parser = clap::value_parser!(Identifier))]
pub(crate) stream_id: Identifier,
/// ID of the topic from which message will be polled
///
/// Topic ID can be specified as a topic name or ID
#[arg(value_parser = clap::value_parser!(Identifier))]
pub(crate) topic_id: Identifier,
/// Partition ID from which message will be polled
#[arg(value_parser = clap::value_parser!(u32).range(1..))]
pub(crate) partition_id: u32,
/// Number of messages to poll
#[clap(verbatim_doc_comment)]
#[clap(short, long, default_value_t = 1, value_parser = clap::value_parser!(u32).range(1..))]
pub(crate) message_count: u32,
/// Auto commit offset
///
/// Flag indicates whether to commit offset on the server automatically
/// after polling the messages.
#[clap(verbatim_doc_comment)]
#[clap(short, long, default_value_t = false)]
pub(crate) auto_commit: bool,
/// Polling strategy - offset to start polling messages from
///
/// Offset must be specified as a number
#[clap(verbatim_doc_comment)]
#[clap(short, long, group = "polling_strategy")]
pub(crate) offset: Option<u64>,
/// Polling strategy - start polling from the first message in the partition
#[clap(verbatim_doc_comment)]
#[clap(short, long, default_value_t = false, group = "polling_strategy")]
pub(crate) first: bool,
/// Polling strategy - start polling from the last message in the partition
#[clap(verbatim_doc_comment)]
#[clap(short, long, default_value_t = false, group = "polling_strategy")]
pub(crate) last: bool,
/// Polling strategy - start polling from the next message
///
/// Start polling after the last polled message based
/// on the stored consumer offset
#[clap(verbatim_doc_comment)]
#[clap(short, long, default_value_t = false, group = "polling_strategy")]
pub(crate) next: bool,
/// Regular consumer which will poll messages
///
/// Consumer ID can be specified as a consumer name or ID
#[clap(verbatim_doc_comment)]
#[clap(short, long, default_value_t = Identifier::default(), value_parser = clap::value_parser!(Identifier))]
pub(crate) consumer: Identifier,
/// Include the message headers in the output
///
/// Flag indicates whether to include headers in the output
/// after polling the messages.
#[clap(verbatim_doc_comment)]
#[clap(short, long, default_value_t = false)]
pub(crate) show_headers: bool,
/// Store polled message into file in binary format
///
/// Polled messages will be stored in the file in binary format.
/// File can be used to replay the messages later. If the file
/// already exists, the messages will be appended to the file.
/// If the file does not exist, it will be created.
/// If the file is not specified, the messages will be printed
/// to the standard output.
#[clap(verbatim_doc_comment)]
#[clap(long, value_parser = NonEmptyStringValueParser::new())]
pub(crate) output_file: Option<String>,
}
#[derive(Debug, Clone, Args)]
pub(crate) struct FlushMessagesArgs {
/// ID of the stream for which messages will be flushed
///
/// Stream ID can be specified as a stream name or ID
#[arg(value_parser = clap::value_parser!(Identifier))]
pub(crate) stream_id: Identifier,
/// ID of the topic for which messages will be flushed
///
/// Topic ID can be specified as a topic name or ID
#[arg(value_parser = clap::value_parser!(Identifier))]
pub(crate) topic_id: Identifier,
/// Partition ID for which messages will be flushed
#[arg(value_parser = clap::value_parser!(u32).range(1..))]
pub(crate) partition_id: u32,
/// fsync flushed data to disk
///
/// If option is enabled then the data is flushed to disk and fsynced,
/// otherwise the data is only flushed to disk. Default is false.
#[clap(verbatim_doc_comment)]
#[clap(short, long, default_value_t = false)]
pub(crate) fsync: bool,
}
#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;
#[test]
fn parse_key_val_should_parse_string() {
let expected_value: &str = "value";
let result = parse_key_val(&format!("key:String:{}", expected_value));
assert!(result.is_ok());
let (key, value) = result.unwrap();
assert_eq!(key, HeaderKey::from_str("key").unwrap());
assert_eq!(value.as_str().unwrap(), expected_value);
}
#[test]
fn parse_key_val_should_parse_uint8() {
let expected_value: u8 = 4;
let result = parse_key_val(&format!("key:uint8:{}", expected_value));
assert!(result.is_ok());
let (key, value) = result.unwrap();
assert_eq!(key, HeaderKey::from_str("key").unwrap());
assert_eq!(value.as_uint8().unwrap(), expected_value);
}
#[test]
fn parse_key_val_should_parse_float64() {
let expected_value: f64 = 42.0;
let result = parse_key_val(&format!("key:float64:{}", expected_value));
assert!(result.is_ok());
let (key, value) = result.unwrap();
assert_eq!(key, HeaderKey::from_str("key").unwrap());
assert_eq!(value.as_float64().unwrap(), expected_value);
}
#[test]
fn parse_key_val_should_parse_bool() {
let expected_value = true;
let result = parse_key_val(&format!("key:bool:{}", expected_value));
assert!(result.is_ok());
let (key, value) = result.unwrap();
assert_eq!(key, HeaderKey::from_str("key").unwrap());
assert_eq!(value.as_bool().unwrap(), expected_value);
}
#[test]
fn parse_key_val_to_less_params_should_return_err() {
let result = parse_key_val("key:string");
assert!(result.is_err());
}
#[test]
fn parse_key_val_wrong_kind_should_return_err() {
let result = parse_key_val("key:strin:value");
assert!(result.is_err());
}
#[test]
fn parse_key_val_no_matching_value_should_return_err() {
let result = parse_key_val("key:uint8:69.42");
assert!(result.is_err());
}
}