// blob: 04bea44c5f7c2c491f4c91756d866451855bbd3a [file] [log] [blame]
use crate::cli::common::{
IggyCmdCommand, IggyCmdTest, IggyCmdTestCase, TestHelpCmd, TestStreamId, TestTopicId,
CLAP_INDENT, USAGE_PREFIX,
};
use assert_cmd::assert::Assert;
use async_trait::async_trait;
use iggy::client::Client;
use iggy::consumer::Consumer;
use iggy::messages::poll_messages::PollingStrategy;
use iggy::models::header::{HeaderKey, HeaderValue};
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::topic_size::MaxTopicSize;
use predicates::str::diff;
use serial_test::parallel;
use std::collections::HashMap;
use std::str::from_utf8;
use xxhash_rust::xxh32::xxh32;
/// How a test case asks the CLI to choose the partition that receives the messages.
#[derive(Debug)]
enum PartitionSelection {
/// No partitioning flag is passed to the CLI; the test then expects the
/// messages in partition 1.
Balanced,
/// Explicit partition ID, passed to the CLI as `-p <id>`.
Id(u32),
/// Messages key, passed to the CLI as `-m <key>`; the expected partition is
/// computed from the key's hash and the topic's partition count.
Key(String),
}
impl PartitionSelection {
fn to_args(&self) -> Vec<String> {
match self {
PartitionSelection::Balanced => vec![],
PartitionSelection::Id(id) => vec!["-p".into(), format!("{}", id)],
PartitionSelection::Key(key) => vec!["-m".into(), key.clone()],
}
}
}
/// How the messages are handed to the CLI under test.
#[derive(Debug)]
enum ProvideMessages {
/// Messages are appended to the command line as trailing arguments.
AsArgs,
/// Messages are returned from `provide_stdin_input` instead of being put on
/// the command line.
ViaStdin,
}
/// One `message send` test scenario: the server objects to create, the
/// messages to send, and how the CLI should be invoked.
struct TestMessageSendCmd {
// Numeric ID the stream is created with and later looked up by.
stream_id: u32,
// Name the stream is created with.
stream_name: String,
// Numeric ID the topic is created with and later looked up by.
topic_id: u32,
// Name the topic is created with.
topic_name: String,
// Partition count of the created topic; also the modulus used when
// predicting the partition for a messages key.
partitions_count: u32,
// Payloads to send; verification expects them back in the same order.
messages: Vec<String>,
// Whether messages go on the command line or through stdin.
message_input: ProvideMessages,
// Whether the CLI is given the stream's numeric ID or its name.
using_stream_id: TestStreamId,
// Whether the CLI is given the topic's numeric ID or its name.
using_topic_id: TestTopicId,
// Partition selection passed to the CLI.
partitioning: PartitionSelection,
// Optional headers passed via `--headers` and expected on every polled message.
header: Option<HashMap<HeaderKey, HeaderValue>>,
}
impl TestMessageSendCmd {
    /// Creates a test scenario from its parts; every argument is stored as-is
    /// in the field of the same name.
    // The scenario is a plain bundle of independent knobs, so a long
    // positional constructor is accepted here instead of a builder.
    #[allow(clippy::too_many_arguments)]
    fn new(
        stream_id: u32,
        stream_name: String,
        topic_id: u32,
        topic_name: String,
        partitions_count: u32,
        messages: Vec<String>,
        message_input: ProvideMessages,
        using_stream_id: TestStreamId,
        using_topic_id: TestTopicId,
        partitioning: PartitionSelection,
        header: Option<HashMap<HeaderKey, HeaderValue>>,
    ) -> Self {
        Self {
            stream_id,
            stream_name,
            topic_id,
            topic_name,
            partitions_count,
            messages,
            message_input,
            using_stream_id,
            using_topic_id,
            partitioning,
            header,
        }
    }

