blob: 7ff002e7b61583b0d1e24716b42b8c6deeac660d [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package controller
import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/features"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/kube"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/kube/controller/filter"
"github.com/apache/dubbo-go-pixiu/pkg/config/host"
"github.com/apache/dubbo-go-pixiu/pkg/config/labels"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/gvk"
)
// Pilot can get EDS information from Kubernetes from two mutually exclusive sources, Endpoints and
// EndpointSlices. The kubeEndpointsController abstracts these details and provides a common interface
// that both sources implement.
type kubeEndpointsController interface {
// HasSynced reports the sync state of the source; kubeEndpoints implements it by
// delegating to its informer.
HasSynced() bool
// Run starts the source with the given stop channel; kubeEndpoints implements it by
// running its informer.
Run(stopCh <-chan struct{})
// getInformer returns the filtered shared index informer of the source.
getInformer() filter.FilteredSharedIndexInformer
// onEvent handles an event for the object curr.
// NOTE(review): presumably invoked from the informer event handlers - confirm in the implementations.
onEvent(curr interface{}, event model.Event) error
// InstancesByPort returns service instances of svc for the requested service port and labels.
// NOTE(review): exact matching semantics are defined by the implementations, not visible here.
InstancesByPort(c *Controller, svc *model.Service, reqSvcPort int, labelsList labels.Instance) []*model.ServiceInstance
// GetProxyServiceInstances returns the service instances associated with proxy.
// NOTE(review): how the association is established is not visible here.
GetProxyServiceInstances(c *Controller, proxy *model.Proxy) []*model.ServiceInstance
// buildIstioEndpoints builds the Istio endpoints of the endpoint object ep for the given host.
// updateEDS calls it for events that did not yield forgotten endpoints.
buildIstioEndpoints(ep interface{}, host host.Name) []*model.IstioEndpoint
// buildIstioEndpointsWithService builds the Istio endpoints of the service identified by
// name and namespace for the given host.
// NOTE(review): clearCache presumably resets cached endpoint state first - confirm.
buildIstioEndpointsWithService(name, namespace string, host host.Name, clearCache bool) []*model.IstioEndpoint
// forgetEndpoint does internal bookkeeping on a deleted endpoint. updateEDS calls it on
// delete events and, when the returned map is non-nil, uses its per-host entries as the
// endpoints to publish.
forgetEndpoint(endpoint interface{}) map[host.Name][]*model.IstioEndpoint
// getServiceNamespacedName returns the namespaced name of the service that the endpoint
// object ep belongs to.
getServiceNamespacedName(ep interface{}) types.NamespacedName
}
// kubeEndpoints abstracts the common behavior across endpoint and endpoint slices.
// Its HasSynced and Run methods delegate to the wrapped informer.
type kubeEndpoints struct {
// c is the Controller associated with this endpoints source.
// NOTE(review): not used by the methods defined on kubeEndpoints in this file section.
c *Controller
// informer is the filtered shared index informer that HasSynced and Run delegate to.
informer filter.FilteredSharedIndexInformer
}
// HasSynced reports the sync state of the wrapped informer.
func (e *kubeEndpoints) HasSynced() bool {
	synced := e.informer.HasSynced()
	return synced
}
// Run runs the wrapped informer, passing stopCh through to it.
func (e *kubeEndpoints) Run(stopCh <-chan struct{}) {
	inf := e.informer
	inf.Run(stopCh)
}
// processEndpointEvent refreshes the EDS state for the endpoint object ep and, when headless
// service support is enabled, additionally requests a full push for a headless service.
//
// name and namespace identify the Kubernetes Service that is looked up through the service
// lister. The function always returns nil.
func processEndpointEvent(c *Controller, epc kubeEndpointsController, name string, namespace string, event model.Event, ep interface{}) error {
	// The endpoint cache is refreshed for every kind of service, headless ones included:
	// for gateways, the cluster discovery type of a headless service is `EDS`.
	updateEDS(c, epc, ep, event)

	if !features.EnableHeadlessService {
		return nil
	}

	// The lookup error is discarded; only a non-nil service is acted upon.
	k8sSvc, _ := c.serviceLister.Services(namespace).Get(name)
	if k8sSvc == nil {
		return nil
	}

	for _, modelSvc := range c.servicesForNamespacedName(kube.NamespacedNameForK8sObject(k8sSvc)) {
		// Only a headless service triggers a full push.
		if k8sSvc.Spec.ClusterIP != v1.ClusterIPNone {
			continue
		}
		updatedKey := model.ConfigKey{
			Kind:      gvk.ServiceEntry,
			Name:      modelSvc.Hostname.String(),
			Namespace: k8sSvc.Namespace,
		}
		c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{
			Full: true,
			// TODO: extend and set service instance type, so no need to re-init push context
			ConfigsUpdated: map[model.ConfigKey]struct{}{updatedKey: {}},
			Reason:         []model.TriggerReason{model.EndpointUpdate},
		})
		// A single push request is issued; remaining model services are not visited.
		return nil
	}
	return nil
}
// updateEDS recomputes the endpoints of every host name associated with the service that the
// endpoint object ep belongs to and hands them to the XDS updater.
//
// For model.EventDelete the endpoints reported by epc.forgetEndpoint are used, provided it
// returns a non-nil map; in every other case they are built from ep via
// epc.buildIstioEndpoints. When the service for a host name is known to the controller, its
// workload instance endpoints are appended before the update is sent.
func updateEDS(c *Controller, epc kubeEndpointsController, ep interface{}, event model.Event) {
	namespacedName := epc.getServiceNamespacedName(ep)
	// One verb per operand: endpoint name, event, namespace.
	log.Debugf("Handle EDS endpoint %s %s in namespace %s", namespacedName.Name, event, namespacedName.Namespace)

	var forgottenEndpointsByHost map[host.Name][]*model.IstioEndpoint
	if event == model.EventDelete {
		forgottenEndpointsByHost = epc.forgetEndpoint(ep)
	}

	shard := model.ShardKeyFromRegistry(c)

	for _, hostName := range c.hostNamesForNamespacedName(namespacedName) {
		var endpoints []*model.IstioEndpoint
		if forgottenEndpointsByHost != nil {
			// Delete path: publish what the source reported for this host after forgetting ep.
			endpoints = forgottenEndpointsByHost[hostName]
		} else {
			endpoints = epc.buildIstioEndpoints(ep, hostName)
		}

		svc := c.GetService(hostName)
		if svc != nil {
			fep := c.collectWorkloadInstanceEndpoints(svc)
			endpoints = append(endpoints, fep...)
		} else {
			log.Debugf("Handle EDS endpoint: skip collecting workload entry endpoints, service %s/%s has not been populated",
				namespacedName.Namespace, namespacedName.Name)
		}

		c.opts.XDSUpdater.EDSUpdate(shard, string(hostName), namespacedName.Namespace, endpoints)
	}
}
// getPod fetches a pod by name or IP address and reports whether a pod was expected.
// A pod may be missing (nil) for two reasons:
//   - It is an endpoint without an associated Pod. In this case, expectPod will be false.
//   - It is an endpoint with an associated Pod, but it is not found. In this case, expectPod will
//     be true. This may happen due to eventual consistency issues, out of order events, etc. In
//     this case, the caller should not proceed with the endpoint, or inaccurate information would
//     be sent which may have impacts on correctness and security. An endpoint resync is
//     registered for this case.
//
// Note: this is only used by endpoints and endpointslice controller
func getPod(c *Controller, ip string, ep *metav1.ObjectMeta, targetRef *v1.ObjectReference, host host.Name) (*v1.Pod, bool) {
	pod := c.getPod(ip, ep.Namespace, targetRef)

	// A pod is expected exactly when the endpoint carries a target reference of kind Pod.
	expectPod := targetRef != nil && targetRef.Kind == "Pod"
	if expectPod && pod == nil {
		c.registerEndpointResync(ep, ip, host)
	}
	return pod, expectPod
}
// registerEndpointResync records that an endpoint was seen without its pod and asks the pod
// cache to queue the endpoint event once the pod with the given ip arrives.
func (c *Controller) registerEndpointResync(ep *metav1.ObjectMeta, ip string, host host.Name) {
	// The endpoint event has arrived before the pod event. This might happen because
	// PodCache is eventually consistent.
	log.Debugf("Endpoint without pod %s %s.%s", ip, ep.Name, ep.Namespace)
	endpointsWithNoPods.Increment()
	if metrics := c.opts.Metrics; metrics != nil {
		metrics.AddMetric(model.EndpointNoPod, string(host), "", ip)
	}

	// Tell pod cache we want to queue the endpoint event when this pod arrives.
	c.pods.queueEndpointEventOnPodArrival(kube.KeyFunc(ep.Name, ep.Namespace), ip)
}
// getPod fetches a pod by name or IP address.
// When targetRef points at a Pod, the pod is looked up by the key built from the reference's
// name and namespace; otherwise it is looked up by ip and accepted only if it lives in namespace.
// A pod may be missing (nil) for two reasons:
//   - It is an endpoint without an associated Pod.
//   - It is an endpoint with an associated Pod, but it is not found.
func (c *Controller) getPod(ip string, namespace string, targetRef *v1.ObjectReference) *v1.Pod {
	if targetRef != nil && targetRef.Kind == "Pod" {
		return c.pods.getPodByKey(kube.KeyFunc(targetRef.Name, targetRef.Namespace))
	}

	// This means the endpoint is manually controlled
	// TODO: this may be not correct because of the hostnetwork pods may have same ip address
	// Do we have a way to get the pod from only endpoint?
	candidate := c.pods.getPodByIP(ip)

	// There may be no pod at all, and a pod from a different namespace must not be selected.
	if candidate == nil || candidate.Namespace != namespace {
		return nil
	}
	return candidate
}