| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| use crate::{ |
| IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV, |
| compat::index_rebuilding::index_rebuilder::IndexRebuilder, |
| configs::{ |
| cache_indexes::CacheIndexesConfig, |
| server::ServerConfig, |
| sharding::ShardInfo, |
| system::{INDEX_EXTENSION, LOG_EXTENSION, SystemConfig}, |
| }, |
| io::fs_utils::{self, DirEntry}, |
| metadata::{ConsumerGroupMeta, MetadataWriter, PartitionMeta, StreamMeta, TopicMeta, UserMeta}, |
| server_error::ServerError, |
| shard::{ |
| system::info::SystemInfo, |
| transmission::{ |
| connector::{ShardConnector, StopSender}, |
| frame::ShardFrame, |
| }, |
| }, |
| state::system::{StreamState, TopicState, UserState}, |
| streaming::{ |
| partitions::{ |
| consumer_group_offsets::ConsumerGroupOffsets, consumer_offsets::ConsumerOffsets, |
| journal::MemoryMessageJournal, log::SegmentedLog, |
| }, |
| persistence::persister::{FilePersister, FileWithSyncPersister, PersisterKind}, |
| segments::{Segment, storage::Storage}, |
| stats::{PartitionStats, StreamStats, TopicStats}, |
| storage::SystemStorage, |
| users::user::User, |
| utils::{crypto, file::overwrite}, |
| }, |
| }; |
| use compio::{fs::create_dir_all, runtime::Runtime}; |
| use err_trail::ErrContext; |
use iggy_common::{
    IggyByteSize, IggyError, PersonalAccessToken, SemanticVersion,
    defaults::{
        DEFAULT_ROOT_USERNAME, MAX_PASSWORD_LENGTH, MAX_USERNAME_LENGTH, MIN_PASSWORD_LENGTH,
        MIN_USERNAME_LENGTH,
    },
};
| use slab::Slab; |
| use std::{env, path::Path, sync::Arc}; |
| use tracing::{info, warn}; |
| |
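/// Creates one `ShardConnector` per shard assignment using sequential IDs and
/// returns the connectors together with their `(shard_id, StopSender)`
/// shutdown handles.
///
/// Example sketch (rustdoc-ignored; assumes a populated `shard_assignment`):
///
/// ```ignore
/// let (connectors, shutdown_handles) = create_shard_connections(&shard_assignment);
/// assert_eq!(connectors.len(), shutdown_handles.len());
/// ```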
| pub fn create_shard_connections( |
| shard_assignment: &[ShardInfo], |
| ) -> (Vec<ShardConnector<ShardFrame>>, Vec<(u16, StopSender)>) { |
| // Create connectors with sequential IDs (0, 1, 2, ...) regardless of CPU core numbers |
| let connectors: Vec<ShardConnector<ShardFrame>> = shard_assignment |
| .iter() |
| .enumerate() |
        .map(|(idx, _assignment)| ShardConnector::new(idx as u16))
| .collect(); |
| |
| let shutdown_handles = connectors |
| .iter() |
| .map(|conn| (conn.id, conn.stop_sender.clone())) |
| .collect(); |
| |
| (connectors, shutdown_handles) |
| } |
| |
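/// Loads the server configuration, converting any failure into a
/// `ServerError`.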
| pub async fn load_config() -> Result<ServerConfig, ServerError> { |
| let config = ServerConfig::load().await?; |
| Ok(config) |
| } |
| |
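/// Prepares the on-disk layout: the system, state, and streams directories,
/// the state messages file, and the runtime directory, which is wiped and
/// recreated on every startup.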
| pub async fn create_directories(config: &SystemConfig) -> Result<(), IggyError> { |
| let system_path = config.get_system_path(); |
| if !Path::new(&system_path).exists() && create_dir_all(&system_path).await.is_err() { |
| return Err(IggyError::CannotCreateBaseDirectory(system_path)); |
| } |
| |
| let state_path = config.get_state_path(); |
| if !Path::new(&state_path).exists() && create_dir_all(&state_path).await.is_err() { |
| return Err(IggyError::CannotCreateStateDirectory(state_path)); |
| } |
| let state_log = config.get_state_messages_file_path(); |
| if !Path::new(&state_log).exists() && (overwrite(&state_log).await).is_err() { |
| return Err(IggyError::CannotCreateStateDirectory(state_log)); |
| } |
| |
| let streams_path = config.get_streams_path(); |
| if !Path::new(&streams_path).exists() && create_dir_all(&streams_path).await.is_err() { |
| return Err(IggyError::CannotCreateStreamsDirectory(streams_path)); |
| } |
| |
| let runtime_path = config.get_runtime_path(); |
| if Path::new(&runtime_path).exists() && fs_utils::remove_dir_all(&runtime_path).await.is_err() { |
| return Err(IggyError::CannotRemoveRuntimeDirectory(runtime_path)); |
| } |
| |
| if create_dir_all(&runtime_path).await.is_err() { |
| return Err(IggyError::CannotCreateRuntimeDirectory(runtime_path)); |
| } |
| |
| info!( |
| "Initializing system, data will be stored at: {}", |
| config.get_system_path() |
| ); |
| Ok(()) |
| } |
| |
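/// Creates the root user from the `IGGY_ROOT_USERNAME_ENV` and
/// `IGGY_ROOT_PASSWORD_ENV` environment variables. When neither is set, the
/// default username is used and a random password is generated and printed
/// to stdout.
///
/// # Panics
///
/// Panics when only one of the two variables is set, or when the credentials
/// are empty or violate the configured length limits.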
| pub fn create_root_user() -> User { |
| let mut username = env::var(IGGY_ROOT_USERNAME_ENV); |
| let mut password = env::var(IGGY_ROOT_PASSWORD_ENV); |
| if (username.is_ok() && password.is_err()) || (username.is_err() && password.is_ok()) { |
| panic!( |
| "When providing the custom root user credentials, both username and password must be set." |
| ); |
| } |
| if username.is_ok() && password.is_ok() { |
| info!("Using the custom root user credentials."); |
| } else { |
| info!("Using the default root user credentials..."); |
| username = Ok(DEFAULT_ROOT_USERNAME.to_string()); |
| let generated_password = crypto::generate_secret(20..40); |
| println!("Generated root user password: {generated_password}"); |
| password = Ok(generated_password); |
| } |
| |
| let username = username.expect("Root username is not set."); |
| let password = password.expect("Root password is not set."); |
| if username.is_empty() || password.is_empty() { |
| panic!("Root user credentials are not set."); |
| } |
| if username.len() < MIN_USERNAME_LENGTH { |
| panic!("Root username is too short."); |
| } |
| if username.len() > MAX_USERNAME_LENGTH { |
| panic!("Root username is too long."); |
| } |
| if password.len() < MIN_PASSWORD_LENGTH { |
| panic!("Root password is too short."); |
| } |
| if password.len() > MAX_PASSWORD_LENGTH { |
| panic!("Root password is too long."); |
| } |
| |
| User::root(&username, &password) |
| } |
| |
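/// Builds the per-shard compio runtime with a tuned proactor: larger
/// submission capacity, cooperative task running, and an explicit task-run
/// flag.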
| pub fn create_shard_executor() -> Runtime { |
    // TODO: The event interval tick could be configured based on how many
    // clients we expect to have connected, since that roughly estimates the
    // number of tasks we will create.
| let mut proactor = compio::driver::ProactorBuilder::new(); |
| |
| proactor |
| .capacity(4096) |
| .coop_taskrun(true) |
| .taskrun_flag(true); |
| |
    // FIXME(hubcio): Only set thread_pool_limit(0) on platforms other than
    // macOS/aarch64, where it freezes compio fs operations.
    // See https://github.com/compio-rs/compio/issues/446
| #[cfg(not(all(target_os = "macos", target_arch = "aarch64")))] |
| proactor.thread_pool_limit(0); |
| |
| compio::runtime::RuntimeBuilder::new() |
| .with_proactor(proactor.to_owned()) |
| .event_interval(128) |
| .build() |
| .unwrap() |
| } |
| |
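/// Picks the persister variant: fsync-on-write when `enforce_fsync` is
/// enabled, plain buffered file writes otherwise.
///
/// Example sketch (rustdoc-ignored; mirrors how the config flag is consumed
/// elsewhere in this module):
///
/// ```ignore
/// let persister = resolve_persister(config.partition.enforce_fsync);
/// ```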
| pub fn resolve_persister(enforce_fsync: bool) -> Arc<PersisterKind> { |
| match enforce_fsync { |
| true => Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister)), |
| false => Arc::new(PersisterKind::File(FilePersister)), |
| } |
| } |
| |
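/// Stamps the given server version into `SystemInfo` and persists it via the
/// info storage.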
| pub async fn update_system_info( |
| storage: &SystemStorage, |
| system_info: &mut SystemInfo, |
| version: &SemanticVersion, |
| ) -> Result<(), IggyError> { |
| system_info.update_version(version); |
| storage.info.save(system_info).await?; |
| Ok(()) |
| } |
| |
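/// Walks the partition directory and returns the entries whose extension
/// matches `LOG_EXTENSION`, skipping subdirectories and other files.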
| async fn collect_log_files(partition_path: &str) -> Result<Vec<DirEntry>, IggyError> { |
    let dir_entries = fs_utils::walk_dir(partition_path)
| .await |
| .map_err(|_| IggyError::CannotReadPartitions)?; |
| let mut log_files = Vec::new(); |
| for entry in dir_entries { |
| if entry.is_dir { |
| continue; |
| } |
| |
        let Some(extension) = entry.path.extension() else {
            continue;
        };
        if extension != LOG_EXTENSION {
            continue;
        }
| |
| log_files.push(entry); |
| } |
| |
| Ok(log_files) |
| } |
| |
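/// Loads every persisted segment of a partition into a `SegmentedLog`.
///
/// Log files are processed in start-offset order. For each segment this
/// rebuilds a missing index when index caching is enabled, loads the index to
/// recover offsets and timestamps, optionally validates message checksums,
/// and updates the partition stats. The final segment is left unsealed so it
/// can accept new writes.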
| pub async fn load_segments( |
| config: &SystemConfig, |
| stream_id: usize, |
| topic_id: usize, |
| partition_id: usize, |
| partition_path: String, |
| stats: Arc<PartitionStats>, |
| ) -> Result<SegmentedLog<MemoryMessageJournal>, IggyError> { |
| let mut log_files = collect_log_files(&partition_path).await?; |
| log_files.sort_by(|a, b| a.path.file_name().cmp(&b.path.file_name())); |
| let mut log = SegmentedLog::<MemoryMessageJournal>::default(); |
| for entry in log_files { |
| let log_file_name = entry |
| .path |
| .file_stem() |
| .unwrap() |
| .to_string_lossy() |
| .to_string(); |
| |
        let start_offset = log_file_name
            .parse::<u64>()
            .expect("Log file name should be a numeric start offset");
| |
| let messages_file_path = format!("{}/{}.{}", partition_path, log_file_name, LOG_EXTENSION); |
| let index_file_path = format!("{}/{}.{}", partition_path, log_file_name, INDEX_EXTENSION); |
| |
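        // Helper to distinguish "file is missing" from other IO errors rather
        // than treating every metadata failure as non-existence.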
| async fn try_exists(path: &str) -> Result<bool, std::io::Error> { |
| match compio::fs::metadata(path).await { |
| Ok(_) => Ok(true), |
| Err(err) => match err.kind() { |
| std::io::ErrorKind::NotFound => Ok(false), |
| _ => Err(err), |
| }, |
| } |
| } |
| |
        let index_path_exists = try_exists(&index_file_path)
            .await
            .map_err(|_| IggyError::CannotReadPartitions)?;
| let index_cache_enabled = matches!( |
| config.segment.cache_indexes, |
| CacheIndexesConfig::All | CacheIndexesConfig::OpenSegment |
| ); |
| |
| if index_cache_enabled && !index_path_exists { |
| warn!( |
| "Index at path {} does not exist, rebuilding it based on {}...", |
| index_file_path, messages_file_path |
| ); |
| let now = std::time::Instant::now(); |
| let index_rebuilder = IndexRebuilder::new( |
| messages_file_path.clone(), |
| index_file_path.clone(), |
| start_offset, |
| ); |
| index_rebuilder.rebuild().await.unwrap_or_else(|e| { |
| panic!( |
| "Failed to rebuild index for partition with ID: {} for stream with ID: {} and topic with ID: {}. Error: {e}", |
| partition_id, stream_id, topic_id, |
| ) |
| }); |
| info!( |
| "Rebuilding index for path {} finished, it took {} ms", |
| index_file_path, |
| now.elapsed().as_millis() |
| ); |
| } |
| |
| let messages_metadata = compio::fs::metadata(&messages_file_path) |
| .await |
| .map_err(|_| IggyError::CannotReadPartitions)?; |
| let messages_size = messages_metadata.len() as u32; |
| |
| let index_size = match compio::fs::metadata(&index_file_path).await { |
| Ok(metadata) => metadata.len() as u32, |
| Err(_) => 0, // Default to 0 if index file doesn't exist |
| }; |
| |
| let storage = Storage::new( |
| &messages_file_path, |
| &index_file_path, |
| messages_size as u64, |
| index_size as u64, |
| config.partition.enforce_fsync, |
| config.partition.enforce_fsync, |
| true, |
| ) |
| .await?; |
| |
        let loaded_indexes = {
            storage
                .index_reader
                .as_ref()
                .unwrap()
                .load_all_indexes_from_disk()
                .await
                .error(|e: &IggyError| format!("Failed to load indexes during startup for stream ID: {}, topic ID: {}, partition ID: {}, {e}", stream_id, topic_id, partition_id))
                .map_err(|_| IggyError::CannotReadFile)?
        };
| |
| let end_offset = if loaded_indexes.count() == 0 { |
| 0 |
| } else { |
| let last_index_offset = loaded_indexes.last().unwrap().offset() as u64; |
| start_offset + last_index_offset |
| }; |
| |
| let (start_timestamp, end_timestamp) = if loaded_indexes.count() == 0 { |
| (0, 0) |
| } else { |
| ( |
| loaded_indexes.get(0).unwrap().timestamp(), |
| loaded_indexes.last().unwrap().timestamp(), |
| ) |
| }; |
| |
| let mut segment = Segment::new(start_offset, config.segment.size); |
| |
| segment.start_timestamp = start_timestamp; |
| segment.end_timestamp = end_timestamp; |
| segment.end_offset = end_offset; |
| segment.size = IggyByteSize::from(messages_size as u64); |
| // At segment load, set the current position to the size of the segment (No data is buffered yet). |
| segment.current_position = segment.size.as_bytes_u32(); |
| segment.sealed = true; // Persisted segments are assumed to be sealed |
| |
| if config.partition.validate_checksum { |
| info!( |
| "Validating checksum for segment at offset {} in stream ID: {}, topic ID: {}, partition ID: {}", |
| start_offset, stream_id, topic_id, partition_id |
| ); |
| let messages_count = loaded_indexes.count() as u32; |
| if messages_count > 0 { |
| const BATCH_COUNT: u32 = 10000; |
| let mut current_relative_offset = 0u32; |
| let mut processed_count = 0u32; |
| |
| while processed_count < messages_count { |
| let remaining_count = messages_count - processed_count; |
| let batch_count = std::cmp::min(BATCH_COUNT, remaining_count); |
| let batch_indexes = loaded_indexes |
| .slice_by_offset(current_relative_offset, batch_count) |
| .unwrap(); |
| |
| let messages_reader = storage.messages_reader.as_ref().unwrap(); |
| match messages_reader.load_messages_from_disk(batch_indexes).await { |
| Ok(messages_batch) => { |
| if let Err(e) = messages_batch.validate_checksums() { |
| return Err(IggyError::CannotReadPartitions).error(|_: &IggyError| { |
| format!( |
| "Failed to validate message checksum for segment at offset {} in stream ID: {}, topic ID: {}, partition ID: {}, error: {}", |
| start_offset, stream_id, topic_id, partition_id, e |
| ) |
| }); |
| } |
| processed_count += messages_batch.count(); |
| current_relative_offset += batch_count; |
| } |
| Err(e) => { |
| return Err(e).error(|_: &IggyError| { |
| format!( |
| "Failed to load messages from disk for checksum validation at offset {} in stream ID: {}, topic ID: {}, partition ID: {}", |
| start_offset, stream_id, topic_id, partition_id |
| ) |
| }); |
| } |
| } |
| } |
| info!( |
| "Checksum validation completed for segment at offset {}", |
| start_offset |
| ); |
| } |
| } |
| |
        log.add_persisted_segment(segment, storage);

        stats.increment_segments_count(1);
        stats.increment_size_bytes(messages_size as u64);
| |
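        // Estimate the message count: prefer the offset range; fall back to
        // the index count when offsets are inconclusive but the segment has
        // data.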
| let messages_count = if end_offset > start_offset { |
| (end_offset - start_offset + 1) as u64 |
| } else if messages_size > 0 { |
| loaded_indexes.count() as u64 |
| } else { |
| 0 |
| }; |
| |
| if messages_count > 0 { |
| stats.increment_messages_count(messages_count); |
| } |
| |
        let should_cache_indexes =
            matches!(config.segment.cache_indexes, CacheIndexesConfig::All);
| |
| if should_cache_indexes { |
| let segment_index = log.segments().len() - 1; |
| log.set_segment_indexes(segment_index, loaded_indexes); |
| } |
| } |
| |
| // The last segment is the active one and must remain unsealed for writes |
| if log.has_segments() { |
| log.segments_mut().last_mut().unwrap().sealed = false; |
| } |
| |
| if matches!( |
| config.segment.cache_indexes, |
| CacheIndexesConfig::OpenSegment |
| ) && log.has_segments() |
| { |
        // `has_segments()` already guarantees at least one segment here.
        let segments_count = log.segments().len();
        let last_storage = log.storages().last().unwrap();
        match last_storage.index_reader.as_ref() {
            Some(index_reader) => {
                if let Ok(loaded_indexes) = index_reader.load_all_indexes_from_disk().await {
                    log.set_segment_indexes(segments_count - 1, loaded_indexes);
                }
            }
            None => {
                warn!("Index reader not available for last segment in OpenSegment mode");
            }
        }
| } |
| |
| Ok(log) |
| } |
| |
| /// Builds `InnerMetadata` from persisted user and stream state. |
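///
/// Copies users (and their personal access tokens) plus the full stream,
/// topic, partition, and consumer-group hierarchy into the in-memory
/// metadata maps, logging summary counts along the way.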
| pub fn build_inner_metadata( |
| users_state: impl IntoIterator<Item = UserState>, |
| streams_state: impl IntoIterator<Item = StreamState>, |
| ) -> crate::metadata::InnerMetadata { |
| use crate::metadata::InnerMetadata; |
| use std::sync::atomic::AtomicUsize; |
| |
| let mut user_entries = Vec::new(); |
| let mut user_index = ahash::AHashMap::default(); |
| let mut personal_access_tokens: ahash::AHashMap< |
| u32, |
| ahash::AHashMap<Arc<str>, PersonalAccessToken>, |
| > = ahash::AHashMap::default(); |
| let mut users_count = 0; |
| let mut pats_count = 0; |
| |
| for UserState { |
| id, |
| username, |
| password_hash, |
| status, |
| created_at, |
| permissions, |
| personal_access_tokens: user_pats, |
| } in users_state |
| { |
| let username_arc: Arc<str> = Arc::from(username.as_str()); |
| let user_meta = UserMeta { |
| id, |
| username: username_arc.clone(), |
| password_hash: Arc::from(password_hash.as_str()), |
| status, |
| permissions: permissions.map(Arc::new), |
| created_at, |
| }; |
| user_entries.push((id as usize, user_meta)); |
| user_index.insert(username_arc, id); |
| |
| if !user_pats.is_empty() { |
| let user_pat_map: ahash::AHashMap<Arc<str>, PersonalAccessToken> = user_pats |
| .into_values() |
| .map(|token| { |
| let pat = PersonalAccessToken::raw( |
| id, |
| &token.name, |
| &token.token_hash, |
| token.expiry_at, |
| ); |
| (Arc::from(token.token_hash.as_str()), pat) |
| }) |
| .collect(); |
| pats_count += user_pat_map.len(); |
| personal_access_tokens.insert(id, user_pat_map); |
| } |
| |
| users_count += 1; |
| } |
| info!( |
| "Building metadata: {} users, {} personal access tokens", |
| users_count, pats_count |
| ); |
| |
| let mut stream_entries = Vec::new(); |
| let mut stream_index = ahash::AHashMap::default(); |
| let mut streams_count = 0; |
| let mut topics_count = 0; |
| let mut partitions_count = 0; |
| let mut consumer_groups_count = 0; |
| |
| for StreamState { |
| name, |
| created_at, |
| id, |
| topics, |
| } in streams_state |
| { |
| info!( |
| "Building stream with ID: {}, name: {} metadata...", |
| id, name |
| ); |
| let stream_id = id as usize; |
| let stream_name: Arc<str> = Arc::from(name.as_str()); |
| |
| let stream_stats = Arc::new(StreamStats::default()); |
| |
| let mut topic_entries = Vec::new(); |
| let mut topic_index = ahash::AHashMap::default(); |
| |
| for TopicState { |
| id, |
| name, |
| created_at, |
| compression_algorithm, |
| message_expiry, |
| max_topic_size, |
| replication_factor, |
| consumer_groups, |
| partitions, |
| } in topics.into_values() |
| { |
| info!("Building topic with ID: {}, name: {} metadata...", id, name); |
| let topic_id = id as usize; |
| let topic_name: Arc<str> = Arc::from(name.as_str()); |
| |
| let topic_stats = Arc::new(TopicStats::new(stream_stats.clone())); |
| |
| let mut partition_entries = Vec::new(); |
| let mut partition_ids = Vec::new(); |
| |
| for partition_state in partitions.into_values() { |
| let partition_id = partition_state.id as usize; |
| partition_ids.push(partition_id); |
| |
| let partition_stats = Arc::new(PartitionStats::new(topic_stats.clone())); |
| let partition_meta = PartitionMeta { |
| id: partition_id, |
| created_at: partition_state.created_at, |
| revision_id: 0, |
| stats: partition_stats, |
| consumer_offsets: Arc::new(ConsumerOffsets::with_capacity(0)), |
| consumer_group_offsets: Arc::new(ConsumerGroupOffsets::with_capacity(0)), |
| }; |
| partition_entries.push((partition_id, partition_meta)); |
| partitions_count += 1; |
| } |
| |
| partition_ids.sort_unstable(); |
| |
| let mut cg_entries = Vec::new(); |
| let mut cg_index = ahash::AHashMap::default(); |
| |
| for cg_state in consumer_groups.into_values() { |
| info!( |
| "Building consumer group with ID: {}, name: {} for topic with ID: {} metadata...", |
| cg_state.id, cg_state.name, topic_id |
| ); |
| let group_id = cg_state.id as usize; |
| let group_name: Arc<str> = Arc::from(cg_state.name.as_str()); |
| let cg_meta = ConsumerGroupMeta { |
| id: group_id, |
| name: group_name.clone(), |
| partitions: partition_ids.clone(), |
| members: Slab::new(), |
| }; |
| cg_entries.push((group_id, cg_meta)); |
| cg_index.insert(group_name, group_id); |
| consumer_groups_count += 1; |
| } |
| |
| let topic_meta = TopicMeta { |
| id: topic_id, |
| name: topic_name.clone(), |
| created_at, |
| message_expiry, |
| compression_algorithm, |
| max_topic_size, |
| replication_factor: replication_factor.unwrap_or(1), |
| stats: topic_stats, |
| partitions: partition_entries.into_iter().map(|(_, p)| p).collect(), |
| consumer_groups: cg_entries.into_iter().collect(), |
| consumer_group_index: cg_index, |
| round_robin_counter: Arc::new(AtomicUsize::new(0)), |
| }; |
| topic_entries.push((topic_id, topic_meta)); |
| topic_index.insert(topic_name, topic_id); |
| topics_count += 1; |
| } |
| |
| let stream_meta = StreamMeta { |
| id: stream_id, |
| name: stream_name.clone(), |
| created_at, |
| stats: stream_stats, |
| topics: topic_entries.into_iter().collect(), |
| topic_index, |
| }; |
| stream_entries.push((stream_id, stream_meta)); |
| stream_index.insert(stream_name, stream_id); |
| streams_count += 1; |
| } |
| |
| info!( |
| "Built metadata: {} streams, {} topics, {} partitions, {} consumer groups", |
| streams_count, topics_count, partitions_count, consumer_groups_count |
| ); |
| |
| InnerMetadata { |
| streams: stream_entries.into_iter().collect(), |
| users: user_entries.into_iter().collect(), |
| stream_index, |
| user_index, |
| personal_access_tokens, |
| users_global_permissions: Default::default(), |
| users_stream_permissions: Default::default(), |
| users_can_poll_all_streams: Default::default(), |
| users_can_send_all_streams: Default::default(), |
| users_can_poll_stream: Default::default(), |
| users_can_send_stream: Default::default(), |
| } |
| } |
| |
/// Loads all metadata from persisted state into the metadata writer as a
/// single atomic initialization via the `Initialize` operation.
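///
/// Example sketch (rustdoc-ignored; `users_state`, `streams_state`, and
/// `writer` come from state recovery during startup):
///
/// ```ignore
/// load_metadata(users_state, streams_state, &mut writer);
/// ```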
| pub fn load_metadata( |
| users_state: impl IntoIterator<Item = UserState>, |
| streams_state: impl IntoIterator<Item = StreamState>, |
| writer: &mut MetadataWriter, |
| ) { |
| let inner = build_inner_metadata(users_state, streams_state); |
| writer.initialize(inner); |
| } |