Merge pull request #674 from apache/java_release_5.0.6

Java release 5.0.6
diff --git a/.asf.yaml b/.asf.yaml
index 4f5661e..1401283 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -33,7 +33,7 @@
     # Enable squash button
     squash: true
-    # Disable merge button
+    # Enable merge button
-    merge: false
+    merge: true
     # Disable rebase button
     rebase: false
   protected_branches:
diff --git a/golang/client.go b/golang/client.go
index 45e4b54..71c4819 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -24,7 +24,6 @@
 	"encoding/hex"
 	"errors"
 	"fmt"
-	"reflect"
 	"sync"
 	"time"
 
@@ -32,6 +31,7 @@
 	"github.com/apache/rocketmq-clients/golang/v5/pkg/ticker"
 	"github.com/apache/rocketmq-clients/golang/v5/pkg/utils"
 	v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
+	"github.com/golang/protobuf/proto"
 	"github.com/google/uuid"
 	"go.uber.org/atomic"
 	"go.uber.org/zap"
@@ -504,27 +504,40 @@
 	f := func() {
 		cli.router.Range(func(k, v interface{}) bool {
 			topic := k.(string)
-			oldRoute := v
 			newRoute, err := cli.queryRoute(context.TODO(), topic, cli.opts.timeout)
 			if err != nil {
 				cli.log.Errorf("scheduled queryRoute err=%v", err)
 			}
-			if newRoute == nil && oldRoute != nil {
+			if newRoute == nil && v != nil {
 				cli.log.Info("newRoute is nil, but oldRoute is not. do not update")
 				return true
 			}
-			if !reflect.DeepEqual(newRoute, oldRoute) {
+			var oldRoute []*v2.MessageQueue
+			if v != nil {
+				oldRoute = v.([]*v2.MessageQueue)
+			}
+			if !routeEqual(oldRoute, newRoute) {
 				cli.router.Store(k, newRoute)
 				switch impl := cli.clientImpl.(type) {
 				case *defaultProducer:
-					plb, err := NewPublishingLoadBalancer(newRoute)
-					if err == nil {
-						impl.publishingRouteDataResultCache.Store(topic, plb)
+					existing, ok := impl.publishingRouteDataResultCache.Load(topic)
+					if !ok {
+						plb, err := NewPublishingLoadBalancer(newRoute)
+						if err == nil {
+							impl.publishingRouteDataResultCache.Store(topic, plb)
+						}
+					} else {
+						impl.publishingRouteDataResultCache.Store(topic, existing.(PublishingLoadBalancer).CopyAndUpdate(newRoute))
 					}
 				case *defaultSimpleConsumer:
-					slb, err := NewSubscriptionLoadBalancer(newRoute)
-					if err == nil {
-						impl.subTopicRouteDataResultCache.Store(topic, slb)
+					existing, ok := impl.subTopicRouteDataResultCache.Load(topic)
+					if !ok {
+						slb, err := NewSubscriptionLoadBalancer(newRoute)
+						if err == nil {
+							impl.subTopicRouteDataResultCache.Store(topic, slb)
+						}
+					} else {
+						impl.subTopicRouteDataResultCache.Store(topic, existing.(SubscriptionLoadBalancer).CopyAndUpdate(newRoute))
 					}
 				}
 			}
@@ -534,6 +547,19 @@
 	ticker.Tick(f, time.Second*30, cli.done)
 	return nil
 }
+
+// routeEqual reports whether oldRoute and newRoute hold the same queues, compared
+// pairwise and in order with proto.Equal; nil and empty routes compare equal.
+// The parameters are not named old/new so the builtin new is not shadowed.
+func routeEqual(oldRoute, newRoute []*v2.MessageQueue) bool {
+	same := len(oldRoute) == len(newRoute)
+	// Stop at the first mismatch; equal lengths keep newRoute[i] in range.
+	for i := 0; same && i < len(oldRoute); i++ {
+		same = proto.Equal(oldRoute[i], newRoute[i])
+	}
+	return same
+}
+
 func (cli *defaultClient) notifyClientTermination() {
 	cli.log.Info("start notifyClientTermination")
 	ctx := cli.Sign(context.Background())
diff --git a/golang/client_test.go b/golang/client_test.go
index b46d2d7..4549bdf 100644
--- a/golang/client_test.go
+++ b/golang/client_test.go
@@ -20,6 +20,7 @@
 import (
 	"context"
 	"fmt"
+	"reflect"
 	"testing"
 	"time"
 
@@ -293,3 +294,48 @@
 	assert.Equal(t, "Encountered error while receiving TelemetryCommand, trying to recover", commandExecutionLog[0].Message)
 	assert.Equal(t, "Failed to recover, err=EOF", commandExecutionLog[1].Message)
 }
+
+// Test_routeEqual checks that routeEqual treats two separately built but
+// identically populated routes as equal, and covers nil and empty routes.
+func Test_routeEqual(t *testing.T) {
+	// buildQueue returns a fresh, identically populated queue on every call.
+	buildQueue := func() *v2.MessageQueue {
+		return &v2.MessageQueue{
+			Topic:      &v2.Resource{Name: "topic-test"},
+			Id:         0,
+			Permission: v2.Permission_READ_WRITE,
+			Broker: &v2.Broker{
+				Name:      "broker-test",
+				Id:        0,
+				Endpoints: fakeEndpoints(),
+			},
+			AcceptMessageTypes: []v2.MessageType{v2.MessageType_NORMAL},
+		}
+	}
+	oldMq, newMq := buildQueue(), buildQueue()
+
+	// ProtoReflect changes an internal field value of newMq only, so the two
+	// queues stop being reflect.DeepEqual (asserted below).
+	newMq.ProtoReflect()
+
+	oldRoute := []*v2.MessageQueue{oldMq}
+	newRoute := []*v2.MessageQueue{newMq}
+
+	// reflect.DeepEqual reports a difference for these routes...
+	assert.Equal(t, false, reflect.DeepEqual(oldRoute, newRoute))
+
+	// ...while routeEqual, which relies on proto.Equal, treats them as equal.
+	cases := []struct {
+		left, right []*v2.MessageQueue
+		want        bool
+	}{
+		{oldRoute, newRoute, true},
+		{nil, nil, true},
+		{nil, newRoute, false},
+		{oldRoute, nil, false},
+		{nil, []*v2.MessageQueue{}, true},
+	}
+	for _, tc := range cases {
+		assert.Equal(t, tc.want, routeEqual(tc.left, tc.right))
+	}
+}
diff --git a/golang/loadBalancer.go b/golang/loadBalancer.go
index db9cbd4..da2dd64 100644
--- a/golang/loadBalancer.go
+++ b/golang/loadBalancer.go
@@ -31,6 +31,7 @@
 type PublishingLoadBalancer interface {
 	TakeMessageQueueByMessageGroup(messageGroup *string) ([]*v2.MessageQueue, error)
 	TakeMessageQueues(excluded sync.Map, count int) ([]*v2.MessageQueue, error)
+	CopyAndUpdate([]*v2.MessageQueue) PublishingLoadBalancer
 }
 
 type publishingLoadBalancer struct {
@@ -119,8 +120,16 @@
 	return candidates, nil
 }
 
+// CopyAndUpdate returns a new balancer over messageQueues that starts from the
+// receiver's current index; the receiver itself is not modified.
+// NOTE(review): index is read without synchronization here — confirm that is safe.
+func (plb *publishingLoadBalancer) CopyAndUpdate(messageQueues []*v2.MessageQueue) PublishingLoadBalancer {
+	return &publishingLoadBalancer{messageQueues: messageQueues, index: plb.index}
+}
+
 type SubscriptionLoadBalancer interface {
 	TakeMessageQueue() (*v2.MessageQueue, error)
+	CopyAndUpdate([]*v2.MessageQueue) SubscriptionLoadBalancer
 }
 
 type subscriptionLoadBalancer struct {
@@ -147,3 +156,10 @@
 	selectMessageQueue := slb.messageQueues[idx]
 	return selectMessageQueue, nil
 }
+
+// CopyAndUpdate returns a new balancer over messageQueues that starts from the
+// receiver's current index; the receiver itself is not modified.
+// NOTE(review): index is read without synchronization here — confirm that is safe.
+func (slb *subscriptionLoadBalancer) CopyAndUpdate(messageQueues []*v2.MessageQueue) SubscriptionLoadBalancer {
+	return &subscriptionLoadBalancer{messageQueues: messageQueues, index: slb.index}
+}
diff --git a/nodejs/src/client/BaseClient.ts b/nodejs/src/client/BaseClient.ts
index ad67e08..9fa12da 100644
--- a/nodejs/src/client/BaseClient.ts
+++ b/nodejs/src/client/BaseClient.ts
@@ -137,7 +137,7 @@
     // heartbeat every 10s
     this.#timers.push(setInterval(async () => {
       this.#doHeartbeat();
-    }, 5 * 60000));
+    }, 10000));
 
     // doStats every 60s
     // doStats()
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index da47ed2..bc88827 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -58,7 +58,7 @@
 byteorder = "1"
 mac_address = "1.1.4"
 hex = "0.4.3"
-time = "0.3"
+time = { version = "0.3", features = ["local-offset"] }
 once_cell = "1.18.0"
 
 mockall = "0.11.4"
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 91bd369..884f98b 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -31,16 +31,14 @@
 use crate::conf::ClientOption;
 use crate::error::{ClientError, ErrorKind};
 use crate::model::common::{ClientType, Endpoints, Route, RouteStatus, SendReceipt};
-use crate::model::message::{AckMessageEntry, MessageView};
-use crate::model::transaction::{TransactionChecker, TransactionResolution};
+use crate::model::message::AckMessageEntry;
 use crate::pb;
 use crate::pb::receive_message_response::Content;
-use crate::pb::telemetry_command::Command::{RecoverOrphanedTransactionCommand, Settings};
 use crate::pb::{
     AckMessageRequest, AckMessageResultEntry, ChangeInvisibleDurationRequest, Code,
-    EndTransactionRequest, FilterExpression, HeartbeatRequest, HeartbeatResponse, Message,
-    MessageQueue, NotifyClientTerminationRequest, QueryRouteRequest, ReceiveMessageRequest,
-    Resource, SendMessageRequest, Status, TelemetryCommand, TransactionSource,
+    FilterExpression, HeartbeatRequest, HeartbeatResponse, Message, MessageQueue,
+    NotifyClientTerminationRequest, QueryRouteRequest, ReceiveMessageRequest, Resource,
+    SendMessageRequest, Status, TelemetryCommand,
 };
 #[double]
 use crate::session::SessionManager;
@@ -54,7 +52,6 @@
     id: String,
     access_endpoints: Endpoints,
     settings: TelemetryCommand,
-    transaction_checker: Option<Box<TransactionChecker>>,
     telemetry_command_tx: Option<mpsc::Sender<pb::telemetry_command::Command>>,
     shutdown_tx: Option<oneshot::Sender<()>>,
 }
