Feat: cluster and extension (#185)

* feat(dubbo): add unix feature

* Rft: replace feature with target_os cfg

* Rft(dubbo): add ClientBuilder for client

* Rftï(dubbo-build): add build api for client

* style(examples): cargo fmt

* Rft: move connection from client to transport mod

* Rft(dubbo): add default timeout for client

* Ftr: add serverBuilder for Server, support multiple ways to start server

* Rft(examples): update yaml

* refactor(dubbo): update invoker trait

* refactor(cluster): add Cluster MockImpl

* refactor(triple): use ClientBuilder to init Cluster ability

* Update builder.rs

update default direct value

* Update triple.rs

handle unused var

* Update mod.rs

comment some codes

* refactor(triple): rm unused var in clientBuilder

* refactor(dubbo): delete some codes

* refactor(cluster): rm some duplicate codes

* refactor(registry): rm unused import

* refactor(triple): use two build func for different usage

* style: cargo fmt --all

* refactor(cluster): rm registryWrapper

* refactor(cluster): delete print

* chore(dubbo): upgrade hyper version in cargo.toml

* refactor(cluster): comment the logic of clone body

* Rft(triple): remove Clone of Invoker

* Rft(cluster): use ready_cache to manage Invokers, add ready_cache in FailoverCluster

* Rft(protocol): use interface Inheritance to redesign Invoker

* Feat(cluster): Cluster Policy Impl (#146)

* refactor(cluster): comment the logic of clone body

* Rft(triple): remove Clone of Invoker

* Rft(cluster): use ready_cache to manage Invokers, add ready_cache in FailoverCluster

* Rft(protocol): use interface Inheritance to redesign Invoker

---------

Co-authored-by: G-XD <38717659+G-XD@users.noreply.github.com>
Co-authored-by: GXD <gexiangdong@highlight.mobi>

* chore(github): rm workflow_dispatch in workflow (#149)

* feat: Add Router Module(#144) (#153)

add condition router,
add tag router,
use nacos as router config center

* Ftr: failover cluster (#156)

* Ftr: add ServiceNameDirectory (#157)

* Ftr: failover cluster

* Ftr: add ServiceNameDirectory

* Feat/cluster Optimized the Router module (#160)

* perf: Optimized the logic of the routing module.

Refactored route logic decision-making, eliminating unnecessary cloning
and improving performance.

* perf: Optimized the logic of the routing module.

Refactored route logic decision-making, eliminating unnecessary cloning
and improving performance.

* perf: Removed unnecessary configurations.

* perf: Removed unnecessary configurations.

* perf: Optimized the Router module

Optimized the Router module
Added Router Chain to MockDirectory

* Refactor: refactor Cluster component (#165)

* Refactor: refactor Cluster component

- add p2c loadbalance component
- add simple router component
- add replay body component
- add failover component
- add service directory compoent

* Enhance: add cache for routers

* Tst: local test passed (#166)

* Tst: local test passed

* Enhance: remove unnecessary key

* Enhance: add BUFFER SIZE const variable

* style(dubbo): cargo fmt --all

* style(dubbo): cargo fix --lib -p dubbo --allow-dirty

* chore(github): update branch in pr

* Mod: format code and fix some warnings (#167)

* style(dubbo): cargo fmt --all

* style(dubbo): cargo fix --lib -p dubbo --allow-dirty

* chore(github): update branch in pr

* Rft: adapt nacos registry and zookeeper registry (#169)

* Rft: adapt nacos registry and zookeeper registry

Close #168

* Rft: adapt static registry

* Rft: cargo fmt

* Ftr: add extension module (#181)

* Ftr: add extension module

- adapt static registry by extension
- adapt nacos  registry by extension

link #180

* cargo fmt all

* fix ci error

* fix nacos image version error

* Rft: re-design extension register

* Fix: cargo fix

* Fix: add some license for every files

- extract UrlParam to single file
- fix github ci error

* Fix: fmt all

* Fix: Add license for extension_param.rs and registry_param.rs

* Fix: rename query_param_by_kv method name

* Fix: get stuck when load extension in the concurrency environment (#184)

* Fix: get stuck when load extension in the concurrency environment

- Add a new struct called LoadExtensionPromise
- Remove async modifier in ExtensionDirectory

Close #183

* Ftr: use RwLock instead of unsafe

* Rft: simplify the code of extension promise resolve

* refeat(extensions): add sync for Registry trait

* chore: cargo fmt

* chore: cargo fmt

* chore: cargo fmt

* chore: cargo fmt

---------

Co-authored-by: G-XD <38717659+G-XD@users.noreply.github.com>
Co-authored-by: GXD <gexiangdong@highlight.mobi>
Co-authored-by: Urara <95117705+AdachiAndShimamura@users.noreply.github.com>
Co-authored-by: 毛文超 <admin@onew.me>
diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml
index b02b3bc..8f5e9aa 100644
--- a/.github/workflows/github-actions.yml
+++ b/.github/workflows/github-actions.yml
@@ -6,9 +6,10 @@
   push:
     branches: ["*"]
   pull_request:
-    branches: ["*"]
+    branches:
+    - '*'
+    - 'refact/*'
 
-  workflow_dispatch:
 
 jobs:
   check:
@@ -59,6 +60,13 @@
           - 2181:2181
         env:
           ZOO_MY_ID: 1
+      nacos:
+        image: nacos/nacos-server:v2.3.1
+        ports:
+          - 8848:8848
+          - 9848:9848
+        env:
+          MODE: standalone
     steps:
       - uses: actions/checkout@main
       - uses: actions-rs/toolchain@v1
diff --git a/Cargo.toml b/Cargo.toml
index ad39f08..4f96717 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,4 +1,5 @@
 [workspace]
+resolver = "2"
 members = [
   "common/logger",
   "common/utils",
@@ -59,5 +60,7 @@
 bytes = "1.0"
 prost-serde = "0.3.0"
 prost-serde-derive = "0.1.2"
+url = "2.5.0"
+
 
 
diff --git a/application.yaml b/application.yaml
index d357db1..bec29a6 100644
--- a/application.yaml
+++ b/application.yaml
@@ -21,4 +21,9 @@
     references:
       GreeterClientImpl:
         url: tri://localhost:20000
-        protocol: tri
\ No newline at end of file
+        protocol: tri
+  routers:
+    consumer:
+      - service: "org.apache.dubbo.sample.tri.Greeter"
+        url: tri://127.0.0.1:20000
+        protocol: triple
\ No newline at end of file
diff --git a/common/base/Cargo.toml b/common/base/Cargo.toml
index 40579e0..678af77 100644
--- a/common/base/Cargo.toml
+++ b/common/base/Cargo.toml
@@ -6,6 +6,5 @@
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-urlencoding.workspace = true
-http = "0.2"
-dubbo-logger.workspace = true
\ No newline at end of file
+dubbo-logger.workspace = true
+url.workspace = true
\ No newline at end of file
diff --git a/common/base/src/extension_param.rs b/common/base/src/extension_param.rs
new file mode 100644
index 0000000..93e0a16
--- /dev/null
+++ b/common/base/src/extension_param.rs
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use crate::{url::UrlParam, StdError};
+use std::{borrow::Cow, convert::Infallible, str::FromStr};
+
+pub struct ExtensionName(String);
+
+impl ExtensionName {
+    pub fn new(name: String) -> Self {
+        ExtensionName(name)
+    }
+}
+
+impl UrlParam for ExtensionName {
+    type TargetType = String;
+
+    fn name() -> &'static str {
+        "extension-name"
+    }
+
+    fn value(&self) -> Self::TargetType {
+        self.0.clone()
+    }
+
+    fn as_str(&self) -> Cow<str> {
+        self.0.as_str().into()
+    }
+}
+
+impl FromStr for ExtensionName {
+    type Err = StdError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        Ok(ExtensionName::new(s.to_string()))
+    }
+}
+
+pub enum ExtensionType {
+    Registry,
+}
+
+impl UrlParam for ExtensionType {
+    type TargetType = String;
+
+    fn name() -> &'static str {
+        "extension-type"
+    }
+
+    fn value(&self) -> Self::TargetType {
+        match self {
+            ExtensionType::Registry => "registry".to_owned(),
+        }
+    }
+
+    fn as_str(&self) -> Cow<str> {
+        match self {
+            ExtensionType::Registry => Cow::Borrowed("registry"),
+        }
+    }
+}
+
+impl FromStr for ExtensionType {
+    type Err = Infallible;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "registry" => Ok(ExtensionType::Registry),
+            _ => panic!("the extension type enum is not in range"),
+        }
+    }
+}
diff --git a/common/base/src/lib.rs b/common/base/src/lib.rs
index b97b342..dcc9256 100644
--- a/common/base/src/lib.rs
+++ b/common/base/src/lib.rs
@@ -19,8 +19,12 @@
     allow(dead_code, unused_imports, unused_variables, unused_mut)
 )]
 pub mod constants;
+pub mod extension_param;
 pub mod node;
+pub mod registry_param;
 pub mod url;
 
 pub use node::Node;
 pub use url::Url;
+
+pub type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
diff --git a/common/base/src/registry_param.rs b/common/base/src/registry_param.rs
new file mode 100644
index 0000000..8aa7c07
--- /dev/null
+++ b/common/base/src/registry_param.rs
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use crate::{url::UrlParam, StdError, Url};
+use std::{borrow::Cow, convert::Infallible, str::FromStr};
+
+pub struct RegistryUrl(Url);
+
+impl RegistryUrl {
+    pub fn new(url: Url) -> Self {
+        Self(url)
+    }
+}
+
+impl UrlParam for RegistryUrl {
+    type TargetType = Url;
+
+    fn name() -> &'static str {
+        "registry"
+    }
+
+    fn value(&self) -> Self::TargetType {
+        self.0.clone()
+    }
+
+    fn as_str(&self) -> Cow<str> {
+        self.0.as_str().into()
+    }
+}
+
+impl FromStr for RegistryUrl {
+    type Err = StdError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        Ok(Self(s.parse()?))
+    }
+}
+
+pub struct ServiceNamespace(String);
+
+impl ServiceNamespace {
+    pub fn new(namespace: String) -> Self {
+        Self(namespace)
+    }
+}
+
+impl UrlParam for ServiceNamespace {
+    type TargetType = String;
+
+    fn name() -> &'static str {
+        "namespace"
+    }
+
+    fn value(&self) -> Self::TargetType {
+        self.0.clone()
+    }
+
+    fn as_str(&self) -> Cow<str> {
+        self.0.as_str().into()
+    }
+}
+
+impl FromStr for ServiceNamespace {
+    type Err = StdError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        Ok(Self(s.to_string()))
+    }
+}
+
+impl Default for ServiceNamespace {
+    fn default() -> Self {
+        Self("public".to_string())
+    }
+}
+
+pub struct AppName(String);
+
+impl AppName {
+    pub fn new(app_name: String) -> Self {
+        Self(app_name)
+    }
+}
+
+impl UrlParam for AppName {
+    type TargetType = String;
+
+    fn name() -> &'static str {
+        "app_name"
+    }
+
+    fn value(&self) -> Self::TargetType {
+        self.0.clone()
+    }
+
+    fn as_str(&self) -> Cow<str> {
+        self.0.as_str().into()
+    }
+}
+
+impl FromStr for AppName {
+    type Err = StdError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        Ok(Self(s.to_string()))
+    }
+}
+
+impl Default for AppName {
+    fn default() -> Self {
+        Self("UnknownApp".to_string())
+    }
+}
+
+pub struct InterfaceName(String);
+
+impl InterfaceName {
+    pub fn new(interface_name: String) -> Self {
+        Self(interface_name)
+    }
+}
+
+impl UrlParam for InterfaceName {
+    type TargetType = String;
+
+    fn name() -> &'static str {
+        "interface"
+    }
+
+    fn value(&self) -> Self::TargetType {
+        self.0.clone()
+    }
+
+    fn as_str(&self) -> Cow<str> {
+        self.0.as_str().into()
+    }
+}
+
+impl FromStr for InterfaceName {
+    type Err = StdError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        Ok(Self(s.to_string()))
+    }
+}
+
+impl Default for InterfaceName {
+    fn default() -> Self {
+        Self("".to_string())
+    }
+}
+
+pub struct Category(String);
+
+impl Category {
+    pub fn new(category: String) -> Self {
+        Self(category)
+    }
+}
+
+impl UrlParam for Category {
+    type TargetType = String;
+
+    fn name() -> &'static str {
+        "category"
+    }
+
+    fn value(&self) -> Self::TargetType {
+        self.0.clone()
+    }
+
+    fn as_str(&self) -> Cow<str> {
+        self.0.as_str().into()
+    }
+}
+
+impl FromStr for Category {
+    type Err = StdError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        Ok(Self(s.to_string()))
+    }
+}
+
+impl Default for Category {
+    fn default() -> Self {
+        Self("".to_string())
+    }
+}
+
+pub struct Version(String);
+
+impl Version {
+    pub fn new(version: String) -> Self {
+        Self(version)
+    }
+}
+
+impl UrlParam for Version {
+    type TargetType = String;
+
+    fn name() -> &'static str {
+        "version"
+    }
+
+    fn value(&self) -> Self::TargetType {
+        self.0.clone()
+    }
+
+    fn as_str(&self) -> Cow<str> {
+        self.0.as_str().into()
+    }
+}
+
+impl FromStr for Version {
+    type Err = StdError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        Ok(Self(s.to_string()))
+    }
+}
+
+impl Default for Version {
+    fn default() -> Self {
+        Self("".to_string())
+    }
+}
+
+pub struct Group(String);
+
+impl Group {
+    pub fn new(group: String) -> Self {
+        Self(group)
+    }
+}
+
+impl UrlParam for Group {
+    type TargetType = String;
+
+    fn name() -> &'static str {
+        "group"
+    }
+
+    fn value(&self) -> Self::TargetType {
+        self.0.clone()
+    }
+
+    fn as_str(&self) -> Cow<str> {
+        self.0.as_str().into()
+    }
+}
+
+impl FromStr for Group {
+    type Err = StdError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        Ok(Self(s.to_string()))
+    }
+}
+
+impl Default for Group {
+    fn default() -> Self {
+        Self("".to_string())
+    }
+}
+
+pub enum Side {
+    Provider,
+    Consumer,
+}
+
+impl UrlParam for Side {
+    type TargetType = String;
+
+    fn name() -> &'static str {
+        "side"
+    }
+
+    fn value(&self) -> Self::TargetType {
+        match self {
+            Side::Consumer => "consumer".to_owned(),
+            Side::Provider => "provider".to_owned(),
+        }
+    }
+
+    fn as_str(&self) -> Cow<str> {
+        match self {
+            Side::Consumer => Cow::Borrowed("consumer"),
+            Side::Provider => Cow::Borrowed("provider"),
+        }
+    }
+}
+
+impl FromStr for Side {
+    type Err = Infallible;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "consumer" => Ok(Side::Consumer),
+            "provider" => Ok(Side::Provider),
+            _ => Ok(Side::Consumer),
+        }
+    }
+}
+
+impl Default for Side {
+    fn default() -> Self {
+        Side::Consumer
+    }
+}
+
+pub struct StaticInvokerUrls(String);
+
+impl UrlParam for StaticInvokerUrls {
+    type TargetType = Vec<Url>;
+
+    fn name() -> &'static str {
+        "static-invoker-urls"
+    }
+
+    fn value(&self) -> Self::TargetType {
+        self.0.split(",").map(|url| url.parse().unwrap()).collect()
+    }
+
+    fn as_str(&self) -> Cow<str> {
+        Cow::Borrowed(&self.0)
+    }
+}
+
+impl FromStr for StaticInvokerUrls {
+    type Err = Infallible;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        Ok(Self(s.to_string()))
+    }
+}
+
+impl Default for StaticInvokerUrls {
+    fn default() -> Self {
+        Self(String::default())
+    }
+}
diff --git a/common/base/src/url.rs b/common/base/src/url.rs
index 82b026f..ac97f26 100644
--- a/common/base/src/url.rs
+++ b/common/base/src/url.rs
@@ -16,237 +16,162 @@
  */
 
 use std::{
+    borrow::Cow,
     collections::HashMap,
-    fmt::{Display, Formatter},
+    fmt::{Debug, Display, Formatter},
+    str::FromStr,
 };
 
