feat(rust): implement simple consumer (#476)
* feat(rust): implement simple consumer
* chore(rust): improve sdk api signature
* build(rust): check protoc version before compile proto files
* style(rust): fix markdown style
* fix(rust): fix config according to review comments
* doc(rust): fix README.md
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index c99ca5f..35a229c 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -33,6 +33,7 @@
parking_lot = "0.12"
hmac = "0.12"
hostname = "0.3.1"
+os_type = "2.6.0"
slog = {version = "2.7.0", features=["max_level_trace", "release_max_level_info"]}
slog-term = "2.9.0"
@@ -57,6 +58,9 @@
[build-dependencies]
tonic-build = "0.9.0"
+which = "4.4.0"
+version_check = "0.9.4"
+regex = "1.7.3"
[dev-dependencies]
wiremock-grpc = "0.0.3-alpha2"
diff --git a/rust/README.md b/rust/README.md
new file mode 100644
index 0000000..5c48483
--- /dev/null
+++ b/rust/README.md
@@ -0,0 +1,29 @@
+# The Rust Implementation of Apache RocketMQ Client
+
+[RocketMQ Website](https://rocketmq.apache.org/)
+
+## Overview
+
+Here is the rust implementation of the client for [Apache RocketMQ](https://rocketmq.apache.org/). Different from the [remoting-based client](https://github.com/apache/rocketmq/tree/develop/client), the current implementation is based on separating architecture for computing and storage, which is the more recommended way to access the RocketMQ service.
+
+Here are some preparations you may need to know (or refer to [here](https://rocketmq.apache.org/docs/quickStart/02quickstart)).
+
+## Getting Started
+
+### Requirements
+
+1. rust and cargo
+2. protoc 3.15.0+
+3. setup name server, broker, and [proxy](https://github.com/apache/rocketmq/tree/develop/proxy).
+
+### Run Example
+
+Run the following command to start the example:
+
+```sh
+# send message via producer
+cargo run --example producer
+
+# consume message via simple consumer
+cargo run --example simple_consumer
+```
diff --git a/rust/build.rs b/rust/build.rs
index aada9c3..41efaab 100644
--- a/rust/build.rs
+++ b/rust/build.rs
@@ -14,7 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+use regex::Regex;
+use std::path::PathBuf;
+use std::process::Command;
+use std::{env, str};
+use version_check::Version;
+
fn main() {
+ check_protoc_version();
+
tonic_build::configure()
.build_client(true)
.build_server(false)
@@ -28,3 +36,38 @@
)
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
}
+
+fn check_protoc_version() {
+ let protoc = env::var_os("PROTOC")
+ .map(PathBuf::from)
+ .or_else(|| which::which("protoc").ok());
+
+ if protoc.is_none() {
+ panic!("protoc not found");
+ }
+
+ let mut cmd = Command::new(protoc.unwrap());
+ cmd.arg("--version");
+ let result = cmd.output();
+
+ if result.is_err() {
+ panic!("failed to invoke protoc: {:?}", result)
+ }
+
+ let output = result.unwrap();
+ if !output.status.success() {
+ panic!("protoc failed: {:?}", output)
+ }
+
+ let version_regex = Regex::new(r"(?:(\d+)\.)?(?:(\d+)\.)?(\*|\d+)").unwrap();
+ let protoc_version = version_regex.find(str::from_utf8(&output.stdout).unwrap());
+ if protoc_version.is_none() {
+ panic!("failed to parse protoc version");
+ }
+
+ let protoc_version = Version::parse(protoc_version.unwrap().as_str()).unwrap();
+ let min_version = Version::from_mmp(3, 15, 0);
+ if protoc_version.cmp(&min_version).is_le() {
+ panic!("protoc version must be >= 3.15.0");
+ }
+}
diff --git a/rust/examples/producer.rs b/rust/examples/producer.rs
index e88a3bc..c1b00d1 100644
--- a/rust/examples/producer.rs
+++ b/rust/examples/producer.rs
@@ -14,27 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-use rocketmq::{ClientOption, MessageImpl, Producer, ProducerOption};
+use rocketmq::conf::{ClientOption, ProducerOption};
+use rocketmq::model::message::MessageImpl;
+use rocketmq::Producer;
#[tokio::main]
async fn main() {
- // specify which topic(s) you would like to send message to
+ // recommend to specify which topic(s) you would like to send message to
// producer will prefetch topic route when start and failed fast if topic not exist
let mut producer_option = ProducerOption::default();
- producer_option.set_topics(vec!["test_topic".to_string()]);
+ producer_option.set_topics(vec!["test_topic"]);
// set which rocketmq proxy to connect
let mut client_option = ClientOption::default();
- client_option.set_access_url("localhost:8081".to_string());
+ client_option.set_access_url("localhost:8081");
// build and start producer
- let producer = Producer::new(producer_option, client_option).await.unwrap();
+ let producer = Producer::new(producer_option, client_option).unwrap();
producer.start().await.unwrap();
// build message
let message = MessageImpl::builder()
- .set_topic("test_topic".to_string())
- .set_tags("test_tag".to_string())
+ .set_topic("test_topic")
+ .set_tags("test_tag")
.set_body("hello world".as_bytes().to_vec())
.build()
.unwrap();
diff --git a/rust/examples/simple_consumer.rs b/rust/examples/simple_consumer.rs
new file mode 100644
index 0000000..4011556
--- /dev/null
+++ b/rust/examples/simple_consumer.rs
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use rocketmq::conf::{ClientOption, SimpleConsumerOption};
+use rocketmq::model::common::{FilterExpression, FilterType};
+use rocketmq::SimpleConsumer;
+
+#[tokio::main]
+async fn main() {
+ // recommend to specify which topic(s) you would like to send message to
+ // simple consumer will prefetch topic route when start and failed fast if topic not exist
+ let mut consumer_option = SimpleConsumerOption::default();
+ consumer_option.set_topics(vec!["test_topic"]);
+ consumer_option.set_consumer_group("SimpleConsumerGroup");
+
+ // set which rocketmq proxy to connect
+ let mut client_option = ClientOption::default();
+ client_option.set_access_url("localhost:8081");
+
+ // build and start simple consumer
+ let consumer = SimpleConsumer::new(consumer_option, client_option).unwrap();
+ consumer.start().await.unwrap();
+
+ // pop message from rocketmq proxy
+ let receive_result = consumer
+ .receive(
+ "test_topic".to_string(),
+ &FilterExpression::new(FilterType::Tag, "test_tag"),
+ )
+ .await;
+ debug_assert!(
+ receive_result.is_ok(),
+ "receive message failed: {:?}",
+ receive_result.unwrap_err()
+ );
+
+ let messages = receive_result.unwrap();
+ for message in messages {
+ println!("receive message: {:?}", message);
+ // ack message to rocketmq proxy
+ let ack_result = consumer.ack(message).await;
+ debug_assert!(
+ ack_result.is_ok(),
+ "ack message failed: {:?}",
+ ack_result.unwrap_err()
+ );
+ }
+}
diff --git a/rust/src/client.rs b/rust/src/client.rs
index a60777e..7e50b3b 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -19,14 +19,21 @@
use std::{collections::HashMap, sync::atomic::AtomicUsize, sync::Arc};
use parking_lot::Mutex;
+use prost_types::Duration;
use slog::{debug, error, info, o, warn, Logger};
+use tokio::select;
use tokio::sync::oneshot;
use crate::conf::ClientOption;
use crate::error::{ClientError, ErrorKind};
-use crate::model::common::{Endpoints, Route, RouteStatus};
+use crate::model::common::{ClientType, Endpoints, Route, RouteStatus};
+use crate::model::message::AckMessageEntry;
+use crate::pb;
+use crate::pb::receive_message_response::Content;
use crate::pb::{
- Code, Message, QueryRouteRequest, Resource, SendMessageRequest, SendResultEntry, Status,
+ AckMessageRequest, AckMessageResultEntry, Code, FilterExpression, HeartbeatRequest, Message,
+ MessageQueue, QueryRouteRequest, ReceiveMessageRequest, Resource, SendMessageRequest,
+ SendResultEntry, Status, TelemetryCommand,
};
use crate::session::{RPCClient, Session, SessionManager};
@@ -34,10 +41,11 @@
pub(crate) struct Client {
logger: Logger,
option: ClientOption,
- session_manager: SessionManager,
+ session_manager: Arc<SessionManager>,
route_table: Mutex<HashMap<String /* topic */, RouteStatus>>,
id: String,
access_endpoints: Endpoints,
+ settings: TelemetryCommand,
}
lazy_static::lazy_static! {
@@ -47,9 +55,16 @@
impl Client {
const OPERATION_CLIENT_NEW: &'static str = "client.new";
const OPERATION_QUERY_ROUTE: &'static str = "client.query_route";
+ const OPERATION_HEARTBEAT: &'static str = "client.heartbeat";
const OPERATION_SEND_MESSAGE: &'static str = "client.send_message";
+ const OPERATION_RECEIVE_MESSAGE: &'static str = "client.receive_message";
+ const OPERATION_ACK_MESSAGE: &'static str = "client.ack_message";
- pub(crate) fn new(logger: &Logger, option: ClientOption) -> Result<Self, ClientError> {
+ pub(crate) fn new(
+ logger: &Logger,
+ option: ClientOption,
+ settings: TelemetryCommand,
+ ) -> Result<Self, ClientError> {
let id = Self::generate_client_id();
let endpoints = Endpoints::from_url(option.access_url())
.map_err(|e| e.with_operation(Self::OPERATION_CLIENT_NEW))?;
@@ -57,13 +72,85 @@
Ok(Client {
logger: logger.new(o!("component" => "client")),
option,
- session_manager,
+ session_manager: Arc::new(session_manager),
route_table: Mutex::new(HashMap::new()),
id,
access_endpoints: endpoints,
+ settings,
})
}
+ async fn heart_beat(
+ logger: &Logger,
+ session_manager: Arc<SessionManager>,
+ group: &str,
+ namespace: &str,
+ client_type: &ClientType,
+ ) {
+ let sessions = session_manager.get_all_sessions().await;
+ if sessions.is_err() {
+ error!(
+ logger,
+ "send heartbeat failed: failed to get sessions: {}",
+ sessions.unwrap_err()
+ );
+ return;
+ }
+ for mut session in sessions.unwrap() {
+ let request = HeartbeatRequest {
+ group: Some(Resource {
+ name: group.to_string(),
+ resource_namespace: namespace.to_string(),
+ }),
+ client_type: client_type.clone() as i32,
+ };
+ let response = session.heartbeat(request).await;
+ if response.is_err() {
+ error!(
+ logger,
+ "send heartbeat failed: failed to send heartbeat rpc: {}",
+ response.unwrap_err()
+ );
+ return;
+ }
+ let result =
+ Self::handle_response_status(response.unwrap().status, Self::OPERATION_HEARTBEAT);
+ if result.is_err() {
+ error!(
+ logger,
+ "send heartbeat failed: server return error: {}",
+ result.unwrap_err()
+ );
+ return;
+ }
+ debug!(
+ logger,
+ "send heartbeat to server success, peer={}",
+ session.peer()
+ );
+ }
+ }
+
+ pub(crate) fn start(&self) {
+ let logger = self.logger.clone();
+ let session_manager = self.session_manager.clone();
+
+ let group = self.option.group.to_string();
+ let namespace = self.option.namespace.to_string();
+ let client_type = self.option.client_type.clone();
+
+ tokio::spawn(async move {
+ let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
+ loop {
+ select! {
+ _ = interval.tick() => {
+ Self::heart_beat(&logger, session_manager.clone(), &group, &namespace, &client_type).await;
+ },
+ }
+ }
+ });
+ }
+
pub(crate) fn client_id(&self) -> &str {
&self.id
}
@@ -88,27 +175,32 @@
}
async fn get_session(&self) -> Result<Session, ClientError> {
- self.session_manager
- .get_session(&self.access_endpoints)
- .await
+ let session = self
+ .session_manager
+ .get_or_create_session(&self.access_endpoints, self.settings.clone())
+ .await?;
+ Ok(session)
}
async fn get_session_with_endpoints(
&self,
endpoints: &Endpoints,
) -> Result<Session, ClientError> {
- self.session_manager.get_session(endpoints).await
+ let session = self
+ .session_manager
+ .get_or_create_session(endpoints, self.settings.clone())
+ .await?;
+ Ok(session)
}
fn handle_response_status(
- &self,
status: Option<Status>,
operation: &'static str,
) -> Result<(), ClientError> {
if status.is_none() {
return Err(ClientError::new(
ErrorKind::Server,
- "Server do not return status, this may be a bug.",
+ "server do not return status, this may be a bug",
operation,
));
}
@@ -117,7 +209,7 @@
let status_code = Code::from_i32(status.code).unwrap();
if !status_code.eq(&Code::Ok) {
return Err(
- ClientError::new(ErrorKind::Server, "Server return an error.", operation)
+ ClientError::new(ErrorKind::Server, "server return an error", operation)
.with_context("code", status_code.as_str_name())
.with_context("message", status.message),
);
@@ -158,13 +250,13 @@
let request = QueryRouteRequest {
topic: Some(Resource {
name: topic.to_owned(),
- resource_namespace: self.option.name_space().to_string(),
+ resource_namespace: self.option.namespace.to_string(),
}),
endpoints: Some(self.access_endpoints.inner().clone()),
};
let response = rpc_client.query_route(request).await?;
- self.handle_response_status(response.status, Self::OPERATION_QUERY_ROUTE)?;
+ Self::handle_response_status(response.status, Self::OPERATION_QUERY_ROUTE)?;
let route = Route {
index: AtomicUsize::new(0),
@@ -209,7 +301,7 @@
Ok(route) => route,
Err(_e) => Err(ClientError::new(
ErrorKind::ChannelReceive,
- "Wait for inflight query topic route request failed.",
+ "wait for inflight query topic route request failed",
Self::OPERATION_QUERY_ROUTE,
)),
};
@@ -248,7 +340,7 @@
for item in v.drain(..) {
let _ = item.send(Err(ClientError::new(
ErrorKind::Server,
- "Query route failed.",
+ "query topic route failed",
Self::OPERATION_QUERY_ROUTE,
)));
}
@@ -259,8 +351,8 @@
pub(crate) async fn send_message(
&self,
- messages: Vec<Message>,
endpoints: &Endpoints,
+ messages: Vec<Message>,
) -> Result<Vec<SendResultEntry>, ClientError> {
self.send_message_inner(
self.get_session_with_endpoints(endpoints).await.unwrap(),
@@ -277,7 +369,7 @@
let message_count = messages.len();
let request = SendMessageRequest { messages };
let response = rpc_client.send_message(request).await?;
- self.handle_response_status(response.status, Self::OPERATION_SEND_MESSAGE)?;
+ Self::handle_response_status(response.status, Self::OPERATION_SEND_MESSAGE)?;
if response.entries.len() != message_count {
error!(self.logger, "server do not return illegal send result, this may be a bug. except result count: {}, found: {}", response.entries.len(), message_count);
@@ -285,6 +377,104 @@
Ok(response.entries)
}
+
+ pub(crate) async fn receive_message(
+ &self,
+ endpoints: &Endpoints,
+ message_queue: MessageQueue,
+ expression: FilterExpression,
+ batch_size: i32,
+ invisible_duration: Duration,
+ ) -> Result<Vec<Message>, ClientError> {
+ self.receive_message_inner(
+ self.get_session_with_endpoints(endpoints).await.unwrap(),
+ message_queue,
+ expression,
+ batch_size,
+ invisible_duration,
+ )
+ .await
+ }
+
+ pub(crate) async fn receive_message_inner(
+ &self,
+ mut rpc_client: impl RPCClient,
+ message_queue: MessageQueue,
+ expression: FilterExpression,
+ batch_size: i32,
+ invisible_duration: Duration,
+ ) -> Result<Vec<Message>, ClientError> {
+ let request = ReceiveMessageRequest {
+ group: Some(Resource {
+ name: self.option.group.to_string(),
+ resource_namespace: self.option.namespace.to_string(),
+ }),
+ message_queue: Some(message_queue),
+ filter_expression: Some(expression),
+ batch_size,
+ invisible_duration: Some(invisible_duration),
+ auto_renew: false,
+ long_polling_timeout: Some(
+ Duration::try_from(*self.option.long_polling_timeout()).unwrap(),
+ ),
+ };
+ let responses = rpc_client.receive_message(request).await?;
+
+ let mut messages = Vec::with_capacity(batch_size as usize);
+ for response in responses {
+ match response.content.unwrap() {
+ Content::Status(status) => {
+ Self::handle_response_status(Some(status), Self::OPERATION_RECEIVE_MESSAGE)?;
+ }
+ Content::Message(message) => {
+ messages.push(message);
+ }
+ Content::DeliveryTimestamp(_) => {}
+ }
+ }
+ Ok(messages)
+ }
+
+ pub(crate) async fn ack_message(
+ &self,
+ ack_entry: impl AckMessageEntry,
+ ) -> Result<AckMessageResultEntry, ClientError> {
+ let result = self
+ .ack_message_inner(
+ self.get_session_with_endpoints(ack_entry.endpoints())
+ .await
+ .unwrap(),
+ ack_entry.topic(),
+ vec![pb::AckMessageEntry {
+ message_id: ack_entry.message_id(),
+ receipt_handle: ack_entry.receipt_handle(),
+ }],
+ )
+ .await?;
+ Ok(result[0].clone())
+ }
+
+ pub(crate) async fn ack_message_inner(
+ &self,
+ mut rpc_client: impl RPCClient,
+ topic: String,
+ entries: Vec<pb::AckMessageEntry>,
+ ) -> Result<Vec<AckMessageResultEntry>, ClientError> {
+ let request = AckMessageRequest {
+ group: Some(Resource {
+ name: self.option.group.to_string(),
+ resource_namespace: self.option.namespace.to_string(),
+ }),
+ topic: Some(Resource {
+ name: topic,
+ resource_namespace: self.option.namespace.to_string(),
+ }),
+ entries,
+ };
+ let response = rpc_client.ack_message(request).await?;
+ Self::handle_response_status(response.status, Self::OPERATION_ACK_MESSAGE)?;
+ Ok(response.entries)
+ }
}
#[cfg(test)]
@@ -299,8 +489,11 @@
use crate::error::{ClientError, ErrorKind};
use crate::log::terminal_logger;
use crate::model::common::Route;
+ use crate::pb::receive_message_response::Content;
use crate::pb::{
- Code, MessageQueue, QueryRouteResponse, Resource, SendMessageResponse, Status,
+ AckMessageEntry, AckMessageResponse, Code, FilterExpression, Message, MessageQueue,
+ QueryRouteResponse, ReceiveMessageResponse, Resource, SendMessageResponse, Status,
+ TelemetryCommand,
};
use crate::session;
@@ -315,19 +508,17 @@
#[test]
fn handle_response_status() {
- let client = Client::new(&terminal_logger(), ClientOption::default()).unwrap();
-
- let result = client.handle_response_status(None, "test");
+ let result = Client::handle_response_status(None, "test");
assert!(result.is_err(), "should return error when status is None");
let result = result.unwrap_err();
assert_eq!(result.kind, ErrorKind::Server);
assert_eq!(
result.message,
- "Server do not return status, this may be a bug."
+ "server do not return status, this may be a bug"
);
assert_eq!(result.operation, "test");
- let result = client.handle_response_status(
+ let result = Client::handle_response_status(
Some(Status {
code: Code::BadRequest as i32,
message: "test failed".to_string(),
@@ -340,17 +531,17 @@
);
let result = result.unwrap_err();
assert_eq!(result.kind, ErrorKind::Server);
- assert_eq!(result.message, "Server return an error.");
+ assert_eq!(result.message, "server return an error");
assert_eq!(result.operation, "test failed");
assert_eq!(
result.context,
vec![
("code", "BAD_REQUEST".to_string()),
- ("message", "test failed".to_string())
+ ("message", "test failed".to_string()),
]
);
- let result = client.handle_response_status(
+ let result = Client::handle_response_status(
Some(Status {
code: Code::Ok as i32,
message: "test success".to_string(),
@@ -382,7 +573,12 @@
#[tokio::test]
async fn client_query_route_from_cache() {
let logger = terminal_logger();
- let client = Client::new(&logger, ClientOption::default()).unwrap();
+ let client = Client::new(
+ &logger,
+ ClientOption::default(),
+ TelemetryCommand::default(),
+ )
+ .unwrap();
client.route_table.lock().insert(
"DefaultCluster".to_string(),
super::RouteStatus::Found(Arc::new(Route {
@@ -397,7 +593,12 @@
#[tokio::test]
async fn client_query_route() {
let logger = terminal_logger();
- let client = Client::new(&logger, ClientOption::default()).unwrap();
+ let client = Client::new(
+ &logger,
+ ClientOption::default(),
+ TelemetryCommand::default(),
+ )
+ .unwrap();
let mut mock = session::MockRPCClient::new();
mock.expect_query_route()
@@ -420,7 +621,12 @@
#[tokio::test(flavor = "multi_thread")]
async fn client_query_route_with_inflight_request() {
let logger = terminal_logger();
- let client = Client::new(&logger, ClientOption::default()).unwrap();
+ let client = Client::new(
+ &logger,
+ ClientOption::default(),
+ TelemetryCommand::default(),
+ )
+ .unwrap();
let client = Arc::new(client);
let client_clone = client.clone();
@@ -448,7 +654,12 @@
#[tokio::test(flavor = "multi_thread")]
async fn client_query_route_with_failed_request() {
let logger = terminal_logger();
- let client = Client::new(&logger, ClientOption::default()).unwrap();
+ let client = Client::new(
+ &logger,
+ ClientOption::default(),
+ TelemetryCommand::default(),
+ )
+ .unwrap();
let client = Arc::new(client);
let client_clone = client.clone();
@@ -458,7 +669,7 @@
sleep(Duration::from_millis(200));
Box::pin(futures::future::ready(Err(ClientError::new(
ErrorKind::Server,
- "Server error",
+ "server error",
"test",
))))
});
@@ -490,11 +701,138 @@
mock.expect_send_message()
.return_once(|_| Box::pin(futures::future::ready(response)));
- let client = Client::new(&terminal_logger(), ClientOption::default()).unwrap();
+ let client = Client::new(
+ &terminal_logger(),
+ ClientOption::default(),
+ TelemetryCommand::default(),
+ )
+ .unwrap();
let send_result = client.send_message_inner(mock, vec![]).await;
assert!(send_result.is_ok());
let send_results = send_result.unwrap();
assert_eq!(send_results.len(), 0);
}
+
+ #[tokio::test]
+ async fn client_receive_message() {
+ let response = Ok(vec![ReceiveMessageResponse {
+ content: Some(Content::Message(Message::default())),
+ }]);
+ let mut mock = session::MockRPCClient::new();
+ mock.expect_receive_message()
+ .return_once(|_| Box::pin(futures::future::ready(response)));
+
+ let client = Client::new(
+ &terminal_logger(),
+ ClientOption::default(),
+ TelemetryCommand::default(),
+ )
+ .unwrap();
+ let receive_result = client
+ .receive_message_inner(
+ mock,
+ MessageQueue::default(),
+ FilterExpression::default(),
+ 32,
+ prost_types::Duration::default(),
+ )
+ .await;
+ assert!(receive_result.is_ok());
+
+ let messages = receive_result.unwrap();
+ assert_eq!(messages.len(), 1);
+ }
+
+ #[tokio::test]
+ async fn client_receive_message_failed() {
+ let response = Ok(vec![ReceiveMessageResponse {
+ content: Some(Content::Status(Status {
+ code: Code::BadRequest as i32,
+ message: "Unknown error".to_string(),
+ })),
+ }]);
+ let mut mock = session::MockRPCClient::new();
+ mock.expect_receive_message()
+ .return_once(|_| Box::pin(futures::future::ready(response)));
+
+ let client = Client::new(
+ &terminal_logger(),
+ ClientOption::default(),
+ TelemetryCommand::default(),
+ )
+ .unwrap();
+ let receive_result = client
+ .receive_message_inner(
+ mock,
+ MessageQueue::default(),
+ FilterExpression::default(),
+ 32,
+ prost_types::Duration::default(),
+ )
+ .await;
+ assert!(receive_result.is_err());
+
+ let error = receive_result.unwrap_err();
+ assert_eq!(error.kind, ErrorKind::Server);
+ assert_eq!(error.message, "server return an error");
+ assert_eq!(error.operation, "client.receive_message");
+ }
+
+ #[tokio::test]
+ async fn client_ack_message() {
+ let response = Ok(AckMessageResponse {
+ status: Some(Status {
+ code: Code::Ok as i32,
+ message: "Success".to_string(),
+ }),
+ entries: vec![],
+ });
+ let mut mock = session::MockRPCClient::new();
+ mock.expect_ack_message()
+ .return_once(|_| Box::pin(futures::future::ready(response)));
+
+ let client = Client::new(
+ &terminal_logger(),
+ ClientOption::default(),
+ TelemetryCommand::default(),
+ )
+ .unwrap();
+ let ack_entries: Vec<AckMessageEntry> = vec![];
+ let ack_result = client
+ .ack_message_inner(mock, "test_topic".to_string(), ack_entries)
+ .await;
+ assert!(ack_result.is_ok());
+ assert_eq!(ack_result.unwrap().len(), 0);
+ }
+
+ #[tokio::test]
+ async fn client_ack_message_failed() {
+ let response = Ok(AckMessageResponse {
+ status: Some(Status {
+ code: Code::BadRequest as i32,
+ message: "Success".to_string(),
+ }),
+ entries: vec![],
+ });
+ let mut mock = session::MockRPCClient::new();
+ mock.expect_ack_message()
+ .return_once(|_| Box::pin(futures::future::ready(response)));
+
+ let client = Client::new(
+ &terminal_logger(),
+ ClientOption::default(),
+ TelemetryCommand::default(),
+ )
+ .unwrap();
+ let ack_entries: Vec<AckMessageEntry> = vec![];
+ let ack_result = client
+ .ack_message_inner(mock, "test_topic".to_string(), ack_entries)
+ .await;
+
+ let error = ack_result.unwrap_err();
+ assert_eq!(error.kind, ErrorKind::Server);
+ assert_eq!(error.message, "server return an error");
+ assert_eq!(error.operation, "client.ack_message");
+ }
}
diff --git a/rust/src/conf.rs b/rust/src/conf.rs
index 334a1e8..8423d90 100644
--- a/rust/src/conf.rs
+++ b/rust/src/conf.rs
@@ -14,36 +14,40 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+use crate::model::common::ClientType;
+use std::time::Duration;
+
#[derive(Debug, Clone)]
pub struct ClientOption {
- name_space: String,
- access_url: String,
- enable_tls: bool,
+ pub(crate) client_type: ClientType,
+ pub(crate) group: String,
+ pub(crate) namespace: String,
+ pub(crate) access_url: String,
+ pub(crate) enable_tls: bool,
+ pub(crate) timeout: Duration,
+ pub(crate) long_polling_timeout: Duration,
}
impl Default for ClientOption {
fn default() -> Self {
ClientOption {
- name_space: "".to_string(),
+ client_type: ClientType::Producer,
+ group: "".to_string(),
+ namespace: "".to_string(),
access_url: "localhost:8081".to_string(),
enable_tls: false,
+ timeout: Duration::from_secs(3),
+ long_polling_timeout: Duration::from_secs(40),
}
}
}
impl ClientOption {
- pub fn name_space(&self) -> &str {
- &self.name_space
- }
- pub fn set_name_space(&mut self, name_space: String) {
- self.name_space = name_space;
- }
-
pub fn access_url(&self) -> &str {
&self.access_url
}
- pub fn set_access_url(&mut self, access_url: String) {
- self.access_url = access_url;
+ pub fn set_access_url(&mut self, access_url: impl Into<String>) {
+ self.access_url = access_url.into();
}
pub fn enable_tls(&self) -> bool {
@@ -52,6 +56,20 @@
pub fn set_enable_tls(&mut self, enable_tls: bool) {
self.enable_tls = enable_tls;
}
+
+ pub fn timeout(&self) -> &Duration {
+ &self.timeout
+ }
+ pub fn set_timeout(&mut self, timeout: Duration) {
+ self.timeout = timeout;
+ }
+
+ pub fn long_polling_timeout(&self) -> &Duration {
+ &self.long_polling_timeout
+ }
+ pub fn set_long_polling_timeout(&mut self, long_polling_timeout: Duration) {
+ self.long_polling_timeout = long_polling_timeout;
+ }
}
#[derive(Debug, Clone)]
@@ -66,6 +84,7 @@
prefetch_route: bool,
topics: Option<Vec<String>>,
namespace: String,
+ validate_message_type: bool,
}
impl Default for ProducerOption {
@@ -75,6 +94,7 @@
prefetch_route: true,
topics: None,
namespace: "".to_string(),
+ validate_message_type: true,
}
}
}
@@ -97,14 +117,81 @@
pub fn topics(&self) -> &Option<Vec<String>> {
&self.topics
}
- pub fn set_topics(&mut self, topics: Vec<String>) {
- self.topics = Some(topics);
+ pub fn set_topics(&mut self, topics: Vec<impl Into<String>>) {
+ self.topics = Some(topics.into_iter().map(|t| t.into()).collect());
}
- pub fn namespace(&self) -> &str {
+ // not expose to user for now
+ pub(crate) fn namespace(&self) -> &str {
&self.namespace
}
- pub fn set_namespace(&mut self, name_space: String) {
- self.namespace = name_space;
+ pub(crate) fn set_namespace(&mut self, name_space: impl Into<String>) {
+ self.namespace = name_space.into();
+ }
+
+ pub fn validate_message_type(&self) -> bool {
+ self.validate_message_type
+ }
+ pub fn set_validate_message_type(&mut self, validate_message_type: bool) {
+ self.validate_message_type = validate_message_type;
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct SimpleConsumerOption {
+ logging_format: LoggingFormat,
+ consumer_group: String,
+ prefetch_route: bool,
+ topics: Option<Vec<String>>,
+ namespace: String,
+}
+
+impl Default for SimpleConsumerOption {
+ fn default() -> Self {
+ SimpleConsumerOption {
+ logging_format: LoggingFormat::Terminal,
+ consumer_group: "".to_string(),
+ prefetch_route: true,
+ topics: None,
+ namespace: "".to_string(),
+ }
+ }
+}
+
+impl SimpleConsumerOption {
+ pub fn logging_format(&self) -> &LoggingFormat {
+ &self.logging_format
+ }
+ pub fn set_logging_format(&mut self, logging_format: LoggingFormat) {
+ self.logging_format = logging_format;
+ }
+
+ pub fn consumer_group(&self) -> &str {
+ &self.consumer_group
+ }
+ pub fn set_consumer_group(&mut self, consumer_group: impl Into<String>) {
+ self.consumer_group = consumer_group.into();
+ }
+
+ pub fn prefetch_route(&self) -> &bool {
+ &self.prefetch_route
+ }
+ pub fn set_prefetch_route(&mut self, prefetch_route: bool) {
+ self.prefetch_route = prefetch_route;
+ }
+
+ pub fn topics(&self) -> &Option<Vec<String>> {
+ &self.topics
+ }
+ pub fn set_topics(&mut self, topics: Vec<impl Into<String>>) {
+ self.topics = Some(topics.into_iter().map(|t| t.into()).collect());
+ }
+
+ // not expose to user for now
+ pub(crate) fn namespace(&self) -> &str {
+ &self.namespace
+ }
+ pub(crate) fn set_namespace(&mut self, name_space: impl Into<String>) {
+ self.namespace = name_space.into();
}
}
diff --git a/rust/src/error.rs b/rust/src/error.rs
index 6ae04e3..e7299a5 100644
--- a/rust/src/error.rs
+++ b/rust/src/error.rs
@@ -38,6 +38,9 @@
#[error("Client internal error")]
ClientInternal,
+ #[error("Client is not running")]
+ ClientIsNotRunning,
+
#[error("Failed to send message via channel")]
ChannelSend,
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index 4a0c165..c6d1cc2 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -15,7 +15,7 @@
* limitations under the License.
*/
#[allow(dead_code)]
-mod conf;
+pub mod conf;
#[allow(dead_code)]
mod error;
#[allow(dead_code)]
@@ -29,11 +29,12 @@
mod session;
#[allow(dead_code)]
-mod model;
+pub mod model;
+mod util;
+
mod producer;
+mod simple_consumer;
// Export structs that are part of crate API.
-pub use conf::ClientOption;
-pub use conf::ProducerOption;
-pub use model::message::MessageImpl;
pub use producer::Producer;
+pub use simple_consumer::SimpleConsumer;
diff --git a/rust/src/log.rs b/rust/src/log.rs
index ae7edcc..db2f473 100644
--- a/rust/src/log.rs
+++ b/rust/src/log.rs
@@ -19,7 +19,7 @@
use slog::{o, Drain, Logger};
use slog_async::OverflowStrategy;
-use crate::conf::{LoggingFormat, ProducerOption};
+use crate::conf::LoggingFormat;
pub(crate) fn terminal_logger() -> Logger {
let decorator = slog_term::TermDecorator::new().build();
@@ -47,8 +47,8 @@
Logger::root(drain, o!())
}
-pub(crate) fn logger(_option: &ProducerOption) -> Logger {
- match _option.logging_format() {
+pub(crate) fn logger(logging_format: &LoggingFormat) -> Logger {
+ match logging_format {
LoggingFormat::Terminal => terminal_logger(),
LoggingFormat::Json => json_logger("logs/rocketmq_client.log"),
}
diff --git a/rust/src/model/common.rs b/rust/src/model/common.rs
index 2082c28..9f1e947 100644
--- a/rust/src/model/common.rs
+++ b/rust/src/model/common.rs
@@ -24,8 +24,16 @@
use crate::pb;
use crate::pb::{Address, AddressScheme, MessageQueue};
+#[derive(Debug, Clone)]
+pub(crate) enum ClientType {
+ Producer = 1,
+ PushConsumer = 2,
+ SimpleConsumer = 3,
+ PullConsumer = 4,
+}
+
#[derive(Debug)]
-pub struct Route {
+pub(crate) struct Route {
pub(crate) index: AtomicUsize,
pub queue: Vec<MessageQueue>,
}
@@ -36,8 +44,8 @@
Found(Arc<Route>),
}
-#[derive(Debug)]
-pub(crate) struct Endpoints {
+#[derive(Debug, Clone)]
+pub struct Endpoints {
endpoint_url: String,
scheme: AddressScheme,
inner: pb::Endpoints,
@@ -53,7 +61,7 @@
if endpoint_url.is_empty() {
return Err(ClientError::new(
ErrorKind::Config,
- "Endpoint url is empty.",
+ "endpoint url is empty",
Self::OPERATION_PARSE,
)
.with_context("url", endpoint_url));
@@ -66,7 +74,7 @@
let port_i32 = port.parse::<i32>().map_err(|e| {
ClientError::new(
ErrorKind::Config,
- &format!("Port {} in endpoint url is invalid.", port),
+ &format!("port {} in endpoint url is invalid", port),
Self::OPERATION_PARSE,
)
.with_context("url", endpoint_url)
@@ -76,7 +84,7 @@
} else {
return Err(ClientError::new(
ErrorKind::Config,
- "Port in endpoint url is missing.",
+ "port in endpoint url is missing",
Self::OPERATION_PARSE,
)
.with_context("url", endpoint_url));
@@ -93,7 +101,7 @@
if scheme == AddressScheme::IPv6 {
return Err(ClientError::new(
ErrorKind::Config,
- "Multiple addresses not in the same schema.",
+ "multiple addresses not in the same schema",
Self::OPERATION_PARSE,
)
.with_context("url", endpoint_url));
@@ -104,7 +112,7 @@
if scheme == AddressScheme::IPv4 {
return Err(ClientError::new(
ErrorKind::Config,
- "Multiple addresses not in the same schema.",
+ "multiple addresses not in the same schema",
Self::OPERATION_PARSE,
)
.with_context("url", endpoint_url));
@@ -118,7 +126,7 @@
if urls_len > 1 {
return Err(ClientError::new(
ErrorKind::Config,
- "Multiple addresses not allowed in domain schema.",
+ "multiple addresses not allowed in domain schema",
Self::OPERATION_PARSE,
)
.with_context("url", endpoint_url));
@@ -169,6 +177,27 @@
}
}
+#[derive(Clone, Copy)]
+#[repr(i32)]
+pub enum FilterType {
+ Tag = 1,
+ Sql = 2,
+}
+
+pub struct FilterExpression {
+ pub(crate) filter_type: FilterType,
+ pub(crate) expression: String,
+}
+
+impl FilterExpression {
+ pub fn new(filter_type: FilterType, expression: impl Into<String>) -> Self {
+ FilterExpression {
+ filter_type,
+ expression: expression.into(),
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
use crate::error::ErrorKind;
@@ -214,31 +243,31 @@
let err = Endpoints::from_url("").err().unwrap();
assert_eq!(err.kind, ErrorKind::Config);
assert_eq!(err.operation, "endpoint.parse");
- assert_eq!(err.message, "Endpoint url is empty.");
+ assert_eq!(err.message, "endpoint url is empty");
let err = Endpoints::from_url("localhost:<port>").err().unwrap();
assert_eq!(err.kind, ErrorKind::Config);
assert_eq!(err.operation, "endpoint.parse");
- assert_eq!(err.message, "Port <port> in endpoint url is invalid.");
+ assert_eq!(err.message, "port <port> in endpoint url is invalid");
let err = Endpoints::from_url("localhost").err().unwrap();
assert_eq!(err.kind, ErrorKind::Config);
assert_eq!(err.operation, "endpoint.parse");
- assert_eq!(err.message, "Port in endpoint url is missing.");
+ assert_eq!(err.message, "port in endpoint url is missing");
let err = Endpoints::from_url("127.0.0.1:8080,::1:8080")
.err()
.unwrap();
assert_eq!(err.kind, ErrorKind::Config);
assert_eq!(err.operation, "endpoint.parse");
- assert_eq!(err.message, "Multiple addresses not in the same schema.");
+ assert_eq!(err.message, "multiple addresses not in the same schema");
let err = Endpoints::from_url("::1:8080,127.0.0.1:8080")
.err()
.unwrap();
assert_eq!(err.kind, ErrorKind::Config);
assert_eq!(err.operation, "endpoint.parse");
- assert_eq!(err.message, "Multiple addresses not in the same schema.");
+ assert_eq!(err.message, "multiple addresses not in the same schema");
let err = Endpoints::from_url("localhost:8080,localhost:8081")
.err()
@@ -247,7 +276,7 @@
assert_eq!(err.operation, "endpoint.parse");
assert_eq!(
err.message,
- "Multiple addresses not allowed in domain schema."
+ "multiple addresses not allowed in domain schema"
);
}
diff --git a/rust/src/model/message.rs b/rust/src/model/message.rs
index 17584d0..259a484 100644
--- a/rust/src/model/message.rs
+++ b/rust/src/model/message.rs
@@ -15,7 +15,9 @@
* limitations under the License.
*/
use crate::error::{ClientError, ErrorKind};
+use crate::model::common::Endpoints;
use crate::model::message_id::UNIQ_ID_GENERATOR;
+use crate::pb;
use std::collections::HashMap;
pub trait Message {
@@ -133,8 +135,8 @@
impl MessageBuilder {
const OPERATION_BUILD_MESSAGE: &'static str = "build_message";
- pub fn set_topic(mut self, topic: String) -> Self {
- self.message.topic = topic;
+ pub fn set_topic(mut self, topic: impl Into<String>) -> Self {
+ self.message.topic = topic.into();
self
}
@@ -143,23 +145,31 @@
self
}
- pub fn set_tags(mut self, tags: String) -> Self {
- self.message.tags = Some(tags);
+ pub fn set_tags(mut self, tags: impl Into<String>) -> Self {
+ self.message.tags = Some(tags.into());
self
}
- pub fn set_keys(mut self, keys: Vec<String>) -> Self {
- self.message.keys = Some(keys);
+ pub fn set_keys(mut self, keys: Vec<impl Into<String>>) -> Self {
+ self.message.keys = Some(keys.into_iter().map(|k| k.into()).collect());
self
}
- pub fn set_properties(mut self, properties: HashMap<String, String>) -> Self {
- self.message.properties = Some(properties);
+ pub fn set_properties(
+ mut self,
+ properties: HashMap<impl Into<String>, impl Into<String>>,
+ ) -> Self {
+ self.message.properties = Some(
+ properties
+ .into_iter()
+ .map(|(k, v)| (k.into(), v.into()))
+ .collect(),
+ );
self
}
- pub fn set_message_group(mut self, message_group: String) -> Self {
- self.message.message_group = Some(message_group);
+ pub fn set_message_group(mut self, message_group: impl Into<String>) -> Self {
+ self.message.message_group = Some(message_group.into());
self
}
@@ -191,6 +201,113 @@
}
}
+pub trait AckMessageEntry {
+ fn topic(&self) -> String;
+ fn message_id(&self) -> String;
+ fn receipt_handle(&self) -> String;
+ fn endpoints(&self) -> &Endpoints;
+}
+
+#[derive(Debug)]
+pub struct MessageView {
+ pub(crate) message_id: String,
+ pub(crate) receipt_handle: Option<String>,
+ pub(crate) topic: String,
+ pub(crate) body: Vec<u8>,
+ pub(crate) tag: Option<String>,
+ pub(crate) keys: Vec<String>,
+ pub(crate) properties: HashMap<String, String>,
+ pub(crate) message_group: Option<String>,
+ pub(crate) delivery_timestamp: Option<i64>,
+ pub(crate) born_host: String,
+ pub(crate) born_timestamp: i64,
+ pub(crate) delivery_attempt: i32,
+ pub(crate) endpoints: Endpoints,
+}
+
+impl AckMessageEntry for MessageView {
+ fn topic(&self) -> String {
+ self.topic.clone()
+ }
+
+ fn message_id(&self) -> String {
+ self.message_id.clone()
+ }
+
+ fn receipt_handle(&self) -> String {
+ self.receipt_handle.clone().unwrap()
+ }
+
+ fn endpoints(&self) -> &Endpoints {
+ &self.endpoints
+ }
+}
+
+impl MessageView {
+ pub(crate) fn from_pb_message(message: pb::Message, endpoints: Endpoints) -> Self {
+ let system_properties = message.system_properties.unwrap();
+ MessageView {
+ message_id: system_properties.message_id,
+ receipt_handle: system_properties.receipt_handle,
+ topic: message.topic.unwrap().name,
+ body: message.body,
+ tag: system_properties.tag,
+ keys: system_properties.keys,
+ properties: message.user_properties,
+ message_group: system_properties.message_group,
+ delivery_timestamp: system_properties.delivery_timestamp.map(|t| t.seconds),
+ born_host: system_properties.born_host,
+ born_timestamp: system_properties.born_timestamp.map_or(0, |t| t.seconds),
+ delivery_attempt: system_properties.delivery_attempt.unwrap_or(0),
+ endpoints,
+ }
+ }
+
+ pub fn message_id(&self) -> &str {
+ &self.message_id
+ }
+
+ pub fn topic(&self) -> &str {
+ &self.topic
+ }
+
+ pub fn body(&self) -> &[u8] {
+ &self.body
+ }
+
+ pub fn tag(&self) -> Option<&str> {
+ self.tag.as_deref()
+ }
+
+ pub fn keys(&self) -> &[String] {
+ &self.keys
+ }
+
+ pub fn properties(&self) -> &HashMap<String, String> {
+ &self.properties
+ }
+
+ pub fn message_group(&self) -> Option<&str> {
+ self.message_group.as_deref()
+ }
+
+ pub fn delivery_timestamp(&self) -> Option<i64> {
+ self.delivery_timestamp
+ }
+
+ pub fn born_host(&self) -> &str {
+ &self.born_host
+ }
+
+ pub fn born_timestamp(&self) -> i64 {
+ self.born_timestamp
+ }
+
+ pub fn delivery_attempt(&self) -> i32 {
+ self.delivery_attempt
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/rust/src/model/mod.rs b/rust/src/model/mod.rs
index 1a873d6..aef3261 100644
--- a/rust/src/model/mod.rs
+++ b/rust/src/model/mod.rs
@@ -14,6 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-pub(crate) mod common;
-pub(crate) mod message;
+pub mod common;
+pub mod message;
pub(crate) mod message_id;
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index e288a5c..dda1e69 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -20,21 +20,21 @@
//! `Producer` is a thin wrapper of internal `Client` struct that shoulders the actual workloads.
//! Most of its methods take shared reference so that application developers may use it at will.
-use std::hash::Hasher;
-use std::sync::atomic::Ordering;
-use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use prost_types::Timestamp;
-use siphasher::sip::SipHasher24;
use slog::{info, Logger};
use crate::client::Client;
use crate::conf::{ClientOption, ProducerOption};
use crate::error::{ClientError, ErrorKind};
-use crate::model::common::{Endpoints, Route};
+use crate::model::common::ClientType;
use crate::model::message;
-use crate::pb::{Encoding, MessageQueue, Resource, SendResultEntry, SystemProperties};
+use crate::pb::{Encoding, Resource, SendResultEntry, SystemProperties};
+use crate::util::{
+ build_endpoints_by_message_queue, build_producer_settings, select_message_queue,
+ select_message_queue_by_message_group, HOST_NAME,
+};
use crate::{log, pb};
/// `Producer` is the core struct, to which application developers should turn, when publishing messages to brokers.
@@ -47,22 +47,18 @@
client: Client,
}
-lazy_static::lazy_static! {
- static ref HOST_NAME: String = match hostname::get() {
- Ok(name) => name.to_str().unwrap_or("localhost").to_string(),
- Err(_) => "localhost".to_string(),
- };
-}
-
impl Producer {
const OPERATION_SEND_MESSAGE: &'static str = "producer.send_message";
- pub async fn new(
- option: ProducerOption,
- client_option: ClientOption,
- ) -> Result<Self, ClientError> {
- let logger = log::logger(&option);
- let client = Client::new(&logger, client_option)?;
+ pub fn new(option: ProducerOption, client_option: ClientOption) -> Result<Self, ClientError> {
+ let client_option = ClientOption {
+ client_type: ClientType::Producer,
+ namespace: option.namespace().to_string(),
+ ..client_option
+ };
+ let logger = log::logger(option.logging_format());
+ let settings = build_producer_settings(&option, &client_option);
+ let client = Client::new(&logger, client_option, settings)?;
Ok(Producer {
option,
logger,
@@ -76,6 +72,7 @@
self.client.topic_route(topic, true).await?;
}
}
+ self.client.start();
info!(
self.logger,
"start producer success, client_id: {}",
@@ -91,7 +88,7 @@
if messages.is_empty() {
return Err(ClientError::new(
ErrorKind::InvalidMessage,
- "No message found.",
+ "no message found",
Self::OPERATION_SEND_MESSAGE,
));
}
@@ -126,7 +123,7 @@
if last_message_group.ne(&message_group) {
return Err(ClientError::new(
ErrorKind::InvalidMessage,
- "Not all messages have the same message group.",
+ "not all messages have the same message group",
Self::OPERATION_SEND_MESSAGE,
));
}
@@ -165,7 +162,7 @@
if topic.is_empty() {
return Err(ClientError::new(
ErrorKind::InvalidMessage,
- "Message topic is empty.",
+ "message topic is empty",
Self::OPERATION_SEND_MESSAGE,
));
}
@@ -191,54 +188,18 @@
let route = self.client.topic_route(&topic, true).await?;
let message_queue = if let Some(message_group) = message_group {
- self.select_message_queue_by_message_group(route, message_group)
+ select_message_queue_by_message_group(route, message_group)
} else {
- self.select_message_queue(route)
+ select_message_queue(route)
};
- if message_queue.broker.is_none() {
- return Err(ClientError::new(
- ErrorKind::NoBrokerAvailable,
- "Message queue do not have a available endpoint.",
- Self::OPERATION_SEND_MESSAGE,
- )
- .with_context("message_queue", format!("{:?}", message_queue)));
- }
-
- let broker = message_queue.broker.unwrap();
- if broker.endpoints.is_none() {
- return Err(ClientError::new(
- ErrorKind::NoBrokerAvailable,
- "Message queue do not have a available endpoint.",
- Self::OPERATION_SEND_MESSAGE,
- )
- .with_context("broker", broker.name)
- .with_context("topic", topic)
- .with_context("queue_id", message_queue.id.to_string()));
- }
-
- let endpoints = Endpoints::from_pb_endpoints(broker.endpoints.unwrap());
+ let endpoints =
+ build_endpoints_by_message_queue(&message_queue, Self::OPERATION_SEND_MESSAGE)?;
for message in pb_messages.iter_mut() {
message.system_properties.as_mut().unwrap().queue_id = message_queue.id;
}
- self.client.send_message(pb_messages, &endpoints).await
- }
-
- fn select_message_queue(&self, route: Arc<Route>) -> MessageQueue {
- let i = route.index.fetch_add(1, Ordering::Relaxed);
- route.queue[i % route.queue.len()].clone()
- }
-
- fn select_message_queue_by_message_group(
- &self,
- route: Arc<Route>,
- message_group: String,
- ) -> MessageQueue {
- let mut sip_hasher24 = SipHasher24::default();
- sip_hasher24.write(message_group.as_bytes());
- let index = sip_hasher24.finish() % route.queue.len() as u64;
- route.queue[index as usize].clone()
+ self.client.send_message(&endpoints, pb_messages).await
}
}
@@ -261,19 +222,13 @@
#[tokio::test]
async fn producer_transform_messages_to_protobuf() {
- let producer = Producer::new(ProducerOption::default(), ClientOption::default())
- .await
- .unwrap();
+ let producer = Producer::new(ProducerOption::default(), ClientOption::default()).unwrap();
let messages = vec![MessageImpl::builder()
- .set_topic("DefaultCluster".to_string())
+ .set_topic("DefaultCluster")
.set_body("hello world".as_bytes().to_vec())
- .set_tags("tag".to_string())
- .set_keys(vec!["key".to_string()])
- .set_properties(
- vec![("key".to_string(), "value".to_string())]
- .into_iter()
- .collect(),
- )
+ .set_tags("tag")
+ .set_keys(vec!["key"])
+ .set_properties(vec![("key", "value")].into_iter().collect())
.set_message_group("message_group".to_string())
.build()
.unwrap()];
@@ -298,16 +253,14 @@
#[tokio::test]
async fn producer_transform_messages_to_protobuf_failed() {
- let producer = Producer::new(ProducerOption::default(), ClientOption::default())
- .await
- .unwrap();
+ let producer = Producer::new(ProducerOption::default(), ClientOption::default()).unwrap();
let messages: Vec<MessageImpl> = vec![];
let result = producer.transform_messages_to_protobuf(messages);
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.kind, ErrorKind::InvalidMessage);
- assert_eq!(err.message, "No message found.");
+ assert_eq!(err.message, "no message found");
let messages = vec![MessageImpl {
message_id: "".to_string(),
@@ -323,16 +276,16 @@
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.kind, ErrorKind::InvalidMessage);
- assert_eq!(err.message, "Message topic is empty.");
+ assert_eq!(err.message, "message topic is empty");
let messages = vec![
MessageImpl::builder()
- .set_topic("DefaultCluster".to_string())
+ .set_topic("DefaultCluster")
.set_body("hello world".as_bytes().to_vec())
.build()
.unwrap(),
MessageImpl::builder()
- .set_topic("DefaultCluster_dup".to_string())
+ .set_topic("DefaultCluster_dup")
.set_body("hello world".as_bytes().to_vec())
.build()
.unwrap(),
@@ -345,15 +298,15 @@
let messages = vec![
MessageImpl::builder()
- .set_topic("DefaultCluster".to_string())
+ .set_topic("DefaultCluster")
.set_body("hello world".as_bytes().to_vec())
- .set_message_group("message_group".to_string())
+ .set_message_group("message_group")
.build()
.unwrap(),
MessageImpl::builder()
- .set_topic("DefaultCluster".to_string())
+ .set_topic("DefaultCluster")
.set_body("hello world".as_bytes().to_vec())
- .set_message_group("message_group_dup".to_string())
+ .set_message_group("message_group_dup")
.build()
.unwrap(),
];
@@ -361,6 +314,6 @@
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.kind, ErrorKind::InvalidMessage);
- assert_eq!(err.message, "Not all messages have the same message group.");
+ assert_eq!(err.message, "not all messages have the same message group");
}
}
diff --git a/rust/src/session.rs b/rust/src/session.rs
index b84469e..fcb212a 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -18,31 +18,56 @@
use async_trait::async_trait;
use mockall::automock;
-use slog::{info, o, Logger};
-use tokio::sync::Mutex;
+use slog::{debug, error, info, o, Logger};
+use tokio::sync::{mpsc, Mutex};
+use tokio_stream::wrappers::ReceiverStream;
+use tokio_stream::StreamExt;
use tonic::metadata::AsciiMetadataValue;
use tonic::transport::{Channel, Endpoint};
use crate::conf::ClientOption;
use crate::error::ErrorKind;
use crate::model::common::Endpoints;
-use crate::pb::{QueryRouteRequest, QueryRouteResponse, SendMessageRequest, SendMessageResponse};
+use crate::pb::{
+ AckMessageRequest, AckMessageResponse, HeartbeatRequest, HeartbeatResponse, QueryRouteRequest,
+ QueryRouteResponse, ReceiveMessageRequest, ReceiveMessageResponse, SendMessageRequest,
+ SendMessageResponse, TelemetryCommand,
+};
+use crate::util::{PROTOCOL_VERSION, SDK_LANGUAGE, SDK_VERSION};
use crate::{error::ClientError, pb::messaging_service_client::MessagingServiceClient};
#[async_trait]
#[automock]
pub(crate) trait RPCClient {
+ const OPERATION_START: &'static str = "session.start";
+ const OPERATION_UPDATE_SETTINGS: &'static str = "session.update_settings";
+
const OPERATION_QUERY_ROUTE: &'static str = "rpc.query_route";
+ const OPERATION_HEARTBEAT: &'static str = "rpc.heartbeat";
const OPERATION_SEND_MESSAGE: &'static str = "rpc.send_message";
+ const OPERATION_RECEIVE_MESSAGE: &'static str = "rpc.receive_message";
+ const OPERATION_ACK_MESSAGE: &'static str = "rpc.ack_message";
async fn query_route(
&mut self,
request: QueryRouteRequest,
) -> Result<QueryRouteResponse, ClientError>;
+ async fn heartbeat(
+ &mut self,
+ request: HeartbeatRequest,
+ ) -> Result<HeartbeatResponse, ClientError>;
async fn send_message(
&mut self,
request: SendMessageRequest,
) -> Result<SendMessageResponse, ClientError>;
+ async fn receive_message(
+ &mut self,
+ request: ReceiveMessageRequest,
+ ) -> Result<Vec<ReceiveMessageResponse>, ClientError>;
+ async fn ack_message(
+ &mut self,
+ request: AckMessageRequest,
+ ) -> Result<AckMessageResponse, ClientError>;
}
#[allow(dead_code)]
@@ -50,7 +75,10 @@
pub(crate) struct Session {
logger: Logger,
client_id: String,
+ option: ClientOption,
+ endpoints: Endpoints,
stub: MessagingServiceClient<Channel>,
+ telemetry_tx: Box<Option<mpsc::Sender<TelemetryCommand>>>,
}
impl Session {
@@ -75,7 +103,7 @@
if channel_endpoints.is_empty() {
return Err(ClientError::new(
ErrorKind::Connect,
- "No endpoint available.",
+ "no endpoint available",
Self::OPERATION_CREATE,
)
.with_context("peer", peer.clone()));
@@ -85,7 +113,7 @@
channel_endpoints[0].connect().await.map_err(|e| {
ClientError::new(
ErrorKind::Connect,
- "Failed to connect to peer.",
+ "failed to connect to peer",
Self::OPERATION_CREATE,
)
.set_source(e)
@@ -97,16 +125,16 @@
let stub = MessagingServiceClient::new(channel);
- info!(
- logger,
- "create session success, peer={}",
- endpoints.endpoint_url()
- );
+ let logger = logger.new(o!("peer" => peer.clone()));
+ info!(logger, "create session success");
Ok(Session {
- logger: logger.new(o!("component" => "session", "peer" => peer.clone())),
+ logger,
+ option: option.clone(),
+ endpoints: endpoints.clone(),
client_id,
stub,
+ telemetry_tx: Box::new(None),
})
}
@@ -124,7 +152,7 @@
.map_err(|e| {
ClientError::new(
ErrorKind::Connect,
- "Failed to create channel endpoint.",
+ "failed to create channel endpoint",
Self::OPERATION_CREATE,
)
.set_source(e)
@@ -141,24 +169,111 @@
// .set_source(e)
// .with_context("peer", &peer_addr)
// })?
- .connect_timeout(std::time::Duration::from_secs(3))
+ .connect_timeout(option.timeout)
.tcp_nodelay(true);
Ok(endpoint)
}
- fn sign(&self, metadata: &mut tonic::metadata::MetadataMap) {
+ pub(crate) fn peer(&self) -> &str {
+ self.endpoints.endpoint_url()
+ }
+
+ fn sign<T>(&self, mut request: tonic::Request<T>) -> tonic::Request<T> {
+ let metadata = request.metadata_mut();
let _ = AsciiMetadataValue::try_from(&self.client_id)
.map(|v| metadata.insert("x-mq-client-id", v));
- metadata.insert("x-mq-language", AsciiMetadataValue::from_static("RUST"));
+ metadata.insert(
+ "x-mq-language",
+ AsciiMetadataValue::from_static(SDK_LANGUAGE.as_str_name()),
+ );
metadata.insert(
"x-mq-client-version",
- AsciiMetadataValue::from_static("5.0.0"),
+ AsciiMetadataValue::from_static(SDK_VERSION),
);
metadata.insert(
"x-mq-protocol-version",
- AsciiMetadataValue::from_static("2.0.0"),
+ AsciiMetadataValue::from_static(PROTOCOL_VERSION),
);
+
+ request.set_timeout(*self.option.timeout());
+ request
+ }
+
+ pub(crate) async fn start(&mut self, settings: TelemetryCommand) -> Result<(), ClientError> {
+ let (tx, rx) = mpsc::channel(16);
+ tx.send(settings).await.map_err(|e| {
+ ClientError::new(
+ ErrorKind::ChannelSend,
+ "failed to send telemetry command",
+ Self::OPERATION_START,
+ )
+ .set_source(e)
+ })?;
+
+ let request = self.sign(tonic::Request::new(ReceiverStream::new(rx)));
+ let response = self.stub.telemetry(request).await.map_err(|e| {
+ ClientError::new(
+ ErrorKind::ClientInternal,
+ "send rpc telemetry failed",
+ Self::OPERATION_START,
+ )
+ .set_source(e)
+ })?;
+
+ let logger = self.logger.clone();
+ tokio::spawn(async move {
+ let mut stream = response.into_inner();
+ loop {
+ // TODO handle server stream
+ match stream.message().await {
+ Ok(Some(item)) => {
+ debug!(logger, "telemetry command: {:?}", item);
+ }
+ Ok(None) => {
+ debug!(logger, "request stream closed");
+ break;
+ }
+ Err(e) => {
+ error!(logger, "telemetry response error: {:?}", e);
+ }
+ }
+ }
+ });
+ let _ = self.telemetry_tx.insert(tx);
+ debug!(self.logger, "start session success");
+ Ok(())
+ }
+
+ #[allow(dead_code)]
+ pub(crate) fn is_started(&self) -> bool {
+ self.telemetry_tx.is_some()
+ }
+
+ #[allow(dead_code)]
+ pub(crate) async fn update_settings(
+ &mut self,
+ settings: TelemetryCommand,
+ ) -> Result<(), ClientError> {
+ if !self.is_started() {
+ return Err(ClientError::new(
+ ErrorKind::ClientIsNotRunning,
+ "session is not started",
+ Self::OPERATION_UPDATE_SETTINGS,
+ ));
+ }
+
+ if let Some(tx) = self.telemetry_tx.as_ref() {
+ tx.send(settings).await.map_err(|e| {
+ ClientError::new(
+ ErrorKind::ChannelSend,
+ "failed to send telemetry command",
+ Self::OPERATION_UPDATE_SETTINGS,
+ )
+ .set_source(e)
+ })?;
+ }
+ Ok(())
}
}
@@ -168,12 +283,11 @@
&mut self,
request: QueryRouteRequest,
) -> Result<QueryRouteResponse, ClientError> {
- let mut request = tonic::Request::new(request);
- self.sign(request.metadata_mut());
+ let request = self.sign(tonic::Request::new(request));
let response = self.stub.query_route(request).await.map_err(|e| {
ClientError::new(
ErrorKind::ClientInternal,
- "Query topic route rpc failed.",
+ "send rpc query_route failed",
Self::OPERATION_QUERY_ROUTE,
)
.set_source(e)
@@ -181,22 +295,89 @@
Ok(response.into_inner())
}
+ async fn heartbeat(
+ &mut self,
+ request: HeartbeatRequest,
+ ) -> Result<HeartbeatResponse, ClientError> {
+ let request = self.sign(tonic::Request::new(request));
+ let response = self.stub.heartbeat(request).await.map_err(|e| {
+ ClientError::new(
+ ErrorKind::ClientInternal,
+ "send rpc heartbeat failed",
+ Self::OPERATION_HEARTBEAT,
+ )
+ .set_source(e)
+ })?;
+ Ok(response.into_inner())
+ }
+
async fn send_message(
&mut self,
request: SendMessageRequest,
) -> Result<SendMessageResponse, ClientError> {
- let mut request = tonic::Request::new(request);
- self.sign(request.metadata_mut());
+ let request = self.sign(tonic::Request::new(request));
let response = self.stub.send_message(request).await.map_err(|e| {
ClientError::new(
ErrorKind::ClientInternal,
- "Send message rpc failed.",
+ "send rpc send_message failed",
Self::OPERATION_SEND_MESSAGE,
)
.set_source(e)
})?;
Ok(response.into_inner())
}
+
+ async fn receive_message(
+ &mut self,
+ request: ReceiveMessageRequest,
+ ) -> Result<Vec<ReceiveMessageResponse>, ClientError> {
+ let batch_size = request.batch_size;
+ let mut request = self.sign(tonic::Request::new(request));
+ request.set_timeout(*self.option.long_polling_timeout());
+ let mut stream = self
+ .stub
+ .receive_message(request)
+ .await
+ .map_err(|e| {
+ ClientError::new(
+ ErrorKind::ClientInternal,
+ "send rpc receive_message failed",
+ Self::OPERATION_RECEIVE_MESSAGE,
+ )
+ .set_source(e)
+ })?
+ .into_inner();
+
+ let mut responses = Vec::with_capacity(batch_size as usize);
+ while let Some(item) = stream.next().await {
+ let response = item.map_err(|e| {
+ ClientError::new(
+ ErrorKind::ClientInternal,
+ "receive message failed: error in reading stream",
+ Self::OPERATION_RECEIVE_MESSAGE,
+ )
+ .set_source(e)
+ })?;
+ responses.push(response);
+ }
+ Ok(responses)
+ }
+
+ async fn ack_message(
+ &mut self,
+ request: AckMessageRequest,
+ ) -> Result<AckMessageResponse, ClientError> {
+ let request = self.sign(tonic::Request::new(request));
+ let response = self.stub.ack_message(request).await.map_err(|e| {
+ ClientError::new(
+ ErrorKind::ClientInternal,
+ "send rpc ack_message failed",
+ Self::OPERATION_ACK_MESSAGE,
+ )
+ .set_source(e)
+ })?;
+ Ok(response.into_inner())
+ }
}
#[derive(Debug)]
@@ -209,7 +390,7 @@
impl SessionManager {
pub(crate) fn new(logger: &Logger, client_id: String, option: &ClientOption) -> Self {
- let logger = logger.new(o!("component" => "session_manager"));
+ let logger = logger.new(o!("component" => "session"));
let session_map = Mutex::new(HashMap::new());
SessionManager {
logger,
@@ -219,31 +400,48 @@
}
}
- pub(crate) async fn get_session(&self, endpoints: &Endpoints) -> Result<Session, ClientError> {
+ pub(crate) async fn get_or_create_session(
+ &self,
+ endpoints: &Endpoints,
+ settings: TelemetryCommand,
+ ) -> Result<Session, ClientError> {
let mut session_map = self.session_map.lock().await;
let endpoint_url = endpoints.endpoint_url().to_string();
return if session_map.contains_key(&endpoint_url) {
Ok(session_map.get(&endpoint_url).unwrap().clone())
} else {
- let session = Session::new(
+ let mut session = Session::new(
&self.logger,
endpoints,
self.client_id.clone(),
&self.option,
)
.await?;
+ session.start(settings).await?;
session_map.insert(endpoint_url.clone(), session.clone());
Ok(session)
};
}
+
+ pub(crate) async fn get_all_sessions(&self) -> Result<Vec<Session>, ClientError> {
+ let session_map = self.session_map.lock().await;
+ let mut sessions = Vec::new();
+ for (_, session) in session_map.iter() {
+ sessions.push(session.clone());
+ }
+ Ok(sessions)
+ }
}
#[cfg(test)]
mod tests {
- use crate::log::terminal_logger;
+ use crate::conf::ProducerOption;
use slog::debug;
use wiremock_grpc::generate;
+ use crate::log::terminal_logger;
+ use crate::util::build_producer_settings;
+
use super::*;
generate!("apache.rocketmq.v2", RocketMQMockServer);
@@ -260,6 +458,7 @@
)
.await;
debug!(logger, "session: {:?}", session);
+ assert!(session.is_ok());
}
#[tokio::test]
@@ -273,17 +472,61 @@
)
.await;
debug!(logger, "session: {:?}", session);
+ assert!(session.is_ok());
+ }
+
+ #[tokio::test(flavor = "multi_thread")]
+ async fn session_start() {
+ let mut server = RocketMQMockServer::start_default().await;
+ server.setup(
+ MockBuilder::when()
+ // 👇 RPC prefix
+ .path("/apache.rocketmq.v2.MessagingService/Telemetry")
+ .then()
+ .return_status(Code::Ok),
+ );
+
+ let logger = terminal_logger();
+ let session = Session::new(
+ &logger,
+ &Endpoints::from_url(&format!("localhost:{}", server.address().port())).unwrap(),
+ "test_client".to_string(),
+ &ClientOption::default(),
+ )
+ .await;
+ debug!(logger, "session: {:?}", session);
+ assert!(session.is_ok());
+
+ let mut session = session.unwrap();
+
+ let result = session
+ .start(build_producer_settings(
+ &ProducerOption::default(),
+ &ClientOption::default(),
+ ))
+ .await;
+ assert!(result.is_ok());
+ assert!(session.is_started());
}
#[tokio::test]
async fn session_manager_new() {
- let server = RocketMQMockServer::start_default().await;
+ let mut server = RocketMQMockServer::start_default().await;
+ server.setup(
+ MockBuilder::when()
+ // 👇 RPC prefix
+ .path("/apache.rocketmq.v2.MessagingService/Telemetry")
+ .then()
+ .return_status(Code::Ok),
+ );
+
let logger = terminal_logger();
let session_manager =
SessionManager::new(&logger, "test_client".to_string(), &ClientOption::default());
let session = session_manager
- .get_session(
+ .get_or_create_session(
&Endpoints::from_url(&format!("localhost:{}", server.address().port())).unwrap(),
+ build_producer_settings(&ProducerOption::default(), &ClientOption::default()),
)
.await
.unwrap();
diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs
new file mode 100644
index 0000000..e44f7cd
--- /dev/null
+++ b/rust/src/simple_consumer.rs
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::time::Duration;
+
+use slog::{info, Logger};
+
+use crate::client::Client;
+use crate::conf::{ClientOption, SimpleConsumerOption};
+use crate::error::{ClientError, ErrorKind};
+use crate::model::common::{ClientType, FilterExpression};
+use crate::model::message::{AckMessageEntry, MessageView};
+use crate::util::{
+ build_endpoints_by_message_queue, build_simple_consumer_settings, select_message_queue,
+};
+use crate::{log, pb};
+
+#[derive(Debug)]
+pub struct SimpleConsumer {
+ option: SimpleConsumerOption,
+ logger: Logger,
+ client: Client,
+}
+
+impl SimpleConsumer {
+ const OPERATION_START_SIMPLE_CONSUMER: &'static str = "simple_consumer.start";
+ const OPERATION_RECEIVE_MESSAGE: &'static str = "simple_consumer.receive_message";
+
+ pub fn new(
+ option: SimpleConsumerOption,
+ client_option: ClientOption,
+ ) -> Result<Self, ClientError> {
+ let client_option = ClientOption {
+ client_type: ClientType::SimpleConsumer,
+ group: option.consumer_group().to_string(),
+ namespace: option.namespace().to_string(),
+ ..client_option
+ };
+ let logger = log::logger(option.logging_format());
+ let settings = build_simple_consumer_settings(&option, &client_option);
+ let client = Client::new(&logger, client_option, settings)?;
+ Ok(SimpleConsumer {
+ option,
+ logger,
+ client,
+ })
+ }
+
+ pub async fn start(&self) -> Result<(), ClientError> {
+ if self.option.consumer_group().is_empty() {
+ return Err(ClientError::new(
+ ErrorKind::Config,
+ "required option is missing: consumer group is empty",
+ Self::OPERATION_START_SIMPLE_CONSUMER,
+ ));
+ }
+ if let Some(topics) = self.option.topics() {
+ for topic in topics {
+ self.client.topic_route(topic, true).await?;
+ }
+ }
+ self.client.start();
+ info!(
+ self.logger,
+ "start simple consumer success, client_id: {}",
+ self.client.client_id()
+ );
+ Ok(())
+ }
+
+ pub async fn receive(
+ &self,
+ topic: impl AsRef<str>,
+ expression: &FilterExpression,
+ ) -> Result<Vec<MessageView>, ClientError> {
+ self.receive_with_batch_size(topic.as_ref(), expression, 32, Duration::from_secs(15))
+ .await
+ }
+
+ pub async fn receive_with_batch_size(
+ &self,
+ topic: &str,
+ expression: &FilterExpression,
+ batch_size: i32,
+ invisible_duration: Duration,
+ ) -> Result<Vec<MessageView>, ClientError> {
+ let route = self.client.topic_route(topic, true).await?;
+ let message_queue = select_message_queue(route);
+ let endpoints =
+ build_endpoints_by_message_queue(&message_queue, Self::OPERATION_RECEIVE_MESSAGE)?;
+ let messages = self
+ .client
+ .receive_message(
+ &endpoints,
+ message_queue,
+ pb::FilterExpression {
+ r#type: expression.filter_type as i32,
+ expression: expression.expression.clone(),
+ },
+ batch_size,
+ prost_types::Duration::try_from(invisible_duration).unwrap(),
+ )
+ .await?;
+ Ok(messages
+ .into_iter()
+ .map(|message| MessageView::from_pb_message(message, endpoints.clone()))
+ .collect())
+ }
+
+ pub async fn ack(&self, ack_entry: impl AckMessageEntry) -> Result<(), ClientError> {
+ self.client.ack_message(ack_entry).await?;
+ Ok(())
+ }
+}
diff --git a/rust/src/util.rs b/rust/src/util.rs
new file mode 100644
index 0000000..94e4984
--- /dev/null
+++ b/rust/src/util.rs
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::hash::Hasher;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+
+use crate::conf::{ClientOption, ProducerOption, SimpleConsumerOption};
+use siphasher::sip::SipHasher24;
+
+use crate::error::{ClientError, ErrorKind};
+use crate::model::common::{Endpoints, Route};
+use crate::pb::settings::PubSub;
+use crate::pb::telemetry_command::Command;
+use crate::pb::{
+ Language, MessageQueue, Publishing, Resource, Settings, Subscription, TelemetryCommand, Ua,
+};
+
+pub(crate) static SDK_LANGUAGE: Language = Language::Rust;
+pub(crate) static SDK_VERSION: &str = "5.0.0";
+pub(crate) static PROTOCOL_VERSION: &str = "2.0.0";
+
+lazy_static::lazy_static! {
+ pub(crate) static ref HOST_NAME: String = match hostname::get() {
+ Ok(name) => name.to_str().unwrap_or("localhost").to_string(),
+ Err(_) => "localhost".to_string(),
+ };
+}
+
+pub(crate) fn select_message_queue(route: Arc<Route>) -> MessageQueue {
+ let i = route.index.fetch_add(1, Ordering::Relaxed);
+ route.queue[i % route.queue.len()].clone()
+}
+
+pub(crate) fn select_message_queue_by_message_group(
+ route: Arc<Route>,
+ message_group: String,
+) -> MessageQueue {
+ let mut sip_hasher24 = SipHasher24::default();
+ sip_hasher24.write(message_group.as_bytes());
+ let index = sip_hasher24.finish() % route.queue.len() as u64;
+ route.queue[index as usize].clone()
+}
+
+pub(crate) fn build_endpoints_by_message_queue(
+ message_queue: &MessageQueue,
+ operation: &'static str,
+) -> Result<Endpoints, ClientError> {
+ let topic = message_queue.topic.clone().unwrap().name;
+ if message_queue.broker.is_none() {
+ return Err(ClientError::new(
+ ErrorKind::NoBrokerAvailable,
+ "message queue do not have a available endpoint",
+ operation,
+ )
+ .with_context("message_queue", format!("{:?}", message_queue)));
+ }
+
+ let broker = message_queue.broker.clone().unwrap();
+ if broker.endpoints.is_none() {
+ return Err(ClientError::new(
+ ErrorKind::NoBrokerAvailable,
+ "message queue do not have a available endpoint",
+ operation,
+ )
+ .with_context("broker", broker.name)
+ .with_context("topic", topic)
+ .with_context("queue_id", message_queue.id.to_string()));
+ }
+
+ Ok(Endpoints::from_pb_endpoints(broker.endpoints.unwrap()))
+}
+
+pub(crate) fn build_producer_settings(
+ option: &ProducerOption,
+ client_options: &ClientOption,
+) -> TelemetryCommand {
+ let topics = option
+ .topics()
+ .clone()
+ .unwrap_or(vec![])
+ .iter()
+ .map(|topic| Resource {
+ name: topic.to_string(),
+ resource_namespace: option.namespace().to_string(),
+ })
+ .collect();
+ let platform = os_type::current_platform();
+ TelemetryCommand {
+ command: Some(Command::Settings(Settings {
+ client_type: Some(client_options.client_type.clone() as i32),
+ request_timeout: Some(prost_types::Duration {
+ seconds: client_options.timeout().as_secs() as i64,
+ nanos: client_options.timeout().subsec_nanos() as i32,
+ }),
+ pub_sub: Some(PubSub::Publishing(Publishing {
+ topics,
+ validate_message_type: option.validate_message_type(),
+ ..Publishing::default()
+ })),
+ user_agent: Some(Ua {
+ language: SDK_LANGUAGE as i32,
+ version: SDK_VERSION.to_string(),
+ platform: format!("{:?} {}", platform.os_type, platform.version),
+ hostname: HOST_NAME.clone(),
+ }),
+ ..Settings::default()
+ })),
+ ..TelemetryCommand::default()
+ }
+}
+
+pub(crate) fn build_simple_consumer_settings(
+ option: &SimpleConsumerOption,
+ client_option: &ClientOption,
+) -> TelemetryCommand {
+ let platform = os_type::current_platform();
+ TelemetryCommand {
+ command: Some(Command::Settings(Settings {
+ client_type: Some(client_option.client_type.clone() as i32),
+ request_timeout: Some(prost_types::Duration {
+ seconds: client_option.timeout().as_secs() as i64,
+ nanos: client_option.timeout().subsec_nanos() as i32,
+ }),
+ pub_sub: Some(PubSub::Subscription(Subscription {
+ group: Some(Resource {
+ name: option.consumer_group().to_string(),
+ resource_namespace: option.namespace().to_string(),
+ }),
+ subscriptions: vec![],
+ fifo: Some(false),
+ receive_batch_size: None,
+ long_polling_timeout: Some(prost_types::Duration {
+ seconds: client_option.long_polling_timeout().as_secs() as i64,
+ nanos: client_option.long_polling_timeout().subsec_nanos() as i32,
+ }),
+ })),
+ user_agent: Some(Ua {
+ language: SDK_LANGUAGE as i32,
+ version: SDK_VERSION.to_string(),
+ platform: format!("{:?} {}", platform.os_type, platform.version),
+ hostname: HOST_NAME.clone(),
+ }),
+ ..Settings::default()
+ })),
+ ..TelemetryCommand::default()
+ }
+}