feat: replace kube-apiserver watch with informer (#10543)

fixes #10535
diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml
index 375e0a5..4f38e14 100644
--- a/dubbo-dependencies-bom/pom.xml
+++ b/dubbo-dependencies-bom/pom.xml
@@ -159,7 +159,7 @@
         <eureka.version>1.9.12</eureka.version>
 
         <!-- Fabric8 for Kubernetes -->
-        <fabric8_kubernetes_version>5.3.2</fabric8_kubernetes_version>
+        <fabric8_kubernetes_version>6.1.1</fabric8_kubernetes_version>
 
         <!-- Alibaba -->
         <alibaba_spring_context_support_version>1.0.8</alibaba_spring_context_support_version>
diff --git a/dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesMeshEnvListener.java b/dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesMeshEnvListener.java
index 1a0c1fa..300dae3 100644
--- a/dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesMeshEnvListener.java
+++ b/dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesMeshEnvListener.java
@@ -21,8 +21,7 @@
 import org.apache.dubbo.rpc.cluster.router.mesh.route.MeshAppRuleListener;
 import org.apache.dubbo.rpc.cluster.router.mesh.route.MeshEnvListener;
 
-import com.google.gson.Gson;
-import io.fabric8.kubernetes.api.model.ListOptionsBuilder;
+import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.Watch;
 import io.fabric8.kubernetes.client.Watcher;
@@ -30,7 +29,6 @@
 import org.yaml.snakeyaml.Yaml;
 import org.yaml.snakeyaml.constructor.SafeConstructor;
 
-import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -81,24 +79,27 @@
 
         try {
             Watch watch = kubernetesClient
-                .customResource(
-                    MeshConstant.getVsDefinition())
-                .watch(namespace, appName, null, new ListOptionsBuilder().build(), new Watcher<String>() {
-                    @Override
-                    public void eventReceived(Action action, String resource) {
-                        logger.info("Received VS Rule notification. AppName: " + appName + " Action:" + action + " Resource:" + resource);
-
-                        if (action == Action.ADDED || action == Action.MODIFIED) {
-                            Map drRuleMap = new Gson().fromJson(resource, Map.class);
-                            String vsRule = new Yaml(new SafeConstructor()).dump(drRuleMap);
-                            vsAppCache.put(appName, vsRule);
-                            if (drAppCache.containsKey(appName)) {
-                                notifyListener(vsRule, appName, drAppCache.get(appName));
+                    .genericKubernetesResources(
+                            MeshConstant.getVsDefinition())
+                    .inNamespace(namespace)
+                    .withName(appName)
+                    .watch(new Watcher<GenericKubernetesResource>() {
+                        @Override
+                        public void eventReceived(Action action, GenericKubernetesResource resource) {
+                            if (logger.isInfoEnabled()) {
+                                logger.info("Received VS Rule notification. AppName: " + appName + " Action:" + action + " Resource:" + resource);
                             }
-                        } else {
-                            appRuleListenerMap.get(appName).receiveConfigInfo("");
+
+                            if (action == Action.ADDED || action == Action.MODIFIED) {
+                                String vsRule = new Yaml(new SafeConstructor()).dump(resource);
+                                vsAppCache.put(appName, vsRule);
+                                if (drAppCache.containsKey(appName)) {
+                                    notifyListener(vsRule, appName, drAppCache.get(appName));
+                                }
+                            } else {
+                                appRuleListenerMap.get(appName).receiveConfigInfo("");
+                            }
                         }
-                    }
 
                     @Override
                     public void onClose(WatcherException cause) {
@@ -107,15 +108,17 @@
                 });
             vsAppWatch.put(appName, watch);
             try {
-                Map<String, Object> vsRule = kubernetesClient
-                    .customResource(
-                        MeshConstant.getVsDefinition())
-                    .get(namespace, appName);
+                GenericKubernetesResource vsRule = kubernetesClient
+                        .genericKubernetesResources(
+                                MeshConstant.getVsDefinition())
+                        .inNamespace(namespace)
+                        .withName(appName)
+                        .get();
                 vsAppCache.put(appName, new Yaml(new SafeConstructor()).dump(vsRule));
             } catch (Throwable ignore) {
 
             }
-        } catch (IOException e) {
+        } catch (Exception e) {
             logger.error("Error occurred when listen kubernetes crd.", e);
         }
     }
@@ -134,42 +137,47 @@
 
         try {
             Watch watch = kubernetesClient
-                .customResource(
-                    MeshConstant.getDrDefinition())
-                .watch(namespace, appName, null, new ListOptionsBuilder().build(), new Watcher<String>() {
-                    @Override
-                    public void eventReceived(Action action, String resource) {
-                        logger.info("Received VS Rule notification. AppName: " + appName + " Action:" + action + " Resource:" + resource);
-
-                        if (action == Action.ADDED || action == Action.MODIFIED) {
-                            Map drRuleMap = new Gson().fromJson(resource, Map.class);
-                            String drRule = new Yaml(new SafeConstructor()).dump(drRuleMap);
-
-                            drAppCache.put(appName, drRule);
-                            if (vsAppCache.containsKey(appName)) {
-                                notifyListener(vsAppCache.get(appName), appName, drRule);
+                    .genericKubernetesResources(
+                            MeshConstant.getDrDefinition())
+                    .inNamespace(namespace)
+                    .withName(appName)
+                    .watch(new Watcher<GenericKubernetesResource>() {
+                        @Override
+                        public void eventReceived(Action action, GenericKubernetesResource resource) {
+                            if (logger.isInfoEnabled()) {
+                                logger.info("Received VS Rule notification. AppName: " + appName + " Action:" + action + " Resource:" + resource);
                             }
-                        } else {
-                            appRuleListenerMap.get(appName).receiveConfigInfo("");
-                        }
-                    }
 
-                    @Override
-                    public void onClose(WatcherException cause) {
-                        // ignore
-                    }
-                });
+                            if (action == Action.ADDED || action == Action.MODIFIED) {
+                                String drRule = new Yaml(new SafeConstructor()).dump(resource);
+
+                                drAppCache.put(appName, drRule);
+                                if (vsAppCache.containsKey(appName)) {
+                                    notifyListener(vsAppCache.get(appName), appName, drRule);
+                                }
+                            } else {
+                                appRuleListenerMap.get(appName).receiveConfigInfo("");
+                            }
+                        }
+
+                        @Override
+                        public void onClose(WatcherException cause) {
+                            // ignore
+                        }
+                    });
             drAppWatch.put(appName, watch);
             try {
-                Map<String, Object> drRule = kubernetesClient
-                    .customResource(
-                        MeshConstant.getDrDefinition())
-                    .get(namespace, appName);
+                GenericKubernetesResource drRule = kubernetesClient
+                        .genericKubernetesResources(
+                                MeshConstant.getDrDefinition())
+                        .inNamespace(namespace)
+                        .withName(appName)
+                        .get();
                 drAppCache.put(appName, new Yaml(new SafeConstructor()).dump(drRule));
             } catch (Throwable ignore) {
 
             }
-        } catch (IOException e) {
+        } catch (Exception e) {
             logger.error("Error occurred when listen kubernetes crd.", e);
         }
     }
diff --git a/dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscovery.java b/dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscovery.java
index e0f47f1..087de56 100644
--- a/dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscovery.java
+++ b/dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscovery.java
@@ -39,11 +39,10 @@
 import io.fabric8.kubernetes.api.model.PodBuilder;
 import io.fabric8.kubernetes.api.model.Service;
 import io.fabric8.kubernetes.client.Config;
-import io.fabric8.kubernetes.client.DefaultKubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.Watch;
-import io.fabric8.kubernetes.client.Watcher;
-import io.fabric8.kubernetes.client.WatcherException;
+import io.fabric8.kubernetes.client.KubernetesClientBuilder;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
 
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -69,18 +68,18 @@
 
     public final static String KUBERNETES_PROPERTIES_KEY = "io.dubbo/metadata";
 
-    private final static ConcurrentHashMap<String, Watch> SERVICE_WATCHER = new ConcurrentHashMap<>(64);
-
-    private final static ConcurrentHashMap<String, Watch> PODS_WATCHER = new ConcurrentHashMap<>(64);
-
-    private final static ConcurrentHashMap<String, Watch> ENDPOINTS_WATCHER = new ConcurrentHashMap<>(64);
-
     private final static ConcurrentHashMap<String, AtomicLong> SERVICE_UPDATE_TIME = new ConcurrentHashMap<>(64);
 
+    private final static ConcurrentHashMap<String, SharedIndexInformer<Service>> SERVICE_INFORMER = new ConcurrentHashMap<>(64);
+
+    private final static ConcurrentHashMap<String, SharedIndexInformer<Pod>> PODS_INFORMER = new ConcurrentHashMap<>(64);
+
+    private final static ConcurrentHashMap<String, SharedIndexInformer<Endpoints>> ENDPOINTS_INFORMER = new ConcurrentHashMap<>(64);
+
     public KubernetesServiceDiscovery(ApplicationModel applicationModel, URL registryURL) {
         super(applicationModel, registryURL);
         Config config = KubernetesConfigUtils.createKubernetesConfig(registryURL);
-        this.kubernetesClient = new DefaultKubernetesClient(config);
+        this.kubernetesClient = new KubernetesClientBuilder().withConfig(config).build();
         this.currentHostname = System.getenv("HOSTNAME");
         this.registryURL = registryURL;
         this.namespace = config.getNamespace();
@@ -94,9 +93,9 @@
         }
         if (!availableAccess) {
             String message = "Unable to access api server. " +
-                "Please check your url config." +
-                " Master URL: " + config.getMasterUrl() +
-                " Hostname: " + currentHostname;
+                    "Please check your url config." +
+                    " Master URL: " + config.getMasterUrl() +
+                    " Hostname: " + currentHostname;
             logger.error(message);
         } else {
             KubernetesMeshEnvListener.injectKubernetesEnv(kubernetesClient, namespace);
@@ -104,15 +103,15 @@
     }
 
     @Override
-    public void doDestroy() throws Exception {
-        SERVICE_WATCHER.forEach((k, v) -> v.close());
-        SERVICE_WATCHER.clear();
+    public void doDestroy() {
+        SERVICE_INFORMER.forEach((k, v) -> v.close());
+        SERVICE_INFORMER.clear();
 
-        PODS_WATCHER.forEach((k, v) -> v.close());
-        PODS_WATCHER.clear();
+        PODS_INFORMER.forEach((k, v) -> v.close());
+        PODS_INFORMER.clear();
 
-        ENDPOINTS_WATCHER.forEach((k, v) -> v.close());
-        ENDPOINTS_WATCHER.clear();
+        ENDPOINTS_INFORMER.forEach((k, v) -> v.close());
+        ENDPOINTS_INFORMER.clear();
 
         kubernetesClient.close();
     }
@@ -121,18 +120,18 @@
     public void doRegister(ServiceInstance serviceInstance) throws RuntimeException {
         if (enableRegister) {
             kubernetesClient
-                .pods()
-                .inNamespace(namespace)
-                .withName(currentHostname)
-                .edit(pod ->
-                    new PodBuilder(pod)
-                        .editOrNewMetadata()
-                        .addToAnnotations(KUBERNETES_PROPERTIES_KEY, JSONObject.toJSONString(serviceInstance.getMetadata()))
-                        .endMetadata()
-                        .build());
+                    .pods()
+                    .inNamespace(namespace)
+                    .withName(currentHostname)
+                    .edit(pod ->
+                            new PodBuilder(pod)
+                                    .editOrNewMetadata()
+                                    .addToAnnotations(KUBERNETES_PROPERTIES_KEY, JSONObject.toJSONString(serviceInstance.getMetadata()))
+                                    .endMetadata()
+                                    .build());
             if (logger.isInfoEnabled()) {
                 logger.info("Write Current Service Instance Metadata to Kubernetes pod. " +
-                    "Current pod name: " + currentHostname);
+                        "Current pod name: " + currentHostname);
             }
         }
     }
@@ -150,15 +149,15 @@
     public void doUnregister(ServiceInstance serviceInstance) throws RuntimeException {
         if (enableRegister) {
             kubernetesClient
-                .pods()
-                .inNamespace(namespace)
-                .withName(currentHostname)
-                .edit(pod ->
-                    new PodBuilder(pod)
-                        .editOrNewMetadata()
-                        .removeFromAnnotations(KUBERNETES_PROPERTIES_KEY)
-                        .endMetadata()
-                        .build());
+                    .pods()
+                    .inNamespace(namespace)
+                    .withName(currentHostname)
+                    .edit(pod ->
+                            new PodBuilder(pod)
+                                    .editOrNewMetadata()
+                                    .removeFromAnnotations(KUBERNETES_PROPERTIES_KEY)
+                                    .endMetadata()
+                                    .build());
             if (logger.isInfoEnabled()) {
                 logger.info("Remove Current Service Instance from Kubernetes pod. Current pod name: " + currentHostname);
             }
@@ -168,23 +167,33 @@
     @Override
     public Set<String> getServices() {
         return kubernetesClient
-            .services()
-            .inNamespace(namespace)
-            .list()
-            .getItems()
-            .stream()
-            .map(service -> service.getMetadata().getName())
-            .collect(Collectors.toSet());
+                .services()
+                .inNamespace(namespace)
+                .list()
+                .getItems()
+                .stream()
+                .map(service -> service.getMetadata().getName())
+                .collect(Collectors.toSet());
     }
 
     @Override
     public List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
-        Endpoints endpoints =
-            kubernetesClient
-                .endpoints()
-                .inNamespace(namespace)
-                .withName(serviceName)
-                .get();
+        Endpoints endpoints = null;
+        SharedIndexInformer<Endpoints> endInformer = ENDPOINTS_INFORMER.get(serviceName);
+        if (endInformer != null) {
+            // get endpoints directly from informer local store
+            List<Endpoints> endpointsList = endInformer.getStore().list();
+            if (endpointsList.size() > 0) {
+                endpoints = endpointsList.get(0);
+            }
+        }
+        if (endpoints == null) {
+            endpoints = kubernetesClient
+                    .endpoints()
+                    .inNamespace(namespace)
+                    .withName(serviceName)
+                    .get();
+        }
 
         return toServiceInstance(endpoints, serviceName);
     }
@@ -206,28 +215,40 @@
     }
 
     private void watchEndpoints(ServiceInstancesChangedListener listener, String serviceName) {
-        Watch watch = kubernetesClient
-            .endpoints()
-            .inNamespace(namespace)
-            .withName(serviceName)
-            .watch(new Watcher<Endpoints>() {
-                @Override
-                public void eventReceived(Action action, Endpoints resource) {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Received Endpoint Event. Event type: " + action.name() +
-                            ". Current pod name: " + currentHostname);
+        SharedIndexInformer<Endpoints> endInformer = kubernetesClient
+                .endpoints()
+                .inNamespace(namespace)
+                .withName(serviceName)
+                .inform(new ResourceEventHandler<Endpoints>() {
+                    @Override
+                    public void onAdd(Endpoints endpoints) {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Received Endpoint Event. Event type: added. Current pod name: " + currentHostname +
+                                    ". Endpoints is: " + endpoints);
+                        }
+                        notifyServiceChanged(serviceName, listener, toServiceInstance(endpoints, serviceName));
                     }
 
-                    notifyServiceChanged(serviceName, listener);
-                }
+                    @Override
+                    public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Received Endpoint Event. Event type: updated. Current pod name: " + currentHostname +
+                                    ". The new Endpoints is: " + newEndpoints);
+                        }
+                        notifyServiceChanged(serviceName, listener, toServiceInstance(newEndpoints, serviceName));
+                    }
 
-                @Override
-                public void onClose(WatcherException cause) {
-                    // ignore
-                }
-            });
+                    @Override
+                    public void onDelete(Endpoints endpoints, boolean deletedFinalStateUnknown) {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Received Endpoint Event. Event type: deleted. Current pod name: " + currentHostname +
+                                    ". Endpoints is: " + endpoints);
+                        }
+                        notifyServiceChanged(serviceName, listener, toServiceInstance(endpoints, serviceName));
+                    }
+                });
 
