blob: f32a67239a94dd75994e148a7db39e0fa4052a8b [file] [log] [blame]
use crate::cli::common::{
IggyCmdCommand, IggyCmdTest, IggyCmdTestCase, TestHelpCmd, TestStreamId, TestTopicId,
CLAP_INDENT, USAGE_PREFIX,
};
use assert_cmd::assert::Assert;
use async_trait::async_trait;
use humantime::Duration as HumanDuration;
use iggy::client::Client;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::topic_size::MaxTopicSize;
use predicates::str::diff;
use serial_test::parallel;
use std::time::Duration;
struct TestTopicUpdateCmd {
stream_id: u32,
stream_name: String,
topic_id: u32,
topic_name: String,
compression_algorithm: CompressionAlgorithm,
message_expiry: Option<Vec<String>>,
max_topic_size: MaxTopicSize,
replication_factor: u8,
topic_new_name: String,
topic_new_compression_algorithm: CompressionAlgorithm,
topic_new_message_expiry: Option<Vec<String>>,
topic_new_max_size: MaxTopicSize,
topic_new_replication_factor: u8,
using_stream_id: TestStreamId,
using_topic_id: TestTopicId,
}
impl TestTopicUpdateCmd {
#[allow(clippy::too_many_arguments)]
fn new(
stream_id: u32,
stream_name: String,
topic_id: u32,
topic_name: String,
compression_algorithm: CompressionAlgorithm,
message_expiry: Option<Vec<String>>,
max_topic_size: MaxTopicSize,
replication_factor: u8,
topic_new_name: String,
topic_new_compression_algorithm: CompressionAlgorithm,
topic_new_message_expiry: Option<Vec<String>>,
topic_new_max_size: MaxTopicSize,
topic_new_replication_factor: u8,
using_stream_id: TestStreamId,
using_topic_id: TestTopicId,
) -> Self {
Self {
stream_id,
stream_name,
topic_id,
topic_name,
compression_algorithm,
message_expiry,
max_topic_size,
replication_factor,
topic_new_name,
topic_new_compression_algorithm,
topic_new_message_expiry,
topic_new_max_size,
topic_new_replication_factor,
using_stream_id,
using_topic_id,
}
}
fn to_args(&self) -> Vec<String> {
let mut command = match self.using_stream_id {
TestStreamId::Numeric => vec![format!("{}", self.stream_id)],
TestStreamId::Named => vec![self.stream_name.clone()],
};
command.push(match self.using_topic_id {
TestTopicId::Numeric => format!("{}", self.topic_id),
TestTopicId::Named => self.topic_name.clone(),
});
command.push(self.topic_new_name.clone());
command.push(self.topic_new_compression_algorithm.to_string());
if let MaxTopicSize::Custom(max_size) = &self.topic_new_max_size {
command.push(format!("--max-topic-bytes={}", max_size));
}
if self.topic_new_replication_factor != 1 {
command.push(format!(
"--replication-factor={}",
self.topic_new_replication_factor
));
}
if let Some(message_expiry) = &self.topic_new_message_expiry {
command.extend(message_expiry.clone());
}
command
}
}
#[async_trait]
impl IggyCmdTestCase for TestTopicUpdateCmd {
async fn prepare_server_state(&mut self, client: &dyn Client) {
let stream = client
.create_stream(&self.stream_name, Some(self.stream_id))
.await;
assert!(stream.is_ok());
let message_expiry = match &self.message_expiry {
None => IggyExpiry::NeverExpire,
Some(message_expiry) => {
let duration: Duration =
*message_expiry.join(" ").parse::<HumanDuration>().unwrap();
IggyExpiry::ExpireDuration(duration.into())
}
};
let topic = client
.create_topic(
&self.stream_id.try_into().unwrap(),
&self.topic_name,
1,
self.compression_algorithm,
Some(self.replication_factor),
Some(self.topic_id),
message_expiry,
self.max_topic_size,
)
.await;
assert!(topic.is_ok());
}
fn get_command(&self) -> IggyCmdCommand {
IggyCmdCommand::new()
.arg("topic")
.arg("update")
.args(self.to_args())
.with_env_credentials()
}
fn verify_command(&self, command_state: Assert) {
let stream_id = match self.using_stream_id {
TestStreamId::Numeric => format!("{}", self.stream_id),
TestStreamId::Named => self.stream_name.clone(),
};
let topic_id = match self.using_topic_id {
TestTopicId::Numeric => format!("{}", self.topic_id),
TestTopicId::Named => self.topic_name.clone(),
};
let compression_algorithm = &self.topic_new_compression_algorithm;
let message_expiry = (match &self.topic_new_message_expiry {
Some(value) => value.join(" "),
None => IggyExpiry::NeverExpire.to_string(),
})
.to_string();
let max_topic_size = self.max_topic_size.to_string();
let replication_factor = self.replication_factor;
let new_topic_name = &self.topic_new_name;
let expected_message = format!("Executing update topic with ID: {topic_id}, name: {new_topic_name}, \
message expiry: {message_expiry}, compression algorithm: {compression_algorithm}, max topic size: {max_topic_size}, \
replication factor: {replication_factor}, in stream with ID: {stream_id}\n\
Topic with ID: {topic_id} updated name: {new_topic_name}, updated message expiry: {message_expiry}, \
updated compression algorithm: {compression_algorithm} in stream with ID: {stream_id}\n");
command_state.success().stdout(diff(expected_message));
}
async fn verify_server_state(&self, client: &dyn Client) {
let topic = client
.get_topic(
&self.stream_id.try_into().unwrap(),
&self.topic_id.try_into().unwrap(),
)
.await;
assert!(topic.is_ok());
let topic_details = topic.unwrap().expect("Failed to get topic");
assert_eq!(topic_details.name, self.topic_new_name);
assert_eq!(topic_details.id, self.topic_id);
assert_eq!(topic_details.messages_count, 0);
if self.topic_new_message_expiry.is_some() {
let duration: Duration = *self
.topic_new_message_expiry
.clone()
.unwrap()
.join(" ")
.parse::<HumanDuration>()
.unwrap();
assert_eq!(
topic_details.message_expiry,
IggyExpiry::ExpireDuration(duration.into())
);
}
let topic = client
.delete_topic(
&self.stream_id.try_into().unwrap(),
&self.topic_id.try_into().unwrap(),
)
.await;
assert!(topic.is_ok());
let stream = client
.delete_stream(&self.stream_id.try_into().unwrap())
.await;
assert!(stream.is_ok());
}
}
#[tokio::test]
#[parallel]
pub async fn should_be_successful() {
let mut iggy_cmd_test = IggyCmdTest::default();
iggy_cmd_test.setup().await;
iggy_cmd_test
.execute_test(TestTopicUpdateCmd::new(
1,
String::from("main"),
1,
String::from("sync"),
Default::default(),
None,
MaxTopicSize::ServerDefault,
1,
String::from("new_name"),
CompressionAlgorithm::Gzip,
None,
MaxTopicSize::ServerDefault,
1,
TestStreamId::Numeric,
TestTopicId::Numeric,
))
.await;
iggy_cmd_test
.execute_test(TestTopicUpdateCmd::new(
2,
String::from("production"),
3,
String::from("topic"),
Default::default(),
None,
MaxTopicSize::ServerDefault,
1,
String::from("testing"),
CompressionAlgorithm::Gzip,
None,
MaxTopicSize::ServerDefault,
1,
TestStreamId::Named,
TestTopicId::Numeric,
))
.await;
iggy_cmd_test
.execute_test(TestTopicUpdateCmd::new(
3,
String::from("testing"),
5,
String::from("development"),
Default::default(),
None,
MaxTopicSize::ServerDefault,
1,
String::from("development"),
CompressionAlgorithm::Gzip,
None,
MaxTopicSize::ServerDefault,
1,
TestStreamId::Numeric,
TestTopicId::Named,
))
.await;
iggy_cmd_test
.execute_test(TestTopicUpdateCmd::new(
2,
String::from("other"),
2,
String::from("probe"),
Default::default(),
None,
MaxTopicSize::ServerDefault,
1,
String::from("development"),
CompressionAlgorithm::Gzip,
Some(vec![
String::from("1day"),
String::from("1h"),
String::from("1m"),
String::from("1s"),
]),
MaxTopicSize::ServerDefault,
1,
TestStreamId::Numeric,
TestTopicId::Numeric,
))
.await;
iggy_cmd_test
.execute_test(TestTopicUpdateCmd::new(
3,
String::from("stream"),
1,
String::from("testing"),
Default::default(),
Some(vec![String::from("1s")]),
MaxTopicSize::ServerDefault,
1,
String::from("testing"),
CompressionAlgorithm::Gzip,
Some(vec![String::from("1m 6s")]),
MaxTopicSize::ServerDefault,
1,
TestStreamId::Numeric,
TestTopicId::Numeric,
))
.await;
iggy_cmd_test
.execute_test(TestTopicUpdateCmd::new(
4,
String::from("testing"),
2,
String::from("testing"),
Default::default(),
Some(vec![
String::from("1s"),
String::from("1m"),
String::from("1h"),
]),
MaxTopicSize::ServerDefault,
1,
String::from("testing"),
CompressionAlgorithm::Gzip,
None,
MaxTopicSize::ServerDefault,
1,
TestStreamId::Numeric,
TestTopicId::Named,
))
.await;
}
#[tokio::test]
#[parallel]
pub async fn should_help_match() {
let mut iggy_cmd_test = IggyCmdTest::help_message();
iggy_cmd_test
.execute_test_for_help_command(TestHelpCmd::new(
vec!["topic", "update", "--help"],
format!(
r#"Update topic name, compression algorithm and message expiry time for given topic ID in given stream ID
Stream ID can be specified as a stream name or ID
Topic ID can be specified as a topic name or ID
Examples
iggy update 1 1 sensor3 none
iggy update prod sensor3 old-sensor none
iggy update test debugs ready gzip 15days
iggy update 1 1 new-name gzip
iggy update 1 2 new-name none 1day 1hour 1min 1sec
{USAGE_PREFIX} topic update [OPTIONS] <STREAM_ID> <TOPIC_ID> <NAME> <COMPRESSION_ALGORITHM> [MESSAGE_EXPIRY]...
Arguments:
<STREAM_ID>
Stream ID to update topic
{CLAP_INDENT}
Stream ID can be specified as a stream name or ID
<TOPIC_ID>
Topic ID to update
{CLAP_INDENT}
Topic ID can be specified as a topic name or ID
<NAME>
New name for the topic
<COMPRESSION_ALGORITHM>
Compression algorithm for the topic, set to "none" for no compression
[MESSAGE_EXPIRY]...
New message expiry time in human-readable format like 15days 2min 2s
{CLAP_INDENT}
("unlimited" or skipping parameter causes removal of expiry parameter in topic)
Options:
-m, --max-topic-size <MAX_TOPIC_SIZE>
New max topic size
{CLAP_INDENT}
("unlimited" or skipping parameter causes removal of max topic size parameter in topic)
Can't be lower than segment size in the config.
{CLAP_INDENT}
[default: unlimited]
-r, --replication-factor <REPLICATION_FACTOR>
New replication factor for the topic
{CLAP_INDENT}
[default: 1]
-h, --help
Print help (see a summary with '-h')
"#,
),
))
.await;
}
#[tokio::test]
#[parallel]
pub async fn should_short_help_match() {
let mut iggy_cmd_test = IggyCmdTest::default();
iggy_cmd_test
.execute_test_for_help_command(TestHelpCmd::new(
vec!["topic", "update", "-h"],
format!(
r#"Update topic name, compression algorithm and message expiry time for given topic ID in given stream ID
{USAGE_PREFIX} topic update [OPTIONS] <STREAM_ID> <TOPIC_ID> <NAME> <COMPRESSION_ALGORITHM> [MESSAGE_EXPIRY]...
Arguments:
<STREAM_ID> Stream ID to update topic
<TOPIC_ID> Topic ID to update
<NAME> New name for the topic
<COMPRESSION_ALGORITHM> Compression algorithm for the topic, set to "none" for no compression
[MESSAGE_EXPIRY]... New message expiry time in human-readable format like 15days 2min 2s
Options:
-m, --max-topic-size <MAX_TOPIC_SIZE> New max topic size [default: unlimited]
-r, --replication-factor <REPLICATION_FACTOR> New replication factor for the topic [default: 1]
-h, --help Print help (see more with '--help')
"#,
),
))
.await;
}