blob: 7c217210f0800677b9cedc7dc08168a0a02597d2 [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use super::indexes::*;
use super::logs::*;
use crate::configs::system::SystemConfig;
use crate::streaming::batching::batch_accumulator::BatchAccumulator;
use crate::streaming::segments::*;
use error_set::ErrContext;
use iggy::error::IggyError;
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::timestamp::IggyTimestamp;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::fs::remove_file;
use tracing::{info, warn};
/// A single append-only segment of a partition's log: one log file plus its
/// index file, together with counters shared with the parent
/// stream/topic/partition.
#[derive(Debug)]
pub struct Segment {
pub stream_id: u32,
pub topic_id: u32,
pub partition_id: u32,
/// Offset of the first message; also used to derive the file paths.
pub start_offset: u64,
pub start_timestamp: u64, // first message timestamp
pub end_offset: u64,
pub end_timestamp: u64, // last message timestamp
/// Highest known offset; recovered on load as `start_offset` plus the last
/// index entry's relative offset.
pub current_offset: u64,
pub index_path: String,
pub log_path: String,
/// Current log file size; mirrored from the atomic `log_size_bytes` on load.
pub size_bytes: IggyByteSize,
pub last_index_position: u32,
pub max_size_bytes: IggyByteSize,
// Size/message-count counters shared with the parent entities; incremented
// in `load_from_disk` and decremented in `delete`.
pub size_of_parent_stream: Arc<AtomicU64>,
pub size_of_parent_topic: Arc<AtomicU64>,
pub size_of_parent_partition: Arc<AtomicU64>,
pub messages_count_of_parent_stream: Arc<AtomicU64>,
pub messages_count_of_parent_topic: Arc<AtomicU64>,
pub messages_count_of_parent_partition: Arc<AtomicU64>,
/// Set once the segment is full (see `is_full`); closed segments are no
/// longer written to.
pub is_closed: bool,
pub(super) log_writer: Option<SegmentLogWriter>,
pub(super) log_reader: Option<SegmentLogReader>,
pub(super) index_writer: Option<SegmentIndexWriter>,
pub(super) index_reader: Option<SegmentIndexReader>,
pub message_expiry: IggyExpiry,
pub unsaved_messages: Option<BatchAccumulator>,
pub config: Arc<SystemConfig>,
/// In-memory index cache; `Some` only when `config.segment.cache_indexes`
/// is enabled.
pub indexes: Option<Vec<Index>>,
pub(super) log_size_bytes: Arc<AtomicU64>,
pub(super) index_size_bytes: Arc<AtomicU64>,
}
impl Segment {
/// Build a new, empty, in-memory segment. No files are touched here; I/O
/// handles are created later via `persist`/`initialize_writing`.
#[allow(clippy::too_many_arguments)]
pub fn create(
    stream_id: u32,
    topic_id: u32,
    partition_id: u32,
    start_offset: u64,
    config: Arc<SystemConfig>,
    message_expiry: IggyExpiry,
    size_of_parent_stream: Arc<AtomicU64>,
    size_of_parent_topic: Arc<AtomicU64>,
    size_of_parent_partition: Arc<AtomicU64>,
    messages_count_of_parent_stream: Arc<AtomicU64>,
    messages_count_of_parent_topic: Arc<AtomicU64>,
    messages_count_of_parent_partition: Arc<AtomicU64>,
) -> Segment {
    let base_path = config.get_segment_path(stream_id, topic_id, partition_id, start_offset);
    let log_path = Self::get_log_path(&base_path);
    let index_path = Self::get_index_path(&base_path);
    // `ServerDefault` is resolved into the concrete expiry from the config.
    let message_expiry = if matches!(message_expiry, IggyExpiry::ServerDefault) {
        config.segment.message_expiry
    } else {
        message_expiry
    };
    // The in-memory index cache is only allocated when enabled in the config.
    let indexes = config.segment.cache_indexes.then(Vec::new);
    Segment {
        stream_id,
        topic_id,
        partition_id,
        start_offset,
        start_timestamp: IggyTimestamp::now().as_micros(),
        end_offset: 0,
        end_timestamp: IggyTimestamp::now().as_micros(),
        current_offset: start_offset,
        log_path,
        index_path,
        size_bytes: IggyByteSize::from(0),
        last_index_position: 0,
        max_size_bytes: config.segment.size,
        message_expiry,
        indexes,
        unsaved_messages: None,
        is_closed: false,
        log_writer: None,
        log_reader: None,
        index_writer: None,
        index_reader: None,
        size_of_parent_stream,
        size_of_parent_topic,
        size_of_parent_partition,
        messages_count_of_parent_stream,
        messages_count_of_parent_topic,
        messages_count_of_parent_partition,
        config,
        log_size_bytes: Arc::new(AtomicU64::new(0)),
        index_size_bytes: Arc::new(AtomicU64::new(0)),
    }
}
/// Load the segment state from disk.
///
/// Opens reader/writer handles if they are missing, reads the on-disk log
/// size, rebuilds `current_offset` from the stored indexes, and adds the
/// segment's size and message count to the parent counters.
pub async fn load_from_disk(&mut self) -> Result<(), IggyError> {
info!(
"Loading segment from disk: log_path: {}, index_path: {}",
self.log_path, self.index_path
);
// (Re)initialize both writing and reading handles if either reader is missing.
if self.log_reader.is_none() || self.index_reader.is_none() {
self.initialize_writing().await?;
self.initialize_reading().await?;
}
let log_size_bytes = self.log_size_bytes.load(Ordering::Acquire);
info!("Log file size: {}", IggyByteSize::from(log_size_bytes));
// TODO(hubcio): in future, remove size_bytes and use only atomic log_size_bytes everywhere
self.size_bytes = IggyByteSize::from(log_size_bytes);
// NOTE(review): truncating u64 -> u32 cast; assumes a single segment's log
// never exceeds u32::MAX bytes - confirm against the configured segment size.
self.last_index_position = log_size_bytes as _;
self.indexes = Some(
self.index_reader
.as_ref()
.unwrap()
.load_all_indexes_impl()
.await
.with_error_context(|error| format!("Failed to load indexes for {self}. {error}"))
.map_err(|_| IggyError::CannotReadFile)?,
);
// Offset of the last indexed message, relative to the segment start.
let last_index_offset = if self.indexes.as_ref().unwrap().is_empty() {
0_u64
} else {
self.indexes.as_ref().unwrap().last().unwrap().offset as u64
};
self.current_offset = self.start_offset + last_index_offset;
info!("Loaded {} indexes for segment with start offset: {} and partition with ID: {} for topic with ID: {} and stream with ID: {}.",
self.indexes.as_ref().unwrap().len(),
self.start_offset,
self.partition_id,
self.topic_id,
self.stream_id);
// The indexes were only needed to recover the offsets; drop them again
// when the index cache is disabled.
if !self.config.segment.cache_indexes {
self.indexes = None;
}
if self.is_full().await {
self.is_closed = true;
}
let messages_count = self.get_messages_count();
info!(
"Loaded segment with log file of size {} ({} messages) for start offset {}, current offset: {}, and partition with ID: {} for topic with ID: {} and stream with ID: {}.",
IggyByteSize::from(log_size_bytes),
messages_count,
self.start_offset,
self.current_offset,
self.partition_id,
self.topic_id,
self.stream_id
);
// Propagate this segment's size and message count to the parent
// stream/topic/partition counters (reversed in `delete`).
self.size_of_parent_stream
.fetch_add(log_size_bytes, Ordering::SeqCst);
self.size_of_parent_topic
.fetch_add(log_size_bytes, Ordering::SeqCst);
self.size_of_parent_partition
.fetch_add(log_size_bytes, Ordering::SeqCst);
self.messages_count_of_parent_stream
.fetch_add(messages_count, Ordering::SeqCst);
self.messages_count_of_parent_topic
.fetch_add(messages_count, Ordering::SeqCst);
self.messages_count_of_parent_partition
.fetch_add(messages_count, Ordering::SeqCst);
Ok(())
}
/// Save the segment state to disk.
///
/// Initializes the writer and reader handles for the segment's log and
/// index files; fails if either initialization fails. No message data is
/// flushed here.
pub async fn persist(&mut self) -> Result<(), IggyError> {
info!("Saving segment with start offset: {} for partition with ID: {} for topic with ID: {} and stream with ID: {}",
self.start_offset, self.partition_id, self.topic_id, self.stream_id);
self.initialize_writing().await?;
self.initialize_reading().await?;
info!("Saved segment log file with start offset: {} for partition with ID: {} for topic with ID: {} and stream with ID: {}",
self.start_offset, self.partition_id, self.topic_id, self.stream_id);
Ok(())
}
/// Construct the log and index writers and store them on the segment.
pub async fn initialize_writing(&mut self) -> Result<(), IggyError> {
    // TODO(hubcio): consider splitting enforce_fsync for index/log to separate entries in config
    let fsync = self.config.partition.enforce_fsync;
    let log_writer = SegmentLogWriter::new(
        &self.log_path,
        self.log_size_bytes.clone(),
        fsync,
        self.config.segment.server_confirmation,
        self.config.state.max_file_operation_retries,
        self.config.state.retry_delay,
    )
    .await?;
    let index_writer =
        SegmentIndexWriter::new(&self.index_path, self.index_size_bytes.clone(), fsync).await?;
    self.log_writer = Some(log_writer);
    self.index_writer = Some(index_writer);
    Ok(())
}
/// Construct the log and index readers and store them on the segment.
pub async fn initialize_reading(&mut self) -> Result<(), IggyError> {
    // TODO(hubcio): there is no need to store open fd for reader if we have index cache enabled
    let (log_reader, index_reader) = (
        SegmentLogReader::new(&self.log_path, self.log_size_bytes.clone()).await?,
        SegmentIndexReader::new(&self.index_path, self.index_size_bytes.clone()).await?,
    );
    self.log_reader = Some(log_reader);
    self.index_reader = Some(index_reader);
    Ok(())
}
/// A segment is full once its log reached the configured maximum size, or
/// once it has expired.
pub async fn is_full(&self) -> bool {
    // Size check first: the expiry check may hit the log reader.
    self.size_bytes >= self.max_size_bytes || self.is_expired(IggyTimestamp::now()).await
}
/// Whether the segment's newest message is older than the configured expiry.
///
/// Only closed segments can expire; `NeverExpire` and `ServerDefault`
/// never do. Any failure to read the last message counts as "not expired".
pub async fn is_expired(&self, now: IggyTimestamp) -> bool {
    if !self.is_closed {
        return false;
    }
    let IggyExpiry::ExpireDuration(expiry) = self.message_expiry else {
        return false;
    };
    let Ok(messages) = self.get_messages_by_offset(self.current_offset, 1).await else {
        return false;
    };
    match messages.first() {
        Some(last_message) => last_message.timestamp + expiry.as_micros() <= now.as_micros(),
        None => false,
    }
}
/// Drop the log and index readers, releasing whatever resources they hold.
pub async fn shutdown_reading(&mut self) {
    drop(self.log_reader.take());
    drop(self.index_reader.take());
}
/// Tear down the writers: each one is fsynced and released on a background
/// task so shutdown does not block on disk I/O.
pub async fn shutdown_writing(&mut self) {
    match self.log_writer.take() {
        Some(writer) => {
            tokio::spawn(async move {
                let _ = writer.fsync().await;
                writer.shutdown_persister_task().await;
            });
        }
        None => {
            warn!(
                "Log writer already closed when calling close() for {}",
                self
            );
        }
    }
    match self.index_writer.take() {
        Some(writer) => {
            tokio::spawn(async move {
                let _ = writer.fsync().await;
                drop(writer)
            });
        }
        None => warn!("Index writer already closed when calling close()"),
    }
}
/// Delete the segment's log and index files and subtract its size and
/// message count from the parent stream/topic/partition counters.
pub async fn delete(&mut self) -> Result<(), IggyError> {
let segment_size = self.size_bytes;
let segment_count_of_messages = self.get_messages_count();
info!(
"Deleting segment of size {segment_size} with start offset: {} for partition with ID: {} for stream with ID: {} and topic with ID: {}...",
self.start_offset, self.partition_id, self.stream_id, self.topic_id,
);
self.shutdown_reading().await;
// Writers only need shutting down while the segment is still open for appends.
if !self.is_closed {
self.shutdown_writing().await;
}
// File removal is best-effort: errors are annotated with context and then
// discarded, so the counter updates below always run.
let _ = remove_file(&self.log_path)
.await
.with_error_context(|error| {
format!("Failed to delete log file: {}. {error}", self.log_path)
});
let _ = remove_file(&self.index_path)
.await
.with_error_context(|error| {
format!("Failed to delete index file: {}. {error}", self.index_path)
});
// Reverse the additions made in `load_from_disk`.
let segment_size_bytes = self.size_bytes.as_bytes_u64();
self.size_of_parent_stream
.fetch_sub(segment_size_bytes, Ordering::SeqCst);
self.size_of_parent_topic
.fetch_sub(segment_size_bytes, Ordering::SeqCst);
self.size_of_parent_partition
.fetch_sub(segment_size_bytes, Ordering::SeqCst);
self.messages_count_of_parent_stream
.fetch_sub(segment_count_of_messages, Ordering::SeqCst);
self.messages_count_of_parent_topic
.fetch_sub(segment_count_of_messages, Ordering::SeqCst);
self.messages_count_of_parent_partition
.fetch_sub(segment_count_of_messages, Ordering::SeqCst);
info!(
"Deleted segment of size {segment_size} with start offset: {} for partition with ID: {} for stream with ID: {} and topic with ID: {}.",
self.start_offset, self.partition_id, self.stream_id, self.topic_id,
);
Ok(())
}
/// Full path of the segment's log file for the given base path.
fn get_log_path(path: &str) -> String {
    format!("{path}.{LOG_EXTENSION}")
}
/// Full path of the segment's index file for the given base path.
fn get_index_path(path: &str) -> String {
    format!("{path}.{INDEX_EXTENSION}")
}
}
// Human-readable one-line summary, used by the log/warn messages above.
impl std::fmt::Display for Segment {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Segment {{ stream ID: {}, topic ID: {}, partition_id: {}, start_offset: {}, end_offset: {}, current_offset: {}, size_bytes: {}, last_index_position: {}, max_size_bytes: {}, closed: {} }}",
self.stream_id,
self.topic_id,
self.partition_id,
self.start_offset,
self.end_offset,
self.current_offset,
self.size_bytes,
self.last_index_position,
self.max_size_bytes,
self.is_closed
)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::configs::system::SegmentConfig;
    use iggy::utils::duration::IggyDuration;

    /// Six zeroed counters matching `Segment::create`'s parent-counter
    /// parameters, in declaration order.
    fn parent_counters() -> [Arc<AtomicU64>; 6] {
        std::array::from_fn(|_| Arc::new(AtomicU64::new(0)))
    }

    #[tokio::test]
    async fn should_be_created_given_valid_parameters() {
        let (stream_id, topic_id, partition_id) = (1, 2, 3);
        let start_offset = 0;
        let config = Arc::new(SystemConfig::default());
        let path = config.get_segment_path(stream_id, topic_id, partition_id, start_offset);
        let log_path = Segment::get_log_path(&path);
        let index_path = Segment::get_index_path(&path);
        let message_expiry = IggyExpiry::ExpireDuration(IggyDuration::from(10));
        let [stream_size, topic_size, partition_size, stream_count, topic_count, partition_count] =
            parent_counters();
        let segment = Segment::create(
            stream_id,
            topic_id,
            partition_id,
            start_offset,
            config,
            message_expiry,
            stream_size,
            topic_size,
            partition_size,
            stream_count,
            topic_count,
            partition_count,
        );
        assert_eq!(segment.stream_id, stream_id);
        assert_eq!(segment.topic_id, topic_id);
        assert_eq!(segment.partition_id, partition_id);
        assert_eq!(segment.start_offset, start_offset);
        assert_eq!(segment.current_offset, 0);
        assert_eq!(segment.end_offset, 0);
        assert_eq!(segment.size_bytes, 0);
        assert_eq!(segment.log_path, log_path);
        assert_eq!(segment.index_path, index_path);
        assert_eq!(segment.message_expiry, message_expiry);
        assert!(segment.unsaved_messages.is_none());
        assert!(segment.indexes.is_some());
        assert!(!segment.is_closed);
        assert!(!segment.is_full().await);
    }

    #[tokio::test]
    async fn should_not_initialize_indexes_cache_when_disabled() {
        let config = Arc::new(SystemConfig {
            segment: SegmentConfig {
                cache_indexes: false,
                ..Default::default()
            },
            ..Default::default()
        });
        let [stream_size, topic_size, partition_size, stream_count, topic_count, partition_count] =
            parent_counters();
        let segment = Segment::create(
            1,
            2,
            3,
            0,
            config,
            IggyExpiry::NeverExpire,
            stream_size,
            topic_size,
            partition_size,
            stream_count,
            topic_count,
            partition_count,
        );
        assert!(segment.indexes.is_none());
    }
}