-        ENDPOINTS_WATCHER.put(serviceName, watch);
+        ENDPOINTS_INFORMER.put(serviceName, endInformer);
     }
 
     private void watchPods(ServiceInstancesChangedListener listener, String serviceName) {
@@ -236,68 +257,88 @@
             return;
         }
 
-        Watch watch = kubernetesClient
-            .pods()
-            .inNamespace(namespace)
-            .withLabels(serviceSelector)
-            .watch(new Watcher<Pod>() {
-                @Override
-                public void eventReceived(Action action, Pod resource) {
-                    if (Action.MODIFIED.equals(action)) {
+        SharedIndexInformer<Pod> podInformer = kubernetesClient
+                .pods()
+                .inNamespace(namespace)
+                .withLabels(serviceSelector)
+                .inform(new ResourceEventHandler<Pod>() {
+                    @Override
+                    public void onAdd(Pod pod) {
                         if (logger.isDebugEnabled()) {
-                            logger.debug("Received Pods Update Event. Current pod name: " + currentHostname);
+                            logger.debug("Received Pods Event. Event type: added. Current pod name: " + currentHostname +
+                                    ". Pod is: " + pod);
+                        }
+                    }
+
+                    @Override
+                    public void onUpdate(Pod oldPod, Pod newPod) {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Received Pods Event. Event type: updated. Current pod name: " + currentHostname +
+                                    ". new Pod is: " + newPod);
                         }
 
-                        notifyServiceChanged(serviceName, listener);
+                        notifyServiceChanged(serviceName, listener, getInstances(serviceName));
                     }
-                }
 
-                @Override
-                public void onClose(WatcherException cause) {
-                    // ignore
-                }
-            });
+                    @Override
+                    public void onDelete(Pod pod, boolean deletedFinalStateUnknown) {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Received Pods Event. Event type: deleted. Current pod name: " + currentHostname +
+                                    ". Pod is: " + pod);
+                        }
+                    }
+                });
 
