blob: 32eee38ce00894024a9da5784f984567e3fecff6 [file] [log] [blame]
use crate::shared::args::Args;
use iggy::client::Client;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::consumer::{Consumer, ConsumerKind};
use iggy::error::IggyError;
use iggy::identifier::Identifier;
use iggy::messages::poll_messages::PollingStrategy;
use iggy::models::messages::PolledMessage;
use iggy::users::defaults::*;
use iggy::utils::expiry::IggyExpiry;
use tracing::info;
/// Callback invoked for a single polled message.
///
/// The handler borrows the message and returns `Ok(())` on success or a boxed
/// error on failure.
type MessageHandler = dyn Fn(&PolledMessage) -> Result<(), Box<dyn std::error::Error>>;
/// Logs in through `client` using `DEFAULT_ROOT_USERNAME` and
/// `DEFAULT_ROOT_PASSWORD`.
///
/// # Panics
///
/// Panics if the login request returns an error.
pub async fn login_root(client: &dyn Client) {
    let login_result = client
        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
        .await;
    login_result.unwrap();
}
/// Blocks until the stream and topic named in `args` can be fetched, then
/// verifies that the requested partition fits within the topic.
///
/// Both lookups are retried on a shared one-second interval for as long as
/// they return an error.
///
/// # Panics
///
/// Panics if a stream or topic id cannot be converted into an identifier, or
/// if the topic has fewer partitions than `args.partition_id`.
pub async fn init_by_consumer(args: &Args, client: &dyn Client) {
    let stream_id = args.stream_id;
    let topic_id = args.topic_id;
    let partition_id = args.partition_id;
    let mut retry_interval = tokio::time::interval(std::time::Duration::from_secs(1));

    // Phase 1: wait until the stream can be fetched.
    loop {
        info!("Validating if stream: {} exists..", stream_id);
        let lookup = client.get_stream(&stream_id.try_into().unwrap()).await;
        match lookup {
            Ok(_) => {
                info!("Stream: {} was found.", stream_id);
                break;
            }
            Err(_) => {
                retry_interval.tick().await;
            }
        }
    }

    // Phase 2: wait until the topic can be fetched, keeping its details.
    let topic = loop {
        info!("Validating if topic: {} exists..", topic_id);
        let lookup = client
            .get_topic(
                &stream_id.try_into().unwrap(),
                &topic_id.try_into().unwrap(),
            )
            .await;
        match lookup {
            Ok(found) => break found,
            Err(_) => {
                retry_interval.tick().await;
            }
        }
    };
    info!("Topic: {} was found.", topic_id);

    // The requested partition must not exceed the topic's partition count.
    if topic.partitions_count < partition_id {
        panic!(
            "Topic: {} has only {} partition(s), but partition: {} was requested.",
            topic_id, topic.partitions_count, partition_id
        );
    }
}
pub async fn init_by_producer(args: &Args, client: &dyn Client) -> Result<(), IggyError> {
let stream = client.get_stream(&args.stream_id.try_into()?).await;
if stream.is_ok() {
return Ok(());
}
info!("Stream does not exist, creating...");
client.create_stream("sample", Some(args.stream_id)).await?;
client
.create_topic(
&args.stream_id.try_into()?,
"orders",
args.partitions_count,
CompressionAlgorithm::from_code(args.compression_algorithm)?,
None,
Some(args.topic_id),
IggyExpiry::NeverExpire,
None,
)
.await?;
Ok(())
}
pub async fn consume_messages(
args: &Args,
client: &dyn Client,
handle_message: &MessageHandler,
) -> Result<(), Box<dyn std::error::Error>> {
info!("Messages will be polled by consumer: {} from stream: {}, topic: {}, partition: {} with interval {} ms.",
args.consumer_id, args.stream_id, args.topic_id, args.partition_id, args.interval);
let mut interval = tokio::time::interval(std::time::Duration::from_millis(args.interval));
let mut consumed_batches = 0;
let consumer = Consumer {
kind: ConsumerKind::from_code(args.consumer_kind)?,
id: Identifier::numeric(args.consumer_id).unwrap(),
};
loop {
if args.message_batches_limit > 0 && consumed_batches == args.message_batches_limit {
info!("Consumed {consumed_batches} batches of messages, exiting.");
return Ok(());
}
let polled_messages = client
.poll_messages(
&args.stream_id.try_into()?,
&args.topic_id.try_into()?,
Some(args.partition_id),
&consumer,
&PollingStrategy::next(),
args.messages_per_batch,
true,
)
.await?;
if polled_messages.messages.is_empty() {
info!("No messages found.");
interval.tick().await;
continue;
}
consumed_batches += 1;
for message in polled_messages.messages {
handle_message(&message)?;
}
interval.tick().await;
}
}