| use crate::bytes_serializable::BytesSerializable; |
| use crate::command::{Command, SEND_MESSAGES_CODE}; |
| use crate::error::IggyError; |
| use crate::identifier::Identifier; |
| use crate::messages::{MAX_HEADERS_SIZE, MAX_PAYLOAD_SIZE}; |
| use crate::models::header; |
| use crate::models::header::{HeaderKey, HeaderValue}; |
| use crate::utils::byte_size::IggyByteSize; |
| use crate::utils::sizeable::Sizeable; |
| use crate::validatable::Validatable; |
| use bytes::{BufMut, Bytes, BytesMut}; |
| use serde::{Deserialize, Serialize}; |
| use serde_with::base64::Base64; |
| use serde_with::serde_as; |
| use std::collections::HashMap; |
| use std::fmt::Display; |
| use std::hash::{Hash, Hasher}; |
| use std::str::FromStr; |
| use uuid::Uuid; |
| |
| const EMPTY_KEY_VALUE: Vec<u8> = vec![]; |
| |
| /// `SendMessages` command is used to send messages to a topic in a stream. |
| /// It has additional payload: |
| /// - `stream_id` - unique stream ID (numeric or name). |
| /// - `topic_id` - unique topic ID (numeric or name). |
| /// - `partitioning` - to which partition the messages should be sent - either provided by the client or calculated by the server. |
| /// - `messages` - collection of messages to be sent. |
| #[derive(Debug, Serialize, Deserialize, PartialEq)] |
| pub struct SendMessages { |
| /// Unique stream ID (numeric or name). |
| #[serde(skip)] |
| pub stream_id: Identifier, |
| /// Unique topic ID (numeric or name). |
| #[serde(skip)] |
| pub topic_id: Identifier, |
| /// To which partition the messages should be sent - either provided by the client or calculated by the server. |
| pub partitioning: Partitioning, |
| /// Collection of messages to be sent. |
| pub messages: Vec<Message>, |
| } |
| |
| /// `Partitioning` is used to specify to which partition the messages should be sent. |
| /// It has the following kinds: |
| /// - `Balanced` - the partition ID is calculated by the server using the round-robin algorithm. |
| /// - `PartitionId` - the partition ID is provided by the client. |
| /// - `MessagesKey` - the partition ID is calculated by the server using the hash of the provided messages key. |
| #[serde_as] |
| #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] |
| pub struct Partitioning { |
| /// The kind of partitioning. |
| pub kind: PartitioningKind, |
| #[serde(skip)] |
| /// The length of the value payload. |
| pub length: u8, |
| #[serde_as(as = "Base64")] |
| /// The binary value payload. |
| pub value: Vec<u8>, |
| } |
| |
| impl Hash for Partitioning { |
| fn hash<H: Hasher>(&self, state: &mut H) { |
| self.kind.hash(state); |
| self.length.hash(state); |
| self.value.hash(state); |
| } |
| } |
| |
| /// The single message to be sent. It has the following payload: |
| /// - `id` - unique message ID, if not specified by the client (has value = 0), it will be generated by the server. |
| /// - `length` - length of the payload. |
| /// - `payload` - binary message payload. |
| /// - `headers` - optional collection of headers. |
| #[serde_as] |
| #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] |
| pub struct Message { |
| /// Unique message ID, if not specified by the client (has value = 0), it will be generated by the server. |
| #[serde(default = "default_message_id")] |
| pub id: u128, |
| #[serde(skip)] |
| /// Length of the payload. |
| pub length: u32, |
| #[serde_as(as = "Base64")] |
| /// Binary message payload. |
| pub payload: Bytes, |
| /// Optional collection of headers. |
| pub headers: Option<HashMap<HeaderKey, HeaderValue>>, |
| } |
| |
| /// `PartitioningKind` is an enum which specifies the kind of partitioning and is used by `Partitioning`. |
| #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Default, Copy, Clone)] |
| #[serde(rename_all = "snake_case")] |
| pub enum PartitioningKind { |
| /// The partition ID is calculated by the server using the round-robin algorithm. |
| #[default] |
| Balanced, |
| /// The partition ID is provided by the client. |
| PartitionId, |
| /// The partition ID is calculated by the server using the hash of the provided messages key. |
| MessagesKey, |
| } |
| |
| impl Hash for PartitioningKind { |
| fn hash<H: Hasher>(&self, state: &mut H) { |
| self.as_code().hash(state); |
| } |
| } |
| |
| fn default_message_id() -> u128 { |
| 0 |
| } |
| |
| impl Default for SendMessages { |
| fn default() -> Self { |
| SendMessages { |
| stream_id: Identifier::default(), |
| topic_id: Identifier::default(), |
| partitioning: Partitioning::default(), |
| messages: vec![Message::default()], |
| } |
| } |
| } |
| |
| impl Default for Partitioning { |
| fn default() -> Self { |
| Partitioning::balanced() |
| } |
| } |
| |
| impl Partitioning { |
| /// Partition the messages using the balanced round-robin algorithm on the server. |
| pub fn balanced() -> Self { |
| Partitioning { |
| kind: PartitioningKind::Balanced, |
| length: 0, |
| value: EMPTY_KEY_VALUE, |
| } |
| } |
| |
| /// Partition the messages using the provided partition ID. |
| pub fn partition_id(partition_id: u32) -> Self { |
| Partitioning { |
| kind: PartitioningKind::PartitionId, |
| length: 4, |
| value: partition_id.to_le_bytes().to_vec(), |
| } |
| } |
| |
| /// Partition the messages using the provided messages key. |
| pub fn messages_key(value: &[u8]) -> Result<Self, IggyError> { |
| let length = value.len(); |
| if length == 0 || length > 255 { |
| return Err(IggyError::InvalidCommand); |
| } |
| |
| Ok(Partitioning { |
| kind: PartitioningKind::MessagesKey, |
| #[allow(clippy::cast_possible_truncation)] |
| length: length as u8, |
| value: value.to_vec(), |
| }) |
| } |
| |
| /// Partition the messages using the provided messages key as str. |
| pub fn messages_key_str(value: &str) -> Result<Self, IggyError> { |
| Self::messages_key(value.as_bytes()) |
| } |
| |
| /// Partition the messages using the provided messages key as u32. |
| pub fn messages_key_u32(value: u32) -> Self { |
| Partitioning { |
| kind: PartitioningKind::MessagesKey, |
| length: 4, |
| value: value.to_le_bytes().to_vec(), |
| } |
| } |
| |
| /// Partition the messages using the provided messages key as u64. |
| pub fn messages_key_u64(value: u64) -> Self { |
| Partitioning { |
| kind: PartitioningKind::MessagesKey, |
| length: 8, |
| value: value.to_le_bytes().to_vec(), |
| } |
| } |
| |
| /// Partition the messages using the provided messages key as u128. |
| pub fn messages_key_u128(value: u128) -> Self { |
| Partitioning { |
| kind: PartitioningKind::MessagesKey, |
| length: 16, |
| value: value.to_le_bytes().to_vec(), |
| } |
| } |
| |
| /// Create the partitioning from the provided partitioning. |
| pub fn from_partitioning(partitioning: &Partitioning) -> Self { |
| Partitioning { |
| kind: partitioning.kind, |
| length: partitioning.length, |
| value: partitioning.value.clone(), |
| } |
| } |
| } |
| |
| impl Sizeable for Partitioning { |
| fn get_size_bytes(&self) -> IggyByteSize { |
| IggyByteSize::from(u64::from(self.length) + 2) |
| } |
| } |
| |
| impl Command for SendMessages { |
| fn code(&self) -> u32 { |
| SEND_MESSAGES_CODE |
| } |
| } |
| |
| impl Validatable<IggyError> for SendMessages { |
| fn validate(&self) -> Result<(), IggyError> { |
| if self.messages.is_empty() { |
| return Err(IggyError::InvalidMessagesCount); |
| } |
| |
| let key_value_length = self.partitioning.value.len(); |
| if key_value_length > 255 |
| || (self.partitioning.kind != PartitioningKind::Balanced && key_value_length == 0) |
| { |
| return Err(IggyError::InvalidKeyValueLength); |
| } |
| |
| let mut headers_size = 0; |
| let mut payload_size = 0; |
| for message in &self.messages { |
| if let Some(headers) = &message.headers { |
| for value in headers.values() { |
| headers_size += value.value.len() as u32; |
| if headers_size > MAX_HEADERS_SIZE { |
| return Err(IggyError::TooBigHeadersPayload); |
| } |
| } |
| } |
| payload_size += message.payload.len() as u32; |
| if payload_size > MAX_PAYLOAD_SIZE { |
| return Err(IggyError::TooBigMessagePayload); |
| } |
| } |
| |
| if payload_size == 0 { |
| return Err(IggyError::EmptyMessagePayload); |
| } |
| |
| Ok(()) |
| } |
| } |
| |
| impl PartitioningKind { |
| /// Get the code of the partitioning kind. |
| pub fn as_code(&self) -> u8 { |
| match self { |
| PartitioningKind::Balanced => 1, |
| PartitioningKind::PartitionId => 2, |
| PartitioningKind::MessagesKey => 3, |
| } |
| } |
| |
| /// Get the partitioning kind from the provided code. |
| pub fn from_code(code: u8) -> Result<Self, IggyError> { |
| match code { |
| 1 => Ok(PartitioningKind::Balanced), |
| 2 => Ok(PartitioningKind::PartitionId), |
| 3 => Ok(PartitioningKind::MessagesKey), |
| _ => Err(IggyError::InvalidCommand), |
| } |
| } |
| } |
| |
| impl Message { |
| /// Create a new message with the optional ID, payload and headers. |
| pub fn new( |
| id: Option<u128>, |
| payload: Bytes, |
| headers: Option<HashMap<HeaderKey, HeaderValue>>, |
| ) -> Self { |
| Message { |
| id: id.unwrap_or(0), |
| #[allow(clippy::cast_possible_truncation)] |
| length: payload.len() as u32, |
| payload, |
| headers, |
| } |
| } |
| } |
| |
| impl Sizeable for Message { |
| fn get_size_bytes(&self) -> IggyByteSize { |
| // ID + Length + Payload + Headers |
| header::get_headers_size_bytes(&self.headers) + (16 + 4 + self.payload.len() as u64).into() |
| } |
| } |
| |
| impl Default for Message { |
| fn default() -> Self { |
| let payload = Bytes::from("hello world"); |
| Message { |
| id: 1, |
| length: payload.len() as u32, |
| payload, |
| headers: None, |
| } |
| } |
| } |
| |
| impl Display for Message { |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| let len = self.payload.len(); |
| |
| if len > 40 { |
| write!( |
| f, |
| "{}|{}...{}", |
| self.id, |
| String::from_utf8_lossy(&self.payload[..20]), |
| String::from_utf8_lossy(&self.payload[len - 20..]) |
| ) |
| } else { |
| write!(f, "{}|{}", self.id, String::from_utf8_lossy(&self.payload)) |
| } |
| } |
| } |
| |
| impl BytesSerializable for Partitioning { |
| fn to_bytes(&self) -> Bytes { |
| let mut bytes = BytesMut::with_capacity(2 + self.length as usize); |
| bytes.put_u8(self.kind.as_code()); |
| bytes.put_u8(self.length); |
| bytes.put_slice(&self.value); |
| bytes.freeze() |
| } |
| |
| fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> |
| where |
| Self: Sized, |
| { |
| if bytes.len() < 3 { |
| return Err(IggyError::InvalidCommand); |
| } |
| |
| let kind = PartitioningKind::from_code(bytes[0])?; |
| let length = bytes[1]; |
| let value = bytes[2..2 + length as usize].to_vec(); |
| if value.len() != length as usize { |
| return Err(IggyError::InvalidCommand); |
| } |
| |
| Ok(Partitioning { |
| kind, |
| length, |
| value, |
| }) |
| } |
| } |
| |
| impl BytesSerializable for Message { |
| fn to_bytes(&self) -> Bytes { |
| let mut bytes = BytesMut::with_capacity(self.get_size_bytes().as_bytes_usize()); |
| bytes.put_u128_le(self.id); |
| if let Some(headers) = &self.headers { |
| let headers_bytes = headers.to_bytes(); |
| bytes.put_u32_le(headers_bytes.len() as u32); |
| bytes.put_slice(&headers_bytes); |
| } else { |
| bytes.put_u32_le(0); |
| } |
| bytes.put_u32_le(self.length); |
| bytes.put_slice(&self.payload); |
| bytes.freeze() |
| } |
| |
| fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> { |
| if bytes.len() < 24 { |
| return Err(IggyError::InvalidCommand); |
| } |
| |
| let mut id = u128::from_le_bytes( |
| bytes[..16] |
| .try_into() |
| .map_err(|_| IggyError::InvalidNumberEncoding)?, |
| ); |
| if id == 0 { |
| id = Uuid::now_v7().to_u128_le(); |
| } |
| let headers_length = u32::from_le_bytes( |
| bytes[16..20] |
| .try_into() |
| .map_err(|_| IggyError::InvalidNumberEncoding)?, |
| ); |
| let headers = if headers_length > 0 { |
| Some(HashMap::from_bytes( |
| bytes.slice(20..20 + headers_length as usize), |
| )?) |
| } else { |
| None |
| }; |
| |
| let payload_length = u32::from_le_bytes( |
| bytes[20 + headers_length as usize..24 + headers_length as usize] |
| .try_into() |
| .map_err(|_| IggyError::InvalidNumberEncoding)?, |
| ); |
| if payload_length == 0 { |
| return Err(IggyError::EmptyMessagePayload); |
| } |
| |
| let payload = bytes.slice( |
| 24 + headers_length as usize..24 + headers_length as usize + payload_length as usize, |
| ); |
| if payload.len() != payload_length as usize { |
| return Err(IggyError::InvalidMessagePayloadLength); |
| } |
| |
| Ok(Message { |
| id, |
| length: payload_length, |
| payload, |
| headers, |
| }) |
| } |
| } |
| |
| // This method is used by the new version of `IggyClient` to serialize `SendMessages` without copying the messages. |
| pub(crate) fn as_bytes( |
| stream_id: &Identifier, |
| topic_id: &Identifier, |
| partitioning: &Partitioning, |
| messages: &[Message], |
| ) -> Bytes { |
| let messages_size = messages |
| .iter() |
| .map(Message::get_size_bytes) |
| .sum::<IggyByteSize>(); |
| let key_bytes = partitioning.to_bytes(); |
| let stream_id_bytes = stream_id.to_bytes(); |
| let topic_id_bytes = topic_id.to_bytes(); |
| let mut bytes = BytesMut::with_capacity( |
| stream_id_bytes.len() |
| + topic_id_bytes.len() |
| + key_bytes.len() |
| + messages_size.as_bytes_usize(), |
| ); |
| bytes.put_slice(&stream_id_bytes); |
| bytes.put_slice(&topic_id_bytes); |
| bytes.put_slice(&key_bytes); |
| for message in messages { |
| bytes.put_slice(&message.to_bytes()); |
| } |
| |
| bytes.freeze() |
| } |
| |
| impl FromStr for Message { |
| type Err = IggyError; |
| fn from_str(input: &str) -> Result<Self, Self::Err> { |
| let id = default_message_id(); |
| let payload = Bytes::from(input.as_bytes().to_vec()); |
| let length = payload.len() as u32; |
| if length == 0 { |
| return Err(IggyError::EmptyMessagePayload); |
| } |
| |
| Ok(Message { |
| id, |
| length, |
| payload, |
| headers: None, |
| }) |
| } |
| } |
| |
| impl BytesSerializable for SendMessages { |
| fn to_bytes(&self) -> Bytes { |
| as_bytes( |
| &self.stream_id, |
| &self.topic_id, |
| &self.partitioning, |
| &self.messages, |
| ) |
| } |
| |
| fn from_bytes(bytes: Bytes) -> Result<SendMessages, IggyError> { |
| if bytes.len() < 11 { |
| return Err(IggyError::InvalidCommand); |
| } |
| |
| let mut position = 0; |
| let stream_id = Identifier::from_bytes(bytes.clone())?; |
| position += stream_id.get_size_bytes().as_bytes_usize(); |
| let topic_id = Identifier::from_bytes(bytes.slice(position..))?; |
| position += topic_id.get_size_bytes().as_bytes_usize(); |
| let key = Partitioning::from_bytes(bytes.slice(position..))?; |
| position += key.get_size_bytes().as_bytes_usize(); |
| let messages_payloads = bytes.slice(position..); |
| position = 0; |
| let mut messages = Vec::new(); |
| while position < messages_payloads.len() { |
| let message = Message::from_bytes(messages_payloads.slice(position..))?; |
| position += message.get_size_bytes().as_bytes_usize(); |
| messages.push(message); |
| } |
| |
| let command = SendMessages { |
| stream_id, |
| topic_id, |
| partitioning: key, |
| messages, |
| }; |
| Ok(command) |
| } |
| } |
| |
| impl Display for SendMessages { |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| write!( |
| f, |
| "{}|{}|{}|{}", |
| self.stream_id, |
| self.topic_id, |
| self.partitioning, |
| self.messages |
| .iter() |
| .map(std::string::ToString::to_string) |
| .collect::<Vec<String>>() |
| .join("|") |
| ) |
| } |
| } |
| |
| impl Display for Partitioning { |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| match self.kind { |
| PartitioningKind::Balanced => write!(f, "{}|0", self.kind), |
| PartitioningKind::PartitionId => write!( |
| f, |
| "{}|{}", |
| self.kind, |
| u32::from_le_bytes(self.value[..4].try_into().unwrap()) |
| ), |
| PartitioningKind::MessagesKey => { |
| write!(f, "{}|{}", self.kind, String::from_utf8_lossy(&self.value)) |
| } |
| } |
| } |
| } |
| |
| impl Display for PartitioningKind { |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| match self { |
| PartitioningKind::Balanced => write!(f, "balanced"), |
| PartitioningKind::PartitionId => write!(f, "partition_id"), |
| PartitioningKind::MessagesKey => write!(f, "messages_key"), |
| } |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| |
| #[test] |
| fn should_be_serialized_as_bytes() { |
| let message_1 = Message::from_str("hello 1").unwrap(); |
| let message_2 = Message::new(Some(2), "hello 2".into(), None); |
| let message_3 = Message::new(Some(3), "hello 3".into(), None); |
| let messages = vec![message_1, message_2, message_3]; |
| let command = SendMessages { |
| stream_id: Identifier::numeric(1).unwrap(), |
| topic_id: Identifier::numeric(2).unwrap(), |
| partitioning: Partitioning::partition_id(4), |
| messages, |
| }; |
| |
| let bytes = command.to_bytes(); |
| |
| let mut position = 0; |
| let stream_id = Identifier::from_bytes(bytes.clone()).unwrap(); |
| position += stream_id.get_size_bytes().as_bytes_usize(); |
| let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); |
| position += topic_id.get_size_bytes().as_bytes_usize(); |
| let key = Partitioning::from_bytes(bytes.slice(position..)).unwrap(); |
| position += key.get_size_bytes().as_bytes_usize(); |
| let messages = bytes.slice(position..); |
| let command_messages = command |
| .messages |
| .iter() |
| .fold(BytesMut::new(), |mut bytes_mut, message| { |
| bytes_mut.put(message.to_bytes()); |
| bytes_mut |
| }) |
| .freeze(); |
| |
| assert!(!bytes.is_empty()); |
| assert_eq!(stream_id, command.stream_id); |
| assert_eq!(topic_id, command.topic_id); |
| assert_eq!(key, command.partitioning); |
| assert_eq!(messages, command_messages); |
| } |
| |
| #[test] |
| fn should_be_deserialized_from_bytes() { |
| let stream_id = Identifier::numeric(1).unwrap(); |
| let topic_id = Identifier::numeric(2).unwrap(); |
| let key = Partitioning::partition_id(4); |
| |
| let message_1 = Message::from_str("hello 1").unwrap(); |
| let message_2 = Message::new(Some(2), "hello 2".into(), None); |
| let message_3 = Message::new(Some(3), "hello 3".into(), None); |
| let messages = [ |
| message_1.to_bytes(), |
| message_2.to_bytes(), |
| message_3.to_bytes(), |
| ] |
| .concat(); |
| |
| let key_bytes = key.to_bytes(); |
| let stream_id_bytes = stream_id.to_bytes(); |
| let topic_id_bytes = topic_id.to_bytes(); |
| let current_position = stream_id_bytes.len() + topic_id_bytes.len() + key_bytes.len(); |
| let mut bytes = BytesMut::with_capacity(current_position); |
| bytes.put_slice(&stream_id_bytes); |
| bytes.put_slice(&topic_id_bytes); |
| bytes.put_slice(&key_bytes); |
| bytes.put_slice(&messages); |
| let bytes = bytes.freeze(); |
| let command = SendMessages::from_bytes(bytes.clone()); |
| assert!(command.is_ok()); |
| |
| let messages_payloads = bytes.slice(current_position..); |
| let mut position = 0; |
| let mut messages = Vec::new(); |
| while position < messages_payloads.len() { |
| let message = Message::from_bytes(messages_payloads.slice(position..)).unwrap(); |
| position += message.get_size_bytes().as_bytes_usize(); |
| messages.push(message); |
| } |
| |
| let command = command.unwrap(); |
| assert_eq!(command.stream_id, stream_id); |
| assert_eq!(command.topic_id, topic_id); |
| assert_eq!(command.partitioning, key); |
| for (index, message) in command.messages.iter().enumerate() { |
| let command_message = &command.messages[index]; |
| assert_eq!(command_message.id, message.id); |
| assert_eq!(command_message.length, message.length); |
| assert_eq!(command_message.payload, message.payload); |
| } |
| } |
| |
| #[test] |
| fn key_of_type_balanced_should_have_empty_value() { |
| let key = Partitioning::balanced(); |
| assert_eq!(key.kind, PartitioningKind::Balanced); |
| assert_eq!(key.length, 0); |
| assert_eq!(key.value, EMPTY_KEY_VALUE); |
| assert_eq!( |
| PartitioningKind::from_code(1).unwrap(), |
| PartitioningKind::Balanced |
| ); |
| } |
| |
| #[test] |
| fn key_of_type_partition_should_have_value_of_const_length_4() { |
| let partition_id = 1234u32; |
| let key = Partitioning::partition_id(partition_id); |
| assert_eq!(key.kind, PartitioningKind::PartitionId); |
| assert_eq!(key.length, 4); |
| assert_eq!(key.value, partition_id.to_le_bytes()); |
| assert_eq!( |
| PartitioningKind::from_code(2).unwrap(), |
| PartitioningKind::PartitionId |
| ); |
| } |
| |
| #[test] |
| fn key_of_type_messages_key_should_have_value_of_dynamic_length() { |
| let messages_key = "hello world"; |
| let key = Partitioning::messages_key_str(messages_key).unwrap(); |
| assert_eq!(key.kind, PartitioningKind::MessagesKey); |
| assert_eq!(key.length, messages_key.len() as u8); |
| assert_eq!(key.value, messages_key.as_bytes()); |
| assert_eq!( |
| PartitioningKind::from_code(3).unwrap(), |
| PartitioningKind::MessagesKey |
| ); |
| } |
| |
| #[test] |
| fn key_of_type_messages_key_that_has_length_0_should_fail() { |
| let messages_key = ""; |
| let key = Partitioning::messages_key_str(messages_key); |
| assert!(key.is_err()); |
| } |
| |
| #[test] |
| fn key_of_type_messages_key_that_has_length_greater_than_255_should_fail() { |
| let messages_key = "a".repeat(256); |
| let key = Partitioning::messages_key_str(&messages_key); |
| assert!(key.is_err()); |
| } |
| } |