feat: replace kube-apiserver watch with informer (#10543)
fixes #10535
diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml
index 375e0a5..4f38e14 100644
--- a/dubbo-dependencies-bom/pom.xml
+++ b/dubbo-dependencies-bom/pom.xml
@@ -159,7 +159,7 @@
<eureka.version>1.9.12</eureka.version>
<!-- Fabric8 for Kubernetes -->
- <fabric8_kubernetes_version>5.3.2</fabric8_kubernetes_version>
+ <fabric8_kubernetes_version>6.1.1</fabric8_kubernetes_version>
<!-- Alibaba -->
<alibaba_spring_context_support_version>1.0.8</alibaba_spring_context_support_version>
diff --git a/dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesMeshEnvListener.java b/dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesMeshEnvListener.java
index 1a0c1fa..300dae3 100644
--- a/dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesMeshEnvListener.java
+++ b/dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesMeshEnvListener.java
@@ -21,8 +21,7 @@
import org.apache.dubbo.rpc.cluster.router.mesh.route.MeshAppRuleListener;
import org.apache.dubbo.rpc.cluster.router.mesh.route.MeshEnvListener;
-import com.google.gson.Gson;
-import io.fabric8.kubernetes.api.model.ListOptionsBuilder;
+import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
@@ -30,7 +29,6 @@
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.SafeConstructor;
-import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -81,24 +79,27 @@
try {
Watch watch = kubernetesClient
- .customResource(
- MeshConstant.getVsDefinition())
- .watch(namespace, appName, null, new ListOptionsBuilder().build(), new Watcher<String>() {
- @Override
- public void eventReceived(Action action, String resource) {
- logger.info("Received VS Rule notification. AppName: " + appName + " Action:" + action + " Resource:" + resource);
-
- if (action == Action.ADDED || action == Action.MODIFIED) {
- Map drRuleMap = new Gson().fromJson(resource, Map.class);
- String vsRule = new Yaml(new SafeConstructor()).dump(drRuleMap);
- vsAppCache.put(appName, vsRule);
- if (drAppCache.containsKey(appName)) {
- notifyListener(vsRule, appName, drAppCache.get(appName));
+ .genericKubernetesResources(
+ MeshConstant.getVsDefinition())
+ .inNamespace(namespace)
+ .withName(appName)
+ .watch(new Watcher<GenericKubernetesResource>() {
+ @Override
+ public void eventReceived(Action action, GenericKubernetesResource resource) {
+ if (logger.isInfoEnabled()) {
+ logger.info("Received VS Rule notification. AppName: " + appName + " Action:" + action + " Resource:" + resource);
}
- } else {
- appRuleListenerMap.get(appName).receiveConfigInfo("");
+
+ if (action == Action.ADDED || action == Action.MODIFIED) {
+ String vsRule = new Yaml(new SafeConstructor()).dump(resource);
+ vsAppCache.put(appName, vsRule);
+ if (drAppCache.containsKey(appName)) {
+ notifyListener(vsRule, appName, drAppCache.get(appName));
+ }
+ } else {
+ appRuleListenerMap.get(appName).receiveConfigInfo("");
+ }
}
- }
@Override
public void onClose(WatcherException cause) {
@@ -107,15 +108,17 @@
});
vsAppWatch.put(appName, watch);
try {
- Map<String, Object> vsRule = kubernetesClient
- .customResource(
- MeshConstant.getVsDefinition())
- .get(namespace, appName);
+ GenericKubernetesResource vsRule = kubernetesClient
+ .genericKubernetesResources(
+ MeshConstant.getVsDefinition())
+ .inNamespace(namespace)
+ .withName(appName)
+ .get();
vsAppCache.put(appName, new Yaml(new SafeConstructor()).dump(vsRule));
} catch (Throwable ignore) {
}
- } catch (IOException e) {
+ } catch (Exception e) {
logger.error("Error occurred when listen kubernetes crd.", e);
}
}
@@ -134,42 +137,47 @@
try {
Watch watch = kubernetesClient
- .customResource(
- MeshConstant.getDrDefinition())
- .watch(namespace, appName, null, new ListOptionsBuilder().build(), new Watcher<String>() {
- @Override
- public void eventReceived(Action action, String resource) {
- logger.info("Received VS Rule notification. AppName: " + appName + " Action:" + action + " Resource:" + resource);
-
- if (action == Action.ADDED || action == Action.MODIFIED) {
- Map drRuleMap = new Gson().fromJson(resource, Map.class);
- String drRule = new Yaml(new SafeConstructor()).dump(drRuleMap);
-
- drAppCache.put(appName, drRule);
- if (vsAppCache.containsKey(appName)) {
- notifyListener(vsAppCache.get(appName), appName, drRule);
+ .genericKubernetesResources(
+ MeshConstant.getDrDefinition())
+ .inNamespace(namespace)
+ .withName(appName)
+ .watch(new Watcher<GenericKubernetesResource>() {
+ @Override
+ public void eventReceived(Action action, GenericKubernetesResource resource) {
+ if (logger.isInfoEnabled()) {
+ logger.info("Received VS Rule notification. AppName: " + appName + " Action:" + action + " Resource:" + resource);
}
- } else {
- appRuleListenerMap.get(appName).receiveConfigInfo("");
- }
- }
- @Override
- public void onClose(WatcherException cause) {
- // ignore
- }
- });
+ if (action == Action.ADDED || action == Action.MODIFIED) {
+ String drRule = new Yaml(new SafeConstructor()).dump(resource);
+
+ drAppCache.put(appName, drRule);
+ if (vsAppCache.containsKey(appName)) {
+ notifyListener(vsAppCache.get(appName), appName, drRule);
+ }
+ } else {
+ appRuleListenerMap.get(appName).receiveConfigInfo("");
+ }
+ }
+
+ @Override
+ public void onClose(WatcherException cause) {
+ // ignore
+ }
+ });
drAppWatch.put(appName, watch);
try {
- Map<String, Object> drRule = kubernetesClient
- .customResource(
- MeshConstant.getDrDefinition())
- .get(namespace, appName);
+ GenericKubernetesResource drRule = kubernetesClient
+ .genericKubernetesResources(
+ MeshConstant.getDrDefinition())
+ .inNamespace(namespace)
+ .withName(appName)
+ .get();
drAppCache.put(appName, new Yaml(new SafeConstructor()).dump(drRule));
} catch (Throwable ignore) {
}
- } catch (IOException e) {
+ } catch (Exception e) {
logger.error("Error occurred when listen kubernetes crd.", e);
}
}
diff --git a/dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscovery.java b/dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscovery.java
index e0f47f1..087de56 100644
--- a/dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscovery.java
+++ b/dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscovery.java
@@ -39,11 +39,10 @@
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.client.Config;
-import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.Watch;
-import io.fabric8.kubernetes.client.Watcher;
-import io.fabric8.kubernetes.client.WatcherException;
+import io.fabric8.kubernetes.client.KubernetesClientBuilder;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import java.util.HashSet;
import java.util.LinkedList;
@@ -69,18 +68,18 @@
public final static String KUBERNETES_PROPERTIES_KEY = "io.dubbo/metadata";
- private final static ConcurrentHashMap<String, Watch> SERVICE_WATCHER = new ConcurrentHashMap<>(64);
-
- private final static ConcurrentHashMap<String, Watch> PODS_WATCHER = new ConcurrentHashMap<>(64);
-
- private final static ConcurrentHashMap<String, Watch> ENDPOINTS_WATCHER = new ConcurrentHashMap<>(64);
-
private final static ConcurrentHashMap<String, AtomicLong> SERVICE_UPDATE_TIME = new ConcurrentHashMap<>(64);
+ private final static ConcurrentHashMap<String, SharedIndexInformer<Service>> SERVICE_INFORMER = new ConcurrentHashMap<>(64);
+
+ private final static ConcurrentHashMap<String, SharedIndexInformer<Pod>> PODS_INFORMER = new ConcurrentHashMap<>(64);
+
+ private final static ConcurrentHashMap<String, SharedIndexInformer<Endpoints>> ENDPOINTS_INFORMER = new ConcurrentHashMap<>(64);
+
public KubernetesServiceDiscovery(ApplicationModel applicationModel, URL registryURL) {
super(applicationModel, registryURL);
Config config = KubernetesConfigUtils.createKubernetesConfig(registryURL);
- this.kubernetesClient = new DefaultKubernetesClient(config);
+ this.kubernetesClient = new KubernetesClientBuilder().withConfig(config).build();
this.currentHostname = System.getenv("HOSTNAME");
this.registryURL = registryURL;
this.namespace = config.getNamespace();
@@ -94,9 +93,9 @@
}
if (!availableAccess) {
String message = "Unable to access api server. " +
- "Please check your url config." +
- " Master URL: " + config.getMasterUrl() +
- " Hostname: " + currentHostname;
+ "Please check your url config." +
+ " Master URL: " + config.getMasterUrl() +
+ " Hostname: " + currentHostname;
logger.error(message);
} else {
KubernetesMeshEnvListener.injectKubernetesEnv(kubernetesClient, namespace);
@@ -104,15 +103,15 @@
}
@Override
- public void doDestroy() throws Exception {
- SERVICE_WATCHER.forEach((k, v) -> v.close());
- SERVICE_WATCHER.clear();
+ public void doDestroy() {
+ SERVICE_INFORMER.forEach((k, v) -> v.close());
+ SERVICE_INFORMER.clear();
- PODS_WATCHER.forEach((k, v) -> v.close());
- PODS_WATCHER.clear();
+ PODS_INFORMER.forEach((k, v) -> v.close());
+ PODS_INFORMER.clear();
- ENDPOINTS_WATCHER.forEach((k, v) -> v.close());
- ENDPOINTS_WATCHER.clear();
+ ENDPOINTS_INFORMER.forEach((k, v) -> v.close());
+ ENDPOINTS_INFORMER.clear();
kubernetesClient.close();
}
@@ -121,18 +120,18 @@
public void doRegister(ServiceInstance serviceInstance) throws RuntimeException {
if (enableRegister) {
kubernetesClient
- .pods()
- .inNamespace(namespace)
- .withName(currentHostname)
- .edit(pod ->
- new PodBuilder(pod)
- .editOrNewMetadata()
- .addToAnnotations(KUBERNETES_PROPERTIES_KEY, JSONObject.toJSONString(serviceInstance.getMetadata()))
- .endMetadata()
- .build());
+ .pods()
+ .inNamespace(namespace)
+ .withName(currentHostname)
+ .edit(pod ->
+ new PodBuilder(pod)
+ .editOrNewMetadata()
+ .addToAnnotations(KUBERNETES_PROPERTIES_KEY, JSONObject.toJSONString(serviceInstance.getMetadata()))
+ .endMetadata()
+ .build());
if (logger.isInfoEnabled()) {
logger.info("Write Current Service Instance Metadata to Kubernetes pod. " +
- "Current pod name: " + currentHostname);
+ "Current pod name: " + currentHostname);
}
}
}
@@ -150,15 +149,15 @@
public void doUnregister(ServiceInstance serviceInstance) throws RuntimeException {
if (enableRegister) {
kubernetesClient
- .pods()
- .inNamespace(namespace)
- .withName(currentHostname)
- .edit(pod ->
- new PodBuilder(pod)
- .editOrNewMetadata()
- .removeFromAnnotations(KUBERNETES_PROPERTIES_KEY)
- .endMetadata()
- .build());
+ .pods()
+ .inNamespace(namespace)
+ .withName(currentHostname)
+ .edit(pod ->
+ new PodBuilder(pod)
+ .editOrNewMetadata()
+ .removeFromAnnotations(KUBERNETES_PROPERTIES_KEY)
+ .endMetadata()
+ .build());
if (logger.isInfoEnabled()) {
logger.info("Remove Current Service Instance from Kubernetes pod. Current pod name: " + currentHostname);
}
@@ -168,23 +167,33 @@
@Override
public Set<String> getServices() {
return kubernetesClient
- .services()
- .inNamespace(namespace)
- .list()
- .getItems()
- .stream()
- .map(service -> service.getMetadata().getName())
- .collect(Collectors.toSet());
+ .services()
+ .inNamespace(namespace)
+ .list()
+ .getItems()
+ .stream()
+ .map(service -> service.getMetadata().getName())
+ .collect(Collectors.toSet());
}
@Override
public List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
- Endpoints endpoints =
- kubernetesClient
- .endpoints()
- .inNamespace(namespace)
- .withName(serviceName)
- .get();
+ Endpoints endpoints = null;
+ SharedIndexInformer<Endpoints> endInformer = ENDPOINTS_INFORMER.get(serviceName);
+ if (endInformer != null) {
+ // get endpoints directly from informer local store
+ List<Endpoints> endpointsList = endInformer.getStore().list();
+ if (endpointsList.size() > 0) {
+ endpoints = endpointsList.get(0);
+ }
+ }
+ if (endpoints == null) {
+ endpoints = kubernetesClient
+ .endpoints()
+ .inNamespace(namespace)
+ .withName(serviceName)
+ .get();
+ }
return toServiceInstance(endpoints, serviceName);
}
@@ -206,28 +215,40 @@
}
private void watchEndpoints(ServiceInstancesChangedListener listener, String serviceName) {
- Watch watch = kubernetesClient
- .endpoints()
- .inNamespace(namespace)
- .withName(serviceName)
- .watch(new Watcher<Endpoints>() {
- @Override
- public void eventReceived(Action action, Endpoints resource) {
- if (logger.isDebugEnabled()) {
- logger.debug("Received Endpoint Event. Event type: " + action.name() +
- ". Current pod name: " + currentHostname);
+ SharedIndexInformer<Endpoints> endInformer = kubernetesClient
+ .endpoints()
+ .inNamespace(namespace)
+ .withName(serviceName)
+ .inform(new ResourceEventHandler<Endpoints>() {
+ @Override
+ public void onAdd(Endpoints endpoints) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received Endpoint Event. Event type: added. Current pod name: " + currentHostname +
+ ". Endpoints is: " + endpoints);
+ }
+ notifyServiceChanged(serviceName, listener, toServiceInstance(endpoints, serviceName));
}
- notifyServiceChanged(serviceName, listener);
- }
+ @Override
+ public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received Endpoint Event. Event type: updated. Current pod name: " + currentHostname +
+ ". The new Endpoints is: " + newEndpoints);
+ }
+ notifyServiceChanged(serviceName, listener, toServiceInstance(newEndpoints, serviceName));
+ }
- @Override
- public void onClose(WatcherException cause) {
- // ignore
- }
- });
+ @Override
+ public void onDelete(Endpoints endpoints, boolean deletedFinalStateUnknown) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received Endpoint Event. Event type: deleted. Current pod name: " + currentHostname +
+ ". Endpoints is: " + endpoints);
+ }
+ notifyServiceChanged(serviceName, listener, toServiceInstance(endpoints, serviceName));
+ }
+ });
- ENDPOINTS_WATCHER.put(serviceName, watch);
+ ENDPOINTS_INFORMER.put(serviceName, endInformer);
}
private void watchPods(ServiceInstancesChangedListener listener, String serviceName) {
@@ -236,68 +257,88 @@
return;
}
- Watch watch = kubernetesClient
- .pods()
- .inNamespace(namespace)
- .withLabels(serviceSelector)
- .watch(new Watcher<Pod>() {
- @Override
- public void eventReceived(Action action, Pod resource) {
- if (Action.MODIFIED.equals(action)) {
+ SharedIndexInformer<Pod> podInformer = kubernetesClient
+ .pods()
+ .inNamespace(namespace)
+ .withLabels(serviceSelector)
+ .inform(new ResourceEventHandler<Pod>() {
+ @Override
+ public void onAdd(Pod pod) {
if (logger.isDebugEnabled()) {
- logger.debug("Received Pods Update Event. Current pod name: " + currentHostname);
+ logger.debug("Received Pods Event. Event type: added. Current pod name: " + currentHostname +
+ ". Pod is: " + pod);
+ }
+ }
+
+ @Override
+ public void onUpdate(Pod oldPod, Pod newPod) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received Pods Event. Event type: updated. Current pod name: " + currentHostname +
+ ". new Pod is: " + newPod);
}
- notifyServiceChanged(serviceName, listener);
+ notifyServiceChanged(serviceName, listener, getInstances(serviceName));
}
- }
- @Override
- public void onClose(WatcherException cause) {
- // ignore
- }
- });
+ @Override
+ public void onDelete(Pod pod, boolean deletedFinalStateUnknown) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received Pods Event. Event type: deleted. Current pod name: " + currentHostname +
+ ". Pod is: " + pod);
+ }
+ }
+ });
- PODS_WATCHER.put(serviceName, watch);
+ PODS_INFORMER.put(serviceName, podInformer);
}
private void watchService(ServiceInstancesChangedListener listener, String serviceName) {
- Watch watch = kubernetesClient
- .services()
- .inNamespace(namespace)
- .withName(serviceName)
- .watch(new Watcher<Service>() {
- @Override
- public void eventReceived(Action action, Service resource) {
- if (Action.MODIFIED.equals(action)) {
- if (logger.isDebugEnabled()) {
- logger.debug("Received Service Update Event. Update Pods Watcher. " +
- "Current pod name: " + currentHostname);
+ SharedIndexInformer<Service> serviceInformer = kubernetesClient
+ .services()
+ .inNamespace(namespace)
+ .withName(serviceName)
+ .inform(
+ new ResourceEventHandler<Service>() {
+ @Override
+ public void onAdd(Service service) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received Service Added Event. " +
+ "Current pod name: " + currentHostname);
+ }
+ }
+
+ @Override
+ public void onUpdate(Service oldService, Service newService) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received Service Update Event. Update Pods Watcher. Current pod name: " + currentHostname +
+ ". The new Service is: " + newService);
+ }
+ if (PODS_INFORMER.containsKey(serviceName)) {
+ PODS_INFORMER.get(serviceName).close();
+ PODS_INFORMER.remove(serviceName);
+ }
+ watchPods(listener, serviceName);
+ }
+
+ @Override
+ public void onDelete(Service service, boolean deletedFinalStateUnknown) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received Service Delete Event. " +
+ "Current pod name: " + currentHostname);
+ }
+ }
}
+ );
- if (PODS_WATCHER.containsKey(serviceName)) {
- PODS_WATCHER.get(serviceName).close();
- PODS_WATCHER.remove(serviceName);
- }
- watchPods(listener, serviceName);
- }
- }
-
- @Override
- public void onClose(WatcherException cause) {
- // ignore
- }
- });
-
- SERVICE_WATCHER.put(serviceName, watch);
+ SERVICE_INFORMER.put(serviceName, serviceInformer);
}
- private void notifyServiceChanged(String serviceName, ServiceInstancesChangedListener listener) {
+ private void notifyServiceChanged(String serviceName, ServiceInstancesChangedListener listener, List<ServiceInstance> serviceInstanceList) {
long receivedTime = System.nanoTime();
ServiceInstancesChangedEvent event;
- event = new ServiceInstancesChangedEvent(serviceName, getInstances(serviceName));
+ event = new ServiceInstancesChangedEvent(serviceName, serviceInstanceList);
AtomicLong updateTime = SERVICE_UPDATE_TIME.get(serviceName);
long lastUpdateTime = updateTime.get();
@@ -311,9 +352,9 @@
if (logger.isInfoEnabled()) {
logger.info("Discard Service Instance Data. " +
- "Possible Cause: Newer message has been processed or Failed to update time record by CAS. " +
- "Current Data received time: " + receivedTime + ". " +
- "Newer Data received time: " + lastUpdateTime + ".");
+ "Possible Cause: Newer message has been processed or Failed to update time record by CAS. " +
+ "Current Data received time: " + receivedTime + ". " +
+ "Newer Data received time: " + lastUpdateTime + ".");
}
}
@@ -336,25 +377,25 @@
return new LinkedList<>();
}
Map<String, Pod> pods = kubernetesClient
- .pods()
- .inNamespace(namespace)
- .withLabels(serviceSelector)
- .list()
- .getItems()
- .stream()
- .collect(
- Collectors.toMap(
- pod -> pod.getMetadata().getName(),
- pod -> pod));
+ .pods()
+ .inNamespace(namespace)
+ .withLabels(serviceSelector)
+ .list()
+ .getItems()
+ .stream()
+ .collect(
+ Collectors.toMap(
+ pod -> pod.getMetadata().getName(),
+ pod -> pod));
List<ServiceInstance> instances = new LinkedList<>();
Set<Integer> instancePorts = new HashSet<>();
for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
instancePorts.addAll(
- endpointSubset.getPorts()
- .stream().map(EndpointPort::getPort)
- .collect(Collectors.toSet()));
+ endpointSubset.getPorts()
+ .stream().map(EndpointPort::getPort)
+ .collect(Collectors.toSet()));
}
for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
@@ -363,10 +404,9 @@
String ip = address.getIp();
if (pod == null) {
logger.warn("Unable to match Kubernetes Endpoint address with Pod. " +
- "EndpointAddress Hostname: " + address.getTargetRef().getName());
+ "EndpointAddress Hostname: " + address.getTargetRef().getName());
continue;
}
-
instancePorts.forEach(port -> {
ServiceInstance serviceInstance = new DefaultServiceInstance(serviceName, ip, port, ScopeModelUtil.getApplicationModel(getUrl().getScopeModel()));
@@ -376,8 +416,8 @@
instances.add(serviceInstance);
} else {
logger.warn("Unable to find Service Instance metadata in Pod Annotations. " +
- "Possibly cause: provider has not been initialized successfully. " +
- "EndpointAddress Hostname: " + address.getTargetRef().getName());
+ "Possibly cause: provider has not been initialized successfully. " +
+ "EndpointAddress Hostname: " + address.getTargetRef().getName());
}
});
}
diff --git a/dubbo-kubernetes/src/test/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscoveryTest.java b/dubbo-kubernetes/src/test/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscoveryTest.java
index 6b1a1b0..28aa002 100644
--- a/dubbo-kubernetes/src/test/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscoveryTest.java
+++ b/dubbo-kubernetes/src/test/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscoveryTest.java
@@ -14,185 +14,220 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-//package org.apache.dubbo.registry.kubernetes;
-//
-//import org.apache.dubbo.common.URL;
-//import org.apache.dubbo.registry.client.DefaultServiceInstance;
-//import org.apache.dubbo.registry.client.ServiceInstance;
-//import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
-//import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
-//import org.apache.dubbo.registry.kubernetes.util.KubernetesClientConst;
-//import org.apache.dubbo.rpc.model.ApplicationModel;
-//import org.apache.dubbo.rpc.model.ScopeModelUtil;
-//
-//import io.fabric8.kubernetes.api.model.Endpoints;
-//import io.fabric8.kubernetes.api.model.EndpointsBuilder;
-//import io.fabric8.kubernetes.api.model.Pod;
-//import io.fabric8.kubernetes.api.model.PodBuilder;
-//import io.fabric8.kubernetes.api.model.Service;
-//import io.fabric8.kubernetes.api.model.ServiceBuilder;
-//import io.fabric8.kubernetes.client.Config;
-//import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
-//import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
-//import org.junit.jupiter.api.AfterEach;
-//import org.junit.jupiter.api.Assertions;
-//import org.junit.jupiter.api.BeforeEach;
-//import org.junit.jupiter.api.Test;
-//import org.junit.jupiter.api.extension.ExtendWith;
-//import org.mockito.ArgumentCaptor;
-//import org.mockito.Mockito;
-//import org.mockito.junit.jupiter.MockitoExtension;
-//
-//import java.util.HashMap;
-//import java.util.HashSet;
-//import java.util.Map;
-//
-//@ExtendWith({MockitoExtension.class})
-//public class KubernetesServiceDiscoveryTest {
-// public KubernetesServer mockServer = new KubernetesServer(false, true);
-//
-// private NamespacedKubernetesClient mockClient;
-//
-// private ServiceInstancesChangedListener mockListener = Mockito.mock(ServiceInstancesChangedListener.class);
-//
-// private URL serverUrl;
-//
-// private Map<String, String> selector;
-//
-// @BeforeEach
-// public void setUp() {
-// mockServer.before();
-// mockClient = mockServer.getClient();
-//
-// serverUrl = URL.valueOf(mockClient.getConfiguration().getMasterUrl())
-// .setProtocol("kubernetes")
-// .addParameter(KubernetesClientConst.USE_HTTPS, "false")
-// .addParameter(KubernetesClientConst.HTTP2_DISABLE, "true");
-// serverUrl.setScopeModel(ApplicationModel.defaultModel());
-//
-// System.setProperty(Config.KUBERNETES_AUTH_TRYKUBECONFIG_SYSTEM_PROPERTY, "false");
-// System.setProperty(Config.KUBERNETES_AUTH_TRYSERVICEACCOUNT_SYSTEM_PROPERTY, "false");
-//
-// selector = new HashMap<>(4);
-// selector.put("l", "v");
-// Pod pod = new PodBuilder()
-// .withNewMetadata().withName("TestServer").withLabels(selector).endMetadata()
-// .build();
-//
-// Service service = new ServiceBuilder()
-// .withNewMetadata().withName("TestService").endMetadata()
-// .withNewSpec().withSelector(selector).endSpec().build();
-//
-// Endpoints endPoints = new EndpointsBuilder()
-// .withNewMetadata().withName("TestService").endMetadata()
-// .addNewSubset()
-// .addNewAddress().withIp("ip1")
-// .withNewTargetRef().withUid("uid1").withName("TestServer").endTargetRef().endAddress()
-// .addNewPort("Test", "Test", 12345, "TCP").endSubset()
-// .build();
-//
-// mockClient.pods().create(pod);
-// mockClient.services().create(service);
-// mockClient.endpoints().create(endPoints);
-// }
-//
-// @AfterEach
-// public void destroy() {
-// mockServer.after();
-// }
-//
-// @Test
-// public void testEndpointsUpdate() throws Exception {
-//
-// KubernetesServiceDiscovery serviceDiscovery = new KubernetesServiceDiscovery();
-// serviceDiscovery.initialize(serverUrl);
-//
-// serviceDiscovery.setCurrentHostname("TestServer");
-// serviceDiscovery.setKubernetesClient(mockClient);
-//
-// ServiceInstance serviceInstance = new DefaultServiceInstance("TestService", "Test", 12345, ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
-// serviceDiscovery.register(serviceInstance);
-//
-// HashSet<String> serviceList = new HashSet<>(4);
-// serviceList.add("TestService");
-// Mockito.when(mockListener.getServiceNames()).thenReturn(serviceList);
-// Mockito.doNothing().when(mockListener).onEvent(Mockito.any());
-//
-// serviceDiscovery.addServiceInstancesChangedListener(mockListener);
-// mockClient.endpoints().withName("TestService")
-// .edit(endpoints ->
-// new EndpointsBuilder(endpoints)
-// .editFirstSubset()
-// .addNewAddress()
-// .withIp("ip2")
-// .withNewTargetRef().withUid("uid2").withName("TestServer").endTargetRef()
-// .endAddress().endSubset()
-// .build());
-//
-// Thread.sleep(5000);
-// ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor =
-// ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
-// Mockito.verify(mockListener, Mockito.times(2)).onEvent(eventArgumentCaptor.capture());
-// Assertions.assertEquals(2, eventArgumentCaptor.getValue().getServiceInstances().size());
-//
-// serviceDiscovery.unregister(serviceInstance);
-//
-// serviceDiscovery.destroy();
-// }
-//
-// @Test
-// public void testPodsUpdate() throws Exception {
-//
-// KubernetesServiceDiscovery serviceDiscovery = new KubernetesServiceDiscovery();
-// serviceDiscovery.initialize(serverUrl);
-//
-// serviceDiscovery.setCurrentHostname("TestServer");
-// serviceDiscovery.setKubernetesClient(mockClient);
-//
-// ServiceInstance serviceInstance = new DefaultServiceInstance("TestService", "Test", 12345, ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
-// serviceDiscovery.register(serviceInstance);
-//
-// HashSet<String> serviceList = new HashSet<>(4);
-// serviceList.add("TestService");
-// Mockito.when(mockListener.getServiceNames()).thenReturn(serviceList);
-// Mockito.doNothing().when(mockListener).onEvent(Mockito.any());
-//
-// serviceDiscovery.addServiceInstancesChangedListener(mockListener);
-//
-// serviceInstance = new DefaultServiceInstance("TestService", "Test12345", 12345, ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
-// serviceDiscovery.update(serviceInstance);
-//
-// Thread.sleep(5000);
-// ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor =
-// ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
-// Mockito.verify(mockListener, Mockito.times(1)).onEvent(eventArgumentCaptor.capture());
-// Assertions.assertEquals(1, eventArgumentCaptor.getValue().getServiceInstances().size());
-//
-// serviceDiscovery.unregister(serviceInstance);
-//
-// serviceDiscovery.destroy();
-// }
-//
-// @Test
-// public void testGetInstance() throws Exception {
-// KubernetesServiceDiscovery serviceDiscovery = new KubernetesServiceDiscovery();
-// serviceDiscovery.initialize(serverUrl);
-//
-// serviceDiscovery.setCurrentHostname("TestServer");
-// serviceDiscovery.setKubernetesClient(mockClient);
-//
-// ServiceInstance serviceInstance = new DefaultServiceInstance("TestService", "Test", 12345, ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
-// serviceDiscovery.register(serviceInstance);
-//
-// serviceDiscovery.update(serviceInstance);
-//
-// Assertions.assertEquals(1, serviceDiscovery.getServices().size());
-// Assertions.assertEquals(1, serviceDiscovery.getInstances("TestService").size());
-//
-// Assertions.assertEquals(serviceInstance, serviceDiscovery.getLocalInstance());
-//
-// serviceDiscovery.unregister(serviceInstance);
-//
-// serviceDiscovery.destroy();
-// }
-//}
+package org.apache.dubbo.registry.kubernetes;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.registry.client.DefaultServiceInstance;
+import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
+import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
+import org.apache.dubbo.registry.kubernetes.util.KubernetesClientConst;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ScopeModelUtil;
+
+import io.fabric8.kubernetes.api.model.Endpoints;
+import io.fabric8.kubernetes.api.model.EndpointsBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.Service;
+import io.fabric8.kubernetes.api.model.ServiceBuilder;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.apache.dubbo.registry.kubernetes.util.KubernetesClientConst.NAMESPACE;
+
+@ExtendWith({MockitoExtension.class})
+public class KubernetesServiceDiscoveryTest {
+ private static final String SERVICE_NAME = "TestService";
+
+ private static final String POD_NAME = "TestServer";
+
+ public KubernetesServer mockServer = new KubernetesServer(false, true);
+
+ private NamespacedKubernetesClient mockClient;
+
+ private ServiceInstancesChangedListener mockListener = Mockito.mock(ServiceInstancesChangedListener.class);
+
+ private URL serverUrl;
+
+ private Map<String, String> selector;
+
+ private KubernetesServiceDiscovery serviceDiscovery;
+
+
+ @BeforeEach
+ public void setUp() {
+ mockServer.before();
+ mockClient = mockServer.getClient().inNamespace("dubbo-demo");
+
+ ApplicationModel applicationModel = ApplicationModel.defaultModel();
+ applicationModel.getApplicationConfigManager().setApplication(new ApplicationConfig());
+
+ serverUrl = URL.valueOf(mockClient.getConfiguration().getMasterUrl())
+ .setProtocol("kubernetes")
+ .addParameter(NAMESPACE, "dubbo-demo")
+ .addParameter(KubernetesClientConst.USE_HTTPS, "false")
+ .addParameter(KubernetesClientConst.HTTP2_DISABLE, "true");
+ serverUrl.setScopeModel(applicationModel);
+
+ this.serviceDiscovery = new KubernetesServiceDiscovery(applicationModel, serverUrl);
+
+ System.setProperty(Config.KUBERNETES_AUTH_TRYKUBECONFIG_SYSTEM_PROPERTY, "false");
+ System.setProperty(Config.KUBERNETES_AUTH_TRYSERVICEACCOUNT_SYSTEM_PROPERTY, "false");
+
+ selector = new HashMap<>(4);
+ selector.put("l", "v");
+ Pod pod = new PodBuilder()
+ .withNewMetadata().withName(POD_NAME).withLabels(selector).endMetadata()
+ .build();
+
+ Service service = new ServiceBuilder()
+ .withNewMetadata().withName(SERVICE_NAME).endMetadata()
+ .withNewSpec().withSelector(selector).endSpec().build();
+
+ Endpoints endPoints = new EndpointsBuilder()
+ .withNewMetadata().withName(SERVICE_NAME).endMetadata()
+ .addNewSubset()
+ .addNewAddress().withIp("ip1")
+ .withNewTargetRef().withUid("uid1").withName(POD_NAME).endTargetRef().endAddress()
+ .addNewPort("Test", "Test", 12345, "TCP").endSubset()
+ .build();
+
+ mockClient.pods().resource(pod).create();
+ mockClient.services().resource(service).create();
+ mockClient.endpoints().resource(endPoints).create();
+ }
+
+ @AfterEach
+ public void destroy() throws Exception {
+ serviceDiscovery.destroy();
+ mockClient.close();
+ mockServer.after();
+ }
+
+ @Test
+ public void testEndpointsUpdate() throws Exception {
+ serviceDiscovery.setCurrentHostname(POD_NAME);
+ serviceDiscovery.setKubernetesClient(mockClient);
+
+ ServiceInstance serviceInstance = new DefaultServiceInstance(SERVICE_NAME, "Test", 12345, ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
+
+ serviceDiscovery.doRegister(serviceInstance);
+
+ HashSet<String> serviceList = new HashSet<>(4);
+ serviceList.add(SERVICE_NAME);
+ Mockito.when(mockListener.getServiceNames()).thenReturn(serviceList);
+ Mockito.doNothing().when(mockListener).onEvent(Mockito.any());
+
+ serviceDiscovery.addServiceInstancesChangedListener(mockListener);
+ mockClient.endpoints().withName(SERVICE_NAME)
+ .edit(endpoints ->
+ new EndpointsBuilder(endpoints)
+ .editFirstSubset()
+ .addNewAddress()
+ .withIp("ip2")
+ .withNewTargetRef().withUid("uid2").withName(POD_NAME).endTargetRef()
+ .endAddress().endSubset()
+ .build());
+
+ Thread.sleep(2000);
+ ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor =
+ ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
+ Mockito.verify(mockListener, Mockito.times(2)).onEvent(eventArgumentCaptor.capture());
+ Assertions.assertEquals(2, eventArgumentCaptor.getValue().getServiceInstances().size());
+
+ serviceDiscovery.doUnregister(serviceInstance);
+ }
+
+ @Test
+ public void testPodsUpdate() throws Exception {
+ serviceDiscovery.setCurrentHostname(POD_NAME);
+ serviceDiscovery.setKubernetesClient(mockClient);
+
+ ServiceInstance serviceInstance = new DefaultServiceInstance(SERVICE_NAME, "Test", 12345, ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
+
+ serviceDiscovery.doRegister(serviceInstance);
+
+ HashSet<String> serviceList = new HashSet<>(4);
+ serviceList.add(SERVICE_NAME);
+ Mockito.when(mockListener.getServiceNames()).thenReturn(serviceList);
+ Mockito.doNothing().when(mockListener).onEvent(Mockito.any());
+
+ serviceDiscovery.addServiceInstancesChangedListener(mockListener);
+
+ serviceInstance = new DefaultServiceInstance(SERVICE_NAME, "Test12345", 12345, ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
+ serviceDiscovery.doUpdate(serviceInstance);
+
+ Thread.sleep(2000);
+ ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor =
+ ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
+ Mockito.verify(mockListener, Mockito.times(1)).onEvent(eventArgumentCaptor.capture());
+ Assertions.assertEquals(1, eventArgumentCaptor.getValue().getServiceInstances().size());
+
+ serviceDiscovery.doUnregister(serviceInstance);
+ }
+
+ @Test
+ public void testServiceUpdate() throws Exception {
+ serviceDiscovery.setCurrentHostname(POD_NAME);
+ serviceDiscovery.setKubernetesClient(mockClient);
+
+ ServiceInstance serviceInstance = new DefaultServiceInstance(SERVICE_NAME, "Test", 12345, ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
+
+ serviceDiscovery.doRegister(serviceInstance);
+
+ HashSet<String> serviceList = new HashSet<>(4);
+ serviceList.add(SERVICE_NAME);
+ Mockito.when(mockListener.getServiceNames()).thenReturn(serviceList);
+ Mockito.doNothing().when(mockListener).onEvent(Mockito.any());
+
+ serviceDiscovery.addServiceInstancesChangedListener(mockListener);
+
+ selector.put("app", "test");
+ mockClient.services().withName(SERVICE_NAME)
+ .edit(service -> new ServiceBuilder(service)
+ .editSpec()
+ .addToSelector(selector)
+ .endSpec()
+ .build());
+
+ Thread.sleep(2000);
+ ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor =
+ ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
+ Mockito.verify(mockListener, Mockito.times(1)).onEvent(eventArgumentCaptor.capture());
+ Assertions.assertEquals(1, eventArgumentCaptor.getValue().getServiceInstances().size());
+
+ serviceDiscovery.doUnregister(serviceInstance);
+ }
+
+ @Test
+ public void testGetInstance() {
+ serviceDiscovery.setCurrentHostname(POD_NAME);
+ serviceDiscovery.setKubernetesClient(mockClient);
+
+ ServiceInstance serviceInstance = new DefaultServiceInstance(SERVICE_NAME, "Test", 12345, ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
+
+ serviceDiscovery.doRegister(serviceInstance);
+
+ serviceDiscovery.doUpdate(serviceInstance);
+
+ Assertions.assertEquals(1, serviceDiscovery.getServices().size());
+ Assertions.assertEquals(1, serviceDiscovery.getInstances(SERVICE_NAME).size());
+
+ serviceDiscovery.doUnregister(serviceInstance);
+ }
+}