blob: 4ce209e214198b4de84d4643bdd2bdd6d90fee1e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//! Common data model of the RocketMQ Rust client.
use std::net::IpAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use tokio::sync::oneshot;
use crate::error::{ClientError, ErrorKind};
use crate::pb;
use crate::pb::{Address, AddressScheme, MessageQueue};
/// Kinds of client known to this crate, each carrying an explicit integer discriminant.
// NOTE(review): the discriminants presumably mirror the client-type codes of the wire
// protocol — confirm against the `pb` definitions before changing any value.
// NOTE(review): `dead_code` is silenced presumably because not every variant is
// constructed inside the crate yet — confirm and drop the attribute once they are.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub(crate) enum ClientType {
/// Producer client (discriminant 1).
Producer = 1,
/// Push consumer client (discriminant 2).
PushConsumer = 2,
/// Simple consumer client (discriminant 3).
SimpleConsumer = 3,
/// Pull consumer client (discriminant 4).
PullConsumer = 4,
}
/// Route data: a list of message queues together with an atomic index.
// NOTE(review): `index` is presumably a cursor used to rotate through `queue` —
// confirm against the callers, which are not visible from this file.
#[derive(Debug)]
pub(crate) struct Route {
/// Atomic counter stored alongside the queues.
pub(crate) index: AtomicUsize,
/// Message queues that make up this route.
pub queue: Vec<MessageQueue>,
}
/// State of a route lookup: either still being queried or already resolved.
#[derive(Debug)]
pub(crate) enum RouteStatus {
/// A query is in flight. Holds an optional list of one-shot senders, each able to
/// deliver either the resolved `Arc<Route>` or a `ClientError`.
// NOTE(review): presumably one sender per caller waiting on the same query — confirm.
Querying(Option<Vec<oneshot::Sender<Result<Arc<Route>, ClientError>>>>),
/// The route has been resolved and is shared through an `Arc`.
Found(Arc<Route>),
}
/// Access points for receiving messages or querying topic routes.
#[derive(Debug, Clone)]
pub struct Endpoints {
/// Textual form of the endpoints: `host:port` entries separated by commas.
endpoint_url: String,
/// Address scheme that applies to the addresses held in `inner`.
scheme: AddressScheme,
/// The same endpoints as a `pb::Endpoints` value (scheme plus address list).
inner: pb::Endpoints,
}
impl Endpoints {
    /// Operation name recorded on every error raised while parsing an endpoint url.
    const OPERATION_PARSE: &'static str = "endpoint.parse";
    /// Separator between the individual `host:port` entries of an endpoint url.
    const ENDPOINT_SEPARATOR: &'static str = ",";
    /// Separator between the host and the port inside a single entry.
    const ADDRESS_SEPARATOR: &'static str = ":";