@@ -70,8 +67,6 @@
 const OPERATION_SEND_MESSAGE: &str = "client.send_message";
 const OPERATION_RECEIVE_MESSAGE: &str = "client.receive_message";
 const OPERATION_ACK_MESSAGE: &str = "client.ack_message";
-const OPERATION_END_TRANSACTION: &str = "client.end_transaction";
-const OPERATION_HANDLE_TELEMETRY_COMMAND: &str = "client.handle_telemetry_command";
 
 impl Debug for Client {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -102,28 +97,23 @@
             id,
             access_endpoints: endpoints,
             settings,
-            transaction_checker: None,
             telemetry_command_tx: None,
             shutdown_tx: None,
         })
     }
 
+    pub(crate) fn get_endpoints(&self) -> Endpoints {
+        self.access_endpoints.clone()
+    }
+
     pub(crate) fn is_started(&self) -> bool {
         self.shutdown_tx.is_some()
     }
 
-    pub(crate) fn has_transaction_checker(&self) -> bool {
-        self.transaction_checker.is_some()
-    }
-
-    pub(crate) fn set_transaction_checker(&mut self, transaction_checker: Box<TransactionChecker>) {
-        if self.is_started() {
-            panic!("client {} is started, can not be modified", self.id)
-        }
-        self.transaction_checker = Some(transaction_checker);
-    }
-
-    pub(crate) async fn start(&mut self) -> Result<(), ClientError> {
+    pub(crate) async fn start(
+        &mut self,
+        telemetry_command_tx: mpsc::Sender<pb::telemetry_command::Command>,
+    ) -> Result<(), ClientError> {
         let logger = self.logger.clone();
         let session_manager = self.session_manager.clone();
 
@@ -134,19 +124,12 @@
         let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
         self.shutdown_tx = Some(shutdown_tx);
 
-        // send heartbeat and handle telemetry command
-        let (telemetry_command_tx, mut telemetry_command_rx) = mpsc::channel(16);
         self.telemetry_command_tx = Some(telemetry_command_tx);
+
         let rpc_client = self
             .get_session()
             .await
             .map_err(|error| error.with_operation(OPERATION_CLIENT_START))?;
-        let endpoints = self.access_endpoints.clone();
-        let transaction_checker = self.transaction_checker.take();
-        // give a placeholder
-        if transaction_checker.is_some() {
-            self.transaction_checker = Some(Box::new(|_, _| TransactionResolution::UNKNOWN));
-        }
 
         tokio::spawn(async move {
             rpc_client.is_started();
@@ -188,24 +171,13 @@
                             debug!(logger,"send heartbeat to server success, peer={}",peer);
                         }
                     },
-                    command = telemetry_command_rx.recv() => {
-                        if let Some(command) = command {
-                            let result = Self::handle_telemetry_command(rpc_client.shadow_session(), &transaction_checker, endpoints.clone(), command).await;
-                            if let Err(error) = result {
-                                error!(logger, "handle telemetry command failed: {:?}", error);
-                            }
-                        }
-                    },
                     _ = &mut shutdown_rx => {
-                        info!(logger, "receive shutdown signal, stop heartbeat task and telemetry command handler");
+                        info!(logger, "receive shutdown signal, stop heartbeat task.");
                         break;
                     }
                 }
             }
