blob: 609574bb090119867fb747c4cf6def4552422ba5 [file] [log] [blame]
use super::batching::message_batch::RetainedMessageBatch;
use crate::streaming::partitions::partition::{ConsumerOffset, Partition};
use crate::streaming::partitions::storage::FilePartitionStorage;
use crate::streaming::persistence::persister::Persister;
use crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
use crate::streaming::personal_access_tokens::storage::FilePersonalAccessTokenStorage;
use crate::streaming::segments::index::{Index, IndexRange};
use crate::streaming::segments::segment::Segment;
use crate::streaming::segments::storage::FileSegmentStorage;
use crate::streaming::segments::time_index::TimeIndex;
use crate::streaming::streams::storage::FileStreamStorage;
use crate::streaming::streams::stream::Stream;
use crate::streaming::systems::info::SystemInfo;
use crate::streaming::systems::storage::FileSystemInfoStorage;
use crate::streaming::topics::consumer_group::ConsumerGroup;
use crate::streaming::topics::storage::FileTopicStorage;
use crate::streaming::topics::topic::Topic;
use crate::streaming::users::storage::FileUserStorage;
use crate::streaming::users::user::User;
use async_trait::async_trait;
use iggy::consumer::ConsumerKind;
use iggy::error::IggyError;
use iggy::models::user_info::UserId;
use sled::Db;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
#[async_trait]
pub trait Storage<T>: Sync + Send {
async fn load(&self, component: &mut T) -> Result<(), IggyError>;
async fn save(&self, component: &T) -> Result<(), IggyError>;
async fn delete(&self, component: &T) -> Result<(), IggyError>;
}
#[async_trait]
pub trait SystemInfoStorage: Storage<SystemInfo> {}
#[async_trait]
pub trait UserStorage: Storage<User> {
async fn load_by_id(&self, id: UserId) -> Result<User, IggyError>;
async fn load_by_username(&self, username: &str) -> Result<User, IggyError>;
async fn load_all(&self) -> Result<Vec<User>, IggyError>;
}
#[async_trait]
pub trait PersonalAccessTokenStorage: Storage<PersonalAccessToken> {
async fn load_all(&self) -> Result<Vec<PersonalAccessToken>, IggyError>;
async fn load_for_user(&self, user_id: UserId) -> Result<Vec<PersonalAccessToken>, IggyError>;
async fn load_by_token(&self, token: &str) -> Result<PersonalAccessToken, IggyError>;
async fn load_by_name(
&self,
user_id: UserId,
name: &str,
) -> Result<PersonalAccessToken, IggyError>;
async fn delete_for_user(&self, user_id: UserId, name: &str) -> Result<(), IggyError>;
}
#[async_trait]
pub trait StreamStorage: Storage<Stream> {}
#[async_trait]
pub trait TopicStorage: Storage<Topic> {
async fn save_consumer_group(
&self,
topic: &Topic,
consumer_group: &ConsumerGroup,
) -> Result<(), IggyError>;
async fn load_consumer_groups(&self, topic: &Topic) -> Result<Vec<ConsumerGroup>, IggyError>;
async fn delete_consumer_group(
&self,
topic: &Topic,
consumer_group: &ConsumerGroup,
) -> Result<(), IggyError>;
}
#[async_trait]
pub trait PartitionStorage: Storage<Partition> {
async fn save_consumer_offset(&self, offset: &ConsumerOffset) -> Result<(), IggyError>;
async fn load_consumer_offsets(
&self,
kind: ConsumerKind,
stream_id: u32,
topic_id: u32,
partition_id: u32,
) -> Result<Vec<ConsumerOffset>, IggyError>;
async fn delete_consumer_offsets(
&self,
kind: ConsumerKind,
stream_id: u32,
topic_id: u32,
partition_id: u32,
) -> Result<(), IggyError>;
}
#[async_trait]
pub trait SegmentStorage: Storage<Segment> {
async fn load_message_batches(
&self,
segment: &Segment,
index_range: &IndexRange,
) -> Result<Vec<RetainedMessageBatch>, IggyError>;
async fn load_newest_batches_by_size(
&self,
segment: &Segment,
size_bytes: u64,
) -> Result<Vec<RetainedMessageBatch>, IggyError>;
async fn save_batches(
&self,
segment: &Segment,
messages: &[Arc<RetainedMessageBatch>],
) -> Result<u32, IggyError>;
async fn load_message_ids(&self, segment: &Segment) -> Result<Vec<u128>, IggyError>;
async fn load_checksums(&self, segment: &Segment) -> Result<(), IggyError>;
async fn load_all_indexes(&self, segment: &Segment) -> Result<Vec<Index>, IggyError>;
async fn load_index_range(
&self,
segment: &Segment,
segment_start_offset: u64,
index_start_offset: u64,
index_end_offset: u64,
) -> Result<Option<IndexRange>, IggyError>;
async fn save_index(&self, segment: &Segment) -> Result<(), IggyError>;
async fn load_all_time_indexes(&self, segment: &Segment) -> Result<Vec<TimeIndex>, IggyError>;
async fn load_last_time_index(&self, segment: &Segment)
-> Result<Option<TimeIndex>, IggyError>;
async fn save_time_index(&self, segment: &Segment) -> Result<(), IggyError>;
}
#[derive(Debug)]
pub struct SystemStorage {
pub info: Arc<dyn SystemInfoStorage>,
pub user: Arc<dyn UserStorage>,
pub personal_access_token: Arc<dyn PersonalAccessTokenStorage>,
pub stream: Arc<dyn StreamStorage>,
pub topic: Arc<dyn TopicStorage>,
pub partition: Arc<dyn PartitionStorage>,
pub segment: Arc<dyn SegmentStorage>,
}
impl SystemStorage {
pub fn new(db: Arc<Db>, persister: Arc<dyn Persister>) -> Self {
Self {
info: Arc::new(FileSystemInfoStorage::new(db.clone())),
user: Arc::new(FileUserStorage::new(db.clone())),
personal_access_token: Arc::new(FilePersonalAccessTokenStorage::new(db.clone())),
stream: Arc::new(FileStreamStorage::new(db.clone())),
topic: Arc::new(FileTopicStorage::new(db.clone())),
partition: Arc::new(FilePartitionStorage::new(db.clone())),
segment: Arc::new(FileSegmentStorage::new(persister.clone())),
}
}
}
impl Debug for dyn SystemInfoStorage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "SystemInfoStorage")
}
}
impl Debug for dyn UserStorage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "UserStorage")
}
}
impl Debug for dyn PersonalAccessTokenStorage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "PersonalAccessTokenStorage")
}
}
impl Debug for dyn StreamStorage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "StreamStorage")
}
}
impl Debug for dyn TopicStorage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "TopicStorage")
}
}
impl Debug for dyn PartitionStorage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "PartitionStorage")
}
}
impl Debug for dyn SegmentStorage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "SegmentStorage")
}
}
#[cfg(test)]
pub(crate) mod tests {
use crate::streaming::partitions::partition::Partition;
use crate::streaming::segments::index::{Index, IndexRange};
use crate::streaming::segments::segment::Segment;
use crate::streaming::segments::time_index::TimeIndex;
use crate::streaming::storage::*;
use crate::streaming::streams::stream::Stream;
use crate::streaming::topics::topic::Topic;
use async_trait::async_trait;
use std::sync::Arc;
struct TestSystemInfoStorage {}
struct TestUserStorage {}
struct TestPersonalAccessTokenStorage {}
struct TestStreamStorage {}
struct TestTopicStorage {}
struct TestPartitionStorage {}
struct TestSegmentStorage {}
#[async_trait]
impl Storage<SystemInfo> for TestSystemInfoStorage {
async fn load(&self, _system_info: &mut SystemInfo) -> Result<(), IggyError> {
Ok(())
}
async fn save(&self, _system_info: &SystemInfo) -> Result<(), IggyError> {
Ok(())
}
async fn delete(&self, _system_info: &SystemInfo) -> Result<(), IggyError> {
Ok(())
}
}
#[async_trait]
impl SystemInfoStorage for TestSystemInfoStorage {}
#[async_trait]
impl Storage<User> for TestUserStorage {
async fn load(&self, _user: &mut User) -> Result<(), IggyError> {
Ok(())
}
async fn save(&self, _user: &User) -> Result<(), IggyError> {
Ok(())
}
async fn delete(&self, _user: &User) -> Result<(), IggyError> {
Ok(())
}
}
#[async_trait]
impl UserStorage for TestUserStorage {
async fn load_by_id(&self, _id: UserId) -> Result<User, IggyError> {
Ok(User::default())
}
async fn load_by_username(&self, _username: &str) -> Result<User, IggyError> {
Ok(User::default())
}
async fn load_all(&self) -> Result<Vec<User>, IggyError> {
Ok(vec![])
}
}
#[async_trait]
impl Storage<PersonalAccessToken> for TestPersonalAccessTokenStorage {
async fn load(
&self,
_personal_access_token: &mut PersonalAccessToken,
) -> Result<(), IggyError> {
Ok(())
}
async fn save(
&self,
_personal_access_token: &PersonalAccessToken,
) -> Result<(), IggyError> {
Ok(())
}
async fn delete(
&self,
_personal_access_token: &PersonalAccessToken,
) -> Result<(), IggyError> {
Ok(())
}
}
#[async_trait]
impl PersonalAccessTokenStorage for TestPersonalAccessTokenStorage {
async fn load_all(&self) -> Result<Vec<PersonalAccessToken>, IggyError> {
Ok(vec![])
}
async fn load_for_user(
&self,
_user_id: UserId,
) -> Result<Vec<PersonalAccessToken>, IggyError> {
Ok(vec![])
}
async fn load_by_token(&self, _token: &str) -> Result<PersonalAccessToken, IggyError> {
Ok(PersonalAccessToken::default())
}
async fn load_by_name(
&self,
_user_id: UserId,
_name: &str,
) -> Result<PersonalAccessToken, IggyError> {
Ok(PersonalAccessToken::default())
}
async fn delete_for_user(&self, _user_id: UserId, _name: &str) -> Result<(), IggyError> {
Ok(())
}
}
#[async_trait]
impl Storage<Stream> for TestStreamStorage {
async fn load(&self, _stream: &mut Stream) -> Result<(), IggyError> {
Ok(())
}
async fn save(&self, _stream: &Stream) -> Result<(), IggyError> {
Ok(())
}
async fn delete(&self, _stream: &Stream) -> Result<(), IggyError> {
Ok(())
}
}
impl StreamStorage for TestStreamStorage {}
#[async_trait]
impl Storage<Topic> for TestTopicStorage {
async fn load(&self, _topic: &mut Topic) -> Result<(), IggyError> {
Ok(())
}
async fn save(&self, _topic: &Topic) -> Result<(), IggyError> {
Ok(())
}
async fn delete(&self, _topic: &Topic) -> Result<(), IggyError> {
Ok(())
}
}
#[async_trait]
impl TopicStorage for TestTopicStorage {
async fn save_consumer_group(
&self,
_topic: &Topic,
_consumer_group: &ConsumerGroup,
) -> Result<(), IggyError> {
Ok(())
}
async fn load_consumer_groups(
&self,
_topic: &Topic,
) -> Result<Vec<ConsumerGroup>, IggyError> {
Ok(vec![])
}
async fn delete_consumer_group(
&self,
_topic: &Topic,
_consumer_group: &ConsumerGroup,
) -> Result<(), IggyError> {
Ok(())
}
}
#[async_trait]
impl Storage<Partition> for TestPartitionStorage {
async fn load(&self, _partition: &mut Partition) -> Result<(), IggyError> {
Ok(())
}
async fn save(&self, _partition: &Partition) -> Result<(), IggyError> {
Ok(())
}
async fn delete(&self, _partition: &Partition) -> Result<(), IggyError> {
Ok(())
}
}
#[async_trait]
impl PartitionStorage for TestPartitionStorage {
async fn save_consumer_offset(&self, _offset: &ConsumerOffset) -> Result<(), IggyError> {
Ok(())
}
async fn load_consumer_offsets(
&self,
_kind: ConsumerKind,
_stream_id: u32,
_topic_id: u32,
_partition_id: u32,
) -> Result<Vec<ConsumerOffset>, IggyError> {
Ok(vec![])
}
async fn delete_consumer_offsets(
&self,
_kind: ConsumerKind,
_stream_id: u32,
_topic_id: u32,
_partition_id: u32,
) -> Result<(), IggyError> {
Ok(())
}
}
#[async_trait]
impl Storage<Segment> for TestSegmentStorage {
async fn load(&self, _segment: &mut Segment) -> Result<(), IggyError> {
Ok(())
}
async fn save(&self, _segment: &Segment) -> Result<(), IggyError> {
Ok(())
}
async fn delete(&self, _segment: &Segment) -> Result<(), IggyError> {
Ok(())
}
}
#[async_trait]
impl SegmentStorage for TestSegmentStorage {
async fn load_message_batches(
&self,
_segment: &Segment,
_index_range: &IndexRange,
) -> Result<Vec<RetainedMessageBatch>, IggyError> {
Ok(vec![])
}
async fn load_newest_batches_by_size(
&self,
_segment: &Segment,
_size: u64,
) -> Result<Vec<RetainedMessageBatch>, IggyError> {
Ok(vec![])
}
async fn save_batches(
&self,
_segment: &Segment,
_messages: &[Arc<RetainedMessageBatch>],
) -> Result<u32, IggyError> {
Ok(0)
}
async fn load_message_ids(&self, _segment: &Segment) -> Result<Vec<u128>, IggyError> {
Ok(vec![])
}
async fn load_checksums(&self, _segment: &Segment) -> Result<(), IggyError> {
Ok(())
}
async fn load_all_indexes(&self, _segment: &Segment) -> Result<Vec<Index>, IggyError> {
Ok(vec![])
}
async fn load_index_range(
&self,
_segment: &Segment,
_segment_start_offset: u64,
_index_start_offset: u64,
_index_end_offset: u64,
) -> Result<Option<IndexRange>, IggyError> {
Ok(None)
}
async fn save_index(&self, _segment: &Segment) -> Result<(), IggyError> {
Ok(())
}
async fn load_all_time_indexes(
&self,
_segment: &Segment,
) -> Result<Vec<TimeIndex>, IggyError> {
Ok(vec![])
}
async fn load_last_time_index(
&self,
_segment: &Segment,
) -> Result<Option<TimeIndex>, IggyError> {
Ok(None)
}
async fn save_time_index(&self, _segment: &Segment) -> Result<(), IggyError> {
Ok(())
}
}
pub fn get_test_system_storage() -> SystemStorage {
SystemStorage {
info: Arc::new(TestSystemInfoStorage {}),
user: Arc::new(TestUserStorage {}),
personal_access_token: Arc::new(TestPersonalAccessTokenStorage {}),
stream: Arc::new(TestStreamStorage {}),
topic: Arc::new(TestTopicStorage {}),
partition: Arc::new(TestPartitionStorage {}),
segment: Arc::new(TestSegmentStorage {}),
}
}
}