blob: ca5638bd3926f05b22568b1fc1a0e8647c0522be [file] [log] [blame]
use crate::configs::server::{DataMaintenanceConfig, PersonalAccessTokenConfig};
use crate::configs::system::SystemConfig;
use crate::streaming::cache::memory_tracker::CacheMemoryTracker;
use crate::streaming::clients::client_manager::ClientManager;
use crate::streaming::diagnostics::metrics::Metrics;
use crate::streaming::persistence::persister::*;
use crate::streaming::session::Session;
use crate::streaming::storage::SystemStorage;
use crate::streaming::streams::stream::Stream;
use crate::streaming::users::permissioner::Permissioner;
use iggy::error::IggyError;
use iggy::utils::crypto::{Aes256GcmEncryptor, Encryptor};
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use tokio::fs::{create_dir, remove_dir_all};
use tokio::time::Instant;
use tracing::{info, trace};
use crate::archiver::disk::DiskArchiver;
use crate::archiver::s3::S3Archiver;
use crate::archiver::{Archiver, ArchiverKind};
use crate::state::file::FileState;
use crate::state::system::SystemState;
use crate::state::State;
use crate::streaming::users::user::User;
use crate::versioning::SemanticVersion;
use crate::{compat, map_toggle_str};
use iggy::locking::IggySharedMut;
use iggy::locking::IggySharedMutFn;
use iggy::models::user_info::UserId;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
#[derive(Debug)]
pub struct SharedSystem {
system: Arc<RwLock<System>>,
}
impl SharedSystem {
pub fn new(system: System) -> SharedSystem {
SharedSystem {
system: Arc::new(RwLock::new(system)),
}
}
pub async fn read(&self) -> RwLockReadGuard<System> {
self.system.read().await
}
pub async fn write(&self) -> RwLockWriteGuard<System> {
self.system.write().await
}
}
impl Clone for SharedSystem {
fn clone(&self) -> Self {
SharedSystem {
system: self.system.clone(),
}
}
}
#[derive(Debug)]
pub struct System {
pub permissioner: Permissioner,
pub(crate) storage: Arc<SystemStorage>,
pub(crate) streams: HashMap<u32, Stream>,
pub(crate) streams_ids: HashMap<String, u32>,
pub(crate) users: HashMap<UserId, User>,
pub(crate) config: Arc<SystemConfig>,
pub(crate) client_manager: IggySharedMut<ClientManager>,
pub(crate) encryptor: Option<Arc<dyn Encryptor>>,
pub(crate) metrics: Metrics,
pub(crate) state: Arc<dyn State>,
pub(crate) archiver: Option<Arc<dyn Archiver>>,
pub personal_access_token: PersonalAccessTokenConfig,
}
/// For each cache eviction, we want to remove more than the size we need.
/// This is done on purpose to avoid evicting messages on every write.
const CACHE_OVER_EVICTION_FACTOR: u64 = 5;
impl System {
pub fn new(
config: Arc<SystemConfig>,
data_maintenance_config: DataMaintenanceConfig,
pat_config: PersonalAccessTokenConfig,
) -> System {
let version = SemanticVersion::current().expect("Invalid version");
info!(
"Server-side encryption is {}.",
map_toggle_str(config.encryption.enabled)
);
let encryptor: Option<Arc<dyn Encryptor>> = match config.encryption.enabled {
true => Some(Arc::new(
Aes256GcmEncryptor::from_base64_key(&config.encryption.key).unwrap(),
)),
false => None,
};
let state_persister = Self::resolve_persister(config.state.enforce_fsync);
let partition_persister = Self::resolve_persister(config.partition.enforce_fsync);
let state = Arc::new(FileState::new(
&config.get_state_log_path(),
&version,
state_persister,
encryptor.clone(),
));
Self::create(
config.clone(),
SystemStorage::new(config, partition_persister),
state,
encryptor,
data_maintenance_config,
pat_config,
)
}
fn resolve_persister(enforce_fsync: bool) -> Arc<dyn Persister> {
match enforce_fsync {
true => Arc::new(FileWithSyncPersister),
false => Arc::new(FilePersister),
}
}
pub fn create(
system_config: Arc<SystemConfig>,
storage: SystemStorage,
state: Arc<dyn State>,
encryptor: Option<Arc<dyn Encryptor>>,
data_maintenance_config: DataMaintenanceConfig,
pat_config: PersonalAccessTokenConfig,
) -> System {
let archiver_config = data_maintenance_config.archiver;
let archiver: Option<Arc<dyn Archiver>> = if archiver_config.enabled {
info!("Archiving is enabled, kind: {}", archiver_config.kind);
match archiver_config.kind {
ArchiverKind::Disk => Some(Arc::new(DiskArchiver::new(
archiver_config
.disk
.clone()
.expect("Disk archiver config is missing"),
))),
ArchiverKind::S3 => Some(Arc::new(
S3Archiver::new(
archiver_config
.s3
.clone()
.expect("S3 archiver config is missing"),
)
.expect("Failed to create S3 archiver"),
)),
}
} else {
info!("Archiving is disabled.");
None
};
System {
config: system_config,
streams: HashMap::new(),
streams_ids: HashMap::new(),
storage: Arc::new(storage),
encryptor,
client_manager: IggySharedMut::new(ClientManager::default()),
permissioner: Permissioner::default(),
metrics: Metrics::init(),
users: HashMap::new(),
state,
personal_access_token: pat_config,
archiver,
}
}
pub async fn init(&mut self) -> Result<(), IggyError> {
let system_path = self.config.get_system_path();
if !Path::new(&system_path).exists() && create_dir(&system_path).await.is_err() {
return Err(IggyError::CannotCreateBaseDirectory(system_path));
}
let state_path = self.config.get_state_path();
if !Path::new(&state_path).exists() && create_dir(&state_path).await.is_err() {
return Err(IggyError::CannotCreateStateDirectory(state_path));
}
let streams_path = self.config.get_streams_path();
if !Path::new(&streams_path).exists() && create_dir(&streams_path).await.is_err() {
return Err(IggyError::CannotCreateStreamsDirectory(streams_path));
}
let runtime_path = self.config.get_runtime_path();
if Path::new(&runtime_path).exists() && remove_dir_all(&runtime_path).await.is_err() {
return Err(IggyError::CannotRemoveRuntimeDirectory(runtime_path));
}
if create_dir(&runtime_path).await.is_err() {
return Err(IggyError::CannotCreateRuntimeDirectory(runtime_path));
}
info!(
"Initializing system, data will be stored at: {}",
self.config.get_system_path()
);
if self.config.database.is_some() {
compat::storage_conversion::init(
self.config.clone(),
self.state.clone(),
self.storage.clone(),
)
.await?;
}
let state_entries = self.state.init().await?;
let system_state = SystemState::init(state_entries).await?;
let now = Instant::now();
self.load_version().await?;
self.load_users(system_state.users.into_values().collect())
.await?;
self.load_streams(system_state.streams.into_values().collect())
.await?;
if let Some(archiver) = self.archiver.as_ref() {
archiver
.init()
.await
.expect("Failed to initialize archiver");
}
info!("Initialized system in {} ms.", now.elapsed().as_millis());
Ok(())
}
pub async fn shutdown(&mut self) -> Result<(), IggyError> {
self.persist_messages().await?;
Ok(())
}
pub async fn persist_messages(&self) -> Result<usize, IggyError> {
trace!("Saving buffered messages on disk...");
let mut saved_messages_number = 0;
for stream in self.streams.values() {
saved_messages_number += stream.persist_messages().await?;
}
Ok(saved_messages_number)
}
pub fn ensure_authenticated(&self, session: &Session) -> Result<(), IggyError> {
match session.is_authenticated() {
true => Ok(()),
false => Err(IggyError::Unauthenticated),
}
}
pub async fn clean_cache(&self, size_to_clean: u64) {
for stream in self.streams.values() {
for topic in stream.get_topics() {
for partition in topic.get_partitions().into_iter() {
tokio::task::spawn(async move {
let memory_tracker = CacheMemoryTracker::get_instance().unwrap();
let mut partition_guard = partition.write().await;
let cache = &mut partition_guard.cache.as_mut().unwrap();
let size_to_remove = (cache.current_size() as f64
/ memory_tracker.usage_bytes() as f64
* size_to_clean as f64)
.ceil() as u64;
cache.evict_by_size(size_to_remove * CACHE_OVER_EVICTION_FACTOR);
});
}
}
}
}
}