-            info!(
-                logger,
-                "heartbeat task and telemetry command handler are stopped"
-            );
+            info!(logger, "heartbeat task is stopped");
         });
         Ok(())
     }
@@ -239,58 +211,6 @@
         Ok(())
     }
 
-    async fn handle_telemetry_command<T: RPCClient + 'static>(
-        mut rpc_client: T,
-        transaction_checker: &Option<Box<TransactionChecker>>,
-        endpoints: Endpoints,
-        command: pb::telemetry_command::Command,
-    ) -> Result<(), ClientError> {
-        return match command {
-            RecoverOrphanedTransactionCommand(command) => {
-                let transaction_id = command.transaction_id;
-                let message = command.message.unwrap();
-                let message_id = message
-                    .system_properties
-                    .as_ref()
-                    .unwrap()
-                    .message_id
-                    .clone();
-                let topic = message.topic.as_ref().unwrap().clone();
-                if let Some(transaction_checker) = transaction_checker {
-                    let resolution = transaction_checker(
-                        transaction_id.clone(),
-                        MessageView::from_pb_message(message, endpoints),
-                    );
-
-                    let response = rpc_client
-                        .end_transaction(EndTransactionRequest {
-                            topic: Some(topic),
-                            message_id: message_id.to_string(),
-                            transaction_id,
-                            resolution: resolution as i32,
-                            source: TransactionSource::SourceServerCheck as i32,
-                            trace_context: "".to_string(),
-                        })
-                        .await?;
-                    Self::handle_response_status(response.status, OPERATION_END_TRANSACTION)
-                } else {
-                    Err(ClientError::new(
-                        ErrorKind::Config,
-                        "failed to get transaction checker",
-                        OPERATION_END_TRANSACTION,
-                    ))
-                }
-            }
-            Settings(_) => Ok(()),
-            _ => Err(ClientError::new(
-                ErrorKind::Config,
-                "receive telemetry command but there is no handler",
-                OPERATION_HANDLE_TELEMETRY_COMMAND,
-            )
-            .with_context("command", format!("{:?}", command))),
-        };
-    }
-
     pub(crate) fn client_id(&self) -> &str {
         &self.id
     }
@@ -704,13 +624,11 @@
     use crate::error::{ClientError, ErrorKind};
     use crate::log::terminal_logger;
     use crate::model::common::{ClientType, Route};
-    use crate::model::transaction::TransactionResolution;
     use crate::pb::receive_message_response::Content;
     use crate::pb::{
         AckMessageEntry, AckMessageResponse, ChangeInvisibleDurationResponse, Code,
-        EndTransactionResponse, FilterExpression, HeartbeatResponse, Message, MessageQueue,
-        QueryRouteResponse, ReceiveMessageResponse, Resource, SendMessageResponse, Status,
-        SystemProperties, TelemetryCommand,
+        FilterExpression, HeartbeatResponse, Message, MessageQueue, QueryRouteResponse,
+        ReceiveMessageResponse, Resource, SendMessageResponse, Status, TelemetryCommand,
     };
     use crate::session;
 
