// blob: 8ab1e1dcfeda234cc3def28eeefa89e76e9894b8 [file] [log] [blame]
use std::sync::atomic::Ordering;
use crate::streaming::partitions::partition::Partition;
use crate::streaming::segments::segment::Segment;
use iggy::error::IggyError;
use tracing::info;
/// Summary of a segment that has been removed from a partition.
///
/// Returned by `Partition::delete_segment` so that callers can inspect what
/// was dropped without holding on to the segment itself.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeletedSegment {
    /// The `end_offset` of the segment at the moment it was deleted.
    pub end_offset: u64,
    /// The number of messages the segment reported when it was deleted.
    pub messages_count: u64,
}
impl Partition {
    /// Returns the number of segments currently held by this partition.
    ///
    /// The vector length is narrowed with an `as` cast, so a length above
    /// `u32::MAX` would be truncated rather than reported as an error.
    pub fn get_segments_count(&self) -> u32 {
        self.segments.len() as u32
    }

    /// Returns a shared reference to the partition's segments.
    pub fn get_segments(&self) -> &Vec<Segment> {
        &self.segments
    }

    /// Returns a mutable reference to the partition's segments.
    pub fn get_segments_mut(&mut self) -> &mut Vec<Segment> {
        &mut self.segments
    }

    /// Collects the start offsets of every segment that reports itself as
    /// expired for the given `now` value.
    ///
    /// `now` is passed unchanged to `Segment::is_expired`; its unit is defined
    /// there. Segments are checked one after another, and the returned offsets
    /// are sorted in ascending order.
    pub async fn get_expired_segments_start_offsets(&self, now: u64) -> Vec<u64> {
        let mut expired_offsets = Vec::new();
        for segment in &self.segments {
            if segment.is_expired(now).await {
                expired_offsets.push(segment.start_offset);
            }
        }
        // Plain integers have no observable stability, so the non-allocating
        // unstable sort yields exactly the same ordering.
        expired_offsets.sort_unstable();
        expired_offsets
    }

    /// Creates a new segment starting at `start_offset`, persists it and
    /// appends it to this partition.
    ///
    /// The segment is built from this partition's identifiers, configuration,
    /// storage handle, message expiry and clones of its size and message
    /// counters (presumably shared handles, so the segment updates the same
    /// totals; confirm against `Segment::create`). After a successful persist
    /// the parent stream's segment counter is incremented.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `Segment::persist`. In that case the
    /// segment is not added to the partition and the counter is left unchanged.
    pub async fn add_persisted_segment(&mut self, start_offset: u64) -> Result<(), IggyError> {
        info!(
            "Creating the new segment for partition with ID: {}, stream with ID: {}, topic with ID: {}...",
            self.partition_id, self.stream_id, self.topic_id
        );
        let new_segment = Segment::create(
            self.stream_id,
            self.topic_id,
            self.partition_id,
            start_offset,
            self.config.clone(),
            self.storage.clone(),
            self.message_expiry,
            self.size_of_parent_stream.clone(),
            self.size_of_parent_topic.clone(),
            self.size_bytes.clone(),
            self.messages_count_of_parent_stream.clone(),
            self.messages_count_of_parent_topic.clone(),
            self.messages_count.clone(),
        );
        // Persist before registering, so a failed write never leaves a
        // segment in memory that does not exist in storage.
        new_segment.persist().await?;
        self.segments.push(new_segment);
        self.segments_count_of_parent_stream
            .fetch_add(1, Ordering::SeqCst);
        Ok(())
    }

    /// Deletes the segment whose start offset equals `start_offset`, both from
    /// storage and from this partition.
    ///
    /// On success the parent stream's segment counter is decremented and a
    /// [`DeletedSegment`] describing the removed segment is returned.
    ///
    /// # Errors
    ///
    /// * `IggyError::SegmentNotFound` if no segment has the given start offset.
    /// * Any error returned by the storage layer while deleting the segment;
    ///   the segment then stays in the partition and the counter is unchanged.
    pub async fn delete_segment(&mut self, start_offset: u64) -> Result<DeletedSegment, IggyError> {
        // Locate the segment once and remember its index, so the in-memory
        // removal below touches exactly the segment deleted from storage.
        let index = self
            .segments
            .iter()
            .position(|segment| segment.start_offset == start_offset)
            .ok_or(IggyError::SegmentNotFound)?;

        let segment = &self.segments[index];
        self.storage.segment.delete(segment).await?;
        self.segments_count_of_parent_stream
            .fetch_sub(1, Ordering::SeqCst);
        // Capture the summary while the segment is still owned by the vector.
        let deleted_segment = DeletedSegment {
            end_offset: segment.end_offset,
            messages_count: segment.get_messages_count(),
        };

        // `remove` preserves the order of the remaining segments.
        self.segments.remove(index);
        Ok(deleted_segment)
    }
}