[ISSUE #555] Support transaction message for Rust SDK (#556)

* feat(rust): implement transaction checker

Signed-off-by: SSpirits <admin@lv5.moe>

* feat(rust): implement transaction producer

Signed-off-by: SSpirits <admin@lv5.moe>

* feat(rust): update readme

Signed-off-by: SSpirits <admin@lv5.moe>

* feat(rust): commit transaction in example

Signed-off-by: SSpirits <admin@lv5.moe>

---------

Signed-off-by: SSpirits <admin@lv5.moe>
diff --git a/README-CN.md b/README-CN.md
index a4716a6..266a2f6 100644
--- a/README-CN.md
+++ b/README-CN.md
@@ -22,7 +22,7 @@
 | Producer with standard messages                |   ✅   |   ✅   |   ✅   |   ✅    |   ✅   |   🚧    |    🚧    |   🚧   |
 | Producer with FIFO messages                    |   ✅   |   ✅   |   ✅   |   ✅    |   ✅   |   🚧    |    🚧    |   🚧   |
 | Producer with timed/delay messages             |   ✅   |   ✅   |   ✅   |   ✅    |   ✅   |   🚧    |    🚧    |   🚧   |
-| Producer with transactional messages           |   ✅   |   ✅   |   ✅   |   ✅    |   🚧   |   🚧    |    🚧    |   🚧   |
+| Producer with transactional messages           |   ✅   |   ✅   |   ✅   |   ✅    |   ✅   |   🚧    |    🚧    |   🚧   |
 | Simple consumer                                |   ✅   |   ✅   |   ✅   |   ✅    |   ✅   |   🚧    |    🚧    |   🚧   |
 | Push consumer with concurrent message listener |   ✅   |   ✅   |   🚧   |   🚧    |   🚧   |   🚧    |    🚧    |   🚧   |
 | Push consumer with FIFO message listener       |   ✅   |   ✅   |   🚧   |   🚧    |   🚧   |   🚧    |    🚧    |   🚧   |
diff --git a/README.md b/README.md
index 13bd8bd..916e688 100644
--- a/README.md
+++ b/README.md
@@ -22,7 +22,7 @@
 | Producer with standard messages                |   ✅   |   ✅   |   ✅   |   ✅    |   ✅   |   🚧    |    🚧    |   🚧   |
 | Producer with FIFO messages                    |   ✅   |   ✅   |   ✅   |   ✅    |   ✅   |   🚧    |    🚧    |   🚧   |
 | Producer with timed/delay messages             |   ✅   |   ✅   |   ✅   |   ✅    |   ✅   |   🚧    |    🚧    |   🚧   |
-| Producer with transactional messages           |   ✅   |   ✅   |   ✅   |   ✅    |   🚧   |   🚧    |    🚧    |   🚧   |
+| Producer with transactional messages           |   ✅   |   ✅   |   ✅   |   ✅    |   ✅   |   🚧    |    🚧    |   🚧   |
 | Simple consumer                                |   ✅   |   ✅   |   ✅   |   ✅    |   ✅   |   🚧    |    🚧    |   🚧   |
 | Push consumer with concurrent message listener |   ✅   |   ✅   |   🚧   |   🚧    |   🚧   |   🚧    |    🚧    |   🚧   |
 | Push consumer with FIFO message listener       |   ✅   |   ✅   |   🚧   |   🚧    |   🚧   |   🚧    |    🚧    |   🚧   |
diff --git a/rust/.cargo/Cargo.lock.min b/rust/.cargo/Cargo.lock.min
index 3b2ccb8..cb6ecc2 100644
--- a/rust/.cargo/Cargo.lock.min
+++ b/rust/.cargo/Cargo.lock.min
@@ -1266,7 +1266,7 @@
 
 [[package]]
 name = "rocketmq"
-version = "0.1.1"
+version = "0.1.2"
 dependencies = [
  "anyhow",
  "async-trait",
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index d7655d1..911b9ed 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -16,7 +16,7 @@
 #
 [package]
 name = "rocketmq"
-version = "0.1.1"
+version = "0.1.2"
 edition = "2021"
 rust-version = "1.61"
 authors = [
diff --git a/rust/examples/delay_producer.rs b/rust/examples/delay_producer.rs
new file mode 100644
index 0000000..3e237c3
--- /dev/null
+++ b/rust/examples/delay_producer.rs
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::ops::Add;
+use std::time::{Duration, SystemTime};
+
+use rocketmq::conf::{ClientOption, ProducerOption};
+use rocketmq::model::message::MessageBuilder;
+use rocketmq::Producer;
+
+#[tokio::main]
+async fn main() {
+    // recommend to specify which topic(s) you would like to send message to
+    // producer will prefetch topic route when start and failed fast if topic not exist
+    let mut producer_option = ProducerOption::default();
+    producer_option.set_topics(vec!["delay_test"]);
+
+    // set which rocketmq proxy to connect
+    let mut client_option = ClientOption::default();
+    client_option.set_access_url("localhost:8081");
+
+    // build and start producer
+    let mut producer = Producer::new(producer_option, client_option).unwrap();
+    producer.start().await.unwrap();
+
+    // build message
+    let message = MessageBuilder::delay_message_builder(
+        "delay_test",
+        "hello world".as_bytes().to_vec(),
+        // deliver in 15 seconds
+        SystemTime::now()
+            .add(Duration::from_secs(15))
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .unwrap()
+            .as_secs() as i64,
+    )
+    .build()
+    .unwrap();
+
+    // send message to rocketmq proxy
+    let result = producer.send(message).await;
+    debug_assert!(result.is_ok(), "send message failed: {:?}", result);
+    println!(
+        "send message success, message_id={}",
+        result.unwrap().message_id()
+    );
+}
diff --git a/rust/examples/fifo_producer.rs b/rust/examples/fifo_producer.rs
new file mode 100644
index 0000000..38562e2
--- /dev/null
+++ b/rust/examples/fifo_producer.rs
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use rocketmq::conf::{ClientOption, ProducerOption};
+use rocketmq::model::message::MessageBuilder;
+use rocketmq::Producer;
+
+#[tokio::main]
+async fn main() {
+    // recommend to specify which topic(s) you would like to send message to
+    // producer will prefetch topic route when start and failed fast if topic not exist
+    let mut producer_option = ProducerOption::default();
+    producer_option.set_topics(vec!["fifo_test"]);
+
+    // set which rocketmq proxy to connect
+    let mut client_option = ClientOption::default();
+    client_option.set_access_url("localhost:8081");
+
+    // build and start producer
+    let mut producer = Producer::new(producer_option, client_option).unwrap();
+    producer.start().await.unwrap();
+
+    // build message
+    let message = MessageBuilder::fifo_message_builder(
+        "fifo_test",
+        "hello world".as_bytes().to_vec(),
+        // message partition
+        "message_group",
+    )
+    .build()
+    .unwrap();
+
+    // send message to rocketmq proxy
+    let result = producer.send(message).await;
+    debug_assert!(result.is_ok(), "send message failed: {:?}", result);
+    println!(
+        "send message success, message_id={}",
+        result.unwrap().message_id()
+    );
+}
diff --git a/rust/examples/producer.rs b/rust/examples/producer.rs
index acacabb..335b31f 100644
--- a/rust/examples/producer.rs
+++ b/rust/examples/producer.rs
@@ -30,7 +30,7 @@
     client_option.set_access_url("localhost:8081");
 
     // build and start producer
-    let producer = Producer::new(producer_option, client_option).unwrap();
+    let mut producer = Producer::new(producer_option, client_option).unwrap();
     producer.start().await.unwrap();
 
     // build message
@@ -42,7 +42,7 @@
         .unwrap();
 
     // send message to rocketmq proxy
-    let result = producer.send_one(message).await;
+    let result = producer.send(message).await;
     debug_assert!(result.is_ok(), "send message failed: {:?}", result);
     println!(
         "send message success, message_id={}",
diff --git a/rust/examples/simple_consumer.rs b/rust/examples/simple_consumer.rs
index 33aae9d..a5f9408 100644
--- a/rust/examples/simple_consumer.rs
+++ b/rust/examples/simple_consumer.rs
@@ -32,7 +32,7 @@
     client_option.set_enable_tls(false);
 
     // build and start simple consumer
-    let consumer = SimpleConsumer::new(consumer_option, client_option).unwrap();
+    let mut consumer = SimpleConsumer::new(consumer_option, client_option).unwrap();
     consumer.start().await.unwrap();
 
     loop {
diff --git a/rust/examples/transaction_producer.rs b/rust/examples/transaction_producer.rs
new file mode 100644
index 0000000..07516eb
--- /dev/null
+++ b/rust/examples/transaction_producer.rs
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use rocketmq::conf::{ClientOption, ProducerOption};
+use rocketmq::model::message::MessageBuilder;
+use rocketmq::model::transaction::{Transaction, TransactionResolution};
+use rocketmq::Producer;
+
+#[tokio::main]
+async fn main() {
+    // recommend to specify which topic(s) you would like to send message to
+    // producer will prefetch topic route when start and failed fast if topic not exist
+    let mut producer_option = ProducerOption::default();
+    producer_option.set_topics(vec!["transaction_test"]);
+
+    // set which rocketmq proxy to connect
+    let mut client_option = ClientOption::default();
+    client_option.set_access_url("localhost:8081");
+
+    // build and start producer
+    let mut producer = Producer::new_transaction_producer(
+        producer_option,
+        client_option,
+        Box::new(|transaction_id, message| {
+            println!(
+                "receive transaction check request: transaction_id: {}, message: {:?}",
+                transaction_id, message
+            );
+            TransactionResolution::COMMIT
+        }),
+    )
+    .unwrap();
+    producer.start().await.unwrap();
+
+    // build message
+    let message = MessageBuilder::transaction_message_builder(
+        "transaction_test",
+        "hello world".as_bytes().to_vec(),
+    )
+    .build()
+    .unwrap();
+
+    // send message to rocketmq proxy
+    let result = producer.send_transaction_message(message).await;
+    if let Err(error) = result {
+        eprintln!("send message failed: {:?}", error);
+        return;
+    }
+    let transaction = result.unwrap();
+    println!(
+        "send message success, message_id={}, transaction_id={}",
+        transaction.message_id(),
+        transaction.transaction_id()
+    );
+    let result = transaction.commit().await;
+    debug_assert!(result.is_ok(), "commit transaction failed: {:?}", result);
+}
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 4b601fa..27cfb4d 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 use std::clone::Clone;
+use std::fmt::Debug;
 use std::string::ToString;
 use std::{collections::HashMap, sync::atomic::AtomicUsize, sync::Arc};
 
@@ -24,24 +25,26 @@
 use prost_types::Duration;
 use slog::{debug, error, info, o, warn, Logger};
 use tokio::select;
-use tokio::sync::oneshot;
+use tokio::sync::{mpsc, oneshot};
 
 use crate::conf::ClientOption;
 use crate::error::{ClientError, ErrorKind};
 use crate::model::common::{ClientType, Endpoints, Route, RouteStatus, SendReceipt};
-use crate::model::message::AckMessageEntry;
+use crate::model::message::{AckMessageEntry, MessageView};
+use crate::model::transaction::TransactionChecker;
 use crate::pb;
 use crate::pb::receive_message_response::Content;
+use crate::pb::telemetry_command::Command::RecoverOrphanedTransactionCommand;
 use crate::pb::{
-    AckMessageRequest, AckMessageResultEntry, Code, FilterExpression, HeartbeatRequest,
-    HeartbeatResponse, Message, MessageQueue, QueryRouteRequest, ReceiveMessageRequest, Resource,
-    SendMessageRequest, Status, TelemetryCommand,
+    AckMessageRequest, AckMessageResultEntry, Code, EndTransactionRequest, FilterExpression,
+    HeartbeatRequest, HeartbeatResponse, Message, MessageQueue, QueryRouteRequest,
+    ReceiveMessageRequest, Resource, SendMessageRequest, Status, TelemetryCommand,
+    TransactionSource,
 };
 #[double]
 use crate::session::SessionManager;
 use crate::session::{RPCClient, Session};
 
-#[derive(Debug)]
 pub(crate) struct Client {
     logger: Logger,
     option: ClientOption,
@@ -50,6 +53,8 @@
     id: String,
     access_endpoints: Endpoints,
     settings: TelemetryCommand,
+    transaction_checker: Option<Box<TransactionChecker>>,
+    telemetry_command_tx: Option<mpsc::Sender<pb::telemetry_command::Command>>,
 }
 
 lazy_static::lazy_static! {
@@ -57,11 +62,24 @@
 }
 
 const OPERATION_CLIENT_NEW: &str = "client.new";
+const OPERATION_GET_SESSION: &str = "client.get_session";
 const OPERATION_QUERY_ROUTE: &str = "client.query_route";
 const OPERATION_HEARTBEAT: &str = "client.heartbeat";
 const OPERATION_SEND_MESSAGE: &str = "client.send_message";
 const OPERATION_RECEIVE_MESSAGE: &str = "client.receive_message";
 const OPERATION_ACK_MESSAGE: &str = "client.ack_message";
+const OPERATION_END_TRANSACTION: &str = "client.end_transaction";
+const OPERATION_HANDLE_TELEMETRY_COMMAND: &str = "client.handle_telemetry_command";
+
+impl Debug for Client {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Client")
+            .field("id", &self.id)
+            .field("access_endpoints", &self.access_endpoints)
+            .field("option", &self.option)
+            .finish()
+    }
+}
 
 #[automock]
 impl Client {
@@ -82,10 +100,23 @@
             id,
             access_endpoints: endpoints,
             settings,
+            transaction_checker: None,
+            telemetry_command_tx: None,
         })
     }
 
-    pub(crate) fn start(&self) {
+    pub(crate) fn is_started(&self) -> bool {
+        self.telemetry_command_tx.is_some()
+    }
+
+    pub(crate) fn set_transaction_checker(&mut self, transaction_checker: Box<TransactionChecker>) {
+        if self.is_started() {
+            panic!("client {} is started, can not be modified", self.id)
+        }
+        self.transaction_checker = Some(transaction_checker);
+    }
+
+    pub(crate) async fn start(&mut self) {
         let logger = self.logger.clone();
         let session_manager = self.session_manager.clone();
 
@@ -93,7 +124,14 @@
         let namespace = self.option.namespace.to_string();
         let client_type = self.option.client_type.clone();
 
+        // send heartbeat and handle telemetry command
+        let (tx, mut rx) = mpsc::channel(16);
+        self.telemetry_command_tx = Some(tx);
+        let rpc_client = self.get_session().await.unwrap();
+        let endpoints = self.access_endpoints.clone();
+        let transaction_checker = self.transaction_checker.take();
         tokio::spawn(async move {
+            rpc_client.is_started();
             let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
             loop {
                 select! {
@@ -132,11 +170,70 @@
                             debug!(logger,"send heartbeat to server success, peer={}",peer);
                         }
                     },
+                    command = rx.recv() => {
+                        if let Some(command) = command {
+                            let result = Self::handle_telemetry_command(rpc_client.clone(), &transaction_checker, endpoints.clone(), command).await;
+                            if let Err(error) = result {
+                                error!(logger, "handle telemetry command failed: {:?}", error)
+                            }
+                        }
+                    },
                 }
             }
         });
     }
 
+    async fn handle_telemetry_command<T: RPCClient + 'static>(
+        mut rpc_client: T,
+        transaction_checker: &Option<Box<TransactionChecker>>,
+        endpoints: Endpoints,
+        command: pb::telemetry_command::Command,
+    ) -> Result<(), ClientError> {
+        return match command {
+            RecoverOrphanedTransactionCommand(command) => {
+                let transaction_id = command.transaction_id;
+                let message = command.message.unwrap();
+                let message_id = message
+                    .system_properties
+                    .as_ref()
+                    .unwrap()
+                    .message_id
+                    .clone();
+                let topic = message.topic.as_ref().unwrap().clone();
+                if let Some(transaction_checker) = transaction_checker {
+                    let resolution = transaction_checker(
+                        transaction_id.clone(),
+                        MessageView::from_pb_message(message, endpoints),
+                    );
+
+                    let response = rpc_client
+                        .end_transaction(EndTransactionRequest {
+                            topic: Some(topic),
+                            message_id: message_id.to_string(),
+                            transaction_id,
+                            resolution: resolution as i32,
+                            source: TransactionSource::SourceServerCheck as i32,
+                            trace_context: "".to_string(),
+                        })
+                        .await?;
+                    Self::handle_response_status(response.status, OPERATION_END_TRANSACTION)
+                } else {
+                    Err(ClientError::new(
+                        ErrorKind::Config,
+                        "failed to get transaction checker",
+                        OPERATION_END_TRANSACTION,
+                    ))
+                }
+            }
+            _ => Err(ClientError::new(
+                ErrorKind::Config,
+                "receive telemetry command but there is no handler",
+                OPERATION_HANDLE_TELEMETRY_COMMAND,
+            )
+            .with_context("command", format!("{:?}", command))),
+        };
+    }
+
     #[allow(dead_code)]
     pub(crate) fn client_id(&self) -> &str {
         &self.id
@@ -161,10 +258,22 @@
         )
     }
 
