/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package controller

import (
"sync"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
"github.com/apache/dubbo-kubernetes/pkg/maps"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
"istio.io/api/annotation"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
)
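
// PodCache is an eventually consistent pod cache, kept in sync with the
// informer via onEvent. It indexes pods by IP so endpoint events can be
// correlated with pod metadata.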
type PodCache struct {
pods kclient.Client[*v1.Pod]
sync.RWMutex
// podsByIP maintains a stable mapping from pod IP to namespaced-name keys,
// which allows us to retrieve the latest pod status by IP.
// It should only contain Running or Pending pods with an allocated IP.
podsByIP map[string]sets.Set[types.NamespacedName]
// ipByPods is a reverse map of podsByIP. This exists to allow us to prune stale entries in the
// pod cache if a pod changes IP.
ipByPods map[types.NamespacedName]string
// needResync maps an IP to the set of endpoint namespace/names to requeue. It is
// used to requeue endpoint events once the corresponding pod event arrives; this
// typically happens when the pod is not yet in the podCache when the endpoint event arrives.
needResync map[string]sets.Set[types.NamespacedName]
queueEndpointEvent func(types.NamespacedName)
c *Controller
}
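
// newPodCache creates a PodCache backed by the given pod client.
// queueEndpointEvent is called to requeue an endpoint event once a
// previously missing pod arrives in the cache.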
func newPodCache(c *Controller, pods kclient.Client[*v1.Pod], queueEndpointEvent func(types.NamespacedName)) *PodCache {
out := &PodCache{
pods: pods,
c: c,
podsByIP: make(map[string]sets.Set[types.NamespacedName]),
ipByPods: make(map[types.NamespacedName]string),
needResync: make(map[string]sets.Set[types.NamespacedName]),
queueEndpointEvent: queueEndpointEvent,
}
return out
}
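
// endpointDeleted drops the pending-resync entry for the given endpoint key and IP,
// so a late-arriving pod no longer requeues an already deleted endpoint.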
func (pc *PodCache) endpointDeleted(key types.NamespacedName, ip string) {
pc.Lock()
defer pc.Unlock()
sets.DeleteCleanupLast(pc.needResync, ip, key)
}
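
// getPodByKey looks up the pod in the informer cache by namespace/name.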
func (pc *PodCache) getPodByKey(key types.NamespacedName) *v1.Pod {
return pc.pods.Get(key.Name, key.Namespace)
}
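
// getPodKeys returns the namespaced-name keys of all cached pods with the given IP.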
func (pc *PodCache) getPodKeys(addr string) []types.NamespacedName {
pc.RLock()
defer pc.RUnlock()
return pc.podsByIP[addr].UnsortedList()
}
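
// getPodsByIP returns the pods currently cached under the given IP, or nil if there are none.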
func (pc *PodCache) getPodsByIP(addr string) []*v1.Pod {
keys := pc.getPodKeys(addr)
if keys == nil {
return nil
}
res := make([]*v1.Pod, 0, len(keys))
for _, key := range keys {
p := pc.getPodByKey(key)
// Subtle race condition: getPodKeys reads our own cache over pods, while getPodByKey
// hits the informer cache. If these are out of sync, p may be nil (the pod was deleted).
if p != nil {
res = append(res, p)
}
}
return res
}
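
// queueEndpointEventOnPodArrival registers an endpoint event to be requeued
// once the pod with the given IP arrives in the cache.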
func (pc *PodCache) queueEndpointEventOnPodArrival(key types.NamespacedName, ip string) {
pc.Lock()
defer pc.Unlock()
sets.InsertOrNew(pc.needResync, ip, key)
}
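
// getIPByPod returns the last known IP for the given pod key, or "" if unknown.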
func (pc *PodCache) getIPByPod(key types.NamespacedName) string {
pc.RLock()
defer pc.RUnlock()
return pc.ipByPods[key]
}
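
// onEvent keeps the cache in sync with pod add/update/delete events and
// translates them into endpoint-level updates.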
func (pc *PodCache) onEvent(old, pod *v1.Pod, ev model.Event) error {
ip := pod.Status.PodIP
// PodIP is empty when a pod is first created, before an IP is assigned
// via UpdateStatus.
if len(ip) == 0 {
// However, in the case of an eviction, the event that marks the pod as Failed may
// *also* have removed the IP. If the pod *used to* have an IP, we still need to delete it.
ip = pc.getIPByPod(config.NamespacedName(pod))
if len(ip) == 0 {
return nil
}
}
key := config.NamespacedName(pod)
switch ev {
case model.EventAdd:
if shouldPodBeInEndpoints(pod) && IsPodReady(pod) {
pc.addPod(pod, ip, key, false)
} else {
return nil
}
case model.EventUpdate:
if !shouldPodBeInEndpoints(pod) || !IsPodReady(pod) {
// delete only if this pod was in the cache
if !pc.deleteIP(ip, key) {
return nil
}
ev = model.EventDelete
} else {
// The condition here is the exact negation of the branch above, so the pod
// should be in endpoints and is ready; (re)add it.
labelUpdated := pc.labelFilter(old, pod)
pc.addPod(pod, ip, key, labelUpdated)
}
case model.EventDelete:
// delete only if this pod was in the cache;
// in most cases it has already been deleted in the `UPDATE` event when `DeletionTimestamp` was set.
if !pc.deleteIP(ip, key) {
return nil
}
}
// TODO notifyWorkloadHandlers
return nil
}
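
// addPod records the pod under the given IP, pruning the stale entry if the pod
// changed IP, and requeues any endpoint events waiting on this pod.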
func (pc *PodCache) addPod(pod *v1.Pod, ip string, key types.NamespacedName, labelUpdated bool) {
pc.Lock()
// If the pod is already cached under this IP, only propagate label updates.
if pc.podsByIP[ip].Contains(key) {
pc.Unlock()
if labelUpdated {
pc.proxyUpdates(pod, true)
}
return
}
if current, f := pc.ipByPods[key]; f {
// The pod already exists, but with another IP address. Clean up the stale entry.
sets.DeleteCleanupLast(pc.podsByIP, current, key)
}
sets.InsertOrNew(pc.podsByIP, ip, key)
pc.ipByPods[key] = ip
if endpointsToUpdate, f := pc.needResync[ip]; f {
delete(pc.needResync, ip)
for epKey := range endpointsToUpdate {
pc.queueEndpointEvent(epKey)
}
}
pc.Unlock()
pc.proxyUpdates(pod, false)
}
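
// deleteIP removes the pod from the cache, returning true if the pod was
// actually cached under the given IP.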
func (pc *PodCache) deleteIP(ip string, podKey types.NamespacedName) bool {
pc.Lock()
defer pc.Unlock()
if pc.podsByIP[ip].Contains(podKey) {
sets.DeleteCleanupLast(pc.podsByIP, ip, podKey)
delete(pc.ipByPods, podKey)
return true
}
return false
}
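
// labelFilter reports whether the pod's labels, or its one endpoint-relevant
// annotation, changed between the old and current pod.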
func (pc *PodCache) labelFilter(old, cur *v1.Pod) bool {
// If labels/annotations updated, trigger proxy push
labelsChanged := !maps.Equal(old.Labels, cur.Labels)
// Annotations are only used in endpoints in one case, so just compare that one
relevantAnnotationsChanged := old.Annotations[annotation.AmbientRedirection.Name] != cur.Annotations[annotation.AmbientRedirection.Name]
return labelsChanged || relevantAnnotationsChanged
}
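
// proxyUpdates pushes an XDS proxy update for the pod's IP and, on label
// changes, recomputes the services selecting the pod.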
func (pc *PodCache) proxyUpdates(pod *v1.Pod, isPodUpdate bool) {
if pc.c != nil {
if pc.c.opts.XDSUpdater != nil {
ip := pod.Status.PodIP
pc.c.opts.XDSUpdater.ProxyUpdate(pc.c.Cluster(), ip)
}
if isPodUpdate {
// Recompute service(s) due to the pod label change.
// If it is a new pod, no recompute is needed, as its services have not
// been computed for the first time yet.
pc.c.recomputeServiceForPod(pod)
}
}
}
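
// shouldPodBeInEndpoints reports whether the pod is eligible to appear in
// endpoints: not in a terminal phase, not terminating, and with at least one IP assigned.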
func shouldPodBeInEndpoints(pod *v1.Pod) bool {
if isPodPhaseTerminal(pod.Status.Phase) {
return false
}
if len(pod.Status.PodIP) == 0 && len(pod.Status.PodIPs) == 0 {
return false
}
if pod.DeletionTimestamp != nil {
return false
}
return true
}
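
// isPodPhaseTerminal reports whether the phase is terminal (Failed or Succeeded).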
func isPodPhaseTerminal(phase v1.PodPhase) bool {
return phase == v1.PodFailed || phase == v1.PodSucceeded
}
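
// IsPodReady reports whether the pod's Ready condition is true.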
func IsPodReady(pod *v1.Pod) bool {
return IsPodReadyConditionTrue(pod.Status)
}
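
// IsPodReadyConditionTrue reports whether the Ready condition in the given status is true.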
func IsPodReadyConditionTrue(status v1.PodStatus) bool {
condition := GetPodReadyCondition(status)
return condition != nil && condition.Status == v1.ConditionTrue
}
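
// GetPodReadyCondition extracts the Ready condition from the given status, or nil if absent.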
func GetPodReadyCondition(status v1.PodStatus) *v1.PodCondition {
_, condition := GetPodCondition(&status, v1.PodReady)
return condition
}
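
// GetPodCondition extracts the condition of the given type from the status,
// returning its index and a pointer to it, or (-1, nil) if not present.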
func GetPodCondition(status *v1.PodStatus, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
if status == nil {
return -1, nil
}
return GetPodConditionFromList(status.Conditions, conditionType)
}
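
// GetPodConditionFromList returns the index and a pointer to the condition of the
// given type in the list, or (-1, nil) if not found.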
func GetPodConditionFromList(conditions []v1.PodCondition, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
if conditions == nil {
return -1, nil
}
for i := range conditions {
if conditions[i].Type == conditionType {
return i, &conditions[i]
}
}
return -1, nil
}