blob: a9d607911f724e8e8f7e422b79783592a649def2 [file] [log] [blame]
use crate::bytes_serializable::BytesSerializable;
use crate::error::IggyError;
use crate::models::client_info::{ClientInfo, ClientInfoDetails, ConsumerGroupInfo};
use crate::models::consumer_group::{ConsumerGroup, ConsumerGroupDetails, ConsumerGroupMember};
use crate::models::consumer_offset_info::ConsumerOffsetInfo;
use crate::models::identity_info::IdentityInfo;
use crate::models::partition::Partition;
use crate::models::permissions::Permissions;
use crate::models::personal_access_token::{PersonalAccessTokenInfo, RawPersonalAccessToken};
use crate::models::polled_messages::{MessageState, PolledMessage, PolledMessages};
use crate::models::stats::Stats;
use crate::models::stream::{Stream, StreamDetails};
use crate::models::topic::{Topic, TopicDetails};
use crate::models::user_info::{UserInfo, UserInfoDetails};
use crate::models::user_status::UserStatus;
use crate::utils::byte_size::IggyByteSize;
use bytes::Bytes;
use std::collections::HashMap;
use std::str::from_utf8;
const EMPTY_MESSAGES: Vec<PolledMessage> = vec![];
const EMPTY_TOPICS: Vec<Topic> = vec![];
const EMPTY_STREAMS: Vec<Stream> = vec![];
const EMPTY_CLIENTS: Vec<ClientInfo> = vec![];
const EMPTY_USERS: Vec<UserInfo> = vec![];
const EMPTY_PERSONAL_ACCESS_TOKENS: Vec<PersonalAccessTokenInfo> = vec![];
const EMPTY_CONSUMER_GROUPS: Vec<ConsumerGroup> = vec![];
pub fn map_stats(payload: &[u8]) -> Result<Stats, IggyError> {
let process_id = u32::from_le_bytes(payload[..4].try_into()?);
let cpu_usage = f32::from_le_bytes(payload[4..8].try_into()?);
let memory_usage = u64::from_le_bytes(payload[8..16].try_into()?).into();
let total_memory = u64::from_le_bytes(payload[16..24].try_into()?).into();
let available_memory = u64::from_le_bytes(payload[24..32].try_into()?).into();
let run_time = u64::from_le_bytes(payload[32..40].try_into()?);
let start_time = u64::from_le_bytes(payload[40..48].try_into()?);
let read_bytes = u64::from_le_bytes(payload[48..56].try_into()?).into();
let written_bytes = u64::from_le_bytes(payload[56..64].try_into()?).into();
let total_size_bytes = u64::from_le_bytes(payload[64..72].try_into()?).into();
let streams_count = u32::from_le_bytes(payload[72..76].try_into()?);
let topics_count = u32::from_le_bytes(payload[76..80].try_into()?);
let partitions_count = u32::from_le_bytes(payload[80..84].try_into()?);
let segments_count = u32::from_le_bytes(payload[84..88].try_into()?);
let messages_count = u64::from_le_bytes(payload[88..96].try_into()?);
let clients_count = u32::from_le_bytes(payload[96..100].try_into()?);
let consumer_groups_count = u32::from_le_bytes(payload[100..104].try_into()?);
let mut current_position = 104;
let hostname_length =
u32::from_le_bytes(payload[current_position..current_position + 4].try_into()?) as usize;
let hostname =
from_utf8(&payload[current_position + 4..current_position + 4 + hostname_length])?
.to_string();
current_position += 4 + hostname_length;
let os_name_length =
u32::from_le_bytes(payload[current_position..current_position + 4].try_into()?) as usize;
let os_name = from_utf8(&payload[current_position + 4..current_position + 4 + os_name_length])?
.to_string();
current_position += 4 + os_name_length;
let os_version_length =
u32::from_le_bytes(payload[current_position..current_position + 4].try_into()?) as usize;
let os_version =
from_utf8(&payload[current_position + 4..current_position + 4 + os_version_length])?
.to_string();
current_position += 4 + os_version_length;
let kernel_version_length =
u32::from_le_bytes(payload[current_position..current_position + 4].try_into()?) as usize;
let kernel_version =
from_utf8(&payload[current_position + 4..current_position + 4 + kernel_version_length])?
.to_string();
Ok(Stats {
process_id,
cpu_usage,
memory_usage,
total_memory,
available_memory,
run_time,
start_time,
read_bytes,
written_bytes,
messages_size_bytes: total_size_bytes,
streams_count,
topics_count,
partitions_count,
segments_count,
messages_count,
clients_count,
consumer_groups_count,
hostname,
os_name,
os_version,
kernel_version,
})
}
pub fn map_consumer_offset(payload: &[u8]) -> Result<ConsumerOffsetInfo, IggyError> {
let partition_id = u32::from_le_bytes(payload[..4].try_into()?);
let current_offset = u64::from_le_bytes(payload[4..12].try_into()?);
let stored_offset = u64::from_le_bytes(payload[12..20].try_into()?);
Ok(ConsumerOffsetInfo {
partition_id,
current_offset,
stored_offset,
})
}
pub fn map_user(payload: &[u8]) -> Result<UserInfoDetails, IggyError> {
let (user, position) = map_to_user_info(payload, 0)?;
let has_permissions = payload[position];
let permissions = if has_permissions == 1 {
let permissions_length =
u32::from_le_bytes(payload[position + 1..position + 5].try_into()?) as usize;
let permissions = &payload[position + 5..position + 5 + permissions_length];
Some(Permissions::from_bytes(permissions)?)
} else {
None
};
let user = UserInfoDetails {
id: user.id,
created_at: user.created_at,
status: user.status,
username: user.username,
permissions,
};
Ok(user)
}
pub fn map_users(payload: &[u8]) -> Result<Vec<UserInfo>, IggyError> {
if payload.is_empty() {
return Ok(EMPTY_USERS);
}
let mut users = Vec::new();
let length = payload.len();
let mut position = 0;
while position < length {
let (user, read_bytes) = map_to_user_info(payload, position)?;
users.push(user);
position += read_bytes;
}
users.sort_by(|x, y| x.id.cmp(&y.id));
Ok(users)
}
pub fn map_personal_access_tokens(
payload: &[u8],
) -> Result<Vec<PersonalAccessTokenInfo>, IggyError> {
if payload.is_empty() {
return Ok(EMPTY_PERSONAL_ACCESS_TOKENS);
}
let mut personal_access_tokens = Vec::new();
let length = payload.len();
let mut position = 0;
while position < length {
let (personal_access_token, read_bytes) = map_to_pat_info(payload, position)?;
personal_access_tokens.push(personal_access_token);
position += read_bytes;
}
personal_access_tokens.sort_by(|x, y| x.name.cmp(&y.name));
Ok(personal_access_tokens)
}
pub fn map_identity_info(payload: &[u8]) -> Result<IdentityInfo, IggyError> {
let user_id = u32::from_le_bytes(payload[..4].try_into()?);
Ok(IdentityInfo {
user_id,
tokens: None,
})
}
pub fn map_raw_pat(payload: &[u8]) -> Result<RawPersonalAccessToken, IggyError> {
let token_length = payload[0];
let token = from_utf8(&payload[1..1 + token_length as usize])?.to_string();
Ok(RawPersonalAccessToken { token })
}
pub fn map_client(payload: &[u8]) -> Result<ClientInfoDetails, IggyError> {
let (client, mut position) = map_to_client_info(payload, 0)?;
let mut consumer_groups = Vec::new();
let length = payload.len();
while position < length {
for _ in 0..client.consumer_groups_count {
let stream_id = u32::from_le_bytes(payload[position..position + 4].try_into()?);
let topic_id = u32::from_le_bytes(payload[position + 4..position + 8].try_into()?);
let consumer_group_id =
u32::from_le_bytes(payload[position + 8..position + 12].try_into()?);
let consumer_group = ConsumerGroupInfo {
stream_id,
topic_id,
consumer_group_id,
};
consumer_groups.push(consumer_group);
position += 12;
}
}
consumer_groups.sort_by(|x, y| x.consumer_group_id.cmp(&y.consumer_group_id));
let client = ClientInfoDetails {
client_id: client.client_id,
user_id: client.user_id,
address: client.address,
transport: client.transport,
consumer_groups_count: client.consumer_groups_count,
consumer_groups,
};
Ok(client)
}
pub fn map_clients(payload: &[u8]) -> Result<Vec<ClientInfo>, IggyError> {
if payload.is_empty() {
return Ok(EMPTY_CLIENTS);
}
let mut clients = Vec::new();
let length = payload.len();
let mut position = 0;
while position < length {
let (client, read_bytes) = map_to_client_info(payload, position)?;
clients.push(client);
position += read_bytes;
}
clients.sort_by(|x, y| x.client_id.cmp(&y.client_id));
Ok(clients)
}
pub fn map_polled_messages(payload: &[u8]) -> Result<PolledMessages, IggyError> {
if payload.is_empty() {
return Ok(PolledMessages {
messages: EMPTY_MESSAGES,
partition_id: 0,
current_offset: 0,
});
}
let length = payload.len();
let partition_id = u32::from_le_bytes(payload[..4].try_into()?);
let current_offset = u64::from_le_bytes(payload[4..12].try_into()?);
// Currently ignored
let _messages_count = u32::from_le_bytes(payload[12..16].try_into()?);
let mut position = 16;
let mut messages = Vec::new();
while position < length {
let offset = u64::from_le_bytes(payload[position..position + 8].try_into()?);
let state = MessageState::from_code(payload[position + 8])?;
let timestamp = u64::from_le_bytes(payload[position + 9..position + 17].try_into()?);
let id = u128::from_le_bytes(payload[position + 17..position + 33].try_into()?);
let checksum = u32::from_le_bytes(payload[position + 33..position + 37].try_into()?);
let headers_length = u32::from_le_bytes(payload[position + 37..position + 41].try_into()?);
let headers = if headers_length > 0 {
let headers_payload = &payload[position + 41..position + 41 + headers_length as usize];
Some(HashMap::from_bytes(headers_payload)?)
} else {
None
};
position += headers_length as usize;
let message_length = u32::from_le_bytes(payload[position + 41..position + 45].try_into()?);
let payload_range = position + 45..position + 45 + message_length as usize;
if payload_range.start > length || payload_range.end > length {
break;
}
let payload = payload[payload_range].to_vec();
let total_size = 45 + message_length as usize;
position += total_size;
messages.push(PolledMessage {
offset,
timestamp,
state,
checksum,
id,
headers,
length: message_length,
payload: Bytes::from(payload),
});
if position + 45 >= length {
break;
}
}
messages.sort_by(|x, y| x.offset.cmp(&y.offset));
Ok(PolledMessages {
partition_id,
current_offset,
messages,
})
}
pub fn map_streams(payload: &[u8]) -> Result<Vec<Stream>, IggyError> {
if payload.is_empty() {
return Ok(EMPTY_STREAMS);
}
let mut streams = Vec::new();
let length = payload.len();
let mut position = 0;
while position < length {
let (stream, read_bytes) = map_to_stream(payload, position)?;
streams.push(stream);
position += read_bytes;
}
streams.sort_by(|x, y| x.id.cmp(&y.id));
Ok(streams)
}
pub fn map_stream(payload: &[u8]) -> Result<StreamDetails, IggyError> {
let (stream, mut position) = map_to_stream(payload, 0)?;
let mut topics = Vec::new();
let length = payload.len();
while position < length {
let (topic, read_bytes) = map_to_topic(payload, position)?;
topics.push(topic);
position += read_bytes;
}
topics.sort_by(|x, y| x.id.cmp(&y.id));
let stream = StreamDetails {
id: stream.id,
created_at: stream.created_at,
topics_count: stream.topics_count,
size_bytes: stream.size_bytes,
messages_count: stream.messages_count,
name: stream.name,
topics,
};
Ok(stream)
}
fn map_to_stream(payload: &[u8], position: usize) -> Result<(Stream, usize), IggyError> {
let id = u32::from_le_bytes(payload[position..position + 4].try_into()?);
let created_at = u64::from_le_bytes(payload[position + 4..position + 12].try_into()?);
let topics_count = u32::from_le_bytes(payload[position + 12..position + 16].try_into()?);
let size_bytes = u64::from_le_bytes(payload[position + 16..position + 24].try_into()?).into();
let messages_count = u64::from_le_bytes(payload[position + 24..position + 32].try_into()?);
let name_length = payload[position + 32];
let name =
from_utf8(&payload[position + 33..position + 33 + name_length as usize])?.to_string();
let read_bytes = 4 + 8 + 4 + 8 + 8 + 1 + name_length as usize;
Ok((
Stream {
id,
created_at,
name,
size_bytes,
messages_count,
topics_count,
},
read_bytes,
))
}
pub fn map_topics(payload: &[u8]) -> Result<Vec<Topic>, IggyError> {
if payload.is_empty() {
return Ok(EMPTY_TOPICS);
}
let mut topics = Vec::new();
let length = payload.len();
let mut position = 0;
while position < length {
let (topic, read_bytes) = map_to_topic(payload, position)?;
topics.push(topic);
position += read_bytes;
}
topics.sort_by(|x, y| x.id.cmp(&y.id));
Ok(topics)
}
pub fn map_topic(payload: &[u8]) -> Result<TopicDetails, IggyError> {
let (topic, mut position) = map_to_topic(payload, 0)?;
let mut partitions = Vec::new();
let length = payload.len();
while position < length {
let (partition, read_bytes) = map_to_partition(payload, position)?;
partitions.push(partition);
position += read_bytes;
}
partitions.sort_by(|x, y| x.id.cmp(&y.id));
let topic = TopicDetails {
id: topic.id,
created_at: topic.created_at,
name: topic.name,
size: topic.size,
messages_count: topic.messages_count,
message_expiry: topic.message_expiry,
max_topic_size: topic.max_topic_size,
replication_factor: topic.replication_factor,
#[allow(clippy::cast_possible_truncation)]
partitions_count: partitions.len() as u32,
partitions,
};
Ok(topic)
}
fn map_to_topic(payload: &[u8], position: usize) -> Result<(Topic, usize), IggyError> {
let id = u32::from_le_bytes(payload[position..position + 4].try_into()?);
let created_at = u64::from_le_bytes(payload[position + 4..position + 12].try_into()?);
let partitions_count = u32::from_le_bytes(payload[position + 12..position + 16].try_into()?);
let message_expiry = match u32::from_le_bytes(payload[position + 16..position + 20].try_into()?)
{
0 => None,
message_expiry => Some(message_expiry),
};
let max_topic_size = match u64::from_le_bytes(payload[position + 20..position + 28].try_into()?)
{
0 => None,
max_topic_size => Some(IggyByteSize::from(max_topic_size)),
};
let replication_factor = payload[position + 28];
let size_bytes = IggyByteSize::from(u64::from_le_bytes(
payload[position + 29..position + 37].try_into()?,
));
let messages_count = u64::from_le_bytes(payload[position + 37..position + 45].try_into()?);
let name_length = payload[position + 45];
let name =
from_utf8(&payload[position + 46..position + 46 + name_length as usize])?.to_string();
let read_bytes = 4 + 8 + 4 + 4 + 8 + 8 + 8 + 1 + 1 + name_length as usize;
Ok((
Topic {
id,
created_at,
name,
partitions_count,
size: size_bytes,
messages_count,
message_expiry,
max_topic_size,
replication_factor,
},
read_bytes,
))
}
fn map_to_partition(payload: &[u8], position: usize) -> Result<(Partition, usize), IggyError> {
let id = u32::from_le_bytes(payload[position..position + 4].try_into()?);
let created_at = u64::from_le_bytes(payload[position + 4..position + 12].try_into()?);
let segments_count = u32::from_le_bytes(payload[position + 12..position + 16].try_into()?);
let current_offset = u64::from_le_bytes(payload[position + 16..position + 24].try_into()?);
let size_bytes = u64::from_le_bytes(payload[position + 24..position + 32].try_into()?).into();
let messages_count = u64::from_le_bytes(payload[position + 32..position + 40].try_into()?);
let read_bytes = 4 + 8 + 4 + 8 + 8 + 8;
Ok((
Partition {
id,
created_at,
segments_count,
current_offset,
size_bytes,
messages_count,
},
read_bytes,
))
}
pub fn map_consumer_groups(payload: &[u8]) -> Result<Vec<ConsumerGroup>, IggyError> {
if payload.is_empty() {
return Ok(EMPTY_CONSUMER_GROUPS);
}
let mut consumer_groups = Vec::new();
let length = payload.len();
let mut position = 0;
while position < length {
let (consumer_group, read_bytes) = map_to_consumer_group(payload, position)?;
consumer_groups.push(consumer_group);
position += read_bytes;
}
consumer_groups.sort_by(|x, y| x.id.cmp(&y.id));
Ok(consumer_groups)
}
pub fn map_consumer_group(payload: &[u8]) -> Result<ConsumerGroupDetails, IggyError> {
let (consumer_group, mut position) = map_to_consumer_group(payload, 0)?;
let mut members = Vec::new();
let length = payload.len();
while position < length {
let (member, read_bytes) = map_to_consumer_group_member(payload, position)?;
members.push(member);
position += read_bytes;
}
members.sort_by(|x, y| x.id.cmp(&y.id));
let consumer_group_details = ConsumerGroupDetails {
id: consumer_group.id,
name: consumer_group.name,
partitions_count: consumer_group.partitions_count,
members_count: consumer_group.members_count,
members,
};
Ok(consumer_group_details)
}
fn map_to_consumer_group(
payload: &[u8],
position: usize,
) -> Result<(ConsumerGroup, usize), IggyError> {
let id = u32::from_le_bytes(payload[position..position + 4].try_into()?);
let partitions_count = u32::from_le_bytes(payload[position + 4..position + 8].try_into()?);
let members_count = u32::from_le_bytes(payload[position + 8..position + 12].try_into()?);
let name_length = payload[position + 12];
let name =
from_utf8(&payload[position + 13..position + 13 + name_length as usize])?.to_string();
let read_bytes = 13 + name_length as usize;
Ok((
ConsumerGroup {
id,
partitions_count,
members_count,
name,
},
read_bytes,
))
}
fn map_to_consumer_group_member(
payload: &[u8],
position: usize,
) -> Result<(ConsumerGroupMember, usize), IggyError> {
let id = u32::from_le_bytes(payload[position..position + 4].try_into()?);
let partitions_count = u32::from_le_bytes(payload[position + 4..position + 8].try_into()?);
let mut partitions = Vec::new();
for i in 0..partitions_count {
let partition_id = u32::from_le_bytes(
payload[position + 8 + (i * 4) as usize..position + 8 + ((i + 1) * 4) as usize]
.try_into()?,
);
partitions.push(partition_id);
}
let read_bytes = (4 + 4 + partitions_count * 4) as usize;
Ok((
ConsumerGroupMember {
id,
partitions_count,
partitions,
},
read_bytes,
))
}
fn map_to_client_info(
payload: &[u8],
mut position: usize,
) -> Result<(ClientInfo, usize), IggyError> {
let mut read_bytes;
let client_id = u32::from_le_bytes(payload[position..position + 4].try_into()?);
let user_id = u32::from_le_bytes(payload[position + 4..position + 8].try_into()?);
let user_id = match user_id {
0 => None,
_ => Some(user_id),
};
let transport = payload[position + 8];
let transport = match transport {
1 => "TCP",
2 => "QUIC",
_ => "Unknown",
}
.to_string();
let address_length =
u32::from_le_bytes(payload[position + 9..position + 13].try_into()?) as usize;
let address = from_utf8(&payload[position + 13..position + 13 + address_length])?.to_string();
read_bytes = 4 + 4 + 1 + 4 + address_length;
position += read_bytes;
let consumer_groups_count = u32::from_le_bytes(payload[position..position + 4].try_into()?);
read_bytes += 4;
Ok((
ClientInfo {
client_id,
user_id,
address,
transport,
consumer_groups_count,
},
read_bytes,
))
}
fn map_to_user_info(payload: &[u8], position: usize) -> Result<(UserInfo, usize), IggyError> {
let id = u32::from_le_bytes(payload[position..position + 4].try_into()?);
let created_at = u64::from_le_bytes(payload[position + 4..position + 12].try_into()?);
let status = payload[position + 12];
let status = UserStatus::from_code(status)?;
let username_length = payload[position + 13];
let username =
from_utf8(&payload[position + 14..position + 14 + username_length as usize])?.to_string();
let read_bytes = 4 + 8 + 1 + 1 + username_length as usize;
Ok((
UserInfo {
id,
created_at,
status,
username,
},
read_bytes,
))
}
fn map_to_pat_info(
payload: &[u8],
position: usize,
) -> Result<(PersonalAccessTokenInfo, usize), IggyError> {
let name_length = payload[position];
let name = from_utf8(&payload[position + 1..position + 1 + name_length as usize])?.to_string();
let position = position + 1 + name_length as usize;
let expiry = u64::from_le_bytes(payload[position..position + 8].try_into()?);
let expiry = match expiry {
0 => None,
_ => Some(expiry),
};
let read_bytes = 1 + name_length as usize + 8;
Ok((PersonalAccessTokenInfo { name, expiry }, read_bytes))
}