blob: d82e90498dd932bca33ac67c2b867fc89d21dd96 [file] [log] [blame]
use crate::cli::common::{
IggyCmdCommand, IggyCmdTest, IggyCmdTestCase, TestHelpCmd, TestStreamId, TestTopicId,
CLAP_INDENT, USAGE_PREFIX,
};
use assert_cmd::assert::Assert;
use async_trait::async_trait;
use bytes::Bytes;
use iggy::client::Client;
use iggy::messages::poll_messages::{PollingKind, PollingStrategy};
use iggy::messages::send_messages::{Message, Partitioning};
use iggy::models::header::{HeaderKey, HeaderValue};
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::topic_size::MaxTopicSize;
use predicates::str::{contains, starts_with};
use serial_test::parallel;
use std::collections::HashMap;
use std::str::FromStr;
struct TestMessagePollCmd {
stream_id: u32,
stream_name: String,
topic_id: u32,
topic_name: String,
partitions_count: u32,
messages: Vec<String>,
partition_id: u32,
message_count: usize,
strategy: PollingStrategy,
using_stream_id: TestStreamId,
using_topic_id: TestTopicId,
show_headers: bool,
headers: (HeaderKey, HeaderValue),
}
impl TestMessagePollCmd {
#[allow(clippy::too_many_arguments)]
fn new(
stream_id: u32,
stream_name: String,
topic_id: u32,
topic_name: String,
partitions_count: u32,
messages: &[String],
partition_id: u32,
message_count: usize,
strategy: PollingStrategy,
using_stream_id: TestStreamId,
using_topic_id: TestTopicId,
show_headers: bool,
headers: (HeaderKey, HeaderValue),
) -> Self {
assert!(partition_id <= partitions_count);
assert!(partition_id > 0);
assert!(message_count < messages.len());
Self {
stream_id,
stream_name,
topic_id,
topic_name,
partitions_count,
messages: messages.to_owned(),
partition_id,
message_count,
strategy,
using_stream_id,
using_topic_id,
show_headers,
headers,
}
}
fn to_args(&self) -> Vec<String> {
let mut command = match self.strategy.kind {
PollingKind::Offset => vec!["--offset".into(), format!("{}", self.strategy.value)],
PollingKind::Timestamp => {
todo!("Timestamp based message polling is not supported yet")
}
PollingKind::First => vec!["--first".into()],
PollingKind::Next => vec!["--next".into()],
PollingKind::Last => vec!["--last".into()],
};
command.extend(vec![
"--message-count".into(),
format!("{}", self.message_count),
]);
if self.show_headers {
command.extend(vec!["--show-headers".into()]);
}
command.extend(match self.using_stream_id {
TestStreamId::Numeric => vec![format!("{}", self.stream_id)],
TestStreamId::Named => vec![self.stream_name.clone()],
});
command.push(match self.using_topic_id {
TestTopicId::Numeric => format!("{}", self.topic_id),
TestTopicId::Named => self.topic_name.clone(),
});
command.push(format!("{}", self.partition_id));
command
}
}
#[async_trait]
impl IggyCmdTestCase for TestMessagePollCmd {
async fn prepare_server_state(&mut self, client: &dyn Client) {
let stream = client
.create_stream(&self.stream_name, self.stream_id.into())
.await;
assert!(stream.is_ok());
let topic = client
.create_topic(
&self.stream_id.try_into().unwrap(),
&self.topic_name,
self.partitions_count,
Default::default(),
None,
Some(self.topic_id),
IggyExpiry::NeverExpire,
MaxTopicSize::ServerDefault,
)
.await;
assert!(topic.is_ok());
let mut messages = self
.messages
.iter()
.map(|s| {
let payload = Bytes::from(s.as_bytes().to_vec());
Message::new(None, payload, Some(HashMap::from([self.headers.clone()])))
})
.collect::<Vec<_>>();
let send_status = client
.send_messages(
&self.stream_id.try_into().unwrap(),
&self.topic_id.try_into().unwrap(),
&Partitioning::partition_id(self.partition_id),
&mut messages,
)
.await;
assert!(send_status.is_ok());
}
fn get_command(&self) -> IggyCmdCommand {
IggyCmdCommand::new()
.arg("message")
.arg("poll")
.args(self.to_args())
.with_env_credentials()
}
fn verify_command(&self, command_state: Assert) {
let stream_id = match self.using_stream_id {
TestStreamId::Numeric => format!("{}", self.stream_id),
TestStreamId::Named => self.stream_name.clone(),
};
let topic_id = match self.using_topic_id {
TestTopicId::Numeric => format!("{}", self.topic_id),
TestTopicId::Named => self.topic_name.clone(),
};
let message = format!("Executing poll messages from topic ID: {} and stream with ID: {}\nPolled messages from topic with ID: {} and stream with ID: {} (from partition with ID: {})\nPolled {} messages",
topic_id, stream_id, topic_id, stream_id, self.partition_id, self.message_count);
let mut status = command_state.success().stdout(starts_with(message));
if self.show_headers {
status = status
.stdout(contains(format!("Header: {}", self.headers.0)))
.stdout(contains(self.headers.1.kind.to_string()))
.stdout(contains(self.headers.1.value_only_to_string()).count(self.message_count))
}
// Check if messages are printed based on the strategy
match self.strategy.kind {
PollingKind::Offset => {
self.messages
.iter()
.skip(self.strategy.value as usize)
.take(self.message_count)
.fold(status, |status, message| status.stdout(contains(message)));
}
PollingKind::First => {
self.messages
.iter()
.take(self.message_count)
.fold(status, |status, message| status.stdout(contains(message)));
}
PollingKind::Next => {
self.messages
.iter()
.take(self.message_count)
.fold(status, |status, message| status.stdout(contains(message)));
}
PollingKind::Last => {
self.messages
.iter()
.rev()
.take(self.message_count)
.fold(status, |status, message| status.stdout(contains(message)));
}
_ => {}
}
}
async fn verify_server_state(&self, client: &dyn Client) {
let topic = client
.delete_topic(
&self.stream_id.try_into().unwrap(),
&self.topic_id.try_into().unwrap(),
)
.await;
assert!(topic.is_ok());
let stream = client
.delete_stream(&self.stream_id.try_into().unwrap())
.await;
assert!(stream.is_ok());
}
}
#[tokio::test]
#[parallel]
pub async fn should_be_successful() {
let mut iggy_cmd_test = IggyCmdTest::default();
let test_messages: Vec<String> = vec![
"Lorem ipsum dolor sit amet, consectetur adipiscing elit".into(),
"sed do eiusmod tempor incididunt ut labore et dolore magna aliqua".into(),
"Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris".into(),
"nisi ut aliquip ex ea commodo consequat".into(),
"Duis aute irure dolor in reprehenderit in voluptate velit esse".into(),
"cillum dolore eu fugiat nulla pariatur".into(),
"Excepteur sint occaecat cupidatat non proident, sunt in culpa".into(),
"qui officia deserunt mollit anim id est laborum".into(),
"Sed ut perspiciatis unde omnis iste natus error sit voluptatem".into(),
"accusantium doloremque laudantium, totam rem aperiam, eaque ipsa".into(),
];
let test_headers = (
HeaderKey::from_str("key1").unwrap(),
HeaderValue::from_str("value1").unwrap(),
);
let test_parameters: Vec<(u32, usize, PollingStrategy, TestStreamId, TestTopicId, bool)> = vec![
(
1,
1,
PollingStrategy::offset(0),
TestStreamId::Numeric,
TestTopicId::Numeric,
true,
),
(
2,
5,
PollingStrategy::offset(0),
TestStreamId::Numeric,
TestTopicId::Named,
true,
),
(
3,
3,
PollingStrategy::offset(3),
TestStreamId::Named,
TestTopicId::Numeric,
true,
),
(
4,
5,
PollingStrategy::first(),
TestStreamId::Named,
TestTopicId::Named,
true,
),
(
1,
4,
PollingStrategy::last(),
TestStreamId::Numeric,
TestTopicId::Numeric,
true,
),
(
2,
3,
PollingStrategy::next(),
TestStreamId::Numeric,
TestTopicId::Named,
false,
),
];
iggy_cmd_test.setup().await;
for (partition_id, message_count, strategy, using_stream_id, using_topic_id, show_headers) in
test_parameters
{
iggy_cmd_test
.execute_test(TestMessagePollCmd::new(
1,
String::from("stream"),
2,
String::from("topic"),
4,
&test_messages,
partition_id,
message_count,
strategy,
using_stream_id,
using_topic_id,
show_headers,
test_headers.clone(),
))
.await;
}
}
#[tokio::test]
#[parallel]
pub async fn should_help_match() {
let mut iggy_cmd_test = IggyCmdTest::help_message();
iggy_cmd_test
.execute_test_for_help_command(TestHelpCmd::new(
vec!["message", "poll", "--help"],
format!(
r#"Poll messages from given topic ID and given stream ID
Stream ID can be specified as a stream name or ID
Topic ID can be specified as a topic name or ID
Examples:
iggy message poll --offset 0 1 2 1
iggy message poll --offset 0 stream 2 1
iggy message poll --offset 0 1 topic 1
iggy message poll --offset 0 stream topic 1
{USAGE_PREFIX} message poll [OPTIONS] <--offset <OFFSET>|--first|--last|--next> <STREAM_ID> <TOPIC_ID> <PARTITION_ID>
Arguments:
<STREAM_ID>
ID of the stream from which message will be polled
{CLAP_INDENT}
Stream ID can be specified as a stream name or ID
<TOPIC_ID>
ID of the topic from which message will be polled
{CLAP_INDENT}
Topic ID can be specified as a topic name or ID
<PARTITION_ID>
Partition ID from which message will be polled
Options:
-m, --message-count <MESSAGE_COUNT>
Number of messages to poll
{CLAP_INDENT}
[default: 1]
-a, --auto-commit
Auto commit offset
{CLAP_INDENT}
Flag indicates whether to commit offset on the server automatically
after polling the messages.
-o, --offset <OFFSET>
Polling strategy - offset to start polling messages from
{CLAP_INDENT}
Offset must be specified as a number
-f, --first
Polling strategy - start polling from the first message in the partition
-l, --last
Polling strategy - start polling from the last message in the partition
-n, --next
Polling strategy - start polling from the next message
{CLAP_INDENT}
Start polling after the last polled message based
on the stored consumer offset
-c, --consumer <CONSUMER>
Regular consumer which will poll messages
{CLAP_INDENT}
Consumer ID can be specified as a consumer name or ID
{CLAP_INDENT}
[default: 1]
-s, --show-headers
Include the message headers in the output
{CLAP_INDENT}
Flag indicates whether to include headers in the output
after polling the messages.
-h, --help
Print help (see a summary with '-h')
"#,
),
))
.await;
}
#[tokio::test]
#[parallel]
pub async fn should_short_help_match() {
let mut iggy_cmd_test = IggyCmdTest::default();
iggy_cmd_test
.execute_test_for_help_command(TestHelpCmd::new(
vec!["message", "poll", "-h"],
format!(
r#"Poll messages from given topic ID and given stream ID
{USAGE_PREFIX} message poll [OPTIONS] <--offset <OFFSET>|--first|--last|--next> <STREAM_ID> <TOPIC_ID> <PARTITION_ID>
Arguments:
<STREAM_ID> ID of the stream from which message will be polled
<TOPIC_ID> ID of the topic from which message will be polled
<PARTITION_ID> Partition ID from which message will be polled
Options:
-m, --message-count <MESSAGE_COUNT> Number of messages to poll [default: 1]
-a, --auto-commit Auto commit offset
-o, --offset <OFFSET> Polling strategy - offset to start polling messages from
-f, --first Polling strategy - start polling from the first message in the partition
-l, --last Polling strategy - start polling from the last message in the partition
-n, --next Polling strategy - start polling from the next message
-c, --consumer <CONSUMER> Regular consumer which will poll messages [default: 1]
-s, --show-headers Include the message headers in the output
-h, --help Print help (see more with '--help')
"#,
),
))
.await;
}