blob: 5d40a762363b027ab73ac1eb9fa4690376e3b6ad [file] [log] [blame]
use crate::streaming::common::test_setup::TestSetup;
use bytes::Bytes;
use iggy::bytes_serializable::BytesSerializable;
use iggy::messages::send_messages::Message;
use iggy::models::header::{HeaderKey, HeaderValue};
use iggy::utils::timestamp::IggyTimestamp;
use server::configs::system::{PartitionConfig, SystemConfig};
use server::streaming::batching::appendable_batch_info::AppendableBatchInfo;
use server::streaming::partitions::partition::Partition;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::atomic::{AtomicU32, AtomicU64};
use std::sync::Arc;
#[tokio::test]
async fn should_persist_messages_and_then_load_them_by_timestamp() {
let setup = TestSetup::init().await;
let stream_id = 1;
let topic_id = 1;
let partition_id = 1;
let messages_count = 100;
let config = Arc::new(SystemConfig {
path: setup.config.path.to_string(),
partition: PartitionConfig {
messages_required_to_save: messages_count,
enforce_fsync: true,
..Default::default()
},
..Default::default()
});
let mut partition = Partition::create(
stream_id,
topic_id,
partition_id,
true,
config.clone(),
setup.storage.clone(),
None,
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
);
let mut messages = Vec::with_capacity(messages_count as usize);
let mut appended_messages = Vec::with_capacity(messages_count as usize);
let mut messages_two = Vec::with_capacity(messages_count as usize);
for i in 1..=messages_count {
let id = i as u128;
let payload = Bytes::from(format!("message {}", i));
let mut headers = HashMap::new();
headers.insert(
HeaderKey::new("key_1").unwrap(),
HeaderValue::from_str("Value 1").unwrap(),
);
headers.insert(
HeaderKey::new("key 2").unwrap(),
HeaderValue::from_bool(true).unwrap(),
);
headers.insert(
HeaderKey::new("key-3").unwrap(),
HeaderValue::from_uint64(123456).unwrap(),
);
let message = Message {
id,
length: payload.len() as u32,
payload: payload.clone(),
headers: Some(headers),
};
messages.push(message);
}
for i in (messages_count + 1)..=(messages_count * 2) {
let id = i as u128;
let payload = Bytes::from(format!("message {}", i));
let mut headers = HashMap::new();
headers.insert(
HeaderKey::new("key_1").unwrap(),
HeaderValue::from_str("Value 1").unwrap(),
);
headers.insert(
HeaderKey::new("key 2").unwrap(),
HeaderValue::from_bool(true).unwrap(),
);
headers.insert(
HeaderKey::new("key-3").unwrap(),
HeaderValue::from_uint64(123456).unwrap(),
);
let message = Message {
id,
length: payload.len() as u32,
payload: payload.clone(),
headers: Some(headers),
};
appended_messages.push(message.clone());
messages_two.push(message);
}
setup.create_partitions_directory(stream_id, topic_id).await;
partition.persist().await.unwrap();
let appendable_batch_info = AppendableBatchInfo::new(
messages.iter().map(|msg| msg.get_size_bytes() as u64).sum(),
partition.partition_id,
);
let appendable_batch_info_two = AppendableBatchInfo::new(
messages_two
.iter()
.map(|msg| msg.get_size_bytes() as u64)
.sum(),
partition.partition_id,
);
partition
.append_messages(appendable_batch_info, messages)
.await
.unwrap();
let test_timestamp = IggyTimestamp::now().to_micros();
partition
.append_messages(appendable_batch_info_two, messages_two)
.await
.unwrap();
let loaded_messages = partition
.get_messages_by_timestamp(test_timestamp, messages_count)
.await
.unwrap();
assert_eq!(loaded_messages.len(), messages_count as usize);
for i in (messages_count + 1)..=(messages_count * 2) {
let index = (i - messages_count - 1) as usize;
let loaded_message = &loaded_messages[index];
let appended_message = &appended_messages[index];
assert_eq!(loaded_message.id, appended_message.id);
assert_eq!(loaded_message.payload, appended_message.payload);
assert!(loaded_message.timestamp >= test_timestamp);
assert_eq!(
loaded_message
.headers
.as_ref()
.map(|bytes| HashMap::from_bytes(bytes.clone()).unwrap()),
appended_message.headers
);
}
}
#[tokio::test]
async fn should_persist_messages_and_then_load_them_from_disk() {
let setup = TestSetup::init().await;
let stream_id = 1;
let topic_id = 1;
let partition_id = 1;
let messages_count = 1000;
let config = Arc::new(SystemConfig {
path: setup.config.path.to_string(),
partition: PartitionConfig {
messages_required_to_save: messages_count,
enforce_fsync: true,
..Default::default()
},
..Default::default()
});
let mut partition = Partition::create(
stream_id,
topic_id,
partition_id,
true,
config.clone(),
setup.storage.clone(),
None,
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
);
let mut messages = Vec::with_capacity(messages_count as usize);
let mut appended_messages = Vec::with_capacity(messages_count as usize);
for i in 1..=messages_count {
let id = i as u128;
let payload = Bytes::from(format!("message {}", i));
let mut headers = HashMap::new();
headers.insert(
HeaderKey::new("key_1").unwrap(),
HeaderValue::from_str("Value 1").unwrap(),
);
headers.insert(
HeaderKey::new("key 2").unwrap(),
HeaderValue::from_bool(true).unwrap(),
);
headers.insert(
HeaderKey::new("key-3").unwrap(),
HeaderValue::from_uint64(123456).unwrap(),
);
let appended_message = Message {
id,
length: payload.len() as u32,
payload: payload.clone(),
headers: Some(headers.clone()),
};
let message = Message {
id,
length: payload.len() as u32,
payload: payload.clone(),
headers: Some(headers),
};
appended_messages.push(appended_message);
messages.push(message);
}
setup.create_partitions_directory(stream_id, topic_id).await;
partition.persist().await.unwrap();
let appendable_batch_info = AppendableBatchInfo::new(
messages.iter().map(|msg| msg.get_size_bytes() as u64).sum(),
partition.partition_id,
);
partition
.append_messages(appendable_batch_info, messages)
.await
.unwrap();
assert_eq!(partition.unsaved_messages_count, 0);
let mut loaded_partition = Partition::create(
stream_id,
topic_id,
partition.partition_id,
false,
config.clone(),
setup.storage.clone(),
None,
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
);
loaded_partition.load().await.unwrap();
let loaded_messages = loaded_partition
.get_messages_by_offset(0, messages_count)
.await
.unwrap();
assert_eq!(loaded_messages.len(), messages_count as usize);
for i in 1..=messages_count {
let index = i as usize - 1;
let loaded_message = &loaded_messages[index];
let appended_message = &appended_messages[index];
assert_eq!(loaded_message.id, appended_message.id);
assert_eq!(loaded_message.payload, appended_message.payload);
assert_eq!(
loaded_message
.headers
.as_ref()
.map(|bytes| HashMap::from_bytes(bytes.clone()).unwrap()),
appended_message.headers
);
}
}