-        PODS_WATCHER.put(serviceName, watch);
+        PODS_INFORMER.put(serviceName, podInformer);
     }
 
     private void watchService(ServiceInstancesChangedListener listener, String serviceName) {
-        Watch watch = kubernetesClient
-            .services()
-            .inNamespace(namespace)
-            .withName(serviceName)
-            .watch(new Watcher<Service>() {
-                @Override
-                public void eventReceived(Action action, Service resource) {
-                    if (Action.MODIFIED.equals(action)) {
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("Received Service Update Event. Update Pods Watcher. " +
-                                "Current pod name: " + currentHostname);
+        SharedIndexInformer<Service> serviceInformer = kubernetesClient
+                .services()
+                .inNamespace(namespace)
+                .withName(serviceName)
+                .inform(
+                        new ResourceEventHandler<Service>() {
+                            @Override
+                            public void onAdd(Service service) {
+                                if (logger.isDebugEnabled()) {
+                                    logger.debug("Received Service Added Event. " +
+                                            "Current pod name: " + currentHostname);
+                                }
+                            }
+
+                            @Override
+                            public void onUpdate(Service oldService, Service newService) {
+                                if (logger.isDebugEnabled()) {
+                                    logger.debug("Received Service Update Event. Update Pods Watcher. Current pod name: " + currentHostname +
+                                            ". The new Service is: " + newService);
+                                }
+                                if (PODS_INFORMER.containsKey(serviceName)) {
+                                    PODS_INFORMER.get(serviceName).close();
+                                    PODS_INFORMER.remove(serviceName);
+                                }
+                                watchPods(listener, serviceName);
+                            }
+
+                            @Override
+                            public void onDelete(Service service, boolean deletedFinalStateUnknown) {
+                                if (logger.isDebugEnabled()) {
+                                    logger.debug("Received Service Delete Event. " +
+                                            "Current pod name: " + currentHostname);
+                                }
+                            }
                         }
+                );
 
-                        if (PODS_WATCHER.containsKey(serviceName)) {
-                            PODS_WATCHER.get(serviceName).close();
-                            PODS_WATCHER.remove(serviceName);
-                        }
-                        watchPods(listener, serviceName);
-                    }
-                }
-
-                @Override
-                public void onClose(WatcherException cause) {
-                    // ignore
-                }
-            });
-
-        SERVICE_WATCHER.put(serviceName, watch);
+        SERVICE_INFORMER.put(serviceName, serviceInformer);
     }
 
