blob: 06df7a002925b803997bee203bd305baa27c7eac [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use crate::streaming::batching::batch_filter::BatchItemizer;
use crate::streaming::batching::iterator::IntoMessagesIterator;
use crate::streaming::models::messages::RetainedMessage;
use bytes::Bytes;
use iggy::utils::{byte_size::IggyByteSize, sizeable::Sizeable};
pub const RETAINED_BATCH_HEADER_LEN: u64 = 8 + 8 + 4 + 4;
#[derive(Debug)]
pub struct RetainedMessageBatch {
pub base_offset: u64,
pub last_offset_delta: u32,
pub max_timestamp: u64,
pub length: IggyByteSize,
pub bytes: Bytes,
}
impl RetainedMessageBatch {
pub fn new(
base_offset: u64,
last_offset_delta: u32,
max_timestamp: u64,
length: IggyByteSize,
bytes: Bytes,
) -> Self {
RetainedMessageBatch {
base_offset,
last_offset_delta,
max_timestamp,
length,
bytes,
}
}
pub fn is_contained_or_overlapping_within_offset_range(
&self,
start_offset: u64,
end_offset: u64,
) -> bool {
(self.base_offset <= end_offset && self.get_last_offset() >= end_offset)
|| (self.base_offset <= start_offset && self.get_last_offset() <= end_offset)
|| (self.base_offset <= end_offset && self.get_last_offset() >= start_offset)
}
pub fn get_last_offset(&self) -> u64 {
self.base_offset + self.last_offset_delta as u64
}
pub fn header_as_bytes(&self) -> [u8; 24] {
let mut header: [u8; 24] = [0u8; 24];
header[0..8].copy_from_slice(&self.base_offset.to_le_bytes());
header[8..12].copy_from_slice(&(self.length.as_bytes_u64() as u32).to_le_bytes());
header[12..16].copy_from_slice(&self.last_offset_delta.to_le_bytes());
header[16..24].copy_from_slice(&self.max_timestamp.to_le_bytes());
header
}
}
impl<'a, T, U> BatchItemizer<RetainedMessage, &'a U, T> for T
where
T: Iterator<Item = &'a U>,
&'a U: IntoMessagesIterator<Item = RetainedMessage>,
{
fn to_messages(self) -> Vec<RetainedMessage> {
self.flat_map(|batch| batch.into_messages_iter().collect::<Vec<_>>())
.collect()
}
fn to_messages_with_filter<F>(self, messages_count: usize, f: &F) -> Vec<RetainedMessage>
where
F: Fn(&RetainedMessage) -> bool,
{
self.fold(Vec::with_capacity(messages_count), |mut messages, batch| {
messages.extend(batch.into_messages_iter().filter(f));
messages
})
}
}
impl Sizeable for RetainedMessageBatch {
fn get_size_bytes(&self) -> IggyByteSize {
self.length + RETAINED_BATCH_HEADER_LEN.into()
}
}