feat(rust): implement simple consumer (#476)

* feat(rust): implement simple consumer

* chore(rust): improve sdk api signature

* build(rust): check protoc version before compile proto files

* style(rust): fix markdown style

* fix(rust): fix config according to review comments

* doc(rust): fix README.md
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index c99ca5f..35a229c 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -33,6 +33,7 @@
 parking_lot = "0.12"
 hmac = "0.12"
 hostname = "0.3.1"
+os_type = "2.6.0"
 
 slog = {version = "2.7.0", features=["max_level_trace", "release_max_level_info"]}
 slog-term = "2.9.0"
@@ -57,6 +58,9 @@
 
 [build-dependencies]
 tonic-build = "0.9.0"
+which = "4.4.0"
+version_check = "0.9.4"
+regex = "1.7.3"
 
 [dev-dependencies]
 wiremock-grpc = "0.0.3-alpha2"
diff --git a/rust/README.md b/rust/README.md
new file mode 100644
index 0000000..5c48483
--- /dev/null
+++ b/rust/README.md
@@ -0,0 +1,29 @@
+# The Rust Implementation of Apache RocketMQ Client
+
+[RocketMQ Website](https://rocketmq.apache.org/)
+
+## Overview
+
+Here is the rust implementation of the client for [Apache RocketMQ](https://rocketmq.apache.org/). Different from the [remoting-based client](https://github.com/apache/rocketmq/tree/develop/client), the current implementation is based on separating architecture for computing and storage, which is the more recommended way to access the RocketMQ service.
+
+Here are some preparations you may need to know (or refer to [here](https://rocketmq.apache.org/docs/quickStart/02quickstart)).
+
+## Getting Started
+
+### Requirements
+
+1. rust and cargo
+2. protoc 3.15.0+
+3. setup name server, broker, and [proxy](https://github.com/apache/rocketmq/tree/develop/proxy).
+
+### Run Example
+
+Run the following command to start the example:
+
+```sh
+# send message via producer
+cargo run --example producer
+
+# consume message via simple consumer
+cargo run --example simple_consumer
+```
diff --git a/rust/build.rs b/rust/build.rs
index aada9c3..41efaab 100644
--- a/rust/build.rs
+++ b/rust/build.rs
@@ -14,7 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+use regex::Regex;
+use std::path::PathBuf;
+use std::process::Command;
+use std::{env, str};
+use version_check::Version;
+
 fn main() {
+    check_protoc_version();
+
     tonic_build::configure()
         .build_client(true)
         .build_server(false)
@@ -28,3 +36,38 @@
         )
         .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
 }
+
+fn check_protoc_version() {
+    let protoc = env::var_os("PROTOC")
+        .map(PathBuf::from)
+        .or_else(|| which::which("protoc").ok());
+
+    if protoc.is_none() {
+        panic!("protoc not found");
+    }
+
+    let mut cmd = Command::new(protoc.unwrap());
+    cmd.arg("--version");
+    let result = cmd.output();
+
+    if result.is_err() {
+        panic!("failed to invoke protoc: {:?}", result)
+    }
+
+    let output = result.unwrap();
+    if !output.status.success() {
+        panic!("protoc failed: {:?}", output)
+    }
+
+    let version_regex = Regex::new(r"(?:(\d+)\.)?(?:(\d+)\.)?(\*|\d+)").unwrap();
+    let protoc_version = version_regex.find(str::from_utf8(&output.stdout).unwrap());
+    if protoc_version.is_none() {
+        panic!("failed to parse protoc version");
+    }
+
+    let protoc_version = Version::parse(protoc_version.unwrap().as_str()).unwrap();
+    let min_version = Version::from_mmp(3, 15, 0);
+    if protoc_version.cmp(&min_version).is_le() {
+        panic!("protoc version must be >= 3.15.0");
+    }
+}
diff --git a/rust/examples/producer.rs b/rust/examples/producer.rs
index e88a3bc..c1b00d1 100644
--- a/rust/examples/producer.rs
+++ b/rust/examples/producer.rs
@@ -14,27 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-use rocketmq::{ClientOption, MessageImpl, Producer, ProducerOption};
+use rocketmq::conf::{ClientOption, ProducerOption};
+use rocketmq::model::message::MessageImpl;
+use rocketmq::Producer;
 
 #[tokio::main]
 async fn main() {
-    // specify which topic(s) you would like to send message to
+    // recommend to specify which topic(s) you would like to send message to
     // producer will prefetch topic route when start and failed fast if topic not exist
     let mut producer_option = ProducerOption::default();
-    producer_option.set_topics(vec!["test_topic".to_string()]);
+    producer_option.set_topics(vec!["test_topic"]);
 
     // set which rocketmq proxy to connect
     let mut client_option = ClientOption::default();
-    client_option.set_access_url("localhost:8081".to_string());
+    client_option.set_access_url("localhost:8081");
 
     // build and start producer
-    let producer = Producer::new(producer_option, client_option).await.unwrap();
+    let producer = Producer::new(producer_option, client_option).unwrap();
     producer.start().await.unwrap();
 
     // build message
     let message = MessageImpl::builder()
-        .set_topic("test_topic".to_string())
-        .set_tags("test_tag".to_string())
+        .set_topic("test_topic")
+        .set_tags("test_tag")
         .set_body("hello world".as_bytes().to_vec())
         .build()
         .unwrap();
diff --git a/rust/examples/simple_consumer.rs b/rust/examples/simple_consumer.rs
new file mode 100644
index 0000000..4011556
--- /dev/null
+++ b/rust/examples/simple_consumer.rs
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use rocketmq::conf::{ClientOption, SimpleConsumerOption};
+use rocketmq::model::common::{FilterExpression, FilterType};
+use rocketmq::SimpleConsumer;
+
+#[tokio::main]
+async fn main() {
+    // recommend to specify which topic(s) you would like to send message to
+    // simple consumer will prefetch topic route when start and failed fast if topic not exist
+    let mut consumer_option = SimpleConsumerOption::default();
+    consumer_option.set_topics(vec!["test_topic"]);
+    consumer_option.set_consumer_group("SimpleConsumerGroup");
+
+    // set which rocketmq proxy to connect
+    let mut client_option = ClientOption::default();
+    client_option.set_access_url("localhost:8081");
+
+    // build and start simple consumer
+    let consumer = SimpleConsumer::new(consumer_option, client_option).unwrap();
+    consumer.start().await.unwrap();
+
+    // pop message from rocketmq proxy
+    let receive_result = consumer
+        .receive(
+            "test_topic".to_string(),
+            &FilterExpression::new(FilterType::Tag, "test_tag"),
+        )
+        .await;
+    debug_assert!(
+        receive_result.is_ok(),
+        "receive message failed: {:?}",
+        receive_result.unwrap_err()
+    );
+
+    let messages = receive_result.unwrap();
+    for message in messages {
+        println!("receive message: {:?}", message);
+        // ack message to rocketmq proxy
+        let ack_result = consumer.ack(message).await;
+        debug_assert!(
+            ack_result.is_ok(),
+            "ack message failed: {:?}",
+            ack_result.unwrap_err()
+        );
+    }
+}
diff --git a/rust/src/client.rs b/rust/src/client.rs
index a60777e..7e50b3b 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -19,14 +19,21 @@
 use std::{collections::HashMap, sync::atomic::AtomicUsize, sync::Arc};
 
 use parking_lot::Mutex;
+use prost_types::Duration;
 use slog::{debug, error, info, o, warn, Logger};
+use tokio::select;
 use tokio::sync::oneshot;
 
 use crate::conf::ClientOption;
 use crate::error::{ClientError, ErrorKind};
-use crate::model::common::{Endpoints, Route, RouteStatus};
+use crate::model::common::{ClientType, Endpoints, Route, RouteStatus};
+use crate::model::message::AckMessageEntry;
+use crate::pb;
+use crate::pb::receive_message_response::Content;
 use crate::pb::{
-    Code, Message, QueryRouteRequest, Resource, SendMessageRequest, SendResultEntry, Status,
+    AckMessageRequest, AckMessageResultEntry, Code, FilterExpression, HeartbeatRequest, Message,
+    MessageQueue, QueryRouteRequest, ReceiveMessageRequest, Resource, SendMessageRequest,
+    SendResultEntry, Status, TelemetryCommand,
 };
 use crate::session::{RPCClient, Session, SessionManager};
 
@@ -34,10 +41,11 @@
 pub(crate) struct Client {
     logger: Logger,
     option: ClientOption,
-    session_manager: SessionManager,
+    session_manager: Arc<SessionManager>,
     route_table: Mutex<HashMap<String /* topic */, RouteStatus>>,
     id: String,
     access_endpoints: Endpoints,
+    settings: TelemetryCommand,
 }
 
 lazy_static::lazy_static! {
@@ -47,9 +55,16 @@
 impl Client {
     const OPERATION_CLIENT_NEW: &'static str = "client.new";
     const OPERATION_QUERY_ROUTE: &'static str = "client.query_route";
+    const OPERATION_HEARTBEAT: &'static str = "client.heartbeat";
     const OPERATION_SEND_MESSAGE: &'static str = "client.send_message";
+    const OPERATION_RECEIVE_MESSAGE: &'static str = "client.receive_message";
+    const OPERATION_ACK_MESSAGE: &'static str = "client.ack_message";
 
-    pub(crate) fn new(logger: &Logger, option: ClientOption) -> Result<Self, ClientError> {
+    pub(crate) fn new(
+        logger: &Logger,
+        option: ClientOption,
+        settings: TelemetryCommand,
+    ) -> Result<Self, ClientError> {
         let id = Self::generate_client_id();
         let endpoints = Endpoints::from_url(option.access_url())
             .map_err(|e| e.with_operation(Self::OPERATION_CLIENT_NEW))?;
@@ -57,13 +72,85 @@
         Ok(Client {
             logger: logger.new(o!("component" => "client")),
             option,
-            session_manager,
+            session_manager: Arc::new(session_manager),
             route_table: Mutex::new(HashMap::new()),
             id,
             access_endpoints: endpoints,
+            settings,
         })
     }
 
+    async fn heart_beat(
+        logger: &Logger,
+        session_manager: Arc<SessionManager>,
+        group: &str,
+        namespace: &str,
+        client_type: &ClientType,
+    ) {
+        let sessions = session_manager.get_all_sessions().await;
+        if sessions.is_err() {
+            error!(
+                logger,
+                "send heartbeat failed: failed to get sessions: {}",
+                sessions.unwrap_err()
+            );
+            return;
+        }
+        for mut session in sessions.unwrap() {
+            let request = HeartbeatRequest {
+                group: Some(Resource {
+                    name: group.to_string(),
+                    resource_namespace: namespace.to_string(),
+                }),
+                client_type: client_type.clone() as i32,
+            };
+            let response = session.heartbeat(request).await;
+            if response.is_err() {
+                error!(
+                    logger,
+                    "send heartbeat failed: failed to send heartbeat rpc: {}",
+                    response.unwrap_err()
+                );
+                return;
+            }
+            let result =
+                Self::handle_response_status(response.unwrap().status, Self::OPERATION_HEARTBEAT);
+            if result.is_err() {
+                error!(
+                    logger,
+                    "send heartbeat failed: server return error: {}",
+                    result.unwrap_err()
+                );
+                return;
+            }
+            debug!(
+                logger,
+                "send heartbeat to server success, peer={}",
+                session.peer()
+            );
+        }
+    }
+
+    pub(crate) fn start(&self) {
+        let logger = self.logger.clone();
+        let session_manager = self.session_manager.clone();
+
+        let group = self.option.group.to_string();
+        let namespace = self.option.namespace.to_string();
+        let client_type = self.option.client_type.clone();
+
+        tokio::spawn(async move {
+            let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
+            loop {
+                select! {
+                    _ = interval.tick() => {
+                        Self::heart_beat(&logger, session_manager.clone(), &group, &namespace, &client_type).await;
+                    },
+                }
+            }
+        });
+    }
+
     pub(crate) fn client_id(&self) -> &str {
         &self.id
     }
@@ -88,27 +175,32 @@
     }
 
     async fn get_session(&self) -> Result<Session, ClientError> {
-        self.session_manager
-            .get_session(&self.access_endpoints)
-            .await
+        let session = self
+            .session_manager
+            .get_or_create_session(&self.access_endpoints, self.settings.clone())
+            .await?;
+        Ok(session)
     }
 
     async fn get_session_with_endpoints(
         &self,
         endpoints: &Endpoints,
     ) -> Result<Session, ClientError> {
-        self.session_manager.get_session(endpoints).await
+        let session = self
+            .session_manager
+            .get_or_create_session(endpoints, self.settings.clone())
+            .await?;
+        Ok(session)
     }
 
     fn handle_response_status(
-        &self,
         status: Option<Status>,
         operation: &'static str,
     ) -> Result<(), ClientError> {
         if status.is_none() {
             return Err(ClientError::new(
                 ErrorKind::Server,
-                "Server do not return status, this may be a bug.",
+                "server do not return status, this may be a bug",
                 operation,
             ));
         }
@@ -117,7 +209,7 @@
         let status_code = Code::from_i32(status.code).unwrap();
         if !status_code.eq(&Code::Ok) {
             return Err(
-                ClientError::new(ErrorKind::Server, "Server return an error.", operation)
+                ClientError::new(ErrorKind::Server, "server return an error", operation)
                     .with_context("code", status_code.as_str_name())
                     .with_context("message", status.message),
             );
@@ -158,13 +250,13 @@
         let request = QueryRouteRequest {
             topic: Some(Resource {
                 name: topic.to_owned(),
-                resource_namespace: self.option.name_space().to_string(),
+                resource_namespace: self.option.namespace.to_string(),
             }),
             endpoints: Some(self.access_endpoints.inner().clone()),
         };
 
         let response = rpc_client.query_route(request).await?;
-        self.handle_response_status(response.status, Self::OPERATION_QUERY_ROUTE)?;
+        Self::handle_response_status(response.status, Self::OPERATION_QUERY_ROUTE)?;
 
         let route = Route {
             index: AtomicUsize::new(0),
@@ -209,7 +301,7 @@
                 Ok(route) => route,
                 Err(_e) => Err(ClientError::new(
                     ErrorKind::ChannelReceive,
-                    "Wait for inflight query topic route request failed.",
+                    "wait for inflight query topic route request failed",
                     Self::OPERATION_QUERY_ROUTE,
                 )),
             };
@@ -248,7 +340,7 @@
                 for item in v.drain(..) {
                     let _ = item.send(Err(ClientError::new(
                         ErrorKind::Server,
-                        "Query route failed.",
+                        "query topic route failed",
                         Self::OPERATION_QUERY_ROUTE,
                     )));
                 }
@@ -259,8 +351,8 @@
 
     pub(crate) async fn send_message(
         &self,
-        messages: Vec<Message>,
         endpoints: &Endpoints,
+        messages: Vec<Message>,
     ) -> Result<Vec<SendResultEntry>, ClientError> {
         self.send_message_inner(
             self.get_session_with_endpoints(endpoints).await.unwrap(),
@@ -277,7 +369,7 @@
         let message_count = messages.len();
         let request = SendMessageRequest { messages };
         let response = rpc_client.send_message(request).await?;
-        self.handle_response_status(response.status, Self::OPERATION_SEND_MESSAGE)?;
+        Self::handle_response_status(response.status, Self::OPERATION_SEND_MESSAGE)?;
 
         if response.entries.len() != message_count {
             error!(self.logger, "server do not return illegal send result, this may be a bug. except result count: {}, found: {}", response.entries.len(), message_count);
@@ -285,6 +377,104 @@
 
         Ok(response.entries)
     }
+
+    pub(crate) async fn receive_message(
+        &self,
+        endpoints: &Endpoints,
+        message_queue: MessageQueue,
+        expression: FilterExpression,
+        batch_size: i32,
+        invisible_duration: Duration,
+    ) -> Result<Vec<Message>, ClientError> {
+        self.receive_message_inner(
+            self.get_session_with_endpoints(endpoints).await.unwrap(),
+            message_queue,
+            expression,
+            batch_size,
+            invisible_duration,
+        )
+        .await
+    }
+
+    pub(crate) async fn receive_message_inner(
+        &self,
+        mut rpc_client: impl RPCClient,
+        message_queue: MessageQueue,
+        expression: FilterExpression,
+        batch_size: i32,
+        invisible_duration: Duration,
+    ) -> Result<Vec<Message>, ClientError> {
+        let request = ReceiveMessageRequest {
+            group: Some(Resource {
+                name: self.option.group.to_string(),
+                resource_namespace: self.option.namespace.to_string(),
+            }),
+            message_queue: Some(message_queue),
+            filter_expression: Some(expression),
+            batch_size,
+            invisible_duration: Some(invisible_duration),
+            auto_renew: false,
+            long_polling_timeout: Some(
+                Duration::try_from(*self.option.long_polling_timeout()).unwrap(),
+            ),
+        };
+        let responses = rpc_client.receive_message(request).await?;
+
+        let mut messages = Vec::with_capacity(batch_size as usize);
+        for response in responses {
+            match response.content.unwrap() {
+                Content::Status(status) => {
+                    Self::handle_response_status(Some(status), Self::OPERATION_RECEIVE_MESSAGE)?;
+                }
+                Content::Message(message) => {
+                    messages.push(message);
+                }
+                Content::DeliveryTimestamp(_) => {}
+            }
+        }
+        Ok(messages)
+    }
+
+    pub(crate) async fn ack_message(
+        &self,
+        ack_entry: impl AckMessageEntry,
+    ) -> Result<AckMessageResultEntry, ClientError> {
+        let result = self
+            .ack_message_inner(
+                self.get_session_with_endpoints(ack_entry.endpoints())
+                    .await
+                    .unwrap(),
+                ack_entry.topic(),
+                vec![pb::AckMessageEntry {
+                    message_id: ack_entry.message_id(),
+                    receipt_handle: ack_entry.receipt_handle(),
+                }],
+            )
+            .await?;
+        Ok(result[0].clone())
+    }
+
+    pub(crate) async fn ack_message_inner(
+        &self,
+        mut rpc_client: impl RPCClient,
+        topic: String,
+        entries: Vec<pb::AckMessageEntry>,
+    ) -> Result<Vec<AckMessageResultEntry>, ClientError> {
+        let request = AckMessageRequest {
+            group: Some(Resource {
+                name: self.option.group.to_string(),
+                resource_namespace: self.option.namespace.to_string(),
+            }),
+            topic: Some(Resource {
+                name: topic,
+                resource_namespace: self.option.namespace.to_string(),
+            }),
+            entries,
+        };
+        let response = rpc_client.ack_message(request).await?;
+        Self::handle_response_status(response.status, Self::OPERATION_ACK_MESSAGE)?;
+        Ok(response.entries)
+    }
 }
 
 #[cfg(test)]
@@ -299,8 +489,11 @@
     use crate::error::{ClientError, ErrorKind};
     use crate::log::terminal_logger;
     use crate::model::common::Route;
+    use crate::pb::receive_message_response::Content;
     use crate::pb::{
-        Code, MessageQueue, QueryRouteResponse, Resource, SendMessageResponse, Status,
+        AckMessageEntry, AckMessageResponse, Code, FilterExpression, Message, MessageQueue,
+        QueryRouteResponse, ReceiveMessageResponse, Resource, SendMessageResponse, Status,
+        TelemetryCommand,
     };
     use crate::session;
 
@@ -315,19 +508,17 @@
 
     #[test]
     fn handle_response_status() {
-        let client = Client::new(&terminal_logger(), ClientOption::default()).unwrap();
-
-        let result = client.handle_response_status(None, "test");
+        let result = Client::handle_response_status(None, "test");
         assert!(result.is_err(), "should return error when status is None");
         let result = result.unwrap_err();
         assert_eq!(result.kind, ErrorKind::Server);
         assert_eq!(
             result.message,
-            "Server do not return status, this may be a bug."
+            "server do not return status, this may be a bug"
         );
         assert_eq!(result.operation, "test");
 
-        let result = client.handle_response_status(
+        let result = Client::handle_response_status(
             Some(Status {
                 code: Code::BadRequest as i32,
                 message: "test failed".to_string(),
@@ -340,17 +531,17 @@
         );
         let result = result.unwrap_err();
         assert_eq!(result.kind, ErrorKind::Server);
-        assert_eq!(result.message, "Server return an error.");
+        assert_eq!(result.message, "server return an error");
         assert_eq!(result.operation, "test failed");
         assert_eq!(
             result.context,
             vec![
                 ("code", "BAD_REQUEST".to_string()),
-                ("message", "test failed".to_string())
+                ("message", "test failed".to_string()),
             ]
         );
 
-        let result = client.handle_response_status(
+        let result = Client::handle_response_status(
             Some(Status {
                 code: Code::Ok as i32,
                 message: "test success".to_string(),
@@ -382,7 +573,12 @@
     #[tokio::test]
     async fn client_query_route_from_cache() {
         let logger = terminal_logger();
-        let client = Client::new(&logger, ClientOption::default()).unwrap();
+        let client = Client::new(
+            &logger,
+            ClientOption::default(),
+            TelemetryCommand::default(),
+        )
+        .unwrap();
         client.route_table.lock().insert(
             "DefaultCluster".to_string(),
             super::RouteStatus::Found(Arc::new(Route {
@@ -397,7 +593,12 @@
     #[tokio::test]
     async fn client_query_route() {
         let logger = terminal_logger();
-        let client = Client::new(&logger, ClientOption::default()).unwrap();
+        let client = Client::new(
+            &logger,
+            ClientOption::default(),
+            TelemetryCommand::default(),
+        )
+        .unwrap();
 
         let mut mock = session::MockRPCClient::new();
         mock.expect_query_route()
@@ -420,7 +621,12 @@
     #[tokio::test(flavor = "multi_thread")]
     async fn client_query_route_with_inflight_request() {
         let logger = terminal_logger();
-        let client = Client::new(&logger, ClientOption::default()).unwrap();
+        let client = Client::new(
+            &logger,
+            ClientOption::default(),
+            TelemetryCommand::default(),
+        )
+        .unwrap();
         let client = Arc::new(client);
 
         let client_clone = client.clone();
@@ -448,7 +654,12 @@
     #[tokio::test(flavor = "multi_thread")]
     async fn client_query_route_with_failed_request() {
         let logger = terminal_logger();
-        let client = Client::new(&logger, ClientOption::default()).unwrap();
+        let client = Client::new(
+            &logger,
+            ClientOption::default(),
+            TelemetryCommand::default(),
+        )
+        .unwrap();
         let client = Arc::new(client);
 
         let client_clone = client.clone();
@@ -458,7 +669,7 @@
                 sleep(Duration::from_millis(200));
                 Box::pin(futures::future::ready(Err(ClientError::new(
                     ErrorKind::Server,
-                    "Server error",
+                    "server error",
                     "test",
                 ))))
             });
@@ -490,11 +701,138 @@
         mock.expect_send_message()
             .return_once(|_| Box::pin(futures::future::ready(response)));
 
-        let client = Client::new(&terminal_logger(), ClientOption::default()).unwrap();
+        let client = Client::new(
+            &terminal_logger(),
+            ClientOption::default(),
+            TelemetryCommand::default(),
+        )
+        .unwrap();
         let send_result = client.send_message_inner(mock, vec![]).await;
         assert!(send_result.is_ok());
 
         let send_results = send_result.unwrap();
         assert_eq!(send_results.len(), 0);
     }
+
+    #[tokio::test]
+    async fn client_receive_message() {
+        let response = Ok(vec![ReceiveMessageResponse {
+            content: Some(Content::Message(Message::default())),
+        }]);
+        let mut mock = session::MockRPCClient::new();
+        mock.expect_receive_message()
+            .return_once(|_| Box::pin(futures::future::ready(response)));
+
+        let client = Client::new(
+            &terminal_logger(),
+            ClientOption::default(),
+            TelemetryCommand::default(),
+        )
+        .unwrap();
+        let receive_result = client
+            .receive_message_inner(
+                mock,
+                MessageQueue::default(),
+                FilterExpression::default(),
+                32,
+                prost_types::Duration::default(),
+            )
+            .await;
+        assert!(receive_result.is_ok());
+
+        let messages = receive_result.unwrap();
+        assert_eq!(messages.len(), 1);
+    }
+
+    #[tokio::test]
+    async fn client_receive_message_failed() {
+        let response = Ok(vec![ReceiveMessageResponse {
+            content: Some(Content::Status(Status {
+                code: Code::BadRequest as i32,
+                message: "Unknown error".to_string(),
+            })),
+        }]);
+        let mut mock = session::MockRPCClient::new();
+        mock.expect_receive_message()
+            .return_once(|_| Box::pin(futures::future::ready(response)));
+
+        let client = Client::new(
+            &terminal_logger(),
+            ClientOption::default(),
+            TelemetryCommand::default(),
+        )
+        .unwrap();
+        let receive_result = client
+            .receive_message_inner(
+                mock,
+                MessageQueue::default(),
+                FilterExpression::default(),
+                32,
+                prost_types::Duration::default(),
+            )
+            .await;
+        assert!(receive_result.is_err());
+
+        let error = receive_result.unwrap_err();
+        assert_eq!(error.kind, ErrorKind::Server);
+        assert_eq!(error.message, "server return an error");
+        assert_eq!(error.operation, "client.receive_message");
+    }
+
+    #[tokio::test]
+    async fn client_ack_message() {
+        let response = Ok(AckMessageResponse {
+            status: Some(Status {
+                code: Code::Ok as i32,
+                message: "Success".to_string(),
+            }),
+            entries: vec![],
+        });
+        let mut mock = session::MockRPCClient::new();
+        mock.expect_ack_message()
+            .return_once(|_| Box::pin(futures::future::ready(response)));
+
+        let client = Client::new(
+            &terminal_logger(),
+            ClientOption::default(),
+            TelemetryCommand::default(),
+        )
+        .unwrap();
+        let ack_entries: Vec<AckMessageEntry> = vec![];
+        let ack_result = client
+            .ack_message_inner(mock, "test_topic".to_string(), ack_entries)
+            .await;
+        assert!(ack_result.is_ok());
+        assert_eq!(ack_result.unwrap().len(), 0);
+    }
+
+    #[tokio::test]
+    async fn client_ack_message_failed() {
+        let response = Ok(AckMessageResponse {
+            status: Some(Status {
+                code: Code::BadRequest as i32,
+                message: "Success".to_string(),
+            }),
+            entries: vec![],
+        });
+        let mut mock = session::MockRPCClient::new();
+        mock.expect_ack_message()
+            .return_once(|_| Box::pin(futures::future::ready(response)));
+
+        let client = Client::new(
+            &terminal_logger(),
+            ClientOption::default(),
+            TelemetryCommand::default(),
+        )
+        .unwrap();
+        let ack_entries: Vec<AckMessageEntry> = vec![];
+        let ack_result = client
+            .ack_message_inner(mock, "test_topic".to_string(), ack_entries)
+            .await;
+
+        let error = ack_result.unwrap_err();
+        assert_eq!(error.kind, ErrorKind::Server);
+        assert_eq!(error.message, "server return an error");
+        assert_eq!(error.operation, "client.ack_message");
+    }
 }
diff --git a/rust/src/conf.rs b/rust/src/conf.rs
index 334a1e8..8423d90 100644
--- a/rust/src/conf.rs
+++ b/rust/src/conf.rs
@@ -14,36 +14,40 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+use crate::model::common::ClientType;
+use std::time::Duration;
+
 #[derive(Debug, Clone)]
 pub struct ClientOption {
-    name_space: String,
-    access_url: String,
-    enable_tls: bool,
+    pub(crate) client_type: ClientType,
+    pub(crate) group: String,
+    pub(crate) namespace: String,
+    pub(crate) access_url: String,
+    pub(crate) enable_tls: bool,
+    pub(crate) timeout: Duration,
+    pub(crate) long_polling_timeout: Duration,
 }
 
 impl Default for ClientOption {
     fn default() -> Self {
         ClientOption {
-            name_space: "".to_string(),
+            client_type: ClientType::Producer,
+            group: "".to_string(),
+            namespace: "".to_string(),
             access_url: "localhost:8081".to_string(),
             enable_tls: false,
+            timeout: Duration::from_secs(3),
+            long_polling_timeout: Duration::from_secs(40),
         }
     }
 }
 
 impl ClientOption {
-    pub fn name_space(&self) -> &str {
-        &self.name_space
-    }
-    pub fn set_name_space(&mut self, name_space: String) {
-        self.name_space = name_space;
-    }
-
     pub fn access_url(&self) -> &str {
         &self.access_url
     }
-    pub fn set_access_url(&mut self, access_url: String) {
-        self.access_url = access_url;
+    pub fn set_access_url(&mut self, access_url: impl Into<String>) {
+        self.access_url = access_url.into();
     }
 
     pub fn enable_tls(&self) -> bool {
@@ -52,6 +56,20 @@
     pub fn set_enable_tls(&mut self, enable_tls: bool) {
         self.enable_tls = enable_tls;
     }
+
+    pub fn timeout(&self) -> &Duration {
+        &self.timeout
+    }
+    pub fn set_timeout(&mut self, timeout: Duration) {
+        self.timeout = timeout;
+    }
+
+    pub fn long_polling_timeout(&self) -> &Duration {
+        &self.long_polling_timeout
+    }
+    pub fn set_long_polling_timeout(&mut self, long_polling_timeout: Duration) {
+        self.long_polling_timeout = long_polling_timeout;
+    }
 }
 
 #[derive(Debug, Clone)]
@@ -66,6 +84,7 @@
     prefetch_route: bool,
     topics: Option<Vec<String>>,
     namespace: String,
+    validate_message_type: bool,
 }
 
 impl Default for ProducerOption {
@@ -75,6 +94,7 @@
             prefetch_route: true,
             topics: None,
             namespace: "".to_string(),
+            validate_message_type: true,
         }
     }
 }
@@ -97,14 +117,81 @@
     pub fn topics(&self) -> &Option<Vec<String>> {
         &self.topics
     }
-    pub fn set_topics(&mut self, topics: Vec<String>) {
-        self.topics = Some(topics);
+    pub fn set_topics(&mut self, topics: Vec<impl Into<String>>) {
+        self.topics = Some(topics.into_iter().map(|t| t.into()).collect());
     }
 
-    pub fn namespace(&self) -> &str {
+    // not expose to user for now
+    pub(crate) fn namespace(&self) -> &str {
         &self.namespace
     }
-    pub fn set_namespace(&mut self, name_space: String) {
-        self.namespace = name_space;
+    pub(crate) fn set_namespace(&mut self, name_space: impl Into<String>) {
+        self.namespace = name_space.into();
+    }
+
+    pub fn validate_message_type(&self) -> bool {
+        self.validate_message_type
+    }
+    pub fn set_validate_message_type(&mut self, validate_message_type: bool) {
+        self.validate_message_type = validate_message_type;
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct SimpleConsumerOption {
+    logging_format: LoggingFormat,
+    consumer_group: String,
+    prefetch_route: bool,
+    topics: Option<Vec<String>>,
+    namespace: String,
+}
+
+impl Default for SimpleConsumerOption {
+    fn default() -> Self {
+        SimpleConsumerOption {
+            logging_format: LoggingFormat::Terminal,
+            consumer_group: "".to_string(),
+            prefetch_route: true,
+            topics: None,
+            namespace: "".to_string(),
+        }
+    }
+}
+
+impl SimpleConsumerOption {
+    pub fn logging_format(&self) -> &LoggingFormat {
+        &self.logging_format
+    }
+    pub fn set_logging_format(&mut self, logging_format: LoggingFormat) {
+        self.logging_format = logging_format;
+    }
+
+    pub fn consumer_group(&self) -> &str {
+        &self.consumer_group
+    }
+    pub fn set_consumer_group(&mut self, consumer_group: impl Into<String>) {
+        self.consumer_group = consumer_group.into();
+    }
+
+    pub fn prefetch_route(&self) -> &bool {
+        &self.prefetch_route
+    }
+    pub fn set_prefetch_route(&mut self, prefetch_route: bool) {
+        self.prefetch_route = prefetch_route;
+    }
+
+    pub fn topics(&self) -> &Option<Vec<String>> {
+        &self.topics
+    }
+    pub fn set_topics(&mut self, topics: Vec<impl Into<String>>) {
+        self.topics = Some(topics.into_iter().map(|t| t.into()).collect());
+    }
+
+    // not expose to user for now
+    pub(crate) fn namespace(&self) -> &str {
+        &self.namespace
+    }
+    pub(crate) fn set_namespace(&mut self, name_space: impl Into<String>) {
+        self.namespace = name_space.into();
     }
 }
diff --git a/rust/src/error.rs b/rust/src/error.rs
index 6ae04e3..e7299a5 100644
--- a/rust/src/error.rs
+++ b/rust/src/error.rs
@@ -38,6 +38,9 @@
     #[error("Client internal error")]
     ClientInternal,
 
+    #[error("Client is not running")]
+    ClientIsNotRunning,
+
     #[error("Failed to send message via channel")]
     ChannelSend,
 
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index 4a0c165..c6d1cc2 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 #[allow(dead_code)]
-mod conf;
+pub mod conf;
 #[allow(dead_code)]
 mod error;
 #[allow(dead_code)]
@@ -29,11 +29,12 @@
 mod session;
 
 #[allow(dead_code)]
-mod model;
+pub mod model;
+mod util;
+
 mod producer;
+mod simple_consumer;
 
 // Export structs that are part of crate API.
-pub use conf::ClientOption;
-pub use conf::ProducerOption;
-pub use model::message::MessageImpl;
 pub use producer::Producer;
+pub use simple_consumer::SimpleConsumer;
diff --git a/rust/src/log.rs b/rust/src/log.rs
index ae7edcc..db2f473 100644
--- a/rust/src/log.rs
+++ b/rust/src/log.rs
@@ -19,7 +19,7 @@
 use slog::{o, Drain, Logger};
 use slog_async::OverflowStrategy;
 
-use crate::conf::{LoggingFormat, ProducerOption};
+use crate::conf::LoggingFormat;
 
 pub(crate) fn terminal_logger() -> Logger {
     let decorator = slog_term::TermDecorator::new().build();
@@ -47,8 +47,8 @@
     Logger::root(drain, o!())
 }
 
-pub(crate) fn logger(_option: &ProducerOption) -> Logger {
-    match _option.logging_format() {
+pub(crate) fn logger(logging_format: &LoggingFormat) -> Logger {
+    match logging_format {
         LoggingFormat::Terminal => terminal_logger(),
         LoggingFormat::Json => json_logger("logs/rocketmq_client.log"),
     }
diff --git a/rust/src/model/common.rs b/rust/src/model/common.rs
index 2082c28..9f1e947 100644
--- a/rust/src/model/common.rs
+++ b/rust/src/model/common.rs
@@ -24,8 +24,16 @@
 use crate::pb;
 use crate::pb::{Address, AddressScheme, MessageQueue};
 
+#[derive(Debug, Clone)]
+pub(crate) enum ClientType {
+    Producer = 1,
+    PushConsumer = 2,
+    SimpleConsumer = 3,
+    PullConsumer = 4,
+}
+
 #[derive(Debug)]
-pub struct Route {
+pub(crate) struct Route {
     pub(crate) index: AtomicUsize,
     pub queue: Vec<MessageQueue>,
 }
@@ -36,8 +44,8 @@
     Found(Arc<Route>),
 }
 
-#[derive(Debug)]
-pub(crate) struct Endpoints {
+#[derive(Debug, Clone)]
+pub struct Endpoints {
     endpoint_url: String,
     scheme: AddressScheme,
     inner: pb::Endpoints,
@@ -53,7 +61,7 @@
         if endpoint_url.is_empty() {
             return Err(ClientError::new(
                 ErrorKind::Config,
-                "Endpoint url is empty.",
+                "endpoint url is empty",
                 Self::OPERATION_PARSE,
             )
             .with_context("url", endpoint_url));
@@ -66,7 +74,7 @@
                 let port_i32 = port.parse::<i32>().map_err(|e| {
                     ClientError::new(
                         ErrorKind::Config,
-                        &format!("Port {} in endpoint url is invalid.", port),
+                        &format!("port {} in endpoint url is invalid", port),
                         Self::OPERATION_PARSE,
                     )
                     .with_context("url", endpoint_url)
@@ -76,7 +84,7 @@
             } else {
                 return Err(ClientError::new(
                     ErrorKind::Config,
-                    "Port in endpoint url is missing.",
+                    "port in endpoint url is missing",
                     Self::OPERATION_PARSE,
                 )
                 .with_context("url", endpoint_url));
@@ -93,7 +101,7 @@
                             if scheme == AddressScheme::IPv6 {
                                 return Err(ClientError::new(
                                     ErrorKind::Config,
-                                    "Multiple addresses not in the same schema.",
+                                    "multiple addresses not in the same schema",
                                     Self::OPERATION_PARSE,
                                 )
                                 .with_context("url", endpoint_url));
@@ -104,7 +112,7 @@
                             if scheme == AddressScheme::IPv4 {
                                 return Err(ClientError::new(
                                     ErrorKind::Config,
-                                    "Multiple addresses not in the same schema.",
+                                    "multiple addresses not in the same schema",
                                     Self::OPERATION_PARSE,
                                 )
                                 .with_context("url", endpoint_url));
@@ -118,7 +126,7 @@
                     if urls_len > 1 {
                         return Err(ClientError::new(
                             ErrorKind::Config,
-                            "Multiple addresses not allowed in domain schema.",
+                            "multiple addresses not allowed in domain schema",
                             Self::OPERATION_PARSE,
                         )
                         .with_context("url", endpoint_url));
@@ -169,6 +177,27 @@
     }
 }
 
+#[derive(Clone, Copy)]
+#[repr(i32)]
+pub enum FilterType {
+    Tag = 1,
+    Sql = 2,
+}
+
+pub struct FilterExpression {
+    pub(crate) filter_type: FilterType,
+    pub(crate) expression: String,
+}
+
+impl FilterExpression {
+    pub fn new(filter_type: FilterType, expression: impl Into<String>) -> Self {
+        FilterExpression {
+            filter_type,
+            expression: expression.into(),
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::error::ErrorKind;
@@ -214,31 +243,31 @@
         let err = Endpoints::from_url("").err().unwrap();
         assert_eq!(err.kind, ErrorKind::Config);
         assert_eq!(err.operation, "endpoint.parse");
-        assert_eq!(err.message, "Endpoint url is empty.");
+        assert_eq!(err.message, "endpoint url is empty");
 
         let err = Endpoints::from_url("localhost:<port>").err().unwrap();
         assert_eq!(err.kind, ErrorKind::Config);
         assert_eq!(err.operation, "endpoint.parse");
-        assert_eq!(err.message, "Port <port> in endpoint url is invalid.");
+        assert_eq!(err.message, "port <port> in endpoint url is invalid");
 
         let err = Endpoints::from_url("localhost").err().unwrap();
         assert_eq!(err.kind, ErrorKind::Config);
         assert_eq!(err.operation, "endpoint.parse");
-        assert_eq!(err.message, "Port in endpoint url is missing.");
+        assert_eq!(err.message, "port in endpoint url is missing");
 
         let err = Endpoints::from_url("127.0.0.1:8080,::1:8080")
             .err()
             .unwrap();
         assert_eq!(err.kind, ErrorKind::Config);
         assert_eq!(err.operation, "endpoint.parse");
-        assert_eq!(err.message, "Multiple addresses not in the same schema.");
+        assert_eq!(err.message, "multiple addresses not in the same schema");
 
         let err = Endpoints::from_url("::1:8080,127.0.0.1:8080")
             .err()
             .unwrap();
         assert_eq!(err.kind, ErrorKind::Config);
         assert_eq!(err.operation, "endpoint.parse");
-        assert_eq!(err.message, "Multiple addresses not in the same schema.");
+        assert_eq!(err.message, "multiple addresses not in the same schema");
 
         let err = Endpoints::from_url("localhost:8080,localhost:8081")
             .err()
@@ -247,7 +276,7 @@
         assert_eq!(err.operation, "endpoint.parse");
         assert_eq!(
             err.message,
-            "Multiple addresses not allowed in domain schema."
+            "multiple addresses not allowed in domain schema"
         );
     }
 
diff --git a/rust/src/model/message.rs b/rust/src/model/message.rs
index 17584d0..259a484 100644
--- a/rust/src/model/message.rs
+++ b/rust/src/model/message.rs
@@ -15,7 +15,9 @@
  * limitations under the License.
  */
 use crate::error::{ClientError, ErrorKind};
+use crate::model::common::Endpoints;
 use crate::model::message_id::UNIQ_ID_GENERATOR;
+use crate::pb;
 use std::collections::HashMap;
 
 pub trait Message {
@@ -133,8 +135,8 @@
 impl MessageBuilder {
     const OPERATION_BUILD_MESSAGE: &'static str = "build_message";
 
-    pub fn set_topic(mut self, topic: String) -> Self {
-        self.message.topic = topic;
+    pub fn set_topic(mut self, topic: impl Into<String>) -> Self {
+        self.message.topic = topic.into();
         self
     }
 
@@ -143,23 +145,31 @@
         self
     }
 
-    pub fn set_tags(mut self, tags: String) -> Self {
-        self.message.tags = Some(tags);
+    pub fn set_tags(mut self, tags: impl Into<String>) -> Self {
+        self.message.tags = Some(tags.into());
         self
     }
 
-    pub fn set_keys(mut self, keys: Vec<String>) -> Self {
-        self.message.keys = Some(keys);
+    pub fn set_keys(mut self, keys: Vec<impl Into<String>>) -> Self {
+        self.message.keys = Some(keys.into_iter().map(|k| k.into()).collect());
         self
     }
 
-    pub fn set_properties(mut self, properties: HashMap<String, String>) -> Self {
-        self.message.properties = Some(properties);
+    pub fn set_properties(
+        mut self,
+        properties: HashMap<impl Into<String>, impl Into<String>>,
+    ) -> Self {
+        self.message.properties = Some(
+            properties
+                .into_iter()
+                .map(|(k, v)| (k.into(), v.into()))
+                .collect(),
+        );
         self
     }
 
-    pub fn set_message_group(mut self, message_group: String) -> Self {
-        self.message.message_group = Some(message_group);
+    pub fn set_message_group(mut self, message_group: impl Into<String>) -> Self {
+        self.message.message_group = Some(message_group.into());
         self
     }
 
@@ -191,6 +201,113 @@
     }
 }
 
+pub trait AckMessageEntry {
+    fn topic(&self) -> String;
+    fn message_id(&self) -> String;
+    fn receipt_handle(&self) -> String;
+    fn endpoints(&self) -> &Endpoints;
+}
+
+#[derive(Debug)]
+pub struct MessageView {
+    pub(crate) message_id: String,
+    pub(crate) receipt_handle: Option<String>,
+    pub(crate) topic: String,
+    pub(crate) body: Vec<u8>,
+    pub(crate) tag: Option<String>,
+    pub(crate) keys: Vec<String>,
+    pub(crate) properties: HashMap<String, String>,
+    pub(crate) message_group: Option<String>,
+    pub(crate) delivery_timestamp: Option<i64>,
+    pub(crate) born_host: String,
+    pub(crate) born_timestamp: i64,
+    pub(crate) delivery_attempt: i32,
+    pub(crate) endpoints: Endpoints,
+}
+
+impl AckMessageEntry for MessageView {
+    fn topic(&self) -> String {
+        self.topic.clone()
+    }
+
+    fn message_id(&self) -> String {
+        self.message_id.clone()
+    }
+
+    fn receipt_handle(&self) -> String {
+        self.receipt_handle.clone().unwrap()
+    }
+
+    fn endpoints(&self) -> &Endpoints {
+        &self.endpoints
+    }
+}
+
+impl MessageView {
+    pub(crate) fn from_pb_message(message: pb::Message, endpoints: Endpoints) -> Self {
+        let system_properties = message.system_properties.unwrap();
+        MessageView {
+            message_id: system_properties.message_id,
+            receipt_handle: system_properties.receipt_handle,
+            topic: message.topic.unwrap().name,
+            body: message.body,
+            tag: system_properties.tag,
+            keys: system_properties.keys,
+            properties: message.user_properties,
+            message_group: system_properties.message_group,
+            delivery_timestamp: system_properties.delivery_timestamp.map(|t| t.seconds),
+            born_host: system_properties.born_host,
+            born_timestamp: system_properties.born_timestamp.map_or(0, |t| t.seconds),
+            delivery_attempt: system_properties.delivery_attempt.unwrap_or(0),
+            endpoints,
+        }
+    }
+
+    pub fn message_id(&self) -> &str {
+        &self.message_id
+    }
+
+    pub fn topic(&self) -> &str {
+        &self.topic
+    }
+
+    pub fn body(&self) -> &[u8] {
+        &self.body
+    }
+
+    pub fn tag(&self) -> Option<&str> {
+        self.tag.as_deref()
+    }
+
+    pub fn keys(&self) -> &[String] {
+        &self.keys
+    }
+
+    pub fn properties(&self) -> &HashMap<String, String> {
+        &self.properties
+    }
+
+    pub fn message_group(&self) -> Option<&str> {
+        self.message_group.as_deref()
+    }
+
+    pub fn delivery_timestamp(&self) -> Option<i64> {
+        self.delivery_timestamp
+    }
+
+    pub fn born_host(&self) -> &str {
+        &self.born_host
+    }
+
+    pub fn born_timestamp(&self) -> i64 {
+        self.born_timestamp
+    }
+
+    pub fn delivery_attempt(&self) -> i32 {
+        self.delivery_attempt
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/rust/src/model/mod.rs b/rust/src/model/mod.rs
index 1a873d6..aef3261 100644
--- a/rust/src/model/mod.rs
+++ b/rust/src/model/mod.rs
@@ -14,6 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-pub(crate) mod common;
-pub(crate) mod message;
+pub mod common;
+pub mod message;
 pub(crate) mod message_id;
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index e288a5c..dda1e69 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -20,21 +20,21 @@
 //! `Producer` is a thin wrapper of internal `Client` struct that shoulders the actual workloads.
 //! Most of its methods take shared reference so that application developers may use it at will.
 
-use std::hash::Hasher;
-use std::sync::atomic::Ordering;
-use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 
 use prost_types::Timestamp;
-use siphasher::sip::SipHasher24;
 use slog::{info, Logger};
 
 use crate::client::Client;
 use crate::conf::{ClientOption, ProducerOption};
 use crate::error::{ClientError, ErrorKind};
-use crate::model::common::{Endpoints, Route};
+use crate::model::common::ClientType;
 use crate::model::message;
-use crate::pb::{Encoding, MessageQueue, Resource, SendResultEntry, SystemProperties};
+use crate::pb::{Encoding, Resource, SendResultEntry, SystemProperties};
+use crate::util::{
+    build_endpoints_by_message_queue, build_producer_settings, select_message_queue,
+    select_message_queue_by_message_group, HOST_NAME,
+};
 use crate::{log, pb};
 
 /// `Producer` is the core struct, to which application developers should turn, when publishing messages to brokers.
@@ -47,22 +47,18 @@
     client: Client,
 }
 
-lazy_static::lazy_static! {
-    static ref HOST_NAME: String = match hostname::get() {
-        Ok(name) => name.to_str().unwrap_or("localhost").to_string(),
-        Err(_) => "localhost".to_string(),
-    };
-}
-
 impl Producer {
     const OPERATION_SEND_MESSAGE: &'static str = "producer.send_message";
 
-    pub async fn new(
-        option: ProducerOption,
-        client_option: ClientOption,
-    ) -> Result<Self, ClientError> {
-        let logger = log::logger(&option);
-        let client = Client::new(&logger, client_option)?;
+    pub fn new(option: ProducerOption, client_option: ClientOption) -> Result<Self, ClientError> {
+        let client_option = ClientOption {
+            client_type: ClientType::Producer,
+            namespace: option.namespace().to_string(),
+            ..client_option
+        };
+        let logger = log::logger(option.logging_format());
+        let settings = build_producer_settings(&option, &client_option);
+        let client = Client::new(&logger, client_option, settings)?;
         Ok(Producer {
             option,
             logger,
@@ -76,6 +72,7 @@
                 self.client.topic_route(topic, true).await?;
             }
         }
+        self.client.start();
         info!(
             self.logger,
             "start producer success, client_id: {}",
@@ -91,7 +88,7 @@
         if messages.is_empty() {
             return Err(ClientError::new(
                 ErrorKind::InvalidMessage,
-                "No message found.",
+                "no message found",
                 Self::OPERATION_SEND_MESSAGE,
             ));
         }
@@ -126,7 +123,7 @@
                 if last_message_group.ne(&message_group) {
                     return Err(ClientError::new(
                         ErrorKind::InvalidMessage,
-                        "Not all messages have the same message group.",
+                        "not all messages have the same message group",
                         Self::OPERATION_SEND_MESSAGE,
                     ));
                 }
@@ -165,7 +162,7 @@
         if topic.is_empty() {
             return Err(ClientError::new(
                 ErrorKind::InvalidMessage,
-                "Message topic is empty.",
+                "message topic is empty",
                 Self::OPERATION_SEND_MESSAGE,
             ));
         }
@@ -191,54 +188,18 @@
         let route = self.client.topic_route(&topic, true).await?;
 
         let message_queue = if let Some(message_group) = message_group {
-            self.select_message_queue_by_message_group(route, message_group)
+            select_message_queue_by_message_group(route, message_group)
         } else {
-            self.select_message_queue(route)
+            select_message_queue(route)
         };
 
-        if message_queue.broker.is_none() {
-            return Err(ClientError::new(
-                ErrorKind::NoBrokerAvailable,
-                "Message queue do not have a available endpoint.",
-                Self::OPERATION_SEND_MESSAGE,
-            )
-            .with_context("message_queue", format!("{:?}", message_queue)));
-        }
-
-        let broker = message_queue.broker.unwrap();
-        if broker.endpoints.is_none() {
-            return Err(ClientError::new(
-                ErrorKind::NoBrokerAvailable,
-                "Message queue do not have a available endpoint.",
-                Self::OPERATION_SEND_MESSAGE,
-            )
-            .with_context("broker", broker.name)
-            .with_context("topic", topic)
-            .with_context("queue_id", message_queue.id.to_string()));
-        }
-
-        let endpoints = Endpoints::from_pb_endpoints(broker.endpoints.unwrap());
+        let endpoints =
+            build_endpoints_by_message_queue(&message_queue, Self::OPERATION_SEND_MESSAGE)?;
         for message in pb_messages.iter_mut() {
             message.system_properties.as_mut().unwrap().queue_id = message_queue.id;
         }
 
-        self.client.send_message(pb_messages, &endpoints).await
-    }
-
-    fn select_message_queue(&self, route: Arc<Route>) -> MessageQueue {
-        let i = route.index.fetch_add(1, Ordering::Relaxed);
-        route.queue[i % route.queue.len()].clone()
-    }
-
-    fn select_message_queue_by_message_group(
-        &self,
-        route: Arc<Route>,
-        message_group: String,
-    ) -> MessageQueue {
-        let mut sip_hasher24 = SipHasher24::default();
-        sip_hasher24.write(message_group.as_bytes());
-        let index = sip_hasher24.finish() % route.queue.len() as u64;
-        route.queue[index as usize].clone()
+        self.client.send_message(&endpoints, pb_messages).await
     }
 }
 
@@ -261,19 +222,13 @@
 
     #[tokio::test]
     async fn producer_transform_messages_to_protobuf() {
-        let producer = Producer::new(ProducerOption::default(), ClientOption::default())
-            .await
-            .unwrap();
+        let producer = Producer::new(ProducerOption::default(), ClientOption::default()).unwrap();
         let messages = vec![MessageImpl::builder()
-            .set_topic("DefaultCluster".to_string())
+            .set_topic("DefaultCluster")
             .set_body("hello world".as_bytes().to_vec())
-            .set_tags("tag".to_string())
-            .set_keys(vec!["key".to_string()])
-            .set_properties(
-                vec![("key".to_string(), "value".to_string())]
-                    .into_iter()
-                    .collect(),
-            )
+            .set_tags("tag")
+            .set_keys(vec!["key"])
+            .set_properties(vec![("key", "value")].into_iter().collect())
             .set_message_group("message_group".to_string())
             .build()
             .unwrap()];
@@ -298,16 +253,14 @@
 
     #[tokio::test]
     async fn producer_transform_messages_to_protobuf_failed() {
-        let producer = Producer::new(ProducerOption::default(), ClientOption::default())
-            .await
-            .unwrap();
+        let producer = Producer::new(ProducerOption::default(), ClientOption::default()).unwrap();
 
         let messages: Vec<MessageImpl> = vec![];
         let result = producer.transform_messages_to_protobuf(messages);
         assert!(result.is_err());
         let err = result.unwrap_err();
         assert_eq!(err.kind, ErrorKind::InvalidMessage);
-        assert_eq!(err.message, "No message found.");
+        assert_eq!(err.message, "no message found");
 
         let messages = vec![MessageImpl {
             message_id: "".to_string(),
@@ -323,16 +276,16 @@
         assert!(result.is_err());
         let err = result.unwrap_err();
         assert_eq!(err.kind, ErrorKind::InvalidMessage);
-        assert_eq!(err.message, "Message topic is empty.");
+        assert_eq!(err.message, "message topic is empty");
 
         let messages = vec![
             MessageImpl::builder()
-                .set_topic("DefaultCluster".to_string())
+                .set_topic("DefaultCluster")
                 .set_body("hello world".as_bytes().to_vec())
                 .build()
                 .unwrap(),
             MessageImpl::builder()
-                .set_topic("DefaultCluster_dup".to_string())
+                .set_topic("DefaultCluster_dup")
                 .set_body("hello world".as_bytes().to_vec())
                 .build()
                 .unwrap(),
@@ -345,15 +298,15 @@
 
         let messages = vec![
             MessageImpl::builder()
-                .set_topic("DefaultCluster".to_string())
+                .set_topic("DefaultCluster")
                 .set_body("hello world".as_bytes().to_vec())
-                .set_message_group("message_group".to_string())
+                .set_message_group("message_group")
                 .build()
                 .unwrap(),
             MessageImpl::builder()
-                .set_topic("DefaultCluster".to_string())
+                .set_topic("DefaultCluster")
                 .set_body("hello world".as_bytes().to_vec())
-                .set_message_group("message_group_dup".to_string())
+                .set_message_group("message_group_dup")
                 .build()
                 .unwrap(),
         ];
@@ -361,6 +314,6 @@
         assert!(result.is_err());
         let err = result.unwrap_err();
         assert_eq!(err.kind, ErrorKind::InvalidMessage);
-        assert_eq!(err.message, "Not all messages have the same message group.");
+        assert_eq!(err.message, "not all messages have the same message group");
     }
 }
diff --git a/rust/src/session.rs b/rust/src/session.rs
index b84469e..fcb212a 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -18,31 +18,56 @@
 
 use async_trait::async_trait;
 use mockall::automock;
-use slog::{info, o, Logger};
-use tokio::sync::Mutex;
+use slog::{debug, error, info, o, Logger};
+use tokio::sync::{mpsc, Mutex};
+use tokio_stream::wrappers::ReceiverStream;
+use tokio_stream::StreamExt;
 use tonic::metadata::AsciiMetadataValue;
 use tonic::transport::{Channel, Endpoint};
 
 use crate::conf::ClientOption;
 use crate::error::ErrorKind;
 use crate::model::common::Endpoints;
-use crate::pb::{QueryRouteRequest, QueryRouteResponse, SendMessageRequest, SendMessageResponse};
+use crate::pb::{
+    AckMessageRequest, AckMessageResponse, HeartbeatRequest, HeartbeatResponse, QueryRouteRequest,
+    QueryRouteResponse, ReceiveMessageRequest, ReceiveMessageResponse, SendMessageRequest,
+    SendMessageResponse, TelemetryCommand,
+};
+use crate::util::{PROTOCOL_VERSION, SDK_LANGUAGE, SDK_VERSION};
 use crate::{error::ClientError, pb::messaging_service_client::MessagingServiceClient};
 
 #[async_trait]
 #[automock]
 pub(crate) trait RPCClient {
+    const OPERATION_START: &'static str = "session.start";
+    const OPERATION_UPDATE_SETTINGS: &'static str = "session.update_settings";
+
     const OPERATION_QUERY_ROUTE: &'static str = "rpc.query_route";
+    const OPERATION_HEARTBEAT: &'static str = "rpc.heartbeat";
     const OPERATION_SEND_MESSAGE: &'static str = "rpc.send_message";
+    const OPERATION_RECEIVE_MESSAGE: &'static str = "rpc.receive_message";
+    const OPERATION_ACK_MESSAGE: &'static str = "rpc.ack_message";
 
     async fn query_route(
         &mut self,
         request: QueryRouteRequest,
     ) -> Result<QueryRouteResponse, ClientError>;
+    async fn heartbeat(
+        &mut self,
+        request: HeartbeatRequest,
+    ) -> Result<HeartbeatResponse, ClientError>;
     async fn send_message(
         &mut self,
         request: SendMessageRequest,
     ) -> Result<SendMessageResponse, ClientError>;
+    async fn receive_message(
+        &mut self,
+        request: ReceiveMessageRequest,
+    ) -> Result<Vec<ReceiveMessageResponse>, ClientError>;
+    async fn ack_message(
+        &mut self,
+        request: AckMessageRequest,
+    ) -> Result<AckMessageResponse, ClientError>;
 }
 
 #[allow(dead_code)]
@@ -50,7 +75,10 @@
 pub(crate) struct Session {
     logger: Logger,
     client_id: String,
+    option: ClientOption,
+    endpoints: Endpoints,
     stub: MessagingServiceClient<Channel>,
+    telemetry_tx: Box<Option<mpsc::Sender<TelemetryCommand>>>,
 }
 
 impl Session {
@@ -75,7 +103,7 @@
         if channel_endpoints.is_empty() {
             return Err(ClientError::new(
                 ErrorKind::Connect,
-                "No endpoint available.",
+                "no endpoint available",
                 Self::OPERATION_CREATE,
             )
             .with_context("peer", peer.clone()));
@@ -85,7 +113,7 @@
             channel_endpoints[0].connect().await.map_err(|e| {
                 ClientError::new(
                     ErrorKind::Connect,
-                    "Failed to connect to peer.",
+                    "failed to connect to peer",
                     Self::OPERATION_CREATE,
                 )
                 .set_source(e)
@@ -97,16 +125,16 @@
 
         let stub = MessagingServiceClient::new(channel);
 
-        info!(
-            logger,
-            "create session success, peer={}",
-            endpoints.endpoint_url()
-        );
+        let logger = logger.new(o!("peer" => peer.clone()));
+        info!(logger, "create session success");
 
         Ok(Session {
-            logger: logger.new(o!("component" => "session", "peer" => peer.clone())),
+            logger,
+            option: option.clone(),
+            endpoints: endpoints.clone(),
             client_id,
             stub,
+            telemetry_tx: Box::new(None),
         })
     }
 
@@ -124,7 +152,7 @@
             .map_err(|e| {
                 ClientError::new(
                     ErrorKind::Connect,
-                    "Failed to create channel endpoint.",
+                    "failed to create channel endpoint",
                     Self::OPERATION_CREATE,
                 )
                 .set_source(e)
@@ -141,24 +169,111 @@
             //     .set_source(e)
             //     .with_context("peer", &peer_addr)
             // })?
-            .connect_timeout(std::time::Duration::from_secs(3))
+            .connect_timeout(option.timeout)
             .tcp_nodelay(true);
         Ok(endpoint)
     }
 
-    fn sign(&self, metadata: &mut tonic::metadata::MetadataMap) {
+    pub(crate) fn peer(&self) -> &str {
+        self.endpoints.endpoint_url()
+    }
+
+    fn sign<T>(&self, mut request: tonic::Request<T>) -> tonic::Request<T> {
+        let metadata = request.metadata_mut();
         let _ = AsciiMetadataValue::try_from(&self.client_id)
             .map(|v| metadata.insert("x-mq-client-id", v));
 
-        metadata.insert("x-mq-language", AsciiMetadataValue::from_static("RUST"));
+        metadata.insert(
+            "x-mq-language",
+            AsciiMetadataValue::from_static(SDK_LANGUAGE.as_str_name()),
+        );
         metadata.insert(
             "x-mq-client-version",
-            AsciiMetadataValue::from_static("5.0.0"),
+            AsciiMetadataValue::from_static(SDK_VERSION),
         );
         metadata.insert(
             "x-mq-protocol-version",
-            AsciiMetadataValue::from_static("2.0.0"),
+            AsciiMetadataValue::from_static(PROTOCOL_VERSION),
         );
+
+        request.set_timeout(*self.option.timeout());
+        request
+    }
+
+    pub(crate) async fn start(&mut self, settings: TelemetryCommand) -> Result<(), ClientError> {
+        let (tx, rx) = mpsc::channel(16);
+        tx.send(settings).await.map_err(|e| {
+            ClientError::new(
+                ErrorKind::ChannelSend,
+                "failed to send telemetry command",
+                Self::OPERATION_START,
+            )
+            .set_source(e)
+        })?;
+
+        let request = self.sign(tonic::Request::new(ReceiverStream::new(rx)));
+        let response = self.stub.telemetry(request).await.map_err(|e| {
+            ClientError::new(
+                ErrorKind::ClientInternal,
+                "send rpc telemetry failed",
+                Self::OPERATION_START,
+            )
+            .set_source(e)
+        })?;
+
+        let logger = self.logger.clone();
+        tokio::spawn(async move {
+            let mut stream = response.into_inner();
+            loop {
+                // TODO handle server stream
+                match stream.message().await {
+                    Ok(Some(item)) => {
+                        debug!(logger, "telemetry command: {:?}", item);
+                    }
+                    Ok(None) => {
+                        debug!(logger, "request stream closed");
+                        break;
+                    }
+                    Err(e) => {
+                        error!(logger, "telemetry response error: {:?}", e);
+                    }
+                }
+            }
+        });
+        let _ = self.telemetry_tx.insert(tx);
+        debug!(self.logger, "start session success");
+        Ok(())
+    }
+
+    #[allow(dead_code)]
+    pub(crate) fn is_started(&self) -> bool {
+        self.telemetry_tx.is_some()
+    }
+
+    #[allow(dead_code)]
+    pub(crate) async fn update_settings(
+        &mut self,
+        settings: TelemetryCommand,
+    ) -> Result<(), ClientError> {
+        if !self.is_started() {
+            return Err(ClientError::new(
+                ErrorKind::ClientIsNotRunning,
+                "session is not started",
+                Self::OPERATION_UPDATE_SETTINGS,
+            ));
+        }
+
+        if let Some(tx) = self.telemetry_tx.as_ref() {
+            tx.send(settings).await.map_err(|e| {
+                ClientError::new(
+                    ErrorKind::ChannelSend,
+                    "failed to send telemetry command",
+                    Self::OPERATION_UPDATE_SETTINGS,
+                )
+                .set_source(e)
+            })?;
+        }
+        Ok(())
     }
 }
 
@@ -168,12 +283,11 @@
         &mut self,
         request: QueryRouteRequest,
     ) -> Result<QueryRouteResponse, ClientError> {
-        let mut request = tonic::Request::new(request);
-        self.sign(request.metadata_mut());
+        let request = self.sign(tonic::Request::new(request));
         let response = self.stub.query_route(request).await.map_err(|e| {
             ClientError::new(
                 ErrorKind::ClientInternal,
-                "Query topic route rpc failed.",
+                "send rpc query_route failed",
                 Self::OPERATION_QUERY_ROUTE,
             )
             .set_source(e)
@@ -181,22 +295,89 @@
         Ok(response.into_inner())
     }
 
+    async fn heartbeat(
+        &mut self,
+        request: HeartbeatRequest,
+    ) -> Result<HeartbeatResponse, ClientError> {
+        let request = self.sign(tonic::Request::new(request));
+        let response = self.stub.heartbeat(request).await.map_err(|e| {
+            ClientError::new(
+                ErrorKind::ClientInternal,
+                "send rpc heartbeat failed",
+                Self::OPERATION_HEARTBEAT,
+            )
+            .set_source(e)
+        })?;
+        Ok(response.into_inner())
+    }
+
     async fn send_message(
         &mut self,
         request: SendMessageRequest,
     ) -> Result<SendMessageResponse, ClientError> {
-        let mut request = tonic::Request::new(request);
-        self.sign(request.metadata_mut());
+        let request = self.sign(tonic::Request::new(request));
         let response = self.stub.send_message(request).await.map_err(|e| {
             ClientError::new(
                 ErrorKind::ClientInternal,
-                "Send message rpc failed.",
+                "send rpc send_message failed",
                 Self::OPERATION_SEND_MESSAGE,
             )
             .set_source(e)
         })?;
         Ok(response.into_inner())
     }
+
+    async fn receive_message(
+        &mut self,
+        request: ReceiveMessageRequest,
+    ) -> Result<Vec<ReceiveMessageResponse>, ClientError> {
+        let batch_size = request.batch_size;
+        let mut request = self.sign(tonic::Request::new(request));
+        request.set_timeout(*self.option.long_polling_timeout());
+        let mut stream = self
+            .stub
+            .receive_message(request)
+            .await
+            .map_err(|e| {
+                ClientError::new(
+                    ErrorKind::ClientInternal,
+                    "send rpc receive_message failed",
+                    Self::OPERATION_RECEIVE_MESSAGE,
+                )
+                .set_source(e)
+            })?
+            .into_inner();
+
+        let mut responses = Vec::with_capacity(batch_size as usize);
+        while let Some(item) = stream.next().await {
+            let response = item.map_err(|e| {
+                ClientError::new(
+                    ErrorKind::ClientInternal,
+                    "receive message failed: error in reading stream",
+                    Self::OPERATION_RECEIVE_MESSAGE,
+                )
+                .set_source(e)
+            })?;
+            responses.push(response);
+        }
+        Ok(responses)
+    }
+
+    async fn ack_message(
+        &mut self,
+        request: AckMessageRequest,
+    ) -> Result<AckMessageResponse, ClientError> {
+        let request = self.sign(tonic::Request::new(request));
+        let response = self.stub.ack_message(request).await.map_err(|e| {
+            ClientError::new(
+                ErrorKind::ClientInternal,
+                "send rpc ack_message failed",
+                Self::OPERATION_ACK_MESSAGE,
+            )
+            .set_source(e)
+        })?;
+        Ok(response.into_inner())
+    }
 }
 
 #[derive(Debug)]
@@ -209,7 +390,7 @@
 
 impl SessionManager {
     pub(crate) fn new(logger: &Logger, client_id: String, option: &ClientOption) -> Self {
-        let logger = logger.new(o!("component" => "session_manager"));
+        let logger = logger.new(o!("component" => "session"));
         let session_map = Mutex::new(HashMap::new());
         SessionManager {
             logger,
@@ -219,31 +400,48 @@
         }
     }
 
-    pub(crate) async fn get_session(&self, endpoints: &Endpoints) -> Result<Session, ClientError> {
+    pub(crate) async fn get_or_create_session(
+        &self,
+        endpoints: &Endpoints,
+        settings: TelemetryCommand,
+    ) -> Result<Session, ClientError> {
         let mut session_map = self.session_map.lock().await;
         let endpoint_url = endpoints.endpoint_url().to_string();
         return if session_map.contains_key(&endpoint_url) {
             Ok(session_map.get(&endpoint_url).unwrap().clone())
         } else {
-            let session = Session::new(
+            let mut session = Session::new(
                 &self.logger,
                 endpoints,
                 self.client_id.clone(),
                 &self.option,
             )
             .await?;
+            session.start(settings).await?;
             session_map.insert(endpoint_url.clone(), session.clone());
             Ok(session)
         };
     }
+
+    pub(crate) async fn get_all_sessions(&self) -> Result<Vec<Session>, ClientError> {
+        let session_map = self.session_map.lock().await;
+        let mut sessions = Vec::new();
+        for (_, session) in session_map.iter() {
+            sessions.push(session.clone());
+        }
+        Ok(sessions)
+    }
 }
 
 #[cfg(test)]
 mod tests {
-    use crate::log::terminal_logger;
+    use crate::conf::ProducerOption;
     use slog::debug;
     use wiremock_grpc::generate;
 
+    use crate::log::terminal_logger;
+    use crate::util::build_producer_settings;
+
     use super::*;
 
     generate!("apache.rocketmq.v2", RocketMQMockServer);
@@ -260,6 +458,7 @@
         )
         .await;
         debug!(logger, "session: {:?}", session);
+        assert!(session.is_ok());
     }
 
     #[tokio::test]
@@ -273,17 +472,61 @@
         )
         .await;
         debug!(logger, "session: {:?}", session);
+        assert!(session.is_ok());
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn session_start() {
+        let mut server = RocketMQMockServer::start_default().await;
+        server.setup(
+            MockBuilder::when()
+                //    👇 RPC prefix
+                .path("/apache.rocketmq.v2.MessagingService/Telemetry")
+                .then()
+                .return_status(Code::Ok),
+        );
+
+        let logger = terminal_logger();
+        let session = Session::new(
+            &logger,
+            &Endpoints::from_url(&format!("localhost:{}", server.address().port())).unwrap(),
+            "test_client".to_string(),
+            &ClientOption::default(),
+        )
+        .await;
+        debug!(logger, "session: {:?}", session);
+        assert!(session.is_ok());
+
+        let mut session = session.unwrap();
+
+        let result = session
+            .start(build_producer_settings(
+                &ProducerOption::default(),
+                &ClientOption::default(),
+            ))
+            .await;
+        assert!(result.is_ok());
+        assert!(session.is_started());
     }
 
     #[tokio::test]
     async fn session_manager_new() {
-        let server = RocketMQMockServer::start_default().await;
+        let mut server = RocketMQMockServer::start_default().await;
+        server.setup(
+            MockBuilder::when()
+                //    👇 RPC prefix
+                .path("/apache.rocketmq.v2.MessagingService/Telemetry")
+                .then()
+                .return_status(Code::Ok),
+        );
+
         let logger = terminal_logger();
         let session_manager =
             SessionManager::new(&logger, "test_client".to_string(), &ClientOption::default());
         let session = session_manager
-            .get_session(
+            .get_or_create_session(
                 &Endpoints::from_url(&format!("localhost:{}", server.address().port())).unwrap(),
+                build_producer_settings(&ProducerOption::default(), &ClientOption::default()),
             )
             .await
             .unwrap();
diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs
new file mode 100644
index 0000000..e44f7cd
--- /dev/null
+++ b/rust/src/simple_consumer.rs
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::time::Duration;
+
+use slog::{info, Logger};
+
+use crate::client::Client;
+use crate::conf::{ClientOption, SimpleConsumerOption};
+use crate::error::{ClientError, ErrorKind};
+use crate::model::common::{ClientType, FilterExpression};
+use crate::model::message::{AckMessageEntry, MessageView};
+use crate::util::{
+    build_endpoints_by_message_queue, build_simple_consumer_settings, select_message_queue,
+};
+use crate::{log, pb};
+
+#[derive(Debug)]
+pub struct SimpleConsumer {
+    option: SimpleConsumerOption,
+    logger: Logger,
+    client: Client,
+}
+
+impl SimpleConsumer {
+    const OPERATION_START_SIMPLE_CONSUMER: &'static str = "simple_consumer.start";
+    const OPERATION_RECEIVE_MESSAGE: &'static str = "simple_consumer.receive_message";
+
+    pub fn new(
+        option: SimpleConsumerOption,
+        client_option: ClientOption,
+    ) -> Result<Self, ClientError> {
+        let client_option = ClientOption {
+            client_type: ClientType::SimpleConsumer,
+            group: option.consumer_group().to_string(),
+            namespace: option.namespace().to_string(),
+            ..client_option
+        };
+        let logger = log::logger(option.logging_format());
+        let settings = build_simple_consumer_settings(&option, &client_option);
+        let client = Client::new(&logger, client_option, settings)?;
+        Ok(SimpleConsumer {
+            option,
+            logger,
+            client,
+        })
+    }
+
+    pub async fn start(&self) -> Result<(), ClientError> {
+        if self.option.consumer_group().is_empty() {
+            return Err(ClientError::new(
+                ErrorKind::Config,
+                "required option is missing: consumer group is empty",
+                Self::OPERATION_START_SIMPLE_CONSUMER,
+            ));
+        }
+        if let Some(topics) = self.option.topics() {
+            for topic in topics {
+                self.client.topic_route(topic, true).await?;
+            }
+        }
+        self.client.start();
+        info!(
+            self.logger,
+            "start simple consumer success, client_id: {}",
+            self.client.client_id()
+        );
+        Ok(())
+    }
+
+    pub async fn receive(
+        &self,
+        topic: impl AsRef<str>,
+        expression: &FilterExpression,
+    ) -> Result<Vec<MessageView>, ClientError> {
+        self.receive_with_batch_size(topic.as_ref(), expression, 32, Duration::from_secs(15))
+            .await
+    }
+
+    pub async fn receive_with_batch_size(
+        &self,
+        topic: &str,
+        expression: &FilterExpression,
+        batch_size: i32,
+        invisible_duration: Duration,
+    ) -> Result<Vec<MessageView>, ClientError> {
+        let route = self.client.topic_route(topic, true).await?;
+        let message_queue = select_message_queue(route);
+        let endpoints =
+            build_endpoints_by_message_queue(&message_queue, Self::OPERATION_RECEIVE_MESSAGE)?;
+        let messages = self
+            .client
+            .receive_message(
+                &endpoints,
+                message_queue,
+                pb::FilterExpression {
+                    r#type: expression.filter_type as i32,
+                    expression: expression.expression.clone(),
+                },
+                batch_size,
+                prost_types::Duration::try_from(invisible_duration).unwrap(),
+            )
+            .await?;
+        Ok(messages
+            .into_iter()
+            .map(|message| MessageView::from_pb_message(message, endpoints.clone()))
+            .collect())
+    }
+
+    pub async fn ack(&self, ack_entry: impl AckMessageEntry) -> Result<(), ClientError> {
+        self.client.ack_message(ack_entry).await?;
+        Ok(())
+    }
+}
diff --git a/rust/src/util.rs b/rust/src/util.rs
new file mode 100644
index 0000000..94e4984
--- /dev/null
+++ b/rust/src/util.rs
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::hash::Hasher;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+
+use crate::conf::{ClientOption, ProducerOption, SimpleConsumerOption};
+use siphasher::sip::SipHasher24;
+
+use crate::error::{ClientError, ErrorKind};
+use crate::model::common::{Endpoints, Route};
+use crate::pb::settings::PubSub;
+use crate::pb::telemetry_command::Command;
+use crate::pb::{
+    Language, MessageQueue, Publishing, Resource, Settings, Subscription, TelemetryCommand, Ua,
+};
+
+pub(crate) static SDK_LANGUAGE: Language = Language::Rust;
+pub(crate) static SDK_VERSION: &str = "5.0.0";
+pub(crate) static PROTOCOL_VERSION: &str = "2.0.0";
+
+lazy_static::lazy_static! {
+    pub(crate) static ref HOST_NAME: String = match hostname::get() {
+        Ok(name) => name.to_str().unwrap_or("localhost").to_string(),
+        Err(_) => "localhost".to_string(),
+    };
+}
+
+pub(crate) fn select_message_queue(route: Arc<Route>) -> MessageQueue {
+    let i = route.index.fetch_add(1, Ordering::Relaxed);
+    route.queue[i % route.queue.len()].clone()
+}
+
+pub(crate) fn select_message_queue_by_message_group(
+    route: Arc<Route>,
+    message_group: String,
+) -> MessageQueue {
+    let mut sip_hasher24 = SipHasher24::default();
+    sip_hasher24.write(message_group.as_bytes());
+    let index = sip_hasher24.finish() % route.queue.len() as u64;
+    route.queue[index as usize].clone()
+}
+
+pub(crate) fn build_endpoints_by_message_queue(
+    message_queue: &MessageQueue,
+    operation: &'static str,
+) -> Result<Endpoints, ClientError> {
+    let topic = message_queue.topic.clone().unwrap().name;
+    if message_queue.broker.is_none() {
+        return Err(ClientError::new(
+            ErrorKind::NoBrokerAvailable,
+            "message queue do not have a available endpoint",
+            operation,
+        )
+        .with_context("message_queue", format!("{:?}", message_queue)));
+    }
+
+    let broker = message_queue.broker.clone().unwrap();
+    if broker.endpoints.is_none() {
+        return Err(ClientError::new(
+            ErrorKind::NoBrokerAvailable,
+            "message queue do not have a available endpoint",
+            operation,
+        )
+        .with_context("broker", broker.name)
+        .with_context("topic", topic)
+        .with_context("queue_id", message_queue.id.to_string()));
+    }
+
+    Ok(Endpoints::from_pb_endpoints(broker.endpoints.unwrap()))
+}
+
+pub(crate) fn build_producer_settings(
+    option: &ProducerOption,
+    client_options: &ClientOption,
+) -> TelemetryCommand {
+    let topics = option
+        .topics()
+        .clone()
+        .unwrap_or(vec![])
+        .iter()
+        .map(|topic| Resource {
+            name: topic.to_string(),
+            resource_namespace: option.namespace().to_string(),
+        })
+        .collect();
+    let platform = os_type::current_platform();
+    TelemetryCommand {
+        command: Some(Command::Settings(Settings {
+            client_type: Some(client_options.client_type.clone() as i32),
+            request_timeout: Some(prost_types::Duration {
+                seconds: client_options.timeout().as_secs() as i64,
+                nanos: client_options.timeout().subsec_nanos() as i32,
+            }),
+            pub_sub: Some(PubSub::Publishing(Publishing {
+                topics,
+                validate_message_type: option.validate_message_type(),
+                ..Publishing::default()
+            })),
+            user_agent: Some(Ua {
+                language: SDK_LANGUAGE as i32,
+                version: SDK_VERSION.to_string(),
+                platform: format!("{:?} {}", platform.os_type, platform.version),
+                hostname: HOST_NAME.clone(),
+            }),
+            ..Settings::default()
+        })),
+        ..TelemetryCommand::default()
+    }
+}
+
+pub(crate) fn build_simple_consumer_settings(
+    option: &SimpleConsumerOption,
+    client_option: &ClientOption,
+) -> TelemetryCommand {
+    let platform = os_type::current_platform();
+    TelemetryCommand {
+        command: Some(Command::Settings(Settings {
+            client_type: Some(client_option.client_type.clone() as i32),
+            request_timeout: Some(prost_types::Duration {
+                seconds: client_option.timeout().as_secs() as i64,
+                nanos: client_option.timeout().subsec_nanos() as i32,
+            }),
+            pub_sub: Some(PubSub::Subscription(Subscription {
+                group: Some(Resource {
+                    name: option.consumer_group().to_string(),
+                    resource_namespace: option.namespace().to_string(),
+                }),
+                subscriptions: vec![],
+                fifo: Some(false),
+                receive_batch_size: None,
+                long_polling_timeout: Some(prost_types::Duration {
+                    seconds: client_option.long_polling_timeout().as_secs() as i64,
+                    nanos: client_option.long_polling_timeout().subsec_nanos() as i32,
+                }),
+            })),
+            user_agent: Some(Ua {
+                language: SDK_LANGUAGE as i32,
+                version: SDK_VERSION.to_string(),
+                platform: format!("{:?} {}", platform.os_type, platform.version),
+                hostname: HOST_NAME.clone(),
+            }),
+            ..Settings::default()
+        })),
+        ..TelemetryCommand::default()
+    }
+}