blob: b3cd20fc2556f1e32f535a7c6f1a82eb62cb0d97 [file] [log] [blame]
use crate::configs::system::SystemConfig;
use crate::streaming::segments::index::Index;
use crate::streaming::segments::time_index::TimeIndex;
use crate::streaming::storage::SystemStorage;
use iggy::models::polled_messages::PolledMessage;
use iggy::utils::timestamp::IggyTimestamp;
use std::sync::Arc;
pub const LOG_EXTENSION: &str = "log";
pub const INDEX_EXTENSION: &str = "index";
pub const TIME_INDEX_EXTENSION: &str = "timeindex";
pub const MAX_SIZE_BYTES: u32 = 1000 * 1000 * 1000;
#[derive(Debug)]
pub struct Segment {
pub stream_id: u32,
pub topic_id: u32,
pub partition_id: u32,
pub start_offset: u64,
pub end_offset: u64,
pub current_offset: u64,
pub index_path: String,
pub log_path: String,
pub time_index_path: String,
pub current_size_bytes: u32,
pub is_closed: bool,
pub(crate) message_expiry: Option<u32>,
pub(crate) unsaved_messages: Option<Vec<Arc<PolledMessage>>>,
pub(crate) config: Arc<SystemConfig>,
pub(crate) indexes: Option<Vec<Index>>,
pub(crate) time_indexes: Option<Vec<TimeIndex>>,
pub(crate) storage: Arc<SystemStorage>,
}
impl Segment {
pub fn create(
stream_id: u32,
topic_id: u32,
partition_id: u32,
start_offset: u64,
config: Arc<SystemConfig>,
storage: Arc<SystemStorage>,
message_expiry: Option<u32>,
) -> Segment {
let path = config.get_segment_path(stream_id, topic_id, partition_id, start_offset);
Segment {
stream_id,
topic_id,
partition_id,
start_offset,
end_offset: 0,
current_offset: start_offset,
log_path: Self::get_log_path(&path),
index_path: Self::get_index_path(&path),
time_index_path: Self::get_time_index_path(&path),
current_size_bytes: 0,
message_expiry,
indexes: match config.segment.cache_indexes {
true => Some(Vec::new()),
false => None,
},
time_indexes: match config.segment.cache_time_indexes {
true => Some(Vec::new()),
false => None,
},
unsaved_messages: None,
is_closed: false,
config,
storage,
}
}
pub async fn is_full(&self) -> bool {
if self.current_size_bytes >= self.config.segment.size.as_bytes_u64() as u32 {
return true;
}
self.is_expired(IggyTimestamp::now().to_micros()).await
}
pub async fn is_expired(&self, now: u64) -> bool {
if self.message_expiry.is_none() {
return false;
}
let last_messages = self.get_messages(self.end_offset, 1).await;
if last_messages.is_err() {
return false;
}
let last_messages = last_messages.unwrap();
if last_messages.is_empty() {
return false;
}
let last_message = last_messages[0].as_ref();
let message_expiry = (self.message_expiry.unwrap() * 1000) as u64;
(last_message.timestamp + message_expiry) <= now
}
fn get_log_path(path: &str) -> String {
format!("{}.{}", path, LOG_EXTENSION)
}
fn get_index_path(path: &str) -> String {
format!("{}.{}", path, INDEX_EXTENSION)
}
fn get_time_index_path(path: &str) -> String {
format!("{}.{}", path, TIME_INDEX_EXTENSION)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::configs::system::SegmentConfig;
use crate::streaming::storage::tests::get_test_system_storage;
#[tokio::test]
async fn should_be_created_given_valid_parameters() {
let storage = Arc::new(get_test_system_storage());
let stream_id = 1;
let topic_id = 2;
let partition_id = 3;
let start_offset = 0;
let config = Arc::new(SystemConfig::default());
let path = config.get_segment_path(stream_id, topic_id, partition_id, start_offset);
let log_path = Segment::get_log_path(&path);
let index_path = Segment::get_index_path(&path);
let time_index_path = Segment::get_time_index_path(&path);
let message_expiry = Some(10);
let segment = Segment::create(
stream_id,
topic_id,
partition_id,
start_offset,
config,
storage,
message_expiry,
);
assert_eq!(segment.stream_id, stream_id);
assert_eq!(segment.topic_id, topic_id);
assert_eq!(segment.partition_id, partition_id);
assert_eq!(segment.start_offset, start_offset);
assert_eq!(segment.current_offset, 0);
assert_eq!(segment.end_offset, 0);
assert_eq!(segment.current_size_bytes, 0);
assert_eq!(segment.log_path, log_path);
assert_eq!(segment.index_path, index_path);
assert_eq!(segment.time_index_path, time_index_path);
assert_eq!(segment.message_expiry, message_expiry);
assert!(segment.unsaved_messages.is_none());
assert!(segment.indexes.is_some());
assert!(segment.time_indexes.is_some());
assert!(!segment.is_closed);
assert!(!segment.is_full().await);
}
#[test]
fn should_not_initialize_indexes_cache_when_disabled() {
let storage = Arc::new(get_test_system_storage());
let stream_id = 1;
let topic_id = 2;
let partition_id = 3;
let start_offset = 0;
let config = Arc::new(SystemConfig {
segment: SegmentConfig {
cache_indexes: false,
..Default::default()
},
..Default::default()
});
let segment = Segment::create(
stream_id,
topic_id,
partition_id,
start_offset,
config,
storage,
None,
);
assert!(segment.indexes.is_none());
}
#[test]
fn should_not_initialize_time_indexes_cache_when_disabled() {
let storage = Arc::new(get_test_system_storage());
let stream_id = 1;
let topic_id = 2;
let partition_id = 3;
let start_offset = 0;
let config = Arc::new(SystemConfig {
segment: SegmentConfig {
cache_time_indexes: false,
..Default::default()
},
..Default::default()
});
let segment = Segment::create(
stream_id,
topic_id,
partition_id,
start_offset,
config,
storage,
None,
);
assert!(segment.time_indexes.is_none());
}
}