blob: f6619b89277f1f4d545ec5435353a9266210c128 [file] [log] [blame]
use crate::state::command::EntryCommand;
use crate::state::system::UserState;
use crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
use crate::streaming::session::Session;
use crate::streaming::systems::system::System;
use crate::streaming::users::user::User;
use crate::streaming::utils::crypto;
use crate::{IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV};
use iggy::error::IggyError;
use iggy::identifier::{IdKind, Identifier};
use iggy::locking::IggySharedMutFn;
use iggy::models::permissions::Permissions;
use iggy::models::user_status::UserStatus;
use iggy::users::create_user::CreateUser;
use iggy::users::defaults::*;
use iggy::utils::text;
use std::env;
use std::sync::atomic::{AtomicU32, Ordering};
use tracing::{error, info, warn};
static USER_ID: AtomicU32 = AtomicU32::new(1);
const MAX_USERS: usize = u32::MAX as usize;
impl System {
pub(crate) async fn load_users(&mut self, users: Vec<UserState>) -> Result<(), IggyError> {
info!("Loading users...");
if users.is_empty() {
info!("No users found, creating the root user...");
let root = Self::create_root_user();
let command = CreateUser {
username: root.username.clone(),
password: root.password.clone(),
status: root.status,
permissions: root.permissions.clone(),
};
self.state
.apply(0, EntryCommand::CreateUser(command))
.await?;
self.users.insert(root.id, root);
info!("Created the root user.");
}
for user_state in users.into_iter() {
let mut user = User::with_password(
user_state.id,
&user_state.username,
user_state.password_hash,
user_state.status,
user_state.permissions,
);
user.personal_access_tokens = user_state
.personal_access_tokens
.into_values()
.map(|token| {
(
token.token_hash.clone(),
PersonalAccessToken::raw(
user_state.id,
&token.name,
&token.token_hash,
token.expiry_at,
),
)
})
.collect();
self.users.insert(user_state.id, user);
}
let users_count = self.users.len();
let current_user_id = self.users.keys().max().unwrap_or(&1);
USER_ID.store(current_user_id + 1, Ordering::SeqCst);
self.permissioner
.init(&self.users.values().collect::<Vec<&User>>());
self.metrics.increment_users(users_count as u32);
info!("Initialized {} user(s).", users_count);
Ok(())
}
fn create_root_user() -> User {
let username = env::var(IGGY_ROOT_USERNAME_ENV);
let password = env::var(IGGY_ROOT_PASSWORD_ENV);
if (username.is_ok() && password.is_err()) || (username.is_err() && password.is_ok()) {
panic!("When providing the custom root user credentials, both username and password must be set.");
}
if username.is_ok() && password.is_ok() {
info!("Using the custom root user credentials.");
} else {
info!("Using the default root user credentials.");
}
let username = username.unwrap_or(DEFAULT_ROOT_USERNAME.to_string());
let password = password.unwrap_or(DEFAULT_ROOT_PASSWORD.to_string());
if username.is_empty() || password.is_empty() {
panic!("Root user credentials are not set.");
}
if username.len() < MIN_USERNAME_LENGTH {
panic!("Root username is too short.");
}
if username.len() > MAX_USERNAME_LENGTH {
panic!("Root username is too long.");
}
if password.len() < MIN_PASSWORD_LENGTH {
panic!("Root password is too short.");
}
if password.len() > MAX_PASSWORD_LENGTH {
panic!("Root password is too long.");
}
User::root(&username, &password)
}
pub fn find_user(&self, session: &Session, user_id: &Identifier) -> Result<&User, IggyError> {
self.ensure_authenticated(session)?;
let user = self.get_user(user_id);
if let Ok(user) = user {
let session_user_id = session.get_user_id();
if user.id != session_user_id {
self.permissioner.get_user(session_user_id)?;
}
return Ok(user);
}
user
}
pub fn get_user(&self, user_id: &Identifier) -> Result<&User, IggyError> {
match user_id.kind {
IdKind::Numeric => self
.users
.get(&user_id.get_u32_value()?)
.ok_or(IggyError::ResourceNotFound(user_id.to_string())),
IdKind::String => {
let username = user_id.get_cow_str_value()?;
self.users
.iter()
.find(|(_, user)| user.username == username)
.map(|(_, user)| user)
.ok_or(IggyError::ResourceNotFound(user_id.to_string()))
}
}
}
pub fn get_user_mut(&mut self, user_id: &Identifier) -> Result<&mut User, IggyError> {
match user_id.kind {
IdKind::Numeric => self
.users
.get_mut(&user_id.get_u32_value()?)
.ok_or(IggyError::ResourceNotFound(user_id.to_string())),
IdKind::String => {
let username = user_id.get_cow_str_value()?;
self.users
.iter_mut()
.find(|(_, user)| user.username == username)
.map(|(_, user)| user)
.ok_or(IggyError::ResourceNotFound(user_id.to_string()))
}
}
}
pub async fn get_users(&self, session: &Session) -> Result<Vec<&User>, IggyError> {
self.ensure_authenticated(session)?;
self.permissioner.get_users(session.get_user_id())?;
Ok(self.users.values().collect())
}
pub async fn create_user(
&mut self,
session: &Session,
username: &str,
password: &str,
status: UserStatus,
permissions: Option<Permissions>,
) -> Result<&User, IggyError> {
self.ensure_authenticated(session)?;
self.permissioner.create_user(session.get_user_id())?;
let username = text::to_lowercase_non_whitespace(username);
if self.users.iter().any(|(_, user)| user.username == username) {
error!("User: {username} already exists.");
return Err(IggyError::UserAlreadyExists);
}
if self.users.len() >= MAX_USERS {
error!("Available users limit reached.");
return Err(IggyError::UsersLimitReached);
}
let user_id = USER_ID.fetch_add(1, Ordering::SeqCst);
info!("Creating user: {username} with ID: {user_id}...");
let user = User::new(user_id, &username, password, status, permissions.clone());
self.permissioner
.init_permissions_for_user(user_id, permissions);
self.users.insert(user.id, user);
info!("Created user: {username} with ID: {user_id}.");
self.metrics.increment_users(1);
self.get_user(&user_id.try_into()?)
}
pub async fn delete_user(
&mut self,
session: &Session,
user_id: &Identifier,
) -> Result<User, IggyError> {
self.ensure_authenticated(session)?;
let existing_user_id;
let existing_username;
{
self.permissioner.delete_user(session.get_user_id())?;
let user = self.get_user(user_id)?;
if user.is_root() {
error!("Cannot delete the root user.");
return Err(IggyError::CannotDeleteUser(user.id));
}
existing_user_id = user.id;
existing_username = user.username.clone();
}
info!("Deleting user: {existing_username} with ID: {user_id}...");
let user = self
.users
.remove(&existing_user_id)
.ok_or(IggyError::ResourceNotFound(user_id.to_string()))?;
self.permissioner
.delete_permissions_for_user(existing_user_id);
let mut client_manager = self.client_manager.write().await;
client_manager
.delete_clients_for_user(existing_user_id)
.await?;
info!("Deleted user: {existing_username} with ID: {user_id}.");
self.metrics.decrement_users(1);
Ok(user)
}
pub async fn update_user(
&mut self,
session: &Session,
user_id: &Identifier,
username: Option<String>,
status: Option<UserStatus>,
) -> Result<&User, IggyError> {
self.ensure_authenticated(session)?;
self.permissioner.update_user(session.get_user_id())?;
if let Some(username) = username.clone() {
let username = text::to_lowercase_non_whitespace(&username);
let user = self.get_user(user_id)?;
let existing_user = self.get_user(&username.clone().try_into()?);
if existing_user.is_ok() && existing_user.unwrap().id != user.id {
error!("User: {username} already exists.");
return Err(IggyError::UserAlreadyExists);
}
}
let user = self.get_user_mut(user_id)?;
if let Some(username) = username {
user.username = username;
}
if let Some(status) = status {
user.status = status;
}
info!("Updated user: {} with ID: {}.", user.username, user.id);
Ok(user)
}
pub async fn update_permissions(
&mut self,
session: &Session,
user_id: &Identifier,
permissions: Option<Permissions>,
) -> Result<(), IggyError> {
self.ensure_authenticated(session)?;
{
self.permissioner
.update_permissions(session.get_user_id())?;
let user = self.get_user(user_id)?;
if user.is_root() {
error!("Cannot change the root user permissions.");
return Err(IggyError::CannotChangePermissions(user.id));
}
self.permissioner
.update_permissions_for_user(user.id, permissions.clone());
}
{
let user = self.get_user_mut(user_id)?;
user.permissions = permissions;
info!(
"Updated permissions for user: {} with ID: {user_id}.",
user.username
);
}
Ok(())
}
pub async fn change_password(
&mut self,
session: &Session,
user_id: &Identifier,
current_password: &str,
new_password: &str,
) -> Result<(), IggyError> {
self.ensure_authenticated(session)?;
{
let user = self.get_user(user_id)?;
let session_user_id = session.get_user_id();
if user.id != session_user_id {
self.permissioner.change_password(session_user_id)?;
}
}
let user = self.get_user_mut(user_id)?;
if !crypto::verify_password(current_password, &user.password) {
error!(
"Invalid current password for user: {} with ID: {user_id}.",
user.username
);
return Err(IggyError::InvalidCredentials);
}
user.password = crypto::hash_password(new_password);
info!(
"Changed password for user: {} with ID: {user_id}.",
user.username
);
Ok(())
}
pub async fn login_user(
&self,
username: &str,
password: &str,
session: Option<&Session>,
) -> Result<&User, IggyError> {
self.login_user_with_credentials(username, Some(password), session)
.await
}
pub async fn login_user_with_credentials(
&self,
username: &str,
password: Option<&str>,
session: Option<&Session>,
) -> Result<&User, IggyError> {
let user = match self.get_user(&username.try_into()?) {
Ok(user) => user,
Err(_) => {
error!("Cannot login user: {username} (not found).");
return Err(IggyError::InvalidCredentials);
}
};
info!("Logging in user: {username} with ID: {}...", user.id);
if !user.is_active() {
warn!("User: {username} with ID: {} is inactive.", user.id);
return Err(IggyError::UserInactive);
}
if let Some(password) = password {
if !crypto::verify_password(password, &user.password) {
warn!(
"Invalid password for user: {username} with ID: {}.",
user.id
);
return Err(IggyError::InvalidCredentials);
}
}
info!("Logged in user: {username} with ID: {}.", user.id);
if session.is_none() {
return Ok(user);
}
let session = session.unwrap();
if session.is_authenticated() {
warn!(
"User: {} with ID: {} was already authenticated, removing the previous session...",
user.username,
session.get_user_id()
);
self.logout_user(session).await?;
}
session.set_user_id(user.id);
let mut client_manager = self.client_manager.write().await;
client_manager
.set_user_id(session.client_id, user.id)
.await?;
Ok(user)
}
pub async fn logout_user(&self, session: &Session) -> Result<(), IggyError> {
self.ensure_authenticated(session)?;
let user = self.get_user(&Identifier::numeric(session.get_user_id())?)?;
info!(
"Logging out user: {} with ID: {}...",
user.username, user.id
);
if session.client_id > 0 {
let mut client_manager = self.client_manager.write().await;
client_manager.clear_user_id(session.client_id).await?;
info!(
"Cleared user ID: {} for client: {}.",
user.id, session.client_id
);
}
info!("Logged out user: {} with ID: {}.", user.username, user.id);
Ok(())
}
}