[ISSUE #563] Optimize example and doc for Rust SDK (#564)

* feat(rust): optimize for example and doc

Signed-off-by: SSpirits <admin@lv5.moe>

* feat(rust): optimize readme

Signed-off-by: SSpirits <admin@lv5.moe>

* fix(rust): fix unit test

Signed-off-by: SSpirits <admin@lv5.moe>

---------

Signed-off-by: SSpirits <admin@lv5.moe>
diff --git a/rust/README.md b/rust/README.md
index 8e5c1b7..dc80d13 100644
--- a/rust/README.md
+++ b/rust/README.md
@@ -42,5 +42,5 @@
 [codecov-url]: https://app.codecov.io/gh/apache/rocketmq-clients
 [crates-image]: https://img.shields.io/crates/v/rocketmq.svg
 [crates-url]: https://crates.io/crates/rocketmq
-[rust-doc-image]: https://img.shields.io/crates/v/rocketmq.svg
+[rust-doc-image]: https://img.shields.io/docsrs/rocketmq
 [rust-doc-url]: https://docs.rs/rocketmq
diff --git a/rust/examples/transaction_producer.rs b/rust/examples/transaction_producer.rs
index 07516eb..7423cc9 100644
--- a/rust/examples/transaction_producer.rs
+++ b/rust/examples/transaction_producer.rs
@@ -14,11 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+use std::collections::HashSet;
+use std::sync::Mutex;
+
 use rocketmq::conf::{ClientOption, ProducerOption};
 use rocketmq::model::message::MessageBuilder;
 use rocketmq::model::transaction::{Transaction, TransactionResolution};
 use rocketmq::Producer;
 
+lazy_static::lazy_static! {
+    static  ref MESSAGE_ID_SET: Mutex<HashSet<String>> = Mutex::new(HashSet::new());
+}
+
 #[tokio::main]
 async fn main() {
     // recommend to specify which topic(s) you would like to send message to
@@ -30,16 +37,28 @@
     let mut client_option = ClientOption::default();
     client_option.set_access_url("localhost:8081");
 
-    // build and start producer
+    // build and start transaction producer, which has TransactionChecker
     let mut producer = Producer::new_transaction_producer(
         producer_option,
         client_option,
         Box::new(|transaction_id, message| {
-            println!(
-                "receive transaction check request: transaction_id: {}, message: {:?}",
-                transaction_id, message
-            );
-            TransactionResolution::COMMIT
+            if MESSAGE_ID_SET
+                .lock()
+                .unwrap()
+                .contains(message.message_id())
+            {
+                println!(
+                    "commit transaction: transaction_id: {}, message_id: {}",
+                    transaction_id, message.message_id()
+                );
+                TransactionResolution::COMMIT
+            } else {
+                println!(
+                    "rollback transaction due to unknown message: transaction_id: {}, message_id: {}",
+                    transaction_id, message.message_id()
+                );
+                TransactionResolution::ROLLBACK
+            }
         }),
     )
     .unwrap();
@@ -65,6 +84,14 @@
         transaction.message_id(),
         transaction.transaction_id()
     );
+
+    MESSAGE_ID_SET
+        .lock()
+        .unwrap()
+        .insert(transaction.message_id().to_string());
+
+    // commit transaction manually
+    // delete following two lines so that RocketMQ server will check transaction status periodically
     let result = transaction.commit().await;
     debug_assert!(result.is_ok(), "commit transaction failed: {:?}", result);
 }
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 27cfb4d..57bddb8 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -31,7 +31,7 @@
 use crate::error::{ClientError, ErrorKind};
 use crate::model::common::{ClientType, Endpoints, Route, RouteStatus, SendReceipt};
 use crate::model::message::{AckMessageEntry, MessageView};
-use crate::model::transaction::TransactionChecker;
+use crate::model::transaction::{TransactionChecker, TransactionResolution};
 use crate::pb;
 use crate::pb::receive_message_response::Content;
 use crate::pb::telemetry_command::Command::RecoverOrphanedTransactionCommand;
@@ -109,6 +109,10 @@
         self.telemetry_command_tx.is_some()
     }
 
+    pub(crate) fn has_transaction_checker(&self) -> bool {
+        self.transaction_checker.is_some()
+    }
+
     pub(crate) fn set_transaction_checker(&mut self, transaction_checker: Box<TransactionChecker>) {
         if self.is_started() {
             panic!("client {} is started, can not be modified", self.id)
@@ -116,7 +120,7 @@
         self.transaction_checker = Some(transaction_checker);
     }
 
-    pub(crate) async fn start(&mut self) {
+    pub(crate) async fn start(&mut self) -> Result<(), ClientError> {
         let logger = self.logger.clone();
         let session_manager = self.session_manager.clone();
 
@@ -127,9 +131,13 @@
         // send heartbeat and handle telemetry command
         let (tx, mut rx) = mpsc::channel(16);
         self.telemetry_command_tx = Some(tx);
-        let rpc_client = self.get_session().await.unwrap();
+        let rpc_client = self.get_session().await?;
         let endpoints = self.access_endpoints.clone();
         let transaction_checker = self.transaction_checker.take();
+        // give a placeholder
+        if transaction_checker.is_some() {
+            self.transaction_checker = Some(Box::new(|_, _| TransactionResolution::UNKNOWN));
+        }
         tokio::spawn(async move {
             rpc_client.is_started();
             let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
@@ -181,6 +189,7 @@
                 }
             }
         });
+        Ok(())
     }
 
     async fn handle_telemetry_command<T: RPCClient + 'static>(
