[ISSUE #575] Gracefully shutdown for Rust client (#576)

* feat(rust): add gracefully shutdown for rust sdk

Signed-off-by: SSpirits <admin@lv5.moe>

* fix(rust): fix test

Signed-off-by: SSpirits <admin@lv5.moe>

* fix(rust): apply reviewer changes

Signed-off-by: SSpirits <admin@lv5.moe>

---------

Signed-off-by: SSpirits <admin@lv5.moe>
diff --git a/rust/examples/delay_producer.rs b/rust/examples/delay_producer.rs
index 3e237c3..cca3bae 100644
--- a/rust/examples/delay_producer.rs
+++ b/rust/examples/delay_producer.rs
@@ -23,8 +23,8 @@
 
 #[tokio::main]
 async fn main() {
-    // recommend to specify which topic(s) you would like to send message to
-    // producer will prefetch topic route when start and failed fast if topic not exist
+    // It's recommended to specify the topics that applications will publish messages to
+    // because the producer will prefetch topic routes for them on start and fail fast in case they do not exist
     let mut producer_option = ProducerOption::default();
     producer_option.set_topics(vec!["delay_test"]);
 
@@ -34,7 +34,11 @@
 
     // build and start producer
     let mut producer = Producer::new(producer_option, client_option).unwrap();
-    producer.start().await.unwrap();
+    let start_result = producer.start().await;
+    if start_result.is_err() {
+        eprintln!("producer start failed: {:?}", start_result.unwrap_err());
+        return;
+    }
 
     // build message
     let message = MessageBuilder::delay_message_builder(
@@ -51,10 +55,23 @@
     .unwrap();
 
     // send message to rocketmq proxy
-    let result = producer.send(message).await;
-    debug_assert!(result.is_ok(), "send message failed: {:?}", result);
+    let send_result = producer.send(message).await;
+    if send_result.is_err() {
+        eprintln!("send message failed: {:?}", send_result.unwrap_err());
+        return;
+    }
     println!(
         "send message success, message_id={}",
-        result.unwrap().message_id()
+        send_result.unwrap().message_id()
     );
+
+    // shutdown the producer when you don't need it anymore.
+    // recommend shutdown manually to gracefully stop and unregister from server
+    let shutdown_result = producer.shutdown().await;
+    if shutdown_result.is_err() {
+        eprintln!(
+            "producer shutdown failed: {:?}",
+            shutdown_result.unwrap_err()
+        );
+    }
 }
diff --git a/rust/examples/fifo_producer.rs b/rust/examples/fifo_producer.rs
index 38562e2..211ae68 100644
--- a/rust/examples/fifo_producer.rs
+++ b/rust/examples/fifo_producer.rs
@@ -20,8 +20,8 @@
 
 #[tokio::main]
 async fn main() {
-    // recommend to specify which topic(s) you would like to send message to
-    // producer will prefetch topic route when start and failed fast if topic not exist
+    // It's recommended to specify the topics that applications will publish messages to
+    // because the producer will prefetch topic routes for them on start and fail fast in case they do not exist
     let mut producer_option = ProducerOption::default();
     producer_option.set_topics(vec!["fifo_test"]);
 
@@ -31,7 +31,11 @@
 
     // build and start producer
     let mut producer = Producer::new(producer_option, client_option).unwrap();
-    producer.start().await.unwrap();
+    let start_result = producer.start().await;
+    if start_result.is_err() {
+        eprintln!("producer start failed: {:?}", start_result.unwrap_err());
+        return;
+    }
 
     // build message
     let message = MessageBuilder::fifo_message_builder(
@@ -44,10 +48,23 @@
     .unwrap();
 
     // send message to rocketmq proxy
-    let result = producer.send(message).await;
-    debug_assert!(result.is_ok(), "send message failed: {:?}", result);
+    let send_result = producer.send(message).await;
+    if send_result.is_err() {
+        eprintln!("send message failed: {:?}", send_result.unwrap_err());
+        return;
+    }
     println!(
         "send message success, message_id={}",
-        result.unwrap().message_id()
+        send_result.unwrap().message_id()
     );
+
+    // shutdown the producer when you don't need it anymore.
+    // you should shutdown it manually to gracefully stop and unregister from server
+    let shutdown_result = producer.shutdown().await;
+    if shutdown_result.is_err() {
+        eprintln!(
+            "producer shutdown failed: {:?}",
+            shutdown_result.unwrap_err()
+        );
+    }
 }
diff --git a/rust/examples/producer.rs b/rust/examples/producer.rs
index 335b31f..3e818e9 100644
--- a/rust/examples/producer.rs
+++ b/rust/examples/producer.rs
@@ -20,8 +20,8 @@
 
 #[tokio::main]
 async fn main() {
-    // recommend to specify which topic(s) you would like to send message to
-    // producer will prefetch topic route when start and failed fast if topic not exist
+    // It's recommended to specify the topics that applications will publish messages to
+    // because the producer will prefetch topic routes for them on start and fail fast in case they do not exist
     let mut producer_option = ProducerOption::default();
     producer_option.set_topics(vec!["test_topic"]);
 
@@ -31,7 +31,11 @@
 
     // build and start producer
     let mut producer = Producer::new(producer_option, client_option).unwrap();
-    producer.start().await.unwrap();
+    let start_result = producer.start().await;
+    if start_result.is_err() {
+        eprintln!("producer start failed: {:?}", start_result.unwrap_err());
+        return;
+    }
 
     // build message
     let message = MessageBuilder::builder()
@@ -42,10 +46,23 @@
         .unwrap();
 
     // send message to rocketmq proxy
-    let result = producer.send(message).await;
-    debug_assert!(result.is_ok(), "send message failed: {:?}", result);
+    let send_result = producer.send(message).await;
+    if send_result.is_err() {
+        eprintln!("send message failed: {:?}", send_result.unwrap_err());
+        return;
+    }
     println!(
         "send message success, message_id={}",
-        result.unwrap().message_id()
+        send_result.unwrap().message_id()
     );
+
+    // shutdown the producer when you don't need it anymore.
+    // you should shutdown it manually to gracefully stop and unregister from server
+    let shutdown_result = producer.shutdown().await;
+    if shutdown_result.is_ok() {
+        eprintln!(
+            "producer shutdown failed: {:?}",
+            shutdown_result.unwrap_err()
+        );
+    }
 }
diff --git a/rust/examples/simple_consumer.rs b/rust/examples/simple_consumer.rs
index a5f9408..fc1a838 100644
--- a/rust/examples/simple_consumer.rs
+++ b/rust/examples/simple_consumer.rs
@@ -20,8 +20,8 @@
 
 #[tokio::main]
 async fn main() {
-    // recommend to specify which topic(s) you would like to send message to
-    // simple consumer will prefetch topic route when start and failed fast if topic not exist
+    // It's recommended to specify the topics that applications will publish messages to
+    // because the simple consumer will prefetch topic routes for them on start and fail fast in case they do not exist
     let mut consumer_option = SimpleConsumerOption::default();
     consumer_option.set_topics(vec!["test_topic"]);
     consumer_option.set_consumer_group("SimpleConsumerGroup");
@@ -33,38 +33,54 @@
 
     // build and start simple consumer
     let mut consumer = SimpleConsumer::new(consumer_option, client_option).unwrap();
-    consumer.start().await.unwrap();
-
-    loop {
-        // pop message from rocketmq proxy
-        let receive_result = consumer
-            .receive(
-                "test_topic".to_string(),
-                &FilterExpression::new(FilterType::Tag, "test_tag"),
-            )
-            .await;
-        debug_assert!(
-            receive_result.is_ok(),
-            "receive message failed: {:?}",
-            receive_result.unwrap_err()
+    let start_result = consumer.start().await;
+    if start_result.is_err() {
+        eprintln!(
+            "simple consumer start failed: {:?}",
+            start_result.unwrap_err()
         );
+        return;
+    }
 
-        let messages = receive_result.unwrap();
+    // pop message from rocketmq proxy
+    let receive_result = consumer
+        .receive(
+            "test_topic".to_string(),
+            &FilterExpression::new(FilterType::Tag, "test_tag"),
+        )
+        .await;
+    if receive_result.is_err() {
+        eprintln!("receive message failed: {:?}", receive_result.unwrap_err());
+        return;
+    }
 
-        if messages.is_empty() {
-            println!("no message received");
-            return;
-        }
+    let messages = receive_result.unwrap();
 
-        for message in messages {
-            println!("receive message: {:?}", message);
-            // ack message to rocketmq proxy
-            let ack_result = consumer.ack(&message).await;
-            debug_assert!(
-                ack_result.is_ok(),
-                "ack message failed: {:?}",
+    if messages.is_empty() {
+        println!("no message received");
+        return;
+    }
+
+    for message in messages {
+        println!("receive message: {:?}", message);
+        // ack message to rocketmq proxy
+        let ack_result = consumer.ack(&message).await;
+        if ack_result.is_err() {
+            eprintln!(
+                "ack message {} failed: {:?}",
+                message.message_id(),
                 ack_result.unwrap_err()
             );
         }
     }
+
+    // shutdown the simple consumer when you don't need it anymore.
+    // you should shutdown it manually to gracefully stop and unregister from server
+    let shutdown_result = consumer.shutdown().await;
+    if shutdown_result.is_err() {
+        eprintln!(
+            "simple consumer shutdown failed: {:?}",
+            shutdown_result.unwrap_err()
+        );
+    }
 }
diff --git a/rust/examples/transaction_producer.rs b/rust/examples/transaction_producer.rs
index 7423cc9..6df6cb6 100644
--- a/rust/examples/transaction_producer.rs
+++ b/rust/examples/transaction_producer.rs
@@ -28,8 +28,8 @@
 
 #[tokio::main]
 async fn main() {
-    // recommend to specify which topic(s) you would like to send message to
-    // producer will prefetch topic route when start and failed fast if topic not exist
+    // It's recommended to specify the topics that applications will publish messages to
+    // because the producer will prefetch topic routes for them on start and fail fast in case they do not exist
     let mut producer_option = ProducerOption::default();
     producer_option.set_topics(vec!["transaction_test"]);
 
@@ -62,7 +62,11 @@
         }),
     )
     .unwrap();
-    producer.start().await.unwrap();
+    let start_result = producer.start().await;
+    if start_result.is_err() {
+        eprintln!("producer start failed: {:?}", start_result.unwrap_err());
+        return;
+    }
 
     // build message
     let message = MessageBuilder::transaction_message_builder(
@@ -93,5 +97,18 @@
     // commit transaction manually
     // delete following two lines so that RocketMQ server will check transaction status periodically
     let result = transaction.commit().await;
-    debug_assert!(result.is_ok(), "commit transaction failed: {:?}", result);
+    if result.is_err() {
+        eprintln!("commit transaction failed: {:?}", result.unwrap_err());
+        return;
+    }
+
+    // shutdown the producer when you don't need it anymore.
+    // you should shutdown it manually to gracefully stop and unregister from server
+    let shutdown_result = producer.shutdown().await;
+    if shutdown_result.is_err() {
+        eprintln!(
+            "transaction producer shutdown failed: {:?}",
+            shutdown_result.unwrap_err()
+        );
+    }
 }
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 57bddb8..3804b4a 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -37,9 +37,9 @@
 use crate::pb::telemetry_command::Command::RecoverOrphanedTransactionCommand;
 use crate::pb::{
     AckMessageRequest, AckMessageResultEntry, Code, EndTransactionRequest, FilterExpression,
-    HeartbeatRequest, HeartbeatResponse, Message, MessageQueue, QueryRouteRequest,
-    ReceiveMessageRequest, Resource, SendMessageRequest, Status, TelemetryCommand,
-    TransactionSource,
+    HeartbeatRequest, HeartbeatResponse, Message, MessageQueue, NotifyClientTerminationRequest,
+    QueryRouteRequest, ReceiveMessageRequest, Resource, SendMessageRequest, Status,
+    TelemetryCommand, TransactionSource,
 };
 #[double]
 use crate::session::SessionManager;
@@ -55,6 +55,7 @@
     settings: TelemetryCommand,
     transaction_checker: Option<Box<TransactionChecker>>,
     telemetry_command_tx: Option<mpsc::Sender<pb::telemetry_command::Command>>,
+    shutdown_tx: Option<oneshot::Sender<()>>,
 }
 
 lazy_static::lazy_static! {
@@ -62,6 +63,8 @@
 }
 
 const OPERATION_CLIENT_NEW: &str = "client.new";
+const OPERATION_CLIENT_START: &str = "client.start";
+const OPERATION_CLIENT_SHUTDOWN: &str = "client.shutdown";
 const OPERATION_GET_SESSION: &str = "client.get_session";
 const OPERATION_QUERY_ROUTE: &str = "client.query_route";
 const OPERATION_HEARTBEAT: &str = "client.heartbeat";
@@ -102,11 +105,12 @@
             settings,
             transaction_checker: None,
             telemetry_command_tx: None,
+            shutdown_tx: None,
         })
     }
 
     pub(crate) fn is_started(&self) -> bool {
-        self.telemetry_command_tx.is_some()
+        self.shutdown_tx.is_some()
     }
 
     pub(crate) fn has_transaction_checker(&self) -> bool {
@@ -124,20 +128,27 @@
         let logger = self.logger.clone();
         let session_manager = self.session_manager.clone();
 
-        let group = self.option.group.to_string();
+        let group = self.option.group.clone();
         let namespace = self.option.namespace.to_string();
         let client_type = self.option.client_type.clone();
 
+        let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
+        self.shutdown_tx = Some(shutdown_tx);
+
         // send heartbeat and handle telemetry command
-        let (tx, mut rx) = mpsc::channel(16);
-        self.telemetry_command_tx = Some(tx);
-        let rpc_client = self.get_session().await?;
+        let (telemetry_command_tx, mut telemetry_command_rx) = mpsc::channel(16);
+        self.telemetry_command_tx = Some(telemetry_command_tx);
+        let rpc_client = self
+            .get_session()
+            .await
+            .map_err(|error| error.with_operation(OPERATION_CLIENT_START))?;
         let endpoints = self.access_endpoints.clone();
         let transaction_checker = self.transaction_checker.take();
         // give a placeholder
         if transaction_checker.is_some() {
             self.transaction_checker = Some(Box::new(|_, _| TransactionResolution::UNKNOWN));
         }
+
         tokio::spawn(async move {
             rpc_client.is_started();
             let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
@@ -178,20 +189,57 @@
                             debug!(logger,"send heartbeat to server success, peer={}",peer);
                         }
                     },
-                    command = rx.recv() => {
+                    command = telemetry_command_rx.recv() => {
                         if let Some(command) = command {
                             let result = Self::handle_telemetry_command(rpc_client.clone(), &transaction_checker, endpoints.clone(), command).await;
                             if let Err(error) = result {
-                                error!(logger, "handle telemetry command failed: {:?}", error)
+                                error!(logger, "handle telemetry command failed: {:?}", error);
                             }
                         }
                     },
+                    _ = &mut shutdown_rx => {
+                        debug!(logger, "receive shutdown signal, stop heartbeat task and telemetry command handler");
+                        break;
+                    }
                 }
             }
+            info!(
+                logger,
+                "heartbeat task and telemetry command handler are stopped"
+            );
         });
         Ok(())
     }
 
