| use crate::configs::system::SystemConfig; |
| use crate::streaming::segments::index::Index; |
| use crate::streaming::segments::time_index::TimeIndex; |
| use crate::streaming::storage::SystemStorage; |
| use iggy::models::polled_messages::PolledMessage; |
| use iggy::utils::timestamp::IggyTimestamp; |
| use std::sync::Arc; |
| |
| pub const LOG_EXTENSION: &str = "log"; |
| pub const INDEX_EXTENSION: &str = "index"; |
| pub const TIME_INDEX_EXTENSION: &str = "timeindex"; |
| pub const MAX_SIZE_BYTES: u32 = 1000 * 1000 * 1000; |
| |
| #[derive(Debug)] |
| pub struct Segment { |
| pub stream_id: u32, |
| pub topic_id: u32, |
| pub partition_id: u32, |
| pub start_offset: u64, |
| pub end_offset: u64, |
| pub current_offset: u64, |
| pub index_path: String, |
| pub log_path: String, |
| pub time_index_path: String, |
| pub current_size_bytes: u32, |
| pub is_closed: bool, |
| pub(crate) message_expiry: Option<u32>, |
| pub(crate) unsaved_messages: Option<Vec<Arc<PolledMessage>>>, |
| pub(crate) config: Arc<SystemConfig>, |
| pub(crate) indexes: Option<Vec<Index>>, |
| pub(crate) time_indexes: Option<Vec<TimeIndex>>, |
| pub(crate) storage: Arc<SystemStorage>, |
| } |
| |
| impl Segment { |
| pub fn create( |
| stream_id: u32, |
| topic_id: u32, |
| partition_id: u32, |
| start_offset: u64, |
| config: Arc<SystemConfig>, |
| storage: Arc<SystemStorage>, |
| message_expiry: Option<u32>, |
| ) -> Segment { |
| let path = config.get_segment_path(stream_id, topic_id, partition_id, start_offset); |
| |
| Segment { |
| stream_id, |
| topic_id, |
| partition_id, |
| start_offset, |
| end_offset: 0, |
| current_offset: start_offset, |
| log_path: Self::get_log_path(&path), |
| index_path: Self::get_index_path(&path), |
| time_index_path: Self::get_time_index_path(&path), |
| current_size_bytes: 0, |
| message_expiry, |
| indexes: match config.segment.cache_indexes { |
| true => Some(Vec::new()), |
| false => None, |
| }, |
| time_indexes: match config.segment.cache_time_indexes { |
| true => Some(Vec::new()), |
| false => None, |
| }, |
| unsaved_messages: None, |
| is_closed: false, |
| config, |
| storage, |
| } |
| } |
| |
| pub async fn is_full(&self) -> bool { |
| if self.current_size_bytes >= self.config.segment.size.as_bytes_u64() as u32 { |
| return true; |
| } |
| |
| self.is_expired(IggyTimestamp::now().to_micros()).await |
| } |
| |
| pub async fn is_expired(&self, now: u64) -> bool { |
| if self.message_expiry.is_none() { |
| return false; |
| } |
| |
| let last_messages = self.get_messages(self.end_offset, 1).await; |
| if last_messages.is_err() { |
| return false; |
| } |
| |
| let last_messages = last_messages.unwrap(); |
| if last_messages.is_empty() { |
| return false; |
| } |
| |
| let last_message = last_messages[0].as_ref(); |
| let message_expiry = (self.message_expiry.unwrap() * 1000) as u64; |
| (last_message.timestamp + message_expiry) <= now |
| } |
| |
| fn get_log_path(path: &str) -> String { |
| format!("{}.{}", path, LOG_EXTENSION) |
| } |
| |
| fn get_index_path(path: &str) -> String { |
| format!("{}.{}", path, INDEX_EXTENSION) |
| } |
| |
| fn get_time_index_path(path: &str) -> String { |
| format!("{}.{}", path, TIME_INDEX_EXTENSION) |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use crate::configs::system::SegmentConfig; |
| use crate::streaming::storage::tests::get_test_system_storage; |
| |
| #[tokio::test] |
| async fn should_be_created_given_valid_parameters() { |
| let storage = Arc::new(get_test_system_storage()); |
| let stream_id = 1; |
| let topic_id = 2; |
| let partition_id = 3; |
| let start_offset = 0; |
| let config = Arc::new(SystemConfig::default()); |
| let path = config.get_segment_path(stream_id, topic_id, partition_id, start_offset); |
| let log_path = Segment::get_log_path(&path); |
| let index_path = Segment::get_index_path(&path); |
| let time_index_path = Segment::get_time_index_path(&path); |
| let message_expiry = Some(10); |
| |
| let segment = Segment::create( |
| stream_id, |
| topic_id, |
| partition_id, |
| start_offset, |
| config, |
| storage, |
| message_expiry, |
| ); |
| |
| assert_eq!(segment.stream_id, stream_id); |
| assert_eq!(segment.topic_id, topic_id); |
| assert_eq!(segment.partition_id, partition_id); |
| assert_eq!(segment.start_offset, start_offset); |
| assert_eq!(segment.current_offset, 0); |
| assert_eq!(segment.end_offset, 0); |
| assert_eq!(segment.current_size_bytes, 0); |
| assert_eq!(segment.log_path, log_path); |
| assert_eq!(segment.index_path, index_path); |
| assert_eq!(segment.time_index_path, time_index_path); |
| assert_eq!(segment.message_expiry, message_expiry); |
| assert!(segment.unsaved_messages.is_none()); |
| assert!(segment.indexes.is_some()); |
| assert!(segment.time_indexes.is_some()); |
| assert!(!segment.is_closed); |
| assert!(!segment.is_full().await); |
| } |
| |
| #[test] |
| fn should_not_initialize_indexes_cache_when_disabled() { |
| let storage = Arc::new(get_test_system_storage()); |
| let stream_id = 1; |
| let topic_id = 2; |
| let partition_id = 3; |
| let start_offset = 0; |
| let config = Arc::new(SystemConfig { |
| segment: SegmentConfig { |
| cache_indexes: false, |
| ..Default::default() |
| }, |
| ..Default::default() |
| }); |
| |
| let segment = Segment::create( |
| stream_id, |
| topic_id, |
| partition_id, |
| start_offset, |
| config, |
| storage, |
| None, |
| ); |
| |
| assert!(segment.indexes.is_none()); |
| } |
| |
| #[test] |
| fn should_not_initialize_time_indexes_cache_when_disabled() { |
| let storage = Arc::new(get_test_system_storage()); |
| let stream_id = 1; |
| let topic_id = 2; |
| let partition_id = 3; |
| let start_offset = 0; |
| let config = Arc::new(SystemConfig { |
| segment: SegmentConfig { |
| cache_time_indexes: false, |
| ..Default::default() |
| }, |
| ..Default::default() |
| }); |
| |
| let segment = Segment::create( |
| stream_id, |
| topic_id, |
| partition_id, |
| start_offset, |
| config, |
| storage, |
| None, |
| ); |
| |
| assert!(segment.time_indexes.is_none()); |
| } |
| } |