blob: e44f7cd9e739c5632e4e7a43c2a71d09caa57893 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::time::Duration;
use slog::{info, Logger};
use crate::client::Client;
use crate::conf::{ClientOption, SimpleConsumerOption};
use crate::error::{ClientError, ErrorKind};
use crate::model::common::{ClientType, FilterExpression};
use crate::model::message::{AckMessageEntry, MessageView};
use crate::util::{
build_endpoints_by_message_queue, build_simple_consumer_settings, select_message_queue,
};
use crate::{log, pb};
#[derive(Debug)]
pub struct SimpleConsumer {
option: SimpleConsumerOption,
logger: Logger,
client: Client,
}
impl SimpleConsumer {
const OPERATION_START_SIMPLE_CONSUMER: &'static str = "simple_consumer.start";
const OPERATION_RECEIVE_MESSAGE: &'static str = "simple_consumer.receive_message";
pub fn new(
option: SimpleConsumerOption,
client_option: ClientOption,
) -> Result<Self, ClientError> {
let client_option = ClientOption {
client_type: ClientType::SimpleConsumer,
group: option.consumer_group().to_string(),
namespace: option.namespace().to_string(),
..client_option
};
let logger = log::logger(option.logging_format());
let settings = build_simple_consumer_settings(&option, &client_option);
let client = Client::new(&logger, client_option, settings)?;
Ok(SimpleConsumer {
option,
logger,
client,
})
}
pub async fn start(&self) -> Result<(), ClientError> {
if self.option.consumer_group().is_empty() {
return Err(ClientError::new(
ErrorKind::Config,
"required option is missing: consumer group is empty",
Self::OPERATION_START_SIMPLE_CONSUMER,
));
}
if let Some(topics) = self.option.topics() {
for topic in topics {
self.client.topic_route(topic, true).await?;
}
}
self.client.start();
info!(
self.logger,
"start simple consumer success, client_id: {}",
self.client.client_id()
);
Ok(())
}
pub async fn receive(
&self,
topic: impl AsRef<str>,
expression: &FilterExpression,
) -> Result<Vec<MessageView>, ClientError> {
self.receive_with_batch_size(topic.as_ref(), expression, 32, Duration::from_secs(15))
.await
}
pub async fn receive_with_batch_size(
&self,
topic: &str,
expression: &FilterExpression,
batch_size: i32,
invisible_duration: Duration,
) -> Result<Vec<MessageView>, ClientError> {
let route = self.client.topic_route(topic, true).await?;
let message_queue = select_message_queue(route);
let endpoints =
build_endpoints_by_message_queue(&message_queue, Self::OPERATION_RECEIVE_MESSAGE)?;
let messages = self
.client
.receive_message(
&endpoints,
message_queue,
pb::FilterExpression {
r#type: expression.filter_type as i32,
expression: expression.expression.clone(),
},
batch_size,
prost_types::Duration::try_from(invisible_duration).unwrap(),
)
.await?;
Ok(messages
.into_iter()
.map(|message| MessageView::from_pb_message(message, endpoints.clone()))
.collect())
}
pub async fn ack(&self, ack_entry: impl AckMessageEntry) -> Result<(), ClientError> {
self.client.ack_message(ack_entry).await?;
Ok(())
}
}