blob: 8c53722da8ad35e276cfae2a1566c7140ff49a86 [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use crate::shared::args::Args;
use iggy::prelude::*;
use tracing::info;
/// Callback applied to every polled message by `consume_messages`.
///
/// Returning `Err` makes `consume_messages` stop and propagate that error
/// to its caller.
type MessageHandler = dyn Fn(&IggyMessage) -> Result<(), Box<dyn std::error::Error>>;
/// Blocks (asynchronously) until the stream and topic named in `args` exist.
///
/// The stream and then the topic are looked up once per one-second tick;
/// lookup errors and "not found" answers are both treated as "try again".
/// Once the topic is found, its partition count is checked against the
/// requested partition.
///
/// # Panics
///
/// Panics if `args.stream_id` or `args.topic_id` cannot be converted into an
/// identifier, or if the topic has fewer partitions than `args.partition_id`.
pub async fn init_by_consumer(args: &Args, client: &dyn Client) {
    let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
    let stream_id = args.stream_id.clone().try_into().unwrap();
    let topic_id = args.topic_id.clone().try_into().unwrap();
    let requested_partition = args.partition_id;

    // Phase 1: wait for the stream to show up.
    loop {
        interval.tick().await;
        info!("Validating if stream: {stream_id} exists..");
        if let Ok(Some(_)) = client.get_stream(&stream_id).await {
            info!("Stream: {stream_id} was found.");
            break;
        }
    }

    // Phase 2: wait for the topic and keep its details for the partition check.
    let topic = loop {
        interval.tick().await;
        info!("Validating if topic: {} exists..", topic_id);
        if let Ok(Some(found)) = client.get_topic(&stream_id, &topic_id).await {
            info!("Topic: {} was found.", topic_id);
            break found;
        }
    };

    // The function has no error channel, so an unsatisfiable partition
    // request aborts the caller.
    if topic.partitions_count < requested_partition {
        panic!(
            "Topic: {} has only {} partition(s), but partition: {} was requested.",
            topic_id, topic.partitions_count, requested_partition
        );
    }
}
/// Ensures the stream named in `args` exists, creating it together with its
/// topic when it is missing.
///
/// When the stream is already present nothing is created; in particular the
/// topic is not looked up in that case.
///
/// # Errors
///
/// Returns an [`IggyError`] if `args.stream_id` is not a valid identifier, if
/// `args.compression_algorithm` is not a known code, or if any client call
/// fails.
pub async fn init_by_producer(args: &Args, client: &dyn Client) -> Result<(), IggyError> {
    let stream_id = args.stream_id.clone().try_into()?;
    let existing_stream = client.get_stream(&stream_id).await?;

    if existing_stream.is_none() {
        info!("Stream does not exist, creating...");
        client.create_stream(&args.stream_id).await?;

        // Resolved after the stream is created, matching the original call order.
        let compression = CompressionAlgorithm::from_code(args.compression_algorithm)?;
        client
            .create_topic(
                &stream_id,
                &args.topic_id,
                args.partitions_count,
                compression,
                None,
                IggyExpiry::NeverExpire,
                MaxTopicSize::ServerDefault,
            )
            .await?;
    }

    Ok(())
}
/// Polls messages from the configured stream/topic/partition in a loop and
/// passes every message to `handle_message`.
///
/// Polling uses the "next" strategy with `args.messages_per_batch` messages
/// per request. If `args.get_interval()` yields an interval, each poll waits
/// for its next tick; otherwise polls are issued back to back, so an empty
/// poll is retried immediately.
///
/// The loop ends with `Ok(())` once `args.message_batches_limit` non-empty
/// batches have been consumed. A limit of `0` disables the check and the
/// loop then runs until an error occurs.
///
/// # Errors
///
/// Returns an error if the stream id, topic id, consumer kind or consumer id
/// in `args` is invalid, if polling fails, or if `handle_message` returns an
/// error.
pub async fn consume_messages(
    args: &Args,
    client: &dyn Client,
    handle_message: &MessageHandler,
) -> Result<(), Box<dyn std::error::Error>> {
    let interval = args.get_interval();
    info!(
        "Messages will be polled by consumer: {} from stream: {}, topic: {}, partition: {} with interval {}.",
        args.consumer_id,
        args.stream_id,
        args.topic_id,
        args.partition_id,
        // Build the fallback string only when no interval is configured.
        interval.map_or_else(|| "none".to_string(), |i| i.as_human_time_string())
    );

    let stream_id = args.stream_id.clone().try_into()?;
    let topic_id = args.topic_id.clone().try_into()?;
    let mut interval = interval.map(|interval| tokio::time::interval(interval.get_duration()));
    let mut consumed_batches = 0;
    let consumer = Consumer {
        kind: ConsumerKind::from_code(args.consumer_kind)?,
        // The consumer id comes from user-supplied arguments; report an
        // invalid value to the caller instead of panicking.
        id: Identifier::numeric(args.consumer_id)?,
    };

    loop {
        if args.message_batches_limit > 0 && consumed_batches == args.message_batches_limit {
            info!("Consumed {consumed_batches} batches of messages, exiting.");
            return Ok(());
        }

        if let Some(interval) = &mut interval {
            interval.tick().await;
        }

        let polled_messages = client
            .poll_messages(
                &stream_id,
                &topic_id,
                Some(args.partition_id),
                &consumer,
                &PollingStrategy::next(),
                args.messages_per_batch,
                // NOTE(review): presumably the auto-commit flag — confirm
                // against `Client::poll_messages`.
                true,
            )
            .await?;

        if polled_messages.messages.is_empty() {
            info!("No messages found.");
            continue;
        }

        // Only non-empty polls count towards the batch limit.
        consumed_batches += 1;
        for message in polled_messages.messages {
            handle_message(&message)?;
        }
    }
}