blob: 13690a8139ec831457252cacc08c40f5413325ec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use crate::{
IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV,
compat::index_rebuilding::index_rebuilder::IndexRebuilder,
configs::{
cache_indexes::CacheIndexesConfig,
server::ServerConfig,
sharding::ShardInfo,
system::{INDEX_EXTENSION, LOG_EXTENSION, SystemConfig},
},
io::fs_utils::{self, DirEntry},
server_error::ServerError,
shard::{
system::info::SystemInfo,
transmission::{
connector::{ShardConnector, StopSender},
frame::ShardFrame,
},
},
slab::{
consumer_groups::ConsumerGroups, partitions::Partitions, streams::Streams, topics::Topics,
traits_ext::IntoComponents, users::Users,
},
state::system::{StreamState, UserState},
streaming::{
partitions::{
consumer_offset::ConsumerOffset,
helpers::create_message_deduplicator,
journal::MemoryMessageJournal,
log::SegmentedLog,
partition,
storage::{load_consumer_group_offsets, load_consumer_offsets},
},
persistence::persister::{FilePersister, FileWithSyncPersister, PersisterKind},
polling_consumer::ConsumerGroupId,
segments::{Segment, storage::Storage},
stats::{PartitionStats, StreamStats, TopicStats},
storage::SystemStorage,
streams::stream,
topics::{consumer_group, topic},
users::user::User,
utils::{crypto, file::overwrite},
},
versioning::SemanticVersion,
};
use ahash::HashMap;
use compio::{fs::create_dir_all, runtime::Runtime};
use err_trail::ErrContext;
use iggy_common::{
IggyByteSize, IggyError, PersonalAccessToken,
defaults::{
DEFAULT_ROOT_USERNAME, MAX_PASSWORD_LENGTH, MAX_USERNAME_LENGTH, MIN_PASSWORD_LENGTH,
MIN_USERNAME_LENGTH,
},
};
use std::{env, path::Path, sync::Arc};
use tracing::{info, warn};
/// Rebuilds the in-memory [`Streams`] hierarchy from the persisted system state.
///
/// For every stream in `state`:
/// * each topic is reconstructed together with its partitions (via `load_partition`) and its
///   consumer groups;
/// * children are attached to their parent by decomposing the parent into its components,
///   setting the nested container, and recomposing it.
///
/// Segment data is not read by this function. It only calls `load_partition` for each partition.
///
/// # Errors
///
/// Propagates the first error returned by `load_partition`.
pub async fn load_streams(
    state: impl IntoIterator<Item = StreamState>,
    config: &SystemConfig,
) -> Result<Streams, IggyError> {
    // Materialize the state up front so the stream vector can be pre-sized.
    let state: Vec<StreamState> = state.into_iter().collect();
    let mut stream_entries = Vec::with_capacity(state.len());
    for stream_state in state {
        let stream_id = stream_state.id as usize;
        info!(
            "Loading stream with ID: {}, name: {} from state...",
            stream_state.id, stream_state.name
        );
        // Shared with every topic of this stream; each TopicStats is created with a clone of it.
        let stream_stats = Arc::new(StreamStats::default());
        let mut topic_entries = Vec::new();
        for topic_state in stream_state.topics.into_values() {
            let topic_id = topic_state.id as usize;
            info!(
                "Loading topic with ID: {}, name: {} from state...",
                topic_state.id, topic_state.name
            );
            let topic_stats = Arc::new(TopicStats::new(stream_stats.clone()));
            // Build partitions
            let mut partition_entries = Vec::new();
            for partition_state in topic_state.partitions.into_values() {
                let partition_id = partition_state.id as usize;
                info!(
                    "Loading partition with ID: {}, for topic with ID: {} from state...",
                    partition_id, topic_id
                );
                let partition = load_partition(
                    config,
                    stream_id,
                    topic_id,
                    partition_state,
                    topic_stats.clone(),
                )
                .await?;
                partition_entries.push((partition_id, partition));
                info!(
                    "Loaded partition with ID: {}, for topic with ID: {} from state...",
                    partition_id, topic_id
                );
            }
            // Build consumer groups
            // Every consumer group is created with the IDs of all partitions of this topic.
            let partition_ids: Vec<_> = partition_entries.iter().map(|(id, _)| *id).collect();
            let cg_entries: Vec<_> = topic_state
                .consumer_groups
                .into_values()
                .map(|cg_state| {
                    info!(
                        "Loading consumer group with ID: {}, name: {} for topic with ID: {} from state...",
                        cg_state.id, cg_state.name, topic_id
                    );
                    // NOTE(review): the second argument is a `Default` value, presumably an empty
                    // member set; confirm against `ConsumerGroup::new`.
                    let cg = consumer_group::ConsumerGroup::new(
                        cg_state.name.clone(),
                        Default::default(),
                        partition_ids.clone(),
                    );
                    info!(
                        "Loaded consumer group with ID: {}, name: {} for topic with ID: {} from state...",
                        cg_state.id, cg_state.name, topic_id
                    );
                    (cg_state.id as usize, cg)
                })
                .collect();
            // Build topic with pre-built partitions and consumer groups
            // A missing replication factor defaults to 1.
            let mut topic = topic::Topic::new(
                topic_state.name.clone(),
                topic_stats,
                topic_state.created_at,
                topic_state.replication_factor.unwrap_or(1),
                topic_state.message_expiry,
                topic_state.compression_algorithm,
                topic_state.max_topic_size,
            );
            // Decompose, set nested containers, recompose
            let (mut root, auxilary, stats) = topic.into_components();
            root.set_partitions(Partitions::from_entries(partition_entries));
            root.set_consumer_groups(ConsumerGroups::from_entries(cg_entries));
            topic = topic::Topic::new_with_components(root, auxilary, stats);
            topic_entries.push((topic_id, topic));
            info!(
                "Loaded topic with ID: {}, name: {} from state...",
                topic_state.id, topic_state.name
            );
        }
        // Build stream with pre-built topics
        let mut stream = stream::Stream::new(
            stream_state.name.clone(),
            stream_stats,
            stream_state.created_at,
        );
        // Decompose, set nested containers, recompose
        let (mut root, stats) = stream.into_components();
        root.set_topics(Topics::from_entries(topic_entries));
        stream = stream::Stream::new_with_components(root, stats);
        stream_entries.push((stream_id, stream));
        info!(
            "Loaded stream with ID: {}, name: {} from state...",
            stream_state.id, stream_state.name
        );
    }
    Ok(Streams::from_entries(stream_entries))
}
/// Builds the [`Users`] collection from persisted user state.
///
/// Each persisted user is recreated from its stored password hash, status and permissions, and
/// its creation time is restored. Its personal access tokens are re-attached and keyed by their
/// token hash.
pub fn load_users(state: impl IntoIterator<Item = UserState>) -> Users {
    let users = Users::new();
    for UserState {
        id,
        username,
        password_hash,
        status,
        created_at,
        permissions,
        personal_access_tokens,
    } in state
    {
        let mut restored = User::with_password(id, &username, password_hash, status, permissions);
        restored.created_at = created_at;
        restored.personal_access_tokens = personal_access_tokens
            .into_values()
            .map(|pat| {
                (
                    Arc::from(pat.token_hash.as_str()),
                    PersonalAccessToken::raw(id, &pat.name, &pat.token_hash, pat.expiry_at),
                )
            })
            .collect();
        users.insert(restored);
    }
    users
}
/// Creates one [`ShardConnector`] per entry of `shard_assignment`.
///
/// Connector IDs are sequential (`0, 1, 2, …`) and follow the position in `shard_assignment`;
/// they do not depend on the CPU cores assigned to each shard.
///
/// Returns the connectors together with `(connector id, stop sender)` pairs used to signal
/// shutdown.
pub fn create_shard_connections(
    shard_assignment: &[ShardInfo],
) -> (Vec<ShardConnector<ShardFrame>>, Vec<(u16, StopSender)>) {
    let connectors: Vec<ShardConnector<ShardFrame>> = (0..shard_assignment.len())
        .map(|shard_idx| ShardConnector::new(shard_idx as u16))
        .collect();
    let shutdown_handles = connectors
        .iter()
        .map(|connector| (connector.id, connector.stop_sender.clone()))
        .collect();
    (connectors, shutdown_handles)
}
/// Loads the server configuration through [`ServerConfig::load`].
///
/// # Errors
///
/// Propagates the loading failure, converted into a [`ServerError`].
pub async fn load_config() -> Result<ServerConfig, ServerError> {
    let server_config = ServerConfig::load().await?;
    Ok(server_config)
}
/// Prepares the on-disk layout used by the server.
///
/// The steps run in this order:
/// 1. Create the system, state and streams directories if they are missing.
/// 2. Create the state messages file if it is missing.
/// 3. Wipe the runtime directory if it exists, then create it again.
///
/// # Errors
///
/// Returns the `IggyError` variant matching the first step that failed, carrying the path
/// involved.
pub async fn create_directories(config: &SystemConfig) -> Result<(), IggyError> {
    /// `true` when `path` already exists or could be created.
    async fn ensure_dir(path: &str) -> bool {
        Path::new(path).exists() || create_dir_all(path).await.is_ok()
    }

    let system_path = config.get_system_path();
    if !ensure_dir(&system_path).await {
        return Err(IggyError::CannotCreateBaseDirectory(system_path));
    }

    let state_path = config.get_state_path();
    if !ensure_dir(&state_path).await {
        return Err(IggyError::CannotCreateStateDirectory(state_path));
    }

    let state_log = config.get_state_messages_file_path();
    if !Path::new(&state_log).exists() && overwrite(&state_log).await.is_err() {
        return Err(IggyError::CannotCreateStateDirectory(state_log));
    }

    let streams_path = config.get_streams_path();
    if !ensure_dir(&streams_path).await {
        return Err(IggyError::CannotCreateStreamsDirectory(streams_path));
    }

    // The runtime directory never survives a restart: drop whatever is left and recreate it.
    let runtime_path = config.get_runtime_path();
    let stale_runtime = Path::new(&runtime_path).exists();
    if stale_runtime && fs_utils::remove_dir_all(&runtime_path).await.is_err() {
        return Err(IggyError::CannotRemoveRuntimeDirectory(runtime_path));
    }
    if create_dir_all(&runtime_path).await.is_err() {
        return Err(IggyError::CannotCreateRuntimeDirectory(runtime_path));
    }

    info!(
        "Initializing system, data will be stored at: {}",
        config.get_system_path()
    );
    Ok(())
}
/// Builds the root [`User`].
///
/// When both `IGGY_ROOT_USERNAME_ENV` and `IGGY_ROOT_PASSWORD_ENV` are readable, they are used
/// as the credentials. When neither is, the default username is used together with a freshly
/// generated password, which is printed to stdout.
///
/// # Panics
///
/// Panics in any of these cases:
/// * only one of the two environment variables is readable;
/// * either credential is empty;
/// * either credential falls outside the configured min/max length bounds.
pub fn create_root_user() -> User {
    let from_env = (
        env::var(IGGY_ROOT_USERNAME_ENV),
        env::var(IGGY_ROOT_PASSWORD_ENV),
    );
    let (username, password) = match from_env {
        (Ok(custom_username), Ok(custom_password)) => {
            info!("Using the custom root user credentials.");
            (custom_username, custom_password)
        }
        (Err(_), Err(_)) => {
            info!("Using the default root user credentials...");
            let default_username = DEFAULT_ROOT_USERNAME.to_string();
            let generated_password = crypto::generate_secret(20..40);
            println!("Generated root user password: {generated_password}");
            (default_username, generated_password)
        }
        _ => panic!(
            "When providing the custom root user credentials, both username and password must be set."
        ),
    };

    assert!(
        !username.is_empty() && !password.is_empty(),
        "Root user credentials are not set."
    );
    assert!(
        username.len() >= MIN_USERNAME_LENGTH,
        "Root username is too short."
    );
    assert!(
        username.len() <= MAX_USERNAME_LENGTH,
        "Root username is too long."
    );
    assert!(
        password.len() >= MIN_PASSWORD_LENGTH,
        "Root password is too short."
    );
    assert!(
        password.len() <= MAX_PASSWORD_LENGTH,
        "Root password is too long."
    );

    User::root(&username, &password)
}
/// Builds the compio [`Runtime`] that drives a single shard.
///
/// # Panics
///
/// Panics if the runtime cannot be built.
pub fn create_shard_executor() -> Runtime {
    // TODO: The event interval tick could be configured based on how many clients we expect
    // to have connected, which roughly estimates the number of tasks we will create.
    let mut proactor_builder = compio::driver::ProactorBuilder::new();
    proactor_builder.capacity(4096);
    proactor_builder.coop_taskrun(true);
    proactor_builder.taskrun_flag(true);

    // FIXME(hubcio): thread_pool_limit(0) is skipped on aarch64 macOS only, because it freezes
    // compio fs operations there. See https://github.com/compio-rs/compio/issues/446
    #[cfg(not(all(target_os = "macos", target_arch = "aarch64")))]
    proactor_builder.thread_pool_limit(0);

    let runtime = compio::runtime::RuntimeBuilder::new()
        .with_proactor(proactor_builder.to_owned())
        .event_interval(128)
        .build();
    runtime.unwrap()
}
/// Chooses the file persister implementation.
///
/// Returns the fsync-enforcing persister when `enforce_fsync` is set, and the plain file
/// persister otherwise.
pub fn resolve_persister(enforce_fsync: bool) -> Arc<PersisterKind> {
    let kind = if enforce_fsync {
        PersisterKind::FileWithSync(FileWithSyncPersister)
    } else {
        PersisterKind::File(FilePersister)
    };
    Arc::new(kind)
}
/// Stamps `system_info` with `version` and persists it through `storage.info`.
///
/// # Errors
///
/// Propagates the error returned by the save operation.
pub async fn update_system_info(
    storage: &SystemStorage,
    system_info: &mut SystemInfo,
    version: &SemanticVersion,
) -> Result<(), IggyError> {
    system_info.update_version(version);
    let info_storage = &storage.info;
    info_storage.save(system_info).await?;
    Ok(())
}
/// Lists the segment log files (`LOG_EXTENSION`) found under `partition_path`.
///
/// Directory entries and files with any other extension are skipped. Entries are returned in
/// the order produced by `fs_utils::walk_dir`; no sorting is applied.
///
/// # Errors
///
/// Returns `IggyError::CannotReadPartitions` if the directory cannot be walked.
async fn collect_log_files(partition_path: &str) -> Result<Vec<DirEntry>, IggyError> {
    let entries = fs_utils::walk_dir(&partition_path)
        .await
        .map_err(|_| IggyError::CannotReadPartitions)?;
    let log_files = entries
        .into_iter()
        .filter(|entry| !entry.is_dir)
        .filter(|entry| matches!(entry.path.extension(), Some(ext) if ext == LOG_EXTENSION))
        .collect();
    Ok(log_files)
}
pub async fn load_segments(
config: &SystemConfig,
stream_id: usize,
topic_id: usize,
partition_id: usize,
partition_path: String,
stats: Arc<PartitionStats>,
) -> Result<SegmentedLog<MemoryMessageJournal>, IggyError> {
let mut log_files = collect_log_files(&partition_path).await?;
log_files.sort_by(|a, b| a.path.file_name().cmp(&b.path.file_name()));
let mut log = SegmentedLog::<MemoryMessageJournal>::default();
for entry in log_files {
let log_file_name = entry
.path
.file_stem()
.unwrap()
.to_string_lossy()
.to_string();
let start_offset = log_file_name.parse::<u64>().unwrap();
let messages_file_path = format!("{}/{}.{}", partition_path, log_file_name, LOG_EXTENSION);
let index_file_path = format!("{}/{}.{}", partition_path, log_file_name, INDEX_EXTENSION);
let time_index_path = index_file_path.replace(INDEX_EXTENSION, "timeindex");
async fn try_exists(path: &str) -> Result<bool, std::io::Error> {
match compio::fs::metadata(path).await {
Ok(_) => Ok(true),
Err(err) => match err.kind() {
std::io::ErrorKind::NotFound => Ok(false),
_ => Err(err),
},
}
}
let index_path_exists = try_exists(&index_file_path).await.unwrap();
let time_index_path_exists = try_exists(&time_index_path).await.unwrap();
let index_cache_enabled = matches!(
config.segment.cache_indexes,
CacheIndexesConfig::All | CacheIndexesConfig::OpenSegment
);
if index_cache_enabled && (!index_path_exists || time_index_path_exists) {
warn!(
"Index at path {} does not exist, rebuilding it based on {}...",
index_file_path, messages_file_path
);
let now = std::time::Instant::now();
let index_rebuilder = IndexRebuilder::new(
messages_file_path.clone(),
index_file_path.clone(),
start_offset,
);
index_rebuilder.rebuild().await.unwrap_or_else(|e| {
panic!(
"Failed to rebuild index for partition with ID: {} for stream with ID: {} and topic with ID: {}. Error: {e}",
partition_id, stream_id, topic_id,
)
});
info!(
"Rebuilding index for path {} finished, it took {} ms",
index_file_path,
now.elapsed().as_millis()
);
}
if time_index_path_exists {
compio::fs::remove_file(&time_index_path).await.unwrap();
}
let messages_metadata = compio::fs::metadata(&messages_file_path)
.await
.map_err(|_| IggyError::CannotReadPartitions)?;
let messages_size = messages_metadata.len() as u32;
let index_size = match compio::fs::metadata(&index_file_path).await {
Ok(metadata) => metadata.len() as u32,
Err(_) => 0, // Default to 0 if index file doesn't exist
};
let storage = Storage::new(
&messages_file_path,
&index_file_path,
messages_size as u64,
index_size as u64,
config.partition.enforce_fsync,
config.partition.enforce_fsync,
true,
)
.await?;
let loaded_indexes = {
storage.
index_reader
.as_ref()
.unwrap()
.load_all_indexes_from_disk()
.await
.error(|e: &IggyError| format!("Failed to load indexes during startup for stream ID: {}, topic ID: {}, partition_id: {}, {e}", stream_id, topic_id, partition_id))
.map_err(|_| IggyError::CannotReadFile)?
};
let end_offset = if loaded_indexes.count() == 0 {
0
} else {
let last_index_offset = loaded_indexes.last().unwrap().offset() as u64;
start_offset + last_index_offset
};
let (start_timestamp, end_timestamp) = if loaded_indexes.count() == 0 {
(0, 0)
} else {
(
loaded_indexes.get(0).unwrap().timestamp(),
loaded_indexes.last().unwrap().timestamp(),
)
};
let mut segment = Segment::new(
start_offset,
config.segment.size,
config.segment.message_expiry,
);
segment.start_timestamp = start_timestamp;
segment.end_timestamp = end_timestamp;
segment.end_offset = end_offset;
segment.size = IggyByteSize::from(messages_size as u64);
// At segment load, set the current position to the size of the segment (No data is buffered yet).
segment.current_position = segment.size.as_bytes_u32();
segment.sealed = true; // Persisted segments are assumed to be sealed
if config.partition.validate_checksum {
info!(
"Validating checksum for segment at offset {} in stream ID: {}, topic ID: {}, partition ID: {}",
start_offset, stream_id, topic_id, partition_id
);
let messages_count = loaded_indexes.count() as u32;
if messages_count > 0 {
const BATCH_COUNT: u32 = 10000;
let mut current_relative_offset = 0u32;
let mut processed_count = 0u32;
while processed_count < messages_count {
let remaining_count = messages_count - processed_count;
let batch_count = std::cmp::min(BATCH_COUNT, remaining_count);
let batch_indexes = loaded_indexes
.slice_by_offset(current_relative_offset, batch_count)
.unwrap();
let messages_reader = storage.messages_reader.as_ref().unwrap();
match messages_reader.load_messages_from_disk(batch_indexes).await {
Ok(messages_batch) => {
if let Err(e) = messages_batch.validate_checksums() {
return Err(IggyError::CannotReadPartitions).error(|_: &IggyError| {
format!(
"Failed to validate message checksum for segment at offset {} in stream ID: {}, topic ID: {}, partition ID: {}, error: {}",
start_offset, stream_id, topic_id, partition_id, e
)
});
}
processed_count += messages_batch.count();
current_relative_offset += batch_count;
}
Err(e) => {
return Err(e).error(|_: &IggyError| {
format!(
"Failed to load messages from disk for checksum validation at offset {} in stream ID: {}, topic ID: {}, partition ID: {}",
start_offset, stream_id, topic_id, partition_id
)
});
}
}
}
info!(
"Checksum validation completed for segment at offset {}",
start_offset
);
}
}
log.add_persisted_segment(segment, storage);
stats.increment_segments_count(1);
stats.increment_size_bytes(messages_size as u64);
let messages_count = if end_offset > start_offset {
(end_offset - start_offset + 1) as u64
} else if messages_size > 0 {
loaded_indexes.count() as u64
} else {
0
};
if messages_count > 0 {
stats.increment_messages_count(messages_count);
}
let should_cache_indexes = match config.segment.cache_indexes {
CacheIndexesConfig::All => true,
CacheIndexesConfig::OpenSegment => false,
CacheIndexesConfig::None => false,
};
if should_cache_indexes {
let segment_index = log.segments().len() - 1;
log.set_segment_indexes(segment_index, loaded_indexes);
}
}
if matches!(
config.segment.cache_indexes,
CacheIndexesConfig::OpenSegment
) && log.has_segments()
{
let segments_count = log.segments().len();
if segments_count > 0 {
let last_storage = log.storages().last().unwrap();
match last_storage.index_reader.as_ref() {
Some(index_reader) => {
if let Ok(loaded_indexes) = index_reader.load_all_indexes_from_disk().await {
log.set_segment_indexes(segments_count - 1, loaded_indexes);
}
}
None => {
warn!("Index reader not available for last segment in OpenSegment mode");
}
}
}
}
Ok(log)
}
async fn load_partition(
config: &SystemConfig,
stream_id: usize,
topic_id: usize,
partition_state: crate::state::system::PartitionState,
parent_stats: Arc<TopicStats>,
) -> Result<partition::Partition, IggyError> {
let stats = Arc::new(PartitionStats::new(parent_stats));
let partition_id = partition_state.id;
let partition_path = config.get_partition_path(stream_id, topic_id, partition_id as usize);
let log_files = collect_log_files(&partition_path).await?;
let should_increment_offset = !log_files.is_empty()
&& log_files
.first()
.map(|entry| {
let log_file_name = entry
.path
.file_stem()
.unwrap()
.to_string_lossy()
.to_string();
let start_offset = log_file_name.parse::<u64>().unwrap();
let messages_file_path = config.get_messages_file_path(
stream_id,
topic_id,
partition_id as usize,
start_offset,
);
let metadata = std::fs::metadata(&messages_file_path)
.expect("failed to get metadata for first segment in log");
metadata.len() > 0
})
.unwrap_or_else(|| false);
info!(
"Loading partition with ID: {} for stream with ID: {} and topic with ID: {}, for path: {} from disk...",
partition_id, stream_id, topic_id, partition_path
);
// Load consumer offsets
let message_deduplicator = create_message_deduplicator(config);
let consumer_offset_path =
config.get_consumer_offsets_path(stream_id, topic_id, partition_id as usize);
let consumer_group_offsets_path =
config.get_consumer_group_offsets_path(stream_id, topic_id, partition_id as usize);
let consumer_offset = Arc::new(
load_consumer_offsets(&consumer_offset_path)?
.into_iter()
.map(|offset| (offset.consumer_id as usize, offset))
.collect::<HashMap<usize, ConsumerOffset>>()
.into(),
);
let consumer_group_offset = Arc::new(
load_consumer_group_offsets(&consumer_group_offsets_path)?
.into_iter()
.collect::<HashMap<ConsumerGroupId, ConsumerOffset>>()
.into(),
);
let log = Default::default();
let partition = partition::Partition::new(
partition_state.created_at,
should_increment_offset,
stats,
message_deduplicator,
Arc::new(Default::default()),
consumer_offset,
consumer_group_offset,
log,
);
info!(
"Loaded partition with ID: {} for stream with ID: {} and topic with ID: {}",
partition_id, stream_id, topic_id
);
Ok(partition)
}