// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use crate::{
IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV,
compat::index_rebuilding::index_rebuilder::IndexRebuilder,
configs::{
cache_indexes::CacheIndexesConfig,
config_provider::ConfigProviderKind,
server::ServerConfig,
system::{INDEX_EXTENSION, LOG_EXTENSION, SystemConfig},
},
io::fs_utils::{self, DirEntry},
server_error::ServerError,
shard::{
system::info::SystemInfo,
transmission::{
connector::{ShardConnector, StopSender},
frame::ShardFrame,
},
},
slab::{
streams::Streams,
traits_ext::{
EntityComponentSystem, EntityComponentSystemMutCell, Insert, InsertCell, IntoComponents,
},
users::Users,
},
state::system::{StreamState, TopicState, UserState},
streaming::{
partitions::{
consumer_offset::ConsumerOffset,
helpers::create_message_deduplicator,
journal::MemoryMessageJournal,
log::SegmentedLog,
partition,
storage::{load_consumer_group_offsets, load_consumer_offsets},
},
persistence::persister::{FilePersister, FileWithSyncPersister, PersisterKind},
personal_access_tokens::personal_access_token::PersonalAccessToken,
polling_consumer::ConsumerGroupId,
segments::{Segment, storage::Storage},
stats::{PartitionStats, StreamStats, TopicStats},
storage::SystemStorage,
streams::stream,
topics::{consumer_group, topic},
users::user::User,
utils::{crypto, file::overwrite},
},
versioning::SemanticVersion,
};
use ahash::HashMap;
use compio::{fs::create_dir_all, runtime::Runtime};
use err_trail::ErrContext;
use iggy_common::{
IggyByteSize, IggyError,
defaults::{
DEFAULT_ROOT_USERNAME, MAX_PASSWORD_LENGTH, MAX_USERNAME_LENGTH, MIN_PASSWORD_LENGTH,
MIN_USERNAME_LENGTH,
},
};
use std::{collections::HashSet, env, path::Path, sync::Arc};
use tracing::{info, warn};
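/// Rebuilds the in-memory [`Streams`] collection from persisted [`StreamState`] entries.
///
/// Streams, topics, partitions and consumer groups are re-inserted in the order
/// recorded in the state, and every insert is asserted to yield the same ID that
/// the state recorded. Partition state (consumer and consumer-group offsets) is
/// loaded from disk via `load_partition`; segments are loaded separately via
/// `load_segments`.
///
/// A hypothetical call site (variable names are illustrative only):
/// ```ignore
/// let streams = load_streams(stream_states, &system_config).await?;
/// ```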
pub async fn load_streams(
state: impl IntoIterator<Item = StreamState>,
config: &SystemConfig,
) -> Result<Streams, IggyError> {
let streams = Streams::default();
for StreamState {
name,
created_at,
id,
topics,
} in state
{
info!(
"Loading stream with ID: {}, name: {} from state...",
id, name
);
let stream_id = id;
let stats = Arc::new(StreamStats::default());
let stream = stream::Stream::new(name.clone(), stats.clone(), created_at);
let new_id = streams.insert(stream);
assert_eq!(
new_id, stream_id as usize,
"load_streams: id mismatch when inserting stream, mismatch for stream with ID: {}, name: {}",
stream_id, name
);
info!(
"Loaded stream with ID: {}, name: {} from state...",
id, name
);
let topics = topics.into_values();
for TopicState {
id,
name,
created_at,
compression_algorithm,
message_expiry,
max_topic_size,
replication_factor,
consumer_groups,
partitions,
} in topics
{
info!(
"Loading topic with ID: {}, name: {} from state...",
id, name
);
let topic_id = id;
let parent_stats = stats.clone();
let stats = Arc::new(TopicStats::new(parent_stats));
let topic_id = streams.with_components_by_id_mut(stream_id as usize, |(mut root, ..)| {
let topic = topic::Topic::new(
name.clone(),
stats.clone(),
created_at,
replication_factor.unwrap_or(1),
message_expiry,
compression_algorithm,
max_topic_size,
);
let new_id = root.topics_mut().insert(topic);
assert_eq!(
new_id, topic_id as usize,
"load_streams: topic id mismatch when inserting topic, mismatch for topic with ID: {}, name: {}",
topic_id, &name
);
new_id
});
info!("Loaded topic with ID: {}, name: {} from state...", id, name);
let parent_stats = stats.clone();
let cgs = consumer_groups.into_values();
let partitions = partitions.into_values();
// Load each partition asynchronously and insert immediately
for partition_state in partitions {
info!(
"Loading partition with ID: {}, for topic with ID: {} from state...",
partition_state.id, topic_id
);
let partition_id = partition_state.id;
let partition = load_partition(
config,
stream_id as usize,
topic_id,
partition_state,
parent_stats.clone(),
)
.await?;
streams.with_components_by_id(stream_id as usize, |(root, ..)| {
root.topics()
.with_components_by_id_mut(topic_id, |(mut root, ..)| {
let new_id = root.partitions_mut().insert(partition);
assert_eq!(
new_id, partition_id as usize,
"load_streams: partition id mismatch when inserting partition, mismatch for partition with ID: {}, for topic with ID: {}, for stream with ID: {}",
partition_id, topic_id, stream_id
);
});
});
info!(
"Loaded partition with ID: {}, for topic with ID: {} from state...",
partition_id, topic_id
);
}
let partition_ids = streams.with_components_by_id(stream_id as usize, |(root, ..)| {
root.topics().with_components_by_id(topic_id, |(root, ..)| {
root.partitions().with_components(|components| {
let (root, ..) = components.into_components();
root.iter().map(|(_, root)| root.id()).collect::<Vec<_>>()
})
})
});
for cg_state in cgs {
info!(
"Loading consumer group with ID: {}, name: {} for topic with ID: {} from state...",
cg_state.id, cg_state.name, topic_id
);
streams.with_components_by_id(stream_id as usize, |(root, ..)| {
root.topics()
.with_components_by_id_mut(topic_id, |(mut root, ..)| {
let id = cg_state.id;
let cg = consumer_group::ConsumerGroup::new(cg_state.name.clone(), Default::default(), partition_ids.clone());
let new_id = root.consumer_groups_mut().insert(cg);
assert_eq!(
new_id, id as usize,
"load_streams: consumer group id mismatch when inserting consumer group, mismatch for consumer group with ID: {}, name: {} for topic with ID: {}, for stream with ID: {}",
id, cg_state.name, topic_id, stream_id
);
});
});
info!(
"Loaded consumer group with ID: {}, name: {} for topic with ID: {} from state...",
cg_state.id, cg_state.name, topic_id
);
}
}
}
Ok(streams)
}
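/// Rebuilds the in-memory [`Users`] collection from persisted [`UserState`] entries,
/// restoring each user's password hash, status, permissions, creation timestamp and
/// personal access tokens (keyed by their token hash).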
pub fn load_users(state: impl IntoIterator<Item = UserState>) -> Users {
let users = Users::new();
for user_state in state {
let UserState {
id,
username,
password_hash,
status,
created_at,
permissions,
personal_access_tokens,
} = user_state;
let mut user = User::with_password(id, &username, password_hash, status, permissions);
user.created_at = created_at;
user.personal_access_tokens = personal_access_tokens
.into_values()
.map(|token| {
(
Arc::new(token.token_hash.clone()),
PersonalAccessToken::raw(id, &token.name, &token.token_hash, token.expiry_at),
)
})
.collect();
users.insert(user);
}
users
}
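/// Creates one [`ShardConnector`] per entry in `shards_set`, using sequential IDs
/// (0, 1, 2, ...) rather than the CPU core numbers themselves, and returns the
/// connectors together with their `(shard_id, StopSender)` shutdown handles.
///
/// A hypothetical call site (the CPU set is illustrative only):
/// ```ignore
/// let cores: HashSet<usize> = [0, 1, 2, 3].into_iter().collect();
/// let (connectors, shutdown_handles) = create_shard_connections(&cores);
/// ```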
pub fn create_shard_connections(
shards_set: &HashSet<usize>,
) -> (Vec<ShardConnector<ShardFrame>>, Vec<(u16, StopSender)>) {
let shards_count = shards_set.len();
// Create connectors with sequential IDs (0, 1, 2, ...) regardless of CPU core numbers
let connectors: Vec<ShardConnector<ShardFrame>> = (0..shards_count)
.map(|idx| ShardConnector::new(idx as u16))
.collect();
let shutdown_handles = connectors
.iter()
.map(|conn| (conn.id, conn.stop_sender.clone()))
.collect();
(connectors, shutdown_handles)
}
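/// Loads the [`ServerConfig`] using the given configuration provider.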
pub async fn load_config(
config_provider: &ConfigProviderKind,
) -> Result<ServerConfig, ServerError> {
let config = ServerConfig::load(config_provider).await?;
Ok(config)
}
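/// Ensures that the system, state, streams and runtime directories (and the state
/// messages file) exist, recreating the runtime directory from scratch on every start.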
pub async fn create_directories(config: &SystemConfig) -> Result<(), IggyError> {
let system_path = config.get_system_path();
if !Path::new(&system_path).exists() && create_dir_all(&system_path).await.is_err() {
return Err(IggyError::CannotCreateBaseDirectory(system_path));
}
let state_path = config.get_state_path();
if !Path::new(&state_path).exists() && create_dir_all(&state_path).await.is_err() {
return Err(IggyError::CannotCreateStateDirectory(state_path));
}
let state_log = config.get_state_messages_file_path();
if !Path::new(&state_log).exists() && (overwrite(&state_log).await).is_err() {
return Err(IggyError::CannotCreateStateDirectory(state_log));
}
let streams_path = config.get_streams_path();
if !Path::new(&streams_path).exists() && create_dir_all(&streams_path).await.is_err() {
return Err(IggyError::CannotCreateStreamsDirectory(streams_path));
}
let runtime_path = config.get_runtime_path();
if Path::new(&runtime_path).exists() && fs_utils::remove_dir_all(&runtime_path).await.is_err() {
return Err(IggyError::CannotRemoveRuntimeDirectory(runtime_path));
}
if create_dir_all(&runtime_path).await.is_err() {
return Err(IggyError::CannotCreateRuntimeDirectory(runtime_path));
}
info!(
"Initializing system, data will be stored at: {}",
config.get_system_path()
);
Ok(())
}
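/// Creates the root user either from the environment variables referenced by
/// `IGGY_ROOT_USERNAME_ENV` and `IGGY_ROOT_PASSWORD_ENV` (both must be set), or
/// from the default username together with a freshly generated password that is
/// printed to stdout. Panics when the credentials are missing, too short or too long.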
pub fn create_root_user() -> User {
let mut username = env::var(IGGY_ROOT_USERNAME_ENV);
let mut password = env::var(IGGY_ROOT_PASSWORD_ENV);
if (username.is_ok() && password.is_err()) || (username.is_err() && password.is_ok()) {
panic!(
"When providing the custom root user credentials, both username and password must be set."
);
}
if username.is_ok() && password.is_ok() {
info!("Using the custom root user credentials.");
} else {
info!("Using the default root user credentials...");
username = Ok(DEFAULT_ROOT_USERNAME.to_string());
let generated_password = crypto::generate_secret(20..40);
println!("Generated root user password: {generated_password}");
password = Ok(generated_password);
}
let username = username.expect("Root username is not set.");
let password = password.expect("Root password is not set.");
if username.is_empty() || password.is_empty() {
panic!("Root user credentials are not set.");
}
if username.len() < MIN_USERNAME_LENGTH {
panic!("Root username is too short.");
}
if username.len() > MAX_USERNAME_LENGTH {
panic!("Root username is too long.");
}
if password.len() < MIN_PASSWORD_LENGTH {
panic!("Root password is too short.");
}
if password.len() > MAX_PASSWORD_LENGTH {
panic!("Root password is too long.");
}
User::root(&username, &password)
}
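/// Builds the per-shard compio [`Runtime`]: a proactor with a 4096-entry capacity
/// and cooperative task running, a 128-tick event interval, and thread affinity
/// pinned to the given CPU set.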
pub fn create_shard_executor(cpu_set: HashSet<usize>) -> Runtime {
// TODO: The event interval tick could be configured based on how many clients
// we expect to have connected, since that roughly estimates the number of
// tasks we will create.
let mut proactor = compio::driver::ProactorBuilder::new();
proactor
.capacity(4096)
.coop_taskrun(true)
.taskrun_flag(true);
// FIXME(hubcio): Only set thread_pool_limit(0) on platforms other than macOS/aarch64,
// as it causes a freeze on macOS with compio fs operations,
// see https://github.com/compio-rs/compio/issues/446
#[cfg(not(all(target_os = "macos", target_arch = "aarch64")))]
proactor.thread_pool_limit(0);
compio::runtime::RuntimeBuilder::new()
.with_proactor(proactor.to_owned())
.event_interval(128)
.thread_affinity(cpu_set)
.build()
.unwrap()
}
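/// Picks the persister implementation: `FileWithSyncPersister` when fsync is
/// enforced, plain `FilePersister` otherwise.
///
/// A sketch of how this is typically wired up from the partition config:
/// ```ignore
/// let persister = resolve_persister(config.partition.enforce_fsync);
/// ```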
pub fn resolve_persister(enforce_fsync: bool) -> Arc<PersisterKind> {
match enforce_fsync {
true => Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister)),
false => Arc::new(PersisterKind::File(FilePersister)),
}
}
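/// Updates the stored [`SystemInfo`] with the current server version and persists
/// it through the system storage.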
pub async fn update_system_info(
storage: &SystemStorage,
system_info: &mut SystemInfo,
version: &SemanticVersion,
) -> Result<(), IggyError> {
system_info.update_version(version);
storage.info.save(system_info).await?;
Ok(())
}
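/// Walks the partition directory and returns the entries whose extension matches
/// `LOG_EXTENSION`, skipping subdirectories.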
async fn collect_log_files(partition_path: &str) -> Result<Vec<DirEntry>, IggyError> {
let dir_entries = fs_utils::walk_dir(&partition_path)
.await
.map_err(|_| IggyError::CannotReadPartitions)?;
let mut log_files = Vec::new();
for entry in dir_entries {
if entry.is_dir {
continue;
}
let extension = entry.path.extension();
if extension.is_none() || extension.unwrap() != LOG_EXTENSION {
continue;
}
log_files.push(entry);
}
Ok(log_files)
}
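/// Loads all persisted segments for a partition from `partition_path` into a
/// [`SegmentedLog`].
///
/// Log files are processed in file-name (start offset) order; for each segment the
/// index is rebuilt when required, any legacy time-index file is removed, the
/// segment storage is opened, offsets and timestamps are derived from the loaded
/// indexes, checksums are optionally validated, and the partition statistics are
/// updated. Indexes are kept in memory according to the `cache_indexes` setting.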
pub async fn load_segments(
config: &SystemConfig,
stream_id: usize,
topic_id: usize,
partition_id: usize,
partition_path: String,
stats: Arc<PartitionStats>,
) -> Result<SegmentedLog<MemoryMessageJournal>, IggyError> {
let mut log_files = collect_log_files(&partition_path).await?;
log_files.sort_by(|a, b| a.path.file_name().cmp(&b.path.file_name()));
let mut log = SegmentedLog::<MemoryMessageJournal>::default();
for entry in log_files {
let log_file_name = entry
.path
.file_stem()
.unwrap()
.to_string_lossy()
.to_string();
let start_offset = log_file_name.parse::<u64>().unwrap();
let messages_file_path = format!("{}/{}.{}", partition_path, log_file_name, LOG_EXTENSION);
let index_file_path = format!("{}/{}.{}", partition_path, log_file_name, INDEX_EXTENSION);
let time_index_path = index_file_path.replace(INDEX_EXTENSION, "timeindex");
async fn try_exists(path: &str) -> Result<bool, std::io::Error> {
match compio::fs::metadata(path).await {
Ok(_) => Ok(true),
Err(err) => match err.kind() {
std::io::ErrorKind::NotFound => Ok(false),
_ => Err(err),
},
}
}
let index_path_exists = try_exists(&index_file_path).await.unwrap();
let time_index_path_exists = try_exists(&time_index_path).await.unwrap();
let index_cache_enabled = matches!(
config.segment.cache_indexes,
CacheIndexesConfig::All | CacheIndexesConfig::OpenSegment
);
if index_cache_enabled && (!index_path_exists || time_index_path_exists) {
warn!(
"Index at path {} does not exist, rebuilding it based on {}...",
index_file_path, messages_file_path
);
let now = std::time::Instant::now();
let index_rebuilder = IndexRebuilder::new(
messages_file_path.clone(),
index_file_path.clone(),
start_offset,
);
index_rebuilder.rebuild().await.unwrap_or_else(|e| {
panic!(
"Failed to rebuild index for partition with ID: {} for stream with ID: {} and topic with ID: {}. Error: {e}",
partition_id, stream_id, topic_id,
)
});
info!(
"Rebuilding index for path {} finished, it took {} ms",
index_file_path,
now.elapsed().as_millis()
);
}
if time_index_path_exists {
compio::fs::remove_file(&time_index_path).await.unwrap();
}
let messages_metadata = compio::fs::metadata(&messages_file_path)
.await
.map_err(|_| IggyError::CannotReadPartitions)?;
let messages_size = messages_metadata.len() as u32;
let index_size = match compio::fs::metadata(&index_file_path).await {
Ok(metadata) => metadata.len() as u32,
Err(_) => 0, // Default to 0 if index file doesn't exist
};
let storage = Storage::new(
&messages_file_path,
&index_file_path,
messages_size as u64,
index_size as u64,
config.partition.enforce_fsync,
config.partition.enforce_fsync,
true,
)
.await?;
let loaded_indexes = {
storage.index_reader
.as_ref()
.unwrap()
.load_all_indexes_from_disk()
.await
.with_error(|error| format!("Failed to load indexes during startup for stream ID: {}, topic ID: {}, partition_id: {}, {error}", stream_id, topic_id, partition_id))
.map_err(|_| IggyError::CannotReadFile)?
};
let end_offset = if loaded_indexes.count() == 0 {
0
} else {
let last_index_offset = loaded_indexes.last().unwrap().offset() as u64;
start_offset + last_index_offset
};
let (start_timestamp, end_timestamp) = if loaded_indexes.count() == 0 {
(0, 0)
} else {
(
loaded_indexes.get(0).unwrap().timestamp(),
loaded_indexes.last().unwrap().timestamp(),
)
};
let mut segment = Segment::new(
start_offset,
config.segment.size,
config.segment.message_expiry,
);
segment.start_timestamp = start_timestamp;
segment.end_timestamp = end_timestamp;
segment.end_offset = end_offset;
segment.size = IggyByteSize::from(messages_size as u64);
// At segment load, set the current position to the size of the segment (No data is buffered yet).
segment.current_position = segment.size.as_bytes_u32();
segment.sealed = true; // Persisted segments are assumed to be sealed
if config.partition.validate_checksum {
info!(
"Validating checksum for segment at offset {} in stream ID: {}, topic ID: {}, partition ID: {}",
start_offset, stream_id, topic_id, partition_id
);
let messages_count = loaded_indexes.count() as u32;
if messages_count > 0 {
const BATCH_COUNT: u32 = 10000;
let mut current_relative_offset = 0u32;
let mut processed_count = 0u32;
while processed_count < messages_count {
let remaining_count = messages_count - processed_count;
let batch_count = std::cmp::min(BATCH_COUNT, remaining_count);
let batch_indexes = loaded_indexes
.slice_by_offset(current_relative_offset, batch_count)
.unwrap();
let messages_reader = storage.messages_reader.as_ref().unwrap();
match messages_reader.load_messages_from_disk(batch_indexes).await {
Ok(messages_batch) => {
if let Err(e) = messages_batch.validate_checksums() {
return Err(IggyError::CannotReadPartitions).with_error(|_| {
format!(
"Failed to validate message checksum for segment at offset {} in stream ID: {}, topic ID: {}, partition ID: {}, error: {}",
start_offset, stream_id, topic_id, partition_id, e
)
});
}
processed_count += messages_batch.count();
current_relative_offset += batch_count;
}
Err(e) => {
return Err(e).with_error(|_| {
format!(
"Failed to load messages from disk for checksum validation at offset {} in stream ID: {}, topic ID: {}, partition ID: {}",
start_offset, stream_id, topic_id, partition_id
)
});
}
}
}
info!(
"Checksum validation completed for segment at offset {}",
start_offset
);
}
}
log.add_persisted_segment(segment, storage);
stats.increment_segments_count(1);
stats.increment_size_bytes(messages_size as u64);
let messages_count = if end_offset > start_offset {
(end_offset - start_offset + 1) as u64
} else if messages_size > 0 {
loaded_indexes.count() as u64
} else {
0
};
if messages_count > 0 {
stats.increment_messages_count(messages_count);
}
let should_cache_indexes = match config.segment.cache_indexes {
CacheIndexesConfig::All => true,
CacheIndexesConfig::OpenSegment => false,
CacheIndexesConfig::None => false,
};
if should_cache_indexes {
let segment_index = log.segments().len() - 1;
log.set_segment_indexes(segment_index, loaded_indexes);
}
}
if matches!(
config.segment.cache_indexes,
CacheIndexesConfig::OpenSegment
) && log.has_segments()
{
let segments_count = log.segments().len();
if segments_count > 0 {
let last_storage = log.storages().last().unwrap();
match last_storage.index_reader.as_ref() {
Some(index_reader) => {
if let Ok(loaded_indexes) = index_reader.load_all_indexes_from_disk().await {
log.set_segment_indexes(segments_count - 1, loaded_indexes);
}
}
None => {
warn!("Index reader not available for last segment in OpenSegment mode");
}
}
}
}
Ok(log)
}
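/// Loads a single partition from disk: reads its consumer and consumer-group
/// offsets, creates the message deduplicator, and determines `should_increment_offset`
/// from whether the first segment's messages file is non-empty. The segmented log
/// itself starts out empty here; segments are loaded separately via `load_segments`.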
async fn load_partition(
config: &SystemConfig,
stream_id: usize,
topic_id: usize,
partition_state: crate::state::system::PartitionState,
parent_stats: Arc<TopicStats>,
) -> Result<partition::Partition, IggyError> {
let stats = Arc::new(PartitionStats::new(parent_stats));
let partition_id = partition_state.id;
let partition_path = config.get_partition_path(stream_id, topic_id, partition_id as usize);
let log_files = collect_log_files(&partition_path).await?;
let should_increment_offset = !log_files.is_empty()
&& log_files
.first()
.map(|entry| {
let log_file_name = entry
.path
.file_stem()
.unwrap()
.to_string_lossy()
.to_string();
let start_offset = log_file_name.parse::<u64>().unwrap();
let messages_file_path = config.get_messages_file_path(
stream_id,
topic_id,
partition_id as usize,
start_offset,
);
let metadata = std::fs::metadata(&messages_file_path)
.expect("failed to get metadata for first segment in log");
metadata.len() > 0
})
.unwrap_or(false);
info!(
"Loading partition with ID: {} for stream with ID: {} and topic with ID: {}, for path: {} from disk...",
partition_id, stream_id, topic_id, partition_path
);
// Load consumer offsets
let message_deduplicator = create_message_deduplicator(config);
let consumer_offset_path =
config.get_consumer_offsets_path(stream_id, topic_id, partition_id as usize);
let consumer_group_offsets_path =
config.get_consumer_group_offsets_path(stream_id, topic_id, partition_id as usize);
let consumer_offset = Arc::new(
load_consumer_offsets(&consumer_offset_path)?
.into_iter()
.map(|offset| (offset.consumer_id as usize, offset))
.collect::<HashMap<usize, ConsumerOffset>>()
.into(),
);
let consumer_group_offset = Arc::new(
load_consumer_group_offsets(&consumer_group_offsets_path)?
.into_iter()
.collect::<HashMap<ConsumerGroupId, ConsumerOffset>>()
.into(),
);
let log = Default::default();
let partition = partition::Partition::new(
partition_state.created_at,
should_increment_offset,
stats,
message_deduplicator,
Arc::new(Default::default()),
consumer_offset,
consumer_group_offset,
log,
);
info!(
"Loaded partition with ID: {} for stream with ID: {} and topic with ID: {}",
partition_id, stream_id, topic_id
);
Ok(partition)
}