[rust]support message type check on rust client (#658)

* support message type check on rust client

* update test cases

* simplify MessageType conversion

* remove unused function

* fix code fmt issue

* fix cargo fmt issue

* Use strong type in application code and primitive type for wire data only

Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>

---------

Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
Co-authored-by: Li Zhanhui <lizhanhui@gmail.com>
diff --git a/rust/src/error.rs b/rust/src/error.rs
index 59d1eeb..5210842 100644
--- a/rust/src/error.rs
+++ b/rust/src/error.rs
@@ -33,6 +33,9 @@
     #[error("Message is invalid")]
     InvalidMessage,
 
+    #[error("Message type not match with topic accept message type")]
+    MessageTypeNotMatch,
+
     #[error("Server error")]
     Server,
 
diff --git a/rust/src/model/message.rs b/rust/src/model/message.rs
index 28c232e..1defcd8 100644
--- a/rust/src/model/message.rs
+++ b/rust/src/model/message.rs
@@ -21,9 +21,18 @@
 
 use crate::error::{ClientError, ErrorKind};
 use crate::model::common::Endpoints;
+use crate::model::message::MessageType::{DELAY, FIFO, NORMAL, TRANSACTION};
 use crate::model::message_id::UNIQ_ID_GENERATOR;
 use crate::pb;
 
+#[derive(Clone, Copy, Debug)]
+pub enum MessageType {
+    NORMAL = 1,
+    FIFO = 2,
+    DELAY = 3,
+    TRANSACTION = 4,
+}
+
 /// [`Message`] is the data model for sending.
 pub trait Message {
     fn take_message_id(&mut self) -> String;
@@ -35,6 +44,17 @@
     fn take_message_group(&mut self) -> Option<String>;
     fn take_delivery_timestamp(&mut self) -> Option<i64>;
     fn transaction_enabled(&mut self) -> bool;
+    fn get_message_type(&self) -> MessageType;
+}
+
+pub trait MessageTypeAware {
+    fn accept_type(&self, message_type: MessageType) -> bool;
+}
+
+impl MessageTypeAware for pb::MessageQueue {
+    fn accept_type(&self, message_type: MessageType) -> bool {
+        self.accept_message_types.contains(&(message_type as i32))
+    }
 }
 
 pub(crate) struct MessageImpl {
@@ -47,6 +67,7 @@
     pub(crate) message_group: Option<String>,
     pub(crate) delivery_timestamp: Option<i64>,
     pub(crate) transaction_enabled: bool,
+    pub(crate) message_type: MessageType,
 }
 
 impl Message for MessageImpl {
@@ -85,6 +106,10 @@
     fn transaction_enabled(&mut self) -> bool {
         self.transaction_enabled
     }
+
+    fn get_message_type(&self) -> MessageType {
+        self.message_type
+    }
 }
 
 /// [`MessageBuilder`] is the builder for [`Message`].
@@ -108,6 +133,7 @@
                 message_group: None,
                 delivery_timestamp: None,
                 transaction_enabled: false,
+                message_type: NORMAL,
             },
         }
     }
@@ -135,6 +161,7 @@
                 message_group: Some(message_group.into()),
                 delivery_timestamp: None,
                 transaction_enabled: false,
+                message_type: FIFO,
             },
         }
     }
@@ -162,6 +189,7 @@
                 message_group: None,
                 delivery_timestamp: Some(delay_time),
                 transaction_enabled: false,
+                message_type: DELAY,
             },
         }
     }
@@ -184,6 +212,7 @@
                 message_group: None,
                 delivery_timestamp: None,
                 transaction_enabled: true,
+                message_type: TRANSACTION,
             },
         }
     }
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index 7e3f399..2a69f07 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -26,9 +26,9 @@
 use crate::conf::{ClientOption, ProducerOption};
 use crate::error::{ClientError, ErrorKind};
 use crate::model::common::{ClientType, SendReceipt};
-use crate::model::message;
+use crate::model::message::{self, MessageTypeAware};
 use crate::model::transaction::{Transaction, TransactionChecker, TransactionImpl};