-    async fn get_session(&self) -> Result<Session, ClientError> {
+    pub(crate) async fn get_session(&self) -> Result<Session, ClientError> {
+        if !self.is_started() {
+            return Err(ClientError::new(
+                ErrorKind::ClientIsNotRunning,
+                "client is not started",
+                OPERATION_GET_SESSION,
+            )
+            .with_context("client_id", self.id.clone()));
+        }
         let session = self
             .session_manager
-            .get_or_create_session(&self.access_endpoints, self.settings.clone())
+            .get_or_create_session(
+                &self.access_endpoints,
+                self.settings.clone(),
+                self.telemetry_command_tx.clone().unwrap(),
+            )
             .await?;
         Ok(session)
     }
@@ -175,12 +284,16 @@
     ) -> Result<Session, ClientError> {
         let session = self
             .session_manager
-            .get_or_create_session(endpoints, self.settings.clone())
+            .get_or_create_session(
+                endpoints,
+                self.settings.clone(),
+                self.telemetry_command_tx.clone().unwrap(),
+            )
             .await?;
         Ok(session)
     }
 
-    fn handle_response_status(
+    pub(crate) fn handle_response_status(
         status: Option<Status>,
         operation: &'static str,
     ) -> Result<(), ClientError> {
@@ -489,22 +602,24 @@
 
 #[cfg(test)]
 pub(crate) mod tests {
-    use lazy_static::lazy_static;
     use std::sync::atomic::{AtomicUsize, Ordering};
     use std::sync::Arc;
     use std::thread::sleep;
     use std::time::Duration;
 
+    use lazy_static::lazy_static;
+
     use crate::client::Client;
     use crate::conf::ClientOption;
     use crate::error::{ClientError, ErrorKind};
     use crate::log::terminal_logger;
     use crate::model::common::{ClientType, Route};
+    use crate::model::transaction::TransactionResolution;
     use crate::pb::receive_message_response::Content;
     use crate::pb::{
-        AckMessageEntry, AckMessageResponse, Code, FilterExpression, HeartbeatResponse, Message,
-        MessageQueue, QueryRouteResponse, ReceiveMessageResponse, Resource, SendMessageResponse,
-        Status, TelemetryCommand,
+        AckMessageEntry, AckMessageResponse, Code, EndTransactionResponse, FilterExpression,
+        HeartbeatResponse, Message, MessageQueue, QueryRouteResponse, ReceiveMessageResponse,
+        Resource, SendMessageResponse, Status, SystemProperties, TelemetryCommand,
     };
     use crate::session;
 
@@ -524,10 +639,13 @@
             id: Client::generate_client_id(),
             access_endpoints: Endpoints::from_url("http://localhost:8081").unwrap(),
             settings: TelemetryCommand::default(),
+            transaction_checker: None,
+            telemetry_command_tx: None,
         }
     }
 
     fn new_client_with_session_manager(session_manager: SessionManager) -> Client {
+        let (tx, _) = mpsc::channel(16);
         Client {
             logger: terminal_logger(),
             option: ClientOption::default(),
@@ -536,6 +654,8 @@
             id: Client::generate_client_id(),
             access_endpoints: Endpoints::from_url("http://localhost:8081").unwrap(),
             settings: TelemetryCommand::default(),
+            transaction_checker: None,
+            telemetry_command_tx: Some(tx),
         }
     }
 
@@ -565,9 +685,12 @@
         session_manager
             .expect_get_all_sessions()
             .returning(|| Ok(vec![]));
+        session_manager
+            .expect_get_or_create_session()
+            .returning(|_, _, _| Ok(Session::mock()));
 
-        let client = new_client_with_session_manager(session_manager);
-        client.start();
+        let mut client = new_client_with_session_manager(session_manager);
+        client.start().await;
 
         // TODO use countdown latch instead sleeping
         // wait for run
@@ -580,7 +703,7 @@
         let mut session_manager = SessionManager::default();
         session_manager
             .expect_get_or_create_session()
-            .returning(|_, _| Ok(Session::mock()));
+            .returning(|_, _, _| Ok(Session::mock()));
 
         let client = new_client_with_session_manager(session_manager);
         let result = client.get_session().await;
@@ -887,4 +1010,33 @@
         assert_eq!(error.message, "server return an error");
         assert_eq!(error.operation, "client.ack_message");
     }
