blob: 77887013eab1290c8adcf0b3c79db94a051099c8 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package aggregate
import (
"fmt"
"reflect"
"testing"
"time"
)
import (
"github.com/google/go-cmp/cmp"
"go.uber.org/atomic"
meshconfig "istio.io/api/mesh/v1alpha1"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/memory"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/mock"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/provider"
"github.com/apache/dubbo-go-pixiu/pkg/cluster"
"github.com/apache/dubbo-go-pixiu/pkg/config/host"
"github.com/apache/dubbo-go-pixiu/pkg/test/util/retry"
)
// mockMeshConfigHolder is a test double whose Mesh method serves a MeshConfig
// populated only with the configured trust domain aliases.
type mockMeshConfigHolder struct {
	trustDomainAliases []string
}

// Mesh builds a new MeshConfig on every call, carrying the holder's trust
// domain aliases and nothing else.
func (mh mockMeshConfigHolder) Mesh() *meshconfig.MeshConfig {
	cfg := new(meshconfig.MeshConfig)
	cfg.TrustDomainAliases = mh.trustDomainAliases
	return cfg
}
// buildMockController assembles an aggregate Controller on top of two in-memory
// registries: "mockAdapter1" (replicated foo v1, hello, external HTTP) and
// "mockAdapter2" (replicated foo v2, world, external HTTPS). For every port of
// the hello and world services, instance versions 0 and 1 are registered.
func buildMockController() *Controller {
	// seed adds instance versions 0 and 1 of svc to sd for each of svc's ports.
	seed := func(sd *memory.ServiceDiscovery, svc *model.Service) {
		for _, p := range svc.Ports {
			sd.AddInstance(svc.Hostname, mock.MakeServiceInstance(svc, p, 0, model.Locality{}))
			sd.AddInstance(svc.Hostname, mock.MakeServiceInstance(svc, p, 1, model.Locality{}))
		}
	}

	helloSide := memory.NewServiceDiscovery(
		mock.ReplicatedFooServiceV1.DeepCopy(),
		mock.HelloService.DeepCopy(),
		mock.ExtHTTPService.DeepCopy(),
	)
	seed(helloSide, mock.HelloService)

	worldSide := memory.NewServiceDiscovery(
		mock.ReplicatedFooServiceV2.DeepCopy(),
		mock.WorldService.DeepCopy(),
		mock.ExtHTTPSService.DeepCopy(),
	)
	seed(worldSide, mock.WorldService)

	adapter1 := serviceregistry.Simple{
		ProviderID:       provider.ID("mockAdapter1"),
		ServiceDiscovery: helloSide,
		Controller:       &mock.Controller{},
	}
	adapter2 := serviceregistry.Simple{
		ProviderID:       provider.ID("mockAdapter2"),
		ServiceDiscovery: worldSide,
		Controller:       &mock.Controller{},
	}

	aggregate := NewController(Options{&mockMeshConfigHolder{}})
	aggregate.AddRegistry(adapter1)
	aggregate.AddRegistry(adapter2)
	return aggregate
}
// buildMockControllerForMultiCluster returns an aggregate Controller together
// with the in-memory service discoveries backing "cluster-1" and "cluster-2".
// cluster-1 holds the shared mock hello service; cluster-2 holds a separate
// hello service (same hostname, address 10.1.2.0) plus the world service.
func buildMockControllerForMultiCluster() (*Controller, *memory.ServiceDiscovery, *memory.ServiceDiscovery) {
	cluster1Discovery := memory.NewServiceDiscovery(mock.HelloService)

	helloInCluster2 := mock.MakeService(mock.ServiceArgs{
		Hostname:        mock.HelloService.Hostname,
		Address:         "10.1.2.0",
		ServiceAccounts: []string{},
		ClusterID:       "cluster-2",
	})
	cluster2Discovery := memory.NewServiceDiscovery(helloInCluster2, mock.WorldService)

	kube1 := serviceregistry.Simple{
		ProviderID:       provider.Kubernetes,
		ClusterID:        "cluster-1",
		ServiceDiscovery: cluster1Discovery,
		Controller:       &mock.Controller{},
	}
	kube2 := serviceregistry.Simple{
		ProviderID:       provider.Kubernetes,
		ClusterID:        "cluster-2",
		ServiceDiscovery: cluster2Discovery,
		Controller:       &mock.Controller{},
	}

	aggregate := NewController(Options{})
	aggregate.AddRegistry(kube1)
	aggregate.AddRegistry(kube2)
	return aggregate, cluster1Discovery, cluster2Discovery
}
// TestServicesForMultiCluster checks that the aggregate controller merges
// services sharing a hostname across clusters, that the per-cluster VIPs are
// reported correctly before and after the hello service is removed from the
// cluster-2 registry, and that the shared mock hello service is not mutated
// along the way.
func TestServicesForMultiCluster(t *testing.T) {
	// Snapshot taken up front so mutation of the shared mock can be detected.
	originalHelloService := mock.HelloService.DeepCopy()
	aggregateCtl, _, registry2 := buildMockControllerForMultiCluster()

	// List Services from aggregate controller
	services := aggregateCtl.Services()

	// verifyClusterVIPs compares the ClusterVIPs of every entry currently in
	// services against want, keyed by hostname. It reads the services variable
	// at call time, so it observes the re-listed value further down.
	verifyClusterVIPs := func(want map[host.Name]map[cluster.ID][]string) {
		t.Helper()
		for _, svc := range services {
			if !reflect.DeepEqual(svc.ClusterVIPs.Addresses, want[svc.Hostname]) {
				t.Fatalf("Service %s ClusterVIPs actual %v, expected %v", svc.Hostname,
					svc.ClusterVIPs.Addresses, want[svc.Hostname])
			}
		}
	}

	// Set up ground truth hostname values
	hosts := map[host.Name]bool{
		mock.HelloService.Hostname: false,
		mock.WorldService.Hostname: false,
	}

	// Count each expected hostname at most once, ignoring unknown hostnames.
	count := 0
	for _, svc := range services {
		if counted, existed := hosts[svc.Hostname]; existed && !counted {
			count++
			hosts[svc.Hostname] = true
		}
	}
	if count != len(hosts) {
		// Report the real expectation (len(hosts)) and the observed count.
		t.Fatalf("Cluster local service map expected size %d, actual %d: hosts %v vs services %v",
			len(hosts), count, hosts, services)
	}

	// Now verify ClusterVIPs for each service
	verifyClusterVIPs(map[host.Name]map[cluster.ID][]string{
		mock.HelloService.Hostname: {
			"cluster-1": []string{"10.1.0.0"},
			"cluster-2": []string{"10.1.2.0"},
		},
		mock.WorldService.Hostname: {
			"cluster-2": []string{"10.2.0.0"},
		},
	})

	registry2.RemoveService(mock.HelloService.Hostname)

	// List Services from aggregate controller again; the cluster-2 VIP of the
	// hello service must be gone.
	services = aggregateCtl.Services()
	verifyClusterVIPs(map[host.Name]map[cluster.ID][]string{
		mock.HelloService.Hostname: {
			"cluster-1": []string{"10.1.0.0"},
		},
		mock.WorldService.Hostname: {
			"cluster-2": []string{"10.2.0.0"},
		},
	})

	// check HelloService is not mutated
	if !reflect.DeepEqual(originalHelloService, mock.HelloService) {
		t.Errorf("Original hello service is mutated")
	}
}
// TestServices verifies that listing services through the aggregate controller
// yields every hostname expected from both mock registries.
func TestServices(t *testing.T) {
	aggregateCtl := buildMockController()
	listed := aggregateCtl.Services()

	// Hostnames that must show up; entries are removed as they are found.
	pending := map[host.Name]struct{}{
		mock.HelloService.Hostname:    {},
		mock.ExtHTTPService.Hostname:  {},
		mock.WorldService.Hostname:    {},
		mock.ExtHTTPSService.Hostname: {},
	}
	for _, svc := range listed {
		delete(pending, svc.Hostname)
	}

	// Anything left over was never returned by the controller.
	if len(pending) != 0 {
		t.Fatal("Return services does not match ground truth")
	}
}
// TestGetService looks up one service per mock registry by hostname and checks
// that a service with the requested hostname comes back.
func TestGetService(t *testing.T) {
	aggregateCtl := buildMockController()

	// The hello service lives in mockAdapter1, the world service in mockAdapter2.
	wanted := []host.Name{
		mock.HelloService.Hostname,
		mock.WorldService.Hostname,
	}
	for _, hostname := range wanted {
		got := aggregateCtl.GetService(hostname)
		switch {
		case got == nil:
			t.Fatal("Fail to get service")
		case got.Hostname != hostname:
			t.Fatal("Returned service is incorrect")
		}
	}
}
// TestGetProxyServiceInstances queries the aggregate controller with a proxy IP
// from each mock registry and expects six instances, all belonging to the
// service that owns that IP.
func TestGetProxyServiceInstances(t *testing.T) {
	aggregateCtl := buildMockController()

	// expectSix fetches the instances for a proxy at ip and checks count and owner.
	expectSix := func(ip string, owner host.Name) {
		proxy := &model.Proxy{IPAddresses: []string{ip}}
		found := aggregateCtl.GetProxyServiceInstances(proxy)
		if len(found) != 6 {
			t.Fatalf("Returned GetProxyServiceInstances' amount %d is not correct", len(found))
		}
		for _, si := range found {
			if si.Service.Hostname != owner {
				t.Fatal("Returned Instance is incorrect")
			}
		}
	}

	// Get Instances from mockAdapter1
	expectSix(mock.HelloInstanceV0, mock.HelloService.Hostname)
	// Get Instances from mockAdapter2
	expectSix(mock.MakeIP(mock.WorldService, 1), mock.WorldService.Hostname)
}
// TestGetProxyWorkloadLabels asserts that when no registry supplies workload
// labels the aggregate controller returns nil rather than an empty value, so
// callers can tell "labels not found" apart from "no labels".
func TestGetProxyWorkloadLabels(t *testing.T) {
	aggregateCtl := buildMockController()
	proxy := &model.Proxy{IPAddresses: []string{mock.HelloInstanceV0}}

	labels := aggregateCtl.GetProxyWorkloadLabels(proxy)
	if labels == nil {
		return
	}
	t.Fatalf("expected nil workload labels, got: %v", labels)
}
// TestInstances asks the aggregate controller for the port-80 instances of the
// hello and world services and checks the count, the owning hostname and the
// presence of the HTTP port on each returned instance.
func TestInstances(t *testing.T) {
	aggregateCtl := buildMockController()

	// The hello service is served by mockAdapter1, the world service by mockAdapter2.
	for _, svc := range []*model.Service{mock.HelloService, mock.WorldService} {
		found := aggregateCtl.InstancesByPort(svc, 80, nil)
		if len(found) != 2 {
			t.Fatal("Returned wrong number of instances from controller")
		}
		for _, inst := range found {
			if inst.Service.Hostname != svc.Hostname {
				t.Fatal("Returned instance's hostname does not match desired value")
			}
			_, hasHTTP := inst.Service.Ports.Get(mock.PortHTTPName)
			if !hasHTTP {
				t.Fatal("Returned instance does not contain desired port")
			}
		}
	}
}
// TestGetIstioServiceAccounts runs table-driven checks of the service accounts
// reported by the aggregate controller, including the case where trust domain
// aliases from the mesh config expand the result.
func TestGetIstioServiceAccounts(t *testing.T) {
	aggregateCtl := buildMockController()

	type scenario struct {
		name               string
		svc                *model.Service
		trustDomainAliases []string
		want               []string
	}
	scenarios := []scenario{
		{
			name: "HelloEmpty",
			svc:  mock.HelloService,
			want: []string{},
		},
		{
			name: "World",
			svc:  mock.WorldService,
			want: []string{
				"spiffe://cluster.local/ns/default/sa/world1",
				"spiffe://cluster.local/ns/default/sa/world2",
			},
		},
		{
			name: "ReplicatedFoo",
			svc:  mock.ReplicatedFooServiceV1,
			want: []string{
				"spiffe://cluster.local/ns/default/sa/foo-share",
				"spiffe://cluster.local/ns/default/sa/foo1",
				"spiffe://cluster.local/ns/default/sa/foo2",
			},
		},
		{
			name:               "ExpansionByTrustDomainAliases",
			trustDomainAliases: []string{"cluster.local", "example.com"},
			svc:                mock.WorldService,
			want: []string{
				"spiffe://cluster.local/ns/default/sa/world1",
				"spiffe://cluster.local/ns/default/sa/world2",
				"spiffe://example.com/ns/default/sa/world1",
				"spiffe://example.com/ns/default/sa/world2",
			},
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			// Each scenario installs its own mesh holder on the shared controller.
			aggregateCtl.meshHolder = &mockMeshConfigHolder{trustDomainAliases: sc.trustDomainAliases}
			got := aggregateCtl.GetIstioServiceAccounts(sc.svc, []int{})
			diff := cmp.Diff(got, sc.want)
			if diff == "" {
				return
			}
			t.Errorf("unexpected service account, diff %v, %v", diff, got)
		})
	}
}
func TestAddRegistry(t *testing.T) {
registries := []serviceregistry.Simple{
{
ProviderID: "registry1",
ClusterID: "cluster1",
Controller: &mock.Controller{},
ServiceDiscovery: memory.NewServiceDiscovery(),
},
{
ProviderID: "registry2",
ClusterID: "cluster2",
Controller: &mock.Controller{},
ServiceDiscovery: memory.NewServiceDiscovery(),
},
}
ctrl := NewController(Options{})
registry1Counter := atomic.NewInt32(0)
registry2Counter := atomic.NewInt32(0)
for _, r := range registries {
clusterID := r.Cluster()
counter := registry1Counter
if clusterID == "cluster2" {
counter = registry2Counter
}
ctrl.AppendServiceHandlerForCluster(clusterID, func(service *model.Service, event model.Event) {
counter.Add(1)
})
ctrl.AddRegistry(r)
}
if l := len(ctrl.registries); l != 2 {
t.Fatalf("Expected length of the registries slice should be 2, got %d", l)
}
registries[0].Controller.(*mock.Controller).OnServiceEvent(mock.HelloService, model.EventAdd)
registries[1].Controller.(*mock.Controller).OnServiceEvent(mock.WorldService, model.EventAdd)
ctrl.DeleteRegistry(registries[1].Cluster(), registries[1].Provider())
ctrl.UnRegisterHandlersForCluster(registries[1].Cluster())
registries[0].Controller.(*mock.Controller).OnServiceEvent(mock.HelloService, model.EventAdd)
if registry1Counter.Load() != 3 {
t.Errorf("cluster1 expected 3 event, but got %d", registry1Counter.Load())
}
if registry2Counter.Load() != 2 {
t.Errorf("cluster2 expected 2 event, but got %d", registry2Counter.Load())
}
}
// TestGetDeleteRegistry adds three registries, deletes the middle one, and
// checks that GetRegistries returns the remaining two in their original order.
func TestGetDeleteRegistry(t *testing.T) {
	registries := []serviceregistry.Simple{
		{
			ProviderID:       "registry1",
			ClusterID:        "cluster1",
			Controller:       &mock.Controller{},
			ServiceDiscovery: memory.NewServiceDiscovery(),
		},
		{
			ProviderID:       "registry2",
			ClusterID:        "cluster2",
			Controller:       &mock.Controller{},
			ServiceDiscovery: memory.NewServiceDiscovery(),
		},
		{
			ProviderID:       "registry3",
			ClusterID:        "cluster3",
			Controller:       &mock.Controller{},
			ServiceDiscovery: memory.NewServiceDiscovery(),
		},
	}

	ctrl := NewController(Options{})
	for i := range registries {
		ctrl.AddRegistry(registries[i])
	}

	// Test Get
	listed := ctrl.GetRegistries()
	if n := len(listed); n != 3 {
		t.Fatalf("Expected length of the registries slice should be 3, got %d", n)
	}

	// Test Delete cluster2
	removed := registries[1]
	ctrl.DeleteRegistry(removed.ClusterID, removed.ProviderID)
	listed = ctrl.GetRegistries()
	if n := len(listed); n != 2 {
		t.Fatalf("Expected length of the registries slice should be 2, got %d", n)
	}

	// The surviving registries must keep their original relative order.
	survivors := []serviceregistry.Instance{registries[0], registries[2]}
	for i, inst := range survivors {
		if !reflect.DeepEqual(listed[i], &registryEntry{Instance: inst}) {
			t.Fatalf("Expected registries order has been changed")
		}
	}
}
// TestSkipSearchingRegistryForProxy exercises skipSearchingRegistryForProxy with
// combinations of node cluster ID and registry (two Kubernetes registries in
// different clusters and one external registry).
func TestSkipSearchingRegistryForProxy(t *testing.T) {
	kubeCluster1 := serviceregistry.Simple{
		ClusterID:        "cluster-1",
		ProviderID:       provider.Kubernetes,
		Controller:       &mock.Controller{},
		ServiceDiscovery: memory.NewServiceDiscovery(),
	}
	kubeCluster2 := serviceregistry.Simple{
		ClusterID:        "cluster-2",
		ProviderID:       provider.Kubernetes,
		Controller:       &mock.Controller{},
		ServiceDiscovery: memory.NewServiceDiscovery(),
	}
	// external registries may eventually be associated with a cluster
	externalRegistry := serviceregistry.Simple{
		ClusterID:        "cluster-1",
		ProviderID:       provider.External,
		Controller:       &mock.Controller{},
		ServiceDiscovery: memory.NewServiceDiscovery(),
	}

	type scenario struct {
		nodeClusterID cluster.ID
		registry      serviceregistry.Instance
		want          bool
	}
	scenarios := []scenario{
		// matching kube registry
		{nodeClusterID: "cluster-1", registry: kubeCluster1, want: false},
		// unmatching kube registry
		{nodeClusterID: "cluster-1", registry: kubeCluster2, want: true},
		// always search external
		{nodeClusterID: "cluster-1", registry: externalRegistry, want: false},
		{nodeClusterID: "cluster-2", registry: externalRegistry, want: false},
		{nodeClusterID: "", registry: externalRegistry, want: false},
		// always search for empty node cluster id
		{nodeClusterID: "", registry: kubeCluster1, want: false},
		{nodeClusterID: "", registry: kubeCluster2, want: false},
		{nodeClusterID: "", registry: externalRegistry, want: false},
	}

	for idx, sc := range scenarios {
		got := skipSearchingRegistryForProxy(sc.nodeClusterID, sc.registry)
		if got == sc.want {
			continue
		}
		t.Errorf("[%v] registry=%v node=%v: got %v want %v",
			idx, sc.registry, sc.nodeClusterID, got, sc.want)
	}
}
// runnableRegistry creates a RunnableRegistry whose cluster ID is name, whose
// provider ID is "test", and whose running flag starts out false.
func runnableRegistry(name string) *RunnableRegistry {
	inner := serviceregistry.Simple{
		ClusterID:        cluster.ID(name),
		ProviderID:       "test",
		Controller:       &mock.Controller{},
		ServiceDiscovery: memory.NewServiceDiscovery(),
	}
	rr := &RunnableRegistry{
		Instance: inner,
		running:  atomic.NewBool(false),
	}
	return rr
}
// RunnableRegistry embeds a serviceregistry.Instance and records, through an
// atomic flag, whether its Run method has been invoked. Tests use it to observe
// when the aggregate controller starts a registry.
type RunnableRegistry struct {
	serviceregistry.Instance
	running *atomic.Bool
}

