blob: 95b7adcb0196f0b6fc5e273e12ef01d58692ed59 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//! Configuration of RocketMQ rust client.
use std::time::Duration;
use crate::model::common::ClientType;
#[allow(unused_imports)]
use crate::producer::Producer;
#[allow(unused_imports)]
use crate::simple_consumer::SimpleConsumer;
/// [`ClientOption`] is the configuration of internal client, which manages the connection and request with RocketMQ proxy.
#[derive(Debug, Clone)]
pub struct ClientOption {
pub(crate) client_type: ClientType,
pub(crate) group: String,
pub(crate) namespace: String,
pub(crate) access_url: String,
pub(crate) enable_tls: bool,
pub(crate) timeout: Duration,
pub(crate) long_polling_timeout: Duration,
pub(crate) access_key: Option<String>,
pub(crate) secret_key: Option<String>,
}
impl Default for ClientOption {
fn default() -> Self {
ClientOption {
client_type: ClientType::Producer,
group: "".to_string(),
namespace: "".to_string(),
access_url: "localhost:8081".to_string(),
enable_tls: true,
timeout: Duration::from_secs(3),
long_polling_timeout: Duration::from_secs(40),
access_key: None,
secret_key: None,
}
}
}
impl ClientOption {
/// Get the access url of RocketMQ proxy
pub fn access_url(&self) -> &str {
&self.access_url
}
/// Set the access url of RocketMQ proxy
pub fn set_access_url(&mut self, access_url: impl Into<String>) {
self.access_url = access_url.into();
}
/// Whether to enable tls
pub fn enable_tls(&self) -> bool {
self.enable_tls
}
/// Set whether to enable tls, default is true
pub fn set_enable_tls(&mut self, enable_tls: bool) {
self.enable_tls = enable_tls;
}
/// Get the timeout of connection and generic request
pub fn timeout(&self) -> &Duration {
&self.timeout
}
/// Set the timeout of connection and generic request, default is 3 seconds
pub fn set_timeout(&mut self, timeout: Duration) {
self.timeout = timeout;
}
/// Get the await duration during long polling
pub fn long_polling_timeout(&self) -> &Duration {
&self.long_polling_timeout
}
/// Set the await duration during long polling, default is 40 seconds
///
/// This option only affects receive requests, it means timeout for a receive request will be `long_polling_timeout` + `timeout`
pub fn set_long_polling_timeout(&mut self, long_polling_timeout: Duration) {
self.long_polling_timeout = long_polling_timeout;
}
/// Get the access key
pub fn access_key(&self) -> Option<&String> {
self.access_key.as_ref()
}
/// Set the access key
pub fn set_access_key(&mut self, access_key: impl Into<String>) {
self.access_key = Some(access_key.into());
}
/// Get the secret key
pub fn secret_key(&self) -> Option<&String> {
self.secret_key.as_ref()
}
/// Set the secret key
pub fn set_secret_key(&mut self, secret_key: impl Into<String>) {
self.secret_key = Some(secret_key.into());
}
}
/// Log format for output.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum LoggingFormat {
/// Print log in terminal
Terminal,
/// Print log in json file
Json,
}
/// The configuration of [`Producer`].
#[derive(Debug, Clone)]
pub struct ProducerOption {
logging_format: LoggingFormat,
prefetch_route: bool,
topics: Option<Vec<String>>,
namespace: String,
validate_message_type: bool,
}
impl Default for ProducerOption {
fn default() -> Self {
ProducerOption {
logging_format: LoggingFormat::Terminal,
prefetch_route: true,
topics: None,
namespace: "".to_string(),
validate_message_type: true,
}
}
}
impl ProducerOption {
/// Get the logging format of producer
pub fn logging_format(&self) -> &LoggingFormat {
&self.logging_format
}
/// Set the logging format for producer
pub fn set_logging_format(&mut self, logging_format: LoggingFormat) {
self.logging_format = logging_format;
}
/// Whether to prefetch route info
pub fn prefetch_route(&self) -> &bool {
&self.prefetch_route
}
/// Set whether to prefetch route info, default is true
pub fn set_prefetch_route(&mut self, prefetch_route: bool) {
self.prefetch_route = prefetch_route;
}
/// Get which topic(s) to messages to
pub fn topics(&self) -> &Option<Vec<String>> {
&self.topics
}
/// Set which topic(s) to messages to, it will prefetch route info for these topics when the producer starts
pub fn set_topics(&mut self, topics: Vec<impl Into<String>>) {
self.topics = Some(topics.into_iter().map(|t| t.into()).collect());
}
// not expose to user for now
pub(crate) fn namespace(&self) -> &str {
&self.namespace
}
pub(crate) fn set_namespace(&mut self, name_space: impl Into<String>) {
self.namespace = name_space.into();
}
/// Whether to validate message type
pub fn validate_message_type(&self) -> bool {
self.validate_message_type
}
/// Set whether to validate message type, default is true
pub fn set_validate_message_type(&mut self, validate_message_type: bool) {
self.validate_message_type = validate_message_type;
}
}
/// The configuration of [`SimpleConsumer`].
#[derive(Debug, Clone)]
pub struct SimpleConsumerOption {
logging_format: LoggingFormat,
consumer_group: String,
prefetch_route: bool,
topics: Option<Vec<String>>,
namespace: String,
}
impl Default for SimpleConsumerOption {
fn default() -> Self {
SimpleConsumerOption {
logging_format: LoggingFormat::Terminal,
consumer_group: "".to_string(),
prefetch_route: true,
topics: None,
namespace: "".to_string(),
}
}
}
impl SimpleConsumerOption {
/// Set the logging format of simple consumer
pub fn logging_format(&self) -> &LoggingFormat {
&self.logging_format
}
/// set the logging format for simple consumer
pub fn set_logging_format(&mut self, logging_format: LoggingFormat) {
self.logging_format = logging_format;
}
/// Get the consumer group of simple consumer
pub fn consumer_group(&self) -> &str {
&self.consumer_group
}
/// Set the consumer group of simple consumer
pub fn set_consumer_group(&mut self, consumer_group: impl Into<String>) {
self.consumer_group = consumer_group.into();
}
/// Whether to prefetch route info
pub fn prefetch_route(&self) -> &bool {
&self.prefetch_route
}
/// Set whether to prefetch route info, default is true
pub fn set_prefetch_route(&mut self, prefetch_route: bool) {
self.prefetch_route = prefetch_route;
}
/// Set which topic(s) to receive messages
pub fn topics(&self) -> &Option<Vec<String>> {
&self.topics
}
/// Set which topic(s) to receive messages, it will prefetch route info for these topics when the simple consumer starts
pub fn set_topics(&mut self, topics: Vec<impl Into<String>>) {
self.topics = Some(topics.into_iter().map(|t| t.into()).collect());
}
// not expose to user for now
pub(crate) fn namespace(&self) -> &str {
&self.namespace
}
pub(crate) fn set_namespace(&mut self, name_space: impl Into<String>) {
self.namespace = name_space.into();
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn conf_client_option() {
let option = ClientOption::default();
assert_eq!(option.access_url(), "localhost:8081");
assert!(option.enable_tls());
assert_eq!(option.timeout(), &Duration::from_secs(3));
assert_eq!(option.long_polling_timeout(), &Duration::from_secs(40));
}
#[test]
fn conf_producer_option() {
let option = ProducerOption::default();
assert_eq!(option.logging_format(), &LoggingFormat::Terminal);
assert!(option.prefetch_route());
assert!(option.validate_message_type());
}
#[test]
fn conf_simple_consumer_option() {
let option = SimpleConsumerOption::default();
assert_eq!(option.logging_format(), &LoggingFormat::Terminal);
assert!(option.prefetch_route());
}
}