@@ -731,7 +649,6 @@
             id: Client::generate_client_id(),
             access_endpoints: Endpoints::from_url("http://localhost:8081").unwrap(),
             settings: TelemetryCommand::default(),
-            transaction_checker: None,
             telemetry_command_tx: None,
             shutdown_tx: None,
         }
@@ -747,7 +664,6 @@
             id: Client::generate_client_id(),
             access_endpoints: Endpoints::from_url("http://localhost:8081").unwrap(),
             settings: TelemetryCommand::default(),
-            transaction_checker: None,
             telemetry_command_tx: Some(tx),
             shutdown_tx: None,
         }
@@ -784,7 +700,8 @@
             .returning(|_, _, _| Ok(Session::mock()));
 
         let mut client = new_client_with_session_manager(session_manager);
-        client.start().await?;
+        let (tx, _) = mpsc::channel(16);
+        client.start(tx).await?;
 
         // TODO use countdown latch instead sleeping
         // wait for run
@@ -800,7 +717,8 @@
             .returning(|_, _, _| Ok(Session::mock()));
 
         let mut client = new_client_with_session_manager(session_manager);
-        let _ = client.start().await;
+        let (tx, _rx) = mpsc::channel(16);
+        let _ = client.start(tx).await;
         let result = client.get_session().await;
         assert!(result.is_ok());
         let result = client
@@ -1134,33 +1052,4 @@
         assert_eq!(error.message, "server return an error");
         assert_eq!(error.operation, "client.ack_message");
     }
-
-    #[tokio::test]
-    async fn client_handle_telemetry_command() {
-        let response = Ok(EndTransactionResponse {
-            status: Some(Status {
-                code: Code::Ok as i32,
-                message: "".to_string(),
-            }),
-        });
-        let mut mock = session::MockRPCClient::new();
-        mock.expect_end_transaction()
-            .return_once(|_| Box::pin(futures::future::ready(response)));
-        let result = Client::handle_telemetry_command(
-            mock,
-            &Some(Box::new(|_, _| TransactionResolution::COMMIT)),
-            Endpoints::from_url("localhost:8081").unwrap(),
-            RecoverOrphanedTransactionCommand(pb::RecoverOrphanedTransactionCommand {
-                message: Some(Message {
-                    topic: Some(Resource::default()),
-                    user_properties: Default::default(),
-                    system_properties: Some(SystemProperties::default()),
-                    body: vec![],
-                }),
-                transaction_id: "".to_string(),
-            }),
-        )
-        .await;
-        assert!(result.is_ok())
-    }
 }
diff --git a/rust/src/error.rs b/rust/src/error.rs
index 59d1eeb..5210842 100644
--- a/rust/src/error.rs
+++ b/rust/src/error.rs
@@ -33,6 +33,9 @@
     #[error("Message is invalid")]
     InvalidMessage,
 
+    #[error("Message type not match with topic accept message type")]
+    MessageTypeNotMatch,
+
     #[error("Server error")]
     Server,
 
diff --git a/rust/src/model/message.rs b/rust/src/model/message.rs
index 28c232e..1defcd8 100644
--- a/rust/src/model/message.rs
+++ b/rust/src/model/message.rs
@@ -21,9 +21,18 @@
 
 use crate::error::{ClientError, ErrorKind};
 use crate::model::common::Endpoints;
+use crate::model::message::MessageType::{DELAY, FIFO, NORMAL, TRANSACTION};
 use crate::model::message_id::UNIQ_ID_GENERATOR;
 use crate::pb;
 
+#[derive(Clone, Copy, Debug)]
+pub enum MessageType {
+    NORMAL = 1,
+    FIFO = 2,
+    DELAY = 3,
+    TRANSACTION = 4,
+}
+
 /// [`Message`] is the data model for sending.
 pub trait Message {
     fn take_message_id(&mut self) -> String;
@@ -35,6 +44,17 @@
     fn take_message_group(&mut self) -> Option<String>;
     fn take_delivery_timestamp(&mut self) -> Option<i64>;
     fn transaction_enabled(&mut self) -> bool;
+    fn get_message_type(&self) -> MessageType;
+}
+
+pub trait MessageTypeAware {
+    fn accept_type(&self, message_type: MessageType) -> bool;
+}
+
+impl MessageTypeAware for pb::MessageQueue {
+    fn accept_type(&self, message_type: MessageType) -> bool {
+        self.accept_message_types.contains(&(message_type as i32))
+    }
 }
 
 pub(crate) struct MessageImpl {
@@ -47,6 +67,7 @@
     pub(crate) message_group: Option<String>,
     pub(crate) delivery_timestamp: Option<i64>,
     pub(crate) transaction_enabled: bool,
+    pub(crate) message_type: MessageType,
 }
 
 impl Message for MessageImpl {
@@ -85,6 +106,10 @@
     fn transaction_enabled(&mut self) -> bool {
         self.transaction_enabled
     }
+
+    fn get_message_type(&self) -> MessageType {
+        self.message_type
+    }
 }
 
 /// [`MessageBuilder`] is the builder for [`Message`].
@@ -108,6 +133,7 @@
                 message_group: None,
                 delivery_timestamp: None,
                 transaction_enabled: false,
+                message_type: NORMAL,
             },
         }
     }
@@ -135,6 +161,7 @@
                 message_group: Some(message_group.into()),
                 delivery_timestamp: None,
                 transaction_enabled: false,