+
+    #[tokio::test]
+    async fn client_handle_telemetry_command() {
+        let response = Ok(EndTransactionResponse {
+            status: Some(Status {
+                code: Code::Ok as i32,
+                message: "".to_string(),
+            }),
+        });
+        let mut mock = session::MockRPCClient::new();
+        mock.expect_end_transaction()
+            .return_once(|_| Box::pin(futures::future::ready(response)));
+        let result = Client::handle_telemetry_command(
+            mock,
+            &Some(Box::new(|_, _| TransactionResolution::COMMIT)),
+            Endpoints::from_url("localhopst:8081").unwrap(),
+            RecoverOrphanedTransactionCommand(pb::RecoverOrphanedTransactionCommand {
+                message: Some(Message {
+                    topic: Some(Resource::default()),
+                    user_properties: Default::default(),
+                    system_properties: Some(SystemProperties::default()),
+                    body: vec![],
+                }),
+                transaction_id: "".to_string(),
+            }),
+        )
+        .await;
+        assert!(result.is_ok())
+    }
 }
diff --git a/rust/src/conf.rs b/rust/src/conf.rs
index 95b7adc..ea27d27 100644
--- a/rust/src/conf.rs
+++ b/rust/src/conf.rs
@@ -46,7 +46,7 @@
             group: "".to_string(),
             namespace: "".to_string(),
             access_url: "localhost:8081".to_string(),
