blob: 0dbba743dab1ed087d832e771a1f2065a5bfe40f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use crate::metadata::{
ConsumerGroupId, ConsumerGroupMeta, InnerMetadata, MetadataReadHandle, PartitionId,
PartitionMeta, StreamId, StreamMeta, TopicId, TopicMeta, UserId, UserMeta,
};
use crate::shard::transmission::message::{ResolvedPartition, ResolvedTopic};
use crate::streaming::partitions::consumer_group_offsets::ConsumerGroupOffsets;
use crate::streaming::partitions::consumer_offsets::ConsumerOffsets;
use crate::streaming::stats::{PartitionStats, StreamStats, TopicStats};
use iggy_common::sharding::IggyNamespace;
use iggy_common::{
IdKind, Identifier, IggyError, IggyExpiry, IggyTimestamp, MaxTopicSize, PersonalAccessToken,
};
use left_right::ReadGuard;
use std::sync::Arc;
use std::sync::atomic::Ordering;
/// Thread-safe wrapper for GlobalMetadata using left-right for lock-free reads.
/// Uses hierarchical structure: streams contain topics, topics contain partitions and consumer groups.
/// All mutations go through MetadataWriter (shard 0 only).
///
/// Each shard should own its own `Metadata` instance (cloned from a common source).
/// The underlying data is shared via left-right's internal mechanism.
#[derive(Clone)]
pub struct Metadata {
/// Read side of the metadata store; `load` enters it to obtain a snapshot guard.
inner: MetadataReadHandle,
}
impl Metadata {
/// Creates a `Metadata` that performs all of its reads through `reader`.
pub fn new(reader: MetadataReadHandle) -> Self {
Self { inner: reader }
}
/// Enters the read handle and returns a guard over the current metadata snapshot.
///
/// # Panics
///
/// Panics if `enter()` yields no snapshot (the message indicates the writer has not
/// published yet).
#[inline]
pub(super) fn load(&self) -> ReadGuard<'_, InnerMetadata> {
self.inner
.enter()
.expect("metadata not initialized - writer must publish before reads")
}
/// Resolves a stream identifier to its numeric ID.
///
/// A numeric identifier is accepted only when a stream with that ID exists in the
/// current snapshot; a string identifier is looked up in the stream name index.
/// Returns `None` when the identifier cannot be decoded or nothing matches.
pub fn get_stream_id(&self, identifier: &Identifier) -> Option<StreamId> {
    let snapshot = self.load();
    match identifier.kind {
        IdKind::String => {
            let stream_name = identifier.get_cow_str_value().ok()?;
            snapshot.stream_index.get(stream_name.as_ref()).copied()
        }
        IdKind::Numeric => {
            let candidate = identifier.get_u32_value().ok()? as StreamId;
            // Only hand the ID back if the stream is really there.
            snapshot.streams.get(candidate).map(|_| candidate)
        }
    }
}
/// Returns `true` if the stream name index contains `name`.
pub fn stream_name_exists(&self, name: &str) -> bool {
    let snapshot = self.load();
    snapshot.stream_index.contains_key(name)
}
/// Resolves a topic identifier within stream `stream_id` to its numeric ID.
///
/// Returns `None` when the stream is absent, the identifier cannot be decoded, or no
/// topic matches. Numeric identifiers are validated against the stream's topics;
/// string identifiers are looked up in the stream's topic name index.
pub fn get_topic_id(&self, stream_id: StreamId, identifier: &Identifier) -> Option<TopicId> {
    let metadata = self.load();
    let stream = metadata.streams.get(stream_id)?;
    match identifier.kind {
        IdKind::Numeric => {
            let topic_id = identifier.get_u32_value().ok()? as TopicId;
            stream.topics.get(topic_id).map(|_| topic_id)
        }
        IdKind::String => {
            let name = identifier.get_cow_str_value().ok()?;
            // Look the name up as a borrowed `&str` rather than allocating a
            // temporary `Arc<str>` key on every call.
            stream.topic_index.get(&*name).copied()
        }
    }
}
/// Resolves a user identifier to its numeric ID.
///
/// String identifiers are looked up in the username index. Numeric identifiers are
/// returned as decoded, without checking that such a user exists.
/// Returns `None` when the identifier cannot be decoded or the name is unknown.
pub fn get_user_id(&self, identifier: &Identifier) -> Option<UserId> {
    let snapshot = self.load();
    match identifier.kind {
        IdKind::String => {
            let username = identifier.get_cow_str_value().ok()?;
            snapshot.user_index.get(username.as_ref()).copied()
        }
        IdKind::Numeric => {
            let raw_id = identifier.get_u32_value().ok()?;
            Some(raw_id as UserId)
        }
    }
}
/// Resolves a consumer group identifier within a topic to its numeric ID.
///
/// Returns `None` when the stream or topic is absent, the identifier cannot be
/// decoded, or no group matches. Numeric identifiers are validated against the
/// topic's consumer groups; string identifiers are looked up in the group name index.
pub fn get_consumer_group_id(
    &self,
    stream_id: StreamId,
    topic_id: TopicId,
    identifier: &Identifier,
) -> Option<ConsumerGroupId> {
    let metadata = self.load();
    let topic = metadata.streams.get(stream_id)?.topics.get(topic_id)?;
    match identifier.kind {
        IdKind::Numeric => {
            let group_id = identifier.get_u32_value().ok()? as ConsumerGroupId;
            topic.consumer_groups.get(group_id).map(|_| group_id)
        }
        IdKind::String => {
            let name = identifier.get_cow_str_value().ok()?;
            // The index accepts `&str` lookups (see `consumer_group_exists_by_name`),
            // so there is no need to allocate a temporary `Arc<str>` key.
            topic.consumer_group_index.get(&*name).copied()
        }
    }
}
/// Returns `true` if a stream with the given ID is present in the current snapshot.
pub fn stream_exists(&self, id: StreamId) -> bool {
    let snapshot = self.load();
    snapshot.streams.get(id).is_some()
}
/// Returns `true` if the stream exists and contains a topic with the given ID.
pub fn topic_exists(&self, stream_id: StreamId, topic_id: TopicId) -> bool {
    let snapshot = self.load();
    let Some(stream) = snapshot.streams.get(stream_id) else {
        return false;
    };
    stream.topics.get(topic_id).is_some()
}
/// Returns `true` if the stream, topic and partition all exist.
pub fn partition_exists(
    &self,
    stream_id: StreamId,
    topic_id: TopicId,
    partition_id: PartitionId,
) -> bool {
    let snapshot = self.load();
    let Some(stream) = snapshot.streams.get(stream_id) else {
        return false;
    };
    let Some(topic) = stream.topics.get(topic_id) else {
        return false;
    };
    topic.partitions.get(partition_id).is_some()
}
/// Returns `true` if a user with the given ID is present in the current snapshot.
pub fn user_exists(&self, id: UserId) -> bool {
    let snapshot = self.load();
    let slot = id as usize;
    snapshot.users.get(slot).is_some()
}
/// Returns `true` if the stream, topic and consumer group all exist.
pub fn consumer_group_exists(
    &self,
    stream_id: StreamId,
    topic_id: TopicId,
    group_id: ConsumerGroupId,
) -> bool {
    let snapshot = self.load();
    let Some(stream) = snapshot.streams.get(stream_id) else {
        return false;
    };
    let Some(topic) = stream.topics.get(topic_id) else {
        return false;
    };
    topic.consumer_groups.get(group_id).is_some()
}
/// Returns `true` if the topic exists and its consumer group name index contains `name`.
pub fn consumer_group_exists_by_name(
    &self,
    stream_id: StreamId,
    topic_id: TopicId,
    name: &str,
) -> bool {
    let snapshot = self.load();
    let topic = snapshot
        .streams
        .get(stream_id)
        .and_then(|stream| stream.topics.get(topic_id));
    match topic {
        Some(topic) => topic.consumer_group_index.contains_key(name),
        None => false,
    }
}
/// Returns the number of streams in the current snapshot.
pub fn streams_count(&self) -> usize {
    let snapshot = self.load();
    snapshot.streams.len()
}
/// Returns the stream container's next vacant key, as reported by `vacant_key()`.
pub fn next_stream_id(&self) -> usize {
    let snapshot = self.load();
    snapshot.streams.vacant_key()
}
/// Returns the number of topics in the stream, or `0` if the stream does not exist.
pub fn topics_count(&self, stream_id: StreamId) -> usize {
    let snapshot = self.load();
    match snapshot.streams.get(stream_id) {
        Some(stream) => stream.topics.len(),
        None => 0,
    }
}
/// Returns the next vacant topic key of the stream, or `None` if the stream does not exist.
pub fn next_topic_id(&self, stream_id: StreamId) -> Option<usize> {
    let snapshot = self.load();
    let stream = snapshot.streams.get(stream_id)?;
    Some(stream.topics.vacant_key())
}
/// Returns the number of partitions in the topic, or `0` if the stream or topic is missing.
pub fn partitions_count(&self, stream_id: StreamId, topic_id: TopicId) -> usize {
    let snapshot = self.load();
    let topic = snapshot
        .streams
        .get(stream_id)
        .and_then(|stream| stream.topics.get(topic_id));
    match topic {
        Some(topic) => topic.partitions.len(),
        None => 0,
    }
}
/// Returns the number of partitions in the topic, or `None` if the stream or topic is missing.
pub fn get_partitions_count(&self, stream_id: StreamId, topic_id: TopicId) -> Option<usize> {
    let snapshot = self.load();
    let topic = snapshot.streams.get(stream_id)?.topics.get(topic_id)?;
    Some(topic.partitions.len())
}
/// Picks the next partition of a topic in round-robin order.
///
/// Advances the topic's round-robin counter (relaxed ordering, wrapping at the current
/// partition count) and returns the counter's previous value reduced modulo that count.
/// Returns `None` if the stream or topic is missing or the topic has no partitions.
pub fn get_next_partition_id(&self, stream_id: StreamId, topic_id: TopicId) -> Option<usize> {
    let snapshot = self.load();
    let topic = snapshot
        .streams
        .get(stream_id)
        .and_then(|stream| stream.topics.get(topic_id))?;
    let total = topic.partitions.len();
    if total == 0 {
        return None;
    }
    // The update closure always returns `Some`, so `fetch_update` cannot fail here.
    let previous = topic
        .round_robin_counter
        .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| {
            Some((value + 1) % total)
        })
        .unwrap();
    // The stored value may predate a change in partition count, hence the extra modulo.
    Some(previous % total)
}
/// Returns the partition a consumer group member should use next.
///
/// When `calculate` is `true` the member's partition cursor is advanced (wrapping at the
/// number of assigned partitions) and the partition at the cursor's previous position is
/// returned; otherwise the cursor is only read. Returns `None` if any of the stream,
/// topic, group or member is missing, or the member has no assigned partitions.
pub fn get_next_member_partition_id(
    &self,
    stream_id: StreamId,
    topic_id: TopicId,
    group_id: ConsumerGroupId,
    member_id: usize,
    calculate: bool,
) -> Option<PartitionId> {
    let snapshot = self.load();
    let group = snapshot
        .streams
        .get(stream_id)?
        .topics
        .get(topic_id)?
        .consumer_groups
        .get(group_id)?;
    let member = group.members.get(member_id)?;
    let assigned = &member.partitions;
    if assigned.is_empty() {
        return None;
    }
    let assigned_count = assigned.len();
    let cursor = &member.partition_index;
    let position = if calculate {
        // The update closure always returns `Some`, so `fetch_update` cannot fail here.
        cursor
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |index| {
                Some((index + 1) % assigned_count)
            })
            .unwrap()
    } else {
        cursor.load(Ordering::Relaxed)
    };
    Some(assigned[position % assigned_count])
}
/// Returns the number of users in the current snapshot.
pub fn users_count(&self) -> usize {
    let snapshot = self.load();
    snapshot.users.len()
}
/// Returns `true` if the username index contains `username`.
pub fn username_exists(&self, username: &str) -> bool {
    let snapshot = self.load();
    snapshot.user_index.contains_key(username)
}
/// Returns the number of consumer groups in the topic, or `0` if the stream or topic is missing.
pub fn consumer_groups_count(&self, stream_id: StreamId, topic_id: TopicId) -> usize {
    let snapshot = self.load();
    let topic = snapshot
        .streams
        .get(stream_id)
        .and_then(|stream| stream.topics.get(topic_id));
    match topic {
        Some(topic) => topic.consumer_groups.len(),
        None => 0,
    }
}
/// Returns a shared handle to the stream's stats, or `None` if the stream does not exist.
pub fn get_stream_stats(&self, id: StreamId) -> Option<Arc<StreamStats>> {
    let snapshot = self.load();
    let stream = snapshot.streams.get(id)?;
    Some(Arc::clone(&stream.stats))
}
/// Returns a shared handle to the topic's stats, or `None` if the stream or topic is missing.
pub fn get_topic_stats(
    &self,
    stream_id: StreamId,
    topic_id: TopicId,
) -> Option<Arc<TopicStats>> {
    let snapshot = self.load();
    let topic = snapshot.streams.get(stream_id)?.topics.get(topic_id)?;
    Some(Arc::clone(&topic.stats))
}
/// Returns a shared handle to the stats of the partition addressed by `ns`,
/// or `None` if any level of the namespace is missing.
pub fn get_partition_stats(&self, ns: &IggyNamespace) -> Option<Arc<PartitionStats>> {
    let snapshot = self.load();
    let stream = snapshot.streams.get(ns.stream_id())?;
    let topic = stream.topics.get(ns.topic_id())?;
    let partition = topic.partitions.get(ns.partition_id())?;
    Some(Arc::clone(&partition.stats))
}
/// Returns a shared handle to the partition's stats, or `None` if the stream,
/// topic or partition is missing.
pub fn get_partition_stats_by_ids(
    &self,
    stream_id: StreamId,
    topic_id: TopicId,
    partition_id: PartitionId,
) -> Option<Arc<PartitionStats>> {
    let snapshot = self.load();
    let topic = snapshot.streams.get(stream_id)?.topics.get(topic_id)?;
    let partition = topic.partitions.get(partition_id)?;
    Some(Arc::clone(&partition.stats))
}
/// Returns a shared handle to the partition's consumer offsets, or `None` if the
/// stream, topic or partition is missing.
pub fn get_partition_consumer_offsets(
    &self,
    stream_id: StreamId,
    topic_id: TopicId,
    partition_id: PartitionId,
) -> Option<Arc<ConsumerOffsets>> {
    let snapshot = self.load();
    let topic = snapshot.streams.get(stream_id)?.topics.get(topic_id)?;
    let partition = topic.partitions.get(partition_id)?;
    Some(Arc::clone(&partition.consumer_offsets))
}
/// Returns a shared handle to the partition's consumer group offsets, or `None` if the
/// stream, topic or partition is missing.
pub fn get_partition_consumer_group_offsets(
    &self,
    stream_id: StreamId,
    topic_id: TopicId,
    partition_id: PartitionId,
) -> Option<Arc<ConsumerGroupOffsets>> {
    let snapshot = self.load();
    let topic = snapshot.streams.get(stream_id)?.topics.get(topic_id)?;
    let partition = topic.partitions.get(partition_id)?;
    Some(Arc::clone(&partition.consumer_group_offsets))
}
/// Returns a clone of the user's metadata, or `None` if no such user exists.
pub fn get_user(&self, id: UserId) -> Option<UserMeta> {
    let snapshot = self.load();
    let slot = id as usize;
    snapshot.users.get(slot).cloned()
}
/// Returns clones of all users' metadata, in the container's iteration order.
pub fn get_all_users(&self) -> Vec<UserMeta> {
    let snapshot = self.load();
    let mut users = Vec::with_capacity(snapshot.users.len());
    for (_, user) in snapshot.users.iter() {
        users.push(user.clone());
    }
    users
}
/// Returns a clone of the stream's metadata, or `None` if the stream does not exist.
pub fn get_stream(&self, id: StreamId) -> Option<StreamMeta> {
    let snapshot = self.load();
    snapshot.streams.get(id).cloned()
}
/// Returns a clone of the topic's metadata, or `None` if the stream or topic is missing.
pub fn get_topic(&self, stream_id: StreamId, topic_id: TopicId) -> Option<TopicMeta> {
    let snapshot = self.load();
    let stream = snapshot.streams.get(stream_id)?;
    stream.topics.get(topic_id).cloned()
}
/// Returns a clone of the partition's metadata, or `None` if the stream, topic or
/// partition is missing.
pub fn get_partition(
    &self,
    stream_id: StreamId,
    topic_id: TopicId,
    partition_id: PartitionId,
) -> Option<PartitionMeta> {
    let snapshot = self.load();
    let topic = snapshot.streams.get(stream_id)?.topics.get(topic_id)?;
    topic.partitions.get(partition_id).cloned()
}
/// Returns a clone of the consumer group's metadata, or `None` if the stream, topic or
/// group is missing.
pub fn get_consumer_group(
    &self,
    stream_id: StreamId,
    topic_id: TopicId,
    group_id: ConsumerGroupId,
) -> Option<ConsumerGroupMeta> {
    let snapshot = self.load();
    let topic = snapshot.streams.get(stream_id)?.topics.get(topic_id)?;
    topic.consumer_groups.get(group_id).cloned()
}
/// Returns clones of all personal access tokens stored for the user
/// (empty if the user has no entry).
pub fn get_user_personal_access_tokens(&self, user_id: UserId) -> Vec<PersonalAccessToken> {
    let snapshot = self.load();
    match snapshot.personal_access_tokens.get(&user_id) {
        Some(tokens) => tokens.values().cloned().collect(),
        None => Vec::new(),
    }
}
pub fn get_personal_access_token_by_hash(
&self,
token_hash: &str,
) -> Option<PersonalAccessToken> {
let token_hash_arc: Arc<str> = Arc::from(token_hash);
let metadata = self.load();
for (_, user_pats) in metadata.personal_access_tokens.iter() {
if let Some(pat) = user_pats.get(&token_hash_arc) {
return Some(pat.clone());
}
}
None
}
/// Returns how many personal access tokens are stored for the user
/// (`0` if the user has no entry).
pub fn user_pat_count(&self, user_id: UserId) -> usize {
    let snapshot = self.load();
    match snapshot.personal_access_tokens.get(&user_id) {
        Some(tokens) => tokens.len(),
        None => 0,
    }
}
/// Returns `true` if the user has a personal access token whose name equals `name`.
pub fn user_has_pat_with_name(&self, user_id: UserId, name: &str) -> bool {
    let snapshot = self.load();
    let Some(tokens) = snapshot.personal_access_tokens.get(&user_id) else {
        return false;
    };
    tokens.values().any(|token| &*token.name == name)
}
/// Returns the hash key of the user's first personal access token named `name`,
/// or `None` if the user has no such token.
pub fn find_pat_token_hash_by_name(&self, user_id: UserId, name: &str) -> Option<Arc<str>> {
    let snapshot = self.load();
    let tokens = snapshot.personal_access_tokens.get(&user_id)?;
    for (hash, token) in tokens.iter() {
        if &*token.name == name {
            return Some(hash.clone());
        }
    }
    None
}
/// Returns `true` if the consumer group exists and one of its members has `client_id`.
pub fn is_consumer_group_member(
    &self,
    stream_id: StreamId,
    topic_id: TopicId,
    group_id: ConsumerGroupId,
    client_id: u32,
) -> bool {
    let snapshot = self.load();
    let Some(stream) = snapshot.streams.get(stream_id) else {
        return false;
    };
    let Some(topic) = stream.topics.get(topic_id) else {
        return false;
    };
    let Some(group) = topic.consumer_groups.get(group_id) else {
        return false;
    };
    group
        .members
        .iter()
        .any(|(_, member)| member.client_id == client_id)
}
/// Runs `f` against a single metadata snapshot and returns its result.
///
/// Use this for reads that must observe several metadata fields consistently:
/// the whole closure sees one snapshot. The closure gets a shared reference to the
/// metadata and has to return owned data, so the read guard is released as soon as
/// this call returns and is never held across async operations.
#[inline]
pub fn with_metadata<F: FnOnce(&InnerMetadata) -> R, R>(&self, f: F) -> R {
    let snapshot = self.load();
    f(&*snapshot)
}
/// Returns the IDs (positions) of all partitions of a topic in ascending order.
/// Yields an empty vector if the stream or topic is missing.
pub fn get_partition_ids(&self, stream_id: StreamId, topic_id: TopicId) -> Vec<PartitionId> {
    let snapshot = self.load();
    let Some(topic) = snapshot
        .streams
        .get(stream_id)
        .and_then(|stream| stream.topics.get(topic_id))
    else {
        return Vec::new();
    };
    // `enumerate` already yields the positions in ascending order, so the
    // collected IDs are sorted without an explicit sort pass.
    topic
        .partitions
        .iter()
        .enumerate()
        .map(|(position, _)| position)
        .collect()
}
/// Returns the IDs of all topics of a stream in ascending order.
/// Yields an empty vector if the stream is missing.
pub fn get_topic_ids(&self, stream_id: StreamId) -> Vec<TopicId> {
    let snapshot = self.load();
    let Some(stream) = snapshot.streams.get(stream_id) else {
        return Vec::new();
    };
    let mut topic_ids = Vec::with_capacity(stream.topics.len());
    for (topic_id, _) in stream.topics.iter() {
        topic_ids.push(topic_id);
    }
    topic_ids.sort_unstable();
    topic_ids
}
/// Returns the IDs of all streams in ascending order.
pub fn get_stream_ids(&self) -> Vec<StreamId> {
    let snapshot = self.load();
    let mut stream_ids = Vec::with_capacity(snapshot.streams.len());
    for (stream_id, _) in snapshot.streams.iter() {
        stream_ids.push(stream_id);
    }
    stream_ids.sort_unstable();
    stream_ids
}
/// Returns one namespace per existing stream/topic/partition combination,
/// in stream, then topic, then partition iteration order.
pub fn get_all_namespaces(&self) -> Vec<IggyNamespace> {
    self.with_metadata(|m| {
        m.streams
            .iter()
            .flat_map(|(stream_id, stream)| {
                stream.topics.iter().flat_map(move |(topic_id, topic)| {
                    topic
                        .partitions
                        .iter()
                        .enumerate()
                        .map(move |(partition_id, _)| {
                            IggyNamespace::new(stream_id, topic_id, partition_id)
                        })
                })
            })
            .collect()
    })
}
/// Returns the topic's `(message_expiry, max_topic_size)` pair,
/// or `None` if the stream or topic is missing.
pub fn get_topic_config(
    &self,
    stream_id: StreamId,
    topic_id: TopicId,
) -> Option<(IggyExpiry, MaxTopicSize)> {
    let snapshot = self.load();
    let topic = snapshot.streams.get(stream_id)?.topics.get(topic_id)?;
    Some((topic.message_expiry, topic.max_topic_size))
}
/// Get partition initialization info needed for LocalPartition setup.
pub fn get_partition_init_info(
&self,
stream_id: StreamId,
topic_id: TopicId,
partition_id: PartitionId,
) -> Option<PartitionInitInfo> {
self.with_metadata(|m| {
m.streams
.get(stream_id)
.and_then(|s| s.topics.get(topic_id))
.and_then(|t| t.partitions.get(partition_id))
.map(|p| PartitionInitInfo {
created_at: p.created_at,
revision_id: p.revision_id,
stats: p.stats.clone(),
consumer_offsets: p.consumer_offsets.clone(),
consumer_group_offsets: p.consumer_group_offsets.clone(),
})
})
}
/// Returns the member ID under which `client_id` is registered in the consumer group,
/// or `None` if the group is missing or the client is not a member.
pub fn get_consumer_group_member_id(
    &self,
    stream_id: StreamId,
    topic_id: TopicId,
    group_id: ConsumerGroupId,
    client_id: u32,
) -> Option<usize> {
    let snapshot = self.load();
    let topic = snapshot.streams.get(stream_id)?.topics.get(topic_id)?;
    let group = topic.consumer_groups.get(group_id)?;
    for (member_id, member) in group.members.iter() {
        if member.client_id == client_id {
            return Some(member_id);
        }
    }
    None
}
/// Returns clones of all consumer groups of a topic
/// (empty if the stream or topic is missing).
pub fn get_all_consumer_groups(
    &self,
    stream_id: StreamId,
    topic_id: TopicId,
) -> Vec<ConsumerGroupMeta> {
    let snapshot = self.load();
    let Some(topic) = snapshot
        .streams
        .get(stream_id)
        .and_then(|stream| stream.topics.get(topic_id))
    else {
        return Vec::new();
    };
    topic
        .consumer_groups
        .iter()
        .map(|(_, group)| group.clone())
        .collect()
}
/// Checks whether the user may poll messages from the topic.
///
/// Inheritance: manage_streams → read_streams → read_topics → poll_messages
///
/// Access is granted by the poll-all-streams set, any global stream/topic read or manage
/// permission, the per-stream poll set, any stream-level read/manage/poll permission, or
/// a topic-level manage/read/poll permission. Otherwise returns `IggyError::Unauthorized`.
pub fn perm_poll_messages(
    &self,
    user_id: u32,
    stream_id: StreamId,
    topic_id: TopicId,
) -> Result<(), IggyError> {
    let snapshot = self.load();

    // Global grants.
    let granted_globally = snapshot.users_can_poll_all_streams.contains(&user_id)
        || snapshot
            .users_global_permissions
            .get(&user_id)
            .is_some_and(|global| {
                global.read_topics
                    || global.manage_topics
                    || global.read_streams
                    || global.manage_streams
            });
    if granted_globally {
        return Ok(());
    }

    // Stream-level grants.
    let stream_key = (user_id, stream_id);
    if snapshot.users_can_poll_stream.contains(&stream_key) {
        return Ok(());
    }
    let stream_permissions = snapshot
        .users_stream_permissions
        .get(&stream_key)
        .ok_or(IggyError::Unauthorized)?;
    let granted_on_stream = stream_permissions.manage_stream
        || stream_permissions.read_stream
        || stream_permissions.manage_topics
        || stream_permissions.read_topics
        || stream_permissions.poll_messages;
    if granted_on_stream {
        return Ok(());
    }

    // Topic-level grants.
    let granted_on_topic = stream_permissions
        .topics
        .as_ref()
        .and_then(|topics| topics.get(&topic_id))
        .is_some_and(|topic| topic.manage_topic || topic.read_topic || topic.poll_messages);
    if granted_on_topic {
        Ok(())
    } else {
        Err(IggyError::Unauthorized)
    }
}
/// Checks whether the user may append (send) messages to the topic.
///
/// Inheritance: manage_streams → manage_topics → send_messages
///
/// Access is granted by the send-all-streams set, global manage_streams/manage_topics,
/// the per-stream send set, stream-level manage or send permission, or topic-level
/// manage/send permission. Otherwise returns `IggyError::Unauthorized`.
pub fn perm_append_messages(
    &self,
    user_id: u32,
    stream_id: StreamId,
    topic_id: TopicId,
) -> Result<(), IggyError> {
    let snapshot = self.load();

    // Global grants.
    let granted_globally = snapshot.users_can_send_all_streams.contains(&user_id)
        || snapshot
            .users_global_permissions
            .get(&user_id)
            .is_some_and(|global| global.manage_streams || global.manage_topics);
    if granted_globally {
        return Ok(());
    }

    // Stream-level grants.
    let stream_key = (user_id, stream_id);
    if snapshot.users_can_send_stream.contains(&stream_key) {
        return Ok(());
    }
    let stream_permissions = snapshot
        .users_stream_permissions
        .get(&stream_key)
        .ok_or(IggyError::Unauthorized)?;
    let granted_on_stream = stream_permissions.manage_stream
        || stream_permissions.manage_topics
        || stream_permissions.send_messages;
    if granted_on_stream {
        return Ok(());
    }

    // Topic-level grants.
    let granted_on_topic = stream_permissions
        .topics
        .as_ref()
        .and_then(|topics| topics.get(&topic_id))
        .is_some_and(|topic| topic.manage_topic || topic.send_messages);
    if granted_on_topic {
        Ok(())
    } else {
        Err(IggyError::Unauthorized)
    }
}
/// Checks whether the user may read the stream: requires global manage_streams or
/// read_streams, or stream-level manage_stream or read_stream.
/// Returns `IggyError::Unauthorized` otherwise.
pub fn perm_get_stream(&self, user_id: u32, stream_id: StreamId) -> Result<(), IggyError> {
    let snapshot = self.load();
    let granted_globally = snapshot
        .users_global_permissions
        .get(&user_id)
        .is_some_and(|global| global.manage_streams || global.read_streams);
    let granted_on_stream = || {
        snapshot
            .users_stream_permissions
            .get(&(user_id, stream_id))
            .is_some_and(|stream| stream.manage_stream || stream.read_stream)
    };
    if granted_globally || granted_on_stream() {
        Ok(())
    } else {
        Err(IggyError::Unauthorized)
    }
}
/// Checks whether the user may list streams: requires global manage_streams or
/// read_streams. Returns `IggyError::Unauthorized` otherwise.
pub fn perm_get_streams(&self, user_id: u32) -> Result<(), IggyError> {
    let snapshot = self.load();
    match snapshot.users_global_permissions.get(&user_id) {
        Some(global) if global.manage_streams || global.read_streams => Ok(()),
        _ => Err(IggyError::Unauthorized),
    }
}
/// Checks whether the user may create streams: requires global manage_streams.
/// Returns `IggyError::Unauthorized` otherwise.
pub fn perm_create_stream(&self, user_id: u32) -> Result<(), IggyError> {
    let snapshot = self.load();
    match snapshot.users_global_permissions.get(&user_id) {
        Some(global) if global.manage_streams => Ok(()),
        _ => Err(IggyError::Unauthorized),
    }
}
/// Checks whether the user may update the stream; delegates to `perm_manage_stream`.
pub fn perm_update_stream(&self, user_id: u32, stream_id: StreamId) -> Result<(), IggyError> {
self.perm_manage_stream(user_id, stream_id)
}
/// Checks whether the user may delete the stream; delegates to `perm_manage_stream`.
pub fn perm_delete_stream(&self, user_id: u32, stream_id: StreamId) -> Result<(), IggyError> {
self.perm_manage_stream(user_id, stream_id)
}
/// Checks whether the user may purge the stream; delegates to `perm_manage_stream`.
pub fn perm_purge_stream(&self, user_id: u32, stream_id: StreamId) -> Result<(), IggyError> {
self.perm_manage_stream(user_id, stream_id)
}
/// Checks whether the user may manage the stream: requires global manage_streams or
/// stream-level manage_stream. Returns `IggyError::Unauthorized` otherwise.
fn perm_manage_stream(&self, user_id: u32, stream_id: StreamId) -> Result<(), IggyError> {
    let snapshot = self.load();
    let granted_globally = snapshot
        .users_global_permissions
        .get(&user_id)
        .is_some_and(|global| global.manage_streams);
    let granted_on_stream = || {
        snapshot
            .users_stream_permissions
            .get(&(user_id, stream_id))
            .is_some_and(|stream| stream.manage_stream)
    };
    if granted_globally || granted_on_stream() {
        Ok(())
    } else {
        Err(IggyError::Unauthorized)
    }
}
/// Checks whether the user may read the topic.
///
/// Inheritance: manage_streams → read_streams → read_topics
///
/// Access is granted by any global or stream-level stream/topic read or manage
/// permission, or by topic-level manage_topic or read_topic.
/// Returns `IggyError::Unauthorized` otherwise.
pub fn perm_get_topic(
    &self,
    user_id: u32,
    stream_id: StreamId,
    topic_id: TopicId,
) -> Result<(), IggyError> {
    let snapshot = self.load();
    let granted_globally = snapshot
        .users_global_permissions
        .get(&user_id)
        .is_some_and(|global| {
            global.read_streams
                || global.manage_streams
                || global.manage_topics
                || global.read_topics
        });
    if granted_globally {
        return Ok(());
    }

    let Some(stream_permissions) = snapshot
        .users_stream_permissions
        .get(&(user_id, stream_id))
    else {
        return Err(IggyError::Unauthorized);
    };
    let granted_on_stream = stream_permissions.manage_stream
        || stream_permissions.read_stream
        || stream_permissions.manage_topics
        || stream_permissions.read_topics;
    if granted_on_stream {
        return Ok(());
    }

    let granted_on_topic = stream_permissions
        .topics
        .as_ref()
        .and_then(|topics| topics.get(&topic_id))
        .is_some_and(|topic| topic.manage_topic || topic.read_topic);
    if granted_on_topic {
        Ok(())
    } else {
        Err(IggyError::Unauthorized)
    }
}
/// Checks whether the user may list the stream's topics: requires any global or
/// stream-level stream/topic read or manage permission.
/// Returns `IggyError::Unauthorized` otherwise.
pub fn perm_get_topics(&self, user_id: u32, stream_id: StreamId) -> Result<(), IggyError> {
    let snapshot = self.load();
    let granted_globally = snapshot
        .users_global_permissions
        .get(&user_id)
        .is_some_and(|global| {
            global.read_streams
                || global.manage_streams
                || global.manage_topics
                || global.read_topics
        });
    let granted_on_stream = || {
        snapshot
            .users_stream_permissions
            .get(&(user_id, stream_id))
            .is_some_and(|stream| {
                stream.manage_stream
                    || stream.read_stream
                    || stream.manage_topics
                    || stream.read_topics
            })
    };
    if granted_globally || granted_on_stream() {
        Ok(())
    } else {
        Err(IggyError::Unauthorized)
    }
}
/// Checks whether the user may create topics in the stream.
///
/// Inheritance: manage_streams → manage_topics
///
/// Requires global manage_streams/manage_topics or stream-level
/// manage_stream/manage_topics. Returns `IggyError::Unauthorized` otherwise.
pub fn perm_create_topic(&self, user_id: u32, stream_id: StreamId) -> Result<(), IggyError> {
    let snapshot = self.load();
    let granted_globally = snapshot
        .users_global_permissions
        .get(&user_id)
        .is_some_and(|global| global.manage_streams || global.manage_topics);
    let granted_on_stream = || {
        snapshot
            .users_stream_permissions
            .get(&(user_id, stream_id))
            .is_some_and(|stream| stream.manage_stream || stream.manage_topics)
    };
    if granted_globally || granted_on_stream() {
        Ok(())
    } else {
        Err(IggyError::Unauthorized)
    }
}
/// Checks whether the user may update the topic; delegates to `perm_manage_topic`.
pub fn perm_update_topic(
&self,
user_id: u32,
stream_id: StreamId,
topic_id: TopicId,
) -> Result<(), IggyError> {
self.perm_manage_topic(user_id, stream_id, topic_id)
}
/// Checks whether the user may delete the topic; delegates to `perm_manage_topic`.
pub fn perm_delete_topic(
&self,
user_id: u32,
stream_id: StreamId,
topic_id: TopicId,
) -> Result<(), IggyError> {
self.perm_manage_topic(user_id, stream_id, topic_id)
}
/// Checks whether the user may purge the topic; delegates to `perm_manage_topic`.
pub fn perm_purge_topic(
&self,
user_id: u32,
stream_id: StreamId,
topic_id: TopicId,
) -> Result<(), IggyError> {
self.perm_manage_topic(user_id, stream_id, topic_id)
}
/// Checks whether the user may manage the topic.
///
/// Inheritance: manage_streams → manage_topics
///
/// Requires global manage_streams/manage_topics, stream-level
/// manage_stream/manage_topics, or topic-level manage_topic.
/// Returns `IggyError::Unauthorized` otherwise.
fn perm_manage_topic(
    &self,
    user_id: u32,
    stream_id: StreamId,
    topic_id: TopicId,
) -> Result<(), IggyError> {
    let snapshot = self.load();
    let granted_globally = snapshot
        .users_global_permissions
        .get(&user_id)
        .is_some_and(|global| global.manage_streams || global.manage_topics);
    if granted_globally {
        return Ok(());
    }

    let Some(stream_permissions) = snapshot
        .users_stream_permissions
        .get(&(user_id, stream_id))
    else {
        return Err(IggyError::Unauthorized);
    };
    if stream_permissions.manage_stream || stream_permissions.manage_topics {
        return Ok(());
    }

    let granted_on_topic = stream_permissions
        .topics
        .as_ref()
        .and_then(|topics| topics.get(&topic_id))
        .is_some_and(|topic| topic.manage_topic);
    if granted_on_topic {
        Ok(())
    } else {
        Err(IggyError::Unauthorized)
    }
}
/// Checks whether the user may create partitions in the topic; requires the same
/// permission as updating the topic (delegates to `perm_update_topic`).
pub fn perm_create_partitions(
&self,
user_id: u32,
stream_id: StreamId,
topic_id: TopicId,
) -> Result<(), IggyError> {
self.perm_update_topic(user_id, stream_id, topic_id)
}
/// Checks whether the user may delete partitions from the topic; requires the same
/// permission as updating the topic (delegates to `perm_update_topic`).
pub fn perm_delete_partitions(
&self,
user_id: u32,
stream_id: StreamId,
topic_id: TopicId,
) -> Result<(), IggyError> {
self.perm_update_topic(user_id, stream_id, topic_id)
}
/// Checks whether the user may delete segments in the topic; requires the same
/// permission as updating the topic (delegates to `perm_update_topic`).
pub fn perm_delete_segments(
&self,
user_id: u32,
stream_id: StreamId,
topic_id: TopicId,
) -> Result<(), IggyError> {
self.perm_update_topic(user_id, stream_id, topic_id)
}
/// Checks whether the user may create a consumer group on the topic; requires the same
/// permission as reading the topic (delegates to `perm_get_topic`).
pub fn perm_create_consumer_group(
&self,
user_id: u32,
stream_id: StreamId,
topic_id: TopicId,
) -> Result<(), IggyError> {
self.perm_get_topic(user_id, stream_id, topic_id)
}
/// Checks whether the user may delete a consumer group of the topic; requires the same
/// permission as reading the topic (delegates to `perm_get_topic`).
pub fn perm_delete_consumer_group(
&self,
user_id: u32,
stream_id: StreamId,
topic_id: TopicId,
) -> Result<(), IggyError> {
self.perm_get_topic(user_id, stream_id, topic_id)
}
/// Checks whether the user may read a consumer group of the topic; requires the same
/// permission as reading the topic (delegates to `perm_get_topic`).
pub fn perm_get_consumer_group(
&self,
user_id: u32,
stream_id: StreamId,
topic_id: TopicId,
) -> Result<(), IggyError> {
self.perm_get_topic(user_id, stream_id, topic_id)
}
/// Checks whether the user may list the topic's consumer groups; requires the same
/// permission as reading the topic (delegates to `perm_get_topic`).
pub fn perm_get_consumer_groups(
&self,
user_id: u32,
stream_id: StreamId,
topic_id: TopicId,
) -> Result<(), IggyError> {
self.perm_get_topic(user_id, stream_id, topic_id)
}
/// Checks whether the user may join a consumer group of the topic; requires the same
/// permission as reading the topic (delegates to `perm_get_topic`).
pub fn perm_join_consumer_group(
&self,
user_id: u32,
stream_id: StreamId,
topic_id: TopicId,
) -> Result<(), IggyError> {
self.perm_get_topic(user_id, stream_id, topic_id)
}
/// Checks whether the user may leave a consumer group of the topic; requires the same
/// permission as reading the topic (delegates to `perm_get_topic`).
pub fn perm_leave_consumer_group(
&self,
user_id: u32,
stream_id: StreamId,
topic_id: TopicId,
) -> Result<(), IggyError> {
self.perm_get_topic(user_id, stream_id, topic_id)
}
/// Checks whether the user may read a consumer offset on the topic; requires the same
/// permission as polling messages (delegates to `perm_poll_messages`).
pub fn perm_get_consumer_offset(
&self,
user_id: u32,
stream_id: StreamId,
topic_id: TopicId,
) -> Result<(), IggyError> {
self.perm_poll_messages(user_id, stream_id, topic_id)
}
/// Checks whether the user may store a consumer offset on the topic; requires the same
/// permission as polling messages (delegates to `perm_poll_messages`).
pub fn perm_store_consumer_offset(
&self,
user_id: u32,
stream_id: StreamId,
topic_id: TopicId,
) -> Result<(), IggyError> {
self.perm_poll_messages(user_id, stream_id, topic_id)
}
/// Checks whether the user may delete a consumer offset on the topic; requires the same
/// permission as polling messages (delegates to `perm_poll_messages`).
pub fn perm_delete_consumer_offset(
&self,
user_id: u32,
stream_id: StreamId,
topic_id: TopicId,
) -> Result<(), IggyError> {
self.perm_poll_messages(user_id, stream_id, topic_id)
}
/// Checks whether the user may read a user's data; delegates to `perm_read_users`.
pub fn perm_get_user(&self, user_id: u32) -> Result<(), IggyError> {
self.perm_read_users(user_id)
}
/// Checks whether the user may list users; delegates to `perm_read_users`.
pub fn perm_get_users(&self, user_id: u32) -> Result<(), IggyError> {
self.perm_read_users(user_id)
}
/// Checks whether the user may create users; delegates to `perm_manage_users`.
pub fn perm_create_user(&self, user_id: u32) -> Result<(), IggyError> {
self.perm_manage_users(user_id)
}
/// Checks whether the user may delete users; delegates to `perm_manage_users`.
pub fn perm_delete_user(&self, user_id: u32) -> Result<(), IggyError> {
self.perm_manage_users(user_id)
}
/// Checks whether the user may update users; delegates to `perm_manage_users`.
pub fn perm_update_user(&self, user_id: u32) -> Result<(), IggyError> {
self.perm_manage_users(user_id)
}
/// Checks whether the user may update user permissions; delegates to `perm_manage_users`.
pub fn perm_update_permissions(&self, user_id: u32) -> Result<(), IggyError> {
self.perm_manage_users(user_id)
}
/// Checks whether the user may change a password; delegates to `perm_manage_users`.
pub fn perm_change_password(&self, user_id: u32) -> Result<(), IggyError> {
self.perm_manage_users(user_id)
}
/// Checks whether the user holds the global manage_users permission.
/// Returns `IggyError::Unauthorized` otherwise.
fn perm_manage_users(&self, user_id: u32) -> Result<(), IggyError> {
    let snapshot = self.load();
    match snapshot.users_global_permissions.get(&user_id) {
        Some(global) if global.manage_users => Ok(()),
        _ => Err(IggyError::Unauthorized),
    }
}
/// Checks whether the user holds the global manage_users or read_users permission.
/// Returns `IggyError::Unauthorized` otherwise.
fn perm_read_users(&self, user_id: u32) -> Result<(), IggyError> {
    let snapshot = self.load();
    match snapshot.users_global_permissions.get(&user_id) {
        Some(global) if global.manage_users || global.read_users => Ok(()),
        _ => Err(IggyError::Unauthorized),
    }
}
/// Checks whether the user may read server stats; delegates to `perm_get_server_info`.
pub fn perm_get_stats(&self, user_id: u32) -> Result<(), IggyError> {
self.perm_get_server_info(user_id)
}
/// Checks whether the user may list clients; delegates to `perm_get_server_info`.
pub fn perm_get_clients(&self, user_id: u32) -> Result<(), IggyError> {
self.perm_get_server_info(user_id)
}
/// Checks whether the user may read a client's data; delegates to `perm_get_server_info`.
pub fn perm_get_client(&self, user_id: u32) -> Result<(), IggyError> {
self.perm_get_server_info(user_id)
}
/// Checks whether the user holds the global manage_servers or read_servers permission.
/// Returns `IggyError::Unauthorized` otherwise.
fn perm_get_server_info(&self, user_id: u32) -> Result<(), IggyError> {
    let snapshot = self.load();
    match snapshot.users_global_permissions.get(&user_id) {
        Some(global) if global.manage_servers || global.read_servers => Ok(()),
        _ => Err(IggyError::Unauthorized),
    }
}
/// Resolves the stream, checks read permission and returns its metadata, all against
/// one snapshot.
///
/// Returns `Ok(None)` if the identifier does not resolve to a stream, and the error of
/// the permission check if the user is not authorized.
pub fn query_stream(
    &self,
    user_id: u32,
    stream_id: &Identifier,
) -> Result<Option<StreamMeta>, IggyError> {
    self.with_metadata(|m| {
        let Some(sid) = resolve_stream_id_inner(m, stream_id) else {
            return Ok(None);
        };
        perm_get_stream_inner(m, user_id, sid)?;
        Ok(m.streams.get(sid).cloned())
    })
}
/// Checks list permission and returns clones of all streams, against one snapshot.
pub fn query_streams(&self, user_id: u32) -> Result<Vec<StreamMeta>, IggyError> {
    self.with_metadata(|m| {
        perm_get_streams_inner(m, user_id)?;
        let mut streams = Vec::with_capacity(m.streams.len());
        for (_, stream) in m.streams.iter() {
            streams.push(stream.clone());
        }
        Ok(streams)
    })
}
/// Resolves the topic, checks read permission and returns its metadata, all against
/// one snapshot.
///
/// Returns `Ok(None)` if the stream or topic identifier does not resolve, and the error
/// of the permission check if the user is not authorized.
pub fn query_topic(
    &self,
    user_id: u32,
    stream_id: &Identifier,
    topic_id: &Identifier,
) -> Result<Option<TopicMeta>, IggyError> {
    self.with_metadata(|m| {
        let Some(sid) = resolve_stream_id_inner(m, stream_id) else {
            return Ok(None);
        };
        let Some(tid) = resolve_topic_id_inner(m, sid, topic_id) else {
            return Ok(None);
        };
        perm_get_topic_inner(m, user_id, sid, tid)?;
        let topic = m
            .streams
            .get(sid)
            .and_then(|stream| stream.topics.get(tid).cloned());
        Ok(topic)
    })
}
/// Resolves the stream, checks list permission and returns clones of all its topics,
/// all against one snapshot.
///
/// Returns `Ok(None)` if the stream identifier does not resolve, and the error of the
/// permission check if the user is not authorized.
pub fn query_topics(
    &self,
    user_id: u32,
    stream_id: &Identifier,
) -> Result<Option<Vec<TopicMeta>>, IggyError> {
    self.with_metadata(|m| {
        let Some(sid) = resolve_stream_id_inner(m, stream_id) else {
            return Ok(None);
        };
        perm_get_topics_inner(m, user_id, sid)?;
        let topics = m
            .streams
            .get(sid)
            .map(|stream| stream.topics.iter().map(|(_, topic)| topic.clone()).collect());
        Ok(topics)
    })
}
/// Resolves the consumer group, checks permission and returns its metadata, all against
/// one snapshot.
///
/// Returns `Ok(None)` if the stream, topic or group identifier does not resolve, and the
/// error of the permission check if the user is not authorized.
pub fn query_consumer_group(
    &self,
    user_id: u32,
    stream_id: &Identifier,
    topic_id: &Identifier,
    group_id: &Identifier,
) -> Result<Option<ConsumerGroupMeta>, IggyError> {
    self.with_metadata(|m| {
        let Some(sid) = resolve_stream_id_inner(m, stream_id) else {
            return Ok(None);
        };
        let Some(tid) = resolve_topic_id_inner(m, sid, topic_id) else {
            return Ok(None);
        };
        let Some(gid) = resolve_consumer_group_id_inner(m, sid, tid, group_id) else {
            return Ok(None);
        };
        perm_get_consumer_group_inner(m, user_id, sid, tid)?;
        let group = m
            .streams
            .get(sid)
            .and_then(|stream| stream.topics.get(tid))
            .and_then(|topic| topic.consumer_groups.get(gid).cloned());
        Ok(group)
    })
}
/// Resolves the topic, checks permission and returns clones of all its consumer groups,
/// all against one snapshot.
///
/// Returns `Ok(None)` if the stream or topic identifier does not resolve, and the error
/// of the permission check if the user is not authorized.
pub fn query_consumer_groups(
    &self,
    user_id: u32,
    stream_id: &Identifier,
    topic_id: &Identifier,
) -> Result<Option<Vec<ConsumerGroupMeta>>, IggyError> {
    self.with_metadata(|m| {
        let Some(sid) = resolve_stream_id_inner(m, stream_id) else {
            return Ok(None);
        };
        let Some(tid) = resolve_topic_id_inner(m, sid, topic_id) else {
            return Ok(None);
        };
        perm_get_consumer_group_inner(m, user_id, sid, tid)?;
        let groups = m
            .streams
            .get(sid)
            .and_then(|stream| stream.topics.get(tid))
            .map(|topic| {
                topic
                    .consumer_groups
                    .iter()
                    .map(|(_, group)| group.clone())
                    .collect()
            });
        Ok(groups)
    })
}
/// Resolves the target user, checks permission and returns the user's metadata, all
/// against one snapshot.
///
/// The permission check is skipped when a user requests their own data.
/// Returns `Ok(None)` if the identifier does not resolve or no such user is stored.
pub fn query_user(
    &self,
    requesting_user_id: u32,
    target_user_id: &Identifier,
) -> Result<Option<UserMeta>, IggyError> {
    self.with_metadata(|m| {
        let Some(uid) = resolve_user_id_inner(m, target_user_id) else {
            return Ok(None);
        };
        let is_self_lookup = uid == requesting_user_id;
        if !is_self_lookup {
            perm_get_user_inner(m, requesting_user_id)?;
        }
        Ok(m.users.get(uid as usize).cloned())
    })
}
/// Checks list permission and returns clones of all users, against one snapshot.
pub fn query_users(&self, user_id: u32) -> Result<Vec<UserMeta>, IggyError> {
    self.with_metadata(|m| {
        perm_get_users_inner(m, user_id)?;
        let mut users = Vec::with_capacity(m.users.len());
        for (_, user) in m.users.iter() {
            users.push(user.clone());
        }
        Ok(users)
    })
}
/// Resolves the topic and checks the consumer-offset read permission against one snapshot.
///
/// Returns the resolved IDs for use with get_consumer_offset, `Ok(None)` if the stream
/// or topic identifier does not resolve, and the error of the permission check if the
/// user is not authorized.
pub fn resolve_for_consumer_offset(
    &self,
    user_id: u32,
    stream_id: &Identifier,
    topic_id: &Identifier,
) -> Result<Option<ResolvedTopic>, IggyError> {
    self.with_metadata(|m| {
        let Some(sid) = resolve_stream_id_inner(m, stream_id) else {
            return Ok(None);
        };
        let Some(tid) = resolve_topic_id_inner(m, sid, topic_id) else {
            return Ok(None);
        };
        perm_get_consumer_offset_inner(m, user_id, sid, tid)?;
        let resolved = ResolvedTopic {
            stream_id: sid,
            topic_id: tid,
        };
        Ok(Some(resolved))
    })
}
/// Resolves the topic and checks the append permission against one snapshot.
///
/// # Errors
///
/// `IggyError::StreamIdNotFound` / `IggyError::TopicIdNotFound` if an identifier does
/// not resolve, or the error of the permission check if the user is not authorized.
pub fn resolve_for_append(
    &self,
    user_id: u32,
    stream_id: &Identifier,
    topic_id: &Identifier,
) -> Result<ResolvedTopic, IggyError> {
    self.with_metadata(|m| {
        let Some(sid) = resolve_stream_id_inner(m, stream_id) else {
            return Err(IggyError::StreamIdNotFound(stream_id.clone()));
        };
        let Some(tid) = resolve_topic_id_inner(m, sid, topic_id) else {
            return Err(IggyError::TopicIdNotFound(
                stream_id.clone(),
                topic_id.clone(),
            ));
        };
        perm_append_messages_inner(m, user_id, sid, tid)?;
        Ok(ResolvedTopic {
            stream_id: sid,
            topic_id: tid,
        })
    })
}
/// Resolves the topic and checks the poll permission against one snapshot.
///
/// # Errors
///
/// `IggyError::StreamIdNotFound` / `IggyError::TopicIdNotFound` if an identifier does
/// not resolve, or the error of the permission check if the user is not authorized.
pub fn resolve_for_poll(
    &self,
    user_id: u32,
    stream_id: &Identifier,
    topic_id: &Identifier,
) -> Result<ResolvedTopic, IggyError> {
    self.with_metadata(|m| {
        let Some(sid) = resolve_stream_id_inner(m, stream_id) else {
            return Err(IggyError::StreamIdNotFound(stream_id.clone()));
        };
        let Some(tid) = resolve_topic_id_inner(m, sid, topic_id) else {
            return Err(IggyError::TopicIdNotFound(
                stream_id.clone(),
                topic_id.clone(),
            ));
        };
        perm_poll_messages_inner(m, user_id, sid, tid)?;
        Ok(ResolvedTopic {
            stream_id: sid,
            topic_id: tid,
        })
    })
}
/// Resolves `stream_id`/`topic_id` and authorizes storing a consumer offset, all
/// inside a single `with_metadata` closure.
///
/// Authorization is delegated to `perm_get_consumer_offset_inner`, i.e. storing an
/// offset is gated by the same rule as reading one.
///
/// # Errors
/// * `IggyError::StreamIdNotFound` - the stream identifier did not resolve.
/// * `IggyError::TopicIdNotFound` - the topic identifier did not resolve within the stream.
/// * Whatever `perm_get_consumer_offset_inner` returns when access is denied.
pub fn resolve_for_store_consumer_offset(
    &self,
    user_id: u32,
    stream_id: &Identifier,
    topic_id: &Identifier,
) -> Result<ResolvedTopic, IggyError> {
    self.with_metadata(|meta| {
        resolve_stream_id_inner(meta, stream_id)
            .ok_or_else(|| IggyError::StreamIdNotFound(stream_id.clone()))
            .and_then(|sid| {
                resolve_topic_id_inner(meta, sid, topic_id)
                    .map(|tid| ResolvedTopic {
                        stream_id: sid,
                        topic_id: tid,
                    })
                    .ok_or_else(|| {
                        IggyError::TopicIdNotFound(stream_id.clone(), topic_id.clone())
                    })
            })
            .and_then(|resolved| {
                perm_get_consumer_offset_inner(
                    meta,
                    user_id,
                    resolved.stream_id,
                    resolved.topic_id,
                )
                .map(|()| resolved)
            })
    })
}
/// Resolves `stream_id`/`topic_id` and authorizes deleting a consumer offset, all
/// inside a single `with_metadata` closure.
///
/// Authorization is delegated to `perm_get_consumer_offset_inner`, i.e. deleting an
/// offset is gated by the same rule as reading one.
///
/// # Errors
/// * `IggyError::StreamIdNotFound` - the stream identifier did not resolve.
/// * `IggyError::TopicIdNotFound` - the topic identifier did not resolve within the stream.
/// * Whatever `perm_get_consumer_offset_inner` returns when access is denied.
pub fn resolve_for_delete_consumer_offset(
    &self,
    user_id: u32,
    stream_id: &Identifier,
    topic_id: &Identifier,
) -> Result<ResolvedTopic, IggyError> {
    self.with_metadata(|meta| {
        let stream_key = resolve_stream_id_inner(meta, stream_id)
            .ok_or_else(|| IggyError::StreamIdNotFound(stream_id.clone()))?;
        let topic_key = match resolve_topic_id_inner(meta, stream_key, topic_id) {
            Some(key) => key,
            None => {
                return Err(IggyError::TopicIdNotFound(
                    stream_id.clone(),
                    topic_id.clone(),
                ));
            }
        };
        let resolved = ResolvedTopic {
            stream_id: stream_key,
            topic_id: topic_key,
        };
        perm_get_consumer_offset_inner(meta, user_id, stream_key, topic_key)?;
        Ok(resolved)
    })
}
/// Resolves stream, topic and partition, then authorizes a delete-segments request,
/// all inside a single `with_metadata` closure.
///
/// NOTE(review): authorization goes through `perm_get_topic_inner`, which read-level
/// stream/topic permissions satisfy. Confirm that this is the intended gate for an
/// operation that deletes segments.
///
/// # Errors
/// * `IggyError::StreamIdNotFound` - the stream identifier did not resolve.
/// * `IggyError::TopicIdNotFound` - the topic identifier did not resolve within the stream.
/// * `IggyError::PartitionNotFound` - the topic has no partition `partition_id`.
/// * Whatever `perm_get_topic_inner` returns when access is denied.
pub fn resolve_for_delete_segments(
    &self,
    user_id: u32,
    stream_id: &Identifier,
    topic_id: &Identifier,
    partition_id: PartitionId,
) -> Result<ResolvedPartition, IggyError> {
    self.with_metadata(|meta| {
        let Some(sid) = resolve_stream_id_inner(meta, stream_id) else {
            return Err(IggyError::StreamIdNotFound(stream_id.clone()));
        };
        let Some(tid) = resolve_topic_id_inner(meta, sid, topic_id) else {
            return Err(IggyError::TopicIdNotFound(
                stream_id.clone(),
                topic_id.clone(),
            ));
        };
        // Walk stream -> topic -> partition; any missing hop means the partition is unknown.
        let partition = meta
            .streams
            .get(sid)
            .and_then(|stream| stream.topics.get(tid))
            .and_then(|topic| topic.partitions.get(partition_id));
        if partition.is_none() {
            return Err(IggyError::PartitionNotFound(
                partition_id,
                topic_id.clone(),
                stream_id.clone(),
            ));
        }
        perm_get_topic_inner(meta, user_id, sid, tid)?;
        Ok(ResolvedPartition {
            stream_id: sid,
            topic_id: tid,
            partition_id,
        })
    })
}
}
/// Information needed to initialize a LocalPartition.
///
/// The three `Arc` fields are reference-counted handles, so cloning this struct
/// clones the handles rather than the data they point to.
#[derive(Clone, Debug)]
pub struct PartitionInitInfo {
/// Creation timestamp (presumably the partition's; confirm where this struct is built).
pub created_at: IggyTimestamp,
/// Revision identifier; its exact semantics are defined where this struct is built.
pub revision_id: u64,
/// Shared handle to the partition statistics.
pub stats: Arc<PartitionStats>,
/// Shared handle to the consumer offsets.
pub consumer_offsets: Arc<ConsumerOffsets>,
/// Shared handle to the consumer-group offsets.
pub consumer_group_offsets: Arc<ConsumerGroupOffsets>,
}
/// Maps a stream [`Identifier`] to the internal [`StreamId`], if such a stream is known.
///
/// * Numeric identifiers are cast to `StreamId` and accepted only when `m.streams`
///   has an entry under that key.
/// * String identifiers are looked up by name in `m.stream_index`.
///
/// Returns `None` when the identifier's value cannot be read in the form its `kind`
/// announces, or when no matching stream exists.
pub(crate) fn resolve_stream_id_inner(
    m: &InnerMetadata,
    stream_id: &Identifier,
) -> Option<StreamId> {
    match stream_id.kind {
        IdKind::Numeric => {
            let Ok(raw) = stream_id.get_u32_value() else {
                return None;
            };
            let candidate = raw as StreamId;
            m.streams.get(candidate).is_some().then_some(candidate)
        }
        IdKind::String => {
            let Ok(name) = stream_id.get_cow_str_value() else {
                return None;
            };
            m.stream_index.get(name.as_ref()).copied()
        }
    }
}
/// Maps a topic [`Identifier`] to the internal [`TopicId`] within stream `stream_id`.
///
/// * Numeric identifiers are cast to `TopicId` and accepted only when the stream's
///   `topics` has an entry under that key.
/// * String identifiers are looked up by name in the stream's `topic_index`.
///
/// Returns `None` when the stream is unknown, when the identifier's value cannot be
/// read in the form its `kind` announces, or when no matching topic exists.
pub(crate) fn resolve_topic_id_inner(
    m: &InnerMetadata,
    stream_id: StreamId,
    topic_id: &Identifier,
) -> Option<TopicId> {
    let stream = m.streams.get(stream_id)?;
    match topic_id.kind {
        IdKind::Numeric => topic_id
            .get_u32_value()
            .ok()
            .map(|raw| raw as TopicId)
            .filter(|candidate| stream.topics.get(*candidate).is_some()),
        IdKind::String => {
            let Ok(name) = topic_id.get_cow_str_value() else {
                return None;
            };
            // NOTE(review): this builds a temporary `Arc` for every lookup; if the index
            // key type supports borrowed `&str` lookups (as `stream_index` appears to),
            // the allocation could be dropped. Confirm against the map's key type.
            stream.topic_index.get(&Arc::from(name.as_ref())).copied()
        }
    }
}
/// Maps a consumer-group [`Identifier`] to the internal [`ConsumerGroupId`] within
/// topic `topic_id` of stream `stream_id`.
///
/// * Numeric identifiers are cast to `ConsumerGroupId` and accepted only when the
///   topic's `consumer_groups` has an entry under that key.
/// * String identifiers are looked up by name in the topic's `consumer_group_index`.
///
/// Returns `None` when the stream or topic is unknown, when the identifier's value
/// cannot be read in the form its `kind` announces, or when no matching group exists.
pub(crate) fn resolve_consumer_group_id_inner(
    m: &InnerMetadata,
    stream_id: StreamId,
    topic_id: TopicId,
    group_id: &Identifier,
) -> Option<ConsumerGroupId> {
    let topic = m
        .streams
        .get(stream_id)
        .and_then(|stream| stream.topics.get(topic_id))?;
    match group_id.kind {
        IdKind::Numeric => {
            let Ok(raw) = group_id.get_u32_value() else {
                return None;
            };
            let candidate = raw as ConsumerGroupId;
            topic
                .consumer_groups
                .get(candidate)
                .is_some()
                .then_some(candidate)
        }
        IdKind::String => {
            let Ok(name) = group_id.get_cow_str_value() else {
                return None;
            };
            // NOTE(review): a temporary `Arc` is allocated per lookup; a borrowed `&str`
            // lookup may be possible depending on the index key type - confirm.
            topic
                .consumer_group_index
                .get(&Arc::from(name.as_ref()))
                .copied()
        }
    }
}
/// Maps a user [`Identifier`] to a [`UserId`].
///
/// * Numeric identifiers are returned as-is; unlike the stream/topic/group resolvers,
///   no existence check is made here, so callers must handle an id with no user behind it.
/// * String identifiers are looked up by name in `m.user_index`.
///
/// Returns `None` when the identifier's value cannot be read in the form its `kind`
/// announces, or when no user carries the given name.
fn resolve_user_id_inner(m: &InnerMetadata, user_id: &Identifier) -> Option<UserId> {
    match user_id.kind {
        IdKind::Numeric => user_id.get_u32_value().ok(),
        IdKind::String => user_id
            .get_cow_str_value()
            .ok()
            .and_then(|name| m.user_index.get(name.as_ref()).copied()),
    }
}
/// Checks whether `user_id` may read the stream `stream_id`.
///
/// Access is granted by either of:
/// * global `manage_streams` or `read_streams`;
/// * stream-level `manage_stream` or `read_stream` for this stream.
///
/// # Errors
/// `IggyError::Unauthorized` when neither grant applies.
fn perm_get_stream_inner(
    m: &InnerMetadata,
    user_id: u32,
    stream_id: StreamId,
) -> Result<(), IggyError> {
    let granted_globally = m
        .users_global_permissions
        .get(&user_id)
        .is_some_and(|global| global.manage_streams || global.read_streams);
    if granted_globally {
        return Ok(());
    }
    let granted_on_stream = m
        .users_stream_permissions
        .get(&(user_id, stream_id))
        .is_some_and(|stream_perm| stream_perm.manage_stream || stream_perm.read_stream);
    if granted_on_stream {
        Ok(())
    } else {
        Err(IggyError::Unauthorized)
    }
}
/// Checks whether `user_id` may list all streams.
///
/// Only global permissions are consulted: `manage_streams` or `read_streams`.
///
/// # Errors
/// `IggyError::Unauthorized` when the user has no global entry or neither flag is set.
fn perm_get_streams_inner(m: &InnerMetadata, user_id: u32) -> Result<(), IggyError> {
    match m.users_global_permissions.get(&user_id) {
        Some(global) if global.manage_streams || global.read_streams => Ok(()),
        _ => Err(IggyError::Unauthorized),
    }
}
/// Checks whether `user_id` may read topic `topic_id` of stream `stream_id`.
///
/// Access is granted by any of:
/// * global `read_streams`, `manage_streams`, `manage_topics` or `read_topics`;
/// * stream-level `manage_stream`, `read_stream`, `manage_topics` or `read_topics`;
/// * topic-level `manage_topic` or `read_topic` in the stream's per-topic table.
///
/// # Errors
/// `IggyError::Unauthorized` when no grant applies.
fn perm_get_topic_inner(
    m: &InnerMetadata,
    user_id: u32,
    stream_id: StreamId,
    topic_id: TopicId,
) -> Result<(), IggyError> {
    let granted_globally = matches!(
        m.users_global_permissions.get(&user_id),
        Some(global)
            if global.read_streams
                || global.manage_streams
                || global.manage_topics
                || global.read_topics
    );
    if granted_globally {
        return Ok(());
    }
    // Without a stream-level entry there is nothing further that could grant access.
    let Some(stream_permissions) = m.users_stream_permissions.get(&(user_id, stream_id)) else {
        return Err(IggyError::Unauthorized);
    };
    if stream_permissions.manage_stream
        || stream_permissions.read_stream
        || stream_permissions.manage_topics
        || stream_permissions.read_topics
    {
        return Ok(());
    }
    let granted_on_topic = stream_permissions
        .topics
        .as_ref()
        .and_then(|topics| topics.get(&topic_id))
        .is_some_and(|topic_permissions| {
            topic_permissions.manage_topic || topic_permissions.read_topic
        });
    if granted_on_topic {
        Ok(())
    } else {
        Err(IggyError::Unauthorized)
    }
}
/// Checks whether `user_id` may list the topics of stream `stream_id`.
///
/// Access is granted by either of:
/// * global `read_streams`, `manage_streams`, `manage_topics` or `read_topics`;
/// * stream-level `manage_stream`, `read_stream`, `manage_topics` or `read_topics`.
///
/// # Errors
/// `IggyError::Unauthorized` when neither grant applies.
fn perm_get_topics_inner(
    m: &InnerMetadata,
    user_id: u32,
    stream_id: StreamId,
) -> Result<(), IggyError> {
    let allowed = matches!(
        m.users_global_permissions.get(&user_id),
        Some(global)
            if global.read_streams
                || global.manage_streams
                || global.manage_topics
                || global.read_topics
    ) || matches!(
        m.users_stream_permissions.get(&(user_id, stream_id)),
        Some(stream_permissions)
            if stream_permissions.manage_stream
                || stream_permissions.read_stream
                || stream_permissions.manage_topics
                || stream_permissions.read_topics
    );
    allowed.then_some(()).ok_or(IggyError::Unauthorized)
}
/// Checks whether `user_id` may read consumer groups of topic `topic_id` in stream
/// `stream_id`.
///
/// Consumer-group reads reuse the topic read rule: this delegates to
/// `perm_get_topic_inner` with the same arguments and returns its result unchanged.
fn perm_get_consumer_group_inner(
m: &InnerMetadata,
user_id: u32,
stream_id: StreamId,
topic_id: TopicId,
) -> Result<(), IggyError> {
perm_get_topic_inner(m, user_id, stream_id, topic_id)
}
/// Checks whether `user_id` may read another user's metadata.
///
/// Only global permissions are consulted: `manage_users` or `read_users`.
///
/// # Errors
/// `IggyError::Unauthorized` when the user has no global entry or neither flag is set.
fn perm_get_user_inner(m: &InnerMetadata, user_id: u32) -> Result<(), IggyError> {
    match m.users_global_permissions.get(&user_id) {
        Some(global) if global.manage_users || global.read_users => Ok(()),
        _ => Err(IggyError::Unauthorized),
    }
}
/// Checks whether `user_id` may list all users.
///
/// Listing reuses the single-user read rule: this delegates to `perm_get_user_inner`
/// and returns its result unchanged.
fn perm_get_users_inner(m: &InnerMetadata, user_id: u32) -> Result<(), IggyError> {
perm_get_user_inner(m, user_id)
}
/// Checks whether `user_id` may read consumer offsets for topic `topic_id` of stream
/// `stream_id`.
///
/// Access is granted by any of, checked in this order:
/// * membership in `users_can_poll_all_streams`;
/// * global `read_topics`, `manage_topics`, `read_streams` or `manage_streams`;
/// * membership of `(user_id, stream_id)` in `users_can_poll_stream`;
/// * stream-level `manage_stream`, `read_stream`, `manage_topics`, `read_topics`
///   or `poll_messages`;
/// * topic-level `manage_topic`, `read_topic` or `poll_messages`.
///
/// # Errors
/// `IggyError::Unauthorized` when no grant applies.
fn perm_get_consumer_offset_inner(
    m: &InnerMetadata,
    user_id: u32,
    stream_id: StreamId,
    topic_id: TopicId,
) -> Result<(), IggyError> {
    // The closure keeps the global lookup lazy so `||` evaluates the checks in order.
    let granted_globally = || {
        m.users_global_permissions.get(&user_id).is_some_and(|global| {
            global.read_topics
                || global.manage_topics
                || global.read_streams
                || global.manage_streams
        })
    };
    if m.users_can_poll_all_streams.contains(&user_id)
        || granted_globally()
        || m.users_can_poll_stream.contains(&(user_id, stream_id))
    {
        return Ok(());
    }
    match m.users_stream_permissions.get(&(user_id, stream_id)) {
        None => Err(IggyError::Unauthorized),
        Some(stream_permissions) => {
            let granted_on_stream = stream_permissions.manage_stream
                || stream_permissions.read_stream
                || stream_permissions.manage_topics
                || stream_permissions.read_topics
                || stream_permissions.poll_messages;
            let granted_on_topic = || {
                stream_permissions
                    .topics
                    .as_ref()
                    .and_then(|topics| topics.get(&topic_id))
                    .is_some_and(|topic_permissions| {
                        topic_permissions.manage_topic
                            || topic_permissions.read_topic
                            || topic_permissions.poll_messages
                    })
            };
            if granted_on_stream || granted_on_topic() {
                Ok(())
            } else {
                Err(IggyError::Unauthorized)
            }
        }
    }
}
/// Checks whether `user_id` may poll messages from topic `topic_id` of stream
/// `stream_id`.
///
/// Polling reuses the consumer-offset read rule: this delegates to
/// `perm_get_consumer_offset_inner` with the same arguments and returns its result
/// unchanged.
fn perm_poll_messages_inner(
m: &InnerMetadata,
user_id: u32,
stream_id: StreamId,
topic_id: TopicId,
) -> Result<(), IggyError> {
perm_get_consumer_offset_inner(m, user_id, stream_id, topic_id)
}
/// Checks whether `user_id` may append messages to topic `topic_id` of stream
/// `stream_id`.
///
/// Access is granted by any of, checked in this order:
/// * membership in `users_can_send_all_streams`;
/// * global `manage_streams` or `manage_topics`;
/// * membership of `(user_id, stream_id)` in `users_can_send_stream`;
/// * stream-level `manage_stream`, `manage_topics` or `send_messages`;
/// * topic-level `manage_topic` or `send_messages`.
///
/// # Errors
/// `IggyError::Unauthorized` when no grant applies.
fn perm_append_messages_inner(
    m: &InnerMetadata,
    user_id: u32,
    stream_id: StreamId,
    topic_id: TopicId,
) -> Result<(), IggyError> {
    // The closure keeps the global lookup lazy so `||` evaluates the checks in order.
    let granted_globally = || {
        m.users_global_permissions
            .get(&user_id)
            .is_some_and(|global| global.manage_streams || global.manage_topics)
    };
    if m.users_can_send_all_streams.contains(&user_id)
        || granted_globally()
        || m.users_can_send_stream.contains(&(user_id, stream_id))
    {
        return Ok(());
    }
    match m.users_stream_permissions.get(&(user_id, stream_id)) {
        Some(stream_permissions)
            if stream_permissions.manage_stream
                || stream_permissions.manage_topics
                || stream_permissions.send_messages =>
        {
            Ok(())
        }
        Some(stream_permissions) => {
            // No stream-wide grant: fall back to the per-topic table, if one exists.
            let topic_entry = stream_permissions
                .topics
                .as_ref()
                .and_then(|topics| topics.get(&topic_id));
            match topic_entry {
                Some(topic_permissions)
                    if topic_permissions.manage_topic || topic_permissions.send_messages =>
                {
                    Ok(())
                }
                _ => Err(IggyError::Unauthorized),
            }
        }
        None => Err(IggyError::Unauthorized),
    }
}