blob: 7e3f399671f4d2c301916f38cb911d8822321319 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::time::{SystemTime, UNIX_EPOCH};
use mockall_double::double;
use prost_types::Timestamp;
use slog::{info, Logger};
#[double]
use crate::client::Client;
use crate::conf::{ClientOption, ProducerOption};
use crate::error::{ClientError, ErrorKind};
use crate::model::common::{ClientType, SendReceipt};
use crate::model::message;
use crate::model::transaction::{Transaction, TransactionChecker, TransactionImpl};
use crate::pb::{Encoding, MessageType, Resource, SystemProperties};
use crate::util::{
build_endpoints_by_message_queue, build_producer_settings, select_message_queue,
select_message_queue_by_message_group, HOST_NAME,
};
use crate::{log, pb};
/// [`Producer`] is the core struct, to which application developers should turn, when publishing messages to RocketMQ proxy.
///
/// [`Producer`] is a thin wrapper of internal client struct that shoulders the actual workloads.
/// Most of its methods take shared reference so that application developers may use it at will.
///
/// [`Producer`] is `Send` and `Sync` by design, so that developers may get started easily.
#[derive(Debug)]
pub struct Producer {
option: ProducerOption,
logger: Logger,
client: Client,
}
impl Producer {
const OPERATION_SEND_MESSAGE: &'static str = "producer.send_message";
const OPERATION_SEND_TRANSACTION_MESSAGE: &'static str = "producer.send_transaction_message";
/// Create a new producer instance
///
/// # Arguments
///
/// * `option` - producer option
/// * `client_option` - client option
pub fn new(option: ProducerOption, client_option: ClientOption) -> Result<Self, ClientError> {
let client_option = ClientOption {
client_type: ClientType::Producer,
namespace: option.namespace().to_string(),
..client_option
};
let logger = log::logger(option.logging_format());
let settings = build_producer_settings(&option, &client_option);
let client = Client::new(&logger, client_option, settings)?;
Ok(Producer {
option,
logger,
client,
})
}
/// Create a new transaction producer instance
///
/// # Arguments
///
/// * `option` - producer option
/// * `client_option` - client option
/// * `transaction_checker` - handle server query for uncommitted transaction status
pub fn new_transaction_producer(
option: ProducerOption,
client_option: ClientOption,
transaction_checker: Box<TransactionChecker>,
) -> Result<Self, ClientError> {
let client_option = ClientOption {
client_type: ClientType::Producer,
namespace: option.namespace().to_string(),
..client_option
};
let logger = log::logger(option.logging_format());
let settings = build_producer_settings(&option, &client_option);
let mut client = Client::new(&logger, client_option, settings)?;
client.set_transaction_checker(transaction_checker);
Ok(Producer {
option,
logger,
client,
})
}
/// Start the producer
pub async fn start(&mut self) -> Result<(), ClientError> {
self.client.start().await?;
if let Some(topics) = self.option.topics() {
for topic in topics {
self.client.topic_route(topic, true).await?;
}
}
info!(
self.logger,
"start producer success, client_id: {}",
self.client.client_id()
);
Ok(())
}
fn transform_messages_to_protobuf(
&self,
messages: Vec<impl message::Message>,
) -> Result<(String, Option<String>, Vec<pb::Message>), ClientError> {
if messages.is_empty() {
return Err(ClientError::new(
ErrorKind::InvalidMessage,
"no message found",
Self::OPERATION_SEND_MESSAGE,
));
}
let born_timestamp = match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(duration) => Some(Timestamp {
seconds: duration.as_secs() as i64,
nanos: 0,
}),
Err(_) => None,
};
let mut pb_messages = Vec::with_capacity(messages.len());
let mut last_topic: Option<String> = None;
let mut last_message_group: Option<Option<String>> = None;
for mut message in messages {
if let Some(last_topic) = last_topic.clone() {
if last_topic.ne(&message.take_topic()) {
return Err(ClientError::new(
ErrorKind::InvalidMessage,
"Not all messages have the same topic.",
Self::OPERATION_SEND_MESSAGE,
));
}
} else {
last_topic = Some(message.take_topic());
}
let mut message_group = message.take_message_group();
if let Some(last_message_group) = last_message_group.clone() {
if last_message_group.ne(&message_group) {
return Err(ClientError::new(
ErrorKind::InvalidMessage,
"not all messages have the same message group",
Self::OPERATION_SEND_MESSAGE,
));
}
} else {
last_message_group = Some(message_group.clone());
}
let mut delivery_timestamp = message
.take_delivery_timestamp()
.map(|seconds| Timestamp { seconds, nanos: 0 });
let message_type = if message.transaction_enabled() {
message_group = None;
delivery_timestamp = None;
MessageType::Transaction as i32
} else if delivery_timestamp.is_some() {
message_group = None;
MessageType::Delay as i32
} else if message_group.is_some() {
delivery_timestamp = None;
MessageType::Fifo as i32
} else {
MessageType::Normal as i32
};
let pb_message = pb::Message {
topic: Some(Resource {
name: message.take_topic(),
resource_namespace: self.option.namespace().to_string(),
}),
user_properties: message.take_properties(),
system_properties: Some(SystemProperties {
tag: message.take_tag(),
keys: message.take_keys(),
message_id: message.take_message_id(),
message_group,
delivery_timestamp,
message_type,
born_host: HOST_NAME.clone(),
born_timestamp: born_timestamp.clone(),
body_digest: None,
body_encoding: Encoding::Identity as i32,
..SystemProperties::default()
}),
body: message.take_body(),
};
pb_messages.push(pb_message);
}
let topic = last_topic.unwrap();
if topic.is_empty() {
return Err(ClientError::new(
ErrorKind::InvalidMessage,
"message topic is empty",
Self::OPERATION_SEND_MESSAGE,
));
}
Ok((topic, last_message_group.unwrap(), pb_messages))
}
/// Send a single message
///
/// # Arguments
///
/// * `message` - the message to send
pub async fn send(&self, message: impl message::Message) -> Result<SendReceipt, ClientError> {
let results = self.batch_send(vec![message]).await?;
Ok(results[0].clone())
}
/// Send a batch of messages
///
/// # Arguments
///
/// * `messages` - A vector that holds the messages to send
pub async fn batch_send(
&self,
messages: Vec<impl message::Message>,
) -> Result<Vec<SendReceipt>, ClientError> {
let (topic, message_group, mut pb_messages) =
self.transform_messages_to_protobuf(messages)?;
let route = self.client.topic_route(&topic, true).await?;
let message_queue = if let Some(message_group) = message_group {
select_message_queue_by_message_group(route, message_group)
} else {
select_message_queue(route)
};
let endpoints =
build_endpoints_by_message_queue(&message_queue, Self::OPERATION_SEND_MESSAGE)?;
for message in pb_messages.iter_mut() {
message.system_properties.as_mut().unwrap().queue_id = message_queue.id;
}
self.client.send_message(&endpoints, pb_messages).await
}
/// Send message in a transaction
pub async fn send_transaction_message(
&self,
mut message: impl message::Message,
) -> Result<impl Transaction, ClientError> {
if !self.client.has_transaction_checker() {
return Err(ClientError::new(
ErrorKind::InvalidMessage,
"this producer can not send transaction message, please create a transaction producer using producer::new_transaction_producer",
Self::OPERATION_SEND_TRANSACTION_MESSAGE,
));
}
let topic = message.take_topic();
let receipt = self.send(message).await?;
let rpc_client = self.client.get_session().await?;
Ok(TransactionImpl::new(
Box::new(rpc_client),
Resource {
resource_namespace: self.option.namespace().to_string(),
name: topic,
},
receipt,
))
}
pub async fn shutdown(self) -> Result<(), ClientError> {
self.client.shutdown().await
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::error::ErrorKind;
use crate::log::terminal_logger;
use crate::model::common::Route;
use crate::model::message::{MessageBuilder, MessageImpl};
use crate::model::transaction::TransactionResolution;
use crate::pb::{Broker, MessageQueue};
use crate::session::Session;
use super::*;
fn new_producer_for_test() -> Producer {
Producer {
option: Default::default(),
logger: terminal_logger(),
client: Client::default(),
}
}
#[tokio::test]
async fn producer_start() -> Result<(), ClientError> {
let _m = crate::client::tests::MTX.lock();
let ctx = Client::new_context();
ctx.expect().return_once(|_, _, _| {
let mut client = Client::default();
client.expect_topic_route().returning(|_, _| {
Ok(Arc::new(Route {
index: Default::default(),
queue: vec![],
}))
});
client.expect_start().returning(|| Ok(()));
client
.expect_client_id()
.return_const("fake_id".to_string());
Ok(client)
});
let mut producer_option = ProducerOption::default();
producer_option.set_topics(vec!["DefaultCluster".to_string()]);
Producer::new(producer_option, ClientOption::default())?
.start()
.await?;
Ok(())
}
#[tokio::test]
async fn transaction_producer_start() -> Result<(), ClientError> {
let _m = crate::client::tests::MTX.lock();
let ctx = Client::new_context();
ctx.expect().return_once(|_, _, _| {
let mut client = Client::default();
client.expect_topic_route().returning(|_, _| {
Ok(Arc::new(Route {
index: Default::default(),
queue: vec![],
}))
});
client.expect_start().returning(|| Ok(()));
client.expect_set_transaction_checker().returning(|_| ());
client
.expect_client_id()
.return_const("fake_id".to_string());
Ok(client)
});
let mut producer_option = ProducerOption::default();
producer_option.set_topics(vec!["DefaultCluster".to_string()]);
Producer::new_transaction_producer(
producer_option,
ClientOption::default(),
Box::new(|_, _| TransactionResolution::COMMIT),
)?
.start()
.await?;
Ok(())
}
#[tokio::test]
async fn producer_transform_messages_to_protobuf() {
let producer = new_producer_for_test();
let messages = vec![MessageBuilder::builder()
.set_topic("DefaultCluster")
.set_body("hello world".as_bytes().to_vec())
.set_tag("tag")
.set_keys(vec!["key"])
.set_properties(vec![("key", "value")].into_iter().collect())
.set_message_group("message_group".to_string())
.build()
.unwrap()];
let result = producer.transform_messages_to_protobuf(messages);
assert!(result.is_ok());
let (topic, message_group, pb_messages) = result.unwrap();
assert_eq!(topic, "DefaultCluster");
assert_eq!(message_group, Some("message_group".to_string()));
// check message
assert_eq!(pb_messages.len(), 1);
let message = pb_messages.get(0).unwrap().clone();
assert_eq!(message.topic.unwrap().name, "DefaultCluster");
let system_properties = message.system_properties.unwrap();
assert_eq!(system_properties.tag.unwrap(), "tag");
assert_eq!(system_properties.keys, vec!["key".to_string()]);
assert_eq!(message.user_properties.get("key").unwrap(), "value");
assert_eq!(std::str::from_utf8(&message.body).unwrap(), "hello world");
assert!(!system_properties.message_id.is_empty());
}
#[tokio::test]
async fn producer_transform_messages_to_protobuf_failed() {
let producer = new_producer_for_test();
let messages: Vec<MessageImpl> = vec![];
let result = producer.transform_messages_to_protobuf(messages);
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.kind, ErrorKind::InvalidMessage);
assert_eq!(err.message, "no message found");
let messages = vec![MessageImpl {
message_id: "".to_string(),
topic: "".to_string(),
body: None,
tag: None,
keys: None,
properties: None,
message_group: None,
delivery_timestamp: None,
transaction_enabled: false,
}];
let result = producer.transform_messages_to_protobuf(messages);
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.kind, ErrorKind::InvalidMessage);
assert_eq!(err.message, "message topic is empty");
let messages = vec![
MessageBuilder::builder()
.set_topic("DefaultCluster")
.set_body("hello world".as_bytes().to_vec())
.build()
.unwrap(),
MessageBuilder::builder()
.set_topic("DefaultCluster_dup")
.set_body("hello world".as_bytes().to_vec())
.build()
.unwrap(),
];
let result = producer.transform_messages_to_protobuf(messages);
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.kind, ErrorKind::InvalidMessage);
assert_eq!(err.message, "Not all messages have the same topic.");
let messages = vec![
MessageBuilder::builder()
.set_topic("DefaultCluster")
.set_body("hello world".as_bytes().to_vec())
.set_message_group("message_group")
.build()
.unwrap(),
MessageBuilder::builder()
.set_topic("DefaultCluster")
.set_body("hello world".as_bytes().to_vec())
.set_message_group("message_group_dup")
.build()
.unwrap(),
];
let result = producer.transform_messages_to_protobuf(messages);
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.kind, ErrorKind::InvalidMessage);
assert_eq!(err.message, "not all messages have the same message group");
}
#[tokio::test]
async fn producer_send() -> Result<(), ClientError> {
let mut producer = new_producer_for_test();
producer.client.expect_topic_route().returning(|_, _| {
Ok(Arc::new(Route {
index: Default::default(),
queue: vec![MessageQueue {
topic: Some(Resource {
name: "test_topic".to_string(),
resource_namespace: "".to_string(),
}),
id: 0,
permission: 0,
broker: Some(Broker {
name: "".to_string(),
id: 0,
endpoints: Some(pb::Endpoints {
scheme: 0,
addresses: vec![],
}),
}),
accept_message_types: vec![],
}],
}))
});
producer.client.expect_send_message().returning(|_, _| {
Ok(vec![SendReceipt::from_pb_send_result(
&pb::SendResultEntry {
message_id: "message_id".to_string(),
transaction_id: "transaction_id".to_string(),
..pb::SendResultEntry::default()
},
)])
});
let result = producer
.send(
MessageBuilder::builder()
.set_topic("test_topic")
.set_body(vec![])
.build()
.unwrap(),
)
.await?;
assert_eq!(result.message_id(), "message_id");
assert_eq!(result.transaction_id(), "transaction_id");
Ok(())
}
#[tokio::test]
async fn producer_send_transaction_message() -> Result<(), ClientError> {
let mut producer = new_producer_for_test();
producer.client.expect_topic_route().returning(|_, _| {
Ok(Arc::new(Route {
index: Default::default(),
queue: vec![MessageQueue {
topic: Some(Resource {
name: "test_topic".to_string(),
resource_namespace: "".to_string(),
}),
id: 0,
permission: 0,
broker: Some(Broker {
name: "".to_string(),
id: 0,
endpoints: Some(pb::Endpoints {
scheme: 0,
addresses: vec![],
}),
}),
accept_message_types: vec![],
}],
}))
});
producer.client.expect_send_message().returning(|_, _| {
Ok(vec![SendReceipt::from_pb_send_result(
&pb::SendResultEntry {
message_id: "message_id".to_string(),
transaction_id: "transaction_id".to_string(),
..pb::SendResultEntry::default()
},
)])
});
producer
.client
.expect_get_session()
.return_once(|| Ok(Session::mock()));
producer
.client
.expect_has_transaction_checker()
.return_once(|| true);
let _ = producer
.send_transaction_message(
MessageBuilder::builder()
.set_topic("test_topic")
.set_body(vec![])
.build()
.unwrap(),
)
.await?;
Ok(())
}
}