-            enable_tls: true,
+            enable_tls: false,
             timeout: Duration::from_secs(3),
             long_polling_timeout: Duration::from_secs(40),
             access_key: None,
@@ -266,7 +266,7 @@
     fn conf_client_option() {
         let option = ClientOption::default();
         assert_eq!(option.access_url(), "localhost:8081");
-        assert!(option.enable_tls());
+        assert!(!option.enable_tls());
         assert_eq!(option.timeout(), &Duration::from_secs(3));
         assert_eq!(option.long_polling_timeout(), &Duration::from_secs(40));
     }
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index bd646e7..875b359 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -48,7 +48,7 @@
 //!     client_option.set_access_url("localhost:8081");
 //!
 //!     // build and start producer
-//!     let producer = Producer::new(producer_option, client_option).unwrap();
+//!     let mut  producer = Producer::new(producer_option, client_option).unwrap();
 //!     producer.start().await.unwrap();
 //!
 //!     // build message
@@ -60,7 +60,7 @@
 //!         .unwrap();
 //!
 //!     // send message to rocketmq proxy
-//!     let result = producer.send_one(message).await;
+//!     let result = producer.send(message).await;
 //!     debug_assert!(result.is_ok(), "send message failed: {:?}", result);
 //!     println!(
 //!         "send message success, message_id={}",
@@ -88,7 +88,7 @@
 //!     client_option.set_access_url("localhost:8081");
 //!
 //!     // build and start simple consumer
-//!     let consumer = SimpleConsumer::new(consumer_option, client_option).unwrap();
+//!     let mut  consumer = SimpleConsumer::new(consumer_option, client_option).unwrap();
 //!     consumer.start().await.unwrap();
 //!
 //!     // pop message from rocketmq proxy
@@ -119,6 +119,10 @@
 //! ```
 //!
 
+// Export structs that are part of crate API.
+pub use producer::Producer;
+pub use simple_consumer::SimpleConsumer;
+
 #[allow(dead_code)]
 pub mod conf;
 pub mod error;
@@ -138,7 +142,3 @@
 
 mod producer;
 mod simple_consumer;
-
-// Export structs that are part of crate API.
-pub use producer::Producer;
-pub use simple_consumer::SimpleConsumer;
diff --git a/rust/src/model/message.rs b/rust/src/model/message.rs
index 79a0250..a2e9405 100644
--- a/rust/src/model/message.rs
+++ b/rust/src/model/message.rs
@@ -17,11 +17,12 @@
 
 //! Message data model of RocketMQ rust client.
 
+use std::collections::HashMap;
+
 use crate::error::{ClientError, ErrorKind};
 use crate::model::common::Endpoints;
 use crate::model::message_id::UNIQ_ID_GENERATOR;
 use crate::pb;
-use std::collections::HashMap;
 
 /// [`Message`] is the data model for sending.
 pub trait Message {
@@ -33,6 +34,7 @@
     fn take_properties(&mut self) -> HashMap<String, String>;
     fn take_message_group(&mut self) -> Option<String>;
     fn take_delivery_timestamp(&mut self) -> Option<i64>;
+    fn transaction_enabled(&mut self) -> bool;
 }
 
 pub(crate) struct MessageImpl {
@@ -44,6 +46,7 @@
     pub(crate) properties: Option<HashMap<String, String>>,
     pub(crate) message_group: Option<String>,
     pub(crate) delivery_timestamp: Option<i64>,
+    pub(crate) transaction_enabled: bool,
 }
 
 impl Message for MessageImpl {
@@ -78,6 +81,10 @@
     fn take_delivery_timestamp(&mut self) -> Option<i64> {
         self.delivery_timestamp.take()
     }
+
+    fn transaction_enabled(&mut self) -> bool {
+        self.transaction_enabled
+    }
 }
 
 /// [`MessageBuilder`] is the builder for [`Message`].
@@ -100,6 +107,7 @@
                 properties: None,
                 message_group: None,
                 delivery_timestamp: None,
+                transaction_enabled: false,
             },
         }
     }
@@ -126,6 +134,7 @@
                 properties: None,
                 message_group: Some(message_group.into()),
                 delivery_timestamp: None,
+                transaction_enabled: false,
             },
         }
     }
@@ -152,6 +161,29 @@
                 properties: None,
                 message_group: None,
                 delivery_timestamp: Some(delay_time),
+                transaction_enabled: false,
+            },
+        }
+    }
+
+    /// Create a new [`MessageBuilder`] for building a transaction message. [Read more](https://rocketmq.apache.org/docs/featureBehavior/04transactionmessage)
+    ///
+    /// # Arguments
+    ///
+    /// * `topic` - topic of the message
+    /// * `body` - message body
+    pub fn transaction_message_builder(topic: impl Into<String>, body: Vec<u8>) -> MessageBuilder {
+        MessageBuilder {
+            message: MessageImpl {
+                message_id: UNIQ_ID_GENERATOR.lock().next_id(),
+                topic: topic.into(),
+                body: Some(body),
+                tag: None,
+                keys: None,
+                properties: None,
+                message_group: None,
+                delivery_timestamp: None,
+                transaction_enabled: true,
             },
         }
     }
@@ -210,6 +242,14 @@
         self
     }
 
