feat(rust): enhance producer and add example (#471)

* feat(rust): enhance producer and add example

* chore(rust): fix license

* fix(rust): fix according to review comments
diff --git a/.gitignore b/.gitignore
index 1f25160..d0df1ca 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
 # Intellij IDEA
 .idea/
+.run/
 out/
 *.ipr
 *.iml
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index d415495..c99ca5f 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -53,9 +53,12 @@
 
 mockall = "0.11.4"
 
+siphasher = "0.3.10"
+
 [build-dependencies]
 tonic-build = "0.9.0"
 
 [dev-dependencies]
 wiremock-grpc = "0.0.3-alpha2"
 futures = "0.3"
+awaitility = "0.3.0"
diff --git a/rust/examples/producer.rs b/rust/examples/producer.rs
new file mode 100644
index 0000000..e88a3bc
--- /dev/null
+++ b/rust/examples/producer.rs
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use rocketmq::{ClientOption, MessageImpl, Producer, ProducerOption};
+
+#[tokio::main]
+async fn main() {
+    // specify which topic(s) you would like to send message to
+    // producer will prefetch topic route when start and failed fast if topic not exist
+    let mut producer_option = ProducerOption::default();
+    producer_option.set_topics(vec!["test_topic".to_string()]);
+
+    // set which rocketmq proxy to connect
+    let mut client_option = ClientOption::default();
+    client_option.set_access_url("localhost:8081".to_string());
+
+    // build and start producer
+    let producer = Producer::new(producer_option, client_option).await.unwrap();
+    producer.start().await.unwrap();
+
+    // build message
+    let message = MessageImpl::builder()
+        .set_topic("test_topic".to_string())
+        .set_tags("test_tag".to_string())
+        .set_body("hello world".as_bytes().to_vec())
+        .build()
+        .unwrap();
+
+    // send message to rocketmq proxy
+    let result = producer.send_one(message).await;
+    debug_assert!(result.is_ok(), "send message failed: {:?}", result);
+    println!(
+        "send message success, message_id={}",
+        result.unwrap().message_id
+    );
+}
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 0c80d04..a60777e 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -14,15 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+use std::clone::Clone;
+use std::string::ToString;
 use std::{collections::HashMap, sync::atomic::AtomicUsize, sync::Arc};
 
 use parking_lot::Mutex;
-use slog::{debug, info, o, Logger};
+use slog::{debug, error, info, o, warn, Logger};
 use tokio::sync::oneshot;
 
 use crate::conf::ClientOption;
 use crate::error::{ClientError, ErrorKind};
-use crate::model::{Endpoints, Route, RouteStatus};
+use crate::model::common::{Endpoints, Route, RouteStatus};
 use crate::pb::{
     Code, Message, QueryRouteRequest, Resource, SendMessageRequest, SendResultEntry, Status,
 };
@@ -35,7 +37,7 @@
     session_manager: SessionManager,
     route_table: Mutex<HashMap<String /* topic */, RouteStatus>>,
     id: String,
-    endpoints: Endpoints,
+    access_endpoints: Endpoints,
 }
 
 lazy_static::lazy_static! {
@@ -49,7 +51,7 @@
 
     pub(crate) fn new(logger: &Logger, option: ClientOption) -> Result<Self, ClientError> {
         let id = Self::generate_client_id();
-        let endpoints = Endpoints::from_access_url(option.access_url().to_string())
+        let endpoints = Endpoints::from_url(option.access_url())
             .map_err(|e| e.with_operation(Self::OPERATION_CLIENT_NEW))?;
         let session_manager = SessionManager::new(logger, id.clone(), &option);
         Ok(Client {
@@ -58,7 +60,7 @@
             session_manager,
             route_table: Mutex::new(HashMap::new()),
             id,
-            endpoints,
+            access_endpoints: endpoints,
         })
     }
 
@@ -86,18 +88,27 @@
     }
 
     async fn get_session(&self) -> Result<Session, ClientError> {
-        // TODO: support multiple endpoints
-        self.session_manager.get_session(&self.endpoints).await
+        self.session_manager
+            .get_session(&self.access_endpoints)
+            .await
+    }
+
+    async fn get_session_with_endpoints(
+        &self,
+        endpoints: &Endpoints,
+    ) -> Result<Session, ClientError> {
+        self.session_manager.get_session(endpoints).await
     }
 
     fn handle_response_status(
+        &self,
         status: Option<Status>,
         operation: &'static str,
     ) -> Result<(), ClientError> {
         if status.is_none() {
             return Err(ClientError::new(
                 ErrorKind::Server,
-                "Server do not return status, this may be a bug.".to_string(),
+                "Server do not return status, this may be a bug.",
                 operation,
             ));
         }
@@ -105,175 +116,252 @@
         let status = status.unwrap();
         let status_code = Code::from_i32(status.code).unwrap();
         if !status_code.eq(&Code::Ok) {
-            return Err(ClientError::new(
-                ErrorKind::Server,
-                "Server return an error.".to_string(),
-                operation,
-            )
-            .with_context("code", status_code.as_str_name())
-            .with_context("message", status.message));
+            return Err(
+                ClientError::new(ErrorKind::Server, "Server return an error.", operation)
+                    .with_context("code", status_code.as_str_name())
+                    .with_context("message", status.message),
+            );
         }
         Ok(())
     }
 
+    pub(crate) fn topic_route_from_cache(&self, topic: &str) -> Option<Arc<Route>> {
+        self.route_table.lock().get(topic).and_then(|route_status| {
+            if let RouteStatus::Found(route) = route_status {
+                debug!(self.logger, "get route for topic={} from cache", topic);
+                Some(Arc::clone(route))
+            } else {
+                None
+            }
+        })
+    }
+
     pub(crate) async fn topic_route(
         &self,
         topic: &str,
         lookup_cache: bool,
     ) -> Result<Arc<Route>, ClientError> {
-        self.topic_route_inner(self.get_session().await.unwrap(), topic, lookup_cache)
+        if lookup_cache {
+            if let Some(route) = self.topic_route_from_cache(topic) {
+                return Ok(route);
+            }
+        }
+        self.topic_route_inner(self.get_session().await.unwrap(), topic)
             .await
     }
 
