blob: e419ba1d4269dc700e63bfe399e302b589375d9d [file] [log] [blame]
use bytes::{BufMut, Bytes, BytesMut};
use iggy::command::*;
use iggy::consumer_groups::create_consumer_group::CreateConsumerGroup;
use iggy::consumer_groups::delete_consumer_group::DeleteConsumerGroup;
use iggy::consumer_groups::get_consumer_group::GetConsumerGroup;
use iggy::consumer_groups::get_consumer_groups::GetConsumerGroups;
use iggy::consumer_groups::join_consumer_group::JoinConsumerGroup;
use iggy::consumer_groups::leave_consumer_group::LeaveConsumerGroup;
use iggy::consumer_offsets::delete_consumer_offset::DeleteConsumerOffset;
use iggy::consumer_offsets::get_consumer_offset::GetConsumerOffset;
use iggy::consumer_offsets::store_consumer_offset::StoreConsumerOffset;
use iggy::error::IggyError;
use iggy::messages::poll_messages::PollMessages;
use iggy::messages::send_messages::SendMessages;
use iggy::partitions::create_partitions::CreatePartitions;
use iggy::partitions::delete_partitions::DeletePartitions;
use iggy::personal_access_tokens::create_personal_access_token::CreatePersonalAccessToken;
use iggy::personal_access_tokens::delete_personal_access_token::DeletePersonalAccessToken;
use iggy::personal_access_tokens::get_personal_access_tokens::GetPersonalAccessTokens;
use iggy::personal_access_tokens::login_with_personal_access_token::LoginWithPersonalAccessToken;
use iggy::streams::create_stream::CreateStream;
use iggy::streams::delete_stream::DeleteStream;
use iggy::streams::get_stream::GetStream;
use iggy::streams::get_streams::GetStreams;
use iggy::streams::purge_stream::PurgeStream;
use iggy::streams::update_stream::UpdateStream;
use iggy::system::get_client::GetClient;
use iggy::system::get_clients::GetClients;
use iggy::system::get_me::GetMe;
use iggy::system::get_snapshot::GetSnapshot;
use iggy::system::get_stats::GetStats;
use iggy::system::ping::Ping;
use iggy::topics::create_topic::CreateTopic;
use iggy::topics::delete_topic::DeleteTopic;
use iggy::topics::get_topic::GetTopic;
use iggy::topics::get_topics::GetTopics;
use iggy::topics::purge_topic::PurgeTopic;
use iggy::topics::update_topic::UpdateTopic;
use iggy::users::change_password::ChangePassword;
use iggy::users::create_user::CreateUser;
use iggy::users::delete_user::DeleteUser;
use iggy::users::get_user::GetUser;
use iggy::users::get_users::GetUsers;
use iggy::users::login_user::LoginUser;
use iggy::users::logout_user::LogoutUser;
use iggy::users::update_permissions::UpdatePermissions;
use iggy::users::update_user::UpdateUser;
use iggy::validatable::Validatable;
use iggy::{
bytes_serializable::BytesSerializable, messages::flush_unsaved_buffer::FlushUnsavedBuffer,
};
use std::fmt::{Display, Formatter};
use strum::EnumString;
/// Every command this module can encode to and decode from bytes: one variant per
/// command, each wrapping that command's payload type.
///
/// The binary form (see the `BytesSerializable` impl in this file) is the command's
/// `u32` code in little-endian order followed by the payload's own bytes.
///
/// NOTE(review): `EnumString` is derived from `strum`; presumably it is used to parse a
/// command from its variant name — confirm against callers.
#[derive(Debug, PartialEq, EnumString)]
pub enum ServerCommand {
// System (`iggy::system`).
Ping(Ping),
GetStats(GetStats),
GetMe(GetMe),
GetClient(GetClient),
GetClients(GetClients),
// Users (`iggy::users`).
GetUser(GetUser),
GetUsers(GetUsers),
CreateUser(CreateUser),
DeleteUser(DeleteUser),
UpdateUser(UpdateUser),
UpdatePermissions(UpdatePermissions),
ChangePassword(ChangePassword),
LoginUser(LoginUser),
LogoutUser(LogoutUser),
// Personal access tokens (`iggy::personal_access_tokens`).
GetPersonalAccessTokens(GetPersonalAccessTokens),
CreatePersonalAccessToken(CreatePersonalAccessToken),
DeletePersonalAccessToken(DeletePersonalAccessToken),
LoginWithPersonalAccessToken(LoginWithPersonalAccessToken),
// Messages (`iggy::messages`).
SendMessages(SendMessages),
PollMessages(PollMessages),
FlushUnsavedBuffer(FlushUnsavedBuffer),
// Consumer offsets (`iggy::consumer_offsets`).
GetConsumerOffset(GetConsumerOffset),
StoreConsumerOffset(StoreConsumerOffset),
DeleteConsumerOffset(DeleteConsumerOffset),
// Streams (`iggy::streams`).
GetStream(GetStream),
GetStreams(GetStreams),
CreateStream(CreateStream),
DeleteStream(DeleteStream),
UpdateStream(UpdateStream),
PurgeStream(PurgeStream),
// Topics (`iggy::topics`).
GetTopic(GetTopic),
GetTopics(GetTopics),
CreateTopic(CreateTopic),
DeleteTopic(DeleteTopic),
UpdateTopic(UpdateTopic),
PurgeTopic(PurgeTopic),
// Partitions (`iggy::partitions`).
CreatePartitions(CreatePartitions),
DeletePartitions(DeletePartitions),
// Consumer groups (`iggy::consumer_groups`).
GetConsumerGroup(GetConsumerGroup),
GetConsumerGroups(GetConsumerGroups),
CreateConsumerGroup(CreateConsumerGroup),
DeleteConsumerGroup(DeleteConsumerGroup),
JoinConsumerGroup(JoinConsumerGroup),
LeaveConsumerGroup(LeaveConsumerGroup),
// Snapshot (`iggy::system::get_snapshot`). The variant is named `GetSnapshotFile`
// while its payload type is `GetSnapshot`.
GetSnapshotFile(GetSnapshot),
}
impl BytesSerializable for ServerCommand {
    /// Serializes the command as its `u32` code (little-endian) followed by the bytes of
    /// the wrapped payload. The framing itself is done by `as_bytes`.
    fn to_bytes(&self) -> Bytes {
        match self {
            ServerCommand::Ping(payload) => as_bytes(payload),
            ServerCommand::GetStats(payload) => as_bytes(payload),
            ServerCommand::GetMe(payload) => as_bytes(payload),
            ServerCommand::GetClient(payload) => as_bytes(payload),
            ServerCommand::GetClients(payload) => as_bytes(payload),
            ServerCommand::GetUser(payload) => as_bytes(payload),
            ServerCommand::GetUsers(payload) => as_bytes(payload),
            ServerCommand::CreateUser(payload) => as_bytes(payload),
            ServerCommand::DeleteUser(payload) => as_bytes(payload),
            ServerCommand::UpdateUser(payload) => as_bytes(payload),
            ServerCommand::UpdatePermissions(payload) => as_bytes(payload),
            ServerCommand::ChangePassword(payload) => as_bytes(payload),
            ServerCommand::LoginUser(payload) => as_bytes(payload),
            ServerCommand::LogoutUser(payload) => as_bytes(payload),
            ServerCommand::GetPersonalAccessTokens(payload) => as_bytes(payload),
            ServerCommand::CreatePersonalAccessToken(payload) => as_bytes(payload),
            ServerCommand::DeletePersonalAccessToken(payload) => as_bytes(payload),
            ServerCommand::LoginWithPersonalAccessToken(payload) => as_bytes(payload),
            ServerCommand::SendMessages(payload) => as_bytes(payload),
            ServerCommand::PollMessages(payload) => as_bytes(payload),
            ServerCommand::FlushUnsavedBuffer(payload) => as_bytes(payload),
            ServerCommand::GetConsumerOffset(payload) => as_bytes(payload),
            ServerCommand::StoreConsumerOffset(payload) => as_bytes(payload),
            ServerCommand::DeleteConsumerOffset(payload) => as_bytes(payload),
            ServerCommand::GetStream(payload) => as_bytes(payload),
            ServerCommand::GetStreams(payload) => as_bytes(payload),
            ServerCommand::CreateStream(payload) => as_bytes(payload),
            ServerCommand::DeleteStream(payload) => as_bytes(payload),
            ServerCommand::UpdateStream(payload) => as_bytes(payload),
            ServerCommand::PurgeStream(payload) => as_bytes(payload),
            ServerCommand::GetTopic(payload) => as_bytes(payload),
            ServerCommand::GetTopics(payload) => as_bytes(payload),
            ServerCommand::CreateTopic(payload) => as_bytes(payload),
            ServerCommand::DeleteTopic(payload) => as_bytes(payload),
            ServerCommand::UpdateTopic(payload) => as_bytes(payload),
            ServerCommand::PurgeTopic(payload) => as_bytes(payload),
            ServerCommand::CreatePartitions(payload) => as_bytes(payload),
            ServerCommand::DeletePartitions(payload) => as_bytes(payload),
            ServerCommand::GetConsumerGroup(payload) => as_bytes(payload),
            ServerCommand::GetConsumerGroups(payload) => as_bytes(payload),
            ServerCommand::CreateConsumerGroup(payload) => as_bytes(payload),
            ServerCommand::DeleteConsumerGroup(payload) => as_bytes(payload),
            ServerCommand::JoinConsumerGroup(payload) => as_bytes(payload),
            ServerCommand::LeaveConsumerGroup(payload) => as_bytes(payload),
            ServerCommand::GetSnapshotFile(payload) => as_bytes(payload),
        }
    }

