// blob: 9902e147ce991cc83a8b0a961dd362e07fc041de [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.envoy.als.k8s;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.Objects.isNull;
import static java.util.Optional.ofNullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.skywalking.library.kubernetes.KubernetesEndpointWatcher;
import org.apache.skywalking.library.kubernetes.KubernetesEndpointsListener;
import org.apache.skywalking.library.kubernetes.KubernetesNodeListener;
import org.apache.skywalking.library.kubernetes.KubernetesNodeWatcher;
import org.apache.skywalking.library.kubernetes.KubernetesPodListener;
import org.apache.skywalking.library.kubernetes.KubernetesPodWatcher;
import org.apache.skywalking.library.kubernetes.KubernetesServiceListener;
import org.apache.skywalking.library.kubernetes.KubernetesServiceWatcher;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
import com.google.common.collect.ImmutableMap;
import io.kubernetes.client.openapi.models.V1Endpoints;
import io.kubernetes.client.openapi.models.V1Node;
import io.kubernetes.client.openapi.models.V1NodeAddress;
import io.kubernetes.client.openapi.models.V1NodeStatus;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1Service;
import lombok.extern.slf4j.Slf4j;
/**
 * Keeps an in-memory view of Kubernetes services, pods, endpoints and nodes and uses it to
 * resolve an IP address to a {@link ServiceMetaInfo}.
 *
 * <p>The registry is a listener of the service, pod, endpoints and node watchers (see
 * {@link #start()}). Each "added"/"updated" callback for services, pods and endpoints stores
 * the object in the matching lookup map and then calls {@link #recompose()}, which joins the
 * three maps into {@link #ipServiceMetaInfoMap}.
 *
 * <p>Note: entries in {@link #ipServiceMetaInfoMap} are only ever added, never replaced or
 * removed, by the code in this class; the "deleted" callbacks only clean up the lookup maps.
 */
