| use crate::streaming::partitions::partition::{ConsumerOffset, Partition}; |
| use crate::streaming::segments::segment::{Segment, LOG_EXTENSION}; |
| use crate::streaming::storage::{PartitionStorage, Storage}; |
| use anyhow::Context; |
| use async_trait::async_trait; |
| use iggy::consumer::ConsumerKind; |
| use iggy::error::IggyError; |
| use serde::{Deserialize, Serialize}; |
| use sled::Db; |
| use std::path::Path; |
| use std::sync::atomic::Ordering; |
| use std::sync::Arc; |
| use tokio::fs; |
| use tokio::fs::create_dir; |
| use tracing::{error, info, trace, warn}; |
| |
/// Sled-backed partition storage: partition metadata and consumer offsets are
/// kept in the shared `sled` database, while segment payloads are persisted as
/// files on disk (see the `Storage<Partition>` impl below).
#[derive(Debug)]
pub struct FilePartitionStorage {
    db: Arc<Db>,
}
| |
| impl FilePartitionStorage { |
| pub fn new(db: Arc<Db>) -> Self { |
| Self { db } |
| } |
| } |
| |
// NOTE(review): the hand-written `unsafe impl Send`/`unsafe impl Sync` for
// `FilePartitionStorage` were removed. The struct's only field is an
// `Arc<sled::Db>`, and `sled::Db` is `Send + Sync`, so the compiler derives
// both auto traits on its own. The unsafe impls were not just redundant —
// they would silently suppress the compile error that should appear if a
// non-thread-safe field were ever added to this struct.
| |
#[async_trait]
impl PartitionStorage for FilePartitionStorage {
    /// Persists a single consumer offset under its precomputed `offset.key`.
    ///
    /// The value stored is the raw offset as 8 big-endian bytes; the rest of
    /// the struct is reconstructible from the key, so nothing else is written.
    async fn save_consumer_offset(&self, offset: &ConsumerOffset) -> Result<(), IggyError> {
        // The stored value is just the offset, so we don't need to serialize the whole struct.
        // It should be as fast and lightweight as possible.
        // As described in the docs, sled works better with big-endian byte order.
        if let Err(err) = self
            .db
            .insert(&offset.key, &offset.offset.to_be_bytes())
            .with_context(|| {
                format!(
                    "Failed to save consumer offset: {}, key: {}",
                    offset.offset, offset.key
                )
            })
        {
            return Err(IggyError::CannotSaveResource(err));
        }

        trace!(
            "Stored consumer offset value: {} for {} with ID: {}",
            offset.offset,
            offset.kind,
            offset.consumer_id
        );
        Ok(())
    }

    /// Loads every consumer offset for the given partition, sorted by
    /// ascending consumer ID.
    ///
    /// Entries are found by scanning the key prefix produced by
    /// `ConsumerOffset::get_key_prefix` (with a trailing `:`); the consumer ID
    /// is recovered from the last `:`-separated key segment and the value is
    /// decoded as 8 big-endian bytes (the format written by
    /// `save_consumer_offset`).
    async fn load_consumer_offsets(
        &self,
        kind: ConsumerKind,
        stream_id: u32,
        topic_id: u32,
        partition_id: u32,
    ) -> Result<Vec<ConsumerOffset>, IggyError> {
        let mut consumer_offsets = Vec::new();
        let key_prefix = format!(
            "{}:",
            ConsumerOffset::get_key_prefix(kind, stream_id, topic_id, partition_id)
        );
        for data in self.db.scan_prefix(&key_prefix) {
            let consumer_offset = match data.with_context(|| {
                format!(
                    "Failed to load consumer offset, when searching by key: {}",
                    key_prefix
                )
            }) {
                Ok((key, value)) => {
                    // NOTE(review): the `unwrap()`s below assume entries were
                    // written by `save_consumer_offset` and are uncorrupted;
                    // a malformed key or a value that is not exactly 8 bytes
                    // would panic here rather than surface as an IggyError.
                    let key = String::from_utf8(key.to_vec()).unwrap();
                    let offset = u64::from_be_bytes(value.as_ref().try_into().unwrap());
                    let consumer_id = key.split(':').last().unwrap().parse::<u32>().unwrap();
                    ConsumerOffset {
                        key,
                        kind,
                        consumer_id,
                        offset,
                    }
                }
                Err(err) => {
                    return Err(IggyError::CannotLoadResource(err));
                }
            };
            consumer_offsets.push(consumer_offset);
        }

        // Sled iterates in key (lexicographic) order, not numeric consumer-ID
        // order, so sort explicitly before returning.
        consumer_offsets.sort_by(|a, b| a.consumer_id.cmp(&b.consumer_id));
        Ok(consumer_offsets)
    }

