blob: 9ac7ea6d66a527872722de844a5dd9c9948beaa3 [file] [log] [blame]
// Copyright Istio Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package controller
import (
v1 ""
metav1 ""
klabels ""
listerv1 ""
import (
type endpointsController struct {
var _ kubeEndpointsController = &endpointsController{}
func newEndpointsController(c *Controller) *endpointsController {
informer := filter.NewFilteredSharedIndexInformer(
out := &endpointsController{
kubeEndpoints: kubeEndpoints{
c: c,
informer: informer,
c.registerHandlers(informer, "Endpoints", out.onEvent, endpointsEqual)
return out
func (e *endpointsController) GetProxyServiceInstances(c *Controller, proxy *model.Proxy) []*model.ServiceInstance {
eps, err := listerv1.NewEndpointsLister(e.informer.GetIndexer()).Endpoints(proxy.Metadata.Namespace).List(klabels.Everything())
if err != nil {
log.Errorf("Get endpoints by index failed: %v", err)
return nil
var out []*model.ServiceInstance
for _, ep := range eps {
instances := endpointServiceInstances(c, ep, proxy)
out = append(out, instances...)
return out
func endpointServiceInstances(c *Controller, endpoints *v1.Endpoints, proxy *model.Proxy) []*model.ServiceInstance {
var out []*model.ServiceInstance
for _, svc := range c.servicesForNamespacedName(kube.NamespacedNameForK8sObject(endpoints)) {
pod := c.pods.getPodByProxy(proxy)
builder := NewEndpointBuilder(c, pod)
discoverabilityPolicy := c.exports.EndpointDiscoverabilityPolicy(svc)
for _, ss := range endpoints.Subsets {
for _, port := range ss.Ports {
svcPort, exists := svc.Ports.Get(port.Name)
if !exists {
// consider multiple IP scenarios
for _, ip := range proxy.IPAddresses {
if hasProxyIP(ss.Addresses, ip) || hasProxyIP(ss.NotReadyAddresses, ip) {
istioEndpoint := builder.buildIstioEndpoint(ip, port.Port, svcPort.Name, discoverabilityPolicy)
out = append(out, &model.ServiceInstance{
Endpoint: istioEndpoint,
ServicePort: svcPort,
Service: svc,
if hasProxyIP(ss.NotReadyAddresses, ip) {
if c.opts.Metrics != nil {
c.opts.Metrics.AddMetric(model.ProxyStatusEndpointNotReady, proxy.ID, proxy.ID, "")
return out
func (e *endpointsController) InstancesByPort(c *Controller, svc *model.Service, reqSvcPort int, labels labels.Instance) []*model.ServiceInstance {
item, exists, err := e.informer.GetIndexer().GetByKey(kube.KeyFunc(svc.Attributes.Name, svc.Attributes.Namespace))
if err != nil {
log.Infof("get endpoints(%s, %s) => error %v", svc.Attributes.Name, svc.Attributes.Namespace, err)
return nil
if !exists {
return nil
discoverabilityPolicy := c.exports.EndpointDiscoverabilityPolicy(svc)
// Locate all ports in the actual service
svcPort, exists := svc.Ports.GetByPort(reqSvcPort)
if !exists {
return nil
ep := item.(*v1.Endpoints)
var out []*model.ServiceInstance
for _, ss := range ep.Subsets {
out = append(out, e.buildServiceInstances(ep, ss, ss.Addresses, svc, discoverabilityPolicy, labels, svcPort, model.Healthy)...)
if features.SendUnhealthyEndpoints {
out = append(out, e.buildServiceInstances(ep, ss, ss.NotReadyAddresses, svc, discoverabilityPolicy, labels, svcPort, model.UnHealthy)...)
return out
func (e *endpointsController) getInformer() filter.FilteredSharedIndexInformer {
return e.informer
func (e *endpointsController) onEvent(curr interface{}, event model.Event) error {
ep, ok := curr.(*v1.Endpoints)
if !ok {
tombstone, ok := curr.(cache.DeletedFinalStateUnknown)
if !ok {
log.Errorf("Couldn't get object from tombstone %#v", curr)
return nil
ep, ok = tombstone.Obj.(*v1.Endpoints)
if !ok {
log.Errorf("Tombstone contained object that is not an endpoints %#v", curr)
return nil
return processEndpointEvent(e.c, e, ep.Name, ep.Namespace, event, ep)
func (e *endpointsController) forgetEndpoint(endpoint interface{}) map[host.Name][]*model.IstioEndpoint {
ep := endpoint.(*v1.Endpoints)
key := kube.KeyFunc(ep.Name, ep.Namespace)
for _, ss := range ep.Subsets {
for _, ea := range ss.Addresses {
e.c.pods.endpointDeleted(key, ea.IP)
return make(map[host.Name][]*model.IstioEndpoint)
func (e *endpointsController) buildIstioEndpoints(endpoint interface{}, host host.Name) []*model.IstioEndpoint {
var endpoints []*model.IstioEndpoint
ep := endpoint.(*v1.Endpoints)
discoverabilityPolicy := e.c.exports.EndpointDiscoverabilityPolicy(e.c.GetService(host))
for _, ss := range ep.Subsets {
endpoints = append(endpoints, e.buildIstioEndpointFromAddress(ep, ss, ss.Addresses, host, discoverabilityPolicy, model.Healthy)...)
if features.SendUnhealthyEndpoints {
endpoints = append(endpoints, e.buildIstioEndpointFromAddress(ep, ss, ss.NotReadyAddresses, host, discoverabilityPolicy, model.UnHealthy)...)
return endpoints
func (e *endpointsController) buildServiceInstances(ep *v1.Endpoints, ss v1.EndpointSubset, endpoints []v1.EndpointAddress,
svc *model.Service, discoverabilityPolicy model.EndpointDiscoverabilityPolicy, lbls labels.Instance,
svcPort *model.Port, health model.HealthStatus) []*model.ServiceInstance {
var out []*model.ServiceInstance
for _, ea := range endpoints {
var podLabels labels.Instance
pod, expectedPod := getPod(e.c, ea.IP, &metav1.ObjectMeta{Name: ep.Name, Namespace: ep.Namespace}, ea.TargetRef, svc.Hostname)
if pod == nil && expectedPod {
if pod != nil {
podLabels = pod.Labels
// check that one of the input labels is a subset of the labels
if !lbls.SubsetOf(podLabels) {
builder := NewEndpointBuilder(e.c, pod)
// identify the port by name. K8S EndpointPort uses the service port name
for _, port := range ss.Ports {
if port.Name == "" || // 'name optional if single port is defined'
svcPort.Name == port.Name {
istioEndpoint := builder.buildIstioEndpoint(ea.IP, port.Port, svcPort.Name, discoverabilityPolicy)
istioEndpoint.HealthStatus = health
out = append(out, &model.ServiceInstance{
Endpoint: istioEndpoint,
ServicePort: svcPort,
Service: svc,
return out
func (e *endpointsController) buildIstioEndpointFromAddress(ep *v1.Endpoints, ss v1.EndpointSubset, endpoints []v1.EndpointAddress,
host host.Name, discoverabilityPolicy model.EndpointDiscoverabilityPolicy, health model.HealthStatus) []*model.IstioEndpoint {
var istioEndpoints []*model.IstioEndpoint
for _, ea := range endpoints {
pod, expectedPod := getPod(e.c, ea.IP, &metav1.ObjectMeta{Name: ep.Name, Namespace: ep.Namespace}, ea.TargetRef, host)
if pod == nil && expectedPod {
builder := NewEndpointBuilder(e.c, pod)
// EDS and ServiceEntry use name for service port - ADS will need to map to numbers.
for _, port := range ss.Ports {
istioEndpoint := builder.buildIstioEndpoint(ea.IP, port.Port, port.Name, discoverabilityPolicy)
istioEndpoint.HealthStatus = health
istioEndpoints = append(istioEndpoints, istioEndpoint)
return istioEndpoints
func (e *endpointsController) buildIstioEndpointsWithService(name, namespace string, host host.Name, _ bool) []*model.IstioEndpoint {
ep, err := listerv1.NewEndpointsLister(e.informer.GetIndexer()).Endpoints(namespace).Get(name)
if err != nil || ep == nil {
log.Debugf("endpoints(%s, %s) not found => error %v", name, namespace, err)
return nil
return e.buildIstioEndpoints(ep, host)
func (e *endpointsController) getServiceNamespacedName(ep interface{}) types.NamespacedName {
endpoint := ep.(*v1.Endpoints)
return kube.NamespacedNameForK8sObject(endpoint)
// endpointsEqual returns true if the two endpoints are the same in aspects Pilot cares about
// This currently means only looking at "Ready" endpoints
func endpointsEqual(first, second interface{}) bool {
a := first.(*v1.Endpoints)
b := second.(*v1.Endpoints)
if len(a.Subsets) != len(b.Subsets) {
return false
for i := range a.Subsets {
if !portsEqual(a.Subsets[i].Ports, b.Subsets[i].Ports) {
return false
if !addressesEqual(a.Subsets[i].Addresses, b.Subsets[i].Addresses) {
return false
return true