doc(rust): add rust doc (#485)
* doc(rust): add rust doc
* build(rust): set msrv to 1.61
* doc(rust): update doc following rust doc spec
* ci(rust): update cargo test arg
diff --git a/.github/workflows/rust_build.yml b/.github/workflows/rust_build.yml
index 30ea791..a37415f 100644
--- a/.github/workflows/rust_build.yml
+++ b/.github/workflows/rust_build.yml
@@ -26,4 +26,4 @@
- name: Unit Test
working-directory: ./rust
- run: cargo test
\ No newline at end of file
+ run: cargo test -- --nocapture
\ No newline at end of file
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index dae0b60..689ab0e 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -18,6 +18,18 @@
name = "rocketmq"
version = "0.1.0"
edition = "2021"
+rust-version = "1.61"
+authors = [
+ "SSpirits <admin@lv5.moe>",
+ "Zhanhui Li <lizhanhui@gmail.com>",
+]
+
+license = "MIT/Apache-2.0"
+readme = "./README.md"
+repository = "https://github.com/apache/rocketmq-clients"
+documentation = "https://docs.rs/rocketmq"
+description = "Rust client for Apache RocketMQ"
+keywords = ["rocketmq", "api", "client", "sdk", "grpc"]
[dependencies]
tokio = { version = "1", features = ["full"] }
@@ -42,16 +54,15 @@
opentelemetry = { version = "0.19.0", features = ["metrics", "rt-tokio"] }
opentelemetry-otlp = { version = "0.12.0", features = ["metrics", "grpc-tonic"] }
+minitrace = "0.4"
byteorder = "1"
mac_address = "1.1.4"
hex = "0.4.3"
-time = "0.3.19"
+time = "0.3"
once_cell = "1.9.0"
tokio-stream="0.1.12"
-minitrace = "0.4.1"
-
mockall = "0.11.4"
mockall_double= "0.3.0"
diff --git a/rust/README.md b/rust/README.md
index e8621d5..0181494 100644
--- a/rust/README.md
+++ b/rust/README.md
@@ -6,13 +6,13 @@
Here is the rust implementation of the client for [Apache RocketMQ](https://rocketmq.apache.org/). Different from the [remoting-based client](https://github.com/apache/rocketmq/tree/develop/client), the current implementation is based on separating architecture for computing and storage, which is the more recommended way to access the RocketMQ service.
-Here are some preparations you may need to know (or refer to [here](https://rocketmq.apache.org/docs/quickStart/02quickstart)).
+Here are some preparations you may need to know [Quick Start](https://rocketmq.apache.org/docs/quickStart/02quickstart).
## Getting Started
### Requirements
-1. rust and cargo
+1. rust toolchain, rocketmq's MSRV is 1.61.
2. protoc 3.15.0+
3. setup name server, broker, and [proxy](https://github.com/apache/rocketmq/tree/develop/proxy).
diff --git a/rust/examples/producer.rs b/rust/examples/producer.rs
index c1b00d1..a7ad0ab 100644
--- a/rust/examples/producer.rs
+++ b/rust/examples/producer.rs
@@ -15,7 +15,7 @@
* limitations under the License.
*/
use rocketmq::conf::{ClientOption, ProducerOption};
-use rocketmq::model::message::MessageImpl;
+use rocketmq::model::message::MessageBuilder;
use rocketmq::Producer;
#[tokio::main]
@@ -34,9 +34,9 @@
producer.start().await.unwrap();
// build message
- let message = MessageImpl::builder()
+ let message = MessageBuilder::builder()
.set_topic("test_topic")
- .set_tags("test_tag")
+ .set_tag("test_tag")
.set_body("hello world".as_bytes().to_vec())
.build()
.unwrap();
diff --git a/rust/src/client.rs b/rust/src/client.rs
index fe5cf34..11e4419 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -486,7 +486,8 @@
}
#[cfg(test)]
-mod tests {
+pub(crate) mod tests {
+ use lazy_static::lazy_static;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::sleep;
@@ -507,6 +508,11 @@
use super::*;
+ lazy_static! {
+ // The lock is used to prevent the mocking static function at same time during parallel testing.
+ pub(crate) static ref MTX: Mutex<()> = Mutex::new(());
+ }
+
fn new_client_for_test() -> Client {
Client {
logger: terminal_logger(),
diff --git a/rust/src/conf.rs b/rust/src/conf.rs
index bb6faf6..7ecb691 100644
--- a/rust/src/conf.rs
+++ b/rust/src/conf.rs
@@ -14,9 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
+//! Configuration of RocketMQ rust client.
+
use crate::model::common::ClientType;
+#[allow(unused_imports)]
+use crate::producer::Producer;
+#[allow(unused_imports)]
+use crate::simple_consumer::SimpleConsumer;
use std::time::Duration;
+/// [`ClientOption`] is the configuration of internal client, which manages the connection and request with RocketMQ proxy.
#[derive(Debug, Clone)]
pub struct ClientOption {
pub(crate) client_type: ClientType,
@@ -43,41 +51,55 @@
}
impl ClientOption {
+ /// Get the access url of RocketMQ proxy
pub fn access_url(&self) -> &str {
&self.access_url
}
+ /// Set the access url of RocketMQ proxy
pub fn set_access_url(&mut self, access_url: impl Into<String>) {
self.access_url = access_url.into();
}
+ /// Whether to enable tls
pub fn enable_tls(&self) -> bool {
self.enable_tls
}
+ /// Set whether to enable tls, default is true
pub fn set_enable_tls(&mut self, enable_tls: bool) {
self.enable_tls = enable_tls;
}
+ /// Get the timeout of connection and generic request
pub fn timeout(&self) -> &Duration {
&self.timeout
}
+ /// Set the timeout of connection and generic request, default is 3 seconds
pub fn set_timeout(&mut self, timeout: Duration) {
self.timeout = timeout;
}
+ /// Get the await duration during long polling
pub fn long_polling_timeout(&self) -> &Duration {
&self.long_polling_timeout
}
+ /// Set the await duration during long polling, default is 40 seconds
+ ///
+ /// This option only affects receive requests, it means timeout for a receive request will be `long_polling_timeout` + `timeout`
pub fn set_long_polling_timeout(&mut self, long_polling_timeout: Duration) {
self.long_polling_timeout = long_polling_timeout;
}
}
+/// Log format for output.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum LoggingFormat {
+ /// Print log in terminal
Terminal,
+ /// Print log in json file
Json,
}
+/// The configuration of [`Producer`].
#[derive(Debug, Clone)]
pub struct ProducerOption {
logging_format: LoggingFormat,
@@ -100,23 +122,29 @@
}
impl ProducerOption {
+ /// Get the logging format of producer
pub fn logging_format(&self) -> &LoggingFormat {
&self.logging_format
}
+ /// Set the logging format for producer
pub fn set_logging_format(&mut self, logging_format: LoggingFormat) {
self.logging_format = logging_format;
}
+ /// Whether to prefetch route info
pub fn prefetch_route(&self) -> &bool {
&self.prefetch_route
}
+ /// Set whether to prefetch route info, default is true
pub fn set_prefetch_route(&mut self, prefetch_route: bool) {
self.prefetch_route = prefetch_route;
}
+ /// Get which topic(s) to messages to
pub fn topics(&self) -> &Option<Vec<String>> {
&self.topics
}
+ /// Set which topic(s) to messages to, it will prefetch route info for these topics when the producer starts
pub fn set_topics(&mut self, topics: Vec<impl Into<String>>) {
self.topics = Some(topics.into_iter().map(|t| t.into()).collect());
}
@@ -129,14 +157,17 @@
self.namespace = name_space.into();
}
+ /// Whether to validate message type
pub fn validate_message_type(&self) -> bool {
self.validate_message_type
}
+ /// Set whether to validate message type, default is true
pub fn set_validate_message_type(&mut self, validate_message_type: bool) {
self.validate_message_type = validate_message_type;
}
}
+/// The configuration of [`SimpleConsumer`].
#[derive(Debug, Clone)]
pub struct SimpleConsumerOption {
logging_format: LoggingFormat,
@@ -159,30 +190,38 @@
}
impl SimpleConsumerOption {
+ /// Set the logging format of simple consumer
pub fn logging_format(&self) -> &LoggingFormat {
&self.logging_format
}
+ /// set the logging format for simple consumer
pub fn set_logging_format(&mut self, logging_format: LoggingFormat) {
self.logging_format = logging_format;
}
+ /// Get the consumer group of simple consumer
pub fn consumer_group(&self) -> &str {
&self.consumer_group
}
+ /// Set the consumer group of simple consumer
pub fn set_consumer_group(&mut self, consumer_group: impl Into<String>) {
self.consumer_group = consumer_group.into();
}
+ /// Whether to prefetch route info
pub fn prefetch_route(&self) -> &bool {
&self.prefetch_route
}
+ /// Set whether to prefetch route info, default is true
pub fn set_prefetch_route(&mut self, prefetch_route: bool) {
self.prefetch_route = prefetch_route;
}
+ /// Set which topic(s) to receive messages
pub fn topics(&self) -> &Option<Vec<String>> {
&self.topics
}
+ /// Set which topic(s) to receive messages, it will prefetch route info for these topics when the simple consumer starts
pub fn set_topics(&mut self, topics: Vec<impl Into<String>>) {
self.topics = Some(topics.into_iter().map(|t| t.into()).collect());
}
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index 7152611..1a13a5a 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -14,6 +14,111 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
+//! # The Rust Implementation of Apache RocketMQ Client
+//!
+//! Here is the official rust client for [Apache RocketMQ](https://rocketmq.apache.org/)
+//! providing async/await API powered by tokio runtime.
+//!
+//! Different from the [remoting-based client](https://github.com/apache/rocketmq/tree/develop/client),
+//! the current implementation is based on separating architecture for computing and storage,
+//! which is the more recommended way to access the RocketMQ service.
+//!
+//! Here are some preparations you may need to know: [RocketMQ Quick Start](https://rocketmq.apache.org/docs/quickStart/02quickstart).
+//!
+//! ## Examples
+//!
+//! Basic usage:
+//!
+//! ### Producer
+//! ```rust,no_run
+//! use rocketmq::conf::{ClientOption, ProducerOption};
+//! use rocketmq::model::message::MessageBuilder;
+//! use rocketmq::Producer;
+//!
+//! #[tokio::main]
+//! async fn main() {
+//! // recommend to specify which topic(s) you would like to send message to
+//! // producer will prefetch topic route when start and failed fast if topic not exist
+//! let mut producer_option = ProducerOption::default();
+//! producer_option.set_topics(vec!["test_topic"]);
+//!
+//! // set which rocketmq proxy to connect
+//! let mut client_option = ClientOption::default();
+//! client_option.set_access_url("localhost:8081");
+//!
+//! // build and start producer
+//! let producer = Producer::new(producer_option, client_option).unwrap();
+//! producer.start().await.unwrap();
+//!
+//! // build message
+//! let message = MessageBuilder::builder()
+//! .set_topic("test_topic")
+//! .set_tag("test_tag")
+//! .set_body("hello world".as_bytes().to_vec())
+//! .build()
+//! .unwrap();
+//!
+//! // send message to rocketmq proxy
+//! let result = producer.send_one(message).await;
+//! debug_assert!(result.is_ok(), "send message failed: {:?}", result);
+//! println!(
+//! "send message success, message_id={}",
+//! result.unwrap().message_id
+//! );
+//! }
+//! ```
+//!
+//! ### Simple Consumer
+//! ```rust,no_run
+//! use rocketmq::conf::{ClientOption, SimpleConsumerOption};
+//! use rocketmq::model::common::{FilterExpression, FilterType};
+//! use rocketmq::SimpleConsumer;
+//!
+//! #[tokio::main]
+//! async fn main() {
+//! // recommend to specify which topic(s) you would like to send message to
+//! // simple consumer will prefetch topic route when start and failed fast if topic not exist
+//! let mut consumer_option = SimpleConsumerOption::default();
+//! consumer_option.set_topics(vec!["test_topic"]);
+//! consumer_option.set_consumer_group("SimpleConsumerGroup");
+//!
+//! // set which rocketmq proxy to connect
+//! let mut client_option = ClientOption::default();
+//! client_option.set_access_url("localhost:8081");
+//!
+//! // build and start simple consumer
+//! let consumer = SimpleConsumer::new(consumer_option, client_option).unwrap();
+//! consumer.start().await.unwrap();
+//!
+//! // pop message from rocketmq proxy
+//! let receive_result = consumer
+//! .receive(
+//! "test_topic".to_string(),
+//! &FilterExpression::new(FilterType::Tag, "test_tag"),
+//! )
+//! .await;
+//! debug_assert!(
+//! receive_result.is_ok(),
+//! "receive message failed: {:?}",
+//! receive_result.unwrap_err()
+//! );
+//!
+//! let messages = receive_result.unwrap();
+//! for message in messages {
+//! println!("receive message: {:?}", message);
+//! // ack message to rocketmq proxy
+//! let ack_result = consumer.ack(message).await;
+//! debug_assert!(
+//! ack_result.is_ok(),
+//! "ack message failed: {:?}",
+//! ack_result.unwrap_err()
+//! );
+//! }
+//! }
+//! ```
+//!
+
#[allow(dead_code)]
pub mod conf;
mod error;
diff --git a/rust/src/model/common.rs b/rust/src/model/common.rs
index a488b15..4023b95 100644
--- a/rust/src/model/common.rs
+++ b/rust/src/model/common.rs
@@ -14,6 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
+//! Common data model of RocketMQ rust client.
+
use std::net::IpAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
@@ -45,6 +48,7 @@
Found(Arc<Route>),
}
+/// Access points for receive messages or querying topic routes.
#[derive(Debug, Clone)]
pub struct Endpoints {
endpoint_url: String,
@@ -58,7 +62,7 @@
const ENDPOINT_SEPARATOR: &'static str = ",";
const ADDRESS_SEPARATOR: &'static str = ":";
- pub fn from_url(endpoint_url: &str) -> Result<Self, ClientError> {
+ pub(crate) fn from_url(endpoint_url: &str) -> Result<Self, ClientError> {
if endpoint_url.is_empty() {
return Err(ClientError::new(
ErrorKind::Config,
@@ -161,42 +165,67 @@
}
}
+ /// Get endpoint url
pub fn endpoint_url(&self) -> &str {
&self.endpoint_url
}
+ /// Get address scheme of endpoint
pub fn scheme(&self) -> AddressScheme {
self.scheme
}
- pub fn inner(&self) -> &pb::Endpoints {
+ pub(crate) fn inner(&self) -> &pb::Endpoints {
&self.inner
}
- pub fn into_inner(self) -> pb::Endpoints {
+ #[allow(dead_code)]
+ pub(crate) fn into_inner(self) -> pb::Endpoints {
self.inner
}
}
+/// Filter type for message filtering.
+///
+/// RocketMQ allows to filter messages by tag or SQL.
#[derive(Clone, Copy)]
#[repr(i32)]
pub enum FilterType {
+ /// Filter by tag
Tag = 1,
+ /// Filter by SQL
Sql = 2,
}
+/// Filter expression for message filtering.
pub struct FilterExpression {
pub(crate) filter_type: FilterType,
pub(crate) expression: String,
}
impl FilterExpression {
+ /// Create a new filter expression
+ ///
+ /// # Arguments
+ ///
+ /// * `filter_type` - set filter type
+ /// * `expression` - set message tag or SQL query string
pub fn new(filter_type: FilterType, expression: impl Into<String>) -> Self {
FilterExpression {
filter_type,
expression: expression.into(),
}
}
+
+ /// Get filter type
+ pub fn filter_type(&self) -> FilterType {
+ self.filter_type
+ }
+
+ /// Get message tag or SQL query string
+ pub fn expression(&self) -> &str {
+ &self.expression
+ }
}
#[cfg(test)]
diff --git a/rust/src/model/message.rs b/rust/src/model/message.rs
index 5ca53df..79a0250 100644
--- a/rust/src/model/message.rs
+++ b/rust/src/model/message.rs
@@ -14,12 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
+//! Message data model of RocketMQ rust client.
+
use crate::error::{ClientError, ErrorKind};
use crate::model::common::Endpoints;
use crate::model::message_id::UNIQ_ID_GENERATOR;
use crate::pb;
use std::collections::HashMap;
+/// [`Message`] is the data model for sending.
pub trait Message {
fn take_message_id(&mut self) -> String;
fn take_topic(&mut self) -> String;
@@ -31,12 +35,11 @@
fn take_delivery_timestamp(&mut self) -> Option<i64>;
}
-#[derive(Debug)]
-pub struct MessageImpl {
+pub(crate) struct MessageImpl {
pub(crate) message_id: String,
pub(crate) topic: String,
pub(crate) body: Option<Vec<u8>>,
- pub(crate) tags: Option<String>,
+ pub(crate) tag: Option<String>,
pub(crate) keys: Option<Vec<String>>,
pub(crate) properties: Option<HashMap<String, String>>,
pub(crate) message_group: Option<String>,
@@ -57,7 +60,7 @@
}
fn take_tag(&mut self) -> Option<String> {
- self.tags.take()
+ self.tag.take()
}
fn take_keys(&mut self) -> Vec<String> {
@@ -77,14 +80,22 @@
}
}
-impl MessageImpl {
+/// [`MessageBuilder`] is the builder for [`Message`].
+pub struct MessageBuilder {
+ message: MessageImpl,
+}
+
+impl MessageBuilder {
+ const OPERATION_BUILD_MESSAGE: &'static str = "build_message";
+
+ /// Create a new [`MessageBuilder`] for building a message. [Read more](https://rocketmq.apache.org/docs/domainModel/04message/)
pub fn builder() -> MessageBuilder {
MessageBuilder {
message: MessageImpl {
message_id: UNIQ_ID_GENERATOR.lock().next_id(),
topic: "".to_string(),
body: None,
- tags: None,
+ tag: None,
keys: None,
properties: None,
message_group: None,
@@ -93,6 +104,13 @@
}
}
+ /// Create a new [`MessageBuilder`] for building a fifo message. [Read more](https://rocketmq.apache.org/docs/featureBehavior/03fifomessage)
+ ///
+ /// # Arguments
+ ///
+ /// * `topic` - topic of the message
+ /// * `body` - message body
+ /// * `message_group` - message group, messages with same message group will be delivered in FIFO order
pub fn fifo_message_builder(
topic: impl Into<String>,
body: Vec<u8>,
@@ -103,7 +121,7 @@
message_id: UNIQ_ID_GENERATOR.lock().next_id(),
topic: topic.into(),
body: Some(body),
- tags: None,
+ tag: None,
keys: None,
properties: None,
message_group: Some(message_group.into()),
@@ -112,6 +130,13 @@
}
}
+ /// Create a new [`MessageBuilder`] for building a delay message. [Read more](https://rocketmq.apache.org/docs/featureBehavior/02delaymessage)
+ ///
+ /// # Arguments
+ ///
+ /// * `topic` - topic of the message
+ /// * `body` - message body
+ /// * `delay_time` - delivery timestamp of message, specify when to deliver the message
pub fn delay_message_builder(
topic: impl Into<String>,
body: Vec<u8>,
@@ -122,7 +147,7 @@
message_id: UNIQ_ID_GENERATOR.lock().next_id(),
topic: topic.into(),
body: Some(body),
- tags: None,
+ tag: None,
keys: None,
properties: None,
message_group: None,
@@ -130,35 +155,32 @@
},
}
}
-}
-pub struct MessageBuilder {
- message: MessageImpl,
-}
-
-impl MessageBuilder {
- const OPERATION_BUILD_MESSAGE: &'static str = "build_message";
-
+ /// Set topic for message, which is required
pub fn set_topic(mut self, topic: impl Into<String>) -> Self {
self.message.topic = topic.into();
self
}
+ /// Set message body, which is required
pub fn set_body(mut self, body: Vec<u8>) -> Self {
self.message.body = Some(body);
self
}
- pub fn set_tags(mut self, tags: impl Into<String>) -> Self {
- self.message.tags = Some(tags.into());
+ /// Set message tag
+ pub fn set_tag(mut self, tag: impl Into<String>) -> Self {
+ self.message.tag = Some(tag.into());
self
}
+ /// Set message keys
pub fn set_keys(mut self, keys: Vec<impl Into<String>>) -> Self {
self.message.keys = Some(keys.into_iter().map(|k| k.into()).collect());
self
}
+ /// Set message properties
pub fn set_properties(
mut self,
properties: HashMap<impl Into<String>, impl Into<String>>,
@@ -172,11 +194,17 @@
self
}
+ /// Set message group, which is required for fifo message. [Read more](https://rocketmq.apache.org/docs/featureBehavior/03fifomessage)
+ ///
+ /// The message group could not be set with delivery timestamp at the same time
pub fn set_message_group(mut self, message_group: impl Into<String>) -> Self {
self.message.message_group = Some(message_group.into());
self
}
+ /// Set delivery timestamp, which is required for delay message. [Read more](https://rocketmq.apache.org/docs/featureBehavior/02delaymessage)
+ ///
+ /// The delivery timestamp could not be set with message group at the same time
pub fn set_delivery_timestamp(mut self, delivery_timestamp: i64) -> Self {
self.message.delivery_timestamp = Some(delivery_timestamp);
self
@@ -197,7 +225,8 @@
Ok(())
}
- pub fn build(self) -> Result<MessageImpl, ClientError> {
+ /// Build message
+ pub fn build(self) -> Result<impl Message, ClientError> {
self.check_message().map_err(|e| {
ClientError::new(ErrorKind::InvalidMessage, &e, Self::OPERATION_BUILD_MESSAGE)
})?;
@@ -205,6 +234,7 @@
}
}
+/// [`AckMessageEntry`] is the data model for ack message.
pub trait AckMessageEntry {
fn topic(&self) -> String;
fn message_id(&self) -> String;
@@ -212,6 +242,9 @@
fn endpoints(&self) -> &Endpoints;
}
+/// [`MessageView`] is the data model for receive message.
+///
+/// [`MessageView`] has implemented [`AckMessageEntry`] trait.
#[derive(Debug)]
pub struct MessageView {
pub(crate) message_id: String,
@@ -267,46 +300,57 @@
}
}
+ /// Get message id
pub fn message_id(&self) -> &str {
&self.message_id
}
+ /// Get topic of message
pub fn topic(&self) -> &str {
&self.topic
}
+ /// Get message body
pub fn body(&self) -> &[u8] {
&self.body
}
+ /// Get message tag
pub fn tag(&self) -> Option<&str> {
self.tag.as_deref()
}
+ /// Get message keys
pub fn keys(&self) -> &[String] {
&self.keys
}
+ /// Get message properties
pub fn properties(&self) -> &HashMap<String, String> {
&self.properties
}
+ /// Get message group of fifo message. [Read more](https://rocketmq.apache.org/docs/featureBehavior/03fifomessage)
pub fn message_group(&self) -> Option<&str> {
self.message_group.as_deref()
}
+ /// Get delivery timestamp of delay message. [Read more](https://rocketmq.apache.org/docs/featureBehavior/02delaymessage)
pub fn delivery_timestamp(&self) -> Option<i64> {
self.delivery_timestamp
}
+ /// Get born host of message
pub fn born_host(&self) -> &str {
&self.born_host
}
+ /// Get born timestamp of message
pub fn born_timestamp(&self) -> i64 {
self.born_timestamp
}
+ /// Get delivery attempt of message
pub fn delivery_attempt(&self) -> i32 {
self.delivery_attempt
}
@@ -320,10 +364,10 @@
fn common_test_message() {
let mut properties = HashMap::new();
properties.insert("key", "value".to_string());
- let message = MessageImpl::builder()
+ let message = MessageBuilder::builder()
.set_topic("test")
.set_body(vec![1, 2, 3])
- .set_tags("tag")
+ .set_tag("tag")
.set_keys(vec!["key"])
.set_properties(properties)
.build();
@@ -340,14 +384,14 @@
properties
});
- let message = MessageImpl::builder()
+ let message = MessageBuilder::builder()
.set_topic("test")
.set_body(vec![1, 2, 3])
.set_message_group("message_group")
.set_delivery_timestamp(123456789)
.build();
assert!(message.is_err());
- let err = message.unwrap_err();
+ let err = message.err().unwrap();
assert_eq!(err.kind, ErrorKind::InvalidMessage);
assert_eq!(
err.message,
@@ -355,14 +399,15 @@
);
let message =
- MessageImpl::fifo_message_builder("test", vec![1, 2, 3], "message_group").build();
+ MessageBuilder::fifo_message_builder("test", vec![1, 2, 3], "message_group").build();
let mut message = message.unwrap();
assert_eq!(
message.take_message_group(),
Some("message_group".to_string())
);
- let message = MessageImpl::delay_message_builder("test", vec![1, 2, 3], 123456789).build();
+ let message =
+ MessageBuilder::delay_message_builder("test", vec![1, 2, 3], 123456789).build();
let mut message = message.unwrap();
assert_eq!(message.take_delivery_timestamp(), Some(123456789));
}
diff --git a/rust/src/model/mod.rs b/rust/src/model/mod.rs
index aef3261..8ac95e2 100644
--- a/rust/src/model/mod.rs
+++ b/rust/src/model/mod.rs
@@ -14,6 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
+//! Data model of RocketMQ rust client.
+
pub mod common;
pub mod message;
pub(crate) mod message_id;
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index fc56567..697b70e 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -15,11 +15,6 @@
* limitations under the License.
*/
-//! Publish messages of various types to brokers.
-//!
-//! `Producer` is a thin wrapper of internal `Client` struct that shoulders the actual workloads.
-//! Most of its methods take shared reference so that application developers may use it at will.
-
use std::time::{SystemTime, UNIX_EPOCH};
use mockall_double::double;
@@ -39,9 +34,12 @@
};
use crate::{log, pb};
-/// `Producer` is the core struct, to which application developers should turn, when publishing messages to brokers.
+/// [`Producer`] is the core struct, to which application developers should turn, when publishing messages to RocketMQ proxy.
///
-/// `Producer` is `Send` and `Sync` by design, so that developers may get started easily.
+/// [`Producer`] is a thin wrapper of internal client struct that shoulders the actual workloads.
+/// Most of its methods take shared reference so that application developers may use it at will.
+///
+/// [`Producer`] is `Send` and `Sync` by design, so that developers may get started easily.
#[derive(Debug)]
pub struct Producer {
option: ProducerOption,
@@ -52,6 +50,12 @@
impl Producer {
const OPERATION_SEND_MESSAGE: &'static str = "producer.send_message";
+ /// Create a new producer instance
+ ///
+ /// # Arguments
+ ///
+ /// * `option` - producer option
+ /// * `client_option` - client option
pub fn new(option: ProducerOption, client_option: ClientOption) -> Result<Self, ClientError> {
let client_option = ClientOption {
client_type: ClientType::Producer,
@@ -68,6 +72,7 @@
})
}
+ /// Start the producer
pub async fn start(&self) -> Result<(), ClientError> {
if let Some(topics) = self.option.topics() {
for topic in topics {
@@ -172,6 +177,11 @@
Ok((topic, last_message_group.unwrap(), pb_messages))
}
+ /// Send a single message
+ ///
+ /// # Arguments
+ ///
+ /// * `message` - the message to send
pub async fn send_one(
&self,
message: impl message::Message,
@@ -180,6 +190,11 @@
Ok(results[0].clone())
}
+ /// Send a batch of messages
+ ///
+ /// # Arguments
+ ///
+ /// * `messages` - A vector that holds the messages to send
pub async fn send(
&self,
messages: Vec<impl message::Message>,
@@ -210,7 +225,7 @@
use crate::error::ErrorKind;
use crate::log::terminal_logger;
use crate::model::common::Route;
- use crate::model::message::MessageImpl;
+ use crate::model::message::{MessageBuilder, MessageImpl};
use crate::pb::{Broker, Code, MessageQueue, Status};
use std::sync::Arc;
@@ -226,6 +241,8 @@
#[tokio::test]
async fn producer_start() -> Result<(), ClientError> {
+ let _m = crate::client::tests::MTX.lock();
+
let ctx = Client::new_context();
ctx.expect().return_once(|_, _, _| {
let mut client = Client::default();
@@ -252,10 +269,10 @@
#[tokio::test]
async fn producer_transform_messages_to_protobuf() {
let producer = new_producer_for_test();
- let messages = vec![MessageImpl::builder()
+ let messages = vec![MessageBuilder::builder()
.set_topic("DefaultCluster")
.set_body("hello world".as_bytes().to_vec())
- .set_tags("tag")
+ .set_tag("tag")
.set_keys(vec!["key"])
.set_properties(vec![("key", "value")].into_iter().collect())
.set_message_group("message_group".to_string())
@@ -295,7 +312,7 @@
message_id: "".to_string(),
topic: "".to_string(),
body: None,
- tags: None,
+ tag: None,
keys: None,
properties: None,
message_group: None,
@@ -308,12 +325,12 @@
assert_eq!(err.message, "message topic is empty");
let messages = vec![
- MessageImpl::builder()
+ MessageBuilder::builder()
.set_topic("DefaultCluster")
.set_body("hello world".as_bytes().to_vec())
.build()
.unwrap(),
- MessageImpl::builder()
+ MessageBuilder::builder()
.set_topic("DefaultCluster_dup")
.set_body("hello world".as_bytes().to_vec())
.build()
@@ -326,13 +343,13 @@
assert_eq!(err.message, "Not all messages have the same topic.");
let messages = vec![
- MessageImpl::builder()
+ MessageBuilder::builder()
.set_topic("DefaultCluster")
.set_body("hello world".as_bytes().to_vec())
.set_message_group("message_group")
.build()
.unwrap(),
- MessageImpl::builder()
+ MessageBuilder::builder()
.set_topic("DefaultCluster")
.set_body("hello world".as_bytes().to_vec())
.set_message_group("message_group_dup")
@@ -384,7 +401,7 @@
});
producer
.send_one(
- MessageImpl::builder()
+ MessageBuilder::builder()
.set_topic("test_topic")
.set_body(vec![])
.build()
diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs
index d06c641..eefa93b 100644
--- a/rust/src/simple_consumer.rs
+++ b/rust/src/simple_consumer.rs
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
use std::time::Duration;
use mockall_double::double;
@@ -30,6 +31,15 @@
};
use crate::{log, pb};
+/// [`SimpleConsumer`] is a lightweight consumer to consume messages from RocketMQ proxy.
+///
+/// If you want to fully control the message consumption operation by yourself,
+/// the simple consumer should be your first consideration.
+///
+/// [`SimpleConsumer`] is a thin wrapper of internal client struct that shoulders the actual workloads.
+/// Most of its methods take shared reference so that application developers may use it at will.
+///
+/// [`SimpleConsumer`] is `Send` and `Sync` by design, so that developers may get started easily.
#[derive(Debug)]
pub struct SimpleConsumer {
option: SimpleConsumerOption,
@@ -41,6 +51,7 @@
const OPERATION_START_SIMPLE_CONSUMER: &'static str = "simple_consumer.start";
const OPERATION_RECEIVE_MESSAGE: &'static str = "simple_consumer.receive_message";
+ /// Create a new simple consumer instance
pub fn new(
option: SimpleConsumerOption,
client_option: ClientOption,
@@ -61,6 +72,7 @@
})
}
+ /// Start the simple consumer
pub async fn start(&self) -> Result<(), ClientError> {
if self.option.consumer_group().is_empty() {
return Err(ClientError::new(
@@ -83,6 +95,12 @@
Ok(())
}
+ /// receive messages from the specified topic
+ ///
+ /// # Arguments
+ ///
+ /// * `topic` - the topic for receiving messages
+ /// * `expression` - the subscription for the topic
pub async fn receive(
&self,
topic: impl AsRef<str>,
@@ -92,6 +110,14 @@
.await
}
+ /// receive messages from the specified topic with batch size and invisible duration
+ ///
+ /// # Arguments
+ ///
+ /// * `topic` - the topic for receiving messages
+ /// * `expression` - the subscription for the topic
+ /// * `batch_size` - max message num of server returned
+ /// * `invisible_duration` - set the invisible duration of messages that return from the server, these messages will not be visible to other consumers unless timeout
pub async fn receive_with_batch_size(
&self,
topic: &str,
@@ -122,6 +148,13 @@
.collect())
}
+ /// Ack the specified message
+ ///
+ /// It is important to acknowledge every consumed message, otherwise, they will be received again after the invisible duration
+ ///
+ /// # Arguments
+ ///
+ /// * `ack_entry` - special message view with handle want to ack
pub async fn ack(&self, ack_entry: impl AckMessageEntry + 'static) -> Result<(), ClientError> {
self.client.ack_message(ack_entry).await?;
Ok(())
@@ -142,6 +175,8 @@
#[tokio::test]
async fn simple_consumer_start() -> Result<(), ClientError> {
+ let _m = crate::client::tests::MTX.lock();
+
let ctx = Client::new_context();
ctx.expect().return_once(|_, _, _| {
let mut client = Client::default();