load balance and service registry closed loop (#105)
* feat(cluster): loadbalance types
* feat(rpc): types alias for cluster invoker
* feat(cluster): integration
* feat(cluster): integration with examples
* feat(cluster.loadbalance): greeter example with default random loadbalance passed
* feat(cluster.loadbalance): completing roundrobin arithmetic and fixing compile warns.
* typo
* fix compile warns
* fix rustfmt check fails.
* fix cargo check fails(due to the use of nightly channel locally).
* fix: default yaml config parse failed in ci.
* ci actions zk test
* feat(registry): zk support
* feat(registry): zk support, connected to zk
* feat(registry): provider.services key as service name
* feat(registry): serviceKey and configuration files, aligned to dubbo ecology
* feat(commons): tested Url impl
* feat(commons): tested Url impl
* feat(zk): interface service discovery
* feat(zk): create_path_with_parent_check
* feat(zk): export bug fixed.
diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml
index 56d2c11..304650d 100644
--- a/.github/workflows/github-actions.yml
+++ b/.github/workflows/github-actions.yml
@@ -45,10 +45,16 @@
toolchain: stable
- run: rustup component add rustfmt
- run: cargo fmt --all -- --check
-
example-greeter:
name: example/greeter
runs-on: ubuntu-latest
+ services:
+ zoo1:
+ image: zookeeper:3.8
+ ports:
+ - 2181:2181
+ env:
+ ZOO_MY_ID: 1
steps:
- uses: actions/checkout@main
- uses: actions-rs/toolchain@v1
@@ -80,6 +86,9 @@
- name: example greeter
run: |
../../target/debug/greeter-server &
- sleep 1s ;
+ sleep 3
../../target/debug/greeter-client
+ env:
+ ZOOKEEPER_SERVERS: 127.0.0.1:2181
+ DUBBO_CONFIG_PATH: ./dubbo.yaml
working-directory: examples/greeter
\ No newline at end of file
diff --git a/config/src/config.rs b/config/src/config.rs
index b82475c..828689c 100644
--- a/config/src/config.rs
+++ b/config/src/config.rs
@@ -17,6 +17,7 @@
use std::{collections::HashMap, env, fs, sync::RwLock};
+use crate::protocol::Protocol;
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
@@ -26,6 +27,8 @@
pub const DUBBO_CONFIG_PATH: &str = "./dubbo.yaml";
+pub const DUBBO_CONFIG_PREFIX: &str = "dubbo";
+
lazy_static! {
pub static ref GLOBAL_ROOT_CONFIG: RwLock<Option<RootConfig>> = RwLock::new(None);
}
@@ -35,16 +38,13 @@
#[allow(dead_code)]
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct RootConfig {
- pub name: String,
+ #[serde(default)]
+ pub protocols: ProtocolConfig,
- #[serde(skip_serializing, skip_deserializing)]
- pub service: HashMap<String, ServiceConfig>,
- pub protocols: HashMap<String, ProtocolConfig>,
- pub registries: HashMap<String, String>,
-
+ #[serde(default)]
pub provider: ProviderConfig,
- #[serde(skip_serializing, skip_deserializing)]
+ #[serde(default)]
pub data: HashMap<String, String>,
}
@@ -66,10 +66,7 @@
impl RootConfig {
pub fn new() -> Self {
Self {
- name: "dubbo".to_string(),
- service: HashMap::new(),
protocols: HashMap::new(),
- registries: HashMap::new(),
provider: ProviderConfig::new(),
data: HashMap::new(),
}
@@ -93,12 +90,10 @@
tracing::info!("current path: {:?}", env::current_dir());
let data = fs::read(config_path)?;
- let mut conf: RootConfig = serde_yaml::from_slice(&data).unwrap();
+ let conf: HashMap<String, RootConfig> = serde_yaml::from_slice(&data).unwrap();
+ let root_config: RootConfig = conf.get(DUBBO_CONFIG_PREFIX).unwrap().clone();
tracing::debug!("origin config: {:?}", conf);
- for (name, svc) in conf.service.iter_mut() {
- svc.name = name.to_string();
- }
- Ok(conf)
+ Ok(root_config)
}
pub fn test_config(&mut self) {
@@ -108,37 +103,29 @@
let service_config = ServiceConfig::default()
.group("test".to_string())
- .serializer("json".to_string())
.version("1.0.0".to_string())
- .protocol_names("triple".to_string())
- .name("grpc.examples.echo.Echo".to_string());
+ .protocol("triple".to_string())
+ .interface("grpc.examples.echo.Echo".to_string());
- let triple_config = ProtocolConfig::default()
- .name("triple".to_string())
- .ip("0.0.0.0".to_string())
- .port("8888".to_string());
-
- let service_config = service_config.add_protocol_configs(triple_config);
- self.service
+ self.provider
+ .services
.insert("grpc.examples.echo.Echo".to_string(), service_config);
- self.service.insert(
+ self.provider.services.insert(
"helloworld.Greeter".to_string(),
ServiceConfig::default()
.group("test".to_string())
- .serializer("json".to_string())
.version("1.0.0".to_string())
- .name("helloworld.Greeter".to_string())
- .protocol_names("triple".to_string()),
+ .interface("helloworld.Greeter".to_string())
+ .protocol("triple".to_string()),
);
self.protocols.insert(
"triple".to_string(),
- ProtocolConfig::default()
+ Protocol::default()
.name("triple".to_string())
.ip("0.0.0.0".to_string())
.port("8889".to_string()),
);
- provider.services = self.service.clone();
self.provider = provider.clone();
println!("provider config: {:?}", provider);
// 通过环境变量读取某个文件。加在到内存中
diff --git a/config/src/protocol.rs b/config/src/protocol.rs
index 654f0d8..cdc357a 100644
--- a/config/src/protocol.rs
+++ b/config/src/protocol.rs
@@ -19,8 +19,10 @@
use serde::{Deserialize, Serialize};
+pub const DEFAULT_PROTOCOL: &str = "triple";
+
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
-pub struct ProtocolConfig {
+pub struct Protocol {
pub ip: String,
pub port: String,
pub name: String,
@@ -29,7 +31,14 @@
pub params: HashMap<String, String>,
}
-impl ProtocolConfig {
+pub type ProtocolConfig = HashMap<String, Protocol>;
+
+pub trait ProtocolRetrieve {
+ fn get_protocol(&self, protocol_key: &str) -> Option<Protocol>;
+ fn get_protocol_or_default(&self, protocol_key: &str) -> Protocol;
+}
+
+impl Protocol {
pub fn name(self, name: String) -> Self {
Self { name, ..self }
}
@@ -50,3 +59,28 @@
format!("{}://{}:{}", self.name, self.ip, self.port)
}
}
+
+impl ProtocolRetrieve for ProtocolConfig {
+ fn get_protocol(&self, protocol_key: &str) -> Option<Protocol> {
+ let result = self.get(protocol_key);
+ if let Some(..) = result {
+ Some(result.unwrap().clone())
+ } else {
+ None
+ }
+ }
+
+ fn get_protocol_or_default(&self, protocol_key: &str) -> Protocol {
+ let result = self.get_protocol(protocol_key);
+ if let Some(..) = result {
+ result.unwrap().clone()
+ } else {
+ let result = self.get_protocol(protocol_key);
+ if result.is_none() {
+ panic!("default triple protocol dose not defined.")
+ } else {
+ result.unwrap()
+ }
+ }
+ }
+}
diff --git a/config/src/provider.rs b/config/src/provider.rs
index b7bec78..ccb9cf8 100644
--- a/config/src/provider.rs
+++ b/config/src/provider.rs
@@ -23,10 +23,11 @@
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct ProviderConfig {
+ #[serde(default)]
pub registry_ids: Vec<String>,
-
+ #[serde(default)]
pub protocol_ids: Vec<String>,
-
+ #[serde(default)]
pub services: HashMap<String, ServiceConfig>,
}
diff --git a/config/src/service.rs b/config/src/service.rs
index 809f560..1f85a92 100644
--- a/config/src/service.rs
+++ b/config/src/service.rs
@@ -15,30 +15,19 @@
* limitations under the License.
*/
-use std::collections::HashMap;
-
use serde::{Deserialize, Serialize};
-use super::protocol::ProtocolConfig;
-
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct ServiceConfig {
pub version: String,
pub group: String,
-
- #[serde(skip_serializing, skip_deserializing)]
- pub name: String,
pub protocol: String,
- pub registry: String,
- pub serializer: String,
-
- // #[serde(skip_serializing, skip_deserializing)]
- pub protocol_configs: HashMap<String, ProtocolConfig>,
+ pub interface: String,
}
impl ServiceConfig {
- pub fn name(self, name: String) -> Self {
- Self { name, ..self }
+ pub fn interface(self, interface: String) -> Self {
+ Self { interface, ..self }
}
pub fn version(self, version: String) -> Self {
@@ -49,20 +38,10 @@
Self { group, ..self }
}
- pub fn protocol_names(self, protocol: String) -> Self {
+ pub fn protocol(self, protocol: String) -> Self {
Self { protocol, ..self }
}
- pub fn serializer(self, serializer: String) -> Self {
- Self { serializer, ..self }
- }
-
- pub fn add_protocol_configs(mut self, protocol_config: ProtocolConfig) -> Self {
- self.protocol_configs
- .insert(protocol_config.name.clone(), protocol_config);
- Self { ..self }
- }
-
// pub fn get_url(&self) -> Vec<Url> {
// let mut urls = Vec::new();
// for (_, conf) in self.protocol_configs.iter() {
diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs
index 01cd89c..372e688 100644
--- a/dubbo-build/src/client.rs
+++ b/dubbo-build/src/client.rs
@@ -100,8 +100,8 @@
}
}
- pub fn with_directory(mut self, directory: Box<dyn Directory>) -> Self {
- self.inner = self.inner.with_directory(directory);
+ pub fn with_cluster(mut self, invoker: ClusterInvoker) -> Self {
+ self.inner = self.inner.with_cluster(invoker);
self
}
@@ -122,7 +122,7 @@
let package = if emit_package { service.package() } else { "" };
for method in service.methods() {
- let service_unique_name= format!(
+ let service_unique_name = format!(
"{}{}{}",
package,
if package.is_empty() { "" } else { "." },
@@ -139,7 +139,13 @@
stream.extend(generate_doc_comments(method.comment()));
let method = match (method.client_streaming(), method.server_streaming()) {
- (false, false) => generate_unary(service_unique_name, &method, proto_path, compile_well_known_types, path),
+ (false, false) => generate_unary(
+ service_unique_name,
+ &method,
+ proto_path,
+ compile_well_known_types,
+ path,
+ ),
(false, true) => {
generate_server_streaming(&method, proto_path, compile_well_known_types, path)
}
@@ -174,7 +180,7 @@
) -> Result<Response<#response>, dubbo::status::Status> {
let codec = #codec_name::<#request, #response>::default();
let invocation = RpcInvocation::default()
- .with_servie_unique_name(String::from(#service_unique_name))
+ .with_service_unique_name(String::from(#service_unique_name))
.with_method_name(String::from(#method_name));
let path = http::uri::PathAndQuery::from_static(#path);
self.inner
@@ -182,7 +188,7 @@
request,
codec,
path,
- invocation,
+ Arc::new(invocation),
)
.await
}
diff --git a/dubbo.yaml b/dubbo.yaml
index f6e7aa9..7404c14 100644
--- a/dubbo.yaml
+++ b/dubbo.yaml
@@ -11,14 +11,16 @@
ip: 0.0.0.0
port: '8888'
name: triple
- # helloworld.Greeter:
- # version: 1.0.0
- # group: test
- # protocol: triple
- # registry: ''
- # serializer: json
+# helloworld.Greeter:
+# version: 1.0.0
+# group: test
+# protocol: triple
+# registry: ''
+# serializer: json
protocols:
triple:
ip: 0.0.0.0
port: '8888'
name: triple
+
+
diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index 4161601..cda887f 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -33,5 +33,7 @@
axum = "0.5.9"
async-stream = "0.3"
flate2 = "1.0"
+itertools = "0.10.1"
+urlencoding = "2.1.2"
dubbo-config = {path = "../config", version = "0.2.0"}
diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs
index 9f02386..b33bab9 100644
--- a/dubbo/src/cluster/directory.rs
+++ b/dubbo/src/cluster/directory.rs
@@ -24,8 +24,10 @@
use crate::registry::memory_registry::MemoryNotifyListener;
use crate::registry::{BoxRegistry, RegistryWrapper};
+pub type BoxDirectory = Box<dyn Directory>;
+
pub trait Directory: Debug + DirectoryClone {
- fn list(&self, invocation: RpcInvocation) -> Vec<Url>;
+ fn list(&self, invocation: Arc<RpcInvocation>) -> Arc<Vec<Url>>;
}
pub trait DirectoryClone {
@@ -47,7 +49,7 @@
}
}
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct RegistryDirectory {
registry: RegistryWrapper,
service_instances: Arc<RwLock<HashMap<String, Vec<Url>>>>,
@@ -64,32 +66,37 @@
}
}
-impl DirectoryClone for RegistryDirectory {
- fn clone_box(&self) -> Box<dyn Directory> {
- todo!()
- }
-}
+// impl DirectoryClone for RegistryDirectory {
+// fn clone_box(&self) -> Box<dyn Directory> {
+// todo!()
+// }
+// }
impl Directory for RegistryDirectory {
- fn list(&self, invocation: RpcInvocation) -> Vec<Url> {
+ fn list(&self, invocation: Arc<RpcInvocation>) -> Arc<Vec<Url>> {
let service_name = invocation.get_target_service_unique_name();
- let url = Url::from_url(&format!(
- "triple://{}:{}/{}",
- "127.0.0.1", "8888", service_name
- ))
- .unwrap();
+ let url =
+ Url::from_url(&format!("tri://{}:{}/{}", "0.0.0.0", "8888", service_name)).unwrap();
- self.registry.registry.as_ref().expect("msg").subscribe(
- url,
- MemoryNotifyListener {
- service_instances: Arc::clone(&self.service_instances),
- },
- ).expect("subscribe");
+ self.registry
+ .registry
+ .as_ref()
+ .expect("msg")
+ .subscribe(
+ url,
+ MemoryNotifyListener {
+ service_instances: Arc::clone(&self.service_instances),
+ },
+ )
+ .expect("subscribe");
- let map = self.service_instances.read().expect("service_instances.read");
+ let map = self
+ .service_instances
+ .read()
+ .expect("service_instances.read");
let binding = Vec::new();
let url_vec = map.get(&service_name).unwrap_or(&binding);
- url_vec.to_vec()
+ Arc::new(url_vec.to_vec())
}
}
diff --git a/dubbo/src/cluster/loadbalance/impls/mod.rs b/dubbo/src/cluster/loadbalance/impls/mod.rs
new file mode 100644
index 0000000..eadbdfb
--- /dev/null
+++ b/dubbo/src/cluster/loadbalance/impls/mod.rs
@@ -0,0 +1,2 @@
+pub mod random;
+pub mod roundrobin;
diff --git a/dubbo/src/cluster/loadbalance/impls/random.rs b/dubbo/src/cluster/loadbalance/impls/random.rs
new file mode 100644
index 0000000..d5305db
--- /dev/null
+++ b/dubbo/src/cluster/loadbalance/impls/random.rs
@@ -0,0 +1,39 @@
+use std::fmt::{Debug, Formatter};
+use std::sync::Arc;
+
+use crate::cluster::loadbalance::types::{LoadBalance, Metadata};
+use crate::codegen::RpcInvocation;
+use crate::common::url::Url;
+
+pub struct RandomLoadBalance {
+ pub metadata: Metadata,
+}
+
+impl Default for RandomLoadBalance {
+ fn default() -> Self {
+ RandomLoadBalance {
+ metadata: Metadata::new("random"),
+ }
+ }
+}
+
+impl Debug for RandomLoadBalance {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(f, "RandomLoadBalance")
+ }
+}
+
+impl LoadBalance for RandomLoadBalance {
+ fn select(
+ &self,
+ invokers: Arc<Vec<Url>>,
+ _url: Option<Url>,
+ _invocation: Arc<RpcInvocation>,
+ ) -> Option<Url> {
+ if invokers.is_empty() {
+ return None;
+ }
+ let index = rand::random::<usize>() % invokers.len();
+ Some(invokers[index].clone())
+ }
+}
diff --git a/dubbo/src/cluster/loadbalance/impls/roundrobin.rs b/dubbo/src/cluster/loadbalance/impls/roundrobin.rs
new file mode 100644
index 0000000..b3a0112
--- /dev/null
+++ b/dubbo/src/cluster/loadbalance/impls/roundrobin.rs
@@ -0,0 +1,63 @@
+use std::collections::HashMap;
+use std::fmt::{Debug, Formatter};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, RwLock};
+
+use crate::cluster::loadbalance::types::{LoadBalance, Metadata};
+use crate::codegen::RpcInvocation;
+use crate::common::url::Url;
+
+pub struct RoundRobinLoadBalance {
+ pub metadata: Metadata,
+ pub counter_map: RwLock<HashMap<String, AtomicUsize>>,
+}
+
+impl Default for RoundRobinLoadBalance {
+ fn default() -> Self {
+ RoundRobinLoadBalance {
+ metadata: Metadata::new("roundrobin"),
+ counter_map: RwLock::new(HashMap::new()),
+ }
+ }
+}
+
+impl Debug for RoundRobinLoadBalance {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(f, "RoundRobinLoadBalance")
+ }
+}
+
+impl RoundRobinLoadBalance {
+ fn guarantee_counter_key(&self, key: &str) {
+ let contained = self.counter_map.try_read().unwrap().contains_key(key);
+ if !contained {
+ self.counter_map
+ .try_write()
+ .unwrap()
+ .insert(key.to_string(), AtomicUsize::new(0));
+ }
+ }
+}
+
+impl LoadBalance for RoundRobinLoadBalance {
+ fn select(
+ &self,
+ invokers: Arc<Vec<Url>>,
+ _url: Option<Url>,
+ invocation: Arc<RpcInvocation>,
+ ) -> Option<Url> {
+ if invokers.is_empty() {
+ return None;
+ }
+ let fingerprint = invocation.unique_fingerprint();
+ self.guarantee_counter_key(fingerprint.as_str());
+ let index = self
+ .counter_map
+ .try_read()
+ .unwrap()
+ .get(fingerprint.as_str())?
+ .fetch_add(1, Ordering::SeqCst)
+ % invokers.len();
+ Some(invokers[index].clone())
+ }
+}
diff --git a/dubbo/src/cluster/loadbalance/mod.rs b/dubbo/src/cluster/loadbalance/mod.rs
new file mode 100644
index 0000000..dd08724
--- /dev/null
+++ b/dubbo/src/cluster/loadbalance/mod.rs
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::collections::HashMap;
+
+use lazy_static::lazy_static;
+
+use crate::cluster::loadbalance::impls::random::RandomLoadBalance;
+use crate::cluster::loadbalance::impls::roundrobin::RoundRobinLoadBalance;
+use crate::cluster::loadbalance::types::BoxLoadBalance;
+
+pub mod impls;
+pub mod types;
+
+lazy_static! {
+ pub static ref LOAD_BALANCE_EXTENSIONS: HashMap<String, BoxLoadBalance> =
+ init_loadbalance_extensions();
+}
+
+fn init_loadbalance_extensions() -> HashMap<String, BoxLoadBalance> {
+ let mut loadbalance_map: HashMap<String, BoxLoadBalance> = HashMap::new();
+ loadbalance_map.insert("random".to_string(), Box::new(RandomLoadBalance::default()));
+ loadbalance_map.insert(
+ "roundrobin".to_string(),
+ Box::new(RoundRobinLoadBalance::default()),
+ );
+ loadbalance_map
+}
diff --git a/dubbo/src/cluster/loadbalance/types.rs b/dubbo/src/cluster/loadbalance/types.rs
new file mode 100644
index 0000000..baf3d08
--- /dev/null
+++ b/dubbo/src/cluster/loadbalance/types.rs
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use crate::codegen::RpcInvocation;
+use crate::common::url::Url;
+
+pub type BoxLoadBalance = Box<dyn LoadBalance + Send + Sync>;
+
+pub trait LoadBalance: Debug {
+ fn select(
+ &self,
+ invokers: Arc<Vec<Url>>,
+ url: Option<Url>,
+ invocation: Arc<RpcInvocation>,
+ ) -> Option<Url>;
+}
+
+pub struct Metadata {
+ pub name: &'static str,
+}
+
+impl Metadata {
+ pub fn new(name: &'static str) -> Self {
+ Metadata { name }
+ }
+}
diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs
index 84980ea..ef43fe6 100644
--- a/dubbo/src/cluster/mod.rs
+++ b/dubbo/src/cluster/mod.rs
@@ -16,3 +16,5 @@
*/
pub mod directory;
+pub mod loadbalance;
+pub mod support;
diff --git a/dubbo/src/cluster/support/cluster_invoker.rs b/dubbo/src/cluster/support/cluster_invoker.rs
new file mode 100644
index 0000000..814b596
--- /dev/null
+++ b/dubbo/src/cluster/support/cluster_invoker.rs
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::str::FromStr;
+use std::sync::Arc;
+
+use http::uri::PathAndQuery;
+use http::Request;
+use hyper::Body;
+use tower_service::Service;
+
+use crate::cluster::loadbalance::types::BoxLoadBalance;
+use crate::cluster::loadbalance::LOAD_BALANCE_EXTENSIONS;
+use crate::cluster::support::DEFAULT_LOADBALANCE;
+use crate::codegen::{Directory, RegistryDirectory, TripleClient};
+use crate::common::url::Url;
+use crate::invocation::RpcInvocation;
+use crate::triple;
+
+#[derive(Debug, Clone)]
+pub struct ClusterInvoker {
+ directory: Arc<RegistryDirectory>,
+ destroyed: bool,
+}
+
+pub trait ClusterInvokerSelector {
+ /// Select a invoker using loadbalance policy.
+ fn select(
+ &self,
+ invocation: Arc<RpcInvocation>,
+ invokers: Arc<Vec<Url>>,
+ excluded: Arc<Vec<Url>>,
+ ) -> Option<Url>;
+
+ fn do_select(
+ &self,
+ loadbalance_key: Option<&str>,
+ invocation: Arc<RpcInvocation>,
+ invokers: Arc<Vec<Url>>,
+ ) -> Option<Url>;
+}
+
+pub trait ClusterRequestBuilder<T>
+where
+ T: Service<http::Request<hyper::Body>, Response = http::Response<crate::BoxBody>>,
+ T::Error: Into<crate::Error>,
+{
+ fn build_req(
+ &self,
+ triple_client: &TripleClient<T>,
+ path: http::uri::PathAndQuery,
+ invocation: Arc<RpcInvocation>,
+ body: hyper::Body,
+ ) -> http::Request<hyper::Body>;
+}
+
+impl ClusterInvoker {
+ pub fn with_directory(registry_directory: RegistryDirectory) -> Self {
+ ClusterInvoker {
+ directory: Arc::new(registry_directory),
+ destroyed: false,
+ }
+ }
+
+ pub fn directory(&self) -> Arc<RegistryDirectory> {
+ self.directory.clone()
+ }
+
+ pub fn init_loadbalance(&self, loadbalance_key: &str) -> &BoxLoadBalance {
+ if LOAD_BALANCE_EXTENSIONS.contains_key(loadbalance_key) {
+ LOAD_BALANCE_EXTENSIONS.get(loadbalance_key).unwrap()
+ } else {
+ println!(
+ "loadbalance {} not found, use default loadbalance {}",
+ loadbalance_key, DEFAULT_LOADBALANCE
+ );
+ LOAD_BALANCE_EXTENSIONS.get(DEFAULT_LOADBALANCE).unwrap()
+ }
+ }
+
+ pub fn is_available(&self, invocation: Arc<RpcInvocation>) -> bool {
+ !self.destroyed() && !self.directory.list(invocation).is_empty()
+ }
+
+ pub fn destroyed(&self) -> bool {
+ self.destroyed
+ }
+}
+
+impl ClusterInvokerSelector for ClusterInvoker {
+ fn select(
+ &self,
+ invocation: Arc<RpcInvocation>,
+ invokers: Arc<Vec<Url>>,
+ _excluded: Arc<Vec<Url>>,
+ ) -> Option<Url> {
+ if invokers.is_empty() {
+ return None;
+ }
+ let instance_count = invokers.len();
+ return if instance_count == 1 {
+ Some(invokers.as_ref().first()?.clone())
+ } else {
+ let loadbalance = Some(DEFAULT_LOADBALANCE);
+ self.do_select(loadbalance, invocation, invokers)
+ };
+ }
+
+ /// picking instance invoker url from registry directory
+ fn do_select(
+ &self,
+ loadbalance_key: Option<&str>,
+ invocation: Arc<RpcInvocation>,
+ invokers: Arc<Vec<Url>>,
+ ) -> Option<Url> {
+ let loadbalance = self.init_loadbalance(loadbalance_key.unwrap_or(DEFAULT_LOADBALANCE));
+ loadbalance.select(invokers, None, invocation)
+ }
+}
+
+impl<T> ClusterRequestBuilder<T> for ClusterInvoker
+where
+ T: Service<http::Request<hyper::Body>, Response = http::Response<crate::BoxBody>>,
+ T::Error: Into<crate::Error>,
+{
+ fn build_req(
+ &self,
+ triple_client: &triple::client::triple::TripleClient<T>,
+ path: PathAndQuery,
+ invocation: Arc<RpcInvocation>,
+ body: Body,
+ ) -> Request<Body> {
+ let invokers = self.directory.list(invocation.clone());
+ let invoker_url = self
+ .select(invocation, invokers, Arc::new(Vec::new()))
+ .expect("no valid provider");
+ let http_uri =
+ http::Uri::from_str(&format!("http://{}:{}/", invoker_url.ip, invoker_url.port))
+ .unwrap();
+ TripleClient::new_map_request(triple_client, http_uri, path, body)
+ }
+}
diff --git a/dubbo/src/cluster/support/mod.rs b/dubbo/src/cluster/support/mod.rs
new file mode 100644
index 0000000..ae42cc2
--- /dev/null
+++ b/dubbo/src/cluster/support/mod.rs
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+pub mod cluster_invoker;
+
+pub const DEFAULT_LOADBALANCE: &str = "random";
diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs
index 6a2e74d..7489faf 100644
--- a/dubbo/src/codegen.rs
+++ b/dubbo/src/codegen.rs
@@ -24,12 +24,16 @@
pub use hyper::Body as hyperBody;
pub use tower_service::Service;
-pub use super::registry::Registry;
-pub use super::registry::BoxRegistry;
-pub use super::registry::RegistryWrapper;
+pub use super::cluster::directory::Directory;
+pub use super::cluster::directory::RegistryDirectory;
+pub use super::cluster::support::cluster_invoker::ClusterInvoker;
+pub use super::invocation::RpcInvocation;
pub use super::invocation::{IntoStreamingRequest, Request, Response};
pub use super::protocol::triple::triple_invoker::TripleInvoker;
pub use super::protocol::Invoker;
+pub use super::registry::BoxRegistry;
+pub use super::registry::Registry;
+pub use super::registry::RegistryWrapper;
pub use super::triple::client::TripleClient;
pub use super::triple::codec::prost::ProstCodec;
pub use super::triple::codec::Codec;
@@ -39,9 +43,6 @@
};
pub use super::triple::server::TripleServer;
pub use super::{empty_body, BoxBody, BoxFuture, StdError};
-pub use super::invocation::RpcInvocation;
-pub use super::cluster::directory::Directory;
-pub use super::cluster::directory::RegistryDirectory;
pub use crate::filter::service::FilterService;
pub use crate::filter::Filter;
pub use crate::triple::client::connection::Connection;
diff --git a/dubbo/src/common/consts.rs b/dubbo/src/common/consts.rs
index c05c2c7..17993c8 100644
--- a/dubbo/src/common/consts.rs
+++ b/dubbo/src/common/consts.rs
@@ -18,3 +18,15 @@
pub const REGISTRY_PROTOCOL: &str = "registry_protocol";
pub const PROTOCOL: &str = "protocol";
pub const REGISTRY: &str = "registry";
+
+// URL key
+pub const DUBBO_KEY: &str = "dubbo";
+pub const PROVIDERS_KEY: &str = "providers";
+pub const LOCALHOST_IP: &str = "127.0.0.1";
+pub const METADATA_MAPPING_KEY: &str = "mapping";
+pub const VERSION_KEY: &str = "version";
+pub const GROUP_KEY: &str = "group";
+pub const INTERFACE_KEY: &str = "interface";
+pub const ANYHOST_KEY: &str = "anyhost";
+pub const SIDE_KEY: &str = "side";
+pub const TIMESTAMP_KEY: &str = "timestamp";
diff --git a/dubbo/src/common/url.rs b/dubbo/src/common/url.rs
index 4f79b30..3ed290b 100644
--- a/dubbo/src/common/url.rs
+++ b/dubbo/src/common/url.rs
@@ -16,15 +16,23 @@
*/
use std::collections::HashMap;
+use std::fmt::{Display, Formatter};
+
+use crate::common::consts::{GROUP_KEY, INTERFACE_KEY, VERSION_KEY};
+use http::Uri;
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Url {
- pub uri: String,
- pub protocol: String,
+ pub raw_url_string: String,
+ // value of scheme is different to protocol name, eg. triple -> tri://
+ pub scheme: String,
pub location: String,
pub ip: String,
pub port: String,
- pub service_key: Vec<String>,
+ // serviceKey format in dubbo java and go '{group}/{interfaceName}:{version}'
+ pub service_key: String,
+ // same to interfaceName
+ pub service_name: String,
pub params: HashMap<String, String>,
}
@@ -34,84 +42,192 @@
}
pub fn from_url(url: &str) -> Option<Self> {
- // url: triple://127.0.0.1:8888/helloworld.Greeter
+ // url: tri://127.0.0.1:8888/helloworld.Greeter
let uri = url
.parse::<http::Uri>()
.map_err(|err| {
tracing::error!("fail to parse url({}), err: {:?}", url, err);
})
.unwrap();
- Some(Self {
- uri: uri.to_string(),
- protocol: uri.scheme_str()?.to_string(),
+ let query = uri.path_and_query().unwrap().query();
+ let mut url_inst = Self {
+ raw_url_string: url.to_string(),
+ scheme: uri.scheme_str()?.to_string(),
ip: uri.authority()?.host().to_string(),
port: uri.authority()?.port()?.to_string(),
location: uri.authority()?.to_string(),
- service_key: uri
- .path()
- .trim_start_matches('/')
- .split(',')
- .map(|x| x.to_string())
- .collect::<Vec<_>>(),
- params: HashMap::new(),
- })
+ service_key: uri.path().trim_start_matches('/').to_string(),
+ service_name: uri.path().trim_start_matches('/').to_string(),
+ params: if let Some(..) = query {
+ Url::decode(query.unwrap())
+ } else {
+ HashMap::new()
+ },
+ };
+ url_inst.renew_raw_url_string();
+ Some(url_inst)
}
- pub fn get_service_name(&self) -> Vec<String> {
+ pub fn get_service_key(&self) -> String {
self.service_key.clone()
}
- pub fn get_param(&self, key: String) -> Option<String> {
- self.params.get(&key).cloned()
+ pub fn get_service_name(&self) -> String {
+ self.service_name.clone()
}
- pub fn encode_param(&self) -> String {
+ pub fn get_param(&self, key: &str) -> Option<String> {
+ self.params.get(key).cloned()
+ }
+
+ fn encode_param(&self) -> String {
let mut params_vec: Vec<String> = Vec::new();
for (k, v) in self.params.iter() {
// let tmp = format!("{}={}", k, v);
params_vec.push(format!("{}={}", k, v));
}
- params_vec.join("&")
+ if params_vec.is_empty() {
+ "".to_string()
+ } else {
+ format!("?{}", params_vec.join("&"))
+ }
}
- pub fn decode(&mut self, params: String) {
- let p: Vec<String> = params.split('&').map(|v| v.trim().to_string()).collect();
+ pub fn params_count(&self) -> usize {
+ self.params.len()
+ }
+
+ fn decode(raw_query_string: &str) -> HashMap<String, String> {
+ let mut params = HashMap::new();
+ let p: Vec<String> = raw_query_string
+ .split('&')
+ .map(|v| v.trim().to_string())
+ .collect();
for v in p.iter() {
let values: Vec<String> = v.split('=').map(|v| v.trim().to_string()).collect();
if values.len() != 2 {
continue;
}
- self.params.insert(values[0].clone(), values[1].clone());
+ params.insert(values[0].clone(), values[1].clone());
}
+ params
}
- pub fn to_url(&self) -> String {
- format!("{}://{}:{}", self.protocol, self.ip, self.port)
+ pub fn set_param(&mut self, key: &str, value: &str) {
+ self.params.insert(key.to_string(), value.to_string());
+ self.renew_raw_url_string();
+ }
+
+ pub fn raw_url_string(&self) -> String {
+ self.raw_url_string.clone()
+ }
+
+ pub fn encoded_raw_url_string(&self) -> String {
+ urlencoding::encode(self.raw_url_string.as_str()).to_string()
+ }
+
+ fn build_service_key(&self) -> String {
+ format!(
+ "{group}/{interfaceName}:{version}",
+ group = self.get_param(GROUP_KEY).unwrap_or("default".to_string()),
+ interfaceName = self.get_param(INTERFACE_KEY).unwrap_or("error".to_string()),
+ version = self.get_param(VERSION_KEY).unwrap_or("1.0.0".to_string())
+ )
+ }
+
+ fn renew_raw_url_string(&mut self) {
+ self.raw_url_string = format!(
+ "{}://{}:{}/{}{}",
+ self.scheme,
+ self.ip,
+ self.port,
+ self.service_name,
+ self.encode_param()
+ );
+ self.service_key = self.build_service_key()
+ }
+
+ // short_url is used for tcp listening
+ pub fn short_url(&self) -> String {
+ format!("{}://{}:{}", self.scheme, self.ip, self.port)
+ }
+}
+
+impl Display for Url {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.write_str(self.raw_url_string().as_str())
+ }
+}
+
+impl Into<Uri> for Url {
+ fn into(self) -> Uri {
+ self.raw_url_string.parse::<Uri>().unwrap()
+ }
+}
+
+impl From<&str> for Url {
+ fn from(url: &str) -> Self {
+ Url::from_url(url).unwrap()
}
}
#[cfg(test)]
mod tests {
use super::*;
+ use crate::common::consts::{ANYHOST_KEY, VERSION_KEY};
#[test]
fn test_from_url() {
- let u1 = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter");
- println!("{:?}", u1.unwrap().get_service_name())
+ let mut u1 = Url::from_url("tri://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&\
+ application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&\
+ environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&\
+ module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&\
+ side=provider&timeout=3000×tamp=1556509797245&version=1.0.0&application=test");
+ assert_eq!(
+ u1.as_ref().unwrap().service_key,
+ "default/com.ikurento.user.UserProvider:1.0.0"
+ );
+ assert_eq!(
+ u1.as_ref()
+ .unwrap()
+ .get_param(ANYHOST_KEY)
+ .unwrap()
+ .as_str(),
+ "true"
+ );
+ assert_eq!(
+ u1.as_ref()
+ .unwrap()
+ .get_param("default.timeout")
+ .unwrap()
+ .as_str(),
+ "10000"
+ );
+ assert_eq!(u1.as_ref().unwrap().scheme, "tri");
+ assert_eq!(u1.as_ref().unwrap().ip, "127.0.0.1");
+ assert_eq!(u1.as_ref().unwrap().port, "20000");
+ assert_eq!(u1.as_ref().unwrap().params_count(), 18);
+ u1.as_mut().unwrap().set_param("key1", "value1");
+ assert_eq!(
+ u1.as_ref().unwrap().get_param("key1").unwrap().as_str(),
+ "value1"
+ );
+ assert_eq!(
+ u1.as_ref()
+ .unwrap()
+ .get_param(VERSION_KEY)
+ .unwrap()
+ .as_str(),
+ "1.0.0"
+ );
}
#[test]
- fn test_encode_params() {
- let mut u = Url::default();
- u.params.insert("method".to_string(), "GET".to_string());
- u.params.insert("args".to_string(), "GET".to_string());
-
- let en = u.encode_param();
- println!("encode_params: {:?}", en);
-
- let mut u1 = Url::default();
- u1.decode(en);
- println!("decode_params: {:?}", u1);
- assert_eq!(u1, u);
+ fn test2() {
+ let url: Url = "tri://0.0.0.0:8888/org.apache.dubbo.sample.tri.Greeter".into();
+ assert_eq!(
+ url.raw_url_string(),
+ "tri://0.0.0.0:8888/org.apache.dubbo.sample.tri.Greeter"
+ )
}
}
diff --git a/dubbo/src/framework.rs b/dubbo/src/framework.rs
index 5cff9e8..96b9b28 100644
--- a/dubbo/src/framework.rs
+++ b/dubbo/src/framework.rs
@@ -16,22 +16,30 @@
*/
use std::collections::HashMap;
+use std::error::Error;
use std::pin::Pin;
+use std::sync::{Arc, Mutex};
use futures::future;
use futures::Future;
+use tracing::{debug, info};
+
+use dubbo_config::protocol::ProtocolRetrieve;
+use dubbo_config::{get_global_config, RootConfig};
use crate::common::url::Url;
use crate::protocol::{BoxExporter, Protocol};
use crate::registry::protocol::RegistryProtocol;
-use dubbo_config::{get_global_config, RootConfig};
+use crate::registry::types::{Registries, RegistriesOperation};
+use crate::registry::{BoxRegistry, Registry};
// Invoker是否可以基于hyper写一个通用的
#[derive(Default)]
pub struct Dubbo {
+ // cached protocols, key of map means protocol name eg. dubbo, triple, grpc
protocols: HashMap<String, Vec<Url>>,
- registries: HashMap<String, Url>,
+ registries: Option<Registries>,
service_registry: HashMap<String, Vec<Url>>, // registry: Urls
config: Option<RootConfig>,
}
@@ -41,7 +49,7 @@
tracing_subscriber::fmt::init();
Self {
protocols: HashMap::new(),
- registries: HashMap::new(),
+ registries: None,
service_registry: HashMap::new(),
config: None,
}
@@ -52,80 +60,83 @@
self
}
- pub fn init(&mut self) {
+ pub fn add_registry(mut self, registry_key: &str, registry: BoxRegistry) -> Self {
+ if self.registries.is_none() {
+ self.registries = Some(Arc::new(Mutex::new(HashMap::new())));
+ }
+ self.registries
+ .as_ref()
+ .unwrap()
+ .insert(registry_key.to_string(), Arc::new(Mutex::new(registry)));
+ self
+ }
+
+ pub fn init(&mut self) -> Result<(), Box<dyn Error>> {
if self.config.is_none() {
self.config = Some(get_global_config())
}
- let conf = self.config.as_ref().unwrap();
- tracing::debug!("global conf: {:?}", conf);
-
- for (name, url) in conf.registries.iter() {
- self.registries
- .insert(name.to_string(), Url::from_url(url).unwrap());
- }
-
- for (_, c) in conf.provider.services.iter() {
- let u = if c.protocol_configs.is_empty() {
- let protocol = match conf.protocols.get(&c.protocol) {
- Some(v) => v.to_owned(),
- None => {
- tracing::warn!("protocol {:?} not exists", c.protocol);
- continue;
- }
- };
- let protocol_url = format!("{}/{}", protocol.to_url(), c.name.clone(),);
+ let root_config = self.config.as_ref().unwrap();
+ debug!("global conf: {:?}", root_config);
+ // env::set_var("ZOOKEEPER_SERVERS",root_config);
+ for (_, service_config) in root_config.provider.services.iter() {
+ info!("init service name: {}", service_config.interface);
+ let url = if root_config
+ .protocols
+ .contains_key(service_config.protocol.as_str())
+ {
+ let protocol = root_config
+ .protocols
+ .get_protocol_or_default(service_config.protocol.as_str());
+ let protocol_url =
+ format!("{}/{}", protocol.to_url(), service_config.interface.clone(),);
+ info!("protocol_url: {:?}", protocol_url);
Url::from_url(&protocol_url)
} else {
- let protocol = match c.protocol_configs.get(&c.protocol) {
- Some(v) => v.to_owned(),
- None => {
- tracing::warn!("protocol {:?} not exists", c.protocol);
- continue;
- }
- };
- let protocol_url = format!("{}/{}", protocol.to_url(), c.name.clone(),);
- Url::from_url(&protocol_url)
+ return Err(format!("protocol {:?} not exists", service_config.protocol).into());
};
- tracing::info!("url: {:?}", u);
- if u.is_none() {
+ info!("url: {:?}", url);
+ if url.is_none() {
continue;
}
-
- let u = u.unwrap();
-
- let reg_url = self.registries.get(&c.registry).unwrap();
- if self.service_registry.get(&c.name).is_some() {
- self.service_registry
- .get_mut(&c.name)
+ let u = url.unwrap();
+ if self.protocols.get(&service_config.protocol).is_some() {
+ self.protocols
+ .get_mut(&service_config.protocol)
.unwrap()
- .push(reg_url.clone());
+ .push(u);
} else {
- self.service_registry
- .insert(c.name.clone(), vec![reg_url.clone()]);
- }
-
- if self.protocols.get(&c.protocol).is_some() {
- self.protocols.get_mut(&c.protocol).unwrap().push(u);
- } else {
- self.protocols.insert(c.protocol.clone(), vec![u]);
+ self.protocols
+ .insert(service_config.protocol.clone(), vec![u]);
}
}
+ Ok(())
}
pub async fn start(&mut self) {
- self.init();
-
+ self.init().unwrap();
+ info!("starting...");
// TODO: server registry
-
- let mem_reg =
- Box::new(RegistryProtocol::new().with_services(self.service_registry.clone()));
+ let mem_reg = Box::new(
+ RegistryProtocol::new()
+ .with_registries(self.registries.as_ref().unwrap().clone())
+ .with_services(self.service_registry.clone()),
+ );
let mut async_vec: Vec<Pin<Box<dyn Future<Output = BoxExporter> + Send>>> = Vec::new();
for (name, items) in self.protocols.iter() {
for url in items.iter() {
- tracing::info!("protocol: {:?}, service url: {:?}", name, url);
+ info!("protocol: {:?}, service url: {:?}", name, url);
let exporter = mem_reg.clone().export(url.to_owned());
- async_vec.push(exporter)
+ async_vec.push(exporter);
+ //TODO multiple registry
+ if self.registries.is_some() {
+ self.registries
+ .as_ref()
+ .unwrap()
+ .default_registry()
+ .register(url.clone())
+ .unwrap();
+ }
}
}
diff --git a/dubbo/src/invocation.rs b/dubbo/src/invocation.rs
index 85d971f..9a9cf8f 100644
--- a/dubbo/src/invocation.rs
+++ b/dubbo/src/invocation.rs
@@ -15,9 +15,11 @@
* limitations under the License.
*/
-use futures_core::Stream;
+use std::fmt::Debug;
use std::{collections::HashMap, str::FromStr};
+use futures_core::Stream;
+
pub struct Request<T> {
pub message: T,
pub metadata: Metadata,
@@ -83,6 +85,13 @@
metadata: Metadata,
}
+pub trait RpcResult {}
+
+/// symbol for cluster invoke
+impl<T> RpcResult for Response<T> {}
+
+pub type BoxRpcResult = Box<dyn RpcResult>;
+
impl<T> Response<T> {
pub fn new(message: T) -> Response<T> {
Self {
@@ -195,14 +204,16 @@
fn get_method_name(&self) -> String;
}
-#[derive(Default)]
+pub type BoxInvocation = Box<dyn Invocation>;
+
+#[derive(Default, Debug)]
pub struct RpcInvocation {
target_service_unique_name: String,
method_name: String,
}
-impl RpcInvocation{
- pub fn with_servie_unique_name(mut self, service_unique_name: String) -> Self {
+impl RpcInvocation {
+ pub fn with_service_unique_name(mut self, service_unique_name: String) -> Self {
self.target_service_unique_name = service_unique_name;
self
}
@@ -210,9 +221,11 @@
self.method_name = method_name;
self
}
+ pub fn unique_fingerprint(&self) -> String {
+ format!("{}#{}", self.target_service_unique_name, self.method_name)
+ }
}
-
impl Invocation for RpcInvocation {
fn get_target_service_unique_name(&self) -> String {
self.target_service_unique_name.clone()
diff --git a/dubbo/src/lib.rs b/dubbo/src/lib.rs
index e685cba..0bac3f7 100644
--- a/dubbo/src/lib.rs
+++ b/dubbo/src/lib.rs
@@ -15,8 +15,8 @@
* limitations under the License.
*/
-pub mod codegen;
pub mod cluster;
+pub mod codegen;
pub mod common;
pub mod filter;
mod framework;
diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs
index 20042cb..572b56d 100644
--- a/dubbo/src/protocol/mod.rs
+++ b/dubbo/src/protocol/mod.rs
@@ -15,9 +15,7 @@
* limitations under the License.
*/
-pub mod server_desc;
-pub mod triple;
-
+use std::fmt::Debug;
use std::future::Future;
use std::task::{Context, Poll};
@@ -26,6 +24,9 @@
use crate::common::url::Url;
+pub mod server_desc;
+pub mod triple;
+
#[async_trait]
pub trait Protocol {
type Invoker;
@@ -39,7 +40,7 @@
fn unexport(&self);
}
-pub trait Invoker<ReqBody> {
+pub trait Invoker<ReqBody>: Debug {
type Response;
type Error;
diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs
index 2ec3f4a..2a727c3 100644
--- a/dubbo/src/protocol/triple/triple_invoker.rs
+++ b/dubbo/src/protocol/triple/triple_invoker.rs
@@ -24,7 +24,7 @@
use crate::triple::client::connection::Connection;
#[allow(dead_code)]
-#[derive(Clone, Default)]
+#[derive(Clone, Default, Debug)]
pub struct TripleInvoker {
url: Url,
conn: Connection,
@@ -32,7 +32,7 @@
impl TripleInvoker {
pub fn new(url: Url) -> TripleInvoker {
- let uri = http::Uri::from_str(&url.to_url()).unwrap();
+ let uri = http::Uri::from_str(&url.short_url()).unwrap();
Self {
url,
conn: Connection::new().with_host(uri),
diff --git a/dubbo/src/protocol/triple/triple_protocol.rs b/dubbo/src/protocol/triple/triple_protocol.rs
index 5b713ad..c15e26c 100644
--- a/dubbo/src/protocol/triple/triple_protocol.rs
+++ b/dubbo/src/protocol/triple/triple_protocol.rs
@@ -46,7 +46,7 @@
pub fn get_server(&self, url: Url) -> Option<TripleServer> {
self.servers
- .get(&url.service_key.join(","))
+ .get(&url.service_key)
.map(|data| data.to_owned())
}
}
@@ -60,10 +60,10 @@
}
async fn export(mut self, url: Url) -> BoxExporter {
- let server = TripleServer::new(url.service_key.clone());
- self.servers
- .insert(url.service_key.join(","), server.clone());
- server.serve(url.to_url()).await;
+ // service_key is same to key of TRIPLE_SERVICES
+ let server = TripleServer::new(url.service_name.clone());
+ self.servers.insert(url.service_key.clone(), server.clone());
+ server.serve(url.short_url().clone()).await;
Box::new(TripleExporter::new())
}
diff --git a/dubbo/src/protocol/triple/triple_server.rs b/dubbo/src/protocol/triple/triple_server.rs
index b85dbe1..8e42cab 100644
--- a/dubbo/src/protocol/triple/triple_server.rs
+++ b/dubbo/src/protocol/triple/triple_server.rs
@@ -26,9 +26,9 @@
}
impl TripleServer {
- pub fn new(names: Vec<String>) -> TripleServer {
+ pub fn new(names: String) -> TripleServer {
Self {
- service_names: names,
+ service_names: vec![names],
s: DubboServer::new(),
}
}
diff --git a/dubbo/src/registry/integration.rs b/dubbo/src/registry/integration.rs
new file mode 100644
index 0000000..2266e46
--- /dev/null
+++ b/dubbo/src/registry/integration.rs
@@ -0,0 +1,8 @@
+use crate::cluster::support::cluster_invoker::ClusterInvoker;
+use crate::registry::BoxRegistry;
+use std::sync::Arc;
+
+pub trait ClusterRegistryIntegration {
+ /// get cluster invoker struct
+ fn get_invoker(registry: BoxRegistry) -> Option<Arc<ClusterInvoker>>;
+}
diff --git a/dubbo/src/registry/memory_registry.rs b/dubbo/src/registry/memory_registry.rs
index f1ff25f..76b8922 100644
--- a/dubbo/src/registry/memory_registry.rs
+++ b/dubbo/src/registry/memory_registry.rs
@@ -16,9 +16,11 @@
*/
#![allow(unused_variables, dead_code, missing_docs)]
+
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
+use tracing::debug;
use crate::common::url::Url;
@@ -46,9 +48,9 @@
impl Registry for MemoryRegistry {
type NotifyListener = MemoryNotifyListener;
- fn register(&mut self, mut url: crate::common::url::Url) -> Result<(), crate::StdError> {
+ fn register(&mut self, mut url: Url) -> Result<(), crate::StdError> {
// define provider label: ${registry.group}/${service_name}/provider
- let registry_group = match url.get_param(REGISTRY_GROUP_KEY.to_string()) {
+ let registry_group = match url.get_param(REGISTRY_GROUP_KEY) {
Some(key) => key,
None => "dubbo".to_string(),
};
@@ -56,20 +58,20 @@
let dubbo_path = format!(
"/{}/{}/{}",
registry_group,
- url.get_service_name().join(","),
+ url.get_service_name(),
"provider",
);
url.params.insert("anyhost".to_string(), "true".to_string());
// define triple url path
- let raw_url = format!("{}?{}", url.to_url(), url.encode_param(),);
+ let raw_url = url.raw_url_string();
self.registries.write().unwrap().insert(dubbo_path, raw_url);
Ok(())
}
fn unregister(&mut self, url: crate::common::url::Url) -> Result<(), crate::StdError> {
- let registry_group = match url.get_param(REGISTRY_GROUP_KEY.to_string()) {
+ let registry_group = match url.get_param(REGISTRY_GROUP_KEY) {
Some(key) => key,
None => "dubbo".to_string(),
};
@@ -77,7 +79,7 @@
let dubbo_path = format!(
"/{}/{}/{}",
registry_group,
- url.get_service_name().join(","),
+ url.get_service_name(),
"provider",
);
self.registries.write().unwrap().remove(&dubbo_path);
@@ -108,10 +110,11 @@
impl NotifyListener for MemoryNotifyListener {
fn notify(&self, event: super::ServiceEvent) {
- let mut map=self.service_instances.write().expect("msg");
+ debug!("notify {:?}", event);
+ let mut map = self.service_instances.write().expect("msg");
match event.action.as_str() {
- "ADD"=> map.insert(event.key, event.service),
- &_ => todo!()
+ "ADD" => map.insert(event.key, event.service),
+ &_ => todo!(),
};
}
diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs
index 8c56692..ea915fe 100644
--- a/dubbo/src/registry/mod.rs
+++ b/dubbo/src/registry/mod.rs
@@ -16,12 +16,15 @@
*/
#![allow(unused_variables, dead_code, missing_docs)]
+pub mod integration;
pub mod memory_registry;
pub mod protocol;
+pub mod types;
-use std::fmt::Debug;
+use std::fmt::{Debug, Formatter};
use crate::common::url::Url;
+use crate::registry::memory_registry::MemoryNotifyListener;
pub trait Registry {
type NotifyListener;
@@ -38,18 +41,24 @@
fn notify_all(&self, event: ServiceEvent);
}
+#[derive(Debug)]
pub struct ServiceEvent {
pub key: String,
pub action: String,
pub service: Vec<Url>,
}
-pub type BoxRegistry =
- Box<dyn Registry<NotifyListener = memory_registry::MemoryNotifyListener> + Send + Sync>;
+pub type BoxRegistry = Box<dyn Registry<NotifyListener = MemoryNotifyListener> + Send + Sync>;
+
+impl Debug for BoxRegistry {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.write_str("BoxRegistry")
+ }
+}
#[derive(Default)]
pub struct RegistryWrapper {
- pub registry: Option<Box<dyn Registry<NotifyListener = memory_registry::MemoryNotifyListener>>>,
+ pub registry: Option<Box<dyn Registry<NotifyListener = MemoryNotifyListener>>>,
}
impl Clone for RegistryWrapper {
diff --git a/dubbo/src/registry/protocol.rs b/dubbo/src/registry/protocol.rs
index 933975c..db6c1f6 100644
--- a/dubbo/src/registry/protocol.rs
+++ b/dubbo/src/registry/protocol.rs
@@ -16,7 +16,8 @@
*/
use std::collections::HashMap;
-use std::sync::{Arc, RwLock};
+use std::fmt::{Debug, Formatter};
+use std::sync::{Arc, Mutex, RwLock};
use super::memory_registry::MemoryRegistry;
use super::BoxRegistry;
@@ -27,26 +28,45 @@
use crate::protocol::BoxExporter;
use crate::protocol::BoxInvoker;
use crate::protocol::Protocol;
+use crate::registry::types::Registries;
#[derive(Clone, Default)]
pub struct RegistryProtocol {
// registerAddr: Registry
- registries: Arc<RwLock<HashMap<String, BoxRegistry>>>,
+ registries: Option<Registries>,
// providerUrl: Exporter
exporters: Arc<RwLock<HashMap<String, BoxExporter>>>,
// serviceName: registryUrls
services: HashMap<String, Vec<Url>>,
}
+impl Debug for RegistryProtocol {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.write_str(
+ format!(
+ "RegistryProtocol services{:#?},registries,{:#?}",
+ self.services,
+ self.registries.clone()
+ )
+ .as_str(),
+ )
+ }
+}
+
impl RegistryProtocol {
pub fn new() -> Self {
RegistryProtocol {
- registries: Arc::new(RwLock::new(HashMap::new())),
+ registries: None,
exporters: Arc::new(RwLock::new(HashMap::new())),
services: HashMap::new(),
}
}
+ pub fn with_registries(mut self, registries: Registries) -> Self {
+ self.registries = Some(registries);
+ self
+ }
+
pub fn with_services(mut self, services: HashMap<String, Vec<Url>>) -> Self {
self.services.extend(services);
self
@@ -55,9 +75,11 @@
pub fn get_registry(&mut self, url: Url) -> BoxRegistry {
let mem = MemoryRegistry::default();
self.registries
- .write()
+ .as_ref()
.unwrap()
- .insert(url.location, Box::new(mem.clone()));
+ .lock()
+ .unwrap()
+ .insert(url.location, Arc::new(Mutex::new(Box::new(mem.clone()))));
Box::new(mem)
}
@@ -77,23 +99,23 @@
// init Exporter based on provider_url
// server registry based on register_url
// start server health check
- let registry_url = self.services.get(url.get_service_name().join(",").as_str());
+ let registry_url = self.services.get(url.get_service_name().as_str());
if let Some(urls) = registry_url {
for url in urls.clone().iter() {
- if !url.protocol.is_empty() {
+ if !url.service_key.is_empty() {
let mut reg = self.get_registry(url.clone());
reg.register(url.clone()).unwrap();
}
}
}
- match url.clone().protocol.as_str() {
- "triple" => {
+ match url.clone().scheme.as_str() {
+ "tri" => {
let pro = Box::new(TripleProtocol::new());
return pro.export(url).await;
}
_ => {
- tracing::error!("protocol {:?} not implemented", url.protocol);
+ tracing::error!("protocol {:?} not implemented", url.scheme);
Box::new(TripleExporter::new())
}
}
diff --git a/dubbo/src/registry/types.rs b/dubbo/src/registry/types.rs
new file mode 100644
index 0000000..4768c33
--- /dev/null
+++ b/dubbo/src/registry/types.rs
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+use itertools::Itertools;
+use tracing::info;
+
+use crate::common::url::Url;
+use crate::registry::memory_registry::MemoryNotifyListener;
+use crate::registry::{BoxRegistry, Registry};
+use crate::StdError;
+
+pub type SafeRegistry = Arc<Mutex<BoxRegistry>>;
+pub type Registries = Arc<Mutex<HashMap<String, SafeRegistry>>>;
+
+pub const DEFAULT_REGISTRY_KEY: &str = "default";
+
+pub trait RegistriesOperation {
+ fn get(&self, registry_key: &str) -> SafeRegistry;
+ fn insert(&self, registry_key: String, registry: SafeRegistry);
+ fn default_registry(&self) -> SafeRegistry;
+}
+
+impl RegistriesOperation for Registries {
+ fn get(&self, registry_key: &str) -> SafeRegistry {
+ self.as_ref()
+ .lock()
+ .unwrap()
+ .get(registry_key)
+ .unwrap()
+ .clone()
+ }
+
+ fn insert(&self, registry_key: String, registry: SafeRegistry) {
+ self.as_ref().lock().unwrap().insert(registry_key, registry);
+ }
+
+ fn default_registry(&self) -> SafeRegistry {
+ let guard = self.as_ref().lock().unwrap();
+ let (_, result) = guard
+ .iter()
+ .find_or_first(|e| e.0 == DEFAULT_REGISTRY_KEY)
+ .unwrap()
+ .to_owned();
+ result.clone()
+ }
+}
+
+impl Registry for SafeRegistry {
+ type NotifyListener = MemoryNotifyListener;
+
+ fn register(&mut self, url: Url) -> Result<(), StdError> {
+ info!("register {}.", url);
+ self.lock().unwrap().register(url).expect("registry err.");
+ Ok(())
+ }
+
+ fn unregister(&mut self, url: Url) -> Result<(), StdError> {
+ self.lock().unwrap().register(url).expect("registry err.");
+ Ok(())
+ }
+
+ fn subscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> {
+ self.lock().unwrap().register(url).expect("registry err.");
+ Ok(())
+ }
+
+ fn unsubscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> {
+ self.lock().unwrap().register(url).expect("registry err.");
+ Ok(())
+ }
+}
diff --git a/dubbo/src/triple/client/connection.rs b/dubbo/src/triple/client/connection.rs
index ce45309..9246c9b 100644
--- a/dubbo/src/triple/client/connection.rs
+++ b/dubbo/src/triple/client/connection.rs
@@ -20,6 +20,7 @@
use hyper::client::conn::Builder;
use hyper::client::service::Connect;
use tower_service::Service;
+use tracing::debug;
use crate::boxed;
use crate::triple::transport::connector::get_connector;
@@ -86,6 +87,7 @@
let mut connector = Connect::new(get_connector(self.connector.clone()), builder);
let uri = self.host.clone();
let fut = async move {
+ debug!("send rpc call to {}", uri);
let mut con = connector.call(uri).await.unwrap();
con.call(req)
.await
diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs
index 27db0ad..ca81f85 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -16,30 +16,30 @@
*/
use std::str::FromStr;
+use std::sync::Arc;
use futures_util::{future, stream, StreamExt, TryStreamExt};
-
use http::HeaderValue;
-use rand::prelude::SliceRandom;
use tower_service::Service;
-use super::connection::Connection;
-use crate::codegen::{Directory, RpcInvocation};
+use crate::cluster::support::cluster_invoker::{ClusterInvoker, ClusterRequestBuilder};
+use crate::codegen::RpcInvocation;
use crate::filter::service::FilterService;
use crate::filter::Filter;
use crate::invocation::{IntoStreamingRequest, Metadata, Request, Response};
-
use crate::triple::codec::Codec;
use crate::triple::compression::CompressionEncoding;
use crate::triple::decode::Decoding;
use crate::triple::encode::encode;
+use super::connection::Connection;
+
#[derive(Debug, Clone, Default)]
pub struct TripleClient<T> {
host: Option<http::Uri>,
inner: T,
send_compression_encoding: Option<CompressionEncoding>,
- directory: Option<Box<dyn Directory>>,
+ cluster_invoker: Option<ClusterInvoker>,
}
impl TripleClient<Connection> {
@@ -56,7 +56,7 @@
host: uri.clone(),
inner: Connection::new().with_host(uri.unwrap()),
send_compression_encoding: Some(CompressionEncoding::Gzip),
- directory: None,
+ cluster_invoker: None,
}
}
}
@@ -67,7 +67,7 @@
host,
inner,
send_compression_encoding: Some(CompressionEncoding::Gzip),
- directory: None,
+ cluster_invoker: None,
}
}
@@ -78,10 +78,9 @@
TripleClient::new(FilterService::new(self.inner, filter), self.host)
}
- /// host: http://0.0.0.0:8888
- pub fn with_directory(self, directory: Box<dyn Directory>) -> Self {
+ pub fn with_cluster(self, invoker: ClusterInvoker) -> Self {
TripleClient {
- directory: Some(directory),
+ cluster_invoker: Some(invoker),
..self
}
}
@@ -92,7 +91,7 @@
T: Service<http::Request<hyper::Body>, Response = http::Response<crate::BoxBody>>,
T::Error: Into<crate::Error>,
{
- fn new_map_request(
+ pub fn new_map_request(
&self,
uri: http::Uri,
path: http::uri::PathAndQuery,
@@ -242,11 +241,11 @@
}
pub async fn unary<C, M1, M2>(
- &mut self,
+ &self,
req: Request<M1>,
mut codec: C,
path: http::uri::PathAndQuery,
- invocation: RpcInvocation,
+ invocation: Arc<RpcInvocation>,
) -> Result<Response<M2>, crate::status::Status>
where
C: Codec<Encode = M1, Decode = M2>,
@@ -262,14 +261,19 @@
.into_stream();
let body = hyper::Body::wrap_stream(body_stream);
- let url_list = self.directory.as_ref().expect("msg").list(invocation);
- let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
- let http_uri =
- http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
-
- let req = self.new_map_request(http_uri.clone(), path, body);
-
- let mut conn = Connection::new().with_host(http_uri);
+ // let url_list = self.directory.as_ref().expect("msg").list(invocation);
+ // let url_list = self.cluster_invoker.as_ref().unwrap().directory().list(invocation.clone());
+ // let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
+ // let http_uri =
+ // http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
+ //
+ // let req = self.new_map_request(http_uri.clone(), path, body);
+ let req =
+ self.cluster_invoker
+ .as_ref()
+ .unwrap()
+ .build_req(self, path, invocation.clone(), body);
+ let mut conn = Connection::new().with_host(req.uri().clone());
let response = conn
.call(req)
.await
diff --git a/dubbo/src/triple/compression.rs b/dubbo/src/triple/compression.rs
index b173211..65cdce2 100644
--- a/dubbo/src/triple/compression.rs
+++ b/dubbo/src/triple/compression.rs
@@ -111,12 +111,12 @@
let len = src.len();
src.reserve(len);
- compress(CompressionEncoding::Gzip, &mut src, &mut dst, len);
+ compress(CompressionEncoding::Gzip, &mut src, &mut dst, len).unwrap();
println!("src: {:?}, dst: {:?}", src, dst);
let mut de_dst = BytesMut::with_capacity(super::consts::BUFFER_SIZE);
let de_len = dst.len();
- decompress(CompressionEncoding::Gzip, &mut dst, &mut de_dst, de_len);
+ decompress(CompressionEncoding::Gzip, &mut dst, &mut de_dst, de_len).unwrap();
println!("src: {:?}, dst: {:?}", dst, de_dst);
}
diff --git a/examples/echo/src/protos/hello_echo.rs b/examples/echo/src/protos/hello_echo.rs
index bb2389f..71a3056 100644
--- a/examples/echo/src/protos/hello_echo.rs
+++ b/examples/echo/src/protos/hello_echo.rs
@@ -18,13 +18,13 @@
/// EchoRequest is the request for echo.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct EchoRequest {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub message: ::prost::alloc::string::String,
}
/// EchoResponse is the response for echo.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct EchoResponse {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub message: ::prost::alloc::string::String,
}
/// Generated client implementations.
@@ -67,17 +67,17 @@
let codec =
dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
let path = http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho");
- self.inner.unary(request, codec, path, RpcInvocation::default()).await
+ self.inner
+ .unary(request, codec, path, Arc::new(RpcInvocation::default()))
+ .await
}
/// ServerStreamingEcho is server side streaming.
pub async fn server_streaming_echo(
&mut self,
request: Request<super::EchoRequest>,
) -> Result<Response<Decoding<super::EchoResponse>>, dubbo::status::Status> {
- let codec = dubbo::codegen::ProstCodec::<
- super::EchoRequest,
- super::EchoResponse,
- >::default();
+ let codec =
+ dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/ServerStreamingEcho",
);
@@ -88,10 +88,8 @@
&mut self,
request: impl IntoStreamingRequest<Message = super::EchoRequest>,
) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
- let codec = dubbo::codegen::ProstCodec::<
- super::EchoRequest,
- super::EchoResponse,
- >::default();
+ let codec =
+ dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/ClientStreamingEcho",
);
@@ -102,10 +100,8 @@
&mut self,
request: impl IntoStreamingRequest<Message = super::EchoRequest>,
) -> Result<Response<Decoding<super::EchoResponse>>, dubbo::status::Status> {
- let codec = dubbo::codegen::ProstCodec::<
- super::EchoRequest,
- super::EchoResponse,
- >::default();
+ let codec =
+ dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/BidirectionalStreamingEcho",
);
@@ -126,9 +122,7 @@
request: Request<super::EchoRequest>,
) -> Result<Response<super::EchoResponse>, dubbo::status::Status>;
///Server streaming response type for the ServerStreamingEcho method.
- type ServerStreamingEchoStream: futures_util::Stream<
- Item = Result<super::EchoResponse, dubbo::status::Status>,
- >
+ type ServerStreamingEchoStream: futures_util::Stream<Item = Result<super::EchoResponse, dubbo::status::Status>>
+ Send
+ 'static;
/// ServerStreamingEcho is server side streaming.
@@ -142,19 +136,14 @@
request: Request<Decoding<super::EchoRequest>>,
) -> Result<Response<super::EchoResponse>, dubbo::status::Status>;
///Server streaming response type for the BidirectionalStreamingEcho method.
- type BidirectionalStreamingEchoStream: futures_util::Stream<
- Item = Result<super::EchoResponse, dubbo::status::Status>,
- >
+ type BidirectionalStreamingEchoStream: futures_util::Stream<Item = Result<super::EchoResponse, dubbo::status::Status>>
+ Send
+ 'static;
/// BidirectionalStreamingEcho is bidi streaming.
async fn bidirectional_streaming_echo(
&self,
request: Request<Decoding<super::EchoRequest>>,
- ) -> Result<
- Response<Self::BidirectionalStreamingEchoStream>,
- dubbo::status::Status,
- >;
+ ) -> Result<Response<Self::BidirectionalStreamingEchoStream>, dubbo::status::Status>;
}
/// Echo is the echo service.
#[derive(Debug)]
@@ -184,10 +173,7 @@
type Response = http::Response<BoxBody>;
type Error = std::convert::Infallible;
type Future = BoxFuture<Self::Response, Self::Error>;
- fn poll_ready(
- &mut self,
- _cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<B>) -> Self::Future {
@@ -200,26 +186,18 @@
}
impl<T: Echo> UnarySvc<super::EchoRequest> for UnaryEchoServer<T> {
type Response = super::EchoResponse;
- type Future = BoxFuture<
- Response<Self::Response>,
- dubbo::status::Status,
- >;
- fn call(
- &mut self,
- request: Request<super::EchoRequest>,
- ) -> Self::Future {
+ type Future = BoxFuture<Response<Self::Response>, dubbo::status::Status>;
+ fn call(&mut self, request: Request<super::EchoRequest>) -> Self::Future {
let inner = self.inner.0.clone();
let fut = async move { inner.unary_echo(request).await };
Box::pin(fut)
}
}
let fut = async move {
- let mut server = TripleServer::new(
- dubbo::codegen::ProstCodec::<
- super::EchoResponse,
- super::EchoRequest,
- >::default(),
- );
+ let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
+ super::EchoResponse,
+ super::EchoRequest,
+ >::default());
let res = server.unary(UnaryEchoServer { inner }, req).await;
Ok(res)
};
@@ -230,32 +208,22 @@
struct ServerStreamingEchoServer<T: Echo> {
inner: _Inner<T>,
}
- impl<T: Echo> ServerStreamingSvc<super::EchoRequest>
- for ServerStreamingEchoServer<T> {
+ impl<T: Echo> ServerStreamingSvc<super::EchoRequest> for ServerStreamingEchoServer<T> {
type Response = super::EchoResponse;
type ResponseStream = T::ServerStreamingEchoStream;
- type Future = BoxFuture<
- Response<Self::ResponseStream>,
- dubbo::status::Status,
- >;
- fn call(
- &mut self,
- request: Request<super::EchoRequest>,
- ) -> Self::Future {
+ type Future =
+ BoxFuture<Response<Self::ResponseStream>, dubbo::status::Status>;
+ fn call(&mut self, request: Request<super::EchoRequest>) -> Self::Future {
let inner = self.inner.0.clone();
- let fut = async move {
- inner.server_streaming_echo(request).await
- };
+ let fut = async move { inner.server_streaming_echo(request).await };
Box::pin(fut)
}
}
let fut = async move {
- let mut server = TripleServer::new(
- dubbo::codegen::ProstCodec::<
- super::EchoResponse,
- super::EchoRequest,
- >::default(),
- );
+ let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
+ super::EchoResponse,
+ super::EchoRequest,
+ >::default());
let res = server
.server_streaming(ServerStreamingEchoServer { inner }, req)
.await;
@@ -268,31 +236,23 @@
struct ClientStreamingEchoServer<T: Echo> {
inner: _Inner<T>,
}
- impl<T: Echo> ClientStreamingSvc<super::EchoRequest>
- for ClientStreamingEchoServer<T> {
+ impl<T: Echo> ClientStreamingSvc<super::EchoRequest> for ClientStreamingEchoServer<T> {
type Response = super::EchoResponse;
- type Future = BoxFuture<
- Response<Self::Response>,
- dubbo::status::Status,
- >;
+ type Future = BoxFuture<Response<Self::Response>, dubbo::status::Status>;
fn call(
&mut self,
request: Request<Decoding<super::EchoRequest>>,
) -> Self::Future {
let inner = self.inner.0.clone();
- let fut = async move {
- inner.client_streaming_echo(request).await
- };
+ let fut = async move { inner.client_streaming_echo(request).await };
Box::pin(fut)
}
}
let fut = async move {
- let mut server = TripleServer::new(
- dubbo::codegen::ProstCodec::<
- super::EchoResponse,
- super::EchoRequest,
- >::default(),
- );
+ let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
+ super::EchoResponse,
+ super::EchoRequest,
+ >::default());
let res = server
.client_streaming(ClientStreamingEchoServer { inner }, req)
.await;
@@ -305,56 +265,41 @@
struct BidirectionalStreamingEchoServer<T: Echo> {
inner: _Inner<T>,
}
- impl<T: Echo> StreamingSvc<super::EchoRequest>
- for BidirectionalStreamingEchoServer<T> {
+ impl<T: Echo> StreamingSvc<super::EchoRequest> for BidirectionalStreamingEchoServer<T> {
type Response = super::EchoResponse;
type ResponseStream = T::BidirectionalStreamingEchoStream;
- type Future = BoxFuture<
- Response<Self::ResponseStream>,
- dubbo::status::Status,
- >;
+ type Future =
+ BoxFuture<Response<Self::ResponseStream>, dubbo::status::Status>;
fn call(
&mut self,
request: Request<Decoding<super::EchoRequest>>,
) -> Self::Future {
let inner = self.inner.0.clone();
- let fut = async move {
- inner.bidirectional_streaming_echo(request).await
- };
+ let fut =
+ async move { inner.bidirectional_streaming_echo(request).await };
Box::pin(fut)
}
}
let fut = async move {
- let mut server = TripleServer::new(
- dubbo::codegen::ProstCodec::<
- super::EchoResponse,
- super::EchoRequest,
- >::default(),
- );
+ let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
+ super::EchoResponse,
+ super::EchoRequest,
+ >::default());
let res = server
- .bidi_streaming(
- BidirectionalStreamingEchoServer {
- inner,
- },
- req,
- )
+ .bidi_streaming(BidirectionalStreamingEchoServer { inner }, req)
.await;
Ok(res)
};
Box::pin(fut)
}
- _ => {
- Box::pin(async move {
- Ok(
- http::Response::builder()
- .status(200)
- .header("grpc-status", "12")
- .header("content-type", "application/grpc")
- .body(empty_body())
- .unwrap(),
- )
- })
- }
+ _ => Box::pin(async move {
+ Ok(http::Response::builder()
+ .status(200)
+ .header("grpc-status", "12")
+ .header("content-type", "application/grpc")
+ .body(empty_body())
+ .unwrap())
+ }),
}
}
}
diff --git a/examples/greeter/Cargo.toml b/examples/greeter/Cargo.toml
index c4b20cc..c34b19f 100644
--- a/examples/greeter/Cargo.toml
+++ b/examples/greeter/Cargo.toml
@@ -24,7 +24,6 @@
tokio-stream = "0.1"
tracing = "0.1"
tracing-subscriber = "0.2.0"
-
dubbo = {path = "../../dubbo", version = "0.2.0"}
dubbo-config = {path = "../../config", version = "0.2.0"}
dubbo-registry-zookeeper = {path = "../../registry-zookeeper", version = "0.2.0"}
diff --git a/examples/greeter/dubbo.yaml b/examples/greeter/dubbo.yaml
index 1e4ade8..b9bd353 100644
--- a/examples/greeter/dubbo.yaml
+++ b/examples/greeter/dubbo.yaml
@@ -1,18 +1,17 @@
-name: dubbo
-service:
- org.apache.dubbo.sample.tri.Greeter:
- version: 1.0.0
- group: test
- protocol: triple
- registry: ''
- serializer: json
- protocol_configs:
- triple:
- ip: 0.0.0.0
- port: '8888'
- name: triple
-protocols:
- triple:
- ip: 0.0.0.0
- port: '8888'
- name: triple
+dubbo:
+ protocols:
+ triple:
+ ip: 0.0.0.0
+ port: '8888'
+ name: tri
+ registries:
+ demoZK:
+ protocol: zookeeper
+ address: 127.0.0.1:2181
+ provider:
+ services:
+ GreeterProvider:
+ version: 1.0.0
+ group: test
+ protocol: triple
+ interface: org.apache.dubbo.sample.tri.Greeter
\ No newline at end of file
diff --git a/examples/greeter/src/greeter/client.rs b/examples/greeter/src/greeter/client.rs
index 93d509a..c673872 100644
--- a/examples/greeter/src/greeter/client.rs
+++ b/examples/greeter/src/greeter/client.rs
@@ -15,19 +15,20 @@
* limitations under the License.
*/
+use std::time::Duration;
+
+use tracing::Level;
+use tracing_subscriber::FmtSubscriber;
+
+use dubbo::cluster::support::cluster_invoker::ClusterInvoker;
+use dubbo::{cluster::directory::RegistryDirectory, codegen::*};
+use dubbo_registry_zookeeper::zookeeper_registry::ZookeeperRegistry;
+use protos::{greeter_client::GreeterClient, GreeterRequest};
+
pub mod protos {
#![allow(non_camel_case_types)]
include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs"));
}
-use std::{str::FromStr, time::Duration, env};
-
-use dubbo_registry_zookeeper::zookeeper_registry::ZookeeperRegistry;
-
-use dubbo::{cluster::directory::RegistryDirectory, codegen::*, invocation::RpcInvocation};
-use http;
-use protos::{greeter_client::GreeterClient, GreeterRequest};
-use tracing::Level;
-use tracing_subscriber::FmtSubscriber;
#[tokio::main]
async fn main() {
@@ -41,19 +42,13 @@
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
- let zk_connect_string = match env::var("ZOOKEEPER_SERVERS") {
- Ok(val) => val,
- Err(_) => "localhost:2181".to_string(),
- };
- let zkr = ZookeeperRegistry::new(&zk_connect_string);
+ let zkr = ZookeeperRegistry::default();
let directory = RegistryDirectory::new(Box::new(zkr));
+ let cluster_invoker = ClusterInvoker::with_directory(directory);
- let http_uri = http::Uri::from_str(&"http://1.1.1.1:8888").unwrap();
-
- let mut cli = GreeterClient::new(Connection::new().with_host(http_uri));
- cli = cli.with_directory(Box::new(directory));
- //let mut cli = GreeterClient::connect("http://127.0.0.1:8888".to_string());
-
+ let mut cli = GreeterClient::new(Connection::new());
+ cli = cli.with_cluster(cluster_invoker);
+ // using loop for loadbalance test
println!("# unary call");
let resp = cli
.greet(Request::new(GreeterRequest {
@@ -67,7 +62,5 @@
let (_parts, body) = resp.into_parts();
println!("Response: {:?}", body);
- loop {
- tokio::time::sleep(Duration::from_millis(100)).await;
- }
+ tokio::time::sleep(Duration::from_millis(2000)).await;
}
diff --git a/examples/greeter/src/greeter/server.rs b/examples/greeter/src/greeter/server.rs
index 37de662..ab797de 100644
--- a/examples/greeter/src/greeter/server.rs
+++ b/examples/greeter/src/greeter/server.rs
@@ -15,47 +15,49 @@
* limitations under the License.
*/
-pub mod protos {
- #![allow(non_camel_case_types)]
- include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs"));
-}
+use std::{io::ErrorKind, pin::Pin};
+use async_trait::async_trait;
+use futures_util::Stream;
use futures_util::StreamExt;
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
+use tracing::info;
+
+use dubbo::{codegen::*, Dubbo};
+use dubbo_config::RootConfig;
+use dubbo_registry_zookeeper::zookeeper_registry::ZookeeperRegistry;
use protos::{
greeter_server::{register_server, Greeter},
GreeterReply, GreeterRequest,
};
-use std::{io::ErrorKind, pin::Pin};
-
-use async_trait::async_trait;
-use futures_util::Stream;
-use tokio::sync::mpsc;
-use tokio_stream::wrappers::ReceiverStream;
-
-use dubbo::{codegen::*, Dubbo};
-use dubbo_config::RootConfig;
+pub mod protos {
+ #![allow(non_camel_case_types)]
+ include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs"));
+}
type ResponseStream =
Pin<Box<dyn Stream<Item = Result<GreeterReply, dubbo::status::Status>> + Send>>;
#[tokio::main]
async fn main() {
+ use tracing::{span, Level};
+ let span = span!(Level::DEBUG, "greeter.server");
+ let _enter = span.enter();
register_server(GreeterServerImpl {
name: "greeter".to_string(),
});
-
- // Dubbo::new().start().await;
- Dubbo::new()
- .with_config({
- let r = RootConfig::new();
- match r.load() {
- Ok(config) => config,
- Err(_err) => panic!("err: {:?}", _err), // response was droped
- }
- })
- .start()
- .await;
+ let zkr = ZookeeperRegistry::default();
+ let r = RootConfig::new();
+ let r = match r.load() {
+ Ok(config) => config,
+ Err(_err) => panic!("err: {:?}", _err), // response was droped
+ };
+ let mut f = Dubbo::new()
+ .with_config(r)
+ .add_registry("default", Box::new(zkr));
+ f.start().await;
}
#[allow(dead_code)]
@@ -71,7 +73,7 @@
&self,
request: Request<GreeterRequest>,
) -> Result<Response<GreeterReply>, dubbo::status::Status> {
- println!("GreeterServer::greet {:?}", request.metadata);
+ info!("GreeterServer::greet {:?}", request.metadata);
Ok(Response::new(GreeterReply {
message: "hello, dubbo-rust".to_string(),
diff --git a/registry-zookeeper/Cargo.toml b/registry-zookeeper/Cargo.toml
index cf040fa..722462a 100644
--- a/registry-zookeeper/Cargo.toml
+++ b/registry-zookeeper/Cargo.toml
@@ -7,8 +7,9 @@
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-zookeeper = "0.6.1"
+zookeeper = "0.7.0"
dubbo = {path = "../dubbo/", version = "0.2.0"}
serde_json = "1.0"
serde = {version = "1.0.145",features = ["derive"]}
tracing = "0.1"
+urlencoding = "2.1.2"
\ No newline at end of file
diff --git a/registry-zookeeper/src/lib.rs b/registry-zookeeper/src/lib.rs
index 7481129..ccfce10 100644
--- a/registry-zookeeper/src/lib.rs
+++ b/registry-zookeeper/src/lib.rs
@@ -15,7 +15,6 @@
* limitations under the License.
*/
-
pub mod zookeeper_registry;
#[cfg(test)]
diff --git a/registry-zookeeper/src/zookeeper_registry.rs b/registry-zookeeper/src/zookeeper_registry.rs
index 6c2ef91..ad96638 100644
--- a/registry-zookeeper/src/zookeeper_registry.rs
+++ b/registry-zookeeper/src/zookeeper_registry.rs
@@ -17,26 +17,32 @@
#![allow(unused_variables, dead_code, missing_docs)]
+use std::collections::HashMap;
+use std::collections::HashSet;
+use std::env;
+use std::sync::RwLock;
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+
+use serde::{Deserialize, Serialize};
+use tracing::{error, info};
+#[allow(unused_imports)]
+use zookeeper::{Acl, CreateMode, WatchedEvent, WatchedEventType, Watcher, ZooKeeper};
+
+use dubbo::cluster::support::cluster_invoker::ClusterInvoker;
+use dubbo::codegen::BoxRegistry;
+use dubbo::common::consts::{DUBBO_KEY, LOCALHOST_IP, PROVIDERS_KEY};
use dubbo::common::url::Url;
-use dubbo::registry::memory_registry::MemoryNotifyListener;
+use dubbo::registry::integration::ClusterRegistryIntegration;
+use dubbo::registry::memory_registry::{MemoryNotifyListener, MemoryRegistry};
use dubbo::registry::NotifyListener;
use dubbo::registry::Registry;
use dubbo::registry::ServiceEvent;
use dubbo::StdError;
-use serde::{Deserialize, Serialize};
-use tracing::info;
-use zookeeper::Acl;
-use zookeeper::CreateMode;
-use std::collections::HashMap;
-use std::collections::HashSet;
-use std::sync::Arc;
-use std::sync::RwLock;
-use std::time::Duration;
-use zookeeper::{WatchedEvent, WatchedEventType, Watcher, ZooKeeper};
// 从url中获取服务注册的元数据
-/// rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s)
-/// dubboPath = fmt.Sprintf("/%s/%s/%s", r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), common.DubboNodes[common.PROVIDER])
+// rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s)
+// dubboPath = fmt.Sprintf("/%s/%s/%s", r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), common.DubboNodes[common.PROVIDER])
pub const REGISTRY_GROUP_KEY: &str = "registry.group";
@@ -51,20 +57,8 @@
pub struct ZookeeperRegistry {
root_path: String,
zk_client: Arc<ZooKeeper>,
-
listeners: RwLock<HashMap<String, Arc<<ZookeeperRegistry as Registry>::NotifyListener>>>,
-}
-
-pub struct MyNotifyListener {}
-
-impl NotifyListener for MyNotifyListener {
- fn notify(&self, event: dubbo::registry::ServiceEvent) {
- todo!()
- }
-
- fn notify_all(&self, event: dubbo::registry::ServiceEvent) {
- todo!()
- }
+ memory_registry: Arc<Mutex<MemoryRegistry>>,
}
#[derive(Serialize, Deserialize, Debug)]
@@ -92,11 +86,12 @@
pub fn new(connect_string: &str) -> ZookeeperRegistry {
let zk_client =
ZooKeeper::connect(connect_string, Duration::from_secs(15), LoggingWatcher).unwrap();
+ info!("zk server connect string: {}", connect_string);
ZookeeperRegistry {
root_path: "/services".to_string(),
zk_client: Arc::new(zk_client),
-
listeners: RwLock::new(HashMap::new()),
+ memory_registry: Arc::new(Mutex::new(MemoryRegistry::default())),
}
}
@@ -108,15 +103,16 @@
) -> ServiceInstancesChangedListener {
let mut service_names = HashSet::new();
service_names.insert(service_name.clone());
- return ServiceInstancesChangedListener {
+ ServiceInstancesChangedListener {
zk_client: Arc::clone(&self.zk_client),
- path: path,
-
+ path,
service_name: service_name.clone(),
- listener: listener,
- };
+ listener,
+ }
}
+ // metadata /dubbo/mapping designed for application discovery; deprecated for currently using interface discovery
+ // #[deprecated]
fn get_app_name(&self, service_name: String) -> String {
let res = self
.zk_client
@@ -129,25 +125,154 @@
};
s.to_string()
}
+
+ pub fn get_client(&self) -> Arc<ZooKeeper> {
+ self.zk_client.clone()
+ }
+
+ // If the parent node does not exist in the ZooKeeper, Err(ZkError::NoNode) will be returned.
+ pub fn create_path(
+ &self,
+ path: &str,
+ data: &str,
+ create_mode: CreateMode,
+ ) -> Result<(), StdError> {
+ if self.exists_path(path) {
+ self.zk_client
+ .set_data(path, data.as_bytes().to_vec(), None)
+ .unwrap_or_else(|_| panic!("set data to {} failed.", path));
+ return Ok(());
+ }
+ let zk_result = self.zk_client.create(
+ path,
+ data.as_bytes().to_vec(),
+ Acl::open_unsafe().clone(),
+ create_mode,
+ );
+ match zk_result {
+ Ok(_) => Ok(()),
+ Err(err) => {
+ error!("zk path {} parent not exists.", path);
+ Err(Box::try_from(err).unwrap())
+ }
+ }
+ }
+
+ // For avoiding Err(ZkError::NoNode) when parent node is't exists
+ pub fn create_path_with_parent_check(
+ &self,
+ path: &str,
+ data: &str,
+ create_mode: CreateMode,
+ ) -> Result<(), StdError> {
+ let nodes: Vec<&str> = path.split('/').collect();
+ let mut current: String = String::new();
+ let children = *nodes.last().unwrap();
+ for node_key in nodes {
+ if node_key.is_empty() {
+ continue;
+ };
+ current.push('/');
+ current.push_str(node_key);
+ if !self.exists_path(current.as_str()) {
+ let new_create_mode = match children == node_key {
+ true => create_mode,
+ false => CreateMode::Persistent,
+ };
+ let new_data = match children == node_key {
+ true => data,
+ false => "",
+ };
+ self.create_path(current.as_str(), new_data, new_create_mode)
+ .unwrap();
+ }
+ }
+ Ok(())
+ }
+
+ pub fn delete_path(&self, path: &str) {
+ if self.exists_path(path) {
+ self.get_client().delete(path, None).unwrap()
+ }
+ }
+
+ pub fn exists_path(&self, path: &str) -> bool {
+ self.zk_client.exists(path, false).unwrap().is_some()
+ }
+
+ pub fn get_data(&self, path: &str, watch: bool) -> Option<String> {
+ if self.exists_path(path) {
+ let zk_result = self.zk_client.get_data(path, watch);
+ if let Ok(..) = zk_result {
+ Some(String::from_utf8(zk_result.unwrap().0).unwrap())
+ } else {
+ None
+ }
+ } else {
+ None
+ }
+ }
+}
+
+impl Default for ZookeeperRegistry {
+ fn default() -> ZookeeperRegistry {
+ let zk_connect_string = match env::var("ZOOKEEPER_SERVERS") {
+ Ok(val) => val,
+ Err(_) => {
+ let default_connect_string = "localhost:2181";
+ info!(
+ "No ZOOKEEPER_SERVERS env value, using {} as default.",
+ default_connect_string
+ );
+ default_connect_string.to_string()
+ }
+ };
+ info!(
+ "using external registry with it's connect string {}",
+ zk_connect_string.as_str()
+ );
+ ZookeeperRegistry::new(zk_connect_string.as_str())
+ }
}
impl Registry for ZookeeperRegistry {
type NotifyListener = MemoryNotifyListener;
fn register(&mut self, url: Url) -> Result<(), StdError> {
- todo!();
+ let zk_path = format!(
+ "/{}/{}/{}/{}",
+ DUBBO_KEY,
+ url.service_name,
+ PROVIDERS_KEY,
+ url.encoded_raw_url_string()
+ );
+ self.create_path_with_parent_check(zk_path.as_str(), LOCALHOST_IP, CreateMode::Ephemeral)?;
+ Ok(())
}
fn unregister(&mut self, url: Url) -> Result<(), StdError> {
- todo!();
+ let zk_path = format!(
+ "/{}/{}/{}/{}",
+ DUBBO_KEY,
+ url.service_name,
+ PROVIDERS_KEY,
+ url.encoded_raw_url_string()
+ );
+ self.delete_path(zk_path.as_str());
+ Ok(())
}
+ // for consumer to find the changes of providers
fn subscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> {
- let binding = url.get_service_name();
- let service_name = binding.get(0).unwrap();
- let app_name = self.get_app_name(service_name.clone());
- let path = self.root_path.clone() + "/" + &app_name;
- if self.listeners.read().unwrap().get(service_name).is_some() {
+ let service_name = url.get_service_name();
+ let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &service_name, PROVIDERS_KEY);
+ if self
+ .listeners
+ .read()
+ .unwrap()
+ .get(service_name.as_str())
+ .is_some()
+ {
return Ok(());
}
@@ -158,37 +283,32 @@
.insert(service_name.to_string(), Arc::clone(&arc_listener));
let zk_listener = self.create_listener(
- path.clone(),
+ zk_path.clone(),
service_name.to_string(),
Arc::clone(&arc_listener),
);
- let res = self.zk_client.get_children_w(&path, zk_listener);
- let result: Vec<Url> = res
- .unwrap()
- .iter()
- .map(|node_key| {
- let zk_res = self.zk_client.get_data(
- &(self.root_path.clone() + "/" + &app_name + "/" + &node_key),
- false,
- );
- let vec_u8 = zk_res.unwrap().0;
- let sstr = std::str::from_utf8(&vec_u8).unwrap();
- let instance: ZkServiceInstance = serde_json::from_str(sstr).unwrap();
- let url = Url::from_url(&format!(
- "triple://{}:{}/{}",
- instance.get_host(),
- instance.get_port(),
- service_name
- ))
- .unwrap();
- url
- })
- .collect();
-
- info!("notifing {}->{:?}", service_name, result);
+ let zk_changed_paths = self.zk_client.get_children_w(&zk_path, zk_listener);
+ let result = match zk_changed_paths {
+ Err(err) => {
+ error!("zk subscribe error: {}", err);
+ Vec::new()
+ }
+ Ok(urls) => urls
+ .iter()
+ .map(|node_key| {
+ let provider_url: Url = urlencoding::decode(node_key)
+ .unwrap()
+ .to_string()
+ .as_str()
+ .into();
+ provider_url
+ })
+ .collect(),
+ };
+ info!("notifying {}->{:?}", service_name, result);
arc_listener.notify(ServiceEvent {
- key: service_name.to_string(),
+ key: service_name,
action: String::from("ADD"),
service: result,
});
@@ -203,7 +323,6 @@
pub struct ServiceInstancesChangedListener {
zk_client: Arc<ZooKeeper>,
path: String,
-
service_name: String,
listener: Arc<MemoryNotifyListener>,
}
@@ -215,40 +334,26 @@
let event_path = path.clone();
let dirs = self
.zk_client
- .get_children(&event_path.clone(), false)
+ .get_children(&event_path, false)
.expect("msg");
let result: Vec<Url> = dirs
.iter()
.map(|node_key| {
- let zk_res = self
- .zk_client
- .get_data(&(event_path.clone() + "/" + node_key), false);
- let vec_u8 = zk_res.unwrap().0;
- let sstr = std::str::from_utf8(&vec_u8).unwrap();
- let instance: ZkServiceInstance = serde_json::from_str(sstr).unwrap();
- let url = Url::from_url(&format!(
- "triple://{}:{}/{}",
- instance.get_host(),
- instance.get_port(),
- self.service_name
- ))
- .unwrap();
- url
+ let provider_url: Url = node_key.as_str().into();
+ provider_url
})
.collect();
-
let res = self.zk_client.get_children_w(
&path,
ServiceInstancesChangedListener {
zk_client: Arc::clone(&self.zk_client),
path: path.clone(),
-
service_name: self.service_name.clone(),
listener: Arc::clone(&self.listener),
},
);
- info!("notifing {}->{:?}", self.service_name, result);
+ info!("notify {}->{:?}", self.service_name, result);
self.listener.notify(ServiceEvent {
key: self.service_name.clone(),
action: String::from("ADD"),
@@ -260,48 +365,102 @@
impl NotifyListener for ServiceInstancesChangedListener {
fn notify(&self, event: ServiceEvent) {
- todo!()
+ self.listener.notify(event);
}
fn notify_all(&self, event: ServiceEvent) {
+ self.listener.notify(event);
+ }
+}
+
+impl ClusterRegistryIntegration for ZookeeperRegistry {
+ fn get_invoker(registry: BoxRegistry) -> Option<Arc<ClusterInvoker>> {
todo!()
}
}
-#[test]
-fn it_works() {
- let connect_string = &"mse-21b397d4-p.zk.mse.aliyuncs.com:2181";
- let zk_client =
- ZooKeeper::connect(connect_string, Duration::from_secs(15), LoggingWatcher).unwrap();
- let watcher = Arc::new(Some(TestZkWatcher {
- watcher: Arc::new(None),
- }));
- watcher.as_ref().expect("").watcher = Arc::clone(&watcher);
- let x = watcher.as_ref().expect("");
- zk_client.get_children_w("/test", x);
- zk_client.delete("/test/a", None);
- zk_client.delete("/test/b", None);
- let zk_res = zk_client.create(
- "/test/a",
- vec![1, 3],
- Acl::open_unsafe().clone(),
- CreateMode::Ephemeral,
- );
- let zk_res = zk_client.create(
- "/test/b",
- vec![1, 3],
- Acl::open_unsafe().clone(),
- CreateMode::Ephemeral,
- );
- zk_client.close();
-}
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
-struct TestZkWatcher {
- pub watcher: Arc<Option<TestZkWatcher>>,
-}
+ use zookeeper::{Acl, CreateMode, WatchedEvent, Watcher};
-impl Watcher for TestZkWatcher {
- fn handle(&self, event: WatchedEvent) {
- println!("event: {:?}", event);
+ use crate::zookeeper_registry::ZookeeperRegistry;
+
+ struct TestZkWatcher {
+ pub watcher: Arc<Option<TestZkWatcher>>,
+ }
+
+ impl Watcher for TestZkWatcher {
+ fn handle(&self, event: WatchedEvent) {
+ println!("event: {:?}", event);
+ }
+ }
+
+ #[test]
+ fn zk_read_write_watcher() {
+ // https://github.com/bonifaido/rust-zookeeper/blob/master/examples/zookeeper_example.rs
+ // using ENV to set zookeeper server urls
+ let zkr = ZookeeperRegistry::default();
+ let zk_client = zkr.get_client();
+ let watcher = TestZkWatcher {
+ watcher: Arc::new(None),
+ };
+ if zk_client.exists("/test", true).is_err() {
+ zk_client
+ .create(
+ "/test",
+ vec![1, 3],
+ Acl::open_unsafe().clone(),
+ CreateMode::Ephemeral,
+ )
+ .unwrap();
+ }
+ let zk_res = zk_client.create(
+ "/test",
+ "hello".into(),
+ Acl::open_unsafe().clone(),
+ CreateMode::Ephemeral,
+ );
+ let result = zk_client.get_children_w("/test", watcher);
+ assert!(result.is_ok());
+ if zk_client.exists("/test/a", true).is_err() {
+ zk_client.delete("/test/a", None).unwrap();
+ }
+ if zk_client.exists("/test/a", true).is_err() {
+ zk_client.delete("/test/b", None).unwrap();
+ }
+ let zk_res = zk_client.create(
+ "/test/a",
+ "hello".into(),
+ Acl::open_unsafe().clone(),
+ CreateMode::Ephemeral,
+ );
+ let zk_res = zk_client.create(
+ "/test/b",
+ "world".into(),
+ Acl::open_unsafe().clone(),
+ CreateMode::Ephemeral,
+ );
+ let test_a_result = zk_client.get_data("/test", true);
+ assert!(test_a_result.is_ok());
+ let vec1 = test_a_result.unwrap().0;
+ // data in /test should equals to "hello"
+ assert_eq!(String::from_utf8(vec1).unwrap(), "hello");
+ zk_client.close().unwrap()
+ }
+
+ #[test]
+ fn create_path_with_parent_check() {
+ let zkr = ZookeeperRegistry::default();
+ let path = "/du1bbo/test11111";
+ let data = "hello";
+ // creating a child on a not exists parent, throw a NoNode error.
+ // let result = zkr.create_path(path, data, CreateMode::Ephemeral);
+ // assert!(result.is_err());
+ let create_with_parent_check_result =
+ zkr.create_path_with_parent_check(path, data, CreateMode::Ephemeral);
+ assert!(create_with_parent_check_result.is_ok());
+ assert_eq!(data, zkr.get_data(path, false).unwrap());
}
}
diff --git a/scripts/ci-check.sh b/scripts/ci-check.sh
new file mode 100755
index 0000000..50d25f1
--- /dev/null
+++ b/scripts/ci-check.sh
@@ -0,0 +1,6 @@
+#!/bin/bash
+
+cargo fmt --all -- --check
+# use stable channel
+cargo check
+target/debug/greeter-server && target/debug/greeter-client && sleep 3s ;