+    /// Mark this message as the beginning transaction, which is required for the transaction message. [Read more](https://rocketmq.apache.org/docs/featureBehavior/04transactionmessage)
+    ///
+    /// The transaction message could not have message group and delivery timestamp
+    pub fn enable_transaction(mut self) -> Self {
+        self.message.transaction_enabled = true;
+        self
+    }
+
     fn check_message(&self) -> Result<(), String> {
         if self.message.topic.is_empty() {
             return Err("Topic is empty.".to_string());
@@ -222,6 +262,14 @@
                 "message_group and delivery_timestamp can not be set at the same time.".to_string(),
             );
         }
+        if self.message.transaction_enabled
+            && (self.message.message_group.is_some() || self.message.delivery_timestamp.is_some())
+        {
+            return Err(
+                "message_group and delivery_timestamp can not be set for transaction message."
+                    .to_string(),
+            );
+        }
         Ok(())
     }
 
@@ -249,6 +297,7 @@
 pub struct MessageView {
     pub(crate) message_id: String,
     pub(crate) receipt_handle: Option<String>,
+    pub(crate) namespace: String,
     pub(crate) topic: String,
     pub(crate) body: Vec<u8>,
     pub(crate) tag: Option<String>,
@@ -283,10 +332,12 @@
 impl MessageView {
     pub(crate) fn from_pb_message(message: pb::Message, endpoints: Endpoints) -> Self {
         let system_properties = message.system_properties.unwrap();
+        let topic = message.topic.unwrap();
         MessageView {
             message_id: system_properties.message_id,
             receipt_handle: system_properties.receipt_handle,
-            topic: message.topic.unwrap().name,
+            namespace: topic.resource_namespace,
+            topic: topic.name,
             body: message.body,
             tag: system_properties.tag,
             keys: system_properties.keys,
@@ -305,7 +356,12 @@
         &self.message_id
     }
 
-    /// Get topic of message
+    /// Get topic namespace of message
+    pub fn namespace(&self) -> &str {
+        &self.namespace
+    }
+
+    /// Get topic name of message
     pub fn topic(&self) -> &str {
         &self.topic
     }
@@ -410,6 +466,10 @@
             MessageBuilder::delay_message_builder("test", vec![1, 2, 3], 123456789).build();
         let mut message = message.unwrap();
         assert_eq!(message.take_delivery_timestamp(), Some(123456789));
+
+        let message = MessageBuilder::transaction_message_builder("test", vec![1, 2, 3]).build();
+        let mut message = message.unwrap();
+        assert!(message.transaction_enabled());
     }
 
     #[test]
diff --git a/rust/src/model/mod.rs b/rust/src/model/mod.rs
index 8ac95e2..ca9a13c 100644
--- a/rust/src/model/mod.rs
+++ b/rust/src/model/mod.rs
@@ -20,3 +20,4 @@
 pub mod common;
 pub mod message;
 pub(crate) mod message_id;
+pub mod transaction;
diff --git a/rust/src/model/transaction.rs b/rust/src/model/transaction.rs
new file mode 100644
index 0000000..62a4c9a
--- /dev/null
+++ b/rust/src/model/transaction.rs
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//! Transaction data model of RocketMQ rust client.
+
+use std::fmt::{Debug, Formatter};
+
+use async_trait::async_trait;
+
+use crate::client::Client;
+use crate::error::ClientError;
+use crate::model::common::SendReceipt;
+use crate::model::message::MessageView;
+use crate::pb::{EndTransactionRequest, Resource, TransactionSource};
+use crate::session::RPCClient;
+
+/// An entity to describe an independent transaction.
+///
+/// Once the request of commit of roll-back reached server, subsequently arrived commit or roll-back request in
+/// [`Transaction`] would be ignored by the server.
+///
+/// If transaction is not commit/roll-back in time, it is suspended until it is solved by [`TransactionChecker`]
+/// or reach the end of life.
+#[async_trait]
+pub trait Transaction {
+    /// Try to commit the transaction, which would expose the message before the transaction is closed if no exception thrown.
+    /// What you should pay more attention to is that the commitment may be successful even exception is thrown.
+    async fn commit(self) -> Result<(), ClientError>;
+
+    /// Try to roll back the transaction, which would expose the message before the transaction is closed if no exception thrown.
+    /// What you should pay more attention to is that the roll-back may be successful even exception is thrown.
+    async fn rollback(self) -> Result<(), ClientError>;
+
+    /// Get message id
+    fn message_id(&self) -> &str;
+
+    /// Get transaction id
+    fn transaction_id(&self) -> &str;
+}
+
+pub(crate) struct TransactionImpl {
+    rpc_client: Box<dyn RPCClient + Send + Sync>,
+    topic: Resource,
+    send_receipt: SendReceipt,
+}
+
+impl Debug for TransactionImpl {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("TransactionImpl")
+            .field("transaction_id", &self.send_receipt.transaction_id())
+            .field("message_id", &self.send_receipt.message_id())
+            .finish()
+    }
+}
+
+impl TransactionImpl {
+    pub(crate) fn new(
+        rpc_client: Box<dyn RPCClient + Send + Sync>,
+        topic: Resource,
+        send_receipt: SendReceipt,
+    ) -> TransactionImpl {
+        TransactionImpl {
+            rpc_client,
+            topic,
+            send_receipt,
+        }
+    }
+
+    async fn end_transaction(
+        mut self,
+        resolution: TransactionResolution,
+    ) -> Result<(), ClientError> {
+        let response = self
+            .rpc_client
+            .end_transaction(EndTransactionRequest {
+                topic: Some(self.topic),
+                message_id: self.send_receipt.message_id().to_string(),
+                transaction_id: self.send_receipt.transaction_id().to_string(),
+                resolution: resolution as i32,
+                source: TransactionSource::SourceClient as i32,
+                trace_context: "".to_string(),
+            })
+            .await?;
+        Client::handle_response_status(response.status, "end transaction")
+    }
+}
+
+#[async_trait]
+impl Transaction for TransactionImpl {
+    async fn commit(mut self) -> Result<(), ClientError> {
+        return self.end_transaction(TransactionResolution::COMMIT).await;
+    }
+
+    async fn rollback(mut self) -> Result<(), ClientError> {
+        return self.end_transaction(TransactionResolution::ROLLBACK).await;
+    }
+
+    fn message_id(&self) -> &str {
+        self.send_receipt.message_id()
+    }
+
+    fn transaction_id(&self) -> &str {
+        self.send_receipt.transaction_id()
+    }
+}
+
+/// Resolution of Transaction.
+#[repr(i32)]
+pub enum TransactionResolution {
+    /// Notify server that current transaction should be committed.
+    COMMIT = 1,
+    /// Notify server that current transaction should be roll-backed.
+    ROLLBACK = 2,
+    /// Notify the server that the state of this transaction is not sure. You should be cautious before return unknown
+    /// because the examination from the server will be performed periodically.
+    UNKNOWN = 0,
+}
+
+/// A closure to check the state of transaction.
+pub type TransactionChecker = dyn Fn(String, MessageView) -> TransactionResolution + Send + Sync;
+
+#[cfg(test)]
+mod tests {
+    use crate::error::ClientError;
+    use crate::model::common::SendReceipt;
+    use crate::model::transaction::{Transaction, TransactionImpl};
+    use crate::pb::{Code, EndTransactionResponse, Resource, SendResultEntry, Status};
+    use crate::session;
+
+    #[tokio::test]
+    async fn transaction_commit() -> Result<(), ClientError> {
+        let mut mock = session::MockRPCClient::new();
+        mock.expect_end_transaction().return_once(|_| {
+            Box::pin(futures::future::ready(Ok(EndTransactionResponse {
+                status: Some(Status {
+                    code: Code::Ok as i32,
+                    message: "".to_string(),
+                }),
+            })))
+        });
+        let transaction = TransactionImpl::new(
+            Box::new(mock),
+            Resource {
+                resource_namespace: "".to_string(),
+                name: "".to_string(),
+            },
+            SendReceipt::from_pb_send_result(&SendResultEntry {
+                status: None,
+                message_id: "".to_string(),
+                transaction_id: "".to_string(),
+                offset: 0,
+            }),
+        );
+        transaction.commit().await
+    }
+
+    #[tokio::test]
+    async fn transaction_rollback() -> Result<(), ClientError> {
+        let mut mock = session::MockRPCClient::new();
+        mock.expect_end_transaction().return_once(|_| {
+            Box::pin(futures::future::ready(Ok(EndTransactionResponse {
+                status: Some(Status {
+                    code: Code::Ok as i32,
+                    message: "".to_string(),
+                }),
+            })))
+        });
+        let transaction = TransactionImpl::new(
+            Box::new(mock),
+            Resource {
+                resource_namespace: "".to_string(),
+                name: "".to_string(),
+            },
+            SendReceipt::from_pb_send_result(&SendResultEntry {
+                status: None,
+                message_id: "".to_string(),
+                transaction_id: "".to_string(),
+                offset: 0,
+            }),
+        );
+        transaction.rollback().await
+    }
+}
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index be16c2c..fd82cdc 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -27,7 +27,8 @@
 use crate::error::{ClientError, ErrorKind};
 use crate::model::common::{ClientType, SendReceipt};
 use crate::model::message;
-use crate::pb::{Encoding, Resource, SystemProperties};
+use crate::model::transaction::{Transaction, TransactionChecker, TransactionImpl};
+use crate::pb::{Encoding, MessageType, Resource, SystemProperties};
 use crate::util::{
     build_endpoints_by_message_queue, build_producer_settings, select_message_queue,
     select_message_queue_by_message_group, HOST_NAME,
@@ -72,14 +73,42 @@
         })
     }
 
