// blob: aac1c9a0e63dcf8b7271ee3b828b36970155583c [file] [log] [blame]
use crate::http::jwt::json_web_token::GeneratedToken;
use crate::streaming::clients::client_manager::Client;
use crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
use crate::streaming::streams::stream::Stream;
use crate::streaming::topics::consumer_group::ConsumerGroup;
use crate::streaming::topics::topic::Topic;
use crate::streaming::users::user::User;
use iggy::locking::IggySharedMut;
use iggy::locking::IggySharedMutFn;
use iggy::models::client_info::ConsumerGroupInfo;
use iggy::models::consumer_group::{ConsumerGroupDetails, ConsumerGroupMember};
use iggy::models::identity_info::{IdentityInfo, TokenInfo};
use iggy::models::personal_access_token::PersonalAccessTokenInfo;
use iggy::models::stream::StreamDetails;
use iggy::models::topic::TopicDetails;
use iggy::models::user_info::{UserInfo, UserInfoDetails};
use tokio::sync::RwLock;
/// Maps a server-side [`Stream`] to the detailed [`StreamDetails`] model.
///
/// The embedded topic list is produced by [`map_topics`], which already
/// returns its result sorted by ascending topic ID, so it is not sorted a
/// second time here.
///
/// `topics_count` is taken from the length of the mapped topic list.
pub fn map_stream(stream: &Stream) -> StreamDetails {
    let topics = map_topics(&stream.get_topics());
    StreamDetails {
        id: stream.stream_id,
        created_at: stream.created_at,
        name: stream.name.clone(),
        // Read the length before `topics` is moved into the struct below.
        topics_count: topics.len() as u32,
        size: stream.get_size(),
        messages_count: stream.get_messages_count(),
        topics,
    }
}
/// Maps a slice of server-side streams to their summary models.
///
/// The returned vector is ordered by ascending stream ID.
pub fn map_streams(streams: &[&Stream]) -> Vec<iggy::models::stream::Stream> {
    let mut mapped: Vec<_> = streams
        .iter()
        .map(|source| iggy::models::stream::Stream {
            id: source.stream_id,
            created_at: source.created_at,
            name: source.name.clone(),
            size: source.get_size(),
            topics_count: source.get_topics().len() as u32,
            messages_count: source.get_messages_count(),
        })
        .collect();
    mapped.sort_by_key(|entry| entry.id);
    mapped
}
/// Maps a slice of server-side topics to their summary models.
///
/// The returned vector is ordered by ascending topic ID.
pub fn map_topics(topics: &[&Topic]) -> Vec<iggy::models::topic::Topic> {
    let mut mapped: Vec<_> = topics
        .iter()
        .map(|source| iggy::models::topic::Topic {
            id: source.topic_id,
            created_at: source.created_at,
            name: source.name.clone(),
            size: source.get_size(),
            partitions_count: source.get_partitions().len() as u32,
            messages_count: source.get_messages_count(),
            message_expiry: source.message_expiry,
            compression_algorithm: source.compression_algorithm,
            max_topic_size: source.max_topic_size,
            replication_factor: source.replication_factor,
        })
        .collect();
    mapped.sort_by_key(|entry| entry.id);
    mapped
}
/// Maps a server-side [`Topic`] to the detailed [`TopicDetails`] model,
/// including one entry per partition.
///
/// Each partition is read (awaiting `read()`) one at a time, in the order
/// returned by `get_partitions()`; the resulting partition list is then sorted
/// by ascending partition ID.
pub async fn map_topic(topic: &Topic) -> TopicDetails {
    // Fetch the partitions once and reuse them for the count, the capacity
    // hint and the iteration below.
    let partitions = topic.get_partitions();
    let mut topic_details = TopicDetails {
        id: topic.topic_id,
        created_at: topic.created_at,
        name: topic.name.clone(),
        size: topic.get_size(),
        messages_count: topic.get_messages_count(),
        partitions_count: partitions.len() as u32,
        partitions: Vec::with_capacity(partitions.len()),
        message_expiry: topic.message_expiry,
        compression_algorithm: topic.compression_algorithm,
        max_topic_size: topic.max_topic_size,
        replication_factor: topic.replication_factor,
    };
    for partition in partitions {
        let partition = partition.read().await;
        topic_details
            .partitions
            .push(iggy::models::partition::Partition {
                id: partition.partition_id,
                created_at: partition.created_at,
                segments_count: partition.get_segments().len() as u32,
                current_offset: partition.current_offset,
                size: partition.get_size_bytes().into(),
                messages_count: partition.get_messages_count(),
            });
    }
    topic_details.partitions.sort_by(|a, b| a.id.cmp(&b.id));
    topic_details
}
/// Maps a server-side [`User`] to the detailed [`UserInfoDetails`] model,
/// cloning the username and permissions.
pub fn map_user(user: &User) -> UserInfoDetails {
    let username = user.username.clone();
    let permissions = user.permissions.clone();
    UserInfoDetails {
        id: user.id,
        username,
        created_at: user.created_at,
        status: user.status,
        permissions,
    }
}
/// Maps a slice of server-side users to their summary models.
///
/// The returned vector is ordered by ascending user ID.
pub fn map_users(users: &[&User]) -> Vec<UserInfo> {
    let mut mapped: Vec<_> = users
        .iter()
        .map(|source| UserInfo {
            id: source.id,
            username: source.username.clone(),
            created_at: source.created_at,
            status: source.status,
        })
        .collect();
    mapped.sort_by_key(|entry| entry.id);
    mapped
}
/// Maps a slice of personal access tokens to their public info models
/// (name and expiry only).
///
/// The returned vector is ordered by token name.
pub fn map_personal_access_tokens(
    personal_access_tokens: &[&PersonalAccessToken],
) -> Vec<PersonalAccessTokenInfo> {
    let mut mapped: Vec<_> = personal_access_tokens
        .iter()
        .map(|source| PersonalAccessTokenInfo {
            name: source.name.clone(),
            expiry_at: source.expiry_at,
        })
        .collect();
    mapped.sort_by(|left, right| left.name.cmp(&right.name));
    mapped
}
/// Maps a server-side [`Client`] to the detailed client info model,
/// including the consumer groups the client belongs to.
///
/// The transport and address are rendered with `to_string()`, and the
/// consumer groups keep the iteration order of `client.consumer_groups`.
pub fn map_client(client: &Client) -> iggy::models::client_info::ClientInfoDetails {
    iggy::models::client_info::ClientInfoDetails {
        client_id: client.client_id,
        user_id: client.user_id,
        transport: client.transport.to_string(),
        address: client.address.to_string(),
        consumer_groups_count: client.consumer_groups.len() as u32,
        consumer_groups: client
            .consumer_groups
            .iter()
            .map(|consumer_group| ConsumerGroupInfo {
                stream_id: consumer_group.stream_id,
                topic_id: consumer_group.topic_id,
                group_id: consumer_group.group_id,
            })
            .collect(),
    }
}
/// Maps a slice of shared clients to their summary models.
///
/// Each client is read (awaiting `read()`) one at a time. The returned vector
/// is ordered by ascending client ID.
pub async fn map_clients(
    clients: &[IggySharedMut<Client>],
) -> Vec<iggy::models::client_info::ClientInfo> {
    // The output has exactly one entry per input client, so size it up front.
    let mut all_clients = Vec::with_capacity(clients.len());
    for client in clients {
        let client = client.read().await;
        all_clients.push(iggy::models::client_info::ClientInfo {
            client_id: client.client_id,
            user_id: client.user_id,
            transport: client.transport.to_string(),
            address: client.address.to_string(),
            consumer_groups_count: client.consumer_groups.len() as u32,
        });
    }
    all_clients.sort_by(|a, b| a.client_id.cmp(&b.client_id));
    all_clients
}
/// Maps a slice of lock-protected consumer groups to their summary models.
///
/// A read lock is acquired on each group in turn. The returned vector is
/// ordered by ascending group ID.
pub async fn map_consumer_groups(
    consumer_groups: &[&RwLock<ConsumerGroup>],
) -> Vec<iggy::models::consumer_group::ConsumerGroup> {
    // The output has exactly one entry per input group, so size it up front.
    let mut groups = Vec::with_capacity(consumer_groups.len());
    for consumer_group in consumer_groups {
        let consumer_group = consumer_group.read().await;
        groups.push(iggy::models::consumer_group::ConsumerGroup {
            id: consumer_group.group_id,
            name: consumer_group.name.clone(),
            partitions_count: consumer_group.partitions_count,
            members_count: consumer_group.get_members().len() as u32,
        });
    }
    groups.sort_by(|a, b| a.id.cmp(&b.id));
    groups
}
/// Maps a server-side [`ConsumerGroup`] to the detailed
/// [`ConsumerGroupDetails`] model, including one entry per member.
///
/// Each member is read (awaiting `read()`) one at a time; members appear in
/// the order returned by `get_members()` and are not re-sorted.
pub async fn map_consumer_group(consumer_group: &ConsumerGroup) -> ConsumerGroupDetails {
    // Fetch the members once and reuse them for the count, the capacity hint
    // and the iteration below.
    let members = consumer_group.get_members();
    let mut consumer_group_details = ConsumerGroupDetails {
        id: consumer_group.group_id,
        name: consumer_group.name.clone(),
        partitions_count: consumer_group.partitions_count,
        members_count: members.len() as u32,
        members: Vec::with_capacity(members.len()),
    };
    for member in members {
        let member = member.read().await;
        let partitions = member.get_partitions();
        consumer_group_details.members.push(ConsumerGroupMember {
            id: member.id,
            // Read the length before `partitions` is moved into the struct.
            partitions_count: partitions.len() as u32,
            partitions,
        });
    }
    consumer_group_details
}
/// Converts a freshly generated JWT into an [`IdentityInfo`] that carries the
/// user ID together with the access token and its expiry.
pub fn map_generated_access_token_to_identity_info(token: GeneratedToken) -> IdentityInfo {
    let token_info = TokenInfo {
        token: token.access_token,
        expiry: token.access_token_expiry,
    };
    IdentityInfo {
        user_id: token.user_id,
        access_token: Some(token_info),
    }
}