| use crate::state::system::StreamState; |
| use crate::streaming::session::Session; |
| use crate::streaming::streams::stream::Stream; |
| use crate::streaming::systems::system::System; |
| use futures::future::try_join_all; |
| use iggy::error::IggyError; |
| use iggy::identifier::{IdKind, Identifier}; |
| use iggy::locking::IggySharedMutFn; |
| use iggy::utils::text; |
| use std::cell::RefCell; |
| use std::collections::{HashMap, HashSet}; |
| use std::sync::atomic::{AtomicU32, Ordering}; |
| use tokio::fs; |
| use tokio::fs::read_dir; |
| use tracing::{error, info, warn}; |
| |
| static CURRENT_STREAM_ID: AtomicU32 = AtomicU32::new(1); |
| |
| impl System { |
| pub(crate) async fn load_streams( |
| &mut self, |
| streams: Vec<StreamState>, |
| ) -> Result<(), IggyError> { |
| info!("Loading streams from disk..."); |
| let mut unloaded_streams = Vec::new(); |
| let dir_entries = read_dir(&self.config.get_streams_path()).await; |
| if let Err(error) = dir_entries { |
| error!("Cannot read streams directory: {}", error); |
| return Err(IggyError::CannotReadStreams); |
| } |
| |
| let mut dir_entries = dir_entries.unwrap(); |
| while let Some(dir_entry) = dir_entries.next_entry().await.unwrap_or(None) { |
| let name = dir_entry.file_name().into_string().unwrap(); |
| let stream_id = name.parse::<u32>(); |
| if stream_id.is_err() { |
| error!("Invalid stream ID file with name: '{name}'."); |
| continue; |
| } |
| |
| let stream_id = stream_id.unwrap(); |
| let stream_state = streams.iter().find(|s| s.id == stream_id); |
| if stream_state.is_none() { |
| error!("Stream with ID: '{stream_id}' was not found in state, but exists on disk and will be removed."); |
| if let Err(error) = fs::remove_dir_all(&dir_entry.path()).await { |
| error!("Cannot remove stream directory: {error}"); |
| } else { |
| warn!("Stream with ID: '{stream_id}' was removed."); |
| } |
| continue; |
| } |
| |
| let stream_state = stream_state.unwrap(); |
| let mut stream = Stream::empty( |
| stream_id, |
| &stream_state.name, |
| self.config.clone(), |
| self.storage.clone(), |
| ); |
| stream.created_at = stream_state.created_at; |
| unloaded_streams.push(stream); |
| } |
| |
| let state_stream_ids = streams |
| .iter() |
| .map(|stream| stream.id) |
| .collect::<HashSet<u32>>(); |
| let unloaded_stream_ids = unloaded_streams |
| .iter() |
| .map(|stream| stream.stream_id) |
| .collect::<HashSet<u32>>(); |
| let missing_ids = state_stream_ids |
| .difference(&unloaded_stream_ids) |
| .copied() |
| .collect::<HashSet<u32>>(); |
| if missing_ids.is_empty() { |
| info!("All streams found on disk were found in state."); |
| } else { |
| error!("Streams with IDs: '{missing_ids:?}' were not found on disk."); |
| return Err(IggyError::MissingStreams); |
| } |
| |
| let mut streams_states = streams |
| .into_iter() |
| .map(|s| (s.id, s)) |
| .collect::<HashMap<_, _>>(); |
| let loaded_streams = RefCell::new(Vec::new()); |
| let load_stream_tasks = unloaded_streams.into_iter().map(|mut stream| { |
| let state = streams_states.remove(&stream.stream_id).unwrap(); |
| let load_stream_task = async { |
| stream.load(state).await?; |
| loaded_streams.borrow_mut().push(stream); |
| Result::<(), IggyError>::Ok(()) |
| }; |
| load_stream_task |
| }); |
| try_join_all(load_stream_tasks).await?; |
| |
| for stream in loaded_streams.take() { |
| if self.streams.contains_key(&stream.stream_id) { |
| error!("Stream with ID: '{}' already exists.", &stream.stream_id); |
| continue; |
| } |
| |
| if self.streams_ids.contains_key(&stream.name) { |
| error!("Stream with name: '{}' already exists.", &stream.name); |
| continue; |
| } |
| |
| self.metrics.increment_streams(1); |
| self.metrics.increment_topics(stream.get_topics_count()); |
| self.metrics |
| .increment_partitions(stream.get_partitions_count()); |
| self.metrics.increment_segments(stream.get_segments_count()); |
| self.metrics.increment_messages(stream.get_messages_count()); |
| |
| self.streams_ids |
| .insert(stream.name.clone(), stream.stream_id); |
| self.streams.insert(stream.stream_id, stream); |
| } |
| |
| info!("Loaded {} stream(s) from disk.", self.streams.len()); |
| Ok(()) |
| } |
| |
| pub fn get_streams(&self) -> Vec<&Stream> { |
| self.streams.values().collect() |
| } |
| |
| pub fn find_streams(&self, session: &Session) -> Result<Vec<&Stream>, IggyError> { |
| self.ensure_authenticated(session)?; |
| self.permissioner.get_streams(session.get_user_id())?; |
| Ok(self.get_streams()) |
| } |
| |
| pub fn find_stream( |
| &self, |
| session: &Session, |
| identifier: &Identifier, |
| ) -> Result<&Stream, IggyError> { |
| self.ensure_authenticated(session)?; |
| let stream = self.get_stream(identifier); |
| if let Ok(stream) = stream { |
| self.permissioner |
| .get_stream(session.get_user_id(), stream.stream_id)?; |
| return Ok(stream); |
| } |
| |
| stream |
| } |
| |
| pub fn get_stream(&self, identifier: &Identifier) -> Result<&Stream, IggyError> { |
| match identifier.kind { |
| IdKind::Numeric => self.get_stream_by_id(identifier.get_u32_value()?), |
| IdKind::String => self.get_stream_by_name(&identifier.get_cow_str_value()?), |
| } |
| } |
| |
| pub fn get_stream_mut(&mut self, identifier: &Identifier) -> Result<&mut Stream, IggyError> { |
| match identifier.kind { |
| IdKind::Numeric => self.get_stream_by_id_mut(identifier.get_u32_value()?), |
| IdKind::String => self.get_stream_by_name_mut(&identifier.get_cow_str_value()?), |
| } |
| } |
| |
| fn get_stream_by_name(&self, name: &str) -> Result<&Stream, IggyError> { |
| let stream_id = self.streams_ids.get(name); |
| if stream_id.is_none() { |
| return Err(IggyError::StreamNameNotFound(name.to_string())); |
| } |
| |
| self.get_stream_by_id(*stream_id.unwrap()) |
| } |
| |
| fn get_stream_by_id(&self, stream_id: u32) -> Result<&Stream, IggyError> { |
| let stream = self.streams.get(&stream_id); |
| if stream.is_none() { |
| return Err(IggyError::StreamIdNotFound(stream_id)); |
| } |
| |
| Ok(stream.unwrap()) |
| } |
| |
| fn get_stream_by_name_mut(&mut self, name: &str) -> Result<&mut Stream, IggyError> { |
| let stream_id; |
| { |
| let id = self.streams_ids.get_mut(name); |
| if id.is_none() { |
| return Err(IggyError::StreamNameNotFound(name.to_string())); |
| } |
| |
| stream_id = *id.unwrap(); |
| } |
| |
| self.get_stream_by_id_mut(stream_id) |
| } |
| |
| fn get_stream_by_id_mut(&mut self, stream_id: u32) -> Result<&mut Stream, IggyError> { |
| let stream = self.streams.get_mut(&stream_id); |
| if stream.is_none() { |
| return Err(IggyError::StreamIdNotFound(stream_id)); |
| } |
| |
| Ok(stream.unwrap()) |
| } |
| |
| pub async fn create_stream( |
| &mut self, |
| session: &Session, |
| stream_id: Option<u32>, |
| name: &str, |
| ) -> Result<&Stream, IggyError> { |
| self.ensure_authenticated(session)?; |
| self.permissioner.create_stream(session.get_user_id())?; |
| let name = text::to_lowercase_non_whitespace(name); |
| if self.streams_ids.contains_key(&name) { |
| return Err(IggyError::StreamNameAlreadyExists(name.to_string())); |
| } |
| |
| let mut id; |
| if stream_id.is_none() { |
| id = CURRENT_STREAM_ID.fetch_add(1, Ordering::SeqCst); |
| loop { |
| if self.streams.contains_key(&id) { |
| if id == u32::MAX { |
| return Err(IggyError::StreamIdAlreadyExists(id)); |
| } |
| id = CURRENT_STREAM_ID.fetch_add(1, Ordering::SeqCst); |
| } else { |
| break; |
| } |
| } |
| } else { |
| id = stream_id.unwrap(); |
| } |
| |
| if self.streams.contains_key(&id) { |
| return Err(IggyError::StreamIdAlreadyExists(id)); |
| } |
| |
| let stream = Stream::create(id, &name, self.config.clone(), self.storage.clone()); |
| stream.persist().await?; |
| info!("Created stream with ID: {id}, name: '{name}'."); |
| self.streams_ids.insert(name, stream.stream_id); |
| self.streams.insert(stream.stream_id, stream); |
| self.metrics.increment_streams(1); |
| self.get_stream_by_id(id) |
| } |
| |
| pub async fn update_stream( |
| &mut self, |
| session: &Session, |
| id: &Identifier, |
| name: &str, |
| ) -> Result<(), IggyError> { |
| self.ensure_authenticated(session)?; |
| let stream_id; |
| { |
| let stream = self.get_stream(id)?; |
| stream_id = stream.stream_id; |
| } |
| |
| self.permissioner |
| .update_stream(session.get_user_id(), stream_id)?; |
| let updated_name = text::to_lowercase_non_whitespace(name); |
| |
| { |
| if let Some(stream_id_by_name) = self.streams_ids.get(&updated_name) { |
| if *stream_id_by_name != stream_id { |
| return Err(IggyError::StreamNameAlreadyExists(updated_name.clone())); |
| } |
| } |
| } |
| |
| let old_name; |
| { |
| let stream = self.get_stream_mut(id)?; |
| old_name = stream.name.clone(); |
| stream.name.clone_from(&updated_name); |
| stream.persist().await?; |
| } |
| |
| { |
| self.streams_ids.remove(&old_name); |
| self.streams_ids.insert(updated_name.clone(), stream_id); |
| } |
| |
| info!( |
| "Stream with ID '{}' updated. Old name: '{}' changed to: '{}'.", |
| id, old_name, updated_name |
| ); |
| Ok(()) |
| } |
| |
| pub async fn delete_stream( |
| &mut self, |
| session: &Session, |
| id: &Identifier, |
| ) -> Result<u32, IggyError> { |
| self.ensure_authenticated(session)?; |
| let stream = self.get_stream(id)?; |
| let stream_id = stream.stream_id; |
| self.permissioner |
| .delete_stream(session.get_user_id(), stream_id)?; |
| let stream_name = stream.name.clone(); |
| if stream.delete().await.is_err() { |
| return Err(IggyError::CannotDeleteStream(stream_id)); |
| } |
| |
| self.metrics.decrement_streams(1); |
| self.metrics.decrement_topics(stream.get_topics_count()); |
| self.metrics |
| .decrement_partitions(stream.get_partitions_count()); |
| self.metrics.decrement_messages(stream.get_messages_count()); |
| self.metrics.decrement_segments(stream.get_segments_count()); |
| self.streams.remove(&stream_id); |
| self.streams_ids.remove(&stream_name); |
| let current_stream_id = CURRENT_STREAM_ID.load(Ordering::SeqCst); |
| if current_stream_id > stream_id { |
| CURRENT_STREAM_ID.store(stream_id, Ordering::SeqCst); |
| } |
| |
| let client_manager = self.client_manager.read().await; |
| client_manager |
| .delete_consumer_groups_for_stream(stream_id) |
| .await; |
| Ok(stream_id) |
| } |
| |
| pub async fn purge_stream( |
| &self, |
| session: &Session, |
| stream_id: &Identifier, |
| ) -> Result<(), IggyError> { |
| let stream = self.get_stream(stream_id)?; |
| self.permissioner |
| .purge_stream(session.get_user_id(), stream.stream_id)?; |
| stream.purge().await |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use crate::configs::server::{DataMaintenanceConfig, PersonalAccessTokenConfig}; |
| use crate::configs::system::SystemConfig; |
| use crate::state::command::EntryCommand; |
| use crate::state::entry::StateEntry; |
| use crate::state::State; |
| use crate::streaming::storage::tests::get_test_system_storage; |
| use crate::streaming::users::user::User; |
| use async_trait::async_trait; |
| use iggy::users::defaults::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME}; |
| use std::{ |
| net::{Ipv4Addr, SocketAddr}, |
| sync::Arc, |
| }; |
| |
| #[tokio::test] |
| async fn should_get_stream_by_id_and_name() { |
| let stream_id = 1; |
| let stream_name = "test"; |
| let config = Arc::new(SystemConfig::default()); |
| let storage = get_test_system_storage(); |
| let mut system = System::create( |
| config, |
| storage, |
| Arc::new(TestState::default()), |
| None, |
| DataMaintenanceConfig::default(), |
| PersonalAccessTokenConfig::default(), |
| ); |
| let root = User::root(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD); |
| let permissions = root.permissions.clone(); |
| let session = Session::new( |
| 1, |
| root.id, |
| SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234), |
| ); |
| system |
| .permissioner |
| .init_permissions_for_user(root.id, permissions); |
| system |
| .create_stream(&session, Some(stream_id), stream_name) |
| .await |
| .unwrap(); |
| |
| let stream = system.get_stream(&Identifier::numeric(stream_id).unwrap()); |
| assert!(stream.is_ok()); |
| let stream = stream.unwrap(); |
| assert_eq!(stream.stream_id, stream_id); |
| assert_eq!(stream.name, stream_name); |
| |
| let stream = system.get_stream(&Identifier::named(stream_name).unwrap()); |
| assert!(stream.is_ok()); |
| let stream = stream.unwrap(); |
| assert_eq!(stream.stream_id, stream_id); |
| assert_eq!(stream.name, stream_name); |
| } |
| |
| #[derive(Debug, Default)] |
| struct TestState {} |
| |
| #[async_trait] |
| impl State for TestState { |
| async fn init(&self) -> Result<Vec<StateEntry>, IggyError> { |
| Ok(Vec::new()) |
| } |
| |
| async fn load_entries(&self) -> Result<Vec<StateEntry>, IggyError> { |
| Ok(Vec::new()) |
| } |
| |
| async fn apply(&self, _: u32, _: EntryCommand) -> Result<(), IggyError> { |
| Ok(()) |
| } |
| } |
| } |