    /// Parses a comma separated list of `host:port` entries into [`Endpoints`].
    ///
    /// The port is whatever follows the *last* `:` of an entry, so a bare IPv6
    /// literal such as `::1:8080` is split into host `::1` and port `8080`.
    /// Hosts that parse as an IP address select the IPv4 or IPv6 scheme; any
    /// other host is treated as a domain name.
    ///
    /// # Errors
    ///
    /// Returns a [`ClientError`] of kind [`ErrorKind::Config`], with the offending
    /// url attached as context, when:
    ///
    /// * `endpoint_url` is empty;
    /// * an entry has no `:` and therefore no port;
    /// * a port is not an integer in `0..=65535`;
    /// * an entry has an empty host;
    /// * IPv4 and IPv6 addresses are mixed;
    /// * a domain name appears in a url holding more than one entry.
    pub(crate) fn from_url(endpoint_url: &str) -> Result<Self, ClientError> {
        // Every failure shares the same kind, operation and url context.
        let config_error = |message: &str| {
            ClientError::new(ErrorKind::Config, message, Self::OPERATION_PARSE)
                .with_context("url", endpoint_url)
        };

        if endpoint_url.is_empty() {
            return Err(config_error("endpoint url is empty"));
        }

        // Phase one: split every entry into host and port. All entries are checked
        // here before any host is classified, so a malformed port anywhere in the
        // url is reported ahead of scheme conflicts.
        let urls = endpoint_url
            .split(Self::ENDPOINT_SEPARATOR)
            .map(|url| -> Result<(String, i32), ClientError> {
                let (host, port) = url
                    .rsplit_once(Self::ADDRESS_SEPARATOR)
                    .ok_or_else(|| config_error("port in endpoint url is missing"))?;
                // Parse as `u16` so that negative and > 65535 values are rejected,
                // then widen to the `i32` used by the protobuf `Address`.
                let port_number = port.parse::<u16>().map(i32::from).map_err(|e| {
                    config_error(&format!("port {} in endpoint url is invalid", port))
                        .set_source(e)
                })?;
                if host.is_empty() {
                    return Err(config_error("host in endpoint url is missing"));
                }
                Ok((host.to_string(), port_number))
            })
            .collect::<Result<Vec<_>, ClientError>>()?;

        // Phase two: classify every host and make sure they all agree on a scheme.
        let has_multiple_addresses = urls.len() > 1;
        // `DomainName` doubles as the "nothing seen yet" state: a domain name is only
        // accepted when it is the sole entry, so an IP address can never follow one.
        let mut scheme = AddressScheme::DomainName;
        let mut addresses = Vec::with_capacity(urls.len());
        for (host, port) in urls {
            match host.parse::<IpAddr>() {
                Ok(ip_addr) => {
                    let detected = match ip_addr {
                        IpAddr::V4(_) => AddressScheme::IPv4,
                        IpAddr::V6(_) => AddressScheme::IPv6,
                    };
                    if scheme != AddressScheme::DomainName && scheme != detected {
                        return Err(config_error(
                            "multiple addresses not in the same schema",
                        ));
                    }
                    scheme = detected;
                }
                Err(_) => {
                    if has_multiple_addresses {
                        return Err(config_error(
                            "multiple addresses not allowed in domain schema",
                        ));
                    }
                    // Single entry: `scheme` still holds its initial `DomainName`.
                }
            }
            addresses.push(Address { host, port });
        }

        Ok(Endpoints {
            endpoint_url: endpoint_url.to_string(),
            scheme,
            inner: pb::Endpoints {
                scheme: scheme as i32,
                addresses,
            },
        })
    }

    /// Wraps protobuf endpoints, deriving the textual url by joining every
    /// address as `host:port` with commas. No validation is performed.
    pub(crate) fn from_pb_endpoints(endpoints: pb::Endpoints) -> Self {
        let endpoint_url = endpoints
            .addresses
            .iter()
            .map(|address| format!("{}:{}", address.host, address.port))
            .collect::<Vec<_>>()
            .join(Self::ENDPOINT_SEPARATOR);
        Endpoints {
            endpoint_url,
            scheme: endpoints.scheme(),
            inner: endpoints,
        }
    }

    /// Get endpoint url
    pub fn endpoint_url(&self) -> &str {
        &self.endpoint_url
    }

    /// Get address scheme of endpoint
    pub fn scheme(&self) -> AddressScheme {
        self.scheme
    }

    /// Borrows the underlying protobuf endpoints.
    pub(crate) fn inner(&self) -> &pb::Endpoints {
        &self.inner
    }

    /// Consumes `self` and returns the underlying protobuf endpoints.
    // NOTE(review): called by the unit tests in this file; `dead_code` is presumably
    // allowed because non-test code does not call it — confirm.
    #[allow(dead_code)]
    pub(crate) fn into_inner(self) -> pb::Endpoints {
        self.inner
    }
}
/// Filter type for message filtering.
///
/// RocketMQ allows to filter messages by tag or SQL.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(i32)]
pub enum FilterType {
    /// Filter by tag
    Tag = 1,
    /// Filter by SQL
    Sql = 2,
}

/// Filter expression for message filtering.
///
/// Pairs a [`FilterType`] with the tag or SQL query string it applies to.
#[derive(Debug, Clone)]
pub struct FilterExpression {
    /// How `expression` is to be interpreted.
    filter_type: FilterType,
    /// Message tag or SQL query string.
    expression: String,
}

impl FilterExpression {
    /// Create a new filter expression
    ///
    /// # Arguments
    ///
    /// * `filter_type` - set filter type
    /// * `expression` - set message tag or SQL query string
    pub fn new(filter_type: FilterType, expression: impl Into<String>) -> Self {
        Self {
            filter_type,
            expression: expression.into(),
        }
    }