// Run marks the registry as running and then blocks until a value is received
// from stop or stop is closed. It panics if the registry was already marked
// running, which signals that something started it twice.
func (rr *RunnableRegistry) Run(stop <-chan struct{}) {
	// Swap sets the flag and returns its previous value in one atomic step, so
	// two concurrent Run calls cannot both pass the "not yet running" check.
	if alreadyRunning := rr.running.Swap(true); alreadyRunning {
		panic("--- registry has been run twice ---")
	}
	<-stop
}
// expectRunningOrFail retries for up to 50ms until every registry currently
// held by ctrl reports a running flag equal to want, and fails the test
// otherwise. Every registry in ctrl must wrap a *RunnableRegistry; any other
// type makes the type assertion panic.
func expectRunningOrFail(t *testing.T, ctrl *Controller, want bool) {
	// Mark as helper so failures are attributed to the calling subtest line.
	t.Helper()
	// running gets flipped in a goroutine, retry to avoid race
	retry.UntilSuccessOrFail(t, func() error {
		for _, registry := range ctrl.registries {
			if running := registry.Instance.(*RunnableRegistry).running.Load(); running != want {
				return fmt.Errorf("%s running is %v but wanted %v", registry.Cluster(), running, want)
			}
		}
		return nil
	}, retry.Timeout(50*time.Millisecond), retry.Delay(0))
}
// TestDeferredRun walks one shared aggregate controller through an ordered
// sequence of subtests checking when registries get started: not before the
// aggregate's Run, all of them once Run is called, not via AddRegistry after
// Run, and immediately via AddRegistryAndRun after Run. The steps share state
// and must execute in the listed order.
func TestDeferredRun(t *testing.T) {
	stopCh := make(chan struct{})
	defer close(stopCh)
	aggregate := NewController(Options{})

	steps := []struct {
		name string
		body func(t *testing.T)
	}{
		{
			name: "AddRegistry before aggregate Run does not run",
			body: func(t *testing.T) {
				aggregate.AddRegistry(runnableRegistry("earlyAdd"))
				aggregate.AddRegistryAndRun(runnableRegistry("earlyAddAndRun"), nil)
				expectRunningOrFail(t, aggregate, false)
			},
		},
		{
			name: "aggregate Run starts all registries",
			body: func(t *testing.T) {
				go aggregate.Run(stopCh)
				expectRunningOrFail(t, aggregate, true)
				// Clear out the early registries so later steps start from empty.
				aggregate.DeleteRegistry("earlyAdd", "test")
				aggregate.DeleteRegistry("earlyAddAndRun", "test")
			},
		},
		{
			name: "AddRegistry after aggregate Run does not start registry",
			body: func(t *testing.T) {
				aggregate.AddRegistry(runnableRegistry("missed"))
				expectRunningOrFail(t, aggregate, false)
				aggregate.DeleteRegistry("missed", "test")
				expectRunningOrFail(t, aggregate, true)
			},
		},
		{
			name: "AddRegistryAndRun after aggregate Run starts registry",
			body: func(t *testing.T) {
				aggregate.AddRegistryAndRun(runnableRegistry("late"), nil)
				expectRunningOrFail(t, aggregate, true)
			},
		},
	}

	for _, step := range steps {
		t.Run(step.name, step.body)
	}
}