// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package controller

import (
	"fmt"
	"sync"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"

	"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
	"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/kube"
	"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/kube/controller/filter"
	"github.com/apache/dubbo-go-pixiu/pkg/util/sets"
)
// PodCache is an eventually consistent pod cache.
type PodCache struct {
informer filter.FilteredSharedIndexInformer
sync.RWMutex
	// podsByIP maintains a stable mapping from pod IP to the pod's namespace/name
	// key (e.g. "10.1.2.3" -> "default/pod-a"); this allows us to retrieve the
	// latest status by pod IP.
	// This should only contain RUNNING or PENDING pods with an allocated IP.
podsByIP map[string]string
// IPByPods is a reverse map of podsByIP. This exists to allow us to prune stale entries in the
// pod cache if a pod changes IP.
IPByPods map[string]string
	// needResync maps a pod IP to the namespace/name keys of endpoints waiting on
	// that pod. It is used to requeue endpoint events once the pod event arrives;
	// this typically happens when the pod is not yet available in the PodCache at
	// the time the endpoint event is processed.
needResync map[string]sets.Set
queueEndpointEvent func(string)
c *Controller
}
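// newPodCache returns a PodCache wired to the controller's filtered pod informer
// and to the callback used to requeue endpoint events once the owning pod arrives.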
func newPodCache(c *Controller, informer filter.FilteredSharedIndexInformer, queueEndpointEvent func(string)) *PodCache {
out := &PodCache{
informer: informer,
c: c,
podsByIP: make(map[string]string),
IPByPods: make(map[string]string),
needResync: make(map[string]sets.Set),
queueEndpointEvent: queueEndpointEvent,
}
return out
}
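// shouldPodBeInEndpoints reports whether the pod can still back endpoints: a pod is
// excluded only once its phase shows it has terminated for good under its restart policy.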
// Copied from kubernetes/pkg/controller/endpoint/endpoints_controller.go
func shouldPodBeInEndpoints(pod *v1.Pod) bool {
switch pod.Spec.RestartPolicy {
case v1.RestartPolicyNever:
return pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded
case v1.RestartPolicyOnFailure:
return pod.Status.Phase != v1.PodSucceeded
default:
return true
}
}
// IsPodReady is copied from kubernetes/pkg/api/v1/pod/utils.go
func IsPodReady(pod *v1.Pod) bool {
return IsPodReadyConditionTrue(pod.Status)
}
// IsPodReadyConditionTrue returns true if a pod is ready; false otherwise.
func IsPodReadyConditionTrue(status v1.PodStatus) bool {
condition := GetPodReadyCondition(status)
return condition != nil && condition.Status == v1.ConditionTrue
}
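// GetPodReadyCondition extracts the Ready condition from the given pod status,
// or returns nil if it is not present.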
func GetPodReadyCondition(status v1.PodStatus) *v1.PodCondition {
_, condition := GetPodCondition(&status, v1.PodReady)
return condition
}
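// GetPodCondition extracts the provided condition from the given pod status.
// Returns -1 and nil if the status is nil or the condition is not present.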
func GetPodCondition(status *v1.PodStatus, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
if status == nil {
return -1, nil
}
return GetPodConditionFromList(status.Conditions, conditionType)
}
// GetPodConditionFromList extracts the provided condition from the given list of conditions and
// returns the index of the condition and the condition. Returns -1 and nil if the condition is not present.
func GetPodConditionFromList(conditions []v1.PodCondition, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
if conditions == nil {
return -1, nil
}
for i := range conditions {
if conditions[i].Type == conditionType {
return i, &conditions[i]
}
}
return -1, nil
}
// onEvent updates the IP-based index (pc.podsByIP) and, when the pod's state
// changes, notifies registered workload handlers.
func (pc *PodCache) onEvent(curr interface{}, ev model.Event) error {
	// When a pod is deleted, curr could be a *v1.Pod or a cache.DeletedFinalStateUnknown marker item.
pod, ok := curr.(*v1.Pod)
if !ok {
tombstone, ok := curr.(cache.DeletedFinalStateUnknown)
if !ok {
return fmt.Errorf("couldn't get object from tombstone %+v", curr)
}
pod, ok = tombstone.Obj.(*v1.Pod)
if !ok {
return fmt.Errorf("tombstone contained object that is not a pod %#v", curr)
}
}
ip := pod.Status.PodIP
	// PodIP will be empty when the pod is first created, before an IP has been
	// assigned via UpdateStatus.
if len(ip) == 0 {
return nil
}
key := kube.KeyFunc(pod.Name, pod.Namespace)
switch ev {
case model.EventAdd:
// can happen when istiod just starts
if pod.DeletionTimestamp != nil || !IsPodReady(pod) {
return nil
} else if shouldPodBeInEndpoints(pod) {
pc.update(ip, key)
} else {
return nil
}
case model.EventUpdate:
		if pod.DeletionTimestamp != nil || !IsPodReady(pod) {
			// delete only if this pod was actually in the cache
			if !pc.deleteIP(ip, key) {
				return nil
			}
			ev = model.EventDelete
} else if shouldPodBeInEndpoints(pod) {
pc.update(ip, key)
} else {
return nil
}
case model.EventDelete:
		// delete only if this pod was in the cache;
		// in most cases it has already been deleted in the `UPDATE` event, when `DeletionTimestamp` was set.
if !pc.deleteIP(ip, key) {
return nil
}
}
pc.notifyWorkloadHandlers(pod, ev)
return nil
}
// notifyWorkloadHandlers fires the workload instance handlers for the given pod.
func (pc *PodCache) notifyWorkloadHandlers(pod *v1.Pod, ev model.Event) {
	// fire instance handlers for the workload
ep := NewEndpointBuilder(pc.c, pod).buildIstioEndpoint(pod.Status.PodIP, 0, "", model.AlwaysDiscoverable)
workloadInstance := &model.WorkloadInstance{
Name: pod.Name,
Namespace: pod.Namespace,
Kind: model.PodKind,
Endpoint: ep,
PortMap: getPortMap(pod),
}
pc.c.handlers.NotifyWorkloadHandlers(workloadInstance, ev)
}
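// getPortMap maps each named TCP container port of the pod to its port number;
// unnamed and non-TCP ports are skipped. For example, containers declaring
// {Name: "http", ContainerPort: 8080, Protocol: TCP} and
// {ContainerPort: 9090, Protocol: TCP} yield {"http": 8080}.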
func getPortMap(pod *v1.Pod) map[string]uint32 {
pmap := map[string]uint32{}
for _, c := range pod.Spec.Containers {
for _, port := range c.Ports {
if port.Name == "" || port.Protocol != v1.ProtocolTCP {
continue
}
// First port wins, per Kubernetes (https://github.com/kubernetes/kubernetes/issues/54213)
if _, f := pmap[port.Name]; !f {
pmap[port.Name] = uint32(port.ContainerPort)
}
}
}
return pmap
}
// deleteIP removes the pod/IP mapping and returns true only if this exact
// pod/IP pair was in the cache and has now been deleted.
func (pc *PodCache) deleteIP(ip string, podKey string) bool {
pc.Lock()
defer pc.Unlock()
if pc.podsByIP[ip] == podKey {
delete(pc.podsByIP, ip)
delete(pc.IPByPods, podKey)
return true
}
return false
}
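// update records the ip -> pod key mapping, evicts any mapping left over from a
// previous IP of the same pod, requeues endpoint events that were parked waiting
// for this pod, and finally triggers a proxy update for the IP.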
func (pc *PodCache) update(ip, key string) {
pc.Lock()
// if the pod has been cached, return
if key == pc.podsByIP[ip] {
pc.Unlock()
return
}
if current, f := pc.IPByPods[key]; f {
		// The pod already exists but with another IP address; clean up the stale entry.
delete(pc.podsByIP, current)
}
pc.podsByIP[ip] = key
pc.IPByPods[key] = ip
if endpointsToUpdate, f := pc.needResync[ip]; f {
delete(pc.needResync, ip)
for epKey := range endpointsToUpdate {
pc.queueEndpointEvent(epKey)
}
endpointsPendingPodUpdate.Record(float64(len(pc.needResync)))
}
pc.Unlock()
pc.proxyUpdates(ip)
}
// queueEndpointEventOnPodArrival registers this endpoint and queues an endpoint
// event once the corresponding pod arrives.
func (pc *PodCache) queueEndpointEventOnPodArrival(key, ip string) {
pc.Lock()
defer pc.Unlock()
if _, f := pc.needResync[ip]; !f {
pc.needResync[ip] = sets.New(key)
} else {
pc.needResync[ip].Insert(key)
}
endpointsPendingPodUpdate.Record(float64(len(pc.needResync)))
}
// endpointDeleted cleans up endpoint from resync endpoint list.
func (pc *PodCache) endpointDeleted(key string, ip string) {
pc.Lock()
defer pc.Unlock()
delete(pc.needResync[ip], key)
if len(pc.needResync[ip]) == 0 {
delete(pc.needResync, ip)
}
endpointsPendingPodUpdate.Record(float64(len(pc.needResync)))
}
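// proxyUpdates asks the XDS updater, when one is configured, to push an update
// for the proxy at the given IP.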
func (pc *PodCache) proxyUpdates(ip string) {
if pc.c != nil && pc.c.opts.XDSUpdater != nil {
pc.c.opts.XDSUpdater.ProxyUpdate(pc.c.Cluster(), ip)
}
}
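// getPodKey returns the namespace/name key cached for the given pod IP, if any.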
func (pc *PodCache) getPodKey(addr string) (string, bool) {
pc.RLock()
defer pc.RUnlock()
key, exists := pc.podsByIP[addr]
return key, exists
}
// getPodByIP returns the pod, or nil if the pod is not found or an error occurred
func (pc *PodCache) getPodByIP(addr string) *v1.Pod {
key, exists := pc.getPodKey(addr)
if !exists {
return nil
}
return pc.getPodByKey(key)
}
// getPodByKey returns the pod for a key formatted as `ns/name`
func (pc *PodCache) getPodByKey(key string) *v1.Pod {
item, _, _ := pc.informer.GetIndexer().GetByKey(key)
if item != nil {
return item.(*v1.Pod)
}
return nil
}
// getPodByProxy returns the pod backing the given proxy.
func (pc *PodCache) getPodByProxy(proxy *model.Proxy) *v1.Pod {
var pod *v1.Pod
key := podKeyByProxy(proxy)
if key != "" {
pod = pc.getPodByKey(key)
if pod != nil {
return pod
}
}
	// We only need to fetch the pod by the first IP; even when a proxy reports
	// multiple IPs, they all belong to the same pod.
proxyIP := proxy.IPAddresses[0]
	// just in case the proxy ID is badly formatted
return pc.getPodByIP(proxyIP)
}