blob: 5fc08adf2784c0ab7b49268d24c1127b647cea6f [file] [log] [blame]
use crate::streaming::topics::consumer_group::ConsumerGroup;
use crate::streaming::topics::topic::Topic;
use iggy::error::IggyError;
use iggy::identifier::{IdKind, Identifier};
use iggy::utils::text;
use tokio::sync::RwLock;
use tracing::info;
impl Topic {
pub async fn reassign_consumer_groups(&mut self) {
if self.consumer_groups.is_empty() {
return;
}
let partitions_count = self.partitions.len() as u32;
info!(
"Reassigning consumer groups for topic with ID: {} for stream with ID with {}, partitions count: {}",
self.topic_id, self.stream_id, partitions_count
);
for (_, consumer_group) in self.consumer_groups.iter_mut() {
let mut consumer_group = consumer_group.write().await;
consumer_group.reassign_partitions(partitions_count).await;
}
}
pub fn get_consumer_groups(&self) -> Vec<&RwLock<ConsumerGroup>> {
self.consumer_groups.values().collect()
}
pub fn get_consumer_group(
&self,
identifier: &Identifier,
) -> Result<&RwLock<ConsumerGroup>, IggyError> {
match identifier.kind {
IdKind::Numeric => self.get_consumer_group_by_id(identifier.get_u32_value().unwrap()),
IdKind::String => self.get_consumer_group_by_name(&identifier.get_cow_str_value()?),
}
}
pub fn get_consumer_group_by_name(
&self,
name: &str,
) -> Result<&RwLock<ConsumerGroup>, IggyError> {
let consumer_group_id = self.consumer_groups_ids.get(name);
if consumer_group_id.is_none() {
return Err(IggyError::ConsumerGroupNameNotFound(
name.to_string(),
self.topic_id,
));
}
self.get_consumer_group_by_id(*consumer_group_id.unwrap())
}
pub fn get_consumer_group_by_id(&self, id: u32) -> Result<&RwLock<ConsumerGroup>, IggyError> {
let consumer_group = self.consumer_groups.get(&id);
if consumer_group.is_none() {
return Err(IggyError::ConsumerGroupIdNotFound(id, self.topic_id));
}
Ok(consumer_group.unwrap())
}
pub async fn create_consumer_group(&mut self, id: u32, name: &str) -> Result<(), IggyError> {
if self.consumer_groups.contains_key(&id) {
return Err(IggyError::ConsumerGroupIdAlreadyExists(id, self.topic_id));
}
let name = text::to_lowercase_non_whitespace(name);
if self.consumer_groups_ids.contains_key(&name) {
return Err(IggyError::ConsumerGroupNameAlreadyExists(
name,
self.topic_id,
));
}
let consumer_group =
ConsumerGroup::new(self.topic_id, id, &name, self.partitions.len() as u32);
self.consumer_groups.insert(id, RwLock::new(consumer_group));
self.consumer_groups_ids.insert(name, id);
let consumer_group = self.get_consumer_group_by_id(id)?;
let consumer_group = consumer_group.read().await;
self.storage
.topic
.save_consumer_group(self, &consumer_group)
.await?;
info!(
"Created consumer group with ID: {} for topic with ID: {} and stream with ID: {}.",
id, self.topic_id, self.stream_id
);
Ok(())
}
pub async fn delete_consumer_group(
&mut self,
id: &Identifier,
) -> Result<RwLock<ConsumerGroup>, IggyError> {
let consumer_group_id;
{
let consumer_group = self.get_consumer_group(id)?;
let consumer_group = consumer_group.read().await;
consumer_group_id = consumer_group.consumer_group_id;
}
let consumer_group = self.consumer_groups.remove(&consumer_group_id);
if let Some(consumer_group) = consumer_group {
{
let consumer_group = consumer_group.read().await;
self.consumer_groups_ids.remove(&consumer_group.name);
self.storage
.topic
.delete_consumer_group(self, &consumer_group)
.await?;
info!(
"Deleted consumer group with ID: {} from topic with ID: {} and stream with ID: {}.",
id, self.topic_id, self.stream_id
);
}
return Ok(consumer_group);
}
Err(IggyError::ConsumerGroupIdNotFound(
consumer_group_id,
self.topic_id,
))
}
pub async fn join_consumer_group(
&self,
consumer_group_id: &Identifier,
member_id: u32,
) -> Result<(), IggyError> {
let consumer_group = self.get_consumer_group(consumer_group_id)?;
let mut consumer_group = consumer_group.write().await;
consumer_group.add_member(member_id).await;
info!(
"Member with ID: {} has joined consumer group with ID: {} for topic with ID: {} and stream with ID: {}.",
member_id, consumer_group_id, self.topic_id, self.stream_id
);
Ok(())
}
pub async fn leave_consumer_group(
&self,
consumer_group_id: &Identifier,
member_id: u32,
) -> Result<(), IggyError> {
let consumer_group = self.get_consumer_group(consumer_group_id)?;
let mut consumer_group = consumer_group.write().await;
consumer_group.delete_member(member_id).await;
info!(
"Member with ID: {} has left consumer group with ID: {} for topic with ID: {} and stream with ID: {}.",
member_id, consumer_group_id, self.topic_id, self.stream_id
);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::configs::system::SystemConfig;
use crate::streaming::storage::tests::get_test_system_storage;
use std::sync::atomic::{AtomicU32, AtomicU64};
use std::sync::Arc;
#[tokio::test]
async fn should_be_created_given_valid_parameters() {
let consumer_group_id = 1;
let name = "test";
let mut topic = get_topic();
let result = topic.create_consumer_group(consumer_group_id, name).await;
assert!(result.is_ok());
assert_eq!(topic.consumer_groups.len(), 1);
let consumer_group = topic
.get_consumer_group(&Identifier::numeric(consumer_group_id).unwrap())
.unwrap();
let consumer_group = consumer_group.read().await;
assert_eq!(consumer_group.consumer_group_id, consumer_group_id);
assert_eq!(consumer_group.name, name);
assert_eq!(consumer_group.topic_id, topic.topic_id);
assert_eq!(
consumer_group.partitions_count,
topic.partitions.len() as u32
);
}
#[tokio::test]
async fn should_not_be_created_given_already_existing_group_with_same_id() {
let consumer_group_id = 1;
let name = "test";
let mut topic = get_topic();
let result = topic.create_consumer_group(consumer_group_id, name).await;
assert!(result.is_ok());
assert_eq!(topic.consumer_groups.len(), 1);
let result = topic.create_consumer_group(consumer_group_id, name).await;
assert!(result.is_err());
assert_eq!(topic.consumer_groups.len(), 1);
let err = result.unwrap_err();
assert!(matches!(err, IggyError::ConsumerGroupIdAlreadyExists(_, _)));
}
#[tokio::test]
async fn should_not_be_created_given_already_existing_group_with_same_name() {
let consumer_group_id = 1;
let name = "test";
let mut topic = get_topic();
let result = topic.create_consumer_group(consumer_group_id, name).await;
assert!(result.is_ok());
assert_eq!(topic.consumer_groups.len(), 1);
let consumer_group_id = consumer_group_id + 1;
let result = topic.create_consumer_group(consumer_group_id, name).await;
assert!(result.is_err());
assert_eq!(topic.consumer_groups.len(), 1);
let err = result.unwrap_err();
assert!(matches!(
err,
IggyError::ConsumerGroupNameAlreadyExists(_, _)
));
}
#[tokio::test]
async fn should_be_deleted_given_already_existing_group_with_same_id() {
let consumer_group_id = 1;
let name = "test";
let mut topic = get_topic();
let result = topic.create_consumer_group(consumer_group_id, name).await;
assert!(result.is_ok());
assert_eq!(topic.consumer_groups.len(), 1);
let result = topic
.delete_consumer_group(&Identifier::numeric(consumer_group_id).unwrap())
.await;
assert!(result.is_ok());
assert!(topic.consumer_groups.is_empty());
}
#[tokio::test]
async fn should_not_be_deleted_given_non_existing_group_with_same_id() {
let consumer_group_id = 1;
let name = "test";
let mut topic = get_topic();
let result = topic.create_consumer_group(consumer_group_id, name).await;
assert!(result.is_ok());
assert_eq!(topic.consumer_groups.len(), 1);
let consumer_group_id = consumer_group_id + 1;
let result = topic
.delete_consumer_group(&Identifier::numeric(consumer_group_id).unwrap())
.await;
assert!(result.is_err());
assert_eq!(topic.consumer_groups.len(), 1);
}
#[tokio::test]
async fn should_be_joined_by_new_member() {
let consumer_group_id = 1;
let name = "test";
let member_id = 1;
let mut topic = get_topic();
topic
.create_consumer_group(consumer_group_id, name)
.await
.unwrap();
let result = topic
.join_consumer_group(&Identifier::numeric(consumer_group_id).unwrap(), member_id)
.await;
assert!(result.is_ok());
let consumer_group = topic
.get_consumer_group(&Identifier::numeric(consumer_group_id).unwrap())
.unwrap()
.read()
.await;
let members = consumer_group.get_members();
assert_eq!(members.len(), 1);
}
#[tokio::test]
async fn should_be_left_by_existing_member() {
let consumer_group_id = 1;
let name = "test";
let member_id = 1;
let mut topic = get_topic();
topic
.create_consumer_group(consumer_group_id, name)
.await
.unwrap();
topic
.join_consumer_group(&Identifier::numeric(consumer_group_id).unwrap(), member_id)
.await
.unwrap();
let result = topic
.leave_consumer_group(&Identifier::numeric(consumer_group_id).unwrap(), member_id)
.await;
assert!(result.is_ok());
let consumer_group = topic
.get_consumer_group(&Identifier::numeric(consumer_group_id).unwrap())
.unwrap()
.read()
.await;
let members = consumer_group.get_members();
assert!(members.is_empty())
}
fn get_topic() -> Topic {
let storage = Arc::new(get_test_system_storage());
let stream_id = 1;
let id = 2;
let name = "test";
let partitions_count = 3;
let config = Arc::new(SystemConfig::default());
let size_of_parent_stream = Arc::new(AtomicU64::new(0));
let messages_count_of_parent_stream = Arc::new(AtomicU64::new(0));
let segments_count_of_parent_stream = Arc::new(AtomicU32::new(0));
Topic::create(
stream_id,
id,
name,
partitions_count,
config,
storage,
size_of_parent_stream,
messages_count_of_parent_stream,
segments_count_of_parent_stream,
None,
None,
1,
)
.unwrap()
}
}