blob: 0cf32621fb69c5b0c2340ba6609d1b3620f175e8 [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use crate::state::models::{
CreateConsumerGroupWithId, CreatePersonalAccessTokenWithHash, CreateStreamWithId,
CreateTopicWithId, CreateUserWithId,
};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use iggy_common::BytesSerializable;
use iggy_common::IggyError;
use iggy_common::change_password::ChangePassword;
use iggy_common::create_partitions::CreatePartitions;
use iggy_common::delete_consumer_group::DeleteConsumerGroup;
use iggy_common::delete_partitions::DeletePartitions;
use iggy_common::delete_personal_access_token::DeletePersonalAccessToken;
use iggy_common::delete_segments::DeleteSegments;
use iggy_common::delete_stream::DeleteStream;
use iggy_common::delete_topic::DeleteTopic;
use iggy_common::delete_user::DeleteUser;
use iggy_common::purge_stream::PurgeStream;
use iggy_common::purge_topic::PurgeTopic;
use iggy_common::update_permissions::UpdatePermissions;
use iggy_common::update_stream::UpdateStream;
use iggy_common::update_topic::UpdateTopic;
use iggy_common::update_user::UpdateUser;
use iggy_common::{
CHANGE_PASSWORD_CODE, CREATE_CONSUMER_GROUP_CODE, CREATE_PARTITIONS_CODE,
CREATE_PERSONAL_ACCESS_TOKEN_CODE, CREATE_STREAM_CODE, CREATE_TOPIC_CODE, CREATE_USER_CODE,
Command, DELETE_CONSUMER_GROUP_CODE, DELETE_PARTITIONS_CODE, DELETE_PERSONAL_ACCESS_TOKEN_CODE,
DELETE_STREAM_CODE, DELETE_TOPIC_CODE, DELETE_USER_CODE, PURGE_STREAM_CODE, PURGE_TOPIC_CODE,
UPDATE_PERMISSIONS_CODE, UPDATE_STREAM_CODE, UPDATE_TOPIC_CODE, UPDATE_USER_CODE,
};
use std::fmt::{Display, Formatter};
#[derive(Debug, PartialEq)]
pub enum EntryCommand {
CreateStream(CreateStreamWithId),
UpdateStream(UpdateStream),
DeleteStream(DeleteStream),
PurgeStream(PurgeStream),
CreateTopic(CreateTopicWithId),
UpdateTopic(UpdateTopic),
DeleteTopic(DeleteTopic),
PurgeTopic(PurgeTopic),
CreatePartitions(CreatePartitions),
DeletePartitions(DeletePartitions),
DeleteSegments(DeleteSegments),
CreateConsumerGroup(CreateConsumerGroupWithId),
DeleteConsumerGroup(DeleteConsumerGroup),
CreateUser(CreateUserWithId),
UpdateUser(UpdateUser),
DeleteUser(DeleteUser),
ChangePassword(ChangePassword),
UpdatePermissions(UpdatePermissions),
CreatePersonalAccessToken(CreatePersonalAccessTokenWithHash),
DeletePersonalAccessToken(DeletePersonalAccessToken),
}
impl BytesSerializable for EntryCommand {
fn to_bytes(&self) -> Bytes {
let (code, command) = match self {
EntryCommand::CreateStream(command) => (command.code(), command.to_bytes()),
EntryCommand::UpdateStream(command) => (command.code(), command.to_bytes()),
EntryCommand::DeleteStream(command) => (command.code(), command.to_bytes()),
EntryCommand::PurgeStream(command) => (command.code(), command.to_bytes()),
EntryCommand::CreateTopic(command) => (command.code(), command.to_bytes()),
EntryCommand::UpdateTopic(command) => (command.code(), command.to_bytes()),
EntryCommand::DeleteTopic(command) => (command.code(), command.to_bytes()),
EntryCommand::PurgeTopic(command) => (command.code(), command.to_bytes()),
EntryCommand::CreatePartitions(command) => (command.code(), command.to_bytes()),
EntryCommand::DeletePartitions(command) => (command.code(), command.to_bytes()),
EntryCommand::DeleteSegments(command) => (command.code(), command.to_bytes()),
EntryCommand::CreateConsumerGroup(command) => (command.code(), command.to_bytes()),
EntryCommand::DeleteConsumerGroup(command) => (command.code(), command.to_bytes()),
EntryCommand::CreateUser(command) => (command.code(), command.to_bytes()),
EntryCommand::UpdateUser(command) => (command.code(), command.to_bytes()),
EntryCommand::DeleteUser(command) => (command.code(), command.to_bytes()),
EntryCommand::ChangePassword(command) => (command.code(), command.to_bytes()),
EntryCommand::UpdatePermissions(command) => (command.code(), command.to_bytes()),
EntryCommand::CreatePersonalAccessToken(command) => {
(command.code(), command.to_bytes())
}
EntryCommand::DeletePersonalAccessToken(command) => {
(command.code(), command.to_bytes())
}
};
let mut bytes = BytesMut::with_capacity(4 + 4 + command.len());
bytes.put_u32_le(code);
bytes.put_u32_le(command.len() as u32);
bytes.extend(command);
bytes.freeze()
}
fn from_bytes(bytes: Bytes) -> Result<Self, IggyError>
where
Self: Sized,
{
let code = bytes.slice(0..4).get_u32_le();
let length = bytes.slice(4..8).get_u32_le();
let payload = bytes.slice(8..8 + length as usize);
match code {
CREATE_STREAM_CODE => Ok(EntryCommand::CreateStream(CreateStreamWithId::from_bytes(
payload,
)?)),
UPDATE_STREAM_CODE => Ok(EntryCommand::UpdateStream(UpdateStream::from_bytes(
payload,
)?)),
DELETE_STREAM_CODE => Ok(EntryCommand::DeleteStream(DeleteStream::from_bytes(
payload,
)?)),
PURGE_STREAM_CODE => Ok(EntryCommand::PurgeStream(PurgeStream::from_bytes(payload)?)),
CREATE_TOPIC_CODE => Ok(EntryCommand::CreateTopic(CreateTopicWithId::from_bytes(
payload,
)?)),
UPDATE_TOPIC_CODE => Ok(EntryCommand::UpdateTopic(UpdateTopic::from_bytes(payload)?)),
DELETE_TOPIC_CODE => Ok(EntryCommand::DeleteTopic(DeleteTopic::from_bytes(payload)?)),
PURGE_TOPIC_CODE => Ok(EntryCommand::PurgeTopic(PurgeTopic::from_bytes(payload)?)),
CREATE_PARTITIONS_CODE => Ok(EntryCommand::CreatePartitions(
CreatePartitions::from_bytes(payload)?,
)),
DELETE_PARTITIONS_CODE => Ok(EntryCommand::DeletePartitions(
DeletePartitions::from_bytes(payload)?,
)),
CREATE_CONSUMER_GROUP_CODE => Ok(EntryCommand::CreateConsumerGroup(
CreateConsumerGroupWithId::from_bytes(payload)?,
)),
DELETE_CONSUMER_GROUP_CODE => Ok(EntryCommand::DeleteConsumerGroup(
DeleteConsumerGroup::from_bytes(payload)?,
)),
CREATE_USER_CODE => Ok(EntryCommand::CreateUser(CreateUserWithId::from_bytes(
payload,
)?)),
UPDATE_USER_CODE => Ok(EntryCommand::UpdateUser(UpdateUser::from_bytes(payload)?)),
DELETE_USER_CODE => Ok(EntryCommand::DeleteUser(DeleteUser::from_bytes(payload)?)),
CHANGE_PASSWORD_CODE => Ok(EntryCommand::ChangePassword(ChangePassword::from_bytes(
payload,
)?)),
UPDATE_PERMISSIONS_CODE => Ok(EntryCommand::UpdatePermissions(
UpdatePermissions::from_bytes(payload)?,
)),
CREATE_PERSONAL_ACCESS_TOKEN_CODE => Ok(EntryCommand::CreatePersonalAccessToken(
CreatePersonalAccessTokenWithHash::from_bytes(payload)?,
)),
DELETE_PERSONAL_ACCESS_TOKEN_CODE => Ok(EntryCommand::DeletePersonalAccessToken(
DeletePersonalAccessToken::from_bytes(payload)?,
)),
_ => Err(IggyError::InvalidCommand),
}
}
}
impl Display for EntryCommand {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
EntryCommand::CreateStream(command) => write!(f, "CreateStream({command})"),
EntryCommand::UpdateStream(command) => write!(f, "UpdateStream({command})"),
EntryCommand::DeleteStream(command) => write!(f, "DeleteStream({command})"),
EntryCommand::PurgeStream(command) => write!(f, "PurgeStream({command})"),
EntryCommand::CreateTopic(command) => write!(f, "CreateTopic({command})"),
EntryCommand::UpdateTopic(command) => write!(f, "UpdateTopic({command})"),
EntryCommand::DeleteTopic(command) => write!(f, "DeleteTopic({command})"),
EntryCommand::PurgeTopic(command) => write!(f, "PurgeTopic({command})"),
EntryCommand::CreatePartitions(command) => write!(f, "CreatePartitions({command})"),
EntryCommand::DeletePartitions(command) => write!(f, "DeletePartitions({command})"),
EntryCommand::DeleteSegments(command) => write!(f, "DeleteSegments({command})"),
EntryCommand::CreateConsumerGroup(command) => {
write!(f, "CreateConsumerGroup({command})")
}
EntryCommand::DeleteConsumerGroup(command) => {
write!(f, "DeleteConsumerGroup({command})")
}
EntryCommand::CreateUser(command) => write!(f, "CreateUser({command})"),
EntryCommand::UpdateUser(command) => write!(f, "UpdateUser({command})"),
EntryCommand::DeleteUser(command) => write!(f, "DeleteUser({command})"),
EntryCommand::ChangePassword(command) => write!(f, "ChangePassword({command})"),
EntryCommand::UpdatePermissions(command) => write!(f, "UpdatePermissions({command})"),
EntryCommand::CreatePersonalAccessToken(command) => {
write!(f, "CreatePersonalAccessToken({command})")
}
EntryCommand::DeletePersonalAccessToken(command) => {
write!(f, "DeletePersonalAccessToken({command})")
}
}
}
}