blob: 0c44c82370f8ca0edb662f9e878e9e9590b01cca [file] [log] [blame]
use crate::bytes_serializable::BytesSerializable;
use crate::compression::compression_algorithm::CompressionAlgorithm;
use crate::error::IggyError;
use crate::models::client_info::{ClientInfo, ClientInfoDetails, ConsumerGroupInfo};
use crate::models::consumer_group::{ConsumerGroup, ConsumerGroupDetails, ConsumerGroupMember};
use crate::models::consumer_offset_info::ConsumerOffsetInfo;
use crate::models::identity_info::IdentityInfo;
use crate::models::messages::{MessageState, PolledMessage, PolledMessages};
use crate::models::partition::Partition;
use crate::models::permissions::Permissions;
use crate::models::personal_access_token::{PersonalAccessTokenInfo, RawPersonalAccessToken};
use crate::models::stats::Stats;
use crate::models::stream::{Stream, StreamDetails};
use crate::models::topic::{Topic, TopicDetails};
use crate::models::user_info::{UserInfo, UserInfoDetails};
use crate::models::user_status::UserStatus;
use crate::utils::byte_size::IggyByteSize;
use crate::utils::expiry::IggyExpiry;
use crate::utils::topic_size::MaxTopicSize;
use bytes::Bytes;
use std::collections::HashMap;
use std::str::from_utf8;
const EMPTY_MESSAGES: Vec<PolledMessage> = vec![];
const EMPTY_TOPICS: Vec<Topic> = vec![];
const EMPTY_STREAMS: Vec<Stream> = vec![];
const EMPTY_CLIENTS: Vec<ClientInfo> = vec![];
const EMPTY_USERS: Vec<UserInfo> = vec![];
const EMPTY_PERSONAL_ACCESS_TOKENS: Vec<PersonalAccessTokenInfo> = vec![];
const EMPTY_CONSUMER_GROUPS: Vec<ConsumerGroup> = vec![];
pub fn map_stats(payload: Bytes) -> Result<Stats, IggyError> {
let process_id = u32::from_le_bytes(
payload[..4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let cpu_usage = f32::from_le_bytes(
payload[4..8]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let total_cpu_usage = f32::from_le_bytes(
payload[8..12]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let memory_usage = u64::from_le_bytes(
payload[12..20]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
)
.into();
let total_memory = u64::from_le_bytes(
payload[20..28]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
)
.into();
let available_memory = u64::from_le_bytes(
payload[28..36]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
)
.into();
let run_time = u64::from_le_bytes(
payload[36..44]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
)
.into();
let start_time = u64::from_le_bytes(
payload[44..52]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
)
.into();
let read_bytes = u64::from_le_bytes(
payload[52..60]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
)
.into();
let written_bytes = u64::from_le_bytes(
payload[60..68]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
)
.into();
let messages_size_bytes = u64::from_le_bytes(
payload[68..76]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
)
.into();
let streams_count = u32::from_le_bytes(
payload[76..80]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let topics_count = u32::from_le_bytes(
payload[80..84]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let partitions_count = u32::from_le_bytes(
payload[84..88]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let segments_count = u32::from_le_bytes(
payload[88..92]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let messages_count = u64::from_le_bytes(
payload[92..100]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let clients_count = u32::from_le_bytes(
payload[100..104]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let consumer_groups_count = u32::from_le_bytes(
payload[104..108]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let mut current_position = 108;
//
// Safely decode hostname
//
if current_position + 4 > payload.len() {
return Err(IggyError::InvalidNumberEncoding);
}
let hostname_length = u32::from_le_bytes(
payload[current_position..current_position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
) as usize;
current_position += 4;
if current_position + hostname_length > payload.len() {
return Err(IggyError::InvalidNumberEncoding);
}
let hostname = from_utf8(&payload[current_position..current_position + hostname_length])
.map_err(|_| IggyError::InvalidUtf8)?
.to_string();
current_position += hostname_length;
//
// Safely Decode OS name
//
if current_position + 4 > payload.len() {
return Err(IggyError::InvalidNumberEncoding);
}
let os_name_length = u32::from_le_bytes(
payload[current_position..current_position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
) as usize;
current_position += 4;
if current_position + os_name_length > payload.len() {
return Err(IggyError::InvalidNumberEncoding);
}
let os_name = from_utf8(&payload[current_position..current_position + os_name_length])
.map_err(|_| IggyError::InvalidUtf8)?
.to_string();
current_position += os_name_length;
//
// Safely decode OS version
//
if current_position + 4 > payload.len() {
return Err(IggyError::InvalidNumberEncoding);
}
let os_version_length = u32::from_le_bytes(
payload[current_position..current_position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
) as usize;
current_position += 4;
if current_position + os_version_length > payload.len() {
return Err(IggyError::InvalidNumberEncoding);
}
let os_version = from_utf8(&payload[current_position..current_position + os_version_length])
.map_err(|_| IggyError::InvalidUtf8)?
.to_string();
current_position += os_version_length;
//
// Safely decode kernel version (NEW) + server version (NEW) + server semver (NEW)
// We'll check if there's enough bytes before reading each new field.
//
// Default them in case payload doesn't have them (older server)
let mut kernel_version = String::new();
let mut iggy_server_version = String::new();
let mut iggy_server_semver: Option<u32> = None;
// kernel_version (if it exists)
if current_position + 4 <= payload.len() {
let kernel_version_length = u32::from_le_bytes(
payload[current_position..current_position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
) as usize;
current_position += 4;
if current_position + kernel_version_length <= payload.len() {
let kv =
from_utf8(&payload[current_position..current_position + kernel_version_length])
.map_err(|_| IggyError::InvalidUtf8)?
.to_string();
kernel_version = kv;
current_position += kernel_version_length;
} else {
// Not enough bytes for kernel version string, treat as empty or error out
// return Err(IggyError::InvalidNumberEncoding);
kernel_version = String::new(); // fallback
}
} else {
// This means older server didn't send kernel_version, so remain empty
}
// iggy_server_version (if it exists)
if current_position + 4 <= payload.len() {
let iggy_version_length = u32::from_le_bytes(
payload[current_position..current_position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
) as usize;
current_position += 4;
if current_position + iggy_version_length <= payload.len() {
let iv = from_utf8(&payload[current_position..current_position + iggy_version_length])
.map_err(|_| IggyError::InvalidUtf8)?
.to_string();
iggy_server_version = iv;
current_position += iggy_version_length;
} else {
// Not enough bytes for iggy version string, treat as empty or error out
// return Err(IggyError::InvalidNumberEncoding);
iggy_server_version = String::new(); // fallback
}
} else {
// older server didn't send iggy_server_version, so remain empty
}
// iggy_server_semver (if it exists)
if current_position + 4 <= payload.len() {
let semver = u32::from_le_bytes(
payload[current_position..current_position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
// current_position += 4; // uncomment this when adding new fields
if semver != 0 {
iggy_server_semver = Some(semver);
}
} else {
// older server didn't send semver
}
Ok(Stats {
process_id,
cpu_usage,
total_cpu_usage,
memory_usage,
total_memory,
available_memory,
run_time,
start_time,
read_bytes,
written_bytes,
messages_size_bytes,
streams_count,
topics_count,
partitions_count,
segments_count,
messages_count,
clients_count,
consumer_groups_count,
hostname,
os_name,
os_version,
kernel_version,
iggy_server_version,
iggy_server_semver,
})
}
pub fn map_consumer_offset(payload: Bytes) -> Result<ConsumerOffsetInfo, IggyError> {
let partition_id = u32::from_le_bytes(
payload[..4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let current_offset = u64::from_le_bytes(
payload[4..12]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let stored_offset = u64::from_le_bytes(
payload[12..20]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
Ok(ConsumerOffsetInfo {
partition_id,
current_offset,
stored_offset,
})
}
pub fn map_user(payload: Bytes) -> Result<UserInfoDetails, IggyError> {
let (user, position) = map_to_user_info(payload.clone(), 0)?;
let has_permissions = payload[position];
let permissions = if has_permissions == 1 {
let permissions_length = u32::from_le_bytes(
payload[position + 1..position + 5]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
) as usize;
let permissions = payload.slice(position + 5..position + 5 + permissions_length);
Some(Permissions::from_bytes(permissions)?)
} else {
None
};
let user = UserInfoDetails {
id: user.id,
created_at: user.created_at,
status: user.status,
username: user.username,
permissions,
};
Ok(user)
}
pub fn map_users(payload: Bytes) -> Result<Vec<UserInfo>, IggyError> {
if payload.is_empty() {
return Ok(EMPTY_USERS);
}
let mut users = Vec::new();
let length = payload.len();
let mut position = 0;
while position < length {
let (user, read_bytes) = map_to_user_info(payload.clone(), position)?;
users.push(user);
position += read_bytes;
}
users.sort_by(|x, y| x.id.cmp(&y.id));
Ok(users)
}
pub fn map_personal_access_tokens(
payload: Bytes,
) -> Result<Vec<PersonalAccessTokenInfo>, IggyError> {
if payload.is_empty() {
return Ok(EMPTY_PERSONAL_ACCESS_TOKENS);
}
let mut personal_access_tokens = Vec::new();
let length = payload.len();
let mut position = 0;
while position < length {
let (personal_access_token, read_bytes) = map_to_pat_info(payload.clone(), position)?;
personal_access_tokens.push(personal_access_token);
position += read_bytes;
}
personal_access_tokens.sort_by(|x, y| x.name.cmp(&y.name));
Ok(personal_access_tokens)
}
pub fn map_identity_info(payload: Bytes) -> Result<IdentityInfo, IggyError> {
let user_id = u32::from_le_bytes(
payload[..4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
Ok(IdentityInfo {
user_id,
access_token: None,
})
}
pub fn map_raw_pat(payload: Bytes) -> Result<RawPersonalAccessToken, IggyError> {
let token_length = payload[0];
let token = from_utf8(&payload[1..1 + token_length as usize])
.map_err(|_| IggyError::InvalidUtf8)?
.to_string();
Ok(RawPersonalAccessToken { token })
}
pub fn map_client(payload: Bytes) -> Result<ClientInfoDetails, IggyError> {
let (client, mut position) = map_to_client_info(payload.clone(), 0)?;
let mut consumer_groups = Vec::new();
let length = payload.len();
while position < length {
for _ in 0..client.consumer_groups_count {
let stream_id = u32::from_le_bytes(
payload[position..position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let topic_id = u32::from_le_bytes(
payload[position + 4..position + 8]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let group_id = u32::from_le_bytes(
payload[position + 8..position + 12]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let consumer_group = ConsumerGroupInfo {
stream_id,
topic_id,
group_id,
};
consumer_groups.push(consumer_group);
position += 12;
}
}
consumer_groups.sort_by(|x, y| x.group_id.cmp(&y.group_id));
let client = ClientInfoDetails {
client_id: client.client_id,
user_id: client.user_id,
address: client.address,
transport: client.transport,
consumer_groups_count: client.consumer_groups_count,
consumer_groups,
};
Ok(client)
}
pub fn map_clients(payload: Bytes) -> Result<Vec<ClientInfo>, IggyError> {
if payload.is_empty() {
return Ok(EMPTY_CLIENTS);
}
let mut clients = Vec::new();
let length = payload.len();
let mut position = 0;
while position < length {
let (client, read_bytes) = map_to_client_info(payload.clone(), position)?;
clients.push(client);
position += read_bytes;
}
clients.sort_by(|x, y| x.client_id.cmp(&y.client_id));
Ok(clients)
}
pub fn map_polled_messages(payload: Bytes) -> Result<PolledMessages, IggyError> {
if payload.is_empty() {
return Ok(PolledMessages {
messages: EMPTY_MESSAGES,
partition_id: 0,
current_offset: 0,
});
}
let length = payload.len();
let partition_id = u32::from_le_bytes(
payload[..4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let current_offset = u64::from_le_bytes(
payload[4..12]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
// Currently ignored
let _messages_count = u32::from_le_bytes(
payload[12..16]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let mut position = 16;
let mut messages = Vec::new();
while position < length {
let offset = u64::from_le_bytes(
payload[position..position + 8]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let state = MessageState::from_code(payload[position + 8])?;
let timestamp = u64::from_le_bytes(
payload[position + 9..position + 17]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let id = u128::from_le_bytes(
payload[position + 17..position + 33]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let checksum = u32::from_le_bytes(
payload[position + 33..position + 37]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let headers_length = u32::from_le_bytes(
payload[position + 37..position + 41]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let headers = if headers_length > 0 {
let headers_payload =
payload.slice(position + 41..position + 41 + headers_length as usize);
Some(HashMap::from_bytes(headers_payload)?)
} else {
None
};
position += headers_length as usize;
let message_length = u32::from_le_bytes(
payload[position + 41..position + 45]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let payload_range = position + 45..position + 45 + message_length as usize;
if payload_range.start > length || payload_range.end > length {
break;
}
let payload = payload[payload_range].to_vec();
let total_size = 45 + message_length as usize;
position += total_size;
messages.push(PolledMessage {
offset,
timestamp,
state,
checksum,
id,
headers,
length: IggyByteSize::from(message_length as u64),
payload: Bytes::from(payload),
});
if position + 45 >= length {
break;
}
}
messages.sort_by(|x, y| x.offset.cmp(&y.offset));
Ok(PolledMessages {
partition_id,
current_offset,
messages,
})
}
pub fn map_streams(payload: Bytes) -> Result<Vec<Stream>, IggyError> {
if payload.is_empty() {
return Ok(EMPTY_STREAMS);
}
let mut streams = Vec::new();
let length = payload.len();
let mut position = 0;
while position < length {
let (stream, read_bytes) = map_to_stream(payload.clone(), position)?;
streams.push(stream);
position += read_bytes;
}
streams.sort_by(|x, y| x.id.cmp(&y.id));
Ok(streams)
}
pub fn map_stream(payload: Bytes) -> Result<StreamDetails, IggyError> {
let (stream, mut position) = map_to_stream(payload.clone(), 0)?;
let mut topics = Vec::new();
let length = payload.len();
while position < length {
let (topic, read_bytes) = map_to_topic(payload.clone(), position)?;
topics.push(topic);
position += read_bytes;
}
topics.sort_by(|x, y| x.id.cmp(&y.id));
let stream = StreamDetails {
id: stream.id,
created_at: stream.created_at,
topics_count: stream.topics_count,
size: stream.size,
messages_count: stream.messages_count,
name: stream.name,
topics,
};
Ok(stream)
}
fn map_to_stream(payload: Bytes, position: usize) -> Result<(Stream, usize), IggyError> {
let id = u32::from_le_bytes(
payload[position..position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let created_at = u64::from_le_bytes(
payload[position + 4..position + 12]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
)
.into();
let topics_count = u32::from_le_bytes(
payload[position + 12..position + 16]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let size_bytes = u64::from_le_bytes(
payload[position + 16..position + 24]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
)
.into();
let messages_count = u64::from_le_bytes(
payload[position + 24..position + 32]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let name_length = payload[position + 32];
let name = from_utf8(&payload[position + 33..position + 33 + name_length as usize])
.map_err(|_| IggyError::InvalidUtf8)?
.to_string();
let read_bytes = 4 + 8 + 4 + 8 + 8 + 1 + name_length as usize;
Ok((
Stream {
id,
created_at,
name,
size: size_bytes,
messages_count,
topics_count,
},
read_bytes,
))
}
pub fn map_topics(payload: Bytes) -> Result<Vec<Topic>, IggyError> {
if payload.is_empty() {
return Ok(EMPTY_TOPICS);
}
let mut topics = Vec::new();
let length = payload.len();
let mut position = 0;
while position < length {
let (topic, read_bytes) = map_to_topic(payload.clone(), position)?;
topics.push(topic);
position += read_bytes;
}
topics.sort_by(|x, y| x.id.cmp(&y.id));
Ok(topics)
}
pub fn map_topic(payload: Bytes) -> Result<TopicDetails, IggyError> {
let (topic, mut position) = map_to_topic(payload.clone(), 0)?;
let mut partitions = Vec::new();
let length = payload.len();
while position < length {
let (partition, read_bytes) = map_to_partition(payload.clone(), position)?;
partitions.push(partition);
position += read_bytes;
}
partitions.sort_by(|x, y| x.id.cmp(&y.id));
let topic = TopicDetails {
id: topic.id,
created_at: topic.created_at,
name: topic.name,
size: topic.size,
messages_count: topic.messages_count,
message_expiry: topic.message_expiry,
compression_algorithm: topic.compression_algorithm,
max_topic_size: topic.max_topic_size,
replication_factor: topic.replication_factor,
#[allow(clippy::cast_possible_truncation)]
partitions_count: partitions.len() as u32,
partitions,
};
Ok(topic)
}
fn map_to_topic(payload: Bytes, position: usize) -> Result<(Topic, usize), IggyError> {
let id = u32::from_le_bytes(
payload[position..position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let created_at = u64::from_le_bytes(
payload[position + 4..position + 12]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let created_at = created_at.into();
let partitions_count = u32::from_le_bytes(
payload[position + 12..position + 16]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let message_expiry = match u64::from_le_bytes(
payload[position + 16..position + 24]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
) {
0 => IggyExpiry::NeverExpire,
message_expiry => message_expiry.into(),
};
let compression_algorithm = CompressionAlgorithm::from_code(payload[position + 24])?;
let max_topic_size = u64::from_le_bytes(
payload[position + 25..position + 33]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let max_topic_size: MaxTopicSize = max_topic_size.into();
let replication_factor = payload[position + 33];
let size_bytes = IggyByteSize::from(u64::from_le_bytes(
payload[position + 34..position + 42]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
));
let messages_count = u64::from_le_bytes(
payload[position + 42..position + 50]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let name_length = payload[position + 50];
let name = from_utf8(&payload[position + 51..position + 51 + name_length as usize])
.map_err(|_| IggyError::InvalidUtf8)?
.to_string();
let read_bytes = 4 + 8 + 4 + 8 + 8 + 8 + 8 + 1 + 1 + 1 + name_length as usize;
Ok((
Topic {
id,
created_at,
name,
partitions_count,
size: size_bytes,
messages_count,
message_expiry,
compression_algorithm,
max_topic_size,
replication_factor,
},
read_bytes,
))
}
fn map_to_partition(payload: Bytes, position: usize) -> Result<(Partition, usize), IggyError> {
let id = u32::from_le_bytes(
payload[position..position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let created_at = u64::from_le_bytes(
payload[position + 4..position + 12]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let created_at = created_at.into();
let segments_count = u32::from_le_bytes(
payload[position + 12..position + 16]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let current_offset = u64::from_le_bytes(
payload[position + 16..position + 24]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let size_bytes = u64::from_le_bytes(
payload[position + 24..position + 32]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
)
.into();
let messages_count = u64::from_le_bytes(
payload[position + 32..position + 40]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let read_bytes = 4 + 8 + 4 + 8 + 8 + 8;
Ok((
Partition {
id,
created_at,
segments_count,
current_offset,
size: size_bytes,
messages_count,
},
read_bytes,
))
}
pub fn map_consumer_groups(payload: Bytes) -> Result<Vec<ConsumerGroup>, IggyError> {
if payload.is_empty() {
return Ok(EMPTY_CONSUMER_GROUPS);
}
let mut consumer_groups = Vec::new();
let length = payload.len();
let mut position = 0;
while position < length {
let (consumer_group, read_bytes) = map_to_consumer_group(payload.clone(), position)?;
consumer_groups.push(consumer_group);
position += read_bytes;
}
consumer_groups.sort_by(|x, y| x.id.cmp(&y.id));
Ok(consumer_groups)
}
pub fn map_consumer_group(payload: Bytes) -> Result<ConsumerGroupDetails, IggyError> {
let (consumer_group, mut position) = map_to_consumer_group(payload.clone(), 0)?;
let mut members = Vec::new();
let length = payload.len();
while position < length {
let (member, read_bytes) = map_to_consumer_group_member(payload.clone(), position)?;
members.push(member);
position += read_bytes;
}
members.sort_by(|x, y| x.id.cmp(&y.id));
let consumer_group_details = ConsumerGroupDetails {
id: consumer_group.id,
name: consumer_group.name,
partitions_count: consumer_group.partitions_count,
members_count: consumer_group.members_count,
members,
};
Ok(consumer_group_details)
}
fn map_to_consumer_group(
payload: Bytes,
position: usize,
) -> Result<(ConsumerGroup, usize), IggyError> {
let id = u32::from_le_bytes(
payload[position..position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let partitions_count = u32::from_le_bytes(
payload[position + 4..position + 8]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let members_count = u32::from_le_bytes(
payload[position + 8..position + 12]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let name_length = payload[position + 12];
let name = from_utf8(&payload[position + 13..position + 13 + name_length as usize])
.map_err(|_| IggyError::InvalidUtf8)?
.to_string();
let read_bytes = 13 + name_length as usize;
Ok((
ConsumerGroup {
id,
partitions_count,
members_count,
name,
},
read_bytes,
))
}
fn map_to_consumer_group_member(
payload: Bytes,
position: usize,
) -> Result<(ConsumerGroupMember, usize), IggyError> {
let id = u32::from_le_bytes(
payload[position..position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let partitions_count = u32::from_le_bytes(
payload[position + 4..position + 8]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let mut partitions = Vec::new();
for i in 0..partitions_count {
let partition_id = u32::from_le_bytes(
payload[position + 8 + (i * 4) as usize..position + 8 + ((i + 1) * 4) as usize]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
partitions.push(partition_id);
}
let read_bytes = (4 + 4 + partitions_count * 4) as usize;
Ok((
ConsumerGroupMember {
id,
partitions_count,
partitions,
},
read_bytes,
))
}
fn map_to_client_info(
payload: Bytes,
mut position: usize,
) -> Result<(ClientInfo, usize), IggyError> {
let mut read_bytes;
let client_id = u32::from_le_bytes(
payload[position..position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let user_id = u32::from_le_bytes(
payload[position + 4..position + 8]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let user_id = match user_id {
0 => None,
_ => Some(user_id),
};
let transport = payload[position + 8];
let transport = match transport {
1 => "TCP",
2 => "QUIC",
_ => "Unknown",
}
.to_string();
let address_length = u32::from_le_bytes(
payload[position + 9..position + 13]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
) as usize;
let address = from_utf8(&payload[position + 13..position + 13 + address_length])
.map_err(|_| IggyError::InvalidUtf8)?
.to_string();
read_bytes = 4 + 4 + 1 + 4 + address_length;
position += read_bytes;
let consumer_groups_count = u32::from_le_bytes(
payload[position..position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
read_bytes += 4;
Ok((
ClientInfo {
client_id,
user_id,
address,
transport,
consumer_groups_count,
},
read_bytes,
))
}
fn map_to_user_info(payload: Bytes, position: usize) -> Result<(UserInfo, usize), IggyError> {
let id = u32::from_le_bytes(
payload[position..position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let created_at = u64::from_le_bytes(
payload[position + 4..position + 12]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let created_at = created_at.into();
let status = payload[position + 12];
let status = UserStatus::from_code(status)?;
let username_length = payload[position + 13];
let username = from_utf8(&payload[position + 14..position + 14 + username_length as usize])
.map_err(|_| IggyError::InvalidUtf8)?
.to_string();
let read_bytes = 4 + 8 + 1 + 1 + username_length as usize;
Ok((
UserInfo {
id,
created_at,
status,
username,
},
read_bytes,
))
}
fn map_to_pat_info(
payload: Bytes,
position: usize,
) -> Result<(PersonalAccessTokenInfo, usize), IggyError> {
let name_length = payload[position];
let name = from_utf8(&payload[position + 1..position + 1 + name_length as usize])
.map_err(|_| IggyError::InvalidUtf8)?
.to_string();
let position = position + 1 + name_length as usize;
let expiry_at = u64::from_le_bytes(
payload[position..position + 8]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let expiry_at = match expiry_at {
0 => None,
value => Some(value.into()),
};
let read_bytes = 1 + name_length as usize + 8;
Ok((PersonalAccessTokenInfo { name, expiry_at }, read_bytes))
}