+                message_type: FIFO,
             },
         }
     }
@@ -162,6 +189,7 @@
                 message_group: None,
                 delivery_timestamp: Some(delay_time),
                 transaction_enabled: false,
+                message_type: DELAY,
             },
         }
     }
@@ -184,6 +212,7 @@
                 message_group: None,
                 delivery_timestamp: None,
                 transaction_enabled: true,
+                message_type: TRANSACTION,
             },
         }
     }
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index 7e3f399..e456cbe 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -15,20 +15,30 @@
  * limitations under the License.
  */
 
+use std::fmt::Debug;
+use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 
 use mockall_double::double;
 use prost_types::Timestamp;
-use slog::{info, Logger};
+use slog::{error, info, warn, Logger};
+use tokio::select;
+use tokio::sync::RwLock;
+use tokio::sync::{mpsc, oneshot};
 
 #[double]
 use crate::client::Client;
 use crate::conf::{ClientOption, ProducerOption};
 use crate::error::{ClientError, ErrorKind};
-use crate::model::common::{ClientType, SendReceipt};
-use crate::model::message;
-use crate::model::transaction::{Transaction, TransactionChecker, TransactionImpl};
-use crate::pb::{Encoding, MessageType, Resource, SystemProperties};
+use crate::model::common::{ClientType, Endpoints, SendReceipt};
+use crate::model::message::{self, MessageTypeAware, MessageView};
+use crate::model::transaction::{
+    Transaction, TransactionChecker, TransactionImpl, TransactionResolution,
+};
+use crate::pb::settings::PubSub;
+use crate::pb::telemetry_command::Command::{RecoverOrphanedTransactionCommand, Settings};
+use crate::pb::{Encoding, EndTransactionRequest, Resource, SystemProperties, TransactionSource};
+use crate::session::RPCClient;
 use crate::util::{
     build_endpoints_by_message_queue, build_producer_settings, select_message_queue,
     select_message_queue_by_message_group, HOST_NAME,
@@ -41,17 +51,27 @@
 /// Most of its methods take shared reference so that application developers may use it at will.
 ///
 /// [`Producer`] is `Send` and `Sync` by design, so that developers may get started easily.
-#[derive(Debug)]
 pub struct Producer {
-    option: ProducerOption,
+    option: Arc<RwLock<ProducerOption>>,
     logger: Logger,
     client: Client,
+    transaction_checker: Option<Box<TransactionChecker>>,
+    shutdown_tx: Option<oneshot::Sender<()>>,
+}
+
+impl Debug for Producer {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Producer")
+            .field("option", &self.option)
+            .field("client", &self.client)
+            .finish()
+    }
 }
 
 impl Producer {
     const OPERATION_SEND_MESSAGE: &'static str = "producer.send_message";
     const OPERATION_SEND_TRANSACTION_MESSAGE: &'static str = "producer.send_transaction_message";
-
+    const OPERATION_END_TRANSACTION: &'static str = "producer.end_transaction";
     /// Create a new producer instance
     ///
     /// # Arguments
@@ -67,10 +87,13 @@
         let logger = log::logger(option.logging_format());
         let settings = build_producer_settings(&option, &client_option);
         let client = Client::new(&logger, client_option, settings)?;
+        let option = Arc::new(RwLock::new(option));
         Ok(Producer {
             option,
             logger,
             client,
+            transaction_checker: None,
+            shutdown_tx: None,
         })
     }
 
@@ -93,23 +116,80 @@
         };
         let logger = log::logger(option.logging_format());
         let settings = build_producer_settings(&option, &client_option);
-        let mut client = Client::new(&logger, client_option, settings)?;
-        client.set_transaction_checker(transaction_checker);
+        let client = Client::new(&logger, client_option, settings)?;
+        let option = Arc::new(RwLock::new(option));
         Ok(Producer {
             option,
             logger,
             client,
+            transaction_checker: Some(transaction_checker),
+            shutdown_tx: None,
         })
     }
 
