// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package serviceentry
import (
"fmt"
"reflect"
"strconv"
"sync"
"time"

networking "istio.io/api/networking/v1alpha3"
istiolog "istio.io/pkg/log"
"k8s.io/apimachinery/pkg/types"

"github.com/apache/dubbo-go-pixiu/pilot/pkg/features"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model/status"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/provider"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/util/workloadinstances"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/util/informermetric"
"github.com/apache/dubbo-go-pixiu/pkg/cluster"
"github.com/apache/dubbo-go-pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pkg/config/constants"
"github.com/apache/dubbo-go-pixiu/pkg/config/host"
"github.com/apache/dubbo-go-pixiu/pkg/config/labels"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/gvk"
"github.com/apache/dubbo-go-pixiu/pkg/network"
"github.com/apache/dubbo-go-pixiu/pkg/queue"
"github.com/apache/dubbo-go-pixiu/pkg/util/protomarshal"
)
var (
_ serviceregistry.Instance = &Controller{}
log = istiolog.RegisterScope("serviceentry", "ServiceEntry registry", 0)
)
// instancesKey acts as a key to identify all instances for a given hostname/namespace pair.
// It is mostly used as an index.
type instancesKey struct {
hostname host.Name
namespace string
}
func makeInstanceKey(i *model.ServiceInstance) instancesKey {
return instancesKey{i.Service.Hostname, i.Service.Attributes.Namespace}
}
type externalConfigType int
const (
serviceEntryConfigType externalConfigType = iota
workloadEntryConfigType
podConfigType
)
// configKey uniquely identifies a config object managed by this registry (ServiceEntry and WorkloadEntry).
type configKey struct {
kind externalConfigType
name string
namespace string
}
// Controller communicates with ServiceEntry CRDs and monitors for changes.
type Controller struct {
XdsUpdater model.XDSUpdater
store model.ConfigStore
clusterID cluster.ID
// This lock guards multi-step operations on the stores below. For example, some cases
// require deleting all existing instances before inserting the updated ones.
mutex sync.RWMutex
serviceInstances serviceInstancesStore
// NOTE: historically, one index for both WorkloadEntry(s) and Pod(s);
// beware of naming collisions
workloadInstances workloadinstances.Index
services serviceStore
// edsQueue makes EDS updates run serially, so that a stale update cannot override a newer one.
// There are multiple threads calling edsUpdate; if they all shared one lock, they would see an
// obvious performance degradation, so a queue is used instead.
edsQueue queue.Instance
workloadHandlers []func(*model.WorkloadInstance, model.Event)
// callback function used to determine the network ID of a workload, based on its IP and labels.
networkIDCallback func(IP string, labels labels.Instance) network.ID
processServiceEntry bool
model.NetworkGatewaysHandler
}
type Option func(*Controller)
func WithClusterID(clusterID cluster.ID) Option {
return func(o *Controller) {
o.clusterID = clusterID
}
}
func WithNetworkIDCb(cb func(endpointIP string, labels labels.Instance) network.ID) Option {
return func(o *Controller) {
o.networkIDCallback = cb
}
}
// NewController creates a new ServiceEntry discovery service.
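//
// A minimal wiring sketch (configController, store, xdsUpdater and stop are assumed to be
// supplied by the caller):
//
//	c := NewController(configController, store, xdsUpdater, WithClusterID("test-cluster"))
//	go c.Run(stop)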
func NewController(configController model.ConfigStoreController, store model.ConfigStore, xdsUpdater model.XDSUpdater,
options ...Option) *Controller {
s := newController(store, xdsUpdater, options...)
if configController != nil {
configController.RegisterEventHandler(gvk.ServiceEntry, s.serviceEntryHandler)
configController.RegisterEventHandler(gvk.WorkloadEntry, s.workloadEntryHandler)
_ = configController.SetWatchErrorHandler(informermetric.ErrorHandlerForCluster(s.clusterID))
}
return s
}
// NewWorkloadEntryController creates a new WorkloadEntry discovery service.
func NewWorkloadEntryController(configController model.ConfigStoreController, store model.ConfigStore, xdsUpdater model.XDSUpdater,
options ...Option) *Controller {
s := newController(store, xdsUpdater, options...)
// Disable service entry processing for workload entry controller.
s.processServiceEntry = false
for _, o := range options {
o(s)
}
if configController != nil {
configController.RegisterEventHandler(gvk.WorkloadEntry, s.workloadEntryHandler)
_ = configController.SetWatchErrorHandler(informermetric.ErrorHandlerForCluster(s.clusterID))
}
return s
}
func newController(store model.ConfigStore, xdsUpdater model.XDSUpdater, options ...Option) *Controller {
s := &Controller{
XdsUpdater: xdsUpdater,
store: store,
serviceInstances: serviceInstancesStore{
ip2instance: map[string][]*model.ServiceInstance{},
instances: map[instancesKey]map[configKey][]*model.ServiceInstance{},
instancesBySE: map[types.NamespacedName]map[configKey][]*model.ServiceInstance{},
},
workloadInstances: workloadinstances.NewIndex(),
services: serviceStore{
servicesBySE: map[types.NamespacedName][]*model.Service{},
},
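// The argument to queue.NewQueue is the delay before a task that returned an error is retried.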
edsQueue: queue.NewQueue(time.Second),
processServiceEntry: true,
}
for _, o := range options {
o(s)
}
return s
}
// convertWorkloadEntry converts the WorkloadEntry from Config.Spec and merges the config's metadata labels into it.
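// For example, spec labels {app: ratings, version: v1} merged with metadata labels
// {version: v2} produce {app: ratings, version: v2}.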
func convertWorkloadEntry(cfg config.Config) *networking.WorkloadEntry {
wle := cfg.Spec.(*networking.WorkloadEntry)
if wle == nil {
return nil
}
labels := make(map[string]string, len(wle.Labels)+len(cfg.Labels))
for k, v := range wle.Labels {
labels[k] = v
}
// merge labels from the metadata with those from the spec, giving precedence to the metadata
for k, v := range cfg.Labels {
labels[k] = v
}
// shallow copy
copied := &networking.WorkloadEntry{}
protomarshal.ShallowCopy(copied, wle)
copied.Labels = labels
return copied
}
// workloadEntryHandler defines the handler for workload entries
func (s *Controller) workloadEntryHandler(old, curr config.Config, event model.Event) {
log.Debugf("Handle event %s for workload entry %s/%s", event, curr.Namespace, curr.Name)
var oldWle *networking.WorkloadEntry
if old.Spec != nil {
oldWle = convertWorkloadEntry(old)
}
wle := convertWorkloadEntry(curr)
curr.Spec = wle
key := configKey{
kind: workloadEntryConfigType,
name: curr.Name,
namespace: curr.Namespace,
}
// If an entry is unhealthy, we will mark this as a delete instead
// This ensures we do not track unhealthy endpoints
if features.WorkloadEntryHealthChecks && !isHealthy(curr) {
event = model.EventDelete
}
wi := s.convertWorkloadEntryToWorkloadInstance(curr, s.Cluster())
if wi != nil && !wi.DNSServiceEntryOnly {
// fire off the k8s handlers
for _, h := range s.workloadHandlers {
h(wi, event)
}
}
// instancesUpdated includes instances that are new, updated, or unchanged; in other words, the current state.
instancesUpdated := []*model.ServiceInstance{}
instancesDeleted := []*model.ServiceInstance{}
fullPush := false
configsUpdated := map[model.ConfigKey]struct{}{}
addConfigs := func(se *networking.ServiceEntry, services []*model.Service) {
// If the ServiceEntry's resolution is DNS (including DNS round robin), a full push is required.
// TODO: maybe cds?
if se.Resolution == networking.ServiceEntry_DNS || se.Resolution == networking.ServiceEntry_DNS_ROUND_ROBIN {
fullPush = true
for key, value := range getUpdatedConfigs(services) {
configsUpdated[key] = value
}
}
}
cfgs, _ := s.store.List(gvk.ServiceEntry, curr.Namespace)
currSes := getWorkloadServiceEntries(cfgs, wle)
var oldSes map[types.NamespacedName]*config.Config
if oldWle != nil {
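// If the labels did not change, the same ServiceEntries still select this WorkloadEntry,
// so reuse currSes instead of scanning the configs again.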
if labels.Instance(oldWle.Labels).Equals(curr.Labels) {
oldSes = currSes
} else {
oldSes = getWorkloadServiceEntries(cfgs, oldWle)
}
}
unSelected := difference(oldSes, currSes)
log.Debugf("workloadEntry %s/%s selected %v, unSelected %v serviceEntry", curr.Namespace, curr.Name, currSes, unSelected)
s.mutex.Lock()
for namespacedName, cfg := range currSes {
services := s.services.getServices(namespacedName)
se := cfg.Spec.(*networking.ServiceEntry)
if wi.DNSServiceEntryOnly && se.Resolution != networking.ServiceEntry_DNS &&
se.Resolution != networking.ServiceEntry_DNS_ROUND_ROBIN {
log.Debugf("skip selecting workload instance %v/%v for DNS service entry %v", wi.Namespace, wi.Name, se.Hosts)
continue
}
instance := s.convertWorkloadEntryToServiceInstances(wle, services, se, &key, s.Cluster())
instancesUpdated = append(instancesUpdated, instance...)
addConfigs(se, services)
}
for _, namespacedName := range unSelected {
services := s.services.getServices(namespacedName)
cfg := oldSes[namespacedName]
se := cfg.Spec.(*networking.ServiceEntry)
if wi.DNSServiceEntryOnly && se.Resolution != networking.ServiceEntry_DNS &&
se.Resolution != networking.ServiceEntry_DNS_ROUND_ROBIN {
log.Debugf("skip selecting workload instance %v/%v for DNS service entry %v", wi.Namespace, wi.Name, se.Hosts)
continue
}
instance := s.convertWorkloadEntryToServiceInstances(wle, services, se, &key, s.Cluster())
instancesDeleted = append(instancesDeleted, instance...)
addConfigs(se, services)
}
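// Remove instances that belonged to ServiceEntries which no longer select this WorkloadEntry.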
s.serviceInstances.deleteInstances(key, instancesDeleted)
if event == model.EventDelete {
s.workloadInstances.Delete(wi)
s.serviceInstances.deleteInstances(key, instancesUpdated)
} else {
s.workloadInstances.Insert(wi)
s.serviceInstances.updateInstances(key, instancesUpdated)
}
s.mutex.Unlock()
allInstances := append(instancesUpdated, instancesDeleted...)
if !fullPush {
// trigger full xds push to the related sidecar proxy
if event == model.EventAdd {
s.XdsUpdater.ProxyUpdate(s.Cluster(), wle.Address)
}
s.edsUpdate(allInstances)
return
}
// update eds cache only
s.edsCacheUpdate(allInstances)
pushReq := &model.PushRequest{
Full: true,
ConfigsUpdated: configsUpdated,
Reason: []model.TriggerReason{model.EndpointUpdate},
}
// trigger a full push
s.XdsUpdater.ConfigUpdate(pushReq)
}
// getUpdatedConfigs returns the config keys of the related service entries, used when a full push is triggered.
func getUpdatedConfigs(services []*model.Service) map[model.ConfigKey]struct{} {
configsUpdated := map[model.ConfigKey]struct{}{}
for _, svc := range services {
configsUpdated[model.ConfigKey{
Kind: gvk.ServiceEntry,
Name: string(svc.Hostname),
Namespace: svc.Attributes.Namespace,
}] = struct{}{}
}
return configsUpdated
}
// serviceEntryHandler defines the handler for service entries
func (s *Controller) serviceEntryHandler(_, curr config.Config, event model.Event) {
currentServiceEntry := curr.Spec.(*networking.ServiceEntry)
cs := convertServices(curr)
configsUpdated := map[model.ConfigKey]struct{}{}
key := types.NamespacedName{Namespace: curr.Namespace, Name: curr.Name}
s.mutex.Lock()
// For add/delete events we always do a full push. For update events we do a full push
// only when the services have changed - otherwise, we just push endpoint updates.
var addedSvcs, deletedSvcs, updatedSvcs, unchangedSvcs []*model.Service
switch event {
case model.EventUpdate:
addedSvcs, deletedSvcs, updatedSvcs, unchangedSvcs = servicesDiff(s.services.getServices(key), cs)
s.services.updateServices(key, cs)
case model.EventDelete:
deletedSvcs = cs
s.services.deleteServices(key)
case model.EventAdd:
addedSvcs = cs
s.services.updateServices(key, cs)
default:
// this should not happen
unchangedSvcs = cs
}
serviceInstancesByConfig, serviceInstances := s.buildServiceInstances(curr, cs)
oldInstances := s.serviceInstances.getServiceEntryInstances(key)
for configKey, old := range oldInstances {
s.serviceInstances.deleteInstances(configKey, old)
}
if event == model.EventDelete {
s.serviceInstances.deleteAllServiceEntryInstances(key)
} else {
// Update the indexes with new instances.
for ckey, value := range serviceInstancesByConfig {
s.serviceInstances.addInstances(ckey, value)
}
s.serviceInstances.updateServiceEntryInstances(key, serviceInstancesByConfig)
}
shard := model.ShardKeyFromRegistry(s)
for _, svc := range addedSvcs {
s.XdsUpdater.SvcUpdate(shard, string(svc.Hostname), svc.Attributes.Namespace, model.EventAdd)
configsUpdated[makeConfigKey(svc)] = struct{}{}
}
for _, svc := range updatedSvcs {
s.XdsUpdater.SvcUpdate(shard, string(svc.Hostname), svc.Attributes.Namespace, model.EventUpdate)
configsUpdated[makeConfigKey(svc)] = struct{}{}
}
// If service entry is deleted, call SvcUpdate to cleanup endpoint shards for services.
for _, svc := range deletedSvcs {
instanceKey := instancesKey{namespace: svc.Attributes.Namespace, hostname: svc.Hostname}
// Multiple service entries for the same host can reside in the same namespace.
// Delete endpoint shards only if there are no remaining service instances.
if len(s.serviceInstances.getByKey(instanceKey)) == 0 {
s.XdsUpdater.SvcUpdate(shard, string(svc.Hostname), svc.Attributes.Namespace, model.EventDelete)
}
configsUpdated[makeConfigKey(svc)] = struct{}{}
}
// A service can be absent from updatedSvcs (i.e. unchanged) while its endpoints have changed.
// If this service entry had endpoints with IPs (i.e. resolution STATIC), then we do EDS update.
// If the service entry had endpoints with FQDNs (i.e. resolution DNS), then we need to do
// full push (as fqdn endpoints go via strict_dns clusters in cds).
if len(unchangedSvcs) > 0 {
if currentServiceEntry.Resolution == networking.ServiceEntry_DNS || currentServiceEntry.Resolution == networking.ServiceEntry_DNS_ROUND_ROBIN {
for _, svc := range unchangedSvcs {
configsUpdated[makeConfigKey(svc)] = struct{}{}
}
}
}
s.mutex.Unlock()
fullPush := len(configsUpdated) > 0
// No full push is needed; only endpoint updates for unchanged services are required.
if !fullPush {
s.edsUpdate(serviceInstances)
return
}
// When doing a full push, the added, updated, and unchanged non-DNS services trigger an
// EDS update so that endpoint shards are refreshed.
allServices := make([]*model.Service, 0, len(addedSvcs)+len(updatedSvcs)+len(unchangedSvcs))
nonDNSServices := make([]*model.Service, 0, len(addedSvcs)+len(updatedSvcs)+len(unchangedSvcs))
allServices = append(allServices, addedSvcs...)
allServices = append(allServices, updatedSvcs...)
allServices = append(allServices, unchangedSvcs...)
for _, svc := range allServices {
if !(svc.Resolution == model.DNSLB || svc.Resolution == model.DNSRoundRobinLB) {
nonDNSServices = append(nonDNSServices, svc)
}
}
// instance keys of the non-DNS services
keys := map[instancesKey]struct{}{}
for _, svc := range nonDNSServices {
keys[instancesKey{hostname: svc.Hostname, namespace: curr.Namespace}] = struct{}{}
}
s.queueEdsEvent(keys, s.doEdsCacheUpdate)
pushReq := &model.PushRequest{
Full: true,
ConfigsUpdated: configsUpdated,
Reason: []model.TriggerReason{model.ServiceUpdate},
}
s.XdsUpdater.ConfigUpdate(pushReq)
}
// WorkloadInstanceHandler defines the handler for service instances generated by other registries
func (s *Controller) WorkloadInstanceHandler(wi *model.WorkloadInstance, event model.Event) {
log.Debugf("Handle event %s for workload instance (%s/%s) in namespace %s", event,
wi.Kind, wi.Endpoint.Address, wi.Namespace)
key := configKey{
kind: podConfigType,
name: wi.Name,
namespace: wi.Namespace,
}
// Used to indicate if this event was fired for a pod->workloadentry conversion
// and that the event can be ignored due to no relevant change in the workloadentry
redundantEventForPod := false
var addressToDelete string
s.mutex.Lock()
// This instance is from a pod. Store it in a separate map so that the refreshIndexes
// function can use these as well as the ones from the store.
switch event {
case model.EventDelete:
redundantEventForPod = s.workloadInstances.Delete(wi) == nil
default: // add or update
if old := s.workloadInstances.Insert(wi); old != nil {
if old.Endpoint.Address != wi.Endpoint.Address {
addressToDelete = old.Endpoint.Address
}
// If multiple k8s services select the same pod, or a service has multiple ports,
// we may get multiple events; ignore them, as we only care about the endpoint IP itself.
if model.WorkloadInstancesEqual(old, wi) {
// ignore the update as nothing has changed
redundantEventForPod = true
}
}
}
if redundantEventForPod {
s.mutex.Unlock()
return
}
// We will only select entries in the same namespace
cfgs, _ := s.store.List(gvk.ServiceEntry, wi.Namespace)
if len(cfgs) == 0 {
s.mutex.Unlock()
return
}
instances := []*model.ServiceInstance{}
instancesDeleted := []*model.ServiceInstance{}
for _, cfg := range cfgs {
se := cfg.Spec.(*networking.ServiceEntry)
if se.WorkloadSelector == nil || !labels.Instance(se.WorkloadSelector.Labels).SubsetOf(wi.Endpoint.Labels) {
// Not a match, skip this one
continue
}
seNamespacedName := types.NamespacedName{Namespace: cfg.Namespace, Name: cfg.Name}
services := s.services.getServices(seNamespacedName)
instance := convertWorkloadInstanceToServiceInstance(wi.Endpoint, services, se)
instances = append(instances, instance...)
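// If the workload's IP changed, also record delete instances for the old address so that
// stale endpoints are removed from the indexes.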
if addressToDelete != "" {
for _, i := range instance {
di := i.DeepCopy()
di.Endpoint.Address = addressToDelete
instancesDeleted = append(instancesDeleted, di)
}
s.serviceInstances.deleteServiceEntryInstances(seNamespacedName, key)
} else if event == model.EventDelete {
s.serviceInstances.deleteServiceEntryInstances(seNamespacedName, key)
} else {
s.serviceInstances.updateServiceEntryInstancesPerConfig(seNamespacedName, key, instance)
}
}
if len(instancesDeleted) > 0 {
s.serviceInstances.deleteInstances(key, instancesDeleted)
}
if event == model.EventDelete {
s.serviceInstances.deleteInstances(key, instances)
} else {
s.serviceInstances.updateInstances(key, instances)
}
s.mutex.Unlock()
s.edsUpdate(instances)
}
func (s *Controller) Provider() provider.ID {
return provider.External
}
func (s *Controller) Cluster() cluster.ID {
return s.clusterID
}
// AppendServiceHandler adds a service resource event handler. The ServiceEntry registry does not use these handlers.
func (s *Controller) AppendServiceHandler(_ func(*model.Service, model.Event)) {}
// AppendWorkloadHandler adds a workload instance event handler. Registered handlers are notified when WorkloadEntries change.
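//
// A sketch of registering a handler (the handler body is illustrative only):
//
//	c.AppendWorkloadHandler(func(wi *model.WorkloadInstance, ev model.Event) {
//		log.Debugf("workload %s/%s changed: %v", wi.Namespace, wi.Name, ev)
//	})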
func (s *Controller) AppendWorkloadHandler(h func(*model.WorkloadInstance, model.Event)) {
s.workloadHandlers = append(s.workloadHandlers, h)
}
// Run is used by some controllers to execute background jobs after init is done.
func (s *Controller) Run(stopCh <-chan struct{}) {
s.edsQueue.Run(stopCh)
}
// HasSynced always returns true for SE
func (s *Controller) HasSynced() bool {
return true
}
// Services lists the declarations of all services in the system.
func (s *Controller) Services() []*model.Service {
s.mutex.Lock()
allServices := s.services.getAllServices()
out := make([]*model.Service, 0, len(allServices))
if s.services.allocateNeeded {
autoAllocateIPs(allServices)
s.services.allocateNeeded = false
}
s.mutex.Unlock()
for _, svc := range allServices {
// Return a shallow copy that carries `AutoAllocatedIPv4Address` and `AutoAllocatedIPv6Address`;
// returning the stored pointer directly would race with `BuildNameTable`.
// nolint: govet
shallowSvc := *svc
out = append(out, &shallowSvc)
}
return out
}
// GetService retrieves a service by host name if it exists.
// NOTE: The service entry implementation is used only for tests.
func (s *Controller) GetService(hostname host.Name) *model.Service {
if !s.processServiceEntry {
return nil
}
// TODO(@hzxuzhonghu): only get the specific service instead of converting all the serviceEntries
services := s.Services()
for _, service := range services {
if service.Hostname == hostname {
return service
}
}
return nil
}
// InstancesByPort retrieves instances for a service on the given ports with labels that
// match any of the supplied labels. All instances match an empty tag list.
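//
// For example, passing port 0 and a nil label set returns every instance of the service:
//
//	all := c.InstancesByPort(svc, 0, nil)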
func (s *Controller) InstancesByPort(svc *model.Service, port int, labels labels.Instance) []*model.ServiceInstance {
out := make([]*model.ServiceInstance, 0)
s.mutex.RLock()
instanceLists := s.serviceInstances.getByKey(instancesKey{svc.Hostname, svc.Attributes.Namespace})
s.mutex.RUnlock()
for _, instance := range instanceLists {
if labels.SubsetOf(instance.Endpoint.Labels) &&
portMatchSingle(instance, port) {
out = append(out, instance)
}
}
return out
}
// ResyncEDS will do a full EDS update. This is needed for some tests where we have many configs loaded without calling
// the config handlers.
// This should probably not be used in production code.
func (s *Controller) ResyncEDS() {
s.mutex.RLock()
allInstances := s.serviceInstances.getAll()
s.mutex.RUnlock()
s.edsUpdate(allInstances)
}
// edsUpdate triggers EDS pushes serially so that instances read at t1 cannot accidentally
// override those read at t2 when multiple threads run this function. Queueing ensures the
// latest update wins.
func (s *Controller) edsUpdate(instances []*model.ServiceInstance) {
// Find all keys we need to lookup
keys := map[instancesKey]struct{}{}
for _, i := range instances {
keys[makeInstanceKey(i)] = struct{}{}
}
s.queueEdsEvent(keys, s.doEdsUpdate)
}
// edsCacheUpdate updates the EDS cache serially so that instances read at t1 cannot
// accidentally override those read at t2 when multiple threads run this function.
// Queueing ensures the latest update wins.
func (s *Controller) edsCacheUpdate(instances []*model.ServiceInstance) {
// Find all keys we need to lookup
keys := map[instancesKey]struct{}{}
for _, i := range instances {
keys[makeInstanceKey(i)] = struct{}{}
}
s.queueEdsEvent(keys, s.doEdsCacheUpdate)
}
// queueEdsEvent processes eds events sequentially for the passed keys and invokes the passed function.
func (s *Controller) queueEdsEvent(keys map[instancesKey]struct{}, edsFn func(keys map[instancesKey]struct{})) {
// waitCh is closed once the eds update has finished
waitCh := make(chan struct{})
// push a task that updates the eds endpoint shards
s.edsQueue.Push(func() error {
defer close(waitCh)
edsFn(keys)
return nil
})
select {
case <-waitCh:
return
// To prevent a goroutine leak in tests,
// in case the queue is stopped before the task has been executed.
case <-s.edsQueue.Closed():
return
}
}
// doEdsCacheUpdate invokes XdsUpdater's EDSCacheUpdate to update the endpoint shards without triggering an xds push.
func (s *Controller) doEdsCacheUpdate(keys map[instancesKey]struct{}) {
endpoints := s.buildEndpoints(keys)
shard := model.ShardKeyFromRegistry(s)
// No endpoints remain for these keys; treat this as a delete.
if len(endpoints) == 0 {
for k := range keys {
s.XdsUpdater.EDSCacheUpdate(shard, string(k.hostname), k.namespace, nil)
}
} else {
for k, eps := range endpoints {
s.XdsUpdater.EDSCacheUpdate(shard, string(k.hostname), k.namespace, eps)
}
}
}
// doEdsUpdate invokes XdsUpdater's EDSUpdate to update the endpoint shards and trigger an EDS push.
func (s *Controller) doEdsUpdate(keys map[instancesKey]struct{}) {
endpoints := s.buildEndpoints(keys)
shard := model.ShardKeyFromRegistry(s)
// No endpoints remain for these keys; treat this as a delete.
if len(endpoints) == 0 {
for k := range keys {
s.XdsUpdater.EDSUpdate(shard, string(k.hostname), k.namespace, nil)
}
} else {
for k, eps := range endpoints {
s.XdsUpdater.EDSUpdate(shard, string(k.hostname), k.namespace, eps)
}
}
}
// buildEndpoints builds endpoints for the instance keys.
func (s *Controller) buildEndpoints(keys map[instancesKey]struct{}) map[instancesKey][]*model.IstioEndpoint {
var endpoints map[instancesKey][]*model.IstioEndpoint
allInstances := []*model.ServiceInstance{}
s.mutex.RLock()
for key := range keys {
i := s.serviceInstances.getByKey(key)
allInstances = append(allInstances, i...)
}
s.mutex.RUnlock()
if len(allInstances) > 0 {
endpoints = make(map[instancesKey][]*model.IstioEndpoint)
for _, instance := range allInstances {
port := instance.ServicePort
key := makeInstanceKey(instance)
endpoints[key] = append(endpoints[key],
&model.IstioEndpoint{
Address: instance.Endpoint.Address,
EndpointPort: instance.Endpoint.EndpointPort,
ServicePortName: port.Name,
Labels: instance.Endpoint.Labels,
ServiceAccount: instance.Endpoint.ServiceAccount,
Network: instance.Endpoint.Network,
Locality: instance.Endpoint.Locality,
LbWeight: instance.Endpoint.LbWeight,
TLSMode: instance.Endpoint.TLSMode,
WorkloadName: instance.Endpoint.WorkloadName,
Namespace: instance.Endpoint.Namespace,
})
}
}
return endpoints
}
// portMatchSingle returns true if the given port is 0 (match any) or equals the instance's service port.
func portMatchSingle(instance *model.ServiceInstance, port int) bool {
return port == 0 || port == instance.ServicePort.Port
}
// GetProxyServiceInstances lists service instances co-located with a given proxy
// NOTE: The service objects in these instances do not have the auto allocated IP set.
func (s *Controller) GetProxyServiceInstances(node *model.Proxy) []*model.ServiceInstance {
out := make([]*model.ServiceInstance, 0)
s.mutex.RLock()
defer s.mutex.RUnlock()
for _, ip := range node.IPAddresses {
instances := s.serviceInstances.getByIP(ip)
for _, i := range instances {
// Insert all instances for this IP for services within the same namespace. This ensures we
// match the Kubernetes logic where Services do not cross namespace boundaries, and avoids
// the possibility of other namespaces inserting service instances into namespaces they do
// not control.
if node.Metadata.Namespace == "" || i.Service.Attributes.Namespace == node.Metadata.Namespace {
out = append(out, i)
}
}
}
return out
}
func (s *Controller) GetProxyWorkloadLabels(proxy *model.Proxy) labels.Instance {
s.mutex.RLock()
defer s.mutex.RUnlock()
for _, ip := range proxy.IPAddresses {
instances := s.serviceInstances.getByIP(ip)
for _, instance := range instances {
return instance.Endpoint.Labels
}
}
return nil
}
// GetIstioServiceAccounts implements the model.ServiceAccounts operation.
// For service entries using workload entries, or a mix of workload entries and pods,
// this function returns the appropriate service accounts used by them.
func (s *Controller) GetIstioServiceAccounts(svc *model.Service, ports []int) []string {
// service entries with built in endpoints have SANs as a dedicated field.
// Those with selector labels will have service accounts embedded inside workloadEntries and pods as well.
return model.GetServiceAccounts(svc, ports, s)
}
func (s *Controller) NetworkGateways() []model.NetworkGateway {
// TODO implement mesh networks loading logic from kube controller if needed
return nil
}
func (s *Controller) MCSServices() []model.MCSServiceInfo {
return nil
}
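// servicesDiff compares the old and new services of a ServiceEntry and returns the
// added, deleted, updated and unchanged services, matching them by hostname.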
func servicesDiff(os []*model.Service, ns []*model.Service) ([]*model.Service, []*model.Service, []*model.Service, []*model.Service) {
var added, deleted, updated, unchanged []*model.Service
oldServiceHosts := make(map[host.Name]*model.Service, len(os))
newServiceHosts := make(map[host.Name]*model.Service, len(ns))
for _, s := range os {
oldServiceHosts[s.Hostname] = s
}
for _, s := range ns {
newServiceHosts[s.Hostname] = s
}
for _, s := range os {
newSvc, f := newServiceHosts[s.Hostname]
if !f {
deleted = append(deleted, s)
} else if !reflect.DeepEqual(s, newSvc) {
updated = append(updated, newSvc)
} else {
unchanged = append(unchanged, newSvc)
}
}
for _, s := range ns {
if _, f := oldServiceHosts[s.Hostname]; !f {
added = append(added, s)
}
}
return added, deleted, updated, unchanged
}
// autoAllocateIPs automatically allocates IPs for service entry services WITHOUT an
// address field, if the hostname is not a wildcard and the resolution is not NONE.
// The IPv4 addresses are allocated from the reserved Class E subnet (240.240.0.0/16),
// which is not reachable outside the pod, and the IPv6 addresses from the reserved
// benchmarking range (2001:2::/48) defined in RFC 5180. When DNS
// capture is enabled, Envoy will resolve the DNS to these IPs. The
// listeners for TCP services will also be set up on these IPs. The
// IPs allocated to a service entry may differ from istiod to istiod
// but it does not matter because these IPs only affect the listener
// IPs on a given proxy managed by a given istiod.
//
// NOTE: If DNS capture is not enabled by the proxy, the automatically
// allocated IP addresses do not take effect.
//
// The current algorithm to allocate IPs is deterministic across all istiods.
// At stable state, given two istiods with exact same set of services, there should
// be no change in XDS as the algorithm is just a dumb iterative one that allocates sequentially.
//
// TODO: Rather than sequentially allocate IPs, switch to a hash based allocation mechanism so that
// deletion of the oldest service entry does not cause change of IPs for all other service entries.
// Currently, the sequential allocation will result in unnecessary XDS reloads (lds/rds) when a
// service entry with auto allocated IP is deleted. We are trading off a perf problem (xds reload)
// for a usability problem (e.g., multiple cloud SQL or AWS RDS tcp services with no VIPs end up having
// the same port, causing traffic to go to the wrong place). Once we move to a deterministic hash-based
// allocation with deterministic collision resolution, the perf problem will go away. If the collision guarantee
// cannot be made within the IP address space we have (which is about 64K services), then we may need to
// have the sequential allocation algorithm as a fallback when too many collisions take place.
func autoAllocateIPs(services []*model.Service) []*model.Service {
// The third octet (i) ranges over 240.240.0.(j) to 240.240.255.(j) and the fourth
// octet (j) ranges over 240.240.(i).1 to 240.240.(i).254, so we can capture both in
// a single integer variable: given X, i is X/255 and j is X%255.
// To avoid allocating 240.240.(i).0, if X % 255 is 0, increment X.
// For example, when X=510, the resulting IP would be 240.240.2.0 (invalid),
// so we bump X to 511, and the resulting IP is 240.240.2.1.
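// Two more worked examples of the mapping: X=1 yields 240.240.0.1 and X=256 yields 240.240.1.1.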
maxIPs := 255 * 255 // are we going to exceed this limit by processing 64K services?
x := 0
for _, svc := range services {
// we can allocate IPs only if
// 1. the service has resolution set to static/dns. We cannot allocate
// for NONE because we will not know the original DST IP that the application requested.
// 2. the address is not set (0.0.0.0)
// 3. the hostname is not a wildcard
if svc.DefaultAddress == constants.UnspecifiedIP && !svc.Hostname.IsWildCarded() &&
svc.Resolution != model.Passthrough {
x++
if x%255 == 0 {
x++
}
if x >= maxIPs {
log.Errorf("out of IPs to allocate for service entries")
return services
}
thirdOctet := x / 255
fourthOctet := x % 255
svc.AutoAllocatedIPv4Address = fmt.Sprintf("240.240.%d.%d", thirdOctet, fourthOctet)
// also allocate a corresponding IPv6 address from the benchmarking range, derived from the third and fourth octets of the IPv4 address
if thirdOctet == 0 {
svc.AutoAllocatedIPv6Address = fmt.Sprintf("2001:2::f0f0:%x", fourthOctet)
} else {
svc.AutoAllocatedIPv6Address = fmt.Sprintf("2001:2::f0f0:%x%x", thirdOctet, fourthOctet)
}
}
}
return services
}
func makeConfigKey(svc *model.Service) model.ConfigKey {
return model.ConfigKey{
Kind: gvk.ServiceEntry,
Name: string(svc.Hostname),
Namespace: svc.Attributes.Namespace,
}
}
// isHealthy checks that the provided WorkloadEntry is healthy. If health checks are not enabled,
// it is assumed to always be healthy
func isHealthy(cfg config.Config) bool {
if parseHealthAnnotation(cfg.Annotations[status.WorkloadEntryHealthCheckAnnotation]) {
// We default to false if the condition is not set. This ensures newly created WorkloadEntries
// are treated as unhealthy until we prove they are healthy by probe success.
return status.GetBoolConditionFromSpec(cfg, status.ConditionHealthy, false)
}
// If the health check is not enabled, assume it's healthy.
return true
}
func parseHealthAnnotation(s string) bool {
if s == "" {
return false
}
p, err := strconv.ParseBool(s)
if err != nil {
return false
}
return p
}
func (s *Controller) buildServiceInstances(
curr config.Config,
services []*model.Service,
) (map[configKey][]*model.ServiceInstance, []*model.ServiceInstance) {
currentServiceEntry := curr.Spec.(*networking.ServiceEntry)
var serviceInstances []*model.ServiceInstance
serviceInstancesByConfig := map[configKey][]*model.ServiceInstance{}
// for service entries with a workload selector
if currentServiceEntry.WorkloadSelector != nil {
selector := workloadinstances.ByServiceSelector(curr.Namespace, currentServiceEntry.WorkloadSelector.Labels)
workloadInstances := workloadinstances.FindAllInIndex(s.workloadInstances, selector)
for _, wi := range workloadInstances {
if wi.DNSServiceEntryOnly && currentServiceEntry.Resolution != networking.ServiceEntry_DNS &&
currentServiceEntry.Resolution != networking.ServiceEntry_DNS_ROUND_ROBIN {
log.Debugf("skip selecting workload instance %v/%v for DNS service entry %v", wi.Namespace, wi.Name,
currentServiceEntry.Hosts)
continue
}
instances := convertWorkloadInstanceToServiceInstance(wi.Endpoint, services, currentServiceEntry)
serviceInstances = append(serviceInstances, instances...)
ckey := configKey{namespace: wi.Namespace, name: wi.Name}
if wi.Kind == model.PodKind {
ckey.kind = podConfigType
} else {
ckey.kind = workloadEntryConfigType
}
serviceInstancesByConfig[ckey] = instances
}
} else {
serviceInstances = s.convertServiceEntryToInstances(curr, services)
ckey := configKey{
kind: serviceEntryConfigType,
name: curr.Name,
namespace: curr.Namespace,
}
serviceInstancesByConfig[ckey] = serviceInstances
}
return serviceInstancesByConfig, serviceInstances
}