| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #![allow(unused_variables, dead_code, missing_docs)] |
| |
| use std::collections::HashMap; |
| use std::collections::HashSet; |
| use std::env; |
| use std::sync::RwLock; |
| use std::sync::{Arc, Mutex}; |
| use std::time::Duration; |
| |
| use serde::{Deserialize, Serialize}; |
| use tracing::{error, info}; |
| #[allow(unused_imports)] |
| use zookeeper::{Acl, CreateMode, WatchedEvent, WatchedEventType, Watcher, ZooKeeper}; |
| |
| use dubbo::cluster::support::cluster_invoker::ClusterInvoker; |
| use dubbo::codegen::BoxRegistry; |
| use dubbo::common::consts::{DUBBO_KEY, LOCALHOST_IP, PROVIDERS_KEY}; |
| use dubbo::common::url::Url; |
| use dubbo::registry::integration::ClusterRegistryIntegration; |
| use dubbo::registry::memory_registry::{MemoryNotifyListener, MemoryRegistry}; |
| use dubbo::registry::NotifyListener; |
| use dubbo::registry::Registry; |
| use dubbo::registry::ServiceEvent; |
| use dubbo::StdError; |
| |
// Get the service registration metadata from the URL.
| // rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s) |
| // dubboPath = fmt.Sprintf("/%s/%s/%s", r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), common.DubboNodes[common.PROVIDER]) |
| |
/// The literal key `registry.group`.
///
/// Not referenced by any other code visible in this file; presumably the URL
/// parameter that names the registry group (compare dubbo-go's
/// `RegistryGroupKey`) — confirm against callers.
pub const REGISTRY_GROUP_KEY: &str = "registry.group";
| |
| struct LoggingWatcher; |
| impl Watcher for LoggingWatcher { |
| fn handle(&self, e: WatchedEvent) { |
| println!("{:?}", e) |
| } |
| } |
| |
| //#[derive(Debug)] |
| pub struct ZookeeperRegistry { |
| root_path: String, |
| zk_client: Arc<ZooKeeper>, |
| listeners: RwLock<HashMap<String, Arc<<ZookeeperRegistry as Registry>::NotifyListener>>>, |
| memory_registry: Arc<Mutex<MemoryRegistry>>, |
| } |
| |
| #[derive(Serialize, Deserialize, Debug)] |
| pub struct ZkServiceInstance { |
| name: String, |
| address: String, |
| port: i32, |
| } |
| |
| impl ZkServiceInstance { |
| pub fn get_service_name(&self) -> &str { |
| self.name.as_str() |
| } |
| |
| pub fn get_host(&self) -> &str { |
| self.address.as_str() |
| } |
| |
| pub fn get_port(&self) -> i32 { |
| self.port |
| } |
| } |
| |
| impl ZookeeperRegistry { |
| pub fn new(connect_string: &str) -> ZookeeperRegistry { |
| let zk_client = |
| ZooKeeper::connect(connect_string, Duration::from_secs(15), LoggingWatcher).unwrap(); |
| info!("zk server connect string: {}", connect_string); |
| ZookeeperRegistry { |
| root_path: "/services".to_string(), |
| zk_client: Arc::new(zk_client), |
| listeners: RwLock::new(HashMap::new()), |
| memory_registry: Arc::new(Mutex::new(MemoryRegistry::default())), |
| } |
| } |
| |
| fn create_listener( |
| &self, |
| path: String, |
| service_name: String, |
| listener: Arc<<ZookeeperRegistry as Registry>::NotifyListener>, |
| ) -> ServiceInstancesChangedListener { |
| let mut service_names = HashSet::new(); |
| service_names.insert(service_name.clone()); |
| ServiceInstancesChangedListener { |
| zk_client: Arc::clone(&self.zk_client), |
| path, |
| service_name: service_name.clone(), |
| listener, |
| } |
| } |
| |
| // metadata /dubbo/mapping designed for application discovery; deprecated for currently using interface discovery |
| // #[deprecated] |
| fn get_app_name(&self, service_name: String) -> String { |
| let res = self |
| .zk_client |
| .get_data(&("/dubbo/mapping/".to_owned() + &service_name), false); |
| |
| let x = res.unwrap().0; |
| let s = match std::str::from_utf8(&x) { |
| Ok(v) => v, |
| Err(e) => panic!("Invalid UTF-8 sequence: {}", e), |
| }; |
| s.to_string() |
| } |
| |
| pub fn get_client(&self) -> Arc<ZooKeeper> { |
| self.zk_client.clone() |
| } |
| |
| // If the parent node does not exist in the ZooKeeper, Err(ZkError::NoNode) will be returned. |
| pub fn create_path( |
| &self, |
| path: &str, |
| data: &str, |
| create_mode: CreateMode, |
| ) -> Result<(), StdError> { |
| if self.exists_path(path) { |
| self.zk_client |
| .set_data(path, data.as_bytes().to_vec(), None) |
| .unwrap_or_else(|_| panic!("set data to {} failed.", path)); |
| return Ok(()); |
| } |
| let zk_result = self.zk_client.create( |
| path, |
| data.as_bytes().to_vec(), |
| Acl::open_unsafe().clone(), |
| create_mode, |
| ); |
| match zk_result { |
| Ok(_) => Ok(()), |
| Err(err) => { |
| error!("zk path {} parent not exists.", path); |
| Err(Box::try_from(err).unwrap()) |
| } |
| } |
| } |
| |
| // For avoiding Err(ZkError::NoNode) when parent node is't exists |
| pub fn create_path_with_parent_check( |
| &self, |
| path: &str, |
| data: &str, |
| create_mode: CreateMode, |
| ) -> Result<(), StdError> { |
| let nodes: Vec<&str> = path.split('/').collect(); |
| let mut current: String = String::new(); |
| let children = *nodes.last().unwrap(); |
| for node_key in nodes { |
| if node_key.is_empty() { |
| continue; |
| }; |
| current.push('/'); |
| current.push_str(node_key); |
| if !self.exists_path(current.as_str()) { |
| let new_create_mode = match children == node_key { |
| true => create_mode, |
| false => CreateMode::Persistent, |
| }; |
| let new_data = match children == node_key { |
| true => data, |
| false => "", |
| }; |
| self.create_path(current.as_str(), new_data, new_create_mode) |
| .unwrap(); |
| } |
| } |
| Ok(()) |
| } |
| |
| pub fn delete_path(&self, path: &str) { |
| if self.exists_path(path) { |
| self.get_client().delete(path, None).unwrap() |
| } |
| } |
| |
| pub fn exists_path(&self, path: &str) -> bool { |
| self.zk_client.exists(path, false).unwrap().is_some() |
| } |
| |
| pub fn get_data(&self, path: &str, watch: bool) -> Option<String> { |
| if self.exists_path(path) { |
| let zk_result = self.zk_client.get_data(path, watch); |
| if let Ok(..) = zk_result { |
| Some(String::from_utf8(zk_result.unwrap().0).unwrap()) |
| } else { |
| None |
| } |
| } else { |
| None |
| } |
| } |
| } |
| |
| impl Default for ZookeeperRegistry { |
| fn default() -> ZookeeperRegistry { |
| let zk_connect_string = match env::var("ZOOKEEPER_SERVERS") { |
| Ok(val) => val, |
| Err(_) => { |
| let default_connect_string = "localhost:2181"; |
| info!( |
| "No ZOOKEEPER_SERVERS env value, using {} as default.", |
| default_connect_string |
| ); |
| default_connect_string.to_string() |
| } |
| }; |
| info!( |
| "using external registry with it's connect string {}", |
| zk_connect_string.as_str() |
| ); |
| ZookeeperRegistry::new(zk_connect_string.as_str()) |
| } |
| } |
| |
| impl Registry for ZookeeperRegistry { |
| type NotifyListener = MemoryNotifyListener; |
| |
| fn register(&mut self, url: Url) -> Result<(), StdError> { |
| let zk_path = format!( |
| "/{}/{}/{}/{}", |
| DUBBO_KEY, |
| url.service_name, |
| PROVIDERS_KEY, |
| url.encoded_raw_url_string() |
| ); |
| self.create_path_with_parent_check(zk_path.as_str(), LOCALHOST_IP, CreateMode::Ephemeral)?; |
| Ok(()) |
| } |
| |
| fn unregister(&mut self, url: Url) -> Result<(), StdError> { |
| let zk_path = format!( |
| "/{}/{}/{}/{}", |
| DUBBO_KEY, |
| url.service_name, |
| PROVIDERS_KEY, |
| url.encoded_raw_url_string() |
| ); |
| self.delete_path(zk_path.as_str()); |
| Ok(()) |
| } |
| |
| // for consumer to find the changes of providers |
| fn subscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> { |
| let service_name = url.get_service_name(); |
| let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &service_name, PROVIDERS_KEY); |
| if self |
| .listeners |
| .read() |
| .unwrap() |
| .get(service_name.as_str()) |
| .is_some() |
| { |
| return Ok(()); |
| } |
| |
| let arc_listener = Arc::new(listener); |
| self.listeners |
| .write() |
| .unwrap() |
| .insert(service_name.to_string(), Arc::clone(&arc_listener)); |
| |
| let zk_listener = self.create_listener( |
| zk_path.clone(), |
| service_name.to_string(), |
| Arc::clone(&arc_listener), |
| ); |
| |
| let zk_changed_paths = self.zk_client.get_children_w(&zk_path, zk_listener); |
| let result = match zk_changed_paths { |
| Err(err) => { |
| error!("zk subscribe error: {}", err); |
| Vec::new() |
| } |
| Ok(urls) => urls |
| .iter() |
| .map(|node_key| { |
| let provider_url: Url = urlencoding::decode(node_key) |
| .unwrap() |
| .to_string() |
| .as_str() |
| .into(); |
| provider_url |
| }) |
| .collect(), |
| }; |
| info!("notifying {}->{:?}", service_name, result); |
| arc_listener.notify(ServiceEvent { |
| key: service_name, |
| action: String::from("ADD"), |
| service: result, |
| }); |
| Ok(()) |
| } |
| |
| fn unsubscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> { |
| todo!() |
| } |
| } |
| |
| pub struct ServiceInstancesChangedListener { |
| zk_client: Arc<ZooKeeper>, |
| path: String, |
| service_name: String, |
| listener: Arc<MemoryNotifyListener>, |
| } |
| |
| impl Watcher for ServiceInstancesChangedListener { |
| fn handle(&self, event: WatchedEvent) { |
| if let (WatchedEventType::NodeChildrenChanged, Some(path)) = (event.event_type, event.path) |
| { |
| let event_path = path.clone(); |
| let dirs = self |
| .zk_client |
| .get_children(&event_path, false) |
| .expect("msg"); |
| let result: Vec<Url> = dirs |
| .iter() |
| .map(|node_key| { |
| let provider_url: Url = node_key.as_str().into(); |
| provider_url |
| }) |
| .collect(); |
| let res = self.zk_client.get_children_w( |
| &path, |
| ServiceInstancesChangedListener { |
| zk_client: Arc::clone(&self.zk_client), |
| path: path.clone(), |
| service_name: self.service_name.clone(), |
| listener: Arc::clone(&self.listener), |
| }, |
| ); |
| |
| info!("notify {}->{:?}", self.service_name, result); |
| self.listener.notify(ServiceEvent { |
| key: self.service_name.clone(), |
| action: String::from("ADD"), |
| service: result, |
| }); |
| } |
| } |
| } |
| |
| impl NotifyListener for ServiceInstancesChangedListener { |
| fn notify(&self, event: ServiceEvent) { |
| self.listener.notify(event); |
| } |
| |
| fn notify_all(&self, event: ServiceEvent) { |
| self.listener.notify(event); |
| } |
| } |
| |
| impl ClusterRegistryIntegration for ZookeeperRegistry { |
| fn get_invoker(registry: BoxRegistry) -> Option<Arc<ClusterInvoker>> { |
| todo!() |
| } |
| } |
| |
#[cfg(test)]
mod tests {
    use zookeeper::{Acl, CreateMode, WatchedEvent, Watcher};

