blob: ffc56f4351e8b902f80f5da7fa2dd67046caf05d [file] [log] [blame]
use crate::streaming::models::messages::PolledMessages;
use crate::streaming::polling_consumer::PollingConsumer;
use crate::streaming::topics::topic::Topic;
use crate::streaming::utils::file::folder_size;
use crate::streaming::utils::hash;
use iggy::error::IggyError;
use iggy::messages::append_messages::{Partitioning, PartitioningKind};
use iggy::messages::poll_messages::{PollingKind, PollingStrategy};
use iggy::models::polled_messages::PolledMessage;
use std::collections::HashMap;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tracing::{info, trace, warn};
impl Topic {
pub async fn get_messages_count(&self) -> u64 {
    // Total number of messages in this topic, summed across all partitions.
    // Each partition is read-locked only for the duration of its own count.
    let mut total = 0u64;
    for partition_lock in self.get_partitions() {
        total += partition_lock.read().await.get_messages_count();
    }
    total
}
/// Polls up to `count` messages from the given partition using the requested
/// polling strategy (offset / timestamp / first / last / next-for-consumer).
///
/// # Errors
/// - `IggyError::NoPartitions` when the topic has no partitions at all.
/// - `IggyError::PartitionNotFound` when `partition_id` does not exist.
/// - Any error propagated from the underlying partition read.
pub async fn get_messages(
    &self,
    consumer: PollingConsumer,
    partition_id: u32,
    strategy: PollingStrategy,
    count: u32,
) -> Result<PolledMessages, IggyError> {
    if !self.has_partitions() {
        return Err(IggyError::NoPartitions(self.topic_id, self.stream_id));
    }
    // Idiomatic lookup: `ok_or_else` + `?` replaces the is_none()/unwrap() pair.
    let partition = self.partitions.get(&partition_id).ok_or_else(|| {
        IggyError::PartitionNotFound(partition_id, self.topic_id, self.stream_id)
    })?;
    let partition = partition.read().await;
    let value = strategy.value;
    // Dispatch on the polling kind; every arm returns Result<Vec<_>, _>.
    let messages = match strategy.kind {
        PollingKind::Offset => partition.get_messages_by_offset(value, count).await,
        PollingKind::Timestamp => partition.get_messages_by_timestamp(value, count).await,
        PollingKind::First => partition.get_first_messages(count).await,
        PollingKind::Last => partition.get_last_messages(count).await,
        PollingKind::Next => partition.get_next_messages(consumer, count).await,
    }?;
    Ok(PolledMessages {
        messages,
        partition_id,
        current_offset: partition.current_offset,
    })
}
/// Appends a batch of messages to this topic, routing the whole batch to a
/// single partition chosen by the `partitioning` scheme.
///
/// # Errors
/// - `IggyError::NoPartitions` when the topic has no partitions.
/// - Conversion error when an explicit partition-ID payload is not 4 bytes.
/// - Any error propagated from the target partition append.
pub async fn append_messages(
    &self,
    partitioning: &Partitioning,
    messages: Vec<PolledMessage>,
) -> Result<(), IggyError> {
    // A topic with no partitions cannot accept messages at all.
    if !self.has_partitions() {
        return Err(IggyError::NoPartitions(self.topic_id, self.stream_id));
    }
    // An empty batch is a successful no-op.
    if messages.is_empty() {
        return Ok(());
    }
    // Resolve the destination partition for the whole batch.
    let partition_id = match partitioning.kind {
        // Round-robin over the partitions.
        PartitioningKind::Balanced => self.get_next_partition_id(),
        // Caller pinned an explicit partition ID, encoded little-endian in
        // the first `length` bytes of the value.
        // NOTE(review): indexing panics if `length` exceeds `value.len()` —
        // presumably validated upstream; confirm against Partitioning parsing.
        PartitioningKind::PartitionId => {
            let id_bytes = &partitioning.value[..partitioning.length as usize];
            u32::from_le_bytes(id_bytes.try_into()?)
        }
        // Deterministic routing derived from the hash of the messages key.
        PartitioningKind::MessagesKey => {
            self.calculate_partition_id_by_messages_key_hash(&partitioning.value)
        }
    };
    self.append_messages_to_partition(partition_id, messages)
        .await
}
/// Appends the batch to one specific partition under its write lock.
///
/// # Errors
/// - `IggyError::PartitionNotFound` when `partition_id` does not exist.
/// - Any error propagated from the partition's own append.
async fn append_messages_to_partition(
    &self,
    partition_id: u32,
    messages: Vec<PolledMessage>,
) -> Result<(), IggyError> {
    // Idiomatic lookup: `ok_or_else` + `?` replaces the is_none()/unwrap() pair.
    let partition = self.partitions.get(&partition_id).ok_or_else(|| {
        IggyError::PartitionNotFound(partition_id, self.topic_id, self.stream_id)
    })?;
    let mut partition = partition.write().await;
    // The inner call already yields Result<(), IggyError>; return it directly
    // instead of the redundant `?; Ok(())` dance.
    partition.append_messages(messages).await
}
/// Returns the next partition ID for balanced (round-robin) routing,
/// cycling through 1..=partitions_count.
fn get_next_partition_id(&self) -> u32 {
    let partitions_count = self.partitions.len() as u32;
    // Atomically read-and-advance the counter, wrapping back to 1 after the
    // last partition. The previous fetch_add + swap pair left a window where
    // concurrent callers could observe an over-incremented counter; a single
    // fetch_update performs the wrap inside one CAS loop. The single-threaded
    // sequence (1, 2, ..., partitions_count, 1, ...) is unchanged.
    let mut partition_id = self
        .current_partition_id
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |id| {
            Some(if id >= partitions_count { 1 } else { id + 1 })
        })
        .expect("fetch_update closure always returns Some");
    // Defensive normalization in case a pre-existing counter value was ever
    // outside 1..=partitions_count (mirrors the original reset-to-1 behavior).
    if partition_id > partitions_count {
        partition_id = 1;
    }
    trace!("Next partition ID: {}", partition_id);
    partition_id
}
/// Maps a messages key onto a 1-based partition ID via its 32-bit hash.
/// Callers guard against zero partitions (`has_partitions`), so the modulo
/// here never divides by zero in practice.
fn calculate_partition_id_by_messages_key_hash(&self, messages_key: &[u8]) -> u32 {
    let key_hash = hash::calculate_32(messages_key);
    let partitions_count = self.get_partitions_count();
    // `hash % count` yields 0..count-1; remap 0 to `count` so the result
    // always lands in the 1-based range 1..=count.
    let partition_id = match key_hash % partitions_count {
        0 => partitions_count,
        id => id,
    };
    trace!(
        "Calculated partition ID: {} for messages key: {:?}, hash: {}",
        partition_id,
        messages_key,
        key_hash
    );
    partition_id
}
/// Warms the per-partition in-memory message cache from the newest on-disk
/// segments. Each partition receives a share of the configured cache limit
/// proportional to its size on disk. No-op when caching is disabled.
///
/// # Errors
/// Propagates failures from measuring the on-disk folder size or from
/// reading messages back from a partition.
pub(crate) async fn load_messages_from_disk_to_cache(&mut self) -> Result<(), IggyError> {
    if !self.config.cache.enabled {
        return Ok(());
    }
    let path = self.config.get_system_path();
    // TODO: load data from database instead of calculating the size on disk
    let total_size_on_disk_bytes = folder_size(&path).await?;
    // The cache limit is loop-invariant; compute it once instead of cloning
    // the configured size on every partition iteration.
    let cache_limit_bytes = self.config.cache.size.clone().into();
    for partition_lock in self.partitions.values_mut() {
        let mut partition = partition_lock.write().await;
        // A partition without segments has nothing to preload.
        let end_offset = match partition.segments.last() {
            Some(segment) => segment.current_offset,
            None => {
                warn!(
                    "No segments found for partition ID: {}, topic ID: {}, stream ID: {}",
                    partition.partition_id, partition.topic_id, partition.stream_id
                );
                continue;
            }
        };
        trace!(
            "Loading messages to cache for partition ID: {}, topic ID: {}, stream ID: {}, offset: 0 to {}...",
            partition.partition_id,
            partition.topic_id,
            partition.stream_id,
            end_offset
        );
        let partition_size_bytes = partition.get_size_bytes();
        // Fetch data from disk proportional to the partition size
        // eg. 12 partitions, each has 300 MB, cache limit is 500 MB, so there is total 3600 MB of data on SSD.
        // 500 MB * (300 / 3600 MB) ~= 41.6 MB to load from cache (assuming all partitions have the same size on disk)
        let size_to_fetch_from_disk = (cache_limit_bytes as f64
            * (partition_size_bytes as f64 / total_size_on_disk_bytes as f64))
            as u64;
        let messages = partition
            .get_newest_messages_by_size(size_to_fetch_from_disk as u32)
            .await?;
        // Total payload size actually loaded, reported in the info log below.
        let sum: u64 = messages.iter().map(|m| m.get_size_bytes() as u64).sum();
        if !Self::cache_integrity_check(&messages) {
            // NOTE(review): the log says "Emptying cache" but nothing is
            // cleared here — the load is merely skipped for this partition;
            // confirm whether the cache should actually be emptied.
            warn!(
                "Cache integrity check failed for partition ID: {}, topic ID: {}, stream ID: {}, offset: 0 to {}. Emptying cache...",
                partition.partition_id, partition.topic_id, partition.stream_id, end_offset
            );
        } else if let Some(cache) = &mut partition.cache {
            for message in &messages {
                cache.push_safe(message.clone());
            }
            info!(
                "Loaded {} messages ({} bytes) to cache for partition ID: {}, topic ID: {}, stream ID: {}, offset: 0 to {}.",
                messages.len(), sum, partition.partition_id, partition.topic_id, partition.stream_id, end_offset
            );
        } else {
            warn!(
                "Cache is invalid for ID: {}, topic ID: {}, stream ID: {}, offset: 0 to {}",
                partition.partition_id, partition.topic_id, partition.stream_id, end_offset
            );
        }
    }
    Ok(())
}
/// Verifies that the messages destined for the cache form one contiguous,
/// gap-free offset range. Returns `false` (after logging the reason) for an
/// empty slice, any non-consecutive pair of offsets, or a length that does
/// not match the offset span.
fn cache_integrity_check(cache: &[Arc<PolledMessage>]) -> bool {
    if cache.is_empty() {
        warn!("Cache is empty!");
        return false;
    }
    let first_offset = cache[0].offset;
    let last_offset = cache[cache.len() - 1].offset;
    // Every offset must be exactly its predecessor + 1.
    for i in 1..cache.len() {
        if cache[i].offset != cache[i - 1].offset + 1 {
            warn!("Offsets are not subsequent at index {} offset {}, for previous index {} offset is {}", i, cache[i].offset, i-1, cache[i-1].offset);
            return false;
        }
    }
    // Cross-check: the element count must equal the offset span. (Fixed the
    // previously garbled wording of this warning message.)
    let expected_messages_count: u64 = last_offset - first_offset + 1;
    if cache.len() != expected_messages_count as usize {
        warn!(
            "Messages count in cache ({}) is not equal to expected messages count ({})",
            cache.len(),
            expected_messages_count
        );
        return false;
    }
    true
}
/// Collects, per partition, the start offsets of segments that have expired
/// as of `now`. Returns an empty map when the topic has no message-expiry
/// policy configured; partitions with no expired segments are omitted.
pub async fn get_expired_segments_start_offsets_per_partition(
    &self,
    now: u64,
) -> HashMap<u32, Vec<u64>> {
    let mut expired_segments = HashMap::new();
    if self.message_expiry.is_none() {
        return expired_segments;
    }
    // Keys are unused here, so iterate over `.values()` instead of
    // destructuring `(_, partition)` pairs from `.iter()`.
    for partition in self.partitions.values() {
        let partition = partition.read().await;
        let segments = partition.get_expired_segments_start_offsets(now).await;
        if !segments.is_empty() {
            expired_segments.insert(partition.partition_id, segments);
        }
    }
    expired_segments
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::configs::system::SystemConfig;
use crate::streaming::storage::tests::get_test_system_storage;
use bytes::Bytes;
use iggy::models::polled_messages::MessageState;
use std::sync::Arc;
#[tokio::test]
async fn given_partition_id_key_messages_should_be_appended_only_to_the_chosen_partition() {
    // Pin every message to partition 1 via explicit partition-ID routing.
    let target_partition_id = 1;
    let partitioning = Partitioning::partition_id(target_partition_id);
    let partitions_count = 3;
    let messages_count = 1000;
    let topic = init_topic(partitions_count);
    for entity_id in 1..=messages_count {
        let batch = vec![PolledMessage::empty(
            1,
            MessageState::Available,
            entity_id as u128,
            Bytes::from("test"),
            1,
            None,
        )];
        topic.append_messages(&partitioning, batch).await.unwrap();
    }
    // Only the chosen partition should hold messages; the rest stay empty.
    let partitions = topic.get_partitions();
    assert_eq!(partitions.len(), partitions_count as usize);
    for partition_lock in partitions {
        let partition = partition_lock.read().await;
        let cached = partition.cache.as_ref().unwrap().to_vec();
        let expected = if partition.partition_id == target_partition_id {
            messages_count
        } else {
            0
        };
        assert_eq!(cached.len() as u32, expected);
    }
}
#[tokio::test]
async fn given_messages_key_key_messages_should_be_appended_to_the_calculated_partitions() {
    let partitions_count = 3;
    let messages_count = 1000;
    let topic = init_topic(partitions_count);
    // Route each message by its own key so the batch spreads across partitions.
    for entity_id in 1..=messages_count {
        let partitioning = Partitioning::messages_key_u32(entity_id);
        let batch = vec![PolledMessage::empty(
            1,
            MessageState::Available,
            entity_id as u128,
            Bytes::from("test"),
            1,
            None,
        )];
        topic.append_messages(&partitioning, batch).await.unwrap();
    }
    let partitions = topic.get_partitions();
    assert_eq!(partitions.len(), partitions_count as usize);
    // Every partition should hold some — but not all — of the messages,
    // and the per-partition counts must sum back to the total appended.
    let mut read_messages_count = 0;
    for partition_lock in partitions {
        let partition = partition_lock.read().await;
        let cached = partition.cache.as_ref().unwrap().to_vec();
        read_messages_count += cached.len();
        assert!(cached.len() < messages_count as usize);
    }
    assert_eq!(read_messages_count, messages_count as usize);
}
#[test]
fn given_multiple_partitions_calculate_next_partition_id_should_return_next_partition_id_using_round_robin(
) {
    let partitions_count = 3u32;
    let messages_count = 1000u32;
    let topic = init_topic(partitions_count);
    // IDs must cycle 1, 2, ..., partitions_count, 1, 2, ... — the expected
    // value for the i-th call (0-based) is simply (i mod count) + 1.
    for i in 0..messages_count {
        let expected = (i % partitions_count) + 1;
        assert_eq!(topic.get_next_partition_id(), expected);
    }
}
#[test]
fn given_multiple_partitions_calculate_partition_id_by_hash_should_return_next_partition_id() {
    let partitions_count = 3;
    let messages_count = 1000;
    let topic = init_topic(partitions_count);
    for entity_id in 1..=messages_count {
        let key = Partitioning::messages_key_u32(entity_id);
        let key_hash = hash::calculate_32(&key.value);
        // The hash maps onto 1-based partition IDs: a remainder of 0 remaps
        // to partitions_count.
        let expected = match key_hash % partitions_count {
            0 => partitions_count,
            id => id,
        };
        assert_eq!(
            topic.calculate_partition_id_by_messages_key_hash(&key.value),
            expected
        );
    }
}
// Builds a minimal topic (stream 1, topic 2, name "test") backed by the
// in-memory test storage and a default system configuration.
fn init_topic(partitions_count: u32) -> Topic {
    let storage = Arc::new(get_test_system_storage());
    let config = Arc::new(SystemConfig::default());
    Topic::create(
        1,      // stream ID
        2,      // topic ID
        "test", // topic name
        partitions_count,
        config,
        storage,
        None,
        None,
        1,
    )
    .unwrap()
}
}