blob: 2b85755f2d578c01317fef22ae1729ac10c924e5 [file] [log] [blame]
use crate::cli::common::{
IggyCmdCommand, IggyCmdTest, IggyCmdTestCase, TestHelpCmd, TestStreamId, CLAP_INDENT,
USAGE_PREFIX,
};
use assert_cmd::assert::Assert;
use async_trait::async_trait;
use humantime::Duration as HumanDuration;
use iggy::client::Client;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::topic_size::MaxTopicSize;
use predicates::str::diff;
use serial_test::parallel;
use std::time::Duration;
struct TestTopicCreateCmd {
stream_id: u32,
stream_name: String,
topic_id: Option<u32>,
topic_name: String,
partitions_count: u32,
compression_algorithm: CompressionAlgorithm,
message_expiry: Option<Vec<String>>,
max_topic_size: MaxTopicSize,
replication_factor: u8,
using_identifier: TestStreamId,
}
impl TestTopicCreateCmd {
#[allow(clippy::too_many_arguments)]
fn new(
stream_id: u32,
stream_name: String,
topic_id: Option<u32>,
topic_name: String,
partitions_count: u32,
compression_algorithm: CompressionAlgorithm,
message_expiry: Option<Vec<String>>,
max_topic_size: MaxTopicSize,
replication_factor: u8,
using_identifier: TestStreamId,
) -> Self {
Self {
stream_id,
stream_name,
topic_id,
topic_name,
partitions_count,
compression_algorithm,
message_expiry,
max_topic_size,
replication_factor,
using_identifier,
}
}
fn to_args(&self) -> Vec<String> {
let mut args = Vec::new();
if let Some(topic_id) = self.topic_id {
args.push("-t".to_string());
args.push(format!("{}", topic_id));
};
match self.using_identifier {
TestStreamId::Numeric => args.extend(vec![format!("{}", self.stream_id)]),
TestStreamId::Named => args.extend(vec![self.stream_name.clone()]),
};
args.push(self.topic_name.clone());
args.push(format!("{}", self.partitions_count));
args.push(format!("{}", self.compression_algorithm));
args.extend(self.message_expiry.clone().unwrap_or_default());
args
}
}
#[async_trait]
impl IggyCmdTestCase for TestTopicCreateCmd {
async fn prepare_server_state(&mut self, client: &dyn Client) {
let stream = client
.create_stream(&self.stream_name, Some(self.stream_id))
.await;
assert!(stream.is_ok());
}
fn get_command(&self) -> IggyCmdCommand {
IggyCmdCommand::new()
.arg("topic")
.arg("create")
.args(self.to_args())
.with_env_credentials()
}
fn verify_command(&self, command_state: Assert) {
let stream_id = match self.using_identifier {
TestStreamId::Numeric => format!("{}", self.stream_id),
TestStreamId::Named => self.stream_name.clone(),
};
let partitions_count = self.partitions_count;
let topic_id = match self.topic_id {
Some(topic_id) => format!("ID: {}", topic_id),
None => "ID auto incremented".to_string(),
};
let topic_name = &self.topic_name;
let compression_algorithm = &self.compression_algorithm;
let message_expiry = (match &self.message_expiry {
Some(value) => value.join(" "),
None => IggyExpiry::NeverExpire.to_string(),
})
.to_string();
let max_topic_size = self.max_topic_size.to_string();
let replication_factor = self.replication_factor;
let message = format!(
"Executing create topic with name: {topic_name}, {topic_id}, message expiry: {message_expiry}, compression algorithm: {compression_algorithm}, \
max topic size: {max_topic_size}, replication factor: {replication_factor} in stream with ID: {stream_id}\n\
Topic with name: {topic_name}, {topic_id}, partitions count: {partitions_count}, compression algorithm: {compression_algorithm}, message expiry: {message_expiry}, \
max topic size: {max_topic_size}, replication factor: {replication_factor} created in stream with ID: {stream_id}\n",
);
command_state.success().stdout(diff(message));
}
async fn verify_server_state(&self, client: &dyn Client) {
let topic = client
.get_topic(
&self.stream_id.try_into().unwrap(),
&self.topic_name.clone().try_into().unwrap(),
)
.await;
assert!(topic.is_ok());
let topic_details = topic.unwrap().expect("Topic not found");
assert_eq!(topic_details.name, self.topic_name);
assert_eq!(topic_details.partitions_count, self.partitions_count);
assert_eq!(topic_details.messages_count, 0);
if let Some(topic_id) = self.topic_id {
assert_eq!(topic_details.id, topic_id);
}
if self.message_expiry.is_some() {
let duration: Duration = *self
.message_expiry
.clone()
.unwrap()
.join(" ")
.parse::<HumanDuration>()
.unwrap();
assert_eq!(
topic_details.message_expiry,
IggyExpiry::ExpireDuration(duration.into())
);
}
let delete_topic = client
.delete_topic(
&self.stream_id.try_into().unwrap(),
&self.topic_name.clone().try_into().unwrap(),
)
.await;
assert!(delete_topic.is_ok());
let delete_stream = client
.delete_stream(&self.stream_id.try_into().unwrap())
.await;
assert!(delete_stream.is_ok());
}
}
#[tokio::test]
#[parallel]
pub async fn should_be_successful() {
let mut iggy_cmd_test = IggyCmdTest::default();
iggy_cmd_test.setup().await;
iggy_cmd_test
.execute_test(TestTopicCreateCmd::new(
1,
String::from("main"),
None,
String::from("sync"),
1,
Default::default(),
None,
MaxTopicSize::ServerDefault,
1,
TestStreamId::Numeric,
))
.await;
iggy_cmd_test
.execute_test(TestTopicCreateCmd::new(
2,
String::from("testing"),
Some(2),
String::from("topic"),
5,
Default::default(),
None,
MaxTopicSize::ServerDefault,
1,
TestStreamId::Named,
))
.await;
iggy_cmd_test
.execute_test(TestTopicCreateCmd::new(
3,
String::from("prod"),
None,
String::from("named"),
1,
Default::default(),
Some(vec![String::from("3days"), String::from("5s")]),
MaxTopicSize::ServerDefault,
1,
TestStreamId::Named,
))
.await;
iggy_cmd_test
.execute_test(TestTopicCreateCmd::new(
4,
String::from("big"),
Some(1),
String::from("probe"),
2,
Default::default(),
Some(vec![
String::from("1day"),
String::from("1h"),
String::from("1m"),
String::from("1s"),
]),
MaxTopicSize::ServerDefault,
1,
TestStreamId::Numeric,
))
.await;
}
#[tokio::test]
#[parallel]
pub async fn should_help_match() {
let mut iggy_cmd_test = IggyCmdTest::help_message();
iggy_cmd_test
.execute_test_for_help_command(TestHelpCmd::new(
vec!["topic", "create", "--help"],
format!(
r#"Create topic with given name, number of partitions, compression algorithm and expiry time for given stream ID
Stream ID can be specified as a stream name or ID
If topic ID is not provided then the server will automatically assign it
Examples
iggy topic create 1 sensor1 2 gzip 15days
iggy topic create prod sensor2 2 none
iggy topic create test debugs 2 gzip 1day 1hour 1min 1sec
iggy topic create -t 3 1 sensor3 2 none unlimited
{USAGE_PREFIX} topic create [OPTIONS] <STREAM_ID> <NAME> <PARTITIONS_COUNT> <COMPRESSION_ALGORITHM> [MESSAGE_EXPIRY]...
Arguments:
<STREAM_ID>
Stream ID to create topic
{CLAP_INDENT}
Stream ID can be specified as a stream name or ID
<NAME>
Name of the topic
<PARTITIONS_COUNT>
Number of partitions inside the topic
<COMPRESSION_ALGORITHM>
Compression algorithm for the topic, set to "none" for no compression
[MESSAGE_EXPIRY]...
Message expiry time in human-readable format like 15days 2min 2s
{CLAP_INDENT}
("unlimited" or skipping parameter disables message expiry functionality in topic)
Options:
-t, --topic-id <TOPIC_ID>
Topic ID to create
-m, --max-topic-size <MAX_TOPIC_SIZE>
Max topic size
{CLAP_INDENT}
("unlimited" or skipping parameter disables max topic size functionality in topic)
Can't be lower than segment size in the config.
{CLAP_INDENT}
[default: unlimited]
-r, --replication-factor <REPLICATION_FACTOR>
Replication factor for the topic
{CLAP_INDENT}
[default: 1]
-h, --help
Print help (see a summary with '-h')
"#,
)))
.await;
}
#[tokio::test]
#[parallel]
pub async fn should_short_help_match() {
let mut iggy_cmd_test = IggyCmdTest::default();
iggy_cmd_test
.execute_test_for_help_command(TestHelpCmd::new(
vec!["topic", "create", "-h"],
format!(
r#"Create topic with given name, number of partitions, compression algorithm and expiry time for given stream ID
{USAGE_PREFIX} topic create [OPTIONS] <STREAM_ID> <NAME> <PARTITIONS_COUNT> <COMPRESSION_ALGORITHM> [MESSAGE_EXPIRY]...
Arguments:
<STREAM_ID> Stream ID to create topic
<NAME> Name of the topic
<PARTITIONS_COUNT> Number of partitions inside the topic
<COMPRESSION_ALGORITHM> Compression algorithm for the topic, set to "none" for no compression
[MESSAGE_EXPIRY]... Message expiry time in human-readable format like 15days 2min 2s
Options:
-t, --topic-id <TOPIC_ID> Topic ID to create
-m, --max-topic-size <MAX_TOPIC_SIZE> Max topic size [default: unlimited]
-r, --replication-factor <REPLICATION_FACTOR> Replication factor for the topic [default: 1]
-h, --help Print help (see more with '--help')
"#,
),
))
.await;
}