blob: 35a5601640d1c4738ecb62c6d37868708a1550ac [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use crate::bootstrap::create_root_user;
use crate::state::file::FileState;
use crate::state::models::CreateUserWithId;
use crate::state::{COMPONENT, EntryCommand, StateEntry};
use ahash::AHashMap;
use err_trail::ErrContext;
use iggy_common::CompressionAlgorithm;
use iggy_common::IggyError;
use iggy_common::IggyExpiry;
use iggy_common::IggyTimestamp;
use iggy_common::MaxTopicSize;
use iggy_common::PersonalAccessToken;
use iggy_common::create_user::CreateUser;
use iggy_common::defaults::DEFAULT_ROOT_USER_ID;
use iggy_common::{IdKind, Identifier, Permissions, UserStatus};
use std::collections::BTreeMap;
use std::fmt::Display;
use tracing::{debug, error, info};
#[derive(Debug, Clone)]
pub struct SystemState {
pub streams: BTreeMap<u32, StreamState>,
pub users: AHashMap<u32, UserState>,
}
impl SystemState {
pub fn decompose(self) -> (BTreeMap<u32, StreamState>, AHashMap<u32, UserState>) {
(self.streams, self.users)
}
}
#[derive(Debug, Clone)]
pub struct StreamState {
pub id: u32,
pub name: String,
pub created_at: IggyTimestamp,
pub topics: BTreeMap<u32, TopicState>,
}
#[derive(Debug, Clone)]
pub struct TopicState {
pub id: u32,
pub name: String,
pub partitions: BTreeMap<u32, PartitionState>,
pub consumer_groups: BTreeMap<u32, ConsumerGroupState>,
pub compression_algorithm: CompressionAlgorithm,
pub message_expiry: IggyExpiry,
pub max_topic_size: MaxTopicSize,
pub replication_factor: Option<u8>,
pub created_at: IggyTimestamp,
}
#[derive(Debug, Clone)]
pub struct PartitionState {
pub id: u32,
pub created_at: IggyTimestamp,
}
#[derive(Debug, Clone)]
pub struct PersonalAccessTokenState {
pub name: String,
pub token_hash: String,
pub expiry_at: Option<IggyTimestamp>,
}
#[derive(Debug, Clone)]
pub struct UserState {
pub id: u32,
pub username: String,
pub password_hash: String,
pub status: UserStatus,
pub created_at: IggyTimestamp,
pub permissions: Option<Permissions>,
pub personal_access_tokens: AHashMap<String, PersonalAccessTokenState>,
}
#[derive(Debug, Clone)]
pub struct ConsumerGroupState {
pub id: u32,
pub name: String,
}
impl SystemState {
pub async fn load(state: FileState) -> Result<Self, IggyError> {
let mut state_entries = state.init().await.error(|e: &IggyError| {
format!("{COMPONENT} (error: {e}) - failed to initialize state entries")
})?;
// Create root user if does not exist.
let root_exists = state_entries
.iter()
.any(|entry| {
entry
.command()
.map(|command| matches!(command, EntryCommand::CreateUser(payload) if payload.user_id == DEFAULT_ROOT_USER_ID))
.unwrap_or_else(|err| {
error!("Failed to check if root user exists: {err}");
false
})
});
if !root_exists {
info!("No users found, creating the root user...");
let root = create_root_user();
let command = CreateUser {
username: root.username.clone(),
password: root.password.clone(),
status: root.status,
permissions: root.permissions.clone(),
};
state
.apply(0, &EntryCommand::CreateUser(CreateUserWithId {
user_id: root.id,
command
}))
.await
.error(|e: &IggyError| {
format!(
"{COMPONENT} (error: {e}) - failed to apply create user command, username: {}",
root.username
)
})?;
state_entries = state.init().await.error(|e: &IggyError| {
format!("{COMPONENT} (error: {e}) - failed to initialize state entries")
})?;
}
let system_state = Self::init(state_entries).await.error(|e: &IggyError| {
format!("{COMPONENT} (error: {e}) - failed to initialize system state")
})?;
Ok(system_state)
}
pub async fn init(entries: Vec<StateEntry>) -> Result<Self, IggyError> {
let mut streams = BTreeMap::new();
let mut users = AHashMap::new();
for entry in entries {
debug!("Processing state entry: {entry}",);
match entry.command().error(|e: &IggyError| {
format!(
"{COMPONENT} (error: {e}) - failed to retrieve state entry command: {entry}"
)
})? {
EntryCommand::CreateStream(command) => {
info!("Creating stream: {command:?}");
let stream_id = command.stream_id;
let command = command.command;
let stream = StreamState {
id: stream_id,
name: command.name.clone(),
topics: BTreeMap::new(),
created_at: entry.timestamp,
};
streams.insert(stream.id, stream);
}
EntryCommand::UpdateStream(command) => {
let stream_id = find_stream_id(&streams, &command.stream_id);
let stream = streams
.get_mut(&stream_id)
.unwrap_or_else(|| panic!("{}", format!("Stream: {stream_id} not found")));
stream.name = command.name;
}
EntryCommand::DeleteStream(command) => {
let stream_id = find_stream_id(&streams, &command.stream_id);
streams.remove(&stream_id);
}
EntryCommand::PurgeStream(command) => {
let stream_id = find_stream_id(&streams, &command.stream_id);
streams
.get(&stream_id)
.unwrap_or_else(|| panic!("{}", format!("Stream: {stream_id} not found")));
// It only affects the segments which are not part of the state
}
EntryCommand::CreateTopic(command) => {
let stream_id = find_stream_id(&streams, &command.command.stream_id);
let stream = streams
.get_mut(&stream_id)
.unwrap_or_else(|| panic!("{}", format!("Stream: {stream_id} not found")));
let topic_id = command.topic_id;
let command = command.command;
let topic = TopicState {
id: topic_id,
name: command.name,
consumer_groups: BTreeMap::new(),
compression_algorithm: command.compression_algorithm,
message_expiry: command.message_expiry,
max_topic_size: command.max_topic_size,
replication_factor: command.replication_factor,
created_at: entry.timestamp,
partitions: if command.partitions_count > 0 {
let mut partitions = BTreeMap::new();
for i in 0..command.partitions_count {
partitions.insert(
i,
PartitionState {
id: i,
created_at: entry.timestamp,
},
);
}
partitions
} else {
BTreeMap::new()
},
};
stream.topics.insert(topic.id, topic);
}
EntryCommand::UpdateTopic(command) => {
let stream_id = find_stream_id(&streams, &command.stream_id);
let stream = streams
.get_mut(&stream_id)
.unwrap_or_else(|| panic!("{}", format!("Stream: {stream_id} not found")));
let topic_id = find_topic_id(&stream.topics, &command.topic_id);
let topic = stream
.topics
.get_mut(&topic_id)
.unwrap_or_else(|| panic!("{}", format!("Topic: {topic_id} not found")));
topic.name = command.name;
topic.compression_algorithm = command.compression_algorithm;
topic.message_expiry = command.message_expiry;
topic.max_topic_size = command.max_topic_size;
topic.replication_factor = command.replication_factor;
}
EntryCommand::DeleteTopic(command) => {
let stream_id = find_stream_id(&streams, &command.stream_id);
let stream = streams
.get_mut(&stream_id)
.unwrap_or_else(|| panic!("{}", format!("Stream: {stream_id} not found")));
let topic_id = find_topic_id(&stream.topics, &command.topic_id);
stream.topics.remove(&topic_id);
}
EntryCommand::PurgeTopic(command) => {
let stream_id = find_stream_id(&streams, &command.stream_id);
let stream = streams
.get(&stream_id)
.unwrap_or_else(|| panic!("{}", format!("Stream: {stream_id} not found")));
let topic_id = find_topic_id(&stream.topics, &command.topic_id);
stream
.topics
.get(&topic_id)
.unwrap_or_else(|| panic!("{}", format!("Topic: {topic_id} not found")));
// It only affects the segments which are not part of the state
}
EntryCommand::CreatePartitions(command) => {
let stream_id = find_stream_id(&streams, &command.stream_id);
let stream = streams
.get_mut(&stream_id)
.unwrap_or_else(|| panic!("{}", format!("Stream: {stream_id} not found")));
let topic_id = find_topic_id(&stream.topics, &command.topic_id);
let topic = stream
.topics
.get_mut(&topic_id)
.unwrap_or_else(|| panic!("{}", format!("Topic: {topic_id} not found")));
let last_partition_id = if topic.partitions.is_empty() {
0
} else {
topic
.partitions
.values()
.map(|p| p.id)
.max()
.unwrap_or_else(|| panic!("No partition found"))
};
for i in 1..=command.partitions_count {
topic.partitions.insert(
last_partition_id + i,
PartitionState {
id: last_partition_id + i,
created_at: entry.timestamp,
},
);
}
}
EntryCommand::DeletePartitions(command) => {
let stream_id = find_stream_id(&streams, &command.stream_id);
let stream = streams
.get_mut(&stream_id)
.unwrap_or_else(|| panic!("{}", format!("Stream: {stream_id} not found")));
let topic_id = find_topic_id(&stream.topics, &command.topic_id);
let topic = stream
.topics
.get_mut(&topic_id)
.unwrap_or_else(|| panic!("{}", format!("Topic: {topic_id} not found")));
if topic.partitions.is_empty() {
continue;
}
let last_partition_id = topic
.partitions
.values()
.map(|p| p.id)
.max()
.unwrap_or_else(|| panic!("No partition found"));
for i in 0..command.partitions_count {
topic.partitions.remove(&(last_partition_id - i));
}
}
EntryCommand::DeleteSegments(command) => {
let stream_id = find_stream_id(&streams, &command.stream_id);
let stream = streams
.get_mut(&stream_id)
.unwrap_or_else(|| panic!("{}", format!("Stream: {stream_id} not found")));
let topic_id = find_topic_id(&stream.topics, &command.topic_id);
let topic = stream
.topics
.get_mut(&topic_id)
.unwrap_or_else(|| panic!("{}", format!("Topic: {topic_id} not found")));
if topic.partitions.is_empty() {
continue;
}
let partition_id = command.partition_id;
let _partition =
topic
.partitions
.get(&command.partition_id)
.unwrap_or_else(|| {
panic!("{}", format!("Partition {partition_id} not found."))
});
// State is not affected by the delete segments
}
EntryCommand::CreateConsumerGroup(command) => {
let consumer_group_id = command.group_id;
let command = command.command;
let stream_id = find_stream_id(&streams, &command.stream_id);
let stream = streams
.get_mut(&stream_id)
.unwrap_or_else(|| panic!("{}", format!("Stream: {stream_id} not found")));
let topic_id = find_topic_id(&stream.topics, &command.topic_id);
let topic = stream
.topics
.get_mut(&topic_id)
.unwrap_or_else(|| panic!("{}", format!("Topic: {topic_id} not found")));
let consumer_group = ConsumerGroupState {
id: consumer_group_id,
name: command.name,
};
topic
.consumer_groups
.insert(consumer_group.id, consumer_group);
}
EntryCommand::DeleteConsumerGroup(command) => {
let stream_id = find_stream_id(&streams, &command.stream_id);
let stream = streams
.get_mut(&stream_id)
.unwrap_or_else(|| panic!("{}", format!("Stream: {stream_id} not found")));
let topic_id = find_topic_id(&stream.topics, &command.topic_id);
let topic = stream
.topics
.get_mut(&topic_id)
.unwrap_or_else(|| panic!("{}", format!("Topic: {topic_id} not found")));
let consumer_group_id =
find_consumer_group_id(&topic.consumer_groups, &command.group_id);
topic.consumer_groups.remove(&consumer_group_id);
}
EntryCommand::CreateUser(command) => {
let user_id = command.user_id;
let command = command.command;
let user = UserState {
id: user_id,
username: command.username,
password_hash: command.password, // This is already hashed
status: command.status,
created_at: entry.timestamp,
permissions: command.permissions,
personal_access_tokens: AHashMap::new(),
};
users.insert(user.id, user);
}
EntryCommand::UpdateUser(command) => {
let user_id = find_user_id(&users, &command.user_id);
let user = users
.get_mut(&user_id)
.unwrap_or_else(|| panic!("{}", format!("User: {user_id} not found")));
if let Some(username) = &command.username {
user.username.clone_from(username);
}
if let Some(status) = &command.status {
user.status = *status;
}
}
EntryCommand::DeleteUser(command) => {
let user_id = find_user_id(&users, &command.user_id);
users.remove(&user_id);
}
EntryCommand::ChangePassword(command) => {
let user_id = find_user_id(&users, &command.user_id);
let user = users
.get_mut(&user_id)
.unwrap_or_else(|| panic!("{}", format!("User: {user_id} not found")));
user.password_hash = command.new_password // This is already hashed
}
EntryCommand::UpdatePermissions(command) => {
let user_id = find_user_id(&users, &command.user_id);
let user = users
.get_mut(&user_id)
.unwrap_or_else(|| panic!("{}", format!("User: {user_id} not found")));
user.permissions = command.permissions;
}
EntryCommand::CreatePersonalAccessToken(command) => {
let token_hash = command.hash;
let user_id = find_user_id(
&users,
&entry.user_id.try_into().error(|e: &IggyError| {
format!(
"{COMPONENT} (error: {e}) - failed to find user, user ID: {}",
entry.user_id
)
})?,
);
let user = users
.get_mut(&user_id)
.unwrap_or_else(|| panic!("{}", format!("User: {user_id} not found")));
let expiry_at = PersonalAccessToken::calculate_expiry_at(
entry.timestamp,
command.command.expiry,
);
if let Some(expiry_at) = expiry_at
&& expiry_at.as_micros() <= IggyTimestamp::now().as_micros()
{
debug!("Personal access token: {token_hash} has already expired.");
continue;
}
user.personal_access_tokens.insert(
command.command.name.clone(),
PersonalAccessTokenState {
name: command.command.name,
token_hash,
expiry_at,
},
);
}
EntryCommand::DeletePersonalAccessToken(command) => {
let user_id = find_user_id(
&users,
&entry.user_id.try_into().error(|e: &IggyError| {
format!(
"{COMPONENT} (error: {e}) - failed to find user, user ID: {}",
entry.user_id
)
})?,
);
let user = users
.get_mut(&user_id)
.unwrap_or_else(|| panic!("{}", format!("User: {user_id} not found")));
user.personal_access_tokens.remove(&command.name);
}
}
}
let state = SystemState { streams, users };
debug!("+++ State +++");
debug!("{state}");
debug!("+++ State +++");
Ok(state)
}
}
fn find_stream_id(streams: &BTreeMap<u32, StreamState>, stream_id: &Identifier) -> u32 {
match stream_id.kind {
IdKind::Numeric => stream_id
.get_u32_value()
.unwrap_or_else(|_| panic!("{}", format!("Invalid stream ID: {stream_id}"))),
IdKind::String => {
let name = stream_id
.get_cow_str_value()
.unwrap_or_else(|_| panic!("{}", format!("Invalid stream name: {stream_id}")));
let stream = streams
.values()
.find(|s| s.name == name)
.unwrap_or_else(|| panic!("{}", format!("Stream: {name} not found")));
stream.id
}
}
}
fn find_topic_id(topics: &BTreeMap<u32, TopicState>, topic_id: &Identifier) -> u32 {
match topic_id.kind {
IdKind::Numeric => topic_id
.get_u32_value()
.unwrap_or_else(|_| panic!("{}", format!("Invalid topic ID: {topic_id}"))),
IdKind::String => {
let name = topic_id
.get_cow_str_value()
.unwrap_or_else(|_| panic!("{}", format!("Invalid topic name: {topic_id}")));
let topic = topics
.values()
.find(|s| s.name == name)
.unwrap_or_else(|| panic!("{}", format!("Topic: {name} not found")));
topic.id
}
}
}
fn find_consumer_group_id(
groups: &BTreeMap<u32, ConsumerGroupState>,
group_id: &Identifier,
) -> u32 {
match group_id.kind {
IdKind::Numeric => group_id
.get_u32_value()
.unwrap_or_else(|_| panic!("{}", format!("Invalid group ID: {group_id}"))),
IdKind::String => {
let name = group_id
.get_cow_str_value()
.unwrap_or_else(|_| panic!("{}", format!("Invalid group name: {group_id}")));
let group = groups
.values()
.find(|s| s.name == name)
.unwrap_or_else(|| panic!("{}", format!("Consumer group: {name} not found")));
group.id
}
}
}
fn find_user_id(users: &AHashMap<u32, UserState>, user_id: &Identifier) -> u32 {
match user_id.kind {
IdKind::Numeric => user_id
.get_u32_value()
.unwrap_or_else(|_| panic!("{}", format!("Invalid user ID: {user_id}"))),
IdKind::String => {
let username = user_id
.get_cow_str_value()
.unwrap_or_else(|_| panic!("{}", format!("Invalid username: {user_id}")));
let user = users
.values()
.find(|s| s.username == username)
.unwrap_or_else(|| panic!("{}", format!("User: {username} not found")));
user.id
}
}
}
impl Display for SystemState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Streams:")?;
for stream in self.streams.iter() {
write!(f, "\n================\n")?;
write!(f, "{}", stream.1)?;
}
write!(f, "Users:")?;
for user in self.users.iter() {
write!(f, "\n================\n")?;
write!(f, "{}", user.1)?;
}
Ok(())
}
}
impl Display for ConsumerGroupState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ConsumerGroup -> ID: {}, Name: {}", self.id, self.name)
}
}
impl Display for UserState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let permissions = if let Some(permissions) = &self.permissions {
permissions.to_string()
} else {
"no_permissions".to_string()
};
write!(
f,
"User -> ID: {}, Username: {}, Status: {}, Permissions: {}",
self.id, self.username, self.status, permissions
)
}
}
impl Display for StreamState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Stream -> ID: {}, Name: {}", self.id, self.name,)?;
for topic in self.topics.iter() {
write!(f, "\n {}", topic.1)?;
}
Ok(())
}
}
impl Display for TopicState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Topic -> ID: {}, Name: {}", self.id, self.name,)?;
for partition in self.partitions.iter() {
write!(f, "\n {}", partition.1)?;
}
write!(f, "\nConsumer Groups:")?;
for consumer_group in self.consumer_groups.iter() {
write!(f, "\n {}", consumer_group.1)?;
}
Ok(())
}
}
impl Display for PartitionState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Partition -> ID: {}, Created At: {}",
self.id, self.created_at
)
}
}