-use crate::pb::{Encoding, MessageType, Resource, SystemProperties};
+use crate::pb::{Encoding, Resource, SystemProperties};
 use crate::util::{
     build_endpoints_by_message_queue, build_producer_settings, select_message_queue,
     select_message_queue_by_message_group, HOST_NAME,
@@ -172,20 +172,16 @@
                 .take_delivery_timestamp()
                 .map(|seconds| Timestamp { seconds, nanos: 0 });
 
-            let message_type = if message.transaction_enabled() {
+            if message.transaction_enabled() {
                 message_group = None;
                 delivery_timestamp = None;
-                MessageType::Transaction as i32
             } else if delivery_timestamp.is_some() {
                 message_group = None;
-                MessageType::Delay as i32
             } else if message_group.is_some() {
                 delivery_timestamp = None;
-                MessageType::Fifo as i32
-            } else {
-                MessageType::Normal as i32
             };
 
+            // TODO: use a converter trait From or TryFrom
             let pb_message = pb::Message {
                 topic: Some(Resource {
                     name: message.take_topic(),
@@ -198,7 +194,7 @@
                     message_id: message.take_message_id(),
                     message_group,
                     delivery_timestamp,
-                    message_type,
+                    message_type: message.get_message_type() as i32,
                     born_host: HOST_NAME.clone(),
                     born_timestamp: born_timestamp.clone(),
                     body_digest: None,
@@ -241,6 +237,11 @@
         &self,
         messages: Vec<impl message::Message>,
     ) -> Result<Vec<SendReceipt>, ClientError> {
+        let message_types = messages
+            .iter()
+            .map(|message| message.get_message_type())
+            .collect::<Vec<_>>();
+
         let (topic, message_group, mut pb_messages) =
             self.transform_messages_to_protobuf(messages)?;
 
@@ -252,6 +253,22 @@
             select_message_queue(route)
         };
 
+        if self.option.validate_message_type() {
+            for message_type in message_types {
+                if !message_queue.accept_type(message_type) {
+                    return Err(ClientError::new(
+                        ErrorKind::MessageTypeNotMatch,
+                        format!(
+                            "Current message type {:?} not match with accepted types {:?}.",
+                            message_type, message_queue.accept_message_types
+                        )
+                        .as_str(),
+                        Self::OPERATION_SEND_MESSAGE,
+                    ));
+                }
+            }
+        }
+
         let endpoints =
             build_endpoints_by_message_queue(&message_queue, Self::OPERATION_SEND_MESSAGE)?;
         for message in pb_messages.iter_mut() {
@@ -298,7 +315,7 @@
     use crate::error::ErrorKind;
     use crate::log::terminal_logger;
     use crate::model::common::Route;
-    use crate::model::message::{MessageBuilder, MessageImpl};
+    use crate::model::message::{MessageBuilder, MessageImpl, MessageType};
     use crate::model::transaction::TransactionResolution;
     use crate::pb::{Broker, MessageQueue};
     use crate::session::Session;
@@ -424,6 +441,7 @@
             message_group: None,
             delivery_timestamp: None,
             transaction_enabled: false,
+            message_type: MessageType::TRANSACTION,
         }];
         let result = producer.transform_messages_to_protobuf(messages);
         assert!(result.is_err());
@@ -491,7 +509,7 @@
                             addresses: vec![],
                         }),
                     }),
-                    accept_message_types: vec![],
+                    accept_message_types: vec![MessageType::NORMAL as i32],
                 }],
             }))
         });
@@ -539,7 +557,7 @@
                             addresses: vec![],
                         }),
                     }),
-                    accept_message_types: vec![],
+                    accept_message_types: vec![MessageType::TRANSACTION as i32],
                 }],
             }))
         });
@@ -563,9 +581,7 @@
 
         let _ = producer
             .send_transaction_message(
-                MessageBuilder::builder()
-                    .set_topic("test_topic")
-                    .set_body(vec![])
+                MessageBuilder::transaction_message_builder("test_topic", vec![])
                     .build()
                     .unwrap(),
             )