use crate::streaming::batching::iterator::IntoMessagesIterator;
use crate::streaming::batching::message_batch::RetainedMessageBatch;
use crate::streaming::models::messages::RetainedMessage;
use crate::streaming::persistence::persister::Persister;
use crate::streaming::segments::index::{Index, IndexRange};
use crate::streaming::segments::segment::Segment;
use crate::streaming::segments::time_index::TimeIndex;
use crate::streaming::sizeable::Sizeable;
use crate::streaming::storage::SegmentStorage;
use crate::streaming::utils::file;
use crate::streaming::utils::head_tail_buf::HeadTailBuffer;
use anyhow::Context;
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use iggy::error::IggyError;
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::checksum;
use std::io::SeekFrom;
use std::path::Path;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncSeekExt, BufReader};
use tracing::{error, info, trace, warn};
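// An index entry is a u32 relative offset plus a u32 position (8 bytes);
// a time index entry is a u32 relative offset plus a u64 timestamp (12 bytes).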
const EMPTY_INDEXES: Vec<Index> = vec![];
const EMPTY_TIME_INDEXES: Vec<TimeIndex> = vec![];
pub(crate) const INDEX_SIZE: u32 = 8;
pub(crate) const TIME_INDEX_SIZE: u32 = 12;
const BUF_READER_CAPACITY_BYTES: usize = 512 * 1000;
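/// Segment storage backed by plain files on disk, with all writes delegated
/// to the injected `Persister`.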
#[derive(Debug)]
pub struct FileSegmentStorage {
persister: Arc<dyn Persister>,
}
impl FileSegmentStorage {
pub fn new(persister: Arc<dyn Persister>) -> Self {
Self { persister }
}
}
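// SAFETY: the only field is an `Arc<dyn Persister>`; these impls assert that
// the boxed persister may be shared and sent across threads.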
unsafe impl Send for FileSegmentStorage {}
unsafe impl Sync for FileSegmentStorage {}
#[async_trait]
impl SegmentStorage for FileSegmentStorage {
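    /// Loads the segment from disk: reads the log file size, optionally caches
    /// indexes and time indexes (per config), restores `current_offset` from the
    /// last time index, and adds the segment's size and message count to the
    /// counters of the parent stream, topic and partition.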
async fn load(&self, segment: &mut Segment) -> Result<(), IggyError> {
info!(
"Loading segment from disk for start offset: {} and partition with ID: {} for topic with ID: {} and stream with ID: {} ...",
segment.start_offset, segment.partition_id, segment.topic_id, segment.stream_id
);
let log_file = file::open(&segment.log_path).await?;
        let file_size = log_file.metadata().await?.len();
segment.size_bytes = file_size as u32;
if segment.config.segment.cache_indexes {
segment.indexes = Some(segment.storage.segment.load_all_indexes(segment).await?);
info!(
"Loaded {} indexes for segment with start offset: {} and partition with ID: {} for topic with ID: {} and stream with ID: {}.",
segment.indexes.as_ref().unwrap().len(),
segment.start_offset,
segment.partition_id,
segment.topic_id,
segment.stream_id
);
}
        if segment.config.segment.cache_time_indexes {
            let time_indexes = self.load_all_time_indexes(segment).await?;
            let time_indexes_count = time_indexes.len();
            if !time_indexes.is_empty() {
                let last_index = time_indexes.last().unwrap();
                segment.current_offset = segment.start_offset + last_index.relative_offset as u64;
                segment.time_indexes = Some(time_indexes);
            }
            info!(
                "Loaded {} time indexes for segment with start offset: {} and partition with ID: {} for topic with ID: {} and stream with ID: {}.",
                time_indexes_count,
                segment.start_offset,
                segment.partition_id,
                segment.topic_id,
                segment.stream_id
            );
} else {
let last_time_index = self.load_last_time_index(segment).await?;
if let Some(last_index) = last_time_index {
segment.current_offset = segment.start_offset + last_index.relative_offset as u64;
info!(
"Loaded last time index for segment with start offset: {} and partition with ID: {} for topic with ID: {} and stream with ID: {}.",
segment.start_offset,
segment.partition_id,
segment.topic_id,
segment.stream_id
);
}
}
if segment.is_full().await {
segment.is_closed = true;
}
let messages_count = segment.get_messages_count();
info!(
"Loaded segment log file of size {} for start offset {}, current offset: {}, and partition with ID: {} for topic with ID: {} and stream with ID: {}.",
IggyByteSize::from(file_size), segment.start_offset, segment.current_offset, segment.partition_id, segment.topic_id, segment.stream_id
);
segment
.size_of_parent_stream
.fetch_add(file_size, Ordering::SeqCst);
segment
.size_of_parent_topic
.fetch_add(file_size, Ordering::SeqCst);
segment
.size_of_parent_partition
.fetch_add(file_size, Ordering::SeqCst);
segment
.messages_count_of_parent_stream
.fetch_add(messages_count, Ordering::SeqCst);
segment
.messages_count_of_parent_topic
.fetch_add(messages_count, Ordering::SeqCst);
segment
.messages_count_of_parent_partition
.fetch_add(messages_count, Ordering::SeqCst);
Ok(())
}
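    /// Creates the segment's empty log, index and time index files if they do
    /// not exist yet.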
async fn save(&self, segment: &Segment) -> Result<(), IggyError> {
info!("Saving segment with start offset: {} for partition with ID: {} for topic with ID: {} and stream with ID: {}",
segment.start_offset, segment.partition_id, segment.topic_id, segment.stream_id);
if !Path::new(&segment.log_path).exists()
&& self
.persister
.overwrite(&segment.log_path, &[])
.await
.is_err()
{
return Err(IggyError::CannotCreateSegmentLogFile(
segment.log_path.clone(),
));
}
if !Path::new(&segment.time_index_path).exists()
&& self
.persister
.overwrite(&segment.time_index_path, &[])
.await
.is_err()
{
return Err(IggyError::CannotCreateSegmentTimeIndexFile(
segment.time_index_path.clone(),
));
}
if !Path::new(&segment.index_path).exists()
&& self
.persister
.overwrite(&segment.index_path, &[])
.await
.is_err()
{
return Err(IggyError::CannotCreateSegmentIndexFile(
segment.index_path.clone(),
));
}
info!("Saved segment log file with start offset: {} for partition with ID: {} for topic with ID: {} and stream with ID: {}",
segment.start_offset, segment.partition_id, segment.topic_id, segment.stream_id);
Ok(())
}
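    /// Deletes the segment's log, index and time index files and subtracts its
    /// size and message count from the parent stream, topic and partition.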
async fn delete(&self, segment: &Segment) -> Result<(), IggyError> {
let segment_size = segment.size_bytes;
let segment_count_of_messages = segment.get_messages_count();
info!(
"Deleting segment of size {segment_size} with start offset: {} for partition with ID: {} for stream with ID: {} and topic with ID: {}...",
segment.start_offset, segment.partition_id, segment.stream_id, segment.topic_id,
);
self.persister.delete(&segment.log_path).await?;
self.persister.delete(&segment.index_path).await?;
self.persister.delete(&segment.time_index_path).await?;
segment
.size_of_parent_stream
.fetch_sub(segment.size_bytes as u64, Ordering::SeqCst);
segment
.size_of_parent_topic
.fetch_sub(segment.size_bytes as u64, Ordering::SeqCst);
segment
.size_of_parent_partition
.fetch_sub(segment.size_bytes as u64, Ordering::SeqCst);
segment
.messages_count_of_parent_stream
.fetch_sub(segment_count_of_messages, Ordering::SeqCst);
segment
.messages_count_of_parent_topic
.fetch_sub(segment_count_of_messages, Ordering::SeqCst);
segment
.messages_count_of_parent_partition
.fetch_sub(segment_count_of_messages, Ordering::SeqCst);
info!(
"Deleted segment of size {segment_size} with start offset: {} for partition with ID: {} for stream with ID: {} and topic with ID: {}.",
segment.start_offset, segment.partition_id, segment.stream_id, segment.topic_id,
);
Ok(())
}
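    /// Loads all message batches within the given index range from the log file.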
async fn load_message_batches(
&self,
segment: &Segment,
index_range: &IndexRange,
) -> Result<Vec<RetainedMessageBatch>, IggyError> {
let mut batches = Vec::new();
load_batches_by_range(segment, index_range, |batch| {
batches.push(batch);
Ok(())
})
.await?;
trace!("Loaded {} message batches from disk.", batches.len());
Ok(batches)
}
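    /// Loads the newest batches from the tail of the log file until roughly
    /// `size_bytes` of batches have been accumulated.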
async fn load_newest_batches_by_size(
&self,
segment: &Segment,
size_bytes: u64,
) -> Result<Vec<RetainedMessageBatch>, IggyError> {
let mut batches = Vec::new();
let mut total_size_bytes = 0;
load_messages_by_size(segment, size_bytes, |batch| {
total_size_bytes += batch.get_size_bytes() as u64;
batches.push(batch);
Ok(())
})
.await?;
        let batches_count = batches.len();
        trace!(
            "Loaded {} newest message batches of total size {} bytes from disk.",
            batches_count,
            total_size_bytes
        );
Ok(batches)
}
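    /// Serializes the batches into a single buffer and appends it to the log
    /// file, returning the number of bytes written.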
async fn save_batches(
&self,
segment: &Segment,
batches: &[Arc<RetainedMessageBatch>],
) -> Result<u32, IggyError> {
let messages_size = batches.iter().map(|batch| batch.get_size_bytes()).sum();
let mut bytes = BytesMut::with_capacity(messages_size as usize);
for batch in batches {
batch.extend(&mut bytes);
}
if let Err(err) = self
.persister
.append(&segment.log_path, &bytes)
.await
.with_context(|| format!("Failed to save messages to segment: {}", segment.log_path))
{
return Err(IggyError::CannotSaveMessagesToSegment(err));
}
Ok(messages_size)
}
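    /// Reads the whole log file and collects the ID of every retained message.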
async fn load_message_ids(&self, segment: &Segment) -> Result<Vec<u128>, IggyError> {
let mut message_ids = Vec::new();
load_batches_by_range(segment, &IndexRange::max_range(), |batch| {
message_ids.extend(
batch
.into_messages_iter()
.map(|msg: RetainedMessage| msg.id),
);
Ok(())
})
.await?;
trace!("Loaded {} message IDs from disk.", message_ids.len());
Ok(message_ids)
}
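    /// Reads the whole log file and verifies the checksum of every message,
    /// failing on the first mismatch.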
async fn load_checksums(&self, segment: &Segment) -> Result<(), IggyError> {
load_batches_by_range(segment, &IndexRange::max_range(), |batch| {
for message in batch.into_messages_iter() {
let calculated_checksum = checksum::calculate(&message.payload);
trace!(
"Loaded message for offset: {}, checksum: {}, expected: {}",
message.offset,
calculated_checksum,
message.checksum
);
if calculated_checksum != message.checksum {
return Err(IggyError::InvalidMessageChecksum(
calculated_checksum,
message.checksum,
message.offset,
));
}
}
Ok(())
})
.await?;
Ok(())
}
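    /// Loads all index entries (relative offset and position pairs) from the
    /// index file.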
async fn load_all_indexes(&self, segment: &Segment) -> Result<Vec<Index>, IggyError> {
trace!("Loading indexes from file...");
let file = file::open(&segment.index_path).await?;
let file_size = file.metadata().await?.len() as usize;
if file_size == 0 {
trace!("Index file is empty.");
return Ok(EMPTY_INDEXES);
}
let indexes_count = file_size / INDEX_SIZE as usize;
let mut indexes = Vec::with_capacity(indexes_count);
let mut reader = BufReader::with_capacity(BUF_READER_CAPACITY_BYTES, file);
for idx_num in 0..indexes_count {
let offset = reader.read_u32_le().await.map_err(|error| {
error!(
"Cannot read offset from index file for index number: {}. Error: {}",
idx_num, &error
);
error
})?;
let position = reader.read_u32_le().await.map_err(|error| {
error!(
"Cannot read position from index file for offset: {}. Error: {}",
offset, &error
);
error
})?;
indexes.push(Index {
relative_offset: offset,
position,
});
}
if indexes.len() != indexes_count {
error!(
"Loaded {} indexes from disk, expected {}.",
indexes.len(),
indexes_count
);
}
trace!("Loaded {} indexes from file.", indexes_count);
Ok(indexes)
}
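    /// Scans the index file for the entries covering the given offset range and
    /// returns the matching start and end positions in the log file, or `None`
    /// if the range is invalid or the index file is empty.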
async fn load_index_range(
&self,
segment: &Segment,
index_start_offset: u64,
index_end_offset: u64,
) -> Result<Option<IndexRange>, IggyError> {
trace!(
"Loading index range for offsets: {} to {}, segment starts at: {}",
index_start_offset,
index_end_offset,
segment.start_offset
);
if index_start_offset > index_end_offset {
warn!(
"Index start offset: {} is greater than index end offset: {}.",
index_start_offset, index_end_offset
);
return Ok(None);
}
let file = file::open(&segment.index_path).await?;
let file_length = file.metadata().await?.len() as u32;
if file_length == 0 {
trace!("Index file is empty.");
return Ok(None);
}
trace!("Index file length: {}.", file_length);
let relative_start_offset = (index_start_offset - segment.start_offset) as u32;
let relative_end_offset = (index_end_offset - segment.start_offset) as u32;
trace!(
"Seeking to index range: {}...{}",
relative_start_offset,
relative_end_offset,
);
let mut idx_pred = HeadTailBuffer::new();
let mut index_range = IndexRange::default();
let mut read_bytes = 0;
let mut reader = BufReader::with_capacity(BUF_READER_CAPACITY_BYTES, file);
loop {
let relative_offset = reader.read_u32_le().await?;
let position = reader.read_u32_le().await?;
read_bytes += INDEX_SIZE;
let idx = Index {
relative_offset,
position,
};
idx_pred.push(idx);
if relative_offset >= relative_start_offset {
index_range.start = idx_pred.tail().unwrap_or_default();
}
if relative_offset >= relative_end_offset {
index_range.end = idx;
break;
}
if read_bytes == file_length {
break;
}
}
trace!(
"Loaded index range: {}...{}, position range: {}...{}",
relative_start_offset,
relative_end_offset,
index_range.start.position,
index_range.end.position
);
Ok(Some(index_range))
}
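    /// Appends the segment's unsaved index entries to the index file.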
async fn save_index(&self, segment: &Segment) -> Result<(), IggyError> {
if let Err(err) = self
.persister
.append(&segment.index_path, &segment.unsaved_indexes)
.await
.with_context(|| format!("Failed to save index to segment: {}", segment.index_path))
{
return Err(IggyError::CannotSaveIndexToSegment(err));
}
Ok(())
}
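    /// Scans the time index file for the first entry at or after the given
    /// timestamp. Note that an empty file yields `Some(TimeIndex::default())`,
    /// while reaching the end of the file without a match yields `None`.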
async fn try_load_time_index_for_timestamp(
&self,
segment: &Segment,
timestamp: u64,
) -> Result<Option<TimeIndex>, IggyError> {
trace!("Loading time indexes from file...");
let file = file::open(&segment.time_index_path).await?;
let file_size = file.metadata().await?.len() as usize;
if file_size == 0 {
trace!("Time index file is empty.");
return Ok(Some(TimeIndex::default()));
}
let mut reader = BufReader::with_capacity(BUF_READER_CAPACITY_BYTES, file);
let mut read_bytes = 0;
let mut idx_pred = HeadTailBuffer::new();
loop {
let offset = reader.read_u32_le().await?;
let time = reader.read_u64_le().await?;
let idx = TimeIndex {
relative_offset: offset,
timestamp: time,
};
idx_pred.push(idx);
if time >= timestamp {
return Ok(idx_pred.tail());
}
read_bytes += TIME_INDEX_SIZE as usize;
if read_bytes == file_size {
return Ok(None);
}
}
}
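    /// Loads all time index entries (relative offset and timestamp pairs) from
    /// the time index file.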
async fn load_all_time_indexes(&self, segment: &Segment) -> Result<Vec<TimeIndex>, IggyError> {
trace!("Loading time indexes from file...");
let file = file::open(&segment.time_index_path).await?;
let file_size = file.metadata().await?.len() as usize;
if file_size == 0 {
trace!("Time index file is empty.");
return Ok(EMPTY_TIME_INDEXES);
}
let indexes_count = file_size / TIME_INDEX_SIZE as usize;
let mut indexes = Vec::with_capacity(indexes_count);
let mut reader = BufReader::with_capacity(BUF_READER_CAPACITY_BYTES, file);
        for idx_num in 0..indexes_count {
            let offset = reader.read_u32_le().await.map_err(|error| {
                error!(
                    "Cannot read offset from time index file for index number: {}. Error: {}",
                    idx_num, &error
                );
                error
            })?;
            let timestamp = reader.read_u64_le().await.map_err(|error| {
                error!(
                    "Cannot read timestamp from time index file for offset: {}. Error: {}",
                    offset, &error
                );
                error
            })?;
indexes.push(TimeIndex {
relative_offset: offset,
timestamp,
});
}
if indexes.len() != indexes_count {
error!(
"Loaded {} time indexes from disk, expected {}.",
indexes.len(),
indexes_count
);
}
trace!("Loaded {} time indexes from file.", indexes_count);
Ok(indexes)
}
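    /// Seeks to the last entry of the time index file and reads it, or returns
    /// `None` if the file is empty.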
async fn load_last_time_index(
&self,
segment: &Segment,
) -> Result<Option<TimeIndex>, IggyError> {
trace!("Loading last time index from file...");
let mut file = file::open(&segment.time_index_path).await?;
let file_size = file.metadata().await?.len() as usize;
if file_size == 0 {
trace!("Time index file is empty.");
return Ok(None);
}
let last_index_position = file_size - TIME_INDEX_SIZE as usize;
file.seek(SeekFrom::Start(last_index_position as u64))
.await?;
let index_offset = file.read_u32_le().await?;
let timestamp = file.read_u64_le().await?;
let index = TimeIndex {
relative_offset: index_offset,
timestamp,
};
trace!("Loaded last time index from file: {:?}", index);
Ok(Some(index))
}
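    /// Appends the segment's unsaved timestamps to the time index file.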
async fn save_time_index(&self, segment: &Segment) -> Result<(), IggyError> {
if let Err(err) = self
.persister
.append(&segment.time_index_path, &segment.unsaved_timestamps)
.await
.with_context(|| {
format!(
"Failed to save TimeIndex to segment: {}",
segment.time_index_path
)
})
{
return Err(IggyError::CannotSaveTimeIndexToSegment(err));
}
Ok(())
}
}
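/// Reads batches from the log file between the positions given by the index
/// range and invokes `on_batch` for each one. Each batch on disk starts with a
/// 24-byte header: base offset (u64), length (u32), last offset delta (u32)
/// and max timestamp (u64), all little-endian, followed by the payload.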
async fn load_batches_by_range(
segment: &Segment,
index_range: &IndexRange,
mut on_batch: impl FnMut(RetainedMessageBatch) -> Result<(), IggyError>,
) -> Result<(), IggyError> {
let file = file::open(&segment.log_path).await?;
let file_size = file.metadata().await?.len();
if file_size == 0 {
return Ok(());
}
trace!(
"Loading message batches by index range: {} [{}] - {} [{}], file size: {file_size}",
index_range.start.position,
index_range.start.relative_offset,
index_range.end.position,
index_range.end.relative_offset
);
let mut reader = BufReader::with_capacity(BUF_READER_CAPACITY_BYTES, file);
reader
.seek(SeekFrom::Start(index_range.start.position as u64))
.await?;
let mut read_bytes = index_range.start.position as u64;
let mut last_batch_to_read = false;
while !last_batch_to_read {
let Ok(batch_base_offset) = reader.read_u64_le().await else {
break;
};
let batch_length = reader
.read_u32_le()
.await
.map_err(|_| IggyError::CannotReadBatchLength)?;
let last_offset_delta = reader
.read_u32_le()
.await
.map_err(|_| IggyError::CannotReadLastOffsetDelta)?;
let max_timestamp = reader
.read_u64_le()
.await
.map_err(|_| IggyError::CannotReadMaxTimestamp)?;
let last_offset = batch_base_offset + (last_offset_delta as u64);
let index_last_offset = index_range.end.relative_offset as u64 + segment.start_offset;
let payload_len = batch_length as usize;
let mut payload = BytesMut::with_capacity(payload_len);
payload.put_bytes(0, payload_len);
if let Err(error) = reader.read_exact(&mut payload).await {
warn!(
"Cannot read batch payload for batch with base offset: {batch_base_offset}, last offset delta: {last_offset_delta}, max timestamp: {max_timestamp}, batch length: {batch_length} and payload length: {payload_len}.\nProbably OS hasn't flushed the data yet, try setting `enforce_fsync = true` for partition configuration if this issue occurs again.\n{error}",
);
break;
}
read_bytes += 8 + 4 + 4 + 8 + payload_len as u64;
last_batch_to_read = read_bytes >= file_size || last_offset == index_last_offset;
let batch = RetainedMessageBatch::new(
batch_base_offset,
last_offset_delta,
max_timestamp,
batch_length,
payload.freeze(),
);
on_batch(batch)?;
}
Ok(())
}
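/// Reads the log file from the beginning, skipping batches until the remaining
/// data fits within `size_bytes`, then invokes `on_batch` for each batch in
/// the tail of the file.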
async fn load_messages_by_size(
segment: &Segment,
size_bytes: u64,
mut on_batch: impl FnMut(RetainedMessageBatch) -> Result<(), IggyError>,
) -> Result<(), IggyError> {
let file = file::open(&segment.log_path).await?;
let file_size = file.metadata().await?.len();
if file_size == 0 {
return Ok(());
}
let threshold = file_size.saturating_sub(size_bytes);
let mut accumulated_size: u64 = 0;
let mut reader = BufReader::with_capacity(BUF_READER_CAPACITY_BYTES, file);
loop {
let batch_base_offset = reader
.read_u64_le()
.await
.map_err(|_| IggyError::CannotReadBatchBaseOffset)?;
let batch_length = reader
.read_u32_le()
.await
.map_err(|_| IggyError::CannotReadBatchLength)?;
let last_offset_delta = reader
.read_u32_le()
.await
.map_err(|_| IggyError::CannotReadLastOffsetDelta)?;
let max_timestamp = reader
.read_u64_le()
.await
.map_err(|_| IggyError::CannotReadMaxTimestamp)?;
let payload_len = batch_length as usize;
let mut payload = BytesMut::with_capacity(payload_len);
payload.put_bytes(0, payload_len);
reader
.read_exact(&mut payload)
.await
.map_err(|_| IggyError::CannotReadBatchPayload)?;
let batch = RetainedMessageBatch::new(
batch_base_offset,
last_offset_delta,
max_timestamp,
batch_length,
payload.freeze(),
);
let message_size = batch.get_size_bytes() as u64;
if accumulated_size >= threshold {
on_batch(batch)?;
}
accumulated_size += message_size;
if accumulated_size >= file_size {
break;
}
}
Ok(())
}