feat(rust): enhance producer and add example (#471)
* feat(rust): enhance producer and add example
* chore(rust): fix license
* fix(rust): fix according to review comments
diff --git a/.gitignore b/.gitignore
index 1f25160..d0df1ca 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
# Intellij IDEA
.idea/
+.run/
out/
*.ipr
*.iml
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index d415495..c99ca5f 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -53,9 +53,12 @@
mockall = "0.11.4"
+siphasher = "0.3.10"
+
[build-dependencies]
tonic-build = "0.9.0"
[dev-dependencies]
wiremock-grpc = "0.0.3-alpha2"
futures = "0.3"
+awaitility = "0.3.0"
diff --git a/rust/examples/producer.rs b/rust/examples/producer.rs
new file mode 100644
index 0000000..e88a3bc
--- /dev/null
+++ b/rust/examples/producer.rs
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use rocketmq::{ClientOption, MessageImpl, Producer, ProducerOption};
+
+#[tokio::main]
+async fn main() {
+ // specify which topic(s) you would like to send message to
+ // producer will prefetch topic route when start and failed fast if topic not exist
+ let mut producer_option = ProducerOption::default();
+ producer_option.set_topics(vec!["test_topic".to_string()]);
+
+ // set which rocketmq proxy to connect
+ let mut client_option = ClientOption::default();
+ client_option.set_access_url("localhost:8081".to_string());
+
+ // build and start producer
+ let producer = Producer::new(producer_option, client_option).await.unwrap();
+ producer.start().await.unwrap();
+
+ // build message
+ let message = MessageImpl::builder()
+ .set_topic("test_topic".to_string())
+ .set_tags("test_tag".to_string())
+ .set_body("hello world".as_bytes().to_vec())
+ .build()
+ .unwrap();
+
+ // send message to rocketmq proxy
+ let result = producer.send_one(message).await;
+ debug_assert!(result.is_ok(), "send message failed: {:?}", result);
+ println!(
+ "send message success, message_id={}",
+ result.unwrap().message_id
+ );
+}
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 0c80d04..a60777e 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -14,15 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+use std::clone::Clone;
+use std::string::ToString;
use std::{collections::HashMap, sync::atomic::AtomicUsize, sync::Arc};
use parking_lot::Mutex;
-use slog::{debug, info, o, Logger};
+use slog::{debug, error, info, o, warn, Logger};
use tokio::sync::oneshot;
use crate::conf::ClientOption;
use crate::error::{ClientError, ErrorKind};
-use crate::model::{Endpoints, Route, RouteStatus};
+use crate::model::common::{Endpoints, Route, RouteStatus};
use crate::pb::{
Code, Message, QueryRouteRequest, Resource, SendMessageRequest, SendResultEntry, Status,
};
@@ -35,7 +37,7 @@
session_manager: SessionManager,
route_table: Mutex<HashMap<String /* topic */, RouteStatus>>,
id: String,
- endpoints: Endpoints,
+ access_endpoints: Endpoints,
}
lazy_static::lazy_static! {
@@ -49,7 +51,7 @@
pub(crate) fn new(logger: &Logger, option: ClientOption) -> Result<Self, ClientError> {
let id = Self::generate_client_id();
- let endpoints = Endpoints::from_access_url(option.access_url().to_string())
+ let endpoints = Endpoints::from_url(option.access_url())
.map_err(|e| e.with_operation(Self::OPERATION_CLIENT_NEW))?;
let session_manager = SessionManager::new(logger, id.clone(), &option);
Ok(Client {
@@ -58,7 +60,7 @@
session_manager,
route_table: Mutex::new(HashMap::new()),
id,
- endpoints,
+ access_endpoints: endpoints,
})
}
@@ -86,18 +88,27 @@
}
async fn get_session(&self) -> Result<Session, ClientError> {
- // TODO: support multiple endpoints
- self.session_manager.get_session(&self.endpoints).await
+ self.session_manager
+ .get_session(&self.access_endpoints)
+ .await
+ }
+
+ async fn get_session_with_endpoints(
+ &self,
+ endpoints: &Endpoints,
+ ) -> Result<Session, ClientError> {
+ self.session_manager.get_session(endpoints).await
}
fn handle_response_status(
+ &self,
status: Option<Status>,
operation: &'static str,
) -> Result<(), ClientError> {
if status.is_none() {
return Err(ClientError::new(
ErrorKind::Server,
- "Server do not return status, this may be a bug.".to_string(),
+ "Server do not return status, this may be a bug.",
operation,
));
}
@@ -105,175 +116,252 @@
let status = status.unwrap();
let status_code = Code::from_i32(status.code).unwrap();
if !status_code.eq(&Code::Ok) {
- return Err(ClientError::new(
- ErrorKind::Server,
- "Server return an error.".to_string(),
- operation,
- )
- .with_context("code", status_code.as_str_name())
- .with_context("message", status.message));
+ return Err(
+ ClientError::new(ErrorKind::Server, "Server return an error.", operation)
+ .with_context("code", status_code.as_str_name())
+ .with_context("message", status.message),
+ );
}
Ok(())
}
+ pub(crate) fn topic_route_from_cache(&self, topic: &str) -> Option<Arc<Route>> {
+ self.route_table.lock().get(topic).and_then(|route_status| {
+ if let RouteStatus::Found(route) = route_status {
+ debug!(self.logger, "get route for topic={} from cache", topic);
+ Some(Arc::clone(route))
+ } else {
+ None
+ }
+ })
+ }
+
pub(crate) async fn topic_route(
&self,
topic: &str,
lookup_cache: bool,
) -> Result<Arc<Route>, ClientError> {
- self.topic_route_inner(self.get_session().await.unwrap(), topic, lookup_cache)
+ if lookup_cache {
+ if let Some(route) = self.topic_route_from_cache(topic) {
+ return Ok(route);
+ }
+ }
+ self.topic_route_inner(self.get_session().await.unwrap(), topic)
.await
}
- pub(crate) async fn topic_route_inner(
+ async fn query_topic_route(
&self,
mut rpc_client: impl RPCClient,
topic: &str,
- lookup_cache: bool,
- ) -> Result<Arc<Route>, ClientError> {
- debug!(self.logger, "query route for topic={}", topic);
- // TODO extract function to get route from cache
- let rx = match self
- .route_table
- .lock()
- .entry(topic.to_owned())
- .or_insert_with(|| RouteStatus::Querying(Vec::new()))
- {
- RouteStatus::Found(route) => {
- if lookup_cache {
- return Ok(Arc::clone(route));
- }
- None
- }
- RouteStatus::Querying(ref mut v) => {
- if v.is_empty() {
- None
- } else {
- let (tx, rx) = oneshot::channel();
- v.push(tx);
- Some(rx)
- }
- }
- };
-
- if let Some(rx) = rx {
- return match rx.await {
- Ok(route) => route,
- Err(e) => Err(ClientError::new(
- ErrorKind::ChannelReceive,
- "Wait inflight query request failed.".to_string(),
- Self::OPERATION_QUERY_ROUTE,
- )
- .set_source(e)),
- };
- }
-
+ ) -> Result<Route, ClientError> {
let request = QueryRouteRequest {
topic: Some(Resource {
name: topic.to_owned(),
resource_namespace: self.option.name_space().to_string(),
}),
- endpoints: Some(self.endpoints.inner().clone()),
+ endpoints: Some(self.access_endpoints.inner().clone()),
};
let response = rpc_client.query_route(request).await?;
- Self::handle_response_status(response.status, Self::OPERATION_QUERY_ROUTE)?;
+ self.handle_response_status(response.status, Self::OPERATION_QUERY_ROUTE)?;
let route = Route {
+ index: AtomicUsize::new(0),
queue: response.message_queues,
};
- debug!(
- self.logger,
- "query route for topic={} success: route={:?}", topic, route
- );
- let route = Arc::new(route);
- let prev = self
- .route_table
- .lock()
- .insert(topic.to_owned(), RouteStatus::Found(Arc::clone(&route)));
- info!(self.logger, "update route for topic={}", topic);
-
- match prev {
- Some(RouteStatus::Found(_)) => {}
- Some(RouteStatus::Querying(mut v)) => {
- for item in v.drain(..) {
- let _ = item.send(Ok(Arc::clone(&route)));
- }
- }
- None => {}
- };
Ok(route)
}
+ async fn topic_route_inner(
+ &self,
+ rpc_client: impl RPCClient,
+ topic: &str,
+ ) -> Result<Arc<Route>, ClientError> {
+ debug!(self.logger, "query route for topic={}", topic);
+ let rx = match self
+ .route_table
+ .lock()
+ .entry(topic.to_owned())
+ .or_insert_with(|| RouteStatus::Querying(None))
+ {
+ RouteStatus::Found(_route) => None,
+ RouteStatus::Querying(ref mut option) => {
+ match option {
+ Some(vec) => {
+ // add self to waiting list
+ let (tx, rx) = oneshot::channel();
+ vec.push(tx);
+ Some(rx)
+ }
+ None => {
+ // there is no ongoing request, so we need to send a new request
+ let _ = option.insert(Vec::new());
+ None
+ }
+ }
+ }
+ };
+
+ // wait for inflight request
+ if let Some(rx) = rx {
+ return match rx.await {
+ Ok(route) => route,
+ Err(_e) => Err(ClientError::new(
+ ErrorKind::ChannelReceive,
+ "Wait for inflight query topic route request failed.",
+ Self::OPERATION_QUERY_ROUTE,
+ )),
+ };
+ }
+
+ let result = self.query_topic_route(rpc_client, topic).await;
+
+ // send result to all waiters
+ if result.is_ok() {
+ let route = result.unwrap();
+ debug!(
+ self.logger,
+ "query route for topic={} success: route={:?}", topic, route
+ );
+ let route = Arc::new(route);
+ let prev = self
+ .route_table
+ .lock()
+ .insert(topic.to_owned(), RouteStatus::Found(Arc::clone(&route)));
+ info!(self.logger, "update route for topic={}", topic);
+
+ if let Some(RouteStatus::Querying(Some(mut v))) = prev {
+ for item in v.drain(..) {
+ let _ = item.send(Ok(Arc::clone(&route)));
+ }
+ };
+ Ok(route)
+ } else {
+ let err = result.unwrap_err();
+ warn!(
+ self.logger,
+ "query route for topic={} failed: error={}", topic, err
+ );
+ let prev = self.route_table.lock().remove(topic);
+ if let Some(RouteStatus::Querying(Some(mut v))) = prev {
+ for item in v.drain(..) {
+ let _ = item.send(Err(ClientError::new(
+ ErrorKind::Server,
+ "Query route failed.",
+ Self::OPERATION_QUERY_ROUTE,
+ )));
+ }
+ };
+ Err(err)
+ }
+ }
+
pub(crate) async fn send_message(
&self,
- message: Message,
- ) -> Result<SendResultEntry, ClientError> {
- self.send_message_inner(self.get_session().await.unwrap(), message)
- .await
+ messages: Vec<Message>,
+ endpoints: &Endpoints,
+ ) -> Result<Vec<SendResultEntry>, ClientError> {
+ self.send_message_inner(
+ self.get_session_with_endpoints(endpoints).await.unwrap(),
+ messages,
+ )
+ .await
}
pub(crate) async fn send_message_inner(
&self,
mut rpc_client: impl RPCClient,
- message: Message,
- ) -> Result<SendResultEntry, ClientError> {
- if let Some(properties) = &message.system_properties {
- debug!(
- self.logger,
- "send for topic={:?} message_id={}", message.topic, properties.message_id
- );
- } else {
- return Err(ClientError::new(
- ErrorKind::ClientInternal,
- "Message do not have system properties.".to_string(),
- Self::OPERATION_SEND_MESSAGE,
- ));
- }
-
- let request = SendMessageRequest {
- messages: vec![message],
- };
+ messages: Vec<Message>,
+ ) -> Result<Vec<SendResultEntry>, ClientError> {
+ let message_count = messages.len();
+ let request = SendMessageRequest { messages };
let response = rpc_client.send_message(request).await?;
- Self::handle_response_status(response.status, Self::OPERATION_SEND_MESSAGE)?;
+ self.handle_response_status(response.status, Self::OPERATION_SEND_MESSAGE)?;
- let send_result = response.entries.get(0);
- match send_result {
- Some(send_result) => Ok(send_result.clone()),
- None => Err(ClientError::new(
- ErrorKind::Server,
- "Server do not return send result, this may be a bug.".to_string(),
- Self::OPERATION_SEND_MESSAGE,
- )),
+ if response.entries.len() != message_count {
+ error!(self.logger, "server do not return illegal send result, this may be a bug. except result count: {}, found: {}", response.entries.len(), message_count);
}
+
+ Ok(response.entries)
}
}
#[cfg(test)]
mod tests {
- use std::sync::atomic::Ordering;
+ use std::sync::atomic::{AtomicUsize, Ordering};
+ use std::sync::Arc;
+ use std::thread::sleep;
+ use std::time::Duration;
use crate::client::Client;
use crate::conf::ClientOption;
+ use crate::error::{ClientError, ErrorKind};
use crate::log::terminal_logger;
- use crate::pb::{Code, MessageQueue, QueryRouteResponse, Resource, Status};
+ use crate::model::common::Route;
+ use crate::pb::{
+ Code, MessageQueue, QueryRouteResponse, Resource, SendMessageResponse, Status,
+ };
use crate::session;
use super::CLIENT_ID_SEQUENCE;
#[test]
- fn test_client_id_sequence() {
+ fn client_id_sequence() {
let v1 = CLIENT_ID_SEQUENCE.fetch_add(1, Ordering::Relaxed);
let v2 = CLIENT_ID_SEQUENCE.fetch_add(1, Ordering::Relaxed);
assert!(v2 > v1, "Client ID sequence should be increasing");
}
- #[tokio::test]
- async fn client_query_route() {
- let logger = terminal_logger();
- let client = Client::new(&logger, ClientOption::default()).unwrap();
+ #[test]
+ fn handle_response_status() {
+ let client = Client::new(&terminal_logger(), ClientOption::default()).unwrap();
- let response = Ok(QueryRouteResponse {
+ let result = client.handle_response_status(None, "test");
+ assert!(result.is_err(), "should return error when status is None");
+ let result = result.unwrap_err();
+ assert_eq!(result.kind, ErrorKind::Server);
+ assert_eq!(
+ result.message,
+ "Server do not return status, this may be a bug."
+ );
+ assert_eq!(result.operation, "test");
+
+ let result = client.handle_response_status(
+ Some(Status {
+ code: Code::BadRequest as i32,
+ message: "test failed".to_string(),
+ }),
+ "test failed",
+ );
+ assert!(
+ result.is_err(),
+ "should return error when status is BadRequest"
+ );
+ let result = result.unwrap_err();
+ assert_eq!(result.kind, ErrorKind::Server);
+ assert_eq!(result.message, "Server return an error.");
+ assert_eq!(result.operation, "test failed");
+ assert_eq!(
+ result.context,
+ vec![
+ ("code", "BAD_REQUEST".to_string()),
+ ("message", "test failed".to_string())
+ ]
+ );
+
+ let result = client.handle_response_status(
+ Some(Status {
+ code: Code::Ok as i32,
+ message: "test success".to_string(),
+ }),
+ "test success",
+ );
+ assert!(result.is_ok(), "should not return error when status is Ok");
+ }
+
+ fn new_topic_route_response() -> Result<QueryRouteResponse, ClientError> {
+ Ok(QueryRouteResponse {
status: Some(Status {
code: Code::Ok as i32,
message: "Success".to_string(),
@@ -288,14 +376,34 @@
broker: None,
accept_message_types: vec![],
}],
- });
+ })
+ }
+
+ #[tokio::test]
+ async fn client_query_route_from_cache() {
+ let logger = terminal_logger();
+ let client = Client::new(&logger, ClientOption::default()).unwrap();
+ client.route_table.lock().insert(
+ "DefaultCluster".to_string(),
+ super::RouteStatus::Found(Arc::new(Route {
+ index: AtomicUsize::new(0),
+ queue: vec![],
+ })),
+ );
+ let option = client.topic_route_from_cache("DefaultCluster");
+ assert!(option.is_some());
+ }
+
+ #[tokio::test]
+ async fn client_query_route() {
+ let logger = terminal_logger();
+ let client = Client::new(&logger, ClientOption::default()).unwrap();
let mut mock = session::MockRPCClient::new();
mock.expect_query_route()
- .times(1)
- .return_once(|_| Box::pin(futures::future::ready(response)));
+ .return_once(|_| Box::pin(futures::future::ready(new_topic_route_response())));
- let result = client.topic_route_inner(mock, "DefaultCluster", true).await;
+ let result = client.topic_route_inner(mock, "DefaultCluster").await;
assert!(result.is_ok());
let route = result.unwrap();
@@ -308,4 +416,85 @@
assert_eq!(topic.name, "DefaultCluster");
assert_eq!(topic.resource_namespace, "default");
}
+
+ #[tokio::test(flavor = "multi_thread")]
+ async fn client_query_route_with_inflight_request() {
+ let logger = terminal_logger();
+ let client = Client::new(&logger, ClientOption::default()).unwrap();
+ let client = Arc::new(client);
+
+ let client_clone = client.clone();
+ tokio::spawn(async move {
+ let mut mock = session::MockRPCClient::new();
+ mock.expect_query_route().return_once(|_| {
+ sleep(Duration::from_millis(200));
+ Box::pin(futures::future::ready(new_topic_route_response()))
+ });
+
+ let result = client_clone.topic_route_inner(mock, "DefaultCluster").await;
+ assert!(result.is_ok());
+ });
+
+ let handle = tokio::spawn(async move {
+ sleep(Duration::from_millis(100));
+ let mock = session::MockRPCClient::new();
+ let result = client.topic_route_inner(mock, "DefaultCluster").await;
+ assert!(result.is_ok());
+ });
+
+ awaitility::at_most(Duration::from_secs(1)).until(|| handle.is_finished());
+ }
+
+ #[tokio::test(flavor = "multi_thread")]
+ async fn client_query_route_with_failed_request() {
+ let logger = terminal_logger();
+ let client = Client::new(&logger, ClientOption::default()).unwrap();
+ let client = Arc::new(client);
+
+ let client_clone = client.clone();
+ tokio::spawn(async move {
+ let mut mock = session::MockRPCClient::new();
+ mock.expect_query_route().return_once(|_| {
+ sleep(Duration::from_millis(200));
+ Box::pin(futures::future::ready(Err(ClientError::new(
+ ErrorKind::Server,
+ "Server error",
+ "test",
+ ))))
+ });
+
+ let result = client_clone.topic_route_inner(mock, "DefaultCluster").await;
+ assert!(result.is_err());
+ });
+
+ let handle = tokio::spawn(async move {
+ sleep(Duration::from_millis(100));
+ let mock = session::MockRPCClient::new();
+ let result = client.topic_route_inner(mock, "DefaultCluster").await;
+ assert!(result.is_err());
+ });
+
+ awaitility::at_most(Duration::from_secs(1)).until(|| handle.is_finished());
+ }
+
+ #[tokio::test]
+ async fn client_send_message() {
+ let response = Ok(SendMessageResponse {
+ status: Some(Status {
+ code: Code::Ok as i32,
+ message: "Success".to_string(),
+ }),
+ entries: vec![],
+ });
+ let mut mock = session::MockRPCClient::new();
+ mock.expect_send_message()
+ .return_once(|_| Box::pin(futures::future::ready(response)));
+
+ let client = Client::new(&terminal_logger(), ClientOption::default()).unwrap();
+ let send_result = client.send_message_inner(mock, vec![]).await;
+ assert!(send_result.is_ok());
+
+ let send_results = send_result.unwrap();
+ assert_eq!(send_results.len(), 0);
+ }
}
diff --git a/rust/src/conf.rs b/rust/src/conf.rs
index 233742a..334a1e8 100644
--- a/rust/src/conf.rs
+++ b/rust/src/conf.rs
@@ -65,6 +65,7 @@
logging_format: LoggingFormat,
prefetch_route: bool,
topics: Option<Vec<String>>,
+ namespace: String,
}
impl Default for ProducerOption {
@@ -73,6 +74,7 @@
logging_format: LoggingFormat::Terminal,
prefetch_route: true,
topics: None,
+ namespace: "".to_string(),
}
}
}
@@ -98,4 +100,11 @@
pub fn set_topics(&mut self, topics: Vec<String>) {
self.topics = Some(topics);
}
+
+ pub fn namespace(&self) -> &str {
+ &self.namespace
+ }
+ pub fn set_namespace(&mut self, name_space: String) {
+ self.namespace = name_space;
+ }
}
diff --git a/rust/src/error.rs b/rust/src/error.rs
index 9d3af04..6ae04e3 100644
--- a/rust/src/error.rs
+++ b/rust/src/error.rs
@@ -26,9 +26,15 @@
#[error("Failed to create session")]
Connect,
+ #[error("Message is invalid")]
+ InvalidMessage,
+
#[error("Server error")]
Server,
+ #[error("No broker available to send message")]
+ NoBrokerAvailable,
+
#[error("Client internal error")]
ClientInternal,
@@ -53,10 +59,10 @@
impl Error for ClientError {}
impl ClientError {
- pub fn new(kind: ErrorKind, message: String, operation: &'static str) -> Self {
+ pub fn new(kind: ErrorKind, message: &str, operation: &'static str) -> Self {
Self {
kind,
- message,
+ message: message.to_string(),
operation,
context: Vec::new(),
source: None,
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index 17b9bd0..4a0c165 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -22,16 +22,18 @@
mod log;
mod client;
-mod model;
#[allow(clippy::all)]
#[path = "pb/apache.rocketmq.v2.rs"]
mod pb;
mod session;
-pub(crate) mod producer;
+#[allow(dead_code)]
+mod model;
+mod producer;
// Export structs that are part of crate API.
+pub use conf::ClientOption;
+pub use conf::ProducerOption;
+pub use model::message::MessageImpl;
pub use producer::Producer;
-
-pub mod models;
diff --git a/rust/src/model.rs b/rust/src/model.rs
deleted file mode 100644
index e549b06..0000000
--- a/rust/src/model.rs
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-use std::net::IpAddr;
-use std::sync::Arc;
-
-use tokio::sync::oneshot;
-
-use crate::error::{ClientError, ErrorKind};
-use crate::pb;
-use crate::pb::{Address, AddressScheme, MessageQueue};
-
-#[derive(Debug)]
-pub struct Route {
- pub queue: Vec<MessageQueue>,
-}
-
-#[derive(Debug)]
-pub(crate) enum RouteStatus {
- Querying(Vec<oneshot::Sender<Result<Arc<Route>, ClientError>>>),
- Found(Arc<Route>),
-}
-
-#[derive(Debug)]
-pub(crate) struct Endpoints {
- access_url: String,
- scheme: AddressScheme,
- inner: pb::Endpoints,
-}
-
-impl Endpoints {
- const ENDPOINT_SEPARATOR: &'static str = ",";
- const ADDRESS_SEPARATOR: &'static str = ":";
-
- pub fn from_access_url(access_url: String) -> Result<Self, ClientError> {
- if access_url.is_empty() {
- return Err(ClientError::new(
- ErrorKind::Config,
- "Access url is empty.".to_string(),
- "access_url.parse",
- )
- .with_context("access_url", access_url));
- }
-
- let mut scheme = AddressScheme::DomainName;
- let mut urls = Vec::new();
- for url in access_url.split(Self::ENDPOINT_SEPARATOR) {
- if let Some((host, port)) = url.rsplit_once(Self::ADDRESS_SEPARATOR) {
- let port_i32 = port.parse::<i32>().map_err(|e| {
- ClientError::new(
- ErrorKind::Config,
- format!("Port {} in access url is invalid.", port),
- "access_url.parse",
- )
- .with_context("access_url", access_url.clone())
- .set_source(e)
- })?;
- urls.push((host.to_string(), port_i32));
- } else {
- return Err(ClientError::new(
- ErrorKind::Config,
- "Port in access url is missing.".to_string(),
- "access_url.parse",
- )
- .with_context("access_url", access_url));
- }
- }
-
- let mut addresses = Vec::new();
- let urls_len = urls.len();
- for (host, port) in urls.into_iter() {
- match host.parse::<IpAddr>() {
- Ok(ip_addr) => {
- match ip_addr {
- IpAddr::V4(_) => {
- if scheme == AddressScheme::IPv6 {
- return Err(ClientError::new(
- ErrorKind::Config,
- "Multiple addresses not in the same schema.".to_string(),
- "access_url.parse",
- )
- .with_context("access_url", access_url));
- }
- scheme = AddressScheme::IPv4
- }
- IpAddr::V6(_) => {
- if scheme == AddressScheme::IPv4 {
- return Err(ClientError::new(
- ErrorKind::Config,
- "Multiple addresses not in the same schema.".to_string(),
- "access_url.parse",
- )
- .with_context("access_url", access_url));
- }
- scheme = AddressScheme::IPv6
- }
- }
- addresses.push(Address { host, port });
- }
- Err(_) => {
- if urls_len > 1 {
- return Err(ClientError::new(
- ErrorKind::Config,
- "Multiple addresses not allowed in domain schema.".to_string(),
- "access_url.parse",
- )
- .with_context("access_url", access_url));
- }
- scheme = AddressScheme::DomainName;
- addresses.push(Address { host, port });
- }
- }
- }
-
- Ok(Endpoints {
- access_url,
- scheme,
- inner: pb::Endpoints {
- scheme: scheme as i32,
- addresses,
- },
- })
- }
-
- pub fn access_url(&self) -> &str {
- &self.access_url
- }
-
- pub fn scheme(&self) -> AddressScheme {
- self.scheme
- }
-
- pub fn inner(&self) -> &pb::Endpoints {
- &self.inner
- }
-
- pub fn into_inner(self) -> pb::Endpoints {
- self.inner
- }
-}
-
-#[cfg(test)]
-mod tests {
- use crate::error::ErrorKind;
- use crate::model::Endpoints;
- use crate::pb::AddressScheme;
-
- #[test]
- fn parse_domain_access_url() {
- let endpoints = Endpoints::from_access_url("localhost:8080".to_string()).unwrap();
- assert_eq!(endpoints.access_url(), "localhost:8080");
- assert_eq!(endpoints.scheme(), AddressScheme::DomainName);
- let inner = endpoints.into_inner();
- assert_eq!(inner.addresses.len(), 1);
- assert_eq!(inner.addresses[0].host, "localhost");
- assert_eq!(inner.addresses[0].port, 8080);
- }
-
- #[test]
- fn parse_ipv4_access_url() {
- let endpoints = Endpoints::from_access_url("127.0.0.1:8080".to_string()).unwrap();
- assert_eq!(endpoints.access_url(), "127.0.0.1:8080");
- assert_eq!(endpoints.scheme(), AddressScheme::IPv4);
- let inner = endpoints.into_inner();
- assert_eq!(inner.addresses.len(), 1);
- assert_eq!(inner.addresses[0].host, "127.0.0.1");
- assert_eq!(inner.addresses[0].port, 8080);
- }
-
- #[test]
- fn parse_ipv6_access_url() {
- let endpoints = Endpoints::from_access_url("::1:8080".to_string()).unwrap();
- assert_eq!(endpoints.access_url(), "::1:8080");
- assert_eq!(endpoints.scheme(), AddressScheme::IPv6);
- let inner = endpoints.into_inner();
- assert_eq!(inner.addresses.len(), 1);
- assert_eq!(inner.addresses[0].host, "::1");
- assert_eq!(inner.addresses[0].port, 8080);
- }
-
- #[test]
- fn parse_access_url_failed() {
- let err = Endpoints::from_access_url("".to_string()).err().unwrap();
- assert_eq!(err.kind, ErrorKind::Config);
- assert_eq!(err.operation, "access_url.parse");
- assert_eq!(err.message, "Access url is empty.");
-
- let err = Endpoints::from_access_url("localhost:<port>".to_string())
- .err()
- .unwrap();
- assert_eq!(err.kind, ErrorKind::Config);
- assert_eq!(err.operation, "access_url.parse");
- assert_eq!(err.message, "Port <port> in access url is invalid.");
-
- let err = Endpoints::from_access_url("localhost".to_string())
- .err()
- .unwrap();
- assert_eq!(err.kind, ErrorKind::Config);
- assert_eq!(err.operation, "access_url.parse");
- assert_eq!(err.message, "Port in access url is missing.");
-
- let err = Endpoints::from_access_url("127.0.0.1:8080,::1:8080".to_string())
- .err()
- .unwrap();
- assert_eq!(err.kind, ErrorKind::Config);
- assert_eq!(err.operation, "access_url.parse");
- assert_eq!(err.message, "Multiple addresses not in the same schema.");
-
- let err = Endpoints::from_access_url("::1:8080,127.0.0.1:8080".to_string())
- .err()
- .unwrap();
- assert_eq!(err.kind, ErrorKind::Config);
- assert_eq!(err.operation, "access_url.parse");
- assert_eq!(err.message, "Multiple addresses not in the same schema.");
-
- let err = Endpoints::from_access_url("localhost:8080,localhost:8081".to_string())
- .err()
- .unwrap();
- assert_eq!(err.kind, ErrorKind::Config);
- assert_eq!(err.operation, "access_url.parse");
- assert_eq!(
- err.message,
- "Multiple addresses not allowed in domain schema."
- );
- }
-}
diff --git a/rust/src/model/common.rs b/rust/src/model/common.rs
new file mode 100644
index 0000000..2082c28
--- /dev/null
+++ b/rust/src/model/common.rs
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::net::IpAddr;
+use std::sync::atomic::AtomicUsize;
+use std::sync::Arc;
+
+use tokio::sync::oneshot;
+
+use crate::error::{ClientError, ErrorKind};
+use crate::pb;
+use crate::pb::{Address, AddressScheme, MessageQueue};
+
+#[derive(Debug)]
+pub struct Route {
+ pub(crate) index: AtomicUsize,
+ pub queue: Vec<MessageQueue>,
+}
+
+#[derive(Debug)]
+pub(crate) enum RouteStatus {
+ Querying(Option<Vec<oneshot::Sender<Result<Arc<Route>, ClientError>>>>),
+ Found(Arc<Route>),
+}
+
+#[derive(Debug)]
+pub(crate) struct Endpoints {
+ endpoint_url: String,
+ scheme: AddressScheme,
+ inner: pb::Endpoints,
+}
+
+impl Endpoints {
+ const OPERATION_PARSE: &'static str = "endpoint.parse";
+
+ const ENDPOINT_SEPARATOR: &'static str = ",";
+ const ADDRESS_SEPARATOR: &'static str = ":";
+
+ pub fn from_url(endpoint_url: &str) -> Result<Self, ClientError> {
+ if endpoint_url.is_empty() {
+ return Err(ClientError::new(
+ ErrorKind::Config,
+ "Endpoint url is empty.",
+ Self::OPERATION_PARSE,
+ )
+ .with_context("url", endpoint_url));
+ }
+
+ let mut scheme = AddressScheme::DomainName;
+ let mut urls = Vec::new();
+ for url in endpoint_url.split(Self::ENDPOINT_SEPARATOR) {
+ if let Some((host, port)) = url.rsplit_once(Self::ADDRESS_SEPARATOR) {
+ let port_i32 = port.parse::<i32>().map_err(|e| {
+ ClientError::new(
+ ErrorKind::Config,
+ &format!("Port {} in endpoint url is invalid.", port),
+ Self::OPERATION_PARSE,
+ )
+ .with_context("url", endpoint_url)
+ .set_source(e)
+ })?;
+ urls.push((host.to_string(), port_i32));
+ } else {
+ return Err(ClientError::new(
+ ErrorKind::Config,
+ "Port in endpoint url is missing.",
+ Self::OPERATION_PARSE,
+ )
+ .with_context("url", endpoint_url));
+ }
+ }
+
+ let mut addresses = Vec::new();
+ let urls_len = urls.len();
+ for (host, port) in urls.into_iter() {
+ match host.parse::<IpAddr>() {
+ Ok(ip_addr) => {
+ match ip_addr {
+ IpAddr::V4(_) => {
+ if scheme == AddressScheme::IPv6 {
+ return Err(ClientError::new(
+ ErrorKind::Config,
+ "Multiple addresses not in the same schema.",
+ Self::OPERATION_PARSE,
+ )
+ .with_context("url", endpoint_url));
+ }
+ scheme = AddressScheme::IPv4
+ }
+ IpAddr::V6(_) => {
+ if scheme == AddressScheme::IPv4 {
+ return Err(ClientError::new(
+ ErrorKind::Config,
+ "Multiple addresses not in the same schema.",
+ Self::OPERATION_PARSE,
+ )
+ .with_context("url", endpoint_url));
+ }
+ scheme = AddressScheme::IPv6
+ }
+ }
+ addresses.push(Address { host, port });
+ }
+ Err(_) => {
+ if urls_len > 1 {
+ return Err(ClientError::new(
+ ErrorKind::Config,
+ "Multiple addresses not allowed in domain schema.",
+ Self::OPERATION_PARSE,
+ )
+ .with_context("url", endpoint_url));
+ }
+ scheme = AddressScheme::DomainName;
+ addresses.push(Address { host, port });
+ }
+ }
+ }
+
+ Ok(Endpoints {
+ endpoint_url: endpoint_url.to_string(),
+ scheme,
+ inner: pb::Endpoints {
+ scheme: scheme as i32,
+ addresses,
+ },
+ })
+ }
+
+ pub(crate) fn from_pb_endpoints(endpoints: pb::Endpoints) -> Self {
+ let mut addresses = Vec::new();
+ for address in endpoints.addresses.iter() {
+ addresses.push(format!("{}:{}", address.host, address.port));
+ }
+
+ Endpoints {
+ endpoint_url: addresses.join(Self::ENDPOINT_SEPARATOR),
+ scheme: endpoints.scheme(),
+ inner: endpoints,
+ }
+ }
+
+ pub fn endpoint_url(&self) -> &str {
+ &self.endpoint_url
+ }
+
+ pub fn scheme(&self) -> AddressScheme {
+ self.scheme
+ }
+
+ pub fn inner(&self) -> &pb::Endpoints {
+ &self.inner
+ }
+
+ pub fn into_inner(self) -> pb::Endpoints {
+ self.inner
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::error::ErrorKind;
+ use crate::model::common::Endpoints;
+ use crate::pb;
+ use crate::pb::{Address, AddressScheme};
+
+ #[test]
+ fn parse_domain_endpoint_url() {
+ let endpoints = Endpoints::from_url("localhost:8080").unwrap();
+ assert_eq!(endpoints.endpoint_url(), "localhost:8080");
+ assert_eq!(endpoints.scheme(), AddressScheme::DomainName);
+ let inner = endpoints.into_inner();
+ assert_eq!(inner.addresses.len(), 1);
+ assert_eq!(inner.addresses[0].host, "localhost");
+ assert_eq!(inner.addresses[0].port, 8080);
+ }
+
+ #[test]
+ fn parse_ipv4_endpoint_url() {
+ let endpoints = Endpoints::from_url("127.0.0.1:8080").unwrap();
+ assert_eq!(endpoints.endpoint_url(), "127.0.0.1:8080");
+ assert_eq!(endpoints.scheme(), AddressScheme::IPv4);
+ let inner = endpoints.into_inner();
+ assert_eq!(inner.addresses.len(), 1);
+ assert_eq!(inner.addresses[0].host, "127.0.0.1");
+ assert_eq!(inner.addresses[0].port, 8080);
+ }
+
+ #[test]
+ fn parse_ipv6_endpoint_url() {
+ let endpoints = Endpoints::from_url("::1:8080").unwrap();
+ assert_eq!(endpoints.endpoint_url(), "::1:8080");
+ assert_eq!(endpoints.scheme(), AddressScheme::IPv6);
+ let inner = endpoints.into_inner();
+ assert_eq!(inner.addresses.len(), 1);
+ assert_eq!(inner.addresses[0].host, "::1");
+ assert_eq!(inner.addresses[0].port, 8080);
+ }
+
+ #[test]
+ fn parse_endpoint_url_failed() {
+ let err = Endpoints::from_url("").err().unwrap();
+ assert_eq!(err.kind, ErrorKind::Config);
+ assert_eq!(err.operation, "endpoint.parse");
+ assert_eq!(err.message, "Endpoint url is empty.");
+
+ let err = Endpoints::from_url("localhost:<port>").err().unwrap();
+ assert_eq!(err.kind, ErrorKind::Config);
+ assert_eq!(err.operation, "endpoint.parse");
+ assert_eq!(err.message, "Port <port> in endpoint url is invalid.");
+
+ let err = Endpoints::from_url("localhost").err().unwrap();
+ assert_eq!(err.kind, ErrorKind::Config);
+ assert_eq!(err.operation, "endpoint.parse");
+ assert_eq!(err.message, "Port in endpoint url is missing.");
+
+ let err = Endpoints::from_url("127.0.0.1:8080,::1:8080")
+ .err()
+ .unwrap();
+ assert_eq!(err.kind, ErrorKind::Config);
+ assert_eq!(err.operation, "endpoint.parse");
+ assert_eq!(err.message, "Multiple addresses not in the same schema.");
+
+ let err = Endpoints::from_url("::1:8080,127.0.0.1:8080")
+ .err()
+ .unwrap();
+ assert_eq!(err.kind, ErrorKind::Config);
+ assert_eq!(err.operation, "endpoint.parse");
+ assert_eq!(err.message, "Multiple addresses not in the same schema.");
+
+ let err = Endpoints::from_url("localhost:8080,localhost:8081")
+ .err()
+ .unwrap();
+ assert_eq!(err.kind, ErrorKind::Config);
+ assert_eq!(err.operation, "endpoint.parse");
+ assert_eq!(
+ err.message,
+ "Multiple addresses not allowed in domain schema."
+ );
+ }
+
+ #[test]
+ fn parse_pb_endpoints() {
+ let pb_endpoints = pb::Endpoints {
+ scheme: AddressScheme::IPv4 as i32,
+ addresses: vec![
+ Address {
+ host: "localhost".to_string(),
+ port: 8080,
+ },
+ Address {
+ host: "127.0.0.1".to_string(),
+ port: 8081,
+ },
+ ],
+ };
+
+ let endpoints = Endpoints::from_pb_endpoints(pb_endpoints);
+ assert_eq!(endpoints.endpoint_url(), "localhost:8080,127.0.0.1:8081");
+ assert_eq!(endpoints.scheme(), AddressScheme::IPv4);
+ assert_eq!(endpoints.into_inner().addresses.len(), 2);
+ }
+}
diff --git a/rust/src/model/message.rs b/rust/src/model/message.rs
new file mode 100644
index 0000000..17584d0
--- /dev/null
+++ b/rust/src/model/message.rs
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use crate::error::{ClientError, ErrorKind};
+use crate::model::message_id::UNIQ_ID_GENERATOR;
+use std::collections::HashMap;
+
+pub trait Message {
+ fn take_message_id(&mut self) -> String;
+ fn take_topic(&mut self) -> String;
+ fn take_body(&mut self) -> Vec<u8>;
+ fn take_tag(&mut self) -> Option<String>;
+ fn take_keys(&mut self) -> Vec<String>;
+ fn take_properties(&mut self) -> HashMap<String, String>;
+ fn take_message_group(&mut self) -> Option<String>;
+ fn take_delivery_timestamp(&mut self) -> Option<i64>;
+}
+
+#[derive(Debug)]
+pub struct MessageImpl {
+ pub(crate) message_id: String,
+ pub(crate) topic: String,
+ pub(crate) body: Option<Vec<u8>>,
+ pub(crate) tags: Option<String>,
+ pub(crate) keys: Option<Vec<String>>,
+ pub(crate) properties: Option<HashMap<String, String>>,
+ pub(crate) message_group: Option<String>,
+ pub(crate) delivery_timestamp: Option<i64>,
+}
+
+impl Message for MessageImpl {
+ fn take_message_id(&mut self) -> String {
+ self.message_id.clone()
+ }
+
+ fn take_topic(&mut self) -> String {
+ self.topic.clone()
+ }
+
+ fn take_body(&mut self) -> Vec<u8> {
+ self.body.take().unwrap_or(vec![])
+ }
+
+ fn take_tag(&mut self) -> Option<String> {
+ self.tags.take()
+ }
+
+ fn take_keys(&mut self) -> Vec<String> {
+ self.keys.take().unwrap_or(vec![])
+ }
+
+ fn take_properties(&mut self) -> HashMap<String, String> {
+ self.properties.take().unwrap_or(HashMap::new())
+ }
+
+ fn take_message_group(&mut self) -> Option<String> {
+ self.message_group.take()
+ }
+
+ fn take_delivery_timestamp(&mut self) -> Option<i64> {
+ self.delivery_timestamp.take()
+ }
+}
+
+impl MessageImpl {
+ pub fn builder() -> MessageBuilder {
+ MessageBuilder {
+ message: MessageImpl {
+ message_id: UNIQ_ID_GENERATOR.lock().next_id(),
+ topic: "".to_string(),
+ body: None,
+ tags: None,
+ keys: None,
+ properties: None,
+ message_group: None,
+ delivery_timestamp: None,
+ },
+ }
+ }
+
+ pub fn fifo_message_builder(
+ topic: String,
+ body: Vec<u8>,
+ message_group: String,
+ ) -> MessageBuilder {
+ MessageBuilder {
+ message: MessageImpl {
+ message_id: UNIQ_ID_GENERATOR.lock().next_id(),
+ topic,
+ body: Some(body),
+ tags: None,
+ keys: None,
+ properties: None,
+ message_group: Some(message_group),
+ delivery_timestamp: None,
+ },
+ }
+ }
+
+ pub fn delay_message_builder(topic: String, body: Vec<u8>, delay_time: i64) -> MessageBuilder {
+ MessageBuilder {
+ message: MessageImpl {
+ message_id: UNIQ_ID_GENERATOR.lock().next_id(),
+ topic,
+ body: Some(body),
+ tags: None,
+ keys: None,
+ properties: None,
+ message_group: None,
+ delivery_timestamp: Some(delay_time),
+ },
+ }
+ }
+}
+
+pub struct MessageBuilder {
+ message: MessageImpl,
+}
+
+impl MessageBuilder {
+ const OPERATION_BUILD_MESSAGE: &'static str = "build_message";
+
+ pub fn set_topic(mut self, topic: String) -> Self {
+ self.message.topic = topic;
+ self
+ }
+
+ pub fn set_body(mut self, body: Vec<u8>) -> Self {
+ self.message.body = Some(body);
+ self
+ }
+
+ pub fn set_tags(mut self, tags: String) -> Self {
+ self.message.tags = Some(tags);
+ self
+ }
+
+ pub fn set_keys(mut self, keys: Vec<String>) -> Self {
+ self.message.keys = Some(keys);
+ self
+ }
+
+ pub fn set_properties(mut self, properties: HashMap<String, String>) -> Self {
+ self.message.properties = Some(properties);
+ self
+ }
+
+ pub fn set_message_group(mut self, message_group: String) -> Self {
+ self.message.message_group = Some(message_group);
+ self
+ }
+
+ pub fn set_delivery_timestamp(mut self, delivery_timestamp: i64) -> Self {
+ self.message.delivery_timestamp = Some(delivery_timestamp);
+ self
+ }
+
+ fn check_message(&self) -> Result<(), String> {
+ if self.message.topic.is_empty() {
+ return Err("Topic is empty.".to_string());
+ }
+ if self.message.body.is_none() {
+ return Err("Body is empty.".to_string());
+ }
+ if self.message.message_group.is_some() && self.message.delivery_timestamp.is_some() {
+ return Err(
+ "message_group and delivery_timestamp can not be set at the same time.".to_string(),
+ );
+ }
+ Ok(())
+ }
+
+ pub fn build(self) -> Result<MessageImpl, ClientError> {
+ self.check_message().map_err(|e| {
+ ClientError::new(ErrorKind::InvalidMessage, &e, Self::OPERATION_BUILD_MESSAGE)
+ })?;
+ Ok(self.message)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_message() {
+ let mut properties = HashMap::new();
+ properties.insert("key".to_string(), "value".to_string());
+ let message = MessageImpl::builder()
+ .set_topic("test".to_string())
+ .set_body(vec![1, 2, 3])
+ .set_tags("tag".to_string())
+ .set_keys(vec!["key".to_string()])
+ .set_properties(properties)
+ .build();
+ assert!(message.is_ok());
+
+ let mut message = message.unwrap();
+ assert_eq!(message.take_topic(), "test");
+ assert_eq!(message.take_body(), vec![1, 2, 3]);
+ assert_eq!(message.take_tag(), Some("tag".to_string()));
+ assert_eq!(message.take_keys(), vec!["key".to_string()]);
+ assert_eq!(message.take_properties(), {
+ let mut properties = HashMap::new();
+ properties.insert("key".to_string(), "value".to_string());
+ properties
+ });
+
+ let message = MessageImpl::builder()
+ .set_topic("test".to_string())
+ .set_body(vec![1, 2, 3])
+ .set_message_group("message_group".to_string())
+ .set_delivery_timestamp(123456789)
+ .build();
+ assert!(message.is_err());
+ let err = message.unwrap_err();
+ assert_eq!(err.kind, ErrorKind::InvalidMessage);
+ assert_eq!(
+ err.message,
+ "message_group and delivery_timestamp can not be set at the same time."
+ );
+
+ let message = MessageImpl::builder()
+ .set_topic("test".to_string())
+ .set_body(vec![1, 2, 3])
+ .set_message_group("message_group".to_string())
+ .build();
+ let mut message = message.unwrap();
+ assert_eq!(
+ message.take_message_group(),
+ Some("message_group".to_string())
+ );
+
+ let message = MessageImpl::builder()
+ .set_topic("test".to_string())
+ .set_body(vec![1, 2, 3])
+ .set_delivery_timestamp(123456789)
+ .build();
+ let mut message = message.unwrap();
+ assert_eq!(message.take_delivery_timestamp(), Some(123456789));
+ }
+}
diff --git a/rust/src/models/message_id.rs b/rust/src/model/message_id.rs
similarity index 95%
rename from rust/src/models/message_id.rs
rename to rust/src/model/message_id.rs
index 92d4e44..1357a60 100644
--- a/rust/src/models/message_id.rs
+++ b/rust/src/model/message_id.rs
@@ -14,12 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-use byteorder::{BigEndian, WriteBytesExt};
-use once_cell::sync::Lazy;
-use parking_lot::Mutex;
use std::io::Write;
use std::process;
use std::time::SystemTime;
+
+use byteorder::{BigEndian, WriteBytesExt};
+use once_cell::sync::Lazy;
+use parking_lot::Mutex;
use time::{Date, OffsetDateTime, PrimitiveDateTime, Time};
/**
@@ -62,7 +63,7 @@
pub(crate) static UNIQ_ID_GENERATOR: Lazy<Mutex<UniqueIdGenerator>> = Lazy::new(|| {
let mut wtr = Vec::new();
wtr.write_u8(1).unwrap();
- //mac
+ // mac
let x = mac_address::get_mac_address().unwrap();
let ma = match x {
Some(ma) => ma,
@@ -71,9 +72,8 @@
}
};
wtr.write_all(&ma.bytes()).unwrap();
- //processid
- wtr.write_u16::<byteorder::BigEndian>(process::id() as u16)
- .unwrap();
+ // process id
+ wtr.write_u16::<BigEndian>(process::id() as u16).unwrap();
let generator = UniqueIdGenerator {
counter: 0,
start_timestamp: 0,
@@ -91,7 +91,7 @@
}
impl UniqueIdGenerator {
- pub fn generate(&mut self) -> String {
+ pub fn next_id(&mut self) -> String {
if SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
@@ -133,7 +133,7 @@
fn text_generate_uniq_id() {
use super::UNIQ_ID_GENERATOR;
for i in 0..10 {
- let uid = UNIQ_ID_GENERATOR.lock().generate();
+ let uid = UNIQ_ID_GENERATOR.lock().next_id();
println!("i: {}, uid: {}", i, uid);
}
}
diff --git a/rust/src/models/mod.rs b/rust/src/model/mod.rs
similarity index 96%
rename from rust/src/models/mod.rs
rename to rust/src/model/mod.rs
index 8496998..1a873d6 100644
--- a/rust/src/models/mod.rs
+++ b/rust/src/model/mod.rs
@@ -14,6 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+pub(crate) mod common;
pub(crate) mod message;
pub(crate) mod message_id;
-pub(crate) mod message_view;
diff --git a/rust/src/models/message.rs b/rust/src/models/message.rs
deleted file mode 100644
index f8366aa..0000000
--- a/rust/src/models/message.rs
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-use std::{
- collections::HashMap,
- io::Write,
- mem, process,
- sync::Arc,
- sync::{atomic::AtomicUsize, Weak},
-};
-pub(crate) struct MessageImpl {
- pub(crate) keys: Vec<String>,
- pub(crate) body: Vec<u8>,
- pub(crate) topic: String,
- pub(crate) tags: String,
- pub(crate) message_group: String,
- pub(crate) delivery_timestamp: i64,
- pub(crate) properties: HashMap<String, String>,
-}
-
-impl MessageImpl {
- pub fn new(topic: &str, tags: &str, keys: Vec<String>, body: &str) -> Self {
- MessageImpl {
- keys: keys,
- body: body.as_bytes().to_vec(),
- topic: topic.to_string(),
- tags: tags.to_string(),
- message_group: "".to_string(),
- delivery_timestamp: 0,
- properties: HashMap::new(),
- }
- }
-}
diff --git a/rust/src/models/message_view.rs b/rust/src/models/message_view.rs
deleted file mode 100644
index f0849b0..0000000
--- a/rust/src/models/message_view.rs
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#[derive(Debug)]
-pub(crate) struct MessageView {
- pub(crate) body: Vec<u8>,
- pub(crate) message_id: String,
- pub(crate) topic: String,
- pub(crate) consume_group: String,
- pub(crate) endpoint: String,
- pub(crate) receipt_handle: String,
-}
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index 891c9c4..e288a5c 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -20,12 +20,22 @@
//! `Producer` is a thin wrapper of internal `Client` struct that shoulders the actual workloads.
//! Most of its methods take shared reference so that application developers may use it at will.
+use std::hash::Hasher;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use prost_types::Timestamp;
+use siphasher::sip::SipHasher24;
+use slog::{info, Logger};
+
use crate::client::Client;
use crate::conf::{ClientOption, ProducerOption};
-use crate::error::ClientError;
-use crate::log;
-use crate::pb::{Message, SendResultEntry};
-use slog::{info, Logger};
+use crate::error::{ClientError, ErrorKind};
+use crate::model::common::{Endpoints, Route};
+use crate::model::message;
+use crate::pb::{Encoding, MessageQueue, Resource, SendResultEntry, SystemProperties};
+use crate::{log, pb};
/// `Producer` is the core struct, to which application developers should turn, when publishing messages to brokers.
///
@@ -37,7 +47,16 @@
client: Client,
}
+lazy_static::lazy_static! {
+ static ref HOST_NAME: String = match hostname::get() {
+ Ok(name) => name.to_str().unwrap_or("localhost").to_string(),
+ Err(_) => "localhost".to_string(),
+ };
+}
+
impl Producer {
+ const OPERATION_SEND_MESSAGE: &'static str = "producer.send_message";
+
pub async fn new(
option: ProducerOption,
client_option: ClientOption,
@@ -65,49 +84,283 @@
Ok(())
}
- pub async fn send(&self, message: Message) -> Result<SendResultEntry, ClientError> {
- self.client.send_message(message).await
+ fn transform_messages_to_protobuf(
+ &self,
+ messages: Vec<impl message::Message>,
+ ) -> Result<(String, Option<String>, Vec<pb::Message>), ClientError> {
+ if messages.is_empty() {
+ return Err(ClientError::new(
+ ErrorKind::InvalidMessage,
+ "No message found.",
+ Self::OPERATION_SEND_MESSAGE,
+ ));
+ }
+
+ let born_timestamp = match SystemTime::now().duration_since(UNIX_EPOCH) {
+ Ok(duration) => Some(Timestamp {
+ seconds: duration.as_secs() as i64,
+ nanos: 0,
+ }),
+ Err(_) => None,
+ };
+
+ let mut pb_messages = Vec::with_capacity(messages.len());
+ let mut last_topic: Option<String> = None;
+ let mut last_message_group: Option<Option<String>> = None;
+
+ for mut message in messages {
+ if let Some(last_topic) = last_topic.clone() {
+ if last_topic.ne(&message.take_topic()) {
+ return Err(ClientError::new(
+ ErrorKind::InvalidMessage,
+ "Not all messages have the same topic.",
+ Self::OPERATION_SEND_MESSAGE,
+ ));
+ }
+ } else {
+ last_topic = Some(message.take_topic());
+ }
+
+ let message_group = message.take_message_group();
+ if let Some(last_message_group) = last_message_group.clone() {
+ if last_message_group.ne(&message_group) {
+ return Err(ClientError::new(
+ ErrorKind::InvalidMessage,
+ "Not all messages have the same message group.",
+ Self::OPERATION_SEND_MESSAGE,
+ ));
+ }
+ } else {
+ last_message_group = Some(message_group.clone());
+ }
+
+ let delivery_timestamp = message
+ .take_delivery_timestamp()
+ .map(|seconds| Timestamp { seconds, nanos: 0 });
+
+ let pb_message = pb::Message {
+ topic: Some(Resource {
+ name: message.take_topic(),
+ resource_namespace: self.option.namespace().to_string(),
+ }),
+ user_properties: message.take_properties(),
+ system_properties: Some(SystemProperties {
+ tag: message.take_tag(),
+ keys: message.take_keys(),
+ message_id: message.take_message_id(),
+ message_group,
+ delivery_timestamp,
+ born_host: HOST_NAME.clone(),
+ born_timestamp: born_timestamp.clone(),
+ body_digest: None,
+ body_encoding: Encoding::Identity as i32,
+ ..SystemProperties::default()
+ }),
+ body: message.take_body(),
+ };
+ pb_messages.push(pb_message);
+ }
+
+ let topic = last_topic.unwrap();
+ if topic.is_empty() {
+ return Err(ClientError::new(
+ ErrorKind::InvalidMessage,
+ "Message topic is empty.",
+ Self::OPERATION_SEND_MESSAGE,
+ ));
+ }
+
+ Ok((topic, last_message_group.unwrap(), pb_messages))
+ }
+
+ pub async fn send_one(
+ &self,
+ message: impl message::Message,
+ ) -> Result<SendResultEntry, ClientError> {
+ let results = self.send(vec![message]).await?;
+ Ok(results[0].clone())
+ }
+
+ pub async fn send(
+ &self,
+ messages: Vec<impl message::Message>,
+ ) -> Result<Vec<SendResultEntry>, ClientError> {
+ let (topic, message_group, mut pb_messages) =
+ self.transform_messages_to_protobuf(messages)?;
+
+ let route = self.client.topic_route(&topic, true).await?;
+
+ let message_queue = if let Some(message_group) = message_group {
+ self.select_message_queue_by_message_group(route, message_group)
+ } else {
+ self.select_message_queue(route)
+ };
+
+ if message_queue.broker.is_none() {
+ return Err(ClientError::new(
+ ErrorKind::NoBrokerAvailable,
+ "Message queue do not have a available endpoint.",
+ Self::OPERATION_SEND_MESSAGE,
+ )
+ .with_context("message_queue", format!("{:?}", message_queue)));
+ }
+
+ let broker = message_queue.broker.unwrap();
+ if broker.endpoints.is_none() {
+ return Err(ClientError::new(
+ ErrorKind::NoBrokerAvailable,
+ "Message queue do not have a available endpoint.",
+ Self::OPERATION_SEND_MESSAGE,
+ )
+ .with_context("broker", broker.name)
+ .with_context("topic", topic)
+ .with_context("queue_id", message_queue.id.to_string()));
+ }
+
+ let endpoints = Endpoints::from_pb_endpoints(broker.endpoints.unwrap());
+ for message in pb_messages.iter_mut() {
+ message.system_properties.as_mut().unwrap().queue_id = message_queue.id;
+ }
+
+ self.client.send_message(pb_messages, &endpoints).await
+ }
+
+ fn select_message_queue(&self, route: Arc<Route>) -> MessageQueue {
+ let i = route.index.fetch_add(1, Ordering::Relaxed);
+ route.queue[i % route.queue.len()].clone()
+ }
+
+ fn select_message_queue_by_message_group(
+ &self,
+ route: Arc<Route>,
+ message_group: String,
+ ) -> MessageQueue {
+ let mut sip_hasher24 = SipHasher24::default();
+ sip_hasher24.write(message_group.as_bytes());
+ let index = sip_hasher24.finish() % route.queue.len() as u64;
+ route.queue[index as usize].clone()
}
}
-// #[cfg(test)]
-// mod tests {
-// use crate::conf::{ClientOption, ProducerOption};
-// use crate::pb::{Message, Resource, SystemProperties};
-// use crate::producer::Producer;
-//
-// #[tokio::test]
-// async fn test_producer_start() {
-// let mut producer_option = ProducerOption::default();
-// producer_option.set_topics(vec!["DefaultCluster".to_string()]);
-// let producer = Producer::new(producer_option, ClientOption::default())
-// .await
-// .unwrap();
-// producer.start().await.unwrap();
-// }
-//
-// #[tokio::test]
-// async fn test_producer_send() {
-// let mut producer_option = ProducerOption::default();
-// producer_option.set_topics(vec!["DefaultCluster".to_string()]);
-// let producer = Producer::new(producer_option, ClientOption::default())
-// .await
-// .unwrap();
-// producer.start().await.unwrap();
-// let send_result = producer
-// .send(Message {
-// topic: Some(Resource {
-// resource_namespace: "".to_string(),
-// name: "DefaultCluster".to_string(),
-// }),
-// user_properties: Default::default(),
-// system_properties: Some(SystemProperties {
-// message_id: "message_test_id".to_string(),
-// ..SystemProperties::default()
-// }),
-// body: "Hello world".to_string().into_bytes(),
-// })
-// .await;
-// println!("{:?}", send_result);
-// }
-// }
+#[cfg(test)]
+mod tests {
+ use crate::conf::{ClientOption, ProducerOption};
+ use crate::error::ErrorKind;
+ use crate::model::message::MessageImpl;
+ use crate::producer::Producer;
+
+ // #[tokio::test]
+ // async fn producer_start() {
+ // let mut producer_option = ProducerOption::default();
+ // producer_option.set_topics(vec!["DefaultCluster".to_string()]);
+ // let producer = Producer::new(producer_option, ClientOption::default())
+ // .await
+ // .unwrap();
+ // producer.start().await.unwrap();
+ // }
+
+ #[tokio::test]
+ async fn producer_transform_messages_to_protobuf() {
+ let producer = Producer::new(ProducerOption::default(), ClientOption::default())
+ .await
+ .unwrap();
+ let messages = vec![MessageImpl::builder()
+ .set_topic("DefaultCluster".to_string())
+ .set_body("hello world".as_bytes().to_vec())
+ .set_tags("tag".to_string())
+ .set_keys(vec!["key".to_string()])
+ .set_properties(
+ vec![("key".to_string(), "value".to_string())]
+ .into_iter()
+ .collect(),
+ )
+ .set_message_group("message_group".to_string())
+ .build()
+ .unwrap()];
+ let result = producer.transform_messages_to_protobuf(messages);
+ assert!(result.is_ok());
+
+ let (topic, message_group, pb_messages) = result.unwrap();
+ assert_eq!(topic, "DefaultCluster");
+ assert_eq!(message_group, Some("message_group".to_string()));
+
+ // check message
+ assert_eq!(pb_messages.len(), 1);
+ let message = pb_messages.get(0).unwrap().clone();
+ assert_eq!(message.topic.unwrap().name, "DefaultCluster");
+ let system_properties = message.system_properties.unwrap();
+ assert_eq!(system_properties.tag.unwrap(), "tag");
+ assert_eq!(system_properties.keys, vec!["key".to_string()]);
+ assert_eq!(message.user_properties.get("key").unwrap(), "value");
+ assert_eq!(std::str::from_utf8(&message.body).unwrap(), "hello world");
+ assert!(!system_properties.message_id.is_empty());
+ }
+
+ #[tokio::test]
+ async fn producer_transform_messages_to_protobuf_failed() {
+ let producer = Producer::new(ProducerOption::default(), ClientOption::default())
+ .await
+ .unwrap();
+
+ let messages: Vec<MessageImpl> = vec![];
+ let result = producer.transform_messages_to_protobuf(messages);
+ assert!(result.is_err());
+ let err = result.unwrap_err();
+ assert_eq!(err.kind, ErrorKind::InvalidMessage);
+ assert_eq!(err.message, "No message found.");
+
+ let messages = vec![MessageImpl {
+ message_id: "".to_string(),
+ topic: "".to_string(),
+ body: None,
+ tags: None,
+ keys: None,
+ properties: None,
+ message_group: None,
+ delivery_timestamp: None,
+ }];
+ let result = producer.transform_messages_to_protobuf(messages);
+ assert!(result.is_err());
+ let err = result.unwrap_err();
+ assert_eq!(err.kind, ErrorKind::InvalidMessage);
+ assert_eq!(err.message, "Message topic is empty.");
+
+ let messages = vec![
+ MessageImpl::builder()
+ .set_topic("DefaultCluster".to_string())
+ .set_body("hello world".as_bytes().to_vec())
+ .build()
+ .unwrap(),
+ MessageImpl::builder()
+ .set_topic("DefaultCluster_dup".to_string())
+ .set_body("hello world".as_bytes().to_vec())
+ .build()
+ .unwrap(),
+ ];
+ let result = producer.transform_messages_to_protobuf(messages);
+ assert!(result.is_err());
+ let err = result.unwrap_err();
+ assert_eq!(err.kind, ErrorKind::InvalidMessage);
+ assert_eq!(err.message, "Not all messages have the same topic.");
+
+ let messages = vec![
+ MessageImpl::builder()
+ .set_topic("DefaultCluster".to_string())
+ .set_body("hello world".as_bytes().to_vec())
+ .set_message_group("message_group".to_string())
+ .build()
+ .unwrap(),
+ MessageImpl::builder()
+ .set_topic("DefaultCluster".to_string())
+ .set_body("hello world".as_bytes().to_vec())
+ .set_message_group("message_group_dup".to_string())
+ .build()
+ .unwrap(),
+ ];
+ let result = producer.transform_messages_to_protobuf(messages);
+ assert!(result.is_err());
+ let err = result.unwrap_err();
+ assert_eq!(err.kind, ErrorKind::InvalidMessage);
+ assert_eq!(err.message, "Not all messages have the same message group.");
+ }
+}
diff --git a/rust/src/session.rs b/rust/src/session.rs
index 182a235..b84469e 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -18,13 +18,14 @@
use async_trait::async_trait;
use mockall::automock;
-use slog::{debug, info, o, Logger};
+use slog::{info, o, Logger};
use tokio::sync::Mutex;
+use tonic::metadata::AsciiMetadataValue;
use tonic::transport::{Channel, Endpoint};
use crate::conf::ClientOption;
use crate::error::ErrorKind;
-use crate::model::Endpoints;
+use crate::model::common::Endpoints;
use crate::pb::{QueryRouteRequest, QueryRouteResponse, SendMessageRequest, SendMessageResponse};
use crate::{error::ClientError, pb::messaging_service_client::MessagingServiceClient};
@@ -64,8 +65,7 @@
client_id: String,
option: &ClientOption,
) -> Result<Self, ClientError> {
- let peer = endpoints.access_url().to_owned();
- debug!(logger, "creating session, peer={}", peer);
+ let peer = endpoints.endpoint_url().to_owned();
let mut channel_endpoints = Vec::new();
for endpoint in endpoints.inner().addresses.clone() {
@@ -75,7 +75,7 @@
if channel_endpoints.is_empty() {
return Err(ClientError::new(
ErrorKind::Connect,
- "No endpoint available.".to_string(),
+ "No endpoint available.",
Self::OPERATION_CREATE,
)
.with_context("peer", peer.clone()));
@@ -85,7 +85,7 @@
channel_endpoints[0].connect().await.map_err(|e| {
ClientError::new(
ErrorKind::Connect,
- "Failed to connect to peer.".to_string(),
+ "Failed to connect to peer.",
Self::OPERATION_CREATE,
)
.set_source(e)
@@ -100,7 +100,7 @@
info!(
logger,
"create session success, peer={}",
- endpoints.access_url()
+ endpoints.endpoint_url()
);
Ok(Session {
@@ -124,7 +124,7 @@
.map_err(|e| {
ClientError::new(
ErrorKind::Connect,
- "Failed to create channel endpoint.".to_string(),
+ "Failed to create channel endpoint.",
Self::OPERATION_CREATE,
)
.set_source(e)
@@ -147,22 +147,17 @@
}
fn sign(&self, metadata: &mut tonic::metadata::MetadataMap) {
- // let _ = tonic::metadata::AsciiMetadataValue::try_from(&self.id).and_then(|v| {
- // metadata.insert("x-mq-client-id", v);
- // Ok(())
- // });
+ let _ = AsciiMetadataValue::try_from(&self.client_id)
+ .map(|v| metadata.insert("x-mq-client-id", v));
- metadata.insert(
- "x-mq-language",
- tonic::metadata::AsciiMetadataValue::from_static("RUST"),
- );
+ metadata.insert("x-mq-language", AsciiMetadataValue::from_static("RUST"));
metadata.insert(
"x-mq-client-version",
- tonic::metadata::AsciiMetadataValue::from_static("5.0.0"),
+ AsciiMetadataValue::from_static("5.0.0"),
);
metadata.insert(
"x-mq-protocol-version",
- tonic::metadata::AsciiMetadataValue::from_static("2.0.0"),
+ AsciiMetadataValue::from_static("2.0.0"),
);
}
}
@@ -178,7 +173,7 @@
let response = self.stub.query_route(request).await.map_err(|e| {
ClientError::new(
ErrorKind::ClientInternal,
- "Query topic route rpc failed.".to_string(),
+ "Query topic route rpc failed.",
Self::OPERATION_QUERY_ROUTE,
)
.set_source(e)
@@ -190,10 +185,12 @@
&mut self,
request: SendMessageRequest,
) -> Result<SendMessageResponse, ClientError> {
+ let mut request = tonic::Request::new(request);
+ self.sign(request.metadata_mut());
let response = self.stub.send_message(request).await.map_err(|e| {
ClientError::new(
ErrorKind::ClientInternal,
- "Send message rpc failed.".to_string(),
+ "Send message rpc failed.",
Self::OPERATION_SEND_MESSAGE,
)
.set_source(e)
@@ -224,9 +221,9 @@
pub(crate) async fn get_session(&self, endpoints: &Endpoints) -> Result<Session, ClientError> {
let mut session_map = self.session_map.lock().await;
- let access_url = endpoints.access_url().to_string();
- return if session_map.contains_key(&access_url) {
- Ok(session_map.get(&access_url).unwrap().clone())
+ let endpoint_url = endpoints.endpoint_url().to_string();
+ return if session_map.contains_key(&endpoint_url) {
+ Ok(session_map.get(&endpoint_url).unwrap().clone())
} else {
let session = Session::new(
&self.logger,
@@ -235,7 +232,7 @@
&self.option,
)
.await?;
- session_map.insert(access_url.clone(), session.clone());
+ session_map.insert(endpoint_url.clone(), session.clone());
Ok(session)
};
}
@@ -244,6 +241,7 @@
#[cfg(test)]
mod tests {
use crate::log::terminal_logger;
+ use slog::debug;
use wiremock_grpc::generate;
use super::*;
@@ -256,7 +254,7 @@
let logger = terminal_logger();
let session = Session::new(
&logger,
- &Endpoints::from_access_url(format!("localhost:{}", server.address().port())).unwrap(),
+ &Endpoints::from_url(&format!("localhost:{}", server.address().port())).unwrap(),
"test_client".to_string(),
&ClientOption::default(),
)
@@ -269,7 +267,7 @@
let logger = terminal_logger();
let session = Session::new(
&logger,
- &Endpoints::from_access_url("127.0.0.1:8080,127.0.0.1:8081".to_string()).unwrap(),
+ &Endpoints::from_url("127.0.0.1:8080,127.0.0.1:8081").unwrap(),
"test_client".to_string(),
&ClientOption::default(),
)
@@ -285,8 +283,7 @@
SessionManager::new(&logger, "test_client".to_string(), &ClientOption::default());
let session = session_manager
.get_session(
- &Endpoints::from_access_url(format!("localhost:{}", server.address().port()))
- .unwrap(),
+ &Endpoints::from_url(&format!("localhost:{}", server.address().port())).unwrap(),
)
.await
.unwrap();