| /* Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| use crate::archiver::{ArchiverKind, ArchiverKindType}; |
| use crate::configs::server::{DataMaintenanceConfig, PersonalAccessTokenConfig}; |
| use crate::configs::system::SystemConfig; |
| use crate::map_toggle_str; |
| use crate::state::file::FileState; |
| use crate::state::system::SystemState; |
| use crate::state::StateKind; |
| use crate::streaming::cache::memory_tracker::CacheMemoryTracker; |
| use crate::streaming::clients::client_manager::ClientManager; |
| use crate::streaming::diagnostics::metrics::Metrics; |
| use crate::streaming::persistence::persister::*; |
| use crate::streaming::session::Session; |
| use crate::streaming::storage::SystemStorage; |
| use crate::streaming::streams::stream::Stream; |
| use crate::streaming::systems::COMPONENT; |
| use crate::streaming::users::permissioner::Permissioner; |
| use crate::streaming::users::user::User; |
| use crate::versioning::SemanticVersion; |
| use ahash::AHashMap; |
| use error_set::ErrContext; |
| use iggy::error::IggyError; |
| use iggy::locking::IggySharedMut; |
| use iggy::locking::IggySharedMutFn; |
| use iggy::models::user_info::UserId; |
| use iggy::utils::byte_size::IggyByteSize; |
| use iggy::utils::crypto::{Aes256GcmEncryptor, EncryptorKind}; |
| use std::path::Path; |
| use std::sync::Arc; |
| use tokio::fs::{create_dir_all, remove_dir_all}; |
| use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; |
| use tokio::time::Instant; |
| use tracing::{error, info, instrument, trace}; |
| |
| #[derive(Debug)] |
| pub struct SharedSystem { |
| system: Arc<RwLock<System>>, |
| } |
| |
| impl SharedSystem { |
| pub fn new(system: System) -> SharedSystem { |
| SharedSystem { |
| system: Arc::new(RwLock::new(system)), |
| } |
| } |
| |
| pub async fn read(&self) -> RwLockReadGuard<System> { |
| self.system.read().await |
| } |
| |
| pub async fn write(&self) -> RwLockWriteGuard<System> { |
| self.system.write().await |
| } |
| } |
| |
| impl Clone for SharedSystem { |
| fn clone(&self) -> Self { |
| SharedSystem { |
| system: self.system.clone(), |
| } |
| } |
| } |
| |
| #[derive(Debug)] |
| pub struct System { |
| pub permissioner: Permissioner, |
| pub(crate) storage: Arc<SystemStorage>, |
| pub(crate) streams: AHashMap<u32, Stream>, |
| pub(crate) streams_ids: AHashMap<String, u32>, |
| pub(crate) users: AHashMap<UserId, User>, |
| pub(crate) config: Arc<SystemConfig>, |
| pub(crate) client_manager: IggySharedMut<ClientManager>, |
| pub(crate) encryptor: Option<Arc<EncryptorKind>>, |
| pub(crate) metrics: Metrics, |
| pub(crate) state: Arc<StateKind>, |
| pub(crate) archiver: Option<Arc<ArchiverKind>>, |
| pub personal_access_token: PersonalAccessTokenConfig, |
| } |
| |
| /// For each cache eviction, we want to remove more than the size we need. |
| /// This is done on purpose to avoid evicting messages on every write. |
| const CACHE_OVER_EVICTION_FACTOR: u64 = 5; |
| |
| impl System { |
| pub fn new( |
| config: Arc<SystemConfig>, |
| data_maintenance_config: DataMaintenanceConfig, |
| pat_config: PersonalAccessTokenConfig, |
| ) -> System { |
| let version = SemanticVersion::current().expect("Invalid version"); |
| info!( |
| "Server-side encryption is {}.", |
| map_toggle_str(config.encryption.enabled) |
| ); |
| |
| let encryptor: Option<Arc<EncryptorKind>> = match config.encryption.enabled { |
| true => Some(Arc::new(EncryptorKind::Aes256Gcm( |
| Aes256GcmEncryptor::from_base64_key(&config.encryption.key).unwrap(), |
| ))), |
| false => None, |
| }; |
| |
| let state_persister = Self::resolve_persister(config.state.enforce_fsync); |
| let partition_persister = Self::resolve_persister(config.partition.enforce_fsync); |
| |
| let state = Arc::new(StateKind::File(FileState::new( |
| &config.get_state_log_path(), |
| &version, |
| state_persister, |
| encryptor.clone(), |
| ))); |
| Self::create( |
| config.clone(), |
| SystemStorage::new(config, partition_persister), |
| state, |
| encryptor, |
| data_maintenance_config, |
| pat_config, |
| ) |
| } |
| |
| fn resolve_persister(enforce_fsync: bool) -> Arc<PersisterKind> { |
| match enforce_fsync { |
| true => Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister)), |
| false => Arc::new(PersisterKind::File(FilePersister)), |
| } |
| } |
| |
| pub fn create( |
| system_config: Arc<SystemConfig>, |
| storage: SystemStorage, |
| state: Arc<StateKind>, |
| encryptor: Option<Arc<EncryptorKind>>, |
| data_maintenance_config: DataMaintenanceConfig, |
| pat_config: PersonalAccessTokenConfig, |
| ) -> System { |
| let archiver_config = data_maintenance_config.archiver; |
| let archiver: Option<Arc<ArchiverKind>> = if archiver_config.enabled { |
| info!("Archiving is enabled, kind: {}", archiver_config.kind); |
| match archiver_config.kind { |
| ArchiverKindType::Disk => Some(Arc::new(ArchiverKind::get_disk_archiver( |
| archiver_config |
| .disk |
| .clone() |
| .expect("Disk archiver config is missing"), |
| ))), |
| ArchiverKindType::S3 => Some(Arc::new( |
| ArchiverKind::get_s3_archiver( |
| archiver_config |
| .s3 |
| .clone() |
| .expect("S3 archiver config is missing"), |
| ) |
| .expect("Failed to create S3 archiver"), |
| )), |
| } |
| } else { |
| info!("Archiving is disabled."); |
| None |
| }; |
| |
| System { |
| config: system_config, |
| streams: AHashMap::new(), |
| streams_ids: AHashMap::new(), |
| storage: Arc::new(storage), |
| encryptor, |
| client_manager: IggySharedMut::new(ClientManager::default()), |
| permissioner: Permissioner::default(), |
| metrics: Metrics::init(), |
| users: AHashMap::new(), |
| state, |
| personal_access_token: pat_config, |
| archiver, |
| } |
| } |
| |
| #[instrument(skip_all, name = "trace_system_init")] |
| pub async fn init(&mut self) -> Result<(), IggyError> { |
| let system_path = self.config.get_system_path(); |
| if !Path::new(&system_path).exists() && create_dir_all(&system_path).await.is_err() { |
| return Err(IggyError::CannotCreateBaseDirectory(system_path)); |
| } |
| |
| let state_path = self.config.get_state_path(); |
| if !Path::new(&state_path).exists() && create_dir_all(&state_path).await.is_err() { |
| return Err(IggyError::CannotCreateStateDirectory(state_path)); |
| } |
| |
| let streams_path = self.config.get_streams_path(); |
| if !Path::new(&streams_path).exists() && create_dir_all(&streams_path).await.is_err() { |
| return Err(IggyError::CannotCreateStreamsDirectory(streams_path)); |
| } |
| |
| let runtime_path = self.config.get_runtime_path(); |
| if Path::new(&runtime_path).exists() && remove_dir_all(&runtime_path).await.is_err() { |
| return Err(IggyError::CannotRemoveRuntimeDirectory(runtime_path)); |
| } |
| |
| if create_dir_all(&runtime_path).await.is_err() { |
| return Err(IggyError::CannotCreateRuntimeDirectory(runtime_path)); |
| } |
| |
| info!( |
| "Initializing system, data will be stored at: {}", |
| self.config.get_system_path() |
| ); |
| |
| let state_entries = self.state.init().await.with_error_context(|error| { |
| format!("{COMPONENT} (error: {error}) - failed to initialize state entries") |
| })?; |
| let system_state = SystemState::init(state_entries) |
| .await |
| .with_error_context(|error| { |
| format!("{COMPONENT} (error: {error}) - failed to initialize system state") |
| })?; |
| let now = Instant::now(); |
| self.load_version().await.with_error_context(|error| { |
| format!("{COMPONENT} (error: {error}) - failed to load version") |
| })?; |
| self.load_users(system_state.users.into_values().collect()) |
| .await |
| .with_error_context(|error| { |
| format!("{COMPONENT} (error: {error}) - failed to load users") |
| })?; |
| self.load_streams(system_state.streams.into_values().collect()) |
| .await |
| .with_error_context(|error| { |
| format!("{COMPONENT} (error: {error}) - failed to load streams") |
| })?; |
| if let Some(archiver) = self.archiver.as_ref() { |
| archiver |
| .init() |
| .await |
| .expect("Failed to initialize archiver"); |
| } |
| info!("Initialized system in {} ms.", now.elapsed().as_millis()); |
| Ok(()) |
| } |
| |
| #[instrument(skip_all, name = "trace_shutdown")] |
| pub async fn shutdown(&mut self) -> Result<(), IggyError> { |
| self.persist_messages().await?; |
| Ok(()) |
| } |
| |
| #[instrument(skip_all, name = "trace_persist_messages")] |
| pub async fn persist_messages(&self) -> Result<usize, IggyError> { |
| trace!("Saving buffered messages on disk..."); |
| let mut saved_messages_number = 0; |
| for stream in self.streams.values() { |
| saved_messages_number += stream.persist_messages().await?; |
| } |
| |
| Ok(saved_messages_number) |
| } |
| |
| pub fn ensure_authenticated(&self, session: &Session) -> Result<(), IggyError> { |
| if !session.is_active() { |
| error!("{COMPONENT} - session is inactive, session: {session}"); |
| return Err(IggyError::StaleClient); |
| } |
| |
| if session.is_authenticated() { |
| Ok(()) |
| } else { |
| error!("{COMPONENT} - unauthenticated access attempt, session: {session}"); |
| Err(IggyError::Unauthenticated) |
| } |
| } |
| |
| pub async fn clean_cache(&self, size_to_clean: IggyByteSize) { |
| for stream in self.streams.values() { |
| for topic in stream.get_topics() { |
| for partition in topic.get_partitions().into_iter() { |
| tokio::task::spawn(async move { |
| let memory_tracker = CacheMemoryTracker::get_instance().unwrap(); |
| let mut partition_guard = partition.write().await; |
| let cache = &mut partition_guard.cache.as_mut().unwrap(); |
| let size_to_remove = (cache.current_size().as_bytes_u64() as f64 |
| / memory_tracker.usage_bytes().as_bytes_u64() as f64 |
| * size_to_clean.as_bytes_u64() as f64) |
| .ceil() as u64; |
| cache.evict_by_size(size_to_remove * CACHE_OVER_EVICTION_FACTOR); |
| }); |
| } |
| } |
| } |
| } |
| } |