    use crate::zookeeper_registry::ZookeeperRegistry;

    /// Watcher that only prints the events it receives.
    struct TestZkWatcher;

    impl Watcher for TestZkWatcher {
        fn handle(&self, event: WatchedEvent) {
            println!("event: {:?}", event);
        }
    }

    /// Round-trips data and a child watch through a live ZooKeeper server.
    ///
    /// Adapted from
    /// https://github.com/bonifaido/rust-zookeeper/blob/master/examples/zookeeper_example.rs
    /// The server address is taken from the `ZOOKEEPER_SERVERS` environment
    /// variable (see `ZookeeperRegistry::default`).
    #[test]
    fn zk_read_write_watcher() {
        let zkr = ZookeeperRegistry::default();
        let zk_client = zkr.get_client();

        // `exists` answers Ok(None) for a missing node, so presence has to be
        // read from the Option rather than from `is_err()`.
        let remove_if_present = |path: &str| {
            if zk_client.exists(path, false).unwrap().is_some() {
                zk_client.delete(path, None).unwrap();
            }
        };

        // Start from a clean slate: children first, then the parent.
        for &path in ["/test/a", "/test/b", "/test"].iter() {
            remove_if_present(path);
        }

        // The parent is persistent because ephemeral nodes cannot have children.
        zk_client
            .create(
                "/test",
                "hello".into(),
                Acl::open_unsafe().clone(),
                CreateMode::Persistent,
            )
            .unwrap();

        let result = zk_client.get_children_w("/test", TestZkWatcher);
        assert!(result.is_ok());

        zk_client
            .create(
                "/test/a",
                "hello".into(),
                Acl::open_unsafe().clone(),
                CreateMode::Ephemeral,
            )
            .unwrap();
        zk_client
            .create(
                "/test/b",
                "world".into(),
                Acl::open_unsafe().clone(),
                CreateMode::Ephemeral,
            )
            .unwrap();

        let test_result = zk_client.get_data("/test", true);
        assert!(test_result.is_ok());
        let vec1 = test_result.unwrap().0;
        // data in /test should equal "hello"
        assert_eq!(String::from_utf8(vec1).unwrap(), "hello");

        // Leave nothing behind for the next run.
        for &path in ["/test/a", "/test/b", "/test"].iter() {
            remove_if_present(path);
        }
        zk_client.close().unwrap()
    }

    /// Creating a node below a missing parent must succeed when the parent
    /// check is used. Needs a live ZooKeeper server.
    #[test]
    fn create_path_with_parent_check() {
        let zkr = ZookeeperRegistry::default();
        let path = "/du1bbo/test11111";
        let data = "hello";
        // Creating a child under a parent that does not exist yields a NoNode
        // error with plain `create_path`:
        // let result = zkr.create_path(path, data, CreateMode::Ephemeral);
        // assert!(result.is_err());
        let create_with_parent_check_result =
            zkr.create_path_with_parent_check(path, data, CreateMode::Ephemeral);
        assert!(create_with_parent_check_result.is_ok());
        assert_eq!(data, zkr.get_data(path, false).unwrap());
    }
}