blob: ad966387b88ae25a194384ecebc494ccb1b4122f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#![allow(unused_variables, dead_code, missing_docs)]
use std::collections::HashMap;
use std::collections::HashSet;
use std::env;
use std::sync::RwLock;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use serde::{Deserialize, Serialize};
use tracing::{error, info};
#[allow(unused_imports)]
use zookeeper::{Acl, CreateMode, WatchedEvent, WatchedEventType, Watcher, ZooKeeper};
use dubbo::cluster::support::cluster_invoker::ClusterInvoker;
use dubbo::codegen::BoxRegistry;
use dubbo::common::consts::{DUBBO_KEY, LOCALHOST_IP, PROVIDERS_KEY};
use dubbo::common::url::Url;
use dubbo::registry::integration::ClusterRegistryIntegration;
use dubbo::registry::memory_registry::{MemoryNotifyListener, MemoryRegistry};
use dubbo::registry::NotifyListener;
use dubbo::registry::Registry;
use dubbo::registry::ServiceEvent;
use dubbo::StdError;
// 从url中获取服务注册的元数据
// rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s)
// dubboPath = fmt.Sprintf("/%s/%s/%s", r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), common.DubboNodes[common.PROVIDER])
pub const REGISTRY_GROUP_KEY: &str = "registry.group";
struct LoggingWatcher;
impl Watcher for LoggingWatcher {
fn handle(&self, e: WatchedEvent) {
println!("{:?}", e)
}
}
//#[derive(Debug)]
pub struct ZookeeperRegistry {
root_path: String,
zk_client: Arc<ZooKeeper>,
listeners: RwLock<HashMap<String, Arc<<ZookeeperRegistry as Registry>::NotifyListener>>>,
memory_registry: Arc<Mutex<MemoryRegistry>>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct ZkServiceInstance {
name: String,
address: String,
port: i32,
}
impl ZkServiceInstance {
pub fn get_service_name(&self) -> &str {
self.name.as_str()
}
pub fn get_host(&self) -> &str {
self.address.as_str()
}
pub fn get_port(&self) -> i32 {
self.port
}
}
impl ZookeeperRegistry {
pub fn new(connect_string: &str) -> ZookeeperRegistry {
let zk_client =
ZooKeeper::connect(connect_string, Duration::from_secs(15), LoggingWatcher).unwrap();
info!("zk server connect string: {}", connect_string);
ZookeeperRegistry {
root_path: "/services".to_string(),
zk_client: Arc::new(zk_client),
listeners: RwLock::new(HashMap::new()),
memory_registry: Arc::new(Mutex::new(MemoryRegistry::default())),
}
}
fn create_listener(
&self,
path: String,
service_name: String,
listener: Arc<<ZookeeperRegistry as Registry>::NotifyListener>,
) -> ServiceInstancesChangedListener {
let mut service_names = HashSet::new();
service_names.insert(service_name.clone());
ServiceInstancesChangedListener {
zk_client: Arc::clone(&self.zk_client),
path,
service_name: service_name.clone(),
listener,
}
}
// metadata /dubbo/mapping designed for application discovery; deprecated for currently using interface discovery
// #[deprecated]
fn get_app_name(&self, service_name: String) -> String {
let res = self
.zk_client
.get_data(&("/dubbo/mapping/".to_owned() + &service_name), false);
let x = res.unwrap().0;
let s = match std::str::from_utf8(&x) {
Ok(v) => v,
Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
};
s.to_string()
}
pub fn get_client(&self) -> Arc<ZooKeeper> {
self.zk_client.clone()
}
// If the parent node does not exist in the ZooKeeper, Err(ZkError::NoNode) will be returned.
pub fn create_path(
&self,
path: &str,
data: &str,
create_mode: CreateMode,
) -> Result<(), StdError> {
if self.exists_path(path) {
self.zk_client
.set_data(path, data.as_bytes().to_vec(), None)
.unwrap_or_else(|_| panic!("set data to {} failed.", path));
return Ok(());
}
let zk_result = self.zk_client.create(
path,
data.as_bytes().to_vec(),
Acl::open_unsafe().clone(),
create_mode,
);
match zk_result {
Ok(_) => Ok(()),
Err(err) => {
error!("zk path {} parent not exists.", path);
Err(Box::try_from(err).unwrap())
}
}
}
// For avoiding Err(ZkError::NoNode) when parent node is't exists
pub fn create_path_with_parent_check(
&self,
path: &str,
data: &str,
create_mode: CreateMode,
) -> Result<(), StdError> {
let nodes: Vec<&str> = path.split('/').collect();
let mut current: String = String::new();
let children = *nodes.last().unwrap();
for node_key in nodes {
if node_key.is_empty() {
continue;
};
current.push('/');
current.push_str(node_key);
if !self.exists_path(current.as_str()) {
let new_create_mode = match children == node_key {
true => create_mode,
false => CreateMode::Persistent,
};
let new_data = match children == node_key {
true => data,
false => "",
};
self.create_path(current.as_str(), new_data, new_create_mode)
.unwrap();
}
}
Ok(())
}
pub fn delete_path(&self, path: &str) {
if self.exists_path(path) {
self.get_client().delete(path, None).unwrap()
}
}
pub fn exists_path(&self, path: &str) -> bool {
self.zk_client.exists(path, false).unwrap().is_some()
}
pub fn get_data(&self, path: &str, watch: bool) -> Option<String> {
if self.exists_path(path) {
let zk_result = self.zk_client.get_data(path, watch);
if let Ok(..) = zk_result {
Some(String::from_utf8(zk_result.unwrap().0).unwrap())
} else {
None
}
} else {
None
}
}
}
impl Default for ZookeeperRegistry {
fn default() -> ZookeeperRegistry {
let zk_connect_string = match env::var("ZOOKEEPER_SERVERS") {
Ok(val) => val,
Err(_) => {
let default_connect_string = "localhost:2181";
info!(
"No ZOOKEEPER_SERVERS env value, using {} as default.",
default_connect_string
);
default_connect_string.to_string()
}
};
info!(
"using external registry with it's connect string {}",
zk_connect_string.as_str()
);
ZookeeperRegistry::new(zk_connect_string.as_str())
}
}
impl Registry for ZookeeperRegistry {
type NotifyListener = MemoryNotifyListener;
fn register(&mut self, url: Url) -> Result<(), StdError> {
let zk_path = format!(
"/{}/{}/{}/{}",
DUBBO_KEY,
url.service_name,
PROVIDERS_KEY,
url.encoded_raw_url_string()
);
self.create_path_with_parent_check(zk_path.as_str(), LOCALHOST_IP, CreateMode::Ephemeral)?;
Ok(())
}
fn unregister(&mut self, url: Url) -> Result<(), StdError> {
let zk_path = format!(
"/{}/{}/{}/{}",
DUBBO_KEY,
url.service_name,
PROVIDERS_KEY,
url.encoded_raw_url_string()
);
self.delete_path(zk_path.as_str());
Ok(())
}
// for consumer to find the changes of providers
fn subscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> {
let service_name = url.get_service_name();
let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &service_name, PROVIDERS_KEY);
if self
.listeners
.read()
.unwrap()
.get(service_name.as_str())
.is_some()
{
return Ok(());
}
let arc_listener = Arc::new(listener);
self.listeners
.write()
.unwrap()
.insert(service_name.to_string(), Arc::clone(&arc_listener));
let zk_listener = self.create_listener(
zk_path.clone(),
service_name.to_string(),
Arc::clone(&arc_listener),
);
let zk_changed_paths = self.zk_client.get_children_w(&zk_path, zk_listener);
let result = match zk_changed_paths {
Err(err) => {
error!("zk subscribe error: {}", err);
Vec::new()
}
Ok(urls) => urls
.iter()
.map(|node_key| {
let provider_url: Url = urlencoding::decode(node_key)
.unwrap()
.to_string()
.as_str()
.into();
provider_url
})
.collect(),
};
info!("notifying {}->{:?}", service_name, result);
arc_listener.notify(ServiceEvent {
key: service_name,
action: String::from("ADD"),
service: result,
});
Ok(())
}
fn unsubscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> {
todo!()
}
}
pub struct ServiceInstancesChangedListener {
zk_client: Arc<ZooKeeper>,
path: String,
service_name: String,
listener: Arc<MemoryNotifyListener>,
}
impl Watcher for ServiceInstancesChangedListener {
fn handle(&self, event: WatchedEvent) {
if let (WatchedEventType::NodeChildrenChanged, Some(path)) = (event.event_type, event.path)
{
let event_path = path.clone();
let dirs = self
.zk_client
.get_children(&event_path, false)
.expect("msg");
let result: Vec<Url> = dirs
.iter()
.map(|node_key| {
let provider_url: Url = node_key.as_str().into();
provider_url
})
.collect();
let res = self.zk_client.get_children_w(
&path,
ServiceInstancesChangedListener {
zk_client: Arc::clone(&self.zk_client),
path: path.clone(),
service_name: self.service_name.clone(),
listener: Arc::clone(&self.listener),
},
);
info!("notify {}->{:?}", self.service_name, result);
self.listener.notify(ServiceEvent {
key: self.service_name.clone(),
action: String::from("ADD"),
service: result,
});
}
}
}
impl NotifyListener for ServiceInstancesChangedListener {
fn notify(&self, event: ServiceEvent) {
self.listener.notify(event);
}
fn notify_all(&self, event: ServiceEvent) {
self.listener.notify(event);
}
}
impl ClusterRegistryIntegration for ZookeeperRegistry {
fn get_invoker(registry: BoxRegistry) -> Option<Arc<ClusterInvoker>> {
todo!()
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use zookeeper::{Acl, CreateMode, WatchedEvent, Watcher};
use crate::zookeeper_registry::ZookeeperRegistry;
struct TestZkWatcher {
pub watcher: Arc<Option<TestZkWatcher>>,
}
impl Watcher for TestZkWatcher {
fn handle(&self, event: WatchedEvent) {
println!("event: {:?}", event);
}
}
#[test]
fn zk_read_write_watcher() {
// https://github.com/bonifaido/rust-zookeeper/blob/master/examples/zookeeper_example.rs
// using ENV to set zookeeper server urls
let zkr = ZookeeperRegistry::default();
let zk_client = zkr.get_client();
let watcher = TestZkWatcher {
watcher: Arc::new(None),
};
if zk_client.exists("/test", true).is_err() {
zk_client
.create(
"/test",
vec![1, 3],
Acl::open_unsafe().clone(),
CreateMode::Ephemeral,
)
.unwrap();
}
let zk_res = zk_client.create(
"/test",
"hello".into(),
Acl::open_unsafe().clone(),
CreateMode::Ephemeral,
);
let result = zk_client.get_children_w("/test", watcher);
assert!(result.is_ok());
if zk_client.exists("/test/a", true).is_err() {
zk_client.delete("/test/a", None).unwrap();
}
if zk_client.exists("/test/a", true).is_err() {
zk_client.delete("/test/b", None).unwrap();
}
let zk_res = zk_client.create(
"/test/a",
"hello".into(),
Acl::open_unsafe().clone(),
CreateMode::Ephemeral,
);
let zk_res = zk_client.create(
"/test/b",
"world".into(),
Acl::open_unsafe().clone(),
CreateMode::Ephemeral,
);
let test_a_result = zk_client.get_data("/test", true);
assert!(test_a_result.is_ok());
let vec1 = test_a_result.unwrap().0;
// data in /test should equals to "hello"
assert_eq!(String::from_utf8(vec1).unwrap(), "hello");
zk_client.close().unwrap()
}
#[test]
fn create_path_with_parent_check() {
let zkr = ZookeeperRegistry::default();
let path = "/du1bbo/test11111";
let data = "hello";
// creating a child on a not exists parent, throw a NoNode error.
// let result = zkr.create_path(path, data, CreateMode::Ephemeral);
// assert!(result.is_err());
let create_with_parent_check_result =
zkr.create_path_with_parent_check(path, data, CreateMode::Ephemeral);
assert!(create_with_parent_check_result.is_ok());
assert_eq!(data, zkr.get_data(path, false).unwrap());
}
}