+    fn check_started(&self, operation: &'static str) -> Result<(), ClientError> {
+        if !self.is_started() {
+            return Err(ClientError::new(
+                ErrorKind::ClientIsNotRunning,
+                "client is not started",
+                operation,
+            )
+            .with_context("client_id", self.id.clone()));
+        }
+        Ok(())
+    }
+
+    pub(crate) async fn shutdown(mut self) -> Result<(), ClientError> {
+        self.check_started(OPERATION_CLIENT_SHUTDOWN)?;
+        let mut rpc_client = self.get_session().await?;
+        self.telemetry_command_tx = None;
+        if let Some(tx) = self.shutdown_tx.take() {
+            let _ = tx.send(());
+        }
+        let group = self.option.group.as_ref().map(|group| Resource {
+            name: group.to_string(),
+            resource_namespace: self.option.namespace.to_string(),
+        });
+        let response = rpc_client.notify_shutdown(NotifyClientTerminationRequest { group });
+        Self::handle_response_status(response.await?.status, OPERATION_CLIENT_SHUTDOWN)?;
+        self.session_manager.shutdown().await;
+        Ok(())
+    }
+
     async fn handle_telemetry_command<T: RPCClient + 'static>(
         mut rpc_client: T,
         transaction_checker: &Option<Box<TransactionChecker>>,
