blob: b13551761613ce611b40f543ca9378b3491b0c95 [file]
use crate::state::system::StreamState;
use crate::streaming::streams::stream::Stream;
use crate::streaming::streams::COMPONENT;
use error_set::ErrContext;
use iggy::error::IggyError;
impl Stream {
pub async fn load(&mut self, state: StreamState) -> Result<(), IggyError> {
let storage = self.storage.clone();
let state_id = state.id;
storage.stream
.load(self, state)
.await
.with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to load stream with state, state ID: {state_id}, stream: {self}"))
}
pub async fn persist(&self) -> Result<(), IggyError> {
self.storage
.stream
.save(self)
.await
.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to persist stream: {self}")
})
}
pub async fn delete(&self) -> Result<(), IggyError> {
for topic in self.get_topics() {
topic.delete().await.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to delete topic in stream: {self}")
})?;
}
self.storage
.stream
.delete(self)
.await
.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to delete stream: {self}")
})
}
pub async fn persist_messages(&self) -> Result<usize, IggyError> {
let mut saved_messages_number = 0;
for topic in self.get_topics() {
saved_messages_number += topic.persist_messages().await.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to persist messages for topic: {topic} in stream: {self}"
)
})?;
}
Ok(saved_messages_number)
}
pub async fn purge(&self) -> Result<(), IggyError> {
for topic in self.get_topics() {
topic.purge().await.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to purge topic: {topic} in stream: {self}")
})?;
}
Ok(())
}
}