blob: 91c6a5644163d82a1362c76e47df8255a4b2a4f6 [file] [log] [blame]
use crate::bytes_serializable::BytesSerializable;
use crate::error::IggyError;
use crate::messages::append_messages;
use crate::models::header;
use crate::models::header::{HeaderKey, HeaderValue};
use crate::sizeable::Sizeable;
use crate::utils::{checksum, timestamp::IggyTimestamp};
use bytes::{BufMut, Bytes};
use serde::{Deserialize, Serialize};
use serde_with::base64::Base64;
use serde_with::serde_as;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::str::FromStr;
use std::sync::Arc;
/// A batch of messages polled from a single partition.
///
/// It consists of the following fields:
/// - `partition_id`: the identifier of the partition the messages belong to.
/// - `current_offset`: the current offset of the partition.
/// - `messages`: the collection of polled messages.
///
/// The struct derives `Serialize`/`Deserialize`, so all three fields take part in
/// serde (de)serialization under their Rust field names.
#[derive(Debug, Serialize, Deserialize)]
pub struct PolledMessages {
/// The identifier of the partition.
pub partition_id: u32,
/// The current offset of the partition.
pub current_offset: u64,
/// The collection of messages.
pub messages: Vec<PolledMessage>,
}
/// A single message polled from the partition.
///
/// It consists of the following fields:
/// - `offset`: the offset of the message.
/// - `state`: the state of the message.
/// - `timestamp`: the timestamp of the message.
/// - `id`: the identifier of the message.
/// - `checksum`: the checksum of the message, can be used to verify the integrity of the message.
/// - `headers`: the optional headers of the message.
/// - `length`: the length of the payload.
/// - `payload`: the binary payload of the message.
///
/// Serde notes:
/// - `length` is marked `#[serde(skip)]`: it is neither written during serialization nor
///   read during deserialization, so a deserialized message carries the `u32` default (0)
///   in this field regardless of the payload size.
/// - `payload` is (de)serialized through `serde_with`'s `Base64` adapter.
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
pub struct PolledMessage {
/// The offset of the message.
pub offset: u64,
/// The state of the message.
pub state: MessageState,
/// The timestamp of the message.
/// `from_message` fills it from `IggyTimestamp::now().to_micros()`.
pub timestamp: u64,
/// The identifier of the message.
pub id: u128,
/// The checksum of the message, can be used to verify the integrity of the message.
/// `from_message` computes it with `checksum::calculate` over the payload.
pub checksum: u32,
/// The optional headers of the message.
pub headers: Option<HashMap<HeaderKey, HeaderValue>>,
/// The length of the payload.
/// Set to `payload.len()` by `create`; excluded from serde, see the struct-level notes.
#[serde(skip)]
pub length: u32,
/// The binary payload of the message.
#[serde_as(as = "Base64")]
pub payload: Bytes,
}
/// The state of the message, currently only the `Available` state is used.
///
/// With `#[serde(rename_all = "snake_case")]` the variants are (de)serialized as
/// `available`, `unavailable`, `poisoned` and `marked_for_deletion`, the same
/// spellings used by the `Display` and `FromStr` implementations in this file.
/// The numeric codes are defined by `MessageState::as_code`.
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum MessageState {
/// The message is available.
Available,
/// The message is unavailable.
Unavailable,
/// The message is poisoned.
Poisoned,
/// The message is marked for deletion.
MarkedForDeletion,
}
impl MessageState {
    /// Maps the state to its single-byte code:
    /// `Available` = 1, `Unavailable` = 10, `Poisoned` = 20, `MarkedForDeletion` = 30.
    pub fn as_code(&self) -> u8 {
        match *self {
            Self::Available => 1,
            Self::Unavailable => 10,
            Self::Poisoned => 20,
            Self::MarkedForDeletion => 30,
        }
    }

    /// Resolves a single-byte code back into a state (the inverse of `as_code`).
    ///
    /// # Errors
    ///
    /// Returns `IggyError::InvalidCommand` for any code other than 1, 10, 20 or 30.
    pub fn from_code(code: u8) -> Result<Self, IggyError> {
        let state = match code {
            1 => Self::Available,
            10 => Self::Unavailable,
            20 => Self::Poisoned,
            30 => Self::MarkedForDeletion,
            _ => return Err(IggyError::InvalidCommand),
        };
        Ok(state)
    }
}
impl Display for MessageState {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
MessageState::Available => write!(f, "available"),
MessageState::Unavailable => write!(f, "unavailable"),
MessageState::Poisoned => write!(f, "poisoned"),
MessageState::MarkedForDeletion => write!(f, "marked_for_deletion"),
}
}
}
impl FromStr for MessageState {
    type Err = IggyError;

