blob: 306171c4917a4f8f299089b5f64a4983dd484652 [file] [log] [blame]
extern crate sysinfo;
use super::server::{
ArchiverConfig, DataMaintenanceConfig, MessageSaverConfig, MessagesMaintenanceConfig,
StateMaintenanceConfig,
};
use super::system::CompressionConfig;
use crate::archiver::ArchiverKind;
use crate::configs::server::{PersonalAccessTokenConfig, ServerConfig};
use crate::configs::system::{CacheConfig, SegmentConfig};
use crate::server_error::ServerError;
use crate::streaming::segments::segment;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::topic_size::MaxTopicSize;
use iggy::validatable::Validatable;
use sysinfo::System;
use tracing::{info, warn};
/// Validation of the top-level server configuration.
///
/// The nested data maintenance, personal access token, segment, cache and
/// compression sections are validated first. Afterwards any `ServerDefault`
/// placeholder is rejected for the max topic size, the message expiry and
/// (only when HTTP is enabled) the JWT access token expiry. Lastly the max
/// topic size must not be smaller than the segment size.
impl Validatable<ServerError> for ServerConfig {
    /// Returns `Ok(())` when every check passes, otherwise the first error
    /// encountered in the order described on the `impl`.
    fn validate(&self) -> Result<(), ServerError> {
        // Delegate to the nested sections; each `?` stops at the first failure.
        self.data_maintenance.validate()?;
        self.personal_access_token.validate()?;
        self.system.segment.validate()?;
        self.system.cache.validate()?;
        self.system.compression.validate()?;

        // Resolve the topic size limit to bytes; an unlimited topic is
        // represented by `u64::MAX` so the comparison below always passes.
        let max_topic_bytes = match self.system.topic.max_size {
            MaxTopicSize::Custom(size) => size.as_bytes_u64(),
            MaxTopicSize::Unlimited => u64::MAX,
            MaxTopicSize::ServerDefault => {
                return Err(ServerError::InvalidConfiguration(
                    "Max topic size cannot be set to server default.".into(),
                ));
            }
        };

        if matches!(
            self.system.segment.message_expiry,
            IggyExpiry::ServerDefault
        ) {
            return Err(ServerError::InvalidConfiguration(
                "Message expiry cannot be set to server default.".into(),
            ));
        }

        // The JWT expiry is only inspected when the HTTP transport is enabled.
        if self.http.enabled
            && matches!(self.http.jwt.access_token_expiry, IggyExpiry::ServerDefault)
        {
            return Err(ServerError::InvalidConfiguration(
                "Access token expiry cannot be set to server default.".into(),
            ));
        }

        let segment_bytes = self.system.segment.size.as_bytes_u64();
        if max_topic_bytes < segment_bytes {
            return Err(ServerError::InvalidConfiguration(format!(
                "Max topic size cannot be lower than segment size. Max topic size: {}, segment size: {}.",
                max_topic_bytes,
                self.system.segment.size
            )));
        }

        Ok(())
    }
}
/// Validation of the compression settings.
///
/// This never fails; it only logs a warning when a default algorithm other
/// than `CompressionAlgorithm::None` is configured.
impl Validatable<ServerError> for CompressionConfig {
    /// Always returns `Ok(())`.
    fn validate(&self) -> Result<(), ServerError> {
        let algorithm = &self.default_algorithm;
        if *algorithm == CompressionAlgorithm::None {
            return Ok(());
        }

        // TODO(numinex): Change this message once server side compression is fully developed.
        warn!(
            "Server started with server-side compression enabled, using algorithm: {}, this feature is not implemented yet!",
            algorithm
        );
        Ok(())
    }
}
/// Validation of the cache settings against the memory reported by `sysinfo`.
///
/// A requested cache size larger than the total memory is an error. A size
/// above 75% of the total memory only produces a warning. Both checks run
/// regardless of whether the cache is enabled; the `enabled` flag only selects
/// which informational message is logged at the end.
impl Validatable<ServerError> for CacheConfig {
    /// Returns `ServerError::CacheConfigValidationFailure` when the requested
    /// size exceeds the total memory, otherwise `Ok(())`.
    fn validate(&self) -> Result<(), ServerError> {
        let requested: u64 = self.size.clone().into();

        // Query the host for its memory figures.
        let mut system = System::new_all();
        system.refresh_all();
        system.refresh_processes();
        let total = system.total_memory();
        let free = system.free_memory();

        // Values used only for the log and error messages below.
        let share_of_total = (requested as f64 / total as f64) * 100.0;
        let requested_text = IggyByteSize::from(requested).as_human_string();
        let total_text = IggyByteSize::from(total).as_human_string();
        let free_text = IggyByteSize::from(free).as_human_string();

        if requested > total {
            return Err(ServerError::CacheConfigValidationFailure(format!(
                "Requested cache size exceeds 100% of total memory. Requested: {} ({:.2}% of total memory: {}).",
                requested_text, share_of_total, total_text
            )));
        }

        // Soft limit: three quarters of the total memory, truncated to whole bytes.
        let warn_threshold = (total as f64 * 0.75) as u64;
        if requested > warn_threshold {
            warn!(
                "Cache configuration -> cache size exceeds 75% of total memory. Set to: {} ({:.2}% of total memory: {}).",
                requested_text, share_of_total, total_text
            );
        }

        if !self.enabled {
            info!("Cache configuration -> cache is disabled.");
        } else {
            info!(
                "Cache configuration -> cache size set to {} ({:.2}% of total memory: {}, free memory: {}).",
                requested_text, share_of_total, total_text, free_text
            );
        }

        Ok(())
    }
}
/// Validation of the segment settings.
///
/// The configured segment size must not exceed `segment::MAX_SIZE_BYTES`.
impl Validatable<ServerError> for SegmentConfig {
    /// Returns `ServerError::InvalidConfiguration` when the configured size is
    /// greater than `segment::MAX_SIZE_BYTES`, otherwise `Ok(())`.
    fn validate(&self) -> Result<(), ServerError> {
        // Compare in `u64`. Narrowing the configured size to `u32` with `as`
        // silently truncates, so a value above `u32::MAX` (e.g. 4 GiB + 1 byte)
        // would wrap to a small number and incorrectly pass this check.
        if self.size.as_bytes_u64() > u64::from(segment::MAX_SIZE_BYTES) {
            return Err(ServerError::InvalidConfiguration(format!(
                "Segment size cannot be greater than: {} bytes.",
                segment::MAX_SIZE_BYTES
            )));
        }

        Ok(())
    }
}
/// Validation of the message saver settings.
impl Validatable<ServerError> for MessageSaverConfig {
    /// Fails with `ServerError::InvalidConfiguration` when the saver is enabled
    /// with a zero interval; a disabled saver is not checked.
    fn validate(&self) -> Result<(), ServerError> {
        // The interval is only inspected when the saver is enabled.
        if !self.enabled || !self.interval.is_zero() {
            return Ok(());
        }

        Err(ServerError::InvalidConfiguration(
            "Message saver interval size cannot be zero, it must be greater than 0.".into(),
        ))
    }
}
/// Validation of the data maintenance settings.
impl Validatable<ServerError> for DataMaintenanceConfig {
    /// Validates the archiver, messages and state sections in that order and
    /// returns the first error, if any.
    fn validate(&self) -> Result<(), ServerError> {
        self.archiver
            .validate()
            .and_then(|()| self.messages.validate())
            .and_then(|()| self.state.validate())
    }
}
/// Validation of the archiver settings.
///
/// Nothing is checked while the archiver is disabled. Otherwise the section
/// matching the selected `ArchiverKind` must be present and its required
/// fields must be non-empty.
impl Validatable<ServerError> for ArchiverConfig {
    /// Returns `ServerError::InvalidConfiguration` describing the first
    /// missing or empty setting, otherwise `Ok(())`.
    fn validate(&self) -> Result<(), ServerError> {
        if !self.enabled {
            return Ok(());
        }

        // Small constructor to keep the individual checks short.
        let invalid = |reason: &str| ServerError::InvalidConfiguration(reason.into());

        match self.kind {
            ArchiverKind::Disk => {
                let disk = self
                    .disk
                    .as_ref()
                    .ok_or_else(|| invalid("Disk archiver configuration is missing."))?;

                if disk.path.is_empty() {
                    return Err(invalid("Disk archiver path cannot be empty."));
                }

                Ok(())
            }
            ArchiverKind::S3 => {
                let s3 = self
                    .s3
                    .as_ref()
                    .ok_or_else(|| invalid("S3 archiver configuration is missing."))?;

                if s3.key_id.is_empty() {
                    return Err(invalid("S3 archiver key id cannot be empty."));
                }

                if s3.key_secret.is_empty() {
                    return Err(invalid("S3 archiver key secret cannot be empty."));
                }

                let endpoint = s3.endpoint.as_deref();
                let region = s3.region.as_deref();

                // At least one of endpoint/region has to be provided ...
                if endpoint.is_none() && region.is_none() {
                    return Err(invalid("S3 archiver endpoint or region must be set."));
                }

                // ... and they must not both be blank (a missing value counts as blank here).
                if endpoint.unwrap_or_default().is_empty()
                    && region.unwrap_or_default().is_empty()
                {
                    return Err(invalid("S3 archiver region or endpoint cannot be empty."));
                }

                if s3.bucket.is_empty() {
                    return Err(invalid("S3 archiver bucket cannot be empty."));
                }

                Ok(())
            }
        }
    }
}
/// Validation of the messages maintenance settings.
impl Validatable<ServerError> for MessagesMaintenanceConfig {
    /// Fails with `ServerError::InvalidConfiguration` when `archiver_enabled`
    /// is set together with a zero interval; otherwise returns `Ok(())`.
    fn validate(&self) -> Result<(), ServerError> {
        // The interval is only inspected when `archiver_enabled` is set.
        if !self.archiver_enabled || !self.interval.is_zero() {
            return Ok(());
        }

        Err(ServerError::InvalidConfiguration(
            "Message maintenance interval size cannot be zero, it must be greater than 0."
                .into(),
        ))
    }
}
/// Validation of the state maintenance settings.
impl Validatable<ServerError> for StateMaintenanceConfig {
    /// Fails with `ServerError::InvalidConfiguration` when `archiver_enabled`
    /// is set together with a zero interval; otherwise returns `Ok(())`.
    fn validate(&self) -> Result<(), ServerError> {
        // The interval is only inspected when `archiver_enabled` is set.
        if !self.archiver_enabled || !self.interval.is_zero() {
            return Ok(());
        }

        Err(ServerError::InvalidConfiguration(
            "State maintenance interval size cannot be zero, it must be greater than 0.".into(),
        ))
    }
}
/// Validation of the personal access token settings.
impl Validatable<ServerError> for PersonalAccessTokenConfig {
    /// Requires a non-zero `max_tokens_per_user` and, when the cleaner is
    /// enabled, a non-zero cleaner interval. Returns the first violation as
    /// `ServerError::InvalidConfiguration`.
    fn validate(&self) -> Result<(), ServerError> {
        if self.max_tokens_per_user == 0 {
            return Err(ServerError::InvalidConfiguration(
                "Max tokens per user cannot be zero, it must be greater than 0.".into(),
            ));
        }

        // The cleaner interval is only inspected when the cleaner is enabled.
        let cleaner = &self.cleaner;
        if !cleaner.enabled || !cleaner.interval.is_zero() {
            return Ok(());
        }

        Err(ServerError::InvalidConfiguration(
            "Personal access token cleaner interval cannot be zero, it must be greater than 0."
                .into(),
        ))
    }
}