    /// Get filter type
    pub fn filter_type(&self) -> FilterType {
        self.filter_type
    }

    /// Get message tag or SQL query string
    pub fn expression(&self) -> &str {
        &self.expression
    }
}
/// Send result returned by producer.
#[derive(Clone, Debug)]
pub struct SendReceipt {
/// Message id copied from the send result entry.
message_id: String,
/// Transaction id copied from the send result entry.
transaction_id: String,
}
impl SendReceipt {
    /// Builds a receipt by copying the message id and the transaction id out of
    /// a protobuf send result entry; the entry itself is left untouched.
    pub(crate) fn from_pb_send_result(entry: &pb::SendResultEntry) -> Self {
        let message_id = entry.message_id.to_owned();
        let transaction_id = entry.transaction_id.to_owned();
        Self {
            message_id,
            transaction_id,
        }
    }

    /// Message id of the sent message.
    pub fn message_id(&self) -> &str {
        self.message_id.as_str()
    }

    /// Transaction id associated with the sent message.
    pub fn transaction_id(&self) -> &str {
        self.transaction_id.as_str()
    }
}
#[cfg(test)]
mod tests {
    use crate::error::ErrorKind;
    use crate::model::common::Endpoints;
    use crate::pb;
    use crate::pb::{Address, AddressScheme};

    /// Parses `url`, which must describe exactly one address, and checks the url,
    /// the scheme and the single resulting host/port pair.
    #[track_caller]
    fn assert_single_address(url: &str, scheme: AddressScheme, host: &str, port: i32) {
        let endpoints = Endpoints::from_url(url).unwrap();
        assert_eq!(endpoints.endpoint_url(), url);
        assert_eq!(endpoints.scheme(), scheme);
        let inner = endpoints.into_inner();
        assert_eq!(inner.addresses.len(), 1);
        assert_eq!(inner.addresses[0].host, host);
        assert_eq!(inner.addresses[0].port, port);
    }

    /// Parses `url`, which must be rejected, and checks the kind, operation and
    /// message of the resulting error.
    #[track_caller]
    fn assert_parse_failure(url: &str, message: &str) {
        let err = Endpoints::from_url(url).err().unwrap();
        assert_eq!(err.kind, ErrorKind::Config);
        assert_eq!(err.operation, "endpoint.parse");
        assert_eq!(err.message, message);
    }

    #[test]
    fn parse_domain_endpoint_url() {
        assert_single_address(
            "localhost:8080",
            AddressScheme::DomainName,
            "localhost",
            8080,
        );
    }

    #[test]
    fn parse_ipv4_endpoint_url() {
        assert_single_address("127.0.0.1:8080", AddressScheme::IPv4, "127.0.0.1", 8080);
    }

    #[test]
    fn parse_ipv6_endpoint_url() {
        assert_single_address("::1:8080", AddressScheme::IPv6, "::1", 8080);
    }

    #[test]
    fn parse_endpoint_url_failed() {
        assert_parse_failure("", "endpoint url is empty");
        assert_parse_failure("localhost:<port>", "port <port> in endpoint url is invalid");
        assert_parse_failure("localhost", "port in endpoint url is missing");
        assert_parse_failure(
            "127.0.0.1:8080,::1:8080",
            "multiple addresses not in the same schema",
        );
        assert_parse_failure(
            "::1:8080,127.0.0.1:8080",
            "multiple addresses not in the same schema",
        );
        assert_parse_failure(
            "localhost:8080,localhost:8081",
            "multiple addresses not allowed in domain schema",
        );
    }

    #[test]
    fn parse_pb_endpoints() {
        let first = Address {
            host: "localhost".to_string(),
            port: 8080,
        };
        let second = Address {
            host: "127.0.0.1".to_string(),
            port: 8081,
        };
        let pb_endpoints = pb::Endpoints {
            scheme: AddressScheme::IPv4 as i32,
            addresses: vec![first, second],
        };

        let endpoints = Endpoints::from_pb_endpoints(pb_endpoints);

        assert_eq!(endpoints.endpoint_url(), "localhost:8080,127.0.0.1:8081");
        assert_eq!(endpoints.scheme(), AddressScheme::IPv4);
        assert_eq!(endpoints.into_inner().addresses.len(), 2);
    }
}