blob: ea85511a6edd4b2c8e0138fcd7f6678bdfb29405 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use crate::metadata::{
ConsumerGroupId, ConsumerGroupMeta, InnerMetadata, MetadataReadHandle, PartitionId,
PartitionMeta, StreamId, StreamMeta, TopicId, TopicMeta, UserId, UserMeta,
};
use crate::streaming::partitions::partition::{ConsumerGroupOffsets, ConsumerOffsets};
use crate::streaming::stats::{PartitionStats, StreamStats, TopicStats};
use iggy_common::sharding::IggyNamespace;
use iggy_common::{IdKind, Identifier, PersonalAccessToken};
use left_right::ReadGuard;
use std::sync::Arc;
use std::sync::atomic::Ordering;
/// Thread-safe wrapper for GlobalMetadata using left-right for lock-free reads.
/// Uses hierarchical structure: streams contain topics, topics contain partitions and consumer groups.
/// All mutations go through MetadataWriter (shard 0 only).
///
/// Each shard should own its own `Metadata` instance (cloned from a common source).
/// The underlying data is shared via left-right's internal mechanism.
#[derive(Clone)]
pub struct Metadata {
inner: MetadataReadHandle,
}
impl Metadata {
pub fn new(reader: MetadataReadHandle) -> Self {
Self { inner: reader }
}
#[inline]
pub fn load(&self) -> ReadGuard<'_, InnerMetadata> {
self.inner
.enter()
.expect("metadata not initialized - writer must publish before reads")
}
pub fn get_stream_id(&self, identifier: &Identifier) -> Option<StreamId> {
let metadata = self.load();
match identifier.kind {
IdKind::Numeric => {
let stream_id = identifier.get_u32_value().ok()? as StreamId;
if metadata.streams.get(stream_id).is_some() {
Some(stream_id)
} else {
None
}
}
IdKind::String => {
let name = identifier.get_cow_str_value().ok()?;
metadata.stream_index.get(name.as_ref()).copied()
}
}
}
pub fn stream_name_exists(&self, name: &str) -> bool {
self.load().stream_index.contains_key(name)
}
pub fn get_topic_id(&self, stream_id: StreamId, identifier: &Identifier) -> Option<TopicId> {
let metadata = self.load();
let stream = metadata.streams.get(stream_id)?;
match identifier.kind {
IdKind::Numeric => {
let topic_id = identifier.get_u32_value().ok()? as TopicId;
if stream.topics.get(topic_id).is_some() {
Some(topic_id)
} else {
None
}
}
IdKind::String => {
let name = identifier.get_cow_str_value().ok()?;
stream.topic_index.get(&Arc::from(name.as_ref())).copied()
}
}
}
pub fn get_user_id(&self, identifier: &Identifier) -> Option<UserId> {
let metadata = self.load();
match identifier.kind {
IdKind::Numeric => Some(identifier.get_u32_value().ok()? as UserId),
IdKind::String => {
let name = identifier.get_cow_str_value().ok()?;
metadata.user_index.get(name.as_ref()).copied()
}
}
}
pub fn get_consumer_group_id(
&self,
stream_id: StreamId,
topic_id: TopicId,
identifier: &Identifier,
) -> Option<ConsumerGroupId> {
let metadata = self.load();
let stream = metadata.streams.get(stream_id)?;
let topic = stream.topics.get(topic_id)?;
match identifier.kind {
IdKind::Numeric => {
let group_id = identifier.get_u32_value().ok()? as ConsumerGroupId;
if topic.consumer_groups.get(group_id).is_some() {
Some(group_id)
} else {
None
}
}
IdKind::String => {
let name = identifier.get_cow_str_value().ok()?;
topic
.consumer_group_index
.get(&Arc::from(name.as_ref()))
.copied()
}
}
}
pub fn stream_exists(&self, id: StreamId) -> bool {
self.load().streams.get(id).is_some()
}
pub fn topic_exists(&self, stream_id: StreamId, topic_id: TopicId) -> bool {
self.load()
.streams
.get(stream_id)
.and_then(|s| s.topics.get(topic_id))
.is_some()
}
pub fn partition_exists(
&self,
stream_id: StreamId,
topic_id: TopicId,
partition_id: PartitionId,
) -> bool {
self.load()
.streams
.get(stream_id)
.and_then(|s| s.topics.get(topic_id))
.and_then(|t| t.partitions.get(partition_id))
.is_some()
}
pub fn user_exists(&self, id: UserId) -> bool {
self.load().users.get(id as usize).is_some()
}
pub fn consumer_group_exists(
&self,
stream_id: StreamId,
topic_id: TopicId,
group_id: ConsumerGroupId,
) -> bool {
self.load()
.streams
.get(stream_id)
.and_then(|s| s.topics.get(topic_id))
.and_then(|t| t.consumer_groups.get(group_id))
.is_some()
}
pub fn consumer_group_exists_by_name(
&self,
stream_id: StreamId,
topic_id: TopicId,
name: &str,
) -> bool {
self.load()
.streams
.get(stream_id)
.and_then(|s| s.topics.get(topic_id))
.map(|t| t.consumer_group_index.contains_key(name))
.unwrap_or(false)
}
pub fn streams_count(&self) -> usize {
self.load().streams.len()
}
pub fn next_stream_id(&self) -> usize {
self.load().streams.vacant_key()
}
pub fn topics_count(&self, stream_id: StreamId) -> usize {
self.load()
.streams
.get(stream_id)
.map(|s| s.topics.len())
.unwrap_or(0)
}
pub fn next_topic_id(&self, stream_id: StreamId) -> Option<usize> {
self.load()
.streams
.get(stream_id)
.map(|s| s.topics.vacant_key())
}
pub fn partitions_count(&self, stream_id: StreamId, topic_id: TopicId) -> usize {
self.load()
.streams
.get(stream_id)
.and_then(|s| s.topics.get(topic_id))
.map(|t| t.partitions.len())
.unwrap_or(0)
}
pub fn get_partitions_count(&self, stream_id: StreamId, topic_id: TopicId) -> Option<usize> {
self.load()
.streams
.get(stream_id)
.and_then(|s| s.topics.get(topic_id))
.map(|t| t.partitions.len())
}
pub fn get_next_partition_id(&self, stream_id: StreamId, topic_id: TopicId) -> Option<usize> {
let metadata = self.load();
let topic = metadata.streams.get(stream_id)?.topics.get(topic_id)?;
let partitions_count = topic.partitions.len();
if partitions_count == 0 {
return None;
}
let counter = &topic.round_robin_counter;
let mut partition_id = counter.fetch_add(1, Ordering::AcqRel);
if partition_id >= partitions_count {
partition_id %= partitions_count;
counter.store(partition_id + 1, Ordering::Relaxed);
}
Some(partition_id)
}
pub fn get_next_member_partition_id(
&self,
stream_id: StreamId,
topic_id: TopicId,
group_id: ConsumerGroupId,
member_id: usize,
calculate: bool,
) -> Option<PartitionId> {
let metadata = self.load();
let member = metadata
.streams
.get(stream_id)?
.topics
.get(topic_id)?
.consumer_groups
.get(group_id)?
.members
.get(member_id)?;
let assigned_partitions = &member.partitions;
if assigned_partitions.is_empty() {
return None;
}
let partitions_count = assigned_partitions.len();
let counter = &member.partition_index;
if calculate {
let current = counter
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
Some((current + 1) % partitions_count)
})
.unwrap();
Some(assigned_partitions[current % partitions_count])
} else {
let current = counter.load(Ordering::Relaxed);
Some(assigned_partitions[current % partitions_count])
}
}
pub fn users_count(&self) -> usize {
self.load().users.len()
}
pub fn username_exists(&self, username: &str) -> bool {
self.load().user_index.contains_key(username)
}
pub fn consumer_groups_count(&self, stream_id: StreamId, topic_id: TopicId) -> usize {
self.load()
.streams
.get(stream_id)
.and_then(|s| s.topics.get(topic_id))
.map(|t| t.consumer_groups.len())
.unwrap_or(0)
}
pub fn get_stream_stats(&self, id: StreamId) -> Option<Arc<StreamStats>> {
self.load().streams.get(id).map(|s| s.stats.clone())
}
pub fn get_topic_stats(
&self,
stream_id: StreamId,
topic_id: TopicId,
) -> Option<Arc<TopicStats>> {
self.load()
.streams
.get(stream_id)
.and_then(|s| s.topics.get(topic_id))
.map(|t| t.stats.clone())
}
pub fn get_partition_stats(&self, ns: &IggyNamespace) -> Option<Arc<PartitionStats>> {
self.load()
.streams
.get(ns.stream_id())
.and_then(|s| s.topics.get(ns.topic_id()))
.and_then(|t| t.partitions.get(ns.partition_id()))
.map(|p| p.stats.clone())
}
pub fn get_partition_stats_by_ids(
&self,
stream_id: StreamId,
topic_id: TopicId,
partition_id: PartitionId,
) -> Option<Arc<PartitionStats>> {
self.load()
.streams
.get(stream_id)
.and_then(|s| s.topics.get(topic_id))
.and_then(|t| t.partitions.get(partition_id))
.map(|p| p.stats.clone())
}
pub fn get_partition_consumer_offsets(
&self,
stream_id: StreamId,
topic_id: TopicId,
partition_id: PartitionId,
) -> Option<Arc<ConsumerOffsets>> {
self.load()
.streams
.get(stream_id)
.and_then(|s| s.topics.get(topic_id))
.and_then(|t| t.partitions.get(partition_id))
.and_then(|p| p.consumer_offsets.clone())
}
pub fn get_partition_consumer_group_offsets(
&self,
stream_id: StreamId,
topic_id: TopicId,
partition_id: PartitionId,
) -> Option<Arc<ConsumerGroupOffsets>> {
self.load()
.streams
.get(stream_id)
.and_then(|s| s.topics.get(topic_id))
.and_then(|t| t.partitions.get(partition_id))
.and_then(|p| p.consumer_group_offsets.clone())
}
pub fn get_user(&self, id: UserId) -> Option<UserMeta> {
self.load().users.get(id as usize).cloned()
}
pub fn get_all_users(&self) -> Vec<UserMeta> {
self.load().users.iter().map(|(_, u)| u.clone()).collect()
}
pub fn get_stream(&self, id: StreamId) -> Option<StreamMeta> {
self.load().streams.get(id).cloned()
}
pub fn get_topic(&self, stream_id: StreamId, topic_id: TopicId) -> Option<TopicMeta> {
self.load()
.streams
.get(stream_id)
.and_then(|s| s.topics.get(topic_id).cloned())
}
pub fn get_partition(
&self,
stream_id: StreamId,
topic_id: TopicId,
partition_id: PartitionId,
) -> Option<PartitionMeta> {
self.load()
.streams
.get(stream_id)
.and_then(|s| s.topics.get(topic_id))
.and_then(|t| t.partitions.get(partition_id).cloned())
}
pub fn get_consumer_group(
&self,
stream_id: StreamId,
topic_id: TopicId,
group_id: ConsumerGroupId,
) -> Option<ConsumerGroupMeta> {
self.load()
.streams
.get(stream_id)
.and_then(|s| s.topics.get(topic_id))
.and_then(|t| t.consumer_groups.get(group_id).cloned())
}
pub fn get_user_personal_access_tokens(&self, user_id: UserId) -> Vec<PersonalAccessToken> {
self.load()
.personal_access_tokens
.get(&user_id)
.map(|pats| pats.values().cloned().collect())
.unwrap_or_default()
}
pub fn get_personal_access_token_by_hash(
&self,
token_hash: &str,
) -> Option<PersonalAccessToken> {
let token_hash_arc: Arc<str> = Arc::from(token_hash);
let metadata = self.load();
for (_, user_pats) in metadata.personal_access_tokens.iter() {
if let Some(pat) = user_pats.get(&token_hash_arc) {
return Some(pat.clone());
}
}
None
}
pub fn user_pat_count(&self, user_id: UserId) -> usize {
self.load()
.personal_access_tokens
.get(&user_id)
.map(|pats| pats.len())
.unwrap_or(0)
}
pub fn user_has_pat_with_name(&self, user_id: UserId, name: &str) -> bool {
self.load()
.personal_access_tokens
.get(&user_id)
.map(|pats| pats.values().any(|pat| &*pat.name == name))
.unwrap_or(false)
}
pub fn find_pat_token_hash_by_name(&self, user_id: UserId, name: &str) -> Option<Arc<str>> {
self.load()
.personal_access_tokens
.get(&user_id)
.and_then(|pats| {
pats.iter()
.find(|(_, pat)| &*pat.name == name)
.map(|(hash, _)| hash.clone())
})
}
pub fn is_consumer_group_member(
&self,
stream_id: StreamId,
topic_id: TopicId,
group_id: ConsumerGroupId,
client_id: u32,
) -> bool {
let metadata = self.load();
metadata
.streams
.get(stream_id)
.and_then(|s| s.topics.get(topic_id))
.and_then(|t| t.consumer_groups.get(group_id))
.map(|g| g.members.iter().any(|(_, m)| m.client_id == client_id))
.unwrap_or(false)
}
}