// blob: 30b413aa60d6eb9f62d31ff1f0c6d5647361a35e [file] [log] [blame]
use crate::configs::system::SystemConfig;
use crate::streaming::storage::SystemStorage;
use crate::streaming::topics::topic::Topic;
use ahash::AHashMap;
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::timestamp::IggyTimestamp;
use std::fmt::Display;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
/// In-memory representation of a single stream: its identity, the paths derived
/// for it from the system configuration, aggregate counters, and the topics it owns.
#[derive(Debug)]
pub struct Stream {
/// Numeric identifier of the stream, as passed to `Stream::create`.
pub stream_id: u32,
/// Name of the stream.
pub name: String,
/// Stream path, obtained from `SystemConfig::get_stream_path` for this stream's id.
pub path: String,
/// Topics path, obtained from `SystemConfig::get_topics_path` for this stream's id.
pub topics_path: String,
/// Timestamp captured when the `Stream` value was constructed.
pub created_at: IggyTimestamp,
/// Topic id counter; initialised to 1 on construction.
/// NOTE(review): presumably the next id handed out to a new topic - confirm against the topic-creation code.
pub current_topic_id: AtomicU32,
/// Size counter in bytes, starting at 0; exposed as `IggyByteSize` by `get_size`.
/// NOTE(review): held in an `Arc`, presumably so topics can update the same counter - confirm.
pub size_bytes: Arc<AtomicU64>,
/// Message counter, starting at 0.
pub messages_count: Arc<AtomicU64>,
/// Segment counter, starting at 0.
pub segments_count: Arc<AtomicU32>,
/// Topics owned by this stream, keyed by a `u32` (presumably the topic id - confirm).
pub(crate) topics: AHashMap<u32, Topic>,
/// Lookup from a `String` key to a `u32` (presumably topic name to topic id - confirm).
pub(crate) topics_ids: AHashMap<String, u32>,
/// Shared system configuration this stream was created with.
pub(crate) config: Arc<SystemConfig>,
/// Shared storage handle this stream was created with.
pub(crate) storage: Arc<SystemStorage>,
}
impl Stream {
    /// Builds a stream without any topics.
    ///
    /// This is a thin alias: every argument is forwarded unchanged to
    /// [`Stream::create`].
    pub fn empty(
        id: u32,
        name: &str,
        config: Arc<SystemConfig>,
        storage: Arc<SystemStorage>,
    ) -> Self {
        Self::create(id, name, config, storage)
    }

    /// Constructs a new `Stream` value.
    ///
    /// * `id` - identifier stored as `stream_id` and used to derive both paths.
    /// * `name` - stream name; copied into an owned `String`.
    /// * `config` - shared system configuration, queried for the paths and then stored.
    /// * `storage` - shared storage handle, stored as-is.
    ///
    /// The returned stream has no topics, all counters at zero, the topic id
    /// counter at 1, and `created_at` set to the current time.
    pub fn create(
        id: u32,
        name: &str,
        config: Arc<SystemConfig>,
        storage: Arc<SystemStorage>,
    ) -> Self {
        // Resolve both paths while `config` is only borrowed; it is moved into
        // the struct right below.
        let stream_path = config.get_stream_path(id);
        let stream_topics_path = config.get_topics_path(id);

        Self {
            // Identity and locations.
            stream_id: id,
            name: name.to_owned(),
            path: stream_path,
            topics_path: stream_topics_path,
            // Shared handles.
            config,
            storage,
            // Counters.
            current_topic_id: AtomicU32::new(1),
            size_bytes: Arc::new(AtomicU64::new(0)),
            messages_count: Arc::new(AtomicU64::new(0)),
            segments_count: Arc::new(AtomicU32::new(0)),
            // Topic registries start out empty.
            topics: AHashMap::new(),
            topics_ids: AHashMap::new(),
            created_at: IggyTimestamp::now(),
        }
    }

    /// Returns the current value of the `size_bytes` counter as an [`IggyByteSize`].
    pub fn get_size(&self) -> IggyByteSize {
        let total_bytes = self.size_bytes.load(Ordering::SeqCst);
        IggyByteSize::from(total_bytes)
    }
}
impl Display for Stream {
    /// Writes a one-line summary containing the stream id, name, both paths and
    /// the creation timestamp. Counters and topics are not included.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pull out exactly the fields that are printed; everything else is ignored.
        let Self {
            stream_id,
            name,
            path,
            topics_path,
            created_at,
            ..
        } = self;

        write!(
            f,
            "Stream {{ stream_id: {}, name: {}, path: {}, topic_path: {}, created_at: {} }}",
            stream_id, name, path, topics_path, created_at,
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind};

    /// A freshly created stream must carry the id and name it was given, the
    /// paths derived from the configuration it was given, no topics, and
    /// counters in their initial state.
    #[test]
    fn should_be_created_given_valid_parameters() {
        let tempdir = tempfile::TempDir::new().unwrap();
        let config = Arc::new(SystemConfig {
            path: tempdir.path().to_str().unwrap().to_string(),
            ..Default::default()
        });
        let storage = Arc::new(SystemStorage::new(
            config.clone(),
            Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
        ));
        let id = 1;
        let name = "test";
        // Expected paths come from the same tempdir-backed config that the
        // storage uses, so the stream and its storage agree on configuration.
        let path = config.get_stream_path(id);
        let topics_path = config.get_topics_path(id);

        let stream = Stream::create(id, name, config, storage);

        assert_eq!(stream.stream_id, id);
        assert_eq!(stream.name, name);
        assert_eq!(stream.path, path);
        assert_eq!(stream.topics_path, topics_path);
        assert!(stream.topics.is_empty());
        assert!(stream.topics_ids.is_empty());
        // Initial counter state set by `Stream::create`.
        assert_eq!(stream.current_topic_id.load(Ordering::SeqCst), 1);
        assert_eq!(stream.size_bytes.load(Ordering::SeqCst), 0);
        assert_eq!(stream.messages_count.load(Ordering::SeqCst), 0);
        assert_eq!(stream.segments_count.load(Ordering::SeqCst), 0);
    }
}