@@ -268,14 +316,7 @@
     }
 
     pub(crate) async fn get_session(&self) -> Result<Session, ClientError> {
-        if !self.is_started() {
-            return Err(ClientError::new(
-                ErrorKind::ClientIsNotRunning,
-                "client is not started",
-                OPERATION_GET_SESSION,
-            )
-            .with_context("client_id", self.id.clone()));
-        }
+        self.check_started(OPERATION_GET_SESSION)?;
         let session = self
             .session_manager
             .get_or_create_session(
@@ -348,8 +389,8 @@
                 return Ok(route);
             }
         }
-        self.topic_route_inner(self.get_session().await.unwrap(), topic)
-            .await
+        let rpc_client = self.get_session().await?;
+        self.topic_route_inner(rpc_client, topic).await
     }
 
     async fn query_topic_route<T: RPCClient + 'static>(
@@ -461,15 +502,16 @@
 
     async fn heart_beat_inner<T: RPCClient + 'static>(
         mut rpc_client: T,
-        group: &str,
+        group: &Option<String>,
         namespace: &str,
         client_type: &ClientType,
     ) -> Result<HeartbeatResponse, ClientError> {
+        let group = group.as_ref().map(|group| Resource {
+            name: group.to_string(),
+            resource_namespace: namespace.to_string(),
+        });
         let request = HeartbeatRequest {
-            group: Some(Resource {
-                name: group.to_string(),
-                resource_namespace: namespace.to_string(),
-            }),
+            group,
             client_type: client_type.clone() as i32,
         };
         let response = rpc_client.heartbeat(request).await?;
@@ -534,7 +576,7 @@
     ) -> Result<Vec<Message>, ClientError> {
         let request = ReceiveMessageRequest {
             group: Some(Resource {
-                name: self.option.group.to_string(),
+                name: self.option.group.as_ref().unwrap().to_string(),
                 resource_namespace: self.option.namespace.to_string(),
             }),
             message_queue: Some(message_queue),
@@ -594,7 +636,7 @@
     ) -> Result<Vec<AckMessageResultEntry>, ClientError> {
         let request = AckMessageRequest {
             group: Some(Resource {
-                name: self.option.group.to_string(),
+                name: self.option.group.as_ref().unwrap().to_string(),
                 resource_namespace: self.option.namespace.to_string(),
             }),
             topic: Some(Resource {
@@ -642,7 +684,10 @@
     fn new_client_for_test() -> Client {
         Client {
             logger: terminal_logger(),
-            option: ClientOption::default(),
+            option: ClientOption {
+                group: Some("group".to_string()),
+                ..Default::default()
+            },
             session_manager: Arc::new(SessionManager::default()),
             route_table: Mutex::new(HashMap::new()),
             id: Client::generate_client_id(),
@@ -650,6 +695,7 @@
             settings: TelemetryCommand::default(),
             transaction_checker: None,
             telemetry_command_tx: None,
+            shutdown_tx: None,
         }
     }
 
@@ -665,6 +711,7 @@
             settings: TelemetryCommand::default(),
             transaction_checker: None,
             telemetry_command_tx: Some(tx),
+            shutdown_tx: None,
         }
     }
 
@@ -714,7 +761,8 @@
             .expect_get_or_create_session()
             .returning(|_, _, _| Ok(Session::mock()));
 
-        let client = new_client_with_session_manager(session_manager);
+        let mut client = new_client_with_session_manager(session_manager);
+        let _ = client.start().await;
         let result = client.get_session().await;
         assert!(result.is_ok());
         let result = client
@@ -893,7 +941,9 @@
         mock.expect_heartbeat()
             .return_once(|_| Box::pin(futures::future::ready(response)));
 
-        let send_result = Client::heart_beat_inner(mock, "", "", &ClientType::Producer).await;
+        let send_result =
+            Client::heart_beat_inner(mock, &Some("group".to_string()), "", &ClientType::Producer)
+                .await;
         assert!(send_result.is_ok());
     }
 
@@ -1034,7 +1084,7 @@
         let result = Client::handle_telemetry_command(
             mock,
             &Some(Box::new(|_, _| TransactionResolution::COMMIT)),
-            Endpoints::from_url("localhopst:8081").unwrap(),
+            Endpoints::from_url("localhost:8081").unwrap(),
             RecoverOrphanedTransactionCommand(pb::RecoverOrphanedTransactionCommand {
                 message: Some(Message {
                     topic: Some(Resource::default()),
diff --git a/rust/src/conf.rs b/rust/src/conf.rs
index ea27d27..fa16a41 100644
--- a/rust/src/conf.rs
+++ b/rust/src/conf.rs
@@ -29,7 +29,7 @@
 #[derive(Debug, Clone)]
 pub struct ClientOption {
     pub(crate) client_type: ClientType,
-    pub(crate) group: String,
+    pub(crate) group: Option<String>,
     pub(crate) namespace: String,
     pub(crate) access_url: String,
     pub(crate) enable_tls: bool,
@@ -43,7 +43,7 @@
     fn default() -> Self {
         ClientOption {
             client_type: ClientType::Producer,
-            group: "".to_string(),
+            group: None,
             namespace: "".to_string(),
             access_url: "localhost:8081".to_string(),
             enable_tls: false,
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index d5e768c..7e3f399 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -275,8 +275,9 @@
         }
         let topic = message.take_topic();
         let receipt = self.send(message).await?;
+        let rpc_client = self.client.get_session().await?;
         Ok(TransactionImpl::new(
-            Box::new(self.client.get_session().await.unwrap()),
+            Box::new(rpc_client),
             Resource {
                 resource_namespace: self.option.namespace().to_string(),
                 name: topic,
@@ -284,6 +285,10 @@
             receipt,
         ))
     }
+
+    pub async fn shutdown(self) -> Result<(), ClientError> {
+        self.client.shutdown().await
+    }
 }
 
 #[cfg(test)]
diff --git a/rust/src/session.rs b/rust/src/session.rs
index 5762d66..9441f7c 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -22,7 +22,7 @@
 use slog::{debug, error, info, o, Logger};
 use time::format_description::well_known::Rfc3339;
 use time::OffsetDateTime;
-use tokio::sync::{mpsc, Mutex};
+use tokio::sync::{mpsc, oneshot, Mutex};
 use tokio_stream::wrappers::ReceiverStream;
 use tokio_stream::StreamExt;
 use tonic::metadata::{AsciiMetadataValue, MetadataMap};
@@ -34,9 +34,9 @@
 use crate::pb::telemetry_command::Command;
 use crate::pb::{
     AckMessageRequest, AckMessageResponse, EndTransactionRequest, EndTransactionResponse,
-    HeartbeatRequest, HeartbeatResponse, QueryRouteRequest, QueryRouteResponse,
-    ReceiveMessageRequest, ReceiveMessageResponse, SendMessageRequest, SendMessageResponse,
-    TelemetryCommand,
+    HeartbeatRequest, HeartbeatResponse, NotifyClientTerminationRequest,
+    NotifyClientTerminationResponse, QueryRouteRequest, QueryRouteResponse, ReceiveMessageRequest,
+    ReceiveMessageResponse, SendMessageRequest, SendMessageResponse, TelemetryCommand,
 };
 use crate::util::{PROTOCOL_VERSION, SDK_LANGUAGE, SDK_VERSION};
 use crate::{error::ClientError, pb::messaging_service_client::MessagingServiceClient};
@@ -50,6 +50,7 @@
 const OPERATION_RECEIVE_MESSAGE: &str = "rpc.receive_message";
 const OPERATION_ACK_MESSAGE: &str = "rpc.ack_message";
 const OPERATION_END_TRANSACTION: &str = "rpc.end_transaction";
+const OPERATION_NOTIFY_CLIENT_TERMINATION: &str = "rpc.notify_client_termination";
 
 #[async_trait]
 #[automock]
@@ -78,17 +79,36 @@
         &mut self,
         request: EndTransactionRequest,
     ) -> Result<EndTransactionResponse, ClientError>;
+    async fn notify_shutdown(
+        &mut self,
+        request: NotifyClientTerminationRequest,
+    ) -> Result<NotifyClientTerminationResponse, ClientError>;
 }
 
 #[allow(dead_code)]
-#[derive(Debug, Clone)]
+#[derive(Debug)]
 pub(crate) struct Session {
     logger: Logger,
     client_id: String,
     option: ClientOption,
     endpoints: Endpoints,
     stub: MessagingServiceClient<Channel>,
-    telemetry_tx: Box<Option<mpsc::Sender<TelemetryCommand>>>,
+    telemetry_tx: Option<mpsc::Sender<TelemetryCommand>>,
+    shutdown_tx: Option<oneshot::Sender<()>>,
+}
+
+impl Clone for Session {
+    fn clone(&self) -> Self {
+        Session {
+            logger: self.logger.clone(),
+            client_id: self.client_id.clone(),
+            option: self.option.clone(),
+            endpoints: self.endpoints.clone(),
+            stub: self.stub.clone(),
+            telemetry_tx: None,
+            shutdown_tx: None,
+        }
+    }
 }
 
 impl Session {
@@ -108,7 +128,8 @@
             stub: MessagingServiceClient::new(
                 Channel::from_static("http://localhost:8081").connect_lazy(),
             ),
-            telemetry_tx: Box::new(None),
+            telemetry_tx: None,
+            shutdown_tx: None,
         }
     }
 
@@ -159,7 +180,8 @@
             endpoints: endpoints.clone(),
             client_id,
             stub,
-            telemetry_tx: Box::new(None),
+            telemetry_tx: None,
+            shutdown_tx: None,
         })
     }
 
@@ -288,24 +310,35 @@
             .set_source(e)
         })?;
 