+    /// Create a new transaction producer instance
+    ///
+    /// # Arguments
+    ///
+    /// * `option` - producer option
+    /// * `client_option` - client option
+    /// * `transaction_checker` - A closure to check the state of transaction.
+    pub fn new_transaction_producer(
+        option: ProducerOption,
+        client_option: ClientOption,
+        transaction_checker: Box<TransactionChecker>,
+    ) -> Result<Self, ClientError> {
+        let client_option = ClientOption {
+            client_type: ClientType::Producer,
+            namespace: option.namespace().to_string(),
+            ..client_option
+        };
+        let logger = log::logger(option.logging_format());
+        let settings = build_producer_settings(&option, &client_option);
+        let mut client = Client::new(&logger, client_option, settings)?;
+        client.set_transaction_checker(transaction_checker);
+        Ok(Producer {
+            option,
+            logger,
+            client,
+        })
+    }
+
     /// Start the producer
-    pub async fn start(&self) -> Result<(), ClientError> {
+    pub async fn start(&mut self) -> Result<(), ClientError> {
+        self.client.start().await;
         if let Some(topics) = self.option.topics() {
             for topic in topics {
                 self.client.topic_route(topic, true).await?;
             }
         }
-        self.client.start();
         info!(
             self.logger,
             "start producer success, client_id: {}",
@@ -125,7 +154,7 @@
                 last_topic = Some(message.take_topic());
             }
 
-            let message_group = message.take_message_group();
+            let mut message_group = message.take_message_group();
             if let Some(last_message_group) = last_message_group.clone() {
                 if last_message_group.ne(&message_group) {
                     return Err(ClientError::new(
@@ -138,10 +167,24 @@
                 last_message_group = Some(message_group.clone());
             }
 
-            let delivery_timestamp = message
+            let mut delivery_timestamp = message
                 .take_delivery_timestamp()
                 .map(|seconds| Timestamp { seconds, nanos: 0 });
 
+            let message_type = if message.transaction_enabled() {
+                message_group = None;
+                delivery_timestamp = None;
+                MessageType::Transaction as i32
+            } else if delivery_timestamp.is_some() {
+                message_group = None;
+                MessageType::Delay as i32
+            } else if message_group.is_some() {
+                delivery_timestamp = None;
+                MessageType::Fifo as i32
+            } else {
+                MessageType::Normal as i32
+            };
+
             let pb_message = pb::Message {
                 topic: Some(Resource {
                     name: message.take_topic(),
@@ -154,6 +197,7 @@
                     message_id: message.take_message_id(),
                     message_group,
                     delivery_timestamp,
+                    message_type,
                     born_host: HOST_NAME.clone(),
                     born_timestamp: born_timestamp.clone(),
                     body_digest: None,
@@ -182,11 +226,8 @@
     /// # Arguments
     ///
     /// * `message` - the message to send
-    pub async fn send_one(
-        &self,
-        message: impl message::Message,
-    ) -> Result<SendReceipt, ClientError> {
-        let results = self.send(vec![message]).await?;
+    pub async fn send(&self, message: impl message::Message) -> Result<SendReceipt, ClientError> {
+        let results = self.batch_send(vec![message]).await?;
         Ok(results[0].clone())
     }
 
@@ -195,7 +236,7 @@
     /// # Arguments
     ///
     /// * `messages` - A vector that holds the messages to send
-    pub async fn send(
+    pub async fn batch_send(
         &self,
         messages: Vec<impl message::Message>,
     ) -> Result<Vec<SendReceipt>, ClientError> {
@@ -218,6 +259,23 @@
 
         self.client.send_message(&endpoints, pb_messages).await
     }
+
+    /// Send message in a transaction
+    pub async fn send_transaction_message(
+        &self,
+        mut message: impl message::Message,
+    ) -> Result<impl Transaction, ClientError> {
+        let topic = message.take_topic();
+        let receipt = self.send(message).await?;
+        Ok(TransactionImpl::new(
+            Box::new(self.client.get_session().await.unwrap()),
+            Resource {
+                resource_namespace: self.option.namespace().to_string(),
+                name: topic,
+            },
+            receipt,
+        ))
+    }
 }
 
 #[cfg(test)]
@@ -228,7 +286,9 @@
     use crate::log::terminal_logger;
     use crate::model::common::Route;
     use crate::model::message::{MessageBuilder, MessageImpl};
+    use crate::model::transaction::TransactionResolution;
     use crate::pb::{Broker, MessageQueue};
+    use crate::session::Session;
 
     use super::*;
 
@@ -268,6 +328,38 @@
     }
 
     #[tokio::test]
+    async fn transaction_producer_start() -> Result<(), ClientError> {
+        let _m = crate::client::tests::MTX.lock();
+
+        let ctx = Client::new_context();
+        ctx.expect().return_once(|_, _, _| {
+            let mut client = Client::default();
+            client.expect_topic_route().returning(|_, _| {
+                Ok(Arc::new(Route {
+                    index: Default::default(),
+                    queue: vec![],
+                }))
+            });
+            client.expect_start().returning(|| ());
+            client.expect_set_transaction_checker().returning(|_| ());
+            client
+                .expect_client_id()
+                .return_const("fake_id".to_string());
+            Ok(client)
+        });
+        let mut producer_option = ProducerOption::default();
+        producer_option.set_topics(vec!["DefaultCluster".to_string()]);
+        Producer::new_transaction_producer(
+            producer_option,
+            ClientOption::default(),
+            Box::new(|_, _| TransactionResolution::COMMIT),
+        )?
+        .start()
+        .await?;
+        Ok(())
+    }
+
+    #[tokio::test]
     async fn producer_transform_messages_to_protobuf() {
         let producer = new_producer_for_test();
         let messages = vec![MessageBuilder::builder()
@@ -318,6 +410,7 @@
             properties: None,
             message_group: None,
             delivery_timestamp: None,
+            transaction_enabled: false,
         }];
         let result = producer.transform_messages_to_protobuf(messages);
         assert!(result.is_err());
@@ -365,7 +458,7 @@
     }
 
     #[tokio::test]
-    async fn producer_send_one() -> Result<(), ClientError> {
+    async fn producer_send() -> Result<(), ClientError> {
         let mut producer = new_producer_for_test();
         producer.client.expect_topic_route().returning(|_, _| {
             Ok(Arc::new(Route {
@@ -399,7 +492,7 @@
             )])
         });
         let result = producer
-            .send_one(
+            .send(
                 MessageBuilder::builder()
                     .set_topic("test_topic")
                     .set_body(vec![])
@@ -411,4 +504,55 @@
         assert_eq!(result.transaction_id(), "transaction_id");
         Ok(())
     }
+
+    #[tokio::test]
+    async fn producer_send_transaction_message() -> Result<(), ClientError> {
+        let mut producer = new_producer_for_test();
+        producer.client.expect_topic_route().returning(|_, _| {
+            Ok(Arc::new(Route {
+                index: Default::default(),
+                queue: vec![MessageQueue {
+                    topic: Some(Resource {
+                        name: "test_topic".to_string(),
+                        resource_namespace: "".to_string(),
+                    }),
+                    id: 0,
+                    permission: 0,
+                    broker: Some(Broker {
+                        name: "".to_string(),
+                        id: 0,
+                        endpoints: Some(pb::Endpoints {
+                            scheme: 0,
+                            addresses: vec![],
+                        }),
+                    }),
+                    accept_message_types: vec![],
+                }],
+            }))
+        });
+        producer.client.expect_send_message().returning(|_, _| {
+            Ok(vec![SendReceipt::from_pb_send_result(
+                &pb::SendResultEntry {
+                    message_id: "message_id".to_string(),
+                    transaction_id: "transaction_id".to_string(),
+                    ..pb::SendResultEntry::default()
+                },
+            )])
+        });
+        producer
+            .client
+            .expect_get_session()
+            .return_once(|| Ok(Session::mock()));
+
+        let _ = producer
+            .send_transaction_message(
+                MessageBuilder::builder()
+                    .set_topic("test_topic")
+                    .set_body(vec![])
+                    .build()
+                    .unwrap(),
+            )
+            .await?;
+        Ok(())
+    }
 }
diff --git a/rust/src/session.rs b/rust/src/session.rs
index 229d9e5..5762d66 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -31,26 +31,29 @@
 use crate::conf::ClientOption;
 use crate::error::ErrorKind;
 use crate::model::common::Endpoints;
+use crate::pb::telemetry_command::Command;
 use crate::pb::{
-    AckMessageRequest, AckMessageResponse, HeartbeatRequest, HeartbeatResponse, QueryRouteRequest,
-    QueryRouteResponse, ReceiveMessageRequest, ReceiveMessageResponse, SendMessageRequest,
-    SendMessageResponse, TelemetryCommand,
+    AckMessageRequest, AckMessageResponse, EndTransactionRequest, EndTransactionResponse,
+    HeartbeatRequest, HeartbeatResponse, QueryRouteRequest, QueryRouteResponse,
+    ReceiveMessageRequest, ReceiveMessageResponse, SendMessageRequest, SendMessageResponse,
+    TelemetryCommand,
 };
 use crate::util::{PROTOCOL_VERSION, SDK_LANGUAGE, SDK_VERSION};
 use crate::{error::ClientError, pb::messaging_service_client::MessagingServiceClient};
 
+const OPERATION_START: &str = "session.start";
+const OPERATION_UPDATE_SETTINGS: &str = "session.update_settings";
+
+const OPERATION_QUERY_ROUTE: &str = "rpc.query_route";
+const OPERATION_HEARTBEAT: &str = "rpc.heartbeat";
+const OPERATION_SEND_MESSAGE: &str = "rpc.send_message";
+const OPERATION_RECEIVE_MESSAGE: &str = "rpc.receive_message";
+const OPERATION_ACK_MESSAGE: &str = "rpc.ack_message";
+const OPERATION_END_TRANSACTION: &str = "rpc.end_transaction";
+
 #[async_trait]
 #[automock]
 pub(crate) trait RPCClient {
-    const OPERATION_START: &'static str = "session.start";
-    const OPERATION_UPDATE_SETTINGS: &'static str = "session.update_settings";
-
-    const OPERATION_QUERY_ROUTE: &'static str = "rpc.query_route";
-    const OPERATION_HEARTBEAT: &'static str = "rpc.heartbeat";
-    const OPERATION_SEND_MESSAGE: &'static str = "rpc.send_message";
-    const OPERATION_RECEIVE_MESSAGE: &'static str = "rpc.receive_message";
-    const OPERATION_ACK_MESSAGE: &'static str = "rpc.ack_message";
-
     async fn query_route(
         &mut self,
         request: QueryRouteRequest,
@@ -71,6 +74,10 @@
         &mut self,
         request: AckMessageRequest,
     ) -> Result<AckMessageResponse, ClientError>;
+    async fn end_transaction(
+        &mut self,
+        request: EndTransactionRequest,
+    ) -> Result<EndTransactionResponse, ClientError>;
 }
 
 #[allow(dead_code)]
@@ -255,13 +262,17 @@
         }
     }
 
-    pub(crate) async fn start(&mut self, settings: TelemetryCommand) -> Result<(), ClientError> {
+    pub(crate) async fn start(
+        &mut self,
+        settings: TelemetryCommand,
+        telemetry_command_tx: mpsc::Sender<Command>,
+    ) -> Result<(), ClientError> {
         let (tx, rx) = mpsc::channel(16);
         tx.send(settings).await.map_err(|e| {
             ClientError::new(
                 ErrorKind::ChannelSend,
                 "failed to send telemetry command",
-                Self::OPERATION_START,
+                OPERATION_START,
             )
             .set_source(e)
         })?;
@@ -272,7 +283,7 @@
             ClientError::new(
                 ErrorKind::ClientInternal,
                 "send rpc telemetry failed",
-                Self::OPERATION_START,
+                OPERATION_START,
             )
             .set_source(e)
         })?;
@@ -281,10 +292,12 @@
         tokio::spawn(async move {
             let mut stream = response.into_inner();
             loop {
-                // TODO handle server stream
                 match stream.message().await {
                     Ok(Some(item)) => {
-                        debug!(logger, "telemetry command: {:?}", item);
+                        debug!(logger, "receive telemetry command: {:?}", item);
+                        if let Some(command) = item.command {
+                            _ = telemetry_command_tx.send(command).await;
+                        }
                     }
                     Ok(None) => {
                         debug!(logger, "request stream closed");
@@ -315,8 +328,9 @@
             return Err(ClientError::new(
                 ErrorKind::ClientIsNotRunning,
                 "session is not started",
-                Self::OPERATION_UPDATE_SETTINGS,
-            ));
+                OPERATION_UPDATE_SETTINGS,
+            )
+            .with_context("url", self.endpoints.endpoint_url()));
         }
 
         if let Some(tx) = self.telemetry_tx.as_ref() {
@@ -324,7 +338,7 @@
                 ClientError::new(
                     ErrorKind::ChannelSend,
                     "failed to send telemetry command",
-                    Self::OPERATION_UPDATE_SETTINGS,
+                    OPERATION_UPDATE_SETTINGS,
                 )
                 .set_source(e)
             })?;
@@ -345,7 +359,7 @@
             ClientError::new(
                 ErrorKind::ClientInternal,
                 "send rpc query_route failed",
-                Self::OPERATION_QUERY_ROUTE,
+                OPERATION_QUERY_ROUTE,
             )
             .set_source(e)
         })?;
@@ -361,7 +375,7 @@
             ClientError::new(
                 ErrorKind::ClientInternal,
                 "send rpc heartbeat failed",
-                Self::OPERATION_HEARTBEAT,
+                OPERATION_HEARTBEAT,
             )
             .set_source(e)
         })?;
@@ -377,7 +391,7 @@
             ClientError::new(
                 ErrorKind::ClientInternal,
                 "send rpc send_message failed",
-                Self::OPERATION_SEND_MESSAGE,
+                OPERATION_SEND_MESSAGE,
             )
             .set_source(e)
         })?;
@@ -399,7 +413,7 @@
                 ClientError::new(
                     ErrorKind::ClientInternal,
                     "send rpc receive_message failed",
-                    Self::OPERATION_RECEIVE_MESSAGE,
+                    OPERATION_RECEIVE_MESSAGE,
                 )
                 .set_source(e)
             })?
