blob: b035f43c84913c0f5a73a1f82943c6bab56bde03 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use crate::streaming::users::user::User;
use crate::{
shard::{
IggyShard,
transmission::{
event::ShardEvent,
frame::{ConsumerGroupResponseData, StreamResponseData, TopicResponseData},
message::ResolvedTopic,
},
},
state::{
command::EntryCommand,
models::{
CreateConsumerGroupWithId, CreatePersonalAccessTokenWithHash, CreateStreamWithId,
CreateTopicWithId, CreateUserWithId,
},
},
streaming::polling_consumer::ConsumerGroupId,
};
use iggy_common::{
Identifier, IggyError, IggyExpiry, Permissions, PersonalAccessToken, UserStatus,
change_password::ChangePassword, create_consumer_group::CreateConsumerGroup,
create_partitions::CreatePartitions, create_personal_access_token::CreatePersonalAccessToken,
create_stream::CreateStream, create_topic::CreateTopic,
delete_consumer_group::DeleteConsumerGroup, delete_partitions::DeletePartitions,
delete_personal_access_token::DeletePersonalAccessToken, delete_stream::DeleteStream,
delete_topic::DeleteTopic, delete_user::DeleteUser, join_consumer_group::JoinConsumerGroup,
leave_consumer_group::LeaveConsumerGroup, purge_stream::PurgeStream, purge_topic::PurgeTopic,
update_permissions::UpdatePermissions, update_stream::UpdateStream, update_topic::UpdateTopic,
update_user::UpdateUser,
};
use std::rc::Rc;
pub struct DeleteStreamResult {
pub stream_id: usize,
}
pub struct DeleteTopicResult {
pub topic_id: usize,
}
pub struct CreatePartitionsResult {
pub partition_ids: Vec<usize>,
}
pub struct DeletePartitionsResult {
pub partition_ids: Vec<usize>,
}
pub async fn execute_create_stream(
shard: &Rc<IggyShard>,
user_id: u32,
command: CreateStream,
) -> Result<StreamResponseData, IggyError> {
shard.metadata.perm_create_stream(user_id)?;
let stream_id = shard.create_stream(command.name.clone()).await?;
// Capture response data from metadata before state apply
let response_data = shard.metadata.with_metadata(|m| {
let stream = m.streams.get(stream_id).expect("just created");
StreamResponseData {
id: stream_id as u32,
name: stream.name.clone(),
created_at: stream.created_at,
}
});
shard
.state
.apply(
user_id,
&EntryCommand::CreateStream(CreateStreamWithId {
stream_id: stream_id as u32,
command,
}),
)
.await?;
Ok(response_data)
}
pub async fn execute_update_stream(
shard: &Rc<IggyShard>,
user_id: u32,
command: UpdateStream,
) -> Result<(), IggyError> {
let stream = shard.resolve_stream(&command.stream_id)?;
shard.metadata.perm_update_stream(user_id, stream.id())?;
shard.update_stream(stream, command.name.clone())?;
shard
.state
.apply(user_id, &EntryCommand::UpdateStream(command))
.await?;
Ok(())
}
pub async fn execute_delete_stream(
shard: &Rc<IggyShard>,
user_id: u32,
command: DeleteStream,
) -> Result<DeleteStreamResult, IggyError> {
let stream = shard.resolve_stream(&command.stream_id)?;
shard.metadata.perm_delete_stream(user_id, stream.id())?;
// Capture all topic/partition info BEFORE deletion for broadcast
let topics_with_partitions: Vec<(usize, Vec<usize>)> = shard
.metadata
.get_topic_ids(stream.id())
.into_iter()
.map(|topic_id| {
let partition_ids = shard.metadata.get_partition_ids(stream.id(), topic_id);
(topic_id, partition_ids)
})
.collect();
let stream_info = shard.delete_stream(stream).await?;
shard
.state
.apply(user_id, &EntryCommand::DeleteStream(command))
.await?;
// Broadcast DeletedPartitions to all shards for each topic's partitions (best-effort)
for (topic_id, partition_ids) in topics_with_partitions {
if partition_ids.is_empty() {
continue;
}
let event = ShardEvent::DeletedPartitions {
stream_id: Identifier::numeric(stream.id() as u32).unwrap(),
topic_id: Identifier::numeric(topic_id as u32).unwrap(),
partitions_count: partition_ids.len() as u32,
partition_ids,
};
if let Err(e) = shard.broadcast_event_to_all_shards(event).await {
tracing::warn!("Broadcast failed: {e}. Shards will sync on restart.");
}
}
Ok(DeleteStreamResult {
stream_id: stream_info.id,
})
}
pub async fn execute_purge_stream(
shard: &Rc<IggyShard>,
user_id: u32,
command: PurgeStream,
) -> Result<(), IggyError> {
let stream = shard.resolve_stream(&command.stream_id)?;
shard.metadata.perm_purge_stream(user_id, stream.id())?;
shard.purge_stream(stream).await?;
shard
.state
.apply(user_id, &EntryCommand::PurgeStream(command))
.await?;
let event = ShardEvent::PurgedStream {
stream_id: Identifier::numeric(stream.id() as u32).unwrap(),
};
if let Err(e) = shard.broadcast_event_to_all_shards(event).await {
tracing::warn!("Broadcast failed: {e}. Shards will sync on restart.");
}
Ok(())
}
pub async fn execute_create_topic(
shard: &Rc<IggyShard>,
user_id: u32,
command: CreateTopic,
) -> Result<TopicResponseData, IggyError> {
let stream = shard.resolve_stream(&command.stream_id)?;
shard.metadata.perm_create_topic(user_id, stream.id())?;
let topic_id = shard
.create_topic(
stream,
command.name.clone(),
command.message_expiry,
command.compression_algorithm,
command.max_topic_size,
command.replication_factor,
)
.await?;
let resolved_topic = ResolvedTopic {
stream_id: stream.id(),
topic_id,
};
let partition_infos = shard
.create_partitions(resolved_topic, command.partitions_count)
.await?;
let response_data = shard.metadata.with_metadata(|m| {
let topic = m
.streams
.get(stream.id())
.and_then(|s| s.topics.get(topic_id))
.expect("just created");
TopicResponseData {
id: topic_id as u32,
name: topic.name.clone(),
created_at: topic.created_at,
partitions_count: partition_infos.len() as u32,
message_expiry: topic.message_expiry,
compression_algorithm: topic.compression_algorithm,
max_topic_size: topic.max_topic_size,
replication_factor: topic.replication_factor,
}
});
shard
.state
.apply(
user_id,
&EntryCommand::CreateTopic(CreateTopicWithId {
topic_id: topic_id as u32,
command,
}),
)
.await?;
let event = ShardEvent::CreatedPartitions {
stream_id: Identifier::numeric(stream.id() as u32).unwrap(),
topic_id: Identifier::numeric(topic_id as u32).unwrap(),
partitions: partition_infos,
};
if let Err(e) = shard.broadcast_event_to_all_shards(event).await {
tracing::warn!("Broadcast failed: {e}. Shards will sync on restart.");
}
Ok(response_data)
}
pub async fn execute_update_topic(
shard: &Rc<IggyShard>,
user_id: u32,
command: UpdateTopic,
) -> Result<(), IggyError> {
let topic = shard.resolve_topic(&command.stream_id, &command.topic_id)?;
shard
.metadata
.perm_update_topic(user_id, topic.stream_id, topic.topic_id)?;
shard.update_topic(
topic,
command.name.clone(),
command.message_expiry,
command.compression_algorithm,
command.max_topic_size,
command.replication_factor,
)?;
shard
.state
.apply(user_id, &EntryCommand::UpdateTopic(command))
.await?;
Ok(())
}
pub async fn execute_delete_topic(
shard: &Rc<IggyShard>,
user_id: u32,
command: DeleteTopic,
) -> Result<DeleteTopicResult, IggyError> {
let topic = shard.resolve_topic(&command.stream_id, &command.topic_id)?;
shard
.metadata
.perm_delete_topic(user_id, topic.stream_id, topic.topic_id)?;
// Capture partition_ids BEFORE deletion for broadcast
let partition_ids = shard
.metadata
.get_partition_ids(topic.stream_id, topic.topic_id);
let topic_info = shard.delete_topic(topic).await?;
shard
.state
.apply(user_id, &EntryCommand::DeleteTopic(command))
.await?;
// Broadcast to all shards to clean up their local_partitions entries (best-effort)
let event = ShardEvent::DeletedPartitions {
stream_id: Identifier::numeric(topic.stream_id as u32).unwrap(),
topic_id: Identifier::numeric(topic.topic_id as u32).unwrap(),
partitions_count: partition_ids.len() as u32,
partition_ids,
};
if let Err(e) = shard.broadcast_event_to_all_shards(event).await {
tracing::warn!("Broadcast failed: {e}. Shards will sync on restart.");
}
Ok(DeleteTopicResult {
topic_id: topic_info.id,
})
}
pub async fn execute_purge_topic(
shard: &Rc<IggyShard>,
user_id: u32,
command: PurgeTopic,
) -> Result<(), IggyError> {
let topic = shard.resolve_topic(&command.stream_id, &command.topic_id)?;
shard
.metadata
.perm_purge_topic(user_id, topic.stream_id, topic.topic_id)?;
shard.purge_topic(topic).await?;
shard
.state
.apply(user_id, &EntryCommand::PurgeTopic(command))
.await?;
let event = ShardEvent::PurgedTopic {
stream_id: Identifier::numeric(topic.stream_id as u32).unwrap(),
topic_id: Identifier::numeric(topic.topic_id as u32).unwrap(),
};
if let Err(e) = shard.broadcast_event_to_all_shards(event).await {
tracing::warn!("Broadcast failed: {e}. Shards will sync on restart.");
}
Ok(())
}
pub async fn execute_create_partitions(
shard: &Rc<IggyShard>,
user_id: u32,
command: CreatePartitions,
) -> Result<CreatePartitionsResult, IggyError> {
let topic = shard.resolve_topic(&command.stream_id, &command.topic_id)?;
shard
.metadata
.perm_create_partitions(user_id, topic.stream_id, topic.topic_id)?;
let partition_infos = shard
.create_partitions(topic, command.partitions_count)
.await?;
let partition_ids = partition_infos.iter().map(|p| p.id).collect::<Vec<_>>();
let total_partition_count = shard
.metadata
.partitions_count(topic.stream_id, topic.topic_id) as u32;
shard.writer().rebalance_consumer_groups_for_topic(
topic.stream_id,
topic.topic_id,
total_partition_count,
);
shard
.state
.apply(user_id, &EntryCommand::CreatePartitions(command))
.await?;
let event = ShardEvent::CreatedPartitions {
stream_id: Identifier::numeric(topic.stream_id as u32).unwrap(),
topic_id: Identifier::numeric(topic.topic_id as u32).unwrap(),
partitions: partition_infos,
};
if let Err(e) = shard.broadcast_event_to_all_shards(event).await {
tracing::warn!("Broadcast failed: {e}. Shards will sync on restart.");
}
Ok(CreatePartitionsResult { partition_ids })
}
pub async fn execute_delete_partitions(
shard: &Rc<IggyShard>,
user_id: u32,
command: DeletePartitions,
) -> Result<DeletePartitionsResult, IggyError> {
let topic = shard.resolve_topic(&command.stream_id, &command.topic_id)?;
shard
.metadata
.perm_delete_partitions(user_id, topic.stream_id, topic.topic_id)?;
let deleted_partition_ids = shard
.delete_partitions(topic, command.partitions_count)
.await?;
let remaining_partition_count = shard
.metadata
.partitions_count(topic.stream_id, topic.topic_id)
as u32;
shard.writer().rebalance_consumer_groups_for_topic(
topic.stream_id,
topic.topic_id,
remaining_partition_count,
);
shard
.state
.apply(user_id, &EntryCommand::DeletePartitions(command))
.await?;
let event = ShardEvent::DeletedPartitions {
stream_id: Identifier::numeric(topic.stream_id as u32).unwrap(),
topic_id: Identifier::numeric(topic.topic_id as u32).unwrap(),
partitions_count: deleted_partition_ids.len() as u32,
partition_ids: deleted_partition_ids.clone(),
};
if let Err(e) = shard.broadcast_event_to_all_shards(event).await {
tracing::warn!("Broadcast failed: {e}. Shards will sync on restart.");
}
Ok(DeletePartitionsResult {
partition_ids: deleted_partition_ids,
})
}
pub async fn execute_create_consumer_group(
shard: &Rc<IggyShard>,
user_id: u32,
command: CreateConsumerGroup,
) -> Result<ConsumerGroupResponseData, IggyError> {
let topic = shard.resolve_topic(&command.stream_id, &command.topic_id)?;
shard
.metadata
.perm_create_consumer_group(user_id, topic.stream_id, topic.topic_id)?;
let group_id = shard.create_consumer_group(topic, command.name.clone())?;
let response_data = shard
.metadata
.get_consumer_group(topic.stream_id, topic.topic_id, group_id)
.map(|cg| ConsumerGroupResponseData {
id: group_id as u32,
name: cg.name.clone(),
partitions_count: cg.partitions.len() as u32,
})
.expect("just created");
shard
.state
.apply(
user_id,
&EntryCommand::CreateConsumerGroup(CreateConsumerGroupWithId {
group_id: group_id as u32,
command,
}),
)
.await?;
Ok(response_data)
}
pub async fn execute_delete_consumer_group(
shard: &Rc<IggyShard>,
user_id: u32,
command: DeleteConsumerGroup,
) -> Result<(), IggyError> {
let group =
shard.resolve_consumer_group(&command.stream_id, &command.topic_id, &command.group_id)?;
shard
.metadata
.perm_delete_consumer_group(user_id, group.stream_id, group.topic_id)?;
let deleted = shard.delete_consumer_group(group)?;
let cg_id = ConsumerGroupId(deleted.group_id);
shard
.delete_consumer_group_offsets(
cg_id,
group.stream_id,
group.topic_id,
&deleted.partition_ids,
)
.await?;
shard
.state
.apply(user_id, &EntryCommand::DeleteConsumerGroup(command))
.await?;
Ok(())
}
pub fn execute_join_consumer_group(
shard: &Rc<IggyShard>,
user_id: u32,
client_id: u32,
command: JoinConsumerGroup,
) -> Result<(), IggyError> {
let group =
shard.resolve_consumer_group(&command.stream_id, &command.topic_id, &command.group_id)?;
shard
.metadata
.perm_join_consumer_group(user_id, group.stream_id, group.topic_id)?;
shard.join_consumer_group(client_id, group)?;
Ok(())
}
pub fn execute_leave_consumer_group(
shard: &Rc<IggyShard>,
user_id: u32,
client_id: u32,
command: LeaveConsumerGroup,
) -> Result<(), IggyError> {
let group =
shard.resolve_consumer_group(&command.stream_id, &command.topic_id, &command.group_id)?;
shard
.metadata
.perm_leave_consumer_group(user_id, group.stream_id, group.topic_id)?;
shard.leave_consumer_group(client_id, group)?;
Ok(())
}
pub async fn execute_create_user(
shard: &Rc<IggyShard>,
user_id: u32,
username: String,
password: String,
status: UserStatus,
permissions: Option<Permissions>,
) -> Result<User, IggyError> {
shard.metadata.perm_create_user(user_id)?;
let user = shard.create_user(&username, &password, status, permissions.clone())?;
let command = iggy_common::create_user::CreateUser {
username,
password,
status,
permissions,
};
shard
.state
.apply(
user_id,
&EntryCommand::CreateUser(CreateUserWithId {
user_id: user.id,
command,
}),
)
.await?;
Ok(user)
}
pub async fn execute_delete_user(
shard: &Rc<IggyShard>,
user_id: u32,
target_user_id: Identifier,
) -> Result<User, IggyError> {
shard.metadata.perm_delete_user(user_id)?;
let user = shard.delete_user(&target_user_id)?;
let command = DeleteUser {
user_id: target_user_id,
};
shard
.state
.apply(user_id, &EntryCommand::DeleteUser(command))
.await?;
Ok(user)
}
pub async fn execute_update_user(
shard: &Rc<IggyShard>,
user_id: u32,
target_user_id: Identifier,
username: Option<String>,
status: Option<UserStatus>,
) -> Result<User, IggyError> {
shard.metadata.perm_update_user(user_id)?;
let user = shard.update_user(&target_user_id, username.clone(), status)?;
let command = UpdateUser {
user_id: target_user_id,
username,
status,
};
shard
.state
.apply(user_id, &EntryCommand::UpdateUser(command))
.await?;
Ok(user)
}
pub async fn execute_change_password(
shard: &Rc<IggyShard>,
user_id: u32,
target_user_id: Identifier,
current_password: String,
new_password: String,
) -> Result<(), IggyError> {
let target_user = shard.get_user(&target_user_id)?;
if target_user.id != user_id {
shard.metadata.perm_change_password(user_id)?;
}
shard.change_password(&target_user_id, &current_password, &new_password)?;
let command = ChangePassword {
user_id: target_user_id,
current_password,
new_password,
};
shard
.state
.apply(user_id, &EntryCommand::ChangePassword(command))
.await?;
Ok(())
}
pub async fn execute_update_permissions(
shard: &Rc<IggyShard>,
user_id: u32,
target_user_id: Identifier,
permissions: Option<Permissions>,
) -> Result<(), IggyError> {
shard.metadata.perm_update_permissions(user_id)?;
let target_user = shard.get_user(&target_user_id)?;
if target_user.is_root() {
return Err(IggyError::CannotChangePermissions(target_user.id));
}
shard.update_permissions(&target_user_id, permissions.clone())?;
let command = UpdatePermissions {
user_id: target_user_id,
permissions,
};
shard
.state
.apply(user_id, &EntryCommand::UpdatePermissions(command))
.await?;
Ok(())
}
pub async fn execute_create_personal_access_token(
shard: &Rc<IggyShard>,
user_id: u32,
name: String,
expiry: IggyExpiry,
) -> Result<(PersonalAccessToken, String), IggyError> {
let (personal_access_token, token) =
shard.create_personal_access_token(user_id, &name, expiry)?;
let command = CreatePersonalAccessToken { name, expiry };
shard
.state
.apply(
user_id,
&EntryCommand::CreatePersonalAccessToken(CreatePersonalAccessTokenWithHash {
hash: personal_access_token.token.to_string(),
command,
}),
)
.await?;
Ok((personal_access_token, token))
}
pub async fn execute_delete_personal_access_token(
shard: &Rc<IggyShard>,
user_id: u32,
name: String,
) -> Result<(), IggyError> {
shard.delete_personal_access_token(user_id, &name)?;
let command = DeletePersonalAccessToken { name };
shard
.state
.apply(user_id, &EntryCommand::DeletePersonalAccessToken(command))
.await?;
Ok(())
}