-use crate::constants::{GROUP_KEY, INTERFACE_KEY, VERSION_KEY};
-use http::Uri;
-
-#[derive(Debug, Clone, Default, PartialEq)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
 pub struct Url {
-    pub raw_url_string: String,
-    // value of scheme is different to base name, eg. triple -> tri://
-    pub scheme: String,
-    pub location: String,
-    pub ip: String,
-    pub port: String,
-    // serviceKey format in dubbo java and go '{group}/{interfaceName}:{version}'
-    pub service_key: String,
-    // same to interfaceName
-    pub service_name: String,
-    pub params: HashMap<String, String>,
+    inner: url::Url,
 }
 
 impl Url {
-    pub fn new() -> Self {
-        Default::default()
+    pub fn empty() -> Self {
+        "empty://localhost".parse().unwrap()
     }
 
-    pub fn from_url(url: &str) -> Option<Self> {
-        // url: triple://127.0.0.1:8888/helloworld.Greeter
-        let uri = url
-            .parse::<http::Uri>()
-            .map_err(|err| {
-                dubbo_logger::tracing::error!("fail to parse url({}), err: {:?}", url, err);
-            })
-            .unwrap();
-        let query = uri.path_and_query().unwrap().query();
-        let mut url_inst = Self {
-            raw_url_string: url.to_string(),
-            scheme: uri.scheme_str()?.to_string(),
-            ip: uri.authority()?.host().to_string(),
-            port: uri.authority()?.port()?.to_string(),
-            location: uri.authority()?.to_string(),
-            service_key: uri.path().trim_start_matches('/').to_string(),
-            service_name: uri.path().trim_start_matches('/').to_string(),
-            params: if let Some(..) = query {
-                Url::decode(query.unwrap())
-            } else {
-                HashMap::new()
-            },
-        };
-        url_inst.renew_raw_url_string();
-        Some(url_inst)
+    pub fn protocol(&self) -> &str {
+        self.inner.scheme()
     }
 
-    pub fn get_service_key(&self) -> String {
-        self.service_key.clone()
+    pub fn host(&self) -> Option<&str> {
+        self.inner.host_str()
     }
 
-    pub fn get_service_name(&self) -> String {
-        self.service_name.clone()
+    pub fn authority(&self) -> &str {
+        self.inner.authority()
     }
 
-    pub fn get_param(&self, key: &str) -> Option<String> {
-        self.params.get(key).cloned()
+    pub fn username(&self) -> &str {
+        self.inner.username()
     }
 
-    fn encode_param(&self) -> String {
-        let mut params_vec: Vec<String> = Vec::new();
-        for (k, v) in self.params.iter() {
-            // let tmp = format!("{}={}", k, v);
-            params_vec.push(format!("{}={}", k, v));
-        }
-        if params_vec.is_empty() {
-            "".to_string()
-        } else {
-            format!("?{}", params_vec.join("&"))
-        }
+    pub fn password(&self) -> Option<&str> {
+        self.inner.password()
     }
 
-    pub fn params_count(&self) -> usize {
-        self.params.len()
+    pub fn port(&self) -> Option<u16> {
+        self.inner.port_or_known_default()
     }
 
-    fn decode(raw_query_string: &str) -> HashMap<String, String> {
-        let mut params = HashMap::new();
-        let p: Vec<String> = raw_query_string
-            .split('&')
-            .map(|v| v.trim().to_string())
-            .collect();
-        for v in p.iter() {
-            let values: Vec<String> = v.split('=').map(|v| v.trim().to_string()).collect();
-            if values.len() != 2 {
-                continue;
-            }
-            params.insert(values[0].clone(), values[1].clone());
-        }
-        params
+    pub fn path(&self) -> &str {
+        self.inner.path()
     }
 
-    pub fn set_param(&mut self, key: &str, value: &str) {
-        self.params.insert(key.to_string(), value.to_string());
-        self.renew_raw_url_string();
+    pub fn query<T: UrlParam>(&self) -> Option<T> {
+        self.inner
+            .query_pairs()
+            .find(|(k, _)| k == T::name())
+            .map(|(_, v)| T::from_str(&v).ok())
+            .flatten()
     }
 
-    pub fn raw_url_string(&self) -> String {
-        self.raw_url_string.clone()
+    pub fn query_param_by_key(&self, key: &str) -> Option<String> {
+        self.inner
+            .query_pairs()
+            .find(|(k, _)| k == key)
+            .map(|(_, v)| v.into_owned())
     }
 
-    pub fn encoded_raw_url_string(&self) -> String {
-        urlencoding::encode(self.raw_url_string.as_str()).to_string()
+    pub fn all_query_params(&self) -> HashMap<String, String> {
+        self.inner
+            .query_pairs()
+            .map(|(k, v)| (k.into_owned(), v.into_owned()))
+            .collect()
     }
 
-    fn build_service_key(&self) -> String {
-        format!(
-            "{group}/{interfaceName}:{version}",
-            group = self.get_param(GROUP_KEY).unwrap_or("default".to_string()),
-            interfaceName = self.get_param(INTERFACE_KEY).unwrap_or("error".to_string()),
-            version = self.get_param(VERSION_KEY).unwrap_or("1.0.0".to_string())
-        )
+    pub fn set_protocol(&mut self, protocol: &str) {
+        let _ = self.inner.set_scheme(protocol);
     }
 
-    pub fn to_url(&self) -> String {
-        self.raw_url_string()
+    pub fn set_host(&mut self, host: &str) {
+        let _ = self.inner.set_host(Some(host));
     }
 
-    fn renew_raw_url_string(&mut self) {
-        self.raw_url_string = format!(
-            "{}://{}:{}/{}{}",
-            self.scheme,
-            self.ip,
-            self.port,
-            self.service_name,
-            self.encode_param()
-        );
-        self.service_key = self.build_service_key()
+    pub fn set_port(&mut self, port: u16) {
+        let _ = self.inner.set_port(Some(port));
     }
 
-    // short_url is used for tcp listening
-    pub fn short_url(&self) -> String {
-        format!(
-            "{}://{}:{}/{}",
-            self.scheme, self.ip, self.port, self.service_name
-        )
+    pub fn set_username(&mut self, username: &str) {
+        let _ = self.inner.set_username(username);
     }
 
-    pub fn protocol(&self) -> String {
-        self.scheme.clone()
+    pub fn set_password(&mut self, password: &str) {
+        let _ = self.inner.set_password(Some(password));
     }
 
-    pub fn get_ip_port(&self) -> String {
-        format!("{}:{}", self.ip, self.port)
+    pub fn set_path(&mut self, path: &str) {
+        let _ = self.inner.set_path(path);
+    }
+
+    pub fn extend_pairs(&mut self, pairs: impl Iterator<Item = (String, String)>) {
+        let mut query_pairs = self.inner.query_pairs_mut();
+        query_pairs.extend_pairs(pairs);
+    }
+
+    pub fn add_query_param<T: UrlParam>(&mut self, param: T) {
+        let mut pairs = self.inner.query_pairs_mut();
+        pairs.append_pair(T::name(), &param.as_str());
+    }
+
+    pub fn remove_query_param<T: UrlParam>(&mut self) {
+        let query = self.inner.query_pairs().filter(|(k, v)| k.ne(T::name()));
+        let mut inner_url = self.inner.clone();
+        inner_url.query_pairs_mut().clear().extend_pairs(query);
+        self.inner = inner_url;
+    }
+
+    pub fn remove_all_param(&mut self) {
+        self.inner.query_pairs_mut().clear();
+    }
+
+    pub fn as_str(&self) -> &str {
+        self.inner.as_str()
+    }
+
+    pub fn short_url_without_query(&self) -> String {
+        let mut url = self.inner.clone();
+        url.set_query(Some(""));
+        url.into()
+    }
+}
+
+impl FromStr for Url {
+    type Err = url::ParseError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        Ok(Url {
+            inner: url::Url::parse(s)?,
+        })
     }
 }
 
 impl Display for Url {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        f.write_str(self.raw_url_string().as_str())
+        std::fmt::Display::fmt(&self.inner, f)
     }
 }
 
-impl Into<Uri> for Url {
-    fn into(self) -> Uri {
-        self.raw_url_string.parse::<Uri>().unwrap()
+impl Debug for Url {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        std::fmt::Debug::fmt(&self.inner, f)
     }
 }
 
-impl From<&str> for Url {
-    fn from(url: &str) -> Self {
-        Url::from_url(url).unwrap()
+impl From<Url> for String {
+    fn from(url: Url) -> Self {
+        url.inner.into()
     }
 }
 
-#[cfg(test)]
-mod tests {
-    use crate::{
-        constants::{ANYHOST_KEY, VERSION_KEY},
-        url::Url,
-    };
+pub trait UrlParam: FromStr {
+    type TargetType;
 
-    #[test]
-    fn test_from_url() {
-        let mut u1 = Url::from_url("tri://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&\
-        application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&\
-        environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&\
-        module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&\
-        side=provider&timeout=3000&timestamp=1556509797245&version=1.0.0&application=test");
-        assert_eq!(
-            u1.as_ref().unwrap().service_key,
-            "default/com.ikurento.user.UserProvider:1.0.0"
-        );
-        assert_eq!(
-            u1.as_ref()
-                .unwrap()
-                .get_param(ANYHOST_KEY)
-                .unwrap()
-                .as_str(),
-            "true"
-        );
-        assert_eq!(
-            u1.as_ref()
-                .unwrap()
-                .get_param("default.timeout")
-                .unwrap()
-                .as_str(),
-            "10000"
-        );
-        assert_eq!(u1.as_ref().unwrap().scheme, "tri");
-        assert_eq!(u1.as_ref().unwrap().ip, "127.0.0.1");
-        assert_eq!(u1.as_ref().unwrap().port, "20000");
-        assert_eq!(u1.as_ref().unwrap().params_count(), 18);
-        u1.as_mut().unwrap().set_param("key1", "value1");
-        assert_eq!(
-            u1.as_ref().unwrap().get_param("key1").unwrap().as_str(),
-            "value1"
-        );
-        assert_eq!(
-            u1.as_ref()
-                .unwrap()
-                .get_param(VERSION_KEY)
-                .unwrap()
-                .as_str(),
-            "1.0.0"
-        );
-    }
+    fn name() -> &'static str;
 
-    #[test]
-    fn test2() {
-        let url: Url = "tri://0.0.0.0:8888/org.apache.dubbo.sample.tri.Greeter".into();
-        assert_eq!(
-            url.raw_url_string(),
-            "tri://0.0.0.0:8888/org.apache.dubbo.sample.tri.Greeter"
-        )
-    }
+    fn value(&self) -> Self::TargetType;
+
+    fn as_str(&self) -> Cow<str>;
 }
diff --git a/config/src/config.rs b/config/src/config.rs
index 646873d..cf4e441 100644
--- a/config/src/config.rs
+++ b/config/src/config.rs
@@ -17,7 +17,7 @@
 
 use std::{collections::HashMap, env, path::PathBuf};
 
-use crate::{protocol::Protocol, registry::RegistryConfig};
+use crate::{protocol::Protocol, registry::RegistryConfig, router::RouterConfig};
 use dubbo_logger::tracing;
 use dubbo_utils::yaml_util::yaml_file_parser;
 use once_cell::sync::OnceCell;
@@ -45,6 +45,9 @@
     pub registries: HashMap<String, RegistryConfig>,
 
     #[serde(default)]
+    pub routers: RouterConfig,
+
+    #[serde(default)]
     pub data: HashMap<String, String>,
 }
 
@@ -63,6 +66,7 @@
             protocols: HashMap::new(),
             registries: HashMap::new(),
             provider: ProviderConfig::new(),
+            routers: RouterConfig::default(),
             data: HashMap::new(),
         }
     }
diff --git a/config/src/lib.rs b/config/src/lib.rs
index 0748c66..6fa3880 100644
--- a/config/src/lib.rs
+++ b/config/src/lib.rs
@@ -21,4 +21,5 @@
 pub mod protocol;
 pub mod provider;
 pub mod registry;
+pub mod router;
 pub mod service;
diff --git a/config/src/router.rs b/config/src/router.rs
new file mode 100644
index 0000000..7976f6e
--- /dev/null
+++ b/config/src/router.rs
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use serde::{Deserialize, Serialize};
+
+#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)]
+pub struct ConditionRouterConfig {
+    #[serde(rename = "configVersion")]
+    pub config_version: String,
+    pub scope: String,
+    pub force: bool,
+    pub enabled: bool,
+    pub key: String,
+    pub conditions: Vec<String>,
+}
+
+#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)]
+pub struct TagRouterConfig {
+    #[serde(rename = "configVersion")]
+    pub config_version: String,
+    pub force: bool,
+    pub enabled: bool,
+    pub key: String,
+    pub tags: Vec<Tag>,
+}
+
+#[derive(Serialize, Deserialize, Clone, PartialEq, Default, Debug)]
+pub struct ConsumerConfig {
+    pub service: String,
+    pub url: String,
+    pub protocol: String,
+}
+
+#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)]
+pub struct Tag {
+    pub name: String,
+    #[serde(rename = "match")]
+    pub matches: Vec<TagMatchRule>,
+}
+
+#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)]
+pub struct TagMatchRule {
+    pub key: String,
+    pub value: String,
+}
+
+impl ConditionRouterConfig {
+    pub fn new(config: &String) -> Self {
+        serde_yaml::from_str(config).expect("parse error")
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq)]
+pub struct EnableAuth {
+    pub auth_username: String,
+    pub auth_password: String,
+}
+
+#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq)]
+pub struct NacosConfig {
+    pub addr: String,
+    pub namespace: String,
+    pub app: String,
+    pub enable_auth: Option<EnableAuth>,
+    pub enable_auth_plugin_http: Option<bool>,
+}
+
+#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)]
+pub struct RouterConfig {
+    pub consumer: Option<Vec<ConsumerConfig>>,
+    pub nacos: Option<NacosConfig>,
+    pub conditions: Option<Vec<ConditionRouterConfig>>,
+    pub tags: Option<TagRouterConfig>,
+}
diff --git a/config/src/service.rs b/config/src/service.rs
index 1f85a92..8a1f191 100644
--- a/config/src/service.rs
+++ b/config/src/service.rs
@@ -41,16 +41,4 @@
     pub fn protocol(self, protocol: String) -> Self {
         Self { protocol, ..self }
     }
-
-    // pub fn get_url(&self) -> Vec<Url> {
-    //     let mut urls = Vec::new();
-    //     for (_, conf) in self.protocol_configs.iter() {
-    //         urls.push(Url {
-    //             url: conf.to_owned().to_url(),
-    //             service_key: "".to_string(),
-    //         });
-    //     }
-
-    //     urls
-    // }
 }
diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs
index af32b64..a3290dd 100644
--- a/dubbo-build/src/client.rs
+++ b/dubbo-build/src/client.rs
@@ -63,7 +63,7 @@
 
             #service_doc
             #(#struct_attributes)*
-            #[derive(Debug, Clone, Default)]
+            #[derive(Clone)]
             pub struct #service_ident {
                 inner: TripleClient,
             }
@@ -76,12 +76,6 @@
                     }
                 }
 
-                // pub fn build(builder: ClientBuilder) -> Self {
-                //     Self {
-                //         inner: TripleClient::new(builder),
-                //     }
-                // }
-
                 pub fn new(builder: ClientBuilder) -> Self {
                     Self {
                         inner: TripleClient::new(builder),
diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index 8c2821e..dbbb96a 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -14,7 +14,7 @@
 http = "0.2"
 tower-service.workspace = true
 http-body = "0.4.4"
-tower = { workspace = true, features = ["timeout"] }
+tower = { workspace = true, features = ["timeout", "ready-cache","discover","retry"] }
 futures-util = "0.3.23"
 futures-core ="0.3.23"
 argh = "0.1"
@@ -24,6 +24,8 @@
 tokio-rustls="0.24.1"
 tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", "macros", "net", "signal",  "full" ] }
 prost = "0.11.9"
+tokio-util = "0.7.9"
+tokio-stream = "0.1"
 async-trait = "0.1.56"
 tower-layer.workspace = true
 bytes.workspace = true
@@ -42,8 +44,13 @@
 lazy_static.workspace = true
 dubbo-base.workspace = true
 dubbo-logger.workspace = true
+once_cell.workspace = true
 
 dubbo-config = { path = "../config", version = "0.3.0" }
 
 #对象存储
 state = { version = "0.5", features = ["tls"] }
+thiserror = "1.0.48"
+regex = "1.9.1"
+nacos-sdk = { version = "0.3.0", features = ["default"] }
+serde_yaml = "0.9.22"
diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs
deleted file mode 100644
index 144f011..0000000
--- a/dubbo/src/cluster/directory.rs
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-use std::{
-    collections::HashMap,
-    fmt::Debug,
-    str::FromStr,
-    sync::{Arc, RwLock},
-};
-
-use crate::{
-    codegen::TripleInvoker,
-    invocation::{Invocation, RpcInvocation},
-    protocol::BoxInvoker,
-    registry::{memory_registry::MemoryNotifyListener, BoxRegistry},
-};
-use dubbo_base::Url;
-use dubbo_logger::tracing;
-
-use crate::cluster::Directory;
-
-/// Directory.
-///
-/// [Directory Service](http://en.wikipedia.org/wiki/Directory_service)
-
-#[derive(Debug, Clone)]
-pub struct StaticDirectory {
-    uri: http::Uri,
-}
-
-impl StaticDirectory {
-    pub fn new(host: &str) -> StaticDirectory {
-        let uri = match http::Uri::from_str(host) {
-            Ok(v) => v,
-            Err(err) => {
-                tracing::error!("http uri parse error: {}, host: {}", err, host);
-                panic!("http uri parse error: {}, host: {}", err, host)
-            }
-        };
-        StaticDirectory { uri: uri }
-    }
-
-    pub fn from_uri(uri: &http::Uri) -> StaticDirectory {
-        StaticDirectory { uri: uri.clone() }
-    }
-}
-
-impl Directory for StaticDirectory {
-    fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
-        let url = Url::from_url(&format!(
-            "{}://{}:{}/{}",
-            self.uri.scheme_str().unwrap_or("tri"),
-            self.uri.host().unwrap(),
-            self.uri.port().unwrap(),
-            invocation.get_target_service_unique_name(),
-        ))
-        .unwrap();
-        let invoker = Box::new(TripleInvoker::new(url));
-        vec![invoker]
-    }
-}
-
-#[derive(Debug, Clone)]
-pub struct RegistryDirectory {
-    registry: Arc<BoxRegistry>,
-    service_instances: Arc<RwLock<HashMap<String, Vec<Url>>>>,
-}
-
-impl RegistryDirectory {
-    pub fn new(registry: BoxRegistry) -> RegistryDirectory {
-        RegistryDirectory {
-            registry: Arc::new(registry),
-            service_instances: Arc::new(RwLock::new(HashMap::new())),
-        }
-    }
-}
-
-impl Directory for RegistryDirectory {
-    fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
-        let service_name = invocation.get_target_service_unique_name();
-
-        let url = Url::from_url(&format!(
-            "triple://{}:{}/{}",
-            "127.0.0.1", "8888", service_name
-        ))
-        .unwrap();
-
-        self.registry
-            .subscribe(
-                url,
-                Arc::new(MemoryNotifyListener {
-                    service_instances: Arc::clone(&self.service_instances),
-                }),
-            )
-            .expect("subscribe");
-
-        let map = self
-            .service_instances
-            .read()
-            .expect("service_instances.read");
-        let binding = Vec::new();
-        let url_vec = map.get(&service_name).unwrap_or(&binding);
-        // url_vec.to_vec()
-        let mut invokers: Vec<BoxInvoker> = vec![];
-        for item in url_vec.iter() {
-            invokers.push(Box::new(TripleInvoker::new(item.clone())));
-        }
-        invokers
-    }
-}
diff --git a/dubbo/src/cluster/failover.rs b/dubbo/src/cluster/failover.rs
new file mode 100644
index 0000000..a223ddf
--- /dev/null
+++ b/dubbo/src/cluster/failover.rs
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::task::Poll;
+
+use dubbo_base::StdError;
+use futures_util::future;
+use http::Request;
+use tower::{retry::Retry, util::Oneshot, ServiceExt};
+use tower_service::Service;
+
+pub struct Failover<N> {
+    inner: N, // loadbalancer service
+}
+
+#[derive(Clone)]
+pub struct FailoverPolicy;
+
+impl<N> Failover<N> {
+    pub fn new(inner: N) -> Self {
+        Self { inner }
+    }
+}
+
+impl<B, Res, E> tower::retry::Policy<Request<B>, Res, E> for FailoverPolicy
+where
+    B: http_body::Body + Clone,
+{
+    type Future = future::Ready<Self>;
+
+    fn retry(&self, _req: &Request<B>, result: Result<&Res, &E>) -> Option<Self::Future> {
+        //TODO some error handling or logging
+        match result {
+            Ok(_) => None,
+            Err(_) => Some(future::ready(self.clone())),
+        }
+    }
+
+    fn clone_request(&self, req: &Request<B>) -> Option<Request<B>> {
+        let mut clone = http::Request::new(req.body().clone());
+        *clone.method_mut() = req.method().clone();
+        *clone.uri_mut() = req.uri().clone();
+        *clone.headers_mut() = req.headers().clone();
+        *clone.version_mut() = req.version();
+
+        Some(clone)
+    }
+}
+
+impl<N, B> Service<Request<B>> for Failover<N>
+where
+    // B is CloneBody<B>
+    B: http_body::Body + Clone,
+    // loadbalancer service
+    N: Service<Request<B>> + Clone + 'static,
+    N::Error: Into<StdError>,
+    N::Future: Send,
+{
+    type Response = N::Response;
+
+    type Error = N::Error;
+
+    type Future = Oneshot<Retry<FailoverPolicy, N>, Request<B>>;
+
+    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
+        self.inner.poll_ready(cx)
+    }
+
+    fn call(&mut self, req: Request<B>) -> Self::Future {
+        let retry = Retry::new(FailoverPolicy, self.inner.clone());
+        retry.oneshot(req)
+    }
+}
diff --git a/dubbo/src/cluster/loadbalance/impls/random.rs b/dubbo/src/cluster/loadbalance/impls/random.rs
deleted file mode 100644
index 3e1cf65..0000000
--- a/dubbo/src/cluster/loadbalance/impls/random.rs
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-use dubbo_base::Url;
-use std::{
-    fmt::{Debug, Formatter},
-    sync::Arc,
-};
-
-use crate::{
-    cluster::loadbalance::types::{LoadBalance, Metadata},
-    codegen::RpcInvocation,
-};
-
-pub struct RandomLoadBalance {
-    pub metadata: Metadata,
-}
-
-impl Default for RandomLoadBalance {
-    fn default() -> Self {
-        RandomLoadBalance {
-            metadata: Metadata::new("random"),
-        }
-    }
-}
-
-impl Debug for RandomLoadBalance {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        write!(f, "RandomLoadBalance")
-    }
-}
-
-impl LoadBalance for RandomLoadBalance {
-    fn select(
-        &self,
-        invokers: Arc<Vec<Url>>,
-        _url: Option<Url>,
-        _invocation: Arc<RpcInvocation>,
-    ) -> Option<Url> {
-        if invokers.is_empty() {
-            return None;
-        }
-        let index = rand::random::<usize>() % invokers.len();
-        Some(invokers[index].clone())
-    }
-}
diff --git a/dubbo/src/cluster/loadbalance/impls/roundrobin.rs b/dubbo/src/cluster/loadbalance/impls/roundrobin.rs
deleted file mode 100644
index 5fd0ed4..0000000
--- a/dubbo/src/cluster/loadbalance/impls/roundrobin.rs
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-use dubbo_base::Url;
-use std::{
-    collections::HashMap,
-    fmt::{Debug, Formatter},
-    sync::{
-        atomic::{AtomicUsize, Ordering},
-        Arc, RwLock,
-    },
-};
-
-use crate::{
-    cluster::loadbalance::types::{LoadBalance, Metadata},
-    codegen::RpcInvocation,
-};
-
-pub struct RoundRobinLoadBalance {
-    pub metadata: Metadata,
-    pub counter_map: RwLock<HashMap<String, AtomicUsize>>,
-}
-
-impl Default for RoundRobinLoadBalance {
-    fn default() -> Self {
-        RoundRobinLoadBalance {
-            metadata: Metadata::new("roundrobin"),
-            counter_map: RwLock::new(HashMap::new()),
-        }
-    }
-}
-
-impl Debug for RoundRobinLoadBalance {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        write!(f, "RoundRobinLoadBalance")
-    }
-}
-
-impl RoundRobinLoadBalance {
-    fn guarantee_counter_key(&self, key: &str) {
-        let contained = self.counter_map.try_read().unwrap().contains_key(key);
-        if !contained {
-            self.counter_map
-                .try_write()
-                .unwrap()
-                .insert(key.to_string(), AtomicUsize::new(0));
-        }
-    }
-}
-
-impl LoadBalance for RoundRobinLoadBalance {
-    fn select(
-        &self,
-        invokers: Arc<Vec<Url>>,
-        _url: Option<Url>,
-        invocation: Arc<RpcInvocation>,
-    ) -> Option<Url> {
-        if invokers.is_empty() {
-            return None;
-        }
-        let fingerprint = invocation.unique_fingerprint();
-        self.guarantee_counter_key(fingerprint.as_str());
-        let index = self
-            .counter_map
-            .try_read()
-            .unwrap()
-            .get(fingerprint.as_str())?
-            .fetch_add(1, Ordering::SeqCst)
-            % invokers.len();
-        Some(invokers[index].clone())
-    }
-}
diff --git a/dubbo/src/cluster/loadbalance/mod.rs b/dubbo/src/cluster/loadbalance/mod.rs
deleted file mode 100644
index 1d04f7f..0000000
--- a/dubbo/src/cluster/loadbalance/mod.rs
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-use std::collections::HashMap;
-
-use lazy_static::lazy_static;
-
-use crate::cluster::loadbalance::{
-    impls::{random::RandomLoadBalance, roundrobin::RoundRobinLoadBalance},
-    types::BoxLoadBalance,
-};
-
-pub mod impls;
-pub mod types;
-
-lazy_static! {
-    pub static ref LOAD_BALANCE_EXTENSIONS: HashMap<String, BoxLoadBalance> =
-        init_loadbalance_extensions();
-}
-
-fn init_loadbalance_extensions() -> HashMap<String, BoxLoadBalance> {
-    let mut loadbalance_map: HashMap<String, BoxLoadBalance> = HashMap::new();
-    loadbalance_map.insert("random".to_string(), Box::new(RandomLoadBalance::default()));
-    loadbalance_map.insert(
-        "roundrobin".to_string(),
-        Box::new(RoundRobinLoadBalance::default()),
-    );
-    loadbalance_map
-}
diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs
index d1f96f9..1a20c16 100644
--- a/dubbo/src/cluster/mod.rs
+++ b/dubbo/src/cluster/mod.rs
@@ -15,165 +15,71 @@
  * limitations under the License.
  */
 
-use std::{collections::HashMap, fmt::Debug, sync::Arc, task::Poll};
-
-use aws_smithy_http::body::SdkBody;
-use dubbo_base::Url;
-use dyn_clone::DynClone;
+use http::Request;
+use tower_service::Service;
 
 use crate::{
-    empty_body,
-    invocation::RpcInvocation,
-    protocol::{BoxInvoker, Invoker},
+    codegen::RpcInvocation, invoker::clone_body::CloneBody, param::Param, svc::NewService,
 };
 
-pub mod directory;
-pub mod loadbalance;
+use self::failover::Failover;
 
-pub trait Directory: Debug + DynClone {
-    fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker>;
-    // fn is_empty(&self) -> bool;
+mod failover;
+
+pub struct NewCluster<N> {
+    inner: N, // new loadbalancer service
 }
 
-dyn_clone::clone_trait_object!(Directory);
-
-type BoxDirectory = Box<dyn Directory + Send + Sync>;
-
-pub trait Cluster {
-    fn join(&self, dir: BoxDirectory) -> BoxInvoker;
+pub struct Cluster<S> {
+    inner: S, // failover service
 }
 
-#[derive(Debug, Default)]
-pub struct MockCluster {}
-
-impl Cluster for MockCluster {
-    fn join(&self, dir: BoxDirectory) -> BoxInvoker {
-        Box::new(FailoverCluster::new(dir))
-    }
-}
-#[derive(Clone, Debug)]
-pub struct FailoverCluster {
-    dir: Arc<BoxDirectory>,
-}
-
-impl FailoverCluster {
-    pub fn new(dir: BoxDirectory) -> FailoverCluster {
-        Self { dir: Arc::new(dir) }
+impl<N> NewCluster<N> {
+    pub fn layer() -> impl tower_layer::Layer<N, Service = Self> {
+        tower_layer::layer_fn(|inner: N| {
+            NewCluster {
+                inner, // new loadbalancer service
+            }
+        })
     }
 }
 
-impl Invoker<http::Request<SdkBody>> for FailoverCluster {
-    type Response = http::Response<crate::BoxBody>;
+impl<S, T> NewService<T> for NewCluster<S>
+where
+    T: Param<RpcInvocation>,
+    // new loadbalancer service
+    S: NewService<T>,
+{
+    type Service = Cluster<Failover<S::Service>>;
 
-    type Error = crate::Error;
+    fn new_service(&self, target: T) -> Self::Service {
+        Cluster {
+            inner: Failover::new(self.inner.new_service(target)),
+        }
+    }
+}
 
-    type Future = crate::BoxFuture<Self::Response, Self::Error>;
+impl<S> Service<Request<hyper::Body>> for Cluster<S>
+where
+    S: Service<Request<CloneBody>>,
+{
+    type Response = S::Response;
+
+    type Error = S::Error;
+
+    type Future = S::Future;
 
     fn poll_ready(
         &mut self,
-        _cx: &mut std::task::Context<'_>,
+        cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Result<(), Self::Error>> {
-        // if self.dir.is_empty() return err
-        Poll::Ready(Ok(()))
+        self.inner.poll_ready(cx)
     }
 
-    fn call(&mut self, req: http::Request<SdkBody>) -> Self::Future {
-        // let clone_body = req.body().try_clone().unwrap();
-        // let mut clone_req = http::Request::builder()
-        //     .uri(req.uri().clone())
-        //     .method(req.method().clone());
-        // *clone_req.headers_mut().unwrap() = req.headers().clone();
-        // let r = clone_req.body(clone_body).unwrap();
-        let invokers = self.dir.list(
-            RpcInvocation::default()
-                .with_service_unique_name("hello".to_string())
-                .into(),
-        );
-        for mut invoker in invokers {
-            let fut = async move {
-                let res = invoker.call(req).await;
-                return res;
-            };
-            return Box::pin(fut);
-        }
-        Box::pin(async move {
-            Ok(http::Response::builder()
-                .status(200)
-                .header("grpc-status", "12")
-                .header("content-type", "application/grpc")
-                .body(empty_body())
-                .unwrap())
-        })
-    }
-
-    fn get_url(&self) -> dubbo_base::Url {
-        Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap()
-    }
-}
-
-#[derive(Debug, Default, Clone)]
-pub struct MockDirectory {
-    // router_chain: RouterChain,
-    invokers: Vec<BoxInvoker>,
-}
-
-impl MockDirectory {
-    pub fn new(invokers: Vec<BoxInvoker>) -> MockDirectory {
-        Self {
-            // router_chain: RouterChain::default(),
-            invokers,
-        }
-    }
-}
-
-impl Directory for MockDirectory {
-    fn list(&self, _invo: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
-        // tracing::info!("MockDirectory: {}", meta);
-        let _u = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap();
-        // vec![Box::new(TripleInvoker::new(u))]
-        // self.router_chain.route(u, invo);
-        self.invokers.clone()
-    }
-
-    // fn is_empty(&self) -> bool {
-    //     false
-    // }
-}
-
-#[derive(Debug, Default)]
-pub struct RouterChain {
-    router: HashMap<String, BoxRouter>,
-    invokers: Vec<BoxInvoker>,
-}
-
-impl RouterChain {
-    pub fn route(&self, url: Url, invo: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
-        let r = self.router.get("mock").unwrap();
-        r.route(self.invokers.clone(), url, invo)
-    }
-}
-
-pub trait Router: Debug {
-    fn route(
-        &self,
-        invokers: Vec<BoxInvoker>,
-        url: Url,
-        invo: Arc<RpcInvocation>,
-    ) -> Vec<BoxInvoker>;
-}
-
-pub type BoxRouter = Box<dyn Router + Sync + Send>;
-
-#[derive(Debug, Default)]
-pub struct MockRouter {}
-
-impl Router for MockRouter {
-    fn route(
-        &self,
-        invokers: Vec<BoxInvoker>,
-        _url: Url,
-        _invo: Arc<RpcInvocation>,
-    ) -> Vec<BoxInvoker> {
-        invokers
+    fn call(&mut self, req: Request<hyper::Body>) -> Self::Future {
+        let (parts, body) = req.into_parts();
+        let clone_body = CloneBody::new(body);
+        let req = Request::from_parts(parts, clone_body);
+        self.inner.call(req)
     }
 }
diff --git a/dubbo/src/cluster/router/condition/condition_router.rs b/dubbo/src/cluster/router/condition/condition_router.rs
new file mode 100644
index 0000000..21b525a
--- /dev/null
+++ b/dubbo/src/cluster/router/condition/condition_router.rs
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use crate::{
+    cluster::router::{condition::single_router::ConditionSingleRouter, Router},
+    codegen::RpcInvocation,
+};
+use dubbo_base::Url;
+use std::{
+    fmt::Debug,
+    sync::{Arc, RwLock},
+};
+
+#[derive(Default, Debug, Clone)]
+pub struct ConditionRouter {
+    //condition router for service scope
+    pub service_routers: Option<Arc<RwLock<ConditionSingleRouters>>>,
+    //condition router for application  scope
+    pub application_routers: Option<Arc<RwLock<ConditionSingleRouters>>>,
+}
+
+impl Router for ConditionRouter {
+    fn route(&self, mut invokers: Vec<Url>, url: Url, invo: Arc<RpcInvocation>) -> Vec<Url> {
+        if let Some(routers) = &self.application_routers {
+            for router in &routers.read().unwrap().routers {
+                invokers = router.route(invokers, url.clone(), invo.clone());
+            }
+        }
+        if let Some(routers) = &self.service_routers {
+            for router in &routers.read().unwrap().routers {
+                invokers = router.route(invokers, url.clone(), invo.clone());
+            }
+        }
+        invokers
+    }
+}
+
+impl ConditionRouter {
+    pub fn new(
+        service_routers: Option<Arc<RwLock<ConditionSingleRouters>>>,
+        application_routers: Option<Arc<RwLock<ConditionSingleRouters>>>,
+    ) -> Self {
+        Self {
+            service_routers,
+            application_routers,
+        }
+    }
+}
+
+#[derive(Debug, Clone, Default)]
+pub struct ConditionSingleRouters {
+    pub routers: Vec<ConditionSingleRouter>,
+}
+
+impl ConditionSingleRouters {
+    pub fn new(routers: Vec<ConditionSingleRouter>) -> Self {
+        Self { routers }
+    }
+    pub fn is_null(&self) -> bool {
+        self.routers.is_empty()
+    }
+}
diff --git a/dubbo/src/cluster/router/condition/matcher.rs b/dubbo/src/cluster/router/condition/matcher.rs
new file mode 100644
index 0000000..2ee33d6
--- /dev/null
+++ b/dubbo/src/cluster/router/condition/matcher.rs
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use regex::Regex;
+use std::{collections::HashSet, error::Error, option::Option};
+
+#[derive(Clone, Debug, Default)]
+pub struct ConditionMatcher {
+    _key: String,
+    matches: HashSet<String>,
+    mismatches: HashSet<String>,
+}
+
+impl ConditionMatcher {
+    pub fn new(_key: String) -> Self {
+        ConditionMatcher {
+            _key,
+            matches: HashSet::new(),
+            mismatches: HashSet::new(),
+        }
+    }
+
+    pub fn is_match(&self, value: Option<String>) -> Result<bool, Box<dyn Error>> {
+        match value {
+            None => Ok(false),
+            Some(val) => {
+                for match_ in self.matches.iter() {
+                    if self.do_pattern_match(match_, &val) {
+                        return Ok(true);
+                    }
+                }
+                for mismatch in self.mismatches.iter() {
+                    if !self.do_pattern_match(mismatch, &val) {
+                        return Ok(true);
+                    }
+                }
+                Ok(false)
+            }
+        }
+    }
+
+    pub fn get_matches(&mut self) -> &mut HashSet<String> {
+        &mut self.matches
+    }
+    pub fn get_mismatches(&mut self) -> &mut HashSet<String> {
+        &mut self.mismatches
+    }
+
+    fn do_pattern_match(&self, pattern: &str, value: &str) -> bool {
+        if pattern.contains('*') {
+            return star_matcher(pattern, value);
+        }
+
+        if pattern.contains('~') {
+            let parts: Vec<&str> = pattern.split('~').collect();
+
+            if parts.len() == 2 {
+                if let (Ok(left), Ok(right), Ok(val)) = (
+                    parts[0].parse::<i32>(),
+                    parts[1].parse::<i32>(),
+                    value.parse::<i32>(),
+                ) {
+                    return range_matcher(val, left, right);
+                }
+            }
+            return false;
+        }
+        pattern == value
+    }
+}
+
+pub fn star_matcher(pattern: &str, input: &str) -> bool {
+    // 将*替换为任意字符的正则表达式
+    let pattern = pattern.replace("*", ".*");
+    let regex = Regex::new(&pattern).unwrap();
+    regex.is_match(input)
+}
+
+pub fn range_matcher(val: i32, min: i32, max: i32) -> bool {
+    min <= val && val <= max
+}
diff --git a/dubbo/src/cluster/loadbalance/impls/mod.rs b/dubbo/src/cluster/router/condition/mod.rs
similarity index 92%
copy from dubbo/src/cluster/loadbalance/impls/mod.rs
copy to dubbo/src/cluster/router/condition/mod.rs
index 5a84af8..d4a83b9 100644
--- a/dubbo/src/cluster/loadbalance/impls/mod.rs
+++ b/dubbo/src/cluster/router/condition/mod.rs
@@ -14,5 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-pub mod random;
-pub mod roundrobin;
+pub mod condition_router;
+pub mod matcher;
+pub mod single_router;
diff --git a/dubbo/src/cluster/router/condition/single_router.rs b/dubbo/src/cluster/router/condition/single_router.rs
new file mode 100644
index 0000000..54c61cb
--- /dev/null
+++ b/dubbo/src/cluster/router/condition/single_router.rs
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use dubbo_base::Url;
+use dubbo_logger::tracing::info;
+use regex::Regex;
+use std::{
+    collections::HashMap,
+    sync::{Arc, RwLock},
+};
+
+use crate::{
+    cluster::router::{condition::matcher::ConditionMatcher, utils::to_original_map, Router},
+    codegen::RpcInvocation,
+    invocation::Invocation,
+};
+
+#[derive(Debug, Clone, Default)]
+pub struct ConditionSingleRouter {
+    pub name: String,
+    pub when_condition: HashMap<String, Arc<RwLock<ConditionMatcher>>>,
+    pub then_condition: HashMap<String, Arc<RwLock<ConditionMatcher>>>,
+    pub enabled: bool,
+    pub force: bool,
+}
+
+impl Router for ConditionSingleRouter {
+    fn route(&self, invokers: Vec<Url>, url: Url, invocation: Arc<RpcInvocation>) -> Vec<Url> {
+        if !self.enabled {
+            return invokers;
+        };
+        let mut result = Vec::new();
+        if self.match_when(url.clone(), invocation.clone()) {
+            for invoker in &invokers.clone() {
+                if self.match_then(invoker.clone(), invocation.clone()) {
+                    result.push(invoker.clone());
+                }
+            }
+            if result.is_empty() && self.force == false {
+                invokers
+            } else {
+                result
+            }
+        } else {
+            invokers
+        }
+    }
+}
+
+impl ConditionSingleRouter {
+    pub fn new(rule: String, force: bool, enabled: bool) -> Self {
+        let mut router = Self {
+            name: "condition".to_string(),
+            when_condition: HashMap::new(),
+            then_condition: HashMap::new(),
+            enabled,
+            force,
+        };
+        if enabled {
+            router.init(rule).expect("parse rule error");
+        }
+        router
+    }
+
+    fn init(&mut self, rule: String) -> Result<(), Box<dyn std::error::Error>> {
+        match rule.trim().is_empty() {
+            true => Err("Illegal route rule!".into()),
+            false => {
+                let r = rule.replace("consumer.", "").replace("provider.", "");
+                let i = r.find("=>").unwrap_or_else(|| r.len());
+                let when_rule = r[..i].trim().to_string();
+                let then_rule = r[(i + 2)..].trim().to_string();
+                let when = if when_rule.is_empty() || when_rule == "true" {
+                    HashMap::new()
+                } else {
+                    self.parse_rule(&when_rule)?
+                };
+                let then = if then_rule.is_empty() || then_rule == "false" {
+                    HashMap::new()
+                } else {
+                    self.parse_rule(&then_rule)?
+                };
+                self.when_condition = when;
+                self.then_condition = then;
+                Ok(())
+            }
+        }
+    }
+
+    fn parse_rule(
+        &mut self,
+        rule: &str,
+    ) -> Result<HashMap<String, Arc<RwLock<ConditionMatcher>>>, Box<dyn std::error::Error>> {
+        let mut conditions: HashMap<String, Arc<RwLock<ConditionMatcher>>> = HashMap::new();
+        let mut current_matcher: Option<Arc<RwLock<ConditionMatcher>>> = None;
+        let regex = Regex::new(r"([&!=,]*)\s*([^&!=,\s]+)").unwrap();
+        for cap in regex.captures_iter(rule) {
+            let separator = &cap[1];
+            let content = &cap[2];
+
+            match separator {
+                "" => {
+                    let current_key = content.to_string();
+                    current_matcher =
+                        Some(Arc::new(RwLock::new(self.get_matcher(current_key.clone()))));
+                    conditions.insert(
+                        current_key.clone(),
+                        current_matcher.as_ref().unwrap().clone(),
+                    );
+                }
+                "&" => {
+                    let current_key = content.to_string();
+                    current_matcher = conditions.get(&current_key).cloned();
+                    if current_matcher.is_none() {
+                        let matcher = Arc::new(RwLock::new(self.get_matcher(current_key.clone())));
+                        conditions.insert(current_key.clone(), matcher.clone());
+                        current_matcher = Some(matcher);
+                    }
+                }
+                "=" => {
+                    if let Some(matcher) = &current_matcher {
+                        let mut matcher_borrowed = matcher.write().unwrap();
+                        matcher_borrowed
+                            .get_matches()
+                            .insert(content.to_string().clone());
+                    } else {
+                        return Err("Error: ...".into());
+                    }
+                }
+                "!=" => {
+                    if let Some(matcher) = &current_matcher {
+                        let mut matcher_borrowed = matcher.write().unwrap();
+                        matcher_borrowed
+                            .get_mismatches()
+                            .insert(content.to_string().clone());
+                    } else {
+                        return Err("Error: ...".into());
+                    }
+                }
+                "," => {
+                    if let Some(matcher) = &current_matcher {
+                        let mut matcher_borrowed = matcher.write().unwrap();
+                        if matcher_borrowed.get_matches().is_empty()
+                            && matcher_borrowed.get_mismatches().is_empty()
+                        {
+                            return Err("Error: ...".into());
+                        }
+                        drop(matcher_borrowed);
+                        let mut matcher_borrowed_mut = matcher.write().unwrap();
+                        matcher_borrowed_mut
+                            .get_matches()
+                            .insert(content.to_string().clone());
+                    } else {
+                        return Err("Error: ...".into());
+                    }
+                }
+                _ => {
+                    return Err("Error: ...".into());
+                }
+            }
+        }
+        Ok(conditions)
+    }
+
+    // pub fn is_runtime(&self) -> bool {
+    //     // same as the Java version
+    // }
+
+    pub fn get_matcher(&self, key: String) -> ConditionMatcher {
+        ConditionMatcher::new(key)
+    }
+
+    pub fn match_when(&self, url: Url, invocation: Arc<RpcInvocation>) -> bool {
+        if self.when_condition.is_empty() {
+            return true;
+        }
+        self.do_match(url, &self.when_condition, invocation)
+    }
+
+    pub fn match_then(&self, url: Url, invocation: Arc<RpcInvocation>) -> bool {
+        if self.then_condition.is_empty() {
+            return false;
+        }
+        self.do_match(url, &self.then_condition, invocation)
+    }
+
+    pub fn do_match(
+        &self,
+        url: Url,
+        conditions: &HashMap<String, Arc<RwLock<ConditionMatcher>>>,
+        invocation: Arc<RpcInvocation>,
+    ) -> bool {
+        let sample: HashMap<String, String> = to_original_map(url);
+        conditions.iter().all(|(key, condition_matcher)| {
+            let matcher = condition_matcher.read().unwrap();
+            let value = get_value(key, &sample, invocation.clone());
+            match matcher.is_match(value) {
+                Ok(result) => result,
+                Err(error) => {
+                    info!("Error occurred: {:?}", error);
+                    false
+                }
+            }
+        })
+    }
+}
+
+fn get_value(
+    key: &String,
+    sample: &HashMap<String, String>,
+    invocation: Arc<RpcInvocation>,
+) -> Option<String> {
+    if key == "method" {
+        let method_param = invocation.get_method_name();
+        return Some(method_param);
+    }
+    sample.get(key).cloned()
+}
diff --git a/dubbo/src/cluster/router/manager/condition_manager.rs b/dubbo/src/cluster/router/manager/condition_manager.rs
new file mode 100644
index 0000000..7772950
--- /dev/null
+++ b/dubbo/src/cluster/router/manager/condition_manager.rs
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use crate::cluster::router::condition::{
+    condition_router::{ConditionRouter, ConditionSingleRouters},
+    single_router::ConditionSingleRouter,
+};
+use dubbo_config::router::ConditionRouterConfig;
+use std::{
+    collections::HashMap,
+    sync::{Arc, RwLock},
+};
+
+#[derive(Debug, Clone, Default)]
+pub struct ConditionRouterManager {
+    //Application-level routing applies globally, while service-level routing only affects a specific service.
+    pub routers_service: HashMap<String, Arc<RwLock<ConditionSingleRouters>>>,
+    pub routers_application: Arc<RwLock<ConditionSingleRouters>>,
+}
+
+impl ConditionRouterManager {
+    pub fn get_router(&self, service_name: &String) -> Option<ConditionRouter> {
+        let routers_application_is_null = self.routers_application.read().unwrap().is_null();
+        self.routers_service
+            .get(service_name)
+            .map(|routers_service| {
+                ConditionRouter::new(
+                    Some(routers_service.clone()),
+                    if routers_application_is_null {
+                        None
+                    } else {
+                        Some(self.routers_application.clone())
+                    },
+                )
+            })
+            .or_else(|| {
+                if routers_application_is_null {
+                    None
+                } else {
+                    Some(ConditionRouter::new(
+                        None,
+                        Some(self.routers_application.clone()),
+                    ))
+                }
+            })
+    }
+
+    pub fn update(&mut self, config: ConditionRouterConfig) {
+        let force = config.force;
+        let scope = config.scope;
+        let key = config.key;
+        let enable = config.enabled;
+
+        let routers = config
+            .conditions
+            .into_iter()
+            .map(|condition| ConditionSingleRouter::new(condition, force, enable))
+            .collect::<Vec<_>>();
+
+        match scope.as_str() {
+            "application" => {
+                self.routers_application.write().unwrap().routers = routers;
+            }
+            "service" => {
+                self.routers_service
+                    .entry(key)
+                    .or_insert_with(|| Arc::new(RwLock::new(ConditionSingleRouters::new(vec![]))))
+                    .write()
+                    .unwrap()
+                    .routers = routers;
+            }
+            _ => {}
+        }
+    }
+}
diff --git a/dubbo/src/cluster/loadbalance/impls/mod.rs b/dubbo/src/cluster/router/manager/mod.rs
similarity index 92%
copy from dubbo/src/cluster/loadbalance/impls/mod.rs
copy to dubbo/src/cluster/router/manager/mod.rs
index 5a84af8..593fa22 100644
--- a/dubbo/src/cluster/loadbalance/impls/mod.rs
+++ b/dubbo/src/cluster/router/manager/mod.rs
@@ -14,5 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-pub mod random;
-pub mod roundrobin;
+mod condition_manager;
+pub mod router_manager;
+mod tag_manager;
diff --git a/dubbo/src/cluster/router/manager/router_manager.rs b/dubbo/src/cluster/router/manager/router_manager.rs
new file mode 100644
index 0000000..e6c8b6c
--- /dev/null
+++ b/dubbo/src/cluster/router/manager/router_manager.rs
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use crate::cluster::router::{
+    manager::{condition_manager::ConditionRouterManager, tag_manager::TagRouterManager},
+    nacos_config_center::nacos_client::NacosClient,
+    router_chain::RouterChain,
+};
+use dubbo_base::Url;
+use dubbo_config::{
+    get_global_config,
+    router::{ConditionRouterConfig, NacosConfig, TagRouterConfig},
+};
+use dubbo_logger::tracing::{info, trace};
+use once_cell::sync::OnceCell;
+use std::{
+    collections::HashMap,
+    sync::{Arc, RwLock},
+};
+
+pub static GLOBAL_ROUTER_MANAGER: OnceCell<Arc<RwLock<RouterManager>>> = OnceCell::new();
+const TAG: &str = "tag";
+const CONDITION: &str = "condition";
+pub struct RouterManager {
+    pub condition_router_manager: ConditionRouterManager,
+    pub tag_router_manager: TagRouterManager,
+    pub nacos: Option<NacosClient>,
+    pub consumer: HashMap<String, Url>,
+}
+
+impl RouterManager {
+    pub fn get_router_chain(&self, service: String) -> RouterChain {
+        let mut chain = RouterChain::new();
+        if let Some(url) = self.consumer.get(service.as_str()) {
+            if let Some(tag_router) = self.tag_router_manager.get_router(&service) {
+                chain.add_router(TAG.to_string(), Box::new(tag_router));
+            }
+            if let Some(condition_router) = self.condition_router_manager.get_router(&service) {
+                chain.add_router(CONDITION.to_string(), Box::new(condition_router));
+            }
+            chain.self_url = url.clone();
+        }
+        chain
+    }
+
+    pub fn notify(&mut self, event: RouterConfigChangeEvent) {
+        match event.router_kind.as_str() {
+            CONDITION => {
+                let config: ConditionRouterConfig =
+                    serde_yaml::from_str(event.content.as_str()).unwrap();
+                self.condition_router_manager.update(config)
+            }
+            TAG => {
+                let config: TagRouterConfig = serde_yaml::from_str(event.content.as_str()).unwrap();
+                self.tag_router_manager.update(config)
+            }
+            _ => {
+                info!("other router change event")
+            }
+        }
+    }
+
+    pub fn init_nacos(&mut self, config: NacosConfig) {
+        self.nacos = Some(NacosClient::new_init_client(config));
+        self.init_router_managers_for_nacos();
+    }
+
+    fn init_router_managers_for_nacos(&mut self) {
+        if let Some(tag_config) = self
+            .nacos
+            .as_ref()
+            .and_then(|n| n.get_config("application", TAG, TAG))
+        {
+            self.tag_router_manager.update(tag_config);
+        }
+
+        if let Some(condition_app_config) = self
+            .nacos
+            .as_ref()
+            .and_then(|n| n.get_config("application", CONDITION, TAG))
+        {
+            self.condition_router_manager.update(condition_app_config);
+        }
+
+        for (service_name, _) in &self.consumer {
+            if let Some(condition_config) = self
+                .nacos
+                .as_ref()
+                .and_then(|n| n.get_config(service_name, CONDITION, CONDITION))
+            {
+                self.condition_router_manager.update(condition_config);
+            }
+        }
+    }
+
+    pub fn init(&mut self) {
+        let config = get_global_config().routers.clone();
+        self.init_consumer_configs();
+        if let Some(nacos_config) = &config.nacos {
+            self.init_nacos(nacos_config.clone());
+        } else {
+            trace!("Nacos not configured, using local YAML configuration for routing");
+            if let Some(condition_configs) = &config.conditions {
+                for condition_config in condition_configs {
+                    self.condition_router_manager
+                        .update(condition_config.clone());
+                }
+            } else {
+                info!("Unconfigured Condition Router")
+            }
+            if let Some(tag_config) = &config.tags {
+                self.tag_router_manager.update(tag_config.clone());
+            } else {
+                info!("Unconfigured Tag Router")
+            }
+        }
+    }
+
+    fn init_consumer_configs(&mut self) {
+        let consumer_configs = get_global_config()
+            .routers
+            .consumer
+            .clone()
+            .unwrap_or_else(Vec::new);
+
+        for consumer_config in consumer_configs {
+            let service_url = Url::from_url(
+                format!("{}/{}", consumer_config.url, consumer_config.service).as_str(),
+            )
+            .expect("Consumer config error");
+
+            self.consumer.insert(consumer_config.service, service_url);
+        }
+    }
+}
+
+pub fn get_global_router_manager() -> &'static Arc<RwLock<RouterManager>> {
+    GLOBAL_ROUTER_MANAGER.get_or_init(|| {
+        let mut router_manager = RouterManager {
+            condition_router_manager: ConditionRouterManager::default(),
+            tag_router_manager: TagRouterManager::default(),
+            nacos: None,
+            consumer: HashMap::new(),
+        };
+        router_manager.init();
+        return Arc::new(RwLock::new(router_manager));
+    })
+}
+
+#[derive(Debug, Default, Clone)]
+pub struct RouterConfigChangeEvent {
+    pub service_name: String,
+    pub router_kind: String,
+    pub content: String,
+}
diff --git a/dubbo/src/cluster/router/manager/tag_manager.rs b/dubbo/src/cluster/router/manager/tag_manager.rs
new file mode 100644
index 0000000..f028af2
--- /dev/null
+++ b/dubbo/src/cluster/router/manager/tag_manager.rs
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use crate::cluster::router::tag::tag_router::{TagRouter, TagRouterInner};
+use dubbo_config::router::TagRouterConfig;
+use std::sync::{Arc, RwLock};
+
+#[derive(Debug, Clone, Default)]
+pub struct TagRouterManager {
+    pub tag_router: Arc<RwLock<TagRouterInner>>,
+}
+
+impl TagRouterManager {
+    pub fn get_router(&self, _service_name: &String) -> Option<TagRouter> {
+        Some(TagRouter {
+            inner: self.tag_router.clone(),
+        })
+    }
+
+    pub fn update(&mut self, config: TagRouterConfig) {
+        self.tag_router.write().unwrap().parse_config(config);
+    }
+}
diff --git a/dubbo/src/cluster/loadbalance/types.rs b/dubbo/src/cluster/router/mod.rs
similarity index 61%
rename from dubbo/src/cluster/loadbalance/types.rs
rename to dubbo/src/cluster/router/mod.rs
index 9273d07..edc081b 100644
--- a/dubbo/src/cluster/loadbalance/types.rs
+++ b/dubbo/src/cluster/router/mod.rs
@@ -14,29 +14,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+pub mod condition;
+pub mod manager;
+pub mod nacos_config_center;
+pub mod router_chain;
+pub mod tag;
+pub mod utils;
 
+use crate::invocation::RpcInvocation;
 use dubbo_base::Url;
 use std::{fmt::Debug, sync::Arc};
 
-use crate::codegen::RpcInvocation;
-
-pub type BoxLoadBalance = Box<dyn LoadBalance + Send + Sync>;
-
-pub trait LoadBalance: Debug {
-    fn select(
-        &self,
-        invokers: Arc<Vec<Url>>,
-        url: Option<Url>,
-        invocation: Arc<RpcInvocation>,
-    ) -> Option<Url>;
+pub trait Router: Debug {
+    fn route(&self, invokers: Vec<Url>, url: Url, invocation: Arc<RpcInvocation>) -> Vec<Url>;
 }
 
-pub struct Metadata {
-    pub name: &'static str,
-}
+pub type BoxRouter = Box<dyn Router + Sync + Send>;
 
-impl Metadata {
-    pub fn new(name: &'static str) -> Self {
-        Metadata { name }
+#[derive(Debug, Default, Clone)]
+pub struct MockRouter {}
+
+impl Router for MockRouter {
+    fn route(&self, invokers: Vec<Url>, _url: Url, _invocation: Arc<RpcInvocation>) -> Vec<Url> {
+        invokers
     }
 }
diff --git a/dubbo/src/cluster/loadbalance/impls/mod.rs b/dubbo/src/cluster/router/nacos_config_center/mod.rs
similarity index 95%
rename from dubbo/src/cluster/loadbalance/impls/mod.rs
rename to dubbo/src/cluster/router/nacos_config_center/mod.rs
index 5a84af8..7172231 100644
--- a/dubbo/src/cluster/loadbalance/impls/mod.rs
+++ b/dubbo/src/cluster/router/nacos_config_center/mod.rs
@@ -14,5 +14,4 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-pub mod random;
-pub mod roundrobin;
+pub mod nacos_client;
diff --git a/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs b/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs
new file mode 100644
index 0000000..68b5f09
--- /dev/null
+++ b/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use crate::cluster::router::manager::router_manager::{
+    get_global_router_manager, RouterConfigChangeEvent,
+};
+use dubbo_config::router::NacosConfig;
+use dubbo_logger::{tracing, tracing::info};
+use nacos_sdk::api::{
+    config::{ConfigChangeListener, ConfigResponse, ConfigService, ConfigServiceBuilder},
+    props::ClientProps,
+};
+use std::sync::{Arc, RwLock};
+
+pub struct NacosClient {
+    pub client: Arc<RwLock<dyn ConfigService>>,
+}
+
+unsafe impl Send for NacosClient {}
+
+unsafe impl Sync for NacosClient {}
+
+pub struct ConfigChangeListenerImpl;
+
+impl NacosClient {
+    pub fn new_init_client(config: NacosConfig) -> Self {
+        let server_addr = config.addr;
+        let namespace = config.namespace;
+        let app = config.app;
+        let enable_auth = config.enable_auth;
+
+        let mut props = ClientProps::new()
+            .server_addr(server_addr)
+            .namespace(namespace)
+            .app_name(app);
+
+        if enable_auth.is_some() {
+            info!("enable nacos auth!");
+        } else {
+            info!("disable nacos auth!");
+        }
+
+        if let Some(auth) = enable_auth {
+            props = props
+                .auth_username(auth.auth_username)
+                .auth_password(auth.auth_password);
+        }
+
+        let client = Arc::new(RwLock::new(
+            ConfigServiceBuilder::new(props)
+                .build()
+                .expect("NacosClient build failed! Please check NacosConfig"),
+        ));
+
+        Self { client }
+    }
+
+    pub fn get_config<T>(&self, data_id: &str, group: &str, config_type: &str) -> Option<T>
+    where
+        T: serde::de::DeserializeOwned,
+    {
+        let config_resp = self
+            .client
+            .read()
+            .unwrap()
+            .get_config(data_id.to_string(), group.to_string());
+
+        match config_resp {
+            Ok(config_resp) => {
+                self.add_listener(data_id, group);
+                let string = config_resp.content();
+                let result = serde_yaml::from_str(string);
+
+                match result {
+                    Ok(config) => {
+                        info!(
+                            "success to get {}Router config and parse success",
+                            config_type
+                        );
+                        Some(config)
+                    }
+                    Err(_) => {
+                        info!("failed to parse {}Router rule", config_type);
+                        None
+                    }
+                }
+            }
+            Err(_) => None,
+        }
+    }
+
+    pub fn add_listener(&self, data_id: &str, group: &str) {
+        if let Err(err) = self
+            .client
+            .write()
+            .map_err(|e| format!("failed to create nacos config listener: {}", e))
+            .and_then(|client| {
+                client
+                    .add_listener(
+                        data_id.to_string(),
+                        group.to_string(),
+                        Arc::new(ConfigChangeListenerImpl {}),
+                    )
+                    .map_err(|e| format!("failed to add nacos config listener: {}", e))
+            })
+        {
+            tracing::error!("{}", err);
+        } else {
+            info!("listening the config success");
+        }
+    }
+}
+
+impl ConfigChangeListener for ConfigChangeListenerImpl {
+    fn notify(&self, config_resp: ConfigResponse) {
+        let content_type = config_resp.content_type();
+        let event = RouterConfigChangeEvent {
+            service_name: config_resp.data_id().to_string(),
+            router_kind: config_resp.group().to_string(),
+            content: config_resp.content().to_string(),
+        };
+
+        if content_type == "yaml" {
+            get_global_router_manager().write().unwrap().notify(event);
+        }
+
+        info!("notify config: {:?}", config_resp);
+    }
+}
diff --git a/dubbo/src/cluster/router/router_chain.rs b/dubbo/src/cluster/router/router_chain.rs
new file mode 100644
index 0000000..601bc5e
--- /dev/null
+++ b/dubbo/src/cluster/router/router_chain.rs
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use crate::{cluster::router::BoxRouter, invocation::RpcInvocation};
+use dubbo_base::Url;
+use std::{collections::HashMap, sync::Arc};
+
+#[derive(Debug, Default)]
+pub struct RouterChain {
+    pub routers: HashMap<String, BoxRouter>,
+    pub self_url: Url,
+}
+
+impl RouterChain {
+    pub fn new() -> Self {
+        RouterChain {
+            routers: HashMap::new(),
+            self_url: Url::new(),
+        }
+    }
+
+    pub fn route(&self, mut invokers: Vec<Url>, invocation: Arc<RpcInvocation>) -> Vec<Url> {
+        for (_, value) in self.routers.iter() {
+            invokers = value.route(invokers, self.self_url.clone(), invocation.clone())
+        }
+        invokers
+    }
+
+    pub fn add_router(&mut self, key: String, router: BoxRouter) {
+        self.routers.insert(key, router);
+    }
+}
+
+#[test]
+fn test() {
+    use crate::cluster::router::manager::router_manager::get_global_router_manager;
+
+    let u1 = Url::from_url("tri://127.0.0.1:8888/org.apache.dubbo.sample.tri.Greeter").unwrap();
+    let u2 = Url::from_url("tri://127.0.0.1:8889/org.apache.dubbo.sample.tri.Greeter").unwrap();
+    let u3 = Url::from_url("tri://127.0.0.1:8800/org.apache.dubbo.sample.tri.Greeter").unwrap();
+    let u4 = Url::from_url("tri://127.0.2.1:8880/org.apache.dubbo.sample.tri.Greeter").unwrap();
+    let u5 = Url::from_url("tri://127.0.1.1:8882/org.apache.dubbo.sample.tri.Greeter").unwrap();
+    let u6 = Url::from_url("tri://213.0.1.1:8888/org.apache.dubbo.sample.tri.Greeter").unwrap();
+    let u7 = Url::from_url("tri://169.0.1.1:8887/org.apache.dubbo.sample.tri.Greeter").unwrap();
+    let invs = vec![u1, u2, u3, u4, u5, u6, u7];
+    let len = invs.len().clone();
+    let inv = Arc::new(
+        RpcInvocation::default()
+            .with_method_name("greet".to_string())
+            .with_service_unique_name("org.apache.dubbo.sample.tri.Greeter".to_string()),
+    );
+    let x = get_global_router_manager()
+        .read()
+        .unwrap()
+        .get_router_chain(inv.get_target_service_unique_name());
+    let result = x.route(invs, inv.clone());
+    println!("total:{},result:{}", len, result.len().clone());
+    dbg!(result);
+}
diff --git a/dubbo/src/cluster/loadbalance/impls/mod.rs b/dubbo/src/cluster/router/tag/mod.rs
similarity index 95%
copy from dubbo/src/cluster/loadbalance/impls/mod.rs
copy to dubbo/src/cluster/router/tag/mod.rs
index 5a84af8..673a720 100644
--- a/dubbo/src/cluster/loadbalance/impls/mod.rs
+++ b/dubbo/src/cluster/router/tag/mod.rs
@@ -14,5 +14,4 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-pub mod random;
-pub mod roundrobin;
+pub mod tag_router;
diff --git a/dubbo/src/cluster/router/tag/tag_router.rs b/dubbo/src/cluster/router/tag/tag_router.rs
new file mode 100644
index 0000000..3d28f93
--- /dev/null
+++ b/dubbo/src/cluster/router/tag/tag_router.rs
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use crate::{
+    cluster::router::{utils::to_original_map, Router},
+    codegen::RpcInvocation,
+};
+use dubbo_base::Url;
+use dubbo_config::router::TagRouterConfig;
+use std::{
+    collections::HashMap,
+    fmt::Debug,
+    sync::{Arc, RwLock},
+};
+
+#[derive(Debug, Clone, Default)]
+pub struct TagRouterInner {
+    pub tag_rules: HashMap<String, HashMap<String, String>>,
+    pub force: bool,
+    pub enabled: bool,
+}
+
+#[derive(Debug, Clone, Default)]
+pub struct TagRouter {
+    pub(crate) inner: Arc<RwLock<TagRouterInner>>,
+}
+impl Router for TagRouter {
+    fn route(&self, invokers: Vec<Url>, url: Url, invocation: Arc<RpcInvocation>) -> Vec<Url> {
+        return self.inner.read().unwrap().route(invokers, url, invocation);
+    }
+}
+
+impl TagRouterInner {
+    pub fn parse_config(&mut self, config: TagRouterConfig) {
+        self.tag_rules = HashMap::new();
+        self.force = config.force;
+        self.enabled = config.enabled;
+        for tag in &config.tags {
+            let mut tags = HashMap::new();
+            for rule in &tag.matches {
+                tags.insert(rule.key.clone(), rule.value.clone());
+            }
+            self.tag_rules.insert(tag.name.clone(), tags);
+        }
+    }
+
+    pub fn match_tag(&self, params: HashMap<String, String>) -> Option<String> {
+        let mut tag_result = None;
+        for (tag, tag_rules) in &self.tag_rules {
+            for (key, value) in tag_rules {
+                match params.get(key.as_str()) {
+                    None => {}
+                    Some(val) => {
+                        if val == value {
+                            tag_result = Some(tag.clone())
+                        }
+                    }
+                }
+            }
+        }
+        tag_result
+    }
+
+    pub fn route(&self, invokers: Vec<Url>, url: Url, _invocation: Arc<RpcInvocation>) -> Vec<Url> {
+        if !self.enabled {
+            return invokers;
+        };
+        let self_param = to_original_map(url);
+        let invocation_tag = self.match_tag(self_param);
+        let mut invokers_result = Vec::new();
+        let mut invokers_no_tag = Vec::new();
+        for invoker in &invokers {
+            let invoker_param = to_original_map(invoker.clone());
+            let invoker_tag = self.match_tag(invoker_param);
+            if invoker_tag == None {
+                invokers_no_tag.push(invoker.clone());
+            }
+            if invoker_tag == invocation_tag {
+                invokers_result.push(invoker.clone());
+            }
+        }
+        if invokers_result.is_empty() {
+            if !self.force {
+                return invokers_no_tag;
+            }
+        }
+        invokers_result
+    }
+}
diff --git a/dubbo/src/cluster/router/utils.rs b/dubbo/src/cluster/router/utils.rs
new file mode 100644
index 0000000..eca98f6
--- /dev/null
+++ b/dubbo/src/cluster/router/utils.rs
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use dubbo_base::Url;
+use std::{collections::HashMap, string::String};
+
+pub fn to_original_map(url: Url) -> HashMap<String, String> {
+    let mut result: HashMap<String, String> = HashMap::new();
+    result.insert("scheme".parse().unwrap(), url.scheme);
+    result.insert("location".parse().unwrap(), url.location);
+    result.insert("ip".parse().unwrap(), url.ip);
+    result.insert("port".parse().unwrap(), url.port);
+    result.insert("service_name".parse().unwrap(), url.service_name);
+    result.insert("service_key".parse().unwrap(), url.service_key);
+    for (key, value) in url.params {
+        result.insert(key, value);
+    }
+    result
+}
diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs
index 8d95c21..77f8f5a 100644
--- a/dubbo/src/codegen.rs
+++ b/dubbo/src/codegen.rs
@@ -22,16 +22,15 @@
 
 pub use async_trait::async_trait;
 pub use bytes::Bytes;
+pub use dubbo_base::StdError;
 pub use http_body::Body;
 pub use hyper::Body as hyperBody;
 pub use tower_service::Service;
 
 pub use super::{
-    cluster::directory::RegistryDirectory,
     empty_body,
     invocation::{IntoStreamingRequest, Request, Response, RpcInvocation},
     protocol::{triple::triple_invoker::TripleInvoker, Invoker},
-    registry::{BoxRegistry, Registry},
     triple::{
         client::TripleClient,
         codec::{prost::ProstCodec, serde_codec::SerdeCodec, Codec},
@@ -41,13 +40,12 @@
             TripleServer,
         },
     },
-    BoxBody, BoxFuture, StdError,
+    BoxBody, BoxFuture,
 };
 pub use crate::{
     filter::{service::FilterService, Filter},
     triple::{
-        client::builder::{ClientBoxService, ClientBuilder},
-        server::builder::ServerBuilder,
+        client::builder::ClientBuilder, server::builder::ServerBuilder,
         transport::connection::Connection,
     },
 };
diff --git a/dubbo/src/directory/mod.rs b/dubbo/src/directory/mod.rs
new file mode 100644
index 0000000..ace67e6
--- /dev/null
+++ b/dubbo/src/directory/mod.rs
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::{
+    collections::HashMap,
+    pin::Pin,
+    sync::{Arc, Mutex},
+    task::{Context, Poll},
+};
+
+use crate::{
+    codegen::{RpcInvocation, TripleInvoker},
+    invocation::Invocation,
+    invoker::{clone_invoker::CloneInvoker, NewInvoker},
+    param::Param,
+    svc::NewService,
+};
+use dubbo_base::{StdError, Url};
+use dubbo_logger::tracing::{debug, error};
+use futures_util::future;
+use tokio::sync::mpsc::channel;
+use tokio_stream::wrappers::ReceiverStream;
+use tower::{
+    buffer::Buffer,
+    discover::{Change, Discover},
+    ServiceExt,
+};
+
+use crate::extension::registry_extension::{proxy::RegistryProxy, Registry};
+use dubbo_base::registry_param::InterfaceName;
+use tower_service::Service;
+
+type BufferedDirectory =
+    Buffer<Directory<ReceiverStream<Result<Change<String, ()>, StdError>>>, ()>;
+
+pub struct NewCachedDirectory<N>
+where
+    N: Service<(), Response = RegistryProxy> + Send + Clone + 'static,
+    <N as Service<()>>::Future: Send + 'static,
+{
+    inner: CachedDirectory<NewDirectory<N>, RpcInvocation>,
+}
+
+pub struct CachedDirectory<N, T>
+where
+    // NewDirectory
+    N: NewService<T>,
+{
+    inner: N,
+    cache: Arc<Mutex<HashMap<String, N::Service>>>,
+}
+
+pub struct NewDirectory<N> {
+    // registry
+    inner: N,
+}
+
+pub struct Directory<D> {
+    directory: HashMap<String, CloneInvoker<TripleInvoker>>,
+    discover: D,
+    new_invoker: NewInvoker,
+}
+
+impl<N> NewCachedDirectory<N>
+where
+    N: Service<(), Response = RegistryProxy> + Send + Clone + 'static,
+    <N as Service<()>>::Future: Send + 'static,
+{
+    pub fn layer() -> impl tower_layer::Layer<N, Service = Self> {
+        tower_layer::layer_fn(|inner: N| {
+            NewCachedDirectory {
+                // inner is registry
+                inner: CachedDirectory::new(NewDirectory::new(inner)),
+            }
+        })
+    }
+}
+
+impl<N, T> NewService<T> for NewCachedDirectory<N>
+where
+    T: Param<RpcInvocation>,
+    // service registry
+    N: Service<(), Response = RegistryProxy> + Send + Clone + 'static,
+    <N as Service<()>>::Future: Send + 'static,
+{
+    type Service = BufferedDirectory;
+
+    fn new_service(&self, target: T) -> Self::Service {
+        self.inner.new_service(target.param())
+    }
+}
+
+impl<N, T> CachedDirectory<N, T>
+where
+    N: NewService<T>,
+{
+    pub fn new(inner: N) -> Self {
+        CachedDirectory {
+            inner,
+            cache: Default::default(),
+        }
+    }
+}
+
+impl<N, T> NewService<T> for CachedDirectory<N, T>
+where
+    T: Param<RpcInvocation>,
+    // NewDirectory
+    N: NewService<T>,
+    // Buffered directory
+    N::Service: Clone,
+{
+    type Service = N::Service;
+
+    fn new_service(&self, target: T) -> Self::Service {
+        let rpc_invocation = target.param();
+        let service_name = rpc_invocation.get_target_service_unique_name();
+        let mut cache = self.cache.lock().expect("cached directory lock failed.");
+        let value = cache.get(&service_name).map(|val| val.clone());
+        match value {
+            None => {
+                let new_service = self.inner.new_service(target);
+                cache.insert(service_name, new_service.clone());
+                new_service
+            }
+            Some(value) => value,
+        }
+    }
+}
+
+impl<N> NewDirectory<N> {
+    const MAX_DIRECTORY_BUFFER_SIZE: usize = 16;
+
+    pub fn new(inner: N) -> Self {
+        NewDirectory { inner }
+    }
+}
+
+impl<N, T> NewService<T> for NewDirectory<N>
+where
+    T: Param<RpcInvocation>,
+    // service registry
+    N: Service<(), Response = RegistryProxy> + Send + Clone + 'static,
+    <N as Service<()>>::Future: Send + 'static,
+{
+    type Service = BufferedDirectory;
+
+    fn new_service(&self, target: T) -> Self::Service {
+        let service_name = target.param().get_target_service_unique_name();
+
+        let fut = self.inner.clone().oneshot(());
+
+        let (tx, rx) = channel(Self::MAX_DIRECTORY_BUFFER_SIZE);
+
+        tokio::spawn(async move {
+            // todo use dubbo url model generate subscribe url
+            // category:serviceInterface:version:group
+            let consumer_url = format!("consumer://{}/{}", "127.0.0.1:8888", service_name);
+            let mut subscribe_url: Url = consumer_url.parse().unwrap();
+            subscribe_url.add_query_param(InterfaceName::new(service_name));
+
+            let Ok(registry) = fut.await else {
+                error!("registry extension load failed.");
+                return;
+            };
+
+            let receiver = registry.subscribe(subscribe_url).await;
+            debug!("discover start!");
+            match receiver {
+                Err(_e) => {
+                    // error!("discover stream error: {}", e);
+                    debug!("discover stream error");
+                }
+                Ok(mut receiver) => loop {
+                    let change = receiver.recv().await;
+                    debug!("receive change: {:?}", change);
+                    match change {
+                        None => {
+                            debug!("discover stream closed.");
+                            break;
+                        }
+                        Some(change) => {
+                            let _ = tx.send(change).await;
+                        }
+                    }
+                },
+            }
+        });
+
+        Buffer::new(
+            Directory::new(ReceiverStream::new(rx)),
+            Self::MAX_DIRECTORY_BUFFER_SIZE,
+        )
+    }
+}
+
+impl<D> Directory<D> {
+    pub fn new(discover: D) -> Self {
+        Directory {
+            directory: Default::default(),
+            discover,
+            new_invoker: NewInvoker,
+        }
+    }
+}
+
+impl<D> Service<()> for Directory<D>
+where
+    // Discover
+    D: Discover<Key = String> + Unpin + Send,
+    D::Error: Into<StdError>,
+{
+    type Response = Vec<CloneInvoker<TripleInvoker>>;
+
+    type Error = StdError;
+
+    type Future = future::Ready<Result<Self::Response, Self::Error>>;
+
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        loop {
+            let pin_discover = Pin::new(&mut self.discover);
+
+            match pin_discover.poll_discover(cx) {
+                Poll::Pending => {
+                    if self.directory.is_empty() {
+                        return Poll::Pending;
+                    } else {
+                        return Poll::Ready(Ok(()));
+                    }
+                }
+                Poll::Ready(change) => {
+                    let change = change.transpose().map_err(|e| e.into())?;
+                    match change {
+                        Some(Change::Remove(key)) => {
+                            debug!("remove key: {}", key);
+                            self.directory.remove(&key);
+                        }
+                        Some(Change::Insert(key, _)) => {
+                            debug!("insert key: {}", key);
+                            let invoker = self.new_invoker.new_service(key.clone());
+                            self.directory.insert(key, invoker);
+                        }
+                        None => {
+                            debug!("stream closed");
+                            return Poll::Ready(Ok(()));
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    fn call(&mut self, _: ()) -> Self::Future {
+        let vec = self
+            .directory
+            .values()
+            .map(|val| val.clone())
+            .collect::<Vec<CloneInvoker<TripleInvoker>>>();
+        future::ok(vec)
+    }
+}
diff --git a/dubbo/src/extension/mod.rs b/dubbo/src/extension/mod.rs
new file mode 100644
index 0000000..c1d0395
--- /dev/null
+++ b/dubbo/src/extension/mod.rs
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+pub mod registry_extension;
+
+use crate::{
+    extension::registry_extension::proxy::RegistryProxy, registry::registry::StaticRegistry,
+};
+use dubbo_base::{extension_param::ExtensionType, url::UrlParam, StdError, Url};
+use dubbo_logger::tracing::{error, info};
+use std::{future::Future, pin::Pin, sync::Arc};
+use thiserror::Error;
+use tokio::sync::{oneshot, RwLock};
+
+pub static EXTENSIONS: once_cell::sync::Lazy<ExtensionDirectoryCommander> =
+    once_cell::sync::Lazy::new(|| ExtensionDirectory::init());
+
+#[derive(Default)]
+struct ExtensionDirectory {
+    registry_extension_loader: registry_extension::RegistryExtensionLoader,
+}
+
+impl ExtensionDirectory {
+    fn init() -> ExtensionDirectoryCommander {
+        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExtensionOpt>(64);
+
+        tokio::spawn(async move {
+            let mut extension_directory = ExtensionDirectory::default();
+
+            // register static registry extension
+            let _ = extension_directory.register(
+                StaticRegistry::name(),
+                StaticRegistry::convert_to_extension_factories(),
+                ExtensionType::Registry,
+            );
+
+            while let Some(extension_opt) = rx.recv().await {
+                match extension_opt {
+                    ExtensionOpt::Register(
+                        extension_name,
+                        extension_factories,
+                        extension_type,
+                        tx,
+                    ) => {
+                        let result = extension_directory.register(
+                            extension_name,
+                            extension_factories,
+                            extension_type,
+                        );
+                        let _ = tx.send(result);
+                    }
+                    ExtensionOpt::Remove(extension_name, extension_type, tx) => {
+                        let result = extension_directory.remove(extension_name, extension_type);
+                        let _ = tx.send(result);
+                    }
+                    ExtensionOpt::Load(url, extension_type, tx) => {
+                        let _ = extension_directory.load(url, extension_type, tx);
+                    }
+                }
+            }
+        });
+
+        ExtensionDirectoryCommander { sender: tx }
+    }
+
+    fn register(
+        &mut self,
+        extension_name: String,
+        extension_factories: ExtensionFactories,
+        extension_type: ExtensionType,
+    ) -> Result<(), StdError> {
+        match extension_type {
+            ExtensionType::Registry => match extension_factories {
+                ExtensionFactories::RegistryExtensionFactory(registry_extension_factory) => {
+                    self.registry_extension_loader
+                        .register(extension_name, registry_extension_factory);
+                    Ok(())
+                }
+            },
+        }
+    }
+
+    fn remove(
+        &mut self,
+        extension_name: String,
+        extension_type: ExtensionType,
+    ) -> Result<(), StdError> {
+        match extension_type {
+            ExtensionType::Registry => {
+                self.registry_extension_loader.remove(extension_name);
+                Ok(())
+            }
+        }
+    }
+
+    fn load(
+        &mut self,
+        url: Url,
+        extension_type: ExtensionType,
+        callback: oneshot::Sender<Result<Extensions, StdError>>,
+    ) {
+        match extension_type {
+            ExtensionType::Registry => {
+                let extension = self.registry_extension_loader.load(url);
+                match extension {
+                    Ok(mut extension) => {
+                        tokio::spawn(async move {
+                            let extension = extension.resolve().await;
+                            match extension {
+                                Ok(extension) => {
+                                    let _ = callback.send(Ok(Extensions::Registry(extension)));
+                                }
+                                Err(err) => {
+                                    error!("load extension failed: {}", err);
+                                    let _ = callback.send(Err(err));
+                                }
+                            }
+                        });
+                    }
+                    Err(err) => {
+                        error!("load extension failed: {}", err);
+                        let _ = callback.send(Err(err));
+                    }
+                }
+            }
+        }
+    }
+}
+
+type ExtensionCreator<T> = Box<
+    dyn Fn(Url) -> Pin<Box<dyn Future<Output = Result<T, StdError>> + Send + 'static>>
+        + Send
+        + Sync
+        + 'static,
+>;
+pub(crate) struct ExtensionPromiseResolver<T> {
+    resolved_data: Option<T>,
+    creator: ExtensionCreator<T>,
+    url: Url,
+}
+
+impl<T> ExtensionPromiseResolver<T>
+where
+    T: Send + Clone + 'static,
+{
+    fn new(creator: ExtensionCreator<T>, url: Url) -> Self {
+        ExtensionPromiseResolver {
+            resolved_data: None,
+            creator,
+            url,
+        }
+    }
+
+    fn resolved_data(&self) -> Option<T> {
+        self.resolved_data.clone()
+    }
+
+    async fn resolve(&mut self) -> Result<T, StdError> {
+        match (self.creator)(self.url.clone()).await {
+            Ok(data) => {
+                self.resolved_data = Some(data.clone());
+                Ok(data)
+            }
+            Err(err) => {
+                error!("create extension failed: {}", err);
+                Err(LoadExtensionError::new(
+                    "load extension failed, create extension occur an error".to_string(),
+                )
+                .into())
+            }
+        }
+    }
+}
+
+pub(crate) struct LoadExtensionPromise<T> {
+    resolver: Arc<RwLock<ExtensionPromiseResolver<T>>>,
+}
+
+impl<T> LoadExtensionPromise<T>
+where
+    T: Send + Clone + 'static,
+{
+    pub(crate) fn new(creator: ExtensionCreator<T>, url: Url) -> Self {
+        let resolver = ExtensionPromiseResolver::new(creator, url);
+        LoadExtensionPromise {
+            resolver: Arc::new(RwLock::new(resolver)),
+        }
+    }
+
+    pub(crate) async fn resolve(&mut self) -> Result<T, StdError> {
+        // get read lock
+        let resolver_read_lock = self.resolver.read().await;
+        // if extension is not None, return it
+        if let Some(extension) = resolver_read_lock.resolved_data() {
+            return Ok(extension);
+        }
+        drop(resolver_read_lock);
+
+        let mut write_lock = self.resolver.write().await;
+
+        match write_lock.resolved_data() {
+            Some(extension) => Ok(extension),
+            None => {
+                let extension = write_lock.resolve().await;
+                extension
+            }
+        }
+    }
+}
+
+impl<T> Clone for LoadExtensionPromise<T> {
+    fn clone(&self) -> Self {
+        LoadExtensionPromise {
+            resolver: self.resolver.clone(),
+        }
+    }
+}
+
+pub struct ExtensionDirectoryCommander {
+    sender: tokio::sync::mpsc::Sender<ExtensionOpt>,
+}
+
+impl ExtensionDirectoryCommander {
+    #[allow(private_bounds)]
+    pub async fn register<T>(&self) -> Result<(), StdError>
+    where
+        T: Extension,
+        T: ExtensionMetaInfo,
+        T: ConvertToExtensionFactories,
+    {
+        let extension_name = T::name();
+        let extension_factories = T::convert_to_extension_factories();
+        let extension_type = T::extension_type();
+
+        info!(
+            "register extension: {}, type: {}",
+            extension_name,
+            extension_type.as_str()
+        );
+
+        let (tx, rx) = oneshot::channel();
+
+        let send = self
+            .sender
+            .send(ExtensionOpt::Register(
+                extension_name.clone(),
+                extension_factories,
+                extension_type,
+                tx,
+            ))
+            .await;
+
+        let Ok(_) = send else {
+            let err_msg = format!("register extension {} failed", extension_name);
+            return Err(RegisterExtensionError::new(err_msg).into());
+        };
+
+        let ret = rx.await;
+
+        match ret {
+            Ok(_) => Ok(()),
+            Err(_) => {
+                let err_msg = format!("register extension {} failed", extension_name);
+                Err(RegisterExtensionError::new(err_msg).into())
+            }
+        }
+    }
+
+    #[allow(private_bounds)]
+    pub async fn remove<T>(&self) -> Result<(), StdError>
+    where
+        T: Extension,
+        T: ExtensionMetaInfo,
+    {
+        let extension_name = T::name();
+        let extension_type = T::extension_type();
+
+        info!(
+            "remove extension: {}, type: {}",
+            extension_name,
+            extension_type.as_str()
+        );
+
+        let (tx, rx) = oneshot::channel();
+
+        let send = self
+            .sender
+            .send(ExtensionOpt::Remove(
+                extension_name.clone(),
+                extension_type,
+                tx,
+            ))
+            .await;
+
+        let Ok(_) = send else {
+            let err_msg = format!("remove extension {} failed", extension_name);
+            return Err(RemoveExtensionError::new(err_msg).into());
+        };
+
+        let ret = rx.await;
+
+        match ret {
+            Ok(_) => Ok(()),
+            Err(_) => {
+                let err_msg = format!("remove extension {} failed", extension_name);
+                Err(RemoveExtensionError::new(err_msg).into())
+            }
+        }
+    }
+
+    pub async fn load_registry(&self, url: Url) -> Result<RegistryProxy, StdError> {
+        let url_str = url.to_string();
+        info!("load registry extension: {}", url_str);
+
+        let (tx, rx) = oneshot::channel();
+
+        let send = self
+            .sender
+            .send(ExtensionOpt::Load(url, ExtensionType::Registry, tx))
+            .await;
+
+        let Ok(_) = send else {
+            let err_msg = format!("load registry extension failed: {}", url_str);
+            return Err(LoadExtensionError::new(err_msg).into());
+        };
+
+        let extensions = rx.await;
+
+        let Ok(extension) = extensions else {
+            let err_msg = format!("load registry extension failed: {}", url_str);
+            return Err(LoadExtensionError::new(err_msg).into());
+        };
+
+        let Ok(extensions) = extension else {
+            let err_msg = format!("load registry extension failed: {}", url_str);
+            return Err(LoadExtensionError::new(err_msg).into());
+        };
+
+        match extensions {
+            Extensions::Registry(proxy) => Ok(proxy),
+        }
+    }
+}
+
+enum ExtensionOpt {
+    Register(
+        String,
+        ExtensionFactories,
+        ExtensionType,
+        oneshot::Sender<Result<(), StdError>>,
+    ),
+    Remove(String, ExtensionType, oneshot::Sender<Result<(), StdError>>),
+    Load(
+        Url,
+        ExtensionType,
+        oneshot::Sender<Result<Extensions, StdError>>,
+    ),
+}
+
+pub(crate) trait Sealed {}
+
+#[allow(private_bounds)]
+#[async_trait::async_trait]
+pub trait Extension: Sealed {
+    type Target;
+
+    fn name() -> String;
+
+    async fn create(url: Url) -> Result<Self::Target, StdError>;
+}
+
+#[allow(private_bounds)]
+pub(crate) trait ExtensionMetaInfo {
+    fn extension_type() -> ExtensionType;
+}
+
+pub(crate) enum Extensions {
+    Registry(RegistryProxy),
+}
+
+pub(crate) enum ExtensionFactories {
+    RegistryExtensionFactory(registry_extension::RegistryExtensionFactory),
+}
+
+#[allow(private_bounds)]
+pub(crate) trait ConvertToExtensionFactories {
+    fn convert_to_extension_factories() -> ExtensionFactories;
+}
+
+#[derive(Error, Debug)]
+#[error("{0}")]
+pub(crate) struct RegisterExtensionError(String);
+
+impl RegisterExtensionError {
+    pub fn new(msg: String) -> Self {
+        RegisterExtensionError(msg)
+    }
+}
+
+#[derive(Error, Debug)]
+#[error("{0}")]
+pub struct RemoveExtensionError(String);
+
+impl RemoveExtensionError {
+    pub fn new(msg: String) -> Self {
+        RemoveExtensionError(msg)
+    }
+}
+
+#[derive(Error, Debug)]
+#[error("{0}")]
+pub struct LoadExtensionError(String);
+
+impl LoadExtensionError {
+    pub fn new(msg: String) -> Self {
+        LoadExtensionError(msg)
+    }
+}
diff --git a/dubbo/src/extension/registry_extension.rs b/dubbo/src/extension/registry_extension.rs
new file mode 100644
index 0000000..b1f75ba
--- /dev/null
+++ b/dubbo/src/extension/registry_extension.rs
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::{collections::HashMap, future::Future, pin::Pin};
+
+use async_trait::async_trait;
+use thiserror::Error;
+use tokio::sync::mpsc::Receiver;
+use tower::discover::Change;
+
+use dubbo_base::{
+    extension_param::ExtensionName, registry_param::RegistryUrl, url::UrlParam, StdError, Url,
+};
+use proxy::RegistryProxy;
+
+use crate::extension::{
+    ConvertToExtensionFactories, Extension, ExtensionFactories, ExtensionMetaInfo, ExtensionType,
+    LoadExtensionPromise,
+};
+
+// extension://0.0.0.0/?extension-type=registry&extension-name=nacos&registry-url=nacos://127.0.0.1:8848
+pub fn to_extension_url(registry_url: Url) -> Url {
+    let mut registry_extension_loader_url: Url = "extension://0.0.0.0".parse().unwrap();
+
+    let protocol = registry_url.protocol();
+
+    registry_extension_loader_url.add_query_param(ExtensionType::Registry);
+    registry_extension_loader_url.add_query_param(ExtensionName::new(protocol.to_string()));
+    registry_extension_loader_url.add_query_param(RegistryUrl::new(registry_url));
+
+    registry_extension_loader_url
+}
+
+pub type ServiceChange = Change<String, ()>;
+pub type DiscoverStream = Receiver<Result<ServiceChange, StdError>>;
+
+#[async_trait]
+pub trait Registry {
+    async fn register(&self, url: Url) -> Result<(), StdError>;
+
+    async fn unregister(&self, url: Url) -> Result<(), StdError>;
+
+    async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError>;
+
+    async fn unsubscribe(&self, url: Url) -> Result<(), StdError>;
+
+    fn url(&self) -> &Url;
+}
+
+impl<T> crate::extension::Sealed for T where T: Registry + Send + Sync + 'static {}
+
+impl<T> ExtensionMetaInfo for T
+where
+    T: Registry + Send + Sync + 'static,
+    T: Extension<Target = Box<dyn Registry + Send + Sync + 'static>>,
+{
+    fn extension_type() -> ExtensionType {
+        ExtensionType::Registry
+    }
+}
+
+impl<T> ConvertToExtensionFactories for T
+where
+    T: Registry + Send + Sync + 'static,
+    T: Extension<Target = Box<dyn Registry + Send + Sync + 'static>>,
+{
+    fn convert_to_extension_factories() -> ExtensionFactories {
+        ExtensionFactories::RegistryExtensionFactory(RegistryExtensionFactory::new(
+            <T as Extension>::create,
+        ))
+    }
+}
+
+#[derive(Default)]
+pub(super) struct RegistryExtensionLoader {
+    factories: HashMap<String, RegistryExtensionFactory>,
+}
+
+impl RegistryExtensionLoader {
+    pub(crate) fn register(&mut self, extension_name: String, factory: RegistryExtensionFactory) {
+        self.factories.insert(extension_name, factory);
+    }
+
+    pub(crate) fn remove(&mut self, extension_name: String) {
+        self.factories.remove(&extension_name);
+    }
+
+    pub(crate) fn load(
+        &mut self,
+        url: Url,
+    ) -> Result<LoadExtensionPromise<RegistryProxy>, StdError> {
+        let extension_name = url.query::<ExtensionName>().unwrap();
+        let extension_name = extension_name.value();
+        let factory = self.factories.get_mut(&extension_name).ok_or_else(|| {
+            RegistryExtensionLoaderError::new(format!(
+                "registry extension loader error: extension name {} not found",
+                extension_name
+            ))
+        })?;
+        factory.create(url)
+    }
+}
+
+type RegistryConstructor = fn(
+    Url,
+) -> Pin<
+    Box<dyn Future<Output = Result<Box<dyn Registry + Send + Sync + 'static>, StdError>> + Send>,
+>;
+
+pub(crate) struct RegistryExtensionFactory {
+    constructor: RegistryConstructor,
+    instances: HashMap<String, LoadExtensionPromise<RegistryProxy>>,
+}
+
+impl RegistryExtensionFactory {
+    pub(super) fn new(constructor: RegistryConstructor) -> Self {
+        Self {
+            constructor,
+            instances: HashMap::new(),
+        }
+    }
+}
+
+impl RegistryExtensionFactory {
+    pub(super) fn create(
+        &mut self,
+        url: Url,
+    ) -> Result<LoadExtensionPromise<RegistryProxy>, StdError> {
+        let registry_url = url.query::<RegistryUrl>().unwrap();
+        let registry_url = registry_url.value();
+        let url_str = registry_url.as_str().to_string();
+        match self.instances.get(&url_str) {
+            Some(proxy) => {
+                let proxy = proxy.clone();
+                Ok(proxy)
+            }
+            None => {
+                let constructor = self.constructor;
+
+                let creator = move |url: Url| {
+                    let registry = constructor(url);
+                    Box::pin(async move {
+                        let registry = registry.await?;
+                        let proxy = <RegistryProxy as From<Box<dyn Registry + Send + Sync>>>::from(
+                            registry,
+                        );
+                        Ok(proxy)
+                    })
+                        as Pin<
+                            Box<
+                                dyn Future<Output = Result<RegistryProxy, StdError>>
+                                    + Send
+                                    + 'static,
+                            >,
+                        >
+                };
+
+                let promise = LoadExtensionPromise::new(Box::new(creator), url);
+                self.instances.insert(url_str, promise.clone());
+                Ok(promise)
+            }
+        }
+    }
+}
+
+#[derive(Error, Debug)]
+#[error("{0}")]
+pub(crate) struct RegistryExtensionLoaderError(String);
+
+impl RegistryExtensionLoaderError {
+    pub(crate) fn new(msg: String) -> Self {
+        RegistryExtensionLoaderError(msg)
+    }
+}
+
+pub mod proxy {
+    use async_trait::async_trait;
+    use thiserror::Error;
+    use tokio::sync::oneshot;
+
+    use dubbo_base::{StdError, Url};
+    use dubbo_logger::tracing::error;
+
+    use crate::extension::registry_extension::{DiscoverStream, Registry};
+
+    pub(super) enum RegistryOpt {
+        Register(Url, oneshot::Sender<Result<(), StdError>>),
+        Unregister(Url, oneshot::Sender<Result<(), StdError>>),
+        Subscribe(Url, oneshot::Sender<Result<DiscoverStream, StdError>>),
+        UnSubscribe(Url, oneshot::Sender<Result<(), StdError>>),
+    }
+
+    #[derive(Clone)]
+    pub struct RegistryProxy {
+        sender: tokio::sync::mpsc::Sender<RegistryOpt>,
+        url: Url,
+    }
+
+    #[async_trait]
+    impl Registry for RegistryProxy {
+        async fn register(&self, url: Url) -> Result<(), StdError> {
+            let (tx, rx) = oneshot::channel();
+
+            match self
+                .sender
+                .send(RegistryOpt::Register(url.clone(), tx))
+                .await
+            {
+                Ok(_) => match rx.await {
+                    Ok(result) => result,
+                    Err(_) => {
+                        error!(
+                            "registry proxy error: receive register response failed, url: {}",
+                            url
+                        );
+                        return Err(
+                            RegistryProxyError::new("receive register response failed").into()
+                        );
+                    }
+                },
+                Err(_) => {
+                    error!(
+                        "registry proxy error: send register request failed, url: {}",
+                        url
+                    );
+                    return Err(RegistryProxyError::new("send register opt failed").into());
+                }
+            }
+        }
+
+        async fn unregister(&self, url: Url) -> Result<(), StdError> {
+            let (tx, rx) = oneshot::channel();
+            match self
+                .sender
+                .send(RegistryOpt::Unregister(url.clone(), tx))
+                .await
+            {
+                Ok(_) => match rx.await {
+                    Ok(result) => result,
+                    Err(_) => {
+                        error!(
+                            "registry proxy error: receive unregister response failed, url: {}",
+                            url
+                        );
+                        return Err(
+                            RegistryProxyError::new("receive unregister response failed").into(),
+                        );
+                    }
+                },
+                Err(_) => {
+                    error!(
+                        "registry proxy error: send unregister request failed, url: {}",
+                        url
+                    );
+                    return Err(RegistryProxyError::new("send unregister opt failed").into());
+                }
+            }
+        }
+
+        async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
+            let (tx, rx) = oneshot::channel();
+
+            match self
+                .sender
+                .send(RegistryOpt::Subscribe(url.clone(), tx))
+                .await
+            {
+                Ok(_) => match rx.await {
+                    Ok(result) => result,
+                    Err(_) => {
+                        error!(
+                            "registry proxy error: receive subscribe response failed, url: {}",
+                            url
+                        );
+                        return Err(
+                            RegistryProxyError::new("receive subscribe response failed").into()
+                        );
+                    }
+                },
+                Err(_) => {
+                    error!(
+                        "registry proxy error: send subscribe request failed, url: {}",
+                        url
+                    );
+                    return Err(RegistryProxyError::new("send subscribe opt failed").into());
+                }
+            }
+        }
+
+        async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
+            let (tx, rx) = oneshot::channel();
+            match self
+                .sender
+                .send(RegistryOpt::UnSubscribe(url.clone(), tx))
+                .await
+            {
+                Ok(_) => {
+                    match rx.await {
+                        Ok(result) => result,
+                        Err(_) => {
+                            error!("registry proxy error: receive unsubscribe response failed, url: {}", url);
+                            return Err(RegistryProxyError::new(
+                                "receive unsubscribe response failed",
+                            )
+                            .into());
+                        }
+                    }
+                }
+                Err(_) => {
+                    error!(
+                        "registry proxy error: send unsubscribe request failed, url: {}",
+                        url
+                    );
+                    return Err(RegistryProxyError::new("send unsubscribe opt failed").into());
+                }
+            }
+        }
+
+        fn url(&self) -> &Url {
+            &self.url
+        }
+    }
+
+    impl From<Box<dyn Registry + Send + Sync>> for RegistryProxy {
+        fn from(registry: Box<dyn Registry + Send + Sync>) -> Self {
+            let url = registry.url().clone();
+
+            let (sender, mut receiver) = tokio::sync::mpsc::channel(1024);
+
+            tokio::spawn(async move {
+                while let Some(opt) = receiver.recv().await {
+                    match opt {
+                        RegistryOpt::Register(url, tx) => {
+                            let register = registry.register(url).await;
+                            if let Err(_) = tx.send(register) {
+                                error!("registry proxy error: send register response failed");
+                            }
+                        }
+                        RegistryOpt::Unregister(url, tx) => {
+                            let unregister = registry.unregister(url).await;
+                            if let Err(_) = tx.send(unregister) {
+                                error!("registry proxy error: send unregister response failed");
+                            }
+                        }
+                        RegistryOpt::Subscribe(url, tx) => {
+                            let subscribe = registry.subscribe(url).await;
+                            if let Err(_) = tx.send(subscribe) {
+                                error!("registry proxy error: send subscribe response failed");
+                            }
+                        }
+                        RegistryOpt::UnSubscribe(url, tx) => {
+                            let unsubscribe = registry.unsubscribe(url).await;
+                            if let Err(_) = tx.send(unsubscribe) {
+                                error!("registry proxy error: send unsubscribe response failed");
+                            }
+                        }
+                    }
+                }
+            });
+
+            RegistryProxy { sender, url }
+        }
+    }
+
+    #[derive(Error, Debug)]
+    #[error("registry proxy error: {0}")]
+    pub(crate) struct RegistryProxyError(String);
+
+    impl RegistryProxyError {
+        pub(crate) fn new(msg: &str) -> Self {
+            RegistryProxyError(msg.to_string())
+        }
+    }
+}
diff --git a/dubbo/src/framework.rs b/dubbo/src/framework.rs
index d595f38..f3c6dc1 100644
--- a/dubbo/src/framework.rs
+++ b/dubbo/src/framework.rs
@@ -15,20 +15,13 @@
  * limitations under the License.
  */
 
-use std::{
-    collections::HashMap,
-    error::Error,
-    pin::Pin,
-    sync::{Arc, Mutex},
-};
+use std::{collections::HashMap, error::Error, pin::Pin};
 
 use crate::{
+    extension,
+    extension::registry_extension::Registry,
     protocol::{BoxExporter, Protocol},
-    registry::{
-        protocol::RegistryProtocol,
-        types::{Registries, RegistriesOperation},
-        BoxRegistry, Registry,
-    },
+    registry::protocol::RegistryProtocol,
 };
 use dubbo_base::Url;
 use dubbo_config::{get_global_config, protocol::ProtocolRetrieve, RootConfig};
@@ -40,7 +33,7 @@
 #[derive(Default)]
 pub struct Dubbo {
     protocols: HashMap<String, Vec<Url>>,
-    registries: Option<Registries>,
+    registries: Vec<Url>,
     service_registry: HashMap<String, Vec<Url>>, // registry: Urls
     config: Option<&'static RootConfig>,
 }
@@ -49,7 +42,7 @@
     pub fn new() -> Dubbo {
         Self {
             protocols: HashMap::new(),
-            registries: None,
+            registries: Vec::default(),
             service_registry: HashMap::new(),
             config: None,
         }
@@ -60,14 +53,10 @@
         self
     }
 
-    pub fn add_registry(mut self, registry_key: &str, registry: BoxRegistry) -> Self {
-        if self.registries.is_none() {
-            self.registries = Some(Arc::new(Mutex::new(HashMap::new())));
-        }
-        self.registries
-            .as_ref()
-            .unwrap()
-            .insert(registry_key.to_string(), Arc::new(Mutex::new(registry)));
+    pub fn add_registry(mut self, registry: &str) -> Self {
+        let url: Url = registry.parse().unwrap();
+        let url = extension::registry_extension::to_extension_url(url);
+        self.registries.push(url);
         self
     }
 
@@ -88,10 +77,15 @@
                 let protocol = root_config
                     .protocols
                     .get_protocol_or_default(service_config.protocol.as_str());
-                let protocol_url =
-                    format!("{}/{}", protocol.to_url(), service_config.interface.clone(),);
+                let interface_name = service_config.interface.clone();
+                let protocol_url = format!(
+                    "{}/{}?interface={}",
+                    protocol.to_url(),
+                    interface_name,
+                    interface_name
+                );
                 tracing::info!("protocol_url: {:?}", protocol_url);
-                Url::from_url(&protocol_url)
+                protocol_url.parse().ok()
             } else {
                 return Err(format!("base {:?} not exists", service_config.protocol).into());
             };
@@ -117,9 +111,20 @@
         self.init().unwrap();
         tracing::info!("starting...");
         // TODO: server registry
+
+        let mut registry_extensions = Vec::new();
+
+        for registry_url in &self.registries {
+            let registry_url = registry_url.clone();
+            let registry_extension = extension::EXTENSIONS.load_registry(registry_url).await;
+            if let Ok(registry_extension) = registry_extension {
+                registry_extensions.push(registry_extension);
+            }
+        }
+
         let mem_reg = Box::new(
             RegistryProtocol::new()
-                .with_registries(self.registries.as_ref().unwrap().clone())
+                .with_registries(registry_extensions.clone())
                 .with_services(self.service_registry.clone()),
         );
         let mut async_vec: Vec<Pin<Box<dyn Future<Output = BoxExporter> + Send>>> = Vec::new();
@@ -128,14 +133,10 @@
                 tracing::info!("base: {:?}, service url: {:?}", name, url);
                 let exporter = mem_reg.clone().export(url.to_owned());
                 async_vec.push(exporter);
+
                 //TODO multiple registry
-                if self.registries.is_some() {
-                    self.registries
-                        .as_ref()
-                        .unwrap()
-                        .default_registry()
-                        .register(url.clone())
-                        .unwrap();
+                for registry_extension in &registry_extensions {
+                    let _ = registry_extension.register(url.clone()).await;
                 }
             }
         }
diff --git a/dubbo/src/invocation.rs b/dubbo/src/invocation.rs
index bdd152b..5750337 100644
--- a/dubbo/src/invocation.rs
+++ b/dubbo/src/invocation.rs
@@ -196,7 +196,7 @@
     fn get_method_name(&self) -> String;
 }
 
-#[derive(Default)]
+#[derive(Default, Clone)]
 pub struct RpcInvocation {
     target_service_unique_name: String,
     method_name: String,
diff --git a/dubbo/src/invoker/clone_body.rs b/dubbo/src/invoker/clone_body.rs
new file mode 100644
index 0000000..913910a
--- /dev/null
+++ b/dubbo/src/invoker/clone_body.rs
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::{
+    collections::VecDeque,
+    pin::Pin,
+    sync::{Arc, Mutex},
+    task::{Context, Poll},
+};
+
+use bytes::{Buf, BufMut, Bytes, BytesMut};
+use futures_core::ready;
+
+use dubbo_base::StdError;
+use http::HeaderMap;
+use http_body::Body;
+use pin_project::pin_project;
+use thiserror::Error;
+
+#[derive(Error, Debug)]
+#[error("buffered body reach max capacity.")]
+pub struct ReachMaxCapacityError;
+
+pub struct BufferedBody {
+    shared: Arc<Mutex<Option<OwnedBufferedBody>>>,
+    owned: Option<OwnedBufferedBody>,
+    replay_body: bool,
+    replay_trailers: bool,
+    is_empty: bool,
+    size_hint: http_body::SizeHint,
+}
+
+pub struct OwnedBufferedBody {
+    body: hyper::Body,
+    trailers: Option<HeaderMap>,
+    buf: InnerBuffer,
+}
+
+impl BufferedBody {
+    pub fn new(body: hyper::Body, buf_size: usize) -> Self {
+        let size_hint = body.size_hint();
+        let is_empty = body.is_end_stream();
+        BufferedBody {
+            shared: Default::default(),
+            owned: Some(OwnedBufferedBody {
+                body,
+                trailers: None,
+                buf: InnerBuffer {
+                    bufs: Default::default(),
+                    capacity: buf_size,
+                },
+            }),
+            replay_body: false,
+            replay_trailers: false,
+            is_empty,
+            size_hint,
+        }
+    }
+}
+
+impl Clone for BufferedBody {
+    fn clone(&self) -> Self {
+        Self {
+            shared: self.shared.clone(),
+            owned: None,
+            replay_body: true,
+            replay_trailers: true,
+            is_empty: self.is_empty,
+            size_hint: self.size_hint.clone(),
+        }
+    }
+}
+
+impl Drop for BufferedBody {
+    fn drop(&mut self) {
+        if let Some(owned) = self.owned.take() {
+            let lock = self.shared.lock();
+            if let Ok(mut lock) = lock {
+                *lock = Some(owned);
+            }
+        }
+    }
+}
+
+impl Body for BufferedBody {
+    type Data = BytesData;
+    type Error = StdError;
+
+    fn poll_data(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+        let mut_self = self.get_mut();
+
+        let owned_body = mut_self.owned.get_or_insert_with(|| {
+            let lock = mut_self.shared.lock();
+            if let Err(e) = lock {
+                panic!("buffered body get shared data lock failed. {}", e);
+            }
+            let mut data = lock.unwrap();
+
+            data.take().expect("cannot get shared buffered body.")
+        });
+
+        if mut_self.replay_body {
+            mut_self.replay_body = false;
+            if owned_body.buf.has_remaining() {
+                return Poll::Ready(Some(Ok(BytesData::BufferedBytes(owned_body.buf.clone()))));
+            }
+
+            if owned_body.buf.is_capped() {
+                return Poll::Ready(Some(Err(ReachMaxCapacityError.into())));
+            }
+        }
+
+        if mut_self.is_empty {
+            return Poll::Ready(None);
+        }
+
+        let mut data = {
+            let pin = Pin::new(&mut owned_body.body);
+            let data = ready!(pin.poll_data(cx));
+            match data {
+                Some(Ok(data)) => data,
+                Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
+                None => {
+                    mut_self.is_empty = true;
+                    return Poll::Ready(None);
+                }
+            }
+        };
+
+        let len = data.remaining();
+
+        owned_body.buf.capacity = owned_body.buf.capacity.saturating_sub(len);
+
+        let data = if owned_body.buf.is_capped() {
+            if owned_body.buf.has_remaining() {
+                owned_body.buf.bufs = VecDeque::default();
+            }
+            data.copy_to_bytes(len)
+        } else {
+            owned_body.buf.push_bytes(data.copy_to_bytes(len))
+        };
+
+        Poll::Ready(Some(Ok(BytesData::OriginBytes(data))))
+    }
+
+    fn poll_trailers(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
+        let mut_self = self.get_mut();
+        let owned_body = mut_self.owned.get_or_insert_with(|| {
+            let lock = mut_self.shared.lock();
+            if let Err(e) = lock {
+                panic!("buffered body get shared data lock failed. {}", e);
+            }
+            let mut data = lock.unwrap();
+
+            data.take().expect("cannot get shared buffered body.")
+        });
+
+        if mut_self.replay_trailers {
+            mut_self.replay_trailers = false;
+            if let Some(ref trailers) = owned_body.trailers {
+                return Poll::Ready(Ok(Some(trailers.clone())));
+            }
+        }
+
+        let mut_body = &mut owned_body.body;
+        if !mut_body.is_end_stream() {
+            let trailers = ready!(Pin::new(mut_body).poll_trailers(cx)).map(|trailers| {
+                owned_body.trailers = trailers.clone();
+                trailers
+            });
+            return Poll::Ready(trailers.map_err(|e| e.into()));
+        }
+
+        Poll::Ready(Ok(None))
+    }
+
+    fn is_end_stream(&self) -> bool {
+        if self.is_empty {
+            return true;
+        }
+
+        let is_end = self
+            .owned
+            .as_ref()
+            .map(|owned| owned.body.is_end_stream())
+            .unwrap_or(false);
+
+        !self.replay_body && !self.replay_trailers && is_end
+    }
+
+    fn size_hint(&self) -> http_body::SizeHint {
+        self.size_hint.clone()
+    }
+}
+
+#[derive(Clone)]
+pub struct InnerBuffer {
+    bufs: VecDeque<Bytes>,
+    capacity: usize,
+}
+
+impl InnerBuffer {
+    pub fn push_bytes(&mut self, bytes: Bytes) -> Bytes {
+        self.bufs.push_back(bytes.clone());
+        bytes
+    }
+
+    pub fn is_capped(&self) -> bool {
+        self.capacity == 0
+    }
+}
+
+impl Buf for InnerBuffer {
+    fn remaining(&self) -> usize {
+        self.bufs.iter().map(|bytes| bytes.remaining()).sum()
+    }
+
+    fn chunk(&self) -> &[u8] {
+        self.bufs.front().map(Buf::chunk).unwrap_or(&[])
+    }
+
+    fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize {
+        if dst.is_empty() {
+            return 0;
+        }
+
+        let mut filled = 0;
+
+        for bytes in self.bufs.iter() {
+            filled += bytes.chunks_vectored(&mut dst[filled..])
+        }
+
+        filled
+    }
+
+    fn advance(&mut self, mut cnt: usize) {
+        while cnt > 0 {
+            let first = self.bufs.front_mut();
+            if first.is_none() {
+                break;
+            }
+            let first = first.unwrap();
+            let first_remaining = first.remaining();
+            if first_remaining > cnt {
+                first.advance(cnt);
+                break;
+            }
+
+            first.advance(first_remaining);
+            cnt = cnt - first_remaining;
+            self.bufs.pop_front();
+        }
+    }
+
+    fn copy_to_bytes(&mut self, len: usize) -> bytes::Bytes {
+        match self.bufs.front_mut() {
+            Some(buf) if len <= buf.remaining() => {
+                let bytes = buf.copy_to_bytes(len);
+                if buf.remaining() == 0 {
+                    self.bufs.pop_front();
+                }
+                bytes
+            }
+            _ => {
+                let mut bytes = BytesMut::with_capacity(len);
+                bytes.put(self.take(len));
+                bytes.freeze()
+            }
+        }
+    }
+}
+
+pub enum BytesData {
+    BufferedBytes(InnerBuffer),
+    OriginBytes(Bytes),
+}
+
+impl Buf for BytesData {
+    fn remaining(&self) -> usize {
+        match self {
+            BytesData::BufferedBytes(bytes) => bytes.remaining(),
+            BytesData::OriginBytes(bytes) => bytes.remaining(),
+        }
+    }
+
+    fn chunk(&self) -> &[u8] {
+        match self {
+            BytesData::BufferedBytes(bytes) => bytes.chunk(),
+            BytesData::OriginBytes(bytes) => bytes.chunk(),
+        }
+    }
+
+    fn advance(&mut self, cnt: usize) {
+        match self {
+            BytesData::BufferedBytes(bytes) => bytes.advance(cnt),
+            BytesData::OriginBytes(bytes) => bytes.advance(cnt),
+        }
+    }
+
+    fn copy_to_bytes(&mut self, len: usize) -> bytes::Bytes {
+        match self {
+            BytesData::BufferedBytes(bytes) => bytes.copy_to_bytes(len),
+            BytesData::OriginBytes(bytes) => bytes.copy_to_bytes(len),
+        }
+    }
+}
+
+#[pin_project]
+pub struct CloneBody(#[pin] BufferedBody);
+
+impl CloneBody {
+    pub fn new(inner_body: hyper::Body) -> Self {
+        let inner_body = BufferedBody::new(inner_body, 1024 * 64);
+        CloneBody(inner_body)
+    }
+}
+
+impl Body for CloneBody {
+    type Data = BytesData;
+
+    type Error = StdError;
+
+    fn poll_data(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+        self.project().0.poll_data(cx)
+    }
+
+    fn poll_trailers(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
+        self.project().0.poll_trailers(cx)
+    }
+
+    fn size_hint(&self) -> http_body::SizeHint {
+        self.0.size_hint()
+    }
+}
+
+impl Clone for CloneBody {
+    fn clone(&self) -> Self {
+        Self(self.0.clone())
+    }
+}
diff --git a/dubbo/src/invoker/clone_invoker.rs b/dubbo/src/invoker/clone_invoker.rs
new file mode 100644
index 0000000..557d76e
--- /dev/null
+++ b/dubbo/src/invoker/clone_invoker.rs
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::{mem, pin::Pin, task::Poll};
+
+use dubbo_base::StdError;
+use dubbo_logger::tracing::debug;
+use futures_core::{future::BoxFuture, ready, Future, TryFuture};
+use futures_util::FutureExt;
+use pin_project::pin_project;
+use thiserror::Error;
+use tokio::{
+    sync::{
+        self,
+        watch::{Receiver, Sender},
+    },
+    task::JoinHandle,
+};
+use tokio_util::sync::ReusableBoxFuture;
+use tower::{buffer::Buffer, ServiceExt};
+use tower_service::Service;
+
+use super::clone_body::CloneBody;
+
+enum Inner<S> {
+    Invalid,
+    Ready(S),
+    Pending(JoinHandle<Result<S, (S, StdError)>>),
+}
+
+#[derive(Debug, Error)]
+#[error("the inner service has not got ready yet!")]
+struct InnerServiceNotReadyErr;
+
+#[pin_project(project = InnerServiceCallingResponseProj)]
+enum InnerServiceCallingResponse<Fut> {
+    Call(#[pin] Fut),
+    Fail,
+}
+
+impl<Fut> Future for InnerServiceCallingResponse<Fut>
+where
+    Fut: TryFuture,
+    Fut::Error: Into<StdError>,
+{
+    type Output = Result<Fut::Ok, StdError>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
+        match self.project() {
+            InnerServiceCallingResponseProj::Call(call) => call.try_poll(cx).map_err(Into::into),
+            InnerServiceCallingResponseProj::Fail => {
+                Poll::Ready(Err(InnerServiceNotReadyErr.into()))
+            }
+        }
+    }
+}
+
+#[derive(Clone)]
+enum ObserveState {
+    Ready,
+    Pending,
+}
+
+struct ReadyService<S> {
+    inner: Inner<S>,
+    tx: Sender<ObserveState>,
+}
+
+impl<S> ReadyService<S> {
+    fn new(inner: S) -> (Self, Receiver<ObserveState>) {
+        let (tx, rx) = sync::watch::channel(ObserveState::Ready);
+        let ready_service = Self {
+            inner: Inner::Ready(inner),
+            tx,
+        };
+        (ready_service, rx)
+    }
+}
+
+impl<S, Req> Service<Req> for ReadyService<S>
+where
+    S: Service<Req> + Send + 'static,
+    <S as Service<Req>>::Error: Into<StdError>,
+{
+    type Response = S::Response;
+
+    type Error = StdError;
+
+    type Future = InnerServiceCallingResponse<S::Future>;
+
+    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
+        loop {
+            match mem::replace(&mut self.inner, Inner::Invalid) {
+                Inner::Ready(mut svc) => {
+                    let poll_ready = svc.poll_ready(cx);
+                    match poll_ready {
+                        Poll::Pending => {
+                            self.inner = Inner::Pending(tokio::spawn(async move {
+                                let poll_ready = svc.ready().await;
+                                match poll_ready {
+                                    Ok(_) => Ok(svc),
+                                    Err(err) => Err((svc, err.into())),
+                                }
+                            }));
+
+                            let _ = self.tx.send(ObserveState::Pending);
+                            continue;
+                        }
+                        Poll::Ready(ret) => {
+                            self.inner = Inner::Ready(svc);
+
+                            let _ = self.tx.send(ObserveState::Ready);
+                            return Poll::Ready(ret.map_err(Into::into));
+                        }
+                    }
+                }
+                Inner::Pending(mut join_handle) => {
+                    if let Poll::Ready(res) = join_handle.poll_unpin(cx) {
+                        let (svc, res) = match res {
+                            Err(join_err) => panic!("ReadyService panicked: {join_err}"),
+                            Ok(Err((svc, err))) => (svc, Poll::Ready(Err(err))),
+                            Ok(Ok(svc)) => (svc, Poll::Ready(Ok(()))),
+                        };
+
+                        self.inner = Inner::Ready(svc);
+
+                        let _ = self.tx.send(ObserveState::Ready);
+                        return res;
+                    } else {
+                        self.inner = Inner::Pending(join_handle);
+
+                        let _ = self.tx.send(ObserveState::Pending);
+                        return Poll::Pending;
+                    }
+                }
+                Inner::Invalid => panic!("ReadyService panicked: inner state is invalid"),
+            }
+        }
+    }
+
+    fn call(&mut self, req: Req) -> Self::Future {
+        match self.inner {
+            Inner::Ready(ref mut svc) => InnerServiceCallingResponse::Call(svc.call(req)),
+            _ => InnerServiceCallingResponse::Fail,
+        }
+    }
+}
+
+impl<S> Drop for ReadyService<S> {
+    fn drop(&mut self) {
+        if let Inner::Pending(ref handler) = self.inner {
+            handler.abort();
+        }
+    }
+}
+
+pub struct CloneInvoker<Inv>
+where
+    Inv: Service<http::Request<CloneBody>> + Send + 'static,
+    Inv::Error: Into<StdError> + Send + Sync + 'static,
+    Inv::Future: Send,
+{
+    inner: Buffer<ReadyService<Inv>, http::Request<CloneBody>>,
+    rx: Receiver<ObserveState>,
+    poll: ReusableBoxFuture<'static, ObserveState>,
+    polling: bool,
+}
+
+impl<Inv> CloneInvoker<Inv>
+where
+    Inv: Service<http::Request<CloneBody>> + Send + 'static,
+    Inv::Error: Into<StdError> + Send + Sync + 'static,
+    Inv::Future: Send,
+{
+    const MAX_INVOKER_BUFFER_SIZE: usize = 16;
+
+    pub fn new(invoker: Inv) -> Self {
+        let (ready_service, rx) = ReadyService::new(invoker);
+
+        let buffer: Buffer<ReadyService<Inv>, http::Request<CloneBody>> =
+            Buffer::new(ready_service, Self::MAX_INVOKER_BUFFER_SIZE);
+
+        Self {
+            inner: buffer,
+            rx,
+            polling: false,
+            poll: ReusableBoxFuture::new(futures::future::pending()),
+        }
+    }
+}
+
+impl<Inv> Service<http::Request<CloneBody>> for CloneInvoker<Inv>
+where
+    Inv: Service<http::Request<CloneBody>> + Send + 'static,
+    Inv::Error: Into<StdError> + Send + Sync + 'static,
+    Inv::Future: Send,
+{
+    type Response = Inv::Response;
+
+    type Error = StdError;
+
+    type Future = BoxFuture<'static, Result<Self::Response, StdError>>;
+
+    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
+        loop {
+            if !self.polling {
+                match self.rx.borrow().clone() {
+                    ObserveState::Ready => return self.inner.poll_ready(cx),
+                    ObserveState::Pending => {
+                        self.polling = true;
+                        let mut rx = self.rx.clone();
+                        self.poll.set(async move {
+                            loop {
+                                let current_state = rx.borrow_and_update().clone();
+                                if matches!(current_state, ObserveState::Ready) {
+                                    return current_state;
+                                }
+                                if let Err(_) = rx.changed().await {
+                                    debug!("the readyService has already shutdown!");
+                                    futures::future::pending::<ObserveState>().await;
+                                }
+                            }
+                        });
+                    }
+                }
+            }
+
+            let state = ready!(self.poll.poll_unpin(cx));
+            self.polling = false;
+
+            if matches!(state, ObserveState::Pending) {
+                continue;
+            }
+
+            return self.inner.poll_ready(cx);
+        }
+    }
+
+    fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future {
+        Box::pin(self.inner.call(req))
+    }
+}
+
+impl<Inv> Clone for CloneInvoker<Inv>
+where
+    Inv: Service<http::Request<CloneBody>> + Send + 'static,
+    Inv::Error: Into<StdError> + Send + Sync + 'static,
+    Inv::Future: Send,
+{
+    fn clone(&self) -> Self {
+        Self {
+            inner: self.inner.clone(),
+            rx: self.rx.clone(),
+            polling: false,
+            poll: ReusableBoxFuture::new(futures::future::pending()),
+        }
+    }
+}
diff --git a/dubbo/src/cluster/loadbalance/impls/mod.rs b/dubbo/src/invoker/mod.rs
similarity index 63%
copy from dubbo/src/cluster/loadbalance/impls/mod.rs
copy to dubbo/src/invoker/mod.rs
index 5a84af8..1c87c0e 100644
--- a/dubbo/src/cluster/loadbalance/impls/mod.rs
+++ b/dubbo/src/invoker/mod.rs
@@ -14,5 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-pub mod random;
-pub mod roundrobin;
+use crate::{codegen::TripleInvoker, invoker::clone_invoker::CloneInvoker, svc::NewService};
+
+pub mod clone_body;
+pub mod clone_invoker;
+
+pub struct NewInvoker;
+
+impl NewService<String> for NewInvoker {
+    type Service = CloneInvoker<TripleInvoker>;
+
+    fn new_service(&self, url: String) -> Self::Service {
+        // todo create another invoker by url protocol
+
+        let url = url.parse().unwrap();
+        CloneInvoker::new(TripleInvoker::new(url))
+    }
+}
diff --git a/dubbo/src/lib.rs b/dubbo/src/lib.rs
index 63c09d3..1a521a2 100644
--- a/dubbo/src/lib.rs
+++ b/dubbo/src/lib.rs
@@ -18,12 +18,19 @@
 pub mod cluster;
 pub mod codegen;
 pub mod context;
+pub mod directory;
+pub mod extension;
 pub mod filter;
 mod framework;
 pub mod invocation;
+pub mod invoker;
+pub mod loadbalancer;
+pub mod param;
 pub mod protocol;
 pub mod registry;
+pub mod route;
 pub mod status;
+pub mod svc;
 pub mod triple;
 pub mod utils;
 
@@ -32,7 +39,6 @@
 
 pub use framework::Dubbo;
 
-pub type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
 pub type BoxFuture<T, E> = self::Pin<Box<dyn self::Future<Output = Result<T, E>> + Send + 'static>>;
 pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>;
 pub type BoxBody = http_body::combinators::UnsyncBoxBody<bytes::Bytes, self::status::Status>;
diff --git a/dubbo/src/loadbalancer/mod.rs b/dubbo/src/loadbalancer/mod.rs
new file mode 100644
index 0000000..74f2217
--- /dev/null
+++ b/dubbo/src/loadbalancer/mod.rs
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use dubbo_base::StdError;
+use futures_core::future::BoxFuture;
+use tower::{discover::ServiceList, ServiceExt};
+use tower_service::Service;
+
+use crate::{
+    codegen::RpcInvocation,
+    invoker::{clone_body::CloneBody, clone_invoker::CloneInvoker},
+    param::Param,
+    svc::NewService,
+};
+
+use crate::protocol::triple::triple_invoker::TripleInvoker;
+
+pub struct NewLoadBalancer<N> {
+    inner: N,
+}
+
+#[derive(Clone)]
+pub struct LoadBalancer<S> {
+    inner: S, // Routes service
+}
+
+impl<N> NewLoadBalancer<N> {
+    pub fn layer() -> impl tower_layer::Layer<N, Service = Self> {
+        tower_layer::layer_fn(|inner| {
+            NewLoadBalancer {
+                inner, // NewRoutes
+            }
+        })
+    }
+}
+
+impl<N, T> NewService<T> for NewLoadBalancer<N>
+where
+    T: Param<RpcInvocation> + Clone,
+    // NewRoutes
+    N: NewService<T>,
+{
+    type Service = LoadBalancer<N::Service>;
+
+    fn new_service(&self, target: T) -> Self::Service {
+        // Routes service
+        let svc = self.inner.new_service(target);
+
+        LoadBalancer { inner: svc }
+    }
+}
+
+impl<N> Service<http::Request<CloneBody>> for LoadBalancer<N>
+where
+    // Routes service
+    N: Service<(), Response = Vec<CloneInvoker<TripleInvoker>>> + Clone,
+    N::Error: Into<StdError> + Send,
+    N::Future: Send + 'static,
+{
+    type Response = <CloneInvoker<TripleInvoker> as Service<http::Request<CloneBody>>>::Response;
+
+    type Error = StdError;
+
+    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
+
+    fn poll_ready(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), Self::Error>> {
+        self.inner.poll_ready(cx).map_err(Into::into)
+    }
+
+    fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future {
+        let routes = self.inner.call(());
+
+        let fut = async move {
+            let routes = routes.await;
+
+            let routes: Vec<CloneInvoker<TripleInvoker>> = match routes {
+                Err(e) => return Err(Into::<StdError>::into(e)),
+                Ok(routes) => routes,
+            };
+
+            let service_list: Vec<_> = routes
+                .into_iter()
+                .map(|invoker| tower::load::Constant::new(invoker, 1))
+                .collect();
+
+            let service_list = ServiceList::new(service_list);
+
+            let p2c = tower::balance::p2c::Balance::new(service_list);
+
+            p2c.oneshot(req).await
+        };
+
+        Box::pin(fut)
+    }
+}
diff --git a/dubbo/src/cluster/loadbalance/impls/mod.rs b/dubbo/src/param.rs
similarity index 82%
copy from dubbo/src/cluster/loadbalance/impls/mod.rs
copy to dubbo/src/param.rs
index 5a84af8..298c3b3 100644
--- a/dubbo/src/cluster/loadbalance/impls/mod.rs
+++ b/dubbo/src/param.rs
@@ -14,5 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-pub mod random;
-pub mod roundrobin;
+
+pub trait Param<T> {
+    fn param(&self) -> T;
+}
+
+impl<T: ToOwned> Param<T::Owned> for T {
+    fn param(&self) -> T::Owned {
+        self.to_owned()
+    }
+}
diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs
index 145bcc8..7dbf1f3 100644
--- a/dubbo/src/protocol/mod.rs
+++ b/dubbo/src/protocol/mod.rs
@@ -15,15 +15,10 @@
  * limitations under the License.
  */
 
-use std::{
-    fmt::Debug,
-    future::Future,
-    task::{Context, Poll},
-};
+use std::task::{Context, Poll};
 
 use async_trait::async_trait;
 use aws_smithy_http::body::SdkBody;
-use dyn_clone::DynClone;
 use tower_service::Service;
 
 use dubbo_base::Url;
@@ -44,18 +39,8 @@
     fn unexport(&self);
 }
 
-pub trait Invoker<ReqBody>: Debug + DynClone {
-    type Response;
-
-    type Error;
-
-    type Future: Future<Output = Result<Self::Response, Self::Error>>;
-
+pub trait Invoker<ReqBody>: Service<ReqBody> {
     fn get_url(&self) -> Url;
-
-    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
-
-    fn call(&mut self, req: ReqBody) -> Self::Future;
 }
 
 pub type BoxExporter = Box<dyn Exporter + Send + Sync>;
@@ -69,15 +54,6 @@
         + Sync,
 >;
 
-dyn_clone::clone_trait_object!(
-    Invoker<
-        http::Request<SdkBody>,
-        Response = http::Response<crate::BoxBody>,
-        Error = crate::Error,
-        Future = crate::BoxFuture<http::Response<crate::BoxBody>, crate::Error>,
-    >
-);
-
 pub struct WrapperInvoker<T>(T);
 
 impl<T, ReqBody> Service<http::Request<ReqBody>> for WrapperInvoker<T>
diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs
index 42dfebe..cb6b08c 100644
--- a/dubbo/src/protocol/triple/triple_invoker.rs
+++ b/dubbo/src/protocol/triple/triple_invoker.rs
@@ -15,8 +15,8 @@
  * limitations under the License.
  */
 
-use aws_smithy_http::body::SdkBody;
 use dubbo_base::Url;
+use http::{HeaderValue, Uri};
 use std::{
     fmt::{Debug, Formatter},
     str::FromStr,
@@ -24,27 +24,21 @@
 use tower_service::Service;
 
 use crate::{
-    protocol::Invoker,
-    triple::{client::builder::ClientBoxService, transport::connection::Connection},
-    utils::boxed_clone::BoxCloneService,
+    invoker::clone_body::CloneBody,
+    triple::transport::{self, connection::Connection},
 };
 
-#[derive(Clone)]
 pub struct TripleInvoker {
     url: Url,
-    conn: ClientBoxService,
+    conn: Connection,
 }
 
 impl TripleInvoker {
     pub fn new(url: Url) -> TripleInvoker {
-        let uri = http::Uri::from_str(&url.to_url()).unwrap();
-        let mut conn = Connection::new().with_host(uri.clone());
-        if let Some(scheme) = uri.scheme_str() {
-            conn = conn.with_connector(scheme.to_string());
-        }
+        let uri = http::Uri::from_str(url.as_str()).unwrap();
         Self {
             url,
-            conn: BoxCloneService::new(conn),
+            conn: Connection::new().with_host(uri).build(),
         }
     }
 }
@@ -55,25 +49,107 @@
     }
 }
 
-impl Invoker<http::Request<SdkBody>> for TripleInvoker {
+impl TripleInvoker {
+    pub fn map_request(&self, req: http::Request<CloneBody>) -> http::Request<CloneBody> {
+        let (parts, body) = req.into_parts();
+
+        let path_and_query = parts.headers.get("path").unwrap().to_str().unwrap();
+
+        let authority = self.url.authority();
+
+        let uri = Uri::builder()
+            .scheme("http")
+            .authority(authority)
+            .path_and_query(path_and_query)
+            .build()
+            .unwrap();
+
+        let mut req = hyper::Request::builder()
+            .version(http::Version::HTTP_2)
+            .uri(uri.clone())
+            .method("POST")
+            .body(body)
+            .unwrap();
+
+        // *req.version_mut() = http::Version::HTTP_2;
+        req.headers_mut()
+            .insert("method", HeaderValue::from_static("POST"));
+        req.headers_mut().insert(
+            "scheme",
+            HeaderValue::from_str(uri.scheme_str().unwrap()).unwrap(),
+        );
+        req.headers_mut()
+            .insert("path", HeaderValue::from_str(uri.path()).unwrap());
+        req.headers_mut().insert(
+            "authority",
+            HeaderValue::from_str(uri.authority().unwrap().as_str()).unwrap(),
+        );
+        req.headers_mut().insert(
+            "content-type",
+            HeaderValue::from_static("application/grpc+proto"),
+        );
+        req.headers_mut()
+            .insert("user-agent", HeaderValue::from_static("dubbo-rust/0.1.0"));
+        req.headers_mut()
+            .insert("te", HeaderValue::from_static("trailers"));
+        req.headers_mut().insert(
+            "tri-service-version",
+            HeaderValue::from_static("dubbo-rust/0.1.0"),
+        );
+        req.headers_mut()
+            .insert("tri-service-group", HeaderValue::from_static("cluster"));
+        req.headers_mut().insert(
+            "tri-unit-info",
+            HeaderValue::from_static("dubbo-rust/0.1.0"),
+        );
+        // if let Some(_encoding) = self.send_compression_encoding {
+
+        // }
+
+        req.headers_mut()
+            .insert("grpc-encoding", http::HeaderValue::from_static("gzip"));
+
+        req.headers_mut().insert(
+            "grpc-accept-encoding",
+            http::HeaderValue::from_static("gzip"),
+        );
+
+        // // const (
+        // //     TripleContentType    = "application/grpc+proto"
+        // //     TripleUserAgent      = "grpc-go/1.35.0-dev"
+        // //     TripleServiceVersion = "tri-service-version"
+        // //     TripleAttachement    = "tri-attachment"
+        // //     TripleServiceGroup   = "tri-service-group"
+        // //     TripleRequestID      = "tri-req-id"
+        // //     TripleTraceID        = "tri-trace-traceid"
+        // //     TripleTraceRPCID     = "tri-trace-rpcid"
+        // //     TripleTraceProtoBin  = "tri-trace-proto-bin"
+        // //     TripleUnitInfo       = "tri-unit-info"
+        // // )
+        req
+    }
+}
+
+impl Service<http::Request<CloneBody>> for TripleInvoker {
     type Response = http::Response<crate::BoxBody>;
 
     type Error = crate::Error;
 
     type Future = crate::BoxFuture<Self::Response, Self::Error>;
 
-    fn get_url(&self) -> Url {
-        self.url.clone()
-    }
-
-    fn call(&mut self, req: http::Request<SdkBody>) -> Self::Future {
-        self.conn.call(req)
-    }
-
     fn poll_ready(
         &mut self,
         cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Result<(), Self::Error>> {
-        self.conn.poll_ready(cx)
+        <transport::connection::Connection as Service<http::Request<CloneBody>>>::poll_ready(
+            &mut self.conn,
+            cx,
+        )
+    }
+
+    fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future {
+        let req = self.map_request(req);
+
+        self.conn.call(req)
     }
 }
diff --git a/dubbo/src/protocol/triple/triple_protocol.rs b/dubbo/src/protocol/triple/triple_protocol.rs
index 71f6edc..27174ba 100644
--- a/dubbo/src/protocol/triple/triple_protocol.rs
+++ b/dubbo/src/protocol/triple/triple_protocol.rs
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-use std::{boxed::Box, collections::HashMap};
+use std::collections::HashMap;
 
 use async_trait::async_trait;
-use dubbo_base::Url;
+use dubbo_base::{registry_param::InterfaceName, url::UrlParam, Url};
 
 use super::{
     triple_exporter::TripleExporter, triple_invoker::TripleInvoker, triple_server::TripleServer,
@@ -44,8 +44,9 @@
     }
 
     pub fn get_server(&self, url: Url) -> Option<TripleServer> {
+        let interface_name = url.query::<InterfaceName>().unwrap();
         self.servers
-            .get(&url.service_key)
+            .get(interface_name.value().as_str())
             .map(|data| data.to_owned())
     }
 }
@@ -61,8 +62,12 @@
     async fn export(mut self, url: Url) -> BoxExporter {
         // service_key is same to key of TRIPLE_SERVICES
         let server = TripleServer::new();
-        self.servers.insert(url.service_key.clone(), server.clone());
-        server.serve(url.short_url().as_str().into()).await;
+
+        let interface_name = url.query::<InterfaceName>().unwrap();
+        let interface_name = interface_name.value();
+
+        self.servers.insert(interface_name, server.clone());
+        server.serve(url).await;
 
         Box::new(TripleExporter::new())
     }
diff --git a/dubbo/src/registry/memory_registry.rs b/dubbo/src/registry/memory_registry.rs
index db878d6..093c16e 100644
--- a/dubbo/src/registry/memory_registry.rs
+++ b/dubbo/src/registry/memory_registry.rs
@@ -15,7 +15,6 @@
  * limitations under the License.
  */
 
-#![allow(unused_variables, dead_code, missing_docs)]
 
 use dubbo_logger::tracing::debug;
 use std::{
diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs
index 2a95452..ac4c012 100644
--- a/dubbo/src/registry/mod.rs
+++ b/dubbo/src/registry/mod.rs
@@ -15,48 +15,42 @@
  * limitations under the License.
  */
 
-#![allow(unused_variables, dead_code, missing_docs)]
-pub mod integration;
-pub mod memory_registry;
-pub mod protocol;
-pub mod types;
-
+use crate::{extension, extension::registry_extension::proxy::RegistryProxy};
+use dubbo_base::{StdError, Url};
 use std::{
-    fmt::{Debug, Formatter},
-    sync::Arc,
+    future::Future,
+    pin::Pin,
+    task::{Context, Poll},
 };
+use tower_service::Service;
 
-use dubbo_base::Url;
+pub mod integration;
+pub mod protocol;
+pub mod registry;
 
-pub type RegistryNotifyListener = Arc<dyn NotifyListener + Send + Sync + 'static>;
-pub trait Registry {
-    fn register(&mut self, url: Url) -> Result<(), crate::StdError>;
-    fn unregister(&mut self, url: Url) -> Result<(), crate::StdError>;
-
-    fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), crate::StdError>;
-    fn unsubscribe(
-        &self,
-        url: Url,
-        listener: RegistryNotifyListener,
-    ) -> Result<(), crate::StdError>;
+#[derive(Clone)]
+pub struct MkRegistryService {
+    registry_url: Url,
 }
 
-pub trait NotifyListener {
-    fn notify(&self, event: ServiceEvent);
-    fn notify_all(&self, event: ServiceEvent);
+impl MkRegistryService {
+    pub fn new(registry_url: Url) -> Self {
+        Self { registry_url }
+    }
 }
 
-#[derive(Debug)]
-pub struct ServiceEvent {
-    pub key: String,
-    pub action: String,
-    pub service: Vec<Url>,
-}
+impl Service<()> for MkRegistryService {
+    type Response = RegistryProxy;
+    type Error = StdError;
+    type Future =
+        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
 
-pub type BoxRegistry = Box<dyn Registry + Send + Sync>;
+    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
 
-impl Debug for BoxRegistry {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        f.write_str("BoxRegistry")
+    fn call(&mut self, _req: ()) -> Self::Future {
+        let fut = extension::EXTENSIONS.load_registry(self.registry_url.clone());
+        Box::pin(fut)
     }
 }
diff --git a/dubbo/src/registry/protocol.rs b/dubbo/src/registry/protocol.rs
index 1250950..f909988 100644
--- a/dubbo/src/registry/protocol.rs
+++ b/dubbo/src/registry/protocol.rs
@@ -15,57 +15,43 @@
  * limitations under the License.
  */
 
-use dubbo_base::Url;
+use dubbo_base::{registry_param::InterfaceName, url::UrlParam, Url};
 use dubbo_logger::tracing;
 use std::{
     collections::HashMap,
-    fmt::{Debug, Formatter},
-    sync::{Arc, Mutex, RwLock},
+    sync::{Arc, RwLock},
 };
 
-use super::{memory_registry::MemoryRegistry, BoxRegistry};
 use crate::{
+    extension::registry_extension::{proxy::RegistryProxy, Registry},
     protocol::{
         triple::{triple_exporter::TripleExporter, triple_protocol::TripleProtocol},
         BoxExporter, BoxInvoker, Protocol,
     },
-    registry::types::Registries,
 };
 
 #[derive(Clone, Default)]
 pub struct RegistryProtocol {
     // registerAddr: Registry
-    registries: Option<Registries>,
+    registries: Vec<RegistryProxy>,
     // providerUrl: Exporter
+    #[allow(dead_code)]
     exporters: Arc<RwLock<HashMap<String, BoxExporter>>>,
     // serviceName: registryUrls
     services: HashMap<String, Vec<Url>>,
 }
 
-impl Debug for RegistryProtocol {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        f.write_str(
-            format!(
-                "RegistryProtocol services{:#?},registries,{:#?}",
-                self.services,
-                self.registries.clone()
-            )
-            .as_str(),
-        )
-    }
-}
-
 impl RegistryProtocol {
     pub fn new() -> Self {
         RegistryProtocol {
-            registries: None,
+            registries: Vec::default(),
             exporters: Arc::new(RwLock::new(HashMap::new())),
             services: HashMap::new(),
         }
     }
 
-    pub fn with_registries(mut self, registries: Registries) -> Self {
-        self.registries = Some(registries);
+    pub fn with_registries(mut self, registries: Vec<RegistryProxy>) -> Self {
+        self.registries.extend(registries);
         self
     }
 
@@ -73,18 +59,6 @@
         self.services.extend(services);
         self
     }
-
-    pub fn get_registry(&mut self, url: Url) -> BoxRegistry {
-        let mem = MemoryRegistry::default();
-        self.registries
-            .as_ref()
-            .unwrap()
-            .lock()
-            .unwrap()
-            .insert(url.location, Arc::new(Mutex::new(Box::new(mem.clone()))));
-
-        Box::new(mem)
-    }
 }
 
 #[async_trait::async_trait]
@@ -101,29 +75,29 @@
         // init Exporter based on provider_url
         // server registry based on register_url
         // start server health check
-        let registry_url = self.services.get(url.get_service_name().as_str());
+        let service_name = url.query::<InterfaceName>().unwrap();
+        let registry_url = self.services.get(service_name.as_str().as_ref());
         if let Some(urls) = registry_url {
-            for url in urls.clone().iter() {
-                if !url.service_key.is_empty() {
-                    let mut reg = self.get_registry(url.clone());
-                    reg.register(url.clone()).unwrap();
+            for url in urls.iter() {
+                for registry_proxy in &self.registries {
+                    let _ = registry_proxy.register(url.clone()).await;
                 }
             }
         }
 
-        match url.clone().scheme.as_str() {
+        match url.clone().protocol() {
             "tri" => {
                 let pro = Box::new(TripleProtocol::new());
                 return pro.export(url).await;
             }
             _ => {
-                tracing::error!("base {:?} not implemented", url.scheme);
+                tracing::error!("base {:?} not implemented", url.protocol());
                 Box::new(TripleExporter::new())
             }
         }
     }
 
-    async fn refer(self, url: Url) -> Self::Invoker {
+    async fn refer(self, _url: Url) -> Self::Invoker {
         // getRegisterUrl
         // get Registry from registry_url
         // init directory based on registry_url and Registry
diff --git a/dubbo/src/registry/registry.rs b/dubbo/src/registry/registry.rs
new file mode 100644
index 0000000..2fc8258
--- /dev/null
+++ b/dubbo/src/registry/registry.rs
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::collections::{HashMap, HashSet};
+
+use async_trait::async_trait;
+use itertools::Itertools;
+
+use tokio::sync::{
+    mpsc::{self},
+    Mutex,
+};
+
+use dubbo_base::{
+    extension_param::{ExtensionName, ExtensionType},
+    registry_param::{InterfaceName, RegistryUrl, StaticInvokerUrls},
+    url::UrlParam,
+    StdError, Url,
+};
+
+use crate::extension::{
+    registry_extension::{DiscoverStream, Registry, ServiceChange},
+    Extension,
+};
+
+pub struct StaticServiceValues {
+    listeners: Vec<mpsc::Sender<Result<ServiceChange, StdError>>>,
+    urls: HashSet<String>,
+}
+
+pub struct StaticRegistry {
+    urls: Mutex<HashMap<String, StaticServiceValues>>,
+    self_url: Url,
+}
+
+impl StaticRegistry {
+    pub fn to_extension_url(static_invoker_urls: Vec<Url>) -> Url {
+        let static_invoker_urls: StaticInvokerUrls =
+            static_invoker_urls.iter().join(",").parse().unwrap();
+        let mut static_registry_extension_loader_url: Url = "extension://0.0.0.0".parse().unwrap();
+
+        static_registry_extension_loader_url.add_query_param(ExtensionType::Registry);
+        static_registry_extension_loader_url.add_query_param(ExtensionName::new(Self::name()));
+        static_registry_extension_loader_url
+            .add_query_param(RegistryUrl::new("static://127.0.0.1".parse().unwrap()));
+        static_registry_extension_loader_url.add_query_param(static_invoker_urls);
+
+        static_registry_extension_loader_url
+    }
+}
+
+impl StaticRegistry {
+    pub fn new(url: Url) -> Self {
+        let static_urls = url.query::<StaticInvokerUrls>();
+        let static_urls = match static_urls {
+            None => Vec::default(),
+            Some(static_urls) => static_urls.value(),
+        };
+
+        let mut map = HashMap::with_capacity(static_urls.len());
+
+        for url in static_urls {
+            let interface_name = url.query::<InterfaceName>().unwrap();
+            let interface_name = interface_name.value();
+
+            let static_values = map
+                .entry(interface_name)
+                .or_insert_with(|| StaticServiceValues {
+                    listeners: Vec::new(),
+                    urls: HashSet::new(),
+                });
+            let url = url.to_string();
+            static_values.urls.insert(url.clone());
+        }
+
+        let self_url = "static://0.0.0.0".parse().unwrap();
+
+        Self {
+            urls: Mutex::new(map),
+            self_url,
+        }
+    }
+}
+
+impl Default for StaticRegistry {
+    fn default() -> Self {
+        let self_url = "static://0.0.0.0".parse().unwrap();
+
+        Self {
+            self_url,
+            urls: Mutex::new(HashMap::new()),
+        }
+    }
+}
+#[async_trait]
+impl Registry for StaticRegistry {
+    async fn register(&self, url: Url) -> Result<(), StdError> {
+        let interface_name = url.query::<InterfaceName>().unwrap();
+        let interface_name = interface_name.value();
+
+        let mut lock = self.urls.lock().await;
+
+        let static_values = lock
+            .entry(interface_name)
+            .or_insert_with(|| StaticServiceValues {
+                listeners: Vec::new(),
+                urls: HashSet::new(),
+            });
+        let url = url.to_string();
+        static_values.urls.insert(url.clone());
+
+        static_values.listeners.retain(|listener| {
+            let ret = listener.try_send(Ok(ServiceChange::Insert(url.clone(), ())));
+            ret.is_ok()
+        });
+
+        Ok(())
+    }
+
+    async fn unregister(&self, url: Url) -> Result<(), StdError> {
+        let interface_name = url.query::<InterfaceName>().unwrap();
+        let interface_name = interface_name.value();
+
+        let mut lock = self.urls.lock().await;
+
+        match lock.get_mut(&interface_name) {
+            None => Ok(()),
+            Some(static_values) => {
+                let url = url.to_string();
+                static_values.urls.remove(&url);
+                static_values.listeners.retain(|listener| {
+                    let ret = listener.try_send(Ok(ServiceChange::Remove(url.clone())));
+                    ret.is_ok()
+                });
+                if static_values.urls.is_empty() {
+                    lock.remove(&interface_name);
+                }
+                Ok(())
+            }
+        }
+    }
+
+    async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
+        let interface_name = url.query::<InterfaceName>().unwrap();
+        let interface_name = interface_name.value();
+
+        let change_rx = {
+            let mut lock = self.urls.lock().await;
+            let static_values = lock
+                .entry(interface_name)
+                .or_insert_with(|| StaticServiceValues {
+                    listeners: Vec::new(),
+                    urls: HashSet::new(),
+                });
+
+            let (tx, change_rx) = mpsc::channel(64);
+            static_values.listeners.push(tx);
+
+            for listener in &static_values.listeners {
+                for url in &static_values.urls {
+                    let _ = listener
+                        .send(Ok(ServiceChange::Insert(url.clone(), ())))
+                        .await;
+                }
+            }
+
+            change_rx
+        };
+
+        Ok(change_rx)
+    }
+
+    async fn unsubscribe(&self, _url: Url) -> Result<(), StdError> {
+        Ok(())
+    }
+
+    fn url(&self) -> &Url {
+        &self.self_url
+    }
+}
+
+#[async_trait::async_trait]
+impl Extension for StaticRegistry {
+    type Target = Box<dyn Registry + Send + Sync + 'static>;
+
+    fn name() -> String {
+        "static".to_string()
+    }
+
+    async fn create(url: Url) -> Result<Self::Target, StdError> {
+        // url example:
+        // extension://0.0.0.0?extension-type=registry&extension-name=static&registry=static://127.0.0.1
+        let static_invoker_urls = url.query::<StaticInvokerUrls>();
+
+        let registry_url = url.query::<RegistryUrl>().unwrap();
+        let mut registry_url = registry_url.value();
+
+        if let Some(static_invoker_urls) = static_invoker_urls {
+            registry_url.add_query_param(static_invoker_urls);
+        }
+
+        let static_registry = StaticRegistry::new(registry_url);
+
+        Ok(Box::new(static_registry))
+    }
+}
diff --git a/dubbo/src/registry/types.rs b/dubbo/src/registry/types.rs
deleted file mode 100644
index ae7c7ca..0000000
--- a/dubbo/src/registry/types.rs
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-use std::{
-    collections::HashMap,
-    sync::{Arc, Mutex},
-};
-
-use dubbo_base::Url;
-use dubbo_logger::tracing::info;
-use itertools::Itertools;
-
-use crate::{
-    registry::{BoxRegistry, Registry},
-    StdError,
-};
-
-use super::RegistryNotifyListener;
-
-pub type SafeRegistry = Arc<Mutex<BoxRegistry>>;
-pub type Registries = Arc<Mutex<HashMap<String, SafeRegistry>>>;
-
-pub const DEFAULT_REGISTRY_KEY: &str = "default";
-
-pub trait RegistriesOperation {
-    fn get(&self, registry_key: &str) -> SafeRegistry;
-    fn insert(&self, registry_key: String, registry: SafeRegistry);
-    fn default_registry(&self) -> SafeRegistry;
-}
-
-impl RegistriesOperation for Registries {
-    fn get(&self, registry_key: &str) -> SafeRegistry {
-        self.as_ref()
-            .lock()
-            .unwrap()
-            .get(registry_key)
-            .unwrap()
-            .clone()
-    }
-
-    fn insert(&self, registry_key: String, registry: SafeRegistry) {
-        self.as_ref().lock().unwrap().insert(registry_key, registry);
-    }
-
-    fn default_registry(&self) -> SafeRegistry {
-        let guard = self.as_ref().lock().unwrap();
-        let (_, result) = guard
-            .iter()
-            .find_or_first(|e| e.0 == DEFAULT_REGISTRY_KEY)
-            .unwrap()
-            .to_owned();
-        result.clone()
-    }
-}
-
-impl Registry for SafeRegistry {
-    fn register(&mut self, url: Url) -> Result<(), StdError> {
-        info!("register {}.", url);
-        self.lock().unwrap().register(url).expect("registry err.");
-        Ok(())
-    }
-
-    fn unregister(&mut self, url: Url) -> Result<(), StdError> {
-        self.lock().unwrap().register(url).expect("registry err.");
-        Ok(())
-    }
-
-    fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), StdError> {
-        self.lock().unwrap().register(url).expect("registry err.");
-        Ok(())
-    }
-
-    fn unsubscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), StdError> {
-        self.lock().unwrap().register(url).expect("registry err.");
-        Ok(())
-    }
-}
diff --git a/dubbo/src/route/mod.rs b/dubbo/src/route/mod.rs
new file mode 100644
index 0000000..28dfda7
--- /dev/null
+++ b/dubbo/src/route/mod.rs
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::pin::Pin;
+
+use dubbo_base::StdError;
+use dubbo_logger::tracing::debug;
+use futures_core::{ready, Future};
+use futures_util::{future::Ready, FutureExt, TryFutureExt};
+use tower::{buffer::Buffer, util::FutureService};
+use tower_service::Service;
+
+use crate::{
+    codegen::{RpcInvocation, TripleInvoker},
+    invoker::clone_invoker::CloneInvoker,
+    param::Param,
+    svc::NewService,
+};
+
+pub struct NewRoutes<N> {
+    inner: N,
+}
+
+pub struct NewRoutesFuture<S, T> {
+    inner: RoutesFutureInnerState<S>,
+    #[allow(dead_code)]
+    target: T,
+}
+
+pub enum RoutesFutureInnerState<S> {
+    Service(S),
+    Future(
+        Pin<
+            Box<
+                dyn Future<Output = Result<Vec<CloneInvoker<TripleInvoker>>, StdError>>
+                    + Send
+                    + 'static,
+            >,
+        >,
+    ),
+    Ready(Vec<CloneInvoker<TripleInvoker>>),
+}
+
+#[derive(Clone)]
+pub struct Routes<T> {
+    #[allow(dead_code)]
+    target: T,
+    invokers: Vec<CloneInvoker<TripleInvoker>>,
+}
+
+impl<N> NewRoutes<N> {
+    pub fn new(inner: N) -> Self {
+        Self { inner }
+    }
+}
+
+impl<N> NewRoutes<N> {
+    const MAX_ROUTE_BUFFER_SIZE: usize = 16;
+
+    pub fn layer() -> impl tower_layer::Layer<N, Service = Self> {
+        tower_layer::layer_fn(|inner: N| NewRoutes::new(inner))
+    }
+}
+
+impl<N, T> NewService<T> for NewRoutes<N>
+where
+    T: Param<RpcInvocation> + Clone + Send + Unpin + 'static,
+    // NewDirectory
+    N: NewService<T>,
+    // Directory
+    N::Service: Service<(), Response = Vec<CloneInvoker<TripleInvoker>>> + Unpin + Send + 'static,
+    <N::Service as Service<()>>::Error: Into<StdError>,
+    <N::Service as Service<()>>::Future: Send + 'static,
+{
+    type Service =
+        Buffer<FutureService<NewRoutesFuture<<N as NewService<T>>::Service, T>, Routes<T>>, ()>;
+
+    fn new_service(&self, target: T) -> Self::Service {
+        let inner = self.inner.new_service(target.clone());
+
+        Buffer::new(
+            FutureService::new(NewRoutesFuture {
+                inner: RoutesFutureInnerState::Service(inner),
+                target,
+            }),
+            Self::MAX_ROUTE_BUFFER_SIZE,
+        )
+    }
+}
+
+impl<N, T> Future for NewRoutesFuture<N, T>
+where
+    T: Param<RpcInvocation> + Clone + Unpin,
+    // Directory
+    N: Service<(), Response = Vec<CloneInvoker<TripleInvoker>>> + Unpin,
+    N::Error: Into<StdError>,
+    N::Future: Send + 'static,
+{
+    type Output = Result<Routes<T>, StdError>;
+
+    fn poll(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Self::Output> {
+        let this = self.get_mut();
+
+        loop {
+            match this.inner {
+                RoutesFutureInnerState::Service(ref mut service) => {
+                    debug!("RoutesFutureInnerState::Service");
+                    let _ = ready!(service.poll_ready(cx)).map_err(Into::into)?;
+                    let fut = service.call(()).map_err(|e| e.into()).boxed();
+                    this.inner = RoutesFutureInnerState::Future(fut);
+                }
+                RoutesFutureInnerState::Future(ref mut futures) => {
+                    debug!("RoutesFutureInnerState::Future");
+                    let invokers = ready!(futures.as_mut().poll(cx))?;
+                    this.inner = RoutesFutureInnerState::Ready(invokers);
+                }
+                RoutesFutureInnerState::Ready(ref invokers) => {
+                    debug!("RoutesFutureInnerState::Ready");
+                    let target = this.target.clone();
+                    return std::task::Poll::Ready(Ok(Routes {
+                        invokers: invokers.clone(),
+                        target,
+                    }));
+                }
+            }
+        }
+    }
+}
+
+impl<T> Service<()> for Routes<T>
+where
+    T: Param<RpcInvocation> + Clone,
+{
+    type Response = Vec<CloneInvoker<TripleInvoker>>;
+
+    type Error = StdError;
+
+    type Future = Ready<Result<Self::Response, Self::Error>>;
+
+    fn poll_ready(
+        &mut self,
+        _: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), Self::Error>> {
+        std::task::Poll::Ready(Ok(()))
+    }
+
+    fn call(&mut self, _: ()) -> Self::Future {
+        // some router operator
+        // if new_invokers changed, send new invokers to routes_rx after router operator
+        futures_util::future::ok(self.invokers.clone())
+    }
+}
diff --git a/dubbo/src/svc.rs b/dubbo/src/svc.rs
new file mode 100644
index 0000000..f846663
--- /dev/null
+++ b/dubbo/src/svc.rs
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::sync::Arc;
+
+pub trait NewService<T> {
+    type Service;
+
+    fn new_service(&self, target: T) -> Self::Service;
+}
+
+pub struct ArcNewService<T, S> {
+    inner: Arc<dyn NewService<T, Service = S> + Send + Sync>,
+}
+
+impl<T, S> ArcNewService<T, S> {
+    pub fn layer<N>() -> impl tower_layer::Layer<N, Service = Self> + Clone + Copy
+    where
+        N: NewService<T, Service = S> + Send + Sync + 'static,
+        S: Send + 'static,
+    {
+        tower_layer::layer_fn(Self::new)
+    }
+
+    pub fn new<N>(inner: N) -> Self
+    where
+        N: NewService<T, Service = S> + Send + Sync + 'static,
+        S: Send + 'static,
+    {
+        Self {
+            inner: Arc::new(inner),
+        }
+    }
+}
+
+impl<T, S> Clone for ArcNewService<T, S> {
+    fn clone(&self) -> Self {
+        Self {
+            inner: self.inner.clone(),
+        }
+    }
+}
+
+impl<T, S> NewService<T> for ArcNewService<T, S> {
+    type Service = S;
+
+    fn new_service(&self, t: T) -> S {
+        self.inner.new_service(t)
+    }
+}
diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs
index 9dae0f9..94c855b 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -18,28 +18,27 @@
 use std::sync::Arc;
 
 use crate::{
-    cluster::{directory::StaticDirectory, Cluster, Directory, MockCluster, MockDirectory},
-    codegen::{RegistryDirectory, RpcInvocation, TripleInvoker},
-    protocol::BoxInvoker,
-    triple::compression::CompressionEncoding,
-    utils::boxed_clone::BoxCloneService,
+    cluster::NewCluster, directory::NewCachedDirectory, extension, loadbalancer::NewLoadBalancer,
+    route::NewRoutes, utils::boxed_clone::BoxCloneService,
 };
 
+use crate::registry::{registry::StaticRegistry, MkRegistryService};
 use aws_smithy_http::body::SdkBody;
 use dubbo_base::Url;
-
-use super::TripleClient;
+use tower::ServiceBuilder;
 
 pub type ClientBoxService =
     BoxCloneService<http::Request<SdkBody>, http::Response<crate::BoxBody>, crate::Error>;
 
-#[derive(Clone, Debug, Default)]
+pub type ServiceMK =
+    Arc<NewCluster<NewLoadBalancer<NewRoutes<NewCachedDirectory<MkRegistryService>>>>>;
+
+#[derive(Default)]
 pub struct ClientBuilder {
     pub timeout: Option<u64>,
     pub connector: &'static str,
-    directory: Option<Box<dyn Directory>>,
+    registry_extension_url: Option<Url>,
     pub direct: bool,
-    host: String,
 }
 
 impl ClientBuilder {
@@ -47,19 +46,18 @@
         ClientBuilder {
             timeout: None,
             connector: "",
-            directory: None,
+            registry_extension_url: None,
             direct: false,
-            host: "".to_string(),
         }
     }
 
     pub fn from_static(host: &str) -> ClientBuilder {
+        let registry_extension_url = StaticRegistry::to_extension_url(vec![host.parse().unwrap()]);
         Self {
             timeout: None,
             connector: "",
-            directory: Some(Box::new(StaticDirectory::new(&host))),
+            registry_extension_url: Some(registry_extension_url),
             direct: true,
-            host: host.to_string(),
         }
     }
 
@@ -70,64 +68,44 @@
         }
     }
 
-    /// host: http://0.0.0.0:8888
-    pub fn with_directory(self, directory: Box<dyn Directory>) -> Self {
+    pub fn with_registry(self, registry: Url) -> Self {
+        let registry_extension_url = extension::registry_extension::to_extension_url(registry);
         Self {
-            directory: Some(directory),
-            ..self
-        }
-    }
-
-    pub fn with_registry_directory(self, registry: RegistryDirectory) -> Self {
-        Self {
-            directory: Some(Box::new(registry)),
+            registry_extension_url: Some(registry_extension_url),
             ..self
         }
     }
 
     pub fn with_host(self, host: &'static str) -> Self {
+        let registry_extension_url = StaticRegistry::to_extension_url(vec![host.parse().unwrap()]);
+
         Self {
-            directory: Some(Box::new(StaticDirectory::new(&host))),
+            registry_extension_url: Some(registry_extension_url),
             ..self
         }
     }
 
     pub fn with_connector(self, connector: &'static str) -> Self {
-        Self {
-            connector: connector,
-            ..self
-        }
+        Self { connector, ..self }
     }
 
     pub fn with_direct(self, direct: bool) -> Self {
         Self { direct, ..self }
     }
 
-    pub(crate) fn direct_build(self) -> TripleClient {
-        let mut cli = TripleClient {
-            send_compression_encoding: Some(CompressionEncoding::Gzip),
-            builder: Some(self.clone()),
-            invoker: None,
-        };
-        cli.invoker = Some(Box::new(TripleInvoker::new(
-            Url::from_url(&self.host).unwrap(),
-        )));
-        return cli;
-    }
+    pub fn build(mut self) -> ServiceMK {
+        let registry = self
+            .registry_extension_url
+            .take()
+            .expect("registry must not be empty");
 
-    pub fn build(self, invocation: Arc<RpcInvocation>) -> Option<BoxInvoker> {
-        if self.direct {
-            return Some(Box::new(TripleInvoker::new(
-                Url::from_url(&self.host).unwrap(),
-            )));
-        }
-        let invokers = match self.directory {
-            Some(v) => v.list(invocation),
-            None => panic!("use direct connection"),
-        };
+        let mk_service = ServiceBuilder::new()
+            .layer(NewCluster::layer())
+            .layer(NewLoadBalancer::layer())
+            .layer(NewRoutes::layer())
+            .layer(NewCachedDirectory::layer())
+            .service(MkRegistryService::new(registry));
 
-        let cluster = MockCluster::default().join(Box::new(MockDirectory::new(invokers)));
-
-        return Some(cluster);
+        Arc::new(mk_service)
     }
 }
diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs
index d4bc220..46f8e9c 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -15,22 +15,19 @@
  * limitations under the License.
  */
 
-use std::str::FromStr;
-
-use futures_util::{future, stream, StreamExt, TryStreamExt};
-
 use aws_smithy_http::body::SdkBody;
+use futures_util::{future, stream, StreamExt, TryStreamExt};
 use http::HeaderValue;
 use prost::Message;
 use serde::{Deserialize, Serialize};
+use tower_service::Service;
 
-use super::builder::ClientBuilder;
 use crate::codegen::{ProstCodec, RpcInvocation, SerdeCodec};
 
 use crate::{
     invocation::{IntoStreamingRequest, Metadata, Request, Response},
-    protocol::BoxInvoker,
     status::Status,
+    svc::NewService,
     triple::{
         codec::{Codec, Decoder, Encoder},
         compression::CompressionEncoding,
@@ -39,25 +36,29 @@
     },
 };
 
-#[derive(Debug, Clone, Default)]
+use super::builder::{ClientBuilder, ServiceMK};
+
+#[derive(Clone)]
 pub struct TripleClient {
     pub(crate) send_compression_encoding: Option<CompressionEncoding>,
-    pub(crate) builder: Option<ClientBuilder>,
-    pub invoker: Option<BoxInvoker>,
+    pub(crate) mk: ServiceMK,
 }
 
 impl TripleClient {
     pub fn connect(host: String) -> Self {
         let builder = ClientBuilder::from_static(&host).with_direct(true);
+        let mk = builder.build();
 
-        builder.direct_build()
+        TripleClient {
+            send_compression_encoding: Some(CompressionEncoding::Gzip),
+            mk,
+        }
     }
 
     pub fn new(builder: ClientBuilder) -> Self {
         TripleClient {
             send_compression_encoding: Some(CompressionEncoding::Gzip),
-            builder: Some(builder),
-            invoker: None,
+            mk: builder.build(),
         }
     }
 
@@ -155,24 +156,16 @@
         )
         .into_stream();
         let body = hyper::Body::wrap_stream(body_stream);
-        let bytes = hyper::body::to_bytes(body).await.unwrap();
-        let sdk_body = SdkBody::from(bytes);
 
-        let mut conn = match self.invoker.clone() {
-            Some(v) => v,
-            None => self
-                .builder
-                .clone()
-                .unwrap()
-                .build(invocation.into())
-                .unwrap(),
-        };
+        let mut invoker = self.mk.new_service(invocation);
 
-        let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
-        let req = self.map_request(http_uri.clone(), path, sdk_body);
+        let request = http::Request::builder()
+            .header("path", path.to_string())
+            .body(body)
+            .unwrap();
 
-        let response = conn
-            .call(req)
+        let response = invoker
+            .call(request)
             .await
             .map_err(|err| crate::status::Status::from_error(err.into()));
 
@@ -226,23 +219,16 @@
         )
         .into_stream();
         let body = hyper::Body::wrap_stream(en);
-        let sdk_body = SdkBody::from(body);
 
-        let mut conn = match self.invoker.clone() {
-            Some(v) => v,
-            None => self
-                .builder
-                .clone()
-                .unwrap()
-                .build(invocation.into())
-                .unwrap(),
-        };
+        let mut invoker = self.mk.new_service(invocation);
 
-        let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
-        let req = self.map_request(http_uri.clone(), path, sdk_body);
+        let request = http::Request::builder()
+            .header("path", path.to_string())
+            .body(body)
+            .unwrap();
 
-        let response = conn
-            .call(req)
+        let response = invoker
+            .call(request)
             .await
             .map_err(|err| crate::status::Status::from_error(err.into()));
 
@@ -280,24 +266,16 @@
         )
         .into_stream();
         let body = hyper::Body::wrap_stream(en);
-        let sdk_body = SdkBody::from(body);
+        let mut invoker = self.mk.new_service(invocation);
 
-        let mut conn = match self.invoker.clone() {
-            Some(v) => v,
-            None => self
-                .builder
-                .clone()
-                .unwrap()
-                .build(invocation.into())
-                .unwrap(),
-        };
-
-        let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
-        let req = self.map_request(http_uri.clone(), path, sdk_body);
+        let request = http::Request::builder()
+            .header("path", path.to_string())
+            .body(body)
+            .unwrap();
 
         // let mut conn = Connection::new().with_host(http_uri);
-        let response = conn
-            .call(req)
+        let response = invoker
+            .call(request)
             .await
             .map_err(|err| crate::status::Status::from_error(err.into()));
 
@@ -351,22 +329,15 @@
         )
         .into_stream();
         let body = hyper::Body::wrap_stream(en);
-        let sdk_body = SdkBody::from(body);
+        let mut invoker = self.mk.new_service(invocation);
 
-        let mut conn = match self.invoker.clone() {
-            Some(v) => v,
-            None => self
-                .builder
-                .clone()
-                .unwrap()
-                .build(invocation.into())
-                .unwrap(),
-        };
-        let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
-        let req = self.map_request(http_uri.clone(), path, sdk_body);
+        let request = http::Request::builder()
+            .header("path", path.to_string())
+            .body(body)
+            .unwrap();
 
-        let response = conn
-            .call(req)
+        let response = invoker
+            .call(request)
             .await
             .map_err(|err| crate::status::Status::from_error(err.into()));
 
diff --git a/dubbo/src/triple/server/builder.rs b/dubbo/src/triple/server/builder.rs
index 9ef7152..b473dd8 100644
--- a/dubbo/src/triple/server/builder.rs
+++ b/dubbo/src/triple/server/builder.rs
@@ -21,7 +21,7 @@
     str::FromStr,
 };
 
-use dubbo_base::Url;
+use dubbo_base::{registry_param::InterfaceName, url::UrlParam, Url};
 use dubbo_logger::tracing;
 use http::{Request, Response, Uri};
 use hyper::body::Body;
@@ -132,7 +132,7 @@
 
 impl From<Url> for ServerBuilder {
     fn from(u: Url) -> Self {
-        let uri = match http::Uri::from_str(&u.raw_url_string()) {
+        let uri = match http::Uri::from_str(&u.as_str()) {
             Ok(v) => v,
             Err(err) => {
                 tracing::error!("http uri parse error: {}, url: {:?}", err, &u);
@@ -142,10 +142,14 @@
 
         let authority = uri.authority().unwrap();
 
+        let service_name = u.query::<InterfaceName>().unwrap().value();
+
         Self {
-            listener: u.get_param("listener").unwrap_or("tcp".to_string()),
+            listener: u
+                .query_param_by_key("listener")
+                .unwrap_or("tcp".to_string()),
             addr: authority.to_string().to_socket_addrs().unwrap().next(),
-            service_names: vec![u.service_name],
+            service_names: vec![service_name],
             server: DubboServer::default(),
             certs: Vec::new(),
             keys: Vec::new(),
diff --git a/dubbo/src/triple/transport/connection.rs b/dubbo/src/triple/transport/connection.rs
index 10a879a..4369b82 100644
--- a/dubbo/src/triple/transport/connection.rs
+++ b/dubbo/src/triple/transport/connection.rs
@@ -15,19 +15,23 @@
  * limitations under the License.
  */
 
-use std::task::Poll;
-
-use dubbo_logger::tracing::debug;
+use dubbo_base::StdError;
 use hyper::client::{conn::Builder, service::Connect};
 use tower_service::Service;
 
-use crate::{boxed, triple::transport::connector::get_connector};
+use crate::{boxed, invoker::clone_body::CloneBody, triple::transport::connector::get_connector};
 
-#[derive(Debug, Clone)]
+type HyperConnect = Connect<
+    crate::utils::boxed_clone::BoxCloneService<http::Uri, super::io::BoxIO, StdError>,
+    CloneBody,
+    http::Uri,
+>;
+
 pub struct Connection {
     host: hyper::Uri,
     connector: String,
     builder: Builder,
+    connect: Option<HyperConnect>,
 }
 
 impl Default for Connection {
@@ -42,6 +46,7 @@
             host: hyper::Uri::default(),
             connector: "http".to_string(),
             builder: Builder::new(),
+            connect: None,
         }
     }
 
@@ -59,14 +64,16 @@
         self.builder = builder;
         self
     }
+
+    pub fn build(mut self) -> Self {
+        let builder = self.builder.clone().http2_only(true).to_owned();
+        let hyper_connect: HyperConnect = Connect::new(get_connector(&self.connector), builder);
+        self.connect = Some(hyper_connect);
+        self
+    }
 }
 
-impl<ReqBody> Service<http::Request<ReqBody>> for Connection
-where
-    ReqBody: http_body::Body + Unpin + Send + 'static,
-    ReqBody::Data: Send + Unpin,
-    ReqBody::Error: Into<crate::Error>,
-{
+impl Service<http::Request<CloneBody>> for Connection {
     type Response = http::Response<crate::BoxBody>;
 
     type Error = crate::Error;
@@ -75,25 +82,34 @@
 
     fn poll_ready(
         &mut self,
-        _cx: &mut std::task::Context<'_>,
+        cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Result<(), Self::Error>> {
-        Poll::Ready(Ok(()))
+        match self.connect {
+            None => {
+                panic!("connection must be built before use")
+            }
+            Some(ref mut connect) => connect.poll_ready(cx).map_err(|e| e.into()),
+        }
     }
 
-    fn call(&mut self, req: http::Request<ReqBody>) -> Self::Future {
-        let builder = self.builder.clone().http2_only(true).to_owned();
-        let mut connector = Connect::new(get_connector(self.connector.as_str()), builder);
-        let uri = self.host.clone();
-        let fut = async move {
-            debug!("send base call to {}", uri);
-            let mut con = connector.call(uri).await.unwrap();
+    fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future {
+        match self.connect {
+            None => {
+                panic!("connection must be built before use")
+            }
+            Some(ref mut connect) => {
+                let uri = self.host.clone();
+                let call_fut = connect.call(uri);
+                let fut = async move {
+                    let mut con = call_fut.await.unwrap();
+                    con.call(req)
+                        .await
+                        .map_err(|err| err.into())
+                        .map(|res| res.map(boxed))
+                };
 
-            con.call(req)
-                .await
-                .map_err(|err| err.into())
-                .map(|res| res.map(boxed))
-        };
-
-        Box::pin(fut)
+                return Box::pin(fut);
+            }
+        }
     }
 }
diff --git a/examples/echo/src/echo/client.rs b/examples/echo/src/echo/client.rs
index 1a21000..8f6a7bf 100644
--- a/examples/echo/src/echo/client.rs
+++ b/examples/echo/src/echo/client.rs
@@ -34,9 +34,10 @@
     // let builder = ClientBuilder::new()
     //     .with_connector("unix")
     //     .with_host("unix://127.0.0.1:8888");
-    let builder = ClientBuilder::from_static(&"http://127.0.0.1:8888")
-        .with_timeout(1000000)
-        .with_direct(true);
+    let builder =
+        ClientBuilder::from_static(&"http://127.0.0.1:8888?interface=grpc.examples.echo.Echo")
+            .with_timeout(1000000)
+            .with_direct(true);
     let mut cli = EchoClient::new(builder);
     // let mut unary_cli = cli.clone().with_filter(FakeFilter {});
     // let mut cli = EchoClient::build(ClientBuilder::from_static("http://127.0.0.1:8888"));
diff --git a/examples/echo/src/echo/server.rs b/examples/echo/src/echo/server.rs
index 90efc1a..010821e 100644
--- a/examples/echo/src/echo/server.rs
+++ b/examples/echo/src/echo/server.rs
@@ -70,7 +70,8 @@
     // Dubbo::new()
     //     .with_config({
     //         let mut r = RootConfig::new();
-    //         r.test_config();
+    //         r.test_config
+    // ();
     //         r
     //     })
     //     .start()
diff --git a/examples/echo/src/generated/grpc.examples.echo.rs b/examples/echo/src/generated/grpc.examples.echo.rs
index 0a74655..fc48dc5 100644
--- a/examples/echo/src/generated/grpc.examples.echo.rs
+++ b/examples/echo/src/generated/grpc.examples.echo.rs
@@ -21,7 +21,7 @@
     #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
     use dubbo::codegen::*;
     /// Echo is the echo service.
-    #[derive(Debug, Clone, Default)]
+    #[derive(Clone)]
     pub struct EchoClient {
         inner: TripleClient,
     }
diff --git a/examples/greeter/src/greeter/client.rs b/examples/greeter/src/greeter/client.rs
index eed3e52..14743ae 100644
--- a/examples/greeter/src/greeter/client.rs
+++ b/examples/greeter/src/greeter/client.rs
@@ -20,35 +20,20 @@
     include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs"));
 }
 
-use std::env;
-
 use dubbo::codegen::*;
 
-use dubbo_base::Url;
+use dubbo::extension;
 use futures_util::StreamExt;
 use protos::{greeter_client::GreeterClient, GreeterRequest};
 use registry_nacos::NacosRegistry;
-use registry_zookeeper::ZookeeperRegistry;
 
 #[tokio::main]
 async fn main() {
     dubbo_logger::init();
 
-    let mut builder = ClientBuilder::new();
+    let _ = extension::EXTENSIONS.register::<NacosRegistry>().await;
 
-    if let Ok(zk_servers) = env::var("ZOOKEEPER_SERVERS") {
-        let zkr = ZookeeperRegistry::new(&zk_servers);
-        let directory = RegistryDirectory::new(Box::new(zkr));
-        builder = builder.with_directory(Box::new(directory));
-    } else if let Ok(nacos_url_str) = env::var("NACOS_URL") {
-        // NACOS_URL=nacos://mse-96efa264-p.nacos-ans.mse.aliyuncs.com
-        let nacos_url = Url::from_url(&nacos_url_str).unwrap();
-        let registry = NacosRegistry::new(nacos_url);
-        let directory = RegistryDirectory::new(Box::new(registry));
-        builder = builder.with_directory(Box::new(directory));
-    } else {
-        builder = builder.with_host("http://127.0.0.1:8888");
-    }
+    let builder = ClientBuilder::new().with_registry("nacos://127.0.0.1:8848".parse().unwrap());
 
     let mut cli = GreeterClient::new(builder);
 
@@ -60,7 +45,7 @@
         .await;
     let resp = match resp {
         Ok(resp) => resp,
-        Err(err) => return println!("{:?}", err),
+        Err(err) => return println!("response error: {:?}", err),
     };
     let (_parts, body) = resp.into_parts();
     println!("Response: {:?}", body);
diff --git a/examples/greeter/src/greeter/server.rs b/examples/greeter/src/greeter/server.rs
index f143e25..6bdbd00 100644
--- a/examples/greeter/src/greeter/server.rs
+++ b/examples/greeter/src/greeter/server.rs
@@ -19,10 +19,11 @@
 
 use async_trait::async_trait;
 use futures_util::{Stream, StreamExt};
+use registry_nacos::NacosRegistry;
 use tokio::sync::mpsc;
 use tokio_stream::wrappers::ReceiverStream;
 
-use dubbo::{codegen::*, Dubbo};
+use dubbo::{codegen::*, extension, Dubbo};
 use dubbo_config::RootConfig;
 use dubbo_logger::{
     tracing::{info, span},
@@ -32,8 +33,6 @@
     greeter_server::{register_server, Greeter},
     GreeterReply, GreeterRequest,
 };
-use registry_zookeeper::ZookeeperRegistry;
-
 pub mod protos {
     #![allow(non_camel_case_types)]
     include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs"));
@@ -50,15 +49,18 @@
     register_server(GreeterServerImpl {
         name: "greeter".to_string(),
     });
-    let zkr = ZookeeperRegistry::default();
+    // let zkr: ZookeeperRegistry = ZookeeperRegistry::default();
     let r = RootConfig::new();
     let r = match r.load() {
         Ok(config) => config,
         Err(_err) => panic!("err: {:?}", _err), // response was droped
     };
+
+    let _ = extension::EXTENSIONS.register::<NacosRegistry>().await;
     let mut f = Dubbo::new()
         .with_config(r)
-        .add_registry("zookeeper", Box::new(zkr));
+        .add_registry("nacos://127.0.0.1:8848/");
+
     f.start().await;
 }
 
diff --git a/protocol/base/src/invoker.rs b/protocol/base/src/invoker.rs
index 14de6ce..d676fa4 100644
--- a/protocol/base/src/invoker.rs
+++ b/protocol/base/src/invoker.rs
@@ -66,9 +66,9 @@
 impl Display for BaseInvoker {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("Invoker")
-            .field("protocol", &self.url.scheme)
-            .field("host", &self.url.ip)
-            .field("path", &self.url.location)
+            .field("protocol", &self.url.protocol())
+            .field("host", &self.url.host())
+            .field("path", &self.url.path())
             .finish()
     }
 }
diff --git a/registry/nacos/Cargo.toml b/registry/nacos/Cargo.toml
index 2437bc0..1fa6c19 100644
--- a/registry/nacos/Cargo.toml
+++ b/registry/nacos/Cargo.toml
@@ -9,13 +9,15 @@
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-nacos-sdk = { version = "0.2.3", features = ["naming", "auth-by-http"] }
+nacos-sdk = { version = "0.3", features = ["naming", "auth-by-http", "async"] }
 dubbo.workspace = true
 serde_json.workspace = true
 serde = { workspace = true, features = ["derive"] }
 anyhow.workspace = true
 dubbo-logger.workspace = true
 dubbo-base.workspace = true
+tokio.workspace = true
+async-trait.workspace = true
 
 [dev-dependencies]
 tracing-subscriber = "0.3.16"
diff --git a/registry/nacos/src/lib.rs b/registry/nacos/src/lib.rs
index bbbaaa2..9ac7022 100644
--- a/registry/nacos/src/lib.rs
+++ b/registry/nacos/src/lib.rs
@@ -14,362 +14,379 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-mod utils;
 
-use dubbo_base::Url;
-use std::{
-    collections::{HashMap, HashSet},
-    sync::{Arc, Mutex},
+use async_trait::async_trait;
+use dubbo_base::{StdError, Url};
+use std::{collections::HashMap, sync::Arc};
+use tokio::sync::mpsc;
+
+use dubbo::extension::{
+    registry_extension::{DiscoverStream, Registry, ServiceChange},
+    Extension,
 };
-
-use anyhow::anyhow;
-use dubbo::registry::{NotifyListener, Registry, RegistryNotifyListener, ServiceEvent};
-use dubbo_logger::tracing::{error, info, warn};
-use nacos_sdk::api::naming::{NamingService, NamingServiceBuilder, ServiceInstance};
-
-use crate::utils::{build_nacos_client_props, is_concrete_str, is_wildcard_str, match_range};
-
-const VERSION_KEY: &str = "version";
-
-const GROUP_KEY: &str = "group";
-
-const DEFAULT_GROUP: &str = "DEFAULT_GROUP";
-
-const PROVIDER_SIDE: &str = "provider";
-
-const DEFAULT_CATEGORY: &str = PROVIDERS_CATEGORY;
-
-const SIDE_KEY: &str = "side";
-
-const REGISTER_CONSUMER_URL_KEY: &str = "register-consumer-url";
-
-const SERVICE_NAME_SEPARATOR: &str = ":";
-
-const CATEGORY_KEY: &str = "category";
-
-const PROVIDERS_CATEGORY: &str = "providers";
-
-#[allow(dead_code)]
-const ADMIN_PROTOCOL: &str = "admin";
-
-#[allow(dead_code)]
-const INNERCLASS_SYMBOL: &str = "$";
-
-#[allow(dead_code)]
-const INNERCLASS_COMPATIBLE_SYMBOL: &str = "___";
+use dubbo_base::{
+    registry_param::{
+        AppName, Category, Group, InterfaceName, RegistryUrl, ServiceNamespace, Version,
+    },
+    url::UrlParam,
+};
+use dubbo_logger::tracing::info;
+use nacos_sdk::api::{
+    naming::{NamingEventListener, NamingService, NamingServiceBuilder, ServiceInstance},
+    props::ClientProps,
+};
+use tokio::sync::{watch, Notify};
 
 pub struct NacosRegistry {
-    nacos_naming_service: Arc<dyn NamingService + Sync + Send + 'static>,
-    listeners: Mutex<HashMap<String, HashSet<Arc<NotifyListenerWrapper>>>>,
+    url: Url,
+    nacos_service: Arc<dyn NamingService + Send + Sync>,
 }
 
 impl NacosRegistry {
-    pub fn new(url: Url) -> Self {
-        let (nacos_client_props, enable_auth) = build_nacos_client_props(&url);
+    pub fn new(url: Url, nacos_service: Arc<dyn NamingService + Send + Sync>) -> Self {
+        Self { url, nacos_service }
+    }
+
+    fn create_nacos_service_instance(url: &Url) -> ServiceInstance {
+        let ip = url.host().unwrap();
+        let port = url.port().unwrap();
+
+        ServiceInstance {
+            ip: ip.to_string(),
+            port: port.into(),
+            metadata: url.all_query_params(),
+            ..Default::default()
+        }
+    }
+
+    fn diff<'a>(
+        old_service: &'a Vec<ServiceInstance>,
+        new_services: &'a Vec<ServiceInstance>,
+    ) -> (Vec<&'a ServiceInstance>, Vec<&'a ServiceInstance>) {
+        let new_hosts_map: HashMap<String, &ServiceInstance> = new_services
+            .iter()
+            .map(|hosts| (hosts.ip_and_port(), hosts))
+            .collect();
+
+        let old_hosts_map: HashMap<String, &ServiceInstance> = old_service
+            .iter()
+            .map(|hosts| (hosts.ip_and_port(), hosts))
+            .collect();
+
+        let mut add_hosts = Vec::<&ServiceInstance>::new();
+        let mut removed_hosts = Vec::<&ServiceInstance>::new();
+
+        for (key, new_host) in new_hosts_map.iter() {
+            let old_host = old_hosts_map.get(key);
+            match old_host {
+                None => {
+                    add_hosts.push(*new_host);
+                }
+                Some(old_host) => {
+                    if !old_host.is_same_instance(new_host) {
+                        removed_hosts.push(*old_host);
+                        add_hosts.push(*new_host);
+                    }
+                }
+            }
+        }
+
+        for (key, old_host) in old_hosts_map.iter() {
+            let new_host = new_hosts_map.get(key);
+            match new_host {
+                None => {
+                    removed_hosts.push(*old_host);
+                }
+                Some(_) => {}
+            }
+        }
+
+        (removed_hosts, add_hosts)
+    }
+}
+
+#[async_trait]
+impl Registry for NacosRegistry {
+    async fn register(&self, url: Url) -> Result<(), StdError> {
+        let service_name = NacosServiceName::new(&url);
+
+        let group_name = service_name.group();
+
+        let registry_service_name_str = service_name.value();
+
+        let service_instance = Self::create_nacos_service_instance(&url);
+
+        self.nacos_service
+            .register_instance(
+                registry_service_name_str.to_owned(),
+                Some(group_name.to_owned()),
+                service_instance,
+            )
+            .await?;
+
+        Ok(())
+    }
+
+    async fn unregister(&self, url: Url) -> Result<(), StdError> {
+        let service_name = NacosServiceName::new(&url);
+
+        let group_name = service_name.group();
+
+        let registry_service_name_str = service_name.value();
+
+        let service_instance = Self::create_nacos_service_instance(&url);
+
+        self.nacos_service
+            .deregister_instance(
+                registry_service_name_str.to_owned(),
+                Some(group_name.to_owned()),
+                service_instance,
+            )
+            .await?;
+
+        Ok(())
+    }
+
+    async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
+        let service_name = NacosServiceName::new(&url);
+
+        let group_name = service_name.group().to_owned();
+
+        let registry_service_name_str = service_name.value().to_owned();
+
+        let all_instance = self
+            .nacos_service
+            .get_all_instances(
+                registry_service_name_str.to_owned(),
+                Some(group_name.to_owned()),
+                Vec::default(),
+                false,
+            )
+            .await?;
+
+        let (tx, rx) = mpsc::channel(1024);
+
+        let (event_listener, mut listener_change_rx, closed) = NacosNamingEventListener::new();
+        let event_listener = Arc::new(event_listener);
+
+        let nacos_service_cloned = self.nacos_service.clone();
+        let event_listener_cloned = event_listener.clone();
+        let registry_service_name_str_clone = registry_service_name_str.clone();
+        let group_name_clone = group_name.clone();
+        tokio::spawn(async move {
+            let mut current_instances = all_instance;
+            for instance in &current_instances {
+                let url = instance_to_url(instance).as_str().to_owned();
+                let _ = tx.send(Ok(ServiceChange::Insert(url, ()))).await;
+            }
+
+            loop {
+                let change = tokio::select! {
+                    _ = closed.notified() => {
+                        break;
+                    },
+                    change = listener_change_rx.changed() => change
+                };
+
+                if change.is_err() {
+                    break;
+                }
+
+                let change = listener_change_rx.borrow_and_update().clone();
+                if change.is_empty() {
+                    continue;
+                }
+                let (remove_instances, add_instances) = Self::diff(&current_instances, &change);
+
+                for remove_instance in remove_instances {
+                    let url = instance_to_url(remove_instance).as_str().to_owned();
+                    let Ok(_) = tx.send(Ok(ServiceChange::Remove(url))).await else {
+                        break;
+                    };
+                }
+
+                for add_instance in add_instances {
+                    let url = instance_to_url(add_instance).as_str().to_owned();
+                    let Ok(_) = tx.send(Ok(ServiceChange::Insert(url, ()))).await else {
+                        break;
+                    };
+                }
+
+                current_instances = change;
+            }
+
+            info!("unsubscribe");
+            let _ = nacos_service_cloned
+                .unsubscribe(
+                    registry_service_name_str_clone,
+                    Some(group_name_clone),
+                    Vec::default(),
+                    event_listener_cloned,
+                )
+                .await;
+        });
+
+        let _ = self
+            .nacos_service
+            .subscribe(
+                registry_service_name_str,
+                Some(group_name),
+                Vec::default(),
+                event_listener,
+            )
+            .await?;
+
+        Ok(rx)
+    }
+
+    async fn unsubscribe(&self, _: Url) -> Result<(), StdError> {
+        Ok(())
+    }
+
+    fn url(&self) -> &Url {
+        &self.url
+    }
+}
+
+#[async_trait]
+impl Extension for NacosRegistry {
+    type Target = Box<dyn Registry + Send + Sync + 'static>;
+
+    fn name() -> String {
+        "nacos".to_string()
+    }
+
+    async fn create(url: Url) -> Result<Self::Target, StdError> {
+        // url example:
+        // extension://0.0.0.0?extension-type=registry&extension-name=nacos&registry=nacos://127.0.0.1:8848
+        let registry_url = url.query::<RegistryUrl>().unwrap();
+        let registry_url = registry_url.value();
+
+        let host = registry_url.host().unwrap();
+        let port = registry_url.port().unwrap_or(8848);
+
+        let nacos_server_addr = format!("{}:{}", host, port);
+
+        let namespace = registry_url.query::<ServiceNamespace>().unwrap_or_default();
+        let namespace = namespace.value();
+
+        let app_name = registry_url.query::<AppName>().unwrap_or_default();
+        let app_name = app_name.value();
+
+        let user_name = registry_url.username();
+        let password = registry_url.password().unwrap_or_default();
+
+        let nacos_client_props = ClientProps::new()
+            .server_addr(nacos_server_addr)
+            .namespace(namespace)
+            .app_name(app_name)
+            .auth_username(user_name)
+            .auth_password(password);
 
         let mut nacos_naming_builder = NamingServiceBuilder::new(nacos_client_props);
 
-        if enable_auth {
+        if !user_name.is_empty() {
             nacos_naming_builder = nacos_naming_builder.enable_auth_plugin_http();
         }
 
         let nacos_naming_service = nacos_naming_builder.build().unwrap();
 
-        Self {
-            nacos_naming_service: Arc::new(nacos_naming_service),
-            listeners: Mutex::new(HashMap::new()),
-        }
-    }
+        let nacos_registry = NacosRegistry::new(registry_url, Arc::new(nacos_naming_service));
 
-    #[allow(dead_code)]
-    fn get_subscribe_service_names(&self, service_name: &NacosServiceName) -> HashSet<String> {
-        if service_name.is_concrete() {
-            let mut set = HashSet::new();
-            let service_subscribe_name = service_name.to_subscriber_str();
-            let service_subscriber_legacy_name = service_name.to_subscriber_legacy_string();
-            if service_subscribe_name.eq(&service_subscriber_legacy_name) {
-                set.insert(service_subscribe_name);
-            } else {
-                set.insert(service_subscribe_name);
-                set.insert(service_subscriber_legacy_name);
-            }
-
-            set
-        } else {
-            let list_view = self.nacos_naming_service.get_service_list(
-                1,
-                i32::MAX,
-                Some(
-                    service_name
-                        .get_group_with_default(DEFAULT_GROUP)
-                        .to_string(),
-                ),
-            );
-            if let Err(e) = list_view {
-                error!("list service instances occur an error: {:?}", e);
-                return HashSet::default();
-            }
-
-            let list_view = list_view.unwrap();
-            let set: HashSet<String> = list_view
-                .0
-                .into_iter()
-                .filter(|service_name| service_name.split(SERVICE_NAME_SEPARATOR).count() == 4)
-                .map(|service_name| NacosServiceName::from_service_name_str(&service_name))
-                .filter(|other_service_name| service_name.is_compatible(other_service_name))
-                .map(|service_name| service_name.to_subscriber_str())
-                .collect();
-            set
-        }
+        Ok(Box::new(nacos_registry))
     }
 }
 
-impl NacosRegistry {
-    fn create_nacos_service_instance(url: Url) -> ServiceInstance {
-        let ip = url.ip;
-        let port = url.port;
-        nacos_sdk::api::naming::ServiceInstance {
-            ip,
-            port: port.parse().unwrap(),
-            metadata: url.params,
-            ..Default::default()
-        }
+fn instance_to_url(instance: &ServiceInstance) -> Url {
+    let mut url = Url::empty();
+    url.set_protocol("provider");
+    url.set_host(instance.ip());
+    url.set_port(instance.port().try_into().unwrap_or_default());
+    url.extend_pairs(
+        instance
+            .metadata()
+            .iter()
+            .map(|(k, v)| (k.clone(), v.clone())),
+    );
+
+    url
+}
+
+struct NacosNamingEventListener {
+    tx: watch::Sender<Vec<ServiceInstance>>,
+    closed: Arc<Notify>,
+}
+
+impl NacosNamingEventListener {
+    fn new() -> (Self, watch::Receiver<Vec<ServiceInstance>>, Arc<Notify>) {
+        let (tx, rx) = watch::channel(Vec::new());
+
+        let closed = Arc::new(Notify::new());
+        let this = Self {
+            tx,
+            closed: closed.clone(),
+        };
+        (this, rx, closed)
     }
 }
 
-impl Registry for NacosRegistry {
-    fn register(&mut self, url: Url) -> Result<(), dubbo::StdError> {
-        let side = url.get_param(SIDE_KEY).unwrap_or_default();
-        let register_consumer = url
-            .get_param(REGISTER_CONSUMER_URL_KEY)
-            .unwrap_or_else(|| false.to_string())
-            .parse::<bool>()
-            .unwrap_or(false);
-        if side.ne(PROVIDER_SIDE) && !register_consumer {
-            warn!("Please set 'dubbo.registry.parameters.register-consumer-url=true' to turn on consumer url registration.");
-            return Ok(());
-        }
-
-        let nacos_service_name = NacosServiceName::new(&url);
-
-        let group_name = Some(
-            nacos_service_name
-                .get_group_with_default(DEFAULT_GROUP)
-                .to_string(),
-        );
-        let nacos_service_name = nacos_service_name.to_register_str();
-
-        let nacos_service_instance = Self::create_nacos_service_instance(url);
-
-        info!("register service: {}", nacos_service_name);
-        let ret = self.nacos_naming_service.register_instance(
-            nacos_service_name,
-            group_name,
-            nacos_service_instance,
-        );
-        if let Err(e) = ret {
-            error!("register to nacos occur an error: {:?}", e);
-            return Err(anyhow!("register to nacos occur an error: {:?}", e).into());
-        }
-
-        Ok(())
-    }
-
-    fn unregister(&mut self, url: Url) -> Result<(), dubbo::StdError> {
-        let nacos_service_name = NacosServiceName::new(&url);
-
-        let group_name = Some(
-            nacos_service_name
-                .get_group_with_default(DEFAULT_GROUP)
-                .to_string(),
-        );
-        let nacos_service_name = nacos_service_name.to_register_str();
-
-        let nacos_service_instance = Self::create_nacos_service_instance(url);
-
-        info!("deregister service: {}", nacos_service_name);
-
-        let ret = self.nacos_naming_service.deregister_instance(
-            nacos_service_name,
-            group_name,
-            nacos_service_instance,
-        );
-        if let Err(e) = ret {
-            error!("deregister service from nacos occur an error: {:?}", e);
-            return Err(anyhow!("deregister service from nacos occur an error: {:?}", e).into());
-        }
-        Ok(())
-    }
-
-    fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), dubbo::StdError> {
-        let service_name = NacosServiceName::new(&url);
-        let url_str = url.to_url();
-
-        info!("subscribe: {}", &url_str);
-
-        let nacos_listener: Arc<NotifyListenerWrapper> = {
-            let listeners = self.listeners.lock();
-            if let Err(e) = listeners {
-                error!("subscribe service failed: {:?}", e);
-                return Err(anyhow!("subscribe service failed: {:?}", e).into());
+impl NamingEventListener for NacosNamingEventListener {
+    fn event(&self, event: Arc<nacos_sdk::api::naming::NamingChangeEvent>) {
+        match event.instances {
+            Some(ref instances) => {
+                let instances = instances.clone();
+                let send = self.tx.send(instances);
+                match send {
+                    Ok(_) => {}
+                    Err(_) => {
+                        self.closed.notify_waiters();
+                    }
+                }
             }
-
-            let mut listeners = listeners.unwrap();
-            let listener_set = listeners.get_mut(url_str.as_str());
-
-            let wrapper = Arc::new(NotifyListenerWrapper(listener));
-            if let Some(listener_set) = listener_set {
-                listener_set.insert(wrapper.clone());
-            } else {
-                let mut hash_set = HashSet::new();
-                hash_set.insert(wrapper.clone());
-                listeners.insert(url_str, hash_set);
-            }
-
-            wrapper
-        };
-
-        let ret = self.nacos_naming_service.subscribe(
-            service_name.to_subscriber_str(),
-            Some(
-                service_name
-                    .get_group_with_default(DEFAULT_GROUP)
-                    .to_string(),
-            ),
-            Vec::new(),
-            nacos_listener,
-        );
-
-        if let Err(e) = ret {
-            error!("subscribe service failed: {:?}", e);
-            return Err(anyhow!("subscribe service failed: {:?}", e).into());
+            None => {}
         }
-
-        Ok(())
-    }
-
-    fn unsubscribe(
-        &self,
-        url: Url,
-        listener: RegistryNotifyListener,
-    ) -> Result<(), dubbo::StdError> {
-        let service_name = NacosServiceName::new(&url);
-        let url_str = url.to_url();
-        info!("unsubscribe: {}", &url_str);
-
-        let nacos_listener: Arc<NotifyListenerWrapper> = {
-            let listeners = self.listeners.lock();
-            if let Err(e) = listeners {
-                error!("unsubscribe service failed: {:?}", e);
-                return Err(anyhow!("unsubscribe service failed: {:?}", e).into());
-            }
-
-            let mut listeners = listeners.unwrap();
-            let listener_set = listeners.get_mut(url_str.as_str());
-            if listener_set.is_none() {
-                return Ok(());
-            }
-
-            let listener_set = listener_set.unwrap();
-
-            let listener = Arc::new(NotifyListenerWrapper(listener));
-            let listener = listener_set.take(&listener);
-            if listener.is_none() {
-                return Ok(());
-            }
-
-            listener.unwrap()
-        };
-
-        let ret = self.nacos_naming_service.unsubscribe(
-            service_name.to_subscriber_str(),
-            Some(
-                service_name
-                    .get_group_with_default(DEFAULT_GROUP)
-                    .to_string(),
-            ),
-            Vec::new(),
-            nacos_listener,
-        );
-
-        if let Err(e) = ret {
-            error!("unsubscribe service failed: {:?}", e);
-            return Err(anyhow!("unsubscribe service failed: {:?}", e).into());
-        }
-
-        Ok(())
     }
 }
 
 struct NacosServiceName {
+    #[allow(dead_code)]
     category: String,
 
-    service_interface: String,
+    #[allow(dead_code)]
+    interface: String,
 
+    #[allow(dead_code)]
     version: String,
 
+    #[allow(dead_code)]
     group: String,
+
+    #[allow(dead_code)]
+    value: String,
 }
 
 impl NacosServiceName {
-    fn new(url: &Url) -> NacosServiceName {
-        let service_interface = url.get_service_name();
+    fn new(url: &Url) -> Self {
+        let interface = url.query::<InterfaceName>().unwrap();
+        let interface = interface.value();
 
-        let category = url.get_param(CATEGORY_KEY).unwrap_or_default();
+        let category = url.query::<Category>().unwrap_or_default();
+        let category = category.value();
 
-        let version = url.get_param(VERSION_KEY).unwrap_or_default();
+        let version = url.query::<Version>().unwrap_or_default();
+        let version = version.value();
 
-        let group = url.get_param(GROUP_KEY).unwrap_or_default();
+        let group = url.query::<Group>().unwrap_or_default();
+        let group = group.value();
+
+        let value = format!("{}:{}:{}:{}", category, interface, version, group);
 
         Self {
             category,
-            service_interface: service_interface.clone(),
+            interface,
             version,
             group,
-        }
-    }
-
-    #[allow(dead_code)]
-    fn from_service_name_str(service_name_str: &str) -> Self {
-        let mut splitter = service_name_str.split(SERVICE_NAME_SEPARATOR);
-
-        let category = splitter.next().unwrap_or_default().to_string();
-        let service_interface = splitter.next().unwrap_or_default().to_string();
-        let version = splitter.next().unwrap_or_default().to_string();
-        let group = splitter.next().unwrap_or_default().to_string();
-
-        Self {
-            category,
-            service_interface,
-            version,
-            group,
-        }
-    }
-
-    #[allow(dead_code)]
-    fn version(&self) -> &str {
-        &self.version
-    }
-
-    #[allow(dead_code)]
-    fn get_version_with_default<'a>(&'a self, default: &'a str) -> &str {
-        if self.version.is_empty() {
-            default
-        } else {
-            &self.version
-        }
-    }
-
-    #[allow(dead_code)]
-    fn group(&self) -> &str {
-        &self.group
-    }
-
-    fn get_group_with_default<'a>(&'a self, default: &'a str) -> &str {
-        if self.group.is_empty() {
-            default
-        } else {
-            &self.group
+            value,
         }
     }
 
@@ -379,157 +396,23 @@
     }
 
     #[allow(dead_code)]
-    fn get_category_with_default<'a>(&'a self, default: &'a str) -> &str {
-        if self.category.is_empty() {
-            default
-        } else {
-            &self.category
-        }
+    fn interface(&self) -> &str {
+        &self.interface
     }
 
     #[allow(dead_code)]
-    fn service_interface(&self) -> &str {
-        &self.service_interface
+    fn version(&self) -> &str {
+        &self.version
     }
 
     #[allow(dead_code)]
-    fn get_service_interface_with_default<'a>(&'a self, default: &'a str) -> &str {
-        if self.service_interface.is_empty() {
-            default
-        } else {
-            &self.service_interface
-        }
-    }
-
-    fn to_register_str(&self) -> String {
-        let category = if self.category.is_empty() {
-            DEFAULT_CATEGORY
-        } else {
-            &self.category
-        };
-        format!(
-            "{}:{}:{}:{}",
-            category, self.service_interface, self.version, self.group
-        )
-    }
-
-    fn to_subscriber_str(&self) -> String {
-        let category = if is_concrete_str(&self.service_interface) {
-            DEFAULT_CATEGORY
-        } else {
-            &self.category
-        };
-
-        format!(
-            "{}:{}:{}:{}",
-            category, self.service_interface, self.version, self.group
-        )
+    fn group(&self) -> &str {
+        &self.group
     }
 
     #[allow(dead_code)]
-    fn to_subscriber_legacy_string(&self) -> String {
-        let mut legacy_string = DEFAULT_CATEGORY.to_owned();
-        if !self.service_interface.is_empty() {
-            legacy_string.push_str(SERVICE_NAME_SEPARATOR);
-            legacy_string.push_str(&self.service_interface);
-        }
-
-        if !self.version.is_empty() {
-            legacy_string.push_str(SERVICE_NAME_SEPARATOR);
-            legacy_string.push_str(&self.version);
-        }
-
-        if !self.group.is_empty() {
-            legacy_string.push_str(SERVICE_NAME_SEPARATOR);
-            legacy_string.push_str(&self.group);
-        }
-
-        legacy_string
-    }
-
-    #[allow(dead_code)]
-    fn is_concrete(&self) -> bool {
-        is_concrete_str(&self.service_interface)
-            && is_concrete_str(&self.version)
-            && is_concrete_str(&self.group)
-    }
-
-    #[allow(dead_code)]
-    fn is_compatible(&self, other: &NacosServiceName) -> bool {
-        if !other.is_concrete() {
-            return false;
-        }
-
-        if !self.category.eq(&other.category) && !match_range(&self.category, &other.category) {
-            return false;
-        }
-
-        if is_wildcard_str(&self.version) {
-            return true;
-        }
-
-        if is_wildcard_str(&self.group) {
-            return true;
-        }
-
-        if !&self.version.eq(&other.version) && !match_range(&self.version, &other.version) {
-            return false;
-        }
-
-        if !self.group.eq(&other.group) && !match_range(&self.group, &other.group) {
-            return false;
-        }
-
-        true
-    }
-}
-
-struct NotifyListenerWrapper(Arc<dyn NotifyListener + Sync + Send + 'static>);
-
-impl std::hash::Hash for NotifyListenerWrapper {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        let ptr = self.0.as_ref();
-        std::ptr::hash(ptr, state);
-    }
-}
-
-impl PartialEq for NotifyListenerWrapper {
-    fn eq(&self, other: &Self) -> bool {
-        let self_ptr = self.0.as_ref() as *const dyn NotifyListener;
-        let other_ptr = other.0.as_ref() as *const dyn NotifyListener;
-
-        let (self_data_ptr, _): (*const u8, *const u8) = unsafe { std::mem::transmute(self_ptr) };
-
-        let (other_data_ptr, _): (*const u8, *const u8) = unsafe { std::mem::transmute(other_ptr) };
-        self_data_ptr == other_data_ptr
-    }
-}
-
-impl Eq for NotifyListenerWrapper {}
-
-impl nacos_sdk::api::naming::NamingEventListener for NotifyListenerWrapper {
-    fn event(&self, event: Arc<nacos_sdk::api::naming::NamingChangeEvent>) {
-        let service_name = event.service_name.clone();
-        let instances = event.instances.as_ref();
-        let urls: Vec<Url>;
-        if let Some(instances) = instances {
-            urls = instances
-                .iter()
-                .filter_map(|data| {
-                    let url_str =
-                        format!("triple://{}:{}/{}", data.ip(), data.port(), service_name);
-                    Url::from_url(&url_str)
-                })
-                .collect();
-        } else {
-            urls = Vec::new();
-        }
-        let notify_event = ServiceEvent {
-            key: service_name,
-            action: String::from("CHANGE"),
-            service: urls,
-        };
-        self.0.notify(notify_event);
+    fn value(&self) -> &str {
+        &self.value
     }
 }
 
@@ -538,14 +421,16 @@
 
     use core::time;
     use std::thread;
+    use tracing::error;
 
+    use dubbo_base::{extension_param::ExtensionName, registry_param::Side};
     use tracing::metadata::LevelFilter;
 
     use super::*;
 
-    #[test]
+    #[tokio::test]
     #[ignore]
-    pub fn test_register_to_nacos() {
+    pub async fn test_register_to_nacos() {
         tracing_subscriber::fmt()
             .with_thread_names(true)
             .with_file(true)
@@ -555,15 +440,19 @@
             .with_max_level(LevelFilter::DEBUG)
             .init();
 
-        let nacos_registry_url = Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
-        let mut registry = NacosRegistry::new(nacos_registry_url);
+        let mut extension_url: Url = "extension://0.0.0.0?extension-type=registry"
+            .parse()
+            .unwrap();
+        extension_url.add_query_param(ExtensionName::new("nacos".to_string()));
+        extension_url.add_query_param(RegistryUrl::new("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015".parse().unwrap()));
 
-        let mut service_url = Url::from_url("tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider&timestamp=1670060843807").unwrap();
-        service_url
-            .params
-            .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
+        let registry = NacosRegistry::create(extension_url).await.unwrap();
 
-        let ret = registry.register(service_url);
+        let mut service_url: Url = "tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider&timestamp=1670060843807".parse().unwrap();
+
+        service_url.add_query_param(Side::Provider);
+
+        let ret = registry.register(service_url).await;
 
         info!("register result: {:?}", ret);
 
@@ -571,9 +460,9 @@
         thread::sleep(sleep_millis);
     }
 
-    #[test]
+    #[tokio::test]
     #[ignore]
-    pub fn test_register_and_unregister() {
+    pub async fn test_register_and_unregister() {
         tracing_subscriber::fmt()
             .with_thread_names(true)
             .with_file(true)
@@ -583,23 +472,27 @@
             .with_max_level(LevelFilter::DEBUG)
             .init();
 
-        let nacos_registry_url = Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
-        let mut registry = NacosRegistry::new(nacos_registry_url);
+        let mut extension_url: Url = "extension://0.0.0.0?extension-type=registry"
+            .parse()
+            .unwrap();
+        extension_url.add_query_param(ExtensionName::new("nacos".to_string()));
+        extension_url.add_query_param(RegistryUrl::new("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015".parse().unwrap()));
 
-        let mut service_url = Url::from_url("tri://127.0.0.1:9090/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider&timestamp=1670060843807").unwrap();
-        service_url
-            .params
-            .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
+        let registry = NacosRegistry::create(extension_url).await.unwrap();
 
-        let ret = registry.register(service_url);
+        let mut service_url: Url = "tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider&timestamp=1670060843807".parse().unwrap();
+
+        service_url.add_query_param(Side::Provider);
+
+        let ret = registry.register(service_url).await;
 
         info!("register result: {:?}", ret);
 
         let sleep_millis = time::Duration::from_secs(10);
         thread::sleep(sleep_millis);
 
-        let unregister_url = Url::from_url("tri://127.0.0.1:9090/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider&timestamp=1670060843807").unwrap();
-        let ret = registry.unregister(unregister_url);
+        let unregister_url = "tri://127.0.0.1:9090/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider&timestamp=1670060843807".parse().unwrap();
+        let ret = registry.unregister(unregister_url).await;
 
         info!("deregister result: {:?}", ret);
 
@@ -607,20 +500,9 @@
         thread::sleep(sleep_millis);
     }
 
-    struct TestNotifyListener;
-    impl NotifyListener for TestNotifyListener {
-        fn notify(&self, event: ServiceEvent) {
-            info!("notified: {:?}", event.key);
-        }
-
-        fn notify_all(&self, event: ServiceEvent) {
-            info!("notify_all: {:?}", event.key);
-        }
-    }
-
-    #[test]
+    #[tokio::test]
     #[ignore]
-    fn test_subscribe() {
+    pub async fn test_subscribe() {
         tracing_subscriber::fmt()
             .with_thread_names(true)
             .with_file(true)
@@ -630,34 +512,41 @@
             .with_max_level(LevelFilter::DEBUG)
             .init();
 
-        let nacos_registry_url = Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
-        let mut registry = NacosRegistry::new(nacos_registry_url);
+        let mut extension_url: Url = "extension://0.0.0.0?extension-type=registry"
+            .parse()
+            .unwrap();
+        extension_url.add_query_param(ExtensionName::new("nacos".to_string()));
+        extension_url.add_query_param(RegistryUrl::new("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015".parse().unwrap()));
 
-        let mut service_url = Url::from_url("tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider&timestamp=1670060843807").unwrap();
-        service_url
-            .params
-            .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
+        let registry = NacosRegistry::create(extension_url).await.unwrap();
 
-        let ret = registry.register(service_url);
+        let mut service_url: Url = "tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider&timestamp=1670060843807".parse().unwrap();
+
+        service_url.add_query_param(Side::Provider);
+
+        let ret = registry.register(service_url).await;
 
         info!("register result: {:?}", ret);
 
-        let subscribe_url = Url::from_url("provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap();
+        let subscribe_url = "consumer://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider".parse().unwrap();
+        let subscribe_ret = registry.subscribe(subscribe_url).await;
 
-        let ret = registry.subscribe(subscribe_url, Arc::new(TestNotifyListener));
-
-        if let Err(e) = ret {
+        if let Err(e) = subscribe_ret {
             error!("error message: {:?}", e);
             return;
         }
 
+        let mut rx = subscribe_ret.unwrap();
+        let change = rx.recv().await;
+        info!("receive change: {:?}", change);
+
         let sleep_millis = time::Duration::from_secs(300);
         thread::sleep(sleep_millis);
     }
 
-    #[test]
+    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
     #[ignore]
-    fn test_unsubscribe() {
+    pub async fn test_unsubscribe() {
         tracing_subscriber::fmt()
             .with_thread_names(true)
             .with_file(true)
@@ -667,39 +556,39 @@
             .with_max_level(LevelFilter::DEBUG)
             .init();
 
-        let nacos_registry_url = Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
-        let mut registry = NacosRegistry::new(nacos_registry_url);
+        let mut extension_url: Url = "extension://0.0.0.0?extension-type=registry"
+            .parse()
+            .unwrap();
+        extension_url.add_query_param(ExtensionName::new("nacos".to_string()));
+        extension_url.add_query_param(RegistryUrl::new("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015".parse().unwrap()));
 
-        let mut service_url = Url::from_url("tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider&timestamp=1670060843807").unwrap();
-        service_url
-            .params
-            .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
+        let registry = NacosRegistry::create(extension_url).await.unwrap();
 
-        let ret = registry.register(service_url);
+        let mut service_url: Url = "tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider&timestamp=1670060843807".parse().unwrap();
+
+        service_url.add_query_param(Side::Provider);
+
+        let ret = registry.register(service_url).await;
 
         info!("register result: {:?}", ret);
 
-        let subscribe_url = Url::from_url("provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap();
+        let subscribe_url = "provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider".parse().unwrap();
 
-        let listener = Arc::new(TestNotifyListener);
-
-        let ret = registry.subscribe(subscribe_url, listener.clone());
+        let ret = registry.subscribe(subscribe_url).await;
 
         if let Err(e) = ret {
             error!("error message: {:?}", e);
             return;
         }
 
+        let mut rx = ret.unwrap();
+        let change = rx.recv().await;
+        info!("receive change: {:?}", change);
+
         let sleep_millis = time::Duration::from_secs(40);
         thread::sleep(sleep_millis);
 
-        let unsubscribe_url = Url::from_url("provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap();
-        let ret = registry.unsubscribe(unsubscribe_url, listener.clone());
-
-        if let Err(e) = ret {
-            error!("error message: {:?}", e);
-            return;
-        }
+        drop(rx);
 
         let sleep_millis = time::Duration::from_secs(40);
         thread::sleep(sleep_millis);
diff --git a/registry/nacos/src/utils/mod.rs b/registry/nacos/src/utils/mod.rs
deleted file mode 100644
index b247f60..0000000
--- a/registry/nacos/src/utils/mod.rs
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-use dubbo_base::Url;
-use nacos_sdk::api::props::ClientProps;
-
-const APP_NAME_KEY: &str = "AppName";
-
-const UNKNOWN_APP: &str = "UnknownApp";
-
-const NAMESPACE_KEY: &str = "namespace";
-
-const DEFAULT_NAMESPACE: &str = "public";
-
-const USERNAME_KEY: &str = "username";
-
-const PASSWORD_KEY: &str = "password";
-
-const BACKUP_KEY: &str = "backup";
-
-const WILDCARD: &str = "*";
-
-const RANGE_STR_SEPARATOR: &str = ",";
-
-pub(crate) fn build_nacos_client_props(url: &Url) -> (nacos_sdk::api::props::ClientProps, bool) {
-    let host = &url.ip;
-    let port = &url.port;
-    let backup = url
-        .get_param(BACKUP_KEY)
-        .map(|mut data| {
-            data.insert(0, ',');
-            data
-        })
-        .unwrap_or_default();
-    let server_addr = format!("{}:{}{}", host, port, backup);
-
-    let namespace = url
-        .get_param(NAMESPACE_KEY)
-        .unwrap_or_else(|| DEFAULT_NAMESPACE.to_string());
-    let app_name = url
-        .get_param(APP_NAME_KEY)
-        .unwrap_or_else(|| UNKNOWN_APP.to_string());
-    let username = url.get_param(USERNAME_KEY).unwrap_or_default();
-    let password = url.get_param(PASSWORD_KEY).unwrap_or_default();
-
-    let enable_auth = !password.is_empty() && !username.is_empty();
-
-    // todo ext parameters
-
-    let mut client_props = ClientProps::new();
-
-    client_props = client_props
-        .server_addr(server_addr)
-        .namespace(namespace)
-        .app_name(app_name)
-        .auth_username(username)
-        .auth_password(password);
-
-    (client_props, enable_auth)
-}
-
-pub(crate) fn is_wildcard_str(str: &str) -> bool {
-    str.eq(WILDCARD)
-}
-
-pub(crate) fn is_range_str(str: &str) -> bool {
-    let ret = str.split(RANGE_STR_SEPARATOR);
-    let count = ret.count();
-    count > 1
-}
-
-pub(crate) fn is_concrete_str(str: &str) -> bool {
-    !is_wildcard_str(str) && !is_range_str(str)
-}
-
-pub(crate) fn match_range(range: &str, value: &str) -> bool {
-    if range.is_empty() {
-        return true;
-    }
-
-    if !is_range_str(range) {
-        return false;
-    }
-
-    range
-        .split(RANGE_STR_SEPARATOR)
-        .any(|data| (*data).eq(value))
-}
diff --git a/registry/zookeeper/Cargo.toml b/registry/zookeeper/Cargo.toml
index 2df5499..ebcb269 100644
--- a/registry/zookeeper/Cargo.toml
+++ b/registry/zookeeper/Cargo.toml
@@ -9,10 +9,13 @@
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-zookeeper = "0.7.0"
+zookeeper = "0.8.0"
 dubbo.workspace = true
+anyhow.workspace = true
 serde_json.workspace = true
 serde = { workspace = true, features = ["derive"] }
 urlencoding.workspace = true
 dubbo-logger.workspace = true
 dubbo-base.workspace = true
+tokio.workspace = true
+async-trait.workspace = true
diff --git a/registry/zookeeper/src/lib.rs b/registry/zookeeper/src/lib.rs
index 6a6e94b..c9683bf 100644
--- a/registry/zookeeper/src/lib.rs
+++ b/registry/zookeeper/src/lib.rs
@@ -17,29 +17,20 @@
 
 #![allow(unused_variables, dead_code, missing_docs)]
 
-use std::{
-    collections::{HashMap, HashSet},
-    env,
-    sync::{Arc, Mutex, RwLock},
-    time::Duration,
-};
+use std::{collections::HashMap, env, sync::Arc, time::Duration};
 
+use async_trait::async_trait;
 use dubbo_base::{
     constants::{DUBBO_KEY, LOCALHOST_IP, PROVIDERS_KEY},
-    Url,
+    StdError, Url,
 };
 use dubbo_logger::tracing::{debug, error, info};
 use serde::{Deserialize, Serialize};
-#[allow(unused_imports)]
-use zookeeper::{Acl, CreateMode, WatchedEvent, WatchedEventType, Watcher, ZkError, ZooKeeper};
+use tokio::{select, sync::mpsc};
+use zookeeper::{Acl, CreateMode, WatchedEvent, WatchedEventType, Watcher, ZooKeeper};
 
-use dubbo::{
-    registry::{
-        memory_registry::MemoryRegistry, NotifyListener, Registry, RegistryNotifyListener,
-        ServiceEvent,
-    },
-    StdError,
-};
+use dubbo::extension::registry_extension::{DiscoverStream, Registry, ServiceChange};
+use dubbo_base::{registry_param::InterfaceName, url::UrlParam};
 
 // Get metadata of a service registration from a URL
 // rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s)
@@ -54,12 +45,9 @@
     }
 }
 
-//#[derive(Debug)]
 pub struct ZookeeperRegistry {
     root_path: String,
     zk_client: Arc<ZooKeeper>,
-    listeners: RwLock<HashMap<String, RegistryNotifyListener>>,
-    memory_registry: Arc<Mutex<MemoryRegistry>>,
 }
 
 #[derive(Serialize, Deserialize, Debug)]
@@ -91,24 +79,6 @@
         ZookeeperRegistry {
             root_path: "/services".to_string(),
             zk_client: Arc::new(zk_client),
-            listeners: RwLock::new(HashMap::new()),
-            memory_registry: Arc::new(Mutex::new(MemoryRegistry::default())),
-        }
-    }
-
-    fn create_listener(
-        &self,
-        path: String,
-        service_name: String,
-        listener: RegistryNotifyListener,
-    ) -> ServiceInstancesChangedListener {
-        let mut service_names = HashSet::new();
-        service_names.insert(service_name.clone());
-        ServiceInstancesChangedListener {
-            zk_client: Arc::clone(&self.zk_client),
-            path,
-            service_name: service_name.clone(),
-            listener,
         }
     }
 
@@ -127,10 +97,6 @@
         s.to_string()
     }
 
-    pub fn get_client(&self) -> Arc<ZooKeeper> {
-        self.zk_client.clone()
-    }
-
     // If the parent node does not exist in the ZooKeeper, Err(ZkError::NoNode) will be returned.
     pub fn create_path(
         &self,
@@ -153,8 +119,8 @@
         match zk_result {
             Ok(_) => Ok(()),
             Err(err) => {
-                error!("create path {} to zookeeper error {}", path, err);
-                Err(Box::try_from(err).unwrap())
+                error!("zk path {} parent not exists.", path);
+                Err(err.into())
             }
         }
     }
@@ -176,28 +142,12 @@
             current.push('/');
             current.push_str(node_key);
             if !self.exists_path(current.as_str()) {
-                let new_create_mode = match children == node_key {
-                    true => create_mode,
-                    false => CreateMode::Persistent,
-                };
-                let new_data = match children == node_key {
-                    true => data,
-                    false => "",
+                let (new_create_mode, new_data) = match children == node_key {
+                    true => (create_mode, data),
+                    false => (CreateMode::Persistent, ""),
                 };
 
-                //Skip ZkError::NodeExists
-                let res = self.create_path(current.as_str(), new_data, new_create_mode);
-                let mut node_exist = false;
-                if let Err(e) = &res {
-                    if let Some(zk_err) = e.downcast_ref::<ZkError>() {
-                        if ZkError::NodeExists == *zk_err {
-                            node_exist = true;
-                        }
-                    }
-                }
-                if !node_exist {
-                    return res;
-                }
+                self.create_path(current.as_str(), new_data, new_create_mode)?;
             }
         }
         Ok(())
@@ -205,7 +155,7 @@
 
     pub fn delete_path(&self, path: &str) {
         if self.exists_path(path) {
-            self.get_client().delete(path, None).unwrap()
+            self.zk_client.delete(path, None).unwrap()
         }
     }
 
@@ -225,6 +175,65 @@
             None
         }
     }
+
+    pub fn diff<'a>(
+        old_urls: &'a Vec<String>,
+        new_urls: &'a Vec<String>,
+    ) -> (Vec<String>, Vec<String>) {
+        let old_urls_map: HashMap<String, String> = old_urls
+            .iter()
+            .map(|url| url.parse())
+            .filter(|item| item.is_ok())
+            .map(|item| item.unwrap())
+            .map(|item: Url| {
+                let ip_port = item.authority().to_owned();
+                let url = item.as_str().to_owned();
+                (ip_port, url)
+            })
+            .collect();
+
+        let new_urls_map: HashMap<String, String> = new_urls
+            .iter()
+            .map(|url| url.parse())
+            .filter(|item| item.is_ok())
+            .map(|item| item.unwrap())
+            .map(|item: Url| {
+                let ip_port = item.authority().to_owned();
+                let url = item.as_str().to_owned();
+                (ip_port, url)
+            })
+            .collect();
+
+        let mut add_hosts = Vec::new();
+        let mut removed_hosts = Vec::new();
+
+        for (key, new_host) in new_urls_map.iter() {
+            let old_host = old_urls_map.get(key);
+            match old_host {
+                None => {
+                    add_hosts.push(new_host.clone());
+                }
+                Some(old_host) => {
+                    if !old_host.eq(new_host) {
+                        removed_hosts.push(old_host.clone());
+                        add_hosts.push(new_host.clone());
+                    }
+                }
+            }
+        }
+
+        for (key, old_host) in old_urls_map.iter() {
+            let new_host = old_urls_map.get(key);
+            match new_host {
+                None => {
+                    removed_hosts.push(old_host.clone());
+                }
+                Some(_) => {}
+            }
+        }
+
+        (removed_hosts, add_hosts)
+    }
 }
 
 impl Default for ZookeeperRegistry {
@@ -248,221 +257,188 @@
     }
 }
 
+#[async_trait]
 impl Registry for ZookeeperRegistry {
-    fn register(&mut self, url: Url) -> Result<(), StdError> {
+    async fn register(&self, url: Url) -> Result<(), StdError> {
         debug!("register url: {}", url);
+        let interface_name = url.query::<InterfaceName>().unwrap().value();
+        let url_str = url.as_str();
         let zk_path = format!(
             "/{}/{}/{}/{}",
-            DUBBO_KEY,
-            url.service_name,
-            PROVIDERS_KEY,
-            url.encoded_raw_url_string()
+            DUBBO_KEY, interface_name, PROVIDERS_KEY, url_str
         );
         self.create_path_with_parent_check(zk_path.as_str(), LOCALHOST_IP, CreateMode::Ephemeral)?;
         Ok(())
     }
 
-    fn unregister(&mut self, url: Url) -> Result<(), StdError> {
+    async fn unregister(&self, url: Url) -> Result<(), StdError> {
+        let interface_name = url.query::<InterfaceName>().unwrap().value();
+        let url_str = url.as_str();
+
         let zk_path = format!(
             "/{}/{}/{}/{}",
-            DUBBO_KEY,
-            url.service_name,
-            PROVIDERS_KEY,
-            url.encoded_raw_url_string()
+            DUBBO_KEY, interface_name, PROVIDERS_KEY, url_str
         );
         self.delete_path(zk_path.as_str());
         Ok(())
     }
 
     // for consumer to find the changes of providers
-    fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), StdError> {
-        let service_name = url.get_service_name();
-        let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &service_name, PROVIDERS_KEY);
-        if self
-            .listeners
-            .read()
-            .unwrap()
-            .get(service_name.as_str())
-            .is_some()
-        {
-            return Ok(());
-        }
+    async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
+        let interface_name = url.query::<InterfaceName>().unwrap().value();
 
-        self.listeners
-            .write()
-            .unwrap()
-            .insert(service_name.to_string(), listener.clone());
+        let zk_path = format!("/{}/{}/{}", DUBBO_KEY, interface_name, PROVIDERS_KEY);
 
-        let zk_listener =
-            self.create_listener(zk_path.clone(), service_name.to_string(), listener.clone());
+        debug!("subscribe service: {}", zk_path);
 
-        let zk_changed_paths = self.zk_client.get_children_w(&zk_path, zk_listener);
-        let result = match zk_changed_paths {
-            Err(err) => {
-                error!("zk subscribe error: {}", err);
-                Vec::new()
+        let (listener, mut change_rx) = ZooKeeperListener::new();
+        let arc_listener = Arc::new(listener);
+
+        let watcher = ZooKeeperWatcher::new(arc_listener.clone(), zk_path.clone());
+
+        let (discover_tx, discover_rx) = mpsc::channel(64);
+
+        let zk_client_in_task = self.zk_client.clone();
+        let zk_path_in_task = zk_path.clone();
+        let interface_name_in_task = interface_name.clone();
+        let arc_listener_in_task = arc_listener.clone();
+        tokio::spawn(async move {
+            let zk_client = zk_client_in_task;
+            let zk_path = zk_path_in_task;
+            let interface_name = interface_name_in_task;
+            let listener = arc_listener_in_task;
+
+            let mut current_urls = Vec::new();
+
+            loop {
+                let changed = select! {
+                    _ = discover_tx.closed() => {
+                        info!("discover task quit, discover channel closed");
+                        None
+                    },
+                    changed = change_rx.recv() => {
+                        changed
+                    }
+                };
+
+                match changed {
+                    Some(_) => {
+                        let zookeeper_watcher =
+                            ZooKeeperWatcher::new(listener.clone(), zk_path.clone());
+
+                        match zk_client.get_children_w(&zk_path, zookeeper_watcher) {
+                            Ok(children) => {
+                                let (removed, add) =
+                                    ZookeeperRegistry::diff(&current_urls, &children);
+
+                                for url in removed {
+                                    match discover_tx
+                                        .send(Ok(ServiceChange::Remove(url.clone())))
+                                        .await
+                                    {
+                                        Ok(_) => {}
+                                        Err(e) => {
+                                            error!("send service change failed: {:?}, maybe user unsubscribe", e);
+                                            break;
+                                        }
+                                    }
+                                }
+
+                                for url in add {
+                                    match discover_tx
+                                        .send(Ok(ServiceChange::Insert(url.clone(), ())))
+                                        .await
+                                    {
+                                        Ok(_) => {}
+                                        Err(e) => {
+                                            error!("send service change failed: {:?}, maybe user unsubscribe", e);
+                                            break;
+                                        }
+                                    }
+                                }
+
+                                current_urls = children;
+                            }
+                            Err(err) => {
+                                error!("zk subscribe error: {}", err);
+                                break;
+                            }
+                        }
+                    }
+                    None => {
+                        error!("receive service change task quit, unsubscribe {}.", zk_path);
+                        break;
+                    }
+                }
             }
-            Ok(urls) => urls
-                .iter()
-                .map(|node_key| {
-                    let provider_url: Url = urlencoding::decode(node_key)
-                        .unwrap()
-                        .to_string()
-                        .as_str()
-                        .into();
-                    provider_url
-                })
-                .collect(),
-        };
-        info!("notifying {}->{:?}", service_name, result);
-        listener.notify(ServiceEvent {
-            key: service_name,
-            action: String::from("ADD"),
-            service: result,
+
+            debug!("unsubscribe service: {}", zk_path);
         });
+
+        arc_listener.changed(zk_path);
+
+        Ok(discover_rx)
+    }
+
+    async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
+        let interface_name = url.query::<InterfaceName>().unwrap().value();
+
+        let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &interface_name, PROVIDERS_KEY);
+
+        info!("unsubscribe service: {}", zk_path);
         Ok(())
     }
 
-    fn unsubscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), StdError> {
+    fn url(&self) -> &Url {
         todo!()
     }
 }
 
-pub struct ServiceInstancesChangedListener {
-    zk_client: Arc<ZooKeeper>,
+pub struct ZooKeeperListener {
+    tx: mpsc::Sender<String>,
+}
+
+impl ZooKeeperListener {
+    pub fn new() -> (ZooKeeperListener, mpsc::Receiver<String>) {
+        let (tx, rx) = mpsc::channel(64);
+        let this = ZooKeeperListener { tx };
+        (this, rx)
+    }
+
+    pub fn changed(&self, path: String) {
+        match self.tx.try_send(path) {
+            Ok(_) => {}
+            Err(err) => {
+                error!("send change list to listener occur an error: {}", err);
+                return;
+            }
+        }
+    }
+}
+
+pub struct ZooKeeperWatcher {
+    listener: Arc<ZooKeeperListener>,
     path: String,
-    service_name: String,
-    listener: RegistryNotifyListener,
 }
 
-impl Watcher for ServiceInstancesChangedListener {
+impl ZooKeeperWatcher {
+    pub fn new(listener: Arc<ZooKeeperListener>, path: String) -> ZooKeeperWatcher {
+        ZooKeeperWatcher { listener, path }
+    }
+}
+
+impl Watcher for ZooKeeperWatcher {
     fn handle(&self, event: WatchedEvent) {
-        if let (WatchedEventType::NodeChildrenChanged, Some(path)) = (event.event_type, event.path)
-        {
-            let event_path = path.clone();
-            let dirs = self
-                .zk_client
-                .get_children(&event_path, false)
-                .expect("msg");
-            let result: Vec<Url> = dirs
-                .iter()
-                .map(|node_key| {
-                    let provider_url: Url = node_key.as_str().into();
-                    provider_url
-                })
-                .collect();
-            let res = self.zk_client.get_children_w(
-                &path,
-                ServiceInstancesChangedListener {
-                    zk_client: Arc::clone(&self.zk_client),
-                    path: path.clone(),
-                    service_name: self.service_name.clone(),
-                    listener: Arc::clone(&self.listener),
-                },
-            );
-
-            info!("notify {}->{:?}", self.service_name, result);
-            self.listener.notify(ServiceEvent {
-                key: self.service_name.clone(),
-                action: String::from("ADD"),
-                service: result,
-            });
+        info!("receive zookeeper event: {:?}", event);
+        let event_type: WatchedEventType = event.event_type;
+        match event_type {
+            WatchedEventType::None => {
+                info!("event type is none, ignore it.");
+                return;
+            }
+            _ => {}
         }
-    }
-}
 
-impl NotifyListener for ServiceInstancesChangedListener {
-    fn notify(&self, event: ServiceEvent) {
-        self.listener.notify(event);
-    }
-
-    fn notify_all(&self, event: ServiceEvent) {
-        self.listener.notify(event);
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use std::sync::Arc;
-
-    use zookeeper::{Acl, CreateMode, WatchedEvent, Watcher};
-
-    use crate::ZookeeperRegistry;
-
-    struct TestZkWatcher {
-        pub watcher: Arc<Option<TestZkWatcher>>,
-    }
-
-    impl Watcher for TestZkWatcher {
-        fn handle(&self, event: WatchedEvent) {
-            println!("event: {:?}", event);
-        }
-    }
-
-    #[test]
-    fn zk_read_write_watcher() {
-        // https://github.com/bonifaido/rust-zookeeper/blob/master/examples/zookeeper_example.rs
-        // using ENV to set zookeeper server urls
-        let zkr = ZookeeperRegistry::default();
-        let zk_client = zkr.get_client();
-        let watcher = TestZkWatcher {
-            watcher: Arc::new(None),
-        };
-        if zk_client.exists("/test", true).is_err() {
-            zk_client
-                .create(
-                    "/test",
-                    vec![1, 3],
-                    Acl::open_unsafe().clone(),
-                    CreateMode::Ephemeral,
-                )
-                .unwrap();
-        }
-        let zk_res = zk_client.create(
-            "/test",
-            "hello".into(),
-            Acl::open_unsafe().clone(),
-            CreateMode::Ephemeral,
-        );
-        let result = zk_client.get_children_w("/test", watcher);
-        assert!(result.is_ok());
-        if zk_client.exists("/test/a", true).is_err() {
-            zk_client.delete("/test/a", None).unwrap();
-        }
-        if zk_client.exists("/test/a", true).is_err() {
-            zk_client.delete("/test/b", None).unwrap();
-        }
-        let zk_res = zk_client.create(
-            "/test/a",
-            "hello".into(),
-            Acl::open_unsafe().clone(),
-            CreateMode::Ephemeral,
-        );
-        let zk_res = zk_client.create(
-            "/test/b",
-            "world".into(),
-            Acl::open_unsafe().clone(),
-            CreateMode::Ephemeral,
-        );
-        let test_a_result = zk_client.get_data("/test", true);
-        assert!(test_a_result.is_ok());
-        let vec1 = test_a_result.unwrap().0;
-        // data in /test should equals to "hello"
-        assert_eq!(String::from_utf8(vec1).unwrap(), "hello");
-        zk_client.close().unwrap()
-    }
-
-    #[test]
-    fn create_path_with_parent_check() {
-        let zkr = ZookeeperRegistry::default();
-        let path = "/du1bbo/test11111";
-        let data = "hello";
-        // creating a child on a not exists parent, throw a NoNode error.
-        // let result = zkr.create_path(path, data, CreateMode::Ephemeral);
-        // assert!(result.is_err());
-        let create_with_parent_check_result =
-            zkr.create_path_with_parent_check(path, data, CreateMode::Ephemeral);
-        assert!(create_with_parent_check_result.is_ok());
-        assert_eq!(data, zkr.get_data(path, false).unwrap());
+        self.listener.changed(self.path.clone());
     }
 }
diff --git a/remoting/base/src/exchange/client.rs b/remoting/base/src/exchange/client.rs
index edbb4a6..71c42d4 100644
--- a/remoting/base/src/exchange/client.rs
+++ b/remoting/base/src/exchange/client.rs
@@ -51,7 +51,7 @@
     pub fn new(url: Url, client: BoxedClient, connection_timeout: Duration) -> Self {
         ExchangeClient {
             connection_timeout,
-            address: url.get_ip_port(),
+            address: url.authority().to_owned(),
             client: None,
             init: AtomicBool::new(false),
             active: AtomicI32::new(0),