blob: 544389e74e78b0fc3f2ed406742eb6b96b04b9de [file] [log] [blame]
use crate::state::system::StreamState;
use crate::streaming::storage::StreamStorage;
use crate::streaming::streams::stream::Stream;
use crate::streaming::topics::topic::Topic;
use async_trait::async_trait;
use futures::future::join_all;
use iggy::error::IggyError;
use iggy::utils::timestamp::IggyTimestamp;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::path::Path;
use std::sync::Arc;
use tokio::fs;
use tokio::fs::create_dir;
use tokio::sync::Mutex;
use tracing::{error, info, warn};
#[derive(Debug)]
pub struct FileStreamStorage;
unsafe impl Send for FileStreamStorage {}
unsafe impl Sync for FileStreamStorage {}
#[derive(Debug, Serialize, Deserialize)]
struct StreamData {
name: String,
created_at: IggyTimestamp,
}
#[async_trait]
impl StreamStorage for FileStreamStorage {
async fn load(&self, stream: &mut Stream, mut state: StreamState) -> Result<(), IggyError> {
info!("Loading stream with ID: {} from disk...", stream.stream_id);
if !Path::new(&stream.path).exists() {
return Err(IggyError::StreamIdNotFound(stream.stream_id));
}
let mut unloaded_topics = Vec::new();
let dir_entries = fs::read_dir(&stream.topics_path).await;
if dir_entries.is_err() {
return Err(IggyError::CannotReadTopics(stream.stream_id));
}
let mut dir_entries = dir_entries.unwrap();
while let Some(dir_entry) = dir_entries.next_entry().await.unwrap_or(None) {
let name = dir_entry.file_name().into_string().unwrap();
let topic_id = name.parse::<u32>();
if topic_id.is_err() {
error!("Invalid topic ID file with name: '{}'.", name);
continue;
}
let topic_id = topic_id.unwrap();
let topic_state = state.topics.get(&topic_id);
if topic_state.is_none() {
let stream_id = stream.stream_id;
error!("Topic with ID: '{topic_id}' for stream with ID: '{stream_id}' was not found in state, but exists on disk and will be removed.");
if let Err(error) = fs::remove_dir_all(&dir_entry.path()).await {
error!("Cannot remove topic directory: {error}");
} else {
warn!("Topic with ID: '{topic_id}' for stream with ID: '{stream_id}' was removed.");
}
continue;
}
let topic_state = topic_state.unwrap();
let topic = Topic::empty(
stream.stream_id,
topic_id,
&topic_state.name,
stream.size_bytes.clone(),
stream.messages_count.clone(),
stream.segments_count.clone(),
stream.config.clone(),
stream.storage.clone(),
);
unloaded_topics.push(topic);
}
let state_topic_ids = state.topics.keys().copied().collect::<HashSet<u32>>();
let unloaded_topic_ids = unloaded_topics
.iter()
.map(|topic| topic.topic_id)
.collect::<HashSet<u32>>();
let missing_ids = state_topic_ids
.difference(&unloaded_topic_ids)
.copied()
.collect::<HashSet<u32>>();
if missing_ids.is_empty() {
info!(
"All topics for stream with ID: '{}' found on disk were found in state.",
stream.stream_id
);
} else {
error!("Topics with IDs: '{missing_ids:?}' for stream with ID: '{}' were not found on disk.", stream.stream_id);
return Err(IggyError::MissingTopics(stream.stream_id));
}
let loaded_topics = Arc::new(Mutex::new(Vec::new()));
let mut load_topics = Vec::new();
for mut topic in unloaded_topics {
let loaded_topics = loaded_topics.clone();
let topic_state = state.topics.remove(&topic.topic_id).unwrap();
let load_topic = tokio::spawn(async move {
match topic.load(topic_state).await {
Ok(_) => loaded_topics.lock().await.push(topic),
Err(error) => error!(
"Failed to load topic with ID: {} for stream with ID: {}. Error: {}",
topic.topic_id, topic.stream_id, error
),
}
});
load_topics.push(load_topic);
}
join_all(load_topics).await;
for topic in loaded_topics.lock().await.drain(..) {
if stream.topics.contains_key(&topic.topic_id) {
error!(
"Topic with ID: '{}' already exists for stream with ID: {}.",
&topic.topic_id, &stream.stream_id
);
continue;
}
if stream.topics_ids.contains_key(&topic.name) {
error!(
"Topic with name: '{}' already exists for stream with ID: {}.",
&topic.name, &stream.stream_id
);
continue;
}
stream.topics_ids.insert(topic.name.clone(), topic.topic_id);
stream.topics.insert(topic.topic_id, topic);
}
info!(
"Loaded stream: '{}' with ID: {} from disk.",
&stream.name, &stream.stream_id
);
Ok(())
}
async fn save(&self, stream: &Stream) -> Result<(), IggyError> {
if !Path::new(&stream.path).exists() && create_dir(&stream.path).await.is_err() {
return Err(IggyError::CannotCreateStreamDirectory(
stream.stream_id,
stream.path.clone(),
));
}
if !Path::new(&stream.topics_path).exists()
&& create_dir(&stream.topics_path).await.is_err()
{
return Err(IggyError::CannotCreateTopicsDirectory(
stream.stream_id,
stream.topics_path.clone(),
));
}
info!("Saved stream with ID: {}.", stream.stream_id);
Ok(())
}
async fn delete(&self, stream: &Stream) -> Result<(), IggyError> {
info!("Deleting stream with ID: {}...", stream.stream_id);
if fs::remove_dir_all(&stream.path).await.is_err() {
return Err(IggyError::CannotDeleteStreamDirectory(stream.stream_id));
}
info!("Deleted stream with ID: {}.", stream.stream_id);
Ok(())
}
}