blob: 758aa3e04715471d48edb8bacaeb1611c264c2ea [file] [log] [blame]
use crate::streaming::utils::hash;
use iggy::error::IggyError;
use iggy::locking::IggySharedMut;
use iggy::locking::IggySharedMutFn;
use iggy::models::user_info::UserId;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::net::SocketAddr;
#[derive(Debug, Default)]
pub struct ClientManager {
clients: HashMap<u32, IggySharedMut<Client>>,
}
#[derive(Debug)]
pub struct Client {
pub client_id: u32,
pub user_id: Option<u32>,
pub address: SocketAddr,
pub transport: Transport,
pub consumer_groups: Vec<ConsumerGroup>,
}
#[derive(Debug)]
pub struct ConsumerGroup {
pub stream_id: u32,
pub topic_id: u32,
pub consumer_group_id: u32,
}
#[derive(Debug, Clone, Copy)]
pub enum Transport {
Tcp,
Quic,
}
impl Display for Transport {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Transport::Tcp => write!(f, "TCP"),
Transport::Quic => write!(f, "QUIC"),
}
}
}
impl ClientManager {
pub fn add_client(&mut self, address: &SocketAddr, transport: Transport) -> u32 {
let id = hash::calculate_32(address.to_string().as_bytes());
let client = Client {
client_id: id,
user_id: None,
address: *address,
transport,
consumer_groups: Vec::new(),
};
self.clients
.insert(client.client_id, IggySharedMut::new(client));
id
}
pub async fn set_user_id(&mut self, client_id: u32, user_id: UserId) -> Result<(), IggyError> {
let client = self.clients.get(&client_id);
if client.is_none() {
return Err(IggyError::ClientNotFound(client_id));
}
let mut client = client.unwrap().write().await;
client.user_id = Some(user_id);
Ok(())
}
pub async fn clear_user_id(&mut self, client_id: u32) -> Result<(), IggyError> {
let client = self.clients.get(&client_id);
if client.is_none() {
return Err(IggyError::ClientNotFound(client_id));
}
let mut client = client.unwrap().write().await;
client.user_id = None;
Ok(())
}
pub fn get_client_by_address(
&self,
address: &SocketAddr,
) -> Result<IggySharedMut<Client>, IggyError> {
let id = hash::calculate_32(address.to_string().as_bytes());
self.get_client_by_id(id)
}
pub fn get_client_by_id(&self, client_id: u32) -> Result<IggySharedMut<Client>, IggyError> {
let client = self.clients.get(&client_id);
if client.is_none() {
return Err(IggyError::ClientNotFound(client_id));
}
Ok(client.unwrap().clone())
}
pub fn get_clients(&self) -> Vec<IggySharedMut<Client>> {
self.clients.values().cloned().collect()
}
pub async fn delete_clients_for_user(&mut self, user_id: UserId) -> Result<(), IggyError> {
let mut clients_to_remove = Vec::new();
for client in self.clients.values() {
let client = client.read().await;
if let Some(client_user_id) = client.user_id {
if client_user_id == user_id {
clients_to_remove.push(client.client_id);
}
}
}
for client_id in clients_to_remove {
self.clients.remove(&client_id);
}
Ok(())
}
pub fn delete_client(&mut self, address: &SocketAddr) -> Option<IggySharedMut<Client>> {
let id = hash::calculate_32(address.to_string().as_bytes());
self.clients.remove(&id)
}
pub async fn join_consumer_group(
&self,
client_id: u32,
stream_id: u32,
topic_id: u32,
consumer_group_id: u32,
) -> Result<(), IggyError> {
let client = self.clients.get(&client_id);
if client.is_none() {
return Err(IggyError::ClientNotFound(client_id));
}
let mut client = client.unwrap().write().await;
if client.consumer_groups.iter().any(|consumer_group| {
consumer_group.consumer_group_id == consumer_group_id
&& consumer_group.topic_id == topic_id
&& consumer_group.stream_id == stream_id
}) {
return Ok(());
}
client.consumer_groups.push(ConsumerGroup {
stream_id,
topic_id,
consumer_group_id,
});
Ok(())
}
pub async fn leave_consumer_group(
&self,
client_id: u32,
stream_id: u32,
topic_id: u32,
consumer_group_id: u32,
) -> Result<(), IggyError> {
let client = self.clients.get(&client_id);
if client.is_none() {
return Err(IggyError::ClientNotFound(client_id));
}
let mut client = client.unwrap().write().await;
for (index, consumer_group) in client.consumer_groups.iter().enumerate() {
if consumer_group.stream_id == stream_id
&& consumer_group.topic_id == topic_id
&& consumer_group.consumer_group_id == consumer_group_id
{
client.consumer_groups.remove(index);
return Ok(());
}
}
Ok(())
}
pub async fn delete_consumer_groups_for_stream(&self, stream_id: u32) {
for client in self.clients.values() {
let mut client = client.write().await;
let indexes_to_remove = client
.consumer_groups
.iter()
.enumerate()
.filter_map(|(index, consumer_group)| {
if consumer_group.stream_id == stream_id {
Some(index)
} else {
None
}
})
.collect::<Vec<_>>();
for index in indexes_to_remove {
client.consumer_groups.remove(index);
}
}
}
pub async fn delete_consumer_groups_for_topic(&self, stream_id: u32, topic_id: u32) {
for client in self.clients.values() {
let mut client = client.write().await;
let indexes_to_remove = client
.consumer_groups
.iter()
.enumerate()
.filter_map(|(index, consumer_group)| {
if consumer_group.stream_id == stream_id && consumer_group.topic_id == topic_id
{
Some(index)
} else {
None
}
})
.collect::<Vec<_>>();
for index in indexes_to_remove {
client.consumer_groups.remove(index);
}
}
}
}