blob: 253bc9401ab55e8b90fc445b38207e9d9ef8403e [file] [log] [blame]
use crate::streaming::utils::hash;
use iggy::identifier::{IdKind, Identifier};
use std::fmt::{Display, Formatter};
/// Identity under which messages are polled: either a standalone consumer or
/// a member of a consumer group.
///
/// Both variants hold only `u32` values, so the type is cheap to copy, has
/// total equality (`Eq`) and is hashable, which allows it to be used as a key
/// in `HashMap`/`HashSet`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PollingConsumer {
    /// Standalone consumer: consumer ID followed by partition ID.
    Consumer(u32, u32),
    /// Consumer group member: consumer group ID followed by member ID.
    ConsumerGroup(u32, u32),
}
impl PollingConsumer {
    /// Creates a [`PollingConsumer::Consumer`] for the given consumer
    /// identifier and partition ID.
    ///
    /// The identifier is reduced to a `u32` with
    /// [`Self::resolve_consumer_id`].
    ///
    /// # Panics
    ///
    /// Panics under the same condition as [`Self::resolve_consumer_id`].
    pub fn consumer(consumer_id: &Identifier, partition_id: u32) -> Self {
        Self::Consumer(Self::resolve_consumer_id(consumer_id), partition_id)
    }

    /// Creates a [`PollingConsumer::ConsumerGroup`] from a consumer group ID
    /// and a member ID.
    pub fn consumer_group(consumer_group_id: u32, member_id: u32) -> Self {
        Self::ConsumerGroup(consumer_group_id, member_id)
    }

    /// Reduces an identifier to the `u32` stored in
    /// [`PollingConsumer::Consumer`].
    ///
    /// * `IdKind::Numeric` — the identifier's own `u32` value is returned.
    /// * `IdKind::String` — the raw identifier value is passed through
    ///   `hash::calculate_32` and the resulting `u32` is returned.
    ///
    /// NOTE(review): hashed names share the `u32` space with numeric IDs, so
    /// a name may in principle resolve to the same value as a numeric ID or
    /// another name — confirm callers tolerate this.
    ///
    /// # Panics
    ///
    /// Panics if the identifier is `IdKind::Numeric` but `get_u32_value`
    /// returns an error. This is treated as a broken invariant rather than a
    /// recoverable condition.
    pub fn resolve_consumer_id(identifier: &Identifier) -> u32 {
        match identifier.kind {
            // State the invariant so a violation is diagnosable from the panic message.
            IdKind::Numeric => identifier
                .get_u32_value()
                .expect("numeric identifier must hold a valid u32 value"),
            IdKind::String => hash::calculate_32(&identifier.value),
        }
    }
}
impl Display for PollingConsumer {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
PollingConsumer::Consumer(consumer_id, partition_id) => write!(
f,
"consumer ID: {}, partition ID: {}",
consumer_id, partition_id
),
PollingConsumer::ConsumerGroup(consumer_group_id, member_id) => {
write!(
f,
"consumer group ID: {}, member ID: {}",
consumer_group_id, member_id
)
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use iggy::consumer::Consumer;

    /// A numeric consumer identifier is carried into the polling consumer unchanged.
    #[test]
    fn given_consumer_with_numeric_id_polling_consumer_should_be_created() {
        let (expected_id, partition) = (1, 3);
        let consumer = Consumer::new(Identifier::numeric(expected_id).unwrap());

        let actual = PollingConsumer::consumer(&consumer.id, partition);

        assert_eq!(actual, PollingConsumer::Consumer(expected_id, partition));
    }

    /// A named consumer identifier ends up as the value produced by
    /// `resolve_consumer_id` for that same identifier.
    #[test]
    fn given_consumer_with_named_id_polling_consumer_should_be_created() {
        let partition = 3;
        let consumer = Consumer::new(Identifier::named("consumer").unwrap());
        let expected_id = PollingConsumer::resolve_consumer_id(&consumer.id);

        let actual = PollingConsumer::consumer(&consumer.id, partition);

        assert_eq!(actual, PollingConsumer::Consumer(expected_id, partition));
    }

    /// `consumer_group` yields the `ConsumerGroup` variant holding both IDs as given.
    #[test]
    fn given_consumer_group_with_numeric_id_polling_consumer_group_should_be_created() {
        let (group_id, client_id) = (1, 2);

        let (group, member) = match PollingConsumer::consumer_group(group_id, client_id) {
            PollingConsumer::ConsumerGroup(group, member) => (group, member),
            PollingConsumer::Consumer(..) => panic!("Expected ConsumerGroup"),
        };

        assert_eq!(group, group_id);
        assert_eq!(member, client_id);
    }

    /// Two different consumer names resolve to two different numeric IDs.
    #[test]
    fn given_distinct_named_ids_unique_polling_consumer_ids_should_be_created() {
        let resolve = |name: &str| {
            let identifier = Identifier::named(name).unwrap();
            PollingConsumer::resolve_consumer_id(&identifier)
        };

        assert_ne!(resolve("consumer1"), resolve("consumer2"));
    }
}