feat(rust): support changing invisible duration (#623)

* feat(rust): support changing invisible duration

Signed-off-by: SSpirits <admin@lv5.moe>

* fix(rust): fix clippy warning

Signed-off-by: SSpirits <admin@lv5.moe>

* feat(rust): configure the example branch using features

Signed-off-by: SSpirits <admin@lv5.moe>

* fix(rust): fix clippy warning

Signed-off-by: SSpirits <admin@lv5.moe>

---------

Signed-off-by: SSpirits <admin@lv5.moe>
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 80af0bb..6cc106b 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -33,11 +33,11 @@
 
 [dependencies]
 tokio = { version = "1", features = ["full"] }
-tokio-rustls = {version = "0.24.0", features = ["default", "dangerous_configuration"] }
-tokio-stream="0.1.12"
+tokio-rustls = { version = "0.24.0", features = ["default", "dangerous_configuration"] }
+tokio-stream = "0.1.12"
 async-trait = "0.1.68"
 lazy_static = "1.4"
-tonic = {version = "0.9.0", features = ["tls", "default", "channel", "tls-roots"]}
+tonic = { version = "0.9.0", features = ["tls", "default", "channel", "tls-roots"] }
 prost = "0.11.8"
 prost-types = "0.11.8"
 
@@ -47,7 +47,7 @@
 hostname = "0.3.1"
 os_info = "3"
 
-slog = {version = "2.7.0", features=["max_level_trace", "release_max_level_info"]}
+slog = { version = "2.7.0", features = ["max_level_trace", "release_max_level_info"] }
 slog-term = "2.9.0"
 slog-async = "2.7.0"
 slog-json = "2.6.1"
@@ -63,7 +63,7 @@
 once_cell = "1.9.0"
 
 mockall = "0.11.4"
-mockall_double= "0.3.0"
+mockall_double = "0.3.0"
 
 siphasher = "0.3.10"
 ring = "0.16.20"
@@ -78,3 +78,8 @@
 wiremock-grpc = "0.0.3-alpha2"
 futures = "0.3"
 awaitility = "0.3.0"
+
+[features]
+default = ["example_ack"]
+example_ack = []
+example_change_invisible_duration = []
\ No newline at end of file
diff --git a/rust/examples/producer.rs b/rust/examples/producer.rs
index 3e818e9..7179c9d 100644
--- a/rust/examples/producer.rs
+++ b/rust/examples/producer.rs
@@ -59,7 +59,7 @@
     // shutdown the producer when you don't need it anymore.
     // you should shutdown it manually to gracefully stop and unregister from server
     let shutdown_result = producer.shutdown().await;
-    if shutdown_result.is_ok() {
+    if shutdown_result.is_err() {
         eprintln!(
             "producer shutdown failed: {:?}",
             shutdown_result.unwrap_err()
diff --git a/rust/examples/simple_consumer.rs b/rust/examples/simple_consumer.rs
index fc1a838..d9bb06d 100644
--- a/rust/examples/simple_consumer.rs
+++ b/rust/examples/simple_consumer.rs
@@ -14,6 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#[cfg(feature = "example_change_invisible_duration")]
+use std::time::Duration;
+
 use rocketmq::conf::{ClientOption, SimpleConsumerOption};
 use rocketmq::model::common::{FilterExpression, FilterType};
 use rocketmq::SimpleConsumer;
@@ -63,14 +66,39 @@
 
     for message in messages {
         println!("receive message: {:?}", message);
-        // ack message to rocketmq proxy
-        let ack_result = consumer.ack(&message).await;
-        if ack_result.is_err() {
-            eprintln!(
-                "ack message {} failed: {:?}",
-                message.message_id(),
-                ack_result.unwrap_err()
+
+        // Do your business logic here
+        // And then acknowledge the message to the RocketMQ proxy if everything is okay
+        #[cfg(feature = "example_ack")]
+        {
+            println!("ack message {}", message.message_id());
+            let ack_result = consumer.ack(&message).await;
+            if ack_result.is_err() {
+                eprintln!(
+                    "ack message {} failed: {:?}",
+                    message.message_id(),
+                    ack_result.unwrap_err()
+                );
+            }
+        }
+
+        // Otherwise, you can retry this message later by changing the invisible duration
+        #[cfg(feature = "example_change_invisible_duration")]
+        {
+            println!(
+                "Delay next visible time of message {} by 10s",
+                message.message_id()
             );
+            let change_invisible_duration_result = consumer
+                .change_invisible_duration(&message, Duration::from_secs(10))
+                .await;
+            if change_invisible_duration_result.is_err() {
+                eprintln!(
+                    "change message {} invisible duration failed: {:?}",
+                    message.message_id(),
+                    change_invisible_duration_result.unwrap_err()
+                );
+            }
         }
     }
 
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 3804b4a..ae03468 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -34,12 +34,12 @@
 use crate::model::transaction::{TransactionChecker, TransactionResolution};
 use crate::pb;
 use crate::pb::receive_message_response::Content;
-use crate::pb::telemetry_command::Command::RecoverOrphanedTransactionCommand;
+use crate::pb::telemetry_command::Command::{RecoverOrphanedTransactionCommand, Settings};
 use crate::pb::{
-    AckMessageRequest, AckMessageResultEntry, Code, EndTransactionRequest, FilterExpression,
-    HeartbeatRequest, HeartbeatResponse, Message, MessageQueue, NotifyClientTerminationRequest,
-    QueryRouteRequest, ReceiveMessageRequest, Resource, SendMessageRequest, Status,
-    TelemetryCommand, TransactionSource,
+    AckMessageRequest, AckMessageResultEntry, ChangeInvisibleDurationRequest, Code,
+    EndTransactionRequest, FilterExpression, HeartbeatRequest, HeartbeatResponse, Message,
+    MessageQueue, NotifyClientTerminationRequest, QueryRouteRequest, ReceiveMessageRequest,
+    Resource, SendMessageRequest, Status, TelemetryCommand, TransactionSource,
 };
 #[double]
 use crate::session::SessionManager;
@@ -282,6 +282,7 @@
                     ))
                 }
             }
+            Settings(_) => Ok(()),
             _ => Err(ClientError::new(
                 ErrorKind::Config,
                 "receive telemetry command but there is no handler",
@@ -291,7 +292,6 @@
         };
     }
 
-    #[allow(dead_code)]
     pub(crate) fn client_id(&self) -> &str {
         &self.id
     }
@@ -378,7 +378,6 @@
         })
     }
 