    /// Builds the arguments that follow `message send` on the command line.
    ///
    /// Order: partitioning flags, stream identifier, topic identifier, an
    /// optional `--headers key:kind:value,...` pair, and finally the messages
    /// themselves when they are provided as arguments.
    fn to_args(&self) -> Vec<String> {
        let mut args = self.partitioning.to_args();

        args.push(match self.using_stream_id {
            TestStreamId::Numeric => self.stream_id.to_string(),
            TestStreamId::Named => self.stream_name.clone(),
        });
        args.push(match self.using_topic_id {
            TestTopicId::Numeric => self.topic_id.to_string(),
            TestTopicId::Named => self.topic_name.clone(),
        });

        if let Some(headers) = &self.header {
            let encoded = headers
                .iter()
                .map(|(key, value)| {
                    format!("{key}:{}:{}", value.kind, value.value_only_to_string())
                })
                .collect::<Vec<_>>()
                .join(",");
            args.push("--headers".to_string());
            args.push(encoded);
        }

        match &self.message_input {
            ProvideMessages::AsArgs => args.extend(self.messages.iter().cloned()),
            // Messages are delivered through stdin, not the command line.
            ProvideMessages::ViaStdin => {}
        }

        args
    }

    /// Predicts the partition for a messages key: `xxh32(key, 0)` modulo the
    /// partition count, with a remainder of 0 mapped to the last partition so
    /// the result is always in `1..=partitions_count`.
    fn calculate_partition_id_from_messages_key(&self, messages_key: &[u8]) -> u32 {
        match xxh32(messages_key, 0) % self.partitions_count {
            0 => self.partitions_count,
            partition_id => partition_id,
        }
    }

    /// Returns the partition the sent messages are expected to be polled from.
    fn get_partition_id(&self) -> u32 {
        match &self.partitioning {
            PartitionSelection::Balanced => 1,
            PartitionSelection::Id(id) => *id,
            PartitionSelection::Key(key) => {
                self.calculate_partition_id_from_messages_key(key.as_bytes())
            }
        }
    }
}
#[async_trait]
impl IggyCmdTestCase for TestMessageSendCmd {
    /// Creates the stream and the topic that the command under test sends to.
    async fn prepare_server_state(&mut self, client: &dyn Client) {
        let created_stream = client
            .create_stream(&self.stream_name, self.stream_id.into())
            .await;
        assert!(created_stream.is_ok());

        let created_topic = client
            .create_topic(
                &self.stream_id.try_into().unwrap(),
                &self.topic_name,
                self.partitions_count,
                Default::default(),
                None,
                Some(self.topic_id),
                IggyExpiry::NeverExpire,
                MaxTopicSize::ServerDefault,
            )
            .await;
        assert!(created_topic.is_ok());
    }

    /// Assembles `message send <args>` with credentials taken from the environment.
    fn get_command(&self) -> IggyCmdCommand {
        let send_args = self.to_args();
        IggyCmdCommand::new()
            .arg("message")
            .arg("send")
            .args(send_args)
            .with_env_credentials()
    }

    /// Supplies the messages as stdin input only when the scenario asks for it.
    fn provide_stdin_input(&self) -> Option<Vec<String>> {
        match self.message_input {
            ProvideMessages::AsArgs => None,
            ProvideMessages::ViaStdin => Some(self.messages.clone()),
        }
    }

    /// Asserts that the command succeeded and printed the expected two-line report.
    fn verify_command(&self, command_state: Assert) {
        // The report echoes the identifiers in the form they were given to the CLI.
        let shown_topic = match self.using_topic_id {
            TestTopicId::Numeric => self.topic_id.to_string(),
            TestTopicId::Named => self.topic_name.clone(),
        };
        let shown_stream = match self.using_stream_id {
            TestStreamId::Numeric => self.stream_id.to_string(),
            TestStreamId::Named => self.stream_name.clone(),
        };

        let expected_stdout = format!(
            "Executing send messages to topic with ID: {} and stream with ID: {}\nSent messages to topic with ID: {} and stream with ID: {}\n",
            shown_topic, shown_stream, shown_topic, shown_stream
        );
        command_state.success().stdout(diff(expected_stdout));
    }