@Slf4j
public class K8SServiceRegistry
    implements KubernetesServiceListener, KubernetesPodListener,
               KubernetesEndpointsListener, KubernetesNodeListener {

    /** Composed result: IP to the service meta info returned by {@link #findService(String)}. */
    protected final Map<String/* ip */, ServiceMetaInfo> ipServiceMetaInfoMap;

    /** Known services keyed by {@code namespace:serviceName}. */
    protected final Map<String/* namespace:serviceName */, V1Service> idServiceMap;

    /** Known pods keyed by the pod IP taken from the pod status. */
    protected final Map<String/* ip */, V1Pod> ipPodMap;

    /** Endpoint address IP to the {@code namespace:name} of the endpoints object listing it. */
    protected final Map<String/* ip */, String/* namespace:serviceName */> ipServiceMap;

    /** Formatter built from the configured service name rule. */
    protected final ServiceNameFormatter serviceNameFormatter;

    private final EnvoyMetricReceiverConfig config;

    /** Non-blank addresses reported in the status of the known nodes. */
    private final Set<String> nodeIPs = Collections.newSetFromMap(new ConcurrentHashMap<>());

    /**
     * Creates an empty registry.
     *
     * @param config receiver configuration; supplies the service name rule and the factory
     *               used to create "unknown" service meta info
     */
    public K8SServiceRegistry(final EnvoyMetricReceiverConfig config) {
        this.config = config;

        serviceNameFormatter = new ServiceNameFormatter(config.getK8sServiceNameRule());
        ipServiceMetaInfoMap = new ConcurrentHashMap<>();
        idServiceMap = new ConcurrentHashMap<>();
        ipPodMap = new ConcurrentHashMap<>();
        ipServiceMap = new ConcurrentHashMap<>();
    }

    /**
     * Registers this registry as a listener on the pod, service, endpoints and node watchers
     * and starts each of them.
     *
     * @throws IOException declared for the watcher start-up calls
     */
    public void start() throws IOException {
        KubernetesPodWatcher.INSTANCE.addListener(this).start();
        KubernetesServiceWatcher.INSTANCE.addListener(this).start();
        KubernetesEndpointWatcher.INSTANCE.addListener(this).start();
        KubernetesNodeWatcher.INSTANCE.addListener(this).start();
    }

    /**
     * Stores the service under {@code namespace:name} (when it carries metadata) and recomposes
     * the IP to service meta info mapping.
     */
    @Override
    public void onServiceAdded(final V1Service service) {
        ofNullable(service.getMetadata()).ifPresent(
            metadata -> idServiceMap.put(metadata.getNamespace() + ":" + metadata.getName(), service)
        );

        recompose();
    }

    /** Removes the service from {@link #idServiceMap}; already composed entries are kept. */
    @Override
    public void onServiceDeleted(final V1Service service) {
        ofNullable(service.getMetadata()).ifPresent(
            metadata -> idServiceMap.remove(metadata.getNamespace() + ":" + metadata.getName())
        );
    }

    /** Treats an update as an add of the new service; the old service is not inspected. */
    @Override
    public void onServiceUpdated(V1Service oldService, V1Service newService) {
        onServiceAdded(newService);
    }

    /**
     * Stores the pod under its pod IP (when the status reports one) and recomposes the IP to
     * service meta info mapping.
     */
    @Override
    public void onPodAdded(final V1Pod pod) {
        ofNullable(pod.getStatus())
            .flatMap(status -> ofNullable(status.getPodIP()))
            .ifPresent(podIP -> ipPodMap.put(podIP, pod));

        recompose();
    }

    /** Removes the pod's IP from {@link #ipPodMap}; already composed entries are kept. */
    @Override
    public void onPodDeleted(final V1Pod pod) {
        ofNullable(pod.getStatus())
            .flatMap(status -> ofNullable(status.getPodIP()))
            .ifPresent(ipPodMap::remove);
    }

    /** Treats an update as an add of the new pod; the old pod is not inspected. */
    @Override
    public void onPodUpdated(V1Pod oldPod, V1Pod newPod) {
        onPodAdded(newPod);
    }

    /**
     * Maps every address IP listed in the endpoints' subsets to {@code namespace:name} of the
     * endpoints object, then recomposes. Endpoints without metadata are logged and ignored.
     */
    @Override
    public void onEndpointsAdded(final V1Endpoints endpoints) {
        V1ObjectMeta endpointsMetadata = endpoints.getMetadata();
        if (isNull(endpointsMetadata)) {
            log.error("Endpoints metadata is null: {}", endpoints);
            return;
        }

        final String namespace = endpointsMetadata.getNamespace();
        final String name = endpointsMetadata.getName();

        ofNullable(endpoints.getSubsets()).ifPresent(subsets -> subsets.forEach(
            subset -> ofNullable(subset.getAddresses()).ifPresent(addresses -> addresses.forEach(
                address -> ofNullable(address.getIp()).ifPresent(ip -> ipServiceMap.put(ip, namespace + ":" + name))
            ))
        ));

        recompose();
    }

    /** Removes every address IP listed in the endpoints' subsets from {@link #ipServiceMap}. */
    @Override
    public void onEndpointsDeleted(final V1Endpoints endpoints) {
        ofNullable(endpoints.getSubsets()).ifPresent(subsets -> subsets.forEach(
            subset -> ofNullable(subset.getAddresses()).ifPresent(addresses -> addresses.forEach(
                address -> ofNullable(address.getIp()).ifPresent(ipServiceMap::remove)
            ))
        ));
    }

    /** Treats an update as an add of the new endpoints; the old endpoints are not inspected. */
    @Override
    public void onEndpointsUpdated(V1Endpoints oldEndpoints, V1Endpoints newEndpoints) {
        onEndpointsAdded(newEndpoints);
    }

    /** Records every non-blank address of the node as a node IP. */
    @Override
    public void onNodeAdded(final V1Node node) {
        forEachAddress(node, nodeIPs::add);
    }

    /**
     * Records the addresses of the updated node and forgets the addresses that the previous
     * version of the node reported but the new version no longer does.
     */
    @Override
    public void onNodeUpdated(final V1Node oldNode, final V1Node newNode) {
        // Add first so that an address present in both versions is never missing in between.
        onNodeAdded(newNode);

        final List<String> currentAddresses = new ArrayList<>();
        forEachAddress(newNode, currentAddresses::add);

        // Without this clean-up a changed node address would stay in nodeIPs forever and
        // findService would keep answering "unknown" for it.
        forEachAddress(oldNode, address -> {
            if (!currentAddresses.contains(address)) {
                nodeIPs.remove(address);
            }
        });
    }

    /** Forgets every non-blank address of the deleted node. */
    @Override
    public void onNodeDeleted(final V1Node node) {
        forEachAddress(node, nodeIPs::remove);
    }

    /**
     * Passes each non-blank address found in the node's status to {@code consume}. Does nothing
     * when the node, its status or its address list is {@code null}.
     *
     * @param node    node to read the addresses from, may be {@code null}
     * @param consume action invoked once per non-blank address
     */
    protected void forEachAddress(final V1Node node,
                                  final Consumer<String> consume) {
        Optional.ofNullable(node)
                .map(V1Node::getStatus)
                .map(V1NodeStatus::getAddresses)
                .ifPresent(addresses ->
                               addresses.stream()
                                        .map(V1NodeAddress::getAddress)
                                        .filter(StringUtil::isNotBlank)
                                        .forEach(consume)
                );
    }

    /**
     * Builds the tag list of a pod: a {@code pod} tag with the pod name, a {@code namespace} tag
     * with the pod namespace, followed by one tag per pod label (if the pod has labels).
     *
     * @param podMeta metadata of the pod, must not be {@code null}
     * @return a new mutable list of tags
     */
    protected List<ServiceMetaInfo.KeyValue> transformLabelsToTags(final V1ObjectMeta podMeta) {
        final List<ServiceMetaInfo.KeyValue> tags = new ArrayList<>();
        tags.add(new ServiceMetaInfo.KeyValue("pod", podMeta.getName()));
        tags.add(new ServiceMetaInfo.KeyValue("namespace", podMeta.getNamespace()));

        final Map<String, String> labels = podMeta.getLabels();
        if (isNull(labels)) {
            return tags;
        }

        tags.addAll(
            labels.entrySet()
                  .stream()
                  .map(each -> new ServiceMetaInfo.KeyValue(each.getKey(), each.getValue()))
                  .collect(Collectors.toList())
        );
        return tags;
    }

    /**
     * Resolves an IP to its service meta info.
     *
     * @param ip the IP address to look up
     * @return the composed meta info, or the factory's "unknown" meta info when the IP is a
     *         known node address or no entry has been composed for it
     */
    public ServiceMetaInfo findService(final String ip) {
        if (isNode(ip)) {
            return config.serviceMetaInfoFactory().unknown();
        }
        final ServiceMetaInfo service = ipServiceMetaInfoMap.get(ip);
        if (isNull(service)) {
            log.debug("Unknown ip {}, ip -> service is null", ip);
            return config.serviceMetaInfoFactory().unknown();
        }
        return service;
    }

    /**
     * Joins pods, endpoints and services: for every known pod IP that is listed by an endpoints
     * object whose {@code namespace:name} matches a known service, composes the service meta
     * info and stores it, unless an entry for that IP already exists.
     */
    protected void recompose() {
        ipPodMap.forEach((ip, pod) -> {
            final String namespaceService = ipServiceMap.get(ip);
            if (isNullOrEmpty(namespaceService)) {
                return;
            }

            final V1Service service = idServiceMap.get(namespaceService);
            if (isNull(service)) {
                return;
            }

            if (isNull(pod.getMetadata())) {
                log.warn("Pod metadata is null, {}", pod);
                return;
            }

            // Existing entries win: an IP that was composed once is not recomputed here.
            ipServiceMetaInfoMap.computeIfAbsent(ip, unused -> composeServiceMetaInfo(service, pod));
        });
    }

    /**
     * Composes the meta info for a pod that backs the given service.
     *
     * <p>The service name comes from {@link #serviceNameFormatter}; if formatting fails, the
     * plain service name from the service metadata is used instead. The instance name is
     * {@code <pod name>.<pod namespace>} and the tags come from
     * {@link #transformLabelsToTags(V1ObjectMeta)}.
     *
     * @param service the service the pod belongs to
     * @param pod     the pod
     * @return the composed meta info, or the factory's "unknown" meta info when the pod has no
     *         metadata, or when formatting failed and the service has no metadata
     */
    protected ServiceMetaInfo composeServiceMetaInfo(final V1Service service, final V1Pod pod) {
        final Map<String, Object> context = ImmutableMap.of("service", service, "pod", pod);
        final ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
        final V1ObjectMeta podMetadata = pod.getMetadata();

        // Guard here as well as in recompose(): this method is protected and may be called
        // directly, and the instance name and tags below cannot be built without pod metadata.
        if (isNull(podMetadata)) {
            log.warn("Pod metadata is null, {}", pod);
            return config.serviceMetaInfoFactory().unknown();
        }

        try {
            serviceMetaInfo.setServiceName(serviceNameFormatter.format(context));
        } catch (Exception e) {
            log.error("Failed to evaluate service name.", e);
            final V1ObjectMeta serviceMetadata = service.getMetadata();
            if (isNull(serviceMetadata)) {
                log.warn("Service metadata is null, {}", service);
                return config.serviceMetaInfoFactory().unknown();
            }
            serviceMetaInfo.setServiceName(serviceMetadata.getName());
        }
        serviceMetaInfo.setServiceInstanceName(
            String.format("%s.%s", podMetadata.getName(), podMetadata.getNamespace()));
        serviceMetaInfo.setTags(transformLabelsToTags(podMetadata));

        return serviceMetaInfo;
    }

    /**
     * @param ip the address to test
     * @return {@code true} when the address is currently recorded as a node address
     */
    public boolean isNode(final String ip) {
        return nodeIPs.contains(ip);
    }

    /**
     * @return {@code true} when no IP to service meta info entry has been composed yet
     */
    public boolean isEmpty() {
        return ipServiceMetaInfoMap.isEmpty();
    }
}