load balance and service registry closed loop (#105)

* feat(cluster): loadbalance types

* feat(rpc): types alias for cluster invoker

* feat(cluster): integration

* feat(cluster): integration with examples

* feat(cluster.loadbalance): greeter example with default random loadbalance passed

* feat(cluster.loadbalance): completing roundrobin algorithm and fixing compile warnings.

* typo

* fix compile warnings

* fix rustfmt check failures.

* fix cargo check failures (due to the use of the nightly channel locally).

* fix: default yaml config parse failed in ci.

* ci actions zk test

* feat(registry): zk support

* feat(registry): zk support, connected to zk

* feat(registry): provider.services key as service name

* feat(registry): serviceKey and configuration files, aligned with the dubbo ecosystem

* feat(commons): tested Url impl

* feat(commons): tested Url impl

* feat(zk): interface service discovery

* feat(zk): create_path_with_parent_check

* feat(zk): export bug fixed.
diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml
index 56d2c11..304650d 100644
--- a/.github/workflows/github-actions.yml
+++ b/.github/workflows/github-actions.yml
@@ -45,10 +45,16 @@
           toolchain: stable
       - run: rustup component add rustfmt
       - run: cargo fmt --all -- --check
-
   example-greeter:
     name: example/greeter
     runs-on: ubuntu-latest
+    services:
+      zoo1:
+        image: zookeeper:3.8
+        ports:
+          - 2181:2181
+        env:
+          ZOO_MY_ID: 1
     steps:
       - uses: actions/checkout@main
       - uses: actions-rs/toolchain@v1
@@ -80,6 +86,9 @@
       - name: example greeter
         run: |
           ../../target/debug/greeter-server &
-          sleep 1s ;
+          sleep 3 
           ../../target/debug/greeter-client
+        env:
+          ZOOKEEPER_SERVERS: 127.0.0.1:2181
+          DUBBO_CONFIG_PATH: ./dubbo.yaml
         working-directory: examples/greeter
\ No newline at end of file
diff --git a/config/src/config.rs b/config/src/config.rs
index b82475c..828689c 100644
--- a/config/src/config.rs
+++ b/config/src/config.rs
@@ -17,6 +17,7 @@
 
 use std::{collections::HashMap, env, fs, sync::RwLock};
 
+use crate::protocol::Protocol;
 use lazy_static::lazy_static;
 use serde::{Deserialize, Serialize};
 
@@ -26,6 +27,8 @@
 
 pub const DUBBO_CONFIG_PATH: &str = "./dubbo.yaml";
 
+pub const DUBBO_CONFIG_PREFIX: &str = "dubbo";
+
 lazy_static! {
     pub static ref GLOBAL_ROOT_CONFIG: RwLock<Option<RootConfig>> = RwLock::new(None);
 }
@@ -35,16 +38,13 @@
 #[allow(dead_code)]
 #[derive(Debug, Default, Serialize, Deserialize, Clone)]
 pub struct RootConfig {
-    pub name: String,
+    #[serde(default)]
+    pub protocols: ProtocolConfig,
 
-    #[serde(skip_serializing, skip_deserializing)]
-    pub service: HashMap<String, ServiceConfig>,
-    pub protocols: HashMap<String, ProtocolConfig>,
-    pub registries: HashMap<String, String>,
-
+    #[serde(default)]
     pub provider: ProviderConfig,
 
-    #[serde(skip_serializing, skip_deserializing)]
+    #[serde(default)]
     pub data: HashMap<String, String>,
 }
 
@@ -66,10 +66,7 @@
 impl RootConfig {
     pub fn new() -> Self {
         Self {
-            name: "dubbo".to_string(),
-            service: HashMap::new(),
             protocols: HashMap::new(),
-            registries: HashMap::new(),
             provider: ProviderConfig::new(),
             data: HashMap::new(),
         }
@@ -93,12 +90,10 @@
 
         tracing::info!("current path: {:?}", env::current_dir());
         let data = fs::read(config_path)?;
-        let mut conf: RootConfig = serde_yaml::from_slice(&data).unwrap();
+        let conf: HashMap<String, RootConfig> = serde_yaml::from_slice(&data).unwrap();
+        let root_config: RootConfig = conf.get(DUBBO_CONFIG_PREFIX).unwrap().clone();
         tracing::debug!("origin config: {:?}", conf);
-        for (name, svc) in conf.service.iter_mut() {
-            svc.name = name.to_string();
-        }
-        Ok(conf)
+        Ok(root_config)
     }
 
     pub fn test_config(&mut self) {
@@ -108,37 +103,29 @@
 
         let service_config = ServiceConfig::default()
             .group("test".to_string())
-            .serializer("json".to_string())
             .version("1.0.0".to_string())
-            .protocol_names("triple".to_string())
-            .name("grpc.examples.echo.Echo".to_string());
+            .protocol("triple".to_string())
+            .interface("grpc.examples.echo.Echo".to_string());
 
-        let triple_config = ProtocolConfig::default()
-            .name("triple".to_string())
-            .ip("0.0.0.0".to_string())
-            .port("8888".to_string());
-
-        let service_config = service_config.add_protocol_configs(triple_config);
-        self.service
+        self.provider
+            .services
             .insert("grpc.examples.echo.Echo".to_string(), service_config);
-        self.service.insert(
+        self.provider.services.insert(
             "helloworld.Greeter".to_string(),
             ServiceConfig::default()
                 .group("test".to_string())
-                .serializer("json".to_string())
                 .version("1.0.0".to_string())
-                .name("helloworld.Greeter".to_string())
-                .protocol_names("triple".to_string()),
+                .interface("helloworld.Greeter".to_string())
+                .protocol("triple".to_string()),
         );
         self.protocols.insert(
             "triple".to_string(),
-            ProtocolConfig::default()
+            Protocol::default()
                 .name("triple".to_string())
                 .ip("0.0.0.0".to_string())
                 .port("8889".to_string()),
         );
 
-        provider.services = self.service.clone();
         self.provider = provider.clone();
         println!("provider config: {:?}", provider);
         // 通过环境变量读取某个文件。加在到内存中
diff --git a/config/src/protocol.rs b/config/src/protocol.rs
index 654f0d8..cdc357a 100644
--- a/config/src/protocol.rs
+++ b/config/src/protocol.rs
@@ -19,8 +19,10 @@
 
 use serde::{Deserialize, Serialize};
 
+pub const DEFAULT_PROTOCOL: &str = "triple";
+
 #[derive(Default, Debug, Clone, Serialize, Deserialize)]
-pub struct ProtocolConfig {
+pub struct Protocol {
     pub ip: String,
     pub port: String,
     pub name: String,
@@ -29,7 +31,14 @@
     pub params: HashMap<String, String>,
 }
 
-impl ProtocolConfig {
+pub type ProtocolConfig = HashMap<String, Protocol>;
+
+pub trait ProtocolRetrieve {
+    fn get_protocol(&self, protocol_key: &str) -> Option<Protocol>;
+    fn get_protocol_or_default(&self, protocol_key: &str) -> Protocol;
+}
+
+impl Protocol {
     pub fn name(self, name: String) -> Self {
         Self { name, ..self }
     }
@@ -50,3 +59,28 @@
         format!("{}://{}:{}", self.name, self.ip, self.port)
     }
 }
+
+impl ProtocolRetrieve for ProtocolConfig {
+    /// Returns a clone of the protocol registered under `protocol_key`,
+    /// or `None` when no such entry exists.
+    fn get_protocol(&self, protocol_key: &str) -> Option<Protocol> {
+        self.get(protocol_key).cloned()
+    }
+
+    /// Returns the protocol registered under `protocol_key`, falling back
+    /// to the entry named by `DEFAULT_PROTOCOL` when it is missing.
+    ///
+    /// The fallback must look up `DEFAULT_PROTOCOL`: retrying `protocol_key`
+    /// would repeat the lookup that just failed and never reach the default.
+    ///
+    /// # Panics
+    ///
+    /// Panics when neither `protocol_key` nor `DEFAULT_PROTOCOL` is defined.
+    fn get_protocol_or_default(&self, protocol_key: &str) -> Protocol {
+        if let Some(protocol) = self.get_protocol(protocol_key) {
+            return protocol;
+        }
+        self.get_protocol(DEFAULT_PROTOCOL)
+            .unwrap_or_else(|| panic!("default triple protocol dose not defined."))
+    }
+}
diff --git a/config/src/provider.rs b/config/src/provider.rs
index b7bec78..ccb9cf8 100644
--- a/config/src/provider.rs
+++ b/config/src/provider.rs
@@ -23,10 +23,11 @@
 
 #[derive(Debug, Default, Serialize, Deserialize, Clone)]
 pub struct ProviderConfig {
+    #[serde(default)]
     pub registry_ids: Vec<String>,
-
+    #[serde(default)]
     pub protocol_ids: Vec<String>,
-
+    #[serde(default)]
     pub services: HashMap<String, ServiceConfig>,
 }
 
diff --git a/config/src/service.rs b/config/src/service.rs
index 809f560..1f85a92 100644
--- a/config/src/service.rs
+++ b/config/src/service.rs
@@ -15,30 +15,19 @@
  * limitations under the License.
  */
 
-use std::collections::HashMap;
-
 use serde::{Deserialize, Serialize};
 
-use super::protocol::ProtocolConfig;
-
 #[derive(Debug, Default, Serialize, Deserialize, Clone)]
 pub struct ServiceConfig {
     pub version: String,
     pub group: String,
-
-    #[serde(skip_serializing, skip_deserializing)]
-    pub name: String,
     pub protocol: String,
-    pub registry: String,
-    pub serializer: String,
-
-    // #[serde(skip_serializing, skip_deserializing)]
-    pub protocol_configs: HashMap<String, ProtocolConfig>,
+    pub interface: String,
 }
 
 impl ServiceConfig {
-    pub fn name(self, name: String) -> Self {
-        Self { name, ..self }
+    pub fn interface(self, interface: String) -> Self {
+        Self { interface, ..self }
     }
 
     pub fn version(self, version: String) -> Self {
@@ -49,20 +38,10 @@
         Self { group, ..self }
     }
 
-    pub fn protocol_names(self, protocol: String) -> Self {
+    pub fn protocol(self, protocol: String) -> Self {
         Self { protocol, ..self }
     }
 
-    pub fn serializer(self, serializer: String) -> Self {
-        Self { serializer, ..self }
-    }
-
-    pub fn add_protocol_configs(mut self, protocol_config: ProtocolConfig) -> Self {
-        self.protocol_configs
-            .insert(protocol_config.name.clone(), protocol_config);
-        Self { ..self }
-    }
-
     // pub fn get_url(&self) -> Vec<Url> {
     //     let mut urls = Vec::new();
     //     for (_, conf) in self.protocol_configs.iter() {
diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs
index 01cd89c..372e688 100644
--- a/dubbo-build/src/client.rs
+++ b/dubbo-build/src/client.rs
@@ -100,8 +100,8 @@
                     }
                 }
 
-                pub fn with_directory(mut self, directory: Box<dyn Directory>) -> Self {
-                    self.inner = self.inner.with_directory(directory);
+                pub fn with_cluster(mut self, invoker: ClusterInvoker) -> Self {
+                    self.inner = self.inner.with_cluster(invoker);
                     self
                 }
 
@@ -122,7 +122,7 @@
     let package = if emit_package { service.package() } else { "" };
 
     for method in service.methods() {
-        let service_unique_name= format!(
+        let service_unique_name = format!(
             "{}{}{}",
             package,
             if package.is_empty() { "" } else { "." },
@@ -139,7 +139,13 @@
         stream.extend(generate_doc_comments(method.comment()));
 
         let method = match (method.client_streaming(), method.server_streaming()) {
-            (false, false) => generate_unary(service_unique_name, &method, proto_path, compile_well_known_types, path),
+            (false, false) => generate_unary(
+                service_unique_name,
+                &method,
+                proto_path,
+                compile_well_known_types,
+                path,
+            ),
             (false, true) => {
                 generate_server_streaming(&method, proto_path, compile_well_known_types, path)
             }
@@ -174,7 +180,7 @@
         ) -> Result<Response<#response>, dubbo::status::Status> {
            let codec = #codec_name::<#request, #response>::default();
            let invocation = RpcInvocation::default()
-            .with_servie_unique_name(String::from(#service_unique_name))
+            .with_service_unique_name(String::from(#service_unique_name))
             .with_method_name(String::from(#method_name));
            let path = http::uri::PathAndQuery::from_static(#path);
            self.inner
@@ -182,7 +188,7 @@
                 request,
                 codec,
                 path,
-                invocation,
+                Arc::new(invocation),
             )
             .await
         }
diff --git a/dubbo.yaml b/dubbo.yaml
index f6e7aa9..7404c14 100644
--- a/dubbo.yaml
+++ b/dubbo.yaml
@@ -11,14 +11,16 @@
         ip: 0.0.0.0
         port: '8888'
         name: triple
-  # helloworld.Greeter:
-  #   version: 1.0.0
-  #   group: test
-  #   protocol: triple
-  #   registry: ''
-  #   serializer: json
+#   helloworld.Greeter:
+#     version: 1.0.0
+#     group: test
+#     protocol: triple
+#     registry: ''
+#     serializer: json
 protocols:
   triple:
     ip: 0.0.0.0
     port: '8888'
     name: triple
+
+
diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index 4161601..cda887f 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -33,5 +33,7 @@
 axum = "0.5.9"
 async-stream = "0.3"
 flate2 = "1.0"
+itertools = "0.10.1"
+urlencoding = "2.1.2"
 
 dubbo-config = {path = "../config", version = "0.2.0"}
diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs
index 9f02386..b33bab9 100644
--- a/dubbo/src/cluster/directory.rs
+++ b/dubbo/src/cluster/directory.rs
@@ -24,8 +24,10 @@
 use crate::registry::memory_registry::MemoryNotifyListener;
 use crate::registry::{BoxRegistry, RegistryWrapper};
 
+pub type BoxDirectory = Box<dyn Directory>;
+
 pub trait Directory: Debug + DirectoryClone {
-    fn list(&self, invocation: RpcInvocation) -> Vec<Url>;
+    fn list(&self, invocation: Arc<RpcInvocation>) -> Arc<Vec<Url>>;
 }
 
 pub trait DirectoryClone {
@@ -47,7 +49,7 @@
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct RegistryDirectory {
     registry: RegistryWrapper,
     service_instances: Arc<RwLock<HashMap<String, Vec<Url>>>>,
@@ -64,32 +66,37 @@
     }
 }
 
-impl DirectoryClone for RegistryDirectory {
-    fn clone_box(&self) -> Box<dyn Directory> {
-        todo!()
-    }
-}
+// impl DirectoryClone for RegistryDirectory {
+//     fn clone_box(&self) -> Box<dyn Directory> {
+//         todo!()
+//     }
+// }
 
 impl Directory for RegistryDirectory {
-    fn list(&self, invocation: RpcInvocation) -> Vec<Url> {
+    fn list(&self, invocation: Arc<RpcInvocation>) -> Arc<Vec<Url>> {
         let service_name = invocation.get_target_service_unique_name();
 
-        let url = Url::from_url(&format!(
-            "triple://{}:{}/{}",
-            "127.0.0.1", "8888", service_name
-        ))
-        .unwrap();
+        let url =
+            Url::from_url(&format!("tri://{}:{}/{}", "0.0.0.0", "8888", service_name)).unwrap();
 
-        self.registry.registry.as_ref().expect("msg").subscribe(
-            url,
-            MemoryNotifyListener {
-                service_instances: Arc::clone(&self.service_instances),
-            },
-        ).expect("subscribe");
+        self.registry
+            .registry
+            .as_ref()
+            .expect("msg")
+            .subscribe(
+                url,
+                MemoryNotifyListener {
+                    service_instances: Arc::clone(&self.service_instances),
+                },
+            )
+            .expect("subscribe");
 
-        let map = self.service_instances.read().expect("service_instances.read");
+        let map = self
+            .service_instances
+            .read()
+            .expect("service_instances.read");
         let binding = Vec::new();
         let url_vec = map.get(&service_name).unwrap_or(&binding);
-        url_vec.to_vec()
+        Arc::new(url_vec.to_vec())
     }
 }
diff --git a/dubbo/src/cluster/loadbalance/impls/mod.rs b/dubbo/src/cluster/loadbalance/impls/mod.rs
new file mode 100644
index 0000000..eadbdfb
--- /dev/null
+++ b/dubbo/src/cluster/loadbalance/impls/mod.rs
@@ -0,0 +1,2 @@
+pub mod random;
+pub mod roundrobin;
diff --git a/dubbo/src/cluster/loadbalance/impls/random.rs b/dubbo/src/cluster/loadbalance/impls/random.rs
new file mode 100644
index 0000000..d5305db
--- /dev/null
+++ b/dubbo/src/cluster/loadbalance/impls/random.rs
@@ -0,0 +1,39 @@
+use std::fmt::{Debug, Formatter};
+use std::sync::Arc;
+
+use crate::cluster::loadbalance::types::{LoadBalance, Metadata};
+use crate::codegen::RpcInvocation;
+use crate::common::url::Url;
+
+pub struct RandomLoadBalance {
+    pub metadata: Metadata,
+}
+
+impl Default for RandomLoadBalance {
+    fn default() -> Self {
+        RandomLoadBalance {
+            metadata: Metadata::new("random"),
+        }
+    }
+}
+
+impl Debug for RandomLoadBalance {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "RandomLoadBalance")
+    }
+}
+
+impl LoadBalance for RandomLoadBalance {
+    fn select(
+        &self,
+        invokers: Arc<Vec<Url>>,
+        _url: Option<Url>,
+        _invocation: Arc<RpcInvocation>,
+    ) -> Option<Url> {
+        if invokers.is_empty() {
+            return None;
+        }
+        let index = rand::random::<usize>() % invokers.len();
+        Some(invokers[index].clone())
+    }
+}
diff --git a/dubbo/src/cluster/loadbalance/impls/roundrobin.rs b/dubbo/src/cluster/loadbalance/impls/roundrobin.rs
new file mode 100644
index 0000000..b3a0112
--- /dev/null
+++ b/dubbo/src/cluster/loadbalance/impls/roundrobin.rs
@@ -0,0 +1,63 @@
+use std::collections::HashMap;
+use std::fmt::{Debug, Formatter};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, RwLock};
+
+use crate::cluster::loadbalance::types::{LoadBalance, Metadata};
+use crate::codegen::RpcInvocation;
+use crate::common::url::Url;
+
+pub struct RoundRobinLoadBalance {
+    pub metadata: Metadata,
+    pub counter_map: RwLock<HashMap<String, AtomicUsize>>,
+}
+
+impl Default for RoundRobinLoadBalance {
+    fn default() -> Self {
+        RoundRobinLoadBalance {
+            metadata: Metadata::new("roundrobin"),
+            counter_map: RwLock::new(HashMap::new()),
+        }
+    }
+}
+
+impl Debug for RoundRobinLoadBalance {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "RoundRobinLoadBalance")
+    }
+}
+
+impl RoundRobinLoadBalance {
+    /// Ensures a counter exists for `key`. Blocking `read`/`write` avoid the
+    /// `WouldBlock` panic of `try_*`; `entry` never resets a racing insert.
+    fn guarantee_counter_key(&self, key: &str) {
+        let contained = self.counter_map.read().unwrap().contains_key(key);
+        if !contained {
+            let mut counters = self.counter_map.write().unwrap();
+            counters.entry(key.to_string()).or_insert_with(|| AtomicUsize::new(0));
+        }
+    }
+}
+
+impl LoadBalance for RoundRobinLoadBalance {
+    fn select(
+        &self,
+        invokers: Arc<Vec<Url>>,
+        _url: Option<Url>,
+        invocation: Arc<RpcInvocation>,
+    ) -> Option<Url> {
+        if invokers.is_empty() {
+            return None;
+        }
+        let fingerprint = invocation.unique_fingerprint();
+        self.guarantee_counter_key(fingerprint.as_str());
+        // Block on the lock: `try_read` returns `WouldBlock` while a writer
+        // holds it, and unwrapping that error would panic under concurrency.
+        let counters = self.counter_map.read().unwrap();
+        let index = counters
+            .get(fingerprint.as_str())?
+            .fetch_add(1, Ordering::SeqCst)
+            % invokers.len();
+        Some(invokers[index].clone())
+    }
+}
diff --git a/dubbo/src/cluster/loadbalance/mod.rs b/dubbo/src/cluster/loadbalance/mod.rs
new file mode 100644
index 0000000..dd08724
--- /dev/null
+++ b/dubbo/src/cluster/loadbalance/mod.rs
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::collections::HashMap;
+
+use lazy_static::lazy_static;
+
+use crate::cluster::loadbalance::impls::random::RandomLoadBalance;
+use crate::cluster::loadbalance::impls::roundrobin::RoundRobinLoadBalance;
+use crate::cluster::loadbalance::types::BoxLoadBalance;
+
+pub mod impls;
+pub mod types;
+
+lazy_static! {
+    pub static ref LOAD_BALANCE_EXTENSIONS: HashMap<String, BoxLoadBalance> =
+        init_loadbalance_extensions();
+}
+
+fn init_loadbalance_extensions() -> HashMap<String, BoxLoadBalance> {
+    let mut loadbalance_map: HashMap<String, BoxLoadBalance> = HashMap::new();
+    loadbalance_map.insert("random".to_string(), Box::new(RandomLoadBalance::default()));
+    loadbalance_map.insert(
+        "roundrobin".to_string(),
+        Box::new(RoundRobinLoadBalance::default()),
+    );
+    loadbalance_map
+}
diff --git a/dubbo/src/cluster/loadbalance/types.rs b/dubbo/src/cluster/loadbalance/types.rs
new file mode 100644
index 0000000..baf3d08
--- /dev/null
+++ b/dubbo/src/cluster/loadbalance/types.rs
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use crate::codegen::RpcInvocation;
+use crate::common::url::Url;
+
+pub type BoxLoadBalance = Box<dyn LoadBalance + Send + Sync>;
+
+pub trait LoadBalance: Debug {
+    fn select(
+        &self,
+        invokers: Arc<Vec<Url>>,
+        url: Option<Url>,
+        invocation: Arc<RpcInvocation>,
+    ) -> Option<Url>;
+}
+
+pub struct Metadata {
+    pub name: &'static str,
+}
+
+impl Metadata {
+    pub fn new(name: &'static str) -> Self {
+        Metadata { name }
+    }
+}
diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs
index 84980ea..ef43fe6 100644
--- a/dubbo/src/cluster/mod.rs
+++ b/dubbo/src/cluster/mod.rs
@@ -16,3 +16,5 @@
  */
 
 pub mod directory;
+pub mod loadbalance;
+pub mod support;
diff --git a/dubbo/src/cluster/support/cluster_invoker.rs b/dubbo/src/cluster/support/cluster_invoker.rs
new file mode 100644
index 0000000..814b596
--- /dev/null
+++ b/dubbo/src/cluster/support/cluster_invoker.rs
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::str::FromStr;
+use std::sync::Arc;
+
+use http::uri::PathAndQuery;
+use http::Request;
+use hyper::Body;
+use tower_service::Service;
+
+use crate::cluster::loadbalance::types::BoxLoadBalance;
+use crate::cluster::loadbalance::LOAD_BALANCE_EXTENSIONS;
+use crate::cluster::support::DEFAULT_LOADBALANCE;
+use crate::codegen::{Directory, RegistryDirectory, TripleClient};
+use crate::common::url::Url;
+use crate::invocation::RpcInvocation;
+use crate::triple;
+
+#[derive(Debug, Clone)]
+pub struct ClusterInvoker {
+    directory: Arc<RegistryDirectory>,
+    destroyed: bool,
+}
+
+pub trait ClusterInvokerSelector {
+    /// Select a invoker using loadbalance policy.
+    fn select(
+        &self,
+        invocation: Arc<RpcInvocation>,
+        invokers: Arc<Vec<Url>>,
+        excluded: Arc<Vec<Url>>,
+    ) -> Option<Url>;
+
+    fn do_select(
+        &self,
+        loadbalance_key: Option<&str>,
+        invocation: Arc<RpcInvocation>,
+        invokers: Arc<Vec<Url>>,
+    ) -> Option<Url>;
+}
+
+pub trait ClusterRequestBuilder<T>
+where
+    T: Service<http::Request<hyper::Body>, Response = http::Response<crate::BoxBody>>,
+    T::Error: Into<crate::Error>,
+{
+    fn build_req(
+        &self,
+        triple_client: &TripleClient<T>,
+        path: http::uri::PathAndQuery,
+        invocation: Arc<RpcInvocation>,
+        body: hyper::Body,
+    ) -> http::Request<hyper::Body>;
+}
+
+impl ClusterInvoker {
+    pub fn with_directory(registry_directory: RegistryDirectory) -> Self {
+        ClusterInvoker {
+            directory: Arc::new(registry_directory),
+            destroyed: false,
+        }
+    }
+
+    pub fn directory(&self) -> Arc<RegistryDirectory> {
+        self.directory.clone()
+    }
+
+    pub fn init_loadbalance(&self, loadbalance_key: &str) -> &BoxLoadBalance {
+        if LOAD_BALANCE_EXTENSIONS.contains_key(loadbalance_key) {
+            LOAD_BALANCE_EXTENSIONS.get(loadbalance_key).unwrap()
+        } else {
+            println!(
+                "loadbalance {} not found, use default loadbalance {}",
+                loadbalance_key, DEFAULT_LOADBALANCE
+            );
+            LOAD_BALANCE_EXTENSIONS.get(DEFAULT_LOADBALANCE).unwrap()
+        }
+    }
+
+    pub fn is_available(&self, invocation: Arc<RpcInvocation>) -> bool {
+        !self.destroyed() && !self.directory.list(invocation).is_empty()
+    }
+
+    pub fn destroyed(&self) -> bool {
+        self.destroyed
+    }
+}
+
+impl ClusterInvokerSelector for ClusterInvoker {
+    fn select(
+        &self,
+        invocation: Arc<RpcInvocation>,
+        invokers: Arc<Vec<Url>>,
+        _excluded: Arc<Vec<Url>>,
+    ) -> Option<Url> {
+        if invokers.is_empty() {
+            return None;
+        }
+        let instance_count = invokers.len();
+        return if instance_count == 1 {
+            Some(invokers.as_ref().first()?.clone())
+        } else {
+            let loadbalance = Some(DEFAULT_LOADBALANCE);
+            self.do_select(loadbalance, invocation, invokers)
+        };
+    }
+
+    /// picking instance invoker url from registry directory
+    fn do_select(
+        &self,
+        loadbalance_key: Option<&str>,
+        invocation: Arc<RpcInvocation>,
+        invokers: Arc<Vec<Url>>,
+    ) -> Option<Url> {
+        let loadbalance = self.init_loadbalance(loadbalance_key.unwrap_or(DEFAULT_LOADBALANCE));
+        loadbalance.select(invokers, None, invocation)
+    }
+}
+
+impl<T> ClusterRequestBuilder<T> for ClusterInvoker
+where
+    T: Service<http::Request<hyper::Body>, Response = http::Response<crate::BoxBody>>,
+    T::Error: Into<crate::Error>,
+{
+    fn build_req(
+        &self,
+        triple_client: &triple::client::triple::TripleClient<T>,
+        path: PathAndQuery,
+        invocation: Arc<RpcInvocation>,
+        body: Body,
+    ) -> Request<Body> {
+        let invokers = self.directory.list(invocation.clone());
+        let invoker_url = self
+            .select(invocation, invokers, Arc::new(Vec::new()))
+            .expect("no valid provider");
+        let http_uri =
+            http::Uri::from_str(&format!("http://{}:{}/", invoker_url.ip, invoker_url.port))
+                .unwrap();
+        TripleClient::new_map_request(triple_client, http_uri, path, body)
+    }
+}
diff --git a/dubbo/src/cluster/support/mod.rs b/dubbo/src/cluster/support/mod.rs
new file mode 100644
index 0000000..ae42cc2
--- /dev/null
+++ b/dubbo/src/cluster/support/mod.rs
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+pub mod cluster_invoker;
+
+pub const DEFAULT_LOADBALANCE: &str = "random";
diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs
index 6a2e74d..7489faf 100644
--- a/dubbo/src/codegen.rs
+++ b/dubbo/src/codegen.rs
@@ -24,12 +24,16 @@
 pub use hyper::Body as hyperBody;
 pub use tower_service::Service;
 
-pub use super::registry::Registry;
-pub use super::registry::BoxRegistry;
-pub use super::registry::RegistryWrapper;
+pub use super::cluster::directory::Directory;
+pub use super::cluster::directory::RegistryDirectory;
+pub use super::cluster::support::cluster_invoker::ClusterInvoker;
+pub use super::invocation::RpcInvocation;
 pub use super::invocation::{IntoStreamingRequest, Request, Response};
 pub use super::protocol::triple::triple_invoker::TripleInvoker;
 pub use super::protocol::Invoker;
+pub use super::registry::BoxRegistry;
+pub use super::registry::Registry;
+pub use super::registry::RegistryWrapper;
 pub use super::triple::client::TripleClient;
 pub use super::triple::codec::prost::ProstCodec;
 pub use super::triple::codec::Codec;
@@ -39,9 +43,6 @@
 };
 pub use super::triple::server::TripleServer;
 pub use super::{empty_body, BoxBody, BoxFuture, StdError};
-pub use super::invocation::RpcInvocation;
-pub use super::cluster::directory::Directory;
-pub use super::cluster::directory::RegistryDirectory;
 pub use crate::filter::service::FilterService;
 pub use crate::filter::Filter;
 pub use crate::triple::client::connection::Connection;
diff --git a/dubbo/src/common/consts.rs b/dubbo/src/common/consts.rs
index c05c2c7..17993c8 100644
--- a/dubbo/src/common/consts.rs
+++ b/dubbo/src/common/consts.rs
@@ -18,3 +18,15 @@
 pub const REGISTRY_PROTOCOL: &str = "registry_protocol";
 pub const PROTOCOL: &str = "protocol";
 pub const REGISTRY: &str = "registry";
+
+// URL key
+pub const DUBBO_KEY: &str = "dubbo";
+pub const PROVIDERS_KEY: &str = "providers";
+pub const LOCALHOST_IP: &str = "127.0.0.1";
+pub const METADATA_MAPPING_KEY: &str = "mapping";
+pub const VERSION_KEY: &str = "version";
+pub const GROUP_KEY: &str = "group";
+pub const INTERFACE_KEY: &str = "interface";
+pub const ANYHOST_KEY: &str = "anyhost";
+pub const SIDE_KEY: &str = "side";
+pub const TIMESTAMP_KEY: &str = "timestamp";
diff --git a/dubbo/src/common/url.rs b/dubbo/src/common/url.rs
index 4f79b30..3ed290b 100644
--- a/dubbo/src/common/url.rs
+++ b/dubbo/src/common/url.rs
@@ -16,15 +16,23 @@
  */
 
 use std::collections::HashMap;
+use std::fmt::{Display, Formatter};
+
+use crate::common::consts::{GROUP_KEY, INTERFACE_KEY, VERSION_KEY};
+use http::Uri;
 
 #[derive(Debug, Clone, Default, PartialEq)]
 pub struct Url {
-    pub uri: String,
-    pub protocol: String,
+    pub raw_url_string: String,
+    // value of scheme is different to protocol name, eg. triple -> tri://
+    pub scheme: String,
     pub location: String,
     pub ip: String,
     pub port: String,
-    pub service_key: Vec<String>,
+    // serviceKey format in dubbo java and go '{group}/{interfaceName}:{version}'
+    pub service_key: String,
+    // same to interfaceName
+    pub service_name: String,
     pub params: HashMap<String, String>,
 }
 
@@ -34,84 +42,192 @@
     }
 
     pub fn from_url(url: &str) -> Option<Self> {
-        // url: triple://127.0.0.1:8888/helloworld.Greeter
+        // e.g. tri://127.0.0.1:8888/helloworld.Greeter; None without scheme, authority or port
         let uri = url
             .parse::<http::Uri>()
             .map_err(|err| {
                 tracing::error!("fail to parse url({}), err: {:?}", url, err);
             })
             .unwrap();
-        Some(Self {
-            uri: uri.to_string(),
-            protocol: uri.scheme_str()?.to_string(),
+        let query = uri.query();
+        let mut url_inst = Self {
+            raw_url_string: url.to_string(),
+            scheme: uri.scheme_str()?.to_string(),
             ip: uri.authority()?.host().to_string(),
             port: uri.authority()?.port()?.to_string(),
             location: uri.authority()?.to_string(),
-            service_key: uri
-                .path()
-                .trim_start_matches('/')
-                .split(',')
-                .map(|x| x.to_string())
-                .collect::<Vec<_>>(),
-            params: HashMap::new(),
-        })
+            service_key: uri.path().trim_start_matches('/').to_string(),
+            service_name: uri.path().trim_start_matches('/').to_string(),
+            params: match query {
+                // `Uri::query` is `None` when the URL carries no `?...` part
+                Some(raw_query) => Url::decode(raw_query),
+                None => HashMap::new(),
+            },
+        };
+        url_inst.renew_raw_url_string();
+        Some(url_inst)
     }
 
-    pub fn get_service_name(&self) -> Vec<String> {
+    pub fn get_service_key(&self) -> String {
         self.service_key.clone()
     }
 
-    pub fn get_param(&self, key: String) -> Option<String> {
-        self.params.get(&key).cloned()
+    pub fn get_service_name(&self) -> String {
+        self.service_name.clone()
     }
 
-    pub fn encode_param(&self) -> String {
+    pub fn get_param(&self, key: &str) -> Option<String> {
+        self.params.get(key).cloned()
+    }
+
+    fn encode_param(&self) -> String {
         let mut params_vec: Vec<String> = Vec::new();
         for (k, v) in self.params.iter() {
             // let tmp = format!("{}={}", k, v);
             params_vec.push(format!("{}={}", k, v));
         }
-        params_vec.join("&")
+        if params_vec.is_empty() {
+            "".to_string()
+        } else {
+            format!("?{}", params_vec.join("&"))
+        }
     }
 
-    pub fn decode(&mut self, params: String) {
-        let p: Vec<String> = params.split('&').map(|v| v.trim().to_string()).collect();
+    pub fn params_count(&self) -> usize {
+        self.params.len()
+    }
+
+    fn decode(raw_query_string: &str) -> HashMap<String, String> {
+        let mut params = HashMap::new();
+        let p: Vec<String> = raw_query_string
+            .split('&')
+            .map(|v| v.trim().to_string())
+            .collect();
         for v in p.iter() {
             let values: Vec<String> = v.split('=').map(|v| v.trim().to_string()).collect();
             if values.len() != 2 {
                 continue;
             }
-            self.params.insert(values[0].clone(), values[1].clone());
+            params.insert(values[0].clone(), values[1].clone());
         }
+        params
     }
 
-    pub fn to_url(&self) -> String {
-        format!("{}://{}:{}", self.protocol, self.ip, self.port)
+    pub fn set_param(&mut self, key: &str, value: &str) {
+        self.params.insert(key.to_string(), value.to_string());
+        self.renew_raw_url_string();
+    }
+
+    pub fn raw_url_string(&self) -> String {
+        self.raw_url_string.clone()
+    }
+
+    pub fn encoded_raw_url_string(&self) -> String {
+        urlencoding::encode(self.raw_url_string.as_str()).to_string()
+    }
+
+    fn build_service_key(&self) -> String {
+        format!(
+            "{group}/{interfaceName}:{version}",
+            group = self.get_param(GROUP_KEY).unwrap_or("default".to_string()),
+            interfaceName = self.get_param(INTERFACE_KEY).unwrap_or("error".to_string()),
+            version = self.get_param(VERSION_KEY).unwrap_or("1.0.0".to_string())
+        )
+    }
+
+    fn renew_raw_url_string(&mut self) {
+        self.raw_url_string = format!(
+            "{}://{}:{}/{}{}",
+            self.scheme,
+            self.ip,
+            self.port,
+            self.service_name,
+            self.encode_param()
+        );
+        self.service_key = self.build_service_key()
+    }
+
+    // short_url is used for tcp listening
+    pub fn short_url(&self) -> String {
+        format!("{}://{}:{}", self.scheme, self.ip, self.port)
+    }
+}
+
+impl Display for Url {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.write_str(self.raw_url_string().as_str())
+    }
+}
+
+impl From<Url> for Uri {
+    fn from(url: Url) -> Self {
+        url.raw_url_string.parse().unwrap() // panics if the stored string is not a valid URI
+    }
+}
+
+impl From<&str> for Url {
+    fn from(url: &str) -> Self {
+        Url::from_url(url).unwrap()
     }
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::common::consts::{ANYHOST_KEY, VERSION_KEY};
 
     #[test]
     fn test_from_url() {
-        let u1 = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter");
-        println!("{:?}", u1.unwrap().get_service_name())
+        let mut u1 = Url::from_url("tri://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&\
+        application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&\
+        environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&\
+        module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&\
+        side=provider&timeout=3000&timestamp=1556509797245&version=1.0.0&application=test");
+        assert_eq!(
+            u1.as_ref().unwrap().service_key,
+            "default/com.ikurento.user.UserProvider:1.0.0"
+        );
+        assert_eq!(
+            u1.as_ref()
+                .unwrap()
+                .get_param(ANYHOST_KEY)
+                .unwrap()
+                .as_str(),
+            "true"
+        );
+        assert_eq!(
+            u1.as_ref()
+                .unwrap()
+                .get_param("default.timeout")
+                .unwrap()
+                .as_str(),
+            "10000"
+        );
+        assert_eq!(u1.as_ref().unwrap().scheme, "tri");
+        assert_eq!(u1.as_ref().unwrap().ip, "127.0.0.1");
+        assert_eq!(u1.as_ref().unwrap().port, "20000");
+        assert_eq!(u1.as_ref().unwrap().params_count(), 18);
+        u1.as_mut().unwrap().set_param("key1", "value1");
+        assert_eq!(
+            u1.as_ref().unwrap().get_param("key1").unwrap().as_str(),
+            "value1"
+        );
+        assert_eq!(
+            u1.as_ref()
+                .unwrap()
+                .get_param(VERSION_KEY)
+                .unwrap()
+                .as_str(),
+            "1.0.0"
+        );
     }
 
     #[test]
-    fn test_encode_params() {
-        let mut u = Url::default();
-        u.params.insert("method".to_string(), "GET".to_string());
-        u.params.insert("args".to_string(), "GET".to_string());
-
-        let en = u.encode_param();
-        println!("encode_params: {:?}", en);
-
-        let mut u1 = Url::default();
-        u1.decode(en);
-        println!("decode_params: {:?}", u1);
-        assert_eq!(u1, u);
+    fn test2() {
+        let url: Url = "tri://0.0.0.0:8888/org.apache.dubbo.sample.tri.Greeter".into();
+        assert_eq!(
+            url.raw_url_string(),
+            "tri://0.0.0.0:8888/org.apache.dubbo.sample.tri.Greeter"
+        )
     }
 }
diff --git a/dubbo/src/framework.rs b/dubbo/src/framework.rs
index 5cff9e8..96b9b28 100644
--- a/dubbo/src/framework.rs
+++ b/dubbo/src/framework.rs
@@ -16,22 +16,30 @@
  */
 
 use std::collections::HashMap;
+use std::error::Error;
 use std::pin::Pin;
+use std::sync::{Arc, Mutex};
 
 use futures::future;
 use futures::Future;
+use tracing::{debug, info};
+
+use dubbo_config::protocol::ProtocolRetrieve;
+use dubbo_config::{get_global_config, RootConfig};
 
 use crate::common::url::Url;
 use crate::protocol::{BoxExporter, Protocol};
 use crate::registry::protocol::RegistryProtocol;
-use dubbo_config::{get_global_config, RootConfig};
+use crate::registry::types::{Registries, RegistriesOperation};
+use crate::registry::{BoxRegistry, Registry};
 
 // Invoker是否可以基于hyper写一个通用的
 
 #[derive(Default)]
 pub struct Dubbo {
+    // cached protocols, key of map means protocol name eg. dubbo, triple, grpc
     protocols: HashMap<String, Vec<Url>>,
-    registries: HashMap<String, Url>,
+    registries: Option<Registries>,
     service_registry: HashMap<String, Vec<Url>>, // registry: Urls
     config: Option<RootConfig>,
 }
@@ -41,7 +49,7 @@
         tracing_subscriber::fmt::init();
         Self {
             protocols: HashMap::new(),
-            registries: HashMap::new(),
+            registries: None,
             service_registry: HashMap::new(),
             config: None,
         }
@@ -52,80 +60,83 @@
         self
     }
 
-    pub fn init(&mut self) {
+    /// Stores `registry` under `registry_key` and hands the builder back.
+    ///
+    /// The shared registry map is created on the first call; until then
+    /// `registries` stays `None`.
+    pub fn add_registry(mut self, registry_key: &str, registry: BoxRegistry) -> Self {
+        self.registries
+            .get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new())))
+            .insert(registry_key.to_string(), Arc::new(Mutex::new(registry)));
+        self
+    }
+
+    pub fn init(&mut self) -> Result<(), Box<dyn Error>> {
         if self.config.is_none() {
             self.config = Some(get_global_config())
         }
 
-        let conf = self.config.as_ref().unwrap();
-        tracing::debug!("global conf: {:?}", conf);
-
-        for (name, url) in conf.registries.iter() {
-            self.registries
-                .insert(name.to_string(), Url::from_url(url).unwrap());
-        }
-
-        for (_, c) in conf.provider.services.iter() {
-            let u = if c.protocol_configs.is_empty() {
-                let protocol = match conf.protocols.get(&c.protocol) {
-                    Some(v) => v.to_owned(),
-                    None => {
-                        tracing::warn!("protocol {:?} not exists", c.protocol);
-                        continue;
-                    }
-                };
-                let protocol_url = format!("{}/{}", protocol.to_url(), c.name.clone(),);
+        let root_config = self.config.as_ref().unwrap();
+        debug!("global conf: {:?}", root_config);
+        // env::set_var("ZOOKEEPER_SERVERS",root_config);
+        for (_, service_config) in root_config.provider.services.iter() {
+            info!("init service name: {}", service_config.interface);
+            let url = if root_config
+                .protocols
+                .contains_key(service_config.protocol.as_str())
+            {
+                let protocol = root_config
+                    .protocols
+                    .get_protocol_or_default(service_config.protocol.as_str());
+                let protocol_url =
+                    format!("{}/{}", protocol.to_url(), service_config.interface.clone(),);
+                info!("protocol_url: {:?}", protocol_url);
                 Url::from_url(&protocol_url)
             } else {
-                let protocol = match c.protocol_configs.get(&c.protocol) {
-                    Some(v) => v.to_owned(),
-                    None => {
-                        tracing::warn!("protocol {:?} not exists", c.protocol);
-                        continue;
-                    }
-                };
-                let protocol_url = format!("{}/{}", protocol.to_url(), c.name.clone(),);
-                Url::from_url(&protocol_url)
+                return Err(format!("protocol {:?} not exists", service_config.protocol).into());
             };
-            tracing::info!("url: {:?}", u);
-            if u.is_none() {
+            info!("url: {:?}", url);
+            if url.is_none() {
                 continue;
             }
-
-            let u = u.unwrap();
-
-            let reg_url = self.registries.get(&c.registry).unwrap();
-            if self.service_registry.get(&c.name).is_some() {
-                self.service_registry
-                    .get_mut(&c.name)
+            let u = url.unwrap();
+            if self.protocols.get(&service_config.protocol).is_some() {
+                self.protocols
+                    .get_mut(&service_config.protocol)
                     .unwrap()
-                    .push(reg_url.clone());
+                    .push(u);
             } else {
-                self.service_registry
-                    .insert(c.name.clone(), vec![reg_url.clone()]);
-            }
-
-            if self.protocols.get(&c.protocol).is_some() {
-                self.protocols.get_mut(&c.protocol).unwrap().push(u);
-            } else {
-                self.protocols.insert(c.protocol.clone(), vec![u]);
+                self.protocols
+                    .insert(service_config.protocol.clone(), vec![u]);
             }
         }
+        Ok(())
     }
 
     pub async fn start(&mut self) {
-        self.init();
-
+        self.init().unwrap();
+        info!("starting...");
         // TODO: server registry
-
-        let mem_reg =
-            Box::new(RegistryProtocol::new().with_services(self.service_registry.clone()));
+        let mem_reg = Box::new(
+            RegistryProtocol::new()
+                .with_registries(self.registries.as_ref().unwrap().clone())
+                .with_services(self.service_registry.clone()),
+        );
         let mut async_vec: Vec<Pin<Box<dyn Future<Output = BoxExporter> + Send>>> = Vec::new();
         for (name, items) in self.protocols.iter() {
             for url in items.iter() {
-                tracing::info!("protocol: {:?}, service url: {:?}", name, url);
+                info!("protocol: {:?}, service url: {:?}", name, url);
                 let exporter = mem_reg.clone().export(url.to_owned());
-                async_vec.push(exporter)
+                async_vec.push(exporter);
+                //TODO multiple registry
+                if self.registries.is_some() {
+                    self.registries
+                        .as_ref()
+                        .unwrap()
+                        .default_registry()
+                        .register(url.clone())
+                        .unwrap();
+                }
             }
         }
 
diff --git a/dubbo/src/invocation.rs b/dubbo/src/invocation.rs
index 85d971f..9a9cf8f 100644
--- a/dubbo/src/invocation.rs
+++ b/dubbo/src/invocation.rs
@@ -15,9 +15,11 @@
  * limitations under the License.
  */
 
-use futures_core::Stream;
+use std::fmt::Debug;
 use std::{collections::HashMap, str::FromStr};
 
+use futures_core::Stream;
+
 pub struct Request<T> {
     pub message: T,
     pub metadata: Metadata,
@@ -83,6 +85,13 @@
     metadata: Metadata,
 }
 
+pub trait RpcResult {}
+
+/// Marker impl: every `Response<T>` counts as an `RpcResult` for cluster invocation
+impl<T> RpcResult for Response<T> {}
+
+pub type BoxRpcResult = Box<dyn RpcResult>;
+
 impl<T> Response<T> {
     pub fn new(message: T) -> Response<T> {
         Self {
@@ -195,14 +204,16 @@
     fn get_method_name(&self) -> String;
 }
 
-#[derive(Default)]
+pub type BoxInvocation = Box<dyn Invocation>;
+
+#[derive(Default, Debug)]
 pub struct RpcInvocation {
     target_service_unique_name: String,
     method_name: String,
 }
 
-impl RpcInvocation{
-    pub fn with_servie_unique_name(mut self, service_unique_name: String) -> Self {
+impl RpcInvocation {
+    pub fn with_service_unique_name(mut self, service_unique_name: String) -> Self {
         self.target_service_unique_name = service_unique_name;
         self
     }
@@ -210,9 +221,11 @@
         self.method_name = method_name;
         self
     }
+    pub fn unique_fingerprint(&self) -> String {
+        format!("{}#{}", self.target_service_unique_name, self.method_name)
+    }
 }
 
-
 impl Invocation for RpcInvocation {
     fn get_target_service_unique_name(&self) -> String {
         self.target_service_unique_name.clone()
diff --git a/dubbo/src/lib.rs b/dubbo/src/lib.rs
index e685cba..0bac3f7 100644
--- a/dubbo/src/lib.rs
+++ b/dubbo/src/lib.rs
@@ -15,8 +15,8 @@
  * limitations under the License.
  */
 
-pub mod codegen;
 pub mod cluster;
+pub mod codegen;
 pub mod common;
 pub mod filter;
 mod framework;
diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs
index 20042cb..572b56d 100644
--- a/dubbo/src/protocol/mod.rs
+++ b/dubbo/src/protocol/mod.rs
@@ -15,9 +15,7 @@
  * limitations under the License.
  */
 
-pub mod server_desc;
-pub mod triple;
-
+use std::fmt::Debug;
 use std::future::Future;
 use std::task::{Context, Poll};
 
@@ -26,6 +24,9 @@
 
 use crate::common::url::Url;
 
+pub mod server_desc;
+pub mod triple;
+
 #[async_trait]
 pub trait Protocol {
     type Invoker;
@@ -39,7 +40,7 @@
     fn unexport(&self);
 }
 
-pub trait Invoker<ReqBody> {
+pub trait Invoker<ReqBody>: Debug {
     type Response;
 
     type Error;
diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs
index 2ec3f4a..2a727c3 100644
--- a/dubbo/src/protocol/triple/triple_invoker.rs
+++ b/dubbo/src/protocol/triple/triple_invoker.rs
@@ -24,7 +24,7 @@
 use crate::triple::client::connection::Connection;
 
 #[allow(dead_code)]
-#[derive(Clone, Default)]
+#[derive(Clone, Default, Debug)]
 pub struct TripleInvoker {
     url: Url,
     conn: Connection,
@@ -32,7 +32,7 @@
 
 impl TripleInvoker {
     pub fn new(url: Url) -> TripleInvoker {
-        let uri = http::Uri::from_str(&url.to_url()).unwrap();
+        let uri = http::Uri::from_str(&url.short_url()).unwrap();
         Self {
             url,
             conn: Connection::new().with_host(uri),
diff --git a/dubbo/src/protocol/triple/triple_protocol.rs b/dubbo/src/protocol/triple/triple_protocol.rs
index 5b713ad..c15e26c 100644
--- a/dubbo/src/protocol/triple/triple_protocol.rs
+++ b/dubbo/src/protocol/triple/triple_protocol.rs
@@ -46,7 +46,7 @@
 
     pub fn get_server(&self, url: Url) -> Option<TripleServer> {
         self.servers
-            .get(&url.service_key.join(","))
+            .get(&url.service_key)
             .map(|data| data.to_owned())
     }
 }
@@ -60,10 +60,10 @@
     }
 
     async fn export(mut self, url: Url) -> BoxExporter {
-        let server = TripleServer::new(url.service_key.clone());
-        self.servers
-            .insert(url.service_key.join(","), server.clone());
-        server.serve(url.to_url()).await;
+        // service_key is the same as the key of TRIPLE_SERVICES
+        let server = TripleServer::new(url.service_name.clone());
+        self.servers.insert(url.service_key.clone(), server.clone());
+        server.serve(url.short_url().clone()).await;
 
         Box::new(TripleExporter::new())
     }
diff --git a/dubbo/src/protocol/triple/triple_server.rs b/dubbo/src/protocol/triple/triple_server.rs
index b85dbe1..8e42cab 100644
--- a/dubbo/src/protocol/triple/triple_server.rs
+++ b/dubbo/src/protocol/triple/triple_server.rs
@@ -26,9 +26,9 @@
 }
 
 impl TripleServer {
-    pub fn new(names: Vec<String>) -> TripleServer {
+    pub fn new(names: String) -> TripleServer {
         Self {
-            service_names: names,
+            service_names: vec![names],
             s: DubboServer::new(),
         }
     }
diff --git a/dubbo/src/registry/integration.rs b/dubbo/src/registry/integration.rs
new file mode 100644
index 0000000..2266e46
--- /dev/null
+++ b/dubbo/src/registry/integration.rs
@@ -0,0 +1,8 @@
+use crate::cluster::support::cluster_invoker::ClusterInvoker;
+use crate::registry::BoxRegistry;
+use std::sync::Arc;
+
+pub trait ClusterRegistryIntegration {
+    /// get cluster invoker struct
+    fn get_invoker(registry: BoxRegistry) -> Option<Arc<ClusterInvoker>>;
+}
diff --git a/dubbo/src/registry/memory_registry.rs b/dubbo/src/registry/memory_registry.rs
index f1ff25f..76b8922 100644
--- a/dubbo/src/registry/memory_registry.rs
+++ b/dubbo/src/registry/memory_registry.rs
@@ -16,9 +16,11 @@
  */
 
 #![allow(unused_variables, dead_code, missing_docs)]
+
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::sync::RwLock;
+use tracing::debug;
 
 use crate::common::url::Url;
 
@@ -46,9 +48,9 @@
 impl Registry for MemoryRegistry {
     type NotifyListener = MemoryNotifyListener;
 
-    fn register(&mut self, mut url: crate::common::url::Url) -> Result<(), crate::StdError> {
+    fn register(&mut self, mut url: Url) -> Result<(), crate::StdError> {
         // define provider label: ${registry.group}/${service_name}/provider
-        let registry_group = match url.get_param(REGISTRY_GROUP_KEY.to_string()) {
+        let registry_group = match url.get_param(REGISTRY_GROUP_KEY) {
             Some(key) => key,
             None => "dubbo".to_string(),
         };
@@ -56,20 +58,20 @@
         let dubbo_path = format!(
             "/{}/{}/{}",
             registry_group,
-            url.get_service_name().join(","),
+            url.get_service_name(),
             "provider",
         );
 
         url.params.insert("anyhost".to_string(), "true".to_string());
         // define triple url path
-        let raw_url = format!("{}?{}", url.to_url(), url.encode_param(),);
+        let raw_url = url.raw_url_string();
 
         self.registries.write().unwrap().insert(dubbo_path, raw_url);
         Ok(())
     }
 
     fn unregister(&mut self, url: crate::common::url::Url) -> Result<(), crate::StdError> {
-        let registry_group = match url.get_param(REGISTRY_GROUP_KEY.to_string()) {
+        let registry_group = match url.get_param(REGISTRY_GROUP_KEY) {
             Some(key) => key,
             None => "dubbo".to_string(),
         };
@@ -77,7 +79,7 @@
         let dubbo_path = format!(
             "/{}/{}/{}",
             registry_group,
-            url.get_service_name().join(","),
+            url.get_service_name(),
             "provider",
         );
         self.registries.write().unwrap().remove(&dubbo_path);
@@ -108,10 +110,11 @@
 
 impl NotifyListener for MemoryNotifyListener {
     fn notify(&self, event: super::ServiceEvent) {
-        let mut map=self.service_instances.write().expect("msg");
+        debug!("notify {:?}", event);
+        let mut map = self.service_instances.write().expect("msg");
         match event.action.as_str() {
-            "ADD"=> map.insert(event.key, event.service),
-            &_ => todo!()
+            "ADD" => map.insert(event.key, event.service),
+            &_ => todo!(),
         };
     }
 
diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs
index 8c56692..ea915fe 100644
--- a/dubbo/src/registry/mod.rs
+++ b/dubbo/src/registry/mod.rs
@@ -16,12 +16,15 @@
  */
 
 #![allow(unused_variables, dead_code, missing_docs)]
+pub mod integration;
 pub mod memory_registry;
 pub mod protocol;
+pub mod types;
 
-use std::fmt::Debug;
+use std::fmt::{Debug, Formatter};
 
 use crate::common::url::Url;
+use crate::registry::memory_registry::MemoryNotifyListener;
 
 pub trait Registry {
     type NotifyListener;
@@ -38,18 +41,24 @@
     fn notify_all(&self, event: ServiceEvent);
 }
 
+#[derive(Debug)]
 pub struct ServiceEvent {
     pub key: String,
     pub action: String,
     pub service: Vec<Url>,
 }
 
-pub type BoxRegistry =
-    Box<dyn Registry<NotifyListener = memory_registry::MemoryNotifyListener> + Send + Sync>;
+pub type BoxRegistry = Box<dyn Registry<NotifyListener = MemoryNotifyListener> + Send + Sync>;
+
+impl Debug for BoxRegistry {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.write_str("BoxRegistry")
+    }
+}
 
 #[derive(Default)]
 pub struct RegistryWrapper {
-    pub registry: Option<Box<dyn Registry<NotifyListener = memory_registry::MemoryNotifyListener>>>,
+    pub registry: Option<Box<dyn Registry<NotifyListener = MemoryNotifyListener>>>,
 }
 
 impl Clone for RegistryWrapper {
diff --git a/dubbo/src/registry/protocol.rs b/dubbo/src/registry/protocol.rs
index 933975c..db6c1f6 100644
--- a/dubbo/src/registry/protocol.rs
+++ b/dubbo/src/registry/protocol.rs
@@ -16,7 +16,8 @@
  */
 
 use std::collections::HashMap;
-use std::sync::{Arc, RwLock};
+use std::fmt::{Debug, Formatter};
+use std::sync::{Arc, Mutex, RwLock};
 
 use super::memory_registry::MemoryRegistry;
 use super::BoxRegistry;
@@ -27,26 +28,45 @@
 use crate::protocol::BoxExporter;
 use crate::protocol::BoxInvoker;
 use crate::protocol::Protocol;
+use crate::registry::types::Registries;
 
 #[derive(Clone, Default)]
 pub struct RegistryProtocol {
     // registerAddr: Registry
-    registries: Arc<RwLock<HashMap<String, BoxRegistry>>>,
+    registries: Option<Registries>,
     // providerUrl: Exporter
     exporters: Arc<RwLock<HashMap<String, BoxExporter>>>,
     // serviceName: registryUrls
     services: HashMap<String, Vec<Url>>,
 }
 
+impl Debug for RegistryProtocol {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.write_str(
+            format!(
+                "RegistryProtocol services{:#?},registries,{:#?}",
+                self.services,
+                self.registries.clone()
+            )
+            .as_str(),
+        )
+    }
+}
+
 impl RegistryProtocol {
     pub fn new() -> Self {
         RegistryProtocol {
-            registries: Arc::new(RwLock::new(HashMap::new())),
+            registries: None,
             exporters: Arc::new(RwLock::new(HashMap::new())),
             services: HashMap::new(),
         }
     }
 
+    pub fn with_registries(mut self, registries: Registries) -> Self {
+        self.registries = Some(registries);
+        self
+    }
+
     pub fn with_services(mut self, services: HashMap<String, Vec<Url>>) -> Self {
         self.services.extend(services);
         self
@@ -55,9 +75,11 @@
     pub fn get_registry(&mut self, url: Url) -> BoxRegistry {
         let mem = MemoryRegistry::default();
         self.registries
-            .write()
+            .as_ref()
             .unwrap()
-            .insert(url.location, Box::new(mem.clone()));
+            .lock()
+            .unwrap()
+            .insert(url.location, Arc::new(Mutex::new(Box::new(mem.clone()))));
 
         Box::new(mem)
     }
@@ -77,23 +99,23 @@
         // init Exporter based on provider_url
         // server registry based on register_url
         // start server health check
-        let registry_url = self.services.get(url.get_service_name().join(",").as_str());
+        let registry_url = self.services.get(url.get_service_name().as_str());
         if let Some(urls) = registry_url {
             for url in urls.clone().iter() {
-                if !url.protocol.is_empty() {
+                if !url.service_key.is_empty() {
                     let mut reg = self.get_registry(url.clone());
                     reg.register(url.clone()).unwrap();
                 }
             }
         }
 
-        match url.clone().protocol.as_str() {
-            "triple" => {
+        match url.clone().scheme.as_str() {
+            "tri" => {
                 let pro = Box::new(TripleProtocol::new());
                 return pro.export(url).await;
             }
             _ => {
-                tracing::error!("protocol {:?} not implemented", url.protocol);
+                tracing::error!("protocol {:?} not implemented", url.scheme);
                 Box::new(TripleExporter::new())
             }
         }
diff --git a/dubbo/src/registry/types.rs b/dubbo/src/registry/types.rs
new file mode 100644
index 0000000..4768c33
--- /dev/null
+++ b/dubbo/src/registry/types.rs
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+use itertools::Itertools;
+use tracing::info;
+
+use crate::common::url::Url;
+use crate::registry::memory_registry::MemoryNotifyListener;
+use crate::registry::{BoxRegistry, Registry};
+use crate::StdError;
+
+pub type SafeRegistry = Arc<Mutex<BoxRegistry>>;
+pub type Registries = Arc<Mutex<HashMap<String, SafeRegistry>>>;
+
+pub const DEFAULT_REGISTRY_KEY: &str = "default";
+
+pub trait RegistriesOperation {
+    fn get(&self, registry_key: &str) -> SafeRegistry;
+    fn insert(&self, registry_key: String, registry: SafeRegistry);
+    fn default_registry(&self) -> SafeRegistry;
+}
+
+impl RegistriesOperation for Registries {
+    fn get(&self, registry_key: &str) -> SafeRegistry {
+        self.as_ref()
+            .lock()
+            .unwrap()
+            .get(registry_key)
+            .unwrap()
+            .clone()
+    }
+
+    fn insert(&self, registry_key: String, registry: SafeRegistry) {
+        self.as_ref().lock().unwrap().insert(registry_key, registry);
+    }
+
+    fn default_registry(&self) -> SafeRegistry {
+        let guard = self.as_ref().lock().unwrap();
+        let (_, result) = guard
+            .iter()
+            .find_or_first(|e| e.0 == DEFAULT_REGISTRY_KEY)
+            .unwrap()
+            .to_owned();
+        result.clone()
+    }
+}
+
+impl Registry for SafeRegistry {
+    type NotifyListener = MemoryNotifyListener;
+
+    /// Locks the wrapped registry and registers `url`, returning its result.
+    fn register(&mut self, url: Url) -> Result<(), StdError> {
+        info!("register {}.", url);
+        self.lock().unwrap().register(url)
+    }
+
+    /// Locks the wrapped registry and unregisters `url` (was calling `register`).
+    fn unregister(&mut self, url: Url) -> Result<(), StdError> {
+        self.lock().unwrap().unregister(url)
+    }
+
+    /// Locks the wrapped registry and subscribes `listener` to `url`.
+    fn subscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> {
+        self.lock().unwrap().subscribe(url, listener)
+    }
+
+    /// Locks the wrapped registry and unsubscribes `listener` from `url`.
+    fn unsubscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> {
+        self.lock().unwrap().unsubscribe(url, listener)
+    }
+}
diff --git a/dubbo/src/triple/client/connection.rs b/dubbo/src/triple/client/connection.rs
index ce45309..9246c9b 100644
--- a/dubbo/src/triple/client/connection.rs
+++ b/dubbo/src/triple/client/connection.rs
@@ -20,6 +20,7 @@
 use hyper::client::conn::Builder;
 use hyper::client::service::Connect;
 use tower_service::Service;
+use tracing::debug;
 
 use crate::boxed;
 use crate::triple::transport::connector::get_connector;
@@ -86,6 +87,7 @@
         let mut connector = Connect::new(get_connector(self.connector.clone()), builder);
         let uri = self.host.clone();
         let fut = async move {
+            debug!("send rpc call to {}", uri);
             let mut con = connector.call(uri).await.unwrap();
             con.call(req)
                 .await
diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs
index 27db0ad..ca81f85 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -16,30 +16,30 @@
  */
 
 use std::str::FromStr;
+use std::sync::Arc;
 
 use futures_util::{future, stream, StreamExt, TryStreamExt};
-
 use http::HeaderValue;
-use rand::prelude::SliceRandom;
 use tower_service::Service;
 
-use super::connection::Connection;
-use crate::codegen::{Directory, RpcInvocation};
+use crate::cluster::support::cluster_invoker::{ClusterInvoker, ClusterRequestBuilder};
+use crate::codegen::RpcInvocation;
 use crate::filter::service::FilterService;
 use crate::filter::Filter;
 use crate::invocation::{IntoStreamingRequest, Metadata, Request, Response};
-
 use crate::triple::codec::Codec;
 use crate::triple::compression::CompressionEncoding;
 use crate::triple::decode::Decoding;
 use crate::triple::encode::encode;
 
+use super::connection::Connection;
+
 #[derive(Debug, Clone, Default)]
 pub struct TripleClient<T> {
     host: Option<http::Uri>,
     inner: T,
     send_compression_encoding: Option<CompressionEncoding>,
-    directory: Option<Box<dyn Directory>>,
+    cluster_invoker: Option<ClusterInvoker>,
 }
 
 impl TripleClient<Connection> {
@@ -56,7 +56,7 @@
             host: uri.clone(),
             inner: Connection::new().with_host(uri.unwrap()),
             send_compression_encoding: Some(CompressionEncoding::Gzip),
-            directory: None,
+            cluster_invoker: None,
         }
     }
 }
@@ -67,7 +67,7 @@
             host,
             inner,
             send_compression_encoding: Some(CompressionEncoding::Gzip),
-            directory: None,
+            cluster_invoker: None,
         }
     }
 
@@ -78,10 +78,9 @@
         TripleClient::new(FilterService::new(self.inner, filter), self.host)
     }
 
-    /// host: http://0.0.0.0:8888
-    pub fn with_directory(self, directory: Box<dyn Directory>) -> Self {
+    pub fn with_cluster(self, invoker: ClusterInvoker) -> Self {
         TripleClient {
-            directory: Some(directory),
+            cluster_invoker: Some(invoker),
             ..self
         }
     }
@@ -92,7 +91,7 @@
     T: Service<http::Request<hyper::Body>, Response = http::Response<crate::BoxBody>>,
     T::Error: Into<crate::Error>,
 {
-    fn new_map_request(
+    pub fn new_map_request(
         &self,
         uri: http::Uri,
         path: http::uri::PathAndQuery,
@@ -242,11 +241,11 @@
     }
 
     pub async fn unary<C, M1, M2>(
-        &mut self,
+        &self,
         req: Request<M1>,
         mut codec: C,
         path: http::uri::PathAndQuery,
-        invocation: RpcInvocation,
+        invocation: Arc<RpcInvocation>,
     ) -> Result<Response<M2>, crate::status::Status>
     where
         C: Codec<Encode = M1, Decode = M2>,
@@ -262,14 +261,19 @@
         .into_stream();
         let body = hyper::Body::wrap_stream(body_stream);
 
-        let url_list = self.directory.as_ref().expect("msg").list(invocation);
-        let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
-        let http_uri =
-            http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
-
-        let req = self.new_map_request(http_uri.clone(), path, body);
-
-        let mut conn = Connection::new().with_host(http_uri);
+        // let url_list = self.directory.as_ref().expect("msg").list(invocation);
+        // let url_list = self.cluster_invoker.as_ref().unwrap().directory().list(invocation.clone());
+        // let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
+        // let http_uri =
+        //     http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
+        //
+        // let req = self.new_map_request(http_uri.clone(), path, body);
+        let req =
+            self.cluster_invoker
+                .as_ref()
+                .unwrap()
+                .build_req(self, path, invocation.clone(), body);
+        let mut conn = Connection::new().with_host(req.uri().clone());
         let response = conn
             .call(req)
             .await
diff --git a/dubbo/src/triple/compression.rs b/dubbo/src/triple/compression.rs
index b173211..65cdce2 100644
--- a/dubbo/src/triple/compression.rs
+++ b/dubbo/src/triple/compression.rs
@@ -111,12 +111,12 @@
     let len = src.len();
     src.reserve(len);
 
-    compress(CompressionEncoding::Gzip, &mut src, &mut dst, len);
+    compress(CompressionEncoding::Gzip, &mut src, &mut dst, len).unwrap();
     println!("src: {:?}, dst: {:?}", src, dst);
 
     let mut de_dst = BytesMut::with_capacity(super::consts::BUFFER_SIZE);
     let de_len = dst.len();
-    decompress(CompressionEncoding::Gzip, &mut dst, &mut de_dst, de_len);
+    decompress(CompressionEncoding::Gzip, &mut dst, &mut de_dst, de_len).unwrap();
 
     println!("src: {:?}, dst: {:?}", dst, de_dst);
 }
diff --git a/examples/echo/src/protos/hello_echo.rs b/examples/echo/src/protos/hello_echo.rs
index bb2389f..71a3056 100644
--- a/examples/echo/src/protos/hello_echo.rs
+++ b/examples/echo/src/protos/hello_echo.rs
@@ -18,13 +18,13 @@
 /// EchoRequest is the request for echo.
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct EchoRequest {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub message: ::prost::alloc::string::String,
 }
 /// EchoResponse is the response for echo.
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct EchoResponse {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub message: ::prost::alloc::string::String,
 }
 /// Generated client implementations.
@@ -67,17 +67,17 @@
             let codec =
                 dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
             let path = http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho");
-            self.inner.unary(request, codec, path, RpcInvocation::default()).await
+            self.inner
+                .unary(request, codec, path, Arc::new(RpcInvocation::default()))
+                .await
         }
         /// ServerStreamingEcho is server side streaming.
         pub async fn server_streaming_echo(
             &mut self,
             request: Request<super::EchoRequest>,
         ) -> Result<Response<Decoding<super::EchoResponse>>, dubbo::status::Status> {
-            let codec = dubbo::codegen::ProstCodec::<
-                super::EchoRequest,
-                super::EchoResponse,
-            >::default();
+            let codec =
+                dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
             let path = http::uri::PathAndQuery::from_static(
                 "/grpc.examples.echo.Echo/ServerStreamingEcho",
             );
@@ -88,10 +88,8 @@
             &mut self,
             request: impl IntoStreamingRequest<Message = super::EchoRequest>,
         ) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
-            let codec = dubbo::codegen::ProstCodec::<
-                super::EchoRequest,
-                super::EchoResponse,
-            >::default();
+            let codec =
+                dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
             let path = http::uri::PathAndQuery::from_static(
                 "/grpc.examples.echo.Echo/ClientStreamingEcho",
             );
@@ -102,10 +100,8 @@
             &mut self,
             request: impl IntoStreamingRequest<Message = super::EchoRequest>,
         ) -> Result<Response<Decoding<super::EchoResponse>>, dubbo::status::Status> {
-            let codec = dubbo::codegen::ProstCodec::<
-                super::EchoRequest,
-                super::EchoResponse,
-            >::default();
+            let codec =
+                dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
             let path = http::uri::PathAndQuery::from_static(
                 "/grpc.examples.echo.Echo/BidirectionalStreamingEcho",
             );
@@ -126,9 +122,7 @@
             request: Request<super::EchoRequest>,
         ) -> Result<Response<super::EchoResponse>, dubbo::status::Status>;
         ///Server streaming response type for the ServerStreamingEcho method.
-        type ServerStreamingEchoStream: futures_util::Stream<
-                Item = Result<super::EchoResponse, dubbo::status::Status>,
-            >
+        type ServerStreamingEchoStream: futures_util::Stream<Item = Result<super::EchoResponse, dubbo::status::Status>>
             + Send
             + 'static;
         /// ServerStreamingEcho is server side streaming.
@@ -142,19 +136,14 @@
             request: Request<Decoding<super::EchoRequest>>,
         ) -> Result<Response<super::EchoResponse>, dubbo::status::Status>;
         ///Server streaming response type for the BidirectionalStreamingEcho method.
-        type BidirectionalStreamingEchoStream: futures_util::Stream<
-                Item = Result<super::EchoResponse, dubbo::status::Status>,
-            >
+        type BidirectionalStreamingEchoStream: futures_util::Stream<Item = Result<super::EchoResponse, dubbo::status::Status>>
             + Send
             + 'static;
         /// BidirectionalStreamingEcho is bidi streaming.
         async fn bidirectional_streaming_echo(
             &self,
             request: Request<Decoding<super::EchoRequest>>,
-        ) -> Result<
-            Response<Self::BidirectionalStreamingEchoStream>,
-            dubbo::status::Status,
-        >;
+        ) -> Result<Response<Self::BidirectionalStreamingEchoStream>, dubbo::status::Status>;
     }
     /// Echo is the echo service.
     #[derive(Debug)]
@@ -184,10 +173,7 @@
         type Response = http::Response<BoxBody>;
         type Error = std::convert::Infallible;
         type Future = BoxFuture<Self::Response, Self::Error>;
-        fn poll_ready(
-            &mut self,
-            _cx: &mut Context<'_>,
-        ) -> Poll<Result<(), Self::Error>> {
+        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
             Poll::Ready(Ok(()))
         }
         fn call(&mut self, req: http::Request<B>) -> Self::Future {
@@ -200,26 +186,18 @@
                     }
                     impl<T: Echo> UnarySvc<super::EchoRequest> for UnaryEchoServer<T> {
                         type Response = super::EchoResponse;
-                        type Future = BoxFuture<
-                            Response<Self::Response>,
-                            dubbo::status::Status,
-                        >;
-                        fn call(
-                            &mut self,
-                            request: Request<super::EchoRequest>,
-                        ) -> Self::Future {
+                        type Future = BoxFuture<Response<Self::Response>, dubbo::status::Status>;
+                        fn call(&mut self, request: Request<super::EchoRequest>) -> Self::Future {
                             let inner = self.inner.0.clone();
                             let fut = async move { inner.unary_echo(request).await };
                             Box::pin(fut)
                         }
                     }
                     let fut = async move {
-                        let mut server = TripleServer::new(
-                            dubbo::codegen::ProstCodec::<
-                                super::EchoResponse,
-                                super::EchoRequest,
-                            >::default(),
-                        );
+                        let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
+                            super::EchoResponse,
+                            super::EchoRequest,
+                        >::default());
                         let res = server.unary(UnaryEchoServer { inner }, req).await;
                         Ok(res)
                     };
@@ -230,32 +208,22 @@
                     struct ServerStreamingEchoServer<T: Echo> {
                         inner: _Inner<T>,
                     }
-                    impl<T: Echo> ServerStreamingSvc<super::EchoRequest>
-                    for ServerStreamingEchoServer<T> {
+                    impl<T: Echo> ServerStreamingSvc<super::EchoRequest> for ServerStreamingEchoServer<T> {
                         type Response = super::EchoResponse;
                         type ResponseStream = T::ServerStreamingEchoStream;
-                        type Future = BoxFuture<
-                            Response<Self::ResponseStream>,
-                            dubbo::status::Status,
-                        >;
-                        fn call(
-                            &mut self,
-                            request: Request<super::EchoRequest>,
-                        ) -> Self::Future {
+                        type Future =
+                            BoxFuture<Response<Self::ResponseStream>, dubbo::status::Status>;
+                        fn call(&mut self, request: Request<super::EchoRequest>) -> Self::Future {
                             let inner = self.inner.0.clone();
-                            let fut = async move {
-                                inner.server_streaming_echo(request).await
-                            };
+                            let fut = async move { inner.server_streaming_echo(request).await };
                             Box::pin(fut)
                         }
                     }
                     let fut = async move {
-                        let mut server = TripleServer::new(
-                            dubbo::codegen::ProstCodec::<
-                                super::EchoResponse,
-                                super::EchoRequest,
-                            >::default(),
-                        );
+                        let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
+                            super::EchoResponse,
+                            super::EchoRequest,
+                        >::default());
                         let res = server
                             .server_streaming(ServerStreamingEchoServer { inner }, req)
                             .await;
@@ -268,31 +236,23 @@
                     struct ClientStreamingEchoServer<T: Echo> {
                         inner: _Inner<T>,
                     }
-                    impl<T: Echo> ClientStreamingSvc<super::EchoRequest>
-                    for ClientStreamingEchoServer<T> {
+                    impl<T: Echo> ClientStreamingSvc<super::EchoRequest> for ClientStreamingEchoServer<T> {
                         type Response = super::EchoResponse;
-                        type Future = BoxFuture<
-                            Response<Self::Response>,
-                            dubbo::status::Status,
-                        >;
+                        type Future = BoxFuture<Response<Self::Response>, dubbo::status::Status>;
                         fn call(
                             &mut self,
                             request: Request<Decoding<super::EchoRequest>>,
                         ) -> Self::Future {
                             let inner = self.inner.0.clone();
-                            let fut = async move {
-                                inner.client_streaming_echo(request).await
-                            };
+                            let fut = async move { inner.client_streaming_echo(request).await };
                             Box::pin(fut)
                         }
                     }
                     let fut = async move {
-                        let mut server = TripleServer::new(
-                            dubbo::codegen::ProstCodec::<
-                                super::EchoResponse,
-                                super::EchoRequest,
-                            >::default(),
-                        );
+                        let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
+                            super::EchoResponse,
+                            super::EchoRequest,
+                        >::default());
                         let res = server
                             .client_streaming(ClientStreamingEchoServer { inner }, req)
                             .await;
@@ -305,56 +265,41 @@
                     struct BidirectionalStreamingEchoServer<T: Echo> {
                         inner: _Inner<T>,
                     }
-                    impl<T: Echo> StreamingSvc<super::EchoRequest>
-                    for BidirectionalStreamingEchoServer<T> {
+                    impl<T: Echo> StreamingSvc<super::EchoRequest> for BidirectionalStreamingEchoServer<T> {
                         type Response = super::EchoResponse;
                         type ResponseStream = T::BidirectionalStreamingEchoStream;
-                        type Future = BoxFuture<
-                            Response<Self::ResponseStream>,
-                            dubbo::status::Status,
-                        >;
+                        type Future =
+                            BoxFuture<Response<Self::ResponseStream>, dubbo::status::Status>;
                         fn call(
                             &mut self,
                             request: Request<Decoding<super::EchoRequest>>,
                         ) -> Self::Future {
                             let inner = self.inner.0.clone();
-                            let fut = async move {
-                                inner.bidirectional_streaming_echo(request).await
-                            };
+                            let fut =
+                                async move { inner.bidirectional_streaming_echo(request).await };
                             Box::pin(fut)
                         }
                     }
                     let fut = async move {
-                        let mut server = TripleServer::new(
-                            dubbo::codegen::ProstCodec::<
-                                super::EchoResponse,
-                                super::EchoRequest,
-                            >::default(),
-                        );
+                        let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
+                            super::EchoResponse,
+                            super::EchoRequest,
+                        >::default());
                         let res = server
-                            .bidi_streaming(
-                                BidirectionalStreamingEchoServer {
-                                    inner,
-                                },
-                                req,
-                            )
+                            .bidi_streaming(BidirectionalStreamingEchoServer { inner }, req)
                             .await;
                         Ok(res)
                     };
                     Box::pin(fut)
                 }
-                _ => {
-                    Box::pin(async move {
-                        Ok(
-                            http::Response::builder()
-                                .status(200)
-                                .header("grpc-status", "12")
-                                .header("content-type", "application/grpc")
-                                .body(empty_body())
-                                .unwrap(),
-                        )
-                    })
-                }
+                _ => Box::pin(async move {
+                    Ok(http::Response::builder()
+                        .status(200)
+                        .header("grpc-status", "12")
+                        .header("content-type", "application/grpc")
+                        .body(empty_body())
+                        .unwrap())
+                }),
             }
         }
     }
diff --git a/examples/greeter/Cargo.toml b/examples/greeter/Cargo.toml
index c4b20cc..c34b19f 100644
--- a/examples/greeter/Cargo.toml
+++ b/examples/greeter/Cargo.toml
@@ -24,7 +24,6 @@
 tokio-stream = "0.1"
 tracing = "0.1"
 tracing-subscriber = "0.2.0"
-
 dubbo = {path = "../../dubbo", version = "0.2.0"}
 dubbo-config = {path = "../../config", version = "0.2.0"}
 dubbo-registry-zookeeper = {path = "../../registry-zookeeper", version = "0.2.0"}
diff --git a/examples/greeter/dubbo.yaml b/examples/greeter/dubbo.yaml
index 1e4ade8..b9bd353 100644
--- a/examples/greeter/dubbo.yaml
+++ b/examples/greeter/dubbo.yaml
@@ -1,18 +1,17 @@
-name: dubbo
-service:
-  org.apache.dubbo.sample.tri.Greeter:
-    version: 1.0.0
-    group: test
-    protocol: triple
-    registry: ''
-    serializer: json
-    protocol_configs:
-      triple:
-        ip: 0.0.0.0
-        port: '8888'
-        name: triple
-protocols:
-  triple:
-    ip: 0.0.0.0
-    port: '8888'
-    name: triple
+dubbo:
+  protocols:
+    triple:
+      ip: 0.0.0.0
+      port: '8888'
+      name: tri
+  registries:
+    demoZK:
+      protocol: zookeeper
+      address: 127.0.0.1:2181
+  provider:
+    services:
+      GreeterProvider:
+        version: 1.0.0
+        group: test
+        protocol: triple
+        interface: org.apache.dubbo.sample.tri.Greeter
\ No newline at end of file
diff --git a/examples/greeter/src/greeter/client.rs b/examples/greeter/src/greeter/client.rs
index 93d509a..c673872 100644
--- a/examples/greeter/src/greeter/client.rs
+++ b/examples/greeter/src/greeter/client.rs
@@ -15,19 +15,20 @@
  * limitations under the License.
  */
 
+use std::time::Duration;
+
+use tracing::Level;
+use tracing_subscriber::FmtSubscriber;
+
+use dubbo::cluster::support::cluster_invoker::ClusterInvoker;
+use dubbo::{cluster::directory::RegistryDirectory, codegen::*};
+use dubbo_registry_zookeeper::zookeeper_registry::ZookeeperRegistry;
+use protos::{greeter_client::GreeterClient, GreeterRequest};
+
 pub mod protos {
     #![allow(non_camel_case_types)]
     include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs"));
 }
-use std::{str::FromStr, time::Duration, env};
-
-use dubbo_registry_zookeeper::zookeeper_registry::ZookeeperRegistry;
-
-use dubbo::{cluster::directory::RegistryDirectory, codegen::*, invocation::RpcInvocation};
-use http;
-use protos::{greeter_client::GreeterClient, GreeterRequest};
-use tracing::Level;
-use tracing_subscriber::FmtSubscriber;
 
 #[tokio::main]
 async fn main() {
@@ -41,19 +42,13 @@
 
     tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
 
-    let zk_connect_string = match env::var("ZOOKEEPER_SERVERS") {
-        Ok(val) => val,
-        Err(_) => "localhost:2181".to_string(),
-    };
-    let zkr = ZookeeperRegistry::new(&zk_connect_string);
+    let zkr = ZookeeperRegistry::default();
     let directory = RegistryDirectory::new(Box::new(zkr));
+    let cluster_invoker = ClusterInvoker::with_directory(directory);
 
-    let http_uri = http::Uri::from_str(&"http://1.1.1.1:8888").unwrap();
-
-    let mut cli = GreeterClient::new(Connection::new().with_host(http_uri));
-    cli = cli.with_directory(Box::new(directory));
-    //let mut cli = GreeterClient::connect("http://127.0.0.1:8888".to_string());
-
+    let mut cli = GreeterClient::new(Connection::new());
+    cli = cli.with_cluster(cluster_invoker);
+    // using loop for loadbalance test
     println!("# unary call");
     let resp = cli
         .greet(Request::new(GreeterRequest {
@@ -67,7 +62,5 @@
     let (_parts, body) = resp.into_parts();
     println!("Response: {:?}", body);
 
-    loop {
-        tokio::time::sleep(Duration::from_millis(100)).await;
-    }
+    tokio::time::sleep(Duration::from_millis(2000)).await;
 }
diff --git a/examples/greeter/src/greeter/server.rs b/examples/greeter/src/greeter/server.rs
index 37de662..ab797de 100644
--- a/examples/greeter/src/greeter/server.rs
+++ b/examples/greeter/src/greeter/server.rs
@@ -15,47 +15,49 @@
  * limitations under the License.
  */
 
-pub mod protos {
-    #![allow(non_camel_case_types)]
-    include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs"));
-}
+use std::{io::ErrorKind, pin::Pin};
 
+use async_trait::async_trait;
+use futures_util::Stream;
 use futures_util::StreamExt;
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
+use tracing::info;
+
+use dubbo::{codegen::*, Dubbo};
+use dubbo_config::RootConfig;
+use dubbo_registry_zookeeper::zookeeper_registry::ZookeeperRegistry;
 use protos::{
     greeter_server::{register_server, Greeter},
     GreeterReply, GreeterRequest,
 };
 
-use std::{io::ErrorKind, pin::Pin};
-
-use async_trait::async_trait;
-use futures_util::Stream;
-use tokio::sync::mpsc;
-use tokio_stream::wrappers::ReceiverStream;
-
-use dubbo::{codegen::*, Dubbo};
-use dubbo_config::RootConfig;
+pub mod protos {
+    #![allow(non_camel_case_types)]
+    include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs"));
+}
 
 type ResponseStream =
     Pin<Box<dyn Stream<Item = Result<GreeterReply, dubbo::status::Status>> + Send>>;
 
 #[tokio::main]
 async fn main() {
+    use tracing::{span, Level};
+    let span = span!(Level::DEBUG, "greeter.server");
+    let _enter = span.enter();
     register_server(GreeterServerImpl {
         name: "greeter".to_string(),
     });
-
-    // Dubbo::new().start().await;
-    Dubbo::new()
-        .with_config({
-            let r = RootConfig::new();
-            match r.load() {
-                Ok(config) => config,
-                Err(_err) => panic!("err: {:?}", _err), // response was droped
-            }
-        })
-        .start()
-        .await;
+    let zkr = ZookeeperRegistry::default();
+    let r = RootConfig::new();
+    let r = match r.load() {
+        Ok(config) => config,
+        Err(_err) => panic!("err: {:?}", _err), // response was dropped
+    };
+    let mut f = Dubbo::new()
+        .with_config(r)
+        .add_registry("default", Box::new(zkr));
+    f.start().await;
 }
 
 #[allow(dead_code)]
@@ -71,7 +73,7 @@
         &self,
         request: Request<GreeterRequest>,
     ) -> Result<Response<GreeterReply>, dubbo::status::Status> {
-        println!("GreeterServer::greet {:?}", request.metadata);
+        info!("GreeterServer::greet {:?}", request.metadata);
 
         Ok(Response::new(GreeterReply {
             message: "hello, dubbo-rust".to_string(),
diff --git a/registry-zookeeper/Cargo.toml b/registry-zookeeper/Cargo.toml
index cf040fa..722462a 100644
--- a/registry-zookeeper/Cargo.toml
+++ b/registry-zookeeper/Cargo.toml
@@ -7,8 +7,9 @@
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-zookeeper = "0.6.1"
+zookeeper = "0.7.0"
 dubbo = {path = "../dubbo/", version = "0.2.0"}
 serde_json = "1.0"
 serde = {version = "1.0.145",features = ["derive"]}
 tracing = "0.1"
+urlencoding = "2.1.2"
\ No newline at end of file
diff --git a/registry-zookeeper/src/lib.rs b/registry-zookeeper/src/lib.rs
index 7481129..ccfce10 100644
--- a/registry-zookeeper/src/lib.rs
+++ b/registry-zookeeper/src/lib.rs
@@ -15,7 +15,6 @@
  * limitations under the License.
  */
 
-
 pub mod zookeeper_registry;
 
 #[cfg(test)]
diff --git a/registry-zookeeper/src/zookeeper_registry.rs b/registry-zookeeper/src/zookeeper_registry.rs
index 6c2ef91..ad96638 100644
--- a/registry-zookeeper/src/zookeeper_registry.rs
+++ b/registry-zookeeper/src/zookeeper_registry.rs
@@ -17,26 +17,32 @@
 
 #![allow(unused_variables, dead_code, missing_docs)]
 
+use std::collections::HashMap;
+use std::collections::HashSet;
+use std::env;
+use std::sync::RwLock;
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+
+use serde::{Deserialize, Serialize};
+use tracing::{error, info};
+#[allow(unused_imports)]
+use zookeeper::{Acl, CreateMode, WatchedEvent, WatchedEventType, Watcher, ZooKeeper};
+
+use dubbo::cluster::support::cluster_invoker::ClusterInvoker;
+use dubbo::codegen::BoxRegistry;
+use dubbo::common::consts::{DUBBO_KEY, LOCALHOST_IP, PROVIDERS_KEY};
 use dubbo::common::url::Url;
-use dubbo::registry::memory_registry::MemoryNotifyListener;
+use dubbo::registry::integration::ClusterRegistryIntegration;
+use dubbo::registry::memory_registry::{MemoryNotifyListener, MemoryRegistry};
 use dubbo::registry::NotifyListener;
 use dubbo::registry::Registry;
 use dubbo::registry::ServiceEvent;
 use dubbo::StdError;
-use serde::{Deserialize, Serialize};
-use tracing::info;
-use zookeeper::Acl;
-use zookeeper::CreateMode;
-use std::collections::HashMap;
-use std::collections::HashSet;
-use std::sync::Arc;
-use std::sync::RwLock;
-use std::time::Duration;
-use zookeeper::{WatchedEvent, WatchedEventType, Watcher, ZooKeeper};
 
 // 从url中获取服务注册的元数据
-/// rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s)
-/// dubboPath = fmt.Sprintf("/%s/%s/%s", r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), common.DubboNodes[common.PROVIDER])
+// rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s)
+// dubboPath = fmt.Sprintf("/%s/%s/%s", r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), common.DubboNodes[common.PROVIDER])
 
 pub const REGISTRY_GROUP_KEY: &str = "registry.group";
 
@@ -51,20 +57,8 @@
 pub struct ZookeeperRegistry {
     root_path: String,
     zk_client: Arc<ZooKeeper>,
-
     listeners: RwLock<HashMap<String, Arc<<ZookeeperRegistry as Registry>::NotifyListener>>>,
-}
-
-pub struct MyNotifyListener {}
-
-impl NotifyListener for MyNotifyListener {
-    fn notify(&self, event: dubbo::registry::ServiceEvent) {
-        todo!()
-    }
-
-    fn notify_all(&self, event: dubbo::registry::ServiceEvent) {
-        todo!()
-    }
+    memory_registry: Arc<Mutex<MemoryRegistry>>,
 }
 
 #[derive(Serialize, Deserialize, Debug)]
@@ -92,11 +86,12 @@
     pub fn new(connect_string: &str) -> ZookeeperRegistry {
         let zk_client =
             ZooKeeper::connect(connect_string, Duration::from_secs(15), LoggingWatcher).unwrap();
+        info!("zk server connect string: {}", connect_string);
         ZookeeperRegistry {
             root_path: "/services".to_string(),
             zk_client: Arc::new(zk_client),
-
             listeners: RwLock::new(HashMap::new()),
+            memory_registry: Arc::new(Mutex::new(MemoryRegistry::default())),
         }
     }
 
@@ -108,15 +103,16 @@
     ) -> ServiceInstancesChangedListener {
         let mut service_names = HashSet::new();
         service_names.insert(service_name.clone());
-        return ServiceInstancesChangedListener {
+        ServiceInstancesChangedListener {
             zk_client: Arc::clone(&self.zk_client),
-            path: path,
-
+            path,
             service_name: service_name.clone(),
-            listener: listener,
-        };
+            listener,
+        }
     }
 
+    // The /dubbo/mapping metadata is designed for application discovery; deprecated because interface discovery is currently used.
+    // #[deprecated]
     fn get_app_name(&self, service_name: String) -> String {
         let res = self
             .zk_client
@@ -129,25 +125,154 @@
         };
         s.to_string()
     }
+
+    pub fn get_client(&self) -> Arc<ZooKeeper> {
+        self.zk_client.clone()
+    }
+
+    // If the parent node does not exist in ZooKeeper, Err(ZkError::NoNode) will be returned.
+    pub fn create_path(
+        &self,
+        path: &str,
+        data: &str,
+        create_mode: CreateMode,
+    ) -> Result<(), StdError> {
+        if self.exists_path(path) {
+            self.zk_client
+                .set_data(path, data.as_bytes().to_vec(), None)
+                .unwrap_or_else(|_| panic!("set data to {} failed.", path));
+            return Ok(());
+        }
+        let zk_result = self.zk_client.create(
+            path,
+            data.as_bytes().to_vec(),
+            Acl::open_unsafe().clone(),
+            create_mode,
+        );
+        match zk_result {
+            Ok(_) => Ok(()),
+            Err(err) => {
+                error!("zk path {} parent not exists.", path);
+                Err(Box::try_from(err).unwrap())
+            }
+        }
+    }
+
+    // Avoids Err(ZkError::NoNode) when a parent node doesn't exist by creating the missing parent nodes first.
+    pub fn create_path_with_parent_check(
+        &self,
+        path: &str,
+        data: &str,
+        create_mode: CreateMode,
+    ) -> Result<(), StdError> {
+        let nodes: Vec<&str> = path.split('/').collect();
+        let mut current: String = String::new();
+        let children = *nodes.last().unwrap();
+        for node_key in nodes {
+            if node_key.is_empty() {
+                continue;
+            };
+            current.push('/');
+            current.push_str(node_key);
+            if !self.exists_path(current.as_str()) {
+                let new_create_mode = match children == node_key {
+                    true => create_mode,
+                    false => CreateMode::Persistent,
+                };
+                let new_data = match children == node_key {
+                    true => data,
+                    false => "",
+                };
+                self.create_path(current.as_str(), new_data, new_create_mode)
+                    .unwrap();
+            }
+        }
+        Ok(())
+    }
+
+    pub fn delete_path(&self, path: &str) {
+        if self.exists_path(path) {
+            self.get_client().delete(path, None).unwrap()
+        }
+    }
+
+    pub fn exists_path(&self, path: &str) -> bool {
+        self.zk_client.exists(path, false).unwrap().is_some()
+    }
+
+    pub fn get_data(&self, path: &str, watch: bool) -> Option<String> {
+        if self.exists_path(path) {
+            let zk_result = self.zk_client.get_data(path, watch);
+            if let Ok(..) = zk_result {
+                Some(String::from_utf8(zk_result.unwrap().0).unwrap())
+            } else {
+                None
+            }
+        } else {
+            None
+        }
+    }
+}
+
+impl Default for ZookeeperRegistry {
+    fn default() -> ZookeeperRegistry {
+        let zk_connect_string = match env::var("ZOOKEEPER_SERVERS") {
+            Ok(val) => val,
+            Err(_) => {
+                let default_connect_string = "localhost:2181";
+                info!(
+                    "No ZOOKEEPER_SERVERS env value, using {} as default.",
+                    default_connect_string
+                );
+                default_connect_string.to_string()
+            }
+        };
+        info!(
+            "using external registry with it's connect string {}",
+            zk_connect_string.as_str()
+        );
+        ZookeeperRegistry::new(zk_connect_string.as_str())
+    }
 }
 
 impl Registry for ZookeeperRegistry {
     type NotifyListener = MemoryNotifyListener;
 
     fn register(&mut self, url: Url) -> Result<(), StdError> {
-        todo!();
+        let zk_path = format!(
+            "/{}/{}/{}/{}",
+            DUBBO_KEY,
+            url.service_name,
+            PROVIDERS_KEY,
+            url.encoded_raw_url_string()
+        );
+        self.create_path_with_parent_check(zk_path.as_str(), LOCALHOST_IP, CreateMode::Ephemeral)?;
+        Ok(())
     }
 
     fn unregister(&mut self, url: Url) -> Result<(), StdError> {
-        todo!();
+        let zk_path = format!(
+            "/{}/{}/{}/{}",
+            DUBBO_KEY,
+            url.service_name,
+            PROVIDERS_KEY,
+            url.encoded_raw_url_string()
+        );
+        self.delete_path(zk_path.as_str());
+        Ok(())
     }
 
+    // Lets a consumer discover the providers of a service and watch for changes to them.
     fn subscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> {
-        let binding = url.get_service_name();
-        let service_name = binding.get(0).unwrap();
-        let app_name = self.get_app_name(service_name.clone());
-        let path = self.root_path.clone() + "/" + &app_name;
-        if self.listeners.read().unwrap().get(service_name).is_some() {
+        let service_name = url.get_service_name();
+        let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &service_name, PROVIDERS_KEY);
+        if self
+            .listeners
+            .read()
+            .unwrap()
+            .get(service_name.as_str())
+            .is_some()
+        {
             return Ok(());
         }
 
@@ -158,37 +283,32 @@
             .insert(service_name.to_string(), Arc::clone(&arc_listener));
 
         let zk_listener = self.create_listener(
-            path.clone(),
+            zk_path.clone(),
             service_name.to_string(),
             Arc::clone(&arc_listener),
         );
 
-        let res = self.zk_client.get_children_w(&path, zk_listener);
-        let result: Vec<Url> = res
-            .unwrap()
-            .iter()
-            .map(|node_key| {
-                let zk_res = self.zk_client.get_data(
-                    &(self.root_path.clone() + "/" + &app_name + "/" + &node_key),
-                    false,
-                );
-                let vec_u8 = zk_res.unwrap().0;
-                let sstr = std::str::from_utf8(&vec_u8).unwrap();
-                let instance: ZkServiceInstance = serde_json::from_str(sstr).unwrap();
-                let url = Url::from_url(&format!(
-                    "triple://{}:{}/{}",
-                    instance.get_host(),
-                    instance.get_port(),
-                    service_name
-                ))
-                .unwrap();
-                url
-            })
-            .collect();
-
-        info!("notifing {}->{:?}", service_name, result);
+        let zk_changed_paths = self.zk_client.get_children_w(&zk_path, zk_listener);
+        let result = match zk_changed_paths {
+            Err(err) => {
+                error!("zk subscribe error: {}", err);
+                Vec::new()
+            }
+            Ok(urls) => urls
+                .iter()
+                .map(|node_key| {
+                    let provider_url: Url = urlencoding::decode(node_key)
+                        .unwrap()
+                        .to_string()
+                        .as_str()
+                        .into();
+                    provider_url
+                })
+                .collect(),
+        };
+        info!("notifying {}->{:?}", service_name, result);
         arc_listener.notify(ServiceEvent {
-            key: service_name.to_string(),
+            key: service_name,
             action: String::from("ADD"),
             service: result,
         });
@@ -203,7 +323,6 @@
 pub struct ServiceInstancesChangedListener {
     zk_client: Arc<ZooKeeper>,
     path: String,
-
     service_name: String,
     listener: Arc<MemoryNotifyListener>,
 }
@@ -215,40 +334,26 @@
             let event_path = path.clone();
             let dirs = self
                 .zk_client
-                .get_children(&event_path.clone(), false)
+                .get_children(&event_path, false)
                 .expect("msg");
             let result: Vec<Url> = dirs
                 .iter()
                 .map(|node_key| {
-                    let zk_res = self
-                        .zk_client
-                        .get_data(&(event_path.clone() + "/" + node_key), false);
-                    let vec_u8 = zk_res.unwrap().0;
-                    let sstr = std::str::from_utf8(&vec_u8).unwrap();
-                    let instance: ZkServiceInstance = serde_json::from_str(sstr).unwrap();
-                    let url = Url::from_url(&format!(
-                        "triple://{}:{}/{}",
-                        instance.get_host(),
-                        instance.get_port(),
-                        self.service_name
-                    ))
-                    .unwrap();
-                    url
+                    let provider_url: Url = node_key.as_str().into();
+                    provider_url
                 })
                 .collect();
-
             let res = self.zk_client.get_children_w(
                 &path,
                 ServiceInstancesChangedListener {
                     zk_client: Arc::clone(&self.zk_client),
                     path: path.clone(),
-
                     service_name: self.service_name.clone(),
                     listener: Arc::clone(&self.listener),
                 },
             );
 
-            info!("notifing {}->{:?}", self.service_name, result);
+            info!("notify {}->{:?}", self.service_name, result);
             self.listener.notify(ServiceEvent {
                 key: self.service_name.clone(),
                 action: String::from("ADD"),
@@ -260,48 +365,102 @@
 
 impl NotifyListener for ServiceInstancesChangedListener {
     fn notify(&self, event: ServiceEvent) {
-        todo!()
+        self.listener.notify(event);
     }
 
     fn notify_all(&self, event: ServiceEvent) {
+        self.listener.notify(event);
+    }
+}
+
+impl ClusterRegistryIntegration for ZookeeperRegistry {
+    fn get_invoker(registry: BoxRegistry) -> Option<Arc<ClusterInvoker>> {
         todo!()
     }
 }
 
-#[test]
-fn it_works() {
-    let connect_string = &"mse-21b397d4-p.zk.mse.aliyuncs.com:2181";
-    let zk_client =
-        ZooKeeper::connect(connect_string, Duration::from_secs(15), LoggingWatcher).unwrap();
-    let watcher = Arc::new(Some(TestZkWatcher {
-        watcher: Arc::new(None),
-    }));
-    watcher.as_ref().expect("").watcher = Arc::clone(&watcher);
-    let x = watcher.as_ref().expect("");
-    zk_client.get_children_w("/test", x);
-    zk_client.delete("/test/a", None);
-    zk_client.delete("/test/b", None);
-    let zk_res = zk_client.create(
-        "/test/a",
-        vec![1, 3],
-        Acl::open_unsafe().clone(),
-        CreateMode::Ephemeral,
-    );
-    let zk_res = zk_client.create(
-        "/test/b",
-        vec![1, 3],
-        Acl::open_unsafe().clone(),
-        CreateMode::Ephemeral,
-    );
-    zk_client.close();
-}
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
 
-struct TestZkWatcher {
-    pub watcher: Arc<Option<TestZkWatcher>>,
-}
+    use zookeeper::{Acl, CreateMode, WatchedEvent, Watcher};
 
-impl Watcher for TestZkWatcher {
-    fn handle(&self, event: WatchedEvent) {
-        println!("event: {:?}", event);
+    use crate::zookeeper_registry::ZookeeperRegistry;
+
+    struct TestZkWatcher {
+        pub watcher: Arc<Option<TestZkWatcher>>,
+    }
+
+    impl Watcher for TestZkWatcher {
+        fn handle(&self, event: WatchedEvent) {
+            println!("event: {:?}", event);
+        }
+    }
+
+    #[test]
+    fn zk_read_write_watcher() {
+        // https://github.com/bonifaido/rust-zookeeper/blob/master/examples/zookeeper_example.rs
+        // the ZooKeeper server addresses are taken from the ZOOKEEPER_SERVERS environment variable
+        let zkr = ZookeeperRegistry::default();
+        let zk_client = zkr.get_client();
+        let watcher = TestZkWatcher {
+            watcher: Arc::new(None),
+        };
+        if zk_client.exists("/test", true).is_err() {
+            zk_client
+                .create(
+                    "/test",
+                    vec![1, 3],
+                    Acl::open_unsafe().clone(),
+                    CreateMode::Ephemeral,
+                )
+                .unwrap();
+        }
+        let zk_res = zk_client.create(
+            "/test",
+            "hello".into(),
+            Acl::open_unsafe().clone(),
+            CreateMode::Ephemeral,
+        );
+        let result = zk_client.get_children_w("/test", watcher);
+        assert!(result.is_ok());
+        if zk_client.exists("/test/a", true).is_err() {
+            zk_client.delete("/test/a", None).unwrap();
+        }
+        if zk_client.exists("/test/a", true).is_err() {
+            zk_client.delete("/test/b", None).unwrap();
+        }
+        let zk_res = zk_client.create(
+            "/test/a",
+            "hello".into(),
+            Acl::open_unsafe().clone(),
+            CreateMode::Ephemeral,
+        );
+        let zk_res = zk_client.create(
+            "/test/b",
+            "world".into(),
+            Acl::open_unsafe().clone(),
+            CreateMode::Ephemeral,
+        );
+        let test_a_result = zk_client.get_data("/test", true);
+        assert!(test_a_result.is_ok());
+        let vec1 = test_a_result.unwrap().0;
+        // data in /test should equal "hello"
+        assert_eq!(String::from_utf8(vec1).unwrap(), "hello");
+        zk_client.close().unwrap()
+    }
+
+    #[test]
+    fn create_path_with_parent_check() {
+        let zkr = ZookeeperRegistry::default();
+        let path = "/du1bbo/test11111";
+        let data = "hello";
+        // creating a child under a non-existent parent returns a NoNode error.
+        // let result = zkr.create_path(path, data, CreateMode::Ephemeral);
+        // assert!(result.is_err());
+        let create_with_parent_check_result =
+            zkr.create_path_with_parent_check(path, data, CreateMode::Ephemeral);
+        assert!(create_with_parent_check_result.is_ok());
+        assert_eq!(data, zkr.get_data(path, false).unwrap());
     }
 }
diff --git a/scripts/ci-check.sh b/scripts/ci-check.sh
new file mode 100755
index 0000000..50d25f1
--- /dev/null
+++ b/scripts/ci-check.sh
@@ -0,0 +1,6 @@
+#!/bin/bash
+
+cargo fmt --all -- --check
+# use stable channel
+cargo check
+target/debug/greeter-server && target/debug/greeter-client && sleep 3s ;