| use crate::bytes_serializable::BytesSerializable; |
| use crate::command::{Command, POLL_MESSAGES_CODE}; |
| use crate::consumer::{Consumer, ConsumerKind}; |
| use crate::error::IggyError; |
| use crate::identifier::Identifier; |
| use crate::utils::sizeable::Sizeable; |
| use crate::utils::timestamp::IggyTimestamp; |
| use crate::validatable::Validatable; |
| use bytes::{BufMut, Bytes, BytesMut}; |
| use serde::{Deserialize, Serialize}; |
| use serde_with::{serde_as, DisplayFromStr}; |
| use std::fmt::Display; |
| use std::str::FromStr; |
| |
| /// `PollMessages` command is used to poll messages from a topic in a stream. |
| /// It has additional payload: |
| /// - `consumer` - consumer which will poll messages. Either regular consumer or consumer group. |
| /// - `stream_id` - unique stream ID (numeric or name). |
| /// - `topic_id` - unique topic ID (numeric or name). |
| /// - `partition_id` - partition ID from which messages will be polled. Has to be specified for the regular consumer. For consumer group it is ignored (use `None`). |
| /// - `strategy` - polling strategy which specifies from where to start polling messages. |
| /// - `count` - number of messages to poll. |
| /// - `auto_commit` - whether to commit offset on the server automatically after polling the messages. |
| #[derive(Debug, Serialize, Deserialize, PartialEq)] |
| pub struct PollMessages { |
| /// Consumer which will poll messages. Either regular consumer or consumer group. |
| #[serde(flatten)] |
| pub consumer: Consumer, |
| /// Unique stream ID (numeric or name). |
| #[serde(skip)] |
| pub stream_id: Identifier, |
| /// Unique topic ID (numeric or name). |
| #[serde(skip)] |
| pub topic_id: Identifier, |
| /// Partition ID from which messages will be polled. Has to be specified for the regular consumer. For consumer group it is ignored (use `None`). |
| #[serde(default = "default_partition_id")] |
| pub partition_id: Option<u32>, |
| /// Polling strategy which specifies from where to start polling messages. |
| #[serde(default = "default_strategy", flatten)] |
| pub strategy: PollingStrategy, |
| #[serde(default = "default_count")] |
| /// Number of messages to poll. |
| pub count: u32, |
| #[serde(default)] |
| /// Whether to commit offset on the server automatically after polling the messages. |
| pub auto_commit: bool, |
| } |
| |
| /// `PollingStrategy` specifies from where to start polling messages. |
| /// It has the following kinds: |
| /// - `Offset` - start polling from the specified offset. |
| /// - `Timestamp` - start polling from the specified timestamp. |
| /// - `First` - start polling from the first message in the partition. |
| /// - `Last` - start polling from the last message in the partition. |
| /// - `Next` - start polling from the next message after the last polled message based on the stored consumer offset. |
| #[serde_as] |
| #[derive(Debug, Serialize, Deserialize, PartialEq, Copy, Clone)] |
| pub struct PollingStrategy { |
| /// Kind of the polling strategy. |
| #[serde_as(as = "DisplayFromStr")] |
| #[serde(default = "default_kind")] |
| pub kind: PollingKind, |
| /// Value of the polling strategy. |
| #[serde_as(as = "DisplayFromStr")] |
| #[serde(default = "default_value")] |
| pub value: u64, |
| } |
| |
| /// `PollingKind` is an enum which specifies from where to start polling messages and is used by `PollingStrategy`. |
| #[derive(Debug, Serialize, Deserialize, PartialEq, Default, Copy, Clone)] |
| #[serde(rename_all = "snake_case")] |
| pub enum PollingKind { |
| #[default] |
| /// Start polling from the specified offset. |
| Offset, |
| /// Start polling from the specified timestamp. |
| Timestamp, |
| /// Start polling from the first message in the partition. |
| First, |
| /// Start polling from the last message in the partition. |
| Last, |
| /// Start polling from the next message after the last polled message based on the stored consumer offset. Should be used with `auto_commit` set to `true`. |
| Next, |
| } |
| |
| impl Default for PollMessages { |
| fn default() -> Self { |
| Self { |
| consumer: Consumer::default(), |
| stream_id: Identifier::numeric(1).unwrap(), |
| topic_id: Identifier::numeric(1).unwrap(), |
| partition_id: default_partition_id(), |
| strategy: default_strategy(), |
| count: default_count(), |
| auto_commit: false, |
| } |
| } |
| } |
| |
| impl Default for PollingStrategy { |
| fn default() -> Self { |
| Self { |
| kind: PollingKind::Offset, |
| value: 0, |
| } |
| } |
| } |
| |
| impl Command for PollMessages { |
| fn code(&self) -> u32 { |
| POLL_MESSAGES_CODE |
| } |
| } |
| |
| fn default_partition_id() -> Option<u32> { |
| Some(1) |
| } |
| |
| fn default_kind() -> PollingKind { |
| PollingKind::Offset |
| } |
| |
| fn default_value() -> u64 { |
| 0 |
| } |
| |
| fn default_strategy() -> PollingStrategy { |
| PollingStrategy::default() |
| } |
| |
| fn default_count() -> u32 { |
| 10 |
| } |
| |
| impl Validatable<IggyError> for PollMessages { |
| fn validate(&self) -> Result<(), IggyError> { |
| Ok(()) |
| } |
| } |
| |
| impl PollingStrategy { |
| /// Poll messages from the specified offset. |
| pub fn offset(value: u64) -> Self { |
| Self { |
| kind: PollingKind::Offset, |
| value, |
| } |
| } |
| |
| /// Poll messages from the specified timestamp. |
| pub fn timestamp(value: IggyTimestamp) -> Self { |
| Self { |
| kind: PollingKind::Timestamp, |
| value: value.into(), |
| } |
| } |
| |
| /// Poll messages from the first message in the partition. |
| pub fn first() -> Self { |
| Self { |
| kind: PollingKind::First, |
| value: 0, |
| } |
| } |
| |
| /// Poll messages from the last message in the partition. |
| pub fn last() -> Self { |
| Self { |
| kind: PollingKind::Last, |
| value: 0, |
| } |
| } |
| |
| /// Poll messages from the next message after the last polled message based on the stored consumer offset. Should be used with `auto_commit` set to `true`. |
| pub fn next() -> Self { |
| Self { |
| kind: PollingKind::Next, |
| value: 0, |
| } |
| } |
| |
| /// Change the value of the polling strategy, affects only `Offset` and `Timestamp` kinds. |
| pub fn set_value(&mut self, value: u64) { |
| if self.kind == PollingKind::Offset || self.kind == PollingKind::Timestamp { |
| self.value = value; |
| } |
| } |
| } |
| |
| impl PollingKind { |
| /// Returns code of the polling kind. |
| pub fn as_code(&self) -> u8 { |
| match self { |
| PollingKind::Offset => 1, |
| PollingKind::Timestamp => 2, |
| PollingKind::First => 3, |
| PollingKind::Last => 4, |
| PollingKind::Next => 5, |
| } |
| } |
| |
| /// Returns polling kind from the specified code. |
| pub fn from_code(code: u8) -> Result<Self, IggyError> { |
| match code { |
| 1 => Ok(PollingKind::Offset), |
| 2 => Ok(PollingKind::Timestamp), |
| 3 => Ok(PollingKind::First), |
| 4 => Ok(PollingKind::Last), |
| 5 => Ok(PollingKind::Next), |
| _ => Err(IggyError::InvalidCommand), |
| } |
| } |
| } |
| |
| impl FromStr for PollingKind { |
| type Err = IggyError; |
| fn from_str(input: &str) -> Result<Self, Self::Err> { |
| match input { |
| "o" | "offset" => Ok(PollingKind::Offset), |
| "t" | "timestamp" => Ok(PollingKind::Timestamp), |
| "f" | "first" => Ok(PollingKind::First), |
| "l" | "last" => Ok(PollingKind::Last), |
| "n" | "next" => Ok(PollingKind::Next), |
| _ => Err(IggyError::InvalidCommand), |
| } |
| } |
| } |
| |
| impl Display for PollingKind { |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| match self { |
| PollingKind::Offset => write!(f, "offset"), |
| PollingKind::Timestamp => write!(f, "timestamp"), |
| PollingKind::First => write!(f, "first"), |
| PollingKind::Last => write!(f, "last"), |
| PollingKind::Next => write!(f, "next"), |
| } |
| } |
| } |
| |
| impl BytesSerializable for PollMessages { |
| fn to_bytes(&self) -> Bytes { |
| as_bytes( |
| &self.stream_id, |
| &self.topic_id, |
| self.partition_id, |
| &self.consumer, |
| &self.strategy, |
| self.count, |
| self.auto_commit, |
| ) |
| } |
| |
| fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> { |
| if bytes.len() < 29 { |
| return Err(IggyError::InvalidCommand); |
| } |
| |
| let mut position = 0; |
| let consumer_kind = ConsumerKind::from_code(bytes[0])?; |
| let consumer_id = Identifier::from_bytes(bytes.slice(1..))?; |
| position += 1 + consumer_id.get_size_bytes().as_bytes_usize(); |
| let consumer = Consumer { |
| kind: consumer_kind, |
| id: consumer_id, |
| }; |
| let stream_id = Identifier::from_bytes(bytes.slice(position..))?; |
| position += stream_id.get_size_bytes().as_bytes_usize(); |
| let topic_id = Identifier::from_bytes(bytes.slice(position..))?; |
| position += topic_id.get_size_bytes().as_bytes_usize(); |
| let partition_id = u32::from_le_bytes( |
| bytes[position..position + 4] |
| .try_into() |
| .map_err(|_| IggyError::InvalidNumberEncoding)?, |
| ); |
| let partition_id = match partition_id { |
| 0 => None, |
| partition_id => Some(partition_id), |
| }; |
| let polling_kind = PollingKind::from_code(bytes[position + 4])?; |
| position += 5; |
| let value = u64::from_le_bytes( |
| bytes[position..position + 8] |
| .try_into() |
| .map_err(|_| IggyError::InvalidNumberEncoding)?, |
| ); |
| let strategy = PollingStrategy { |
| kind: polling_kind, |
| value, |
| }; |
| let count = u32::from_le_bytes( |
| bytes[position + 8..position + 12] |
| .try_into() |
| .map_err(|_| IggyError::InvalidNumberEncoding)?, |
| ); |
| let auto_commit = bytes[position + 12]; |
| let auto_commit = matches!(auto_commit, 1); |
| let command = PollMessages { |
| consumer, |
| stream_id, |
| topic_id, |
| partition_id, |
| strategy, |
| count, |
| auto_commit, |
| }; |
| Ok(command) |
| } |
| } |
| |
| // This method is used by the new version of `IggyClient` to serialize `PollMessages` without cloning the args. |
| pub(crate) fn as_bytes( |
| stream_id: &Identifier, |
| topic_id: &Identifier, |
| partition_id: Option<u32>, |
| consumer: &Consumer, |
| strategy: &PollingStrategy, |
| count: u32, |
| auto_commit: bool, |
| ) -> Bytes { |
| let consumer_bytes = consumer.to_bytes(); |
| let stream_id_bytes = stream_id.to_bytes(); |
| let topic_id_bytes = topic_id.to_bytes(); |
| let strategy_bytes = strategy.to_bytes(); |
| let mut bytes = BytesMut::with_capacity( |
| 9 + consumer_bytes.len() |
| + stream_id_bytes.len() |
| + topic_id_bytes.len() |
| + strategy_bytes.len(), |
| ); |
| bytes.put_slice(&consumer_bytes); |
| bytes.put_slice(&stream_id_bytes); |
| bytes.put_slice(&topic_id_bytes); |
| if let Some(partition_id) = partition_id { |
| bytes.put_u32_le(partition_id); |
| } else { |
| bytes.put_u32_le(0); |
| } |
| bytes.put_slice(&strategy_bytes); |
| bytes.put_u32_le(count); |
| if auto_commit { |
| bytes.put_u8(1); |
| } else { |
| bytes.put_u8(0); |
| } |
| |
| bytes.freeze() |
| } |
| |
| impl Display for PollMessages { |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| write!( |
| f, |
| "{}|{}|{}|{}|{}|{}|{}", |
| self.consumer, |
| self.stream_id, |
| self.topic_id, |
| self.partition_id.unwrap_or(0), |
| self.strategy, |
| self.count, |
| auto_commit_to_string(self.auto_commit) |
| ) |
| } |
| } |
| |
| impl Display for PollingStrategy { |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| write!(f, "{}|{}", self.kind, self.value) |
| } |
| } |
| |
| fn auto_commit_to_string(auto_commit: bool) -> &'static str { |
| if auto_commit { |
| "a" |
| } else { |
| "n" |
| } |
| } |
| |
| impl BytesSerializable for PollingStrategy { |
| fn to_bytes(&self) -> Bytes { |
| let mut bytes = BytesMut::with_capacity(9); |
| bytes.put_u8(self.kind.as_code()); |
| bytes.put_u64_le(self.value); |
| bytes.freeze() |
| } |
| |
| fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> { |
| if bytes.len() != 9 { |
| return Err(IggyError::InvalidCommand); |
| } |
| |
| let kind = PollingKind::from_code(bytes[0])?; |
| let value = u64::from_le_bytes( |
| bytes[1..9] |
| .try_into() |
| .map_err(|_| IggyError::InvalidNumberEncoding)?, |
| ); |
| let strategy = PollingStrategy { kind, value }; |
| Ok(strategy) |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| |
| #[test] |
| fn should_be_serialized_as_bytes() { |
| let command = PollMessages { |
| consumer: Consumer::new(Identifier::numeric(1).unwrap()), |
| stream_id: Identifier::numeric(2).unwrap(), |
| topic_id: Identifier::numeric(3).unwrap(), |
| partition_id: Some(4), |
| strategy: PollingStrategy::offset(2), |
| count: 3, |
| auto_commit: true, |
| }; |
| |
| let bytes = command.to_bytes(); |
| let mut position = 0; |
| let consumer_kind = ConsumerKind::from_code(bytes[0]).unwrap(); |
| let consumer_id = Identifier::from_bytes(bytes.slice(1..)).unwrap(); |
| position += 1 + consumer_id.get_size_bytes().as_bytes_usize(); |
| let consumer = Consumer { |
| kind: consumer_kind, |
| id: consumer_id, |
| }; |
| let stream_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); |
| position += stream_id.get_size_bytes().as_bytes_usize(); |
| let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); |
| position += topic_id.get_size_bytes().as_bytes_usize(); |
| let partition_id = u32::from_le_bytes(bytes[position..position + 4].try_into().unwrap()); |
| let polling_kind = PollingKind::from_code(bytes[position + 4]).unwrap(); |
| position += 5; |
| let value = u64::from_le_bytes(bytes[position..position + 8].try_into().unwrap()); |
| let strategy = PollingStrategy { |
| kind: polling_kind, |
| value, |
| }; |
| let count = u32::from_le_bytes(bytes[position + 8..position + 12].try_into().unwrap()); |
| let auto_commit = bytes[position + 12]; |
| let auto_commit = matches!(auto_commit, 1); |
| |
| assert!(!bytes.is_empty()); |
| assert_eq!(consumer, command.consumer); |
| assert_eq!(stream_id, command.stream_id); |
| assert_eq!(topic_id, command.topic_id); |
| assert_eq!(Some(partition_id), command.partition_id); |
| assert_eq!(strategy, command.strategy); |
| assert_eq!(count, command.count); |
| assert_eq!(auto_commit, command.auto_commit); |
| } |
| |
| #[test] |
| fn should_be_deserialized_from_bytes() { |
| let consumer = Consumer::new(Identifier::numeric(1).unwrap()); |
| let stream_id = Identifier::numeric(2).unwrap(); |
| let topic_id = Identifier::numeric(3).unwrap(); |
| let partition_id = 4u32; |
| let strategy = PollingStrategy::offset(2); |
| let count = 3u32; |
| let auto_commit = 1u8; |
| |
| let consumer_bytes = consumer.to_bytes(); |
| let stream_id_bytes = stream_id.to_bytes(); |
| let topic_id_bytes = topic_id.to_bytes(); |
| let strategy_bytes = strategy.to_bytes(); |
| let mut bytes = BytesMut::with_capacity( |
| 9 + consumer_bytes.len() |
| + stream_id_bytes.len() |
| + topic_id_bytes.len() |
| + strategy_bytes.len(), |
| ); |
| bytes.put_slice(&consumer_bytes); |
| bytes.put_slice(&stream_id_bytes); |
| bytes.put_slice(&topic_id_bytes); |
| bytes.put_u32_le(partition_id); |
| bytes.put_slice(&strategy_bytes); |
| bytes.put_u32_le(count); |
| bytes.put_u8(auto_commit); |
| |
| let command = PollMessages::from_bytes(bytes.freeze()); |
| assert!(command.is_ok()); |
| |
| let auto_commit = matches!(auto_commit, 1); |
| |
| let command = command.unwrap(); |
| assert_eq!(command.consumer, consumer); |
| assert_eq!(command.stream_id, stream_id); |
| assert_eq!(command.topic_id, topic_id); |
| assert_eq!(command.partition_id, Some(partition_id)); |
| assert_eq!(command.strategy, strategy); |
| assert_eq!(command.count, count); |
| assert_eq!(command.auto_commit, auto_commit); |
| } |
| } |