-    private void notifyServiceChanged(String serviceName, ServiceInstancesChangedListener listener) {
+    private void notifyServiceChanged(String serviceName, ServiceInstancesChangedListener listener, List<ServiceInstance> serviceInstanceList) {
         long receivedTime = System.nanoTime();
 
         ServiceInstancesChangedEvent event;
 
-        event = new ServiceInstancesChangedEvent(serviceName, getInstances(serviceName));
+        event = new ServiceInstancesChangedEvent(serviceName, serviceInstanceList);
 
         AtomicLong updateTime = SERVICE_UPDATE_TIME.get(serviceName);
         long lastUpdateTime = updateTime.get();
@@ -311,9 +352,9 @@
 
         if (logger.isInfoEnabled()) {
             logger.info("Discard Service Instance Data. " +
-                "Possible Cause: Newer message has been processed or Failed to update time record by CAS. " +
-                "Current Data received time: " + receivedTime + ". " +
-                "Newer Data received time: " + lastUpdateTime + ".");
+                    "Possible Cause: Newer message has been processed or Failed to update time record by CAS. " +
+                    "Current Data received time: " + receivedTime + ". " +
+                    "Newer Data received time: " + lastUpdateTime + ".");
         }
     }
 
@@ -336,25 +377,25 @@
             return new LinkedList<>();
         }
         Map<String, Pod> pods = kubernetesClient
