blob: 5d3911824d30e2fbaec8f5210972b5f0d2d8e490 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.meter.analyzer.k8s;
import static java.util.Objects.isNull;
import static java.util.Optional.ofNullable;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.skywalking.library.kubernetes.KubernetesPodListener;
import org.apache.skywalking.library.kubernetes.KubernetesPodWatcher;
import org.apache.skywalking.library.kubernetes.KubernetesServiceListener;
import org.apache.skywalking.library.kubernetes.KubernetesServiceWatcher;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import com.google.common.base.Strings;
import io.kubernetes.client.openapi.models.V1LoadBalancerIngress;
import io.kubernetes.client.openapi.models.V1LoadBalancerStatus;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1ServiceSpec;
import io.kubernetes.client.openapi.models.V1ServiceStatus;
import lombok.SneakyThrows;
/**
 * In-memory registry of Kubernetes pods and services.
 *
 * <p>After {@link #start()} the registry is registered as a listener on
 * {@link KubernetesPodWatcher} and {@link KubernetesServiceWatcher}. Every event updates four
 * lookup tables:
 * <ul>
 *   <li>pod key ({@code podName.namespace}) to the service key ({@code serviceName.namespace})
 *       whose selector matches the pod,</li>
 *   <li>pod IP to pod key,</li>
 *   <li>service IP (cluster IPs and load balancer ingress IPs) to service key.</li>
 * </ul>
 *
 * <p>Lookups read the concurrent maps without locking. Event callbacks are {@code synchronized}
 * so that two rebuilds of the derived tables can never interleave; a rebuild may remove entries,
 * and an interleaved rebuild working from an older view could otherwise drop a valid mapping.
 */
