blob: e79643ce7f3110446c487ab4235157f03177129e [file] [log] [blame]
extern crate sysinfo;
use super::server::{MessageCleanerConfig, MessageSaverConfig};
use super::system::CompressionConfig;
use crate::configs::server::{PersonalAccessTokenConfig, ServerConfig};
use crate::configs::system::{CacheConfig, RetentionPolicyConfig, SegmentConfig};
use crate::server_error::ServerError;
use crate::streaming::segments::segment;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::utils::byte_size::IggyByteSize;
use iggy::validatable::Validatable;
use sysinfo::System;
use tracing::{error, info, warn};
impl Validatable<ServerError> for ServerConfig {
fn validate(&self) -> Result<(), ServerError> {
self.system.segment.validate()?;
self.system.cache.validate()?;
self.system.retention_policy.validate()?;
self.system.compression.validate()?;
self.personal_access_token.validate()?;
Ok(())
}
}
impl Validatable<ServerError> for CompressionConfig {
fn validate(&self) -> Result<(), ServerError> {
let compression_alg = &self.default_algorithm;
if *compression_alg != CompressionAlgorithm::None {
// TODO(numinex): Change this message once server side compression is fully developed.
warn!(
"Server started with server-side compression enabled, using algorithm: {}, this feature is not implemented yet!",
compression_alg
);
}
Ok(())
}
}
impl Validatable<ServerError> for CacheConfig {
fn validate(&self) -> Result<(), ServerError> {
let limit_bytes = self.size.clone().into();
let mut sys = System::new_all();
sys.refresh_all();
sys.refresh_processes();
let total_memory = sys.total_memory();
let free_memory = sys.free_memory();
let cache_percentage = (limit_bytes as f64 / total_memory as f64) * 100.0;
let pretty_cache_limit = IggyByteSize::from(limit_bytes).as_human_string();
let pretty_total_memory = IggyByteSize::from(total_memory).as_human_string();
let pretty_free_memory = IggyByteSize::from(free_memory).as_human_string();
if limit_bytes > total_memory {
return Err(ServerError::CacheConfigValidationFailure(format!(
"Requested cache size exceeds 100% of total memory. Requested: {} ({:.2}% of total memory: {}).",
pretty_cache_limit, cache_percentage, pretty_total_memory
)));
}
if limit_bytes > (total_memory as f64 * 0.75) as u64 {
warn!(
"Cache configuration -> cache size exceeds 75% of total memory. Set to: {} ({:.2}% of total memory: {}).",
pretty_cache_limit, cache_percentage, pretty_total_memory
);
}
info!(
"Cache configuration -> cache size set to {} ({:.2}% of total memory: {}, free memory: {}).",
pretty_cache_limit, cache_percentage, pretty_total_memory, pretty_free_memory
);
Ok(())
}
}
impl Validatable<ServerError> for RetentionPolicyConfig {
fn validate(&self) -> Result<(), ServerError> {
// TODO(hubcio): Change this message once topic size based retention policy is fully developed.
if self.max_topic_size.as_bytes_u64() > 0 {
warn!("Retention policy max_topic_size is not implemented yet!");
}
Ok(())
}
}
impl Validatable<ServerError> for SegmentConfig {
fn validate(&self) -> Result<(), ServerError> {
if self.size.as_bytes_u64() as u32 > segment::MAX_SIZE_BYTES {
error!(
"Segment configuration -> size cannot be greater than: {} bytes.",
segment::MAX_SIZE_BYTES
);
return Err(ServerError::InvalidConfiguration);
}
Ok(())
}
}
impl Validatable<ServerError> for MessageSaverConfig {
fn validate(&self) -> Result<(), ServerError> {
if self.enabled && self.interval.is_zero() {
error!("Message saver interval size cannot be zero, it must be greater than 0.");
return Err(ServerError::InvalidConfiguration);
}
Ok(())
}
}
impl Validatable<ServerError> for MessageCleanerConfig {
fn validate(&self) -> Result<(), ServerError> {
if self.enabled && self.interval.is_zero() {
error!("Message cleaner interval size cannot be zero, it must be greater than 0.");
return Err(ServerError::InvalidConfiguration);
}
Ok(())
}
}
impl Validatable<ServerError> for PersonalAccessTokenConfig {
fn validate(&self) -> Result<(), ServerError> {
if self.max_tokens_per_user == 0 {
error!("Max tokens per user cannot be zero, it must be greater than 0.");
return Err(ServerError::InvalidConfiguration);
}
if self.cleaner.enabled && self.cleaner.interval.is_zero() {
error!(
"Personal access token cleaner interval cannot be zero, it must be greater than 0."
);
return Err(ServerError::InvalidConfiguration);
}
Ok(())
}
}