+        let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
+        self.shutdown_tx = Some(shutdown_tx);
+
         let logger = self.logger.clone();
         tokio::spawn(async move {
             let mut stream = response.into_inner();
             loop {
-                match stream.message().await {
-                    Ok(Some(item)) => {
-                        debug!(logger, "receive telemetry command: {:?}", item);
-                        if let Some(command) = item.command {
-                            _ = telemetry_command_tx.send(command).await;
+                tokio::select! {
+                    message = stream.message() => {
+                        match message {
+                            Ok(Some(item)) => {
+                                debug!(logger, "receive telemetry command: {:?}", item);
+                                if let Some(command) = item.command {
+                                    _ = telemetry_command_tx.send(command).await;
+                                }
+                            }
+                            Ok(None) => {
+                                info!(logger, "telemetry command stream closed by server");
+                                break;
+                            }
+                            Err(e) => {
+                                error!(logger, "telemetry response error: {:?}", e);
+                            }
                         }
                     }
-                    Ok(None) => {
-                        debug!(logger, "request stream closed");
+                    _ = &mut shutdown_rx => {
+                        info!(logger, "receive shutdown signal, stop dealing with telemetry command");
                         break;
                     }
-                    Err(e) => {
-                        error!(logger, "telemetry response error: {:?}", e);
-                    }
                 }
             }
         });
@@ -314,9 +347,15 @@
         Ok(())
     }
 
+    pub(crate) fn shutdown(&mut self) {
+        if let Some(tx) = self.shutdown_tx.take() {
+            let _ = tx.send(());
+        }
+    }
+
     #[allow(dead_code)]
     pub(crate) fn is_started(&self) -> bool {
-        self.telemetry_tx.is_some()
+        self.shutdown_tx.is_some()
     }
 
     #[allow(dead_code)]
@@ -465,6 +504,26 @@
         })?;
         Ok(response.into_inner())
     }
