Ftr: add extension module (#181)
* Ftr: add extension module
- adapt static registry by extension
- adapt nacos registry by extension
link #180
* cargo fmt all
* fix ci error
* fix nacos image version error
* Rft: re-design extension register
* Fix: cargo fix
* Fix: add some license for every files
- extract UrlParam to single file
- fix github ci error
* Fix: fmt all
* Fix: Add license for extension_param.rs and registry_param.rs
* Fix: rename query_param_by_kv method name
diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml
index 878a5e1..8f5e9aa 100644
--- a/.github/workflows/github-actions.yml
+++ b/.github/workflows/github-actions.yml
@@ -60,6 +60,13 @@
- 2181:2181
env:
ZOO_MY_ID: 1
+ nacos:
+ image: nacos/nacos-server:v2.3.1
+ ports:
+ - 8848:8848
+ - 9848:9848
+ env:
+ MODE: standalone
steps:
- uses: actions/checkout@main
- uses: actions-rs/toolchain@v1
diff --git a/Cargo.toml b/Cargo.toml
index 0abff00..50d46f0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -58,6 +58,7 @@
once_cell = "1.16.0"
itertools = "0.10.1"
bytes = "1.0"
+url = "2.5.0"
diff --git a/common/base/Cargo.toml b/common/base/Cargo.toml
index 40579e0..678af77 100644
--- a/common/base/Cargo.toml
+++ b/common/base/Cargo.toml
@@ -6,6 +6,5 @@
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-urlencoding.workspace = true
-http = "0.2"
-dubbo-logger.workspace = true
\ No newline at end of file
+dubbo-logger.workspace = true
+url.workspace = true
\ No newline at end of file
diff --git a/common/base/src/extension_param.rs b/common/base/src/extension_param.rs
new file mode 100644
index 0000000..93e0a16
--- /dev/null
+++ b/common/base/src/extension_param.rs
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use crate::{url::UrlParam, StdError};
+use std::{borrow::Cow, convert::Infallible, str::FromStr};
+
+pub struct ExtensionName(String);
+
+impl ExtensionName {
+ pub fn new(name: String) -> Self {
+ ExtensionName(name)
+ }
+}
+
+impl UrlParam for ExtensionName {
+ type TargetType = String;
+
+ fn name() -> &'static str {
+ "extension-name"
+ }
+
+ fn value(&self) -> Self::TargetType {
+ self.0.clone()
+ }
+
+ fn as_str(&self) -> Cow<str> {
+ self.0.as_str().into()
+ }
+}
+
+impl FromStr for ExtensionName {
+ type Err = StdError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok(ExtensionName::new(s.to_string()))
+ }
+}
+
+pub enum ExtensionType {
+ Registry,
+}
+
+impl UrlParam for ExtensionType {
+ type TargetType = String;
+
+ fn name() -> &'static str {
+ "extension-type"
+ }
+
+ fn value(&self) -> Self::TargetType {
+ match self {
+ ExtensionType::Registry => "registry".to_owned(),
+ }
+ }
+
+ fn as_str(&self) -> Cow<str> {
+ match self {
+ ExtensionType::Registry => Cow::Borrowed("registry"),
+ }
+ }
+}
+
+impl FromStr for ExtensionType {
+ type Err = Infallible;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s {
+ "registry" => Ok(ExtensionType::Registry),
+ _ => panic!("the extension type enum is not in range"),
+ }
+ }
+}
diff --git a/common/base/src/lib.rs b/common/base/src/lib.rs
index b97b342..dcc9256 100644
--- a/common/base/src/lib.rs
+++ b/common/base/src/lib.rs
@@ -19,8 +19,12 @@
allow(dead_code, unused_imports, unused_variables, unused_mut)
)]
pub mod constants;
+pub mod extension_param;
pub mod node;
+pub mod registry_param;
pub mod url;
pub use node::Node;
pub use url::Url;
+
+pub type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
diff --git a/common/base/src/registry_param.rs b/common/base/src/registry_param.rs
new file mode 100644
index 0000000..8aa7c07
--- /dev/null
+++ b/common/base/src/registry_param.rs
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use crate::{url::UrlParam, StdError, Url};
+use std::{borrow::Cow, convert::Infallible, str::FromStr};
+
+pub struct RegistryUrl(Url);
+
+impl RegistryUrl {
+ pub fn new(url: Url) -> Self {
+ Self(url)
+ }
+}
+
+impl UrlParam for RegistryUrl {
+ type TargetType = Url;
+
+ fn name() -> &'static str {
+ "registry"
+ }
+
+ fn value(&self) -> Self::TargetType {
+ self.0.clone()
+ }
+
+ fn as_str(&self) -> Cow<str> {
+ self.0.as_str().into()
+ }
+}
+
+impl FromStr for RegistryUrl {
+ type Err = StdError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok(Self(s.parse()?))
+ }
+}
+
+pub struct ServiceNamespace(String);
+
+impl ServiceNamespace {
+ pub fn new(namespace: String) -> Self {
+ Self(namespace)
+ }
+}
+
+impl UrlParam for ServiceNamespace {
+ type TargetType = String;
+
+ fn name() -> &'static str {
+ "namespace"
+ }
+
+ fn value(&self) -> Self::TargetType {
+ self.0.clone()
+ }
+
+ fn as_str(&self) -> Cow<str> {
+ self.0.as_str().into()
+ }
+}
+
+impl FromStr for ServiceNamespace {
+ type Err = StdError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok(Self(s.to_string()))
+ }
+}
+
+impl Default for ServiceNamespace {
+ fn default() -> Self {
+ Self("public".to_string())
+ }
+}
+
+pub struct AppName(String);
+
+impl AppName {
+ pub fn new(app_name: String) -> Self {
+ Self(app_name)
+ }
+}
+
+impl UrlParam for AppName {
+ type TargetType = String;
+
+ fn name() -> &'static str {
+ "app_name"
+ }
+
+ fn value(&self) -> Self::TargetType {
+ self.0.clone()
+ }
+
+ fn as_str(&self) -> Cow<str> {
+ self.0.as_str().into()
+ }
+}
+
+impl FromStr for AppName {
+ type Err = StdError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok(Self(s.to_string()))
+ }
+}
+
+impl Default for AppName {
+ fn default() -> Self {
+ Self("UnknownApp".to_string())
+ }
+}
+
+pub struct InterfaceName(String);
+
+impl InterfaceName {
+ pub fn new(interface_name: String) -> Self {
+ Self(interface_name)
+ }
+}
+
+impl UrlParam for InterfaceName {
+ type TargetType = String;
+
+ fn name() -> &'static str {
+ "interface"
+ }
+
+ fn value(&self) -> Self::TargetType {
+ self.0.clone()
+ }
+
+ fn as_str(&self) -> Cow<str> {
+ self.0.as_str().into()
+ }
+}
+
+impl FromStr for InterfaceName {
+ type Err = StdError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok(Self(s.to_string()))
+ }
+}
+
+impl Default for InterfaceName {
+ fn default() -> Self {
+ Self("".to_string())
+ }
+}
+
+pub struct Category(String);
+
+impl Category {
+ pub fn new(category: String) -> Self {
+ Self(category)
+ }
+}
+
+impl UrlParam for Category {
+ type TargetType = String;
+
+ fn name() -> &'static str {
+ "category"
+ }
+
+ fn value(&self) -> Self::TargetType {
+ self.0.clone()
+ }
+
+ fn as_str(&self) -> Cow<str> {
+ self.0.as_str().into()
+ }
+}
+
+impl FromStr for Category {
+ type Err = StdError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok(Self(s.to_string()))
+ }
+}
+
+impl Default for Category {
+ fn default() -> Self {
+ Self("".to_string())
+ }
+}
+
+pub struct Version(String);
+
+impl Version {
+ pub fn new(version: String) -> Self {
+ Self(version)
+ }
+}
+
+impl UrlParam for Version {
+ type TargetType = String;
+
+ fn name() -> &'static str {
+ "version"
+ }
+
+ fn value(&self) -> Self::TargetType {
+ self.0.clone()
+ }
+
+ fn as_str(&self) -> Cow<str> {
+ self.0.as_str().into()
+ }
+}
+
+impl FromStr for Version {
+ type Err = StdError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok(Self(s.to_string()))
+ }
+}
+
+impl Default for Version {
+ fn default() -> Self {
+ Self("".to_string())
+ }
+}
+
+pub struct Group(String);
+
+impl Group {
+ pub fn new(group: String) -> Self {
+ Self(group)
+ }
+}
+
+impl UrlParam for Group {
+ type TargetType = String;
+
+ fn name() -> &'static str {
+ "group"
+ }
+
+ fn value(&self) -> Self::TargetType {
+ self.0.clone()
+ }
+
+ fn as_str(&self) -> Cow<str> {
+ self.0.as_str().into()
+ }
+}
+
+impl FromStr for Group {
+ type Err = StdError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok(Self(s.to_string()))
+ }
+}
+
+impl Default for Group {
+ fn default() -> Self {
+ Self("".to_string())
+ }
+}
+
+pub enum Side {
+ Provider,
+ Consumer,
+}
+
+impl UrlParam for Side {
+ type TargetType = String;
+
+ fn name() -> &'static str {
+ "side"
+ }
+
+ fn value(&self) -> Self::TargetType {
+ match self {
+ Side::Consumer => "consumer".to_owned(),
+ Side::Provider => "provider".to_owned(),
+ }
+ }
+
+ fn as_str(&self) -> Cow<str> {
+ match self {
+ Side::Consumer => Cow::Borrowed("consumer"),
+ Side::Provider => Cow::Borrowed("provider"),
+ }
+ }
+}
+
+impl FromStr for Side {
+ type Err = Infallible;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s.to_lowercase().as_str() {
+ "consumer" => Ok(Side::Consumer),
+ "provider" => Ok(Side::Provider),
+ _ => Ok(Side::Consumer),
+ }
+ }
+}
+
+impl Default for Side {
+ fn default() -> Self {
+ Side::Consumer
+ }
+}
+
+pub struct StaticInvokerUrls(String);
+
+impl UrlParam for StaticInvokerUrls {
+ type TargetType = Vec<Url>;
+
+ fn name() -> &'static str {
+ "static-invoker-urls"
+ }
+
+ fn value(&self) -> Self::TargetType {
+ self.0.split(",").map(|url| url.parse().unwrap()).collect()
+ }
+
+ fn as_str(&self) -> Cow<str> {
+ Cow::Borrowed(&self.0)
+ }
+}
+
+impl FromStr for StaticInvokerUrls {
+ type Err = Infallible;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok(Self(s.to_string()))
+ }
+}
+
+impl Default for StaticInvokerUrls {
+ fn default() -> Self {
+ Self(String::default())
+ }
+}
diff --git a/common/base/src/url.rs b/common/base/src/url.rs
index 82b026f..ac97f26 100644
--- a/common/base/src/url.rs
+++ b/common/base/src/url.rs
@@ -16,237 +16,162 @@
*/
use std::{
+ borrow::Cow,
collections::HashMap,
- fmt::{Display, Formatter},
+ fmt::{Debug, Display, Formatter},
+ str::FromStr,
};
-use crate::constants::{GROUP_KEY, INTERFACE_KEY, VERSION_KEY};
-use http::Uri;
-
-#[derive(Debug, Clone, Default, PartialEq)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Url {
- pub raw_url_string: String,
- // value of scheme is different to base name, eg. triple -> tri://
- pub scheme: String,
- pub location: String,
- pub ip: String,
- pub port: String,
- // serviceKey format in dubbo java and go '{group}/{interfaceName}:{version}'
- pub service_key: String,
- // same to interfaceName
- pub service_name: String,
- pub params: HashMap<String, String>,
+ inner: url::Url,
}
impl Url {
- pub fn new() -> Self {
- Default::default()
+ pub fn empty() -> Self {
+ "empty://localhost".parse().unwrap()
}
- pub fn from_url(url: &str) -> Option<Self> {
- // url: triple://127.0.0.1:8888/helloworld.Greeter
- let uri = url
- .parse::<http::Uri>()
- .map_err(|err| {
- dubbo_logger::tracing::error!("fail to parse url({}), err: {:?}", url, err);
- })
- .unwrap();
- let query = uri.path_and_query().unwrap().query();
- let mut url_inst = Self {
- raw_url_string: url.to_string(),
- scheme: uri.scheme_str()?.to_string(),
- ip: uri.authority()?.host().to_string(),
- port: uri.authority()?.port()?.to_string(),
- location: uri.authority()?.to_string(),
- service_key: uri.path().trim_start_matches('/').to_string(),
- service_name: uri.path().trim_start_matches('/').to_string(),
- params: if let Some(..) = query {
- Url::decode(query.unwrap())
- } else {
- HashMap::new()
- },
- };
- url_inst.renew_raw_url_string();
- Some(url_inst)
+ pub fn protocol(&self) -> &str {
+ self.inner.scheme()
}
- pub fn get_service_key(&self) -> String {
- self.service_key.clone()
+ pub fn host(&self) -> Option<&str> {
+ self.inner.host_str()
}
- pub fn get_service_name(&self) -> String {
- self.service_name.clone()
+ pub fn authority(&self) -> &str {
+ self.inner.authority()
}
- pub fn get_param(&self, key: &str) -> Option<String> {
- self.params.get(key).cloned()
+ pub fn username(&self) -> &str {
+ self.inner.username()
}
- fn encode_param(&self) -> String {
- let mut params_vec: Vec<String> = Vec::new();
- for (k, v) in self.params.iter() {
- // let tmp = format!("{}={}", k, v);
- params_vec.push(format!("{}={}", k, v));
- }
- if params_vec.is_empty() {
- "".to_string()
- } else {
- format!("?{}", params_vec.join("&"))
- }
+ pub fn password(&self) -> Option<&str> {
+ self.inner.password()
}
- pub fn params_count(&self) -> usize {
- self.params.len()
+ pub fn port(&self) -> Option<u16> {
+ self.inner.port_or_known_default()
}
- fn decode(raw_query_string: &str) -> HashMap<String, String> {
- let mut params = HashMap::new();
- let p: Vec<String> = raw_query_string
- .split('&')
- .map(|v| v.trim().to_string())
- .collect();
- for v in p.iter() {
- let values: Vec<String> = v.split('=').map(|v| v.trim().to_string()).collect();
- if values.len() != 2 {
- continue;
- }
- params.insert(values[0].clone(), values[1].clone());
- }
- params
+ pub fn path(&self) -> &str {
+ self.inner.path()
}
- pub fn set_param(&mut self, key: &str, value: &str) {
- self.params.insert(key.to_string(), value.to_string());
- self.renew_raw_url_string();
+ pub fn query<T: UrlParam>(&self) -> Option<T> {
+ self.inner
+ .query_pairs()
+ .find(|(k, _)| k == T::name())
+ .map(|(_, v)| T::from_str(&v).ok())
+ .flatten()
}
- pub fn raw_url_string(&self) -> String {
- self.raw_url_string.clone()
+ pub fn query_param_by_key(&self, key: &str) -> Option<String> {
+ self.inner
+ .query_pairs()
+ .find(|(k, _)| k == key)
+ .map(|(_, v)| v.into_owned())
}
- pub fn encoded_raw_url_string(&self) -> String {
- urlencoding::encode(self.raw_url_string.as_str()).to_string()
+ pub fn all_query_params(&self) -> HashMap<String, String> {
+ self.inner
+ .query_pairs()
+ .map(|(k, v)| (k.into_owned(), v.into_owned()))
+ .collect()
}
- fn build_service_key(&self) -> String {
- format!(
- "{group}/{interfaceName}:{version}",
- group = self.get_param(GROUP_KEY).unwrap_or("default".to_string()),
- interfaceName = self.get_param(INTERFACE_KEY).unwrap_or("error".to_string()),
- version = self.get_param(VERSION_KEY).unwrap_or("1.0.0".to_string())
- )
+ pub fn set_protocol(&mut self, protocol: &str) {
+ let _ = self.inner.set_scheme(protocol);
}
- pub fn to_url(&self) -> String {
- self.raw_url_string()
+ pub fn set_host(&mut self, host: &str) {
+ let _ = self.inner.set_host(Some(host));
}
- fn renew_raw_url_string(&mut self) {
- self.raw_url_string = format!(
- "{}://{}:{}/{}{}",
- self.scheme,
- self.ip,
- self.port,
- self.service_name,
- self.encode_param()
- );
- self.service_key = self.build_service_key()
+ pub fn set_port(&mut self, port: u16) {
+ let _ = self.inner.set_port(Some(port));
}
- // short_url is used for tcp listening
- pub fn short_url(&self) -> String {
- format!(
- "{}://{}:{}/{}",
- self.scheme, self.ip, self.port, self.service_name
- )
+ pub fn set_username(&mut self, username: &str) {
+ let _ = self.inner.set_username(username);
}
- pub fn protocol(&self) -> String {
- self.scheme.clone()
+ pub fn set_password(&mut self, password: &str) {
+ let _ = self.inner.set_password(Some(password));
}
- pub fn get_ip_port(&self) -> String {
- format!("{}:{}", self.ip, self.port)
+ pub fn set_path(&mut self, path: &str) {
+ let _ = self.inner.set_path(path);
+ }
+
+ pub fn extend_pairs(&mut self, pairs: impl Iterator<Item = (String, String)>) {
+ let mut query_pairs = self.inner.query_pairs_mut();
+ query_pairs.extend_pairs(pairs);
+ }
+
+ pub fn add_query_param<T: UrlParam>(&mut self, param: T) {
+ let mut pairs = self.inner.query_pairs_mut();
+ pairs.append_pair(T::name(), ¶m.as_str());
+ }
+
+ pub fn remove_query_param<T: UrlParam>(&mut self) {
+ let query = self.inner.query_pairs().filter(|(k, v)| k.ne(T::name()));
+ let mut inner_url = self.inner.clone();
+ inner_url.query_pairs_mut().clear().extend_pairs(query);
+ self.inner = inner_url;
+ }
+
+ pub fn remove_all_param(&mut self) {
+ self.inner.query_pairs_mut().clear();
+ }
+
+ pub fn as_str(&self) -> &str {
+ self.inner.as_str()
+ }
+
+ pub fn short_url_without_query(&self) -> String {
+ let mut url = self.inner.clone();
+ url.set_query(Some(""));
+ url.into()
+ }
+}
+
+impl FromStr for Url {
+ type Err = url::ParseError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok(Url {
+ inner: url::Url::parse(s)?,
+ })
}
}
impl Display for Url {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- f.write_str(self.raw_url_string().as_str())
+ std::fmt::Display::fmt(&self.inner, f)
}
}
-impl Into<Uri> for Url {
- fn into(self) -> Uri {
- self.raw_url_string.parse::<Uri>().unwrap()
+impl Debug for Url {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ std::fmt::Debug::fmt(&self.inner, f)
}
}
-impl From<&str> for Url {
- fn from(url: &str) -> Self {
- Url::from_url(url).unwrap()
+impl From<Url> for String {
+ fn from(url: Url) -> Self {
+ url.inner.into()
}
}
-#[cfg(test)]
-mod tests {
- use crate::{
- constants::{ANYHOST_KEY, VERSION_KEY},
- url::Url,
- };
+pub trait UrlParam: FromStr {
+ type TargetType;
- #[test]
- fn test_from_url() {
- let mut u1 = Url::from_url("tri://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&\
- application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&\
- environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&\
- module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&\
- side=provider&timeout=3000×tamp=1556509797245&version=1.0.0&application=test");
- assert_eq!(
- u1.as_ref().unwrap().service_key,
- "default/com.ikurento.user.UserProvider:1.0.0"
- );
- assert_eq!(
- u1.as_ref()
- .unwrap()
- .get_param(ANYHOST_KEY)
- .unwrap()
- .as_str(),
- "true"
- );
- assert_eq!(
- u1.as_ref()
- .unwrap()
- .get_param("default.timeout")
- .unwrap()
- .as_str(),
- "10000"
- );
- assert_eq!(u1.as_ref().unwrap().scheme, "tri");
- assert_eq!(u1.as_ref().unwrap().ip, "127.0.0.1");
- assert_eq!(u1.as_ref().unwrap().port, "20000");
- assert_eq!(u1.as_ref().unwrap().params_count(), 18);
- u1.as_mut().unwrap().set_param("key1", "value1");
- assert_eq!(
- u1.as_ref().unwrap().get_param("key1").unwrap().as_str(),
- "value1"
- );
- assert_eq!(
- u1.as_ref()
- .unwrap()
- .get_param(VERSION_KEY)
- .unwrap()
- .as_str(),
- "1.0.0"
- );
- }
+ fn name() -> &'static str;
- #[test]
- fn test2() {
- let url: Url = "tri://0.0.0.0:8888/org.apache.dubbo.sample.tri.Greeter".into();
- assert_eq!(
- url.raw_url_string(),
- "tri://0.0.0.0:8888/org.apache.dubbo.sample.tri.Greeter"
- )
- }
+ fn value(&self) -> Self::TargetType;
+
+ fn as_str(&self) -> Cow<str>;
}
diff --git a/config/src/router.rs b/config/src/router.rs
index b45bd47..7976f6e 100644
--- a/config/src/router.rs
+++ b/config/src/router.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)]
diff --git a/config/src/service.rs b/config/src/service.rs
index 1f85a92..8a1f191 100644
--- a/config/src/service.rs
+++ b/config/src/service.rs
@@ -41,16 +41,4 @@
pub fn protocol(self, protocol: String) -> Self {
Self { protocol, ..self }
}
-
- // pub fn get_url(&self) -> Vec<Url> {
- // let mut urls = Vec::new();
- // for (_, conf) in self.protocol_configs.iter() {
- // urls.push(Url {
- // url: conf.to_owned().to_url(),
- // service_key: "".to_string(),
- // });
- // }
-
- // urls
- // }
}
diff --git a/dubbo/src/cluster/failover.rs b/dubbo/src/cluster/failover.rs
index 8a00c9f..a223ddf 100644
--- a/dubbo/src/cluster/failover.rs
+++ b/dubbo/src/cluster/failover.rs
@@ -1,12 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
use std::task::Poll;
+use dubbo_base::StdError;
use futures_util::future;
use http::Request;
use tower::{retry::Retry, util::Oneshot, ServiceExt};
use tower_service::Service;
-use crate::StdError;
-
pub struct Failover<N> {
inner: N, // loadbalancer service
}
diff --git a/dubbo/src/cluster/router/condition/condition_router.rs b/dubbo/src/cluster/router/condition/condition_router.rs
index 73aca00..21b525a 100644
--- a/dubbo/src/cluster/router/condition/condition_router.rs
+++ b/dubbo/src/cluster/router/condition/condition_router.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
use crate::{
cluster::router::{condition::single_router::ConditionSingleRouter, Router},
codegen::RpcInvocation,
diff --git a/dubbo/src/cluster/router/condition/matcher.rs b/dubbo/src/cluster/router/condition/matcher.rs
index 92bbe2d..2ee33d6 100644
--- a/dubbo/src/cluster/router/condition/matcher.rs
+++ b/dubbo/src/cluster/router/condition/matcher.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
use regex::Regex;
use std::{collections::HashSet, error::Error, option::Option};
diff --git a/dubbo/src/cluster/router/condition/mod.rs b/dubbo/src/cluster/router/condition/mod.rs
index 7285b88..d4a83b9 100644
--- a/dubbo/src/cluster/router/condition/mod.rs
+++ b/dubbo/src/cluster/router/condition/mod.rs
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
pub mod condition_router;
pub mod matcher;
pub mod single_router;
diff --git a/dubbo/src/cluster/router/condition/single_router.rs b/dubbo/src/cluster/router/condition/single_router.rs
index 5f06aa8..54c61cb 100644
--- a/dubbo/src/cluster/router/condition/single_router.rs
+++ b/dubbo/src/cluster/router/condition/single_router.rs
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
use dubbo_base::Url;
use dubbo_logger::tracing::info;
use regex::Regex;
diff --git a/dubbo/src/cluster/router/manager/condition_manager.rs b/dubbo/src/cluster/router/manager/condition_manager.rs
index 7ad5e1b..7772950 100644
--- a/dubbo/src/cluster/router/manager/condition_manager.rs
+++ b/dubbo/src/cluster/router/manager/condition_manager.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
use crate::cluster::router::condition::{
condition_router::{ConditionRouter, ConditionSingleRouters},
single_router::ConditionSingleRouter,
diff --git a/dubbo/src/cluster/router/manager/mod.rs b/dubbo/src/cluster/router/manager/mod.rs
index 025f6c1..593fa22 100644
--- a/dubbo/src/cluster/router/manager/mod.rs
+++ b/dubbo/src/cluster/router/manager/mod.rs
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
mod condition_manager;
pub mod router_manager;
mod tag_manager;
diff --git a/dubbo/src/cluster/router/manager/router_manager.rs b/dubbo/src/cluster/router/manager/router_manager.rs
index e963181..e6c8b6c 100644
--- a/dubbo/src/cluster/router/manager/router_manager.rs
+++ b/dubbo/src/cluster/router/manager/router_manager.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
use crate::cluster::router::{
manager::{condition_manager::ConditionRouterManager, tag_manager::TagRouterManager},
nacos_config_center::nacos_client::NacosClient,
diff --git a/dubbo/src/cluster/router/manager/tag_manager.rs b/dubbo/src/cluster/router/manager/tag_manager.rs
index 8dc2499..f028af2 100644
--- a/dubbo/src/cluster/router/manager/tag_manager.rs
+++ b/dubbo/src/cluster/router/manager/tag_manager.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
use crate::cluster::router::tag::tag_router::{TagRouter, TagRouterInner};
use dubbo_config::router::TagRouterConfig;
use std::sync::{Arc, RwLock};
diff --git a/dubbo/src/cluster/router/mod.rs b/dubbo/src/cluster/router/mod.rs
index 17c9aec..edc081b 100644
--- a/dubbo/src/cluster/router/mod.rs
+++ b/dubbo/src/cluster/router/mod.rs
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
pub mod condition;
pub mod manager;
pub mod nacos_config_center;
diff --git a/dubbo/src/cluster/router/nacos_config_center/mod.rs b/dubbo/src/cluster/router/nacos_config_center/mod.rs
index 7878fa9..7172231 100644
--- a/dubbo/src/cluster/router/nacos_config_center/mod.rs
+++ b/dubbo/src/cluster/router/nacos_config_center/mod.rs
@@ -1 +1,17 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
pub mod nacos_client;
diff --git a/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs b/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs
index ce72641..68b5f09 100644
--- a/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs
+++ b/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
use crate::cluster::router::manager::router_manager::{
get_global_router_manager, RouterConfigChangeEvent,
};
diff --git a/dubbo/src/cluster/router/router_chain.rs b/dubbo/src/cluster/router/router_chain.rs
index 42d5826..601bc5e 100644
--- a/dubbo/src/cluster/router/router_chain.rs
+++ b/dubbo/src/cluster/router/router_chain.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
use crate::{cluster::router::BoxRouter, invocation::RpcInvocation};
use dubbo_base::Url;
use std::{collections::HashMap, sync::Arc};
diff --git a/dubbo/src/cluster/router/tag/mod.rs b/dubbo/src/cluster/router/tag/mod.rs
index 6ac5b21..673a720 100644
--- a/dubbo/src/cluster/router/tag/mod.rs
+++ b/dubbo/src/cluster/router/tag/mod.rs
@@ -1 +1,17 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
pub mod tag_router;
diff --git a/dubbo/src/cluster/router/tag/tag_router.rs b/dubbo/src/cluster/router/tag/tag_router.rs
index 7a83ea5..3d28f93 100644
--- a/dubbo/src/cluster/router/tag/tag_router.rs
+++ b/dubbo/src/cluster/router/tag/tag_router.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
use crate::{
cluster::router::{utils::to_original_map, Router},
codegen::RpcInvocation,
diff --git a/dubbo/src/cluster/router/utils.rs b/dubbo/src/cluster/router/utils.rs
index 2ca50fc..eca98f6 100644
--- a/dubbo/src/cluster/router/utils.rs
+++ b/dubbo/src/cluster/router/utils.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
use dubbo_base::Url;
use std::{collections::HashMap, string::String};
diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs
index 452f560..eb1d385 100644
--- a/dubbo/src/codegen.rs
+++ b/dubbo/src/codegen.rs
@@ -22,6 +22,7 @@
pub use async_trait::async_trait;
pub use bytes::Bytes;
+pub use dubbo_base::StdError;
pub use http_body::Body;
pub use hyper::Body as hyperBody;
pub use tower_service::Service;
@@ -39,7 +40,7 @@
TripleServer,
},
},
- BoxBody, BoxFuture, StdError,
+ BoxBody, BoxFuture,
};
pub use crate::{
filter::{service::FilterService, Filter},
diff --git a/dubbo/src/directory/mod.rs b/dubbo/src/directory/mod.rs
index f0d9ebe..ace67e6 100644
--- a/dubbo/src/directory/mod.rs
+++ b/dubbo/src/directory/mod.rs
@@ -27,21 +27,21 @@
invocation::Invocation,
invoker::{clone_invoker::CloneInvoker, NewInvoker},
param::Param,
- registry::n_registry::Registry,
svc::NewService,
- StdError,
};
-use dubbo_base::Url;
-use dubbo_logger::tracing::debug;
-use futures_core::ready;
+use dubbo_base::{StdError, Url};
+use dubbo_logger::tracing::{debug, error};
use futures_util::future;
use tokio::sync::mpsc::channel;
use tokio_stream::wrappers::ReceiverStream;
use tower::{
buffer::Buffer,
discover::{Change, Discover},
+ ServiceExt,
};
+use crate::extension::registry_extension::{proxy::RegistryProxy, Registry};
+use dubbo_base::registry_param::InterfaceName;
use tower_service::Service;
type BufferedDirectory =
@@ -49,7 +49,8 @@
pub struct NewCachedDirectory<N>
where
- N: Registry + Clone + Send + Sync + 'static,
+ N: Service<(), Response = RegistryProxy> + Send + Clone + 'static,
+ <N as Service<()>>::Future: Send + 'static,
{
inner: CachedDirectory<NewDirectory<N>, RpcInvocation>,
}
@@ -76,7 +77,8 @@
impl<N> NewCachedDirectory<N>
where
- N: Registry + Clone + Send + Sync + 'static,
+ N: Service<(), Response = RegistryProxy> + Send + Clone + 'static,
+ <N as Service<()>>::Future: Send + 'static,
{
pub fn layer() -> impl tower_layer::Layer<N, Service = Self> {
tower_layer::layer_fn(|inner: N| {
@@ -92,7 +94,8 @@
where
T: Param<RpcInvocation>,
// service registry
- N: Registry + Clone + Send + Sync + 'static,
+ N: Service<(), Response = RegistryProxy> + Send + Clone + 'static,
+ <N as Service<()>>::Future: Send + 'static,
{
type Service = BufferedDirectory;
@@ -151,14 +154,15 @@
where
T: Param<RpcInvocation>,
// service registry
- N: Registry + Clone + Send + Sync + 'static,
+ N: Service<(), Response = RegistryProxy> + Send + Clone + 'static,
+ <N as Service<()>>::Future: Send + 'static,
{
type Service = BufferedDirectory;
fn new_service(&self, target: T) -> Self::Service {
let service_name = target.param().get_target_service_unique_name();
- let registry = self.inner.clone();
+ let fut = self.inner.clone().oneshot(());
let (tx, rx) = channel(Self::MAX_DIRECTORY_BUFFER_SIZE);
@@ -166,7 +170,14 @@
// todo use dubbo url model generate subscribe url
// category:serviceInterface:version:group
let consumer_url = format!("consumer://{}/{}", "127.0.0.1:8888", service_name);
- let subscribe_url = Url::from_url(&consumer_url).unwrap();
+ let mut subscribe_url: Url = consumer_url.parse().unwrap();
+ subscribe_url.add_query_param(InterfaceName::new(service_name));
+
+ let Ok(registry) = fut.await else {
+ error!("registry extension load failed.");
+ return;
+ };
+
let receiver = registry.subscribe(subscribe_url).await;
debug!("discover start!");
match receiver {
diff --git a/dubbo/src/extension/mod.rs b/dubbo/src/extension/mod.rs
new file mode 100644
index 0000000..5641bea
--- /dev/null
+++ b/dubbo/src/extension/mod.rs
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+pub mod registry_extension;
+
+use crate::{
+ extension::registry_extension::proxy::RegistryProxy, registry::registry::StaticRegistry,
+};
+use dubbo_base::{extension_param::ExtensionType, url::UrlParam, StdError, Url};
+use dubbo_logger::tracing::{error, info};
+use thiserror::Error;
+use tokio::sync::oneshot;
+
+pub static EXTENSIONS: once_cell::sync::Lazy<ExtensionDirectoryCommander> =
+ once_cell::sync::Lazy::new(|| ExtensionDirectory::init());
+
+#[derive(Default)]
+struct ExtensionDirectory {
+ registry_extension_loader: registry_extension::RegistryExtensionLoader,
+}
+
+impl ExtensionDirectory {
+ fn init() -> ExtensionDirectoryCommander {
+ let (tx, mut rx) = tokio::sync::mpsc::channel::<ExtensionOpt>(64);
+
+ tokio::spawn(async move {
+ let mut extension_directory = ExtensionDirectory::default();
+
+ // register static registry extension
+ let _ = extension_directory
+ .register(
+ StaticRegistry::name(),
+ StaticRegistry::convert_to_extension_factories(),
+ ExtensionType::Registry,
+ )
+ .await;
+
+ while let Some(extension_opt) = rx.recv().await {
+ match extension_opt {
+ ExtensionOpt::Register(
+ extension_name,
+ extension_factories,
+ extension_type,
+ tx,
+ ) => {
+ let result = extension_directory
+ .register(extension_name, extension_factories, extension_type)
+ .await;
+ let _ = tx.send(result);
+ }
+ ExtensionOpt::Remove(extension_name, extension_type, tx) => {
+ let result = extension_directory
+ .remove(extension_name, extension_type)
+ .await;
+ let _ = tx.send(result);
+ }
+ ExtensionOpt::Load(url, extension_type, tx) => {
+ let result = extension_directory.load(url, extension_type).await;
+ let _ = tx.send(result);
+ }
+ }
+ }
+ });
+
+ ExtensionDirectoryCommander { sender: tx }
+ }
+
+ async fn register(
+ &mut self,
+ extension_name: String,
+ extension_factories: ExtensionFactories,
+ extension_type: ExtensionType,
+ ) -> Result<(), StdError> {
+ match extension_type {
+ ExtensionType::Registry => match extension_factories {
+ ExtensionFactories::RegistryExtensionFactory(registry_extension_factory) => {
+ self.registry_extension_loader
+ .register(extension_name, registry_extension_factory)
+ .await;
+ Ok(())
+ }
+ },
+ }
+ }
+
+ async fn remove(
+ &mut self,
+ extension_name: String,
+ extension_type: ExtensionType,
+ ) -> Result<(), StdError> {
+ match extension_type {
+ ExtensionType::Registry => {
+ self.registry_extension_loader.remove(extension_name).await;
+ Ok(())
+ }
+ }
+ }
+
+ async fn load(
+ &mut self,
+ url: Url,
+ extension_type: ExtensionType,
+ ) -> Result<Extensions, StdError> {
+ match extension_type {
+ ExtensionType::Registry => {
+ let extension = self.registry_extension_loader.load(&url).await;
+ match extension {
+ Ok(extension) => Ok(Extensions::Registry(extension)),
+ Err(err) => {
+ error!("load extension failed: {}", err);
+ Err(err)
+ }
+ }
+ }
+ }
+ }
+}
+
+pub struct ExtensionDirectoryCommander {
+ sender: tokio::sync::mpsc::Sender<ExtensionOpt>,
+}
+
+impl ExtensionDirectoryCommander {
+ #[allow(private_bounds)]
+ pub async fn register<T>(&self) -> Result<(), StdError>
+ where
+ T: Extension,
+ T: ExtensionMetaInfo,
+ T: ConvertToExtensionFactories,
+ {
+ let extension_name = T::name();
+ let extension_factories = T::convert_to_extension_factories();
+ let extension_type = T::extension_type();
+
+ info!(
+ "register extension: {}, type: {}",
+ extension_name,
+ extension_type.as_str()
+ );
+
+ let (tx, rx) = oneshot::channel();
+
+ let send = self
+ .sender
+ .send(ExtensionOpt::Register(
+ extension_name.clone(),
+ extension_factories,
+ extension_type,
+ tx,
+ ))
+ .await;
+
+ let Ok(_) = send else {
+ let err_msg = format!("register extension {} failed", extension_name);
+ return Err(RegisterExtensionError::new(err_msg).into());
+ };
+
+ let ret = rx.await;
+
+ match ret {
+ Ok(_) => Ok(()),
+ Err(_) => {
+ let err_msg = format!("register extension {} failed", extension_name);
+ Err(RegisterExtensionError::new(err_msg).into())
+ }
+ }
+ }
+
+ #[allow(private_bounds)]
+ pub async fn remove<T>(&self) -> Result<(), StdError>
+ where
+ T: Extension,
+ T: ExtensionMetaInfo,
+ {
+ let extension_name = T::name();
+ let extension_type = T::extension_type();
+
+ info!(
+ "remove extension: {}, type: {}",
+ extension_name,
+ extension_type.as_str()
+ );
+
+ let (tx, rx) = oneshot::channel();
+
+ let send = self
+ .sender
+ .send(ExtensionOpt::Remove(
+ extension_name.clone(),
+ extension_type,
+ tx,
+ ))
+ .await;
+
+ let Ok(_) = send else {
+ let err_msg = format!("remove extension {} failed", extension_name);
+ return Err(RemoveExtensionError::new(err_msg).into());
+ };
+
+ let ret = rx.await;
+
+ match ret {
+ Ok(_) => Ok(()),
+ Err(_) => {
+ let err_msg = format!("remove extension {} failed", extension_name);
+ Err(RemoveExtensionError::new(err_msg).into())
+ }
+ }
+ }
+
+ pub async fn load_registry(&self, url: Url) -> Result<RegistryProxy, StdError> {
+ let url_str = url.to_string();
+ info!("load registry extension: {}", url_str);
+
+ let (tx, rx) = oneshot::channel();
+
+ let send = self
+ .sender
+ .send(ExtensionOpt::Load(url, ExtensionType::Registry, tx))
+ .await;
+
+ let Ok(_) = send else {
+ let err_msg = format!("load registry extension failed: {}", url_str);
+ return Err(LoadExtensionError::new(err_msg).into());
+ };
+
+ let extensions = rx.await;
+
+ let Ok(extension) = extensions else {
+ let err_msg = format!("load registry extension failed: {}", url_str);
+ return Err(LoadExtensionError::new(err_msg).into());
+ };
+
+ let Ok(extensions) = extension else {
+ let err_msg = format!("load registry extension failed: {}", url_str);
+ return Err(LoadExtensionError::new(err_msg).into());
+ };
+
+ match extensions {
+ Extensions::Registry(proxy) => Ok(proxy),
+ }
+ }
+}
+
+enum ExtensionOpt {
+ Register(
+ String,
+ ExtensionFactories,
+ ExtensionType,
+ oneshot::Sender<Result<(), StdError>>,
+ ),
+ Remove(String, ExtensionType, oneshot::Sender<Result<(), StdError>>),
+ Load(
+ Url,
+ ExtensionType,
+ oneshot::Sender<Result<Extensions, StdError>>,
+ ),
+}
+
+pub(crate) trait Sealed {}
+
+#[allow(private_bounds)]
+#[async_trait::async_trait]
+pub trait Extension: Sealed {
+ type Target;
+
+ fn name() -> String;
+
+ async fn create(url: &Url) -> Result<Self::Target, StdError>;
+}
+
+#[allow(private_bounds)]
+pub(crate) trait ExtensionMetaInfo {
+ fn extension_type() -> ExtensionType;
+}
+
+pub(crate) enum Extensions {
+ Registry(RegistryProxy),
+}
+
+pub(crate) enum ExtensionFactories {
+ RegistryExtensionFactory(registry_extension::RegistryExtensionFactory),
+}
+
+#[allow(private_bounds)]
+pub(crate) trait ConvertToExtensionFactories {
+ fn convert_to_extension_factories() -> ExtensionFactories;
+}
+
+#[derive(Error, Debug)]
+#[error("{0}")]
+pub(crate) struct RegisterExtensionError(String);
+
+impl RegisterExtensionError {
+ pub fn new(msg: String) -> Self {
+ RegisterExtensionError(msg)
+ }
+}
+
+#[derive(Error, Debug)]
+#[error("{0}")]
+pub struct RemoveExtensionError(String);
+
+impl RemoveExtensionError {
+ pub fn new(msg: String) -> Self {
+ RemoveExtensionError(msg)
+ }
+}
+
+#[derive(Error, Debug)]
+#[error("{0}")]
+pub struct LoadExtensionError(String);
+
+impl LoadExtensionError {
+ pub fn new(msg: String) -> Self {
+ LoadExtensionError(msg)
+ }
+}
diff --git a/dubbo/src/extension/registry_extension.rs b/dubbo/src/extension/registry_extension.rs
new file mode 100644
index 0000000..e27d6a5
--- /dev/null
+++ b/dubbo/src/extension/registry_extension.rs
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::{collections::HashMap, future::Future, pin::Pin};
+
+use async_trait::async_trait;
+use thiserror::Error;
+use tokio::sync::mpsc::Receiver;
+use tower::discover::Change;
+
+use dubbo_base::{
+ extension_param::ExtensionName, registry_param::RegistryUrl, url::UrlParam, StdError, Url,
+};
+use proxy::RegistryProxy;
+
+use crate::extension::{
+ ConvertToExtensionFactories, Extension, ExtensionFactories, ExtensionMetaInfo, ExtensionType,
+};
+
+// extension://0.0.0.0/?extension-type=registry&extension-name=nacos®istry-url=nacos://127.0.0.1:8848
+pub fn to_extension_url(registry_url: Url) -> Url {
+ let mut registry_extension_loader_url: Url = "extension://0.0.0.0".parse().unwrap();
+
+ let protocol = registry_url.protocol();
+
+ registry_extension_loader_url.add_query_param(ExtensionType::Registry);
+ registry_extension_loader_url.add_query_param(ExtensionName::new(protocol.to_string()));
+ registry_extension_loader_url.add_query_param(RegistryUrl::new(registry_url));
+
+ registry_extension_loader_url
+}
+
+pub type ServiceChange = Change<String, ()>;
+pub type DiscoverStream = Receiver<Result<ServiceChange, StdError>>;
+
+#[async_trait]
+pub trait Registry {
+ async fn register(&self, url: Url) -> Result<(), StdError>;
+
+ async fn unregister(&self, url: Url) -> Result<(), StdError>;
+
+ async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError>;
+
+ async fn unsubscribe(&self, url: Url) -> Result<(), StdError>;
+
+ fn url(&self) -> &Url;
+}
+
+impl<T> crate::extension::Sealed for T where T: Registry + Send + 'static {}
+
+impl<T> ExtensionMetaInfo for T
+where
+ T: Registry + Send + 'static,
+ T: Extension<Target = Box<dyn Registry + Send + 'static>>,
+{
+ fn extension_type() -> ExtensionType {
+ ExtensionType::Registry
+ }
+}
+
+impl<T> ConvertToExtensionFactories for T
+where
+ T: Registry + Send + 'static,
+ T: Extension<Target = Box<dyn Registry + Send + 'static>>,
+{
+ fn convert_to_extension_factories() -> ExtensionFactories {
+ fn constrain<F>(f: F) -> F
+ where
+ F: for<'a> Fn(
+ &'a Url,
+ ) -> Pin<
+ Box<
+ dyn Future<Output = Result<Box<dyn Registry + Send + 'static>, StdError>>
+ + Send
+ + 'a,
+ >,
+ >,
+ {
+ f
+ }
+
+ let constructor = constrain(|url: &Url| {
+ let f = <T as Extension>::create(url);
+ Box::pin(f)
+ });
+
+ ExtensionFactories::RegistryExtensionFactory(RegistryExtensionFactory::new(constructor))
+ }
+}
+
+#[derive(Default)]
+pub(super) struct RegistryExtensionLoader {
+ factories: HashMap<String, RegistryExtensionFactory>,
+}
+
+impl RegistryExtensionLoader {
+ pub(crate) async fn register(
+ &mut self,
+ extension_name: String,
+ factory: RegistryExtensionFactory,
+ ) {
+ self.factories.insert(extension_name, factory);
+ }
+
+ pub(crate) async fn remove(&mut self, extension_name: String) {
+ self.factories.remove(&extension_name);
+ }
+
+ pub(crate) async fn load(&mut self, url: &Url) -> Result<RegistryProxy, StdError> {
+ let extension_name = url.query::<ExtensionName>().unwrap();
+ let extension_name = extension_name.value();
+ let factory = self.factories.get_mut(&extension_name).ok_or_else(|| {
+ RegistryExtensionLoaderError::new(format!(
+ "registry extension loader error: extension name {} not found",
+ extension_name
+ ))
+ })?;
+ factory.create(url).await
+ }
+}
+
+type RegistryConstructor = for<'a> fn(
+ &'a Url,
+) -> Pin<
+ Box<dyn Future<Output = Result<Box<dyn Registry + Send + 'static>, StdError>> + Send + 'a>,
+>;
+
+pub(crate) struct RegistryExtensionFactory {
+ constructor: RegistryConstructor,
+ instances: HashMap<String, RegistryProxy>,
+}
+
+impl RegistryExtensionFactory {
+ pub(super) fn new(constructor: RegistryConstructor) -> Self {
+ Self {
+ constructor,
+ instances: HashMap::new(),
+ }
+ }
+}
+
+impl RegistryExtensionFactory {
+ pub(super) async fn create(&mut self, url: &Url) -> Result<RegistryProxy, StdError> {
+ let registry_url = url.query::<RegistryUrl>().unwrap();
+ let registry_url = registry_url.value();
+ let url_str = registry_url.as_str().to_string();
+ match self.instances.get(&url_str) {
+ Some(proxy) => {
+ let proxy = proxy.clone();
+ Ok(proxy)
+ }
+ None => {
+ let registry = (self.constructor)(url).await?;
+ let proxy = <RegistryProxy as From<Box<dyn Registry + Send>>>::from(registry);
+ self.instances.insert(url_str, proxy.clone());
+ Ok(proxy)
+ }
+ }
+ }
+}
+
+#[derive(Error, Debug)]
+#[error("{0}")]
+pub(crate) struct RegistryExtensionLoaderError(String);
+
+impl RegistryExtensionLoaderError {
+ pub(crate) fn new(msg: String) -> Self {
+ RegistryExtensionLoaderError(msg)
+ }
+}
+
+pub mod proxy {
+ use async_trait::async_trait;
+ use thiserror::Error;
+ use tokio::sync::oneshot;
+
+ use dubbo_base::{StdError, Url};
+ use dubbo_logger::tracing::error;
+
+ use crate::extension::registry_extension::{DiscoverStream, Registry};
+
+ pub(super) enum RegistryOpt {
+ Register(Url, oneshot::Sender<Result<(), StdError>>),
+ Unregister(Url, oneshot::Sender<Result<(), StdError>>),
+ Subscribe(Url, oneshot::Sender<Result<DiscoverStream, StdError>>),
+ UnSubscribe(Url, oneshot::Sender<Result<(), StdError>>),
+ }
+
+ #[derive(Clone)]
+ pub struct RegistryProxy {
+ sender: tokio::sync::mpsc::Sender<RegistryOpt>,
+ url: Url,
+ }
+
+ #[async_trait]
+ impl Registry for RegistryProxy {
+ async fn register(&self, url: Url) -> Result<(), StdError> {
+ let (tx, rx) = oneshot::channel();
+
+ match self
+ .sender
+ .send(RegistryOpt::Register(url.clone(), tx))
+ .await
+ {
+ Ok(_) => match rx.await {
+ Ok(result) => result,
+ Err(_) => {
+ error!(
+ "registry proxy error: receive register response failed, url: {}",
+ url
+ );
+ return Err(
+ RegistryProxyError::new("receive register response failed").into()
+ );
+ }
+ },
+ Err(_) => {
+ error!(
+ "registry proxy error: send register request failed, url: {}",
+ url
+ );
+ return Err(RegistryProxyError::new("send register opt failed").into());
+ }
+ }
+ }
+
+ async fn unregister(&self, url: Url) -> Result<(), StdError> {
+ let (tx, rx) = oneshot::channel();
+ match self
+ .sender
+ .send(RegistryOpt::Unregister(url.clone(), tx))
+ .await
+ {
+ Ok(_) => match rx.await {
+ Ok(result) => result,
+ Err(_) => {
+ error!(
+ "registry proxy error: receive unregister response failed, url: {}",
+ url
+ );
+ return Err(
+ RegistryProxyError::new("receive unregister response failed").into(),
+ );
+ }
+ },
+ Err(_) => {
+ error!(
+ "registry proxy error: send unregister request failed, url: {}",
+ url
+ );
+ return Err(RegistryProxyError::new("send unregister opt failed").into());
+ }
+ }
+ }
+
+ async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
+ let (tx, rx) = oneshot::channel();
+
+ match self
+ .sender
+ .send(RegistryOpt::Subscribe(url.clone(), tx))
+ .await
+ {
+ Ok(_) => match rx.await {
+ Ok(result) => result,
+ Err(_) => {
+ error!(
+ "registry proxy error: receive subscribe response failed, url: {}",
+ url
+ );
+ return Err(
+ RegistryProxyError::new("receive subscribe response failed").into()
+ );
+ }
+ },
+ Err(_) => {
+ error!(
+ "registry proxy error: send subscribe request failed, url: {}",
+ url
+ );
+ return Err(RegistryProxyError::new("send subscribe opt failed").into());
+ }
+ }
+ }
+
+ async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
+ let (tx, rx) = oneshot::channel();
+ match self
+ .sender
+ .send(RegistryOpt::UnSubscribe(url.clone(), tx))
+ .await
+ {
+ Ok(_) => {
+ match rx.await {
+ Ok(result) => result,
+ Err(_) => {
+ error!("registry proxy error: receive unsubscribe response failed, url: {}", url);
+ return Err(RegistryProxyError::new(
+ "receive unsubscribe response failed",
+ )
+ .into());
+ }
+ }
+ }
+ Err(_) => {
+ error!(
+ "registry proxy error: send unsubscribe request failed, url: {}",
+ url
+ );
+ return Err(RegistryProxyError::new("send unsubscribe opt failed").into());
+ }
+ }
+ }
+
+ fn url(&self) -> &Url {
+ &self.url
+ }
+ }
+
+ impl From<Box<dyn Registry + Send>> for RegistryProxy {
+ fn from(registry: Box<dyn Registry + Send>) -> Self {
+ let url = registry.url().clone();
+
+ let (sender, mut receiver) = tokio::sync::mpsc::channel(1024);
+
+ tokio::spawn(async move {
+ while let Some(opt) = receiver.recv().await {
+ match opt {
+ RegistryOpt::Register(url, tx) => {
+ let register = registry.register(url).await;
+ if let Err(_) = tx.send(register) {
+ error!("registry proxy error: send register response failed");
+ }
+ }
+ RegistryOpt::Unregister(url, tx) => {
+ let unregister = registry.unregister(url).await;
+ if let Err(_) = tx.send(unregister) {
+ error!("registry proxy error: send unregister response failed");
+ }
+ }
+ RegistryOpt::Subscribe(url, tx) => {
+ let subscribe = registry.subscribe(url).await;
+ if let Err(_) = tx.send(subscribe) {
+ error!("registry proxy error: send subscribe response failed");
+ }
+ }
+ RegistryOpt::UnSubscribe(url, tx) => {
+ let unsubscribe = registry.unsubscribe(url).await;
+ if let Err(_) = tx.send(unsubscribe) {
+ error!("registry proxy error: send unsubscribe response failed");
+ }
+ }
+ }
+ }
+ });
+
+ RegistryProxy { sender, url }
+ }
+ }
+
+ #[derive(Error, Debug)]
+ #[error("registry proxy error: {0}")]
+ pub(crate) struct RegistryProxyError(String);
+
+ impl RegistryProxyError {
+ pub(crate) fn new(msg: &str) -> Self {
+ RegistryProxyError(msg.to_string())
+ }
+ }
+}
diff --git a/dubbo/src/framework.rs b/dubbo/src/framework.rs
index 2666d25..f3c6dc1 100644
--- a/dubbo/src/framework.rs
+++ b/dubbo/src/framework.rs
@@ -15,20 +15,13 @@
* limitations under the License.
*/
-use std::{
- collections::HashMap,
- error::Error,
- pin::Pin,
- sync::{Arc, Mutex},
-};
+use std::{collections::HashMap, error::Error, pin::Pin};
use crate::{
+ extension,
+ extension::registry_extension::Registry,
protocol::{BoxExporter, Protocol},
- registry::{
- n_registry::{ArcRegistry, Registry},
- protocol::RegistryProtocol,
- types::{Registries, RegistriesOperation},
- },
+ registry::protocol::RegistryProtocol,
};
use dubbo_base::Url;
use dubbo_config::{get_global_config, protocol::ProtocolRetrieve, RootConfig};
@@ -40,7 +33,7 @@
#[derive(Default)]
pub struct Dubbo {
protocols: HashMap<String, Vec<Url>>,
- registries: Option<Registries>,
+ registries: Vec<Url>,
service_registry: HashMap<String, Vec<Url>>, // registry: Urls
config: Option<&'static RootConfig>,
}
@@ -49,7 +42,7 @@
pub fn new() -> Dubbo {
Self {
protocols: HashMap::new(),
- registries: None,
+ registries: Vec::default(),
service_registry: HashMap::new(),
config: None,
}
@@ -60,14 +53,10 @@
self
}
- pub fn add_registry(mut self, registry_key: &str, registry: ArcRegistry) -> Self {
- if self.registries.is_none() {
- self.registries = Some(Arc::new(Mutex::new(HashMap::new())));
- }
- self.registries
- .as_ref()
- .unwrap()
- .insert(registry_key.to_string(), registry);
+ pub fn add_registry(mut self, registry: &str) -> Self {
+ let url: Url = registry.parse().unwrap();
+ let url = extension::registry_extension::to_extension_url(url);
+ self.registries.push(url);
self
}
@@ -88,10 +77,15 @@
let protocol = root_config
.protocols
.get_protocol_or_default(service_config.protocol.as_str());
- let protocol_url =
- format!("{}/{}", protocol.to_url(), service_config.interface.clone(),);
+ let interface_name = service_config.interface.clone();
+ let protocol_url = format!(
+ "{}/{}?interface={}",
+ protocol.to_url(),
+ interface_name,
+ interface_name
+ );
tracing::info!("protocol_url: {:?}", protocol_url);
- Url::from_url(&protocol_url)
+ protocol_url.parse().ok()
} else {
return Err(format!("base {:?} not exists", service_config.protocol).into());
};
@@ -117,9 +111,20 @@
self.init().unwrap();
tracing::info!("starting...");
// TODO: server registry
+
+ let mut registry_extensions = Vec::new();
+
+ for registry_url in &self.registries {
+ let registry_url = registry_url.clone();
+ let registry_extension = extension::EXTENSIONS.load_registry(registry_url).await;
+ if let Ok(registry_extension) = registry_extension {
+ registry_extensions.push(registry_extension);
+ }
+ }
+
let mem_reg = Box::new(
RegistryProtocol::new()
- .with_registries(self.registries.as_ref().unwrap().clone())
+ .with_registries(registry_extensions.clone())
.with_services(self.service_registry.clone()),
);
let mut async_vec: Vec<Pin<Box<dyn Future<Output = BoxExporter> + Send>>> = Vec::new();
@@ -128,15 +133,10 @@
tracing::info!("base: {:?}, service url: {:?}", name, url);
let exporter = mem_reg.clone().export(url.to_owned());
async_vec.push(exporter);
+
//TODO multiple registry
- if self.registries.is_some() {
- let _ = self
- .registries
- .as_ref()
- .unwrap()
- .default_registry()
- .register(url.clone())
- .await;
+ for registry_extension in ®istry_extensions {
+ let _ = registry_extension.register(url.clone()).await;
}
}
}
diff --git a/dubbo/src/invoker/clone_body.rs b/dubbo/src/invoker/clone_body.rs
index 4de8f89..913910a 100644
--- a/dubbo/src/invoker/clone_body.rs
+++ b/dubbo/src/invoker/clone_body.rs
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
use std::{
collections::VecDeque,
pin::Pin,
@@ -8,13 +24,12 @@
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures_core::ready;
+use dubbo_base::StdError;
use http::HeaderMap;
use http_body::Body;
use pin_project::pin_project;
use thiserror::Error;
-use crate::StdError;
-
#[derive(Error, Debug)]
#[error("buffered body reach max capacity.")]
pub struct ReachMaxCapacityError;
diff --git a/dubbo/src/invoker/clone_invoker.rs b/dubbo/src/invoker/clone_invoker.rs
index c1fa00d..557d76e 100644
--- a/dubbo/src/invoker/clone_invoker.rs
+++ b/dubbo/src/invoker/clone_invoker.rs
@@ -1,5 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
use std::{mem, pin::Pin, task::Poll};
+use dubbo_base::StdError;
use dubbo_logger::tracing::debug;
use futures_core::{future::BoxFuture, ready, Future, TryFuture};
use futures_util::FutureExt;
@@ -16,8 +33,6 @@
use tower::{buffer::Buffer, ServiceExt};
use tower_service::Service;
-use crate::StdError;
-
use super::clone_body::CloneBody;
enum Inner<S> {
diff --git a/dubbo/src/invoker/mod.rs b/dubbo/src/invoker/mod.rs
index 92b8b46..1c87c0e 100644
--- a/dubbo/src/invoker/mod.rs
+++ b/dubbo/src/invoker/mod.rs
@@ -1,5 +1,19 @@
-use dubbo_base::Url;
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
use crate::{codegen::TripleInvoker, invoker::clone_invoker::CloneInvoker, svc::NewService};
pub mod clone_body;
@@ -13,7 +27,7 @@
fn new_service(&self, url: String) -> Self::Service {
// todo create another invoker by url protocol
- let url = Url::from_url(&url).unwrap();
+ let url = url.parse().unwrap();
CloneInvoker::new(TripleInvoker::new(url))
}
}
diff --git a/dubbo/src/lib.rs b/dubbo/src/lib.rs
index d397b42..1a521a2 100644
--- a/dubbo/src/lib.rs
+++ b/dubbo/src/lib.rs
@@ -19,6 +19,7 @@
pub mod codegen;
pub mod context;
pub mod directory;
+pub mod extension;
pub mod filter;
mod framework;
pub mod invocation;
@@ -38,7 +39,6 @@
pub use framework::Dubbo;
-pub type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub type BoxFuture<T, E> = self::Pin<Box<dyn self::Future<Output = Result<T, E>> + Send + 'static>>;
pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>;
pub type BoxBody = http_body::combinators::UnsyncBoxBody<bytes::Bytes, self::status::Status>;
diff --git a/dubbo/src/loadbalancer/mod.rs b/dubbo/src/loadbalancer/mod.rs
index 4e26781..74f2217 100644
--- a/dubbo/src/loadbalancer/mod.rs
+++ b/dubbo/src/loadbalancer/mod.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use dubbo_base::StdError;
use futures_core::future::BoxFuture;
use tower::{discover::ServiceList, ServiceExt};
use tower_service::Service;
@@ -7,7 +24,6 @@
invoker::{clone_body::CloneBody, clone_invoker::CloneInvoker},
param::Param,
svc::NewService,
- StdError,
};
use crate::protocol::triple::triple_invoker::TripleInvoker;
diff --git a/dubbo/src/param.rs b/dubbo/src/param.rs
index bef5041..298c3b3 100644
--- a/dubbo/src/param.rs
+++ b/dubbo/src/param.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
pub trait Param<T> {
fn param(&self) -> T;
}
diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs
index c8451e3..cb6b08c 100644
--- a/dubbo/src/protocol/triple/triple_invoker.rs
+++ b/dubbo/src/protocol/triple/triple_invoker.rs
@@ -35,7 +35,7 @@
impl TripleInvoker {
pub fn new(url: Url) -> TripleInvoker {
- let uri = http::Uri::from_str(&url.to_url()).unwrap();
+ let uri = http::Uri::from_str(url.as_str()).unwrap();
Self {
url,
conn: Connection::new().with_host(uri).build(),
@@ -55,7 +55,7 @@
let path_and_query = parts.headers.get("path").unwrap().to_str().unwrap();
- let authority = self.url.clone().get_ip_port();
+ let authority = self.url.authority();
let uri = Uri::builder()
.scheme("http")
diff --git a/dubbo/src/protocol/triple/triple_protocol.rs b/dubbo/src/protocol/triple/triple_protocol.rs
index 71f6edc..27174ba 100644
--- a/dubbo/src/protocol/triple/triple_protocol.rs
+++ b/dubbo/src/protocol/triple/triple_protocol.rs
@@ -15,10 +15,10 @@
* limitations under the License.
*/
-use std::{boxed::Box, collections::HashMap};
+use std::collections::HashMap;
use async_trait::async_trait;
-use dubbo_base::Url;
+use dubbo_base::{registry_param::InterfaceName, url::UrlParam, Url};
use super::{
triple_exporter::TripleExporter, triple_invoker::TripleInvoker, triple_server::TripleServer,
@@ -44,8 +44,9 @@
}
pub fn get_server(&self, url: Url) -> Option<TripleServer> {
+ let interface_name = url.query::<InterfaceName>().unwrap();
self.servers
- .get(&url.service_key)
+ .get(interface_name.value().as_str())
.map(|data| data.to_owned())
}
}
@@ -61,8 +62,12 @@
async fn export(mut self, url: Url) -> BoxExporter {
// service_key is same to key of TRIPLE_SERVICES
let server = TripleServer::new();
- self.servers.insert(url.service_key.clone(), server.clone());
- server.serve(url.short_url().as_str().into()).await;
+
+ let interface_name = url.query::<InterfaceName>().unwrap();
+ let interface_name = interface_name.value();
+
+ self.servers.insert(interface_name, server.clone());
+ server.serve(url).await;
Box::new(TripleExporter::new())
}
diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs
index b82fda8..08ae175 100644
--- a/dubbo/src/registry/mod.rs
+++ b/dubbo/src/registry/mod.rs
@@ -16,47 +16,43 @@
*/
#![allow(unused_variables, dead_code, missing_docs)]
+
+use crate::{extension, extension::registry_extension::proxy::RegistryProxy};
+use dubbo_base::{StdError, Url};
+use std::{
+ future::Future,
+ pin::Pin,
+ task::{Context, Poll},
+};
+use tower_service::Service;
+
pub mod integration;
-pub mod n_registry;
pub mod protocol;
-pub mod types;
+pub mod registry;
-// use std::{
-// fmt::{Debug, Formatter},
-// sync::Arc,
-// };
+#[derive(Clone)]
+pub struct MkRegistryService {
+ registry_url: Url,
+}
-// use dubbo_base::Url;
+impl MkRegistryService {
+ pub fn new(registry_url: Url) -> Self {
+ Self { registry_url }
+ }
+}
-// pub type RegistryNotifyListener = Arc<dyn NotifyListener + Send + Sync + 'static>;
-// pub trait Registry {
-// fn register(&mut self, url: Url) -> Result<(), crate::StdError>;
-// fn unregister(&mut self, url: Url) -> Result<(), crate::StdError>;
+impl Service<()> for MkRegistryService {
+ type Response = RegistryProxy;
+ type Error = StdError;
+ type Future =
+ Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
-// fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), crate::StdError>;
-// fn unsubscribe(
-// &self,
-// url: Url,
-// listener: RegistryNotifyListener,
-// ) -> Result<(), crate::StdError>;
-// }
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
-// pub trait NotifyListener {
-// fn notify(&self, event: ServiceEvent);
-// fn notify_all(&self, event: ServiceEvent);
-// }
-
-// #[derive(Debug)]
-// pub struct ServiceEvent {
-// pub key: String,
-// pub action: String,
-// pub service: Vec<Url>,
-// }
-
-// pub type BoxRegistry = Box<dyn Registry + Send + Sync>;
-
-// impl Debug for BoxRegistry {
-// fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-// f.write_str("BoxRegistry")
-// }
-// }
+ fn call(&mut self, req: ()) -> Self::Future {
+ let fut = extension::EXTENSIONS.load_registry(self.registry_url.clone());
+ Box::pin(fut)
+ }
+}
diff --git a/dubbo/src/registry/n_registry.rs b/dubbo/src/registry/n_registry.rs
deleted file mode 100644
index abcd56b..0000000
--- a/dubbo/src/registry/n_registry.rs
+++ /dev/null
@@ -1,203 +0,0 @@
-use std::{
- collections::{HashMap, HashSet},
- sync::Arc,
-};
-
-use async_trait::async_trait;
-use dubbo_base::Url;
-use thiserror::Error;
-use tokio::sync::{
- mpsc::{self, Receiver},
- Mutex,
-};
-use tower::discover::Change;
-
-use crate::StdError;
-
-pub type ServiceChange = Change<String, ()>;
-pub type DiscoverStream = Receiver<Result<ServiceChange, StdError>>;
-pub type BoxRegistry = Box<dyn Registry + Send + Sync>;
-
-#[async_trait]
-pub trait Registry {
- async fn register(&self, url: Url) -> Result<(), StdError>;
-
- async fn unregister(&self, url: Url) -> Result<(), StdError>;
-
- async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError>;
-
- async fn unsubscribe(&self, url: Url) -> Result<(), StdError>;
-}
-
-#[derive(Clone)]
-pub struct ArcRegistry {
- inner: Arc<dyn Registry + Send + Sync + 'static>,
-}
-
-pub enum RegistryComponent {
- NacosRegistry(ArcRegistry),
- ZookeeperRegistry,
- StaticRegistry(StaticRegistry),
-}
-
-pub struct StaticServiceValues {
- listeners: Vec<mpsc::Sender<Result<ServiceChange, StdError>>>,
- urls: HashSet<String>,
-}
-
-#[derive(Default)]
-pub struct StaticRegistry {
- urls: Mutex<HashMap<String, StaticServiceValues>>,
-}
-
-impl ArcRegistry {
- pub fn new(registry: impl Registry + Send + Sync + 'static) -> Self {
- Self {
- inner: Arc::new(registry),
- }
- }
-}
-
-#[async_trait]
-impl Registry for ArcRegistry {
- async fn register(&self, url: Url) -> Result<(), StdError> {
- self.inner.register(url).await
- }
-
- async fn unregister(&self, url: Url) -> Result<(), StdError> {
- self.inner.unregister(url).await
- }
-
- async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
- self.inner.subscribe(url).await
- }
-
- async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
- self.inner.unsubscribe(url).await
- }
-}
-
-#[async_trait]
-impl Registry for RegistryComponent {
- async fn register(&self, url: Url) -> Result<(), StdError> {
- todo!()
- }
-
- async fn unregister(&self, url: Url) -> Result<(), StdError> {
- todo!()
- }
-
- async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
- match self {
- RegistryComponent::NacosRegistry(registry) => registry.subscribe(url).await,
- RegistryComponent::ZookeeperRegistry => todo!(),
- RegistryComponent::StaticRegistry(registry) => registry.subscribe(url).await,
- }
- }
-
- async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
- todo!()
- }
-}
-
-impl StaticRegistry {
- pub fn new(urls: Vec<Url>) -> Self {
- let mut map = HashMap::with_capacity(urls.len());
-
- for url in urls {
- let service_name = url.get_service_name();
- let static_values = map
- .entry(service_name)
- .or_insert_with(|| StaticServiceValues {
- listeners: Vec::new(),
- urls: HashSet::new(),
- });
- let url = url.to_string();
- static_values.urls.insert(url.clone());
- }
-
- Self {
- urls: Mutex::new(map),
- }
- }
-}
-
-#[async_trait]
-impl Registry for StaticRegistry {
- async fn register(&self, url: Url) -> Result<(), StdError> {
- let service_name = url.get_service_name();
- let mut lock = self.urls.lock().await;
-
- let static_values = lock
- .entry(service_name)
- .or_insert_with(|| StaticServiceValues {
- listeners: Vec::new(),
- urls: HashSet::new(),
- });
- let url = url.to_string();
- static_values.urls.insert(url.clone());
-
- static_values.listeners.retain(|listener| {
- let ret = listener.try_send(Ok(ServiceChange::Insert(url.clone(), ())));
- ret.is_ok()
- });
-
- Ok(())
- }
-
- async fn unregister(&self, url: Url) -> Result<(), StdError> {
- let service_name = url.get_service_name();
- let mut lock = self.urls.lock().await;
-
- match lock.get_mut(&service_name) {
- None => Ok(()),
- Some(static_values) => {
- let url = url.to_string();
- static_values.urls.remove(&url);
- static_values.listeners.retain(|listener| {
- let ret = listener.try_send(Ok(ServiceChange::Remove(url.clone())));
- ret.is_ok()
- });
- if static_values.urls.is_empty() {
- lock.remove(&service_name);
- }
- Ok(())
- }
- }
- }
-
- async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
- let service_name = url.get_service_name();
-
- let change_rx = {
- let mut lock = self.urls.lock().await;
- let static_values = lock
- .entry(service_name)
- .or_insert_with(|| StaticServiceValues {
- listeners: Vec::new(),
- urls: HashSet::new(),
- });
-
- let (tx, change_rx) = mpsc::channel(64);
- static_values.listeners.push(tx);
-
- for url in static_values.urls.iter() {
- static_values.listeners.retain(|listener| {
- let ret = listener.try_send(Ok(ServiceChange::Insert(url.clone(), ())));
- ret.is_ok()
- });
- }
- change_rx
- };
-
- Ok(change_rx)
- }
-
- async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
- Ok(())
- }
-}
-
-#[derive(Error, Debug)]
-#[error("static registry error: {0}")]
-struct StaticRegistryError(String);
diff --git a/dubbo/src/registry/protocol.rs b/dubbo/src/registry/protocol.rs
index b9ba722..76350a4 100644
--- a/dubbo/src/registry/protocol.rs
+++ b/dubbo/src/registry/protocol.rs
@@ -15,26 +15,25 @@
* limitations under the License.
*/
-use dubbo_base::Url;
+use dubbo_base::{registry_param::InterfaceName, url::UrlParam, Url};
use dubbo_logger::tracing;
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
-use super::n_registry::{ArcRegistry, Registry, StaticRegistry};
use crate::{
+ extension::registry_extension::{proxy::RegistryProxy, Registry},
protocol::{
triple::{triple_exporter::TripleExporter, triple_protocol::TripleProtocol},
BoxExporter, BoxInvoker, Protocol,
},
- registry::types::Registries,
};
#[derive(Clone, Default)]
pub struct RegistryProtocol {
// registerAddr: Registry
- registries: Option<Registries>,
+ registries: Vec<RegistryProxy>,
// providerUrl: Exporter
exporters: Arc<RwLock<HashMap<String, BoxExporter>>>,
// serviceName: registryUrls
@@ -44,14 +43,14 @@
impl RegistryProtocol {
pub fn new() -> Self {
RegistryProtocol {
- registries: None,
+ registries: Vec::default(),
exporters: Arc::new(RwLock::new(HashMap::new())),
services: HashMap::new(),
}
}
- pub fn with_registries(mut self, registries: Registries) -> Self {
- self.registries = Some(registries);
+ pub fn with_registries(mut self, registries: Vec<RegistryProxy>) -> Self {
+ self.registries.extend(registries);
self
}
@@ -59,19 +58,6 @@
self.services.extend(services);
self
}
-
- pub fn get_registry(&mut self, url: Url) -> ArcRegistry {
- let mem = StaticRegistry::default();
- let mem = ArcRegistry::new(mem);
- self.registries
- .as_ref()
- .unwrap()
- .lock()
- .unwrap()
- .insert(url.location, mem.clone());
-
- mem
- }
}
#[async_trait::async_trait]
@@ -88,23 +74,23 @@
// init Exporter based on provider_url
// server registry based on register_url
// start server health check
- let registry_url = self.services.get(url.get_service_name().as_str());
+ let service_name = url.query::<InterfaceName>().unwrap();
+ let registry_url = self.services.get(service_name.as_str().as_ref());
if let Some(urls) = registry_url {
- for url in urls.clone().iter() {
- if !url.service_key.is_empty() {
- let reg = self.get_registry(url.clone());
- let _ = reg.register(url.clone()).await;
+ for url in urls.iter() {
+ for registry_proxy in &self.registries {
+ let _ = registry_proxy.register(url.clone()).await;
}
}
}
- match url.clone().scheme.as_str() {
+ match url.clone().protocol() {
"tri" => {
let pro = Box::new(TripleProtocol::new());
return pro.export(url).await;
}
_ => {
- tracing::error!("base {:?} not implemented", url.scheme);
+ tracing::error!("base {:?} not implemented", url.protocol());
Box::new(TripleExporter::new())
}
}
diff --git a/dubbo/src/registry/registry.rs b/dubbo/src/registry/registry.rs
new file mode 100644
index 0000000..85a8168
--- /dev/null
+++ b/dubbo/src/registry/registry.rs
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::collections::{HashMap, HashSet};
+
+use async_trait::async_trait;
+use itertools::Itertools;
+use thiserror::Error;
+use tokio::sync::{
+ mpsc::{self},
+ Mutex,
+};
+
+use dubbo_base::{
+ extension_param::{ExtensionName, ExtensionType},
+ registry_param::{InterfaceName, RegistryUrl, StaticInvokerUrls},
+ url::UrlParam,
+ StdError, Url,
+};
+
+use crate::extension::{
+ registry_extension::{DiscoverStream, Registry, ServiceChange},
+ Extension,
+};
+
+pub struct StaticServiceValues {
+ listeners: Vec<mpsc::Sender<Result<ServiceChange, StdError>>>,
+ urls: HashSet<String>,
+}
+
+pub struct StaticRegistry {
+ urls: Mutex<HashMap<String, StaticServiceValues>>,
+ self_url: Url,
+}
+
+impl StaticRegistry {
+ pub fn to_extension_url(static_invoker_urls: Vec<Url>) -> Url {
+ let static_invoker_urls: StaticInvokerUrls =
+ static_invoker_urls.iter().join(",").parse().unwrap();
+ let mut static_registry_extension_loader_url: Url = "extension://0.0.0.0".parse().unwrap();
+
+ static_registry_extension_loader_url.add_query_param(ExtensionType::Registry);
+ static_registry_extension_loader_url.add_query_param(ExtensionName::new(Self::name()));
+ static_registry_extension_loader_url
+ .add_query_param(RegistryUrl::new("static://127.0.0.1".parse().unwrap()));
+ static_registry_extension_loader_url.add_query_param(static_invoker_urls);
+
+ static_registry_extension_loader_url
+ }
+}
+
+impl StaticRegistry {
+ pub fn new(url: Url) -> Self {
+ let static_urls = url.query::<StaticInvokerUrls>();
+ let static_urls = match static_urls {
+ None => Vec::default(),
+ Some(static_urls) => static_urls.value(),
+ };
+
+ let mut map = HashMap::with_capacity(static_urls.len());
+
+ for url in static_urls {
+ let interface_name = url.query::<InterfaceName>().unwrap();
+ let interface_name = interface_name.value();
+
+ let static_values = map
+ .entry(interface_name)
+ .or_insert_with(|| StaticServiceValues {
+ listeners: Vec::new(),
+ urls: HashSet::new(),
+ });
+ let url = url.to_string();
+ static_values.urls.insert(url.clone());
+ }
+
+ let self_url = "static://0.0.0.0".parse().unwrap();
+
+ Self {
+ urls: Mutex::new(map),
+ self_url,
+ }
+ }
+}
+
+impl Default for StaticRegistry {
+ fn default() -> Self {
+ let self_url = "static://0.0.0.0".parse().unwrap();
+
+ Self {
+ self_url,
+ urls: Mutex::new(HashMap::new()),
+ }
+ }
+}
+#[async_trait]
+impl Registry for StaticRegistry {
+ async fn register(&self, url: Url) -> Result<(), StdError> {
+ let interface_name = url.query::<InterfaceName>().unwrap();
+ let interface_name = interface_name.value();
+
+ let mut lock = self.urls.lock().await;
+
+ let static_values = lock
+ .entry(interface_name)
+ .or_insert_with(|| StaticServiceValues {
+ listeners: Vec::new(),
+ urls: HashSet::new(),
+ });
+ let url = url.to_string();
+ static_values.urls.insert(url.clone());
+
+ static_values.listeners.retain(|listener| {
+ let ret = listener.try_send(Ok(ServiceChange::Insert(url.clone(), ())));
+ ret.is_ok()
+ });
+
+ Ok(())
+ }
+
+ async fn unregister(&self, url: Url) -> Result<(), StdError> {
+ let interface_name = url.query::<InterfaceName>().unwrap();
+ let interface_name = interface_name.value();
+
+ let mut lock = self.urls.lock().await;
+
+ match lock.get_mut(&interface_name) {
+ None => Ok(()),
+ Some(static_values) => {
+ let url = url.to_string();
+ static_values.urls.remove(&url);
+ static_values.listeners.retain(|listener| {
+ let ret = listener.try_send(Ok(ServiceChange::Remove(url.clone())));
+ ret.is_ok()
+ });
+ if static_values.urls.is_empty() {
+ lock.remove(&interface_name);
+ }
+ Ok(())
+ }
+ }
+ }
+
+ async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
+ let interface_name = url.query::<InterfaceName>().unwrap();
+ let interface_name = interface_name.value();
+
+ let change_rx = {
+ let mut lock = self.urls.lock().await;
+ let static_values = lock
+ .entry(interface_name)
+ .or_insert_with(|| StaticServiceValues {
+ listeners: Vec::new(),
+ urls: HashSet::new(),
+ });
+
+ let (tx, change_rx) = mpsc::channel(64);
+ static_values.listeners.push(tx);
+
+ for listener in &static_values.listeners {
+ for url in &static_values.urls {
+ let _ = listener
+ .send(Ok(ServiceChange::Insert(url.clone(), ())))
+ .await;
+ }
+ }
+
+ change_rx
+ };
+
+ Ok(change_rx)
+ }
+
+ async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
+ Ok(())
+ }
+
+ fn url(&self) -> &Url {
+ &self.self_url
+ }
+}
+
+#[async_trait::async_trait]
+impl Extension for StaticRegistry {
+ type Target = Box<dyn Registry + Send + 'static>;
+
+ fn name() -> String {
+ "static".to_string()
+ }
+
+ async fn create(url: &Url) -> Result<Self::Target, StdError> {
+ // url example:
+ // extension://0.0.0.0?extension-type=registry&extension-name=static®istry=static://127.0.0.1
+ let static_invoker_urls = url.query::<StaticInvokerUrls>();
+
+ let registry_url = url.query::<RegistryUrl>().unwrap();
+ let mut registry_url = registry_url.value();
+
+ if let Some(static_invoker_urls) = static_invoker_urls {
+ registry_url.add_query_param(static_invoker_urls);
+ }
+
+ let static_registry = StaticRegistry::new(registry_url);
+
+ Ok(Box::new(static_registry))
+ }
+}
+#[derive(Error, Debug)]
+#[error("static registry error: {0}")]
+struct StaticRegistryError(String);
diff --git a/dubbo/src/registry/types.rs b/dubbo/src/registry/types.rs
deleted file mode 100644
index 5c1687d..0000000
--- a/dubbo/src/registry/types.rs
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-use std::{
- collections::HashMap,
- sync::{Arc, Mutex},
-};
-
-use itertools::Itertools;
-
-use super::n_registry::ArcRegistry;
-
-pub type Registries = Arc<Mutex<HashMap<String, ArcRegistry>>>;
-
-pub const DEFAULT_REGISTRY_KEY: &str = "default";
-
-pub trait RegistriesOperation {
- fn get(&self, registry_key: &str) -> ArcRegistry;
- fn insert(&self, registry_key: String, registry: ArcRegistry);
- fn default_registry(&self) -> ArcRegistry;
-}
-
-impl RegistriesOperation for Registries {
- fn get(&self, registry_key: &str) -> ArcRegistry {
- self.as_ref()
- .lock()
- .unwrap()
- .get(registry_key)
- .unwrap()
- .clone()
- }
-
- fn insert(&self, registry_key: String, registry: ArcRegistry) {
- self.as_ref().lock().unwrap().insert(registry_key, registry);
- }
-
- fn default_registry(&self) -> ArcRegistry {
- let guard = self.as_ref().lock().unwrap();
- let (_, result) = guard
- .iter()
- .find_or_first(|e| e.0 == DEFAULT_REGISTRY_KEY)
- .unwrap()
- .to_owned();
- result.clone()
- }
-}
diff --git a/dubbo/src/route/mod.rs b/dubbo/src/route/mod.rs
index c244864..28dfda7 100644
--- a/dubbo/src/route/mod.rs
+++ b/dubbo/src/route/mod.rs
@@ -1,5 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
use std::pin::Pin;
+use dubbo_base::StdError;
use dubbo_logger::tracing::debug;
use futures_core::{ready, Future};
use futures_util::{future::Ready, FutureExt, TryFutureExt};
@@ -11,7 +29,6 @@
invoker::clone_invoker::CloneInvoker,
param::Param,
svc::NewService,
- StdError,
};
pub struct NewRoutes<N> {
@@ -20,6 +37,7 @@
pub struct NewRoutesFuture<S, T> {
inner: RoutesFutureInnerState<S>,
+ #[allow(dead_code)]
target: T,
}
@@ -39,6 +57,7 @@
#[derive(Clone)]
pub struct Routes<T> {
+ #[allow(dead_code)]
target: T,
invokers: Vec<CloneInvoker<TripleInvoker>>,
}
diff --git a/dubbo/src/svc.rs b/dubbo/src/svc.rs
index db59b92..f846663 100644
--- a/dubbo/src/svc.rs
+++ b/dubbo/src/svc.rs
@@ -1,4 +1,20 @@
-use std::{marker::PhantomData, sync::Arc};
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::sync::Arc;
pub trait NewService<T> {
type Service;
@@ -45,32 +61,3 @@
self.inner.new_service(t)
}
}
-
-// inner: Box<dyn Service<T, Response = U, Error = E, Future = Pin<Box<dyn Future<Output = Result<U, E>> + Send>>> + Send>,
-pub struct BoxedService<N, R> {
- inner: N,
- _mark: PhantomData<R>,
-}
-
-impl<R, N> BoxedService<N, R> {
- pub fn layer() -> impl tower_layer::Layer<N, Service = Self> {
- tower_layer::layer_fn(|inner: N| Self {
- inner,
- _mark: PhantomData,
- })
- }
-}
-
-// impl<T, R, N> NewService<T> for BoxedService<N, R>
-// where
-// N: NewService<T>,
-// N::Service: Service<R> + Send,
-// <N::Service as Service<R>>::Future: Send + 'static,
-// {
-// type Service = Box<dyn Service<R, Response = <N::Service as Service<R>>::Response, Error = <N::Service as Service<R>>::Error, Future = Pin<Box<dyn Future<Output = Result<<N::Service as Service<R>>::Response, <N::Service as Service<R>>::Error>> + Send>>> + Send>;
-
-// fn new_service(&self, target: T) -> Self::Service {
-// let service = self.inner.new_service(target);
-// Box::new(service.map_future(|f|Box::pin(f) as _))
-// }
-// }
diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs
index 0dadbcc..94c855b 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -18,14 +18,11 @@
use std::sync::Arc;
use crate::{
- cluster::NewCluster,
- directory::NewCachedDirectory,
- loadbalancer::NewLoadBalancer,
- registry::n_registry::{ArcRegistry, RegistryComponent, StaticRegistry},
- route::NewRoutes,
- utils::boxed_clone::BoxCloneService,
+ cluster::NewCluster, directory::NewCachedDirectory, extension, loadbalancer::NewLoadBalancer,
+ route::NewRoutes, utils::boxed_clone::BoxCloneService,
};
+use crate::registry::{registry::StaticRegistry, MkRegistryService};
use aws_smithy_http::body::SdkBody;
use dubbo_base::Url;
use tower::ServiceBuilder;
@@ -33,15 +30,15 @@
pub type ClientBoxService =
BoxCloneService<http::Request<SdkBody>, http::Response<crate::BoxBody>, crate::Error>;
-pub type ServiceMK = Arc<NewCluster<NewLoadBalancer<NewRoutes<NewCachedDirectory<ArcRegistry>>>>>;
+pub type ServiceMK =
+ Arc<NewCluster<NewLoadBalancer<NewRoutes<NewCachedDirectory<MkRegistryService>>>>>;
#[derive(Default)]
pub struct ClientBuilder {
pub timeout: Option<u64>,
pub connector: &'static str,
- registry: Option<ArcRegistry>,
+ registry_extension_url: Option<Url>,
pub direct: bool,
- host: String,
}
impl ClientBuilder {
@@ -49,22 +46,18 @@
ClientBuilder {
timeout: None,
connector: "",
- registry: None,
+ registry_extension_url: None,
direct: false,
- host: "".to_string(),
}
}
pub fn from_static(host: &str) -> ClientBuilder {
+ let registry_extension_url = StaticRegistry::to_extension_url(vec![host.parse().unwrap()]);
Self {
timeout: None,
connector: "",
- registry: Some(ArcRegistry::new(StaticRegistry::new(vec![Url::from_url(
- host,
- )
- .unwrap()]))),
+ registry_extension_url: Some(registry_extension_url),
direct: true,
- host: host.to_string(),
}
}
@@ -75,19 +68,19 @@
}
}
- pub fn with_registry(self, registry: ArcRegistry) -> Self {
+ pub fn with_registry(self, registry: Url) -> Self {
+ let registry_extension_url = extension::registry_extension::to_extension_url(registry);
Self {
- registry: Some(registry),
+ registry_extension_url: Some(registry_extension_url),
..self
}
}
pub fn with_host(self, host: &'static str) -> Self {
+ let registry_extension_url = StaticRegistry::to_extension_url(vec![host.parse().unwrap()]);
+
Self {
- registry: Some(ArcRegistry::new(StaticRegistry::new(vec![Url::from_url(
- host,
- )
- .unwrap()]))),
+ registry_extension_url: Some(registry_extension_url),
..self
}
}
@@ -101,14 +94,17 @@
}
pub fn build(mut self) -> ServiceMK {
- let registry = self.registry.take().expect("registry must not be empty");
+ let registry = self
+ .registry_extension_url
+ .take()
+ .expect("registry must not be empty");
let mk_service = ServiceBuilder::new()
.layer(NewCluster::layer())
.layer(NewLoadBalancer::layer())
.layer(NewRoutes::layer())
.layer(NewCachedDirectory::layer())
- .service(registry);
+ .service(MkRegistryService::new(registry));
Arc::new(mk_service)
}
diff --git a/dubbo/src/triple/server/builder.rs b/dubbo/src/triple/server/builder.rs
index 9ef7152..b473dd8 100644
--- a/dubbo/src/triple/server/builder.rs
+++ b/dubbo/src/triple/server/builder.rs
@@ -21,7 +21,7 @@
str::FromStr,
};
-use dubbo_base::Url;
+use dubbo_base::{registry_param::InterfaceName, url::UrlParam, Url};
use dubbo_logger::tracing;
use http::{Request, Response, Uri};
use hyper::body::Body;
@@ -132,7 +132,7 @@
impl From<Url> for ServerBuilder {
fn from(u: Url) -> Self {
- let uri = match http::Uri::from_str(&u.raw_url_string()) {
+ let uri = match http::Uri::from_str(&u.as_str()) {
Ok(v) => v,
Err(err) => {
tracing::error!("http uri parse error: {}, url: {:?}", err, &u);
@@ -142,10 +142,14 @@
let authority = uri.authority().unwrap();
+ let service_name = u.query::<InterfaceName>().unwrap().value();
+
Self {
- listener: u.get_param("listener").unwrap_or("tcp".to_string()),
+ listener: u
+ .query_param_by_key("listener")
+ .unwrap_or("tcp".to_string()),
addr: authority.to_string().to_socket_addrs().unwrap().next(),
- service_names: vec![u.service_name],
+ service_names: vec![service_name],
server: DubboServer::default(),
certs: Vec::new(),
keys: Vec::new(),
diff --git a/dubbo/src/triple/transport/connection.rs b/dubbo/src/triple/transport/connection.rs
index cb0b9d7..212c880 100644
--- a/dubbo/src/triple/transport/connection.rs
+++ b/dubbo/src/triple/transport/connection.rs
@@ -15,12 +15,11 @@
* limitations under the License.
*/
+use dubbo_base::StdError;
use hyper::client::{conn::Builder, service::Connect};
use tower_service::Service;
-use crate::{
- boxed, invoker::clone_body::CloneBody, triple::transport::connector::get_connector, StdError,
-};
+use crate::{boxed, invoker::clone_body::CloneBody, triple::transport::connector::get_connector};
type HyperConnect = Connect<
crate::utils::boxed_clone::BoxCloneService<http::Uri, super::io::BoxIO, StdError>,
diff --git a/examples/echo/src/echo/client.rs b/examples/echo/src/echo/client.rs
index 0a2f150..18939a2 100644
--- a/examples/echo/src/echo/client.rs
+++ b/examples/echo/src/echo/client.rs
@@ -34,9 +34,10 @@
// let builder = ClientBuilder::new()
// .with_connector("unix")
// .with_host("unix://127.0.0.1:8888");
- let builder = ClientBuilder::from_static(&"http://127.0.0.1:8888")
- .with_timeout(1000000)
- .with_direct(true);
+ let builder =
+ ClientBuilder::from_static(&"http://127.0.0.1:8888?interface=grpc.examples.echo.Echo")
+ .with_timeout(1000000)
+ .with_direct(true);
let mut cli = EchoClient::new(builder);
// let mut unary_cli = cli.clone().with_filter(FakeFilter {});
// let mut cli = EchoClient::build(ClientBuilder::from_static("http://127.0.0.1:8888"));
diff --git a/examples/echo/src/echo/server.rs b/examples/echo/src/echo/server.rs
index 90efc1a..010821e 100644
--- a/examples/echo/src/echo/server.rs
+++ b/examples/echo/src/echo/server.rs
@@ -70,7 +70,8 @@
// Dubbo::new()
// .with_config({
// let mut r = RootConfig::new();
- // r.test_config();
+ // r.test_config
+ // ();
// r
// })
// .start()
diff --git a/examples/greeter/src/greeter/client.rs b/examples/greeter/src/greeter/client.rs
index cb92ed0..14743ae 100644
--- a/examples/greeter/src/greeter/client.rs
+++ b/examples/greeter/src/greeter/client.rs
@@ -20,11 +20,9 @@
include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs"));
}
-use std::env;
+use dubbo::codegen::*;
-use dubbo::{codegen::*, registry::n_registry::ArcRegistry};
-
-use dubbo_base::Url;
+use dubbo::extension;
use futures_util::StreamExt;
use protos::{greeter_client::GreeterClient, GreeterRequest};
use registry_nacos::NacosRegistry;
@@ -33,9 +31,9 @@
async fn main() {
dubbo_logger::init();
- let builder = ClientBuilder::new().with_registry(ArcRegistry::new(NacosRegistry::new(
- Url::from_url("nacos://127.0.0.1:8848").unwrap(),
- )));
+ let _ = extension::EXTENSIONS.register::<NacosRegistry>().await;
+
+ let builder = ClientBuilder::new().with_registry("nacos://127.0.0.1:8848".parse().unwrap());
let mut cli = GreeterClient::new(builder);
diff --git a/examples/greeter/src/greeter/server.rs b/examples/greeter/src/greeter/server.rs
index fd436e5..a652ed8 100644
--- a/examples/greeter/src/greeter/server.rs
+++ b/examples/greeter/src/greeter/server.rs
@@ -18,13 +18,12 @@
use std::{io::ErrorKind, pin::Pin};
use async_trait::async_trait;
-use dubbo_base::Url;
use futures_util::{Stream, StreamExt};
use registry_nacos::NacosRegistry;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
-use dubbo::{codegen::*, registry::n_registry::ArcRegistry, Dubbo};
+use dubbo::{codegen::*, extension, Dubbo};
use dubbo_config::RootConfig;
use dubbo_logger::{
tracing::{info, span},
@@ -34,8 +33,6 @@
greeter_server::{register_server, Greeter},
GreeterReply, GreeterRequest,
};
-use registry_zookeeper::ZookeeperRegistry;
-
pub mod protos {
#![allow(non_camel_case_types)]
include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs"));
@@ -59,10 +56,10 @@
Err(_err) => panic!("err: {:?}", _err), // response was droped
};
- let nacos_registry = NacosRegistry::new(Url::from_url("nacos://127.0.0.1:8848").unwrap());
+ let _ = extension::EXTENSIONS.register::<NacosRegistry>().await;
let mut f = Dubbo::new()
.with_config(r)
- .add_registry("nacos-registry", ArcRegistry::new(nacos_registry));
+ .add_registry("nacos://127.0.0.1:8848/");
f.start().await;
}
diff --git a/protocol/base/src/invoker.rs b/protocol/base/src/invoker.rs
index 14de6ce..d676fa4 100644
--- a/protocol/base/src/invoker.rs
+++ b/protocol/base/src/invoker.rs
@@ -66,9 +66,9 @@
impl Display for BaseInvoker {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Invoker")
- .field("protocol", &self.url.scheme)
- .field("host", &self.url.ip)
- .field("path", &self.url.location)
+ .field("protocol", &self.url.protocol())
+ .field("host", &self.url.host())
+ .field("path", &self.url.path())
.finish()
}
}
diff --git a/registry/nacos/src/lib.rs b/registry/nacos/src/lib.rs
index ad34237..204846b 100644
--- a/registry/nacos/src/lib.rs
+++ b/registry/nacos/src/lib.rs
@@ -14,84 +14,47 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-mod utils;
use async_trait::async_trait;
-use dubbo_base::Url;
+use dubbo_base::{StdError, Url};
use std::{collections::HashMap, sync::Arc};
-use tokio::{select, sync::mpsc};
+use tokio::sync::mpsc;
-use anyhow::anyhow;
-use dubbo::{
- registry::n_registry::{DiscoverStream, Registry, ServiceChange},
- StdError,
+use dubbo::extension::{
+ registry_extension::{DiscoverStream, Registry, ServiceChange},
+ Extension,
};
-use dubbo_logger::tracing::{debug, error, info};
-use nacos_sdk::api::naming::{
- NamingEventListener, NamingService, NamingServiceBuilder, ServiceInstance,
+use dubbo_base::{
+ registry_param::{
+ AppName, Category, Group, InterfaceName, RegistryUrl, ServiceNamespace, Version,
+ },
+ url::UrlParam,
};
-
-use crate::utils::{build_nacos_client_props, is_concrete_str, is_wildcard_str, match_range};
-
-const VERSION_KEY: &str = "version";
-
-const GROUP_KEY: &str = "group";
-
-const DEFAULT_GROUP: &str = "DEFAULT_GROUP";
-
-const PROVIDER_SIDE: &str = "provider";
-
-const DEFAULT_CATEGORY: &str = PROVIDERS_CATEGORY;
-
-const SIDE_KEY: &str = "side";
-
-const REGISTER_CONSUMER_URL_KEY: &str = "register-consumer-url";
-
-const SERVICE_NAME_SEPARATOR: &str = ":";
-
-const CATEGORY_KEY: &str = "category";
-
-const PROVIDERS_CATEGORY: &str = "providers";
-
-#[allow(dead_code)]
-const ADMIN_PROTOCOL: &str = "admin";
-
-#[allow(dead_code)]
-const INNERCLASS_SYMBOL: &str = "$";
-
-#[allow(dead_code)]
-const INNERCLASS_COMPATIBLE_SYMBOL: &str = "___";
+use dubbo_logger::tracing::info;
+use nacos_sdk::api::{
+ naming::{NamingEventListener, NamingService, NamingServiceBuilder, ServiceInstance},
+ props::ClientProps,
+};
+use tokio::sync::{watch, Notify};
pub struct NacosRegistry {
- nacos_naming_service: Arc<dyn NamingService + Sync + Send + 'static>,
+ url: Url,
+ nacos_service: Arc<dyn NamingService + Send + Sync>,
}
impl NacosRegistry {
- pub fn new(url: Url) -> Self {
- let (nacos_client_props, enable_auth) = build_nacos_client_props(&url);
-
- let mut nacos_naming_builder = NamingServiceBuilder::new(nacos_client_props);
-
- if enable_auth {
- nacos_naming_builder = nacos_naming_builder.enable_auth_plugin_http();
- }
-
- let nacos_naming_service = nacos_naming_builder.build().unwrap();
-
- Self {
- nacos_naming_service: Arc::new(nacos_naming_service),
- }
+ pub fn new(url: Url, nacos_service: Arc<dyn NamingService + Send + Sync>) -> Self {
+ Self { url, nacos_service }
}
-}
-impl NacosRegistry {
- fn create_nacos_service_instance(url: Url) -> ServiceInstance {
- let ip = url.ip;
- let port = url.port;
- nacos_sdk::api::naming::ServiceInstance {
- ip,
- port: port.parse().unwrap(),
- metadata: url.params,
+ fn create_nacos_service_instance(url: &Url) -> ServiceInstance {
+ let ip = url.host().unwrap();
+ let port = url.port().unwrap();
+
+ ServiceInstance {
+ ip: ip.to_string(),
+ port: port.into(),
+ metadata: url.all_query_params(),
..Default::default()
}
}
@@ -144,296 +107,286 @@
#[async_trait]
impl Registry for NacosRegistry {
- async fn register(&self, url: Url) -> Result<(), dubbo::StdError> {
- // let side = url.get_param(SIDE_KEY).unwrap_or_default();
- // let register_consumer = url
- // .get_param(REGISTER_CONSUMER_URL_KEY)
- // .unwrap_or_else(|| false.to_string())
- // .parse::<bool>()
- // .unwrap_or(false);
- // if side.ne(PROVIDER_SIDE) && !register_consumer {
- // warn!("Please set 'dubbo.registry.parameters.register-consumer-url=true' to turn on consumer url registration.");
- // return Ok(());
- // }
+ async fn register(&self, url: Url) -> Result<(), StdError> {
+ let service_name = NacosServiceName::new(&url);
- let nacos_service_name = NacosServiceName::new(&url);
+ let group_name = service_name.group();
- let group_name = Some(
- nacos_service_name
- .get_group_with_default(DEFAULT_GROUP)
- .to_string(),
- );
- let nacos_service_name = nacos_service_name.to_register_str();
+ let registry_service_name_str = service_name.value();
- let nacos_service_instance = Self::create_nacos_service_instance(url);
+ let service_instance = Self::create_nacos_service_instance(&url);
- info!("register service: {}", nacos_service_name);
- let ret = self
- .nacos_naming_service
- .register_instance(nacos_service_name, group_name, nacos_service_instance)
- .await;
- if let Err(e) = ret {
- error!("register to nacos occur an error: {:?}", e);
- return Err(anyhow!("register to nacos occur an error: {:?}", e).into());
- }
+ self.nacos_service
+ .register_instance(
+ registry_service_name_str.to_owned(),
+ Some(group_name.to_owned()),
+ service_instance,
+ )
+ .await?;
Ok(())
}
- async fn unregister(&self, url: Url) -> Result<(), dubbo::StdError> {
- let nacos_service_name = NacosServiceName::new(&url);
+ async fn unregister(&self, url: Url) -> Result<(), StdError> {
+ let service_name = NacosServiceName::new(&url);
- let group_name = Some(
- nacos_service_name
- .get_group_with_default(DEFAULT_GROUP)
- .to_string(),
- );
- let nacos_service_name = nacos_service_name.to_register_str();
+ let group_name = service_name.group();
- let nacos_service_instance = Self::create_nacos_service_instance(url);
+ let registry_service_name_str = service_name.value();
- info!("deregister service: {}", nacos_service_name);
+ let service_instance = Self::create_nacos_service_instance(&url);
- let ret = self
- .nacos_naming_service
- .deregister_instance(nacos_service_name, group_name, nacos_service_instance)
- .await;
- if let Err(e) = ret {
- error!("deregister service from nacos occur an error: {:?}", e);
- return Err(anyhow!("deregister service from nacos occur an error: {:?}", e).into());
- }
+ self.nacos_service
+ .deregister_instance(
+ registry_service_name_str.to_owned(),
+ Some(group_name.to_owned()),
+ service_instance,
+ )
+ .await?;
+
Ok(())
}
async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
let service_name = NacosServiceName::new(&url);
- let service_group = service_name
- .get_group_with_default(DEFAULT_GROUP)
- .to_string();
- let subscriber_url = service_name.to_subscriber_str();
- info!("subscribe: {}", subscriber_url);
- let (listener, mut change_receiver) = ServiceChangeListener::new();
- let arc_listener = Arc::new(listener);
+ let group_name = service_name.group().to_owned();
- let (discover_tx, discover_rx) = mpsc::channel(64);
-
- let nacos_naming_service = self.nacos_naming_service.clone();
-
- let listener_in_task = arc_listener.clone();
- let service_group_in_task = service_group.clone();
- let subscriber_url_in_task = subscriber_url.clone();
- tokio::spawn(async move {
- let listener = listener_in_task;
- let service_group = service_group_in_task;
- let subscriber_url = subscriber_url_in_task;
-
- let mut current_instances = Vec::new();
- loop {
- let change = select! {
- _ = discover_tx.closed() => {
- debug!("service {} change task quit, unsubscribe.", subscriber_url);
- None
- },
- change = change_receiver.recv() => change
- };
-
- match change {
- Some(instances) => {
- debug!("service {} changed", subscriber_url);
- let (remove_instances, add_instances) =
- NacosRegistry::diff(¤t_instances, &instances);
-
- for instance in remove_instances {
- let service_name = instance.service_name.as_ref();
- let url = match service_name {
- None => {
- format!("triple://{}:{}", instance.ip(), instance.port())
- }
- Some(service_name) => {
- format!(
- "triple://{}:{}/{}",
- instance.ip(),
- instance.port(),
- service_name
- )
- }
- };
-
- match discover_tx.send(Ok(ServiceChange::Remove(url))).await {
- Ok(_) => {}
- Err(e) => {
- error!(
- "send service change failed: {:?}, maybe user unsubscribe",
- e
- );
- break;
- }
- }
- }
-
- for instance in add_instances {
- let service_name = instance.service_name.as_ref();
- let url = match service_name {
- None => {
- format!("triple://{}:{}", instance.ip(), instance.port())
- }
- Some(service_name) => {
- format!(
- "triple://{}:{}/{}",
- instance.ip(),
- instance.port(),
- service_name
- )
- }
- };
-
- match discover_tx.send(Ok(ServiceChange::Insert(url, ()))).await {
- Ok(_) => {}
- Err(e) => {
- error!(
- "send service change failed: {:?}, maybe user unsubscribe",
- e
- );
- break;
- }
- }
- }
- current_instances = instances;
- }
- None => {
- error!(
- "receive service change task quit, unsubscribe {}.",
- subscriber_url
- );
- break;
- }
- }
- }
-
- debug!("unsubscribe service: {}", subscriber_url);
- // unsubscribe
- let unsubscribe = nacos_naming_service
- .unsubscribe(subscriber_url, Some(service_group), Vec::new(), listener)
- .await;
-
- match unsubscribe {
- Ok(_) => {}
- Err(e) => {
- error!("unsubscribe service failed: {:?}", e);
- }
- }
- });
+ let registry_service_name_str = service_name.value().to_owned();
let all_instance = self
- .nacos_naming_service
+ .nacos_service
.get_all_instances(
- subscriber_url.clone(),
- Some(service_group.clone()),
- Vec::new(),
+ registry_service_name_str.to_owned(),
+ Some(group_name.to_owned()),
+ Vec::default(),
false,
)
.await?;
- let _ = arc_listener.changed(all_instance);
- match self
- .nacos_naming_service
- .subscribe(
- subscriber_url.clone(),
- Some(service_group.clone()),
- Vec::new(),
- arc_listener,
- )
- .await
- {
- Ok(_) => {}
- Err(e) => {
- error!("subscribe service failed: {:?}", e);
- return Err(anyhow!("subscribe service failed: {:?}", e).into());
+ let (tx, rx) = mpsc::channel(1024);
+
+ let (event_listener, mut listener_change_rx, closed) = NacosNamingEventListener::new();
+ let event_listener = Arc::new(event_listener);
+
+ let nacos_service_cloned = self.nacos_service.clone();
+ let event_listener_cloned = event_listener.clone();
+ let registry_service_name_str_clone = registry_service_name_str.clone();
+ let group_name_clone = group_name.clone();
+ tokio::spawn(async move {
+ let mut current_instances = all_instance;
+ for instance in ¤t_instances {
+ let url = instance_to_url(instance).as_str().to_owned();
+ let _ = tx.send(Ok(ServiceChange::Insert(url, ()))).await;
}
- }
- Ok(discover_rx)
+ loop {
+ let change = tokio::select! {
+ _ = closed.notified() => {
+ break;
+ },
+ change = listener_change_rx.changed() => change
+ };
+
+ if change.is_err() {
+ break;
+ }
+
+ let change = listener_change_rx.borrow_and_update().clone();
+ if change.is_empty() {
+ continue;
+ }
+ let (remove_instances, add_instances) = Self::diff(¤t_instances, &change);
+
+ for remove_instance in remove_instances {
+ let url = instance_to_url(remove_instance).as_str().to_owned();
+ let Ok(_) = tx.send(Ok(ServiceChange::Remove(url))).await else {
+ break;
+ };
+ }
+
+ for add_instance in add_instances {
+ let url = instance_to_url(add_instance).as_str().to_owned();
+ let Ok(_) = tx.send(Ok(ServiceChange::Insert(url, ()))).await else {
+ break;
+ };
+ }
+
+ current_instances = change;
+ }
+
+ info!("unsubscribe");
+ let _ = nacos_service_cloned
+ .unsubscribe(
+ registry_service_name_str_clone,
+ Some(group_name_clone),
+ Vec::default(),
+ event_listener_cloned,
+ )
+ .await;
+ });
+
+ let _ = self
+ .nacos_service
+ .subscribe(
+ registry_service_name_str,
+ Some(group_name),
+ Vec::default(),
+ event_listener,
+ )
+ .await?;
+
+ Ok(rx)
}
- async fn unsubscribe(&self, url: Url) -> Result<(), dubbo::StdError> {
- let service_name = NacosServiceName::new(&url);
- let subscriber_url = service_name.to_subscriber_str();
- info!("unsubscribe: {}", &subscriber_url);
-
+ async fn unsubscribe(&self, _: Url) -> Result<(), StdError> {
Ok(())
}
+
+ fn url(&self) -> &Url {
+ &self.url
+ }
+}
+
+#[async_trait]
+impl Extension for NacosRegistry {
+ type Target = Box<dyn Registry + Send + 'static>;
+
+ fn name() -> String {
+ "nacos".to_string()
+ }
+
+ async fn create(url: &Url) -> Result<Self::Target, StdError> {
+ // url example:
+ // extension://0.0.0.0?extension-type=registry&extension-name=nacos®istry=nacos://127.0.0.1:8848
+ let registry_url = url.query::<RegistryUrl>().unwrap();
+ let registry_url = registry_url.value();
+
+ let host = registry_url.host().unwrap();
+ let port = registry_url.port().unwrap_or(8848);
+
+ let nacos_server_addr = format!("{}:{}", host, port);
+
+ let namespace = registry_url.query::<ServiceNamespace>().unwrap_or_default();
+ let namespace = namespace.value();
+
+ let app_name = registry_url.query::<AppName>().unwrap_or_default();
+ let app_name = app_name.value();
+
+ let user_name = registry_url.username();
+ let password = registry_url.password().unwrap_or_default();
+
+ let nacos_client_props = ClientProps::new()
+ .server_addr(nacos_server_addr)
+ .namespace(namespace)
+ .app_name(app_name)
+ .auth_username(user_name)
+ .auth_password(password);
+
+ let mut nacos_naming_builder = NamingServiceBuilder::new(nacos_client_props);
+
+ if !user_name.is_empty() {
+ nacos_naming_builder = nacos_naming_builder.enable_auth_plugin_http();
+ }
+
+ let nacos_naming_service = nacos_naming_builder.build().unwrap();
+
+ let nacos_registry = NacosRegistry::new(registry_url, Arc::new(nacos_naming_service));
+
+ Ok(Box::new(nacos_registry))
+ }
+}
+
+fn instance_to_url(instance: &ServiceInstance) -> Url {
+ let mut url = Url::empty();
+ url.set_protocol("provider");
+ url.set_host(instance.ip());
+ url.set_port(instance.port().try_into().unwrap_or_default());
+ url.extend_pairs(
+ instance
+ .metadata()
+ .iter()
+ .map(|(k, v)| (k.clone(), v.clone())),
+ );
+
+ url
+}
+
+struct NacosNamingEventListener {
+ tx: watch::Sender<Vec<ServiceInstance>>,
+ closed: Arc<Notify>,
+}
+
+impl NacosNamingEventListener {
+ fn new() -> (Self, watch::Receiver<Vec<ServiceInstance>>, Arc<Notify>) {
+ let (tx, rx) = watch::channel(Vec::new());
+
+ let closed = Arc::new(Notify::new());
+ let this = Self {
+ tx,
+ closed: closed.clone(),
+ };
+ (this, rx, closed)
+ }
+}
+
+impl NamingEventListener for NacosNamingEventListener {
+ fn event(&self, event: Arc<nacos_sdk::api::naming::NamingChangeEvent>) {
+ match event.instances {
+ Some(ref instances) => {
+ let instances = instances.clone();
+ let send = self.tx.send(instances);
+ match send {
+ Ok(_) => {}
+ Err(_) => {
+ self.closed.notify_waiters();
+ }
+ }
+ }
+ None => {}
+ }
+ }
}
struct NacosServiceName {
+ #[allow(dead_code)]
category: String,
- service_interface: String,
+ #[allow(dead_code)]
+ interface: String,
+ #[allow(dead_code)]
version: String,
+ #[allow(dead_code)]
group: String,
+
+ #[allow(dead_code)]
+ value: String,
}
impl NacosServiceName {
- fn new(url: &Url) -> NacosServiceName {
- let service_interface = url.get_service_name();
+ fn new(url: &Url) -> Self {
+ let interface = url.query::<InterfaceName>().unwrap();
+ let interface = interface.value();
- let category = url.get_param(CATEGORY_KEY).unwrap_or_default();
+ let category = url.query::<Category>().unwrap_or_default();
+ let category = category.value();
- let version = url.get_param(VERSION_KEY).unwrap_or_default();
+ let version = url.query::<Version>().unwrap_or_default();
+ let version = version.value();
- let group = url.get_param(GROUP_KEY).unwrap_or_default();
+ let group = url.query::<Group>().unwrap_or_default();
+ let group = group.value();
+
+ let value = format!("{}:{}:{}:{}", category, interface, version, group);
Self {
category,
- service_interface: service_interface.clone(),
+ interface,
version,
group,
- }
- }
-
- #[allow(dead_code)]
- fn from_service_name_str(service_name_str: &str) -> Self {
- let mut splitter = service_name_str.split(SERVICE_NAME_SEPARATOR);
-
- let category = splitter.next().unwrap_or_default().to_string();
- let service_interface = splitter.next().unwrap_or_default().to_string();
- let version = splitter.next().unwrap_or_default().to_string();
- let group = splitter.next().unwrap_or_default().to_string();
-
- Self {
- category,
- service_interface,
- version,
- group,
- }
- }
-
- #[allow(dead_code)]
- fn version(&self) -> &str {
- &self.version
- }
-
- #[allow(dead_code)]
- fn get_version_with_default<'a>(&'a self, default: &'a str) -> &str {
- if self.version.is_empty() {
- default
- } else {
- &self.version
- }
- }
-
- #[allow(dead_code)]
- fn group(&self) -> &str {
- &self.group
- }
-
- fn get_group_with_default<'a>(&'a self, default: &'a str) -> &str {
- if self.group.is_empty() {
- default
- } else {
- &self.group
+ value,
}
}
@@ -443,148 +396,23 @@
}
#[allow(dead_code)]
- fn get_category_with_default<'a>(&'a self, default: &'a str) -> &str {
- if self.category.is_empty() {
- default
- } else {
- &self.category
- }
+ fn interface(&self) -> &str {
+ &self.interface
}
#[allow(dead_code)]
- fn service_interface(&self) -> &str {
- &self.service_interface
+ fn version(&self) -> &str {
+ &self.version
}
#[allow(dead_code)]
- fn get_service_interface_with_default<'a>(&'a self, default: &'a str) -> &str {
- if self.service_interface.is_empty() {
- default
- } else {
- &self.service_interface
- }
- }
-
- fn to_register_str(&self) -> String {
- let category = if self.category.is_empty() {
- DEFAULT_CATEGORY
- } else {
- &self.category
- };
- format!(
- "{}:{}:{}:{}",
- category, self.service_interface, self.version, self.group
- )
- }
-
- fn to_subscriber_str(&self) -> String {
- let category = if is_concrete_str(&self.service_interface) {
- DEFAULT_CATEGORY
- } else {
- &self.category
- };
-
- format!(
- "{}:{}:{}:{}",
- category, self.service_interface, self.version, self.group
- )
+ fn group(&self) -> &str {
+ &self.group
}
#[allow(dead_code)]
- fn to_subscriber_legacy_string(&self) -> String {
- let mut legacy_string = DEFAULT_CATEGORY.to_owned();
- if !self.service_interface.is_empty() {
- legacy_string.push_str(SERVICE_NAME_SEPARATOR);
- legacy_string.push_str(&self.service_interface);
- }
-
- if !self.version.is_empty() {
- legacy_string.push_str(SERVICE_NAME_SEPARATOR);
- legacy_string.push_str(&self.version);
- }
-
- if !self.group.is_empty() {
- legacy_string.push_str(SERVICE_NAME_SEPARATOR);
- legacy_string.push_str(&self.group);
- }
-
- legacy_string
- }
-
- #[allow(dead_code)]
- fn is_concrete(&self) -> bool {
- is_concrete_str(&self.service_interface)
- && is_concrete_str(&self.version)
- && is_concrete_str(&self.group)
- }
-
- #[allow(dead_code)]
- fn is_compatible(&self, other: &NacosServiceName) -> bool {
- if !other.is_concrete() {
- return false;
- }
-
- if !self.category.eq(&other.category) && !match_range(&self.category, &other.category) {
- return false;
- }
-
- if is_wildcard_str(&self.version) {
- return true;
- }
-
- if is_wildcard_str(&self.group) {
- return true;
- }
-
- if !&self.version.eq(&other.version) && !match_range(&self.version, &other.version) {
- return false;
- }
-
- if !self.group.eq(&other.group) && !match_range(&self.group, &other.group) {
- return false;
- }
-
- true
- }
-}
-
-struct ServiceChangeListener {
- tx: mpsc::Sender<Vec<ServiceInstance>>,
-}
-
-impl ServiceChangeListener {
- pub fn new() -> (Self, mpsc::Receiver<Vec<ServiceInstance>>) {
- let (tx, rx) = mpsc::channel(64);
- let this = Self { tx };
-
- (this, rx)
- }
-
- pub fn changed(&self, instances: Vec<ServiceInstance>) -> Result<(), dubbo::StdError> {
- match self.tx.try_send(instances) {
- Ok(_) => Ok(()),
- Err(e) => {
- error!("send service change failed: {:?}", e);
- Err(anyhow!("send service change failed: {:?}", e).into())
- }
- }
- }
-}
-
-impl NamingEventListener for ServiceChangeListener {
- fn event(&self, event: Arc<nacos_sdk::api::naming::NamingChangeEvent>) {
- debug!("service change {}", event.service_name.clone());
- debug!("nacos event: {:?}", event);
-
- let instances = event.instances.as_ref();
- match instances {
- None => {
- let _ = self.changed(Vec::default());
- }
- Some(instances) => {
- let _ = self.changed(instances.clone());
- }
- }
+ fn value(&self) -> &str {
+ &self.value
}
}
@@ -593,7 +421,9 @@
use core::time;
use std::thread;
+ use tracing::error;
+ use dubbo_base::{extension_param::ExtensionName, registry_param::Side};
use tracing::metadata::LevelFilter;
use super::*;
@@ -610,13 +440,17 @@
.with_max_level(LevelFilter::DEBUG)
.init();
- let nacos_registry_url = Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
- let registry = NacosRegistry::new(nacos_registry_url);
+ let mut extension_url: Url = "extension://0.0.0.0?extension-type=registry"
+ .parse()
+ .unwrap();
+ extension_url.add_query_param(ExtensionName::new("nacos".to_string()));
+ extension_url.add_query_param(RegistryUrl::new("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015".parse().unwrap()));
- let mut service_url = Url::from_url("tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
- service_url
- .params
- .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
+ let registry = NacosRegistry::create(&extension_url).await.unwrap();
+
+ let mut service_url: Url = "tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807".parse().unwrap();
+
+ service_url.add_query_param(Side::Provider);
let ret = registry.register(service_url).await;
@@ -638,13 +472,17 @@
.with_max_level(LevelFilter::DEBUG)
.init();
- let nacos_registry_url = Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
- let registry = NacosRegistry::new(nacos_registry_url);
+ let mut extension_url: Url = "extension://0.0.0.0?extension-type=registry"
+ .parse()
+ .unwrap();
+ extension_url.add_query_param(ExtensionName::new("nacos".to_string()));
+ extension_url.add_query_param(RegistryUrl::new("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015".parse().unwrap()));
- let mut service_url = Url::from_url("tri://127.0.0.1:9090/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
- service_url
- .params
- .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
+ let registry = NacosRegistry::create(&extension_url).await.unwrap();
+
+ let mut service_url: Url = "tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807".parse().unwrap();
+
+ service_url.add_query_param(Side::Provider);
let ret = registry.register(service_url).await;
@@ -653,7 +491,7 @@
let sleep_millis = time::Duration::from_secs(10);
thread::sleep(sleep_millis);
- let unregister_url = Url::from_url("tri://127.0.0.1:9090/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
+ let unregister_url = "tri://127.0.0.1:9090/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807".parse().unwrap();
let ret = registry.unregister(unregister_url).await;
info!("deregister result: {:?}", ret);
@@ -674,19 +512,23 @@
.with_max_level(LevelFilter::DEBUG)
.init();
- let nacos_registry_url = Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
- let registry = NacosRegistry::new(nacos_registry_url);
+ let mut extension_url: Url = "extension://0.0.0.0?extension-type=registry"
+ .parse()
+ .unwrap();
+ extension_url.add_query_param(ExtensionName::new("nacos".to_string()));
+ extension_url.add_query_param(RegistryUrl::new("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015".parse().unwrap()));
- let mut service_url = Url::from_url("tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
- service_url
- .params
- .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
+ let registry = NacosRegistry::create(&extension_url).await.unwrap();
+
+ let mut service_url: Url = "tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807".parse().unwrap();
+
+ service_url.add_query_param(Side::Provider);
let ret = registry.register(service_url).await;
info!("register result: {:?}", ret);
- let subscribe_url = Url::from_url("consumer://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap();
+ let subscribe_url = "consumer://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider".parse().unwrap();
let subscribe_ret = registry.subscribe(subscribe_url).await;
if let Err(e) = subscribe_ret {
@@ -714,19 +556,23 @@
.with_max_level(LevelFilter::DEBUG)
.init();
- let nacos_registry_url = Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
- let registry = NacosRegistry::new(nacos_registry_url);
+ let mut extension_url: Url = "extension://0.0.0.0?extension-type=registry"
+ .parse()
+ .unwrap();
+ extension_url.add_query_param(ExtensionName::new("nacos".to_string()));
+ extension_url.add_query_param(RegistryUrl::new("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015".parse().unwrap()));
- let mut service_url = Url::from_url("tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
- service_url
- .params
- .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
+ let registry = NacosRegistry::create(&extension_url).await.unwrap();
+
+ let mut service_url: Url = "tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807".parse().unwrap();
+
+ service_url.add_query_param(Side::Provider);
let ret = registry.register(service_url).await;
info!("register result: {:?}", ret);
- let subscribe_url = Url::from_url("provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap();
+ let subscribe_url = "provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider".parse().unwrap();
let ret = registry.subscribe(subscribe_url).await;
@@ -742,13 +588,7 @@
let sleep_millis = time::Duration::from_secs(40);
thread::sleep(sleep_millis);
- let unsubscribe_url = Url::from_url("provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap();
- let ret = registry.unsubscribe(unsubscribe_url).await;
-
- if let Err(e) = ret {
- error!("error message: {:?}", e);
- return;
- }
+ drop(rx);
let sleep_millis = time::Duration::from_secs(40);
thread::sleep(sleep_millis);
diff --git a/registry/nacos/src/utils/mod.rs b/registry/nacos/src/utils/mod.rs
deleted file mode 100644
index b247f60..0000000
--- a/registry/nacos/src/utils/mod.rs
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-use dubbo_base::Url;
-use nacos_sdk::api::props::ClientProps;
-
-const APP_NAME_KEY: &str = "AppName";
-
-const UNKNOWN_APP: &str = "UnknownApp";
-
-const NAMESPACE_KEY: &str = "namespace";
-
-const DEFAULT_NAMESPACE: &str = "public";
-
-const USERNAME_KEY: &str = "username";
-
-const PASSWORD_KEY: &str = "password";
-
-const BACKUP_KEY: &str = "backup";
-
-const WILDCARD: &str = "*";
-
-const RANGE_STR_SEPARATOR: &str = ",";
-
-pub(crate) fn build_nacos_client_props(url: &Url) -> (nacos_sdk::api::props::ClientProps, bool) {
- let host = &url.ip;
- let port = &url.port;
- let backup = url
- .get_param(BACKUP_KEY)
- .map(|mut data| {
- data.insert(0, ',');
- data
- })
- .unwrap_or_default();
- let server_addr = format!("{}:{}{}", host, port, backup);
-
- let namespace = url
- .get_param(NAMESPACE_KEY)
- .unwrap_or_else(|| DEFAULT_NAMESPACE.to_string());
- let app_name = url
- .get_param(APP_NAME_KEY)
- .unwrap_or_else(|| UNKNOWN_APP.to_string());
- let username = url.get_param(USERNAME_KEY).unwrap_or_default();
- let password = url.get_param(PASSWORD_KEY).unwrap_or_default();
-
- let enable_auth = !password.is_empty() && !username.is_empty();
-
- // todo ext parameters
-
- let mut client_props = ClientProps::new();
-
- client_props = client_props
- .server_addr(server_addr)
- .namespace(namespace)
- .app_name(app_name)
- .auth_username(username)
- .auth_password(password);
-
- (client_props, enable_auth)
-}
-
-pub(crate) fn is_wildcard_str(str: &str) -> bool {
- str.eq(WILDCARD)
-}
-
-pub(crate) fn is_range_str(str: &str) -> bool {
- let ret = str.split(RANGE_STR_SEPARATOR);
- let count = ret.count();
- count > 1
-}
-
-pub(crate) fn is_concrete_str(str: &str) -> bool {
- !is_wildcard_str(str) && !is_range_str(str)
-}
-
-pub(crate) fn match_range(range: &str, value: &str) -> bool {
- if range.is_empty() {
- return true;
- }
-
- if !is_range_str(range) {
- return false;
- }
-
- range
- .split(RANGE_STR_SEPARATOR)
- .any(|data| (*data).eq(value))
-}
diff --git a/registry/zookeeper/src/lib.rs b/registry/zookeeper/src/lib.rs
index f3733d5..dc44899 100644
--- a/registry/zookeeper/src/lib.rs
+++ b/registry/zookeeper/src/lib.rs
@@ -22,17 +22,15 @@
use async_trait::async_trait;
use dubbo_base::{
constants::{DUBBO_KEY, LOCALHOST_IP, PROVIDERS_KEY},
- Url,
+ StdError, Url,
};
use dubbo_logger::tracing::{debug, error, info};
use serde::{Deserialize, Serialize};
use tokio::{select, sync::mpsc};
use zookeeper::{Acl, CreateMode, WatchedEvent, WatchedEventType, Watcher, ZooKeeper};
-use dubbo::{
- registry::n_registry::{DiscoverStream, Registry, ServiceChange},
- StdError,
-};
+use dubbo::extension::registry_extension::{DiscoverStream, Registry, ServiceChange};
+use dubbo_base::{registry_param::InterfaceName, url::UrlParam};
// 从url中获取服务注册的元数据
// rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s)
@@ -184,24 +182,24 @@
) -> (Vec<String>, Vec<String>) {
let old_urls_map: HashMap<String, String> = old_urls
.iter()
- .map(|url| dubbo_base::Url::from_url(url.as_str()))
- .filter(|item| item.is_some())
+ .map(|url| url.parse())
+ .filter(|item| item.is_ok())
.map(|item| item.unwrap())
- .map(|item| {
- let ip_port = item.get_ip_port();
- let url = item.encoded_raw_url_string();
+ .map(|item: Url| {
+ let ip_port = item.authority().to_owned();
+ let url = item.as_str().to_owned();
(ip_port, url)
})
.collect();
let new_urls_map: HashMap<String, String> = new_urls
.iter()
- .map(|url| dubbo_base::Url::from_url(url.as_str()))
- .filter(|item| item.is_some())
+ .map(|url| url.parse())
+ .filter(|item| item.is_ok())
.map(|item| item.unwrap())
- .map(|item| {
- let ip_port = item.get_ip_port();
- let url = item.encoded_raw_url_string();
+ .map(|item: Url| {
+ let ip_port = item.authority().to_owned();
+ let url = item.as_str().to_owned();
(ip_port, url)
})
.collect();
@@ -263,24 +261,23 @@
impl Registry for ZookeeperRegistry {
async fn register(&self, url: Url) -> Result<(), StdError> {
debug!("register url: {}", url);
+ let interface_name = url.query::<InterfaceName>().unwrap().value();
+ let url_str = url.as_str();
let zk_path = format!(
"/{}/{}/{}/{}",
- DUBBO_KEY,
- url.service_name,
- PROVIDERS_KEY,
- url.encoded_raw_url_string()
+ DUBBO_KEY, interface_name, PROVIDERS_KEY, url_str
);
self.create_path_with_parent_check(zk_path.as_str(), LOCALHOST_IP, CreateMode::Ephemeral)?;
Ok(())
}
async fn unregister(&self, url: Url) -> Result<(), StdError> {
+ let interface_name = url.query::<InterfaceName>().unwrap().value();
+ let url_str = url.as_str();
+
let zk_path = format!(
"/{}/{}/{}/{}",
- DUBBO_KEY,
- url.service_name,
- PROVIDERS_KEY,
- url.encoded_raw_url_string()
+ DUBBO_KEY, interface_name, PROVIDERS_KEY, url_str
);
self.delete_path(zk_path.as_str());
Ok(())
@@ -288,8 +285,9 @@
// for consumer to find the changes of providers
async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
- let service_name = url.get_service_name();
- let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &service_name, PROVIDERS_KEY);
+ let interface_name = url.query::<InterfaceName>().unwrap().value();
+
+ let zk_path = format!("/{}/{}/{}", DUBBO_KEY, interface_name, PROVIDERS_KEY);
debug!("subscribe service: {}", zk_path);
@@ -302,12 +300,12 @@
let zk_client_in_task = self.zk_client.clone();
let zk_path_in_task = zk_path.clone();
- let service_name_in_task = service_name.clone();
+ let interface_name_in_task = interface_name.clone();
let arc_listener_in_task = arc_listener.clone();
tokio::spawn(async move {
let zk_client = zk_client_in_task;
let zk_path = zk_path_in_task;
- let service_name = service_name_in_task;
+ let interface_name = interface_name_in_task;
let listener = arc_listener_in_task;
let mut current_urls = Vec::new();
@@ -383,12 +381,17 @@
}
async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
- let service_name = url.get_service_name();
- let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &service_name, PROVIDERS_KEY);
+ let interface_name = url.query::<InterfaceName>().unwrap().value();
+
+ let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &interface_name, PROVIDERS_KEY);
info!("unsubscribe service: {}", zk_path);
Ok(())
}
+
+ fn url(&self) -> &Url {
+ todo!()
+ }
}
pub struct ZooKeeperListener {
diff --git a/remoting/base/src/exchange/client.rs b/remoting/base/src/exchange/client.rs
index edbb4a6..71c42d4 100644
--- a/remoting/base/src/exchange/client.rs
+++ b/remoting/base/src/exchange/client.rs
@@ -51,7 +51,7 @@
pub fn new(url: Url, client: BoxedClient, connection_timeout: Duration) -> Self {
ExchangeClient {
connection_timeout,
- address: url.get_ip_port(),
+ address: url.authority().to_owned(),
client: None,
init: AtomicBool::new(false),
active: AtomicI32::new(0),