| // Copyright Istio Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package serviceentry |
| |
| import ( |
| "fmt" |
| "reflect" |
| "strconv" |
| "sync" |
| "time" |
| ) |
| |
| import ( |
| networking "istio.io/api/networking/v1alpha3" |
| istiolog "istio.io/pkg/log" |
| "k8s.io/apimachinery/pkg/types" |
| ) |
| |
| import ( |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/features" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/model" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/model/status" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/provider" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/util/workloadinstances" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/util/informermetric" |
| "github.com/apache/dubbo-go-pixiu/pkg/cluster" |
| "github.com/apache/dubbo-go-pixiu/pkg/config" |
| "github.com/apache/dubbo-go-pixiu/pkg/config/constants" |
| "github.com/apache/dubbo-go-pixiu/pkg/config/host" |
| "github.com/apache/dubbo-go-pixiu/pkg/config/labels" |
| "github.com/apache/dubbo-go-pixiu/pkg/config/schema/gvk" |
| "github.com/apache/dubbo-go-pixiu/pkg/network" |
| "github.com/apache/dubbo-go-pixiu/pkg/queue" |
| "github.com/apache/dubbo-go-pixiu/pkg/util/protomarshal" |
| ) |
| |
| var ( |
| _ serviceregistry.Instance = &Controller{} |
| log = istiolog.RegisterScope("serviceentry", "ServiceEntry registry", 0) |
| ) |
| |
| // instancesKey acts as a key to identify all instances for a given hostname/namespace pair |
| // This is mostly used as an index |
| type instancesKey struct { |
| hostname host.Name |
| namespace string |
| } |
| |
| func makeInstanceKey(i *model.ServiceInstance) instancesKey { |
| return instancesKey{i.Service.Hostname, i.Service.Attributes.Namespace} |
| } |
| |
// externalConfigType enumerates the kinds of objects that can contribute
// service instances to this registry.
type externalConfigType int