@@ -411,7 +425,7 @@
                 ClientError::new(
                     ErrorKind::ClientInternal,
                     "receive message failed: error in reading stream",
-                    Self::OPERATION_RECEIVE_MESSAGE,
+                    OPERATION_RECEIVE_MESSAGE,
                 )
                 .set_source(e)
             })?;
@@ -429,7 +443,23 @@
             ClientError::new(
                 ErrorKind::ClientInternal,
                 "send rpc ack_message failed",
-                Self::OPERATION_ACK_MESSAGE,
+                OPERATION_ACK_MESSAGE,
+            )
+            .set_source(e)
+        })?;
+        Ok(response.into_inner())
+    }
+
+    async fn end_transaction(
+        &mut self,
+        request: EndTransactionRequest,
+    ) -> Result<EndTransactionResponse, ClientError> {
+        let request = self.sign(request);
+        let response = self.stub.end_transaction(request).await.map_err(|e| {
+            ClientError::new(
+                ErrorKind::ClientInternal,
+                "send rpc end_transaction failed",
+                OPERATION_END_TRANSACTION,
             )
             .set_source(e)
         })?;
@@ -462,6 +492,7 @@
         &self,
         endpoints: &Endpoints,
         settings: TelemetryCommand,
+        telemetry_command_tx: mpsc::Sender<Command>,
     ) -> Result<Session, ClientError> {
         let mut session_map = self.session_map.lock().await;
         let endpoint_url = endpoints.endpoint_url().to_string();
@@ -475,7 +506,7 @@
                 &self.option,
             )
             .await?;