    /// Decodes a command from `bytes`: the first four bytes are read as a little-endian
    /// `u32` command code, and everything after them is handed to the matching payload
    /// type's `from_bytes`.
    ///
    /// # Errors
    ///
    /// - `IggyError::InvalidNumberEncoding` when `bytes` holds fewer than four bytes, so
    ///   no command code can be read.
    /// - `IggyError::InvalidCommand` when the code matches none of the known commands.
    /// - Any error returned by the selected payload type's `from_bytes`.
    fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> {
        // Checked access on purpose: indexing `bytes[..4]` or calling `slice(4..)` on a
        // buffer shorter than four bytes panics, and this input is not guaranteed to be
        // well-formed.
        let code = bytes
            .get(..4)
            .and_then(|prefix| <[u8; 4]>::try_from(prefix).ok())
            .map(u32::from_le_bytes)
            .ok_or(IggyError::InvalidNumberEncoding)?;
        // Safe now: the buffer is known to hold at least four bytes.
        let payload = bytes.slice(4..);
        match code {
            PING_CODE => Ping::from_bytes(payload).map(ServerCommand::Ping),
            GET_STATS_CODE => GetStats::from_bytes(payload).map(ServerCommand::GetStats),
            GET_ME_CODE => GetMe::from_bytes(payload).map(ServerCommand::GetMe),
            GET_CLIENT_CODE => GetClient::from_bytes(payload).map(ServerCommand::GetClient),
            GET_CLIENTS_CODE => GetClients::from_bytes(payload).map(ServerCommand::GetClients),
            GET_USER_CODE => GetUser::from_bytes(payload).map(ServerCommand::GetUser),
            GET_USERS_CODE => GetUsers::from_bytes(payload).map(ServerCommand::GetUsers),
            CREATE_USER_CODE => CreateUser::from_bytes(payload).map(ServerCommand::CreateUser),
            DELETE_USER_CODE => DeleteUser::from_bytes(payload).map(ServerCommand::DeleteUser),
            UPDATE_USER_CODE => UpdateUser::from_bytes(payload).map(ServerCommand::UpdateUser),
            UPDATE_PERMISSIONS_CODE => {
                UpdatePermissions::from_bytes(payload).map(ServerCommand::UpdatePermissions)
            }
            CHANGE_PASSWORD_CODE => {
                ChangePassword::from_bytes(payload).map(ServerCommand::ChangePassword)
            }
            LOGIN_USER_CODE => LoginUser::from_bytes(payload).map(ServerCommand::LoginUser),
            LOGOUT_USER_CODE => LogoutUser::from_bytes(payload).map(ServerCommand::LogoutUser),
            GET_PERSONAL_ACCESS_TOKENS_CODE => GetPersonalAccessTokens::from_bytes(payload)
                .map(ServerCommand::GetPersonalAccessTokens),
            CREATE_PERSONAL_ACCESS_TOKEN_CODE => CreatePersonalAccessToken::from_bytes(payload)
                .map(ServerCommand::CreatePersonalAccessToken),
            DELETE_PERSONAL_ACCESS_TOKEN_CODE => DeletePersonalAccessToken::from_bytes(payload)
                .map(ServerCommand::DeletePersonalAccessToken),
            LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE => {
                LoginWithPersonalAccessToken::from_bytes(payload)
                    .map(ServerCommand::LoginWithPersonalAccessToken)
            }
            SEND_MESSAGES_CODE => {
                SendMessages::from_bytes(payload).map(ServerCommand::SendMessages)
            }
            POLL_MESSAGES_CODE => {
                PollMessages::from_bytes(payload).map(ServerCommand::PollMessages)
            }
            FLUSH_UNSAVED_BUFFER_CODE => {
                FlushUnsavedBuffer::from_bytes(payload).map(ServerCommand::FlushUnsavedBuffer)
            }
            STORE_CONSUMER_OFFSET_CODE => {
                StoreConsumerOffset::from_bytes(payload).map(ServerCommand::StoreConsumerOffset)
            }
            DELETE_CONSUMER_OFFSET_CODE => {
                DeleteConsumerOffset::from_bytes(payload).map(ServerCommand::DeleteConsumerOffset)
            }
            GET_CONSUMER_OFFSET_CODE => {
                GetConsumerOffset::from_bytes(payload).map(ServerCommand::GetConsumerOffset)
            }
            GET_STREAM_CODE => GetStream::from_bytes(payload).map(ServerCommand::GetStream),
            GET_STREAMS_CODE => GetStreams::from_bytes(payload).map(ServerCommand::GetStreams),
            CREATE_STREAM_CODE => {
                CreateStream::from_bytes(payload).map(ServerCommand::CreateStream)
            }
            DELETE_STREAM_CODE => {
                DeleteStream::from_bytes(payload).map(ServerCommand::DeleteStream)
            }
            UPDATE_STREAM_CODE => {
                UpdateStream::from_bytes(payload).map(ServerCommand::UpdateStream)
            }
            PURGE_STREAM_CODE => PurgeStream::from_bytes(payload).map(ServerCommand::PurgeStream),
            GET_TOPIC_CODE => GetTopic::from_bytes(payload).map(ServerCommand::GetTopic),
            GET_TOPICS_CODE => GetTopics::from_bytes(payload).map(ServerCommand::GetTopics),
            CREATE_TOPIC_CODE => CreateTopic::from_bytes(payload).map(ServerCommand::CreateTopic),
            DELETE_TOPIC_CODE => DeleteTopic::from_bytes(payload).map(ServerCommand::DeleteTopic),
            UPDATE_TOPIC_CODE => UpdateTopic::from_bytes(payload).map(ServerCommand::UpdateTopic),
            PURGE_TOPIC_CODE => PurgeTopic::from_bytes(payload).map(ServerCommand::PurgeTopic),
            CREATE_PARTITIONS_CODE => {
                CreatePartitions::from_bytes(payload).map(ServerCommand::CreatePartitions)
            }
            DELETE_PARTITIONS_CODE => {
                DeletePartitions::from_bytes(payload).map(ServerCommand::DeletePartitions)
            }
            GET_CONSUMER_GROUP_CODE => {
                GetConsumerGroup::from_bytes(payload).map(ServerCommand::GetConsumerGroup)
            }
            GET_CONSUMER_GROUPS_CODE => {
                GetConsumerGroups::from_bytes(payload).map(ServerCommand::GetConsumerGroups)
            }
            CREATE_CONSUMER_GROUP_CODE => {
                CreateConsumerGroup::from_bytes(payload).map(ServerCommand::CreateConsumerGroup)
            }
            DELETE_CONSUMER_GROUP_CODE => {
                DeleteConsumerGroup::from_bytes(payload).map(ServerCommand::DeleteConsumerGroup)
            }
            JOIN_CONSUMER_GROUP_CODE => {
                JoinConsumerGroup::from_bytes(payload).map(ServerCommand::JoinConsumerGroup)
            }
            LEAVE_CONSUMER_GROUP_CODE => {
                LeaveConsumerGroup::from_bytes(payload).map(ServerCommand::LeaveConsumerGroup)
            }
            GET_SNAPSHOT_FILE_CODE => {
                GetSnapshot::from_bytes(payload).map(ServerCommand::GetSnapshotFile)
            }
            _ => Err(IggyError::InvalidCommand),
        }
    }
}
/// Frames a single command for the wire: the command's `u32` code in little-endian
/// order, immediately followed by the command's own serialized bytes.
fn as_bytes<T: Command>(command: &T) -> Bytes {
    let body = command.to_bytes();
    // Four bytes for the code prefix plus the body, so the buffer never has to grow.
    let mut frame = BytesMut::with_capacity(4 + body.len());
    frame.put_u32_le(command.code());
    frame.extend_from_slice(&body);
    frame.freeze()
}
impl Validatable<IggyError> for ServerCommand {
fn validate(&self) -> Result<(), IggyError> {
match self {
ServerCommand::Ping(command) => command.validate(),
ServerCommand::GetStats(command) => command.validate(),
ServerCommand::GetMe(command) => command.validate(),
ServerCommand::GetClient(command) => command.validate(),
ServerCommand::GetClients(command) => command.validate(),
ServerCommand::GetUser(command) => command.validate(),
ServerCommand::GetUsers(command) => command.validate(),
ServerCommand::CreateUser(command) => command.validate(),
ServerCommand::DeleteUser(command) => command.validate(),
ServerCommand::UpdateUser(command) => command.validate(),
ServerCommand::UpdatePermissions(command) => command.validate(),
ServerCommand::ChangePassword(command) => command.validate(),
ServerCommand::LoginUser(command) => command.validate(),
ServerCommand::LogoutUser(command) => command.validate(),
ServerCommand::GetPersonalAccessTokens(command) => command.validate(),
ServerCommand::CreatePersonalAccessToken(command) => command.validate(),
ServerCommand::DeletePersonalAccessToken(command) => command.validate(),
ServerCommand::LoginWithPersonalAccessToken(command) => command.validate(),
ServerCommand::SendMessages(command) => command.validate(),
ServerCommand::PollMessages(command) => command.validate(),
ServerCommand::StoreConsumerOffset(command) => command.validate(),
ServerCommand::DeleteConsumerOffset(command) => command.validate(),
ServerCommand::GetConsumerOffset(command) => command.validate(),
ServerCommand::GetStream(command) => command.validate(),
ServerCommand::GetStreams(command) => command.validate(),
ServerCommand::CreateStream(command) => command.validate(),
ServerCommand::DeleteStream(command) => command.validate(),
ServerCommand::UpdateStream(command) => command.validate(),
ServerCommand::PurgeStream(command) => command.validate(),
ServerCommand::GetTopic(command) => command.validate(),
ServerCommand::GetTopics(command) => command.validate(),
ServerCommand::CreateTopic(command) => command.validate(),
ServerCommand::DeleteTopic(command) => command.validate(),
ServerCommand::UpdateTopic(command) => command.validate(),
ServerCommand::PurgeTopic(command) => command.validate(),
ServerCommand::CreatePartitions(command) => command.validate(),
ServerCommand::DeletePartitions(command) => command.validate(),
ServerCommand::GetConsumerGroup(command) => command.validate(),
ServerCommand::GetConsumerGroups(command) => command.validate(),
ServerCommand::CreateConsumerGroup(command) => command.validate(),
ServerCommand::DeleteConsumerGroup(command) => command.validate(),
ServerCommand::JoinConsumerGroup(command) => command.validate(),
ServerCommand::LeaveConsumerGroup(command) => command.validate(),
ServerCommand::FlushUnsavedBuffer(command) => command.validate(),
ServerCommand::GetSnapshotFile(command) => command.validate(),
}
}
}
impl Display for ServerCommand {
fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result {
match self {
ServerCommand::Ping(_) => write!(formatter, "{PING}"),
ServerCommand::GetStats(_) => write!(formatter, "{GET_STATS}"),
ServerCommand::GetMe(_) => write!(formatter, "{GET_ME}"),
ServerCommand::GetClient(payload) => write!(formatter, "{GET_CLIENT}|{payload}"),
ServerCommand::GetClients(_) => write!(formatter, "{GET_CLIENTS}"),
ServerCommand::GetUser(payload) => write!(formatter, "{GET_USER}|{payload}"),
ServerCommand::GetUsers(_) => write!(formatter, "{GET_USERS}"),
ServerCommand::CreateUser(payload) => write!(formatter, "{CREATE_USER}|{payload}"),
ServerCommand::DeleteUser(payload) => write!(formatter, "{DELETE_USER}|{payload}"),
ServerCommand::UpdateUser(payload) => write!(formatter, "{UPDATE_USER}|{payload}"),
ServerCommand::UpdatePermissions(payload) => {
write!(formatter, "{UPDATE_PERMISSIONS}|{payload}")
}
ServerCommand::ChangePassword(payload) => {
write!(formatter, "{CHANGE_PASSWORD}|{payload}")
}
ServerCommand::LoginUser(payload) => write!(formatter, "{LOGIN_USER}|{payload}"),
ServerCommand::LogoutUser(_) => write!(formatter, "{LOGOUT_USER}"),
ServerCommand::GetPersonalAccessTokens(_) => {
write!(formatter, "{GET_PERSONAL_ACCESS_TOKENS}")
}
ServerCommand::CreatePersonalAccessToken(payload) => {
write!(formatter, "{CREATE_PERSONAL_ACCESS_TOKEN}|{payload}")
}
ServerCommand::DeletePersonalAccessToken(payload) => {
write!(formatter, "{DELETE_PERSONAL_ACCESS_TOKEN}|{payload}")
}
ServerCommand::LoginWithPersonalAccessToken(payload) => {
write!(formatter, "{LOGIN_WITH_PERSONAL_ACCESS_TOKEN}|{payload}")
}
ServerCommand::GetStream(payload) => write!(formatter, "{GET_STREAM}|{payload}"),
ServerCommand::GetStreams(_) => write!(formatter, "{GET_STREAMS}"),
ServerCommand::CreateStream(payload) => write!(formatter, "{CREATE_STREAM}|{payload}"),
ServerCommand::DeleteStream(payload) => write!(formatter, "{DELETE_STREAM}|{payload}"),
ServerCommand::UpdateStream(payload) => write!(formatter, "{UPDATE_STREAM}|{payload}"),
ServerCommand::PurgeStream(payload) => write!(formatter, "{PURGE_STREAM}|{payload}"),
ServerCommand::GetTopic(payload) => write!(formatter, "{GET_TOPIC}|{payload}"),
ServerCommand::GetTopics(payload) => write!(formatter, "{GET_TOPICS}|{payload}"),
ServerCommand::CreateTopic(payload) => write!(formatter, "{CREATE_TOPIC}|{payload}"),
ServerCommand::DeleteTopic(payload) => write!(formatter, "{DELETE_TOPIC}|{payload}"),
ServerCommand::UpdateTopic(payload) => write!(formatter, "{UPDATE_TOPIC}|{payload}"),
ServerCommand::PurgeTopic(payload) => write!(formatter, "{PURGE_TOPIC}|{payload}"),
ServerCommand::CreatePartitions(payload) => {
write!(formatter, "{CREATE_PARTITIONS}|{payload}")
}
ServerCommand::DeletePartitions(payload) => {
write!(formatter, "{DELETE_PARTITIONS}|{payload}")
}
ServerCommand::PollMessages(payload) => write!(formatter, "{POLL_MESSAGES}|{payload}"),
ServerCommand::SendMessages(payload) => write!(formatter, "{SEND_MESSAGES}|{payload}"),
ServerCommand::StoreConsumerOffset(payload) => {
write!(formatter, "{STORE_CONSUMER_OFFSET}|{payload}")
}
ServerCommand::DeleteConsumerOffset(payload) => {
write!(formatter, "{DELETE_CONSUMER_OFFSET}|{payload}")
}
ServerCommand::GetConsumerOffset(payload) => {
write!(formatter, "{GET_CONSUMER_OFFSET}|{payload}")
}
ServerCommand::GetConsumerGroup(payload) => {
write!(formatter, "{GET_CONSUMER_GROUP}|{payload}")
}
ServerCommand::GetConsumerGroups(payload) => {
write!(formatter, "{GET_CONSUMER_GROUPS}|{payload}")
}
ServerCommand::CreateConsumerGroup(payload) => {
write!(formatter, "{CREATE_CONSUMER_GROUP}|{payload}")
}
ServerCommand::DeleteConsumerGroup(payload) => {
write!(formatter, "{DELETE_CONSUMER_GROUP}|{payload}")
}
ServerCommand::JoinConsumerGroup(payload) => {
write!(formatter, "{JOIN_CONSUMER_GROUP}|{payload}")
}
ServerCommand::LeaveConsumerGroup(payload) => {
write!(formatter, "{LEAVE_CONSUMER_GROUP}|{payload}")
}
ServerCommand::FlushUnsavedBuffer(payload) => {
write!(formatter, "{FLUSH_UNSAVED_BUFFER}|{payload}")
}
ServerCommand::GetSnapshotFile(payload) => {
write!(formatter, "{GET_SNAPSHOT_FILE}|{payload}")
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs the serialize/deserialize check for each `Command => CODE` pair, using the
    /// payload type's `Default` value. The variant and its payload type must share the
    /// same name.
    macro_rules! assert_default_round_trips {
        ($($command:ident => $code:expr),+ $(,)?) => {
            $(
                assert_serialized_as_bytes_and_deserialized_from_bytes(
                    &ServerCommand::$command($command::default()),
                    $code,
                    &$command::default(),
                );
            )+
        };
    }

    // NOTE(review): `DeleteConsumerOffset` and `GetSnapshotFile` are not exercised by
    // this test.
    #[test]
    fn should_be_serialized_as_bytes_and_deserialized_from_bytes() {
        assert_default_round_trips!(
            Ping => PING_CODE,
            GetStats => GET_STATS_CODE,
            GetMe => GET_ME_CODE,
            GetClient => GET_CLIENT_CODE,
            GetClients => GET_CLIENTS_CODE,
            GetUser => GET_USER_CODE,
            GetUsers => GET_USERS_CODE,
            CreateUser => CREATE_USER_CODE,
            DeleteUser => DELETE_USER_CODE,
            UpdateUser => UPDATE_USER_CODE,
            UpdatePermissions => UPDATE_PERMISSIONS_CODE,
            ChangePassword => CHANGE_PASSWORD_CODE,
            LoginUser => LOGIN_USER_CODE,
            LogoutUser => LOGOUT_USER_CODE,
            GetPersonalAccessTokens => GET_PERSONAL_ACCESS_TOKENS_CODE,
            CreatePersonalAccessToken => CREATE_PERSONAL_ACCESS_TOKEN_CODE,
            DeletePersonalAccessToken => DELETE_PERSONAL_ACCESS_TOKEN_CODE,
            LoginWithPersonalAccessToken => LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE,
            SendMessages => SEND_MESSAGES_CODE,
            PollMessages => POLL_MESSAGES_CODE,
            StoreConsumerOffset => STORE_CONSUMER_OFFSET_CODE,
            GetConsumerOffset => GET_CONSUMER_OFFSET_CODE,
            GetStream => GET_STREAM_CODE,
            GetStreams => GET_STREAMS_CODE,
            CreateStream => CREATE_STREAM_CODE,
            DeleteStream => DELETE_STREAM_CODE,
            UpdateStream => UPDATE_STREAM_CODE,
            PurgeStream => PURGE_STREAM_CODE,
            GetTopic => GET_TOPIC_CODE,
            GetTopics => GET_TOPICS_CODE,
            CreateTopic => CREATE_TOPIC_CODE,
            DeleteTopic => DELETE_TOPIC_CODE,
            UpdateTopic => UPDATE_TOPIC_CODE,
            PurgeTopic => PURGE_TOPIC_CODE,
            CreatePartitions => CREATE_PARTITIONS_CODE,
            DeletePartitions => DELETE_PARTITIONS_CODE,
            GetConsumerGroup => GET_CONSUMER_GROUP_CODE,
            GetConsumerGroups => GET_CONSUMER_GROUPS_CODE,
            CreateConsumerGroup => CREATE_CONSUMER_GROUP_CODE,
            DeleteConsumerGroup => DELETE_CONSUMER_GROUP_CODE,
            JoinConsumerGroup => JOIN_CONSUMER_GROUP_CODE,
            LeaveConsumerGroup => LEAVE_CONSUMER_GROUP_CODE,
            FlushUnsavedBuffer => FLUSH_UNSAVED_BUFFER_CODE,
        );
    }

    /// Checks both directions for one command: serializing `command` yields the expected
    /// frame, and deserializing that frame yields a value equal to `command`.
    fn assert_serialized_as_bytes_and_deserialized_from_bytes(
        command: &ServerCommand,
        code: u32,
        payload: &dyn Command,
    ) {
        // Serialization: the command's bytes must equal the hand-built frame.
        assert_eq!(command.to_bytes(), expected_frame(code, payload));
        // Deserialization: the hand-built frame must decode back to the same command.
        assert_eq!(
            &ServerCommand::from_bytes(expected_frame(code, payload)).unwrap(),
            command
        );
    }

    /// Builds the expected wire frame by hand: `code` as a little-endian `u32`, followed
    /// by the payload's serialized bytes.
    fn expected_frame(code: u32, payload: &dyn Command) -> Bytes {
        let body = payload.to_bytes();
        let mut frame = BytesMut::with_capacity(4 + body.len());
        frame.put_u32_le(code);
        frame.put_slice(&body);
        frame.freeze()
    }
}