public class K8sInfoRegistry
    implements KubernetesServiceListener, KubernetesPodListener {

    private final static K8sInfoRegistry INSTANCE = new K8sInfoRegistry();
    private final Map<String/* podName.namespace */, V1Pod> namePodMap = new ConcurrentHashMap<>();
    protected final Map<String/* serviceName.namespace */, V1Service> nameServiceMap = new ConcurrentHashMap<>();
    private final Map<String/* podName.namespace */, String /* serviceName.namespace */> podServiceMap = new ConcurrentHashMap<>();
    private final Map<String/* podIP */, String /* podName.namespace */> ipPodMap = new ConcurrentHashMap<>();
    private final Map<String/* serviceIP */, String /* serviceName.namespace */> ipServiceMap = new ConcurrentHashMap<>();
    private static final String SEPARATOR = ".";

    /**
     * @return the shared registry instance.
     */
    public static K8sInfoRegistry getInstance() {
        return INSTANCE;
    }

    /**
     * Registers this registry on the pod and service watchers and starts both of them.
     */
    @SneakyThrows
    public void start() {
        KubernetesPodWatcher.INSTANCE.addListener(this).start();
        KubernetesServiceWatcher.INSTANCE.addListener(this).start();
    }

    /**
     * Indexes the service (when it carries metadata) and rebuilds the derived tables.
     *
     * @param service the added service.
     */
    @Override
    public synchronized void onServiceAdded(final V1Service service) {
        ofNullable(service.getMetadata()).ifPresent(
            metadata -> nameServiceMap.put(key(metadata.getName(), metadata.getNamespace()), service)
        );
        recompose();
    }

    /**
     * Drops the service, the IPs that still point at it, and (through the rebuild) every
     * pod-to-service mapping that referenced it.
     *
     * @param service the deleted service.
     */
    @Override
    public synchronized void onServiceDeleted(final V1Service service) {
        final String serviceKey = keyOf(service);
        if (serviceKey != null) {
            nameServiceMap.remove(serviceKey);
        }
        removeServiceIPs(service, serviceKey, null);
        recompose();
    }

    /**
     * Indexes the new version of the service, then forgets IPs that only the old version had.
     *
     * @param oldService the service before the update, may be {@code null}.
     * @param newService the service after the update.
     */
    @Override
    public synchronized void onServiceUpdated(V1Service oldService, V1Service newService) {
        onServiceAdded(newService);
        if (oldService != null) {
            // Runs after the new state is indexed, so IPs shared by both versions never disappear.
            removeServiceIPs(oldService, keyOf(oldService), newService);
        }
    }

    /**
     * Indexes the pod (when it carries metadata) and rebuilds the derived tables.
     *
     * @param pod the added pod.
     */
    @Override
    public synchronized void onPodAdded(final V1Pod pod) {
        ofNullable(pod.getMetadata()).ifPresent(
            metadata -> namePodMap.put(key(metadata.getName(), metadata.getNamespace()), pod));
        recompose();
    }

    /**
     * Drops the pod together with its service mapping and its IP mapping.
     *
     * @param pod the deleted pod.
     */
    @Override
    public synchronized void onPodDeleted(final V1Pod pod) {
        final String podKey = keyOf(pod);
        if (podKey == null) {
            // Pods without metadata are never indexed, so there is nothing to clean up.
            return;
        }
        namePodMap.remove(podKey);
        podServiceMap.remove(podKey);
        final String ip = podIP(pod);
        if (ip != null) {
            // Conditional removal: leave the entry alone if the IP is already indexed for another pod.
            ipPodMap.remove(ip, podKey);
        }
    }

    /**
     * Indexes the new version of the pod, then forgets the old IP if the pod's IP changed.
     *
     * @param oldPod the pod before the update, may be {@code null}.
     * @param newPod the pod after the update.
     */
    @Override
    public synchronized void onPodUpdated(V1Pod oldPod, V1Pod newPod) {
        onPodAdded(newPod);
        if (oldPod == null) {
            return;
        }
        final String oldIP = podIP(oldPod);
        final String podKey = keyOf(oldPod);
        if (oldIP != null && podKey != null && !Objects.equals(oldIP, podIP(newPod))) {
            ipPodMap.remove(oldIP, podKey);
        }
    }

    /**
     * Rebuilds the derived tables from {@link #namePodMap} and {@link #nameServiceMap}.
     *
     * <p>For every pod the IP mapping is refreshed and the pod-to-service mapping is set to the
     * selecting service, or removed when no indexed service selects the pod any more. For every
     * service the cluster IPs and load balancer ingress IPs are (re-)indexed.
     */
    private void recompose() {
        namePodMap.forEach((podName, pod) -> {
            if (!isNull(pod.getMetadata())) {
                final String ip = podIP(pod);
                if (ip != null) {
                    ipPodMap.put(ip, podName);
                }
            }
            final String serviceName = findSelectingService(pod);
            if (serviceName == null) {
                podServiceMap.remove(podName);
            } else {
                podServiceMap.put(podName, serviceName);
            }
        });
        nameServiceMap.forEach((serviceName, service) -> {
            if (!isNull(service.getSpec()) && CollectionUtils.isNotEmpty(service.getSpec().getClusterIPs())) {
                for (String clusterIP : service.getSpec().getClusterIPs()) {
                    // Same non-empty filter as the removal path; also keeps null keys out of the map.
                    if (StringUtil.isNotEmpty(clusterIP)) {
                        ipServiceMap.put(clusterIP, serviceName);
                    }
                }
            }
            if (!isNull(service.getStatus()) && !isNull(service.getStatus().getLoadBalancer())
                && CollectionUtils.isNotEmpty(service.getStatus().getLoadBalancer().getIngress())) {
                for (V1LoadBalancerIngress loadBalancerIngress : service.getStatus().getLoadBalancer().getIngress()) {
                    if (StringUtil.isNotEmpty(loadBalancerIngress.getIp())) {
                        ipServiceMap.put(loadBalancerIngress.getIp(), serviceName);
                    }
                }
            }
        });
    }

    /**
     * Looks up the service that selects a pod.
     *
     * @param namespace the pod namespace.
     * @param podName   the pod name.
     * @return the service key ({@code serviceName.namespace}), or {@code null} if none is known.
     */
    public String findServiceName(String namespace, String podName) {
        return this.podServiceMap.get(key(podName, namespace));
    }

    /**
     * @param ip a pod IP.
     * @return the pod key ({@code podName.namespace}), or {@code null} if the IP is unknown or
     *         {@code null}.
     */
    public String findPodByIP(String ip) {
        return ip == null ? null : this.ipPodMap.get(ip);
    }

    /**
     * @param ip a service cluster IP or load balancer ingress IP.
     * @return the service key ({@code serviceName.namespace}), or {@code null} if the IP is
     *         unknown or {@code null}.
     */
    public String findServiceByIP(String ip) {
        return ip == null ? null : this.ipServiceMap.get(ip);
    }

    /**
     * Returns the key of the indexed service that selects the pod, or {@code null} if none does.
     * When several services select the same pod, the last one in map iteration order wins.
     */
    private String findSelectingService(final V1Pod pod) {
        String selected = null;
        for (final Map.Entry<String, V1Service> entry : nameServiceMap.entrySet()) {
            if (selects(entry.getValue(), pod)) {
                selected = entry.getKey();
            }
        }
        return selected;
    }

    /**
     * Tells whether the service selects the pod: both live in the same non-empty namespace and
     * every selector entry of the service is present among the pod labels.
     */
    private static boolean selects(final V1Service service, final V1Pod pod) {
        if (isNull(pod.getMetadata()) || isNull(service.getMetadata()) || isNull(service.getSpec())) {
            return false;
        }
        final Map<String, String> selector = service.getSpec().getSelector();
        final Map<String, String> labels = pod.getMetadata().getLabels();
        // An empty selector selects no pod; without this check it would vacuously match all of them.
        if (isNull(labels) || isNull(selector) || selector.isEmpty()) {
            return false;
        }
        final String podNamespace = pod.getMetadata().getNamespace();
        final String serviceNamespace = service.getMetadata().getNamespace();
        if (Strings.isNullOrEmpty(podNamespace) || !podNamespace.equals(serviceNamespace)) {
            return false;
        }
        return containsAll(labels.entrySet(), selector.entrySet());
    }

    /**
     * Removes the IPs of {@code service} from {@link #ipServiceMap}, but only those that still
     * point at {@code serviceKey} and that {@code replacement} (nullable) does not expose.
     */
    private void removeServiceIPs(final V1Service service, final String serviceKey, final V1Service replacement) {
        if (serviceKey == null) {
            // A service without metadata is never indexed, so it cannot own any IP entry.
            return;
        }
        ofNullable(service.getSpec()).map(V1ServiceSpec::getClusterIPs).filter(CollectionUtils::isNotEmpty)
            .ifPresent(ips -> ips.stream()
                .filter(StringUtil::isNotEmpty)
                .filter(ip -> !ownsIP(replacement, ip))
                .forEach(ip -> ipServiceMap.remove(ip, serviceKey)));
        ofNullable(service.getStatus()).map(V1ServiceStatus::getLoadBalancer)
            .map(V1LoadBalancerStatus::getIngress).filter(CollectionUtils::isNotEmpty)
            .ifPresent(ingresses -> ingresses.stream()
                .map(V1LoadBalancerIngress::getIp)
                .filter(StringUtil::isNotEmpty)
                .filter(ip -> !ownsIP(replacement, ip))
                .forEach(ip -> ipServiceMap.remove(ip, serviceKey)));
    }

    /**
     * Tells whether {@code ip} is one of the cluster IPs or load balancer ingress IPs of
     * {@code service}; a {@code null} service owns nothing.
     */
    private static boolean ownsIP(final V1Service service, final String ip) {
        if (service == null) {
            return false;
        }
        final V1ServiceSpec spec = service.getSpec();
        if (spec != null && spec.getClusterIPs() != null && spec.getClusterIPs().contains(ip)) {
            return true;
        }
        final V1ServiceStatus status = service.getStatus();
        if (status == null || status.getLoadBalancer() == null
            || status.getLoadBalancer().getIngress() == null) {
            return false;
        }
        return status.getLoadBalancer().getIngress().stream().anyMatch(ingress -> ip.equals(ingress.getIp()));
    }

    /**
     * @return the pod IP, or {@code null} when the pod has no status or an empty IP.
     */
    private static String podIP(final V1Pod pod) {
        return ofNullable(pod.getStatus())
            .map(status -> status.getPodIP())
            .filter(StringUtil::isNotEmpty)
            .orElse(null);
    }

    /**
     * @return the registry key of the service, or {@code null} when it has no metadata.
     */
    private static String keyOf(final V1Service service) {
        return ofNullable(service.getMetadata())
            .map(metadata -> key(metadata.getName(), metadata.getNamespace()))
            .orElse(null);
    }

    /**
     * @return the registry key of the pod, or {@code null} when it has no metadata.
     */
    private static String keyOf(final V1Pod pod) {
        return ofNullable(pod.getMetadata())
            .map(metadata -> key(metadata.getName(), metadata.getNamespace()))
            .orElse(null);
    }

    /**
     * Builds the {@code name.namespace} key used by all lookup tables.
     */
    private static String key(final String name, final String namespace) {
        return name + SEPARATOR + namespace;
    }

    /**
     * @return {@code true} if every element of {@code required} is contained in {@code actual}.
     */
    private static boolean containsAll(final Collection<?> actual, final Collection<?> required) {
        Objects.requireNonNull(actual);
        Objects.requireNonNull(required);
        return actual.containsAll(required);
    }
}