blob: d480a3dfc81c549bde24749f2ff08f0ebea1e728 [file] [log] [blame]
use crate::streaming::common::test_setup::TestSetup;
use crate::streaming::create_messages;
use iggy::messages::append_messages::Partitioning;
use iggy::messages::poll_messages::PollingStrategy;
use server::streaming::polling_consumer::PollingConsumer;
use server::streaming::topics::topic::Topic;
use tokio::fs;
#[tokio::test]
async fn should_persist_topics_with_partitions_directories_and_info_file() {
let setup = TestSetup::init().await;
let stream_id = 1;
let partitions_count = 3;
setup.create_topics_directory(stream_id).await;
let topic_ids = get_topic_ids();
for topic_id in topic_ids {
let name = format!("test-{}", topic_id);
let topic = Topic::create(
stream_id,
topic_id,
&name,
partitions_count,
setup.config.clone(),
setup.storage.clone(),
None,
None,
1,
)
.unwrap();
topic.persist().await.unwrap();
assert_persisted_topic(
&topic.path,
&setup.config.get_partitions_path(stream_id, topic_id),
3,
)
.await;
}
}
#[tokio::test]
async fn should_load_existing_topic_from_disk() {
let setup = TestSetup::init().await;
let stream_id = 1;
setup.create_topics_directory(stream_id).await;
let partitions_count = 3;
let topic_ids = get_topic_ids();
for topic_id in topic_ids {
let name = format!("test-{}", topic_id);
let topic = Topic::create(
stream_id,
topic_id,
&name,
partitions_count,
setup.config.clone(),
setup.storage.clone(),
None,
None,
1,
)
.unwrap();
topic.persist().await.unwrap();
assert_persisted_topic(
&topic.path,
&setup.config.get_partitions_path(stream_id, topic_id),
partitions_count,
)
.await;
let mut loaded_topic = Topic::empty(
stream_id,
topic_id,
setup.config.clone(),
setup.storage.clone(),
);
loaded_topic.load().await.unwrap();
assert_eq!(loaded_topic.stream_id, topic.stream_id);
assert_eq!(loaded_topic.topic_id, topic.topic_id);
assert_eq!(loaded_topic.name, topic.name);
assert_eq!(loaded_topic.path, topic.path);
assert_eq!(loaded_topic.get_partitions().len() as u32, partitions_count);
}
}
#[tokio::test]
async fn should_delete_existing_topic_from_disk() {
let setup = TestSetup::init().await;
let stream_id = 1;
setup.create_topics_directory(stream_id).await;
let partitions_count = 3;
let topic_ids = get_topic_ids();
for topic_id in topic_ids {
let name = format!("test-{}", topic_id);
let topic = Topic::create(
stream_id,
topic_id,
&name,
partitions_count,
setup.config.clone(),
setup.storage.clone(),
None,
None,
1,
)
.unwrap();
topic.persist().await.unwrap();
assert_persisted_topic(
&topic.path,
&setup.config.get_partitions_path(stream_id, topic_id),
partitions_count,
)
.await;
topic.delete().await.unwrap();
assert!(fs::metadata(&topic.path).await.is_err());
}
}
#[tokio::test]
async fn should_purge_existing_topic_on_disk() {
let setup = TestSetup::init().await;
let stream_id = 1;
setup.create_topics_directory(stream_id).await;
let partitions_count = 3;
let topic_ids = get_topic_ids();
for topic_id in topic_ids {
let name = format!("test-{}", topic_id);
let topic = Topic::create(
stream_id,
topic_id,
&name,
partitions_count,
setup.config.clone(),
setup.storage.clone(),
None,
None,
1,
)
.unwrap();
topic.persist().await.unwrap();
assert_persisted_topic(
&topic.path,
&setup.config.get_partitions_path(stream_id, topic_id),
partitions_count,
)
.await;
let messages = create_messages();
let messages_count = messages.len();
topic
.append_messages(&Partitioning::partition_id(1), messages)
.await
.unwrap();
let loaded_messages = topic
.get_messages(
PollingConsumer::Consumer(1, 1),
1,
PollingStrategy::offset(0),
100,
)
.await
.unwrap();
assert_eq!(loaded_messages.messages.len(), messages_count);
topic.purge().await.unwrap();
let loaded_messages = topic
.get_messages(
PollingConsumer::Consumer(1, 1),
1,
PollingStrategy::offset(0),
100,
)
.await
.unwrap();
assert_eq!(loaded_messages.current_offset, 0);
assert!(loaded_messages.messages.is_empty());
}
}
async fn assert_persisted_topic(topic_path: &str, partitions_path: &str, partitions_count: u32) {
let topic_metadata = fs::metadata(topic_path).await.unwrap();
assert!(topic_metadata.is_dir());
for partition_id in 1..=partitions_count {
let partition_path = format!("{}/{}", partitions_path, partition_id);
let partition_metadata = fs::metadata(&partition_path).await.unwrap();
assert!(partition_metadata.is_dir());
}
}
fn get_topic_ids() -> Vec<u32> {
vec![1, 2, 3, 5, 10, 100, 1000, 99999]
}