@@ -690,7 +699,7 @@
             .returning(|_, _, _| Ok(Session::mock()));
 
         let mut client = new_client_with_session_manager(session_manager);
-        client.start().await;
+        client.start().await?;
 
         // TODO use countdown latch instead sleeping
         // wait for run
diff --git a/rust/src/model/transaction.rs b/rust/src/model/transaction.rs
index 62a4c9a..2f74679 100644
--- a/rust/src/model/transaction.rs
+++ b/rust/src/model/transaction.rs
@@ -125,12 +125,18 @@
     COMMIT = 1,
     /// Notify server that current transaction should be roll-backed.
     ROLLBACK = 2,
-    /// Notify the server that the state of this transaction is not sure. You should be cautious before return unknown
+    /// Notify server that the state of this transaction is not sure. You should be cautious before return unknown
     /// because the examination from the server will be performed periodically.
     UNKNOWN = 0,
 }
 
 /// A closure to check the state of transaction.
+/// RocketMQ Server will call producer periodically to check the state of uncommitted transaction.
+///
+/// # Arguments
+///
+/// * transaction id
+/// * message
 pub type TransactionChecker = dyn Fn(String, MessageView) -> TransactionResolution + Send + Sync;
 
 #[cfg(test)]
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index fd82cdc..d5e768c 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -50,6 +50,7 @@
 
 impl Producer {
     const OPERATION_SEND_MESSAGE: &'static str = "producer.send_message";
+    const OPERATION_SEND_TRANSACTION_MESSAGE: &'static str = "producer.send_transaction_message";
 
     /// Create a new producer instance
     ///
@@ -79,7 +80,7 @@
     ///
     /// * `option` - producer option
     /// * `client_option` - client option
-    /// * `transaction_checker` - A closure to check the state of transaction.
+    /// * `transaction_checker` - handle server query for uncommitted transaction status
     pub fn new_transaction_producer(
         option: ProducerOption,
         client_option: ClientOption,
@@ -103,7 +104,7 @@
 
     /// Start the producer
     pub async fn start(&mut self) -> Result<(), ClientError> {
-        self.client.start().await;
+        self.client.start().await?;
         if let Some(topics) = self.option.topics() {
             for topic in topics {
                 self.client.topic_route(topic, true).await?;
@@ -265,6 +266,13 @@
         &self,
         mut message: impl message::Message,
     ) -> Result<impl Transaction, ClientError> {
+        if !self.client.has_transaction_checker() {
+            return Err(ClientError::new(
+                ErrorKind::InvalidMessage,
+                "this producer can not send transaction message, please create a transaction producer using producer::new_transaction_producer",
+                Self::OPERATION_SEND_TRANSACTION_MESSAGE,
+            ));
+        }
         let topic = message.take_topic();
         let receipt = self.send(message).await?;
         Ok(TransactionImpl::new(
@@ -313,7 +321,7 @@
                     queue: vec![],
                 }))
             });
-            client.expect_start().returning(|| ());
+            client.expect_start().returning(|| Ok(()));
             client
                 .expect_client_id()
                 .return_const("fake_id".to_string());
@@ -340,7 +348,7 @@
                     queue: vec![],
                 }))
             });
-            client.expect_start().returning(|| ());
+            client.expect_start().returning(|| Ok(()));
             client.expect_set_transaction_checker().returning(|_| ());
             client
                 .expect_client_id()
@@ -543,6 +551,10 @@
             .client
             .expect_get_session()
             .return_once(|| Ok(Session::mock()));
+        producer
+            .client
+            .expect_has_transaction_checker()
+            .return_once(|| true);
 
         let _ = producer
             .send_transaction_message(
diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs
index 48e0ce9..d4e222e 100644
--- a/rust/src/simple_consumer.rs
+++ b/rust/src/simple_consumer.rs
@@ -90,7 +90,7 @@
                 Self::OPERATION_START_SIMPLE_CONSUMER,
             ));
         }
-        self.client.start().await;
+        self.client.start().await?;
         if let Some(topics) = self.option.topics() {
             for topic in topics {
                 self.client.topic_route(topic, true).await?;
@@ -198,7 +198,7 @@
                     queue: vec![],
                 }))
             });
-            client.expect_start().returning(|| ());
+            client.expect_start().returning(|| Ok(()));
             client
                 .expect_client_id()
                 .return_const("fake_id".to_string());