blob: 917375f8132c3304353525cf9f2eba3e150163c6 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package controller
import (
"sync"
)
import (
"istio.io/api/label"
v1 "k8s.io/api/discovery/v1"
"k8s.io/api/discovery/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
klabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
listerv1 "k8s.io/client-go/listers/discovery/v1"
listerv1beta1 "k8s.io/client-go/listers/discovery/v1beta1"
"k8s.io/client-go/tools/cache"
mcs "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/features"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/kube"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/kube/controller/filter"
"github.com/apache/dubbo-go-pixiu/pkg/config/host"
"github.com/apache/dubbo-go-pixiu/pkg/config/labels"
kubelib "github.com/apache/dubbo-go-pixiu/pkg/kube"
)
// endpointSliceController serves endpoint information from Kubernetes
// EndpointSlice resources. It is asserted elsewhere in this file to satisfy
// kubeEndpointsController.
type endpointSliceController struct {
	// kubeEndpoints is embedded; newEndpointSliceController fills in its
	// controller reference (c) and its filtered informer.
	kubeEndpoints
	// endpointCache stores the Istio endpoints built from each slice, grouped
	// by service hostname and then by slice name.
	endpointCache *endpointSliceCache
	// useV1Resource selects the discovery/v1 lister when true and the
	// discovery/v1beta1 lister when false (see listSlices).
	useV1Resource bool
}
// Compile-time assertion that *endpointSliceController satisfies kubeEndpointsController.
var _ kubeEndpointsController = &endpointSliceController{}
var (
	// endpointSliceRequirement is built from mcs.LabelServiceName with the
	// DoesNotExist operator, i.e. it is meant to match only objects that do not
	// carry the MCS service-name label (labelRequirement is defined elsewhere).
	endpointSliceRequirement = labelRequirement(mcs.LabelServiceName, selection.DoesNotExist, nil)
	// endpointSliceSelector is a selector consisting solely of
	// endpointSliceRequirement; onEvent and GetProxyServiceInstances use it to
	// decide which slices to consider.
	endpointSliceSelector = klabels.NewSelector().Add(*endpointSliceRequirement)
)
// newEndpointSliceController creates the EndpointSlice-backed endpoints
// controller for c and registers its onEvent handler with c.
//
// The discovery/v1 informer is used when endpointSliceV1Available says so,
// otherwise the discovery/v1beta1 informer. Either way the informer is wrapped
// in a filtered informer driven by c.opts.DiscoveryNamespacesFilter.
func newEndpointSliceController(c *Controller) *endpointSliceController {
	// TODO Endpoints has a special cache, to filter out irrelevant updates to kube-system.
	// Investigate if we need this, or if EndpointSlice makes this not relevant.
	v1Available := endpointSliceV1Available(c.client)

	var sliceInformer cache.SharedIndexInformer
	switch {
	case v1Available:
		sliceInformer = c.client.KubeInformer().Discovery().V1().EndpointSlices().Informer()
	default:
		sliceInformer = c.client.KubeInformer().Discovery().V1beta1().EndpointSlices().Informer()
	}

	filtered := filter.NewFilteredSharedIndexInformer(c.opts.DiscoveryNamespacesFilter.Filter, sliceInformer)

	controller := &endpointSliceController{
		kubeEndpoints: kubeEndpoints{c: c, informer: filtered},
		useV1Resource: v1Available,
		endpointCache: newEndpointSliceCache(),
	}
	c.registerHandlers(filtered, "EndpointSlice", controller.onEvent, nil)
	return controller
}
// endpointSliceV1Available reports whether the discovery/v1 EndpointSlice API
// should be used: client must be non-nil and kubelib.IsAtLeastVersion(client, 21)
// must hold (presumably Kubernetes minor version 21 — confirm against kubelib).
//
// TODO use this to automatically switch to EndpointSlice mode
func endpointSliceV1Available(client kubelib.Client) bool {
	if client == nil {
		return false
	}
	return kubelib.IsAtLeastVersion(client, 21)
}
// getInformer returns the filtered EndpointSlice informer held by the embedded
// kubeEndpoints.
func (esc *endpointSliceController) getInformer() filter.FilteredSharedIndexInformer {
	return esc.kubeEndpoints.informer
}
// listSlices lists the EndpointSlices in namespace ns that match selector,
// reading from the informer's indexer.
//
// Depending on useV1Resource the discovery/v1 or discovery/v1beta1 lister is
// used; the returned elements are *v1.EndpointSlice or *v1beta1.EndpointSlice
// respectively, boxed as interface{}. The slice is returned together with the
// lister's error, whatever it is.
func (esc *endpointSliceController) listSlices(ns string, selector klabels.Selector) (slices []interface{}, err error) {
	indexer := esc.informer.GetIndexer()

	if !esc.useV1Resource {
		found, listErr := listerv1beta1.NewEndpointSliceLister(indexer).EndpointSlices(ns).List(selector)
		boxed := make([]interface{}, 0, len(found))
		for _, s := range found {
			boxed = append(boxed, s)
		}
		return boxed, listErr
	}

	found, listErr := listerv1.NewEndpointSliceLister(indexer).EndpointSlices(ns).List(selector)
	boxed := make([]interface{}, 0, len(found))
	for _, s := range found {
		boxed = append(boxed, s)
	}
	return boxed, listErr
}
// onEvent is the informer handler for EndpointSlice notifications.
//
// curr is either the slice object itself or a cache.DeletedFinalStateUnknown
// tombstone wrapping the last known state of a deleted slice. Slices that
// match endpointSliceSelector are forwarded to processEndpointEvent; all other
// slices are ignored. Unrecognised objects are logged and dropped, and nil is
// returned for them.
func (esc *endpointSliceController) onEvent(curr interface{}, event model.Event) error {
	ep, ok := curr.(metav1.Object)
	if !ok {
		tombstone, isTombstone := curr.(cache.DeletedFinalStateUnknown)
		if !isTombstone {
			log.Errorf("Couldn't get object from tombstone %#v", curr)
			return nil
		}
		// Accept the concrete EndpointSlice types directly: objects that come
		// out of a typed informer cache commonly have an empty TypeMeta, so a
		// Kind comparison alone would reject them. Any other runtime.Object is
		// still accepted when it declares the EndpointSlice kind.
		isSlice := false
		switch obj := tombstone.Obj.(type) {
		case *v1.EndpointSlice, *v1beta1.EndpointSlice:
			isSlice = true
		case runtime.Object:
			isSlice = obj.GetObjectKind().GroupVersionKind().Kind == "EndpointSlice"
		}
		if !isSlice {
			log.Errorf("Tombstone contained an object that is not an endpoints slice %#v", curr)
			return nil
		}
		// Recover the object from the tombstone. Without this, ep would still
		// be a nil interface here and the GetLabels call below would panic.
		ep, ok = tombstone.Obj.(metav1.Object)
		if !ok {
			log.Errorf("Tombstone contained an object that is not an endpoints slice %#v", curr)
			return nil
		}
	}

	esLabels := ep.GetLabels()
	if !endpointSliceSelector.Matches(klabels.Set(esLabels)) {
		return nil
	}
	return processEndpointEvent(esc.c, esc, serviceNameForEndpointSlice(esLabels), ep.GetNamespace(), event, ep)
}
// GetProxyServiceInstances returns the service instances co-located with the
// given proxy. It lists the slices in the proxy's namespace that match
// endpointSliceSelector and concatenates what sliceServiceInstances yields for
// each of them. A listing failure is logged and results in nil.
//
// TODO: this code does not return k8s service instances when the proxy's IP is a workload entry.
// To tackle this, we need an ip2instance map like what we have in service entry.
func (esc *endpointSliceController) GetProxyServiceInstances(c *Controller, proxy *model.Proxy) []*model.ServiceInstance {
	slices, listErr := esc.listSlices(proxy.Metadata.Namespace, endpointSliceSelector)
	if listErr != nil {
		log.Errorf("Get endpointslice by index failed: %v", listErr)
		return nil
	}

	var instances []*model.ServiceInstance
	for i := range slices {
		instances = append(instances, esc.sliceServiceInstances(c, slices[i], proxy)...)
	}
	return instances
}
// serviceNameForEndpointSlice extracts the owning service's name from an
// EndpointSlice's labels, i.e. the value stored under v1beta1.LabelServiceName.
// It returns the empty string when that label is absent.
func serviceNameForEndpointSlice(labels map[string]string) string {
	if name, present := labels[v1beta1.LabelServiceName]; present {
		return name
	}
	return ""
}
// sliceServiceInstances returns the service instances that a single
// EndpointSlice contributes for proxy.
//
// For every service resolved from the slice's namespace and service-name
// label, and for every slice port that has both a name and a number and whose
// name exists among the service's ports, an instance is emitted for each slice
// address equal to one of the proxy's IP addresses. When a matched endpoint is
// explicitly not ready and a metrics sink is configured, a
// ProxyStatusEndpointNotReady metric is recorded. FQDN slices yield nothing.
func (esc *endpointSliceController) sliceServiceInstances(c *Controller, slice interface{}, proxy *model.Proxy) []*model.ServiceInstance {
	var instances []*model.ServiceInstance

	wrapped := wrapEndpointSlice(slice)
	if wrapped.AddressType() == v1.AddressTypeFQDN {
		// TODO(https://github.com/istio/istio/issues/34995) support FQDN endpointslice
		return instances
	}

	for _, svc := range c.servicesForNamespacedName(esc.getServiceNamespacedName(wrapped)) {
		pod := c.pods.getPodByProxy(proxy)
		builder := NewEndpointBuilder(c, pod)
		policy := c.exports.EndpointDiscoverabilityPolicy(svc)

		for _, port := range wrapped.Ports() {
			if port.Name == nil || port.Port == nil {
				continue
			}
			svcPort, found := svc.Ports.Get(*port.Name)
			if !found {
				continue
			}

			// A proxy may have several IPs; check each of them.
			for _, proxyIP := range proxy.IPAddresses {
				for _, endpoint := range wrapped.Endpoints() {
					for _, addr := range endpoint.Addresses {
						if addr != proxyIP {
							continue
						}
						istioEndpoint := builder.buildIstioEndpoint(proxyIP, *port.Port, svcPort.Name, policy)
						instances = append(instances, &model.ServiceInstance{
							Service:     svc,
							ServicePort: svcPort,
							Endpoint:    istioEndpoint,
						})
						// Report endpoints that are explicitly marked not ready.
						notReady := endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready
						if notReady && c.opts.Metrics != nil {
							c.opts.Metrics.AddMetric(model.ProxyStatusEndpointNotReady, proxy.ID, proxy.ID, "")
						}
					}
				}
			}
		}
	}
	return instances
}
// forgetEndpoint removes everything recorded for the given EndpointSlice.
//
// Each address in the slice is reported to the pod cache via endpointDeleted,
// keyed by the slice's name and namespace. Then, for every hostname resolved
// from the slice's service that is present in the endpoint cache, the slice's
// entry is deleted. The returned map holds, per such hostname, the endpoints
// that remain cached after the deletion.
func (esc *endpointSliceController) forgetEndpoint(endpoint interface{}) map[host.Name][]*model.IstioEndpoint {
	wrapped := wrapEndpointSlice(endpoint)
	podKey := kube.KeyFunc(wrapped.Name, wrapped.Namespace)
	for _, sliceEndpoint := range wrapped.Endpoints() {
		for _, addr := range sliceEndpoint.Addresses {
			esc.c.pods.endpointDeleted(podKey, addr)
		}
	}

	remaining := map[host.Name][]*model.IstioEndpoint{}
	hostNames := esc.c.hostNamesForNamespacedName(esc.getServiceNamespacedName(wrapped))
	for _, hostName := range hostNames {
		if !esc.endpointCache.Has(hostName) {
			continue
		}
		// Drop this slice from the cache and report what is left for the host.
		esc.endpointCache.Delete(hostName, wrapped.Name)
		remaining[hostName] = esc.endpointCache.Get(hostName)
	}
	return remaining
}
// buildIstioEndpoints refreshes the cache entry for slice es under hostName
// and returns all endpoints currently cached for hostName.
func (esc *endpointSliceController) buildIstioEndpoints(es interface{}, hostName host.Name) []*model.IstioEndpoint {
	esc.updateEndpointCacheForSlice(hostName, es)
	cached := esc.endpointCache.Get(hostName)
	return cached
}
// updateEndpointCacheForSlice rebuilds the Istio endpoints of slice ep for
// hostName and stores them in the endpoint cache under the slice's name.
//
// FQDN slices are skipped without touching the cache. Endpoints explicitly
// marked not ready are dropped unless features.SendUnhealthyEndpoints is set,
// in which case they are kept with an UnHealthy status. Addresses for which
// getPod returns no pod while reporting that one was expected are skipped.
// One Istio endpoint is built per (address, slice port) pair; a missing port
// number or name is passed on as 0 or "".
func (esc *endpointSliceController) updateEndpointCacheForSlice(hostName host.Name, ep interface{}) {
	wrapped := wrapEndpointSlice(ep)
	if wrapped.AddressType() == v1.AddressTypeFQDN {
		// TODO(https://github.com/istio/istio/issues/34995) support FQDN endpointslice
		return
	}

	policy := esc.c.exports.EndpointDiscoverabilityPolicy(esc.c.GetService(hostName))

	var built []*model.IstioEndpoint
	for _, sliceEndpoint := range wrapped.Endpoints() {
		notReady := sliceEndpoint.Conditions.Ready != nil && !*sliceEndpoint.Conditions.Ready
		if notReady && !features.SendUnhealthyEndpoints {
			// Ignore not ready endpoints
			continue
		}

		for _, addr := range sliceEndpoint.Addresses {
			sliceMeta := &metav1.ObjectMeta{Name: wrapped.Name, Namespace: wrapped.Namespace}
			pod, expectedPod := getPod(esc.c, addr, sliceMeta, sliceEndpoint.TargetRef, hostName)
			if expectedPod && pod == nil {
				continue
			}
			builder := NewEndpointBuilder(esc.c, pod)

			// EDS and ServiceEntry use name for service port - ADS will need to map to numbers.
			for _, port := range wrapped.Ports() {
				var (
					number int32
					name   string
				)
				if port.Port != nil {
					number = *port.Port
				}
				if port.Name != nil {
					name = *port.Name
				}

				istioEndpoint := builder.buildIstioEndpoint(addr, number, name, policy)
				if notReady {
					istioEndpoint.HealthStatus = model.UnHealthy
				} else {
					istioEndpoint.HealthStatus = model.Healthy
				}
				built = append(built, istioEndpoint)
			}
		}
	}
	esc.endpointCache.Update(hostName, wrapped.Name, built)
}
// buildIstioEndpointsWithService returns the cached endpoints for hostName,
// i.e. for the service identified by name and namespace.
//
// The service's slices are listed first; if listing fails or finds nothing, a
// debug message is logged and nil is returned. When updateCache is true, the
// cache entry of every listed slice is rebuilt before the cache is read.
func (esc *endpointSliceController) buildIstioEndpointsWithService(name, namespace string, hostName host.Name, updateCache bool) []*model.IstioEndpoint {
	slices, err := esc.listSlices(namespace, endpointSliceSelectorForService(name))
	if len(slices) == 0 || err != nil {
		log.Debugf("endpoint slices of (%s, %s) not found => error %v", name, namespace, err)
		return nil
	}

	// A cache update was requested. Rebuild the endpoints for these slices.
	for i := 0; updateCache && i < len(slices); i++ {
		esc.updateEndpointCacheForSlice(hostName, slices[i])
	}
	return esc.endpointCache.Get(hostName)
}
// getServiceNamespacedName returns the namespaced name of the service that
// slice es belongs to: the slice's own namespace combined with the service
// name taken from its labels. es must implement metav1.Object; otherwise the
// type assertion panics.
func (esc *endpointSliceController) getServiceNamespacedName(es interface{}) types.NamespacedName {
	meta := es.(metav1.Object)
	var nn types.NamespacedName
	nn.Namespace = meta.GetNamespace()
	nn.Name = serviceNameForEndpointSlice(meta.GetLabels())
	return nn
}
// InstancesByPort returns the instances of svc that serve the service port
// numbered reqSvcPort and whose pod labels are a superset of lbls.
//
// The service's slices are listed from the informer cache. nil is returned
// when listing fails (logged at info level), when there are no slices, or when
// svc has no port with number reqSvcPort. FQDN slices are skipped, as are
// addresses for which getPod returns no pod while reporting that one was
// expected. A slice port contributes an instance when it is unnamed or when
// its name equals the service port's name; a missing port number is passed on
// as 0.
func (esc *endpointSliceController) InstancesByPort(c *Controller, svc *model.Service, reqSvcPort int, lbls labels.Instance) []*model.ServiceInstance {
	slices, err := esc.listSlices(svc.Attributes.Namespace, endpointSliceSelectorForService(svc.Attributes.Name))
	if err != nil {
		log.Infof("get endpoints(%s, %s) => error %v", svc.Attributes.Name, svc.Attributes.Namespace, err)
		return nil
	}
	if len(slices) == 0 {
		return nil
	}

	// Locate all ports in the actual service
	svcPort, found := svc.Ports.GetByPort(reqSvcPort)
	if !found {
		return nil
	}

	policy := c.exports.EndpointDiscoverabilityPolicy(svc)

	var instances []*model.ServiceInstance
	for _, raw := range slices {
		wrapped := wrapEndpointSlice(raw)
		if wrapped.AddressType() == v1.AddressTypeFQDN {
			// TODO(https://github.com/istio/istio/issues/34995) support FQDN endpointslice
			continue
		}

		for _, sliceEndpoint := range wrapped.Endpoints() {
			for _, addr := range sliceEndpoint.Addresses {
				sliceMeta := &metav1.ObjectMeta{Name: wrapped.Name, Namespace: wrapped.Namespace}
				pod, expectedPod := getPod(c, addr, sliceMeta, sliceEndpoint.TargetRef, svc.Hostname)
				if expectedPod && pod == nil {
					continue
				}

				var podLabels labels.Instance
				if pod != nil {
					podLabels = pod.Labels
				}
				// check that one of the input labels is a subset of the labels
				if !lbls.SubsetOf(podLabels) {
					continue
				}

				builder := NewEndpointBuilder(esc.c, pod)
				// identify the port by name. K8S EndpointPort uses the service port name
				for _, port := range wrapped.Ports() {
					var number int32
					if port.Port != nil {
						number = *port.Port
					}
					if port.Name != nil && *port.Name != svcPort.Name {
						continue
					}
					istioEndpoint := builder.buildIstioEndpoint(addr, number, svcPort.Name, policy)
					instances = append(instances, &model.ServiceInstance{
						Service:     svc,
						ServicePort: svcPort,
						Endpoint:    istioEndpoint,
					})
				}
			}
		}
	}
	return instances
}
// getLocalityFromTopology builds a "region[/zone[/subzone]]" locality string
// from a topology label map. The region component is always the value under
// NodeRegionLabelGA (empty if absent); the zone and subzone components are
// appended, each preceded by "/", only when their keys are present in the map.
//
// TODO this isn't used now, but we may still want to extract locality from the v1 EndpointSlice instead of node
func getLocalityFromTopology(topology map[string]string) string {
	locality := topology[NodeRegionLabelGA]
	if zone, ok := topology[NodeZoneLabelGA]; ok {
		locality = locality + "/" + zone
	}
	if subzone, ok := topology[label.TopologySubzone.Name]; ok {
		locality = locality + "/" + subzone
	}
	return locality
}
// endpointKey uniquely identifies an endpoint by IP and port name.
// This is used for deduping endpoints across slices.
type endpointKey struct {
	// ip is the endpoint address (endpointSliceCache.Get fills it from IstioEndpoint.Address).
	ip string
	// port is the service port name (filled from IstioEndpoint.ServicePortName).
	port string
}
// endpointSliceCache stores the Istio endpoints built from EndpointSlices,
// grouped by service hostname and then by slice name. All methods take mu
// before touching the map.
type endpointSliceCache struct {
	// mu guards endpointsByServiceAndSlice.
	mu sync.RWMutex
	// endpointsByServiceAndSlice maps service hostname -> slice name -> endpoints.
	endpointsByServiceAndSlice map[host.Name]map[string][]*model.IstioEndpoint
}
// newEndpointSliceCache returns an empty endpointSliceCache whose map is
// already allocated and ready for use.
func newEndpointSliceCache() *endpointSliceCache {
	return &endpointSliceCache{
		endpointsByServiceAndSlice: map[host.Name]map[string][]*model.IstioEndpoint{},
	}
}
// Update records endpoints as the current contents of the named slice for
// hostname, replacing whatever was stored for that slice before.
//
// An empty endpoints list removes the slice's entry instead of storing an
// empty one. Previously the removal was immediately undone by the
// unconditional assignment that followed, so empty entries were retained. The
// per-hostname map is still created when missing, so Has(hostname) reports
// true after any Update, exactly as before.
func (e *endpointSliceCache) Update(hostname host.Name, slice string, endpoints []*model.IstioEndpoint) {
	e.mu.Lock()
	defer e.mu.Unlock()

	bySlice, f := e.endpointsByServiceAndSlice[hostname]
	if !f {
		bySlice = make(map[string][]*model.IstioEndpoint)
		e.endpointsByServiceAndSlice[hostname] = bySlice
	}

	if len(endpoints) == 0 {
		delete(bySlice, slice)
		return
	}

	// We will always overwrite. A conflict here means an endpoint is transitioning
	// from one slice to another. See
	// https://github.com/kubernetes/website/blob/master/content/en/docs/concepts/services-networking/endpoint-slices.md#duplicate-endpoints
	// In this case, we can always assume an update is fresh, although older slices
	// we have not gotten updates for may be stale; therefore we always take the new
	// update.
	bySlice[slice] = endpoints
}
// Delete removes the named slice's entry for hostname. When no slices remain
// for hostname afterwards, the hostname itself is removed from the cache.
func (e *endpointSliceCache) Delete(hostname host.Name, slice string) {
	e.mu.Lock()
	defer e.mu.Unlock()

	bySlice := e.endpointsByServiceAndSlice[hostname]
	delete(bySlice, slice)
	if len(bySlice) > 0 {
		return
	}
	delete(e.endpointsByServiceAndSlice, hostname)
}
// Get returns the endpoints cached for hostname across all of its slices,
// keeping only the first endpoint encountered for each (address, service port
// name) pair. The result is nil when nothing is cached for hostname.
func (e *endpointSliceCache) Get(hostname host.Name) []*model.IstioEndpoint {
	e.mu.RLock()
	defer e.mu.RUnlock()

	seen := make(map[endpointKey]struct{})
	var merged []*model.IstioEndpoint
	for _, sliceEndpoints := range e.endpointsByServiceAndSlice[hostname] {
		for _, candidate := range sliceEndpoints {
			k := endpointKey{ip: candidate.Address, port: candidate.ServicePortName}
			// Duplicates are skipped. Update() already handles conflict
			// resolution, so there is no need to pick the "right" one here.
			if _, dup := seen[k]; !dup {
				seen[k] = struct{}{}
				merged = append(merged, candidate)
			}
		}
	}
	return merged
}
// Has reports whether the cache currently holds an entry for hostname.
func (e *endpointSliceCache) Has(hostname host.Name) bool {
	e.mu.RLock()
	defer e.mu.RUnlock()

	if _, present := e.endpointsByServiceAndSlice[hostname]; present {
		return true
	}
	return false
}
// endpointSliceSelectorForService returns a selector for the slices of the
// service called name: the v1beta1.LabelServiceName label must equal name and
// endpointSliceRequirement must hold as well.
func endpointSliceSelectorForService(name string) klabels.Selector {
	byService := klabels.Set{v1beta1.LabelServiceName: name}
	return byService.AsSelectorPreValidated().Add(*endpointSliceRequirement)
}
// wrapEndpointSlice wraps a *v1.EndpointSlice or *v1beta1.EndpointSlice in an
// endpointSliceWrapper so both versions can be read through one set of
// accessors. It returns nil for any other type.
func wrapEndpointSlice(slice interface{}) *endpointSliceWrapper {
	if es, ok := slice.(*v1.EndpointSlice); ok {
		return &endpointSliceWrapper{ObjectMeta: es.ObjectMeta, v1: es}
	}
	if es, ok := slice.(*v1beta1.EndpointSlice); ok {
		return &endpointSliceWrapper{ObjectMeta: es.ObjectMeta, v1beta1: es}
	}
	return nil
}
// endpointSliceWrapper presents a discovery/v1 or discovery/v1beta1
// EndpointSlice through accessors that return discovery/v1 types.
// wrapEndpointSlice sets exactly one of v1beta1 and v1.
type endpointSliceWrapper struct {
	// ObjectMeta is a copy of the wrapped slice's object metadata.
	metav1.ObjectMeta
	// v1beta1 is the wrapped slice when it is a discovery/v1beta1 object.
	v1beta1 *v1beta1.EndpointSlice
	// v1 is the wrapped slice when it is a discovery/v1 object; the accessors
	// check this field first.
	v1 *v1.EndpointSlice
}
// AddressType returns the wrapped slice's address type as a discovery/v1
// AddressType, converting the v1beta1 value when necessary.
func (esw *endpointSliceWrapper) AddressType() v1.AddressType {
	if esw.v1 == nil {
		return v1.AddressType(esw.v1beta1.AddressType)
	}
	return esw.v1.AddressType
}
// Ports returns the wrapped slice's ports as discovery/v1 EndpointPorts. A v1
// slice's ports are returned as-is; v1beta1 ports are copied field by field
// into a new slice.
func (esw *endpointSliceWrapper) Ports() []v1.EndpointPort {
	if esw.v1 != nil {
		return esw.v1.Ports
	}

	converted := make([]v1.EndpointPort, 0, len(esw.v1beta1.Ports))
	for _, src := range esw.v1beta1.Ports {
		var dst v1.EndpointPort
		dst.Name = src.Name
		dst.Protocol = src.Protocol
		dst.Port = src.Port
		dst.AppProtocol = src.AppProtocol
		converted = append(converted, dst)
	}
	return converted
}
// Endpoints returns the wrapped slice's endpoints as discovery/v1 Endpoints.
//
// A v1 slice's endpoints are returned as-is. v1beta1 endpoints are converted
// into a new slice: Zone is taken from the NodeZoneLabelGA entry of the
// endpoint's topology map (a pointer to "" when that entry is absent), the
// topology map itself is carried over as DeprecatedTopology, and Hints is
// always non-nil, holding the endpoint's zone hints, if any.
func (esw *endpointSliceWrapper) Endpoints() []v1.Endpoint {
	if esw.v1 != nil {
		return esw.v1.Endpoints
	}

	out := make([]v1.Endpoint, len(esw.v1beta1.Endpoints))
	for i, ep := range esw.v1beta1.Endpoints {
		zone := ep.Topology[NodeZoneLabelGA]

		var forZones []v1.ForZone
		if ep.Hints != nil {
			forZones = make([]v1.ForZone, len(ep.Hints.ForZones))
			// Copy from the source hints. Ranging over the freshly allocated
			// destination would only ever produce empty zone names.
			for j, hint := range ep.Hints.ForZones {
				forZones[j] = v1.ForZone{Name: hint.Name}
			}
		}

		out[i] = v1.Endpoint{
			Addresses: ep.Addresses,
			Conditions: v1.EndpointConditions{
				Ready:   ep.Conditions.Ready,
				Serving: ep.Conditions.Serving,
				// Terminating has its own source field; it must not mirror Serving.
				Terminating: ep.Conditions.Terminating,
			},
			Hostname:           ep.Hostname,
			TargetRef:          ep.TargetRef,
			DeprecatedTopology: ep.Topology,
			NodeName:           ep.NodeName,
			Zone:               &zone,
			Hints: &v1.EndpointHints{
				ForZones: forZones,
			},
		}
	}
	return out
}