blob: f3a2d5eb77a68699b52a02a8bdd814bae861d9e4 [file] [log] [blame]
use crate::configs::server::PersonalAccessTokenConfig;
use crate::configs::system::SystemConfig;
use crate::streaming::cache::memory_tracker::CacheMemoryTracker;
use crate::streaming::clients::client_manager::ClientManager;
use crate::streaming::diagnostics::metrics::Metrics;
use crate::streaming::persistence::persister::*;
use crate::streaming::session::Session;
use crate::streaming::storage::SystemStorage;
use crate::streaming::streams::stream::Stream;
use crate::streaming::users::permissioner::Permissioner;
use iggy::error::IggyError;
use iggy::utils::crypto::{Aes256GcmEncryptor, Encryptor};
use sled::Db;
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use tokio::fs::{create_dir, remove_dir_all};
use tokio::sync::RwLock;
use tokio::time::Instant;
use tracing::{info, trace};
use keepcalm::{SharedMut, SharedReadLock, SharedWriteLock};
#[derive(Debug)]
pub struct SharedSystem {
system: SharedMut<System>,
}
impl SharedSystem {
pub fn new(system: System) -> SharedSystem {
SharedSystem {
system: SharedMut::new(system),
}
}
pub fn read(&self) -> SharedReadLock<System> {
self.system.read()
}
pub fn write(&self) -> SharedWriteLock<System> {
self.system.write()
}
}
impl Clone for SharedSystem {
fn clone(&self) -> Self {
SharedSystem {
system: self.system.clone(),
}
}
}
#[derive(Debug)]
pub struct System {
pub permissioner: Permissioner,
pub(crate) storage: Arc<SystemStorage>,
pub(crate) streams: HashMap<u32, Stream>,
pub(crate) streams_ids: HashMap<String, u32>,
pub(crate) config: Arc<SystemConfig>,
pub(crate) client_manager: Arc<RwLock<ClientManager>>,
pub(crate) encryptor: Option<Box<dyn Encryptor>>,
pub(crate) metrics: Metrics,
pub(crate) db: Option<Arc<Db>>,
pub personal_access_token: PersonalAccessTokenConfig,
}
/// For each cache eviction, we want to remove more than the size we need.
/// This is done on purpose to avoid evicting messages on every write.
const CACHE_OVER_EVICTION_FACTOR: u64 = 5;
impl System {
pub fn new(
config: Arc<SystemConfig>,
db: Option<Arc<Db>>,
pat_config: PersonalAccessTokenConfig,
) -> System {
let db = match db {
Some(db) => db,
None => {
let db = sled::open(config.get_database_path());
if db.is_err() {
panic!("Cannot open database at: {}", config.get_database_path());
}
Arc::new(db.unwrap())
}
};
let persister: Arc<dyn Persister> = match config.partition.enforce_fsync {
true => Arc::new(FileWithSyncPersister {}),
false => Arc::new(FilePersister {}),
};
Self::create(
config,
SystemStorage::new(db.clone(), persister),
Some(db),
pat_config,
)
}
pub fn create(
config: Arc<SystemConfig>,
storage: SystemStorage,
db: Option<Arc<Db>>,
pat_config: PersonalAccessTokenConfig,
) -> System {
info!(
"Server-side encryption is {}.",
Self::map_toggle_str(config.encryption.enabled)
);
System {
encryptor: match config.encryption.enabled {
true => Some(Box::new(
Aes256GcmEncryptor::from_base64_key(&config.encryption.key).unwrap(),
)),
false => None,
},
config,
streams: HashMap::new(),
streams_ids: HashMap::new(),
storage: Arc::new(storage),
client_manager: Arc::new(RwLock::new(ClientManager::default())),
permissioner: Permissioner::default(),
metrics: Metrics::init(),
db,
personal_access_token: pat_config,
}
}
pub async fn init(&mut self) -> Result<(), IggyError> {
let system_path = self.config.get_system_path();
if !Path::new(&system_path).exists() && create_dir(&system_path).await.is_err() {
return Err(IggyError::CannotCreateBaseDirectory(system_path));
}
let streams_path = self.config.get_streams_path();
if !Path::new(&streams_path).exists() && create_dir(&streams_path).await.is_err() {
return Err(IggyError::CannotCreateStreamsDirectory(streams_path));
}
let runtime_path = self.config.get_runtime_path();
if Path::new(&runtime_path).exists() && remove_dir_all(&runtime_path).await.is_err() {
return Err(IggyError::CannotRemoveRuntimeDirectory(runtime_path));
}
if create_dir(&runtime_path).await.is_err() {
return Err(IggyError::CannotCreateRuntimeDirectory(runtime_path));
}
info!(
"Initializing system, data will be stored at: {}",
self.config.get_system_path()
);
let now = Instant::now();
self.load_version().await?;
self.load_users().await?;
self.load_streams().await?;
info!("Initialized system in {} ms.", now.elapsed().as_millis());
Ok(())
}
pub async fn shutdown(&mut self) -> Result<(), IggyError> {
self.persist_messages().await?;
Ok(())
}
pub async fn persist_messages(&self) -> Result<(), IggyError> {
trace!("Saving buffered messages on disk...");
for stream in self.streams.values() {
stream.persist_messages().await?;
}
Ok(())
}
pub fn ensure_authenticated(&self, session: &Session) -> Result<(), IggyError> {
match session.is_authenticated() {
true => Ok(()),
false => Err(IggyError::Unauthenticated),
}
}
fn map_toggle_str<'a>(enabled: bool) -> &'a str {
match enabled {
true => "enabled",
false => "disabled",
}
}
pub async fn clean_cache(&self, size_to_clean: u64) {
for stream in self.streams.values() {
for topic in stream.get_topics() {
for partition in topic.get_partitions().into_iter() {
tokio::task::spawn(async move {
let memory_tracker = CacheMemoryTracker::get_instance().unwrap();
let mut partition_guard = partition.write().await;
let cache = &mut partition_guard.cache.as_mut().unwrap();
let size_to_remove = (cache.current_size() as f64
/ memory_tracker.usage_bytes() as f64
* size_to_clean as f64)
.ceil() as u64;
cache.evict_by_size(size_to_remove * CACHE_OVER_EVICTION_FACTOR);
});
}
}
}
}
}