    /// Removes all consumer offsets for the given partition by scanning the
    /// same key prefix used by `load_consumer_offsets` and deleting each entry.
    async fn delete_consumer_offsets(
        &self,
        kind: ConsumerKind,
        stream_id: u32,
        topic_id: u32,
        partition_id: u32,
    ) -> Result<(), IggyError> {
        let consumer_offset_key_prefix = format!(
            "{}:",
            ConsumerOffset::get_key_prefix(kind, stream_id, topic_id, partition_id)
        );

        for data in self.db.scan_prefix(&consumer_offset_key_prefix) {
            match data.with_context(|| {
                format!(
                    "Failed to delete consumer offset, when searching by key: {}",
                    consumer_offset_key_prefix
                )
            }) {
                Ok((key, _)) => {
                    // NOTE(review): a failed remove is reported as
                    // `CannotLoadResource` — presumably reused for lack of a
                    // dedicated delete variant; verify against IggyError.
                    if let Err(err) = self.db.remove(&key).with_context(|| {
                        format!("Failed to delete consumer offset, key: {:?}", key)
                    }) {
                        return Err(IggyError::CannotLoadResource(err));
                    }
                }
                Err(err) => {
                    return Err(IggyError::CannotLoadResource(err));
                }
            }
        }

        Ok(())
    }
}
| |
/// Minimal partition metadata persisted in the sled database,
/// MessagePack-encoded via `rmp_serde`.
#[derive(Debug, Serialize, Deserialize)]
struct PartitionData {
    // Creation timestamp copied to/from `Partition::created_at`.
    created_at: u64,
}
| |
| #[async_trait] |
| impl Storage<Partition> for FilePartitionStorage { |
| async fn load(&self, partition: &mut Partition) -> Result<(), IggyError> { |
| info!( |
| "Loading partition with ID: {} for stream with ID: {} and topic with ID: {}, for path: {} from disk...", |
| partition.partition_id, partition.stream_id, partition.topic_id, partition.path |
| ); |
| let dir_entries = fs::read_dir(&partition.path).await; |
| if let Err(err) = fs::read_dir(&partition.path) |
| .await |
| .with_context(|| format!("Failed to read partition with ID: {} for stream with ID: {} and topic with ID: {} and path: {}", partition.partition_id, partition.stream_id, partition.topic_id, partition.path)) |
| { |
| return Err(IggyError::CannotReadPartitions(err)); |
| } |
| |
| let key = get_partition_key( |
| partition.stream_id, |
| partition.topic_id, |
| partition.partition_id, |
| ); |
| let partition_data = match self |
| .db |
| .get(&key) |
| .with_context(|| format!("Failed to load partition with key: {}", key)) |
| { |
| Ok(partition_data) => { |
| if let Some(partition_data) = partition_data { |
| let partition_data = rmp_serde::from_slice::<PartitionData>(&partition_data) |
| .with_context(|| { |
| format!("Failed to deserialize partition with key: {}", key) |
| }); |
| if let Err(err) = partition_data { |
| return Err(IggyError::CannotDeserializeResource(err)); |
| } else { |
| partition_data.unwrap() |
| } |
| } else { |
| return Err(IggyError::ResourceNotFound(key)); |
| } |
| } |
| Err(err) => { |
| return Err(IggyError::CannotLoadResource(err)); |
| } |
| }; |
| |
| partition.created_at = partition_data.created_at; |
| |
| let mut dir_entries = dir_entries.unwrap(); |
| while let Some(dir_entry) = dir_entries.next_entry().await.unwrap_or(None) { |
| let metadata = dir_entry.metadata().await.unwrap(); |
| if metadata.is_dir() { |
| continue; |
| } |
| |
| let path = dir_entry.path(); |
| let extension = path.extension(); |
| if extension.is_none() || extension.unwrap() != LOG_EXTENSION { |
| continue; |
| } |
| |
| let log_file_name = dir_entry |
| .file_name() |
| .into_string() |
| .unwrap() |
| .replace(&format!(".{}", LOG_EXTENSION), ""); |
| |
| let start_offset = log_file_name.parse::<u64>().unwrap(); |
| let mut segment = Segment::create( |
| partition.stream_id, |
| partition.topic_id, |
| partition.partition_id, |
| start_offset, |
| partition.config.clone(), |
| partition.storage.clone(), |
| partition.message_expiry, |
| partition.size_of_parent_stream.clone(), |
| partition.size_of_parent_topic.clone(), |
| partition.size_bytes.clone(), |
| partition.messages_count_of_parent_stream.clone(), |
| partition.messages_count_of_parent_topic.clone(), |
| partition.messages_count.clone(), |
| ); |
| segment.load().await?; |
| if !segment.is_closed { |
| segment.unsaved_batches = Some(Vec::new()) |
| } |
| |
| // If the first segment has at least a single message, we should increment the offset. |
| if !partition.should_increment_offset { |
| partition.should_increment_offset = segment.size_bytes > 0; |
| } |
| |
| if partition.config.partition.validate_checksum { |
| info!("Validating messages checksum for partition with ID: {} and segment with start offset: {}...", partition.partition_id, segment.start_offset); |
| segment.storage.segment.load_checksums(&segment).await?; |
| info!("Validated messages checksum for partition with ID: {} and segment with start offset: {}.", partition.partition_id, segment.start_offset); |
| } |
| |
| // Load the unique message IDs for the partition if the deduplication feature is enabled. |
| let mut unique_message_ids_count = 0; |
| if let Some(message_deduplicator) = &partition.message_deduplicator { |
| info!("Loading unique message IDs for partition with ID: {} and segment with start offset: {}...", partition.partition_id, segment.start_offset); |
| let message_ids = segment.storage.segment.load_message_ids(&segment).await?; |
| for message_id in message_ids { |
| if message_deduplicator.try_insert(&message_id).await { |
| unique_message_ids_count += 1; |
| } else { |
| warn!("Duplicated message ID: {} for partition with ID: {} and segment with start offset: {}.", message_id, partition.partition_id, segment.start_offset); |
| } |
| } |
| info!("Loaded: {} unique message IDs for partition with ID: {} and segment with start offset: {}...", unique_message_ids_count, partition.partition_id, segment.start_offset); |
| } |
| |
| partition |
| .segments_count_of_parent_stream |
| .fetch_add(1, Ordering::SeqCst); |
| partition.segments.push(segment); |
| } |
| |
| partition |
| .segments |
| .sort_by(|a, b| a.start_offset.cmp(&b.start_offset)); |
| |
| let end_offsets = partition |
| .segments |
| .iter() |
| .skip(1) |
| .map(|segment| segment.start_offset - 1) |
| .collect::<Vec<u64>>(); |
| |
| let segments_count = partition.segments.len(); |
| for (end_offset_index, segment) in partition.get_segments_mut().iter_mut().enumerate() { |
| if end_offset_index == segments_count - 1 { |
| break; |
| } |
| |
| segment.end_offset = end_offsets[end_offset_index]; |
| } |
| |
| if !partition.segments.is_empty() { |
| let last_segment = partition.segments.last_mut().unwrap(); |
| if last_segment.is_closed { |
| last_segment.end_offset = last_segment.current_offset; |
| } |
| |
| partition.current_offset = last_segment.current_offset; |
| } |
| |
| partition.load_consumer_offsets().await?; |
| info!( |
| "Loaded partition with ID: {} for stream with ID: {} and topic with ID: {}, current offset: {}.", |
| partition.partition_id, partition.stream_id, partition.topic_id, partition.current_offset |
| ); |
| |
| Ok(()) |
| } |
| |
| async fn save(&self, partition: &Partition) -> Result<(), IggyError> { |
| info!( |
| "Saving partition with start ID: {} for stream with ID: {} and topic with ID: {}...", |
| partition.partition_id, partition.stream_id, partition.topic_id |
| ); |
| if !Path::new(&partition.path).exists() && create_dir(&partition.path).await.is_err() { |
| return Err(IggyError::CannotCreatePartitionDirectory( |
| partition.partition_id, |
| partition.stream_id, |
| partition.topic_id, |
| )); |
| } |
| |
| let key = get_partition_key( |
| partition.stream_id, |
| partition.topic_id, |
| partition.partition_id, |
| ); |
| match rmp_serde::to_vec(&PartitionData { |
| created_at: partition.created_at, |
| }) |
| .with_context(|| format!("Failed to serialize partition with key: {}", key)) |
| { |
| Ok(data) => { |
| if let Err(err) = self |
| .db |
| .insert(&key, data) |
| .with_context(|| format!("Failed to insert partition with key: {}", key)) |
| { |
| return Err(IggyError::CannotSaveResource(err)); |
| } |
| } |
| Err(err) => { |
| return Err(IggyError::CannotSerializeResource(err)); |
| } |
| } |
| |
| if let Err(err) = self |
| .db |
| .insert( |
| &key, |
| rmp_serde::to_vec(&PartitionData { |
| created_at: partition.created_at, |
| }) |
| .unwrap(), |
| ) |
| .with_context(|| format!("Failed to insert partition with key: {}", key)) |
| { |
| return Err(IggyError::CannotSaveResource(err)); |
| } |
| |
| for segment in partition.get_segments() { |
| segment.persist().await?; |
| } |
| |
| info!("Saved partition with start ID: {} for stream with ID: {} and topic with ID: {}, path: {}.", partition.partition_id, partition.stream_id, partition.topic_id, partition.path); |
| |
| Ok(()) |
| } |
| |
| async fn delete(&self, partition: &Partition) -> Result<(), IggyError> { |
| info!( |
| "Deleting partition with ID: {} for stream with ID: {} and topic with ID: {}...", |
| partition.partition_id, partition.stream_id, partition.topic_id, |
| ); |
| if self |
| .db |
| .remove(get_partition_key( |
| partition.stream_id, |
| partition.topic_id, |
| partition.partition_id, |
| )) |
| .is_err() |
| { |
| return Err(IggyError::CannotDeletePartition( |
| partition.partition_id, |
| partition.topic_id, |
| partition.stream_id, |
| )); |
| } |
| |
| if let Err(err) = self |
| .delete_consumer_offsets( |
| ConsumerKind::Consumer, |
| partition.stream_id, |
| partition.topic_id, |
| partition.partition_id, |
| ) |
| .await |
| { |
| error!("Cannot delete consumer offsets for partition with ID: {} for topic with ID: {} for stream with ID: {}. Error: {}", partition.partition_id, partition.topic_id, partition.stream_id, err); |
| return Err(IggyError::CannotDeletePartition( |
| partition.partition_id, |
| partition.topic_id, |
| partition.stream_id, |
| )); |
| } |
| |
| if let Err(err) = self |
| .delete_consumer_offsets( |
| ConsumerKind::ConsumerGroup, |
| partition.stream_id, |
| partition.topic_id, |
| partition.partition_id, |
| ) |
| .await |
| { |
| error!("Cannot delete consumer group offsets for partition with ID: {} for topic with ID: {} for stream with ID: {}. Error: {}", partition.partition_id, partition.topic_id, partition.stream_id, err); |
| return Err(IggyError::CannotDeletePartition( |
| partition.partition_id, |
| partition.topic_id, |
| partition.stream_id, |
| )); |
| } |
| |
| if fs::remove_dir_all(&partition.path).await.is_err() { |
| error!("Cannot delete partition directory: {} for partition with ID: {} for topic with ID: {} for stream with ID: {}.", partition.path, partition.partition_id, partition.topic_id, partition.stream_id); |
| return Err(IggyError::CannotDeletePartitionDirectory( |
| partition.partition_id, |
| partition.stream_id, |
| partition.topic_id, |
| )); |
| } |
| info!( |
| "Deleted partition with ID: {} for stream with ID: {} and topic with ID: {}.", |
| partition.partition_id, partition.stream_id, partition.topic_id, |
| ); |
| Ok(()) |
| } |
| } |
| |
/// Builds the sled key under which a partition's metadata is stored,
/// e.g. `streams:1:topics:2:partitions:3`.
fn get_partition_key(stream_id: u32, topic_id: u32, partition_id: u32) -> String {
    format!("streams:{stream_id}:topics:{topic_id}:partitions:{partition_id}")
}