blob: 0930b7567fa4505ca2f40884563b8acbd791a572 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package aggregate
import (
"sort"
"sync"
)
import (
"istio.io/pkg/log"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/provider"
"github.com/apache/dubbo-go-pixiu/pkg/cluster"
"github.com/apache/dubbo-go-pixiu/pkg/config/host"
"github.com/apache/dubbo-go-pixiu/pkg/config/labels"
"github.com/apache/dubbo-go-pixiu/pkg/config/mesh"
"github.com/apache/dubbo-go-pixiu/pkg/spiffe"
)
// Compile-time checks that *Controller satisfies the interfaces it is used as.
// The aggregate controller deliberately does not implement serviceregistry.Instance,
// since it may be comprised of various providers and clusters.
var _ model.ServiceDiscovery = (*Controller)(nil)

var _ model.AggregateController = (*Controller)(nil)
// Controller aggregates data across different registries and monitors for changes.
type Controller struct {
// meshHolder supplies the mesh config; GetIstioServiceAccounts reads the
// trust domain aliases from it. It may be nil.
meshHolder mesh.Holder
// The lock is used to protect the registries and controller's running status.
// It is also held whenever handlersByCluster is read or modified.
storeLock sync.RWMutex
// registries lists the aggregated registries, each paired with its optional
// per-registry stop channel.
registries []*registryEntry
// indicates whether the controller has run.
// if true, all the registries added later should be run manually.
running bool
// handlers holds the handlers registered through AppendServiceHandler; every
// added registry forwards its service events to them.
handlers model.ControllerHandlers
// handlersByCluster holds the handlers registered per cluster ID; entries are
// created lazily by the Append*HandlerForCluster methods and removed by
// UnRegisterHandlersForCluster.
handlersByCluster map[cluster.ID]*model.ControllerHandlers
// NetworkGatewaysHandler is embedded; presumably it provides the
// NotifyGatewayHandlers callback that addRegistry registers on each registry.
model.NetworkGatewaysHandler
}
// registryEntry pairs an aggregated registry with the stop channel it should be run with.
type registryEntry struct {
// Instance is the wrapped registry; its methods are promoted onto registryEntry.
serviceregistry.Instance
// stop if not nil is the per-registry stop chan. If nil, the server stop chan should be used to Run the registry.
stop <-chan struct{}
}
// Options holds the settings consumed by NewController.
type Options struct {
// MeshHolder is stored on the Controller as its source of mesh config.
MeshHolder mesh.Holder
}
// NewController builds an aggregate Controller from opt. The returned controller
// has no registries, no per-cluster handlers, and is not running.
func NewController(opt Options) *Controller {
	ctrl := &Controller{
		meshHolder:        opt.MeshHolder,
		registries:        []*registryEntry{},
		handlersByCluster: make(map[cluster.ID]*model.ControllerHandlers),
	}
	// The controller only becomes running once Run is called.
	ctrl.running = false
	return ctrl
}
// addRegistry records the registry together with its stop channel and subscribes
// the aggregate's handlers to the registry's events. It does not take storeLock
// itself; AddRegistry and AddRegistryAndRun call it with the lock held.
func (c *Controller) addRegistry(registry serviceregistry.Instance, stop <-chan struct{}) {
	entry := &registryEntry{Instance: registry, stop: stop}
	c.registries = append(c.registries, entry)

	// Forward network gateway and service events to the aggregate-wide handlers.
	registry.AppendNetworkGatewayHandler(c.NotifyGatewayHandlers)
	registry.AppendServiceHandler(c.handlers.NotifyServiceHandlers)

	// Service events are additionally fanned out to every per-cluster handler set
	// that is registered at the time the event fires.
	fanOut := func(svc *model.Service, ev model.Event) {
		for _, clusterHandlers := range c.getClusterHandlers() {
			clusterHandlers.NotifyServiceHandlers(svc, ev)
		}
	}
	registry.AppendServiceHandler(fanOut)
}
// getClusterHandlers returns a snapshot of the per-cluster handler sets so that
// callers can notify them without holding storeLock.
//
// The map is only read here, so a read lock is sufficient. This function runs on
// every service event of every registry; taking the exclusive lock would needlessly
// serialize it against other readers such as GetRegistries.
func (c *Controller) getClusterHandlers() []*model.ControllerHandlers {
	c.storeLock.RLock()
	defer c.storeLock.RUnlock()

	out := make([]*model.ControllerHandlers, 0, len(c.handlersByCluster))
	for _, handlers := range c.handlersByCluster {
		out = append(out, handlers)
	}
	return out
}
// AddRegistry adds a registry to the aggregated controller without a per-registry
// stop channel. It never starts the registry itself: a registry added this way after
// the aggregated controller is already running will not be started.
func (c *Controller) AddRegistry(registry serviceregistry.Instance) {
	// A nil stop channel means Run falls back to the aggregate's own stop channel.
	var noStop <-chan struct{}

	c.storeLock.Lock()
	defer c.storeLock.Unlock()
	c.addRegistry(registry, noStop)
}
// AddRegistryAndRun adds a registry to the aggregated controller and makes sure it is Run.
// When the aggregated controller is already running, the registry is started right away
// with the given stop channel. Otherwise it is started later by Run, which prefers the
// given stop channel over its own. A nil stop channel is accepted but logged as a warning.
func (c *Controller) AddRegistryAndRun(registry serviceregistry.Instance, stop <-chan struct{}) {
	if stop == nil {
		log.Warnf("nil stop channel passed to AddRegistryAndRun for registry %s/%s", registry.Provider(), registry.Cluster())
	}

	c.storeLock.Lock()
	defer c.storeLock.Unlock()

	c.addRegistry(registry, stop)
	if !c.running {
		// Run has not been called yet; it will start this registry.
		return
	}
	go registry.Run(stop)
}
// DeleteRegistry removes the registry identified by clusterID and providerID from the
// aggregated controller. A warning is logged and nothing changes if the registry list
// is empty or no matching registry is found. The registry itself is not stopped here.
func (c *Controller) DeleteRegistry(clusterID cluster.ID, providerID provider.ID) {
	c.storeLock.Lock()
	defer c.storeLock.Unlock()

	if len(c.registries) == 0 {
		log.Warnf("Registry list is empty, nothing to delete")
		return
	}
	index, ok := c.getRegistryIndex(clusterID, providerID)
	if !ok {
		log.Warnf("Registry %s/%s is not found in the registries list, nothing to delete", providerID, clusterID)
		return
	}

	// Shift the tail down over the removed entry, then clear the vacated last slot
	// before truncating. Clearing the slot at index before shifting would be pointless
	// (the shift overwrites it) and would leave a stale pointer in the backing array
	// past the new length, keeping a registry entry reachable after its removal.
	last := len(c.registries) - 1
	copy(c.registries[index:], c.registries[index+1:])
	c.registries[last] = nil
	c.registries = c.registries[:last]

	log.Infof("%s registry for the cluster %s has been deleted.", providerID, clusterID)
}
// GetRegistries returns a snapshot of all registries. The returned slice is a fresh
// copy taken under the read lock, so callers may iterate it without locking; the
// registries themselves are shared, not deep-copied.
func (c *Controller) GetRegistries() []serviceregistry.Instance {
	c.storeLock.RLock()
	defer c.storeLock.RUnlock()

	snapshot := make([]serviceregistry.Instance, 0, len(c.registries))
	for _, entry := range c.registries {
		snapshot = append(snapshot, entry)
	}
	return snapshot
}
// getRegistryIndex returns the position of the first registry whose cluster and
// provider both match, and true. It returns (0, false) when there is no match.
// It reads c.registries without locking; DeleteRegistry calls it with storeLock held.
func (c *Controller) getRegistryIndex(clusterID cluster.ID, provider provider.ID) (int, bool) {
	for i := 0; i < len(c.registries); i++ {
		entry := c.registries[i]
		if !entry.Cluster().Equals(clusterID) {
			continue
		}
		if entry.Provider() == provider {
			return i, true
		}
	}
	return 0, false
}
// Services lists services from all platforms. Services from non-Kubernetes registries
// are appended as-is. Services from Kubernetes registries are de-duplicated by hostname:
// the first occurrence is kept and later occurrences only contribute their cluster VIPs.
func (c *Controller) Services() []*model.Service {
	// seen maps the hostname of a Kubernetes service to its position in merged, used
	// to identify services that are installed in multiple clusters.
	seen := make(map[host.Name]int)
	merged := make([]*model.Service, 0)

	// GetRegistries hands back a copy of the registry list, so the walk below sees a
	// consistent set of registries without holding the lock.
	for _, r := range c.GetRegistries() {
		svcs := r.Services()
		if r.Provider() != provider.Kubernetes {
			merged = append(merged, svcs...)
			continue
		}

		for _, s := range svcs {
			pos, duplicate := seen[s.Hostname]
			if !duplicate {
				// First time we see this hostname. The result has a single service per
				// hostname, and the first cluster listed wins, so the services in the
				// primary cluster are used for default settings. If a service appears in
				// multiple clusters, the order is less clear.
				seen[s.Hostname] = len(merged)
				merged = append(merged, s)
				continue
			}

			// Seen before, so this occurrence comes from a different cluster. The kept
			// service must be deep-copied before it is mutated, otherwise a service
			// deleted in a remote cluster could leave its ClusterIP behind. After a
			// merge the ClusterVIPs length is >= 2, which is used to avoid copying
			// the same service more than once.
			if len(merged[pos].ClusterVIPs.GetAddresses()) < 2 {
				merged[pos] = merged[pos].DeepCopy()
			}
			mergeService(merged[pos], s, r)
		}
	}
	return merged
}
// GetService retrieves a service by hostname, or nil if no registry knows it.
// A hit in a non-Kubernetes registry is returned immediately and unmodified.
// Hits in Kubernetes registries are combined: the first one is deep-copied and
// every further one merges its cluster VIPs into that copy.
func (c *Controller) GetService(hostname host.Name) *model.Service {
	var merged *model.Service
	for _, r := range c.GetRegistries() {
		svc := r.GetService(hostname)
		switch {
		case svc == nil:
			// This registry does not know the hostname.
		case r.Provider() != provider.Kubernetes:
			return svc
		case merged == nil:
			merged = svc.DeepCopy()
		default:
			// Seeing the service again means it is available in multiple clusters.
			mergeService(merged, svc, r)
		}
	}
	return merged
}
// mergeService copies src's addresses for srcRegistry's cluster into dst. The copy
// happens when srcRegistry is a Kubernetes registry (k8s VIPs are preferred), or when
// dst has no addresses for that cluster yet; otherwise dst is left untouched.
func mergeService(dst, src *model.Service, srcRegistry serviceregistry.Instance) {
	clusterID := srcRegistry.Cluster()
	if srcRegistry.Provider() != provider.Kubernetes && len(dst.ClusterVIPs.GetAddressesFor(clusterID)) != 0 {
		// Not a k8s source and dst already has addresses for this cluster: keep them.
		return
	}
	dst.ClusterVIPs.SetAddressesFor(clusterID, src.ClusterVIPs.GetAddressesFor(clusterID))
}
// NetworkGateways concatenates the service-based cross-network gateways reported by
// each registry, in registry order. The result is nil when no registry reports any.
func (c *Controller) NetworkGateways() []model.NetworkGateway {
	registries := c.GetRegistries()

	var merged []model.NetworkGateway
	for i := range registries {
		merged = append(merged, registries[i].NetworkGateways()...)
	}
	return merged
}
// MCSServices concatenates the MCS service info reported by each registry, in
// registry order. The result is nil when no registry reports any.
func (c *Controller) MCSServices() []model.MCSServiceInfo {
	registries := c.GetRegistries()

	var infos []model.MCSServiceInfo
	for i := range registries {
		infos = append(infos, registries[i].MCSServices()...)
	}
	return infos
}
// InstancesByPort asks every registry for the instances of svc on the given port,
// passing the supplied labels through as the filter, and concatenates the answers in
// registry order. The result is nil when no registry returns any instance.
func (c *Controller) InstancesByPort(svc *model.Service, port int, labels labels.Instance) []*model.ServiceInstance {
	registries := c.GetRegistries()

	var found []*model.ServiceInstance
	for i := range registries {
		found = append(found, registries[i].InstancesByPort(svc, port, labels)...)
	}
	return found
}
// nodeClusterID returns the cluster ID recorded in the proxy's metadata, or the
// empty ID when the proxy carries no metadata (or an empty cluster ID).
func nodeClusterID(node *model.Proxy) cluster.ID {
	if meta := node.Metadata; meta != nil {
		return meta.ClusterID
	}
	return ""
}
// skipSearchingRegistryForProxy reports whether registry r can be skipped for a proxy
// in cluster nodeClusterID because there won't be a match: that is only the case for a
// Kubernetes registry that belongs to a different cluster than the proxy.
func skipSearchingRegistryForProxy(nodeClusterID cluster.ID, r serviceregistry.Instance) bool {
	switch {
	case r.Provider() != provider.Kubernetes:
		// Always search non-kube (usually serviceentry) registries.
		return false
	case nodeClusterID == "":
		// Check every registry if the cluster ID isn't specified.
		return false
	default:
		return !r.Cluster().Equals(nodeClusterID)
	}
}
// GetProxyServiceInstances lists the service instances co-located with the given proxy,
// collected from every registry that is not skipped by skipSearchingRegistryForProxy.
// The result is never nil; it is empty when nothing is found.
func (c *Controller) GetProxyServiceInstances(node *model.Proxy) []*model.ServiceInstance {
	found := make([]*model.ServiceInstance, 0)
	proxyCluster := nodeClusterID(node)

	for _, r := range c.GetRegistries() {
		if !skipSearchingRegistryForProxy(proxyCluster, r) {
			found = append(found, r.GetProxyServiceInstances(node)...)
			continue
		}
		log.Debugf("GetProxyServiceInstances(): not searching registry %v: proxy %v CLUSTER_ID is %v",
			r.Cluster(), node.ID, proxyCluster)
	}
	return found
}
// GetProxyWorkloadLabels returns the first non-nil workload labels a registry reports
// for the proxy, or nil if none does. When the proxy has a cluster ID, only registries
// of that cluster are consulted; otherwise every registry is consulted in order.
func (c *Controller) GetProxyWorkloadLabels(proxy *model.Proxy) labels.Instance {
	clusterID := nodeClusterID(proxy)
	for _, r := range c.GetRegistries() {
		// With an unset proxy cluster ID any registry may answer, so an incorrect
		// workload label could be found. This can not happen in k8s env.
		if clusterID != "" && clusterID != r.Cluster() {
			// The proxy lives in a different cluster than this registry.
			continue
		}
		if found := r.GetProxyWorkloadLabels(proxy); found != nil {
			return found
		}
	}
	return nil
}
// Run starts every registered registry in its own goroutine, marks the controller as
// running, and then blocks until stop is closed. Each registry runs with its own
// per-registry stop channel when it has one, and with stop otherwise.
func (c *Controller) Run(stop <-chan struct{}) {
	c.storeLock.Lock()
	for _, entry := range c.registries {
		// Prefer the per-registry stop channel; fall back to the aggregate's.
		ch := entry.stop
		if ch == nil {
			ch = stop
		}
		go entry.Run(ch)
	}
	c.running = true
	c.storeLock.Unlock()

	<-stop
	log.Info("Registry Aggregator terminated")
}
// HasSynced reports whether every registry has synced. It stops at the first registry
// that has not, logging its cluster at debug level. With no registries it returns true.
func (c *Controller) HasSynced() bool {
	for _, r := range c.GetRegistries() {
		if r.HasSynced() {
			continue
		}
		log.Debugf("registry %s is syncing", r.Cluster())
		return false
	}
	return true
}
// AppendServiceHandler registers f with the controller's aggregate-wide handlers.
// Every registry added to the controller forwards its service events to those
// handlers (see addRegistry). storeLock is not taken here.
func (c *Controller) AppendServiceHandler(f func(*model.Service, model.Event)) {
c.handlers.AppendServiceHandler(f)
}
// AppendWorkloadHandler is intentionally a no-op: the handler f is discarded and
// never registered anywhere.
func (c *Controller) AppendWorkloadHandler(f func(*model.WorkloadInstance, model.Event)) {
// Currently, it is not used.
// Note: take care when you want to enable it, it will register the handlers to all registries
// c.handlers.AppendWorkloadHandler(f)
}
// AppendServiceHandlerForCluster registers the service handler f in the handler set
// kept for cluster id, creating that set on first use.
func (c *Controller) AppendServiceHandlerForCluster(id cluster.ID, f func(*model.Service, model.Event)) {
	c.storeLock.Lock()
	defer c.storeLock.Unlock()

	clusterHandlers, found := c.handlersByCluster[id]
	if !found {
		clusterHandlers = &model.ControllerHandlers{}
		c.handlersByCluster[id] = clusterHandlers
	}
	clusterHandlers.AppendServiceHandler(f)
}
// AppendWorkloadHandlerForCluster registers the workload handler f in the handler set
// kept for cluster id, creating that set on first use.
func (c *Controller) AppendWorkloadHandlerForCluster(id cluster.ID, f func(*model.WorkloadInstance, model.Event)) {
	c.storeLock.Lock()
	defer c.storeLock.Unlock()

	clusterHandlers, found := c.handlersByCluster[id]
	if !found {
		clusterHandlers = &model.ControllerHandlers{}
		c.handlersByCluster[id] = clusterHandlers
	}
	clusterHandlers.AppendWorkloadHandler(f)
}
// UnRegisterHandlersForCluster drops the whole handler set kept for cluster id.
// It is a no-op when no handlers were registered for that cluster.
func (c *Controller) UnRegisterHandlersForCluster(id cluster.ID) {
	c.storeLock.Lock()
	delete(c.handlersByCluster, id)
	c.storeLock.Unlock()
}
// GetIstioServiceAccounts implements the model.ServiceAccounts operation.
// It returns the sorted, de-duplicated list of all SPIFFE based identities that back
// the service, gathered from every registry and then expanded with the trust domain
// aliases from the mesh config (when a mesh holder and mesh config are available).
// To retain this trust domain expansion, the xDS server implementation should wrap any
// service registry (even a single one) in this aggregated one.
// For example,
//   - { "spiffe://cluster.local/bar@iam.gserviceaccount.com" }; when annotation is used on corresponding workloads.
//   - { "spiffe://cluster.local/ns/default/sa/foo" }; normal kubernetes cases
//   - { "spiffe://cluster.local/ns/default/sa/foo", "spiffe://trust-domain-alias/ns/default/sa/foo" };
//     if the trust domain alias is configured.
func (c *Controller) GetIstioServiceAccounts(svc *model.Service, ports []int) []string {
	// Gather the distinct accounts reported by all registries.
	unique := make(map[string]struct{})
	for _, r := range c.GetRegistries() {
		for _, account := range r.GetIstioServiceAccounts(svc, ports) {
			unique[account] = struct{}{}
		}
	}
	accounts := make([]string, 0, len(unique))
	for account := range unique {
		accounts = append(accounts, account)
	}

	// Look up the trust domain aliases, if a mesh config is available.
	aliases := make([]string, 0)
	if c.meshHolder != nil {
		if m := c.meshHolder.Mesh(); m != nil {
			aliases = m.TrustDomainAliases
		}
	}

	expanded := spiffe.ExpandWithTrustDomains(accounts, aliases)
	identities := make([]string, 0, len(expanded))
	for identity := range expanded {
		identities = append(identities, identity)
	}
	// Sort to make the return result deterministic.
	sort.Strings(identities)
	return identities
}