blob: 7fcf74016d8d87c7166ca9fec0551856bf4f5827 [file] [log] [blame]
use crate::streaming::common::test_setup::TestSetup;
use bytes::Bytes;
use iggy::models::header::{HeaderKey, HeaderValue};
use iggy::models::polled_messages::{MessageState, PolledMessage};
use iggy::utils::{checksum, timestamp::IggyTimestamp};
use server::configs::system::{PartitionConfig, SystemConfig};
use server::streaming::partitions::partition::Partition;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
#[tokio::test]
async fn should_persist_messages_and_then_load_them_from_disk() {
let setup = TestSetup::init().await;
let stream_id = 1;
let topic_id = 1;
let partition_id = 1;
let messages_count = 1000;
let config = Arc::new(SystemConfig {
path: setup.config.path.to_string(),
partition: PartitionConfig {
messages_required_to_save: messages_count,
..Default::default()
},
..Default::default()
});
let mut partition = Partition::create(
stream_id,
topic_id,
partition_id,
true,
config.clone(),
setup.storage.clone(),
None,
);
let mut messages = Vec::with_capacity(messages_count as usize);
let mut appended_messages = Vec::with_capacity(messages_count as usize);
for i in 1..=messages_count {
let offset = (i - 1) as u64;
let state = MessageState::Available;
let timestamp = IggyTimestamp::now().to_micros();
let id = i as u128;
let payload = Bytes::from(format!("message {}", i));
let checksum = checksum::calculate(&payload);
let mut headers = HashMap::new();
headers.insert(
HeaderKey::new("key_1").unwrap(),
HeaderValue::from_str("Value 1").unwrap(),
);
headers.insert(
HeaderKey::new("key 2").unwrap(),
HeaderValue::from_bool(true).unwrap(),
);
headers.insert(
HeaderKey::new("key-3").unwrap(),
HeaderValue::from_uint64(123456).unwrap(),
);
let appended_message = PolledMessage::create(
offset,
state,
timestamp,
id,
payload.clone(),
checksum,
Some(headers.clone()),
);
let message = PolledMessage::create(
offset,
state,
timestamp,
id,
payload,
checksum,
Some(headers),
);
appended_messages.push(appended_message);
messages.push(message);
}
setup.create_partitions_directory(stream_id, topic_id).await;
partition.persist().await.unwrap();
partition.append_messages(messages).await.unwrap();
assert_eq!(partition.unsaved_messages_count, 0);
let mut loaded_partition = Partition::create(
stream_id,
topic_id,
partition.partition_id,
false,
config.clone(),
setup.storage.clone(),
None,
);
loaded_partition.load().await.unwrap();
let loaded_messages = loaded_partition
.get_messages_by_offset(0, messages_count)
.await
.unwrap();
assert_eq!(loaded_messages.len(), messages_count as usize);
for i in 1..=messages_count {
let index = i as usize - 1;
let loaded_message = &loaded_messages[index];
let appended_message = &appended_messages[index];
assert_eq!(loaded_message.offset, appended_message.offset);
assert_eq!(loaded_message.state, appended_message.state);
assert_eq!(loaded_message.timestamp, appended_message.timestamp);
assert_eq!(loaded_message.id, appended_message.id);
assert_eq!(loaded_message.checksum, appended_message.checksum);
assert_eq!(loaded_message.length, appended_message.length);
assert_eq!(loaded_message.payload, appended_message.payload);
assert_eq!(loaded_message.headers, appended_message.headers);
}
}