const (
	// serviceEntryConfigType presumably marks instances defined by a ServiceEntry
	// itself; its use sites are not visible here — confirm.
	serviceEntryConfigType externalConfigType = iota
	// workloadEntryConfigType marks instances that originate from a WorkloadEntry.
	workloadEntryConfigType
	// podConfigType marks instances that originate from workload instances
	// reported by other registries (pods).
	podConfigType
)
| |
| // configKey unique identifies a config object managed by this registry (ServiceEntry and WorkloadEntry) |
| type configKey struct { |
| kind externalConfigType |
| name string |
| namespace string |
| } |
| |
| // Controller communicates with ServiceEntry CRDs and monitors for changes. |
| type Controller struct { |
| XdsUpdater model.XDSUpdater |
| |
| store model.ConfigStore |
| clusterID cluster.ID |
| |
| // This lock is to make multi ops on the below stores. For example, in some case, |
| // it requires delete all instances and then update new ones. |
| mutex sync.RWMutex |
| |
| serviceInstances serviceInstancesStore |
| // NOTE: historically, one index for both WorkloadEntry(s) and Pod(s); |
| // beware of naming collisions |
| workloadInstances workloadinstances.Index |
| services serviceStore |
| |
| // to make sure the eds update run in serial to prevent stale ones can override new ones |
| // There are multiple threads calling edsUpdate. |
| // If all share one lock, then all the threads can have an obvious performance downgrade. |
| edsQueue queue.Instance |
| |
| workloadHandlers []func(*model.WorkloadInstance, model.Event) |
| |
| // callback function used to get the networkID according to workload ip and labels. |
| networkIDCallback func(IP string, labels labels.Instance) network.ID |
| |
| processServiceEntry bool |
| |
| model.NetworkGatewaysHandler |
| } |
| |
| type Option func(*Controller) |
| |
| func WithClusterID(clusterID cluster.ID) Option { |
| return func(o *Controller) { |
| o.clusterID = clusterID |
| } |
| } |
| |
| func WithNetworkIDCb(cb func(endpointIP string, labels labels.Instance) network.ID) Option { |
| return func(o *Controller) { |
| o.networkIDCallback = cb |
| } |
| } |
| |
| // NewController creates a new ServiceEntry discovery service. |
| func NewController(configController model.ConfigStoreController, store model.ConfigStore, xdsUpdater model.XDSUpdater, |
| options ...Option) *Controller { |
| s := newController(store, xdsUpdater, options...) |
| if configController != nil { |
| configController.RegisterEventHandler(gvk.ServiceEntry, s.serviceEntryHandler) |
| configController.RegisterEventHandler(gvk.WorkloadEntry, s.workloadEntryHandler) |
| _ = configController.SetWatchErrorHandler(informermetric.ErrorHandlerForCluster(s.clusterID)) |
| } |
| return s |
| } |
| |
| // NewWorkloadEntryController creates a new WorkloadEntry discovery service. |
| func NewWorkloadEntryController(configController model.ConfigStoreController, store model.ConfigStore, xdsUpdater model.XDSUpdater, |
| options ...Option) *Controller { |
| s := newController(store, xdsUpdater, options...) |
| // Disable service entry processing for workload entry controller. |
| s.processServiceEntry = false |
| for _, o := range options { |
| o(s) |
| } |
| |
| if configController != nil { |
| configController.RegisterEventHandler(gvk.WorkloadEntry, s.workloadEntryHandler) |
| _ = configController.SetWatchErrorHandler(informermetric.ErrorHandlerForCluster(s.clusterID)) |
| } |
| return s |
| } |
| |
| func newController(store model.ConfigStore, xdsUpdater model.XDSUpdater, options ...Option) *Controller { |
| s := &Controller{ |
| XdsUpdater: xdsUpdater, |
| store: store, |
| serviceInstances: serviceInstancesStore{ |
| ip2instance: map[string][]*model.ServiceInstance{}, |
| instances: map[instancesKey]map[configKey][]*model.ServiceInstance{}, |
| instancesBySE: map[types.NamespacedName]map[configKey][]*model.ServiceInstance{}, |
| }, |
| workloadInstances: workloadinstances.NewIndex(), |
| services: serviceStore{ |
| servicesBySE: map[types.NamespacedName][]*model.Service{}, |
| }, |
| edsQueue: queue.NewQueue(time.Second), |
| processServiceEntry: true, |
| } |
| for _, o := range options { |
| o(s) |
| } |
| return s |
| } |
| |
| // convertWorkloadEntry convert wle from Config.Spec and populate the metadata labels into it. |
| func convertWorkloadEntry(cfg config.Config) *networking.WorkloadEntry { |
| wle := cfg.Spec.(*networking.WorkloadEntry) |
| if wle == nil { |
| return nil |
| } |
| |
| labels := make(map[string]string, len(wle.Labels)+len(cfg.Labels)) |
| for k, v := range wle.Labels { |
| labels[k] = v |
| } |
| // we will merge labels from metadata with spec, with precedence to the metadata |
| for k, v := range cfg.Labels { |
| labels[k] = v |
| } |
| // shallow copy |
| copied := &networking.WorkloadEntry{} |
| protomarshal.ShallowCopy(copied, wle) |
| copied.Labels = labels |
| return copied |
| } |
| |
| // workloadEntryHandler defines the handler for workload entries |
| func (s *Controller) workloadEntryHandler(old, curr config.Config, event model.Event) { |
| log.Debugf("Handle event %s for workload entry %s/%s", event, curr.Namespace, curr.Name) |
| var oldWle *networking.WorkloadEntry |
| if old.Spec != nil { |
| oldWle = convertWorkloadEntry(old) |
| } |
| wle := convertWorkloadEntry(curr) |
| curr.Spec = wle |
| key := configKey{ |
| kind: workloadEntryConfigType, |
| name: curr.Name, |
| namespace: curr.Namespace, |
| } |
| |
| // If an entry is unhealthy, we will mark this as a delete instead |
| // This ensures we do not track unhealthy endpoints |
| if features.WorkloadEntryHealthChecks && !isHealthy(curr) { |
| event = model.EventDelete |
| } |
| |
| wi := s.convertWorkloadEntryToWorkloadInstance(curr, s.Cluster()) |
| if wi != nil && !wi.DNSServiceEntryOnly { |
| // fire off the k8s handlers |
| for _, h := range s.workloadHandlers { |
| h(wi, event) |
| } |
| } |
| |
| // includes instances new updated or unchanged, in other word it is the current state. |
| instancesUpdated := []*model.ServiceInstance{} |
| instancesDeleted := []*model.ServiceInstance{} |
| fullPush := false |
| configsUpdated := map[model.ConfigKey]struct{}{} |
| |
| addConfigs := func(se *networking.ServiceEntry, services []*model.Service) { |
| // If serviceentry's resolution is DNS, make a full push |
| // TODO: maybe cds? |
| if se.Resolution == networking.ServiceEntry_DNS || se.Resolution == networking.ServiceEntry_DNS_ROUND_ROBIN { |
| fullPush = true |
| for key, value := range getUpdatedConfigs(services) { |
| configsUpdated[key] = value |
| } |
| } |
| } |
| |
| cfgs, _ := s.store.List(gvk.ServiceEntry, curr.Namespace) |
| currSes := getWorkloadServiceEntries(cfgs, wle) |
| var oldSes map[types.NamespacedName]*config.Config |
| if oldWle != nil { |
| if labels.Instance(oldWle.Labels).Equals(curr.Labels) { |
| oldSes = currSes |
| } else { |
| oldSes = getWorkloadServiceEntries(cfgs, oldWle) |
| } |
| } |
| unSelected := difference(oldSes, currSes) |
| log.Debugf("workloadEntry %s/%s selected %v, unSelected %v serviceEntry", curr.Namespace, curr.Name, currSes, unSelected) |
| s.mutex.Lock() |
| for namespacedName, cfg := range currSes { |
| services := s.services.getServices(namespacedName) |
| se := cfg.Spec.(*networking.ServiceEntry) |
| if wi.DNSServiceEntryOnly && se.Resolution != networking.ServiceEntry_DNS && |
| se.Resolution != networking.ServiceEntry_DNS_ROUND_ROBIN { |
| log.Debugf("skip selecting workload instance %v/%v for DNS service entry %v", wi.Namespace, wi.Name, se.Hosts) |
| continue |
| } |
| instance := s.convertWorkloadEntryToServiceInstances(wle, services, se, &key, s.Cluster()) |
| instancesUpdated = append(instancesUpdated, instance...) |
| addConfigs(se, services) |
| } |
| |
| for _, namespacedName := range unSelected { |
| services := s.services.getServices(namespacedName) |
| cfg := oldSes[namespacedName] |
| se := cfg.Spec.(*networking.ServiceEntry) |
| if wi.DNSServiceEntryOnly && se.Resolution != networking.ServiceEntry_DNS && |
| se.Resolution != networking.ServiceEntry_DNS_ROUND_ROBIN { |
| log.Debugf("skip selecting workload instance %v/%v for DNS service entry %v", wi.Namespace, wi.Name, se.Hosts) |
| continue |
| } |
| instance := s.convertWorkloadEntryToServiceInstances(wle, services, se, &key, s.Cluster()) |
| instancesDeleted = append(instancesDeleted, instance...) |
| addConfigs(se, services) |
| } |
| |
| s.serviceInstances.deleteInstances(key, instancesDeleted) |
| if event == model.EventDelete { |
| s.workloadInstances.Delete(wi) |
| s.serviceInstances.deleteInstances(key, instancesUpdated) |
| } else { |
| s.workloadInstances.Insert(wi) |
| s.serviceInstances.updateInstances(key, instancesUpdated) |
| } |
| s.mutex.Unlock() |
| |
| allInstances := append(instancesUpdated, instancesDeleted...) |
| if !fullPush { |
| // trigger full xds push to the related sidecar proxy |
| if event == model.EventAdd { |
| s.XdsUpdater.ProxyUpdate(s.Cluster(), wle.Address) |
| } |
| s.edsUpdate(allInstances) |
| return |
| } |
| |
| // update eds cache only |
| s.edsCacheUpdate(allInstances) |
| |
| pushReq := &model.PushRequest{ |
| Full: true, |
| ConfigsUpdated: configsUpdated, |
| Reason: []model.TriggerReason{model.EndpointUpdate}, |
| } |
| // trigger a full push |
| s.XdsUpdater.ConfigUpdate(pushReq) |
| } |
| |
| // getUpdatedConfigs returns related service entries when full push |
| func getUpdatedConfigs(services []*model.Service) map[model.ConfigKey]struct{} { |
| configsUpdated := map[model.ConfigKey]struct{}{} |
| for _, svc := range services { |
| configsUpdated[model.ConfigKey{ |
| Kind: gvk.ServiceEntry, |
| Name: string(svc.Hostname), |
| Namespace: svc.Attributes.Namespace, |
| }] = struct{}{} |
| } |
| return configsUpdated |
| } |
| |
| // serviceEntryHandler defines the handler for service entries |
| func (s *Controller) serviceEntryHandler(_, curr config.Config, event model.Event) { |
| currentServiceEntry := curr.Spec.(*networking.ServiceEntry) |
| cs := convertServices(curr) |
| configsUpdated := map[model.ConfigKey]struct{}{} |
| key := types.NamespacedName{Namespace: curr.Namespace, Name: curr.Name} |
| |
| s.mutex.Lock() |
| // If it is add/delete event we should always do a full push. If it is update event, we should do full push, |
| // only when services have changed - otherwise, just push endpoint updates. |
| var addedSvcs, deletedSvcs, updatedSvcs, unchangedSvcs []*model.Service |
| switch event { |
| case model.EventUpdate: |
| addedSvcs, deletedSvcs, updatedSvcs, unchangedSvcs = servicesDiff(s.services.getServices(key), cs) |
| s.services.updateServices(key, cs) |
| case model.EventDelete: |
| deletedSvcs = cs |
| s.services.deleteServices(key) |
| case model.EventAdd: |
| addedSvcs = cs |
| s.services.updateServices(key, cs) |
| default: |
| // this should not happen |
| unchangedSvcs = cs |
| } |
| |
| serviceInstancesByConfig, serviceInstances := s.buildServiceInstances(curr, cs) |
| oldInstances := s.serviceInstances.getServiceEntryInstances(key) |
| for configKey, old := range oldInstances { |
| s.serviceInstances.deleteInstances(configKey, old) |
| } |
| if event == model.EventDelete { |
| s.serviceInstances.deleteAllServiceEntryInstances(key) |
| } else { |
| // Update the indexes with new instances. |
| for ckey, value := range serviceInstancesByConfig { |
| s.serviceInstances.addInstances(ckey, value) |
| } |
| s.serviceInstances.updateServiceEntryInstances(key, serviceInstancesByConfig) |
| } |
| |
| shard := model.ShardKeyFromRegistry(s) |
| |
| for _, svc := range addedSvcs { |
| s.XdsUpdater.SvcUpdate(shard, string(svc.Hostname), svc.Attributes.Namespace, model.EventAdd) |
| configsUpdated[makeConfigKey(svc)] = struct{}{} |
| } |
| |
| for _, svc := range updatedSvcs { |
| s.XdsUpdater.SvcUpdate(shard, string(svc.Hostname), svc.Attributes.Namespace, model.EventUpdate) |
| configsUpdated[makeConfigKey(svc)] = struct{}{} |
| } |
| // If service entry is deleted, call SvcUpdate to cleanup endpoint shards for services. |
| for _, svc := range deletedSvcs { |
| instanceKey := instancesKey{namespace: svc.Attributes.Namespace, hostname: svc.Hostname} |
| // There can be multiple service entries of same host reside in same namespace. |
| // Delete endpoint shards only if there are no service instances. |
| if len(s.serviceInstances.getByKey(instanceKey)) == 0 { |
| s.XdsUpdater.SvcUpdate(shard, string(svc.Hostname), svc.Attributes.Namespace, model.EventDelete) |
| } |
| configsUpdated[makeConfigKey(svc)] = struct{}{} |
| } |
| |
| // If a service is updated and is not part of updatedSvcs, that means its endpoints might have changed. |
| // If this service entry had endpoints with IPs (i.e. resolution STATIC), then we do EDS update. |
| // If the service entry had endpoints with FQDNs (i.e. resolution DNS), then we need to do |
| // full push (as fqdn endpoints go via strict_dns clusters in cds). |
| if len(unchangedSvcs) > 0 { |
| if currentServiceEntry.Resolution == networking.ServiceEntry_DNS || currentServiceEntry.Resolution == networking.ServiceEntry_DNS_ROUND_ROBIN { |
| for _, svc := range unchangedSvcs { |
| configsUpdated[makeConfigKey(svc)] = struct{}{} |
| } |
| } |
| } |
| s.mutex.Unlock() |
| |
| fullPush := len(configsUpdated) > 0 |
| // if not full push needed, at least one service unchanged |
| if !fullPush { |
| s.edsUpdate(serviceInstances) |
| return |
| } |
| |
| // When doing a full push, the non DNS added, updated, unchanged services trigger an eds update |
| // so that endpoint shards are updated. |
| allServices := make([]*model.Service, 0, len(addedSvcs)+len(updatedSvcs)+len(unchangedSvcs)) |
| nonDNSServices := make([]*model.Service, 0, len(addedSvcs)+len(updatedSvcs)+len(unchangedSvcs)) |
| allServices = append(allServices, addedSvcs...) |
| allServices = append(allServices, updatedSvcs...) |
| allServices = append(allServices, unchangedSvcs...) |
| for _, svc := range allServices { |
| if !(svc.Resolution == model.DNSLB || svc.Resolution == model.DNSRoundRobinLB) { |
| nonDNSServices = append(nonDNSServices, svc) |
| } |
| } |
| // non dns service instances |
| keys := map[instancesKey]struct{}{} |
| for _, svc := range nonDNSServices { |
| keys[instancesKey{hostname: svc.Hostname, namespace: curr.Namespace}] = struct{}{} |
| } |
| |
| s.queueEdsEvent(keys, s.doEdsCacheUpdate) |
| |
| pushReq := &model.PushRequest{ |
| Full: true, |
| ConfigsUpdated: configsUpdated, |
| Reason: []model.TriggerReason{model.ServiceUpdate}, |
| } |
| s.XdsUpdater.ConfigUpdate(pushReq) |
| } |
| |
| // WorkloadInstanceHandler defines the handler for service instances generated by other registries |
| func (s *Controller) WorkloadInstanceHandler(wi *model.WorkloadInstance, event model.Event) { |
| log.Debugf("Handle event %s for workload instance (%s/%s) in namespace %s", event, |
| wi.Kind, wi.Endpoint.Address, wi.Namespace) |
| key := configKey{ |
| kind: podConfigType, |
| name: wi.Name, |
| namespace: wi.Namespace, |
| } |
| // Used to indicate if this event was fired for a pod->workloadentry conversion |
| // and that the event can be ignored due to no relevant change in the workloadentry |
| redundantEventForPod := false |
| |
| var addressToDelete string |
| s.mutex.Lock() |
| // this is from a pod. Store it in separate map so that |
| // the refreshIndexes function can use these as well as the store ones. |
| switch event { |
| case model.EventDelete: |
| redundantEventForPod = s.workloadInstances.Delete(wi) == nil |
| default: // add or update |
| if old := s.workloadInstances.Insert(wi); old != nil { |
| if old.Endpoint.Address != wi.Endpoint.Address { |
| addressToDelete = old.Endpoint.Address |
| } |
| // If multiple k8s services select the same pod or a service has multiple ports, |
| // we may be getting multiple events ignore them as we only care about the Endpoint IP itself. |
| if model.WorkloadInstancesEqual(old, wi) { |
| // ignore the update as nothing has changed |
| redundantEventForPod = true |
| } |
| } |
| } |
| |
| if redundantEventForPod { |
| s.mutex.Unlock() |
| return |
| } |
| |
| // We will only select entries in the same namespace |
| cfgs, _ := s.store.List(gvk.ServiceEntry, wi.Namespace) |
| if len(cfgs) == 0 { |
| s.mutex.Unlock() |
| return |
| } |
| |
| instances := []*model.ServiceInstance{} |
| instancesDeleted := []*model.ServiceInstance{} |
| for _, cfg := range cfgs { |
| se := cfg.Spec.(*networking.ServiceEntry) |
| if se.WorkloadSelector == nil || !labels.Instance(se.WorkloadSelector.Labels).SubsetOf(wi.Endpoint.Labels) { |
| // Not a match, skip this one |
| continue |
| } |
| seNamespacedName := types.NamespacedName{Namespace: cfg.Namespace, Name: cfg.Name} |
| services := s.services.getServices(seNamespacedName) |
| instance := convertWorkloadInstanceToServiceInstance(wi.Endpoint, services, se) |
| instances = append(instances, instance...) |
| if addressToDelete != "" { |
| for _, i := range instance { |
| di := i.DeepCopy() |
| di.Endpoint.Address = addressToDelete |
| instancesDeleted = append(instancesDeleted, di) |
| } |
| s.serviceInstances.deleteServiceEntryInstances(seNamespacedName, key) |
| } else if event == model.EventDelete { |
| s.serviceInstances.deleteServiceEntryInstances(seNamespacedName, key) |
| } else { |
| s.serviceInstances.updateServiceEntryInstancesPerConfig(seNamespacedName, key, instance) |
| } |
| } |
| if len(instancesDeleted) > 0 { |
| s.serviceInstances.deleteInstances(key, instancesDeleted) |
| } |
| |
| if event == model.EventDelete { |
| s.serviceInstances.deleteInstances(key, instances) |
| } else { |
| s.serviceInstances.updateInstances(key, instances) |
| } |
| s.mutex.Unlock() |
| |
| s.edsUpdate(instances) |
| } |
| |
| func (s *Controller) Provider() provider.ID { |
| return provider.External |
| } |
| |
| func (s *Controller) Cluster() cluster.ID { |
| return s.clusterID |
| } |
| |
| // AppendServiceHandler adds service resource event handler. Service Entries does not use these handlers. |
| func (s *Controller) AppendServiceHandler(_ func(*model.Service, model.Event)) {} |
| |
| // AppendWorkloadHandler adds instance event handler. Service Entries does not use these handlers. |
| func (s *Controller) AppendWorkloadHandler(h func(*model.WorkloadInstance, model.Event)) { |
| s.workloadHandlers = append(s.workloadHandlers, h) |
| } |
| |
| // Run is used by some controllers to execute background jobs after init is done. |
| func (s *Controller) Run(stopCh <-chan struct{}) { |
| s.edsQueue.Run(stopCh) |
| } |
| |
| // HasSynced always returns true for SE |
| func (s *Controller) HasSynced() bool { |
| return true |
| } |
| |
| // Services list declarations of all services in the system |
| func (s *Controller) Services() []*model.Service { |
| s.mutex.Lock() |
| allServices := s.services.getAllServices() |
| out := make([]*model.Service, 0, len(allServices)) |
| if s.services.allocateNeeded { |
| autoAllocateIPs(allServices) |
| s.services.allocateNeeded = false |
| } |
| s.mutex.Unlock() |
| for _, svc := range allServices { |
| // shallow copy, copy `AutoAllocatedIPv4Address` and `AutoAllocatedIPv6Address` |
| // if return the pointer directly, there will be a race with `BuildNameTable` |
| // nolint: govet |
| shallowSvc := *svc |
| out = append(out, &shallowSvc) |
| } |
| return out |
| } |
| |
| // GetService retrieves a service by host name if it exists. |
| // NOTE: The service entry implementation is used only for tests. |
| func (s *Controller) GetService(hostname host.Name) *model.Service { |
| if !s.processServiceEntry { |
| return nil |
| } |
| // TODO(@hzxuzhonghu): only get the specific service instead of converting all the serviceEntries |
| services := s.Services() |
| for _, service := range services { |
| if service.Hostname == hostname { |
| return service |
| } |
| } |
| |
| return nil |
| } |
| |
| // InstancesByPort retrieves instances for a service on the given ports with labels that |
| // match any of the supplied labels. All instances match an empty tag list. |
| func (s *Controller) InstancesByPort(svc *model.Service, port int, labels labels.Instance) []*model.ServiceInstance { |
| out := make([]*model.ServiceInstance, 0) |
| s.mutex.RLock() |
| instanceLists := s.serviceInstances.getByKey(instancesKey{svc.Hostname, svc.Attributes.Namespace}) |
| s.mutex.RUnlock() |
| for _, instance := range instanceLists { |
| if labels.SubsetOf(instance.Endpoint.Labels) && |
| portMatchSingle(instance, port) { |
| out = append(out, instance) |
| } |
| } |
| |
| return out |
| } |
| |
| // ResyncEDS will do a full EDS update. This is needed for some tests where we have many configs loaded without calling |
| // the config handlers. |
| // This should probably not be used in production code. |
| func (s *Controller) ResyncEDS() { |
| s.mutex.RLock() |
| allInstances := s.serviceInstances.getAll() |
| s.mutex.RUnlock() |
| s.edsUpdate(allInstances) |
| } |
| |
| // edsUpdate triggers an EDS push serially such that we can prevent all instances |
| // got at t1 can accidentally override that got at t2 if multiple threads are |
| // running this function. Queueing ensures latest updated wins. |
| func (s *Controller) edsUpdate(instances []*model.ServiceInstance) { |
| // Find all keys we need to lookup |
| keys := map[instancesKey]struct{}{} |
| for _, i := range instances { |
| keys[makeInstanceKey(i)] = struct{}{} |
| } |
| s.queueEdsEvent(keys, s.doEdsUpdate) |
| } |
| |
| // edsCacheUpdate upates eds cache serially such that we can prevent allinstances |
| // got at t1 can accidentally override that got at t2 if multiple threads are |
| // running this function. Queueing ensures latest updated wins. |
| func (s *Controller) edsCacheUpdate(instances []*model.ServiceInstance) { |
| // Find all keys we need to lookup |
| keys := map[instancesKey]struct{}{} |
| for _, i := range instances { |
| keys[makeInstanceKey(i)] = struct{}{} |
| } |
| s.queueEdsEvent(keys, s.doEdsCacheUpdate) |
| } |
| |
| // queueEdsEvent processes eds events sequentially for the passed keys and invokes the passed function. |
| func (s *Controller) queueEdsEvent(keys map[instancesKey]struct{}, edsFn func(keys map[instancesKey]struct{})) { |
| // wait for the cache update finished |
| waitCh := make(chan struct{}) |
| // trigger update eds endpoint shards |
| s.edsQueue.Push(func() error { |
| defer close(waitCh) |
| edsFn(keys) |
| return nil |
| }) |
| select { |
| case <-waitCh: |
| return |
| // To prevent goroutine leak in tests |
| // in case the queue is stopped but the task has not been executed.. |
| case <-s.edsQueue.Closed(): |
| return |
| } |
| } |
| |
| // doEdsCacheUpdate invokes XdsUpdater's EDSCacheUpdate to update endpoint shards. |
| func (s *Controller) doEdsCacheUpdate(keys map[instancesKey]struct{}) { |
| endpoints := s.buildEndpoints(keys) |
| shard := model.ShardKeyFromRegistry(s) |
| // This is delete. |
| if len(endpoints) == 0 { |
| for k := range keys { |
| s.XdsUpdater.EDSCacheUpdate(shard, string(k.hostname), k.namespace, nil) |
| } |
| } else { |
| for k, eps := range endpoints { |
| s.XdsUpdater.EDSCacheUpdate(shard, string(k.hostname), k.namespace, eps) |
| } |
| } |
| } |
| |
| // doEdsUpdate invokes XdsUpdater's eds update to trigger eds push. |
| func (s *Controller) doEdsUpdate(keys map[instancesKey]struct{}) { |
| endpoints := s.buildEndpoints(keys) |
| shard := model.ShardKeyFromRegistry(s) |
| // This is delete. |
| if len(endpoints) == 0 { |
| for k := range keys { |
| s.XdsUpdater.EDSUpdate(shard, string(k.hostname), k.namespace, nil) |
| } |
| } else { |
| for k, eps := range endpoints { |
| s.XdsUpdater.EDSUpdate(shard, string(k.hostname), k.namespace, eps) |
| } |
| } |
| } |
| |
| // buildEndpoints builds endpoints for the instance keys. |
| func (s *Controller) buildEndpoints(keys map[instancesKey]struct{}) map[instancesKey][]*model.IstioEndpoint { |
| var endpoints map[instancesKey][]*model.IstioEndpoint |
| allInstances := []*model.ServiceInstance{} |
| s.mutex.RLock() |
| for key := range keys { |
| i := s.serviceInstances.getByKey(key) |
| allInstances = append(allInstances, i...) |
| } |
| s.mutex.RUnlock() |
| |
| if len(allInstances) > 0 { |
| endpoints = make(map[instancesKey][]*model.IstioEndpoint) |
| for _, instance := range allInstances { |
| port := instance.ServicePort |
| key := makeInstanceKey(instance) |
| endpoints[key] = append(endpoints[key], |
| &model.IstioEndpoint{ |
| Address: instance.Endpoint.Address, |
| EndpointPort: instance.Endpoint.EndpointPort, |
| ServicePortName: port.Name, |
| Labels: instance.Endpoint.Labels, |
| ServiceAccount: instance.Endpoint.ServiceAccount, |
| Network: instance.Endpoint.Network, |
| Locality: instance.Endpoint.Locality, |
| LbWeight: instance.Endpoint.LbWeight, |
| TLSMode: instance.Endpoint.TLSMode, |
| WorkloadName: instance.Endpoint.WorkloadName, |
| Namespace: instance.Endpoint.Namespace, |
| }) |
| } |
| |
| } |
| return endpoints |
| } |
| |
| // returns true if an instance's port matches with any in the provided list |
| func portMatchSingle(instance *model.ServiceInstance, port int) bool { |
| return port == 0 || port == instance.ServicePort.Port |
| } |
| |
| // GetProxyServiceInstances lists service instances co-located with a given proxy |
| // NOTE: The service objects in these instances do not have the auto allocated IP set. |
| func (s *Controller) GetProxyServiceInstances(node *model.Proxy) []*model.ServiceInstance { |
| out := make([]*model.ServiceInstance, 0) |
| s.mutex.RLock() |
| defer s.mutex.RUnlock() |
| for _, ip := range node.IPAddresses { |
| instances := s.serviceInstances.getByIP(ip) |
| for _, i := range instances { |
| // Insert all instances for this IP for services within the same namespace This ensures we |
| // match Kubernetes logic where Services do not cross namespace boundaries and avoids |
| // possibility of other namespaces inserting service instances into namespaces they do not |
| // control. |
| if node.Metadata.Namespace == "" || i.Service.Attributes.Namespace == node.Metadata.Namespace { |
| out = append(out, i) |
| } |
| } |
| } |
| return out |
| } |
| |
| func (s *Controller) GetProxyWorkloadLabels(proxy *model.Proxy) labels.Instance { |
| s.mutex.RLock() |
| defer s.mutex.RUnlock() |
| for _, ip := range proxy.IPAddresses { |
| instances := s.serviceInstances.getByIP(ip) |
| for _, instance := range instances { |
| return instance.Endpoint.Labels |
| } |
| } |
| return nil |
| } |
| |
| // GetIstioServiceAccounts implements model.ServiceAccounts operation |
| // For service entries using workload entries or mix of workload entries and pods, |
| // this function returns the appropriate service accounts used by these. |
| func (s *Controller) GetIstioServiceAccounts(svc *model.Service, ports []int) []string { |
| // service entries with built in endpoints have SANs as a dedicated field. |
| // Those with selector labels will have service accounts embedded inside workloadEntries and pods as well. |
| return model.GetServiceAccounts(svc, ports, s) |
| } |
| |
| func (s *Controller) NetworkGateways() []model.NetworkGateway { |
| // TODO implement mesh networks loading logic from kube controller if needed |
| return nil |
| } |
| |
| func (s *Controller) MCSServices() []model.MCSServiceInfo { |
| return nil |
| } |
| |
| func servicesDiff(os []*model.Service, ns []*model.Service) ([]*model.Service, []*model.Service, []*model.Service, []*model.Service) { |
| var added, deleted, updated, unchanged []*model.Service |
| |
| oldServiceHosts := make(map[host.Name]*model.Service, len(os)) |
| newServiceHosts := make(map[host.Name]*model.Service, len(ns)) |
| for _, s := range os { |
| oldServiceHosts[s.Hostname] = s |
| } |
| for _, s := range ns { |
| newServiceHosts[s.Hostname] = s |
| } |
| |
| for _, s := range os { |
| newSvc, f := newServiceHosts[s.Hostname] |
| if !f { |
| deleted = append(deleted, s) |
| } else if !reflect.DeepEqual(s, newSvc) { |
| updated = append(updated, newSvc) |
| } else { |
| unchanged = append(unchanged, newSvc) |
| } |
| } |
| |
| for _, s := range ns { |
| if _, f := oldServiceHosts[s.Hostname]; !f { |
| added = append(added, s) |
| } |
| } |
| |
| return added, deleted, updated, unchanged |
| } |
| |
| // Automatically allocates IPs for service entry services WITHOUT an |
| // address field if the hostname is not a wildcard, or when resolution |
| // is not NONE. The IPs are allocated from the reserved Class E subnet |
| // (240.240.0.0/16) that is not reachable outside the pod or reserved |
| // Benchmarking IP range (2001:2::/48) in RFC5180. When DNS |
| // capture is enabled, Envoy will resolve the DNS to these IPs. The |
| // listeners for TCP services will also be set up on these IPs. The |
| // IPs allocated to a service entry may differ from istiod to istiod |
| // but it does not matter because these IPs only affect the listener |
| // IPs on a given proxy managed by a given istiod. |
| // |
| // NOTE: If DNS capture is not enabled by the proxy, the automatically |
| // allocated IP addresses do not take effect. |
| // |
| // The current algorithm to allocate IPs is deterministic across all istiods. |
| // At stable state, given two istiods with exact same set of services, there should |
| // be no change in XDS as the algorithm is just a dumb iterative one that allocates sequentially. |
| // |
| // TODO: Rather than sequentially allocate IPs, switch to a hash based allocation mechanism so that |
| // deletion of the oldest service entry does not cause change of IPs for all other service entries. |
| // Currently, the sequential allocation will result in unnecessary XDS reloads (lds/rds) when a |
| // service entry with auto allocated IP is deleted. We are trading off a perf problem (xds reload) |
| // for a usability problem (e.g., multiple cloud SQL or AWS RDS tcp services with no VIPs end up having |
| // the same port, causing traffic to go to the wrong place). Once we move to a deterministic hash-based |
| // allocation with deterministic collision resolution, the perf problem will go away. If the collision guarantee |
| // cannot be made within the IP address space we have (which is about 64K services), then we may need to |
| // have the sequential allocation algorithm as a fallback when too many collisions take place. |
| func autoAllocateIPs(services []*model.Service) []*model.Service { |
| // i is everything from 240.240.0.(j) to 240.240.255.(j) |
| // j is everything from 240.240.(i).1 to 240.240.(i).254 |
| // we can capture this in one integer variable. |
| // given X, we can compute i by X/255, and j is X%255 |
| // To avoid allocating 240.240.(i).255, if X % 255 is 0, increment X. |
| // For example, when X=510, the resulting IP would be 240.240.2.0 (invalid) |
| // So we bump X to 511, so that the resulting IP is 240.240.2.1 |
| maxIPs := 255 * 255 // are we going to exceed this limit by processing 64K services? |
| x := 0 |
| for _, svc := range services { |
| // we can allocate IPs only if |
| // 1. the service has resolution set to static/dns. We cannot allocate |
| // for NONE because we will not know the original DST IP that the application requested. |
| // 2. the address is not set (0.0.0.0) |
| // 3. the hostname is not a wildcard |
| if svc.DefaultAddress == constants.UnspecifiedIP && !svc.Hostname.IsWildCarded() && |
| svc.Resolution != model.Passthrough { |
| x++ |
| if x%255 == 0 { |
| x++ |
| } |
| if x >= maxIPs { |
| log.Errorf("out of IPs to allocate for service entries") |
| return services |
| } |
| thirdOctet := x / 255 |
| fourthOctet := x % 255 |
| |
| svc.AutoAllocatedIPv4Address = fmt.Sprintf("240.240.%d.%d", thirdOctet, fourthOctet) |
| // if the service of service entry has IPv6 address, then allocate the IPv4-Mapped IPv6 Address for it |
| if thirdOctet == 0 { |
| svc.AutoAllocatedIPv6Address = fmt.Sprintf("2001:2::f0f0:%x", fourthOctet) |
| } else { |
| svc.AutoAllocatedIPv6Address = fmt.Sprintf("2001:2::f0f0:%x%x", thirdOctet, fourthOctet) |
| } |
| } |
| } |
| return services |
| } |
| |
| func makeConfigKey(svc *model.Service) model.ConfigKey { |
| return model.ConfigKey{ |
| Kind: gvk.ServiceEntry, |
| Name: string(svc.Hostname), |
| Namespace: svc.Attributes.Namespace, |
| } |
| } |
| |
| // isHealthy checks that the provided WorkloadEntry is healthy. If health checks are not enabled, |
| // it is assumed to always be healthy |
| func isHealthy(cfg config.Config) bool { |
| if parseHealthAnnotation(cfg.Annotations[status.WorkloadEntryHealthCheckAnnotation]) { |
| // We default to false if the condition is not set. This ensures newly created WorkloadEntries |
| // are treated as unhealthy until we prove they are healthy by probe success. |
| return status.GetBoolConditionFromSpec(cfg, status.ConditionHealthy, false) |
| } |
| // If health check is not enabled, assume its healthy |
| return true |
| } |
| |
// parseHealthAnnotation interprets the value of the health check annotation.
// It returns true only when s is a boolean literal accepted by
// strconv.ParseBool that evaluates to true; an empty or unparsable value
// yields false.
func parseHealthAnnotation(s string) bool {
	// ParseBool rejects the empty string with an error, so the missing
	// annotation case is covered by the error check.
	enabled, err := strconv.ParseBool(s)
	return err == nil && enabled
}
| |
| func (s *Controller) buildServiceInstances( |
| curr config.Config, |
| services []*model.Service, |
| ) (map[configKey][]*model.ServiceInstance, []*model.ServiceInstance) { |
| currentServiceEntry := curr.Spec.(*networking.ServiceEntry) |
| var serviceInstances []*model.ServiceInstance |
| serviceInstancesByConfig := map[configKey][]*model.ServiceInstance{} |
| // for service entry with labels |
| if currentServiceEntry.WorkloadSelector != nil { |
| selector := workloadinstances.ByServiceSelector(curr.Namespace, currentServiceEntry.WorkloadSelector.Labels) |
| workloadInstances := workloadinstances.FindAllInIndex(s.workloadInstances, selector) |
| for _, wi := range workloadInstances { |
| if wi.DNSServiceEntryOnly && currentServiceEntry.Resolution != networking.ServiceEntry_DNS && |
| currentServiceEntry.Resolution != networking.ServiceEntry_DNS_ROUND_ROBIN { |
| log.Debugf("skip selecting workload instance %v/%v for DNS service entry %v", wi.Namespace, wi.Name, |
| currentServiceEntry.Hosts) |
| continue |
| } |
| instances := convertWorkloadInstanceToServiceInstance(wi.Endpoint, services, currentServiceEntry) |
| serviceInstances = append(serviceInstances, instances...) |
| ckey := configKey{namespace: wi.Namespace, name: wi.Name} |
| if wi.Kind == model.PodKind { |
| ckey.kind = podConfigType |
| } else { |
| ckey.kind = workloadEntryConfigType |
| } |
| serviceInstancesByConfig[ckey] = instances |
| } |
| } else { |
| serviceInstances = s.convertServiceEntryToInstances(curr, services) |
| ckey := configKey{ |
| kind: serviceEntryConfigType, |
| name: curr.Name, |
| namespace: curr.Namespace, |
| } |
| serviceInstancesByConfig[ckey] = serviceInstances |
| } |
| |
| return serviceInstancesByConfig, serviceInstances |
| } |