// blob: d6216a2690c593890935540ab3b6304056d84d04 [file] [log] [blame]
use crate::streaming::common::test_setup::TestSetup;
use crate::streaming::create_messages;
use server::streaming::partitions::partition::Partition;
use server::streaming::segments::segment::{INDEX_EXTENSION, LOG_EXTENSION, TIME_INDEX_EXTENSION};
use tokio::fs;
/// Creates a partition with `with_segment = true` for every sample partition ID, persists it,
/// and checks the result on disk through `assert_persisted_partition`.
#[tokio::test]
async fn should_persist_partition_with_segment() {
    let (stream_id, topic_id) = (1, 2);
    let with_segment = true;

    let setup = TestSetup::init().await;
    setup.create_partitions_directory(stream_id, topic_id).await;

    for partition_id in get_partition_ids() {
        let config = setup.config.clone();
        let storage = setup.storage.clone();
        let partition = Partition::create(
            stream_id,
            topic_id,
            partition_id,
            with_segment,
            config,
            storage,
            None,
        );

        let persisted = partition.persist().await;
        persisted.unwrap();

        assert_persisted_partition(&partition.path, with_segment).await;
    }
}
/// Persists a partition for every sample partition ID, then builds a second `Partition` for the
/// same stream/topic/partition IDs, calls `load()` on it, and checks that its observable state
/// matches the partition that was persisted.
#[tokio::test]
async fn should_load_existing_partition_from_disk() {
    let setup = TestSetup::init().await;
    let with_segment = true;
    let stream_id = 1;
    let topic_id = 2;
    setup.create_partitions_directory(stream_id, topic_id).await;

    for partition_id in get_partition_ids() {
        let partition = Partition::create(
            stream_id,
            topic_id,
            partition_id,
            with_segment,
            setup.config.clone(),
            setup.storage.clone(),
            None,
        );
        partition.persist().await.unwrap();
        assert_persisted_partition(&partition.path, with_segment).await;

        // Constructed with `with_segment = false`; the segment-count assertion below therefore
        // presumably exercises what `load()` restores rather than what `create` sets up.
        let mut loaded_partition = Partition::create(
            stream_id,
            topic_id,
            partition.partition_id,
            false,
            setup.config.clone(),
            setup.storage.clone(),
            None,
        );
        loaded_partition.load().await.unwrap();

        assert_eq!(loaded_partition.stream_id, partition.stream_id);
        assert_eq!(loaded_partition.partition_id, partition.partition_id);
        assert_eq!(loaded_partition.path, partition.path);
        assert_eq!(loaded_partition.current_offset, partition.current_offset);
        assert_eq!(
            loaded_partition.unsaved_messages_count,
            partition.unsaved_messages_count
        );
        assert_eq!(
            loaded_partition.get_segments().len(),
            partition.get_segments().len()
        );
        assert_eq!(
            loaded_partition.should_increment_offset,
            partition.should_increment_offset
        );
        assert_eq!(loaded_partition.cache.is_some(), partition.cache.is_some());

        // Compare emptiness through a borrow instead of `unwrap()`: when neither partition has a
        // cache, both sides are `None` and the test passes rather than panicking.
        assert_eq!(
            loaded_partition.cache.as_ref().map(|cache| cache.is_empty()),
            partition.cache.as_ref().map(|cache| cache.is_empty())
        );
    }
}
/// Persists a partition for every sample partition ID, deletes it, and checks that the
/// partition path can no longer be stat-ed.
#[tokio::test]
async fn should_delete_existing_partition_from_disk() {
    let (stream_id, topic_id) = (1, 2);
    let with_segment = true;

    let setup = TestSetup::init().await;
    setup.create_partitions_directory(stream_id, topic_id).await;

    for partition_id in get_partition_ids() {
        let config = setup.config.clone();
        let storage = setup.storage.clone();
        let partition = Partition::create(
            stream_id,
            topic_id,
            partition_id,
            with_segment,
            config,
            storage,
            None,
        );

        partition.persist().await.unwrap();
        assert_persisted_partition(&partition.path, with_segment).await;

        partition.delete().await.unwrap();

        // A failing metadata lookup is how this test detects that the path is gone.
        let lookup_after_delete = fs::metadata(&partition.path).await;
        assert!(lookup_after_delete.is_err());
    }
}
/// Persists a partition for every sample partition ID, appends messages, and checks that
/// `purge()` resets the offset, the unsaved-message counter and the `should_increment_offset`
/// flag, and leaves no messages readable from offset 0.
#[tokio::test]
async fn should_purge_existing_partition_on_disk() {
    let (stream_id, topic_id) = (1, 2);
    let with_segment = true;

    let setup = TestSetup::init().await;
    setup.create_partitions_directory(stream_id, topic_id).await;

    for partition_id in get_partition_ids() {
        let mut partition = Partition::create(
            stream_id,
            topic_id,
            partition_id,
            with_segment,
            setup.config.clone(),
            setup.storage.clone(),
            None,
        );
        partition.persist().await.unwrap();
        assert_persisted_partition(&partition.path, with_segment).await;

        // Fill the partition and confirm everything appended can be read back.
        let batch = create_messages();
        let expected_count = batch.len();
        partition.append_messages(batch).await.unwrap();
        let before_purge = partition.get_messages_by_offset(0, 100).await.unwrap();
        assert_eq!(before_purge.len(), expected_count);

        partition.purge().await.unwrap();

        // After the purge the partition state checked here is back to its zero values.
        assert_eq!(partition.current_offset, 0);
        assert_eq!(partition.unsaved_messages_count, 0);
        assert!(!partition.should_increment_offset);
        let after_purge = partition.get_messages_by_offset(0, 100).await.unwrap();
        assert!(after_purge.is_empty());
    }
}
/// Asserts that `partition_path` exists on disk and, when `with_segment` is `true`, that the
/// log, index and time-index files of the segment starting at offset 0 exist beneath it.
async fn assert_persisted_partition(partition_path: &str, with_segment: bool) {
    assert!(fs::metadata(partition_path).await.is_ok());
    if !with_segment {
        return;
    }

    // The segment file stem is the start offset, left-padded with zeros to 20 characters.
    let first_offset = 0u64;
    let segment_base = format!("{}/{:0>20}", partition_path, first_offset);

    // Checked in the same order as before: log, index, time index.
    for extension in [LOG_EXTENSION, INDEX_EXTENSION, TIME_INDEX_EXTENSION] {
        let file_path = format!("{}.{}", segment_base, extension);
        assert!(fs::metadata(&file_path).await.is_ok());
    }
}
/// Returns the fixed set of partition IDs that every test in this file iterates over.
fn get_partition_ids() -> Vec<u32> {
    const SAMPLE_IDS: [u32; 8] = [1, 2, 3, 5, 10, 100, 1000, 99999];
    SAMPLE_IDS.to_vec()
}