+    /// Returns the namespace from the producer option as an owned `String`.
+    async fn get_resource_namespace(&self) -> String {
+        // The read guard is a temporary and is released when this function returns.
+        self.option.read().await.namespace().to_string()
+    }
+
     /// Start the producer
     pub async fn start(&mut self) -> Result<(), ClientError> {
-        self.client.start().await?;
-        if let Some(topics) = self.option.topics() {
+        let (telemetry_command_tx, mut telemetry_command_rx) = mpsc::channel(16);
+        let telemetry_command_tx: mpsc::Sender<pb::telemetry_command::Command> =
+            telemetry_command_tx;
+        self.client.start(telemetry_command_tx).await?;
+        let option_guard = self.option.read().await;
+        let topics = option_guard.topics();
+        if let Some(topics) = topics {
             for topic in topics {
                 self.client.topic_route(topic, true).await?;
             }
         }
+        drop(option_guard);
+        let transaction_checker = self.transaction_checker.take();
+        if transaction_checker.is_some() {
+            self.transaction_checker = Some(Box::new(|_, _| TransactionResolution::UNKNOWN));
+        }
+        let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
+        self.shutdown_tx = Some(shutdown_tx);
+        let rpc_client = self.client.get_session().await?;
+        let endpoints = self.client.get_endpoints();
+        let logger = self.logger.clone();
+        let producer_option = Arc::clone(&self.option);
+        // Dispatch telemetry commands in the background until shutdown is signalled
+        // or the command channel is closed.
+        tokio::spawn(async move {
+            loop {
+                select! {
+                    command = telemetry_command_rx.recv() => {
+                        match command {
+                            Some(RecoverOrphanedTransactionCommand(command)) => {
+                                let result = Self::handle_recover_orphaned_transaction_command(
+                                        rpc_client.shadow_session(),
+                                        command,
+                                        &transaction_checker,
+                                        endpoints.clone()).await;
+                                if let Err(error) = result {
+                                    error!(logger, "handle transaction command failed: {:?}", error);
+                                };
+                            }
+                            Some(Settings(command)) => {
+                                let option = &mut producer_option.write().await;
+                                Self::handle_settings_command(command, option);
+                                info!(logger, "handle setting command success.");
+                            }
+                            Some(command) => warn!(logger, "unimplemented command {:?}", command),
+                            // A closed channel yields `None` forever; leave instead of spinning.
+                            None => break,
+                        }
+                    }
+                    _ = &mut shutdown_rx => {
+                        break;
+                    }
+                }
+            }
+        });
         info!(
             self.logger,
             "start producer success, client_id: {}",
@@ -118,7 +198,70 @@
         Ok(())
     }
 
-    fn transform_messages_to_protobuf(
+    /// Handles a `RecoverOrphanedTransactionCommand`.
+    ///
+    /// Asks `transaction_checker` for a resolution of the message carried by
+    /// `command` and reports it to the server through `end_transaction`.
+    /// Returns an `InvalidMessage` error when the command has no message, system
+    /// properties or topic, and a `Config` error when no checker is available.
+    async fn handle_recover_orphaned_transaction_command<T: RPCClient + 'static>(
+        mut rpc_client: T,
+        command: pb::RecoverOrphanedTransactionCommand,
+        transaction_checker: &Option<Box<TransactionChecker>>,
+        endpoints: Endpoints,
+    ) -> Result<(), ClientError> {
+        let invalid_message = |reason: &str| {
+            ClientError::new(ErrorKind::InvalidMessage, reason, Self::OPERATION_END_TRANSACTION)
+        };
+        let transaction_id = command.transaction_id;
+        // `command` is owned, so the message is moved out rather than cloned.
+        let message = command
+            .message
+            .ok_or_else(|| invalid_message("no message in command"))?;
+        let message_id = message
+            .system_properties
+            .as_ref()
+            .map(|props| props.message_id.clone())
+            .ok_or_else(|| invalid_message("no message id exists"))?;
+        // Cloned because `message` itself is moved into the `MessageView` below.
+        let topic = message
+            .topic
+            .clone()
+            .ok_or_else(|| invalid_message("no topic exists in message"))?;
+        let transaction_checker = match transaction_checker {
+            Some(transaction_checker) => transaction_checker,
+            None => {
+                return Err(ClientError::new(
+                    ErrorKind::Config,
+                    "failed to get transaction checker",
+                    Self::OPERATION_END_TRANSACTION,
+                ))
+            }
+        };
+        let resolution = transaction_checker(
+            transaction_id.clone(),
+            MessageView::from_pb_message(message, endpoints),
+        );
+        let response = rpc_client
+            .end_transaction(EndTransactionRequest {
+                topic: Some(topic),
+                message_id,
+                transaction_id,
+                resolution: resolution as i32,
+                source: TransactionSource::SourceServerCheck as i32,
+                trace_context: "".to_string(),
+            })
+            .await?;
+        Client::handle_response_status(response.status, Self::OPERATION_END_TRANSACTION)
+    }
+
+    fn handle_settings_command(settings: pb::Settings, option: &mut ProducerOption) {
+        if let Some(PubSub::Publishing(publishing)) = settings.pub_sub {
+            option.set_validate_message_type(publishing.validate_message_type);
+        };
+    }
+
+    async fn transform_messages_to_protobuf(
         &self,
         messages: Vec<impl message::Message>,
     ) -> Result<(String, Option<String>, Vec<pb::Message>), ClientError> {
@@ -172,24 +315,20 @@
                 .take_delivery_timestamp()
                 .map(|seconds| Timestamp { seconds, nanos: 0 });
 
-            let message_type = if message.transaction_enabled() {
+            if message.transaction_enabled() {
                 message_group = None;
                 delivery_timestamp = None;
-                MessageType::Transaction as i32
             } else if delivery_timestamp.is_some() {
                 message_group = None;
-                MessageType::Delay as i32
             } else if message_group.is_some() {
                 delivery_timestamp = None;
-                MessageType::Fifo as i32
-            } else {
-                MessageType::Normal as i32
             };
 
+            // TODO: use a converter trait From or TryFrom
             let pb_message = pb::Message {
                 topic: Some(Resource {
                     name: message.take_topic(),
-                    resource_namespace: self.option.namespace().to_string(),
+                    resource_namespace: self.get_resource_namespace().await,
                 }),
                 user_properties: message.take_properties(),
                 system_properties: Some(SystemProperties {
@@ -198,7 +337,7 @@
                     message_id: message.take_message_id(),
                     message_group,
                     delivery_timestamp,
-                    message_type,
+                    message_type: message.get_message_type() as i32,
                     born_host: HOST_NAME.clone(),
                     born_timestamp: born_timestamp.clone(),
                     body_digest: None,
@@ -241,8 +380,13 @@
         &self,
         messages: Vec<impl message::Message>,
     ) -> Result<Vec<SendReceipt>, ClientError> {
+        let message_types = messages
+            .iter()
+            .map(|message| message.get_message_type())
+            .collect::<Vec<_>>();
+
         let (topic, message_group, mut pb_messages) =
-            self.transform_messages_to_protobuf(messages)?;
+            self.transform_messages_to_protobuf(messages).await?;
 
         let route = self.client.topic_route(&topic, true).await?;
 
@@ -252,6 +396,25 @@
             select_message_queue(route)
         };
 
+        let option_guard = self.option.read().await;
+        let validate_message_type = option_guard.validate_message_type();
+        drop(option_guard);
+        if validate_message_type {
+            for message_type in message_types {
+                if !message_queue.accept_type(message_type) {
+                    return Err(ClientError::new(
+                        ErrorKind::MessageTypeNotMatch,
+                        format!(
+                            "Current message type {:?} not match with accepted types {:?}.",
+                            message_type, message_queue.accept_message_types
+                        )
+                        .as_str(),
+                        Self::OPERATION_SEND_MESSAGE,
+                    ));
+                }
+            }
+        }
+
         let endpoints =
             build_endpoints_by_message_queue(&message_queue, Self::OPERATION_SEND_MESSAGE)?;
         for message in pb_messages.iter_mut() {
@@ -261,12 +424,16 @@
         self.client.send_message(&endpoints, pb_messages).await
     }
 
+    pub fn has_transaction_checker(&self) -> bool {
+        self.transaction_checker.is_some()
+    }
+
     /// Send message in a transaction
     pub async fn send_transaction_message(
         &self,
         mut message: impl message::Message,
     ) -> Result<impl Transaction, ClientError> {
-        if !self.client.has_transaction_checker() {
+        if !self.has_transaction_checker() {
             return Err(ClientError::new(
                 ErrorKind::InvalidMessage,
                 "this producer can not send transaction message, please create a transaction producer using producer::new_transaction_producer",
@@ -279,14 +446,17 @@
         Ok(TransactionImpl::new(
             Box::new(rpc_client),
             Resource {
-                resource_namespace: self.option.namespace().to_string(),
+                resource_namespace: self.get_resource_namespace().await,
                 name: topic,
             },
             receipt,
         ))
     }
 
-    pub async fn shutdown(self) -> Result<(), ClientError> {
+    pub async fn shutdown(mut self) -> Result<(), ClientError> {
+        if let Some(tx) = self.shutdown_tx.take() {
+            let _ = tx.send(());
+        }
         self.client.shutdown().await
     }
 }
@@ -295,13 +465,14 @@
 mod tests {
     use std::sync::Arc;
 
+    use crate::client::MockClient;
     use crate::error::ErrorKind;
     use crate::log::terminal_logger;
     use crate::model::common::Route;
-    use crate::model::message::{MessageBuilder, MessageImpl};
+    use crate::model::message::{MessageBuilder, MessageImpl, MessageType};
     use crate::model::transaction::TransactionResolution;
-    use crate::pb::{Broker, MessageQueue};
-    use crate::session::Session;
+    use crate::pb::{Broker, Code, EndTransactionResponse, MessageQueue, Status};
+    use crate::session::{self, Session};
 
     use super::*;
 
@@ -310,6 +481,18 @@
             option: Default::default(),
             logger: terminal_logger(),
             client: Client::default(),
+            shutdown_tx: None,
+            transaction_checker: None,
+        }
+    }
+
+    fn new_transaction_producer_for_test() -> Producer {
+        Producer {
+            option: Default::default(),
+            logger: terminal_logger(),
+            client: Client::default(),
+            shutdown_tx: None,
+            transaction_checker: Some(Box::new(|_, _| TransactionResolution::COMMIT)),
         }
     }
 
@@ -326,10 +509,16 @@
                     queue: vec![],
                 }))
             });
-            client.expect_start().returning(|| Ok(()));
+            client.expect_start().returning(|_| Ok(()));
             client
                 .expect_client_id()
                 .return_const("fake_id".to_string());
+            client
+                .expect_get_session()
+                .return_once(|| Ok(Session::mock()));
+            client
+                .expect_get_endpoints()
+                .return_once(|| Endpoints::from_url("foobar.com:8080").unwrap());
             Ok(client)
         });
         let mut producer_option = ProducerOption::default();
@@ -353,11 +542,16 @@
                     queue: vec![],
                 }))
             });
