Fix wrong service name when IP is node IP in `k8s-mesh` (#7737)

diff --git a/CHANGES.md b/CHANGES.md
index 6d1e8b7..d9c7e13 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -67,6 +67,7 @@
 * Fix `H2EventQueryDAO` doesn't sort data by Event.START_TIME and uses a wrong pagination query.
 * Fix `LogHandler` of `kafka-fetcher-plugin` cannot recognize namespace.
 * Improve the speed of writing TiDB by batching the SQL execution.
+* Fix wrong service name when IP is node IP in `k8s-mesh`.
 
 #### UI
 
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java
index abd3f35..72a7954 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java
@@ -66,6 +66,8 @@
 
     private final EnvoyMetricReceiverConfig config;
 
+    private final KubernetesNodeRegistry nodeRegistry;
+
     public K8SServiceRegistry(final EnvoyMetricReceiverConfig config) {
         this.config = config;
 
@@ -80,6 +82,7 @@
                 .setDaemon(true)
                 .build()
         );
+        nodeRegistry = new KubernetesNodeRegistry();
     }
 
     public void start() throws IOException {
@@ -99,6 +102,8 @@
         listenPodEvents(coreV1Api, factory);
 
         factory.startAllRegisteredInformers();
+
+        nodeRegistry.start();
     }
 
     private void listenServiceEvents(final CoreV1Api coreV1Api, final SharedInformerFactory factory) {
@@ -268,6 +273,9 @@
     }
 
     public ServiceMetaInfo findService(final String ip) {
+        if (nodeRegistry.isNode(ip)) {
+            return config.serviceMetaInfoFactory().unknown();
+        }
         final ServiceMetaInfo service = ipServiceMetaInfoMap.get(ip);
         if (isNull(service)) {
             log.debug("Unknown ip {}, ip -> service is null", ip);
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/KubernetesNodeRegistry.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/KubernetesNodeRegistry.java
new file mode 100644
index 0000000..cfb92bd
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/KubernetesNodeRegistry.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als.k8s;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.kubernetes.client.informer.ResourceEventHandler;
+import io.kubernetes.client.informer.SharedInformerFactory;
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.Configuration;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1Node;
+import io.kubernetes.client.openapi.models.V1NodeAddress;
+import io.kubernetes.client.openapi.models.V1NodeList;
+import io.kubernetes.client.openapi.models.V1NodeStatus;
+import io.kubernetes.client.util.Config;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.util.StringUtil;
+
+@Slf4j
+final class KubernetesNodeRegistry implements ResourceEventHandler<V1Node> {
+    private final Set<String> nodeIPs;
+
+    private final ExecutorService executor;
+
+    public KubernetesNodeRegistry() {
+        nodeIPs = Collections.newSetFromMap(new ConcurrentHashMap<>());
+        executor = Executors.newCachedThreadPool(
+            new ThreadFactoryBuilder()
+                .setNameFormat("KubernetesNodeRegistry-%d")
+                .setDaemon(true)
+                .build()
+        );
+    }
+
+    public void start() throws IOException {
+        final ApiClient apiClient = Config.defaultClient();
+        apiClient.setHttpClient(apiClient.getHttpClient()
+                                         .newBuilder()
+                                         .readTimeout(0, TimeUnit.SECONDS)
+                                         .build());
+        Configuration.setDefaultApiClient(apiClient);
+
+        final CoreV1Api coreV1Api = new CoreV1Api();
+        final SharedInformerFactory factory = new SharedInformerFactory(executor);
+
+        listenNodeEvents(coreV1Api, factory);
+
+        factory.startAllRegisteredInformers();
+    }
+
+    private void listenNodeEvents(final CoreV1Api coreV1Api,
+                                  final SharedInformerFactory factory) {
+        factory.sharedIndexInformerFor(
+            params -> coreV1Api.listNodeCall(
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                params.resourceVersion,
+                null,
+                params.timeoutSeconds,
+                params.watch,
+                null
+            ),
+            V1Node.class,
+            V1NodeList.class
+        ).addEventHandler(this);
+    }
+
+    @Override
+    public void onAdd(final V1Node node) {
+        forEachAddress(node, nodeIPs::add);
+    }
+
+    @Override
+    public void onUpdate(final V1Node oldNode, final V1Node newNode) {
+        onAdd(newNode);
+    }
+
+    @Override
+    public void onDelete(final V1Node node,
+                         final boolean deletedFinalStateUnknown) {
+        forEachAddress(node, nodeIPs::remove);
+    }
+
+    void forEachAddress(final V1Node node,
+                        final Consumer<String> consume) {
+        Optional.ofNullable(node)
+                .map(V1Node::getStatus)
+                .map(V1NodeStatus::getAddresses)
+                .ifPresent(addresses ->
+                               addresses.stream()
+                                        .map(V1NodeAddress::getAddress)
+                                        .filter(StringUtil::isNotBlank)
+                                        .forEach(consume)
+                );
+    }
+
+    boolean isNode(final String ip) {
+        return nodeIPs.contains(ip);
+    }
+}