Rft: add Cluster to ClientBuilder (#142)
comment some codes
* refactor(triple): rm unused var in clientBuilder
* refactor(dubbo): delete some codes
* refactor(cluster): rm some duplicate codes
* refactor(registry): rm unused import
* refactor(triple): use two build func for different usage
* style: cargo fmt --all
* refactor(cluster): rm registryWrapper
* refactor(cluster): delete print
* chore(dubbo): upgrade hyper version in cargo.toml
* refactor(cluster): comment the logic of clone body
diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs
index 1e1c9fb..bfcfe45 100644
--- a/dubbo-build/src/client.rs
+++ b/dubbo-build/src/client.rs
@@ -90,11 +90,6 @@
}
}
- pub fn with_cluster(mut self, invoker: ClusterInvoker) -> Self {
- self.inner = self.inner.with_cluster(invoker);
- self
- }
-
#methods
}
diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index 91b19d6..d0ab2a4 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -10,7 +10,7 @@
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-hyper = { version = "0.14.19", features = ["full"] }
+hyper = { version = "0.14.26", features = ["full"] }
http = "0.2"
tower-service.workspace = true
http-body = "0.4.4"
@@ -33,7 +33,7 @@
axum = "0.5.9"
async-stream = "0.3"
flate2 = "1.0"
-aws-smithy-http = "0.54.1"
+aws-smithy-http = "0.55.2"
dyn-clone = "1.0.11"
itertools.workspace = true
urlencoding.workspace = true
diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs
index e74fc6d..afe9657 100644
--- a/dubbo/src/cluster/directory.rs
+++ b/dubbo/src/cluster/directory.rs
@@ -23,39 +23,21 @@
};
use crate::{
+ codegen::TripleInvoker,
invocation::{Invocation, RpcInvocation},
- registry::{memory_registry::MemoryNotifyListener, BoxRegistry, RegistryWrapper},
+ protocol::BoxInvoker,
+ registry::{memory_registry::MemoryNotifyListener, BoxRegistry},
};
use dubbo_base::Url;
use dubbo_logger::tracing;
+use crate::cluster::Directory;
+
/// Directory.
///
/// [Directory Service](http://en.wikipedia.org/wiki/Directory_service)
-pub trait Directory: Debug + DirectoryClone {
- fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<Url>;
-}
-pub trait DirectoryClone {
- fn clone_box(&self) -> Box<dyn Directory>;
-}
-
-impl<T> DirectoryClone for T
-where
- T: 'static + Directory + Clone,
-{
- fn clone_box(&self) -> Box<dyn Directory> {
- Box::new(self.clone())
- }
-}
-
-impl Clone for Box<dyn Directory> {
- fn clone(&self) -> Box<dyn Directory> {
- self.clone_box()
- }
-}
-
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct StaticDirectory {
uri: http::Uri,
}
@@ -78,7 +60,7 @@
}
impl Directory for StaticDirectory {
- fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<Url> {
+ fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
let url = Url::from_url(&format!(
"tri://{}:{}/{}",
self.uri.host().unwrap(),
@@ -86,43 +68,28 @@
invocation.get_target_service_unique_name(),
))
.unwrap();
- vec![url]
+ let invoker = Box::new(TripleInvoker::new(url));
+ vec![invoker]
}
}
-impl DirectoryClone for StaticDirectory {
- fn clone_box(&self) -> Box<dyn Directory> {
- Box::new(StaticDirectory {
- uri: self.uri.clone(),
- })
- }
-}
-
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct RegistryDirectory {
- registry: RegistryWrapper,
+ registry: Arc<BoxRegistry>,
service_instances: Arc<RwLock<HashMap<String, Vec<Url>>>>,
}
impl RegistryDirectory {
pub fn new(registry: BoxRegistry) -> RegistryDirectory {
RegistryDirectory {
- registry: RegistryWrapper {
- registry: Some(registry),
- },
+ registry: Arc::new(registry),
service_instances: Arc::new(RwLock::new(HashMap::new())),
}
}
}
-impl DirectoryClone for RegistryDirectory {
- fn clone_box(&self) -> Box<dyn Directory> {
- todo!()
- }
-}
-
impl Directory for RegistryDirectory {
- fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<Url> {
+ fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
let service_name = invocation.get_target_service_unique_name();
let url = Url::from_url(&format!(
@@ -132,9 +99,6 @@
.unwrap();
self.registry
- .registry
- .as_ref()
- .expect("msg")
.subscribe(
url,
Arc::new(MemoryNotifyListener {
@@ -149,6 +113,11 @@
.expect("service_instances.read");
let binding = Vec::new();
let url_vec = map.get(&service_name).unwrap_or(&binding);
- url_vec.to_vec()
+ // url_vec.to_vec()
+ let mut invokers: Vec<BoxInvoker> = vec![];
+ for item in url_vec.iter() {
+ invokers.push(Box::new(TripleInvoker::new(item.clone())));
+ }
+ invokers
}
}
diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs
index 4f73d2f..d1f96f9 100644
--- a/dubbo/src/cluster/mod.rs
+++ b/dubbo/src/cluster/mod.rs
@@ -19,6 +19,7 @@
use aws_smithy_http::body::SdkBody;
use dubbo_base::Url;
+use dyn_clone::DynClone;
use crate::{
empty_body,
@@ -28,13 +29,14 @@
pub mod directory;
pub mod loadbalance;
-pub mod support;
-pub trait Directory: Debug {
+pub trait Directory: Debug + DynClone {
fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker>;
- fn is_empty(&self) -> bool;
+ // fn is_empty(&self) -> bool;
}
+dyn_clone::clone_trait_object!(Directory);
+
type BoxDirectory = Box<dyn Directory + Send + Sync>;
pub trait Cluster {
@@ -76,13 +78,12 @@
}
fn call(&mut self, req: http::Request<SdkBody>) -> Self::Future {
- println!("req: {}", req.body().content_length().unwrap());
- let clone_body = req.body().try_clone().unwrap();
- let mut clone_req = http::Request::builder()
- .uri(req.uri().clone())
- .method(req.method().clone());
- *clone_req.headers_mut().unwrap() = req.headers().clone();
- let r = clone_req.body(clone_body).unwrap();
+ // let clone_body = req.body().try_clone().unwrap();
+ // let mut clone_req = http::Request::builder()
+ // .uri(req.uri().clone())
+ // .method(req.method().clone());
+ // *clone_req.headers_mut().unwrap() = req.headers().clone();
+ // let r = clone_req.body(clone_body).unwrap();
let invokers = self.dir.list(
RpcInvocation::default()
.with_service_unique_name("hello".to_string())
@@ -90,7 +91,7 @@
);
for mut invoker in invokers {
let fut = async move {
- let res = invoker.call(r).await;
+ let res = invoker.call(req).await;
return res;
};
return Box::pin(fut);
@@ -110,7 +111,7 @@
}
}
-#[derive(Debug, Default)]
+#[derive(Debug, Default, Clone)]
pub struct MockDirectory {
// router_chain: RouterChain,
invokers: Vec<BoxInvoker>,
@@ -134,9 +135,9 @@
self.invokers.clone()
}
- fn is_empty(&self) -> bool {
- false
- }
+ // fn is_empty(&self) -> bool {
+ // false
+ // }
}
#[derive(Debug, Default)]
diff --git a/dubbo/src/cluster/support/cluster_invoker.rs b/dubbo/src/cluster/support/cluster_invoker.rs
deleted file mode 100644
index 0ccca48..0000000
--- a/dubbo/src/cluster/support/cluster_invoker.rs
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-use aws_smithy_http::body::SdkBody;
-use std::{str::FromStr, sync::Arc};
-
-use dubbo_base::Url;
-use http::{uri::PathAndQuery, Request};
-
-use crate::{
- cluster::{
- loadbalance::{types::BoxLoadBalance, LOAD_BALANCE_EXTENSIONS},
- support::DEFAULT_LOADBALANCE,
- },
- codegen::{Directory, RegistryDirectory, TripleClient},
- invocation::RpcInvocation,
-};
-
-#[derive(Debug, Clone)]
-pub struct ClusterInvoker {
- directory: Arc<RegistryDirectory>,
- destroyed: bool,
-}
-
-pub trait ClusterInvokerSelector {
- /// Select a invoker using loadbalance policy.
- fn select(
- &self,
- invocation: Arc<RpcInvocation>,
- invokers: Arc<Vec<Url>>,
- excluded: Arc<Vec<Url>>,
- ) -> Option<Url>;
-
- fn do_select(
- &self,
- loadbalance_key: Option<&str>,
- invocation: Arc<RpcInvocation>,
- invokers: Arc<Vec<Url>>,
- ) -> Option<Url>;
-}
-
-pub trait ClusterRequestBuilder {
- fn build_req(
- &self,
- triple_client: &mut TripleClient,
- path: http::uri::PathAndQuery,
- invocation: Arc<RpcInvocation>,
- body: SdkBody,
- ) -> http::Request<SdkBody>;
-}
-
-impl ClusterInvoker {
- pub fn with_directory(registry_directory: RegistryDirectory) -> Self {
- ClusterInvoker {
- directory: Arc::new(registry_directory),
- destroyed: false,
- }
- }
-
- pub fn directory(&self) -> Arc<RegistryDirectory> {
- self.directory.clone()
- }
-
- pub fn init_loadbalance(&self, loadbalance_key: &str) -> &BoxLoadBalance {
- if LOAD_BALANCE_EXTENSIONS.contains_key(loadbalance_key) {
- LOAD_BALANCE_EXTENSIONS.get(loadbalance_key).unwrap()
- } else {
- println!(
- "loadbalance {} not found, use default loadbalance {}",
- loadbalance_key, DEFAULT_LOADBALANCE
- );
- LOAD_BALANCE_EXTENSIONS.get(DEFAULT_LOADBALANCE).unwrap()
- }
- }
-
- pub fn is_available(&self, invocation: Arc<RpcInvocation>) -> bool {
- !self.destroyed() && !self.directory.list(invocation).is_empty()
- }
-
- pub fn destroyed(&self) -> bool {
- self.destroyed
- }
-}
-
-impl ClusterInvokerSelector for ClusterInvoker {
- fn select(
- &self,
- invocation: Arc<RpcInvocation>,
- invokers: Arc<Vec<Url>>,
- _excluded: Arc<Vec<Url>>,
- ) -> Option<Url> {
- if invokers.is_empty() {
- return None;
- }
- let instance_count = invokers.len();
- return if instance_count == 1 {
- Some(invokers.as_ref().first()?.clone())
- } else {
- let loadbalance = Some(DEFAULT_LOADBALANCE);
- self.do_select(loadbalance, invocation, invokers)
- };
- }
-
- /// picking instance invoker url from registry directory
- fn do_select(
- &self,
- loadbalance_key: Option<&str>,
- invocation: Arc<RpcInvocation>,
- invokers: Arc<Vec<Url>>,
- ) -> Option<Url> {
- let loadbalance = self.init_loadbalance(loadbalance_key.unwrap_or(DEFAULT_LOADBALANCE));
- loadbalance.select(invokers, None, invocation)
- }
-}
-
-impl ClusterRequestBuilder for ClusterInvoker {
- fn build_req(
- &self,
- triple_client: &mut TripleClient,
- path: PathAndQuery,
- invocation: Arc<RpcInvocation>,
- body: SdkBody,
- ) -> Request<SdkBody> {
- let invokers = self.directory.list(invocation.clone());
- let invoker_url = self
- .select(invocation, Arc::new(invokers), Arc::new(Vec::new()))
- .expect("no valid provider");
- let http_uri =
- http::Uri::from_str(&format!("http://{}:{}/", invoker_url.ip, invoker_url.port))
- .unwrap();
- triple_client.map_request(http_uri, path, body)
- }
-}
diff --git a/dubbo/src/cluster/support/mod.rs b/dubbo/src/cluster/support/mod.rs
deleted file mode 100644
index ae42cc2..0000000
--- a/dubbo/src/cluster/support/mod.rs
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-pub mod cluster_invoker;
-
-pub const DEFAULT_LOADBALANCE: &str = "random";
diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs
index 98d784f..412feb9 100644
--- a/dubbo/src/codegen.rs
+++ b/dubbo/src/codegen.rs
@@ -27,14 +27,11 @@
pub use tower_service::Service;
pub use super::{
- cluster::{
- directory::{Directory, RegistryDirectory},
- support::cluster_invoker::ClusterInvoker,
- },
+ cluster::directory::RegistryDirectory,
empty_body,
invocation::{IntoStreamingRequest, Request, Response, RpcInvocation},
protocol::{triple::triple_invoker::TripleInvoker, Invoker},
- registry::{BoxRegistry, Registry, RegistryWrapper},
+ registry::{BoxRegistry, Registry},
triple::{
client::TripleClient,
codec::{prost::ProstCodec, Codec},
diff --git a/dubbo/src/registry/integration.rs b/dubbo/src/registry/integration.rs
index 15b82d0..2944f98 100644
--- a/dubbo/src/registry/integration.rs
+++ b/dubbo/src/registry/integration.rs
@@ -14,10 +14,3 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-use crate::{cluster::support::cluster_invoker::ClusterInvoker, registry::BoxRegistry};
-use std::sync::Arc;
-
-pub trait ClusterRegistryIntegration {
- /// get cluster invoker struct
- fn get_invoker(registry: BoxRegistry) -> Option<Arc<ClusterInvoker>>;
-}
diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs
index 31106f0..2a95452 100644
--- a/dubbo/src/registry/mod.rs
+++ b/dubbo/src/registry/mod.rs
@@ -60,20 +60,3 @@
f.write_str("BoxRegistry")
}
}
-
-#[derive(Default)]
-pub struct RegistryWrapper {
- pub registry: Option<Box<dyn Registry>>,
-}
-
-impl Clone for RegistryWrapper {
- fn clone(&self) -> Self {
- Self { registry: None }
- }
-}
-
-impl Debug for RegistryWrapper {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("RegistryWrapper").finish()
- }
-}
diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs
index 29957a6..06ecd62 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -15,9 +15,12 @@
* limitations under the License.
*/
+use std::sync::Arc;
+
use crate::{
- cluster::{directory::StaticDirectory, Cluster, MockCluster, MockDirectory},
- codegen::{ClusterInvoker, Directory, RegistryDirectory, TripleInvoker},
+ cluster::{directory::StaticDirectory, Cluster, Directory, MockCluster, MockDirectory},
+ codegen::{RegistryDirectory, RpcInvocation, TripleInvoker},
+ protocol::BoxInvoker,
triple::compression::CompressionEncoding,
utils::boxed_clone::BoxCloneService,
};
@@ -35,7 +38,6 @@
pub timeout: Option<u64>,
pub connector: &'static str,
directory: Option<Box<dyn Directory>>,
- cluster_invoker: Option<ClusterInvoker>,
pub direct: bool,
host: String,
}
@@ -46,7 +48,6 @@
timeout: None,
connector: "",
directory: None,
- cluster_invoker: None,
direct: false,
host: "".to_string(),
}
@@ -57,7 +58,6 @@
timeout: None,
connector: "",
directory: Some(Box::new(StaticDirectory::new(&host))),
- cluster_invoker: None,
direct: true,
host: host.clone().to_string(),
}
@@ -74,15 +74,13 @@
pub fn with_directory(self, directory: Box<dyn Directory>) -> Self {
Self {
directory: Some(directory),
- cluster_invoker: None,
..self
}
}
pub fn with_registry_directory(self, registry: RegistryDirectory) -> Self {
Self {
- directory: None,
- cluster_invoker: Some(ClusterInvoker::with_directory(registry)),
+ directory: Some(Box::new(registry)),
..self
}
}
@@ -97,7 +95,6 @@
pub fn with_connector(self, connector: &'static str) -> Self {
Self {
connector: connector,
- cluster_invoker: None,
..self
}
}
@@ -106,25 +103,31 @@
Self { direct, ..self }
}
- pub fn build(self) -> TripleClient {
+ pub(crate) fn direct_build(self) -> TripleClient {
let mut cli = TripleClient {
send_compression_encoding: Some(CompressionEncoding::Gzip),
- directory: self.directory,
- cluster_invoker: self.cluster_invoker,
+ builder: Some(self.clone()),
invoker: None,
};
+ cli.invoker = Some(Box::new(TripleInvoker::new(
+ Url::from_url(&self.host).unwrap(),
+ )));
+ return cli;
+ }
+
+ pub fn build(self, invocation: Arc<RpcInvocation>) -> Option<BoxInvoker> {
if self.direct {
- cli.invoker = Some(Box::new(TripleInvoker::new(
+ return Some(Box::new(TripleInvoker::new(
Url::from_url(&self.host).unwrap(),
)));
- return cli;
}
+ let invokers = match self.directory {
+ Some(v) => v.list(invocation),
+ None => panic!("use direct connection"),
+ };
- let cluster = MockCluster::default().join(Box::new(MockDirectory::new(vec![Box::new(
- TripleInvoker::new(Url::from_url("http://127.0.0.1:8888").unwrap()),
- )])));
+ let cluster = MockCluster::default().join(Box::new(MockDirectory::new(invokers)));
- cli.invoker = Some(cluster);
- cli
+ return Some(cluster);
}
}
diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs
index eb7934d..124cfcf 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -15,20 +15,17 @@
* limitations under the License.
*/
-use std::{str::FromStr, sync::Arc};
+use std::str::FromStr;
use futures_util::{future, stream, StreamExt, TryStreamExt};
use aws_smithy_http::body::SdkBody;
use http::HeaderValue;
-use rand::prelude::SliceRandom;
-use tower_service::Service;
-use super::{super::transport::connection::Connection, builder::ClientBuilder};
-use crate::codegen::{ClusterInvoker, Directory, RpcInvocation};
+use super::builder::ClientBuilder;
+use crate::codegen::RpcInvocation;
use crate::{
- cluster::support::cluster_invoker::ClusterRequestBuilder,
invocation::{IntoStreamingRequest, Metadata, Request, Response},
protocol::BoxInvoker,
triple::{codec::Codec, compression::CompressionEncoding, decode::Decoding, encode::encode},
@@ -37,8 +34,7 @@
#[derive(Debug, Clone, Default)]
pub struct TripleClient {
pub(crate) send_compression_encoding: Option<CompressionEncoding>,
- pub(crate) directory: Option<Box<dyn Directory>>,
- pub(crate) cluster_invoker: Option<ClusterInvoker>,
+ pub(crate) builder: Option<ClientBuilder>,
pub invoker: Option<BoxInvoker>,
}
@@ -46,17 +42,14 @@
pub fn connect(host: String) -> Self {
let builder = ClientBuilder::from_static(&host).with_direct(true);
- builder.build()
+ builder.direct_build()
}
pub fn new(builder: ClientBuilder) -> Self {
- builder.build()
- }
-
- pub fn with_cluster(self, invoker: ClusterInvoker) -> Self {
TripleClient {
- cluster_invoker: Some(invoker),
- ..self
+ send_compression_encoding: Some(CompressionEncoding::Gzip),
+ builder: Some(builder),
+ invoker: None,
}
}
@@ -137,7 +130,7 @@
req: Request<M1>,
mut codec: C,
path: http::uri::PathAndQuery,
- _invocation: RpcInvocation,
+ invocation: RpcInvocation,
) -> Result<Response<M2>, crate::status::Status>
where
C: Codec<Encode = M1, Decode = M2>,
@@ -155,8 +148,16 @@
let bytes = hyper::body::to_bytes(body).await.unwrap();
let sdk_body = SdkBody::from(bytes);
- // let mut conn = Connection::new().with_host(http_uri);
- let mut conn = self.invoker.clone().unwrap();
+ let mut conn = match self.invoker.clone() {
+ Some(v) => v,
+ None => self
+ .builder
+ .clone()
+ .unwrap()
+ .build(invocation.into())
+ .unwrap(),
+ };
+
let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
let req = self.map_request(http_uri.clone(), path, sdk_body);
@@ -214,25 +215,20 @@
.into_stream();
let body = hyper::Body::wrap_stream(en);
let sdk_body = SdkBody::from(body);
- let arc_invocation = Arc::new(invocation);
- let req;
- let http_uri;
- if self.cluster_invoker.is_some() {
- let cluster_invoker = self.cluster_invoker.as_ref().unwrap().clone();
- req = cluster_invoker.build_req(self, path, arc_invocation.clone(), sdk_body);
- http_uri = req.uri().clone();
- } else {
- let url_list = self
- .directory
- .as_ref()
- .expect("msg")
- .list(arc_invocation.clone());
- let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
- http_uri =
- http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
- req = self.map_request(http_uri.clone(), path, sdk_body);
- }
- let mut conn = Connection::new().with_host(http_uri);
+
+ let mut conn = match self.invoker.clone() {
+ Some(v) => v,
+ None => self
+ .builder
+ .clone()
+ .unwrap()
+ .build(invocation.into())
+ .unwrap(),
+ };
+
+ let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
+ let req = self.map_request(http_uri.clone(), path, sdk_body);
+
let response = conn
.call(req)
.await
@@ -271,25 +267,21 @@
.into_stream();
let body = hyper::Body::wrap_stream(en);
let sdk_body = SdkBody::from(body);
- let arc_invocation = Arc::new(invocation);
- let req;
- let http_uri;
- if self.cluster_invoker.is_some() {
- let cluster_invoker = self.cluster_invoker.as_ref().unwrap().clone();
- req = cluster_invoker.build_req(self, path, arc_invocation.clone(), sdk_body);
- http_uri = req.uri().clone();
- } else {
- let url_list = self
- .directory
- .as_ref()
- .expect("msg")
- .list(arc_invocation.clone());
- let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
- http_uri =
- http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
- req = self.map_request(http_uri.clone(), path, sdk_body);
- }
- let mut conn = Connection::new().with_host(http_uri);
+
+ let mut conn = match self.invoker.clone() {
+ Some(v) => v,
+ None => self
+ .builder
+ .clone()
+ .unwrap()
+ .build(invocation.into())
+ .unwrap(),
+ };
+
+ let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
+ let req = self.map_request(http_uri.clone(), path, sdk_body);
+
+ // let mut conn = Connection::new().with_host(http_uri);
let response = conn
.call(req)
.await
@@ -344,26 +336,19 @@
.into_stream();
let body = hyper::Body::wrap_stream(en);
let sdk_body = SdkBody::from(body);
- let arc_invocation = Arc::new(invocation);
- let req;
- let http_uri;
- if self.cluster_invoker.is_some() {
- let cluster_invoker = self.cluster_invoker.as_ref().unwrap().clone();
- req = cluster_invoker.build_req(self, path, arc_invocation.clone(), sdk_body);
- http_uri = req.uri().clone();
- } else {
- let url_list = self
- .directory
- .as_ref()
- .expect("msg")
- .list(arc_invocation.clone());
- let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
- http_uri =
- http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
- req = self.map_request(http_uri.clone(), path, sdk_body);
- }
- let mut conn = Connection::new().with_host(http_uri);
+ let mut conn = match self.invoker.clone() {
+ Some(v) => v,
+ None => self
+ .builder
+ .clone()
+ .unwrap()
+ .build(invocation.into())
+ .unwrap(),
+ };
+ let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
+ let req = self.map_request(http_uri.clone(), path, sdk_body);
+
let response = conn
.call(req)
.await
diff --git a/examples/echo/Cargo.toml b/examples/echo/Cargo.toml
index 319c17b..bc6638b 100644
--- a/examples/echo/Cargo.toml
+++ b/examples/echo/Cargo.toml
@@ -30,9 +30,7 @@
tokio-stream = "0.1"
dubbo-logger.workspace=true
-hyper = { version = "0.14.19", features = ["full"]}
-
-dubbo = {path = "../../dubbo", version = "0.3.0" }
+dubbo = {path = "../../dubbo"}
dubbo-config = {path = "../../config", version = "0.3.0" }
registry-zookeeper.workspace=true
diff --git a/examples/echo/src/echo/client.rs b/examples/echo/src/echo/client.rs
index db46958..0a2f150 100644
--- a/examples/echo/src/echo/client.rs
+++ b/examples/echo/src/echo/client.rs
@@ -34,7 +34,9 @@
// let builder = ClientBuilder::new()
// .with_connector("unix")
// .with_host("unix://127.0.0.1:8888");
- let builder = ClientBuilder::from_static(&"http://127.0.0.1:8888").with_timeout(1000000);
+ let builder = ClientBuilder::from_static(&"http://127.0.0.1:8888")
+ .with_timeout(1000000)
+ .with_direct(true);
let mut cli = EchoClient::new(builder);
// let mut unary_cli = cli.clone().with_filter(FakeFilter {});
// let mut cli = EchoClient::build(ClientBuilder::from_static("http://127.0.0.1:8888"));
diff --git a/examples/greeter/Cargo.toml b/examples/greeter/Cargo.toml
index 6ad3b1b..d68cab7 100644
--- a/examples/greeter/Cargo.toml
+++ b/examples/greeter/Cargo.toml
@@ -29,7 +29,7 @@
async-trait = "0.1.56"
tokio-stream = "0.1"
dubbo-logger = { path = "../../common/logger" }
-dubbo = { path = "../../dubbo", version = "0.3.0" }
+dubbo = { path = "../../dubbo"}
dubbo-config = { path = "../../config", version = "0.3.0" }
registry-zookeeper.workspace = true
registry-nacos.workspace = true
diff --git a/registry/zookeeper/src/lib.rs b/registry/zookeeper/src/lib.rs
index 5debc0a..e8c2c5c 100644
--- a/registry/zookeeper/src/lib.rs
+++ b/registry/zookeeper/src/lib.rs
@@ -34,11 +34,9 @@
use zookeeper::{Acl, CreateMode, WatchedEvent, WatchedEventType, Watcher, ZooKeeper};
use dubbo::{
- cluster::support::cluster_invoker::ClusterInvoker,
- codegen::BoxRegistry,
registry::{
- integration::ClusterRegistryIntegration, memory_registry::MemoryRegistry, NotifyListener,
- Registry, RegistryNotifyListener, ServiceEvent,
+ memory_registry::MemoryRegistry, NotifyListener, Registry, RegistryNotifyListener,
+ ServiceEvent,
},
StdError,
};
@@ -371,12 +369,6 @@
}
}
-impl ClusterRegistryIntegration for ZookeeperRegistry {
- fn get_invoker(registry: BoxRegistry) -> Option<Arc<ClusterInvoker>> {
- todo!()
- }
-}
-
#[cfg(test)]
mod tests {
use std::sync::Arc;