blob: 2082c2896d24f5a911ae25cd3ef4c14114d044fb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::net::IpAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use tokio::sync::oneshot;
use crate::error::{ClientError, ErrorKind};
use crate::pb;
use crate::pb::{Address, AddressScheme, MessageQueue};
#[derive(Debug)]
pub struct Route {
pub(crate) index: AtomicUsize,
pub queue: Vec<MessageQueue>,
}
#[derive(Debug)]
pub(crate) enum RouteStatus {
Querying(Option<Vec<oneshot::Sender<Result<Arc<Route>, ClientError>>>>),
Found(Arc<Route>),
}
#[derive(Debug)]
pub(crate) struct Endpoints {
endpoint_url: String,
scheme: AddressScheme,
inner: pb::Endpoints,
}
impl Endpoints {
const OPERATION_PARSE: &'static str = "endpoint.parse";
const ENDPOINT_SEPARATOR: &'static str = ",";
const ADDRESS_SEPARATOR: &'static str = ":";
pub fn from_url(endpoint_url: &str) -> Result<Self, ClientError> {
if endpoint_url.is_empty() {
return Err(ClientError::new(
ErrorKind::Config,
"Endpoint url is empty.",
Self::OPERATION_PARSE,
)
.with_context("url", endpoint_url));
}
let mut scheme = AddressScheme::DomainName;
let mut urls = Vec::new();
for url in endpoint_url.split(Self::ENDPOINT_SEPARATOR) {
if let Some((host, port)) = url.rsplit_once(Self::ADDRESS_SEPARATOR) {
let port_i32 = port.parse::<i32>().map_err(|e| {
ClientError::new(
ErrorKind::Config,
&format!("Port {} in endpoint url is invalid.", port),
Self::OPERATION_PARSE,
)
.with_context("url", endpoint_url)
.set_source(e)
})?;
urls.push((host.to_string(), port_i32));
} else {
return Err(ClientError::new(
ErrorKind::Config,
"Port in endpoint url is missing.",
Self::OPERATION_PARSE,
)
.with_context("url", endpoint_url));
}
}
let mut addresses = Vec::new();
let urls_len = urls.len();
for (host, port) in urls.into_iter() {
match host.parse::<IpAddr>() {
Ok(ip_addr) => {
match ip_addr {
IpAddr::V4(_) => {
if scheme == AddressScheme::IPv6 {
return Err(ClientError::new(
ErrorKind::Config,
"Multiple addresses not in the same schema.",
Self::OPERATION_PARSE,
)
.with_context("url", endpoint_url));
}
scheme = AddressScheme::IPv4
}
IpAddr::V6(_) => {
if scheme == AddressScheme::IPv4 {
return Err(ClientError::new(
ErrorKind::Config,
"Multiple addresses not in the same schema.",
Self::OPERATION_PARSE,
)
.with_context("url", endpoint_url));
}
scheme = AddressScheme::IPv6
}
}
addresses.push(Address { host, port });
}
Err(_) => {
if urls_len > 1 {
return Err(ClientError::new(
ErrorKind::Config,
"Multiple addresses not allowed in domain schema.",
Self::OPERATION_PARSE,
)
.with_context("url", endpoint_url));
}
scheme = AddressScheme::DomainName;
addresses.push(Address { host, port });
}
}
}
Ok(Endpoints {
endpoint_url: endpoint_url.to_string(),
scheme,
inner: pb::Endpoints {
scheme: scheme as i32,
addresses,
},
})
}
pub(crate) fn from_pb_endpoints(endpoints: pb::Endpoints) -> Self {
let mut addresses = Vec::new();
for address in endpoints.addresses.iter() {
addresses.push(format!("{}:{}", address.host, address.port));
}
Endpoints {
endpoint_url: addresses.join(Self::ENDPOINT_SEPARATOR),
scheme: endpoints.scheme(),
inner: endpoints,
}
}
pub fn endpoint_url(&self) -> &str {
&self.endpoint_url
}
pub fn scheme(&self) -> AddressScheme {
self.scheme
}
pub fn inner(&self) -> &pb::Endpoints {
&self.inner
}
pub fn into_inner(self) -> pb::Endpoints {
self.inner
}
}
#[cfg(test)]
mod tests {
use crate::error::ErrorKind;
use crate::model::common::Endpoints;
use crate::pb;
use crate::pb::{Address, AddressScheme};
#[test]
fn parse_domain_endpoint_url() {
let endpoints = Endpoints::from_url("localhost:8080").unwrap();
assert_eq!(endpoints.endpoint_url(), "localhost:8080");
assert_eq!(endpoints.scheme(), AddressScheme::DomainName);
let inner = endpoints.into_inner();
assert_eq!(inner.addresses.len(), 1);
assert_eq!(inner.addresses[0].host, "localhost");
assert_eq!(inner.addresses[0].port, 8080);
}
#[test]
fn parse_ipv4_endpoint_url() {
let endpoints = Endpoints::from_url("127.0.0.1:8080").unwrap();
assert_eq!(endpoints.endpoint_url(), "127.0.0.1:8080");
assert_eq!(endpoints.scheme(), AddressScheme::IPv4);
let inner = endpoints.into_inner();
assert_eq!(inner.addresses.len(), 1);
assert_eq!(inner.addresses[0].host, "127.0.0.1");
assert_eq!(inner.addresses[0].port, 8080);
}
#[test]
fn parse_ipv6_endpoint_url() {
let endpoints = Endpoints::from_url("::1:8080").unwrap();
assert_eq!(endpoints.endpoint_url(), "::1:8080");
assert_eq!(endpoints.scheme(), AddressScheme::IPv6);
let inner = endpoints.into_inner();
assert_eq!(inner.addresses.len(), 1);
assert_eq!(inner.addresses[0].host, "::1");
assert_eq!(inner.addresses[0].port, 8080);
}
#[test]
fn parse_endpoint_url_failed() {
let err = Endpoints::from_url("").err().unwrap();
assert_eq!(err.kind, ErrorKind::Config);
assert_eq!(err.operation, "endpoint.parse");
assert_eq!(err.message, "Endpoint url is empty.");
let err = Endpoints::from_url("localhost:<port>").err().unwrap();
assert_eq!(err.kind, ErrorKind::Config);
assert_eq!(err.operation, "endpoint.parse");
assert_eq!(err.message, "Port <port> in endpoint url is invalid.");
let err = Endpoints::from_url("localhost").err().unwrap();
assert_eq!(err.kind, ErrorKind::Config);
assert_eq!(err.operation, "endpoint.parse");
assert_eq!(err.message, "Port in endpoint url is missing.");
let err = Endpoints::from_url("127.0.0.1:8080,::1:8080")
.err()
.unwrap();
assert_eq!(err.kind, ErrorKind::Config);
assert_eq!(err.operation, "endpoint.parse");
assert_eq!(err.message, "Multiple addresses not in the same schema.");
let err = Endpoints::from_url("::1:8080,127.0.0.1:8080")
.err()
.unwrap();
assert_eq!(err.kind, ErrorKind::Config);
assert_eq!(err.operation, "endpoint.parse");
assert_eq!(err.message, "Multiple addresses not in the same schema.");
let err = Endpoints::from_url("localhost:8080,localhost:8081")
.err()
.unwrap();
assert_eq!(err.kind, ErrorKind::Config);
assert_eq!(err.operation, "endpoint.parse");
assert_eq!(
err.message,
"Multiple addresses not allowed in domain schema."
);
}
#[test]
fn parse_pb_endpoints() {
let pb_endpoints = pb::Endpoints {
scheme: AddressScheme::IPv4 as i32,
addresses: vec![
Address {
host: "localhost".to_string(),
port: 8080,
},
Address {
host: "127.0.0.1".to_string(),
port: 8081,
},
],
};
let endpoints = Endpoints::from_pb_endpoints(pb_endpoints);
assert_eq!(endpoints.endpoint_url(), "localhost:8080,127.0.0.1:8081");
assert_eq!(endpoints.scheme(), AddressScheme::IPv4);
assert_eq!(endpoints.into_inner().addresses.len(), 2);
}
}