blob: fa12bc3d35a60e1870546d056bbe4d2bef1e31c5 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package memory
import (
"fmt"
"sync"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/provider"
"github.com/apache/dubbo-go-pixiu/pkg/cluster"
"github.com/apache/dubbo-go-pixiu/pkg/config/host"
"github.com/apache/dubbo-go-pixiu/pkg/config/labels"
"github.com/apache/dubbo-go-pixiu/pkg/config/protocol"
)
// ServiceController is a mock service controller
type ServiceController struct {
svcHandlers []func(*model.Service, model.Event)
sync.RWMutex
}
var _ model.Controller = &ServiceController{}
// Memory does not support workload handlers; everything is done in terms of instances
func (c *ServiceController) AppendWorkloadHandler(func(*model.WorkloadInstance, model.Event)) {}
// AppendServiceHandler appends a service handler to the controller
func (c *ServiceController) AppendServiceHandler(f func(*model.Service, model.Event)) {
c.Lock()
c.svcHandlers = append(c.svcHandlers, f)
c.Unlock()
}
// Run will run the controller
func (c *ServiceController) Run(<-chan struct{}) {}
// HasSynced always returns true
func (c *ServiceController) HasSynced() bool { return true }
// ServiceDiscovery is a mock discovery interface
type ServiceDiscovery struct {
services map[host.Name]*model.Service
networkGateways []model.NetworkGateway
model.NetworkGatewaysHandler
// EndpointShards table. Key is the fqdn of the service, ':', port
instancesByPortNum map[string][]*model.ServiceInstance
instancesByPortName map[string][]*model.ServiceInstance
// Used by GetProxyServiceInstance, used to configure inbound (list of services per IP)
// We generally expect a single instance - conflicting services need to be reported.
ip2instance map[string][]*model.ServiceInstance
WantGetProxyServiceInstances []*model.ServiceInstance
InstancesError error
Controller model.Controller
ClusterID cluster.ID
// Used by GetProxyWorkloadLabels
ip2workloadLabels map[string]labels.Instance
// XDSUpdater will push EDS changes to the ADS model.
EDSUpdater model.XDSUpdater
// Single mutex for now - it's for debug only.
mutex sync.Mutex
}
var _ model.ServiceDiscovery = &ServiceDiscovery{}
// NewServiceDiscovery builds an in-memory ServiceDiscovery
func NewServiceDiscovery(services ...*model.Service) *ServiceDiscovery {
svcs := map[host.Name]*model.Service{}
for _, svc := range services {
svcs[svc.Hostname] = svc
}
return &ServiceDiscovery{
services: svcs,
Controller: &ServiceController{},
instancesByPortNum: map[string][]*model.ServiceInstance{},
instancesByPortName: map[string][]*model.ServiceInstance{},
ip2instance: map[string][]*model.ServiceInstance{},
ip2workloadLabels: map[string]labels.Instance{},
}
}
func (sd *ServiceDiscovery) shardKey() model.ShardKey {
return model.ShardKey{Cluster: sd.ClusterID, Provider: provider.Mock}
}
func (sd *ServiceDiscovery) AddWorkload(ip string, labels labels.Instance) {
sd.ip2workloadLabels[ip] = labels
}
// AddHTTPService is a helper to add a service of type http, named 'http-main', with the
// specified vip and port.
func (sd *ServiceDiscovery) AddHTTPService(name, vip string, port int) {
sd.AddService(&model.Service{
Hostname: host.Name(name),
DefaultAddress: vip,
Ports: model.PortList{
{
Name: "http-main",
Port: port,
Protocol: protocol.HTTP,
},
},
})
}
// AddService adds an in-memory service.
func (sd *ServiceDiscovery) AddService(svc *model.Service) {
sd.mutex.Lock()
svc.Attributes.ServiceRegistry = provider.Mock
sd.services[svc.Hostname] = svc
sd.mutex.Unlock()
// TODO: notify listeners
}
// RemoveService removes an in-memory service.
func (sd *ServiceDiscovery) RemoveService(name host.Name) {
sd.mutex.Lock()
delete(sd.services, name)
sd.mutex.Unlock()
if sd.EDSUpdater != nil {
sd.EDSUpdater.SvcUpdate(sd.shardKey(), string(name), "", model.EventDelete)
}
}
// AddInstance adds an in-memory instance.
func (sd *ServiceDiscovery) AddInstance(service host.Name, instance *model.ServiceInstance) {
// WIP: add enough code to allow tests and load tests to work
sd.mutex.Lock()
defer sd.mutex.Unlock()
svc := sd.services[service]
if svc == nil {
return
}
instance.Service = svc
sd.ip2instance[instance.Endpoint.Address] = append(sd.ip2instance[instance.Endpoint.Address], instance)
key := fmt.Sprintf("%s:%d", service, instance.ServicePort.Port)
instanceList := sd.instancesByPortNum[key]
sd.instancesByPortNum[key] = append(instanceList, instance)
key = fmt.Sprintf("%s:%s", service, instance.ServicePort.Name)
instanceList = sd.instancesByPortName[key]
sd.instancesByPortName[key] = append(instanceList, instance)
}
// AddEndpoint adds an endpoint to a service.
func (sd *ServiceDiscovery) AddEndpoint(service host.Name, servicePortName string, servicePort int, address string, port int) *model.ServiceInstance {
instance := &model.ServiceInstance{
Endpoint: &model.IstioEndpoint{
Address: address,
ServicePortName: servicePortName,
EndpointPort: uint32(port),
},
ServicePort: &model.Port{
Name: servicePortName,
Port: servicePort,
Protocol: protocol.HTTP,
},
}
sd.AddInstance(service, instance)
return instance
}
// SetEndpoints update the list of endpoints for a service, similar with K8S controller.
func (sd *ServiceDiscovery) SetEndpoints(service string, namespace string, endpoints []*model.IstioEndpoint) {
sh := host.Name(service)
sd.mutex.Lock()
svc := sd.services[sh]
if svc == nil {
sd.mutex.Unlock()
return
}
// remove old entries
for k, v := range sd.ip2instance {
if len(v) > 0 && v[0].Service.Hostname == sh {
delete(sd.ip2instance, k)
}
}
for k, v := range sd.instancesByPortNum {
if len(v) > 0 && v[0].Service.Hostname == sh {
delete(sd.instancesByPortNum, k)
}
}
for k, v := range sd.instancesByPortName {
if len(v) > 0 && v[0].Service.Hostname == sh {
delete(sd.instancesByPortName, k)
}
}
for _, e := range endpoints {
// servicePortName string, servicePort int, address string, port int
p, _ := svc.Ports.Get(e.ServicePortName)
instance := &model.ServiceInstance{
Service: svc,
ServicePort: &model.Port{
Name: e.ServicePortName,
Port: p.Port,
Protocol: p.Protocol,
},
Endpoint: e,
}
sd.ip2instance[instance.Endpoint.Address] = []*model.ServiceInstance{instance}
key := fmt.Sprintf("%s:%d", service, instance.ServicePort.Port)
instanceList := sd.instancesByPortNum[key]
sd.instancesByPortNum[key] = append(instanceList, instance)
key = fmt.Sprintf("%s:%s", service, instance.ServicePort.Name)
instanceList = sd.instancesByPortName[key]
sd.instancesByPortName[key] = append(instanceList, instance)
}
sd.mutex.Unlock()
sd.EDSUpdater.EDSUpdate(sd.shardKey(), service, namespace, endpoints)
}
// Services implements discovery interface
// Each call to Services() should return a list of new *model.Service
func (sd *ServiceDiscovery) Services() []*model.Service {
sd.mutex.Lock()
defer sd.mutex.Unlock()
out := make([]*model.Service, 0, len(sd.services))
for _, service := range sd.services {
out = append(out, service)
}
return out
}
// GetService implements discovery interface
// Each call to GetService() should return a new *model.Service
func (sd *ServiceDiscovery) GetService(hostname host.Name) *model.Service {
sd.mutex.Lock()
defer sd.mutex.Unlock()
return sd.services[hostname]
}
// InstancesByPort filters the service instances by labels. This assumes single port, as is
// used by EDS/ADS.
func (sd *ServiceDiscovery) InstancesByPort(svc *model.Service, port int, labels labels.Instance) []*model.ServiceInstance {
sd.mutex.Lock()
defer sd.mutex.Unlock()
if sd.InstancesError != nil {
return nil
}
key := fmt.Sprintf("%s:%d", string(svc.Hostname), port)
instances, ok := sd.instancesByPortNum[key]
if !ok {
return nil
}
return instances
}
// GetProxyServiceInstances returns service instances associated with a node, resulting in
// 'in' services.
func (sd *ServiceDiscovery) GetProxyServiceInstances(node *model.Proxy) []*model.ServiceInstance {
sd.mutex.Lock()
defer sd.mutex.Unlock()
if sd.WantGetProxyServiceInstances != nil {
return sd.WantGetProxyServiceInstances
}
out := make([]*model.ServiceInstance, 0)
for _, ip := range node.IPAddresses {
si, found := sd.ip2instance[ip]
if found {
out = append(out, si...)
}
}
return out
}
func (sd *ServiceDiscovery) GetProxyWorkloadLabels(proxy *model.Proxy) labels.Instance {
sd.mutex.Lock()
defer sd.mutex.Unlock()
for _, ip := range proxy.IPAddresses {
if l, found := sd.ip2workloadLabels[ip]; found {
return l
}
}
return nil
}
// GetIstioServiceAccounts gets the Istio service accounts for a service hostname.
func (sd *ServiceDiscovery) GetIstioServiceAccounts(svc *model.Service, _ []int) []string {
sd.mutex.Lock()
defer sd.mutex.Unlock()
for h, s := range sd.services {
if h == svc.Hostname {
return s.ServiceAccounts
}
}
return make([]string, 0)
}
func (sd *ServiceDiscovery) AddGateways(gws ...model.NetworkGateway) {
sd.networkGateways = append(sd.networkGateways, gws...)
sd.NotifyGatewayHandlers()
}
func (sd *ServiceDiscovery) NetworkGateways() []model.NetworkGateway {
return sd.networkGateways
}
func (sd *ServiceDiscovery) MCSServices() []model.MCSServiceInfo {
return nil
}