blob: 001255598c80af934a11cba789467e9509657a46 [file] [log] [blame]
use crate::state::models::CreatePersonalAccessTokenWithHash;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use iggy::bytes_serializable::BytesSerializable;
use iggy::command::{
Command, CHANGE_PASSWORD_CODE, CREATE_CONSUMER_GROUP_CODE, CREATE_PARTITIONS_CODE,
CREATE_PERSONAL_ACCESS_TOKEN_CODE, CREATE_STREAM_CODE, CREATE_TOPIC_CODE, CREATE_USER_CODE,
DELETE_CONSUMER_GROUP_CODE, DELETE_PARTITIONS_CODE, DELETE_PERSONAL_ACCESS_TOKEN_CODE,
DELETE_STREAM_CODE, DELETE_TOPIC_CODE, DELETE_USER_CODE, PURGE_STREAM_CODE, PURGE_TOPIC_CODE,
UPDATE_PERMISSIONS_CODE, UPDATE_STREAM_CODE, UPDATE_TOPIC_CODE, UPDATE_USER_CODE,
};
use iggy::consumer_groups::create_consumer_group::CreateConsumerGroup;
use iggy::consumer_groups::delete_consumer_group::DeleteConsumerGroup;
use iggy::error::IggyError;
use iggy::partitions::create_partitions::CreatePartitions;
use iggy::partitions::delete_partitions::DeletePartitions;
use iggy::personal_access_tokens::delete_personal_access_token::DeletePersonalAccessToken;
use iggy::streams::create_stream::CreateStream;
use iggy::streams::delete_stream::DeleteStream;
use iggy::streams::purge_stream::PurgeStream;
use iggy::streams::update_stream::UpdateStream;
use iggy::topics::create_topic::CreateTopic;
use iggy::topics::delete_topic::DeleteTopic;
use iggy::topics::purge_topic::PurgeTopic;
use iggy::topics::update_topic::UpdateTopic;
use iggy::users::change_password::ChangePassword;
use iggy::users::create_user::CreateUser;
use iggy::users::delete_user::DeleteUser;
use iggy::users::update_permissions::UpdatePermissions;
use iggy::users::update_user::UpdateUser;
use std::fmt::{Display, Formatter};
#[derive(Debug, PartialEq)]
pub enum EntryCommand {
CreateStream(CreateStream),
UpdateStream(UpdateStream),
DeleteStream(DeleteStream),
PurgeStream(PurgeStream),
CreateTopic(CreateTopic),
UpdateTopic(UpdateTopic),
DeleteTopic(DeleteTopic),
PurgeTopic(PurgeTopic),
CreatePartitions(CreatePartitions),
DeletePartitions(DeletePartitions),
CreateConsumerGroup(CreateConsumerGroup),
DeleteConsumerGroup(DeleteConsumerGroup),
CreateUser(CreateUser),
UpdateUser(UpdateUser),
DeleteUser(DeleteUser),
ChangePassword(ChangePassword),
UpdatePermissions(UpdatePermissions),
CreatePersonalAccessToken(CreatePersonalAccessTokenWithHash),
DeletePersonalAccessToken(DeletePersonalAccessToken),
}
impl BytesSerializable for EntryCommand {
fn to_bytes(&self) -> Bytes {
let (code, command) = match self {
EntryCommand::CreateStream(command) => (command.code(), command.to_bytes()),
EntryCommand::UpdateStream(command) => (command.code(), command.to_bytes()),
EntryCommand::DeleteStream(command) => (command.code(), command.to_bytes()),
EntryCommand::PurgeStream(command) => (command.code(), command.to_bytes()),
EntryCommand::CreateTopic(command) => (command.code(), command.to_bytes()),
EntryCommand::UpdateTopic(command) => (command.code(), command.to_bytes()),
EntryCommand::DeleteTopic(command) => (command.code(), command.to_bytes()),
EntryCommand::PurgeTopic(command) => (command.code(), command.to_bytes()),
EntryCommand::CreatePartitions(command) => (command.code(), command.to_bytes()),
EntryCommand::DeletePartitions(command) => (command.code(), command.to_bytes()),
EntryCommand::CreateConsumerGroup(command) => (command.code(), command.to_bytes()),
EntryCommand::DeleteConsumerGroup(command) => (command.code(), command.to_bytes()),
EntryCommand::CreateUser(command) => (command.code(), command.to_bytes()),
EntryCommand::UpdateUser(command) => (command.code(), command.to_bytes()),
EntryCommand::DeleteUser(command) => (command.code(), command.to_bytes()),
EntryCommand::ChangePassword(command) => (command.code(), command.to_bytes()),
EntryCommand::UpdatePermissions(command) => (command.code(), command.to_bytes()),
EntryCommand::CreatePersonalAccessToken(command) => {
(command.code(), command.to_bytes())
}
EntryCommand::DeletePersonalAccessToken(command) => {
(command.code(), command.to_bytes())
}
};
let mut bytes = BytesMut::with_capacity(4 + 4 + command.len());
bytes.put_u32_le(code);
bytes.put_u32_le(command.len() as u32);
bytes.extend(command);
bytes.freeze()
}
fn from_bytes(bytes: Bytes) -> Result<Self, IggyError>
where
Self: Sized,
{
let code = bytes.slice(0..4).get_u32_le();
let length = bytes.slice(4..8).get_u32_le();
let payload = bytes.slice(8..8 + length as usize);
match code {
CREATE_STREAM_CODE => Ok(EntryCommand::CreateStream(CreateStream::from_bytes(
payload,
)?)),
UPDATE_STREAM_CODE => Ok(EntryCommand::UpdateStream(UpdateStream::from_bytes(
payload,
)?)),
DELETE_STREAM_CODE => Ok(EntryCommand::DeleteStream(DeleteStream::from_bytes(
payload,
)?)),
PURGE_STREAM_CODE => Ok(EntryCommand::PurgeStream(PurgeStream::from_bytes(payload)?)),
CREATE_TOPIC_CODE => Ok(EntryCommand::CreateTopic(CreateTopic::from_bytes(payload)?)),
UPDATE_TOPIC_CODE => Ok(EntryCommand::UpdateTopic(UpdateTopic::from_bytes(payload)?)),
DELETE_TOPIC_CODE => Ok(EntryCommand::DeleteTopic(DeleteTopic::from_bytes(payload)?)),
PURGE_TOPIC_CODE => Ok(EntryCommand::PurgeTopic(PurgeTopic::from_bytes(payload)?)),
CREATE_PARTITIONS_CODE => Ok(EntryCommand::CreatePartitions(
CreatePartitions::from_bytes(payload)?,
)),
DELETE_PARTITIONS_CODE => Ok(EntryCommand::DeletePartitions(
DeletePartitions::from_bytes(payload)?,
)),
CREATE_CONSUMER_GROUP_CODE => Ok(EntryCommand::CreateConsumerGroup(
CreateConsumerGroup::from_bytes(payload)?,
)),
DELETE_CONSUMER_GROUP_CODE => Ok(EntryCommand::DeleteConsumerGroup(
DeleteConsumerGroup::from_bytes(payload)?,
)),
CREATE_USER_CODE => Ok(EntryCommand::CreateUser(CreateUser::from_bytes(payload)?)),
UPDATE_USER_CODE => Ok(EntryCommand::UpdateUser(UpdateUser::from_bytes(payload)?)),
DELETE_USER_CODE => Ok(EntryCommand::DeleteUser(DeleteUser::from_bytes(payload)?)),
CHANGE_PASSWORD_CODE => Ok(EntryCommand::ChangePassword(ChangePassword::from_bytes(
payload,
)?)),
UPDATE_PERMISSIONS_CODE => Ok(EntryCommand::UpdatePermissions(
UpdatePermissions::from_bytes(payload)?,
)),
CREATE_PERSONAL_ACCESS_TOKEN_CODE => Ok(EntryCommand::CreatePersonalAccessToken(
CreatePersonalAccessTokenWithHash::from_bytes(payload)?,
)),
DELETE_PERSONAL_ACCESS_TOKEN_CODE => Ok(EntryCommand::DeletePersonalAccessToken(
DeletePersonalAccessToken::from_bytes(payload)?,
)),
_ => Err(IggyError::InvalidCommand),
}
}
}
impl Display for EntryCommand {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
EntryCommand::CreateStream(command) => write!(f, "CreateStream({})", command),
EntryCommand::UpdateStream(command) => write!(f, "UpdateStream({})", command),
EntryCommand::DeleteStream(command) => write!(f, "DeleteStream({})", command),
EntryCommand::PurgeStream(command) => write!(f, "PurgeStream({})", command),
EntryCommand::CreateTopic(command) => write!(f, "CreateTopic({})", command),
EntryCommand::UpdateTopic(command) => write!(f, "UpdateTopic({})", command),
EntryCommand::DeleteTopic(command) => write!(f, "DeleteTopic({})", command),
EntryCommand::PurgeTopic(command) => write!(f, "PurgeTopic({})", command),
EntryCommand::CreatePartitions(command) => write!(f, "CreatePartitions({})", command),
EntryCommand::DeletePartitions(command) => write!(f, "DeletePartitions({})", command),
EntryCommand::CreateConsumerGroup(command) => {
write!(f, "CreateConsumerGroup({})", command)
}
EntryCommand::DeleteConsumerGroup(command) => {
write!(f, "DeleteConsumerGroup({})", command)
}
EntryCommand::CreateUser(command) => write!(f, "CreateUser({})", command),
EntryCommand::UpdateUser(command) => write!(f, "UpdateUser({})", command),
EntryCommand::DeleteUser(command) => write!(f, "DeleteUser({})", command),
EntryCommand::ChangePassword(command) => write!(f, "ChangePassword({})", command),
EntryCommand::UpdatePermissions(command) => write!(f, "UpdatePermissions({})", command),
EntryCommand::CreatePersonalAccessToken(command) => {
write!(f, "CreatePersonalAccessToken({})", command)
}
EntryCommand::DeletePersonalAccessToken(command) => {
write!(f, "DeletePersonalAccessToken({})", command)
}
}
}
}