blob: 4b92e62a373fa82a282240ef661c689fb83e02d2 [file] [log] [blame]
use crate::shared::args::Args;
use iggy::client::Client;
use iggy::consumer::{Consumer, ConsumerKind};
use iggy::error::IggyError;
use iggy::identifier::Identifier;
use iggy::messages::poll_messages::{PollMessages, PollingStrategy};
use iggy::models::polled_messages::PolledMessage;
use iggy::streams::create_stream::CreateStream;
use iggy::streams::get_stream::GetStream;
use iggy::topics::create_topic::CreateTopic;
use iggy::topics::get_topic::GetTopic;
use iggy::users::defaults::*;
use iggy::users::login_user::LoginUser;
use tracing::info;
/// Callback invoked by [`consume_messages`] for each polled message.
///
/// Returning an `Err` makes [`consume_messages`] stop and propagate that error.
type MessageHandler = dyn Fn(&PolledMessage) -> Result<(), Box<dyn std::error::Error>>;
/// Logs the given client in with the default root username and password.
///
/// # Panics
///
/// Panics if the login request returns an error.
pub async fn login_root(client: &dyn Client) {
    let root_credentials = LoginUser {
        username: DEFAULT_ROOT_USERNAME.to_string(),
        password: DEFAULT_ROOT_PASSWORD.to_string(),
    };
    client.login_user(&root_credentials).await.unwrap();
}
/// Blocks (asynchronously) until the stream and topic named in `args` can be fetched,
/// then verifies that the topic has at least `args.partition_id` partitions.
///
/// Both lookups are retried on a shared one-second interval for as long as they fail.
///
/// # Panics
///
/// * If a numeric identifier cannot be built from `args.stream_id` or `args.topic_id`.
/// * If the topic is found but its `partitions_count` is lower than `args.partition_id`.
pub async fn init_by_consumer(args: &Args, client: &dyn Client) {
    // Note: the first tick of a tokio interval completes immediately, so the very
    // first retry happens without any delay; later retries are spaced one second apart.
    let mut retry_timer = tokio::time::interval(std::time::Duration::from_secs(1));

    // Phase 1: wait for the stream to become available.
    loop {
        info!("Validating if stream: {} exists..", args.stream_id);
        let request = GetStream {
            stream_id: Identifier::numeric(args.stream_id).unwrap(),
        };
        match client.get_stream(&request).await {
            Ok(_) => {
                info!("Stream: {} was found.", args.stream_id);
                break;
            }
            Err(_) => {
                retry_timer.tick().await;
            }
        }
    }

    // Phase 2: wait for the topic to become available.
    let found_topic = loop {
        info!("Validating if topic: {} exists..", args.topic_id);
        let request = GetTopic {
            stream_id: Identifier::numeric(args.stream_id).unwrap(),
            topic_id: Identifier::numeric(args.topic_id).unwrap(),
        };
        match client.get_topic(&request).await {
            Ok(details) => break details,
            Err(_) => {
                retry_timer.tick().await;
            }
        }
    };
    info!("Topic: {} was found.", args.topic_id);

    // The requested partition must be within the topic's partition count.
    if found_topic.partitions_count < args.partition_id {
        panic!(
            "Topic: {} has only {} partition(s), but partition: {} was requested.",
            args.topic_id, found_topic.partitions_count, args.partition_id
        );
    }
}
/// Ensures the stream described by `args` exists, creating it together with its
/// topic when the stream lookup fails.
///
/// If the stream can already be fetched, nothing is created and the topic is not
/// checked. Otherwise a stream named `"sample"` and a topic named `"orders"` with
/// `args.partitions_count` partitions are created.
///
/// # Errors
///
/// Returns an [`IggyError`] if a numeric identifier cannot be built from
/// `args.stream_id`, or if creating the stream or the topic fails.
pub async fn init_by_producer(args: &Args, client: &dyn Client) -> Result<(), IggyError> {
    let stream = client
        .get_stream(&GetStream {
            stream_id: Identifier::numeric(args.stream_id)?,
        })
        .await;
    // NOTE(review): any lookup error is treated as "stream does not exist"; an
    // unrelated failure would presumably resurface from `create_stream` below.
    if stream.is_ok() {
        return Ok(());
    }

    info!("Stream does not exist, creating...");
    client
        .create_stream(&CreateStream {
            stream_id: Some(args.stream_id),
            name: "sample".to_string(),
        })
        .await?;

    client
        .create_topic(&CreateTopic {
            // Propagate identifier errors instead of panicking, matching the lookup above.
            stream_id: Identifier::numeric(args.stream_id)?,
            topic_id: Some(args.topic_id),
            partitions_count: args.partitions_count,
            name: "orders".to_string(),
            message_expiry: None,
            max_topic_size: None,
            replication_factor: 1,
        })
        .await?;
    Ok(())
}
/// Polls messages in a loop and passes each one to `handle_message`.
///
/// Messages are polled with the "next" strategy and `auto_commit` enabled, at most
/// `args.messages_per_batch` per request, from the stream/topic/partition given in
/// `args`. After every poll (empty or not) the loop waits for the next tick of an
/// interval of `args.interval` milliseconds.
///
/// When `args.message_batches_limit` is greater than zero, the function returns
/// `Ok(())` once that many non-empty batches have been consumed; otherwise it only
/// returns on error.
///
/// # Errors
///
/// Returns an error if the consumer kind or any numeric identifier built from
/// `args` is invalid, if polling fails, or if `handle_message` returns an error.
///
/// # Panics
///
/// `tokio::time::interval` panics when given a zero period, so `args.interval`
/// must be non-zero.
pub async fn consume_messages(
    args: &Args,
    client: &dyn Client,
    handle_message: &MessageHandler,
) -> Result<(), Box<dyn std::error::Error>> {
    info!("Messages will be polled by consumer: {} from stream: {}, topic: {}, partition: {} with interval {} ms.",
        args.consumer_id, args.stream_id, args.topic_id, args.partition_id, args.interval);
    let mut interval = tokio::time::interval(std::time::Duration::from_millis(args.interval));

    // The poll command depends only on `args`, so build (and validate) it once
    // instead of on every iteration.
    let poll_command = PollMessages {
        consumer: Consumer {
            kind: ConsumerKind::from_code(args.consumer_kind)?,
            id: Identifier::numeric(args.consumer_id)?,
        },
        stream_id: Identifier::numeric(args.stream_id)?,
        topic_id: Identifier::numeric(args.topic_id)?,
        partition_id: Some(args.partition_id),
        strategy: PollingStrategy::next(),
        count: args.messages_per_batch,
        auto_commit: true,
    };

    let mut consumed_batches = 0;
    loop {
        // A limit of zero means "no limit".
        if args.message_batches_limit > 0 && consumed_batches == args.message_batches_limit {
            info!("Consumed {consumed_batches} batches of messages, exiting.");
            return Ok(());
        }

        let polled_messages = client.poll_messages(&poll_command).await?;
        if polled_messages.messages.is_empty() {
            info!("No messages found.");
        } else {
            // Only non-empty polls count towards the batch limit.
            consumed_batches += 1;
            for message in polled_messages.messages {
                // NOTE(review): with `auto_commit: true` the offset has presumably
                // already been stored server-side when a handler error aborts here.
                handle_message(&message)?;
            }
        }
        interval.tick().await;
    }
}