blob: d66cbb9471efe448129cc1b98030b3e6c59cae37 [file] [log] [blame]
use crate::streaming::topics::consumer_group::ConsumerGroup;
use crate::streaming::topics::topic::Topic;
use iggy::error::IggyError;
use iggy::locking::IggySharedMutFn;
use tokio::sync::RwLock;
impl Topic {
pub async fn load(&mut self) -> Result<(), IggyError> {
let storage = self.storage.clone();
storage.topic.load(self).await?;
let consumer_groups = storage.topic.load_consumer_groups(self).await?;
for consumer_group in consumer_groups {
self.consumer_groups_ids.insert(
consumer_group.name.clone(),
consumer_group.consumer_group_id,
);
self.consumer_groups.insert(
consumer_group.consumer_group_id,
RwLock::new(ConsumerGroup::new(
self.topic_id,
consumer_group.consumer_group_id,
&consumer_group.name,
self.get_partitions_count(),
)),
);
}
Ok(())
}
pub async fn persist(&self) -> Result<(), IggyError> {
self.storage.topic.save(self).await
}
pub async fn delete(&self) -> Result<(), IggyError> {
for partition in self.get_partitions() {
let partition = partition.read().await;
partition.delete().await?;
}
self.storage.topic.delete(self).await
}
pub async fn persist_messages(&self) -> Result<usize, IggyError> {
let mut saved_messages_number = 0;
for partition in self.get_partitions() {
let mut partition = partition.write().await;
for segment in partition.get_segments_mut() {
saved_messages_number += segment.persist_messages().await?;
}
}
Ok(saved_messages_number)
}
pub async fn purge(&self) -> Result<(), IggyError> {
for partition in self.get_partitions() {
let mut partition = partition.write().await;
partition.purge().await?;
}
Ok(())
}
}