blob: 82df9ca41428b28e1c415919a6bd2570226037bf [file] [log] [blame]
use crate::bytes_serializable::BytesSerializable;
use crate::error::IggyError;
use crate::identifier::Identifier;
use crate::validatable::Validatable;
use bytes::{BufMut, Bytes, BytesMut};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::fmt::Display;
/// `Consumer` represents the type of consumer that is consuming a message.
/// It can be either a `Consumer` or a `ConsumerGroup`.
/// It consists of the following fields:
/// - `kind`: the type of consumer. It can be either `Consumer` or `ConsumerGroup`.
/// - `id`: the unique identifier of the consumer.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
pub struct Consumer {
/// The type of consumer. It can be either `Consumer` or `ConsumerGroup`.
#[serde(skip)]
pub kind: ConsumerKind,
/// The unique identifier of the consumer.
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "default_id")]
pub id: Identifier,
}
/// `ConsumerKind` is an enum that represents the type of consumer.
#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Copy, Clone)]
#[serde(rename_all = "snake_case")]
pub enum ConsumerKind {
/// `Consumer` represents a regular consumer.
#[default]
Consumer,
/// `ConsumerGroup` represents a consumer group.
ConsumerGroup,
}
fn default_id() -> Identifier {
Identifier::numeric(1).unwrap()
}
impl Validatable<IggyError> for Consumer {
fn validate(&self) -> Result<(), IggyError> {
Ok(())
}
}
impl Consumer {
/// Creates a new `Consumer` from a `Consumer`.
pub fn from_consumer(consumer: &Consumer) -> Self {
Self {
kind: consumer.kind,
id: consumer.id.clone(),
}
}
/// Creates a new `Consumer` from the `Identifier`.
pub fn new(id: Identifier) -> Self {
Self {
kind: ConsumerKind::Consumer,
id,
}
}
// Creates a new `ConsumerGroup` from the `Identifier`.
pub fn group(id: Identifier) -> Self {
Self {
kind: ConsumerKind::ConsumerGroup,
id,
}
}
}
impl BytesSerializable for Consumer {
fn to_bytes(&self) -> Bytes {
let id_bytes = self.id.to_bytes();
let mut bytes = BytesMut::with_capacity(1 + id_bytes.len());
bytes.put_u8(self.kind.as_code());
bytes.put_slice(&id_bytes);
bytes.freeze()
}
fn from_bytes(bytes: Bytes) -> Result<Self, IggyError>
where
Self: Sized,
{
if bytes.len() < 4 {
return Err(IggyError::InvalidCommand);
}
let kind = ConsumerKind::from_code(bytes[0])?;
let id = Identifier::from_bytes(bytes.slice(1..))?;
let consumer = Consumer { kind, id };
consumer.validate()?;
Ok(consumer)
}
}
/// `ConsumerKind` is an enum that represents the type of consumer.
impl ConsumerKind {
/// Returns the code of the `ConsumerKind`.
pub fn as_code(&self) -> u8 {
match self {
ConsumerKind::Consumer => 1,
ConsumerKind::ConsumerGroup => 2,
}
}
/// Creates a new `ConsumerKind` from the code.
pub fn from_code(code: u8) -> Result<Self, IggyError> {
match code {
1 => Ok(ConsumerKind::Consumer),
2 => Ok(ConsumerKind::ConsumerGroup),
_ => Err(IggyError::InvalidCommand),
}
}
}
impl Display for Consumer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}|{}", self.kind, self.id)
}
}
impl Display for ConsumerKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ConsumerKind::Consumer => write!(f, "consumer"),
ConsumerKind::ConsumerGroup => write!(f, "consumer_group"),
}
}
}