-            .pods()
-            .inNamespace(namespace)
-            .withLabels(serviceSelector)
-            .list()
-            .getItems()
-            .stream()
-            .collect(
-                Collectors.toMap(
-                    pod -> pod.getMetadata().getName(),
-                    pod -> pod));
+                .pods()
+                .inNamespace(namespace)
+                .withLabels(serviceSelector)
+                .list()
+                .getItems()
+                .stream()
+                .collect(
+                        Collectors.toMap(
+                                pod -> pod.getMetadata().getName(),
+                                pod -> pod));
 
         List<ServiceInstance> instances = new LinkedList<>();
         Set<Integer> instancePorts = new HashSet<>();
 
         for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
             instancePorts.addAll(
-                endpointSubset.getPorts()
-                    .stream().map(EndpointPort::getPort)
-                    .collect(Collectors.toSet()));
+                    endpointSubset.getPorts()
+                            .stream().map(EndpointPort::getPort)
+                            .collect(Collectors.toSet()));
         }
 
         for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
@@ -363,10 +404,9 @@
                 String ip = address.getIp();
                 if (pod == null) {
                     logger.warn("Unable to match Kubernetes Endpoint address with Pod. " +
-                        "EndpointAddress Hostname: " + address.getTargetRef().getName());
+                            "EndpointAddress Hostname: " + address.getTargetRef().getName());
                     continue;
                 }
-
                 instancePorts.forEach(port -> {
                     ServiceInstance serviceInstance = new DefaultServiceInstance(serviceName, ip, port, ScopeModelUtil.getApplicationModel(getUrl().getScopeModel()));
 
@@ -376,8 +416,8 @@
                         instances.add(serviceInstance);
                     } else {
                         logger.warn("Unable to find Service Instance metadata in Pod Annotations. " +
-                            "Possibly cause: provider has not been initialized successfully. " +
-                            "EndpointAddress Hostname: " + address.getTargetRef().getName());
+                                "Possibly cause: provider has not been initialized successfully. " +
+                                "EndpointAddress Hostname: " + address.getTargetRef().getName());
                     }
                 });
             }
