blob: a46e75911cd5f830e32c626ec991bc30f764eb74 [file] [log] [blame]
use crate::configs::system::SystemConfig;
use crate::streaming::storage::SystemStorage;
use crate::streaming::topics::topic::Topic;
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::timestamp::IggyTimestamp;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
#[derive(Debug)]
pub struct Stream {
pub stream_id: u32,
pub name: String,
pub path: String,
pub topics_path: String,
pub created_at: IggyTimestamp,
pub current_topic_id: AtomicU32,
pub size_bytes: Arc<AtomicU64>,
pub messages_count: Arc<AtomicU64>,
pub segments_count: Arc<AtomicU32>,
pub(crate) topics: HashMap<u32, Topic>,
pub(crate) topics_ids: HashMap<String, u32>,
pub(crate) config: Arc<SystemConfig>,
pub(crate) storage: Arc<SystemStorage>,
}
impl Stream {
pub fn empty(
id: u32,
name: &str,
config: Arc<SystemConfig>,
storage: Arc<SystemStorage>,
) -> Self {
Stream::create(id, name, config, storage)
}
pub fn create(
id: u32,
name: &str,
config: Arc<SystemConfig>,
storage: Arc<SystemStorage>,
) -> Self {
let path = config.get_stream_path(id);
let topics_path = config.get_topics_path(id);
Stream {
stream_id: id,
name: name.to_string(),
path,
topics_path,
config,
current_topic_id: AtomicU32::new(1),
size_bytes: Arc::new(AtomicU64::new(0)),
messages_count: Arc::new(AtomicU64::new(0)),
segments_count: Arc::new(AtomicU32::new(0)),
topics: HashMap::new(),
topics_ids: HashMap::new(),
storage,
created_at: IggyTimestamp::now(),
}
}
pub fn get_size(&self) -> IggyByteSize {
IggyByteSize::from(self.size_bytes.load(Ordering::SeqCst))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::streaming::storage::tests::get_test_system_storage;
#[test]
fn should_be_created_given_valid_parameters() {
let storage = Arc::new(get_test_system_storage());
let id = 1;
let name = "test";
let config = Arc::new(SystemConfig::default());
let path = config.get_stream_path(id);
let topics_path = config.get_topics_path(id);
let stream = Stream::create(id, name, config, storage);
assert_eq!(stream.stream_id, id);
assert_eq!(stream.name, name);
assert_eq!(stream.path, path);
assert_eq!(stream.topics_path, topics_path);
assert!(stream.topics.is_empty());
}
}