blob: ffe19df45b5c19b52191c875c6025ac0264e762d [file]
extern crate sysinfo;
use super::server::{
ArchiverConfig, DataMaintenanceConfig, MessageSaverConfig, MessagesMaintenanceConfig,
StateMaintenanceConfig, TelemetryConfig,
};
use super::system::CompressionConfig;
use crate::archiver::ArchiverKind;
use crate::configs::COMPONENT;
use crate::configs::server::{PersonalAccessTokenConfig, ServerConfig};
use crate::configs::system::{CacheConfig, SegmentConfig};
use crate::server_error::ConfigError;
use crate::streaming::segments::segment;
use error_set::ResultContext;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::topic_size::MaxTopicSize;
use iggy::validatable::Validatable;
use sysinfo::{Pid, ProcessesToUpdate, System};
use tracing::{info, warn};
impl Validatable<ConfigError> for ServerConfig {
fn validate(&self) -> Result<(), ConfigError> {
self.data_maintenance
.validate()
.with_error(|_| format!("{COMPONENT} - failed to validate data maintenance config"))?;
self.personal_access_token
.validate()
.with_error(|_| format!("CONFIGF - failed to validate personal access token config"))?;
self.system
.segment
.validate()
.with_error(|_| format!("CONFIGF - failed to validate segment config"))?;
self.system
.cache
.validate()
.with_error(|_| format!("CONFIGF - failed to validate cache config"))?;
self.system
.compression
.validate()
.with_error(|_| format!("{COMPONENT} - failed to validate compression config"))?;
self.telemetry
.validate()
.with_error(|_| format!("{COMPONENT} - failed to validate telemetry config"))?;
let topic_size = match self.system.topic.max_size {
MaxTopicSize::Custom(size) => Ok(size.as_bytes_u64()),
MaxTopicSize::Unlimited => Ok(u64::MAX),
MaxTopicSize::ServerDefault => Err(ConfigError::InvalidConfiguration),
}?;
if let IggyExpiry::ServerDefault = self.system.segment.message_expiry {
return Err(ConfigError::InvalidConfiguration);
}
if self.http.enabled {
if let IggyExpiry::ServerDefault = self.http.jwt.access_token_expiry {
return Err(ConfigError::InvalidConfiguration);
}
}
if topic_size < self.system.segment.size.as_bytes_u64() {
return Err(ConfigError::InvalidConfiguration);
}
Ok(())
}
}
impl Validatable<ConfigError> for CompressionConfig {
fn validate(&self) -> Result<(), ConfigError> {
let compression_alg = &self.default_algorithm;
if *compression_alg != CompressionAlgorithm::None {
// TODO(numinex): Change this message once server side compression is fully developed.
warn!(
"Server started with server-side compression enabled, using algorithm: {}, this feature is not implemented yet!",
compression_alg
);
}
Ok(())
}
}
impl Validatable<ConfigError> for TelemetryConfig {
fn validate(&self) -> Result<(), ConfigError> {
if !self.enabled {
return Ok(());
}
if self.service_name.trim().is_empty() {
return Err(ConfigError::InvalidConfiguration);
}
if self.logs.endpoint.is_empty() {
return Err(ConfigError::InvalidConfiguration);
}
if self.traces.endpoint.is_empty() {
return Err(ConfigError::InvalidConfiguration);
}
Ok(())
}
}
impl Validatable<ConfigError> for CacheConfig {
fn validate(&self) -> Result<(), ConfigError> {
let limit_bytes = self.size.clone().into();
let mut sys = System::new_all();
sys.refresh_all();
sys.refresh_processes(
ProcessesToUpdate::Some(&[Pid::from_u32(std::process::id())]),
true,
);
let total_memory = sys.total_memory();
let free_memory = sys.free_memory();
let cache_percentage = (limit_bytes as f64 / total_memory as f64) * 100.0;
let pretty_cache_limit = IggyByteSize::from(limit_bytes).as_human_string();
let pretty_total_memory = IggyByteSize::from(total_memory).as_human_string();
let pretty_free_memory = IggyByteSize::from(free_memory).as_human_string();
if limit_bytes > total_memory {
return Err(ConfigError::CacheConfigValidationFailure);
}
if limit_bytes > (total_memory as f64 * 0.75) as u64 {
warn!(
"Cache configuration -> cache size exceeds 75% of total memory. Set to: {} ({:.2}% of total memory: {}).",
pretty_cache_limit, cache_percentage, pretty_total_memory
);
}
if self.enabled {
info!(
"Cache configuration -> cache size set to {} ({:.2}% of total memory: {}, free memory: {}).",
pretty_cache_limit, cache_percentage, pretty_total_memory, pretty_free_memory
);
} else {
info!("Cache configuration -> cache is disabled.");
}
Ok(())
}
}
impl Validatable<ConfigError> for SegmentConfig {
fn validate(&self) -> Result<(), ConfigError> {
if self.size.as_bytes_u64() as u32 > segment::MAX_SIZE_BYTES {
return Err(ConfigError::InvalidConfiguration);
}
Ok(())
}
}
impl Validatable<ConfigError> for MessageSaverConfig {
fn validate(&self) -> Result<(), ConfigError> {
if self.enabled && self.interval.is_zero() {
return Err(ConfigError::InvalidConfiguration);
}
Ok(())
}
}
impl Validatable<ConfigError> for DataMaintenanceConfig {
fn validate(&self) -> Result<(), ConfigError> {
self.archiver
.validate()
.with_error(|_| format!("{COMPONENT} - failed to validate archiver config"))?;
self.messages
.validate()
.with_error(|_| format!("{COMPONENT} - failed to validate messages maintenance config"))?;
self.state
.validate()
.with_error(|_| format!("{COMPONENT} - failed to validate state maintenance config"))?;
Ok(())
}
}
impl Validatable<ConfigError> for ArchiverConfig {
fn validate(&self) -> Result<(), ConfigError> {
if !self.enabled {
return Ok(());
}
return match self.kind {
ArchiverKind::Disk => {
if self.disk.is_none() {
return Err(ConfigError::InvalidConfiguration);
}
let disk = self.disk.as_ref().unwrap();
if disk.path.is_empty() {
return Err(ConfigError::InvalidConfiguration);
}
Ok(())
}
ArchiverKind::S3 => {
if self.s3.is_none() {
return Err(ConfigError::InvalidConfiguration);
}
let s3 = self.s3.as_ref().unwrap();
if s3.key_id.is_empty() {
return Err(ConfigError::InvalidConfiguration);
}
if s3.key_secret.is_empty() {
return Err(ConfigError::InvalidConfiguration);
}
if s3.endpoint.is_none() && s3.region.is_none() {
return Err(ConfigError::InvalidConfiguration);
}
if s3.endpoint.as_deref().unwrap_or_default().is_empty()
&& s3.region.as_deref().unwrap_or_default().is_empty()
{
return Err(ConfigError::InvalidConfiguration);
}
if s3.bucket.is_empty() {
return Err(ConfigError::InvalidConfiguration);
}
Ok(())
}
};
}
}
impl Validatable<ConfigError> for MessagesMaintenanceConfig {
fn validate(&self) -> Result<(), ConfigError> {
if self.archiver_enabled && self.interval.is_zero() {
return Err(ConfigError::InvalidConfiguration);
}
Ok(())
}
}
impl Validatable<ConfigError> for StateMaintenanceConfig {
fn validate(&self) -> Result<(), ConfigError> {
if self.archiver_enabled && self.interval.is_zero() {
return Err(ConfigError::InvalidConfiguration);
}
Ok(())
}
}
impl Validatable<ConfigError> for PersonalAccessTokenConfig {
fn validate(&self) -> Result<(), ConfigError> {
if self.max_tokens_per_user == 0 {
return Err(ConfigError::InvalidConfiguration);
}
if self.cleaner.enabled && self.cleaner.interval.is_zero() {
return Err(ConfigError::InvalidConfiguration);
}
Ok(())
}
}