Ftr: add new-arc related packages (#124)
diff --git a/Cargo.toml b/Cargo.toml
index c879e4d..ad66064 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,7 +2,7 @@
members = [
"common/logger",
"common/utils",
- "xds",
+ "common/extention",
"registry/zookeeper",
"registry/nacos",
"metadata",
@@ -11,7 +11,15 @@
"examples/echo",
"examples/greeter",
"dubbo-build",
- "remoting/net"
+ "remoting/net",
+ "remoting/http",
+ "remoting/h2",
+ "remoting/zookeeper",
+ "remoting/exchange",
+ "remoting/xds",
+ "protocol/dubbo2",
+ "protocol/protocol",
+ "protocol/triple"
]
@@ -31,6 +39,12 @@
urlencoding = "2.1.2"
logger = {path="./common/logger"}
utils = {path="./common/utils"}
+remoting-net = {path="./remoting/net"}
+protocol = {path="./protocol/protocol"}
+protocol-dubbo2 = {path="./protocol/dubbo2"}
+protocol-triple = {path="./protocol/triple"}
+registry-zookeeper = {path="./registry/zookeeper"}
+registry-nacos = {path="./registry/nacos"}
anyhow = "1.0.66"
dubbo = { path = "./dubbo/" }
bb8 = "0.8.0" # A connecton pool based on tokio
diff --git a/common/extention/Cargo.toml b/common/extention/Cargo.toml
new file mode 100644
index 0000000..9b1450c
--- /dev/null
+++ b/common/extention/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "extention"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
diff --git a/xds/LICENSE b/common/extention/LICENSE
similarity index 100%
copy from xds/LICENSE
copy to common/extention/LICENSE
diff --git a/xds/src/lib.rs b/common/extention/src/lib.rs
similarity index 87%
copy from xds/src/lib.rs
copy to common/extention/src/lib.rs
index 3e01853..d64452d 100644
--- a/xds/src/lib.rs
+++ b/common/extention/src/lib.rs
@@ -14,12 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+pub fn add(left: usize, right: usize) -> usize {
+ left + right
+}
#[cfg(test)]
mod tests {
+ use super::*;
+
#[test]
fn it_works() {
- let result = 2 + 2;
+ let result = add(2, 2);
assert_eq!(result, 4);
}
}
diff --git a/examples/echo/Cargo.toml b/examples/echo/Cargo.toml
index 93ba6d9..53f8b9a 100644
--- a/examples/echo/Cargo.toml
+++ b/examples/echo/Cargo.toml
@@ -28,13 +28,13 @@
prost = "0.10.4"
async-trait = "0.1.56"
tokio-stream = "0.1"
-logger = {path="../../common/logger"}
+logger.workspace=true
hyper = { version = "0.14.19", features = ["full"]}
dubbo = {path = "../../dubbo", version = "0.3.0" }
dubbo-config = {path = "../../config", version = "0.3.0" }
-dubbo-registry-zookeeper = {path = "../../registry/zookeeper", version = "0.3.0" }
+registry-zookeeper.workspace=true
[build-dependencies]
dubbo-build = {path = "../../dubbo-build", version = "0.3.0" }
diff --git a/examples/greeter/Cargo.toml b/examples/greeter/Cargo.toml
index 37bbbde..91cbefe 100644
--- a/examples/greeter/Cargo.toml
+++ b/examples/greeter/Cargo.toml
@@ -22,17 +22,17 @@
[dependencies]
http = "0.2"
http-body = "0.4.4"
-futures-util = {version = "0.3", default-features = false}
-tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", "macros", "net", "signal"] }
-prost-derive = {version = "0.10", optional = true}
+futures-util = { version = "0.3", default-features = false }
+tokio = { version = "1.0", features = ["rt-multi-thread", "time", "fs", "macros", "net", "signal"] }
+prost-derive = { version = "0.10", optional = true }
prost = "0.10.4"
async-trait = "0.1.56"
tokio-stream = "0.1"
-logger = {path="../../common/logger"}
-dubbo = {path = "../../dubbo", version = "0.3.0" }
-dubbo-config = {path = "../../config", version = "0.3.0" }
-dubbo-registry-zookeeper = {path = "../../registry/zookeeper", version = "0.3.0" }
-dubbo-registry-nacos = {path = "../../registry/nacos", version = "0.3.0" }
+logger = { path = "../../common/logger" }
+dubbo = { path = "../../dubbo", version = "0.3.0" }
+dubbo-config = { path = "../../config", version = "0.3.0" }
+registry-zookeeper.workspace = true
+registry-nacos.workspace = true
[build-dependencies]
-dubbo-build = {path = "../../dubbo-build", version = "0.3.0" }
+dubbo-build = { path = "../../dubbo-build", version = "0.3.0" }
diff --git a/examples/greeter/src/greeter/client.rs b/examples/greeter/src/greeter/client.rs
index 71d8f24..4b5437a 100644
--- a/examples/greeter/src/greeter/client.rs
+++ b/examples/greeter/src/greeter/client.rs
@@ -23,10 +23,11 @@
use std::env;
use dubbo::{codegen::*, common::url::Url};
-use dubbo_registry_nacos::nacos_registry::NacosRegistry;
-use dubbo_registry_zookeeper::zookeeper_registry::ZookeeperRegistry;
+
use futures_util::StreamExt;
use protos::{greeter_client::GreeterClient, GreeterRequest};
+use registry_nacos::NacosRegistry;
+use registry_zookeeper::ZookeeperRegistry;
#[tokio::main]
async fn main() {
diff --git a/examples/greeter/src/greeter/server.rs b/examples/greeter/src/greeter/server.rs
index a275d6d..32931e5 100644
--- a/examples/greeter/src/greeter/server.rs
+++ b/examples/greeter/src/greeter/server.rs
@@ -24,7 +24,6 @@
use dubbo::{codegen::*, Dubbo};
use dubbo_config::RootConfig;
-use dubbo_registry_zookeeper::zookeeper_registry::ZookeeperRegistry;
use logger::{
tracing::{info, span},
Level,
@@ -33,6 +32,7 @@
greeter_server::{register_server, Greeter},
GreeterReply, GreeterRequest,
};
+use registry_zookeeper::ZookeeperRegistry;
pub mod protos {
#![allow(non_camel_case_types)]
diff --git a/protocol/README.md b/protocol/README.md
new file mode 100644
index 0000000..c6b9a8c
--- /dev/null
+++ b/protocol/README.md
@@ -0,0 +1,4 @@
+/protocol
+ /protocol # define protocol abstract layer
+ /dubbo2 # for dubbo2 protocol, hessian2 codec as default
+ /triple # for triple protocol
\ No newline at end of file
diff --git a/protocol/dubbo2/Cargo.toml b/protocol/dubbo2/Cargo.toml
new file mode 100644
index 0000000..09e950c
--- /dev/null
+++ b/protocol/dubbo2/Cargo.toml
@@ -0,0 +1,10 @@
+[package]
+name = "protocol-dubbo2"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+remoting-net.workspace = true
+protocol.workspace = true
\ No newline at end of file
diff --git a/xds/LICENSE b/protocol/dubbo2/LICENSE
similarity index 100%
copy from xds/LICENSE
copy to protocol/dubbo2/LICENSE
diff --git a/xds/src/lib.rs b/protocol/dubbo2/src/lib.rs
similarity index 87%
copy from xds/src/lib.rs
copy to protocol/dubbo2/src/lib.rs
index 3e01853..d64452d 100644
--- a/xds/src/lib.rs
+++ b/protocol/dubbo2/src/lib.rs
@@ -14,12 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+pub fn add(left: usize, right: usize) -> usize {
+ left + right
+}
#[cfg(test)]
mod tests {
+ use super::*;
+
#[test]
fn it_works() {
- let result = 2 + 2;
+ let result = add(2, 2);
assert_eq!(result, 4);
}
}
diff --git a/protocol/protocol/Cargo.toml b/protocol/protocol/Cargo.toml
new file mode 100644
index 0000000..77c8260
--- /dev/null
+++ b/protocol/protocol/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "protocol"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
diff --git a/xds/LICENSE b/protocol/protocol/LICENSE
similarity index 100%
copy from xds/LICENSE
copy to protocol/protocol/LICENSE
diff --git a/xds/src/lib.rs b/protocol/protocol/src/lib.rs
similarity index 87%
copy from xds/src/lib.rs
copy to protocol/protocol/src/lib.rs
index 3e01853..d64452d 100644
--- a/xds/src/lib.rs
+++ b/protocol/protocol/src/lib.rs
@@ -14,12 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+pub fn add(left: usize, right: usize) -> usize {
+ left + right
+}
#[cfg(test)]
mod tests {
+ use super::*;
+
#[test]
fn it_works() {
- let result = 2 + 2;
+ let result = add(2, 2);
assert_eq!(result, 4);
}
}
diff --git a/protocol/triple/Cargo.toml b/protocol/triple/Cargo.toml
new file mode 100644
index 0000000..43aa6c5
--- /dev/null
+++ b/protocol/triple/Cargo.toml
@@ -0,0 +1,10 @@
+[package]
+name = "protocol-triple"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+remoting-net.workspace = true
+protocol.workspace = true
\ No newline at end of file
diff --git a/xds/LICENSE b/protocol/triple/LICENSE
similarity index 100%
copy from xds/LICENSE
copy to protocol/triple/LICENSE
diff --git a/xds/src/lib.rs b/protocol/triple/src/lib.rs
similarity index 87%
copy from xds/src/lib.rs
copy to protocol/triple/src/lib.rs
index 3e01853..9d8d4b0 100644
--- a/xds/src/lib.rs
+++ b/protocol/triple/src/lib.rs
@@ -15,11 +15,17 @@
* limitations under the License.
*/
+pub fn add(left: usize, right: usize) -> usize {
+ left + right
+}
+
#[cfg(test)]
mod tests {
+ use super::*;
+
#[test]
fn it_works() {
- let result = 2 + 2;
+ let result = add(2, 2);
assert_eq!(result, 4);
}
}
diff --git a/registry/README.md b/registry/README.md
new file mode 100644
index 0000000..ee46c7c
--- /dev/null
+++ b/registry/README.md
@@ -0,0 +1,3 @@
+/registry
+ /nacos # nacos registry and servicediscovery implementation
+ /zookeeper # zookeeper registry and servicediscovery implementation
\ No newline at end of file
diff --git a/registry/nacos/Cargo.toml b/registry/nacos/Cargo.toml
index 1e4518d..1a3a686 100644
--- a/registry/nacos/Cargo.toml
+++ b/registry/nacos/Cargo.toml
@@ -1,5 +1,5 @@
[package]
-name = "dubbo-registry-nacos"
+name = "registry-nacos"
version = "0.3.0"
edition = "2021"
license = "Apache-2.0"
diff --git a/registry/nacos/src/lib.rs b/registry/nacos/src/lib.rs
index aa8cc4d..38e710f 100644
--- a/registry/nacos/src/lib.rs
+++ b/registry/nacos/src/lib.rs
@@ -14,5 +14,696 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-pub mod nacos_registry;
mod utils;
+
+use std::{
+ collections::{HashMap, HashSet},
+ sync::{Arc, Mutex},
+};
+
+use anyhow::anyhow;
+use dubbo::{
+ common::url::Url,
+ registry::{NotifyListener, Registry, RegistryNotifyListener, ServiceEvent},
+};
+use logger::tracing::{error, info, warn};
+use nacos_sdk::api::naming::{NamingService, NamingServiceBuilder, ServiceInstance};
+
+use crate::utils::{build_nacos_client_props, is_concrete_str, is_wildcard_str, match_range};
+
+const VERSION_KEY: &str = "version";
+
+const GROUP_KEY: &str = "group";
+
+const DEFAULT_GROUP: &str = "DEFAULT_GROUP";
+
+const PROVIDER_SIDE: &str = "provider";
+
+const DEFAULT_CATEGORY: &str = PROVIDERS_CATEGORY;
+
+const SIDE_KEY: &str = "side";
+
+const REGISTER_CONSUMER_URL_KEY: &str = "register-consumer-url";
+
+const SERVICE_NAME_SEPARATOR: &str = ":";
+
+const CATEGORY_KEY: &str = "category";
+
+const PROVIDERS_CATEGORY: &str = "providers";
+
+#[allow(dead_code)]
+const ADMIN_PROTOCOL: &str = "admin";
+
+#[allow(dead_code)]
+const INNERCLASS_SYMBOL: &str = "$";
+
+#[allow(dead_code)]
+const INNERCLASS_COMPATIBLE_SYMBOL: &str = "___";
+
+pub struct NacosRegistry {
+ nacos_naming_service: Arc<dyn NamingService + Sync + Send + 'static>,
+ listeners: Mutex<HashMap<String, HashSet<Arc<NotifyListenerWrapper>>>>,
+}
+
+impl NacosRegistry {
+ pub fn new(url: Url) -> Self {
+ let (nacos_client_props, enable_auth) = build_nacos_client_props(&url);
+
+ let mut nacos_naming_builder = NamingServiceBuilder::new(nacos_client_props);
+
+ if enable_auth {
+ nacos_naming_builder = nacos_naming_builder.enable_auth_plugin_http();
+ }
+
+ let nacos_naming_service = nacos_naming_builder.build().unwrap();
+
+ Self {
+ nacos_naming_service: Arc::new(nacos_naming_service),
+ listeners: Mutex::new(HashMap::new()),
+ }
+ }
+
+ #[allow(dead_code)]
+ fn get_subscribe_service_names(&self, service_name: &NacosServiceName) -> HashSet<String> {
+ if service_name.is_concrete() {
+ let mut set = HashSet::new();
+ let service_subscribe_name = service_name.to_subscriber_str();
+ let service_subscriber_legacy_name = service_name.to_subscriber_legacy_string();
+ if service_subscribe_name.eq(&service_subscriber_legacy_name) {
+ set.insert(service_subscribe_name);
+ } else {
+ set.insert(service_subscribe_name);
+ set.insert(service_subscriber_legacy_name);
+ }
+
+ set
+ } else {
+ let list_view = self.nacos_naming_service.get_service_list(
+ 1,
+ i32::MAX,
+ Some(
+ service_name
+ .get_group_with_default(DEFAULT_GROUP)
+ .to_string(),
+ ),
+ );
+ if let Err(e) = list_view {
+ error!("list service instances occur an error: {:?}", e);
+ return HashSet::default();
+ }
+
+ let list_view = list_view.unwrap();
+ let set: HashSet<String> = list_view
+ .0
+ .into_iter()
+ .filter(|service_name| service_name.split(SERVICE_NAME_SEPARATOR).count() == 4)
+ .map(|service_name| NacosServiceName::from_service_name_str(&service_name))
+ .filter(|other_service_name| service_name.is_compatible(other_service_name))
+ .map(|service_name| service_name.to_subscriber_str())
+ .collect();
+ set
+ }
+ }
+}
+
+impl NacosRegistry {
+ fn create_nacos_service_instance(url: Url) -> ServiceInstance {
+ let ip = url.ip;
+ let port = url.port;
+ nacos_sdk::api::naming::ServiceInstance {
+ ip,
+ port: port.parse().unwrap(),
+ metadata: url.params,
+ ..Default::default()
+ }
+ }
+}
+
+impl Registry for NacosRegistry {
+ fn register(&mut self, url: Url) -> Result<(), dubbo::StdError> {
+ let side = url.get_param(SIDE_KEY).unwrap_or_default();
+ let register_consumer = url
+ .get_param(REGISTER_CONSUMER_URL_KEY)
+ .unwrap_or_else(|| false.to_string())
+ .parse::<bool>()
+ .unwrap_or(false);
+ if side.ne(PROVIDER_SIDE) && !register_consumer {
+ warn!("Please set 'dubbo.registry.parameters.register-consumer-url=true' to turn on consumer url registration.");
+ return Ok(());
+ }
+
+ let nacos_service_name = NacosServiceName::new(&url);
+
+ let group_name = Some(
+ nacos_service_name
+ .get_group_with_default(DEFAULT_GROUP)
+ .to_string(),
+ );
+ let nacos_service_name = nacos_service_name.to_register_str();
+
+ let nacos_service_instance = Self::create_nacos_service_instance(url);
+
+ info!("register service: {}", nacos_service_name);
+ let ret = self.nacos_naming_service.register_instance(
+ nacos_service_name,
+ group_name,
+ nacos_service_instance,
+ );
+ if let Err(e) = ret {
+ error!("register to nacos occur an error: {:?}", e);
+ return Err(anyhow!("register to nacos occur an error: {:?}", e).into());
+ }
+
+ Ok(())
+ }
+
+ fn unregister(&mut self, url: Url) -> Result<(), dubbo::StdError> {
+ let nacos_service_name = NacosServiceName::new(&url);
+
+ let group_name = Some(
+ nacos_service_name
+ .get_group_with_default(DEFAULT_GROUP)
+ .to_string(),
+ );
+ let nacos_service_name = nacos_service_name.to_register_str();
+
+ let nacos_service_instance = Self::create_nacos_service_instance(url);
+
+ info!("deregister service: {}", nacos_service_name);
+
+ let ret = self.nacos_naming_service.deregister_instance(
+ nacos_service_name,
+ group_name,
+ nacos_service_instance,
+ );
+ if let Err(e) = ret {
+ error!("deregister service from nacos occur an error: {:?}", e);
+ return Err(anyhow!("deregister service from nacos occur an error: {:?}", e).into());
+ }
+ Ok(())
+ }
+
+ fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), dubbo::StdError> {
+ let service_name = NacosServiceName::new(&url);
+ let url_str = url.to_url();
+
+ info!("subscribe: {}", &url_str);
+
+ let nacos_listener: Arc<NotifyListenerWrapper> = {
+ let listeners = self.listeners.lock();
+ if let Err(e) = listeners {
+ error!("subscribe service failed: {:?}", e);
+ return Err(anyhow!("subscribe service failed: {:?}", e).into());
+ }
+
+ let mut listeners = listeners.unwrap();
+ let listener_set = listeners.get_mut(url_str.as_str());
+
+ let wrapper = Arc::new(NotifyListenerWrapper(listener));
+ if let Some(listener_set) = listener_set {
+ listener_set.insert(wrapper.clone());
+ } else {
+ let mut hash_set = HashSet::new();
+ hash_set.insert(wrapper.clone());
+ listeners.insert(url_str, hash_set);
+ }
+
+ wrapper
+ };
+
+ let ret = self.nacos_naming_service.subscribe(
+ service_name.to_subscriber_str(),
+ Some(
+ service_name
+ .get_group_with_default(DEFAULT_GROUP)
+ .to_string(),
+ ),
+ Vec::new(),
+ nacos_listener,
+ );
+
+ if let Err(e) = ret {
+ error!("subscribe service failed: {:?}", e);
+ return Err(anyhow!("subscribe service failed: {:?}", e).into());
+ }
+
+ Ok(())
+ }
+
+ fn unsubscribe(
+ &self,
+ url: Url,
+ listener: RegistryNotifyListener,
+ ) -> Result<(), dubbo::StdError> {
+ let service_name = NacosServiceName::new(&url);
+ let url_str = url.to_url();
+ info!("unsubscribe: {}", &url_str);
+
+ let nacos_listener: Arc<NotifyListenerWrapper> = {
+ let listeners = self.listeners.lock();
+ if let Err(e) = listeners {
+ error!("unsubscribe service failed: {:?}", e);
+ return Err(anyhow!("unsubscribe service failed: {:?}", e).into());
+ }
+
+ let mut listeners = listeners.unwrap();
+ let listener_set = listeners.get_mut(url_str.as_str());
+ if listener_set.is_none() {
+ return Ok(());
+ }
+
+ let listener_set = listener_set.unwrap();
+
+ let listener = Arc::new(NotifyListenerWrapper(listener));
+ let listener = listener_set.take(&listener);
+ if listener.is_none() {
+ return Ok(());
+ }
+
+ listener.unwrap()
+ };
+
+ let ret = self.nacos_naming_service.unsubscribe(
+ service_name.to_subscriber_str(),
+ Some(
+ service_name
+ .get_group_with_default(DEFAULT_GROUP)
+ .to_string(),
+ ),
+ Vec::new(),
+ nacos_listener,
+ );
+
+ if let Err(e) = ret {
+ error!("unsubscribe service failed: {:?}", e);
+ return Err(anyhow!("unsubscribe service failed: {:?}", e).into());
+ }
+
+ Ok(())
+ }
+}
+
+struct NacosServiceName {
+ category: String,
+
+ service_interface: String,
+
+ version: String,
+
+ group: String,
+}
+
+impl NacosServiceName {
+ fn new(url: &Url) -> NacosServiceName {
+ let service_interface = url.get_service_name();
+
+ let category = url.get_param(CATEGORY_KEY).unwrap_or_default();
+
+ let version = url.get_param(VERSION_KEY).unwrap_or_default();
+
+ let group = url.get_param(GROUP_KEY).unwrap_or_default();
+
+ Self {
+ category,
+ service_interface: service_interface.clone(),
+ version,
+ group,
+ }
+ }
+
+ #[allow(dead_code)]
+ fn from_service_name_str(service_name_str: &str) -> Self {
+ let mut splitter = service_name_str.split(SERVICE_NAME_SEPARATOR);
+
+ let category = splitter.next().unwrap_or_default().to_string();
+ let service_interface = splitter.next().unwrap_or_default().to_string();
+ let version = splitter.next().unwrap_or_default().to_string();
+ let group = splitter.next().unwrap_or_default().to_string();
+
+ Self {
+ category,
+ service_interface,
+ version,
+ group,
+ }
+ }
+
+ #[allow(dead_code)]
+ fn version(&self) -> &str {
+ &self.version
+ }
+
+ #[allow(dead_code)]
+ fn get_version_with_default<'a>(&'a self, default: &'a str) -> &str {
+ if self.version.is_empty() {
+ default
+ } else {
+ &self.version
+ }
+ }
+
+ #[allow(dead_code)]
+ fn group(&self) -> &str {
+ &self.group
+ }
+
+ fn get_group_with_default<'a>(&'a self, default: &'a str) -> &str {
+ if self.group.is_empty() {
+ default
+ } else {
+ &self.group
+ }
+ }
+
+ #[allow(dead_code)]
+ fn category(&self) -> &str {
+ &self.category
+ }
+
+ #[allow(dead_code)]
+ fn get_category_with_default<'a>(&'a self, default: &'a str) -> &str {
+ if self.category.is_empty() {
+ default
+ } else {
+ &self.category
+ }
+ }
+
+ #[allow(dead_code)]
+ fn service_interface(&self) -> &str {
+ &self.service_interface
+ }
+
+ #[allow(dead_code)]
+ fn get_service_interface_with_default<'a>(&'a self, default: &'a str) -> &str {
+ if self.service_interface.is_empty() {
+ default
+ } else {
+ &self.service_interface
+ }
+ }
+
+ fn to_register_str(&self) -> String {
+ let category = if self.category.is_empty() {
+ DEFAULT_CATEGORY
+ } else {
+ &self.category
+ };
+ format!(
+ "{}:{}:{}:{}",
+ category, self.service_interface, self.version, self.group
+ )
+ }
+
+ fn to_subscriber_str(&self) -> String {
+ let category = if is_concrete_str(&self.service_interface) {
+ DEFAULT_CATEGORY
+ } else {
+ &self.category
+ };
+
+ format!(
+ "{}:{}:{}:{}",
+ category, self.service_interface, self.version, self.group
+ )
+ }
+
+ #[allow(dead_code)]
+ fn to_subscriber_legacy_string(&self) -> String {
+ let mut legacy_string = DEFAULT_CATEGORY.to_owned();
+ if !self.service_interface.is_empty() {
+ legacy_string.push_str(SERVICE_NAME_SEPARATOR);
+ legacy_string.push_str(&self.service_interface);
+ }
+
+ if !self.version.is_empty() {
+ legacy_string.push_str(SERVICE_NAME_SEPARATOR);
+ legacy_string.push_str(&self.version);
+ }
+
+ if !self.group.is_empty() {
+ legacy_string.push_str(SERVICE_NAME_SEPARATOR);
+ legacy_string.push_str(&self.group);
+ }
+
+ legacy_string
+ }
+
+ #[allow(dead_code)]
+ fn is_concrete(&self) -> bool {
+ is_concrete_str(&self.service_interface)
+ && is_concrete_str(&self.version)
+ && is_concrete_str(&self.group)
+ }
+
+ #[allow(dead_code)]
+ fn is_compatible(&self, other: &NacosServiceName) -> bool {
+ if !other.is_concrete() {
+ return false;
+ }
+
+ if !self.category.eq(&other.category) && !match_range(&self.category, &other.category) {
+ return false;
+ }
+
+ if is_wildcard_str(&self.version) {
+ return true;
+ }
+
+ if is_wildcard_str(&self.group) {
+ return true;
+ }
+
+ if !&self.version.eq(&other.version) && !match_range(&self.version, &other.version) {
+ return false;
+ }
+
+ if !self.group.eq(&other.group) && !match_range(&self.group, &other.group) {
+ return false;
+ }
+
+ true
+ }
+}
+
+struct NotifyListenerWrapper(Arc<dyn NotifyListener + Sync + Send + 'static>);
+
+impl std::hash::Hash for NotifyListenerWrapper {
+ fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+ let ptr = self.0.as_ref();
+ std::ptr::hash(ptr, state);
+ }
+}
+
+impl PartialEq for NotifyListenerWrapper {
+ fn eq(&self, other: &Self) -> bool {
+ let self_ptr = self.0.as_ref() as *const dyn NotifyListener;
+ let other_ptr = other.0.as_ref() as *const dyn NotifyListener;
+
+ let (self_data_ptr, _): (*const u8, *const u8) = unsafe { std::mem::transmute(self_ptr) };
+
+ let (other_data_ptr, _): (*const u8, *const u8) = unsafe { std::mem::transmute(other_ptr) };
+ self_data_ptr == other_data_ptr
+ }
+}
+
+impl Eq for NotifyListenerWrapper {}
+
+impl nacos_sdk::api::naming::NamingEventListener for NotifyListenerWrapper {
+ fn event(&self, event: Arc<nacos_sdk::api::naming::NamingChangeEvent>) {
+ let service_name = event.service_name.clone();
+ let instances = event.instances.as_ref();
+ let urls: Vec<Url>;
+ if let Some(instances) = instances {
+ urls = instances
+ .iter()
+ .filter_map(|data| {
+ let url_str =
+ format!("triple://{}:{}/{}", data.ip(), data.port(), service_name);
+ Url::from_url(&url_str)
+ })
+ .collect();
+ } else {
+ urls = Vec::new();
+ }
+ let notify_event = ServiceEvent {
+ key: service_name,
+ action: String::from("CHANGE"),
+ service: urls,
+ };
+ self.0.notify(notify_event);
+ }
+}
+
+#[cfg(test)]
+pub mod tests {
+
+ use core::time;
+ use std::thread;
+
+ use tracing::metadata::LevelFilter;
+
+ use super::*;
+
+ #[test]
+ #[ignore]
+ pub fn test_register_to_nacos() {
+ tracing_subscriber::fmt()
+ .with_thread_names(true)
+ .with_file(true)
+ .with_level(true)
+ .with_line_number(true)
+ .with_thread_ids(true)
+ .with_max_level(LevelFilter::DEBUG)
+ .init();
+
+ let nacos_registry_url = Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
+ let mut registry = NacosRegistry::new(nacos_registry_url);
+
+ let mut service_url = Url::from_url("tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
+ service_url
+ .params
+ .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
+
+ let ret = registry.register(service_url);
+
+ info!("register result: {:?}", ret);
+
+ let sleep_millis = time::Duration::from_secs(300);
+ thread::sleep(sleep_millis);
+ }
+
+ #[test]
+ #[ignore]
+ pub fn test_register_and_unregister() {
+ tracing_subscriber::fmt()
+ .with_thread_names(true)
+ .with_file(true)
+ .with_level(true)
+ .with_line_number(true)
+ .with_thread_ids(true)
+ .with_max_level(LevelFilter::DEBUG)
+ .init();
+
+ let nacos_registry_url = Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
+ let mut registry = NacosRegistry::new(nacos_registry_url);
+
+ let mut service_url = Url::from_url("tri://127.0.0.1:9090/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
+ service_url
+ .params
+ .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
+
+ let ret = registry.register(service_url);
+
+ info!("register result: {:?}", ret);
+
+ let sleep_millis = time::Duration::from_secs(10);
+ thread::sleep(sleep_millis);
+
+ let unregister_url = Url::from_url("tri://127.0.0.1:9090/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
+ let ret = registry.unregister(unregister_url);
+
+ info!("deregister result: {:?}", ret);
+
+ let sleep_millis = time::Duration::from_secs(10);
+ thread::sleep(sleep_millis);
+ }
+
+ struct TestNotifyListener;
+ impl NotifyListener for TestNotifyListener {
+ fn notify(&self, event: ServiceEvent) {
+ info!("notified: {:?}", event.key);
+ }
+
+ fn notify_all(&self, event: ServiceEvent) {
+ info!("notify_all: {:?}", event.key);
+ }
+ }
+
+ #[test]
+ #[ignore]
+ fn test_subscribe() {
+ tracing_subscriber::fmt()
+ .with_thread_names(true)
+ .with_file(true)
+ .with_level(true)
+ .with_line_number(true)
+ .with_thread_ids(true)
+ .with_max_level(LevelFilter::DEBUG)
+ .init();
+
+ let nacos_registry_url = Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
+ let mut registry = NacosRegistry::new(nacos_registry_url);
+
+ let mut service_url = Url::from_url("tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
+ service_url
+ .params
+ .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
+
+ let ret = registry.register(service_url);
+
+ info!("register result: {:?}", ret);
+
+ let subscribe_url = Url::from_url("provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap();
+
+ let ret = registry.subscribe(subscribe_url, Arc::new(TestNotifyListener));
+
+ if let Err(e) = ret {
+ error!("error message: {:?}", e);
+ return;
+ }
+
+ let sleep_millis = time::Duration::from_secs(300);
+ thread::sleep(sleep_millis);
+ }
+
+ #[test]
+ #[ignore]
+ fn test_unsubscribe() {
+ tracing_subscriber::fmt()
+ .with_thread_names(true)
+ .with_file(true)
+ .with_level(true)
+ .with_line_number(true)
+ .with_thread_ids(true)
+ .with_max_level(LevelFilter::DEBUG)
+ .init();
+
+ let nacos_registry_url = Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
+ let mut registry = NacosRegistry::new(nacos_registry_url);
+
+ let mut service_url = Url::from_url("tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
+ service_url
+ .params
+ .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
+
+ let ret = registry.register(service_url);
+
+ info!("register result: {:?}", ret);
+
+ let subscribe_url = Url::from_url("provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap();
+
+ let listener = Arc::new(TestNotifyListener);
+
+ let ret = registry.subscribe(subscribe_url, listener.clone());
+
+ if let Err(e) = ret {
+ error!("error message: {:?}", e);
+ return;
+ }
+
+ let sleep_millis = time::Duration::from_secs(40);
+ thread::sleep(sleep_millis);
+
+ let unsubscribe_url = Url::from_url("provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap();
+ let ret = registry.unsubscribe(unsubscribe_url, listener.clone());
+
+ if let Err(e) = ret {
+ error!("error message: {:?}", e);
+ return;
+ }
+
+ let sleep_millis = time::Duration::from_secs(40);
+ thread::sleep(sleep_millis);
+ }
+}
diff --git a/registry/nacos/src/nacos_registry.rs b/registry/nacos/src/nacos_registry.rs
deleted file mode 100644
index 8fd90fc..0000000
--- a/registry/nacos/src/nacos_registry.rs
+++ /dev/null
@@ -1,708 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-use std::{
- collections::{HashMap, HashSet},
- sync::{Arc, Mutex},
-};
-
-use anyhow::anyhow;
-use dubbo::{
- common::url::Url,
- registry::{NotifyListener, Registry, RegistryNotifyListener, ServiceEvent},
-};
-use logger::tracing::{error, info, warn};
-use nacos_sdk::api::naming::{NamingService, NamingServiceBuilder, ServiceInstance};
-
-use crate::utils::{build_nacos_client_props, is_concrete_str, is_wildcard_str, match_range};
-
-const VERSION_KEY: &str = "version";
-
-const GROUP_KEY: &str = "group";
-
-const DEFAULT_GROUP: &str = "DEFAULT_GROUP";
-
-const PROVIDER_SIDE: &str = "provider";
-
-const DEFAULT_CATEGORY: &str = PROVIDERS_CATEGORY;
-
-const SIDE_KEY: &str = "side";
-
-const REGISTER_CONSUMER_URL_KEY: &str = "register-consumer-url";
-
-const SERVICE_NAME_SEPARATOR: &str = ":";
-
-const CATEGORY_KEY: &str = "category";
-
-const PROVIDERS_CATEGORY: &str = "providers";
-
-#[allow(dead_code)]
-const ADMIN_PROTOCOL: &str = "admin";
-
-#[allow(dead_code)]
-const INNERCLASS_SYMBOL: &str = "$";
-
-#[allow(dead_code)]
-const INNERCLASS_COMPATIBLE_SYMBOL: &str = "___";
-
-pub struct NacosRegistry {
- nacos_naming_service: Arc<dyn NamingService + Sync + Send + 'static>,
- listeners: Mutex<HashMap<String, HashSet<Arc<NotifyListenerWrapper>>>>,
-}
-
-impl NacosRegistry {
- pub fn new(url: Url) -> Self {
- let (nacos_client_props, enable_auth) = build_nacos_client_props(&url);
-
- let mut nacos_naming_builder = NamingServiceBuilder::new(nacos_client_props);
-
- if enable_auth {
- nacos_naming_builder = nacos_naming_builder.enable_auth_plugin_http();
- }
-
- let nacos_naming_service = nacos_naming_builder.build().unwrap();
-
- Self {
- nacos_naming_service: Arc::new(nacos_naming_service),
- listeners: Mutex::new(HashMap::new()),
- }
- }
-
- #[allow(dead_code)]
- fn get_subscribe_service_names(&self, service_name: &NacosServiceName) -> HashSet<String> {
- if service_name.is_concrete() {
- let mut set = HashSet::new();
- let service_subscribe_name = service_name.to_subscriber_str();
- let service_subscriber_legacy_name = service_name.to_subscriber_legacy_string();
- if service_subscribe_name.eq(&service_subscriber_legacy_name) {
- set.insert(service_subscribe_name);
- } else {
- set.insert(service_subscribe_name);
- set.insert(service_subscriber_legacy_name);
- }
-
- set
- } else {
- let list_view = self.nacos_naming_service.get_service_list(
- 1,
- i32::MAX,
- Some(
- service_name
- .get_group_with_default(DEFAULT_GROUP)
- .to_string(),
- ),
- );
- if let Err(e) = list_view {
- error!("list service instances occur an error: {:?}", e);
- return HashSet::default();
- }
-
- let list_view = list_view.unwrap();
- let set: HashSet<String> = list_view
- .0
- .into_iter()
- .filter(|service_name| service_name.split(SERVICE_NAME_SEPARATOR).count() == 4)
- .map(|service_name| NacosServiceName::from_service_name_str(&service_name))
- .filter(|other_service_name| service_name.is_compatible(other_service_name))
- .map(|service_name| service_name.to_subscriber_str())
- .collect();
- set
- }
- }
-}
-
-impl NacosRegistry {
- fn create_nacos_service_instance(url: Url) -> ServiceInstance {
- let ip = url.ip;
- let port = url.port;
- nacos_sdk::api::naming::ServiceInstance {
- ip,
- port: port.parse().unwrap(),
- metadata: url.params,
- ..Default::default()
- }
- }
-}
-
-impl Registry for NacosRegistry {
- fn register(&mut self, url: Url) -> Result<(), dubbo::StdError> {
- let side = url.get_param(SIDE_KEY).unwrap_or_default();
- let register_consumer = url
- .get_param(REGISTER_CONSUMER_URL_KEY)
- .unwrap_or_else(|| false.to_string())
- .parse::<bool>()
- .unwrap_or(false);
- if side.ne(PROVIDER_SIDE) && !register_consumer {
- warn!("Please set 'dubbo.registry.parameters.register-consumer-url=true' to turn on consumer url registration.");
- return Ok(());
- }
-
- let nacos_service_name = NacosServiceName::new(&url);
-
- let group_name = Some(
- nacos_service_name
- .get_group_with_default(DEFAULT_GROUP)
- .to_string(),
- );
- let nacos_service_name = nacos_service_name.to_register_str();
-
- let nacos_service_instance = Self::create_nacos_service_instance(url);
-
- info!("register service: {}", nacos_service_name);
- let ret = self.nacos_naming_service.register_instance(
- nacos_service_name,
- group_name,
- nacos_service_instance,
- );
- if let Err(e) = ret {
- error!("register to nacos occur an error: {:?}", e);
- return Err(anyhow!("register to nacos occur an error: {:?}", e).into());
- }
-
- Ok(())
- }
-
- fn unregister(&mut self, url: Url) -> Result<(), dubbo::StdError> {
- let nacos_service_name = NacosServiceName::new(&url);
-
- let group_name = Some(
- nacos_service_name
- .get_group_with_default(DEFAULT_GROUP)
- .to_string(),
- );
- let nacos_service_name = nacos_service_name.to_register_str();
-
- let nacos_service_instance = Self::create_nacos_service_instance(url);
-
- info!("deregister service: {}", nacos_service_name);
-
- let ret = self.nacos_naming_service.deregister_instance(
- nacos_service_name,
- group_name,
- nacos_service_instance,
- );
- if let Err(e) = ret {
- error!("deregister service from nacos occur an error: {:?}", e);
- return Err(anyhow!("deregister service from nacos occur an error: {:?}", e).into());
- }
- Ok(())
- }
-
- fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), dubbo::StdError> {
- let service_name = NacosServiceName::new(&url);
- let url_str = url.to_url();
-
- info!("subscribe: {}", &url_str);
-
- let nacos_listener: Arc<NotifyListenerWrapper> = {
- let listeners = self.listeners.lock();
- if let Err(e) = listeners {
- error!("subscribe service failed: {:?}", e);
- return Err(anyhow!("subscribe service failed: {:?}", e).into());
- }
-
- let mut listeners = listeners.unwrap();
- let listener_set = listeners.get_mut(url_str.as_str());
-
- let wrapper = Arc::new(NotifyListenerWrapper(listener));
- if let Some(listener_set) = listener_set {
- listener_set.insert(wrapper.clone());
- } else {
- let mut hash_set = HashSet::new();
- hash_set.insert(wrapper.clone());
- listeners.insert(url_str, hash_set);
- }
-
- wrapper
- };
-
- let ret = self.nacos_naming_service.subscribe(
- service_name.to_subscriber_str(),
- Some(
- service_name
- .get_group_with_default(DEFAULT_GROUP)
- .to_string(),
- ),
- Vec::new(),
- nacos_listener,
- );
-
- if let Err(e) = ret {
- error!("subscribe service failed: {:?}", e);
- return Err(anyhow!("subscribe service failed: {:?}", e).into());
- }
-
- Ok(())
- }
-
- fn unsubscribe(
- &self,
- url: Url,
- listener: RegistryNotifyListener,
- ) -> Result<(), dubbo::StdError> {
- let service_name = NacosServiceName::new(&url);
- let url_str = url.to_url();
- info!("unsubscribe: {}", &url_str);
-
- let nacos_listener: Arc<NotifyListenerWrapper> = {
- let listeners = self.listeners.lock();
- if let Err(e) = listeners {
- error!("unsubscribe service failed: {:?}", e);
- return Err(anyhow!("unsubscribe service failed: {:?}", e).into());
- }
-
- let mut listeners = listeners.unwrap();
- let listener_set = listeners.get_mut(url_str.as_str());
- if listener_set.is_none() {
- return Ok(());
- }
-
- let listener_set = listener_set.unwrap();
-
- let listener = Arc::new(NotifyListenerWrapper(listener));
- let listener = listener_set.take(&listener);
- if listener.is_none() {
- return Ok(());
- }
-
- listener.unwrap()
- };
-
- let ret = self.nacos_naming_service.unsubscribe(
- service_name.to_subscriber_str(),
- Some(
- service_name
- .get_group_with_default(DEFAULT_GROUP)
- .to_string(),
- ),
- Vec::new(),
- nacos_listener,
- );
-
- if let Err(e) = ret {
- error!("unsubscribe service failed: {:?}", e);
- return Err(anyhow!("unsubscribe service failed: {:?}", e).into());
- }
-
- Ok(())
- }
-}
-
-struct NacosServiceName {
- category: String,
-
- service_interface: String,
-
- version: String,
-
- group: String,
-}
-
-impl NacosServiceName {
- fn new(url: &Url) -> NacosServiceName {
- let service_interface = url.get_service_name();
-
- let category = url.get_param(CATEGORY_KEY).unwrap_or_default();
-
- let version = url.get_param(VERSION_KEY).unwrap_or_default();
-
- let group = url.get_param(GROUP_KEY).unwrap_or_default();
-
- Self {
- category,
- service_interface: service_interface.clone(),
- version,
- group,
- }
- }
-
- #[allow(dead_code)]
- fn from_service_name_str(service_name_str: &str) -> Self {
- let mut splitter = service_name_str.split(SERVICE_NAME_SEPARATOR);
-
- let category = splitter.next().unwrap_or_default().to_string();
- let service_interface = splitter.next().unwrap_or_default().to_string();
- let version = splitter.next().unwrap_or_default().to_string();
- let group = splitter.next().unwrap_or_default().to_string();
-
- Self {
- category,
- service_interface,
- version,
- group,
- }
- }
-
- #[allow(dead_code)]
- fn version(&self) -> &str {
- &self.version
- }
-
- #[allow(dead_code)]
- fn get_version_with_default<'a>(&'a self, default: &'a str) -> &str {
- if self.version.is_empty() {
- default
- } else {
- &self.version
- }
- }
-
- #[allow(dead_code)]
- fn group(&self) -> &str {
- &self.group
- }
-
- fn get_group_with_default<'a>(&'a self, default: &'a str) -> &str {
- if self.group.is_empty() {
- default
- } else {
- &self.group
- }
- }
-
- #[allow(dead_code)]
- fn category(&self) -> &str {
- &self.category
- }
-
- #[allow(dead_code)]
- fn get_category_with_default<'a>(&'a self, default: &'a str) -> &str {
- if self.category.is_empty() {
- default
- } else {
- &self.category
- }
- }
-
- #[allow(dead_code)]
- fn service_interface(&self) -> &str {
- &self.service_interface
- }
-
- #[allow(dead_code)]
- fn get_service_interface_with_default<'a>(&'a self, default: &'a str) -> &str {
- if self.service_interface.is_empty() {
- default
- } else {
- &self.service_interface
- }
- }
-
- fn to_register_str(&self) -> String {
- let category = if self.category.is_empty() {
- DEFAULT_CATEGORY
- } else {
- &self.category
- };
- format!(
- "{}:{}:{}:{}",
- category, self.service_interface, self.version, self.group
- )
- }
-
- fn to_subscriber_str(&self) -> String {
- let category = if is_concrete_str(&self.service_interface) {
- DEFAULT_CATEGORY
- } else {
- &self.category
- };
-
- format!(
- "{}:{}:{}:{}",
- category, self.service_interface, self.version, self.group
- )
- }
-
- #[allow(dead_code)]
- fn to_subscriber_legacy_string(&self) -> String {
- let mut legacy_string = DEFAULT_CATEGORY.to_owned();
- if !self.service_interface.is_empty() {
- legacy_string.push_str(SERVICE_NAME_SEPARATOR);
- legacy_string.push_str(&self.service_interface);
- }
-
- if !self.version.is_empty() {
- legacy_string.push_str(SERVICE_NAME_SEPARATOR);
- legacy_string.push_str(&self.version);
- }
-
- if !self.group.is_empty() {
- legacy_string.push_str(SERVICE_NAME_SEPARATOR);
- legacy_string.push_str(&self.group);
- }
-
- legacy_string
- }
-
- #[allow(dead_code)]
- fn is_concrete(&self) -> bool {
- is_concrete_str(&self.service_interface)
- && is_concrete_str(&self.version)
- && is_concrete_str(&self.group)
- }
-
- #[allow(dead_code)]
- fn is_compatible(&self, other: &NacosServiceName) -> bool {
- if !other.is_concrete() {
- return false;
- }
-
- if !self.category.eq(&other.category) && !match_range(&self.category, &other.category) {
- return false;
- }
-
- if is_wildcard_str(&self.version) {
- return true;
- }
-
- if is_wildcard_str(&self.group) {
- return true;
- }
-
- if !&self.version.eq(&other.version) && !match_range(&self.version, &other.version) {
- return false;
- }
-
- if !self.group.eq(&other.group) && !match_range(&self.group, &other.group) {
- return false;
- }
-
- true
- }
-}
-
-struct NotifyListenerWrapper(Arc<dyn NotifyListener + Sync + Send + 'static>);
-
-impl std::hash::Hash for NotifyListenerWrapper {
- fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
- let ptr = self.0.as_ref();
- std::ptr::hash(ptr, state);
- }
-}
-
-impl PartialEq for NotifyListenerWrapper {
- fn eq(&self, other: &Self) -> bool {
- let self_ptr = self.0.as_ref() as *const dyn NotifyListener;
- let other_ptr = other.0.as_ref() as *const dyn NotifyListener;
-
- let (self_data_ptr, _): (*const u8, *const u8) = unsafe { std::mem::transmute(self_ptr) };
-
- let (other_data_ptr, _): (*const u8, *const u8) = unsafe { std::mem::transmute(other_ptr) };
- self_data_ptr == other_data_ptr
- }
-}
-
-impl Eq for NotifyListenerWrapper {}
-
-impl nacos_sdk::api::naming::NamingEventListener for NotifyListenerWrapper {
- fn event(&self, event: Arc<nacos_sdk::api::naming::NamingChangeEvent>) {
- let service_name = event.service_name.clone();
- let instances = event.instances.as_ref();
- let urls: Vec<Url>;
- if let Some(instances) = instances {
- urls = instances
- .iter()
- .filter_map(|data| {
- let url_str =
- format!("triple://{}:{}/{}", data.ip(), data.port(), service_name);
- Url::from_url(&url_str)
- })
- .collect();
- } else {
- urls = Vec::new();
- }
- let notify_event = ServiceEvent {
- key: service_name,
- action: String::from("CHANGE"),
- service: urls,
- };
- self.0.notify(notify_event);
- }
-}
-
-#[cfg(test)]
-pub mod tests {
-
- use core::time;
- use std::thread;
-
- use tracing::metadata::LevelFilter;
-
- use super::*;
-
- #[test]
- #[ignore]
- pub fn test_register_to_nacos() {
- tracing_subscriber::fmt()
- .with_thread_names(true)
- .with_file(true)
- .with_level(true)
- .with_line_number(true)
- .with_thread_ids(true)
- .with_max_level(LevelFilter::DEBUG)
- .init();
-
- let nacos_registry_url = Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
- let mut registry = NacosRegistry::new(nacos_registry_url);
-
- let mut service_url = Url::from_url("tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
- service_url
- .params
- .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
-
- let ret = registry.register(service_url);
-
- info!("register result: {:?}", ret);
-
- let sleep_millis = time::Duration::from_secs(300);
- thread::sleep(sleep_millis);
- }
-
- #[test]
- #[ignore]
- pub fn test_register_and_unregister() {
- tracing_subscriber::fmt()
- .with_thread_names(true)
- .with_file(true)
- .with_level(true)
- .with_line_number(true)
- .with_thread_ids(true)
- .with_max_level(LevelFilter::DEBUG)
- .init();
-
- let nacos_registry_url = Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
- let mut registry = NacosRegistry::new(nacos_registry_url);
-
- let mut service_url = Url::from_url("tri://127.0.0.1:9090/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
- service_url
- .params
- .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
-
- let ret = registry.register(service_url);
-
- info!("register result: {:?}", ret);
-
- let sleep_millis = time::Duration::from_secs(10);
- thread::sleep(sleep_millis);
-
- let unregister_url = Url::from_url("tri://127.0.0.1:9090/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
- let ret = registry.unregister(unregister_url);
-
- info!("deregister result: {:?}", ret);
-
- let sleep_millis = time::Duration::from_secs(10);
- thread::sleep(sleep_millis);
- }
-
- struct TestNotifyListener;
- impl NotifyListener for TestNotifyListener {
- fn notify(&self, event: ServiceEvent) {
- info!("notified: {:?}", event.key);
- }
-
- fn notify_all(&self, event: ServiceEvent) {
- info!("notify_all: {:?}", event.key);
- }
- }
-
- #[test]
- #[ignore]
- fn test_subscribe() {
- tracing_subscriber::fmt()
- .with_thread_names(true)
- .with_file(true)
- .with_level(true)
- .with_line_number(true)
- .with_thread_ids(true)
- .with_max_level(LevelFilter::DEBUG)
- .init();
-
- let nacos_registry_url = Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
- let mut registry = NacosRegistry::new(nacos_registry_url);
-
- let mut service_url = Url::from_url("tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
- service_url
- .params
- .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
-
- let ret = registry.register(service_url);
-
- info!("register result: {:?}", ret);
-
- let subscribe_url = Url::from_url("provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap();
-
- let ret = registry.subscribe(subscribe_url, Arc::new(TestNotifyListener));
-
- if let Err(e) = ret {
- error!("error message: {:?}", e);
- return;
- }
-
- let sleep_millis = time::Duration::from_secs(300);
- thread::sleep(sleep_millis);
- }
-
- #[test]
- #[ignore]
- fn test_unsubscribe() {
- tracing_subscriber::fmt()
- .with_thread_names(true)
- .with_file(true)
- .with_level(true)
- .with_line_number(true)
- .with_thread_ids(true)
- .with_max_level(LevelFilter::DEBUG)
- .init();
-
- let nacos_registry_url = Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
- let mut registry = NacosRegistry::new(nacos_registry_url);
-
- let mut service_url = Url::from_url("tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
- service_url
- .params
- .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
-
- let ret = registry.register(service_url);
-
- info!("register result: {:?}", ret);
-
- let subscribe_url = Url::from_url("provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap();
-
- let listener = Arc::new(TestNotifyListener);
-
- let ret = registry.subscribe(subscribe_url, listener.clone());
-
- if let Err(e) = ret {
- error!("error message: {:?}", e);
- return;
- }
-
- let sleep_millis = time::Duration::from_secs(40);
- thread::sleep(sleep_millis);
-
- let unsubscribe_url = Url::from_url("provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap();
- let ret = registry.unsubscribe(unsubscribe_url, listener.clone());
-
- if let Err(e) = ret {
- error!("error message: {:?}", e);
- return;
- }
-
- let sleep_millis = time::Duration::from_secs(40);
- thread::sleep(sleep_millis);
- }
-}
diff --git a/registry/zookeeper/Cargo.toml b/registry/zookeeper/Cargo.toml
index a6687d9..c4a4e59 100644
--- a/registry/zookeeper/Cargo.toml
+++ b/registry/zookeeper/Cargo.toml
@@ -1,5 +1,5 @@
[package]
-name = "dubbo-registry-zookeeper"
+name = "registry-zookeeper"
version = "0.3.0"
edition = "2021"
license = "Apache-2.0"
diff --git a/registry/zookeeper/src/lib.rs b/registry/zookeeper/src/lib.rs
index ccfce10..9f7447b 100644
--- a/registry/zookeeper/src/lib.rs
+++ b/registry/zookeeper/src/lib.rs
@@ -15,13 +15,450 @@
* limitations under the License.
*/
-pub mod zookeeper_registry;
+#![allow(unused_variables, dead_code, missing_docs)]
+
+use std::{
+ collections::{HashMap, HashSet},
+ env,
+ sync::{Arc, Mutex, RwLock},
+ time::Duration,
+};
+
+use logger::tracing::{debug, error, info};
+use serde::{Deserialize, Serialize};
+#[allow(unused_imports)]
+use zookeeper::{Acl, CreateMode, WatchedEvent, WatchedEventType, Watcher, ZooKeeper};
+
+use dubbo::{
+ cluster::support::cluster_invoker::ClusterInvoker,
+ codegen::BoxRegistry,
+ common::{
+ consts::{DUBBO_KEY, LOCALHOST_IP, PROVIDERS_KEY},
+ url::Url,
+ },
+ registry::{
+ integration::ClusterRegistryIntegration, memory_registry::MemoryRegistry, NotifyListener,
+ Registry, RegistryNotifyListener, ServiceEvent,
+ },
+ StdError,
+};
+
+// 从url中获取服务注册的元数据
+// rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s)
+// dubboPath = fmt.Sprintf("/%s/%s/%s", r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), common.DubboNodes[common.PROVIDER])
+
+pub const REGISTRY_GROUP_KEY: &str = "registry.group";
+
+struct LoggingWatcher;
+impl Watcher for LoggingWatcher {
+ fn handle(&self, e: WatchedEvent) {
+ info!("{:?}", e)
+ }
+}
+
+//#[derive(Debug)]
+pub struct ZookeeperRegistry {
+ root_path: String,
+ zk_client: Arc<ZooKeeper>,
+ listeners: RwLock<HashMap<String, RegistryNotifyListener>>,
+ memory_registry: Arc<Mutex<MemoryRegistry>>,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ZkServiceInstance {
+ name: String,
+ address: String,
+ port: i32,
+}
+
+impl ZkServiceInstance {
+ pub fn get_service_name(&self) -> &str {
+ self.name.as_str()
+ }
+
+ pub fn get_host(&self) -> &str {
+ self.address.as_str()
+ }
+
+ pub fn get_port(&self) -> i32 {
+ self.port
+ }
+}
+
+impl ZookeeperRegistry {
+ pub fn new(connect_string: &str) -> ZookeeperRegistry {
+ let zk_client =
+ ZooKeeper::connect(connect_string, Duration::from_secs(15), LoggingWatcher).unwrap();
+ info!("zk server connect string: {}", connect_string);
+ ZookeeperRegistry {
+ root_path: "/services".to_string(),
+ zk_client: Arc::new(zk_client),
+ listeners: RwLock::new(HashMap::new()),
+ memory_registry: Arc::new(Mutex::new(MemoryRegistry::default())),
+ }
+ }
+
+ fn create_listener(
+ &self,
+ path: String,
+ service_name: String,
+ listener: RegistryNotifyListener,
+ ) -> ServiceInstancesChangedListener {
+ let mut service_names = HashSet::new();
+ service_names.insert(service_name.clone());
+ ServiceInstancesChangedListener {
+ zk_client: Arc::clone(&self.zk_client),
+ path,
+ service_name: service_name.clone(),
+ listener,
+ }
+ }
+
+ // metadata /dubbo/mapping designed for application discovery; deprecated for currently using interface discovery
+ // #[deprecated]
+ fn get_app_name(&self, service_name: String) -> String {
+ let res = self
+ .zk_client
+ .get_data(&("/dubbo/mapping/".to_owned() + &service_name), false);
+
+ let x = res.unwrap().0;
+ let s = match std::str::from_utf8(&x) {
+ Ok(v) => v,
+ Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
+ };
+ s.to_string()
+ }
+
+ pub fn get_client(&self) -> Arc<ZooKeeper> {
+ self.zk_client.clone()
+ }
+
+ // If the parent node does not exist in the ZooKeeper, Err(ZkError::NoNode) will be returned.
+ pub fn create_path(
+ &self,
+ path: &str,
+ data: &str,
+ create_mode: CreateMode,
+ ) -> Result<(), StdError> {
+ if self.exists_path(path) {
+ self.zk_client
+ .set_data(path, data.as_bytes().to_vec(), None)
+ .unwrap_or_else(|_| panic!("set data to {} failed.", path));
+ return Ok(());
+ }
+ let zk_result = self.zk_client.create(
+ path,
+ data.as_bytes().to_vec(),
+ Acl::open_unsafe().clone(),
+ create_mode,
+ );
+ match zk_result {
+ Ok(_) => Ok(()),
+ Err(err) => {
+ error!("zk path {} parent not exists.", path);
+ Err(Box::try_from(err).unwrap())
+ }
+ }
+ }
+
+ // For avoiding Err(ZkError::NoNode) when parent node is't exists
+ pub fn create_path_with_parent_check(
+ &self,
+ path: &str,
+ data: &str,
+ create_mode: CreateMode,
+ ) -> Result<(), StdError> {
+ let nodes: Vec<&str> = path.split('/').collect();
+ let mut current: String = String::new();
+ let children = *nodes.last().unwrap();
+ for node_key in nodes {
+ if node_key.is_empty() {
+ continue;
+ };
+ current.push('/');
+ current.push_str(node_key);
+ if !self.exists_path(current.as_str()) {
+ let new_create_mode = match children == node_key {
+ true => create_mode,
+ false => CreateMode::Persistent,
+ };
+ let new_data = match children == node_key {
+ true => data,
+ false => "",
+ };
+ self.create_path(current.as_str(), new_data, new_create_mode)
+ .unwrap();
+ }
+ }
+ Ok(())
+ }
+
+ pub fn delete_path(&self, path: &str) {
+ if self.exists_path(path) {
+ self.get_client().delete(path, None).unwrap()
+ }
+ }
+
+ pub fn exists_path(&self, path: &str) -> bool {
+ self.zk_client.exists(path, false).unwrap().is_some()
+ }
+
+ pub fn get_data(&self, path: &str, watch: bool) -> Option<String> {
+ if self.exists_path(path) {
+ let zk_result = self.zk_client.get_data(path, watch);
+ if let Ok(..) = zk_result {
+ Some(String::from_utf8(zk_result.unwrap().0).unwrap())
+ } else {
+ None
+ }
+ } else {
+ None
+ }
+ }
+}
+
+impl Default for ZookeeperRegistry {
+ fn default() -> ZookeeperRegistry {
+ let zk_connect_string = match env::var("ZOOKEEPER_SERVERS") {
+ Ok(val) => val,
+ Err(_) => {
+ let default_connect_string = "localhost:2181";
+ info!(
+ "No ZOOKEEPER_SERVERS env value, using {} as default.",
+ default_connect_string
+ );
+ default_connect_string.to_string()
+ }
+ };
+ println!(
+ "using external registry with it's connect string {}",
+ zk_connect_string.as_str()
+ );
+ ZookeeperRegistry::new(zk_connect_string.as_str())
+ }
+}
+
+impl Registry for ZookeeperRegistry {
+ fn register(&mut self, url: Url) -> Result<(), StdError> {
+ debug!("register url: {}", url);
+ let zk_path = format!(
+ "/{}/{}/{}/{}",
+ DUBBO_KEY,
+ url.service_name,
+ PROVIDERS_KEY,
+ url.encoded_raw_url_string()
+ );
+ self.create_path_with_parent_check(zk_path.as_str(), LOCALHOST_IP, CreateMode::Ephemeral)?;
+ Ok(())
+ }
+
+ fn unregister(&mut self, url: Url) -> Result<(), StdError> {
+ let zk_path = format!(
+ "/{}/{}/{}/{}",
+ DUBBO_KEY,
+ url.service_name,
+ PROVIDERS_KEY,
+ url.encoded_raw_url_string()
+ );
+ self.delete_path(zk_path.as_str());
+ Ok(())
+ }
+
+ // for consumer to find the changes of providers
+ fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), StdError> {
+ let service_name = url.get_service_name();
+ let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &service_name, PROVIDERS_KEY);
+ if self
+ .listeners
+ .read()
+ .unwrap()
+ .get(service_name.as_str())
+ .is_some()
+ {
+ return Ok(());
+ }
+
+ self.listeners
+ .write()
+ .unwrap()
+ .insert(service_name.to_string(), listener.clone());
+
+ let zk_listener =
+ self.create_listener(zk_path.clone(), service_name.to_string(), listener.clone());
+
+ let zk_changed_paths = self.zk_client.get_children_w(&zk_path, zk_listener);
+ let result = match zk_changed_paths {
+ Err(err) => {
+ error!("zk subscribe error: {}", err);
+ Vec::new()
+ }
+ Ok(urls) => urls
+ .iter()
+ .map(|node_key| {
+ let provider_url: Url = urlencoding::decode(node_key)
+ .unwrap()
+ .to_string()
+ .as_str()
+ .into();
+ provider_url
+ })
+ .collect(),
+ };
+ info!("notifying {}->{:?}", service_name, result);
+ listener.notify(ServiceEvent {
+ key: service_name,
+ action: String::from("ADD"),
+ service: result,
+ });
+ Ok(())
+ }
+
+ fn unsubscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), StdError> {
+ todo!()
+ }
+}
+
+pub struct ServiceInstancesChangedListener {
+ zk_client: Arc<ZooKeeper>,
+ path: String,
+ service_name: String,
+ listener: RegistryNotifyListener,
+}
+
+impl Watcher for ServiceInstancesChangedListener {
+ fn handle(&self, event: WatchedEvent) {
+ if let (WatchedEventType::NodeChildrenChanged, Some(path)) = (event.event_type, event.path)
+ {
+ let event_path = path.clone();
+ let dirs = self
+ .zk_client
+ .get_children(&event_path, false)
+ .expect("msg");
+ let result: Vec<Url> = dirs
+ .iter()
+ .map(|node_key| {
+ let provider_url: Url = node_key.as_str().into();
+ provider_url
+ })
+ .collect();
+ let res = self.zk_client.get_children_w(
+ &path,
+ ServiceInstancesChangedListener {
+ zk_client: Arc::clone(&self.zk_client),
+ path: path.clone(),
+ service_name: self.service_name.clone(),
+ listener: Arc::clone(&self.listener),
+ },
+ );
+
+ info!("notify {}->{:?}", self.service_name, result);
+ self.listener.notify(ServiceEvent {
+ key: self.service_name.clone(),
+ action: String::from("ADD"),
+ service: result,
+ });
+ }
+ }
+}
+
+impl NotifyListener for ServiceInstancesChangedListener {
+ fn notify(&self, event: ServiceEvent) {
+ self.listener.notify(event);
+ }
+
+ fn notify_all(&self, event: ServiceEvent) {
+ self.listener.notify(event);
+ }
+}
+
+impl ClusterRegistryIntegration for ZookeeperRegistry {
+ fn get_invoker(registry: BoxRegistry) -> Option<Arc<ClusterInvoker>> {
+ todo!()
+ }
+}
#[cfg(test)]
mod tests {
+ use std::sync::Arc;
+
+ use zookeeper::{Acl, CreateMode, WatchedEvent, Watcher};
+
+ use crate::zookeeper_registry::ZookeeperRegistry;
+
+ struct TestZkWatcher {
+ pub watcher: Arc<Option<TestZkWatcher>>,
+ }
+
+ impl Watcher for TestZkWatcher {
+ fn handle(&self, event: WatchedEvent) {
+ println!("event: {:?}", event);
+ }
+ }
+
#[test]
- fn it_works() {
- let result = 2 + 2;
- assert_eq!(result, 4);
+ fn zk_read_write_watcher() {
+ // https://github.com/bonifaido/rust-zookeeper/blob/master/examples/zookeeper_example.rs
+ // using ENV to set zookeeper server urls
+ let zkr = ZookeeperRegistry::default();
+ let zk_client = zkr.get_client();
+ let watcher = TestZkWatcher {
+ watcher: Arc::new(None),
+ };
+ if zk_client.exists("/test", true).is_err() {
+ zk_client
+ .create(
+ "/test",
+ vec![1, 3],
+ Acl::open_unsafe().clone(),
+ CreateMode::Ephemeral,
+ )
+ .unwrap();
+ }
+ let zk_res = zk_client.create(
+ "/test",
+ "hello".into(),
+ Acl::open_unsafe().clone(),
+ CreateMode::Ephemeral,
+ );
+ let result = zk_client.get_children_w("/test", watcher);
+ assert!(result.is_ok());
+ if zk_client.exists("/test/a", true).is_err() {
+ zk_client.delete("/test/a", None).unwrap();
+ }
+ if zk_client.exists("/test/a", true).is_err() {
+ zk_client.delete("/test/b", None).unwrap();
+ }
+ let zk_res = zk_client.create(
+ "/test/a",
+ "hello".into(),
+ Acl::open_unsafe().clone(),
+ CreateMode::Ephemeral,
+ );
+ let zk_res = zk_client.create(
+ "/test/b",
+ "world".into(),
+ Acl::open_unsafe().clone(),
+ CreateMode::Ephemeral,
+ );
+ let test_a_result = zk_client.get_data("/test", true);
+ assert!(test_a_result.is_ok());
+ let vec1 = test_a_result.unwrap().0;
+ // data in /test should equals to "hello"
+ assert_eq!(String::from_utf8(vec1).unwrap(), "hello");
+ zk_client.close().unwrap()
+ }
+
+ #[test]
+ fn create_path_with_parent_check() {
+ let zkr = ZookeeperRegistry::default();
+ let path = "/du1bbo/test11111";
+ let data = "hello";
+ // creating a child on a not exists parent, throw a NoNode error.
+ // let result = zkr.create_path(path, data, CreateMode::Ephemeral);
+ // assert!(result.is_err());
+ let create_with_parent_check_result =
+ zkr.create_path_with_parent_check(path, data, CreateMode::Ephemeral);
+ assert!(create_with_parent_check_result.is_ok());
+ assert_eq!(data, zkr.get_data(path, false).unwrap());
}
}
diff --git a/registry/zookeeper/src/zookeeper_registry.rs b/registry/zookeeper/src/zookeeper_registry.rs
deleted file mode 100644
index 9f7447b..0000000
--- a/registry/zookeeper/src/zookeeper_registry.rs
+++ /dev/null
@@ -1,464 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#![allow(unused_variables, dead_code, missing_docs)]
-
-use std::{
- collections::{HashMap, HashSet},
- env,
- sync::{Arc, Mutex, RwLock},
- time::Duration,
-};
-
-use logger::tracing::{debug, error, info};
-use serde::{Deserialize, Serialize};
-#[allow(unused_imports)]
-use zookeeper::{Acl, CreateMode, WatchedEvent, WatchedEventType, Watcher, ZooKeeper};
-
-use dubbo::{
- cluster::support::cluster_invoker::ClusterInvoker,
- codegen::BoxRegistry,
- common::{
- consts::{DUBBO_KEY, LOCALHOST_IP, PROVIDERS_KEY},
- url::Url,
- },
- registry::{
- integration::ClusterRegistryIntegration, memory_registry::MemoryRegistry, NotifyListener,
- Registry, RegistryNotifyListener, ServiceEvent,
- },
- StdError,
-};
-
-// 从url中获取服务注册的元数据
-// rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s)
-// dubboPath = fmt.Sprintf("/%s/%s/%s", r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), common.DubboNodes[common.PROVIDER])
-
-pub const REGISTRY_GROUP_KEY: &str = "registry.group";
-
-struct LoggingWatcher;
-impl Watcher for LoggingWatcher {
- fn handle(&self, e: WatchedEvent) {
- info!("{:?}", e)
- }
-}
-
-//#[derive(Debug)]
-pub struct ZookeeperRegistry {
- root_path: String,
- zk_client: Arc<ZooKeeper>,
- listeners: RwLock<HashMap<String, RegistryNotifyListener>>,
- memory_registry: Arc<Mutex<MemoryRegistry>>,
-}
-
-#[derive(Serialize, Deserialize, Debug)]
-pub struct ZkServiceInstance {
- name: String,
- address: String,
- port: i32,
-}
-
-impl ZkServiceInstance {
- pub fn get_service_name(&self) -> &str {
- self.name.as_str()
- }
-
- pub fn get_host(&self) -> &str {
- self.address.as_str()
- }
-
- pub fn get_port(&self) -> i32 {
- self.port
- }
-}
-
-impl ZookeeperRegistry {
- pub fn new(connect_string: &str) -> ZookeeperRegistry {
- let zk_client =
- ZooKeeper::connect(connect_string, Duration::from_secs(15), LoggingWatcher).unwrap();
- info!("zk server connect string: {}", connect_string);
- ZookeeperRegistry {
- root_path: "/services".to_string(),
- zk_client: Arc::new(zk_client),
- listeners: RwLock::new(HashMap::new()),
- memory_registry: Arc::new(Mutex::new(MemoryRegistry::default())),
- }
- }
-
- fn create_listener(
- &self,
- path: String,
- service_name: String,
- listener: RegistryNotifyListener,
- ) -> ServiceInstancesChangedListener {
- let mut service_names = HashSet::new();
- service_names.insert(service_name.clone());
- ServiceInstancesChangedListener {
- zk_client: Arc::clone(&self.zk_client),
- path,
- service_name: service_name.clone(),
- listener,
- }
- }
-
- // metadata /dubbo/mapping designed for application discovery; deprecated for currently using interface discovery
- // #[deprecated]
- fn get_app_name(&self, service_name: String) -> String {
- let res = self
- .zk_client
- .get_data(&("/dubbo/mapping/".to_owned() + &service_name), false);
-
- let x = res.unwrap().0;
- let s = match std::str::from_utf8(&x) {
- Ok(v) => v,
- Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
- };
- s.to_string()
- }
-
- pub fn get_client(&self) -> Arc<ZooKeeper> {
- self.zk_client.clone()
- }
-
- // If the parent node does not exist in the ZooKeeper, Err(ZkError::NoNode) will be returned.
- pub fn create_path(
- &self,
- path: &str,
- data: &str,
- create_mode: CreateMode,
- ) -> Result<(), StdError> {
- if self.exists_path(path) {
- self.zk_client
- .set_data(path, data.as_bytes().to_vec(), None)
- .unwrap_or_else(|_| panic!("set data to {} failed.", path));
- return Ok(());
- }
- let zk_result = self.zk_client.create(
- path,
- data.as_bytes().to_vec(),
- Acl::open_unsafe().clone(),
- create_mode,
- );
- match zk_result {
- Ok(_) => Ok(()),
- Err(err) => {
- error!("zk path {} parent not exists.", path);
- Err(Box::try_from(err).unwrap())
- }
- }
- }
-
- // For avoiding Err(ZkError::NoNode) when parent node is't exists
- pub fn create_path_with_parent_check(
- &self,
- path: &str,
- data: &str,
- create_mode: CreateMode,
- ) -> Result<(), StdError> {
- let nodes: Vec<&str> = path.split('/').collect();
- let mut current: String = String::new();
- let children = *nodes.last().unwrap();
- for node_key in nodes {
- if node_key.is_empty() {
- continue;
- };
- current.push('/');
- current.push_str(node_key);
- if !self.exists_path(current.as_str()) {
- let new_create_mode = match children == node_key {
- true => create_mode,
- false => CreateMode::Persistent,
- };
- let new_data = match children == node_key {
- true => data,
- false => "",
- };
- self.create_path(current.as_str(), new_data, new_create_mode)
- .unwrap();
- }
- }
- Ok(())
- }
-
- pub fn delete_path(&self, path: &str) {
- if self.exists_path(path) {
- self.get_client().delete(path, None).unwrap()
- }
- }
-
- pub fn exists_path(&self, path: &str) -> bool {
- self.zk_client.exists(path, false).unwrap().is_some()
- }
-
- pub fn get_data(&self, path: &str, watch: bool) -> Option<String> {
- if self.exists_path(path) {
- let zk_result = self.zk_client.get_data(path, watch);
- if let Ok(..) = zk_result {
- Some(String::from_utf8(zk_result.unwrap().0).unwrap())
- } else {
- None
- }
- } else {
- None
- }
- }
-}
-
-impl Default for ZookeeperRegistry {
- fn default() -> ZookeeperRegistry {
- let zk_connect_string = match env::var("ZOOKEEPER_SERVERS") {
- Ok(val) => val,
- Err(_) => {
- let default_connect_string = "localhost:2181";
- info!(
- "No ZOOKEEPER_SERVERS env value, using {} as default.",
- default_connect_string
- );
- default_connect_string.to_string()
- }
- };
- println!(
- "using external registry with it's connect string {}",
- zk_connect_string.as_str()
- );
- ZookeeperRegistry::new(zk_connect_string.as_str())
- }
-}
-
-impl Registry for ZookeeperRegistry {
- fn register(&mut self, url: Url) -> Result<(), StdError> {
- debug!("register url: {}", url);
- let zk_path = format!(
- "/{}/{}/{}/{}",
- DUBBO_KEY,
- url.service_name,
- PROVIDERS_KEY,
- url.encoded_raw_url_string()
- );
- self.create_path_with_parent_check(zk_path.as_str(), LOCALHOST_IP, CreateMode::Ephemeral)?;
- Ok(())
- }
-
- fn unregister(&mut self, url: Url) -> Result<(), StdError> {
- let zk_path = format!(
- "/{}/{}/{}/{}",
- DUBBO_KEY,
- url.service_name,
- PROVIDERS_KEY,
- url.encoded_raw_url_string()
- );
- self.delete_path(zk_path.as_str());
- Ok(())
- }
-
- // for consumer to find the changes of providers
- fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), StdError> {
- let service_name = url.get_service_name();
- let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &service_name, PROVIDERS_KEY);
- if self
- .listeners
- .read()
- .unwrap()
- .get(service_name.as_str())
- .is_some()
- {
- return Ok(());
- }
-
- self.listeners
- .write()
- .unwrap()
- .insert(service_name.to_string(), listener.clone());
-
- let zk_listener =
- self.create_listener(zk_path.clone(), service_name.to_string(), listener.clone());
-
- let zk_changed_paths = self.zk_client.get_children_w(&zk_path, zk_listener);
- let result = match zk_changed_paths {
- Err(err) => {
- error!("zk subscribe error: {}", err);
- Vec::new()
- }
- Ok(urls) => urls
- .iter()
- .map(|node_key| {
- let provider_url: Url = urlencoding::decode(node_key)
- .unwrap()
- .to_string()
- .as_str()
- .into();
- provider_url
- })
- .collect(),
- };
- info!("notifying {}->{:?}", service_name, result);
- listener.notify(ServiceEvent {
- key: service_name,
- action: String::from("ADD"),
- service: result,
- });
- Ok(())
- }
-
- fn unsubscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), StdError> {
- todo!()
- }
-}
-
-pub struct ServiceInstancesChangedListener {
- zk_client: Arc<ZooKeeper>,
- path: String,
- service_name: String,
- listener: RegistryNotifyListener,
-}
-
-impl Watcher for ServiceInstancesChangedListener {
- fn handle(&self, event: WatchedEvent) {
- if let (WatchedEventType::NodeChildrenChanged, Some(path)) = (event.event_type, event.path)
- {
- let event_path = path.clone();
- let dirs = self
- .zk_client
- .get_children(&event_path, false)
- .expect("msg");
- let result: Vec<Url> = dirs
- .iter()
- .map(|node_key| {
- let provider_url: Url = node_key.as_str().into();
- provider_url
- })
- .collect();
- let res = self.zk_client.get_children_w(
- &path,
- ServiceInstancesChangedListener {
- zk_client: Arc::clone(&self.zk_client),
- path: path.clone(),
- service_name: self.service_name.clone(),
- listener: Arc::clone(&self.listener),
- },
- );
-
- info!("notify {}->{:?}", self.service_name, result);
- self.listener.notify(ServiceEvent {
- key: self.service_name.clone(),
- action: String::from("ADD"),
- service: result,
- });
- }
- }
-}
-
-impl NotifyListener for ServiceInstancesChangedListener {
- fn notify(&self, event: ServiceEvent) {
- self.listener.notify(event);
- }
-
- fn notify_all(&self, event: ServiceEvent) {
- self.listener.notify(event);
- }
-}
-
-impl ClusterRegistryIntegration for ZookeeperRegistry {
- fn get_invoker(registry: BoxRegistry) -> Option<Arc<ClusterInvoker>> {
- todo!()
- }
-}
-
-#[cfg(test)]
-mod tests {
- use std::sync::Arc;
-
- use zookeeper::{Acl, CreateMode, WatchedEvent, Watcher};
-
- use crate::zookeeper_registry::ZookeeperRegistry;
-
- struct TestZkWatcher {
- pub watcher: Arc<Option<TestZkWatcher>>,
- }
-
- impl Watcher for TestZkWatcher {
- fn handle(&self, event: WatchedEvent) {
- println!("event: {:?}", event);
- }
- }
-
- #[test]
- fn zk_read_write_watcher() {
- // https://github.com/bonifaido/rust-zookeeper/blob/master/examples/zookeeper_example.rs
- // using ENV to set zookeeper server urls
- let zkr = ZookeeperRegistry::default();
- let zk_client = zkr.get_client();
- let watcher = TestZkWatcher {
- watcher: Arc::new(None),
- };
- if zk_client.exists("/test", true).is_err() {
- zk_client
- .create(
- "/test",
- vec![1, 3],
- Acl::open_unsafe().clone(),
- CreateMode::Ephemeral,
- )
- .unwrap();
- }
- let zk_res = zk_client.create(
- "/test",
- "hello".into(),
- Acl::open_unsafe().clone(),
- CreateMode::Ephemeral,
- );
- let result = zk_client.get_children_w("/test", watcher);
- assert!(result.is_ok());
- if zk_client.exists("/test/a", true).is_err() {
- zk_client.delete("/test/a", None).unwrap();
- }
- if zk_client.exists("/test/a", true).is_err() {
- zk_client.delete("/test/b", None).unwrap();
- }
- let zk_res = zk_client.create(
- "/test/a",
- "hello".into(),
- Acl::open_unsafe().clone(),
- CreateMode::Ephemeral,
- );
- let zk_res = zk_client.create(
- "/test/b",
- "world".into(),
- Acl::open_unsafe().clone(),
- CreateMode::Ephemeral,
- );
- let test_a_result = zk_client.get_data("/test", true);
- assert!(test_a_result.is_ok());
- let vec1 = test_a_result.unwrap().0;
- // data in /test should equals to "hello"
- assert_eq!(String::from_utf8(vec1).unwrap(), "hello");
- zk_client.close().unwrap()
- }
-
- #[test]
- fn create_path_with_parent_check() {
- let zkr = ZookeeperRegistry::default();
- let path = "/du1bbo/test11111";
- let data = "hello";
- // creating a child on a not exists parent, throw a NoNode error.
- // let result = zkr.create_path(path, data, CreateMode::Ephemeral);
- // assert!(result.is_err());
- let create_with_parent_check_result =
- zkr.create_path_with_parent_check(path, data, CreateMode::Ephemeral);
- assert!(create_with_parent_check_result.is_ok());
- assert_eq!(data, zkr.get_data(path, false).unwrap());
- }
-}
diff --git a/remoting/README.md b/remoting/README.md
new file mode 100644
index 0000000..4d247cc
--- /dev/null
+++ b/remoting/README.md
@@ -0,0 +1,6 @@
+/remoting
+ /net # for dubbo2
+ /http # on top of net, for hessian/rest/grpc/triple protocol
+ /exchange # convert tcp/http to invocation or invoker for protocol layer
+ /zookeeper # zk client for registry and config center
+ /nacos # nacos client for registry and config center
\ No newline at end of file
diff --git a/remoting/exchange/Cargo.toml b/remoting/exchange/Cargo.toml
new file mode 100644
index 0000000..ec14668
--- /dev/null
+++ b/remoting/exchange/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "remoting-exchange"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
diff --git a/xds/LICENSE b/remoting/exchange/LICENSE
similarity index 100%
copy from xds/LICENSE
copy to remoting/exchange/LICENSE
diff --git a/xds/src/lib.rs b/remoting/exchange/src/lib.rs
similarity index 87%
copy from xds/src/lib.rs
copy to remoting/exchange/src/lib.rs
index 3e01853..d64452d 100644
--- a/xds/src/lib.rs
+++ b/remoting/exchange/src/lib.rs
@@ -14,12 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+pub fn add(left: usize, right: usize) -> usize {
+ left + right
+}
#[cfg(test)]
mod tests {
+ use super::*;
+
#[test]
fn it_works() {
- let result = 2 + 2;
+ let result = add(2, 2);
assert_eq!(result, 4);
}
}
diff --git a/remoting/h2/Cargo.toml b/remoting/h2/Cargo.toml
new file mode 100644
index 0000000..12a2804
--- /dev/null
+++ b/remoting/h2/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "remoting-h2"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
diff --git a/xds/LICENSE b/remoting/h2/LICENSE
similarity index 100%
copy from xds/LICENSE
copy to remoting/h2/LICENSE
diff --git a/xds/src/lib.rs b/remoting/h2/src/lib.rs
similarity index 87%
copy from xds/src/lib.rs
copy to remoting/h2/src/lib.rs
index 3e01853..d64452d 100644
--- a/xds/src/lib.rs
+++ b/remoting/h2/src/lib.rs
@@ -14,12 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+pub fn add(left: usize, right: usize) -> usize {
+ left + right
+}
#[cfg(test)]
mod tests {
+ use super::*;
+
#[test]
fn it_works() {
- let result = 2 + 2;
+ let result = add(2, 2);
assert_eq!(result, 4);
}
}
diff --git a/remoting/http/Cargo.toml b/remoting/http/Cargo.toml
new file mode 100644
index 0000000..2fe07cd
--- /dev/null
+++ b/remoting/http/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "remoting-http"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
diff --git a/xds/LICENSE b/remoting/http/LICENSE
similarity index 100%
copy from xds/LICENSE
copy to remoting/http/LICENSE
diff --git a/xds/src/lib.rs b/remoting/http/src/lib.rs
similarity index 87%
copy from xds/src/lib.rs
copy to remoting/http/src/lib.rs
index 3e01853..d64452d 100644
--- a/xds/src/lib.rs
+++ b/remoting/http/src/lib.rs
@@ -14,12 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+pub fn add(left: usize, right: usize) -> usize {
+ left + right
+}
#[cfg(test)]
mod tests {
+ use super::*;
+
#[test]
fn it_works() {
- let result = 2 + 2;
+ let result = add(2, 2);
assert_eq!(result, 4);
}
}
diff --git a/remoting/net/Cargo.toml b/remoting/net/Cargo.toml
index c62082e..1e1682e 100644
--- a/remoting/net/Cargo.toml
+++ b/remoting/net/Cargo.toml
@@ -1,5 +1,5 @@
[package]
-name = "net"
+name = "remoting-net"
version = "0.1.0"
edition = "2021"
diff --git a/xds/Cargo.toml b/remoting/xds/Cargo.toml
similarity index 88%
rename from xds/Cargo.toml
rename to remoting/xds/Cargo.toml
index 3918953..8bf8a3b 100644
--- a/xds/Cargo.toml
+++ b/remoting/xds/Cargo.toml
@@ -3,7 +3,7 @@
version = "0.3.0"
edition = "2021"
license = "Apache-2.0"
-description = "dubbo-rust-xds"
+description = "remoting-xds"
repository = "https://github.com/apache/dubbo-rust.git"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
diff --git a/xds/LICENSE b/remoting/xds/LICENSE
similarity index 100%
rename from xds/LICENSE
rename to remoting/xds/LICENSE
diff --git a/xds/src/client/client.rs b/remoting/xds/src/client/client.rs
similarity index 100%
rename from xds/src/client/client.rs
rename to remoting/xds/src/client/client.rs
diff --git a/xds/src/client/mod.rs b/remoting/xds/src/client/mod.rs
similarity index 100%
rename from xds/src/client/mod.rs
rename to remoting/xds/src/client/mod.rs
diff --git a/xds/src/lib.rs b/remoting/xds/src/lib.rs
similarity index 98%
rename from xds/src/lib.rs
rename to remoting/xds/src/lib.rs
index 3e01853..eba5902 100644
--- a/xds/src/lib.rs
+++ b/remoting/xds/src/lib.rs
@@ -15,6 +15,8 @@
* limitations under the License.
*/
+mod client;
+
#[cfg(test)]
mod tests {
#[test]
diff --git a/xds/src/server/mod.rs b/remoting/xds/src/server/mod.rs
similarity index 100%
rename from xds/src/server/mod.rs
rename to remoting/xds/src/server/mod.rs
diff --git a/xds/src/server/server.rs b/remoting/xds/src/server/server.rs
similarity index 100%
rename from xds/src/server/server.rs
rename to remoting/xds/src/server/server.rs
diff --git a/remoting/zookeeper/Cargo.toml b/remoting/zookeeper/Cargo.toml
new file mode 100644
index 0000000..eaf2b6c
--- /dev/null
+++ b/remoting/zookeeper/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "remoting-zookeeper"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
diff --git a/xds/LICENSE b/remoting/zookeeper/LICENSE
similarity index 100%
copy from xds/LICENSE
copy to remoting/zookeeper/LICENSE
diff --git a/xds/src/lib.rs b/remoting/zookeeper/src/lib.rs
similarity index 87%
copy from xds/src/lib.rs
copy to remoting/zookeeper/src/lib.rs
index 3e01853..d64452d 100644
--- a/xds/src/lib.rs
+++ b/remoting/zookeeper/src/lib.rs
@@ -14,12 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+pub fn add(left: usize, right: usize) -> usize {
+ left + right
+}
#[cfg(test)]
mod tests {
+ use super::*;
+
#[test]
fn it_works() {
- let result = 2 + 2;
+ let result = add(2, 2);
assert_eq!(result, 4);
}
}