-            session.start(settings).await?;
+            session.start(settings, telemetry_command_tx).await?;
             session_map.insert(endpoint_url.clone(), session.clone());
             Ok(session)
         };
@@ -562,11 +593,12 @@
 
         let mut session = session.unwrap();
 
+        let (tx, _) = mpsc::channel(16);
         let result = session
-            .start(build_producer_settings(
-                &ProducerOption::default(),
-                &ClientOption::default(),
-            ))
+            .start(
+                build_producer_settings(&ProducerOption::default(), &ClientOption::default()),
+                tx,
+            )
             .await;
         assert!(result.is_ok());
         assert!(session.is_started());
@@ -577,7 +609,6 @@
         let mut server = RocketMQMockServer::start_default().await;
         server.setup(
             MockBuilder::when()
-                //    👇 RPC prefix
                 .path("/apache.rocketmq.v2.MessagingService/Telemetry")
                 .then()
                 .return_status(Code::Ok),
@@ -588,10 +619,12 @@
         client_option.set_enable_tls(false);
         let session_manager =
             SessionManager::new(&logger, "test_client".to_string(), &client_option);
+        let (tx, _) = mpsc::channel(16);
         let session = session_manager
             .get_or_create_session(
                 &Endpoints::from_url(&format!("localhost:{}", server.address().port())).unwrap(),
                 build_producer_settings(&ProducerOption::default(), &client_option),
+                tx,
             )
             .await
             .unwrap();
diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs
index 28e9d66..48e0ce9 100644
--- a/rust/src/simple_consumer.rs
+++ b/rust/src/simple_consumer.rs
@@ -82,7 +82,7 @@
     }
 
     /// Start the simple consumer
-    pub async fn start(&self) -> Result<(), ClientError> {
+    pub async fn start(&mut self) -> Result<(), ClientError> {
         if self.option.consumer_group().is_empty() {
             return Err(ClientError::new(
                 ErrorKind::Config,
@@ -90,12 +90,12 @@
                 Self::OPERATION_START_SIMPLE_CONSUMER,
             ));
         }
+        self.client.start().await;
         if let Some(topics) = self.option.topics() {
             for topic in topics {
                 self.client.topic_route(topic, true).await?;
             }
         }
-        self.client.start();
         info!(
             self.logger,
             "start simple consumer success, client_id: {}",
@@ -175,9 +175,9 @@
 
 #[cfg(test)]
 mod tests {
-    use crate::log::terminal_logger;
     use std::sync::Arc;
 
+    use crate::log::terminal_logger;
     use crate::model::common::{FilterType, Route};
     use crate::pb::{
         AckMessageResultEntry, Broker, Message, MessageQueue, Resource, SystemProperties,