feat(rust): support changing invisible duration (#623)
* feat(rust): support changing invisible duration
Signed-off-by: SSpirits <admin@lv5.moe>
* fix(rust): fix clippy warning
Signed-off-by: SSpirits <admin@lv5.moe>
* feat(rust): configure the example branch using features
Signed-off-by: SSpirits <admin@lv5.moe>
* fix(rust): fix clippy warning
Signed-off-by: SSpirits <admin@lv5.moe>
---------
Signed-off-by: SSpirits <admin@lv5.moe>
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 80af0bb..6cc106b 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -33,11 +33,11 @@
[dependencies]
tokio = { version = "1", features = ["full"] }
-tokio-rustls = {version = "0.24.0", features = ["default", "dangerous_configuration"] }
-tokio-stream="0.1.12"
+tokio-rustls = { version = "0.24.0", features = ["default", "dangerous_configuration"] }
+tokio-stream = "0.1.12"
async-trait = "0.1.68"
lazy_static = "1.4"
-tonic = {version = "0.9.0", features = ["tls", "default", "channel", "tls-roots"]}
+tonic = { version = "0.9.0", features = ["tls", "default", "channel", "tls-roots"] }
prost = "0.11.8"
prost-types = "0.11.8"
@@ -47,7 +47,7 @@
hostname = "0.3.1"
os_info = "3"
-slog = {version = "2.7.0", features=["max_level_trace", "release_max_level_info"]}
+slog = { version = "2.7.0", features = ["max_level_trace", "release_max_level_info"] }
slog-term = "2.9.0"
slog-async = "2.7.0"
slog-json = "2.6.1"
@@ -63,7 +63,7 @@
once_cell = "1.9.0"
mockall = "0.11.4"
-mockall_double= "0.3.0"
+mockall_double = "0.3.0"
siphasher = "0.3.10"
ring = "0.16.20"
@@ -78,3 +78,8 @@
wiremock-grpc = "0.0.3-alpha2"
futures = "0.3"
awaitility = "0.3.0"
+
+[features]
+default = ["example_ack"]
+example_ack = []
+example_change_invisible_duration = []
\ No newline at end of file
diff --git a/rust/examples/producer.rs b/rust/examples/producer.rs
index 3e818e9..7179c9d 100644
--- a/rust/examples/producer.rs
+++ b/rust/examples/producer.rs
@@ -59,7 +59,7 @@
// shutdown the producer when you don't need it anymore.
// you should shutdown it manually to gracefully stop and unregister from server
let shutdown_result = producer.shutdown().await;
- if shutdown_result.is_ok() {
+ if shutdown_result.is_err() {
eprintln!(
"producer shutdown failed: {:?}",
shutdown_result.unwrap_err()
diff --git a/rust/examples/simple_consumer.rs b/rust/examples/simple_consumer.rs
index fc1a838..d9bb06d 100644
--- a/rust/examples/simple_consumer.rs
+++ b/rust/examples/simple_consumer.rs
@@ -14,6 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+#[cfg(feature = "example_change_invisible_duration")]
+use std::time::Duration;
+
use rocketmq::conf::{ClientOption, SimpleConsumerOption};
use rocketmq::model::common::{FilterExpression, FilterType};
use rocketmq::SimpleConsumer;
@@ -63,14 +66,39 @@
for message in messages {
println!("receive message: {:?}", message);
- // ack message to rocketmq proxy
- let ack_result = consumer.ack(&message).await;
- if ack_result.is_err() {
- eprintln!(
- "ack message {} failed: {:?}",
- message.message_id(),
- ack_result.unwrap_err()
+
+ // Do your business logic here
+ // And then acknowledge the message to the RocketMQ proxy if everything is okay
+ #[cfg(feature = "example_ack")]
+ {
+ println!("ack message {}", message.message_id());
+ let ack_result = consumer.ack(&message).await;
+ if ack_result.is_err() {
+ eprintln!(
+ "ack message {} failed: {:?}",
+ message.message_id(),
+ ack_result.unwrap_err()
+ );
+ }
+ }
+
+ // Otherwise, you can retry this message later by changing the invisible duration
+ #[cfg(feature = "example_change_invisible_duration")]
+ {
+ println!(
+ "Delay next visible time of message {} by 10s",
+ message.message_id()
);
+ let change_invisible_duration_result = consumer
+ .change_invisible_duration(&message, Duration::from_secs(10))
+ .await;
+ if change_invisible_duration_result.is_err() {
+ eprintln!(
+ "change message {} invisible duration failed: {:?}",
+ message.message_id(),
+ change_invisible_duration_result.unwrap_err()
+ );
+ }
}
}
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 3804b4a..ae03468 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -34,12 +34,12 @@
use crate::model::transaction::{TransactionChecker, TransactionResolution};
use crate::pb;
use crate::pb::receive_message_response::Content;
-use crate::pb::telemetry_command::Command::RecoverOrphanedTransactionCommand;
+use crate::pb::telemetry_command::Command::{RecoverOrphanedTransactionCommand, Settings};
use crate::pb::{
- AckMessageRequest, AckMessageResultEntry, Code, EndTransactionRequest, FilterExpression,
- HeartbeatRequest, HeartbeatResponse, Message, MessageQueue, NotifyClientTerminationRequest,
- QueryRouteRequest, ReceiveMessageRequest, Resource, SendMessageRequest, Status,
- TelemetryCommand, TransactionSource,
+ AckMessageRequest, AckMessageResultEntry, ChangeInvisibleDurationRequest, Code,
+ EndTransactionRequest, FilterExpression, HeartbeatRequest, HeartbeatResponse, Message,
+ MessageQueue, NotifyClientTerminationRequest, QueryRouteRequest, ReceiveMessageRequest,
+ Resource, SendMessageRequest, Status, TelemetryCommand, TransactionSource,
};
#[double]
use crate::session::SessionManager;
@@ -282,6 +282,7 @@
))
}
}
+ Settings(_) => Ok(()),
_ => Err(ClientError::new(
ErrorKind::Config,
"receive telemetry command but there is no handler",
@@ -291,7 +292,6 @@
};
}
- #[allow(dead_code)]
pub(crate) fn client_id(&self) -> &str {
&self.id
}
@@ -378,7 +378,6 @@
})
}
- #[allow(dead_code)]
pub(crate) async fn topic_route(
&self,
topic: &str,
@@ -461,8 +460,7 @@
let result = self.query_topic_route(rpc_client, topic).await;
// send result to all waiters
- if result.is_ok() {
- let route = result.unwrap();
+ if let Ok(route) = result {
debug!(
self.logger,
"query route for topic={} success: route={:?}", topic, route
@@ -518,7 +516,6 @@
Ok(response)
}
- #[allow(dead_code)]
pub(crate) async fn send_message(
&self,
endpoints: &Endpoints,
@@ -547,7 +544,6 @@
.collect())
}
- #[allow(dead_code)]
pub(crate) async fn receive_message(
&self,
endpoints: &Endpoints,
@@ -608,7 +604,6 @@
Ok(messages)
}
- #[allow(dead_code)]
pub(crate) async fn ack_message<T: AckMessageEntry + 'static>(
&self,
ack_entry: &T,
@@ -649,6 +644,51 @@
Self::handle_response_status(response.status, OPERATION_ACK_MESSAGE)?;
Ok(response.entries)
}
+
+ pub(crate) async fn change_invisible_duration<T: AckMessageEntry + 'static>(
+ &self,
+ ack_entry: &T,
+ invisible_duration: Duration,
+ ) -> Result<String, ClientError> {
+ let result = self
+ .change_invisible_duration_inner(
+ self.get_session_with_endpoints(ack_entry.endpoints())
+ .await
+ .unwrap(),
+ ack_entry.topic(),
+ ack_entry.receipt_handle(),
+ invisible_duration,
+ ack_entry.message_id(),
+ )
+ .await?;
+ Ok(result)
+ }
+
+ pub(crate) async fn change_invisible_duration_inner<T: RPCClient + 'static>(
+ &self,
+ mut rpc_client: T,
+ topic: String,
+ receipt_handle: String,
+ invisible_duration: Duration,
+ message_id: String,
+ ) -> Result<String, ClientError> {
+ let request = ChangeInvisibleDurationRequest {
+ group: Some(Resource {
+ name: self.option.group.as_ref().unwrap().to_string(),
+ resource_namespace: self.option.namespace.to_string(),
+ }),
+ topic: Some(Resource {
+ name: topic,
+ resource_namespace: self.option.namespace.to_string(),
+ }),
+ receipt_handle,
+ invisible_duration: Some(invisible_duration),
+ message_id,
+ };
+ let response = rpc_client.change_invisible_duration(request).await?;
+ Self::handle_response_status(response.status, OPERATION_ACK_MESSAGE)?;
+ Ok(response.receipt_handle)
+ }
}
#[cfg(test)]
@@ -668,9 +708,10 @@
use crate::model::transaction::TransactionResolution;
use crate::pb::receive_message_response::Content;
use crate::pb::{
- AckMessageEntry, AckMessageResponse, Code, EndTransactionResponse, FilterExpression,
- HeartbeatResponse, Message, MessageQueue, QueryRouteResponse, ReceiveMessageResponse,
- Resource, SendMessageResponse, Status, SystemProperties, TelemetryCommand,
+ AckMessageEntry, AckMessageResponse, ChangeInvisibleDurationResponse, Code,
+ EndTransactionResponse, FilterExpression, HeartbeatResponse, Message, MessageQueue,
+ QueryRouteResponse, ReceiveMessageResponse, Resource, SendMessageResponse, Status,
+ SystemProperties, TelemetryCommand,
};
use crate::session;
@@ -1046,6 +1087,33 @@
}
#[tokio::test]
+ async fn client_change_invisible_duration() {
+ let response = Ok(ChangeInvisibleDurationResponse {
+ status: Some(Status {
+ code: Code::Ok as i32,
+ message: "Success".to_string(),
+ }),
+ receipt_handle: "receipt_handle".to_string(),
+ });
+ let mut mock = session::MockRPCClient::new();
+ mock.expect_change_invisible_duration()
+ .return_once(|_| Box::pin(futures::future::ready(response)));
+
+ let client = new_client_for_test();
+ let change_invisible_duration_result = client
+ .change_invisible_duration_inner(
+ mock,
+ "test_topic".to_string(),
+ "receipt_handle".to_string(),
+ prost_types::Duration::default(),
+ "message_id".to_string(),
+ )
+ .await;
+ assert!(change_invisible_duration_result.is_ok());
+ assert_eq!(change_invisible_duration_result.unwrap(), "receipt_handle");
+ }
+
+ #[tokio::test]
async fn client_ack_message_failed() {
let response = Ok(AckMessageResponse {
status: Some(Status {
diff --git a/rust/src/model/common.rs b/rust/src/model/common.rs
index 919d4b3..5dee554 100644
--- a/rust/src/model/common.rs
+++ b/rust/src/model/common.rs
@@ -27,12 +27,13 @@
use crate::pb;
use crate::pb::{Address, AddressScheme, MessageQueue};
-#[allow(dead_code)]
#[derive(Debug, Clone)]
pub(crate) enum ClientType {
Producer = 1,
+ #[allow(dead_code)]
PushConsumer = 2,
SimpleConsumer = 3,
+ #[allow(dead_code)]
PullConsumer = 4,
}
diff --git a/rust/src/model/message.rs b/rust/src/model/message.rs
index a2e9405..28c232e 100644
--- a/rust/src/model/message.rs
+++ b/rust/src/model/message.rs
@@ -59,7 +59,7 @@
}
fn take_body(&mut self) -> Vec<u8> {
- self.body.take().unwrap_or(vec![])
+ self.body.take().unwrap_or_default()
}
fn take_tag(&mut self) -> Option<String> {
@@ -67,11 +67,11 @@
}
fn take_keys(&mut self) -> Vec<String> {
- self.keys.take().unwrap_or(vec![])
+ self.keys.take().unwrap_or_default()
}
fn take_properties(&mut self) -> HashMap<String, String> {
- self.properties.take().unwrap_or(HashMap::new())
+ self.properties.take().unwrap_or_default()
}
fn take_message_group(&mut self) -> Option<String> {
diff --git a/rust/src/model/transaction.rs b/rust/src/model/transaction.rs
index 2f74679..2a40d7b 100644
--- a/rust/src/model/transaction.rs
+++ b/rust/src/model/transaction.rs
@@ -102,11 +102,11 @@
#[async_trait]
impl Transaction for TransactionImpl {
async fn commit(mut self) -> Result<(), ClientError> {
- return self.end_transaction(TransactionResolution::COMMIT).await;
+ self.end_transaction(TransactionResolution::COMMIT).await
}
async fn rollback(mut self) -> Result<(), ClientError> {
- return self.end_transaction(TransactionResolution::ROLLBACK).await;
+ self.end_transaction(TransactionResolution::ROLLBACK).await
}
fn message_id(&self) -> &str {
diff --git a/rust/src/session.rs b/rust/src/session.rs
index 9441f7c..d54894d 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -33,7 +33,8 @@
use crate::model::common::Endpoints;
use crate::pb::telemetry_command::Command;
use crate::pb::{
- AckMessageRequest, AckMessageResponse, EndTransactionRequest, EndTransactionResponse,
+ AckMessageRequest, AckMessageResponse, ChangeInvisibleDurationRequest,
+ ChangeInvisibleDurationResponse, EndTransactionRequest, EndTransactionResponse,
HeartbeatRequest, HeartbeatResponse, NotifyClientTerminationRequest,
NotifyClientTerminationResponse, QueryRouteRequest, QueryRouteResponse, ReceiveMessageRequest,
ReceiveMessageResponse, SendMessageRequest, SendMessageResponse, TelemetryCommand,
@@ -49,6 +50,7 @@
const OPERATION_SEND_MESSAGE: &str = "rpc.send_message";
const OPERATION_RECEIVE_MESSAGE: &str = "rpc.receive_message";
const OPERATION_ACK_MESSAGE: &str = "rpc.ack_message";
+const OPERATION_CHANGE_INVISIBLE_DURATION: &str = "rpc.change_invisible_duration";
const OPERATION_END_TRANSACTION: &str = "rpc.end_transaction";
const OPERATION_NOTIFY_CLIENT_TERMINATION: &str = "rpc.notify_client_termination";
@@ -75,6 +77,10 @@
&mut self,
request: AckMessageRequest,
) -> Result<AckMessageResponse, ClientError>;
+ async fn change_invisible_duration(
+ &mut self,
+ request: ChangeInvisibleDurationRequest,
+ ) -> Result<ChangeInvisibleDurationResponse, ClientError>;
async fn end_transaction(
&mut self,
request: EndTransactionRequest,
@@ -85,7 +91,6 @@
) -> Result<NotifyClientTerminationResponse, ClientError>;
}
-#[allow(dead_code)]
#[derive(Debug)]
pub(crate) struct Session {
logger: Logger,
@@ -353,7 +358,6 @@
}
}
- #[allow(dead_code)]
pub(crate) fn is_started(&self) -> bool {
self.shutdown_tx.is_some()
}
@@ -489,6 +493,26 @@
Ok(response.into_inner())
}
+ async fn change_invisible_duration(
+ &mut self,
+ request: ChangeInvisibleDurationRequest,
+ ) -> Result<ChangeInvisibleDurationResponse, ClientError> {
+ let request = self.sign(request);
+ let response = self
+ .stub
+ .change_invisible_duration(request)
+ .await
+ .map_err(|e| {
+ ClientError::new(
+ ErrorKind::ClientInternal,
+ "send rpc change_invisible_duration failed",
+ OPERATION_CHANGE_INVISIBLE_DURATION,
+ )
+ .set_source(e)
+ })?;
+ Ok(response.into_inner())
+ }
+
async fn end_transaction(
&mut self,
request: EndTransactionRequest,
@@ -571,7 +595,6 @@
};
}
- #[allow(dead_code)]
pub(crate) async fn get_all_sessions(&self) -> Result<Vec<Session>, ClientError> {
let session_map = self.session_map.lock().await;
let mut sessions = Vec::new();
diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs
index a877705..f8a6eac 100644
--- a/rust/src/simple_consumer.rs
+++ b/rust/src/simple_consumer.rs
@@ -175,6 +175,19 @@
self.client.ack_message(ack_entry).await?;
Ok(())
}
+
+ pub async fn change_invisible_duration(
+ &self,
+ ack_entry: &(impl AckMessageEntry + 'static),
+ invisible_duration: Duration,
+ ) -> Result<String, ClientError> {
+ self.client
+ .change_invisible_duration(
+ ack_entry,
+ prost_types::Duration::try_from(invisible_duration).unwrap(),
+ )
+ .await
+ }
}
#[cfg(test)]
diff --git a/rust/src/util.rs b/rust/src/util.rs
index e25d262..6ea92cb 100644
--- a/rust/src/util.rs
+++ b/rust/src/util.rs
@@ -92,7 +92,7 @@
let topics = option
.topics()
.clone()
- .unwrap_or(vec![])
+ .unwrap_or_default()
.iter()
.map(|topic| Resource {
name: topic.to_string(),