Rft: add Cluster to ClientBuilder (#142)

comment some codes

* refactor(triple): rm unused var in clientBuilder

* refactor(dubbo): delete some codes

* refactor(cluster): rm some duplicate codes

* refactor(registry): rm unused import

* refactor(triple): use two build func for different usage

* style: cargo fmt --all

* refactor(cluster): rm registryWrapper

* refactor(cluster): delete print

* chore(dubbo): upgrade hyper version in cargo.toml

* refactor(cluster): comment the logic of clone body
diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs
index 1e1c9fb..bfcfe45 100644
--- a/dubbo-build/src/client.rs
+++ b/dubbo-build/src/client.rs
@@ -90,11 +90,6 @@
                     }
                 }
 
-                pub fn with_cluster(mut self, invoker: ClusterInvoker) -> Self {
-                    self.inner = self.inner.with_cluster(invoker);
-                    self
-                }
-
                 #methods
 
             }
diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index 91b19d6..d0ab2a4 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -10,7 +10,7 @@
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-hyper = { version = "0.14.19", features = ["full"] }
+hyper = { version = "0.14.26", features = ["full"] }
 http = "0.2"
 tower-service.workspace = true
 http-body = "0.4.4"
@@ -33,7 +33,7 @@
 axum = "0.5.9"
 async-stream = "0.3"
 flate2 = "1.0"
-aws-smithy-http = "0.54.1"
+aws-smithy-http = "0.55.2"
 dyn-clone = "1.0.11"
 itertools.workspace = true
 urlencoding.workspace = true
diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs
index e74fc6d..afe9657 100644
--- a/dubbo/src/cluster/directory.rs
+++ b/dubbo/src/cluster/directory.rs
@@ -23,39 +23,21 @@
 };
 
 use crate::{
+    codegen::TripleInvoker,
     invocation::{Invocation, RpcInvocation},
-    registry::{memory_registry::MemoryNotifyListener, BoxRegistry, RegistryWrapper},
+    protocol::BoxInvoker,
+    registry::{memory_registry::MemoryNotifyListener, BoxRegistry},
 };
 use dubbo_base::Url;
 use dubbo_logger::tracing;
 
+use crate::cluster::Directory;
+
 /// Directory.
 ///
 /// [Directory Service](http://en.wikipedia.org/wiki/Directory_service)
-pub trait Directory: Debug + DirectoryClone {
-    fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<Url>;
-}
 
-pub trait DirectoryClone {
-    fn clone_box(&self) -> Box<dyn Directory>;
-}
-
-impl<T> DirectoryClone for T
-where
-    T: 'static + Directory + Clone,
-{
-    fn clone_box(&self) -> Box<dyn Directory> {
-        Box::new(self.clone())
-    }
-}
-
-impl Clone for Box<dyn Directory> {
-    fn clone(&self) -> Box<dyn Directory> {
-        self.clone_box()
-    }
-}
-
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct StaticDirectory {
     uri: http::Uri,
 }
@@ -78,7 +60,7 @@
 }
 
 impl Directory for StaticDirectory {
-    fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<Url> {
+    fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
         let url = Url::from_url(&format!(
             "tri://{}:{}/{}",
             self.uri.host().unwrap(),
@@ -86,43 +68,28 @@
             invocation.get_target_service_unique_name(),
         ))
         .unwrap();
-        vec![url]
+        let invoker = Box::new(TripleInvoker::new(url));
+        vec![invoker]
     }
 }
 
-impl DirectoryClone for StaticDirectory {
-    fn clone_box(&self) -> Box<dyn Directory> {
-        Box::new(StaticDirectory {
-            uri: self.uri.clone(),
-        })
-    }
-}
-
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct RegistryDirectory {
-    registry: RegistryWrapper,
+    registry: Arc<BoxRegistry>,
     service_instances: Arc<RwLock<HashMap<String, Vec<Url>>>>,
 }
 
 impl RegistryDirectory {
     pub fn new(registry: BoxRegistry) -> RegistryDirectory {
         RegistryDirectory {
-            registry: RegistryWrapper {
-                registry: Some(registry),
-            },
+            registry: Arc::new(registry),
             service_instances: Arc::new(RwLock::new(HashMap::new())),
         }
     }
 }
 