+
+    async fn notify_shutdown(
+        &mut self,
+        request: NotifyClientTerminationRequest,
+    ) -> Result<NotifyClientTerminationResponse, ClientError> {
+        let request = self.sign(request);
+        let response = self
+            .stub
+            .notify_client_termination(request)
+            .await
+            .map_err(|e| {
+                ClientError::new(
+                    ErrorKind::ClientInternal,
+                    "send rpc notify_client_termination failed",
+                    OPERATION_NOTIFY_CLIENT_TERMINATION,
+                )
+                .set_source(e)
+            })?;
+        Ok(response.into_inner())
+    }
 }
 
 #[derive(Debug)]
@@ -521,6 +580,14 @@
         }
         Ok(sessions)
     }
+
+    pub(crate) async fn shutdown(&self) {
+        let mut session_map = self.session_map.lock().await;
+        for (_, session) in session_map.iter_mut() {
+            session.shutdown();
+        }
+        session_map.clear();
+    }
 }
 
 #[cfg(test)]
diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs
index d4e222e..a877705 100644
--- a/rust/src/simple_consumer.rs
+++ b/rust/src/simple_consumer.rs
@@ -67,7 +67,7 @@
 
         let client_option = ClientOption {
             client_type: ClientType::SimpleConsumer,
-            group: option.consumer_group().to_string(),
+            group: Some(option.consumer_group().to_string()),
             namespace: option.namespace().to_string(),
             ..client_option
         };
@@ -104,6 +104,10 @@
         Ok(())
     }
 
+    pub async fn shutdown(self) -> Result<(), ClientError> {
+        self.client.shutdown().await
+    }
+
     /// receive messages from the specified topic
     ///
     /// # Arguments
@@ -115,7 +119,7 @@
         topic: impl AsRef<str>,
         expression: &FilterExpression,
     ) -> Result<Vec<MessageView>, ClientError> {
-        self.receive_with_batch_size(topic.as_ref(), expression, 32, Duration::from_secs(15))
+        self.receive_with(topic.as_ref(), expression, 32, Duration::from_secs(15))
             .await
     }
 
@@ -127,7 +131,7 @@
     /// * `expression` - the subscription for the topic
     /// * `batch_size` - max message num of server returned
     /// * `invisible_duration` - set the invisible duration of messages that return from the server, these messages will not be visible to other consumers unless timeout
-    pub async fn receive_with_batch_size(
+    pub async fn receive_with(
         &self,
         topic: impl AsRef<str>,
         expression: &FilterExpression,