Feat: cluster and extension (#185)
* feat(dubbo): add unix feature
* Rft: replace feature with target_os cfg
* Rft(dubbo): add ClientBuilder for client
* Rftï(dubbo-build): add build api for client
* style(examples): cargo fmt
* Rft: move connection from client to transport mod
* Rft(dubbo): add default timeout for client
* Ftr: add serverBuilder for Server, support multiple ways to start server
* Rft(examples): update yaml
* refactor(dubbo): update invoker trait
* refactor(cluster): add Cluster MockImpl
* refactor(triple): use ClientBuilder to init Cluster ability
* Update builder.rs
update default direct value
* Update triple.rs
handle unused var
* Update mod.rs
comment some codes
* refactor(triple): rm unused var in clientBuilder
* refactor(dubbo): delete some codes
* refactor(cluster): rm some duplicate codes
* refactor(registry): rm unused import
* refactor(triple): use two build func for different usage
* style: cargo fmt --all
* refactor(cluster): rm registryWrapper
* refactor(cluster): delete print
* chore(dubbo): upgrade hyper version in cargo.toml
* refactor(cluster): comment the logic of clone body
* Rft(triple): remove Clone of Invoker
* Rft(cluster): use ready_cache to manage Invokers, add ready_cache in FailoverCluster
* Rft(protocol): use interface Inheritance to redesign Invoker
* Feat(cluster): Cluster Policy Impl (#146)
* refactor(cluster): comment the logic of clone body
* Rft(triple): remove Clone of Invoker
* Rft(cluster): use ready_cache to manage Invokers, add ready_cache in FailoverCluster
* Rft(protocol): use interface Inheritance to redesign Invoker
---------
Co-authored-by: G-XD <38717659+G-XD@users.noreply.github.com>
Co-authored-by: GXD <gexiangdong@highlight.mobi>
* chore(github): rm workflow_dispatch in workflow (#149)
* feat: Add Router Module(#144) (#153)
add condition router,
add tag router,
use nacos as router config center
* Ftr: failover cluster (#156)
* Ftr: add ServiceNameDirectory (#157)
* Ftr: failover cluster
* Ftr: add ServiceNameDirectory
* Feat/cluster Optimized the Router module (#160)
* perf: Optimized the logic of the routing module.
Refactored route logic decision-making, eliminating unnecessary cloning
and improving performance.
* perf: Optimized the logic of the routing module.
Refactored route logic decision-making, eliminating unnecessary cloning
and improving performance.
* perf: Removed unnecessary configurations.
* perf: Removed unnecessary configurations.
* perf: Optimized the Router module
Optimized the Router module
Added Router Chain to MockDirectory
* Refactor: refactor Cluster component (#165)
* Refactor: refactor Cluster component
- add p2c loadbalance component
- add simple router component
- add replay body component
- add failover component
- add service directory compoent
* Enhance: add cache for routers
* Tst: local test passed (#166)
* Tst: local test passed
* Enhance: remove unnecessary key
* Enhance: add BUFFER SIZE const variable
* style(dubbo): cargo fmt --all
* style(dubbo): cargo fix --lib -p dubbo --allow-dirty
* chore(github): update branch in pr
* Mod: format code and fix some warnings (#167)
* style(dubbo): cargo fmt --all
* style(dubbo): cargo fix --lib -p dubbo --allow-dirty
* chore(github): update branch in pr
* Rft: adapt nacos registry and zookeeper registry (#169)
* Rft: adapt nacos registry and zookeeper registry
Close #168
* Rft: adapt static registry
* Rft: cargo fmt
* Ftr: add extension module (#181)
* Ftr: add extension module
- adapt static registry by extension
- adapt nacos registry by extension
link #180
* cargo fmt all
* fix ci error
* fix nacos image version error
* Rft: re-design extension register
* Fix: cargo fix
* Fix: add some license for every files
- extract UrlParam to single file
- fix github ci error
* Fix: fmt all
* Fix: Add license for extension_param.rs and registry_param.rs
* Fix: rename query_param_by_kv method name
* Fix: get stuck when load extension in the concurrency environment (#184)
* Fix: get stuck when load extension in the concurrency environment
- Add a new struct called LoadExtensionPromise
- Remove async modifier in ExtensionDirectory
Close #183
* Ftr: use RwLock instead of unsafe
* Rft: simplify the code of extension promise resolve
* refeat(extensions): add sync for Registry trait
* chore: cargo fmt
* chore: cargo fmt
* chore: cargo fmt
* chore: cargo fmt
---------
Co-authored-by: G-XD <38717659+G-XD@users.noreply.github.com>
Co-authored-by: GXD <gexiangdong@highlight.mobi>
Co-authored-by: Urara <95117705+AdachiAndShimamura@users.noreply.github.com>
Co-authored-by: 毛文超 <admin@onew.me>
diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml
index b02b3bc..8f5e9aa 100644
--- a/.github/workflows/github-actions.yml
+++ b/.github/workflows/github-actions.yml
@@ -6,9 +6,10 @@
push:
branches: ["*"]
pull_request:
- branches: ["*"]
+ branches:
+ - '*'
+ - 'refact/*'
- workflow_dispatch:
jobs:
check:
@@ -59,6 +60,13 @@
- 2181:2181
env:
ZOO_MY_ID: 1
+ nacos:
+ image: nacos/nacos-server:v2.3.1
+ ports:
+ - 8848:8848
+ - 9848:9848
+ env:
+ MODE: standalone
steps:
- uses: actions/checkout@main
- uses: actions-rs/toolchain@v1
diff --git a/Cargo.toml b/Cargo.toml
index ad39f08..4f96717 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,4 +1,5 @@
[workspace]
+resolver = "2"
members = [
"common/logger",
"common/utils",
@@ -59,5 +60,7 @@
bytes = "1.0"
prost-serde = "0.3.0"
prost-serde-derive = "0.1.2"
+url = "2.5.0"
+
diff --git a/application.yaml b/application.yaml
index d357db1..bec29a6 100644
--- a/application.yaml
+++ b/application.yaml
@@ -21,4 +21,9 @@
references:
GreeterClientImpl:
url: tri://localhost:20000
- protocol: tri
\ No newline at end of file
+ protocol: tri
+ routers:
+ consumer:
+ - service: "org.apache.dubbo.sample.tri.Greeter"
+ url: tri://127.0.0.1:20000
+ protocol: triple
\ No newline at end of file
diff --git a/common/base/Cargo.toml b/common/base/Cargo.toml
index 40579e0..678af77 100644
--- a/common/base/Cargo.toml
+++ b/common/base/Cargo.toml
@@ -6,6 +6,5 @@
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-urlencoding.workspace = true
-http = "0.2"
-dubbo-logger.workspace = true
\ No newline at end of file
+dubbo-logger.workspace = true
+url.workspace = true
\ No newline at end of file
diff --git a/common/base/src/extension_param.rs b/common/base/src/extension_param.rs
new file mode 100644
index 0000000..93e0a16
--- /dev/null
+++ b/common/base/src/extension_param.rs
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use crate::{url::UrlParam, StdError};
+use std::{borrow::Cow, convert::Infallible, str::FromStr};
+
+pub struct ExtensionName(String);
+
+impl ExtensionName {
+ pub fn new(name: String) -> Self {
+ ExtensionName(name)
+ }
+}
+
+impl UrlParam for ExtensionName {
+ type TargetType = String;
+
+ fn name() -> &'static str {
+ "extension-name"
+ }
+
+ fn value(&self) -> Self::TargetType {
+ self.0.clone()
+ }
+
+ fn as_str(&self) -> Cow<str> {
+ self.0.as_str().into()
+ }
+}
+
+impl FromStr for ExtensionName {
+ type Err = StdError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok(ExtensionName::new(s.to_string()))
+ }
+}
+
+pub enum ExtensionType {
+ Registry,
+}
+
+impl UrlParam for ExtensionType {
+ type TargetType = String;
+
+ fn name() -> &'static str {
+ "extension-type"
+ }
+
+ fn value(&self) -> Self::TargetType {
+ match self {
+ ExtensionType::Registry => "registry".to_owned(),
+ }
+ }
+
+ fn as_str(&self) -> Cow<str> {
+ match self {
+ ExtensionType::Registry => Cow::Borrowed("registry"),
+ }
+ }
+}
+
+impl FromStr for ExtensionType {
+ type Err = Infallible;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s {
+ "registry" => Ok(ExtensionType::Registry),
+ _ => panic!("the extension type enum is not in range"),
+ }
+ }
+}
diff --git a/common/base/src/lib.rs b/common/base/src/lib.rs
index b97b342..dcc9256 100644
--- a/common/base/src/lib.rs
+++ b/common/base/src/lib.rs
@@ -19,8 +19,12 @@
allow(dead_code, unused_imports, unused_variables, unused_mut)
)]
pub mod constants;
+pub mod extension_param;
pub mod node;
+pub mod registry_param;
pub mod url;
pub use node::Node;
pub use url::Url;
+
+pub type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
diff --git a/common/base/src/registry_param.rs b/common/base/src/registry_param.rs
new file mode 100644
index 0000000..8aa7c07
--- /dev/null
+++ b/common/base/src/registry_param.rs
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use crate::{url::UrlParam, StdError, Url};
+use std::{borrow::Cow, convert::Infallible, str::FromStr};
+
+pub struct RegistryUrl(Url);
+
+impl RegistryUrl {
+ pub fn new(url: Url) -> Self {
+ Self(url)
+ }
+}
+
+impl UrlParam for RegistryUrl {
+ type TargetType = Url;
+
+ fn name() -> &'static str {
+ "registry"
+ }
+
+ fn value(&self) -> Self::TargetType {
+ self.0.clone()
+ }
+
+ fn as_str(&self) -> Cow<str> {
+ self.0.as_str().into()
+ }
+}
+
+impl FromStr for RegistryUrl {
+ type Err = StdError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok(Self(s.parse()?))
+ }
+}
+
+pub struct ServiceNamespace(String);
+
+impl ServiceNamespace {
+ pub fn new(namespace: String) -> Self {
+ Self(namespace)
+ }
+}
+
+impl UrlParam for ServiceNamespace {
+ type TargetType = String;
+
+ fn name() -> &'static str {
+ "namespace"
+ }
+
+ fn value(&self) -> Self::TargetType {
+ self.0.clone()
+ }
+
+ fn as_str(&self) -> Cow<str> {
+ self.0.as_str().into()
+ }
+}
+
+impl FromStr for ServiceNamespace {
+ type Err = StdError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok(Self(s.to_string()))
+ }
+}
+
+impl Default for ServiceNamespace {
+ fn default() -> Self {
+ Self("public".to_string())
+ }
+}
+
+pub struct AppName(String);
+
+impl AppName {
+ pub fn new(app_name: String) -> Self {
+ Self(app_name)
+ }
+}
+
+impl UrlParam for AppName {
+ type TargetType = String;
+
+ fn name() -> &'static str {
+ "app_name"
+ }
+
+ fn value(&self) -> Self::TargetType {
+ self.0.clone()
+ }
+
+ fn as_str(&self) -> Cow<str> {
+ self.0.as_str().into()
+ }
+}
+
+impl FromStr for AppName {
+ type Err = StdError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok(Self(s.to_string()))
+ }
+}
+
+impl Default for AppName {
+ fn default() -> Self {
+ Self("UnknownApp".to_string())
+ }
+}
+
+pub struct InterfaceName(String);
+
+impl InterfaceName {
+ pub fn new(interface_name: String) -> Self {
+ Self(interface_name)
+ }
+}
+
+impl UrlParam for InterfaceName {
+ type TargetType = String;
+
+ fn name() -> &'static str {
+ "interface"
+ }
+
+ fn value(&self) -> Self::TargetType {
+ self.0.clone()
+ }
+
+ fn as_str(&self) -> Cow<str> {
+ self.0.as_str().into()
+ }
+}
+
+impl FromStr for InterfaceName {
+ type Err = StdError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok(Self(s.to_string()))
+ }
+}
+
+impl Default for InterfaceName {
+ fn default() -> Self {
+ Self("".to_string())
+ }
+}
+
+pub struct Category(String);
+
+impl Category {
+ pub fn new(category: String) -> Self {
+ Self(category)
+ }
+}
+
+impl UrlParam for Category {
+ type TargetType = String;
+
+ fn name() -> &'static str {
+ "category"
+ }
+
+ fn value(&self) -> Self::TargetType {
+ self.0.clone()
+ }
+
+ fn as_str(&self) -> Cow<str> {
+ self.0.as_str().into()
+ }
+}
+
+impl FromStr for Category {
+ type Err = StdError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok(Self(s.to_string()))
+ }
+}
+
+impl Default for Category {
+ fn default() -> Self {
+ Self("".to_string())
+ }
+}
+
+pub struct Version(String);
+
+impl Version {
+ pub fn new(version: String) -> Self {
+ Self(version)
+ }
+}
+
+impl UrlParam for Version {
+ type TargetType = String;
+
+ fn name() -> &'static str {
+ "version"
+ }
+
+ fn value(&self) -> Self::TargetType {
+ self.0.clone()
+ }
+
+ fn as_str(&self) -> Cow<str> {
+ self.0.as_str().into()
+ }
+}
+
+impl FromStr for Version {
+ type Err = StdError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok(Self(s.to_string()))
+ }
+}
+
+impl Default for Version {
+ fn default() -> Self {
+ Self("".to_string())
+ }
+}
+
+pub struct Group(String);
+
+impl Group {
+ pub fn new(group: String) -> Self {
+ Self(group)
+ }
+}
+
+impl UrlParam for Group {
+ type TargetType = String;
+
+ fn name() -> &'static str {
+ "group"
+ }
+
+ fn value(&self) -> Self::TargetType {
+ self.0.clone()
+ }
+
+ fn as_str(&self) -> Cow<str> {
+ self.0.as_str().into()
+ }
+}
+
+impl FromStr for Group {
+ type Err = StdError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok(Self(s.to_string()))
+ }
+}
+
+impl Default for Group {
+ fn default() -> Self {
+ Self("".to_string())
+ }
+}
+
+pub enum Side {
+ Provider,
+ Consumer,
+}
+
+impl UrlParam for Side {
+ type TargetType = String;
+
+ fn name() -> &'static str {
+ "side"
+ }
+
+ fn value(&self) -> Self::TargetType {
+ match self {
+ Side::Consumer => "consumer".to_owned(),
+ Side::Provider => "provider".to_owned(),
+ }
+ }
+
+ fn as_str(&self) -> Cow<str> {
+ match self {
+ Side::Consumer => Cow::Borrowed("consumer"),
+ Side::Provider => Cow::Borrowed("provider"),
+ }
+ }
+}
+
+impl FromStr for Side {
+ type Err = Infallible;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s.to_lowercase().as_str() {
+ "consumer" => Ok(Side::Consumer),
+ "provider" => Ok(Side::Provider),
+ _ => Ok(Side::Consumer),
+ }
+ }
+}
+
+impl Default for Side {
+ fn default() -> Self {
+ Side::Consumer
+ }
+}
+
+pub struct StaticInvokerUrls(String);
+
+impl UrlParam for StaticInvokerUrls {
+ type TargetType = Vec<Url>;
+
+ fn name() -> &'static str {
+ "static-invoker-urls"
+ }
+
+ fn value(&self) -> Self::TargetType {
+ self.0.split(",").map(|url| url.parse().unwrap()).collect()
+ }
+
+ fn as_str(&self) -> Cow<str> {
+ Cow::Borrowed(&self.0)
+ }
+}
+
+impl FromStr for StaticInvokerUrls {
+ type Err = Infallible;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok(Self(s.to_string()))
+ }
+}
+
+impl Default for StaticInvokerUrls {
+ fn default() -> Self {
+ Self(String::default())
+ }
+}
diff --git a/common/base/src/url.rs b/common/base/src/url.rs
index 82b026f..ac97f26 100644
--- a/common/base/src/url.rs
+++ b/common/base/src/url.rs
@@ -16,237 +16,162 @@
*/
use std::{
+ borrow::Cow,
collections::HashMap,
- fmt::{Display, Formatter},
+ fmt::{Debug, Display, Formatter},
+ str::FromStr,
};
-use crate::constants::{GROUP_KEY, INTERFACE_KEY, VERSION_KEY};
-use http::Uri;
-
-#[derive(Debug, Clone, Default, PartialEq)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Url {
- pub raw_url_string: String,
- // value of scheme is different to base name, eg. triple -> tri://
- pub scheme: String,
- pub location: String,
- pub ip: String,
- pub port: String,
- // serviceKey format in dubbo java and go '{group}/{interfaceName}:{version}'
- pub service_key: String,
- // same to interfaceName
- pub service_name: String,
- pub params: HashMap<String, String>,
+ inner: url::Url,
}
impl Url {
- pub fn new() -> Self {
- Default::default()
+ pub fn empty() -> Self {
+ "empty://localhost".parse().unwrap()
}
- pub fn from_url(url: &str) -> Option<Self> {
- // url: triple://127.0.0.1:8888/helloworld.Greeter
- let uri = url
- .parse::<http::Uri>()
- .map_err(|err| {
- dubbo_logger::tracing::error!("fail to parse url({}), err: {:?}", url, err);
- })
- .unwrap();
- let query = uri.path_and_query().unwrap().query();
- let mut url_inst = Self {
- raw_url_string: url.to_string(),
- scheme: uri.scheme_str()?.to_string(),
- ip: uri.authority()?.host().to_string(),
- port: uri.authority()?.port()?.to_string(),
- location: uri.authority()?.to_string(),
- service_key: uri.path().trim_start_matches('/').to_string(),
- service_name: uri.path().trim_start_matches('/').to_string(),
- params: if let Some(..) = query {
- Url::decode(query.unwrap())
- } else {
- HashMap::new()
- },
- };
- url_inst.renew_raw_url_string();
- Some(url_inst)
+ pub fn protocol(&self) -> &str {
+ self.inner.scheme()
}
- pub fn get_service_key(&self) -> String {
- self.service_key.clone()
+ pub fn host(&self) -> Option<&str> {
+ self.inner.host_str()
}
- pub fn get_service_name(&self) -> String {
- self.service_name.clone()
+ pub fn authority(&self) -> &str {
+ self.inner.authority()
}
- pub fn get_param(&self, key: &str) -> Option<String> {
- self.params.get(key).cloned()
+ pub fn username(&self) -> &str {
+ self.inner.username()
}
- fn encode_param(&self) -> String {
- let mut params_vec: Vec<String> = Vec::new();
- for (k, v) in self.params.iter() {
- // let tmp = format!("{}={}", k, v);
- params_vec.push(format!("{}={}", k, v));
- }
- if params_vec.is_empty() {
- "".to_string()
- } else {
- format!("?{}", params_vec.join("&"))
- }
+ pub fn password(&self) -> Option<&str> {
+ self.inner.password()
}
- pub fn params_count(&self) -> usize {
- self.params.len()
+ pub fn port(&self) -> Option<u16> {
+ self.inner.port_or_known_default()
}
- fn decode(raw_query_string: &str) -> HashMap<String, String> {
- let mut params = HashMap::new();
- let p: Vec<String> = raw_query_string
- .split('&')
- .map(|v| v.trim().to_string())
- .collect();
- for v in p.iter() {
- let values: Vec<String> = v.split('=').map(|v| v.trim().to_string()).collect();
- if values.len() != 2 {
- continue;
- }
- params.insert(values[0].clone(), values[1].clone());
- }
- params
+ pub fn path(&self) -> &str {
+ self.inner.path()
}
- pub fn set_param(&mut self, key: &str, value: &str) {
- self.params.insert(key.to_string(), value.to_string());
- self.renew_raw_url_string();
+ pub fn query<T: UrlParam>(&self) -> Option<T> {
+ self.inner
+ .query_pairs()
+ .find(|(k, _)| k == T::name())
+ .map(|(_, v)| T::from_str(&v).ok())
+ .flatten()
}
- pub fn raw_url_string(&self) -> String {
- self.raw_url_string.clone()
+ pub fn query_param_by_key(&self, key: &str) -> Option<String> {
+ self.inner
+ .query_pairs()
+ .find(|(k, _)| k == key)
+ .map(|(_, v)| v.into_owned())
}
- pub fn encoded_raw_url_string(&self) -> String {
- urlencoding::encode(self.raw_url_string.as_str()).to_string()
+ pub fn all_query_params(&self) -> HashMap<String, String> {
+ self.inner
+ .query_pairs()
+ .map(|(k, v)| (k.into_owned(), v.into_owned()))
+ .collect()
}
- fn build_service_key(&self) -> String {
- format!(
- "{group}/{interfaceName}:{version}",
- group = self.get_param(GROUP_KEY).unwrap_or("default".to_string()),
- interfaceName = self.get_param(INTERFACE_KEY).unwrap_or("error".to_string()),
- version = self.get_param(VERSION_KEY).unwrap_or("1.0.0".to_string())
- )
+ pub fn set_protocol(&mut self, protocol: &str) {
+ let _ = self.inner.set_scheme(protocol);
}
- pub fn to_url(&self) -> String {
- self.raw_url_string()
+ pub fn set_host(&mut self, host: &str) {
+ let _ = self.inner.set_host(Some(host));
}
- fn renew_raw_url_string(&mut self) {
- self.raw_url_string = format!(
- "{}://{}:{}/{}{}",
- self.scheme,
- self.ip,
- self.port,
- self.service_name,
- self.encode_param()
- );
- self.service_key = self.build_service_key()
+ pub fn set_port(&mut self, port: u16) {
+ let _ = self.inner.set_port(Some(port));
}
- // short_url is used for tcp listening
- pub fn short_url(&self) -> String {
- format!(
- "{}://{}:{}/{}",
- self.scheme, self.ip, self.port, self.service_name
- )
+ pub fn set_username(&mut self, username: &str) {
+ let _ = self.inner.set_username(username);
}
- pub fn protocol(&self) -> String {
- self.scheme.clone()
+ pub fn set_password(&mut self, password: &str) {
+ let _ = self.inner.set_password(Some(password));
}
- pub fn get_ip_port(&self) -> String {
- format!("{}:{}", self.ip, self.port)
+ pub fn set_path(&mut self, path: &str) {
+ let _ = self.inner.set_path(path);
+ }
+
+ pub fn extend_pairs(&mut self, pairs: impl Iterator<Item = (String, String)>) {
+ let mut query_pairs = self.inner.query_pairs_mut();
+ query_pairs.extend_pairs(pairs);
+ }
+
+ pub fn add_query_param<T: UrlParam>(&mut self, param: T) {
+ let mut pairs = self.inner.query_pairs_mut();
+ pairs.append_pair(T::name(), ¶m.as_str());
+ }
+
+ pub fn remove_query_param<T: UrlParam>(&mut self) {
+ let query = self.inner.query_pairs().filter(|(k, v)| k.ne(T::name()));
+ let mut inner_url = self.inner.clone();
+ inner_url.query_pairs_mut().clear().extend_pairs(query);
+ self.inner = inner_url;
+ }
+
+ pub fn remove_all_param(&mut self) {
+ self.inner.query_pairs_mut().clear();
+ }
+
+ pub fn as_str(&self) -> &str {
+ self.inner.as_str()
+ }
+
+ pub fn short_url_without_query(&self) -> String {
+ let mut url = self.inner.clone();
+ url.set_query(Some(""));
+ url.into()
+ }
+}
+
+impl FromStr for Url {
+ type Err = url::ParseError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok(Url {
+ inner: url::Url::parse(s)?,
+ })
}
}
impl Display for Url {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- f.write_str(self.raw_url_string().as_str())
+ std::fmt::Display::fmt(&self.inner, f)
}
}
-impl Into<Uri> for Url {
- fn into(self) -> Uri {
- self.raw_url_string.parse::<Uri>().unwrap()
+impl Debug for Url {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ std::fmt::Debug::fmt(&self.inner, f)
}
}
-impl From<&str> for Url {
- fn from(url: &str) -> Self {
- Url::from_url(url).unwrap()
+impl From<Url> for String {
+ fn from(url: Url) -> Self {
+ url.inner.into()
}
}
-#[cfg(test)]
-mod tests {
- use crate::{
- constants::{ANYHOST_KEY, VERSION_KEY},
- url::Url,
- };
+pub trait UrlParam: FromStr {
+ type TargetType;
- #[test]
- fn test_from_url() {
- let mut u1 = Url::from_url("tri://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&\
- application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&\
- environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&\
- module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&\
- side=provider&timeout=3000×tamp=1556509797245&version=1.0.0&application=test");
- assert_eq!(
- u1.as_ref().unwrap().service_key,
- "default/com.ikurento.user.UserProvider:1.0.0"
- );
- assert_eq!(
- u1.as_ref()
- .unwrap()
- .get_param(ANYHOST_KEY)
- .unwrap()
- .as_str(),
- "true"
- );
- assert_eq!(
- u1.as_ref()
- .unwrap()
- .get_param("default.timeout")
- .unwrap()
- .as_str(),
- "10000"
- );
- assert_eq!(u1.as_ref().unwrap().scheme, "tri");
- assert_eq!(u1.as_ref().unwrap().ip, "127.0.0.1");
- assert_eq!(u1.as_ref().unwrap().port, "20000");
- assert_eq!(u1.as_ref().unwrap().params_count(), 18);
- u1.as_mut().unwrap().set_param("key1", "value1");
- assert_eq!(
- u1.as_ref().unwrap().get_param("key1").unwrap().as_str(),
- "value1"
- );
- assert_eq!(
- u1.as_ref()
- .unwrap()
- .get_param(VERSION_KEY)
- .unwrap()
- .as_str(),
- "1.0.0"
- );
- }
+ fn name() -> &'static str;
- #[test]
- fn test2() {
- let url: Url = "tri://0.0.0.0:8888/org.apache.dubbo.sample.tri.Greeter".into();
- assert_eq!(
- url.raw_url_string(),
- "tri://0.0.0.0:8888/org.apache.dubbo.sample.tri.Greeter"
- )
- }
+ fn value(&self) -> Self::TargetType;
+
+ fn as_str(&self) -> Cow<str>;
}
diff --git a/config/src/config.rs b/config/src/config.rs
index 646873d..cf4e441 100644
--- a/config/src/config.rs
+++ b/config/src/config.rs
@@ -17,7 +17,7 @@
use std::{collections::HashMap, env, path::PathBuf};
-use crate::{protocol::Protocol, registry::RegistryConfig};
+use crate::{protocol::Protocol, registry::RegistryConfig, router::RouterConfig};
use dubbo_logger::tracing;
use dubbo_utils::yaml_util::yaml_file_parser;
use once_cell::sync::OnceCell;
@@ -45,6 +45,9 @@
pub registries: HashMap<String, RegistryConfig>,
#[serde(default)]
+ pub routers: RouterConfig,
+
+ #[serde(default)]
pub data: HashMap<String, String>,
}
@@ -63,6 +66,7 @@
protocols: HashMap::new(),
registries: HashMap::new(),
provider: ProviderConfig::new(),
+ routers: RouterConfig::default(),
data: HashMap::new(),
}
}
diff --git a/config/src/lib.rs b/config/src/lib.rs
index 0748c66..6fa3880 100644
--- a/config/src/lib.rs
+++ b/config/src/lib.rs
@@ -21,4 +21,5 @@
pub mod protocol;
pub mod provider;
pub mod registry;
+pub mod router;
pub mod service;
diff --git a/config/src/router.rs b/config/src/router.rs
new file mode 100644
index 0000000..7976f6e
--- /dev/null
+++ b/config/src/router.rs
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use serde::{Deserialize, Serialize};
+
+#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)]
+pub struct ConditionRouterConfig {
+ #[serde(rename = "configVersion")]
+ pub config_version: String,
+ pub scope: String,
+ pub force: bool,
+ pub enabled: bool,
+ pub key: String,
+ pub conditions: Vec<String>,
+}
+
+#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)]
+pub struct TagRouterConfig {
+ #[serde(rename = "configVersion")]
+ pub config_version: String,
+ pub force: bool,
+ pub enabled: bool,
+ pub key: String,
+ pub tags: Vec<Tag>,
+}
+
+#[derive(Serialize, Deserialize, Clone, PartialEq, Default, Debug)]
+pub struct ConsumerConfig {
+ pub service: String,
+ pub url: String,
+ pub protocol: String,
+}
+
+#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)]
+pub struct Tag {
+ pub name: String,
+ #[serde(rename = "match")]
+ pub matches: Vec<TagMatchRule>,
+}
+
+#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)]
+pub struct TagMatchRule {
+ pub key: String,
+ pub value: String,
+}
+
+impl ConditionRouterConfig {
+ pub fn new(config: &String) -> Self {
+ serde_yaml::from_str(config).expect("parse error")
+ }
+}
+
+#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq)]
+pub struct EnableAuth {
+ pub auth_username: String,
+ pub auth_password: String,
+}
+
+#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq)]
+pub struct NacosConfig {
+ pub addr: String,
+ pub namespace: String,
+ pub app: String,
+ pub enable_auth: Option<EnableAuth>,
+ pub enable_auth_plugin_http: Option<bool>,
+}
+
+#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)]
+pub struct RouterConfig {
+ pub consumer: Option<Vec<ConsumerConfig>>,
+ pub nacos: Option<NacosConfig>,
+ pub conditions: Option<Vec<ConditionRouterConfig>>,
+ pub tags: Option<TagRouterConfig>,
+}
diff --git a/config/src/service.rs b/config/src/service.rs
index 1f85a92..8a1f191 100644
--- a/config/src/service.rs
+++ b/config/src/service.rs
@@ -41,16 +41,4 @@
pub fn protocol(self, protocol: String) -> Self {
Self { protocol, ..self }
}
-
- // pub fn get_url(&self) -> Vec<Url> {
- // let mut urls = Vec::new();
- // for (_, conf) in self.protocol_configs.iter() {
- // urls.push(Url {
- // url: conf.to_owned().to_url(),
- // service_key: "".to_string(),
- // });
- // }
-
- // urls
- // }
}
diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs
index af32b64..a3290dd 100644
--- a/dubbo-build/src/client.rs
+++ b/dubbo-build/src/client.rs
@@ -63,7 +63,7 @@
#service_doc
#(#struct_attributes)*
- #[derive(Debug, Clone, Default)]
+ #[derive(Clone)]
pub struct #service_ident {
inner: TripleClient,
}
@@ -76,12 +76,6 @@
}
}
- // pub fn build(builder: ClientBuilder) -> Self {
- // Self {
- // inner: TripleClient::new(builder),
- // }
- // }
-
pub fn new(builder: ClientBuilder) -> Self {
Self {
inner: TripleClient::new(builder),
diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index 8c2821e..dbbb96a 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -14,7 +14,7 @@
http = "0.2"
tower-service.workspace = true
http-body = "0.4.4"
-tower = { workspace = true, features = ["timeout"] }
+tower = { workspace = true, features = ["timeout", "ready-cache","discover","retry"] }
futures-util = "0.3.23"
futures-core ="0.3.23"
argh = "0.1"
@@ -24,6 +24,8 @@
tokio-rustls="0.24.1"
tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", "macros", "net", "signal", "full" ] }
prost = "0.11.9"
+tokio-util = "0.7.9"
+tokio-stream = "0.1"
async-trait = "0.1.56"
tower-layer.workspace = true
bytes.workspace = true
@@ -42,8 +44,13 @@
lazy_static.workspace = true
dubbo-base.workspace = true
dubbo-logger.workspace = true
+once_cell.workspace = true
dubbo-config = { path = "../config", version = "0.3.0" }
#对象存储
state = { version = "0.5", features = ["tls"] }
+thiserror = "1.0.48"
+regex = "1.9.1"
+nacos-sdk = { version = "0.3.0", features = ["default"] }
+serde_yaml = "0.9.22"
diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs
deleted file mode 100644
index 144f011..0000000
--- a/dubbo/src/cluster/directory.rs
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-use std::{
- collections::HashMap,
- fmt::Debug,
- str::FromStr,
- sync::{Arc, RwLock},
-};
-
-use crate::{
- codegen::TripleInvoker,
- invocation::{Invocation, RpcInvocation},
- protocol::BoxInvoker,
- registry::{memory_registry::MemoryNotifyListener, BoxRegistry},
-};
-use dubbo_base::Url;
-use dubbo_logger::tracing;
-
-use crate::cluster::Directory;
-
-/// Directory.
-///
-/// [Directory Service](http://en.wikipedia.org/wiki/Directory_service)
-
-#[derive(Debug, Clone)]
-pub struct StaticDirectory {
- uri: http::Uri,
-}
-
-impl StaticDirectory {
- pub fn new(host: &str) -> StaticDirectory {
- let uri = match http::Uri::from_str(host) {
- Ok(v) => v,
- Err(err) => {
- tracing::error!("http uri parse error: {}, host: {}", err, host);
- panic!("http uri parse error: {}, host: {}", err, host)
- }
- };
- StaticDirectory { uri: uri }
- }
-
- pub fn from_uri(uri: &http::Uri) -> StaticDirectory {
- StaticDirectory { uri: uri.clone() }
- }
-}
-
-impl Directory for StaticDirectory {
- fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
- let url = Url::from_url(&format!(
- "{}://{}:{}/{}",
- self.uri.scheme_str().unwrap_or("tri"),
- self.uri.host().unwrap(),
- self.uri.port().unwrap(),
- invocation.get_target_service_unique_name(),
- ))
- .unwrap();
- let invoker = Box::new(TripleInvoker::new(url));
- vec![invoker]
- }
-}
-
-#[derive(Debug, Clone)]
-pub struct RegistryDirectory {
- registry: Arc<BoxRegistry>,
- service_instances: Arc<RwLock<HashMap<String, Vec<Url>>>>,
-}
-
-impl RegistryDirectory {
- pub fn new(registry: BoxRegistry) -> RegistryDirectory {
- RegistryDirectory {
- registry: Arc::new(registry),
- service_instances: Arc::new(RwLock::new(HashMap::new())),
- }
- }
-}
-
-impl Directory for RegistryDirectory {
- fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
- let service_name = invocation.get_target_service_unique_name();
-
- let url = Url::from_url(&format!(
- "triple://{}:{}/{}",
- "127.0.0.1", "8888", service_name
- ))
- .unwrap();
-
- self.registry
- .subscribe(
- url,
- Arc::new(MemoryNotifyListener {
- service_instances: Arc::clone(&self.service_instances),
- }),
- )
- .expect("subscribe");
-
- let map = self
- .service_instances
- .read()
- .expect("service_instances.read");
- let binding = Vec::new();
- let url_vec = map.get(&service_name).unwrap_or(&binding);
- // url_vec.to_vec()
- let mut invokers: Vec<BoxInvoker> = vec![];
- for item in url_vec.iter() {
- invokers.push(Box::new(TripleInvoker::new(item.clone())));
- }
- invokers
- }
-}
diff --git a/dubbo/src/cluster/failover.rs b/dubbo/src/cluster/failover.rs
new file mode 100644
index 0000000..a223ddf
--- /dev/null
+++ b/dubbo/src/cluster/failover.rs
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::task::Poll;
+
+use dubbo_base::StdError;
+use futures_util::future;
+use http::Request;
+use tower::{retry::Retry, util::Oneshot, ServiceExt};
+use tower_service::Service;
+
+pub struct Failover<N> {
+ inner: N, // loadbalancer service
+}
+
+#[derive(Clone)]
+pub struct FailoverPolicy;
+
+impl<N> Failover<N> {
+ pub fn new(inner: N) -> Self {
+ Self { inner }
+ }
+}
+
+impl<B, Res, E> tower::retry::Policy<Request<B>, Res, E> for FailoverPolicy
+where
+ B: http_body::Body + Clone,
+{
+ type Future = future::Ready<Self>;
+
+ fn retry(&self, _req: &Request<B>, result: Result<&Res, &E>) -> Option<Self::Future> {
+ //TODO some error handling or logging
+ match result {
+ Ok(_) => None,
+ Err(_) => Some(future::ready(self.clone())),
+ }
+ }
+
+ fn clone_request(&self, req: &Request<B>) -> Option<Request<B>> {
+ let mut clone = http::Request::new(req.body().clone());
+ *clone.method_mut() = req.method().clone();
+ *clone.uri_mut() = req.uri().clone();
+ *clone.headers_mut() = req.headers().clone();
+ *clone.version_mut() = req.version();
+
+ Some(clone)
+ }
+}
+
+impl<N, B> Service<Request<B>> for Failover<N>
+where
+ // B is CloneBody<B>
+ B: http_body::Body + Clone,
+ // loadbalancer service
+ N: Service<Request<B>> + Clone + 'static,
+ N::Error: Into<StdError>,
+ N::Future: Send,
+{
+ type Response = N::Response;
+
+ type Error = N::Error;
+
+ type Future = Oneshot<Retry<FailoverPolicy, N>, Request<B>>;
+
+ fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.inner.poll_ready(cx)
+ }
+
+ fn call(&mut self, req: Request<B>) -> Self::Future {
+ let retry = Retry::new(FailoverPolicy, self.inner.clone());
+ retry.oneshot(req)
+ }
+}
diff --git a/dubbo/src/cluster/loadbalance/impls/random.rs b/dubbo/src/cluster/loadbalance/impls/random.rs
deleted file mode 100644
index 3e1cf65..0000000
--- a/dubbo/src/cluster/loadbalance/impls/random.rs
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-use dubbo_base::Url;
-use std::{
- fmt::{Debug, Formatter},
- sync::Arc,
-};
-
-use crate::{
- cluster::loadbalance::types::{LoadBalance, Metadata},
- codegen::RpcInvocation,
-};
-
-pub struct RandomLoadBalance {
- pub metadata: Metadata,
-}
-
-impl Default for RandomLoadBalance {
- fn default() -> Self {
- RandomLoadBalance {
- metadata: Metadata::new("random"),
- }
- }
-}
-
-impl Debug for RandomLoadBalance {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- write!(f, "RandomLoadBalance")
- }
-}
-
-impl LoadBalance for RandomLoadBalance {
- fn select(
- &self,
- invokers: Arc<Vec<Url>>,
- _url: Option<Url>,
- _invocation: Arc<RpcInvocation>,
- ) -> Option<Url> {
- if invokers.is_empty() {
- return None;
- }
- let index = rand::random::<usize>() % invokers.len();
- Some(invokers[index].clone())
- }
-}
diff --git a/dubbo/src/cluster/loadbalance/impls/roundrobin.rs b/dubbo/src/cluster/loadbalance/impls/roundrobin.rs
deleted file mode 100644
index 5fd0ed4..0000000
--- a/dubbo/src/cluster/loadbalance/impls/roundrobin.rs
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-use dubbo_base::Url;
-use std::{
- collections::HashMap,
- fmt::{Debug, Formatter},
- sync::{
- atomic::{AtomicUsize, Ordering},
- Arc, RwLock,
- },
-};
-
-use crate::{
- cluster::loadbalance::types::{LoadBalance, Metadata},
- codegen::RpcInvocation,
-};
-
-pub struct RoundRobinLoadBalance {
- pub metadata: Metadata,
- pub counter_map: RwLock<HashMap<String, AtomicUsize>>,
-}
-
-impl Default for RoundRobinLoadBalance {
- fn default() -> Self {
- RoundRobinLoadBalance {
- metadata: Metadata::new("roundrobin"),
- counter_map: RwLock::new(HashMap::new()),
- }
- }
-}
-
-impl Debug for RoundRobinLoadBalance {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- write!(f, "RoundRobinLoadBalance")
- }
-}
-
-impl RoundRobinLoadBalance {
- fn guarantee_counter_key(&self, key: &str) {
- let contained = self.counter_map.try_read().unwrap().contains_key(key);
- if !contained {
- self.counter_map
- .try_write()
- .unwrap()
- .insert(key.to_string(), AtomicUsize::new(0));
- }
- }
-}
-
-impl LoadBalance for RoundRobinLoadBalance {
- fn select(
- &self,
- invokers: Arc<Vec<Url>>,
- _url: Option<Url>,
- invocation: Arc<RpcInvocation>,
- ) -> Option<Url> {
- if invokers.is_empty() {
- return None;
- }
- let fingerprint = invocation.unique_fingerprint();
- self.guarantee_counter_key(fingerprint.as_str());
- let index = self
- .counter_map
- .try_read()
- .unwrap()
- .get(fingerprint.as_str())?
- .fetch_add(1, Ordering::SeqCst)
- % invokers.len();
- Some(invokers[index].clone())
- }
-}
diff --git a/dubbo/src/cluster/loadbalance/mod.rs b/dubbo/src/cluster/loadbalance/mod.rs
deleted file mode 100644
index 1d04f7f..0000000
--- a/dubbo/src/cluster/loadbalance/mod.rs
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-use std::collections::HashMap;
-
-use lazy_static::lazy_static;
-
-use crate::cluster::loadbalance::{
- impls::{random::RandomLoadBalance, roundrobin::RoundRobinLoadBalance},
- types::BoxLoadBalance,
-};
-
-pub mod impls;
-pub mod types;
-
-lazy_static! {
- pub static ref LOAD_BALANCE_EXTENSIONS: HashMap<String, BoxLoadBalance> =
- init_loadbalance_extensions();
-}
-
-fn init_loadbalance_extensions() -> HashMap<String, BoxLoadBalance> {
- let mut loadbalance_map: HashMap<String, BoxLoadBalance> = HashMap::new();
- loadbalance_map.insert("random".to_string(), Box::new(RandomLoadBalance::default()));
- loadbalance_map.insert(
- "roundrobin".to_string(),
- Box::new(RoundRobinLoadBalance::default()),
- );
- loadbalance_map
-}
diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs
index d1f96f9..1a20c16 100644
--- a/dubbo/src/cluster/mod.rs
+++ b/dubbo/src/cluster/mod.rs
@@ -15,165 +15,71 @@
* limitations under the License.
*/
-use std::{collections::HashMap, fmt::Debug, sync::Arc, task::Poll};
-
-use aws_smithy_http::body::SdkBody;
-use dubbo_base::Url;
-use dyn_clone::DynClone;
+use http::Request;
+use tower_service::Service;
use crate::{
- empty_body,
- invocation::RpcInvocation,
- protocol::{BoxInvoker, Invoker},
+ codegen::RpcInvocation, invoker::clone_body::CloneBody, param::Param, svc::NewService,
};
-pub mod directory;
-pub mod loadbalance;
+use self::failover::Failover;
-pub trait Directory: Debug + DynClone {
- fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker>;
- // fn is_empty(&self) -> bool;
+mod failover;
+
+pub struct NewCluster<N> {
+ inner: N, // new loadbalancer service
}
-dyn_clone::clone_trait_object!(Directory);
-
-type BoxDirectory = Box<dyn Directory + Send + Sync>;
-
-pub trait Cluster {
- fn join(&self, dir: BoxDirectory) -> BoxInvoker;
+pub struct Cluster<S> {
+ inner: S, // failover service
}
-#[derive(Debug, Default)]
-pub struct MockCluster {}
-
-impl Cluster for MockCluster {
- fn join(&self, dir: BoxDirectory) -> BoxInvoker {
- Box::new(FailoverCluster::new(dir))
- }
-}
-#[derive(Clone, Debug)]
-pub struct FailoverCluster {
- dir: Arc<BoxDirectory>,
-}
-
-impl FailoverCluster {
- pub fn new(dir: BoxDirectory) -> FailoverCluster {
- Self { dir: Arc::new(dir) }
+impl<N> NewCluster<N> {
+ pub fn layer() -> impl tower_layer::Layer<N, Service = Self> {
+ tower_layer::layer_fn(|inner: N| {
+ NewCluster {
+ inner, // new loadbalancer service
+ }
+ })
}
}
-impl Invoker<http::Request<SdkBody>> for FailoverCluster {
- type Response = http::Response<crate::BoxBody>;
+impl<S, T> NewService<T> for NewCluster<S>
+where
+ T: Param<RpcInvocation>,
+ // new loadbalancer service
+ S: NewService<T>,
+{
+ type Service = Cluster<Failover<S::Service>>;
- type Error = crate::Error;
+ fn new_service(&self, target: T) -> Self::Service {
+ Cluster {
+ inner: Failover::new(self.inner.new_service(target)),
+ }
+ }
+}
- type Future = crate::BoxFuture<Self::Response, Self::Error>;
+impl<S> Service<Request<hyper::Body>> for Cluster<S>
+where
+ S: Service<Request<CloneBody>>,
+{
+ type Response = S::Response;
+
+ type Error = S::Error;
+
+ type Future = S::Future;
fn poll_ready(
&mut self,
- _cx: &mut std::task::Context<'_>,
+ cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
- // if self.dir.is_empty() return err
- Poll::Ready(Ok(()))
+ self.inner.poll_ready(cx)
}
- fn call(&mut self, req: http::Request<SdkBody>) -> Self::Future {
- // let clone_body = req.body().try_clone().unwrap();
- // let mut clone_req = http::Request::builder()
- // .uri(req.uri().clone())
- // .method(req.method().clone());
- // *clone_req.headers_mut().unwrap() = req.headers().clone();
- // let r = clone_req.body(clone_body).unwrap();
- let invokers = self.dir.list(
- RpcInvocation::default()
- .with_service_unique_name("hello".to_string())
- .into(),
- );
- for mut invoker in invokers {
- let fut = async move {
- let res = invoker.call(req).await;
- return res;
- };
- return Box::pin(fut);
- }
- Box::pin(async move {
- Ok(http::Response::builder()
- .status(200)
- .header("grpc-status", "12")
- .header("content-type", "application/grpc")
- .body(empty_body())
- .unwrap())
- })
- }
-
- fn get_url(&self) -> dubbo_base::Url {
- Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap()
- }
-}
-
-#[derive(Debug, Default, Clone)]
-pub struct MockDirectory {
- // router_chain: RouterChain,
- invokers: Vec<BoxInvoker>,
-}
-
-impl MockDirectory {
- pub fn new(invokers: Vec<BoxInvoker>) -> MockDirectory {
- Self {
- // router_chain: RouterChain::default(),
- invokers,
- }
- }
-}
-
-impl Directory for MockDirectory {
- fn list(&self, _invo: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
- // tracing::info!("MockDirectory: {}", meta);
- let _u = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap();
- // vec![Box::new(TripleInvoker::new(u))]
- // self.router_chain.route(u, invo);
- self.invokers.clone()
- }
-
- // fn is_empty(&self) -> bool {
- // false
- // }
-}
-
-#[derive(Debug, Default)]
-pub struct RouterChain {
- router: HashMap<String, BoxRouter>,
- invokers: Vec<BoxInvoker>,
-}
-
-impl RouterChain {
- pub fn route(&self, url: Url, invo: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
- let r = self.router.get("mock").unwrap();
- r.route(self.invokers.clone(), url, invo)
- }
-}
-
-pub trait Router: Debug {
- fn route(
- &self,
- invokers: Vec<BoxInvoker>,
- url: Url,
- invo: Arc<RpcInvocation>,
- ) -> Vec<BoxInvoker>;
-}
-
-pub type BoxRouter = Box<dyn Router + Sync + Send>;
-
-#[derive(Debug, Default)]
-pub struct MockRouter {}
-
-impl Router for MockRouter {
- fn route(
- &self,
- invokers: Vec<BoxInvoker>,
- _url: Url,
- _invo: Arc<RpcInvocation>,
- ) -> Vec<BoxInvoker> {
- invokers
+ fn call(&mut self, req: Request<hyper::Body>) -> Self::Future {
+ let (parts, body) = req.into_parts();
+ let clone_body = CloneBody::new(body);
+ let req = Request::from_parts(parts, clone_body);
+ self.inner.call(req)
}
}
diff --git a/dubbo/src/cluster/router/condition/condition_router.rs b/dubbo/src/cluster/router/condition/condition_router.rs
new file mode 100644
index 0000000..21b525a
--- /dev/null
+++ b/dubbo/src/cluster/router/condition/condition_router.rs
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use crate::{
+ cluster::router::{condition::single_router::ConditionSingleRouter, Router},
+ codegen::RpcInvocation,
+};
+use dubbo_base::Url;
+use std::{
+ fmt::Debug,
+ sync::{Arc, RwLock},
+};
+
+#[derive(Default, Debug, Clone)]
+pub struct ConditionRouter {
+ //condition router for service scope
+ pub service_routers: Option<Arc<RwLock<ConditionSingleRouters>>>,
+ //condition router for application scope
+ pub application_routers: Option<Arc<RwLock<ConditionSingleRouters>>>,
+}
+
+impl Router for ConditionRouter {
+ fn route(&self, mut invokers: Vec<Url>, url: Url, invo: Arc<RpcInvocation>) -> Vec<Url> {
+ if let Some(routers) = &self.application_routers {
+ for router in &routers.read().unwrap().routers {
+ invokers = router.route(invokers, url.clone(), invo.clone());
+ }
+ }
+ if let Some(routers) = &self.service_routers {
+ for router in &routers.read().unwrap().routers {
+ invokers = router.route(invokers, url.clone(), invo.clone());
+ }
+ }
+ invokers
+ }
+}
+
+impl ConditionRouter {
+ pub fn new(
+ service_routers: Option<Arc<RwLock<ConditionSingleRouters>>>,
+ application_routers: Option<Arc<RwLock<ConditionSingleRouters>>>,
+ ) -> Self {
+ Self {
+ service_routers,
+ application_routers,
+ }
+ }
+}
+
+#[derive(Debug, Clone, Default)]
+pub struct ConditionSingleRouters {
+ pub routers: Vec<ConditionSingleRouter>,
+}
+
+impl ConditionSingleRouters {
+ pub fn new(routers: Vec<ConditionSingleRouter>) -> Self {
+ Self { routers }
+ }
+ pub fn is_null(&self) -> bool {
+ self.routers.is_empty()
+ }
+}
diff --git a/dubbo/src/cluster/router/condition/matcher.rs b/dubbo/src/cluster/router/condition/matcher.rs
new file mode 100644
index 0000000..2ee33d6
--- /dev/null
+++ b/dubbo/src/cluster/router/condition/matcher.rs
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use regex::Regex;
+use std::{collections::HashSet, error::Error, option::Option};
+
+#[derive(Clone, Debug, Default)]
+pub struct ConditionMatcher {
+ _key: String,
+ matches: HashSet<String>,
+ mismatches: HashSet<String>,
+}
+
+impl ConditionMatcher {
+ pub fn new(_key: String) -> Self {
+ ConditionMatcher {
+ _key,
+ matches: HashSet::new(),
+ mismatches: HashSet::new(),
+ }
+ }
+
+ pub fn is_match(&self, value: Option<String>) -> Result<bool, Box<dyn Error>> {
+ match value {
+ None => Ok(false),
+ Some(val) => {
+ for match_ in self.matches.iter() {
+ if self.do_pattern_match(match_, &val) {
+ return Ok(true);
+ }
+ }
+ for mismatch in self.mismatches.iter() {
+ if !self.do_pattern_match(mismatch, &val) {
+ return Ok(true);
+ }
+ }
+ Ok(false)
+ }
+ }
+ }
+
+ pub fn get_matches(&mut self) -> &mut HashSet<String> {
+ &mut self.matches
+ }
+ pub fn get_mismatches(&mut self) -> &mut HashSet<String> {
+ &mut self.mismatches
+ }
+
+ fn do_pattern_match(&self, pattern: &str, value: &str) -> bool {
+ if pattern.contains('*') {
+ return star_matcher(pattern, value);
+ }
+
+ if pattern.contains('~') {
+ let parts: Vec<&str> = pattern.split('~').collect();
+
+ if parts.len() == 2 {
+ if let (Ok(left), Ok(right), Ok(val)) = (
+ parts[0].parse::<i32>(),
+ parts[1].parse::<i32>(),
+ value.parse::<i32>(),
+ ) {
+ return range_matcher(val, left, right);
+ }
+ }
+ return false;
+ }
+ pattern == value
+ }
+}
+
+pub fn star_matcher(pattern: &str, input: &str) -> bool {
+ // 将*替换为任意字符的正则表达式
+ let pattern = pattern.replace("*", ".*");
+ let regex = Regex::new(&pattern).unwrap();
+ regex.is_match(input)
+}
+
+pub fn range_matcher(val: i32, min: i32, max: i32) -> bool {
+ min <= val && val <= max
+}
diff --git a/dubbo/src/cluster/loadbalance/impls/mod.rs b/dubbo/src/cluster/router/condition/mod.rs
similarity index 92%
copy from dubbo/src/cluster/loadbalance/impls/mod.rs
copy to dubbo/src/cluster/router/condition/mod.rs
index 5a84af8..d4a83b9 100644
--- a/dubbo/src/cluster/loadbalance/impls/mod.rs
+++ b/dubbo/src/cluster/router/condition/mod.rs
@@ -14,5 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-pub mod random;
-pub mod roundrobin;
+pub mod condition_router;
+pub mod matcher;
+pub mod single_router;
diff --git a/dubbo/src/cluster/router/condition/single_router.rs b/dubbo/src/cluster/router/condition/single_router.rs
new file mode 100644
index 0000000..54c61cb
--- /dev/null
+++ b/dubbo/src/cluster/router/condition/single_router.rs
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use dubbo_base::Url;
+use dubbo_logger::tracing::info;
+use regex::Regex;
+use std::{
+ collections::HashMap,
+ sync::{Arc, RwLock},
+};
+
+use crate::{
+ cluster::router::{condition::matcher::ConditionMatcher, utils::to_original_map, Router},
+ codegen::RpcInvocation,
+ invocation::Invocation,
+};
+
+#[derive(Debug, Clone, Default)]
+pub struct ConditionSingleRouter {
+ pub name: String,
+ pub when_condition: HashMap<String, Arc<RwLock<ConditionMatcher>>>,
+ pub then_condition: HashMap<String, Arc<RwLock<ConditionMatcher>>>,
+ pub enabled: bool,
+ pub force: bool,
+}
+
+impl Router for ConditionSingleRouter {
+ fn route(&self, invokers: Vec<Url>, url: Url, invocation: Arc<RpcInvocation>) -> Vec<Url> {
+ if !self.enabled {
+ return invokers;
+ };
+ let mut result = Vec::new();
+ if self.match_when(url.clone(), invocation.clone()) {
+ for invoker in &invokers.clone() {
+ if self.match_then(invoker.clone(), invocation.clone()) {
+ result.push(invoker.clone());
+ }
+ }
+ if result.is_empty() && self.force == false {
+ invokers
+ } else {
+ result
+ }
+ } else {
+ invokers
+ }
+ }
+}
+
+impl ConditionSingleRouter {
+ pub fn new(rule: String, force: bool, enabled: bool) -> Self {
+ let mut router = Self {
+ name: "condition".to_string(),
+ when_condition: HashMap::new(),
+ then_condition: HashMap::new(),
+ enabled,
+ force,
+ };
+ if enabled {
+ router.init(rule).expect("parse rule error");
+ }
+ router
+ }
+
+ fn init(&mut self, rule: String) -> Result<(), Box<dyn std::error::Error>> {
+ match rule.trim().is_empty() {
+ true => Err("Illegal route rule!".into()),
+ false => {
+ let r = rule.replace("consumer.", "").replace("provider.", "");
+ let i = r.find("=>").unwrap_or_else(|| r.len());
+ let when_rule = r[..i].trim().to_string();
+ let then_rule = r[(i + 2)..].trim().to_string();
+ let when = if when_rule.is_empty() || when_rule == "true" {
+ HashMap::new()
+ } else {
+ self.parse_rule(&when_rule)?
+ };
+ let then = if then_rule.is_empty() || then_rule == "false" {
+ HashMap::new()
+ } else {
+ self.parse_rule(&then_rule)?
+ };
+ self.when_condition = when;
+ self.then_condition = then;
+ Ok(())
+ }
+ }
+ }
+
+ fn parse_rule(
+ &mut self,
+ rule: &str,
+ ) -> Result<HashMap<String, Arc<RwLock<ConditionMatcher>>>, Box<dyn std::error::Error>> {
+ let mut conditions: HashMap<String, Arc<RwLock<ConditionMatcher>>> = HashMap::new();
+ let mut current_matcher: Option<Arc<RwLock<ConditionMatcher>>> = None;
+ let regex = Regex::new(r"([&!=,]*)\s*([^&!=,\s]+)").unwrap();
+ for cap in regex.captures_iter(rule) {
+ let separator = &cap[1];
+ let content = &cap[2];
+
+ match separator {
+ "" => {
+ let current_key = content.to_string();
+ current_matcher =
+ Some(Arc::new(RwLock::new(self.get_matcher(current_key.clone()))));
+ conditions.insert(
+ current_key.clone(),
+ current_matcher.as_ref().unwrap().clone(),
+ );
+ }
+ "&" => {
+ let current_key = content.to_string();
+ current_matcher = conditions.get(¤t_key).cloned();
+ if current_matcher.is_none() {
+ let matcher = Arc::new(RwLock::new(self.get_matcher(current_key.clone())));
+ conditions.insert(current_key.clone(), matcher.clone());
+ current_matcher = Some(matcher);
+ }
+ }
+ "=" => {
+ if let Some(matcher) = ¤t_matcher {
+ let mut matcher_borrowed = matcher.write().unwrap();
+ matcher_borrowed
+ .get_matches()
+ .insert(content.to_string().clone());
+ } else {
+ return Err("Error: ...".into());
+ }
+ }
+ "!=" => {
+ if let Some(matcher) = ¤t_matcher {
+ let mut matcher_borrowed = matcher.write().unwrap();
+ matcher_borrowed
+ .get_mismatches()
+ .insert(content.to_string().clone());
+ } else {
+ return Err("Error: ...".into());
+ }
+ }
+ "," => {
+ if let Some(matcher) = ¤t_matcher {
+ let mut matcher_borrowed = matcher.write().unwrap();
+ if matcher_borrowed.get_matches().is_empty()
+ && matcher_borrowed.get_mismatches().is_empty()
+ {
+ return Err("Error: ...".into());
+ }
+ drop(matcher_borrowed);
+ let mut matcher_borrowed_mut = matcher.write().unwrap();
+ matcher_borrowed_mut
+ .get_matches()
+ .insert(content.to_string().clone());
+ } else {
+ return Err("Error: ...".into());
+ }
+ }
+ _ => {
+ return Err("Error: ...".into());
+ }
+ }
+ }
+ Ok(conditions)
+ }
+
+ // pub fn is_runtime(&self) -> bool {
+ // // same as the Java version
+ // }
+
+ pub fn get_matcher(&self, key: String) -> ConditionMatcher {
+ ConditionMatcher::new(key)
+ }
+
+ pub fn match_when(&self, url: Url, invocation: Arc<RpcInvocation>) -> bool {
+ if self.when_condition.is_empty() {
+ return true;
+ }
+ self.do_match(url, &self.when_condition, invocation)
+ }
+
+ pub fn match_then(&self, url: Url, invocation: Arc<RpcInvocation>) -> bool {
+ if self.then_condition.is_empty() {
+ return false;
+ }
+ self.do_match(url, &self.then_condition, invocation)
+ }
+
+ pub fn do_match(
+ &self,
+ url: Url,
+ conditions: &HashMap<String, Arc<RwLock<ConditionMatcher>>>,
+ invocation: Arc<RpcInvocation>,
+ ) -> bool {
+ let sample: HashMap<String, String> = to_original_map(url);
+ conditions.iter().all(|(key, condition_matcher)| {
+ let matcher = condition_matcher.read().unwrap();
+ let value = get_value(key, &sample, invocation.clone());
+ match matcher.is_match(value) {
+ Ok(result) => result,
+ Err(error) => {
+ info!("Error occurred: {:?}", error);
+ false
+ }
+ }
+ })
+ }
+}
+
+fn get_value(
+ key: &String,
+ sample: &HashMap<String, String>,
+ invocation: Arc<RpcInvocation>,
+) -> Option<String> {
+ if key == "method" {
+ let method_param = invocation.get_method_name();
+ return Some(method_param);
+ }
+ sample.get(key).cloned()
+}
diff --git a/dubbo/src/cluster/router/manager/condition_manager.rs b/dubbo/src/cluster/router/manager/condition_manager.rs
new file mode 100644
index 0000000..7772950
--- /dev/null
+++ b/dubbo/src/cluster/router/manager/condition_manager.rs
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use crate::cluster::router::condition::{
+ condition_router::{ConditionRouter, ConditionSingleRouters},
+ single_router::ConditionSingleRouter,
+};
+use dubbo_config::router::ConditionRouterConfig;
+use std::{
+ collections::HashMap,
+ sync::{Arc, RwLock},
+};
+
+#[derive(Debug, Clone, Default)]
+pub struct ConditionRouterManager {
+ //Application-level routing applies globally, while service-level routing only affects a specific service.
+ pub routers_service: HashMap<String, Arc<RwLock<ConditionSingleRouters>>>,
+ pub routers_application: Arc<RwLock<ConditionSingleRouters>>,
+}
+
+impl ConditionRouterManager {
+ pub fn get_router(&self, service_name: &String) -> Option<ConditionRouter> {
+ let routers_application_is_null = self.routers_application.read().unwrap().is_null();
+ self.routers_service
+ .get(service_name)
+ .map(|routers_service| {
+ ConditionRouter::new(
+ Some(routers_service.clone()),
+ if routers_application_is_null {
+ None
+ } else {
+ Some(self.routers_application.clone())
+ },
+ )
+ })
+ .or_else(|| {
+ if routers_application_is_null {
+ None
+ } else {
+ Some(ConditionRouter::new(
+ None,
+ Some(self.routers_application.clone()),
+ ))
+ }
+ })
+ }
+
+ pub fn update(&mut self, config: ConditionRouterConfig) {
+ let force = config.force;
+ let scope = config.scope;
+ let key = config.key;
+ let enable = config.enabled;
+
+ let routers = config
+ .conditions
+ .into_iter()
+ .map(|condition| ConditionSingleRouter::new(condition, force, enable))
+ .collect::<Vec<_>>();
+
+ match scope.as_str() {
+ "application" => {
+ self.routers_application.write().unwrap().routers = routers;
+ }
+ "service" => {
+ self.routers_service
+ .entry(key)
+ .or_insert_with(|| Arc::new(RwLock::new(ConditionSingleRouters::new(vec![]))))
+ .write()
+ .unwrap()
+ .routers = routers;
+ }
+ _ => {}
+ }
+ }
+}
diff --git a/dubbo/src/cluster/loadbalance/impls/mod.rs b/dubbo/src/cluster/router/manager/mod.rs
similarity index 92%
copy from dubbo/src/cluster/loadbalance/impls/mod.rs
copy to dubbo/src/cluster/router/manager/mod.rs
index 5a84af8..593fa22 100644
--- a/dubbo/src/cluster/loadbalance/impls/mod.rs
+++ b/dubbo/src/cluster/router/manager/mod.rs
@@ -14,5 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-pub mod random;
-pub mod roundrobin;
+mod condition_manager;
+pub mod router_manager;
+mod tag_manager;
diff --git a/dubbo/src/cluster/router/manager/router_manager.rs b/dubbo/src/cluster/router/manager/router_manager.rs
new file mode 100644
index 0000000..e6c8b6c
--- /dev/null
+++ b/dubbo/src/cluster/router/manager/router_manager.rs
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use crate::cluster::router::{
+ manager::{condition_manager::ConditionRouterManager, tag_manager::TagRouterManager},
+ nacos_config_center::nacos_client::NacosClient,
+ router_chain::RouterChain,
+};
+use dubbo_base::Url;
+use dubbo_config::{
+ get_global_config,
+ router::{ConditionRouterConfig, NacosConfig, TagRouterConfig},
+};
+use dubbo_logger::tracing::{info, trace};
+use once_cell::sync::OnceCell;
+use std::{
+ collections::HashMap,
+ sync::{Arc, RwLock},
+};
+
+pub static GLOBAL_ROUTER_MANAGER: OnceCell<Arc<RwLock<RouterManager>>> = OnceCell::new();
+const TAG: &str = "tag";
+const CONDITION: &str = "condition";
+pub struct RouterManager {
+ pub condition_router_manager: ConditionRouterManager,
+ pub tag_router_manager: TagRouterManager,
+ pub nacos: Option<NacosClient>,
+ pub consumer: HashMap<String, Url>,
+}
+
+impl RouterManager {
+ pub fn get_router_chain(&self, service: String) -> RouterChain {
+ let mut chain = RouterChain::new();
+ if let Some(url) = self.consumer.get(service.as_str()) {
+ if let Some(tag_router) = self.tag_router_manager.get_router(&service) {
+ chain.add_router(TAG.to_string(), Box::new(tag_router));
+ }
+ if let Some(condition_router) = self.condition_router_manager.get_router(&service) {
+ chain.add_router(CONDITION.to_string(), Box::new(condition_router));
+ }
+ chain.self_url = url.clone();
+ }
+ chain
+ }
+
+ pub fn notify(&mut self, event: RouterConfigChangeEvent) {
+ match event.router_kind.as_str() {
+ CONDITION => {
+ let config: ConditionRouterConfig =
+ serde_yaml::from_str(event.content.as_str()).unwrap();
+ self.condition_router_manager.update(config)
+ }
+ TAG => {
+ let config: TagRouterConfig = serde_yaml::from_str(event.content.as_str()).unwrap();
+ self.tag_router_manager.update(config)
+ }
+ _ => {
+ info!("other router change event")
+ }
+ }
+ }
+
+ pub fn init_nacos(&mut self, config: NacosConfig) {
+ self.nacos = Some(NacosClient::new_init_client(config));
+ self.init_router_managers_for_nacos();
+ }
+
+ fn init_router_managers_for_nacos(&mut self) {
+ if let Some(tag_config) = self
+ .nacos
+ .as_ref()
+ .and_then(|n| n.get_config("application", TAG, TAG))
+ {
+ self.tag_router_manager.update(tag_config);
+ }
+
+ if let Some(condition_app_config) = self
+ .nacos
+ .as_ref()
+ .and_then(|n| n.get_config("application", CONDITION, TAG))
+ {
+ self.condition_router_manager.update(condition_app_config);
+ }
+
+ for (service_name, _) in &self.consumer {
+ if let Some(condition_config) = self
+ .nacos
+ .as_ref()
+ .and_then(|n| n.get_config(service_name, CONDITION, CONDITION))
+ {
+ self.condition_router_manager.update(condition_config);
+ }
+ }
+ }
+
+ pub fn init(&mut self) {
+ let config = get_global_config().routers.clone();
+ self.init_consumer_configs();
+ if let Some(nacos_config) = &config.nacos {
+ self.init_nacos(nacos_config.clone());
+ } else {
+ trace!("Nacos not configured, using local YAML configuration for routing");
+ if let Some(condition_configs) = &config.conditions {
+ for condition_config in condition_configs {
+ self.condition_router_manager
+ .update(condition_config.clone());
+ }
+ } else {
+ info!("Unconfigured Condition Router")
+ }
+ if let Some(tag_config) = &config.tags {
+ self.tag_router_manager.update(tag_config.clone());
+ } else {
+ info!("Unconfigured Tag Router")
+ }
+ }
+ }
+
+ fn init_consumer_configs(&mut self) {
+ let consumer_configs = get_global_config()
+ .routers
+ .consumer
+ .clone()
+ .unwrap_or_else(Vec::new);
+
+ for consumer_config in consumer_configs {
+ let service_url = Url::from_url(
+ format!("{}/{}", consumer_config.url, consumer_config.service).as_str(),
+ )
+ .expect("Consumer config error");
+
+ self.consumer.insert(consumer_config.service, service_url);
+ }
+ }
+}
+
+pub fn get_global_router_manager() -> &'static Arc<RwLock<RouterManager>> {
+ GLOBAL_ROUTER_MANAGER.get_or_init(|| {
+ let mut router_manager = RouterManager {
+ condition_router_manager: ConditionRouterManager::default(),
+ tag_router_manager: TagRouterManager::default(),
+ nacos: None,
+ consumer: HashMap::new(),
+ };
+ router_manager.init();
+ return Arc::new(RwLock::new(router_manager));
+ })
+}
+
+#[derive(Debug, Default, Clone)]
+pub struct RouterConfigChangeEvent {
+ pub service_name: String,
+ pub router_kind: String,
+ pub content: String,
+}
diff --git a/dubbo/src/cluster/router/manager/tag_manager.rs b/dubbo/src/cluster/router/manager/tag_manager.rs
new file mode 100644
index 0000000..f028af2
--- /dev/null
+++ b/dubbo/src/cluster/router/manager/tag_manager.rs
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use crate::cluster::router::tag::tag_router::{TagRouter, TagRouterInner};
+use dubbo_config::router::TagRouterConfig;
+use std::sync::{Arc, RwLock};
+
+#[derive(Debug, Clone, Default)]
+pub struct TagRouterManager {
+ pub tag_router: Arc<RwLock<TagRouterInner>>,
+}
+
+impl TagRouterManager {
+ pub fn get_router(&self, _service_name: &String) -> Option<TagRouter> {
+ Some(TagRouter {
+ inner: self.tag_router.clone(),
+ })
+ }
+
+ pub fn update(&mut self, config: TagRouterConfig) {
+ self.tag_router.write().unwrap().parse_config(config);
+ }
+}
diff --git a/dubbo/src/cluster/loadbalance/types.rs b/dubbo/src/cluster/router/mod.rs
similarity index 61%
rename from dubbo/src/cluster/loadbalance/types.rs
rename to dubbo/src/cluster/router/mod.rs
index 9273d07..edc081b 100644
--- a/dubbo/src/cluster/loadbalance/types.rs
+++ b/dubbo/src/cluster/router/mod.rs
@@ -14,29 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+pub mod condition;
+pub mod manager;
+pub mod nacos_config_center;
+pub mod router_chain;
+pub mod tag;
+pub mod utils;
+use crate::invocation::RpcInvocation;
use dubbo_base::Url;
use std::{fmt::Debug, sync::Arc};
-use crate::codegen::RpcInvocation;
-
-pub type BoxLoadBalance = Box<dyn LoadBalance + Send + Sync>;
-
-pub trait LoadBalance: Debug {
- fn select(
- &self,
- invokers: Arc<Vec<Url>>,
- url: Option<Url>,
- invocation: Arc<RpcInvocation>,
- ) -> Option<Url>;
+pub trait Router: Debug {
+ fn route(&self, invokers: Vec<Url>, url: Url, invocation: Arc<RpcInvocation>) -> Vec<Url>;
}
-pub struct Metadata {
- pub name: &'static str,
-}
+pub type BoxRouter = Box<dyn Router + Sync + Send>;
-impl Metadata {
- pub fn new(name: &'static str) -> Self {
- Metadata { name }
+#[derive(Debug, Default, Clone)]
+pub struct MockRouter {}
+
+impl Router for MockRouter {
+ fn route(&self, invokers: Vec<Url>, _url: Url, _invocation: Arc<RpcInvocation>) -> Vec<Url> {
+ invokers
}
}
diff --git a/dubbo/src/cluster/loadbalance/impls/mod.rs b/dubbo/src/cluster/router/nacos_config_center/mod.rs
similarity index 95%
rename from dubbo/src/cluster/loadbalance/impls/mod.rs
rename to dubbo/src/cluster/router/nacos_config_center/mod.rs
index 5a84af8..7172231 100644
--- a/dubbo/src/cluster/loadbalance/impls/mod.rs
+++ b/dubbo/src/cluster/router/nacos_config_center/mod.rs
@@ -14,5 +14,4 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-pub mod random;
-pub mod roundrobin;
+pub mod nacos_client;
diff --git a/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs b/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs
new file mode 100644
index 0000000..68b5f09
--- /dev/null
+++ b/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use crate::cluster::router::manager::router_manager::{
+ get_global_router_manager, RouterConfigChangeEvent,
+};
+use dubbo_config::router::NacosConfig;
+use dubbo_logger::{tracing, tracing::info};
+use nacos_sdk::api::{
+ config::{ConfigChangeListener, ConfigResponse, ConfigService, ConfigServiceBuilder},
+ props::ClientProps,
+};
+use std::sync::{Arc, RwLock};
+
+pub struct NacosClient {
+ pub client: Arc<RwLock<dyn ConfigService>>,
+}
+
+unsafe impl Send for NacosClient {}
+
+unsafe impl Sync for NacosClient {}
+
+pub struct ConfigChangeListenerImpl;
+
+impl NacosClient {
+ pub fn new_init_client(config: NacosConfig) -> Self {
+ let server_addr = config.addr;
+ let namespace = config.namespace;
+ let app = config.app;
+ let enable_auth = config.enable_auth;
+
+ let mut props = ClientProps::new()
+ .server_addr(server_addr)
+ .namespace(namespace)
+ .app_name(app);
+
+ if enable_auth.is_some() {
+ info!("enable nacos auth!");
+ } else {
+ info!("disable nacos auth!");
+ }
+
+ if let Some(auth) = enable_auth {
+ props = props
+ .auth_username(auth.auth_username)
+ .auth_password(auth.auth_password);
+ }
+
+ let client = Arc::new(RwLock::new(
+ ConfigServiceBuilder::new(props)
+ .build()
+ .expect("NacosClient build failed! Please check NacosConfig"),
+ ));
+
+ Self { client }
+ }
+
+ pub fn get_config<T>(&self, data_id: &str, group: &str, config_type: &str) -> Option<T>
+ where
+ T: serde::de::DeserializeOwned,
+ {
+ let config_resp = self
+ .client
+ .read()
+ .unwrap()
+ .get_config(data_id.to_string(), group.to_string());
+
+ match config_resp {
+ Ok(config_resp) => {
+ self.add_listener(data_id, group);
+ let string = config_resp.content();
+ let result = serde_yaml::from_str(string);
+
+ match result {
+ Ok(config) => {
+ info!(
+ "success to get {}Router config and parse success",
+ config_type
+ );
+ Some(config)
+ }
+ Err(_) => {
+ info!("failed to parse {}Router rule", config_type);
+ None
+ }
+ }
+ }
+ Err(_) => None,
+ }
+ }
+
+ pub fn add_listener(&self, data_id: &str, group: &str) {
+ if let Err(err) = self
+ .client
+ .write()
+ .map_err(|e| format!("failed to create nacos config listener: {}", e))
+ .and_then(|client| {
+ client
+ .add_listener(
+ data_id.to_string(),
+ group.to_string(),
+ Arc::new(ConfigChangeListenerImpl {}),
+ )
+ .map_err(|e| format!("failed to add nacos config listener: {}", e))
+ })
+ {
+ tracing::error!("{}", err);
+ } else {
+ info!("listening the config success");
+ }
+ }
+}
+
+impl ConfigChangeListener for ConfigChangeListenerImpl {
+ fn notify(&self, config_resp: ConfigResponse) {
+ let content_type = config_resp.content_type();
+ let event = RouterConfigChangeEvent {
+ service_name: config_resp.data_id().to_string(),
+ router_kind: config_resp.group().to_string(),
+ content: config_resp.content().to_string(),
+ };
+
+ if content_type == "yaml" {
+ get_global_router_manager().write().unwrap().notify(event);
+ }
+
+ info!("notify config: {:?}", config_resp);
+ }
+}
diff --git a/dubbo/src/cluster/router/router_chain.rs b/dubbo/src/cluster/router/router_chain.rs
new file mode 100644
index 0000000..601bc5e
--- /dev/null
+++ b/dubbo/src/cluster/router/router_chain.rs
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use crate::{cluster::router::BoxRouter, invocation::RpcInvocation};
+use dubbo_base::Url;
+use std::{collections::HashMap, sync::Arc};
+
+#[derive(Debug, Default)]
+pub struct RouterChain {
+ pub routers: HashMap<String, BoxRouter>,
+ pub self_url: Url,
+}
+
+impl RouterChain {
+ pub fn new() -> Self {
+ RouterChain {
+ routers: HashMap::new(),
+ self_url: Url::new(),
+ }
+ }
+
+ pub fn route(&self, mut invokers: Vec<Url>, invocation: Arc<RpcInvocation>) -> Vec<Url> {
+ for (_, value) in self.routers.iter() {
+ invokers = value.route(invokers, self.self_url.clone(), invocation.clone())
+ }
+ invokers
+ }
+
+ pub fn add_router(&mut self, key: String, router: BoxRouter) {
+ self.routers.insert(key, router);
+ }
+}
+
+#[test]
+fn test() {
+ use crate::cluster::router::manager::router_manager::get_global_router_manager;
+
+ let u1 = Url::from_url("tri://127.0.0.1:8888/org.apache.dubbo.sample.tri.Greeter").unwrap();
+ let u2 = Url::from_url("tri://127.0.0.1:8889/org.apache.dubbo.sample.tri.Greeter").unwrap();
+ let u3 = Url::from_url("tri://127.0.0.1:8800/org.apache.dubbo.sample.tri.Greeter").unwrap();
+ let u4 = Url::from_url("tri://127.0.2.1:8880/org.apache.dubbo.sample.tri.Greeter").unwrap();
+ let u5 = Url::from_url("tri://127.0.1.1:8882/org.apache.dubbo.sample.tri.Greeter").unwrap();
+ let u6 = Url::from_url("tri://213.0.1.1:8888/org.apache.dubbo.sample.tri.Greeter").unwrap();
+ let u7 = Url::from_url("tri://169.0.1.1:8887/org.apache.dubbo.sample.tri.Greeter").unwrap();
+ let invs = vec![u1, u2, u3, u4, u5, u6, u7];
+ let len = invs.len().clone();
+ let inv = Arc::new(
+ RpcInvocation::default()
+ .with_method_name("greet".to_string())
+ .with_service_unique_name("org.apache.dubbo.sample.tri.Greeter".to_string()),
+ );
+ let x = get_global_router_manager()
+ .read()
+ .unwrap()
+ .get_router_chain(inv.get_target_service_unique_name());
+ let result = x.route(invs, inv.clone());
+ println!("total:{},result:{}", len, result.len().clone());
+ dbg!(result);
+}
diff --git a/dubbo/src/cluster/loadbalance/impls/mod.rs b/dubbo/src/cluster/router/tag/mod.rs
similarity index 95%
copy from dubbo/src/cluster/loadbalance/impls/mod.rs
copy to dubbo/src/cluster/router/tag/mod.rs
index 5a84af8..673a720 100644
--- a/dubbo/src/cluster/loadbalance/impls/mod.rs
+++ b/dubbo/src/cluster/router/tag/mod.rs
@@ -14,5 +14,4 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-pub mod random;
-pub mod roundrobin;
+pub mod tag_router;
diff --git a/dubbo/src/cluster/router/tag/tag_router.rs b/dubbo/src/cluster/router/tag/tag_router.rs
new file mode 100644
index 0000000..3d28f93
--- /dev/null
+++ b/dubbo/src/cluster/router/tag/tag_router.rs
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use crate::{
+ cluster::router::{utils::to_original_map, Router},
+ codegen::RpcInvocation,
+};
+use dubbo_base::Url;
+use dubbo_config::router::TagRouterConfig;
+use std::{
+ collections::HashMap,
+ fmt::Debug,
+ sync::{Arc, RwLock},
+};
+
+#[derive(Debug, Clone, Default)]
+pub struct TagRouterInner {
+ pub tag_rules: HashMap<String, HashMap<String, String>>,
+ pub force: bool,
+ pub enabled: bool,
+}
+
+#[derive(Debug, Clone, Default)]
+pub struct TagRouter {
+ pub(crate) inner: Arc<RwLock<TagRouterInner>>,
+}
+impl Router for TagRouter {
+ fn route(&self, invokers: Vec<Url>, url: Url, invocation: Arc<RpcInvocation>) -> Vec<Url> {
+ return self.inner.read().unwrap().route(invokers, url, invocation);
+ }
+}
+
+impl TagRouterInner {
+ pub fn parse_config(&mut self, config: TagRouterConfig) {
+ self.tag_rules = HashMap::new();
+ self.force = config.force;
+ self.enabled = config.enabled;
+ for tag in &config.tags {
+ let mut tags = HashMap::new();
+ for rule in &tag.matches {
+ tags.insert(rule.key.clone(), rule.value.clone());
+ }
+ self.tag_rules.insert(tag.name.clone(), tags);
+ }
+ }
+
+ pub fn match_tag(&self, params: HashMap<String, String>) -> Option<String> {
+ let mut tag_result = None;
+ for (tag, tag_rules) in &self.tag_rules {
+ for (key, value) in tag_rules {
+ match params.get(key.as_str()) {
+ None => {}
+ Some(val) => {
+ if val == value {
+ tag_result = Some(tag.clone())
+ }
+ }
+ }
+ }
+ }
+ tag_result
+ }
+
+ pub fn route(&self, invokers: Vec<Url>, url: Url, _invocation: Arc<RpcInvocation>) -> Vec<Url> {
+ if !self.enabled {
+ return invokers;
+ };
+ let self_param = to_original_map(url);
+ let invocation_tag = self.match_tag(self_param);
+ let mut invokers_result = Vec::new();
+ let mut invokers_no_tag = Vec::new();
+ for invoker in &invokers {
+ let invoker_param = to_original_map(invoker.clone());
+ let invoker_tag = self.match_tag(invoker_param);
+ if invoker_tag == None {
+ invokers_no_tag.push(invoker.clone());
+ }
+ if invoker_tag == invocation_tag {
+ invokers_result.push(invoker.clone());
+ }
+ }
+ if invokers_result.is_empty() {
+ if !self.force {
+ return invokers_no_tag;
+ }
+ }
+ invokers_result
+ }
+}
diff --git a/dubbo/src/cluster/router/utils.rs b/dubbo/src/cluster/router/utils.rs
new file mode 100644
index 0000000..eca98f6
--- /dev/null
+++ b/dubbo/src/cluster/router/utils.rs
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use dubbo_base::Url;
+use std::{collections::HashMap, string::String};
+
+pub fn to_original_map(url: Url) -> HashMap<String, String> {
+ let mut result: HashMap<String, String> = HashMap::new();
+ result.insert("scheme".parse().unwrap(), url.scheme);
+ result.insert("location".parse().unwrap(), url.location);
+ result.insert("ip".parse().unwrap(), url.ip);
+ result.insert("port".parse().unwrap(), url.port);
+ result.insert("service_name".parse().unwrap(), url.service_name);
+ result.insert("service_key".parse().unwrap(), url.service_key);
+ for (key, value) in url.params {
+ result.insert(key, value);
+ }
+ result
+}
diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs
index 8d95c21..77f8f5a 100644
--- a/dubbo/src/codegen.rs
+++ b/dubbo/src/codegen.rs
@@ -22,16 +22,15 @@
pub use async_trait::async_trait;
pub use bytes::Bytes;
+pub use dubbo_base::StdError;
pub use http_body::Body;
pub use hyper::Body as hyperBody;
pub use tower_service::Service;
pub use super::{
- cluster::directory::RegistryDirectory,
empty_body,
invocation::{IntoStreamingRequest, Request, Response, RpcInvocation},
protocol::{triple::triple_invoker::TripleInvoker, Invoker},
- registry::{BoxRegistry, Registry},
triple::{
client::TripleClient,
codec::{prost::ProstCodec, serde_codec::SerdeCodec, Codec},
@@ -41,13 +40,12 @@
TripleServer,
},
},
- BoxBody, BoxFuture, StdError,
+ BoxBody, BoxFuture,
};
pub use crate::{
filter::{service::FilterService, Filter},
triple::{
- client::builder::{ClientBoxService, ClientBuilder},
- server::builder::ServerBuilder,
+ client::builder::ClientBuilder, server::builder::ServerBuilder,
transport::connection::Connection,
},
};
diff --git a/dubbo/src/directory/mod.rs b/dubbo/src/directory/mod.rs
new file mode 100644
index 0000000..ace67e6
--- /dev/null
+++ b/dubbo/src/directory/mod.rs
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::{
+ collections::HashMap,
+ pin::Pin,
+ sync::{Arc, Mutex},
+ task::{Context, Poll},
+};
+
+use crate::{
+ codegen::{RpcInvocation, TripleInvoker},
+ invocation::Invocation,
+ invoker::{clone_invoker::CloneInvoker, NewInvoker},
+ param::Param,
+ svc::NewService,
+};
+use dubbo_base::{StdError, Url};
+use dubbo_logger::tracing::{debug, error};
+use futures_util::future;
+use tokio::sync::mpsc::channel;
+use tokio_stream::wrappers::ReceiverStream;
+use tower::{
+ buffer::Buffer,
+ discover::{Change, Discover},
+ ServiceExt,
+};
+
+use crate::extension::registry_extension::{proxy::RegistryProxy, Registry};
+use dubbo_base::registry_param::InterfaceName;
+use tower_service::Service;
+
+type BufferedDirectory =
+ Buffer<Directory<ReceiverStream<Result<Change<String, ()>, StdError>>>, ()>;
+
+pub struct NewCachedDirectory<N>
+where
+ N: Service<(), Response = RegistryProxy> + Send + Clone + 'static,
+ <N as Service<()>>::Future: Send + 'static,
+{
+ inner: CachedDirectory<NewDirectory<N>, RpcInvocation>,
+}
+
+pub struct CachedDirectory<N, T>
+where
+ // NewDirectory
+ N: NewService<T>,
+{
+ inner: N,
+ cache: Arc<Mutex<HashMap<String, N::Service>>>,
+}
+
+pub struct NewDirectory<N> {
+ // registry
+ inner: N,
+}
+
+pub struct Directory<D> {
+ directory: HashMap<String, CloneInvoker<TripleInvoker>>,
+ discover: D,
+ new_invoker: NewInvoker,
+}
+
+impl<N> NewCachedDirectory<N>
+where
+ N: Service<(), Response = RegistryProxy> + Send + Clone + 'static,
+ <N as Service<()>>::Future: Send + 'static,
+{
+ pub fn layer() -> impl tower_layer::Layer<N, Service = Self> {
+ tower_layer::layer_fn(|inner: N| {
+ NewCachedDirectory {
+ // inner is registry
+ inner: CachedDirectory::new(NewDirectory::new(inner)),
+ }
+ })
+ }
+}
+
+impl<N, T> NewService<T> for NewCachedDirectory<N>
+where
+ T: Param<RpcInvocation>,
+ // service registry
+ N: Service<(), Response = RegistryProxy> + Send + Clone + 'static,
+ <N as Service<()>>::Future: Send + 'static,
+{
+ type Service = BufferedDirectory;
+
+ fn new_service(&self, target: T) -> Self::Service {
+ self.inner.new_service(target.param())
+ }
+}
+
+impl<N, T> CachedDirectory<N, T>
+where
+ N: NewService<T>,
+{
+ pub fn new(inner: N) -> Self {
+ CachedDirectory {
+ inner,
+ cache: Default::default(),
+ }
+ }
+}
+
+impl<N, T> NewService<T> for CachedDirectory<N, T>
+where
+ T: Param<RpcInvocation>,
+ // NewDirectory
+ N: NewService<T>,
+ // Buffered directory
+ N::Service: Clone,
+{
+ type Service = N::Service;
+
+ fn new_service(&self, target: T) -> Self::Service {
+ let rpc_invocation = target.param();
+ let service_name = rpc_invocation.get_target_service_unique_name();
+ let mut cache = self.cache.lock().expect("cached directory lock failed.");
+ let value = cache.get(&service_name).map(|val| val.clone());
+ match value {
+ None => {
+ let new_service = self.inner.new_service(target);
+ cache.insert(service_name, new_service.clone());
+ new_service
+ }
+ Some(value) => value,
+ }
+ }
+}
+
+impl<N> NewDirectory<N> {
+ const MAX_DIRECTORY_BUFFER_SIZE: usize = 16;
+
+ pub fn new(inner: N) -> Self {
+ NewDirectory { inner }
+ }
+}
+
+impl<N, T> NewService<T> for NewDirectory<N>
+where
+ T: Param<RpcInvocation>,
+ // service registry
+ N: Service<(), Response = RegistryProxy> + Send + Clone + 'static,
+ <N as Service<()>>::Future: Send + 'static,
+{
+ type Service = BufferedDirectory;
+
+ fn new_service(&self, target: T) -> Self::Service {
+ let service_name = target.param().get_target_service_unique_name();
+
+ let fut = self.inner.clone().oneshot(());
+
+ let (tx, rx) = channel(Self::MAX_DIRECTORY_BUFFER_SIZE);
+
+ tokio::spawn(async move {
+ // todo use dubbo url model generate subscribe url
+ // category:serviceInterface:version:group
+ let consumer_url = format!("consumer://{}/{}", "127.0.0.1:8888", service_name);
+ let mut subscribe_url: Url = consumer_url.parse().unwrap();
+ subscribe_url.add_query_param(InterfaceName::new(service_name));
+
+ let Ok(registry) = fut.await else {
+ error!("registry extension load failed.");
+ return;
+ };
+
+ let receiver = registry.subscribe(subscribe_url).await;
+ debug!("discover start!");
+ match receiver {
+ Err(_e) => {
+ // error!("discover stream error: {}", e);
+ debug!("discover stream error");
+ }
+ Ok(mut receiver) => loop {
+ let change = receiver.recv().await;
+ debug!("receive change: {:?}", change);
+ match change {
+ None => {
+ debug!("discover stream closed.");
+ break;
+ }
+ Some(change) => {
+ let _ = tx.send(change).await;
+ }
+ }
+ },
+ }
+ });
+
+ Buffer::new(
+ Directory::new(ReceiverStream::new(rx)),
+ Self::MAX_DIRECTORY_BUFFER_SIZE,
+ )
+ }
+}
+
+impl<D> Directory<D> {
+ pub fn new(discover: D) -> Self {
+ Directory {
+ directory: Default::default(),
+ discover,
+ new_invoker: NewInvoker,
+ }
+ }
+}
+
+impl<D> Service<()> for Directory<D>
+where
+ // Discover
+ D: Discover<Key = String> + Unpin + Send,
+ D::Error: Into<StdError>,
+{
+ type Response = Vec<CloneInvoker<TripleInvoker>>;
+
+ type Error = StdError;
+
+ type Future = future::Ready<Result<Self::Response, Self::Error>>;
+
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ loop {
+ let pin_discover = Pin::new(&mut self.discover);
+
+ match pin_discover.poll_discover(cx) {
+ Poll::Pending => {
+ if self.directory.is_empty() {
+ return Poll::Pending;
+ } else {
+ return Poll::Ready(Ok(()));
+ }
+ }
+ Poll::Ready(change) => {
+ let change = change.transpose().map_err(|e| e.into())?;
+ match change {
+ Some(Change::Remove(key)) => {
+ debug!("remove key: {}", key);
+ self.directory.remove(&key);
+ }
+ Some(Change::Insert(key, _)) => {
+ debug!("insert key: {}", key);
+ let invoker = self.new_invoker.new_service(key.clone());
+ self.directory.insert(key, invoker);
+ }
+ None => {
+ debug!("stream closed");
+ return Poll::Ready(Ok(()));
+ }
+ }
+ }
+ }
+ }
+ }
+
+ fn call(&mut self, _: ()) -> Self::Future {
+ let vec = self
+ .directory
+ .values()
+ .map(|val| val.clone())
+ .collect::<Vec<CloneInvoker<TripleInvoker>>>();
+ future::ok(vec)
+ }
+}
diff --git a/dubbo/src/extension/mod.rs b/dubbo/src/extension/mod.rs
new file mode 100644
index 0000000..c1d0395
--- /dev/null
+++ b/dubbo/src/extension/mod.rs
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+pub mod registry_extension;
+
+use crate::{
+ extension::registry_extension::proxy::RegistryProxy, registry::registry::StaticRegistry,
+};
+use dubbo_base::{extension_param::ExtensionType, url::UrlParam, StdError, Url};
+use dubbo_logger::tracing::{error, info};
+use std::{future::Future, pin::Pin, sync::Arc};
+use thiserror::Error;
+use tokio::sync::{oneshot, RwLock};
+
+pub static EXTENSIONS: once_cell::sync::Lazy<ExtensionDirectoryCommander> =
+ once_cell::sync::Lazy::new(|| ExtensionDirectory::init());
+
+#[derive(Default)]
+struct ExtensionDirectory {
+ registry_extension_loader: registry_extension::RegistryExtensionLoader,
+}
+
+impl ExtensionDirectory {
+ fn init() -> ExtensionDirectoryCommander {
+ let (tx, mut rx) = tokio::sync::mpsc::channel::<ExtensionOpt>(64);
+
+ tokio::spawn(async move {
+ let mut extension_directory = ExtensionDirectory::default();
+
+ // register static registry extension
+ let _ = extension_directory.register(
+ StaticRegistry::name(),
+ StaticRegistry::convert_to_extension_factories(),
+ ExtensionType::Registry,
+ );
+
+ while let Some(extension_opt) = rx.recv().await {
+ match extension_opt {
+ ExtensionOpt::Register(
+ extension_name,
+ extension_factories,
+ extension_type,
+ tx,
+ ) => {
+ let result = extension_directory.register(
+ extension_name,
+ extension_factories,
+ extension_type,
+ );
+ let _ = tx.send(result);
+ }
+ ExtensionOpt::Remove(extension_name, extension_type, tx) => {
+ let result = extension_directory.remove(extension_name, extension_type);
+ let _ = tx.send(result);
+ }
+ ExtensionOpt::Load(url, extension_type, tx) => {
+ let _ = extension_directory.load(url, extension_type, tx);
+ }
+ }
+ }
+ });
+
+ ExtensionDirectoryCommander { sender: tx }
+ }
+
+ fn register(
+ &mut self,
+ extension_name: String,
+ extension_factories: ExtensionFactories,
+ extension_type: ExtensionType,
+ ) -> Result<(), StdError> {
+ match extension_type {
+ ExtensionType::Registry => match extension_factories {
+ ExtensionFactories::RegistryExtensionFactory(registry_extension_factory) => {
+ self.registry_extension_loader
+ .register(extension_name, registry_extension_factory);
+ Ok(())
+ }
+ },
+ }
+ }
+
+ fn remove(
+ &mut self,
+ extension_name: String,
+ extension_type: ExtensionType,
+ ) -> Result<(), StdError> {
+ match extension_type {
+ ExtensionType::Registry => {
+ self.registry_extension_loader.remove(extension_name);
+ Ok(())
+ }
+ }
+ }
+
+ fn load(
+ &mut self,
+ url: Url,
+ extension_type: ExtensionType,
+ callback: oneshot::Sender<Result<Extensions, StdError>>,
+ ) {
+ match extension_type {
+ ExtensionType::Registry => {
+ let extension = self.registry_extension_loader.load(url);
+ match extension {
+ Ok(mut extension) => {
+ tokio::spawn(async move {
+ let extension = extension.resolve().await;
+ match extension {
+ Ok(extension) => {
+ let _ = callback.send(Ok(Extensions::Registry(extension)));
+ }
+ Err(err) => {
+ error!("load extension failed: {}", err);
+ let _ = callback.send(Err(err));
+ }
+ }
+ });
+ }
+ Err(err) => {
+ error!("load extension failed: {}", err);
+ let _ = callback.send(Err(err));
+ }
+ }
+ }
+ }
+ }
+}
+
+type ExtensionCreator<T> = Box<
+ dyn Fn(Url) -> Pin<Box<dyn Future<Output = Result<T, StdError>> + Send + 'static>>
+ + Send
+ + Sync
+ + 'static,
+>;
+pub(crate) struct ExtensionPromiseResolver<T> {
+ resolved_data: Option<T>,
+ creator: ExtensionCreator<T>,
+ url: Url,
+}
+
+impl<T> ExtensionPromiseResolver<T>
+where
+ T: Send + Clone + 'static,
+{
+ fn new(creator: ExtensionCreator<T>, url: Url) -> Self {
+ ExtensionPromiseResolver {
+ resolved_data: None,
+ creator,
+ url,
+ }
+ }
+
+ fn resolved_data(&self) -> Option<T> {
+ self.resolved_data.clone()
+ }
+
+ async fn resolve(&mut self) -> Result<T, StdError> {
+ match (self.creator)(self.url.clone()).await {
+ Ok(data) => {
+ self.resolved_data = Some(data.clone());
+ Ok(data)
+ }
+ Err(err) => {
+ error!("create extension failed: {}", err);
+ Err(LoadExtensionError::new(
+ "load extension failed, create extension occur an error".to_string(),
+ )
+ .into())
+ }
+ }
+ }
+}
+
+pub(crate) struct LoadExtensionPromise<T> {
+ resolver: Arc<RwLock<ExtensionPromiseResolver<T>>>,
+}
+
+impl<T> LoadExtensionPromise<T>
+where
+ T: Send + Clone + 'static,
+{
+ pub(crate) fn new(creator: ExtensionCreator<T>, url: Url) -> Self {
+ let resolver = ExtensionPromiseResolver::new(creator, url);
+ LoadExtensionPromise {
+ resolver: Arc::new(RwLock::new(resolver)),
+ }
+ }
+
+ pub(crate) async fn resolve(&mut self) -> Result<T, StdError> {
+ // get read lock
+ let resolver_read_lock = self.resolver.read().await;
+ // if extension is not None, return it
+ if let Some(extension) = resolver_read_lock.resolved_data() {
+ return Ok(extension);
+ }
+ drop(resolver_read_lock);
+
+ let mut write_lock = self.resolver.write().await;
+
+ match write_lock.resolved_data() {
+ Some(extension) => Ok(extension),
+ None => {
+ let extension = write_lock.resolve().await;
+ extension
+ }
+ }
+ }
+}
+
+impl<T> Clone for LoadExtensionPromise<T> {
+ fn clone(&self) -> Self {
+ LoadExtensionPromise {
+ resolver: self.resolver.clone(),
+ }
+ }
+}
+
+pub struct ExtensionDirectoryCommander {
+ sender: tokio::sync::mpsc::Sender<ExtensionOpt>,
+}
+
+impl ExtensionDirectoryCommander {
+ #[allow(private_bounds)]
+ pub async fn register<T>(&self) -> Result<(), StdError>
+ where
+ T: Extension,
+ T: ExtensionMetaInfo,
+ T: ConvertToExtensionFactories,
+ {
+ let extension_name = T::name();
+ let extension_factories = T::convert_to_extension_factories();
+ let extension_type = T::extension_type();
+
+ info!(
+ "register extension: {}, type: {}",
+ extension_name,
+ extension_type.as_str()
+ );
+
+ let (tx, rx) = oneshot::channel();
+
+ let send = self
+ .sender
+ .send(ExtensionOpt::Register(
+ extension_name.clone(),
+ extension_factories,
+ extension_type,
+ tx,
+ ))
+ .await;
+
+ let Ok(_) = send else {
+ let err_msg = format!("register extension {} failed", extension_name);
+ return Err(RegisterExtensionError::new(err_msg).into());
+ };
+
+ let ret = rx.await;
+
+ match ret {
+ Ok(_) => Ok(()),
+ Err(_) => {
+ let err_msg = format!("register extension {} failed", extension_name);
+ Err(RegisterExtensionError::new(err_msg).into())
+ }
+ }
+ }
+
+ #[allow(private_bounds)]
+ pub async fn remove<T>(&self) -> Result<(), StdError>
+ where
+ T: Extension,
+ T: ExtensionMetaInfo,
+ {
+ let extension_name = T::name();
+ let extension_type = T::extension_type();
+
+ info!(
+ "remove extension: {}, type: {}",
+ extension_name,
+ extension_type.as_str()
+ );
+
+ let (tx, rx) = oneshot::channel();
+
+ let send = self
+ .sender
+ .send(ExtensionOpt::Remove(
+ extension_name.clone(),
+ extension_type,
+ tx,
+ ))
+ .await;
+
+ let Ok(_) = send else {
+ let err_msg = format!("remove extension {} failed", extension_name);
+ return Err(RemoveExtensionError::new(err_msg).into());
+ };
+
+ let ret = rx.await;
+
+ match ret {
+ Ok(_) => Ok(()),
+ Err(_) => {
+ let err_msg = format!("remove extension {} failed", extension_name);
+ Err(RemoveExtensionError::new(err_msg).into())
+ }
+ }
+ }
+
+ pub async fn load_registry(&self, url: Url) -> Result<RegistryProxy, StdError> {
+ let url_str = url.to_string();
+ info!("load registry extension: {}", url_str);
+
+ let (tx, rx) = oneshot::channel();
+
+ let send = self
+ .sender
+ .send(ExtensionOpt::Load(url, ExtensionType::Registry, tx))
+ .await;
+
+ let Ok(_) = send else {
+ let err_msg = format!("load registry extension failed: {}", url_str);
+ return Err(LoadExtensionError::new(err_msg).into());
+ };
+
+ let extensions = rx.await;
+
+ let Ok(extension) = extensions else {
+ let err_msg = format!("load registry extension failed: {}", url_str);
+ return Err(LoadExtensionError::new(err_msg).into());
+ };
+
+ let Ok(extensions) = extension else {
+ let err_msg = format!("load registry extension failed: {}", url_str);
+ return Err(LoadExtensionError::new(err_msg).into());
+ };
+
+ match extensions {
+ Extensions::Registry(proxy) => Ok(proxy),
+ }
+ }
+}
+
+enum ExtensionOpt {
+ Register(
+ String,
+ ExtensionFactories,
+ ExtensionType,
+ oneshot::Sender<Result<(), StdError>>,
+ ),
+ Remove(String, ExtensionType, oneshot::Sender<Result<(), StdError>>),
+ Load(
+ Url,
+ ExtensionType,
+ oneshot::Sender<Result<Extensions, StdError>>,
+ ),
+}
+
+pub(crate) trait Sealed {}
+
+#[allow(private_bounds)]
+#[async_trait::async_trait]
+pub trait Extension: Sealed {
+ type Target;
+
+ fn name() -> String;
+
+ async fn create(url: Url) -> Result<Self::Target, StdError>;
+}
+
+#[allow(private_bounds)]
+pub(crate) trait ExtensionMetaInfo {
+ fn extension_type() -> ExtensionType;
+}
+
+pub(crate) enum Extensions {
+ Registry(RegistryProxy),
+}
+
+pub(crate) enum ExtensionFactories {
+ RegistryExtensionFactory(registry_extension::RegistryExtensionFactory),
+}
+
+#[allow(private_bounds)]
+pub(crate) trait ConvertToExtensionFactories {
+ fn convert_to_extension_factories() -> ExtensionFactories;
+}
+
+#[derive(Error, Debug)]
+#[error("{0}")]
+pub(crate) struct RegisterExtensionError(String);
+
+impl RegisterExtensionError {
+ pub fn new(msg: String) -> Self {
+ RegisterExtensionError(msg)
+ }
+}
+
+#[derive(Error, Debug)]
+#[error("{0}")]
+pub struct RemoveExtensionError(String);
+
+impl RemoveExtensionError {
+ pub fn new(msg: String) -> Self {
+ RemoveExtensionError(msg)
+ }
+}
+
+#[derive(Error, Debug)]
+#[error("{0}")]
+pub struct LoadExtensionError(String);
+
+impl LoadExtensionError {
+ pub fn new(msg: String) -> Self {
+ LoadExtensionError(msg)
+ }
+}
diff --git a/dubbo/src/extension/registry_extension.rs b/dubbo/src/extension/registry_extension.rs
new file mode 100644
index 0000000..b1f75ba
--- /dev/null
+++ b/dubbo/src/extension/registry_extension.rs
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::{collections::HashMap, future::Future, pin::Pin};
+
+use async_trait::async_trait;
+use thiserror::Error;
+use tokio::sync::mpsc::Receiver;
+use tower::discover::Change;
+
+use dubbo_base::{
+ extension_param::ExtensionName, registry_param::RegistryUrl, url::UrlParam, StdError, Url,
+};
+use proxy::RegistryProxy;
+
+use crate::extension::{
+ ConvertToExtensionFactories, Extension, ExtensionFactories, ExtensionMetaInfo, ExtensionType,
+ LoadExtensionPromise,
+};
+
+// extension://0.0.0.0/?extension-type=registry&extension-name=nacos®istry-url=nacos://127.0.0.1:8848
+pub fn to_extension_url(registry_url: Url) -> Url {
+ let mut registry_extension_loader_url: Url = "extension://0.0.0.0".parse().unwrap();
+
+ let protocol = registry_url.protocol();
+
+ registry_extension_loader_url.add_query_param(ExtensionType::Registry);
+ registry_extension_loader_url.add_query_param(ExtensionName::new(protocol.to_string()));
+ registry_extension_loader_url.add_query_param(RegistryUrl::new(registry_url));
+
+ registry_extension_loader_url
+}
+
+pub type ServiceChange = Change<String, ()>;
+pub type DiscoverStream = Receiver<Result<ServiceChange, StdError>>;
+
+#[async_trait]
+pub trait Registry {
+ async fn register(&self, url: Url) -> Result<(), StdError>;
+
+ async fn unregister(&self, url: Url) -> Result<(), StdError>;
+
+ async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError>;
+
+ async fn unsubscribe(&self, url: Url) -> Result<(), StdError>;
+
+ fn url(&self) -> &Url;
+}
+
+impl<T> crate::extension::Sealed for T where T: Registry + Send + Sync + 'static {}
+
+impl<T> ExtensionMetaInfo for T
+where
+ T: Registry + Send + Sync + 'static,
+ T: Extension<Target = Box<dyn Registry + Send + Sync + 'static>>,
+{
+ fn extension_type() -> ExtensionType {
+ ExtensionType::Registry
+ }
+}
+
+impl<T> ConvertToExtensionFactories for T
+where
+ T: Registry + Send + Sync + 'static,
+ T: Extension<Target = Box<dyn Registry + Send + Sync + 'static>>,
+{
+ fn convert_to_extension_factories() -> ExtensionFactories {
+ ExtensionFactories::RegistryExtensionFactory(RegistryExtensionFactory::new(
+ <T as Extension>::create,
+ ))
+ }
+}
+
+#[derive(Default)]
+pub(super) struct RegistryExtensionLoader {
+ factories: HashMap<String, RegistryExtensionFactory>,
+}
+
+impl RegistryExtensionLoader {
+ pub(crate) fn register(&mut self, extension_name: String, factory: RegistryExtensionFactory) {
+ self.factories.insert(extension_name, factory);
+ }
+
+ pub(crate) fn remove(&mut self, extension_name: String) {
+ self.factories.remove(&extension_name);
+ }
+
+ pub(crate) fn load(
+ &mut self,
+ url: Url,
+ ) -> Result<LoadExtensionPromise<RegistryProxy>, StdError> {
+ let extension_name = url.query::<ExtensionName>().unwrap();
+ let extension_name = extension_name.value();
+ let factory = self.factories.get_mut(&extension_name).ok_or_else(|| {
+ RegistryExtensionLoaderError::new(format!(
+ "registry extension loader error: extension name {} not found",
+ extension_name
+ ))
+ })?;
+ factory.create(url)
+ }
+}
+
+type RegistryConstructor = fn(
+ Url,
+) -> Pin<
+ Box<dyn Future<Output = Result<Box<dyn Registry + Send + Sync + 'static>, StdError>> + Send>,
+>;
+
+pub(crate) struct RegistryExtensionFactory {
+ constructor: RegistryConstructor,
+ instances: HashMap<String, LoadExtensionPromise<RegistryProxy>>,
+}
+
+impl RegistryExtensionFactory {
+ pub(super) fn new(constructor: RegistryConstructor) -> Self {
+ Self {
+ constructor,
+ instances: HashMap::new(),
+ }
+ }
+}
+
+impl RegistryExtensionFactory {
+ pub(super) fn create(
+ &mut self,
+ url: Url,
+ ) -> Result<LoadExtensionPromise<RegistryProxy>, StdError> {
+ let registry_url = url.query::<RegistryUrl>().unwrap();
+ let registry_url = registry_url.value();
+ let url_str = registry_url.as_str().to_string();
+ match self.instances.get(&url_str) {
+ Some(proxy) => {
+ let proxy = proxy.clone();
+ Ok(proxy)
+ }
+ None => {
+ let constructor = self.constructor;
+
+ let creator = move |url: Url| {
+ let registry = constructor(url);
+ Box::pin(async move {
+ let registry = registry.await?;
+ let proxy = <RegistryProxy as From<Box<dyn Registry + Send + Sync>>>::from(
+ registry,
+ );
+ Ok(proxy)
+ })
+ as Pin<
+ Box<
+ dyn Future<Output = Result<RegistryProxy, StdError>>
+ + Send
+ + 'static,
+ >,
+ >
+ };
+
+ let promise = LoadExtensionPromise::new(Box::new(creator), url);
+ self.instances.insert(url_str, promise.clone());
+ Ok(promise)
+ }
+ }
+ }
+}
+
+#[derive(Error, Debug)]
+#[error("{0}")]
+pub(crate) struct RegistryExtensionLoaderError(String);
+
+impl RegistryExtensionLoaderError {
+ pub(crate) fn new(msg: String) -> Self {
+ RegistryExtensionLoaderError(msg)
+ }
+}
+
+pub mod proxy {
+ use async_trait::async_trait;
+ use thiserror::Error;
+ use tokio::sync::oneshot;
+
+ use dubbo_base::{StdError, Url};
+ use dubbo_logger::tracing::error;
+
+ use crate::extension::registry_extension::{DiscoverStream, Registry};
+
+ pub(super) enum RegistryOpt {
+ Register(Url, oneshot::Sender<Result<(), StdError>>),
+ Unregister(Url, oneshot::Sender<Result<(), StdError>>),
+ Subscribe(Url, oneshot::Sender<Result<DiscoverStream, StdError>>),
+ UnSubscribe(Url, oneshot::Sender<Result<(), StdError>>),
+ }
+
+ #[derive(Clone)]
+ pub struct RegistryProxy {
+ sender: tokio::sync::mpsc::Sender<RegistryOpt>,
+ url: Url,
+ }
+
+ #[async_trait]
+ impl Registry for RegistryProxy {
+ async fn register(&self, url: Url) -> Result<(), StdError> {
+ let (tx, rx) = oneshot::channel();
+
+ match self
+ .sender
+ .send(RegistryOpt::Register(url.clone(), tx))
+ .await
+ {
+ Ok(_) => match rx.await {
+ Ok(result) => result,
+ Err(_) => {
+ error!(
+ "registry proxy error: receive register response failed, url: {}",
+ url
+ );
+ return Err(
+ RegistryProxyError::new("receive register response failed").into()
+ );
+ }
+ },
+ Err(_) => {
+ error!(
+ "registry proxy error: send register request failed, url: {}",
+ url
+ );
+ return Err(RegistryProxyError::new("send register opt failed").into());
+ }
+ }
+ }
+
+ async fn unregister(&self, url: Url) -> Result<(), StdError> {
+ let (tx, rx) = oneshot::channel();
+ match self
+ .sender
+ .send(RegistryOpt::Unregister(url.clone(), tx))
+ .await
+ {
+ Ok(_) => match rx.await {
+ Ok(result) => result,
+ Err(_) => {
+ error!(
+ "registry proxy error: receive unregister response failed, url: {}",
+ url
+ );
+ return Err(
+ RegistryProxyError::new("receive unregister response failed").into(),
+ );
+ }
+ },
+ Err(_) => {
+ error!(
+ "registry proxy error: send unregister request failed, url: {}",
+ url
+ );
+ return Err(RegistryProxyError::new("send unregister opt failed").into());
+ }
+ }
+ }
+
+ async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
+ let (tx, rx) = oneshot::channel();
+
+ match self
+ .sender
+ .send(RegistryOpt::Subscribe(url.clone(), tx))
+ .await
+ {
+ Ok(_) => match rx.await {
+ Ok(result) => result,
+ Err(_) => {
+ error!(
+ "registry proxy error: receive subscribe response failed, url: {}",
+ url
+ );
+ return Err(
+ RegistryProxyError::new("receive subscribe response failed").into()
+ );
+ }
+ },
+ Err(_) => {
+ error!(
+ "registry proxy error: send subscribe request failed, url: {}",
+ url
+ );
+ return Err(RegistryProxyError::new("send subscribe opt failed").into());
+ }
+ }
+ }
+
+ async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
+ let (tx, rx) = oneshot::channel();
+ match self
+ .sender
+ .send(RegistryOpt::UnSubscribe(url.clone(), tx))
+ .await
+ {
+ Ok(_) => {
+ match rx.await {
+ Ok(result) => result,
+ Err(_) => {
+ error!("registry proxy error: receive unsubscribe response failed, url: {}", url);
+ return Err(RegistryProxyError::new(
+ "receive unsubscribe response failed",
+ )
+ .into());
+ }
+ }
+ }
+ Err(_) => {
+ error!(
+ "registry proxy error: send unsubscribe request failed, url: {}",
+ url
+ );
+ return Err(RegistryProxyError::new("send unsubscribe opt failed").into());
+ }
+ }
+ }
+
+ fn url(&self) -> &Url {
+ &self.url
+ }
+ }
+
+ impl From<Box<dyn Registry + Send + Sync>> for RegistryProxy {
+ fn from(registry: Box<dyn Registry + Send + Sync>) -> Self {
+ let url = registry.url().clone();
+
+ let (sender, mut receiver) = tokio::sync::mpsc::channel(1024);
+
+ tokio::spawn(async move {
+ while let Some(opt) = receiver.recv().await {
+ match opt {
+ RegistryOpt::Register(url, tx) => {
+ let register = registry.register(url).await;
+ if let Err(_) = tx.send(register) {
+ error!("registry proxy error: send register response failed");
+ }
+ }
+ RegistryOpt::Unregister(url, tx) => {
+ let unregister = registry.unregister(url).await;
+ if let Err(_) = tx.send(unregister) {
+ error!("registry proxy error: send unregister response failed");
+ }
+ }
+ RegistryOpt::Subscribe(url, tx) => {
+ let subscribe = registry.subscribe(url).await;
+ if let Err(_) = tx.send(subscribe) {
+ error!("registry proxy error: send subscribe response failed");
+ }
+ }
+ RegistryOpt::UnSubscribe(url, tx) => {
+ let unsubscribe = registry.unsubscribe(url).await;
+ if let Err(_) = tx.send(unsubscribe) {
+ error!("registry proxy error: send unsubscribe response failed");
+ }
+ }
+ }
+ }
+ });
+
+ RegistryProxy { sender, url }
+ }
+ }
+
+ #[derive(Error, Debug)]
+ #[error("registry proxy error: {0}")]
+ pub(crate) struct RegistryProxyError(String);
+
+ impl RegistryProxyError {
+ pub(crate) fn new(msg: &str) -> Self {
+ RegistryProxyError(msg.to_string())
+ }
+ }
+}
diff --git a/dubbo/src/framework.rs b/dubbo/src/framework.rs
index d595f38..f3c6dc1 100644
--- a/dubbo/src/framework.rs
+++ b/dubbo/src/framework.rs
@@ -15,20 +15,13 @@
* limitations under the License.
*/
-use std::{
- collections::HashMap,
- error::Error,
- pin::Pin,
- sync::{Arc, Mutex},
-};
+use std::{collections::HashMap, error::Error, pin::Pin};
use crate::{
+ extension,
+ extension::registry_extension::Registry,
protocol::{BoxExporter, Protocol},
- registry::{
- protocol::RegistryProtocol,
- types::{Registries, RegistriesOperation},
- BoxRegistry, Registry,
- },
+ registry::protocol::RegistryProtocol,
};
use dubbo_base::Url;
use dubbo_config::{get_global_config, protocol::ProtocolRetrieve, RootConfig};
@@ -40,7 +33,7 @@
#[derive(Default)]
pub struct Dubbo {
protocols: HashMap<String, Vec<Url>>,
- registries: Option<Registries>,
+ registries: Vec<Url>,
service_registry: HashMap<String, Vec<Url>>, // registry: Urls
config: Option<&'static RootConfig>,
}
@@ -49,7 +42,7 @@
pub fn new() -> Dubbo {
Self {
protocols: HashMap::new(),
- registries: None,
+ registries: Vec::default(),
service_registry: HashMap::new(),
config: None,
}
@@ -60,14 +53,10 @@
self
}
- pub fn add_registry(mut self, registry_key: &str, registry: BoxRegistry) -> Self {
- if self.registries.is_none() {
- self.registries = Some(Arc::new(Mutex::new(HashMap::new())));
- }
- self.registries
- .as_ref()
- .unwrap()
- .insert(registry_key.to_string(), Arc::new(Mutex::new(registry)));
+ pub fn add_registry(mut self, registry: &str) -> Self {
+ let url: Url = registry.parse().unwrap();
+ let url = extension::registry_extension::to_extension_url(url);
+ self.registries.push(url);
self
}
@@ -88,10 +77,15 @@
let protocol = root_config
.protocols
.get_protocol_or_default(service_config.protocol.as_str());
- let protocol_url =
- format!("{}/{}", protocol.to_url(), service_config.interface.clone(),);
+ let interface_name = service_config.interface.clone();
+ let protocol_url = format!(
+ "{}/{}?interface={}",
+ protocol.to_url(),
+ interface_name,
+ interface_name
+ );
tracing::info!("protocol_url: {:?}", protocol_url);
- Url::from_url(&protocol_url)
+ protocol_url.parse().ok()
} else {
return Err(format!("base {:?} not exists", service_config.protocol).into());
};
@@ -117,9 +111,20 @@
self.init().unwrap();
tracing::info!("starting...");
// TODO: server registry
+
+ let mut registry_extensions = Vec::new();
+
+ for registry_url in &self.registries {
+ let registry_url = registry_url.clone();
+ let registry_extension = extension::EXTENSIONS.load_registry(registry_url).await;
+ if let Ok(registry_extension) = registry_extension {
+ registry_extensions.push(registry_extension);
+ }
+ }
+
let mem_reg = Box::new(
RegistryProtocol::new()
- .with_registries(self.registries.as_ref().unwrap().clone())
+ .with_registries(registry_extensions.clone())
.with_services(self.service_registry.clone()),
);
let mut async_vec: Vec<Pin<Box<dyn Future<Output = BoxExporter> + Send>>> = Vec::new();
@@ -128,14 +133,10 @@
tracing::info!("base: {:?}, service url: {:?}", name, url);
let exporter = mem_reg.clone().export(url.to_owned());
async_vec.push(exporter);
+
//TODO multiple registry
- if self.registries.is_some() {
- self.registries
- .as_ref()
- .unwrap()
- .default_registry()
- .register(url.clone())
- .unwrap();
+ for registry_extension in ®istry_extensions {
+ let _ = registry_extension.register(url.clone()).await;
}
}
}
diff --git a/dubbo/src/invocation.rs b/dubbo/src/invocation.rs
index bdd152b..5750337 100644
--- a/dubbo/src/invocation.rs
+++ b/dubbo/src/invocation.rs
@@ -196,7 +196,7 @@
fn get_method_name(&self) -> String;
}
-#[derive(Default)]
+#[derive(Default, Clone)]
pub struct RpcInvocation {
target_service_unique_name: String,
method_name: String,
diff --git a/dubbo/src/invoker/clone_body.rs b/dubbo/src/invoker/clone_body.rs
new file mode 100644
index 0000000..913910a
--- /dev/null
+++ b/dubbo/src/invoker/clone_body.rs
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::{
+ collections::VecDeque,
+ pin::Pin,
+ sync::{Arc, Mutex},
+ task::{Context, Poll},
+};
+
+use bytes::{Buf, BufMut, Bytes, BytesMut};
+use futures_core::ready;
+
+use dubbo_base::StdError;
+use http::HeaderMap;
+use http_body::Body;
+use pin_project::pin_project;
+use thiserror::Error;
+
+#[derive(Error, Debug)]
+#[error("buffered body reach max capacity.")]
+pub struct ReachMaxCapacityError;
+
+pub struct BufferedBody {
+ shared: Arc<Mutex<Option<OwnedBufferedBody>>>,
+ owned: Option<OwnedBufferedBody>,
+ replay_body: bool,
+ replay_trailers: bool,
+ is_empty: bool,
+ size_hint: http_body::SizeHint,
+}
+
+pub struct OwnedBufferedBody {
+ body: hyper::Body,
+ trailers: Option<HeaderMap>,
+ buf: InnerBuffer,
+}
+
+impl BufferedBody {
+ pub fn new(body: hyper::Body, buf_size: usize) -> Self {
+ let size_hint = body.size_hint();
+ let is_empty = body.is_end_stream();
+ BufferedBody {
+ shared: Default::default(),
+ owned: Some(OwnedBufferedBody {
+ body,
+ trailers: None,
+ buf: InnerBuffer {
+ bufs: Default::default(),
+ capacity: buf_size,
+ },
+ }),
+ replay_body: false,
+ replay_trailers: false,
+ is_empty,
+ size_hint,
+ }
+ }
+}
+
+impl Clone for BufferedBody {
+ fn clone(&self) -> Self {
+ Self {
+ shared: self.shared.clone(),
+ owned: None,
+ replay_body: true,
+ replay_trailers: true,
+ is_empty: self.is_empty,
+ size_hint: self.size_hint.clone(),
+ }
+ }
+}
+
+impl Drop for BufferedBody {
+ fn drop(&mut self) {
+ if let Some(owned) = self.owned.take() {
+ let lock = self.shared.lock();
+ if let Ok(mut lock) = lock {
+ *lock = Some(owned);
+ }
+ }
+ }
+}
+
+impl Body for BufferedBody {
+ type Data = BytesData;
+ type Error = StdError;
+
+ fn poll_data(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+ let mut_self = self.get_mut();
+
+ let owned_body = mut_self.owned.get_or_insert_with(|| {
+ let lock = mut_self.shared.lock();
+ if let Err(e) = lock {
+ panic!("buffered body get shared data lock failed. {}", e);
+ }
+ let mut data = lock.unwrap();
+
+ data.take().expect("cannot get shared buffered body.")
+ });
+
+ if mut_self.replay_body {
+ mut_self.replay_body = false;
+ if owned_body.buf.has_remaining() {
+ return Poll::Ready(Some(Ok(BytesData::BufferedBytes(owned_body.buf.clone()))));
+ }
+
+ if owned_body.buf.is_capped() {
+ return Poll::Ready(Some(Err(ReachMaxCapacityError.into())));
+ }
+ }
+
+ if mut_self.is_empty {
+ return Poll::Ready(None);
+ }
+
+ let mut data = {
+ let pin = Pin::new(&mut owned_body.body);
+ let data = ready!(pin.poll_data(cx));
+ match data {
+ Some(Ok(data)) => data,
+ Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
+ None => {
+ mut_self.is_empty = true;
+ return Poll::Ready(None);
+ }
+ }
+ };
+
+ let len = data.remaining();
+
+ owned_body.buf.capacity = owned_body.buf.capacity.saturating_sub(len);
+
+ let data = if owned_body.buf.is_capped() {
+ if owned_body.buf.has_remaining() {
+ owned_body.buf.bufs = VecDeque::default();
+ }
+ data.copy_to_bytes(len)
+ } else {
+ owned_body.buf.push_bytes(data.copy_to_bytes(len))
+ };
+
+ Poll::Ready(Some(Ok(BytesData::OriginBytes(data))))
+ }
+
+ fn poll_trailers(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
+ let mut_self = self.get_mut();
+ let owned_body = mut_self.owned.get_or_insert_with(|| {
+ let lock = mut_self.shared.lock();
+ if let Err(e) = lock {
+ panic!("buffered body get shared data lock failed. {}", e);
+ }
+ let mut data = lock.unwrap();
+
+ data.take().expect("cannot get shared buffered body.")
+ });
+
+ if mut_self.replay_trailers {
+ mut_self.replay_trailers = false;
+ if let Some(ref trailers) = owned_body.trailers {
+ return Poll::Ready(Ok(Some(trailers.clone())));
+ }
+ }
+
+ let mut_body = &mut owned_body.body;
+ if !mut_body.is_end_stream() {
+ let trailers = ready!(Pin::new(mut_body).poll_trailers(cx)).map(|trailers| {
+ owned_body.trailers = trailers.clone();
+ trailers
+ });
+ return Poll::Ready(trailers.map_err(|e| e.into()));
+ }
+
+ Poll::Ready(Ok(None))
+ }
+
+ fn is_end_stream(&self) -> bool {
+ if self.is_empty {
+ return true;
+ }
+
+ let is_end = self
+ .owned
+ .as_ref()
+ .map(|owned| owned.body.is_end_stream())
+ .unwrap_or(false);
+
+ !self.replay_body && !self.replay_trailers && is_end
+ }
+
+ fn size_hint(&self) -> http_body::SizeHint {
+ self.size_hint.clone()
+ }
+}
+
+#[derive(Clone)]
+pub struct InnerBuffer {
+ bufs: VecDeque<Bytes>,
+ capacity: usize,
+}
+
+impl InnerBuffer {
+ pub fn push_bytes(&mut self, bytes: Bytes) -> Bytes {
+ self.bufs.push_back(bytes.clone());
+ bytes
+ }
+
+ pub fn is_capped(&self) -> bool {
+ self.capacity == 0
+ }
+}
+
+impl Buf for InnerBuffer {
+ fn remaining(&self) -> usize {
+ self.bufs.iter().map(|bytes| bytes.remaining()).sum()
+ }
+
+ fn chunk(&self) -> &[u8] {
+ self.bufs.front().map(Buf::chunk).unwrap_or(&[])
+ }
+
+ fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize {
+ if dst.is_empty() {
+ return 0;
+ }
+
+ let mut filled = 0;
+
+ for bytes in self.bufs.iter() {
+ filled += bytes.chunks_vectored(&mut dst[filled..])
+ }
+
+ filled
+ }
+
+ fn advance(&mut self, mut cnt: usize) {
+ while cnt > 0 {
+ let first = self.bufs.front_mut();
+ if first.is_none() {
+ break;
+ }
+ let first = first.unwrap();
+ let first_remaining = first.remaining();
+ if first_remaining > cnt {
+ first.advance(cnt);
+ break;
+ }
+
+ first.advance(first_remaining);
+ cnt = cnt - first_remaining;
+ self.bufs.pop_front();
+ }
+ }
+
+ fn copy_to_bytes(&mut self, len: usize) -> bytes::Bytes {
+ match self.bufs.front_mut() {
+ Some(buf) if len <= buf.remaining() => {
+ let bytes = buf.copy_to_bytes(len);
+ if buf.remaining() == 0 {
+ self.bufs.pop_front();
+ }
+ bytes
+ }
+ _ => {
+ let mut bytes = BytesMut::with_capacity(len);
+ bytes.put(self.take(len));
+ bytes.freeze()
+ }
+ }
+ }
+}
+
+pub enum BytesData {
+ BufferedBytes(InnerBuffer),
+ OriginBytes(Bytes),
+}
+
+impl Buf for BytesData {
+ fn remaining(&self) -> usize {
+ match self {
+ BytesData::BufferedBytes(bytes) => bytes.remaining(),
+ BytesData::OriginBytes(bytes) => bytes.remaining(),
+ }
+ }
+
+ fn chunk(&self) -> &[u8] {
+ match self {
+ BytesData::BufferedBytes(bytes) => bytes.chunk(),
+ BytesData::OriginBytes(bytes) => bytes.chunk(),
+ }
+ }
+
+ fn advance(&mut self, cnt: usize) {
+ match self {
+ BytesData::BufferedBytes(bytes) => bytes.advance(cnt),
+ BytesData::OriginBytes(bytes) => bytes.advance(cnt),
+ }
+ }
+
+ fn copy_to_bytes(&mut self, len: usize) -> bytes::Bytes {
+ match self {
+ BytesData::BufferedBytes(bytes) => bytes.copy_to_bytes(len),
+ BytesData::OriginBytes(bytes) => bytes.copy_to_bytes(len),
+ }
+ }
+}
+
+#[pin_project]
+pub struct CloneBody(#[pin] BufferedBody);
+
+impl CloneBody {
+ pub fn new(inner_body: hyper::Body) -> Self {
+ let inner_body = BufferedBody::new(inner_body, 1024 * 64);
+ CloneBody(inner_body)
+ }
+}
+
+impl Body for CloneBody {
+ type Data = BytesData;
+
+ type Error = StdError;
+
+ fn poll_data(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+ self.project().0.poll_data(cx)
+ }
+
+ fn poll_trailers(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
+ self.project().0.poll_trailers(cx)
+ }
+
+ fn size_hint(&self) -> http_body::SizeHint {
+ self.0.size_hint()
+ }
+}
+
+impl Clone for CloneBody {
+ fn clone(&self) -> Self {
+ Self(self.0.clone())
+ }
+}
diff --git a/dubbo/src/invoker/clone_invoker.rs b/dubbo/src/invoker/clone_invoker.rs
new file mode 100644
index 0000000..557d76e
--- /dev/null
+++ b/dubbo/src/invoker/clone_invoker.rs
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::{mem, pin::Pin, task::Poll};
+
+use dubbo_base::StdError;
+use dubbo_logger::tracing::debug;
+use futures_core::{future::BoxFuture, ready, Future, TryFuture};
+use futures_util::FutureExt;
+use pin_project::pin_project;
+use thiserror::Error;
+use tokio::{
+ sync::{
+ self,
+ watch::{Receiver, Sender},
+ },
+ task::JoinHandle,
+};
+use tokio_util::sync::ReusableBoxFuture;
+use tower::{buffer::Buffer, ServiceExt};
+use tower_service::Service;
+
+use super::clone_body::CloneBody;
+
+enum Inner<S> {
+ Invalid,
+ Ready(S),
+ Pending(JoinHandle<Result<S, (S, StdError)>>),
+}
+
+#[derive(Debug, Error)]
+#[error("the inner service has not got ready yet!")]
+struct InnerServiceNotReadyErr;
+
+#[pin_project(project = InnerServiceCallingResponseProj)]
+enum InnerServiceCallingResponse<Fut> {
+ Call(#[pin] Fut),
+ Fail,
+}
+
+impl<Fut> Future for InnerServiceCallingResponse<Fut>
+where
+ Fut: TryFuture,
+ Fut::Error: Into<StdError>,
+{
+ type Output = Result<Fut::Ok, StdError>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
+ match self.project() {
+ InnerServiceCallingResponseProj::Call(call) => call.try_poll(cx).map_err(Into::into),
+ InnerServiceCallingResponseProj::Fail => {
+ Poll::Ready(Err(InnerServiceNotReadyErr.into()))
+ }
+ }
+ }
+}
+
+#[derive(Clone)]
+enum ObserveState {
+ Ready,
+ Pending,
+}
+
+struct ReadyService<S> {
+ inner: Inner<S>,
+ tx: Sender<ObserveState>,
+}
+
+impl<S> ReadyService<S> {
+ fn new(inner: S) -> (Self, Receiver<ObserveState>) {
+ let (tx, rx) = sync::watch::channel(ObserveState::Ready);
+ let ready_service = Self {
+ inner: Inner::Ready(inner),
+ tx,
+ };
+ (ready_service, rx)
+ }
+}
+
+impl<S, Req> Service<Req> for ReadyService<S>
+where
+ S: Service<Req> + Send + 'static,
+ <S as Service<Req>>::Error: Into<StdError>,
+{
+ type Response = S::Response;
+
+ type Error = StdError;
+
+ type Future = InnerServiceCallingResponse<S::Future>;
+
+ fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
+ loop {
+ match mem::replace(&mut self.inner, Inner::Invalid) {
+ Inner::Ready(mut svc) => {
+ let poll_ready = svc.poll_ready(cx);
+ match poll_ready {
+ Poll::Pending => {
+ self.inner = Inner::Pending(tokio::spawn(async move {
+ let poll_ready = svc.ready().await;
+ match poll_ready {
+ Ok(_) => Ok(svc),
+ Err(err) => Err((svc, err.into())),
+ }
+ }));
+
+ let _ = self.tx.send(ObserveState::Pending);
+ continue;
+ }
+ Poll::Ready(ret) => {
+ self.inner = Inner::Ready(svc);
+
+ let _ = self.tx.send(ObserveState::Ready);
+ return Poll::Ready(ret.map_err(Into::into));
+ }
+ }
+ }
+ Inner::Pending(mut join_handle) => {
+ if let Poll::Ready(res) = join_handle.poll_unpin(cx) {
+ let (svc, res) = match res {
+ Err(join_err) => panic!("ReadyService panicked: {join_err}"),
+ Ok(Err((svc, err))) => (svc, Poll::Ready(Err(err))),
+ Ok(Ok(svc)) => (svc, Poll::Ready(Ok(()))),
+ };
+
+ self.inner = Inner::Ready(svc);
+
+ let _ = self.tx.send(ObserveState::Ready);
+ return res;
+ } else {
+ self.inner = Inner::Pending(join_handle);
+
+ let _ = self.tx.send(ObserveState::Pending);
+ return Poll::Pending;
+ }
+ }
+ Inner::Invalid => panic!("ReadyService panicked: inner state is invalid"),
+ }
+ }
+ }
+
+ fn call(&mut self, req: Req) -> Self::Future {
+ match self.inner {
+ Inner::Ready(ref mut svc) => InnerServiceCallingResponse::Call(svc.call(req)),
+ _ => InnerServiceCallingResponse::Fail,
+ }
+ }
+}
+
+impl<S> Drop for ReadyService<S> {
+ fn drop(&mut self) {
+ if let Inner::Pending(ref handler) = self.inner {
+ handler.abort();
+ }
+ }
+}
+
+pub struct CloneInvoker<Inv>
+where
+ Inv: Service<http::Request<CloneBody>> + Send + 'static,
+ Inv::Error: Into<StdError> + Send + Sync + 'static,
+ Inv::Future: Send,
+{
+ inner: Buffer<ReadyService<Inv>, http::Request<CloneBody>>,
+ rx: Receiver<ObserveState>,
+ poll: ReusableBoxFuture<'static, ObserveState>,
+ polling: bool,
+}
+
+impl<Inv> CloneInvoker<Inv>
+where
+ Inv: Service<http::Request<CloneBody>> + Send + 'static,
+ Inv::Error: Into<StdError> + Send + Sync + 'static,
+ Inv::Future: Send,
+{
+ const MAX_INVOKER_BUFFER_SIZE: usize = 16;
+
+ pub fn new(invoker: Inv) -> Self {
+ let (ready_service, rx) = ReadyService::new(invoker);
+
+ let buffer: Buffer<ReadyService<Inv>, http::Request<CloneBody>> =
+ Buffer::new(ready_service, Self::MAX_INVOKER_BUFFER_SIZE);
+
+ Self {
+ inner: buffer,
+ rx,
+ polling: false,
+ poll: ReusableBoxFuture::new(futures::future::pending()),
+ }
+ }
+}
+
+impl<Inv> Service<http::Request<CloneBody>> for CloneInvoker<Inv>
+where
+ Inv: Service<http::Request<CloneBody>> + Send + 'static,
+ Inv::Error: Into<StdError> + Send + Sync + 'static,
+ Inv::Future: Send,
+{
+ type Response = Inv::Response;
+
+ type Error = StdError;
+
+ type Future = BoxFuture<'static, Result<Self::Response, StdError>>;
+
+ fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
+ loop {
+ if !self.polling {
+ match self.rx.borrow().clone() {
+ ObserveState::Ready => return self.inner.poll_ready(cx),
+ ObserveState::Pending => {
+ self.polling = true;
+ let mut rx = self.rx.clone();
+ self.poll.set(async move {
+ loop {
+ let current_state = rx.borrow_and_update().clone();
+ if matches!(current_state, ObserveState::Ready) {
+ return current_state;
+ }
+ if let Err(_) = rx.changed().await {
+ debug!("the readyService has already shutdown!");
+ futures::future::pending::<ObserveState>().await;
+ }
+ }
+ });
+ }
+ }
+ }
+
+ let state = ready!(self.poll.poll_unpin(cx));
+ self.polling = false;
+
+ if matches!(state, ObserveState::Pending) {
+ continue;
+ }
+
+ return self.inner.poll_ready(cx);
+ }
+ }
+
+ fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future {
+ Box::pin(self.inner.call(req))
+ }
+}
+
+impl<Inv> Clone for CloneInvoker<Inv>
+where
+ Inv: Service<http::Request<CloneBody>> + Send + 'static,
+ Inv::Error: Into<StdError> + Send + Sync + 'static,
+ Inv::Future: Send,
+{
+ fn clone(&self) -> Self {
+ Self {
+ inner: self.inner.clone(),
+ rx: self.rx.clone(),
+ polling: false,
+ poll: ReusableBoxFuture::new(futures::future::pending()),
+ }
+ }
+}
diff --git a/dubbo/src/cluster/loadbalance/impls/mod.rs b/dubbo/src/invoker/mod.rs
similarity index 63%
copy from dubbo/src/cluster/loadbalance/impls/mod.rs
copy to dubbo/src/invoker/mod.rs
index 5a84af8..1c87c0e 100644
--- a/dubbo/src/cluster/loadbalance/impls/mod.rs
+++ b/dubbo/src/invoker/mod.rs
@@ -14,5 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-pub mod random;
-pub mod roundrobin;
+use crate::{codegen::TripleInvoker, invoker::clone_invoker::CloneInvoker, svc::NewService};
+
+pub mod clone_body;
+pub mod clone_invoker;
+
+pub struct NewInvoker;
+
+impl NewService<String> for NewInvoker {
+ type Service = CloneInvoker<TripleInvoker>;
+
+ fn new_service(&self, url: String) -> Self::Service {
+ // todo create another invoker by url protocol
+
+ let url = url.parse().unwrap();
+ CloneInvoker::new(TripleInvoker::new(url))
+ }
+}
diff --git a/dubbo/src/lib.rs b/dubbo/src/lib.rs
index 63c09d3..1a521a2 100644
--- a/dubbo/src/lib.rs
+++ b/dubbo/src/lib.rs
@@ -18,12 +18,19 @@
pub mod cluster;
pub mod codegen;
pub mod context;
+pub mod directory;
+pub mod extension;
pub mod filter;
mod framework;
pub mod invocation;
+pub mod invoker;
+pub mod loadbalancer;
+pub mod param;
pub mod protocol;
pub mod registry;
+pub mod route;
pub mod status;
+pub mod svc;
pub mod triple;
pub mod utils;
@@ -32,7 +39,6 @@
pub use framework::Dubbo;
-pub type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub type BoxFuture<T, E> = self::Pin<Box<dyn self::Future<Output = Result<T, E>> + Send + 'static>>;
pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>;
pub type BoxBody = http_body::combinators::UnsyncBoxBody<bytes::Bytes, self::status::Status>;
diff --git a/dubbo/src/loadbalancer/mod.rs b/dubbo/src/loadbalancer/mod.rs
new file mode 100644
index 0000000..74f2217
--- /dev/null
+++ b/dubbo/src/loadbalancer/mod.rs
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use dubbo_base::StdError;
+use futures_core::future::BoxFuture;
+use tower::{discover::ServiceList, ServiceExt};
+use tower_service::Service;
+
+use crate::{
+ codegen::RpcInvocation,
+ invoker::{clone_body::CloneBody, clone_invoker::CloneInvoker},
+ param::Param,
+ svc::NewService,
+};
+
+use crate::protocol::triple::triple_invoker::TripleInvoker;
+
+pub struct NewLoadBalancer<N> {
+ inner: N,
+}
+
+#[derive(Clone)]
+pub struct LoadBalancer<S> {
+ inner: S, // Routes service
+}
+
+impl<N> NewLoadBalancer<N> {
+ pub fn layer() -> impl tower_layer::Layer<N, Service = Self> {
+ tower_layer::layer_fn(|inner| {
+ NewLoadBalancer {
+ inner, // NewRoutes
+ }
+ })
+ }
+}
+
+impl<N, T> NewService<T> for NewLoadBalancer<N>
+where
+ T: Param<RpcInvocation> + Clone,
+ // NewRoutes
+ N: NewService<T>,
+{
+ type Service = LoadBalancer<N::Service>;
+
+ fn new_service(&self, target: T) -> Self::Service {
+ // Routes service
+ let svc = self.inner.new_service(target);
+
+ LoadBalancer { inner: svc }
+ }
+}
+
+impl<N> Service<http::Request<CloneBody>> for LoadBalancer<N>
+where
+ // Routes service
+ N: Service<(), Response = Vec<CloneInvoker<TripleInvoker>>> + Clone,
+ N::Error: Into<StdError> + Send,
+ N::Future: Send + 'static,
+{
+ type Response = <CloneInvoker<TripleInvoker> as Service<http::Request<CloneBody>>>::Response;
+
+ type Error = StdError;
+
+ type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
+
+ fn poll_ready(
+ &mut self,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), Self::Error>> {
+ self.inner.poll_ready(cx).map_err(Into::into)
+ }
+
+ fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future {
+ let routes = self.inner.call(());
+
+ let fut = async move {
+ let routes = routes.await;
+
+ let routes: Vec<CloneInvoker<TripleInvoker>> = match routes {
+ Err(e) => return Err(Into::<StdError>::into(e)),
+ Ok(routes) => routes,
+ };
+
+ let service_list: Vec<_> = routes
+ .into_iter()
+ .map(|invoker| tower::load::Constant::new(invoker, 1))
+ .collect();
+
+ let service_list = ServiceList::new(service_list);
+
+ let p2c = tower::balance::p2c::Balance::new(service_list);
+
+ p2c.oneshot(req).await
+ };
+
+ Box::pin(fut)
+ }
+}
diff --git a/dubbo/src/cluster/loadbalance/impls/mod.rs b/dubbo/src/param.rs
similarity index 82%
copy from dubbo/src/cluster/loadbalance/impls/mod.rs
copy to dubbo/src/param.rs
index 5a84af8..298c3b3 100644
--- a/dubbo/src/cluster/loadbalance/impls/mod.rs
+++ b/dubbo/src/param.rs
@@ -14,5 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-pub mod random;
-pub mod roundrobin;
+
+pub trait Param<T> {
+ fn param(&self) -> T;
+}
+
+impl<T: ToOwned> Param<T::Owned> for T {
+ fn param(&self) -> T::Owned {
+ self.to_owned()
+ }
+}
diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs
index 145bcc8..7dbf1f3 100644
--- a/dubbo/src/protocol/mod.rs
+++ b/dubbo/src/protocol/mod.rs
@@ -15,15 +15,10 @@
* limitations under the License.
*/
-use std::{
- fmt::Debug,
- future::Future,
- task::{Context, Poll},
-};
+use std::task::{Context, Poll};
use async_trait::async_trait;
use aws_smithy_http::body::SdkBody;
-use dyn_clone::DynClone;
use tower_service::Service;
use dubbo_base::Url;
@@ -44,18 +39,8 @@
fn unexport(&self);
}
-pub trait Invoker<ReqBody>: Debug + DynClone {
- type Response;
-
- type Error;
-
- type Future: Future<Output = Result<Self::Response, Self::Error>>;
-
+pub trait Invoker<ReqBody>: Service<ReqBody> {
fn get_url(&self) -> Url;
-
- fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
-
- fn call(&mut self, req: ReqBody) -> Self::Future;
}
pub type BoxExporter = Box<dyn Exporter + Send + Sync>;
@@ -69,15 +54,6 @@
+ Sync,
>;
-dyn_clone::clone_trait_object!(
- Invoker<
- http::Request<SdkBody>,
- Response = http::Response<crate::BoxBody>,
- Error = crate::Error,
- Future = crate::BoxFuture<http::Response<crate::BoxBody>, crate::Error>,
- >
-);
-
pub struct WrapperInvoker<T>(T);
impl<T, ReqBody> Service<http::Request<ReqBody>> for WrapperInvoker<T>
diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs
index 42dfebe..cb6b08c 100644
--- a/dubbo/src/protocol/triple/triple_invoker.rs
+++ b/dubbo/src/protocol/triple/triple_invoker.rs
@@ -15,8 +15,8 @@
* limitations under the License.
*/
-use aws_smithy_http::body::SdkBody;
use dubbo_base::Url;
+use http::{HeaderValue, Uri};
use std::{
fmt::{Debug, Formatter},
str::FromStr,
@@ -24,27 +24,21 @@
use tower_service::Service;
use crate::{
- protocol::Invoker,
- triple::{client::builder::ClientBoxService, transport::connection::Connection},
- utils::boxed_clone::BoxCloneService,
+ invoker::clone_body::CloneBody,
+ triple::transport::{self, connection::Connection},
};
-#[derive(Clone)]
pub struct TripleInvoker {
url: Url,
- conn: ClientBoxService,
+ conn: Connection,
}
impl TripleInvoker {
pub fn new(url: Url) -> TripleInvoker {
- let uri = http::Uri::from_str(&url.to_url()).unwrap();
- let mut conn = Connection::new().with_host(uri.clone());
- if let Some(scheme) = uri.scheme_str() {
- conn = conn.with_connector(scheme.to_string());
- }
+ let uri = http::Uri::from_str(url.as_str()).unwrap();
Self {
url,
- conn: BoxCloneService::new(conn),
+ conn: Connection::new().with_host(uri).build(),
}
}
}
@@ -55,25 +49,107 @@
}
}
-impl Invoker<http::Request<SdkBody>> for TripleInvoker {
+impl TripleInvoker {
+ pub fn map_request(&self, req: http::Request<CloneBody>) -> http::Request<CloneBody> {
+ let (parts, body) = req.into_parts();
+
+ let path_and_query = parts.headers.get("path").unwrap().to_str().unwrap();
+
+ let authority = self.url.authority();
+
+ let uri = Uri::builder()
+ .scheme("http")
+ .authority(authority)
+ .path_and_query(path_and_query)
+ .build()
+ .unwrap();
+
+ let mut req = hyper::Request::builder()
+ .version(http::Version::HTTP_2)
+ .uri(uri.clone())
+ .method("POST")
+ .body(body)
+ .unwrap();
+
+ // *req.version_mut() = http::Version::HTTP_2;
+ req.headers_mut()
+ .insert("method", HeaderValue::from_static("POST"));
+ req.headers_mut().insert(
+ "scheme",
+ HeaderValue::from_str(uri.scheme_str().unwrap()).unwrap(),
+ );
+ req.headers_mut()
+ .insert("path", HeaderValue::from_str(uri.path()).unwrap());
+ req.headers_mut().insert(
+ "authority",
+ HeaderValue::from_str(uri.authority().unwrap().as_str()).unwrap(),
+ );
+ req.headers_mut().insert(
+ "content-type",
+ HeaderValue::from_static("application/grpc+proto"),
+ );
+ req.headers_mut()
+ .insert("user-agent", HeaderValue::from_static("dubbo-rust/0.1.0"));
+ req.headers_mut()
+ .insert("te", HeaderValue::from_static("trailers"));
+ req.headers_mut().insert(
+ "tri-service-version",
+ HeaderValue::from_static("dubbo-rust/0.1.0"),
+ );
+ req.headers_mut()
+ .insert("tri-service-group", HeaderValue::from_static("cluster"));
+ req.headers_mut().insert(
+ "tri-unit-info",
+ HeaderValue::from_static("dubbo-rust/0.1.0"),
+ );
+ // if let Some(_encoding) = self.send_compression_encoding {
+
+ // }
+
+ req.headers_mut()
+ .insert("grpc-encoding", http::HeaderValue::from_static("gzip"));
+
+ req.headers_mut().insert(
+ "grpc-accept-encoding",
+ http::HeaderValue::from_static("gzip"),
+ );
+
+ // // const (
+ // // TripleContentType = "application/grpc+proto"
+ // // TripleUserAgent = "grpc-go/1.35.0-dev"
+ // // TripleServiceVersion = "tri-service-version"
+ // // TripleAttachement = "tri-attachment"
+ // // TripleServiceGroup = "tri-service-group"
+ // // TripleRequestID = "tri-req-id"
+ // // TripleTraceID = "tri-trace-traceid"
+ // // TripleTraceRPCID = "tri-trace-rpcid"
+ // // TripleTraceProtoBin = "tri-trace-proto-bin"
+ // // TripleUnitInfo = "tri-unit-info"
+ // // )
+ req
+ }
+}
+
+impl Service<http::Request<CloneBody>> for TripleInvoker {
type Response = http::Response<crate::BoxBody>;
type Error = crate::Error;
type Future = crate::BoxFuture<Self::Response, Self::Error>;
- fn get_url(&self) -> Url {
- self.url.clone()
- }
-
- fn call(&mut self, req: http::Request<SdkBody>) -> Self::Future {
- self.conn.call(req)
- }
-
fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
- self.conn.poll_ready(cx)
+ <transport::connection::Connection as Service<http::Request<CloneBody>>>::poll_ready(
+ &mut self.conn,
+ cx,
+ )
+ }
+
+ fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future {
+ let req = self.map_request(req);
+
+ self.conn.call(req)
}
}
diff --git a/dubbo/src/protocol/triple/triple_protocol.rs b/dubbo/src/protocol/triple/triple_protocol.rs
index 71f6edc..27174ba 100644
--- a/dubbo/src/protocol/triple/triple_protocol.rs
+++ b/dubbo/src/protocol/triple/triple_protocol.rs
@@ -15,10 +15,10 @@
* limitations under the License.
*/
-use std::{boxed::Box, collections::HashMap};
+use std::collections::HashMap;
use async_trait::async_trait;
-use dubbo_base::Url;
+use dubbo_base::{registry_param::InterfaceName, url::UrlParam, Url};
use super::{
triple_exporter::TripleExporter, triple_invoker::TripleInvoker, triple_server::TripleServer,
@@ -44,8 +44,9 @@
}
pub fn get_server(&self, url: Url) -> Option<TripleServer> {
+ let interface_name = url.query::<InterfaceName>().unwrap();
self.servers
- .get(&url.service_key)
+ .get(interface_name.value().as_str())
.map(|data| data.to_owned())
}
}
@@ -61,8 +62,12 @@
async fn export(mut self, url: Url) -> BoxExporter {
// service_key is same to key of TRIPLE_SERVICES
let server = TripleServer::new();
- self.servers.insert(url.service_key.clone(), server.clone());
- server.serve(url.short_url().as_str().into()).await;
+
+ let interface_name = url.query::<InterfaceName>().unwrap();
+ let interface_name = interface_name.value();
+
+ self.servers.insert(interface_name, server.clone());
+ server.serve(url).await;
Box::new(TripleExporter::new())
}
diff --git a/dubbo/src/registry/memory_registry.rs b/dubbo/src/registry/memory_registry.rs
index db878d6..093c16e 100644
--- a/dubbo/src/registry/memory_registry.rs
+++ b/dubbo/src/registry/memory_registry.rs
@@ -15,7 +15,6 @@
* limitations under the License.
*/
-#![allow(unused_variables, dead_code, missing_docs)]
use dubbo_logger::tracing::debug;
use std::{
diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs
index 2a95452..ac4c012 100644
--- a/dubbo/src/registry/mod.rs
+++ b/dubbo/src/registry/mod.rs
@@ -15,48 +15,42 @@
* limitations under the License.
*/
-#![allow(unused_variables, dead_code, missing_docs)]
-pub mod integration;
-pub mod memory_registry;
-pub mod protocol;
-pub mod types;
-
+use crate::{extension, extension::registry_extension::proxy::RegistryProxy};
+use dubbo_base::{StdError, Url};
use std::{
- fmt::{Debug, Formatter},
- sync::Arc,
+ future::Future,
+ pin::Pin,
+ task::{Context, Poll},
};
+use tower_service::Service;
-use dubbo_base::Url;
+pub mod integration;
+pub mod protocol;
+pub mod registry;
-pub type RegistryNotifyListener = Arc<dyn NotifyListener + Send + Sync + 'static>;
-pub trait Registry {
- fn register(&mut self, url: Url) -> Result<(), crate::StdError>;
- fn unregister(&mut self, url: Url) -> Result<(), crate::StdError>;
-
- fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), crate::StdError>;
- fn unsubscribe(
- &self,
- url: Url,
- listener: RegistryNotifyListener,
- ) -> Result<(), crate::StdError>;
+#[derive(Clone)]
+pub struct MkRegistryService {
+ registry_url: Url,
}
-pub trait NotifyListener {
- fn notify(&self, event: ServiceEvent);
- fn notify_all(&self, event: ServiceEvent);
+impl MkRegistryService {
+ pub fn new(registry_url: Url) -> Self {
+ Self { registry_url }
+ }
}
-#[derive(Debug)]
-pub struct ServiceEvent {
- pub key: String,
- pub action: String,
- pub service: Vec<Url>,
-}
+impl Service<()> for MkRegistryService {
+ type Response = RegistryProxy;
+ type Error = StdError;
+ type Future =
+ Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
-pub type BoxRegistry = Box<dyn Registry + Send + Sync>;
+ fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
-impl Debug for BoxRegistry {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- f.write_str("BoxRegistry")
+ fn call(&mut self, _req: ()) -> Self::Future {
+ let fut = extension::EXTENSIONS.load_registry(self.registry_url.clone());
+ Box::pin(fut)
}
}
diff --git a/dubbo/src/registry/protocol.rs b/dubbo/src/registry/protocol.rs
index 1250950..f909988 100644
--- a/dubbo/src/registry/protocol.rs
+++ b/dubbo/src/registry/protocol.rs
@@ -15,57 +15,43 @@
* limitations under the License.
*/
-use dubbo_base::Url;
+use dubbo_base::{registry_param::InterfaceName, url::UrlParam, Url};
use dubbo_logger::tracing;
use std::{
collections::HashMap,
- fmt::{Debug, Formatter},
- sync::{Arc, Mutex, RwLock},
+ sync::{Arc, RwLock},
};
-use super::{memory_registry::MemoryRegistry, BoxRegistry};
use crate::{
+ extension::registry_extension::{proxy::RegistryProxy, Registry},
protocol::{
triple::{triple_exporter::TripleExporter, triple_protocol::TripleProtocol},
BoxExporter, BoxInvoker, Protocol,
},
- registry::types::Registries,
};
#[derive(Clone, Default)]
pub struct RegistryProtocol {
// registerAddr: Registry
- registries: Option<Registries>,
+ registries: Vec<RegistryProxy>,
// providerUrl: Exporter
+ #[allow(dead_code)]
exporters: Arc<RwLock<HashMap<String, BoxExporter>>>,
// serviceName: registryUrls
services: HashMap<String, Vec<Url>>,
}
-impl Debug for RegistryProtocol {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- f.write_str(
- format!(
- "RegistryProtocol services{:#?},registries,{:#?}",
- self.services,
- self.registries.clone()
- )
- .as_str(),
- )
- }
-}
-
impl RegistryProtocol {
pub fn new() -> Self {
RegistryProtocol {
- registries: None,
+ registries: Vec::default(),
exporters: Arc::new(RwLock::new(HashMap::new())),
services: HashMap::new(),
}
}
- pub fn with_registries(mut self, registries: Registries) -> Self {
- self.registries = Some(registries);
+ pub fn with_registries(mut self, registries: Vec<RegistryProxy>) -> Self {
+ self.registries.extend(registries);
self
}
@@ -73,18 +59,6 @@
self.services.extend(services);
self
}
-
- pub fn get_registry(&mut self, url: Url) -> BoxRegistry {
- let mem = MemoryRegistry::default();
- self.registries
- .as_ref()
- .unwrap()
- .lock()
- .unwrap()
- .insert(url.location, Arc::new(Mutex::new(Box::new(mem.clone()))));
-
- Box::new(mem)
- }
}
#[async_trait::async_trait]
@@ -101,29 +75,29 @@
// init Exporter based on provider_url
// server registry based on register_url
// start server health check
- let registry_url = self.services.get(url.get_service_name().as_str());
+ let service_name = url.query::<InterfaceName>().unwrap();
+ let registry_url = self.services.get(service_name.as_str().as_ref());
if let Some(urls) = registry_url {
- for url in urls.clone().iter() {
- if !url.service_key.is_empty() {
- let mut reg = self.get_registry(url.clone());
- reg.register(url.clone()).unwrap();
+ for url in urls.iter() {
+ for registry_proxy in &self.registries {
+ let _ = registry_proxy.register(url.clone()).await;
}
}
}
- match url.clone().scheme.as_str() {
+ match url.clone().protocol() {
"tri" => {
let pro = Box::new(TripleProtocol::new());
return pro.export(url).await;
}
_ => {
- tracing::error!("base {:?} not implemented", url.scheme);
+ tracing::error!("base {:?} not implemented", url.protocol());
Box::new(TripleExporter::new())
}
}
}
- async fn refer(self, url: Url) -> Self::Invoker {
+ async fn refer(self, _url: Url) -> Self::Invoker {
// getRegisterUrl
// get Registry from registry_url
// init directory based on registry_url and Registry
diff --git a/dubbo/src/registry/registry.rs b/dubbo/src/registry/registry.rs
new file mode 100644
index 0000000..2fc8258
--- /dev/null
+++ b/dubbo/src/registry/registry.rs
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::collections::{HashMap, HashSet};
+
+use async_trait::async_trait;
+use itertools::Itertools;
+
+use tokio::sync::{
+ mpsc::{self},
+ Mutex,
+};
+
+use dubbo_base::{
+ extension_param::{ExtensionName, ExtensionType},
+ registry_param::{InterfaceName, RegistryUrl, StaticInvokerUrls},
+ url::UrlParam,
+ StdError, Url,
+};
+
+use crate::extension::{
+ registry_extension::{DiscoverStream, Registry, ServiceChange},
+ Extension,
+};
+
+pub struct StaticServiceValues {
+ listeners: Vec<mpsc::Sender<Result<ServiceChange, StdError>>>,
+ urls: HashSet<String>,
+}
+
+pub struct StaticRegistry {
+ urls: Mutex<HashMap<String, StaticServiceValues>>,
+ self_url: Url,
+}
+
+impl StaticRegistry {
+ pub fn to_extension_url(static_invoker_urls: Vec<Url>) -> Url {
+ let static_invoker_urls: StaticInvokerUrls =
+ static_invoker_urls.iter().join(",").parse().unwrap();
+ let mut static_registry_extension_loader_url: Url = "extension://0.0.0.0".parse().unwrap();
+
+ static_registry_extension_loader_url.add_query_param(ExtensionType::Registry);
+ static_registry_extension_loader_url.add_query_param(ExtensionName::new(Self::name()));
+ static_registry_extension_loader_url
+ .add_query_param(RegistryUrl::new("static://127.0.0.1".parse().unwrap()));
+ static_registry_extension_loader_url.add_query_param(static_invoker_urls);
+
+ static_registry_extension_loader_url
+ }
+}
+
+impl StaticRegistry {
+ pub fn new(url: Url) -> Self {
+ let static_urls = url.query::<StaticInvokerUrls>();
+ let static_urls = match static_urls {
+ None => Vec::default(),
+ Some(static_urls) => static_urls.value(),
+ };
+
+ let mut map = HashMap::with_capacity(static_urls.len());
+
+ for url in static_urls {
+ let interface_name = url.query::<InterfaceName>().unwrap();
+ let interface_name = interface_name.value();
+
+ let static_values = map
+ .entry(interface_name)
+ .or_insert_with(|| StaticServiceValues {
+ listeners: Vec::new(),
+ urls: HashSet::new(),
+ });
+ let url = url.to_string();
+ static_values.urls.insert(url.clone());
+ }
+
+ let self_url = "static://0.0.0.0".parse().unwrap();
+
+ Self {
+ urls: Mutex::new(map),
+ self_url,
+ }
+ }
+}
+
+impl Default for StaticRegistry {
+ fn default() -> Self {
+ let self_url = "static://0.0.0.0".parse().unwrap();
+
+ Self {
+ self_url,
+ urls: Mutex::new(HashMap::new()),
+ }
+ }
+}
+#[async_trait]
+impl Registry for StaticRegistry {
+ async fn register(&self, url: Url) -> Result<(), StdError> {
+ let interface_name = url.query::<InterfaceName>().unwrap();
+ let interface_name = interface_name.value();
+
+ let mut lock = self.urls.lock().await;
+
+ let static_values = lock
+ .entry(interface_name)
+ .or_insert_with(|| StaticServiceValues {
+ listeners: Vec::new(),
+ urls: HashSet::new(),
+ });
+ let url = url.to_string();
+ static_values.urls.insert(url.clone());
+
+ static_values.listeners.retain(|listener| {
+ let ret = listener.try_send(Ok(ServiceChange::Insert(url.clone(), ())));
+ ret.is_ok()
+ });
+
+ Ok(())
+ }
+
+ async fn unregister(&self, url: Url) -> Result<(), StdError> {
+ let interface_name = url.query::<InterfaceName>().unwrap();
+ let interface_name = interface_name.value();
+
+ let mut lock = self.urls.lock().await;
+
+ match lock.get_mut(&interface_name) {
+ None => Ok(()),
+ Some(static_values) => {
+ let url = url.to_string();
+ static_values.urls.remove(&url);
+ static_values.listeners.retain(|listener| {
+ let ret = listener.try_send(Ok(ServiceChange::Remove(url.clone())));
+ ret.is_ok()
+ });
+ if static_values.urls.is_empty() {
+ lock.remove(&interface_name);
+ }
+ Ok(())
+ }
+ }
+ }
+
+ async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
+ let interface_name = url.query::<InterfaceName>().unwrap();
+ let interface_name = interface_name.value();
+
+ let change_rx = {
+ let mut lock = self.urls.lock().await;
+ let static_values = lock
+ .entry(interface_name)
+ .or_insert_with(|| StaticServiceValues {
+ listeners: Vec::new(),
+ urls: HashSet::new(),
+ });
+
+ let (tx, change_rx) = mpsc::channel(64);
+ static_values.listeners.push(tx);
+
+ for listener in &static_values.listeners {
+ for url in &static_values.urls {
+ let _ = listener
+ .send(Ok(ServiceChange::Insert(url.clone(), ())))
+ .await;
+ }
+ }
+
+ change_rx
+ };
+
+ Ok(change_rx)
+ }
+
+ async fn unsubscribe(&self, _url: Url) -> Result<(), StdError> {
+ Ok(())
+ }
+
+ fn url(&self) -> &Url {
+ &self.self_url
+ }
+}
+
+#[async_trait::async_trait]
+impl Extension for StaticRegistry {
+ type Target = Box<dyn Registry + Send + Sync + 'static>;
+
+ fn name() -> String {
+ "static".to_string()
+ }
+
+ async fn create(url: Url) -> Result<Self::Target, StdError> {
+ // url example:
+ // extension://0.0.0.0?extension-type=registry&extension-name=static®istry=static://127.0.0.1
+ let static_invoker_urls = url.query::<StaticInvokerUrls>();
+
+ let registry_url = url.query::<RegistryUrl>().unwrap();
+ let mut registry_url = registry_url.value();
+
+ if let Some(static_invoker_urls) = static_invoker_urls {
+ registry_url.add_query_param(static_invoker_urls);
+ }
+
+ let static_registry = StaticRegistry::new(registry_url);
+
+ Ok(Box::new(static_registry))
+ }
+}
diff --git a/dubbo/src/registry/types.rs b/dubbo/src/registry/types.rs
deleted file mode 100644
index ae7c7ca..0000000
--- a/dubbo/src/registry/types.rs
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-use std::{
- collections::HashMap,
- sync::{Arc, Mutex},
-};
-
-use dubbo_base::Url;
-use dubbo_logger::tracing::info;
-use itertools::Itertools;
-
-use crate::{
- registry::{BoxRegistry, Registry},
- StdError,
-};
-
-use super::RegistryNotifyListener;
-
-pub type SafeRegistry = Arc<Mutex<BoxRegistry>>;
-pub type Registries = Arc<Mutex<HashMap<String, SafeRegistry>>>;
-
-pub const DEFAULT_REGISTRY_KEY: &str = "default";
-
-pub trait RegistriesOperation {
- fn get(&self, registry_key: &str) -> SafeRegistry;
- fn insert(&self, registry_key: String, registry: SafeRegistry);
- fn default_registry(&self) -> SafeRegistry;
-}
-
-impl RegistriesOperation for Registries {
- fn get(&self, registry_key: &str) -> SafeRegistry {
- self.as_ref()
- .lock()
- .unwrap()
- .get(registry_key)
- .unwrap()
- .clone()
- }
-
- fn insert(&self, registry_key: String, registry: SafeRegistry) {
- self.as_ref().lock().unwrap().insert(registry_key, registry);
- }
-
- fn default_registry(&self) -> SafeRegistry {
- let guard = self.as_ref().lock().unwrap();
- let (_, result) = guard
- .iter()
- .find_or_first(|e| e.0 == DEFAULT_REGISTRY_KEY)
- .unwrap()
- .to_owned();
- result.clone()
- }
-}
-
-impl Registry for SafeRegistry {
- fn register(&mut self, url: Url) -> Result<(), StdError> {
- info!("register {}.", url);
- self.lock().unwrap().register(url).expect("registry err.");
- Ok(())
- }
-
- fn unregister(&mut self, url: Url) -> Result<(), StdError> {
- self.lock().unwrap().register(url).expect("registry err.");
- Ok(())
- }
-
- fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), StdError> {
- self.lock().unwrap().register(url).expect("registry err.");
- Ok(())
- }
-
- fn unsubscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), StdError> {
- self.lock().unwrap().register(url).expect("registry err.");
- Ok(())
- }
-}
diff --git a/dubbo/src/route/mod.rs b/dubbo/src/route/mod.rs
new file mode 100644
index 0000000..28dfda7
--- /dev/null
+++ b/dubbo/src/route/mod.rs
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::pin::Pin;
+
+use dubbo_base::StdError;
+use dubbo_logger::tracing::debug;
+use futures_core::{ready, Future};
+use futures_util::{future::Ready, FutureExt, TryFutureExt};
+use tower::{buffer::Buffer, util::FutureService};
+use tower_service::Service;
+
+use crate::{
+ codegen::{RpcInvocation, TripleInvoker},
+ invoker::clone_invoker::CloneInvoker,
+ param::Param,
+ svc::NewService,
+};
+
+pub struct NewRoutes<N> {
+ inner: N,
+}
+
+pub struct NewRoutesFuture<S, T> {
+ inner: RoutesFutureInnerState<S>,
+ #[allow(dead_code)]
+ target: T,
+}
+
+pub enum RoutesFutureInnerState<S> {
+ Service(S),
+ Future(
+ Pin<
+ Box<
+ dyn Future<Output = Result<Vec<CloneInvoker<TripleInvoker>>, StdError>>
+ + Send
+ + 'static,
+ >,
+ >,
+ ),
+ Ready(Vec<CloneInvoker<TripleInvoker>>),
+}
+
+#[derive(Clone)]
+pub struct Routes<T> {
+ #[allow(dead_code)]
+ target: T,
+ invokers: Vec<CloneInvoker<TripleInvoker>>,
+}
+
+impl<N> NewRoutes<N> {
+ pub fn new(inner: N) -> Self {
+ Self { inner }
+ }
+}
+
+impl<N> NewRoutes<N> {
+ const MAX_ROUTE_BUFFER_SIZE: usize = 16;
+
+ pub fn layer() -> impl tower_layer::Layer<N, Service = Self> {
+ tower_layer::layer_fn(|inner: N| NewRoutes::new(inner))
+ }
+}
+
+impl<N, T> NewService<T> for NewRoutes<N>
+where
+ T: Param<RpcInvocation> + Clone + Send + Unpin + 'static,
+ // NewDirectory
+ N: NewService<T>,
+ // Directory
+ N::Service: Service<(), Response = Vec<CloneInvoker<TripleInvoker>>> + Unpin + Send + 'static,
+ <N::Service as Service<()>>::Error: Into<StdError>,
+ <N::Service as Service<()>>::Future: Send + 'static,
+{
+ type Service =
+ Buffer<FutureService<NewRoutesFuture<<N as NewService<T>>::Service, T>, Routes<T>>, ()>;
+
+ fn new_service(&self, target: T) -> Self::Service {
+ let inner = self.inner.new_service(target.clone());
+
+ Buffer::new(
+ FutureService::new(NewRoutesFuture {
+ inner: RoutesFutureInnerState::Service(inner),
+ target,
+ }),
+ Self::MAX_ROUTE_BUFFER_SIZE,
+ )
+ }
+}
+
+impl<N, T> Future for NewRoutesFuture<N, T>
+where
+ T: Param<RpcInvocation> + Clone + Unpin,
+ // Directory
+ N: Service<(), Response = Vec<CloneInvoker<TripleInvoker>>> + Unpin,
+ N::Error: Into<StdError>,
+ N::Future: Send + 'static,
+{
+ type Output = Result<Routes<T>, StdError>;
+
+ fn poll(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Self::Output> {
+ let this = self.get_mut();
+
+ loop {
+ match this.inner {
+ RoutesFutureInnerState::Service(ref mut service) => {
+ debug!("RoutesFutureInnerState::Service");
+ let _ = ready!(service.poll_ready(cx)).map_err(Into::into)?;
+ let fut = service.call(()).map_err(|e| e.into()).boxed();
+ this.inner = RoutesFutureInnerState::Future(fut);
+ }
+ RoutesFutureInnerState::Future(ref mut futures) => {
+ debug!("RoutesFutureInnerState::Future");
+ let invokers = ready!(futures.as_mut().poll(cx))?;
+ this.inner = RoutesFutureInnerState::Ready(invokers);
+ }
+ RoutesFutureInnerState::Ready(ref invokers) => {
+ debug!("RoutesFutureInnerState::Ready");
+ let target = this.target.clone();
+ return std::task::Poll::Ready(Ok(Routes {
+ invokers: invokers.clone(),
+ target,
+ }));
+ }
+ }
+ }
+ }
+}
+
+impl<T> Service<()> for Routes<T>
+where
+ T: Param<RpcInvocation> + Clone,
+{
+ type Response = Vec<CloneInvoker<TripleInvoker>>;
+
+ type Error = StdError;
+
+ type Future = Ready<Result<Self::Response, Self::Error>>;
+
+ fn poll_ready(
+ &mut self,
+ _: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), Self::Error>> {
+ std::task::Poll::Ready(Ok(()))
+ }
+
+ fn call(&mut self, _: ()) -> Self::Future {
+ // some router operator
+ // if new_invokers changed, send new invokers to routes_rx after router operator
+ futures_util::future::ok(self.invokers.clone())
+ }
+}
diff --git a/dubbo/src/svc.rs b/dubbo/src/svc.rs
new file mode 100644
index 0000000..f846663
--- /dev/null
+++ b/dubbo/src/svc.rs
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::sync::Arc;
+
+pub trait NewService<T> {
+ type Service;
+
+ fn new_service(&self, target: T) -> Self::Service;
+}
+
+pub struct ArcNewService<T, S> {
+ inner: Arc<dyn NewService<T, Service = S> + Send + Sync>,
+}
+
+impl<T, S> ArcNewService<T, S> {
+ pub fn layer<N>() -> impl tower_layer::Layer<N, Service = Self> + Clone + Copy
+ where
+ N: NewService<T, Service = S> + Send + Sync + 'static,
+ S: Send + 'static,
+ {
+ tower_layer::layer_fn(Self::new)
+ }
+
+ pub fn new<N>(inner: N) -> Self
+ where
+ N: NewService<T, Service = S> + Send + Sync + 'static,
+ S: Send + 'static,
+ {
+ Self {
+ inner: Arc::new(inner),
+ }
+ }
+}
+
+impl<T, S> Clone for ArcNewService<T, S> {
+ fn clone(&self) -> Self {
+ Self {
+ inner: self.inner.clone(),
+ }
+ }
+}
+
+impl<T, S> NewService<T> for ArcNewService<T, S> {
+ type Service = S;
+
+ fn new_service(&self, t: T) -> S {
+ self.inner.new_service(t)
+ }
+}
diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs
index 9dae0f9..94c855b 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -18,28 +18,27 @@
use std::sync::Arc;
use crate::{
- cluster::{directory::StaticDirectory, Cluster, Directory, MockCluster, MockDirectory},
- codegen::{RegistryDirectory, RpcInvocation, TripleInvoker},
- protocol::BoxInvoker,
- triple::compression::CompressionEncoding,
- utils::boxed_clone::BoxCloneService,
+ cluster::NewCluster, directory::NewCachedDirectory, extension, loadbalancer::NewLoadBalancer,
+ route::NewRoutes, utils::boxed_clone::BoxCloneService,
};
+use crate::registry::{registry::StaticRegistry, MkRegistryService};
use aws_smithy_http::body::SdkBody;
use dubbo_base::Url;
-
-use super::TripleClient;
+use tower::ServiceBuilder;
pub type ClientBoxService =
BoxCloneService<http::Request<SdkBody>, http::Response<crate::BoxBody>, crate::Error>;
-#[derive(Clone, Debug, Default)]
+pub type ServiceMK =
+ Arc<NewCluster<NewLoadBalancer<NewRoutes<NewCachedDirectory<MkRegistryService>>>>>;
+
+#[derive(Default)]
pub struct ClientBuilder {
pub timeout: Option<u64>,
pub connector: &'static str,
- directory: Option<Box<dyn Directory>>,
+ registry_extension_url: Option<Url>,
pub direct: bool,
- host: String,
}
impl ClientBuilder {
@@ -47,19 +46,18 @@
ClientBuilder {
timeout: None,
connector: "",
- directory: None,
+ registry_extension_url: None,
direct: false,
- host: "".to_string(),
}
}
pub fn from_static(host: &str) -> ClientBuilder {
+ let registry_extension_url = StaticRegistry::to_extension_url(vec![host.parse().unwrap()]);
Self {
timeout: None,
connector: "",
- directory: Some(Box::new(StaticDirectory::new(&host))),
+ registry_extension_url: Some(registry_extension_url),
direct: true,
- host: host.to_string(),
}
}
@@ -70,64 +68,44 @@
}
}
- /// host: http://0.0.0.0:8888
- pub fn with_directory(self, directory: Box<dyn Directory>) -> Self {
+ pub fn with_registry(self, registry: Url) -> Self {
+ let registry_extension_url = extension::registry_extension::to_extension_url(registry);
Self {
- directory: Some(directory),
- ..self
- }
- }
-
- pub fn with_registry_directory(self, registry: RegistryDirectory) -> Self {
- Self {
- directory: Some(Box::new(registry)),
+ registry_extension_url: Some(registry_extension_url),
..self
}
}
pub fn with_host(self, host: &'static str) -> Self {
+ let registry_extension_url = StaticRegistry::to_extension_url(vec![host.parse().unwrap()]);
+
Self {
- directory: Some(Box::new(StaticDirectory::new(&host))),
+ registry_extension_url: Some(registry_extension_url),
..self
}
}
pub fn with_connector(self, connector: &'static str) -> Self {
- Self {
- connector: connector,
- ..self
- }
+ Self { connector, ..self }
}
pub fn with_direct(self, direct: bool) -> Self {
Self { direct, ..self }
}
- pub(crate) fn direct_build(self) -> TripleClient {
- let mut cli = TripleClient {
- send_compression_encoding: Some(CompressionEncoding::Gzip),
- builder: Some(self.clone()),
- invoker: None,
- };
- cli.invoker = Some(Box::new(TripleInvoker::new(
- Url::from_url(&self.host).unwrap(),
- )));
- return cli;
- }
+ pub fn build(mut self) -> ServiceMK {
+ let registry = self
+ .registry_extension_url
+ .take()
+ .expect("registry must not be empty");
- pub fn build(self, invocation: Arc<RpcInvocation>) -> Option<BoxInvoker> {
- if self.direct {
- return Some(Box::new(TripleInvoker::new(
- Url::from_url(&self.host).unwrap(),
- )));
- }
- let invokers = match self.directory {
- Some(v) => v.list(invocation),
- None => panic!("use direct connection"),
- };
+ let mk_service = ServiceBuilder::new()
+ .layer(NewCluster::layer())
+ .layer(NewLoadBalancer::layer())
+ .layer(NewRoutes::layer())
+ .layer(NewCachedDirectory::layer())
+ .service(MkRegistryService::new(registry));
- let cluster = MockCluster::default().join(Box::new(MockDirectory::new(invokers)));
-
- return Some(cluster);
+ Arc::new(mk_service)
}
}
diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs
index d4bc220..46f8e9c 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -15,22 +15,19 @@
* limitations under the License.
*/
-use std::str::FromStr;
-
-use futures_util::{future, stream, StreamExt, TryStreamExt};
-
use aws_smithy_http::body::SdkBody;
+use futures_util::{future, stream, StreamExt, TryStreamExt};
use http::HeaderValue;
use prost::Message;
use serde::{Deserialize, Serialize};
+use tower_service::Service;
-use super::builder::ClientBuilder;
use crate::codegen::{ProstCodec, RpcInvocation, SerdeCodec};
use crate::{
invocation::{IntoStreamingRequest, Metadata, Request, Response},
- protocol::BoxInvoker,
status::Status,
+ svc::NewService,
triple::{
codec::{Codec, Decoder, Encoder},
compression::CompressionEncoding,
@@ -39,25 +36,29 @@
},
};
-#[derive(Debug, Clone, Default)]
+use super::builder::{ClientBuilder, ServiceMK};
+
+#[derive(Clone)]
pub struct TripleClient {
pub(crate) send_compression_encoding: Option<CompressionEncoding>,
- pub(crate) builder: Option<ClientBuilder>,
- pub invoker: Option<BoxInvoker>,
+ pub(crate) mk: ServiceMK,
}
impl TripleClient {
pub fn connect(host: String) -> Self {
let builder = ClientBuilder::from_static(&host).with_direct(true);
+ let mk = builder.build();
- builder.direct_build()
+ TripleClient {
+ send_compression_encoding: Some(CompressionEncoding::Gzip),
+ mk,
+ }
}
pub fn new(builder: ClientBuilder) -> Self {
TripleClient {
send_compression_encoding: Some(CompressionEncoding::Gzip),
- builder: Some(builder),
- invoker: None,
+ mk: builder.build(),
}
}
@@ -155,24 +156,16 @@
)
.into_stream();
let body = hyper::Body::wrap_stream(body_stream);
- let bytes = hyper::body::to_bytes(body).await.unwrap();
- let sdk_body = SdkBody::from(bytes);
- let mut conn = match self.invoker.clone() {
- Some(v) => v,
- None => self
- .builder
- .clone()
- .unwrap()
- .build(invocation.into())
- .unwrap(),
- };
+ let mut invoker = self.mk.new_service(invocation);
- let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
- let req = self.map_request(http_uri.clone(), path, sdk_body);
+ let request = http::Request::builder()
+ .header("path", path.to_string())
+ .body(body)
+ .unwrap();
- let response = conn
- .call(req)
+ let response = invoker
+ .call(request)
.await
.map_err(|err| crate::status::Status::from_error(err.into()));
@@ -226,23 +219,16 @@
)
.into_stream();
let body = hyper::Body::wrap_stream(en);
- let sdk_body = SdkBody::from(body);
- let mut conn = match self.invoker.clone() {
- Some(v) => v,
- None => self
- .builder
- .clone()
- .unwrap()
- .build(invocation.into())
- .unwrap(),
- };
+ let mut invoker = self.mk.new_service(invocation);
- let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
- let req = self.map_request(http_uri.clone(), path, sdk_body);
+ let request = http::Request::builder()
+ .header("path", path.to_string())
+ .body(body)
+ .unwrap();
- let response = conn
- .call(req)
+ let response = invoker
+ .call(request)
.await
.map_err(|err| crate::status::Status::from_error(err.into()));
@@ -280,24 +266,16 @@
)
.into_stream();
let body = hyper::Body::wrap_stream(en);
- let sdk_body = SdkBody::from(body);
+ let mut invoker = self.mk.new_service(invocation);
- let mut conn = match self.invoker.clone() {
- Some(v) => v,
- None => self
- .builder
- .clone()
- .unwrap()
- .build(invocation.into())
- .unwrap(),
- };
-
- let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
- let req = self.map_request(http_uri.clone(), path, sdk_body);
+ let request = http::Request::builder()
+ .header("path", path.to_string())
+ .body(body)
+ .unwrap();
// let mut conn = Connection::new().with_host(http_uri);
- let response = conn
- .call(req)
+ let response = invoker
+ .call(request)
.await
.map_err(|err| crate::status::Status::from_error(err.into()));
@@ -351,22 +329,15 @@
)
.into_stream();
let body = hyper::Body::wrap_stream(en);
- let sdk_body = SdkBody::from(body);
+ let mut invoker = self.mk.new_service(invocation);
- let mut conn = match self.invoker.clone() {
- Some(v) => v,
- None => self
- .builder
- .clone()
- .unwrap()
- .build(invocation.into())
- .unwrap(),
- };
- let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
- let req = self.map_request(http_uri.clone(), path, sdk_body);
+ let request = http::Request::builder()
+ .header("path", path.to_string())
+ .body(body)
+ .unwrap();
- let response = conn
- .call(req)
+ let response = invoker
+ .call(request)
.await
.map_err(|err| crate::status::Status::from_error(err.into()));
diff --git a/dubbo/src/triple/server/builder.rs b/dubbo/src/triple/server/builder.rs
index 9ef7152..b473dd8 100644
--- a/dubbo/src/triple/server/builder.rs
+++ b/dubbo/src/triple/server/builder.rs
@@ -21,7 +21,7 @@
str::FromStr,
};
-use dubbo_base::Url;
+use dubbo_base::{registry_param::InterfaceName, url::UrlParam, Url};
use dubbo_logger::tracing;
use http::{Request, Response, Uri};
use hyper::body::Body;
@@ -132,7 +132,7 @@
impl From<Url> for ServerBuilder {
fn from(u: Url) -> Self {
- let uri = match http::Uri::from_str(&u.raw_url_string()) {
+ let uri = match http::Uri::from_str(&u.as_str()) {
Ok(v) => v,
Err(err) => {
tracing::error!("http uri parse error: {}, url: {:?}", err, &u);
@@ -142,10 +142,14 @@
let authority = uri.authority().unwrap();
+ let service_name = u.query::<InterfaceName>().unwrap().value();
+
Self {
- listener: u.get_param("listener").unwrap_or("tcp".to_string()),
+ listener: u
+ .query_param_by_key("listener")
+ .unwrap_or("tcp".to_string()),
addr: authority.to_string().to_socket_addrs().unwrap().next(),
- service_names: vec![u.service_name],
+ service_names: vec![service_name],
server: DubboServer::default(),
certs: Vec::new(),
keys: Vec::new(),
diff --git a/dubbo/src/triple/transport/connection.rs b/dubbo/src/triple/transport/connection.rs
index 10a879a..4369b82 100644
--- a/dubbo/src/triple/transport/connection.rs
+++ b/dubbo/src/triple/transport/connection.rs
@@ -15,19 +15,23 @@
* limitations under the License.
*/
-use std::task::Poll;
-
-use dubbo_logger::tracing::debug;
+use dubbo_base::StdError;
use hyper::client::{conn::Builder, service::Connect};
use tower_service::Service;
-use crate::{boxed, triple::transport::connector::get_connector};
+use crate::{boxed, invoker::clone_body::CloneBody, triple::transport::connector::get_connector};
-#[derive(Debug, Clone)]
+type HyperConnect = Connect<
+ crate::utils::boxed_clone::BoxCloneService<http::Uri, super::io::BoxIO, StdError>,
+ CloneBody,
+ http::Uri,
+>;
+
pub struct Connection {
host: hyper::Uri,
connector: String,
builder: Builder,
+ connect: Option<HyperConnect>,
}
impl Default for Connection {
@@ -42,6 +46,7 @@
host: hyper::Uri::default(),
connector: "http".to_string(),
builder: Builder::new(),
+ connect: None,
}
}
@@ -59,14 +64,16 @@
self.builder = builder;
self
}
+
+ pub fn build(mut self) -> Self {
+ let builder = self.builder.clone().http2_only(true).to_owned();
+ let hyper_connect: HyperConnect = Connect::new(get_connector(&self.connector), builder);
+ self.connect = Some(hyper_connect);
+ self
+ }
}
-impl<ReqBody> Service<http::Request<ReqBody>> for Connection
-where
- ReqBody: http_body::Body + Unpin + Send + 'static,
- ReqBody::Data: Send + Unpin,
- ReqBody::Error: Into<crate::Error>,
-{
+impl Service<http::Request<CloneBody>> for Connection {
type Response = http::Response<crate::BoxBody>;
type Error = crate::Error;
@@ -75,25 +82,34 @@
fn poll_ready(
&mut self,
- _cx: &mut std::task::Context<'_>,
+ cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
+ match self.connect {
+ None => {
+ panic!("connection must be built before use")
+ }
+ Some(ref mut connect) => connect.poll_ready(cx).map_err(|e| e.into()),
+ }
}
- fn call(&mut self, req: http::Request<ReqBody>) -> Self::Future {
- let builder = self.builder.clone().http2_only(true).to_owned();
- let mut connector = Connect::new(get_connector(self.connector.as_str()), builder);
- let uri = self.host.clone();
- let fut = async move {
- debug!("send base call to {}", uri);
- let mut con = connector.call(uri).await.unwrap();
+ fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future {
+ match self.connect {
+ None => {
+ panic!("connection must be built before use")
+ }
+ Some(ref mut connect) => {
+ let uri = self.host.clone();
+ let call_fut = connect.call(uri);
+ let fut = async move {
+ let mut con = call_fut.await.unwrap();
+ con.call(req)
+ .await
+ .map_err(|err| err.into())
+ .map(|res| res.map(boxed))
+ };
- con.call(req)
- .await
- .map_err(|err| err.into())
- .map(|res| res.map(boxed))
- };
-
- Box::pin(fut)
+ return Box::pin(fut);
+ }
+ }
}
}
diff --git a/examples/echo/src/echo/client.rs b/examples/echo/src/echo/client.rs
index 1a21000..8f6a7bf 100644
--- a/examples/echo/src/echo/client.rs
+++ b/examples/echo/src/echo/client.rs
@@ -34,9 +34,10 @@
// let builder = ClientBuilder::new()
// .with_connector("unix")
// .with_host("unix://127.0.0.1:8888");
- let builder = ClientBuilder::from_static(&"http://127.0.0.1:8888")
- .with_timeout(1000000)
- .with_direct(true);
+ let builder =
+ ClientBuilder::from_static(&"http://127.0.0.1:8888?interface=grpc.examples.echo.Echo")
+ .with_timeout(1000000)
+ .with_direct(true);
let mut cli = EchoClient::new(builder);
// let mut unary_cli = cli.clone().with_filter(FakeFilter {});
// let mut cli = EchoClient::build(ClientBuilder::from_static("http://127.0.0.1:8888"));
diff --git a/examples/echo/src/echo/server.rs b/examples/echo/src/echo/server.rs
index 90efc1a..010821e 100644
--- a/examples/echo/src/echo/server.rs
+++ b/examples/echo/src/echo/server.rs
@@ -70,7 +70,8 @@
// Dubbo::new()
// .with_config({
// let mut r = RootConfig::new();
- // r.test_config();
+ // r.test_config
+ // ();
// r
// })
// .start()
diff --git a/examples/echo/src/generated/grpc.examples.echo.rs b/examples/echo/src/generated/grpc.examples.echo.rs
index 0a74655..fc48dc5 100644
--- a/examples/echo/src/generated/grpc.examples.echo.rs
+++ b/examples/echo/src/generated/grpc.examples.echo.rs
@@ -21,7 +21,7 @@
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
use dubbo::codegen::*;
/// Echo is the echo service.
- #[derive(Debug, Clone, Default)]
+ #[derive(Clone)]
pub struct EchoClient {
inner: TripleClient,
}
diff --git a/examples/greeter/src/greeter/client.rs b/examples/greeter/src/greeter/client.rs
index eed3e52..14743ae 100644
--- a/examples/greeter/src/greeter/client.rs
+++ b/examples/greeter/src/greeter/client.rs
@@ -20,35 +20,20 @@
include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs"));
}
-use std::env;
-
use dubbo::codegen::*;
-use dubbo_base::Url;
+use dubbo::extension;
use futures_util::StreamExt;
use protos::{greeter_client::GreeterClient, GreeterRequest};
use registry_nacos::NacosRegistry;
-use registry_zookeeper::ZookeeperRegistry;
#[tokio::main]
async fn main() {
dubbo_logger::init();
- let mut builder = ClientBuilder::new();
+ let _ = extension::EXTENSIONS.register::<NacosRegistry>().await;
- if let Ok(zk_servers) = env::var("ZOOKEEPER_SERVERS") {
- let zkr = ZookeeperRegistry::new(&zk_servers);
- let directory = RegistryDirectory::new(Box::new(zkr));
- builder = builder.with_directory(Box::new(directory));
- } else if let Ok(nacos_url_str) = env::var("NACOS_URL") {
- // NACOS_URL=nacos://mse-96efa264-p.nacos-ans.mse.aliyuncs.com
- let nacos_url = Url::from_url(&nacos_url_str).unwrap();
- let registry = NacosRegistry::new(nacos_url);
- let directory = RegistryDirectory::new(Box::new(registry));
- builder = builder.with_directory(Box::new(directory));
- } else {
- builder = builder.with_host("http://127.0.0.1:8888");
- }
+ let builder = ClientBuilder::new().with_registry("nacos://127.0.0.1:8848".parse().unwrap());
let mut cli = GreeterClient::new(builder);
@@ -60,7 +45,7 @@
.await;
let resp = match resp {
Ok(resp) => resp,
- Err(err) => return println!("{:?}", err),
+ Err(err) => return println!("response error: {:?}", err),
};
let (_parts, body) = resp.into_parts();
println!("Response: {:?}", body);
diff --git a/examples/greeter/src/greeter/server.rs b/examples/greeter/src/greeter/server.rs
index f143e25..6bdbd00 100644
--- a/examples/greeter/src/greeter/server.rs
+++ b/examples/greeter/src/greeter/server.rs
@@ -19,10 +19,11 @@
use async_trait::async_trait;
use futures_util::{Stream, StreamExt};
+use registry_nacos::NacosRegistry;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
-use dubbo::{codegen::*, Dubbo};
+use dubbo::{codegen::*, extension, Dubbo};
use dubbo_config::RootConfig;
use dubbo_logger::{
tracing::{info, span},
@@ -32,8 +33,6 @@
greeter_server::{register_server, Greeter},
GreeterReply, GreeterRequest,
};
-use registry_zookeeper::ZookeeperRegistry;
-
pub mod protos {
#![allow(non_camel_case_types)]
include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs"));
@@ -50,15 +49,18 @@
register_server(GreeterServerImpl {
name: "greeter".to_string(),
});
- let zkr = ZookeeperRegistry::default();
+ // let zkr: ZookeeperRegistry = ZookeeperRegistry::default();
let r = RootConfig::new();
let r = match r.load() {
Ok(config) => config,
Err(_err) => panic!("err: {:?}", _err), // response was droped
};
+
+ let _ = extension::EXTENSIONS.register::<NacosRegistry>().await;
let mut f = Dubbo::new()
.with_config(r)
- .add_registry("zookeeper", Box::new(zkr));
+ .add_registry("nacos://127.0.0.1:8848/");
+
f.start().await;
}
diff --git a/protocol/base/src/invoker.rs b/protocol/base/src/invoker.rs
index 14de6ce..d676fa4 100644
--- a/protocol/base/src/invoker.rs
+++ b/protocol/base/src/invoker.rs
@@ -66,9 +66,9 @@
impl Display for BaseInvoker {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Invoker")
- .field("protocol", &self.url.scheme)
- .field("host", &self.url.ip)
- .field("path", &self.url.location)
+ .field("protocol", &self.url.protocol())
+ .field("host", &self.url.host())
+ .field("path", &self.url.path())
.finish()
}
}
diff --git a/registry/nacos/Cargo.toml b/registry/nacos/Cargo.toml
index 2437bc0..1fa6c19 100644
--- a/registry/nacos/Cargo.toml
+++ b/registry/nacos/Cargo.toml
@@ -9,13 +9,15 @@
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-nacos-sdk = { version = "0.2.3", features = ["naming", "auth-by-http"] }
+nacos-sdk = { version = "0.3", features = ["naming", "auth-by-http", "async"] }
dubbo.workspace = true
serde_json.workspace = true
serde = { workspace = true, features = ["derive"] }
anyhow.workspace = true
dubbo-logger.workspace = true
dubbo-base.workspace = true
+tokio.workspace = true
+async-trait.workspace = true
[dev-dependencies]
tracing-subscriber = "0.3.16"
diff --git a/registry/nacos/src/lib.rs b/registry/nacos/src/lib.rs
index bbbaaa2..9ac7022 100644
--- a/registry/nacos/src/lib.rs
+++ b/registry/nacos/src/lib.rs
@@ -14,362 +14,379 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-mod utils;
-use dubbo_base::Url;
-use std::{
- collections::{HashMap, HashSet},
- sync::{Arc, Mutex},
+use async_trait::async_trait;
+use dubbo_base::{StdError, Url};
+use std::{collections::HashMap, sync::Arc};
+use tokio::sync::mpsc;
+
+use dubbo::extension::{
+ registry_extension::{DiscoverStream, Registry, ServiceChange},
+ Extension,
};
-
-use anyhow::anyhow;
-use dubbo::registry::{NotifyListener, Registry, RegistryNotifyListener, ServiceEvent};
-use dubbo_logger::tracing::{error, info, warn};
-use nacos_sdk::api::naming::{NamingService, NamingServiceBuilder, ServiceInstance};
-
-use crate::utils::{build_nacos_client_props, is_concrete_str, is_wildcard_str, match_range};
-
-const VERSION_KEY: &str = "version";
-
-const GROUP_KEY: &str = "group";
-
-const DEFAULT_GROUP: &str = "DEFAULT_GROUP";
-
-const PROVIDER_SIDE: &str = "provider";
-
-const DEFAULT_CATEGORY: &str = PROVIDERS_CATEGORY;
-
-const SIDE_KEY: &str = "side";
-
-const REGISTER_CONSUMER_URL_KEY: &str = "register-consumer-url";
-
-const SERVICE_NAME_SEPARATOR: &str = ":";
-
-const CATEGORY_KEY: &str = "category";
-
-const PROVIDERS_CATEGORY: &str = "providers";
-
-#[allow(dead_code)]
-const ADMIN_PROTOCOL: &str = "admin";
-
-#[allow(dead_code)]
-const INNERCLASS_SYMBOL: &str = "$";
-
-#[allow(dead_code)]
-const INNERCLASS_COMPATIBLE_SYMBOL: &str = "___";
+use dubbo_base::{
+ registry_param::{
+ AppName, Category, Group, InterfaceName, RegistryUrl, ServiceNamespace, Version,
+ },
+ url::UrlParam,
+};
+use dubbo_logger::tracing::info;
+use nacos_sdk::api::{
+ naming::{NamingEventListener, NamingService, NamingServiceBuilder, ServiceInstance},
+ props::ClientProps,
+};
+use tokio::sync::{watch, Notify};
pub struct NacosRegistry {
- nacos_naming_service: Arc<dyn NamingService + Sync + Send + 'static>,
- listeners: Mutex<HashMap<String, HashSet<Arc<NotifyListenerWrapper>>>>,
+ url: Url,
+ nacos_service: Arc<dyn NamingService + Send + Sync>,
}
impl NacosRegistry {
- pub fn new(url: Url) -> Self {
- let (nacos_client_props, enable_auth) = build_nacos_client_props(&url);
+ pub fn new(url: Url, nacos_service: Arc<dyn NamingService + Send + Sync>) -> Self {
+ Self { url, nacos_service }
+ }
+
+ fn create_nacos_service_instance(url: &Url) -> ServiceInstance {
+ let ip = url.host().unwrap();
+ let port = url.port().unwrap();
+
+ ServiceInstance {
+ ip: ip.to_string(),
+ port: port.into(),
+ metadata: url.all_query_params(),
+ ..Default::default()
+ }
+ }
+
+ fn diff<'a>(
+ old_service: &'a Vec<ServiceInstance>,
+ new_services: &'a Vec<ServiceInstance>,
+ ) -> (Vec<&'a ServiceInstance>, Vec<&'a ServiceInstance>) {
+ let new_hosts_map: HashMap<String, &ServiceInstance> = new_services
+ .iter()
+ .map(|hosts| (hosts.ip_and_port(), hosts))
+ .collect();
+
+ let old_hosts_map: HashMap<String, &ServiceInstance> = old_service
+ .iter()
+ .map(|hosts| (hosts.ip_and_port(), hosts))
+ .collect();
+
+ let mut add_hosts = Vec::<&ServiceInstance>::new();
+ let mut removed_hosts = Vec::<&ServiceInstance>::new();
+
+ for (key, new_host) in new_hosts_map.iter() {
+ let old_host = old_hosts_map.get(key);
+ match old_host {
+ None => {
+ add_hosts.push(*new_host);
+ }
+ Some(old_host) => {
+ if !old_host.is_same_instance(new_host) {
+ removed_hosts.push(*old_host);
+ add_hosts.push(*new_host);
+ }
+ }
+ }
+ }
+
+ for (key, old_host) in old_hosts_map.iter() {
+ let new_host = new_hosts_map.get(key);
+ match new_host {
+ None => {
+ removed_hosts.push(*old_host);
+ }
+ Some(_) => {}
+ }
+ }
+
+ (removed_hosts, add_hosts)
+ }
+}
+
+#[async_trait]
+impl Registry for NacosRegistry {
+ async fn register(&self, url: Url) -> Result<(), StdError> {
+ let service_name = NacosServiceName::new(&url);
+
+ let group_name = service_name.group();
+
+ let registry_service_name_str = service_name.value();
+
+ let service_instance = Self::create_nacos_service_instance(&url);
+
+ self.nacos_service
+ .register_instance(
+ registry_service_name_str.to_owned(),
+ Some(group_name.to_owned()),
+ service_instance,
+ )
+ .await?;
+
+ Ok(())
+ }
+
+ async fn unregister(&self, url: Url) -> Result<(), StdError> {
+ let service_name = NacosServiceName::new(&url);
+
+ let group_name = service_name.group();
+
+ let registry_service_name_str = service_name.value();
+
+ let service_instance = Self::create_nacos_service_instance(&url);
+
+ self.nacos_service
+ .deregister_instance(
+ registry_service_name_str.to_owned(),
+ Some(group_name.to_owned()),
+ service_instance,
+ )
+ .await?;
+
+ Ok(())
+ }
+
+ async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
+ let service_name = NacosServiceName::new(&url);
+
+ let group_name = service_name.group().to_owned();
+
+ let registry_service_name_str = service_name.value().to_owned();
+
+ let all_instance = self
+ .nacos_service
+ .get_all_instances(
+ registry_service_name_str.to_owned(),
+ Some(group_name.to_owned()),
+ Vec::default(),
+ false,
+ )
+ .await?;
+
+ let (tx, rx) = mpsc::channel(1024);
+
+ let (event_listener, mut listener_change_rx, closed) = NacosNamingEventListener::new();
+ let event_listener = Arc::new(event_listener);
+
+ let nacos_service_cloned = self.nacos_service.clone();
+ let event_listener_cloned = event_listener.clone();
+ let registry_service_name_str_clone = registry_service_name_str.clone();
+ let group_name_clone = group_name.clone();
+ tokio::spawn(async move {
+ let mut current_instances = all_instance;
+ for instance in ¤t_instances {
+ let url = instance_to_url(instance).as_str().to_owned();
+ let _ = tx.send(Ok(ServiceChange::Insert(url, ()))).await;
+ }
+
+ loop {
+ let change = tokio::select! {
+ _ = closed.notified() => {
+ break;
+ },
+ change = listener_change_rx.changed() => change
+ };
+
+ if change.is_err() {
+ break;
+ }
+
+ let change = listener_change_rx.borrow_and_update().clone();
+ if change.is_empty() {
+ continue;
+ }
+ let (remove_instances, add_instances) = Self::diff(¤t_instances, &change);
+
+ for remove_instance in remove_instances {
+ let url = instance_to_url(remove_instance).as_str().to_owned();
+ let Ok(_) = tx.send(Ok(ServiceChange::Remove(url))).await else {
+ break;
+ };
+ }
+
+ for add_instance in add_instances {
+ let url = instance_to_url(add_instance).as_str().to_owned();
+ let Ok(_) = tx.send(Ok(ServiceChange::Insert(url, ()))).await else {
+ break;
+ };
+ }
+
+ current_instances = change;
+ }
+
+ info!("unsubscribe");
+ let _ = nacos_service_cloned
+ .unsubscribe(
+ registry_service_name_str_clone,
+ Some(group_name_clone),
+ Vec::default(),
+ event_listener_cloned,
+ )
+ .await;
+ });
+
+ let _ = self
+ .nacos_service
+ .subscribe(
+ registry_service_name_str,
+ Some(group_name),
+ Vec::default(),
+ event_listener,
+ )
+ .await?;
+
+ Ok(rx)
+ }
+
+ async fn unsubscribe(&self, _: Url) -> Result<(), StdError> {
+ Ok(())
+ }
+
+ fn url(&self) -> &Url {
+ &self.url
+ }
+}
+
+#[async_trait]
+impl Extension for NacosRegistry {
+ type Target = Box<dyn Registry + Send + Sync + 'static>;
+
+ fn name() -> String {
+ "nacos".to_string()
+ }
+
+ async fn create(url: Url) -> Result<Self::Target, StdError> {
+ // url example:
+ // extension://0.0.0.0?extension-type=registry&extension-name=nacos®istry=nacos://127.0.0.1:8848
+ let registry_url = url.query::<RegistryUrl>().unwrap();
+ let registry_url = registry_url.value();
+
+ let host = registry_url.host().unwrap();
+ let port = registry_url.port().unwrap_or(8848);
+
+ let nacos_server_addr = format!("{}:{}", host, port);
+
+ let namespace = registry_url.query::<ServiceNamespace>().unwrap_or_default();
+ let namespace = namespace.value();
+
+ let app_name = registry_url.query::<AppName>().unwrap_or_default();
+ let app_name = app_name.value();
+
+ let user_name = registry_url.username();
+ let password = registry_url.password().unwrap_or_default();
+
+ let nacos_client_props = ClientProps::new()
+ .server_addr(nacos_server_addr)
+ .namespace(namespace)
+ .app_name(app_name)
+ .auth_username(user_name)
+ .auth_password(password);
let mut nacos_naming_builder = NamingServiceBuilder::new(nacos_client_props);
- if enable_auth {
+ if !user_name.is_empty() {
nacos_naming_builder = nacos_naming_builder.enable_auth_plugin_http();
}
let nacos_naming_service = nacos_naming_builder.build().unwrap();
- Self {
- nacos_naming_service: Arc::new(nacos_naming_service),
- listeners: Mutex::new(HashMap::new()),
- }
- }
+ let nacos_registry = NacosRegistry::new(registry_url, Arc::new(nacos_naming_service));
- #[allow(dead_code)]
- fn get_subscribe_service_names(&self, service_name: &NacosServiceName) -> HashSet<String> {
- if service_name.is_concrete() {
- let mut set = HashSet::new();
- let service_subscribe_name = service_name.to_subscriber_str();
- let service_subscriber_legacy_name = service_name.to_subscriber_legacy_string();
- if service_subscribe_name.eq(&service_subscriber_legacy_name) {
- set.insert(service_subscribe_name);
- } else {
- set.insert(service_subscribe_name);
- set.insert(service_subscriber_legacy_name);
- }
-
- set
- } else {
- let list_view = self.nacos_naming_service.get_service_list(
- 1,
- i32::MAX,
- Some(
- service_name
- .get_group_with_default(DEFAULT_GROUP)
- .to_string(),
- ),
- );
- if let Err(e) = list_view {
- error!("list service instances occur an error: {:?}", e);
- return HashSet::default();
- }
-
- let list_view = list_view.unwrap();
- let set: HashSet<String> = list_view
- .0
- .into_iter()
- .filter(|service_name| service_name.split(SERVICE_NAME_SEPARATOR).count() == 4)
- .map(|service_name| NacosServiceName::from_service_name_str(&service_name))
- .filter(|other_service_name| service_name.is_compatible(other_service_name))
- .map(|service_name| service_name.to_subscriber_str())
- .collect();
- set
- }
+ Ok(Box::new(nacos_registry))
}
}
-impl NacosRegistry {
- fn create_nacos_service_instance(url: Url) -> ServiceInstance {
- let ip = url.ip;
- let port = url.port;
- nacos_sdk::api::naming::ServiceInstance {
- ip,
- port: port.parse().unwrap(),
- metadata: url.params,
- ..Default::default()
- }
+fn instance_to_url(instance: &ServiceInstance) -> Url {
+ let mut url = Url::empty();
+ url.set_protocol("provider");
+ url.set_host(instance.ip());
+ url.set_port(instance.port().try_into().unwrap_or_default());
+ url.extend_pairs(
+ instance
+ .metadata()
+ .iter()
+ .map(|(k, v)| (k.clone(), v.clone())),
+ );
+
+ url
+}
+
+struct NacosNamingEventListener {
+ tx: watch::Sender<Vec<ServiceInstance>>,
+ closed: Arc<Notify>,
+}
+
+impl NacosNamingEventListener {
+ fn new() -> (Self, watch::Receiver<Vec<ServiceInstance>>, Arc<Notify>) {
+ let (tx, rx) = watch::channel(Vec::new());
+
+ let closed = Arc::new(Notify::new());
+ let this = Self {
+ tx,
+ closed: closed.clone(),
+ };
+ (this, rx, closed)
}
}
-impl Registry for NacosRegistry {
- fn register(&mut self, url: Url) -> Result<(), dubbo::StdError> {
- let side = url.get_param(SIDE_KEY).unwrap_or_default();
- let register_consumer = url
- .get_param(REGISTER_CONSUMER_URL_KEY)
- .unwrap_or_else(|| false.to_string())
- .parse::<bool>()
- .unwrap_or(false);
- if side.ne(PROVIDER_SIDE) && !register_consumer {
- warn!("Please set 'dubbo.registry.parameters.register-consumer-url=true' to turn on consumer url registration.");
- return Ok(());
- }
-
- let nacos_service_name = NacosServiceName::new(&url);
-
- let group_name = Some(
- nacos_service_name
- .get_group_with_default(DEFAULT_GROUP)
- .to_string(),
- );
- let nacos_service_name = nacos_service_name.to_register_str();
-
- let nacos_service_instance = Self::create_nacos_service_instance(url);
-
- info!("register service: {}", nacos_service_name);
- let ret = self.nacos_naming_service.register_instance(
- nacos_service_name,
- group_name,
- nacos_service_instance,
- );
- if let Err(e) = ret {
- error!("register to nacos occur an error: {:?}", e);
- return Err(anyhow!("register to nacos occur an error: {:?}", e).into());
- }
-
- Ok(())
- }
-
- fn unregister(&mut self, url: Url) -> Result<(), dubbo::StdError> {
- let nacos_service_name = NacosServiceName::new(&url);
-
- let group_name = Some(
- nacos_service_name
- .get_group_with_default(DEFAULT_GROUP)
- .to_string(),
- );
- let nacos_service_name = nacos_service_name.to_register_str();
-
- let nacos_service_instance = Self::create_nacos_service_instance(url);
-
- info!("deregister service: {}", nacos_service_name);
-
- let ret = self.nacos_naming_service.deregister_instance(
- nacos_service_name,
- group_name,
- nacos_service_instance,
- );
- if let Err(e) = ret {
- error!("deregister service from nacos occur an error: {:?}", e);
- return Err(anyhow!("deregister service from nacos occur an error: {:?}", e).into());
- }
- Ok(())
- }
-
- fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), dubbo::StdError> {
- let service_name = NacosServiceName::new(&url);
- let url_str = url.to_url();
-
- info!("subscribe: {}", &url_str);
-
- let nacos_listener: Arc<NotifyListenerWrapper> = {
- let listeners = self.listeners.lock();
- if let Err(e) = listeners {
- error!("subscribe service failed: {:?}", e);
- return Err(anyhow!("subscribe service failed: {:?}", e).into());
+impl NamingEventListener for NacosNamingEventListener {
+ fn event(&self, event: Arc<nacos_sdk::api::naming::NamingChangeEvent>) {
+ match event.instances {
+ Some(ref instances) => {
+ let instances = instances.clone();
+ let send = self.tx.send(instances);
+ match send {
+ Ok(_) => {}
+ Err(_) => {
+ self.closed.notify_waiters();
+ }
+ }
}
-
- let mut listeners = listeners.unwrap();
- let listener_set = listeners.get_mut(url_str.as_str());
-
- let wrapper = Arc::new(NotifyListenerWrapper(listener));
- if let Some(listener_set) = listener_set {
- listener_set.insert(wrapper.clone());
- } else {
- let mut hash_set = HashSet::new();
- hash_set.insert(wrapper.clone());
- listeners.insert(url_str, hash_set);
- }
-
- wrapper
- };
-
- let ret = self.nacos_naming_service.subscribe(
- service_name.to_subscriber_str(),
- Some(
- service_name
- .get_group_with_default(DEFAULT_GROUP)
- .to_string(),
- ),
- Vec::new(),
- nacos_listener,
- );
-
- if let Err(e) = ret {
- error!("subscribe service failed: {:?}", e);
- return Err(anyhow!("subscribe service failed: {:?}", e).into());
+ None => {}
}
-
- Ok(())
- }
-
- fn unsubscribe(
- &self,
- url: Url,
- listener: RegistryNotifyListener,
- ) -> Result<(), dubbo::StdError> {
- let service_name = NacosServiceName::new(&url);
- let url_str = url.to_url();
- info!("unsubscribe: {}", &url_str);
-
- let nacos_listener: Arc<NotifyListenerWrapper> = {
- let listeners = self.listeners.lock();
- if let Err(e) = listeners {
- error!("unsubscribe service failed: {:?}", e);
- return Err(anyhow!("unsubscribe service failed: {:?}", e).into());
- }
-
- let mut listeners = listeners.unwrap();
- let listener_set = listeners.get_mut(url_str.as_str());
- if listener_set.is_none() {
- return Ok(());
- }
-
- let listener_set = listener_set.unwrap();
-
- let listener = Arc::new(NotifyListenerWrapper(listener));
- let listener = listener_set.take(&listener);
- if listener.is_none() {
- return Ok(());
- }
-
- listener.unwrap()
- };
-
- let ret = self.nacos_naming_service.unsubscribe(
- service_name.to_subscriber_str(),
- Some(
- service_name
- .get_group_with_default(DEFAULT_GROUP)
- .to_string(),
- ),
- Vec::new(),
- nacos_listener,
- );
-
- if let Err(e) = ret {
- error!("unsubscribe service failed: {:?}", e);
- return Err(anyhow!("unsubscribe service failed: {:?}", e).into());
- }
-
- Ok(())
}
}
struct NacosServiceName {
+ #[allow(dead_code)]
category: String,
- service_interface: String,
+ #[allow(dead_code)]
+ interface: String,
+ #[allow(dead_code)]
version: String,
+ #[allow(dead_code)]
group: String,
+
+ #[allow(dead_code)]
+ value: String,
}
impl NacosServiceName {
- fn new(url: &Url) -> NacosServiceName {
- let service_interface = url.get_service_name();
+ fn new(url: &Url) -> Self {
+ let interface = url.query::<InterfaceName>().unwrap();
+ let interface = interface.value();
- let category = url.get_param(CATEGORY_KEY).unwrap_or_default();
+ let category = url.query::<Category>().unwrap_or_default();
+ let category = category.value();
- let version = url.get_param(VERSION_KEY).unwrap_or_default();
+ let version = url.query::<Version>().unwrap_or_default();
+ let version = version.value();
- let group = url.get_param(GROUP_KEY).unwrap_or_default();
+ let group = url.query::<Group>().unwrap_or_default();
+ let group = group.value();
+
+ let value = format!("{}:{}:{}:{}", category, interface, version, group);
Self {
category,
- service_interface: service_interface.clone(),
+ interface,
version,
group,
- }
- }
-
- #[allow(dead_code)]
- fn from_service_name_str(service_name_str: &str) -> Self {
- let mut splitter = service_name_str.split(SERVICE_NAME_SEPARATOR);
-
- let category = splitter.next().unwrap_or_default().to_string();
- let service_interface = splitter.next().unwrap_or_default().to_string();
- let version = splitter.next().unwrap_or_default().to_string();
- let group = splitter.next().unwrap_or_default().to_string();
-
- Self {
- category,
- service_interface,
- version,
- group,
- }
- }
-
- #[allow(dead_code)]
- fn version(&self) -> &str {
- &self.version
- }
-
- #[allow(dead_code)]
- fn get_version_with_default<'a>(&'a self, default: &'a str) -> &str {
- if self.version.is_empty() {
- default
- } else {
- &self.version
- }
- }
-
- #[allow(dead_code)]
- fn group(&self) -> &str {
- &self.group
- }
-
- fn get_group_with_default<'a>(&'a self, default: &'a str) -> &str {
- if self.group.is_empty() {
- default
- } else {
- &self.group
+ value,
}
}
@@ -379,157 +396,23 @@
}
#[allow(dead_code)]
- fn get_category_with_default<'a>(&'a self, default: &'a str) -> &str {
- if self.category.is_empty() {
- default
- } else {
- &self.category
- }
+ fn interface(&self) -> &str {
+ &self.interface
}
#[allow(dead_code)]
- fn service_interface(&self) -> &str {
- &self.service_interface
+ fn version(&self) -> &str {
+ &self.version
}
#[allow(dead_code)]
- fn get_service_interface_with_default<'a>(&'a self, default: &'a str) -> &str {
- if self.service_interface.is_empty() {
- default
- } else {
- &self.service_interface
- }
- }
-
- fn to_register_str(&self) -> String {
- let category = if self.category.is_empty() {
- DEFAULT_CATEGORY
- } else {
- &self.category
- };
- format!(
- "{}:{}:{}:{}",
- category, self.service_interface, self.version, self.group
- )
- }
-
- fn to_subscriber_str(&self) -> String {
- let category = if is_concrete_str(&self.service_interface) {
- DEFAULT_CATEGORY
- } else {
- &self.category
- };
-
- format!(
- "{}:{}:{}:{}",
- category, self.service_interface, self.version, self.group
- )
+ fn group(&self) -> &str {
+ &self.group
}
#[allow(dead_code)]
- fn to_subscriber_legacy_string(&self) -> String {
- let mut legacy_string = DEFAULT_CATEGORY.to_owned();
- if !self.service_interface.is_empty() {
- legacy_string.push_str(SERVICE_NAME_SEPARATOR);
- legacy_string.push_str(&self.service_interface);
- }
-
- if !self.version.is_empty() {
- legacy_string.push_str(SERVICE_NAME_SEPARATOR);
- legacy_string.push_str(&self.version);
- }
-
- if !self.group.is_empty() {
- legacy_string.push_str(SERVICE_NAME_SEPARATOR);
- legacy_string.push_str(&self.group);
- }
-
- legacy_string
- }
-
- #[allow(dead_code)]
- fn is_concrete(&self) -> bool {
- is_concrete_str(&self.service_interface)
- && is_concrete_str(&self.version)
- && is_concrete_str(&self.group)
- }
-
- #[allow(dead_code)]
- fn is_compatible(&self, other: &NacosServiceName) -> bool {
- if !other.is_concrete() {
- return false;
- }
-
- if !self.category.eq(&other.category) && !match_range(&self.category, &other.category) {
- return false;
- }
-
- if is_wildcard_str(&self.version) {
- return true;
- }
-
- if is_wildcard_str(&self.group) {
- return true;
- }
-
- if !&self.version.eq(&other.version) && !match_range(&self.version, &other.version) {
- return false;
- }
-
- if !self.group.eq(&other.group) && !match_range(&self.group, &other.group) {
- return false;
- }
-
- true
- }
-}
-
-struct NotifyListenerWrapper(Arc<dyn NotifyListener + Sync + Send + 'static>);
-
-impl std::hash::Hash for NotifyListenerWrapper {
- fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
- let ptr = self.0.as_ref();
- std::ptr::hash(ptr, state);
- }
-}
-
-impl PartialEq for NotifyListenerWrapper {
- fn eq(&self, other: &Self) -> bool {
- let self_ptr = self.0.as_ref() as *const dyn NotifyListener;
- let other_ptr = other.0.as_ref() as *const dyn NotifyListener;
-
- let (self_data_ptr, _): (*const u8, *const u8) = unsafe { std::mem::transmute(self_ptr) };
-
- let (other_data_ptr, _): (*const u8, *const u8) = unsafe { std::mem::transmute(other_ptr) };
- self_data_ptr == other_data_ptr
- }
-}
-
-impl Eq for NotifyListenerWrapper {}
-
-impl nacos_sdk::api::naming::NamingEventListener for NotifyListenerWrapper {
- fn event(&self, event: Arc<nacos_sdk::api::naming::NamingChangeEvent>) {
- let service_name = event.service_name.clone();
- let instances = event.instances.as_ref();
- let urls: Vec<Url>;
- if let Some(instances) = instances {
- urls = instances
- .iter()
- .filter_map(|data| {
- let url_str =
- format!("triple://{}:{}/{}", data.ip(), data.port(), service_name);
- Url::from_url(&url_str)
- })
- .collect();
- } else {
- urls = Vec::new();
- }
- let notify_event = ServiceEvent {
- key: service_name,
- action: String::from("CHANGE"),
- service: urls,
- };
- self.0.notify(notify_event);
+ fn value(&self) -> &str {
+ &self.value
}
}
@@ -538,14 +421,16 @@
use core::time;
use std::thread;
+ use tracing::error;
+ use dubbo_base::{extension_param::ExtensionName, registry_param::Side};
use tracing::metadata::LevelFilter;
use super::*;
- #[test]
+ #[tokio::test]
#[ignore]
- pub fn test_register_to_nacos() {
+ pub async fn test_register_to_nacos() {
tracing_subscriber::fmt()
.with_thread_names(true)
.with_file(true)
@@ -555,15 +440,19 @@
.with_max_level(LevelFilter::DEBUG)
.init();
- let nacos_registry_url = Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
- let mut registry = NacosRegistry::new(nacos_registry_url);
+ let mut extension_url: Url = "extension://0.0.0.0?extension-type=registry"
+ .parse()
+ .unwrap();
+ extension_url.add_query_param(ExtensionName::new("nacos".to_string()));
+ extension_url.add_query_param(RegistryUrl::new("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015".parse().unwrap()));
- let mut service_url = Url::from_url("tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
- service_url
- .params
- .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
+ let registry = NacosRegistry::create(extension_url).await.unwrap();
- let ret = registry.register(service_url);
+ let mut service_url: Url = "tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807".parse().unwrap();
+
+ service_url.add_query_param(Side::Provider);
+
+ let ret = registry.register(service_url).await;
info!("register result: {:?}", ret);
@@ -571,9 +460,9 @@
thread::sleep(sleep_millis);
}
- #[test]
+ #[tokio::test]
#[ignore]
- pub fn test_register_and_unregister() {
+ pub async fn test_register_and_unregister() {
tracing_subscriber::fmt()
.with_thread_names(true)
.with_file(true)
@@ -583,23 +472,27 @@
.with_max_level(LevelFilter::DEBUG)
.init();
- let nacos_registry_url = Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
- let mut registry = NacosRegistry::new(nacos_registry_url);
+ let mut extension_url: Url = "extension://0.0.0.0?extension-type=registry"
+ .parse()
+ .unwrap();
+ extension_url.add_query_param(ExtensionName::new("nacos".to_string()));
+ extension_url.add_query_param(RegistryUrl::new("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015".parse().unwrap()));
- let mut service_url = Url::from_url("tri://127.0.0.1:9090/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
- service_url
- .params
- .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
+ let registry = NacosRegistry::create(extension_url).await.unwrap();
- let ret = registry.register(service_url);
+ let mut service_url: Url = "tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807".parse().unwrap();
+
+ service_url.add_query_param(Side::Provider);
+
+ let ret = registry.register(service_url).await;
info!("register result: {:?}", ret);
let sleep_millis = time::Duration::from_secs(10);
thread::sleep(sleep_millis);
- let unregister_url = Url::from_url("tri://127.0.0.1:9090/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
- let ret = registry.unregister(unregister_url);
+ let unregister_url = "tri://127.0.0.1:9090/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807".parse().unwrap();
+ let ret = registry.unregister(unregister_url).await;
info!("deregister result: {:?}", ret);
@@ -607,20 +500,9 @@
thread::sleep(sleep_millis);
}
- struct TestNotifyListener;
- impl NotifyListener for TestNotifyListener {
- fn notify(&self, event: ServiceEvent) {
- info!("notified: {:?}", event.key);
- }
-
- fn notify_all(&self, event: ServiceEvent) {
- info!("notify_all: {:?}", event.key);
- }
- }
-
- #[test]
+ #[tokio::test]
#[ignore]
- fn test_subscribe() {
+ pub async fn test_subscribe() {
tracing_subscriber::fmt()
.with_thread_names(true)
.with_file(true)
@@ -630,34 +512,41 @@
.with_max_level(LevelFilter::DEBUG)
.init();
- let nacos_registry_url = Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
- let mut registry = NacosRegistry::new(nacos_registry_url);
+ let mut extension_url: Url = "extension://0.0.0.0?extension-type=registry"
+ .parse()
+ .unwrap();
+ extension_url.add_query_param(ExtensionName::new("nacos".to_string()));
+ extension_url.add_query_param(RegistryUrl::new("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015".parse().unwrap()));
- let mut service_url = Url::from_url("tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
- service_url
- .params
- .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
+ let registry = NacosRegistry::create(extension_url).await.unwrap();
- let ret = registry.register(service_url);
+ let mut service_url: Url = "tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807".parse().unwrap();
+
+ service_url.add_query_param(Side::Provider);
+
+ let ret = registry.register(service_url).await;
info!("register result: {:?}", ret);
- let subscribe_url = Url::from_url("provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap();
+ let subscribe_url = "consumer://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider".parse().unwrap();
+ let subscribe_ret = registry.subscribe(subscribe_url).await;
- let ret = registry.subscribe(subscribe_url, Arc::new(TestNotifyListener));
-
- if let Err(e) = ret {
+ if let Err(e) = subscribe_ret {
error!("error message: {:?}", e);
return;
}
+ let mut rx = subscribe_ret.unwrap();
+ let change = rx.recv().await;
+ info!("receive change: {:?}", change);
+
let sleep_millis = time::Duration::from_secs(300);
thread::sleep(sleep_millis);
}
- #[test]
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore]
- fn test_unsubscribe() {
+ pub async fn test_unsubscribe() {
tracing_subscriber::fmt()
.with_thread_names(true)
.with_file(true)
@@ -667,39 +556,39 @@
.with_max_level(LevelFilter::DEBUG)
.init();
- let nacos_registry_url = Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
- let mut registry = NacosRegistry::new(nacos_registry_url);
+ let mut extension_url: Url = "extension://0.0.0.0?extension-type=registry"
+ .parse()
+ .unwrap();
+ extension_url.add_query_param(ExtensionName::new("nacos".to_string()));
+ extension_url.add_query_param(RegistryUrl::new("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015".parse().unwrap()));
- let mut service_url = Url::from_url("tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
- service_url
- .params
- .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
+ let registry = NacosRegistry::create(extension_url).await.unwrap();
- let ret = registry.register(service_url);
+ let mut service_url: Url = "tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807".parse().unwrap();
+
+ service_url.add_query_param(Side::Provider);
+
+ let ret = registry.register(service_url).await;
info!("register result: {:?}", ret);
- let subscribe_url = Url::from_url("provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap();
+ let subscribe_url = "provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider".parse().unwrap();
- let listener = Arc::new(TestNotifyListener);
-
- let ret = registry.subscribe(subscribe_url, listener.clone());
+ let ret = registry.subscribe(subscribe_url).await;
if let Err(e) = ret {
error!("error message: {:?}", e);
return;
}
+ let mut rx = ret.unwrap();
+ let change = rx.recv().await;
+ info!("receive change: {:?}", change);
+
let sleep_millis = time::Duration::from_secs(40);
thread::sleep(sleep_millis);
- let unsubscribe_url = Url::from_url("provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap();
- let ret = registry.unsubscribe(unsubscribe_url, listener.clone());
-
- if let Err(e) = ret {
- error!("error message: {:?}", e);
- return;
- }
+ drop(rx);
let sleep_millis = time::Duration::from_secs(40);
thread::sleep(sleep_millis);
diff --git a/registry/nacos/src/utils/mod.rs b/registry/nacos/src/utils/mod.rs
deleted file mode 100644
index b247f60..0000000
--- a/registry/nacos/src/utils/mod.rs
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-use dubbo_base::Url;
-use nacos_sdk::api::props::ClientProps;
-
-const APP_NAME_KEY: &str = "AppName";
-
-const UNKNOWN_APP: &str = "UnknownApp";
-
-const NAMESPACE_KEY: &str = "namespace";
-
-const DEFAULT_NAMESPACE: &str = "public";
-
-const USERNAME_KEY: &str = "username";
-
-const PASSWORD_KEY: &str = "password";
-
-const BACKUP_KEY: &str = "backup";
-
-const WILDCARD: &str = "*";
-
-const RANGE_STR_SEPARATOR: &str = ",";
-
-pub(crate) fn build_nacos_client_props(url: &Url) -> (nacos_sdk::api::props::ClientProps, bool) {
- let host = &url.ip;
- let port = &url.port;
- let backup = url
- .get_param(BACKUP_KEY)
- .map(|mut data| {
- data.insert(0, ',');
- data
- })
- .unwrap_or_default();
- let server_addr = format!("{}:{}{}", host, port, backup);
-
- let namespace = url
- .get_param(NAMESPACE_KEY)
- .unwrap_or_else(|| DEFAULT_NAMESPACE.to_string());
- let app_name = url
- .get_param(APP_NAME_KEY)
- .unwrap_or_else(|| UNKNOWN_APP.to_string());
- let username = url.get_param(USERNAME_KEY).unwrap_or_default();
- let password = url.get_param(PASSWORD_KEY).unwrap_or_default();
-
- let enable_auth = !password.is_empty() && !username.is_empty();
-
- // todo ext parameters
-
- let mut client_props = ClientProps::new();
-
- client_props = client_props
- .server_addr(server_addr)
- .namespace(namespace)
- .app_name(app_name)
- .auth_username(username)
- .auth_password(password);
-
- (client_props, enable_auth)
-}
-
-pub(crate) fn is_wildcard_str(str: &str) -> bool {
- str.eq(WILDCARD)
-}
-
-pub(crate) fn is_range_str(str: &str) -> bool {
- let ret = str.split(RANGE_STR_SEPARATOR);
- let count = ret.count();
- count > 1
-}
-
-pub(crate) fn is_concrete_str(str: &str) -> bool {
- !is_wildcard_str(str) && !is_range_str(str)
-}
-
-pub(crate) fn match_range(range: &str, value: &str) -> bool {
- if range.is_empty() {
- return true;
- }
-
- if !is_range_str(range) {
- return false;
- }
-
- range
- .split(RANGE_STR_SEPARATOR)
- .any(|data| (*data).eq(value))
-}
diff --git a/registry/zookeeper/Cargo.toml b/registry/zookeeper/Cargo.toml
index 2df5499..ebcb269 100644
--- a/registry/zookeeper/Cargo.toml
+++ b/registry/zookeeper/Cargo.toml
@@ -9,10 +9,13 @@
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-zookeeper = "0.7.0"
+zookeeper = "0.8.0"
dubbo.workspace = true
+anyhow.workspace = true
serde_json.workspace = true
serde = { workspace = true, features = ["derive"] }
urlencoding.workspace = true
dubbo-logger.workspace = true
dubbo-base.workspace = true
+tokio.workspace = true
+async-trait.workspace = true
diff --git a/registry/zookeeper/src/lib.rs b/registry/zookeeper/src/lib.rs
index 6a6e94b..c9683bf 100644
--- a/registry/zookeeper/src/lib.rs
+++ b/registry/zookeeper/src/lib.rs
@@ -17,29 +17,20 @@
#![allow(unused_variables, dead_code, missing_docs)]
-use std::{
- collections::{HashMap, HashSet},
- env,
- sync::{Arc, Mutex, RwLock},
- time::Duration,
-};
+use std::{collections::HashMap, env, sync::Arc, time::Duration};
+use async_trait::async_trait;
use dubbo_base::{
constants::{DUBBO_KEY, LOCALHOST_IP, PROVIDERS_KEY},
- Url,
+ StdError, Url,
};
use dubbo_logger::tracing::{debug, error, info};
use serde::{Deserialize, Serialize};
-#[allow(unused_imports)]
-use zookeeper::{Acl, CreateMode, WatchedEvent, WatchedEventType, Watcher, ZkError, ZooKeeper};
+use tokio::{select, sync::mpsc};
+use zookeeper::{Acl, CreateMode, WatchedEvent, WatchedEventType, Watcher, ZooKeeper};
-use dubbo::{
- registry::{
- memory_registry::MemoryRegistry, NotifyListener, Registry, RegistryNotifyListener,
- ServiceEvent,
- },
- StdError,
-};
+use dubbo::extension::registry_extension::{DiscoverStream, Registry, ServiceChange};
+use dubbo_base::{registry_param::InterfaceName, url::UrlParam};
// Get metadata of a service registration from a URL
// rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s)
@@ -54,12 +45,9 @@
}
}
-//#[derive(Debug)]
pub struct ZookeeperRegistry {
root_path: String,
zk_client: Arc<ZooKeeper>,
- listeners: RwLock<HashMap<String, RegistryNotifyListener>>,
- memory_registry: Arc<Mutex<MemoryRegistry>>,
}
#[derive(Serialize, Deserialize, Debug)]
@@ -91,24 +79,6 @@
ZookeeperRegistry {
root_path: "/services".to_string(),
zk_client: Arc::new(zk_client),
- listeners: RwLock::new(HashMap::new()),
- memory_registry: Arc::new(Mutex::new(MemoryRegistry::default())),
- }
- }
-
- fn create_listener(
- &self,
- path: String,
- service_name: String,
- listener: RegistryNotifyListener,
- ) -> ServiceInstancesChangedListener {
- let mut service_names = HashSet::new();
- service_names.insert(service_name.clone());
- ServiceInstancesChangedListener {
- zk_client: Arc::clone(&self.zk_client),
- path,
- service_name: service_name.clone(),
- listener,
}
}
@@ -127,10 +97,6 @@
s.to_string()
}
- pub fn get_client(&self) -> Arc<ZooKeeper> {
- self.zk_client.clone()
- }
-
// If the parent node does not exist in the ZooKeeper, Err(ZkError::NoNode) will be returned.
pub fn create_path(
&self,
@@ -153,8 +119,8 @@
match zk_result {
Ok(_) => Ok(()),
Err(err) => {
- error!("create path {} to zookeeper error {}", path, err);
- Err(Box::try_from(err).unwrap())
+ error!("zk path {} parent not exists.", path);
+ Err(err.into())
}
}
}
@@ -176,28 +142,12 @@
current.push('/');
current.push_str(node_key);
if !self.exists_path(current.as_str()) {
- let new_create_mode = match children == node_key {
- true => create_mode,
- false => CreateMode::Persistent,
- };
- let new_data = match children == node_key {
- true => data,
- false => "",
+ let (new_create_mode, new_data) = match children == node_key {
+ true => (create_mode, data),
+ false => (CreateMode::Persistent, ""),
};
- //Skip ZkError::NodeExists
- let res = self.create_path(current.as_str(), new_data, new_create_mode);
- let mut node_exist = false;
- if let Err(e) = &res {
- if let Some(zk_err) = e.downcast_ref::<ZkError>() {
- if ZkError::NodeExists == *zk_err {
- node_exist = true;
- }
- }
- }
- if !node_exist {
- return res;
- }
+ self.create_path(current.as_str(), new_data, new_create_mode)?;
}
}
Ok(())
@@ -205,7 +155,7 @@
pub fn delete_path(&self, path: &str) {
if self.exists_path(path) {
- self.get_client().delete(path, None).unwrap()
+ self.zk_client.delete(path, None).unwrap()
}
}
@@ -225,6 +175,65 @@
None
}
}
+
+ pub fn diff<'a>(
+ old_urls: &'a Vec<String>,
+ new_urls: &'a Vec<String>,
+ ) -> (Vec<String>, Vec<String>) {
+ let old_urls_map: HashMap<String, String> = old_urls
+ .iter()
+ .map(|url| url.parse())
+ .filter(|item| item.is_ok())
+ .map(|item| item.unwrap())
+ .map(|item: Url| {
+ let ip_port = item.authority().to_owned();
+ let url = item.as_str().to_owned();
+ (ip_port, url)
+ })
+ .collect();
+
+ let new_urls_map: HashMap<String, String> = new_urls
+ .iter()
+ .map(|url| url.parse())
+ .filter(|item| item.is_ok())
+ .map(|item| item.unwrap())
+ .map(|item: Url| {
+ let ip_port = item.authority().to_owned();
+ let url = item.as_str().to_owned();
+ (ip_port, url)
+ })
+ .collect();
+
+ let mut add_hosts = Vec::new();
+ let mut removed_hosts = Vec::new();
+
+ for (key, new_host) in new_urls_map.iter() {
+ let old_host = old_urls_map.get(key);
+ match old_host {
+ None => {
+ add_hosts.push(new_host.clone());
+ }
+ Some(old_host) => {
+ if !old_host.eq(new_host) {
+ removed_hosts.push(old_host.clone());
+ add_hosts.push(new_host.clone());
+ }
+ }
+ }
+ }
+
+ for (key, old_host) in old_urls_map.iter() {
+ let new_host = old_urls_map.get(key);
+ match new_host {
+ None => {
+ removed_hosts.push(old_host.clone());
+ }
+ Some(_) => {}
+ }
+ }
+
+ (removed_hosts, add_hosts)
+ }
}
impl Default for ZookeeperRegistry {
@@ -248,221 +257,188 @@
}
}
+#[async_trait]
impl Registry for ZookeeperRegistry {
- fn register(&mut self, url: Url) -> Result<(), StdError> {
+ async fn register(&self, url: Url) -> Result<(), StdError> {
debug!("register url: {}", url);
+ let interface_name = url.query::<InterfaceName>().unwrap().value();
+ let url_str = url.as_str();
let zk_path = format!(
"/{}/{}/{}/{}",
- DUBBO_KEY,
- url.service_name,
- PROVIDERS_KEY,
- url.encoded_raw_url_string()
+ DUBBO_KEY, interface_name, PROVIDERS_KEY, url_str
);
self.create_path_with_parent_check(zk_path.as_str(), LOCALHOST_IP, CreateMode::Ephemeral)?;
Ok(())
}
- fn unregister(&mut self, url: Url) -> Result<(), StdError> {
+ async fn unregister(&self, url: Url) -> Result<(), StdError> {
+ let interface_name = url.query::<InterfaceName>().unwrap().value();
+ let url_str = url.as_str();
+
let zk_path = format!(
"/{}/{}/{}/{}",
- DUBBO_KEY,
- url.service_name,
- PROVIDERS_KEY,
- url.encoded_raw_url_string()
+ DUBBO_KEY, interface_name, PROVIDERS_KEY, url_str
);
self.delete_path(zk_path.as_str());
Ok(())
}
// for consumer to find the changes of providers
- fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), StdError> {
- let service_name = url.get_service_name();
- let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &service_name, PROVIDERS_KEY);
- if self
- .listeners
- .read()
- .unwrap()
- .get(service_name.as_str())
- .is_some()
- {
- return Ok(());
- }
+ async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
+ let interface_name = url.query::<InterfaceName>().unwrap().value();
- self.listeners
- .write()
- .unwrap()
- .insert(service_name.to_string(), listener.clone());
+ let zk_path = format!("/{}/{}/{}", DUBBO_KEY, interface_name, PROVIDERS_KEY);
- let zk_listener =
- self.create_listener(zk_path.clone(), service_name.to_string(), listener.clone());
+ debug!("subscribe service: {}", zk_path);
- let zk_changed_paths = self.zk_client.get_children_w(&zk_path, zk_listener);
- let result = match zk_changed_paths {
- Err(err) => {
- error!("zk subscribe error: {}", err);
- Vec::new()
+ let (listener, mut change_rx) = ZooKeeperListener::new();
+ let arc_listener = Arc::new(listener);
+
+ let watcher = ZooKeeperWatcher::new(arc_listener.clone(), zk_path.clone());
+
+ let (discover_tx, discover_rx) = mpsc::channel(64);
+
+ let zk_client_in_task = self.zk_client.clone();
+ let zk_path_in_task = zk_path.clone();
+ let interface_name_in_task = interface_name.clone();
+ let arc_listener_in_task = arc_listener.clone();
+ tokio::spawn(async move {
+ let zk_client = zk_client_in_task;
+ let zk_path = zk_path_in_task;
+ let interface_name = interface_name_in_task;
+ let listener = arc_listener_in_task;
+
+ let mut current_urls = Vec::new();
+
+ loop {
+ let changed = select! {
+ _ = discover_tx.closed() => {
+ info!("discover task quit, discover channel closed");
+ None
+ },
+ changed = change_rx.recv() => {
+ changed
+ }
+ };
+
+ match changed {
+ Some(_) => {
+ let zookeeper_watcher =
+ ZooKeeperWatcher::new(listener.clone(), zk_path.clone());
+
+ match zk_client.get_children_w(&zk_path, zookeeper_watcher) {
+ Ok(children) => {
+ let (removed, add) =
+ ZookeeperRegistry::diff(¤t_urls, &children);
+
+ for url in removed {
+ match discover_tx
+ .send(Ok(ServiceChange::Remove(url.clone())))
+ .await
+ {
+ Ok(_) => {}
+ Err(e) => {
+ error!("send service change failed: {:?}, maybe user unsubscribe", e);
+ break;
+ }
+ }
+ }
+
+ for url in add {
+ match discover_tx
+ .send(Ok(ServiceChange::Insert(url.clone(), ())))
+ .await
+ {
+ Ok(_) => {}
+ Err(e) => {
+ error!("send service change failed: {:?}, maybe user unsubscribe", e);
+ break;
+ }
+ }
+ }
+
+ current_urls = children;
+ }
+ Err(err) => {
+ error!("zk subscribe error: {}", err);
+ break;
+ }
+ }
+ }
+ None => {
+ error!("receive service change task quit, unsubscribe {}.", zk_path);
+ break;
+ }
+ }
}
- Ok(urls) => urls
- .iter()
- .map(|node_key| {
- let provider_url: Url = urlencoding::decode(node_key)
- .unwrap()
- .to_string()
- .as_str()
- .into();
- provider_url
- })
- .collect(),
- };
- info!("notifying {}->{:?}", service_name, result);
- listener.notify(ServiceEvent {
- key: service_name,
- action: String::from("ADD"),
- service: result,
+
+ debug!("unsubscribe service: {}", zk_path);
});
+
+ arc_listener.changed(zk_path);
+
+ Ok(discover_rx)
+ }
+
+ async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
+ let interface_name = url.query::<InterfaceName>().unwrap().value();
+
+ let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &interface_name, PROVIDERS_KEY);
+
+ info!("unsubscribe service: {}", zk_path);
Ok(())
}
- fn unsubscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), StdError> {
+ fn url(&self) -> &Url {
todo!()
}
}
-pub struct ServiceInstancesChangedListener {
- zk_client: Arc<ZooKeeper>,
+pub struct ZooKeeperListener {
+ tx: mpsc::Sender<String>,
+}
+
+impl ZooKeeperListener {
+ pub fn new() -> (ZooKeeperListener, mpsc::Receiver<String>) {
+ let (tx, rx) = mpsc::channel(64);
+ let this = ZooKeeperListener { tx };
+ (this, rx)
+ }
+
+ pub fn changed(&self, path: String) {
+ match self.tx.try_send(path) {
+ Ok(_) => {}
+ Err(err) => {
+ error!("send change list to listener occur an error: {}", err);
+ return;
+ }
+ }
+ }
+}
+
+pub struct ZooKeeperWatcher {
+ listener: Arc<ZooKeeperListener>,
path: String,
- service_name: String,
- listener: RegistryNotifyListener,
}
-impl Watcher for ServiceInstancesChangedListener {
+impl ZooKeeperWatcher {
+ pub fn new(listener: Arc<ZooKeeperListener>, path: String) -> ZooKeeperWatcher {
+ ZooKeeperWatcher { listener, path }
+ }
+}
+
+impl Watcher for ZooKeeperWatcher {
fn handle(&self, event: WatchedEvent) {
- if let (WatchedEventType::NodeChildrenChanged, Some(path)) = (event.event_type, event.path)
- {
- let event_path = path.clone();
- let dirs = self
- .zk_client
- .get_children(&event_path, false)
- .expect("msg");
- let result: Vec<Url> = dirs
- .iter()
- .map(|node_key| {
- let provider_url: Url = node_key.as_str().into();
- provider_url
- })
- .collect();
- let res = self.zk_client.get_children_w(
- &path,
- ServiceInstancesChangedListener {
- zk_client: Arc::clone(&self.zk_client),
- path: path.clone(),
- service_name: self.service_name.clone(),
- listener: Arc::clone(&self.listener),
- },
- );
-
- info!("notify {}->{:?}", self.service_name, result);
- self.listener.notify(ServiceEvent {
- key: self.service_name.clone(),
- action: String::from("ADD"),
- service: result,
- });
+ info!("receive zookeeper event: {:?}", event);
+ let event_type: WatchedEventType = event.event_type;
+ match event_type {
+ WatchedEventType::None => {
+ info!("event type is none, ignore it.");
+ return;
+ }
+ _ => {}
}
- }
-}
-impl NotifyListener for ServiceInstancesChangedListener {
- fn notify(&self, event: ServiceEvent) {
- self.listener.notify(event);
- }
-
- fn notify_all(&self, event: ServiceEvent) {
- self.listener.notify(event);
- }
-}
-
-#[cfg(test)]
-mod tests {
- use std::sync::Arc;
-
- use zookeeper::{Acl, CreateMode, WatchedEvent, Watcher};
-
- use crate::ZookeeperRegistry;
-
- struct TestZkWatcher {
- pub watcher: Arc<Option<TestZkWatcher>>,
- }
-
- impl Watcher for TestZkWatcher {
- fn handle(&self, event: WatchedEvent) {
- println!("event: {:?}", event);
- }
- }
-
- #[test]
- fn zk_read_write_watcher() {
- // https://github.com/bonifaido/rust-zookeeper/blob/master/examples/zookeeper_example.rs
- // using ENV to set zookeeper server urls
- let zkr = ZookeeperRegistry::default();
- let zk_client = zkr.get_client();
- let watcher = TestZkWatcher {
- watcher: Arc::new(None),
- };
- if zk_client.exists("/test", true).is_err() {
- zk_client
- .create(
- "/test",
- vec![1, 3],
- Acl::open_unsafe().clone(),
- CreateMode::Ephemeral,
- )
- .unwrap();
- }
- let zk_res = zk_client.create(
- "/test",
- "hello".into(),
- Acl::open_unsafe().clone(),
- CreateMode::Ephemeral,
- );
- let result = zk_client.get_children_w("/test", watcher);
- assert!(result.is_ok());
- if zk_client.exists("/test/a", true).is_err() {
- zk_client.delete("/test/a", None).unwrap();
- }
- if zk_client.exists("/test/a", true).is_err() {
- zk_client.delete("/test/b", None).unwrap();
- }
- let zk_res = zk_client.create(
- "/test/a",
- "hello".into(),
- Acl::open_unsafe().clone(),
- CreateMode::Ephemeral,
- );
- let zk_res = zk_client.create(
- "/test/b",
- "world".into(),
- Acl::open_unsafe().clone(),
- CreateMode::Ephemeral,
- );
- let test_a_result = zk_client.get_data("/test", true);
- assert!(test_a_result.is_ok());
- let vec1 = test_a_result.unwrap().0;
- // data in /test should equals to "hello"
- assert_eq!(String::from_utf8(vec1).unwrap(), "hello");
- zk_client.close().unwrap()
- }
-
- #[test]
- fn create_path_with_parent_check() {
- let zkr = ZookeeperRegistry::default();
- let path = "/du1bbo/test11111";
- let data = "hello";
- // creating a child on a not exists parent, throw a NoNode error.
- // let result = zkr.create_path(path, data, CreateMode::Ephemeral);
- // assert!(result.is_err());
- let create_with_parent_check_result =
- zkr.create_path_with_parent_check(path, data, CreateMode::Ephemeral);
- assert!(create_with_parent_check_result.is_ok());
- assert_eq!(data, zkr.get_data(path, false).unwrap());
+ self.listener.changed(self.path.clone());
}
}
diff --git a/remoting/base/src/exchange/client.rs b/remoting/base/src/exchange/client.rs
index edbb4a6..71c42d4 100644
--- a/remoting/base/src/exchange/client.rs
+++ b/remoting/base/src/exchange/client.rs
@@ -51,7 +51,7 @@
pub fn new(url: Url, client: BoxedClient, connection_timeout: Duration) -> Self {
ExchangeClient {
connection_timeout,
- address: url.get_ip_port(),
+ address: url.authority().to_owned(),
client: None,
init: AtomicBool::new(false),
active: AtomicI32::new(0),