Fix wrong service name when IP is node IP in `k8s-mesh` (#7737)
diff --git a/CHANGES.md b/CHANGES.md
index 6d1e8b7..d9c7e13 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -67,6 +67,7 @@
* Fix `H2EventQueryDAO` doesn't sort data by Event.START_TIME and uses a wrong pagination query.
* Fix `LogHandler` of `kafka-fetcher-plugin` cannot recognize namespace.
* Improve the speed of writing TiDB by batching the SQL execution.
+* Fix wrong service name when IP is node IP in `k8s-mesh`.
#### UI
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java
index abd3f35..72a7954 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java
@@ -66,6 +66,8 @@
private final EnvoyMetricReceiverConfig config;
+ private final KubernetesNodeRegistry nodeRegistry;
+
public K8SServiceRegistry(final EnvoyMetricReceiverConfig config) {
this.config = config;
@@ -80,6 +82,7 @@
.setDaemon(true)
.build()
);
+ nodeRegistry = new KubernetesNodeRegistry();
}
public void start() throws IOException {
@@ -99,6 +102,8 @@
listenPodEvents(coreV1Api, factory);
factory.startAllRegisteredInformers();
+
+ nodeRegistry.start();
}
private void listenServiceEvents(final CoreV1Api coreV1Api, final SharedInformerFactory factory) {
@@ -268,6 +273,9 @@
}
public ServiceMetaInfo findService(final String ip) {
+ if (nodeRegistry.isNode(ip)) {
+ return config.serviceMetaInfoFactory().unknown();
+ }
final ServiceMetaInfo service = ipServiceMetaInfoMap.get(ip);
if (isNull(service)) {
log.debug("Unknown ip {}, ip -> service is null", ip);
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/KubernetesNodeRegistry.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/KubernetesNodeRegistry.java
new file mode 100644
index 0000000..cfb92bd
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/KubernetesNodeRegistry.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als.k8s;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.kubernetes.client.informer.ResourceEventHandler;
+import io.kubernetes.client.informer.SharedInformerFactory;
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.Configuration;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1Node;
+import io.kubernetes.client.openapi.models.V1NodeAddress;
+import io.kubernetes.client.openapi.models.V1NodeList;
+import io.kubernetes.client.openapi.models.V1NodeStatus;
+import io.kubernetes.client.util.Config;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.util.StringUtil;
+
+@Slf4j
+final class KubernetesNodeRegistry implements ResourceEventHandler<V1Node> {
+ private final Set<String> nodeIPs;
+
+ private final ExecutorService executor;
+
+ public KubernetesNodeRegistry() {
+ nodeIPs = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ executor = Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder()
+ .setNameFormat("KubernetesNodeRegistry-%d")
+ .setDaemon(true)
+ .build()
+ );
+ }
+
+ public void start() throws IOException {
+ final ApiClient apiClient = Config.defaultClient();
+ apiClient.setHttpClient(apiClient.getHttpClient()
+ .newBuilder()
+ .readTimeout(0, TimeUnit.SECONDS)
+ .build());
+ Configuration.setDefaultApiClient(apiClient);
+
+ final CoreV1Api coreV1Api = new CoreV1Api();
+ final SharedInformerFactory factory = new SharedInformerFactory(executor);
+
+ listenNodeEvents(coreV1Api, factory);
+
+ factory.startAllRegisteredInformers();
+ }
+
+ private void listenNodeEvents(final CoreV1Api coreV1Api,
+ final SharedInformerFactory factory) {
+ factory.sharedIndexInformerFor(
+ params -> coreV1Api.listNodeCall(
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ params.resourceVersion,
+ null,
+ params.timeoutSeconds,
+ params.watch,
+ null
+ ),
+ V1Node.class,
+ V1NodeList.class
+ ).addEventHandler(this);
+ }
+
+ @Override
+ public void onAdd(final V1Node node) {
+ forEachAddress(node, nodeIPs::add);
+ }
+
+ @Override
+ public void onUpdate(final V1Node oldNode, final V1Node newNode) {
+ onAdd(newNode);
+ }
+
+ @Override
+ public void onDelete(final V1Node node,
+ final boolean deletedFinalStateUnknown) {
+ forEachAddress(node, nodeIPs::remove);
+ }
+
+ void forEachAddress(final V1Node node,
+ final Consumer<String> consume) {
+ Optional.ofNullable(node)
+ .map(V1Node::getStatus)
+ .map(V1NodeStatus::getAddresses)
+ .ifPresent(addresses ->
+ addresses.stream()
+ .map(V1NodeAddress::getAddress)
+ .filter(StringUtil::isNotBlank)
+ .forEach(consume)
+ );
+ }
+
+ boolean isNode(final String ip) {
+ return nodeIPs.contains(ip);
+ }
+}