-    pub(crate) async fn topic_route_inner(
+    async fn query_topic_route(
         &self,
         mut rpc_client: impl RPCClient,
         topic: &str,
-        lookup_cache: bool,
-    ) -> Result<Arc<Route>, ClientError> {
-        debug!(self.logger, "query route for topic={}", topic);
-        // TODO extract function to get route from cache
-        let rx = match self
-            .route_table
-            .lock()
-            .entry(topic.to_owned())
-            .or_insert_with(|| RouteStatus::Querying(Vec::new()))
-        {
-            RouteStatus::Found(route) => {
-                if lookup_cache {
-                    return Ok(Arc::clone(route));
-                }
-                None
-            }
-            RouteStatus::Querying(ref mut v) => {
-                if v.is_empty() {
-                    None
-                } else {
-                    let (tx, rx) = oneshot::channel();
-                    v.push(tx);
-                    Some(rx)
-                }
-            }
-        };
-
-        if let Some(rx) = rx {
-            return match rx.await {
-                Ok(route) => route,
-                Err(e) => Err(ClientError::new(
-                    ErrorKind::ChannelReceive,
-                    "Wait inflight query request failed.".to_string(),
-                    Self::OPERATION_QUERY_ROUTE,
-                )
-                .set_source(e)),
-            };
-        }
-
+    ) -> Result<Route, ClientError> {
         let request = QueryRouteRequest {
             topic: Some(Resource {
                 name: topic.to_owned(),
                 resource_namespace: self.option.name_space().to_string(),
             }),
-            endpoints: Some(self.endpoints.inner().clone()),
+            endpoints: Some(self.access_endpoints.inner().clone()),
         };
 
         let response = rpc_client.query_route(request).await?;
-        Self::handle_response_status(response.status, Self::OPERATION_QUERY_ROUTE)?;
+        self.handle_response_status(response.status, Self::OPERATION_QUERY_ROUTE)?;
 
         let route = Route {
+            index: AtomicUsize::new(0),
             queue: response.message_queues,
         };
-        debug!(
-            self.logger,
-            "query route for topic={} success: route={:?}", topic, route
-        );
-        let route = Arc::new(route);
-        let prev = self
-            .route_table
-            .lock()
-            .insert(topic.to_owned(), RouteStatus::Found(Arc::clone(&route)));
-        info!(self.logger, "update route for topic={}", topic);
-
-        match prev {
-            Some(RouteStatus::Found(_)) => {}
-            Some(RouteStatus::Querying(mut v)) => {
-                for item in v.drain(..) {
-                    let _ = item.send(Ok(Arc::clone(&route)));
-                }
-            }
-            None => {}
-        };
         Ok(route)
     }
 
+    async fn topic_route_inner(
+        &self,
+        rpc_client: impl RPCClient,
+        topic: &str,
+    ) -> Result<Arc<Route>, ClientError> {
+        debug!(self.logger, "query route for topic={}", topic);
+        let rx = match self
+            .route_table
+            .lock()
+            .entry(topic.to_owned())
+            .or_insert_with(|| RouteStatus::Querying(None))
+        {
+            RouteStatus::Found(_route) => None,
+            RouteStatus::Querying(ref mut option) => {
+                match option {
+                    Some(vec) => {
+                        // add self to waiting list
+                        let (tx, rx) = oneshot::channel();
+                        vec.push(tx);
+                        Some(rx)
+                    }
+                    None => {
+                        // there is no ongoing request, so we need to send a new request
+                        let _ = option.insert(Vec::new());
+                        None
+                    }
+                }
+            }
+        };
+
+        // wait for inflight request
+        if let Some(rx) = rx {
+            return match rx.await {
+                Ok(route) => route,
+                Err(_e) => Err(ClientError::new(
+                    ErrorKind::ChannelReceive,
+                    "Wait for inflight query topic route request failed.",
+                    Self::OPERATION_QUERY_ROUTE,
+                )),
+            };
+        }
+
+        let result = self.query_topic_route(rpc_client, topic).await;
+
+        // send result to all waiters
+        if result.is_ok() {
+            let route = result.unwrap();
+            debug!(
+                self.logger,
+                "query route for topic={} success: route={:?}", topic, route
+            );
+            let route = Arc::new(route);
+            let prev = self
+                .route_table
+                .lock()
+                .insert(topic.to_owned(), RouteStatus::Found(Arc::clone(&route)));
+            info!(self.logger, "update route for topic={}", topic);
+
+            if let Some(RouteStatus::Querying(Some(mut v))) = prev {
+                for item in v.drain(..) {
+                    let _ = item.send(Ok(Arc::clone(&route)));
+                }
+            };
+            Ok(route)
+        } else {
+            let err = result.unwrap_err();
+            warn!(
+                self.logger,
+                "query route for topic={} failed: error={}", topic, err
+            );
+            let prev = self.route_table.lock().remove(topic);
+            if let Some(RouteStatus::Querying(Some(mut v))) = prev {
+                for item in v.drain(..) {
+                    let _ = item.send(Err(ClientError::new(
+                        ErrorKind::Server,
+                        "Query route failed.",
+                        Self::OPERATION_QUERY_ROUTE,
+                    )));
+                }
+            };
+            Err(err)
+        }
+    }
+
     pub(crate) async fn send_message(
         &self,
-        message: Message,
-    ) -> Result<SendResultEntry, ClientError> {
-        self.send_message_inner(self.get_session().await.unwrap(), message)
-            .await
+        messages: Vec<Message>,
+        endpoints: &Endpoints,
+    ) -> Result<Vec<SendResultEntry>, ClientError> {
+        self.send_message_inner(
+            self.get_session_with_endpoints(endpoints).await.unwrap(),
+            messages,
+        )
+        .await
     }
 
     pub(crate) async fn send_message_inner(
         &self,
         mut rpc_client: impl RPCClient,
-        message: Message,
-    ) -> Result<SendResultEntry, ClientError> {
-        if let Some(properties) = &message.system_properties {
-            debug!(
-                self.logger,
-                "send for topic={:?} message_id={}", message.topic, properties.message_id
-            );
-        } else {
-            return Err(ClientError::new(
-                ErrorKind::ClientInternal,
-                "Message do not have system properties.".to_string(),
-                Self::OPERATION_SEND_MESSAGE,
-            ));
-        }
-
-        let request = SendMessageRequest {
-            messages: vec![message],
-        };
+        messages: Vec<Message>,
+    ) -> Result<Vec<SendResultEntry>, ClientError> {
+        let message_count = messages.len();
+        let request = SendMessageRequest { messages };
         let response = rpc_client.send_message(request).await?;
-        Self::handle_response_status(response.status, Self::OPERATION_SEND_MESSAGE)?;
+        self.handle_response_status(response.status, Self::OPERATION_SEND_MESSAGE)?;
 
-        let send_result = response.entries.get(0);
-        match send_result {
-            Some(send_result) => Ok(send_result.clone()),
-            None => Err(ClientError::new(
-                ErrorKind::Server,
-                "Server do not return send result, this may be a bug.".to_string(),
-                Self::OPERATION_SEND_MESSAGE,
-            )),
+        if response.entries.len() != message_count {
+            error!(self.logger, "server do not return illegal send result, this may be a bug. except result count: {}, found: {}", response.entries.len(), message_count);
         }
+
+        Ok(response.entries)
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use std::sync::atomic::Ordering;
+    use std::sync::atomic::{AtomicUsize, Ordering};
+    use std::sync::Arc;
+    use std::thread::sleep;
+    use std::time::Duration;
 
     use crate::client::Client;
     use crate::conf::ClientOption;
+    use crate::error::{ClientError, ErrorKind};
     use crate::log::terminal_logger;
-    use crate::pb::{Code, MessageQueue, QueryRouteResponse, Resource, Status};
+    use crate::model::common::Route;
+    use crate::pb::{
+        Code, MessageQueue, QueryRouteResponse, Resource, SendMessageResponse, Status,
+    };
     use crate::session;
 
     use super::CLIENT_ID_SEQUENCE;
 
     #[test]
-    fn test_client_id_sequence() {
+    fn client_id_sequence() {
         let v1 = CLIENT_ID_SEQUENCE.fetch_add(1, Ordering::Relaxed);
         let v2 = CLIENT_ID_SEQUENCE.fetch_add(1, Ordering::Relaxed);
         assert!(v2 > v1, "Client ID sequence should be increasing");
     }
 
-    #[tokio::test]
-    async fn client_query_route() {
-        let logger = terminal_logger();
-        let client = Client::new(&logger, ClientOption::default()).unwrap();
+    #[test]
+    fn handle_response_status() {
+        let client = Client::new(&terminal_logger(), ClientOption::default()).unwrap();
 
-        let response = Ok(QueryRouteResponse {
+        let result = client.handle_response_status(None, "test");
+        assert!(result.is_err(), "should return error when status is None");
+        let result = result.unwrap_err();
+        assert_eq!(result.kind, ErrorKind::Server);
+        assert_eq!(
+            result.message,
+            "Server do not return status, this may be a bug."
+        );
+        assert_eq!(result.operation, "test");
+
+        let result = client.handle_response_status(
+            Some(Status {
+                code: Code::BadRequest as i32,
+                message: "test failed".to_string(),
+            }),
+            "test failed",
+        );
+        assert!(
+            result.is_err(),
+            "should return error when status is BadRequest"
+        );
+        let result = result.unwrap_err();
+        assert_eq!(result.kind, ErrorKind::Server);
+        assert_eq!(result.message, "Server return an error.");
+        assert_eq!(result.operation, "test failed");
+        assert_eq!(
+            result.context,
+            vec![
+                ("code", "BAD_REQUEST".to_string()),
+                ("message", "test failed".to_string())
+            ]
+        );
+
+        let result = client.handle_response_status(
+            Some(Status {
+                code: Code::Ok as i32,
+                message: "test success".to_string(),
+            }),
+            "test success",
+        );
+        assert!(result.is_ok(), "should not return error when status is Ok");
+    }
+
+    fn new_topic_route_response() -> Result<QueryRouteResponse, ClientError> {
+        Ok(QueryRouteResponse {
             status: Some(Status {
                 code: Code::Ok as i32,
                 message: "Success".to_string(),
@@ -288,14 +376,34 @@
                 broker: None,
                 accept_message_types: vec![],
             }],
-        });
+        })
+    }
+
+    #[tokio::test]
+    async fn client_query_route_from_cache() {
+        let logger = terminal_logger();
+        let client = Client::new(&logger, ClientOption::default()).unwrap();
+        client.route_table.lock().insert(
+            "DefaultCluster".to_string(),
+            super::RouteStatus::Found(Arc::new(Route {
+                index: AtomicUsize::new(0),
+                queue: vec![],
+            })),
+        );
+        let option = client.topic_route_from_cache("DefaultCluster");
+        assert!(option.is_some());
+    }
+
+    #[tokio::test]
+    async fn client_query_route() {
+        let logger = terminal_logger();
+        let client = Client::new(&logger, ClientOption::default()).unwrap();
 
         let mut mock = session::MockRPCClient::new();
         mock.expect_query_route()
-            .times(1)
-            .return_once(|_| Box::pin(futures::future::ready(response)));
+            .return_once(|_| Box::pin(futures::future::ready(new_topic_route_response())));
 
-        let result = client.topic_route_inner(mock, "DefaultCluster", true).await;
+        let result = client.topic_route_inner(mock, "DefaultCluster").await;
         assert!(result.is_ok());
 
         let route = result.unwrap();
@@ -308,4 +416,85 @@
         assert_eq!(topic.name, "DefaultCluster");
         assert_eq!(topic.resource_namespace, "default");
     }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn client_query_route_with_inflight_request() {
+        let logger = terminal_logger();
+        let client = Client::new(&logger, ClientOption::default()).unwrap();
+        let client = Arc::new(client);
+
+        let client_clone = client.clone();
+        tokio::spawn(async move {
+            let mut mock = session::MockRPCClient::new();
+            mock.expect_query_route().return_once(|_| {
+                sleep(Duration::from_millis(200));
+                Box::pin(futures::future::ready(new_topic_route_response()))
+            });
+
+            let result = client_clone.topic_route_inner(mock, "DefaultCluster").await;
+            assert!(result.is_ok());
+        });
+
+        let handle = tokio::spawn(async move {
+            sleep(Duration::from_millis(100));
+            let mock = session::MockRPCClient::new();
+            let result = client.topic_route_inner(mock, "DefaultCluster").await;
+            assert!(result.is_ok());
+        });
+
+        awaitility::at_most(Duration::from_secs(1)).until(|| handle.is_finished());
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn client_query_route_with_failed_request() {
+        let logger = terminal_logger();
+        let client = Client::new(&logger, ClientOption::default()).unwrap();
+        let client = Arc::new(client);
+
+        let client_clone = client.clone();
+        tokio::spawn(async move {
+            let mut mock = session::MockRPCClient::new();
+            mock.expect_query_route().return_once(|_| {
+                sleep(Duration::from_millis(200));
+                Box::pin(futures::future::ready(Err(ClientError::new(
+                    ErrorKind::Server,
+                    "Server error",
+                    "test",
+                ))))
+            });
+
+            let result = client_clone.topic_route_inner(mock, "DefaultCluster").await;
+            assert!(result.is_err());
+        });
+
+        let handle = tokio::spawn(async move {
+            sleep(Duration::from_millis(100));
+            let mock = session::MockRPCClient::new();
+            let result = client.topic_route_inner(mock, "DefaultCluster").await;
+            assert!(result.is_err());
+        });
+
+        awaitility::at_most(Duration::from_secs(1)).until(|| handle.is_finished());
+    }
+
+    #[tokio::test]
+    async fn client_send_message() {
+        let response = Ok(SendMessageResponse {
+            status: Some(Status {
+                code: Code::Ok as i32,
+                message: "Success".to_string(),
+            }),
+            entries: vec![],
+        });
+        let mut mock = session::MockRPCClient::new();
+        mock.expect_send_message()
+            .return_once(|_| Box::pin(futures::future::ready(response)));
+
+        let client = Client::new(&terminal_logger(), ClientOption::default()).unwrap();
+        let send_result = client.send_message_inner(mock, vec![]).await;
+        assert!(send_result.is_ok());
+
+        let send_results = send_result.unwrap();
+        assert_eq!(send_results.len(), 0);
+    }
 }
diff --git a/rust/src/conf.rs b/rust/src/conf.rs
index 233742a..334a1e8 100644
--- a/rust/src/conf.rs
+++ b/rust/src/conf.rs
@@ -65,6 +65,7 @@
     logging_format: LoggingFormat,
     prefetch_route: bool,
     topics: Option<Vec<String>>,
+    namespace: String,
 }
 
 impl Default for ProducerOption {
@@ -73,6 +74,7 @@
             logging_format: LoggingFormat::Terminal,
             prefetch_route: true,
             topics: None,
+            namespace: "".to_string(),
         }
     }
 }
@@ -98,4 +100,11 @@
     pub fn set_topics(&mut self, topics: Vec<String>) {
         self.topics = Some(topics);
     }
+
+    pub fn namespace(&self) -> &str {
+        &self.namespace
+    }
+    pub fn set_namespace(&mut self, name_space: String) {
+        self.namespace = name_space;
+    }
 }
diff --git a/rust/src/error.rs b/rust/src/error.rs
index 9d3af04..6ae04e3 100644
--- a/rust/src/error.rs
+++ b/rust/src/error.rs
@@ -26,9 +26,15 @@
     #[error("Failed to create session")]
     Connect,
 
+    #[error("Message is invalid")]
+    InvalidMessage,
+
     #[error("Server error")]
     Server,
 
+    #[error("No broker available to send message")]
+    NoBrokerAvailable,
+
     #[error("Client internal error")]
     ClientInternal,
 
@@ -53,10 +59,10 @@
 impl Error for ClientError {}
 
 impl ClientError {
-    pub fn new(kind: ErrorKind, message: String, operation: &'static str) -> Self {
+    pub fn new(kind: ErrorKind, message: &str, operation: &'static str) -> Self {
         Self {
             kind,
-            message,
+            message: message.to_string(),
             operation,
             context: Vec::new(),
             source: None,
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index 17b9bd0..4a0c165 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -22,16 +22,18 @@
 mod log;
 
 mod client;
-mod model;
 
 #[allow(clippy::all)]
 #[path = "pb/apache.rocketmq.v2.rs"]
 mod pb;
 mod session;
 
-pub(crate) mod producer;
+#[allow(dead_code)]
+mod model;
+mod producer;
 
 // Export structs that are part of crate API.
+pub use conf::ClientOption;
+pub use conf::ProducerOption;
+pub use model::message::MessageImpl;
 pub use producer::Producer;
-
-pub mod models;
diff --git a/rust/src/model.rs b/rust/src/model.rs
deleted file mode 100644
index e549b06..0000000
--- a/rust/src/model.rs
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-use std::net::IpAddr;
-use std::sync::Arc;
-
-use tokio::sync::oneshot;
-
-use crate::error::{ClientError, ErrorKind};
-use crate::pb;
-use crate::pb::{Address, AddressScheme, MessageQueue};
-
-#[derive(Debug)]
-pub struct Route {
-    pub queue: Vec<MessageQueue>,
-}
-
-#[derive(Debug)]
-pub(crate) enum RouteStatus {
-    Querying(Vec<oneshot::Sender<Result<Arc<Route>, ClientError>>>),
-    Found(Arc<Route>),
-}
-
-#[derive(Debug)]
-pub(crate) struct Endpoints {
-    access_url: String,
-    scheme: AddressScheme,
-    inner: pb::Endpoints,
-}
-
-impl Endpoints {
-    const ENDPOINT_SEPARATOR: &'static str = ",";
-    const ADDRESS_SEPARATOR: &'static str = ":";
-
-    pub fn from_access_url(access_url: String) -> Result<Self, ClientError> {
-        if access_url.is_empty() {
-            return Err(ClientError::new(
-                ErrorKind::Config,
-                "Access url is empty.".to_string(),
-                "access_url.parse",
-            )
-            .with_context("access_url", access_url));
-        }
-
-        let mut scheme = AddressScheme::DomainName;
-        let mut urls = Vec::new();
-        for url in access_url.split(Self::ENDPOINT_SEPARATOR) {
-            if let Some((host, port)) = url.rsplit_once(Self::ADDRESS_SEPARATOR) {
-                let port_i32 = port.parse::<i32>().map_err(|e| {
-                    ClientError::new(
-                        ErrorKind::Config,
-                        format!("Port {} in access url is invalid.", port),
-                        "access_url.parse",
-                    )
-                    .with_context("access_url", access_url.clone())
-                    .set_source(e)
-                })?;
-                urls.push((host.to_string(), port_i32));
-            } else {
-                return Err(ClientError::new(
-                    ErrorKind::Config,
-                    "Port in access url is missing.".to_string(),
-                    "access_url.parse",
-                )
-                .with_context("access_url", access_url));
-            }
-        }
-
-        let mut addresses = Vec::new();
-        let urls_len = urls.len();
-        for (host, port) in urls.into_iter() {
-            match host.parse::<IpAddr>() {
-                Ok(ip_addr) => {
-                    match ip_addr {
-                        IpAddr::V4(_) => {
-                            if scheme == AddressScheme::IPv6 {
-                                return Err(ClientError::new(
-                                    ErrorKind::Config,
-                                    "Multiple addresses not in the same schema.".to_string(),
-                                    "access_url.parse",
-                                )
-                                .with_context("access_url", access_url));
-                            }
-                            scheme = AddressScheme::IPv4
-                        }
-                        IpAddr::V6(_) => {
-                            if scheme == AddressScheme::IPv4 {
-                                return Err(ClientError::new(
-                                    ErrorKind::Config,
-                                    "Multiple addresses not in the same schema.".to_string(),
-                                    "access_url.parse",
-                                )
-                                .with_context("access_url", access_url));
-                            }
-                            scheme = AddressScheme::IPv6
-                        }
-                    }
-                    addresses.push(Address { host, port });
-                }
-                Err(_) => {
-                    if urls_len > 1 {
-                        return Err(ClientError::new(
-                            ErrorKind::Config,
-                            "Multiple addresses not allowed in domain schema.".to_string(),
-                            "access_url.parse",
-                        )
-                        .with_context("access_url", access_url));
-                    }
-                    scheme = AddressScheme::DomainName;
-                    addresses.push(Address { host, port });
-                }
-            }
-        }
-
-        Ok(Endpoints {
-            access_url,
-            scheme,
-            inner: pb::Endpoints {
-                scheme: scheme as i32,
-                addresses,
-            },
-        })
-    }
-
-    pub fn access_url(&self) -> &str {
-        &self.access_url
-    }
-
-    pub fn scheme(&self) -> AddressScheme {
-        self.scheme
-    }
-
-    pub fn inner(&self) -> &pb::Endpoints {
-        &self.inner
-    }
-
-    pub fn into_inner(self) -> pb::Endpoints {
-        self.inner
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use crate::error::ErrorKind;
-    use crate::model::Endpoints;
-    use crate::pb::AddressScheme;
-
-    #[test]
-    fn parse_domain_access_url() {
-        let endpoints = Endpoints::from_access_url("localhost:8080".to_string()).unwrap();
-        assert_eq!(endpoints.access_url(), "localhost:8080");
-        assert_eq!(endpoints.scheme(), AddressScheme::DomainName);
-        let inner = endpoints.into_inner();
-        assert_eq!(inner.addresses.len(), 1);
-        assert_eq!(inner.addresses[0].host, "localhost");
-        assert_eq!(inner.addresses[0].port, 8080);
-    }
-
-    #[test]
-    fn parse_ipv4_access_url() {
-        let endpoints = Endpoints::from_access_url("127.0.0.1:8080".to_string()).unwrap();
-        assert_eq!(endpoints.access_url(), "127.0.0.1:8080");
-        assert_eq!(endpoints.scheme(), AddressScheme::IPv4);
-        let inner = endpoints.into_inner();
-        assert_eq!(inner.addresses.len(), 1);
-        assert_eq!(inner.addresses[0].host, "127.0.0.1");
-        assert_eq!(inner.addresses[0].port, 8080);
-    }
-
-    #[test]
-    fn parse_ipv6_access_url() {
-        let endpoints = Endpoints::from_access_url("::1:8080".to_string()).unwrap();
-        assert_eq!(endpoints.access_url(), "::1:8080");
-        assert_eq!(endpoints.scheme(), AddressScheme::IPv6);
-        let inner = endpoints.into_inner();
-        assert_eq!(inner.addresses.len(), 1);
-        assert_eq!(inner.addresses[0].host, "::1");
-        assert_eq!(inner.addresses[0].port, 8080);
-    }
-
-    #[test]
-    fn parse_access_url_failed() {
-        let err = Endpoints::from_access_url("".to_string()).err().unwrap();
-        assert_eq!(err.kind, ErrorKind::Config);
-        assert_eq!(err.operation, "access_url.parse");
-        assert_eq!(err.message, "Access url is empty.");
-
-        let err = Endpoints::from_access_url("localhost:<port>".to_string())
-            .err()
-            .unwrap();
-        assert_eq!(err.kind, ErrorKind::Config);
-        assert_eq!(err.operation, "access_url.parse");
-        assert_eq!(err.message, "Port <port> in access url is invalid.");
-
-        let err = Endpoints::from_access_url("localhost".to_string())
-            .err()
-            .unwrap();
-        assert_eq!(err.kind, ErrorKind::Config);
-        assert_eq!(err.operation, "access_url.parse");
-        assert_eq!(err.message, "Port in access url is missing.");
-
-        let err = Endpoints::from_access_url("127.0.0.1:8080,::1:8080".to_string())
-            .err()
-            .unwrap();
-        assert_eq!(err.kind, ErrorKind::Config);
-        assert_eq!(err.operation, "access_url.parse");
-        assert_eq!(err.message, "Multiple addresses not in the same schema.");
-
-        let err = Endpoints::from_access_url("::1:8080,127.0.0.1:8080".to_string())
-            .err()
-            .unwrap();
-        assert_eq!(err.kind, ErrorKind::Config);
-        assert_eq!(err.operation, "access_url.parse");
-        assert_eq!(err.message, "Multiple addresses not in the same schema.");
-
-        let err = Endpoints::from_access_url("localhost:8080,localhost:8081".to_string())
-            .err()
-            .unwrap();
-        assert_eq!(err.kind, ErrorKind::Config);
-        assert_eq!(err.operation, "access_url.parse");
-        assert_eq!(
-            err.message,
-            "Multiple addresses not allowed in domain schema."
-        );
-    }
-}
diff --git a/rust/src/model/common.rs b/rust/src/model/common.rs
new file mode 100644
index 0000000..2082c28
--- /dev/null
+++ b/rust/src/model/common.rs
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::net::IpAddr;
+use std::sync::atomic::AtomicUsize;
+use std::sync::Arc;
+
+use tokio::sync::oneshot;
+
+use crate::error::{ClientError, ErrorKind};
+use crate::pb;
+use crate::pb::{Address, AddressScheme, MessageQueue};
+
+#[derive(Debug)]
+pub struct Route {
+    pub(crate) index: AtomicUsize,
+    pub queue: Vec<MessageQueue>,
+}
+
+#[derive(Debug)]
+pub(crate) enum RouteStatus {
+    Querying(Option<Vec<oneshot::Sender<Result<Arc<Route>, ClientError>>>>),
+    Found(Arc<Route>),
+}
+
+#[derive(Debug)]
+pub(crate) struct Endpoints {
+    endpoint_url: String,
+    scheme: AddressScheme,
+    inner: pb::Endpoints,
+}
+
+impl Endpoints {
+    const OPERATION_PARSE: &'static str = "endpoint.parse";
+
+    const ENDPOINT_SEPARATOR: &'static str = ",";
+    const ADDRESS_SEPARATOR: &'static str = ":";
+
+    pub fn from_url(endpoint_url: &str) -> Result<Self, ClientError> {
+        if endpoint_url.is_empty() {
+            return Err(ClientError::new(
+                ErrorKind::Config,
+                "Endpoint url is empty.",
+                Self::OPERATION_PARSE,
+            )
+            .with_context("url", endpoint_url));
+        }
+
+        let mut scheme = AddressScheme::DomainName;
+        let mut urls = Vec::new();
+        for url in endpoint_url.split(Self::ENDPOINT_SEPARATOR) {
+            if let Some((host, port)) = url.rsplit_once(Self::ADDRESS_SEPARATOR) {
+                let port_i32 = port.parse::<i32>().map_err(|e| {
+                    ClientError::new(
+                        ErrorKind::Config,
+                        &format!("Port {} in endpoint url is invalid.", port),
+                        Self::OPERATION_PARSE,
+                    )
+                    .with_context("url", endpoint_url)
+                    .set_source(e)
+                })?;
+                urls.push((host.to_string(), port_i32));
+            } else {
+                return Err(ClientError::new(
+                    ErrorKind::Config,
+                    "Port in endpoint url is missing.",
+                    Self::OPERATION_PARSE,
+                )
+                .with_context("url", endpoint_url));
+            }
+        }
+
+        let mut addresses = Vec::new();
+        let urls_len = urls.len();
+        for (host, port) in urls.into_iter() {
+            match host.parse::<IpAddr>() {
+                Ok(ip_addr) => {
+                    match ip_addr {
+                        IpAddr::V4(_) => {
+                            if scheme == AddressScheme::IPv6 {
+                                return Err(ClientError::new(
+                                    ErrorKind::Config,
+                                    "Multiple addresses not in the same schema.",
+                                    Self::OPERATION_PARSE,
+                                )
+                                .with_context("url", endpoint_url));
+                            }
+                            scheme = AddressScheme::IPv4
+                        }
+                        IpAddr::V6(_) => {
+                            if scheme == AddressScheme::IPv4 {
+                                return Err(ClientError::new(
+                                    ErrorKind::Config,
+                                    "Multiple addresses not in the same schema.",
+                                    Self::OPERATION_PARSE,
+                                )
+                                .with_context("url", endpoint_url));
+                            }
+                            scheme = AddressScheme::IPv6
+                        }
+                    }
+                    addresses.push(Address { host, port });
+                }
+                Err(_) => {
+                    if urls_len > 1 {
+                        return Err(ClientError::new(
+                            ErrorKind::Config,
+                            "Multiple addresses not allowed in domain schema.",
+                            Self::OPERATION_PARSE,
+                        )
+                        .with_context("url", endpoint_url));
+                    }
+                    scheme = AddressScheme::DomainName;
+                    addresses.push(Address { host, port });
+                }
+            }
+        }
+
+        Ok(Endpoints {
+            endpoint_url: endpoint_url.to_string(),
+            scheme,
+            inner: pb::Endpoints {
+                scheme: scheme as i32,
+                addresses,
+            },
+        })
+    }
+
+    pub(crate) fn from_pb_endpoints(endpoints: pb::Endpoints) -> Self {
+        let mut addresses = Vec::new();
+        for address in endpoints.addresses.iter() {
+            addresses.push(format!("{}:{}", address.host, address.port));
+        }
+
+        Endpoints {
+            endpoint_url: addresses.join(Self::ENDPOINT_SEPARATOR),
+            scheme: endpoints.scheme(),
+            inner: endpoints,
+        }
+    }
+
+    pub fn endpoint_url(&self) -> &str {
+        &self.endpoint_url
+    }
+
+    pub fn scheme(&self) -> AddressScheme {
+        self.scheme
+    }
+
+    pub fn inner(&self) -> &pb::Endpoints {
+        &self.inner
+    }
+
+    pub fn into_inner(self) -> pb::Endpoints {
+        self.inner
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::error::ErrorKind;
+    use crate::model::common::Endpoints;
+    use crate::pb;
+    use crate::pb::{Address, AddressScheme};
+
+    #[test]
+    fn parse_domain_endpoint_url() {
+        let endpoints = Endpoints::from_url("localhost:8080").unwrap();
+        assert_eq!(endpoints.endpoint_url(), "localhost:8080");
+        assert_eq!(endpoints.scheme(), AddressScheme::DomainName);
+        let inner = endpoints.into_inner();
+        assert_eq!(inner.addresses.len(), 1);
+        assert_eq!(inner.addresses[0].host, "localhost");
+        assert_eq!(inner.addresses[0].port, 8080);
+    }
+
+    #[test]
+    fn parse_ipv4_endpoint_url() {
+        let endpoints = Endpoints::from_url("127.0.0.1:8080").unwrap();
+        assert_eq!(endpoints.endpoint_url(), "127.0.0.1:8080");
+        assert_eq!(endpoints.scheme(), AddressScheme::IPv4);
+        let inner = endpoints.into_inner();
+        assert_eq!(inner.addresses.len(), 1);
+        assert_eq!(inner.addresses[0].host, "127.0.0.1");
+        assert_eq!(inner.addresses[0].port, 8080);
+    }
+
+    #[test]
+    fn parse_ipv6_endpoint_url() {
+        let endpoints = Endpoints::from_url("::1:8080").unwrap();
+        assert_eq!(endpoints.endpoint_url(), "::1:8080");
+        assert_eq!(endpoints.scheme(), AddressScheme::IPv6);
+        let inner = endpoints.into_inner();
+        assert_eq!(inner.addresses.len(), 1);
+        assert_eq!(inner.addresses[0].host, "::1");
+        assert_eq!(inner.addresses[0].port, 8080);
+    }
+
+    #[test]
+    fn parse_endpoint_url_failed() {
+        let err = Endpoints::from_url("").err().unwrap();
+        assert_eq!(err.kind, ErrorKind::Config);
+        assert_eq!(err.operation, "endpoint.parse");
+        assert_eq!(err.message, "Endpoint url is empty.");
+
+        let err = Endpoints::from_url("localhost:<port>").err().unwrap();
+        assert_eq!(err.kind, ErrorKind::Config);
+        assert_eq!(err.operation, "endpoint.parse");
+        assert_eq!(err.message, "Port <port> in endpoint url is invalid.");
+
+        let err = Endpoints::from_url("localhost").err().unwrap();
+        assert_eq!(err.kind, ErrorKind::Config);
+        assert_eq!(err.operation, "endpoint.parse");
+        assert_eq!(err.message, "Port in endpoint url is missing.");
+
+        let err = Endpoints::from_url("127.0.0.1:8080,::1:8080")
+            .err()
+            .unwrap();
+        assert_eq!(err.kind, ErrorKind::Config);
+        assert_eq!(err.operation, "endpoint.parse");
+        assert_eq!(err.message, "Multiple addresses not in the same schema.");
+
+        let err = Endpoints::from_url("::1:8080,127.0.0.1:8080")
+            .err()
+            .unwrap();
+        assert_eq!(err.kind, ErrorKind::Config);
+        assert_eq!(err.operation, "endpoint.parse");
+        assert_eq!(err.message, "Multiple addresses not in the same schema.");
+
+        let err = Endpoints::from_url("localhost:8080,localhost:8081")
+            .err()
+            .unwrap();
+        assert_eq!(err.kind, ErrorKind::Config);
+        assert_eq!(err.operation, "endpoint.parse");
+        assert_eq!(
+            err.message,
+            "Multiple addresses not allowed in domain schema."
+        );
+    }
+
+    #[test]
+    fn parse_pb_endpoints() {
+        let pb_endpoints = pb::Endpoints {
+            scheme: AddressScheme::IPv4 as i32,
+            addresses: vec![
+                Address {
+                    host: "localhost".to_string(),
+                    port: 8080,
+                },
+                Address {
+                    host: "127.0.0.1".to_string(),
+                    port: 8081,
+                },
+            ],
+        };
+
+        let endpoints = Endpoints::from_pb_endpoints(pb_endpoints);
+        assert_eq!(endpoints.endpoint_url(), "localhost:8080,127.0.0.1:8081");
+        assert_eq!(endpoints.scheme(), AddressScheme::IPv4);
+        assert_eq!(endpoints.into_inner().addresses.len(), 2);
+    }
+}
diff --git a/rust/src/model/message.rs b/rust/src/model/message.rs
new file mode 100644
index 0000000..17584d0
--- /dev/null
+++ b/rust/src/model/message.rs
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use crate::error::{ClientError, ErrorKind};
+use crate::model::message_id::UNIQ_ID_GENERATOR;
+use std::collections::HashMap;
+
+pub trait Message {
+    fn take_message_id(&mut self) -> String;
+    fn take_topic(&mut self) -> String;
+    fn take_body(&mut self) -> Vec<u8>;
+    fn take_tag(&mut self) -> Option<String>;
+    fn take_keys(&mut self) -> Vec<String>;
+    fn take_properties(&mut self) -> HashMap<String, String>;
+    fn take_message_group(&mut self) -> Option<String>;
+    fn take_delivery_timestamp(&mut self) -> Option<i64>;
+}
+
+#[derive(Debug)]
+pub struct MessageImpl {
+    pub(crate) message_id: String,
+    pub(crate) topic: String,
+    pub(crate) body: Option<Vec<u8>>,
+    pub(crate) tags: Option<String>,
+    pub(crate) keys: Option<Vec<String>>,
+    pub(crate) properties: Option<HashMap<String, String>>,
+    pub(crate) message_group: Option<String>,
+    pub(crate) delivery_timestamp: Option<i64>,
+}
+
+impl Message for MessageImpl {
+    fn take_message_id(&mut self) -> String {
+        self.message_id.clone()
+    }
+
+    fn take_topic(&mut self) -> String {
+        self.topic.clone()
+    }
+
+    fn take_body(&mut self) -> Vec<u8> {
+        self.body.take().unwrap_or(vec![])
+    }
+
+    fn take_tag(&mut self) -> Option<String> {
+        self.tags.take()
+    }
+
+    fn take_keys(&mut self) -> Vec<String> {
+        self.keys.take().unwrap_or(vec![])
+    }
+
+    fn take_properties(&mut self) -> HashMap<String, String> {
+        self.properties.take().unwrap_or(HashMap::new())
+    }
+
+    fn take_message_group(&mut self) -> Option<String> {
+        self.message_group.take()
+    }
+
+    fn take_delivery_timestamp(&mut self) -> Option<i64> {
+        self.delivery_timestamp.take()
+    }
+}
+
+impl MessageImpl {
+    pub fn builder() -> MessageBuilder {
+        MessageBuilder {
+            message: MessageImpl {
+                message_id: UNIQ_ID_GENERATOR.lock().next_id(),
+                topic: "".to_string(),
+                body: None,
+                tags: None,
+                keys: None,
+                properties: None,
+                message_group: None,
+                delivery_timestamp: None,
+            },
+        }
+    }
+
+    pub fn fifo_message_builder(
+        topic: String,
+        body: Vec<u8>,
+        message_group: String,
+    ) -> MessageBuilder {
+        MessageBuilder {
+            message: MessageImpl {
+                message_id: UNIQ_ID_GENERATOR.lock().next_id(),
+                topic,
+                body: Some(body),
+                tags: None,
+                keys: None,
+                properties: None,
+                message_group: Some(message_group),
+                delivery_timestamp: None,
+            },
+        }
+    }
+
+    pub fn delay_message_builder(topic: String, body: Vec<u8>, delay_time: i64) -> MessageBuilder {
+        MessageBuilder {
+            message: MessageImpl {
+                message_id: UNIQ_ID_GENERATOR.lock().next_id(),
+                topic,
+                body: Some(body),
+                tags: None,
+                keys: None,
+                properties: None,
+                message_group: None,
+                delivery_timestamp: Some(delay_time),
+            },
+        }
+    }
+}
+
+pub struct MessageBuilder {
+    message: MessageImpl,
+}
+
+impl MessageBuilder {
+    const OPERATION_BUILD_MESSAGE: &'static str = "build_message";
+
+    pub fn set_topic(mut self, topic: String) -> Self {
+        self.message.topic = topic;
+        self
+    }
+
+    pub fn set_body(mut self, body: Vec<u8>) -> Self {
+        self.message.body = Some(body);
+        self
+    }
+
+    pub fn set_tags(mut self, tags: String) -> Self {
+        self.message.tags = Some(tags);
+        self
+    }
+
+    pub fn set_keys(mut self, keys: Vec<String>) -> Self {
+        self.message.keys = Some(keys);
+        self
+    }
+
+    pub fn set_properties(mut self, properties: HashMap<String, String>) -> Self {
+        self.message.properties = Some(properties);
+        self
+    }
+
+    pub fn set_message_group(mut self, message_group: String) -> Self {
+        self.message.message_group = Some(message_group);
+        self
+    }
+
+    pub fn set_delivery_timestamp(mut self, delivery_timestamp: i64) -> Self {
+        self.message.delivery_timestamp = Some(delivery_timestamp);
+        self
+    }
+
+    fn check_message(&self) -> Result<(), String> {
+        if self.message.topic.is_empty() {
+            return Err("Topic is empty.".to_string());
+        }
+        if self.message.body.is_none() {
+            return Err("Body is empty.".to_string());
+        }
+        if self.message.message_group.is_some() && self.message.delivery_timestamp.is_some() {
+            return Err(
+                "message_group and delivery_timestamp can not be set at the same time.".to_string(),
+            );
+        }
+        Ok(())
+    }
+
+    pub fn build(self) -> Result<MessageImpl, ClientError> {
+        self.check_message().map_err(|e| {
+            ClientError::new(ErrorKind::InvalidMessage, &e, Self::OPERATION_BUILD_MESSAGE)
+        })?;
+        Ok(self.message)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_message() {
+        let mut properties = HashMap::new();
+        properties.insert("key".to_string(), "value".to_string());
+        let message = MessageImpl::builder()
+            .set_topic("test".to_string())
+            .set_body(vec![1, 2, 3])
+            .set_tags("tag".to_string())
+            .set_keys(vec!["key".to_string()])
+            .set_properties(properties)
+            .build();
+        assert!(message.is_ok());
+
+        let mut message = message.unwrap();
+        assert_eq!(message.take_topic(), "test");
+        assert_eq!(message.take_body(), vec![1, 2, 3]);
+        assert_eq!(message.take_tag(), Some("tag".to_string()));
+        assert_eq!(message.take_keys(), vec!["key".to_string()]);
+        assert_eq!(message.take_properties(), {
+            let mut properties = HashMap::new();
+            properties.insert("key".to_string(), "value".to_string());
+            properties
+        });
+
+        let message = MessageImpl::builder()
+            .set_topic("test".to_string())
+            .set_body(vec![1, 2, 3])
+            .set_message_group("message_group".to_string())
+            .set_delivery_timestamp(123456789)
+            .build();
+        assert!(message.is_err());
+        let err = message.unwrap_err();
+        assert_eq!(err.kind, ErrorKind::InvalidMessage);
+        assert_eq!(
+            err.message,
+            "message_group and delivery_timestamp can not be set at the same time."
+        );
+
+        let message = MessageImpl::builder()
+            .set_topic("test".to_string())
+            .set_body(vec![1, 2, 3])
+            .set_message_group("message_group".to_string())
+            .build();
+        let mut message = message.unwrap();
+        assert_eq!(
+            message.take_message_group(),
+            Some("message_group".to_string())
+        );
+
+        let message = MessageImpl::builder()
+            .set_topic("test".to_string())
+            .set_body(vec![1, 2, 3])
+            .set_delivery_timestamp(123456789)
+            .build();
+        let mut message = message.unwrap();
+        assert_eq!(message.take_delivery_timestamp(), Some(123456789));
+    }
+}
diff --git a/rust/src/models/message_id.rs b/rust/src/model/message_id.rs
similarity index 95%
rename from rust/src/models/message_id.rs
rename to rust/src/model/message_id.rs
index 92d4e44..1357a60 100644
--- a/rust/src/models/message_id.rs
+++ b/rust/src/model/message_id.rs
@@ -14,12 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-use byteorder::{BigEndian, WriteBytesExt};
-use once_cell::sync::Lazy;
-use parking_lot::Mutex;
 use std::io::Write;
 use std::process;
 use std::time::SystemTime;
+
+use byteorder::{BigEndian, WriteBytesExt};
+use once_cell::sync::Lazy;
+use parking_lot::Mutex;
 use time::{Date, OffsetDateTime, PrimitiveDateTime, Time};
 
 /**
@@ -62,7 +63,7 @@
 pub(crate) static UNIQ_ID_GENERATOR: Lazy<Mutex<UniqueIdGenerator>> = Lazy::new(|| {
     let mut wtr = Vec::new();
     wtr.write_u8(1).unwrap();
-    //mac
+    // mac
     let x = mac_address::get_mac_address().unwrap();
     let ma = match x {
         Some(ma) => ma,
@@ -71,9 +72,8 @@
         }
     };
     wtr.write_all(&ma.bytes()).unwrap();
-    //processid
-    wtr.write_u16::<byteorder::BigEndian>(process::id() as u16)
-        .unwrap();
+    // process id
+    wtr.write_u16::<BigEndian>(process::id() as u16).unwrap();
     let generator = UniqueIdGenerator {
         counter: 0,
         start_timestamp: 0,
@@ -91,7 +91,7 @@
 }
 
 impl UniqueIdGenerator {
-    pub fn generate(&mut self) -> String {
+    pub fn next_id(&mut self) -> String {
         if SystemTime::now()
             .duration_since(SystemTime::UNIX_EPOCH)
             .unwrap()
@@ -133,7 +133,7 @@
     fn text_generate_uniq_id() {
         use super::UNIQ_ID_GENERATOR;
         for i in 0..10 {
-            let uid = UNIQ_ID_GENERATOR.lock().generate();
+            let uid = UNIQ_ID_GENERATOR.lock().next_id();
             println!("i: {}, uid: {}", i, uid);
         }
     }
diff --git a/rust/src/models/mod.rs b/rust/src/model/mod.rs
similarity index 96%
rename from rust/src/models/mod.rs
rename to rust/src/model/mod.rs
index 8496998..1a873d6 100644
--- a/rust/src/models/mod.rs
+++ b/rust/src/model/mod.rs
@@ -14,6 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+pub(crate) mod common;
 pub(crate) mod message;
 pub(crate) mod message_id;
-pub(crate) mod message_view;
diff --git a/rust/src/models/message.rs b/rust/src/models/message.rs
deleted file mode 100644
index f8366aa..0000000
--- a/rust/src/models/message.rs
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-use std::{
-    collections::HashMap,
-    io::Write,
-    mem, process,
-    sync::Arc,
-    sync::{atomic::AtomicUsize, Weak},
-};
-pub(crate) struct MessageImpl {
-    pub(crate) keys: Vec<String>,
-    pub(crate) body: Vec<u8>,
-    pub(crate) topic: String,
-    pub(crate) tags: String,
-    pub(crate) message_group: String,
-    pub(crate) delivery_timestamp: i64,
-    pub(crate) properties: HashMap<String, String>,
-}
-
-impl MessageImpl {
-    pub fn new(topic: &str, tags: &str, keys: Vec<String>, body: &str) -> Self {
-        MessageImpl {
-            keys: keys,
-            body: body.as_bytes().to_vec(),
-            topic: topic.to_string(),
-            tags: tags.to_string(),
-            message_group: "".to_string(),
-            delivery_timestamp: 0,
-            properties: HashMap::new(),
-        }
-    }
-}
diff --git a/rust/src/models/message_view.rs b/rust/src/models/message_view.rs
deleted file mode 100644
index f0849b0..0000000
--- a/rust/src/models/message_view.rs
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#[derive(Debug)]
-pub(crate) struct MessageView {
-    pub(crate) body: Vec<u8>,
-    pub(crate) message_id: String,
-    pub(crate) topic: String,
-    pub(crate) consume_group: String,
-    pub(crate) endpoint: String,
-    pub(crate) receipt_handle: String,
-}
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index 891c9c4..e288a5c 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -20,12 +20,22 @@
 //! `Producer` is a thin wrapper of internal `Client` struct that shoulders the actual workloads.
 //! Most of its methods take shared reference so that application developers may use it at will.
 
+use std::hash::Hasher;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use prost_types::Timestamp;
+use siphasher::sip::SipHasher24;
+use slog::{info, Logger};
+
 use crate::client::Client;
 use crate::conf::{ClientOption, ProducerOption};
-use crate::error::ClientError;
-use crate::log;
-use crate::pb::{Message, SendResultEntry};
-use slog::{info, Logger};
+use crate::error::{ClientError, ErrorKind};
+use crate::model::common::{Endpoints, Route};
+use crate::model::message;
+use crate::pb::{Encoding, MessageQueue, Resource, SendResultEntry, SystemProperties};
+use crate::{log, pb};
 
 /// `Producer` is the core struct, to which application developers should turn, when publishing messages to brokers.
 ///
@@ -37,7 +47,16 @@
     client: Client,
 }
 
+lazy_static::lazy_static! {
+    static ref HOST_NAME: String = match hostname::get() {
+        Ok(name) => name.to_str().unwrap_or("localhost").to_string(),
+        Err(_) => "localhost".to_string(),
+    };
+}
+
 impl Producer {
+    const OPERATION_SEND_MESSAGE: &'static str = "producer.send_message";
+
     pub async fn new(
         option: ProducerOption,
         client_option: ClientOption,
@@ -65,49 +84,283 @@
         Ok(())
     }
 
-    pub async fn send(&self, message: Message) -> Result<SendResultEntry, ClientError> {
-        self.client.send_message(message).await
+    fn transform_messages_to_protobuf(
+        &self,
+        messages: Vec<impl message::Message>,
+    ) -> Result<(String, Option<String>, Vec<pb::Message>), ClientError> {
+        if messages.is_empty() {
+            return Err(ClientError::new(
+                ErrorKind::InvalidMessage,
+                "No message found.",
+                Self::OPERATION_SEND_MESSAGE,
+            ));
+        }
+
+        let born_timestamp = match SystemTime::now().duration_since(UNIX_EPOCH) {
+            Ok(duration) => Some(Timestamp {
+                seconds: duration.as_secs() as i64,
+                nanos: 0,
+            }),
+            Err(_) => None,
+        };
+
+        let mut pb_messages = Vec::with_capacity(messages.len());
+        let mut last_topic: Option<String> = None;
+        let mut last_message_group: Option<Option<String>> = None;
+
+        for mut message in messages {
+            if let Some(last_topic) = last_topic.clone() {
+                if last_topic.ne(&message.take_topic()) {
+                    return Err(ClientError::new(
+                        ErrorKind::InvalidMessage,
+                        "Not all messages have the same topic.",
+                        Self::OPERATION_SEND_MESSAGE,
+                    ));
+                }
+            } else {
+                last_topic = Some(message.take_topic());
+            }
+
+            let message_group = message.take_message_group();
+            if let Some(last_message_group) = last_message_group.clone() {
+                if last_message_group.ne(&message_group) {
+                    return Err(ClientError::new(
+                        ErrorKind::InvalidMessage,
+                        "Not all messages have the same message group.",
+                        Self::OPERATION_SEND_MESSAGE,
+                    ));
+                }
+            } else {
+                last_message_group = Some(message_group.clone());
+            }
+
+            let delivery_timestamp = message
+                .take_delivery_timestamp()
+                .map(|seconds| Timestamp { seconds, nanos: 0 });
+
+            let pb_message = pb::Message {
+                topic: Some(Resource {
+                    name: message.take_topic(),
+                    resource_namespace: self.option.namespace().to_string(),
+                }),
+                user_properties: message.take_properties(),
+                system_properties: Some(SystemProperties {
+                    tag: message.take_tag(),
+                    keys: message.take_keys(),
+                    message_id: message.take_message_id(),
+                    message_group,
+                    delivery_timestamp,
+                    born_host: HOST_NAME.clone(),
+                    born_timestamp: born_timestamp.clone(),
+                    body_digest: None,
+                    body_encoding: Encoding::Identity as i32,
+                    ..SystemProperties::default()
+                }),
+                body: message.take_body(),
+            };
+            pb_messages.push(pb_message);
+        }
+
+        let topic = last_topic.unwrap();
+        if topic.is_empty() {
+            return Err(ClientError::new(
+                ErrorKind::InvalidMessage,
+                "Message topic is empty.",
+                Self::OPERATION_SEND_MESSAGE,
+            ));
+        }
+
+        Ok((topic, last_message_group.unwrap(), pb_messages))
+    }
+
+    pub async fn send_one(
+        &self,
+        message: impl message::Message,
+    ) -> Result<SendResultEntry, ClientError> {
+        let results = self.send(vec![message]).await?;
+        Ok(results[0].clone())
+    }
+
+    pub async fn send(
+        &self,
+        messages: Vec<impl message::Message>,
+    ) -> Result<Vec<SendResultEntry>, ClientError> {
+        let (topic, message_group, mut pb_messages) =
+            self.transform_messages_to_protobuf(messages)?;
+
+        let route = self.client.topic_route(&topic, true).await?;
+
+        let message_queue = if let Some(message_group) = message_group {
+            self.select_message_queue_by_message_group(route, message_group)
+        } else {
+            self.select_message_queue(route)
+        };
+
+        if message_queue.broker.is_none() {
+            return Err(ClientError::new(
+                ErrorKind::NoBrokerAvailable,
+                "Message queue do not have a available endpoint.",
+                Self::OPERATION_SEND_MESSAGE,
+            )
+            .with_context("message_queue", format!("{:?}", message_queue)));
+        }
+
+        let broker = message_queue.broker.unwrap();
+        if broker.endpoints.is_none() {
+            return Err(ClientError::new(
+                ErrorKind::NoBrokerAvailable,
+                "Message queue do not have a available endpoint.",
+                Self::OPERATION_SEND_MESSAGE,
+            )
+            .with_context("broker", broker.name)
+            .with_context("topic", topic)
+            .with_context("queue_id", message_queue.id.to_string()));
+        }
+
+        let endpoints = Endpoints::from_pb_endpoints(broker.endpoints.unwrap());
+        for message in pb_messages.iter_mut() {
+            message.system_properties.as_mut().unwrap().queue_id = message_queue.id;
+        }
+
+        self.client.send_message(pb_messages, &endpoints).await
+    }
+
+    fn select_message_queue(&self, route: Arc<Route>) -> MessageQueue {
+        let i = route.index.fetch_add(1, Ordering::Relaxed);
+        route.queue[i % route.queue.len()].clone()
+    }
+
+    fn select_message_queue_by_message_group(
+        &self,
+        route: Arc<Route>,
+        message_group: String,
+    ) -> MessageQueue {
+        let mut sip_hasher24 = SipHasher24::default();
+        sip_hasher24.write(message_group.as_bytes());
+        let index = sip_hasher24.finish() % route.queue.len() as u64;
+        route.queue[index as usize].clone()
     }
 }
 
-// #[cfg(test)]
-// mod tests {
-//     use crate::conf::{ClientOption, ProducerOption};
-//     use crate::pb::{Message, Resource, SystemProperties};
-//     use crate::producer::Producer;
-//
-//     #[tokio::test]
-//     async fn test_producer_start() {
-//         let mut producer_option = ProducerOption::default();
-//         producer_option.set_topics(vec!["DefaultCluster".to_string()]);
-//         let producer = Producer::new(producer_option, ClientOption::default())
-//             .await
-//             .unwrap();
-//         producer.start().await.unwrap();
-//     }
-//
-//     #[tokio::test]
-//     async fn test_producer_send() {
-//         let mut producer_option = ProducerOption::default();
-//         producer_option.set_topics(vec!["DefaultCluster".to_string()]);
-//         let producer = Producer::new(producer_option, ClientOption::default())
-//             .await
-//             .unwrap();
-//         producer.start().await.unwrap();
-//         let send_result = producer
-//             .send(Message {
-//                 topic: Some(Resource {
-//                     resource_namespace: "".to_string(),
-//                     name: "DefaultCluster".to_string(),
-//                 }),
-//                 user_properties: Default::default(),
-//                 system_properties: Some(SystemProperties {
-//                     message_id: "message_test_id".to_string(),
-//                     ..SystemProperties::default()
-//                 }),
-//                 body: "Hello world".to_string().into_bytes(),
-//             })
-//             .await;
-//         println!("{:?}", send_result);
-//     }
-// }
+#[cfg(test)]
+mod tests {
+    use crate::conf::{ClientOption, ProducerOption};
+    use crate::error::ErrorKind;
+    use crate::model::message::MessageImpl;
+    use crate::producer::Producer;
+
+    // #[tokio::test]
+    // async fn producer_start() {
+    //     let mut producer_option = ProducerOption::default();
+    //     producer_option.set_topics(vec!["DefaultCluster".to_string()]);
+    //     let producer = Producer::new(producer_option, ClientOption::default())
+    //         .await
+    //         .unwrap();
+    //     producer.start().await.unwrap();
+    // }
+
+    #[tokio::test]
+    async fn producer_transform_messages_to_protobuf() {
+        let producer = Producer::new(ProducerOption::default(), ClientOption::default())
+            .await
+            .unwrap();
+        let messages = vec![MessageImpl::builder()
+            .set_topic("DefaultCluster".to_string())
+            .set_body("hello world".as_bytes().to_vec())
+            .set_tags("tag".to_string())
+            .set_keys(vec!["key".to_string()])
+            .set_properties(
+                vec![("key".to_string(), "value".to_string())]
+                    .into_iter()
+                    .collect(),
+            )
+            .set_message_group("message_group".to_string())
+            .build()
+            .unwrap()];
+        let result = producer.transform_messages_to_protobuf(messages);
+        assert!(result.is_ok());
+
+        let (topic, message_group, pb_messages) = result.unwrap();
+        assert_eq!(topic, "DefaultCluster");
+        assert_eq!(message_group, Some("message_group".to_string()));
+
+        // check message
+        assert_eq!(pb_messages.len(), 1);
+        let message = pb_messages.get(0).unwrap().clone();
+        assert_eq!(message.topic.unwrap().name, "DefaultCluster");
+        let system_properties = message.system_properties.unwrap();
+        assert_eq!(system_properties.tag.unwrap(), "tag");
+        assert_eq!(system_properties.keys, vec!["key".to_string()]);
+        assert_eq!(message.user_properties.get("key").unwrap(), "value");
+        assert_eq!(std::str::from_utf8(&message.body).unwrap(), "hello world");
+        assert!(!system_properties.message_id.is_empty());
+    }
+
+    #[tokio::test]
+    async fn producer_transform_messages_to_protobuf_failed() {
+        let producer = Producer::new(ProducerOption::default(), ClientOption::default())
+            .await
+            .unwrap();
+
+        let messages: Vec<MessageImpl> = vec![];
+        let result = producer.transform_messages_to_protobuf(messages);
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert_eq!(err.kind, ErrorKind::InvalidMessage);
+        assert_eq!(err.message, "No message found.");
+
+        let messages = vec![MessageImpl {
+            message_id: "".to_string(),
+            topic: "".to_string(),
+            body: None,
+            tags: None,
+            keys: None,
+            properties: None,
+            message_group: None,
+            delivery_timestamp: None,
+        }];
+        let result = producer.transform_messages_to_protobuf(messages);
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert_eq!(err.kind, ErrorKind::InvalidMessage);
+        assert_eq!(err.message, "Message topic is empty.");
+
+        let messages = vec![
+            MessageImpl::builder()
+                .set_topic("DefaultCluster".to_string())
+                .set_body("hello world".as_bytes().to_vec())
+                .build()
+                .unwrap(),
+            MessageImpl::builder()
+                .set_topic("DefaultCluster_dup".to_string())
+                .set_body("hello world".as_bytes().to_vec())
+                .build()
+                .unwrap(),
+        ];
+        let result = producer.transform_messages_to_protobuf(messages);
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert_eq!(err.kind, ErrorKind::InvalidMessage);
+        assert_eq!(err.message, "Not all messages have the same topic.");
+
+        let messages = vec![
+            MessageImpl::builder()
+                .set_topic("DefaultCluster".to_string())
+                .set_body("hello world".as_bytes().to_vec())
+                .set_message_group("message_group".to_string())
+                .build()
+                .unwrap(),
+            MessageImpl::builder()
+                .set_topic("DefaultCluster".to_string())
+                .set_body("hello world".as_bytes().to_vec())
+                .set_message_group("message_group_dup".to_string())
+                .build()
+                .unwrap(),
+        ];
+        let result = producer.transform_messages_to_protobuf(messages);
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert_eq!(err.kind, ErrorKind::InvalidMessage);
+        assert_eq!(err.message, "Not all messages have the same message group.");
+    }
+}
diff --git a/rust/src/session.rs b/rust/src/session.rs
index 182a235..b84469e 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -18,13 +18,14 @@
 
 use async_trait::async_trait;
 use mockall::automock;
-use slog::{debug, info, o, Logger};
+use slog::{info, o, Logger};
 use tokio::sync::Mutex;
+use tonic::metadata::AsciiMetadataValue;
 use tonic::transport::{Channel, Endpoint};
 
 use crate::conf::ClientOption;
 use crate::error::ErrorKind;
-use crate::model::Endpoints;
+use crate::model::common::Endpoints;
 use crate::pb::{QueryRouteRequest, QueryRouteResponse, SendMessageRequest, SendMessageResponse};
 use crate::{error::ClientError, pb::messaging_service_client::MessagingServiceClient};
 
@@ -64,8 +65,7 @@
         client_id: String,
         option: &ClientOption,
     ) -> Result<Self, ClientError> {
-        let peer = endpoints.access_url().to_owned();
-        debug!(logger, "creating session, peer={}", peer);
+        let peer = endpoints.endpoint_url().to_owned();
 
         let mut channel_endpoints = Vec::new();
         for endpoint in endpoints.inner().addresses.clone() {
@@ -75,7 +75,7 @@
         if channel_endpoints.is_empty() {
             return Err(ClientError::new(
                 ErrorKind::Connect,
-                "No endpoint available.".to_string(),
+                "No endpoint available.",
                 Self::OPERATION_CREATE,
             )
             .with_context("peer", peer.clone()));
@@ -85,7 +85,7 @@
             channel_endpoints[0].connect().await.map_err(|e| {
                 ClientError::new(
                     ErrorKind::Connect,
-                    "Failed to connect to peer.".to_string(),
+                    "Failed to connect to peer.",
                     Self::OPERATION_CREATE,
                 )
                 .set_source(e)
@@ -100,7 +100,7 @@
         info!(
             logger,
             "create session success, peer={}",
-            endpoints.access_url()
+            endpoints.endpoint_url()
         );
 
         Ok(Session {
@@ -124,7 +124,7 @@
             .map_err(|e| {
                 ClientError::new(
                     ErrorKind::Connect,
-                    "Failed to create channel endpoint.".to_string(),
+                    "Failed to create channel endpoint.",
                     Self::OPERATION_CREATE,
                 )
                 .set_source(e)
@@ -147,22 +147,17 @@
     }
 
     fn sign(&self, metadata: &mut tonic::metadata::MetadataMap) {
-        // let _ = tonic::metadata::AsciiMetadataValue::try_from(&self.id).and_then(|v| {
-        //     metadata.insert("x-mq-client-id", v);
-        //     Ok(())
-        // });
+        let _ = AsciiMetadataValue::try_from(&self.client_id)
+            .map(|v| metadata.insert("x-mq-client-id", v));
 
-        metadata.insert(
-            "x-mq-language",
-            tonic::metadata::AsciiMetadataValue::from_static("RUST"),
-        );
+        metadata.insert("x-mq-language", AsciiMetadataValue::from_static("RUST"));
         metadata.insert(
             "x-mq-client-version",
-            tonic::metadata::AsciiMetadataValue::from_static("5.0.0"),
+            AsciiMetadataValue::from_static("5.0.0"),
         );
         metadata.insert(
             "x-mq-protocol-version",
-            tonic::metadata::AsciiMetadataValue::from_static("2.0.0"),
+            AsciiMetadataValue::from_static("2.0.0"),
         );
     }
 }
@@ -178,7 +173,7 @@
         let response = self.stub.query_route(request).await.map_err(|e| {
             ClientError::new(
                 ErrorKind::ClientInternal,
-                "Query topic route rpc failed.".to_string(),
+                "Query topic route rpc failed.",
                 Self::OPERATION_QUERY_ROUTE,
             )
             .set_source(e)
@@ -190,10 +185,12 @@
         &mut self,
         request: SendMessageRequest,
     ) -> Result<SendMessageResponse, ClientError> {
+        let mut request = tonic::Request::new(request);
+        self.sign(request.metadata_mut());
         let response = self.stub.send_message(request).await.map_err(|e| {
             ClientError::new(
                 ErrorKind::ClientInternal,
-                "Send message rpc failed.".to_string(),
+                "Send message rpc failed.",
                 Self::OPERATION_SEND_MESSAGE,
             )
             .set_source(e)
@@ -224,9 +221,9 @@
 
     pub(crate) async fn get_session(&self, endpoints: &Endpoints) -> Result<Session, ClientError> {
         let mut session_map = self.session_map.lock().await;
-        let access_url = endpoints.access_url().to_string();
-        return if session_map.contains_key(&access_url) {
-            Ok(session_map.get(&access_url).unwrap().clone())
+        let endpoint_url = endpoints.endpoint_url().to_string();
+        return if session_map.contains_key(&endpoint_url) {
+            Ok(session_map.get(&endpoint_url).unwrap().clone())
         } else {
             let session = Session::new(
                 &self.logger,
@@ -235,7 +232,7 @@
                 &self.option,
             )
             .await?;
-            session_map.insert(access_url.clone(), session.clone());
+            session_map.insert(endpoint_url.clone(), session.clone());
             Ok(session)
         };
     }
@@ -244,6 +241,7 @@
 #[cfg(test)]
 mod tests {
     use crate::log::terminal_logger;
+    use slog::debug;
     use wiremock_grpc::generate;
 
     use super::*;
@@ -256,7 +254,7 @@
         let logger = terminal_logger();
         let session = Session::new(
             &logger,
-            &Endpoints::from_access_url(format!("localhost:{}", server.address().port())).unwrap(),
+            &Endpoints::from_url(&format!("localhost:{}", server.address().port())).unwrap(),
             "test_client".to_string(),
             &ClientOption::default(),
         )
@@ -269,7 +267,7 @@
         let logger = terminal_logger();
         let session = Session::new(
             &logger,
-            &Endpoints::from_access_url("127.0.0.1:8080,127.0.0.1:8081".to_string()).unwrap(),
+            &Endpoints::from_url("127.0.0.1:8080,127.0.0.1:8081").unwrap(),
             "test_client".to_string(),
             &ClientOption::default(),
         )
@@ -285,8 +283,7 @@
             SessionManager::new(&logger, "test_client".to_string(), &ClientOption::default());
         let session = session_manager
             .get_session(
-                &Endpoints::from_access_url(format!("localhost:{}", server.address().port()))
-                    .unwrap(),
+                &Endpoints::from_url(&format!("localhost:{}", server.address().port())).unwrap(),
             )
             .await
             .unwrap();