// blob: 7aac7105de76c55d5cb916ccf2f763351e335e4d [file] [log] [blame]
use crate::streaming::clients::client_manager::{Client, Transport};
use crate::streaming::session::Session;
use crate::streaming::systems::system::System;
use iggy::error::IggyError;
use iggy::identifier::Identifier;
use iggy::locking::IggySharedMut;
use iggy::locking::IggySharedMutFn;
use std::net::SocketAddr;
use tracing::{error, info};
impl System {
    /// Registers a client connected from `address` over `transport`.
    ///
    /// Takes the client manager's write lock, adds the client, logs the
    /// registration and increments the clients metric by one.
    ///
    /// Returns the ID that the client manager assigned to the new client.
    pub async fn add_client(&self, address: &SocketAddr, transport: Transport) -> u32 {
        let mut client_manager = self.client_manager.write().await;
        let client_id = client_manager.add_client(address, transport);
        info!("Added {transport} client with ID: {client_id} for IP address: {address}");
        self.metrics.increment_clients(1);
        client_id
    }

    /// Removes the client connected from `address`, if one is registered.
    ///
    /// The cleanup runs in three phases:
    /// 1. Under the client manager's read lock, look the client up and
    ///    snapshot its ID together with the `(stream, topic, group)` IDs of
    ///    its consumer groups. Returns silently when the lookup fails.
    /// 2. After that read guard has been dropped, leave every snapshotted
    ///    consumer group. A failure is logged and does not stop the cleanup.
    /// 3. Under the client manager's write lock, delete the client. When a
    ///    client was actually removed, decrement the clients metric and log
    ///    the deletion; otherwise return silently.
    ///
    /// # Panics
    ///
    /// Panics if `Identifier::numeric` returns an error for any of the
    /// snapshotted stream, topic or consumer group IDs.
    pub async fn delete_client(&self, address: &SocketAddr) {
        // Phase 1: snapshot what is needed, so the read guards are released
        // before any consumer group is touched.
        let (client_id, consumer_groups) = {
            let client_manager = self.client_manager.read().await;
            let client = match client_manager.get_client_by_address(address) {
                Ok(client) => client,
                // Nothing to clean up for an address that cannot be resolved.
                Err(_) => return,
            };
            let client = client.read().await;
            let client_id = client.client_id;
            let consumer_groups: Vec<(u32, u32, u32)> = client
                .consumer_groups
                .iter()
                .map(|c| (c.stream_id, c.topic_id, c.group_id))
                .collect();
            (client_id, consumer_groups)
        };

        // Phase 2: best-effort removal from every consumer group.
        for (stream_id, topic_id, consumer_group_id) in consumer_groups {
            if let Err(error) = self
                .leave_consumer_group_by_client(
                    &Identifier::numeric(stream_id).unwrap(),
                    &Identifier::numeric(topic_id).unwrap(),
                    &Identifier::numeric(consumer_group_id).unwrap(),
                    client_id,
                )
                .await
            {
                error!(
                    "Failed to leave consumer group with ID: {} by client with ID: {}. Error: {}",
                    consumer_group_id, client_id, error
                );
            }
        }

        // Phase 3: drop the client itself. The lookup is repeated through
        // `delete_client` because the read lock was released in between.
        let mut client_manager = self.client_manager.write().await;
        let client = match client_manager.delete_client(address) {
            Some(client) => client,
            None => return,
        };
        self.metrics.decrement_clients(1);
        let client = client.read().await;
        info!(
            "Deleted {} client with ID: {} for IP address: {}",
            client.transport, client.client_id, client.address
        );
    }

    /// Returns the client with the given `client_id` on behalf of `session`.
    ///
    /// # Errors
    ///
    /// Propagates the error from `ensure_authenticated`, from the
    /// permissioner's `get_client` check for the session's user, or from the
    /// client manager's lookup by ID.
    pub async fn get_client(
        &self,
        session: &Session,
        client_id: u32,
    ) -> Result<IggySharedMut<Client>, IggyError> {
        self.ensure_authenticated(session)?;
        self.permissioner.get_client(session.get_user_id())?;
        let client_manager = self.client_manager.read().await;
        client_manager.get_client_by_id(client_id)
    }

    /// Returns the clients held by the client manager on behalf of `session`.
    ///
    /// # Errors
    ///
    /// Propagates the error from `ensure_authenticated` or from the
    /// permissioner's `get_clients` check for the session's user.
    pub async fn get_clients(
        &self,
        session: &Session,
    ) -> Result<Vec<IggySharedMut<Client>>, IggyError> {
        self.ensure_authenticated(session)?;
        self.permissioner.get_clients(session.get_user_id())?;
        let client_manager = self.client_manager.read().await;
        Ok(client_manager.get_clients())
    }
}