-            client.expect_start().returning(|| Ok(()));
-            client.expect_set_transaction_checker().returning(|_| ());
+            client.expect_start().returning(|_| Ok(()));
             client
                 .expect_client_id()
                 .return_const("fake_id".to_string());
+            client
+                .expect_get_session()
+                .return_once(|| Ok(Session::mock()));
+            client
+                .expect_get_endpoints()
+                .return_once(|| Endpoints::from_url("foobar.com:8080").unwrap());
             Ok(client)
         });
         let mut producer_option = ProducerOption::default();
@@ -384,7 +578,7 @@
             .set_message_group("message_group".to_string())
             .build()
             .unwrap()];
-        let result = producer.transform_messages_to_protobuf(messages);
+        let result = producer.transform_messages_to_protobuf(messages).await;
         assert!(result.is_ok());
 
         let (topic, message_group, pb_messages) = result.unwrap();
@@ -408,7 +602,7 @@
         let producer = new_producer_for_test();
 
         let messages: Vec<MessageImpl> = vec![];
-        let result = producer.transform_messages_to_protobuf(messages);
+        let result = producer.transform_messages_to_protobuf(messages).await;
         assert!(result.is_err());
         let err = result.unwrap_err();
         assert_eq!(err.kind, ErrorKind::InvalidMessage);
@@ -424,8 +618,9 @@
             message_group: None,
             delivery_timestamp: None,
             transaction_enabled: false,
+            message_type: MessageType::TRANSACTION,
         }];
-        let result = producer.transform_messages_to_protobuf(messages);
+        let result = producer.transform_messages_to_protobuf(messages).await;
         assert!(result.is_err());
         let err = result.unwrap_err();
         assert_eq!(err.kind, ErrorKind::InvalidMessage);
@@ -443,7 +638,7 @@
                 .build()
                 .unwrap(),
         ];