-    #[allow(dead_code)]
     pub(crate) async fn topic_route(
         &self,
         topic: &str,
@@ -461,8 +460,7 @@
         let result = self.query_topic_route(rpc_client, topic).await;
 
         // send result to all waiters
-        if result.is_ok() {
-            let route = result.unwrap();
+        if let Ok(route) = result {
             debug!(
                 self.logger,
                 "query route for topic={} success: route={:?}", topic, route
@@ -518,7 +516,6 @@
         Ok(response)
     }
 
-    #[allow(dead_code)]
     pub(crate) async fn send_message(
         &self,
         endpoints: &Endpoints,
@@ -547,7 +544,6 @@
             .collect())
     }
 
-    #[allow(dead_code)]
     pub(crate) async fn receive_message(
         &self,
         endpoints: &Endpoints,
@@ -608,7 +604,6 @@
         Ok(messages)
     }
 
-    #[allow(dead_code)]
     pub(crate) async fn ack_message<T: AckMessageEntry + 'static>(
         &self,
         ack_entry: &T,
@@ -649,6 +644,51 @@
         Self::handle_response_status(response.status, OPERATION_ACK_MESSAGE)?;
         Ok(response.entries)
     }
+
+    pub(crate) async fn change_invisible_duration<T: AckMessageEntry + 'static>(
+        &self,
+        ack_entry: &T,
+        invisible_duration: Duration,
+    ) -> Result<String, ClientError> {
+        let result = self
+            .change_invisible_duration_inner(
+                self.get_session_with_endpoints(ack_entry.endpoints())
+                    .await
+                    .unwrap(),
+                ack_entry.topic(),
+                ack_entry.receipt_handle(),
+                invisible_duration,
+                ack_entry.message_id(),
+            )
+            .await?;
+        Ok(result)
+    }
+
+    pub(crate) async fn change_invisible_duration_inner<T: RPCClient + 'static>(
+        &self,
+        mut rpc_client: T,
+        topic: String,
+        receipt_handle: String,
+        invisible_duration: Duration,
+        message_id: String,
+    ) -> Result<String, ClientError> {
+        let request = ChangeInvisibleDurationRequest {
+            group: Some(Resource {
+                name: self.option.group.as_ref().unwrap().to_string(),
+                resource_namespace: self.option.namespace.to_string(),
+            }),
+            topic: Some(Resource {
+                name: topic,
+                resource_namespace: self.option.namespace.to_string(),
+            }),
+            receipt_handle,
+            invisible_duration: Some(invisible_duration),
+            message_id,
+        };
+        let response = rpc_client.change_invisible_duration(request).await?;
+        Self::handle_response_status(response.status, OPERATION_ACK_MESSAGE)?;
+        Ok(response.receipt_handle)
+    }
 }
 
 #[cfg(test)]
