[rust]support message type check on rust client (#658)
* support message type check on rust client
* update test cases
* simplify MessageType conversion
* remove unused function
* fix code fmt issue
* fix cargo fmt issue
* Use strong type in application code and primitive type for wire data only
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
---------
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
Co-authored-by: Li Zhanhui <lizhanhui@gmail.com>
diff --git a/rust/src/error.rs b/rust/src/error.rs
index 59d1eeb..5210842 100644
--- a/rust/src/error.rs
+++ b/rust/src/error.rs
@@ -33,6 +33,9 @@
#[error("Message is invalid")]
InvalidMessage,
+ #[error("Message type not match with topic accept message type")]
+ MessageTypeNotMatch,
+
#[error("Server error")]
Server,
diff --git a/rust/src/model/message.rs b/rust/src/model/message.rs
index 28c232e..1defcd8 100644
--- a/rust/src/model/message.rs
+++ b/rust/src/model/message.rs
@@ -21,9 +21,18 @@
use crate::error::{ClientError, ErrorKind};
use crate::model::common::Endpoints;
+use crate::model::message::MessageType::{DELAY, FIFO, NORMAL, TRANSACTION};
use crate::model::message_id::UNIQ_ID_GENERATOR;
use crate::pb;
+#[derive(Clone, Copy, Debug)]
+pub enum MessageType {
+ NORMAL = 1,
+ FIFO = 2,
+ DELAY = 3,
+ TRANSACTION = 4,
+}
+
/// [`Message`] is the data model for sending.
pub trait Message {
fn take_message_id(&mut self) -> String;
@@ -35,6 +44,17 @@
fn take_message_group(&mut self) -> Option<String>;
fn take_delivery_timestamp(&mut self) -> Option<i64>;
fn transaction_enabled(&mut self) -> bool;
+ fn get_message_type(&self) -> MessageType;
+}
+
+pub trait MessageTypeAware {
+ fn accept_type(&self, message_type: MessageType) -> bool;
+}
+
+impl MessageTypeAware for pb::MessageQueue {
+ fn accept_type(&self, message_type: MessageType) -> bool {
+ self.accept_message_types.contains(&(message_type as i32))
+ }
}
pub(crate) struct MessageImpl {
@@ -47,6 +67,7 @@
pub(crate) message_group: Option<String>,
pub(crate) delivery_timestamp: Option<i64>,
pub(crate) transaction_enabled: bool,
+ pub(crate) message_type: MessageType,
}
impl Message for MessageImpl {
@@ -85,6 +106,10 @@
fn transaction_enabled(&mut self) -> bool {
self.transaction_enabled
}
+
+ fn get_message_type(&self) -> MessageType {
+ self.message_type
+ }
}
/// [`MessageBuilder`] is the builder for [`Message`].
@@ -108,6 +133,7 @@
message_group: None,
delivery_timestamp: None,
transaction_enabled: false,
+ message_type: NORMAL,
},
}
}
@@ -135,6 +161,7 @@
message_group: Some(message_group.into()),
delivery_timestamp: None,
transaction_enabled: false,
+ message_type: FIFO,
},
}
}
@@ -162,6 +189,7 @@
message_group: None,
delivery_timestamp: Some(delay_time),
transaction_enabled: false,
+ message_type: DELAY,
},
}
}
@@ -184,6 +212,7 @@
message_group: None,
delivery_timestamp: None,
transaction_enabled: true,
+ message_type: TRANSACTION,
},
}
}
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index 7e3f399..2a69f07 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -26,9 +26,9 @@
use crate::conf::{ClientOption, ProducerOption};
use crate::error::{ClientError, ErrorKind};
use crate::model::common::{ClientType, SendReceipt};
-use crate::model::message;
+use crate::model::message::{self, MessageTypeAware};
use crate::model::transaction::{Transaction, TransactionChecker, TransactionImpl};
-use crate::pb::{Encoding, MessageType, Resource, SystemProperties};
+use crate::pb::{Encoding, Resource, SystemProperties};
use crate::util::{
build_endpoints_by_message_queue, build_producer_settings, select_message_queue,
select_message_queue_by_message_group, HOST_NAME,
@@ -172,20 +172,16 @@
.take_delivery_timestamp()
.map(|seconds| Timestamp { seconds, nanos: 0 });
- let message_type = if message.transaction_enabled() {
+ if message.transaction_enabled() {
message_group = None;
delivery_timestamp = None;
- MessageType::Transaction as i32
} else if delivery_timestamp.is_some() {
message_group = None;
- MessageType::Delay as i32
} else if message_group.is_some() {
delivery_timestamp = None;
- MessageType::Fifo as i32
- } else {
- MessageType::Normal as i32
};
+ // TODO: use a converter trait From or TryFrom
let pb_message = pb::Message {
topic: Some(Resource {
name: message.take_topic(),
@@ -198,7 +194,7 @@
message_id: message.take_message_id(),
message_group,
delivery_timestamp,
- message_type,
+ message_type: message.get_message_type() as i32,
born_host: HOST_NAME.clone(),
born_timestamp: born_timestamp.clone(),
body_digest: None,
@@ -241,6 +237,11 @@
&self,
messages: Vec<impl message::Message>,
) -> Result<Vec<SendReceipt>, ClientError> {
+ let message_types = messages
+ .iter()
+ .map(|message| message.get_message_type())
+ .collect::<Vec<_>>();
+
let (topic, message_group, mut pb_messages) =
self.transform_messages_to_protobuf(messages)?;
@@ -252,6 +253,22 @@
select_message_queue(route)
};
+ if self.option.validate_message_type() {
+ for message_type in message_types {
+ if !message_queue.accept_type(message_type) {
+ return Err(ClientError::new(
+ ErrorKind::MessageTypeNotMatch,
+ format!(
+ "Current message type {:?} not match with accepted types {:?}.",
+ message_type, message_queue.accept_message_types
+ )
+ .as_str(),
+ Self::OPERATION_SEND_MESSAGE,
+ ));
+ }
+ }
+ }
+
let endpoints =
build_endpoints_by_message_queue(&message_queue, Self::OPERATION_SEND_MESSAGE)?;
for message in pb_messages.iter_mut() {
@@ -298,7 +315,7 @@
use crate::error::ErrorKind;
use crate::log::terminal_logger;
use crate::model::common::Route;
- use crate::model::message::{MessageBuilder, MessageImpl};
+ use crate::model::message::{MessageBuilder, MessageImpl, MessageType};
use crate::model::transaction::TransactionResolution;
use crate::pb::{Broker, MessageQueue};
use crate::session::Session;
@@ -424,6 +441,7 @@
message_group: None,
delivery_timestamp: None,
transaction_enabled: false,
+ message_type: MessageType::TRANSACTION,
}];
let result = producer.transform_messages_to_protobuf(messages);
assert!(result.is_err());
@@ -491,7 +509,7 @@
addresses: vec![],
}),
}),
- accept_message_types: vec![],
+ accept_message_types: vec![MessageType::NORMAL as i32],
}],
}))
});
@@ -539,7 +557,7 @@
addresses: vec![],
}),
}),
- accept_message_types: vec![],
+ accept_message_types: vec![MessageType::TRANSACTION as i32],
}],
}))
});
@@ -563,9 +581,7 @@
let _ = producer
.send_transaction_message(
- MessageBuilder::builder()
- .set_topic("test_topic")
- .set_body(vec![])
+ MessageBuilder::transaction_message_builder("test_topic", vec![])
.build()
.unwrap(),
)