[ISSUE #555] Support transaction message for Rust SDK (#556)
* feat(rust): implement transaction checker
Signed-off-by: SSpirits <admin@lv5.moe>
* feat(rust): implement transaction producer
Signed-off-by: SSpirits <admin@lv5.moe>
* feat(rust): update readme
Signed-off-by: SSpirits <admin@lv5.moe>
* feat(rust): commit transaction in example
Signed-off-by: SSpirits <admin@lv5.moe>
---------
Signed-off-by: SSpirits <admin@lv5.moe>
diff --git a/README-CN.md b/README-CN.md
index a4716a6..266a2f6 100644
--- a/README-CN.md
+++ b/README-CN.md
@@ -22,7 +22,7 @@
| Producer with standard messages | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | 🚧 | 🚧 |
| Producer with FIFO messages | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | 🚧 | 🚧 |
| Producer with timed/delay messages | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | 🚧 | 🚧 |
-| Producer with transactional messages | ✅ | ✅ | ✅ | ✅ | 🚧 | 🚧 | 🚧 | 🚧 |
+| Producer with transactional messages | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | 🚧 | 🚧 |
| Simple consumer | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | 🚧 | 🚧 |
| Push consumer with concurrent message listener | ✅ | ✅ | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
| Push consumer with FIFO message listener | ✅ | ✅ | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
diff --git a/README.md b/README.md
index 13bd8bd..916e688 100644
--- a/README.md
+++ b/README.md
@@ -22,7 +22,7 @@
| Producer with standard messages | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | 🚧 | 🚧 |
| Producer with FIFO messages | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | 🚧 | 🚧 |
| Producer with timed/delay messages | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | 🚧 | 🚧 |
-| Producer with transactional messages | ✅ | ✅ | ✅ | ✅ | 🚧 | 🚧 | 🚧 | 🚧 |
+| Producer with transactional messages | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | 🚧 | 🚧 |
| Simple consumer | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | 🚧 | 🚧 |
| Push consumer with concurrent message listener | ✅ | ✅ | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
| Push consumer with FIFO message listener | ✅ | ✅ | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
diff --git a/rust/.cargo/Cargo.lock.min b/rust/.cargo/Cargo.lock.min
index 3b2ccb8..cb6ecc2 100644
--- a/rust/.cargo/Cargo.lock.min
+++ b/rust/.cargo/Cargo.lock.min
@@ -1266,7 +1266,7 @@
[[package]]
name = "rocketmq"
-version = "0.1.1"
+version = "0.1.2"
dependencies = [
"anyhow",
"async-trait",
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index d7655d1..911b9ed 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -16,7 +16,7 @@
#
[package]
name = "rocketmq"
-version = "0.1.1"
+version = "0.1.2"
edition = "2021"
rust-version = "1.61"
authors = [
diff --git a/rust/examples/delay_producer.rs b/rust/examples/delay_producer.rs
new file mode 100644
index 0000000..3e237c3
--- /dev/null
+++ b/rust/examples/delay_producer.rs
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::ops::Add;
+use std::time::{Duration, SystemTime};
+
+use rocketmq::conf::{ClientOption, ProducerOption};
+use rocketmq::model::message::MessageBuilder;
+use rocketmq::Producer;
+
+#[tokio::main]
+async fn main() {
+ // recommend to specify which topic(s) you would like to send message to
+ // producer will prefetch topic route when start and failed fast if topic not exist
+ let mut producer_option = ProducerOption::default();
+ producer_option.set_topics(vec!["delay_test"]);
+
+ // set which rocketmq proxy to connect
+ let mut client_option = ClientOption::default();
+ client_option.set_access_url("localhost:8081");
+
+ // build and start producer
+ let mut producer = Producer::new(producer_option, client_option).unwrap();
+ producer.start().await.unwrap();
+
+ // build message
+ let message = MessageBuilder::delay_message_builder(
+ "delay_test",
+ "hello world".as_bytes().to_vec(),
+ // deliver in 15 seconds
+ SystemTime::now()
+ .add(Duration::from_secs(15))
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .unwrap()
+ .as_secs() as i64,
+ )
+ .build()
+ .unwrap();
+
+ // send message to rocketmq proxy
+ let result = producer.send(message).await;
+ debug_assert!(result.is_ok(), "send message failed: {:?}", result);
+ println!(
+ "send message success, message_id={}",
+ result.unwrap().message_id()
+ );
+}
diff --git a/rust/examples/fifo_producer.rs b/rust/examples/fifo_producer.rs
new file mode 100644
index 0000000..38562e2
--- /dev/null
+++ b/rust/examples/fifo_producer.rs
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use rocketmq::conf::{ClientOption, ProducerOption};
+use rocketmq::model::message::MessageBuilder;
+use rocketmq::Producer;
+
+#[tokio::main]
+async fn main() {
+ // recommend to specify which topic(s) you would like to send message to
+ // producer will prefetch topic route when start and failed fast if topic not exist
+ let mut producer_option = ProducerOption::default();
+ producer_option.set_topics(vec!["fifo_test"]);
+
+ // set which rocketmq proxy to connect
+ let mut client_option = ClientOption::default();
+ client_option.set_access_url("localhost:8081");
+
+ // build and start producer
+ let mut producer = Producer::new(producer_option, client_option).unwrap();
+ producer.start().await.unwrap();
+
+ // build message
+ let message = MessageBuilder::fifo_message_builder(
+ "fifo_test",
+ "hello world".as_bytes().to_vec(),
+ // message partition
+ "message_group",
+ )
+ .build()
+ .unwrap();
+
+ // send message to rocketmq proxy
+ let result = producer.send(message).await;
+ debug_assert!(result.is_ok(), "send message failed: {:?}", result);
+ println!(
+ "send message success, message_id={}",
+ result.unwrap().message_id()
+ );
+}
diff --git a/rust/examples/producer.rs b/rust/examples/producer.rs
index acacabb..335b31f 100644
--- a/rust/examples/producer.rs
+++ b/rust/examples/producer.rs
@@ -30,7 +30,7 @@
client_option.set_access_url("localhost:8081");
// build and start producer
- let producer = Producer::new(producer_option, client_option).unwrap();
+ let mut producer = Producer::new(producer_option, client_option).unwrap();
producer.start().await.unwrap();
// build message
@@ -42,7 +42,7 @@
.unwrap();
// send message to rocketmq proxy
- let result = producer.send_one(message).await;
+ let result = producer.send(message).await;
debug_assert!(result.is_ok(), "send message failed: {:?}", result);
println!(
"send message success, message_id={}",
diff --git a/rust/examples/simple_consumer.rs b/rust/examples/simple_consumer.rs
index 33aae9d..a5f9408 100644
--- a/rust/examples/simple_consumer.rs
+++ b/rust/examples/simple_consumer.rs
@@ -32,7 +32,7 @@
client_option.set_enable_tls(false);
// build and start simple consumer
- let consumer = SimpleConsumer::new(consumer_option, client_option).unwrap();
+ let mut consumer = SimpleConsumer::new(consumer_option, client_option).unwrap();
consumer.start().await.unwrap();
loop {
diff --git a/rust/examples/transaction_producer.rs b/rust/examples/transaction_producer.rs
new file mode 100644
index 0000000..07516eb
--- /dev/null
+++ b/rust/examples/transaction_producer.rs
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use rocketmq::conf::{ClientOption, ProducerOption};
+use rocketmq::model::message::MessageBuilder;
+use rocketmq::model::transaction::{Transaction, TransactionResolution};
+use rocketmq::Producer;
+
+#[tokio::main]
+async fn main() {
+ // recommend to specify which topic(s) you would like to send message to
+ // producer will prefetch topic route when start and failed fast if topic not exist
+ let mut producer_option = ProducerOption::default();
+ producer_option.set_topics(vec!["transaction_test"]);
+
+ // set which rocketmq proxy to connect
+ let mut client_option = ClientOption::default();
+ client_option.set_access_url("localhost:8081");
+
+ // build and start producer
+ let mut producer = Producer::new_transaction_producer(
+ producer_option,
+ client_option,
+ Box::new(|transaction_id, message| {
+ println!(
+ "receive transaction check request: transaction_id: {}, message: {:?}",
+ transaction_id, message
+ );
+ TransactionResolution::COMMIT
+ }),
+ )
+ .unwrap();
+ producer.start().await.unwrap();
+
+ // build message
+ let message = MessageBuilder::transaction_message_builder(
+ "transaction_test",
+ "hello world".as_bytes().to_vec(),
+ )
+ .build()
+ .unwrap();
+
+ // send message to rocketmq proxy
+ let result = producer.send_transaction_message(message).await;
+ if let Err(error) = result {
+ eprintln!("send message failed: {:?}", error);
+ return;
+ }
+ let transaction = result.unwrap();
+ println!(
+ "send message success, message_id={}, transaction_id={}",
+ transaction.message_id(),
+ transaction.transaction_id()
+ );
+ let result = transaction.commit().await;
+ debug_assert!(result.is_ok(), "commit transaction failed: {:?}", result);
+}
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 4b601fa..27cfb4d 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -15,6 +15,7 @@
* limitations under the License.
*/
use std::clone::Clone;
+use std::fmt::Debug;
use std::string::ToString;
use std::{collections::HashMap, sync::atomic::AtomicUsize, sync::Arc};
@@ -24,24 +25,26 @@
use prost_types::Duration;
use slog::{debug, error, info, o, warn, Logger};
use tokio::select;
-use tokio::sync::oneshot;
+use tokio::sync::{mpsc, oneshot};
use crate::conf::ClientOption;
use crate::error::{ClientError, ErrorKind};
use crate::model::common::{ClientType, Endpoints, Route, RouteStatus, SendReceipt};
-use crate::model::message::AckMessageEntry;
+use crate::model::message::{AckMessageEntry, MessageView};
+use crate::model::transaction::TransactionChecker;
use crate::pb;
use crate::pb::receive_message_response::Content;
+use crate::pb::telemetry_command::Command::RecoverOrphanedTransactionCommand;
use crate::pb::{
- AckMessageRequest, AckMessageResultEntry, Code, FilterExpression, HeartbeatRequest,
- HeartbeatResponse, Message, MessageQueue, QueryRouteRequest, ReceiveMessageRequest, Resource,
- SendMessageRequest, Status, TelemetryCommand,
+ AckMessageRequest, AckMessageResultEntry, Code, EndTransactionRequest, FilterExpression,
+ HeartbeatRequest, HeartbeatResponse, Message, MessageQueue, QueryRouteRequest,
+ ReceiveMessageRequest, Resource, SendMessageRequest, Status, TelemetryCommand,
+ TransactionSource,
};
#[double]
use crate::session::SessionManager;
use crate::session::{RPCClient, Session};
-#[derive(Debug)]
pub(crate) struct Client {
logger: Logger,
option: ClientOption,
@@ -50,6 +53,8 @@
id: String,
access_endpoints: Endpoints,
settings: TelemetryCommand,
+ transaction_checker: Option<Box<TransactionChecker>>,
+ telemetry_command_tx: Option<mpsc::Sender<pb::telemetry_command::Command>>,
}
lazy_static::lazy_static! {
@@ -57,11 +62,24 @@
}
const OPERATION_CLIENT_NEW: &str = "client.new";
+const OPERATION_GET_SESSION: &str = "client.get_session";
const OPERATION_QUERY_ROUTE: &str = "client.query_route";
const OPERATION_HEARTBEAT: &str = "client.heartbeat";
const OPERATION_SEND_MESSAGE: &str = "client.send_message";
const OPERATION_RECEIVE_MESSAGE: &str = "client.receive_message";
const OPERATION_ACK_MESSAGE: &str = "client.ack_message";
+const OPERATION_END_TRANSACTION: &str = "client.end_transaction";
+const OPERATION_HANDLE_TELEMETRY_COMMAND: &str = "client.handle_telemetry_command";
+
+impl Debug for Client {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("Client")
+ .field("id", &self.id)
+ .field("access_endpoints", &self.access_endpoints)
+ .field("option", &self.option)
+ .finish()
+ }
+}
#[automock]
impl Client {
@@ -82,10 +100,23 @@
id,
access_endpoints: endpoints,
settings,
+ transaction_checker: None,
+ telemetry_command_tx: None,
})
}
- pub(crate) fn start(&self) {
+ pub(crate) fn is_started(&self) -> bool {
+ self.telemetry_command_tx.is_some()
+ }
+
+ pub(crate) fn set_transaction_checker(&mut self, transaction_checker: Box<TransactionChecker>) {
+ if self.is_started() {
+ panic!("client {} is started, can not be modified", self.id)
+ }
+ self.transaction_checker = Some(transaction_checker);
+ }
+
+ pub(crate) async fn start(&mut self) {
let logger = self.logger.clone();
let session_manager = self.session_manager.clone();
@@ -93,7 +124,14 @@
let namespace = self.option.namespace.to_string();
let client_type = self.option.client_type.clone();
+ // send heartbeat and handle telemetry command
+ let (tx, mut rx) = mpsc::channel(16);
+ self.telemetry_command_tx = Some(tx);
+ let rpc_client = self.get_session().await.unwrap();
+ let endpoints = self.access_endpoints.clone();
+ let transaction_checker = self.transaction_checker.take();
tokio::spawn(async move {
+ rpc_client.is_started();
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
loop {
select! {
@@ -132,11 +170,70 @@
debug!(logger,"send heartbeat to server success, peer={}",peer);
}
},
+ command = rx.recv() => {
+ if let Some(command) = command {
+ let result = Self::handle_telemetry_command(rpc_client.clone(), &transaction_checker, endpoints.clone(), command).await;
+ if let Err(error) = result {
+ error!(logger, "handle telemetry command failed: {:?}", error)
+ }
+ }
+ },
}
}
});
}
+ async fn handle_telemetry_command<T: RPCClient + 'static>(
+ mut rpc_client: T,
+ transaction_checker: &Option<Box<TransactionChecker>>,
+ endpoints: Endpoints,
+ command: pb::telemetry_command::Command,
+ ) -> Result<(), ClientError> {
+ return match command {
+ RecoverOrphanedTransactionCommand(command) => {
+ let transaction_id = command.transaction_id;
+ let message = command.message.unwrap();
+ let message_id = message
+ .system_properties
+ .as_ref()
+ .unwrap()
+ .message_id
+ .clone();
+ let topic = message.topic.as_ref().unwrap().clone();
+ if let Some(transaction_checker) = transaction_checker {
+ let resolution = transaction_checker(
+ transaction_id.clone(),
+ MessageView::from_pb_message(message, endpoints),
+ );
+
+ let response = rpc_client
+ .end_transaction(EndTransactionRequest {
+ topic: Some(topic),
+ message_id: message_id.to_string(),
+ transaction_id,
+ resolution: resolution as i32,
+ source: TransactionSource::SourceServerCheck as i32,
+ trace_context: "".to_string(),
+ })
+ .await?;
+ Self::handle_response_status(response.status, OPERATION_END_TRANSACTION)
+ } else {
+ Err(ClientError::new(
+ ErrorKind::Config,
+ "failed to get transaction checker",
+ OPERATION_END_TRANSACTION,
+ ))
+ }
+ }
+ _ => Err(ClientError::new(
+ ErrorKind::Config,
+ "receive telemetry command but there is no handler",
+ OPERATION_HANDLE_TELEMETRY_COMMAND,
+ )
+ .with_context("command", format!("{:?}", command))),
+ };
+ }
+
#[allow(dead_code)]
pub(crate) fn client_id(&self) -> &str {
&self.id
@@ -161,10 +258,22 @@
)
}
- async fn get_session(&self) -> Result<Session, ClientError> {
+ pub(crate) async fn get_session(&self) -> Result<Session, ClientError> {
+ if !self.is_started() {
+ return Err(ClientError::new(
+ ErrorKind::ClientIsNotRunning,
+ "client is not started",
+ OPERATION_GET_SESSION,
+ )
+ .with_context("client_id", self.id.clone()));
+ }
let session = self
.session_manager
- .get_or_create_session(&self.access_endpoints, self.settings.clone())
+ .get_or_create_session(
+ &self.access_endpoints,
+ self.settings.clone(),
+ self.telemetry_command_tx.clone().unwrap(),
+ )
.await?;
Ok(session)
}
@@ -175,12 +284,16 @@
) -> Result<Session, ClientError> {
let session = self
.session_manager
- .get_or_create_session(endpoints, self.settings.clone())
+ .get_or_create_session(
+ endpoints,
+ self.settings.clone(),
+ self.telemetry_command_tx.clone().unwrap(),
+ )
.await?;
Ok(session)
}
- fn handle_response_status(
+ pub(crate) fn handle_response_status(
status: Option<Status>,
operation: &'static str,
) -> Result<(), ClientError> {
@@ -489,22 +602,24 @@
#[cfg(test)]
pub(crate) mod tests {
- use lazy_static::lazy_static;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
+ use lazy_static::lazy_static;
+
use crate::client::Client;
use crate::conf::ClientOption;
use crate::error::{ClientError, ErrorKind};
use crate::log::terminal_logger;
use crate::model::common::{ClientType, Route};
+ use crate::model::transaction::TransactionResolution;
use crate::pb::receive_message_response::Content;
use crate::pb::{
- AckMessageEntry, AckMessageResponse, Code, FilterExpression, HeartbeatResponse, Message,
- MessageQueue, QueryRouteResponse, ReceiveMessageResponse, Resource, SendMessageResponse,
- Status, TelemetryCommand,
+ AckMessageEntry, AckMessageResponse, Code, EndTransactionResponse, FilterExpression,
+ HeartbeatResponse, Message, MessageQueue, QueryRouteResponse, ReceiveMessageResponse,
+ Resource, SendMessageResponse, Status, SystemProperties, TelemetryCommand,
};
use crate::session;
@@ -524,10 +639,13 @@
id: Client::generate_client_id(),
access_endpoints: Endpoints::from_url("http://localhost:8081").unwrap(),
settings: TelemetryCommand::default(),
+ transaction_checker: None,
+ telemetry_command_tx: None,
}
}
fn new_client_with_session_manager(session_manager: SessionManager) -> Client {
+ let (tx, _) = mpsc::channel(16);
Client {
logger: terminal_logger(),
option: ClientOption::default(),
@@ -536,6 +654,8 @@
id: Client::generate_client_id(),
access_endpoints: Endpoints::from_url("http://localhost:8081").unwrap(),
settings: TelemetryCommand::default(),
+ transaction_checker: None,
+ telemetry_command_tx: Some(tx),
}
}
@@ -565,9 +685,12 @@
session_manager
.expect_get_all_sessions()
.returning(|| Ok(vec![]));
+ session_manager
+ .expect_get_or_create_session()
+ .returning(|_, _, _| Ok(Session::mock()));
- let client = new_client_with_session_manager(session_manager);
- client.start();
+ let mut client = new_client_with_session_manager(session_manager);
+ client.start().await;
// TODO use countdown latch instead sleeping
// wait for run
@@ -580,7 +703,7 @@
let mut session_manager = SessionManager::default();
session_manager
.expect_get_or_create_session()
- .returning(|_, _| Ok(Session::mock()));
+ .returning(|_, _, _| Ok(Session::mock()));
let client = new_client_with_session_manager(session_manager);
let result = client.get_session().await;
@@ -887,4 +1010,33 @@
assert_eq!(error.message, "server return an error");
assert_eq!(error.operation, "client.ack_message");
}
+
+ #[tokio::test]
+ async fn client_handle_telemetry_command() {
+ let response = Ok(EndTransactionResponse {
+ status: Some(Status {
+ code: Code::Ok as i32,
+ message: "".to_string(),
+ }),
+ });
+ let mut mock = session::MockRPCClient::new();
+ mock.expect_end_transaction()
+ .return_once(|_| Box::pin(futures::future::ready(response)));
+ let result = Client::handle_telemetry_command(
+ mock,
+ &Some(Box::new(|_, _| TransactionResolution::COMMIT)),
+ Endpoints::from_url("localhopst:8081").unwrap(),
+ RecoverOrphanedTransactionCommand(pb::RecoverOrphanedTransactionCommand {
+ message: Some(Message {
+ topic: Some(Resource::default()),
+ user_properties: Default::default(),
+ system_properties: Some(SystemProperties::default()),
+ body: vec![],
+ }),
+ transaction_id: "".to_string(),
+ }),
+ )
+ .await;
+ assert!(result.is_ok())
+ }
}
diff --git a/rust/src/conf.rs b/rust/src/conf.rs
index 95b7adc..ea27d27 100644
--- a/rust/src/conf.rs
+++ b/rust/src/conf.rs
@@ -46,7 +46,7 @@
group: "".to_string(),
namespace: "".to_string(),
access_url: "localhost:8081".to_string(),
- enable_tls: true,
+ enable_tls: false,
timeout: Duration::from_secs(3),
long_polling_timeout: Duration::from_secs(40),
access_key: None,
@@ -266,7 +266,7 @@
fn conf_client_option() {
let option = ClientOption::default();
assert_eq!(option.access_url(), "localhost:8081");
- assert!(option.enable_tls());
+ assert!(!option.enable_tls());
assert_eq!(option.timeout(), &Duration::from_secs(3));
assert_eq!(option.long_polling_timeout(), &Duration::from_secs(40));
}
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index bd646e7..875b359 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -48,7 +48,7 @@
//! client_option.set_access_url("localhost:8081");
//!
//! // build and start producer
-//! let producer = Producer::new(producer_option, client_option).unwrap();
+//! let mut producer = Producer::new(producer_option, client_option).unwrap();
//! producer.start().await.unwrap();
//!
//! // build message
@@ -60,7 +60,7 @@
//! .unwrap();
//!
//! // send message to rocketmq proxy
-//! let result = producer.send_one(message).await;
+//! let result = producer.send(message).await;
//! debug_assert!(result.is_ok(), "send message failed: {:?}", result);
//! println!(
//! "send message success, message_id={}",
@@ -88,7 +88,7 @@
//! client_option.set_access_url("localhost:8081");
//!
//! // build and start simple consumer
-//! let consumer = SimpleConsumer::new(consumer_option, client_option).unwrap();
+//! let mut consumer = SimpleConsumer::new(consumer_option, client_option).unwrap();
//! consumer.start().await.unwrap();
//!
//! // pop message from rocketmq proxy
@@ -119,6 +119,10 @@
//! ```
//!
+// Export structs that are part of crate API.
+pub use producer::Producer;
+pub use simple_consumer::SimpleConsumer;
+
#[allow(dead_code)]
pub mod conf;
pub mod error;
@@ -138,7 +142,3 @@
mod producer;
mod simple_consumer;
-
-// Export structs that are part of crate API.
-pub use producer::Producer;
-pub use simple_consumer::SimpleConsumer;
diff --git a/rust/src/model/message.rs b/rust/src/model/message.rs
index 79a0250..a2e9405 100644
--- a/rust/src/model/message.rs
+++ b/rust/src/model/message.rs
@@ -17,11 +17,12 @@
//! Message data model of RocketMQ rust client.
+use std::collections::HashMap;
+
use crate::error::{ClientError, ErrorKind};
use crate::model::common::Endpoints;
use crate::model::message_id::UNIQ_ID_GENERATOR;
use crate::pb;
-use std::collections::HashMap;
/// [`Message`] is the data model for sending.
pub trait Message {
@@ -33,6 +34,7 @@
fn take_properties(&mut self) -> HashMap<String, String>;
fn take_message_group(&mut self) -> Option<String>;
fn take_delivery_timestamp(&mut self) -> Option<i64>;
+ fn transaction_enabled(&mut self) -> bool;
}
pub(crate) struct MessageImpl {
@@ -44,6 +46,7 @@
pub(crate) properties: Option<HashMap<String, String>>,
pub(crate) message_group: Option<String>,
pub(crate) delivery_timestamp: Option<i64>,
+ pub(crate) transaction_enabled: bool,
}
impl Message for MessageImpl {
@@ -78,6 +81,10 @@
fn take_delivery_timestamp(&mut self) -> Option<i64> {
self.delivery_timestamp.take()
}
+
+ fn transaction_enabled(&mut self) -> bool {
+ self.transaction_enabled
+ }
}
/// [`MessageBuilder`] is the builder for [`Message`].
@@ -100,6 +107,7 @@
properties: None,
message_group: None,
delivery_timestamp: None,
+ transaction_enabled: false,
},
}
}
@@ -126,6 +134,7 @@
properties: None,
message_group: Some(message_group.into()),
delivery_timestamp: None,
+ transaction_enabled: false,
},
}
}
@@ -152,6 +161,29 @@
properties: None,
message_group: None,
delivery_timestamp: Some(delay_time),
+ transaction_enabled: false,
+ },
+ }
+ }
+
+ /// Create a new [`MessageBuilder`] for building a transaction message. [Read more](https://rocketmq.apache.org/docs/featureBehavior/04transactionmessage)
+ ///
+ /// # Arguments
+ ///
+ /// * `topic` - topic of the message
+ /// * `body` - message body
+ pub fn transaction_message_builder(topic: impl Into<String>, body: Vec<u8>) -> MessageBuilder {
+ MessageBuilder {
+ message: MessageImpl {
+ message_id: UNIQ_ID_GENERATOR.lock().next_id(),
+ topic: topic.into(),
+ body: Some(body),
+ tag: None,
+ keys: None,
+ properties: None,
+ message_group: None,
+ delivery_timestamp: None,
+ transaction_enabled: true,
},
}
}
@@ -210,6 +242,14 @@
self
}
+ /// Mark this message as the beginning transaction, which is required for the transaction message. [Read more](https://rocketmq.apache.org/docs/featureBehavior/04transactionmessage)
+ ///
+ /// The transaction message could not have message group and delivery timestamp
+ pub fn enable_transaction(mut self) -> Self {
+ self.message.transaction_enabled = true;
+ self
+ }
+
fn check_message(&self) -> Result<(), String> {
if self.message.topic.is_empty() {
return Err("Topic is empty.".to_string());
@@ -222,6 +262,14 @@
"message_group and delivery_timestamp can not be set at the same time.".to_string(),
);
}
+ if self.message.transaction_enabled
+ && (self.message.message_group.is_some() || self.message.delivery_timestamp.is_some())
+ {
+ return Err(
+ "message_group and delivery_timestamp can not be set for transaction message."
+ .to_string(),
+ );
+ }
Ok(())
}
@@ -249,6 +297,7 @@
pub struct MessageView {
pub(crate) message_id: String,
pub(crate) receipt_handle: Option<String>,
+ pub(crate) namespace: String,
pub(crate) topic: String,
pub(crate) body: Vec<u8>,
pub(crate) tag: Option<String>,
@@ -283,10 +332,12 @@
impl MessageView {
pub(crate) fn from_pb_message(message: pb::Message, endpoints: Endpoints) -> Self {
let system_properties = message.system_properties.unwrap();
+ let topic = message.topic.unwrap();
MessageView {
message_id: system_properties.message_id,
receipt_handle: system_properties.receipt_handle,
- topic: message.topic.unwrap().name,
+ namespace: topic.resource_namespace,
+ topic: topic.name,
body: message.body,
tag: system_properties.tag,
keys: system_properties.keys,
@@ -305,7 +356,12 @@
&self.message_id
}
- /// Get topic of message
+ /// Get topic namespace of message
+ pub fn namespace(&self) -> &str {
+ &self.namespace
+ }
+
+ /// Get topic name of message
pub fn topic(&self) -> &str {
&self.topic
}
@@ -410,6 +466,10 @@
MessageBuilder::delay_message_builder("test", vec![1, 2, 3], 123456789).build();
let mut message = message.unwrap();
assert_eq!(message.take_delivery_timestamp(), Some(123456789));
+
+ let message = MessageBuilder::transaction_message_builder("test", vec![1, 2, 3]).build();
+ let mut message = message.unwrap();
+ assert!(message.transaction_enabled());
}
#[test]
diff --git a/rust/src/model/mod.rs b/rust/src/model/mod.rs
index 8ac95e2..ca9a13c 100644
--- a/rust/src/model/mod.rs
+++ b/rust/src/model/mod.rs
@@ -20,3 +20,4 @@
pub mod common;
pub mod message;
pub(crate) mod message_id;
+pub mod transaction;
diff --git a/rust/src/model/transaction.rs b/rust/src/model/transaction.rs
new file mode 100644
index 0000000..62a4c9a
--- /dev/null
+++ b/rust/src/model/transaction.rs
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//! Transaction data model of RocketMQ rust client.
+
+use std::fmt::{Debug, Formatter};
+
+use async_trait::async_trait;
+
+use crate::client::Client;
+use crate::error::ClientError;
+use crate::model::common::SendReceipt;
+use crate::model::message::MessageView;
+use crate::pb::{EndTransactionRequest, Resource, TransactionSource};
+use crate::session::RPCClient;
+
+/// An entity to describe an independent transaction.
+///
+/// Once the request of commit of roll-back reached server, subsequently arrived commit or roll-back request in
+/// [`Transaction`] would be ignored by the server.
+///
+/// If transaction is not commit/roll-back in time, it is suspended until it is solved by [`TransactionChecker`]
+/// or reach the end of life.
+#[async_trait]
+pub trait Transaction {
+ /// Try to commit the transaction, which would expose the message before the transaction is closed if no exception thrown.
+ /// What you should pay more attention to is that the commitment may be successful even exception is thrown.
+ async fn commit(self) -> Result<(), ClientError>;
+
+ /// Try to roll back the transaction, which would expose the message before the transaction is closed if no exception thrown.
+ /// What you should pay more attention to is that the roll-back may be successful even exception is thrown.
+ async fn rollback(self) -> Result<(), ClientError>;
+
+ /// Get message id
+ fn message_id(&self) -> &str;
+
+ /// Get transaction id
+ fn transaction_id(&self) -> &str;
+}
+
+pub(crate) struct TransactionImpl {
+ rpc_client: Box<dyn RPCClient + Send + Sync>,
+ topic: Resource,
+ send_receipt: SendReceipt,
+}
+
+impl Debug for TransactionImpl {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("TransactionImpl")
+ .field("transaction_id", &self.send_receipt.transaction_id())
+ .field("message_id", &self.send_receipt.message_id())
+ .finish()
+ }
+}
+
+impl TransactionImpl {
+ pub(crate) fn new(
+ rpc_client: Box<dyn RPCClient + Send + Sync>,
+ topic: Resource,
+ send_receipt: SendReceipt,
+ ) -> TransactionImpl {
+ TransactionImpl {
+ rpc_client,
+ topic,
+ send_receipt,
+ }
+ }
+
+ async fn end_transaction(
+ mut self,
+ resolution: TransactionResolution,
+ ) -> Result<(), ClientError> {
+ let response = self
+ .rpc_client
+ .end_transaction(EndTransactionRequest {
+ topic: Some(self.topic),
+ message_id: self.send_receipt.message_id().to_string(),
+ transaction_id: self.send_receipt.transaction_id().to_string(),
+ resolution: resolution as i32,
+ source: TransactionSource::SourceClient as i32,
+ trace_context: "".to_string(),
+ })
+ .await?;
+ Client::handle_response_status(response.status, "end transaction")
+ }
+}
+
+#[async_trait]
+impl Transaction for TransactionImpl {
+ async fn commit(mut self) -> Result<(), ClientError> {
+ return self.end_transaction(TransactionResolution::COMMIT).await;
+ }
+
+ async fn rollback(mut self) -> Result<(), ClientError> {
+ return self.end_transaction(TransactionResolution::ROLLBACK).await;
+ }
+
+ fn message_id(&self) -> &str {
+ self.send_receipt.message_id()
+ }
+
+ fn transaction_id(&self) -> &str {
+ self.send_receipt.transaction_id()
+ }
+}
+
+/// Resolution of Transaction.
+#[repr(i32)]
+pub enum TransactionResolution {
+ /// Notify server that current transaction should be committed.
+ COMMIT = 1,
+ /// Notify server that current transaction should be roll-backed.
+ ROLLBACK = 2,
+ /// Notify the server that the state of this transaction is not sure. You should be cautious before return unknown
+ /// because the examination from the server will be performed periodically.
+ UNKNOWN = 0,
+}
+
+/// A closure to check the state of transaction.
+pub type TransactionChecker = dyn Fn(String, MessageView) -> TransactionResolution + Send + Sync;
+
+#[cfg(test)]
+mod tests {
+ use crate::error::ClientError;
+ use crate::model::common::SendReceipt;
+ use crate::model::transaction::{Transaction, TransactionImpl};
+ use crate::pb::{Code, EndTransactionResponse, Resource, SendResultEntry, Status};
+ use crate::session;
+
+ #[tokio::test]
+ async fn transaction_commit() -> Result<(), ClientError> {
+ let mut mock = session::MockRPCClient::new();
+ mock.expect_end_transaction().return_once(|_| {
+ Box::pin(futures::future::ready(Ok(EndTransactionResponse {
+ status: Some(Status {
+ code: Code::Ok as i32,
+ message: "".to_string(),
+ }),
+ })))
+ });
+ let transaction = TransactionImpl::new(
+ Box::new(mock),
+ Resource {
+ resource_namespace: "".to_string(),
+ name: "".to_string(),
+ },
+ SendReceipt::from_pb_send_result(&SendResultEntry {
+ status: None,
+ message_id: "".to_string(),
+ transaction_id: "".to_string(),
+ offset: 0,
+ }),
+ );
+ transaction.commit().await
+ }
+
+ #[tokio::test]
+ async fn transaction_rollback() -> Result<(), ClientError> {
+ let mut mock = session::MockRPCClient::new();
+ mock.expect_end_transaction().return_once(|_| {
+ Box::pin(futures::future::ready(Ok(EndTransactionResponse {
+ status: Some(Status {
+ code: Code::Ok as i32,
+ message: "".to_string(),
+ }),
+ })))
+ });
+ let transaction = TransactionImpl::new(
+ Box::new(mock),
+ Resource {
+ resource_namespace: "".to_string(),
+ name: "".to_string(),
+ },
+ SendReceipt::from_pb_send_result(&SendResultEntry {
+ status: None,
+ message_id: "".to_string(),
+ transaction_id: "".to_string(),
+ offset: 0,
+ }),
+ );
+ transaction.rollback().await
+ }
+}
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index be16c2c..fd82cdc 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -27,7 +27,8 @@
use crate::error::{ClientError, ErrorKind};
use crate::model::common::{ClientType, SendReceipt};
use crate::model::message;
-use crate::pb::{Encoding, Resource, SystemProperties};
+use crate::model::transaction::{Transaction, TransactionChecker, TransactionImpl};
+use crate::pb::{Encoding, MessageType, Resource, SystemProperties};
use crate::util::{
build_endpoints_by_message_queue, build_producer_settings, select_message_queue,
select_message_queue_by_message_group, HOST_NAME,
@@ -72,14 +73,42 @@
})
}
+ /// Create a new transaction producer instance
+ ///
+ /// # Arguments
+ ///
+ /// * `option` - producer option
+ /// * `client_option` - client option
+ /// * `transaction_checker` - A closure to check the state of transaction.
+ pub fn new_transaction_producer(
+ option: ProducerOption,
+ client_option: ClientOption,
+ transaction_checker: Box<TransactionChecker>,
+ ) -> Result<Self, ClientError> {
+ let client_option = ClientOption {
+ client_type: ClientType::Producer,
+ namespace: option.namespace().to_string(),
+ ..client_option
+ };
+ let logger = log::logger(option.logging_format());
+ let settings = build_producer_settings(&option, &client_option);
+ let mut client = Client::new(&logger, client_option, settings)?;
+ client.set_transaction_checker(transaction_checker);
+ Ok(Producer {
+ option,
+ logger,
+ client,
+ })
+ }
+
/// Start the producer
- pub async fn start(&self) -> Result<(), ClientError> {
+ pub async fn start(&mut self) -> Result<(), ClientError> {
+ self.client.start().await;
if let Some(topics) = self.option.topics() {
for topic in topics {
self.client.topic_route(topic, true).await?;
}
}
- self.client.start();
info!(
self.logger,
"start producer success, client_id: {}",
@@ -125,7 +154,7 @@
last_topic = Some(message.take_topic());
}
- let message_group = message.take_message_group();
+ let mut message_group = message.take_message_group();
if let Some(last_message_group) = last_message_group.clone() {
if last_message_group.ne(&message_group) {
return Err(ClientError::new(
@@ -138,10 +167,24 @@
last_message_group = Some(message_group.clone());
}
- let delivery_timestamp = message
+ let mut delivery_timestamp = message
.take_delivery_timestamp()
.map(|seconds| Timestamp { seconds, nanos: 0 });
+ let message_type = if message.transaction_enabled() {
+ message_group = None;
+ delivery_timestamp = None;
+ MessageType::Transaction as i32
+ } else if delivery_timestamp.is_some() {
+ message_group = None;
+ MessageType::Delay as i32
+ } else if message_group.is_some() {
+ delivery_timestamp = None;
+ MessageType::Fifo as i32
+ } else {
+ MessageType::Normal as i32
+ };
+
let pb_message = pb::Message {
topic: Some(Resource {
name: message.take_topic(),
@@ -154,6 +197,7 @@
message_id: message.take_message_id(),
message_group,
delivery_timestamp,
+ message_type,
born_host: HOST_NAME.clone(),
born_timestamp: born_timestamp.clone(),
body_digest: None,
@@ -182,11 +226,8 @@
/// # Arguments
///
/// * `message` - the message to send
- pub async fn send_one(
- &self,
- message: impl message::Message,
- ) -> Result<SendReceipt, ClientError> {
- let results = self.send(vec![message]).await?;
+ pub async fn send(&self, message: impl message::Message) -> Result<SendReceipt, ClientError> {
+ let results = self.batch_send(vec![message]).await?;
Ok(results[0].clone())
}
@@ -195,7 +236,7 @@
/// # Arguments
///
/// * `messages` - A vector that holds the messages to send
- pub async fn send(
+ pub async fn batch_send(
&self,
messages: Vec<impl message::Message>,
) -> Result<Vec<SendReceipt>, ClientError> {
@@ -218,6 +259,23 @@
self.client.send_message(&endpoints, pb_messages).await
}
+
+ /// Send message in a transaction
+ pub async fn send_transaction_message(
+ &self,
+ mut message: impl message::Message,
+ ) -> Result<impl Transaction, ClientError> {
+ let topic = message.take_topic();
+ let receipt = self.send(message).await?;
+ Ok(TransactionImpl::new(
+ Box::new(self.client.get_session().await.unwrap()),
+ Resource {
+ resource_namespace: self.option.namespace().to_string(),
+ name: topic,
+ },
+ receipt,
+ ))
+ }
}
#[cfg(test)]
@@ -228,7 +286,9 @@
use crate::log::terminal_logger;
use crate::model::common::Route;
use crate::model::message::{MessageBuilder, MessageImpl};
+ use crate::model::transaction::TransactionResolution;
use crate::pb::{Broker, MessageQueue};
+ use crate::session::Session;
use super::*;
@@ -268,6 +328,38 @@
}
#[tokio::test]
+ async fn transaction_producer_start() -> Result<(), ClientError> {
+ let _m = crate::client::tests::MTX.lock();
+
+ let ctx = Client::new_context();
+ ctx.expect().return_once(|_, _, _| {
+ let mut client = Client::default();
+ client.expect_topic_route().returning(|_, _| {
+ Ok(Arc::new(Route {
+ index: Default::default(),
+ queue: vec![],
+ }))
+ });
+ client.expect_start().returning(|| ());
+ client.expect_set_transaction_checker().returning(|_| ());
+ client
+ .expect_client_id()
+ .return_const("fake_id".to_string());
+ Ok(client)
+ });
+ let mut producer_option = ProducerOption::default();
+ producer_option.set_topics(vec!["DefaultCluster".to_string()]);
+ Producer::new_transaction_producer(
+ producer_option,
+ ClientOption::default(),
+ Box::new(|_, _| TransactionResolution::COMMIT),
+ )?
+ .start()
+ .await?;
+ Ok(())
+ }
+
+ #[tokio::test]
async fn producer_transform_messages_to_protobuf() {
let producer = new_producer_for_test();
let messages = vec![MessageBuilder::builder()
@@ -318,6 +410,7 @@
properties: None,
message_group: None,
delivery_timestamp: None,
+ transaction_enabled: false,
}];
let result = producer.transform_messages_to_protobuf(messages);
assert!(result.is_err());
@@ -365,7 +458,7 @@
}
#[tokio::test]
- async fn producer_send_one() -> Result<(), ClientError> {
+ async fn producer_send() -> Result<(), ClientError> {
let mut producer = new_producer_for_test();
producer.client.expect_topic_route().returning(|_, _| {
Ok(Arc::new(Route {
@@ -399,7 +492,7 @@
)])
});
let result = producer
- .send_one(
+ .send(
MessageBuilder::builder()
.set_topic("test_topic")
.set_body(vec![])
@@ -411,4 +504,55 @@
assert_eq!(result.transaction_id(), "transaction_id");
Ok(())
}
+
+ #[tokio::test]
+ async fn producer_send_transaction_message() -> Result<(), ClientError> {
+ let mut producer = new_producer_for_test();
+ producer.client.expect_topic_route().returning(|_, _| {
+ Ok(Arc::new(Route {
+ index: Default::default(),
+ queue: vec![MessageQueue {
+ topic: Some(Resource {
+ name: "test_topic".to_string(),
+ resource_namespace: "".to_string(),
+ }),
+ id: 0,
+ permission: 0,
+ broker: Some(Broker {
+ name: "".to_string(),
+ id: 0,
+ endpoints: Some(pb::Endpoints {
+ scheme: 0,
+ addresses: vec![],
+ }),
+ }),
+ accept_message_types: vec![],
+ }],
+ }))
+ });
+ producer.client.expect_send_message().returning(|_, _| {
+ Ok(vec![SendReceipt::from_pb_send_result(
+ &pb::SendResultEntry {
+ message_id: "message_id".to_string(),
+ transaction_id: "transaction_id".to_string(),
+ ..pb::SendResultEntry::default()
+ },
+ )])
+ });
+ producer
+ .client
+ .expect_get_session()
+ .return_once(|| Ok(Session::mock()));
+
+ let _ = producer
+ .send_transaction_message(
+ MessageBuilder::builder()
+ .set_topic("test_topic")
+ .set_body(vec![])
+ .build()
+ .unwrap(),
+ )
+ .await?;
+ Ok(())
+ }
}
diff --git a/rust/src/session.rs b/rust/src/session.rs
index 229d9e5..5762d66 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -31,26 +31,29 @@
use crate::conf::ClientOption;
use crate::error::ErrorKind;
use crate::model::common::Endpoints;
+use crate::pb::telemetry_command::Command;
use crate::pb::{
- AckMessageRequest, AckMessageResponse, HeartbeatRequest, HeartbeatResponse, QueryRouteRequest,
- QueryRouteResponse, ReceiveMessageRequest, ReceiveMessageResponse, SendMessageRequest,
- SendMessageResponse, TelemetryCommand,
+ AckMessageRequest, AckMessageResponse, EndTransactionRequest, EndTransactionResponse,
+ HeartbeatRequest, HeartbeatResponse, QueryRouteRequest, QueryRouteResponse,
+ ReceiveMessageRequest, ReceiveMessageResponse, SendMessageRequest, SendMessageResponse,
+ TelemetryCommand,
};
use crate::util::{PROTOCOL_VERSION, SDK_LANGUAGE, SDK_VERSION};
use crate::{error::ClientError, pb::messaging_service_client::MessagingServiceClient};
+const OPERATION_START: &str = "session.start";
+const OPERATION_UPDATE_SETTINGS: &str = "session.update_settings";
+
+const OPERATION_QUERY_ROUTE: &str = "rpc.query_route";
+const OPERATION_HEARTBEAT: &str = "rpc.heartbeat";
+const OPERATION_SEND_MESSAGE: &str = "rpc.send_message";
+const OPERATION_RECEIVE_MESSAGE: &str = "rpc.receive_message";
+const OPERATION_ACK_MESSAGE: &str = "rpc.ack_message";
+const OPERATION_END_TRANSACTION: &str = "rpc.end_transaction";
+
#[async_trait]
#[automock]
pub(crate) trait RPCClient {
- const OPERATION_START: &'static str = "session.start";
- const OPERATION_UPDATE_SETTINGS: &'static str = "session.update_settings";
-
- const OPERATION_QUERY_ROUTE: &'static str = "rpc.query_route";
- const OPERATION_HEARTBEAT: &'static str = "rpc.heartbeat";
- const OPERATION_SEND_MESSAGE: &'static str = "rpc.send_message";
- const OPERATION_RECEIVE_MESSAGE: &'static str = "rpc.receive_message";
- const OPERATION_ACK_MESSAGE: &'static str = "rpc.ack_message";
-
async fn query_route(
&mut self,
request: QueryRouteRequest,
@@ -71,6 +74,10 @@
&mut self,
request: AckMessageRequest,
) -> Result<AckMessageResponse, ClientError>;
+ async fn end_transaction(
+ &mut self,
+ request: EndTransactionRequest,
+ ) -> Result<EndTransactionResponse, ClientError>;
}
#[allow(dead_code)]
@@ -255,13 +262,17 @@
}
}
- pub(crate) async fn start(&mut self, settings: TelemetryCommand) -> Result<(), ClientError> {
+ pub(crate) async fn start(
+ &mut self,
+ settings: TelemetryCommand,
+ telemetry_command_tx: mpsc::Sender<Command>,
+ ) -> Result<(), ClientError> {
let (tx, rx) = mpsc::channel(16);
tx.send(settings).await.map_err(|e| {
ClientError::new(
ErrorKind::ChannelSend,
"failed to send telemetry command",
- Self::OPERATION_START,
+ OPERATION_START,
)
.set_source(e)
})?;
@@ -272,7 +283,7 @@
ClientError::new(
ErrorKind::ClientInternal,
"send rpc telemetry failed",
- Self::OPERATION_START,
+ OPERATION_START,
)
.set_source(e)
})?;
@@ -281,10 +292,12 @@
tokio::spawn(async move {
let mut stream = response.into_inner();
loop {
- // TODO handle server stream
match stream.message().await {
Ok(Some(item)) => {
- debug!(logger, "telemetry command: {:?}", item);
+ debug!(logger, "receive telemetry command: {:?}", item);
+ if let Some(command) = item.command {
+ _ = telemetry_command_tx.send(command).await;
+ }
}
Ok(None) => {
debug!(logger, "request stream closed");
@@ -315,8 +328,9 @@
return Err(ClientError::new(
ErrorKind::ClientIsNotRunning,
"session is not started",
- Self::OPERATION_UPDATE_SETTINGS,
- ));
+ OPERATION_UPDATE_SETTINGS,
+ )
+ .with_context("url", self.endpoints.endpoint_url()));
}
if let Some(tx) = self.telemetry_tx.as_ref() {
@@ -324,7 +338,7 @@
ClientError::new(
ErrorKind::ChannelSend,
"failed to send telemetry command",
- Self::OPERATION_UPDATE_SETTINGS,
+ OPERATION_UPDATE_SETTINGS,
)
.set_source(e)
})?;
@@ -345,7 +359,7 @@
ClientError::new(
ErrorKind::ClientInternal,
"send rpc query_route failed",
- Self::OPERATION_QUERY_ROUTE,
+ OPERATION_QUERY_ROUTE,
)
.set_source(e)
})?;
@@ -361,7 +375,7 @@
ClientError::new(
ErrorKind::ClientInternal,
"send rpc heartbeat failed",
- Self::OPERATION_HEARTBEAT,
+ OPERATION_HEARTBEAT,
)
.set_source(e)
})?;
@@ -377,7 +391,7 @@
ClientError::new(
ErrorKind::ClientInternal,
"send rpc send_message failed",
- Self::OPERATION_SEND_MESSAGE,
+ OPERATION_SEND_MESSAGE,
)
.set_source(e)
})?;
@@ -399,7 +413,7 @@
ClientError::new(
ErrorKind::ClientInternal,
"send rpc receive_message failed",
- Self::OPERATION_RECEIVE_MESSAGE,
+ OPERATION_RECEIVE_MESSAGE,
)
.set_source(e)
})?
@@ -411,7 +425,7 @@
ClientError::new(
ErrorKind::ClientInternal,
"receive message failed: error in reading stream",
- Self::OPERATION_RECEIVE_MESSAGE,
+ OPERATION_RECEIVE_MESSAGE,
)
.set_source(e)
})?;
@@ -429,7 +443,23 @@
ClientError::new(
ErrorKind::ClientInternal,
"send rpc ack_message failed",
- Self::OPERATION_ACK_MESSAGE,
+ OPERATION_ACK_MESSAGE,
+ )
+ .set_source(e)
+ })?;
+ Ok(response.into_inner())
+ }
+
+ async fn end_transaction(
+ &mut self,
+ request: EndTransactionRequest,
+ ) -> Result<EndTransactionResponse, ClientError> {
+ let request = self.sign(request);
+ let response = self.stub.end_transaction(request).await.map_err(|e| {
+ ClientError::new(
+ ErrorKind::ClientInternal,
+ "send rpc end_transaction failed",
+ OPERATION_END_TRANSACTION,
)
.set_source(e)
})?;
@@ -462,6 +492,7 @@
&self,
endpoints: &Endpoints,
settings: TelemetryCommand,
+ telemetry_command_tx: mpsc::Sender<Command>,
) -> Result<Session, ClientError> {
let mut session_map = self.session_map.lock().await;
let endpoint_url = endpoints.endpoint_url().to_string();
@@ -475,7 +506,7 @@
&self.option,
)
.await?;
- session.start(settings).await?;
+ session.start(settings, telemetry_command_tx).await?;
session_map.insert(endpoint_url.clone(), session.clone());
Ok(session)
};
@@ -562,11 +593,12 @@
let mut session = session.unwrap();
+ let (tx, _) = mpsc::channel(16);
let result = session
- .start(build_producer_settings(
- &ProducerOption::default(),
- &ClientOption::default(),
- ))
+ .start(
+ build_producer_settings(&ProducerOption::default(), &ClientOption::default()),
+ tx,
+ )
.await;
assert!(result.is_ok());
assert!(session.is_started());
@@ -577,7 +609,6 @@
let mut server = RocketMQMockServer::start_default().await;
server.setup(
MockBuilder::when()
- // 👇 RPC prefix
.path("/apache.rocketmq.v2.MessagingService/Telemetry")
.then()
.return_status(Code::Ok),
@@ -588,10 +619,12 @@
client_option.set_enable_tls(false);
let session_manager =
SessionManager::new(&logger, "test_client".to_string(), &client_option);
+ let (tx, _) = mpsc::channel(16);
let session = session_manager
.get_or_create_session(
&Endpoints::from_url(&format!("localhost:{}", server.address().port())).unwrap(),
build_producer_settings(&ProducerOption::default(), &client_option),
+ tx,
)
.await
.unwrap();
diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs
index 28e9d66..48e0ce9 100644
--- a/rust/src/simple_consumer.rs
+++ b/rust/src/simple_consumer.rs
@@ -82,7 +82,7 @@
}
/// Start the simple consumer
- pub async fn start(&self) -> Result<(), ClientError> {
+ pub async fn start(&mut self) -> Result<(), ClientError> {
if self.option.consumer_group().is_empty() {
return Err(ClientError::new(
ErrorKind::Config,
@@ -90,12 +90,12 @@
Self::OPERATION_START_SIMPLE_CONSUMER,
));
}
+ self.client.start().await;
if let Some(topics) = self.option.topics() {
for topic in topics {
self.client.topic_route(topic, true).await?;
}
}
- self.client.start();
info!(
self.logger,
"start simple consumer success, client_id: {}",
@@ -175,9 +175,9 @@
#[cfg(test)]
mod tests {
- use crate::log::terminal_logger;
use std::sync::Arc;
+ use crate::log::terminal_logger;
use crate::model::common::{FilterType, Route};
use crate::pb::{
AckMessageResultEntry, Broker, Message, MessageQueue, Resource, SystemProperties,