@@ -668,9 +708,10 @@
     use crate::model::transaction::TransactionResolution;
     use crate::pb::receive_message_response::Content;
     use crate::pb::{
-        AckMessageEntry, AckMessageResponse, Code, EndTransactionResponse, FilterExpression,
-        HeartbeatResponse, Message, MessageQueue, QueryRouteResponse, ReceiveMessageResponse,
-        Resource, SendMessageResponse, Status, SystemProperties, TelemetryCommand,
+        AckMessageEntry, AckMessageResponse, ChangeInvisibleDurationResponse, Code,
+        EndTransactionResponse, FilterExpression, HeartbeatResponse, Message, MessageQueue,
+        QueryRouteResponse, ReceiveMessageResponse, Resource, SendMessageResponse, Status,
+        SystemProperties, TelemetryCommand,
     };
     use crate::session;
 
@@ -1046,6 +1087,33 @@
     }
 
     #[tokio::test]
+    async fn client_change_invisible_duration() {
+        let response = Ok(ChangeInvisibleDurationResponse {
+            status: Some(Status {
+                code: Code::Ok as i32,
+                message: "Success".to_string(),
+            }),
+            receipt_handle: "receipt_handle".to_string(),
+        });
+        let mut mock = session::MockRPCClient::new();
+        mock.expect_change_invisible_duration()
+            .return_once(|_| Box::pin(futures::future::ready(response)));
+
+        let client = new_client_for_test();
+        let change_invisible_duration_result = client
+            .change_invisible_duration_inner(
+                mock,
+                "test_topic".to_string(),
+                "receipt_handle".to_string(),
+                prost_types::Duration::default(),
+                "message_id".to_string(),
+            )
+            .await;
+        assert!(change_invisible_duration_result.is_ok());
+        assert_eq!(change_invisible_duration_result.unwrap(), "receipt_handle");
+    }
+
+    #[tokio::test]
     async fn client_ack_message_failed() {
         let response = Ok(AckMessageResponse {
             status: Some(Status {
diff --git a/rust/src/model/common.rs b/rust/src/model/common.rs
index 919d4b3..5dee554 100644
--- a/rust/src/model/common.rs
+++ b/rust/src/model/common.rs
@@ -27,12 +27,13 @@
 use crate::pb;
 use crate::pb::{Address, AddressScheme, MessageQueue};
 
-#[allow(dead_code)]
 #[derive(Debug, Clone)]
 pub(crate) enum ClientType {
     Producer = 1,
+    #[allow(dead_code)]
     PushConsumer = 2,
     SimpleConsumer = 3,
+    #[allow(dead_code)]
     PullConsumer = 4,
 }
 
diff --git a/rust/src/model/message.rs b/rust/src/model/message.rs
index a2e9405..28c232e 100644
--- a/rust/src/model/message.rs
+++ b/rust/src/model/message.rs
@@ -59,7 +59,7 @@
     }
 
     fn take_body(&mut self) -> Vec<u8> {
-        self.body.take().unwrap_or(vec![])
+        self.body.take().unwrap_or_default()
     }
 
     fn take_tag(&mut self) -> Option<String> {
@@ -67,11 +67,11 @@
     }
 
     fn take_keys(&mut self) -> Vec<String> {
-        self.keys.take().unwrap_or(vec![])
+        self.keys.take().unwrap_or_default()
     }
 
     fn take_properties(&mut self) -> HashMap<String, String> {
-        self.properties.take().unwrap_or(HashMap::new())
+        self.properties.take().unwrap_or_default()
     }
 
     fn take_message_group(&mut self) -> Option<String> {
diff --git a/rust/src/model/transaction.rs b/rust/src/model/transaction.rs
index 2f74679..2a40d7b 100644
--- a/rust/src/model/transaction.rs
+++ b/rust/src/model/transaction.rs
@@ -102,11 +102,11 @@
 #[async_trait]
 impl Transaction for TransactionImpl {
     async fn commit(mut self) -> Result<(), ClientError> {
-        return self.end_transaction(TransactionResolution::COMMIT).await;
+        self.end_transaction(TransactionResolution::COMMIT).await
     }
 
     async fn rollback(mut self) -> Result<(), ClientError> {
-        return self.end_transaction(TransactionResolution::ROLLBACK).await;
+        self.end_transaction(TransactionResolution::ROLLBACK).await
     }
 
     fn message_id(&self) -> &str {
diff --git a/rust/src/session.rs b/rust/src/session.rs
index 9441f7c..d54894d 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -33,7 +33,8 @@
 use crate::model::common::Endpoints;
 use crate::pb::telemetry_command::Command;
 use crate::pb::{
-    AckMessageRequest, AckMessageResponse, EndTransactionRequest, EndTransactionResponse,
+    AckMessageRequest, AckMessageResponse, ChangeInvisibleDurationRequest,
+    ChangeInvisibleDurationResponse, EndTransactionRequest, EndTransactionResponse,
     HeartbeatRequest, HeartbeatResponse, NotifyClientTerminationRequest,
     NotifyClientTerminationResponse, QueryRouteRequest, QueryRouteResponse, ReceiveMessageRequest,
     ReceiveMessageResponse, SendMessageRequest, SendMessageResponse, TelemetryCommand,
@@ -49,6 +50,7 @@
 const OPERATION_SEND_MESSAGE: &str = "rpc.send_message";
 const OPERATION_RECEIVE_MESSAGE: &str = "rpc.receive_message";
 const OPERATION_ACK_MESSAGE: &str = "rpc.ack_message";
+const OPERATION_CHANGE_INVISIBLE_DURATION: &str = "rpc.change_invisible_duration";
 const OPERATION_END_TRANSACTION: &str = "rpc.end_transaction";
 const OPERATION_NOTIFY_CLIENT_TERMINATION: &str = "rpc.notify_client_termination";
 
@@ -75,6 +77,10 @@
         &mut self,
         request: AckMessageRequest,
     ) -> Result<AckMessageResponse, ClientError>;
+    async fn change_invisible_duration(
+        &mut self,
+        request: ChangeInvisibleDurationRequest,
+    ) -> Result<ChangeInvisibleDurationResponse, ClientError>;
     async fn end_transaction(
         &mut self,
         request: EndTransactionRequest,
@@ -85,7 +91,6 @@
     ) -> Result<NotifyClientTerminationResponse, ClientError>;
 }
 
-#[allow(dead_code)]
 #[derive(Debug)]
 pub(crate) struct Session {
     logger: Logger,
@@ -353,7 +358,6 @@
         }
     }
 
-    #[allow(dead_code)]
     pub(crate) fn is_started(&self) -> bool {
         self.shutdown_tx.is_some()
     }
@@ -489,6 +493,26 @@
         Ok(response.into_inner())
     }
 
+    async fn change_invisible_duration(
+        &mut self,
+        request: ChangeInvisibleDurationRequest,
+    ) -> Result<ChangeInvisibleDurationResponse, ClientError> {
+        let request = self.sign(request);
+        let response = self
+            .stub
+            .change_invisible_duration(request)
+            .await
+            .map_err(|e| {
+                ClientError::new(
+                    ErrorKind::ClientInternal,
+                    "send rpc change_invisible_duration failed",
+                    OPERATION_CHANGE_INVISIBLE_DURATION,
+                )
+                .set_source(e)
+            })?;
+        Ok(response.into_inner())
+    }
+
     async fn end_transaction(
         &mut self,
         request: EndTransactionRequest,
@@ -571,7 +595,6 @@
         };
     }
 
-    #[allow(dead_code)]
     pub(crate) async fn get_all_sessions(&self) -> Result<Vec<Session>, ClientError> {
         let session_map = self.session_map.lock().await;
         let mut sessions = Vec::new();
diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs
index a877705..f8a6eac 100644
--- a/rust/src/simple_consumer.rs
+++ b/rust/src/simple_consumer.rs
@@ -175,6 +175,19 @@
         self.client.ack_message(ack_entry).await?;
         Ok(())
     }
+
+    pub async fn change_invisible_duration(
+        &self,
+        ack_entry: &(impl AckMessageEntry + 'static),
+        invisible_duration: Duration,
+    ) -> Result<String, ClientError> {
+        self.client
+            .change_invisible_duration(
+                ack_entry,
+                prost_types::Duration::try_from(invisible_duration).unwrap(),
+            )
+            .await
+    }
 }
 
 #[cfg(test)]
diff --git a/rust/src/util.rs b/rust/src/util.rs
index e25d262..6ea92cb 100644
--- a/rust/src/util.rs
+++ b/rust/src/util.rs
@@ -92,7 +92,7 @@
     let topics = option
         .topics()
         .clone()
-        .unwrap_or(vec![])
+        .unwrap_or_default()
         .iter()
         .map(|topic| Resource {
             name: topic.to_string(),