    /// Checks that the topic holds exactly the sent messages (payloads and,
    /// when configured, headers) in the expected partition, then deletes the
    /// topic and the stream.
    async fn verify_server_state(&self, client: &dyn Client) {
        let expected_count = self.messages.len();

        let fetched_topic = client
            .get_topic(
                &self.stream_id.try_into().unwrap(),
                &self.topic_id.try_into().unwrap(),
            )
            .await;
        assert!(fetched_topic.is_ok());
        let topic_details = fetched_topic.unwrap().expect("Failed to get topic");
        assert_eq!(topic_details.messages_count, expected_count as u64);

        let poll_result = client
            .poll_messages(
                &self.stream_id.try_into().unwrap(),
                &self.topic_id.try_into().unwrap(),
                Some(self.get_partition_id()),
                &Consumer::default(),
                &PollingStrategy::offset(0),
                expected_count as u32,
                false,
            )
            .await;
        assert!(poll_result.is_ok());
        let polled = poll_result.unwrap();
        assert_eq!(polled.messages.len(), expected_count);

        // Payloads must come back as the same UTF-8 strings, in the same order.
        let received_payloads: Vec<String> = polled
            .messages
            .iter()
            .map(|message| from_utf8(&message.payload).unwrap().to_owned())
            .collect();
        assert_eq!(received_payloads, self.messages);

        if let Some(expected_header) = &self.header {
            for message in polled.messages.iter() {
                let received_header = message.headers.as_ref();
                assert!(received_header.is_some());
                assert_eq!(expected_header, received_header.unwrap());
            }
        }

        // Clean up so the next scenario can recreate the same stream and topic.
        let deleted_topic = client
            .delete_topic(
                &self.stream_id.try_into().unwrap(),
                &self.topic_id.try_into().unwrap(),
            )
            .await;
        assert!(deleted_topic.is_ok());

        let deleted_stream = client
            .delete_stream(&self.stream_id.try_into().unwrap())
            .await;
        assert!(deleted_stream.is_ok());
    }
}
/// Runs `message send` for every combination of message input, stream/topic
/// identifier form and partition selection listed below, sending three
/// messages with two headers each time.
#[tokio::test]
#[parallel]
pub async fn should_be_successful() {
    let mut iggy_cmd_test = IggyCmdTest::default();

    // (message input, stream identifier form, topic identifier form, partitioning)
    let scenarios = [
        (
            ProvideMessages::AsArgs,
            TestStreamId::Numeric,
            TestTopicId::Numeric,
            PartitionSelection::Balanced,
        ),
        (
            ProvideMessages::ViaStdin,
            TestStreamId::Numeric,
            TestTopicId::Named,
            PartitionSelection::Balanced,
        ),
        (
            ProvideMessages::AsArgs,
            TestStreamId::Named,
            TestTopicId::Numeric,
            PartitionSelection::Balanced,
        ),
        (
            ProvideMessages::ViaStdin,
            TestStreamId::Named,
            TestTopicId::Named,
            PartitionSelection::Balanced,
        ),
        (
            ProvideMessages::ViaStdin,
            TestStreamId::Numeric,
            TestTopicId::Numeric,
            PartitionSelection::Id(1),
        ),
        (
            ProvideMessages::AsArgs,
            TestStreamId::Named,
            TestTopicId::Numeric,
            PartitionSelection::Id(2),
        ),
        (
            ProvideMessages::AsArgs,
            TestStreamId::Numeric,
            TestTopicId::Named,
            PartitionSelection::Id(3),
        ),
        (
            ProvideMessages::ViaStdin,
            TestStreamId::Named,
            TestTopicId::Named,
            PartitionSelection::Id(4),
        ),
        (
            ProvideMessages::AsArgs,
            TestStreamId::Numeric,
            TestTopicId::Numeric,
            PartitionSelection::Key("some-complex-key".to_owned()),
        ),
        (
            ProvideMessages::ViaStdin,
            TestStreamId::Named,
            TestTopicId::Numeric,
            PartitionSelection::Key("another random value".to_owned()),
        ),
        (
            ProvideMessages::ViaStdin,
            TestStreamId::Numeric,
            TestTopicId::Named,
            PartitionSelection::Key("some-random-key".to_owned()),
        ),
        (
            ProvideMessages::AsArgs,
            TestStreamId::Named,
            TestTopicId::Named,
            PartitionSelection::Key("just a key".to_owned()),
        ),
    ];

    iggy_cmd_test.setup().await;

    for (input_mode, stream_ref, topic_ref, partitioning) in scenarios {
        let payloads: Vec<String> = ["test message1", "test message2", "test message3"]
            .iter()
            .map(|text| text.to_string())
            .collect();
        let headers = HashMap::from([
            (
                HeaderKey::new("key1").unwrap(),
                HeaderValue::from_kind_str_and_value_str("string", "value1").unwrap(),
            ),
            (
                HeaderKey::new("key2").unwrap(),
                HeaderValue::from_kind_str_and_value_str("int32", "42").unwrap(),
            ),
        ]);

        let test_case = TestMessageSendCmd::new(
            1,
            "stream".to_owned(),
            2,
            "topic".to_owned(),
            4,
            payloads,
            input_mode,
            stream_ref,
            topic_ref,
            partitioning,
            Some(headers),
        );
        iggy_cmd_test.execute_test(test_case).await;
    }
}
/// Runs `message send --help` through `execute_test_for_help_command` with the
/// expected long-form help text.
///
/// NOTE(review): the expected text is presumably compared verbatim with the
/// CLI output, so its wording and spacing (including the `seperated` spelling)
/// must stay in sync with the command's help definition — confirm before editing.
#[tokio::test]
#[parallel]
pub async fn should_help_match() {
let mut iggy_cmd_test = IggyCmdTest::help_message();
iggy_cmd_test
.execute_test_for_help_command(TestHelpCmd::new(
vec!["message", "send", "--help"],
format!(
r#"Send messages to given topic ID and given stream ID
Stream ID can be specified as a stream name or ID
Topic ID can be specified as a topic name or ID
Examples
iggy message send 1 2 message
iggy message send stream 2 "long message"
iggy message send 1 topic message1 message2 message3
iggy message send stream topic "long message with spaces"
{USAGE_PREFIX} message send [OPTIONS] <STREAM_ID> <TOPIC_ID> [MESSAGES]...
Arguments:
<STREAM_ID>
ID of the stream to which the message will be sent
{CLAP_INDENT}
Stream ID can be specified as a stream name or ID
<TOPIC_ID>
ID of the topic to which the message will be sent
{CLAP_INDENT}
Topic ID can be specified as a topic name or ID
[MESSAGES]...
Messages to be sent
{CLAP_INDENT}
If no messages are provided, the command will read the messages from the
standard input and each line will be sent as a separate message.
If messages are provided, they will be sent as is. If message contains
spaces, it should be enclosed in quotes. Limit of the messages and size
of each message is defined by the used shell.
Options:
-p, --partition-id <PARTITION_ID>
ID of the partition to which the message will be sent
-m, --message-key <MESSAGE_KEY>
Messages key which will be used to partition the messages
{CLAP_INDENT}
Value of the key will be used by the server to calculate the partition ID
-H, --headers <HEADERS>
Comma separated list of key:kind:value, sent as header with the message
{CLAP_INDENT}
Headers are comma seperated key-value pairs that can be sent with the message.
Kind can be one of the following: raw, string, bool, int8, int16, int32, int64,
int128, uint8, uint16, uint32, uint64, uint128, float32, float64
-h, --help
Print help (see a summary with '-h')
"#,
),
))
.await;
}
/// Runs `message send -h` through `execute_test_for_help_command` with the
/// expected short-form help text.
#[tokio::test]
#[parallel]
pub async fn should_short_help_match() {
// NOTE(review): `should_help_match` builds its harness with
// `IggyCmdTest::help_message()`, while this test uses `IggyCmdTest::default()`
// — confirm the difference is intentional.
let mut iggy_cmd_test = IggyCmdTest::default();
iggy_cmd_test
.execute_test_for_help_command(TestHelpCmd::new(
vec!["message", "send", "-h"],
format!(
r#"Send messages to given topic ID and given stream ID
{USAGE_PREFIX} message send [OPTIONS] <STREAM_ID> <TOPIC_ID> [MESSAGES]...
Arguments:
<STREAM_ID> ID of the stream to which the message will be sent
<TOPIC_ID> ID of the topic to which the message will be sent
[MESSAGES]... Messages to be sent
Options:
-p, --partition-id <PARTITION_ID> ID of the partition to which the message will be sent
-m, --message-key <MESSAGE_KEY> Messages key which will be used to partition the messages
-H, --headers <HEADERS> Comma separated list of key:kind:value, sent as header with the message
-h, --help Print help (see more with '--help')
"#,
),
))
.await;
}