-        let result = producer.transform_messages_to_protobuf(messages);
+        let result = producer.transform_messages_to_protobuf(messages).await;
         assert!(result.is_err());
         let err = result.unwrap_err();
         assert_eq!(err.kind, ErrorKind::InvalidMessage);
@@ -463,7 +658,7 @@
                 .build()
                 .unwrap(),
         ];
-        let result = producer.transform_messages_to_protobuf(messages);
+        let result = producer.transform_messages_to_protobuf(messages).await;
         assert!(result.is_err());
         let err = result.unwrap_err();
         assert_eq!(err.kind, ErrorKind::InvalidMessage);
@@ -491,7 +686,7 @@
                             addresses: vec![],
                         }),
                     }),
-                    accept_message_types: vec![],
+                    accept_message_types: vec![MessageType::NORMAL as i32],
                 }],
             }))
         });
@@ -520,7 +715,7 @@
 
     #[tokio::test]
     async fn producer_send_transaction_message() -> Result<(), ClientError> {
-        let mut producer = new_producer_for_test();
+        let mut producer = new_transaction_producer_for_test();
         producer.client.expect_topic_route().returning(|_, _| {
             Ok(Arc::new(Route {
                 index: Default::default(),
@@ -539,7 +734,7 @@
                             addresses: vec![],
                         }),
                     }),
-                    accept_message_types: vec![],
+                    accept_message_types: vec![MessageType::TRANSACTION as i32],
                 }],
             }))
         });
@@ -556,20 +751,46 @@
             .client
             .expect_get_session()
             .return_once(|| Ok(Session::mock()));
-        producer
-            .client
-            .expect_has_transaction_checker()
-            .return_once(|| true);
 
         let _ = producer
             .send_transaction_message(
-                MessageBuilder::builder()
-                    .set_topic("test_topic")
-                    .set_body(vec![])
+                MessageBuilder::transaction_message_builder("test_topic", vec![])
                     .build()
                     .unwrap(),
             )
             .await?;
         Ok(())
     }
+
+    #[tokio::test]
+    async fn client_handle_recover_orphaned_transaction_command() {
+        let response = Ok(EndTransactionResponse {
+            status: Some(Status {
+                code: Code::Ok as i32,
+                message: "".to_string(),
+            }),
+        });
+        let mut mock = session::MockRPCClient::new();
+        mock.expect_end_transaction()
+            .return_once(|_| Box::pin(futures::future::ready(response)));
+
+        let context = MockClient::handle_response_status_context();
+        context.expect().return_once(|_, _| Result::Ok(()));
+        let result = Producer::handle_recover_orphaned_transaction_command(
+            mock,
+            pb::RecoverOrphanedTransactionCommand {
+                message: Some(pb::Message {
+                    topic: Some(Resource::default()),
+                    user_properties: Default::default(),
+                    system_properties: Some(SystemProperties::default()),
+                    body: vec![],
+                }),
+                transaction_id: "".to_string(),
+            },
+            &Some(Box::new(|_, _| TransactionResolution::COMMIT)),
+            Endpoints::from_url("localhost:8081").unwrap(),
+        )
+        .await;
+        assert!(result.is_ok())
+    }
 }
diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs
index f8a6eac..e891d38 100644
--- a/rust/src/simple_consumer.rs
+++ b/rust/src/simple_consumer.rs
@@ -18,7 +18,9 @@
 use std::time::Duration;
 
 use mockall_double::double;
-use slog::{info, Logger};
+use slog::{info, warn, Logger};
+use tokio::select;
+use tokio::sync::{mpsc, oneshot};
 
 #[double]
 use crate::client::Client;
@@ -45,6 +47,7 @@
     option: SimpleConsumerOption,
     logger: Logger,
     client: Client,
+    shutdown_tx: Option<oneshot::Sender<()>>,
 }
 
 impl SimpleConsumer {
@@ -78,6 +81,7 @@
             option,
             logger,
             client,
+            shutdown_tx: None,
         })
     }
 
@@ -90,12 +94,29 @@
                 Self::OPERATION_START_SIMPLE_CONSUMER,
             ));
         }
-        self.client.start().await?;
+        let (telemetry_command_tx, mut telemetry_command_rx) = mpsc::channel(16);
+        self.client.start(telemetry_command_tx).await?;
         if let Some(topics) = self.option.topics() {
             for topic in topics {
                 self.client.topic_route(topic, true).await?;
             }
         }
+        let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
+        self.shutdown_tx = Some(shutdown_tx);
+        let logger = self.logger.clone();
+        tokio::spawn(async move {
+            loop {
+                select! {
+                    command = telemetry_command_rx.recv() => {
+                        warn!(logger, "command {:?} cannot be handled in simple consumer.", command);
+                    }
+
+                    _ = &mut shutdown_rx => {
+                       break;
+                    }
+                }
+            }
+        });
         info!(
             self.logger,
             "start simple consumer success, client_id: {}",
@@ -105,6 +126,9 @@
     }
 
     pub async fn shutdown(self) -> Result<(), ClientError> {
+        if let Some(shutdown_tx) = self.shutdown_tx {
+            let _ = shutdown_tx.send(());
+        };
         self.client.shutdown().await
     }
 
@@ -215,7 +239,7 @@
                     queue: vec![],
                 }))
             });
-            client.expect_start().returning(|| Ok(()));
+            client.expect_start().returning(|_| Ok(()));
             client
                 .expect_client_id()
                 .return_const("fake_id".to_string());
@@ -272,6 +296,7 @@
             option: SimpleConsumerOption::default(),
             logger: terminal_logger(),
             client,
+            shutdown_tx: None,
         };
 
         let messages = simple_consumer