blob: 5093b2de934d05694a733d38f3c7ec3b8625314d [file] [log] [blame]
use crate::state::{EntryCommand, StateEntry, COMPONENT};
use crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
use error_set::ErrContext;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::error::IggyError;
use iggy::identifier::{IdKind, Identifier};
use iggy::models::permissions::Permissions;
use iggy::models::user_status::UserStatus;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::timestamp::IggyTimestamp;
use iggy::utils::topic_size::MaxTopicSize;
use std::collections::HashMap;
use std::fmt::Display;
use tracing::debug;
#[derive(Debug)]
pub struct SystemState {
pub streams: HashMap<u32, StreamState>,
pub users: HashMap<u32, UserState>,
}
#[derive(Debug)]
pub struct StreamState {
pub id: u32,
pub name: String,
pub created_at: IggyTimestamp,
pub topics: HashMap<u32, TopicState>,
pub current_topic_id: u32,
}
#[derive(Debug)]
pub struct TopicState {
pub id: u32,
pub name: String,
pub partitions: HashMap<u32, PartitionState>,
pub consumer_groups: HashMap<u32, ConsumerGroupState>,
pub compression_algorithm: CompressionAlgorithm,
pub message_expiry: IggyExpiry,
pub max_topic_size: MaxTopicSize,
pub replication_factor: Option<u8>,
pub created_at: IggyTimestamp,
pub current_consumer_group_id: u32,
}
#[derive(Debug)]
pub struct PartitionState {
pub id: u32,
pub created_at: IggyTimestamp,
}
#[derive(Debug)]
pub struct PersonalAccessTokenState {
pub name: String,
pub token_hash: String,
pub expiry_at: Option<IggyTimestamp>,
}
#[derive(Debug)]
pub struct UserState {
pub id: u32,
pub username: String,
pub password_hash: String,
pub status: UserStatus,
pub permissions: Option<Permissions>,
pub personal_access_tokens: HashMap<String, PersonalAccessTokenState>,
}
#[derive(Debug)]
pub struct ConsumerGroupState {
pub id: u32,
pub name: String,
}
impl SystemState {
pub async fn init(entries: Vec<StateEntry>) -> Result<Self, IggyError> {
let mut streams = HashMap::new();
let mut users = HashMap::new();
let mut current_stream_id = 0;
let mut current_user_id = 0;
for entry in entries {
debug!("Processing state entry: {entry}",);
match entry.command().with_error_context(|_| {
format!("{COMPONENT} - failed to retrieve state entry command: {entry}")
})? {
EntryCommand::CreateStream(command) => {
let stream_id = command.stream_id.unwrap_or_else(|| {
current_stream_id += 1;
current_stream_id
});
let stream = StreamState {
id: stream_id,
name: command.name.clone(),
topics: HashMap::new(),
current_topic_id: 0,
created_at: entry.timestamp,
};
streams.insert(stream.id, stream);
}
EntryCommand::UpdateStream(command) => {
let stream_id = find_stream_id(&streams, &command.stream_id);
let stream = streams
.get_mut(&stream_id)
.unwrap_or_else(|| panic!("{}", format!("Stream: {stream_id} not found")));
stream.name = command.name;
}
EntryCommand::DeleteStream(command) => {
let stream_id = find_stream_id(&streams, &command.stream_id);
streams.remove(&stream_id);
}
EntryCommand::PurgeStream(command) => {
let stream_id = find_stream_id(&streams, &command.stream_id);
streams
.get(&stream_id)
.unwrap_or_else(|| panic!("{}", format!("Stream: {stream_id} not found")));
// It only affects the segments which are not part of the state
}
EntryCommand::CreateTopic(command) => {
let stream_id = find_stream_id(&streams, &command.stream_id);
let stream = streams
.get_mut(&stream_id)
.unwrap_or_else(|| panic!("{}", format!("Stream: {stream_id} not found")));
let topic_id = command.topic_id.unwrap_or_else(|| {
stream.current_topic_id += 1;
stream.current_topic_id
});
let topic = TopicState {
id: topic_id,
name: command.name,
consumer_groups: HashMap::new(),
current_consumer_group_id: 0,
compression_algorithm: command.compression_algorithm,
message_expiry: command.message_expiry,
max_topic_size: command.max_topic_size,
replication_factor: command.replication_factor,
created_at: entry.timestamp,
partitions: if command.partitions_count > 0 {
let mut partitions = HashMap::new();
for i in 1..=command.partitions_count {
partitions.insert(
i,
PartitionState {
id: i,
created_at: entry.timestamp,
},
);
}
partitions
} else {
HashMap::new()
},
};
stream.topics.insert(topic.id, topic);
}
EntryCommand::UpdateTopic(command) => {
let stream_id = find_stream_id(&streams, &command.stream_id);
let stream = streams
.get_mut(&stream_id)
.unwrap_or_else(|| panic!("{}", format!("Stream: {stream_id} not found")));
let topic_id = find_topic_id(&stream.topics, &command.topic_id);
let topic = stream
.topics
.get_mut(&topic_id)
.unwrap_or_else(|| panic!("{}", format!("Topic: {topic_id} not found")));
topic.name = command.name;
topic.compression_algorithm = command.compression_algorithm;
topic.message_expiry = command.message_expiry;
topic.max_topic_size = command.max_topic_size;
topic.replication_factor = command.replication_factor;
}
EntryCommand::DeleteTopic(command) => {
let stream_id = find_stream_id(&streams, &command.stream_id);
let stream = streams
.get_mut(&stream_id)
.unwrap_or_else(|| panic!("{}", format!("Stream: {stream_id} not found")));
let topic_id = find_topic_id(&stream.topics, &command.topic_id);
stream.topics.remove(&topic_id);
}
EntryCommand::PurgeTopic(command) => {
let stream_id = find_stream_id(&streams, &command.stream_id);
let stream = streams
.get(&stream_id)
.unwrap_or_else(|| panic!("{}", format!("Stream: {stream_id} not found")));
let topic_id = find_topic_id(&stream.topics, &command.topic_id);
stream
.topics
.get(&topic_id)
.unwrap_or_else(|| panic!("{}", format!("Topic: {topic_id} not found")));
// It only affects the segments which are not part of the state
}
EntryCommand::CreatePartitions(command) => {
let stream_id = find_stream_id(&streams, &command.stream_id);
let stream = streams
.get_mut(&stream_id)
.unwrap_or_else(|| panic!("{}", format!("Stream: {stream_id} not found")));
let topic_id = find_topic_id(&stream.topics, &command.topic_id);
let topic = stream
.topics
.get_mut(&topic_id)
.unwrap_or_else(|| panic!("{}", format!("Topic: {topic_id} not found")));
let last_partition_id = if topic.partitions.is_empty() {
0
} else {
topic
.partitions
.values()
.map(|p| p.id)
.max()
.unwrap_or_else(|| panic!("No partition found"))
};
for i in 1..=command.partitions_count {
topic.partitions.insert(
last_partition_id + i,
PartitionState {
id: last_partition_id + i,
created_at: entry.timestamp,
},
);
}
}
EntryCommand::DeletePartitions(command) => {
let stream_id = find_stream_id(&streams, &command.stream_id);
let stream = streams
.get_mut(&stream_id)
.unwrap_or_else(|| panic!("{}", format!("Stream: {stream_id} not found")));
let topic_id = find_topic_id(&stream.topics, &command.topic_id);
let topic = stream
.topics
.get_mut(&topic_id)
.unwrap_or_else(|| panic!("{}", format!("Topic: {topic_id} not found")));
if topic.partitions.is_empty() {
continue;
}
let last_partition_id = topic
.partitions
.values()
.map(|p| p.id)
.max()
.unwrap_or_else(|| panic!("No partition found"));
for i in 0..command.partitions_count {
topic.partitions.remove(&(last_partition_id - i));
}
}
EntryCommand::CreateConsumerGroup(command) => {
let stream_id = find_stream_id(&streams, &command.stream_id);
let stream = streams
.get_mut(&stream_id)
.unwrap_or_else(|| panic!("{}", format!("Stream: {stream_id} not found")));
let topic_id = find_topic_id(&stream.topics, &command.topic_id);
let topic = stream
.topics
.get_mut(&topic_id)
.unwrap_or_else(|| panic!("{}", format!("Topic: {topic_id} not found")));
let consumer_group_id = command.group_id.unwrap_or_else(|| {
topic.current_consumer_group_id += 1;
topic.current_consumer_group_id
});
let consumer_group = ConsumerGroupState {
id: consumer_group_id,
name: command.name,
};
topic
.consumer_groups
.insert(consumer_group.id, consumer_group);
}
EntryCommand::DeleteConsumerGroup(command) => {
let stream_id = find_stream_id(&streams, &command.stream_id);
let stream = streams
.get_mut(&stream_id)
.unwrap_or_else(|| panic!("{}", format!("Stream: {stream_id} not found")));
let topic_id = find_topic_id(&stream.topics, &command.topic_id);
let topic = stream
.topics
.get_mut(&topic_id)
.unwrap_or_else(|| panic!("{}", format!("Topic: {topic_id} not found")));
let consumer_group_id =
find_consumer_group_id(&topic.consumer_groups, &command.group_id);
topic.consumer_groups.remove(&consumer_group_id);
}
EntryCommand::CreateUser(command) => {
current_user_id += 1;
let user = UserState {
id: current_user_id,
username: command.username,
password_hash: command.password, // This is already hashed
status: command.status,
permissions: command.permissions,
personal_access_tokens: HashMap::new(),
};
users.insert(user.id, user);
}
EntryCommand::UpdateUser(command) => {
let user_id = find_user_id(&users, &command.user_id);
let user = users
.get_mut(&user_id)
.unwrap_or_else(|| panic!("{}", format!("User: {user_id} not found")));
if let Some(username) = &command.username {
user.username.clone_from(username);
}
if let Some(status) = &command.status {
user.status = *status;
}
}
EntryCommand::DeleteUser(command) => {
let user_id = find_user_id(&users, &command.user_id);
users.remove(&user_id);
}
EntryCommand::ChangePassword(command) => {
let user_id = find_user_id(&users, &command.user_id);
let user = users
.get_mut(&user_id)
.unwrap_or_else(|| panic!("{}", format!("User: {user_id} not found")));
user.password_hash = command.new_password // This is already hashed
}
EntryCommand::UpdatePermissions(command) => {
let user_id = find_user_id(&users, &command.user_id);
let user = users
.get_mut(&user_id)
.unwrap_or_else(|| panic!("{}", format!("User: {user_id} not found")));
user.permissions = command.permissions;
}
EntryCommand::CreatePersonalAccessToken(command) => {
let token_hash = command.hash;
let user_id = find_user_id(
&users,
&entry.user_id.try_into().with_error_context(|_| {
format!(
"{COMPONENT} - failed to find user, user ID: {}",
entry.user_id
)
})?,
);
let user = users
.get_mut(&user_id)
.unwrap_or_else(|| panic!("{}", format!("User: {user_id} not found")));
let expiry_at = PersonalAccessToken::calculate_expiry_at(
entry.timestamp,
command.command.expiry,
);
if let Some(expiry_at) = expiry_at {
if expiry_at.as_micros() <= IggyTimestamp::now().as_micros() {
debug!("Personal access token: {token_hash} has already expired.");
continue;
}
}
user.personal_access_tokens.insert(
command.command.name.clone(),
PersonalAccessTokenState {
name: command.command.name,
token_hash,
expiry_at,
},
);
}
EntryCommand::DeletePersonalAccessToken(command) => {
let user_id = find_user_id(
&users,
&entry.user_id.try_into().with_error_context(|_| {
format!(
"{COMPONENT} - failed to find user, user ID: {}",
entry.user_id
)
})?,
);
let user = users
.get_mut(&user_id)
.unwrap_or_else(|| panic!("{}", format!("User: {user_id} not found")));
user.personal_access_tokens.remove(&command.name);
}
}
}
let state = SystemState { streams, users };
debug!("+++ State +++");
debug!("{state}");
debug!("+++ State +++");
Ok(state)
}
}
fn find_stream_id(streams: &HashMap<u32, StreamState>, stream_id: &Identifier) -> u32 {
match stream_id.kind {
IdKind::Numeric => stream_id
.get_u32_value()
.unwrap_or_else(|_| panic!("{}", format!("Invalid stream ID: {stream_id}"))),
IdKind::String => {
let name = stream_id
.get_cow_str_value()
.unwrap_or_else(|_| panic!("{}", format!("Invalid stream name: {stream_id}")));
let stream = streams
.values()
.find(|s| s.name == name)
.unwrap_or_else(|| panic!("{}", format!("Stream: {name} not found")));
stream.id
}
}
}
fn find_topic_id(topics: &HashMap<u32, TopicState>, topic_id: &Identifier) -> u32 {
match topic_id.kind {
IdKind::Numeric => topic_id
.get_u32_value()
.unwrap_or_else(|_| panic!("{}", format!("Invalid topic ID: {topic_id}"))),
IdKind::String => {
let name = topic_id
.get_cow_str_value()
.unwrap_or_else(|_| panic!("{}", format!("Invalid topic name: {topic_id}")));
let topic = topics
.values()
.find(|s| s.name == name)
.unwrap_or_else(|| panic!("{}", format!("Topic: {name} not found")));
topic.id
}
}
}
fn find_consumer_group_id(groups: &HashMap<u32, ConsumerGroupState>, group_id: &Identifier) -> u32 {
match group_id.kind {
IdKind::Numeric => group_id
.get_u32_value()
.unwrap_or_else(|_| panic!("{}", format!("Invalid group ID: {group_id}"))),
IdKind::String => {
let name = group_id
.get_cow_str_value()
.unwrap_or_else(|_| panic!("{}", format!("Invalid group name: {group_id}")));
let group = groups
.values()
.find(|s| s.name == name)
.unwrap_or_else(|| panic!("{}", format!("Consumer group: {name} not found")));
group.id
}
}
}
fn find_user_id(users: &HashMap<u32, UserState>, user_id: &Identifier) -> u32 {
match user_id.kind {
IdKind::Numeric => user_id
.get_u32_value()
.unwrap_or_else(|_| panic!("{}", format!("Invalid user ID: {user_id}"))),
IdKind::String => {
let username = user_id
.get_cow_str_value()
.unwrap_or_else(|_| panic!("{}", format!("Invalid username: {user_id}")));
let user = users
.values()
.find(|s| s.username == username)
.unwrap_or_else(|| panic!("{}", format!("User: {username} not found")));
user.id
}
}
}
impl Display for SystemState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Streams:")?;
for stream in self.streams.iter() {
write!(f, "\n================\n")?;
write!(f, "{}", stream.1)?;
}
write!(f, "Users:")?;
for user in self.users.iter() {
write!(f, "\n================\n")?;
write!(f, "{}", user.1)?;
}
Ok(())
}
}
impl Display for ConsumerGroupState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ConsumerGroup -> ID: {}, Name: {}", self.id, self.name)
}
}
impl Display for UserState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let permissions = if let Some(permissions) = &self.permissions {
permissions.to_string()
} else {
"no_permissions".to_string()
};
write!(
f,
"User -> ID: {}, Username: {}, Status: {}, Permissions: {}",
self.id, self.username, self.status, permissions
)
}
}
impl Display for StreamState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Stream -> ID: {}, Name: {}", self.id, self.name,)?;
for topic in self.topics.iter() {
write!(f, "\n {}", topic.1)?;
}
Ok(())
}
}
impl Display for TopicState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Topic -> ID: {}, Name: {}", self.id, self.name,)?;
for partition in self.partitions.iter() {
write!(f, "\n {}", partition.1)?;
}
write!(f, "\nConsumer Groups:")?;
for consumer_group in self.consumer_groups.iter() {
write!(f, "\n {}", consumer_group.1)?;
}
Ok(())
}
}
impl Display for PartitionState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Partition -> ID: {}, Created At: {}",
self.id, self.created_at
)
}
}