-impl DirectoryClone for RegistryDirectory {
-    fn clone_box(&self) -> Box<dyn Directory> {
-        todo!()
-    }
-}
-
 impl Directory for RegistryDirectory {
-    fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<Url> {
+    fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
         let service_name = invocation.get_target_service_unique_name();
 
         let url = Url::from_url(&format!(
@@ -132,9 +99,6 @@
         .unwrap();
 
         self.registry
-            .registry
-            .as_ref()
-            .expect("msg")
             .subscribe(
                 url,
                 Arc::new(MemoryNotifyListener {
@@ -149,6 +113,11 @@
             .expect("service_instances.read");
         let binding = Vec::new();
         let url_vec = map.get(&service_name).unwrap_or(&binding);
-        url_vec.to_vec()
+        // url_vec.to_vec()
+        let mut invokers: Vec<BoxInvoker> = vec![];
+        for item in url_vec.iter() {
+            invokers.push(Box::new(TripleInvoker::new(item.clone())));
+        }
+        invokers
     }
 }
diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs
index 4f73d2f..d1f96f9 100644
--- a/dubbo/src/cluster/mod.rs
+++ b/dubbo/src/cluster/mod.rs
@@ -19,6 +19,7 @@
 
 use aws_smithy_http::body::SdkBody;
 use dubbo_base::Url;
+use dyn_clone::DynClone;
 
 use crate::{
     empty_body,
@@ -28,13 +29,14 @@
 
 pub mod directory;
 pub mod loadbalance;
-pub mod support;
 
-pub trait Directory: Debug {
+pub trait Directory: Debug + DynClone {
     fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker>;
-    fn is_empty(&self) -> bool;
+    // fn is_empty(&self) -> bool;
 }
 
+dyn_clone::clone_trait_object!(Directory);
+
 type BoxDirectory = Box<dyn Directory + Send + Sync>;
 
 pub trait Cluster {
@@ -76,13 +78,12 @@
     }
 
     fn call(&mut self, req: http::Request<SdkBody>) -> Self::Future {
-        println!("req: {}", req.body().content_length().unwrap());
-        let clone_body = req.body().try_clone().unwrap();
-        let mut clone_req = http::Request::builder()
-            .uri(req.uri().clone())
-            .method(req.method().clone());
-        *clone_req.headers_mut().unwrap() = req.headers().clone();
-        let r = clone_req.body(clone_body).unwrap();
+        // let clone_body = req.body().try_clone().unwrap();
+        // let mut clone_req = http::Request::builder()
+        //     .uri(req.uri().clone())
+        //     .method(req.method().clone());
+        // *clone_req.headers_mut().unwrap() = req.headers().clone();
+        // let r = clone_req.body(clone_body).unwrap();
         let invokers = self.dir.list(
             RpcInvocation::default()
                 .with_service_unique_name("hello".to_string())
@@ -90,7 +91,7 @@
         );
         for mut invoker in invokers {
             let fut = async move {
-                let res = invoker.call(r).await;
+                let res = invoker.call(req).await;
                 return res;
             };
             return Box::pin(fut);
@@ -110,7 +111,7 @@
     }
 }
 
-#[derive(Debug, Default)]
+#[derive(Debug, Default, Clone)]
 pub struct MockDirectory {
     // router_chain: RouterChain,
     invokers: Vec<BoxInvoker>,
@@ -134,9 +135,9 @@
         self.invokers.clone()
     }
 
-    fn is_empty(&self) -> bool {
-        false
-    }
+    // fn is_empty(&self) -> bool {
+    //     false
+    // }
 }
 
 #[derive(Debug, Default)]
diff --git a/dubbo/src/cluster/support/cluster_invoker.rs b/dubbo/src/cluster/support/cluster_invoker.rs
deleted file mode 100644
index 0ccca48..0000000
--- a/dubbo/src/cluster/support/cluster_invoker.rs
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-use aws_smithy_http::body::SdkBody;
-use std::{str::FromStr, sync::Arc};
-
-use dubbo_base::Url;
-use http::{uri::PathAndQuery, Request};
-
-use crate::{
-    cluster::{
-        loadbalance::{types::BoxLoadBalance, LOAD_BALANCE_EXTENSIONS},
-        support::DEFAULT_LOADBALANCE,
-    },
-    codegen::{Directory, RegistryDirectory, TripleClient},
-    invocation::RpcInvocation,
-};
-
-#[derive(Debug, Clone)]
-pub struct ClusterInvoker {
-    directory: Arc<RegistryDirectory>,
-    destroyed: bool,
-}
-
-pub trait ClusterInvokerSelector {
-    /// Select a invoker using loadbalance policy.
-    fn select(
-        &self,
-        invocation: Arc<RpcInvocation>,
-        invokers: Arc<Vec<Url>>,
-        excluded: Arc<Vec<Url>>,
-    ) -> Option<Url>;
-
-    fn do_select(
-        &self,
-        loadbalance_key: Option<&str>,
-        invocation: Arc<RpcInvocation>,
-        invokers: Arc<Vec<Url>>,
-    ) -> Option<Url>;
-}
-
-pub trait ClusterRequestBuilder {
-    fn build_req(
-        &self,
-        triple_client: &mut TripleClient,
-        path: http::uri::PathAndQuery,
-        invocation: Arc<RpcInvocation>,
-        body: SdkBody,
-    ) -> http::Request<SdkBody>;
-}
-
-impl ClusterInvoker {
-    pub fn with_directory(registry_directory: RegistryDirectory) -> Self {
-        ClusterInvoker {
-            directory: Arc::new(registry_directory),
-            destroyed: false,
-        }
-    }
-
-    pub fn directory(&self) -> Arc<RegistryDirectory> {
-        self.directory.clone()
-    }
-
-    pub fn init_loadbalance(&self, loadbalance_key: &str) -> &BoxLoadBalance {
-        if LOAD_BALANCE_EXTENSIONS.contains_key(loadbalance_key) {
-            LOAD_BALANCE_EXTENSIONS.get(loadbalance_key).unwrap()
-        } else {
-            println!(
-                "loadbalance {} not found, use default loadbalance {}",
-                loadbalance_key, DEFAULT_LOADBALANCE
-            );
-            LOAD_BALANCE_EXTENSIONS.get(DEFAULT_LOADBALANCE).unwrap()
-        }
-    }
-
-    pub fn is_available(&self, invocation: Arc<RpcInvocation>) -> bool {
-        !self.destroyed() && !self.directory.list(invocation).is_empty()
-    }
-
-    pub fn destroyed(&self) -> bool {
-        self.destroyed
-    }
-}
-
-impl ClusterInvokerSelector for ClusterInvoker {
-    fn select(
-        &self,
-        invocation: Arc<RpcInvocation>,
-        invokers: Arc<Vec<Url>>,
-        _excluded: Arc<Vec<Url>>,
-    ) -> Option<Url> {
-        if invokers.is_empty() {
-            return None;
-        }
-        let instance_count = invokers.len();
-        return if instance_count == 1 {
-            Some(invokers.as_ref().first()?.clone())
-        } else {
-            let loadbalance = Some(DEFAULT_LOADBALANCE);
-            self.do_select(loadbalance, invocation, invokers)
-        };
-    }
-
-    /// picking instance invoker url from registry directory
-    fn do_select(
-        &self,
-        loadbalance_key: Option<&str>,
-        invocation: Arc<RpcInvocation>,
-        invokers: Arc<Vec<Url>>,
-    ) -> Option<Url> {
-        let loadbalance = self.init_loadbalance(loadbalance_key.unwrap_or(DEFAULT_LOADBALANCE));
-        loadbalance.select(invokers, None, invocation)
-    }
-}
-
-impl ClusterRequestBuilder for ClusterInvoker {
-    fn build_req(
-        &self,
-        triple_client: &mut TripleClient,
-        path: PathAndQuery,
-        invocation: Arc<RpcInvocation>,
-        body: SdkBody,
-    ) -> Request<SdkBody> {
-        let invokers = self.directory.list(invocation.clone());
-        let invoker_url = self
-            .select(invocation, Arc::new(invokers), Arc::new(Vec::new()))
-            .expect("no valid provider");
-        let http_uri =
-            http::Uri::from_str(&format!("http://{}:{}/", invoker_url.ip, invoker_url.port))
-                .unwrap();
-        triple_client.map_request(http_uri, path, body)
-    }
-}
diff --git a/dubbo/src/cluster/support/mod.rs b/dubbo/src/cluster/support/mod.rs
deleted file mode 100644
index ae42cc2..0000000
--- a/dubbo/src/cluster/support/mod.rs
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-pub mod cluster_invoker;
-
-pub const DEFAULT_LOADBALANCE: &str = "random";
diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs
index 98d784f..412feb9 100644
--- a/dubbo/src/codegen.rs
+++ b/dubbo/src/codegen.rs
@@ -27,14 +27,11 @@
 pub use tower_service::Service;
 
 pub use super::{
-    cluster::{
-        directory::{Directory, RegistryDirectory},
-        support::cluster_invoker::ClusterInvoker,
-    },
+    cluster::directory::RegistryDirectory,
     empty_body,
     invocation::{IntoStreamingRequest, Request, Response, RpcInvocation},
     protocol::{triple::triple_invoker::TripleInvoker, Invoker},
-    registry::{BoxRegistry, Registry, RegistryWrapper},
+    registry::{BoxRegistry, Registry},
     triple::{
         client::TripleClient,
         codec::{prost::ProstCodec, Codec},
diff --git a/dubbo/src/registry/integration.rs b/dubbo/src/registry/integration.rs
index 15b82d0..2944f98 100644
--- a/dubbo/src/registry/integration.rs
+++ b/dubbo/src/registry/integration.rs
@@ -14,10 +14,3 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-use crate::{cluster::support::cluster_invoker::ClusterInvoker, registry::BoxRegistry};
-use std::sync::Arc;
-
-pub trait ClusterRegistryIntegration {
-    /// get cluster invoker struct
-    fn get_invoker(registry: BoxRegistry) -> Option<Arc<ClusterInvoker>>;
-}
diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs
index 31106f0..2a95452 100644
--- a/dubbo/src/registry/mod.rs
+++ b/dubbo/src/registry/mod.rs
@@ -60,20 +60,3 @@
         f.write_str("BoxRegistry")
     }
 }
-
-#[derive(Default)]
-pub struct RegistryWrapper {
-    pub registry: Option<Box<dyn Registry>>,
-}
-
-impl Clone for RegistryWrapper {
-    fn clone(&self) -> Self {
-        Self { registry: None }
-    }
-}
-
-impl Debug for RegistryWrapper {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("RegistryWrapper").finish()
-    }
-}
diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs
index 29957a6..06ecd62 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -15,9 +15,12 @@
  * limitations under the License.
  */
 
+use std::sync::Arc;
+
 use crate::{
-    cluster::{directory::StaticDirectory, Cluster, MockCluster, MockDirectory},
-    codegen::{ClusterInvoker, Directory, RegistryDirectory, TripleInvoker},
+    cluster::{directory::StaticDirectory, Cluster, Directory, MockCluster, MockDirectory},
+    codegen::{RegistryDirectory, RpcInvocation, TripleInvoker},
+    protocol::BoxInvoker,
     triple::compression::CompressionEncoding,
     utils::boxed_clone::BoxCloneService,
 };
@@ -35,7 +38,6 @@
     pub timeout: Option<u64>,
     pub connector: &'static str,
     directory: Option<Box<dyn Directory>>,
-    cluster_invoker: Option<ClusterInvoker>,
     pub direct: bool,
     host: String,
 }
@@ -46,7 +48,6 @@
             timeout: None,
             connector: "",
             directory: None,
-            cluster_invoker: None,
             direct: false,
             host: "".to_string(),
         }
@@ -57,7 +58,6 @@
             timeout: None,
             connector: "",
             directory: Some(Box::new(StaticDirectory::new(&host))),
-            cluster_invoker: None,
             direct: true,
             host: host.clone().to_string(),
         }
@@ -74,15 +74,13 @@
     pub fn with_directory(self, directory: Box<dyn Directory>) -> Self {
         Self {
             directory: Some(directory),
-            cluster_invoker: None,
             ..self
         }
     }
 
     pub fn with_registry_directory(self, registry: RegistryDirectory) -> Self {
         Self {
-            directory: None,
-            cluster_invoker: Some(ClusterInvoker::with_directory(registry)),
+            directory: Some(Box::new(registry)),
             ..self
         }
     }
@@ -97,7 +95,6 @@
     pub fn with_connector(self, connector: &'static str) -> Self {
         Self {
             connector: connector,
-            cluster_invoker: None,
             ..self
         }
     }
@@ -106,25 +103,31 @@
         Self { direct, ..self }
     }
 
-    pub fn build(self) -> TripleClient {
+    pub(crate) fn direct_build(self) -> TripleClient {
         let mut cli = TripleClient {
             send_compression_encoding: Some(CompressionEncoding::Gzip),
-            directory: self.directory,
-            cluster_invoker: self.cluster_invoker,
+            builder: Some(self.clone()),
             invoker: None,
         };
+        cli.invoker = Some(Box::new(TripleInvoker::new(
+            Url::from_url(&self.host).unwrap(),
+        )));
+        return cli;
+    }
+
+    pub fn build(self, invocation: Arc<RpcInvocation>) -> Option<BoxInvoker> {
         if self.direct {
-            cli.invoker = Some(Box::new(TripleInvoker::new(
+            return Some(Box::new(TripleInvoker::new(
                 Url::from_url(&self.host).unwrap(),
             )));
-            return cli;
         }
+        let invokers = match self.directory {
+            Some(v) => v.list(invocation),
+            None => panic!("use direct connection"),
+        };
 
-        let cluster = MockCluster::default().join(Box::new(MockDirectory::new(vec![Box::new(
-            TripleInvoker::new(Url::from_url("http://127.0.0.1:8888").unwrap()),
-        )])));
+        let cluster = MockCluster::default().join(Box::new(MockDirectory::new(invokers)));
 
-        cli.invoker = Some(cluster);
-        cli
+        return Some(cluster);
     }
 }
diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs
index eb7934d..124cfcf 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -15,20 +15,17 @@
  * limitations under the License.
  */
 
-use std::{str::FromStr, sync::Arc};
+use std::str::FromStr;
 
 use futures_util::{future, stream, StreamExt, TryStreamExt};
 
 use aws_smithy_http::body::SdkBody;
 use http::HeaderValue;
-use rand::prelude::SliceRandom;
-use tower_service::Service;
 
-use super::{super::transport::connection::Connection, builder::ClientBuilder};
-use crate::codegen::{ClusterInvoker, Directory, RpcInvocation};
+use super::builder::ClientBuilder;
+use crate::codegen::RpcInvocation;
 
 use crate::{
-    cluster::support::cluster_invoker::ClusterRequestBuilder,
     invocation::{IntoStreamingRequest, Metadata, Request, Response},
     protocol::BoxInvoker,
     triple::{codec::Codec, compression::CompressionEncoding, decode::Decoding, encode::encode},
@@ -37,8 +34,7 @@
 #[derive(Debug, Clone, Default)]
 pub struct TripleClient {
     pub(crate) send_compression_encoding: Option<CompressionEncoding>,
-    pub(crate) directory: Option<Box<dyn Directory>>,
-    pub(crate) cluster_invoker: Option<ClusterInvoker>,
+    pub(crate) builder: Option<ClientBuilder>,
     pub invoker: Option<BoxInvoker>,
 }
 
@@ -46,17 +42,14 @@
     pub fn connect(host: String) -> Self {
         let builder = ClientBuilder::from_static(&host).with_direct(true);
 
-        builder.build()
+        builder.direct_build()
     }
 
     pub fn new(builder: ClientBuilder) -> Self {
-        builder.build()
-    }
-
-    pub fn with_cluster(self, invoker: ClusterInvoker) -> Self {
         TripleClient {
-            cluster_invoker: Some(invoker),
-            ..self
+            send_compression_encoding: Some(CompressionEncoding::Gzip),
+            builder: Some(builder),
+            invoker: None,
         }
     }
 
@@ -137,7 +130,7 @@
         req: Request<M1>,
         mut codec: C,
         path: http::uri::PathAndQuery,
-        _invocation: RpcInvocation,
+        invocation: RpcInvocation,
     ) -> Result<Response<M2>, crate::status::Status>
     where
         C: Codec<Encode = M1, Decode = M2>,
@@ -155,8 +148,16 @@
         let bytes = hyper::body::to_bytes(body).await.unwrap();
         let sdk_body = SdkBody::from(bytes);
 
-        // let mut conn = Connection::new().with_host(http_uri);
-        let mut conn = self.invoker.clone().unwrap();
+        let mut conn = match self.invoker.clone() {
+            Some(v) => v,
+            None => self
+                .builder
+                .clone()
+                .unwrap()
+                .build(invocation.into())
+                .unwrap(),
+        };
+
         let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
         let req = self.map_request(http_uri.clone(), path, sdk_body);
 
@@ -214,25 +215,20 @@
         .into_stream();
         let body = hyper::Body::wrap_stream(en);
         let sdk_body = SdkBody::from(body);
-        let arc_invocation = Arc::new(invocation);
-        let req;
-        let http_uri;
-        if self.cluster_invoker.is_some() {
-            let cluster_invoker = self.cluster_invoker.as_ref().unwrap().clone();
-            req = cluster_invoker.build_req(self, path, arc_invocation.clone(), sdk_body);
-            http_uri = req.uri().clone();
-        } else {
-            let url_list = self
-                .directory
-                .as_ref()
-                .expect("msg")
-                .list(arc_invocation.clone());
-            let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
-            http_uri =
-                http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
-            req = self.map_request(http_uri.clone(), path, sdk_body);
-        }
-        let mut conn = Connection::new().with_host(http_uri);
+
+        let mut conn = match self.invoker.clone() {
+            Some(v) => v,
+            None => self
+                .builder
+                .clone()
+                .unwrap()
+                .build(invocation.into())
+                .unwrap(),
+        };
+
+        let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
+        let req = self.map_request(http_uri.clone(), path, sdk_body);
+
         let response = conn
             .call(req)
             .await
@@ -271,25 +267,21 @@
         .into_stream();
         let body = hyper::Body::wrap_stream(en);
         let sdk_body = SdkBody::from(body);
-        let arc_invocation = Arc::new(invocation);
-        let req;
-        let http_uri;
-        if self.cluster_invoker.is_some() {
-            let cluster_invoker = self.cluster_invoker.as_ref().unwrap().clone();
-            req = cluster_invoker.build_req(self, path, arc_invocation.clone(), sdk_body);
-            http_uri = req.uri().clone();
-        } else {
-            let url_list = self
-                .directory
-                .as_ref()
-                .expect("msg")
-                .list(arc_invocation.clone());
-            let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
-            http_uri =
-                http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
-            req = self.map_request(http_uri.clone(), path, sdk_body);
-        }
-        let mut conn = Connection::new().with_host(http_uri);
+
+        let mut conn = match self.invoker.clone() {
+            Some(v) => v,
+            None => self
+                .builder
+                .clone()
+                .unwrap()
+                .build(invocation.into())
+                .unwrap(),
+        };
+
+        let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
+        let req = self.map_request(http_uri.clone(), path, sdk_body);
+
+        // let mut conn = Connection::new().with_host(http_uri);
         let response = conn
             .call(req)
             .await
@@ -344,26 +336,19 @@
         .into_stream();
         let body = hyper::Body::wrap_stream(en);
         let sdk_body = SdkBody::from(body);
-        let arc_invocation = Arc::new(invocation);
-        let req;
-        let http_uri;
-        if self.cluster_invoker.is_some() {
-            let cluster_invoker = self.cluster_invoker.as_ref().unwrap().clone();
-            req = cluster_invoker.build_req(self, path, arc_invocation.clone(), sdk_body);
-            http_uri = req.uri().clone();
-        } else {
-            let url_list = self
-                .directory
-                .as_ref()
-                .expect("msg")
-                .list(arc_invocation.clone());
-            let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
-            http_uri =
-                http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
-            req = self.map_request(http_uri.clone(), path, sdk_body);
-        }
 
-        let mut conn = Connection::new().with_host(http_uri);
+        let mut conn = match self.invoker.clone() {
+            Some(v) => v,
+            None => self
+                .builder
+                .clone()
+                .unwrap()
+                .build(invocation.into())
+                .unwrap(),
+        };
+        let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
+        let req = self.map_request(http_uri.clone(), path, sdk_body);
+
         let response = conn
             .call(req)
             .await
diff --git a/examples/echo/Cargo.toml b/examples/echo/Cargo.toml
index 319c17b..bc6638b 100644
--- a/examples/echo/Cargo.toml
+++ b/examples/echo/Cargo.toml
@@ -30,9 +30,7 @@
 tokio-stream = "0.1"
 dubbo-logger.workspace=true
 
-hyper = { version = "0.14.19", features = ["full"]}
-
-dubbo = {path = "../../dubbo", version = "0.3.0" }
+dubbo = {path = "../../dubbo"}
 dubbo-config = {path = "../../config", version = "0.3.0" }
 registry-zookeeper.workspace=true
 
diff --git a/examples/echo/src/echo/client.rs b/examples/echo/src/echo/client.rs
index db46958..0a2f150 100644
--- a/examples/echo/src/echo/client.rs
+++ b/examples/echo/src/echo/client.rs
@@ -34,7 +34,9 @@
     // let builder = ClientBuilder::new()
     //     .with_connector("unix")
     //     .with_host("unix://127.0.0.1:8888");
-    let builder = ClientBuilder::from_static(&"http://127.0.0.1:8888").with_timeout(1000000);
+    let builder = ClientBuilder::from_static(&"http://127.0.0.1:8888")
+        .with_timeout(1000000)
+        .with_direct(true);
     let mut cli = EchoClient::new(builder);
     // let mut unary_cli = cli.clone().with_filter(FakeFilter {});
     // let mut cli = EchoClient::build(ClientBuilder::from_static("http://127.0.0.1:8888"));
diff --git a/examples/greeter/Cargo.toml b/examples/greeter/Cargo.toml
index 6ad3b1b..d68cab7 100644
--- a/examples/greeter/Cargo.toml
+++ b/examples/greeter/Cargo.toml
@@ -29,7 +29,7 @@
 async-trait = "0.1.56"
 tokio-stream = "0.1"
 dubbo-logger = { path = "../../common/logger" }
-dubbo = { path = "../../dubbo", version = "0.3.0" }
+dubbo = { path = "../../dubbo"}
 dubbo-config = { path = "../../config", version = "0.3.0" }
 registry-zookeeper.workspace = true
 registry-nacos.workspace = true
diff --git a/registry/zookeeper/src/lib.rs b/registry/zookeeper/src/lib.rs
index 5debc0a..e8c2c5c 100644
--- a/registry/zookeeper/src/lib.rs
+++ b/registry/zookeeper/src/lib.rs
@@ -34,11 +34,9 @@
 use zookeeper::{Acl, CreateMode, WatchedEvent, WatchedEventType, Watcher, ZooKeeper};
 
 use dubbo::{
-    cluster::support::cluster_invoker::ClusterInvoker,
-    codegen::BoxRegistry,
     registry::{
-        integration::ClusterRegistryIntegration, memory_registry::MemoryRegistry, NotifyListener,
-        Registry, RegistryNotifyListener, ServiceEvent,
+        memory_registry::MemoryRegistry, NotifyListener, Registry, RegistryNotifyListener,
+        ServiceEvent,
     },
     StdError,
 };
@@ -371,12 +369,6 @@
     }
 }
 
-impl ClusterRegistryIntegration for ZookeeperRegistry {
-    fn get_invoker(registry: BoxRegistry) -> Option<Arc<ClusterInvoker>> {
-        todo!()
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;