blob: 8985f138a3146a388097341d1775acb53ba32ea3 [file] [log] [blame]
use std::ops::Deref;
use crate::streaming::batching::batch_filter::BatchItemizer;
use crate::streaming::batching::iterator::IntoMessagesIterator;
use crate::streaming::models::messages::RetainedMessage;
use bytes::{BufMut, Bytes, BytesMut};
use iggy::error::IggyError::{
self, MissingBaseOffsetRetainedMessageBatch, MissingLastOffsetDeltaRetainedMessageBatch,
MissingLengthRetainedMessageBatch, MissingMaxTimestampRetainedMessageBatch,
MissingPayloadRetainedMessageBatch,
};
use crate::streaming::sizeable::Sizeable;
#[derive(Debug, Clone)]
pub struct RetainedMessageBatch {
pub base_offset: u64,
pub last_offset_delta: u32,
pub max_timestamp: u64,
pub length: u32,
pub bytes: Bytes,
}
impl RetainedMessageBatch {
pub fn new(
base_offset: u64,
last_offset_delta: u32,
max_timestamp: u64,
length: u32,
bytes: Bytes,
) -> Self {
RetainedMessageBatch {
base_offset,
last_offset_delta,
max_timestamp,
length,
bytes,
}
}
pub fn builder() -> RetainedMessageBatchBuilder {
RetainedMessageBatchBuilder::new()
}
pub fn is_contained_or_overlapping_within_offset_range(
&self,
start_offset: u64,
end_offset: u64,
) -> bool {
(self.base_offset <= end_offset && self.get_last_offset() >= end_offset)
|| (self.base_offset <= start_offset && self.get_last_offset() <= end_offset)
|| (self.base_offset <= end_offset && self.get_last_offset() >= start_offset)
}
pub fn get_last_offset(&self) -> u64 {
self.base_offset + self.last_offset_delta as u64
}
pub fn extend(&self, bytes: &mut BytesMut) {
bytes.put_u64_le(self.base_offset);
bytes.put_u32_le(self.length);
bytes.put_u32_le(self.last_offset_delta);
bytes.put_u64_le(self.max_timestamp);
bytes.put_slice(&self.bytes);
}
}
impl<'a, T, U> BatchItemizer<RetainedMessage, &'a U, T> for T
where
T: Iterator<Item = &'a U>,
&'a U: IntoMessagesIterator<Item = RetainedMessage>,
{
fn to_messages(self) -> Vec<RetainedMessage> {
self.flat_map(|batch| batch.into_messages_iter().collect::<Vec<_>>())
.collect()
}
fn to_messages_with_filter<F>(self, messages_count: usize, f: &F) -> Vec<RetainedMessage>
where
F: Fn(&RetainedMessage) -> bool,
{
self.fold(Vec::with_capacity(messages_count), |mut messages, batch| {
messages.extend(batch.into_messages_iter().filter(f));
messages
})
}
}
impl Sizeable for RetainedMessageBatch {
fn get_size_bytes(&self) -> u32 {
8 + 4 + 8 + 4 + self.length
}
}
impl<T> Sizeable for T
where
T: Deref<Target = RetainedMessageBatch>,
{
fn get_size_bytes(&self) -> u32 {
8 + 4 + 8 + 4 + self.length
}
}
#[derive(Debug, Clone)]
pub struct RetainedMessageBatchBuilder {
base_offset: Option<u64>,
last_offset_delta: Option<u32>,
max_timestamp: Option<u64>,
length: Option<u32>,
payload: Option<Bytes>,
}
impl RetainedMessageBatchBuilder {
fn new() -> Self {
RetainedMessageBatchBuilder {
base_offset: None,
last_offset_delta: None,
max_timestamp: None,
length: None,
payload: None,
}
}
pub fn base_offset(mut self, base_offset: u64) -> Self {
self.base_offset = Some(base_offset);
self
}
pub fn last_offset_delta(mut self, last_offset_delta: u32) -> Self {
self.last_offset_delta = Some(last_offset_delta);
self
}
pub fn max_timestamp(mut self, max_timestamp: u64) -> Self {
self.max_timestamp = Some(max_timestamp);
self
}
pub fn length(mut self, length: u32) -> Self {
self.length = Some(length);
self
}
pub fn payload(mut self, payload: Bytes) -> Self {
self.payload = Some(payload);
self
}
pub fn build(self) -> Result<RetainedMessageBatch, IggyError> {
let base_offset = self
.base_offset
.ok_or(MissingBaseOffsetRetainedMessageBatch)?;
let last_offset_delta = self
.last_offset_delta
.ok_or(MissingLastOffsetDeltaRetainedMessageBatch)?;
let max_timestamp = self
.max_timestamp
.ok_or(MissingMaxTimestampRetainedMessageBatch)?;
let length = self.length.ok_or(MissingLengthRetainedMessageBatch)?;
let bytes = self.payload.ok_or(MissingPayloadRetainedMessageBatch)?;
Ok(RetainedMessageBatch {
base_offset,
last_offset_delta,
max_timestamp,
length,
bytes,
})
}
}