diff --git a/dubbo-kubernetes/src/test/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscoveryTest.java b/dubbo-kubernetes/src/test/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscoveryTest.java
index 6b1a1b0..28aa002 100644
--- a/dubbo-kubernetes/src/test/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscoveryTest.java
+++ b/dubbo-kubernetes/src/test/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscoveryTest.java
@@ -14,185 +14,220 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-//package org.apache.dubbo.registry.kubernetes;
-//
-//import org.apache.dubbo.common.URL;
-//import org.apache.dubbo.registry.client.DefaultServiceInstance;
-//import org.apache.dubbo.registry.client.ServiceInstance;
-//import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
-//import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
-//import org.apache.dubbo.registry.kubernetes.util.KubernetesClientConst;
-//import org.apache.dubbo.rpc.model.ApplicationModel;
-//import org.apache.dubbo.rpc.model.ScopeModelUtil;
-//
-//import io.fabric8.kubernetes.api.model.Endpoints;
-//import io.fabric8.kubernetes.api.model.EndpointsBuilder;
-//import io.fabric8.kubernetes.api.model.Pod;
-//import io.fabric8.kubernetes.api.model.PodBuilder;
-//import io.fabric8.kubernetes.api.model.Service;
-//import io.fabric8.kubernetes.api.model.ServiceBuilder;
-//import io.fabric8.kubernetes.client.Config;
-//import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
-//import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
-//import org.junit.jupiter.api.AfterEach;
-//import org.junit.jupiter.api.Assertions;
-//import org.junit.jupiter.api.BeforeEach;
-//import org.junit.jupiter.api.Test;
-//import org.junit.jupiter.api.extension.ExtendWith;
-//import org.mockito.ArgumentCaptor;
-//import org.mockito.Mockito;
-//import org.mockito.junit.jupiter.MockitoExtension;
-//
-//import java.util.HashMap;
-//import java.util.HashSet;
-//import java.util.Map;
-//
-//@ExtendWith({MockitoExtension.class})
-//public class KubernetesServiceDiscoveryTest {
-//    public KubernetesServer mockServer = new KubernetesServer(false, true);
-//
-//    private NamespacedKubernetesClient mockClient;
-//
-//    private ServiceInstancesChangedListener mockListener = Mockito.mock(ServiceInstancesChangedListener.class);
-//
-//    private URL serverUrl;
-//
-//    private Map<String, String> selector;
-//
-//    @BeforeEach
-//    public void setUp() {
-//        mockServer.before();
-//        mockClient = mockServer.getClient();
-//
-//        serverUrl = URL.valueOf(mockClient.getConfiguration().getMasterUrl())
-//            .setProtocol("kubernetes")
-//            .addParameter(KubernetesClientConst.USE_HTTPS, "false")
-//            .addParameter(KubernetesClientConst.HTTP2_DISABLE, "true");
-//        serverUrl.setScopeModel(ApplicationModel.defaultModel());
-//
-//        System.setProperty(Config.KUBERNETES_AUTH_TRYKUBECONFIG_SYSTEM_PROPERTY, "false");
-//        System.setProperty(Config.KUBERNETES_AUTH_TRYSERVICEACCOUNT_SYSTEM_PROPERTY, "false");
-//
-//        selector = new HashMap<>(4);
-//        selector.put("l", "v");
-//        Pod pod = new PodBuilder()
-//            .withNewMetadata().withName("TestServer").withLabels(selector).endMetadata()
-//            .build();
-//
-//        Service service = new ServiceBuilder()
-//            .withNewMetadata().withName("TestService").endMetadata()
-//            .withNewSpec().withSelector(selector).endSpec().build();
-//
-//        Endpoints endPoints = new EndpointsBuilder()
-//            .withNewMetadata().withName("TestService").endMetadata()
-//            .addNewSubset()
-//            .addNewAddress().withIp("ip1")
-//            .withNewTargetRef().withUid("uid1").withName("TestServer").endTargetRef().endAddress()
-//            .addNewPort("Test", "Test", 12345, "TCP").endSubset()
-//            .build();
-//
-//        mockClient.pods().create(pod);
-//        mockClient.services().create(service);
-//        mockClient.endpoints().create(endPoints);
-//    }
-//
-//    @AfterEach
-//    public void destroy() {
-//        mockServer.after();
-//    }
-//
-//    @Test
-//    public void testEndpointsUpdate() throws Exception {
-//
-//        KubernetesServiceDiscovery serviceDiscovery = new KubernetesServiceDiscovery();
-//        serviceDiscovery.initialize(serverUrl);
-//
-//        serviceDiscovery.setCurrentHostname("TestServer");
-//        serviceDiscovery.setKubernetesClient(mockClient);
-//
-//        ServiceInstance serviceInstance = new DefaultServiceInstance("TestService", "Test", 12345, ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
-//        serviceDiscovery.register(serviceInstance);
-//
-//        HashSet<String> serviceList = new HashSet<>(4);
-//        serviceList.add("TestService");
-//        Mockito.when(mockListener.getServiceNames()).thenReturn(serviceList);
-//        Mockito.doNothing().when(mockListener).onEvent(Mockito.any());
-//
-//        serviceDiscovery.addServiceInstancesChangedListener(mockListener);
-//        mockClient.endpoints().withName("TestService")
-//            .edit(endpoints ->
-//                new EndpointsBuilder(endpoints)
-//                    .editFirstSubset()
-//                    .addNewAddress()
-//                    .withIp("ip2")
-//                    .withNewTargetRef().withUid("uid2").withName("TestServer").endTargetRef()
-//                    .endAddress().endSubset()
-//                    .build());
-//
-//        Thread.sleep(5000);
-//        ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor =
-//            ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
-//        Mockito.verify(mockListener, Mockito.times(2)).onEvent(eventArgumentCaptor.capture());
-//        Assertions.assertEquals(2, eventArgumentCaptor.getValue().getServiceInstances().size());
-//
-//        serviceDiscovery.unregister(serviceInstance);
-//
-//        serviceDiscovery.destroy();
-//    }
-//
-//    @Test
-//    public void testPodsUpdate() throws Exception {
-//
-//        KubernetesServiceDiscovery serviceDiscovery = new KubernetesServiceDiscovery();
-//        serviceDiscovery.initialize(serverUrl);
-//
-//        serviceDiscovery.setCurrentHostname("TestServer");
-//        serviceDiscovery.setKubernetesClient(mockClient);
-//
-//        ServiceInstance serviceInstance = new DefaultServiceInstance("TestService", "Test", 12345, ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
-//        serviceDiscovery.register(serviceInstance);
-//
-//        HashSet<String> serviceList = new HashSet<>(4);
-//        serviceList.add("TestService");
-//        Mockito.when(mockListener.getServiceNames()).thenReturn(serviceList);
-//        Mockito.doNothing().when(mockListener).onEvent(Mockito.any());
-//
-//        serviceDiscovery.addServiceInstancesChangedListener(mockListener);
-//
-//        serviceInstance = new DefaultServiceInstance("TestService", "Test12345", 12345, ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
-//        serviceDiscovery.update(serviceInstance);
-//
-//        Thread.sleep(5000);
-//        ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor =
-//            ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
-//        Mockito.verify(mockListener, Mockito.times(1)).onEvent(eventArgumentCaptor.capture());
-//        Assertions.assertEquals(1, eventArgumentCaptor.getValue().getServiceInstances().size());
-//
-//        serviceDiscovery.unregister(serviceInstance);
-//
-//        serviceDiscovery.destroy();
-//    }
-//
-//    @Test
-//    public void testGetInstance() throws Exception {
-//        KubernetesServiceDiscovery serviceDiscovery = new KubernetesServiceDiscovery();
-//        serviceDiscovery.initialize(serverUrl);
-//
-//        serviceDiscovery.setCurrentHostname("TestServer");
-//        serviceDiscovery.setKubernetesClient(mockClient);
-//
-//        ServiceInstance serviceInstance = new DefaultServiceInstance("TestService", "Test", 12345, ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
-//        serviceDiscovery.register(serviceInstance);
-//
-//        serviceDiscovery.update(serviceInstance);
-//
-//        Assertions.assertEquals(1, serviceDiscovery.getServices().size());
-//        Assertions.assertEquals(1, serviceDiscovery.getInstances("TestService").size());
-//
-//        Assertions.assertEquals(serviceInstance, serviceDiscovery.getLocalInstance());
-//
-//        serviceDiscovery.unregister(serviceInstance);
-//
-//        serviceDiscovery.destroy();
-//    }
-//}
+package org.apache.dubbo.registry.kubernetes;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.registry.client.DefaultServiceInstance;
+import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
+import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
+import org.apache.dubbo.registry.kubernetes.util.KubernetesClientConst;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ScopeModelUtil;
+
+import io.fabric8.kubernetes.api.model.Endpoints;
+import io.fabric8.kubernetes.api.model.EndpointsBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.Service;
+import io.fabric8.kubernetes.api.model.ServiceBuilder;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.apache.dubbo.registry.kubernetes.util.KubernetesClientConst.NAMESPACE;
+
+@ExtendWith({MockitoExtension.class})
+public class KubernetesServiceDiscoveryTest {
+    private static final String SERVICE_NAME = "TestService";
+
+    private static final String POD_NAME = "TestServer";
+
+    public KubernetesServer mockServer = new KubernetesServer(false, true);
+
+    private NamespacedKubernetesClient mockClient;
+
+    private ServiceInstancesChangedListener mockListener = Mockito.mock(ServiceInstancesChangedListener.class);
+
+    private URL serverUrl;
+
+    private Map<String, String> selector;
+
+    private KubernetesServiceDiscovery serviceDiscovery;
+
+
+    @BeforeEach
+    public void setUp() {
+        mockServer.before();
+        mockClient = mockServer.getClient().inNamespace("dubbo-demo");
+
+        ApplicationModel applicationModel = ApplicationModel.defaultModel();
+        applicationModel.getApplicationConfigManager().setApplication(new ApplicationConfig());
+
+        serverUrl = URL.valueOf(mockClient.getConfiguration().getMasterUrl())
+                .setProtocol("kubernetes")
+                .addParameter(NAMESPACE, "dubbo-demo")
+                .addParameter(KubernetesClientConst.USE_HTTPS, "false")
+                .addParameter(KubernetesClientConst.HTTP2_DISABLE, "true");
+        serverUrl.setScopeModel(applicationModel);
+
+        this.serviceDiscovery = new KubernetesServiceDiscovery(applicationModel, serverUrl);
+
+        System.setProperty(Config.KUBERNETES_AUTH_TRYKUBECONFIG_SYSTEM_PROPERTY, "false");
+        System.setProperty(Config.KUBERNETES_AUTH_TRYSERVICEACCOUNT_SYSTEM_PROPERTY, "false");
+
+        selector = new HashMap<>(4);
+        selector.put("l", "v");
+        Pod pod = new PodBuilder()
+                .withNewMetadata().withName(POD_NAME).withLabels(selector).endMetadata()
+                .build();
+
+        Service service = new ServiceBuilder()
+                .withNewMetadata().withName(SERVICE_NAME).endMetadata()
+                .withNewSpec().withSelector(selector).endSpec().build();
+
+        Endpoints endPoints = new EndpointsBuilder()
+                .withNewMetadata().withName(SERVICE_NAME).endMetadata()
+                .addNewSubset()
+                .addNewAddress().withIp("ip1")
+                .withNewTargetRef().withUid("uid1").withName(POD_NAME).endTargetRef().endAddress()
+                .addNewPort("Test", "Test", 12345, "TCP").endSubset()
+                .build();
+
+        mockClient.pods().resource(pod).create();
+        mockClient.services().resource(service).create();
+        mockClient.endpoints().resource(endPoints).create();
+    }
+
+    @AfterEach
+    public void destroy() throws Exception {
+        serviceDiscovery.destroy();
+        mockClient.close();
+        mockServer.after();
+    }
+
+    @Test
+    public void testEndpointsUpdate() throws Exception {
+        serviceDiscovery.setCurrentHostname(POD_NAME);
+        serviceDiscovery.setKubernetesClient(mockClient);
+
+        ServiceInstance serviceInstance = new DefaultServiceInstance(SERVICE_NAME, "Test", 12345, ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
+
+        serviceDiscovery.doRegister(serviceInstance);
+
+        HashSet<String> serviceList = new HashSet<>(4);
+        serviceList.add(SERVICE_NAME);
+        Mockito.when(mockListener.getServiceNames()).thenReturn(serviceList);
+        Mockito.doNothing().when(mockListener).onEvent(Mockito.any());
+
+        serviceDiscovery.addServiceInstancesChangedListener(mockListener);
+        mockClient.endpoints().withName(SERVICE_NAME)
+                .edit(endpoints ->
+                        new EndpointsBuilder(endpoints)
+                                .editFirstSubset()
+                                .addNewAddress()
+                                .withIp("ip2")
+                                .withNewTargetRef().withUid("uid2").withName(POD_NAME).endTargetRef()
+                                .endAddress().endSubset()
+                                .build());
+
+        Thread.sleep(2000);
+        ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor =
+                ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
+        Mockito.verify(mockListener, Mockito.times(2)).onEvent(eventArgumentCaptor.capture());
+        Assertions.assertEquals(2, eventArgumentCaptor.getValue().getServiceInstances().size());
+
+        serviceDiscovery.doUnregister(serviceInstance);
+    }
+
+    @Test
+    public void testPodsUpdate() throws Exception {
+        serviceDiscovery.setCurrentHostname(POD_NAME);
+        serviceDiscovery.setKubernetesClient(mockClient);
+
+        ServiceInstance serviceInstance = new DefaultServiceInstance(SERVICE_NAME, "Test", 12345, ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
+
+        serviceDiscovery.doRegister(serviceInstance);
+
+        HashSet<String> serviceList = new HashSet<>(4);
+        serviceList.add(SERVICE_NAME);
+        Mockito.when(mockListener.getServiceNames()).thenReturn(serviceList);
+        Mockito.doNothing().when(mockListener).onEvent(Mockito.any());
+
+        serviceDiscovery.addServiceInstancesChangedListener(mockListener);
+
+        serviceInstance = new DefaultServiceInstance(SERVICE_NAME, "Test12345", 12345, ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
+        serviceDiscovery.doUpdate(serviceInstance);
+
+        Thread.sleep(2000);
+        ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor =
+                ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
+        Mockito.verify(mockListener, Mockito.times(1)).onEvent(eventArgumentCaptor.capture());
+        Assertions.assertEquals(1, eventArgumentCaptor.getValue().getServiceInstances().size());
+
+        serviceDiscovery.doUnregister(serviceInstance);
+    }
+
+    @Test
+    public void testServiceUpdate() throws Exception {
+        serviceDiscovery.setCurrentHostname(POD_NAME);
+        serviceDiscovery.setKubernetesClient(mockClient);
+
+        ServiceInstance serviceInstance = new DefaultServiceInstance(SERVICE_NAME, "Test", 12345, ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
+
+        serviceDiscovery.doRegister(serviceInstance);
+
+        HashSet<String> serviceList = new HashSet<>(4);
+        serviceList.add(SERVICE_NAME);
+        Mockito.when(mockListener.getServiceNames()).thenReturn(serviceList);
+        Mockito.doNothing().when(mockListener).onEvent(Mockito.any());
+
+        serviceDiscovery.addServiceInstancesChangedListener(mockListener);
+
+        selector.put("app", "test");
+        mockClient.services().withName(SERVICE_NAME)
+                .edit(service -> new ServiceBuilder(service)
+                        .editSpec()
+                        .addToSelector(selector)
+                        .endSpec()
+                        .build());
+
+        Thread.sleep(2000);
+        ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor =
+                ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
+        Mockito.verify(mockListener, Mockito.times(1)).onEvent(eventArgumentCaptor.capture());
+        Assertions.assertEquals(1, eventArgumentCaptor.getValue().getServiceInstances().size());
+
+        serviceDiscovery.doUnregister(serviceInstance);
+    }
+
+    @Test
+    public void testGetInstance() {
+        serviceDiscovery.setCurrentHostname(POD_NAME);
+        serviceDiscovery.setKubernetesClient(mockClient);
+
+        ServiceInstance serviceInstance = new DefaultServiceInstance(SERVICE_NAME, "Test", 12345, ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
+
+        serviceDiscovery.doRegister(serviceInstance);
+
+        serviceDiscovery.doUpdate(serviceInstance);
+
+        Assertions.assertEquals(1, serviceDiscovery.getServices().size());
+        Assertions.assertEquals(1, serviceDiscovery.getInstances(SERVICE_NAME).size());
+
+        serviceDiscovery.doUnregister(serviceInstance);
+    }
+}