/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
| |
| package org.apache.skywalking.oap.server.receiver.envoy.als.k8s; |
| |
| import static com.google.common.base.Strings.isNullOrEmpty; |
| import static java.util.Objects.isNull; |
| import static java.util.Optional.ofNullable; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.function.Consumer; |
| import java.util.stream.Collectors; |
| import org.apache.skywalking.library.kubernetes.KubernetesEndpointWatcher; |
| import org.apache.skywalking.library.kubernetes.KubernetesEndpointsListener; |
| import org.apache.skywalking.library.kubernetes.KubernetesNodeListener; |
| import org.apache.skywalking.library.kubernetes.KubernetesNodeWatcher; |
| import org.apache.skywalking.library.kubernetes.KubernetesPodListener; |
| import org.apache.skywalking.library.kubernetes.KubernetesPodWatcher; |
| import org.apache.skywalking.library.kubernetes.KubernetesServiceListener; |
| import org.apache.skywalking.library.kubernetes.KubernetesServiceWatcher; |
| import org.apache.skywalking.oap.server.library.util.StringUtil; |
| import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; |
| import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo; |
| import com.google.common.collect.ImmutableMap; |
| import io.kubernetes.client.openapi.models.V1Endpoints; |
| import io.kubernetes.client.openapi.models.V1Node; |
| import io.kubernetes.client.openapi.models.V1NodeAddress; |
| import io.kubernetes.client.openapi.models.V1NodeStatus; |
| import io.kubernetes.client.openapi.models.V1ObjectMeta; |
| import io.kubernetes.client.openapi.models.V1Pod; |
| import io.kubernetes.client.openapi.models.V1Service; |
| import lombok.extern.slf4j.Slf4j; |
| |
| @Slf4j |
| public class K8SServiceRegistry |
| implements KubernetesServiceListener, KubernetesPodListener, |
| KubernetesEndpointsListener, KubernetesNodeListener { |
| protected final Map<String/* ip */, ServiceMetaInfo> ipServiceMetaInfoMap; |
| |
| protected final Map<String/* namespace:serviceName */, V1Service> idServiceMap; |
| |
| protected final Map<String/* ip */, V1Pod> ipPodMap; |
| |
| protected final Map<String/* ip */, String/* namespace:serviceName */> ipServiceMap; |
| |
| protected final ServiceNameFormatter serviceNameFormatter; |
| |
| private final EnvoyMetricReceiverConfig config; |
| |
| private final Set<String> nodeIPs = Collections.newSetFromMap(new ConcurrentHashMap<>()); |
| |
| public K8SServiceRegistry(final EnvoyMetricReceiverConfig config) { |
| this.config = config; |
| |
| serviceNameFormatter = new ServiceNameFormatter(config.getK8sServiceNameRule()); |
| ipServiceMetaInfoMap = new ConcurrentHashMap<>(); |
| idServiceMap = new ConcurrentHashMap<>(); |
| ipPodMap = new ConcurrentHashMap<>(); |
| ipServiceMap = new ConcurrentHashMap<>(); |
| } |
| |
| public void start() throws IOException { |
| KubernetesPodWatcher.INSTANCE.addListener(this).start(); |
| KubernetesServiceWatcher.INSTANCE.addListener(this).start(); |
| KubernetesEndpointWatcher.INSTANCE.addListener(this).start(); |
| KubernetesNodeWatcher.INSTANCE.addListener(this).start(); |
| } |
| |
| @Override |
| public void onServiceAdded(final V1Service service) { |
| ofNullable(service.getMetadata()).ifPresent( |
| metadata -> idServiceMap.put(metadata.getNamespace() + ":" + metadata.getName(), service) |
| ); |
| |
| recompose(); |
| } |
| |
| @Override |
| public void onServiceDeleted(final V1Service service) { |
| ofNullable(service.getMetadata()).ifPresent( |
| metadata -> idServiceMap.remove(metadata.getNamespace() + ":" + metadata.getName()) |
| ); |
| } |
| |
| @Override |
| public void onServiceUpdated(V1Service oldService, V1Service newService) { |
| onServiceAdded(newService); |
| } |
| |
| @Override |
| public void onPodAdded(final V1Pod pod) { |
| ofNullable(pod.getStatus()).flatMap(status -> ofNullable(status.getPodIP())).ifPresent(podIP -> ipPodMap.put(podIP, pod)); |
| |
| recompose(); |
| } |
| |
| @Override |
| public void onPodDeleted(final V1Pod pod) { |
| ofNullable(pod.getStatus()).flatMap(status -> ofNullable(status.getPodIP())).ifPresent(ipPodMap::remove); |
| } |
| |
| @Override |
| public void onPodUpdated(V1Pod oldPod, V1Pod newPod) { |
| onPodAdded(newPod); |
| } |
| |
| @Override |
| public void onEndpointsAdded(final V1Endpoints endpoints) { |
| V1ObjectMeta endpointsMetadata = endpoints.getMetadata(); |
| if (isNull(endpointsMetadata)) { |
| log.error("Endpoints metadata is null: {}", endpoints); |
| return; |
| } |
| |
| final String namespace = endpointsMetadata.getNamespace(); |
| final String name = endpointsMetadata.getName(); |
| |
| ofNullable(endpoints.getSubsets()).ifPresent(subsets -> subsets.forEach( |
| subset -> ofNullable(subset.getAddresses()).ifPresent(addresses -> addresses.forEach( |
| address -> ofNullable(address.getIp()).ifPresent(ip -> ipServiceMap.put(ip, namespace + ":" + name)) |
| )) |
| )); |
| |
| recompose(); |
| } |
| |
| @Override |
| public void onEndpointsDeleted(final V1Endpoints endpoints) { |
| ofNullable(endpoints.getSubsets()).ifPresent(subsets -> subsets.forEach( |
| subset -> ofNullable(subset.getAddresses()).ifPresent(addresses -> addresses.forEach( |
| address -> ofNullable(address.getIp()).ifPresent(ipServiceMap::remove) |
| )) |
| )); |
| } |
| |
| @Override |
| public void onEndpointsUpdated(V1Endpoints oldEndpoints, V1Endpoints newEndpoints) { |
| onEndpointsAdded(newEndpoints); |
| } |
| |
| @Override |
| public void onNodeAdded(final V1Node node) { |
| forEachAddress(node, nodeIPs::add); |
| } |
| |
| @Override |
| public void onNodeUpdated(final V1Node oldNode, final V1Node newNode) { |
| onNodeAdded(newNode); |
| } |
| |
| @Override |
| public void onNodeDeleted(final V1Node node) { |
| forEachAddress(node, nodeIPs::remove); |
| } |
| |
| protected void forEachAddress(final V1Node node, |
| final Consumer<String> consume) { |
| Optional.ofNullable(node) |
| .map(V1Node::getStatus) |
| .map(V1NodeStatus::getAddresses) |
| .ifPresent(addresses -> |
| addresses.stream() |
| .map(V1NodeAddress::getAddress) |
| .filter(StringUtil::isNotBlank) |
| .forEach(consume) |
| ); |
| } |
| |
| protected List<ServiceMetaInfo.KeyValue> transformLabelsToTags(final V1ObjectMeta podMeta) { |
| final Map<String, String> labels = podMeta.getLabels(); |
| final List<ServiceMetaInfo.KeyValue> tags = new ArrayList<>(); |
| tags.add(new ServiceMetaInfo.KeyValue("pod", podMeta.getName())); |
| tags.add(new ServiceMetaInfo.KeyValue("namespace", podMeta.getNamespace())); |
| if (isNull(labels)) { |
| return tags; |
| } |
| return labels.entrySet() |
| .stream() |
| .map(each -> new ServiceMetaInfo.KeyValue(each.getKey(), each.getValue())) |
| .collect(Collectors.toCollection(() -> tags)); |
| } |
| |
| public ServiceMetaInfo findService(final String ip) { |
| if (isNode(ip)) { |
| return config.serviceMetaInfoFactory().unknown(); |
| } |
| final ServiceMetaInfo service = ipServiceMetaInfoMap.get(ip); |
| if (isNull(service)) { |
| log.debug("Unknown ip {}, ip -> service is null", ip); |
| return config.serviceMetaInfoFactory().unknown(); |
| } |
| return service; |
| } |
| |
| protected void recompose() { |
| ipPodMap.forEach((ip, pod) -> { |
| final String namespaceService = ipServiceMap.get(ip); |
| final V1Service service; |
| if (isNullOrEmpty(namespaceService) || isNull(service = idServiceMap.get(namespaceService))) { |
| return; |
| } |
| |
| if (isNull(pod.getMetadata())) { |
| log.warn("Pod metadata is null, {}", pod); |
| return; |
| } |
| |
| ipServiceMetaInfoMap.computeIfAbsent(ip, unused -> composeServiceMetaInfo(service, pod)); |
| }); |
| } |
| |
| protected ServiceMetaInfo composeServiceMetaInfo(final V1Service service, final V1Pod pod) { |
| final Map<String, Object> context = ImmutableMap.of("service", service, "pod", pod); |
| final ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo(); |
| final V1ObjectMeta podMetadata = pod.getMetadata(); |
| |
| try { |
| serviceMetaInfo.setServiceName(serviceNameFormatter.format(context)); |
| } catch (Exception e) { |
| log.error("Failed to evaluate service name.", e); |
| final V1ObjectMeta serviceMetadata = service.getMetadata(); |
| if (isNull(serviceMetadata)) { |
| log.warn("Service metadata is null, {}", service); |
| return config.serviceMetaInfoFactory().unknown(); |
| } |
| serviceMetaInfo.setServiceName(serviceMetadata.getName()); |
| } |
| serviceMetaInfo.setServiceInstanceName( |
| String.format("%s.%s", podMetadata.getName(), podMetadata.getNamespace())); |
| serviceMetaInfo.setTags(transformLabelsToTags(podMetadata)); |
| |
| return serviceMetaInfo; |
| } |
| |
| public boolean isNode(final String ip) { |
| return nodeIPs.contains(ip); |
| } |
| |
| public boolean isEmpty() { |
| return ipServiceMetaInfoMap.isEmpty(); |
| } |
| } |