    /// Parses the snake_case name of a state; matching is exact and case-sensitive.
    ///
    /// # Errors
    ///
    /// Returns `IggyError::InvalidCommand` when `s` is not one of the known names.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let state = match s {
            "available" => Self::Available,
            "unavailable" => Self::Unavailable,
            "poisoned" => Self::Poisoned,
            "marked_for_deletion" => Self::MarkedForDeletion,
            _ => return Err(IggyError::InvalidCommand),
        };
        Ok(state)
    }
}
impl Sizeable for Arc<PolledMessage> {
    /// Reports the size of the shared message by delegating to
    /// `PolledMessage::get_size_bytes`.
    fn get_size_bytes(&self) -> u32 {
        // Borrow the inner message explicitly so the call below resolves to the
        // inherent method on `PolledMessage` instead of recursing into this impl.
        let message: &PolledMessage = self;
        message.get_size_bytes()
    }
}
impl PolledMessage {
    /// Creates a new message from an `AppendableMessage` (the per-message item of the
    /// append/send messages request).
    ///
    /// The timestamp is taken from `IggyTimestamp::now()` in microseconds, the checksum is
    /// calculated over the payload, the state is `Available` and the offset is left at 0.
    pub fn from_message(message: &append_messages::AppendableMessage) -> Self {
        let timestamp = IggyTimestamp::now().to_micros();
        let checksum = checksum::calculate(&message.payload);
        let headers = message.headers.as_ref().cloned();
        Self::empty(
            timestamp,
            MessageState::Available,
            message.id,
            message.payload.clone(),
            checksum,
            headers,
        )
    }

    /// Creates a new message without a specified offset (the offset is set to 0).
    pub fn empty(
        timestamp: u64,
        state: MessageState,
        id: u128,
        payload: Bytes,
        checksum: u32,
        headers: Option<HashMap<HeaderKey, HeaderValue>>,
    ) -> Self {
        PolledMessage::create(0, state, timestamp, id, payload, checksum, headers)
    }

    /// Creates a new message with a specified offset.
    ///
    /// The `length` field is derived from `payload.len()`.
    pub fn create(
        offset: u64,
        state: MessageState,
        timestamp: u64,
        id: u128,
        payload: Bytes,
        checksum: u32,
        headers: Option<HashMap<HeaderKey, HeaderValue>>,
    ) -> Self {
        PolledMessage {
            offset,
            state,
            timestamp,
            id,
            checksum,
            #[allow(clippy::cast_possible_truncation)]
            length: payload.len() as u32,
            payload,
            headers,
        }
    }

    /// Returns the size of the message in bytes, i.e. the fixed-size fields plus the
    /// payload and whatever `header::get_headers_size_bytes` reports for the headers.
    ///
    /// The payload size is read from the payload itself rather than from the `length`
    /// field: `length` is `#[serde(skip)]` and therefore 0 on deserialized messages,
    /// which would make the reported size too small.
    pub fn get_size_bytes(&self) -> u32 {
        // Same narrowing as in `create`, so both paths agree for regular messages.
        #[allow(clippy::cast_possible_truncation)]
        let payload_length = self.payload.len() as u32;
        // Offset + State + Timestamp + ID + Checksum + Length + Payload + Headers
        8 + 1 + 8 + 16 + 4 + 4 + payload_length + header::get_headers_size_bytes(&self.headers)
    }

    /// Extends the provided bytes with the message.
    ///
    /// Layout (all integers little-endian): offset (u64), state code (u8),
    /// timestamp (u64), id (u128), checksum (u32), headers length (u32) followed by the
    /// headers bytes (length 0 and no bytes when there are no headers),
    /// payload length (u32), payload bytes.
    pub fn extend(&self, bytes: &mut Vec<u8>) {
        bytes.put_u64_le(self.offset);
        bytes.put_u8(self.state.as_code());
        bytes.put_u64_le(self.timestamp);
        bytes.put_u128_le(self.id);
        bytes.put_u32_le(self.checksum);
        if let Some(headers) = &self.headers {
            let headers_bytes = headers.as_bytes();
            #[allow(clippy::cast_possible_truncation)]
            bytes.put_u32_le(headers_bytes.len() as u32);
            bytes.extend(&headers_bytes);
        } else {
            bytes.put_u32_le(0u32);
        }
        // The length prefix must describe the bytes that follow it. The public `length`
        // field can disagree with the payload (it is skipped by serde and defaults to 0
        // after deserialization), so the prefix is derived from the payload itself.
        #[allow(clippy::cast_possible_truncation)]
        let payload_length = self.payload.len() as u32;
        bytes.put_u32_le(payload_length);
        bytes.extend(&self.payload);
    }
}