[ISSUE #563] Optimize example and doc for Rust SDK (#564)
* feat(rust): optimize for example and doc
Signed-off-by: SSpirits <admin@lv5.moe>
* feat(rust): optimize readme
Signed-off-by: SSpirits <admin@lv5.moe>
* fix(rust): fix unit test
Signed-off-by: SSpirits <admin@lv5.moe>
---------
Signed-off-by: SSpirits <admin@lv5.moe>
diff --git a/rust/README.md b/rust/README.md
index 8e5c1b7..dc80d13 100644
--- a/rust/README.md
+++ b/rust/README.md
@@ -42,5 +42,5 @@
[codecov-url]: https://app.codecov.io/gh/apache/rocketmq-clients
[crates-image]: https://img.shields.io/crates/v/rocketmq.svg
[crates-url]: https://crates.io/crates/rocketmq
-[rust-doc-image]: https://img.shields.io/crates/v/rocketmq.svg
+[rust-doc-image]: https://img.shields.io/docsrs/rocketmq
[rust-doc-url]: https://docs.rs/rocketmq
diff --git a/rust/examples/transaction_producer.rs b/rust/examples/transaction_producer.rs
index 07516eb..7423cc9 100644
--- a/rust/examples/transaction_producer.rs
+++ b/rust/examples/transaction_producer.rs
@@ -14,11 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+use std::collections::HashSet;
+use std::sync::Mutex;
+
use rocketmq::conf::{ClientOption, ProducerOption};
use rocketmq::model::message::MessageBuilder;
use rocketmq::model::transaction::{Transaction, TransactionResolution};
use rocketmq::Producer;
+lazy_static::lazy_static! {
+ static ref MESSAGE_ID_SET: Mutex<HashSet<String>> = Mutex::new(HashSet::new());
+}
+
#[tokio::main]
async fn main() {
// recommend to specify which topic(s) you would like to send message to
@@ -30,16 +37,28 @@
let mut client_option = ClientOption::default();
client_option.set_access_url("localhost:8081");
- // build and start producer
+ // build and start transaction producer, which has TransactionChecker
let mut producer = Producer::new_transaction_producer(
producer_option,
client_option,
Box::new(|transaction_id, message| {
- println!(
- "receive transaction check request: transaction_id: {}, message: {:?}",
- transaction_id, message
- );
- TransactionResolution::COMMIT
+ if MESSAGE_ID_SET
+ .lock()
+ .unwrap()
+ .contains(message.message_id())
+ {
+ println!(
+ "commit transaction: transaction_id: {}, message_id: {}",
+ transaction_id, message.message_id()
+ );
+ TransactionResolution::COMMIT
+ } else {
+ println!(
+ "rollback transaction due to unknown message: transaction_id: {}, message_id: {}",
+ transaction_id, message.message_id()
+ );
+ TransactionResolution::ROLLBACK
+ }
}),
)
.unwrap();
@@ -65,6 +84,14 @@
transaction.message_id(),
transaction.transaction_id()
);
+
+ MESSAGE_ID_SET
+ .lock()
+ .unwrap()
+ .insert(transaction.message_id().to_string());
+
+ // commit transaction manually
+ // delete following two lines so that RocketMQ server will check transaction status periodically
let result = transaction.commit().await;
debug_assert!(result.is_ok(), "commit transaction failed: {:?}", result);
}
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 27cfb4d..57bddb8 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -31,7 +31,7 @@
use crate::error::{ClientError, ErrorKind};
use crate::model::common::{ClientType, Endpoints, Route, RouteStatus, SendReceipt};
use crate::model::message::{AckMessageEntry, MessageView};
-use crate::model::transaction::TransactionChecker;
+use crate::model::transaction::{TransactionChecker, TransactionResolution};
use crate::pb;
use crate::pb::receive_message_response::Content;
use crate::pb::telemetry_command::Command::RecoverOrphanedTransactionCommand;
@@ -109,6 +109,10 @@
self.telemetry_command_tx.is_some()
}
+ pub(crate) fn has_transaction_checker(&self) -> bool {
+ self.transaction_checker.is_some()
+ }
+
pub(crate) fn set_transaction_checker(&mut self, transaction_checker: Box<TransactionChecker>) {
if self.is_started() {
panic!("client {} is started, can not be modified", self.id)
@@ -116,7 +120,7 @@
self.transaction_checker = Some(transaction_checker);
}
- pub(crate) async fn start(&mut self) {
+ pub(crate) async fn start(&mut self) -> Result<(), ClientError> {
let logger = self.logger.clone();
let session_manager = self.session_manager.clone();
@@ -127,9 +131,13 @@
// send heartbeat and handle telemetry command
let (tx, mut rx) = mpsc::channel(16);
self.telemetry_command_tx = Some(tx);
- let rpc_client = self.get_session().await.unwrap();
+ let rpc_client = self.get_session().await?;
let endpoints = self.access_endpoints.clone();
let transaction_checker = self.transaction_checker.take();
+ // give a placeholder
+ if transaction_checker.is_some() {
+ self.transaction_checker = Some(Box::new(|_, _| TransactionResolution::UNKNOWN));
+ }
tokio::spawn(async move {
rpc_client.is_started();
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
@@ -181,6 +189,7 @@
}
}
});
+ Ok(())
}
async fn handle_telemetry_command<T: RPCClient + 'static>(
@@ -690,7 +699,7 @@
.returning(|_, _, _| Ok(Session::mock()));
let mut client = new_client_with_session_manager(session_manager);
- client.start().await;
+ client.start().await?;
// TODO use countdown latch instead sleeping
// wait for run
diff --git a/rust/src/model/transaction.rs b/rust/src/model/transaction.rs
index 62a4c9a..2f74679 100644
--- a/rust/src/model/transaction.rs
+++ b/rust/src/model/transaction.rs
@@ -125,12 +125,18 @@
COMMIT = 1,
/// Notify server that current transaction should be roll-backed.
ROLLBACK = 2,
- /// Notify the server that the state of this transaction is not sure. You should be cautious before return unknown
+ /// Notify server that the state of this transaction is not sure. You should be cautious before return unknown
/// because the examination from the server will be performed periodically.
UNKNOWN = 0,
}
/// A closure to check the state of transaction.
+/// RocketMQ Server will call producer periodically to check the state of uncommitted transaction.
+///
+/// # Arguments
+///
+/// * transaction id
+/// * message
pub type TransactionChecker = dyn Fn(String, MessageView) -> TransactionResolution + Send + Sync;
#[cfg(test)]
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index fd82cdc..d5e768c 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -50,6 +50,7 @@
impl Producer {
const OPERATION_SEND_MESSAGE: &'static str = "producer.send_message";
+ const OPERATION_SEND_TRANSACTION_MESSAGE: &'static str = "producer.send_transaction_message";
/// Create a new producer instance
///
@@ -79,7 +80,7 @@
///
/// * `option` - producer option
/// * `client_option` - client option
- /// * `transaction_checker` - A closure to check the state of transaction.
+ /// * `transaction_checker` - handle server query for uncommitted transaction status
pub fn new_transaction_producer(
option: ProducerOption,
client_option: ClientOption,
@@ -103,7 +104,7 @@
/// Start the producer
pub async fn start(&mut self) -> Result<(), ClientError> {
- self.client.start().await;
+ self.client.start().await?;
if let Some(topics) = self.option.topics() {
for topic in topics {
self.client.topic_route(topic, true).await?;
@@ -265,6 +266,13 @@
&self,
mut message: impl message::Message,
) -> Result<impl Transaction, ClientError> {
+ if !self.client.has_transaction_checker() {
+ return Err(ClientError::new(
+ ErrorKind::InvalidMessage,
+ "this producer can not send transaction message, please create a transaction producer using producer::new_transaction_producer",
+ Self::OPERATION_SEND_TRANSACTION_MESSAGE,
+ ));
+ }
let topic = message.take_topic();
let receipt = self.send(message).await?;
Ok(TransactionImpl::new(
@@ -313,7 +321,7 @@
queue: vec![],
}))
});
- client.expect_start().returning(|| ());
+ client.expect_start().returning(|| Ok(()));
client
.expect_client_id()
.return_const("fake_id".to_string());
@@ -340,7 +348,7 @@
queue: vec![],
}))
});
- client.expect_start().returning(|| ());
+ client.expect_start().returning(|| Ok(()));
client.expect_set_transaction_checker().returning(|_| ());
client
.expect_client_id()
@@ -543,6 +551,10 @@
.client
.expect_get_session()
.return_once(|| Ok(Session::mock()));
+ producer
+ .client
+ .expect_has_transaction_checker()
+ .return_once(|| true);
let _ = producer
.send_transaction_message(
diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs
index 48e0ce9..d4e222e 100644
--- a/rust/src/simple_consumer.rs
+++ b/rust/src/simple_consumer.rs
@@ -90,7 +90,7 @@
Self::OPERATION_START_SIMPLE_CONSUMER,
));
}
- self.client.start().await;
+ self.client.start().await?;
if let Some(topics) = self.option.topics() {
for topic in topics {
self.client.topic_route(topic, true).await?;
@@ -198,7 +198,7 @@
queue: vec![],
}))
});
- client.expect_start().returning(|| ());
+ client.expect_start().returning(|| Ok(()));
client
.expect_client_id()
.return_const("fake_id".to_string());