blob: b47b7b01161d457639f3fd9f64a32dc039eda9bb [file] [log] [blame]
use crate::streaming::session::Session;
use crate::streaming::systems::system::System;
use crate::streaming::systems::COMPONENT;
use crate::streaming::topics::consumer_group::ConsumerGroup;
use error_set::ErrContext;
use iggy::error::IggyError;
use iggy::identifier::Identifier;
use iggy::locking::IggySharedMutFn;
use tokio::sync::RwLock;
impl System {
pub fn get_consumer_group(
&self,
session: &Session,
stream_id: &Identifier,
topic_id: &Identifier,
group_id: &Identifier,
) -> Result<Option<&RwLock<ConsumerGroup>>, IggyError> {
self.ensure_authenticated(session)?;
let Some(topic) = self.try_find_topic(session, stream_id, topic_id)? else {
return Ok(None);
};
self.permissioner
.get_consumer_group(session.get_user_id(), topic.stream_id, topic.topic_id)
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - permission denied to get consumer group with ID: {group_id} for user with ID: {} in topic with ID: {topic_id} and stream with ID: {stream_id}",
session.get_user_id(),
)
})?;
topic.try_get_consumer_group(group_id)
}
pub fn get_consumer_groups(
&self,
session: &Session,
stream_id: &Identifier,
topic_id: &Identifier,
) -> Result<Vec<&RwLock<ConsumerGroup>>, IggyError> {
self.ensure_authenticated(session)?;
let topic = self.find_topic(session, stream_id, topic_id)
.with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic with ID: {topic_id} was not found in stream with ID: {stream_id}"))?;
self.permissioner
.get_consumer_groups(session.get_user_id(), topic.stream_id, topic.topic_id)
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - permission denied to get consumer groups in topic with ID: {topic_id} and stream with ID: {stream_id} for user with ID: {}",
session.get_user_id(),
)
})?;
Ok(topic.get_consumer_groups())
}
pub async fn create_consumer_group(
&mut self,
session: &Session,
stream_id: &Identifier,
topic_id: &Identifier,
group_id: Option<u32>,
name: &str,
) -> Result<&RwLock<ConsumerGroup>, IggyError> {
self.ensure_authenticated(session)?;
{
let topic = self.find_topic(session, stream_id, topic_id)
.with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic not found for stream_id: {stream_id}, topic_id: {topic_id}"))?;
self.permissioner.create_consumer_group(
session.get_user_id(),
topic.stream_id,
topic.topic_id,
).with_error_context(|error| format!("{COMPONENT} (error: {error}) - permission denied to create consumer group for user {} on stream_id: {}, topic_id: {}", session.get_user_id(), topic.stream_id, topic.topic_id))?;
}
let topic = self.get_stream_mut(stream_id)?
.get_topic_mut(topic_id)
.with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic not found for stream_id: {stream_id}, topic_id: {topic_id}"))?;
topic
.create_consumer_group(group_id, name)
.await
.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to create consumer group with name: {name}")
})
}
pub async fn delete_consumer_group(
&mut self,
session: &Session,
stream_id: &Identifier,
topic_id: &Identifier,
consumer_group_id: &Identifier,
) -> Result<(), IggyError> {
self.ensure_authenticated(session)?;
let stream_id_value;
let topic_id_value;
{
let topic = self.find_topic(session, stream_id, topic_id)
.with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic not found for stream_id: {stream_id}, topic_id: {topic_id}"))?;
self.permissioner.delete_consumer_group(
session.get_user_id(),
topic.stream_id,
topic.topic_id,
).with_error_context(|error| format!("{COMPONENT} (error: {error}) - permission denied to delete consumer group for user {} on stream_id: {}, topic_id: {}", session.get_user_id(), topic.stream_id, topic.topic_id))?;
stream_id_value = topic.stream_id;
topic_id_value = topic.topic_id;
}
let consumer_group;
{
let stream = self.get_stream_mut(stream_id).with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to get mutable reference to stream with id: {stream_id}"
)
})?;
let topic = stream.get_topic_mut(topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic not found for stream_id: {stream_id}, topic_id: {topic_id}"))?;
consumer_group = topic.delete_consumer_group(consumer_group_id)
.await
.with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to delete consumer group with ID: {consumer_group_id}"))?
}
let client_manager = self.client_manager.read().await;
let consumer_group = consumer_group.read().await;
for member in consumer_group.get_members() {
let member = member.read().await;
client_manager
.leave_consumer_group(
member.id,
stream_id_value,
topic_id_value,
consumer_group.group_id,
)
.await
.with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to make client leave consumer group for client_id: {}, group_id: {}", member.id, consumer_group.group_id))?;
}
Ok(())
}
pub async fn join_consumer_group(
&self,
session: &Session,
stream_id: &Identifier,
topic_id: &Identifier,
consumer_group_id: &Identifier,
) -> Result<(), IggyError> {
self.ensure_authenticated(session)?;
let stream_id_value;
let topic_id_value;
{
let topic = self
.find_topic(session, stream_id, topic_id)
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - topic not found for stream_id: {stream_id}, topic_id: {topic_id}",
)
})?;
self.permissioner.join_consumer_group(
session.get_user_id(),
topic.stream_id,
topic.topic_id,
).with_error_context(|error| format!("{COMPONENT} (error: {error}) - permission denied to join consumer group for user {} on stream_id: {}, topic_id: {}", session.get_user_id(), topic.stream_id, topic.topic_id))?;
stream_id_value = topic.stream_id;
topic_id_value = topic.topic_id;
}
let group_id;
{
let topic = self.find_topic(session, stream_id, topic_id)?;
{
let consumer_group = topic
.get_consumer_group(consumer_group_id)
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - consumer group not found for group_id: {:?}",
consumer_group_id
)
})?;
let consumer_group = consumer_group.read().await;
group_id = consumer_group.group_id;
}
topic
.join_consumer_group(consumer_group_id, session.client_id)
.await
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to join consumer group for group_id: {}",
group_id
)
})?;
}
let client_manager = self.client_manager.read().await;
client_manager
.join_consumer_group(session.client_id, stream_id_value, topic_id_value, group_id)
.await
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to make client join consumer group for client_id: {}",
session.client_id
)
})?;
Ok(())
}
pub async fn leave_consumer_group(
&self,
session: &Session,
stream_id: &Identifier,
topic_id: &Identifier,
consumer_group_id: &Identifier,
) -> Result<(), IggyError> {
self.ensure_authenticated(session)?;
{
let topic = self
.find_topic(session, stream_id, topic_id)
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - topic not found for stream_id: {:?}, topic_id: {:?}",
stream_id, topic_id
)
})?;
self.permissioner.leave_consumer_group(
session.get_user_id(),
topic.stream_id,
topic.topic_id,
).with_error_context(|error| format!("{COMPONENT} (error: {error}) - permission denied to leave consumer group for user {} on stream_id: {}, topic_id: {}", session.get_user_id(), topic.stream_id, topic.topic_id))?;
}
self.leave_consumer_group_by_client(
stream_id,
topic_id,
consumer_group_id,
session.client_id,
)
.await
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to leave consumer group for client_id: {}",
session.client_id
)
})
}
pub async fn leave_consumer_group_by_client(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
consumer_group_id: &Identifier,
client_id: u32,
) -> Result<(), IggyError> {
let stream_id_value;
let topic_id_value;
let group_id;
{
let stream = self.get_stream(stream_id).with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}")
})?;
let topic = stream.get_topic(topic_id)
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - topic not found for stream_id: {stream_id}, topic_id: {topic_id}",
)
})?;
{
let consumer_group = topic
.get_consumer_group(consumer_group_id)
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - consumer group not found for group_id: {consumer_group_id}",
)
})?;
let consumer_group = consumer_group.read().await;
group_id = consumer_group.group_id;
}
stream_id_value = stream.stream_id;
topic_id_value = topic.topic_id;
topic
.leave_consumer_group(consumer_group_id, client_id)
.await
.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed leave consumer group, client ID {client_id}",)
})?;
}
let client_manager = self.client_manager.read().await;
client_manager
.leave_consumer_group(client_id, stream_id_value, topic_id_value, group_id)
.await
}
}