blob: ea0db9b9fb390a07fdf0f3ffca4cf99d611a443d [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use crate::http::COMPONENT;
use crate::http::error::CustomError;
use crate::http::jwt::json_web_token::Identity;
use crate::http::mapper;
use crate::http::mapper::map_generated_access_token_to_identity_info;
use crate::http::shared::AppState;
use crate::shard::transmission::frame::ShardResponse;
use crate::shard::transmission::message::{ShardRequest, ShardRequestPayload};
use crate::streaming::session::Session;
use crate::streaming::users::user::User;
use ::iggy_common::change_password::ChangePassword;
use ::iggy_common::create_user::CreateUser;
use ::iggy_common::delete_user::DeleteUser;
use ::iggy_common::login_user::LoginUser;
use ::iggy_common::update_permissions::UpdatePermissions;
use ::iggy_common::update_user::UpdateUser;
use axum::extract::{Path, State};
use axum::http::StatusCode;
use axum::routing::{delete, get, post, put};
use axum::{Extension, Json, Router, debug_handler};
use err_trail::ErrContext;
use iggy_common::Identifier;
use iggy_common::IdentityInfo;
use iggy_common::Validatable;
use iggy_common::{IggyError, UserInfo, UserInfoDetails};
use send_wrapper::SendWrapper;
use serde::Deserialize;
use std::sync::Arc;
use tracing::instrument;
pub fn router(state: Arc<AppState>) -> Router {
Router::new()
.route("/users", get(get_users).post(create_user))
.route(
"/users/{user_id}",
get(get_user).put(update_user).delete(delete_user),
)
.route("/users/{user_id}/permissions", put(update_permissions))
.route("/users/{user_id}/password", put(change_password))
.route("/users/login", post(login_user))
.route("/users/logout", delete(logout_user))
.route("/users/refresh-token", post(refresh_token))
.with_state(state)
}
#[debug_handler]
async fn get_user(
State(state): State<Arc<AppState>>,
Extension(identity): Extension<Identity>,
Path(user_id): Path<String>,
) -> Result<Json<UserInfoDetails>, CustomError> {
let identifier_user_id = Identifier::from_str_value(&user_id)?;
let Ok(user) = state.shard.shard().find_user(&identifier_user_id) else {
return Err(CustomError::ResourceNotFound);
};
let Some(user) = user else {
return Err(CustomError::ResourceNotFound);
};
if user.id != identity.user_id {
state
.shard
.shard()
.metadata
.perm_get_user(identity.user_id)?;
}
let user = mapper::map_user(&user);
Ok(Json(user))
}
#[debug_handler]
async fn get_users(
State(state): State<Arc<AppState>>,
Extension(identity): Extension<Identity>,
) -> Result<Json<Vec<UserInfo>>, CustomError> {
state
.shard
.shard()
.metadata
.perm_get_users(identity.user_id)?;
let users = state.shard.shard().get_users();
let user_refs: Vec<&User> = users.iter().collect();
let users = mapper::map_users(&user_refs);
Ok(Json(users))
}
#[debug_handler]
#[instrument(skip_all, name = "trace_create_user", fields(iggy_user_id = identity.user_id))]
async fn create_user(
State(state): State<Arc<AppState>>,
Extension(identity): Extension<Identity>,
Json(command): Json<CreateUser>,
) -> Result<Json<UserInfoDetails>, CustomError> {
command.validate()?;
let request = ShardRequest::control_plane(ShardRequestPayload::CreateUserRequest {
user_id: identity.user_id,
command,
});
match state.shard.send_to_control_plane(request).await? {
ShardResponse::CreateUserResponse(user) => {
let response = mapper::map_user(&user);
Ok(Json(response))
}
ShardResponse::ErrorResponse(err) => Err(err.into()),
_ => unreachable!("Expected CreateUserResponse"),
}
}
#[debug_handler]
#[instrument(skip_all, name = "trace_update_user", fields(iggy_user_id = identity.user_id, iggy_updated_user_id = user_id))]
async fn update_user(
State(state): State<Arc<AppState>>,
Extension(identity): Extension<Identity>,
Path(user_id): Path<String>,
Json(mut command): Json<UpdateUser>,
) -> Result<StatusCode, CustomError> {
command.user_id = Identifier::from_str_value(&user_id)?;
command.validate()?;
let request = ShardRequest::control_plane(ShardRequestPayload::UpdateUserRequest {
user_id: identity.user_id,
command,
});
match state.shard.send_to_control_plane(request).await? {
ShardResponse::UpdateUserResponse(_) => Ok(StatusCode::NO_CONTENT),
ShardResponse::ErrorResponse(err) => Err(err.into()),
_ => unreachable!("Expected UpdateUserResponse"),
}
}
#[debug_handler]
#[instrument(skip_all, name = "trace_update_permissions", fields(iggy_user_id = identity.user_id, iggy_updated_user_id = user_id))]
async fn update_permissions(
State(state): State<Arc<AppState>>,
Extension(identity): Extension<Identity>,
Path(user_id): Path<String>,
Json(mut command): Json<UpdatePermissions>,
) -> Result<StatusCode, CustomError> {
command.user_id = Identifier::from_str_value(&user_id)?;
command.validate()?;
let request = ShardRequest::control_plane(ShardRequestPayload::UpdatePermissionsRequest {
user_id: identity.user_id,
command,
});
match state.shard.send_to_control_plane(request).await? {
ShardResponse::UpdatePermissionsResponse => Ok(StatusCode::NO_CONTENT),
ShardResponse::ErrorResponse(err) => Err(err.into()),
_ => unreachable!("Expected UpdatePermissionsResponse"),
}
}
#[debug_handler]
#[instrument(skip_all, name = "trace_change_password", fields(iggy_user_id = identity.user_id, iggy_updated_user_id = user_id))]
async fn change_password(
State(state): State<Arc<AppState>>,
Extension(identity): Extension<Identity>,
Path(user_id): Path<String>,
Json(mut command): Json<ChangePassword>,
) -> Result<StatusCode, CustomError> {
command.user_id = Identifier::from_str_value(&user_id)?;
command.validate()?;
let request = ShardRequest::control_plane(ShardRequestPayload::ChangePasswordRequest {
user_id: identity.user_id,
command,
});
match state.shard.send_to_control_plane(request).await? {
ShardResponse::ChangePasswordResponse => Ok(StatusCode::NO_CONTENT),
ShardResponse::ErrorResponse(err) => Err(err.into()),
_ => unreachable!("Expected ChangePasswordResponse"),
}
}
#[debug_handler]
#[instrument(skip_all, name = "trace_delete_user", fields(iggy_user_id = identity.user_id, iggy_deleted_user_id = user_id))]
async fn delete_user(
State(state): State<Arc<AppState>>,
Extension(identity): Extension<Identity>,
Path(user_id): Path<String>,
) -> Result<StatusCode, CustomError> {
let user_id = Identifier::from_str_value(&user_id)?;
let command = DeleteUser { user_id };
let request = ShardRequest::control_plane(ShardRequestPayload::DeleteUserRequest {
user_id: identity.user_id,
command,
});
match state.shard.send_to_control_plane(request).await? {
ShardResponse::DeleteUserResponse(_) => Ok(StatusCode::NO_CONTENT),
ShardResponse::ErrorResponse(err) => Err(err.into()),
_ => unreachable!("Expected DeleteUserResponse"),
}
}
#[debug_handler]
#[instrument(skip_all, name = "trace_login_user")]
async fn login_user(
State(state): State<Arc<AppState>>,
Json(command): Json<LoginUser>,
) -> Result<Json<IdentityInfo>, CustomError> {
command.validate()?;
let user = state
.shard
.shard()
.login_user(&command.username, &command.password, None)
.error(|e: &IggyError| {
format!(
"{COMPONENT} (error: {e}) - failed to login, username: {}",
command.username
)
})?;
let tokens = state.jwt_manager.generate(user.id)?;
Ok(Json(map_generated_access_token_to_identity_info(tokens)))
}
#[debug_handler]
#[instrument(skip_all, name = "trace_logout_user", fields(iggy_user_id = identity.user_id))]
async fn logout_user(
State(state): State<Arc<AppState>>,
Extension(identity): Extension<Identity>,
) -> Result<StatusCode, CustomError> {
let session = Session::stateless(identity.user_id, identity.ip_address);
state
.shard
.shard()
.logout_user(&session)
.error(|e: &IggyError| {
format!(
"{COMPONENT} (error: {e}) - failed to logout, user ID: {}",
identity.user_id
)
})?;
{
let revoke_token_future = SendWrapper::new(
state
.jwt_manager
.revoke_token(&identity.token_id, identity.token_expiry),
);
revoke_token_future.await.error(|e: &IggyError| {
format!(
"{COMPONENT} (error: {e}) - failed to revoke token, user ID: {}",
identity.user_id
)
})?;
}
Ok(StatusCode::NO_CONTENT)
}
#[debug_handler]
async fn refresh_token(
State(state): State<Arc<AppState>>,
Json(command): Json<RefreshToken>,
) -> Result<Json<IdentityInfo>, CustomError> {
let token = {
let refresh_token_future =
SendWrapper::new(state.jwt_manager.refresh_token(&command.token));
refresh_token_future
.await
.error(|e: &IggyError| format!("{COMPONENT} (error: {e}) - failed to refresh token"))?
};
Ok(Json(map_generated_access_token_to_identity_info(token)))
}
#[derive(Debug, Deserialize)]
struct RefreshToken {
token: String,
}