blob: 8e3291977d966b1f6bb0b3e1ff5a3bd92c4688c9 [file] [log] [blame]
use crate::streaming::persistence::persister::Persister;
use crate::streaming::storage::SystemInfoStorage;
use crate::streaming::systems::info::SystemInfo;
use crate::streaming::utils::file;
use anyhow::Context;
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use iggy::error::IggyError;
use std::sync::Arc;
use tokio::io::AsyncReadExt;
use tracing::info;
#[derive(Debug)]
pub struct FileSystemInfoStorage {
persister: Arc<dyn Persister>,
path: String,
}
impl FileSystemInfoStorage {
pub fn new(path: String, persister: Arc<dyn Persister>) -> Self {
Self { path, persister }
}
}
unsafe impl Send for FileSystemInfoStorage {}
unsafe impl Sync for FileSystemInfoStorage {}
#[async_trait]
impl SystemInfoStorage for FileSystemInfoStorage {
async fn load(&self) -> Result<SystemInfo, IggyError> {
let file = file::open(&self.path).await;
if file.is_err() {
return Err(IggyError::ResourceNotFound(self.path.to_owned()));
}
let mut file = file.unwrap();
let file_size = file.metadata().await?.len() as usize;
let mut buffer = BytesMut::with_capacity(file_size);
buffer.put_bytes(0, file_size);
file.read_exact(&mut buffer).await?;
bincode::deserialize(&buffer)
.with_context(|| "Failed to deserialize system info")
.map_err(IggyError::CannotDeserializeResource)
}
async fn save(&self, system_info: &SystemInfo) -> Result<(), IggyError> {
let data = bincode::serialize(&system_info)
.with_context(|| "Failed to serialize system info")
.map_err(IggyError::CannotSerializeResource)?;
self.persister.overwrite(&self.path, &data).await?;
info!("Saved system info, {}", system_info);
Ok(())
}
}