blob: d9958b3f70086c8968309c658be14bd0280e174f [file] [log] [blame]
use crate::streaming::session::Session;
use crate::streaming::systems::system::System;
use crate::streaming::topics::consumer_group::ConsumerGroup;
use iggy::error::IggyError;
use iggy::identifier::Identifier;
use iggy::locking::IggySharedMutFn;
use tokio::sync::RwLock;
impl System {
/// Looks up a single consumer group of a topic on behalf of `session`.
///
/// The steps run in this order: `ensure_authenticated`, topic resolution via
/// `find_topic`, the permissioner's `get_consumer_group` check for the
/// session's user, and finally the topic's own lookup by `group_id`.
///
/// # Errors
///
/// Returns the first `IggyError` produced by any of those steps.
pub fn get_consumer_group(
    &self,
    session: &Session,
    stream_id: &Identifier,
    topic_id: &Identifier,
    group_id: &Identifier,
) -> Result<&RwLock<ConsumerGroup>, IggyError> {
    self.ensure_authenticated(session)?;
    let topic = self.find_topic(session, stream_id, topic_id)?;

    // The permission check is keyed by the ids stored on the resolved topic,
    // not by the caller-supplied identifiers.
    let user_id = session.get_user_id();
    self.permissioner
        .get_consumer_group(user_id, topic.stream_id, topic.topic_id)?;

    topic.get_consumer_group(group_id)
}
/// Lists the consumer groups of a topic on behalf of `session`.
///
/// Runs `ensure_authenticated`, resolves the topic with `find_topic`, asks the
/// permissioner's `get_consumer_groups` check for the session's user, and then
/// returns whatever the topic's `get_consumer_groups` yields.
///
/// # Errors
///
/// Returns the first `IggyError` raised by authentication, topic resolution,
/// or the permission check.
pub fn get_consumer_groups(
    &self,
    session: &Session,
    stream_id: &Identifier,
    topic_id: &Identifier,
) -> Result<Vec<&RwLock<ConsumerGroup>>, IggyError> {
    self.ensure_authenticated(session)?;
    let topic = self.find_topic(session, stream_id, topic_id)?;

    let user_id = session.get_user_id();
    self.permissioner
        .get_consumer_groups(user_id, topic.stream_id, topic.topic_id)?;

    let groups = topic.get_consumer_groups();
    Ok(groups)
}
/// Creates a consumer group in a topic on behalf of `session`.
///
/// `group_id` and `name` are forwarded unchanged to the topic's
/// `create_consumer_group`; how a `None` id is handled is decided there.
///
/// # Errors
///
/// Returns the first `IggyError` raised by `ensure_authenticated`,
/// `find_topic`, the permissioner's `create_consumer_group` check, the mutable
/// stream/topic lookup, or the topic's `create_consumer_group`.
pub async fn create_consumer_group(
    &mut self,
    session: &Session,
    stream_id: &Identifier,
    topic_id: &Identifier,
    group_id: Option<u32>,
    name: &str,
) -> Result<&RwLock<ConsumerGroup>, IggyError> {
    self.ensure_authenticated(session)?;

    // Copy the ids out so the shared borrow taken by `find_topic` has ended
    // before the topic is borrowed mutably below.
    let (resolved_stream_id, resolved_topic_id) = {
        let topic = self.find_topic(session, stream_id, topic_id)?;
        (topic.stream_id, topic.topic_id)
    };
    self.permissioner.create_consumer_group(
        session.get_user_id(),
        resolved_stream_id,
        resolved_topic_id,
    )?;

    self.get_stream_mut(stream_id)?
        .get_topic_mut(topic_id)?
        .create_consumer_group(group_id, name)
        .await
}
/// Deletes a consumer group from a topic and detaches its members in the
/// client manager.
///
/// After authentication and the permissioner's `delete_consumer_group` check,
/// the group is removed through the topic's `delete_consumer_group`. Each
/// member reported by the removed group's `get_members` is then passed to the
/// client manager's `leave_consumer_group`.
///
/// # Errors
///
/// Returns the first `IggyError` raised by any step. The member loop stops at
/// the first failing `leave_consumer_group` call, so later members are not
/// processed in that case even though the group has already been removed from
/// the topic.
pub async fn delete_consumer_group(
    &mut self,
    session: &Session,
    stream_id: &Identifier,
    topic_id: &Identifier,
    consumer_group_id: &Identifier,
) -> Result<(), IggyError> {
    self.ensure_authenticated(session)?;

    // The shared borrow from `find_topic` must end before the mutable lookup
    // below, so only the copied ids leave this block.
    let (resolved_stream_id, resolved_topic_id) = {
        let topic = self.find_topic(session, stream_id, topic_id)?;
        self.permissioner.delete_consumer_group(
            session.get_user_id(),
            topic.stream_id,
            topic.topic_id,
        )?;
        (topic.stream_id, topic.topic_id)
    };

    let removed_group = self
        .get_stream_mut(stream_id)?
        .get_topic_mut(topic_id)?
        .delete_consumer_group(consumer_group_id)
        .await?;

    let client_manager = self.client_manager.read().await;
    let group = removed_group.read().await;
    let removed_group_id = group.group_id;
    for member_lock in group.get_members() {
        let member = member_lock.read().await;
        client_manager
            .leave_consumer_group(
                member.id,
                resolved_stream_id,
                resolved_topic_id,
                removed_group_id,
            )
            .await?;
    }

    Ok(())
}
/// Adds the session's client to a consumer group of a topic.
///
/// The steps run in this order: `ensure_authenticated`, topic resolution via
/// `find_topic`, the permissioner's `join_consumer_group` check, reading the
/// group's `group_id`, the topic's `join_consumer_group` for
/// `session.client_id`, and finally the client manager's
/// `join_consumer_group` with the resolved stream, topic and group ids.
///
/// # Errors
///
/// Returns the first `IggyError` raised by any of those steps. If the client
/// manager call fails, the topic-level join performed just before it is not
/// rolled back here.
pub async fn join_consumer_group(
    &self,
    session: &Session,
    stream_id: &Identifier,
    topic_id: &Identifier,
    consumer_group_id: &Identifier,
) -> Result<(), IggyError> {
    self.ensure_authenticated(session)?;

    // A single lookup serves the permission check, the group-id read and the
    // join itself; this method only needs `&self`, so there is no borrow
    // conflict that would force resolving the topic a second time.
    let topic = self.find_topic(session, stream_id, topic_id)?;
    self.permissioner.join_consumer_group(
        session.get_user_id(),
        topic.stream_id,
        topic.topic_id,
    )?;

    // The read guard is a temporary of this statement, so it is released
    // before the topic-level join below runs.
    let group_id = topic
        .get_consumer_group(consumer_group_id)?
        .read()
        .await
        .group_id;

    topic
        .join_consumer_group(consumer_group_id, session.client_id)
        .await?;

    let client_manager = self.client_manager.read().await;
    client_manager
        .join_consumer_group(session.client_id, topic.stream_id, topic.topic_id, group_id)
        .await?;
    Ok(())
}
/// Removes the session's client from a consumer group of a topic.
///
/// Runs `ensure_authenticated` and the permissioner's `leave_consumer_group`
/// check against the topic resolved by `find_topic`, then delegates to
/// `leave_consumer_group_by_client` with `session.client_id`.
///
/// # Errors
///
/// Returns the first `IggyError` raised by authentication, topic resolution,
/// the permission check, or the delegated call.
pub async fn leave_consumer_group(
    &self,
    session: &Session,
    stream_id: &Identifier,
    topic_id: &Identifier,
    consumer_group_id: &Identifier,
) -> Result<(), IggyError> {
    self.ensure_authenticated(session)?;

    // The topic is only needed for the permission check, so its borrow is
    // confined to this block and not carried across the await below.
    {
        let topic = self.find_topic(session, stream_id, topic_id)?;
        let user_id = session.get_user_id();
        self.permissioner
            .leave_consumer_group(user_id, topic.stream_id, topic.topic_id)?;
    }

    let client_id = session.client_id;
    self.leave_consumer_group_by_client(stream_id, topic_id, consumer_group_id, client_id)
        .await
}
/// Removes `client_id` from a consumer group without consulting a session.
///
/// No `ensure_authenticated` or permissioner call is made in this method;
/// `leave_consumer_group` performs those before delegating here. The group's
/// `group_id` is read first, then the topic's `leave_consumer_group` is
/// invoked, and finally the client manager's `leave_consumer_group` is called
/// with the resolved stream, topic and group ids.
///
/// # Errors
///
/// Returns the first `IggyError` raised by the stream, topic or group lookup,
/// the topic-level leave, or the client manager call.
pub async fn leave_consumer_group_by_client(
    &self,
    stream_id: &Identifier,
    topic_id: &Identifier,
    consumer_group_id: &Identifier,
    client_id: u32,
) -> Result<(), IggyError> {
    let stream = self.get_stream(stream_id)?;
    let topic = stream.get_topic(topic_id)?;

    // The read guard is a temporary of this statement, so it is dropped
    // before the topic-level leave below runs.
    let group_id = topic
        .get_consumer_group(consumer_group_id)?
        .read()
        .await
        .group_id;

    let resolved_stream_id = stream.stream_id;
    let resolved_topic_id = topic.topic_id;
    topic
        .leave_consumer_group(consumer_group_id, client_id)
        .await?;

    let client_manager = self.client_manager.read().await;
    client_manager
        .leave_consumer_group(client_id, resolved_stream_id, resolved_topic_id, group_id)
        .await
}
}