blob: 8e0998760683ade5da63c5f99029a673a15a0cd6 [file] [log] [blame]
use crate::streaming::sizeable::Sizeable;
use bytes::{BufMut, Bytes, BytesMut};
use iggy::bytes_serializable::BytesSerializable;
use iggy::error::IggyError;
use iggy::models::messages::PolledMessage;
use iggy::utils::checksum;
use iggy::{messages::send_messages::Message, models::messages::MessageState};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
// It's the same as PolledMessages from Iggy models, but with the Arc<Message> instead of Message.
#[derive(Debug, Serialize, Deserialize)]
pub struct PolledMessages {
pub partition_id: u32,
pub current_offset: u64,
pub messages: Vec<Arc<PolledMessage>>,
}
#[derive(Debug)]
pub struct RetainedMessage {
pub id: u128,
pub offset: u64,
pub timestamp: u64,
pub checksum: u32,
pub message_state: MessageState,
pub headers: Option<Bytes>,
pub payload: Bytes,
}
impl TryFrom<RetainedMessage> for PolledMessage {
type Error = IggyError;
fn try_from(value: RetainedMessage) -> Result<Self, Self::Error> {
let headers = value.headers.map(HashMap::from_bytes).transpose()?;
let messages = PolledMessage {
offset: value.offset,
state: value.message_state,
timestamp: value.timestamp,
id: value.id,
checksum: value.checksum,
headers,
length: value.payload.len() as u32,
payload: value.payload,
};
Ok(messages)
}
}
impl RetainedMessage {
pub fn new(offset: u64, timestamp: u64, message: Message) -> Self {
RetainedMessage {
offset,
timestamp,
checksum: checksum::calculate(&message.payload),
message_state: MessageState::Available,
id: message.id,
payload: message.payload.clone(),
headers: message.headers.as_ref().map(|headers| headers.to_bytes()),
}
}
pub fn extend(&self, bytes: &mut BytesMut) {
let length = self.get_size_bytes();
let id = self.id;
let offset = self.offset;
let timestamp = self.timestamp;
let payload = self.payload.clone();
let checksum = self.checksum;
let message_state = self.message_state;
let headers = &self.headers;
bytes.put_u32_le(length);
bytes.put_u64_le(offset);
bytes.put_u8(message_state.as_code());
bytes.put_u64_le(timestamp);
bytes.put_u128_le(id);
bytes.put_u32_le(checksum);
if let Some(headers) = headers {
#[allow(clippy::cast_possible_truncation)]
bytes.put_u32_le(headers.len() as u32);
bytes.put_slice(headers);
} else {
bytes.put_u32_le(0u32);
}
bytes.put_slice(&payload);
}
pub fn try_from_bytes(bytes: Bytes) -> Result<Self, IggyError> {
let offset = u64::from_le_bytes(bytes[..8].try_into()?);
let message_state = MessageState::from_code(bytes[8])?;
let timestamp = u64::from_le_bytes(bytes[9..17].try_into()?);
let id = u128::from_le_bytes(bytes[17..33].try_into()?);
let checksum = u32::from_le_bytes(bytes[33..37].try_into()?);
let headers_length = u32::from_le_bytes(bytes[37..41].try_into()?);
let headers = if headers_length > 0 {
Some(bytes.slice(41..41 + headers_length as usize))
} else {
None
};
let position = 41 + headers_length as usize;
let payload = bytes.slice(position..);
Ok(RetainedMessage {
id,
offset,
timestamp,
checksum,
message_state,
headers,
payload,
})
}
}
impl Sizeable for RetainedMessage {
fn get_size_bytes(&self) -> u32 {
let headers_len = self
.headers
.as_ref()
.map(|h| 4 + h.len() as u32)
.unwrap_or(4);
16 + 8 + 8 + 4 + 1 + headers_len + self.payload.len() as u32
}
}